Commit fe22c76c authored by James Sedgwick's avatar James Sedgwick Committed by Dave Watson

priority CPU thread pool

Summary:
Just extend CPUThreadPoolExecutor to use a queue that is itself composed of N MPMC queues, one per priority.

The verbosity is starting to kill me; I had thought before of truncating "Executor" off all of these pool type names, and now I'm definitely going to do that unless someone fights me.

Test Plan: added a unit test; maybe I'm not being clever enough, as I couldn't think of many ways to test this reliably, so there's just a basic preemption test.

Reviewed By: davejwatson@fb.com

Subscribers: trunkagent, fugalh, njormrod, folly-diffs@, bmatheny

FB internal diff: D1676452

Tasks: 5002392

Signature: t1:1676452:1416263990:cdf5d44e4a50a6180ba547a3ed4c0c24d4ffdd8f
parent 146fda29
......@@ -15,6 +15,9 @@
*/
#pragma once
#include <glog/logging.h>
namespace folly { namespace wangle {
template <class T>
......@@ -22,6 +25,16 @@ class BlockingQueue {
public:
virtual ~BlockingQueue() {}
virtual void add(T item) = 0;
// Default implementation for queues that do not support priorities:
// warn once (LOG_FIRST_N) and fall back to the plain add(), discarding
// the priority argument.
virtual void addWithPriority(T item, uint32_t priority) {
LOG_FIRST_N(WARNING, 1) <<
"add(item, priority) called on a non-priority queue";
add(std::move(item));
}
// Default implementation for queues that do not support priorities:
// warn once and report a single priority level.
virtual uint32_t getNumPriorities() {
LOG_FIRST_N(WARNING, 1) <<
"getNumPriorities() called on a non-priority queue";
return 1;
}
virtual T take() = 0;
virtual size_t size() = 0;
};
......
......@@ -15,10 +15,12 @@
*/
#include <folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h>
#include <folly/experimental/wangle/concurrent/PriorityLifoSemMPMCQueue.h>
namespace folly { namespace wangle {
const size_t CPUThreadPoolExecutor::kDefaultMaxQueueSize = 1 << 18;
const size_t CPUThreadPoolExecutor::kDefaultNumPriorities = 2;
CPUThreadPoolExecutor::CPUThreadPoolExecutor(
size_t numThreads,
......@@ -30,6 +32,31 @@ CPUThreadPoolExecutor::CPUThreadPoolExecutor(
CHECK(threadList_.get().size() == numThreads);
}
// Convenience constructor: single-priority pool using the default
// LifoSemMPMCQueue bounded at kDefaultMaxQueueSize, with a caller-supplied
// thread factory. Delegates to the primary (queue-taking) constructor.
CPUThreadPoolExecutor::CPUThreadPoolExecutor(
size_t numThreads,
std::shared_ptr<ThreadFactory> threadFactory)
: CPUThreadPoolExecutor(
numThreads,
folly::make_unique<LifoSemMPMCQueue<CPUTask>>(
CPUThreadPoolExecutor::kDefaultMaxQueueSize),
std::move(threadFactory)) {}
// Simplest constructor: numThreads only, with threads named "CPUThreadPool".
// Delegates to the (numThreads, threadFactory) overload above.
CPUThreadPoolExecutor::CPUThreadPoolExecutor(size_t numThreads)
: CPUThreadPoolExecutor(
numThreads,
std::make_shared<NamedThreadFactory>("CPUThreadPool")) {}
// Priority-pool constructor: builds a PriorityLifoSemMPMCQueue with
// numPriorities levels, each bounded at kDefaultMaxQueueSize, then
// delegates to the primary (queue-taking) constructor.
CPUThreadPoolExecutor::CPUThreadPoolExecutor(
size_t numThreads,
uint32_t numPriorities,
std::shared_ptr<ThreadFactory> threadFactory)
: CPUThreadPoolExecutor(
numThreads,
folly::make_unique<PriorityLifoSemMPMCQueue<CPUTask>>(
numPriorities,
CPUThreadPoolExecutor::kDefaultMaxQueueSize),
std::move(threadFactory)) {}
CPUThreadPoolExecutor::~CPUThreadPoolExecutor() {
stop();
CHECK(threadsToStop_ == 0);
......@@ -48,6 +75,30 @@ void CPUThreadPoolExecutor::add(
CPUTask(std::move(func), expiration, std::move(expireCallback)));
}
// Enqueue func at the given priority with no expiration (0 ms means the
// task never expires; see the four-argument overload below).
void CPUThreadPoolExecutor::add(Func func, uint32_t priority) {
add(std::move(func), priority, std::chrono::milliseconds(0));
}
// Enqueue func at the given priority with an expiration and an optional
// callback invoked if the task expires before running.
// CHECK-fails (crashes) when priority is out of range for the underlying
// queue — callers must stay below getNumPriorities().
void CPUThreadPoolExecutor::add(
Func func,
uint32_t priority,
std::chrono::milliseconds expiration,
Func expireCallback) {
CHECK(priority < getNumPriorities());
taskQueue_->addWithPriority(
CPUTask(std::move(func), expiration, std::move(expireCallback)),
priority);
}
// Number of priority levels supported by the underlying task queue
// (1 for non-priority queues, per BlockingQueue's default).
uint32_t CPUThreadPoolExecutor::getNumPriorities() const {
return taskQueue_->getNumPriorities();
}
// Non-owning pointer to the task queue (the executor retains ownership
// via taskQueue_); declared protected in the header, for subclass use.
BlockingQueue<CPUThreadPoolExecutor::CPUTask>*
CPUThreadPoolExecutor::getTaskQueue() {
return taskQueue_.get();
}
void CPUThreadPoolExecutor::threadRun(std::shared_ptr<Thread> thread) {
thread->startupBaton.post();
while (1) {
......
......@@ -15,6 +15,7 @@
*/
#pragma once
#include <folly/experimental/wangle/concurrent/ThreadPoolExecutor.h>
namespace folly { namespace wangle {
......@@ -25,9 +26,19 @@ class CPUThreadPoolExecutor : public ThreadPoolExecutor {
explicit CPUThreadPoolExecutor(
size_t numThreads,
std::unique_ptr<BlockingQueue<CPUTask>> taskQueue =
folly::make_unique<LifoSemMPMCQueue<CPUTask>>(
CPUThreadPoolExecutor::kDefaultMaxQueueSize),
std::unique_ptr<BlockingQueue<CPUTask>> taskQueue,
std::shared_ptr<ThreadFactory> threadFactory =
std::make_shared<NamedThreadFactory>("CPUThreadPool"));
explicit CPUThreadPoolExecutor(size_t numThreads);
explicit CPUThreadPoolExecutor(
size_t numThreads,
std::shared_ptr<ThreadFactory> threadFactory);
explicit CPUThreadPoolExecutor(
size_t numThreads,
uint32_t numPriorities,
std::shared_ptr<ThreadFactory> threadFactory =
std::make_shared<NamedThreadFactory>("CPUThreadPool"));
......@@ -39,6 +50,15 @@ class CPUThreadPoolExecutor : public ThreadPoolExecutor {
std::chrono::milliseconds expiration,
Func expireCallback = nullptr) override;
void add(Func func, uint32_t priority);
void add(
Func func,
uint32_t priority,
std::chrono::milliseconds expiration,
Func expireCallback = nullptr);
uint32_t getNumPriorities() const;
struct CPUTask : public ThreadPoolExecutor::Task {
// Must be noexcept move constructible so it can be used in MPMCQueue
explicit CPUTask(
......@@ -57,6 +77,10 @@ class CPUThreadPoolExecutor : public ThreadPoolExecutor {
};
static const size_t kDefaultMaxQueueSize;
static const size_t kDefaultNumPriorities;
protected:
BlockingQueue<CPUTask>* getTaskQueue();
private:
void threadRun(ThreadPtr thread) override;
......
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/experimental/wangle/concurrent/BlockingQueue.h>
#include <folly/LifoSem.h>
#include <folly/MPMCQueue.h>
namespace folly { namespace wangle {
/**
 * A BlockingQueue composed of N MPMCQueues, one per priority level,
 * signaled by a single LifoSem. Priority 0 is the lowest; priority
 * numPriorities - 1 is the highest. take() always drains higher
 * priorities before lower ones.
 */
template <class T>
class PriorityLifoSemMPMCQueue : public BlockingQueue<T> {
 public:
  /**
   * @param numPriorities  number of priority levels; must be > 0
   *                       (CHECK-fails otherwise)
   * @param capacity       bound of each per-priority MPMCQueue
   */
  explicit PriorityLifoSemMPMCQueue(uint32_t numPriorities, size_t capacity) {
    CHECK(numPriorities > 0);
    queues_.reserve(numPriorities);
    // uint32_t index avoids a signed/unsigned comparison; emplace_back
    // constructs each (move-only) MPMCQueue in place.
    for (uint32_t i = 0; i < numPriorities; i++) {
      queues_.emplace_back(capacity);
    }
  }

  uint32_t getNumPriorities() override {
    // queues_.size() is fixed at construction and bounded by the
    // uint32_t numPriorities argument, so the cast is lossless.
    return static_cast<uint32_t>(queues_.size());
  }

  // Add at lowest priority by default
  void add(T item) override {
    addWithPriority(std::move(item), 0);
  }

  /**
   * Add item at the given priority.
   *
   * CHECK-fails on an out-of-range priority. Throws std::runtime_error
   * when the target queue is full (the item is lost in that case).
   */
  void addWithPriority(T item, uint32_t priority) override {
    CHECK(priority < queues_.size());
    if (!queues_[priority].write(std::move(item))) {
      // Name this class (not LifoSemMPMCQueue) so the failure is
      // attributable to the right queue type.
      throw std::runtime_error(
          "PriorityLifoSemMPMCQueue full, can't add item");
    }
    sem_.post();
  }

  /**
   * Block until an item is available, preferring the highest priority.
   *
   * Producers post the semaphore only after a successful write, so a
   * completed wait implies an item was enqueued; it may still be stolen
   * by a competing consumer, hence the retry loop around the scan.
   */
  T take() override {
    T item;
    while (true) {
      // Highest priority lives at the back of queues_; scan in reverse.
      for (auto it = queues_.rbegin(); it != queues_.rend(); ++it) {
        if (it->read(item)) {
          return item;
        }
      }
      sem_.wait();
    }
  }

  // Total items across all priorities. Under concurrent use this is a
  // racy snapshot, as each per-queue size is read independently.
  size_t size() override {
    size_t size = 0;
    for (auto& q : queues_) {
      size += q.size();
    }
    return size;
  }

 private:
  LifoSem sem_;                      // counts enqueued-but-untaken items
  std::vector<MPMCQueue<T>> queues_; // index == priority; back == highest
};
}} // folly::wangle
......@@ -277,3 +277,26 @@ TEST(ThreadPoolExecutorTest, CPUFuturePool) {
// Runs the shared futureExecutor test helper against IOThreadPoolExecutor.
TEST(ThreadPoolExecutorTest, IOFuturePool) {
futureExecutor<IOThreadPoolExecutor>();
}
// Queue low-priority work before high-priority work while the pool has no
// threads, then start a single thread: every high-priority task must run
// before any low-priority one, and all 100 tasks must complete.
TEST(ThreadPoolExecutorTest, PriorityPreemptionTest) {
  bool lowPriRan = false;
  int finished = 0;
  auto hiPriTask = [&] {
    // No low-priority task may have executed before any high-priority one.
    EXPECT_FALSE(lowPriRan);
    finished++;
  };
  auto loPriTask = [&] {
    lowPriRan = true;
    finished++;
  };
  // Two priority levels, zero threads: nothing runs while we enqueue.
  CPUThreadPoolExecutor pool(0, 2);
  for (int i = 0; i < 50; ++i) {
    pool.add(loPriTask, 0);
  }
  for (int i = 0; i < 50; ++i) {
    pool.add(hiPriTask, 1);
  }
  pool.setNumThreads(1);
  pool.join();
  EXPECT_EQ(100, finished);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment