Commit 2109302c, authored by Dan Melnic and committed by Facebook GitHub Bot

AsyncUDPSocket zerocopy support

Summary:
AsyncUDPSocket zerocopy support

(Note: this ignores all push blocking failures!)

Reviewed By: mjoras

Differential Revision: D24553259

fbshipit-source-id: 8c2385a2a336f3c84c76e4e29f42596a5af657e7
parent 93ee3410
......@@ -73,6 +73,12 @@ void AsyncUDPSocket::fromMsg(
}
#endif
}
// Compile-time switch: true only when the build defines
// FOLLY_HAVE_MSG_ERRQUEUE. setZeroCopy() checks it before touching the socket.
#ifdef FOLLY_HAVE_MSG_ERRQUEUE
static constexpr bool msgErrQueueSupported = true;
#else
static constexpr bool msgErrQueueSupported = false;
#endif // FOLLY_HAVE_MSG_ERRQUEUE
AsyncUDPSocket::AsyncUDPSocket(EventBase* evb)
: EventHandler(CHECK_NOTNULL(evb)),
......@@ -345,6 +351,48 @@ void AsyncUDPSocket::setFD(NetworkSocket fd, FDOwnership ownership) {
localAddress_.setFromLocalAddress(fd_);
}
bool AsyncUDPSocket::setZeroCopy(bool enable) {
if (msgErrQueueSupported) {
zeroCopyVal_ = enable;
if (fd_ == NetworkSocket()) {
return false;
}
int val = enable ? 1 : 0;
int ret =
netops::setsockopt(fd_, SOL_SOCKET, SO_ZEROCOPY, &val, sizeof(val));
// if enable == false, set zeroCopyEnabled_ = false regardless
// if SO_ZEROCOPY is set or not
if (!enable) {
zeroCopyEnabled_ = enable;
return true;
}
/* if the setsockopt failed, try to see if the socket inherited the flag
* since we cannot set SO_ZEROCOPY on a socket s = accept
*/
if (ret) {
val = 0;
socklen_t optlen = sizeof(val);
ret = netops::getsockopt(fd_, SOL_SOCKET, SO_ZEROCOPY, &val, &optlen);
if (!ret) {
enable = val != 0;
}
}
if (!ret) {
zeroCopyEnabled_ = enable;
return true;
}
}
return false;
}
ssize_t AsyncUDPSocket::writeGSO(
const folly::SocketAddress& address,
const std::unique_ptr<folly::IOBuf>& buf,
......@@ -365,6 +413,107 @@ ssize_t AsyncUDPSocket::writeGSO(
return writev(address, vec, iovec_len, gso);
}
/**
 * Compute the sendmsg flags for a write that asked for zero-copy.
 *
 * While zero-copy is disabled, each call counts down
 * zeroCopyReenableCounter_ (if it is non-zero); the call that brings it to
 * zero re-enables zero-copy.
 *
 * @return MSG_ZEROCOPY when zero-copy is (or has just become) enabled,
 *         otherwise 0.
 */
int AsyncUDPSocket::getZeroCopyFlags() {
  if (zeroCopyEnabled_) {
    return MSG_ZEROCOPY;
  }

  // No countdown in progress: stay disabled.
  if (zeroCopyReenableCounter_ == 0) {
    return 0;
  }

  --zeroCopyReenableCounter_;
  if (zeroCopyReenableCounter_ != 0) {
    return 0;
  }

  // Countdown finished: switch zero-copy back on starting with this write.
  zeroCopyEnabled_ = true;
  return MSG_ZEROCOPY;
}
/**
 * Take ownership of a buffer that was handed to a zero-copy send and file it
 * under the next zero-copy buffer id, where releaseZeroCopyBuf() can find it.
 */
void AsyncUDPSocket::addZeroCopyBuf(std::unique_ptr<folly::IOBuf>&& buf) {
  const uint32_t bufId = getNextZeroCopyBufId();
  auto& slot = idZeroCopyBufMap_[bufId];
  slot = std::move(buf);
}
/**
 * Send an IOBuf (possibly a chain) to `address` with a single sendmsg call,
 * optionally using GSO and/or MSG_ZEROCOPY.
 *
 * Buffer ownership after the call:
 *  - zero-copy send succeeded: `buf` is moved into idZeroCopyBufMap_ and kept
 *    there until processZeroCopyMsg() releases it;
 *  - otherwise, if ioBufFreeFunc_ is set and `buf` is non-null, `buf` is
 *    handed to ioBufFreeFunc_;
 *  - otherwise `buf` is left untouched (this includes the ENOTSUP early
 *    return below).
 *
 * @param address  destination; on a connected socket it must equal the
 *                 connected address.
 * @param buf      data to send.
 * @param options  gso > 0 attaches a UDP_SEGMENT control message with that
 *                 segment size; zerocopy requests MSG_ZEROCOPY if it is
 *                 currently enabled on this socket.
 * @return the sendmsg return value, or -1 with errno == ENOTSUP when the
 *         socket is connected to a different address.
 */
ssize_t AsyncUDPSocket::writeChain(
    const folly::SocketAddress& address,
    std::unique_ptr<folly::IOBuf>&& buf,
    WriteOptions options) {
  int msg_flags = options.zerocopy ? getZeroCopyFlags() : 0;

  iovec vec[16];
  size_t iovec_len = buf->fillIov(vec, sizeof(vec) / sizeof(vec[0])).numIovecs;
  if (UNLIKELY(iovec_len == 0)) {
    // fillIov produced no iovecs (presumably the chain did not fit in vec):
    // collapse the chain into one contiguous buffer and send that instead.
    buf->coalesce();
    vec[0].iov_base = const_cast<uint8_t*>(buf->data());
    vec[0].iov_len = buf->length();
    iovec_len = 1;
  }

  CHECK_NE(NetworkSocket(), fd_) << "Socket not yet bound";

  sockaddr_storage addrStorage;
  address.getAddress(&addrStorage);

  // Value-initialize so no indeterminate field or padding reaches sendmsg.
  struct msghdr msg {};
  if (!connected_) {
    msg.msg_name = reinterpret_cast<void*>(&addrStorage);
    msg.msg_namelen = address.getActualSize();
  } else {
    if (connectedAddress_ != address) {
      errno = ENOTSUP;
      return -1;
    }
    msg.msg_name = nullptr;
    msg.msg_namelen = 0;
  }
  msg.msg_iov = const_cast<struct iovec*>(vec);
  msg.msg_iovlen = iovec_len;
  msg.msg_control = nullptr;
  msg.msg_controllen = 0;
  msg.msg_flags = 0;

#ifdef FOLLY_HAVE_MSG_ERRQUEUE
  // The buffer is accessed through a cmsghdr*, so it must be aligned for
  // cmsghdr; a plain char array only guarantees byte alignment.
  alignas(struct cmsghdr) char control[CMSG_SPACE(sizeof(uint16_t))];
  if (options.gso > 0) {
    msg.msg_control = control;
    msg.msg_controllen = sizeof(control);

    struct cmsghdr* cm = CMSG_FIRSTHDR(&msg);
    cm->cmsg_level = SOL_UDP;
    cm->cmsg_type = UDP_SEGMENT;
    cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));
    auto gso_len = static_cast<uint16_t>(options.gso);
    memcpy(CMSG_DATA(cm), &gso_len, sizeof(gso_len));
  }
#else
  CHECK_LT(options.gso, 1) << "GSO not supported";
#endif

  auto ret = sendmsg(fd_, &msg, msg_flags);
  if (msg_flags) {
    if (ret < 0) {
      if (errno == ENOBUFS) {
        LOG(INFO) << "ENOBUFS...";
        // workaround for running with zerocopy enabled but without a big enough
        // memlock value - see ulimit -l
        // Also see /proc/sys/net/core/optmem_max
        // Turn zero-copy off, arm the re-enable countdown and retry this
        // write as a regular (copying) send.
        zeroCopyEnabled_ = false;
        zeroCopyReenableCounter_ = zeroCopyReenableThreshold_;
        ret = sendmsg(fd_, &msg, 0);
      }
    } else {
      // The data may still be referenced after sendmsg returns; keep the
      // buffer until its completion notification is processed.
      addZeroCopyBuf(std::move(buf));
    }
  }

  // buf is null here if it was handed to addZeroCopyBuf above.
  if (ioBufFreeFunc_ && buf) {
    ioBufFreeFunc_(std::move(buf));
  }

  return ret;
}
ssize_t AsyncUDPSocket::write(
const folly::SocketAddress& address,
const std::unique_ptr<folly::IOBuf>& buf) {
......@@ -653,15 +802,64 @@ void AsyncUDPSocket::close() {
}
/**
 * EventHandler callback. Gives handleErrMessages() the first look on any
 * READ or WRITE event; if it reports a non-zero result the event is
 * considered consumed. Otherwise a READ event is dispatched to handleRead().
 */
void AsyncUDPSocket::handlerReady(uint16_t events) noexcept {
  const bool ioEvent =
      (events & (EventHandler::READ | EventHandler::WRITE)) != 0;
  if (ioEvent && handleErrMessages()) {
    return;
  }

  if ((events & EventHandler::READ) == 0) {
    return;
  }

  DCHECK(readCallback_);
  handleRead();
}
/**
 * Drop the buffer stored under zero-copy id `id`. The id must be present in
 * idZeroCopyBufMap_ (enforced with CHECK). When ioBufFreeFunc_ is set the
 * buffer is handed to it before the map entry is erased.
 */
void AsyncUDPSocket::releaseZeroCopyBuf(uint32_t id) {
  const auto pos = idZeroCopyBufMap_.find(id);
  CHECK(pos != idZeroCopyBufMap_.end());

  if (ioBufFreeFunc_) {
    auto& owned = pos->second;
    ioBufFreeFunc_(std::move(owned));
  }

  idZeroCopyBufMap_.erase(pos);
}
/**
 * Tell whether an error-queue control message is a zero-copy completion
 * notification: an IP_RECVERR / IPV6_RECVERR message whose extended error
 * has ee_errno == 0 and ee_origin == SO_EE_ORIGIN_ZEROCOPY.
 *
 * Always returns false when FOLLY_HAVE_MSG_ERRQUEUE is not defined.
 */
bool AsyncUDPSocket::isZeroCopyMsg(FOLLY_MAYBE_UNUSED const cmsghdr& cmsg) {
#ifdef FOLLY_HAVE_MSG_ERRQUEUE
  const bool isV4RecvErr =
      cmsg.cmsg_level == SOL_IP && cmsg.cmsg_type == IP_RECVERR;
  const bool isV6RecvErr =
      cmsg.cmsg_level == SOL_IPV6 && cmsg.cmsg_type == IPV6_RECVERR;
  if (!isV4RecvErr && !isV6RecvErr) {
    return false;
  }

  const auto* extErr =
      reinterpret_cast<const struct sock_extended_err*>(CMSG_DATA(&cmsg));
  if (extErr->ee_errno != 0) {
    return false;
  }
  return extErr->ee_origin == SO_EE_ORIGIN_ZEROCOPY;
#else
  return false;
#endif
}
/**
 * Handle a zero-copy completion notification (see isZeroCopyMsg()).
 *
 * The extended error carries an inclusive range of completed buffer ids:
 * ee_info is the low end and ee_data the high end. Every id in that range is
 * released through releaseZeroCopyBuf(). If the notification carries
 * SO_EE_CODE_ZEROCOPY_COPIED, zero-copy is switched off for this socket.
 *
 * No-op when FOLLY_HAVE_MSG_ERRQUEUE is not defined.
 */
void AsyncUDPSocket::processZeroCopyMsg(
    FOLLY_MAYBE_UNUSED const cmsghdr& cmsg) {
#ifdef FOLLY_HAVE_MSG_ERRQUEUE
  const auto* serr =
      reinterpret_cast<const struct sock_extended_err*>(CMSG_DATA(&cmsg));
  const uint32_t hi = serr->ee_data;
  const uint32_t lo = serr->ee_info;

  // disable zero copy if the buffer was actually copied
  if ((serr->ee_code & SO_EE_CODE_ZEROCOPY_COPIED) && zeroCopyEnabled_) {
    VLOG(2) << "AsyncUDPSocket::processZeroCopyMsg(): setting "
            << "zeroCopyEnabled_ = false due to SO_EE_CODE_ZEROCOPY_COPIED "
            << "on " << fd_;
    zeroCopyEnabled_ = false;
  }

  // Ids are 32-bit counters that wrap around. Walk from lo to hi with
  // wrapping increments and stop on equality: a `i <= hi` condition never
  // terminates when hi == UINT32_MAX and releases nothing when the range
  // wraps (lo > hi).
  for (uint32_t id = lo;; ++id) {
    releaseZeroCopyBuf(id);
    if (id == hi) {
      break;
    }
  }
#endif
}
size_t AsyncUDPSocket::handleErrMessages() noexcept {
#ifdef FOLLY_HAVE_MSG_ERRQUEUE
if (errMessageCallback_ == nullptr) {
if (errMessageCallback_ == nullptr && idZeroCopyBufMap_.empty()) {
return 0;
}
uint8_t ctrl[1024];
......@@ -703,7 +901,11 @@ size_t AsyncUDPSocket::handleErrMessages() noexcept {
cmsg != nullptr && cmsg->cmsg_len != 0;
cmsg = CMSG_NXTHDR(&msg, cmsg)) {
++num;
errMessageCallback_->errMessage(*cmsg);
if (isZeroCopyMsg(*cmsg)) {
processZeroCopyMsg(*cmsg);
} else {
errMessageCallback_->errMessage(*cmsg);
}
if (fd_ == NetworkSocket()) {
// once the socket is closed there is no use for more read errors.
return num;
......
......@@ -18,6 +18,7 @@
#include <memory>
#include <folly/Function.h>
#include <folly/ScopeGuard.h>
#include <folly/SocketAddress.h>
#include <folly/io/IOBuf.h>
......@@ -134,6 +135,16 @@ class AsyncUDPSocket : public EventHandler {
FOLLY_MAYBE_UNUSED ReadCallback::OnDataAvailableParams& params,
FOLLY_MAYBE_UNUSED struct msghdr& msg);
// Callback type that takes over an IOBuf the socket is done with (see
// setIOBufFreeFunc()).
using IOBufFreeFunc = folly::Function<void(std::unique_ptr<folly::IOBuf>&&)>;
/**
 * Per-write options for writeChain().
 *
 * gso:      GSO segment size. writeChain() attaches a UDP_SEGMENT control
 *           message only when gso > 0; on builds without
 *           FOLLY_HAVE_MSG_ERRQUEUE it CHECK-fails unless gso < 1.
 * zerocopy: ask for MSG_ZEROCOPY on this write (used only while zero-copy is
 *           enabled on the socket).
 */
struct WriteOptions {
  WriteOptions() = default;
  WriteOptions(int gsoVal, bool zerocopyVal)
      : gso(gsoVal), zerocopy(zerocopyVal) {}
  // Default is 0 (no GSO): a positive default would make default-constructed
  // options request segmentation and abort on builds without GSO support.
  int gso{0};
  bool zerocopy{false};
};
/**
* Create a new UDP socket that will run in the
* given eventbase
......@@ -186,6 +197,19 @@ class AsyncUDPSocket : public EventHandler {
*/
virtual void setFD(NetworkSocket fd, FDOwnership ownership);
// Enable or disable SO_ZEROCOPY on the socket. Returns true if the zero-copy
// state was updated, false otherwise (e.g. the socket has no fd yet).
bool setZeroCopy(bool enable);
// Whether zero-copy sends are currently enabled. The socket may clear this on
// its own: after an ENOBUFS from a zero-copy send, or when a completion
// notification reports that the data was copied.
bool getZeroCopy() const { return zeroCopyEnabled_; }
// The id that will be assigned to the next buffer sent with zero-copy.
uint32_t getZeroCopyBufId() const { return zeroCopyBufId_; }
// Re-enable threshold: after zero-copy is turned off because of ENOBUFS, the
// number of subsequent zero-copy-requesting writes to count before it is
// turned back on. With 0 (the default) it is not re-enabled automatically.
size_t getZeroCopyReenableThreshold() const {
return zeroCopyReenableThreshold_;
}
void setZeroCopyReenableThreshold(size_t threshold) {
zeroCopyReenableThreshold_ = threshold;
}
/**
* Send the data in buffer to destination. Returns the return code from
* ::sendmsg.
......@@ -219,6 +243,11 @@ class AsyncUDPSocket : public EventHandler {
const std::unique_ptr<folly::IOBuf>& buf,
int gso);
// Send the data in buf (possibly an IOBuf chain) to address in one sendmsg
// call, applying the GSO and zero-copy settings in options. Returns the
// sendmsg return code, or -1 with errno set to ENOTSUP if the socket is
// connected to a different address.
// After a successful zero-copy send the socket keeps buf until the kernel's
// completion notification is processed. Otherwise buf is passed to the
// IOBufFreeFunc if one is set and buf is non-null.
virtual ssize_t writeChain(
const folly::SocketAddress& address,
std::unique_ptr<folly::IOBuf>&& buf,
WriteOptions options);
/**
* Send the data in buffers to destination. Returns the return code from
* ::sendmmsg.
......@@ -368,6 +397,10 @@ class AsyncUDPSocket : public EventHandler {
bool setGSO(int val);
// Install the callback that receives buffers the socket is done with:
// buffers passed to writeChain() that were not retained for zero-copy, and
// zero-copy buffers once their completion notification has been processed.
void setIOBufFreeFunc(IOBufFreeFunc&& ioBufFreeFunc) {
ioBufFreeFunc_ = std::move(ioBufFreeFunc);
}
// generic receive offload get/set
// negative return value means GRO is not available
int getGRO();
......@@ -481,6 +514,26 @@ class AsyncUDPSocket : public EventHandler {
folly::Optional<int> ts_;
ErrMessageCallback* errMessageCallback_{nullptr};
// True while writes that request zero-copy are actually sent with
// MSG_ZEROCOPY.
bool zeroCopyEnabled_{false};
// Last value passed to setZeroCopy(), recorded even if the socket had no fd.
bool zeroCopyVal_{false};
// zerocopy re-enable logic
// Value loaded into zeroCopyReenableCounter_ when zero-copy is turned off
// after ENOBUFS.
size_t zeroCopyReenableThreshold_{0};
// Countdown decremented by getZeroCopyFlags() while zero-copy is disabled;
// zero-copy is re-enabled when it reaches 0.
size_t zeroCopyReenableCounter_{0};
// Id to assign to the next zero-copy buffer.
uint32_t zeroCopyBufId_{0};
// Returns MSG_ZEROCOPY if this write should use zero-copy, else 0.
int getZeroCopyFlags();
// True if cmsg is a zero-copy completion notification.
static bool isZeroCopyMsg(FOLLY_MAYBE_UNUSED const cmsghdr& cmsg);
// Releases every buffer in the id range carried by a zero-copy notification.
void processZeroCopyMsg(FOLLY_MAYBE_UNUSED const cmsghdr& cmsg);
// Stores buf under the next zero-copy id until it is released.
void addZeroCopyBuf(std::unique_ptr<folly::IOBuf>&& buf);
// Hands the buffer stored under id to ioBufFreeFunc_ (if set) and erases it.
void releaseZeroCopyBuf(uint32_t id);
// Returns the current id and advances the counter (wraps at 2^32).
uint32_t getNextZeroCopyBufId() { return zeroCopyBufId_++; }
// Buffers from zero-copy sends that are awaiting release, keyed by id.
std::unordered_map<uint32_t, std::unique_ptr<folly::IOBuf>> idZeroCopyBufMap_;
// Optional callback that receives buffers the socket no longer needs.
IOBufFreeFunc ioBufFreeFunc_;
};
} // namespace folly
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment