Commit 25fe3ce3 authored by Dan Melnic's avatar Dan Melnic Committed by Facebook GitHub Bot

Avoid duplicate code for UDP CMSG processing

Summary:
Avoid duplicate code for UDP CMSG processing

(Note: this ignores all push blocking failures!)

Reviewed By: yfeldblum

Differential Revision: D24394694

fbshipit-source-id: 80664109154e78ec4d6ee8d33cf9a5269f60195e
parent 2d0d7cb6
...@@ -44,6 +44,36 @@ namespace fsp = folly::portability::sockets; ...@@ -44,6 +44,36 @@ namespace fsp = folly::portability::sockets;
namespace folly { namespace folly {
void AsyncUDPSocket::fromMsg(
FOLLY_MAYBE_UNUSED ReadCallback::OnDataAvailableParams& params,
FOLLY_MAYBE_UNUSED struct msghdr& msg) {
#ifdef FOLLY_HAVE_MSG_ERRQUEUE
struct cmsghdr* cmsg;
uint16_t* grosizeptr;
for (cmsg = CMSG_FIRSTHDR(&msg); cmsg != nullptr;
cmsg = CMSG_NXTHDR(&msg, cmsg)) {
if (cmsg->cmsg_level == SOL_UDP) {
if (cmsg->cmsg_type == UDP_GRO) {
grosizeptr = (uint16_t*)CMSG_DATA(cmsg);
params.gro = *grosizeptr;
}
} else {
if (cmsg->cmsg_level == SOL_SOCKET) {
if (cmsg->cmsg_type == SO_TIMESTAMPING ||
cmsg->cmsg_type == SO_TIMESTAMPNS) {
ReadCallback::OnDataAvailableParams::Timestamp ts;
memcpy(
&ts,
reinterpret_cast<struct timespec*>(CMSG_DATA(cmsg)),
sizeof(ts));
params.ts = ts;
}
}
}
}
#endif
}
AsyncUDPSocket::AsyncUDPSocket(EventBase* evb) AsyncUDPSocket::AsyncUDPSocket(EventBase* evb)
: EventHandler(CHECK_NOTNULL(evb)), : EventHandler(CHECK_NOTNULL(evb)),
readCallback_(nullptr), readCallback_(nullptr),
...@@ -740,21 +770,10 @@ void AsyncUDPSocket::handleRead() noexcept { ...@@ -740,21 +770,10 @@ void AsyncUDPSocket::handleRead() noexcept {
bool use_gro = gro_.has_value() && (gro_.value() > 0); bool use_gro = gro_.has_value() && (gro_.value() > 0);
bool use_ts = ts_.has_value() && (ts_.value() > 0); bool use_ts = ts_.has_value() && (ts_.value() > 0);
if (use_gro || use_ts) { if (use_gro || use_ts) {
char control char control[ReadCallback::OnDataAvailableParams::kCmsgSpace] = {};
[CMSG_SPACE(sizeof(uint16_t)) +
CMSG_SPACE(sizeof(ReadCallback::OnDataAvailableParams::Timestamp))];
size_t control_size = (use_gro ? CMSG_SPACE(sizeof(uint16_t)) : 0) +
(use_ts ? CMSG_SPACE(
sizeof(ReadCallback::OnDataAvailableParams::Timestamp))
: 0);
::memset(control, 0, control_size);
struct msghdr msg = {}; struct msghdr msg = {};
struct iovec iov = {}; struct iovec iov = {};
struct cmsghdr* cmsg;
uint16_t* grosizeptr;
iov.iov_base = buf; iov.iov_base = buf;
iov.iov_len = len; iov.iov_len = len;
...@@ -766,33 +785,13 @@ void AsyncUDPSocket::handleRead() noexcept { ...@@ -766,33 +785,13 @@ void AsyncUDPSocket::handleRead() noexcept {
msg.msg_namelen = addrLen; msg.msg_namelen = addrLen;
msg.msg_control = control; msg.msg_control = control;
msg.msg_controllen = control_size; msg.msg_controllen = sizeof(control);
bytesRead = netops::recvmsg(fd_, &msg, MSG_TRUNC); bytesRead = netops::recvmsg(fd_, &msg, MSG_TRUNC);
if (bytesRead >= 0) { if (bytesRead >= 0) {
addrLen = msg.msg_namelen; addrLen = msg.msg_namelen;
for (cmsg = CMSG_FIRSTHDR(&msg); cmsg != nullptr; fromMsg(params, msg);
cmsg = CMSG_NXTHDR(&msg, cmsg)) {
if (cmsg->cmsg_level == SOL_UDP) {
if (cmsg->cmsg_type == UDP_GRO) {
grosizeptr = (uint16_t*)CMSG_DATA(cmsg);
params.gro_ = *grosizeptr;
}
} else {
if (cmsg->cmsg_level == SOL_SOCKET) {
if (cmsg->cmsg_type == SO_TIMESTAMPING ||
cmsg->cmsg_type == SO_TIMESTAMPNS) {
ReadCallback::OnDataAvailableParams::Timestamp ts;
memcpy(
&ts,
reinterpret_cast<struct timespec*>(CMSG_DATA(cmsg)),
sizeof(ts));
params.ts_ = ts;
}
}
}
}
} }
} else { } else {
bytesRead = netops::recvfrom(fd_, buf, len, MSG_TRUNC, rawAddr, &addrLen); bytesRead = netops::recvfrom(fd_, buf, len, MSG_TRUNC, rawAddr, &addrLen);
......
...@@ -41,19 +41,17 @@ class AsyncUDPSocket : public EventHandler { ...@@ -41,19 +41,17 @@ class AsyncUDPSocket : public EventHandler {
class ReadCallback { class ReadCallback {
public: public:
struct OnDataAvailableParams { struct OnDataAvailableParams {
int gro_ = -1; int gro = -1;
// RX timestamp if available // RX timestamp if available
using Timestamp = std::array<struct timespec, 3>; using Timestamp = std::array<struct timespec, 3>;
folly::Optional<Timestamp> ts_; folly::Optional<Timestamp> ts;
static std::chrono::nanoseconds to(const struct timespec& ts) {
auto duration = std::chrono::seconds(ts.tv_sec) +
std::chrono::nanoseconds(ts.tv_nsec);
return std::chrono::duration_cast<std::chrono::nanoseconds>(duration); #ifdef FOLLY_HAVE_MSG_ERRQUEUE
} static constexpr size_t kCmsgSpace =
CMSG_SPACE(sizeof(uint16_t)) + CMSG_SPACE(sizeof(Timestamp));
#endif
}; };
/** /**
* Invoked when the socket becomes readable and we want buffer * Invoked when the socket becomes readable and we want buffer
* to write to. * to write to.
...@@ -132,6 +130,10 @@ class AsyncUDPSocket : public EventHandler { ...@@ -132,6 +130,10 @@ class AsyncUDPSocket : public EventHandler {
virtual void errMessageError(const AsyncSocketException& ex) noexcept = 0; virtual void errMessageError(const AsyncSocketException& ex) noexcept = 0;
}; };
static void fromMsg(
FOLLY_MAYBE_UNUSED ReadCallback::OnDataAvailableParams& params,
FOLLY_MAYBE_UNUSED struct msghdr& msg);
/** /**
* Create a new UDP socket that will run in the * Create a new UDP socket that will run in the
* given eventbase * given eventbase
......
...@@ -165,13 +165,13 @@ class UDPAcceptor : public AsyncUDPServerSocket::Callback { ...@@ -165,13 +165,13 @@ class UDPAcceptor : public AsyncUDPServerSocket::Callback {
bool /*unused*/, bool /*unused*/,
OnDataAvailableParams params) noexcept override { OnDataAvailableParams params) noexcept override {
// send pong(s) // send pong(s)
if (params.gro_ == -1) { if (params.gro == -1) {
socket->write(client, data->clone()); socket->write(client, data->clone());
} else { } else {
int total = data->length(); int total = data->length();
size_t offset = 0; size_t offset = 0;
while (total > 0) { while (total > 0) {
auto size = (total > params.gro_) ? params.gro_ : total; auto size = (total > params.gro) ? params.gro : total;
auto sendData = IOBuf::copyBuffer(data->data() + offset, size); auto sendData = IOBuf::copyBuffer(data->data() + offset, size);
offset += size; offset += size;
total -= size; total -= size;
...@@ -339,7 +339,7 @@ class UDPClient : private AsyncUDPSocket::ReadCallback, private AsyncTimeout { ...@@ -339,7 +339,7 @@ class UDPClient : private AsyncUDPSocket::ReadCallback, private AsyncTimeout {
bool /*unused*/, bool /*unused*/,
OnDataAvailableParams params) noexcept override { OnDataAvailableParams params) noexcept override {
// no GRO on the client side // no GRO on the client side
CHECK_EQ(params.gro_, -1); CHECK_EQ(params.gro, -1);
VLOG(0) << "Got " << len << " bytes"; VLOG(0) << "Got " << len << " bytes";
if (testData_.appendOut(len)) { if (testData_.appendOut(len)) {
shutdown(); shutdown();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment