Commit 6ee198b0 authored by Subodh Iyengar's avatar Subodh Iyengar Committed by Facebook Github Bot

Clang format AsyncUDPSocket

Summary:
Clang format files related to AsyncUDPSocket.
This helps keep changes consistent.

Reviewed By: yfeldblum, lnicco

Differential Revision: D7632865

fbshipit-source-id: 0761f0cfd06417334e680ca0880280d65480fe7e
parent ba208d71
...@@ -51,9 +51,10 @@ AsyncUDPSocket::~AsyncUDPSocket() { ...@@ -51,9 +51,10 @@ AsyncUDPSocket::~AsyncUDPSocket() {
void AsyncUDPSocket::bind(const folly::SocketAddress& address) { void AsyncUDPSocket::bind(const folly::SocketAddress& address) {
int socket = fsp::socket(address.getFamily(), SOCK_DGRAM, IPPROTO_UDP); int socket = fsp::socket(address.getFamily(), SOCK_DGRAM, IPPROTO_UDP);
if (socket == -1) { if (socket == -1) {
throw AsyncSocketException(AsyncSocketException::NOT_OPEN, throw AsyncSocketException(
"error creating async udp socket", AsyncSocketException::NOT_OPEN,
errno); "error creating async udp socket",
errno);
} }
auto g = folly::makeGuard([&] { ::close(socket); }); auto g = folly::makeGuard([&] { ::close(socket); });
...@@ -61,37 +62,33 @@ void AsyncUDPSocket::bind(const folly::SocketAddress& address) { ...@@ -61,37 +62,33 @@ void AsyncUDPSocket::bind(const folly::SocketAddress& address) {
// put the socket in non-blocking mode // put the socket in non-blocking mode
int ret = fcntl(socket, F_SETFL, O_NONBLOCK); int ret = fcntl(socket, F_SETFL, O_NONBLOCK);
if (ret != 0) { if (ret != 0) {
throw AsyncSocketException(AsyncSocketException::NOT_OPEN, throw AsyncSocketException(
"failed to put socket in non-blocking mode", AsyncSocketException::NOT_OPEN,
errno); "failed to put socket in non-blocking mode",
errno);
} }
if (reuseAddr_) { if (reuseAddr_) {
// put the socket in reuse mode // put the socket in reuse mode
int value = 1; int value = 1;
if (setsockopt(socket, if (setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value)) !=
SOL_SOCKET, 0) {
SO_REUSEADDR, throw AsyncSocketException(
&value, AsyncSocketException::NOT_OPEN,
sizeof(value)) != 0) { "failed to put socket in reuse mode",
throw AsyncSocketException(AsyncSocketException::NOT_OPEN, errno);
"failed to put socket in reuse mode",
errno);
} }
} }
if (reusePort_) { if (reusePort_) {
// put the socket in port reuse mode // put the socket in port reuse mode
int value = 1; int value = 1;
if (setsockopt(socket, if (setsockopt(socket, SOL_SOCKET, SO_REUSEPORT, &value, sizeof(value)) !=
SOL_SOCKET, 0) {
SO_REUSEPORT, throw AsyncSocketException(
&value, AsyncSocketException::NOT_OPEN,
sizeof(value)) != 0) { "failed to put socket in reuse_port mode",
throw AsyncSocketException(AsyncSocketException::NOT_OPEN, errno);
"failed to put socket in reuse_port mode",
errno);
} }
} }
...@@ -100,50 +97,38 @@ void AsyncUDPSocket::bind(const folly::SocketAddress& address) { ...@@ -100,50 +97,38 @@ void AsyncUDPSocket::bind(const folly::SocketAddress& address) {
// Set busy_poll time in microseconds on the socket. // Set busy_poll time in microseconds on the socket.
// It sets how long socket will be in busy_poll mode when no event occurs. // It sets how long socket will be in busy_poll mode when no event occurs.
int value = busyPollUs_; int value = busyPollUs_;
if (setsockopt(socket, if (setsockopt(socket, SOL_SOCKET, SO_BUSY_POLL, &value, sizeof(value)) !=
SOL_SOCKET, 0) {
SO_BUSY_POLL, throw AsyncSocketException(
&value, AsyncSocketException::NOT_OPEN,
sizeof(value)) != 0) { "failed to set SO_BUSY_POLL on the socket",
throw AsyncSocketException(AsyncSocketException::NOT_OPEN, errno);
"failed to set SO_BUSY_POLL on the socket",
errno);
} }
#else /* SO_BUSY_POLL is not supported*/ #else /* SO_BUSY_POLL is not supported*/
throw AsyncSocketException(AsyncSocketException::NOT_OPEN, throw AsyncSocketException(
"SO_BUSY_POLL is not supported", AsyncSocketException::NOT_OPEN, "SO_BUSY_POLL is not supported", errno);
errno);
#endif #endif
} }
if (rcvBuf_ > 0) { if (rcvBuf_ > 0) {
// Set the size of the buffer for the received messages in rx_queues. // Set the size of the buffer for the received messages in rx_queues.
int value = rcvBuf_; int value = rcvBuf_;
if (setsockopt(socket, if (setsockopt(socket, SOL_SOCKET, SO_RCVBUF, &value, sizeof(value)) != 0) {
SOL_SOCKET, throw AsyncSocketException(
SO_RCVBUF, AsyncSocketException::NOT_OPEN,
&value, "failed to set SO_RCVBUF on the socket",
sizeof(value)) != 0) { errno);
throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
"failed to set SO_RCVBUF on the socket",
errno);
} }
} }
if (sndBuf_ > 0) { if (sndBuf_ > 0) {
// Set the size of the buffer for the sent messages in tx_queues. // Set the size of the buffer for the sent messages in tx_queues.
int value = rcvBuf_; int value = rcvBuf_;
if (setsockopt(socket, if (setsockopt(socket, SOL_SOCKET, SO_SNDBUF, &value, sizeof(value)) != 0) {
SOL_SOCKET, throw AsyncSocketException(
SO_SNDBUF, AsyncSocketException::NOT_OPEN,
&value, "failed to set SO_SNDBUF on the socket",
sizeof(value)) != 0) { errno);
throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
"failed to set SO_SNDBUF on the socket",
errno);
} }
} }
...@@ -152,9 +137,7 @@ void AsyncUDPSocket::bind(const folly::SocketAddress& address) { ...@@ -152,9 +137,7 @@ void AsyncUDPSocket::bind(const folly::SocketAddress& address) {
int flag = 1; int flag = 1;
if (setsockopt(socket, IPPROTO_IPV6, IPV6_V6ONLY, &flag, sizeof(flag))) { if (setsockopt(socket, IPPROTO_IPV6, IPV6_V6ONLY, &flag, sizeof(flag))) {
throw AsyncSocketException( throw AsyncSocketException(
AsyncSocketException::NOT_OPEN, AsyncSocketException::NOT_OPEN, "Failed to set IPV6_V6ONLY", errno);
"Failed to set IPV6_V6ONLY",
errno);
} }
} }
...@@ -223,14 +206,15 @@ void AsyncUDPSocket::setFD(int fd, FDOwnership ownership) { ...@@ -223,14 +206,15 @@ void AsyncUDPSocket::setFD(int fd, FDOwnership ownership) {
localAddress_.setFromLocalAddress(fd_); localAddress_.setFromLocalAddress(fd_);
} }
ssize_t AsyncUDPSocket::write(const folly::SocketAddress& address, ssize_t AsyncUDPSocket::write(
const std::unique_ptr<folly::IOBuf>& buf) { const folly::SocketAddress& address,
const std::unique_ptr<folly::IOBuf>& buf) {
// UDP's typical MTU size is 1500, so high number of buffers // UDP's typical MTU size is 1500, so high number of buffers
// really do not make sense. Optimze for buffer chains with // really do not make sense. Optimze for buffer chains with
// buffers less than 16, which is the highest I can think of // buffers less than 16, which is the highest I can think of
// for a real use case. // for a real use case.
iovec vec[16]; iovec vec[16];
size_t iovec_len = buf->fillIov(vec, sizeof(vec)/sizeof(vec[0])); size_t iovec_len = buf->fillIov(vec, sizeof(vec) / sizeof(vec[0]));
if (UNLIKELY(iovec_len == 0)) { if (UNLIKELY(iovec_len == 0)) {
buf->coalesce(); buf->coalesce();
vec[0].iov_base = const_cast<uint8_t*>(buf->data()); vec[0].iov_base = const_cast<uint8_t*>(buf->data());
...@@ -241,8 +225,10 @@ ssize_t AsyncUDPSocket::write(const folly::SocketAddress& address, ...@@ -241,8 +225,10 @@ ssize_t AsyncUDPSocket::write(const folly::SocketAddress& address,
return writev(address, vec, iovec_len); return writev(address, vec, iovec_len);
} }
ssize_t AsyncUDPSocket::writev(const folly::SocketAddress& address, ssize_t AsyncUDPSocket::writev(
const struct iovec* vec, size_t iovec_len) { const folly::SocketAddress& address,
const struct iovec* vec,
size_t iovec_len) {
CHECK_NE(-1, fd_) << "Socket not yet bound"; CHECK_NE(-1, fd_) << "Socket not yet bound";
sockaddr_storage addrStorage; sockaddr_storage addrStorage;
...@@ -266,8 +252,8 @@ void AsyncUDPSocket::resumeRead(ReadCallback* cob) { ...@@ -266,8 +252,8 @@ void AsyncUDPSocket::resumeRead(ReadCallback* cob) {
readCallback_ = CHECK_NOTNULL(cob); readCallback_ = CHECK_NOTNULL(cob);
if (!updateRegistration()) { if (!updateRegistration()) {
AsyncSocketException ex(AsyncSocketException::NOT_OPEN, AsyncSocketException ex(
"failed to register for accept events"); AsyncSocketException::NOT_OPEN, "failed to register for accept events");
readCallback_ = nullptr; readCallback_ = nullptr;
cob->onReadError(ex); cob->onReadError(ex);
...@@ -318,7 +304,6 @@ void AsyncUDPSocket::handleRead() noexcept { ...@@ -318,7 +304,6 @@ void AsyncUDPSocket::handleRead() noexcept {
AsyncSocketException::BAD_ARGS, AsyncSocketException::BAD_ARGS,
"AsyncUDPSocket::getReadBuffer() returned empty buffer"); "AsyncUDPSocket::getReadBuffer() returned empty buffer");
auto cob = readCallback_; auto cob = readCallback_;
readCallback_ = nullptr; readCallback_ = nullptr;
...@@ -353,9 +338,8 @@ void AsyncUDPSocket::handleRead() noexcept { ...@@ -353,9 +338,8 @@ void AsyncUDPSocket::handleRead() noexcept {
return; return;
} }
AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR, AsyncSocketException ex(
"::recvfrom() failed", AsyncSocketException::INTERNAL_ERROR, "::recvfrom() failed", errno);
errno);
// In case of UDP we can continue reading from the socket // In case of UDP we can continue reading from the socket
// even if the current request fails. We notify the user // even if the current request fails. We notify the user
......
...@@ -33,10 +33,7 @@ namespace folly { ...@@ -33,10 +33,7 @@ namespace folly {
*/ */
class AsyncUDPSocket : public EventHandler { class AsyncUDPSocket : public EventHandler {
public: public:
enum class FDOwnership { enum class FDOwnership { OWNS, SHARED };
OWNS,
SHARED
};
class ReadCallback { class ReadCallback {
public: public:
...@@ -48,16 +45,17 @@ class AsyncUDPSocket : public EventHandler { ...@@ -48,16 +45,17 @@ class AsyncUDPSocket : public EventHandler {
* and if there were more bytes in datagram, we will end up * and if there were more bytes in datagram, we will end up
* dropping them. * dropping them.
*/ */
virtual void getReadBuffer(void** buf, size_t* len) noexcept = 0; virtual void getReadBuffer(void** buf, size_t* len) noexcept = 0;
/** /**
* Invoked when a new datagraom is available on the socket. `len` * Invoked when a new datagraom is available on the socket. `len`
* is the number of bytes read and `truncated` is true if we had * is the number of bytes read and `truncated` is true if we had
* to drop few bytes because of running out of buffer space. * to drop few bytes because of running out of buffer space.
*/ */
virtual void onDataAvailable(const folly::SocketAddress& client, virtual void onDataAvailable(
size_t len, const folly::SocketAddress& client,
bool truncated) noexcept = 0; size_t len,
bool truncated) noexcept = 0;
/** /**
* Invoked when there is an error reading from the socket. * Invoked when there is an error reading from the socket.
...@@ -66,8 +64,7 @@ class AsyncUDPSocket : public EventHandler { ...@@ -66,8 +64,7 @@ class AsyncUDPSocket : public EventHandler {
* But you have to re-register readCallback yourself after * But you have to re-register readCallback yourself after
* onReadError. * onReadError.
*/ */
virtual void onReadError(const AsyncSocketException& ex) virtual void onReadError(const AsyncSocketException& ex) noexcept = 0;
noexcept = 0;
/** /**
* Invoked when socket is closed and a read callback is registered. * Invoked when socket is closed and a read callback is registered.
...@@ -112,14 +109,17 @@ class AsyncUDPSocket : public EventHandler { ...@@ -112,14 +109,17 @@ class AsyncUDPSocket : public EventHandler {
* Send the data in buffer to destination. Returns the return code from * Send the data in buffer to destination. Returns the return code from
* ::sendmsg. * ::sendmsg.
*/ */
virtual ssize_t write(const folly::SocketAddress& address, virtual ssize_t write(
const std::unique_ptr<folly::IOBuf>& buf); const folly::SocketAddress& address,
const std::unique_ptr<folly::IOBuf>& buf);
/** /**
* Send data in iovec to destination. Returns the return code from sendmsg. * Send data in iovec to destination. Returns the return code from sendmsg.
*/ */
virtual ssize_t writev(const folly::SocketAddress& address, virtual ssize_t writev(
const struct iovec* vec, size_t veclen); const folly::SocketAddress& address,
const struct iovec* vec,
size_t veclen);
/** /**
* Start reading datagrams * Start reading datagrams
......
...@@ -26,36 +26,34 @@ ...@@ -26,36 +26,34 @@
#include <folly/portability/GMock.h> #include <folly/portability/GMock.h>
#include <folly/portability/GTest.h> #include <folly/portability/GTest.h>
using folly::AsyncUDPSocket;
using folly::AsyncUDPServerSocket;
using folly::AsyncTimeout; using folly::AsyncTimeout;
using folly::AsyncUDPServerSocket;
using folly::AsyncUDPSocket;
using folly::EventBase; using folly::EventBase;
using folly::SocketAddress;
using folly::IOBuf; using folly::IOBuf;
using folly::SocketAddress;
using namespace testing; using namespace testing;
class UDPAcceptor class UDPAcceptor : public AsyncUDPServerSocket::Callback {
: public AsyncUDPServerSocket::Callback {
public: public:
UDPAcceptor(EventBase* evb, int n): evb_(evb), n_(n) { UDPAcceptor(EventBase* evb, int n) : evb_(evb), n_(n) {}
}
void onListenStarted() noexcept override {} void onListenStarted() noexcept override {}
void onListenStopped() noexcept override {} void onListenStopped() noexcept override {}
void onDataAvailable(std::shared_ptr<folly::AsyncUDPSocket> /* socket */, void onDataAvailable(
const folly::SocketAddress& client, std::shared_ptr<folly::AsyncUDPSocket> /* socket */,
std::unique_ptr<folly::IOBuf> data, const folly::SocketAddress& client,
bool truncated) noexcept override { std::unique_ptr<folly::IOBuf> data,
bool truncated) noexcept override {
lastClient_ = client; lastClient_ = client;
lastMsg_ = data->moveToFbString().toStdString(); lastMsg_ = data->moveToFbString().toStdString();
auto len = data->computeChainDataLength(); auto len = data->computeChainDataLength();
VLOG(4) << "Worker " << n_ << " read " << len << " bytes " VLOG(4) << "Worker " << n_ << " read " << len << " bytes "
<< "(trun:" << truncated << ") from " << client.describe() << "(trun:" << truncated << ") from " << client.describe() << " - "
<< " - " << lastMsg_; << lastMsg_;
sendPong(); sendPong();
} }
...@@ -81,8 +79,7 @@ class UDPAcceptor ...@@ -81,8 +79,7 @@ class UDPAcceptor
class UDPServer { class UDPServer {
public: public:
UDPServer(EventBase* evb, folly::SocketAddress addr, int n) UDPServer(EventBase* evb, folly::SocketAddress addr, int n)
: evb_(evb), addr_(addr), evbs_(n) { : evb_(evb), addr_(addr), evbs_(n) {}
}
void start() { void start() {
CHECK(evb_->isInEventBaseThread()); CHECK(evb_->isInEventBaseThread());
...@@ -101,12 +98,10 @@ class UDPServer { ...@@ -101,12 +98,10 @@ class UDPServer {
// Add numWorkers thread // Add numWorkers thread
int i = 0; int i = 0;
for (auto& evb: evbs_) { for (auto& evb : evbs_) {
acceptors_.emplace_back(&evb, i); acceptors_.emplace_back(&evb, i);
std::thread t([&] () { std::thread t([&]() { evb.loopForever(); });
evb.loopForever();
});
evb.waitUntilRunning(); evb.waitUntilRunning();
...@@ -127,11 +122,11 @@ class UDPServer { ...@@ -127,11 +122,11 @@ class UDPServer {
socket_->close(); socket_->close();
socket_.reset(); socket_.reset();
for (auto& evb: evbs_) { for (auto& evb : evbs_) {
evb.terminateLoopSoon(); evb.terminateLoopSoon();
} }
for (auto& t: threads_) { for (auto& t : threads_) {
t.join(); t.join();
} }
} }
...@@ -146,14 +141,9 @@ class UDPServer { ...@@ -146,14 +141,9 @@ class UDPServer {
std::vector<UDPAcceptor> acceptors_; std::vector<UDPAcceptor> acceptors_;
}; };
class UDPClient class UDPClient : private AsyncUDPSocket::ReadCallback, private AsyncTimeout {
: private AsyncUDPSocket::ReadCallback,
private AsyncTimeout {
public: public:
explicit UDPClient(EventBase* evb) explicit UDPClient(EventBase* evb) : AsyncTimeout(evb), evb_(evb) {}
: AsyncTimeout(evb),
evb_(evb) {
}
void start(const folly::SocketAddress& server, int n) { void start(const folly::SocketAddress& server, int n) {
CHECK(evb_->isInEventBaseThread()); CHECK(evb_->isInEventBaseThread());
...@@ -193,8 +183,7 @@ class UDPClient ...@@ -193,8 +183,7 @@ class UDPClient
--n_; --n_;
scheduleTimeout(5); scheduleTimeout(5);
socket_->write( socket_->write(
server_, server_, folly::IOBuf::copyBuffer(folly::to<std::string>("PING ", n_)));
folly::IOBuf::copyBuffer(folly::to<std::string>("PING ", n_)));
} }
void getReadBuffer(void** buf, size_t* len) noexcept override { void getReadBuffer(void** buf, size_t* len) noexcept override {
...@@ -202,11 +191,12 @@ class UDPClient ...@@ -202,11 +191,12 @@ class UDPClient
*len = 1024; *len = 1024;
} }
void onDataAvailable(const folly::SocketAddress& client, void onDataAvailable(
size_t len, const folly::SocketAddress& client,
bool truncated) noexcept override { size_t len,
bool truncated) noexcept override {
VLOG(4) << "Read " << len << " bytes (trun:" << truncated << ") from " VLOG(4) << "Read " << len << " bytes (trun:" << truncated << ") from "
<< client.describe() << " - " << std::string(buf_, len); << client.describe() << " - " << std::string(buf_, len);
VLOG(4) << n_ << " left"; VLOG(4) << n_ << " left";
++pongRecvd_; ++pongRecvd_;
...@@ -251,9 +241,7 @@ TEST(AsyncSocketTest, PingPong) { ...@@ -251,9 +241,7 @@ TEST(AsyncSocketTest, PingPong) {
UDPServer server(&sevb, folly::SocketAddress("127.0.0.1", 0), 4); UDPServer server(&sevb, folly::SocketAddress("127.0.0.1", 0), 4);
// Start event loop in a separate thread // Start event loop in a separate thread
auto serverThread = std::thread([&sevb] () { auto serverThread = std::thread([&sevb]() { sevb.loopForever(); });
sevb.loopForever();
});
// Wait for event loop to start // Wait for event loop to start
sevb.waitUntilRunning(); sevb.waitUntilRunning();
...@@ -265,15 +253,13 @@ TEST(AsyncSocketTest, PingPong) { ...@@ -265,15 +253,13 @@ TEST(AsyncSocketTest, PingPong) {
UDPClient client(&cevb); UDPClient client(&cevb);
// Start event loop in a separate thread // Start event loop in a separate thread
auto clientThread = std::thread([&cevb] () { auto clientThread = std::thread([&cevb]() { cevb.loopForever(); });
cevb.loopForever();
});
// Wait for event loop to start // Wait for event loop to start
cevb.waitUntilRunning(); cevb.waitUntilRunning();
// Send ping // Send ping
cevb.runInEventBaseThread([&] () { client.start(server.address(), 1000); }); cevb.runInEventBaseThread([&]() { client.start(server.address(), 1000); });
// Wait for client to finish // Wait for client to finish
clientThread.join(); clientThread.join();
...@@ -283,7 +269,7 @@ TEST(AsyncSocketTest, PingPong) { ...@@ -283,7 +269,7 @@ TEST(AsyncSocketTest, PingPong) {
CHECK_GT(client.pongRecvd(), 0); CHECK_GT(client.pongRecvd(), 0);
// Shutdown server // Shutdown server
sevb.runInEventBaseThread([&] () { sevb.runInEventBaseThread([&]() {
server.shutdown(); server.shutdown();
sevb.terminateLoopSoon(); sevb.terminateLoopSoon();
}); });
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment