Commit 64b6d1b6 authored by Matthieu Martin's avatar Matthieu Martin Committed by Facebook Github Bot

Implement KeepAlive in AsyncioLoopController

Summary:
I'm continue to try keeping the implementation simple, instead of copy-pasting EventBase/EventBaseLoopController code.
My understanding is that AsyncioExecutor/AsyncioLoopController will always be single threaded because they are tied to a unique asyncio event_loop, so I don't need an atomic counter.
Then it should be cheap to just always aquire/release the counter in runLoop.

Reviewed By: andriigrynenko

Differential Revision: D7875202

fbshipit-source-id: 900a03e6870c5e4828521a99785cfa976baac54a
parent 7672324a
...@@ -92,15 +92,16 @@ inline void EventBaseLoopController::scheduleThreadSafe() { ...@@ -92,15 +92,16 @@ inline void EventBaseLoopController::scheduleThreadSafe() {
3) We fulfill the promise from the other thread. */ 3) We fulfill the promise from the other thread. */
assert(eventBaseAttached_); assert(eventBaseAttached_);
eventBase_->runInEventBaseThread([this]() { eventBase_->runInEventBaseThread(
if (fm_->shouldRunLoopRemote()) { [this, eventBaseKeepAlive = getKeepAliveToken(eventBase_)]() {
return runLoop(); if (fm_->shouldRunLoopRemote()) {
} return runLoop();
}
if (!fm_->hasTasks()) {
eventBaseKeepAlive_.reset(); if (!fm_->hasTasks()) {
} eventBaseKeepAlive_.reset();
}); }
});
} }
inline void EventBaseLoopController::timedSchedule( inline void EventBaseLoopController::timedSchedule(
......
...@@ -28,6 +28,13 @@ class AsyncioExecutor : public DrivableExecutor, public SequencedExecutor { ...@@ -28,6 +28,13 @@ class AsyncioExecutor : public DrivableExecutor, public SequencedExecutor {
public: public:
using Func = folly::Func; using Func = folly::Func;
~AsyncioExecutor() override {
keepAliveRelease();
while (keepAliveCounter_ > 0) {
drive();
}
}
void add(Func func) override { void add(Func func) override {
queue_.putMessage(std::move(func)); queue_.putMessage(std::move(func));
} }
...@@ -48,9 +55,24 @@ class AsyncioExecutor : public DrivableExecutor, public SequencedExecutor { ...@@ -48,9 +55,24 @@ class AsyncioExecutor : public DrivableExecutor, public SequencedExecutor {
}); });
} }
protected:
bool keepAliveAcquire() override {
auto keepAliveCounter =
keepAliveCounter_.fetch_add(1, std::memory_order_relaxed);
// We should never increment from 0
DCHECK(keepAliveCounter > 0);
return true;
}
void keepAliveRelease() override {
auto keepAliveCounter = --keepAliveCounter_;
DCHECK(keepAliveCounter >= 0);
}
private: private:
folly::NotificationQueue<Func> queue_; folly::NotificationQueue<Func> queue_;
folly::NotificationQueue<Func>::SimpleConsumer consumer_{queue_}; folly::NotificationQueue<Func>::SimpleConsumer consumer_{queue_};
std::atomic<size_t> keepAliveCounter_{1};
}; // AsyncioExecutor }; // AsyncioExecutor
} // namespace python } // namespace python
......
...@@ -30,21 +30,32 @@ inline void AsyncioLoopController::setFiberManager(fibers::FiberManager* fm) { ...@@ -30,21 +30,32 @@ inline void AsyncioLoopController::setFiberManager(fibers::FiberManager* fm) {
inline void AsyncioLoopController::schedule() { inline void AsyncioLoopController::schedule() {
// add() is thread-safe, so this isn't properly optimized for addTask() // add() is thread-safe, so this isn't properly optimized for addTask()
if (!executorKeepAlive_) {
executorKeepAlive_ = getKeepAliveToken(executor_);
}
executor_->add([this]() { return runLoop(); }); executor_->add([this]() { return runLoop(); });
} }
inline void AsyncioLoopController::runLoop() { inline void AsyncioLoopController::runLoop() {
if (fm_->hasTasks()) { if (!executorKeepAlive_) {
fm_->loopUntilNoReadyImpl(); if (!fm_->hasTasks()) {
return;
}
executorKeepAlive_ = getKeepAliveToken(executor_);
}
fm_->loopUntilNoReadyImpl();
if (!fm_->hasTasks()) {
executorKeepAlive_.reset();
} }
} }
inline void AsyncioLoopController::scheduleThreadSafe() { inline void AsyncioLoopController::scheduleThreadSafe() {
executor_->add([this]() { executor_->add(
if (fm_->shouldRunLoopRemote()) { [this, executorKeepAlive = getKeepAliveToken(executor_)]() mutable {
return runLoop(); if (fm_->shouldRunLoopRemote()) {
} return runLoop();
}); }
});
} }
inline void AsyncioLoopController::timedSchedule( inline void AsyncioLoopController::timedSchedule(
......
...@@ -31,8 +31,8 @@ class AsyncioLoopController : public fibers::LoopController { ...@@ -31,8 +31,8 @@ class AsyncioLoopController : public fibers::LoopController {
~AsyncioLoopController() override; ~AsyncioLoopController() override;
private: private:
// TODO: KeepAlive token to guarantee lifetime
AsyncioExecutor* executor_; AsyncioExecutor* executor_;
Executor::KeepAlive<AsyncioExecutor> executorKeepAlive_;
fibers::FiberManager* fm_{nullptr}; fibers::FiberManager* fm_{nullptr};
void setFiberManager(fibers::FiberManager* fm) override; void setFiberManager(fibers::FiberManager* fm) override;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment