Commit 17388729 authored by Yicheng Wang's avatar Yicheng Wang Committed by Facebook GitHub Bot

adding QueueObserver to MeteredExecutor

Summary: Adding QueueObserver to MeteredExecutor so that we are able to execute callbacks per enqueue and dequeue

Reviewed By: mshneer

Differential Revision: D30052794

fbshipit-source-id: eec2d55fcd02477791080a7b97cec98cb6ba9875
parent ba3aedb3
......@@ -22,14 +22,15 @@
namespace folly {
// Non-owning constructor: wraps an executor referenced by a KeepAlive token.
// Configured to hand over one task per drain pass (metering) and armed so
// the first push schedules the drain callback.
MeteredExecutor::MeteredExecutor(KeepAlive keepAlive, Options options)
    : options_(std::move(options)), kaInner_(std::move(keepAlive)) {
  queue_.setMaxReadAtOnce(1);
  queue_.arm();
}
// Owning constructor: takes ownership of the executor and delegates to the
// non-owning constructor with a keep-alive token to it. Ownership is
// recorded after delegation so the token is obtained from a live executor.
MeteredExecutor::MeteredExecutor(
    std::unique_ptr<Executor> executor, Options options)
    : MeteredExecutor(getKeepAliveToken(*executor), std::move(options)) {
  ownedExecutor_ = std::move(executor);
}
......@@ -37,8 +38,28 @@ void MeteredExecutor::setMaxReadAtOnce(uint32_t maxAtOnce) {
queue_.setMaxReadAtOnce(maxAtOnce);
}
// Creates the QueueObserver for this executor, if enabled in Options.
// Returns nullptr when observation is disabled or no factory is registered.
std::unique_ptr<QueueObserver> MeteredExecutor::setupQueueObserver() {
  if (options_.enableQueueObserver) {
    // Fall back to a placeholder so the observer name is never "mex.".
    const std::string name = options_.name.empty() ? "unk" : options_.name;
    if (auto factory = folly::QueueObserverFactory::make(
            "mex." + name, options_.numPriorities)) {
      return factory->create(options_.priority);
    }
  }
  return nullptr;
}
// Enqueues a task. The caller's RequestContext is captured explicitly so
// the same context pointer can be reported to the queue observer and then
// handed to the queue together with the task.
void MeteredExecutor::add(Func func) {
  auto task = Task(std::move(func));
  auto rctxp = RequestContext::saveContext();
  if (queueObserver_) {
    // Remember the enqueue payload so the matching onDequeued() call can be
    // made when the task is consumed.
    task.setQueueObserverPayload(queueObserver_->onEnqueued(rctxp.get()));
  }
  if (queue_.push(std::move(rctxp), std::move(task))) {
    // push() returned true: the consumer needs to be (re)scheduled.
    scheduleCallback();
  }
}
......@@ -61,27 +82,30 @@ MeteredExecutor::~MeteredExecutor() {
}
// Invariant at destruction: any retained first task must already have been
// executed (its Func consumed by run()), or never set at all.
MeteredExecutor::Consumer::~Consumer() {
  DCHECK(!first_ || !first_->hasFunc());
}
// Runs the locally retained first task (if any) inline, under the
// RequestContext that was saved when it was enqueued.
void MeteredExecutor::Consumer::executeIfNotEmpty() {
  if (first_) {
    RequestContextScopeGuard guard(std::move(firstRctx_));
    // Move out before running so first_ is empty by destructor time.
    auto first = std::move(first_);
    first->run();
  }
}
void MeteredExecutor::Consumer::operator()(
Func&& func, std::shared_ptr<RequestContext>&& rctx) {
Task&& task, std::shared_ptr<RequestContext>&& rctx) {
if (self_.queueObserver_) {
self_.queueObserver_->onDequeued(first_->getQueueObserverPayload());
}
if (!first_) {
first_ = std::move(func);
first_ = std::make_optional<Task>(std::move(task));
firstRctx_ = std::move(rctx);
} else {
self_.kaInner_->add(
[func = std::move(func), rctx = std::move(rctx)]() mutable {
[task = std::move(task), rctx = std::move(rctx)]() mutable {
RequestContextScopeGuard guard(std::move(rctx));
func();
task.run();
});
}
}
......
......@@ -16,7 +16,10 @@
#pragma once
#include <optional>
#include <folly/DefaultKeepAliveExecutor.h>
#include <folly/executors/QueueObserver.h>
#include <folly/io/async/AtomicNotificationQueue.h>
namespace folly {
......@@ -40,11 +43,20 @@ namespace folly {
// auto lopri_ka = getKeepAliveToken(lopri_exec.get());
class MeteredExecutor : public DefaultKeepAliveExecutor {
public:
// Tunables for MeteredExecutor, primarily controlling queue observation.
struct Options {
  Options() = default;  // = default, per modernize-use-equals-default
  std::string name;  // label for the observer ("mex.<name>"; "unk" if empty)
  bool enableQueueObserver{false};
  size_t numPriorities{1};
  int8_t priority{0};
};
using KeepAlive = Executor::KeepAlive<>;
// owning constructor
explicit MeteredExecutor(std::unique_ptr<Executor> exe);
explicit MeteredExecutor(
std::unique_ptr<Executor> exe, Options options = Options());
// non-owning constructor
explicit MeteredExecutor(KeepAlive keepAlive);
explicit MeteredExecutor(KeepAlive keepAlive, Options options = Options());
~MeteredExecutor() override;
void setMaxReadAtOnce(uint32_t maxAtOnce);
......@@ -54,23 +66,47 @@ class MeteredExecutor : public DefaultKeepAliveExecutor {
size_t pendingTasks() const { return queue_.size(); }
private:
// A queued unit of work plus the queue-observer payload recorded at
// enqueue time (consumed by QueueObserver::onDequeued()).
class Task {
 public:
  Task() = default;
  explicit Task(Func&& func) : func_(std::move(func)) {}
  void setQueueObserverPayload(intptr_t newValue) {
    queueObserverPayload_ = newValue;
  }
  intptr_t getQueueObserverPayload() const { return queueObserverPayload_; }
  void run() { func_(); }
  bool hasFunc() const { return static_cast<bool>(func_); }

 private:
  Func func_;
  // BUG FIX: previously left uninitialized, so reading it via
  // getQueueObserverPayload() on a default-constructed Task was UB.
  intptr_t queueObserverPayload_{0};
};
std::unique_ptr<folly::QueueObserver> setupQueueObserver();
void loopCallback();
void scheduleCallback();
// Drains tasks out of the queue: keeps the first task of a pass to run
// inline and forwards the rest to the wrapped executor.
class Consumer {
  std::optional<Task> first_;  // first task of the current drain pass
  std::shared_ptr<RequestContext> firstRctx_;  // its saved RequestContext
  MeteredExecutor& self_;

 public:
  explicit Consumer(MeteredExecutor& self) : self_(self) {}
  void executeIfNotEmpty();
  void operator()(Task&& task, std::shared_ptr<RequestContext>&& rctx);
  ~Consumer();
};
Options options_;
folly::AtomicNotificationQueue<Task> queue_;
std::unique_ptr<Executor> ownedExecutor_;
KeepAlive kaInner_;
// Declared last so its default initializer runs after options_ is set
// (member initialization follows declaration order).
std::unique_ptr<folly::QueueObserver> queueObserver_{setupQueueObserver()};
};
} // namespace folly
......@@ -101,8 +101,10 @@ AtomicNotificationQueue<Task>::AtomicQueue::~AtomicQueue() {
template <typename Task>
template <typename... Args>
bool AtomicNotificationQueue<Task>::AtomicQueue::push(Args&&... args) {
std::unique_ptr<Node> node(new Node(std::forward<Args>(args)...));
bool AtomicNotificationQueue<Task>::AtomicQueue::pushImpl(
std::shared_ptr<RequestContext> rctx, Args&&... args) {
std::unique_ptr<Node> node(
new Node(std::move(rctx), std::forward<Args>(args)...));
auto head = head_.load(std::memory_order_relaxed);
while (true) {
node->next =
......@@ -118,6 +120,20 @@ bool AtomicNotificationQueue<Task>::AtomicQueue::push(Args&&... args) {
}
}
// Pushes a task, capturing the calling thread's RequestContext for it.
template <typename Task>
template <typename... Args>
bool AtomicNotificationQueue<Task>::AtomicQueue::push(Args&&... args) {
  return pushImpl(
      RequestContext::saveContext(), std::forward<Args>(args)...);
}
// Pushes a task using a RequestContext supplied by the caller instead of
// capturing the current thread's context.
template <typename Task>
template <typename... Args>
bool AtomicNotificationQueue<Task>::AtomicQueue::push(
std::shared_ptr<RequestContext> rctx, Args&&... args) {
return pushImpl(std::move(rctx), std::forward<Args>(args)...);
}
template <typename Task>
bool AtomicNotificationQueue<Task>::AtomicQueue::hasTasks() const {
auto head = head_.load(std::memory_order_relaxed);
......@@ -197,6 +213,14 @@ bool AtomicNotificationQueue<Task>::push(Args&&... args) {
return atomicQueue_.push(std::forward<Args>(args)...);
}
// Same as push(Args&&...) above, except the RequestContext is provided by
// the caller; counts the push, then forwards to the underlying AtomicQueue.
template <typename Task>
template <typename... Args>
bool AtomicNotificationQueue<Task>::push(
std::shared_ptr<RequestContext> rctx, Args&&... args) {
// NOTE(review): relaxed ordering assumed fine — pushCount_ looks like a
// plain counter; confirm nothing relies on it for synchronization.
pushCount_.fetch_add(1, std::memory_order_relaxed);
return atomicQueue_.push(std::move(rctx), std::forward<Args>(args)...);
}
template <typename Task>
typename AtomicNotificationQueue<Task>::TryPushResult
AtomicNotificationQueue<Task>::tryPush(Task&& task, uint32_t maxSize) {
......
......@@ -87,6 +87,13 @@ class AtomicNotificationQueue {
template <typename... Args>
bool push(Args&&... args);
/*
 * Same as push(Args&&...) above, except the RequestContext is supplied by
 * the caller instead of being captured from the current thread.
 */
template <typename... Args>
bool push(std::shared_ptr<RequestContext> rctx, Args&&... args);
/*
* Attempts adding a task into the queue.
* Can be called from any thread.
......@@ -99,13 +106,15 @@ class AtomicNotificationQueue {
private:
// Intrusive singly-linked queue node: a task plus the RequestContext it was
// enqueued under. The context is now always passed in by the pushing code
// (push() captures it when the caller does not provide one explicitly).
struct Node {
  Task task;
  std::shared_ptr<RequestContext> rctx;

 private:
  friend class AtomicNotificationQueue;

  template <typename... Args>
  explicit Node(
      std::shared_ptr<RequestContext> requestContext, Args&&... args)
      : task(std::forward<Args>(args)...), rctx(std::move(requestContext)) {}

  Node* next{};
};
......@@ -189,6 +198,13 @@ class AtomicNotificationQueue {
template <typename... Args>
bool push(Args&&... args);
/*
 * Same as push(Args&&...) above, except the RequestContext is supplied by
 * the caller instead of being captured from the current thread.
 */
template <typename... Args>
bool push(std::shared_ptr<RequestContext> rctx, Args&&... args);
/*
* Returns true if the queue has tasks.
* Can be called from any thread.
......@@ -212,6 +228,9 @@ class AtomicNotificationQueue {
Queue arm();
private:
template <typename... Args>
bool pushImpl(std::shared_ptr<RequestContext> rctx, Args&&... args);
alignas(folly::cacheline_align_v) std::atomic<Node*> head_{};
static constexpr intptr_t kQueueArmedTag = 1;
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment