Commit cf7284d0 authored by Robin Cheng's avatar Robin Cheng Committed by Facebook GitHub Bot

Fix a minor race condition for ThreadPoolExecutor.

Summary:
This fixes a race condition detected by ThreadSanitizer.

The thread list is protected by a mutex, but individual structs in the list are not protected, leading to racy access to two of their fields. This diff makes the fields atomic.

Reviewed By: yfeldblum

Differential Revision: D21484748

fbshipit-source-id: a981419f47a2aebaf1d40b23e9c4967cf2104aaa
parent b412547b
......@@ -366,7 +366,7 @@ void EDFThreadPoolExecutor::threadRun(ThreadPtr thread) {
continue;
}
thread->idle = false;
thread->idle.store(false, std::memory_order_relaxed);
auto startTime = std::chrono::steady_clock::now();
TaskStats stats;
stats.enqueueTime = task->enqueueTime_;
......@@ -385,8 +385,9 @@ void EDFThreadPoolExecutor::threadRun(ThreadPtr thread) {
<< "EDFThreadPoolExecutor: func threw unhandled non-exception object";
}
stats.runTime = std::chrono::steady_clock::now() - startTime;
thread->idle = true;
thread->lastActiveTime = std::chrono::steady_clock::now();
thread->idle.store(true, std::memory_order_relaxed);
thread->lastActiveTime.store(
std::chrono::steady_clock::now(), std::memory_order_relaxed);
thread->taskStatsCallbacks->callbackList.withRLock([&](auto& callbacks) {
*thread->taskStatsCallbacks->inCallback = true;
SCOPE_EXIT {
......
......@@ -240,7 +240,7 @@ size_t IOThreadPoolExecutor::getPendingTaskCountImpl() const {
for (const auto& thread : threadList_.get()) {
auto ioThread = std::static_pointer_cast<IOThread>(thread);
size_t pendingTasks = ioThread->pendingTasks;
if (pendingTasks > 0 && !ioThread->idle) {
if (pendingTasks > 0 && !ioThread->idle.load(std::memory_order_relaxed)) {
pendingTasks--;
}
count += pendingTasks;
......
......@@ -79,7 +79,7 @@ class IOThreadPoolExecutor : public ThreadPoolExecutor, public IOExecutor {
folly::EventBaseManager* getEventBaseManager();
private:
struct alignas(folly::cacheline_align_v) IOThread : public Thread {
struct alignas(Thread) IOThread : public Thread {
IOThread(IOThreadPoolExecutor* pool)
: Thread(pool), shouldRun(true), pendingTasks(0) {}
std::atomic<bool> shouldRun;
......
......@@ -77,7 +77,7 @@ ThreadPoolExecutor::Task::Task(
}
void ThreadPoolExecutor::runTask(const ThreadPtr& thread, Task&& task) {
thread->idle = false;
thread->idle.store(false, std::memory_order_relaxed);
auto startTime = std::chrono::steady_clock::now();
TaskStats stats;
stats.enqueueTime = task.enqueueTime_;
......@@ -120,8 +120,9 @@ void ThreadPoolExecutor::runTask(const ThreadPtr& thread, Task&& task) {
stats.waitTime.count(),
stats.runTime.count());
thread->idle = true;
thread->lastActiveTime = std::chrono::steady_clock::now();
thread->idle.store(true, std::memory_order_relaxed);
thread->lastActiveTime.store(
std::chrono::steady_clock::now(), std::memory_order_relaxed);
thread->taskStatsCallbacks->callbackList.withRLock([&](auto& callbacks) {
*thread->taskStatsCallbacks->inCallback = true;
SCOPE_EXIT {
......@@ -296,8 +297,9 @@ ThreadPoolExecutor::PoolStats ThreadPoolExecutor::getPoolStats() const {
size_t activeTasks = 0;
size_t idleAlive = 0;
for (const auto& thread : threadList_.get()) {
if (thread->idle) {
const std::chrono::nanoseconds idleTime = now - thread->lastActiveTime;
if (thread->idle.load(std::memory_order_relaxed)) {
const std::chrono::nanoseconds idleTime =
now - thread->lastActiveTime.load(std::memory_order_relaxed);
stats.maxIdleTime = std::max(stats.maxIdleTime, idleTime);
idleAlive++;
} else {
......
......@@ -28,6 +28,7 @@
#include <folly/executors/thread_factory/NamedThreadFactory.h>
#include <folly/io/async/Request.h>
#include <folly/portability/GFlags.h>
#include <folly/synchronization/AtomicStruct.h>
#include <folly/synchronization/Baton.h>
#include <glog/logging.h>
......@@ -180,7 +181,10 @@ class ThreadPoolExecutor : public DefaultKeepAliveExecutor {
struct TaskStatsCallbackRegistry;
struct alignas(folly::cacheline_align_v) Thread : public ThreadHandle {
struct //
alignas(folly::cacheline_align_v) //
alignas(folly::AtomicStruct<std::chrono::steady_clock::time_point>) //
Thread : public ThreadHandle {
explicit Thread(ThreadPoolExecutor* pool)
: id(nextId++),
handle(),
......@@ -193,8 +197,8 @@ class ThreadPoolExecutor : public DefaultKeepAliveExecutor {
static std::atomic<uint64_t> nextId;
uint64_t id;
std::thread handle;
bool idle;
std::chrono::steady_clock::time_point lastActiveTime;
std::atomic<bool> idle;
folly::AtomicStruct<std::chrono::steady_clock::time_point> lastActiveTime;
folly::Baton<> startupBaton;
std::shared_ptr<TaskStatsCallbackRegistry> taskStatsCallbacks;
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment