Commit 47039856 authored by Andrii Grynenko's avatar Andrii Grynenko Committed by Facebook Github Bot

Add Executor::KeepAlive<> version of futures::window

Reviewed By: yfeldblum

Differential Revision: D10458805

fbshipit-source-id: 6a36ecdc4de3e20115cf3d4805448e2713af1956
parent 8cfc7d7f
......@@ -1805,14 +1805,24 @@ auto window(size_t times, F func, size_t n)
template <class Collection, class F, class ItT, class Result>
std::vector<Future<Result>>
window(Executor* executor, Collection input, F func, size_t n) {
return window(
getKeepAliveToken(executor), std::move(input), std::move(func), n);
}
template <class Collection, class F, class ItT, class Result>
std::vector<Future<Result>>
window(Executor::KeepAlive<> executor, Collection input, F func, size_t n) {
struct WindowContext {
WindowContext(Executor* executor_, Collection&& input_, F&& func_)
: executor(executor_),
WindowContext(
Executor::KeepAlive<> executor_,
Collection&& input_,
F&& func_)
: executor(std::move(executor_)),
input(std::move(input_)),
promises(input.size()),
func(std::move(func_)) {}
std::atomic<size_t> i{0};
Executor* executor;
Executor::KeepAlive<> executor;
Collection input;
std::vector<Promise<Result>> promises;
F func;
......@@ -1823,12 +1833,12 @@ window(Executor* executor, Collection input, F func, size_t n) {
auto fut = makeSemiFutureWith(
[&] { return ctx->func(std::move(ctx->input[i])); });
fut.setCallback_([ctx = std::move(ctx), i](Try<Result>&& t) mutable {
const auto executor_ = ctx->executor;
executor_->add([ctx = std::move(ctx), i, t = std::move(t)]() mutable {
ctx->promises[i].setTry(std::move(t));
// Chain another future onto this one
spawn(std::move(ctx));
});
ctx->executor->add(
[ctx = std::move(ctx), i, t = std::move(t)]() mutable {
ctx->promises[i].setTry(std::move(t));
// Chain another future onto this one
spawn(std::move(ctx));
});
});
}
}
......@@ -1837,7 +1847,7 @@ window(Executor* executor, Collection input, F func, size_t n) {
auto max = std::min(n, input.size());
auto ctx = std::make_shared<WindowContext>(
executor, std::move(input), std::move(func));
executor.copy(), std::move(input), std::move(func));
// Start the first n Futures
for (size_t i = 0; i < max; ++i) {
......@@ -1847,7 +1857,7 @@ window(Executor* executor, Collection input, F func, size_t n) {
std::vector<Future<Result>> futures;
futures.reserve(ctx->promises.size());
for (auto& promise : ctx->promises) {
futures.emplace_back(promise.getSemiFuture().via(executor));
futures.emplace_back(promise.getSemiFuture().via(executor.copy()));
}
return futures;
......
......@@ -431,6 +431,15 @@ template <
std::vector<Future<Result>>
window(Executor* executor, Collection input, F func, size_t n);
template <
class Collection,
class F,
class ItT = typename std::iterator_traits<
typename Collection::iterator>::value_type,
class Result = typename invoke_result_t<F, ItT&&>::value_type>
std::vector<Future<Result>>
window(Executor::KeepAlive<> executor, Collection input, F func, size_t n);
template <typename F, typename T, typename ItT>
using MaybeTryArg = typename std::
conditional<is_invocable<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment