Commit a13deea5 authored by Misha Shneerson's avatar Misha Shneerson Committed by Facebook GitHub Bot

Saturation detector. CPUThreadPoolExecutor integration. Diff 2/7

Summary: We should also use saturation detector on queues fronting CPUThreadPoolExecutor.

Reviewed By: andriigrynenko

Differential Revision: D20873689

fbshipit-source-id: 25ffc4cf62ddb222ede4f983f88a1c6545a0d4a5
parent d3443853
......@@ -17,6 +17,7 @@
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/Memory.h>
#include <folly/concurrency/QueueObserver.h>
#include <folly/executors/task_queue/PriorityLifoSemMPMCQueue.h>
#include <folly/executors/task_queue/PriorityUnboundedBlockingQueue.h>
#include <folly/executors/task_queue/UnboundedBlockingQueue.h>
......@@ -124,6 +125,34 @@ CPUThreadPoolExecutor::~CPUThreadPoolExecutor() {
deregisterThreadPoolExecutor(this);
stop();
CHECK(threadsToStop_ == 0);
for (auto& observer : queueObservers_) {
delete observer.load(std::memory_order_relaxed);
}
}
QueueObserver* FOLLY_NULLABLE
CPUThreadPoolExecutor::getQueueObserver(int8_t pri) {
if (!queueObserverFactory_) {
return nullptr;
}
auto& slot = queueObservers_[static_cast<uint64_t>(pri)];
if (auto observer = slot.load(std::memory_order_acquire)) {
return observer;
}
// common case is only one queue, need only one observer
if (getNumPriorities() == 1 && pri != 0) {
return getQueueObserver(0);
}
QueueObserver* existingObserver = nullptr;
QueueObserver* newObserver = queueObserverFactory_->create().release();
if (!slot.compare_exchange_strong(existingObserver, newObserver)) {
delete newObserver;
return existingObserver;
} else {
return newObserver;
}
}
void CPUThreadPoolExecutor::add(Func func) {
......@@ -134,8 +163,11 @@ void CPUThreadPoolExecutor::add(
Func func,
std::chrono::milliseconds expiration,
Func expireCallback) {
auto result = taskQueue_->add(
CPUTask(std::move(func), expiration, std::move(expireCallback)));
CPUTask task{std::move(func), expiration, std::move(expireCallback), 0};
if (auto queueObserver = getQueueObserver(0)) {
task.queueObserverPayload() = queueObserver->onEnqueued();
}
auto result = taskQueue_->add(std::move(task));
if (!result.reusedThread) {
ensureActiveThreads();
}
......@@ -151,9 +183,12 @@ void CPUThreadPoolExecutor::add(
std::chrono::milliseconds expiration,
Func expireCallback) {
CHECK(getNumPriorities() > 0);
auto result = taskQueue_->addWithPriority(
CPUTask(std::move(func), expiration, std::move(expireCallback)),
priority);
CPUTask task(
std::move(func), expiration, std::move(expireCallback), priority);
if (auto queueObserver = getQueueObserver(priority)) {
task.queueObserverPayload() = queueObserver->onEnqueued();
}
auto result = taskQueue_->addWithPriority(std::move(task), priority);
if (!result.reusedThread) {
ensureActiveThreads();
}
......@@ -219,6 +254,9 @@ void CPUThreadPoolExecutor::threadRun(ThreadPtr thread) {
}
}
if (auto queueObserver = getQueueObserver(task->queuePriority())) {
queueObserver->onDequeued(task->queueObserverPayload());
}
runTask(thread, std::move(task.value()));
if (UNLIKELY(threadsToStop_ > 0 && !isJoin_)) {
......@@ -244,4 +282,12 @@ size_t CPUThreadPoolExecutor::getPendingTaskCountImpl() const {
return taskQueue_->size();
}
std::unique_ptr<folly::QueueObserverFactory>
CPUThreadPoolExecutor::createQueueObserverFactory() {
for (auto& observer : queueObservers_) {
observer.store(nullptr, std::memory_order_release);
}
return QueueObserverFactory::make();
}
} // namespace folly
......@@ -16,6 +16,10 @@
#pragma once
#include <limits.h>
#include <array>
#include <folly/concurrency/QueueObserver.h>
#include <folly/executors/ThreadPoolExecutor.h>
DECLARE_bool(dynamic_cputhreadpoolexecutor);
......@@ -124,13 +128,29 @@ class CPUThreadPoolExecutor : public ThreadPoolExecutor {
explicit CPUTask(
Func&& f,
std::chrono::milliseconds expiration,
Func&& expireCallback)
Func&& expireCallback,
int8_t pri)
: Task(std::move(f), expiration, std::move(expireCallback)),
poison(false) {}
poison(false),
priority_(pri) {}
CPUTask()
: Task(nullptr, std::chrono::milliseconds(0), nullptr), poison(true) {}
: Task(nullptr, std::chrono::milliseconds(0), nullptr),
poison(true),
priority_(0) {}
size_t queuePriority() const {
return priority_;
}
intptr_t& queueObserverPayload() {
return queueObserverPayload_;
}
bool poison;
private:
int8_t priority_;
intptr_t queueObserverPayload_;
};
static const size_t kDefaultMaxQueueSize;
......@@ -146,8 +166,15 @@ class CPUThreadPoolExecutor : public ThreadPoolExecutor {
bool tryDecrToStop();
bool taskShouldStop(folly::Optional<CPUTask>&);
std::unique_ptr<folly::QueueObserverFactory> createQueueObserverFactory();
QueueObserver* FOLLY_NULLABLE getQueueObserver(int8_t pri);
// shared_ptr for type erased dtor to handle extended alignment.
std::shared_ptr<BlockingQueue<CPUTask>> taskQueue_;
// It is possible to have as many detectors as there are priorities,
std::array<std::atomic<folly::QueueObserver*>, UCHAR_MAX + 1> queueObservers_;
std::unique_ptr<folly::QueueObserverFactory> queueObserverFactory_{
createQueueObserverFactory()};
std::atomic<ssize_t> threadsToStop_{0};
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment