Commit ca5fc366 authored by James Sedgwick's avatar James Sedgwick Committed by Andrii Grynenko

expose event base from IOThreadPoolExecutor

Summary:
I'm not 100% sure this is the best way to go about this but I don't hate it either.
I'm going to start seeing how it might fit into tserver - my guess is that some sort Cpp2WorkerFactory which also manages those objects would get plugged in as the thread factory
Haven't fleshed out how this would relate to TEventBaseManager

Test Plan: added unit, starting to play with this in Thrift2 server

Reviewed By: davejwatson@fb.com

Subscribers: alandau, bmatheny, trunkagent, fugalh, njormrod

FB internal diff: D1574660
parent 5617e556
...@@ -49,6 +49,7 @@ void CPUThreadPoolExecutor::add( ...@@ -49,6 +49,7 @@ void CPUThreadPoolExecutor::add(
} }
void CPUThreadPoolExecutor::threadRun(std::shared_ptr<Thread> thread) { void CPUThreadPoolExecutor::threadRun(std::shared_ptr<Thread> thread) {
thread->startupBaton.post();
while (1) { while (1) {
auto task = taskQueue_->take(); auto task = taskQueue_->take();
if (UNLIKELY(task.poison)) { if (UNLIKELY(task.poison)) {
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include <folly/MoveWrapper.h> #include <folly/MoveWrapper.h>
#include <glog/logging.h> #include <glog/logging.h>
#include <thrift/lib/cpp/async/TEventBaseManager.h>
namespace folly { namespace wangle { namespace folly { namespace wangle {
...@@ -57,7 +58,7 @@ void IOThreadPoolExecutor::add( ...@@ -57,7 +58,7 @@ void IOThreadPoolExecutor::add(
}; };
ioThread->pendingTasks++; ioThread->pendingTasks++;
if (!ioThread->eventBase.runInEventBaseThread(std::move(wrappedFunc))) { if (!ioThread->eventBase->runInEventBaseThread(std::move(wrappedFunc))) {
ioThread->pendingTasks--; ioThread->pendingTasks--;
throw std::runtime_error("Unable to run func in event base thread"); throw std::runtime_error("Unable to run func in event base thread");
} }
...@@ -70,12 +71,15 @@ IOThreadPoolExecutor::makeThread() { ...@@ -70,12 +71,15 @@ IOThreadPoolExecutor::makeThread() {
void IOThreadPoolExecutor::threadRun(ThreadPtr thread) { void IOThreadPoolExecutor::threadRun(ThreadPtr thread) {
const auto ioThread = std::static_pointer_cast<IOThread>(thread); const auto ioThread = std::static_pointer_cast<IOThread>(thread);
ioThread->eventBase =
apache::thrift::async::TEventBaseManager::get()->getEventBase();
thread->startupBaton.post();
while (ioThread->shouldRun) { while (ioThread->shouldRun) {
ioThread->eventBase.loopForever(); ioThread->eventBase->loopForever();
} }
if (isJoin_) { if (isJoin_) {
while (ioThread->pendingTasks > 0) { while (ioThread->pendingTasks > 0) {
ioThread->eventBase.loopOnce(); ioThread->eventBase->loopOnce();
} }
} }
stoppedThreads_.add(ioThread); stoppedThreads_.add(ioThread);
...@@ -87,7 +91,7 @@ void IOThreadPoolExecutor::stopThreads(size_t n) { ...@@ -87,7 +91,7 @@ void IOThreadPoolExecutor::stopThreads(size_t n) {
const auto ioThread = std::static_pointer_cast<IOThread>( const auto ioThread = std::static_pointer_cast<IOThread>(
threadList_.get()[i]); threadList_.get()[i]);
ioThread->shouldRun = false; ioThread->shouldRun = false;
ioThread->eventBase.terminateLoopSoon(); ioThread->eventBase->terminateLoopSoon();
} }
} }
......
...@@ -45,7 +45,7 @@ class IOThreadPoolExecutor : public ThreadPoolExecutor { ...@@ -45,7 +45,7 @@ class IOThreadPoolExecutor : public ThreadPoolExecutor {
IOThread() : shouldRun(true), pendingTasks(0) {}; IOThread() : shouldRun(true), pendingTasks(0) {};
std::atomic<bool> shouldRun; std::atomic<bool> shouldRun;
std::atomic<size_t> pendingTasks; std::atomic<size_t> pendingTasks;
EventBase eventBase; EventBase* eventBase;
}; };
size_t nextThread_; size_t nextThread_;
......
...@@ -84,14 +84,20 @@ void ThreadPoolExecutor::setNumThreads(size_t n) { ...@@ -84,14 +84,20 @@ void ThreadPoolExecutor::setNumThreads(size_t n) {
// threadListLock_ is writelocked // threadListLock_ is writelocked
void ThreadPoolExecutor::addThreads(size_t n) { void ThreadPoolExecutor::addThreads(size_t n) {
std::vector<ThreadPtr> newThreads;
for (int i = 0; i < n; i++) { for (int i = 0; i < n; i++) {
auto thread = makeThread(); newThreads.push_back(makeThread());
}
for (auto& thread : newThreads) {
// TODO need a notion of failing to create the thread // TODO need a notion of failing to create the thread
// and then handling for that case // and then handling for that case
thread->handle = threadFactory_->newThread( thread->handle = threadFactory_->newThread(
std::bind(&ThreadPoolExecutor::threadRun, this, thread)); std::bind(&ThreadPoolExecutor::threadRun, this, thread));
threadList_.add(thread); threadList_.add(thread);
} }
for (auto& thread : newThreads) {
thread->startupBaton.wait();
}
} }
// threadListLock_ is writelocked // threadListLock_ is writelocked
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include <folly/experimental/wangle/concurrent/LifoSemMPMCQueue.h> #include <folly/experimental/wangle/concurrent/LifoSemMPMCQueue.h>
#include <folly/experimental/wangle/concurrent/NamedThreadFactory.h> #include <folly/experimental/wangle/concurrent/NamedThreadFactory.h>
#include <folly/experimental/wangle/rx/Observable.h> #include <folly/experimental/wangle/rx/Observable.h>
#include <folly/Baton.h>
#include <folly/Memory.h> #include <folly/Memory.h>
#include <folly/RWSpinLock.h> #include <folly/RWSpinLock.h>
...@@ -83,6 +84,7 @@ class ThreadPoolExecutor : public experimental::Executor { ...@@ -83,6 +84,7 @@ class ThreadPoolExecutor : public experimental::Executor {
uint64_t id; uint64_t id;
std::thread handle; std::thread handle;
bool idle; bool idle;
Baton<> startupBaton;
}; };
typedef std::shared_ptr<Thread> ThreadPtr; typedef std::shared_ptr<Thread> ThreadPtr;
...@@ -101,7 +103,8 @@ class ThreadPoolExecutor : public experimental::Executor { ...@@ -101,7 +103,8 @@ class ThreadPoolExecutor : public experimental::Executor {
void runTask(const ThreadPtr& thread, Task&& task); void runTask(const ThreadPtr& thread, Task&& task);
// The function that will be bound to pool threads // The function that will be bound to pool threads. It must call
// thread->startupBaton.post() when it's ready to consume work.
virtual void threadRun(ThreadPtr thread) = 0; virtual void threadRun(ThreadPtr thread) = 0;
// Stop n threads and put their ThreadPtrs in the threadsStopped_ queue // Stop n threads and put their ThreadPtrs in the threadsStopped_ queue
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment