Commit 4a46ffa2 authored by Yang Chi's avatar Yang Chi Committed by facebook-github-bot-1

Add a per-socket buffer callback

Summary: this is way simpler than D2623385 + D2709121. There will be a followup diff to clean the existing per-write call BufferCallback. The new one is on per-socket basis, much straightforward. I will only setup this in HTTPUpstreamSession.

Reviewed By: afrind

Differential Revision: D2723493

fb-gh-sync-id: 6b1c21a719281b9693330b6a4074f7149d7c342a
parent 76fcf389
......@@ -688,7 +688,11 @@ void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
callback->writeSuccess();
}
return;
} // else { continue writing the next writeReq }
} else { // continue writing the next writeReq
if (bufferCallback_) {
bufferCallback_->onEgressBuffered();
}
}
mustRegister = true;
}
} else if (!connecting()) {
......@@ -1505,6 +1509,9 @@ void AsyncSocket::handleWrite() noexcept {
// We'll continue around the loop, trying to write another request
} else {
// Partial write.
if (bufferCallback_) {
bufferCallback_->onEgressBuffered();
}
writeReqHead_->consume();
// Stop after a partial write; it's highly likely that a subsequent write
// attempt will just return EAGAIN.
......@@ -1528,6 +1535,9 @@ void AsyncSocket::handleWrite() noexcept {
return;
}
}
if (!writeReqHead_ && bufferCallback_) {
bufferCallback_->onEgressBufferCleared();
}
}
void AsyncSocket::checkForImmediateRead() noexcept {
......@@ -2049,4 +2059,8 @@ std::string AsyncSocket::withAddr(const std::string& s) {
return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
}
void AsyncSocket::setBufferCallback(BufferCallback* cb) {
bufferCallback_ = cb;
}
} // folly
......@@ -510,6 +510,8 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
ERROR
};
void setBufferCallback(BufferCallback* cb);
/**
* A WriteRequest object tracks information about a pending write operation.
*/
......@@ -813,6 +815,8 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
bool persistentCork_{false};
// Whether we've applied the TCP_CORK option to the socket
bool corked_{false};
BufferCallback* bufferCallback_{nullptr};
};
......
......@@ -332,6 +332,13 @@ class AsyncTransport : public DelayedDestruction, public AsyncSocketBase {
virtual size_t getAppBytesReceived() const = 0;
virtual size_t getRawBytesReceived() const = 0;
class BufferCallback {
public:
virtual ~BufferCallback() {}
virtual void onEgressBuffered() = 0;
virtual void onEgressBufferCleared() = 0;
};
protected:
virtual ~AsyncTransport() = default;
};
......
......@@ -185,6 +185,23 @@ class ReadCallback : public AsyncTransportWrapper::ReadCallback {
const size_t maxBufferSz;
};
class BufferCallback : public AsyncTransport::BufferCallback {
public:
BufferCallback() : buffered_(false), bufferCleared_(false) {}
void onEgressBuffered() override { buffered_ = true; }
void onEgressBufferCleared() override { bufferCleared_ = true; }
bool hasBuffered() const { return buffered_; }
bool hasBufferCleared() const { return bufferCleared_; }
private:
bool buffered_{false};
bool bufferCleared_{false};
};
class ReadVerifier {
};
......
......@@ -2238,3 +2238,36 @@ TEST(AsyncSocketTest, NumPendingMessagesInQueue) {
eventBase.loop();
}
/**
* Test AsyncTransport::BufferCallback
*/
TEST(AsyncSocketTest, BufferTest) {
TestServer server;
EventBase evb;
AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
ConnCallback ccb;
socket->connect(&ccb, server.getAddress(), 30, option);
char buf[100 * 1024];
memset(buf, 'c', sizeof(buf));
WriteCallback wcb;
BufferCallback bcb;
socket->setBufferCallback(&bcb);
socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
evb.loop();
CHECK_EQ(ccb.state, STATE_SUCCEEDED);
CHECK_EQ(wcb.state, STATE_SUCCEEDED);
ASSERT_TRUE(bcb.hasBuffered());
ASSERT_TRUE(bcb.hasBufferCleared());
socket->close();
server.verifyConnection(buf, sizeof(buf));
ASSERT_TRUE(socket->isClosedBySelf());
ASSERT_FALSE(socket->isClosedByPeer());
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment