Commit 3a1cb2bc authored by Misha Shneerson's avatar Misha Shneerson Committed by Facebook GitHub Bot

split AtomicNotificationQueue into pure queue and event base driven queue

Summary:
EventBaseAtomicNotificationQueue is now what used to be just AtomicNotificationQueue.
In the future, this allows reusing AtomicNotificationQueue with other types of executors.

(Note: this ignores all push blocking failures!)

Reviewed By: andriigrynenko

Differential Revision: D24878180

fbshipit-source-id: d719e31c7194cb2dd6d57a103814581b802a9250
parent f8c8b74f
......@@ -21,9 +21,9 @@
#include <folly/io/ShutdownSocketSet.h>
#include <folly/io/async/AsyncSocketBase.h>
#include <folly/io/async/AsyncTimeout.h>
#include <folly/io/async/AtomicNotificationQueue.h>
#include <folly/io/async/DelayedDestruction.h>
#include <folly/io/async/EventBase.h>
#include <folly/io/async/EventBaseAtomicNotificationQueue.h>
#include <folly/io/async/EventHandler.h>
#include <folly/net/NetOps.h>
#include <folly/net/NetworkSocket.h>
......@@ -770,7 +770,7 @@ class AsyncServerSocket : public DelayedDestruction, public AsyncSocketBase {
};
public:
using Queue = AtomicNotificationQueue<QueueMessage, Consumer>;
using Queue = EventBaseAtomicNotificationQueue<QueueMessage, Consumer>;
explicit RemoteAcceptor(
AcceptCallback* callback,
......
......@@ -16,34 +16,77 @@
#pragma once
#include <folly/io/async/EventBase.h>
#include <folly/io/async/EventHandler.h>
#include <folly/portability/Fcntl.h>
#include <folly/portability/Sockets.h>
#include <folly/portability/Unistd.h>
#if defined(__linux__) && !defined(__ANDROID__)
#define FOLLY_HAVE_EVENTFD
#include <folly/io/async/EventFDWrapper.h>
#endif
#include <folly/io/async/Request.h>
#include <folly/lang/Align.h>
#include <type_traits>
namespace folly {
/**
* A producer-consumer queue for passing tasks to EventBase thread.
* A producer-consumer queue for passing tasks to consumer thread.
*
* Tasks can be added to the queue from any thread. A single EventBase
* thread can be listening to the queue. Tasks are processed in FIFO order.
* Users of this class are expected to implement functionality to wakeup
* consumer thread.
*/
template <typename Task, typename Consumer>
class AtomicNotificationQueue : private EventBase::LoopCallback,
private EventHandler {
static_assert(
noexcept(std::declval<Consumer>()(std::declval<Task&&>())),
"Consumer::operator()(Task&&) should be noexcept.");
class AtomicQueue;
template <typename Task>
class AtomicNotificationQueue {
public:
explicit AtomicNotificationQueue();
~AtomicNotificationQueue();
/*
* Set the maximum number of tasks processed in a single round.
* Can be called from consumer thread only.
*/
void setMaxReadAtOnce(uint32_t maxAtOnce);
/*
* Returns the number of tasks in the queue.
* Can be called from any thread.
*/
size_t size() const;
/*
* Checks if the queue is empty.
* Can be called from consumer thread only.
*/
bool empty() const;
/*
* Tries to arm the queue.
* 1) If the queue was empty: the queue becomes armed and true is returned.
* 2) Otherwise returns false.
* Can be called from consumer thread only.
*/
bool arm();
/*
* Executes one round of tasks. Returns true iff tasks were run.
* Can be called from consumer thread only.
*/
template <typename Consumer>
bool drive(Consumer&& consumer);
/*
* Adds a task into the queue.
* Can be called from any thread.
* Returns true iff the queue was armed, in which case
* producers are expected to notify consumer thread.
*/
template <typename T>
bool push(T&& task);
/*
* Attempts adding a task into the queue.
* Can be called from any thread.
* Similarly to push(), producers are expected to notify
* consumer iff SUCCESS_AND_ARMED is returned.
*/
enum class TryPushResult { FAILED_LIMIT_REACHED, SUCCESS, SUCCESS_AND_ARMED };
template <typename T>
TryPushResult tryPush(T&& task, uint32_t maxSize);
private:
struct Node {
Task task;
std::shared_ptr<RequestContext> rctx{RequestContext::saveContext()};
......@@ -57,6 +100,7 @@ class AtomicNotificationQueue : private EventBase::LoopCallback,
Node* next{};
};
class AtomicQueue;
class Queue {
public:
Queue() {}
......@@ -65,13 +109,9 @@ class AtomicNotificationQueue : private EventBase::LoopCallback,
~Queue();
bool empty() const;
ssize_t size() const;
Node& front();
void pop();
void clear();
private:
......@@ -120,7 +160,7 @@ class AtomicNotificationQueue : private EventBase::LoopCallback,
* getTasks() can be called in any state. It always transitions the queue into
* Empty.
*
* arm() can be can't be called if the queue is already in Armed state:
* arm() can't be called if the queue is already in Armed state:
* When Empty - arm() returns an empty queue and transitions into Armed
* When Non-Empty: equivalent to getTasks()
*
......@@ -161,175 +201,17 @@ class AtomicNotificationQueue : private EventBase::LoopCallback,
*/
Queue arm();
/*
* Returns how many armed push happened.
* Can be called from consumer thread only. And only when queue state is
* Empty.
*/
ssize_t getArmedPushCount() const {
DCHECK(!head_) << "AtomicQueue state has to be Empty";
DCHECK(successfulArmCount_ >= consumerDisarmCount_);
return successfulArmCount_ - consumerDisarmCount_;
}
private:
alignas(folly::cacheline_align_v) std::atomic<Node*> head_{};
alignas(folly::cacheline_align_v) ssize_t successfulArmCount_{0};
ssize_t consumerDisarmCount_{0};
static constexpr intptr_t kQueueArmedTag = 1;
};
public:
explicit AtomicNotificationQueue(Consumer&& consumer);
template <
typename C = Consumer,
typename = std::enable_if_t<std::is_default_constructible<C>::value>>
AtomicNotificationQueue() : AtomicNotificationQueue(Consumer()) {}
~AtomicNotificationQueue() override;
/*
* Set the maximum number of tasks processed in a single round.
* Can be called from consumer thread only.
*/
void setMaxReadAtOnce(uint32_t maxAtOnce);
/*
* Returns the number of tasks in the queue.
* Can be called from any thread.
*/
size_t size() const;
/*
* Checks if the queue is empty.
* Can be called from consumer thread only.
*/
bool empty() const;
/*
* Adds a task into the queue.
* Can be called from any thread.
*/
template <typename T>
void putMessage(T&& task);
/**
* Adds a task into the queue unless the max queue size is reached.
* Returns true iff the task was queued.
* Can be called from any thread.
*/
template <typename T>
FOLLY_NODISCARD bool tryPutMessage(T&& task, uint32_t maxSize);
/*
* Detaches the queue from an EventBase.
* Can be called from consumer thread only.
*/
void stopConsuming();
/*
* Attaches the queue to an EventBase.
* Can be called from consumer thread only.
*/
void startConsuming(EventBase* evb);
/*
* Attaches the queue to an EventBase.
* Can be called from consumer thread only.
*
* Unlike startConsuming, startConsumingInternal registers this queue as
* an internal event. This means that this event may be skipped if
* EventBase doesn't have any other registered events. This generally should
* only be used for queues managed by an EventBase itself.
*/
void startConsumingInternal(EventBase* evb);
/*
* Executes all tasks until the queue is empty.
* Can be called from consumer thread only.
*/
void drain();
/*
* Executes one round of tasks. Re-activates the event if more tasks are
* available.
* Can be called from consumer thread only.
*/
void execute();
private:
/*
* Adds a task to the queue without incrementing the push count.
*/
template <typename T>
void putMessageImpl(T&& task);
/*
* Write into the signal fd to wake up the consumer thread.
*/
void notifyFd();
/*
* Read all messages from the signal fd.
*/
void drainFd();
/*
* Executes one round of tasks. Returns true iff tasks were processed.
* Can be called from consumer thread only.
*/
bool drive();
/*
* Either arm the queue or reactivate the EventBase event.
* This has to be a loop callback because the event can't be activated from
* within the event callback. It also allows delayed re-arming the queue.
*/
void runLoopCallback() noexcept override;
void startConsumingImpl(EventBase* evb, bool internal);
void handlerReady(uint16_t) noexcept override;
void activateEvent();
/**
* Check that the AtomicNotificationQueue is being used from the correct
* process.
*
* If you create a AtomicNotificationQueue in one process, then fork, and try
* to send messages to the queue from the child process, you're going to have
* a bad time. Unfortunately users have (accidentally) run into this.
*
* Because we use an eventfd/pipe, the child process can actually signal the
* parent process that an event is ready. However, it can't put anything on
* the parent's queue, so the parent wakes up and finds an empty queue. This
* check ensures that we catch the problem in the misbehaving child process
* code, and crash before signalling the parent process.
*/
void checkPid() const;
[[noreturn]] FOLLY_NOINLINE void checkPidFail() const;
alignas(folly::cacheline_align_v) std::atomic<ssize_t> pushCount_{0};
AtomicQueue atomicQueue_;
Queue queue_;
std::atomic<ssize_t> taskExecuteCount_{0};
int32_t maxReadAtOnce_{10};
int eventfd_{-1};
int pipeFds_[2]{-1, -1}; // to fallback to on older/non-linux systems
/*
* If event is registered with the EventBase, this describes whether
* edge-triggered flag was set for it. For edge-triggered events we don't
* need to drain the fd to deactivate them.
*/
bool edgeTriggeredSet_{false};
EventBase* evb_{nullptr};
ssize_t writesObserved_{0};
ssize_t writesLocal_{0};
const pid_t pid_;
Consumer consumer_;
};
/**
......
......@@ -29,7 +29,7 @@
#include <folly/ExceptionString.h>
#include <folly/Memory.h>
#include <folly/String.h>
#include <folly/io/async/AtomicNotificationQueue.h>
#include <folly/io/async/EventBaseAtomicNotificationQueue.h>
#include <folly/io/async/EventBaseBackendBase.h>
#include <folly/io/async/VirtualEventBase.h>
#include <folly/portability/Unistd.h>
......@@ -689,7 +689,8 @@ bool EventBase::runLoopCallbacks() {
void EventBase::initNotificationQueue() {
// Infinite size queue
queue_ = std::make_unique<AtomicNotificationQueue<Func, FuncRunner>>();
queue_ =
std::make_unique<EventBaseAtomicNotificationQueue<Func, FuncRunner>>();
// Mark this as an internal event, so event_base_loop() will return if
// there are no other events besides this one installed.
......
......@@ -56,7 +56,7 @@ class EventBaseBackendBase;
using Cob = Func; // defined in folly/Executor.h
template <typename Task, typename Consumer>
class AtomicNotificationQueue;
class EventBaseAtomicNotificationQueue;
template <typename MessageT>
class NotificationQueue;
......@@ -903,7 +903,7 @@ class EventBase : public TimeoutManager,
// A notification queue for runInEventBaseThread() to use
// to send function requests to the EventBase thread.
std::unique_ptr<AtomicNotificationQueue<Func, FuncRunner>> queue_;
std::unique_ptr<EventBaseAtomicNotificationQueue<Func, FuncRunner>> queue_;
ssize_t loopKeepAliveCount_{0};
std::atomic<ssize_t> loopKeepAliveCountAtomic_{0};
bool loopKeepAliveActive_{false};
......
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/io/async/EventBaseAtomicNotificationQueue.h>
#include <folly/FileUtil.h>
#include <folly/system/Pid.h>
namespace folly {
template <typename Task, typename Consumer>
EventBaseAtomicNotificationQueue<Task, Consumer>::
EventBaseAtomicNotificationQueue(Consumer&& consumer)
: pid_(get_cached_pid()),
notificationQueue_(),
consumer_(std::move(consumer)) {
#ifdef FOLLY_HAVE_EVENTFD
eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (eventfd_ == -1) {
if (errno == ENOSYS || errno == EINVAL) {
// eventfd not availalble
LOG(ERROR) << "failed to create eventfd for AtomicNotificationQueue: "
<< errno << ", falling back to pipe mode (is your kernel "
<< "> 2.6.30?)";
} else {
// some other error
folly::throwSystemError(
"Failed to create eventfd for AtomicNotificationQueue", errno);
}
}
#endif
if (eventfd_ == -1) {
if (pipe(pipeFds_)) {
folly::throwSystemError(
"Failed to create pipe for AtomicNotificationQueue", errno);
}
try {
// put both ends of the pipe into non-blocking mode
if (fcntl(pipeFds_[0], F_SETFL, O_RDONLY | O_NONBLOCK) != 0) {
folly::throwSystemError(
"failed to put AtomicNotificationQueue pipe read "
"endpoint into non-blocking mode",
errno);
}
if (fcntl(pipeFds_[1], F_SETFL, O_WRONLY | O_NONBLOCK) != 0) {
folly::throwSystemError(
"failed to put AtomicNotificationQueue pipe write "
"endpoint into non-blocking mode",
errno);
}
} catch (...) {
::close(pipeFds_[0]);
::close(pipeFds_[1]);
throw;
}
}
}
template <typename Task, typename Consumer>
EventBaseAtomicNotificationQueue<Task, Consumer>::
~EventBaseAtomicNotificationQueue() {
// discard pending tasks and disarm the queue
while (drive(
[](Task&&) { return AtomicNotificationQueueTaskStatus::DISCARD; })) {
}
// Don't drain fd in the child process.
if (pid_ == get_cached_pid()) {
// Wait till we observe all the writes before closing fds
while (writesObserved_ <
(successfulArmCount_ - consumerDisarmedCount_) + writesLocal_) {
drainFd();
}
DCHECK(
writesObserved_ ==
(successfulArmCount_ - consumerDisarmedCount_) + writesLocal_);
}
if (eventfd_ >= 0) {
::close(eventfd_);
eventfd_ = -1;
}
if (pipeFds_[0] >= 0) {
::close(pipeFds_[0]);
pipeFds_[0] = -1;
}
if (pipeFds_[1] >= 0) {
::close(pipeFds_[1]);
pipeFds_[1] = -1;
}
}
template <typename Task, typename Consumer>
void EventBaseAtomicNotificationQueue<Task, Consumer>::setMaxReadAtOnce(
uint32_t maxAtOnce) {
notificationQueue_.setMaxReadAtOnce(maxAtOnce);
}
template <typename Task, typename Consumer>
size_t EventBaseAtomicNotificationQueue<Task, Consumer>::size() const {
return notificationQueue_.size();
}
template <typename Task, typename Consumer>
bool EventBaseAtomicNotificationQueue<Task, Consumer>::empty() const {
return notificationQueue_.empty();
}
template <typename Task, typename Consumer>
void EventBaseAtomicNotificationQueue<Task, Consumer>::drain() {
while (drive(consumer_)) {
}
}
template <typename Task, typename Consumer>
template <typename T>
void EventBaseAtomicNotificationQueue<Task, Consumer>::putMessage(T&& task) {
if (notificationQueue_.push(std::forward<T>(task))) {
notifyFd();
}
}
template <typename Task, typename Consumer>
template <typename T>
bool EventBaseAtomicNotificationQueue<Task, Consumer>::tryPutMessage(
T&& task,
uint32_t maxSize) {
auto result = notificationQueue_.tryPush(std::forward<T>(task), maxSize);
if (result ==
AtomicNotificationQueue<Task>::TryPushResult::SUCCESS_AND_ARMED) {
notifyFd();
}
return result !=
AtomicNotificationQueue<Task>::TryPushResult::FAILED_LIMIT_REACHED;
}
template <typename Task, typename Consumer>
void EventBaseAtomicNotificationQueue<Task, Consumer>::stopConsuming() {
evb_ = nullptr;
cancelLoopCallback();
unregisterHandler();
detachEventBase();
}
template <typename Task, typename Consumer>
void EventBaseAtomicNotificationQueue<Task, Consumer>::startConsuming(
EventBase* evb) {
startConsumingImpl(evb, false);
}
template <typename Task, typename Consumer>
void EventBaseAtomicNotificationQueue<Task, Consumer>::startConsumingInternal(
EventBase* evb) {
startConsumingImpl(evb, true);
}
template <typename Task, typename Consumer>
void EventBaseAtomicNotificationQueue<Task, Consumer>::startConsumingImpl(
EventBase* evb,
bool internal) {
evb_ = evb;
initHandler(
evb_,
folly::NetworkSocket::fromFd(eventfd_ >= 0 ? eventfd_ : pipeFds_[0]));
auto registerHandlerResult = internal
? registerInternalHandler(READ | PERSIST)
: registerHandler(READ | PERSIST);
if (registerHandlerResult) {
edgeTriggeredSet_ = eventfd_ >= 0 && setEdgeTriggered();
++writesLocal_;
notifyFd();
} else {
edgeTriggeredSet_ = false;
}
}
template <typename Task, typename Consumer>
void EventBaseAtomicNotificationQueue<Task, Consumer>::notifyFd() {
checkPid();
ssize_t bytes_written = 0;
size_t bytes_expected = 0;
do {
if (eventfd_ >= 0) {
// eventfd(2) dictates that we must write a 64-bit integer
uint64_t signal = 1;
bytes_expected = sizeof(signal);
bytes_written = ::write(eventfd_, &signal, bytes_expected);
} else {
uint8_t signal = 1;
bytes_expected = sizeof(signal);
bytes_written = ::write(pipeFds_[1], &signal, bytes_expected);
}
} while (bytes_written == -1 && errno == EINTR);
if (bytes_written != ssize_t(bytes_expected)) {
folly::throwSystemError(
"failed to signal AtomicNotificationQueue after "
"write",
errno);
}
}
template <typename Task, typename Consumer>
void EventBaseAtomicNotificationQueue<Task, Consumer>::drainFd() {
checkPid();
uint64_t message = 0;
if (eventfd_ >= 0) {
auto result = readNoInt(eventfd_, &message, sizeof(message));
CHECK(result == sizeof(message) || errno == EAGAIN);
writesObserved_ += message;
} else {
ssize_t result;
while ((result = readNoInt(pipeFds_[0], &message, sizeof(message))) != -1) {
writesObserved_ += result;
}
CHECK(result == -1 && errno == EAGAIN);
}
}
template <typename Task, typename Consumer>
void EventBaseAtomicNotificationQueue<Task, Consumer>::
runLoopCallback() noexcept {
DCHECK(!armed_);
if (!notificationQueue_.arm()) {
activateEvent();
} else {
armed_ = true;
successfulArmCount_++;
}
}
template <typename Task, typename Consumer>
template <typename T>
bool EventBaseAtomicNotificationQueue<Task, Consumer>::drive(T&& consumer) {
auto wasEmpty = !notificationQueue_.drive(std::forward<T>(consumer));
if (wasEmpty && armed_) {
consumerDisarmedCount_++;
}
armed_ = false;
return !wasEmpty;
}
template <typename Task, typename Consumer>
void EventBaseAtomicNotificationQueue<Task, Consumer>::handlerReady(
uint16_t) noexcept {
execute();
}
template <typename Task, typename Consumer>
void EventBaseAtomicNotificationQueue<Task, Consumer>::execute() {
if (!edgeTriggeredSet_) {
drainFd();
}
drive(consumer_);
evb_->runInLoop(this, false, nullptr);
}
template <typename Task, typename Consumer>
void EventBaseAtomicNotificationQueue<Task, Consumer>::activateEvent() {
if (!EventHandler::activateEvent(0)) {
// Fallback for EventBase backends that don't support activateEvent
++writesLocal_;
notifyFd();
}
}
template <typename Task, typename Consumer>
void EventBaseAtomicNotificationQueue<Task, Consumer>::checkPid() const {
if (FOLLY_UNLIKELY(pid_ != get_cached_pid())) {
checkPidFail();
}
}
template <typename Task, typename Consumer>
[[noreturn]] FOLLY_NOINLINE void
EventBaseAtomicNotificationQueue<Task, Consumer>::checkPidFail() const {
folly::terminate_with<std::runtime_error>(
"Pid mismatch. Pid = " + folly::to<std::string>(get_cached_pid()) +
". Expecting " + folly::to<std::string>(pid_));
}
} // namespace folly
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/io/async/AtomicNotificationQueue.h>
#include <folly/io/async/EventBase.h>
#include <folly/io/async/EventHandler.h>
#include <folly/portability/Fcntl.h>
#include <folly/portability/Sockets.h>
#include <folly/portability/Unistd.h>
#if defined(__linux__) && !defined(__ANDROID__)
#define FOLLY_HAVE_EVENTFD
#include <folly/io/async/EventFDWrapper.h>
#endif
namespace folly {
/**
* A producer-consumer queue for passing tasks to EventBase thread.
*
* Tasks can be added to the queue from any thread. A single EventBase
* thread can be listening to the queue. Tasks are processed in FIFO order.
*/
template <typename Task, typename Consumer>
class EventBaseAtomicNotificationQueue : private EventBase::LoopCallback,
private EventHandler {
static_assert(
noexcept(std::declval<Consumer>()(std::declval<Task&&>())),
"Consumer::operator()(Task&&) should be noexcept.");
public:
explicit EventBaseAtomicNotificationQueue(Consumer&& consumer);
template <
typename C = Consumer,
typename = std::enable_if_t<std::is_default_constructible<C>::value>>
EventBaseAtomicNotificationQueue()
: EventBaseAtomicNotificationQueue(Consumer()) {}
~EventBaseAtomicNotificationQueue() override;
/*
* Set the maximum number of tasks processed in a single round.
* Can be called from consumer thread only.
*/
void setMaxReadAtOnce(uint32_t maxAtOnce);
/*
* Returns the number of tasks in the queue.
* Can be called from any thread.
*/
size_t size() const;
/*
* Checks if the queue is empty.
* Can be called from consumer thread only.
*/
bool empty() const;
/*
* Executes all tasks until the queue is empty.
* Can be called from consumer thread only.
*/
void drain();
/*
* Adds a task into the queue.
* Can be called from any thread.
*/
template <typename T>
void putMessage(T&& task);
/**
* Adds a task into the queue unless the max queue size is reached.
* Returns true iff the task was queued.
* Can be called from any thread.
*/
template <typename T>
FOLLY_NODISCARD bool tryPutMessage(T&& task, uint32_t maxSize);
/*
* Detaches the queue from an EventBase.
* Can be called from consumer thread only.
*/
void stopConsuming();
/*
* Attaches the queue to an EventBase.
* Can be called from consumer thread only.
*/
void startConsuming(EventBase* evb);
/*
* Attaches the queue to an EventBase.
* Can be called from consumer thread only.
*
* Unlike startConsuming, startConsumingInternal registers this queue as
* an internal event. This means that this event may be skipped if
* EventBase doesn't have any other registered events. This generally should
* only be used for queues managed by an EventBase itself.
*/
void startConsumingInternal(EventBase* evb);
/*
* Executes one round of tasks. Re-activates the event if more tasks are
* available.
* Can be called from consumer thread only.
*/
void execute();
private:
/*
* Adds a task to the queue without incrementing the push count.
*/
template <typename T>
void putMessageImpl(T&& task);
template <typename T>
bool drive(T&& t);
/*
* Write into the signal fd to wake up the consumer thread.
*/
void notifyFd();
/*
* Read all messages from the signal fd.
*/
void drainFd();
/*
* Either arm the queue or reactivate the EventBase event.
* This has to be a loop callback because the event can't be activated from
* within the event callback. It also allows delayed re-arming the queue.
*/
void runLoopCallback() noexcept override;
void startConsumingImpl(EventBase* evb, bool internal);
void handlerReady(uint16_t) noexcept override;
void activateEvent();
/**
* Check that the AtomicNotificationQueue is being used from the correct
* process.
*
* If you create a AtomicNotificationQueue in one process, then fork, and try
* to send messages to the queue from the child process, you're going to have
* a bad time. Unfortunately users have (accidentally) run into this.
*
* Because we use an eventfd/pipe, the child process can actually signal the
* parent process that an event is ready. However, it can't put anything on
* the parent's queue, so the parent wakes up and finds an empty queue. This
* check ensures that we catch the problem in the misbehaving child process
* code, and crash before signalling the parent process.
*/
void checkPid() const;
[[noreturn]] FOLLY_NOINLINE void checkPidFail() const;
int eventfd_{-1};
int pipeFds_[2]{-1, -1}; // to fallback to on older/non-linux systems
/*
* If event is registered with the EventBase, this describes whether
* edge-triggered flag was set for it. For edge-triggered events we don't
* need to drain the fd to deactivate them.
*/
EventBase* evb_{nullptr};
const pid_t pid_;
AtomicNotificationQueue<Task> notificationQueue_;
Consumer consumer_;
ssize_t successfulArmCount_{0};
ssize_t consumerDisarmedCount_{0};
ssize_t writesObserved_{0};
ssize_t writesLocal_{0};
bool armed_{false};
bool edgeTriggeredSet_{false};
};
} // namespace folly
#include <folly/io/async/EventBaseAtomicNotificationQueue-inl.h>
......@@ -14,8 +14,8 @@
* limitations under the License.
*/
#include <folly/io/async/AtomicNotificationQueue.h>
#include <folly/io/async/EventBase.h>
#include <folly/io/async/EventBaseAtomicNotificationQueue.h>
#include <folly/portability/GTest.h>
#include <functional>
......@@ -44,7 +44,8 @@ struct AtomicNotificationQueueConsumer {
TEST(AtomicNotificationQueueTest, TryPutMessage) {
vector<int> data;
AtomicNotificationQueueConsumer<int> consumer{data};
AtomicNotificationQueue<int, decltype(consumer)> queue{std::move(consumer)};
EventBaseAtomicNotificationQueue<int, decltype(consumer)> queue{
std::move(consumer)};
constexpr uint32_t kMaxSize = 10;
......@@ -86,7 +87,8 @@ TEST(AtomicNotificationQueueTest, DiscardDequeuedTasks) {
vector<int> data;
Consumer consumer{data};
AtomicNotificationQueue<TaskWithExpiry, Consumer> queue{std::move(consumer)};
EventBaseAtomicNotificationQueue<TaskWithExpiry, Consumer> queue{
std::move(consumer)};
queue.setMaxReadAtOnce(10);
vector<TaskWithExpiry> tasks = {
......
......@@ -18,8 +18,8 @@
#include <thread>
#include <folly/Benchmark.h>
#include <folly/io/async/AtomicNotificationQueue.h>
#include <folly/io/async/EventBase.h>
#include <folly/io/async/EventBaseAtomicNotificationQueue.h>
#include <folly/io/async/NotificationQueue.h>
#include <folly/synchronization/Baton.h>
......@@ -49,7 +49,7 @@ class MockConsumer : public NotificationQueue<Func>::Consumer {
struct AtomicNotificationQueueConsumerAdaptor {
void startConsuming(
EventBase* evb,
AtomicNotificationQueue<Func, FuncRunner>* queue) {
EventBaseAtomicNotificationQueue<Func, FuncRunner>* queue) {
queue->startConsuming(evb);
}
};
......@@ -152,7 +152,7 @@ void multiProducerMultiConsumerANQ(
size_t numConsumers) {
CHECK(numConsumers == 1);
multiProducerMultiConsumer<
AtomicNotificationQueue<Func, FuncRunner>,
EventBaseAtomicNotificationQueue<Func, FuncRunner>,
AtomicNotificationQueueConsumerAdaptor>(
iters, numProducers, numConsumers);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment