Commit c289b7f5 authored by Lewis Baker's avatar Lewis Baker Committed by Facebook GitHub Bot

Add folly::StrandExecutor type

Summary:
This diff adds a new StrandExecutor executor-type that is similar
to SerialExecutor in that it serialises execution of work enqueued
to it, but is more general in that it allows different tasks to be
executed on different executors.

To do this, it separates the queue out into a StrandContext object
which can be passed to multiple StrandExecutor objects, allowing them
each to share a queue, while allowing each StrandExecutor to dispatch
work queued to it to a different parent Executor.

Reviewed By: andriigrynenko

Differential Revision: D21848165

fbshipit-source-id: 4dda6bd13a9cd5b275a11f13101242cd73b327e7
parent 59ce0d0d
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/executors/StrandExecutor.h>
#include <folly/ExceptionString.h>
#include <folly/executors/GlobalExecutor.h>
#include <glog/logging.h>
namespace folly {
// Callable that will process a batch of tasks scheduled
// for execution on the same executor when called.
class StrandContext::Task {
public:
explicit Task(std::shared_ptr<StrandContext> ctx) noexcept
: context_(std::move(ctx)) {}
Task(Task&& t) noexcept : context_(std::move(t.context_)) {}
Task(const Task&) = delete;
Task& operator=(Task&&) = delete;
Task& operator=(const Task&) = delete;
~Task() {
if (context_) {
// Task destructed without being invoked.
// This either happens because an error occurred when attempting
// to enqueue this task to an Executor, or if the Executor dropped
// the task without executing it. Both of these situations are fatal.
LOG(FATAL) << "StrandExecutor Task destroyed without being executed";
}
}
void operator()() noexcept {
DCHECK(context_);
StrandContext::executeNext(std::move(context_));
}
private:
std::shared_ptr<StrandContext> context_;
};
std::shared_ptr<StrandContext> StrandContext::create() {
return std::make_shared<StrandContext>(PrivateTag{});
}
void StrandContext::add(Func func, Executor::KeepAlive<> executor) {
addImpl(QueueItem{std::move(func), std::move(executor), folly::none});
}
void StrandContext::addWithPriority(
Func func,
Executor::KeepAlive<> executor,
int8_t priority) {
addImpl(QueueItem{std::move(func), std::move(executor), priority});
}
void StrandContext::addImpl(QueueItem&& item) {
queue_.enqueue(std::move(item));
if (scheduled_.fetch_add(1, std::memory_order_acq_rel) == 0) {
// This thread was first to mark queue as nonempty, so we are
// responsible for scheduling the first queued item onto the
// executor.
// Note that due to a potential race with another thread calling
// add[WithPriority]() concurrently, the first item in queue is
// not necessarily the item this thread just enqueued so need
// to re-query it from the queue.
dispatchFrontQueueItem(shared_from_this());
}
}
void StrandContext::executeNext(
std::shared_ptr<StrandContext> thisPtr) noexcept {
// Put a cap on the number of items we process in one batch before
// rescheduling on to the executor to avoid starvation of other
// items queued to the current executor.
const std::size_t maxItemsToProcessSynchronously = 32;
std::size_t queueSize = thisPtr->scheduled_.load(std::memory_order_acquire);
DCHECK(queueSize != 0u);
const QueueItem* nextItem = nullptr;
std::size_t pendingCount = 0;
for (std::size_t i = 0; i < maxItemsToProcessSynchronously; ++i) {
QueueItem item = thisPtr->queue_.dequeue();
try {
std::exchange(item.func, {})();
} catch (const std::exception& ex) {
LOG(DFATAL) << "StrandExecutor: func threw unhandled exception "
<< folly::exceptionStr(ex);
} catch (...) {
LOG(DFATAL) << "StrandExecutor: func threw unhandled non-exception "
"object";
}
++pendingCount;
if (pendingCount == queueSize) {
queueSize = thisPtr->scheduled_.fetch_sub(
pendingCount, std::memory_order_acq_rel) -
pendingCount;
if (queueSize == 0) {
// Queue is now empty
return;
}
pendingCount = 0;
}
nextItem = thisPtr->queue_.try_peek();
DCHECK(nextItem != nullptr);
// Check if the next item has the same executor.
// If so we'll go around the loop again, otherwise
// we'll dispatch to the other executor and return.
if (nextItem->executor.get() != item.executor.get()) {
break;
}
}
DCHECK(nextItem != nullptr);
DCHECK(pendingCount < queueSize);
[[maybe_unused]] auto prevQueueSize =
thisPtr->scheduled_.fetch_sub(pendingCount, std::memory_order_relaxed);
DCHECK(pendingCount < prevQueueSize);
// Reuse the shared_ptr from the previous item.
dispatchFrontQueueItem(std::move(thisPtr));
}
void StrandContext::dispatchFrontQueueItem(
std::shared_ptr<StrandContext> thisPtr) noexcept {
const QueueItem* item = thisPtr->queue_.try_peek();
DCHECK(item != nullptr);
// NOTE: We treat any failure to schedule work onto the item's executor as a
// fatal error. This will either result in LOG(FATAL) being called from
// the Task destructor or std::terminate() being called by
// exception-unwinding, depending on the compiler/runtime.
Task task{std::move(thisPtr)};
if (item->priority.has_value()) {
item->executor->addWithPriority(std::move(task), item->priority.value());
} else {
item->executor->add(std::move(task));
}
}
Executor::KeepAlive<StrandExecutor> StrandExecutor::create() {
return create(StrandContext::create(), getGlobalCPUExecutor());
}
Executor::KeepAlive<StrandExecutor> StrandExecutor::create(
std::shared_ptr<StrandContext> context) {
return create(std::move(context), getGlobalCPUExecutor());
}
Executor::KeepAlive<StrandExecutor> StrandExecutor::create(
Executor::KeepAlive<> parentExecutor) {
return create(StrandContext::create(), std::move(parentExecutor));
}
Executor::KeepAlive<StrandExecutor> StrandExecutor::create(
std::shared_ptr<StrandContext> context,
Executor::KeepAlive<> parentExecutor) {
auto* ex = new StrandExecutor(std::move(context), std::move(parentExecutor));
return Executor::makeKeepAlive<StrandExecutor>(ex);
}
void StrandExecutor::add(Func f) {
context_->add(std::move(f), parent_);
}
void StrandExecutor::addWithPriority(Func f, int8_t priority) {
context_->addWithPriority(std::move(f), parent_, priority);
}
uint8_t StrandExecutor::getNumPriorities() const {
return parent_->getNumPriorities();
}
bool StrandExecutor::keepAliveAcquire() {
[[maybe_unused]] auto oldCount =
refCount_.fetch_add(1, std::memory_order_relaxed);
DCHECK(oldCount > 0);
return true;
}
void StrandExecutor::keepAliveRelease() {
const auto oldCount = refCount_.fetch_sub(1, std::memory_order_acq_rel);
DCHECK(oldCount > 0);
if (oldCount == 1) {
// Last reference.
delete this;
}
}
StrandExecutor::StrandExecutor(
std::shared_ptr<StrandContext> context,
Executor::KeepAlive<> parent) noexcept
: refCount_(1), parent_(std::move(parent)), context_(std::move(context)) {}
} // namespace folly
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/Optional.h>
#include <folly/concurrency/UnboundedQueue.h>
#include <folly/executors/SequencedExecutor.h>
#include <atomic>
#include <memory>
namespace folly {
class StrandExecutor;
// StrandExecutor / StrandContext
//
// A StrandExecutor, like a SerialExecutor, serialises execution of work added
// to it, while deferring the actual execution of this work to another
// Executor.
//
// However, unlike the SerialExecutor, a StrandExecutor allows multiple
// StrandExecutors, each potentially delegating to a different parent Executor,
// to share the same serialising queue.
//
// This can be used in cases where you want to execute work on a
// caller-provided execution context, but want to make sure that work is
// serialised with respect to other work that might be scheduled on other
// underlying Executors.
//
// e.g. Using StrandExecutor with folly::coro::Task to enforce that only
// a single thread accesses the object at a time, while still allowing
// other other methods to execute while the current one is suspended
//
// class MyObject {
// public:
//
// folly::coro::Task<T> someMethod() {
// auto currentEx = co_await folly::coro::co_current_executor;
// auto strandEx = folly::StrandExecutor::create(strand_, currentEx);
// co_return co_await someMethodImpl().scheduleOn(strandEx);
// }
//
// private:
// folly::coro::Task<T> someMethodImpl() {
// // Safe to access otherState_ without further synchronisation.
// modify(otherState_);
//
// // Other instances of someMethod() calls will be able to run
// // while this coroutine is suspended waiting for an RPC call
// // to complete.
// co_await getClient()->co_someRpcCall();
//
// // When that call resumes, this coroutine is once again
// // executing with mutual exclusion.
// modify(otherState_);
// }
//
// std::shared_ptr<StrandContext> strand_{StrandContext::create()};
// X otherState_;
// };
//
class StrandContext : public std::enable_shared_from_this<StrandContext> {
public:
// Create a new StrandContext object. This will allow scheduling work
// that will execute at most one task at a time but delegate the actual
// execution to an execution context associated with each particular
// function.
static std::shared_ptr<StrandContext> create();
// Schedule 'func()' to be called on 'executor' after all prior functions
// scheduled to this context have completed.
void add(Func func, Executor::KeepAlive<> executor);
// Schedule `func()` to be called on `executor` with specified priority
// after all prior functions scheduled to this context have completed.
//
// Note, that the priority will only affect the priority of the scheduling
// of this particular function once all prior tasks have finished executing.
void
addWithPriority(Func func, Executor::KeepAlive<> executor, int8_t priority);
private:
struct PrivateTag {};
class Task;
public:
// Public to allow construction using std::make_shared() but a logically
// private constructor. Try to enforce this by forcing use of a private
// tag-type as a parameter.
explicit StrandContext(PrivateTag) {}
private:
struct QueueItem {
Func func;
Executor::KeepAlive<> executor;
Optional<std::int8_t> priority;
};
void addImpl(QueueItem&& item);
static void executeNext(std::shared_ptr<StrandContext> thisPtr) noexcept;
static void dispatchFrontQueueItem(
std::shared_ptr<StrandContext> thisPtr) noexcept;
std::atomic<std::size_t> scheduled_{0};
UMPSCQueue<QueueItem, /*MayBlock=*/false> queue_;
};
class StrandExecutor final : public SequencedExecutor {
public:
// Creates a new StrandExecutor that is independent of other StrandExcutors.
//
// Work enqueued to the returned executor will be executed on the global CPU
// thread-pool but will only execute at most one function enqueued to this
// executor at a time.
static Executor::KeepAlive<StrandExecutor> create();
// Creates a new StrandExecutor that shares the serialised queue with other
// StrandExecutors constructed with the same context.
//
// Work enqueued to the returned executor will be executed on the global CPU
// thread-pool but will only execute at most one function enqueued to any
// StrandExecutor created with the same StrandContext.
static Executor::KeepAlive<StrandExecutor> create(
std::shared_ptr<StrandContext> context);
// Creates a new StrandExecutor that will serialise execution of any work
// enqueued to it, running the work on the parentExecutor.
static Executor::KeepAlive<StrandExecutor> create(
Executor::KeepAlive<> parentExecutor);
// Creates a new StrandExecutor that will execute work on 'parentExecutor'
// but that will execute at most one function at a time enqueued to any
// StrandExecutor created with the same StrandContext.
static Executor::KeepAlive<StrandExecutor> create(
std::shared_ptr<StrandContext> context,
Executor::KeepAlive<> parentExecutor);
void add(Func f) override;
void addWithPriority(Func f, int8_t priority) override;
uint8_t getNumPriorities() const override;
protected:
bool keepAliveAcquire() override;
void keepAliveRelease() override;
private:
explicit StrandExecutor(
std::shared_ptr<StrandContext> context,
Executor::KeepAlive<> parent) noexcept;
private:
std::atomic<std::size_t> refCount_;
Executor::KeepAlive<> parent_;
std::shared_ptr<StrandContext> context_;
};
} // namespace folly
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/executors/StrandExecutor.h>
#include <folly/CancellationToken.h>
#include <folly/executors/GlobalExecutor.h>
#include <folly/executors/ManualExecutor.h>
#include <folly/synchronization/Baton.h>
#include <atomic>
#include <chrono>
#include <stdexcept>
#include <thread>
#include <vector>
#include <folly/portability/GTest.h>
using namespace folly;
using namespace std::chrono_literals;
namespace {
template <typename Duration>
void burnTime(Duration d) {
/* sleep override */ std::this_thread::sleep_for(d);
}
} // namespace
TEST(StrandExecutor, SimpleTest) {
auto exec = StrandExecutor::create();
// Checks that tasks are serialised (ie. that we don't corrupt the vector)
// and that task are processed in-order.
std::vector<int> v;
for (int i = 0; i < 20; ++i) {
exec->add([&, i] {
v.emplace_back();
burnTime(1ms);
v.back() = i;
});
}
folly::Baton baton;
exec->add([&] { baton.post(); });
baton.wait();
CHECK_EQ(20, v.size());
for (int i = 0; i < 20; ++i) {
CHECK_EQ(i, v[i]);
}
}
TEST(StrandExecutor, ThreadSafetyTest) {
auto strandContext = StrandContext::create();
ManualExecutor ex1;
ManualExecutor ex2;
CancellationSource cancelSrc;
auto runUntilStopped = [&](ManualExecutor& ex) {
CancellationCallback cb(
cancelSrc.getToken(), [&]() noexcept { ex.add([] {}); });
while (!cancelSrc.isCancellationRequested()) {
ex.makeProgress();
}
};
std::thread t1{[&] { runUntilStopped(ex1); }};
std::thread t2{[&] { runUntilStopped(ex2); }};
int value = 0;
auto incrementValue = [&]() noexcept {
++value;
};
auto strandEx1 =
StrandExecutor::create(strandContext, getKeepAliveToken(ex1));
auto strandEx2 =
StrandExecutor::create(strandContext, getKeepAliveToken(ex2));
auto submitSomeTasks = [&]() {
for (int i = 0; i < 10'000; ++i) {
strandEx1->add(incrementValue);
strandEx2->add(incrementValue);
}
};
std::thread submitter1{submitSomeTasks};
std::thread submitter2{submitSomeTasks};
submitter1.join();
submitter2.join();
folly::Baton b1;
folly::Baton b2;
strandEx1->add([&] { b1.post(); });
strandEx2->add([&] { b2.post(); });
b1.wait();
b2.wait();
CHECK_EQ(40'000, value);
cancelSrc.requestCancellation();
t1.join();
t2.join();
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment