Commit d366ea8f authored by Pranjal Raihan, committed by Facebook GitHub Bot

Allow AtomicNotificationQueue consumers to discard dequeued tasks

Summary:
If `Consumer::operator()` returns `AtomicNotificationQueueTaskStatus`, the returned value indicates whether the consumer actually consumed the task or whether it should be discarded.

Enumerating all cases:

`operator()` return type is `void`:
*All tasks are considered consumed and counted towards `maxReadAtOnce`.*

`operator()` returns `AtomicNotificationQueueTaskStatus::CONSUMED`:
*Same as above.*

`operator()` returns `AtomicNotificationQueueTaskStatus::DISCARD`:
*The task does not count towards `maxReadAtOnce` and is silently discarded. The queue still destroys the task object; the consumer is responsible for any other cleanup (see the sketch below).*
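For illustration, here is a minimal consumer sketch following this contract. The `ExpiringTask` type, its `expired` field, and the `sink` vector are hypothetical stand-ins, not part of this diff; only the `AtomicNotificationQueueTaskStatus` return value is what the diff prescribes.

```cpp
#include <vector>

#include <folly/io/async/AtomicNotificationQueue.h>

// Hypothetical task type used only for this sketch.
struct ExpiringTask {
  int payload;
  bool expired;
};

struct ExpiringTaskConsumer {
  std::vector<int>& sink;

  folly::AtomicNotificationQueueTaskStatus operator()(
      ExpiringTask&& task) noexcept {
    if (task.expired) {
      // Skipped: does not count towards maxReadAtOnce. The queue still
      // destroys the task object; only extra cleanup would go here.
      return folly::AtomicNotificationQueueTaskStatus::DISCARD;
    }
    sink.push_back(task.payload);
    // Counts towards maxReadAtOnce.
    return folly::AtomicNotificationQueueTaskStatus::CONSUMED;
  }
};
```

Such a consumer plugs in as `folly::AtomicNotificationQueue<ExpiringTask, ExpiringTaskConsumer>`, which is essentially what the `DiscardDequeuedTasks` test added at the bottom of this diff exercises.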

Reviewed By: andriigrynenko

Differential Revision: D24698221

fbshipit-source-id: 0da2364d18b67468addf737b67cae573c89b7e9c
parent 9f87fdb8
@@ -324,14 +324,54 @@ void AtomicNotificationQueue<Task, Consumer>::startConsumingImpl(
}
}
+namespace detail {
+template <
+    typename Task,
+    typename Consumer,
+    typename = std::enable_if_t<std::is_same<
+        invoke_result_t<Consumer, Task&&>,
+        AtomicNotificationQueueTaskStatus>::value>>
+AtomicNotificationQueueTaskStatus invokeConsumerWithTask(
+    Consumer& consumer,
+    Task&& task) {
+  return consumer(std::forward<Task>(task));
+}
+
+template <
+    typename Task,
+    typename Consumer,
+    typename = std::enable_if_t<
+        std::is_same<invoke_result_t<Consumer, Task&&>, void>::value>,
+    typename = void>
+AtomicNotificationQueueTaskStatus invokeConsumerWithTask(
+    Consumer& consumer,
+    Task&& task) {
+  consumer(std::forward<Task>(task));
+  return AtomicNotificationQueueTaskStatus::CONSUMED;
+}
+} // namespace detail
template <typename Task, typename Consumer>
bool AtomicNotificationQueue<Task, Consumer>::drive() {
  Queue nextQueue;
+  // Since we cannot know if a task will be discarded before trying to execute
+  // it, this check may cause this function to return early. That is, even
+  // though:
+  // 1. numConsumed < maxReadAtOnce_
+  // 2. atomicQueue_ is not empty
+  // This is not an issue in practice because these tasks will be executed in
+  // the next round.
+  //
+  // In short, if `size() > maxReadAtOnce_`:
+  // * at least maxReadAtOnce_ tasks will be "processed"
+  // * at most maxReadAtOnce_ tasks will be "executed" (rest being discarded)
  if (maxReadAtOnce_ == 0 || queue_.size() < maxReadAtOnce_) {
    nextQueue = atomicQueue_.getTasks();
  }
-  int32_t i;
-  for (i = 0; maxReadAtOnce_ == 0 || i < maxReadAtOnce_; ++i) {
+  const bool wasAnyProcessed = !queue_.empty() || !nextQueue.empty();
+  for (int32_t numConsumed = 0;
+       maxReadAtOnce_ == 0 || numConsumed < maxReadAtOnce_;) {
    if (queue_.empty()) {
      queue_ = std::move(nextQueue);
      if (queue_.empty()) {
@@ -346,11 +386,15 @@ bool AtomicNotificationQueue<Task, Consumer>::drive() {
    {
      auto& curNode = queue_.front();
      RequestContextScopeGuard rcsg(std::move(curNode.rctx));
-      consumer_(std::move(curNode.task));
+      AtomicNotificationQueueTaskStatus consumeTaskStatus =
+          detail::invokeConsumerWithTask(consumer_, std::move(curNode.task));
+      if (consumeTaskStatus == AtomicNotificationQueueTaskStatus::CONSUMED) {
+        ++numConsumed;
+      }
    }
    queue_.pop();
  }
-  return i > 0;
+  return wasAnyProcessed;
}
template <typename Task, typename Consumer>
@@ -278,7 +278,7 @@ class AtomicNotificationQueue : private EventBase::LoopCallback,
  void drainFd();
  /*
-   * Executes one round of tasks. Returns true iff tasks were run.
+   * Executes one round of tasks. Returns true iff tasks were processed.
   * Can be called from consumer thread only.
   */
  bool drive();
@@ -334,6 +334,19 @@ class AtomicNotificationQueue : private EventBase::LoopCallback,
  Consumer consumer_;
};
+
+/**
+ * Consumer::operator() can optionally return AtomicNotificationQueueTaskStatus
+ * to indicate if the provided task should be considered consumed or
+ * discarded. Discarded tasks are not counted towards maxReadAtOnce_.
+ */
+enum class AtomicNotificationQueueTaskStatus : bool {
+  // The dequeued task was consumed and should be counted as such
+  CONSUMED = true,
+  // The dequeued task should be discarded and the queue not count it as
+  // consumed
+  DISCARD = false
+};
} // namespace folly
#include <folly/io/async/AtomicNotificationQueue-inl.h>
@@ -15,6 +15,7 @@
*/
#include <folly/io/async/AtomicNotificationQueue.h>
#include <folly/io/async/EventBase.h>
#include <folly/portability/GTest.h>
#include <functional>
@@ -63,3 +64,68 @@ TEST(AtomicNotificationQueueTest, TryPutMessage) {
  EXPECT_TRUE(queue.tryPutMessage(0, kMaxSize));
  EXPECT_EQ(queue.size(), 1);
}
+
+TEST(AtomicNotificationQueueTest, DiscardDequeuedTasks) {
+  struct TaskWithExpiry {
+    int datum;
+    bool isExpired;
+  };
+
+  struct Consumer {
+    explicit Consumer(std::vector<int>& data) : data(data) {}
+    AtomicNotificationQueueTaskStatus operator()(
+        TaskWithExpiry&& task) noexcept {
+      if (task.isExpired) {
+        return AtomicNotificationQueueTaskStatus::DISCARD;
+      }
+      data.push_back(task.datum);
+      return AtomicNotificationQueueTaskStatus::CONSUMED;
+    }
+    vector<int>& data;
+  };
+
+  vector<int> data;
+  Consumer consumer{data};
+
+  AtomicNotificationQueue<TaskWithExpiry, Consumer> queue{std::move(consumer)};
+  queue.setMaxReadAtOnce(10);
+
+  vector<TaskWithExpiry> tasks = {
+      {0, false},
+      {1, true},
+      {2, true},
+      {3, false},
+      {4, false},
+      {5, false},
+      {6, false},
+      {7, true},
+      {8, false},
+      {9, false},
+      {10, false},
+      {11, false},
+      {12, true},
+      {13, false},
+      {14, true},
+      {15, false},
+  };
+
+  EventBase eventBase;
+  queue.startConsuming(&eventBase);
+
+  for (auto& t : tasks) {
+    queue.putMessage(t);
+  }
+
+  eventBase.loopOnce();
+
+  vector<int> expectedMessages = {0, 3, 4, 5, 6, 8, 9, 10, 11, 13};
+  EXPECT_EQ(data.size(), expectedMessages.size());
+  for (unsigned i = 0; i < expectedMessages.size(); ++i) {
+    EXPECT_EQ(data.at(i), expectedMessages[i]);
+  }
+
+  data.clear();
+  eventBase.loopOnce();
+
+  EXPECT_EQ(data.size(), 1);
+  EXPECT_EQ(data.at(0), 15);
+}