Commit 08a2ff01 authored by Sven Over's avatar Sven Over Committed by Facebook Github Bot

fix dead-lock in Future when executor discards function

Summary:
This diff adds two tests to futures/test/ViaTest.cpp:
viaDummyExecutorFutureSetValueFirst and
viaDummyExecutorFutureSetCallbackFirst. The latter resulted in a
dead-lock before the fix contained in this diff.

It is important that the callback function is destroyed after
it is executed, since it may capture objects (like a Promise)
that should be destroyed (so that e.g. a corresponding Future
throws BrokenPromise). When the callback is executed via an
executor, it is possible that the executor doesn't get around to
executing the task. We shouldn't rely on the task being executed
to do necessary clean-up. That clean-up should (also) happen
when the task (with its captured data) is destroyed (in the
spirit of RIAA).

Reviewed By: djwatson

Differential Revision: D4779215

fbshipit-source-id: d029cf8b8f7b55e1b03357749c5fb62d95986ca7
parent 2a60bc19
...@@ -282,46 +282,27 @@ class Core final { ...@@ -282,46 +282,27 @@ class Core final {
} }
private: private:
class CountedReference { // Helper class that stores a pointer to the `Core` object and calls
// `derefCallback` and `detachOne` in the destructor.
class CoreAndCallbackReference {
public: public:
~CountedReference() { explicit CoreAndCallbackReference(Core* core) noexcept : core_(core) {}
if (core_) {
core_->detachOne();
core_ = nullptr;
}
}
explicit CountedReference(Core* core) noexcept : core_(core) {
// do not construct a CountedReference from nullptr!
DCHECK(core);
++core_->attached_; ~CoreAndCallbackReference() {
}
// CountedReference must be copy-constructable as long as
// folly::Executor::add takes a std::function
CountedReference(CountedReference const& o) noexcept : core_(o.core_) {
if (core_) { if (core_) {
++core_->attached_; core_->derefCallback();
core_->detachOne();
} }
} }
CountedReference& operator=(CountedReference const& o) noexcept { CoreAndCallbackReference(CoreAndCallbackReference const& o) = delete;
~CountedReference(); CoreAndCallbackReference& operator=(CoreAndCallbackReference const& o) =
new (this) CountedReference(o); delete;
return *this;
}
CountedReference(CountedReference&& o) noexcept { CoreAndCallbackReference(CoreAndCallbackReference&& o) noexcept {
std::swap(core_, o.core_); std::swap(core_, o.core_);
} }
CountedReference& operator=(CountedReference&& o) noexcept {
~CountedReference();
new (this) CountedReference(std::move(o));
return *this;
}
Core* getCore() const noexcept { Core* getCore() const noexcept {
return core_; return core_;
} }
...@@ -357,23 +338,36 @@ class Core final { ...@@ -357,23 +338,36 @@ class Core final {
if (x) { if (x) {
exception_wrapper ew; exception_wrapper ew;
// We need to reset `callback_` after it was executed (which can happen
// through the executor or, if `Executor::add` throws, below). The
// executor might discard the function without executing it (now or
// later), in which case `callback_` also needs to be reset.
// The `Core` has to be kept alive throughout that time, too. Hence we
// increment `attached_` and `callbackReferences_` by two, and construct
// exactly two `CoreAndCallbackReference` objects, which call
// `derefCallback` and `detachOne` in their destructor. One will guard
// this scope, the other one will guard the lambda passed to the executor.
attached_ += 2;
callbackReferences_ += 2;
CoreAndCallbackReference guard_local_scope(this);
CoreAndCallbackReference guard_lambda(this);
try { try {
if (LIKELY(x->getNumPriorities() == 1)) { if (LIKELY(x->getNumPriorities() == 1)) {
x->add([core_ref = CountedReference(this)]() mutable { x->add([core_ref = std::move(guard_lambda)]() mutable {
auto cr = std::move(core_ref); auto cr = std::move(core_ref);
Core* const core = cr.getCore(); Core* const core = cr.getCore();
RequestContextScopeGuard rctx(core->context_); RequestContextScopeGuard rctx(core->context_);
SCOPE_EXIT { core->callback_ = {}; };
core->callback_(std::move(*core->result_)); core->callback_(std::move(*core->result_));
}); });
} else { } else {
x->addWithPriority([core_ref = CountedReference(this)]() mutable { x->addWithPriority(
auto cr = std::move(core_ref); [core_ref = std::move(guard_lambda)]() mutable {
Core* const core = cr.getCore(); auto cr = std::move(core_ref);
RequestContextScopeGuard rctx(core->context_); Core* const core = cr.getCore();
SCOPE_EXIT { core->callback_ = {}; }; RequestContextScopeGuard rctx(core->context_);
core->callback_(std::move(*core->result_)); core->callback_(std::move(*core->result_));
}, priority); },
priority);
} }
} catch (const std::exception& e) { } catch (const std::exception& e) {
ew = exception_wrapper(std::current_exception(), e); ew = exception_wrapper(std::current_exception(), e);
...@@ -381,16 +375,17 @@ class Core final { ...@@ -381,16 +375,17 @@ class Core final {
ew = exception_wrapper(std::current_exception()); ew = exception_wrapper(std::current_exception());
} }
if (ew) { if (ew) {
CountedReference core_ref(this);
RequestContextScopeGuard rctx(context_); RequestContextScopeGuard rctx(context_);
result_ = Try<T>(std::move(ew)); result_ = Try<T>(std::move(ew));
SCOPE_EXIT { callback_ = {}; };
callback_(std::move(*result_)); callback_(std::move(*result_));
} }
} else { } else {
CountedReference core_ref(this); attached_++;
SCOPE_EXIT {
callback_ = {};
detachOne();
};
RequestContextScopeGuard rctx(context_); RequestContextScopeGuard rctx(context_);
SCOPE_EXIT { callback_ = {}; };
callback_(std::move(*result_)); callback_(std::move(*result_));
} }
} }
...@@ -403,6 +398,11 @@ class Core final { ...@@ -403,6 +398,11 @@ class Core final {
} }
} }
void derefCallback() {
if (--callbackReferences_ == 0) {
callback_ = {};
}
}
folly::Function<void(Try<T>&&)> callback_; folly::Function<void(Try<T>&&)> callback_;
// place result_ next to increase the likelihood that the value will be // place result_ next to increase the likelihood that the value will be
...@@ -410,6 +410,7 @@ class Core final { ...@@ -410,6 +410,7 @@ class Core final {
folly::Optional<Try<T>> result_; folly::Optional<Try<T>> result_;
FSM<State> fsm_; FSM<State> fsm_;
std::atomic<unsigned char> attached_; std::atomic<unsigned char> attached_;
std::atomic<unsigned char> callbackReferences_{0};
std::atomic<bool> active_ {true}; std::atomic<bool> active_ {true};
std::atomic<bool> interruptHandlerSet_ {false}; std::atomic<bool> interruptHandlerSet_ {false};
folly::MicroSpinLock interruptLock_ {0}; folly::MicroSpinLock interruptLock_ {0};
......
...@@ -472,6 +472,22 @@ TEST(Via, viaRaces) { ...@@ -472,6 +472,22 @@ TEST(Via, viaRaces) {
t2.join(); t2.join();
} }
TEST(Via, viaDummyExecutorFutureSetValueFirst) {
DummyDrivableExecutor x;
auto future = makeFuture().via(&x).then([] { return 42; });
EXPECT_THROW(future.get(), BrokenPromise);
}
TEST(Via, viaDummyExecutorFutureSetCallbackFirst) {
DummyDrivableExecutor x;
Promise<Unit> trigger;
auto future = trigger.getFuture().via(&x).then([] { return 42; });
trigger.setValue();
EXPECT_THROW(future.get(), BrokenPromise);
}
TEST(ViaFunc, liftsVoid) { TEST(ViaFunc, liftsVoid) {
ManualExecutor x; ManualExecutor x;
int count = 0; int count = 0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment