Commit e132960c authored by Dave Watson's avatar Dave Watson Committed by Facebook Github Bot

Add thread timeout memory barrier

Summary:
Add correct memory barrier to thread timeout / add path.

This seems like the fastest way to maintain safety here.  We could use
a read-biased rw lock instead, but that would mean taking a heavy memory
barrier in add() if we do have to add a thread.

Reviewed By: davidtgoldblatt

Differential Revision: D8161950

fbshipit-source-id: af2bd3a601ced9bb04a8206696e7a998c75f6369
parent f22f6d96
...@@ -164,23 +164,7 @@ bool CPUThreadPoolExecutor::taskShouldStop(folly::Optional<CPUTask>& task) { ...@@ -164,23 +164,7 @@ bool CPUThreadPoolExecutor::taskShouldStop(folly::Optional<CPUTask>& task) {
if (task) { if (task) {
return false; return false;
} else { } else {
// Try to stop based on idle thread timeout (try_take_for), return tryTimeoutThread();
// if there are at least minThreads running.
if (!minActive()) {
return false;
}
// If this is based on idle thread timeout, then
// adjust vars appropriately (otherwise stop() or join()
// does this).
if (getPendingTaskCountImpl() > 0) {
return false;
}
activeThreads_.store(
activeThreads_.load(std::memory_order_relaxed) - 1,
std::memory_order_relaxed);
threadsToJoin_.store(
threadsToJoin_.load(std::memory_order_relaxed) + 1,
std::memory_order_relaxed);
} }
return true; return true;
} }
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include <folly/executors/ThreadPoolExecutor.h> #include <folly/executors/ThreadPoolExecutor.h>
#include <folly/executors/GlobalThreadPoolList.h> #include <folly/executors/GlobalThreadPoolList.h>
#include <folly/synchronization/AsymmetricMemoryBarrier.h>
namespace folly { namespace folly {
...@@ -384,12 +385,55 @@ void ThreadPoolExecutor::ensureJoined() { ...@@ -384,12 +385,55 @@ void ThreadPoolExecutor::ensureJoined() {
} }
} }
// threadListLock_ must be write locked.
bool ThreadPoolExecutor::tryTimeoutThread() {
// Try to stop based on idle thread timeout (try_take_for),
// if there are at least minThreads running.
if (!minActive()) {
return false;
}
// Remove thread from active count
activeThreads_.store(
activeThreads_.load(std::memory_order_relaxed) - 1,
std::memory_order_relaxed);
// There is a memory ordering constraint w.r.t the queue
// implementation's add() and getPendingTaskCountImpl() - while many
// queues have seq_cst ordering, some do not, so add an explicit
// barrier. tryTimeoutThread is the slow path and only happens once
// every thread timeout; use asymmetric barrier to keep add() fast.
asymmetricHeavyBarrier();
// If this is based on idle thread timeout, then
// adjust vars appropriately (otherwise stop() or join()
// does this).
if (getPendingTaskCountImpl() > 0) {
// There are still pending tasks, we can't stop yet.
// re-up active threads and return.
activeThreads_.store(
activeThreads_.load(std::memory_order_relaxed) + 1,
std::memory_order_relaxed);
return false;
}
threadsToJoin_.store(
threadsToJoin_.load(std::memory_order_relaxed) + 1,
std::memory_order_relaxed);
return true;
}
// If we can't ensure that we were able to hand off a task to a thread, // If we can't ensure that we were able to hand off a task to a thread,
// attempt to start a thread that handled the task, if we aren't already // attempt to start a thread that handled the task, if we aren't already
// running the maximum number of threads. // running the maximum number of threads.
void ThreadPoolExecutor::ensureActiveThreads() { void ThreadPoolExecutor::ensureActiveThreads() {
ensureJoined(); ensureJoined();
// Matches barrier in tryTimeoutThread(). Ensure task added
// is seen before loading activeThreads_ below.
asymmetricLightBarrier();
// Fast path assuming we are already at max threads. // Fast path assuming we are already at max threads.
auto active = activeThreads_.load(std::memory_order_relaxed); auto active = activeThreads_.load(std::memory_order_relaxed);
auto total = maxThreads_.load(std::memory_order_relaxed); auto total = maxThreads_.load(std::memory_order_relaxed);
......
...@@ -299,6 +299,7 @@ class ThreadPoolExecutor : public DefaultKeepAliveExecutor { ...@@ -299,6 +299,7 @@ class ThreadPoolExecutor : public DefaultKeepAliveExecutor {
void ensureActiveThreads(); void ensureActiveThreads();
void ensureJoined(); void ensureJoined();
bool minActive(); bool minActive();
bool tryTimeoutThread();
// These are only modified while holding threadListLock_, but // These are only modified while holding threadListLock_, but
// are read without holding the lock. // are read without holding the lock.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment