Commit 2b32e9c1 authored by Misha Shneerson and committed by Facebook GitHub Bot

metered scheduling executor adapter

Summary:
Motivation: we sometimes need to add a concept of lower priorities on top of executors that provide custom scheduling via custom APIs, such as `EDFThreadPoolExecutor`.

This adapter wraps an arbitrary executor and adds an additional queue. Tasks from this queue are fetched and placed onto the wrapped executor's queue in a metered fashion; in particular, at most "maxReadAtOnce" tasks are allowed onto the wrapped executor's queue at a time.
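
For illustration, a minimal usage sketch (names are illustrative; assumes the APIs added in this diff, plus <folly/executors/MeteredExecutor.h> and <folly/executors/CPUThreadPoolExecutor.h>):

  auto cpu = std::make_unique<folly::CPUThreadPoolExecutor>(4);
  folly::MeteredExecutor lopri(folly::getKeepAliveToken(cpu.get()));
  lopri.setMaxReadAtOnce(2); // allow at most 2 adapter tasks in the pool's queue
  cpu->add([] { /* hipri work, scheduled directly on the pool */ });
  lopri.add([] { /* lopri work, metered through the adapter */ });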

Once work is added to the metered queue, the adapter schedules a callback onto the wrapped executor. When the callback executes, it:
1) fetches the head task from the queue,
2) fetches up to maxReadAtOnce - 1 additional tasks and reschedules them onto the wrapped executor,
3) reschedules itself onto the wrapped executor, and
4) executes the task fetched in step 1 (see the sketch below).
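
In rough outline (a simplified sketch with illustrative member names, not the actual implementation, which is based on AtomicNotificationQueue):

  void loopCallbackSketch() {
    Func head = queue_.pop();                          // step 1: head task
    for (size_t i = 1; i < maxReadAtOnce_ && !queue_.empty(); ++i) {
      inner_->add(queue_.pop());                       // step 2: forward extras
    }
    if (!queue_.empty()) {
      inner_->add([this] { loopCallbackSketch(); });   // step 3: reschedule self
    }
    head();                                            // step 4: run head inline
  }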

While in theory this algorithm does not provide strong priority guarantees, in practice lower-priority tasks almost always yield to higher-priority tasks.

This primitive also allows interesting compositions to be built (both are sketched below):
1. Multi-priority scheduling can be achieved by chaining metered adapters.
2. Fair queue scheduling (e.g. round-robin consumption from N queues) can be achieved by creating N metered adapters wrapping the same executor.
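
Both compositions follow the same pattern (sketch only, under the same assumptions as above):

  // 1. Chained priorities: lo yields to mid, which yields to hi.
  auto hipri_exec = std::make_unique<folly::CPUThreadPoolExecutor>(4);
  auto mipri_exec = std::make_unique<folly::MeteredExecutor>(
      folly::getKeepAliveToken(hipri_exec.get()));
  auto lopri_exec = std::make_unique<folly::MeteredExecutor>(
      folly::getKeepAliveToken(mipri_exec.get()));

  // 2. Fair consumption: N adapters feed the same executor, so each queue
  //    gets at most maxReadAtOnce tasks onto the shared queue at a time.
  std::vector<std::unique_ptr<folly::MeteredExecutor>> queues;
  for (int i = 0; i < 4; ++i) {
    queues.push_back(std::make_unique<folly::MeteredExecutor>(
        folly::getKeepAliveToken(hipri_exec.get())));
  }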

(Note: this ignores all push blocking failures!)

Reviewed By: prshreshtha

Differential Revision: D24983861

fbshipit-source-id: 1a4ce61fc7cc706889da9724de71822bfaab440b
parent 90f04fa1
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/executors/MeteredExecutor.h>
#include <limits>
#include <folly/io/async/AtomicNotificationQueue.h>
namespace folly {
MeteredExecutor::MeteredExecutor(KeepAlive keepAlive)
: kaInner_(std::move(keepAlive)) {
queue_.setMaxReadAtOnce(1);
queue_.arm();
}
MeteredExecutor::MeteredExecutor(std::unique_ptr<Executor> executor)
: MeteredExecutor(getKeepAliveToken(*executor)) {
ownedExecutor_ = std::move(executor);
}
void MeteredExecutor::setMaxReadAtOnce(uint32_t maxAtOnce) {
queue_.setMaxReadAtOnce(maxAtOnce);
}
void MeteredExecutor::add(Func func) {
if (queue_.push(std::move(func))) {
scheduleCallback();
}
}
void MeteredExecutor::loopCallback() {
if (UNLIKELY(draining_)) {
return drain();
}
Consumer consumer(*this);
// Schedule another callback if more work may remain: either this pass
// consumed tasks, or re-arming failed because the queue is not empty.
if (queue_.drive(consumer) || !queue_.arm()) {
scheduleCallback();
}
// Run the first dequeued task inline, after the reschedule decision.
consumer.executeIfNotEmpty();
}
void MeteredExecutor::scheduleCallback() {
folly::RequestContextScopeGuard g{std::shared_ptr<RequestContext>()};
kaInner_->add([this] { loopCallback(); });
}
void MeteredExecutor::drain() {
bool keepDriving = true;
while (keepDriving) {
Consumer consumer(*this);
keepDriving = queue_.drive(consumer);
consumer.executeIfNotEmpty();
}
drained_.post();
}
MeteredExecutor::~MeteredExecutor() {
joinKeepAlive();
// The shutdown sequence deserves explanation.
// Once the task below is consumed, the executor transitions into the
// "draining" state and the loop callback is rescheduled. When the loop
// callback runs again, it drains the queue and signals the baton.
add([&] { draining_ = true; });
drained_.wait();
}
MeteredExecutor::Consumer::~Consumer() {
DCHECK(!first_);
}
void MeteredExecutor::Consumer::executeIfNotEmpty() {
if (first_) {
RequestContextScopeGuard guard(std::move(firstRctx_));
auto first = std::move(first_);
first();
}
}
void MeteredExecutor::Consumer::operator()(
Func&& func,
std::shared_ptr<RequestContext>&& rctx) {
if (!first_) {
// Hold on to the first task (and its request context) so the loop
// callback can run it inline.
first_ = std::move(func);
firstRctx_ = std::move(rctx);
} else {
// Forward subsequent tasks to the wrapped executor.
self_.kaInner_->add(
[func = std::move(func), rctx = std::move(rctx)]() mutable {
RequestContextScopeGuard guard(std::move(rctx));
func();
});
}
}
} // namespace folly
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/DefaultKeepAliveExecutor.h>
#include <folly/io/async/AtomicNotificationQueue.h>
namespace folly {
// Attaches a queue to an already existing executor and exposes the Executor
// interface for adding tasks to that queue.
// Consumption from this queue is "metered". Specifically, only a limited number
// of tasks scheduled onto the resulting executor will ever be simultaneously
// present in the wrapped executor's queue. This mechanism can be used, e.g., to
// attach a lower-priority queue to an already existing executor, meaning tasks
// scheduled on the wrapper will, in practice, yield to tasks scheduled directly
// on the wrapped executor.
//
// Notice that multi-level priorities can easily be achieved via chaining,
// for example:
// auto hipri_exec = std::make_unique<CPUThreadPoolExecutor>(numThreads);
// auto hipri_ka = getKeepAliveToken(hipri_exec.get());
// auto mipri_exec = std::make_unique<MeteredExecutor>(hipri_ka);
// auto mipri_ka = getKeepAliveToken(mipri_exec.get());
// auto lopri_exec = std::make_unique<MeteredExecutor>(mipri_ka);
// auto lopri_ka = getKeepAliveToken(lopri_exec.get());
class MeteredExecutor : public DefaultKeepAliveExecutor {
public:
using KeepAlive = Executor::KeepAlive<>;
// owning constructor
explicit MeteredExecutor(std::unique_ptr<Executor> exe);
// non-owning constructor
explicit MeteredExecutor(KeepAlive keepAlive);
~MeteredExecutor() override;
void setMaxReadAtOnce(uint32_t maxAtOnce);
void add(Func func) override;
private:
void loopCallback();
void scheduleCallback();
void drain();
class Consumer {
Func first_;
std::shared_ptr<RequestContext> firstRctx_;
MeteredExecutor& self_;
public:
explicit Consumer(MeteredExecutor& self) : self_(self) {}
void executeIfNotEmpty();
void operator()(Func&& func, std::shared_ptr<RequestContext>&& rctx);
~Consumer();
};
folly::AtomicNotificationQueue<Func> queue_;
std::unique_ptr<Executor> ownedExecutor_;
KeepAlive kaInner_;
bool draining_{false};
folly::Baton<> drained_;
};
} // namespace folly
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/executors/MeteredExecutor.h>
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/experimental/coro/BlockingWait.h>
#include <folly/experimental/coro/Task.h>
#include <folly/portability/GTest.h>
using namespace folly;
class MeteredExecutorTest : public testing::Test {
protected:
void createAdapter(int numLevels, int maxReadAtOnce = 1) {
executors_.resize(numLevels + 1);
std::unique_ptr<folly::Executor> exc =
std::make_unique<CPUThreadPoolExecutor>(1);
executors_[0] = exc.get();
for (int i = 0; i < numLevels; i++) {
auto mlsa = std::make_unique<MeteredExecutor>(std::move(exc));
mlsa->setMaxReadAtOnce(maxReadAtOnce);
exc = std::move(mlsa);
executors_[i + 1] = exc.get();
}
owning_ = std::move(exc);
}
void add(Func f, uint8_t level = 0) { executors_[level]->add(std::move(f)); }
void join() {
folly::Baton baton;
executors_.back()->add([&] { baton.post(); });
baton.wait();
}
Executor::KeepAlive<> getKeepAlive(int level) { return executors_[level]; }
protected:
std::shared_ptr<void> owning_;
std::vector<Executor*> executors_;
};
TEST_F(MeteredExecutorTest, SingleLevel) {
createAdapter(1);
folly::Baton baton;
add([&] { baton.wait(); });
int32_t v = 0;
add([&] { EXPECT_EQ(0, v++); });
// first lopri task executes in the scheduling order with hipri
// tasks, but all subsequent lopri tasks yield.
add([&] { EXPECT_EQ(1, v++); }, 1);
add([&] { EXPECT_EQ(4, v++); }, 1);
add([&] { EXPECT_EQ(5, v++); }, 1);
add([&] { EXPECT_EQ(6, v++); }, 1);
add([&] { EXPECT_EQ(2, v++); });
add([&] { EXPECT_EQ(3, v++); });
baton.post();
join();
EXPECT_EQ(v, 7);
}
TEST_F(MeteredExecutorTest, TwoLevels) {
int32_t v = 0;
createAdapter(2);
folly::Baton baton;
add([&] { baton.wait(); });
add([&] { EXPECT_EQ(0, v++); });
add([&] { EXPECT_EQ(1, v++); }, 1);
add([&] { EXPECT_EQ(4, v++); }, 1);
add([&] { EXPECT_EQ(5, v++); }, 1);
add([&] { EXPECT_EQ(6, v++); }, 1);
add([&] { EXPECT_EQ(7, v++); }, 1);
add([&] { EXPECT_EQ(8, v++); }, 2);
add([&] { EXPECT_EQ(13, v++); }, 2);
add([&] { EXPECT_EQ(14, v++); }, 2);
add([&] { EXPECT_EQ(15, v++); }, 2);
add([&] { EXPECT_EQ(16, v++); }, 2);
add([&] { EXPECT_EQ(9, v++); }, 1);
add([&] { EXPECT_EQ(10, v++); }, 1);
add([&] { EXPECT_EQ(11, v++); }, 1);
add([&] { EXPECT_EQ(12, v++); }, 1);
add([&] { EXPECT_EQ(2, v++); });
add([&] { EXPECT_EQ(3, v++); });
baton.post();
join();
EXPECT_EQ(v, 17);
}
TEST_F(MeteredExecutorTest, PreemptTest) {
int32_t v = 0;
createAdapter(1);
folly::Baton baton1, baton2, baton3;
add([&] { baton1.wait(); });
add([&] { EXPECT_EQ(0, v++); });
// first lopri task executes in the scheduling order with hipri
// tasks, but all subsequent lopri tasks yield.
add([&] { EXPECT_EQ(1, v++); }, 1);
add([&] { EXPECT_EQ(4, v++); }, 1);
add([&] { EXPECT_EQ(5, v++); }, 1);
add([&] { EXPECT_EQ(2, v++); });
add([&] { EXPECT_EQ(3, v++); });
add(
[&] {
EXPECT_EQ(6, v++);
baton2.post();
baton3.wait();
},
1);
// These P1 tasks should be run AFTER the 3 P0 tasks are added below. We
// should expect the P0 tasks to preempt these.
add([&] { EXPECT_EQ(7, v++); }, 1);
add([&] { EXPECT_EQ(11, v++); }, 1);
add([&] { EXPECT_EQ(12, v++); }, 1);
add([&] { EXPECT_EQ(13, v++); }, 1);
// Kick off the tasks, but we'll halt part-way through the P1 tasks to add
// some P0 tasks into the mix.
baton1.post();
baton2.wait();
// Throw in the high pris.
add([&] { EXPECT_EQ(8, v++); });
add([&] { EXPECT_EQ(9, v++); });
add([&] { EXPECT_EQ(10, v++); });
baton3.post();
join();
EXPECT_EQ(v, 14);
}
class MeteredExecutorTestP
: public MeteredExecutorTest,
public ::testing::WithParamInterface<std::tuple<int>> {
protected:
int maxReadAtOnce;
void SetUp() override { std::tie(maxReadAtOnce) = GetParam(); }
};
TEST_P(MeteredExecutorTestP, TwoLevelsWithKeepAlives) {
auto hipri_exec = std::make_unique<CPUThreadPoolExecutor>(1);
auto hipri_ka = getKeepAliveToken(hipri_exec.get());
auto mipri_exec = std::make_unique<MeteredExecutor>(hipri_ka);
mipri_exec->setMaxReadAtOnce(maxReadAtOnce);
auto mipri_ka = getKeepAliveToken(mipri_exec.get());
auto lopri_exec = std::make_unique<MeteredExecutor>(mipri_ka);
lopri_exec->setMaxReadAtOnce(maxReadAtOnce);
executors_ = {hipri_exec.get(), mipri_exec.get(), lopri_exec.get()};
int32_t v = 0;
folly::Baton baton;
add([&] { baton.wait(); });
add([&] { EXPECT_EQ(0, v++); });
add([&] { EXPECT_EQ(1, v++); }, 1);
add([&] { EXPECT_EQ(4, v++); }, 1);
add([&] { EXPECT_EQ(5, v++); }, 1);
add([&] { EXPECT_EQ(6, v++); }, 1);
add([&] { EXPECT_EQ(7, v++); }, 1);
add([&] { EXPECT_EQ(8, v++); }, 2);
add([&] { EXPECT_EQ(13, v++); }, 2);
add([&] { EXPECT_EQ(14, v++); }, 2);
add([&] { EXPECT_EQ(15, v++); }, 2);
add([&] { EXPECT_EQ(16, v++); }, 2);
add([&] { EXPECT_EQ(9, v++); }, 1);
add([&] { EXPECT_EQ(10, v++); }, 1);
add([&] { EXPECT_EQ(11, v++); }, 1);
add([&] { EXPECT_EQ(12, v++); }, 1);
add([&] { EXPECT_EQ(2, v++); });
add([&] { EXPECT_EQ(3, v++); });
baton.post();
lopri_exec.reset();
EXPECT_EQ(v, 17);
}
TEST_P(MeteredExecutorTestP, RequestContext) {
createAdapter(3, maxReadAtOnce);
folly::Baton baton;
add([&] { baton.wait(); });
auto addAndCheckRCTX = [this](int8_t pri = 0) {
RequestContextScopeGuard g;
auto f = [rctx = RequestContext::saveContext()]() mutable {
EXPECT_EQ(rctx.get(), RequestContext::get());
// Verify we do not have dangling reference to finished requests.
static std::shared_ptr<folly::RequestContext> lastFinished;
EXPECT_TRUE(!lastFinished || lastFinished.unique());
lastFinished = std::move(rctx);
};
add(std::move(f), pri);
};
addAndCheckRCTX();
addAndCheckRCTX(3);
addAndCheckRCTX(3);
addAndCheckRCTX(3);
addAndCheckRCTX(1);
addAndCheckRCTX(1);
addAndCheckRCTX(1);
addAndCheckRCTX(2);
addAndCheckRCTX(2);
addAndCheckRCTX(2);
baton.post();
join();
}
INSTANTIATE_TEST_CASE_P(
MeteredExecutorSuite,
MeteredExecutorTestP,
testing::Values(1, 3));
TEST_F(MeteredExecutorTest, ResetJoins) {
createAdapter(2);
folly::Baton baton;
add([&] { baton.wait(); });
int v = 0;
add([&] { ++v; });
add([&] { ++v; }, 2);
add([&] { ++v; }, 2);
add([&] { ++v; }, 1);
add([&] { ++v; }, 1);
baton.post();
owning_.reset();
EXPECT_EQ(v, 5);
}
TEST_F(MeteredExecutorTest, ExceptionHandling) {
createAdapter(2);
folly::Baton baton;
add([&] {
baton.wait();
throw std::runtime_error("dummy");
});
bool thrown = false;
add(
[&] {
thrown = true;
throw std::runtime_error("dummy");
},
1);
baton.post();
join();
EXPECT_EQ(true, thrown);
}
namespace {
coro::Task<bool> co_isOnCPUExc() {
auto executor = co_await coro::co_current_executor;
auto cpuexec = dynamic_cast<CPUThreadPoolExecutor*>(executor);
co_return cpuexec != nullptr;
}
template <typename T>
coro::Task<bool> co_run(Executor::KeepAlive<> ka, coro::Task<T> f) {
auto cpuexec = dynamic_cast<CPUThreadPoolExecutor*>(ka.get());
EXPECT_TRUE(cpuexec != nullptr);
co_return co_await std::move(f).scheduleOn(cpuexec);
}
} // namespace
TEST_F(MeteredExecutorTest, UnderlyingExecutor) {
createAdapter(1);
EXPECT_FALSE(coro::blockingWait(co_isOnCPUExc().scheduleOn(getKeepAlive(1))));
EXPECT_TRUE(coro::blockingWait(
co_run(getKeepAlive(0), co_isOnCPUExc()).scheduleOn(getKeepAlive(0))));
}
@@ -182,8 +182,13 @@ bool AtomicNotificationQueue<Task>::arm() {
if (!queue_.empty()) {
return false;
}
queue_ = atomicQueue_.arm();
return queue_.empty();
auto queue = atomicQueue_.arm();
if (queue.empty()) {
return true;
} else {
queue_ = std::move(queue);
return false;
}
}
template <typename Task>