Commit 2d6a8fcb authored by Dan Melnic, committed by Facebook Github Bot

Folly zerocopy improvements

Summary: Folly zerocopy improvements

Reviewed By: djwatson

Differential Revision: D10324653

fbshipit-source-id: dacb2941e08eaf2e7588c04a01800e297a0643cf
parent a242bf16
...@@ -898,12 +898,25 @@ bool AsyncSocket::setZeroCopy(bool enable) { ...@@ -898,12 +898,25 @@ bool AsyncSocket::setZeroCopy(bool enable) {
return false; return false;
} }
// Sets the zerocopy re-enable threshold.
//
// When a write fails with ENOBUFS while zerocopy is enabled, performWrite()
// disables zerocopy and loads zeroCopyReenableCounter_ with this value.
// adjustZeroCopyFlags() then decrements that counter on each call and turns
// zerocopy back on once it reaches 0. With a threshold of 0 the counter
// starts at 0, so that path never re-enables zerocopy.
void AsyncSocket::setZeroCopyReenableThreshold(size_t threshold) {
zeroCopyReenableThreshold_ = threshold;
}
bool AsyncSocket::isZeroCopyRequest(WriteFlags flags) { bool AsyncSocket::isZeroCopyRequest(WriteFlags flags) {
return (zeroCopyEnabled_ && isSet(flags, WriteFlags::WRITE_MSG_ZEROCOPY)); return (zeroCopyEnabled_ && isSet(flags, WriteFlags::WRITE_MSG_ZEROCOPY));
} }
void AsyncSocket::adjustZeroCopyFlags(folly::WriteFlags& flags) { void AsyncSocket::adjustZeroCopyFlags(folly::WriteFlags& flags) {
if (!zeroCopyEnabled_) { if (!zeroCopyEnabled_) {
// if the zeroCopyReenableCounter_ is > 0
// we try to dec and if we reach 0
// we set zeroCopyEnabled_ to true
if (zeroCopyReenableCounter_) {
if (0 == --zeroCopyReenableCounter_) {
zeroCopyEnabled_ = true;
return;
}
}
flags = unSet(flags, folly::WriteFlags::WRITE_MSG_ZEROCOPY); flags = unSet(flags, folly::WriteFlags::WRITE_MSG_ZEROCOPY);
} }
} }
...@@ -2402,6 +2415,15 @@ AsyncSocket::WriteResult AsyncSocket::performWrite( ...@@ -2402,6 +2415,15 @@ AsyncSocket::WriteResult AsyncSocket::performWrite(
auto writeResult = sendSocketMessage(fd_, &msg, msg_flags); auto writeResult = sendSocketMessage(fd_, &msg, msg_flags);
auto totalWritten = writeResult.writeReturn; auto totalWritten = writeResult.writeReturn;
if (totalWritten < 0 && zeroCopyEnabled_ && errno == ENOBUFS) {
// workaround for running with zerocopy enabled but without a big enough
// memlock value - see ulimit -l
zeroCopyEnabled_ = false;
zeroCopyReenableCounter_ = zeroCopyReenableThreshold_;
msg_flags = sendMsgParamCallback_->getFlags(flags, zeroCopyEnabled_);
writeResult = sendSocketMessage(fd_, &msg, msg_flags);
totalWritten = writeResult.writeReturn;
}
if (totalWritten < 0) { if (totalWritten < 0) {
bool tryAgain = (errno == EAGAIN); bool tryAgain = (errno == EAGAIN);
#ifdef __APPLE__ #ifdef __APPLE__
...@@ -2412,13 +2434,6 @@ AsyncSocket::WriteResult AsyncSocket::performWrite( ...@@ -2412,13 +2434,6 @@ AsyncSocket::WriteResult AsyncSocket::performWrite(
tryAgain |= (errno == ENOTCONN); tryAgain |= (errno == ENOTCONN);
#endif #endif
// workaround for running with zerocopy enabled but without a proper
// memlock value - see ulimit -l
if (zeroCopyEnabled_ && (errno == ENOBUFS)) {
tryAgain = true;
zeroCopyEnabled_ = false;
}
if (!writeResult.exception && tryAgain) { if (!writeResult.exception && tryAgain) {
// TCP buffer is full; we can't write any more data right now. // TCP buffer is full; we can't write any more data right now.
*countWritten = 0; *countWritten = 0;
......
...@@ -509,6 +509,12 @@ class AsyncSocket : virtual public AsyncTransportWrapper { ...@@ -509,6 +509,12 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
return zeroCopyBufId_; return zeroCopyBufId_;
} }
// Returns the current zerocopy re-enable threshold
// (zeroCopyReenableThreshold_, which is initialized to 0).
size_t getZeroCopyReenableThreshold() const {
return zeroCopyReenableThreshold_;
}
// Stores `threshold` as the value copied into the re-enable counter when
// zerocopy is switched off after an ENOBUFS write failure.
void setZeroCopyReenableThreshold(size_t threshold);
void write( void write(
WriteCallback* callback, WriteCallback* callback,
const void* buf, const void* buf,
...@@ -1268,6 +1274,9 @@ class AsyncSocket : virtual public AsyncTransportWrapper { ...@@ -1268,6 +1274,9 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
bool trackEor_{false}; bool trackEor_{false};
bool zeroCopyEnabled_{false}; bool zeroCopyEnabled_{false};
bool zeroCopyVal_{false}; bool zeroCopyVal_{false};
// zerocopy reenable logic
size_t zeroCopyReenableThreshold_{0};
size_t zeroCopyReenableCounter_{0};
// subclasses may cache these on first call to get // subclasses may cache these on first call to get
mutable std::unique_ptr<const AsyncTransportCertificate> peerCertData_{ mutable std::unique_ptr<const AsyncTransportCertificate> peerCertData_{
......
...@@ -19,14 +19,24 @@ ...@@ -19,14 +19,24 @@
namespace folly { namespace folly {
// ZeroCopyTest // ZeroCopyTest
ZeroCopyTest::ZeroCopyTest(int numLoops, bool zeroCopy, size_t bufferSize) ZeroCopyTest::ZeroCopyTest(
: numLoops_(numLoops), size_t numClients,
int numLoops,
bool zeroCopy,
size_t bufferSize)
: numClients_(numClients),
counter_(numClients),
numLoops_(numLoops),
zeroCopy_(zeroCopy), zeroCopy_(zeroCopy),
bufferSize_(bufferSize), bufferSize_(bufferSize),
client_(
new ZeroCopyTestAsyncSocket(&evb_, numLoops_, bufferSize_, zeroCopy)),
listenSock_(new folly::AsyncServerSocket(&evb_)), listenSock_(new folly::AsyncServerSocket(&evb_)),
server_(&evb_, numLoops_, bufferSize_, zeroCopy) { server_(&evb_, numLoops_, bufferSize_, zeroCopy) {
clients_.reserve(numClients_);
for (size_t i = 0; i < numClients_; i++) {
clients_.emplace_back(std::make_unique<ZeroCopyTestAsyncSocket>(
&counter_, &evb_, numLoops_, bufferSize_, zeroCopy));
}
if (listenSock_) { if (listenSock_) {
server_.addCallbackToServerSocket(*listenSock_); server_.addCallbackToServerSocket(*listenSock_);
} }
...@@ -40,13 +50,19 @@ bool ZeroCopyTest::run() { ...@@ -40,13 +50,19 @@ bool ZeroCopyTest::run() {
listenSock_->listen(10); listenSock_->listen(10);
listenSock_->startAccepting(); listenSock_->startAccepting();
connectOne(); connectAll();
} }
}); });
evb_.loopForever(); evb_.loopForever();
return !client_->isZeroCopyWriteInProgress(); for (auto& client : clients_) {
if (client->isZeroCopyWriteInProgress()) {
return false;
}
}
return true;
} }
} // namespace folly } // namespace folly
...@@ -28,11 +28,13 @@ namespace folly { ...@@ -28,11 +28,13 @@ namespace folly {
class ZeroCopyTestAsyncSocket { class ZeroCopyTestAsyncSocket {
public: public:
explicit ZeroCopyTestAsyncSocket( explicit ZeroCopyTestAsyncSocket(
size_t* counter,
folly::EventBase* evb, folly::EventBase* evb,
int numLoops, int numLoops,
size_t bufferSize, size_t bufferSize,
bool zeroCopy) bool zeroCopy)
: evb_(evb), : counter_(counter),
evb_(evb),
numLoops_(numLoops), numLoops_(numLoops),
sock_(new folly::AsyncSocket(evb)), sock_(new folly::AsyncSocket(evb)),
callback_(this), callback_(this),
...@@ -42,12 +44,14 @@ class ZeroCopyTestAsyncSocket { ...@@ -42,12 +44,14 @@ class ZeroCopyTestAsyncSocket {
} }
explicit ZeroCopyTestAsyncSocket( explicit ZeroCopyTestAsyncSocket(
size_t* counter,
folly::EventBase* evb, folly::EventBase* evb,
int fd, int fd,
int numLoops, int numLoops,
size_t bufferSize, size_t bufferSize,
bool zeroCopy) bool zeroCopy)
: evb_(evb), : counter_(counter),
evb_(evb),
numLoops_(numLoops), numLoops_(numLoops),
sock_(new folly::AsyncSocket(evb, fd)), sock_(new folly::AsyncSocket(evb, fd)),
callback_(this), callback_(this),
...@@ -152,7 +156,12 @@ class ZeroCopyTestAsyncSocket { ...@@ -152,7 +156,12 @@ class ZeroCopyTestAsyncSocket {
currLoop_++; currLoop_++;
if (client_ && currLoop_ >= numLoops_) { if (client_ && currLoop_ >= numLoops_) {
evb_->runInLoop( evb_->runInLoop(
[this] { evb_->terminateLoopSoon(); }, false /*thisIteration*/); [this] {
if (counter_ && (0 == --(*counter_))) {
evb_->terminateLoopSoon();
}
},
false /*thisIteration*/);
return; return;
} }
writeBuffer(); writeBuffer();
...@@ -160,9 +169,11 @@ class ZeroCopyTestAsyncSocket { ...@@ -160,9 +169,11 @@ class ZeroCopyTestAsyncSocket {
void onDataFinish(folly::exception_wrapper) { void onDataFinish(folly::exception_wrapper) {
if (client_) { if (client_) {
if (counter_ && (0 == --(*counter_))) {
evb_->terminateLoopSoon(); evb_->terminateLoopSoon();
} }
} }
}
bool writeBuffer() { bool writeBuffer() {
// use calloc to make sure the memory is touched // use calloc to make sure the memory is touched
...@@ -182,6 +193,7 @@ class ZeroCopyTestAsyncSocket { ...@@ -182,6 +193,7 @@ class ZeroCopyTestAsyncSocket {
return true; return true;
} }
size_t* counter_{nullptr};
folly::EventBase* evb_; folly::EventBase* evb_;
int numLoops_{0}; int numLoops_{0};
int currLoop_{0}; int currLoop_{0};
...@@ -218,7 +230,7 @@ class ZeroCopyTestServer : public folly::AsyncServerSocket::AcceptCallback { ...@@ -218,7 +230,7 @@ class ZeroCopyTestServer : public folly::AsyncServerSocket::AcceptCallback {
int fd, int fd,
const folly::SocketAddress& /* unused */) noexcept override { const folly::SocketAddress& /* unused */) noexcept override {
auto client = std::make_shared<ZeroCopyTestAsyncSocket>( auto client = std::make_shared<ZeroCopyTestAsyncSocket>(
evb_, fd, numLoops_, bufferSize_, zeroCopy_); nullptr, evb_, fd, numLoops_, bufferSize_, zeroCopy_);
clients_[client.get()] = client; clients_[client.get()] = client;
} }
...@@ -238,21 +250,29 @@ class ZeroCopyTestServer : public folly::AsyncServerSocket::AcceptCallback { ...@@ -238,21 +250,29 @@ class ZeroCopyTestServer : public folly::AsyncServerSocket::AcceptCallback {
class ZeroCopyTest { class ZeroCopyTest {
public: public:
explicit ZeroCopyTest(int numLoops, bool zeroCopy, size_t bufferSize); explicit ZeroCopyTest(
size_t numClients,
int numLoops,
bool zeroCopy,
size_t bufferSize);
bool run(); bool run();
private: private:
void connectOne() { void connectAll() {
SocketAddress addr = listenSock_->getAddress(); SocketAddress addr = listenSock_->getAddress();
client_->connect(addr); for (auto& client : clients_) {
client->connect(addr);
}
} }
size_t numClients_;
size_t counter_;
int numLoops_; int numLoops_;
bool zeroCopy_; bool zeroCopy_;
size_t bufferSize_; size_t bufferSize_;
EventBase evb_; EventBase evb_;
std::unique_ptr<ZeroCopyTestAsyncSocket> client_; std::vector<std::unique_ptr<ZeroCopyTestAsyncSocket>> clients_;
folly::AsyncServerSocket::UniquePtr listenSock_; folly::AsyncServerSocket::UniquePtr listenSock_;
ZeroCopyTestServer server_; ZeroCopyTestServer server_;
}; };
......
...@@ -30,9 +30,10 @@ void runClient( ...@@ -30,9 +30,10 @@ void runClient(
<< " numLoops = " << numLoops << " zeroCopy = " << zeroCopy << " numLoops = " << numLoops << " zeroCopy = " << zeroCopy
<< " bufferSize = " << bufferSize; << " bufferSize = " << bufferSize;
size_t counter = 1;
EventBase evb; EventBase evb;
std::unique_ptr<ZeroCopyTestAsyncSocket> client( std::unique_ptr<ZeroCopyTestAsyncSocket> client(new ZeroCopyTestAsyncSocket(
new ZeroCopyTestAsyncSocket(&evb, numLoops, bufferSize, zeroCopy)); &counter, &evb, numLoops, bufferSize, zeroCopy));
SocketAddress addr(host, port); SocketAddress addr(host, port);
evb.runInEventBaseThread([&]() { client->connect(addr); }); evb.runInEventBaseThread([&]() { client->connect(addr); });
...@@ -63,14 +64,30 @@ void runServer(uint16_t port, int numLoops, bool zeroCopy, size_t bufferSize) { ...@@ -63,14 +64,30 @@ void runServer(uint16_t port, int numLoops, bool zeroCopy, size_t bufferSize) {
static auto constexpr kMaxLoops = 20000; static auto constexpr kMaxLoops = 20000;
void zeroCopyOn(unsigned /* unused */, size_t bufferSize) { void zeroCopyOn(unsigned iters, size_t bufferSize, size_t numClients = 1) {
ZeroCopyTest test(kMaxLoops, true, bufferSize); BenchmarkSuspender susp;
ZeroCopyTest test(numClients, iters, true, bufferSize);
susp.dismiss();
test.run(); test.run();
susp.rehire();
} }
void zeroCopyOff(unsigned /* unused */, size_t bufferSize) { void zeroCopyOff(unsigned iters, size_t bufferSize, size_t numClients = 1) {
ZeroCopyTest test(kMaxLoops, false, bufferSize); BenchmarkSuspender susp;
ZeroCopyTest test(numClients, iters, false, bufferSize);
susp.dismiss();
test.run(); test.run();
susp.rehire();
}
static auto constexpr kNumClients = 40;
// Benchmark entry point: same as zeroCopyOn, but passes kNumClients as the
// number of client sockets instead of relying on the default of 1.
void zeroCopyOnMulti(unsigned iters, size_t bufferSize) {
zeroCopyOn(iters, bufferSize, kNumClients);
}
void zeroCopyOffMulti(unsigned iters, size_t bufferSize) {
zeroCopyOff(iters, bufferSize, kNumClients);
} }
BENCHMARK_PARAM(zeroCopyOn, 4096) BENCHMARK_PARAM(zeroCopyOn, 4096)
...@@ -100,6 +117,9 @@ BENCHMARK_DRAW_LINE(); ...@@ -100,6 +117,9 @@ BENCHMARK_DRAW_LINE();
BENCHMARK_PARAM(zeroCopyOn, 1048576) BENCHMARK_PARAM(zeroCopyOn, 1048576)
BENCHMARK_PARAM(zeroCopyOff, 1048576) BENCHMARK_PARAM(zeroCopyOff, 1048576)
BENCHMARK_DRAW_LINE(); BENCHMARK_DRAW_LINE();
BENCHMARK_PARAM(zeroCopyOnMulti, 1048576)
BENCHMARK_PARAM(zeroCopyOffMulti, 1048576)
BENCHMARK_DRAW_LINE();
DEFINE_bool(client, false, "client mode"); DEFINE_bool(client, false, "client mode");
DEFINE_bool(server, false, "server mode"); DEFINE_bool(server, false, "server mode");
......
...@@ -24,6 +24,6 @@ static auto constexpr kMaxLoops = 20; ...@@ -24,6 +24,6 @@ static auto constexpr kMaxLoops = 20;
static auto constexpr kBufferSize = 4096; static auto constexpr kBufferSize = 4096;
TEST(ZeroCopyTest, zero_copy_in_progress) { TEST(ZeroCopyTest, zero_copy_in_progress) {
ZeroCopyTest test(kMaxLoops, true, kBufferSize); ZeroCopyTest test(1, kMaxLoops, true, kBufferSize);
CHECK(test.run()); CHECK(test.run());
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment