Commit 70f914ba authored by Lewis Baker's avatar Lewis Baker Committed by Facebook GitHub Bot

Add cancellation support for folly::coro::UnboundedQueue::dequeue()

Summary:
Adds the ability to request cancellation of the folly::coro::UnboundedQueue's
dequeue() operation by injecting a CancellationToken using co_withCancellation().

Most of the work for supporting this was implemented in the change to
Semaphore::co_wait() to support cancellation.

This diff just adds some tests for cancellation of UnboundedQueue::dequeue()
and provides a more efficient code-path for the cancellation-case that does
not involve throwing an exception.

Reviewed By: yfeldblum

Differential Revision: D21555325

fbshipit-source-id: 7fbae8aa426b40bbea4d0d6dec85df4bad813733
parent 955b416f
...@@ -35,9 +35,14 @@ class UnboundedQueue { ...@@ -35,9 +35,14 @@ class UnboundedQueue {
} }
folly::coro::Task<T> dequeue() { folly::coro::Task<T> dequeue() {
co_await sem_.co_wait(); folly::Try<void> result = co_await folly::coro::co_awaitTry(sem_.co_wait());
if (result.hasException()) {
co_yield co_error(std::move(result).exception());
}
co_return queue_.dequeue(); co_return queue_.dequeue();
} }
folly::coro::Task<void> dequeue(T& out) { folly::coro::Task<void> dequeue(T& out) {
co_await sem_.co_wait(); co_await sem_.co_wait();
queue_.dequeue(out); queue_.dequeue(out);
...@@ -46,6 +51,7 @@ class UnboundedQueue { ...@@ -46,6 +51,7 @@ class UnboundedQueue {
folly::Optional<T> try_dequeue() { folly::Optional<T> try_dequeue() {
return queue_.try_dequeue(); return queue_.try_dequeue();
} }
bool try_dequeue(T& out) { bool try_dequeue(T& out) {
return queue_.try_dequeue(out); return queue_.try_dequeue(out);
} }
...@@ -53,6 +59,7 @@ class UnboundedQueue { ...@@ -53,6 +59,7 @@ class UnboundedQueue {
bool empty() { bool empty() {
return queue_.empty(); return queue_.empty();
} }
size_t size() { size_t size() {
return queue_.size(); return queue_.size();
} }
......
...@@ -18,7 +18,9 @@ ...@@ -18,7 +18,9 @@
#if FOLLY_HAS_COROUTINES #if FOLLY_HAS_COROUTINES
#include <folly/CancellationToken.h>
#include <folly/experimental/coro/BlockingWait.h> #include <folly/experimental/coro/BlockingWait.h>
#include <folly/experimental/coro/Collect.h>
#include <folly/experimental/coro/UnboundedQueue.h> #include <folly/experimental/coro/UnboundedQueue.h>
#include <folly/portability/GTest.h> #include <folly/portability/GTest.h>
...@@ -150,4 +152,42 @@ TEST(UnboundedQueueTest, EnqueueDequeMPMC) { ...@@ -150,4 +152,42 @@ TEST(UnboundedQueueTest, EnqueueDequeMPMC) {
EXPECT_TRUE(queue.empty()); EXPECT_TRUE(queue.empty());
} }
TEST(UnboundedQueueTest, CancelledDequeueThrowsOperationCancelled) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
// Cancellation currently only supported on SingleConsumer variants of
// UnboundedQueue.
folly::coro::UnboundedQueue<int> queue;
folly::CancellationSource cancelSource;
co_await folly::coro::collectAll(
[&]() -> folly::coro::Task<void> {
EXPECT_THROW(
(co_await folly::coro::co_withCancellation(
cancelSource.getToken(), queue.dequeue())),
folly::OperationCancelled);
}(),
[&]() -> folly::coro::Task<void> {
co_await folly::coro::co_reschedule_on_current_executor;
co_await folly::coro::co_reschedule_on_current_executor;
cancelSource.requestCancellation();
}());
}());
}
TEST(UnboundedQueueTest, CancelledDequeueCompletesNormallyIfAnItemIsAvailable) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
// Cancellation currently only supported on SingleConsumer variants of
// UnboundedQueue.
folly::coro::UnboundedQueue<int> queue;
folly::CancellationSource cancelSource;
cancelSource.requestCancellation();
queue.enqueue(123);
int result = co_await folly::coro::co_withCancellation(
cancelSource.getToken(), queue.dequeue());
EXPECT_EQ(123, result);
}());
}
#endif #endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment