Commit bba17c09 authored by Michael Park's avatar Michael Park Committed by Facebook Github Bot

Added `SemiFuture::within`

Summary:
Moved `Future::within` to `FutureBase::withinImplementation`, and changed it to return a `SemiFuture`.
`SemiFuture::within` uses it directly, and `Future::within` uses it then attaches the current executor or falls back to the inline executor to preserve existing behavior.

Reviewed By: yfeldblum

Differential Revision: D8269907

fbshipit-source-id: 76e235a2ecb2c648603961d0ac0ac17bf646d027
parent a1dd1ce6
......@@ -417,6 +417,69 @@ FutureBase<T>::thenImplementation(
return f;
}
template <class T>
template <typename E>
SemiFuture<T>
FutureBase<T>::withinImplementation(Duration dur, E e, Timekeeper* tk) {
struct Context {
explicit Context(E ex) : exception(std::move(ex)) {}
E exception;
Future<Unit> thisFuture;
Promise<T> promise;
std::atomic<bool> token{false};
};
std::shared_ptr<Timekeeper> tks;
if (LIKELY(!tk)) {
tks = folly::detail::getTimekeeperSingleton();
tk = tks.get();
}
if (UNLIKELY(!tk)) {
return makeSemiFuture<T>(FutureNoTimekeeper());
}
auto ctx = std::make_shared<Context>(std::move(e));
auto f = [ctx](Try<T>&& t) {
if (!ctx->token.exchange(true)) {
ctx->promise.setTry(std::move(t));
}
};
using R = futures::detail::callableResult<T, decltype(f)>;
ctx->thisFuture = this->template thenImplementation<decltype(f), R>(
std::move(f), typename R::Arg());
// Properly propagate interrupt values through futures chained after within()
ctx->promise.setInterruptHandler(
[weakCtx = to_weak_ptr(ctx)](const exception_wrapper& ex) {
if (auto lockedCtx = weakCtx.lock()) {
lockedCtx->thisFuture.raise(ex);
}
});
// Have time keeper use a weak ptr to hold ctx,
// so that ctx can be deallocated as soon as the future job finished.
tk->after(dur).then([weakCtx = to_weak_ptr(ctx)](Try<Unit>&& t) mutable {
auto lockedCtx = weakCtx.lock();
if (!lockedCtx) {
// ctx already released. "this" completed first, cancel "after"
return;
}
// "after" completed first, cancel "this"
lockedCtx->thisFuture.raise(FutureTimeout());
if (!lockedCtx->token.exchange(true)) {
if (t.hasException()) {
lockedCtx->promise.setException(std::move(t.exception()));
} else {
lockedCtx->promise.setException(std::move(lockedCtx->exception));
}
}
});
return ctx->promise.getSemiFuture();
}
/**
* Defer work until executor is actively boosted.
*
......@@ -1669,67 +1732,15 @@ Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
template <class T>
template <class E>
Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
struct Context {
Context(E ex) : exception(std::move(ex)), promise() {}
E exception;
Future<Unit> thisFuture;
Promise<T> promise;
std::atomic<bool> token {false};
};
if (this->isReady()) {
return std::move(*this);
}
std::shared_ptr<Timekeeper> tks;
if (LIKELY(!tk)) {
tks = folly::detail::getTimekeeperSingleton();
tk = tks.get();
}
if (UNLIKELY(!tk)) {
return makeFuture<T>(FutureNoTimekeeper());
}
auto ctx = std::make_shared<Context>(std::move(e));
ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
if (ctx->token.exchange(true) == false) {
ctx->promise.setTry(std::move(t));
}
});
// Properly propagate interrupt values through futures chained after within()
ctx->promise.setInterruptHandler(
[weakCtx = to_weak_ptr(ctx)](const exception_wrapper& ex) {
if (auto lockedCtx = weakCtx.lock()) {
lockedCtx->thisFuture.raise(ex);
}
});
// Have time keeper use a weak ptr to hold ctx,
// so that ctx can be deallocated as soon as the future job finished.
tk->after(dur).then([weakCtx = to_weak_ptr(ctx)](Try<Unit> const& t) mutable {
auto lockedCtx = weakCtx.lock();
if (!lockedCtx) {
// ctx already released. "this" completed first, cancel "after"
return;
}
// "after" completed first, cancel "this"
lockedCtx->thisFuture.raise(FutureTimeout());
if (lockedCtx->token.exchange(true) == false) {
if (t.hasException()) {
lockedCtx->promise.setException(std::move(t.exception()));
} else {
lockedCtx->promise.setException(std::move(lockedCtx->exception));
}
}
});
auto* currentExecutor = this->getExecutor();
return ctx->promise.getSemiFuture().via(
currentExecutor ? currentExecutor : &folly::InlineExecutor::instance());
return this->withinImplementation(dur, e, tk)
.via(
currentExecutor ? currentExecutor
: &folly::InlineExecutor::instance());
}
// delayed
......
......@@ -286,6 +286,9 @@ class FutureBase {
template <typename F, typename R, bool isTry, typename... Args>
typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
thenImplementation(F&& func, futures::detail::argResult<isTry, F, Args...>);
template <typename E>
SemiFuture<T> withinImplementation(Duration dur, E e, Timekeeper* tk);
};
template <class T>
void convertFuture(SemiFuture<T>&& sf, Future<T>& f);
......@@ -484,6 +487,16 @@ class SemiFuture : private futures::detail::FutureBase<T> {
return std::move(*this).deferError(&func);
}
SemiFuture<T> within(Duration dur, Timekeeper* tk = nullptr) && {
return std::move(*this).within(dur, FutureTimeout(), tk);
}
template <class E>
SemiFuture<T> within(Duration dur, E e, Timekeeper* tk = nullptr) && {
return this->isReady() ? std::move(*this)
: this->withinImplementation(dur, e, tk);
}
/// Return a future that completes inline, as if the future had no executor.
/// Intended for porting legacy code without behavioural change, and for rare
/// cases where this is really the intended behaviour.
......
......@@ -73,7 +73,7 @@ TEST(Interrupt, secondInterruptNoop) {
EXPECT_EQ(1, count);
}
TEST(Interrupt, withinTimedOut) {
TEST(Interrupt, futureWithinTimedOut) {
Promise<int> p;
Baton<> done;
p.setInterruptHandler([&](const exception_wrapper& /* e */) { done.post(); });
......@@ -81,3 +81,12 @@ TEST(Interrupt, withinTimedOut) {
// Give it 100ms to time out and call the interrupt handler
EXPECT_TRUE(done.try_wait_for(std::chrono::milliseconds(100)));
}
TEST(Interrupt, semiFutureWithinTimedOut) {
Promise<int> p;
Baton<> done;
p.setInterruptHandler([&](const exception_wrapper& /* e */) { done.post(); });
p.getSemiFuture().within(std::chrono::milliseconds(1));
// Give it 100ms to time out and call the interrupt handler
EXPECT_TRUE(done.try_wait_for(std::chrono::milliseconds(100)));
}
......@@ -1112,3 +1112,36 @@ TEST(SemiFuture, invokeCallbackReturningFutureWithOriginalCVRef) {
EXPECT_EQ(202, makeSemiFuture<int>(200).deferValue(cfoo).wait().value());
EXPECT_EQ(303, makeSemiFuture<int>(300).deferValue(Foo()).wait().value());
}
TEST(SemiFuture, semiFutureWithinCtxCleanedUpWhenTaskFinishedInTime) {
// Used to track the use_count of callbackInput even outside of its scope
std::weak_ptr<int> target;
{
Promise<std::shared_ptr<int>> promise;
auto input = std::make_shared<int>(1);
auto longEnough = std::chrono::milliseconds(1000);
promise.getSemiFuture()
.within(longEnough)
.toUnsafeFuture()
.then([&target](
folly::Try<std::shared_ptr<int>>&& callbackInput) mutable {
target = callbackInput.value();
});
promise.setValue(input);
}
// After promise's life cycle is finished, make sure no one is holding the
// input anymore, in other words, ctx should have been cleaned up.
EXPECT_EQ(0, target.use_count());
}
TEST(SemiFuture, semiFutureWithinNoValueReferenceWhenTimeOut) {
Promise<std::shared_ptr<int>> promise;
auto veryShort = std::chrono::milliseconds(1);
promise.getSemiFuture().within(veryShort).toUnsafeFuture().then(
[](folly::Try<std::shared_ptr<int>>&& callbackInput) {
// Timeout is fired. Verify callbackInput is not referenced
EXPECT_EQ(0, callbackInput.value().use_count());
});
}
......@@ -99,6 +99,16 @@ TEST(Timekeeper, futureWithinHandlesNullTimekeeperSingleton) {
EXPECT_THROW(f.get(), FutureNoTimekeeper);
}
TEST(Timekeeper, semiFutureWithinHandlesNullTimekeeperSingleton) {
Singleton<ThreadWheelTimekeeper>::make_mock([] { return nullptr; });
SCOPE_EXIT {
Singleton<ThreadWheelTimekeeper>::make_mock();
};
Promise<int> p;
auto f = p.getSemiFuture().within(one_ms);
EXPECT_THROW(std::move(f).get(), FutureNoTimekeeper);
}
TEST(Timekeeper, futureDelayed) {
auto t1 = now();
auto dur =
......@@ -182,6 +192,14 @@ TEST(Timekeeper, futureWithinThrows) {
EXPECT_EQ(-1, f.get());
}
TEST(Timekeeper, semiFutureWithinThrows) {
Promise<int> p;
auto f = p.getSemiFuture().within(one_ms).toUnsafeFuture().onError(
[](FutureTimeout&) { return -1; });
EXPECT_EQ(-1, std::move(f).get());
}
TEST(Timekeeper, futureWithinAlreadyComplete) {
auto f =
makeFuture(42).within(one_ms).onError([&](FutureTimeout&) { return -1; });
......@@ -189,6 +207,13 @@ TEST(Timekeeper, futureWithinAlreadyComplete) {
EXPECT_EQ(42, f.get());
}
TEST(Timekeeper, semiFutureWithinAlreadyComplete) {
auto f = makeSemiFuture(42).within(one_ms).toUnsafeFuture().onError(
[&](FutureTimeout&) { return -1; });
EXPECT_EQ(42, f.get());
}
TEST(Timekeeper, futureWithinFinishesInTime) {
Promise<int> p;
auto f = p.getFuture()
......@@ -199,16 +224,37 @@ TEST(Timekeeper, futureWithinFinishesInTime) {
EXPECT_EQ(42, f.get());
}
TEST(Timekeeper, semiFutureWithinFinishesInTime) {
Promise<int> p;
auto f = p.getSemiFuture()
.within(std::chrono::minutes(1))
.toUnsafeFuture()
.onError([&](FutureTimeout&) { return -1; });
p.setValue(42);
EXPECT_EQ(42, f.get());
}
TEST(Timekeeper, futureWithinVoidSpecialization) {
makeFuture().within(one_ms);
}
TEST(Timekeeper, semiFutureWithinVoidSpecialization) {
makeSemiFuture().within(one_ms);
}
TEST(Timekeeper, futureWithinException) {
Promise<Unit> p;
auto f = p.getFuture().within(awhile, std::runtime_error("expected"));
EXPECT_THROW(f.get(), std::runtime_error);
}
TEST(Timekeeper, semiFutureWithinException) {
Promise<Unit> p;
auto f = p.getSemiFuture().within(awhile, std::runtime_error("expected"));
EXPECT_THROW(std::move(f).get(), std::runtime_error);
}
TEST(Timekeeper, onTimeout) {
bool flag = false;
makeFuture(42)
......@@ -268,7 +314,7 @@ TEST(Timekeeper, chainedInterruptTest) {
EXPECT_FALSE(test);
}
TEST(Timekeeper, withinChainedInterruptTest) {
TEST(Timekeeper, futureWithinChainedInterruptTest) {
bool test = false;
Promise<Unit> p;
p.setInterruptHandler([&test, &p](const exception_wrapper& ex) {
......@@ -283,6 +329,21 @@ TEST(Timekeeper, withinChainedInterruptTest) {
EXPECT_TRUE(test);
}
TEST(Timekeeper, semiFutureWithinChainedInterruptTest) {
bool test = false;
Promise<Unit> p;
p.setInterruptHandler([&test, &p](const exception_wrapper& ex) {
ex.handle(
[&test](const FutureCancellation& /* cancellation */) { test = true; });
p.setException(ex);
});
auto f = p.getSemiFuture().within(milliseconds(100));
EXPECT_FALSE(test) << "Sanity check";
f.cancel();
f.wait();
EXPECT_TRUE(test);
}
TEST(Timekeeper, executor) {
class ExecutorTester : public Executor {
public:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment