Commit 68154677, authored by Dan Melnic, committed by Facebook Github Bot

Add support for sendmmsg GSO option

Summary: Add support for sendmmsg GSO option

Reviewed By: yfeldblum

Differential Revision: D19521628

fbshipit-source-id: 5611074a5cfb626ce42ffe99d1de78b8e57238c2
parent e12b7c2a
...@@ -358,18 +358,47 @@ int AsyncUDPSocket::writem( ...@@ -358,18 +358,47 @@ int AsyncUDPSocket::writem(
const folly::SocketAddress& address, const folly::SocketAddress& address,
const std::unique_ptr<folly::IOBuf>* bufs, const std::unique_ptr<folly::IOBuf>* bufs,
size_t count) { size_t count) {
return writemGSO(address, bufs, count, nullptr);
}
int AsyncUDPSocket::writemGSO(
const folly::SocketAddress& address,
const std::unique_ptr<folly::IOBuf>* bufs,
size_t count,
const int* gso) {
int ret; int ret;
constexpr size_t kSmallSizeMax = 8; constexpr size_t kSmallSizeMax = 8;
char* gsoControl = nullptr;
#ifndef FOLLY_HAVE_MSG_ERRQUEUE
CHECK(!gso) << "GSO not supported";
#endif
if (count <= kSmallSizeMax) { if (count <= kSmallSizeMax) {
// suppress "warning: variable length array 'vec' is used [-Wvla]" // suppress "warning: variable length array 'vec' is used [-Wvla]"
FOLLY_PUSH_WARNING FOLLY_PUSH_WARNING
FOLLY_GNU_DISABLE_WARNING("-Wvla") FOLLY_GNU_DISABLE_WARNING("-Wvla")
mmsghdr vec[BOOST_PP_IF(FOLLY_HAVE_VLA_01, count, kSmallSizeMax)]; mmsghdr vec[BOOST_PP_IF(FOLLY_HAVE_VLA_01, count, kSmallSizeMax)];
#ifdef FOLLY_HAVE_MSG_ERRQUEUE
// we will allocate this on the stack anyway even if we do not use it
char control
[(BOOST_PP_IF(FOLLY_HAVE_VLA_01, count, kSmallSizeMax)) *
(CMSG_SPACE(sizeof(uint16_t)))];
if (gso) {
gsoControl = control;
}
#endif
FOLLY_POP_WARNING FOLLY_POP_WARNING
ret = writeImpl(address, bufs, count, vec); ret = writeImpl(address, bufs, count, vec, gso, gsoControl);
} else { } else {
std::unique_ptr<mmsghdr[]> vec(new mmsghdr[count]); std::unique_ptr<mmsghdr[]> vec(new mmsghdr[count]);
ret = writeImpl(address, bufs, count, vec.get()); #ifdef FOLLY_HAVE_MSG_ERRQUEUE
std::unique_ptr<char[]> control(
gso ? (new char[count * (CMSG_SPACE(sizeof(uint16_t)))]) : nullptr);
if (gso) {
gsoControl = control.get();
}
#endif
ret = writeImpl(address, bufs, count, vec.get(), gso, gsoControl);
} }
return ret; return ret;
...@@ -382,21 +411,41 @@ void AsyncUDPSocket::fillMsgVec( ...@@ -382,21 +411,41 @@ void AsyncUDPSocket::fillMsgVec(
size_t count, size_t count,
struct mmsghdr* msgvec, struct mmsghdr* msgvec,
struct iovec* iov, struct iovec* iov,
size_t iov_count) { size_t iov_count,
const int* gso,
char* gsoControl) {
size_t remaining = iov_count; size_t remaining = iov_count;
size_t iov_pos = 0; size_t iov_pos = 0;
for (size_t i = 0; i < count; i++) { for (size_t i = 0; i < count; i++) {
// we can use remaining here to avoid calling countChainElements() again // we can use remaining here to avoid calling countChainElements() again
size_t iovec_len = bufs[i]->fillIov(&iov[iov_pos], remaining).numIovecs; auto ret = bufs[i]->fillIov(&iov[iov_pos], remaining);
size_t iovec_len = ret.numIovecs;
remaining -= iovec_len; remaining -= iovec_len;
auto& msg = msgvec[i].msg_hdr; auto& msg = msgvec[i].msg_hdr;
msg.msg_name = reinterpret_cast<void*>(addr); msg.msg_name = reinterpret_cast<void*>(addr);
msg.msg_namelen = addr_len; msg.msg_namelen = addr_len;
msg.msg_iov = &iov[iov_pos]; msg.msg_iov = &iov[iov_pos];
msg.msg_iovlen = iovec_len; msg.msg_iovlen = iovec_len;
#ifdef FOLLY_HAVE_MSG_ERRQUEUE
if (gso && gso[i] > 0) {
msg.msg_control = &gsoControl[i * CMSG_SPACE(sizeof(uint16_t))];
msg.msg_controllen = CMSG_SPACE(sizeof(uint16_t));
struct cmsghdr* cm = CMSG_FIRSTHDR(&msg);
cm->cmsg_level = SOL_UDP;
cm->cmsg_type = UDP_SEGMENT;
cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));
auto gso_len = static_cast<uint16_t>(gso[i]);
memcpy(CMSG_DATA(cm), &gso_len, sizeof(gso_len));
} else {
msg.msg_control = nullptr;
msg.msg_controllen = 0;
}
#else
msg.msg_control = nullptr; msg.msg_control = nullptr;
msg.msg_controllen = 0; msg.msg_controllen = 0;
#endif
msg.msg_flags = 0; msg.msg_flags = 0;
msgvec[i].msg_len = 0; msgvec[i].msg_len = 0;
...@@ -409,7 +458,9 @@ int AsyncUDPSocket::writeImpl( ...@@ -409,7 +458,9 @@ int AsyncUDPSocket::writeImpl(
const folly::SocketAddress& address, const folly::SocketAddress& address,
const std::unique_ptr<folly::IOBuf>* bufs, const std::unique_ptr<folly::IOBuf>* bufs,
size_t count, size_t count,
struct mmsghdr* msgvec) { struct mmsghdr* msgvec,
const int* gso,
char* gsoControl) {
sockaddr_storage addrStorage; sockaddr_storage addrStorage;
address.getAddress(&addrStorage); address.getAddress(&addrStorage);
...@@ -433,7 +484,9 @@ int AsyncUDPSocket::writeImpl( ...@@ -433,7 +484,9 @@ int AsyncUDPSocket::writeImpl(
count, count,
msgvec, msgvec,
iov, iov,
iov_count); iov_count,
gso,
gsoControl);
ret = sendmmsg(fd_, msgvec, count, 0); ret = sendmmsg(fd_, msgvec, count, 0);
} else { } else {
std::unique_ptr<iovec[]> iov(new iovec[iov_count]); std::unique_ptr<iovec[]> iov(new iovec[iov_count]);
...@@ -444,7 +497,9 @@ int AsyncUDPSocket::writeImpl( ...@@ -444,7 +497,9 @@ int AsyncUDPSocket::writeImpl(
count, count,
msgvec, msgvec,
iov.get(), iov.get(),
iov_count); iov_count,
gso,
gsoControl);
ret = sendmmsg(fd_, msgvec, count, 0); ret = sendmmsg(fd_, msgvec, count, 0);
} }
......
...@@ -181,6 +181,21 @@ class AsyncUDPSocket : public EventHandler { ...@@ -181,6 +181,21 @@ class AsyncUDPSocket : public EventHandler {
const std::unique_ptr<folly::IOBuf>& buf, const std::unique_ptr<folly::IOBuf>& buf,
int gso); int gso);
/**
 * Send several buffers to the same destination with a single ::sendmmsg
 * call. Returns the return code from ::sendmmsg.
 *
 * address is the destination used for every message.
 * bufs is an array of std::unique_ptr<folly::IOBuf> holding count entries;
 * each entry is sent as its own message.
 * count is the number of entries in bufs (and in gso, when gso is not
 * nullptr).
 * gso is either nullptr or an array of count generic segmentation offload
 * values, one per buffer. An entry greater than zero is attached to the
 * corresponding message as a UDP_SEGMENT control message (the value is
 * narrowed to uint16_t); an entry that is zero or negative sends that
 * message without any control data. Passing nullptr is what writem() does
 * and sends every message without control data.
 *
 * On builds without FOLLY_HAVE_MSG_ERRQUEUE a non-null gso is rejected with
 * a CHECK failure ("GSO not supported").
 * Before calling writemGSO with a positive gso value,
 * verify GSO is supported on this platform by calling getGSO.
 */
virtual int writemGSO(
const folly::SocketAddress& address,
const std::unique_ptr<folly::IOBuf>* bufs,
size_t count,
const int* gso);
/** /**
* Send data in iovec to destination. Returns the return code from sendmsg. * Send data in iovec to destination. Returns the return code from sendmsg.
*/ */
...@@ -354,13 +369,17 @@ class AsyncUDPSocket : public EventHandler { ...@@ -354,13 +369,17 @@ class AsyncUDPSocket : public EventHandler {
size_t count, size_t count,
struct mmsghdr* msgvec, struct mmsghdr* msgvec,
struct iovec* iov, struct iovec* iov,
size_t iov_count); size_t iov_count,
const int* gso,
char* gsoControl);
virtual int writeImpl( virtual int writeImpl(
const folly::SocketAddress& address, const folly::SocketAddress& address,
const std::unique_ptr<folly::IOBuf>* bufs, const std::unique_ptr<folly::IOBuf>* bufs,
size_t count, size_t count,
struct mmsghdr* msgvec); struct mmsghdr* msgvec,
const int* gso,
char* gsoControl);
size_t handleErrMessages() noexcept; size_t handleErrMessages() noexcept;
......
...@@ -41,21 +41,48 @@ struct TestData { ...@@ -41,21 +41,48 @@ struct TestData {
bool useSocketGSO, bool useSocketGSO,
int* in, int* in,
size_t inLen, size_t inLen,
int* expected, const int* expected,
size_t expectedLen) size_t expectedLen)
: gso_(gso), useSocketGSO_(useSocketGSO) { : gso_(gso), useSocketGSO_(useSocketGSO) {
in_.assign(in, in + inLen); std::vector<int> inVec;
inVec.assign(in, in + inLen);
in_.emplace_back(std::move(inVec));
expected_.assign(expected, expected + expectedLen);
expectedSize_ = std::accumulate(expected_.begin(), expected_.end(), 0);
}
TestData(
const std::vector<int>& gsoVec,
bool useSocketGSO,
const std::vector<std::vector<int>>& in,
const int* expected,
size_t expectedLen)
: gsoVec_(gsoVec), useSocketGSO_(useSocketGSO), in_(in) {
expected_.assign(expected, expected + expectedLen); expected_.assign(expected, expected + expectedLen);
expectedSize_ = std::accumulate(expected_.begin(), expected_.end(), 0); expectedSize_ = std::accumulate(expected_.begin(), expected_.end(), 0);
} }
bool checkIn() const { bool checkIn() const {
return (expectedSize_ == std::accumulate(in_.begin(), in_.end(), 0)); int expected = 0;
for (const auto& in : in_) {
expected += std::accumulate(in.begin(), in.end(), 0);
}
return (expectedSize_ == expected);
} }
bool checkOut() const { bool checkOut() const {
return (expectedSize_ == std::accumulate(out_.begin(), out_.end(), 0)); auto size = std::accumulate(out_.begin(), out_.end(), 0);
auto ret = (expectedSize_ == size);
if (!ret) {
LOG(ERROR) << "expected = " << expectedSize_ << " actual = " << size;
for (const auto& out : out_) {
LOG(ERROR) << out;
}
}
return ret;
} }
bool appendOut(int num) { bool appendOut(int num) {
...@@ -65,26 +92,61 @@ struct TestData { ...@@ -65,26 +92,61 @@ struct TestData {
return (outSize_ >= expectedSize_); return (outSize_ >= expectedSize_);
} }
// Reports whether this test case describes more than one input buffer.
bool isMulti() const {
  const auto numInputs = in_.size();
  return numInputs >= 2;
}
// Pointer to the first per-buffer GSO value, or nullptr when no per-buffer
// values were supplied (gsoVec_ is empty).
const int* getGSOVec() const {
  if (gsoVec_.empty()) {
    return nullptr;
  }
  return gsoVec_.data();
}
std::unique_ptr<folly::IOBuf> getInBuf() { std::unique_ptr<folly::IOBuf> getInBuf() {
if (!in_.size()) { if (!in_.size()) {
return nullptr; return nullptr;
} }
std::string str(in_[0], 'A'); auto& in = in_[0];
std::string str(in[0], 'A');
std::unique_ptr<folly::IOBuf> ret = std::unique_ptr<folly::IOBuf> ret =
folly::IOBuf::copyBuffer(str.data(), str.size()); folly::IOBuf::copyBuffer(str.data(), str.size());
for (size_t i = 1; i < in_.size(); i++) { for (size_t i = 1; i < in.size(); i++) {
str = std::string(in_[i], 'A'); str = std::string(in[i], 'A');
ret->prependChain(folly::IOBuf::copyBuffer(str.data(), str.size())); ret->prependChain(folly::IOBuf::copyBuffer(str.data(), str.size()));
} }
return ret; return ret;
} }
/**
 * Builds one IOBuf chain per entry of in_.
 *
 * Every inner vector lists the byte sizes of the chain elements; each
 * element is filled with that many 'A' characters. The returned vector has
 * exactly in_.size() entries, in the same order as in_, so index i of the
 * result matches index i of a per-buffer GSO array.
 *
 * Returns an empty vector when in_ is empty.
 */
std::vector<std::unique_ptr<folly::IOBuf>> getInBufs() {
  std::vector<std::unique_ptr<folly::IOBuf>> ret;
  ret.reserve(in_.size());
  for (const auto& in : in_) {
    // Guard against an empty size list: reading in[0] there would be out of
    // bounds. Such an entry still produces one zero-length buffer so the
    // result stays index-aligned with in_.
    std::string str(in.empty() ? 0 : in[0], 'A');
    std::unique_ptr<folly::IOBuf> buf =
        folly::IOBuf::copyBuffer(str.data(), str.size());
    // Remaining sizes become further elements of the same chain.
    for (size_t i = 1; i < in.size(); i++) {
      str = std::string(in[i], 'A');
      buf->prependChain(folly::IOBuf::copyBuffer(str.data(), str.size()));
    }
    ret.emplace_back(std::move(buf));
  }
  return ret;
}
int gso_{0}; int gso_{0};
std::vector<int> gsoVec_;
bool useSocketGSO_{false}; bool useSocketGSO_{false};
std::vector<int> in_; std::vector<std::vector<int>> in_;
std::vector<int> expected_; // expected std::vector<int> expected_; // expected
int expectedSize_; int expectedSize_;
std::vector<int> out_; std::vector<int> out_;
...@@ -244,14 +306,24 @@ class UDPClient : private AsyncUDPSocket::ReadCallback, private AsyncTimeout { ...@@ -244,14 +306,24 @@ class UDPClient : private AsyncUDPSocket::ReadCallback, private AsyncTimeout {
void sendPing() { void sendPing() {
scheduleTimeout(5); scheduleTimeout(5);
writePing( if (testData_.isMulti()) {
testData_.getInBuf(), testData_.useSocketGSO_ ? -1 : testData_.gso_); writePing(testData_.getInBufs(), testData_.getGSOVec());
} else {
writePing(
testData_.getInBuf(), testData_.useSocketGSO_ ? -1 : testData_.gso_);
}
} }
virtual void writePing(std::unique_ptr<folly::IOBuf> buf, int gso) { virtual void writePing(std::unique_ptr<folly::IOBuf> buf, int gso) {
socket_->writeGSO(server_, std::move(buf), gso); socket_->writeGSO(server_, std::move(buf), gso);
} }
// Multi-buffer ping: passes the whole batch and the optional per-buffer GSO
// array to writemGSO in one call. The value writemGSO returns is not
// inspected here.
virtual void writePing(
    const std::vector<std::unique_ptr<folly::IOBuf>>& vec,
    const int* gso) {
  const std::unique_ptr<folly::IOBuf>* firstBuf = vec.data();
  const size_t numBufs = vec.size();
  socket_->writemGSO(server_, firstBuf, numBufs, gso);
}
void getReadBuffer(void** buf, size_t* len) noexcept override { void getReadBuffer(void** buf, size_t* len) noexcept override {
*buf = buf_; *buf = buf_;
*len = sizeof(buf_); *len = sizeof(buf_);
...@@ -404,6 +476,67 @@ TEST_F(AsyncSocketGSOIntegrationTest, PingPongRequestGSO) { ...@@ -404,6 +476,67 @@ TEST_F(AsyncSocketGSOIntegrationTest, PingPongRequestGSO) {
ASSERT_TRUE(testData.checkOut()); ASSERT_TRUE(testData.checkOut());
} }
// Multi-buffer ping/pong with useSocketGSO set to true in the test data.
// Four identical buffer chains are described, each paired with its own GSO
// value. The expected list is built by cutting each chain's total byte count
// into pieces of at most that GSO value, the last piece holding the rest.
TEST_F(AsyncSocketGSOIntegrationTest, MultiPingPongGlobalGSO) {
  std::vector<int> in = {100, 1200, 3000, 200, 100, 300};
  int total = std::accumulate(in.begin(), in.end(), 0);

  std::vector<int> gsoVec = {1000, 800, 1100, 1200};
  std::vector<std::vector<int>> inVec;
  inVec.reserve(gsoVec.size());

  std::vector<int> expected;
  for (const int segmentSize : gsoVec) {
    // Same chain layout for every message; only the segment size differs.
    inVec.push_back(in);
    for (int left = total; left != 0;) {
      const int piece = (left > segmentSize) ? segmentSize : left;
      expected.push_back(piece);
      left -= piece;
    }
  }

  TestData testData(
      gsoVec, true /*useSocketGSO*/, inVec, expected.data(), expected.size());
  ASSERT_TRUE(testData.checkIn());

  startServer();
  auto pingClient = performPingPongTest(testData, folly::none);
  ASSERT_TRUE(testData.checkOut());
}
// Multi-buffer ping/pong with useSocketGSO set to false in the test data.
// Four identical buffer chains are described, each paired with its own GSO
// value. The expected list is built by cutting each chain's total byte count
// into pieces of at most that GSO value, the last piece holding the rest.
TEST_F(AsyncSocketGSOIntegrationTest, MultiPingPongRequestGSO) {
  std::vector<int> in = {100, 1200, 3000, 200, 100, 300};
  int total = std::accumulate(in.begin(), in.end(), 0);

  std::vector<int> gsoVec = {421, 300, 528, 680};
  std::vector<std::vector<int>> inVec;
  inVec.reserve(gsoVec.size());

  std::vector<int> expected;
  for (const int segmentSize : gsoVec) {
    // Same chain layout for every message; only the segment size differs.
    inVec.push_back(in);
    for (int left = total; left != 0;) {
      const int piece = (left > segmentSize) ? segmentSize : left;
      expected.push_back(piece);
      left -= piece;
    }
  }

  TestData testData(
      gsoVec, false /*useSocketGSO*/, inVec, expected.data(), expected.size());
  ASSERT_TRUE(testData.checkIn());

  startServer();
  auto pingClient = performPingPongTest(testData, folly::none);
  ASSERT_TRUE(testData.checkOut());
}
// buffer sizes // buffer sizes
constexpr auto kGSO1 = 100; constexpr auto kGSO1 = 100;
constexpr auto kGSO2 = 200; constexpr auto kGSO2 = 200;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment