Commit 2a80218d authored by Lee Howes's avatar Lee Howes Committed by Facebook GitHub Bot

Add terminateOnBlocking parameter to CPUThreadPoolExecutor

Summary: Parameterises CPUThreadPoolExecutor to allow us to construct one that prohibits blocking operations on its threads. The flag is stored as a thread local and applied using a scoped guard.

Reviewed By: yfeldblum

Differential Revision: D27893465

fbshipit-source-id: 71af1c0b7c23752f499712f498b6abb58a345979
parent 2798cd1b
...@@ -49,20 +49,24 @@ const size_t CPUThreadPoolExecutor::kDefaultMaxQueueSize = 1 << 14; ...@@ -49,20 +49,24 @@ const size_t CPUThreadPoolExecutor::kDefaultMaxQueueSize = 1 << 14;
CPUThreadPoolExecutor::CPUThreadPoolExecutor( CPUThreadPoolExecutor::CPUThreadPoolExecutor(
size_t numThreads, size_t numThreads,
std::unique_ptr<BlockingQueue<CPUTask>> taskQueue, std::unique_ptr<BlockingQueue<CPUTask>> taskQueue,
std::shared_ptr<ThreadFactory> threadFactory) std::shared_ptr<ThreadFactory> threadFactory,
Options opt)
: CPUThreadPoolExecutor( : CPUThreadPoolExecutor(
std::make_pair( std::make_pair(
numThreads, FLAGS_dynamic_cputhreadpoolexecutor ? 0 : numThreads), numThreads, FLAGS_dynamic_cputhreadpoolexecutor ? 0 : numThreads),
std::move(taskQueue), std::move(taskQueue),
std::move(threadFactory)) {} std::move(threadFactory),
std::move(opt)) {}
CPUThreadPoolExecutor::CPUThreadPoolExecutor( CPUThreadPoolExecutor::CPUThreadPoolExecutor(
std::pair<size_t, size_t> numThreads, std::pair<size_t, size_t> numThreads,
std::unique_ptr<BlockingQueue<CPUTask>> taskQueue, std::unique_ptr<BlockingQueue<CPUTask>> taskQueue,
std::shared_ptr<ThreadFactory> threadFactory) std::shared_ptr<ThreadFactory> threadFactory,
Options opt)
: ThreadPoolExecutor( : ThreadPoolExecutor(
numThreads.first, numThreads.second, std::move(threadFactory)), numThreads.first, numThreads.second, std::move(threadFactory)),
taskQueue_(taskQueue.release()) { taskQueue_(taskQueue.release()),
prohibitBlockingOnThreadPools_{opt.blocking} {
setNumThreads(numThreads.first); setNumThreads(numThreads.first);
if (numThreads.second == 0) { if (numThreads.second == 0) {
minThreads_.store(1, std::memory_order_relaxed); minThreads_.store(1, std::memory_order_relaxed);
...@@ -71,18 +75,23 @@ CPUThreadPoolExecutor::CPUThreadPoolExecutor( ...@@ -71,18 +75,23 @@ CPUThreadPoolExecutor::CPUThreadPoolExecutor(
} }
CPUThreadPoolExecutor::CPUThreadPoolExecutor( CPUThreadPoolExecutor::CPUThreadPoolExecutor(
size_t numThreads, std::shared_ptr<ThreadFactory> threadFactory) size_t numThreads,
std::shared_ptr<ThreadFactory> threadFactory,
Options opt)
: CPUThreadPoolExecutor( : CPUThreadPoolExecutor(
std::make_pair( std::make_pair(
numThreads, FLAGS_dynamic_cputhreadpoolexecutor ? 0 : numThreads), numThreads, FLAGS_dynamic_cputhreadpoolexecutor ? 0 : numThreads),
std::move(threadFactory)) {} std::move(threadFactory),
std::move(opt)) {}
CPUThreadPoolExecutor::CPUThreadPoolExecutor( CPUThreadPoolExecutor::CPUThreadPoolExecutor(
std::pair<size_t, size_t> numThreads, std::pair<size_t, size_t> numThreads,
std::shared_ptr<ThreadFactory> threadFactory) std::shared_ptr<ThreadFactory> threadFactory,
Options opt)
: ThreadPoolExecutor( : ThreadPoolExecutor(
numThreads.first, numThreads.second, std::move(threadFactory)), numThreads.first, numThreads.second, std::move(threadFactory)),
taskQueue_(std::allocate_shared<default_queue>(default_queue_alloc{})) { taskQueue_(std::allocate_shared<default_queue>(default_queue_alloc{})),
prohibitBlockingOnThreadPools_{opt.blocking} {
setNumThreads(numThreads.first); setNumThreads(numThreads.first);
if (numThreads.second == 0) { if (numThreads.second == 0) {
minThreads_.store(1, std::memory_order_relaxed); minThreads_.store(1, std::memory_order_relaxed);
...@@ -90,30 +99,36 @@ CPUThreadPoolExecutor::CPUThreadPoolExecutor( ...@@ -90,30 +99,36 @@ CPUThreadPoolExecutor::CPUThreadPoolExecutor(
registerThreadPoolExecutor(this); registerThreadPoolExecutor(this);
} }
CPUThreadPoolExecutor::CPUThreadPoolExecutor(size_t numThreads) CPUThreadPoolExecutor::CPUThreadPoolExecutor(size_t numThreads, Options opt)
: CPUThreadPoolExecutor( : CPUThreadPoolExecutor(
numThreads, std::make_shared<NamedThreadFactory>("CPUThreadPool")) {} numThreads,
std::make_shared<NamedThreadFactory>("CPUThreadPool"),
std::move(opt)) {}
CPUThreadPoolExecutor::CPUThreadPoolExecutor( CPUThreadPoolExecutor::CPUThreadPoolExecutor(
size_t numThreads, size_t numThreads,
int8_t numPriorities, int8_t numPriorities,
std::shared_ptr<ThreadFactory> threadFactory) std::shared_ptr<ThreadFactory> threadFactory,
Options opt)
: CPUThreadPoolExecutor( : CPUThreadPoolExecutor(
numThreads, numThreads,
std::make_unique<PriorityUnboundedBlockingQueue<CPUTask>>( std::make_unique<PriorityUnboundedBlockingQueue<CPUTask>>(
numPriorities), numPriorities),
std::move(threadFactory)) {} std::move(threadFactory),
std::move(opt)) {}
CPUThreadPoolExecutor::CPUThreadPoolExecutor( CPUThreadPoolExecutor::CPUThreadPoolExecutor(
size_t numThreads, size_t numThreads,
int8_t numPriorities, int8_t numPriorities,
size_t maxQueueSize, size_t maxQueueSize,
std::shared_ptr<ThreadFactory> threadFactory) std::shared_ptr<ThreadFactory> threadFactory,
Options opt)
: CPUThreadPoolExecutor( : CPUThreadPoolExecutor(
numThreads, numThreads,
std::make_unique<PriorityLifoSemMPMCQueue<CPUTask>>( std::make_unique<PriorityLifoSemMPMCQueue<CPUTask>>(
numPriorities, maxQueueSize), numPriorities, maxQueueSize),
std::move(threadFactory)) {} std::move(threadFactory),
std::move(opt)) {}
CPUThreadPoolExecutor::~CPUThreadPoolExecutor() { CPUThreadPoolExecutor::~CPUThreadPoolExecutor() {
deregisterThreadPoolExecutor(this); deregisterThreadPoolExecutor(this);
...@@ -250,7 +265,15 @@ bool CPUThreadPoolExecutor::taskShouldStop(folly::Optional<CPUTask>& task) { ...@@ -250,7 +265,15 @@ bool CPUThreadPoolExecutor::taskShouldStop(folly::Optional<CPUTask>& task) {
void CPUThreadPoolExecutor::threadRun(ThreadPtr thread) { void CPUThreadPoolExecutor::threadRun(ThreadPtr thread) {
this->threadPoolHook_.registerThread(); this->threadPoolHook_.registerThread();
ExecutorBlockingGuard guard{ExecutorBlockingGuard::TrackTag{}, executorName}; auto guard = [&]() {
if (prohibitBlockingOnThreadPools_ == Options::Blocking::prohibit) {
return ExecutorBlockingGuard{
ExecutorBlockingGuard::ProhibitTag{}, executorName};
} else {
return ExecutorBlockingGuard{
ExecutorBlockingGuard::TrackTag{}, executorName};
}
}();
thread->startupBaton.post(); thread->startupBaton.post();
while (true) { while (true) {
......
...@@ -68,40 +68,62 @@ namespace folly { ...@@ -68,40 +68,62 @@ namespace folly {
class CPUThreadPoolExecutor : public ThreadPoolExecutor { class CPUThreadPoolExecutor : public ThreadPoolExecutor {
public: public:
struct CPUTask; struct CPUTask;
struct Options {
enum class Blocking {
prohibit,
allow,
};
constexpr Options() noexcept : blocking{Blocking::allow} {}
Options setBlocking(Blocking b) {
blocking = b;
return *this;
}
Blocking blocking;
};
CPUThreadPoolExecutor( CPUThreadPoolExecutor(
size_t numThreads, size_t numThreads,
std::unique_ptr<BlockingQueue<CPUTask>> taskQueue, std::unique_ptr<BlockingQueue<CPUTask>> taskQueue,
std::shared_ptr<ThreadFactory> threadFactory = std::shared_ptr<ThreadFactory> threadFactory =
std::make_shared<NamedThreadFactory>("CPUThreadPool")); std::make_shared<NamedThreadFactory>("CPUThreadPool"),
Options opt = {});
CPUThreadPoolExecutor( CPUThreadPoolExecutor(
std::pair<size_t, size_t> numThreads, std::pair<size_t, size_t> numThreads,
std::unique_ptr<BlockingQueue<CPUTask>> taskQueue, std::unique_ptr<BlockingQueue<CPUTask>> taskQueue,
std::shared_ptr<ThreadFactory> threadFactory = std::shared_ptr<ThreadFactory> threadFactory =
std::make_shared<NamedThreadFactory>("CPUThreadPool")); std::make_shared<NamedThreadFactory>("CPUThreadPool"),
Options opt = {});
explicit CPUThreadPoolExecutor(size_t numThreads); explicit CPUThreadPoolExecutor(size_t numThreads, Options opt = {});
CPUThreadPoolExecutor( CPUThreadPoolExecutor(
size_t numThreads, std::shared_ptr<ThreadFactory> threadFactory); size_t numThreads,
std::shared_ptr<ThreadFactory> threadFactory,
Options opt = {});
CPUThreadPoolExecutor( CPUThreadPoolExecutor(
std::pair<size_t, size_t> numThreads, std::pair<size_t, size_t> numThreads,
std::shared_ptr<ThreadFactory> threadFactory); std::shared_ptr<ThreadFactory> threadFactory,
Options opt = {});
CPUThreadPoolExecutor( CPUThreadPoolExecutor(
size_t numThreads, size_t numThreads,
int8_t numPriorities, int8_t numPriorities,
std::shared_ptr<ThreadFactory> threadFactory = std::shared_ptr<ThreadFactory> threadFactory =
std::make_shared<NamedThreadFactory>("CPUThreadPool")); std::make_shared<NamedThreadFactory>("CPUThreadPool"),
Options opt = {});
CPUThreadPoolExecutor( CPUThreadPoolExecutor(
size_t numThreads, size_t numThreads,
int8_t numPriorities, int8_t numPriorities,
size_t maxQueueSize, size_t maxQueueSize,
std::shared_ptr<ThreadFactory> threadFactory = std::shared_ptr<ThreadFactory> threadFactory =
std::make_shared<NamedThreadFactory>("CPUThreadPool")); std::make_shared<NamedThreadFactory>("CPUThreadPool"),
Options opt = {});
~CPUThreadPoolExecutor() override; ~CPUThreadPoolExecutor() override;
...@@ -179,6 +201,7 @@ class CPUThreadPoolExecutor : public ThreadPoolExecutor { ...@@ -179,6 +201,7 @@ class CPUThreadPoolExecutor : public ThreadPoolExecutor {
std::unique_ptr<folly::QueueObserverFactory> queueObserverFactory_{ std::unique_ptr<folly::QueueObserverFactory> queueObserverFactory_{
createQueueObserverFactory()}; createQueueObserverFactory()};
std::atomic<ssize_t> threadsToStop_{0}; std::atomic<ssize_t> threadsToStop_{0};
Options::Blocking prohibitBlockingOnThreadPools_ = Options::Blocking::allow;
}; };
} // namespace folly } // namespace folly
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment