Commit df63a8e5 authored by Jorge Lopez Silva's avatar Jorge Lopez Silva Committed by Facebook Github Bot

Add pause/resume support to AsyncUDPServerSocket

Summary: Extend AsyncUDPServerSocket to support dynamically pausing/resuming reading from the underlying socket. This functionality already exists for AsyncServerSocket but not for UDP server sockets.

Reviewed By: djwatson

Differential Revision: D8358256

fbshipit-source-id: 7d5cf14b656ef9280747515b23cd0ae2899ec38b
parent 9a34add5
......@@ -52,6 +52,18 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback
*/
virtual void onListenStopped() noexcept = 0;
/**
* Invoked when the server socket is paused. It is invoked in each
* acceptors/listeners event base thread.
*/
virtual void onListenPaused() noexcept {}
/**
* Invoked when the server socket is resumed. It is invoked in each
* acceptors/listeners event base thread.
*/
virtual void onListenResumed() noexcept {}
/**
* Invoked when a new packet is received
*/
......@@ -140,6 +152,32 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback
return evb_;
}
/**
* Pauses accepting datagrams on the underlying socket.
*/
void pauseAccepting() {
socket_->pauseRead();
for (auto& listener : listeners_) {
auto callback = listener.second;
listener.first->runInEventBaseThread(
[callback]() mutable { callback->onListenPaused(); });
}
}
/**
* Starts accepting datagrams once again.
*/
void resumeAccepting() {
socket_->resumeRead(this);
for (auto& listener : listeners_) {
auto callback = listener.second;
listener.first->runInEventBaseThread(
[callback]() mutable { callback->onListenResumed(); });
}
}
private:
// AsyncUDPSocket::ReadCallback
void getReadBuffer(void** buf, size_t* len) noexcept override {
......
......@@ -138,6 +138,14 @@ class UDPServer {
}
}
void pauseAccepting() {
socket_->pauseAccepting();
}
void resumeAccepting() {
socket_->resumeAccepting();
}
// Whether writes from the UDP server should change the port for each message.
void setChangePortForWrites(bool changePortForWrites) {
changePortForWrites_ = changePortForWrites;
......@@ -338,7 +346,7 @@ class AsyncSocketIntegrationTest : public Test {
sevb.terminateLoopSoon();
});
// Wait for server thread to joib
// Wait for server thread to join
serverThread->join();
}
......@@ -429,6 +437,20 @@ TEST_F(AsyncSocketIntegrationTest, PingPongUseConnectedSendMsgServerWrongAddr) {
ASSERT_EQ(pingClient->pongRecvd(), 0);
}
TEST_F(AsyncSocketIntegrationTest, PingPongPauseResumeListening) {
startServer();
// Exchange should not happen when paused.
server->pauseAccepting();
auto pausedClient = performPingPongTest(folly::none, false);
ASSERT_EQ(pausedClient->pongRecvd(), 0);
// Exchange does occur after resuming.
server->resumeAccepting();
auto pingClient = performPingPongTest(folly::none, false);
ASSERT_GT(pingClient->pongRecvd(), 0);
}
class TestAsyncUDPSocket : public AsyncUDPSocket {
public:
explicit TestAsyncUDPSocket(EventBase* evb) : AsyncUDPSocket(evb) {}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment