Commit 9385c05a authored by Andrew Smith's avatar Andrew Smith Committed by Facebook GitHub Bot

Add Transform overload that accepts a Transformer object

Summary:
This diff reduces the amount of memory that transforms use. Currently, transform explicitly takes an executor and a transform lambda (and resumableTransform takes an additional initialization lambda). Often, these lambdas end up taking pointers to some shared state object. In the case of resumableTransform, both lambdas need to capture a pointer to the same shared state object. Furthermore, the executor passed to transform is often present on the shared state object, making it redundant. This problem will get even worse when we add rate limiters (which also can be placed on the shared state object).

To reduce memory waste, this diff adds a new overload for transform and resumableTransform that takes in a Transformer object. This transformer object must implement getExecutor, transformValue, and (for resumableTransform) initializeValue. This allows a transformer object to contain just one 8-byte pointer to a shared state object, rather than duplicating pointers for different lambda captures and executors (and eventually rate limiters). It also slightly simplifies code in certain cases, as the shared state can be accessed as a field from any function in the class (rather than captured and passed down as a parameter to all helper functions).

We will still leave the existing overloads for cases where it is simpler for the user to not create a new object (when the user doesn't care as much about the memory savings).

Reviewed By: aary

Differential Revision: D33033365

fbshipit-source-id: e57c16e76ad3795a7f93b39565f553e76623beb6
parent b56cffe6
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include <folly/executors/SequencedExecutor.h> #include <folly/executors/SequencedExecutor.h>
#include <folly/experimental/channels/Channel.h> #include <folly/experimental/channels/Channel.h>
#include <folly/experimental/channels/detail/Utility.h> #include <folly/experimental/channels/detail/Utility.h>
#include <folly/experimental/coro/AsyncGenerator.h>
#include <folly/experimental/coro/Task.h> #include <folly/experimental/coro/Task.h>
namespace folly { namespace folly {
...@@ -54,22 +55,20 @@ namespace detail { ...@@ -54,22 +55,20 @@ namespace detail {
template < template <
typename InputValueType, typename InputValueType,
typename OutputValueType, typename OutputValueType,
typename TransformValueFunc> typename TransformerType>
class TransformProcessorBase : public IChannelCallback { class TransformProcessorBase : public IChannelCallback {
public: public:
TransformProcessorBase( TransformProcessorBase(
Sender<OutputValueType> sender, Sender<OutputValueType> sender, TransformerType transformer)
folly::Executor::KeepAlive<folly::SequencedExecutor> executor,
TransformValueFunc transformValue)
: sender_(std::move(senderGetBridge(sender))), : sender_(std::move(senderGetBridge(sender))),
executor_(std::move(executor)), transformer_(std::move(transformer)) {}
transformValue_(std::move(transformValue)) {}
template <typename ReceiverType> template <typename ReceiverType>
void startTransform(ReceiverType receiver) { void startTransform(ReceiverType receiver) {
executor_->add([=, receiver = std::move(receiver)]() mutable { transformer_.getExecutor()->add(
[=, receiver = std::move(receiver)]() mutable {
runOperationWithSenderCancellation( runOperationWithSenderCancellation(
this->executor_, transformer_.getExecutor(),
this->sender_, this->sender_,
false /* alreadyStartedWaiting */, false /* alreadyStartedWaiting */,
this /* channelCallbackToRestore */, this /* channelCallbackToRestore */,
...@@ -98,12 +97,12 @@ class TransformProcessorBase : public IChannelCallback { ...@@ -98,12 +97,12 @@ class TransformProcessorBase : public IChannelCallback {
* sender). * sender).
*/ */
void consume(ChannelBridgeBase* bridge) override { void consume(ChannelBridgeBase* bridge) override {
executor_->add([=]() { transformer_.getExecutor()->add([=]() {
if (bridge == receiver_.get()) { if (bridge == receiver_.get()) {
// We have received new values from the input receiver. // We have received new values from the input receiver.
CHECK_NE(getReceiverState(), ChannelState::CancellationProcessed); CHECK_NE(getReceiverState(), ChannelState::CancellationProcessed);
runOperationWithSenderCancellation( runOperationWithSenderCancellation(
this->executor_, transformer_.getExecutor(),
this->sender_, this->sender_,
true /* alreadyStartedWaiting */, true /* alreadyStartedWaiting */,
this /* channelCallbackToRestore */, this /* channelCallbackToRestore */,
...@@ -124,13 +123,15 @@ class TransformProcessorBase : public IChannelCallback { ...@@ -124,13 +123,15 @@ class TransformProcessorBase : public IChannelCallback {
* listening to. * listening to.
*/ */
void canceled(ChannelBridgeBase* bridge) override { void canceled(ChannelBridgeBase* bridge) override {
executor_->add([=]() { transformer_.getExecutor()->add([=]() {
if (bridge == receiver_.get()) { if (bridge == receiver_.get()) {
// We previously cancelled the input receiver (because the consumer of // We previously cancelled the input receiver (because the consumer of
// the output receiver stopped consuming). Process the cancellation for // the output receiver stopped consuming). Process the cancellation for
// the input receiver. // the input receiver.
CHECK_EQ(getReceiverState(), ChannelState::CancellationTriggered); CHECK_EQ(getReceiverState(), ChannelState::CancellationTriggered);
processReceiverCancelled(CloseResult()).scheduleOn(executor_).start(); processReceiverCancelled(CloseResult())
.scheduleOn(transformer_.getExecutor())
.start();
} else { } else {
// We previously cancelled the sender due to the closure of the input // We previously cancelled the sender due to the closure of the input
// receiver. Process the cancellation for the sender. // receiver. Process the cancellation for the sender.
...@@ -190,8 +191,9 @@ class TransformProcessorBase : public IChannelCallback { ...@@ -190,8 +191,9 @@ class TransformProcessorBase : public IChannelCallback {
if (!inputResult.hasValue() && !inputResult.hasException()) { if (!inputResult.hasValue() && !inputResult.hasException()) {
inputResult = folly::Try<InputValueType>(OnClosedException()); inputResult = folly::Try<InputValueType>(OnClosedException());
} }
auto outputGen = folly::makeTryWith( auto outputGen = folly::makeTryWith([&]() {
[&]() { return transformValue_(std::move(inputResult)); }); return transformer_.transformValue(std::move(inputResult));
});
if (!outputGen.hasValue()) { if (!outputGen.hasValue()) {
// The transform function threw an exception and was not a coroutine. // The transform function threw an exception and was not a coroutine.
// We will close the output receiver. // We will close the output receiver.
...@@ -268,8 +270,7 @@ class TransformProcessorBase : public IChannelCallback { ...@@ -268,8 +270,7 @@ class TransformProcessorBase : public IChannelCallback {
ChannelBridgePtr<InputValueType> receiver_; ChannelBridgePtr<InputValueType> receiver_;
ChannelBridgePtr<OutputValueType> sender_; ChannelBridgePtr<OutputValueType> sender_;
folly::Executor::KeepAlive<folly::SequencedExecutor> executor_; TransformerType transformer_;
TransformValueFunc transformValue_;
}; };
/** /**
...@@ -280,16 +281,14 @@ class TransformProcessorBase : public IChannelCallback { ...@@ -280,16 +281,14 @@ class TransformProcessorBase : public IChannelCallback {
template < template <
typename InputValueType, typename InputValueType,
typename OutputValueType, typename OutputValueType,
typename TransformValueFunc> typename TransformerType>
class TransformProcessor : public TransformProcessorBase< class TransformProcessor : public TransformProcessorBase<
InputValueType, InputValueType,
OutputValueType, OutputValueType,
TransformValueFunc> { TransformerType> {
public: public:
using Base = TransformProcessorBase< using Base =
InputValueType, TransformProcessorBase<InputValueType, OutputValueType, TransformerType>;
OutputValueType,
TransformValueFunc>;
using Base::Base; using Base::Base;
private: private:
...@@ -319,30 +318,20 @@ class TransformProcessor : public TransformProcessorBase< ...@@ -319,30 +318,20 @@ class TransformProcessor : public TransformProcessorBase<
template < template <
typename InputValueType, typename InputValueType,
typename OutputValueType, typename OutputValueType,
typename InitializeTransformFunc, typename TransformerType>
typename TransformValueFunc>
class ResumableTransformProcessor : public TransformProcessorBase< class ResumableTransformProcessor : public TransformProcessorBase<
InputValueType, InputValueType,
OutputValueType, OutputValueType,
TransformValueFunc> { TransformerType> {
public: public:
using Base = TransformProcessorBase< using Base =
InputValueType, TransformProcessorBase<InputValueType, OutputValueType, TransformerType>;
OutputValueType, using Base::Base;
TransformValueFunc>;
ResumableTransformProcessor(
Sender<OutputValueType> sender,
folly::Executor::KeepAlive<folly::SequencedExecutor> executor,
InitializeTransformFunc initializeTransform,
TransformValueFunc transformValue)
: Base(std::move(sender), std::move(executor), std::move(transformValue)),
initializeTransform_(std::move(initializeTransform)) {}
void initialize() { void initialize() {
this->executor_->add([=]() { this->transformer_.getExecutor()->add([=]() {
runOperationWithSenderCancellation( runOperationWithSenderCancellation(
this->executor_, this->transformer_.getExecutor(),
this->sender_, this->sender_,
false /* currentlyWaiting */, false /* currentlyWaiting */,
this /* channelCallbackToRestore */, this /* channelCallbackToRestore */,
...@@ -359,8 +348,8 @@ class ResumableTransformProcessor : public TransformProcessorBase< ...@@ -359,8 +348,8 @@ class ResumableTransformProcessor : public TransformProcessorBase<
*/ */
folly::coro::Task<void> initializeImpl() { folly::coro::Task<void> initializeImpl() {
auto cancelToken = co_await folly::coro::co_current_cancellation_token; auto cancelToken = co_await folly::coro::co_current_cancellation_token;
auto initializeResult = auto initializeResult = co_await folly::coro::co_awaitTry(
co_await folly::coro::co_awaitTry(initializeTransform_()); this->transformer_.initializeTransform());
if (initializeResult.hasException()) { if (initializeResult.hasException()) {
auto closeResult = auto closeResult =
initializeResult.template hasException<OnClosedException>() initializeResult.template hasException<OnClosedException>()
...@@ -418,7 +407,56 @@ class ResumableTransformProcessor : public TransformProcessorBase< ...@@ -418,7 +407,56 @@ class ResumableTransformProcessor : public TransformProcessorBase<
} }
this->maybeDelete(); this->maybeDelete();
} }
};
template <
typename InputValueType,
typename OutputValueType,
typename TransformValueFunc>
class DefaultTransformer {
public:
DefaultTransformer(
folly::Executor::KeepAlive<folly::SequencedExecutor> executor,
TransformValueFunc transformValue)
: executor_(std::move(executor)),
transformValue_(std::move(transformValue)) {}
folly::Executor::KeepAlive<folly::SequencedExecutor> getExecutor() {
return executor_;
}
auto transformValue(folly::Try<InputValueType> inputValue) {
return transformValue_(std::move(inputValue));
}
private:
folly::Executor::KeepAlive<folly::SequencedExecutor> executor_;
TransformValueFunc transformValue_;
};
template <
typename InputValueType,
typename OutputValueType,
typename InitializeTransformFunc,
typename TransformValueFunc>
class DefaultResumableTransformer : public DefaultTransformer<
InputValueType,
OutputValueType,
TransformValueFunc> {
public:
using Base =
DefaultTransformer<InputValueType, OutputValueType, TransformValueFunc>;
DefaultResumableTransformer(
folly::Executor::KeepAlive<folly::SequencedExecutor> executor,
InitializeTransformFunc initializeTransform,
TransformValueFunc transformValue)
: Base(std::move(executor), std::move(transformValue)),
initializeTransform_(std::move(initializeTransform)) {}
auto initializeTransform() { return initializeTransform_(); }
private:
InitializeTransformFunc initializeTransform_; InitializeTransformFunc initializeTransform_;
}; };
} // namespace detail } // namespace detail
...@@ -432,11 +470,26 @@ Receiver<OutputValueType> transform( ...@@ -432,11 +470,26 @@ Receiver<OutputValueType> transform(
ReceiverType inputReceiver, ReceiverType inputReceiver,
folly::Executor::KeepAlive<folly::SequencedExecutor> executor, folly::Executor::KeepAlive<folly::SequencedExecutor> executor,
TransformValueFunc transformValue) { TransformValueFunc transformValue) {
return transform(
std::move(inputReceiver),
detail::DefaultTransformer<
InputValueType,
OutputValueType,
TransformValueFunc>(std::move(executor), std::move(transformValue)));
}
template <
typename ReceiverType,
typename TransformerType,
typename InputValueType,
typename OutputValueType>
Receiver<OutputValueType> transform(
ReceiverType inputReceiver, TransformerType transformer) {
auto [outputReceiver, outputSender] = Channel<OutputValueType>::create(); auto [outputReceiver, outputSender] = Channel<OutputValueType>::create();
using TProcessor = detail:: using TProcessor = detail::
TransformProcessor<InputValueType, OutputValueType, TransformValueFunc>; TransformProcessor<InputValueType, OutputValueType, TransformerType>;
auto* processor = new TProcessor( auto* processor =
std::move(outputSender), std::move(executor), std::move(transformValue)); new TProcessor(std::move(outputSender), std::move(transformer));
processor->startTransform(std::move(inputReceiver)); processor->startTransform(std::move(inputReceiver));
return std::move(outputReceiver); return std::move(outputReceiver);
} }
...@@ -449,21 +502,34 @@ template < ...@@ -449,21 +502,34 @@ template <
typename OutputValueType> typename OutputValueType>
Receiver<OutputValueType> resumableTransform( Receiver<OutputValueType> resumableTransform(
folly::Executor::KeepAlive<folly::SequencedExecutor> executor, folly::Executor::KeepAlive<folly::SequencedExecutor> executor,
InitializeTransformFunc initializeTransformFunc, InitializeTransformFunc initializeTransform,
TransformValueFunc transformValue) { TransformValueFunc transformValue) {
return resumableTransform(detail::DefaultResumableTransformer<
InputValueType,
OutputValueType,
InitializeTransformFunc,
TransformValueFunc>(
std::move(executor),
std::move(initializeTransform),
std::move(transformValue)));
}
template <
typename TransformerType,
typename ReceiverType,
typename InputValueType,
typename OutputValueType>
Receiver<OutputValueType> resumableTransform(TransformerType transformer) {
auto [outputReceiver, outputSender] = Channel<OutputValueType>::create(); auto [outputReceiver, outputSender] = Channel<OutputValueType>::create();
using TProcessor = detail::ResumableTransformProcessor< using TProcessor = detail::ResumableTransformProcessor<
InputValueType, InputValueType,
OutputValueType, OutputValueType,
InitializeTransformFunc, TransformerType>;
TransformValueFunc>; auto* processor =
auto* processor = new TProcessor( new TProcessor(std::move(outputSender), std::move(transformer));
std::move(outputSender),
executor,
std::move(initializeTransformFunc),
std::move(transformValue));
processor->initialize(); processor->initialize();
return std::move(outputReceiver); return std::move(outputReceiver);
} }
} // namespace channels } // namespace channels
} // namespace folly } // namespace folly
...@@ -79,6 +79,25 @@ Receiver<OutputValueType> transform( ...@@ -79,6 +79,25 @@ Receiver<OutputValueType> transform(
folly::Executor::KeepAlive<folly::SequencedExecutor> executor, folly::Executor::KeepAlive<folly::SequencedExecutor> executor,
TransformValueFunc transformValue); TransformValueFunc transformValue);
/**
* This overload accepts arguments in the form of a transformer object. The
* transformer object must have the following functions:
*
* folly::Executor::KeepAlive<folly::SequencedExecutor> getExecutor();
*
* folly::coro::AsyncGenerator<OutputValueType&&> transformValue(
* folly::Try<InputValueType> inputValue);
*/
template <
typename ReceiverType,
typename TransformerType,
typename InputValueType = typename ReceiverType::ValueType,
typename OutputValueType =
typename decltype(std::declval<TransformerType>().transformValue(
std::declval<folly::Try<InputValueType>>()))::value_type>
Receiver<OutputValueType> transform(
ReceiverType inputReceiver, TransformerType transformer);
/** /**
* This function is similar to the above transform function. However, instead of * This function is similar to the above transform function. However, instead of
* taking a single input receiver, it takes an initialization function that * taking a single input receiver, it takes an initialization function that
...@@ -141,6 +160,29 @@ Receiver<OutputValueType> resumableTransform( ...@@ -141,6 +160,29 @@ Receiver<OutputValueType> resumableTransform(
InitializeTransformFunc initializeTransform, InitializeTransformFunc initializeTransform,
TransformValueFunc transformValue); TransformValueFunc transformValue);
/**
* This overload accepts arguments in the form of a transformer object. The
* transformer object must have the following functions:
*
* folly::Executor::KeepAlive<folly::SequencedExecutor> getExecutor();
*
* std::pair<std::vector<OutputValueType>, Receiver<InputValueType>>
* initializeTransform();
*
* folly::coro::AsyncGenerator<OutputValueType&&> transformValue(
* folly::Try<InputValueType> inputValue);
*/
template <
typename TransformerType,
typename ReceiverType =
typename decltype(std::declval<TransformerType>()
.initializeTransform())::StorageType::second_type,
typename InputValueType = typename ReceiverType::ValueType,
typename OutputValueType =
typename decltype(std::declval<TransformerType>().transformValue(
std::declval<folly::Try<InputValueType>>()))::value_type>
Receiver<OutputValueType> resumableTransform(TransformerType transformer);
/** /**
* An OnClosedException passed to a transform callback indicates that the input * An OnClosedException passed to a transform callback indicates that the input
* channel was closed. An OnClosedException can also be thrown by a transform * channel was closed. An OnClosedException can also be thrown by a transform
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment