Commit 1902d0ff authored by Dan Melnic's avatar Dan Melnic Committed by Facebook GitHub Bot

Add support for UDP RX timestamps

Summary: Add support for UDP RX timestamps

Reviewed By: mjoras

Differential Revision: D24146914

fbshipit-source-id: 381839fc5402f13e428364c47c5752473acda6af
parent 343a70ae
...@@ -737,8 +737,21 @@ void AsyncUDPSocket::handleRead() noexcept { ...@@ -737,8 +737,21 @@ void AsyncUDPSocket::handleRead() noexcept {
ReadCallback::OnDataAvailableParams params; ReadCallback::OnDataAvailableParams params;
#ifdef FOLLY_HAVE_MSG_ERRQUEUE #ifdef FOLLY_HAVE_MSG_ERRQUEUE
if (gro_.has_value() && (gro_.value() > 0)) { bool use_gro = gro_.has_value() && (gro_.value() > 0);
char control[CMSG_SPACE(sizeof(uint16_t))] = {}; bool use_ts = ts_.has_value() && (ts_.value() > 0);
if (use_gro || use_ts) {
char control
[CMSG_SPACE(sizeof(uint16_t)) +
CMSG_SPACE(sizeof(ReadCallback::OnDataAvailableParams::Timestamp))];
::memset(
control,
0,
(use_gro ? CMSG_SPACE(sizeof(uint16_t)) : 0) +
(use_ts ? CMSG_SPACE(sizeof(
ReadCallback::OnDataAvailableParams::Timestamp))
: 0));
struct msghdr msg = {}; struct msghdr msg = {};
struct iovec iov = {}; struct iovec iov = {};
struct cmsghdr* cmsg; struct cmsghdr* cmsg;
...@@ -762,10 +775,23 @@ void AsyncUDPSocket::handleRead() noexcept { ...@@ -762,10 +775,23 @@ void AsyncUDPSocket::handleRead() noexcept {
addrLen = msg.msg_namelen; addrLen = msg.msg_namelen;
for (cmsg = CMSG_FIRSTHDR(&msg); cmsg != nullptr; for (cmsg = CMSG_FIRSTHDR(&msg); cmsg != nullptr;
cmsg = CMSG_NXTHDR(&msg, cmsg)) { cmsg = CMSG_NXTHDR(&msg, cmsg)) {
if (cmsg->cmsg_level == SOL_UDP && cmsg->cmsg_type == UDP_GRO) { if (cmsg->cmsg_level == SOL_UDP) {
grosizeptr = (uint16_t*)CMSG_DATA(cmsg); if (cmsg->cmsg_type == UDP_GRO) {
params.gro_ = *grosizeptr; grosizeptr = (uint16_t*)CMSG_DATA(cmsg);
break; params.gro_ = *grosizeptr;
}
} else {
if (cmsg->cmsg_level == SOL_SOCKET) {
if (cmsg->cmsg_type == SO_TIMESTAMPING ||
cmsg->cmsg_type == SO_TIMESTAMPNS) {
ReadCallback::OnDataAvailableParams::Timestamp ts;
memcpy(
&ts,
reinterpret_cast<struct timespec*>(CMSG_DATA(cmsg)),
sizeof(ts));
params.ts_ = ts;
}
}
} }
} }
} }
...@@ -867,6 +893,40 @@ bool AsyncUDPSocket::setGRO(bool bVal) { ...@@ -867,6 +893,40 @@ bool AsyncUDPSocket::setGRO(bool bVal) {
#endif #endif
} }
// packet timestamping
int AsyncUDPSocket::getTimestamping() {
// check if we can return the cached value
if (FOLLY_UNLIKELY(!ts_.has_value())) {
#ifdef FOLLY_HAVE_MSG_ERRQUEUE
int ts = -1;
socklen_t optlen = sizeof(ts);
if (!netops::getsockopt(fd_, SOL_SOCKET, SO_TIMESTAMPING, &ts, &optlen)) {
ts_ = ts;
} else {
ts_ = -1;
}
#else
ts_ = -1;
#endif
}
return ts_.value();
}
bool AsyncUDPSocket::setTimestamping(int val) {
#ifdef FOLLY_HAVE_MSG_ERRQUEUE
int ret =
netops::setsockopt(fd_, SOL_SOCKET, SO_TIMESTAMPING, &val, sizeof(val));
ts_ = ret ? -1 : val;
return !ret;
#else
(void)val;
return false;
#endif
}
int AsyncUDPSocket::getGRO() { int AsyncUDPSocket::getGRO() {
// check if we can return the cached value // check if we can return the cached value
if (FOLLY_UNLIKELY(!gro_.has_value())) { if (FOLLY_UNLIKELY(!gro_.has_value())) {
......
...@@ -42,6 +42,18 @@ class AsyncUDPSocket : public EventHandler { ...@@ -42,6 +42,18 @@ class AsyncUDPSocket : public EventHandler {
public: public:
struct OnDataAvailableParams { struct OnDataAvailableParams {
int gro_ = -1; int gro_ = -1;
#ifdef FOLLY_HAVE_MSG_ERRQUEUE
// RX timestamp if available
using Timestamp = std::array<struct timespec, 3>;
folly::Optional<Timestamp> ts_;
static std::chrono::nanoseconds to(const struct timespec& ts) {
auto duration = std::chrono::seconds(ts.tv_sec) +
std::chrono::nanoseconds(ts.tv_nsec);
return std::chrono::duration_cast<std::chrono::nanoseconds>(duration);
}
#endif
}; };
/** /**
* Invoked when the socket becomes readable and we want buffer * Invoked when the socket becomes readable and we want buffer
...@@ -383,6 +395,10 @@ class AsyncUDPSocket : public EventHandler { ...@@ -383,6 +395,10 @@ class AsyncUDPSocket : public EventHandler {
bool setGRO(bool bVal); bool setGRO(bool bVal);
// packet timestamping
int getTimestamping();
bool setTimestamping(int val);
// disable/enable RX zero checksum check for UDP over IPv6 // disable/enable RX zero checksum check for UDP over IPv6
bool setRxZeroChksum6(bool bVal); bool setRxZeroChksum6(bool bVal);
...@@ -482,6 +498,9 @@ class AsyncUDPSocket : public EventHandler { ...@@ -482,6 +498,9 @@ class AsyncUDPSocket : public EventHandler {
// See https://lwn.net/Articles/770978/ for more details // See https://lwn.net/Articles/770978/ for more details
folly::Optional<int> gro_; folly::Optional<int> gro_;
// packet timestamping
folly::Optional<int> ts_;
ErrMessageCallback* errMessageCallback_{nullptr}; ErrMessageCallback* errMessageCallback_{nullptr};
}; };
......
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <fcntl.h>
#include <linux/net_tstamp.h>
#include <linux/ptp_clock.h>
#include <linux/sockios.h>
#include <net/if.h>
#include <sys/ioctl.h>
#include <folly/String.h>
#include <folly/init/Init.h>
#include <folly/io/async/AsyncTimeout.h>
#include <folly/io/async/AsyncUDPServerSocket.h>
#include <folly/io/async/EventBase.h>
#ifndef CLOCK_INVALID
#define CLOCK_INVALID -1
#endif
#define CLOCKFD 3
#define FD_TO_CLOCKID(fd) ((clockid_t)((((unsigned int)~fd) << 3) | CLOCKFD))
#define CLOCKID_TO_FD(clk) ((unsigned int)~((clk) >> 3))
DEFINE_bool(ts_client, false, "client mode");
DEFINE_bool(ts_server, false, "client mode");
DEFINE_string(ts_client_addr, "::1", "addr to bind on");
DEFINE_string(ts_server_addr, "::1", "addr to bind on/send packets to");
DEFINE_int32(ts_port, 7879, "port");
DEFINE_string(ts_eth, "eth0", "ethernet interface name");
DEFINE_int32(ts_num, 100, "number of iterations");
DEFINE_int32(ts_gso, 0, "GSO");
std::ostream& operator<<(std::ostream& os, const struct timespec& ts) {
return os << "(" << ts.tv_sec << "," << ts.tv_nsec << ")";
}
namespace {
class PTPTimeSource {
public:
PTPTimeSource(const char* name = "/dev/ptp0") {
int fd = ::open(name, O_RDWR);
if (fd >= 0) {
clkid_ = FD_TO_CLOCKID(fd);
} else {
clkid_ = CLOCK_INVALID;
}
}
~PTPTimeSource() {
if (clkid_ != CLOCK_INVALID) {
::close(CLOCKID_TO_FD(clkid_));
}
}
static PTPTimeSource& getInstance() {
static PTPTimeSource sInstance;
return sInstance;
}
static struct timespec now() {
return getInstance().nowInternal();
}
private:
struct timespec nowInternal() {
struct timespec ts;
if (clkid_ == CLOCK_INVALID || clock_gettime(clkid_, &ts)) {
ts.tv_sec = 0;
ts.tv_nsec = 0;
}
return ts;
}
clockid_t clkid_;
};
using OnDataAvailableParams =
folly::AsyncUDPSocket::ReadCallback::OnDataAvailableParams;
class UDPAcceptor : public folly::AsyncUDPServerSocket::Callback {
public:
explicit UDPAcceptor(int n) : n_(n) {}
void onListenStarted() noexcept override {}
void onListenStopped() noexcept override {}
void onDataAvailable(
std::shared_ptr<folly::AsyncUDPSocket> socket,
const folly::SocketAddress& client,
std::unique_ptr<folly::IOBuf> data,
bool truncated,
OnDataAvailableParams params) noexcept override {
lastClient_ = client;
lastMsg_ = data->clone()->moveToFbString().toStdString();
auto len = data->computeChainDataLength();
OnDataAvailableParams::Timestamp ts;
if (params.ts_.has_value()) {
ts = params.ts_.value();
} else {
::memset(&ts, 0, sizeof(ts));
}
auto now = PTPTimeSource::getInstance().now();
LOG(INFO) << "Worker " << n_ << " read " << len << " bytes "
<< "(trun:" << truncated << ") from " << client.describe()
<< " - " << lastMsg_ << " gro = " << params.gro_
<< " ts = " << params.ts_.has_value() << " " << ts[0] << ts[1]
<< ts[2] << " now:" << now;
sendPong(socket);
}
void sendPong(std::shared_ptr<folly::AsyncUDPSocket> socket) noexcept {
try {
auto writeSocket = socket;
writeSocket->write(lastClient_, folly::IOBuf::copyBuffer(lastMsg_));
} catch (const std::exception& ex) {
LOG(ERROR) << "Failed to send PONG " << ex.what();
}
}
private:
const int n_{-1};
folly::SocketAddress lastClient_;
std::string lastMsg_;
};
class UDPServer {
public:
UDPServer(folly::EventBase* evb, folly::SocketAddress addr, int n)
: evb_(evb), addr_(addr), evbs_(n) {}
void start() {
CHECK(evb_->isInEventBaseThread());
socket_ = std::make_unique<folly::AsyncUDPServerSocket>(evb_, 1500);
try {
socket_->bind(addr_);
int val = SOF_TIMESTAMPING_RX_HARDWARE | SOF_TIMESTAMPING_RAW_HARDWARE |
SOF_TIMESTAMPING_SYS_HARDWARE | SOF_TIMESTAMPING_SOFTWARE;
bool ret = socket_->getSocket()->setTimestamping(val);
LOG(INFO) << "setTimestamping() returned " << ret;
ret = socket_->getSocket()->setGRO(true);
LOG(INFO) << "setGRO() returned " << ret;
LOG(INFO) << "Server listening on " << socket_->address().describe();
} catch (const std::exception& ex) {
LOG(FATAL) << ex.what();
}
acceptors_.reserve(evbs_.size());
threads_.reserve(evbs_.size());
// Add numWorkers thread
int i = 0;
for (auto& evb : evbs_) {
acceptors_.emplace_back(i);
std::thread t([&]() { evb.loopForever(); });
evb.waitUntilRunning();
socket_->addListener(&evb, &acceptors_[i]);
threads_.emplace_back(std::move(t));
++i;
}
socket_->listen();
}
private:
folly::EventBase* const evb_{nullptr};
const folly::SocketAddress addr_;
std::unique_ptr<folly::AsyncUDPServerSocket> socket_;
std::vector<std::thread> threads_;
std::vector<folly::EventBase> evbs_;
std::vector<UDPAcceptor> acceptors_;
};
class UDPClient : private folly::AsyncUDPSocket::ReadCallback,
private folly::AsyncTimeout {
public:
using folly::AsyncUDPSocket::ReadCallback::OnDataAvailableParams;
~UDPClient() override = default;
explicit UDPClient(folly::EventBase* evb)
: folly::AsyncTimeout(evb), evb_(evb) {}
void start(
const folly::SocketAddress& client,
const folly::SocketAddress& server,
int n) {
CHECK(evb_->isInEventBaseThread());
server_ = server;
socket_ = std::make_unique<folly::AsyncUDPSocket>(evb_);
try {
socket_->bind(client);
LOG(INFO) << "Client bound to " << socket_->address().describe();
socket_->setGSO(FLAGS_ts_gso);
auto ret = socket_->getGSO();
LOG(INFO) << "GSO = " << ret;
} catch (const std::exception& ex) {
LOG(FATAL) << ex.what();
}
socket_->resumeRead(this);
n_ = n;
sendPing();
}
void shutdown() {
CHECK(evb_->isInEventBaseThread());
socket_->pauseRead();
socket_->close();
socket_.reset();
evb_->terminateLoopSoon();
}
void sendPing() {
if (n_ == 0) {
shutdown();
return;
}
--n_;
scheduleTimeout(1000);
writePing(folly::IOBuf::copyBuffer(folly::to<std::string>("PING ", n_)));
}
virtual void writePing(std::unique_ptr<folly::IOBuf> buf) {
socket_->write(server_, std::move(buf));
}
void getReadBuffer(void** buf, size_t* len) noexcept override {
*buf = buf_;
*len = 1024;
}
void onDataAvailable(
const folly::SocketAddress& client,
size_t len,
bool truncated,
OnDataAvailableParams) noexcept override {
VLOG(4) << "Read " << len << " bytes (trun:" << truncated << ") from "
<< client.describe() << " - " << std::string(buf_, len);
VLOG(4) << n_ << " left";
++pongRecvd_;
scheduleTimeout(1000);
}
void onReadError(const folly::AsyncSocketException& ex) noexcept override {
LOG(ERROR) << ex.what();
// Start listening for next PONG
socket_->resumeRead(this);
}
void onReadClosed() noexcept override {
CHECK(false) << "We unregister reads before closing";
}
void timeoutExpired() noexcept override {
LOG(INFO) << "Timeout expired";
sendPing();
}
protected:
folly::EventBase* const evb_{nullptr};
folly::SocketAddress server_;
std::unique_ptr<folly::AsyncUDPSocket> socket_;
private:
int pongRecvd_{0};
int n_{0};
char buf_[1024];
};
} // namespace
int main(int argc, char* argv[]) {
folly::init(&argc, &argv, false);
if (!FLAGS_ts_eth.empty()) {
int fd = ::socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP);
struct ifreq ifr;
memset(&ifr, 0, sizeof(ifr));
strncpy(ifr.ifr_name, FLAGS_ts_eth.c_str(), sizeof(ifr.ifr_name) - 1);
struct hwtstamp_config hwc;
hwc.flags = 0;
hwc.tx_type = HWTSTAMP_TX_OFF;
hwc.rx_filter = HWTSTAMP_FILTER_ALL;
ifr.ifr_data = (char*)&hwc;
int ret = ::ioctl(fd, SIOCSHWTSTAMP, &ifr);
if (ret) {
auto copy = errno;
LOG(ERROR) << "ioctl(SIOCSHWTSTAMP) failed with " << copy << ":"
<< folly::errnoStr(copy) << std::endl;
} else {
LOG(INFO) << "ioctl(SIOCSHWTSTAMP) success";
}
::close(fd);
}
std::unique_ptr<std::thread> serverThread;
std::unique_ptr<std::thread> clientThread;
std::unique_ptr<UDPServer> server;
std::unique_ptr<UDPClient> client;
std::unique_ptr<folly::EventBase> serverEvb;
std::unique_ptr<folly::EventBase> clientEvb;
folly::SocketAddress clientAddr(FLAGS_ts_client_addr, 0);
folly::SocketAddress serverAddr(FLAGS_ts_server_addr, FLAGS_ts_port);
if (FLAGS_ts_server) {
serverEvb = std::make_unique<folly::EventBase>();
server = std::make_unique<UDPServer>(serverEvb.get(), serverAddr, 4);
serverThread =
std::make_unique<std::thread>([&]() { serverEvb->loopForever(); });
serverEvb->waitUntilRunning();
serverEvb->runInEventBaseThreadAndWait([&]() { server->start(); });
}
if (FLAGS_ts_client) {
clientEvb = std::make_unique<folly::EventBase>();
client = std::make_unique<UDPClient>(clientEvb.get());
clientThread =
std::make_unique<std::thread>([&]() { clientEvb->loopForever(); });
clientEvb->waitUntilRunning();
clientEvb->runInEventBaseThread(
[&]() { client->start(clientAddr, serverAddr, FLAGS_ts_num); });
clientThread->join();
}
if (serverThread) {
serverThread->join();
}
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment