Commit ea7aeab5 authored by Cameron Pickett's avatar Cameron Pickett Committed by Facebook GitHub Bot

Implement range-accepting versions of collectAny*

Reviewed By: iahs

Differential Revision: D31233377

fbshipit-source-id: c61de3936ad2c3d573ef71aca385dda023b38ee7
parent 8ccd13f0
...@@ -1056,6 +1056,98 @@ auto collectAnyNoDiscard(SemiAwaitables&&... awaitables) ...@@ -1056,6 +1056,98 @@ auto collectAnyNoDiscard(SemiAwaitables&&... awaitables)
static_cast<SemiAwaitables&&>(awaitables)...); static_cast<SemiAwaitables&&>(awaitables)...);
} }
template <typename InputRange>
auto collectAnyRange(InputRange awaitables) -> folly::coro::Task<std::pair<
size_t,
folly::Try<detail::collect_all_range_component_t<
detail::range_reference_t<InputRange>>>>> {
const CancellationToken& parentCancelToken =
co_await co_current_cancellation_token;
const CancellationSource cancelSource;
const CancellationToken cancelToken =
CancellationToken::merge(parentCancelToken, cancelSource.getToken());
std::pair<
size_t,
folly::Try<detail::collect_all_range_component_t<
detail::range_reference_t<InputRange>>>>
firstCompletion;
firstCompletion.first = size_t(-1);
using awaitable_type = remove_cvref_t<detail::range_reference_t<InputRange>>;
auto makeTask = [&](awaitable_type semiAwaitable,
size_t index) -> folly::coro::Task<void> {
auto result = co_await folly::coro::co_awaitTry(std::move(semiAwaitable));
if (!cancelSource.requestCancellation()) {
// This is first entity to request cancellation.
firstCompletion.first = index;
firstCompletion.second = std::move(result);
}
};
// Create a task to await each input awaitable.
std::vector<folly::coro::Task<void>> tasks;
// TODO: Detect when the input range supports constant-time
// .size() and pre-reserve storage for that many elements in 'tasks'.
size_t taskCount = 0;
for (auto&& semiAwaitable : static_cast<InputRange&&>(awaitables)) {
tasks.push_back(makeTask(
static_cast<decltype(semiAwaitable)&&>(semiAwaitable), taskCount++));
}
co_await folly::coro::co_withCancellation(
cancelToken, folly::coro::collectAllRange(tasks | ranges::views::move));
if (parentCancelToken.isCancellationRequested()) {
co_yield co_cancelled;
}
co_return firstCompletion;
}
template <typename InputRange>
auto collectAnyNoDiscardRange(InputRange awaitables)
-> folly::coro::Task<std::vector<detail::collect_all_try_range_component_t<
detail::range_reference_t<InputRange>>>> {
const CancellationToken& parentCancelToken =
co_await co_current_cancellation_token;
const CancellationSource cancelSource;
const CancellationToken cancelToken =
CancellationToken::merge(parentCancelToken, cancelSource.getToken());
std::vector<detail::collect_all_try_range_component_t<
detail::range_reference_t<InputRange>>>
results;
using awaitable_type = remove_cvref_t<detail::range_reference_t<InputRange>>;
auto makeTask = [&](awaitable_type semiAwaitable,
size_t index) -> folly::coro::Task<void> {
auto result = co_await folly::coro::co_awaitTry(std::move(semiAwaitable));
cancelSource.requestCancellation();
results[index] = std::move(result);
};
// Create a task to await each input awaitable.
std::vector<folly::coro::Task<void>> tasks;
// TODO: Detect when the input range supports constant-time
// .size() and pre-reserve storage for that many elements in 'tasks'.
size_t taskCount = 0;
for (auto&& semiAwaitable : static_cast<InputRange&&>(awaitables)) {
tasks.push_back(makeTask(
static_cast<decltype(semiAwaitable)&&>(semiAwaitable), taskCount++));
}
results.resize(taskCount);
co_await folly::coro::co_withCancellation(
cancelToken, folly::coro::collectAllRange(tasks | ranges::views::move));
co_return results;
}
} // namespace coro } // namespace coro
} // namespace folly } // namespace folly
......
...@@ -434,7 +434,7 @@ auto collectAllTryWindowed( ...@@ -434,7 +434,7 @@ auto collectAllTryWindowed(
// folly::coro::Task<Foo> getDataOneWay(); // folly::coro::Task<Foo> getDataOneWay();
// folly::coro::Task<Foo> getDataAnotherWay(); // folly::coro::Task<Foo> getDataAnotherWay();
// //
// std::pair<std::size_t, Foo> result = co_await folly::coro::collectAny( // std::pair<std::size_t, Try<Foo>> result = co_await folly::coro::collectAny(
// getDataOneWay(), getDataAnotherWay()); // getDataOneWay(), getDataAnotherWay());
// //
template <typename SemiAwaitable, typename... SemiAwaitables> template <typename SemiAwaitable, typename... SemiAwaitables>
...@@ -465,13 +465,90 @@ auto collectAny(SemiAwaitable&& awaitable, SemiAwaitables&&... awaitables) ...@@ -465,13 +465,90 @@ auto collectAny(SemiAwaitable&& awaitable, SemiAwaitables&&... awaitables)
// folly::coro::Task<Bar> getDataAnotherWay(); // folly::coro::Task<Bar> getDataAnotherWay();
// //
// std::tuple<folly::Try<Foo>, folly::Try<Bar>> result = co_await // std::tuple<folly::Try<Foo>, folly::Try<Bar>> result = co_await
// folly::coro::collectAny(getDataOneWay(), getDataAnotherWay()); // folly::coro::collectAnyNoDiscard(getDataOneWay(), getDataAnotherWay());
// //
template <typename... SemiAwaitables> template <typename... SemiAwaitables>
auto collectAnyNoDiscard(SemiAwaitables&&... awaitables) auto collectAnyNoDiscard(SemiAwaitables&&... awaitables)
-> folly::coro::Task<std::tuple<detail::collect_all_try_component_t< -> folly::coro::Task<std::tuple<detail::collect_all_try_component_t<
remove_cvref_t<SemiAwaitables>>...>>; remove_cvref_t<SemiAwaitables>>...>>;
///////////////////////////////////////////////////////////////////////////
// collectAnyRange(RangeOf<SemiAwaitable<T>>&&)
// -> SemiAwaitable<std::pair<std::size_t, folly::Try<T>>>
//
// The collectAnyRange() function can be used to concurrently co_await on
// multiple SemiAwaitable objects, get the result and index of the first one
// completing, cancel the remaining ones and continue once they are completed.
//
// collectAnyRange() accepts zero or more SemiAwaitable objects and
// returns a SemiAwaitable object that will complete with a pair containing the
// result of the first one to complete and its index.
//
// collectAnyRange() is built on top of collectAllRange(), be aware of the
// coroutine starting behavior described in collectAll() documentation.
//
// The result of the first SemiAwaitable is going to be returned, whether it
// is a value or an exception. Any result of the remaining SemiAwaitables will
// be discarded, independently of whether it's a value or an exception.
//
// e.g.
//
// std::vector<Task<T>> tasks = ...;
// std::pair<size_t, Try<T>> result = co_await collectAnyRange(tasks |
// ranges::views::move);
//
template <typename InputRange>
auto collectAnyRange(InputRange awaitables) -> folly::coro::Task<std::pair<
size_t,
folly::Try<detail::collect_all_range_component_t<
detail::range_reference_t<InputRange>>>>>;
///////////////////////////////////////////////////////////////////////////
// collectAnyNoDiscardRange(RangeOf<SemiAwaitable<T>>&&)
// -> SemiAwaitable<std::vector<folly::Try<T>>>
//
// The collectAnyNoDiscardRange() function is similar to collectAnyRange() in
// that it co_awaits multiple SemiAwaitables and cancels any outstanding
// operations once at least one has finished. Unlike collectAnyRange(), it
// returns results from *all* SemiAwaitables, including
// folly::OperationCancelled for operations that were cancelled.
//
// collectAnyNoDiscardRange() is built on top of collectAllRange(), be aware of
// the coroutine starting behavior described in collectAll() documentation.
//
// The success/failure of individual results can be inspected by calling
// .hasValue() or .hasException() on the elements of the returned vector.
//
// Example:
// folly::coro::Task<Foo> getDataOneWay();
// folly::coro::Task<Foo> getDataAnotherWay();
//
// std::vector<folly::Try<Foo>> result = co_await
// folly::coro::collectAnyNoDiscard(getDataOneWay(), getDataAnotherWay());
//
template <typename InputRange>
auto collectAnyNoDiscardRange(InputRange awaitables)
-> folly::coro::Task<std::vector<detail::collect_all_try_range_component_t<
detail::range_reference_t<InputRange>>>>;
// collectAnyRange()/collectAnyNoDiscardRange() overloads that simplifies the
// common-case where an rvalue std::vector<SemiAwaitable> is passed.
//
// This avoids the caller needing to pipe the input through ranges::views::move
// transform to force the elements to be rvalue-references since the
// std::vector<T>::reference type is T& rather than T&& and some awaitables,
// such as Task<U>, are not lvalue awaitable.
template <typename SemiAwaitable>
auto collectAnyRange(std::vector<SemiAwaitable> awaitables)
-> decltype(collectAnyRange(awaitables | ranges::views::move)) {
co_return co_await collectAnyRange(awaitables | ranges::views::move);
}
template <typename SemiAwaitable>
auto collectAnyNoDiscardRange(std::vector<SemiAwaitable> awaitables)
-> decltype(collectAnyNoDiscardRange(awaitables | ranges::views::move)) {
co_return co_await collectAnyNoDiscardRange(awaitables | ranges::views::move);
}
} // namespace coro } // namespace coro
} // namespace folly } // namespace folly
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment