Commit 8f4003a8 authored by Dan Melnic's avatar Dan Melnic Committed by Facebook Github Bot

Disable zerocopy if we're notified about deferred copies, add a...

Disable zerocopy if we're notified about deferred copies, add a isZeroCopyWriteInProgress method, replace pair with a proper struct

Summary: Add zeroWriteDone callback

Reviewed By: djwatson

Differential Revision: D6097129

fbshipit-source-id: b82a942557680c3a7a3be8f81ee6f2886e99e165
parent 0e8c7e1c
...@@ -114,16 +114,16 @@ class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest { ...@@ -114,16 +114,16 @@ class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
if (bytesWritten_) { if (bytesWritten_) {
if (socket_->isZeroCopyRequest(writeFlags)) { if (socket_->isZeroCopyRequest(writeFlags)) {
if (isComplete()) { if (isComplete()) {
socket_->addZeroCopyBuff(std::move(ioBuf_)); socket_->addZeroCopyBuf(std::move(ioBuf_));
} else { } else {
socket_->addZeroCopyBuff(ioBuf_.get()); socket_->addZeroCopyBuf(ioBuf_.get());
} }
} else { } else {
// this happens if at least one of the prev requests were sent // this happens if at least one of the prev requests were sent
// with zero copy but not the last one // with zero copy but not the last one
if (isComplete() && socket_->getZeroCopy() && if (isComplete() && socket_->getZeroCopy() &&
socket_->containsZeroCopyBuff(ioBuf_.get())) { socket_->containsZeroCopyBuf(ioBuf_.get())) {
socket_->setZeroCopyBuff(std::move(ioBuf_)); socket_->setZeroCopyBuf(std::move(ioBuf_));
} }
} }
} }
...@@ -891,46 +891,45 @@ void AsyncSocket::adjustZeroCopyFlags( ...@@ -891,46 +891,45 @@ void AsyncSocket::adjustZeroCopyFlags(
} }
} }
void AsyncSocket::addZeroCopyBuff(std::unique_ptr<folly::IOBuf>&& buf) { void AsyncSocket::addZeroCopyBuf(std::unique_ptr<folly::IOBuf>&& buf) {
uint32_t id = getNextZeroCopyBuffId(); uint32_t id = getNextZeroCopyBuffId();
folly::IOBuf* ptr = buf.get(); folly::IOBuf* ptr = buf.get();
idZeroCopyBufPtrMap_[id] = ptr; idZeroCopyBufPtrMap_[id] = ptr;
auto& p = idZeroCopyBufPtrToBufMap_[ptr]; auto& p = idZeroCopyBufInfoMap_[ptr];
p.first++; p.count_++;
CHECK(p.second.get() == nullptr); CHECK(p.buf_.get() == nullptr);
p.second = std::move(buf); p.buf_ = std::move(buf);
} }
void AsyncSocket::addZeroCopyBuff(folly::IOBuf* ptr) { void AsyncSocket::addZeroCopyBuf(folly::IOBuf* ptr) {
uint32_t id = getNextZeroCopyBuffId(); uint32_t id = getNextZeroCopyBuffId();
idZeroCopyBufPtrMap_[id] = ptr; idZeroCopyBufPtrMap_[id] = ptr;
idZeroCopyBufPtrToBufMap_[ptr].first++; idZeroCopyBufInfoMap_[ptr].count_++;
} }
void AsyncSocket::releaseZeroCopyBuff(uint32_t id) { void AsyncSocket::releaseZeroCopyBuf(uint32_t id) {
auto iter = idZeroCopyBufPtrMap_.find(id); auto iter = idZeroCopyBufPtrMap_.find(id);
CHECK(iter != idZeroCopyBufPtrMap_.end()); CHECK(iter != idZeroCopyBufPtrMap_.end());
auto ptr = iter->second; auto ptr = iter->second;
auto iter1 = idZeroCopyBufPtrToBufMap_.find(ptr); auto iter1 = idZeroCopyBufInfoMap_.find(ptr);
CHECK(iter1 != idZeroCopyBufPtrToBufMap_.end()); CHECK(iter1 != idZeroCopyBufInfoMap_.end());
if (0 == --iter1->second.first) { if (0 == --iter1->second.count_) {
idZeroCopyBufPtrToBufMap_.erase(iter1); idZeroCopyBufInfoMap_.erase(iter1);
} }
} }
void AsyncSocket::setZeroCopyBuff(std::unique_ptr<folly::IOBuf>&& buf) { void AsyncSocket::setZeroCopyBuf(std::unique_ptr<folly::IOBuf>&& buf) {
folly::IOBuf* ptr = buf.get(); folly::IOBuf* ptr = buf.get();
auto& p = idZeroCopyBufPtrToBufMap_[ptr]; auto& p = idZeroCopyBufInfoMap_[ptr];
CHECK(p.second.get() == nullptr); CHECK(p.buf_.get() == nullptr);
p.second = std::move(buf); p.buf_ = std::move(buf);
} }
bool AsyncSocket::containsZeroCopyBuff(folly::IOBuf* ptr) { bool AsyncSocket::containsZeroCopyBuf(folly::IOBuf* ptr) {
return ( return (idZeroCopyBufInfoMap_.find(ptr) != idZeroCopyBufInfoMap_.end());
idZeroCopyBufPtrToBufMap_.find(ptr) != idZeroCopyBufPtrToBufMap_.end());
} }
bool AsyncSocket::isZeroCopyMsg(const cmsghdr& cmsg) const { bool AsyncSocket::isZeroCopyMsg(const cmsghdr& cmsg) const {
...@@ -953,9 +952,16 @@ void AsyncSocket::processZeroCopyMsg(const cmsghdr& cmsg) { ...@@ -953,9 +952,16 @@ void AsyncSocket::processZeroCopyMsg(const cmsghdr& cmsg) {
reinterpret_cast<const struct sock_extended_err*>(CMSG_DATA(&cmsg)); reinterpret_cast<const struct sock_extended_err*>(CMSG_DATA(&cmsg));
uint32_t hi = serr->ee_data; uint32_t hi = serr->ee_data;
uint32_t lo = serr->ee_info; uint32_t lo = serr->ee_info;
// disable zero copy if the buffer was actually copied
if ((serr->ee_code & SO_EE_CODE_ZEROCOPY_COPIED) && zeroCopyEnabled_) {
VLOG(2) << "AsyncSocket::processZeroCopyMsg(): setting "
<< "zeroCopyEnabled_ = false due to SO_EE_CODE_ZEROCOPY_COPIED "
<< "on " << fd_;
zeroCopyEnabled_ = false;
}
for (uint32_t i = lo; i <= hi; i++) { for (uint32_t i = lo; i <= hi; i++) {
releaseZeroCopyBuff(i); releaseZeroCopyBuf(i);
} }
#endif #endif
} }
...@@ -1052,7 +1058,7 @@ void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec, ...@@ -1052,7 +1058,7 @@ void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
} else if (countWritten == count) { } else if (countWritten == count) {
// done, add the whole buffer // done, add the whole buffer
if (isZeroCopyRequest(flags)) { if (isZeroCopyRequest(flags)) {
addZeroCopyBuff(std::move(ioBuf)); addZeroCopyBuf(std::move(ioBuf));
} }
// We successfully wrote everything. // We successfully wrote everything.
// Invoke the callback and return. // Invoke the callback and return.
...@@ -1063,7 +1069,7 @@ void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec, ...@@ -1063,7 +1069,7 @@ void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
} else { // continue writing the next writeReq } else { // continue writing the next writeReq
// add just the ptr // add just the ptr
if (isZeroCopyRequest(flags)) { if (isZeroCopyRequest(flags)) {
addZeroCopyBuff(ioBuf.get()); addZeroCopyBuf(ioBuf.get());
} }
if (bufferCallback_) { if (bufferCallback_) {
bufferCallback_->onEgressBuffered(); bufferCallback_->onEgressBuffered();
...@@ -1509,6 +1515,11 @@ void AsyncSocket::cachePeerAddress() const { ...@@ -1509,6 +1515,11 @@ void AsyncSocket::cachePeerAddress() const {
} }
} }
bool AsyncSocket::isZeroCopyWriteInProgress() const noexcept {
eventBase_->dcheckIsInEventBaseThread();
return (!idZeroCopyBufPtrMap_.empty());
}
void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const { void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
cacheLocalAddress(); cacheLocalAddress();
*address = localAddr_; *address = localAddr_;
......
...@@ -808,6 +808,12 @@ class AsyncSocket : virtual public AsyncTransportWrapper { ...@@ -808,6 +808,12 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
*/ */
void cacheAddresses(); void cacheAddresses();
/**
* Returns true if there is any zero copy write in progress
* Needs to be called from within the socket's EVB thread
*/
bool isZeroCopyWriteInProgress() const noexcept;
/** /**
* writeReturn is the total number of bytes written, or WRITE_ERROR on error. * writeReturn is the total number of bytes written, or WRITE_ERROR on error.
* If no data has been written, 0 is returned. * If no data has been written, 0 is returned.
...@@ -1157,22 +1163,25 @@ class AsyncSocket : virtual public AsyncTransportWrapper { ...@@ -1157,22 +1163,25 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
const iovec* vec, const iovec* vec,
uint32_t count, uint32_t count,
folly::WriteFlags& flags); folly::WriteFlags& flags);
void addZeroCopyBuff(std::unique_ptr<folly::IOBuf>&& buf); void addZeroCopyBuf(std::unique_ptr<folly::IOBuf>&& buf);
void addZeroCopyBuff(folly::IOBuf* ptr); void addZeroCopyBuf(folly::IOBuf* ptr);
void setZeroCopyBuff(std::unique_ptr<folly::IOBuf>&& buf); void setZeroCopyBuf(std::unique_ptr<folly::IOBuf>&& buf);
bool containsZeroCopyBuff(folly::IOBuf* ptr); bool containsZeroCopyBuf(folly::IOBuf* ptr);
void releaseZeroCopyBuff(uint32_t id); void releaseZeroCopyBuf(uint32_t id);
// a folly::IOBuf can be used in multiple partial requests // a folly::IOBuf can be used in multiple partial requests
// so we keep a map that maps a buffer id to a raw folly::IOBuf ptr // there is a that maps a buffer id to a raw folly::IOBuf ptr
// and one more map that adds a ref count for a folly::IOBuf that is either // and another one that adds a ref count for a folly::IOBuf that is either
// the original ptr or nullptr // the original ptr or nullptr
uint32_t zeroCopyBuffId_{0}; uint32_t zeroCopyBuffId_{0};
struct IOBufInfo {
uint32_t count_{0};
std::unique_ptr<folly::IOBuf> buf_;
};
std::unordered_map<uint32_t, folly::IOBuf*> idZeroCopyBufPtrMap_; std::unordered_map<uint32_t, folly::IOBuf*> idZeroCopyBufPtrMap_;
std::unordered_map< std::unordered_map<folly::IOBuf*, IOBufInfo> idZeroCopyBufInfoMap_;
folly::IOBuf*,
std::pair<uint32_t, std::unique_ptr<folly::IOBuf>>>
idZeroCopyBufPtrToBufMap_;
StateEnum state_; ///< StateEnum describing current state StateEnum state_; ///< StateEnum describing current state
uint8_t shutdownFlags_; ///< Shutdown state (ShutdownFlags) uint8_t shutdownFlags_; ///< Shutdown state (ShutdownFlags)
......
...@@ -35,6 +35,10 @@ ...@@ -35,6 +35,10 @@
#define SO_EE_ORIGIN_ZEROCOPY 5 #define SO_EE_ORIGIN_ZEROCOPY 5
#endif #endif
#ifndef SO_EE_CODE_ZEROCOPY_COPIED
#define SO_EE_CODE_ZEROCOPY_COPIED 1
#endif
#ifndef SO_ZEROCOPY #ifndef SO_ZEROCOPY
#define SO_ZEROCOPY 60 #define SO_ZEROCOPY 60
#endif #endif
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment