Commit 2825daef authored by Andrii Grynenko's avatar Andrii Grynenko Committed by Facebook Github Bot

Implement weak reference for DefaultKeepAliveExecutor

Summary: Weak reference can be used if it's ok for some tasks to be dropped when Executor is destroyed.

Reviewed By: yfeldblum

Differential Revision: D8309958

fbshipit-source-id: 8ecc86a8be181a9eb695aa12305b2a30b6f4b60e
parent bba17c09
...@@ -35,6 +35,10 @@ class DefaultKeepAliveExecutor : public virtual Executor { ...@@ -35,6 +35,10 @@ class DefaultKeepAliveExecutor : public virtual Executor {
DCHECK(!keepAlive_); DCHECK(!keepAlive_);
} }
folly::Executor::KeepAlive<> weakRef() {
return WeakRef::create(controlBlock_, this);
}
protected: protected:
void joinKeepAlive() { void joinKeepAlive() {
DCHECK(keepAlive_); DCHECK(keepAlive_);
...@@ -43,24 +47,101 @@ class DefaultKeepAliveExecutor : public virtual Executor { ...@@ -43,24 +47,101 @@ class DefaultKeepAliveExecutor : public virtual Executor {
} }
private: private:
struct ControlBlock {
std::atomic<ssize_t> keepAliveCount_{1};
};
class WeakRef : public Executor {
public:
static folly::Executor::KeepAlive<> create(
std::shared_ptr<ControlBlock> controlBlock,
Executor* executor) {
return makeKeepAlive(new WeakRef(std::move(controlBlock), executor));
}
void add(Func f) override {
if (auto executor = lock()) {
executor->add(std::move(f));
}
}
void addWithPriority(Func f, int8_t priority) override {
if (auto executor = lock()) {
executor->addWithPriority(std::move(f), priority);
}
}
virtual uint8_t getNumPriorities() const override {
return numPriorities_;
}
private:
WeakRef(std::shared_ptr<ControlBlock> controlBlock, Executor* executor)
: controlBlock_(std::move(controlBlock)),
executor_(executor),
numPriorities_(executor->getNumPriorities()) {}
bool keepAliveAcquire() override {
auto keepAliveCount =
keepAliveCount_.fetch_add(1, std::memory_order_relaxed);
// We should never increment from 0
DCHECK(keepAliveCount > 0);
return true;
}
void keepAliveRelease() override {
auto keepAliveCount =
keepAliveCount_.fetch_sub(1, std::memory_order_acq_rel);
DCHECK(keepAliveCount >= 1);
if (keepAliveCount == 1) {
delete this;
}
}
folly::Executor::KeepAlive<> lock() {
auto controlBlock =
controlBlock_->keepAliveCount_.load(std::memory_order_relaxed);
do {
if (controlBlock == 0) {
return {};
}
} while (!controlBlock_->keepAliveCount_.compare_exchange_weak(
controlBlock,
controlBlock + 1,
std::memory_order_release,
std::memory_order_relaxed));
return makeKeepAlive(executor_);
}
std::atomic<size_t> keepAliveCount_{1};
std::shared_ptr<ControlBlock> controlBlock_;
Executor* executor_;
uint8_t numPriorities_;
};
bool keepAliveAcquire() override { bool keepAliveAcquire() override {
auto keepAliveCounter = auto keepAliveCount =
keepAliveCounter_.fetch_add(1, std::memory_order_relaxed); controlBlock_->keepAliveCount_.fetch_add(1, std::memory_order_relaxed);
// We should never increment from 0 // We should never increment from 0
DCHECK(keepAliveCounter > 0); DCHECK(keepAliveCount > 0);
return true; return true;
} }
void keepAliveRelease() override { void keepAliveRelease() override {
auto keepAliveCounter = --keepAliveCounter_; auto keepAliveCount =
DCHECK(keepAliveCounter >= 0); controlBlock_->keepAliveCount_.fetch_sub(1, std::memory_order_acquire);
DCHECK(keepAliveCount >= 1);
if (keepAliveCounter == 0) { if (keepAliveCount == 1) {
keepAliveReleaseBaton_.post(); keepAliveReleaseBaton_.post(); // std::memory_order_release
} }
} }
std::atomic<ssize_t> keepAliveCounter_{1}; std::shared_ptr<ControlBlock> controlBlock_{std::make_shared<ControlBlock>()};
Baton<> keepAliveReleaseBaton_; Baton<> keepAliveReleaseBaton_;
KeepAlive<DefaultKeepAliveExecutor> keepAlive_{ KeepAlive<DefaultKeepAliveExecutor> keepAlive_{
makeKeepAlive<DefaultKeepAliveExecutor>(this)}; makeKeepAlive<DefaultKeepAliveExecutor>(this)};
......
...@@ -730,3 +730,31 @@ TEST(ThreadPoolExecutorTest, AddPerf) { ...@@ -730,3 +730,31 @@ TEST(ThreadPoolExecutorTest, AddPerf) {
} }
e.stop(); e.stop();
} }
template <typename TPE>
static void WeakRefTest() {
// test that adding a .then() after we have
// started shutting down does not deadlock
folly::Optional<folly::Future<folly::Unit>> f;
int counter{0};
{
TPE fe(1);
f = folly::makeFuture()
.via(&fe)
.then([]() { burnMs(100)(); })
.then([&] { ++counter; })
.via(fe.weakRef())
.then([]() { burnMs(100)(); })
.then([&] { ++counter; });
}
EXPECT_THROW(f->get(), folly::BrokenPromise);
EXPECT_EQ(1, counter);
}
TEST(ThreadPoolExecutorTest, WeakRefTestIO) {
WeakRefTest<IOThreadPoolExecutor>();
}
TEST(ThreadPoolExecutorTest, WeakRefTestCPU) {
WeakRefTest<CPUThreadPoolExecutor>();
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment