Commit 73ffab43 authored by Subodh Iyengar's avatar Subodh Iyengar Committed by Facebook Github Bot

call the correct method when socket is connected

Summary:
When the udp socket is connected we shouldn't supply the peer's address. This changes it so that
we can use write() methods in AsyncUDPSocket when the socket is connected.

Differential Revision: D15384587

fbshipit-source-id: ad26e2d04a7303c7ff9e25f32422d7c56e4283e5
parent d2005259
......@@ -272,13 +272,21 @@ ssize_t AsyncUDPSocket::writev(
size_t iovec_len,
int gso) {
CHECK_NE(NetworkSocket(), fd_) << "Socket not yet bound";
sockaddr_storage addrStorage;
address.getAddress(&addrStorage);
struct msghdr msg;
msg.msg_name = reinterpret_cast<void*>(&addrStorage);
msg.msg_namelen = address.getActualSize();
if (!connected_) {
msg.msg_name = reinterpret_cast<void*>(&addrStorage);
msg.msg_namelen = address.getActualSize();
} else {
if (connectedAddress_ != address) {
errno = ENOTSUP;
return -1;
}
msg.msg_name = nullptr;
msg.msg_namelen = 0;
}
msg.msg_iov = const_cast<struct iovec*>(vec);
msg.msg_iovlen = iovec_len;
msg.msg_control = nullptr;
......@@ -533,8 +541,13 @@ int AsyncUDPSocket::connect(const folly::SocketAddress& address) {
CHECK_NE(NetworkSocket(), fd_) << "Socket not yet bound";
sockaddr_storage addrStorage;
address.getAddress(&addrStorage);
return netops::connect(
int ret = netops::connect(
fd_, reinterpret_cast<sockaddr*>(&addrStorage), address.getActualSize());
if (ret == 0) {
connected_ = true;
connectedAddress_ = address;
}
return ret;
}
void AsyncUDPSocket::handleRead() noexcept {
......
......@@ -348,6 +348,10 @@ class AsyncUDPSocket : public EventHandler {
// Temp space to receive client address
folly::SocketAddress clientAddress_;
// If the socket is connected.
folly::SocketAddress connectedAddress_;
bool connected_{false};
bool reuseAddr_{false};
bool reusePort_{false};
int rcvBuf_{0};
......
......@@ -222,7 +222,10 @@ class UDPClient : private AsyncUDPSocket::ReadCallback, private AsyncTimeout {
}
virtual void writePing(std::unique_ptr<folly::IOBuf> buf) {
socket_->write(server_, std::move(buf));
auto ret = socket_->write(server_, std::move(buf));
if (ret == -1) {
error_ = true;
}
}
void getReadBuffer(void** buf, size_t* len) noexcept override {
......@@ -271,12 +274,17 @@ class UDPClient : private AsyncUDPSocket::ReadCallback, private AsyncTimeout {
connectAddr_ = connectAddr;
}
bool error() const {
return error_;
}
protected:
folly::Optional<folly::SocketAddress> connectAddr_;
EventBase* const evb_{nullptr};
folly::SocketAddress server_;
std::unique_ptr<AsyncUDPSocket> socket_;
bool error_{false};
private:
int pongRecvd_{0};
......@@ -285,45 +293,6 @@ class UDPClient : private AsyncUDPSocket::ReadCallback, private AsyncTimeout {
char buf_[1024];
};
class ConnectedWriteUDPClient : public UDPClient {
public:
~ConnectedWriteUDPClient() override = default;
ConnectedWriteUDPClient(EventBase* evb) : UDPClient(evb) {}
// When the socket is connected you don't need to supply the address to send
// msg. This will test that connect worked.
void writePing(std::unique_ptr<folly::IOBuf> buf) override {
iovec vec[16];
size_t iovec_len =
buf->fillIov(vec, sizeof(vec) / sizeof(vec[0])).numIovecs;
if (UNLIKELY(iovec_len == 0)) {
buf->coalesce();
vec[0].iov_base = const_cast<uint8_t*>(buf->data());
vec[0].iov_len = buf->length();
iovec_len = 1;
}
struct msghdr msg;
msg.msg_name = nullptr;
msg.msg_namelen = 0;
msg.msg_iov = const_cast<struct iovec*>(vec);
msg.msg_iovlen = iovec_len;
msg.msg_control = nullptr;
msg.msg_controllen = 0;
msg.msg_flags = 0;
ssize_t ret = folly::netops::sendmsg(socket_->getNetworkSocket(), &msg, 0);
if (ret == -1) {
if (errno != EAGAIN || errno != EWOULDBLOCK) {
throw folly::AsyncSocketException(
folly::AsyncSocketException::NOT_OPEN, "WriteFail", errno);
}
}
connect();
}
};
class AsyncSocketIntegrationTest : public Test {
public:
void SetUp() override {
......@@ -356,8 +325,8 @@ class AsyncSocketIntegrationTest : public Test {
}
std::unique_ptr<UDPClient> performPingPongTest(
folly::Optional<folly::SocketAddress> connectedAddress,
bool useConnectedWrite);
folly::SocketAddress writeAddress,
folly::Optional<folly::SocketAddress> connectedAddress);
folly::EventBase sevb;
folly::EventBase cevb;
......@@ -367,17 +336,11 @@ class AsyncSocketIntegrationTest : public Test {
};
std::unique_ptr<UDPClient> AsyncSocketIntegrationTest::performPingPongTest(
folly::Optional<folly::SocketAddress> connectedAddress,
bool useConnectedWrite) {
if (useConnectedWrite) {
CHECK(connectedAddress.hasValue());
client = std::make_unique<ConnectedWriteUDPClient>(&cevb);
folly::SocketAddress writeAddress,
folly::Optional<folly::SocketAddress> connectedAddress) {
client = std::make_unique<UDPClient>(&cevb);
if (connectedAddress) {
client->setShouldConnect(*connectedAddress);
} else {
client = std::make_unique<UDPClient>(&cevb);
if (connectedAddress) {
client->setShouldConnect(*connectedAddress);
}
}
// Start event loop in a separate thread
auto clientThread = std::thread([this]() { cevb.loopForever(); });
......@@ -386,7 +349,7 @@ std::unique_ptr<UDPClient> AsyncSocketIntegrationTest::performPingPongTest(
cevb.waitUntilRunning();
// Send ping
cevb.runInEventBaseThread([&]() { client->start(server->address(), 100); });
cevb.runInEventBaseThread([&]() { client->start(writeAddress, 100); });
// Wait for client to finish
clientThread.join();
......@@ -395,7 +358,7 @@ std::unique_ptr<UDPClient> AsyncSocketIntegrationTest::performPingPongTest(
TEST_F(AsyncSocketIntegrationTest, PingPong) {
startServer();
auto pingClient = performPingPongTest(folly::none, false);
auto pingClient = performPingPongTest(server->address(), folly::none);
// This should succeed.
ASSERT_GT(pingClient->pongRecvd(), 0);
}
......@@ -403,7 +366,7 @@ TEST_F(AsyncSocketIntegrationTest, PingPong) {
TEST_F(AsyncSocketIntegrationTest, ConnectedPingPong) {
server->setChangePortForWrites(false);
startServer();
auto pingClient = performPingPongTest(server->address(), false);
auto pingClient = performPingPongTest(server->address(), server->address());
// This should succeed
ASSERT_GT(pingClient->pongRecvd(), 0);
}
......@@ -411,7 +374,7 @@ TEST_F(AsyncSocketIntegrationTest, ConnectedPingPong) {
TEST_F(AsyncSocketIntegrationTest, ConnectedPingPongServerWrongAddress) {
server->setChangePortForWrites(true);
startServer();
auto pingClient = performPingPongTest(server->address(), false);
auto pingClient = performPingPongTest(server->address(), server->address());
// This should fail.
ASSERT_EQ(pingClient->pongRecvd(), 0);
}
......@@ -421,25 +384,21 @@ TEST_F(AsyncSocketIntegrationTest, ConnectedPingPongClientWrongAddress) {
startServer();
folly::SocketAddress connectAddr(
server->address().getIPAddress(), server->address().getPort() + 1);
auto pingClient = performPingPongTest(connectAddr, false);
auto pingClient = performPingPongTest(server->address(), connectAddr);
// This should fail.
ASSERT_EQ(pingClient->pongRecvd(), 0);
EXPECT_TRUE(pingClient->error());
}
TEST_F(AsyncSocketIntegrationTest, PingPongUseConnectedSendMsg) {
TEST_F(AsyncSocketIntegrationTest, ConnectedPingPongDifferentWriteAddress) {
server->setChangePortForWrites(false);
startServer();
auto pingClient = performPingPongTest(server->address(), true);
// This should succeed.
ASSERT_GT(pingClient->pongRecvd(), 0);
}
TEST_F(AsyncSocketIntegrationTest, PingPongUseConnectedSendMsgServerWrongAddr) {
server->setChangePortForWrites(true);
startServer();
auto pingClient = performPingPongTest(server->address(), true);
folly::SocketAddress connectAddr(
server->address().getIPAddress(), server->address().getPort() + 1);
auto pingClient = performPingPongTest(connectAddr, server->address());
// This should fail.
ASSERT_EQ(pingClient->pongRecvd(), 0);
EXPECT_TRUE(pingClient->error());
}
TEST_F(AsyncSocketIntegrationTest, PingPongPauseResumeListening) {
......@@ -447,12 +406,12 @@ TEST_F(AsyncSocketIntegrationTest, PingPongPauseResumeListening) {
// Exchange should not happen when paused.
server->pauseAccepting();
auto pausedClient = performPingPongTest(folly::none, false);
auto pausedClient = performPingPongTest(server->address(), folly::none);
ASSERT_EQ(pausedClient->pongRecvd(), 0);
// Exchange does occur after resuming.
server->resumeAccepting();
auto pingClient = performPingPongTest(folly::none, false);
auto pingClient = performPingPongTest(server->address(), folly::none);
ASSERT_GT(pingClient->pongRecvd(), 0);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment