Commit 1a0ddc9c authored by Lewis Baker's avatar Lewis Baker Committed by Facebook Github Bot

Add cancellable folly::coro::sleep()

Summary:
Adds folly::coro::sleep() function as an alternative to using folly::futures::sleep().

The coro version returns a Task<void> and supports cancellation of the sleep operation when awaited within another Task.

Reviewed By: kirkshoop

Differential Revision: D16816328

fbshipit-source-id: 46bc4ee2475e6bd0bdfd0f7f30f3e0f1ea54d4d5
parent 14959740
/*
* Copyright 2019-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/CancellationToken.h>
#include <folly/experimental/coro/Baton.h>
#include <folly/experimental/coro/CurrentExecutor.h>
namespace folly {
namespace coro {
inline Task<void> sleep(Duration d, Timekeeper* tk) {
folly::coro::Baton baton;
auto future =
folly::futures::sleep(d, tk).toUnsafeFuture().ensure([&]() noexcept {
baton.post();
});
CancellationCallback cancelCallback(
co_await co_current_cancellation_token, [&]() noexcept {
future.cancel();
});
co_await baton;
}
} // namespace coro
} // namespace folly
/*
* Copyright 2019-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/experimental/coro/Task.h>
#include <folly/futures/Future.h>
namespace folly {
namespace coro {
/// Return a task that, when awaited, will sleep for the specified duration.
///
/// May complete sooner that the specified duration if cancellation is requested
/// on the the awaiting coroutine's associated CancellationToken.
Task<void> sleep(Duration d, Timekeeper* tk = nullptr);
} // namespace coro
} // namespace folly
#include <folly/experimental/coro/Sleep-inl.h>
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include <folly/experimental/coro/AsyncGenerator.h> #include <folly/experimental/coro/AsyncGenerator.h>
#include <folly/experimental/coro/Baton.h> #include <folly/experimental/coro/Baton.h>
#include <folly/experimental/coro/BlockingWait.h> #include <folly/experimental/coro/BlockingWait.h>
#include <folly/experimental/coro/Sleep.h>
#include <folly/experimental/coro/Task.h> #include <folly/experimental/coro/Task.h>
#include <folly/futures/Future.h> #include <folly/futures/Future.h>
...@@ -179,15 +180,15 @@ TEST(AsyncGenerator, ProduceResultsAsynchronously) { ...@@ -179,15 +180,15 @@ TEST(AsyncGenerator, ProduceResultsAsynchronously) {
auto makeGenerator = [&]() -> folly::coro::AsyncGenerator<int> { auto makeGenerator = [&]() -> folly::coro::AsyncGenerator<int> {
using namespace std::literals::chrono_literals; using namespace std::literals::chrono_literals;
CHECK_EQ(executor, co_await folly::coro::co_current_executor); CHECK_EQ(executor, co_await folly::coro::co_current_executor);
co_await folly::futures::sleep(1ms); co_await folly::coro::sleep(1ms);
CHECK_EQ(executor, co_await folly::coro::co_current_executor); CHECK_EQ(executor, co_await folly::coro::co_current_executor);
co_yield 1; co_yield 1;
CHECK_EQ(executor, co_await folly::coro::co_current_executor); CHECK_EQ(executor, co_await folly::coro::co_current_executor);
co_await folly::futures::sleep(1ms); co_await folly::coro::sleep(1ms);
CHECK_EQ(executor, co_await folly::coro::co_current_executor); CHECK_EQ(executor, co_await folly::coro::co_current_executor);
co_yield 2; co_yield 2;
CHECK_EQ(executor, co_await folly::coro::co_current_executor); CHECK_EQ(executor, co_await folly::coro::co_current_executor);
co_await folly::futures::sleep(1ms); co_await folly::coro::sleep(1ms);
CHECK_EQ(executor, co_await folly::coro::co_current_executor); CHECK_EQ(executor, co_await folly::coro::co_current_executor);
}; };
......
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#include <folly/experimental/coro/BlockingWait.h> #include <folly/experimental/coro/BlockingWait.h>
#include <folly/experimental/coro/Collect.h> #include <folly/experimental/coro/Collect.h>
#include <folly/experimental/coro/CurrentExecutor.h> #include <folly/experimental/coro/CurrentExecutor.h>
#include <folly/experimental/coro/Sleep.h>
#include <folly/experimental/coro/Task.h> #include <folly/experimental/coro/Task.h>
#include <folly/experimental/coro/TimedWait.h> #include <folly/experimental/coro/TimedWait.h>
#include <folly/experimental/coro/Utils.h> #include <folly/experimental/coro/Utils.h>
...@@ -106,7 +107,7 @@ TEST(Coro, TaskOfMoveOnly) { ...@@ -106,7 +107,7 @@ TEST(Coro, TaskOfMoveOnly) {
} }
coro::Task<void> taskSleep() { coro::Task<void> taskSleep() {
(void)co_await futures::sleep(std::chrono::seconds{1}); (void)co_await coro::sleep(std::chrono::seconds{1});
co_return; co_return;
} }
...@@ -202,7 +203,7 @@ coro::Task<int> taskRecursion(int depth) { ...@@ -202,7 +203,7 @@ coro::Task<int> taskRecursion(int depth) {
if (depth > 0) { if (depth > 0) {
EXPECT_EQ(depth - 1, co_await taskRecursion(depth - 1)); EXPECT_EQ(depth - 1, co_await taskRecursion(depth - 1));
} else { } else {
(void)co_await futures::sleep(std::chrono::seconds{1}); (void)co_await coro::sleep(std::chrono::seconds{1});
} }
co_return depth; co_return depth;
...@@ -217,7 +218,7 @@ TEST(Coro, LargeStack) { ...@@ -217,7 +218,7 @@ TEST(Coro, LargeStack) {
coro::Task<void> taskThreadNested(std::thread::id threadId) { coro::Task<void> taskThreadNested(std::thread::id threadId) {
EXPECT_EQ(threadId, std::this_thread::get_id()); EXPECT_EQ(threadId, std::this_thread::get_id());
(void)co_await futures::sleep(std::chrono::seconds{1}); (void)co_await coro::sleep(std::chrono::seconds{1});
EXPECT_EQ(threadId, std::this_thread::get_id()); EXPECT_EQ(threadId, std::this_thread::get_id());
co_return; co_return;
} }
...@@ -309,7 +310,7 @@ TEST(Coro, TimedWaitFuture) { ...@@ -309,7 +310,7 @@ TEST(Coro, TimedWaitFuture) {
coro::Task<void> taskTimedWaitTask() { coro::Task<void> taskTimedWaitTask() {
auto fastTask = []() -> coro::Task<int> { auto fastTask = []() -> coro::Task<int> {
co_await futures::sleep(std::chrono::milliseconds{50}); co_await coro::sleep(std::chrono::milliseconds{50});
co_return 42; co_return 42;
}(); }();
auto fastResult = co_await coro::timed_wait( auto fastResult = co_await coro::timed_wait(
...@@ -322,7 +323,7 @@ coro::Task<void> taskTimedWaitTask() { ...@@ -322,7 +323,7 @@ coro::Task<void> taskTimedWaitTask() {
}; };
auto throwingTask = []() -> coro::Task<void> { auto throwingTask = []() -> coro::Task<void> {
co_await futures::sleep(std::chrono::milliseconds{50}); co_await coro::sleep(std::chrono::milliseconds{50});
throw ExpectedException(); throw ExpectedException();
}(); }();
EXPECT_THROW( EXPECT_THROW(
...@@ -349,8 +350,7 @@ TEST(Coro, TimedWaitKeepAlive) { ...@@ -349,8 +350,7 @@ TEST(Coro, TimedWaitKeepAlive) {
auto start = std::chrono::steady_clock::now(); auto start = std::chrono::steady_clock::now();
coro::blockingWait([]() -> coro::Task<void> { coro::blockingWait([]() -> coro::Task<void> {
co_await coro::timed_wait( co_await coro::timed_wait(
futures::sleep(std::chrono::milliseconds{100}), coro::sleep(std::chrono::milliseconds{100}), std::chrono::seconds{60});
std::chrono::seconds{60});
co_return; co_return;
}()); }());
auto duration = std::chrono::steady_clock::now() - start; auto duration = std::chrono::steady_clock::now() - start;
...@@ -545,25 +545,6 @@ TEST(Coro, FutureTry) { ...@@ -545,25 +545,6 @@ TEST(Coro, FutureTry) {
}()); }());
} }
template <typename T>
folly::coro::Task<T> cancellableFuture(folly::SemiFuture<T> future) {
folly::coro::Baton baton;
// Attach an executor to ensure that the operation has been started.
auto future2 = std::move(future)
.via(co_await folly::coro::co_current_executor)
.ensure([&]() { baton.post(); });
{
folly::CancellationCallback cb{
co_await folly::coro::co_current_cancellation_token,
[&] { future2.cancel(); }};
co_await baton;
}
co_return co_await std::move(future2);
}
TEST(Coro, CancellableSleep) { TEST(Coro, CancellableSleep) {
using namespace std::chrono; using namespace std::chrono;
using namespace std::chrono_literals; using namespace std::chrono_literals;
...@@ -574,12 +555,8 @@ TEST(Coro, CancellableSleep) { ...@@ -574,12 +555,8 @@ TEST(Coro, CancellableSleep) {
coro::blockingWait([&]() -> coro::Task<void> { coro::blockingWait([&]() -> coro::Task<void> {
co_await coro::collectAll( co_await coro::collectAll(
[&]() -> coro::Task<void> { [&]() -> coro::Task<void> {
try { co_await coro::co_withCancellation(
co_await coro::co_withCancellation( cancelSrc.getToken(), coro::sleep(10s));
cancelSrc.getToken(),
cancellableFuture(folly::futures::sleep(10s)));
} catch (const folly::FutureCancellation&) {
}
}(), }(),
[&]() -> coro::Task<void> { [&]() -> coro::Task<void> {
co_await coro::co_reschedule_on_current_executor; co_await coro::co_reschedule_on_current_executor;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment