Commit 7c9f69f9 authored by Misha Shneerson's avatar Misha Shneerson Committed by Facebook GitHub Bot

python event loop consumes one task at a time from C++ notification queue

Summary:
Currently, when C++ threads communicate with python's event loop, they place
the items on the queue, notify the event loop, and when even loops starts
running, they would consume all items from this queue at once.
The downside of this approach is that we are unable to deprioritize the
upstream tasks to yield to internal python tasks. However, such ability to yield
is fundamental to how Thrift approaches load shedding under overload.
In this diff we change the approach to take only a single item from the queue at a
time. Notice that if queue is not emptied, the consumer will notify the event loop to
consume from this queue on the next iteration of the loop.

Reviewed By: andriigrynenko

Differential Revision: D26163397

fbshipit-source-id: 48cd6cb57c48ea67dce4eb5f0de6db5b0feb75ac
parent 19a84207
......@@ -229,7 +229,7 @@ class NotificationQueue {
}
template <typename F>
void consumeUntilDrained(F&& foreach);
void consume(F&& f);
private:
NotificationQueue& queue_;
......@@ -838,30 +838,27 @@ bool NotificationQueue<MessageT>::Consumer::consumeUntilDrained(
template <typename MessageT>
template <typename F>
void NotificationQueue<MessageT>::SimpleConsumer::consumeUntilDrained(
F&& foreach) {
void NotificationQueue<MessageT>::SimpleConsumer::consume(F&& foreach) {
SCOPE_EXIT { queue_.syncSignalAndQueue(); };
queue_.checkPid();
while (true) {
std::unique_ptr<Node> data;
{
folly::SpinLockGuard g(queue_.spinlock_);
if (UNLIKELY(queue_.queue_.empty())) {
return;
}
std::unique_ptr<Node> data;
{
folly::SpinLockGuard g(queue_.spinlock_);
data.reset(&queue_.queue_.front());
queue_.queue_.pop_front();
if (UNLIKELY(queue_.queue_.empty())) {
return;
}
RequestContextScopeGuard rctx(std::move(data->ctx_));
foreach(std::move(data->msg_));
// Make sure message destructor is called with the correct RequestContext.
data.reset();
data.reset(&queue_.queue_.front());
queue_.queue_.pop_front();
}
RequestContextScopeGuard rctx(std::move(data->ctx_));
foreach(std::move(data->msg_));
// Make sure message destructor is called with the correct RequestContext.
data.reset();
}
/**
......
......@@ -48,7 +48,7 @@ class AsyncioExecutor : public DrivableExecutor, public SequencedExecutor {
int fileno() const { return consumer_.getFd(); }
void drive() noexcept override {
consumer_.consumeUntilDrained([](Func&& func) {
consumer_.consume([](Func&& func) {
if (FOLLY_DETAIL_PY_ISFINALIZING()) {
// if Python is finalizing calling scheduled functions MAY segfault.
// any code that could have been called is now inconsequential.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment