Commit c533bbd1 authored by Dan Melnic's avatar Dan Melnic Committed by Facebook Github Bot

Add AsyncUDPSocket support for sendmmsg

Summary: Add AsyncUDPSocket support for sendmmsg

Reviewed By: djwatson

Differential Revision: D13521601

fbshipit-source-id: 89382e18943e01012ff1e56a40f655d634a6e146
parent 0c1847c1
......@@ -22,6 +22,7 @@
#include <folly/portability/Sockets.h>
#include <folly/portability/Unistd.h>
#include <boost/preprocessor/control/if.hpp>
#include <errno.h>
// Due to the way kernel headers are included, this may or may not be defined.
......@@ -30,6 +31,12 @@
#define SO_REUSEPORT 15
#endif
#if FOLLY_HAVE_VLA
#define FOLLY_HAVE_VLA_01 1
#else
#define FOLLY_HAVE_VLA_01 0
#endif
namespace fsp = folly::portability::sockets;
namespace folly {
......@@ -304,6 +311,108 @@ ssize_t AsyncUDPSocket::writev(
size_t iovec_len) {
return writev(address, vec, iovec_len, 0);
}
/**
* Send the data in buffers to destination. Returns the return code from
* ::sendmmsg.
*/
int AsyncUDPSocket::writem(
const folly::SocketAddress& address,
const std::unique_ptr<folly::IOBuf>* bufs,
size_t count) {
int ret;
constexpr size_t kSmallSizeMax = 8;
if (count <= kSmallSizeMax) {
// suppress "warning: variable length array 'vec' is used [-Wvla]"
FOLLY_PUSH_WARNING
FOLLY_GNU_DISABLE_WARNING("-Wvla")
mmsghdr vec[BOOST_PP_IF(FOLLY_HAVE_VLA_01, count, kSmallSizeMax)];
FOLLY_POP_WARNING
ret = writeImpl(address, bufs, count, vec);
} else {
std::unique_ptr<mmsghdr[]> vec(new mmsghdr[count]);
ret = writeImpl(address, bufs, count, vec.get());
}
return ret;
}
void AsyncUDPSocket::fillMsgVec(
sockaddr_storage* addr,
socklen_t addr_len,
const std::unique_ptr<folly::IOBuf>* bufs,
size_t count,
struct mmsghdr* msgvec,
struct iovec* iov,
size_t iov_count) {
size_t remaining = iov_count;
size_t iov_pos = 0;
for (size_t i = 0; i < count; i++) {
// we can use remaining here to avoid calling countChainElements() again
size_t iovec_len = bufs[i]->fillIov(&iov[iov_pos], remaining).numIovecs;
remaining -= iovec_len;
auto& msg = msgvec[i].msg_hdr;
msg.msg_name = reinterpret_cast<void*>(addr);
msg.msg_namelen = addr_len;
msg.msg_iov = &iov[iov_pos];
msg.msg_iovlen = iovec_len;
msg.msg_control = nullptr;
msg.msg_controllen = 0;
msg.msg_flags = 0;
msgvec[i].msg_len = 0;
iov_pos += iovec_len;
}
}
int AsyncUDPSocket::writeImpl(
const folly::SocketAddress& address,
const std::unique_ptr<folly::IOBuf>* bufs,
size_t count,
struct mmsghdr* msgvec) {
sockaddr_storage addrStorage;
address.getAddress(&addrStorage);
size_t iov_count = 0;
for (size_t i = 0; i < count; i++) {
iov_count += bufs[i]->countChainElements();
}
int ret;
constexpr size_t kSmallSizeMax = 16;
if (iov_count <= kSmallSizeMax) {
// suppress "warning: variable length array 'vec' is used [-Wvla]"
FOLLY_PUSH_WARNING
FOLLY_GNU_DISABLE_WARNING("-Wvla")
iovec iov[BOOST_PP_IF(FOLLY_HAVE_VLA_01, iov_count, kSmallSizeMax)];
FOLLY_POP_WARNING
fillMsgVec(
&addrStorage,
address.getActualSize(),
bufs,
count,
msgvec,
iov,
iov_count);
ret = sendmmsg(fd_, msgvec, count, 0);
} else {
std::unique_ptr<iovec[]> iov(new iovec[iov_count]);
fillMsgVec(
&addrStorage,
address.getActualSize(),
bufs,
count,
msgvec,
iov.get(),
iov_count);
ret = sendmmsg(fd_, msgvec, count, 0);
}
return ret;
}
void AsyncUDPSocket::resumeRead(ReadCallback* cob) {
CHECK(!readCallback_) << "Another read callback already installed";
CHECK_NE(NetworkSocket(), fd_)
......
......@@ -138,6 +138,17 @@ class AsyncUDPSocket : public EventHandler {
const folly::SocketAddress& address,
const std::unique_ptr<folly::IOBuf>& buf);
/**
* Send the data in buffers to destination. Returns the return code from
* ::sendmmsg.
* bufs is an array of std::unique_ptr<folly::IOBuf>
* of size num
*/
virtual int writem(
const folly::SocketAddress& address,
const std::unique_ptr<folly::IOBuf>* bufs,
size_t num);
/**
* Send the data in buffer to destination. Returns the return code from
* ::sendmsg.
......@@ -286,6 +297,29 @@ class AsyncUDPSocket : public EventHandler {
return netops::sendmsg(socket, message, flags);
}
virtual int sendmmsg(
NetworkSocket socket,
struct mmsghdr* msgvec,
unsigned int vlen,
int flags) {
return netops::sendmmsg(socket, msgvec, vlen, flags);
}
void fillMsgVec(
sockaddr_storage* addr,
socklen_t addr_len,
const std::unique_ptr<folly::IOBuf>* bufs,
size_t count,
struct mmsghdr* msgvec,
struct iovec* iov,
size_t iov_count);
virtual int writeImpl(
const folly::SocketAddress& address,
const std::unique_ptr<folly::IOBuf>* bufs,
size_t count,
struct mmsghdr* msgvec);
size_t handleErrMessages() noexcept;
void failErrMessageRead(const AsyncSocketException& ex);
......
/*
* Copyright 2014-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <numeric>
#include <thread>
#include <folly/Conv.h>
#include <folly/SocketAddress.h>
#include <folly/io/IOBuf.h>
#include <folly/io/async/AsyncTimeout.h>
#include <folly/io/async/AsyncUDPServerSocket.h>
#include <folly/io/async/AsyncUDPSocket.h>
#include <folly/io/async/EventBase.h>
#include <folly/portability/GMock.h>
#include <folly/portability/GTest.h>
using folly::AsyncTimeout;
using folly::AsyncUDPServerSocket;
using folly::AsyncUDPSocket;
using folly::EventBase;
using folly::IOBuf;
using folly::SocketAddress;
using namespace testing;
using SizeVec = std::vector<size_t>;
using IOBufVec = std::vector<std::unique_ptr<folly::IOBuf>>;
struct TestData {
explicit TestData(const SizeVec& in) : in_(in) {}
bool checkOut() const {
return (outNum_ == in_.size());
}
char getCharAt(size_t pos) {
if (pos < in_.size()) {
return static_cast<char>(in_[pos] % 256);
}
return 0;
}
bool appendOut(const char* data, size_t len) {
outNum_++;
if (outNum_ == in_.size()) {
return true;
}
// check the size
CHECK_EQ(len, in_[outNum_ - 1]);
// check the payload
char c = getCharAt(outNum_ - 1);
for (size_t i = 0; i < len; i++) {
CHECK_EQ(data[i], c);
}
return false;
}
IOBufVec getInBufs() {
if (!in_.size()) {
return IOBufVec();
}
IOBufVec ret;
for (size_t i = 0; i < in_.size(); i++) {
std::string str(in_[i], getCharAt(i));
std::unique_ptr<folly::IOBuf> buf =
folly::IOBuf::copyBuffer(str.data(), str.size());
ret.emplace_back(std::move(buf));
}
return ret;
}
SizeVec in_;
size_t outNum_{0};
bool check_{true};
};
class UDPAcceptor : public AsyncUDPServerSocket::Callback {
public:
UDPAcceptor(EventBase* evb) : evb_(evb) {}
void onListenStarted() noexcept override {}
void onListenStopped() noexcept override {}
void onDataAvailable(
std::shared_ptr<folly::AsyncUDPSocket> socket,
const folly::SocketAddress& client,
std::unique_ptr<folly::IOBuf> data,
bool /*unused*/) noexcept override {
// send pong
socket->write(client, data->clone());
}
private:
EventBase* const evb_{nullptr};
};
class UDPServer {
public:
UDPServer(EventBase* evb, folly::SocketAddress addr, int n)
: evb_(evb), addr_(addr), evbs_(n) {}
void start() {
CHECK(evb_->isInEventBaseThread());
socket_ = std::make_unique<AsyncUDPServerSocket>(evb_, 1500);
try {
socket_->bind(addr_);
VLOG(4) << "Server listening on " << socket_->address().describe();
} catch (const std::exception& ex) {
LOG(FATAL) << ex.what();
}
acceptors_.reserve(evbs_.size());
threads_.reserve(evbs_.size());
// Add numWorkers thread
int i = 0;
for (auto& evb : evbs_) {
acceptors_.emplace_back(&evb);
std::thread t([&]() { evb.loopForever(); });
evb.waitUntilRunning();
socket_->addListener(&evb, &acceptors_[i]);
threads_.emplace_back(std::move(t));
++i;
}
socket_->listen();
}
folly::SocketAddress address() const {
return socket_->address();
}
void shutdown() {
CHECK(evb_->isInEventBaseThread());
socket_->close();
socket_.reset();
for (auto& evb : evbs_) {
evb.terminateLoopSoon();
}
for (auto& t : threads_) {
t.join();
}
}
void pauseAccepting() {
socket_->pauseAccepting();
}
void resumeAccepting() {
socket_->resumeAccepting();
}
private:
EventBase* const evb_{nullptr};
const folly::SocketAddress addr_;
std::unique_ptr<AsyncUDPServerSocket> socket_;
std::vector<std::thread> threads_;
std::vector<folly::EventBase> evbs_;
std::vector<UDPAcceptor> acceptors_;
};
class UDPClient : private AsyncUDPSocket::ReadCallback, private AsyncTimeout {
public:
explicit UDPClient(EventBase* evb, TestData& testData)
: AsyncTimeout(evb), evb_(evb), testData_(testData) {}
void start(const folly::SocketAddress& server) {
CHECK(evb_->isInEventBaseThread());
server_ = server;
socket_ = std::make_unique<AsyncUDPSocket>(evb_);
try {
socket_->bind(folly::SocketAddress("127.0.0.1", 0));
if (connectAddr_) {
connect();
}
VLOG(2) << "Client bound to " << socket_->address().describe();
} catch (const std::exception& ex) {
LOG(FATAL) << ex.what();
}
socket_->resumeRead(this);
// Start playing ping pong
sendPing();
}
void connect() {
int ret = socket_->connect(*connectAddr_);
if (ret != 0) {
throw folly::AsyncSocketException(
folly::AsyncSocketException::NOT_OPEN, "ConnectFail", errno);
}
VLOG(2) << "Client connected to address=" << *connectAddr_;
}
void shutdown() {
CHECK(evb_->isInEventBaseThread());
socket_->pauseRead();
socket_->close();
socket_.reset();
evb_->terminateLoopSoon();
}
void sendPing() {
scheduleTimeout(5);
auto bufs = testData_.getInBufs();
writePing(bufs);
}
virtual void writePing(IOBufVec& bufs) {
socket_->writem(server_, bufs.data(), bufs.size());
}
void getReadBuffer(void** buf, size_t* len) noexcept override {
*buf = buf_;
*len = sizeof(buf_);
}
void onDataAvailable(
const folly::SocketAddress& /*unused*/,
size_t len,
bool /*unused*/) noexcept override {
VLOG(0) << "Got " << len << " bytes";
if (testData_.appendOut(buf_, len)) {
shutdown();
}
}
void onReadError(const folly::AsyncSocketException& ex) noexcept override {
VLOG(4) << ex.what();
// Start listening for next PONG
socket_->resumeRead(this);
}
void onReadClosed() noexcept override {
CHECK(false) << "We unregister reads before closing";
}
void timeoutExpired() noexcept override {
VLOG(4) << "Timeout expired";
shutdown();
}
AsyncUDPSocket& getSocket() {
return *socket_;
}
void setShouldConnect(const folly::SocketAddress& connectAddr) {
connectAddr_ = connectAddr;
}
protected:
folly::Optional<folly::SocketAddress> connectAddr_;
EventBase* const evb_{nullptr};
folly::SocketAddress server_;
std::unique_ptr<AsyncUDPSocket> socket_;
private:
char buf_[2048];
TestData& testData_;
};
class AsyncSocketSendmmsgIntegrationTest : public Test {
public:
void SetUp() override {
server = std::make_unique<UDPServer>(
&sevb, folly::SocketAddress("127.0.0.1", 0), 1);
// Start event loop in a separate thread
serverThread =
std::make_unique<std::thread>([this]() { sevb.loopForever(); });
// Wait for event loop to start
sevb.waitUntilRunning();
}
void startServer() {
// Start the server
sevb.runInEventBaseThreadAndWait([&]() { server->start(); });
LOG(INFO) << "Server listening=" << server->address();
}
void TearDown() override {
// Shutdown server
sevb.runInEventBaseThread([&]() {
server->shutdown();
sevb.terminateLoopSoon();
});
// Wait for server thread to join
serverThread->join();
}
std::unique_ptr<UDPClient> performPingPongTest(
TestData& testData,
folly::Optional<folly::SocketAddress> connectedAddress);
folly::EventBase sevb;
folly::EventBase cevb;
TestData* testData_{nullptr};
std::unique_ptr<std::thread> serverThread;
std::unique_ptr<UDPServer> server;
std::unique_ptr<UDPClient> client;
};
std::unique_ptr<UDPClient>
AsyncSocketSendmmsgIntegrationTest::performPingPongTest(
TestData& testData,
folly::Optional<folly::SocketAddress> connectedAddress) {
testData_ = &testData;
client = std::make_unique<UDPClient>(&cevb, testData);
if (connectedAddress) {
client->setShouldConnect(*connectedAddress);
}
// Start event loop in a separate thread
auto clientThread = std::thread([this]() { cevb.loopForever(); });
// Wait for event loop to start
cevb.waitUntilRunning();
// Send ping
cevb.runInEventBaseThread([&]() { client->start(server->address()); });
// Wait for client to finish
clientThread.join();
return std::move(client);
}
TEST_F(AsyncSocketSendmmsgIntegrationTest, PingPongRequest) {
SizeVec in{1, 2, 3, 4, 5, 8, 8, 9, 10, 11,
22, 33, 44, 55, 66, 77, 88, 99, 110, 120,
220, 320, 420, 520, 620, 720, 820, 920, 1020};
TestData testData(in);
startServer();
auto pingClient = performPingPongTest(testData, folly::none);
CHECK(testData.checkOut());
}
......@@ -366,6 +366,33 @@ ssize_t sendmsg(NetworkSocket socket, const msghdr* message, int flags) {
#endif
}
int sendmmsg(
NetworkSocket socket,
mmsghdr* msgvec,
unsigned int vlen,
int flags) {
#if FOLLY_HAVE_SENDMMSG
return wrapSocketFunction<int>(::sendmmsg, socket, msgvec, vlen, flags);
#else
// implement via sendmsg
for (unsigned int i = 0; i < vlen; i++) {
ssize_t ret = sendmsg(socket, &msgvec[i].msg_hdr, flags);
// in case of an error
// we return the number of msgs sent if > 0
// or an error if no msg was sent
if (ret < 0) {
if (i) {
return static_cast<int>(i);
}
return static_cast<int>(ret);
}
}
return static_cast<int>(vlen);
#endif
}
ssize_t sendto(
NetworkSocket s,
const void* buf,
......
......@@ -71,6 +71,15 @@
#define UDP_MAX_SEGMENTS (1 << 6UL)
#endif
#if (!__linux__) || (defined(__ANDROID__) && (__ANDROID_API__ < 21))
struct mmsghdr {
struct msghdr msg_hdr;
unsigned int msg_len;
};
#else
#define FOLLY_HAVE_SENDMMSG 1
#endif
#else
#include <WS2tcpip.h> // @manual
......@@ -98,6 +107,11 @@ struct msghdr {
int msg_flags;
};
struct mmsghdr {
struct msghdr msg_hdr;
unsigned int msg_len;
};
struct sockaddr_un {
sa_family_t sun_family;
char sun_path[108];
......@@ -164,6 +178,11 @@ ssize_t sendto(
const sockaddr* to,
socklen_t tolen);
ssize_t sendmsg(NetworkSocket socket, const msghdr* message, int flags);
int sendmmsg(
NetworkSocket socket,
mmsghdr* msgvec,
unsigned int vlen,
int flags);
int setsockopt(
NetworkSocket s,
int level,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment