Commit 008f5538 authored by Lewis Baker's avatar Lewis Baker Committed by Facebook GitHub Bot

Add folly::coro::timeout() algorithm

Summary:
This algorithm takes an awaitable and a duration and
cancels the operation and completes with a `FutureTimeout`
error if the operation does not complete within the specified
duration.

This differs from `timed_wait()` in that it still waits
for the child operation to complete, even once the timeout
has elapsed, rather than just detaching from the
operation and letting it complete in the background.

This relies on the operation responding to cancellation
in a timely manner for the algorithm to have the effect
of returning shortly after the timeout elapses.

The benefit of waiting for the child operation to complete,
however, is that it allows you to safely pass parameters by
reference to the child operation and be safe in knowing that
the child operation will complete before the awaiting
coroutine resumes and potentially destroys those parameters.

Reviewed By: yfeldblum

Differential Revision: D22199932

fbshipit-source-id: 4f6295a23a860ec52b639eb56a02f8029ffb9009
parent 586a200d
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/CancellationToken.h>
#include <folly/experimental/coro/Baton.h>
#include <folly/experimental/coro/WithCancellation.h>
namespace folly::coro {
template <typename SemiAwaitable, typename Duration>
Task<semi_await_result_t<SemiAwaitable>>
timeout(SemiAwaitable semiAwaitable, Duration timeoutDuration, Timekeeper* tk) {
CancellationSource cancelSource;
folly::coro::Baton baton;
exception_wrapper timeoutResult;
auto sleepFuture =
folly::futures::sleep(timeoutDuration, tk).toUnsafeFuture();
sleepFuture.setCallback_([&](
Executor::KeepAlive<>&&, Try<Unit> && result) noexcept {
if (result.hasException()) {
timeoutResult = std::move(result.exception());
} else {
timeoutResult = folly::make_exception_wrapper<FutureTimeout>();
}
cancelSource.requestCancellation();
baton.post();
});
bool isSleepCancelled = false;
auto tryCancelSleep = [&]() noexcept {
if (!isSleepCancelled) {
isSleepCancelled = true;
sleepFuture.cancel();
}
};
bool parentCancelled = false;
std::optional<CancellationCallback> cancelCallback{
std::in_place, co_await co_current_cancellation_token, [&]() {
cancelSource.requestCancellation();
tryCancelSleep();
parentCancelled = true;
}};
bool checkedTimeout = false;
exception_wrapper error;
try {
auto resultTry =
co_await folly::coro::co_awaitTry(folly::coro::co_withCancellation(
cancelSource.getToken(), std::move(semiAwaitable)));
cancelCallback.reset();
if (!parentCancelled && baton.ready()) {
// Timer already fired
co_yield folly::coro::co_error(std::move(timeoutResult));
}
checkedTimeout = true;
tryCancelSleep();
co_await baton;
if (resultTry.hasException()) {
co_yield folly::coro::co_error(std::move(resultTry).exception());
}
co_return std::move(resultTry).value();
} catch (const std::exception& ex) {
error = exception_wrapper{std::current_exception(), ex};
} catch (...) {
error = exception_wrapper{std::current_exception()};
}
assert(error);
cancelCallback.reset();
if (!checkedTimeout && !parentCancelled && baton.ready()) {
// Timer already fired
co_yield folly::coro::co_error(std::move(timeoutResult));
}
tryCancelSleep();
co_await baton;
co_yield folly::coro::co_error(std::move(error));
}
} // namespace folly::coro
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/experimental/coro/Task.h>
#include <folly/experimental/coro/Traits.h>
#include <folly/futures/Future.h>
namespace folly::coro {
// Returns a Task that, when started, starts a timer of duration
// 'timeoutDuration' and awaits the passed SemiAwaitable.
//
// If the timeoutDuration elapses before the 'co_await semiAwaitable'
// operation completes then requests cancellation of the child operation
// and completes with an error of type folly::FutureTimeout.
// Otherwise, if the 'co_await semiAwaitable' operation completes before
// the timeoutDuration elapses then cancels the timer and completes with
// the result of the semiAwaitable.
//
// IMPORTANT: The operation passed as the first argument must be able
// to respond to a request for cancellation on the CancellationToken
// injected to it via co_withCancellation() in a timely manner for the
// timeout to work as expected.
//
// If a timekeeper is provided then uses that timekeeper to start the timer,
// otherwise uses the process' default TimeKeeper if 'tk' is null.
template <typename SemiAwaitable, typename Duration>
Task<semi_await_result_t<SemiAwaitable>> timeout(
SemiAwaitable semiAwaitable,
Duration timeoutDuration,
Timekeeper* tk = nullptr);
} // namespace folly::coro
#include <folly/experimental/coro/Timeout-inl.h>
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/Portability.h>
#if FOLLY_HAS_COROUTINES
#include <folly/experimental/coro/BlockingWait.h>
#include <folly/experimental/coro/Collect.h>
#include <folly/experimental/coro/Sleep.h>
#include <folly/experimental/coro/Timeout.h>
#include <folly/futures/Future.h>
#include <folly/portability/GTest.h>
#include <chrono>
#include <stdexcept>
using namespace std::chrono_literals;
using namespace folly;
TEST(Timeout, CompletesSynchronously) {
coro::blockingWait([]() -> coro::Task<> {
// Completing synchronously with void
co_await coro::timeout([]() -> coro::Task<void> { co_return; }(), 1s);
// Completing synchronously with a value.
auto result =
co_await coro::timeout([]() -> coro::Task<int> { co_return 42; }(), 1s);
EXPECT_EQ(42, result);
// Test that it handles failing synchronously
auto tryResult = co_await coro::co_awaitTry(coro::timeout(
[&]() -> coro::Task<int> {
if (true) {
throw std::runtime_error{"bad value"};
}
co_return result;
}(),
1s));
EXPECT_TRUE(tryResult.hasException<std::runtime_error>());
}());
}
TEST(Timeout, CompletesWithinTimeout) {
coro::blockingWait([]() -> coro::Task<> {
// Completing synchronously with void
co_await coro::timeout(
[]() -> coro::Task<void> {
co_await coro::sleep(1ms);
co_return;
}(),
1s);
// Completing synchronously with a value.
auto result = co_await coro::timeout(
[]() -> coro::Task<int> {
co_await coro::sleep(1ms);
co_return 42;
}(),
1s);
EXPECT_EQ(42, result);
// Test that it handles failing synchronously
auto tryResult = co_await coro::co_awaitTry(coro::timeout(
[&]() -> coro::Task<int> {
co_await coro::sleep(1ms);
if (true) {
throw std::runtime_error{"bad value"};
}
co_return result;
}(),
1s));
EXPECT_TRUE(tryResult.hasException<std::runtime_error>());
}());
}
TEST(Timeout, TimeoutElapsed) {
coro::blockingWait([]() -> coro::Task<> {
// Completing synchronously with void
auto start = std::chrono::steady_clock::now();
folly::Try<void> voidResult = co_await coro::co_awaitTry(coro::timeout(
[]() -> coro::Task<void> {
co_await coro::sleep(1s);
EXPECT_TRUE((co_await coro::co_current_cancellation_token)
.isCancellationRequested());
co_return;
}(),
5ms));
auto elapsed = std::chrono::steady_clock::now() - start;
EXPECT_LT(elapsed, 100ms);
EXPECT_TRUE(voidResult.hasException<folly::FutureTimeout>());
// Completing synchronously with a value.
start = std::chrono::steady_clock::now();
auto result = co_await coro::co_awaitTry(coro::timeout(
[]() -> coro::Task<int> {
co_await coro::sleep(1s);
EXPECT_TRUE((co_await coro::co_current_cancellation_token)
.isCancellationRequested());
co_return 42;
}(),
5ms));
elapsed = std::chrono::steady_clock::now() - start;
EXPECT_LT(elapsed, 100ms);
EXPECT_TRUE(result.hasException<folly::FutureTimeout>());
// Test that it handles failing synchronously
start = std::chrono::steady_clock::now();
auto failResult = co_await coro::co_awaitTry(coro::timeout(
[&]() -> coro::Task<int> {
co_await coro::sleep(1s);
EXPECT_TRUE((co_await coro::co_current_cancellation_token)
.isCancellationRequested());
if (true) {
throw std::runtime_error{"bad value"};
}
co_return result;
}(),
5ms));
elapsed = std::chrono::steady_clock::now() - start;
EXPECT_LT(elapsed, 100ms);
EXPECT_TRUE(result.hasException<folly::FutureTimeout>());
}());
}
TEST(Timeout, CancelParent) {
coro::blockingWait([]() -> coro::Task<> {
CancellationSource cancelSource;
auto start = std::chrono::steady_clock::now();
auto [cancelled, _] = co_await coro::collectAll(
coro::co_withCancellation(
cancelSource.getToken(),
coro::timeout(
[]() -> coro::Task<bool> {
co_await coro::sleep(5s);
co_return(co_await coro::co_current_cancellation_token)
.isCancellationRequested();
}(),
10s)),
[&]() -> coro::Task<void> {
cancelSource.requestCancellation();
co_return;
}());
auto elapsed = std::chrono::steady_clock::now() - start;
EXPECT_LT(elapsed, 1s);
EXPECT_TRUE(cancelled);
}());
}
#endif // FOLLY_HAS_COROUTINES
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment