Commit a3af6255 authored by Subodh Iyengar's avatar Subodh Iyengar Committed by Facebook Github Bot

Add notification read callbacks to AsyncUDPSocket

Summary:
Add a mode to read callback which only notifies but does not call getReadBuffer.

This also adds a readmsg api for notification callbacks to invoke

This is useful for callbacks that want to perform their own logic of reading from the socket, for example
a callback might want to perform multiple read calls per callback to save on CPU of epoll

Differential Revision: D18797964

fbshipit-source-id: 86bf1553078a20ea16f56f0c404887fca5050065
parent e0a9c732
......@@ -451,6 +451,10 @@ int AsyncUDPSocket::writeImpl(
return ret;
}
ssize_t AsyncUDPSocket::recvmsg(struct msghdr* msg, int flags) {
return netops::recvmsg(fd_, msg, flags);
}
void AsyncUDPSocket::resumeRead(ReadCallback* cob) {
CHECK(!readCallback_) << "Another read callback already installed";
CHECK_NE(NetworkSocket(), fd_)
......@@ -590,6 +594,9 @@ void AsyncUDPSocket::handleRead() noexcept {
// The socket may have been closed by the error callbacks.
return;
}
if (readCallback_->shouldOnlyNotify()) {
return readCallback_->onNotifyDataAvailable();
}
readCallback_->getReadBuffer(&buf, &len);
if (buf == nullptr || len == 0) {
......
......@@ -59,6 +59,24 @@ class AsyncUDPSocket : public EventHandler {
size_t len,
bool truncated) noexcept = 0;
/**
* Notifies when data is available. This is only invoked when
* shouldNotifyOnly() returns true.
*/
virtual void onNotifyDataAvailable() noexcept {}
/**
* Returns whether or not the read callback should only notify
* but not call getReadBuffer.
* If shouldNotifyOnly() returns true, AsyncUDPSocket will invoke
* onNotifyDataAvailable() instead of getReadBuffer().
* If shouldNotifyOnly() returns false, AsyncUDPSocket will invoke
* getReadBuffer() and onDataAvailable().
*/
virtual bool shouldOnlyNotify() {
return false;
}
/**
* Invoked when there is an error reading from the socket.
*
......@@ -177,6 +195,8 @@ class AsyncUDPSocket : public EventHandler {
const struct iovec* vec,
size_t veclen);
virtual ssize_t recvmsg(struct msghdr* msg, int flags);
/**
* Start reading datagrams
*/
......
......@@ -172,6 +172,8 @@ class UDPServer {
class UDPClient : private AsyncUDPSocket::ReadCallback, private AsyncTimeout {
public:
~UDPClient() override = default;
explicit UDPClient(EventBase* evb) : AsyncTimeout(evb), evb_(evb) {}
void start(const folly::SocketAddress& server, int n) {
......@@ -297,6 +299,57 @@ class UDPClient : private AsyncUDPSocket::ReadCallback, private AsyncTimeout {
char buf_[1024];
};
class UDPNotifyClient : public UDPClient {
public:
~UDPNotifyClient() override = default;
explicit UDPNotifyClient(EventBase* evb) : UDPClient(evb) {}
bool shouldOnlyNotify() override {
return true;
}
void onNotifyDataAvailable() noexcept override {
notifyInvoked = true;
struct msghdr msg;
memset(&msg, 0, sizeof(msg));
void* buf{};
size_t len{};
getReadBuffer(&buf, &len);
iovec vec;
vec.iov_base = buf;
vec.iov_len = len;
struct sockaddr_storage addrStorage;
socklen_t addrLen = sizeof(addrStorage);
memset(&addrStorage, 0, size_t(addrLen));
auto rawAddr = reinterpret_cast<sockaddr*>(&addrStorage);
rawAddr->sa_family = socket_->address().getFamily();
msg.msg_name = rawAddr;
msg.msg_namelen = addrLen;
msg.msg_iov = &vec;
msg.msg_iovlen = 1;
ssize_t ret = socket_->recvmsg(&msg, 0);
if (ret < 0) {
if (errno != EAGAIN || errno != EWOULDBLOCK) {
onReadError(folly::AsyncSocketException(
folly::AsyncSocketException::NETWORK_ERROR, "error"));
return;
}
}
SocketAddress addr;
addr.setFromSockaddr(rawAddr, addrLen);
onDataAvailable(addr, size_t(read), false);
}
bool notifyInvoked{false};
};
class AsyncSocketIntegrationTest : public Test {
public:
void SetUp() override {
......@@ -332,17 +385,20 @@ class AsyncSocketIntegrationTest : public Test {
folly::SocketAddress writeAddress,
folly::Optional<folly::SocketAddress> connectedAddress);
std::unique_ptr<UDPNotifyClient> performPingPongNotifyTest(
folly::SocketAddress writeAddress,
folly::Optional<folly::SocketAddress> connectedAddress);
folly::EventBase sevb;
folly::EventBase cevb;
std::unique_ptr<std::thread> serverThread;
std::unique_ptr<UDPServer> server;
std::unique_ptr<UDPClient> client;
};
std::unique_ptr<UDPClient> AsyncSocketIntegrationTest::performPingPongTest(
folly::SocketAddress writeAddress,
folly::Optional<folly::SocketAddress> connectedAddress) {
client = std::make_unique<UDPClient>(&cevb);
auto client = std::make_unique<UDPClient>(&cevb);
if (connectedAddress) {
client->setShouldConnect(*connectedAddress);
}
......@@ -357,7 +413,29 @@ std::unique_ptr<UDPClient> AsyncSocketIntegrationTest::performPingPongTest(
// Wait for client to finish
clientThread.join();
return std::move(client);
return client;
}
std::unique_ptr<UDPNotifyClient>
AsyncSocketIntegrationTest::performPingPongNotifyTest(
folly::SocketAddress writeAddress,
folly::Optional<folly::SocketAddress> connectedAddress) {
auto client = std::make_unique<UDPNotifyClient>(&cevb);
if (connectedAddress) {
client->setShouldConnect(*connectedAddress);
}
// Start event loop in a separate thread
auto clientThread = std::thread([this]() { cevb.loopForever(); });
// Wait for event loop to start
cevb.waitUntilRunning();
// Send ping
cevb.runInEventBaseThread([&]() { client->start(writeAddress, 100); });
// Wait for client to finish
clientThread.join();
return client;
}
TEST_F(AsyncSocketIntegrationTest, PingPong) {
......@@ -367,6 +445,14 @@ TEST_F(AsyncSocketIntegrationTest, PingPong) {
ASSERT_GT(pingClient->pongRecvd(), 0);
}
TEST_F(AsyncSocketIntegrationTest, PingPongNotify) {
startServer();
auto pingClient = performPingPongNotifyTest(server->address(), folly::none);
// This should succeed.
ASSERT_GT(pingClient->pongRecvd(), 0);
ASSERT_TRUE(pingClient->notifyInvoked);
}
TEST_F(AsyncSocketIntegrationTest, ConnectedPingPong) {
server->setChangePortForWrites(false);
startServer();
......@@ -446,6 +532,12 @@ class MockUDPReadCallback : public AsyncUDPSocket::ReadCallback {
getReadBuffer_(buf, len);
}
MOCK_METHOD0(shouldOnlyNotify, bool());
MOCK_METHOD0(onNotifyDataAvailable_, void());
void onNotifyDataAvailable() noexcept override {
onNotifyDataAvailable_();
}
MOCK_METHOD3(
onDataAvailable_,
void(const folly::SocketAddress&, size_t, bool));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment