Commit ceee59a7 authored by Pranjal Raihan's avatar Pranjal Raihan Committed by Facebook GitHub Bot

Migrate AsyncServerSocket to use AtomicNotificationQueue

Reviewed By: andriigrynenko

Differential Revision: D24698220

fbshipit-source-id: aab8c96440d60bb653f0c537af56081819129306
parent dcddc5c0
......@@ -58,14 +58,12 @@ const uint32_t AsyncServerSocket::kDefaultMaxMessagesInQueue;
void AsyncServerSocket::RemoteAcceptor::start(
EventBase* eventBase,
uint32_t maxAtOnce,
uint32_t maxInQueue) {
setMaxReadAtOnce(maxAtOnce);
queue_.setMaxQueueSize(maxInQueue);
uint32_t maxAtOnce) {
queue_.setMaxReadAtOnce(maxAtOnce);
eventBase->runInEventBaseThread([=]() {
callback_->acceptStarted();
this->startConsuming(eventBase, &queue_);
queue_.startConsuming(eventBase);
});
}
......@@ -78,20 +76,20 @@ void AsyncServerSocket::RemoteAcceptor::stop(
});
}
void AsyncServerSocket::RemoteAcceptor::messageAvailable(
void AsyncServerSocket::RemoteAcceptor::Consumer::operator()(
QueueMessage&& msg) noexcept {
switch (msg.type) {
case MessageType::MSG_NEW_CONN: {
if (connectionEventCallback_) {
connectionEventCallback_->onConnectionDequeuedByAcceptorCallback(
msg.fd, msg.address);
if (acceptor_.connectionEventCallback_) {
acceptor_.connectionEventCallback_
->onConnectionDequeuedByAcceptorCallback(msg.fd, msg.address);
}
callback_->connectionAccepted(msg.fd, msg.address);
acceptor_.callback_->connectionAccepted(msg.fd, msg.address);
break;
}
case MessageType::MSG_ERROR: {
std::runtime_error ex(msg.msg);
callback_->acceptError(ex);
acceptor_.callback_->acceptError(ex);
break;
}
default: {
......@@ -99,7 +97,7 @@ void AsyncServerSocket::RemoteAcceptor::messageAvailable(
<< int(msg.type);
std::runtime_error ex(
"received invalid accept notification message type");
callback_->acceptError(ex);
acceptor_.callback_->acceptError(ex);
}
}
}
......@@ -597,7 +595,7 @@ void AsyncServerSocket::addAcceptCallback(
RemoteAcceptor* acceptor = nullptr;
try {
acceptor = new RemoteAcceptor(callback, connectionEventCallback_);
acceptor->start(eventBase, maxAtOnce, maxNumMsgsInQueue_);
acceptor->start(eventBase, maxAtOnce);
} catch (...) {
callbacks_.pop_back();
delete acceptor;
......@@ -1017,7 +1015,8 @@ void AsyncServerSocket::dispatchSocket(
// Loop until we find a free queue to write to
while (true) {
if (info->consumer->getQueue()->tryPutMessageNoThrow(std::move(msg))) {
if (info->consumer->getQueue().tryPutMessage(
std::move(msg), maxNumMsgsInQueue_)) {
if (connectionEventCallback_) {
connectionEventCallback_->onConnectionEnqueuedForAcceptorCallback(
socket, addr);
......@@ -1077,7 +1076,8 @@ void AsyncServerSocket::dispatchError(const char* msgstr, int errnoValue) {
return;
}
if (info->consumer->getQueue()->tryPutMessageNoThrow(std::move(msg))) {
if (info->consumer->getQueue().tryPutMessage(
std::move(msg), maxNumMsgsInQueue_)) {
return;
}
// Fall through and try another callback
......
......@@ -21,10 +21,10 @@
#include <folly/io/ShutdownSocketSet.h>
#include <folly/io/async/AsyncSocketBase.h>
#include <folly/io/async/AsyncTimeout.h>
#include <folly/io/async/AtomicNotificationQueue.h>
#include <folly/io/async/DelayedDestruction.h>
#include <folly/io/async/EventBase.h>
#include <folly/io/async/EventHandler.h>
#include <folly/io/async/NotificationQueue.h>
#include <folly/net/NetOps.h>
#include <folly/net/NetworkSocket.h>
#include <folly/portability/Sockets.h>
......@@ -550,8 +550,6 @@ class AsyncServerSocket : public DelayedDestruction, public AsyncSocketBase {
* Set the maximum number of unprocessed messages in NotificationQueue.
* No new message will be sent to that NotificationQueue if there are more
* than such number of unprocessed messages in that queue.
*
* Only works if called before addAcceptCallback.
*/
void setMaxNumMessagesInQueue(uint32_t num) { maxNumMsgsInQueue_ = num; }
......@@ -596,7 +594,7 @@ class AsyncServerSocket : public DelayedDestruction, public AsyncSocketBase {
int64_t numMsgs = 0;
for (const auto& callback : callbacks_) {
if (callback.consumer) {
numMsgs += callback.consumer->getQueue()->size();
numMsgs += callback.consumer->getQueue().size();
}
}
return numMsgs;
......@@ -736,28 +734,33 @@ class AsyncServerSocket : public DelayedDestruction, public AsyncSocketBase {
* receives notification of new sockets via a NotificationQueue,
* and then invokes the AcceptCallback.
*/
class RemoteAcceptor : private NotificationQueue<QueueMessage>::Consumer {
class RemoteAcceptor {
struct Consumer {
void operator()(QueueMessage&& msg) noexcept;
explicit Consumer(RemoteAcceptor& acceptor) : acceptor_(acceptor) {}
RemoteAcceptor& acceptor_;
};
public:
using Queue = AtomicNotificationQueue<QueueMessage, Consumer>;
explicit RemoteAcceptor(
AcceptCallback* callback,
ConnectionEventCallback* connectionEventCallback)
: callback_(callback),
connectionEventCallback_(connectionEventCallback) {}
connectionEventCallback_(connectionEventCallback),
queue_(Consumer(*this)) {}
~RemoteAcceptor() override = default;
void start(EventBase* eventBase, uint32_t maxAtOnce, uint32_t maxInQueue);
void start(EventBase* eventBase, uint32_t maxAtOnce);
void stop(EventBase* eventBase, AcceptCallback* callback);
void messageAvailable(QueueMessage&& msg) noexcept override;
NotificationQueue<QueueMessage>* getQueue() { return &queue_; }
Queue& getQueue() { return queue_; }
private:
AcceptCallback* callback_;
ConnectionEventCallback* connectionEventCallback_;
NotificationQueue<QueueMessage> queue_;
Queue queue_;
};
/**
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment