Commit ac9a3c70 authored by Dan Melnic, committed by Facebook GitHub Bot

Add UDP GRO support

Summary: Add UDP GRO support

Reviewed By: mjoras

Differential Revision: D20347326

fbshipit-source-id: be3c31f070c4f2c1ef84f9e2df60f49a5fcabd93
parent c80c8e6c
......@@ -71,7 +71,8 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback,
std::shared_ptr<AsyncUDPSocket> socket,
const folly::SocketAddress& addr,
std::unique_ptr<folly::IOBuf> buf,
bool truncated) noexcept = 0;
bool truncated,
AsyncUDPSocket::ReadCallback::OnDataAvailableParams) noexcept = 0;
virtual ~Callback() = default;
};
......@@ -214,7 +215,9 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback,
void onDataAvailable(
const folly::SocketAddress& clientAddress,
size_t len,
bool truncated) noexcept override {
bool truncated,
folly::AsyncUDPSocket::ReadCallback::OnDataAvailableParams
params) noexcept override {
buf_.postallocate(len);
auto data = buf_.split(len);
......@@ -254,8 +257,10 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback,
client = clientAddress,
callback,
data = std::move(data),
truncated]() mutable {
callback->onDataAvailable(socket, client, std::move(data), truncated);
truncated,
params]() mutable {
callback->onDataAvailable(
socket, client, std::move(data), truncated, params);
};
listeners_[listenerId].first->runInEventBaseThread(std::move(f));
......
......@@ -682,8 +682,48 @@ void AsyncUDPSocket::handleRead() noexcept {
auto rawAddr = reinterpret_cast<sockaddr*>(&addrStorage);
rawAddr->sa_family = localAddress_.getFamily();
ssize_t bytesRead =
netops::recvfrom(fd_, buf, len, MSG_TRUNC, rawAddr, &addrLen);
ssize_t bytesRead;
ReadCallback::OnDataAvailableParams params;
#ifdef FOLLY_HAVE_MSG_ERRQUEUE
if (gro_.has_value() && (gro_ != 0)) {
char control[CMSG_SPACE(sizeof(uint16_t))] = {};
struct msghdr msg = {};
struct iovec iov = {};
struct cmsghdr* cmsg;
uint16_t* grosizeptr;
iov.iov_base = buf;
iov.iov_len = len;
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_name = rawAddr;
msg.msg_namelen = addrLen;
msg.msg_control = control;
msg.msg_controllen = sizeof(control);
bytesRead = netops::recvmsg(fd_, &msg, MSG_TRUNC);
if (bytesRead >= 0) {
addrLen = msg.msg_namelen;
for (cmsg = CMSG_FIRSTHDR(&msg); cmsg != nullptr;
cmsg = CMSG_NXTHDR(&msg, cmsg)) {
if (cmsg->cmsg_level == SOL_UDP && cmsg->cmsg_type == UDP_GRO) {
grosizeptr = (uint16_t*)CMSG_DATA(cmsg);
params.gro_ = *grosizeptr;
break;
}
}
}
} else {
bytesRead = netops::recvfrom(fd_, buf, len, MSG_TRUNC, rawAddr, &addrLen);
}
#else
bytesRead = netops::recvfrom(fd_, buf, len, MSG_TRUNC, rawAddr, &addrLen);
#endif
if (bytesRead >= 0) {
clientAddress_.setFromSockaddr(rawAddr, addrLen);
......@@ -695,7 +735,7 @@ void AsyncUDPSocket::handleRead() noexcept {
}
readCallback_->onDataAvailable(
clientAddress_, size_t(bytesRead), truncated);
clientAddress_, size_t(bytesRead), truncated, params);
}
} else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
......@@ -759,6 +799,39 @@ int AsyncUDPSocket::getGSO() {
return gso_.value();
}
/**
 * Turns UDP generic receive offload on (bVal == true) or off for fd_.
 *
 * The outcome is cached in gro_: the value that was applied on success,
 * -1 when the setsockopt call fails. Returns true only when the option was
 * applied; builds without FOLLY_HAVE_MSG_ERRQUEUE always return false and
 * leave gro_ untouched.
 */
bool AsyncUDPSocket::setGRO(bool bVal) {
#ifdef FOLLY_HAVE_MSG_ERRQUEUE
  int enable = bVal ? 1 : 0;
  if (netops::setsockopt(fd_, SOL_UDP, UDP_GRO, &enable, sizeof(enable)) !=
      0) {
    // cache the failure as "not available"
    gro_ = -1;
    return false;
  }
  gro_ = enable;
  return true;
#else
  (void)bVal;
  return false;
#endif
}
/**
 * Returns the UDP generic receive offload setting of fd_.
 *
 * The value is looked up at most once and then served from gro_. A negative
 * result means GRO is not available: either the getsockopt call failed or
 * the build lacks FOLLY_HAVE_MSG_ERRQUEUE.
 */
