Commit 1be516d1 authored by Hans Fugal's avatar Hans Fugal Committed by dcsommer

(wangle) express current Core functionality with a state machine

Summary:
This is a refactor of the current functionality to use a state machine (inheriting from `FSM`). This will make it easier to extend to do cancellation and Future collapsing. Performance is the same, maybe slightly faster (abt 1%). (I might be a little conservative with the atomics, it might be worth going through and reasoning about whether they all need to be atomic).

The state machine is two states, Waiting (no value), and Done (has a value). Transitioning to Done will execute the callback if it exists and we are active. Otherwise the callback will happen when it is set and active is true.

There is a subjective balancing act in place here, between making a state for every single mutable bit of information (which results in an explosion of states: hasValue X hasCallback X isActive X isCancelled …), and finding a sweet spot of expressivity. This isn't too far from the way Twitter did it, though we don't (yet) have all the states they have (and they don't have the concept of hot/cold futures). But I got there by way of replacing the `mutex_` with the state, after changing all those variables to atomics so they didn't need mutex protection (resulting in only `callback_` and `result_` needing it). I expect the state machine will morph as the rest of the functionality is added, but hopefully it will be easier to understand and keep correct (that's the idea anyway).

Test Plan: Tests still pass (and not by accident, I made several awesome mistakes along the way).

Reviewed By: davejwatson@fb.com

Subscribers: net-systems@, fugalh, exa, njormrod

FB internal diff: D1618240

Tasks: 4618297
parent 4c7a8438
...@@ -28,6 +28,7 @@ ...@@ -28,6 +28,7 @@
#include <folly/wangle/Promise.h> #include <folly/wangle/Promise.h>
#include <folly/wangle/Future.h> #include <folly/wangle/Future.h>
#include <folly/wangle/Executor.h> #include <folly/wangle/Executor.h>
#include <folly/wangle/detail/FSM.h>
namespace folly { namespace wangle { namespace detail { namespace folly { namespace wangle { namespace detail {
...@@ -36,14 +37,19 @@ namespace folly { namespace wangle { namespace detail { ...@@ -36,14 +37,19 @@ namespace folly { namespace wangle { namespace detail {
template<typename T> template<typename T>
void empty_callback(Try<T>&&) { } void empty_callback(Try<T>&&) { }
enum class State {
Waiting,
Done,
};
/** The shared state object for Future and Promise. */ /** The shared state object for Future and Promise. */
template<typename T> template<typename T>
class Core { class Core : protected FSM<State> {
public: public:
// This must be heap-constructed. There's probably a way to enforce that in // This must be heap-constructed. There's probably a way to enforce that in
// code but since this is just internal detail code and I don't know how // code but since this is just internal detail code and I don't know how
// off-hand, I'm punting. // off-hand, I'm punting.
Core() = default; Core() : FSM<State>(State::Waiting) {}
~Core() { ~Core() {
assert(calledBack_); assert(calledBack_);
assert(detached_ == 2); assert(detached_ == 2);
...@@ -67,36 +73,48 @@ class Core { ...@@ -67,36 +73,48 @@ class Core {
template <typename F> template <typename F>
void setCallback(F func) { void setCallback(F func) {
{ auto setCallback_ = [&]{
std::lock_guard<decltype(mutex_)> lock(mutex_);
if (callback_) { if (callback_) {
throw std::logic_error("setCallback called twice"); throw std::logic_error("setCallback called twice");
} }
callback_ = std::move(func); callback_ = std::move(func);
};
bool done = false;
while (!done) {
switch (getState()) {
case State::Waiting:
done = updateState(State::Waiting, State::Waiting, setCallback_);
break;
case State::Done:
done = updateState(State::Done, State::Done,
setCallback_,
[&]{ maybeCallback(); });
break;
}
} }
maybeCallback();
} }
void setResult(Try<T>&& t) { void setResult(Try<T>&& t) {
{ bool done = false;
std::lock_guard<decltype(mutex_)> lock(mutex_); while (!done) {
switch (getState()) {
if (ready()) { case State::Waiting:
done = updateState(State::Waiting, State::Done,
[&]{ result_ = std::move(t); },
[&]{ maybeCallback(); });
break;
case State::Done:
throw std::logic_error("setResult called twice"); throw std::logic_error("setResult called twice");
} }
result_ = std::move(t);
assert(ready());
} }
maybeCallback();
} }
bool ready() const { bool ready() const {
return result_.hasValue(); return getState() == State::Done;
} }
// Called by a destructing Future // Called by a destructing Future
...@@ -117,71 +135,54 @@ class Core { ...@@ -117,71 +135,54 @@ class Core {
} }
void deactivate() { void deactivate() {
std::lock_guard<decltype(mutex_)> lock(mutex_);
active_ = false; active_ = false;
} }
void activate() { void activate() {
{ active_ = true;
std::lock_guard<decltype(mutex_)> lock(mutex_); if (ready()) {
active_ = true; maybeCallback();
} }
maybeCallback();
} }
bool isActive() { return active_; } bool isActive() { return active_; }
void setExecutor(Executor* x) { void setExecutor(Executor* x) {
std::lock_guard<decltype(mutex_)> lock(mutex_);
executor_ = x; executor_ = x;
} }
private: private:
void maybeCallback() { void maybeCallback() {
std::unique_lock<decltype(mutex_)> lock(mutex_); assert(ready());
if (!calledBack_ && if (!calledBack_ && isActive() && callback_) {
result_ && callback_ && isActive()) { // TODO(5306911) we should probably try/catch
// TODO(5306911) we should probably try/catch here calledBack_ = true;
if (executor_) { Executor* x = executor_;
MoveWrapper<folly::Optional<Try<T>>> val(std::move(result_)); if (x) {
MoveWrapper<std::function<void(Try<T>&&)>> cb(std::move(callback_)); MoveWrapper<std::function<void(Try<T>&&)>> cb(std::move(callback_));
executor_->add([cb, val]() mutable { (*cb)(std::move(**val)); }); MoveWrapper<folly::Optional<Try<T>>> val(std::move(result_));
calledBack_ = true; x->add([cb, val]() mutable { (*cb)(std::move(**val)); });
} else { } else {
calledBack_ = true;
lock.unlock();
callback_(std::move(*result_)); callback_(std::move(*result_));
} }
} }
} }
void detachOne() { void detachOne() {
bool shouldDelete; if (++detached_ == 2) {
{
std::lock_guard<decltype(mutex_)> lock(mutex_);
detached_++;
assert(detached_ == 1 || detached_ == 2);
shouldDelete = (detached_ == 2);
}
if (shouldDelete) {
// we should have already executed the callback with the value // we should have already executed the callback with the value
assert(calledBack_); assert(calledBack_);
delete this; delete this;
} }
assert(detached_ == 1 || detached_ == 2);
} }
folly::Optional<Try<T>> result_; folly::Optional<Try<T>> result_;
std::function<void(Try<T>&&)> callback_; std::function<void(Try<T>&&)> callback_;
bool calledBack_ = false; std::atomic<bool> calledBack_ {false};
unsigned char detached_ = 0; std::atomic<unsigned char> detached_ {0};
bool active_ = true; std::atomic<bool> active_ {true};
Executor* executor_ = nullptr; std::atomic<Executor*> executor_ {nullptr};
// this lock isn't meant to protect all accesses to members, only the ones
// that need to be threadsafe: the act of setting result_ and callback_, and
// seeing if they are set and whether we should then continue.
folly::MicroSpinLock mutex_ {0};
}; };
template <typename... Ts> template <typename... Ts>
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment