Commit 2a196d5a authored by Yedidya Feldblum's avatar Yedidya Feldblum Committed by Facebook Github Bot 6

Use folly::Function in folly::EventBase

Summary:[Folly] Use `folly::Function` in `folly::EventBase`.

`folly::Function` is moveable but noncopyable and therefore supports wrapping moveable but noncopyable lambdas - like the kind that arises when move-capturing a `std::unique_ptr`.

`std::function` is copyable - therefore it does not support wrapping such noncopyable lambdas.

Switching `folly::EventBase` to use it will allow callers to pass such noncopyable lambdas, allowing, e.g.:

```
auto numptr = folly::make_unique<int>(7); // unique_ptr is noncopyable
folly::EventBase eb;
eb.runInLoop([numptr = std::move(numptr)] { // therefore lambda is noncopyable
  int num = *numptr;
});
eb.loop();
```

This allows us to move away from the `folly::MoveWrapper` hack, which worked like:

```
auto numptr = folly::make_unique<int>(7); // unique_ptr is noncopyable
auto numptrw = folly::makeMoveWrapper(std::move(numptr)); // MoveWrapper is "copyable" - hacky
folly::EventBase eb;
eb.runInLoop([numptrw] { // therefore lambda is "copyable" - hacky
  int num = **numptrw;
});
```

We needed to do that hack while:

But neither condition is true anymore.

Reviewed By: spacedentist

Differential Revision: D3143931

fb-gh-sync-id: 4fbdf5fb77eb5848ed1c6de942b022382141577f
fbshipit-source-id: 4fbdf5fb77eb5848ed1c6de942b022382141577f
parent f549d777
...@@ -32,24 +32,20 @@ ...@@ -32,24 +32,20 @@
namespace { namespace {
using folly::Cob;
using folly::EventBase; using folly::EventBase;
template <typename Callback>
class FunctionLoopCallback : public EventBase::LoopCallback { class FunctionLoopCallback : public EventBase::LoopCallback {
public: public:
explicit FunctionLoopCallback(Cob&& function) explicit FunctionLoopCallback(EventBase::Func&& function)
: function_(std::move(function)) {} : function_(std::move(function)) {}
explicit FunctionLoopCallback(const Cob& function) : function_(function) {}
void runLoopCallback() noexcept override { void runLoopCallback() noexcept override {
function_(); function_();
delete this; delete this;
} }
private: private:
Callback function_; EventBase::Func function_;
}; };
} }
...@@ -59,10 +55,10 @@ namespace folly { ...@@ -59,10 +55,10 @@ namespace folly {
* EventBase::FunctionRunner * EventBase::FunctionRunner
*/ */
class EventBase::FunctionRunner : public NotificationQueue<Cob>::Consumer { class EventBase::FunctionRunner
: public NotificationQueue<EventBase::Func>::Consumer {
public: public:
void messageAvailable(Cob&& msg) override { void messageAvailable(Func&& msg) override {
// In libevent2, internal events do not break the loop. // In libevent2, internal events do not break the loop.
// Most users would expect loop(), followed by runInEventBaseThread(), // Most users would expect loop(), followed by runInEventBaseThread(),
// to break the loop and check if it should exit or not. // to break the loop and check if it should exit or not.
...@@ -508,20 +504,9 @@ void EventBase::runInLoop(LoopCallback* callback, bool thisIteration) { ...@@ -508,20 +504,9 @@ void EventBase::runInLoop(LoopCallback* callback, bool thisIteration) {
} }
} }
void EventBase::runInLoop(const Cob& cob, bool thisIteration) { void EventBase::runInLoop(Func cob, bool thisIteration) {
DCHECK(isInEventBaseThread());
auto wrapper = new FunctionLoopCallback<Cob>(cob);
wrapper->context_ = RequestContext::saveContext();
if (runOnceCallbacks_ != nullptr && thisIteration) {
runOnceCallbacks_->push_back(*wrapper);
} else {
loopCallbacks_.push_back(*wrapper);
}
}
void EventBase::runInLoop(Cob&& cob, bool thisIteration) {
DCHECK(isInEventBaseThread()); DCHECK(isInEventBaseThread());
auto wrapper = new FunctionLoopCallback<Cob>(std::move(cob)); auto wrapper = new FunctionLoopCallback(std::move(cob));
wrapper->context_ = RequestContext::saveContext(); wrapper->context_ = RequestContext::saveContext();
if (runOnceCallbacks_ != nullptr && thisIteration) { if (runOnceCallbacks_ != nullptr && thisIteration) {
runOnceCallbacks_->push_back(*wrapper); runOnceCallbacks_->push_back(*wrapper);
...@@ -530,8 +515,8 @@ void EventBase::runInLoop(Cob&& cob, bool thisIteration) { ...@@ -530,8 +515,8 @@ void EventBase::runInLoop(Cob&& cob, bool thisIteration) {
} }
} }
void EventBase::runAfterDrain(Cob&& cob) { void EventBase::runAfterDrain(Func cob) {
auto callback = new FunctionLoopCallback<Cob>(std::move(cob)); auto callback = new FunctionLoopCallback(std::move(cob));
std::lock_guard<std::mutex> lg(runAfterDrainCallbacksMutex_); std::lock_guard<std::mutex> lg(runAfterDrainCallbacksMutex_);
callback->cancelLoopCallback(); callback->cancelLoopCallback();
runAfterDrainCallbacks_.push_back(*callback); runAfterDrainCallbacks_.push_back(*callback);
...@@ -549,7 +534,7 @@ void EventBase::runBeforeLoop(LoopCallback* callback) { ...@@ -549,7 +534,7 @@ void EventBase::runBeforeLoop(LoopCallback* callback) {
runBeforeLoopCallbacks_.push_back(*callback); runBeforeLoopCallbacks_.push_back(*callback);
} }
bool EventBase::runInEventBaseThread(const Cob& fn) { bool EventBase::runInEventBaseThread(Func fn) {
// Send the message. // Send the message.
// It will be received by the FunctionRunner in the EventBase's thread. // It will be received by the FunctionRunner in the EventBase's thread.
...@@ -562,13 +547,13 @@ bool EventBase::runInEventBaseThread(const Cob& fn) { ...@@ -562,13 +547,13 @@ bool EventBase::runInEventBaseThread(const Cob& fn) {
// Short-circuit if we are already in our event base // Short-circuit if we are already in our event base
if (inRunningEventBaseThread()) { if (inRunningEventBaseThread()) {
runInLoop(fn); runInLoop(std::move(fn));
return true; return true;
} }
try { try {
queue_->putMessage(fn); queue_->putMessage(std::move(fn));
} catch (const std::exception& ex) { } catch (const std::exception& ex) {
LOG(ERROR) << "EventBase " << this << ": failed to schedule function " LOG(ERROR) << "EventBase " << this << ": failed to schedule function "
<< "for EventBase thread: " << ex.what(); << "for EventBase thread: " << ex.what();
...@@ -578,7 +563,7 @@ bool EventBase::runInEventBaseThread(const Cob& fn) { ...@@ -578,7 +563,7 @@ bool EventBase::runInEventBaseThread(const Cob& fn) {
return true; return true;
} }
bool EventBase::runInEventBaseThreadAndWait(const Cob& fn) { bool EventBase::runInEventBaseThreadAndWait(Func fn) {
if (inRunningEventBaseThread()) { if (inRunningEventBaseThread()) {
LOG(ERROR) << "EventBase " << this << ": Waiting in the event loop is not " LOG(ERROR) << "EventBase " << this << ": Waiting in the event loop is not "
<< "allowed"; << "allowed";
...@@ -605,28 +590,30 @@ bool EventBase::runInEventBaseThreadAndWait(const Cob& fn) { ...@@ -605,28 +590,30 @@ bool EventBase::runInEventBaseThreadAndWait(const Cob& fn) {
return true; return true;
} }
bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(const Cob& fn) { bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(Func fn) {
if (isInEventBaseThread()) { if (isInEventBaseThread()) {
fn(); fn();
return true; return true;
} else { } else {
return runInEventBaseThreadAndWait(fn); return runInEventBaseThreadAndWait(std::move(fn));
} }
} }
void EventBase::runAfterDelay(const Cob& cob, void EventBase::runAfterDelay(
Func cob,
uint32_t milliseconds, uint32_t milliseconds,
TimeoutManager::InternalEnum in) { TimeoutManager::InternalEnum in) {
if (!tryRunAfterDelay(cob, milliseconds, in)) { if (!tryRunAfterDelay(std::move(cob), milliseconds, in)) {
folly::throwSystemError( folly::throwSystemError(
"error in EventBase::runAfterDelay(), failed to schedule timeout"); "error in EventBase::runAfterDelay(), failed to schedule timeout");
} }
} }
bool EventBase::tryRunAfterDelay(const Cob& cob, bool EventBase::tryRunAfterDelay(
Func cob,
uint32_t milliseconds, uint32_t milliseconds,
TimeoutManager::InternalEnum in) { TimeoutManager::InternalEnum in) {
CobTimeout* timeout = new CobTimeout(this, cob, in); CobTimeout* timeout = new CobTimeout(this, std::move(cob), in);
if (!timeout->scheduleTimeout(milliseconds)) { if (!timeout->scheduleTimeout(milliseconds)) {
delete timeout; delete timeout;
return false; return false;
...@@ -667,7 +654,7 @@ bool EventBase::runLoopCallbacks(bool setContext) { ...@@ -667,7 +654,7 @@ bool EventBase::runLoopCallbacks(bool setContext) {
void EventBase::initNotificationQueue() { void EventBase::initNotificationQueue() {
// Infinite size queue // Infinite size queue
queue_.reset(new NotificationQueue<Cob>()); queue_.reset(new NotificationQueue<Func>());
// We allocate fnRunner_ separately, rather than declaring it directly // We allocate fnRunner_ separately, rather than declaring it directly
// as a member of EventBase solely so that we don't need to include // as a member of EventBase solely so that we don't need to include
......
...@@ -33,7 +33,9 @@ ...@@ -33,7 +33,9 @@
#include <boost/intrusive/list.hpp> #include <boost/intrusive/list.hpp>
#include <boost/utility.hpp> #include <boost/utility.hpp>
#include <folly/Executor.h> #include <folly/Executor.h>
#include <folly/Function.h>
#include <folly/Portability.h> #include <folly/Portability.h>
#include <folly/experimental/ExecutionObserver.h> #include <folly/experimental/ExecutionObserver.h>
#include <folly/futures/DrivableExecutor.h> #include <folly/futures/DrivableExecutor.h>
...@@ -119,6 +121,8 @@ class EventBase : private boost::noncopyable, ...@@ -119,6 +121,8 @@ class EventBase : private boost::noncopyable,
public TimeoutManager, public TimeoutManager,
public DrivableExecutor { public DrivableExecutor {
public: public:
using Func = folly::Function<void()>;
/** /**
* A callback interface to use with runInLoop() * A callback interface to use with runInLoop()
* *
...@@ -304,9 +308,7 @@ class EventBase : private boost::noncopyable, ...@@ -304,9 +308,7 @@ class EventBase : private boost::noncopyable,
* *
* Use runInEventBaseThread() to schedule functions from another thread. * Use runInEventBaseThread() to schedule functions from another thread.
*/ */
void runInLoop(const Cob& c, bool thisIteration = false); void runInLoop(Func c, bool thisIteration = false);
void runInLoop(Cob&& c, bool thisIteration = false);
/** /**
* Adds the given callback to a queue of things run before destruction * Adds the given callback to a queue of things run before destruction
...@@ -327,7 +329,7 @@ class EventBase : private boost::noncopyable, ...@@ -327,7 +329,7 @@ class EventBase : private boost::noncopyable,
* Note: will be called from the thread that invoked EventBase destructor, * Note: will be called from the thread that invoked EventBase destructor,
* after the final run of loop callbacks. * after the final run of loop callbacks.
*/ */
void runAfterDrain(Cob&& cob); void runAfterDrain(Func cob);
/** /**
* Adds a callback that will run immediately *before* the event loop. * Adds a callback that will run immediately *before* the event loop.
...@@ -379,7 +381,7 @@ class EventBase : private boost::noncopyable, ...@@ -379,7 +381,7 @@ class EventBase : private boost::noncopyable,
* *
* The function must not throw any exceptions. * The function must not throw any exceptions.
*/ */
bool runInEventBaseThread(const Cob& fn); bool runInEventBaseThread(Func fn);
/* /*
* Like runInEventBaseThread, but the caller waits for the callback to be * Like runInEventBaseThread, but the caller waits for the callback to be
...@@ -392,7 +394,7 @@ class EventBase : private boost::noncopyable, ...@@ -392,7 +394,7 @@ class EventBase : private boost::noncopyable,
* Like runInEventBaseThread, but the caller waits for the callback to be * Like runInEventBaseThread, but the caller waits for the callback to be
* executed. * executed.
*/ */
bool runInEventBaseThreadAndWait(const Cob& fn); bool runInEventBaseThreadAndWait(Func fn);
/* /*
* Like runInEventBaseThreadAndWait, except if the caller is already in the * Like runInEventBaseThreadAndWait, except if the caller is already in the
...@@ -405,7 +407,7 @@ class EventBase : private boost::noncopyable, ...@@ -405,7 +407,7 @@ class EventBase : private boost::noncopyable,
* Like runInEventBaseThreadAndWait, except if the caller is already in the * Like runInEventBaseThreadAndWait, except if the caller is already in the
* event base thread, the functor is simply run inline. * event base thread, the functor is simply run inline.
*/ */
bool runImmediatelyOrRunInEventBaseThreadAndWait(const Cob& fn); bool runImmediatelyOrRunInEventBaseThreadAndWait(Func fn);
/** /**
* Runs the given Cob at some time after the specified number of * Runs the given Cob at some time after the specified number of
...@@ -414,7 +416,7 @@ class EventBase : private boost::noncopyable, ...@@ -414,7 +416,7 @@ class EventBase : private boost::noncopyable,
* Throws a std::system_error if an error occurs. * Throws a std::system_error if an error occurs.
*/ */
void runAfterDelay( void runAfterDelay(
const Cob& c, Func c,
uint32_t milliseconds, uint32_t milliseconds,
TimeoutManager::InternalEnum in = TimeoutManager::InternalEnum::NORMAL); TimeoutManager::InternalEnum in = TimeoutManager::InternalEnum::NORMAL);
...@@ -425,7 +427,7 @@ class EventBase : private boost::noncopyable, ...@@ -425,7 +427,7 @@ class EventBase : private boost::noncopyable,
* *
* */ * */
bool tryRunAfterDelay( bool tryRunAfterDelay(
const Cob& cob, Func cob,
uint32_t milliseconds, uint32_t milliseconds,
TimeoutManager::InternalEnum in = TimeoutManager::InternalEnum::NORMAL); TimeoutManager::InternalEnum in = TimeoutManager::InternalEnum::NORMAL);
...@@ -434,10 +436,10 @@ class EventBase : private boost::noncopyable, ...@@ -434,10 +436,10 @@ class EventBase : private boost::noncopyable,
* called when that latency is exceeded. * called when that latency is exceeded.
* OBS: This functionality depends on time-measurement. * OBS: This functionality depends on time-measurement.
*/ */
void setMaxLatency(int64_t maxLatency, const Cob& maxLatencyCob) { void setMaxLatency(int64_t maxLatency, Func maxLatencyCob) {
assert(enableTimeMeasurement_); assert(enableTimeMeasurement_);
maxLatency_ = maxLatency; maxLatency_ = maxLatency;
maxLatencyCob_ = maxLatencyCob; maxLatencyCob_ = std::move(maxLatencyCob);
} }
...@@ -576,7 +578,7 @@ class EventBase : private boost::noncopyable, ...@@ -576,7 +578,7 @@ class EventBase : private boost::noncopyable,
void add(Cob fn) override { void add(Cob fn) override {
// runInEventBaseThread() takes a const&, // runInEventBaseThread() takes a const&,
// so no point in doing std::move here. // so no point in doing std::move here.
runInEventBaseThread(fn); runInEventBaseThread(std::move(fn));
} }
/// Implements the DrivableExecutor interface /// Implements the DrivableExecutor interface
...@@ -614,13 +616,13 @@ class EventBase : private boost::noncopyable, ...@@ -614,13 +616,13 @@ class EventBase : private boost::noncopyable,
// appropriate client-provided Cob // appropriate client-provided Cob
class CobTimeout : public AsyncTimeout { class CobTimeout : public AsyncTimeout {
public: public:
CobTimeout(EventBase* b, const Cob& c, TimeoutManager::InternalEnum in) CobTimeout(EventBase* b, Func c, TimeoutManager::InternalEnum in)
: AsyncTimeout(b, in), cob_(c) {} : AsyncTimeout(b, in), cob_(std::move(c)) {}
virtual void timeoutExpired() noexcept; virtual void timeoutExpired() noexcept;
private: private:
Cob cob_; Func cob_;
public: public:
typedef boost::intrusive::list_member_hook< typedef boost::intrusive::list_member_hook<
...@@ -673,7 +675,7 @@ class EventBase : private boost::noncopyable, ...@@ -673,7 +675,7 @@ class EventBase : private boost::noncopyable,
// A notification queue for runInEventBaseThread() to use // A notification queue for runInEventBaseThread() to use
// to send function requests to the EventBase thread. // to send function requests to the EventBase thread.
std::unique_ptr<NotificationQueue<Cob>> queue_; std::unique_ptr<NotificationQueue<Func>> queue_;
std::unique_ptr<FunctionRunner> fnRunner_; std::unique_ptr<FunctionRunner> fnRunner_;
// limit for latency in microseconds (0 disables) // limit for latency in microseconds (0 disables)
...@@ -688,7 +690,7 @@ class EventBase : private boost::noncopyable, ...@@ -688,7 +690,7 @@ class EventBase : private boost::noncopyable,
SmoothLoopTime maxLatencyLoopTime_; SmoothLoopTime maxLatencyLoopTime_;
// callback called when latency limit is exceeded // callback called when latency limit is exceeded
Cob maxLatencyCob_; Func maxLatencyCob_;
// Enables/disables time measurements in loopBody(). if disabled, the // Enables/disables time measurements in loopBody(). if disabled, the
// following functionality that relies on time-measurement, will not // following functionality that relies on time-measurement, will not
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment