Commit 3874d423 authored by Lewis Baker's avatar Lewis Baker Committed by Facebook Github Bot

Add variadic folly::coro::collectAlll() and collectAllTry()

Summary:
Adds two new functions to folly::coro for concurrently awaiting a fixed number of sub-tasks.

* `folly::coro::collectAll(tasks...)`
* `folly::coro::collectAllTry(tasks...)`

Both can be used to concurrently await multiple input tasks. The difference is in how they report the results.

`collectAll()` produces a tuple of the result values for each of the input operations. If any of the input operations fails with an exception then the whole operation fails with an exception (which one is unspecified) any successful results are discarded.

`collectAllTry()` produces a tuple of `Try<T>` objects regardless of whether any of the input operations failed with an exception. The individual result objects can then be queried for the success/failure, allowing the caller to handle partial failure and/or determine which operation(s) failed.

Reviewed By: andriigrynenko

Differential Revision: D14334714

fbshipit-source-id: 22eb51e2198be42e77677a066bfbc15e1c7eb7dd
parent 4fe39e0f
/*
* Copyright 2019-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/ExceptionWrapper.h>
#include <folly/experimental/coro/Baton.h>
#include <folly/experimental/coro/detail/InlineTask.h>
namespace folly {
namespace coro {
namespace detail {
template <typename T>
T&& getValueOrUnit(Try<T>&& value) {
return std::move(value).value();
}
Unit getValueOrUnit(Try<void>&& value) {
value.throwIfFailed();
return Unit{};
}
template <typename Awaitable, typename Result>
detail::InlineTaskDetached collectAllStartTask(
Awaitable awaitable,
Try<Result>& result,
folly::coro::Baton& baton,
std::atomic<size_t>& counter) noexcept {
try {
if constexpr (std::is_void_v<Result>) {
co_await static_cast<Awaitable&&>(awaitable);
result.emplace();
} else {
result.emplace(co_await static_cast<Awaitable&&>(awaitable));
}
} catch (const std::exception& ex) {
result.emplaceException(std::current_exception(), ex);
} catch (...) {
result.emplaceException(std::current_exception());
}
if (counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {
baton.post();
}
}
template <typename... SemiAwaitables, size_t... Indices>
auto collectAllTryImpl(
std::index_sequence<Indices...>,
SemiAwaitables... awaitables)
-> folly::coro::Task<
std::tuple<collect_all_try_component_t<SemiAwaitables>...>> {
if constexpr (sizeof...(SemiAwaitables) == 0) {
co_return std::tuple<>{};
} else {
std::tuple<collect_all_try_component_t<SemiAwaitables>...> results;
folly::coro::Baton baton;
std::atomic<size_t> counter(sizeof...(SemiAwaitables));
Executor* executor = co_await co_current_executor;
// Use std::initializer_list to ensure that parameter pack is evaluated
// in-order.
(void)std::initializer_list<int>{
(collectAllStartTask(
folly::coro::co_viaIfAsync(
executor, static_cast<SemiAwaitables&&>(awaitables)),
std::get<Indices>(results),
baton,
counter),
0)...};
co_await baton;
co_return results;
}
}
template <typename... SemiAwaitables, size_t... Indices>
auto collectAllImpl(
std::index_sequence<Indices...>,
SemiAwaitables... awaitables)
-> folly::coro::Task<
std::tuple<collect_all_component_t<SemiAwaitables>...>> {
if constexpr (sizeof...(SemiAwaitables) == 0) {
co_return std::tuple<>{};
} else {
std::tuple<collect_all_try_component_t<SemiAwaitables>...> results;
folly::coro::Baton baton;
std::atomic<size_t> counter(sizeof...(SemiAwaitables));
Executor* executor = co_await co_current_executor;
// Use std::initializer_list to ensure that parameter pack is evaluated
// in-order.
(void)std::initializer_list<int>{
(collectAllStartTask(
folly::coro::co_viaIfAsync(
executor, static_cast<SemiAwaitables&&>(awaitables)),
std::get<Indices>(results),
baton,
counter),
0)...};
co_await baton;
co_return std::tuple<collect_all_component_t<SemiAwaitables>...>{
getValueOrUnit(std::get<Indices>(std::move(results)))...};
}
}
} // namespace detail
template <typename... SemiAwaitables>
auto collectAll(SemiAwaitables&&... awaitables) -> folly::coro::Task<std::tuple<
detail::collect_all_component_t<remove_cvref_t<SemiAwaitables>>...>> {
return detail::collectAllImpl(
std::make_index_sequence<sizeof...(SemiAwaitables)>{},
static_cast<SemiAwaitables&&>(awaitables)...);
}
template <typename... SemiAwaitables>
auto collectAllTry(SemiAwaitables&&... awaitables)
-> folly::coro::Task<std::tuple<detail::collect_all_try_component_t<
remove_cvref_t<SemiAwaitables>>...>> {
return detail::collectAllTryImpl(
std::make_index_sequence<sizeof...(SemiAwaitables)>{},
static_cast<SemiAwaitables&&>(awaitables)...);
}
} // namespace coro
} // namespace folly
/*
* Copyright 2019-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/Try.h>
#include <folly/Unit.h>
#include <folly/experimental/coro/Task.h>
#include <folly/experimental/coro/ViaIfAsync.h>
#include <folly/experimental/coro/detail/Traits.h>
#include <experimental/coroutine>
#include <functional>
#include <tuple>
#include <type_traits>
namespace folly {
namespace coro {
namespace detail {
template <typename SemiAwaitable>
using collect_all_try_component_t = folly::Try<decay_rvalue_reference_t<
lift_lvalue_reference_t<semi_await_result_t<SemiAwaitable>>>>;
template <typename SemiAwaitable>
using collect_all_component_t =
decay_rvalue_reference_t<lift_unit_t<semi_await_result_t<SemiAwaitable>>>;
} // namespace detail
///////////////////////////////////////////////////////////////////////////
// collectAll(SemiAwaitable<Ts>...) -> SemiAwaitable<std::tuple<Ts...>>
//
// The collectAll() function can be used to concurrently co_await on multiple
// SemiAwaitable objects and continue once they are all complete.
//
// collectAll() accepts an arbitrary number of SemiAwaitable objects and returns
// a SemiAwaitable object that will complete with a std::tuple of the results.
//
// When the returned SemiAwaitable object is co_awaited it will launch
// a new coroutine for awaiting each input awaitable in-turn.
//
// Note that coroutines for awaiting the input awaitables of later arguments
// will not be launched until the prior coroutine reaches its first suspend
// point. This means that awaiting multiple sub-tasks that all complete
// synchronously will still execute them sequentially on the current thread.
//
// If any of the input operations complete with an exception then the whole
// collectAll() operation will also complete with an exception once all of the
// operations have completed. Any partial results will be discarded.
// If multiple operations fail with an exception then one of the exceptions
// will be rethrown to the caller (which one is unspecified) and the other
// exceptions are discarded.
//
// If you need to know which operation failed or you want to handle partial
// failures then you can use the folly::coro::collectAllTry() instead which
// returns a tuple of Try<T> objects instead of a tuple of values.
//
// Example: Serially awaiting multiple operations (slower)
// folly::coro::Task<Foo> doSomething();
// folly::coro::Task<Bar> doSomethingElse();
//
// Foo result1 = co_await doSomething();
// Bar result2 = co_await doSomethingElse();
//
// Example: Concurrently awaiting multiple operations (faster) C++17-only.
// auto [result1, result2] =
// co_await folly::coro::collectAll(doSomething(), doSomethingElse());
//
template <typename... SemiAwaitables>
auto collectAll(SemiAwaitables&&... awaitables) -> folly::coro::Task<std::tuple<
detail::collect_all_component_t<remove_cvref_t<SemiAwaitables>>...>>;
///////////////////////////////////////////////////////////////////////////
// collectAllTry(SemiAwaitable<Ts>...)
// -> SemiAwaitable<std::tuple<Try<Ts>...>>
//
// Like the collectAll() function, the collectAllTry() function can be used to
// concurrently await multiple input SemiAwaitable objects.
//
// The collectAllTry() function differs from collectAll() in that it produces a
// tuple of Try<T> objects rather than a tuple of the values.
// This allows the caller to inspect the success/failure of individual
// operations and handle partial failures but has a less-convenient interface
// than collectAll().
//
// Example: Handling partial failure with collectAllTry()
// folly::coro::Task<Foo> doSomething();
// folly::coro::Task<Bar> doSomethingElse();
//
// auto [result1, result2] = co_await folly::coro::collectAllTry(
// doSomething(), doSomethingElse());
//
// if (result1.hasValue()) {
// Foo& foo = result1.value();
// process(foo);
// } else {
// logError("doSomething() failed", result1.exception());
// }
//
// if (result2.hasValue()) {
// Bar& bar = result2.value();
// process(bar);
// } else {
// logError("doSomethingElse() failed", result2.exception());
// }
//
template <typename... SemiAwaitables>
auto collectAllTry(SemiAwaitables&&... awaitables)
-> folly::coro::Task<std::tuple<detail::collect_all_try_component_t<
remove_cvref_t<SemiAwaitables>>...>>;
} // namespace coro
} // namespace folly
#include <folly/experimental/coro/Collect-inl.h>
/*
* Copyright 2019-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
namespace folly {
namespace coro {
namespace detail {
struct co_current_executor_ {
enum class secret_ { token_ };
explicit constexpr co_current_executor_(secret_) {}
};
} // namespace detail
using co_current_executor_t = detail::co_current_executor_;
// A special singleton object that can be co_await'ed within a Task<T> to query
// the current executor associated with the Task.
//
// Example:
// folly::coro::Task<void> example() {
// Executor* e = co_await folly::coro::co_current_executor;
// e->add([] { do_something(); });
// }
constexpr co_current_executor_t co_current_executor{
co_current_executor_t::secret_::token_};
} // namespace coro
} // namespace folly
......@@ -24,6 +24,7 @@
#include <folly/Portability.h>
#include <folly/ScopeGuard.h>
#include <folly/Try.h>
#include <folly/experimental/coro/CurrentExecutor.h>
#include <folly/experimental/coro/Traits.h>
#include <folly/experimental/coro/Utils.h>
#include <folly/experimental/coro/ViaIfAsync.h>
......@@ -34,17 +35,6 @@
namespace folly {
namespace coro {
namespace detail {
struct co_current_executor_ {
enum class secret_ { token_ };
explicit constexpr co_current_executor_(secret_) {}
};
} // namespace detail
using co_current_executor_t = detail::co_current_executor_;
constexpr co_current_executor_t co_current_executor{
co_current_executor_t::secret_::token_};
template <typename T = void>
class Task;
......
......@@ -21,6 +21,23 @@ namespace folly {
namespace coro {
namespace detail {
/**
* A type trait that lifts lvalue references into std::reference_wrapper<T>
* eg. so the value can be stored in std::optional or folly::Try.
*/
template <typename T>
struct lift_lvalue_reference {
using type = T;
};
template <typename T>
struct lift_lvalue_reference<T&> {
using type = std::reference_wrapper<T>;
};
template <typename T>
using lift_lvalue_reference_t = typename lift_lvalue_reference<T>::type;
/**
* A type trait to decay rvalue-reference types to a prvalue.
*/
......
/*
* Copyright 2019-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/Portability.h>
#if FOLLY_HAS_COROUTINES
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/executors/ManualExecutor.h>
#include <folly/experimental/coro/Baton.h>
#include <folly/experimental/coro/BlockingWait.h>
#include <folly/experimental/coro/Collect.h>
#include <folly/experimental/coro/Mutex.h>
#include <folly/experimental/coro/Task.h>
#include <folly/portability/GTest.h>
#include <numeric>
#include <string>
////////////////////////////////////////////////////////
// folly::coro::collectAll() tests
TEST(CollectAll, WithNoArgs) {
bool completed = false;
folly::coro::blockingWait([&]() -> folly::coro::Task<void> {
std::tuple<> result = co_await folly::coro::collectAll();
completed = true;
(void)result;
}());
CHECK(completed);
}
TEST(CollectAll, OneTaskWithValue) {
folly::coro::Baton baton;
auto f = [&]() -> folly::coro::Task<std::string> {
co_await baton;
co_return "hello";
};
bool completed = false;
auto run = [&]() -> folly::coro::Task<void> {
auto [result] = co_await folly::coro::collectAll(f());
CHECK_EQ("hello", result);
completed = true;
};
folly::ManualExecutor executor;
auto future = run().scheduleOn(&executor).start();
executor.drain();
CHECK(!completed);
baton.post();
// Posting the baton should have just scheduled the 'f()' coroutine
// for resumption on the executor but should not have executed
// until we drain the executor again.
CHECK(!completed);
executor.drain();
CHECK(completed);
CHECK(future.isReady());
}
TEST(CollectAll, OneVoidTask) {
bool completed = false;
folly::coro::blockingWait([&]() -> folly::coro::Task<void> {
// Checks that the task actually runs and that 'void' results are
// promoted to folly::Unit when placed in a tuple.
std::tuple<folly::Unit> result =
co_await folly::coro::collectAll([&]() -> folly::coro::Task<void> {
completed = true;
co_return;
}());
(void)result;
}());
CHECK(completed);
}
TEST(CollectAll, CollectAllDoesntCompleteUntilAllTasksComplete) {
folly::coro::Baton baton1;
folly::coro::Baton baton2;
bool task1Started = false;
bool task2Started = false;
bool complete = false;
auto run = [&]() -> folly::coro::Task<void> {
auto [first, second] = co_await folly::coro::collectAll(
[&]() -> folly::coro::Task<void> {
task1Started = true;
co_await baton1;
}(),
[&]() -> folly::coro::Task<void> {
task2Started = true;
co_await baton2;
}());
complete = true;
(void)first;
(void)second;
};
folly::ManualExecutor executor;
auto future = run().scheduleOn(&executor).start();
CHECK(!task1Started);
CHECK(!task2Started);
executor.drain();
CHECK(task1Started);
CHECK(task2Started);
CHECK(!complete);
baton1.post();
executor.drain();
CHECK(!complete);
baton2.post();
executor.drain();
CHECK(complete);
CHECK(future.isReady());
}
struct ErrorA : std::exception {};
struct ErrorB : std::exception {};
struct ErrorC : std::exception {};
TEST(CollectAll, ThrowsOneOfMultipleErrors) {
bool caughtException = false;
folly::coro::blockingWait([&]() -> folly::coro::Task<void> {
try {
bool throwError = true;
auto [x, y, z] = co_await folly::coro::collectAll(
[&]() -> folly::coro::Task<int> {
if (throwError) {
throw ErrorA{};
}
co_return 1;
}(),
[&]() -> folly::coro::Task<int> {
if (throwError) {
throw ErrorB{};
}
co_return 2;
}(),
[&]() -> folly::coro::Task<int> {
if (throwError) {
throw ErrorC{};
}
co_return 3;
}());
(void)x;
(void)y;
(void)z;
CHECK(false);
} catch (const ErrorA&) {
caughtException = true;
} catch (const ErrorB&) {
caughtException = true;
} catch (const ErrorC&) {
caughtException = true;
}
}());
CHECK(caughtException);
}
TEST(CollectAll, SynchronousCompletionInLoopDoesntCauseStackOverflow) {
// This test checks that collectAll() is using symmetric transfer to
// resume the awaiting coroutine without consume stack-space.
folly::coro::blockingWait([&]() -> folly::coro::Task<void> {
for (int i = 0; i < 1'000'000; ++i) {
auto [n, s] = co_await folly::coro::collectAll(
[]() -> folly::coro::Task<int> { co_return 123; }(),
[]() -> folly::coro::Task<std::string> { co_return "abc"; }());
CHECK_EQ(n, 123);
CHECK_EQ(s, "abc");
}
}());
}
template <
typename Iter,
typename Sentinel,
typename BinaryOp,
typename InitialValue = typename std::iterator_traits<Iter>::value_type>
folly::coro::Task<InitialValue> parallelAccumulate(
Iter begin,
Sentinel end,
BinaryOp op,
InitialValue initialValue = {}) {
auto distance = std::distance(begin, end);
if (distance < 512) {
co_return std::accumulate(
begin, end, std::move(initialValue), std::move(op));
} else {
auto mid = begin + (distance / 2);
auto [first, second] = co_await folly::coro::collectAll(
parallelAccumulate(begin, mid, op, std::move(initialValue))
.scheduleOn(co_await folly::coro::co_current_executor),
parallelAccumulate(mid + 1, end, op, *mid));
co_return op(std::move(first), std::move(second));
}
}
TEST(CollectAll, ParallelAccumulate) {
folly::CPUThreadPoolExecutor threadPool{
4, std::make_shared<folly::NamedThreadFactory>("TestThreadPool")};
folly::coro::blockingWait(
[]() -> folly::coro::Task<void> {
std::vector<int> values(100'000);
for (int i = 0; i < 100'000; ++i) {
values[i] = (1337 * i) % 1'000'000;
}
auto result = co_await parallelAccumulate(
values.begin(), values.end(), [](int a, int b) {
return std::max(a, b);
});
CHECK_EQ(999'989, result);
}()
.scheduleOn(&threadPool));
}
/////////////////////////////////////////////////////////
// folly::coro::collectAllTry() tests
TEST(CollectAllTry, WithNoArgs) {
bool completed = false;
folly::coro::blockingWait([&]() -> folly::coro::Task<void> {
std::tuple<> result = co_await folly::coro::collectAllTry();
completed = true;
(void)result;
}());
CHECK(completed);
}
TEST(CollectAllTry, OneTaskWithValue) {
folly::coro::Baton baton;
auto f = [&]() -> folly::coro::Task<std::string> {
co_await baton;
co_return "hello";
};
bool completed = false;
auto run = [&]() -> folly::coro::Task<void> {
auto [result] = co_await folly::coro::collectAllTry(f());
CHECK_EQ("hello", result.value());
completed = true;
};
folly::ManualExecutor executor;
auto future = run().scheduleOn(&executor).start();
executor.drain();
CHECK(!completed);
baton.post();
// Posting the baton should have just scheduled the 'f()' coroutine
// for resumption on the executor but should not have executed
// until we drain the executor again.
CHECK(!completed);
executor.drain();
CHECK(completed);
CHECK(future.isReady());
}
TEST(CollectAllTry, OneTaskWithError) {
folly::coro::blockingWait([&]() -> folly::coro::Task<void> {
auto [result] =
co_await folly::coro::collectAllTry([&]() -> folly::coro::Task<void> {
if (false) {
co_return;
}
throw ErrorA{};
}());
CHECK(!result.hasValue());
CHECK(result.hasException());
CHECK(result.exception().get_exception<ErrorA>() != nullptr);
}());
}
TEST(CollectAllTry, PartialFailure) {
folly::coro::blockingWait([&]() -> folly::coro::Task<void> {
auto [aRes, bRes, cRes, dRes] = co_await folly::coro::collectAllTry(
[]() -> folly::coro::Task<int> { co_return 123; }(),
[]() -> folly::coro::Task<std::string> {
if (true) {
throw ErrorA{};
}
co_return "hello";
}(),
[]() -> folly::coro::Task<void> {
if (true) {
throw ErrorB{};
}
co_return;
}(),
[]() -> folly::coro::Task<double> { co_return 3.1415; }());
CHECK(cRes.hasException());
CHECK(cRes.exception().get_exception<ErrorB>() != nullptr);
CHECK(dRes.hasValue());
CHECK_EQ(3.1415, dRes.value());
}());
}
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment