Commit f6db2cea authored by Sven Over's avatar Sven Over Committed by Facebook Github Bot

remove unnecessary memory allocation in SerialExecutor

Summary:
SerialExecutor used to have public constructors and could be
constructed on the heap or on the stack. In order to be able
to reference the shared stater later from the parent executor,
the task queue was implemented in a separate class, of which
the SerialExecutor would create one, managed by a `shared_ptr`.

With the recent move towards `folly::Executor::KeepAlive`, the
SerialExecutor itself always has to be constructed on the heap
(hence no public constructor anymore but factory functions).
However, that means that we can embed the task queue implementation
in SerialExecutor, using `KeepAlive` objects to access it from
the parent executor.

This change simplifies the implementation of SerialExecutor and
reduces the number of heap objects and thus memory allocations.

Reviewed By: andriigrynenko

Differential Revision: D7927260

fbshipit-source-id: d3b6a2376bfdf512a946f2f45c730ed68176a441
parent f185ea84
...@@ -16,8 +16,6 @@ ...@@ -16,8 +16,6 @@
#include <folly/executors/SerialExecutor.h> #include <folly/executors/SerialExecutor.h>
#include <mutex>
#include <queue>
#include <glog/logging.h> #include <glog/logging.h>
...@@ -25,56 +23,8 @@ ...@@ -25,56 +23,8 @@
namespace folly { namespace folly {
class SerialExecutor::TaskQueueImpl {
public:
void add(Func&& func) {
std::lock_guard<std::mutex> lock(mutex_);
queue_.push(std::move(func));
}
void run() {
std::unique_lock<std::mutex> lock(mutex_);
++scheduled_;
if (scheduled_ > 1) {
return;
}
do {
DCHECK(!queue_.empty());
Func func = std::move(queue_.front());
queue_.pop();
lock.unlock();
try {
func();
} catch (std::exception const& ex) {
LOG(ERROR) << "SerialExecutor: func threw unhandled exception "
<< folly::exceptionStr(ex);
} catch (...) {
LOG(ERROR) << "SerialExecutor: func threw unhandled non-exception "
"object";
}
// Destroy the function (and the data it captures) before we acquire the
// lock again.
func = {};
lock.lock();
--scheduled_;
} while (scheduled_);
}
private:
std::mutex mutex_;
std::size_t scheduled_{0};
std::queue<Func> queue_;
};
SerialExecutor::SerialExecutor(KeepAlive<Executor> parent) SerialExecutor::SerialExecutor(KeepAlive<Executor> parent)
: parent_(std::move(parent)), : parent_(std::move(parent)) {}
taskQueueImpl_(std::make_shared<TaskQueueImpl>()) {}
SerialExecutor::~SerialExecutor() { SerialExecutor::~SerialExecutor() {
DCHECK(!keepAliveCounter_); DCHECK(!keepAliveCounter_);
...@@ -107,19 +57,54 @@ void SerialExecutor::keepAliveRelease() { ...@@ -107,19 +57,54 @@ void SerialExecutor::keepAliveRelease() {
} }
void SerialExecutor::add(Func func) { void SerialExecutor::add(Func func) {
taskQueueImpl_->add(std::move(func)); {
parent_->add([impl = taskQueueImpl_, keepAlive = getKeepAliveToken(this)] { std::lock_guard<std::mutex> lock(mutex_);
impl->run(); queue_.push(std::move(func));
}); }
parent_->add([keepAlive = getKeepAliveToken(this)] { keepAlive->run(); });
} }
void SerialExecutor::addWithPriority(Func func, int8_t priority) { void SerialExecutor::addWithPriority(Func func, int8_t priority) {
taskQueueImpl_->add(std::move(func)); {
std::lock_guard<std::mutex> lock(mutex_);
queue_.push(std::move(func));
}
parent_->addWithPriority( parent_->addWithPriority(
[impl = taskQueueImpl_, keepAlive = getKeepAliveToken(this)] { [keepAlive = getKeepAliveToken(this)] { keepAlive->run(); }, priority);
impl->run(); }
},
priority); void SerialExecutor::run() {
std::unique_lock<std::mutex> lock(mutex_);
++scheduled_;
if (scheduled_ > 1) {
return;
}
do {
DCHECK(!queue_.empty());
Func func = std::move(queue_.front());
queue_.pop();
lock.unlock();
try {
func();
} catch (std::exception const& ex) {
LOG(ERROR) << "SerialExecutor: func threw unhandled exception "
<< folly::exceptionStr(ex);
} catch (...) {
LOG(ERROR) << "SerialExecutor: func threw unhandled non-exception "
"object";
}
// Destroy the function (and the data it captures) before we acquire the
// lock again.
func = {};
lock.lock();
--scheduled_;
} while (scheduled_);
} }
} // namespace folly } // namespace folly
...@@ -18,6 +18,8 @@ ...@@ -18,6 +18,8 @@
#include <atomic> #include <atomic>
#include <memory> #include <memory>
#include <mutex>
#include <queue>
#include <folly/executors/GlobalExecutor.h> #include <folly/executors/GlobalExecutor.h>
#include <folly/executors/SequencedExecutor.h> #include <folly/executors/SequencedExecutor.h>
...@@ -49,11 +51,10 @@ namespace folly { ...@@ -49,11 +51,10 @@ namespace folly {
class SerialExecutor : public SequencedExecutor { class SerialExecutor : public SequencedExecutor {
public: public:
~SerialExecutor() override;
SerialExecutor(SerialExecutor const&) = delete; SerialExecutor(SerialExecutor const&) = delete;
SerialExecutor& operator=(SerialExecutor const&) = delete; SerialExecutor& operator=(SerialExecutor const&) = delete;
SerialExecutor(SerialExecutor&&) = default; SerialExecutor(SerialExecutor&&) = delete;
SerialExecutor& operator=(SerialExecutor&&) = default; SerialExecutor& operator=(SerialExecutor&&) = delete;
static KeepAlive<SerialExecutor> create( static KeepAlive<SerialExecutor> create(
KeepAlive<Executor> parent = getKeepAliveToken(getCPUExecutor().get())); KeepAlive<Executor> parent = getKeepAliveToken(getCPUExecutor().get()));
...@@ -106,11 +107,14 @@ class SerialExecutor : public SequencedExecutor { ...@@ -106,11 +107,14 @@ class SerialExecutor : public SequencedExecutor {
private: private:
explicit SerialExecutor(KeepAlive<Executor> parent); explicit SerialExecutor(KeepAlive<Executor> parent);
~SerialExecutor() override;
class TaskQueueImpl; void run();
KeepAlive<Executor> parent_; KeepAlive<Executor> parent_;
std::shared_ptr<TaskQueueImpl> taskQueueImpl_; std::mutex mutex_;
std::size_t scheduled_{0};
std::queue<Func> queue_;
std::atomic<ssize_t> keepAliveCounter_{1}; std::atomic<ssize_t> keepAliveCounter_{1};
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment