Commit ed173626 authored by Cameron Pickett's avatar Cameron Pickett Committed by Facebook GitHub Bot

Build in automatic cancellation support to AsyncScope

Summary: Introduces a CancellableAsyncScope type that automatically adds a CancellationToken to every task added() on the scope.

Reviewed By: yfeldblum

Differential Revision: D28438228

fbshipit-source-id: 2416725360c16f8d95dd5c35997dd624278d0980
parent e47bc9b4
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#pragma once #pragma once
#include <folly/CancellationToken.h>
#include <folly/experimental/coro/Coroutine.h> #include <folly/experimental/coro/Coroutine.h>
#include <folly/experimental/coro/Task.h> #include <folly/experimental/coro/Task.h>
#include <folly/experimental/coro/detail/Barrier.h> #include <folly/experimental/coro/detail/Barrier.h>
...@@ -157,7 +158,7 @@ FOLLY_NOINLINE inline void AsyncScope::add(Awaitable&& awaitable) { ...@@ -157,7 +158,7 @@ FOLLY_NOINLINE inline void AsyncScope::add(Awaitable&& awaitable) {
!joined_ && !joined_ &&
"It is invalid to add() more work after work has been joined"); "It is invalid to add() more work after work has been joined");
anyTasksStarted_.store(true, std::memory_order_relaxed); anyTasksStarted_.store(true, std::memory_order_relaxed);
addImpl((Awaitable &&) awaitable) addImpl(static_cast<Awaitable&&>(awaitable))
.start(&barrier_, FOLLY_ASYNC_STACK_RETURN_ADDRESS()); .start(&barrier_, FOLLY_ASYNC_STACK_RETURN_ADDRESS());
} }
...@@ -172,6 +173,59 @@ inline folly::SemiFuture<folly::Unit> AsyncScope::cleanup() noexcept { ...@@ -172,6 +173,59 @@ inline folly::SemiFuture<folly::Unit> AsyncScope::cleanup() noexcept {
return joinAsync().semi(); return joinAsync().semi();
} }
///////////////////////////////
// A cancellable version of AsyncScope. Work added to this scope will be
// provided a cancellation token for cancelling during join. See
// add() and cancelAndJoinAsync() for more information.
class CancellableAsyncScope {
public:
CancellableAsyncScope() noexcept = default;
// Query the number of tasks added to the scope that have not yet completed.
std::size_t remaining() const noexcept { return scope_.remaining(); }
// Start the specified task/awaitable by co_awaiting it. The awaitable will be
// provided a cancellation token to respond to cancelAndJoinAsync() in the
// future.
//
// Note that cancellation is cooperative, your task must handle cancellation
// in order to have any effect.
//
// See the documentation on AsyncScope::add.
template <typename Awaitable>
void add(Awaitable&& awaitable) {
scope_.add(co_withCancellation(
cancellationSource_.getToken(), static_cast<Awaitable&&>(awaitable)));
}
// Request cancellation for all started tasks that accepted a
// CancellationToken in add().
void requestCancellation() const noexcept {
cancellationSource_.requestCancellation();
}
// Request cancellation then asynchronously wait for all started tasks to
// complete.
//
// Either call this method, _or_ joinAsync() to join the work. It is invalid
// to call both of them.
Task<void> cancelAndJoinAsync() noexcept {
requestCancellation();
co_await joinAsync();
}
// Asynchronously wait for all started tasks to complete without requesting
// cancellation.
//
// Either call this method _or_ cancelAndJoinAsync() to join the
// work. It is invalid to call both of them.
Task<void> joinAsync() noexcept { co_await scope_.joinAsync(); }
private:
folly::CancellationSource cancellationSource_;
AsyncScope scope_;
};
} // namespace coro } // namespace coro
} // namespace folly } // namespace folly
......
...@@ -22,6 +22,8 @@ ...@@ -22,6 +22,8 @@
#include <folly/experimental/coro/Baton.h> #include <folly/experimental/coro/Baton.h>
#include <folly/experimental/coro/BlockingWait.h> #include <folly/experimental/coro/BlockingWait.h>
#include <folly/experimental/coro/Collect.h> #include <folly/experimental/coro/Collect.h>
#include <folly/experimental/coro/GtestHelpers.h>
#include <folly/experimental/coro/Sleep.h>
#include <folly/experimental/coro/Task.h> #include <folly/experimental/coro/Task.h>
#include <folly/portability/GTest.h> #include <folly/portability/GTest.h>
...@@ -35,8 +37,7 @@ TEST_F(AsyncScopeTest, ConstructDestruct) { ...@@ -35,8 +37,7 @@ TEST_F(AsyncScopeTest, ConstructDestruct) {
folly::coro::AsyncScope scope; folly::coro::AsyncScope scope;
} }
TEST_F(AsyncScopeTest, AddAndJoin) { CO_TEST_F(AsyncScopeTest, AddAndJoin) {
folly::coro::blockingWait([]() -> folly::coro::Task<> {
std::atomic<int> count = 0; std::atomic<int> count = 0;
auto makeTask = [&]() -> folly::coro::Task<> { auto makeTask = [&]() -> folly::coro::Task<> {
++count; ++count;
...@@ -50,12 +51,10 @@ TEST_F(AsyncScopeTest, AddAndJoin) { ...@@ -50,12 +51,10 @@ TEST_F(AsyncScopeTest, AddAndJoin) {
co_await scope.joinAsync(); co_await scope.joinAsync();
CHECK(count == 100); EXPECT_EQ(count, 100);
}());
} }
TEST_F(AsyncScopeTest, StartChildTasksAfterCleanupStarted) { CO_TEST_F(AsyncScopeTest, StartChildTasksAfterCleanupStarted) {
folly::coro::blockingWait([]() -> folly::coro::Task<> {
folly::coro::AsyncScope scope; folly::coro::AsyncScope scope;
folly::coro::Baton baton; folly::coro::Baton baton;
bool childFinished = false; bool childFinished = false;
...@@ -79,34 +78,119 @@ TEST_F(AsyncScopeTest, StartChildTasksAfterCleanupStarted) { ...@@ -79,34 +78,119 @@ TEST_F(AsyncScopeTest, StartChildTasksAfterCleanupStarted) {
co_return; co_return;
}()); }());
CHECK(childFinished); EXPECT_TRUE(childFinished);
}());
} }
TEST_F(AsyncScopeTest, QueryRemainingCount) { CO_TEST_F(AsyncScopeTest, QueryRemainingCount) {
folly::coro::blockingWait([]() -> folly::coro::Task<> {
folly::coro::Baton baton; folly::coro::Baton baton;
auto makeTask = [&]() -> folly::coro::Task<> { co_await baton; }; auto makeTask = [&]() -> folly::coro::Task<> { co_await baton; };
auto executor = co_await folly::coro::co_current_executor; auto executor = co_await folly::coro::co_current_executor;
folly::coro::AsyncScope scope; folly::coro::AsyncScope scope;
CHECK_EQ(0, scope.remaining()); CO_ASSERT_EQ(0, scope.remaining());
for (int i = 0; i < 10; ++i) { for (int i = 0; i < 10; ++i) {
scope.add(makeTask().scheduleOn(executor)); scope.add(makeTask().scheduleOn(executor));
} }
CO_ASSERT_EQ(10, scope.remaining());
CHECK_EQ(10, scope.remaining());
baton.post(); baton.post();
co_await scope.joinAsync(); co_await scope.joinAsync();
CO_ASSERT_EQ(0, scope.remaining());
}
struct CancellableAsyncScopeTest : public testing::Test {};
TEST_F(CancellableAsyncScopeTest, ConstructDestruct) {
// Safe to construct/destruct an AsyncScope without calling any methods.
folly::coro::CancellableAsyncScope scope;
}
CO_TEST_F(CancellableAsyncScopeTest, AddAndJoin) {
std::atomic<int> count = 0;
auto makeTask = [&]() -> folly::coro::Task<> {
++count;
co_return;
};
folly::coro::CancellableAsyncScope scope;
for (int i = 0; i < 100; ++i) {
scope.add(makeTask().scheduleOn(folly::getGlobalCPUExecutor()));
}
co_await scope.joinAsync();
EXPECT_EQ(count, 100);
}
CO_TEST_F(CancellableAsyncScopeTest, StartChildTasksAfterCleanupStarted) {
folly::coro::CancellableAsyncScope scope;
folly::coro::Baton baton;
bool childFinished = false;
auto executor = co_await folly::coro::co_current_executor;
CHECK_EQ(0, scope.remaining()); auto childTask = [&]() -> folly::coro::Task<> {
co_await folly::coro::co_reschedule_on_current_executor;
childFinished = true;
};
auto parentTask = [&]() -> folly::coro::Task<> {
co_await baton;
scope.add(childTask().scheduleOn(executor));
};
scope.add(parentTask().scheduleOn(executor));
co_await folly::coro::collectAll(
scope.joinAsync(), [&]() -> folly::coro::Task<> {
baton.post();
co_return;
}()); }());
EXPECT_TRUE(childFinished);
}
CO_TEST_F(CancellableAsyncScopeTest, QueryRemainingCount) {
folly::coro::Baton baton;
auto makeTask = [&]() -> folly::coro::Task<> { co_await baton; };
auto executor = co_await folly::coro::co_current_executor;
folly::coro::CancellableAsyncScope scope;
CO_ASSERT_EQ(0, scope.remaining());
for (int i = 0; i < 10; ++i) {
scope.add(makeTask().scheduleOn(executor));
}
CO_ASSERT_EQ(10, scope.remaining());
baton.post();
co_await scope.joinAsync();
CO_ASSERT_EQ(0, scope.remaining());
}
CO_TEST_F(CancellableAsyncScopeTest, CancelSuspendedWork) {
using namespace std::chrono_literals;
auto makeTask = [&]() -> folly::coro::Task<> {
co_await folly::coro::sleep(300s);
};
folly::coro::CancellableAsyncScope scope;
CO_ASSERT_EQ(0, scope.remaining());
for (int i = 0; i < 10; ++i) {
scope.add(makeTask().scheduleOn(folly::getGlobalCPUExecutor()));
}
CO_ASSERT_EQ(10, scope.remaining());
// Although we are suspended while sleeping, cancelAndJoinAsync will handle
// this correctly.
co_await scope.cancelAndJoinAsync();
CO_ASSERT_EQ(0, scope.remaining());
} }
#endif // FOLLY_HAS_COROUTINES #endif // FOLLY_HAS_COROUTINES
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment