Commit 2b1e8dc5 authored by Andrii Grynenko's avatar Andrii Grynenko Committed by Facebook Github Bot

Add support for returning SemiFuture/Future from defer functor

Summary: Because DeferredExecutor now acts as a simple proxy to user executor, we can actually support multi-hop SemiFutures.

Reviewed By: LeeHowes

Differential Revision: D9928793

fbshipit-source-id: c7e030bac3582b2931bc9639c766a4398ce23a76
parent c09be5f4
......@@ -525,7 +525,6 @@ class DeferredExecutor final : public Executor {
}
void setExecutor(folly::Executor::KeepAlive<> executor) {
DCHECK(!dynamic_cast<DeferredExecutor*>(executor.get()));
if (nestedExecutors_) {
auto nestedExecutors = std::exchange(nestedExecutors_, nullptr);
for (auto& nestedExecutor : *nestedExecutors) {
......@@ -883,9 +882,6 @@ template <class T>
template <typename F>
SemiFuture<typename futures::detail::tryCallableResult<T, F>::value_type>
SemiFuture<T>::defer(F&& func) && {
static_assert(
!futures::detail::tryCallableResult<T, F>::ReturnsFuture::value,
"defer does not support Future unwrapping");
DeferredExecutor* deferredExecutor = getDeferredExecutor();
if (!deferredExecutor) {
auto newDeferredExecutor = DeferredExecutor::create();
......@@ -905,9 +901,6 @@ template <class T>
template <typename F>
SemiFuture<typename futures::detail::valueCallableResult<T, F>::value_type>
SemiFuture<T>::deferValue(F&& func) && {
static_assert(
!futures::detail::valueCallableResult<T, F>::ReturnsFuture::value,
"deferValue does not support Future unwrapping");
return std::move(*this).defer([f = std::forward<F>(func)](
folly::Try<T>&& t) mutable {
return std::forward<F>(f)(
......@@ -920,17 +913,13 @@ SemiFuture<T>::deferValue(F&& func) && {
template <class T>
template <class ExceptionType, class F>
SemiFuture<T> SemiFuture<T>::deferError(F&& func) && {
static_assert(
!isFutureOrSemiFuture<decltype(
std::forward<F>(func)(std::declval<ExceptionType&>()))>::value,
"deferError does not support Future unwrapping");
return std::move(*this).defer(
[func = std::forward<F>(func)](Try<T>&& t) mutable {
if (auto e = t.template tryGetExceptionObject<ExceptionType>()) {
return Try<T>(makeTryWith([&] { return std::forward<F>(func)(*e); }))
.value();
return makeSemiFutureWith(
[&]() mutable { return std::forward<F>(func)(*e); });
} else {
return std::move(*t);
return makeSemiFuture<T>(std::move(t));
}
});
}
......@@ -938,19 +927,14 @@ SemiFuture<T> SemiFuture<T>::deferError(F&& func) && {
template <class T>
template <class F>
SemiFuture<T> SemiFuture<T>::deferError(F&& func) && {
static_assert(
!isFutureOrSemiFuture<decltype(
std::forward<F>(func)(std::declval<exception_wrapper&>()))>::value,
"deferError does not support Future unwrapping");
return std::move(*this).defer(
[func = std::forward<F>(func)](Try<T> t) mutable {
if (t.hasException()) {
return Try<T>(makeTryWith([&] {
return std::forward<F>(func)(std::move(t.exception()));
}))
.value();
return makeSemiFutureWith([&]() mutable {
return std::forward<F>(func)(std::move(t.exception()));
});
} else {
return std::move(*t);
return makeSemiFuture<T>(std::move(t));
}
});
}
......@@ -961,7 +945,7 @@ SemiFuture<T> SemiFuture<T>::delayed(Duration dur, Timekeeper* tk) && {
.toUnsafeFuture()
.thenValue([](std::tuple<Try<T>, Try<Unit>> tup) {
Try<T>& t = std::get<0>(tup);
return makeFuture<T>(std::move(*t));
return makeFuture<T>(std::move(t));
});
}
......@@ -2104,9 +2088,11 @@ void waitViaImpl(
template <class T>
SemiFuture<T>& SemiFuture<T>::wait() & {
if (auto deferredExecutor = getDeferredExecutor()) {
// Make sure that the last callback in the future chain will be run on the
// WaitExecutor.
setCallback_([](auto&&) {});
auto waitExecutor = futures::detail::WaitExecutor::create();
deferredExecutor->setExecutor(waitExecutor.copy());
this->core_->setExecutor(nullptr);
while (!isReady()) {
waitExecutor->drive();
}
......@@ -2125,10 +2111,12 @@ SemiFuture<T>&& SemiFuture<T>::wait() && {
template <class T>
SemiFuture<T>& SemiFuture<T>::wait(Duration dur) & {
if (auto deferredExecutor = getDeferredExecutor()) {
// Make sure that the last callback in the future chain will be run on the
// WaitExecutor.
setCallback_([](auto&&) {});
auto waitExecutor = futures::detail::WaitExecutor::create();
auto deadline = futures::detail::WaitExecutor::Clock::now() + dur;
deferredExecutor->setExecutor(waitExecutor.copy());
this->core_->setExecutor(nullptr);
while (!isReady()) {
if (!waitExecutor->driveUntil(deadline)) {
break;
......@@ -2169,10 +2157,6 @@ Try<T> SemiFuture<T>::getTry() && {
template <class T>
Try<T> SemiFuture<T>::getTry(Duration dur) && {
wait(dur);
if (auto deferredExecutor = getDeferredExecutor()) {
deferredExecutor->detach();
}
this->core_->setExecutor(nullptr);
auto future = folly::Future<T>(this->core_);
this->core_ = nullptr;
......
......@@ -1050,3 +1050,16 @@ TEST(SemiFuture, collectAllSemiFutureDeferredWork) {
EXPECT_TRUE(deferredDestroyed);
}
}
TEST(SemiFuture, DeferWithNestedSemiFuture) {
auto start = std::chrono::steady_clock::now();
auto future = futures::sleep(std::chrono::milliseconds{100})
.semi()
.deferValue([](auto&&) {
return futures::sleep(std::chrono::milliseconds{200});
});
future.wait();
EXPECT_TRUE(future.hasValue());
EXPECT_GE(
std::chrono::steady_clock::now() - start, std::chrono::milliseconds{300});
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment