Commit 9eff0218 authored by Alex Eckert's avatar Alex Eckert Committed by Facebook GitHub Bot

Add api to check if AsyncPipe is still connected

Summary:
I have a use case where I would like to check if the
AsyncPipe is still active without pumping something in to
write(). Could set up an onClosed callback, but this would introduce
more complexity.

This adds a function to determine if the queue inside the AsyncPipe
still exists.

Reviewed By: iahs

Differential Revision: D28015007

fbshipit-source-id: 1ba37466218abd2f42eab1d8ea5677c3945e31f5
parent ab5d1a0b
...@@ -148,6 +148,8 @@ class AsyncPipe { ...@@ -148,6 +148,8 @@ class AsyncPipe {
} }
} }
bool isClosed() const { return queue_.expired(); }
private: private:
using Queue = using Queue =
folly::coro::UnboundedQueue<folly::Try<T>, SingleProducer, true>; folly::coro::UnboundedQueue<folly::Try<T>, SingleProducer, true>;
...@@ -267,6 +269,8 @@ class BoundedAsyncPipe { ...@@ -267,6 +269,8 @@ class BoundedAsyncPipe {
void close(exception_wrapper&& w) && { std::move(pipe_).close(std::move(w)); } void close(exception_wrapper&& w) && { std::move(pipe_).close(std::move(w)); }
void close() && { std::move(pipe_).close(); } void close() && { std::move(pipe_).close(); }
bool isClosed() const { return pipe_.isClosed(); }
private: private:
BoundedAsyncPipe( BoundedAsyncPipe(
Pipe&& pipe, Pipe&& pipe,
......
...@@ -150,6 +150,14 @@ TEST(AsyncPipeTest, BrokenPipe) { ...@@ -150,6 +150,14 @@ TEST(AsyncPipeTest, BrokenPipe) {
std::move(pipe.second).close(); std::move(pipe.second).close();
} }
TEST(AsyncPipeTest, IsClosed) {
auto pipe = folly::coro::AsyncPipe<int>::create();
EXPECT_FALSE(pipe.second.isClosed());
{ auto gen = std::move(pipe.first); }
EXPECT_TRUE(pipe.second.isClosed());
std::move(pipe.second).close();
}
TEST(AsyncPipeTest, WriteWhileBlocking) { TEST(AsyncPipeTest, WriteWhileBlocking) {
auto pipe = folly::coro::AsyncPipe<int>::create(); auto pipe = folly::coro::AsyncPipe<int>::create();
folly::ManualExecutor ex; folly::ManualExecutor ex;
...@@ -373,6 +381,18 @@ TEST(BoundedAsyncPipeTest, PublisherBlocks) { ...@@ -373,6 +381,18 @@ TEST(BoundedAsyncPipeTest, PublisherBlocks) {
}()); }());
} }
TEST(BoundedAsyncPipeTest, IsClosed) {
auto [generator, pipe] =
folly::coro::BoundedAsyncPipe<int>::create(/* tokens */ 2);
EXPECT_FALSE(pipe.isClosed());
{
// destroy the read end
auto _ = std::move(generator);
}
EXPECT_TRUE(pipe.isClosed());
}
TEST(BoundedAsyncPipeTest, BlockingPublisherCanceledOnDestroy) { TEST(BoundedAsyncPipeTest, BlockingPublisherCanceledOnDestroy) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> { folly::coro::blockingWait([]() -> folly::coro::Task<void> {
folly::ManualExecutor executor; folly::ManualExecutor executor;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment