Commit 699acb25 authored by Lewis Baker's avatar Lewis Baker Committed by Facebook Github Bot

Add folly::coro::collectAll[Try]Range()

Summary:
Add functions for concurrently awaiting a collection of SemiAwaitable objects.

* `folly::coro::collectAllRange()` takes an InputRange of SemiAwaitable objects and concurrently awaits them, producing a `std::vector` of the results. This will throw an exception if any of the input operations failed with an exception.
* `folly::coro::collectAllTryRange()` takes an InputRange of SemiAwaitable objects and concurrently awaits them, producing a `std::vector<folly::Try<T>>` containing the results. Failure of individual operations can be determined by inspecting the corresponding `Try` value.

This diff also refactors the existing implementations of `folly::coro::collectAll()` to make use of a new `Barrier` and `BarrierTask` implementation detail rather than using `DetachedInlineTask`, `Baton` and an atomic counter. This allows delaying the start of the child corouines until after all coroutines have been created which was necessary for cases where we didn't know the number of elements in the input range to allow storage for the results to be allocated in a vector before the tasks start producing results.

Reviewed By: andriigrynenko

Differential Revision: D14430147

fbshipit-source-id: 5ba4b7cdc3c9b65d1736f14a2d39f7beb6cb82b0
parent 1290f5f2
...@@ -15,8 +15,8 @@ ...@@ -15,8 +15,8 @@
*/ */
#include <folly/ExceptionWrapper.h> #include <folly/ExceptionWrapper.h>
#include <folly/experimental/coro/Baton.h> #include <folly/experimental/coro/detail/Barrier.h>
#include <folly/experimental/coro/detail/InlineTask.h> #include <folly/experimental/coro/detail/BarrierTask.h>
namespace folly { namespace folly {
namespace coro { namespace coro {
...@@ -32,27 +32,43 @@ Unit getValueOrUnit(Try<void>&& value) { ...@@ -32,27 +32,43 @@ Unit getValueOrUnit(Try<void>&& value) {
return Unit{}; return Unit{};
} }
template <typename Awaitable, typename Result> // Helper class that can be used to annotate Awaitable objects that will
detail::InlineTaskDetached collectAllStartTask( // guarantee that they will be resumed on the correct executor so that
Awaitable awaitable, // when the object is awaited within a Task<T> it doesn't automatically
Try<Result>& result, // wrap the Awaitable in something that forces a reschedule onto the
folly::coro::Baton& baton, // executor.
std::atomic<size_t>& counter) noexcept { template <typename Awaitable>
class UnsafeResumeInlineSemiAwaitable {
public:
explicit UnsafeResumeInlineSemiAwaitable(Awaitable&& awaitable) noexcept
: awaitable_(awaitable) {}
Awaitable&& viaIfAsync(folly::Executor::KeepAlive<>) && noexcept {
return static_cast<Awaitable&&>(awaitable_);
}
private:
Awaitable awaitable_;
};
template <typename SemiAwaitable, typename Result>
detail::BarrierTask makeCollectAllTask(
folly::Executor* executor,
SemiAwaitable&& awaitable,
Try<Result>& result) {
try { try {
if constexpr (std::is_void_v<Result>) { if constexpr (std::is_void_v<Result>) {
co_await static_cast<Awaitable&&>(awaitable); co_await co_viaIfAsync(executor, static_cast<SemiAwaitable&&>(awaitable));
result.emplace(); result.emplace();
} else { } else {
result.emplace(co_await static_cast<Awaitable&&>(awaitable)); result.emplace(co_await co_viaIfAsync(
executor, static_cast<SemiAwaitable&&>(awaitable)));
} }
} catch (const std::exception& ex) { } catch (const std::exception& ex) {
result.emplaceException(std::current_exception(), ex); result.emplaceException(std::current_exception(), ex);
} catch (...) { } catch (...) {
result.emplaceException(std::current_exception()); result.emplaceException(std::current_exception());
} }
if (counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {
baton.post();
}
} }
template <typename... SemiAwaitables, size_t... Indices> template <typename... SemiAwaitables, size_t... Indices>
...@@ -61,27 +77,33 @@ auto collectAllTryImpl( ...@@ -61,27 +77,33 @@ auto collectAllTryImpl(
SemiAwaitables... awaitables) SemiAwaitables... awaitables)
-> folly::coro::Task< -> folly::coro::Task<
std::tuple<collect_all_try_component_t<SemiAwaitables>...>> { std::tuple<collect_all_try_component_t<SemiAwaitables>...>> {
static_assert(sizeof...(Indices) == sizeof...(SemiAwaitables));
if constexpr (sizeof...(SemiAwaitables) == 0) { if constexpr (sizeof...(SemiAwaitables) == 0) {
co_return std::tuple<>{}; co_return std::tuple<>{};
} else { } else {
std::tuple<collect_all_try_component_t<SemiAwaitables>...> results; std::tuple<collect_all_try_component_t<SemiAwaitables>...> results;
folly::coro::Baton baton;
std::atomic<size_t> counter(sizeof...(SemiAwaitables));
Executor* executor = co_await co_current_executor; Executor* executor = co_await co_current_executor;
// Use std::initializer_list to ensure that parameter pack is evaluated folly::coro::detail::BarrierTask tasks[sizeof...(SemiAwaitables)] = {
// in-order. makeCollectAllTask(
(void)std::initializer_list<int>{ executor,
(collectAllStartTask( static_cast<SemiAwaitables&&>(awaitables),
folly::coro::co_viaIfAsync( std::get<Indices>(results))...,
executor, static_cast<SemiAwaitables&&>(awaitables)), };
std::get<Indices>(results),
baton, folly::coro::detail::Barrier barrier{sizeof...(SemiAwaitables) + 1};
counter),
0)...};
co_await baton; // Use std::initializer_list to ensure that the sub-tasks are launched
// in the order they appear in the parameter pack.
(void)std::initializer_list<int>{(tasks[Indices].start(&barrier), 0)...};
// Wait for all of the sub-tasks to finish execution.
// Should be safe to avoid an executor transition here even if the
// operation completes asynchronously since all of the child tasks
// should already have transitioned to the correct executor due to
// the use of co_viaIfAsync() within makeBarrierTask().
co_await UnsafeResumeInlineSemiAwaitable{barrier.arriveAndWait()};
co_return results; co_return results;
} }
...@@ -98,22 +120,27 @@ auto collectAllImpl( ...@@ -98,22 +120,27 @@ auto collectAllImpl(
} else { } else {
std::tuple<collect_all_try_component_t<SemiAwaitables>...> results; std::tuple<collect_all_try_component_t<SemiAwaitables>...> results;
folly::coro::Baton baton;
std::atomic<size_t> counter(sizeof...(SemiAwaitables));
Executor* executor = co_await co_current_executor; Executor* executor = co_await co_current_executor;
// Use std::initializer_list to ensure that parameter pack is evaluated folly::coro::detail::BarrierTask tasks[sizeof...(SemiAwaitables)] = {
// in-order. makeCollectAllTask(
(void)std::initializer_list<int>{ executor,
(collectAllStartTask( static_cast<SemiAwaitables&&>(awaitables),
folly::coro::co_viaIfAsync( std::get<Indices>(results))...,
executor, static_cast<SemiAwaitables&&>(awaitables)), };
std::get<Indices>(results),
baton, folly::coro::detail::Barrier barrier{sizeof...(SemiAwaitables) + 1};
counter),
0)...};
co_await baton; // Use std::initializer_list to ensure that the sub-tasks are launched
// in the order they appear in the parameter pack.
(void)std::initializer_list<int>{(tasks[Indices].start(&barrier), 0)...};
// Wait for all of the sub-tasks to finish execution.
// Should be safe to avoid an executor transition here even if the
// operation completes asynchronously since all of the child tasks
// should already have transitioned to the correct executor due to
// the use of co_viaIfAsync() within makeBarrierTask().
co_await UnsafeResumeInlineSemiAwaitable{barrier.arriveAndWait()};
co_return std::tuple<collect_all_component_t<SemiAwaitables>...>{ co_return std::tuple<collect_all_component_t<SemiAwaitables>...>{
getValueOrUnit(std::get<Indices>(std::move(results)))...}; getValueOrUnit(std::get<Indices>(std::move(results)))...};
...@@ -139,5 +166,155 @@ auto collectAllTry(SemiAwaitables&&... awaitables) ...@@ -139,5 +166,155 @@ auto collectAllTry(SemiAwaitables&&... awaitables)
static_cast<SemiAwaitables&&>(awaitables)...); static_cast<SemiAwaitables&&>(awaitables)...);
} }
template <
typename InputRange,
std::enable_if_t<
!std::is_void_v<
semi_await_result_t<detail::range_reference_t<InputRange>>>,
int>>
auto collectAllRange(InputRange awaitables)
-> folly::coro::Task<std::vector<detail::collect_all_range_component_t<
detail::range_reference_t<InputRange>>>> {
auto results =
co_await folly::coro::collectAllTryRange(std::move(awaitables));
// Collate the results into a single result vector.
std::vector<detail::collect_all_range_component_t<
detail::range_reference_t<InputRange>>>
values;
values.reserve(results.size());
for (auto&& result : results) {
values.push_back(std::move(result).value());
}
co_return std::move(values);
}
template <
typename InputRange,
std::enable_if_t<
std::is_void_v<
semi_await_result_t<detail::range_reference_t<InputRange>>>,
int>>
auto collectAllRange(InputRange awaitables) -> folly::coro::Task<void> {
using reference_type = detail::range_reference_t<InputRange>;
std::vector<detail::collect_all_try_range_component_t<
detail::range_reference_t<InputRange>>>
results;
folly::Executor::KeepAlive<> executor =
folly::getKeepAliveToken(co_await co_current_executor);
std::atomic<bool> anyFailures{false};
exception_wrapper firstException;
using awaitable_type = remove_cvref_t<detail::range_reference_t<InputRange>>;
auto makeTask = [&](awaitable_type semiAwaitable) -> detail::BarrierTask {
try {
co_await coro::co_viaIfAsync(
executor.copyDummy(), std::move(semiAwaitable));
} catch (const std::exception& ex) {
if (!anyFailures.exchange(true, std::memory_order_relaxed)) {
firstException = exception_wrapper{std::current_exception(), ex};
}
} catch (...) {
if (!anyFailures.exchange(true, std::memory_order_relaxed)) {
firstException = exception_wrapper{std::current_exception()};
}
}
};
// Create a task to await each input awaitable.
std::vector<detail::BarrierTask> tasks;
// TODO: Detect when the input range supports constant-time
// .size() and pre-reserve storage for that many elements in 'tasks'.
for (auto&& semiAwaitable : static_cast<InputRange&&>(awaitables)) {
tasks.push_back(
makeTask(static_cast<decltype(semiAwaitable)&&>(semiAwaitable)));
}
// Launch the tasks and wait for them all to finish.
{
detail::Barrier barrier{tasks.size() + 1};
for (auto&& task : tasks) {
task.start(&barrier);
}
co_await detail::UnsafeResumeInlineSemiAwaitable{barrier.arriveAndWait()};
}
// Check if there were any exceptions and rethrow the first one.
if (anyFailures.load(std::memory_order_relaxed)) {
firstException.throw_exception();
}
}
template <typename InputRange>
auto collectAllTryRange(InputRange awaitables)
-> folly::coro::Task<std::vector<detail::collect_all_try_range_component_t<
detail::range_reference_t<InputRange>>>> {
std::vector<detail::collect_all_try_range_component_t<
detail::range_reference_t<InputRange>>>
results;
folly::Executor::KeepAlive<> executor =
folly::getKeepAliveToken(co_await co_current_executor);
using awaitable_type = remove_cvref_t<detail::range_reference_t<InputRange>>;
auto makeTask = [&](std::size_t index,
awaitable_type semiAwaitable) -> detail::BarrierTask {
assert(index < results.size());
auto& result = results[index];
try {
using await_result =
semi_await_result_t<detail::range_reference_t<InputRange>>;
if constexpr (std::is_void_v<await_result>) {
co_await coro::co_viaIfAsync(
executor.copyDummy(), std::move(semiAwaitable));
result.emplace();
} else {
result.emplace(co_await coro::co_viaIfAsync(
executor.copyDummy(), std::move(semiAwaitable)));
}
} catch (const std::exception& ex) {
result.emplaceException(std::current_exception(), ex);
} catch (...) {
result.emplaceException(std::current_exception());
}
};
// Create a task to await each input awaitable.
std::vector<detail::BarrierTask> tasks;
// TODO: Detect when the input range supports constant-time
// .size() and pre-reserve storage for that many elements in 'tasks'.
{
std::size_t index = 0;
for (auto&& semiAwaitable : static_cast<InputRange&&>(awaitables)) {
tasks.push_back(makeTask(
index++, static_cast<decltype(semiAwaitable)&&>(semiAwaitable)));
}
}
// Now that we know how many tasks there are, allocate that
// many Try objects to store the results before we start
// executing the tasks.
results.resize(tasks.size());
// Launch the tasks and wait for them all to finish.
{
detail::Barrier barrier{tasks.size() + 1};
for (auto&& task : tasks) {
task.start(&barrier);
}
co_await detail::UnsafeResumeInlineSemiAwaitable{barrier.arriveAndWait()};
}
co_return std::move(results);
}
} // namespace coro } // namespace coro
} // namespace folly } // namespace folly
...@@ -21,8 +21,11 @@ ...@@ -21,8 +21,11 @@
#include <folly/experimental/coro/ViaIfAsync.h> #include <folly/experimental/coro/ViaIfAsync.h>
#include <folly/experimental/coro/detail/Traits.h> #include <folly/experimental/coro/detail/Traits.h>
#include <range/v3/view/move.hpp>
#include <experimental/coroutine> #include <experimental/coroutine>
#include <functional> #include <functional>
#include <iterator>
#include <tuple> #include <tuple>
#include <type_traits> #include <type_traits>
...@@ -38,6 +41,23 @@ template <typename SemiAwaitable> ...@@ -38,6 +41,23 @@ template <typename SemiAwaitable>
using collect_all_component_t = using collect_all_component_t =
decay_rvalue_reference_t<lift_unit_t<semi_await_result_t<SemiAwaitable>>>; decay_rvalue_reference_t<lift_unit_t<semi_await_result_t<SemiAwaitable>>>;
template <typename SemiAwaitable>
using collect_all_range_component_t = decay_rvalue_reference_t<
lift_lvalue_reference_t<semi_await_result_t<SemiAwaitable>>>;
template <typename SemiAwaitable>
using collect_all_try_range_component_t =
collect_all_try_component_t<SemiAwaitable>;
template <typename Range>
using range_iterator_t = decltype(std::begin(std::declval<Range>()));
template <typename Iterator>
using iterator_reference_t = typename std::iterator_traits<Iterator>::reference;
template <typename Range>
using range_reference_t = iterator_reference_t<range_iterator_t<Range>>;
} // namespace detail } // namespace detail
/////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////
...@@ -122,6 +142,86 @@ auto collectAllTry(SemiAwaitables&&... awaitables) ...@@ -122,6 +142,86 @@ auto collectAllTry(SemiAwaitables&&... awaitables)
-> folly::coro::Task<std::tuple<detail::collect_all_try_component_t< -> folly::coro::Task<std::tuple<detail::collect_all_try_component_t<
remove_cvref_t<SemiAwaitables>>...>>; remove_cvref_t<SemiAwaitables>>...>>;
////////////////////////////////////////////////////////////////////////
// rangeCollectAll(RangeOf<SemiAwaitable<T>>&&)
// -> SemiAwaitable<std::vector<T>>
//
// The collectAllRange() function can be used to concurrently await a collection
// of SemiAwaitable objects, returning a std::vector of the individual results
// once all operations have completed.
//
// If any of the operations fail with an exception the entire operation fails
// with an exception and any partial results are discarded. If more than one
// operation fails with an exception then the exception from the first failed
// operation in the input range is rethrown. Other results and exceptions are
// discarded.
//
// If you need to be able to distinguish which operation failed or handle
// partial failures then use collectAllTryRange() instead.
//
// Note that the expression `*it` must be SemiAwaitable.
// This typically means that containers of Task<T> must be adapted to produce
// moved-elements by applying the ranges::view::move transform.
// e.g.
//
// std::vector<Task<T>> tasks = ...;
// std::vector<T> vals = co_await collectAllRange(tasks | ranges::view::move);
//
template <
typename InputRange,
std::enable_if_t<
!std::is_void_v<
semi_await_result_t<detail::range_reference_t<InputRange>>>,
int> = 0>
auto collectAllRange(InputRange awaitables)
-> folly::coro::Task<std::vector<detail::collect_all_range_component_t<
detail::range_reference_t<InputRange>>>>;
template <
typename InputRange,
std::enable_if_t<
std::is_void_v<
semi_await_result_t<detail::range_reference_t<InputRange>>>,
int> = 0>
auto collectAllRange(InputRange awaitables) -> folly::coro::Task<void>;
////////////////////////////////////////////////////////////////////////////
// collectAllTryRange(RangeOf<SemiAwaitable<T>>&&)
// -> SemiAwaitable<std::vector<folly::Try<T>>>
//
// The collectAllTryRange() function can be used to concurrently await a
// collection of SemiAwaitable objects and produces a std::vector of
// Try<T> objects once all of the input operations have completed.
//
// The element of the returned vector contains the result of the corresponding
// input operation in the same order that they appeared in the 'awaitables'
// sequence.
//
// The success/failure of individual results can be inspected by calling
// .hasValue() or .hasException() on the elements of the returned vector.
template <typename InputRange>
auto collectAllTryRange(InputRange awaitables)
-> folly::coro::Task<std::vector<detail::collect_all_try_range_component_t<
detail::range_reference_t<InputRange>>>>;
// collectAllRange()/collectAllTryRange() overloads that simplifies the
// common-case where an rvalue std::vector<Task<T>> is passed.
//
// This avoids the caller needing to pipe the input through ranges::view::move
// transform to force the Task<T> elements to be rvalue-references since the
// std::vector<T>::reference type is T& rather than T&& and Task<T>& is not
// awaitable.
template <typename T>
auto collectAllRange(std::vector<Task<T>> awaitables)
-> decltype(collectAllRange(awaitables | ranges::view::move)) {
co_return co_await collectAllRange(awaitables | ranges::view::move);
}
template <typename T>
auto collectAllTryRange(std::vector<Task<T>> awaitables)
-> decltype(collectAllTryRange(awaitables | ranges::view::move)) {
co_return co_await collectAllTryRange(awaitables | ranges::view::move);
}
} // namespace coro } // namespace coro
} // namespace folly } // namespace folly
......
/*
* Copyright 2019-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <experimental/coroutine>
#include <utility>
namespace folly {
namespace coro {
namespace detail {
// A Barrier is a synchronisation building block that can be used to
// implement higher-level coroutine-based synchronisation primitives.
//
// It allows a single coroutine to wait until a counter reaches zero.
// The counter typically represents the amount of outstanding work.
// When a coroutine completes some work it should call arrive() which
// will return a continuation.
class Barrier {
public:
explicit Barrier(std::size_t initialCount = 0) noexcept
: count_(initialCount) {}
void add(std::size_t count = 1) noexcept {
[[maybe_unused]] std::size_t oldCount =
count_.fetch_add(count, std::memory_order_relaxed);
// Check we didn't overflow the count.
assert(SIZE_MAX - oldCount >= count);
}
std::experimental::coroutine_handle<> arrive() noexcept {
const std::size_t oldCount = count_.fetch_sub(1, std::memory_order_acq_rel);
// Invalid to call arrive() if you haven't previously incremented the
// counter using .add().
assert(oldCount >= 1);
if (oldCount == 1) {
return std::exchange(continuation_, {});
} else {
return std::experimental::noop_coroutine();
}
}
auto arriveAndWait() noexcept {
class Awaiter {
public:
explicit Awaiter(Barrier& barrier) noexcept : barrier_(barrier) {}
bool await_ready() {
return false;
}
std::experimental::coroutine_handle<> await_suspend(
std::experimental::coroutine_handle<> continuation) noexcept {
barrier_.setContinuation(continuation);
return barrier_.arrive();
}
void await_resume() noexcept {}
private:
Barrier& barrier_;
};
return Awaiter{*this};
}
void setContinuation(
std::experimental::coroutine_handle<> continuation) noexcept {
assert(!continuation_);
continuation_ = continuation;
}
private:
std::atomic<std::size_t> count_;
std::experimental::coroutine_handle<> continuation_;
};
} // namespace detail
} // namespace coro
} // namespace folly
/*
* Copyright 2019-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/experimental/coro/detail/Barrier.h>
#include <cassert>
#include <experimental/coroutine>
#include <utility>
namespace folly {
namespace coro {
namespace detail {
class BarrierTask;
class BarrierTaskPromise {};
class BarrierTask {
public:
class promise_type {
struct FinalAwaiter {
bool await_ready() noexcept {
return false;
}
std::experimental::coroutine_handle<> await_suspend(
std::experimental::coroutine_handle<promise_type> h) noexcept {
auto& promise = h.promise();
assert(promise.barrier_ != nullptr);
return promise.barrier_->arrive();
}
void await_resume() noexcept {}
};
public:
BarrierTask get_return_object() noexcept {
return BarrierTask{
std::experimental::coroutine_handle<promise_type>::from_promise(
*this)};
}
std::experimental::suspend_always initial_suspend() noexcept {
return {};
}
FinalAwaiter final_suspend() noexcept {
return {};
}
void return_void() noexcept {}
[[noreturn]] void unhandled_exception() noexcept {
std::terminate();
}
void setBarrier(Barrier* barrier) noexcept {
assert(barrier_ == nullptr);
barrier_ = barrier;
}
private:
Barrier* barrier_ = nullptr;
};
private:
using handle_t = std::experimental::coroutine_handle<promise_type>;
explicit BarrierTask(handle_t coro) noexcept : coro_(coro) {}
public:
BarrierTask(BarrierTask&& other) noexcept
: coro_(std::exchange(other.coro_, {})) {}
~BarrierTask() {
if (coro_) {
coro_.destroy();
}
}
BarrierTask& operator=(BarrierTask other) noexcept {
swap(other);
return *this;
}
void swap(BarrierTask& b) noexcept {
std::swap(coro_, b.coro_);
}
void start(Barrier* barrier) noexcept {
assert(coro_);
coro_.promise().setBarrier(barrier);
coro_.resume();
}
auto startAndWaitForBarrier(Barrier* barrier) noexcept {
class awaiter {
public:
explicit awaiter(Barrier* barrier, handle_t coro) noexcept
: barrier_(barrier), coro_(coro) {}
bool await_ready() noexcept {
return false;
}
std::experimental::coroutine_handle<> await_suspend(
std::experimental::coroutine_handle<> continuation) noexcept {
coro_.promise().setBarrier(barrier_);
barrier_->setContinuation(continuation);
return coro_;
}
void await_resume() noexcept {}
private:
Barrier* barrier_;
handle_t coro_;
};
assert(coro_);
return awaiter{barrier, coro_};
}
private:
handle_t coro_;
};
} // namespace detail
} // namespace coro
} // namespace folly
...@@ -325,4 +325,163 @@ TEST(CollectAllTry, PartialFailure) { ...@@ -325,4 +325,163 @@ TEST(CollectAllTry, PartialFailure) {
}()); }());
} }
/////////////////////////////////////////////////////////////
// collectAllRange() tests
TEST(CollectAllRange, EmptyRangeOfVoidTask) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
std::vector<folly::coro::Task<void>> tasks;
auto collectTask = folly::coro::collectAllRange(std::move(tasks));
static_assert(
std::is_void<
folly::coro::semi_await_result_t<decltype(collectTask)>>::value,
"Result of awaiting collectAllRange() of Task<void> should be void");
co_await std::move(collectTask);
}());
}
TEST(CollectAllRange, RangeOfVoidAllSucceeding) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
int count = 0;
auto makeTask = [&]() -> folly::coro::Task<void> {
++count;
co_return;
};
std::vector<folly::coro::Task<void>> tasks;
tasks.push_back(makeTask());
tasks.push_back(makeTask());
tasks.push_back(makeTask());
co_await folly::coro::collectAllRange(std::move(tasks));
CHECK_EQ(3, count);
}());
}
TEST(CollectAllRange, RangeOfVoidSomeFailing) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
int count = 0;
auto makeTask = [&]() -> folly::coro::Task<void> {
if ((++count % 3) == 0) {
throw ErrorA{};
}
co_return;
};
std::vector<folly::coro::Task<void>> tasks;
tasks.push_back(makeTask());
tasks.push_back(makeTask());
tasks.push_back(makeTask());
tasks.push_back(makeTask());
tasks.push_back(makeTask());
try {
co_await folly::coro::collectAllRange(std::move(tasks));
CHECK(false);
} catch (const ErrorA&) {
}
CHECK_EQ(5, count);
}());
}
TEST(CollectAllRange, RangeOfNonVoid) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
int count = 0;
auto makeTask = [&]() -> folly::coro::Task<int> {
using namespace std::literals::chrono_literals;
int x = count++;
if ((x % 20) == 0) {
co_await folly::futures::sleep(50ms);
}
co_return x;
};
constexpr int taskCount = 50;
std::vector<folly::coro::Task<int>> tasks;
for (int i = 0; i < taskCount; ++i) {
tasks.push_back(makeTask());
}
CHECK_EQ(0, count);
std::vector<int> results =
co_await folly::coro::collectAllRange(std::move(tasks));
CHECK_EQ(taskCount, results.size());
CHECK_EQ(taskCount, count);
for (int i = 0; i < taskCount; ++i) {
CHECK_EQ(i, results[i]);
}
}());
}
////////////////////////////////////////////////////////////////////
// folly::coro::collectAllTryRange() tests
TEST(CollectAllTryRange, RangeOfVoidSomeFailing) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
int count = 0;
auto makeTask = [&]() -> folly::coro::Task<void> {
if ((++count % 3) == 0) {
throw ErrorA{};
}
co_return;
};
std::vector<folly::coro::Task<void>> tasks;
tasks.push_back(makeTask());
tasks.push_back(makeTask());
tasks.push_back(makeTask());
tasks.push_back(makeTask());
tasks.push_back(makeTask());
auto results = co_await folly::coro::collectAllTryRange(std::move(tasks));
CHECK_EQ(5, results.size());
CHECK(results[0].hasValue());
CHECK(results[1].hasValue());
CHECK(results[2].hasException());
CHECK(results[3].hasValue());
CHECK(results[4].hasValue());
}());
}
TEST(CollectAllTryRange, RangeOfValueSomeFailing) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
int count = 0;
auto makeTask = [&]() -> folly::coro::Task<std::string> {
if ((++count % 3) == 0) {
throw ErrorA{};
}
co_return "testing";
};
std::vector<folly::coro::Task<std::string>> tasks;
tasks.push_back(makeTask());
tasks.push_back(makeTask());
tasks.push_back(makeTask());
tasks.push_back(makeTask());
tasks.push_back(makeTask());
tasks.push_back(makeTask());
auto results = co_await folly::coro::collectAllTryRange(std::move(tasks));
CHECK_EQ(6, results.size());
CHECK(results[0].hasValue());
CHECK_EQ("testing", results[0].value());
CHECK(results[1].hasValue());
CHECK_EQ("testing", results[1].value());
CHECK(results[2].hasException());
CHECK(results[3].hasValue());
CHECK_EQ("testing", results[3].value());
CHECK(results[4].hasValue());
CHECK_EQ("testing", results[4].value());
CHECK(results[5].hasException());
}());
}
#endif #endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment