Commit 2e9437e4 authored by Lewis Baker's avatar Lewis Baker Committed by Facebook GitHub Bot

Hook up collectAll() algorithms to propagate async stack chains

Summary:
The collectAll() family of algorithms now hooks up async-stack frames
of child operations to ensure that they trace back up through to the
caller of collectAll() rather than stopping at the call to .start()
inside collectAll().

This also adds the helper `folly::coro::detail::co_current_async_stack_frame` which
allows the coroutine to get access to its own AsyncStackFrame
object - which is needed for algorithms like collectAll() which
then need to pass this on to the BarrierTask when it is started.

Reviewed By: andriigrynenko

Differential Revision: D24466130

fbshipit-source-id: f7058388ebc8fa212684eb0edc0a83deef6fc2a2
parent 036fea85
......@@ -18,6 +18,7 @@
#include <folly/experimental/coro/Mutex.h>
#include <folly/experimental/coro/detail/Barrier.h>
#include <folly/experimental/coro/detail/BarrierTask.h>
#include <folly/experimental/coro/detail/CurrentAsyncFrame.h>
#include <folly/experimental/coro/detail/Helpers.h>
namespace folly {
......@@ -87,6 +88,8 @@ auto collectAllTryImpl(
folly::coro::detail::Barrier barrier{sizeof...(SemiAwaitables) + 1};
auto& asyncFrame = co_await detail::co_current_async_stack_frame;
// Use std::initializer_list to ensure that the sub-tasks are launched
// in the order they appear in the parameter pack.
......@@ -96,7 +99,7 @@ auto collectAllTryImpl(
// context.
const auto context = RequestContext::saveContext();
(void)std::initializer_list<int>{
(tasks[Indices].start(&barrier),
(tasks[Indices].start(&barrier, asyncFrame),
RequestContext::setContext(context),
0)...};
......@@ -179,10 +182,12 @@ auto collectAllImpl(
// context.
const auto context = RequestContext::saveContext();
auto& asyncFrame = co_await detail::co_current_async_stack_frame;
// Use std::initializer_list to ensure that the sub-tasks are launched
// in the order they appear in the parameter pack.
(void)std::initializer_list<int>{
(tasks[Indices].start(&barrier),
(tasks[Indices].start(&barrier, asyncFrame),
RequestContext::setContext(context),
0)...};
......@@ -295,11 +300,13 @@ auto collectAllRange(InputRange awaitables)
// context.
const auto context = RequestContext::saveContext();
auto& asyncFrame = co_await detail::co_current_async_stack_frame;
// Launch the tasks and wait for them all to finish.
{
detail::Barrier barrier{tasks.size() + 1};
for (auto&& task : tasks) {
task.start(&barrier);
task.start(&barrier, asyncFrame);
RequestContext::setContext(context);
}
co_await detail::UnsafeResumeInlineSemiAwaitable{barrier.arriveAndWait()};
......@@ -382,11 +389,13 @@ auto collectAllRange(InputRange awaitables) -> folly::coro::Task<void> {
// context.
const auto context = RequestContext::saveContext();
auto& asyncFrame = co_await detail::co_current_async_stack_frame;
// Launch the tasks and wait for them all to finish.
{
detail::Barrier barrier{tasks.size() + 1};
for (auto&& task : tasks) {
task.start(&barrier);
task.start(&barrier, asyncFrame);
RequestContext::setContext(context);
}
co_await detail::UnsafeResumeInlineSemiAwaitable{barrier.arriveAndWait()};
......@@ -462,11 +471,13 @@ auto collectAllTryRange(InputRange awaitables)
// context.
const auto context = RequestContext::saveContext();
auto& asyncFrame = co_await detail::co_current_async_stack_frame;
// Launch the tasks and wait for them all to finish.
{
detail::Barrier barrier{tasks.size() + 1};
for (auto&& task : tasks) {
task.start(&barrier);
task.start(&barrier, asyncFrame);
RequestContext::setContext(context);
}
co_await detail::UnsafeResumeInlineSemiAwaitable{barrier.arriveAndWait()};
......@@ -564,6 +575,8 @@ auto collectAllWindowed(InputRange awaitables, std::size_t maxConcurrency)
// context.
const auto context = RequestContext::saveContext();
auto& asyncFrame = co_await detail::co_current_async_stack_frame;
try {
auto lock = co_await mutex.co_scoped_lock();
......@@ -578,7 +591,7 @@ auto collectAllWindowed(InputRange awaitables, std::size_t maxConcurrency)
workerTasks.push_back(makeWorker());
barrier.add(1);
workerTasks.back().start(&barrier);
workerTasks.back().start(&barrier, asyncFrame);
RequestContext::setContext(context);
......@@ -720,6 +733,8 @@ auto collectAllWindowed(InputRange awaitables, std::size_t maxConcurrency)
// context.
const auto context = RequestContext::saveContext();
auto& asyncFrame = co_await detail::co_current_async_stack_frame;
try {
auto lock = co_await mutex.co_scoped_lock();
......@@ -734,7 +749,7 @@ auto collectAllWindowed(InputRange awaitables, std::size_t maxConcurrency)
workerTasks.push_back(makeWorker());
barrier.add(1);
workerTasks.back().start(&barrier);
workerTasks.back().start(&barrier, asyncFrame);
RequestContext::setContext(context);
......@@ -873,6 +888,8 @@ auto collectAllTryWindowed(InputRange awaitables, std::size_t maxConcurrency)
// context.
const auto context = RequestContext::saveContext();
auto& asyncFrame = co_await detail::co_current_async_stack_frame;
try {
auto lock = co_await mutex.co_scoped_lock();
while (!iterationException && iter != iterEnd &&
......@@ -886,7 +903,7 @@ auto collectAllTryWindowed(InputRange awaitables, std::size_t maxConcurrency)
workerTasks.push_back(makeWorker());
barrier.add(1);
workerTasks.back().start(&barrier);
workerTasks.back().start(&barrier, asyncFrame);
RequestContext::setContext(context);
......
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* This macro enables FbSystrace usage in production for fb4a. When
* FOLLY_SCOPED_TRACE_SECTION_HEADER is defined then a trace section is started
* and later automatically terminated at the close of the scope it is called in.
* In all other cases no action is taken.
*/
#pragma once
#include <folly/Executor.h>
#include <folly/experimental/coro/WithAsyncStack.h>
#include <folly/tracing/AsyncStack.h>
#include <experimental/coroutine>
namespace folly {
namespace coro {
namespace detail {
// Helper struct for getting access to the current coroutine's AsyncStackFrame
class CurrentAsyncStackFrameAwaitable {
class Awaiter {
public:
bool await_ready() noexcept { return false; }
template <typename Promise>
bool await_suspend(
std::experimental::coroutine_handle<Promise> h) noexcept {
asyncFrame_ = &h.promise().getAsyncFrame();
return false;
}
folly::AsyncStackFrame& await_resume() noexcept { return *asyncFrame_; }
private:
folly::AsyncStackFrame* asyncFrame_ = nullptr;
};
public:
CurrentAsyncStackFrameAwaitable viaIfAsync(
const folly::Executor::KeepAlive<>&) const noexcept {
return {};
}
friend Awaiter tag_invoke(
cpo_t<co_withAsyncStack>,
CurrentAsyncStackFrameAwaitable) noexcept {
return Awaiter{};
}
};
// Await this object within a coroutine to obtain a reference to the current
// coroutine's AsyncStackFrame. This will only work within a coroutine whose
// promise_type implements the getAsyncFrame() method.
inline constexpr CurrentAsyncStackFrameAwaitable co_current_async_stack_frame{};
} // namespace detail
} // namespace coro
} // namespace folly
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment