Commit 7f3dac64 authored by James Sedgwick's avatar James Sedgwick Committed by Anton Likhtarov

stats for ThreadPoolExecutor

Summary:
pool-wide stats via a call on the pool, and per-task stats (e.g. to be funneled into a histogram) via an rx subscription
rx needs a little work before this diff is safe - e.g. synchronization around the subscriber list, and perhaps exposing whether there are any subscribers so we can skip stat tracking if no one is listening
won't commit this without moving rx into folly/experimental of course

the idea is that timeout/expiration notifications can also go through the same subscription channel

haven't run the benchmarks yet and have to leave for the evening but tmrw i'll commit changes to the benchmark and get this stuff into windtunnel so i don't have to do so much manual output inspection on future diffs

Test Plan: added unit

Reviewed By: davejwatson@fb.com

Subscribers: fugalh, njormrod, bmatheny

FB internal diff: D1558424

Tasks: 5002392, 5002425
parent c049b564
......@@ -22,7 +22,7 @@ const size_t CPUThreadPoolExecutor::kDefaultMaxQueueSize = 1 << 18;
CPUThreadPoolExecutor::CPUThreadPoolExecutor(
size_t numThreads,
std::unique_ptr<BlockingQueue<Task>> taskQueue,
std::unique_ptr<BlockingQueue<CPUTask>> taskQueue,
std::unique_ptr<ThreadFactory> threadFactory)
: ThreadPoolExecutor(numThreads, std::move(threadFactory)),
taskQueue_(std::move(taskQueue)) {
......@@ -37,29 +37,19 @@ CPUThreadPoolExecutor::~CPUThreadPoolExecutor() {
void CPUThreadPoolExecutor::add(Func func) {
// TODO handle enqueue failure, here and in other add() callsites
taskQueue_->add(Task(std::move(func)));
taskQueue_->add(CPUTask(std::move(func)));
}
void CPUThreadPoolExecutor::threadRun(std::shared_ptr<Thread> thread) {
while (1) {
// TODO expiration / codel
auto t = taskQueue_->take();
if (UNLIKELY(t.poison)) {
auto task = taskQueue_->take();
if (UNLIKELY(task.poison)) {
CHECK(threadsToStop_-- > 0);
stoppedThreads_.add(thread);
return;
} else {
thread->idle = false;
try {
t.func();
} catch (const std::exception& e) {
LOG(ERROR) << "CPUThreadPoolExecutor: func threw unhandled " <<
typeid(e).name() << " exception: " << e.what();
} catch (...) {
LOG(ERROR) << "CPUThreadPoolExecutor: func threw unhandled "
"non-exception object";
}
thread->idle = true;
runTask(thread, std::move(task));
}
if (UNLIKELY(threadsToStop_ > 0 && !isJoin_)) {
......@@ -77,8 +67,12 @@ void CPUThreadPoolExecutor::stopThreads(size_t n) {
CHECK(stoppedThreads_.size() == 0);
threadsToStop_ = n;
for (int i = 0; i < n; i++) {
taskQueue_->add(Task());
taskQueue_->add(CPUTask());
}
}
uint64_t CPUThreadPoolExecutor::getPendingTaskCount() {
return taskQueue_->size();
}
}} // folly::wangle
......@@ -21,13 +21,12 @@ namespace folly { namespace wangle {
class CPUThreadPoolExecutor : public ThreadPoolExecutor {
public:
struct Task;
struct CPUTask;
// TODO thread naming, perhaps a required input to ThreadFactories
explicit CPUThreadPoolExecutor(
size_t numThreads,
std::unique_ptr<BlockingQueue<Task>> taskQueue =
folly::make_unique<LifoSemMPMCQueue<Task>>(
std::unique_ptr<BlockingQueue<CPUTask>> taskQueue =
folly::make_unique<LifoSemMPMCQueue<CPUTask>>(
CPUThreadPoolExecutor::kDefaultMaxQueueSize),
std::unique_ptr<ThreadFactory> threadFactory =
folly::make_unique<NamedThreadFactory>("CPUThreadPool"));
......@@ -36,15 +35,14 @@ class CPUThreadPoolExecutor : public ThreadPoolExecutor {
void add(Func func) override;
struct Task {
explicit Task(Func&& taskArg) : func(std::move(taskArg)), poison(false) {}
Task() : func(nullptr), poison(true) {}
Task(Task&& o) noexcept : func(std::move(o.func)), poison(o.poison) {}
Task(const Task&) = default;
Task& operator=(const Task&) = default;
Func func;
struct CPUTask : public ThreadPoolExecutor::Task {
// Must be noexcept move constructible so it can be used in MPMCQueue
explicit CPUTask(Func&& f) : Task(std::move(f)), poison(false) {}
CPUTask() : Task(nullptr), poison(true) {}
CPUTask(CPUTask&& o) noexcept : Task(std::move(o)), poison(o.poison) {}
CPUTask(const CPUTask&) = default;
CPUTask& operator=(const CPUTask&) = default;
bool poison;
// TODO per-task stats, timeouts, expirations
};
static const size_t kDefaultMaxQueueSize;
......@@ -52,8 +50,9 @@ class CPUThreadPoolExecutor : public ThreadPoolExecutor {
private:
void threadRun(ThreadPtr thread) override;
void stopThreads(size_t n) override;
uint64_t getPendingTaskCount() override;
std::unique_ptr<BlockingQueue<Task>> taskQueue_;
std::unique_ptr<BlockingQueue<CPUTask>> taskQueue_;
std::atomic<ssize_t> threadsToStop_{0};
};
......
......@@ -22,10 +22,14 @@ namespace folly { namespace wangle {
typedef std::function<void()> Func;
namespace experimental { // TODO(jsedgwick) merge with folly/wangle/Executor.h
class Executor {
public:
virtual ~Executor() {};
virtual void add(Func func) = 0;
};
}
}} // folly::wangle
......@@ -42,15 +42,15 @@ void IOThreadPoolExecutor::add(Func func) {
auto thread = threadList_.get()[nextThread_++ % threadList_.get().size()];
auto ioThread = std::static_pointer_cast<IOThread>(thread);
auto moveFunc = folly::makeMoveWrapper(std::move(func));
auto wrappedFunc = [moveFunc, ioThread] () {
(*moveFunc)();
ioThread->outstandingTasks--;
auto moveTask = folly::makeMoveWrapper(Task(std::move(func)));
auto wrappedFunc = [this, ioThread, moveTask] () mutable {
runTask(ioThread, std::move(*moveTask));
ioThread->pendingTasks--;
};
ioThread->outstandingTasks++;
ioThread->pendingTasks++;
if (!ioThread->eventBase.runInEventBaseThread(std::move(wrappedFunc))) {
ioThread->outstandingTasks--;
ioThread->pendingTasks--;
throw std::runtime_error("Unable to run func in event base thread");
}
}
......@@ -66,13 +66,14 @@ void IOThreadPoolExecutor::threadRun(ThreadPtr thread) {
ioThread->eventBase.loopForever();
}
if (isJoin_) {
while (ioThread->outstandingTasks > 0) {
while (ioThread->pendingTasks > 0) {
ioThread->eventBase.loopOnce();
}
}
stoppedThreads_.add(ioThread);
}
// threadListLock_ is writelocked
void IOThreadPoolExecutor::stopThreads(size_t n) {
for (int i = 0; i < n; i++) {
const auto ioThread = std::static_pointer_cast<IOThread>(
......@@ -82,4 +83,18 @@ void IOThreadPoolExecutor::stopThreads(size_t n) {
}
}
// threadListLock_ is readlocked
uint64_t IOThreadPoolExecutor::getPendingTaskCount() {
uint64_t count = 0;
for (const auto& thread : threadList_.get()) {
auto ioThread = std::static_pointer_cast<IOThread>(thread);
size_t pendingTasks = ioThread->pendingTasks;
if (pendingTasks > 0 && !ioThread->idle) {
pendingTasks--;
}
count += pendingTasks;
}
return count;
}
}} // folly::wangle
......@@ -35,11 +35,12 @@ class IOThreadPoolExecutor : public ThreadPoolExecutor {
ThreadPtr makeThread() override;
void threadRun(ThreadPtr thread) override;
void stopThreads(size_t n) override;
uint64_t getPendingTaskCount() override;
struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING IOThread : public Thread {
IOThread() : shouldRun(true), outstandingTasks(0) {};
IOThread() : shouldRun(true), pendingTasks(0) {};
std::atomic<bool> shouldRun;
std::atomic<size_t> outstandingTasks;
std::atomic<size_t> pendingTasks;
EventBase eventBase;
};
......
......@@ -27,6 +27,25 @@ ThreadPoolExecutor::~ThreadPoolExecutor() {
CHECK(threadList_.get().size() == 0);
}
void ThreadPoolExecutor::runTask(
const ThreadPtr& thread,
Task&& task) {
thread->idle = false;
task.started();
try {
task.func();
} catch (const std::exception& e) {
LOG(ERROR) << "ThreadPoolExecutor: func threw unhandled " <<
typeid(e).name() << " exception: " << e.what();
} catch (...) {
LOG(ERROR) << "ThreadPoolExecutor: func threw unhandled non-exception "
"object";
}
task.completed();
taskStatsSubject_.onNext(std::move(task.stats));
thread->idle = true;
}
size_t ThreadPoolExecutor::numThreads() {
RWSpinLock::ReadHolder{&threadListLock_};
return threadList_.get().size();
......@@ -43,6 +62,7 @@ void ThreadPoolExecutor::setNumThreads(size_t n) {
CHECK(threadList_.get().size() == n);
}
// threadListLock_ is writelocked
void ThreadPoolExecutor::addThreads(size_t n) {
for (int i = 0; i < n; i++) {
auto thread = makeThread();
......@@ -54,6 +74,7 @@ void ThreadPoolExecutor::addThreads(size_t n) {
}
}
// threadListLock_ is writelocked
void ThreadPoolExecutor::removeThreads(size_t n, bool isJoin) {
CHECK(n <= threadList_.get().size());
CHECK(stoppedThreads_.size() == 0);
......@@ -79,6 +100,22 @@ void ThreadPoolExecutor::join() {
CHECK(threadList_.get().size() == 0);
}
ThreadPoolExecutor::PoolStats ThreadPoolExecutor::getPoolStats() {
RWSpinLock::ReadHolder{&threadListLock_};
ThreadPoolExecutor::PoolStats stats;
stats.threadCount = threadList_.get().size();
for (auto thread : threadList_.get()) {
if (thread->idle) {
stats.idleThreadCount++;
} else {
stats.activeThreadCount++;
}
}
stats.pendingTaskCount = getPendingTaskCount();
stats.totalTaskCount = stats.pendingTaskCount + stats.activeThreadCount;
return stats;
}
std::atomic<uint64_t> ThreadPoolExecutor::Thread::nextId(0);
void ThreadPoolExecutor::StoppedThreadQueue::add(
......
......@@ -18,6 +18,7 @@
#include <folly/experimental/wangle/concurrent/Executor.h>
#include <folly/experimental/wangle/concurrent/LifoSemMPMCQueue.h>
#include <folly/experimental/wangle/concurrent/NamedThreadFactory.h>
#include <folly/experimental/wangle/rx/Observable.h>
#include <folly/Memory.h>
#include <folly/RWSpinLock.h>
......@@ -29,7 +30,7 @@
namespace folly { namespace wangle {
class ThreadPoolExecutor : public Executor {
class ThreadPoolExecutor : public experimental::Executor {
public:
explicit ThreadPoolExecutor(
size_t numThreads,
......@@ -41,10 +42,32 @@ class ThreadPoolExecutor : public Executor {
void setNumThreads(size_t numThreads);
void stop();
void join();
// TODO expose stats
struct PoolStats {
PoolStats() : threadCount(0), idleThreadCount(0), activeThreadCount(0),
pendingTaskCount(0), totalTaskCount(0) {}
size_t threadCount, idleThreadCount, activeThreadCount;
uint64_t pendingTaskCount, totalTaskCount;
};
PoolStats getPoolStats();
struct TaskStats {
TaskStats() : expired(false), waitTime(0), runTime(0) {}
bool expired;
std::chrono::microseconds waitTime;
std::chrono::microseconds runTime;
};
Subscription subscribeToTaskStats(
const ObserverPtr<TaskStats>& observer) {
return taskStatsSubject_.subscribe(observer);
}
protected:
// Prerequisite: threadListLock_ writelocked
void addThreads(size_t n);
// Prerequisite: threadListLock_ writelocked
void removeThreads(size_t n, bool isJoin);
struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING Thread {
......@@ -54,20 +77,50 @@ class ThreadPoolExecutor : public Executor {
uint64_t id;
std::thread handle;
bool idle;
// TODO per-thread stats go here
};
typedef std::shared_ptr<Thread> ThreadPtr;
struct Task {
explicit Task(Func&& f) : func(std::move(f)) {
// Assume that the task in enqueued on creation
intervalBegin = std::chrono::steady_clock::now();
}
Func func;
TaskStats stats;
// TODO per-task timeouts, expirations
void started() {
auto now = std::chrono::steady_clock::now();
stats.waitTime = std::chrono::duration_cast<std::chrono::microseconds>(
now - intervalBegin);
intervalBegin = now;
}
void completed() {
stats.runTime = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now() - intervalBegin);
}
std::chrono::steady_clock::time_point intervalBegin;
};
void runTask(const ThreadPtr& thread, Task&& task);
// The function that will be bound to pool threads
virtual void threadRun(ThreadPtr thread) = 0;
// Stop n threads and put their Thread structs in the threadsStopped_ queue
// Stop n threads and put their ThreadPtrs in the threadsStopped_ queue
// Prerequisite: threadListLock_ writelocked
virtual void stopThreads(size_t n) = 0;
// Create a suitable Thread struct
virtual ThreadPtr makeThread() {
return std::make_shared<Thread>();
}
// need a stopThread(id) for keepalive feature
// Prerequisite: threadListLock_ readlocked
virtual uint64_t getPendingTaskCount() = 0;
class ThreadList {
public:
......@@ -112,6 +165,8 @@ class ThreadPoolExecutor : public Executor {
RWSpinLock threadListLock_;
StoppedThreadQueue stoppedThreads_;
std::atomic<bool> isJoin_; // whether the current downsizing is a join
Subject<TaskStats> taskStatsSubject_;
};
}} // folly::wangle
......@@ -22,6 +22,10 @@
using namespace folly::wangle;
static Func burnMs(uint64_t ms) {
return [ms]() { std::this_thread::sleep_for(std::chrono::milliseconds(ms)); };
}
template <class TPE>
static void basic() {
// Create and destroy
......@@ -59,7 +63,7 @@ static void stop() {
TPE tpe(10);
std::atomic<int> completed(0);
auto f = [&](){
std::this_thread::sleep_for(std::chrono::milliseconds(1));
burnMs(1)();
completed++;
};
for (int i = 0; i < 1000; i++) {
......@@ -82,7 +86,7 @@ static void join() {
TPE tpe(10);
std::atomic<int> completed(0);
auto f = [&](){
std::this_thread::sleep_for(std::chrono::milliseconds(1));
burnMs(1)();
completed++;
};
for (int i = 0; i < 1000; i++) {
......@@ -105,7 +109,7 @@ static void resizeUnderLoad() {
TPE tpe(10);
std::atomic<int> completed(0);
auto f = [&](){
std::this_thread::sleep_for(std::chrono::milliseconds(1));
burnMs(1)();
completed++;
};
for (int i = 0; i < 1000; i++) {
......@@ -124,3 +128,75 @@ TEST(ThreadPoolExecutorTest, CPUResizeUnderLoad) {
TEST(ThreadPoolExecutorTest, IOResizeUnderLoad) {
resizeUnderLoad<IOThreadPoolExecutor>();
}
template <class TPE>
static void poolStats() {
{
TPE tpe(10);
for (int i = 0; i < 20; i++) {
tpe.add(burnMs(20));
}
burnMs(10)();
auto stats = tpe.getPoolStats();
EXPECT_EQ(10, stats.threadCount);
EXPECT_EQ(0, stats.idleThreadCount);
EXPECT_EQ(10, stats.activeThreadCount);
EXPECT_EQ(10, stats.pendingTaskCount);
EXPECT_EQ(20, stats.totalTaskCount);
}
{
TPE tpe(10);
for (int i = 0; i < 5; i++) {
tpe.add(burnMs(20));
}
burnMs(10)();
auto stats = tpe.getPoolStats();
EXPECT_EQ(10, stats.threadCount);
EXPECT_EQ(5, stats.idleThreadCount);
EXPECT_EQ(5, stats.activeThreadCount);
EXPECT_EQ(0, stats.pendingTaskCount);
EXPECT_EQ(5, stats.totalTaskCount);
}
}
TEST(ThreadPoolExecutorTest, CPUPoolStats) {
poolStats<CPUThreadPoolExecutor>();
}
TEST(ThreadPoolExecutorTest, IOPoolStats) {
poolStats<IOThreadPoolExecutor>();
}
template <class TPE>
static void taskStats() {
TPE tpe(10);
std::atomic<int> c(0);
tpe.subscribeToTaskStats(Observer<ThreadPoolExecutor::TaskStats>::create(
[&] (ThreadPoolExecutor::TaskStats stats) {
int i = c++;
if (i < 10) {
EXPECT_GE(10000, stats.waitTime.count());
EXPECT_LE(20000, stats.runTime.count());
} else {
EXPECT_LE(10000, stats.waitTime.count());
EXPECT_LE(10000, stats.runTime.count());
}
}));
for (int i = 0; i < 10; i++) {
tpe.add(burnMs(20));
}
for (int i = 0; i < 10; i++) {
tpe.add(burnMs(10));
}
tpe.join();
EXPECT_EQ(20, c);
}
TEST(ThreadPoolExecutorTest, CPUTaskStats) {
taskStats<CPUThreadPoolExecutor>();
}
TEST(ThreadPoolExecutorTest, IOTaskStats) {
taskStats<IOThreadPoolExecutor>();
}
......@@ -17,6 +17,7 @@
#pragma once
#include <folly/ExceptionWrapper.h>
#include <folly/wangle/Executor.h>
namespace folly { namespace wangle {
typedef folly::exception_wrapper Error;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment