Commit a5f943d2 authored by Lewis Baker's avatar Lewis Baker Committed by Facebook Github Bot

Add folly::coro::blockingWait(awaitable)

Summary:
Add a generic function for performing a blocking-wait on any awaitable object.

This obsoletes the use of the `.wait()` and `.get()` methods on  `folly::coro::Future<T>` and so these methods have now been removed.

Reviewed By: andriigrynenko

Differential Revision: D9384837

fbshipit-source-id: bcff5b348c024a3fc9337c67d996c447e0723fcc
parent 50757e76
/*
* Copyright 2018-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/Try.h>
#include <folly/experimental/coro/Traits.h>
#include <folly/synchronization/Baton.h>
#include <cassert>
#include <exception>
#include <experimental/coroutine>
#include <type_traits>
#include <utility>
namespace folly {
namespace coro {
namespace detail {
template <typename T>
class BlockingWaitTask;
class BlockingWaitPromiseBase {
struct FinalAwaiter {
bool await_ready() noexcept {
return false;
}
template <typename Promise>
void await_suspend(
std::experimental::coroutine_handle<Promise> coro) noexcept {
BlockingWaitPromiseBase& promise = coro.promise();
promise.baton_.post();
}
void await_resume() noexcept {}
};
public:
BlockingWaitPromiseBase() noexcept = default;
std::experimental::suspend_always initial_suspend() {
return {};
}
FinalAwaiter final_suspend() noexcept {
return {};
}
protected:
void wait() noexcept {
baton_.wait();
}
private:
folly::Baton<> baton_;
};
template <typename T>
class BlockingWaitPromise final : public BlockingWaitPromiseBase {
public:
BlockingWaitPromise() noexcept = default;
~BlockingWaitPromise() = default;
BlockingWaitTask<T> get_return_object() noexcept;
void unhandled_exception() noexcept {
result_->emplaceException(
folly::exception_wrapper::from_exception_ptr(std::current_exception()));
}
template <
typename U,
std::enable_if_t<std::is_convertible<U, T>::value, int> = 0>
void return_value(U&& value) noexcept(
std::is_nothrow_constructible<T, U&&>::value) {
result_->emplace(static_cast<U&&>(value));
}
folly::Try<T> getAsTry() {
folly::Try<T> result;
result_ = &result;
std::experimental::coroutine_handle<BlockingWaitPromise<T>>::from_promise(
*this)
.resume();
this->wait();
return result;
}
T get() {
return getAsTry().value();
}
private:
folly::Try<T>* result_;
};
template <typename T>
class BlockingWaitPromise<T&> final : public BlockingWaitPromiseBase {
public:
BlockingWaitPromise() noexcept = default;
~BlockingWaitPromise() = default;
BlockingWaitTask<T&> get_return_object() noexcept;
void unhandled_exception() noexcept {
result_->emplaceException(
folly::exception_wrapper::from_exception_ptr(std::current_exception()));
}
auto yield_value(T&& value) noexcept {
result_->emplace(std::ref(value));
return final_suspend();
}
auto yield_value(T& value) noexcept {
result_->emplace(std::ref(value));
return final_suspend();
}
#if 0
void return_value(T& value) noexcept {
result_->emplace(std::ref(value));
}
#endif
void return_void() {
// This should never be reachable.
// The coroutine should either have suspended at co_yield or should have
// thrown an exception and skipped over the implicit co_return and
// gone straight to unhandled_exception().
std::abort();
}
folly::Try<std::reference_wrapper<T>> getAsTry() {
folly::Try<std::reference_wrapper<T>> result;
result_ = &result;
std::experimental::coroutine_handle<BlockingWaitPromise<T&>>::from_promise(
*this)
.resume();
this->wait();
return result;
}
T& get() {
return getAsTry().value();
}
private:
folly::Try<std::reference_wrapper<T>>* result_;
};
template <>
class BlockingWaitPromise<void> final : public BlockingWaitPromiseBase {
public:
BlockingWaitPromise() = default;
BlockingWaitTask<void> get_return_object() noexcept;
void return_void() noexcept {}
void unhandled_exception() noexcept {
result_->emplaceException(
exception_wrapper::from_exception_ptr(std::current_exception()));
}
folly::Try<void> getAsTry() {
folly::Try<void> result;
result_ = &result;
std::experimental::coroutine_handle<
BlockingWaitPromise<void>>::from_promise(*this)
.resume();
this->wait();
return result;
}
void get() {
return getAsTry().value();
}
private:
folly::Try<void>* result_;
};
template <typename T>
class BlockingWaitTask {
public:
using promise_type = BlockingWaitPromise<T>;
using handle_t = std::experimental::coroutine_handle<promise_type>;
explicit BlockingWaitTask(handle_t coro) noexcept : coro_(coro) {}
BlockingWaitTask(BlockingWaitTask&& other) noexcept
: coro_(std::exchange(other.coro_, {})) {}
BlockingWaitTask& operator=(BlockingWaitTask&& other) noexcept = delete;
~BlockingWaitTask() {
if (coro_) {
coro_.destroy();
}
}
decltype(auto) getAsTry() && {
return coro_.promise().getAsTry();
}
decltype(auto) get() && {
return coro_.promise().get();
}
private:
handle_t coro_;
};
template <typename T>
inline BlockingWaitTask<T>
BlockingWaitPromise<T>::get_return_object() noexcept {
return BlockingWaitTask<T>{
std::experimental::coroutine_handle<BlockingWaitPromise<T>>::from_promise(
*this)};
}
template <typename T>
inline BlockingWaitTask<T&>
BlockingWaitPromise<T&>::get_return_object() noexcept {
return BlockingWaitTask<T&>{std::experimental::coroutine_handle<
BlockingWaitPromise<T&>>::from_promise(*this)};
}
inline BlockingWaitTask<void>
BlockingWaitPromise<void>::get_return_object() noexcept {
return BlockingWaitTask<void>{std::experimental::coroutine_handle<
BlockingWaitPromise<void>>::from_promise(*this)};
}
template <typename T>
struct decay_rvalue_reference {
using type = T;
};
template <typename T>
struct decay_rvalue_reference<T&&> : std::decay<T> {};
template <typename T>
using decay_rvalue_reference_t = typename decay_rvalue_reference<T>::type;
template <
typename Awaitable,
typename Result = await_result_t<Awaitable>,
std::enable_if_t<!std::is_lvalue_reference<Result>::value, int> = 0>
auto makeBlockingWaitTask(Awaitable&& awaitable)
-> BlockingWaitTask<decay_rvalue_reference_t<Result>> {
co_return co_await static_cast<Awaitable&&>(awaitable);
}
template <
typename Awaitable,
typename Result = await_result_t<Awaitable>,
std::enable_if_t<std::is_lvalue_reference<Result>::value, int> = 0>
auto makeBlockingWaitTask(Awaitable&& awaitable)
-> BlockingWaitTask<decay_rvalue_reference_t<Result>> {
co_yield co_await static_cast<Awaitable&&>(awaitable);
}
template <
typename Awaitable,
typename Result = await_result_t<Awaitable>,
std::enable_if_t<std::is_void<Result>::value, int> = 0>
BlockingWaitTask<void> makeRefBlockingWaitTask(Awaitable&& awaitable) {
co_await static_cast<Awaitable&&>(awaitable);
}
template <
typename Awaitable,
typename Result = await_result_t<Awaitable>,
std::enable_if_t<!std::is_void<Result>::value, int> = 0>
auto makeRefBlockingWaitTask(Awaitable&& awaitable)
-> BlockingWaitTask<std::add_lvalue_reference_t<Result>> {
co_yield co_await static_cast<Awaitable&&>(awaitable);
}
} // namespace detail
/// blockingWait(Awaitable&&) -> await_result_t<Awaitable>
///
/// This function co_awaits the passed awaitable object and blocks the current
/// thread until the operation completes.
///
/// This is useful for launching an asynchronous operation from the top-level
/// main() function or from unit-tests.
///
/// WARNING:
/// Avoid using this function within any code that might run on the thread
/// of an executor as this can potentially lead to deadlock if the operation
/// you are waiting on needs to do some work on that executor in order to
/// complete.
template <typename Awaitable>
auto blockingWait(Awaitable&& awaitable)
-> detail::decay_rvalue_reference_t<await_result_t<Awaitable>> {
return static_cast<std::add_rvalue_reference_t<await_result_t<Awaitable>>>(
detail::makeRefBlockingWaitTask(static_cast<Awaitable&&>(awaitable))
.get());
}
} // namespace coro
} // namespace folly
......@@ -37,16 +37,6 @@ class Future {
other.promise_ = nullptr;
}
Wait wait() {
(void)co_await *this;
co_return;
}
typename std::add_lvalue_reference<T>::type get() {
DCHECK(promise_->state_ == Promise<T>::State::HAS_RESULT);
return *promise_->result_;
}
bool await_ready() {
return promise_->state_.load(std::memory_order_acquire) ==
Promise<T>::State::HAS_RESULT;
......@@ -75,7 +65,8 @@ class Future {
}
typename std::add_lvalue_reference<T>::type await_resume() {
return get();
DCHECK(promise_->state_ == Promise<T>::State::HAS_RESULT);
return *promise_->result_;
}
auto toFuture() &&;
......
......@@ -17,6 +17,7 @@
#include <experimental/coroutine>
#include <future>
#include <type_traits>
#include <folly/Optional.h>
#include <folly/experimental/coro/Wait.h>
......@@ -28,24 +29,35 @@ namespace coro {
template <typename T>
class AwaitableReady {
public:
explicit AwaitableReady(T value) : value_(std::move(value)) {}
explicit AwaitableReady(T value) noexcept(
std::is_nothrow_move_constructible<T>::value)
: value_(static_cast<T&&>(value)) {}
bool await_ready() {
bool await_ready() noexcept {
return true;
}
bool await_suspend(std::experimental::coroutine_handle<>) {
return false;
}
void await_suspend(std::experimental::coroutine_handle<>) noexcept {}
T await_resume() {
return std::move(value_);
T await_resume() noexcept(std::is_nothrow_move_constructible<T>::value) {
return static_cast<T&&>(value_);
}
private:
T value_;
};
template <>
class AwaitableReady<void> {
public:
AwaitableReady() noexcept = default;
bool await_ready() noexcept {
return true;
}
void await_suspend(std::experimental::coroutine_handle<>) noexcept {}
void await_resume() noexcept {}
};
struct getCurrentExecutor {};
struct yield {
......
/*
* Copyright 2017-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/Benchmark.h>
#include <folly/Portability.h>
#if FOLLY_HAS_COROUTINES
#include <folly/experimental/coro/BlockingWait.h>
#include <folly/experimental/coro/Utils.h>
#include <string>
BENCHMARK(blockingWaitRVOInt, iters) {
for (size_t iter = 0; iter < iters; ++iter) {
auto result =
folly::coro::blockingWait(folly::coro::AwaitableReady<int>(42));
if (result != 42) {
std::abort();
}
}
}
constexpr folly::StringPiece longString =
"hello coroutines! this is a longer string that "
"should hopefully inhibit short string optimisations.";
BENCHMARK(blockingWaitRVOStrings, iters) {
for (size_t iter = 0; iter < iters; ++iter) {
auto result = folly::coro::blockingWait(
folly::coro::AwaitableReady<std::string>(longString.str()));
if (result.size() != longString.size()) {
std::abort();
}
}
}
struct IdentityMatrix {};
struct Matrix {
/* implicit */ Matrix(IdentityMatrix) noexcept {
for (int i = 0; i < 4; ++i) {
for (int j = 0; j < 4; ++j) {
values_[i][j] = (i == j) ? 1 : 0;
}
}
}
Matrix(const Matrix&) noexcept = default;
Matrix& operator=(const Matrix&) noexcept = default;
std::uint64_t values_[4][4];
};
BENCHMARK(blockingWaitRVO, iters) {
folly::coro::AwaitableReady<Matrix> identityAwaitable{IdentityMatrix{}};
for (size_t iter = 0; iter < iters; ++iter) {
auto result = folly::coro::blockingWait(identityAwaitable);
if (result.values_[3][3] != 1) {
std::abort();
}
}
}
#endif
int main(int argc, char** argv) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
folly::runBenchmarks();
return 0;
}
/*
* Copyright 2017-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/Portability.h>
#if FOLLY_HAS_COROUTINES
#include <folly/Optional.h>
#include <folly/ScopeGuard.h>
#include <folly/experimental/coro/Baton.h>
#include <folly/experimental/coro/BlockingWait.h>
#include <folly/experimental/coro/Utils.h>
#include <folly/portability/GTest.h>
#include <memory>
#include <type_traits>
static_assert(
std::is_same<
decltype(folly::coro::blockingWait(
std::declval<folly::coro::AwaitableReady<void>>())),
void>::value,
"");
static_assert(
std::is_same<
decltype(folly::coro::blockingWait(
std::declval<folly::coro::AwaitableReady<int>>())),
int>::value,
"");
static_assert(
std::is_same<
decltype(folly::coro::blockingWait(
std::declval<folly::coro::AwaitableReady<int&>>())),
int&>::value,
"");
static_assert(
std::is_same<
decltype(folly::coro::blockingWait(
std::declval<folly::coro::AwaitableReady<int&&>>())),
int>::value,
"blockingWait() should convert rvalue-reference-returning awaitables "
"into a returned prvalue to avoid potential lifetime issues since "
"its possible the rvalue reference could have been to some temporary "
"object stored inside the Awaiter which would have been destructed "
"by the time blockingWait returns.");
TEST(BlockingWait, SynchronousCompletionVoidResult) {
folly::coro::blockingWait(folly::coro::AwaitableReady<void>{});
}
TEST(BlockingWait, SynchronousCompletionPRValueResult) {
EXPECT_EQ(
123, folly::coro::blockingWait(folly::coro::AwaitableReady<int>{123}));
EXPECT_EQ(
"hello",
folly::coro::blockingWait(
folly::coro::AwaitableReady<std::string>("hello")));
}
TEST(BlockingWait, SynchronousCompletionLValueResult) {
int value = 123;
int& result =
folly::coro::blockingWait(folly::coro::AwaitableReady<int&>{value});
EXPECT_EQ(&value, &result);
EXPECT_EQ(123, result);
}
TEST(BlockingWait, SynchronousCompletionRValueResult) {
auto p = std::make_unique<int>(123);
auto* ptr = p.get();
// Should return a prvalue which will lifetime-extend when assigned to an
// auto&& local variable.
auto&& result = folly::coro::blockingWait(
folly::coro::AwaitableReady<std::unique_ptr<int>&&>{std::move(p)});
EXPECT_EQ(ptr, result.get());
EXPECT_FALSE(p);
}
struct TrickyAwaitable {
struct Awaiter {
std::unique_ptr<int> value_;
bool await_ready() const {
return false;
}
bool await_suspend(std::experimental::coroutine_handle<>) {
value_ = std::make_unique<int>(42);
return false;
}
std::unique_ptr<int>&& await_resume() {
return std::move(value_);
}
};
Awaiter operator co_await() {
return {};
}
};
TEST(BlockingWait, ReturnRvalueReferenceFromAwaiter) {
// This awaitable stores the result in the temporary Awaiter object that
// is placed on the coroutine frame as part of the co_await expression.
// It then returns an rvalue-reference to the value inside this temporary
// Awaiter object. This test is making sure that we copy/move the result
// before destructing the Awaiter object.
auto result = folly::coro::blockingWait(TrickyAwaitable{});
CHECK(result);
CHECK_EQ(42, *result);
}
TEST(BlockingWait, AsynchronousCompletionOnAnotherThread) {
folly::coro::Baton baton;
std::thread t{[&] { baton.post(); }};
SCOPE_EXIT {
t.join();
};
folly::coro::blockingWait(baton);
}
template <typename T>
class SimplePromise {
public:
class WaitOperation {
public:
explicit WaitOperation(
folly::coro::Baton& baton,
folly::Optional<T>& value) noexcept
: awaiter_(baton), value_(value) {}
bool await_ready() {
return awaiter_.await_ready();
}
template <typename Promise>
auto await_suspend(std::experimental::coroutine_handle<Promise> h) {
return awaiter_.await_suspend(h);
}
T&& await_resume() {
awaiter_.await_resume();
return std::move(*value_);
}
private:
folly::coro::Baton::WaitOperation awaiter_;
folly::Optional<T>& value_;
};
SimplePromise() = default;
WaitOperation operator co_await() {
return WaitOperation{baton_, value_};
}
template <typename... Args>
void emplace(Args&&... args) {
value_.emplace(static_cast<Args&&>(args)...);
baton_.post();
}
private:
folly::coro::Baton baton_;
folly::Optional<T> value_;
};
TEST(BlockingWait, WaitOnSimpleAsyncPromise) {
SimplePromise<std::string> p;
std::thread t{[&] { p.emplace("hello coroutines!"); }};
SCOPE_EXIT {
t.join();
};
auto result = folly::coro::blockingWait(p);
EXPECT_EQ("hello coroutines!", result);
}
struct MoveCounting {
int count_;
MoveCounting() noexcept : count_(0) {}
MoveCounting(MoveCounting&& other) noexcept : count_(other.count_ + 1) {}
MoveCounting& operator=(MoveCounting&& other) = delete;
};
TEST(BlockingWait, WaitOnMoveOnlyAsyncPromise) {
SimplePromise<MoveCounting> p;
std::thread t{[&] { p.emplace(); }};
SCOPE_EXIT {
t.join();
};
auto result = folly::coro::blockingWait(p);
// Number of move-constructions:
// 0. Value is in-place constructed in Optional<T>
// 0. await_resume() returns rvalue reference to Optional<T> value.
// 1. return_value() moves value into Try<T>
// 2. Value is moved from Try<T> to blockingWait() return value.
EXPECT_GE(2, result.count_);
}
TEST(BlockingWait, moveCountingAwaitableReady) {
folly::coro::AwaitableReady<MoveCounting> awaitable{MoveCounting{}};
auto result = folly::coro::blockingWait(awaitable);
// Moves:
// 1. Move value into AwaitableReady
// 2. Move value to await_resume() return-value
// 3. Move value to Try<T>
// 4. Move value to blockingWait() return-value
EXPECT_GE(4, result.count_);
}
#endif
......@@ -20,6 +20,7 @@
#include <folly/Chrono.h>
#include <folly/executors/ManualExecutor.h>
#include <folly/experimental/coro/BlockingWait.h>
#include <folly/experimental/coro/Future.h>
#include <folly/io/async/ScopedEventBaseThread.h>
#include <folly/portability/GTest.h>
......@@ -39,7 +40,7 @@ TEST(Coro, Basic) {
executor.drive();
EXPECT_TRUE(future.await_ready());
EXPECT_EQ(42, future.get());
EXPECT_EQ(42, folly::coro::blockingWait(future));
}
TEST(Coro, BasicFuture) {
......@@ -81,7 +82,7 @@ TEST(Coro, Sleep) {
EXPECT_FALSE(future.await_ready());
future.wait();
coro::blockingWait(future);
// The total time should be roughly 1 second. Some builds, especially
// optimized ones, may result in slightly less than 1 second, so we perform
......@@ -107,7 +108,7 @@ TEST(Coro, Throw) {
executor.drive();
EXPECT_TRUE(future.await_ready());
EXPECT_THROW(future.get(), std::runtime_error);
EXPECT_THROW(coro::blockingWait(future), std::runtime_error);
}
TEST(Coro, FutureThrow) {
......@@ -136,8 +137,7 @@ TEST(Coro, LargeStack) {
ScopedEventBaseThread evbThread;
auto future = via(evbThread.getEventBase(), taskRecursion(5000));
future.wait();
EXPECT_EQ(5000, future.get());
EXPECT_EQ(5000, coro::blockingWait(future));
}
coro::Task<void> taskThreadNested(std::thread::id threadId) {
......@@ -163,8 +163,7 @@ TEST(Coro, NestedThreads) {
ScopedEventBaseThread evbThread;
auto future = via(evbThread.getEventBase(), taskThread());
future.wait();
EXPECT_EQ(42, future.get());
EXPECT_EQ(42, coro::blockingWait(future));
}
coro::Task<int> taskYield(Executor* executor) {
......@@ -177,7 +176,7 @@ coro::Task<int> taskYield(Executor* executor) {
co_await coro::yield();
EXPECT_TRUE(future.await_ready());
co_return future.get();
co_return co_await future;
}
TEST(Coro, CurrentExecutor) {
......@@ -185,8 +184,7 @@ TEST(Coro, CurrentExecutor) {
auto future =
via(evbThread.getEventBase(), taskYield(evbThread.getEventBase()));
future.wait();
EXPECT_EQ(42, future.get());
EXPECT_EQ(42, coro::blockingWait(future));
}
coro::Task<void> taskTimedWait() {
......@@ -298,7 +296,7 @@ TEST(Coro, Baton) {
executor.run();
EXPECT_TRUE(future.await_ready());
EXPECT_EQ(42, future.get());
EXPECT_EQ(42, coro::blockingWait(future));
}
#endif
......@@ -22,6 +22,7 @@
#include <folly/executors/InlineExecutor.h>
#include <folly/executors/ManualExecutor.h>
#include <folly/experimental/coro/Baton.h>
#include <folly/experimental/coro/BlockingWait.h>
#include <folly/experimental/coro/Future.h>
#include <folly/experimental/coro/Mutex.h>
#include <folly/experimental/coro/Promise.h>
......@@ -149,9 +150,9 @@ TEST(Mutex, ThreadSafety) {
auto f2 = makeTask().scheduleVia(&threadPool);
auto f3 = makeTask().scheduleVia(&threadPool);
f1.wait();
f2.wait();
f3.wait();
coro::blockingWait(f1);
coro::blockingWait(f2);
coro::blockingWait(f3);
CHECK_EQ(30'000, value);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment