Commit 05e0144b authored by Andrew Smith's avatar Andrew Smith Committed by Facebook GitHub Bot

Avoid having to catpure initialization arguments when using resumableTransform

Summary:
Currently, resumableTransform takes a paremeter-less initializeTransform method for initialization, and re-initializes the transform when the transformValue function throws an OnClosedException. This requires users to capture any initialization parameters in the initializeTransform lambda, so they can re-initialize with the same parameters when needed.

However, it is often the case that the initialization arguments are stored elsewhere in the app, and passed along as part of each message coming through input receiver to the transform. (This is the case in all uses of resumableTransform so far.) If we could take advantage of already having the initialization arguments available, we would not need to waste memory capturing and storing them in the initializeTransform lambda.

This diff does exactly that. Instead of triggering a reinitialization by throwing an OnCloseException, reinitialization is triggered by throwing a ReinitializeTransformException<InitializationArg>. This initialization argument is then passed to initializeTransform. By doing this, we no longer need to store initialization arguments in resumable transforms.

(As a side benefit, this also makes the code slightly clearer to the reader. Rather than the reader having to know that throwing an OnClosedException triggers a reinitialization, throwing a ReinitializeTransform exception makes it clear.)

Reviewed By: aary

Differential Revision: D33037191

fbshipit-source-id: 891394d79d3fc43a8e253031472256bb852c7716
parent ef8b5db6
......@@ -316,6 +316,7 @@ class TransformProcessor : public TransformProcessorBase<
* the resumableTransform function.
*/
template <
typename InitializeArg,
typename InputValueType,
typename OutputValueType,
typename TransformerType>
......@@ -328,15 +329,16 @@ class ResumableTransformProcessor : public TransformProcessorBase<
TransformProcessorBase<InputValueType, OutputValueType, TransformerType>;
using Base::Base;
void initialize() {
this->transformer_.getExecutor()->add([=]() {
runOperationWithSenderCancellation(
this->transformer_.getExecutor(),
this->sender_,
false /* currentlyWaiting */,
this /* channelCallbackToRestore */,
initializeImpl());
});
void initialize(InitializeArg initializeArg) {
this->transformer_.getExecutor()->add(
[=, initializeArg = std::move(initializeArg)]() mutable {
runOperationWithSenderCancellation(
this->transformer_.getExecutor(),
this->sender_,
false /* currentlyWaiting */,
this /* channelCallbackToRestore */,
initializeImpl(std::move(initializeArg)));
});
}
private:
......@@ -346,10 +348,10 @@ class ResumableTransformProcessor : public TransformProcessorBase<
* resumableTransform is created, and again whenever the previous input
* receiver closed without an exception.
*/
folly::coro::Task<void> initializeImpl() {
folly::coro::Task<void> initializeImpl(InitializeArg initializeArg) {
auto cancelToken = co_await folly::coro::co_current_cancellation_token;
auto initializeResult = co_await folly::coro::co_awaitTry(
this->transformer_.initializeTransform());
this->transformer_.initializeTransform(std::move(initializeArg)));
if (initializeResult.hasException()) {
auto closeResult =
initializeResult.template hasException<OnClosedException>()
......@@ -389,19 +391,24 @@ class ResumableTransformProcessor : public TransformProcessorBase<
auto cancelToken = co_await folly::coro::co_current_cancellation_token;
if (this->getSenderState() == ChannelState::Active &&
!cancelToken.isCancellationRequested()) {
if (closeResult.exception.has_value()) {
if (!closeResult.exception.has_value()) {
// We were closed without an exception. We will close the sender without
// an exception.
this->sender_->senderClose();
} else if (
noRetriesAllowed ||
!closeResult.exception
->is_compatible_with<ReinitializeException<InitializeArg>>()) {
// We were closed with an exception. We will close the sender with that
// exception.
this->sender_->senderClose(std::move(closeResult.exception.value()));
} else if (noRetriesAllowed) {
// We were closed without an exception, but no retries are allowed. This
// means that the user-provided initialization function threw an
// OnClosedException.
this->sender_->senderClose();
} else {
// We were not closed with an exception. We will re-run the user's
// initialization function and resume the resumableTransform.
co_await initializeImpl();
// We were closed with a ReinitializeException. We will re-run the
// user's initialization function and resume the resumableTransform.
auto* reinitializeEx =
closeResult.exception
->get_exception<ReinitializeException<InitializeArg>>();
co_await initializeImpl(reinitializeEx->initializeArg);
co_return;
}
}
......@@ -435,6 +442,7 @@ class DefaultTransformer {
};
template <
typename InitializeArg,
typename InputValueType,
typename OutputValueType,
typename InitializeTransformFunc,
......@@ -454,7 +462,9 @@ class DefaultResumableTransformer : public DefaultTransformer<
: Base(std::move(executor), std::move(transformValue)),
initializeTransform_(std::move(initializeTransform)) {}
auto initializeTransform() { return initializeTransform_(); }
auto initializeTransform(InitializeArg initializeArg) {
return initializeTransform_(std::move(initializeArg));
}
private:
InitializeTransformFunc initializeTransform_;
......@@ -495,6 +505,7 @@ Receiver<OutputValueType> transform(
}
template <
typename InitializeArg,
typename InitializeTransformFunc,
typename TransformValueFunc,
typename ReceiverType,
......@@ -502,32 +513,39 @@ template <
typename OutputValueType>
Receiver<OutputValueType> resumableTransform(
folly::Executor::KeepAlive<folly::SequencedExecutor> executor,
InitializeArg initializeArg,
InitializeTransformFunc initializeTransform,
TransformValueFunc transformValue) {
return resumableTransform(detail::DefaultResumableTransformer<
InputValueType,
OutputValueType,
InitializeTransformFunc,
TransformValueFunc>(
std::move(executor),
std::move(initializeTransform),
std::move(transformValue)));
return resumableTransform(
std::move(initializeArg),
detail::DefaultResumableTransformer<
InitializeArg,
InputValueType,
OutputValueType,
InitializeTransformFunc,
TransformValueFunc>(
std::move(executor),
std::move(initializeTransform),
std::move(transformValue)));
}
template <
typename InitializeArg,
typename TransformerType,
typename ReceiverType,
typename InputValueType,
typename OutputValueType>
Receiver<OutputValueType> resumableTransform(TransformerType transformer) {
Receiver<OutputValueType> resumableTransform(
InitializeArg initializeArg, TransformerType transformer) {
auto [outputReceiver, outputSender] = Channel<OutputValueType>::create();
using TProcessor = detail::ResumableTransformProcessor<
InitializeArg,
InputValueType,
OutputValueType,
TransformerType>;
auto* processor =
new TProcessor(std::move(outputSender), std::move(transformer));
processor->initialize();
processor->initialize(std::move(initializeArg));
return std::move(outputReceiver);
}
......
......@@ -101,62 +101,77 @@ Receiver<OutputValueType> transform(
/**
* This function is similar to the above transform function. However, instead of
* taking a single input receiver, it takes an initialization function that
* returns a std::pair<std::vector<OutputValueType>, Receiver<InputValueType>>.
* accepts a value of type InitializeArg, and returns a
* std::pair<std::vector<OutputValueType>, Receiver<InputValueType>>.
*
* - If the InitializeTransform function returns successfully, the vector's
* output values will be immediately sent to the output receiver. The input
* receiver is then processed as described in the transform function's
* documentation, until it is closed (without an exception). At that point,
* the InitializationTransform is re-run, and the transform begins anew.
* documentation, unless and until it throws a ReinitializeException. At
* that point, the InitializationTransform is re-run with the InitializeArg
* specified in the ReinitializeException, and the transform begins anew.
*
* - If the InitializeTransform function throws an OnClosedException, the
* output receiver is closed (with no exception).
* - If the InitializeTransform function or the TransformValue function throws
* an OnClosedException, the output receiver is closed (with no exception).
*
* - If the InitializeTransform function throws any other type of exception,
* the output receiver is closed with that exception.
*
* - If the TransformValue function throws any exception other than
* OnClosedException, the output receiver is closed with that exception.
* - If the InitializeTransform function or the TransformValue function throws
* any other type of exception, the output receiver is closed with that
* exception.
*
* @param executor: A folly::SequencedExecutor used to transform the values.
*
* @param initializeArg: The initial argument passed to the InitializeTransform
* function.
*
* @param initializeTransform: The InitializeTransform function as described
* above.
*
* @param transformValue: A function as described above.
* @param transformValue: The TransformValue function as described above.
*
* Example:
*
* struct InitializeArg {
* std::string param;
* }
*
* // Function that returns a receiver
* Receiver<int> getInputReceiver();
* Receiver<int> getInputReceiver(InitializeArg initializeArg);
*
* // Function that returns an executor
* folly::Executor::KeepAlive<folly::SequencedExecutor> getExecutor();
*
* Receiver<std::string> outputReceiver = transform(
* getExecutor(),
* []() -> folly::coro::Task<
* InitializeArg{"param"},
* [](InitializeArg initializeArg) -> folly::coro::Task<
* std::pair<std::vector<std::string>, Receiver<int>> {
* co_return std::make_pair(
* std::vector<std::string>({"Initialized"}),
* getInputReceiver());
* getInputReceiver(initializeArg));
* },
* [](folly::Try<int> try) -> folly::coro::AsyncGenerator<std::string&&> {
* co_yield folly::to<std::string>(try.value());
* try {
* co_yield folly::to<std::string>(try.value());
* } catch (const SomeApplicationException& ex) {
* throw ReinitializeException(InitializeArg{ex.getParam()});
* }
* });
*
*/
template <
typename InitializeArg,
typename InitializeTransformFunc,
typename TransformValueFunc,
typename ReceiverType = typename folly::invoke_result_t<
InitializeTransformFunc>::StorageType::second_type,
InitializeTransformFunc,
InitializeArg>::StorageType::second_type,
typename InputValueType = typename ReceiverType::ValueType,
typename OutputValueType = typename folly::invoke_result_t<
TransformValueFunc,
folly::Try<InputValueType>>::value_type>
Receiver<OutputValueType> resumableTransform(
folly::Executor::KeepAlive<folly::SequencedExecutor> executor,
InitializeArg initializeArg,
InitializeTransformFunc initializeTransform,
TransformValueFunc transformValue);
......@@ -167,21 +182,23 @@ Receiver<OutputValueType> resumableTransform(
* folly::Executor::KeepAlive<folly::SequencedExecutor> getExecutor();
*
* std::pair<std::vector<OutputValueType>, Receiver<InputValueType>>
* initializeTransform();
* initializeTransform(InitializeArg initializeArg);
*
* folly::coro::AsyncGenerator<OutputValueType&&> transformValue(
* folly::Try<InputValueType> inputValue);
*/
template <
typename InitializeArg,
typename TransformerType,
typename ReceiverType =
typename decltype(std::declval<TransformerType>()
.initializeTransform())::StorageType::second_type,
typename decltype(std::declval<TransformerType>().initializeTransform(
std::declval<InitializeArg>()))::StorageType::second_type,
typename InputValueType = typename ReceiverType::ValueType,
typename OutputValueType =
typename decltype(std::declval<TransformerType>().transformValue(
std::declval<folly::Try<InputValueType>>()))::value_type>
Receiver<OutputValueType> resumableTransform(TransformerType transformer);
Receiver<OutputValueType> resumableTransform(
InitializeArg initializeArg, TransformerType transformer);
/**
* An OnClosedException passed to a transform callback indicates that the input
......@@ -193,6 +210,22 @@ struct OnClosedException : public std::exception {
return "A transform has closed the channel.";
}
};
/**
* A ReinitializeException thrown by a transform callback indicates that the
* resumable transform needs to be re-initialized.
*/
template <typename InitializeArg>
struct ReinitializeException : public std::exception {
explicit ReinitializeException(InitializeArg _initializeArg)
: initializeArg(std::move(_initializeArg)) {}
const char* what() const noexcept override {
return "This resumable transform should be re-initialized.";
}
InitializeArg initializeArg;
};
} // namespace channels
} // namespace folly
......
......@@ -451,14 +451,11 @@ TEST_F(
auto [untransformedReceiver, sender] = Channel<int>::create();
auto transformedReceiver = resumableTransform(
&executor_,
[alreadyInitialized = false,
receiver = std::move(untransformedReceiver)]() mutable
toVector("abc"s, "def"s),
[receiver = std::move(untransformedReceiver)](
std::vector<std::string> initializeArg) mutable
-> folly::coro::Task<std::pair<std::vector<std::string>, Receiver<int>>> {
if (alreadyInitialized) {
throw OnClosedException();
}
alreadyInitialized = true;
co_return std::make_pair(toVector("abc"s, "def"s), std::move(receiver));
co_return std::make_pair(std::move(initializeArg), std::move(receiver));
},
[](folly::Try<int> result) -> folly::coro::AsyncGenerator<std::string&&> {
co_yield folly::to<std::string>(result.value());
......@@ -492,14 +489,11 @@ TEST_F(
auto [untransformedReceiver, sender] = Channel<int>::create();
auto transformedReceiver = resumableTransform(
&executor_,
[alreadyInitialized = false,
receiver = std::move(untransformedReceiver)]() mutable
toVector("abc"s, "def"s),
[receiver = std::move(untransformedReceiver)](
std::vector<std::string> initializeArg) mutable
-> folly::coro::Task<std::pair<std::vector<std::string>, Receiver<int>>> {
if (alreadyInitialized) {
throw OnClosedException();
}
alreadyInitialized = true;
co_return std::make_pair(toVector("abc"s, "def"s), std::move(receiver));
co_return std::make_pair(std::move(initializeArg), std::move(receiver));
},
[](folly::Try<int> result) -> folly::coro::AsyncGenerator<std::string&&> {
co_yield folly::to<std::string>(result.value());
......@@ -533,17 +527,13 @@ TEST_F(
auto [untransformedReceiver, sender] = Channel<int>::create();
auto transformedReceiver = resumableTransform(
&executor_,
[alreadyInitialized = false,
receiver = std::move(untransformedReceiver)]() mutable
toVector("abc"s, "def"s),
[receiver = std::move(untransformedReceiver)](
std::vector<std::string> initializeArg) mutable
-> folly::coro::Task<std::pair<std::vector<std::string>, Receiver<int>>> {
if (alreadyInitialized) {
throw OnClosedException();
}
alreadyInitialized = true;
co_return std::make_pair(toVector("abc"s, "def"s), std::move(receiver));
co_return std::make_pair(std::move(initializeArg), std::move(receiver));
},
[](folly::Try<int> result) -> folly::coro::AsyncGenerator<std::string&&> {
LOG(INFO) << "Got value " << result.hasException();
co_yield folly::to<std::string>(result.value());
});
......@@ -570,18 +560,23 @@ TEST_F(
auto [untransformedReceiver, sender] = Channel<int>::create();
auto transformedReceiver = resumableTransform(
&executor_,
[numTimesInitialized = 0, &receiver = untransformedReceiver]() mutable
toVector("abc1"s),
[&receiver = untransformedReceiver](
std::vector<std::string> initializeArg) mutable
-> folly::coro::Task<std::pair<std::vector<std::string>, Receiver<int>>> {
if (numTimesInitialized > 1) {
throw OnClosedException();
}
numTimesInitialized++;
co_return std::make_pair(
toVector(folly::to<std::string>("abc", numTimesInitialized)),
std::move(receiver));
co_return std::make_pair(std::move(initializeArg), std::move(receiver));
},
[](folly::Try<int> result) -> folly::coro::AsyncGenerator<std::string&&> {
co_yield folly::to<std::string>(result.value());
[numReinitializations = 0](folly::Try<int> result) mutable
-> folly::coro::AsyncGenerator<std::string&&> {
try {
co_yield folly::to<std::string>(result.value());
} catch (const OnClosedException&) {
if (numReinitializations >= 1) {
throw;
}
numReinitializations++;
throw ReinitializeException(toVector("abc2"s));
}
});
EXPECT_CALL(onNext_, onValue("abc1"));
......@@ -616,12 +611,13 @@ TEST_F(
auto [untransformedReceiver, sender] = Channel<int>::create();
auto transformedReceiver = resumableTransform(
&executor_,
[alreadyInitialized = false,
receiver = std::move(untransformedReceiver)]() mutable
toVector("abc"s),
[alreadyInitialized = false, receiver = std::move(untransformedReceiver)](
std::vector<std::string> initializeArg) mutable
-> folly::coro::Task<std::pair<std::vector<std::string>, Receiver<int>>> {
CHECK(!alreadyInitialized);
alreadyInitialized = true;
co_return std::make_pair(toVector("abc"s), std::move(receiver));
co_return std::make_pair(std::move(initializeArg), std::move(receiver));
},
[](folly::Try<int> result) -> folly::coro::AsyncGenerator<std::string&&> {
co_yield folly::to<std::string>(result.value());
......@@ -648,18 +644,18 @@ TEST_F(
auto [untransformedReceiver, sender] = Channel<int>::create();
auto transformedReceiver = resumableTransform(
&executor_,
[numTimesInitialized = 0, &receiver = untransformedReceiver]() mutable
toVector("abc1"s),
[&receiver = untransformedReceiver](
std::vector<std::string> initializeArg) mutable
-> folly::coro::Task<std::pair<std::vector<std::string>, Receiver<int>>> {
numTimesInitialized++;
co_return std::make_pair(
toVector(folly::to<std::string>("abc", numTimesInitialized)),
std::move(receiver));
co_return std::make_pair(std::move(initializeArg), std::move(receiver));
},
[](folly::Try<int> result) -> folly::coro::AsyncGenerator<std::string&&> {
if (result.hasValue()) {
co_yield folly::to<std::string>(result.value());
} else {
EXPECT_THROW(result.throwUnlessValue(), std::runtime_error);
throw ReinitializeException(toVector("abc2"s));
}
});
......@@ -694,12 +690,13 @@ TEST_F(ResumableTransformFixture, TransformThrows_NoReinitialization_Rethrows) {
bool transformThrows = false;
auto transformedReceiver = resumableTransform(
&executor_,
[alreadyInitialized = false, &receiver = untransformedReceiver]() mutable
toVector("abc"s),
[alreadyInitialized = false, &receiver = untransformedReceiver](
std::vector<std::string> initializeArg) mutable
-> folly::coro::Task<std::pair<std::vector<std::string>, Receiver<int>>> {
CHECK(!alreadyInitialized);
alreadyInitialized = true;
co_return std::make_pair(
toVector(folly::to<std::string>("abc")), std::move(receiver));
co_return std::make_pair(std::move(initializeArg), std::move(receiver));
},
[&](folly::Try<int> result)
-> folly::coro::AsyncGenerator<std::string&&> {
......@@ -769,20 +766,27 @@ TEST_F(ResumableTransformFixtureStress, Close) {
bool close = false;
consumer_->startConsuming(resumableTransform(
folly::SerialExecutor::create(&transformExecutor),
[&]() -> folly::coro::Task<
std::pair<std::vector<std::string>, Receiver<int>>> {
if (close) {
throw OnClosedException();
}
toVector("start"s),
[&](std::vector<std::string> initializeArg)
-> folly::coro::Task<
std::pair<std::vector<std::string>, Receiver<int>>> {
auto [receiver, sender] = Channel<int>::create();
auto newProducer = makeProducer();
newProducer->startProducing(
std::move(sender), std::nullopt /* closeEx */);
setProducer(std::move(newProducer));
co_return std::make_pair(toVector("start"s), std::move(receiver));
co_return std::make_pair(std::move(initializeArg), std::move(receiver));
},
[](folly::Try<int> result) -> folly::coro::AsyncGenerator<std::string> {
co_yield folly::to<std::string>(std::move(result.value()));
[&](folly::Try<int> result) -> folly::coro::AsyncGenerator<std::string> {
try {
co_yield folly::to<std::string>(std::move(result.value()));
} catch (const OnClosedException&) {
if (close) {
throw;
} else {
throw ReinitializeException(toVector("start"s));
}
}
}));
waitForProducer();
......@@ -808,7 +812,8 @@ TEST_F(ResumableTransformFixtureStress, CancelDuringReinitialization) {
folly::makeGuard([&]() { resumableTransformDestroyed.setValue(); });
consumer_->startConsuming(resumableTransform(
folly::SerialExecutor::create(&transformExecutor),
[&, g = std::move(guard)]()
toVector("start"s),
[&, g = std::move(guard)](std::vector<std::string> initializeArg)
-> folly::coro::Task<
std::pair<std::vector<std::string>, Receiver<int>>> {
initializationStarted.setValue(folly::unit);
......@@ -820,10 +825,14 @@ TEST_F(ResumableTransformFixtureStress, CancelDuringReinitialization) {
newProducer->startProducing(
std::move(sender), std::nullopt /* closeEx */);
setProducer(std::move(newProducer));
co_return std::make_pair(toVector("start"s), std::move(receiver));
co_return std::make_pair(std::move(initializeArg), std::move(receiver));
},
[](folly::Try<int> result) -> folly::coro::AsyncGenerator<std::string> {
co_yield folly::to<std::string>(std::move(result.value()));
try {
co_yield folly::to<std::string>(std::move(result.value()));
} catch (const OnClosedException&) {
throw ReinitializeException(toVector("start"s));
}
}));
initializationStarted.getSemiFuture().get();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment