Commit 0a14b4cc authored by James Sedgwick's avatar James Sedgwick Committed by Dave Watson

fix potential race/memory corruption in IOThreadPoolExecutor

Summary:
In unusual but possible circumstances, the EventBase and thus pending tasks will outlive the pool, so we shouldn't keep references of any kind to the pool in the task.
The only reference we were keeping was used to access the task stats rx subject. Store the subject as a shared ptr and give a copy of the ptr to the Thread object, which is itself
owned by a shared ptr and captured by every task. I thought this had to do with the thread local leak in mentioned in the test plan of D1682860 but this patch doesn't actually fix that :(
Thankfully, while task surfing I saw @phillip's awesome D1682698. Patching that in fixes the leak! Woo. Either way, this is more correct.

Test Plan: unit under clang/asan

Reviewed By: davejwatson@fb.com

Subscribers: trunkagent, fugalh, njormrod, folly-diffs@, philipp

FB internal diff: D1683221

Tasks: 5336655

Signature: t1:1683221:1416264933:946d29b5a3eb22ed08812f2adefb7284b1899e4e
parent fe22c76c
......@@ -93,7 +93,7 @@ void IOThreadPoolExecutor::add(
auto moveTask = folly::makeMoveWrapper(
Task(std::move(func), expiration, std::move(expireCallback)));
auto wrappedFunc = [this, ioThread, moveTask] () mutable {
auto wrappedFunc = [ioThread, moveTask] () mutable {
runTask(ioThread, std::move(*moveTask));
ioThread->pendingTasks--;
};
......@@ -107,7 +107,7 @@ void IOThreadPoolExecutor::add(
std::shared_ptr<ThreadPoolExecutor::Thread>
IOThreadPoolExecutor::makeThread() {
return std::make_shared<IOThread>();
return std::make_shared<IOThread>(this);
}
void IOThreadPoolExecutor::threadRun(ThreadPtr thread) {
......
......@@ -42,7 +42,10 @@ class IOThreadPoolExecutor : public ThreadPoolExecutor {
uint64_t getPendingTaskCount() override;
struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING IOThread : public Thread {
IOThread() : shouldRun(true), pendingTasks(0) {};
IOThread(IOThreadPoolExecutor* pool)
: Thread(pool),
shouldRun(true),
pendingTasks(0) {};
std::atomic<bool> shouldRun;
std::atomic<size_t> pendingTasks;
EventBase* eventBase;
......
......@@ -21,7 +21,8 @@ namespace folly { namespace wangle {
ThreadPoolExecutor::ThreadPoolExecutor(
size_t numThreads,
std::shared_ptr<ThreadFactory> threadFactory)
: threadFactory_(std::move(threadFactory)) {}
: threadFactory_(std::move(threadFactory)),
taskStatsSubject_(std::make_shared<Subject<TaskStats>>()) {}
ThreadPoolExecutor::~ThreadPoolExecutor() {
CHECK(threadList_.get().size() == 0);
......@@ -63,7 +64,7 @@ void ThreadPoolExecutor::runTask(
task.stats_.runTime = std::chrono::steady_clock::now() - startTime;
}
thread->idle = true;
taskStatsSubject_.onNext(std::move(task.stats_));
thread->taskStatsSubject->onNext(std::move(task.stats_));
}
size_t ThreadPoolExecutor::numThreads() {
......
......@@ -73,7 +73,7 @@ class ThreadPoolExecutor : public Executor {
Subscription<TaskStats> subscribeToTaskStats(
const ObserverPtr<TaskStats>& observer) {
return taskStatsSubject_.subscribe(observer);
return taskStatsSubject_->subscribe(observer);
}
protected:
......@@ -83,13 +83,20 @@ class ThreadPoolExecutor : public Executor {
void removeThreads(size_t n, bool isJoin);
struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING Thread {
explicit Thread(ThreadPoolExecutor* pool)
: id(nextId++),
handle(),
idle(true),
taskStatsSubject(pool->taskStatsSubject_) {}
virtual ~Thread() {}
Thread() : id(nextId++), handle(), idle(true) {};
static std::atomic<uint64_t> nextId;
uint64_t id;
std::thread handle;
bool idle;
Baton<> startupBaton;
std::shared_ptr<Subject<TaskStats>> taskStatsSubject;
};
typedef std::shared_ptr<Thread> ThreadPtr;
......@@ -106,7 +113,7 @@ class ThreadPoolExecutor : public Executor {
Func expireCallback_;
};
void runTask(const ThreadPtr& thread, Task&& task);
static void runTask(const ThreadPtr& thread, Task&& task);
// The function that will be bound to pool threads. It must call
// thread->startupBaton.post() when it's ready to consume work.
......@@ -118,7 +125,7 @@ class ThreadPoolExecutor : public Executor {
// Create a suitable Thread struct
virtual ThreadPtr makeThread() {
return std::make_shared<Thread>();
return std::make_shared<Thread>(this);
}
// Prerequisite: threadListLock_ readlocked
......@@ -168,7 +175,7 @@ class ThreadPoolExecutor : public Executor {
StoppedThreadQueue stoppedThreads_;
std::atomic<bool> isJoin_; // whether the current downsizing is a join
Subject<TaskStats> taskStatsSubject_;
std::shared_ptr<Subject<TaskStats>> taskStatsSubject_;
};
}} // folly::wangle
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment