Commit cd498c15 authored by Yedidya Feldblum's avatar Yedidya Feldblum Committed by Facebook Github Bot

Use a simpler atomic state in Futures Core

Summary:
[Folly] Use a simpler atomic state in Futures `Core` v.s. `FSM` which abstracts an atomic state plus a spin-lock.

The `FSM` abstraction can be useful in more complex cases, but is not needed in the current case. All that is needed is a single barrier state, which is `Done` - setting callback and result happen before state transitions to `Done`, while invoking callback with result happens after state transitions to `Done`.

Reviewed By: marshallcline

Differential Revision: D8083703

fbshipit-source-id: 42d96b41cbdac5e7c42f3f8a661d5db3113655db
parent f7f840c3
......@@ -498,7 +498,6 @@ if (BUILD_TESTS)
TEST context_test SOURCES ContextTest.cpp
TEST core_test SOURCES CoreTest.cpp
TEST ensure_test SOURCES EnsureTest.cpp
TEST fsm_test SOURCES FSMTest.cpp
TEST filter_test SOURCES FilterTest.cpp
TEST future_splitter_test SOURCES FutureSplitterTest.cpp
# MSVC SFINAE bug
......
......@@ -222,7 +222,6 @@ nobase_follyinclude_HEADERS = \
futures/SharedPromise-inl.h \
futures/ThreadWheelTimekeeper.h \
futures/detail/Core.h \
futures/detail/FSM.h \
futures/detail/Types.h \
futures/test/TestExecutor.h \
hash/Checksum.h \
......
......@@ -28,7 +28,6 @@
#include <folly/ScopeGuard.h>
#include <folly/Try.h>
#include <folly/Utility.h>
#include <folly/futures/detail/FSM.h>
#include <folly/lang/Assume.h>
#include <folly/lang/Exception.h>
#include <folly/synchronization/MicroSpinLock.h>
......@@ -212,7 +211,8 @@ class Core final {
/// May call from any thread
bool hasCallback() const noexcept {
constexpr auto allowed = State::OnlyCallback | State::Done;
auto const ans = State() != (fsm_.getState() & allowed);
auto const state = state_.load(std::memory_order_acquire);
auto const ans = State() != (state & allowed);
assert(!ans || !!callback_); // callback_ must exist in this state
return ans;
}
......@@ -224,7 +224,8 @@ class Core final {
/// Identical to `this->ready()`
bool hasResult() const noexcept {
constexpr auto allowed = State::OnlyResult | State::Done;
auto const ans = State() != (fsm_.getState() & allowed);
auto const state = state_.load(std::memory_order_acquire);
auto const ans = State() != (state & allowed);
assert(!ans || !!result_); // result_ must exist if hasResult() is true
return ans;
}
......@@ -273,24 +274,32 @@ class Core final {
/// executor or if the executor is inline).
template <typename F>
void setCallback(F&& func) {
auto setCallback_ = [&]{
context_ = RequestContext::saveContext();
callback_ = std::forward<F>(func);
};
context_ = RequestContext::saveContext();
callback_ = std::forward<F>(func);
fsm_.transition([&](State state) {
auto state = state_.load(std::memory_order_acquire);
while (true) {
switch (state) {
case State::Start:
return fsm_.tryUpdateState(state, State::OnlyCallback, setCallback_);
if (state_.compare_exchange_strong(
state, State::OnlyCallback, std::memory_order_release)) {
return;
}
assume(state == State::OnlyResult);
FOLLY_FALLTHROUGH;
case State::OnlyResult:
return fsm_.tryUpdateState(
state, State::Done, setCallback_, [&] { doCallback(); });
if (state_.compare_exchange_strong(
state, State::Done, std::memory_order_release)) {
doCallback();
return;
}
FOLLY_FALLTHROUGH;
default:
terminate_with<std::logic_error>("setCallback unexpected state");
}
});
}
}
/// Call only from producer thread.
......@@ -302,20 +311,31 @@ class Core final {
/// and might also synchronously execute that callback (e.g., if there is no
/// executor or if the executor is inline).
void setResult(Try<T>&& t) {
auto setResult_ = [&]{ result_ = std::move(t); };
fsm_.transition([&](State state) {
result_ = std::move(t);
auto state = state_.load(std::memory_order_acquire);
while (true) {
switch (state) {
case State::Start:
return fsm_.tryUpdateState(state, State::OnlyResult, setResult_);
if (state_.compare_exchange_strong(
state, State::OnlyResult, std::memory_order_release)) {
return;
}
assume(state == State::OnlyCallback);
FOLLY_FALLTHROUGH;
case State::OnlyCallback:
return fsm_.tryUpdateState(
state, State::Done, setResult_, [&] { doCallback(); });
if (state_.compare_exchange_strong(
state, State::Done, std::memory_order_release)) {
doCallback();
return;
}
FOLLY_FALLTHROUGH;
default:
terminate_with<std::logic_error>("setResult unexpected state");
}
});
}
}
/// Called by a destructing Future (in the consumer thread, by definition).
......@@ -339,7 +359,7 @@ class Core final {
void setExecutor(
Executor::KeepAlive<> x,
int8_t priority = Executor::MID_PRI) {
DCHECK(fsm_.getState() != State::OnlyCallback);
DCHECK(state_ != State::OnlyCallback);
executor_ = std::move(x);
priority_ = priority;
}
......@@ -409,16 +429,16 @@ class Core final {
}
private:
Core() : result_(), fsm_(State::Start), attached_(2) {}
Core() : result_(), state_(State::Start), attached_(2) {}
explicit Core(Try<T>&& t)
: result_(std::move(t)), fsm_(State::OnlyResult), attached_(1) {}
: result_(std::move(t)), state_(State::OnlyResult), attached_(1) {}
template <typename... Args>
explicit Core(in_place_t, Args&&... args) noexcept(
std::is_nothrow_constructible<T, Args&&...>::value)
: result_(in_place, in_place, std::forward<Args>(args)...),
fsm_(State::OnlyResult),
state_(State::OnlyResult),
attached_(1) {}
~Core() {
......@@ -461,7 +481,7 @@ class Core final {
// May be called at most once.
void doCallback() {
DCHECK(fsm_.getState() == State::Done);
DCHECK(state_ == State::Done);
auto x = exchange(executor_, Executor::KeepAlive<>());
int8_t priority = priority_;
......@@ -546,7 +566,7 @@ class Core final {
// place result_ next to increase the likelihood that the value will be
// contained entirely in one cache line
folly::Optional<Try<T>> result_;
FSM<State, SpinLock> fsm_;
std::atomic<State> state_;
std::atomic<unsigned char> attached_;
std::atomic<unsigned char> callbackReferences_{0};
std::atomic<bool> interruptHandlerSet_ {false};
......
/*
* Copyright 2014-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <atomic>
#include <mutex>
#include <folly/synchronization/MicroSpinLock.h>
namespace folly {
namespace futures {
namespace detail {
/// Finite State Machine helper base class.
/// Inherit from this.
/// For best results, use an "enum class" for Enum.
template <class Enum, class Mutex>
class FSM {
private:
Mutex mutex_;
// This might not be necessary for all Enum types, e.g. anything
// that is atomically updated in practice on this CPU and there's no risk
// of returning a bogus state because of tearing.
// An optimization would be to use a static conditional on the Enum type.
std::atomic<Enum> state_;
public:
explicit FSM(Enum startState) : state_(startState) {}
Enum getState() const noexcept {
return state_.load(std::memory_order_acquire);
}
/// Atomically do a state transition with accompanying action.
/// The action will see the old state.
/// @returns true on success, false and action unexecuted otherwise
template <class F>
bool tryUpdateState(Enum A, Enum B, F const& action) {
std::lock_guard<Mutex> lock(mutex_);
if (state_.load(std::memory_order_acquire) != A) {
return false;
}
action();
state_.store(B, std::memory_order_release);
return true;
}
/// Atomically do a state transition with accompanying action. Then do the
/// unprotected action without holding the lock. If the atomic transition
/// fails, returns false and neither action was executed.
///
/// This facilitates code like this:
/// bool done = false;
/// while (!done) {
/// switch (getState()) {
/// case State::Foo:
/// done = tryUpdateState(State::Foo, State::Bar,
/// [&]{ /* do protected stuff */ },
/// [&]{ /* do unprotected stuff */});
/// break;
///
/// Which reads nicer than code like this:
/// while (true) {
/// switch (getState()) {
/// case State::Foo:
/// if (!tryUpdateState(State::Foo, State::Bar,
/// [&]{ /* do protected stuff */ })) {
/// continue;
/// }
/// /* do unprotected stuff */
/// return; // or otherwise break out of the loop
///
/// The protected action will see the old state, and the unprotected action
/// will see the new state.
template <class F1, class F2>
bool tryUpdateState(
Enum A,
Enum B,
F1 const& protectedAction,
F2 const& unprotectedAction) {
bool result = tryUpdateState(A, B, protectedAction);
if (result) {
unprotectedAction();
}
return result;
}
template <class F>
void transition(F f) {
while (!f(getState())) {
}
}
};
} // namespace detail
} // namespace futures
} // namespace folly
......@@ -26,8 +26,7 @@ TEST(Core, size) {
typename std::aligned_storage<lambdaBufSize>::type lambdaBuf_;
folly::Optional<Try<Unit>> result_;
folly::Function<void(Try<Unit>&&)> callback_;
futures::detail::FSM<futures::detail::State, futures::detail::SpinLock>
fsm_;
std::atomic<futures::detail::State> state_;
std::atomic<unsigned char> attached_;
std::atomic<bool> interruptHandlerSet_;
futures::detail::SpinLock interruptLock_;
......
/*
* Copyright 2014-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/futures/detail/FSM.h>
#include <folly/lang/Assume.h>
#include <folly/portability/GTest.h>
using namespace folly::futures::detail;
enum class State { A, B };
TEST(FSM, example) {
FSM<State, std::mutex> fsm(State::A);
int count = 0;
int unprotectedCount = 0;
// somebody set up us the switch
auto tryTransition = [&]{
switch (fsm.getState()) {
case State::A:
return fsm.tryUpdateState(State::A, State::B, [&] { count++; });
case State::B:
return fsm.tryUpdateState(
State::B, State::A, [&] { count--; }, [&] { unprotectedCount--; });
}
folly::assume_unreachable();
};
// keep retrying until success (like a cas)
while (!tryTransition()) {
;
}
EXPECT_EQ(State::B, fsm.getState());
EXPECT_EQ(1, count);
EXPECT_EQ(0, unprotectedCount);
while (!tryTransition()) {
;
}
EXPECT_EQ(State::A, fsm.getState());
EXPECT_EQ(0, count);
EXPECT_EQ(-1, unprotectedCount);
}
TEST(FSM, transition) {
struct MyFSM {
FSM<State, std::mutex> fsm_;
int count = 0;
int unprotectedCount = 0;
MyFSM() : fsm_(State::A) {}
void twiddle() {
fsm_.transition([&](State state) {
switch (state) {
case State::A:
return fsm_.tryUpdateState(state, State::B, [&] { count++; });
case State::B:
return fsm_.tryUpdateState(
state, State::A, [&] { count--; }, [&] { unprotectedCount--; });
}
folly::assume_unreachable();
});
}
};
MyFSM fsm;
fsm.twiddle();
EXPECT_EQ(State::B, fsm.fsm_.getState());
EXPECT_EQ(1, fsm.count);
EXPECT_EQ(0, fsm.unprotectedCount);
fsm.twiddle();
EXPECT_EQ(State::A, fsm.fsm_.getState());
EXPECT_EQ(0, fsm.count);
EXPECT_EQ(-1, fsm.unprotectedCount);
}
TEST(FSM, ctor) {
FSM<State, std::mutex> fsm(State::A);
EXPECT_EQ(State::A, fsm.getState());
}
TEST(FSM, update) {
FSM<State, std::mutex> fsm(State::A);
EXPECT_TRUE(fsm.tryUpdateState(State::A, State::B, [] {}));
EXPECT_EQ(State::B, fsm.getState());
}
TEST(FSM, badUpdate) {
FSM<State, std::mutex> fsm(State::A);
EXPECT_FALSE(fsm.tryUpdateState(State::B, State::A, [] {}));
}
TEST(FSM, actionOnUpdate) {
FSM<State, std::mutex> fsm(State::A);
int count = 0;
fsm.tryUpdateState(State::A, State::B, [&] { count++; });
EXPECT_EQ(1, count);
}
TEST(FSM, noActionOnBadUpdate) {
FSM<State, std::mutex> fsm(State::A);
int count = 0;
fsm.tryUpdateState(State::B, State::A, [&] { count++; });
EXPECT_EQ(0, count);
}
TEST(FSM, stateTransitionAfterAction) {
FSM<State, std::mutex> fsm(State::A);
fsm.tryUpdateState(
State::A, State::B, [&] { EXPECT_EQ(State::A, fsm.getState()); });
}
......@@ -317,7 +317,6 @@ futures_test_SOURCES = \
../futures/test/ConversionOperatorTest.cpp \
../futures/test/CoreTest.cpp \
../futures/test/EnsureTest.cpp \
../futures/test/FSMTest.cpp \
../futures/test/FilterTest.cpp \
../futures/test/FutureTest.cpp \
../futures/test/HeaderCompileTest.cpp \
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment