Commit 835de793 authored by Shai Szulanski's avatar Shai Szulanski Committed by Facebook GitHub Bot

Use CancellationToken::merge in all collect functions

Summary: This ensures the token passed to child tasks continues to be signalled when the token applied to collect* is even after collect returns

Reviewed By: ispeters

Differential Revision: D30542448

fbshipit-source-id: b286175f4a8c5e5b3e2e628c6611f368ac79c1b6
parent 02a4b200
......@@ -131,10 +131,8 @@ auto collectAllImpl(
co_await co_current_cancellation_token;
const CancellationSource cancelSource;
CancellationCallback cancelCallback(parentCancelToken, [&]() noexcept {
cancelSource.requestCancellation();
});
const CancellationToken cancelToken = cancelSource.getToken();
const CancellationToken cancelToken =
CancellationToken::merge(parentCancelToken, cancelSource.getToken());
exception_wrapper firstException;
std::atomic<bool> anyFailures{false};
......@@ -156,8 +154,9 @@ auto collectAllImpl(
}
} catch (...) {
anyFailures.store(true, std::memory_order_relaxed);
if (!cancelSource.requestCancellation()) {
// This was the first failure, remember it's error.
if (!cancelSource.requestCancellation() &&
!parentCancelToken.isCancellationRequested()) {
// This was the first failure, remember its error.
firstException = exception_wrapper{std::current_exception()};
}
}
......@@ -324,11 +323,9 @@ auto collectAnyNoDiscardImpl(
std::index_sequence<Indices...>, SemiAwaitables&&... awaitables)
-> folly::coro::Task<
std::tuple<collect_all_try_component_t<SemiAwaitables>...>> {
const CancellationToken& parentCancelToken =
co_await co_current_cancellation_token;
const CancellationSource cancelSource;
const CancellationToken cancelToken =
CancellationToken::merge(parentCancelToken, cancelSource.getToken());
const CancellationToken cancelToken = CancellationToken::merge(
co_await co_current_cancellation_token, cancelSource.getToken());
std::tuple<collect_all_try_component_t<SemiAwaitables>...> results;
co_await folly::coro::collectAll(folly::coro::co_withCancellation(
......@@ -371,12 +368,9 @@ auto collectAllRange(InputRange awaitables)
-> folly::coro::Task<std::vector<detail::collect_all_range_component_t<
detail::range_reference_t<InputRange>>>> {
const folly::Executor::KeepAlive<> executor = co_await co_current_executor;
const CancellationSource cancelSource;
CancellationCallback cancelCallback(
co_await co_current_cancellation_token,
[&]() noexcept { cancelSource.requestCancellation(); });
const CancellationToken cancelToken = cancelSource.getToken();
const CancellationToken cancelToken = CancellationToken::merge(
co_await co_current_cancellation_token, cancelSource.getToken());
std::vector<detail::collect_all_try_range_component_t<
detail::range_reference_t<InputRange>>>
......@@ -464,12 +458,9 @@ template <
int>>
auto collectAllRange(InputRange awaitables) -> folly::coro::Task<void> {
const folly::Executor::KeepAlive<> executor = co_await co_current_executor;
CancellationSource cancelSource;
CancellationCallback cancelCallback(
co_await co_current_cancellation_token,
[&]() noexcept { cancelSource.requestCancellation(); });
const CancellationToken cancelToken = cancelSource.getToken();
const CancellationSource cancelSource;
const CancellationToken cancelToken = CancellationToken::merge(
co_await co_current_cancellation_token, cancelSource.getToken());
exception_wrapper firstException;
std::atomic<bool> anyFailures = false;
......@@ -611,11 +602,9 @@ auto collectAllWindowed(InputRange awaitables, std::size_t maxConcurrency)
assert(maxConcurrency > 0);
const folly::Executor::KeepAlive<> executor = co_await co_current_executor;
const folly::CancellationSource cancelSource;
folly::CancellationCallback cancelCallback(
co_await folly::coro::co_current_cancellation_token,
[&]() noexcept { cancelSource.requestCancellation(); });
const folly::CancellationToken cancelToken = cancelSource.getToken();
const CancellationSource cancelSource;
const CancellationToken cancelToken = CancellationToken::merge(
co_await co_current_cancellation_token, cancelSource.getToken());
exception_wrapper firstException;
std::atomic<bool> anyFailures = false;
......@@ -735,18 +724,19 @@ auto collectAllWindowed(InputRange awaitables, std::size_t maxConcurrency)
const folly::Executor::KeepAlive<> executor = co_await co_current_executor;
const folly::CancellationSource cancelSource;
folly::CancellationCallback cancelCallback(
co_await folly::coro::co_current_cancellation_token,
[&]() noexcept { cancelSource.requestCancellation(); });
const folly::CancellationToken cancelToken = cancelSource.getToken();
const CancellationToken& parentCancelToken =
co_await co_current_cancellation_token;
const CancellationSource cancelSource;
const CancellationToken cancelToken =
CancellationToken::merge(parentCancelToken, cancelSource.getToken());
exception_wrapper firstException;
std::atomic<bool> anyFailures = false;
auto trySetFirstException = [&](exception_wrapper&& e) noexcept {
anyFailures.store(true, std::memory_order_relaxed);
if (!cancelSource.requestCancellation()) {
if (!cancelSource.requestCancellation() &&
!parentCancelToken.isCancellationRequested()) {
// This is first entity to request cancellation.
firstException = std::move(e);
}
......
......@@ -313,6 +313,33 @@ TEST_F(CollectAllTest, CollectAllCancelsSubtasksWhenParentTaskCancelled) {
}());
}
TEST_F(CollectAllTest, CancellationTokenRemainsActiveAfterReturn) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
folly::CancellationSource cancelSource;
folly::coro::AsyncScope scope;
folly::coro::Baton baton;
auto task = folly::coro::co_invoke([&]() -> folly::coro::Task<void> {
auto token = co_await folly::coro::co_current_cancellation_token;
auto ex = co_await folly::coro::co_current_executor;
scope.add(
folly::coro::co_withCancellation(
token, folly::coro::co_invoke([&]() -> folly::coro::Task<void> {
auto token =
co_await folly::coro::co_current_cancellation_token;
co_await baton;
EXPECT_TRUE(token.isCancellationRequested());
}))
.scheduleOn(ex));
});
co_await folly::coro::co_withCancellation(
cancelSource.getToken(), folly::coro::collectAll(std::move(task)));
cancelSource.requestCancellation();
baton.post();
co_await scope.joinAsync();
}());
}
namespace {
class TestRequestData : public folly::RequestData {
......@@ -531,6 +558,33 @@ TEST_F(CollectAllTryTest, CollectAllCancelsSubtasksWhenParentTaskCancelled) {
}());
}
TEST_F(CollectAllTryTest, CancellationTokenRemainsActiveAfterReturn) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
folly::CancellationSource cancelSource;
folly::coro::AsyncScope scope;
folly::coro::Baton baton;
auto task = folly::coro::co_invoke([&]() -> folly::coro::Task<void> {
auto token = co_await folly::coro::co_current_cancellation_token;
auto ex = co_await folly::coro::co_current_executor;
scope.add(
folly::coro::co_withCancellation(
token, folly::coro::co_invoke([&]() -> folly::coro::Task<void> {
auto token =
co_await folly::coro::co_current_cancellation_token;
co_await baton;
EXPECT_TRUE(token.isCancellationRequested());
}))
.scheduleOn(ex));
});
co_await folly::coro::co_withCancellation(
cancelSource.getToken(), folly::coro::collectAllTry(std::move(task)));
cancelSource.requestCancellation();
baton.post();
co_await scope.joinAsync();
}());
}
TEST_F(CollectAllTryTest, KeepsRequestContextOfChildTasksIndependent) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
folly::RequestContextScopeGuard requestScope;
......@@ -730,7 +784,7 @@ TEST_F(CollectAllRangeTest, SubtasksCancelledWhenParentTaskCancelled) {
auto generateTasks =
[&]() -> folly::coro::Generator<folly::coro::Task<void>&&> {
for (int i = 0; i < 10; ++i) {
co_yield folly::coro::sleep(10s);
co_yield folly::coro::sleepReturnEarlyOnCancel(10s);
}
co_yield [&]() -> folly::coro::Task<void> {
......@@ -757,6 +811,36 @@ TEST_F(CollectAllRangeTest, SubtasksCancelledWhenParentTaskCancelled) {
}());
}
TEST_F(CollectAllRangeTest, CancellationTokenRemainsActiveAfterReturn) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
folly::CancellationSource cancelSource;
folly::coro::AsyncScope scope;
folly::coro::Baton baton;
auto generateTasks =
[&]() -> folly::coro::Generator<folly::coro::Task<void>&&> {
co_yield folly::coro::co_invoke([&]() -> folly::coro::Task<void> {
auto token = co_await folly::coro::co_current_cancellation_token;
auto ex = co_await folly::coro::co_current_executor;
scope.add(
folly::coro::co_withCancellation(
token, folly::coro::co_invoke([&]() -> folly::coro::Task<void> {
auto token =
co_await folly::coro::co_current_cancellation_token;
co_await baton;
EXPECT_TRUE(token.isCancellationRequested());
}))
.scheduleOn(ex));
});
};
co_await folly::coro::co_withCancellation(
cancelSource.getToken(), folly::coro::collectAllRange(generateTasks()));
cancelSource.requestCancellation();
baton.post();
co_await scope.joinAsync();
}());
}
TEST_F(CollectAllRangeTest, KeepsRequestContextOfChildTasksIndependent) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
folly::RequestContextScopeGuard requestScope;
......@@ -1096,6 +1180,37 @@ TEST_F(CollectAllTryRangeTest, SubtasksCancelledWhenParentTaskCancelled) {
}());
}
TEST_F(CollectAllTryRangeTest, CancellationTokenRemainsActiveAfterReturn) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
folly::CancellationSource cancelSource;
folly::coro::AsyncScope scope;
folly::coro::Baton baton;
auto generateTasks =
[&]() -> folly::coro::Generator<folly::coro::Task<void>&&> {
co_yield folly::coro::co_invoke([&]() -> folly::coro::Task<void> {
auto token = co_await folly::coro::co_current_cancellation_token;
auto ex = co_await folly::coro::co_current_executor;
scope.add(
folly::coro::co_withCancellation(
token, folly::coro::co_invoke([&]() -> folly::coro::Task<void> {
auto token =
co_await folly::coro::co_current_cancellation_token;
co_await baton;
EXPECT_TRUE(token.isCancellationRequested());
}))
.scheduleOn(ex));
});
};
co_await folly::coro::co_withCancellation(
cancelSource.getToken(),
folly::coro::collectAllTryRange(generateTasks()));
cancelSource.requestCancellation();
baton.post();
co_await scope.joinAsync();
}());
}
TEST_F(CollectAllTryRangeTest, KeepsRequestContextOfChildTasksIndependent) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
folly::RequestContextScopeGuard requestScope;
......@@ -1465,6 +1580,37 @@ TEST_F(CollectAllWindowedTest, SubtasksCancelledWhenParentTaskCancelled) {
}());
}
TEST_F(CollectAllWindowedTest, CancellationTokenRemainsActiveAfterReturn) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
folly::CancellationSource cancelSource;
folly::coro::AsyncScope scope;
folly::coro::Baton baton;
auto generateTasks =
[&]() -> folly::coro::Generator<folly::coro::Task<void>&&> {
co_yield folly::coro::co_invoke([&]() -> folly::coro::Task<void> {
auto token = co_await folly::coro::co_current_cancellation_token;
auto ex = co_await folly::coro::co_current_executor;
scope.add(
folly::coro::co_withCancellation(
token, folly::coro::co_invoke([&]() -> folly::coro::Task<void> {
auto token =
co_await folly::coro::co_current_cancellation_token;
co_await baton;
EXPECT_TRUE(token.isCancellationRequested());
}))
.scheduleOn(ex));
});
};
co_await folly::coro::co_withCancellation(
cancelSource.getToken(),
folly::coro::collectAllWindowed(generateTasks(), 1));
cancelSource.requestCancellation();
baton.post();
co_await scope.joinAsync();
}());
}
TEST_F(CollectAllWindowedTest, KeepsRequestContextOfChildTasksIndependent) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
folly::RequestContextScopeGuard requestScope;
......@@ -1672,6 +1818,37 @@ TEST_F(CollectAllTryWindowedTest, SubtasksCancelledWhenParentTaskCancelled) {
}());
}
TEST_F(CollectAllTryWindowedTest, CancellationTokenRemainsActiveAfterReturn) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
folly::CancellationSource cancelSource;
folly::coro::AsyncScope scope;
folly::coro::Baton baton;
auto generateTasks =
[&]() -> folly::coro::Generator<folly::coro::Task<void>&&> {
co_yield folly::coro::co_invoke([&]() -> folly::coro::Task<void> {
auto token = co_await folly::coro::co_current_cancellation_token;
auto ex = co_await folly::coro::co_current_executor;
scope.add(
folly::coro::co_withCancellation(
token, folly::coro::co_invoke([&]() -> folly::coro::Task<void> {
auto token =
co_await folly::coro::co_current_cancellation_token;
co_await baton;
EXPECT_TRUE(token.isCancellationRequested());
}))
.scheduleOn(ex));
});
};
co_await folly::coro::co_withCancellation(
cancelSource.getToken(),
folly::coro::collectAllTryWindowed(generateTasks(), 1));
cancelSource.requestCancellation();
baton.post();
co_await scope.joinAsync();
}());
}
class CollectAnyTest : public testing::Test {};
TEST_F(CollectAnyTest, OneTaskWithValue) {
......@@ -1894,6 +2071,33 @@ TEST_F(CollectAnyTest, CollectAnyCancelsSubtasksWhenParentTaskCancelled) {
}());
}
TEST_F(CollectAnyTest, CancellationTokenRemainsActiveAfterReturn) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
folly::CancellationSource cancelSource;
folly::coro::AsyncScope scope;
folly::coro::Baton baton;
auto task = folly::coro::co_invoke([&]() -> folly::coro::Task<void> {
auto token = co_await folly::coro::co_current_cancellation_token;
auto ex = co_await folly::coro::co_current_executor;
scope.add(
folly::coro::co_withCancellation(
token, folly::coro::co_invoke([&]() -> folly::coro::Task<void> {
auto token =
co_await folly::coro::co_current_cancellation_token;
co_await baton;
EXPECT_TRUE(token.isCancellationRequested());
}))
.scheduleOn(ex));
});
co_await folly::coro::co_withCancellation(
cancelSource.getToken(), folly::coro::collectAny(std::move(task)));
cancelSource.requestCancellation();
baton.post();
co_await scope.joinAsync();
}());
}
class CollectAnyNoDiscardTest : public testing::Test {};
TEST_F(CollectAnyNoDiscardTest, OneTask) {
......@@ -2085,4 +2289,32 @@ TEST_F(CollectAnyNoDiscardTest, CancelSubtasksWhenParentTaskCancelled) {
}());
}
TEST_F(CollectAnyNoDiscardTest, CancellationTokenRemainsActiveAfterReturn) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
folly::CancellationSource cancelSource;
folly::coro::AsyncScope scope;
folly::coro::Baton baton;
auto task = folly::coro::co_invoke([&]() -> folly::coro::Task<void> {
auto token = co_await folly::coro::co_current_cancellation_token;
auto ex = co_await folly::coro::co_current_executor;
scope.add(
folly::coro::co_withCancellation(
token, folly::coro::co_invoke([&]() -> folly::coro::Task<void> {
auto token =
co_await folly::coro::co_current_cancellation_token;
co_await baton;
EXPECT_TRUE(token.isCancellationRequested());
}))
.scheduleOn(ex));
});
co_await folly::coro::co_withCancellation(
cancelSource.getToken(),
folly::coro::collectAnyNoDiscard(std::move(task)));
cancelSource.requestCancellation();
baton.post();
co_await scope.joinAsync();
}());
}
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment