Commit b91dd2f6 authored by Andrii Grynenko, committed by Facebook GitHub Bot

Fix a shutdown race in CPUThreadPoolExecutor::add

Summary:
It's not safe to assume the executor is still alive after a task has been added to the queue: the task may hold the last KeepAlive, and once it finishes it can unblock the executor's shutdown.
The fix is to hold a KeepAlive whenever more work has to be done after adding to the queue. However, it is safe to skip this when the executor's thread count can never drop below 1 (otherwise having no active threads could result in a deadlock) and the maximum number of threads is already running, since no new threads would need to be started. This optimization avoids grabbing a KeepAlive on the fast path.
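As an illustration of the race (a hedged repro sketch, not code from this commit; it only demonstrates the lifetime coupling):

```cpp
#include <memory>

#include <folly/executors/CPUThreadPoolExecutor.h>

int main() {
  auto ex = std::make_unique<folly::CPUThreadPoolExecutor>(1);
  // The enqueued task owns the last external KeepAlive.
  auto ka = folly::getKeepAliveToken(ex.get());
  ex->add([ka = std::move(ka)]() mutable {
    ka.reset(); // dropping it is what lets shutdown proceed
  });
  // join() unblocks as soon as the task releases its KeepAlive. A thread
  // still inside add() at that moment - past the enqueue but before
  // ensureActiveThreads() - would be racing with the executor teardown.
  ex->join();
}
```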

Differential Revision: D27584518

fbshipit-source-id: e1242e3f4c40cee4f7e0c6dfca39abe6d17415f1
parent 7c4413da
@@ -14,8 +14,10 @@
  * limitations under the License.
  */
 
+#include <folly/Executor.h>
 #include <folly/executors/CPUThreadPoolExecutor.h>
 
+#include <atomic>
 #include <folly/Memory.h>
 #include <folly/concurrency/QueueObserver.h>
 #include <folly/executors/task_queue/PriorityLifoSemMPMCQueue.h>
@@ -50,7 +52,7 @@ CPUThreadPoolExecutor::CPUThreadPoolExecutor(
     std::shared_ptr<ThreadFactory> threadFactory)
     : ThreadPoolExecutor(
           numThreads,
-          FLAGS_dynamic_cputhreadpoolexecutor ? 0 : numThreads,
+          FLAGS_dynamic_cputhreadpoolexecutor ? 1 : numThreads,
           std::move(threadFactory)),
       taskQueue_(taskQueue.release()) {
   setNumThreads(numThreads);
@@ -72,7 +74,7 @@ CPUThreadPoolExecutor::CPUThreadPoolExecutor(
     size_t numThreads, std::shared_ptr<ThreadFactory> threadFactory)
     : ThreadPoolExecutor(
           numThreads,
-          FLAGS_dynamic_cputhreadpoolexecutor ? 0 : numThreads,
+          FLAGS_dynamic_cputhreadpoolexecutor ? 1 : numThreads,
           std::move(threadFactory)),
       taskQueue_(std::allocate_shared<default_queue>(default_queue_alloc{})) {
   setNumThreads(numThreads);
@@ -83,7 +85,9 @@ CPUThreadPoolExecutor::CPUThreadPoolExecutor(
     std::pair<size_t, size_t> numThreads,
     std::shared_ptr<ThreadFactory> threadFactory)
     : ThreadPoolExecutor(
-          numThreads.first, numThreads.second, std::move(threadFactory)),
+          std::max(numThreads.first, size_t{1}),
+          numThreads.second,
+          std::move(threadFactory)),
       taskQueue_(std::allocate_shared<default_queue>(default_queue_alloc{})) {
   setNumThreads(numThreads.first);
   registerThreadPoolExecutor(this);
@@ -159,15 +163,7 @@ void CPUThreadPoolExecutor::add(Func func) {
 void CPUThreadPoolExecutor::add(
     Func func, std::chrono::milliseconds expiration, Func expireCallback) {
-  CPUTask task{std::move(func), expiration, std::move(expireCallback), 0};
-  if (auto queueObserver = getQueueObserver(0)) {
-    task.queueObserverPayload() =
-        queueObserver->onEnqueued(task.context_.get());
-  }
-  auto result = taskQueue_->add(std::move(task));
-  if (!result.reusedThread) {
-    ensureActiveThreads();
-  }
+  addImpl<false>(std::move(func), 0, expiration, std::move(expireCallback));
 }
 
 void CPUThreadPoolExecutor::addWithPriority(Func func, int8_t priority) {
@@ -179,15 +175,43 @@ void CPUThreadPoolExecutor::add(
     int8_t priority,
     std::chrono::milliseconds expiration,
     Func expireCallback) {
-  CHECK(getNumPriorities() > 0);
+  addImpl<true>(
+      std::move(func), priority, expiration, std::move(expireCallback));
+}
+
+template <bool withPriority>
+void CPUThreadPoolExecutor::addImpl(
+    Func func,
+    int8_t priority,
+    std::chrono::milliseconds expiration,
+    Func expireCallback) {
+  if (withPriority) {
+    CHECK(getNumPriorities() > 0);
+  }
   CPUTask task(
       std::move(func), expiration, std::move(expireCallback), priority);
   if (auto queueObserver = getQueueObserver(priority)) {
     task.queueObserverPayload() =
         queueObserver->onEnqueued(task.context_.get());
   }
-  auto result = taskQueue_->addWithPriority(std::move(task), priority);
-  if (!result.reusedThread) {
+
+  // It's not safe to expect that the executor is alive after a task is added to
+  // the queue (this task could be holding the last KeepAlive and when finished
+  // - it may unblock the executor shutdown).
+  // If we need executor to be alive after adding into the queue, we have to
+  // acquire a KeepAlive.
+  bool mayNeedToAddThreads = minThreads_.load(std::memory_order_relaxed) == 0 ||
+      activeThreads_.load(std::memory_order_relaxed) <
+          maxThreads_.load(std::memory_order_relaxed);
+  folly::Executor::KeepAlive<> ka = mayNeedToAddThreads
+      ? getKeepAliveToken(this)
+      : folly::Executor::KeepAlive<>{};
+
+  auto result = withPriority
+      ? taskQueue_->addWithPriority(std::move(task), priority)
+      : taskQueue_->add(std::move(task));
+  if (mayNeedToAddThreads && !result.reusedThread) {
     ensureActiveThreads();
   }
 }
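The same pinning idiom applies outside the executor: if caller code must touch the executor after handing it a task that may release the final KeepAlive, grab a token first. A minimal sketch; `submitAndAccount` is a hypothetical helper, not folly API:

```cpp
#include <utility>

#include <folly/executors/CPUThreadPoolExecutor.h>

// Hypothetical helper: enqueue work, then read executor stats. Without
// `ka`, the stats read could race with shutdown once the task drops the
// executor's last KeepAlive.
void submitAndAccount(folly::CPUThreadPoolExecutor& ex, folly::Func task) {
  auto ka = folly::getKeepAliveToken(ex); // pin across post-enqueue work
  ex.add(std::move(task));
  auto pending = ex.getTaskQueueSize(); // safe while `ka` is held
  (void)pending;
}
```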
@@ -162,6 +162,13 @@ class CPUThreadPoolExecutor : public ThreadPoolExecutor {
   bool tryDecrToStop();
   bool taskShouldStop(folly::Optional<CPUTask>&);
 
+  template <bool withPriority>
+  void addImpl(
+      Func func,
+      int8_t priority,
+      std::chrono::milliseconds expiration,
+      Func expireCallback);
+
   std::unique_ptr<folly::QueueObserverFactory> createQueueObserverFactory();
   QueueObserver* FOLLY_NULLABLE getQueueObserver(int8_t pri);
@@ -92,28 +92,38 @@ TEST(CleanupTest, EnsureCleanupAfterTaskBasic) {
 }
 
 TEST(CleanupTest, Errors) {
-  auto cleaned = std::make_unique<Cleaned>();
+  auto runTest = [](auto releaseCleanedFunc) {
+    auto cleaned = std::make_unique<Cleaned>();
 
-  cleaned->addCleanup(folly::makeSemiFuture().deferValue(
-      [](folly::Unit) { EXPECT_TRUE(false); }));
+    cleaned->addCleanup(folly::makeSemiFuture().deferValue(
+        [](folly::Unit) { EXPECT_TRUE(false); }));
 
-  cleaned->addCleanup(
-      folly::makeSemiFuture<folly::Unit>(std::runtime_error("failed cleanup")));
+    cleaned->addCleanup(folly::makeSemiFuture<folly::Unit>(
+        std::runtime_error("failed cleanup")));
 
-  folly::ManualExecutor exec;
+    releaseCleanedFunc(cleaned);
+  };
 
   EXPECT_EXIT(
-      cleaned->cleanup()
-          .within(1s)
-          .via(folly::getKeepAliveToken(exec))
-          .getVia(&exec),
+      runTest([](auto& cleaned) {
+        folly::ManualExecutor exec;
+        cleaned->cleanup()
+            .within(1s)
+            .via(folly::getKeepAliveToken(exec))
+            .getVia(&exec);
+      }),
       testing::KilledBySignal(SIGABRT),
       ".*noexcept.*");
 
   EXPECT_EXIT(
-      cleaned.reset(), testing::KilledBySignal(SIGABRT), ".*destructed.*");
+      runTest([](auto& cleaned) { cleaned.reset(); }),
+      testing::KilledBySignal(SIGABRT),
+      ".*destructed.*");
 
-  // must leak the Cleaned as its destructor will abort.
-  (void)cleaned.release();
+  runTest([](auto& cleaned) {
+    // must leak the Cleaned as its destructor will abort.
+    (void)cleaned.release();
+  });
 }
 
 TEST(CleanupTest, Invariants) {