Commit dcddc5c0 authored by Pranjal Raihan's avatar Pranjal Raihan Committed by Facebook GitHub Bot

Add ability to bound AtomicNotificationQueue size

Summary:
Allowing a bounded `AtomicNotificationQueue` is helpful to migrate `AsyncSocketServer` over from `NotificationQueue`.

`AtomicNotificationQueue::tryPutMessage` is the same as `putMessage` but also accepts a `maxSize`. If the queue size is >= `maxSize`, the message will not be queued. The user of the queue is responsible maintaining state regarding the max size of the queue.

Reviewed By: andriigrynenko

Differential Revision: D24698219

fbshipit-source-id: 1e3d1dc0073c65cd1803629f6a4a9b015e111b08
parent e5bb59b3
...@@ -104,8 +104,6 @@ AtomicNotificationQueue<Task, Consumer>::AtomicQueue::~AtomicQueue() { ...@@ -104,8 +104,6 @@ AtomicNotificationQueue<Task, Consumer>::AtomicQueue::~AtomicQueue() {
template <typename Task, typename Consumer> template <typename Task, typename Consumer>
template <typename T> template <typename T>
bool AtomicNotificationQueue<Task, Consumer>::AtomicQueue::push(T&& value) { bool AtomicNotificationQueue<Task, Consumer>::AtomicQueue::push(T&& value) {
pushCount_.fetch_add(1, std::memory_order_relaxed);
std::unique_ptr<typename Queue::Node> node( std::unique_ptr<typename Queue::Node> node(
new typename Queue::Node(std::forward<T>(value))); new typename Queue::Node(std::forward<T>(value)));
auto head = head_.load(std::memory_order_relaxed); auto head = head_.load(std::memory_order_relaxed);
...@@ -239,7 +237,7 @@ void AtomicNotificationQueue<Task, Consumer>::setMaxReadAtOnce( ...@@ -239,7 +237,7 @@ void AtomicNotificationQueue<Task, Consumer>::setMaxReadAtOnce(
template <typename Task, typename Consumer> template <typename Task, typename Consumer>
size_t AtomicNotificationQueue<Task, Consumer>::size() const { size_t AtomicNotificationQueue<Task, Consumer>::size() const {
auto queueSize = atomicQueue_.getPushCount() - auto queueSize = pushCount_.load(std::memory_order_relaxed) -
taskExecuteCount_.load(std::memory_order_relaxed); taskExecuteCount_.load(std::memory_order_relaxed);
return queueSize >= 0 ? queueSize : 0; return queueSize >= 0 ? queueSize : 0;
} }
...@@ -252,6 +250,36 @@ bool AtomicNotificationQueue<Task, Consumer>::empty() const { ...@@ -252,6 +250,36 @@ bool AtomicNotificationQueue<Task, Consumer>::empty() const {
template <typename Task, typename Consumer> template <typename Task, typename Consumer>
template <typename T> template <typename T>
void AtomicNotificationQueue<Task, Consumer>::putMessage(T&& task) { void AtomicNotificationQueue<Task, Consumer>::putMessage(T&& task) {
pushCount_.fetch_add(1, std::memory_order_relaxed);
putMessageImpl(std::forward<T>(task));
}
template <typename Task, typename Consumer>
template <typename T>
bool AtomicNotificationQueue<Task, Consumer>::tryPutMessage(
T&& task,
uint32_t maxSize) {
auto pushed = pushCount_.load(std::memory_order_relaxed);
while (true) {
auto executed = taskExecuteCount_.load(std::memory_order_relaxed);
if (pushed - executed >= maxSize) {
return false;
}
if (pushCount_.compare_exchange_weak(
pushed,
pushed + 1,
std::memory_order_relaxed,
std::memory_order_relaxed)) {
break;
}
}
putMessageImpl(std::forward<T>(task));
return true;
}
template <typename Task, typename Consumer>
template <typename T>
void AtomicNotificationQueue<Task, Consumer>::putMessageImpl(T&& task) {
if (atomicQueue_.push(std::forward<T>(task))) { if (atomicQueue_.push(std::forward<T>(task))) {
notifyFd(); notifyFd();
} }
......
...@@ -27,6 +27,8 @@ ...@@ -27,6 +27,8 @@
#include <folly/io/async/EventFDWrapper.h> #include <folly/io/async/EventFDWrapper.h>
#endif #endif
#include <type_traits>
namespace folly { namespace folly {
/** /**
...@@ -171,25 +173,21 @@ class AtomicNotificationQueue : private EventBase::LoopCallback, ...@@ -171,25 +173,21 @@ class AtomicNotificationQueue : private EventBase::LoopCallback,
return successfulArmCount_ - consumerDisarmCount_; return successfulArmCount_ - consumerDisarmCount_;
} }
/*
* Returns how many times push was called.
* Can be called from any thread.
*/
ssize_t getPushCount() const {
return pushCount_.load(std::memory_order_relaxed);
}
private: private:
alignas( alignas(
folly::cacheline_align_v) std::atomic<typename Queue::Node*> head_{}; folly::cacheline_align_v) std::atomic<typename Queue::Node*> head_{};
std::atomic<ssize_t> pushCount_{0};
alignas(folly::cacheline_align_v) ssize_t successfulArmCount_{0}; alignas(folly::cacheline_align_v) ssize_t successfulArmCount_{0};
ssize_t consumerDisarmCount_{0}; ssize_t consumerDisarmCount_{0};
static constexpr intptr_t kQueueArmedTag = 1; static constexpr intptr_t kQueueArmedTag = 1;
}; };
public: public:
explicit AtomicNotificationQueue(Consumer&& consumer = Consumer()); explicit AtomicNotificationQueue(Consumer&& consumer);
template <
typename C = Consumer,
typename = std::enable_if_t<std::is_default_constructible<C>::value>>
AtomicNotificationQueue() : AtomicNotificationQueue(Consumer()) {}
~AtomicNotificationQueue() override; ~AtomicNotificationQueue() override;
...@@ -218,6 +216,14 @@ class AtomicNotificationQueue : private EventBase::LoopCallback, ...@@ -218,6 +216,14 @@ class AtomicNotificationQueue : private EventBase::LoopCallback,
template <typename T> template <typename T>
void putMessage(T&& task); void putMessage(T&& task);
/**
* Adds a task into the queue unless the max queue size is reached.
* Returns true iff the task was queued.
* Can be called from any thread.
*/
template <typename T>
FOLLY_NODISCARD bool tryPutMessage(T&& task, uint32_t maxSize);
/* /*
* Detaches the queue from an EventBase. * Detaches the queue from an EventBase.
* Can be called from consumer thread only. * Can be called from consumer thread only.
...@@ -255,6 +261,12 @@ class AtomicNotificationQueue : private EventBase::LoopCallback, ...@@ -255,6 +261,12 @@ class AtomicNotificationQueue : private EventBase::LoopCallback,
void execute(); void execute();
private: private:
/*
* Adds a task to the queue without incrementing the push count.
*/
template <typename T>
void putMessageImpl(T&& task);
/* /*
* Write into the signal fd to wake up the consumer thread. * Write into the signal fd to wake up the consumer thread.
*/ */
...@@ -302,6 +314,7 @@ class AtomicNotificationQueue : private EventBase::LoopCallback, ...@@ -302,6 +314,7 @@ class AtomicNotificationQueue : private EventBase::LoopCallback,
[[noreturn]] FOLLY_NOINLINE void checkPidFail() const; [[noreturn]] FOLLY_NOINLINE void checkPidFail() const;
alignas(folly::cacheline_align_v) std::atomic<ssize_t> pushCount_{0};
AtomicQueue atomicQueue_; AtomicQueue atomicQueue_;
Queue queue_; Queue queue_;
std::atomic<ssize_t> taskExecuteCount_{0}; std::atomic<ssize_t> taskExecuteCount_{0};
......
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/io/async/AtomicNotificationQueue.h>
#include <folly/portability/GTest.h>
#include <functional>
#include <utility>
#include <vector>
using namespace folly;
using namespace std;
template <typename Task>
struct AtomicNotificationQueueConsumer {
explicit AtomicNotificationQueueConsumer(vector<Task>& tasks)
: tasks(tasks) {}
void operator()(Task&& value) noexcept {
tasks.push_back(value);
if (fn) {
fn(std::move(value));
}
}
function<void(Task&&)> fn;
vector<Task>& tasks;
};
TEST(AtomicNotificationQueueTest, TryPutMessage) {
vector<int> data;
AtomicNotificationQueueConsumer<int> consumer{data};
AtomicNotificationQueue<int, decltype(consumer)> queue{std::move(consumer)};
constexpr uint32_t kMaxSize = 10;
for (auto i = 1; i <= 9; ++i) {
queue.putMessage(std::move(i));
}
EXPECT_TRUE(queue.tryPutMessage(10, kMaxSize));
EXPECT_EQ(queue.size(), 10);
EXPECT_FALSE(queue.tryPutMessage(11, kMaxSize));
EXPECT_EQ(queue.size(), 10);
queue.putMessage(11);
EXPECT_EQ(queue.size(), 11);
EXPECT_FALSE(queue.tryPutMessage(12, kMaxSize));
queue.drain();
EXPECT_TRUE(queue.tryPutMessage(0, kMaxSize));
EXPECT_EQ(queue.size(), 1);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment