Commit 831405dc authored by Hans Fugal's avatar Hans Fugal Committed by dcsommer

(wangle) Interrupts (and therefore, cancellation)

Summary:
Modeled very closely after Finagle's interrupts. Compare with https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Promise.scala if you like.
The basic idea is the promise holder can register an interrupt handler, and then interrupts will call that handler. A typical handler would fulfil the promise with an exception (or special value) indicating that it was interrupted (if it was interrupted in time).
Raising an interrupt does not prevent setting a value or callbacks executing or any of that - it is only advisory to the promise holder.

Test Plan: I wrote some unit tests.

Reviewed By: davejwatson@fb.com

Subscribers: folly-diffs@, net-systems@, fugalh, exa, hannesr, njormrod

FB internal diff: D1620805

Tasks: 4618297
parent 1be516d1
...@@ -206,6 +206,11 @@ bool Future<T>::isReady() const { ...@@ -206,6 +206,11 @@ bool Future<T>::isReady() const {
return core_->ready(); return core_->ready();
} }
template <class T>
void Future<T>::raise(std::exception_ptr exception) {
core_->raise(exception);
}
// makeFuture // makeFuture
template <class T> template <class T>
......
...@@ -208,6 +208,25 @@ class Future { ...@@ -208,6 +208,25 @@ class Future {
return core_->isActive(); return core_->isActive();
} }
template <class E>
void raise(E&& exception) {
raise(std::make_exception_ptr(std::forward<E>(exception)));
}
/// Raise an interrupt. If the promise holder has an interrupt
/// handler it will be called and potentially stop asynchronous work from
/// being done. This is advisory only - a promise holder may not set an
/// interrupt handler, or may do anything including ignore. But, if you know
/// your future supports this the most likely result is stopping or
/// preventing the asynchronous operation (if in time), and the promise
/// holder setting an exception on the future. (That may happen
/// asynchronously, of course.)
void raise(std::exception_ptr interrupt);
void cancel() {
raise(FutureCancellation());
}
private: private:
typedef detail::Core<T>* corePtr; typedef detail::Core<T>* corePtr;
......
...@@ -88,6 +88,12 @@ void Promise<T>::setException(std::exception_ptr const& e) { ...@@ -88,6 +88,12 @@ void Promise<T>::setException(std::exception_ptr const& e) {
core_->setResult(Try<T>(e)); core_->setResult(Try<T>(e));
} }
template <class T>
void Promise<T>::setInterruptHandler(
std::function<void(std::exception_ptr const&)> fn) {
core_->setInterruptHandler(std::move(fn));
}
template <class T> template <class T>
void Promise<T>::fulfilTry(Try<T>&& t) { void Promise<T>::fulfilTry(Try<T>&& t) {
throwIfFulfilled(); throwIfFulfilled();
......
...@@ -57,6 +57,13 @@ public: ...@@ -57,6 +57,13 @@ public:
*/ */
template <class E> void setException(E const&); template <class E> void setException(E const&);
/// Set an interrupt handler to handle interrupts. See the documentation for
/// Future::raise(). Your handler can do whatever it wants, but if you
/// bother to set one then you probably will want to fulfil the promise with
/// an exception (or special value) indicating how the interrupt was
/// handled.
void setInterruptHandler(std::function<void(std::exception_ptr const&)>);
/** Fulfil this Promise (only for Promise<void>) */ /** Fulfil this Promise (only for Promise<void>) */
void setValue(); void setValue();
......
...@@ -81,4 +81,9 @@ class UsingUninitializedTry : public WangleException { ...@@ -81,4 +81,9 @@ class UsingUninitializedTry : public WangleException {
WangleException("Using unitialized try") { } WangleException("Using unitialized try") { }
}; };
class FutureCancellation : public WangleException {
public:
FutureCancellation() : WangleException("Future was cancelled") {}
};
}} }}
...@@ -39,6 +39,8 @@ void empty_callback(Try<T>&&) { } ...@@ -39,6 +39,8 @@ void empty_callback(Try<T>&&) { }
enum class State { enum class State {
Waiting, Waiting,
Interruptible,
Interrupted,
Done, Done,
}; };
...@@ -81,36 +83,34 @@ class Core : protected FSM<State> { ...@@ -81,36 +83,34 @@ class Core : protected FSM<State> {
callback_ = std::move(func); callback_ = std::move(func);
}; };
bool done = false; FSM_START
while (!done) {
switch (getState()) {
case State::Waiting: case State::Waiting:
done = updateState(State::Waiting, State::Waiting, setCallback_); case State::Interruptible:
case State::Interrupted:
FSM_UPDATE(state, setCallback_);
break; break;
case State::Done: case State::Done:
done = updateState(State::Done, State::Done, FSM_UPDATE2(State::Done,
setCallback_, setCallback_,
[&]{ maybeCallback(); }); [&]{ maybeCallback(); });
break; break;
} FSM_END
}
} }
void setResult(Try<T>&& t) { void setResult(Try<T>&& t) {
bool done = false; FSM_START
while (!done) {
switch (getState()) {
case State::Waiting: case State::Waiting:
done = updateState(State::Waiting, State::Done, case State::Interruptible:
case State::Interrupted:
FSM_UPDATE2(State::Done,
[&]{ result_ = std::move(t); }, [&]{ result_ = std::move(t); },
[&]{ maybeCallback(); }); [&]{ maybeCallback(); });
break; break;
case State::Done: case State::Done:
throw std::logic_error("setResult called twice"); throw std::logic_error("setResult called twice");
} FSM_END
}
} }
bool ready() const { bool ready() const {
...@@ -151,6 +151,42 @@ class Core : protected FSM<State> { ...@@ -151,6 +151,42 @@ class Core : protected FSM<State> {
executor_ = x; executor_ = x;
} }
void raise(std::exception_ptr const& e) {
FSM_START
case State::Interruptible:
FSM_UPDATE2(State::Interrupted,
[&]{ interrupt_ = e; },
[&]{ interruptHandler_(interrupt_); });
break;
case State::Waiting:
case State::Interrupted:
FSM_UPDATE(State::Interrupted,
[&]{ interrupt_ = e; });
break;
case State::Done:
FSM_BREAK
FSM_END
}
void setInterruptHandler(std::function<void(std::exception_ptr const&)> fn) {
FSM_START
case State::Waiting:
case State::Interruptible:
FSM_UPDATE(State::Interruptible,
[&]{ interruptHandler_ = std::move(fn); });
break;
case State::Interrupted:
fn(interrupt_);
FSM_BREAK
case State::Done:
FSM_BREAK
FSM_END
}
private: private:
void maybeCallback() { void maybeCallback() {
assert(ready()); assert(ready());
...@@ -183,6 +219,8 @@ class Core : protected FSM<State> { ...@@ -183,6 +219,8 @@ class Core : protected FSM<State> {
std::atomic<unsigned char> detached_ {0}; std::atomic<unsigned char> detached_ {0};
std::atomic<bool> active_ {true}; std::atomic<bool> active_ {true};
std::atomic<Executor*> executor_ {nullptr}; std::atomic<Executor*> executor_ {nullptr};
std::exception_ptr interrupt_;
std::function<void(std::exception_ptr const&)> interruptHandler_;
}; };
template <typename... Ts> template <typename... Ts>
......
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <gtest/gtest.h>
#include <folly/wangle/Future.h>
#include <folly/wangle/Promise.h>
using namespace folly::wangle;
TEST(Interrupts, raise) {
std::runtime_error eggs("eggs");
Promise<void> p;
p.setInterruptHandler([&](std::exception_ptr e) {
EXPECT_THROW(std::rethrow_exception(e), decltype(eggs));
});
p.getFuture().raise(eggs);
}
TEST(Interrupts, cancel) {
Promise<void> p;
p.setInterruptHandler([&](std::exception_ptr e) {
EXPECT_THROW(std::rethrow_exception(e), FutureCancellation);
});
p.getFuture().cancel();
}
TEST(Interrupts, handleThenInterrupt) {
Promise<int> p;
bool flag = false;
p.setInterruptHandler([&](std::exception_ptr e) { flag = true; });
p.getFuture().cancel();
EXPECT_TRUE(flag);
}
TEST(Interrupts, interruptThenHandle) {
Promise<int> p;
bool flag = false;
p.getFuture().cancel();
p.setInterruptHandler([&](std::exception_ptr e) { flag = true; });
EXPECT_TRUE(flag);
}
TEST(Interrupts, interruptAfterFulfilNoop) {
Promise<void> p;
bool flag = false;
p.setInterruptHandler([&](std::exception_ptr e) { flag = true; });
p.setValue();
p.getFuture().cancel();
EXPECT_FALSE(flag);
}
TEST(Interrupts, secondInterruptNoop) {
Promise<void> p;
int count = 0;
p.setInterruptHandler([&](std::exception_ptr e) { count++; });
auto f = p.getFuture();
f.cancel();
f.cancel();
EXPECT_EQ(1, count);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment