Commit 2b4a5c9a authored by Andrii Grynenko's avatar Andrii Grynenko Committed by Facebook Github Bot

Basic coroutine library

Summary: This implements Executor-aware coroutine library.

Reviewed By: wqfish

Differential Revision: D7133189

fbshipit-source-id: 43022e0b4a44378dae670720d8144f2e042f1a54
parent 8a515308
/*
* Copyright 2017-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <experimental/coroutine>
#include <folly/ExceptionString.h>
#include <folly/Executor.h>
namespace folly {
namespace coro {
template <typename Awaitable>
class AwaitWrapper {
public:
struct promise_type {
std::experimental::suspend_always initial_suspend() {
return {};
}
std::experimental::suspend_never final_suspend() {
executor_->add(awaiter_);
awaitWrapper_->promise_ = nullptr;
return {};
}
void return_void() {}
void unhandled_exception() {
LOG(FATAL) << "Failed to schedule a task to awake a coroutine: "
<< exceptionStr(std::current_exception());
}
AwaitWrapper get_return_object() {
return {*this};
}
Executor* executor_;
std::experimental::coroutine_handle<> awaiter_;
AwaitWrapper* awaitWrapper_{nullptr};
};
static AwaitWrapper create(Awaitable* awaitable) {
return {awaitable};
}
static AwaitWrapper create(Awaitable* awaitable, Executor* executor) {
auto ret = awaitWrapper();
ret.awaitable_ = awaitable;
ret.promise_->executor_ = executor;
return ret;
}
bool await_ready() {
return awaitable_->await_ready();
}
using await_suspend_return_type =
decltype((*static_cast<Awaitable*>(nullptr))
.await_suspend(std::experimental::coroutine_handle<>()));
await_suspend_return_type await_suspend(
std::experimental::coroutine_handle<> awaiter) {
if (promise_) {
promise_->awaiter_ = std::move(awaiter);
return awaitable_->await_suspend(
std::experimental::coroutine_handle<promise_type>::from_promise(
*promise_));
}
return awaitable_->await_suspend(awaiter);
}
decltype((*static_cast<Awaitable*>(nullptr)).await_resume()) await_resume() {
return awaitable_->await_resume();
}
~AwaitWrapper() {
if (promise_) {
// This happens if await_ready() returns true or await_suspend() returns
// false.
std::experimental::coroutine_handle<promise_type>::from_promise(*promise_)
.destroy();
}
}
private:
AwaitWrapper(Awaitable* awaitable) : awaitable_(awaitable) {}
AwaitWrapper(promise_type& promise) : promise_(&promise) {
promise.awaitWrapper_ = this;
}
static AwaitWrapper awaitWrapper() {
co_return;
}
promise_type* promise_{nullptr};
Awaitable* awaitable_{nullptr};
};
} // namespace coro
} // namespace folly
/*
* Copyright 2017-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <glog/logging.h>
#include <folly/Executor.h>
#include <folly/experimental/coro/Promise.h>
#include <folly/experimental/coro/Task.h>
#include <folly/experimental/coro/Wait.h>
namespace folly {
namespace coro {
/*
* Future object attached to a running coroutine. Implement await_* APIs.
*/
template <typename T>
class Future {
public:
Future(const Future&) = delete;
Future(Future&& other) : promise_(other.promise_) {
other.promise_ = nullptr;
}
Wait wait() {
co_await *this;
co_return;
}
typename std::add_lvalue_reference<T>::type get() {
DCHECK(promise_->state_ == Promise<T>::State::HAS_RESULT);
return *promise_->result_;
}
bool await_ready() {
return promise_->state_.load(std::memory_order_acquire) ==
Promise<T>::State::HAS_RESULT;
}
bool await_suspend(std::experimental::coroutine_handle<> awaiter) {
auto state = promise_->state_.load(std::memory_order_acquire);
if (state == Promise<T>::State::HAS_RESULT) {
return false;
}
DCHECK(state == Promise<T>::State::EMPTY);
promise_->awaiter_ = std::move(awaiter);
if (promise_->state_.compare_exchange_strong(
state,
Promise<T>::State::HAS_AWAITER,
std::memory_order_release,
std::memory_order_acquire)) {
return true;
}
DCHECK(promise_->state_ == Promise<T>::State::HAS_RESULT);
return false;
}
typename std::add_lvalue_reference<T>::type await_resume() {
return get();
}
~Future() {
if (!promise_) {
return;
}
auto state = promise_->state_.load(std::memory_order_acquire);
do {
DCHECK(state != Promise<T>::State::DETACHED);
DCHECK(state != Promise<T>::State::HAS_AWAITER);
if (state == Promise<T>::State::HAS_RESULT) {
auto ch = std::experimental::coroutine_handle<Promise<T>>::from_promise(
*promise_);
DCHECK(ch.done());
ch.destroy();
return;
}
DCHECK(state == Promise<T>::State::EMPTY);
} while (!promise_->state_.compare_exchange_weak(
state,
Promise<T>::State::DETACHED,
std::memory_order::memory_order_release,
std::memory_order::memory_order_acquire));
}
private:
friend class Task<T>;
template <typename U>
friend class Promise;
Future(Promise<T>& promise) : promise_(&promise) {}
Promise<T>* promise_;
};
} // namespace coro
} // namespace folly
/*
* Copyright 2017-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <glog/logging.h>
#include <folly/Try.h>
#include <folly/experimental/coro/AwaitWrapper.h>
#include <folly/experimental/coro/Task.h>
#include <folly/experimental/coro/Utils.h>
#include <folly/futures/Future.h>
namespace folly {
namespace coro {
enum class PromiseState {
// Coroutine hasn't started
EMPTY,
// Coroutine is running, but Future object managing this coroutine was
// destroyed
DETACHED,
// Some other coroutine is waiting on this coroutine to be complete
HAS_AWAITER,
// Coroutine is finished, result is stored inside Promise
HAS_RESULT
};
template <typename T>
class Future;
template <typename T>
class PromiseBase {
public:
template <typename U>
void return_value(U&& value) {
result_ = Try<T>(std::forward<U>(value));
}
protected:
folly::Try<T> result_;
};
template <>
class PromiseBase<void> {
public:
void return_void() {}
protected:
folly::Try<void> result_;
};
template <typename T>
class Promise : public PromiseBase<T> {
public:
using State = PromiseState;
Promise() {}
~Promise() {}
Task<T> get_return_object() {
return {*this};
}
std::experimental::suspend_always initial_suspend() {
return {};
}
template <typename U>
auto await_transform(Task<U>&& task) {
return std::move(task).viaInline(executor_);
}
template <typename U>
auto await_transform(folly::SemiFuture<U>& future) {
return folly::detail::FutureAwaitable<U>(future.via(executor_));
}
template <typename U>
auto await_transform(folly::SemiFuture<U>&& future) {
return folly::detail::FutureAwaitable<U>(future.via(executor_));
}
template <typename U>
auto await_transform(folly::Future<U>& future) {
future = future.via(executor_);
return folly::detail::FutureRefAwaitable<U>(future);
}
template <typename U>
auto await_transform(folly::Future<U>&& future) {
future = future.via(executor_);
return folly::detail::FutureRefAwaitable<U>(future);
}
template <typename U>
AwaitWrapper<Future<U>> await_transform(Future<U>& future) {
if (future.promise_->executor_ == executor_) {
return AwaitWrapper<Future<U>>::create(future);
}
return AwaitWrapper<Future<U>>::create(future, executor_);
}
template <typename U>
AwaitWrapper<Future<U>> await_transform(Future<U>&& future) {
if (future.promise_->executor_ == executor_) {
return AwaitWrapper<Future<U>>::create(&future);
}
return AwaitWrapper<Future<U>>::create(&future, executor_);
}
template <typename U>
AwaitWrapper<U> await_transform(U&& awaitable) {
return AwaitWrapper<U>::create(&awaitable, executor_);
}
auto await_transform(getCurrentExecutor) {
return AwaitableReady<Executor*>(executor_);
}
class FinalSuspender;
FinalSuspender final_suspend() {
return {*this};
}
void unhandled_exception() {
this->result_ = Try<T>(std::current_exception());
}
void start() {
std::experimental::coroutine_handle<Promise>::from_promise (*this)();
}
private:
friend class Future<T>;
friend class Task<T>;
template <typename U>
friend class Promise;
std::atomic<State> state_{State::EMPTY};
std::experimental::coroutine_handle<> awaiter_;
Executor* executor_{nullptr};
};
template <typename T>
class Promise<T>::FinalSuspender {
public:
bool await_ready() {
return promise_.state_.load(std::memory_order_acquire) == State::DETACHED;
}
bool await_suspend(std::experimental::coroutine_handle<>) {
auto state = promise_.state_.load(std::memory_order_acquire);
do {
if (state == State::DETACHED) {
return false;
}
DCHECK(state != State::HAS_RESULT);
} while (!promise_.state_.compare_exchange_weak(
state,
State::HAS_RESULT,
std::memory_order_release,
std::memory_order_acquire));
if (state == State::HAS_AWAITER) {
promise_.awaiter_.resume();
}
return true;
}
void await_resume() {}
private:
friend class Promise;
FinalSuspender(Promise& promise) : promise_(promise) {}
Promise& promise_;
};
} // namespace coro
} // namespace folly
/*
* Copyright 2017-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <glog/logging.h>
#include <folly/Executor.h>
namespace folly {
namespace coro {
template <typename T>
class Promise;
template <typename T>
class Future;
/*
* Represents allocated, but not-started coroutine, which is not yet assigned to
* any executor.
*/
template <typename T>
class Task {
public:
using promise_type = Promise<T>;
Task(const Task&) = delete;
Task(Task&& other) : promise_(other.promise_) {
other.promise_ = nullptr;
}
~Task() {
DCHECK(!promise_);
}
Future<T> via(folly::Executor* executor) && {
promise_->executor_ = executor;
promise_->executor_->add([promise = promise_] { promise->start(); });
return {*std::exchange(promise_, nullptr)};
}
private:
friend promise_type;
Future<T> viaInline(folly::Executor* executor) && {
promise_->executor_ = executor;
promise_->start();
return {*std::exchange(promise_, nullptr)};
}
Task(promise_type& promise) : promise_(&promise) {}
Promise<T>* promise_;
};
} // namespace coro
} // namespace folly
/*
* Copyright 2017-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <experimental/coroutine>
#include <future>
namespace folly {
namespace coro {
template <typename T>
class AwaitableReady {
public:
explicit AwaitableReady(T value) : value_(std::move(value)) {}
bool await_ready() {
return true;
}
bool await_suspend(std::experimental::coroutine_handle<>) {
return false;
}
T await_resume() {
return std::move(value_);
}
private:
T value_;
};
struct getCurrentExecutor {};
struct yield {
bool await_ready() {
return false;
}
void await_suspend(std::experimental::coroutine_handle<> ch) {
ch();
}
void await_resume() {}
};
} // namespace coro
} // namespace folly
/*
* Copyright 2017-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <experimental/coroutine>
#include <future>
namespace folly {
namespace coro {
class Wait {
public:
class promise_type {
public:
Wait get_return_object() {
return Wait(promise_.get_future());
}
std::experimental::suspend_never initial_suspend() {
return {};
}
std::experimental::suspend_never final_suspend() {
return {};
}
void return_void() {
promise_.set_value();
}
void unhandled_exception() {
promise_.set_exception(std::current_exception());
}
private:
std::promise<void> promise_;
};
explicit Wait(std::future<void> future) : future_(std::move(future)) {}
Wait(Wait&&) = default;
~Wait() {
if (future_.valid()) {
future_.get();
}
}
private:
std::future<void> future_;
};
} // namespace coro
} // namespace folly
/*
* Copyright 2017-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/executors/ManualExecutor.h>
#include <folly/experimental/coro/Future.h>
#include <folly/io/async/ScopedEventBaseThread.h>
#include <folly/portability/GTest.h>
using namespace folly;
coro::Task<int> task42() {
co_return 42;
}
TEST(Coro, Basic) {
ManualExecutor executor;
auto future = task42().via(&executor);
EXPECT_FALSE(future.await_ready());
executor.drive();
EXPECT_TRUE(future.await_ready());
EXPECT_EQ(42, future.get());
}
coro::Task<void> taskSleep() {
co_await futures::sleep(std::chrono::seconds{1});
co_return;
}
TEST(Coro, Sleep) {
ScopedEventBaseThread evbThread;
auto startTime = std::chrono::steady_clock::now();
auto future = taskSleep().via(evbThread.getEventBase());
EXPECT_FALSE(future.await_ready());
future.wait();
EXPECT_GE(
std::chrono::steady_clock::now() - startTime, std::chrono::seconds{1});
EXPECT_TRUE(future.await_ready());
}
coro::Task<void> taskException() {
throw std::runtime_error("Test exception");
co_return;
}
TEST(Coro, Throw) {
ManualExecutor executor;
auto future = taskException().via(&executor);
EXPECT_FALSE(future.await_ready());
executor.drive();
EXPECT_TRUE(future.await_ready());
EXPECT_THROW(future.get(), std::runtime_error);
}
coro::Task<int> taskRecursion(int depth) {
if (depth > 0) {
EXPECT_EQ(depth - 1, co_await taskRecursion(depth - 1));
} else {
co_await futures::sleep(std::chrono::seconds{1});
}
co_return depth;
}
TEST(Coro, LargeStack) {
ScopedEventBaseThread evbThread;
auto future = taskRecursion(10000).via(evbThread.getEventBase());
future.wait();
EXPECT_EQ(10000, future.get());
}
coro::Task<void> taskThreadNested(std::thread::id threadId) {
EXPECT_EQ(threadId, std::this_thread::get_id());
co_await futures::sleep(std::chrono::seconds{1});
EXPECT_EQ(threadId, std::this_thread::get_id());
co_return;
}
coro::Task<int> taskThread() {
auto threadId = std::this_thread::get_id();
folly::ScopedEventBaseThread evbThread;
co_await taskThreadNested(evbThread.getThreadId())
.via(evbThread.getEventBase());
EXPECT_EQ(threadId, std::this_thread::get_id());
co_return 42;
}
TEST(Coro, NestedThreads) {
ScopedEventBaseThread evbThread;
auto future = taskThread().via(evbThread.getEventBase());
future.wait();
EXPECT_EQ(42, future.get());
}
coro::Task<int> taskYield(Executor* executor) {
auto currentExecutor = co_await coro::getCurrentExecutor();
EXPECT_EQ(executor, currentExecutor);
auto future = task42().via(currentExecutor);
EXPECT_FALSE(future.await_ready());
co_await coro::yield();
EXPECT_TRUE(future.await_ready());
co_return future.get();
}
TEST(Coro, CurrentExecutor) {
ScopedEventBaseThread evbThread;
auto future =
taskYield(evbThread.getEventBase()).via(evbThread.getEventBase());
future.wait();
EXPECT_EQ(42, future.get());
}
......@@ -825,4 +825,61 @@ class Future : private futures::detail::FutureBase<T> {
} // namespace folly
#if FOLLY_HAS_COROUTINES
#include <experimental/coroutine>
namespace folly {
namespace detail {
template <typename T>
class FutureAwaitable {
public:
explicit FutureAwaitable(folly::Future<T>&& future)
: future_(std::move(future)) {}
bool await_ready() const {
return future_.isReady();
}
T await_resume() {
return std::move(future_.value());
}
void await_suspend(std::experimental::coroutine_handle<> h) {
future_.setCallback_([h](Try<T>&&) mutable { h(); });
}
private:
folly::Future<T> future_;
};
template <typename T>
class FutureRefAwaitable {
public:
explicit FutureRefAwaitable(folly::Future<T>& future) : future_(future) {}
bool await_ready() const {
return future_.isReady();
}
T await_resume() {
return std::move(future_.value());
}
void await_suspend(std::experimental::coroutine_handle<> h) {
future_.setCallback_([h](Try<T>&&) mutable { h(); });
}
private:
folly::Future<T>& future_;
};
} // namespace detail
} // namespace folly
template <typename T>
folly::detail::FutureAwaitable<T>
/* implicit */ operator co_await(folly::Future<T>& future) {
return folly::detail::FutureRefAwaitable<T>(future);
}
#endif
#include <folly/futures/Future-inl.h>
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment