Commit 90781bd9 authored by Akrama Baig Mirza's avatar Akrama Baig Mirza Committed by Facebook GitHub Bot

Add thread ID collection to IOThreadPoolExecutor for dogpiles

Summary:
- Use `ThreadIdWorkerProvider` in `IOThreadPoolExecutor` for Thrift dogpiles
- Only enable thread ID collection in `IOThreadPoolExecutor` if `THRIFT_FLAG(enable_io_queue_lag_detection)` is enabled

Reviewed By: yfeldblum

Differential Revision: D32815984

fbshipit-source-id: 9c923e44ca0e3ac93697f7332ac34a95b4c784cb
parent 93c44c56
...@@ -92,6 +92,9 @@ IOThreadPoolExecutor::IOThreadPoolExecutor( ...@@ -92,6 +92,9 @@ IOThreadPoolExecutor::IOThreadPoolExecutor(
eventBaseManager_(ebm) { eventBaseManager_(ebm) {
setNumThreads(numThreads); setNumThreads(numThreads);
registerThreadPoolExecutor(this); registerThreadPoolExecutor(this);
if (options.enableThreadIdCollection) {
threadIdCollector_ = std::make_unique<ThreadIdWorkerProvider>();
}
} }
IOThreadPoolExecutor::IOThreadPoolExecutor( IOThreadPoolExecutor::IOThreadPoolExecutor(
...@@ -106,6 +109,9 @@ IOThreadPoolExecutor::IOThreadPoolExecutor( ...@@ -106,6 +109,9 @@ IOThreadPoolExecutor::IOThreadPoolExecutor(
eventBaseManager_(ebm) { eventBaseManager_(ebm) {
setNumThreads(maxThreads); setNumThreads(maxThreads);
registerThreadPoolExecutor(this); registerThreadPoolExecutor(this);
if (options.enableThreadIdCollection) {
threadIdCollector_ = std::make_unique<ThreadIdWorkerProvider>();
}
} }
IOThreadPoolExecutor::~IOThreadPoolExecutor() { IOThreadPoolExecutor::~IOThreadPoolExecutor() {
...@@ -209,6 +215,16 @@ void IOThreadPoolExecutor::threadRun(ThreadPtr thread) { ...@@ -209,6 +215,16 @@ void IOThreadPoolExecutor::threadRun(ThreadPtr thread) {
ioThread->eventBase = eventBaseManager_->getEventBase(); ioThread->eventBase = eventBaseManager_->getEventBase();
thisThread_.reset(new std::shared_ptr<IOThread>(ioThread)); thisThread_.reset(new std::shared_ptr<IOThread>(ioThread));
auto tid = folly::getOSThreadID();
if (threadIdCollector_) {
threadIdCollector_->addTid(tid);
}
SCOPE_EXIT {
if (threadIdCollector_) {
threadIdCollector_->removeTid(tid);
}
};
auto idler = std::make_unique<MemoryIdlerTimeout>(ioThread->eventBase); auto idler = std::make_unique<MemoryIdlerTimeout>(ioThread->eventBase);
ioThread->eventBase->runBeforeLoop(idler.get()); ioThread->eventBase->runBeforeLoop(idler.get());
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include <folly/Portability.h> #include <folly/Portability.h>
#include <folly/executors/IOExecutor.h> #include <folly/executors/IOExecutor.h>
#include <folly/executors/QueueObserver.h>
#include <folly/executors/ThreadPoolExecutor.h> #include <folly/executors/ThreadPoolExecutor.h>
#include <folly/io/async/EventBaseManager.h> #include <folly/io/async/EventBaseManager.h>
#include <folly/synchronization/RelaxedAtomic.h> #include <folly/synchronization/RelaxedAtomic.h>
...@@ -57,14 +58,19 @@ FOLLY_MSVC_DISABLE_WARNING(4250) ...@@ -57,14 +58,19 @@ FOLLY_MSVC_DISABLE_WARNING(4250)
class IOThreadPoolExecutor : public ThreadPoolExecutor, public IOExecutor { class IOThreadPoolExecutor : public ThreadPoolExecutor, public IOExecutor {
public: public:
struct Options { struct Options {
Options() : waitForAll(false) {} Options() : waitForAll(false), enableThreadIdCollection(false) {}
Options& setWaitForAll(bool b) { Options& setWaitForAll(bool b) {
this->waitForAll = b; this->waitForAll = b;
return *this; return *this;
} }
Options& setEnableThreadIdCollection(bool b) {
this->enableThreadIdCollection = b;
return *this;
}
bool waitForAll; bool waitForAll;
bool enableThreadIdCollection;
}; };
explicit IOThreadPoolExecutor( explicit IOThreadPoolExecutor(
...@@ -99,6 +105,11 @@ class IOThreadPoolExecutor : public ThreadPoolExecutor, public IOExecutor { ...@@ -99,6 +105,11 @@ class IOThreadPoolExecutor : public ThreadPoolExecutor, public IOExecutor {
folly::EventBaseManager* getEventBaseManager(); folly::EventBaseManager* getEventBaseManager();
// Returns nullptr unless explicitly enabled through constructor
folly::WorkerProvider* getThreadIdCollector() {
return threadIdCollector_.get();
}
protected: protected:
struct alignas(Thread) IOThread : public Thread { struct alignas(Thread) IOThread : public Thread {
IOThread(IOThreadPoolExecutor* pool) IOThread(IOThreadPoolExecutor* pool)
...@@ -120,6 +131,7 @@ class IOThreadPoolExecutor : public ThreadPoolExecutor, public IOExecutor { ...@@ -120,6 +131,7 @@ class IOThreadPoolExecutor : public ThreadPoolExecutor, public IOExecutor {
relaxed_atomic<size_t> nextThread_; relaxed_atomic<size_t> nextThread_;
folly::ThreadLocal<std::shared_ptr<IOThread>> thisThread_; folly::ThreadLocal<std::shared_ptr<IOThread>> thisThread_;
folly::EventBaseManager* eventBaseManager_; folly::EventBaseManager* eventBaseManager_;
std::unique_ptr<ThreadIdWorkerProvider> threadIdCollector_;
}; };
FOLLY_POP_WARNING FOLLY_POP_WARNING
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment