Commit f697ba49 authored by Dave Watson's avatar Dave Watson Committed by Viswanath Sivakumar

AsyncUDPServerSocket passes socket in callback

Summary: AsyncUDPServerSocket doesn't make it easy to write to the same socket you read from.  Add the socket as a callback param, similar to AsyncServerSocket

Test Plan:
fbconfig -r folly; fbmake dbg

Will fixup any other spots contbuild finds

Reviewed By: hans@fb.com

Subscribers: bmatheny, doug, fugalh, folly-diffs@, jsedgwick, yfeldblum, chalfant

FB internal diff: D1948936

Signature: t1:1948936:1427841651:20d13d73c06d31c75056624f051a6fd35b9701fb
parent 163a570e
...@@ -56,9 +56,11 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback ...@@ -56,9 +56,11 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback
/** /**
* Invoked when a new packet is received * Invoked when a new packet is received
*/ */
virtual void onDataAvailable(const folly::SocketAddress& addr, virtual void onDataAvailable(
std::unique_ptr<folly::IOBuf> buf, std::shared_ptr<AsyncUDPSocket> socket,
bool truncated) noexcept = 0; const folly::SocketAddress& addr,
std::unique_ptr<folly::IOBuf> buf,
bool truncated) noexcept = 0;
virtual ~Callback() {} virtual ~Callback() {}
}; };
...@@ -85,7 +87,7 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback ...@@ -85,7 +87,7 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback
void bind(const folly::SocketAddress& addy) { void bind(const folly::SocketAddress& addy) {
CHECK(!socket_); CHECK(!socket_);
socket_ = folly::make_unique<AsyncUDPSocket>(evb_); socket_ = std::make_shared<AsyncUDPSocket>(evb_);
socket_->setReusePort(reusePort_); socket_->setReusePort(reusePort_);
socket_->bind(addy); socket_->bind(addy);
} }
...@@ -131,6 +133,7 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback ...@@ -131,6 +133,7 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback
void close() { void close() {
CHECK(socket_) << "Need to bind before closing"; CHECK(socket_) << "Need to bind before closing";
socket_->close();
socket_.reset(); socket_.reset();
} }
...@@ -165,11 +168,12 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback ...@@ -165,11 +168,12 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback
auto mvp = auto mvp =
folly::MoveWrapper< folly::MoveWrapper<
std::unique_ptr<folly::IOBuf>>(std::move(data)); std::unique_ptr<folly::IOBuf>>(std::move(data));
auto socket = socket_;
// Schedule it in the listener's eventbase // Schedule it in the listener's eventbase
// XXX: Speed this up // XXX: Speed this up
std::function<void()> f = [client, callback, mvp, truncated] () mutable { std::function<void()> f = [socket, client, callback, mvp, truncated] () mutable {
callback->onDataAvailable(client, std::move(*mvp), truncated); callback->onDataAvailable(socket, client, std::move(*mvp), truncated);
}; };
listeners_[nextListener_].first->runInEventBaseThread(f); listeners_[nextListener_].first->runInEventBaseThread(f);
...@@ -196,7 +200,7 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback ...@@ -196,7 +200,7 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback
EventBase* const evb_; EventBase* const evb_;
const size_t packetSize_; const size_t packetSize_;
std::unique_ptr<AsyncUDPSocket> socket_; std::shared_ptr<AsyncUDPSocket> socket_;
// List of listener to distribute packets among // List of listener to distribute packets among
typedef std::pair<EventBase*, Callback*> Listener; typedef std::pair<EventBase*, Callback*> Listener;
......
...@@ -47,7 +47,8 @@ class UDPAcceptor ...@@ -47,7 +47,8 @@ class UDPAcceptor
void onListenStopped() noexcept { void onListenStopped() noexcept {
} }
void onDataAvailable(const folly::SocketAddress& client, void onDataAvailable(std::shared_ptr<folly::AsyncUDPSocket> socket,
const folly::SocketAddress& client,
std::unique_ptr<folly::IOBuf> data, std::unique_ptr<folly::IOBuf> data,
bool truncated) noexcept { bool truncated) noexcept {
......
...@@ -233,7 +233,10 @@ class Acceptor : ...@@ -233,7 +233,10 @@ class Acceptor :
void onListenStarted() noexcept {} void onListenStarted() noexcept {}
void onListenStopped() noexcept {} void onListenStopped() noexcept {}
void onDataAvailable(const SocketAddress&, std::unique_ptr<IOBuf>, bool) noexcept {} void onDataAvailable(
std::shared_ptr<AsyncUDPSocket> socket,
const SocketAddress&,
std::unique_ptr<IOBuf>, bool) noexcept {}
virtual AsyncSocket::UniquePtr makeNewAsyncSocket(EventBase* base, int fd) { virtual AsyncSocket::UniquePtr makeNewAsyncSocket(EventBase* base, int fd) {
return AsyncSocket::UniquePtr(new AsyncSocket(base, fd)); return AsyncSocket::UniquePtr(new AsyncSocket(base, fd));
......
...@@ -98,7 +98,8 @@ class ServerAcceptor ...@@ -98,7 +98,8 @@ class ServerAcceptor
} }
// UDP thunk // UDP thunk
void onDataAvailable(const folly::SocketAddress& addr, void onDataAvailable(std::shared_ptr<AsyncUDPSocket> socket,
const folly::SocketAddress& addr,
std::unique_ptr<folly::IOBuf> buf, std::unique_ptr<folly::IOBuf> buf,
bool truncated) noexcept { bool truncated) noexcept {
acceptorPipeline_->read(buf.release()); acceptorPipeline_->read(buf.release());
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment