Commit 7cfb9591 authored by Tristan Rice's avatar Tristan Rice Committed by Facebook Github Bot

folly: move SemiFuture::within to be with the other within methods

Summary: This moves within to be in a more logical spot. In the parent diff we kept it where it was to make it easier to see the differences.

Reviewed By: LeeHowes

Differential Revision: D16967106

fbshipit-source-id: 9f9cd2ea38a996ac12d3ca8256c1ec050d57dfc0
parent 4594bbdf
...@@ -493,86 +493,6 @@ FutureBase<T>::thenImplementation( ...@@ -493,86 +493,6 @@ FutureBase<T>::thenImplementation(
return f; return f;
} }
} // namespace detail
} // namespace futures
template <class T>
template <typename E>
SemiFuture<T> SemiFuture<T>::within(Duration dur, E e, Timekeeper* tk) && {
if (this->isReady()) {
return std::move(*this);
}
struct Context {
explicit Context(E ex) : exception(std::move(ex)) {}
E exception;
SemiFuture<Unit> thisFuture;
Future<Unit> afterFuture;
Promise<T> promise;
std::atomic<bool> token{false};
};
std::shared_ptr<Timekeeper> tks;
if (LIKELY(!tk)) {
tks = folly::detail::getTimekeeperSingleton();
tk = tks.get();
}
if (UNLIKELY(!tk)) {
return makeSemiFuture<T>(FutureNoTimekeeper());
}
auto ctx = std::make_shared<Context>(std::move(e));
ctx->thisFuture = std::move(*this).defer([ctx](Try<T>&& t) {
if (!ctx->token.exchange(true, std::memory_order_relaxed)) {
ctx->promise.setTry(std::move(t));
ctx->afterFuture.cancel();
}
});
// Have time keeper use a weak ptr to hold ctx,
// so that ctx can be deallocated as soon as the future job finished.
ctx->afterFuture = tk->after(dur).thenTry(
[weakCtx = to_weak_ptr(ctx)](Try<Unit>&& t) mutable {
if (t.hasException() &&
t.exception().is_compatible_with<FutureCancellation>()) {
// This got cancelled by thisFuture so we can just return.
return;
}
auto lockedCtx = weakCtx.lock();
if (!lockedCtx) {
// ctx already released. "this" completed first, cancel "after"
return;
}
// "after" completed first, cancel "this"
lockedCtx->thisFuture.raise(FutureTimeout());
if (!lockedCtx->token.exchange(true, std::memory_order_relaxed)) {
if (t.hasException()) {
lockedCtx->promise.setException(std::move(t.exception()));
} else {
lockedCtx->promise.setException(std::move(lockedCtx->exception));
}
}
});
// Properly propagate interrupt values through futures chained after within()
ctx->promise.setInterruptHandler(
[weakCtx = to_weak_ptr(ctx)](const exception_wrapper& ex) {
if (auto lockedCtx = weakCtx.lock()) {
lockedCtx->thisFuture.raise(ex);
}
});
auto fut = ctx->promise.getSemiFuture();
fut.setExecutor(ctx->thisFuture.stealDeferredExecutor());
return std::move(fut);
}
namespace futures {
namespace detail {
class WaitExecutor final : public folly::Executor { class WaitExecutor final : public folly::Executor {
public: public:
void add(Func func) override { void add(Func func) override {
...@@ -2100,6 +2020,80 @@ Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) && { ...@@ -2100,6 +2020,80 @@ Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) && {
.via(exe ? exe : &InlineExecutor::instance()); .via(exe ? exe : &InlineExecutor::instance());
} }
template <class T>
template <typename E>
SemiFuture<T> SemiFuture<T>::within(Duration dur, E e, Timekeeper* tk) && {
if (this->isReady()) {
return std::move(*this);
}
struct Context {
explicit Context(E ex) : exception(std::move(ex)) {}
E exception;
SemiFuture<Unit> thisFuture;
Future<Unit> afterFuture;
Promise<T> promise;
std::atomic<bool> token{false};
};
std::shared_ptr<Timekeeper> tks;
if (LIKELY(!tk)) {
tks = folly::detail::getTimekeeperSingleton();
tk = tks.get();
}
if (UNLIKELY(!tk)) {
return makeSemiFuture<T>(FutureNoTimekeeper());
}
auto ctx = std::make_shared<Context>(std::move(e));
ctx->thisFuture = std::move(*this).defer([ctx](Try<T>&& t) {
if (!ctx->token.exchange(true, std::memory_order_relaxed)) {
ctx->promise.setTry(std::move(t));
ctx->afterFuture.cancel();
}
});
// Have time keeper use a weak ptr to hold ctx,
// so that ctx can be deallocated as soon as the future job finished.
ctx->afterFuture = tk->after(dur).thenTry(
[weakCtx = to_weak_ptr(ctx)](Try<Unit>&& t) mutable {
if (t.hasException() &&
t.exception().is_compatible_with<FutureCancellation>()) {
// This got cancelled by thisFuture so we can just return.
return;
}
auto lockedCtx = weakCtx.lock();
if (!lockedCtx) {
// ctx already released. "this" completed first, cancel "after"
return;
}
// "after" completed first, cancel "this"
lockedCtx->thisFuture.raise(FutureTimeout());
if (!lockedCtx->token.exchange(true, std::memory_order_relaxed)) {
if (t.hasException()) {
lockedCtx->promise.setException(std::move(t.exception()));
} else {
lockedCtx->promise.setException(std::move(lockedCtx->exception));
}
}
});
// Properly propagate interrupt values through futures chained after within()
ctx->promise.setInterruptHandler(
[weakCtx = to_weak_ptr(ctx)](const exception_wrapper& ex) {
if (auto lockedCtx = weakCtx.lock()) {
lockedCtx->thisFuture.raise(ex);
}
});
auto fut = ctx->promise.getSemiFuture();
fut.setExecutor(ctx->thisFuture.stealDeferredExecutor());
return std::move(fut);
}
// delayed // delayed
template <class T> template <class T>
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment