Commit 955b416f authored by Lewis Baker's avatar Lewis Baker Committed by Facebook GitHub Bot

Make folly::fibers::Semaphore::co_wait() cancellable

Summary:
The `Semaphore::co_await()` method now response to requests for cancellation
communicated through the awaiting context's CancellationToken.

This allows interrupting a wait-operation if the result is no longer required,
in which case the co_wait() task completes with the OperationCancelled exception.

Reviewed By: yfeldblum

Differential Revision: D21655946

fbshipit-source-id: 8c7f862580ddebd2b4d3eb3cc3ab5e01ad5ee289
parent f8a3d164
......@@ -43,7 +43,7 @@ struct S {
};
coro::Task<S> taskS() {
co_return {};
co_return{};
}
coro::Task<int> task42() {
......@@ -515,6 +515,72 @@ TEST_F(CoroTest, Semaphore) {
}
}
TEST_F(CoroTest, SemaphoreWaitWhenCancellationAlreadyRequested) {
folly::coro::blockingWait([&]() -> folly::coro::Task<> {
folly::CancellationSource cancelSource;
cancelSource.requestCancellation();
// Run some logic while in an already-cancelled state.
co_await folly::coro::co_withCancellation(
cancelSource.getToken(), []() -> folly::coro::Task<> {
folly::fibers::Semaphore sem{1};
// If in a signalled state then should complete normally
co_await sem.co_wait();
// And the semaphore should no longer be in the signalled state.
// But if not signalled then should complete with cancellation
// immediately.
EXPECT_THROW(co_await sem.co_wait(), folly::OperationCancelled);
}());
}());
}
TEST_F(CoroTest, CancelOutstandingSemaphoreWait) {
struct ExpectedError : std::exception {};
folly::coro::blockingWait([&]() -> folly::coro::Task<> {
folly::fibers::Semaphore sem{0};
try {
co_await folly::coro::collectAll(
[&]() -> folly::coro::Task<> {
EXPECT_THROW(co_await sem.co_wait(), OperationCancelled);
}(),
[]() -> folly::coro::Task<> {
co_await folly::coro::co_reschedule_on_current_executor;
// Completing the second task with an error will cause
// collectAll() to request cancellation of the other task
// whcih should reqeust cancellation of sem.co_wait().
co_yield folly::coro::co_error(ExpectedError{});
}());
} catch (ExpectedError) {
}
}());
}
TEST_F(CoroTest, CancelOneSemaphoreWaitDoesNotAffectOthers) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
folly::fibers::Semaphore sem{0};
folly::CancellationSource cancelSource;
co_await folly::coro::collectAll(
[&]() -> folly::coro::Task<> {
EXPECT_THROW(
(co_await folly::coro::co_withCancellation(
cancelSource.getToken(), sem.co_wait())),
OperationCancelled);
}(),
[&]() -> folly::coro::Task<> { co_await sem.co_wait(); }(),
[&]() -> folly::coro::Task<> {
co_await folly::coro::co_reschedule_on_current_executor;
cancelSource.requestCancellation();
sem.signal();
}());
}());
}
TEST_F(CoroTest, FutureTry) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
{
......@@ -619,5 +685,4 @@ TEST(Coro, CoThrow) {
}()),
ExpectedException);
}
#endif
......@@ -129,7 +129,37 @@ coro::Task<void> Semaphore::co_wait() {
// If waitSlow fails it is because the token is non-zero by the time
// the lock is taken, so we can just continue round the loop
if (waitSlow(waiter)) {
co_await waiter.baton;
bool cancelled = false;
{
const auto& ct = co_await folly::coro::co_current_cancellation_token;
folly::CancellationCallback cb{
ct, [&] {
{
auto waitListLock = waitList_.wlock();
auto& waitList = *waitListLock;
if (!waiter.hook_.is_linked()) {
// Already dequeued by signalSlow()
return;
}
cancelled = true;
waitList.erase(waitList.iterator_to(waiter));
}
waiter.baton.post();
}};
co_await waiter.baton;
}
// Check 'cancelled' flag only after deregistering the callback so we're
// sure that we aren't reading it concurrently with a potential write
// from a thread requesting cancellation.
if (cancelled) {
co_yield folly::coro::co_error(folly::OperationCancelled{});
}
co_return;
}
oldVal = tokens_.load(std::memory_order_acquire);
......
......@@ -24,6 +24,8 @@
#include <folly/experimental/coro/Task.h>
#endif
#include <deque>
namespace folly {
namespace fibers {
......@@ -74,6 +76,19 @@ class Semaphore {
/*
* Wait for capacity in the semaphore.
*
* Note that this wait-operation can be cancelled by requesting cancellation
* on the awaiting coroutine's associated CancellationToken.
* If the operation is successfully cancelled then it will complete with
* an error of type folly::OperationCancelled.
*
* Note that requesting cancellation of the operation will only have an
* effect if the operation does not complete synchronously (ie. was not
* already in a signalled state).
*
* If the semaphore was already in a signalled state prior to awaiting the
* returned Task then the operation will complete successfully regardless
* of whether cancellation was requested.
*/
coro::Task<void> co_wait();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment