Commit db4f831f authored by Miroslav Crnic's avatar Miroslav Crnic Committed by Facebook Github Bot

Replace queue with UMPSCQueue and decouple queueing from scheduling

Summary:
Reduced contention with removing connection between queue_ and schedule_ access.
Replaced std::queue with unbounded multiple producer single consumer queue.

Reviewed By: yfeldblum

Differential Revision: D10345872

fbshipit-source-id: 3307d908d6e4826eab4d981447c8771021bfe580
parent 6687bf85
...@@ -56,36 +56,24 @@ void SerialExecutor::keepAliveRelease() { ...@@ -56,36 +56,24 @@ void SerialExecutor::keepAliveRelease() {
} }
void SerialExecutor::add(Func func) { void SerialExecutor::add(Func func) {
{ queue_.enqueue(std::move(func));
std::lock_guard<std::mutex> lock(mutex_);
queue_.push(std::move(func));
}
parent_->add([keepAlive = getKeepAliveToken(this)] { keepAlive->run(); }); parent_->add([keepAlive = getKeepAliveToken(this)] { keepAlive->run(); });
} }
void SerialExecutor::addWithPriority(Func func, int8_t priority) { void SerialExecutor::addWithPriority(Func func, int8_t priority) {
{ queue_.enqueue(std::move(func));
std::lock_guard<std::mutex> lock(mutex_);
queue_.push(std::move(func));
}
parent_->addWithPriority( parent_->addWithPriority(
[keepAlive = getKeepAliveToken(this)] { keepAlive->run(); }, priority); [keepAlive = getKeepAliveToken(this)] { keepAlive->run(); }, priority);
} }
void SerialExecutor::run() { void SerialExecutor::run() {
std::unique_lock<std::mutex> lock(mutex_); if (scheduled_.fetch_add(1, std::memory_order_relaxed) > 0) {
++scheduled_;
if (scheduled_ > 1) {
return; return;
} }
do { do {
DCHECK(!queue_.empty()); Func func;
Func func = std::move(queue_.front()); queue_.dequeue(func);
queue_.pop();
lock.unlock();
try { try {
func(); func();
...@@ -97,13 +85,7 @@ void SerialExecutor::run() { ...@@ -97,13 +85,7 @@ void SerialExecutor::run() {
"object"; "object";
} }
// Destroy the function (and the data it captures) before we acquire the } while (scheduled_.fetch_sub(1, std::memory_order_relaxed) > 1);
// lock again.
func = {};
lock.lock();
--scheduled_;
} while (scheduled_);
} }
} // namespace folly } // namespace folly
...@@ -19,8 +19,8 @@ ...@@ -19,8 +19,8 @@
#include <atomic> #include <atomic>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <queue>
#include <folly/concurrency/UnboundedQueue.h>
#include <folly/executors/GlobalExecutor.h> #include <folly/executors/GlobalExecutor.h>
#include <folly/executors/SequencedExecutor.h> #include <folly/executors/SequencedExecutor.h>
...@@ -111,9 +111,12 @@ class SerialExecutor : public SequencedExecutor { ...@@ -111,9 +111,12 @@ class SerialExecutor : public SequencedExecutor {
void run(); void run();
KeepAlive<Executor> parent_; KeepAlive<Executor> parent_;
std::mutex mutex_; std::atomic<std::size_t> scheduled_{0};
std::size_t scheduled_{0}; /**
std::queue<Func> queue_; * Unbounded multi producer single consumer queue where consumers don't block
* on dequeue.
*/
folly::UnboundedQueue<Func, false, true, false> queue_;
std::atomic<ssize_t> keepAliveCounter_{1}; std::atomic<ssize_t> keepAliveCounter_{1};
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment