Commit 175f274c authored by Lee Howes's avatar Lee Howes Committed by Facebook GitHub Bot

Make retrying always return a SemiFuture

Summary: Make futures::retrying return a SemiFuture and safely constrain where the work runs to avoid accidental inline execution.

Reviewed By: yfeldblum

Differential Revision: D24490957

fbshipit-source-id: 9b8c0f369084f2d26bbb4764abde6866fb09b4d9
parent 1e90876e
......@@ -24,7 +24,7 @@ namespace folly {
namespace futures {
/**
* retrying
* retrying and retryingUnsafe
*
* Given a policy and a future-factory, creates futures according to the
* policy.
......@@ -40,11 +40,17 @@ namespace futures {
* (Semi)Future<bool> which, when completed with true, indicates that a retry
* is desired.
*
* If the callable or policy returns a SemiFuture, then retrying returns a
* SemiFuture. Note that, consistent with other SemiFuture-returning functions
* the implication of this statement is that retrying should be assumed to be
* lazy: it may do nothing until .wait()/.get() is called on the result or
* an executor is attached with .via.
* retrying always returns a SemiFuture and enforces transfer onto an executor
* in the implementation. This ensures that recursive policies are safe and
* run in the right place.
* retryingUnsafe returns a Future and restricts both the callable and policy
* to return Future<bool> or bool. SemiFuture forms are not valid.
* As the name suggests, prefer retrying to retryingUnsafe to avoid surprising
* recursion or putting work on the TimeKeeper that should not run there.
* Note that, consistent with other SemiFuture-returning functions retrying
* should be assumed to be lazy: it may do nothing until .wait()/.get() is
* called on the result or an executor is attached with .via.
*
* We provide a few generic policies:
* - Basic
......@@ -70,7 +76,9 @@ template <class Policy, class FF>
Future<typename isFutureOrSemiFuture<invoke_result_t<FF, size_t>>::Inner>
retryingUnsafe(Policy&& p, FF&& ff);
template <class Policy, class FF>
auto retrying(Policy&& p, FF&& ff);
FOLLY_NODISCARD SemiFuture<
typename isFutureOrSemiFuture<invoke_result_t<FF, size_t>>::Inner>
retrying(Policy&& p, FF&& ff);
namespace detail {
......@@ -124,11 +132,8 @@ void retryingImpl(size_t k, Policy&& p, FF&& ff, Prom prom) {
}
template <class Policy, class FF>
typename std::enable_if<
!(isSemiFuture<invoke_result_t<FF, size_t>>::value ||
isSemiFuture<invoke_result_t<Policy, size_t, exception_wrapper>>::value),
invoke_result_t<FF, size_t>>::type
retrying(size_t k, Policy&& p, FF&& ff) {
Future<typename invoke_result_t<FF, size_t>::value_type>
retryingFuture(size_t k, Policy&& p, FF&& ff) {
using F = invoke_result_t<FF, size_t>;
using T = typename F::value_type;
auto prom = Promise<T>();
......@@ -139,38 +144,52 @@ retrying(size_t k, Policy&& p, FF&& ff) {
}
template <class Policy, class FF>
typename std::enable_if<
isSemiFuture<invoke_result_t<FF, size_t>>::value ||
isSemiFuture<invoke_result_t<Policy, size_t, exception_wrapper>>::value,
SemiFuture<typename isFutureOrSemiFuture<
invoke_result_t<FF, size_t>>::Inner>>::type
retrying(size_t k, Policy&& p, FF&& ff) {
SemiFuture<typename invoke_result_t<FF, size_t>::value_type>
retryingSemiFuture(size_t k, Policy&& p, FF&& ff) {
auto sf = folly::makeSemiFuture().deferExValue(
[k, p = static_cast<Policy&&>(p), ff = static_cast<FF&&>(ff)](
Executor::KeepAlive<> ka, auto&&) mutable {
auto futureP = [p = static_cast<Policy&&>(p), ka](
size_t kk, exception_wrapper e) {
size_t kk, exception_wrapper e) mutable {
return p(kk, std::move(e)).via(ka);
};
auto futureFF = [ff = static_cast<FF&&>(ff), ka = std::move(ka)](
size_t v) { return ff(std::move(v)).via(ka); };
return retrying(k, std::move(futureP), std::move(futureFF));
auto futureFF = [ff = static_cast<FF&&>(ff),
ka = std::move(ka)](size_t v) mutable {
return ff(std::move(v)).via(ka);
};
return retryingFuture(k, std::move(futureP), std::move(futureFF));
});
return sf;
}
template <class Policy, class FF>
invoke_result_t<FF, size_t>
retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
Future<typename isFutureOrSemiFuture<invoke_result_t<FF, size_t>>::Inner>
retryingFuture(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
auto q = [pm = static_cast<Policy&&>(p)](size_t k, exception_wrapper x) {
return makeFuture<bool>(pm(k, x));
};
return retrying(0, std::move(q), static_cast<FF&&>(ff));
return retryingFuture(0, std::move(q), static_cast<FF&&>(ff));
}
template <class Policy, class FF>
SemiFuture<typename isFutureOrSemiFuture<invoke_result_t<FF, size_t>>::Inner>
retryingSemiFuture(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
auto q = [pm = static_cast<Policy&&>(p)](size_t k, exception_wrapper x) {
return makeSemiFuture<bool>(pm(k, x));
};
return retryingSemiFuture(0, std::move(q), static_cast<FF&&>(ff));
}
template <class Policy, class FF>
Future<typename invoke_result_t<FF, size_t>::value_type>
retryingFuture(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
return retryingFuture(0, static_cast<Policy&&>(p), static_cast<FF&&>(ff));
}
template <class Policy, class FF>
auto retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
return retrying(0, static_cast<Policy&&>(p), static_cast<FF&&>(ff));
SemiFuture<typename invoke_result_t<FF, size_t>::value_type>
retryingSemiFuture(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
return retryingSemiFuture(0, static_cast<Policy&&>(p), static_cast<FF&&>(ff));
}
// jittered exponential backoff, clamped to [backoff_min, backoff_max]
......@@ -276,14 +295,15 @@ template <class Policy, class FF>
Future<typename isFutureOrSemiFuture<invoke_result_t<FF, size_t>>::Inner>
retryingUnsafe(Policy&& p, FF&& ff) {
using tag = typename detail::retrying_policy_traits<Policy>::tag;
return detail::retrying(
return detail::retryingFuture(
static_cast<Policy&&>(p), static_cast<FF&&>(ff), tag());
}
template <class Policy, class FF>
auto retrying(Policy&& p, FF&& ff) {
SemiFuture<typename isFutureOrSemiFuture<invoke_result_t<FF, size_t>>::Inner>
retrying(Policy&& p, FF&& ff) {
using tag = typename detail::retrying_policy_traits<Policy>::tag;
return detail::retrying(
return detail::retryingSemiFuture(
static_cast<Policy&&>(p), static_cast<FF&&>(ff), tag());
}
......
......@@ -363,7 +363,7 @@ TEST(RetryingTest, large_retries) {
});
};
vector<Future<LargeReturn>> futures;
vector<SemiFuture<LargeReturn>> futures;
for (auto idx = 0; idx < 40; ++idx) {
futures.emplace_back(futures::retrying(
[&executor](size_t, const exception_wrapper&) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment