Commit 8dfce3ec authored by Andrii Grynenko's avatar Andrii Grynenko Committed by Facebook GitHub Bot

Make coro::sleep throw when cancelled

Summary: We shouldn't be sleeping less that requested and return silently.

Reviewed By: lewissbaker

Differential Revision: D24633977

fbshipit-source-id: 71ef422f0f72747bc19ac8491fdb8b148598d499
parent 244b6f44
......@@ -123,12 +123,11 @@ class AsyncScope {
try {
co_await std::move(awaitable);
} catch (const std::exception& e) {
} catch (const OperationCancelled&) {
} catch (...) {
LOG(DFATAL)
<< "Unhandled exception thrown from task added to AsyncScope: "
<< e.what();
} catch (...) {
LOG(DFATAL) << "Unhandled exception thrown from task added to AsyncScope";
<< folly::exceptionStr(std::current_exception());
}
}
......
......@@ -24,17 +24,36 @@ namespace folly {
namespace coro {
inline Task<void> sleep(Duration d, Timekeeper* tk) {
bool cancelled{false};
folly::coro::Baton baton;
auto future =
folly::futures::sleep(d, tk).toUnsafeFuture().ensure([&]() noexcept {
Try<Unit> result;
auto future = folly::futures::sleep(d, tk).toUnsafeFuture();
future.setCallback_(
[&result, &baton](Executor::KeepAlive<>&&, Try<Unit>&& t) {
result = std::move(t);
baton.post();
});
CancellationCallback cancelCallback(
co_await co_current_cancellation_token, [&]() noexcept {
future.cancel();
});
co_await baton;
{
CancellationCallback cancelCallback(
co_await co_current_cancellation_token, [&]() noexcept {
cancelled = true;
future.cancel();
});
co_await baton;
}
if (cancelled) {
co_yield co_error(OperationCancelled());
}
co_yield co_result(std::move(result));
}
inline Task<void> sleepReturnEarlyOnCancel(Duration d, Timekeeper* tk) {
auto result = co_await co_awaitTry(sleep(d, tk));
if (result.hasException<OperationCancelled>()) {
co_return;
}
co_yield co_result(std::move(result));
}
} // namespace coro
......
......@@ -24,10 +24,16 @@ namespace coro {
/// Return a task that, when awaited, will sleep for the specified duration.
///
/// May complete sooner that the specified duration if cancellation is requested
/// Throws folly::OperationCancelled if cancellation is requested
/// on the awaiting coroutine's associated CancellationToken.
Task<void> sleep(Duration d, Timekeeper* tk = nullptr);
/// Return a task that, when awaited, will sleep for the specified duration.
///
/// May complete sooner that the specified duration if cancellation is
/// requested on the awaiting coroutine's associated CancellationToken.
Task<void> sleepReturnEarlyOnCancel(Duration d, Timekeeper* tk = nullptr);
} // namespace coro
} // namespace folly
......
......@@ -35,6 +35,11 @@
#include <string>
#include <vector>
folly::coro::Task<void> sleepThatShouldBeCancelled(
std::chrono::milliseconds dur) {
EXPECT_THROW(co_await folly::coro::sleep(dur), folly::OperationCancelled);
}
class CollectAllTest : public testing::Test {};
TEST_F(CollectAllTest, WithNoArgs) {
......@@ -289,11 +294,11 @@ TEST_F(CollectAllTest, CollectAllCancelsSubtasksWhenParentTaskCancelled) {
cancelSource.getToken(),
folly::coro::collectAll(
[&]() -> folly::coro::Task<int> {
co_await folly::coro::sleep(10s);
co_await sleepThatShouldBeCancelled(10s);
co_return 42;
}(),
[&]() -> folly::coro::Task<float> {
co_await folly::coro::sleep(5s);
co_await sleepThatShouldBeCancelled(5s);
co_return 3.14f;
}(),
[&]() -> folly::coro::Task<void> {
......@@ -507,11 +512,11 @@ TEST_F(CollectAllTryTest, CollectAllCancelsSubtasksWhenParentTaskCancelled) {
cancelSource.getToken(),
folly::coro::collectAllTry(
[&]() -> folly::coro::Task<int> {
co_await folly::coro::sleep(10s);
co_await sleepThatShouldBeCancelled(10s);
co_return 42;
}(),
[&]() -> folly::coro::Task<float> {
co_await folly::coro::sleep(5s);
co_await sleepThatShouldBeCancelled(5s);
co_return 3.14f;
}(),
[&]() -> folly::coro::Task<void> {
......@@ -930,7 +935,7 @@ TEST_F(CollectAllTryRangeTest, SubtasksCancelledWhenParentTaskCancelled) {
auto generateTasks = [&]()
-> folly::coro::Generator<folly::coro::Task<void>&&> {
for (int i = 0; i < 10; ++i) {
co_yield folly::coro::sleep(10s);
co_yield sleepThatShouldBeCancelled(10s);
}
co_yield[&]()->folly::coro::Task<void> {
......@@ -1217,8 +1222,8 @@ TEST_F(CollectAllWindowedTest, SubtasksCancelledWhenParentTaskCancelled) {
bool consumedAllTasks = false;
auto generateTasks = [&]()
-> folly::coro::Generator<folly::coro::Task<void>&&> {
co_yield folly::coro::sleep(10s);
co_yield folly::coro::sleep(10s);
co_yield sleepThatShouldBeCancelled(10s);
co_yield sleepThatShouldBeCancelled(10s);
co_yield[&]()->folly::coro::Task<void> {
co_await folly::coro::co_reschedule_on_current_executor;
......@@ -1232,8 +1237,8 @@ TEST_F(CollectAllWindowedTest, SubtasksCancelledWhenParentTaskCancelled) {
}
();
co_yield folly::coro::sleep(10s);
co_yield folly::coro::sleep(10s);
co_yield sleepThatShouldBeCancelled(10s);
co_yield sleepThatShouldBeCancelled(10s);
consumedAllTasks = true;
};
......@@ -1423,8 +1428,8 @@ TEST_F(CollectAllTryWindowedTest, SubtasksCancelledWhenParentTaskCancelled) {
bool consumedAllTasks = false;
auto generateTasks = [&]()
-> folly::coro::Generator<folly::coro::Task<void>&&> {
co_yield folly::coro::sleep(10s);
co_yield folly::coro::sleep(10s);
co_yield sleepThatShouldBeCancelled(10s);
co_yield sleepThatShouldBeCancelled(10s);
co_yield[&]()->folly::coro::Task<void> {
co_await folly::coro::co_reschedule_on_current_executor;
......@@ -1438,8 +1443,8 @@ TEST_F(CollectAllTryWindowedTest, SubtasksCancelledWhenParentTaskCancelled) {
}
();
co_yield folly::coro::sleep(10s);
co_yield folly::coro::sleep(10s);
co_yield sleepThatShouldBeCancelled(10s);
co_yield sleepThatShouldBeCancelled(10s);
consumedAllTasks = true;
};
......
......@@ -609,19 +609,21 @@ TEST_F(CoroTest, CancellableSleep) {
CancellationSource cancelSrc;
auto start = steady_clock::now();
coro::blockingWait([&]() -> coro::Task<void> {
co_await coro::collectAll(
[&]() -> coro::Task<void> {
co_await coro::co_withCancellation(
cancelSrc.getToken(), coro::sleep(10s));
}(),
[&]() -> coro::Task<void> {
co_await coro::co_reschedule_on_current_executor;
co_await coro::co_reschedule_on_current_executor;
co_await coro::co_reschedule_on_current_executor;
cancelSrc.requestCancellation();
}());
}());
EXPECT_THROW(
coro::blockingWait([&]() -> coro::Task<void> {
co_await coro::collectAll(
[&]() -> coro::Task<void> {
co_await coro::co_withCancellation(
cancelSrc.getToken(), coro::sleep(10s));
}(),
[&]() -> coro::Task<void> {
co_await coro::co_reschedule_on_current_executor;
co_await coro::co_reschedule_on_current_executor;
co_await coro::co_reschedule_on_current_executor;
cancelSrc.requestCancellation();
}());
}()),
OperationCancelled);
auto end = steady_clock::now();
CHECK((end - start) < 1s);
}
......
......@@ -147,9 +147,8 @@ TEST(Timeout, CancelParent) {
cancelSource.getToken(),
coro::timeout(
[]() -> coro::Task<bool> {
co_await coro::sleep(5s);
co_return(co_await coro::co_current_cancellation_token)
.isCancellationRequested();
auto result = co_await coro::co_awaitTry(coro::sleep(5s));
co_return result.hasException<OperationCancelled>();
}(),
10s)),
[&]() -> coro::Task<void> {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment