Commit 9af8f2be authored by Dan Melnic's avatar Dan Melnic Committed by Facebook Github Bot

Adding UDP generic segmentation offload support

Summary: Adding UDP generic segmentation offload support

Reviewed By: siyengar

Differential Revision: D8825396

fbshipit-source-id: 5974307c2a901f4967e6f1c678b604ec1c21b3d0
parent 4813f785
......@@ -227,11 +227,12 @@ void AsyncUDPSocket::setFD(int fd, FDOwnership ownership) {
localAddress_.setFromLocalAddress(fd_);
}
ssize_t AsyncUDPSocket::write(
ssize_t AsyncUDPSocket::writeGSO(
const folly::SocketAddress& address,
const std::unique_ptr<folly::IOBuf>& buf) {
const std::unique_ptr<folly::IOBuf>& buf,
int gso) {
// UDP's typical MTU size is 1500, so high number of buffers
// really do not make sense. Optimze for buffer chains with
// really do not make sense. Optimize for buffer chains with
// buffers less than 16, which is the highest I can think of
// for a real use case.
iovec vec[16];
......@@ -243,13 +244,20 @@ ssize_t AsyncUDPSocket::write(
iovec_len = 1;
}
return writev(address, vec, iovec_len);
return writev(address, vec, iovec_len, gso);
}
ssize_t AsyncUDPSocket::write(
const folly::SocketAddress& address,
const std::unique_ptr<folly::IOBuf>& buf) {
return writeGSO(address, buf, 0);
}
ssize_t AsyncUDPSocket::writev(
const folly::SocketAddress& address,
const struct iovec* vec,
size_t iovec_len) {
size_t iovec_len,
int gso) {
CHECK_NE(-1, fd_) << "Socket not yet bound";
sockaddr_storage addrStorage;
......@@ -264,9 +272,34 @@ ssize_t AsyncUDPSocket::writev(
msg.msg_controllen = 0;
msg.msg_flags = 0;
#ifdef FOLLY_HAVE_MSG_ERRQUEUE
if (gso > 0) {
char control[CMSG_SPACE(sizeof(uint16_t))];
msg.msg_control = control;
msg.msg_controllen = sizeof(control);
struct cmsghdr* cm = CMSG_FIRSTHDR(&msg);
cm->cmsg_level = SOL_UDP;
cm->cmsg_type = UDP_SEGMENT;
cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));
uint16_t gso_len = static_cast<uint16_t>(gso);
memcpy(CMSG_DATA(cm), &gso_len, sizeof(gso_len));
return sendmsg(fd_, &msg, 0);
}
#else
CHECK_LT(gso, 1) << "GSO not supported";
#endif
return sendmsg(fd_, &msg, 0);
}
ssize_t AsyncUDPSocket::writev(
const folly::SocketAddress& address,
const struct iovec* vec,
size_t iovec_len) {
return writev(address, vec, iovec_len, 0);
}
void AsyncUDPSocket::resumeRead(ReadCallback* cob) {
CHECK(!readCallback_) << "Another read callback already installed";
CHECK_NE(-1, fd_) << "UDP server socket not yet bind to an address";
......@@ -465,6 +498,29 @@ bool AsyncUDPSocket::updateRegistration() noexcept {
return registerHandler(uint16_t(flags | PERSIST));
}
bool AsyncUDPSocket::setGSO(int val) {
int ret = ::setsockopt(fd_, SOL_UDP, UDP_SEGMENT, &val, sizeof(val));
gso_ = ret ? -1 : val;
return !ret;
}
int AsyncUDPSocket::getGSO() {
// check if we can return the cached value
if (FOLLY_UNLIKELY(!gso_.hasValue())) {
int gso = -1;
socklen_t optlen = sizeof(gso);
if (!::getsockopt(fd_, SOL_UDP, UDP_SEGMENT, &gso, &optlen)) {
gso_ = gso;
} else {
gso_ = -1;
}
}
return gso_.value();
}
void AsyncUDPSocket::detachEventBase() {
DCHECK(eventBase_ && eventBase_->isInEventBaseThread());
registerHandler(uint16_t(NONE));
......
......@@ -136,9 +136,29 @@ class AsyncUDPSocket : public EventHandler {
const folly::SocketAddress& address,
const std::unique_ptr<folly::IOBuf>& buf);
/**
* Send the data in buffer to destination. Returns the return code from
* ::sendmsg.
* gso is the generic segmentation offload value
* writeGSO will return -1 if
* buf->computeChainDataLength() <= gso
* Before calling writeGSO with a positive value
* verify GSO is supported on this platform by calling getGSO
*/
virtual ssize_t writeGSO(
const folly::SocketAddress& address,
const std::unique_ptr<folly::IOBuf>& buf,
int gso);
/**
* Send data in iovec to destination. Returns the return code from sendmsg.
*/
virtual ssize_t writev(
const folly::SocketAddress& address,
const struct iovec* vec,
size_t veclen,
int gso);
virtual ssize_t writev(
const folly::SocketAddress& address,
const struct iovec* vec,
......@@ -252,6 +272,12 @@ class AsyncUDPSocket : public EventHandler {
virtual void attachEventBase(folly::EventBase* evb);
// generic segmentation offload get/set
// negative return value means GSO is not available
int getGSO();
bool setGSO(int val);
protected:
virtual ssize_t sendmsg(int socket, const struct msghdr* message, int flags) {
return ::sendmsg(socket, message, flags);
......@@ -289,6 +315,10 @@ class AsyncUDPSocket : public EventHandler {
int sndBuf_{0};
int busyPollUs_{0};
// generic segmentation offload value, if available
// See https://lwn.net/Articles/188489/ for more details
folly::Optional<int> gso_;
ErrMessageCallback* errMessageCallback_{nullptr};
};
......
/*
* Copyright 2014-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <numeric>
#include <thread>
#include <folly/Conv.h>
#include <folly/SocketAddress.h>
#include <folly/io/IOBuf.h>
#include <folly/io/async/AsyncTimeout.h>
#include <folly/io/async/AsyncUDPServerSocket.h>
#include <folly/io/async/AsyncUDPSocket.h>
#include <folly/io/async/EventBase.h>
#include <folly/portability/GMock.h>
#include <folly/portability/GTest.h>
using folly::AsyncTimeout;
using folly::AsyncUDPServerSocket;
using folly::AsyncUDPSocket;
using folly::EventBase;
using folly::IOBuf;
using folly::SocketAddress;
using namespace testing;
struct TestData {
TestData(
int gso,
bool useSocketGSO,
int* in,
size_t inLen,
int* expected,
size_t expectedLen)
: gso_(gso), useSocketGSO_(useSocketGSO) {
in_.assign(in, in + inLen);
expected_.assign(expected, expected + expectedLen);
expectedSize_ = std::accumulate(expected_.begin(), expected_.end(), 0);
}
bool checkIn() const {
return (expectedSize_ == std::accumulate(in_.begin(), in_.end(), 0));
}
bool checkOut() const {
return (expectedSize_ == std::accumulate(out_.begin(), out_.end(), 0));
}
bool appendOut(int num) {
out_.push_back(num);
outSize_ += num;
return (outSize_ >= expectedSize_);
}
std::unique_ptr<folly::IOBuf> getInBuf() {
if (!in_.size()) {
return nullptr;
}
std::string str(in_[0], 'A');
std::unique_ptr<folly::IOBuf> ret =
folly::IOBuf::copyBuffer(str.data(), str.size());
for (size_t i = 1; i < in_.size(); i++) {
str = std::string(in_[i], 'A');
ret->prependChain(folly::IOBuf::copyBuffer(str.data(), str.size()));
}
return ret;
}
int gso_{0};
bool useSocketGSO_{false};
std::vector<int> in_;
std::vector<int> expected_; // expected
int expectedSize_;
std::vector<int> out_;
int outSize_{0};
};
class UDPAcceptor : public AsyncUDPServerSocket::Callback {
public:
UDPAcceptor(EventBase* evb) : evb_(evb) {}
void onListenStarted() noexcept override {}
void onListenStopped() noexcept override {}
void onDataAvailable(
std::shared_ptr<folly::AsyncUDPSocket> socket,
const folly::SocketAddress& client,
std::unique_ptr<folly::IOBuf> data,
bool /*unused*/) noexcept override {
// send pong
socket->write(client, data->clone());
}
private:
EventBase* const evb_{nullptr};
};
class UDPServer {
public:
UDPServer(EventBase* evb, folly::SocketAddress addr, int n)
: evb_(evb), addr_(addr), evbs_(n) {}
void start() {
CHECK(evb_->isInEventBaseThread());
socket_ = std::make_unique<AsyncUDPServerSocket>(evb_, 1500);
try {
socket_->bind(addr_);
VLOG(4) << "Server listening on " << socket_->address().describe();
} catch (const std::exception& ex) {
LOG(FATAL) << ex.what();
}
acceptors_.reserve(evbs_.size());
threads_.reserve(evbs_.size());
// Add numWorkers thread
int i = 0;
for (auto& evb : evbs_) {
acceptors_.emplace_back(&evb);
std::thread t([&]() { evb.loopForever(); });
evb.waitUntilRunning();
socket_->addListener(&evb, &acceptors_[i]);
threads_.emplace_back(std::move(t));
++i;
}
socket_->listen();
}
folly::SocketAddress address() const {
return socket_->address();
}
void shutdown() {
CHECK(evb_->isInEventBaseThread());
socket_->close();
socket_.reset();
for (auto& evb : evbs_) {
evb.terminateLoopSoon();
}
for (auto& t : threads_) {
t.join();
}
}
void pauseAccepting() {
socket_->pauseAccepting();
}
void resumeAccepting() {
socket_->resumeAccepting();
}
private:
EventBase* const evb_{nullptr};
const folly::SocketAddress addr_;
std::unique_ptr<AsyncUDPServerSocket> socket_;
std::vector<std::thread> threads_;
std::vector<folly::EventBase> evbs_;
std::vector<UDPAcceptor> acceptors_;
};
class UDPClient : private AsyncUDPSocket::ReadCallback, private AsyncTimeout {
public:
explicit UDPClient(EventBase* evb, TestData& testData)
: AsyncTimeout(evb), evb_(evb), testData_(testData) {}
void start(const folly::SocketAddress& server) {
CHECK(evb_->isInEventBaseThread());
server_ = server;
socket_ = std::make_unique<AsyncUDPSocket>(evb_);
try {
socket_->bind(folly::SocketAddress("127.0.0.1", 0));
if (connectAddr_) {
connect();
}
VLOG(2) << "Client bound to " << socket_->address().describe();
} catch (const std::exception& ex) {
LOG(FATAL) << ex.what();
}
// succeed if GSO not available
if (socket_->getGSO() < 0) {
LOG(INFO) << "GSO not supported";
testData_.out_ = testData_.expected_;
shutdown();
return;
}
if (testData_.useSocketGSO_) {
socket_->setGSO(testData_.gso_);
} else {
socket_->setGSO(0);
}
socket_->resumeRead(this);
// Start playing ping pong
sendPing();
}
void connect() {
int ret = socket_->connect(*connectAddr_);
if (ret != 0) {
throw folly::AsyncSocketException(
folly::AsyncSocketException::NOT_OPEN, "ConnectFail", errno);
}
VLOG(2) << "Client connected to address=" << *connectAddr_;
}
void shutdown() {
CHECK(evb_->isInEventBaseThread());
socket_->pauseRead();
socket_->close();
socket_.reset();
evb_->terminateLoopSoon();
}
void sendPing() {
scheduleTimeout(5);
writePing(
testData_.getInBuf(), testData_.useSocketGSO_ ? -1 : testData_.gso_);
}
virtual void writePing(std::unique_ptr<folly::IOBuf> buf, int gso) {
socket_->writeGSO(server_, std::move(buf), gso);
}
void getReadBuffer(void** buf, size_t* len) noexcept override {
*buf = buf_;
*len = sizeof(buf_);
}
void onDataAvailable(
const folly::SocketAddress& /*unused*/,
size_t len,
bool /*unused*/) noexcept override {
VLOG(0) << "Got " << len << " bytes";
if (testData_.appendOut(len)) {
shutdown();
}
}
void onReadError(const folly::AsyncSocketException& ex) noexcept override {
VLOG(4) << ex.what();
// Start listening for next PONG
socket_->resumeRead(this);
}
void onReadClosed() noexcept override {
CHECK(false) << "We unregister reads before closing";
}
void timeoutExpired() noexcept override {
VLOG(4) << "Timeout expired";
shutdown();
}
AsyncUDPSocket& getSocket() {
return *socket_;
}
void setShouldConnect(const folly::SocketAddress& connectAddr) {
connectAddr_ = connectAddr;
}
protected:
folly::Optional<folly::SocketAddress> connectAddr_;
EventBase* const evb_{nullptr};
folly::SocketAddress server_;
std::unique_ptr<AsyncUDPSocket> socket_;
private:
char buf_[2048];
TestData& testData_;
};
class AsyncSocketGSOIntegrationTest : public Test {
public:
void SetUp() override {
server = std::make_unique<UDPServer>(
&sevb, folly::SocketAddress("127.0.0.1", 0), 1);
// Start event loop in a separate thread
serverThread =
std::make_unique<std::thread>([this]() { sevb.loopForever(); });
// Wait for event loop to start
sevb.waitUntilRunning();
}
void startServer() {
// Start the server
sevb.runInEventBaseThreadAndWait([&]() { server->start(); });
LOG(INFO) << "Server listening=" << server->address();
}
void TearDown() override {
// Shutdown server
sevb.runInEventBaseThread([&]() {
server->shutdown();
sevb.terminateLoopSoon();
});
// Wait for server thread to join
serverThread->join();
}
std::unique_ptr<UDPClient> performPingPongTest(
TestData& testData,
folly::Optional<folly::SocketAddress> connectedAddress);
folly::EventBase sevb;
folly::EventBase cevb;
TestData* testData_{nullptr};
std::unique_ptr<std::thread> serverThread;
std::unique_ptr<UDPServer> server;
std::unique_ptr<UDPClient> client;
};
std::unique_ptr<UDPClient> AsyncSocketGSOIntegrationTest::performPingPongTest(
TestData& testData,
folly::Optional<folly::SocketAddress> connectedAddress) {
testData_ = &testData;
client = std::make_unique<UDPClient>(&cevb, testData);
if (connectedAddress) {
client->setShouldConnect(*connectedAddress);
}
// Start event loop in a separate thread
auto clientThread = std::thread([this]() { cevb.loopForever(); });
// Wait for event loop to start
cevb.waitUntilRunning();
// Send ping
cevb.runInEventBaseThread([&]() { client->start(server->address()); });
// Wait for client to finish
clientThread.join();
return std::move(client);
}
TEST_F(AsyncSocketGSOIntegrationTest, PingPongGlobalGSO) {
int gso = 1000;
int in[] = {100, 1200, 3000, 200, 100, 300};
int expected[] = {1000, 1000, 1000, 1000, 900};
TestData testData(
gso,
true /*useSocketGSO*/,
in,
sizeof(in) / sizeof(in[0]),
expected,
sizeof(expected) / sizeof(expected[0]));
ASSERT_TRUE(testData.checkIn());
startServer();
auto pingClient = performPingPongTest(testData, folly::none);
ASSERT_TRUE(testData.checkOut());
}
TEST_F(AsyncSocketGSOIntegrationTest, PingPongRequestGSO) {
int gso = 421;
int in[] = {100, 1200, 3000, 200, 100, 300};
int expected[] = {421, 421, 421, 421, 421, 421, 421, 421, 421, 421, 421, 269};
TestData testData(
gso,
false /*useSocketGSO*/,
in,
sizeof(in) / sizeof(in[0]),
expected,
sizeof(expected) / sizeof(expected[0]));
ASSERT_TRUE(testData.checkIn());
startServer();
auto pingClient = performPingPongTest(testData, folly::none);
ASSERT_TRUE(testData.checkOut());
}
// buffer sizes
constexpr auto kGSO1 = 100;
constexpr auto kGSO2 = 200;
constexpr auto kGSO = kGSO1 + kGSO2;
class GSOBuf {
public:
explicit GSOBuf(size_t size1, size_t size2 = 0) {
std::string str(size1, 'A');
ioBuf_ = folly::IOBuf::copyBuffer(str.data(), str.size());
if (size2) {
str = std::string(size2, 'B');
auto tmp = folly::IOBuf::copyBuffer(str.data(), str.size());
ioBuf_->prependChain(std::move(tmp));
}
}
const std::unique_ptr<IOBuf>& get() const {
return ioBuf_;
}
private:
std::unique_ptr<IOBuf> ioBuf_;
};
class GSOSendTest {
public:
explicit GSOSendTest(
folly::AsyncUDPSocket& socket,
const folly::SocketAddress& address,
int gso,
size_t size1,
size_t size2 = 0) {
GSOBuf buf(size1, size2);
ret_ = socket.writeGSO(address, buf.get(), gso);
}
ssize_t get() const {
return ret_;
}
private:
ssize_t ret_;
};
TEST(AsyncSocketGSOTest, send) {
EventBase evb;
folly::AsyncUDPSocket client(&evb);
client.bind(folly::SocketAddress("127.0.0.1", 0));
if (client.getGSO() < 0) {
LOG(INFO) << "GSO not supported";
// GSO not supported
return;
}
folly::AsyncUDPSocket server(&evb);
server.bind(folly::SocketAddress("127.0.0.1", 0));
// send less than GSO in a single IOBuf
{
GSOSendTest test(client, server.address(), kGSO, kGSO - 1);
CHECK_LT(test.get(), 0);
}
// send less than GSO in multiple IOBufs
{
GSOSendTest test(client, server.address(), kGSO, kGSO1 - 1, kGSO2);
CHECK_LT(test.get(), 0);
}
// send GSO in a single IOBuf
{
GSOSendTest test(client, server.address(), kGSO, kGSO);
CHECK_LT(test.get(), 0);
}
// send GSO in multiple IOBuf
{
GSOSendTest test(client, server.address(), kGSO, kGSO1, kGSO2);
CHECK_LT(test.get(), 0);
}
// send more than GSO in a single IOBuf
{
GSOSendTest test(client, server.address(), kGSO, kGSO + 1);
CHECK_EQ(test.get(), kGSO + 1);
}
// send more than GSO in a multiple IOBufs
{
GSOSendTest test(client, server.address(), kGSO, kGSO1 + 1, kGSO2 + 1);
CHECK_EQ(test.get(), kGSO + 2);
}
}
......@@ -48,6 +48,22 @@
#define MSG_ZEROCOPY 0x4000000
#endif
#ifndef SOL_UDP
#define SOL_UDP 17
#endif
#ifndef ETH_MAX_MTU
#define ETH_MAX_MTU 0xFFFFU
#endif
#ifndef UDP_SEGMENT
#define UDP_SEGMENT 103
#endif
#ifndef UDP_MAX_SEGMENTS
#define UDP_MAX_SEGMENTS (1 << 6UL)
#endif
#else
#include <folly/portability/IOVec.h>
#include <folly/portability/SysTypes.h>
......@@ -62,6 +78,8 @@ using sa_family_t = ADDRESS_FAMILY;
#define SO_EE_ORIGIN_ZEROCOPY 0
#define SO_ZEROCOPY 0
#define MSG_ZEROCOPY 0x0
#define SOL_UDP 0x0
#define UDP_SEGMENT 0x0
// We don't actually support either of these flags
// currently.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment