Commit 42b60e0e authored by Srivatsan Ramesh's avatar Srivatsan Ramesh Committed by Facebook GitHub Bot

Extending putMessage() to take multiple arguments

Summary: putMessage() function takes variable number of arguments required to construct the queue node.

Differential Revision: D26785602

fbshipit-source-id: 0a9bc711bf75a83397e6de3764c41d316ef64d89
parent e7c48a6e
...@@ -100,9 +100,9 @@ AtomicNotificationQueue<Task>::AtomicQueue::~AtomicQueue() { ...@@ -100,9 +100,9 @@ AtomicNotificationQueue<Task>::AtomicQueue::~AtomicQueue() {
} }
template <typename Task> template <typename Task>
template <typename T> template <typename... Args>
bool AtomicNotificationQueue<Task>::AtomicQueue::push(T&& value) { bool AtomicNotificationQueue<Task>::AtomicQueue::push(Args&&... args) {
std::unique_ptr<Node> node(new Node(std::forward<T>(value))); std::unique_ptr<Node> node(new Node(std::forward<Args>(args)...));
auto head = head_.load(std::memory_order_relaxed); auto head = head_.load(std::memory_order_relaxed);
while (true) { while (true) {
node->next = node->next =
...@@ -191,16 +191,15 @@ bool AtomicNotificationQueue<Task>::arm() { ...@@ -191,16 +191,15 @@ bool AtomicNotificationQueue<Task>::arm() {
} }
template <typename Task> template <typename Task>
template <typename T> template <typename... Args>
bool AtomicNotificationQueue<Task>::push(T&& task) { bool AtomicNotificationQueue<Task>::push(Args&&... args) {
pushCount_.fetch_add(1, std::memory_order_relaxed); pushCount_.fetch_add(1, std::memory_order_relaxed);
return atomicQueue_.push(std::forward<T>(task)); return atomicQueue_.push(std::forward<Args>(args)...);
} }
template <typename Task> template <typename Task>
template <typename T>
typename AtomicNotificationQueue<Task>::TryPushResult typename AtomicNotificationQueue<Task>::TryPushResult
AtomicNotificationQueue<Task>::tryPush(T&& task, uint32_t maxSize) { AtomicNotificationQueue<Task>::tryPush(Task&& task, uint32_t maxSize) {
auto pushed = pushCount_.load(std::memory_order_relaxed); auto pushed = pushCount_.load(std::memory_order_relaxed);
while (true) { while (true) {
auto executed = taskExecuteCount_.load(std::memory_order_relaxed); auto executed = taskExecuteCount_.load(std::memory_order_relaxed);
...@@ -215,8 +214,7 @@ AtomicNotificationQueue<Task>::tryPush(T&& task, uint32_t maxSize) { ...@@ -215,8 +214,7 @@ AtomicNotificationQueue<Task>::tryPush(T&& task, uint32_t maxSize) {
break; break;
} }
} }
return atomicQueue_.push(std::forward<T>(task)) return atomicQueue_.push(std::move(task)) ? TryPushResult::SUCCESS_AND_ARMED
? TryPushResult::SUCCESS_AND_ARMED
: TryPushResult::SUCCESS; : TryPushResult::SUCCESS;
} }
......
...@@ -84,8 +84,8 @@ class AtomicNotificationQueue { ...@@ -84,8 +84,8 @@ class AtomicNotificationQueue {
* Returns true iff the queue was armed, in which case * Returns true iff the queue was armed, in which case
* producers are expected to notify consumer thread. * producers are expected to notify consumer thread.
*/ */
template <typename T> template <typename... Args>
bool push(T&& task); bool push(Args&&... args);
/* /*
* Attempts adding a task into the queue. * Attempts adding a task into the queue.
...@@ -94,8 +94,7 @@ class AtomicNotificationQueue { ...@@ -94,8 +94,7 @@ class AtomicNotificationQueue {
* consumer iff SUCCESS_AND_ARMED is returned. * consumer iff SUCCESS_AND_ARMED is returned.
*/ */
enum class TryPushResult { FAILED_LIMIT_REACHED, SUCCESS, SUCCESS_AND_ARMED }; enum class TryPushResult { FAILED_LIMIT_REACHED, SUCCESS, SUCCESS_AND_ARMED };
template <typename T> TryPushResult tryPush(Task&& task, uint32_t maxSize);
TryPushResult tryPush(T&& task, uint32_t maxSize);
private: private:
struct Node { struct Node {
...@@ -105,8 +104,8 @@ class AtomicNotificationQueue { ...@@ -105,8 +104,8 @@ class AtomicNotificationQueue {
private: private:
friend class AtomicNotificationQueue; friend class AtomicNotificationQueue;
template <typename T> template <typename... Args>
explicit Node(T&& t) : task(std::forward<T>(t)) {} explicit Node(Args&&... args) : task(std::forward<Args>(args)...) {}
Node* next{}; Node* next{};
}; };
...@@ -187,8 +186,8 @@ class AtomicNotificationQueue { ...@@ -187,8 +186,8 @@ class AtomicNotificationQueue {
* Pushes a task into the queue. Returns true iff the queue was armed. * Pushes a task into the queue. Returns true iff the queue was armed.
* Can be called from any thread. * Can be called from any thread.
*/ */
template <typename T> template <typename... Args>
bool push(T&& value); bool push(Args&&... args);
/* /*
* Returns true if the queue has tasks. * Returns true if the queue has tasks.
......
...@@ -123,18 +123,18 @@ void EventBaseAtomicNotificationQueue<Task, Consumer>::drain() { ...@@ -123,18 +123,18 @@ void EventBaseAtomicNotificationQueue<Task, Consumer>::drain() {
} }
template <typename Task, typename Consumer> template <typename Task, typename Consumer>
template <typename T> template <typename... Args>
void EventBaseAtomicNotificationQueue<Task, Consumer>::putMessage(T&& task) { void EventBaseAtomicNotificationQueue<Task, Consumer>::putMessage(
if (notificationQueue_.push(std::forward<T>(task))) { Args&&... args) {
if (notificationQueue_.push(std::forward<Args>(args)...)) {
notifyFd(); notifyFd();
} }
} }
template <typename Task, typename Consumer> template <typename Task, typename Consumer>
template <typename T>
bool EventBaseAtomicNotificationQueue<Task, Consumer>::tryPutMessage( bool EventBaseAtomicNotificationQueue<Task, Consumer>::tryPutMessage(
T&& task, uint32_t maxSize) { Task&& task, uint32_t maxSize) {
auto result = notificationQueue_.tryPush(std::forward<T>(task), maxSize); auto result = notificationQueue_.tryPush(std::forward<Task>(task), maxSize);
if (result == if (result ==
AtomicNotificationQueue<Task>::TryPushResult::SUCCESS_AND_ARMED) { AtomicNotificationQueue<Task>::TryPushResult::SUCCESS_AND_ARMED) {
notifyFd(); notifyFd();
......
...@@ -82,16 +82,15 @@ class EventBaseAtomicNotificationQueue : private EventBase::LoopCallback, ...@@ -82,16 +82,15 @@ class EventBaseAtomicNotificationQueue : private EventBase::LoopCallback,
* Adds a task into the queue. * Adds a task into the queue.
* Can be called from any thread. * Can be called from any thread.
*/ */
template <typename T> template <typename... Args>
void putMessage(T&& task); void putMessage(Args&&... task);
/** /**
* Adds a task into the queue unless the max queue size is reached. * Adds a task into the queue unless the max queue size is reached.
* Returns true iff the task was queued. * Returns true iff the task was queued.
* Can be called from any thread. * Can be called from any thread.
*/ */
template <typename T> FOLLY_NODISCARD bool tryPutMessage(Task&& task, uint32_t maxSize);
FOLLY_NODISCARD bool tryPutMessage(T&& task, uint32_t maxSize);
/* /*
* Detaches the queue from an EventBase. * Detaches the queue from an EventBase.
......
...@@ -131,3 +131,49 @@ TEST(AtomicNotificationQueueTest, DiscardDequeuedTasks) { ...@@ -131,3 +131,49 @@ TEST(AtomicNotificationQueueTest, DiscardDequeuedTasks) {
EXPECT_EQ(data.size(), 1); EXPECT_EQ(data.size(), 1);
EXPECT_EQ(data.at(0), 15); EXPECT_EQ(data.at(0), 15);
} }
TEST(AtomicNotificationQueueTest, PutMessage) {
struct Data {
int datum;
bool isExpired;
explicit Data(int datum, bool isExpired)
: datum(datum), isExpired(isExpired) {}
bool operator==(const Data& data) const {
return datum == data.datum && isExpired == data.isExpired;
}
};
struct Consumer {
explicit Consumer(vector<Data>& data) : data(data) {}
void operator()(Data&& task) noexcept { data.push_back(task); }
vector<Data>& data;
};
vector<Data> expected =
{Data(10, false),
Data(20, true),
Data(-8, true),
Data(0, false)},
actual;
Consumer consumer{actual};
EventBaseAtomicNotificationQueue<Data, decltype(consumer)> queue{
std::move(consumer)};
queue.setMaxReadAtOnce(0);
EventBase eventBase;
queue.startConsuming(&eventBase);
for (auto& t : expected) {
queue.putMessage(t.datum, t.isExpired);
}
eventBase.loopOnce();
EXPECT_EQ(expected.size(), actual.size());
for (unsigned i = 0; i < expected.size(); ++i) {
EXPECT_EQ(expected[i], actual[i]);
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment