Commit 0a5dcad2 authored by Mike Kolupaev, committed by Facebook GitHub Bot

Document non-structured-concurrency caveats in coro::merge(), fix one of them

Summary:
Part of the added comment about `folly::coro::merge()`:

  // Currently this doesn't fully do structured concurrency. Care is needed to
  // avoid use-after-free when the stream is terminated early, as some cancelled
  // input streams may be left running in background.
  // The rule is: if output generator's next() returned an empty value
  // (end of stream), it's guaranteed that 'sources' and all input coroutines are
  // completed and destroyed, no use-after-free danger. In all other cases
  // (next() returned an exception or the output generator was destroyed early),
  // the input generators may be left running in background (cancelled and
  // detached).

This diff adds a long comment about this and a couple other quirks (or what I would consider quirks).

It also fixes one of the quirks: the merged stream may return end-of-stream when the 'source' coroutine frame is not destroyed yet. So if the 'source' coroutine e.g. has a `SCOPE_EXIT` that references some state owned by the caller of `merge()`, it may in principle cause a use-after-free. This diff destroys the 'source' coroutine frame just before returning end-of-stream rather than just after. This way it is at least possible to use `merge()` in "structured concurrency" style, as long as you avoid exceptions and always drain the merged generator before destroying it.
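
The ordering matters more than the mechanism, so it can be illustrated without folly. The following is a minimal sketch built on `std::thread` and `std::promise`, an analogy rather than the actual `merge()` implementation. `CallerState`, `RunningGuard`, and `drainThenFinish` are hypothetical names; the promise only plays a role similar to the `allTasksCompleted` baton in the diff.

```cpp
#include <atomic>
#include <cassert>
#include <future>
#include <thread>

// Hypothetical caller-owned state that a source's cleanup code touches,
// standing in for whatever a SCOPE_EXIT in a source coroutine references.
struct CallerState {
  std::atomic<int> runningSources{0};
};

// RAII guard playing the role of SCOPE_EXIT in the source.
struct RunningGuard {
  explicit RunningGuard(CallerState& s) : state(s) { ++state.runningSources; }
  ~RunningGuard() { --state.runningSources; }
  CallerState& state;
};

// Runs one "source" on a background thread and reports end-of-stream only
// after the source's cleanup has run. Returns the number of sources still
// running at the moment end-of-stream would be reported.
int drainThenFinish(CallerState& state) {
  std::promise<void> allTasksCompleted;
  std::future<void> done = allTasksCompleted.get_future();

  std::thread worker([&state, &allTasksCompleted] {
    {
      RunningGuard guard(state);
      // ... the source would produce its values here ...
    } // guard destroyed here: cleanup touches caller-owned state
    allTasksCompleted.set_value(); // signal only after cleanup has finished
  });

  done.wait(); // wait *before* reporting end-of-stream, not after
  int stillRunning = state.runningSources.load();
  worker.join();
  return stillRunning;
}
```

With a plain thread, `join()` alone would give the same guarantee; the promise is kept only to mirror the "post when done, await before finishing" shape. A caller that sees the function return can destroy `CallerState` safely because no cleanup code is left to run.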

It would be easy to do the same for the exception case (just wait for the `makeConsumerTask()` coroutine to complete before calling `throw_exception()`). This would make `merge()` as close to structured concurrency as is possible with `folly::AsyncGenerator` (which can always be synchronously destroyed, leaving no choice but to detach child coroutines; but as long as the user doesn't do that, it should be fine). But I'm hesitant to do it because some existing call sites might rely on current behavior. I'll leave this to the team that actually owns this code. Same for the two cancellation quirks listed in the added comment.
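
The ordering described for the exception case (wait for the background work to finish, then surface the error) can be sketched in the same standard-library style. This is an illustration under assumptions, not folly code: `nextOrThrow` is a hypothetical helper, `std::exception_ptr` stands in for the error record, and the promise stands in for a completion signal.

```cpp
#include <atomic>
#include <cassert>
#include <exception>
#include <future>
#include <stdexcept>
#include <thread>

// Hypothetical helper: runs a failing "source" on a background thread and
// rethrows its error only after the source has completely finished.
void nextOrThrow(std::atomic<int>& runningSources) {
  std::exception_ptr error; // plays the role of the error record
  std::promise<void> allTasksCompleted;
  std::future<void> done = allTasksCompleted.get_future();

  std::thread worker([&] {
    ++runningSources;
    try {
      throw std::runtime_error("test exception");
    } catch (...) {
      error = std::current_exception(); // record the error first
    }
    --runningSources;              // cleanup that touches caller-owned state
    allTasksCompleted.set_value(); // then signal completion
  });

  done.wait(); // wait for the background work before surfacing the error
  worker.join();
  if (error) {
    std::rethrow_exception(error);
  }
}
```

By the time the caller's `catch` block runs, the counter is back to zero, which is the property the test in this commit checks for the real `merge()`.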

Reviewed By: iahs

Differential Revision: D33557074

fbshipit-source-id: cf7aaee8ec78e21c370d1f7faff3b4bad1327c95
parent 28ef22ea
@@ -49,6 +49,7 @@ AsyncGenerator<Reference, Value> merge(
     coro::Mutex mutex;
     coro::Baton recordPublished;
     coro::Baton recordConsumed;
+    coro::Baton allTasksCompleted;
     CallbackRecord<Reference> record;
   };
@@ -161,7 +162,9 @@ AsyncGenerator<Reference, Value> merge(
   // Start a task that consumes the stream of input streams.
   makeConsumerTask(state, std::move(sources))
       .scheduleOn(executor)
-      .start([](auto&&) {}, state->cancelSource.getToken());
+      .start(
+          [state](auto&&) { state->allTasksCompleted.post(); },
+          state->cancelSource.getToken());
   // Consume values produced by the input streams.
   while (true) {
@@ -178,12 +181,18 @@ AsyncGenerator<Reference, Value> merge(
     if (state->record.hasValue()) {
       // next value
       co_yield std::move(state->record).value();
-    } else if (state->record.hasError()) {
-      std::move(state->record).error().throw_exception();
     } else {
-      // none
-      assert(state->record.hasNone());
-      break;
+      // We're closing the output stream. In the spirit of structured
+      // concurrency, let's make sure to not leave any background tasks behind.
+      co_await state->allTasksCompleted;
+      if (state->record.hasError()) {
+        std::move(state->record).error().throw_exception();
+      } else {
+        // none
+        assert(state->record.hasNone());
+        break;
+      }
     }
   }
 }
@@ -33,13 +33,33 @@ namespace coro {
 // produced by 'sources', interleaving them in the order that the values
 // are produced.
 //
-// If any of the input streams completes with an error then the error
-// is produced from the output stream and the remainder of the input streams
-// are truncated, discarding any remaining values.
+// The resulting stream will terminate when the end of the 'sources' stream has
+// been reached and the ends of all of the input streams it produced have been
+// reached.
 //
-// The resulting stream will terminate only when the end of the 'sources'
-// stream has been reached and the ends of all of the input streams it
-// produced have been reached.
+// On exception, cancellation, or output stream destruction: cancels remaining
+// input streams and 'sources', discards any remaining values, and produces an
+// exception (if an input stream produced an exception) or end-of-stream
+// (if next() call was cancelled).
+//
+// Structured concurrency: if the output stream produced an empty value
+// (end-of-stream) or an exception, it's guaranteed that 'sources' and all input
+// generators have been destroyed.
+// If the output stream is destroyed before reaching end-of-stream or exception,
+// the remaining input generators are cancelled and detached; beware of
+// use-after-free.
+//
+// Normally cancelling output stream's next() call cancels the stream, discards
+// any remaining values, and returns an end-of-stream. But there are caveats:
+//  * If there's an item ready to be delivered, next() call returns it without
+//    checking for cancellation. So if input streams are fast, and next() is
+//    called infrequently, cancellation may go unprocessed indefinitely unless
+//    you also check for cancellation on your side (which you should probably do
+//    anyway unless you're calling next() in a tight loop).
+//  * It's possible that the cancelled next() registers the cancellation but
+//    returns a value anyway (if it was produced at just the right moment). Then
+//    a later next() call would return end-of-stream even if it was called with
+//    a different, non-cancelled cancellation token.
 template <typename Reference, typename Value>
 AsyncGenerator<Reference, Value> merge(
     folly::Executor::KeepAlive<> executor,
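The first cancellation caveat in the comment above suggests checking for cancellation on the caller's side. A minimal sketch of that idea follows, using only the standard library. `ReadyFirstStream` and `consumeUntilCancelled` are hypothetical stand-ins, and the `std::atomic<bool>` flag is an assumption in place of a real cancellation token; none of this is folly API.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <optional>

// Hypothetical stand-in for a merged stream. It mimics the caveat: a ready
// item is handed out without looking at any cancellation state.
struct ReadyFirstStream {
  std::deque<int> ready;

  std::optional<int> next() {
    if (ready.empty()) {
      return std::nullopt; // end-of-stream
    }
    int value = ready.front();
    ready.pop_front();
    return value;
  }
};

// Consumer loop that checks the caller's own cancellation flag before every
// next() call instead of relying on the stream to notice cancellation.
// Returns the number of items consumed.
std::size_t consumeUntilCancelled(
    ReadyFirstStream& stream,
    const std::atomic<bool>& cancelled,
    const std::function<void(int)>& onItem) {
  std::size_t consumed = 0;
  while (!cancelled.load()) {
    std::optional<int> item = stream.next();
    if (!item) {
      break;
    }
    ++consumed;
    onItem(*item);
  }
  return consumed;
}
```

Without the check in the loop condition, a stream that always has an item ready would keep the loop running regardless of the flag. Per the guarantees documented above, a real caller that stops early this way still has to keep in mind that destroying the merged generator before end-of-stream leaves input generators cancelled and detached.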
@@ -18,6 +18,7 @@
#include <folly/CancellationToken.h>
#include <folly/ScopeGuard.h>
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/experimental/coro/AsyncGenerator.h>
#include <folly/experimental/coro/BlockingWait.h>
#include <folly/experimental/coro/Collect.h>
@@ -237,4 +238,101 @@ TEST_F(MergeTest, CancellationTokenPropagatesToInnerFromConsumer) {
   }());
 }
+// Check that by the time merged generator's next() returns an empty value
+// (end of stream) or throws an exception all source generators are destroyed.
+TEST_F(MergeTest, SourcesAreDestroyedBeforeEof) {
+  std::atomic<int> runningSourceGenerators = 0;
+  std::atomic<int> runningListGenerators = 0;
+  auto sourceGenerator =
+      [&](bool shouldThrow) -> folly::coro::AsyncGenerator<int> {
+    ++runningSourceGenerators;
+    SCOPE_EXIT { --runningSourceGenerators; };
+    co_await folly::coro::co_reschedule_on_current_executor;
+    co_yield 42;
+    co_await folly::coro::co_reschedule_on_current_executor;
+    if (shouldThrow) {
+      throw std::runtime_error("test exception");
+    }
+  };
+  auto listGenerator = [&](bool shouldThrow)
+      -> folly::coro::AsyncGenerator<folly::coro::AsyncGenerator<int>> {
+    CHECK(runningListGenerators == 0);
+    ++runningListGenerators;
+    SCOPE_EXIT {
+      /* sleep override */
+      std::this_thread::sleep_for(std::chrono::milliseconds(10));
+      --runningListGenerators;
+    };
+    for (int i = 0;; ++i) {
+      co_await folly::coro::co_reschedule_on_current_executor;
+      co_yield sourceGenerator(shouldThrow && (i % 2 == 1));
+    }
+  };
+  folly::CPUThreadPoolExecutor exec(4);
+  // Stream interrupted by cancellation.
+  auto future =
+      folly::coro::co_invoke([&]() -> folly::coro::Task<void> {
+        auto gen =
+            folly::coro::merge(&exec, listGenerator(/* shouldThrow */ false));
+        folly::CancellationSource cancelSource;
+        auto r = co_await folly::coro::co_withCancellation(
+            cancelSource.getToken(), gen.next());
+        CHECK(r.has_value());
+        CHECK_EQ(*r, 42);
+        CHECK_GT(
+            runningSourceGenerators.load() + runningListGenerators.load(), 0);
+        cancelSource.requestCancellation();
+        // Currently the merged generator discards items produced
+        // after cancellation. But this behavior is not important, and
+        // it would probably be equally fine to return them (but stop
+        // calling source generators for more), so this test accepts
+        // either behavior.
+        while (true) {
+          r = co_await folly::coro::co_withCancellation(
+              cancelSource.getToken(), gen.next());
+          if (!r.has_value()) {
+            break;
+          }
+          CHECK_EQ(*r, 42);
+        }
+        CHECK_EQ(runningSourceGenerators.load(), 0);
+        CHECK_EQ(runningListGenerators.load(), 0);
+      })
+          .scheduleOn(&exec)
+          .start();
+  std::move(future).get();
+  // Stream interrupted by exception.
+  future =
+      folly::coro::co_invoke([&]() -> folly::coro::Task<void> {
+        auto gen =
+            folly::coro::merge(&exec, listGenerator(/* shouldThrow */ true));
+        auto r = co_await gen.next();
+        CHECK(r.has_value());
+        CHECK_EQ(*r, 42);
+        CHECK_GT(
+            runningSourceGenerators.load() + runningListGenerators.load(), 0);
+        while (true) {
+          auto r2 = co_await folly::coro::co_awaitTry(gen.next());
+          if (!r2.hasValue()) {
+            CHECK(
+                r2.exception().what().find("test exception") !=
+                std::string::npos);
+            break;
+          }
+          CHECK(r2->has_value());
+          CHECK_EQ(r2->value(), 42);
+        }
+        CHECK_EQ(runningSourceGenerators.load(), 0);
+        CHECK_EQ(runningListGenerators.load(), 0);
+      })
+          .scheduleOn(&exec)
+          .start();
+  std::move(future).get();
+}
 #endif