int AsyncUDPSocket::getGRO() {
  if (FOLLY_UNLIKELY(!gro_.has_value())) {
    // nothing cached yet; -1 is kept unless the kernel gives us a value
    int resolved = -1;
#ifdef FOLLY_HAVE_MSG_ERRQUEUE
    int optVal = -1;
    socklen_t optSize = sizeof(optVal);
    if (netops::getsockopt(fd_, SOL_UDP, UDP_GRO, &optVal, &optSize) == 0) {
      resolved = optVal;
    }
#endif
    gro_ = resolved;
  }
  return gro_.value();
}
void AsyncUDPSocket::setTrafficClass(int tclass) {
if (netops::setsockopt(
fd_, IPPROTO_IPV6, IPV6_TCLASS, &tclass, sizeof(int)) != 0) {
......
......@@ -40,6 +40,9 @@ class AsyncUDPSocket : public EventHandler {
class ReadCallback {
public:
/**
 * Additional per-read information passed to onDataAvailable().
 */
struct OnDataAvailableParams {
  // GRO segment size for the data that was read; remains -1 when no GRO
  // segment size was reported for the read
  int gro_{-1};
};
/**
* Invoked when the socket becomes readable and we want buffer
* to write to.
......@@ -51,14 +54,16 @@ class AsyncUDPSocket : public EventHandler {
virtual void getReadBuffer(void** buf, size_t* len) noexcept = 0;
/**
* Invoked when a new datagraom is available on the socket. `len`
* Invoked when a new datagram is available on the socket. `len`
* is the number of bytes read and `truncated` is true if we had
* to drop few bytes because of running out of buffer space.
* OnDataAvailableParams::gro_ is the GRO segment size (-1 if none reported)
*/
virtual void onDataAvailable(
const folly::SocketAddress& client,
size_t len,
bool truncated) noexcept = 0;
bool truncated,
OnDataAvailableParams params) noexcept = 0;
/**
* Notifies when data is available. This is only invoked when
......@@ -347,6 +352,12 @@ class AsyncUDPSocket : public EventHandler {
bool setGSO(int val);
// generic receive offload get/set
// negative return value means GRO is not available
int getGRO();
bool setGRO(bool bVal);
void setTrafficClass(int tclass);
void applyOptions(
......@@ -426,6 +437,10 @@ class AsyncUDPSocket : public EventHandler {
// See https://lwn.net/Articles/188489/ for more details
folly::Optional<int> gso_;
// generic receive offload value, if available
// See https://lwn.net/Articles/770978/ for more details
folly::Optional<int> gro_;
ErrMessageCallback* errMessageCallback_{nullptr};
};
......
......@@ -164,9 +164,23 @@ class UDPAcceptor : public AsyncUDPServerSocket::Callback {
std::shared_ptr<folly::AsyncUDPSocket> socket,
const folly::SocketAddress& client,
std::unique_ptr<folly::IOBuf> data,
bool /*unused*/) noexcept override {
// send pong
socket->write(client, data->clone());
bool /*unused*/,
folly::AsyncUDPSocket::ReadCallback::OnDataAvailableParams
params) noexcept override {
// send pong(s)
if (params.gro_ == -1) {
socket->write(client, data->clone());
} else {
int total = data->length();
size_t offset = 0;
while (total > 0) {
auto size = (total > params.gro_) ? params.gro_ : total;
auto sendData = IOBuf::copyBuffer(data->data() + offset, size);
offset += size;
total -= size;
socket->write(client, sendData);
}
}
}
private:
......@@ -181,7 +195,7 @@ class UDPServer {
void start() {
CHECK(evb_->isInEventBaseThread());
socket_ = std::make_unique<AsyncUDPServerSocket>(evb_, 1500);
socket_ = std::make_unique<AsyncUDPServerSocket>(evb_, 64 * 1024);
try {
socket_->bind(addr_);
......@@ -190,6 +204,9 @@ class UDPServer {
LOG(FATAL) << ex.what();
}
auto s = socket_->getSocket();
s->setGRO(true);
acceptors_.reserve(evbs_.size());
threads_.reserve(evbs_.size());
......@@ -331,7 +348,11 @@ class UDPClient : private AsyncUDPSocket::ReadCallback, private AsyncTimeout {
void onDataAvailable(
const folly::SocketAddress& /*unused*/,
size_t len,
bool /*unused*/) noexcept override {
bool /*unused*/,
folly::AsyncUDPSocket::ReadCallback::OnDataAvailableParams
params) noexcept override {
// no GRO on the client side
CHECK_EQ(params.gro_, -1);
VLOG(0) << "Got " << len << " bytes";
if (testData_.appendOut(len)) {
shutdown();
......@@ -596,30 +617,6 @@ TEST(AsyncSocketGSOTest, send) {
folly::AsyncUDPSocket server(&evb);
server.bind(folly::SocketAddress("127.0.0.1", 0));
// send less than GSO in a single IOBuf
{
GSOSendTest test(client, server.address(), kGSO, kGSO - 1);
CHECK_LT(test.get(), 0);
}
// send less than GSO in multiple IOBufs
{
GSOSendTest test(client, server.address(), kGSO, kGSO1 - 1, kGSO2);
CHECK_LT(test.get(), 0);
}
// send GSO in a single IOBuf
{
GSOSendTest test(client, server.address(), kGSO, kGSO);
CHECK_LT(test.get(), 0);
}
// send GSO in multiple IOBuf
{
GSOSendTest test(client, server.address(), kGSO, kGSO1, kGSO2);
CHECK_LT(test.get(), 0);
}
// send more than GSO in a single IOBuf
{
GSOSendTest test(client, server.address(), kGSO, kGSO + 1);
......
......@@ -102,7 +102,9 @@ class UDPAcceptor : public AsyncUDPServerSocket::Callback {
std::shared_ptr<folly::AsyncUDPSocket> socket,
const folly::SocketAddress& client,
std::unique_ptr<folly::IOBuf> data,
bool /*unused*/) noexcept override {
bool /*unused*/,
folly::AsyncUDPSocket::ReadCallback::
OnDataAvailableParams /*unused*/) noexcept override {
// send pong
socket->write(client, data->clone());
}
......@@ -245,7 +247,9 @@ class UDPClient : private AsyncUDPSocket::ReadCallback, private AsyncTimeout {
void onDataAvailable(
const folly::SocketAddress& /*unused*/,
size_t len,
bool /*unused*/) noexcept override {
bool /*unused*/,
folly::AsyncUDPSocket::ReadCallback::
OnDataAvailableParams /*unused*/) noexcept override {
VLOG(0) << "Got " << len << " bytes";
if (testData_.appendOut(buf_, len)) {
shutdown();
......
......@@ -50,7 +50,9 @@ class UDPAcceptor : public AsyncUDPServerSocket::Callback {
std::shared_ptr<folly::AsyncUDPSocket> socket,
const folly::SocketAddress& client,
std::unique_ptr<folly::IOBuf> data,
bool truncated) noexcept override {
bool truncated,
folly::AsyncUDPSocket::ReadCallback::
OnDataAvailableParams) noexcept override {
lastClient_ = client;
lastMsg_ = data->clone()->moveToFbString().toStdString();
......@@ -254,7 +256,9 @@ class UDPClient : private AsyncUDPSocket::ReadCallback, private AsyncTimeout {
void onDataAvailable(
const folly::SocketAddress& client,
size_t len,
bool truncated) noexcept override {
bool truncated,
folly::AsyncUDPSocket::ReadCallback::
OnDataAvailableParams) noexcept override {
VLOG(4) << "Read " << len << " bytes (trun:" << truncated << ") from "
<< client.describe() << " - " << std::string(buf_, len);
VLOG(4) << n_ << " left";
......@@ -362,7 +366,11 @@ class UDPNotifyClient : public UDPClient {
SocketAddress addr;
addr.setFromSockaddr(rawAddr, addrLen);
onDataAvailable(addr, size_t(read), false);
onDataAvailable(
addr,
size_t(read),
false,
folly::AsyncUDPSocket::ReadCallback::OnDataAvailableParams());
}
void onRecvMmsg(AsyncUDPSocket& sock) {
......@@ -659,14 +667,20 @@ class MockUDPReadCallback : public AsyncUDPSocket::ReadCallback {
onNotifyDataAvailable_(sock);
}
MOCK_METHOD3(
MOCK_METHOD4(
onDataAvailable_,
void(const folly::SocketAddress&, size_t, bool));
void(
const folly::SocketAddress&,
size_t,
bool,
folly::AsyncUDPSocket::ReadCallback::OnDataAvailableParams));
void onDataAvailable(
const folly::SocketAddress& client,
size_t len,
bool truncated) noexcept override {
onDataAvailable_(client, len, truncated);
bool truncated,
folly::AsyncUDPSocket::ReadCallback::OnDataAvailableParams
params) noexcept override {
onDataAvailable_(client, len, truncated, params);
}
MOCK_METHOD1(onReadError_, void(const folly::AsyncSocketException&));
......@@ -821,9 +835,14 @@ TEST_F(AsyncUDPSocketTest, TestDetachAttach) {
*buf = data.data();
*len = 1024;
}));
EXPECT_CALL(readCb, onDataAvailable_(_, _, _))
EXPECT_CALL(readCb, onDataAvailable_(_, _, _, _))
.WillRepeatedly(Invoke(
[&](const folly::SocketAddress&, size_t, bool) { packetsRecvd++; }));
[&](const folly::SocketAddress&,
size_t,
bool,
folly::AsyncUDPSocket::ReadCallback::OnDataAvailableParams) {
packetsRecvd++;
}));
socket_->resumeRead(&readCb);
writeSocket->write(socket_->address(), folly::IOBuf::copyBuffer("hello"));
while (packetsRecvd != 1) {
......
......@@ -69,6 +69,10 @@
#define UDP_SEGMENT 103
#endif
// Fallback value for the UDP_GRO option, used only when the system headers
// do not already define it.
#ifndef UDP_GRO
#define UDP_GRO 104
#endif
// Fallback for UDP_MAX_SEGMENTS (evaluates to 64), used only when the system
// headers do not already define it.
#ifndef UDP_MAX_SEGMENTS
#define UDP_MAX_SEGMENTS (1 << 6UL)
#endif
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment