Commit 552c07c3 authored by Shai Szulanski's avatar Shai Szulanski Committed by Facebook Github Bot

Add coro::AsyncPipe

Summary: we've seen people using `apache::thrift::Stream` as a heavyweight way to produce this functionality. As we move to delete that we want to provide an alternative using `AsyncGenerator`.

Reviewed By: andriigrynenko

Differential Revision: D19774841

fbshipit-source-id: 5ebaa0afcb08f04d7fb899b34ac7dc0270186852
parent 5000a12b
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/Try.h>
#include <folly/experimental/coro/AsyncGenerator.h>
#include <folly/experimental/coro/Invoke.h>
#include <folly/experimental/coro/UnboundedQueue.h>
#include <memory>
#include <utility>
namespace folly {
namespace coro {
// An AsyncGenerator with a write end
//
// Usage:
// auto pipe = AsyncPipe<T>::create();
// pipe.second.write(std::move(val1));
// auto val2 = co_await pipe.first.next();
//
// write() returns false if the read end has been destroyed
// The generator is completed when the write end is destroyed or on close()
// close() can also be passed an exception, which is thrown when read
template <typename T>
class AsyncPipe {
public:
~AsyncPipe() {
std::move(*this).close();
}
AsyncPipe(AsyncPipe&& pipe) noexcept {
queue_ = std::move(pipe.queue_);
}
AsyncPipe& operator=(AsyncPipe&& pipe) {
std::move(*this).close();
queue_ = std::move(pipe.queue_);
}
static std::pair<folly::coro::AsyncGenerator<T&&>, AsyncPipe<T>> create() {
auto queue = std::make_shared<Queue>();
return {
folly::coro::co_invoke([queue]() -> folly::coro::AsyncGenerator<T&&> {
while (true) {
auto val = co_await queue->dequeue();
if (val.hasValue() || val.hasException()) {
co_yield std::move(*val);
} else {
co_return;
}
}
}),
AsyncPipe(queue)};
}
template <typename U = T>
bool write(U&& val) {
if (auto queue = queue_.lock()) {
queue->enqueue(folly::Try<T>(std::forward<U>(val)));
return true;
}
return false;
}
void close(folly::exception_wrapper ew) && {
if (auto queue = queue_.lock()) {
queue->enqueue(folly::Try<T>(std::move(ew)));
queue_.reset();
}
}
void close() && {
if (auto queue = queue_.lock()) {
queue->enqueue(folly::Try<T>());
queue_.reset();
}
}
private:
using Queue = folly::coro::UnboundedQueue<folly::Try<T>, true, true>;
explicit AsyncPipe(std::weak_ptr<Queue> queue) : queue_(std::move(queue)) {}
std::weak_ptr<Queue> queue_;
};
} // namespace coro
} // namespace folly
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/Portability.h>
#if FOLLY_HAS_COROUTINES
#include <folly/experimental/coro/AsyncGenerator.h>
#include <folly/experimental/coro/AsyncPipe.h>
#include <folly/experimental/coro/BlockingWait.h>
#include <folly/experimental/coro/Task.h>
#include <folly/portability/GTest.h>
#include <string>
TEST(AsyncPipeTest, PublishConsume) {
auto pipe = folly::coro::AsyncPipe<int>::create();
for (int i = 0; i < 5; ++i) {
EXPECT_TRUE(pipe.second.write(i));
}
folly::coro::blockingWait([&]() -> folly::coro::Task<void> {
for (int i = 0; i < 5; ++i) {
auto val = co_await pipe.first.next();
EXPECT_TRUE(val);
EXPECT_EQ(*val, i);
}
}());
}
TEST(AsyncPipeTest, PublishLRValue) {
auto pipe = folly::coro::AsyncPipe<std::string>::create();
constexpr auto val = "a string";
std::string val1 = val;
EXPECT_TRUE(pipe.second.write(val1));
EXPECT_TRUE(pipe.second.write(std::move(val1)));
folly::coro::blockingWait([&]() -> folly::coro::Task<void> {
auto gen = std::move(pipe.first);
for (int i = 0; i < 2; ++i) {
auto val2 = co_await gen.next();
EXPECT_TRUE(val2);
EXPECT_EQ(*val2, val);
}
}());
}
TEST(AsyncPipeTest, PublishConsumeClose) {
auto pipe = folly::coro::AsyncPipe<int>::create();
for (int i = 0; i < 5; ++i) {
EXPECT_TRUE(pipe.second.write(i));
}
std::move(pipe.second).close();
EXPECT_FALSE(pipe.second.write(0));
folly::coro::blockingWait([&]() -> folly::coro::Task<void> {
for (int i = 0; i < 5; ++i) {
auto val = co_await pipe.first.next();
EXPECT_TRUE(val);
EXPECT_EQ(*val, i);
}
auto val = co_await pipe.first.next();
EXPECT_FALSE(val);
}());
}
TEST(AsyncPipeTest, PublishConsumeError) {
auto pipe = folly::coro::AsyncPipe<int>::create();
for (int i = 0; i < 5; ++i) {
EXPECT_TRUE(pipe.second.write(i));
}
std::move(pipe.second).close(std::runtime_error(""));
EXPECT_FALSE(pipe.second.write(0));
folly::coro::blockingWait([&]() -> folly::coro::Task<void> {
for (int i = 0; i < 5; ++i) {
auto val = co_await pipe.first.next();
EXPECT_TRUE(val);
EXPECT_EQ(*val, i);
}
EXPECT_THROW({ co_await pipe.first.next(); }, std::runtime_error);
}());
}
TEST(AsyncPipeTest, PublishConsumeDestroy) {
folly::coro::AsyncGenerator<int&&> gen;
{
auto pipe = folly::coro::AsyncPipe<int>::create();
for (int i = 0; i < 5; ++i) {
EXPECT_TRUE(pipe.second.write(i));
}
gen = std::move(pipe.first);
}
folly::coro::blockingWait([&]() -> folly::coro::Task<void> {
for (int i = 0; i < 5; ++i) {
auto val = co_await gen.next();
EXPECT_TRUE(val);
EXPECT_EQ(*val, i);
}
auto val = co_await gen.next();
EXPECT_FALSE(val);
}());
}
TEST(AsyncPipeTest, BrokenPipe) {
auto pipe = folly::coro::AsyncPipe<int>::create();
EXPECT_TRUE(pipe.second.write(0));
{ auto gen = std::move(pipe.first); }
EXPECT_FALSE(pipe.second.write(0));
std::move(pipe.second).close();
}
TEST(AsyncPipeTest, WriteWhileBlocking) {
auto pipe = folly::coro::AsyncPipe<int>::create();
folly::ManualExecutor ex;
auto fut = folly::coro::co_invoke(
[&]() -> folly::coro::Task<
folly::coro::AsyncGenerator<int&&>::NextResult> {
co_return co_await pipe.first.next();
})
.scheduleOn(&ex)
.start();
ex.drain();
EXPECT_FALSE(fut.isReady());
EXPECT_TRUE(pipe.second.write(0));
ex.drain();
EXPECT_TRUE(fut.isReady());
EXPECT_EQ(*std::move(fut).get(), 0);
}
TEST(AsyncPipeTest, CloseWhileBlocking) {
auto pipe = folly::coro::AsyncPipe<int>::create();
folly::ManualExecutor ex;
auto fut = folly::coro::co_invoke(
[&]() -> folly::coro::Task<
folly::coro::AsyncGenerator<int&&>::NextResult> {
co_return co_await pipe.first.next();
})
.scheduleOn(&ex)
.start();
ex.drain();
EXPECT_FALSE(fut.isReady());
std::move(pipe.second).close();
ex.drain();
EXPECT_TRUE(fut.isReady());
EXPECT_FALSE(std::move(fut).get());
}
TEST(AsyncPipeTest, DestroyWhileBlocking) {
auto pipe = folly::coro::AsyncPipe<int>::create();
folly::ManualExecutor ex;
auto fut = folly::coro::co_invoke(
[&]() -> folly::coro::Task<
folly::coro::AsyncGenerator<int&&>::NextResult> {
co_return co_await pipe.first.next();
})
.scheduleOn(&ex)
.start();
ex.drain();
EXPECT_FALSE(fut.isReady());
{ auto pipe_ = std::move(pipe.second); }
ex.drain();
EXPECT_TRUE(fut.isReady());
EXPECT_FALSE(std::move(fut).get());
}
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment