Commit d233e724 authored by Andrii Grynenko's avatar Andrii Grynenko Committed by Facebook Github Bot

Template KeepAlive with Executor type

Summary:
This allows using KeepAlive token as a pointer to an specific Executor sub-type (e.g. SequencedExecutor).
getKeepAliveToken() has to be a free function to achieve this goal.

Reviewed By: yfeldblum

Differential Revision: D7847884

fbshipit-source-id: 95dc62433d30212ceb9b9b2fc7d6b38cb5425311
parent 68a6b5b5
...@@ -62,7 +62,8 @@ class DefaultKeepAliveExecutor : public virtual Executor { ...@@ -62,7 +62,8 @@ class DefaultKeepAliveExecutor : public virtual Executor {
std::atomic<ssize_t> keepAliveCounter_{1}; std::atomic<ssize_t> keepAliveCounter_{1};
Baton<> keepAliveReleaseBaton_; Baton<> keepAliveReleaseBaton_;
KeepAlive keepAlive_{makeKeepAlive()}; KeepAlive<DefaultKeepAliveExecutor> keepAlive_{
makeKeepAlive<DefaultKeepAliveExecutor>(this)};
}; };
} // namespace folly } // namespace folly
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#pragma once #pragma once
#include <cassert>
#include <climits> #include <climits>
#include <folly/Function.h> #include <folly/Function.h>
...@@ -43,13 +44,39 @@ class Executor { ...@@ -43,13 +44,39 @@ class Executor {
return 1; return 1;
} }
static const int8_t LO_PRI = SCHAR_MIN; static const int8_t LO_PRI = SCHAR_MIN;
static const int8_t MID_PRI = 0; static const int8_t MID_PRI = 0;
static const int8_t HI_PRI = SCHAR_MAX; static const int8_t HI_PRI = SCHAR_MAX;
class KeepAliveDeleter {
public:
explicit KeepAliveDeleter(bool dummy) : dummy_(dummy) {}
void operator()(folly::Executor* executor) {
if (dummy_) {
return;
}
executor->keepAliveRelease();
}
private:
bool dummy_;
};
template <typename ExecutorT = Executor>
class KeepAlive { class KeepAlive {
public: public:
KeepAlive() : executor_(nullptr, Deleter(true)) {} KeepAlive() : executor_(nullptr, KeepAliveDeleter(true)) {}
template <typename OtherExecutor>
/* implicit */ KeepAlive(KeepAlive<OtherExecutor>&& other) : KeepAlive() {
*this = std::move(other);
}
template <typename OtherExecutor>
KeepAlive& operator=(KeepAlive<OtherExecutor>&& other) {
executor_ = std::move(other.executor_);
return *this;
}
void reset() { void reset() {
executor_.reset(); executor_.reset();
...@@ -59,38 +86,47 @@ class Executor { ...@@ -59,38 +86,47 @@ class Executor {
return executor_ != nullptr; return executor_ != nullptr;
} }
Executor* get() const { ExecutorT* get() const {
return executor_.get(); return executor_.get();
} }
ExecutorT& operator*() const {
return *get();
}
ExecutorT* operator->() const {
return get();
}
private: private:
friend class Executor; friend class Executor;
KeepAlive(folly::Executor* executor, bool dummy) template <typename OtherExecutor>
: executor_(executor, Deleter(dummy)) {} friend class KeepAlive;
struct Deleter { KeepAlive(ExecutorT* executor, bool dummy)
explicit Deleter(bool dummy) : dummy_(dummy) {} : executor_(executor, KeepAliveDeleter(dummy)) {}
void operator()(folly::Executor* executor) {
if (dummy_) {
return;
}
executor->keepAliveRelease();
}
private: std::unique_ptr<ExecutorT, KeepAliveDeleter> executor_;
bool dummy_;
};
std::unique_ptr<folly::Executor, Deleter> executor_;
}; };
/// Returns a keep-alive token which guarantees that Executor will keep template <typename ExecutorT>
/// processing tasks until the token is released (if supported by Executor). static KeepAlive<ExecutorT> getKeepAliveToken(ExecutorT* executor) {
/// KeepAlive always contains a valid pointer to an Executor. static_assert(
KeepAlive getKeepAliveToken() { std::is_base_of<Executor, ExecutorT>::value,
if (keepAliveAcquire()) { "getKeepAliveToken only works for folly::Executor implementations.");
return makeKeepAlive(); folly::Executor* executorPtr = executor;
if (executorPtr->keepAliveAcquire()) {
return makeKeepAlive<ExecutorT>(executor);
} }
return KeepAlive{this, true}; return makeKeepAliveDummy<ExecutorT>(executor);
}
template <typename ExecutorT>
static KeepAlive<ExecutorT> getKeepAliveToken(ExecutorT& executor) {
static_assert(
std::is_base_of<Executor, ExecutorT>::value,
"getKeepAliveToken only works for folly::Executor implementations.");
return getKeepAliveToken(&executor);
} }
protected: protected:
...@@ -101,9 +137,41 @@ class Executor { ...@@ -101,9 +137,41 @@ class Executor {
// Will never be called if keepAliveAcquire() returns false. // Will never be called if keepAliveAcquire() returns false.
virtual void keepAliveRelease(); virtual void keepAliveRelease();
KeepAlive makeKeepAlive() { template <typename ExecutorT>
return KeepAlive{this, false}; static KeepAlive<ExecutorT> makeKeepAlive(ExecutorT* executor) {
static_assert(
std::is_base_of<Executor, ExecutorT>::value,
"makeKeepAlive only works for folly::Executor implementations.");
return KeepAlive<ExecutorT>{executor, false};
}
private:
template <typename ExecutorT>
static KeepAlive<ExecutorT> makeKeepAliveDummy(ExecutorT* executor) {
static_assert(
std::is_base_of<Executor, ExecutorT>::value,
"makeKeepAliveDummy only works for folly::Executor implementations.");
return KeepAlive<ExecutorT>{executor, true};
} }
}; };
/// Returns a keep-alive token which guarantees that Executor will keep
/// processing tasks until the token is released (if supported by Executor).
/// KeepAlive always contains a valid pointer to an Executor.
template <typename ExecutorT>
Executor::KeepAlive<ExecutorT> getKeepAliveToken(ExecutorT* executor) {
static_assert(
std::is_base_of<Executor, ExecutorT>::value,
"getKeepAliveToken only works for folly::Executor implementations.");
return Executor::getKeepAliveToken(executor);
}
template <typename ExecutorT>
Executor::KeepAlive<ExecutorT> getKeepAliveToken(ExecutorT& executor) {
static_assert(
std::is_base_of<Executor, ExecutorT>::value,
"getKeepAliveToken only works for folly::Executor implementations.");
return getKeepAliveToken(&executor);
}
} // namespace folly } // namespace folly
...@@ -636,7 +636,7 @@ void keepAliveTest() { ...@@ -636,7 +636,7 @@ void keepAliveTest() {
auto f = auto f =
futures::sleep(std::chrono::milliseconds{100}) futures::sleep(std::chrono::milliseconds{100})
.via(executor.get()) .via(executor.get())
.then([keepAlive = executor->getKeepAliveToken()] { return 42; }); .then([keepAlive = getKeepAliveToken(executor.get())] { return 42; });
executor.reset(); executor.reset();
......
...@@ -56,7 +56,7 @@ inline void EventBaseLoopController::schedule() { ...@@ -56,7 +56,7 @@ inline void EventBaseLoopController::schedule() {
// Schedule it to run in current iteration. // Schedule it to run in current iteration.
if (!eventBaseKeepAlive_) { if (!eventBaseKeepAlive_) {
eventBaseKeepAlive_ = eventBase_->getKeepAliveToken(); eventBaseKeepAlive_ = getKeepAliveToken(eventBase_);
} }
eventBase_->getEventBase().runInLoop(&callback_, true); eventBase_->getEventBase().runInLoop(&callback_, true);
awaitingScheduling_ = false; awaitingScheduling_ = false;
...@@ -70,7 +70,7 @@ inline void EventBaseLoopController::runLoop() { ...@@ -70,7 +70,7 @@ inline void EventBaseLoopController::runLoop() {
if (!fm_->hasTasks()) { if (!fm_->hasTasks()) {
return; return;
} }
eventBaseKeepAlive_ = eventBase_->getKeepAliveToken(); eventBaseKeepAlive_ = getKeepAliveToken(eventBase_);
} }
if (loopRunner_) { if (loopRunner_) {
if (fm_->hasReadyTasks()) { if (fm_->hasReadyTasks()) {
......
...@@ -59,7 +59,7 @@ class EventBaseLoopController : public LoopController { ...@@ -59,7 +59,7 @@ class EventBaseLoopController : public LoopController {
bool awaitingScheduling_{false}; bool awaitingScheduling_{false};
VirtualEventBase* eventBase_{nullptr}; VirtualEventBase* eventBase_{nullptr};
Executor::KeepAlive eventBaseKeepAlive_; Executor::KeepAlive<VirtualEventBase> eventBaseKeepAlive_;
ControllerCallback callback_; ControllerCallback callback_;
FiberManager* fm_{nullptr}; FiberManager* fm_{nullptr};
std::atomic<bool> eventBaseAttached_{false}; std::atomic<bool> eventBaseAttached_{false};
......
...@@ -1736,7 +1736,7 @@ void waitViaImpl( ...@@ -1736,7 +1736,7 @@ void waitViaImpl(
} }
// Chain operations, ensuring that the executor is kept alive for the duration // Chain operations, ensuring that the executor is kept alive for the duration
f = std::move(f).via(e).then( f = std::move(f).via(e).then(
[keepAlive = e->getKeepAliveToken()](T&& t) { return std::move(t); }); [keepAlive = getKeepAliveToken(e)](T&& t) { return std::move(t); });
auto now = std::chrono::steady_clock::now(); auto now = std::chrono::steady_clock::now();
auto deadline = now + timeout; auto deadline = now + timeout;
while (!f.isReady() && (now < deadline)) { while (!f.isReady() && (now < deadline)) {
......
...@@ -17,29 +17,34 @@ ...@@ -17,29 +17,34 @@
namespace folly { namespace folly {
VirtualEventBase::VirtualEventBase(EventBase& evb) : evb_(evb) { VirtualEventBase::VirtualEventBase(EventBase& evb)
evbLoopKeepAlive_ = evb_.getKeepAliveToken(); : evb_(getKeepAliveToken(evb)) {}
}
std::future<void> VirtualEventBase::destroy() { std::future<void> VirtualEventBase::destroy() {
CHECK(evb_.runInEventBaseThread([this] { loopKeepAlive_.reset(); })); CHECK(evb_->runInEventBaseThread([this] { loopKeepAlive_.reset(); }));
return std::move(destroyFuture_); return std::move(destroyFuture_);
} }
void VirtualEventBase::destroyImpl() { void VirtualEventBase::destroyImpl() {
// Make sure we release EventBase KeepAlive token even if exception occurs
auto evbLoopKeepAlive = std::move(evbLoopKeepAlive_);
try { try {
clearCobTimeouts(); {
// After destroyPromise_ is posted this object may be destroyed, so make
// sure we release EventBase's keep-alive token before that.
SCOPE_EXIT {
evb_.reset();
};
clearCobTimeouts();
onDestructionCallbacks_.withWLock([&](LoopCallbackList& callbacks) { onDestructionCallbacks_.withWLock([&](LoopCallbackList& callbacks) {
while (!callbacks.empty()) { while (!callbacks.empty()) {
auto& callback = callbacks.front(); auto& callback = callbacks.front();
callbacks.pop_front(); callbacks.pop_front();
callback.runLoopCallback(); callback.runLoopCallback();
} }
}); });
}
destroyPromise_.set_value(); destroyPromise_.set_value();
} catch (...) { } catch (...) {
...@@ -51,7 +56,7 @@ VirtualEventBase::~VirtualEventBase() { ...@@ -51,7 +56,7 @@ VirtualEventBase::~VirtualEventBase() {
if (!destroyFuture_.valid()) { if (!destroyFuture_.valid()) {
return; return;
} }
CHECK(!evb_.inRunningEventBaseThread()); CHECK(!evb_->inRunningEventBaseThread());
destroy().get(); destroy().get();
} }
......
...@@ -46,7 +46,7 @@ class VirtualEventBase : public folly::Executor, public folly::TimeoutManager { ...@@ -46,7 +46,7 @@ class VirtualEventBase : public folly::Executor, public folly::TimeoutManager {
~VirtualEventBase() override; ~VirtualEventBase() override;
EventBase& getEventBase() { EventBase& getEventBase() {
return evb_; return *evb_;
} }
/** /**
...@@ -72,41 +72,40 @@ class VirtualEventBase : public folly::Executor, public folly::TimeoutManager { ...@@ -72,41 +72,40 @@ class VirtualEventBase : public folly::Executor, public folly::TimeoutManager {
// KeepAlive token has to be released in the EventBase thread. If // KeepAlive token has to be released in the EventBase thread. If
// runInEventBaseThread() fails, we can't extract the KeepAlive token // runInEventBaseThread() fails, we can't extract the KeepAlive token
// from the callback to properly release it. // from the callback to properly release it.
CHECK(evb_.runInEventBaseThread([ CHECK(evb_->runInEventBaseThread(
keepAliveToken = getKeepAliveToken(), [keepAliveToken = getKeepAliveToken(this),
f = std::forward<F>(f) f = std::forward<F>(f)]() mutable { f(); }));
]() mutable { f(); }));
} }
HHWheelTimer& timer() { HHWheelTimer& timer() {
return evb_.timer(); return evb_->timer();
} }
void attachTimeoutManager( void attachTimeoutManager(
AsyncTimeout* obj, AsyncTimeout* obj,
TimeoutManager::InternalEnum internal) override { TimeoutManager::InternalEnum internal) override {
evb_.attachTimeoutManager(obj, internal); evb_->attachTimeoutManager(obj, internal);
} }
void detachTimeoutManager(AsyncTimeout* obj) override { void detachTimeoutManager(AsyncTimeout* obj) override {
evb_.detachTimeoutManager(obj); evb_->detachTimeoutManager(obj);
} }
bool scheduleTimeout(AsyncTimeout* obj, TimeoutManager::timeout_type timeout) bool scheduleTimeout(AsyncTimeout* obj, TimeoutManager::timeout_type timeout)
override { override {
return evb_.scheduleTimeout(obj, timeout); return evb_->scheduleTimeout(obj, timeout);
} }
void cancelTimeout(AsyncTimeout* obj) override { void cancelTimeout(AsyncTimeout* obj) override {
evb_.cancelTimeout(obj); evb_->cancelTimeout(obj);
} }
void bumpHandlingTime() override { void bumpHandlingTime() override {
evb_.bumpHandlingTime(); evb_->bumpHandlingTime();
} }
bool isInTimeoutManagerThread() override { bool isInTimeoutManagerThread() override {
return evb_.isInTimeoutManagerThread(); return evb_->isInTimeoutManagerThread();
} }
/** /**
...@@ -117,14 +116,14 @@ class VirtualEventBase : public folly::Executor, public folly::TimeoutManager { ...@@ -117,14 +116,14 @@ class VirtualEventBase : public folly::Executor, public folly::TimeoutManager {
} }
bool inRunningEventBaseThread() const { bool inRunningEventBaseThread() const {
return evb_.inRunningEventBaseThread(); return evb_->inRunningEventBaseThread();
} }
protected: protected:
bool keepAliveAcquire() override { bool keepAliveAcquire() override {
DCHECK(loopKeepAliveCount_ + loopKeepAliveCountAtomic_.load() > 0); DCHECK(loopKeepAliveCount_ + loopKeepAliveCountAtomic_.load() > 0);
if (evb_.inRunningEventBaseThread()) { if (evb_->inRunningEventBaseThread()) {
++loopKeepAliveCount_; ++loopKeepAliveCount_;
} else { } else {
++loopKeepAliveCountAtomic_; ++loopKeepAliveCountAtomic_;
...@@ -133,8 +132,8 @@ class VirtualEventBase : public folly::Executor, public folly::TimeoutManager { ...@@ -133,8 +132,8 @@ class VirtualEventBase : public folly::Executor, public folly::TimeoutManager {
} }
void keepAliveRelease() override { void keepAliveRelease() override {
if (!evb_.inRunningEventBaseThread()) { if (!evb_->inRunningEventBaseThread()) {
return evb_.add([=] { keepAliveRelease(); }); return evb_->add([=] { keepAliveRelease(); });
} }
if (loopKeepAliveCountAtomic_.load()) { if (loopKeepAliveCountAtomic_.load()) {
loopKeepAliveCount_ += loopKeepAliveCountAtomic_.exchange(0); loopKeepAliveCount_ += loopKeepAliveCountAtomic_.exchange(0);
...@@ -160,15 +159,14 @@ class VirtualEventBase : public folly::Executor, public folly::TimeoutManager { ...@@ -160,15 +159,14 @@ class VirtualEventBase : public folly::Executor, public folly::TimeoutManager {
using LoopCallbackList = EventBase::LoopCallback::List; using LoopCallbackList = EventBase::LoopCallback::List;
EventBase& evb_; KeepAlive<EventBase> evb_;
ssize_t loopKeepAliveCount_{1}; ssize_t loopKeepAliveCount_{1};
std::atomic<ssize_t> loopKeepAliveCountAtomic_{0}; std::atomic<ssize_t> loopKeepAliveCountAtomic_{0};
std::promise<void> destroyPromise_; std::promise<void> destroyPromise_;
std::future<void> destroyFuture_{destroyPromise_.get_future()}; std::future<void> destroyFuture_{destroyPromise_.get_future()};
KeepAlive loopKeepAlive_{makeKeepAlive()}; KeepAlive<VirtualEventBase> loopKeepAlive_{
makeKeepAlive<VirtualEventBase>(this)};
KeepAlive evbLoopKeepAlive_;
folly::Synchronized<LoopCallbackList> onDestructionCallbacks_; folly::Synchronized<LoopCallbackList> onDestructionCallbacks_;
}; };
......
...@@ -1805,11 +1805,11 @@ TEST(EventBaseTest, LoopKeepAlive) { ...@@ -1805,11 +1805,11 @@ TEST(EventBaseTest, LoopKeepAlive) {
EventBase evb; EventBase evb;
bool done = false; bool done = false;
std::thread t([&, loopKeepAlive = evb.getKeepAliveToken() ]() mutable { std::thread t([&, loopKeepAlive = getKeepAliveToken(evb)]() mutable {
/* sleep override */ std::this_thread::sleep_for( /* sleep override */ std::this_thread::sleep_for(
std::chrono::milliseconds(100)); std::chrono::milliseconds(100));
evb.runInEventBaseThread( evb.runInEventBaseThread(
[&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; }); [&done, loopKeepAlive = std::move(loopKeepAlive)] { done = true; });
}); });
evb.loop(); evb.loop();
...@@ -1826,11 +1826,11 @@ TEST(EventBaseTest, LoopKeepAliveInLoop) { ...@@ -1826,11 +1826,11 @@ TEST(EventBaseTest, LoopKeepAliveInLoop) {
std::thread t; std::thread t;
evb.runInEventBaseThread([&] { evb.runInEventBaseThread([&] {
t = std::thread([&, loopKeepAlive = evb.getKeepAliveToken() ]() mutable { t = std::thread([&, loopKeepAlive = getKeepAliveToken(evb)]() mutable {
/* sleep override */ std::this_thread::sleep_for( /* sleep override */ std::this_thread::sleep_for(
std::chrono::milliseconds(100)); std::chrono::milliseconds(100));
evb.runInEventBaseThread( evb.runInEventBaseThread(
[&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; }); [&done, loopKeepAlive = std::move(loopKeepAlive)] { done = true; });
}); });
}); });
...@@ -1854,9 +1854,9 @@ TEST(EventBaseTest, LoopKeepAliveWithLoopForever) { ...@@ -1854,9 +1854,9 @@ TEST(EventBaseTest, LoopKeepAliveWithLoopForever) {
{ {
auto* ev = evb.get(); auto* ev = evb.get();
Executor::KeepAlive keepAlive; Executor::KeepAlive<EventBase> keepAlive;
ev->runInEventBaseThreadAndWait( ev->runInEventBaseThreadAndWait(
[&ev, &keepAlive] { keepAlive = ev->getKeepAliveToken(); }); [&ev, &keepAlive] { keepAlive = getKeepAliveToken(ev); });
ASSERT_FALSE(done) << "Loop finished before we asked it to"; ASSERT_FALSE(done) << "Loop finished before we asked it to";
ev->terminateLoopSoon(); ev->terminateLoopSoon();
/* sleep override */ /* sleep override */
...@@ -1874,11 +1874,9 @@ TEST(EventBaseTest, LoopKeepAliveShutdown) { ...@@ -1874,11 +1874,9 @@ TEST(EventBaseTest, LoopKeepAliveShutdown) {
bool done = false; bool done = false;
std::thread t([ std::thread t([&done,
&done, loopKeepAlive = getKeepAliveToken(evb.get()),
loopKeepAlive = evb->getKeepAliveToken(), evbPtr = evb.get()]() mutable {
evbPtr = evb.get()
]() mutable {
/* sleep override */ std::this_thread::sleep_for( /* sleep override */ std::this_thread::sleep_for(
std::chrono::milliseconds(100)); std::chrono::milliseconds(100));
evbPtr->runInEventBaseThread( evbPtr->runInEventBaseThread(
...@@ -1907,10 +1905,10 @@ TEST(EventBaseTest, LoopKeepAliveAtomic) { ...@@ -1907,10 +1905,10 @@ TEST(EventBaseTest, LoopKeepAliveAtomic) {
} }
for (size_t i = 0; i < kNumThreads; ++i) { for (size_t i = 0; i < kNumThreads; ++i) {
ts.emplace_back([ evbPtr = evb.get(), batonPtr = batons[i].get(), &done ] { ts.emplace_back([evbPtr = evb.get(), batonPtr = batons[i].get(), &done] {
std::vector<Executor::KeepAlive> keepAlives; std::vector<Executor::KeepAlive<EventBase>> keepAlives;
for (size_t j = 0; j < kNumTasks; ++j) { for (size_t j = 0; j < kNumTasks; ++j) {
keepAlives.emplace_back(evbPtr->getKeepAliveToken()); keepAlives.emplace_back(getKeepAliveToken(evbPtr));
} }
batonPtr->post(); batonPtr->post();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment