Commit 244b6f44 authored by Junqi Wang's avatar Junqi Wang Committed by Facebook GitHub Bot

Fix tsan warning in AsyncUDPSocket test

Summary:
tsan reports warning in the test because the server socket writes in
a thread but closes in another thread without synchronization. This was by
design and is not a bug since implicit synchronization is done by joining client
thread, but this isn't visible to tsan. The fix is to use a port reused new
server socket to write response to client.

Reviewed By: yfeldblum

Differential Revision: D24675113

fbshipit-source-id: bdab8a5d5b6f85aa36ba84a6299e4457710625de
parent aaf2076d
......@@ -42,15 +42,22 @@ using OnDataAvailableParams =
class UDPAcceptor : public AsyncUDPServerSocket::Callback {
public:
UDPAcceptor(EventBase* evb, int n, bool changePortForWrites)
: evb_(evb), n_(n), changePortForWrites_(changePortForWrites) {}
UDPAcceptor(
EventBase* evb,
int n,
bool changePortForWrites,
const folly::SocketAddress& serverAddress)
: evb_(evb),
n_(n),
changePortForWrites_(changePortForWrites),
serverAddress_(serverAddress) {}
void onListenStarted() noexcept override {}
void onListenStopped() noexcept override {}
void onDataAvailable(
std::shared_ptr<folly::AsyncUDPSocket> socket,
std::shared_ptr<folly::AsyncUDPSocket> /* socket */,
const folly::SocketAddress& client,
std::unique_ptr<folly::IOBuf> data,
bool truncated,
......@@ -63,16 +70,18 @@ class UDPAcceptor : public AsyncUDPServerSocket::Callback {
<< "(trun:" << truncated << ") from " << client.describe() << " - "
<< lastMsg_;
sendPong(socket);
sendPong();
}
void sendPong(std::shared_ptr<folly::AsyncUDPSocket> socket) noexcept {
void sendPong() noexcept {
try {
auto writeSocket = socket;
auto writeSocket = std::make_shared<folly::AsyncUDPSocket>(evb_);
if (changePortForWrites_) {
writeSocket = std::make_shared<folly::AsyncUDPSocket>(evb_);
writeSocket->setReuseAddr(false);
writeSocket->bind(folly::SocketAddress("127.0.0.1", 0));
} else {
writeSocket->setReusePort(true);
writeSocket->bind(serverAddress_);
}
writeSocket->write(lastClient_, folly::IOBuf::copyBuffer(lastMsg_));
} catch (const std::exception& ex) {
......@@ -86,6 +95,7 @@ class UDPAcceptor : public AsyncUDPServerSocket::Callback {
// Whether to create a new port per write.
bool changePortForWrites_{true};
folly::SocketAddress serverAddress_;
folly::SocketAddress lastClient_;
std::string lastMsg_;
};
......@@ -99,6 +109,7 @@ class UDPServer {
CHECK(evb_->isInEventBaseThread());
socket_ = std::make_unique<AsyncUDPServerSocket>(evb_, 1500);
socket_->setReusePort(true);
try {
socket_->bind(addr_);
......@@ -113,7 +124,8 @@ class UDPServer {
// Add numWorkers thread
int i = 0;
for (auto& evb : evbs_) {
acceptors_.emplace_back(&evb, i, changePortForWrites_);
acceptors_.emplace_back(
&evb, i, changePortForWrites_, socket_->address());
std::thread t([&]() { evb.loopForever(); });
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment