Commit b02ec278 authored by Subodh Iyengar's avatar Subodh Iyengar Committed by Facebook Github Bot

Add detachEvb to AsyncUDPSocket

Summary:
Detach evb from asyncudpsocket to be able to
switch the socket between threads

Reviewed By: yfeldblum

Differential Revision: D8131599

fbshipit-source-id: 5d2a6f9b43b72076da1f6c09ce37b3c0bcd43db2
parent e2a84628
...@@ -465,4 +465,19 @@ bool AsyncUDPSocket::updateRegistration() noexcept { ...@@ -465,4 +465,19 @@ bool AsyncUDPSocket::updateRegistration() noexcept {
return registerHandler(uint16_t(flags | PERSIST)); return registerHandler(uint16_t(flags | PERSIST));
} }
void AsyncUDPSocket::detachEventBase() {
DCHECK(eventBase_ && eventBase_->isInEventBaseThread());
registerHandler(uint16_t(NONE));
eventBase_ = nullptr;
EventHandler::detachEventBase();
}
void AsyncUDPSocket::attachEventBase(folly::EventBase* evb) {
DCHECK(!eventBase_);
DCHECK(evb && evb->isInEventBaseThread());
eventBase_ = evb;
EventHandler::attachEventBase(evb);
updateRegistration();
}
} // namespace folly } // namespace folly
...@@ -248,6 +248,10 @@ class AsyncUDPSocket : public EventHandler { ...@@ -248,6 +248,10 @@ class AsyncUDPSocket : public EventHandler {
return fd_ != -1; return fd_ != -1;
} }
virtual void detachEventBase();
virtual void attachEventBase(folly::EventBase* evb);
protected: protected:
virtual ssize_t sendmsg(int socket, const struct msghdr* message, int flags) { virtual ssize_t sendmsg(int socket, const struct msghdr* message, int flags) {
return ::sendmsg(socket, message, flags); return ::sendmsg(socket, message, flags);
......
...@@ -581,3 +581,60 @@ TEST_F(AsyncUDPSocketTest, TestBound) { ...@@ -581,3 +581,60 @@ TEST_F(AsyncUDPSocketTest, TestBound) {
socket.bind(address); socket.bind(address);
EXPECT_TRUE(socket.isBound()); EXPECT_TRUE(socket.isBound());
} }
TEST_F(AsyncUDPSocketTest, TestAttachAfterDetachEvbWithReadCallback) {
socket_->resumeRead(&readCb);
EXPECT_TRUE(socket_->isHandlerRegistered());
socket_->detachEventBase();
EXPECT_FALSE(socket_->isHandlerRegistered());
socket_->attachEventBase(&evb_);
EXPECT_TRUE(socket_->isHandlerRegistered());
}
TEST_F(AsyncUDPSocketTest, TestAttachAfterDetachEvbNoReadCallback) {
EXPECT_FALSE(socket_->isHandlerRegistered());
socket_->detachEventBase();
EXPECT_FALSE(socket_->isHandlerRegistered());
socket_->attachEventBase(&evb_);
EXPECT_FALSE(socket_->isHandlerRegistered());
}
TEST_F(AsyncUDPSocketTest, TestDetachAttach) {
folly::EventBase evb2;
auto writeSocket = std::make_shared<folly::AsyncUDPSocket>(&evb_);
folly::SocketAddress address("127.0.0.1", 0);
writeSocket->bind(address);
std::array<uint8_t, 1024> data;
std::atomic<int> packetsRecvd{0};
EXPECT_CALL(readCb, getReadBuffer_(_, _))
.WillRepeatedly(Invoke([&](void** buf, size_t* len) {
*buf = data.data();
*len = 1024;
}));
EXPECT_CALL(readCb, onDataAvailable_(_, _, _))
.WillRepeatedly(Invoke(
[&](const folly::SocketAddress&, size_t, bool) { packetsRecvd++; }));
socket_->resumeRead(&readCb);
writeSocket->write(socket_->address(), folly::IOBuf::copyBuffer("hello"));
while (packetsRecvd != 1) {
evb_.loopOnce();
}
EXPECT_EQ(packetsRecvd, 1);
socket_->detachEventBase();
std::thread t([&] { evb2.loopForever(); });
evb2.runInEventBaseThreadAndWait([&] { socket_->attachEventBase(&evb2); });
writeSocket->write(socket_->address(), folly::IOBuf::copyBuffer("hello"));
auto now = std::chrono::steady_clock::now();
while (packetsRecvd != 2 ||
std::chrono::steady_clock::now() <
now + std::chrono::milliseconds(10)) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
evb2.runInEventBaseThread([&] {
socket_ = nullptr;
evb2.terminateLoopSoon();
});
t.join();
EXPECT_EQ(packetsRecvd, 2);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment