Commit d2154674 authored by Giuseppe Ottaviano's avatar Giuseppe Ottaviano Committed by Facebook GitHub Bot

Account for allocated bytes in setEgressBufferBackpressureThreshold()

Summary:
The intention of `setEgressBufferBackpressureThreshold()` is to bound memory usage, but it accounts for write size instead. The two can have very large discrepancy, because Thrift allocates large `IOBuf`s when serializing a message: the minimum allocation size is 16KiB, but in Thrift streaming the messages can be much smaller, which can lead to under-accounting by orders of magnitude.

This diff changes the definition to account for allocated bytes.

Differential Revision: D31927400

fbshipit-source-id: 1172f91648b3b21d7d4076d51ec9f569b0e0ba22
parent dc171587
......@@ -161,9 +161,7 @@ class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
}
void destroy() override {
if (ioBuf_ && releaseIOBufCallback_) {
releaseIOBufCallback_->releaseIOBuf(std::move(ioBuf_));
}
socket_->releaseIOBuf(std::move(ioBuf_), releaseIOBufCallback_);
this->~BytesWriteRequest();
free(this);
}
......@@ -217,9 +215,7 @@ class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
for (uint32_t i = opsWritten_; i != 0; --i) {
assert(ioBuf_);
auto next = ioBuf_->pop();
if (releaseIOBufCallback_) {
releaseIOBufCallback_->releaseIOBuf(std::move(ioBuf_));
}
socket_->releaseIOBuf(std::move(ioBuf_), releaseIOBufCallback_);
ioBuf_ = std::move(next);
}
}
......@@ -650,6 +646,7 @@ AsyncSocket::~AsyncSocket() {
for (const auto& cb : lifecycleObservers_) {
cb->destroy(this);
}
DCHECK_EQ(allocatedBytesBuffered_, 0);
}
void AsyncSocket::destroy() {
......@@ -1280,9 +1277,7 @@ void AsyncSocket::releaseZeroCopyBuf(uint32_t id) {
auto iter1 = idZeroCopyBufInfoMap_.find(ptr);
CHECK(iter1 != idZeroCopyBufInfoMap_.end());
if (0 == --iter1->second.count_) {
if (iter1->second.cb_) {
iter1->second.cb_->releaseIOBuf(std::move(iter1->second.buf_));
}
releaseIOBuf(std::move(iter1->second.buf_), iter1->second.cb_);
idZeroCopyBufInfoMap_.erase(iter1);
}
......@@ -1339,6 +1334,19 @@ void AsyncSocket::processZeroCopyMsg(const cmsghdr& cmsg) {
#endif
}
void AsyncSocket::releaseIOBuf(
std::unique_ptr<folly::IOBuf> buf, ReleaseIOBufCallback* callback) {
if (!buf) {
return;
}
const size_t allocated = buf->computeChainCapacity();
DCHECK_GE(allocatedBytesBuffered_, allocated);
allocatedBytesBuffered_ -= allocated;
if (callback) {
callback->releaseIOBuf(std::move(buf));
}
}
void AsyncSocket::enableByteEvents() {
if (!byteEventHelper_) {
byteEventHelper_ = std::make_unique<ByteEventHelper>();
......@@ -1517,13 +1525,12 @@ void AsyncSocket::writeImpl(
auto* releaseIOBufCallback =
callback ? callback->getReleaseIOBufCallback() : nullptr;
SCOPE_EXIT {
if (ioBuf && releaseIOBufCallback) {
releaseIOBufCallback->releaseIOBuf(std::move(ioBuf));
}
};
SCOPE_EXIT { releaseIOBuf(std::move(ioBuf), releaseIOBufCallback); };
totalAppBytesScheduledForWrite_ += totalBytes;
if (ioBuf) {
allocatedBytesBuffered_ += ioBuf->computeChainCapacity();
}
if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) {
// No new writes may be performed after the write side of the socket has
......@@ -1567,11 +1574,7 @@ void AsyncSocket::writeImpl(
if (countWritten && isZeroCopyRequest(flags)) {
addZeroCopyBuf(std::move(ioBuf), releaseIOBufCallback);
} else {
if (releaseIOBufCallback) {
releaseIOBufCallback->releaseIOBuf(std::move(ioBuf));
} else {
ioBuf.reset();
}
releaseIOBuf(std::move(ioBuf), releaseIOBufCallback);
}
// We successfully wrote everything.
......
......@@ -734,6 +734,10 @@ class AsyncSocket : public AsyncTransport {
}
size_t getRawBytesBuffered() const override { return getAppBytesBuffered(); }
size_t getAllocatedBytesBuffered() const override {
return allocatedBytesBuffered_;
}
// End of methods inherited from AsyncTransport
std::chrono::nanoseconds getConnectTime() const {
......@@ -1510,6 +1514,9 @@ class AsyncSocket : public AsyncTransport {
bool containsZeroCopyBuf(folly::IOBuf* ptr);
void releaseZeroCopyBuf(uint32_t id);
void releaseIOBuf(
std::unique_ptr<folly::IOBuf> buf, ReleaseIOBufCallback* callback);
/**
* Attempt to enable Observer ByteEvents for this socket.
*
......@@ -1581,6 +1588,8 @@ class AsyncSocket : public AsyncTransport {
// The total num of bytes passed to AsyncSocket's write functions. It doesn't
// include failed writes, but it does include buffered writes.
size_t totalAppBytesScheduledForWrite_;
// Num of bytes allocated in IOBufs pending write.
size_t allocatedBytesBuffered_{0};
// Lifecycle observers.
//
......
......@@ -741,6 +741,7 @@ class AsyncTransport : public DelayedDestruction,
*/
virtual size_t getAppBytesBuffered() const { return 0; }
virtual size_t getRawBytesBuffered() const { return 0; }
virtual size_t getAllocatedBytesBuffered() const { return 0; }
/**
* Callback class to signal changes in the transport's internal buffers.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment