Commit b8f35551 authored by Francesco Zoffoli's avatar Francesco Zoffoli Committed by Facebook GitHub Bot

Implement coro::collectAny

Summary:
`collectAll` allows to `co_await`s multiple tasks using structured concurrency.

Unfortunately `future::collectAny` does not follow the structured concurrency pattern, and detaches the uncompleted operations.
This can result in memory errors (the coroutines access data that has already been freed).

This diff introduces `coro::collectAny`, which given a number of awaitables it returns the result of the first awaitable to finish, in addition to its index, cancels the remaining operations **and waits for them to complete**.

The implementation uses `collectAll` as a building block.
The return signature mirrors the one from `future::collectAny`.

Reviewed By: yfeldblum, rptynan

Differential Revision: D28945040

fbshipit-source-id: 402be03e004d373cbc74821ae8282b1aaf621b2d
parent dc7ba0b5
......@@ -14,6 +14,9 @@
* limitations under the License.
*/
#include <utility>
#include <folly/CancellationToken.h>
#include <folly/ExceptionWrapper.h>
#include <folly/experimental/coro/AsyncPipe.h>
#include <folly/experimental/coro/AsyncScope.h>
......@@ -276,6 +279,46 @@ auto makeUnorderedAsyncGeneratorFromAwaitableRangeImpl(
}(scope, std::move(awaitables));
}
template <typename... SemiAwaitables, size_t... Indices>
auto collectAnyImpl(
std::index_sequence<Indices...>, SemiAwaitables&&... awaitables)
-> folly::coro::Task<std::pair<
std::size_t,
folly::Try<collect_any_component_t<SemiAwaitables...>>>> {
const CancellationToken& parentCancelToken =
co_await co_current_cancellation_token;
const CancellationSource cancelSource;
CancellationCallback cancelCallback(parentCancelToken, [&]() noexcept {
cancelSource.requestCancellation();
});
const CancellationToken cancelToken = cancelSource.getToken();
std::atomic<bool> resultHasBeenSet{false};
std::pair<std::size_t, folly::Try<collect_any_component_t<SemiAwaitables...>>>
firstCompletion{0, {}};
co_await folly::coro::collectAll(folly::coro::co_withCancellation(
cancelToken,
folly::coro::co_invoke(
[&, aw = static_cast<SemiAwaitables&&>(awaitables)]() mutable
-> folly::coro::Task<void> {
auto result = co_await folly::coro::co_awaitTry(
static_cast<SemiAwaitables&&>(aw));
if (!resultHasBeenSet.load(std::memory_order_relaxed) &&
!resultHasBeenSet.exchange(true, std::memory_order_relaxed)) {
cancelSource.requestCancellation();
firstCompletion.first = Indices;
firstCompletion.second = std::move(result);
}
}))...);
if (parentCancelToken.isCancellationRequested()) {
co_yield co_cancelled;
}
co_return firstCompletion;
}
} // namespace detail
template <typename... SemiAwaitables>
......@@ -960,6 +1003,19 @@ auto makeUnorderedAsyncGeneratorFromAwaitableTryRange(
scope, std::move(awaitables), bool_constant<true>{});
}
template <typename SemiAwaitable, typename... SemiAwaitables>
auto collectAny(SemiAwaitable&& awaitable, SemiAwaitables&&... awaitables)
-> folly::coro::Task<std::pair<
std::size_t,
folly::Try<detail::collect_any_component_t<
SemiAwaitable,
SemiAwaitables...>>>> {
return detail::collectAnyImpl(
std::make_index_sequence<sizeof...(SemiAwaitables) + 1>{},
static_cast<SemiAwaitable&&>(awaitable),
static_cast<SemiAwaitables&&>(awaitables)...);
}
} // namespace coro
} // namespace folly
......
......@@ -55,6 +55,10 @@ template <typename SemiAwaitable>
using collect_all_try_range_component_t =
collect_all_try_component_t<SemiAwaitable>;
template <typename... SemiAwaitables>
using collect_any_component_t = std::common_type_t<
decay_rvalue_reference_t<semi_await_result_t<SemiAwaitables>>...>;
template <typename Range>
using range_iterator_t = decltype(access::begin(std::declval<Range&>()));
......@@ -368,6 +372,40 @@ auto collectAllTryWindowed(
awaitables | ranges::views::move, maxConcurrency);
}
///////////////////////////////////////////////////////////////////////////
// collectAny(SemiAwaitable<Ts>...) -> SemiAwaitable<
// std::pair<std::size_t, folly::Try<std::common_type<Ts...>>>>
//
// The collectAny() function can be used to concurrently co_await on multiple
// SemiAwaitable objects, get the result and index of the first one completing,
// cancel the remaining ones and continue once they are completed.
//
// collectAny() accepts a positive number of SemiAwaitable objects and
// returns a SemiAwaitable object that will complete with a pair containing the
// result of the first one to complete and its index.
//
// collectAny() is built on top of collectAll(), be aware of the coroutine
// starting behavior described in collectAll() documentation.
//
// The result of the first SemiAwaitable is going to be returned, whether it
// is a value or an exception. Any result of the remaining SemiAwaitables will
// be discarded, independently of whether it's a value or an exception.
//
// Example:
// folly::coro::Task<Foo> getDataOneWay();
// folly::coro::Task<Foo> getDataAnotherWay();
//
// std::pair<std::size_t, Foo> result = co_await folly::coro::collectAny(
// getDataOneWay(), getDataAnotherWay());
//
template <typename SemiAwaitable, typename... SemiAwaitables>
auto collectAny(SemiAwaitable&& awaitable, SemiAwaitables&&... awaitables)
-> folly::coro::Task<std::pair<
std::size_t,
folly::Try<detail::collect_any_component_t<
SemiAwaitable,
SemiAwaitables...>>>>;
} // namespace coro
} // namespace folly
......
......@@ -1645,4 +1645,210 @@ TEST_F(CollectAllTryWindowedTest, SubtasksCancelledWhenParentTaskCancelled) {
}());
}
class CollectAnyTest : public testing::Test {};
TEST_F(CollectAnyTest, OneTaskWithValue) {
folly::coro::Baton baton;
auto f = [&]() -> folly::coro::Task<std::string> {
co_await baton;
co_return "hello";
};
bool completed = false;
auto run = [&]() -> folly::coro::Task<void> {
auto [index, result] = co_await folly::coro::collectAny(f());
CHECK_EQ("hello", result.value());
CHECK_EQ(0, index);
completed = true;
};
folly::ManualExecutor executor;
auto future = run().scheduleOn(&executor).start();
executor.drain();
CHECK(!completed);
baton.post();
// Posting the baton should have just scheduled the 'f()' coroutine
// for resumption on the executor but should not have executed
// until we drain the executor again.
CHECK(!completed);
executor.drain();
CHECK(completed);
CHECK(future.isReady());
}
TEST_F(CollectAnyTest, OneVoidTask) {
bool completed = false;
folly::coro::blockingWait([&]() -> folly::coro::Task<void> {
// Checks that the task actually runs and that 'void' results are
// promoted to folly::Unit when placed in a tuple.
std::pair<std::size_t, folly::Try<void>> result =
co_await folly::coro::collectAny([&]() -> folly::coro::Task<void> {
completed = true;
co_return;
}());
(void)result;
}());
CHECK(completed);
}
TEST_F(CollectAnyTest, CollectAnyDoesntCompleteUntilAllTasksComplete) {
folly::coro::Baton baton1;
folly::coro::Baton baton2;
bool task1Started = false;
bool task2Started = false;
bool complete = false;
constexpr std::size_t taskTerminatingExpectedIndex = 1;
auto run = [&]() -> folly::coro::Task<void> {
auto [index, result] = co_await folly::coro::collectAny(
[&]() -> folly::coro::Task<int> {
task1Started = true;
co_await baton1;
co_return 42;
}(),
[&]() -> folly::coro::Task<int> {
task2Started = true;
co_await baton2;
co_return 314;
}());
complete = true;
CHECK_EQ(taskTerminatingExpectedIndex, index);
CHECK_EQ(result.value(), 314);
};
folly::ManualExecutor executor;
auto future = run().scheduleOn(&executor).start();
CHECK(!task1Started);
CHECK(!task2Started);
executor.drain();
CHECK(task1Started);
CHECK(task2Started);
CHECK(!complete);
baton2.post();
executor.drain();
CHECK(!complete);
baton1.post();
executor.drain();
CHECK(complete);
CHECK(future.isReady());
}
TEST_F(CollectAnyTest, ThrowsFirstError) {
bool caughtException = false;
folly::coro::blockingWait([&]() -> folly::coro::Task<void> {
bool throwError = true;
// Child tasks are started in-order.
// The first task will reschedule itself onto the executor.
// The second task will fail immediately and will be the first
// task to fail.
// Then the third and first tasks will fail.
// As the second task failed first we should see its exception
// propagate out of collectAny().
auto [index, result] = co_await folly::coro::collectAny(
[&]() -> folly::coro::Task<int> {
co_await folly::coro::co_reschedule_on_current_executor;
if (throwError) {
throw ErrorA{};
}
co_return 1;
}(),
[&]() -> folly::coro::Task<int> {
if (throwError) {
throw ErrorB{};
}
co_return 2;
}(),
[&]() -> folly::coro::Task<int> {
if (throwError) {
throw ErrorC{};
}
co_return 3;
}());
CHECK_EQ(1, index);
try {
result.value();
CHECK(false);
} catch (const ErrorB&) {
caughtException = true;
}
}());
CHECK(caughtException);
}
TEST_F(CollectAnyTest, CollectAnyCancelsSubtasksWhenASubtaskCompletes) {
using namespace std::chrono_literals;
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
auto start = std::chrono::steady_clock::now();
auto [index, result] = co_await folly::coro::collectAny(
[]() -> folly::coro::Task<int> {
co_await sleepThatShouldBeCancelled(10s);
co_return 42;
}(),
[]() -> folly::coro::Task<int> {
co_await sleepThatShouldBeCancelled(5s);
co_return 314;
}(),
[]() -> folly::coro::Task<int> {
co_await folly::coro::co_reschedule_on_current_executor;
throw ErrorA{};
}());
CHECK_EQ(2, index);
try {
result.value();
CHECK(false);
} catch (const ErrorA&) {
}
auto end = std::chrono::steady_clock::now();
CHECK((end - start) < 1s);
}());
}
TEST_F(CollectAnyTest, CollectAnyCancelsSubtasksWhenParentTaskCancelled) {
using namespace std::chrono_literals;
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
auto start = std::chrono::steady_clock::now();
folly::CancellationSource cancelSource;
try {
auto [index, result] = co_await folly::coro::co_withCancellation(
cancelSource.getToken(),
folly::coro::collectAny(
[&]() -> folly::coro::Task<int> {
co_await sleepThatShouldBeCancelled(10s);
co_return 42;
}(),
[&]() -> folly::coro::Task<int> {
co_await sleepThatShouldBeCancelled(5s);
co_return 314;
}(),
[&]() -> folly::coro::Task<int> {
co_await folly::coro::co_reschedule_on_current_executor;
co_await folly::coro::co_reschedule_on_current_executor;
co_await folly::coro::co_reschedule_on_current_executor;
cancelSource.requestCancellation();
co_await sleepThatShouldBeCancelled(15s);
co_return 123;
}()));
CHECK(false);
} catch (const folly::OperationCancelled&) {
auto end = std::chrono::steady_clock::now();
CHECK((end - start) < 1s);
}
}());
}
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment