Commit ac28a012 authored by Pranav Thulasiram Bhat's avatar Pranav Thulasiram Bhat Committed by Facebook GitHub Bot

collectAllFibers and collectFibers

Summary:
This diff implements a wrapper around `fibers::collectAll` to accept annotated functors.

It also implements version of `collect` that accepts variadic arguments, and returns a tuple.

Reviewed By: A5he

Differential Revision: D22142416

fbshipit-source-id: dab320f7e1718c3872c54e168c24ea4bc39051d5
parent 98759027
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace folly {
namespace fibers {
namespace async {
namespace detail {
/**
* Wrapper around the input iterators to return the wrapped functors
*/
template <class InnerIterator>
struct await_iterator {
/**
* Wrapper around the input functor to apply `init_await` on invocation
*/
struct AwaitWrapper {
explicit AwaitWrapper(InnerIterator it) : it_(std::move(it)) {}
auto operator()() {
return init_await((*it_)());
}
private:
InnerIterator it_;
};
using iterator_category = std::input_iterator_tag;
using value_type = AwaitWrapper;
using difference_type = std::size_t;
using pointer = value_type*;
using reference = value_type&;
explicit await_iterator(InnerIterator it) : it_(it) {}
await_iterator& operator++() {
++it_;
return *this;
}
await_iterator operator++(int) {
await_iterator retval = *this;
++(*this);
return retval;
}
bool operator==(await_iterator other) const {
return it_ == other.it_;
}
bool operator!=(await_iterator other) const {
return !(*this == other);
}
value_type operator*() const {
return AwaitWrapper(it_);
}
private:
InnerIterator it_;
};
} // namespace detail
template <class InputIterator, typename FuncType, typename ResultType>
Async<std::vector<
typename std::enable_if<
!std::is_same<ResultType, Async<void>>::value,
async_inner_type_t<ResultType>>::
type>> inline collectAll(InputIterator first, InputIterator last) {
return Async(folly::fibers::collectAll(
detail::await_iterator<InputIterator>(first),
detail::await_iterator<InputIterator>(last)));
}
template <class InputIterator, typename FuncType, typename ResultType>
typename std::
enable_if<std::is_same<ResultType, Async<void>>::value, Async<void>>::
type inline collectAll(InputIterator first, InputIterator last) {
folly::fibers::collectAll(
detail::await_iterator<InputIterator>(first),
detail::await_iterator<InputIterator>(last));
return {};
}
} // namespace async
} // namespace fibers
} // namespace folly
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/Try.h>
#include <folly/fibers/FiberManager.h>
#include <folly/fibers/WhenN.h>
#include <folly/fibers/async/Async.h>
#include <folly/fibers/async/FiberManager.h>
#include <folly/fibers/async/Future.h>
#include <folly/functional/Invoke.h>
#include <algorithm>
#include <vector>
#pragma once
namespace folly {
namespace fibers {
namespace async {
/**
* Schedules several async annotated functors and blocks until all of these are
* completed. If any of the functors throws an exception, this exception will be
* re-thrown, but only when all the tasks are complete. If several throw
* exceptions one of them will be re-thrown.
*
* Returns a vector of the results of the functors.
*/
template <
class InputIterator,
typename FuncType =
typename std::iterator_traits<InputIterator>::value_type,
typename ResultType = invoke_result_t<FuncType>>
Async<std::vector<typename std::enable_if<
!std::is_same<ResultType, Async<void>>::value,
async_inner_type_t<ResultType>>::type>>
collectAll(InputIterator first, InputIterator last);
/**
* collectAll specialization for functions returning void
*/
template <
class InputIterator,
typename FuncType =
typename std::iterator_traits<InputIterator>::value_type,
typename ResultType = invoke_result_t<FuncType>>
typename std::
enable_if<std::is_same<ResultType, Async<void>>::value, Async<void>>::
type inline collectAll(InputIterator first, InputIterator last);
/**
* collectAll version that takes a container instead of iterators for
* convenience
*/
template <class Collection>
auto collectAll(Collection&& c) -> decltype(collectAll(c.begin(), c.end())) {
return collectAll(c.begin(), c.end());
}
/**
* collectAll version that takes a varying number of functors instead of a
* container or iterators
*/
template <typename... Ts>
Async<
std::tuple<folly::lift_unit_t<async_inner_type_t<invoke_result_t<Ts>>>...>>
collectAll(Ts&&... tasks) {
auto future = folly::collectAllUnsafe(detail::addFiberFuture(
std::forward<Ts>(tasks), FiberManager::getFiberManager())...);
auto tuple = await(futureWait(std::move(future)));
return Async(folly::unwrapTryTuple(std::move(tuple)));
}
} // namespace async
} // namespace fibers
} // namespace folly
#include <folly/fibers/async/Collect-inl.h>
...@@ -14,12 +14,14 @@ ...@@ -14,12 +14,14 @@
* limitations under the License. * limitations under the License.
*/ */
#include <folly/portability/GMock.h>
#include <folly/portability/GTest.h> #include <folly/portability/GTest.h>
#include <folly/fibers/FiberManager.h> #include <folly/fibers/FiberManager.h>
#include <folly/fibers/FiberManagerMap.h> #include <folly/fibers/FiberManagerMap.h>
#include <folly/fibers/async/Async.h> #include <folly/fibers/async/Async.h>
#include <folly/fibers/async/Baton.h> #include <folly/fibers/async/Baton.h>
#include <folly/fibers/async/Collect.h>
#include <folly/fibers/async/FiberManager.h> #include <folly/fibers/async/FiberManager.h>
#include <folly/fibers/async/Future.h> #include <folly/fibers/async/Future.h>
#include <folly/fibers/async/Promise.h> #include <folly/fibers/async/Promise.h>
...@@ -31,6 +33,7 @@ ...@@ -31,6 +33,7 @@
#include <folly/fibers/async/Task.h> #include <folly/fibers/async/Task.h>
#endif #endif
using namespace ::testing;
using namespace folly::fibers; using namespace folly::fibers;
namespace { namespace {
...@@ -281,3 +284,50 @@ TEST(FiberManager, asyncFiberManager) { ...@@ -281,3 +284,50 @@ TEST(FiberManager, asyncFiberManager) {
EXPECT_TRUE(innerCompleted); EXPECT_TRUE(innerCompleted);
} }
} }
TEST(AsyncTest, collect) {
auto makeVoidTask = [](bool& ref) {
return [&]() -> async::Async<void> {
ref = true;
return {};
};
};
auto makeRetTask = [](int val) {
return [=]() -> async::Async<int> { return val; };
};
async::executeOnFiberAndWait([&]() -> async::Async<void> {
{
std::array<bool, 3> cs{false, false, false};
std::vector<folly::Function<async::Async<void>()>> tasks;
tasks.emplace_back(makeVoidTask(cs[0]));
tasks.emplace_back(makeVoidTask(cs[1]));
tasks.emplace_back(makeVoidTask(cs[2]));
async::await(async::collectAll(tasks));
EXPECT_THAT(cs, ElementsAre(true, true, true));
}
{
std::vector<folly::Function<async::Async<int>()>> tasks;
tasks.emplace_back(makeRetTask(1));
tasks.emplace_back(makeRetTask(2));
tasks.emplace_back(makeRetTask(3));
EXPECT_THAT(async::await(async::collectAll(tasks)), ElementsAre(1, 2, 3));
}
{
std::array<bool, 3> cs{false, false, false};
async::await(async::collectAll(
makeVoidTask(cs[0]), makeVoidTask(cs[1]), makeVoidTask(cs[2])));
EXPECT_THAT(cs, ElementsAre(true, true, true));
}
{
EXPECT_EQ(
std::make_tuple(1, 2, 3),
async::await(async::collectAll(
makeRetTask(1), makeRetTask(2), makeRetTask(3))));
}
return {};
});
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment