Commit 430a2ed1 authored by Dave Watson's avatar Dave Watson Committed by JoelMarcey

AsyncUDPSocket

Summary:
Move AsyncUDPSocket to folly.

There is also one under realtime/voip/async that looks functionaly equivalent?  I think this one is only used in gangplank currently.

Test Plan: contbuild

Reviewed By: hans@fb.com

Subscribers: trunkagent, doug, alandau, bmatheny, njormrod, mshneer, folly-diffs@

FB internal diff: D1710675

Tasks: 5788116

Signature: t1:1710675:1417477000:9aebb466757554a5fa49d7c36cb504b4d8711b68
parent 6a1addcc
......@@ -156,6 +156,8 @@ nobase_follyinclude_HEADERS = \
io/ShutdownSocketSet.h \
io/async/AsyncTimeout.h \
io/async/AsyncTransport.h \
io/async/AsyncUDPServerSocket.h \
io/async/AsyncUDPSocket.h \
io/async/AsyncServerSocket.h \
io/async/AsyncSSLServerSocket.h \
io/async/AsyncSocket.h \
......@@ -292,6 +294,7 @@ libfolly_la_SOURCES = \
io/RecordIO.cpp \
io/ShutdownSocketSet.cpp \
io/async/AsyncTimeout.cpp \
io/async/AsyncUDPSocket.cpp \
io/async/AsyncServerSocket.cpp \
io/async/AsyncSSLServerSocket.cpp \
io/async/AsyncSocket.cpp \
......
......@@ -16,6 +16,7 @@
#pragma once
#include <folly/Format.h>
#include <folly/io/async/DelayedDestruction.h>
namespace folly {
......
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/MoveWrapper.h>
#include <folly/io/IOBufQueue.h>
#include <folly/Memory.h>
#include <folly/io/async/AsyncUDPSocket.h>
#include <folly/io/async/EventBase.h>
namespace folly {
/**
* UDP server socket
*
* It wraps a UDP socket waiting for packets and distributes them among
* a set of event loops in round robin fashion.
*
* NOTE: At the moment it is designed to work with single packet protocols
* in mind. We distribute incoming packets among all the listeners in
* round-robin fashion. So, any protocol that expects to send/recv
* more than 1 packet will not work because they will end up with
* different event base to process.
*/
class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback {
public:
class Callback {
public:
/**
* Invoked when we start reading data from socket. It is invoked in
* each acceptors/listeners event base thread.
*/
virtual void onListenStarted() noexcept = 0;
/**
* Invoked when the server socket is closed. It is invoked in each
* acceptors/listeners event base thread.
*/
virtual void onListenStopped() noexcept = 0;
/**
* Invoked when a new packet is received
*/
virtual void onDataAvailable(const folly::SocketAddress& addr,
std::unique_ptr<folly::IOBuf> buf,
bool truncated) noexcept = 0;
virtual ~Callback() {}
};
/**
* Create a new UDP server socket
*
* Note about packet size - We allocate buffer of packetSize_ size to read.
* If packet are larger than this value, as per UDP protocol, remaining data
* is dropped and you get `truncated = true` in onDataAvailable callback
*/
explicit AsyncUDPServerSocket(EventBase* evb, size_t sz = 1500)
: evb_(evb),
packetSize_(sz),
nextListener_(0) {
}
~AsyncUDPServerSocket() {
if (socket_) {
close();
}
}
void bind(const folly::SocketAddress& address) {
CHECK(!socket_);
socket_ = folly::make_unique<AsyncUDPSocket>(evb_);
socket_->bind(address);
}
folly::SocketAddress address() const {
CHECK(socket_);
return socket_->address();
}
/**
* Add a listener to the round robin list
*/
void addListener(EventBase* evb, Callback* callback) {
listeners_.emplace_back(evb, callback);
}
void listen() {
CHECK(socket_) << "Need to bind before listening";
for (auto& listener: listeners_) {
auto callback = listener.second;
listener.first->runInEventBaseThread([callback] () mutable {
callback->onListenStarted();
});
}
socket_->resumeRead(this);
}
int getFD() {
CHECK(socket_) << "Need to bind before getting FD";
return socket_->getFD();
}
void close() {
CHECK(socket_) << "Need to bind before closing";
socket_.reset();
}
private:
// AsyncUDPSocket::ReadCallback
void getReadBuffer(void** buf, size_t* len) noexcept {
std::tie(*buf, *len) = buf_.preallocate(packetSize_, packetSize_);
}
void onDataAvailable(const folly::SocketAddress& clientAddress,
size_t len,
bool truncated) noexcept {
buf_.postallocate(len);
auto data = buf_.split(len);
if (listeners_.empty()) {
LOG(WARNING) << "UDP server socket dropping packet, "
<< "no listener registered";
return;
}
if (nextListener_ >= listeners_.size()) {
nextListener_ = 0;
}
auto client = clientAddress;
auto callback = listeners_[nextListener_].second;
auto mvp =
folly::MoveWrapper<
std::unique_ptr<folly::IOBuf>>(std::move(data));
// Schedule it in the listener's eventbase
// XXX: Speed this up
std::function<void()> f = [client, callback, mvp, truncated] () mutable {
callback->onDataAvailable(client, std::move(*mvp), truncated);
};
listeners_[nextListener_].first->runInEventBaseThread(f);
++nextListener_;
}
void onReadError(const AsyncSocketException& ex) noexcept {
LOG(ERROR) << ex.what();
// Lets register to continue listening for packets
socket_->resumeRead(this);
}
void onReadClosed() noexcept {
for (auto& listener: listeners_) {
auto callback = listener.second;
listener.first->runInEventBaseThread([callback] () mutable {
callback->onListenStopped();
});
}
}
EventBase* const evb_;
const size_t packetSize_;
std::unique_ptr<AsyncUDPSocket> socket_;
// List of listener to distribute packets among
typedef std::pair<EventBase*, Callback*> Listener;
std::vector<Listener> listeners_;
// Next listener to send packet to
uint32_t nextListener_;
// Temporary buffer for data
folly::IOBufQueue buf_;
};
} // Namespace
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/io/async/AsyncUDPSocket.h>
#include <folly/io/async/EventBase.h>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
namespace folly {
AsyncUDPSocket::AsyncUDPSocket(EventBase* evb)
: EventHandler(CHECK_NOTNULL(evb)),
eventBase_(evb),
fd_(-1),
readCallback_(nullptr) {
DCHECK(evb->isInEventBaseThread());
}
AsyncUDPSocket::~AsyncUDPSocket() {
if (fd_ != -1) {
close();
}
}
void AsyncUDPSocket::bind(const folly::SocketAddress& address) {
int socket = ::socket(address.getFamily(), SOCK_DGRAM, IPPROTO_UDP);
if (socket == -1) {
throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
"error creating async udp socket",
errno);
}
auto g = folly::makeGuard([&] { ::close(socket); });
// put the socket in non-blocking mode
int ret = fcntl(socket, F_SETFL, O_NONBLOCK);
if (ret != 0) {
throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
"failed to put socket in non-blocking mode",
errno);
}
// put the socket in reuse mode
int value = 1;
if (setsockopt(socket,
SOL_SOCKET,
SO_REUSEADDR,
&value,
sizeof(value)) != 0) {
throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
"failed to put socket in reuse mode",
errno);
}
// bind to the address
sockaddr_storage addrStorage;
address.getAddress(&addrStorage);
sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
if (::bind(socket, saddr, address.getActualSize()) != 0) {
throw AsyncSocketException(
AsyncSocketException::NOT_OPEN,
"failed to bind the async udp socket for:" + address.describe(),
errno);
}
// success
g.dismiss();
fd_ = socket;
ownership_ = FDOwnership::OWNS;
// attach to EventHandler
EventHandler::changeHandlerFD(fd_);
if (address.getPort() != 0) {
localAddress_ = address;
} else {
localAddress_.setFromLocalAddress(fd_);
}
}
void AsyncUDPSocket::setFD(int fd, FDOwnership ownership) {
CHECK_EQ(-1, fd_) << "Already bound to another FD";
fd_ = fd;
ownership_ = ownership;
EventHandler::changeHandlerFD(fd_);
localAddress_.setFromLocalAddress(fd_);
}
ssize_t AsyncUDPSocket::write(const folly::SocketAddress& address,
const std::unique_ptr<folly::IOBuf>& buf) {
CHECK_NE(-1, fd_) << "Socket not yet bound";
// XXX: Use `sendmsg` instead of coalescing here
buf->coalesce();
sockaddr_storage addrStorage;
address.getAddress(&addrStorage);
sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
return ::sendto(fd_,
buf->data(),
buf->length(),
MSG_DONTWAIT,
saddr,
address.getActualSize());
}
void AsyncUDPSocket::resumeRead(ReadCallback* cob) {
CHECK(!readCallback_) << "Another read callback already installed";
CHECK_NE(-1, fd_) << "UDP server socket not yet bind to an address";
readCallback_ = CHECK_NOTNULL(cob);
if (!updateRegistration()) {
AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
"failed to register for accept events");
readCallback_ = nullptr;
cob->onReadError(ex);
return;
}
}
void AsyncUDPSocket::pauseRead() {
// It is ok to pause an already paused socket
readCallback_ = nullptr;
updateRegistration();
}
void AsyncUDPSocket::close() {
DCHECK(eventBase_->isInEventBaseThread());
if (readCallback_) {
auto cob = readCallback_;
readCallback_ = nullptr;
cob->onReadClosed();
}
// Unregister any events we are registered for
unregisterHandler();
if (fd_ != -1 && ownership_ == FDOwnership::OWNS) {
::close(fd_);
}
fd_ = -1;
}
void AsyncUDPSocket::handlerReady(uint16_t events) noexcept {
if (events & EventHandler::READ) {
DCHECK(readCallback_);
handleRead();
}
}
void AsyncUDPSocket::handleRead() noexcept {
void* buf{nullptr};
size_t len{0};
readCallback_->getReadBuffer(&buf, &len);
if (buf == nullptr || len == 0) {
AsyncSocketException ex(
AsyncSocketException::BAD_ARGS,
"AsyncUDPSocket::getReadBuffer() returned empty buffer");
auto cob = readCallback_;
readCallback_ = nullptr;
cob->onReadError(ex);
updateRegistration();
return;
}
struct sockaddr_storage addrStorage;
socklen_t addrLen = sizeof(addrStorage);
memset(&addrStorage, 0, addrLen);
struct sockaddr* rawAddr = reinterpret_cast<sockaddr*>(&addrStorage);
rawAddr->sa_family = localAddress_.getFamily();
ssize_t bytesRead = ::recvfrom(fd_, buf, len, MSG_TRUNC, rawAddr, &addrLen);
if (bytesRead >= 0) {
clientAddress_.setFromSockaddr(rawAddr, addrLen);
if (bytesRead > 0) {
bool truncated = false;
if ((size_t)bytesRead > len) {
truncated = true;
bytesRead = len;
}
readCallback_->onDataAvailable(clientAddress_, bytesRead, truncated);
}
} else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// No data could be read without blocking the socket
return;
}
AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
"::recvfrom() failed",
errno);
// In case of UDP we can continue reading from the socket
// even if the current request fails. We notify the user
// so that he can do some logging/stats collection if he wants.
auto cob = readCallback_;
readCallback_ = nullptr;
cob->onReadError(ex);
updateRegistration();
}
}
bool AsyncUDPSocket::updateRegistration() noexcept {
uint16_t flags = NONE;
if (readCallback_) {
flags |= READ;
}
return registerHandler(flags | PERSIST);
}
} // Namespace
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/io/IOBuf.h>
#include <folly/ScopeGuard.h>
#include <folly/io/async/AsyncSocketException.h>
#include <folly/io/async/EventHandler.h>
#include <folly/io/async/EventBase.h>
#include <folly/SocketAddress.h>
#include <memory>
namespace folly {
/**
* UDP socket
*/
class AsyncUDPSocket : public EventHandler {
public:
enum class FDOwnership {
OWNS,
SHARED
};
class ReadCallback {
public:
/**
* Invoked when the socket becomes readable and we want buffer
* to write to.
*
* NOTE: From socket we will end up reading at most `len` bytes
* and if there were more bytes in datagram, we will end up
* dropping them.
*/
virtual void getReadBuffer(void** buf, size_t* len) noexcept = 0;
/**
* Invoked when a new datagraom is available on the socket. `len`
* is the number of bytes read and `truncated` is true if we had
* to drop few bytes because of running out of buffer space.
*/
virtual void onDataAvailable(const folly::SocketAddress& client,
size_t len,
bool truncated) noexcept = 0;
/**
* Invoked when there is an error reading from the socket.
*
* NOTE: Since UDP is connectionless, you can still read from the socket.
* But you have to re-register readCallback yourself after
* onReadError.
*/
virtual void onReadError(const AsyncSocketException& ex)
noexcept = 0;
/**
* Invoked when socket is closed and a read callback is registered.
*/
virtual void onReadClosed() noexcept = 0;
virtual ~ReadCallback() {}
};
/**
* Create a new UDP socket that will run in the
* given eventbase
*/
explicit AsyncUDPSocket(EventBase* evb);
~AsyncUDPSocket();
/**
* Returns the address server is listening on
*/
const folly::SocketAddress& address() const {
CHECK_NE(-1, fd_) << "Server not yet bound to an address";
return localAddress_;
}
/**
* Bind the socket to the following address. If port is not
* set in the `address` an ephemeral port is chosen and you can
* use `address()` method above to get it after this method successfully
* returns.
*/
void bind(const folly::SocketAddress& address);
/**
* Use an already bound file descriptor. You can either transfer ownership
* of this FD by using ownership = FDOwnership::OWNS or share it using
* FDOwnership::SHARED. In case FD is shared, it will not be `close`d in
* destructor.
*/
void setFD(int fd, FDOwnership ownership);
/**
* Send the data in buffer to destination. Returns the return code from
* ::sendto.
*/
ssize_t write(const folly::SocketAddress& address,
const std::unique_ptr<folly::IOBuf>& buf);
/**
* Start reading datagrams
*/
void resumeRead(ReadCallback* cob);
/**
* Pause reading datagrams
*/
void pauseRead();
/**
* Stop listening on the socket.
*/
void close();
/**
* Get internal FD used by this socket
*/
int getFD() const {
CHECK_NE(-1, fd_) << "Need to bind before getting FD out";
return fd_;
}
private:
AsyncUDPSocket(const AsyncUDPSocket&) = delete;
AsyncUDPSocket& operator=(const AsyncUDPSocket&) = delete;
// EventHandler
void handlerReady(uint16_t events) noexcept;
void handleRead() noexcept;
bool updateRegistration() noexcept;
EventBase* eventBase_;
folly::SocketAddress localAddress_;
int fd_;
FDOwnership ownership_;
// Temp space to receive client address
folly::SocketAddress clientAddress_;
// Non-null only when we are reading
ReadCallback* readCallback_;
};
} // Namespace
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/io/async/AsyncUDPSocket.h>
#include <folly/io/async/AsyncUDPServerSocket.h>
#include <folly/io/async/AsyncTimeout.h>
#include <folly/io/async/EventBase.h>
#include <folly/SocketAddress.h>
#include <boost/thread/barrier.hpp>
#include <folly/io/IOBuf.h>
#include <thread>
#include <gtest/gtest.h>
using folly::AsyncUDPSocket;
using folly::AsyncUDPServerSocket;
using folly::AsyncTimeout;
using folly::EventBase;
using folly::SocketAddress;
using folly::IOBuf;
class UDPAcceptor
: public AsyncUDPServerSocket::Callback {
public:
UDPAcceptor(EventBase* evb, int n): evb_(evb), n_(n) {
}
void onListenStarted() noexcept {
}
void onListenStopped() noexcept {
}
void onDataAvailable(const folly::SocketAddress& client,
std::unique_ptr<folly::IOBuf> data,
bool truncated) noexcept {
lastClient_ = client;
lastMsg_ = data->moveToFbString().toStdString();
auto len = data->computeChainDataLength();
VLOG(4) << "Worker " << n_ << " read " << len << " bytes "
<< "(trun:" << truncated << ") from " << client.describe()
<< " - " << lastMsg_;
sendPong();
}
void sendPong() noexcept {
try {
AsyncUDPSocket socket(evb_);
socket.bind(folly::SocketAddress("127.0.0.1", 0));
socket.write(lastClient_, folly::IOBuf::copyBuffer(lastMsg_));
} catch (const std::exception& ex) {
VLOG(4) << "Failed to send PONG " << ex.what();
}
}
private:
EventBase* const evb_{nullptr};
const int n_{-1};
folly::SocketAddress lastClient_;
std::string lastMsg_;
};
class UDPServer {
public:
UDPServer(EventBase* evb, folly::SocketAddress addr, int n)
: evb_(evb), addr_(addr), evbs_(n) {
}
void start() {
CHECK(evb_->isInEventBaseThread());
socket_ = folly::make_unique<AsyncUDPServerSocket>(
evb_,
1500);
try {
socket_->bind(addr_);
VLOG(4) << "Server listening on " << socket_->address().describe();
} catch (const std::exception& ex) {
LOG(FATAL) << ex.what();
}
acceptors_.reserve(evbs_.size());
threads_.reserve(evbs_.size());
// Add numWorkers thread
int i = 0;
for (auto& evb: evbs_) {
acceptors_.emplace_back(&evb, i);
std::thread t([&] () {
evb.loopForever();
});
auto r = std::make_shared<boost::barrier>(2);
evb.runInEventBaseThread([r] () {
r->wait();
});
r->wait();
socket_->addListener(&evb, &acceptors_[i]);
threads_.emplace_back(std::move(t));
++i;
}
socket_->listen();
}
folly::SocketAddress address() const {
return socket_->address();
}
void shutdown() {
CHECK(evb_->isInEventBaseThread());
socket_->close();
socket_.reset();
for (auto& evb: evbs_) {
evb.terminateLoopSoon();
}
for (auto& t: threads_) {
t.join();
}
}
private:
EventBase* const evb_{nullptr};
const folly::SocketAddress addr_;
std::unique_ptr<AsyncUDPServerSocket> socket_;
std::vector<std::thread> threads_;
std::vector<folly::EventBase> evbs_;
std::vector<UDPAcceptor> acceptors_;
};
class UDPClient
: private AsyncUDPSocket::ReadCallback,
private AsyncTimeout {
public:
explicit UDPClient(EventBase* evb)
: AsyncTimeout(evb),
evb_(evb) {
}
void start(const folly::SocketAddress& server, int n) {
CHECK(evb_->isInEventBaseThread());
server_ = server;
socket_ = folly::make_unique<AsyncUDPSocket>(evb_);
try {
socket_->bind(folly::SocketAddress("127.0.0.1", 0));
VLOG(4) << "Client bound to " << socket_->address().describe();
} catch (const std::exception& ex) {
LOG(FATAL) << ex.what();
}
socket_->resumeRead(this);
n_ = n;
// Start playing ping pong
sendPing();
}
void shutdown() {
CHECK(evb_->isInEventBaseThread());
socket_->pauseRead();
socket_->close();
socket_.reset();
evb_->terminateLoopSoon();
}
void sendPing() {
if (n_ == 0) {
shutdown();
return;
}
--n_;
scheduleTimeout(5);
socket_->write(
server_,
folly::IOBuf::copyBuffer(folly::to<std::string>("PING ", n_)));
}
void getReadBuffer(void** buf, size_t* len) noexcept {
*buf = buf_;
*len = 1024;
}
void onDataAvailable(const folly::SocketAddress& client,
size_t len,
bool truncated) noexcept {
VLOG(4) << "Read " << len << " bytes (trun:" << truncated << ") from "
<< client.describe() << " - " << std::string(buf_, len);
VLOG(4) << n_ << " left";
++pongRecvd_;
sendPing();
}
void onReadError(const folly::AsyncSocketException& ex) noexcept {
VLOG(4) << ex.what();
// Start listening for next PONG
socket_->resumeRead(this);
}
void onReadClosed() noexcept {
CHECK(false) << "We unregister reads before closing";
}
void timeoutExpired() noexcept {
VLOG(4) << "Timeout expired";
sendPing();
}
int pongRecvd() const {
return pongRecvd_;
}
private:
EventBase* const evb_{nullptr};
folly::SocketAddress server_;
std::unique_ptr<AsyncUDPSocket> socket_;
int pongRecvd_{0};
int n_{0};
char buf_[1024];
};
TEST(AsyncSocketTest, PingPong) {
folly::EventBase sevb;
UDPServer server(&sevb, folly::SocketAddress("127.0.0.1", 0), 4);
boost::barrier barrier(2);
// Start event loop in a separate thread
auto serverThread = std::thread([&sevb] () {
sevb.loopForever();
});
// Wait for event loop to start
sevb.runInEventBaseThread([&] () { barrier.wait(); });
barrier.wait();
// Start the server
sevb.runInEventBaseThread([&] () { server.start(); barrier.wait(); });
barrier.wait();
folly::EventBase cevb;
UDPClient client(&cevb);
// Start event loop in a separate thread
auto clientThread = std::thread([&cevb] () {
cevb.loopForever();
});
// Wait for event loop to start
cevb.runInEventBaseThread([&] () { barrier.wait(); });
barrier.wait();
// Send ping
cevb.runInEventBaseThread([&] () { client.start(server.address(), 1000); });
// Wait for client to finish
clientThread.join();
// Check that some PING/PONGS were exchanged. Out of 1000 transactions
// at least 1 should succeed
CHECK_GT(client.pongRecvd(), 0);
// Shutdown server
sevb.runInEventBaseThread([&] () {
server.shutdown();
sevb.terminateLoopSoon();
});
// Wait for server thread to joib
serverThread.join();
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment