Commit 9095cd53 authored by Sven Over's avatar Sven Over Committed by Facebook Github Bot 0

futures: fix behaviour when executors don't exec callback

Summary:
When future callbacks are to be executed by an executor (via `via`)
and the executor does not actually execute the callback function
(for whatever reason), then waiting for the final future (the one
returned by `via`) block forever. In case the callback function
that got passed to the executor gets destroyed without being executed,
the future should be set to a folly::BrokenPromise exception instead
of remaining unset forever.

This diff modifies the reference counting in folly::detail::Core
to make sure the reference held by the callback function is
properly removed not only after the callback gets executed, but
also when the callback is destroyed without having been executed.

Reviewed By: yfeldblum

Differential Revision: D3455931

fbshipit-source-id: debb6f3563384a658d1e0149a4aadbbcb268938c
parent 0cc6ea39
...@@ -73,7 +73,7 @@ enum class State : uint8_t { ...@@ -73,7 +73,7 @@ enum class State : uint8_t {
/// doesn't access a Future or Promise object from more than one thread at a /// doesn't access a Future or Promise object from more than one thread at a
/// time there won't be any problems. /// time there won't be any problems.
template<typename T> template<typename T>
class Core { class Core final {
static_assert(!std::is_void<T>::value, static_assert(!std::is_void<T>::value,
"void futures are not supported. Use Unit instead."); "void futures are not supported. Use Unit instead.");
public: public:
...@@ -300,7 +300,55 @@ class Core { ...@@ -300,7 +300,55 @@ class Core {
interruptHandler_ = std::move(fn); interruptHandler_ = std::move(fn);
} }
protected: private:
class CountedReference {
public:
~CountedReference() {
if (core_) {
core_->detachOne();
core_ = nullptr;
}
}
explicit CountedReference(Core* core) noexcept : core_(core) {
// do not construct a CountedReference from nullptr!
DCHECK(core);
++core_->attached_;
}
// CountedReference must be copy-constructable as long as
// folly::Executor::add takes a std::function
CountedReference(CountedReference const& o) noexcept : core_(o.core_) {
if (core_) {
++core_->attached_;
}
}
CountedReference& operator=(CountedReference const& o) noexcept {
~CountedReference();
new (this) CountedReference(o);
return *this;
}
CountedReference(CountedReference&& o) noexcept {
std::swap(core_, o.core_);
}
CountedReference& operator=(CountedReference&& o) noexcept {
~CountedReference();
new (this) CountedReference(std::move(o));
return *this;
}
Core* getCore() const noexcept {
return core_;
}
private:
Core* core_{nullptr};
};
void maybeCallback() { void maybeCallback() {
FSM_START(fsm_) FSM_START(fsm_)
case State::Armed: case State::Armed:
...@@ -326,35 +374,34 @@ class Core { ...@@ -326,35 +374,34 @@ class Core {
executorLock_.unlock(); executorLock_.unlock();
} }
// keep Core alive until callback did its thing
++attached_;
if (x) { if (x) {
try { try {
if (LIKELY(x->getNumPriorities() == 1)) { if (LIKELY(x->getNumPriorities() == 1)) {
x->add([this]() mutable { x->add([core_ref = CountedReference(this)]() mutable {
SCOPE_EXIT { detachOne(); }; auto cr = std::move(core_ref);
RequestContextScopeGuard rctx(context_); Core* const core = cr.getCore();
SCOPE_EXIT { callback_ = {}; }; RequestContextScopeGuard rctx(core->context_);
callback_(std::move(*result_)); SCOPE_EXIT { core->callback_ = {}; };
core->callback_(std::move(*core->result_));
}); });
} else { } else {
x->addWithPriority([this]() mutable { x->addWithPriority([core_ref = CountedReference(this)]() mutable {
SCOPE_EXIT { detachOne(); }; auto cr = std::move(core_ref);
RequestContextScopeGuard rctx(context_); Core* const core = cr.getCore();
SCOPE_EXIT { callback_ = {}; }; RequestContextScopeGuard rctx(core->context_);
callback_(std::move(*result_)); SCOPE_EXIT { core->callback_ = {}; };
core->callback_(std::move(*core->result_));
}, priority); }, priority);
} }
} catch (...) { } catch (...) {
--attached_; // Account for extra ++attached_ before try CountedReference core_ref(this);
RequestContextScopeGuard rctx(context_); RequestContextScopeGuard rctx(context_);
result_ = Try<T>(exception_wrapper(std::current_exception())); result_ = Try<T>(exception_wrapper(std::current_exception()));
SCOPE_EXIT { callback_ = {}; }; SCOPE_EXIT { callback_ = {}; };
callback_(std::move(*result_)); callback_(std::move(*result_));
} }
} else { } else {
SCOPE_EXIT { detachOne(); }; CountedReference core_ref(this);
RequestContextScopeGuard rctx(context_); RequestContextScopeGuard rctx(context_);
SCOPE_EXIT { callback_ = {}; }; SCOPE_EXIT { callback_ = {}; };
callback_(std::move(*result_)); callback_(std::move(*result_));
...@@ -362,10 +409,9 @@ class Core { ...@@ -362,10 +409,9 @@ class Core {
} }
void detachOne() { void detachOne() {
auto a = --attached_; auto a = attached_--;
assert(a >= 0); assert(a >= 1);
assert(a <= 2); if (a == 1) {
if (a == 0) {
delete this; delete this;
} }
} }
......
...@@ -210,3 +210,30 @@ TEST(Executor, CrappyExecutor) { ...@@ -210,3 +210,30 @@ TEST(Executor, CrappyExecutor) {
}); });
EXPECT_TRUE(flag); EXPECT_TRUE(flag);
} }
class DoNothingExecutor : public Executor {
public:
void add(Func f) override {
storedFunc_ = std::move(f);
}
private:
Func storedFunc_;
};
TEST(Executor, DoNothingExecutor) {
DoNothingExecutor x;
// Submit future callback to DoNothingExecutor
auto f = folly::via(&x).then([] { return 42; });
// Callback function is stored in DoNothingExecutor, but not executed.
EXPECT_FALSE(f.isReady());
// Destroy the function stored in DoNothingExecutor. The future callback
// will never get executed.
x.add({});
EXPECT_TRUE(f.isReady());
EXPECT_THROW(f.get(), folly::BrokenPromise);
}
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <folly/futures/Future.h> #include <folly/futures/Future.h>
#include <folly/futures/InlineExecutor.h>
using namespace folly; using namespace folly;
...@@ -27,12 +28,51 @@ TEST(SelfDestruct, then) { ...@@ -27,12 +28,51 @@ TEST(SelfDestruct, then) {
return x + 1; return x + 1;
}); });
p->setValue(123); p->setValue(123);
EXPECT_EQ(future.get(), 124); EXPECT_EQ(124, future.get());
} }
TEST(SelfDestruct, ensure) { TEST(SelfDestruct, ensure) {
auto* p = new Promise<int>(); auto* p = new Promise<int>();
auto future = p->getFuture().ensure([p] { delete p; }); auto future = p->getFuture().ensure([p] { delete p; });
p->setValue(123); p->setValue(123);
EXPECT_EQ(future.get(), 123); EXPECT_EQ(123, future.get());
}
class ThrowingExecutorError : public std::runtime_error {
public:
using std::runtime_error::runtime_error;
};
class ThrowingExecutor : public folly::Executor {
public:
void add(folly::Func) override {
throw ThrowingExecutorError("ThrowingExecutor::add");
}
};
TEST(SelfDestruct, throwingExecutor) {
ThrowingExecutor executor;
auto* p = new Promise<int>();
auto future =
p->getFuture().via(&executor).onError([p](ThrowingExecutorError const&) {
delete p;
return 456;
});
p->setValue(123);
EXPECT_EQ(456, future.get());
}
TEST(SelfDestruct, throwingInlineExecutor) {
folly::InlineExecutor executor;
auto* p = new Promise<int>();
auto future = p->getFuture()
.via(&executor)
.then([p]() -> int {
delete p;
throw ThrowingExecutorError("callback throws");
})
.onError([](ThrowingExecutorError const&) { return 456; });
p->setValue(123);
EXPECT_EQ(456, future.get());
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment