Commit 4594bbdf authored by Tristan Rice's avatar Tristan Rice Committed by Facebook Github Bot

folly: make within be a SemiFuture and fix deadlock bug

Summary:
This fixes a deadlock bug where the after continuation runs inline.

We resolve this by using defer to delay execution until both futures are created thus avoiding any extra synchronization.

Reviewed By: LeeHowes

Differential Revision: D16952748

fbshipit-source-id: a88a40d3135bdabc43bb51f9b5ea072f8e6564a3
parent 33fb91d9
......@@ -493,22 +493,23 @@ FutureBase<T>::thenImplementation(
return f;
}
} // namespace detail
} // namespace futures
template <class T>
template <typename E>
SemiFuture<T>
FutureBase<T>::withinImplementation(Duration dur, E e, Timekeeper* tk) && {
SemiFuture<T> SemiFuture<T>::within(Duration dur, E e, Timekeeper* tk) && {
if (this->isReady()) {
return std::move(*this);
}
struct Context {
explicit Context(E ex) : exception(std::move(ex)) {}
E exception;
Future<Unit> thisFuture;
SemiFuture<Unit> thisFuture;
Future<Unit> afterFuture;
Promise<T> promise;
std::atomic<bool> token{false};
// The baton is required since the continuation in thisFuture and
// afterFuture need to access each other.
// Use a spin lock baton since this should be fulfilled almost immediately.
Baton</*MayBlock=*/false> thisFutureSet;
Future<Unit> afterFuture;
};
std::shared_ptr<Timekeeper> tks;
......@@ -523,6 +524,13 @@ FutureBase<T>::withinImplementation(Duration dur, E e, Timekeeper* tk) && {
auto ctx = std::make_shared<Context>(std::move(e));
ctx->thisFuture = std::move(*this).defer([ctx](Try<T>&& t) {
if (!ctx->token.exchange(true, std::memory_order_relaxed)) {
ctx->promise.setTry(std::move(t));
ctx->afterFuture.cancel();
}
});
// Have time keeper use a weak ptr to hold ctx,
// so that ctx can be deallocated as soon as the future job finished.
ctx->afterFuture = tk->after(dur).thenTry(
......@@ -539,7 +547,6 @@ FutureBase<T>::withinImplementation(Duration dur, E e, Timekeeper* tk) && {
return;
}
// "after" completed first, cancel "this"
lockedCtx->thisFutureSet.wait();
lockedCtx->thisFuture.raise(FutureTimeout());
if (!lockedCtx->token.exchange(true, std::memory_order_relaxed)) {
if (t.hasException()) {
......@@ -550,17 +557,6 @@ FutureBase<T>::withinImplementation(Duration dur, E e, Timekeeper* tk) && {
}
});
auto f = [ctx](Executor::KeepAlive<>&&, Try<T>&& t) {
if (!ctx->token.exchange(true, std::memory_order_relaxed)) {
ctx->promise.setTry(std::move(t));
ctx->afterFuture.cancel();
}
};
using R = futures::detail::tryExecutorCallableResult<T, decltype(f)>;
ctx->thisFuture = this->thenImplementation(
std::move(f), R{}, futures::detail::InlineContinuation::forbid);
ctx->thisFutureSet.post();
// Properly propagate interrupt values through futures chained after within()
ctx->promise.setInterruptHandler(
[weakCtx = to_weak_ptr(ctx)](const exception_wrapper& ex) {
......@@ -569,9 +565,14 @@ FutureBase<T>::withinImplementation(Duration dur, E e, Timekeeper* tk) && {
}
});
return ctx->promise.getSemiFuture();
auto fut = ctx->promise.getSemiFuture();
fut.setExecutor(ctx->thisFuture.stealDeferredExecutor());
return std::move(fut);
}
namespace futures {
namespace detail {
class WaitExecutor final : public folly::Executor {
public:
void add(Func func) override {
......@@ -2094,7 +2095,8 @@ Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) && {
auto* exe = this->getExecutor();
return std::move(*this)
.withinImplementation(dur, e, tk)
.semi()
.within(dur, e, tk)
.via(exe ? exe : &InlineExecutor::instance());
}
......
......@@ -430,9 +430,6 @@ class FutureBase {
template <typename F, typename R>
typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
thenImplementation(F&& func, R, InlineContinuation);
template <typename E>
SemiFuture<T> withinImplementation(Duration dur, E e, Timekeeper* tk) &&;
};
template <class T>
Future<T> convertFuture(SemiFuture<T>&& sf, const Future<T>& f);
......@@ -799,21 +796,7 @@ class SemiFuture : private futures::detail::FutureBase<T> {
}
template <class E>
SemiFuture<T> within(Duration dur, E e, Timekeeper* tk = nullptr) && {
if (this->isReady()) {
return std::move(*this);
}
auto deferredExecutor = stealDeferredExecutor();
auto ret = std::move(*this).withinImplementation(dur, e, tk);
if (deferredExecutor) {
ret =
std::move(ret).defer([](Try<T>&& t) { return std::move(t).value(); });
std::vector<futures::detail::DeferredWrapper> des;
des.push_back(std::move(deferredExecutor));
ret.getDeferredExecutor()->setNestedExecutors(std::move(des));
}
return ret;
}
SemiFuture<T> within(Duration dur, E e, Timekeeper* tk = nullptr) &&;
/// Delay the completion of this SemiFuture for at least this duration from
/// now. The optional Timekeeper is as with futures::sleep().
......
......@@ -138,9 +138,43 @@ TEST(Timekeeper, semiFutureWithinCancelsTimeout) {
Promise<int> p;
auto f = p.getSemiFuture().within(too_long, static_cast<Timekeeper*>(&tk));
p.setValue(1);
f.wait();
EXPECT_TRUE(tk.cancelled_);
}
TEST(Timekeeper, semiFutureWithinInlineAfter) {
struct MockTimekeeper : Timekeeper {
Future<Unit> after(Duration) override {
return folly::makeFuture<folly::Unit>(folly::FutureNoTimekeeper());
}
};
MockTimekeeper tk;
Promise<int> p;
auto f = p.getSemiFuture().within(too_long, static_cast<Timekeeper*>(&tk));
EXPECT_THROW(std::move(f).get(), folly::FutureNoTimekeeper);
}
TEST(Timekeeper, semiFutureWithinReady) {
struct MockTimekeeper : Timekeeper {
Future<Unit> after(Duration) override {
called_ = true;
return folly::makeFuture<folly::Unit>(folly::FutureNoTimekeeper());
}
bool called_{false};
};
MockTimekeeper tk;
Promise<int> p;
p.setValue(1);
auto f = p.getSemiFuture().within(too_long, static_cast<Timekeeper*>(&tk));
f.wait();
EXPECT_FALSE(tk.called_);
}
TEST(Timekeeper, futureDelayed) {
auto t1 = now();
auto dur = makeFuture()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment