Commit 57e3562c authored by Andrii Grynenko's avatar Andrii Grynenko Committed by Facebook GitHub Bot

Rust-compatible adaptors for Task and AsyncGenerator

Summary:
This is a basic implementation of adaptors for coro::Task and coro::AsyncGenerator that are compatible with https://doc.rust-lang.org/std/future/trait.Future.html and https://docs.rs/futures/0.3.13/futures/stream/trait.Stream.html.
Note that this performs blocking cancellation which may result in long destruction time/or even deadlocks if a given Task/AsyncGenerator doesn't support inline cancellation.

Reviewed By: Imxset21, c-ryan747

Differential Revision: D26938974

fbshipit-source-id: fd043304441931c6f7aa078a1a5fee50ebfa2e90
parent 8ebb6e9e
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/CancellationToken.h>
#include <folly/Executor.h>
#include <folly/Optional.h>
#include <folly/experimental/coro/AsyncGenerator.h>
#include <folly/experimental/coro/Task.h>
#include <folly/futures/Future.h>
#include <folly/synchronization/Baton.h>
#if FOLLY_HAS_COROUTINES
namespace folly {
namespace coro {
template <typename T>
class PollFuture final : private Executor {
public:
using Poll = Optional<lift_unit_t<T>>;
using Waker = Function<void()>;
explicit PollFuture(Task<T> task) {
Executor* self = this;
std::move(task)
.scheduleOn(makeKeepAlive(self))
.start(
[&](Try<T>&& result) noexcept {
// Rust doesn't support exceptions
DCHECK(!result.hasException());
if constexpr (!std::is_same_v<T, void>) {
result_ = std::move(result).value();
} else {
result_ = unit;
}
},
cancellationSource_.getToken());
}
explicit PollFuture(SemiFuture<lift_unit_t<T>> future) {
Executor* self = this;
std::move(future)
.via(makeKeepAlive(self))
.setCallback_([&](Executor::KeepAlive<>&&, Try<T>&& result) mutable {
result_ = std::move(result).value();
});
}
~PollFuture() override {
cancellationSource_.requestCancellation();
if (keepAliveCount_.load(std::memory_order_relaxed) > 0) {
folly::Baton<> b;
while (!poll([&] { b.post(); })) {
b.wait();
b.reset();
}
}
}
Poll poll(Waker waker) {
while (true) {
std::queue<Func> funcs;
{
auto wQueueAndWaker = queueAndWaker_.wlock();
if (wQueueAndWaker->funcs.empty()) {
wQueueAndWaker->waker = std::move(waker);
break;
}
std::swap(funcs, wQueueAndWaker->funcs);
}
while (!funcs.empty()) {
funcs.front()();
funcs.pop();
}
}
if (keepAliveCount_.load(std::memory_order_relaxed) == 0) {
return std::move(result_);
}
return none;
}
private:
void add(Func func) override {
auto waker = [&] {
auto wQueueAndWaker = queueAndWaker_.wlock();
wQueueAndWaker->funcs.push(std::move(func));
return std::exchange(wQueueAndWaker->waker, {});
}();
if (waker) {
waker();
}
}
bool keepAliveAcquire() noexcept override {
auto keepAliveCount =
keepAliveCount_.fetch_add(1, std::memory_order_relaxed);
DCHECK(keepAliveCount > 0);
return true;
}
void keepAliveRelease() noexcept override {
auto keepAliveCount = keepAliveCount_.load(std::memory_order_relaxed);
do {
DCHECK(keepAliveCount > 0);
if (keepAliveCount == 1) {
add([this] {
// the final count *must* be released from this executor so that we
// don't race with poll.
keepAliveCount_.fetch_sub(1, std::memory_order_relaxed);
});
return;
}
} while (!keepAliveCount_.compare_exchange_weak(
keepAliveCount,
keepAliveCount - 1,
std::memory_order_release,
std::memory_order_relaxed));
}
struct QueueAndWaker {
std::queue<Func> funcs;
Waker waker;
};
Synchronized<QueueAndWaker> queueAndWaker_;
std::atomic<ssize_t> keepAliveCount_{1};
Optional<lift_unit_t<T>> result_;
CancellationSource cancellationSource_;
};
template <typename T>
class PollStream {
public:
using Poll = Optional<Optional<T>>;
using Waker = Function<void()>;
explicit PollStream(AsyncGenerator<T> asyncGenerator)
: asyncGenerator_(std::move(asyncGenerator)) {}
Poll poll(Waker waker) {
if (!nextFuture_) {
nextFuture_.emplace(getNext());
}
auto nextPoll = nextFuture_->poll(std::move(waker));
if (!nextPoll) {
return none;
}
nextFuture_.reset();
return nextPoll;
}
private:
Task<Optional<T>> getNext() {
auto next = co_await asyncGenerator_.next();
if (next) {
co_return std::move(next).value();
}
co_return none;
}
AsyncGenerator<T> asyncGenerator_;
Optional<PollFuture<Optional<T>>> nextFuture_;
};
} // namespace coro
} // namespace folly
#endif // FOLLY_HAS_COROUTINES
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/Portability.h>
#include <folly/CancellationToken.h>
#include <folly/experimental/coro/Baton.h>
#include <folly/experimental/coro/RustAdaptors.h>
#include <folly/experimental/coro/Sleep.h>
#include <folly/portability/GTest.h>
#include <folly/synchronization/Baton.h>
#include <chrono>
#if FOLLY_HAS_COROUTINES
template <typename T>
T getPollFuture(folly::coro::PollFuture<T> future) {
while (true) {
folly::Baton<> b;
auto poll = future.poll([&] { b.post(); });
if (poll) {
return std::move(poll).value();
}
b.wait();
}
}
template <typename T>
folly::Optional<T> getNextPollStream(folly::coro::PollStream<T>& stream) {
while (true) {
folly::Baton<> b;
auto poll = stream.poll([&] { b.post(); });
if (poll) {
return std::move(poll).value();
}
b.wait();
}
}
folly::coro::Task<int> task42() {
co_await folly::coro::sleep(std::chrono::milliseconds{10});
co_await folly::coro::sleep(std::chrono::milliseconds{10});
co_return 42;
}
TEST(RustAdaptorsTest, PollFuture) {
EXPECT_EQ(42, getPollFuture(folly::coro::PollFuture<int>(task42())));
}
TEST(RustAdaptorsTest, PollFutureSemiFuture) {
EXPECT_EQ(42, getPollFuture(folly::coro::PollFuture<int>(task42().semi())));
}
folly::coro::AsyncGenerator<int> stream123() {
co_await folly::coro::sleep(std::chrono::milliseconds{10});
co_await folly::coro::sleep(std::chrono::milliseconds{10});
co_yield 1;
co_await folly::coro::sleep(std::chrono::milliseconds{10});
co_yield 2;
co_await folly::coro::sleep(std::chrono::milliseconds{10});
co_await folly::coro::sleep(std::chrono::milliseconds{10});
co_yield 3;
}
TEST(RustAdaptorsTest, PollStream) {
auto stream = folly::coro::PollStream<int>(stream123());
EXPECT_EQ(1, getNextPollStream(stream).value());
EXPECT_EQ(2, getNextPollStream(stream).value());
EXPECT_EQ(3, getNextPollStream(stream).value());
EXPECT_FALSE(getNextPollStream(stream).hasValue());
}
folly::coro::Task<void> cancellationTask(bool& done) {
folly::coro::Baton b;
folly::CancellationCallback cb(
co_await folly::coro::co_current_cancellation_token, [&] { b.post(); });
co_await b;
done = true;
}
TEST(RustAdaptorsTest, PollFutureCancellation) {
bool done{false};
{
auto future = folly::coro::PollFuture<void>(cancellationTask(done));
EXPECT_EQ(folly::none, future.poll([] {}));
EXPECT_FALSE(done);
}
EXPECT_TRUE(done);
}
folly::coro::AsyncGenerator<int> cancellationStream(bool& done) {
co_yield 1;
co_yield 2;
co_await cancellationTask(done);
}
TEST(RustAdaptorsTest, PollStreamCancellation) {
bool done{false};
{
auto stream = folly::coro::PollStream<int>(cancellationStream(done));
EXPECT_EQ(1, getNextPollStream(stream).value());
EXPECT_EQ(2, getNextPollStream(stream).value());
EXPECT_EQ(folly::none, stream.poll([] {}));
EXPECT_FALSE(done);
}
EXPECT_TRUE(done);
}
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment