Commit cf214a5b authored by Dave Watson, committed by Anton Likhtarov

Remove extraneous syscalls if NotificationQueue size() > 1

Currently the notification queue does 2 syscalls per item: one read, one write.  We only need the eventfd to wake up the consumer thread, so if the thread is already awake, don't bother writing to the fd.

Benchmark shows that when the queue size() > 1, this is ~4x faster.

Note that this might be unfair if there are multiple consumers: I could imagine a situation where one thread eats all the wakeups written to the fd, so only one thread is actually working.  However, using multiple consumers is a bad idea anyway, and I'd consider removing support for them entirely: if the same fd is in multiple epoll() loops, _all_ epolls will wake up, resulting in a thundering herd problem.  I don't see any multiConsumer cases in fbcode.

Using EFD_SEMAPHORE or not doesn't seem to matter, since hopefully we're only writing 1 wakeup per thread - and it wouldn't work at all for the multiConsumer case.

Test Plan:
fbconfig thrift/lib/cpp/test:TNotificationQueueTest; fbmake runtests
fbconfig common/concurrency:QueueBenchmark
fbmake opt
QueueBenchmark --bm_min_iters=10000

Reviewed By:

Subscribers: doug, folly@lists, fbcode-common-diffs@lists, alandau, bmatheny, haijunz

FB internal diff: D1272872

Tasks: 2802758
parent 5a5aee33
......@@ -27,6 +27,7 @@
#include "folly/io/async/Request.h"
#include "folly/Likely.h"
#include "folly/SmallLocks.h"
#include "folly/ScopeGuard.h"
#include <glog/logging.h>
#include <deque>
......@@ -141,12 +142,23 @@ class NotificationQueue {
virtual void handlerReady(uint16_t events) noexcept;
void setActive(bool active) {
if (!active_ && active) {
} else if (active_ && !active) {
active_ = active;
void init(EventBase* eventBase, NotificationQueue* queue);
NotificationQueue* queue_;
bool* destroyedFlagPtr_;
uint32_t maxReadAtOnce_;
EventBase* base_;
bool active_{false};
enum class FdType {
......@@ -320,18 +332,12 @@ class NotificationQueue {
bool tryConsume(MessageT& result) {
if (!tryConsumeEvent()) {
return false;
try {
folly::MSLGuard g(spinlock_);
// This shouldn't happen normally. See the comments in
// Consumer::handlerReady() for more details.
if (UNLIKELY(queue_.empty())) {
LOG(ERROR) << "found empty queue after signalled event";
return false;
......@@ -446,30 +452,44 @@ class NotificationQueue {
bool putMessageImpl(MessageT&& message, size_t maxSize, bool throws=true) {
bool signal = false;
folly::MSLGuard g(spinlock_);
if (!checkQueueSize(maxSize, throws)) {
return false;
// We only need to signal an event if not all consumers are
// awake.
if (numActiveConsumers_ < numConsumers_) {
signal = true;
if (signal) {
return true;
bool putMessageImpl(
const MessageT& message, size_t maxSize, bool throws=true) {
bool signal = false;
folly::MSLGuard g(spinlock_);
if (!checkQueueSize(maxSize, throws)) {
return false;
if (numActiveConsumers_ < numConsumers_) {
signal = true;
queue_.push_back(std::make_pair(message, RequestContext::saveContext()));
if (signal) {
return true;
......@@ -477,6 +497,7 @@ class NotificationQueue {
void putMessagesImpl(InputIteratorT first, InputIteratorT last,
std::input_iterator_tag) {
bool signal = false;
size_t numAdded = 0;
folly::MSLGuard g(spinlock_);
......@@ -485,8 +506,13 @@ class NotificationQueue {
if (numActiveConsumers_ < numConsumers_) {
signal = true;
if (signal) {
mutable folly::MicroSpinLock spinlock_;
......@@ -495,6 +521,8 @@ class NotificationQueue {
uint32_t advisoryMaxQueueSize_;
pid_t pid_;
std::deque<std::pair<MessageT, std::shared_ptr<RequestContext>>> queue_;
int numConsumers_{0};
std::atomic<int> numActiveConsumers_{0};
template<typename MessageT>
......@@ -512,28 +540,22 @@ template<typename MessageT>
void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t events)
noexcept {
uint32_t numProcessed = 0;
bool firstRun = true;
SCOPE_EXIT { setActive(false); };
while (true) {
// Try to decrement the eventfd.
// We decrement the eventfd before checking the queue, and only pop a
// message off the queue if we read from the eventfd.
// Reading the eventfd first allows us to not have to hold the spinlock
// while accessing the eventfd. If we popped from the queue first, we
// would have to hold the lock while reading from or writing to the
// eventfd. (Multiple consumers may be woken up from a single eventfd
// notification. If we popped from the queue first, we could end up
// popping a message from the queue before the eventfd has been notified by
// the producer, unless the consumer and producer both held the spinlock
// around the entire operation.)
if (!queue_->tryConsumeEvent()) {
// no message available right now
// The eventfd is only used to wake up the consumer - there may or
// may not actually be an event available (another consumer may
// have read it). We don't really care, we only care about
// emptying the queue.
if (firstRun) {
firstRun = false;
// Now pop the message off of the queue.
// We successfully consumed the eventfd notification.
// There should be a message available for us to consume.
// We have to manually acquire and release the spinlock here, rather than
// using SpinLockHolder since the MessageT has to be constructed while
......@@ -545,22 +567,8 @@ void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t events)
bool locked = true;
try {
// The eventfd is incremented once for every message, and only
// decremented when a message is popped off. There should always be a
// message here to read.
if (UNLIKELY(queue_->queue_.empty())) {
// Unfortunately we have seen this happen in practice if a user forks
// the process, and then the child tries to send a message to a
// NotificationQueue being monitored by a thread in the parent.
// The child can signal the parent via the eventfd, but won't have been
// able to put anything on the parent's queue since it has a separate
// address space.
// This is a bug in the sender's code. putMessagesImpl() should cause
// the sender to crash now before trying to send a message from the
// wrong process. However, just in case let's handle this case in the
// consumer without crashing.
LOG(ERROR) << "found empty queue after signalled event";
// If there is no message, we've reached the end of the queue, return.
......@@ -577,6 +585,9 @@ void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t events)
// We use this as an optimization to see if we should bother trying to
// loop again and read another message after invoking this callback.
bool wasEmpty = queue_->queue_.empty();
if (wasEmpty) {
// Now unlock the spinlock before we invoke the callback.
......@@ -604,6 +615,7 @@ void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t events)
// If we have hit maxReadAtOnce_, we are done.
if (maxReadAtOnce_ > 0 && numProcessed >= maxReadAtOnce_) {
......@@ -653,6 +665,13 @@ void NotificationQueue<MessageT>::Consumer::init(
base_ = eventBase;
queue_ = queue;
folly::MSLGuard g(queue_->spinlock_);
if (queue_->eventfd_ >= 0) {
initHandler(eventBase, queue_->eventfd_);
} else {
......@@ -667,6 +686,12 @@ void NotificationQueue<MessageT>::Consumer::stopConsuming() {
folly::MSLGuard g(queue_->spinlock_);
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment