Commit 88ff31c0 authored by Andrii Grynenko's avatar Andrii Grynenko Committed by Facebook GitHub Bot

Basic detachOnCancel helper

Summary:
detachOnCancel adapter can be used with Awaitables that don't support active cancellation, but are safely detachable. When the cancellation is requested the adaptor will emit OperationCancelled exception without waiting for the awaitable to complete.

This diff also introduces tryAssign helper, similar to Try::operator=, but which handles a case of throwing move constructor.

Reviewed By: lewissbaker

Differential Revision: D22578847

fbshipit-source-id: cab9998c8c6a709108edefc3d59e6f44792d562f
parent 27a0093e
......@@ -346,6 +346,17 @@ auto unwrapTryTuple(Tuple&& instance) {
return try_detail::unwrapTryTupleImpl(Seq{}, std::forward<Tuple>(instance));
}
template <typename T>
void tryAssign(Try<T>& t, Try<T>&& other) noexcept {
try {
t = std::move(other);
} catch (const std::exception& ex) {
t.emplaceException(std::current_exception(), ex);
} catch (...) {
t.emplaceException(std::current_exception());
}
}
// limited to the instances unconditionally forced by the futures library
extern template class Try<Unit>;
......
......@@ -695,6 +695,15 @@ bool tryEmplaceWith(Try<void>& t, Func&& func) noexcept;
template <typename Tuple>
auto unwrapTryTuple(Tuple&&);
/*
* Try to move the value/exception from another Try object.
*
* If T's constructor throws an exception then this is caught and the Try<T>
* object is initialised to hold that exception.
*/
template <typename T>
void tryAssign(Try<T>& t, Try<T>&& other) noexcept;
} // namespace folly
#include <folly/Try-inl.h>
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <experimental/coroutine>
#include <type_traits>
#include <folly/experimental/coro/Baton.h>
#include <folly/experimental/coro/Invoke.h>
#include <folly/experimental/coro/Task.h>
#include <folly/experimental/coro/Traits.h>
#include <folly/experimental/coro/detail/Helpers.h>
namespace folly {
namespace coro {
template <typename Awaitable>
Task<semi_await_result_t<Awaitable>> detachOnCancel(Awaitable awaitable) {
auto posted = std::make_unique<std::atomic<bool>>(false);
Baton baton;
Try<detail::lift_lvalue_reference_t<semi_await_result_t<Awaitable>>> result;
co_invoke(
[awaitable = std::move(
awaitable)]() mutable -> Task<semi_await_result_t<Awaitable>> {
co_return co_await std::move(awaitable);
})
.scheduleOn(co_await co_current_executor)
.startInlineUnsafe(
[postedPtr = posted.get(), &baton, &result](auto&& r) {
std::unique_ptr<std::atomic<bool>> posted(postedPtr);
if (!posted->exchange(true, std::memory_order_relaxed)) {
posted.release();
tryAssign(result, std::move(r));
baton.post();
}
},
co_await co_current_cancellation_token);
{
CancellationCallback cancelCallback(
co_await co_current_cancellation_token, [&posted, &baton, &result] {
if (!posted->exchange(true, std::memory_order_relaxed)) {
posted.release();
result.emplaceException(folly::OperationCancelled{});
baton.post();
}
});
co_await baton;
}
if (result.hasException()) {
co_yield folly::coro::co_error(result.exception());
}
co_return std::move(result).value();
}
} // namespace coro
} // namespace folly
......@@ -25,6 +25,7 @@
#include <folly/experimental/coro/BlockingWait.h>
#include <folly/experimental/coro/Collect.h>
#include <folly/experimental/coro/CurrentExecutor.h>
#include <folly/experimental/coro/DetachOnCancel.h>
#include <folly/experimental/coro/Invoke.h>
#include <folly/experimental/coro/Sleep.h>
#include <folly/experimental/coro/Task.h>
......@@ -687,4 +688,45 @@ TEST(Coro, CoThrow) {
}()),
ExpectedException);
}
TEST_F(CoroTest, DetachOnCancel) {
folly::coro::blockingWait([&]() -> folly::coro::Task<> {
folly::CancellationSource cancelSource;
cancelSource.requestCancellation();
// Run some logic while in an already-cancelled state.
co_await folly::coro::co_withCancellation(
cancelSource.getToken(), []() -> folly::coro::Task<> {
EXPECT_THROW(
co_await folly::coro::detachOnCancel(
folly::futures::sleep(std::chrono::seconds{1})
.deferValue([](auto) { return 42; })),
folly::OperationCancelled);
}());
}());
folly::coro::blockingWait([&]() -> folly::coro::Task<> {
folly::CancellationSource cancelSource;
co_await folly::coro::co_withCancellation(
cancelSource.getToken(), []() -> folly::coro::Task<> {
EXPECT_EQ(
42,
co_await folly::coro::detachOnCancel(
folly::futures::sleep(std::chrono::milliseconds{10})
.deferValue([](auto) { return 42; })));
}());
}());
folly::coro::blockingWait([&]() -> folly::coro::Task<> {
folly::CancellationSource cancelSource;
co_await folly::coro::co_withCancellation(
cancelSource.getToken(), []() -> folly::coro::Task<> {
co_await folly::coro::detachOnCancel([]() -> folly::coro::Task<void> {
co_await folly::futures::sleep(std::chrono::milliseconds{10});
}());
}());
}());
}
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment