Commit 23cf10c5 authored by Hans Fugal's avatar Hans Fugal Committed by dcsommer

Revert "(wangle) express current Core functionality with a state machine"

Summary:

Test Plan:

Reviewed By: harishs@fb.com

Subscribers: net-systems@, fugalh, exa, njormrod, folly-diffs@

FB internal diff: D1633874

Tasks: 5438209
parent f85b2abe
......@@ -206,11 +206,6 @@ bool Future<T>::isReady() const {
return core_->ready();
}
template <class T>
void Future<T>::raise(std::exception_ptr exception) {
core_->raise(exception);
}
// makeFuture
template <class T>
......
......@@ -214,25 +214,6 @@ class Future {
return core_->isActive();
}
template <class E>
void raise(E&& exception) {
raise(std::make_exception_ptr(std::forward<E>(exception)));
}
/// Raise an interrupt. If the promise holder has an interrupt
/// handler it will be called and potentially stop asynchronous work from
/// being done. This is advisory only - a promise holder may not set an
/// interrupt handler, or may do anything including ignore. But, if you know
/// your future supports this the most likely result is stopping or
/// preventing the asynchronous operation (if in time), and the promise
/// holder setting an exception on the future. (That may happen
/// asynchronously, of course.)
void raise(std::exception_ptr interrupt);
void cancel() {
raise(FutureCancellation());
}
private:
typedef detail::Core<T>* corePtr;
......
......@@ -88,12 +88,6 @@ void Promise<T>::setException(std::exception_ptr const& e) {
core_->setResult(Try<T>(e));
}
template <class T>
void Promise<T>::setInterruptHandler(
std::function<void(std::exception_ptr const&)> fn) {
core_->setInterruptHandler(std::move(fn));
}
template <class T>
void Promise<T>::fulfilTry(Try<T>&& t) {
throwIfFulfilled();
......
......@@ -56,13 +56,6 @@ public:
*/
template <class E> void setException(E const&);
/// Set an interrupt handler to handle interrupts. See the documentation for
/// Future::raise(). Your handler can do whatever it wants, but if you
/// bother to set one then you probably will want to fulfil the promise with
/// an exception (or special value) indicating how the interrupt was
/// handled.
void setInterruptHandler(std::function<void(std::exception_ptr const&)>);
/** Fulfil this Promise (only for Promise<void>) */
void setValue();
......
......@@ -81,9 +81,4 @@ class UsingUninitializedTry : public WangleException {
WangleException("Using unitialized try") { }
};
class FutureCancellation : public WangleException {
public:
FutureCancellation() : WangleException("Future was cancelled") {}
};
}}
......@@ -28,7 +28,6 @@
#include <folly/wangle/Promise.h>
#include <folly/wangle/Future.h>
#include <folly/wangle/Executor.h>
#include <folly/wangle/detail/FSM.h>
namespace folly { namespace wangle { namespace detail {
......@@ -37,21 +36,14 @@ namespace folly { namespace wangle { namespace detail {
template<typename T>
void empty_callback(Try<T>&&) { }
enum class State {
Waiting,
Interruptible,
Interrupted,
Done,
};
/** The shared state object for Future and Promise. */
template<typename T>
class Core : protected FSM<State> {
class Core {
public:
// This must be heap-constructed. There's probably a way to enforce that in
// code but since this is just internal detail code and I don't know how
// off-hand, I'm punting.
Core() : FSM<State>(State::Waiting) {}
Core() = default;
~Core() {
assert(calledBack_);
assert(detached_ == 2);
......@@ -75,46 +67,36 @@ class Core : protected FSM<State> {
template <typename F>
void setCallback(F func) {
auto setCallback_ = [&]{
{
std::lock_guard<decltype(mutex_)> lock(mutex_);
if (callback_) {
throw std::logic_error("setCallback called twice");
}
callback_ = std::move(func);
};
FSM_START
case State::Waiting:
case State::Interruptible:
case State::Interrupted:
FSM_UPDATE(state, setCallback_);
break;
case State::Done:
FSM_UPDATE2(State::Done,
setCallback_,
[&]{ maybeCallback(); });
break;
FSM_END
}
maybeCallback();
}
void setResult(Try<T>&& t) {
FSM_START
case State::Waiting:
case State::Interruptible:
case State::Interrupted:
FSM_UPDATE2(State::Done,
[&]{ result_ = std::move(t); },
[&]{ maybeCallback(); });
break;
case State::Done:
{
std::lock_guard<decltype(mutex_)> lock(mutex_);
if (ready()) {
throw std::logic_error("setResult called twice");
FSM_END
}
result_ = std::move(t);
assert(ready());
}
maybeCallback();
}
bool ready() const {
return getState() == State::Done;
return result_.hasValue();
}
// Called by a destructing Future
......@@ -135,79 +117,54 @@ class Core : protected FSM<State> {
}
void deactivate() {
std::lock_guard<decltype(mutex_)> lock(mutex_);
active_ = false;
}
void activate() {
{
std::lock_guard<decltype(mutex_)> lock(mutex_);
active_ = true;
if (ready()) {
maybeCallback();
}
maybeCallback();
}
bool isActive() { return active_; }
void setExecutor(Executor* x) {
std::lock_guard<decltype(mutex_)> lock(mutex_);
executor_ = x;
}
void raise(std::exception_ptr const& e) {
FSM_START
case State::Interruptible:
FSM_UPDATE2(State::Interrupted,
[&]{ interrupt_ = e; },
[&]{ interruptHandler_(interrupt_); });
break;
case State::Waiting:
case State::Interrupted:
FSM_UPDATE(State::Interrupted,
[&]{ interrupt_ = e; });
break;
case State::Done:
FSM_BREAK
FSM_END
}
void setInterruptHandler(std::function<void(std::exception_ptr const&)> fn) {
FSM_START
case State::Waiting:
case State::Interruptible:
FSM_UPDATE(State::Interruptible,
[&]{ interruptHandler_ = std::move(fn); });
break;
case State::Interrupted:
fn(interrupt_);
FSM_BREAK
case State::Done:
FSM_BREAK
FSM_END
}
private:
void maybeCallback() {
assert(ready());
if (!calledBack_ && isActive() && callback_) {
// TODO(5306911) we should probably try/catch
calledBack_ = true;
Executor* x = executor_;
if (x) {
MoveWrapper<std::function<void(Try<T>&&)>> cb(std::move(callback_));
std::unique_lock<decltype(mutex_)> lock(mutex_);
if (!calledBack_ &&
result_ && callback_ && isActive()) {
// TODO(5306911) we should probably try/catch here
if (executor_) {
MoveWrapper<folly::Optional<Try<T>>> val(std::move(result_));
x->add([cb, val]() mutable { (*cb)(std::move(**val)); });
MoveWrapper<std::function<void(Try<T>&&)>> cb(std::move(callback_));
executor_->add([cb, val]() mutable { (*cb)(std::move(**val)); });
calledBack_ = true;
} else {
calledBack_ = true;
lock.unlock();
callback_(std::move(*result_));
}
}
}
void detachOne() {
++detached_;
bool shouldDelete;
{
std::lock_guard<decltype(mutex_)> lock(mutex_);
detached_++;
assert(detached_ == 1 || detached_ == 2);
if (detached_ == 2) {
shouldDelete = (detached_ == 2);
}
if (shouldDelete) {
// we should have already executed the callback with the value
assert(calledBack_);
delete this;
......@@ -216,12 +173,15 @@ class Core : protected FSM<State> {
folly::Optional<Try<T>> result_;
std::function<void(Try<T>&&)> callback_;
std::atomic<bool> calledBack_ {false};
std::atomic<unsigned char> detached_ {0};
std::atomic<bool> active_ {true};
std::atomic<Executor*> executor_ {nullptr};
std::exception_ptr interrupt_;
std::function<void(std::exception_ptr const&)> interruptHandler_;
bool calledBack_ = false;
unsigned char detached_ = 0;
bool active_ = true;
Executor* executor_ = nullptr;
// this lock isn't meant to protect all accesses to members, only the ones
// that need to be threadsafe: the act of setting result_ and callback_, and
// seeing if they are set and whether we should then continue.
folly::MicroSpinLock mutex_ {0};
};
template <typename... Ts>
......
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <gtest/gtest.h>
#include <folly/wangle/Future.h>
#include <folly/wangle/Promise.h>
using namespace folly::wangle;
TEST(Interrupts, raise) {
std::runtime_error eggs("eggs");
Promise<void> p;
p.setInterruptHandler([&](std::exception_ptr e) {
EXPECT_THROW(std::rethrow_exception(e), decltype(eggs));
});
p.getFuture().raise(eggs);
}
TEST(Interrupts, cancel) {
Promise<void> p;
p.setInterruptHandler([&](std::exception_ptr e) {
EXPECT_THROW(std::rethrow_exception(e), FutureCancellation);
});
p.getFuture().cancel();
}
TEST(Interrupts, handleThenInterrupt) {
Promise<int> p;
bool flag = false;
p.setInterruptHandler([&](std::exception_ptr e) { flag = true; });
p.getFuture().cancel();
EXPECT_TRUE(flag);
}
TEST(Interrupts, interruptThenHandle) {
Promise<int> p;
bool flag = false;
p.getFuture().cancel();
p.setInterruptHandler([&](std::exception_ptr e) { flag = true; });
EXPECT_TRUE(flag);
}
TEST(Interrupts, interruptAfterFulfilNoop) {
Promise<void> p;
bool flag = false;
p.setInterruptHandler([&](std::exception_ptr e) { flag = true; });
p.setValue();
p.getFuture().cancel();
EXPECT_FALSE(flag);
}
TEST(Interrupts, secondInterruptNoop) {
Promise<void> p;
int count = 0;
p.setInterruptHandler([&](std::exception_ptr e) { count++; });
auto f = p.getFuture();
f.cancel();
f.cancel();
EXPECT_EQ(1, count);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment