Commit c5d7abd5 authored by Misha Shneerson's avatar Misha Shneerson Committed by Facebook GitHub Bot

Allow Consumer of AtomicNotificationQueue to manage folly::RequestContext

Reviewed By: prshreshtha

Differential Revision: D25387330

fbshipit-source-id: 027e04024e01d1eab938d67a2d1dfc93c9322b15
parent ebb18e1e
...@@ -225,23 +225,63 @@ template < ...@@ -225,23 +225,63 @@ template <
AtomicNotificationQueueTaskStatus>::value>> AtomicNotificationQueueTaskStatus>::value>>
AtomicNotificationQueueTaskStatus invokeConsumerWithTask( AtomicNotificationQueueTaskStatus invokeConsumerWithTask(
Consumer&& consumer, Consumer&& consumer,
Task&& task) { Task&& task,
std::shared_ptr<RequestContext>&& rctx) {
RequestContextScopeGuard rcsg(std::move(rctx));
return consumer(std::forward<Task>(task)); return consumer(std::forward<Task>(task));
} }
template <
typename Task,
typename Consumer,
typename = std::enable_if_t<std::is_same<
invoke_result_t<Consumer, Task&&, std::shared_ptr<RequestContext>&&>,
AtomicNotificationQueueTaskStatus>::value>,
typename = void>
AtomicNotificationQueueTaskStatus invokeConsumerWithTask(
Consumer&& consumer,
Task&& task,
std::shared_ptr<RequestContext>&& rctx) {
return consumer(
std::forward<Task>(task),
std::forward<std::shared_ptr<RequestContext>>(rctx));
}
template < template <
typename Task, typename Task,
typename Consumer, typename Consumer,
typename = std::enable_if_t< typename = std::enable_if_t<
std::is_same<invoke_result_t<Consumer, Task&&>, void>::value>, std::is_same<invoke_result_t<Consumer, Task&&>, void>::value>,
typename = void,
typename = void> typename = void>
AtomicNotificationQueueTaskStatus invokeConsumerWithTask( AtomicNotificationQueueTaskStatus invokeConsumerWithTask(
Consumer&& consumer, Consumer&& consumer,
Task&& task) { Task&& task,
std::shared_ptr<RequestContext>&& rctx) {
RequestContextScopeGuard rcsg(std::move(rctx));
consumer(std::forward<Task>(task)); consumer(std::forward<Task>(task));
return AtomicNotificationQueueTaskStatus::CONSUMED; return AtomicNotificationQueueTaskStatus::CONSUMED;
} }
template <
typename Task,
typename Consumer,
typename = std::enable_if_t<std::is_same<
invoke_result_t<Consumer, Task&&, std::shared_ptr<RequestContext>&&>,
void>::value>,
typename = void,
typename = void,
typename = void>
AtomicNotificationQueueTaskStatus invokeConsumerWithTask(
Consumer&& consumer,
Task&& task,
std::shared_ptr<RequestContext>&& rctx) {
consumer(
std::forward<Task>(task),
std::forward<std::shared_ptr<RequestContext>>(rctx));
return AtomicNotificationQueueTaskStatus::CONSUMED;
}
} // namespace detail } // namespace detail
template <typename Task> template <typename Task>
...@@ -278,10 +318,11 @@ bool AtomicNotificationQueue<Task>::drive(Consumer&& consumer) { ...@@ -278,10 +318,11 @@ bool AtomicNotificationQueue<Task>::drive(Consumer&& consumer) {
std::memory_order_relaxed); std::memory_order_relaxed);
{ {
auto& curNode = queue_.front(); auto& curNode = queue_.front();
RequestContextScopeGuard rcsg(std::move(curNode.rctx));
AtomicNotificationQueueTaskStatus consumeTaskStatus = AtomicNotificationQueueTaskStatus consumeTaskStatus =
detail::invokeConsumerWithTask( detail::invokeConsumerWithTask(
std::forward<Consumer>(consumer), std::move(curNode.task)); std::forward<Consumer>(consumer),
std::move(curNode.task),
std::move(curNode.rctx));
if (consumeTaskStatus == AtomicNotificationQueueTaskStatus::CONSUMED) { if (consumeTaskStatus == AtomicNotificationQueueTaskStatus::CONSUMED) {
++numConsumed; ++numConsumed;
} }
......
...@@ -63,6 +63,16 @@ class AtomicNotificationQueue { ...@@ -63,6 +63,16 @@ class AtomicNotificationQueue {
/* /*
* Executes one round of tasks. Returns true iff tasks were run. * Executes one round of tasks. Returns true iff tasks were run.
* Can be called from consumer thread only. * Can be called from consumer thread only.
*
* `Consumer::operator()` must accept `Task&&` as its first parameter.
* It may also optionally accept `std::shared_ptr<folly::RequestContext>&&` as
* its second parameter, in which case it must manage `folly::RequestContext`
* for the consumed task.
*
* Consumer::operator() can optionally return
* AtomicNotificationQueueTaskStatus to indicate if the provided task should
* be considered consumed or discarded. Discarded tasks are not counted
* towards `maxReadAtOnce`.
*/ */
template <typename Consumer> template <typename Consumer>
bool drive(Consumer&& consumer); bool drive(Consumer&& consumer);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment