Commit ebb45e07 authored by Andrii Grynenko's avatar Andrii Grynenko Committed by Facebook GitHub Bot

Fix a shutdown race in CPUThreadPoolExecutor::add

Summary:
It's not safe to expect that the executor is alive after a task is added to the queue (this task could be holding the last KeepAlive and when finished - it may unblock the executor shutdown).
The fix is to hold a KeepAlive if some operation has to be done after adding to the queue. However it's safe to avoid this in a case where executor thread count will never drop below 1 (otherwise not having active threads may result in a deadlock) and we already have the maximum number of threads running. This optimization should help avoid grabbing the KeepAlive on the fast path.

The difference from the previous implementation is that we still start with 0 threads. But keep minThreads at at least 1.

Differential Revision: D27751098

fbshipit-source-id: 7b992a73b40984376893c1fe2a4a072002f0a12a
parent c69aa19d
...@@ -14,8 +14,10 @@ ...@@ -14,8 +14,10 @@
* limitations under the License. * limitations under the License.
*/ */
#include <folly/Executor.h>
#include <folly/executors/CPUThreadPoolExecutor.h> #include <folly/executors/CPUThreadPoolExecutor.h>
#include <atomic>
#include <folly/Memory.h> #include <folly/Memory.h>
#include <folly/concurrency/QueueObserver.h> #include <folly/concurrency/QueueObserver.h>
#include <folly/executors/task_queue/PriorityLifoSemMPMCQueue.h> #include <folly/executors/task_queue/PriorityLifoSemMPMCQueue.h>
...@@ -48,14 +50,11 @@ CPUThreadPoolExecutor::CPUThreadPoolExecutor( ...@@ -48,14 +50,11 @@ CPUThreadPoolExecutor::CPUThreadPoolExecutor(
size_t numThreads, size_t numThreads,
std::unique_ptr<BlockingQueue<CPUTask>> taskQueue, std::unique_ptr<BlockingQueue<CPUTask>> taskQueue,
std::shared_ptr<ThreadFactory> threadFactory) std::shared_ptr<ThreadFactory> threadFactory)
: ThreadPoolExecutor( : CPUThreadPoolExecutor(
numThreads, std::make_pair(
FLAGS_dynamic_cputhreadpoolexecutor ? 0 : numThreads, numThreads, FLAGS_dynamic_cputhreadpoolexecutor ? 0 : numThreads),
std::move(threadFactory)), std::move(taskQueue),
taskQueue_(taskQueue.release()) { std::move(threadFactory)) {}
setNumThreads(numThreads);
registerThreadPoolExecutor(this);
}
CPUThreadPoolExecutor::CPUThreadPoolExecutor( CPUThreadPoolExecutor::CPUThreadPoolExecutor(
std::pair<size_t, size_t> numThreads, std::pair<size_t, size_t> numThreads,
...@@ -65,19 +64,18 @@ CPUThreadPoolExecutor::CPUThreadPoolExecutor( ...@@ -65,19 +64,18 @@ CPUThreadPoolExecutor::CPUThreadPoolExecutor(
numThreads.first, numThreads.second, std::move(threadFactory)), numThreads.first, numThreads.second, std::move(threadFactory)),
taskQueue_(taskQueue.release()) { taskQueue_(taskQueue.release()) {
setNumThreads(numThreads.first); setNumThreads(numThreads.first);
if (numThreads.second == 0) {
minThreads_.store(1, std::memory_order_relaxed);
}
registerThreadPoolExecutor(this); registerThreadPoolExecutor(this);
} }
CPUThreadPoolExecutor::CPUThreadPoolExecutor( CPUThreadPoolExecutor::CPUThreadPoolExecutor(
size_t numThreads, std::shared_ptr<ThreadFactory> threadFactory) size_t numThreads, std::shared_ptr<ThreadFactory> threadFactory)
: ThreadPoolExecutor( : CPUThreadPoolExecutor(
numThreads, std::make_pair(
FLAGS_dynamic_cputhreadpoolexecutor ? 0 : numThreads, numThreads, FLAGS_dynamic_cputhreadpoolexecutor ? 0 : numThreads),
std::move(threadFactory)), std::move(threadFactory)) {}
taskQueue_(std::allocate_shared<default_queue>(default_queue_alloc{})) {
setNumThreads(numThreads);
registerThreadPoolExecutor(this);
}
CPUThreadPoolExecutor::CPUThreadPoolExecutor( CPUThreadPoolExecutor::CPUThreadPoolExecutor(
std::pair<size_t, size_t> numThreads, std::pair<size_t, size_t> numThreads,
...@@ -86,6 +84,9 @@ CPUThreadPoolExecutor::CPUThreadPoolExecutor( ...@@ -86,6 +84,9 @@ CPUThreadPoolExecutor::CPUThreadPoolExecutor(
numThreads.first, numThreads.second, std::move(threadFactory)), numThreads.first, numThreads.second, std::move(threadFactory)),
taskQueue_(std::allocate_shared<default_queue>(default_queue_alloc{})) { taskQueue_(std::allocate_shared<default_queue>(default_queue_alloc{})) {
setNumThreads(numThreads.first); setNumThreads(numThreads.first);
if (numThreads.second == 0) {
minThreads_.store(1, std::memory_order_relaxed);
}
registerThreadPoolExecutor(this); registerThreadPoolExecutor(this);
} }
...@@ -159,15 +160,7 @@ void CPUThreadPoolExecutor::add(Func func) { ...@@ -159,15 +160,7 @@ void CPUThreadPoolExecutor::add(Func func) {
void CPUThreadPoolExecutor::add( void CPUThreadPoolExecutor::add(
Func func, std::chrono::milliseconds expiration, Func expireCallback) { Func func, std::chrono::milliseconds expiration, Func expireCallback) {
CPUTask task{std::move(func), expiration, std::move(expireCallback), 0}; addImpl<false>(std::move(func), 0, expiration, std::move(expireCallback));
if (auto queueObserver = getQueueObserver(0)) {
task.queueObserverPayload() =
queueObserver->onEnqueued(task.context_.get());
}
auto result = taskQueue_->add(std::move(task));
if (!result.reusedThread) {
ensureActiveThreads();
}
} }
void CPUThreadPoolExecutor::addWithPriority(Func func, int8_t priority) { void CPUThreadPoolExecutor::addWithPriority(Func func, int8_t priority) {
...@@ -179,15 +172,43 @@ void CPUThreadPoolExecutor::add( ...@@ -179,15 +172,43 @@ void CPUThreadPoolExecutor::add(
int8_t priority, int8_t priority,
std::chrono::milliseconds expiration, std::chrono::milliseconds expiration,
Func expireCallback) { Func expireCallback) {
CHECK(getNumPriorities() > 0); addImpl<true>(
std::move(func), priority, expiration, std::move(expireCallback));
}
template <bool withPriority>
void CPUThreadPoolExecutor::addImpl(
Func func,
int8_t priority,
std::chrono::milliseconds expiration,
Func expireCallback) {
if (withPriority) {
CHECK(getNumPriorities() > 0);
}
CPUTask task( CPUTask task(
std::move(func), expiration, std::move(expireCallback), priority); std::move(func), expiration, std::move(expireCallback), priority);
if (auto queueObserver = getQueueObserver(priority)) { if (auto queueObserver = getQueueObserver(priority)) {
task.queueObserverPayload() = task.queueObserverPayload() =
queueObserver->onEnqueued(task.context_.get()); queueObserver->onEnqueued(task.context_.get());
} }
auto result = taskQueue_->addWithPriority(std::move(task), priority);
if (!result.reusedThread) { // It's not safe to expect that the executor is alive after a task is added to
// the queue (this task could be holding the last KeepAlive and when finished
// - it may unblock the executor shutdown).
// If we need executor to be alive after adding into the queue, we have to
// acquire a KeepAlive.
bool mayNeedToAddThreads = minThreads_.load(std::memory_order_relaxed) == 0 ||
activeThreads_.load(std::memory_order_relaxed) <
maxThreads_.load(std::memory_order_relaxed);
folly::Executor::KeepAlive<> ka = mayNeedToAddThreads
? getKeepAliveToken(this)
: folly::Executor::KeepAlive<>{};
auto result = withPriority
? taskQueue_->addWithPriority(std::move(task), priority)
: taskQueue_->add(std::move(task));
if (mayNeedToAddThreads && !result.reusedThread) {
ensureActiveThreads(); ensureActiveThreads();
} }
} }
......
...@@ -162,6 +162,13 @@ class CPUThreadPoolExecutor : public ThreadPoolExecutor { ...@@ -162,6 +162,13 @@ class CPUThreadPoolExecutor : public ThreadPoolExecutor {
bool tryDecrToStop(); bool tryDecrToStop();
bool taskShouldStop(folly::Optional<CPUTask>&); bool taskShouldStop(folly::Optional<CPUTask>&);
template <bool withPriority>
void addImpl(
Func func,
int8_t priority,
std::chrono::milliseconds expiration,
Func expireCallback);
std::unique_ptr<folly::QueueObserverFactory> createQueueObserverFactory(); std::unique_ptr<folly::QueueObserverFactory> createQueueObserverFactory();
QueueObserver* FOLLY_NULLABLE getQueueObserver(int8_t pri); QueueObserver* FOLLY_NULLABLE getQueueObserver(int8_t pri);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment