Commit fbc4c238 authored by Dan Melnic's avatar Dan Melnic Committed by Facebook Github Bot

Add SO_ZEROCOPY support

Summary: Add SO_ZEROCOPY support

Reviewed By: djwatson

Differential Revision: D5851637

fbshipit-source-id: 5378b7e44ce9d888ae08527506218998974d4309
parent 662b86a9
...@@ -1646,7 +1646,8 @@ int AsyncSSLSocket::bioWrite(BIO* b, const char* in, int inl) { ...@@ -1646,7 +1646,8 @@ int AsyncSSLSocket::bioWrite(BIO* b, const char* in, int inl) {
flags |= WriteFlags::CORK; flags |= WriteFlags::CORK;
} }
int msg_flags = tsslSock->getSendMsgParamsCB()->getFlags(flags); int msg_flags = tsslSock->getSendMsgParamsCB()->getFlags(
flags, false /*zeroCopyEnabled*/);
msg.msg_controllen = msg.msg_controllen =
tsslSock->getSendMsgParamsCB()->getAncillaryDataSize(flags); tsslSock->getSendMsgParamsCB()->getAncillaryDataSize(flags);
CHECK_GE(AsyncSocket::SendMsgParamsCallback::maxAncillaryDataSize, CHECK_GE(AsyncSocket::SendMsgParamsCallback::maxAncillaryDataSize,
......
...@@ -39,6 +39,13 @@ namespace fsp = folly::portability::sockets; ...@@ -39,6 +39,13 @@ namespace fsp = folly::portability::sockets;
namespace folly { namespace folly {
static constexpr bool msgErrQueueSupported =
#ifdef MSG_ERRQUEUE
true;
#else
false;
#endif // MSG_ERRQUEUE
const uint32_t AsyncServerSocket::kDefaultMaxAcceptAtOnce; const uint32_t AsyncServerSocket::kDefaultMaxAcceptAtOnce;
const uint32_t AsyncServerSocket::kDefaultCallbackAcceptAtOnce; const uint32_t AsyncServerSocket::kDefaultCallbackAcceptAtOnce;
const uint32_t AsyncServerSocket::kDefaultMaxMessagesInQueue; const uint32_t AsyncServerSocket::kDefaultMaxMessagesInQueue;
...@@ -331,6 +338,18 @@ void AsyncServerSocket::bindSocket( ...@@ -331,6 +338,18 @@ void AsyncServerSocket::bindSocket(
} }
} }
bool AsyncServerSocket::setZeroCopy(bool enable) {
if (msgErrQueueSupported) {
int fd = getSocket();
int val = enable ? 1 : 0;
int ret = setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &val, sizeof(val));
return (0 == ret);
}
return false;
}
void AsyncServerSocket::bind(const SocketAddress& address) { void AsyncServerSocket::bind(const SocketAddress& address) {
if (eventBase_) { if (eventBase_) {
eventBase_->dcheckIsInEventBaseThread(); eventBase_->dcheckIsInEventBaseThread();
......
...@@ -319,6 +319,11 @@ class AsyncServerSocket : public DelayedDestruction ...@@ -319,6 +319,11 @@ class AsyncServerSocket : public DelayedDestruction
} }
} }
/* enable zerocopy support for the server sockets - the s = accept sockets
* inherit it
*/
bool setZeroCopy(bool enable);
/** /**
* Bind to the specified address. * Bind to the specified address.
* *
......
...@@ -104,9 +104,28 @@ class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest { ...@@ -104,9 +104,28 @@ class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
if (getNext() != nullptr) { if (getNext() != nullptr) {
writeFlags |= WriteFlags::CORK; writeFlags |= WriteFlags::CORK;
} }
socket_->adjustZeroCopyFlags(getOps(), getOpCount(), writeFlags);
auto writeResult = socket_->performWrite( auto writeResult = socket_->performWrite(
getOps(), getOpCount(), writeFlags, &opsWritten_, &partialBytes_); getOps(), getOpCount(), writeFlags, &opsWritten_, &partialBytes_);
bytesWritten_ = writeResult.writeReturn > 0 ? writeResult.writeReturn : 0; bytesWritten_ = writeResult.writeReturn > 0 ? writeResult.writeReturn : 0;
if (bytesWritten_) {
if (socket_->isZeroCopyRequest(writeFlags)) {
if (isComplete()) {
socket_->addZeroCopyBuff(std::move(ioBuf_));
} else {
socket_->addZeroCopyBuff(ioBuf_.get());
}
} else {
// this happens if at least one of the prev requests were sent
// with zero copy but not the last one
if (isComplete() && socket_->getZeroCopy() &&
socket_->containsZeroCopyBuff(ioBuf_.get())) {
socket_->setZeroCopyBuff(std::move(ioBuf_));
}
}
}
return writeResult; return writeResult;
} }
...@@ -119,11 +138,13 @@ class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest { ...@@ -119,11 +138,13 @@ class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
opIndex_ += opsWritten_; opIndex_ += opsWritten_;
assert(opIndex_ < opCount_); assert(opIndex_ < opCount_);
// If we've finished writing any IOBufs, release them if (!socket_->isZeroCopyRequest(flags_)) {
if (ioBuf_) { // If we've finished writing any IOBufs, release them
for (uint32_t i = opsWritten_; i != 0; --i) { if (ioBuf_) {
assert(ioBuf_); for (uint32_t i = opsWritten_; i != 0; --i) {
ioBuf_ = ioBuf_->pop(); assert(ioBuf_);
ioBuf_ = ioBuf_->pop();
}
} }
} }
...@@ -185,8 +206,9 @@ class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest { ...@@ -185,8 +206,9 @@ class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
struct iovec writeOps_[]; ///< write operation(s) list struct iovec writeOps_[]; ///< write operation(s) list
}; };
int AsyncSocket::SendMsgParamsCallback::getDefaultFlags(folly::WriteFlags flags) int AsyncSocket::SendMsgParamsCallback::getDefaultFlags(
noexcept { folly::WriteFlags flags,
bool zeroCopyEnabled) noexcept {
int msg_flags = MSG_DONTWAIT; int msg_flags = MSG_DONTWAIT;
#ifdef MSG_NOSIGNAL // Linux-only #ifdef MSG_NOSIGNAL // Linux-only
...@@ -205,6 +227,10 @@ int AsyncSocket::SendMsgParamsCallback::getDefaultFlags(folly::WriteFlags flags) ...@@ -205,6 +227,10 @@ int AsyncSocket::SendMsgParamsCallback::getDefaultFlags(folly::WriteFlags flags)
msg_flags |= MSG_EOR; msg_flags |= MSG_EOR;
} }
if (zeroCopyEnabled && isSet(flags, WriteFlags::WRITE_MSG_ZEROCOPY)) {
msg_flags |= MSG_ZEROCOPY;
}
return msg_flags; return msg_flags;
} }
...@@ -433,8 +459,11 @@ void AsyncSocket::connect(ConnectCallback* callback, ...@@ -433,8 +459,11 @@ void AsyncSocket::connect(ConnectCallback* callback,
// By default, turn on TCP_NODELAY // By default, turn on TCP_NODELAY
// If setNoDelay() fails, we continue anyway; this isn't a fatal error. // If setNoDelay() fails, we continue anyway; this isn't a fatal error.
// setNoDelay() will log an error message if it fails. // setNoDelay() will log an error message if it fails.
// Also set the cached zeroCopyVal_ since it cannot be set earlier if the fd
// is not created
if (address.getFamily() != AF_UNIX) { if (address.getFamily() != AF_UNIX) {
(void)setNoDelay(true); (void)setNoDelay(true);
setZeroCopy(zeroCopyVal_);
} }
VLOG(5) << "AsyncSocket::connect(this=" << this << ", evb=" << eventBase_ VLOG(5) << "AsyncSocket::connect(this=" << this << ", evb=" << eventBase_
...@@ -772,6 +801,156 @@ AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const { ...@@ -772,6 +801,156 @@ AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const {
return readCallback_; return readCallback_;
} }
bool AsyncSocket::setZeroCopy(bool enable) {
if (msgErrQueueSupported) {
zeroCopyVal_ = enable;
if (fd_ < 0) {
return false;
}
int val = enable ? 1 : 0;
int ret = setsockopt(fd_, SOL_SOCKET, SO_ZEROCOPY, &val, sizeof(val));
// if enable == false, set zeroCopyEnabled_ = false regardless
// if SO_ZEROCOPY is set or not
if (!enable) {
zeroCopyEnabled_ = enable;
return true;
}
/* if the setsockopt failed, try to see if the socket inherited the flag
* since we cannot set SO_ZEROCOPY on a socket s = accept
*/
if (ret) {
val = 0;
socklen_t optlen = sizeof(val);
ret = getsockopt(fd_, SOL_SOCKET, SO_ZEROCOPY, &val, &optlen);
if (!ret) {
enable = val ? true : false;
}
}
if (!ret) {
zeroCopyEnabled_ = enable;
return true;
}
}
return false;
}
void AsyncSocket::setZeroCopyWriteChainThreshold(size_t threshold) {
zeroCopyWriteChainThreshold_ = threshold;
}
bool AsyncSocket::isZeroCopyRequest(WriteFlags flags) {
return (zeroCopyEnabled_ && isSet(flags, WriteFlags::WRITE_MSG_ZEROCOPY));
}
void AsyncSocket::adjustZeroCopyFlags(
folly::IOBuf* buf,
folly::WriteFlags& flags) {
if (zeroCopyEnabled_ && zeroCopyWriteChainThreshold_ && buf) {
if (buf->computeChainDataLength() >= zeroCopyWriteChainThreshold_) {
flags |= folly::WriteFlags::WRITE_MSG_ZEROCOPY;
} else {
flags = unSet(flags, folly::WriteFlags::WRITE_MSG_ZEROCOPY);
}
}
}
void AsyncSocket::adjustZeroCopyFlags(
const iovec* vec,
uint32_t count,
folly::WriteFlags& flags) {
if (zeroCopyEnabled_ && zeroCopyWriteChainThreshold_) {
count = std::min<uint32_t>(count, kIovMax);
size_t sum = 0;
for (uint32_t i = 0; i < count; ++i) {
const iovec* v = vec + i;
sum += v->iov_len;
}
if (sum >= zeroCopyWriteChainThreshold_) {
flags |= folly::WriteFlags::WRITE_MSG_ZEROCOPY;
} else {
flags = unSet(flags, folly::WriteFlags::WRITE_MSG_ZEROCOPY);
}
}
}
void AsyncSocket::addZeroCopyBuff(std::unique_ptr<folly::IOBuf>&& buf) {
uint32_t id = getNextZeroCopyBuffId();
folly::IOBuf* ptr = buf.get();
idZeroCopyBufPtrMap_[id] = ptr;
auto& p = idZeroCopyBufPtrToBufMap_[ptr];
p.first++;
CHECK(p.second.get() == nullptr);
p.second = std::move(buf);
}
void AsyncSocket::addZeroCopyBuff(folly::IOBuf* ptr) {
uint32_t id = getNextZeroCopyBuffId();
idZeroCopyBufPtrMap_[id] = ptr;
idZeroCopyBufPtrToBufMap_[ptr].first++;
}
void AsyncSocket::releaseZeroCopyBuff(uint32_t id) {
auto iter = idZeroCopyBufPtrMap_.find(id);
CHECK(iter != idZeroCopyBufPtrMap_.end());
auto ptr = iter->second;
auto iter1 = idZeroCopyBufPtrToBufMap_.find(ptr);
CHECK(iter1 != idZeroCopyBufPtrToBufMap_.end());
if (0 == --iter1->second.first) {
idZeroCopyBufPtrToBufMap_.erase(iter1);
}
}
void AsyncSocket::setZeroCopyBuff(std::unique_ptr<folly::IOBuf>&& buf) {
folly::IOBuf* ptr = buf.get();
auto& p = idZeroCopyBufPtrToBufMap_[ptr];
CHECK(p.second.get() == nullptr);
p.second = std::move(buf);
}
bool AsyncSocket::containsZeroCopyBuff(folly::IOBuf* ptr) {
return (
idZeroCopyBufPtrToBufMap_.find(ptr) != idZeroCopyBufPtrToBufMap_.end());
}
bool AsyncSocket::isZeroCopyMsg(const cmsghdr& cmsg) const {
#ifdef MSG_ERRQUEUE
if (zeroCopyEnabled_ &&
((cmsg.cmsg_level == SOL_IP && cmsg.cmsg_type == IP_RECVERR) ||
(cmsg.cmsg_level == SOL_IPV6 && cmsg.cmsg_type == IPV6_RECVERR))) {
const struct sock_extended_err* serr =
reinterpret_cast<const struct sock_extended_err*>(CMSG_DATA(&cmsg));
return (
(serr->ee_errno == 0) && (serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY));
}
#endif
return false;
}
void AsyncSocket::processZeroCopyMsg(const cmsghdr& cmsg) {
#ifdef MSG_ERRQUEUE
const struct sock_extended_err* serr =
reinterpret_cast<const struct sock_extended_err*>(CMSG_DATA(&cmsg));
uint32_t hi = serr->ee_data;
uint32_t lo = serr->ee_info;
for (uint32_t i = lo; i <= hi; i++) {
releaseZeroCopyBuff(i);
}
#endif
}
void AsyncSocket::write(WriteCallback* callback, void AsyncSocket::write(WriteCallback* callback,
const void* buf, size_t bytes, WriteFlags flags) { const void* buf, size_t bytes, WriteFlags flags) {
iovec op; iovec op;
...@@ -789,6 +968,8 @@ void AsyncSocket::writev(WriteCallback* callback, ...@@ -789,6 +968,8 @@ void AsyncSocket::writev(WriteCallback* callback,
void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf, void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
WriteFlags flags) { WriteFlags flags) {
adjustZeroCopyFlags(buf.get(), flags);
constexpr size_t kSmallSizeMax = 64; constexpr size_t kSmallSizeMax = 64;
size_t count = buf->countChainElements(); size_t count = buf->countChainElements();
if (count <= kSmallSizeMax) { if (count <= kSmallSizeMax) {
...@@ -860,6 +1041,10 @@ void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec, ...@@ -860,6 +1041,10 @@ void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
errnoCopy); errnoCopy);
return failWrite(__func__, callback, 0, ex); return failWrite(__func__, callback, 0, ex);
} else if (countWritten == count) { } else if (countWritten == count) {
// done, add the whole buffer
if (isZeroCopyRequest(flags)) {
addZeroCopyBuff(std::move(ioBuf));
}
// We successfully wrote everything. // We successfully wrote everything.
// Invoke the callback and return. // Invoke the callback and return.
if (callback) { if (callback) {
...@@ -867,6 +1052,10 @@ void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec, ...@@ -867,6 +1052,10 @@ void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
} }
return; return;
} else { // continue writing the next writeReq } else { // continue writing the next writeReq
// add just the ptr
if (isZeroCopyRequest(flags)) {
addZeroCopyBuff(ioBuf.get());
}
if (bufferCallback_) { if (bufferCallback_) {
bufferCallback_->onEgressBuffered(); bufferCallback_->onEgressBuffered();
} }
...@@ -1545,7 +1734,8 @@ void AsyncSocket::handleErrMessages() noexcept { ...@@ -1545,7 +1734,8 @@ void AsyncSocket::handleErrMessages() noexcept {
// supporting per-socket error queues. // supporting per-socket error queues.
VLOG(5) << "AsyncSocket::handleErrMessages() this=" << this << ", fd=" << fd_ VLOG(5) << "AsyncSocket::handleErrMessages() this=" << this << ", fd=" << fd_
<< ", state=" << state_; << ", state=" << state_;
if (errMessageCallback_ == nullptr) { if (errMessageCallback_ == nullptr &&
(!zeroCopyEnabled_ || idZeroCopyBufPtrMap_.empty())) {
VLOG(7) << "AsyncSocket::handleErrMessages(): " VLOG(7) << "AsyncSocket::handleErrMessages(): "
<< "no callback installed - exiting."; << "no callback installed - exiting.";
return; return;
...@@ -1587,11 +1777,15 @@ void AsyncSocket::handleErrMessages() noexcept { ...@@ -1587,11 +1777,15 @@ void AsyncSocket::handleErrMessages() noexcept {
} }
for (struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg); for (struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
cmsg != nullptr && cmsg != nullptr && cmsg->cmsg_len != 0;
cmsg->cmsg_len != 0 &&
errMessageCallback_ != nullptr;
cmsg = CMSG_NXTHDR(&msg, cmsg)) { cmsg = CMSG_NXTHDR(&msg, cmsg)) {
errMessageCallback_->errMessage(*cmsg); if (isZeroCopyMsg(*cmsg)) {
processZeroCopyMsg(*cmsg);
} else {
if (errMessageCallback_) {
errMessageCallback_->errMessage(*cmsg);
}
}
} }
} }
#endif //MSG_ERRQUEUE #endif //MSG_ERRQUEUE
...@@ -2127,7 +2321,7 @@ AsyncSocket::WriteResult AsyncSocket::performWrite( ...@@ -2127,7 +2321,7 @@ AsyncSocket::WriteResult AsyncSocket::performWrite(
} else { } else {
msg.msg_control = nullptr; msg.msg_control = nullptr;
} }
int msg_flags = sendMsgParamCallback_->getFlags(flags); int msg_flags = sendMsgParamCallback_->getFlags(flags, zeroCopyEnabled_);
auto writeResult = sendSocketMessage(fd_, &msg, msg_flags); auto writeResult = sendSocketMessage(fd_, &msg, msg_flags);
auto totalWritten = writeResult.writeReturn; auto totalWritten = writeResult.writeReturn;
......
...@@ -156,8 +156,8 @@ class AsyncSocket : virtual public AsyncTransportWrapper { ...@@ -156,8 +156,8 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
* *
* @param flags Write flags requested for the given write operation * @param flags Write flags requested for the given write operation
*/ */
int getFlags(folly::WriteFlags flags) noexcept { int getFlags(folly::WriteFlags flags, bool zeroCopyEnabled) noexcept {
return getFlagsImpl(flags, getDefaultFlags(flags)); return getFlagsImpl(flags, getDefaultFlags(flags, zeroCopyEnabled));
} }
/** /**
...@@ -211,7 +211,7 @@ class AsyncSocket : virtual public AsyncTransportWrapper { ...@@ -211,7 +211,7 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
* *
* @param flags Write flags requested for the given write operation * @param flags Write flags requested for the given write operation
*/ */
int getDefaultFlags(folly::WriteFlags flags) noexcept; int getDefaultFlags(folly::WriteFlags flags, bool zeroCopyEnabled) noexcept;
}; };
explicit AsyncSocket(); explicit AsyncSocket();
...@@ -504,6 +504,21 @@ class AsyncSocket : virtual public AsyncTransportWrapper { ...@@ -504,6 +504,21 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
void setReadCB(ReadCallback* callback) override; void setReadCB(ReadCallback* callback) override;
ReadCallback* getReadCallback() const override; ReadCallback* getReadCallback() const override;
static const size_t kDefaultZeroCopyThreshold = 32768; // 32KB
bool setZeroCopy(bool enable);
bool getZeroCopy() const {
return zeroCopyEnabled_;
}
void setZeroCopyWriteChainThreshold(size_t threshold);
size_t getZeroCopyWriteChainThreshold() const {
return zeroCopyWriteChainThreshold_;
}
bool isZeroCopyMsg(const cmsghdr& cmsg) const;
void processZeroCopyMsg(const cmsghdr& cmsg);
void write(WriteCallback* callback, const void* buf, size_t bytes, void write(WriteCallback* callback, const void* buf, size_t bytes,
WriteFlags flags = WriteFlags::NONE) override; WriteFlags flags = WriteFlags::NONE) override;
void writev(WriteCallback* callback, const iovec* vec, size_t count, void writev(WriteCallback* callback, const iovec* vec, size_t count,
...@@ -1133,6 +1148,32 @@ class AsyncSocket : virtual public AsyncTransportWrapper { ...@@ -1133,6 +1148,32 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
void cacheLocalAddress() const; void cacheLocalAddress() const;
void cachePeerAddress() const; void cachePeerAddress() const;
bool isZeroCopyRequest(WriteFlags flags);
uint32_t getNextZeroCopyBuffId() {
return zeroCopyBuffId_++;
}
void adjustZeroCopyFlags(folly::IOBuf* buf, folly::WriteFlags& flags);
void adjustZeroCopyFlags(
const iovec* vec,
uint32_t count,
folly::WriteFlags& flags);
void addZeroCopyBuff(std::unique_ptr<folly::IOBuf>&& buf);
void addZeroCopyBuff(folly::IOBuf* ptr);
void setZeroCopyBuff(std::unique_ptr<folly::IOBuf>&& buf);
bool containsZeroCopyBuff(folly::IOBuf* ptr);
void releaseZeroCopyBuff(uint32_t id);
// a folly::IOBuf can be used in multiple partial requests
// so we keep a map that maps a buffer id to a raw folly::IOBuf ptr
// and one more map that adds a ref count for a folly::IOBuf that is either
// the original ptr or nullptr
uint32_t zeroCopyBuffId_{0};
std::unordered_map<uint32_t, folly::IOBuf*> idZeroCopyBufPtrMap_;
std::unordered_map<
folly::IOBuf*,
std::pair<uint32_t, std::unique_ptr<folly::IOBuf>>>
idZeroCopyBufPtrToBufMap_;
StateEnum state_; ///< StateEnum describing current state StateEnum state_; ///< StateEnum describing current state
uint8_t shutdownFlags_; ///< Shutdown state (ShutdownFlags) uint8_t shutdownFlags_; ///< Shutdown state (ShutdownFlags)
uint16_t eventFlags_; ///< EventBase::HandlerFlags settings uint16_t eventFlags_; ///< EventBase::HandlerFlags settings
...@@ -1149,8 +1190,8 @@ class AsyncSocket : virtual public AsyncTransportWrapper { ...@@ -1149,8 +1190,8 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
ConnectCallback* connectCallback_; ///< ConnectCallback ConnectCallback* connectCallback_; ///< ConnectCallback
ErrMessageCallback* errMessageCallback_; ///< TimestampCallback ErrMessageCallback* errMessageCallback_; ///< TimestampCallback
SendMsgParamsCallback* ///< Callback for retreaving SendMsgParamsCallback* ///< Callback for retrieving
sendMsgParamCallback_; ///< ::sendmsg() parameters sendMsgParamCallback_; ///< ::sendmsg() parameters
ReadCallback* readCallback_; ///< ReadCallback ReadCallback* readCallback_; ///< ReadCallback
WriteRequest* writeReqHead_; ///< Chain of WriteRequests WriteRequest* writeReqHead_; ///< Chain of WriteRequests
WriteRequest* writeReqTail_; ///< End of WriteRequest chain WriteRequest* writeReqTail_; ///< End of WriteRequest chain
...@@ -1178,6 +1219,9 @@ class AsyncSocket : virtual public AsyncTransportWrapper { ...@@ -1178,6 +1219,9 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
bool noTSocks_{false}; bool noTSocks_{false};
// Whether to track EOR or not. // Whether to track EOR or not.
bool trackEor_{false}; bool trackEor_{false};
bool zeroCopyEnabled_{false};
bool zeroCopyVal_{false};
size_t zeroCopyWriteChainThreshold_{kDefaultZeroCopyThreshold};
std::unique_ptr<EvbChangeCallback> evbChangeCb_{nullptr}; std::unique_ptr<EvbChangeCallback> evbChangeCb_{nullptr};
}; };
......
...@@ -60,6 +60,10 @@ enum class WriteFlags : uint32_t { ...@@ -60,6 +60,10 @@ enum class WriteFlags : uint32_t {
* this indicates that only the write side of socket should be shutdown * this indicates that only the write side of socket should be shutdown
*/ */
WRITE_SHUTDOWN = 0x04, WRITE_SHUTDOWN = 0x04,
/*
* use msg zerocopy if allowed
*/
WRITE_MSG_ZEROCOPY = 0x08,
}; };
/* /*
......
...@@ -60,7 +60,7 @@ class SendMsgParamsCallbackBase : ...@@ -60,7 +60,7 @@ class SendMsgParamsCallbackBase :
int getFlagsImpl(folly::WriteFlags flags, int /*defaultFlags*/) noexcept int getFlagsImpl(folly::WriteFlags flags, int /*defaultFlags*/) noexcept
override { override {
return oldCallback_->getFlags(flags); return oldCallback_->getFlags(flags, false /*zeroCopyEnabled*/);
} }
void getAncillaryData(folly::WriteFlags flags, void* data) noexcept override { void getAncillaryData(folly::WriteFlags flags, void* data) noexcept override {
...@@ -88,7 +88,7 @@ class SendMsgFlagsCallback : public SendMsgParamsCallbackBase { ...@@ -88,7 +88,7 @@ class SendMsgFlagsCallback : public SendMsgParamsCallbackBase {
if (flags_) { if (flags_) {
return flags_; return flags_;
} else { } else {
return oldCallback_->getFlags(flags); return oldCallback_->getFlags(flags, false /*zeroCopyEnabled*/);
} }
} }
......
/*
* Copyright 2017 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/Benchmark.h>
#include <folly/ExceptionWrapper.h>
#include <folly/SocketAddress.h>
#include <folly/io/IOBufQueue.h>
#include <folly/io/async/AsyncServerSocket.h>
#include <folly/io/async/AsyncSocket.h>
#include <folly/io/async/EventBase.h>
#include <folly/portability/GFlags.h>
using namespace folly;
class TestAsyncSocket {
public:
explicit TestAsyncSocket(
folly::EventBase* evb,
int numLoops,
size_t bufferSize,
bool zeroCopy)
: evb_(evb),
numLoops_(numLoops),
sock_(new folly::AsyncSocket(evb)),
callback_(this),
client_(true) {
setBufferSize(bufferSize);
setZeroCopy(zeroCopy);
}
explicit TestAsyncSocket(
folly::EventBase* evb,
int fd,
int numLoops,
size_t bufferSize,
bool zeroCopy)
: evb_(evb),
numLoops_(numLoops),
sock_(new folly::AsyncSocket(evb, fd)),
callback_(this),
client_(false) {
setBufferSize(bufferSize);
setZeroCopy(zeroCopy);
// enable reads
if (sock_) {
sock_->setReadCB(&callback_);
}
}
~TestAsyncSocket() {
clearBuffers();
}
void connect(const folly::SocketAddress& remote) {
if (sock_) {
sock_->connect(&callback_, remote);
}
}
private:
void setZeroCopy(bool enable) {
zeroCopy_ = enable;
if (sock_) {
sock_->setZeroCopy(zeroCopy_);
}
}
void setBufferSize(size_t bufferSize) {
clearBuffers();
bufferSize_ = bufferSize;
readBuffer_ = new char[bufferSize_];
}
class Callback : public folly::AsyncSocket::ReadCallback,
public folly::AsyncSocket::ConnectCallback {
public:
explicit Callback(TestAsyncSocket* parent) : parent_(parent) {}
void connectSuccess() noexcept override {
parent_->sock_->setReadCB(this);
parent_->onConnected();
}
void connectErr(const folly::AsyncSocketException& ex) noexcept override {
LOG(ERROR) << "Connect error: " << ex.what();
parent_->onDataFinish(folly::exception_wrapper(ex));
}
void getReadBuffer(void** bufReturn, size_t* lenReturn) override {
parent_->getReadBuffer(bufReturn, lenReturn);
}
void readDataAvailable(size_t len) noexcept override {
parent_->readDataAvailable(len);
}
void readEOF() noexcept override {
parent_->onDataFinish(folly::exception_wrapper());
}
void readErr(const folly::AsyncSocketException& ex) noexcept override {
parent_->onDataFinish(folly::exception_wrapper(ex));
}
private:
TestAsyncSocket* parent_{nullptr};
};
void clearBuffers() {
if (readBuffer_) {
delete[] readBuffer_;
}
}
void getReadBuffer(void** bufReturn, size_t* lenReturn) {
*bufReturn = readBuffer_ + readOffset_;
*lenReturn = bufferSize_ - readOffset_;
}
void readDataAvailable(size_t len) noexcept {
readOffset_ += len;
if (readOffset_ == bufferSize_) {
readOffset_ = 0;
onDataReady();
}
}
void onConnected() {
setZeroCopy(zeroCopy_);
writeBuffer();
}
void onDataReady() {
currLoop_++;
if (client_ && currLoop_ >= numLoops_) {
evb_->terminateLoopSoon();
return;
}
writeBuffer();
}
void onDataFinish(folly::exception_wrapper) {
if (client_) {
evb_->terminateLoopSoon();
}
}
bool writeBuffer() {
writeBuffer_ =
folly::IOBuf::takeOwnership(::malloc(bufferSize_), bufferSize_);
if (sock_ && writeBuffer_) {
sock_->writeChain(
nullptr,
std::move(writeBuffer_),
zeroCopy_ ? WriteFlags::WRITE_MSG_ZEROCOPY : WriteFlags::NONE);
}
return true;
}
folly::EventBase* evb_;
int numLoops_{0};
int currLoop_{0};
bool zeroCopy_{false};
folly::AsyncSocket::UniquePtr sock_;
Callback callback_;
size_t bufferSize_{0};
size_t readOffset_{0};
char* readBuffer_{nullptr};
std::unique_ptr<folly::IOBuf> writeBuffer_;
bool client_;
};
class TestServer : public folly::AsyncServerSocket::AcceptCallback {
public:
explicit TestServer(
folly::EventBase* evb,
int numLoops,
size_t bufferSize,
bool zeroCopy)
: evb_(evb),
numLoops_(numLoops),
bufferSize_(bufferSize),
zeroCopy_(zeroCopy) {}
void addCallbackToServerSocket(folly::AsyncServerSocket& sock) {
sock.addAcceptCallback(this, evb_);
}
void connectionAccepted(
int fd,
const folly::SocketAddress& /* unused */) noexcept override {
auto client = std::make_shared<TestAsyncSocket>(
evb_, fd, numLoops_, bufferSize_, zeroCopy_);
clients_[client.get()] = client;
}
void acceptError(const std::exception&) noexcept override {}
private:
folly::EventBase* evb_;
int numLoops_;
size_t bufferSize_;
bool zeroCopy_;
std::unique_ptr<TestAsyncSocket> client_;
std::unordered_map<TestAsyncSocket*, std::shared_ptr<TestAsyncSocket>>
clients_;
};
class Test {
public:
explicit Test(int numLoops, bool zeroCopy, size_t bufferSize)
: numLoops_(numLoops),
zeroCopy_(zeroCopy),
bufferSize_(bufferSize),
client_(new TestAsyncSocket(&evb_, numLoops_, bufferSize_, zeroCopy)),
listenSock_(new folly::AsyncServerSocket(&evb_)),
server_(&evb_, numLoops_, bufferSize_, zeroCopy) {
if (listenSock_) {
server_.addCallbackToServerSocket(*listenSock_);
}
}
void run() {
evb_.runInEventBaseThread([this]() {
if (listenSock_) {
listenSock_->bind(0);
listenSock_->setZeroCopy(zeroCopy_);
listenSock_->listen(10);
listenSock_->startAccepting();
connectOne();
}
});
evb_.loopForever();
}
private:
void connectOne() {
SocketAddress addr = listenSock_->getAddress();
client_->connect(addr);
}
int numLoops_;
bool zeroCopy_;
size_t bufferSize_;
EventBase evb_;
std::unique_ptr<TestAsyncSocket> client_;
folly::AsyncServerSocket::UniquePtr listenSock_;
TestServer server_;
};
void runClient(
const std::string& host,
uint16_t port,
int numLoops,
bool zeroCopy,
size_t bufferSize) {
LOG(INFO) << "Running client. host = " << host << " port = " << port
<< " numLoops = " << numLoops << " zeroCopy = " << zeroCopy
<< " bufferSize = " << bufferSize;
EventBase evb;
std::unique_ptr<TestAsyncSocket> client(
new TestAsyncSocket(&evb, numLoops, bufferSize, zeroCopy));
SocketAddress addr(host, port);
evb.runInEventBaseThread([&]() { client->connect(addr); });
evb.loopForever();
}
void runServer(uint16_t port, int numLoops, bool zeroCopy, size_t bufferSize) {
LOG(INFO) << "Running server. port = " << port << " numLoops = " << numLoops
<< " zeroCopy = " << zeroCopy << " bufferSize = " << bufferSize;
EventBase evb;
folly::AsyncServerSocket::UniquePtr listenSock(
new folly::AsyncServerSocket(&evb));
TestServer server(&evb, numLoops, bufferSize, zeroCopy);
server.addCallbackToServerSocket(*listenSock);
evb.runInEventBaseThread([&]() {
listenSock->bind(port);
listenSock->setZeroCopy(zeroCopy);
listenSock->listen(10);
listenSock->startAccepting();
});
evb.loopForever();
}
static auto constexpr kMaxLoops = 200000;
void zeroCopyOn(unsigned /* unused */, size_t bufferSize) {
Test test(kMaxLoops, true, bufferSize);
test.run();
}
void zeroCopyOff(unsigned /* unused */, size_t bufferSize) {
Test test(kMaxLoops, false, bufferSize);
test.run();
}
BENCHMARK_PARAM(zeroCopyOn, 4096);
BENCHMARK_PARAM(zeroCopyOff, 4096);
BENCHMARK_DRAW_LINE()
BENCHMARK_PARAM(zeroCopyOn, 8192);
BENCHMARK_PARAM(zeroCopyOff, 8192);
BENCHMARK_DRAW_LINE()
BENCHMARK_PARAM(zeroCopyOn, 16384);
BENCHMARK_PARAM(zeroCopyOff, 16384);
BENCHMARK_DRAW_LINE()
BENCHMARK_PARAM(zeroCopyOn, 32768);
BENCHMARK_PARAM(zeroCopyOff, 32768);
BENCHMARK_DRAW_LINE()
BENCHMARK_PARAM(zeroCopyOn, 65536);
BENCHMARK_PARAM(zeroCopyOff, 65536);
BENCHMARK_DRAW_LINE()
BENCHMARK_PARAM(zeroCopyOn, 131072);
BENCHMARK_PARAM(zeroCopyOff, 131072);
BENCHMARK_DRAW_LINE()
BENCHMARK_PARAM(zeroCopyOn, 262144);
BENCHMARK_PARAM(zeroCopyOff, 262144);
BENCHMARK_DRAW_LINE()
BENCHMARK_PARAM(zeroCopyOn, 524288);
BENCHMARK_PARAM(zeroCopyOff, 524288);
BENCHMARK_DRAW_LINE()
BENCHMARK_PARAM(zeroCopyOn, 1048576);
BENCHMARK_PARAM(zeroCopyOff, 1048576);
BENCHMARK_DRAW_LINE()
DEFINE_bool(client, false, "client mode");
DEFINE_bool(server, false, "server mode");
DEFINE_bool(zeroCopy, false, "use zerocopy");
DEFINE_int32(numLoops, kMaxLoops, "number of loops");
DEFINE_int32(bufferSize, 524288, "buffer size");
DEFINE_int32(port, 33130, "port");
DEFINE_string(host, "::1", "host");
int main(int argc, char** argv) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
if (FLAGS_client) {
runClient(
FLAGS_host,
FLAGS_port,
FLAGS_numLoops,
FLAGS_zeroCopy,
FLAGS_bufferSize);
} else if (FLAGS_server) {
runServer(FLAGS_port, FLAGS_numLoops, FLAGS_zeroCopy, FLAGS_bufferSize);
} else {
runBenchmarks();
}
}
...@@ -25,6 +25,24 @@ ...@@ -25,6 +25,24 @@
#include <netinet/tcp.h> #include <netinet/tcp.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/un.h> #include <sys/un.h>
#ifdef MSG_ERRQUEUE
/* for struct sock_extended_err*/
#include <linux/errqueue.h>
#endif
#ifndef SO_EE_ORIGIN_ZEROCOPY
#define SO_EE_ORIGIN_ZEROCOPY 5
#endif
#ifndef SO_ZEROCOPY
#define SO_ZEROCOPY 60
#endif
#ifndef MSG_ZEROCOPY
#define MSG_ZEROCOPY 0x4000000
#endif
#else #else
#include <folly/portability/IOVec.h> #include <folly/portability/IOVec.h>
#include <folly/portability/SysTypes.h> #include <folly/portability/SysTypes.h>
...@@ -35,6 +53,11 @@ ...@@ -35,6 +53,11 @@
using nfds_t = int; using nfds_t = int;
using sa_family_t = ADDRESS_FAMILY; using sa_family_t = ADDRESS_FAMILY;
// these are not supported
#define SO_EE_ORIGIN_ZEROCOPY 0
#define SO_ZEROCOPY 0
#define MSG_ZEROCOPY 0x0
// We don't actually support either of these flags // We don't actually support either of these flags
// currently. // currently.
#define MSG_DONTWAIT 0x1000 #define MSG_DONTWAIT 0x1000
...@@ -198,5 +221,6 @@ int setsockopt( ...@@ -198,5 +221,6 @@ int setsockopt(
#ifdef _WIN32 #ifdef _WIN32
// Add our helpers to the overload set. // Add our helpers to the overload set.
/* using override */ using namespace folly::portability::sockets; /* using override */
using namespace folly::portability::sockets;
#endif #endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment