Commit cdf0badc authored by Srivatsan Ramesh's avatar Srivatsan Ramesh Committed by Facebook GitHub Bot

Change socket QueueMessage to a variant

Summary: QueueMessage is now a variant of NewConnMessage and ErrorMessage

Reviewed By: praihan

Differential Revision: D27384053

fbshipit-source-id: a92f514b56f31e879dc329984bf0a615ed339836
parent 0b19393b
...@@ -75,39 +75,27 @@ void AsyncServerSocket::RemoteAcceptor::stop( ...@@ -75,39 +75,27 @@ void AsyncServerSocket::RemoteAcceptor::stop(
}); });
} }
AtomicNotificationQueueTaskStatus AtomicNotificationQueueTaskStatus AsyncServerSocket::NewConnMessage::operator()(
AsyncServerSocket::RemoteAcceptor::Consumer::operator()( RemoteAcceptor& acceptor) noexcept {
QueueMessage&& msg) noexcept { if (isExpired()) {
if (msg.isExpired()) { closeNoInt(fd);
closeNoInt(msg.fd); if (acceptor.connectionEventCallback_) {
if (acceptor_.connectionEventCallback_) { acceptor.connectionEventCallback_->onConnectionDropped(fd, clientAddr);
acceptor_.connectionEventCallback_->onConnectionDropped(
msg.fd, msg.address);
} }
return AtomicNotificationQueueTaskStatus::DISCARD; return AtomicNotificationQueueTaskStatus::DISCARD;
} }
switch (msg.type) { if (acceptor.connectionEventCallback_) {
case MessageType::MSG_NEW_CONN: { acceptor.connectionEventCallback_->onConnectionDequeuedByAcceptorCallback(
if (acceptor_.connectionEventCallback_) { fd, clientAddr);
acceptor_.connectionEventCallback_
->onConnectionDequeuedByAcceptorCallback(msg.fd, msg.address);
}
acceptor_.callback_->connectionAccepted(msg.fd, msg.address);
break;
}
case MessageType::MSG_ERROR: {
auto ex = make_exception_wrapper<std::runtime_error>(msg.msg);
acceptor_.callback_->acceptError(std::move(ex));
break;
}
default: {
LOG(ERROR) << "invalid accept notification message type "
<< int(msg.type);
auto ex = make_exception_wrapper<std::runtime_error>(
"received invalid accept notification message type");
acceptor_.callback_->acceptError(std::move(ex));
}
} }
acceptor.callback_->connectionAccepted(fd, clientAddr);
return AtomicNotificationQueueTaskStatus::CONSUMED;
}
AtomicNotificationQueueTaskStatus AsyncServerSocket::ErrorMessage::operator()(
RemoteAcceptor& acceptor) noexcept {
auto ex = make_exception_wrapper<std::runtime_error>(msg);
acceptor.callback_->acceptError(std::move(ex));
return AtomicNotificationQueueTaskStatus::CONSUMED; return AtomicNotificationQueueTaskStatus::CONSUMED;
} }
...@@ -1059,14 +1047,12 @@ void AsyncServerSocket::dispatchSocket( ...@@ -1059,14 +1047,12 @@ void AsyncServerSocket::dispatchSocket(
const SocketAddress addr(address); const SocketAddress addr(address);
// Create a message to send over the notification queue // Create a message to send over the notification queue
QueueMessage msg;
msg.type = MessageType::MSG_NEW_CONN;
msg.address = std::move(address);
msg.fd = socket;
auto queueTimeout = *queueTimeout_; auto queueTimeout = *queueTimeout_;
std::chrono::steady_clock::time_point deadline;
if (queueTimeout.count() != 0) { if (queueTimeout.count() != 0) {
msg.deadline = std::chrono::steady_clock::now() + queueTimeout; deadline = std::chrono::steady_clock::now() + queueTimeout;
} }
NewConnMessage msg{socket, std::move(address), deadline};
// Loop until we find a free queue to write to // Loop until we find a free queue to write to
while (true) { while (true) {
...@@ -1117,10 +1103,7 @@ void AsyncServerSocket::dispatchError(const char* msgstr, int errnoValue) { ...@@ -1117,10 +1103,7 @@ void AsyncServerSocket::dispatchError(const char* msgstr, int errnoValue) {
CallbackInfo* info = nextCallback(); CallbackInfo* info = nextCallback();
// Create a message to send over the notification queue // Create a message to send over the notification queue
QueueMessage msg; ErrorMessage msg{errnoValue, msgstr};
msg.type = MessageType::MSG_ERROR;
msg.err = errnoValue;
msg.msg = msgstr;
while (true) { while (true) {
// Short circuit if the callback is in the primary EventBase thread // Short circuit if the callback is in the primary EventBase thread
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include <exception> #include <exception>
#include <memory> #include <memory>
#include <vector> #include <vector>
#include <boost/variant.hpp>
#include <folly/ExceptionWrapper.h> #include <folly/ExceptionWrapper.h>
#include <folly/SocketAddress.h> #include <folly/SocketAddress.h>
...@@ -775,22 +776,32 @@ class AsyncServerSocket : public DelayedDestruction, public AsyncSocketBase { ...@@ -775,22 +776,32 @@ class AsyncServerSocket : public DelayedDestruction, public AsyncSocketBase {
~AsyncServerSocket() override; ~AsyncServerSocket() override;
private: private:
enum class MessageType { MSG_NEW_CONN = 0, MSG_ERROR = 1 }; class RemoteAcceptor;
struct QueueMessage { struct NewConnMessage {
MessageType type;
NetworkSocket fd; NetworkSocket fd;
int err; SocketAddress clientAddr;
SocketAddress address;
std::string msg;
std::chrono::steady_clock::time_point deadline; std::chrono::steady_clock::time_point deadline;
bool isExpired() const { bool isExpired() const {
return deadline.time_since_epoch().count() != 0 && return deadline.time_since_epoch().count() != 0 &&
std::chrono::steady_clock::now() > deadline; std::chrono::steady_clock::now() > deadline;
} }
AtomicNotificationQueueTaskStatus operator()(
RemoteAcceptor& acceptor) noexcept;
}; };
struct ErrorMessage {
int err;
std::string msg;
AtomicNotificationQueueTaskStatus operator()(
RemoteAcceptor& acceptor) noexcept;
};
using QueueMessage = boost::variant<NewConnMessage, ErrorMessage>;
/** /**
* A class to receive notifications to invoke AcceptCallback objects * A class to receive notifications to invoke AcceptCallback objects
* in other EventBase threads. * in other EventBase threads.
...@@ -802,12 +813,19 @@ class AsyncServerSocket : public DelayedDestruction, public AsyncSocketBase { ...@@ -802,12 +813,19 @@ class AsyncServerSocket : public DelayedDestruction, public AsyncSocketBase {
*/ */
class RemoteAcceptor { class RemoteAcceptor {
struct Consumer { struct Consumer {
AtomicNotificationQueueTaskStatus operator()(QueueMessage&& msg) noexcept; AtomicNotificationQueueTaskStatus operator()(
QueueMessage&& msg) noexcept {
return boost::apply_visitor(
[this](auto& visitMsg) { return visitMsg(acceptor_); }, msg);
}
explicit Consumer(RemoteAcceptor& acceptor) : acceptor_(acceptor) {} explicit Consumer(RemoteAcceptor& acceptor) : acceptor_(acceptor) {}
RemoteAcceptor& acceptor_; RemoteAcceptor& acceptor_;
}; };
friend NewConnMessage;
friend ErrorMessage;
public: public:
using Queue = EventBaseAtomicNotificationQueue<QueueMessage, Consumer>; using Queue = EventBaseAtomicNotificationQueue<QueueMessage, Consumer>;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment