Commit 4011d18c authored by Shai Szulanski's avatar Shai Szulanski Committed by Facebook GitHub Bot

Use SmallUnboundedQueue by default in AsyncPipe

Summary: The empty size of UnboundedQueue is hundreds of bytes, which is a problem for many users of AsyncPipe. Switch the default to a smaller queue, adding a template parameter so high-throughput users can switch back.

Differential Revision: D29375068

fbshipit-source-id: 9fb2dad81697eeb70d58929e6b9c1c64102aa4a8
parent 772aab73
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
#include <folly/experimental/coro/AsyncGenerator.h> #include <folly/experimental/coro/AsyncGenerator.h>
#include <folly/experimental/coro/Coroutine.h> #include <folly/experimental/coro/Coroutine.h>
#include <folly/experimental/coro/Invoke.h> #include <folly/experimental/coro/Invoke.h>
#include <folly/experimental/coro/UnboundedQueue.h> #include <folly/experimental/coro/SmallUnboundedQueue.h>
#include <folly/experimental/coro/ViaIfAsync.h> #include <folly/experimental/coro/ViaIfAsync.h>
#include <folly/fibers/Semaphore.h> #include <folly/fibers/Semaphore.h>
...@@ -56,7 +56,10 @@ namespace coro { ...@@ -56,7 +56,10 @@ namespace coro {
// close() method) becomes thread-safe. close() must be sequenced after all // close() method) becomes thread-safe. close() must be sequenced after all
// write()s in this mode. // write()s in this mode.
template <typename T, bool SingleProducer = true> template <
typename T,
bool SingleProducer = true,
template <typename, bool, bool> typename QueueType = SmallUnboundedQueue>
class AsyncPipe { class AsyncPipe {
public: public:
~AsyncPipe() { ~AsyncPipe() {
...@@ -151,8 +154,7 @@ class AsyncPipe { ...@@ -151,8 +154,7 @@ class AsyncPipe {
bool isClosed() const { return queue_.expired(); } bool isClosed() const { return queue_.expired(); }
private: private:
using Queue = using Queue = QueueType<folly::Try<T>, SingleProducer, true>;
folly::coro::UnboundedQueue<folly::Try<T>, SingleProducer, true>;
class OnClosedCallback { class OnClosedCallback {
public: public:
...@@ -201,10 +203,13 @@ class AsyncPipe { ...@@ -201,10 +203,13 @@ class AsyncPipe {
// //
// close() functions the same as AsyncPipe, and must be invoked before // close() functions the same as AsyncPipe, and must be invoked before
// destruction if an onClose callback is attached. // destruction if an onClose callback is attached.
template <typename T, bool SingleProducer = true> template <
typename T,
bool SingleProducer = true,
template <typename, bool, bool> typename QueueType = SmallUnboundedQueue>
class BoundedAsyncPipe { class BoundedAsyncPipe {
public: public:
using Pipe = AsyncPipe<T, SingleProducer>; using Pipe = AsyncPipe<T, SingleProducer, QueueType>;
static std::pair<AsyncGenerator<T&&>, BoundedAsyncPipe> create( static std::pair<AsyncGenerator<T&&>, BoundedAsyncPipe> create(
size_t tokens, folly::Function<void()> onClosed = nullptr) { size_t tokens, folly::Function<void()> onClosed = nullptr) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment