Commit 7a2b83b4 authored by Efrat Lewis's avatar Efrat Lewis Committed by Facebook GitHub Bot

AsyncSocket facility to read ancillary data

Summary: Defined new callback - ReadAncillaryDataCallback, which enables getting ancillary data when receive a message using recvmsg API.

Reviewed By: cgrushko

Differential Revision: D24246707

fbshipit-source-id: 3363b5fafe8d370cf5560afe00476fc8ea723e7a
parent a2000975
...@@ -395,6 +395,7 @@ void AsyncSocket::init() { ...@@ -395,6 +395,7 @@ void AsyncSocket::init() {
maxReadsPerEvent_ = 16; maxReadsPerEvent_ = 16;
connectCallback_ = nullptr; connectCallback_ = nullptr;
errMessageCallback_ = nullptr; errMessageCallback_ = nullptr;
readAncillaryDataCallback_ = nullptr;
readCallback_ = nullptr; readCallback_ = nullptr;
writeReqHead_ = nullptr; writeReqHead_ = nullptr;
writeReqTail_ = nullptr; writeReqTail_ = nullptr;
...@@ -640,6 +641,7 @@ void AsyncSocket::connect( ...@@ -640,6 +641,7 @@ void AsyncSocket::connect(
// yet, so we don't have to register for any events at the moment. // yet, so we don't have to register for any events at the moment.
VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this; VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this;
assert(errMessageCallback_ == nullptr); assert(errMessageCallback_ == nullptr);
assert(readAncillaryDataCallback_ == nullptr);
assert(readCallback_ == nullptr); assert(readCallback_ == nullptr);
assert(writeReqHead_ == nullptr); assert(writeReqHead_ == nullptr);
if (state_ != StateEnum::FAST_OPEN) { if (state_ != StateEnum::FAST_OPEN) {
...@@ -808,6 +810,19 @@ AsyncSocket::ErrMessageCallback* AsyncSocket::getErrMessageCallback() const { ...@@ -808,6 +810,19 @@ AsyncSocket::ErrMessageCallback* AsyncSocket::getErrMessageCallback() const {
return errMessageCallback_; return errMessageCallback_;
} }
void AsyncSocket::setReadAncillaryDataCB(ReadAncillaryDataCallback* callback) {
VLOG(6) << "AsyncSocket::setReadAncillaryDataCB() this=" << this
<< ", fd=" << fd_ << ", callback=" << callback
<< ", state=" << state_;
readAncillaryDataCallback_ = callback;
}
AsyncSocket::ReadAncillaryDataCallback*
AsyncSocket::getReadAncillaryDataCallback() const {
return readAncillaryDataCallback_;
}
void AsyncSocket::setSendMsgParamCB(SendMsgParamsCallback* callback) { void AsyncSocket::setSendMsgParamCB(SendMsgParamsCallback* callback) {
sendMsgParamCallback_ = callback; sendMsgParamCallback_ = callback;
} }
...@@ -1951,7 +1966,40 @@ AsyncSocket::performRead(void** buf, size_t* buflen, size_t* /* offset */) { ...@@ -1951,7 +1966,40 @@ AsyncSocket::performRead(void** buf, size_t* buflen, size_t* /* offset */) {
return ReadResult(len); return ReadResult(len);
} }
ssize_t bytes = netops::recv(fd_, *buf, *buflen, MSG_DONTWAIT); ssize_t bytes = 0;
// No callback to read ancillary data was set
if (readAncillaryDataCallback_ == nullptr) {
bytes = netops::recv(fd_, *buf, *buflen, MSG_DONTWAIT);
} else {
struct msghdr msg;
struct iovec iov;
// Ancillary data buffer and length
msg.msg_control =
readAncillaryDataCallback_->getAncillaryDataCtrlBuffer().data();
msg.msg_controllen =
readAncillaryDataCallback_->getAncillaryDataCtrlBuffer().size();
// Dest address info
msg.msg_name = nullptr;
msg.msg_namelen = 0;
// Array of data buffers (scatter/gather)
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
// Data buffer pointer and length
iov.iov_base = *buf;
iov.iov_len = *buflen;
bytes = netops::recvmsg(fd_, &msg, 0);
if (bytes > 0) {
readAncillaryDataCallback_->ancillaryData(msg);
}
}
if (bytes < 0) { if (bytes < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) { if (errno == EAGAIN || errno == EWOULDBLOCK) {
// No more data to read right now. // No more data to read right now.
......
...@@ -169,6 +169,28 @@ class AsyncSocket : public AsyncTransport { ...@@ -169,6 +169,28 @@ class AsyncSocket : public AsyncTransport {
virtual void errMessageError(const AsyncSocketException& ex) noexcept = 0; virtual void errMessageError(const AsyncSocketException& ex) noexcept = 0;
}; };
class ReadAncillaryDataCallback {
public:
virtual ~ReadAncillaryDataCallback() = default;
/**
* ancillaryData() will be invoked when we read a buffer
* from the socket together with the ancillary data.
*
* @param msgh Reference to msghdr structure describing
* a message read together with the data buffer associated
* with the socket.
*/
virtual void ancillaryData(struct msghdr& msgh) noexcept = 0;
/**
* getAncillaryDataCtrlBuffer() will be invoked in order to fill the
* ancillary data buffer when it is received.
* getAncillaryDataCtrlBuffer will never return nullptr.
*/
virtual folly::MutableByteRange getAncillaryDataCtrlBuffer() = 0;
};
class SendMsgParamsCallback { class SendMsgParamsCallback {
public: public:
virtual ~SendMsgParamsCallback() = default; virtual ~SendMsgParamsCallback() = default;
...@@ -499,6 +521,23 @@ class AsyncSocket : public AsyncTransport { ...@@ -499,6 +521,23 @@ class AsyncSocket : public AsyncTransport {
*/ */
virtual ErrMessageCallback* getErrMessageCallback() const; virtual ErrMessageCallback* getErrMessageCallback() const;
/**
* Set a pointer to ReadAncillaryDataCallback implementation which will
* be invoked with the ancillary data when we read a buffer from the
* associated socket.
* ReadAncillaryDataCallback is implemented only for platforms with
* kernel timestamp support.
*
*/
virtual void setReadAncillaryDataCB(ReadAncillaryDataCallback* callback);
/**
* Get a pointer to ReadAncillaryDataCallback implementation currently
* registered with this socket.
*
*/
virtual ReadAncillaryDataCallback* getReadAncillaryDataCallback() const;
/** /**
* Set a pointer to SendMsgParamsCallback implementation which * Set a pointer to SendMsgParamsCallback implementation which
* will be used to form ::sendmsg() system call parameters * will be used to form ::sendmsg() system call parameters
...@@ -1364,6 +1403,8 @@ class AsyncSocket : public AsyncTransport { ...@@ -1364,6 +1403,8 @@ class AsyncSocket : public AsyncTransport {
ConnectCallback* connectCallback_; ///< ConnectCallback ConnectCallback* connectCallback_; ///< ConnectCallback
ErrMessageCallback* errMessageCallback_; ///< TimestampCallback ErrMessageCallback* errMessageCallback_; ///< TimestampCallback
ReadAncillaryDataCallback*
readAncillaryDataCallback_; ///< AncillaryDataCallback
SendMsgParamsCallback* ///< Callback for retrieving SendMsgParamsCallback* ///< Callback for retrieving
sendMsgParamCallback_; ///< ::sendmsg() parameters sendMsgParamCallback_; ///< ::sendmsg() parameters
ReadCallback* readCallback_; ///< ReadCallback ReadCallback* readCallback_; ///< ReadCallback
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include <fcntl.h> #include <fcntl.h>
#include <sys/types.h> #include <sys/types.h>
#include <time.h>
#include <iostream> #include <iostream>
#include <memory> #include <memory>
#include <thread> #include <thread>
...@@ -3036,6 +3037,9 @@ TEST(AsyncSocketTest, TestEvbDetachThenClose) { ...@@ -3036,6 +3037,9 @@ TEST(AsyncSocketTest, TestEvbDetachThenClose) {
/* copied from include/uapi/linux/net_tstamp.h */ /* copied from include/uapi/linux/net_tstamp.h */
/* SO_TIMESTAMPING gets an integer bit field comprised of these values */ /* SO_TIMESTAMPING gets an integer bit field comprised of these values */
enum SOF_TIMESTAMPING { enum SOF_TIMESTAMPING {
SOF_TIMESTAMPING_TX_SOFTWARE = (1 << 1),
SOF_TIMESTAMPING_RX_HARDWARE = (1 << 2),
SOF_TIMESTAMPING_RX_SOFTWARE = (1 << 3),
SOF_TIMESTAMPING_SOFTWARE = (1 << 4), SOF_TIMESTAMPING_SOFTWARE = (1 << 4),
SOF_TIMESTAMPING_OPT_ID = (1 << 7), SOF_TIMESTAMPING_OPT_ID = (1 << 7),
SOF_TIMESTAMPING_TX_SCHED = (1 << 8), SOF_TIMESTAMPING_TX_SCHED = (1 << 8),
...@@ -4254,3 +4258,105 @@ TEST(AsyncSocketTest, QueueTimeout) { ...@@ -4254,3 +4258,105 @@ TEST(AsyncSocketTest, QueueTimeout) {
// Since the second message is expired, it should NOT be dequeued // Since the second message is expired, it should NOT be dequeued
EXPECT_EQ(connectionEventCb.getConnectionDequeuedByAcceptCallback(), 1); EXPECT_EQ(connectionEventCb.getConnectionDequeuedByAcceptCallback(), 1);
} }
class TestRXTimestampsCallback
: public folly::AsyncSocket::ReadAncillaryDataCallback {
public:
TestRXTimestampsCallback() {}
void ancillaryData(struct msghdr& msgh) noexcept override {
struct cmsghdr* cmsg;
for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != nullptr;
cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
if (cmsg->cmsg_level != SOL_SOCKET ||
cmsg->cmsg_type != SO_TIMESTAMPING) {
continue;
}
callCount_++;
timespec* ts = (struct timespec*)CMSG_DATA(cmsg);
actualRxTimestampSec_ = ts[0].tv_sec;
}
}
folly::MutableByteRange getAncillaryDataCtrlBuffer() override {
return folly::MutableByteRange(ancillaryDataCtrlBuffer_);
}
uint32_t callCount_{0};
long actualRxTimestampSec_{0};
private:
std::array<uint8_t, 1024> ancillaryDataCtrlBuffer_;
};
/**
* Test read ancillary data callback
*/
TEST(AsyncSocketTest, readAncillaryData) {
TestServer server;
// connect()
EventBase evb;
std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
ConnCallback ccb;
socket->connect(&ccb, server.getAddress(), 1);
LOG(INFO) << "Client socket fd=" << socket->getNetworkSocket();
// Enable rx timestamp notifications
ASSERT_NE(socket->getNetworkSocket(), NetworkSocket());
int flags = SOF_TIMESTAMPING_SOFTWARE | SOF_TIMESTAMPING_RX_SOFTWARE |
SOF_TIMESTAMPING_RX_HARDWARE;
SocketOptionKey tstampingOpt = {SOL_SOCKET, SO_TIMESTAMPING};
EXPECT_EQ(tstampingOpt.apply(socket->getNetworkSocket(), flags), 0);
// Accept the connection.
std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
LOG(INFO) << "Server socket fd=" << acceptedSocket->getNetworkSocket();
// Wait for connection
evb.loop();
ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
TestRXTimestampsCallback rxcb;
// Set read callback
ReadCallback rcb(100);
socket->setReadCB(&rcb);
// Get the timestamp when the message was write
struct timespec currentTime;
clock_gettime(CLOCK_REALTIME, &currentTime);
long writeTimestampSec = currentTime.tv_sec;
// write bytes from server (acceptedSocket) to client (socket).
std::vector<uint8_t> wbuf(128, 'a');
acceptedSocket->write(wbuf.data(), wbuf.size());
// Wait for reading to complete.
evb.loopOnce();
ASSERT_NE(rcb.buffers.size(), 0);
// Verify that if the callback is not set, it will not be called
ASSERT_EQ(rxcb.callCount_, 0);
// Set up rx timestamp callbacks
socket->setReadAncillaryDataCB(&rxcb);
acceptedSocket->write(wbuf.data(), wbuf.size());
// Wait for reading to complete.
evb.loopOnce();
ASSERT_NE(rcb.buffers.size(), 0);
// Verify that after setting callback, the callback was called
ASSERT_NE(rxcb.callCount_, 0);
// Compare the received timestamp is within an expected range
clock_gettime(CLOCK_REALTIME, &currentTime);
ASSERT_TRUE(rxcb.actualRxTimestampSec_ <= currentTime.tv_sec);
ASSERT_TRUE(rxcb.actualRxTimestampSec_ >= writeTimestampSec);
// Close both sockets
acceptedSocket->close();
socket->close();
ASSERT_TRUE(socket->isClosedBySelf());
ASSERT_FALSE(socket->isClosedByPeer());
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment