Commit af8e836f authored by Scott Pruett's avatar Scott Pruett Committed by Facebook GitHub Bot

Bounded version of AsyncPipe

Summary:
Some use-cases of AsyncPipe may want to backpressure writers in the
case that readers start to slow down. Without backpressure, the queue
(and pending work) can essentially grow without bound, leading to OOMs
or starvation of other resources.

This implements a bounded channel primitive by layering a semaphore
in front of an existing AsyncPipe. It inherits similar cancellation behavior,
though pending writes must be canceled in the case that the read end is destroyed.

Reviewed By: iahs

Differential Revision: D26260278

fbshipit-source-id: a936cf021023b31a6db868e27e9855480a1dc405
parent 02c80c5d
......@@ -21,6 +21,8 @@
#include <folly/experimental/coro/Coroutine.h>
#include <folly/experimental/coro/Invoke.h>
#include <folly/experimental/coro/UnboundedQueue.h>
#include <folly/experimental/coro/ViaIfAsync.h>
#include <folly/fibers/Semaphore.h>
#include <memory>
#include <utility>
......@@ -86,18 +88,19 @@ class AsyncPipe {
static std::pair<folly::coro::AsyncGenerator<T&&>, AsyncPipe> create(
folly::Function<void()> onClosed = nullptr) {
auto queue = std::make_shared<Queue>();
auto cancellationSource = std::shared_ptr<folly::CancellationSource>();
auto cancellationSource = std::optional<folly::CancellationSource>();
auto onClosedCallback = std::unique_ptr<OnClosedCallback>();
if (onClosed != nullptr) {
cancellationSource = std::make_shared<folly::CancellationSource>();
cancellationSource.emplace();
onClosedCallback = std::make_unique<OnClosedCallback>(
cancellationSource, std::move(onClosed));
cancellationSource.value(), std::move(onClosed));
}
auto guard = folly::makeGuard([cancellationSource] {
if (cancellationSource != nullptr) {
cancellationSource->requestCancellation();
}
});
auto guard =
folly::makeGuard([cancellationSource = std::move(cancellationSource)] {
if (cancellationSource) {
cancellationSource->requestCancellation();
}
});
return {
folly::coro::co_invoke(
[queue,
......@@ -152,20 +155,20 @@ class AsyncPipe {
class OnClosedCallback {
public:
OnClosedCallback(
std::shared_ptr<folly::CancellationSource> cancellationSource,
folly::CancellationSource cancellationSource,
folly::Function<void()> onClosedFunc)
: cancellationSource_(std::move(cancellationSource)),
cancellationCallback_(
cancellationSource_->getToken(), std::move(onClosedFunc)) {}
cancellationSource_.getToken(), std::move(onClosedFunc)) {}
void requestInvoke() { cancellationSource_->requestCancellation(); }
void requestInvoke() { cancellationSource_.requestCancellation(); }
bool wasInvokeRequested() {
return cancellationSource_->isCancellationRequested();
return cancellationSource_.isCancellationRequested();
}
private:
std::shared_ptr<folly::CancellationSource> cancellationSource_;
folly::CancellationSource cancellationSource_;
folly::CancellationCallback cancellationCallback_;
};
......@@ -177,6 +180,107 @@ class AsyncPipe {
std::unique_ptr<OnClosedCallback> onClosed_;
};
// Bounded variant of AsyncPipe which buffers a fixed number of writes
// before blocking new attempts to write until the buffer is drained.
//
// Usage:
// auto [generator, pipe] = BoundedAsyncPipe<T>::create(/* tokens */ 10);
// co_await pipe.write(std::move(entry));
// auto entry = co_await generator.next().value();
//
// write() is a coroutine which only blocks when
// no capacity is remaining. write() returns false if the read-end has been
// destroyed or was destroyed while blocking, only throwing OperationCanceled
// if the parent coroutine was canceled while blocking.
//
// try_write() is offered which will never block, but will return false
// and not write if no capacity is remaining or the read end is already
// destroyed.
//
// close() functions the same as AsyncPipe, and must be invoked before
// destruction if an onClose callback is attached.
template <typename T, bool SingleProducer = true>
class BoundedAsyncPipe {
public:
using Pipe = AsyncPipe<T, SingleProducer>;
static std::pair<AsyncGenerator<T&&>, BoundedAsyncPipe> create(
size_t tokens, folly::Function<void()> onClosed = nullptr) {
auto [generator, pipe] = Pipe::create(std::move(onClosed));
auto semaphore = std::make_shared<folly::fibers::Semaphore>(tokens);
folly::CancellationSource cancellationSource;
auto cancellationToken = cancellationSource.getToken();
auto guard = folly::makeGuard(
[cancellationSource = std::move(cancellationSource)]() {
cancellationSource.requestCancellation();
});
auto signalingGenerator = co_invoke(
[generator = std::move(generator),
guard = std::move(guard),
semaphore]() mutable -> folly::coro::AsyncGenerator<T&&> {
while (true) {
auto itemTry = co_await co_awaitTry(generator.next());
semaphore->signal();
co_yield co_result(std::move(itemTry));
}
});
return std::pair<AsyncGenerator<T&&>, BoundedAsyncPipe>(
std::move(signalingGenerator),
BoundedAsyncPipe(
std::move(pipe),
std::move(semaphore),
std::move(cancellationToken)));
}
template <typename U = T>
folly::coro::Task<bool> write(U&& u) {
auto parentToken = co_await co_current_cancellation_token;
auto waitResult = co_await co_awaitTry(co_withCancellation(
folly::CancellationToken::merge(
std::move(parentToken), cancellationToken_),
semaphore_->co_wait()));
if (cancellationToken_.isCancellationRequested()) {
// eagerly return false if the read-end was destroyed instead of throwing
// OperationCanceled, to have uniform behavior when the generator is
// destroyed
co_return false;
} else if (waitResult.hasException()) {
co_yield co_error(std::move(waitResult).exception());
}
co_return pipe_.write(std::forward<U>(u));
}
template <typename U = T>
bool try_write(U&& u) {
bool available = semaphore_->try_wait();
if (!available) {
return false;
}
return pipe_.write(std::forward<U>(u));
}
void close(exception_wrapper&& w) && { std::move(pipe_).close(std::move(w)); }
void close() && { std::move(pipe_).close(); }
private:
BoundedAsyncPipe(
Pipe&& pipe,
std::shared_ptr<folly::fibers::Semaphore> semaphore,
folly::CancellationToken cancellationToken)
: pipe_(std::move(pipe)),
semaphore_(std::move(semaphore)),
cancellationToken_(std::move(cancellationToken)) {}
Pipe pipe_;
std::shared_ptr<folly::fibers::Semaphore> semaphore_;
folly::CancellationToken cancellationToken_;
};
} // namespace coro
} // namespace folly
......
......@@ -327,4 +327,154 @@ TEST(AsyncPipeTest, PublisherMustCloseIfCallbackSetAndGeneratorAlive) {
"If an onClosed callback is specified and the generator still exists, "
"the publisher must explicitly close the pipe prior to destruction.");
}
TEST(BoundedAsyncPipeTest, PublishConsume) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
auto [generator, pipe] =
folly::coro::BoundedAsyncPipe<int>::create(/* tokens */ 10);
for (int i = 0; i < 15; ++i) {
EXPECT_TRUE(co_await pipe.write(i));
auto item = co_await generator.next();
EXPECT_TRUE(item.has_value());
EXPECT_EQ(item.value(), i);
}
std::move(pipe).close();
EXPECT_FALSE(co_await generator.next());
}());
}
TEST(BoundedAsyncPipeTest, PublisherBlocks) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
folly::ManualExecutor executor;
auto [generator, pipe] =
folly::coro::BoundedAsyncPipe<int>::create(/* tokens */ 10);
for (size_t i = 0; i < 10; ++i) {
co_await pipe.write(i);
}
auto writeFuture = pipe.write(20).scheduleOn(&executor).start();
executor.drain();
EXPECT_FALSE(writeFuture.isReady());
auto item = co_await generator.next();
EXPECT_TRUE(item.has_value());
executor.drain();
EXPECT_TRUE(writeFuture.isReady());
}());
}
TEST(BoundedAsyncPipeTest, BlockingPublisherCanceledOnDestroy) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
folly::ManualExecutor executor;
auto [generator, pipe] =
folly::coro::BoundedAsyncPipe<int>::create(/* tokens */ 2);
for (size_t i = 0; i < 2; ++i) {
co_await pipe.write(i);
}
std::vector<folly::SemiFuture<bool>> futures;
for (size_t i = 0; i < 5; ++i) {
auto writeFuture = pipe.write(20).scheduleOn(&executor).start();
executor.drain();
EXPECT_FALSE(writeFuture.isReady());
futures.emplace_back(std::move(writeFuture));
}
{
// destroy the read end
auto _ = std::move(generator);
}
executor.drain();
for (auto& future : futures) {
EXPECT_TRUE(future.isReady());
EXPECT_FALSE(std::move(future).get());
}
}());
}
TEST(BoundedAsyncPipeTest, PublisherFailsAfterDestroyWithRemainingTokens) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
auto [generator, pipe] =
folly::coro::BoundedAsyncPipe<int>::create(/* tokens */ 2);
{
// destroy the read end
auto _ = std::move(generator);
}
EXPECT_FALSE(co_await pipe.write(1));
EXPECT_FALSE(co_await pipe.write(1));
// No tokens left, blocking path also returns false
EXPECT_FALSE(co_await pipe.write(1));
EXPECT_FALSE(co_await pipe.write(1));
}());
}
TEST(BoundedAsyncPipeTest, BlockingPublisherCancelsWithParent) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
folly::ManualExecutor executor;
auto [generator, pipe] =
folly::coro::BoundedAsyncPipe<int>::create(/* tokens */ 2);
for (size_t i = 0; i < 2; ++i) {
co_await pipe.write(i);
}
folly::CancellationSource cs;
auto future =
folly::coro::co_withCancellation(
cs.getToken(),
folly::coro::co_invoke([&pipe = pipe]() -> folly::coro::Task<bool> {
co_return co_await pipe.write(100);
}))
.scheduleOn(&executor)
.start();
executor.drain();
EXPECT_FALSE(future.isReady());
cs.requestCancellation();
executor.drain();
EXPECT_TRUE(future.isReady());
EXPECT_TRUE(std::move(future).getTry().hasException());
}());
}
TEST(BoundedAsyncPipeTest, ClosingPublisherEndsConsumer) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
auto [generator, pipe] =
folly::coro::BoundedAsyncPipe<int>::create(/* tokens */ 2);
for (size_t i = 0; i < 2; ++i) {
co_await pipe.write(i);
}
std::move(pipe).close();
EXPECT_TRUE(co_await generator.next());
EXPECT_TRUE(co_await generator.next());
EXPECT_FALSE(co_await generator.next());
}());
}
TEST(BoundedAsyncPipeTest, ClosingPublisherWithException) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
auto [generator, pipe] =
folly::coro::BoundedAsyncPipe<int>::create(/* tokens */ 2);
for (size_t i = 0; i < 2; ++i) {
co_await pipe.write(i);
}
std::move(pipe).close(std::runtime_error("error!"));
EXPECT_TRUE(co_await generator.next());
EXPECT_TRUE(co_await generator.next());
auto itemTry = co_await folly::coro::co_awaitTry(generator.next());
EXPECT_TRUE(itemTry.hasException());
}());
}
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment