Commit e5f8a09e authored by davejwatson's avatar davejwatson Committed by Dave Watson

Multi accept

Summary: Use acceptor pool instead of current thread for accepts.  Use reuse_port option to support multiple threads

Test Plan: Updated unittests

Reviewed By: hans@fb.com

Subscribers: doug, fugalh, njormrod, folly-diffs@

FB internal diff: D1710619

Tasks: 5788110

Signature: t1:1710619:1417477350:eee5063186e582ef74c4802b8149563af029b3de
parent 49374068
......@@ -62,11 +62,7 @@ TEST(Bootstrap, ServerWithPipeline) {
TestServer server;
server.childPipeline(std::make_shared<TestPipelineFactory>());
server.bind(0);
auto base = EventBaseManager::get()->getEventBase();
base->runAfterDelay([&](){
server.stop();
}, 500);
base->loop();
}
TEST(Bootstrap, ClientServerTest) {
......@@ -81,10 +77,8 @@ TEST(Bootstrap, ClientServerTest) {
TestClient client;
client.connect(address);
base->runAfterDelay([&](){
server.stop();
}, 500);
base->loop();
server.stop();
CHECK(factory->pipelines == 1);
}
......@@ -109,11 +103,55 @@ TEST(Bootstrap, ClientConnectionManagerTest) {
TestClient client2;
client2.connect(address);
base->runAfterDelay([&](){
server.stop();
}, 500);
base->loop();
server.stop();
CHECK(factory->pipelines == 2);
}
TEST(Bootstrap, ServerAcceptGroupTest) {
// Verify that server is using the accept IO group
TestServer server;
auto factory = std::make_shared<TestPipelineFactory>();
server.childPipeline(factory);
server.group(std::make_shared<IOThreadPoolExecutor>(1), nullptr);
server.bind(0);
SocketAddress address;
server.getSockets()[0]->getAddress(&address);
boost::barrier barrier(2);
auto thread = std::thread([&](){
TestClient client;
client.connect(address);
EventBaseManager::get()->getEventBase()->loop();
barrier.wait();
});
barrier.wait();
server.stop();
thread.join();
CHECK(factory->pipelines == 1);
}
TEST(Bootstrap, ServerAcceptGroup2Test) {
// Verify that server is using the accept IO group
TestServer server;
auto factory = std::make_shared<TestPipelineFactory>();
server.childPipeline(factory);
server.group(std::make_shared<IOThreadPoolExecutor>(4), nullptr);
server.bind(0);
SocketAddress address;
server.getSockets()[0]->getAddress(&address);
TestClient client;
client.connect(address);
EventBaseManager::get()->getEventBase()->loop();
server.stop();
CHECK(factory->pipelines == 1);
}
......@@ -21,13 +21,13 @@ namespace folly {
std::thread ServerWorkerFactory::newThread(
folly::wangle::Func&& func) {
return internalFactory_->newThread([=](){
auto id = nextWorkerId_++;
auto worker = acceptorFactory_->newAcceptor();
{
folly::RWSpinLock::WriteHolder guard(workersLock_);
workers_.insert({id, worker});
}
return internalFactory_->newThread([=](){
EventBaseManager::get()->setEventBase(worker->getEventBase(), false);
func();
EventBaseManager::get()->clearEventBase();
......
......@@ -16,6 +16,7 @@
#pragma once
#include <folly/experimental/wangle/bootstrap/ServerBootstrap-inl.h>
#include <boost/thread.hpp>
namespace folly {
......@@ -137,37 +138,80 @@ class ServerBootstrap {
* @param port Port to listen on
*/
void bind(int port) {
// TODO bind to v4 and v6
// TODO take existing socket
// TODO use the acceptor thread
auto socket = folly::AsyncServerSocket::newSocket(
EventBaseManager::get()->getEventBase());
sockets_.push_back(socket);
socket->bind(port);
if (!workerFactory_) {
group(nullptr);
}
bool reusePort = false;
if (acceptor_group_->numThreads() >= 0) {
reusePort = true;
}
std::mutex sock_lock;
std::vector<std::shared_ptr<folly::AsyncServerSocket>> new_sockets;
auto startupFunc = [&](std::shared_ptr<boost::barrier> barrier){
auto socket = folly::AsyncServerSocket::newSocket();
sock_lock.lock();
new_sockets.push_back(socket);
sock_lock.unlock();
socket->setReusePortEnabled(reusePort);
socket->attachEventBase(EventBaseManager::get()->getEventBase());
socket->bind(port);
// TODO Take ServerSocketConfig
socket->listen(1024);
socket->startAccepting();
if (!workerFactory_) {
group(nullptr);
if (port == 0) {
SocketAddress address;
socket->getAddress(&address);
port = address.getPort();
}
barrier->wait();
};
auto bind0 = std::make_shared<boost::barrier>(2);
acceptor_group_->add(std::bind(startupFunc, bind0));
bind0->wait();
auto barrier = std::make_shared<boost::barrier>(acceptor_group_->numThreads());
for (int i = 1; i < acceptor_group_->numThreads(); i++) {
acceptor_group_->add(std::bind(startupFunc, barrier));
}
barrier->wait();
// Startup all the threads
for(auto socket : new_sockets) {
workerFactory_->forEachWorker([this, socket](Acceptor* worker){
socket->getEventBase()->runInEventBaseThread([this, worker, socket](){
socket->addAcceptCallback(worker, worker->getEventBase());
});
});
socket->startAccepting();
}
for (auto& socket : new_sockets) {
sockets_.push_back(socket);
}
}
/*
* Stop listening on all sockets.
*/
void stop() {
for (auto& socket : sockets_) {
auto barrier = std::make_shared<boost::barrier>(sockets_.size() + 1);
for (auto socket : sockets_) {
socket->getEventBase()->runInEventBaseThread([barrier, socket]() {
socket->stopAccepting();
socket->detachEventBase();
barrier->wait();
});
}
barrier->wait();
sockets_.clear();
acceptor_group_->join();
io_group_->join();
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment