Commit 173044e4 authored by Hans Fugal's avatar Hans Fugal Committed by woo

Rework the Future::Core state machine

Summary:
There was a race reading `callback_` in `maybeCallback` and setting `callback_` in `setCallback`. This diff reworks the state machine to make this unpossible. To avoid the explosion of states due to the cross-product of has-interrupt-handler/has-been-interrupted/etc. I introduce a separate lock for setting interrupt handler and interruption, since this is primarily orthogonal. Other attributes (active, for example) are still atomic variables, and while somewhat tied into the state machine logically (e.g. transitioning from Armed to Done only happens when active) they are mostly independent, keeping the state machine simple (and probably faster).

I think it may even be possible to do some things cheaper. In some states, we may not need to protect the writing of `callback_` and `result_`. But we'd need to enforce some ordering so I'm not going to try to tackle that. But that could be some speedup if we can do it cheaply.

Test Plan:
Builds and all existing tests pass.

Reviewed By: rockyliu4@fb.com

Subscribers: yfeldblum, stepan, trunkagent, exa, folly-diffs@, jsedgwick

FB internal diff: D1807854

Tasks: 6087856

Signature: t1:1807854:1422656713:25b62706cd7952b2dde06dab08074f8030db456b
parent ba243592
......@@ -34,28 +34,52 @@
namespace folly { namespace detail {
// As of GCC 4.8.1, the std::function in libstdc++ optimizes only for pointers
// to functions, using a helper avoids a call to malloc.
template<typename T>
void empty_callback(Try<T>&&) { }
/*
OnlyCallback
/ \
Start Armed - Done
\ /
OnlyResult
This state machine is fairly self-explanatory. The most important bit is
that the callback is only executed on the transition from Armed to Done,
and that transition can happen immediately after transitioning from Only*
to Armed, if it is active (the usual case).
*/
enum class State {
Waiting,
Interruptible,
Interrupted,
Start,
OnlyResult,
OnlyCallback,
Armed,
Done,
};
/** The shared state object for Future and Promise. */
/// The shared state object for Future and Promise.
/// Some methods must only be called by either the Future thread or the
/// Promise thread. The Future thread is the thread that currently "owns" the
/// Future and its callback-related operations, and the Promise thread is
/// likewise the thread that currently "owns" the Promise and its
/// result-related operations. Also, Futures own interruption, Promises own
/// interrupt handlers. Unfortunately, there are things that users can do to
/// break this, and we can't detect that. However if they follow move
/// semantics religiously wrt threading, they should be ok.
///
/// It's worth pointing out that Futures and/or Promises can and usually will
/// migrate between threads, though this usually happens within the API code.
/// For example, an async operation will probably make a Promise, grab its
/// Future, then move the Promise into another thread that will eventually
/// fulfil it. With executors and via, this gets slightly more complicated at
/// first blush, but it's the same principle. In general, as long as the user
/// doesn't access a Future or Promise object from more than one thread at a
/// time there won't be any problems.
template<typename T>
class Core : protected FSM<State> {
public:
// This must be heap-constructed. There's probably a way to enforce that in
// code but since this is just internal detail code and I don't know how
// off-hand, I'm punting.
Core() : FSM<State>(State::Waiting) {}
/// This must be heap-constructed. There's probably a way to enforce that in
/// code but since this is just internal detail code and I don't know how
/// off-hand, I'm punting.
Core() : FSM<State>(State::Start) {}
~Core() {
assert(calledBack_);
assert(detached_ == 2);
}
......@@ -67,6 +91,26 @@ class Core : protected FSM<State> {
Core(Core&&) noexcept = delete;
Core& operator=(Core&&) = delete;
/// May call from any thread
bool hasResult() const {
switch (getState()) {
case State::OnlyResult:
case State::Armed:
case State::Done:
assert(!!result_);
return true;
default:
return false;
}
}
/// May call from any thread
bool ready() const {
return hasResult();
}
/// May call from any thread
Try<T>& getTry() {
if (ready()) {
return *result_;
......@@ -75,138 +119,148 @@ class Core : protected FSM<State> {
}
}
/// Call only from Future thread.
template <typename F>
void setCallback(F func) {
bool transitionToArmed = false;
auto setCallback_ = [&]{
if (callback_) {
throw std::logic_error("setCallback called twice");
}
context_ = RequestContext::saveContext();
callback_ = std::move(func);
};
FSM_START
case State::Waiting:
case State::Interruptible:
case State::Interrupted:
FSM_UPDATE(state, setCallback_);
case State::Start:
FSM_UPDATE(State::OnlyCallback, setCallback_);
break;
case State::Done:
FSM_UPDATE2(State::Done,
setCallback_,
[&]{ maybeCallback(); });
case State::OnlyResult:
FSM_UPDATE(State::Armed, setCallback_);
transitionToArmed = true;
break;
case State::OnlyCallback:
case State::Armed:
case State::Done:
throw std::logic_error("setCallback called twice");
FSM_END
// we could always call this, it is an optimization to only call it when
// it might be needed.
if (transitionToArmed) {
maybeCallback();
}
}
/// Call only from Promise thread
void setResult(Try<T>&& t) {
bool transitionToArmed = false;
auto setResult_ = [&]{ result_ = std::move(t); };
FSM_START
case State::Waiting:
case State::Interruptible:
case State::Interrupted:
FSM_UPDATE2(State::Done,
[&]{ result_ = std::move(t); },
[&]{ maybeCallback(); });
case State::Start:
FSM_UPDATE(State::OnlyResult, setResult_);
break;
case State::OnlyCallback:
FSM_UPDATE(State::Armed, setResult_);
transitionToArmed = true;
break;
case State::OnlyResult:
case State::Armed:
case State::Done:
throw std::logic_error("setResult called twice");
FSM_END
}
bool ready() const {
return getState() == State::Done;
if (transitionToArmed) {
maybeCallback();
}
}
// Called by a destructing Future
/// Called by a destructing Future (in the Future thread, by definition)
void detachFuture() {
if (!callback_) {
setCallback(empty_callback<T>);
}
activate();
detachOne();
}
// Called by a destructing Promise
/// Called by a destructing Promise (in the Promise thread, by definition)
void detachPromise() {
if (!ready()) {
// detachPromise() and setResult() should never be called in parallel
// so we don't need to protect this.
if (!result_) {
setResult(Try<T>(exception_wrapper(BrokenPromise())));
}
detachOne();
}
/// May call from any thread
void deactivate() {
active_ = false;
}
/// May call from any thread
void activate() {
active_ = true;
if (ready()) {
maybeCallback();
}
maybeCallback();
}
/// May call from any thread
bool isActive() { return active_; }
/// Call only from Future thread
void setExecutor(Executor* x) {
executor_ = x;
}
void raise(exception_wrapper const& e) {
FSM_START
case State::Interruptible:
FSM_UPDATE2(State::Interrupted,
[&]{ interrupt_ = folly::make_unique<exception_wrapper>(e); },
[&]{ interruptHandler_(*interrupt_); });
break;
case State::Waiting:
case State::Interrupted:
FSM_UPDATE(State::Interrupted,
[&]{ interrupt_ = folly::make_unique<exception_wrapper>(e); });
break;
case State::Done:
FSM_BREAK
FSM_END
/// Call only from Future thread
void raise(exception_wrapper e) {
std::lock_guard<decltype(interruptLock_)> guard(interruptLock_);
if (!interrupt_ && !hasResult()) {
interrupt_ = std::move(e);
if (interruptHandler_) {
interruptHandler_(interrupt_);
}
}
}
/// Call only from Promise thread
void setInterruptHandler(std::function<void(exception_wrapper const&)> fn) {
FSM_START
case State::Waiting:
case State::Interruptible:
FSM_UPDATE(State::Interruptible,
[&]{ interruptHandler_ = std::move(fn); });
break;
std::lock_guard<decltype(interruptLock_)> guard(interruptLock_);
if (!hasResult()) {
if (!!interrupt_) {
fn(interrupt_);
} else {
interruptHandler_ = std::move(fn);
}
}
}
case State::Interrupted:
fn(*interrupt_);
private:
void maybeCallback() {
FSM_START
case State::Armed:
if (active_) {
FSM_UPDATE2(State::Done, []{}, std::bind(&Core::doCallback, this));
}
FSM_BREAK
case State::Done:
default:
FSM_BREAK
FSM_END
}
private:
void maybeCallback() {
assert(ready());
if (isActive() && callback_) {
if (!calledBack_.exchange(true)) {
// TODO(5306911) we should probably try/catch
Executor* x = executor_;
RequestContext::setContext(context_);
if (x) {
MoveWrapper<std::function<void(Try<T>&&)>> cb(std::move(callback_));
MoveWrapper<folly::Optional<Try<T>>> val(std::move(result_));
x->add([cb, val]() mutable { (*cb)(std::move(**val)); });
} else {
callback_(std::move(*result_));
}
}
void doCallback() {
// TODO(5306911) we should probably try/catch around the callback
RequestContext::setContext(context_);
// TODO(6115514) semantic race on reading executor_ and setExecutor()
Executor* x = executor_;
if (x) {
MoveWrapper<std::function<void(Try<T>&&)>> cb(std::move(callback_));
MoveWrapper<Try<T>> val(std::move(*result_));
x->add([cb, val]() mutable { (*cb)(std::move(*val)); });
} else {
callback_(std::move(*result_));
}
}
......@@ -215,8 +269,6 @@ class Core : protected FSM<State> {
assert(d >= 1);
assert(d <= 2);
if (d == 2) {
// we should have already executed the callback with the value
assert(calledBack_);
delete this;
}
}
......@@ -224,12 +276,12 @@ class Core : protected FSM<State> {
folly::Optional<Try<T>> result_;
std::function<void(Try<T>&&)> callback_;
std::shared_ptr<RequestContext> context_{nullptr};
std::atomic<bool> calledBack_ {false};
std::atomic<unsigned char> detached_ {0};
std::atomic<bool> active_ {true};
std::atomic<Executor*> executor_ {nullptr};
std::unique_ptr<exception_wrapper> interrupt_;
exception_wrapper interrupt_;
std::function<void(exception_wrapper const&)> interruptHandler_;
folly::MicroSpinLock interruptLock_ {0};
};
template <typename... Ts>
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment