Commit 5dff2932 authored by Rob Lyerly's avatar Rob Lyerly Committed by Facebook GitHub Bot

Add collectAnyNoDiscard()

Summary:
D28945040 added `collectAny()` which early-returns when any of the SemiAwaitables produces a value (or exception).  There's the potential for discarded results, however - multiple SemiAwaitables can produce results depending on whether they're at a cancellation point and when cancellation is signaled.

This diff adds a variant `collectAnyNoDiscard()` that signals cancellation when any SemiAwaitable finishes and returns *all* results that completed.  It produces a tuple of optional values from the SemiAwaitables (or `std::nullopt` if it was canceled).

Reviewed By: iahs

Differential Revision: D29240725

fbshipit-source-id: 3e664339e8692cbb9114138a96345cf9f9d5cb0b
parent 28858c2e
......@@ -320,6 +320,29 @@ auto collectAnyImpl(
co_return firstCompletion;
}
template <typename... SemiAwaitables, size_t... Indices>
auto collectAnyNoDiscardImpl(
std::index_sequence<Indices...>, SemiAwaitables&&... awaitables)
-> folly::coro::Task<
std::tuple<collect_all_try_component_t<SemiAwaitables>...>> {
const CancellationToken& parentCancelToken =
co_await co_current_cancellation_token;
const CancellationSource cancelSource;
const CancellationToken cancelToken =
CancellationToken::merge(parentCancelToken, cancelSource.getToken());
std::tuple<collect_all_try_component_t<SemiAwaitables>...> results;
co_await folly::coro::collectAll(folly::coro::co_withCancellation(
cancelToken, folly::coro::co_invoke([&]() -> folly::coro::Task<void> {
auto result = co_await folly::coro::co_awaitTry(
std::forward<SemiAwaitables>(awaitables));
cancelSource.requestCancellation();
std::get<Indices>(results) = std::move(result);
}))...);
co_return results;
}
} // namespace detail
template <typename... SemiAwaitables>
......@@ -1017,6 +1040,15 @@ auto collectAny(SemiAwaitable&& awaitable, SemiAwaitables&&... awaitables)
static_cast<SemiAwaitables&&>(awaitables)...);
}
template <typename... SemiAwaitables>
auto collectAnyNoDiscard(SemiAwaitables&&... awaitables)
-> folly::coro::Task<std::tuple<detail::collect_all_try_component_t<
remove_cvref_t<SemiAwaitables>>...>> {
return detail::collectAnyNoDiscardImpl(
std::make_index_sequence<sizeof...(SemiAwaitables)>{},
static_cast<SemiAwaitables&&>(awaitables)...);
}
} // namespace coro
} // namespace folly
......
......@@ -406,6 +406,33 @@ auto collectAny(SemiAwaitable&& awaitable, SemiAwaitables&&... awaitables)
SemiAwaitable,
SemiAwaitables...>>>>;
///////////////////////////////////////////////////////////////////////////
// collectAnyNoDiscard(SemiAwaitable<Ts>...) ->
// SemiAwaitable<std::tuple<folly::Try<Ts>...>>
//
// The collectAnyNoDiscard() function is similar to collectAny() in that it
// co_awaits multiple SemiAwaitables and cancels any outstanding operations once
// at least one has finished. Unlike collectAny(), it returns results from *all*
// SemiAwaitables, including folly::OperationCancelled for operations that were
// cancelled.
//
// collectAnyNoDiscard() is built on top of collectAll(), be aware of the
// coroutine starting behavior described in collectAll() documentation.
//
// The returned tuple contains the results of all the SemiAwaitables.
//
// Example:
// folly::coro::Task<Foo> getDataOneWay();
// folly::coro::Task<Bar> getDataAnotherWay();
//
// std::tuple<folly::Try<Foo>, folly::Try<Bar>> result = co_await
// folly::coro::collectAny(getDataOneWay(), getDataAnotherWay());
//
template <typename... SemiAwaitables>
auto collectAnyNoDiscard(SemiAwaitables&&... awaitables)
-> folly::coro::Task<std::tuple<detail::collect_all_try_component_t<
remove_cvref_t<SemiAwaitables>>...>>;
} // namespace coro
} // namespace folly
......
......@@ -1867,4 +1867,195 @@ TEST_F(CollectAnyTest, CollectAnyCancelsSubtasksWhenParentTaskCancelled) {
}());
}
class CollectAnyNoDiscardTest : public testing::Test {};
TEST_F(CollectAnyNoDiscardTest, OneTask) {
auto value = [&]() -> folly::coro::Task<std::string> { co_return "hello"; };
auto throws = [&]() -> folly::coro::Task<std::string> {
co_yield folly::coro::co_error(ErrorA{});
};
folly::coro::blockingWait([&]() -> folly::coro::Task<void> {
auto [result] = co_await folly::coro::collectAnyNoDiscard(value());
CHECK(result.hasValue());
CHECK_EQ("hello", result.value());
std::tie(result) = co_await folly::coro::collectAnyNoDiscard(throws());
CHECK(result.hasException<ErrorA>());
}());
}
TEST_F(CollectAnyNoDiscardTest, MultipleTasksWithValues) {
std::atomic_size_t count{0};
// Busy wait until all threads have started before returning
auto busyWait = [](std::atomic_size_t& count,
size_t num) -> folly::coro::Task<void> {
count.fetch_add(1);
while (count.load() < num) {
// Need to yield because collectAnyNoDiscard() won't start the second and
// third coroutines until the first one gets to a suspend point
co_await folly::coro::co_reschedule_on_current_executor;
}
};
folly::coro::blockingWait([&]() -> folly::coro::Task<void> {
auto [first, second, third] = co_await folly::coro::collectAnyNoDiscard(
busyWait(count, 3), busyWait(count, 3), busyWait(count, 3));
CHECK(first.hasValue());
CHECK(second.hasValue());
CHECK(third.hasValue());
}());
}
TEST_F(CollectAnyNoDiscardTest, OneVoidTask) {
bool completed = false;
folly::coro::blockingWait([&]() -> folly::coro::Task<void> {
// Checks that the task actually runs and that 'void' results are
// promoted to folly::Unit when placed in a tuple.
std::tuple<folly::Try<void>> result =
co_await folly::coro::collectAnyNoDiscard(
[&]() -> folly::coro::Task<void> {
completed = true;
co_return;
}());
(void)result;
}());
CHECK(completed);
}
TEST_F(CollectAnyNoDiscardTest, DoesntCompleteUntilAllTasksComplete) {
folly::coro::Baton baton1;
folly::coro::Baton baton2;
bool task1Started = false;
bool task2Started = false;
bool complete = false;
auto run = [&]() -> folly::coro::Task<void> {
auto [first, second] = co_await folly::coro::collectAnyNoDiscard(
[&]() -> folly::coro::Task<int> {
task1Started = true;
co_await baton1;
co_return 42;
}(),
[&]() -> folly::coro::Task<int> {
task2Started = true;
co_await baton2;
co_return 314;
}());
complete = true;
CHECK(first.hasValue());
CHECK_EQ(first.value(), 42);
CHECK(second.hasValue());
CHECK_EQ(second.value(), 314);
};
folly::ManualExecutor executor;
auto future = run().scheduleOn(&executor).start();
CHECK(!task1Started);
CHECK(!task2Started);
executor.drain();
CHECK(task1Started);
CHECK(task2Started);
CHECK(!complete);
baton2.post();
executor.drain();
CHECK(!complete);
baton1.post();
executor.drain();
CHECK(complete);
CHECK(future.isReady());
}
TEST_F(CollectAnyNoDiscardTest, ThrowsAllErrors) {
using namespace std::chrono_literals;
folly::coro::blockingWait([&]() -> folly::coro::Task<void> {
// Child tasks are started in-order.
// The first task will reschedule itself onto the executor.
// The second task will fail immediately and will be the first task to fail.
// Then the third and first tasks will fail.
// We should see all exceptions propagate out of collectAnyNoDiscard().
auto [first, second, third] = co_await folly::coro::collectAnyNoDiscard(
[&]() -> folly::coro::Task<int> {
co_await folly::coro::co_reschedule_on_current_executor;
co_yield folly::coro::co_error(ErrorA{});
}(),
[&]() -> folly::coro::Task<int> {
co_yield folly::coro::co_error(ErrorB{});
}(),
[&]() -> folly::coro::Task<int> {
co_yield folly::coro::co_error(ErrorC{});
}());
CHECK(first.hasException<ErrorA>());
CHECK(second.hasException<ErrorB>());
CHECK(third.hasException<ErrorC>());
}());
}
TEST_F(CollectAnyNoDiscardTest, CancelSubtasksWhenASubtaskCompletes) {
using namespace std::chrono_literals;
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
auto start = std::chrono::steady_clock::now();
auto [first, second, third] = co_await folly::coro::collectAnyNoDiscard(
[]() -> folly::coro::Task<int> {
co_await folly::coro::sleep(10s);
co_return 42;
}(),
[]() -> folly::coro::Task<int> {
co_await folly::coro::sleep(5s);
co_return 314;
}(),
[]() -> folly::coro::Task<int> {
co_await folly::coro::co_reschedule_on_current_executor;
co_yield folly::coro::co_error(ErrorA{});
}());
CHECK(first.hasException<folly::OperationCancelled>());
CHECK(second.hasException<folly::OperationCancelled>());
CHECK(third.hasException<ErrorA>());
auto end = std::chrono::steady_clock::now();
CHECK((end - start) < 1s);
}());
}
TEST_F(CollectAnyNoDiscardTest, CancelSubtasksWhenParentTaskCancelled) {
using namespace std::chrono_literals;
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
auto start = std::chrono::steady_clock::now();
folly::CancellationSource cancelSource;
auto [first, second, third] = co_await folly::coro::co_withCancellation(
cancelSource.getToken(),
folly::coro::collectAnyNoDiscard(
[&]() -> folly::coro::Task<int> {
co_await folly::coro::sleep(10s);
co_return 42;
}(),
[&]() -> folly::coro::Task<int> {
co_await folly::coro::sleep(5s);
co_return 314;
}(),
[&]() -> folly::coro::Task<int> {
co_await folly::coro::co_reschedule_on_current_executor;
co_await folly::coro::co_reschedule_on_current_executor;
co_await folly::coro::co_reschedule_on_current_executor;
cancelSource.requestCancellation();
co_await folly::coro::sleep(15s);
co_return 123;
}()));
auto end = std::chrono::steady_clock::now();
CHECK((end - start) < 1s);
CHECK(first.hasException<folly::OperationCancelled>());
CHECK(second.hasException<folly::OperationCancelled>());
CHECK(third.hasException<folly::OperationCancelled>());
}());
}
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment