Commit db57efd0 authored by Hannes Roth's avatar Hannes Roth Committed by Sara Golemon

(Wangle) variadic collect

Summary:
For D2099047 (matthieu) and also for symmetry. Can re-use most of the
code, also refactored it a bit (using an empty base case).

Test Plan:
Run all the tests.

Will add some more before committing.

Reviewed By: jsedgwick@fb.com

Subscribers: folly-diffs@, jsedgwick, yfeldblum, chalfant, matthieu

FB internal diff: D2131515

Signature: t1:2131515:1433776852:544166fbfdfabf6760fd78f87821290e17e6e4a3
parent 023eb4f8
...@@ -545,13 +545,13 @@ void mapSetCallback(InputIterator first, InputIterator last, F func) { ...@@ -545,13 +545,13 @@ void mapSetCallback(InputIterator first, InputIterator last, F func) {
// collectAll (variadic) // collectAll (variadic)
template <typename... Fs> template <typename... Fs>
typename detail::VariadicContext< typename detail::CollectAllVariadicContext<
typename std::decay<Fs>::type::value_type...>::type typename std::decay<Fs>::type::value_type...>::type
collectAll(Fs&&... fs) { collectAll(Fs&&... fs) {
auto ctx = std::make_shared<detail::VariadicContext< auto ctx = std::make_shared<detail::CollectAllVariadicContext<
typename std::decay<Fs>::type::value_type...>>(); typename std::decay<Fs>::type::value_type...>>();
detail::collectAllVariadicHelper(ctx, detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
std::forward<typename std::decay<Fs>::type>(fs)...); ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
return ctx->p.getFuture(); return ctx->p.getFuture();
} }
...@@ -581,6 +581,8 @@ collectAll(InputIterator first, InputIterator last) { ...@@ -581,6 +581,8 @@ collectAll(InputIterator first, InputIterator last) {
return ctx->p.getFuture(); return ctx->p.getFuture();
} }
// collect (iterator)
namespace detail { namespace detail {
template <typename T> template <typename T>
...@@ -648,6 +650,21 @@ collect(InputIterator first, InputIterator last) { ...@@ -648,6 +650,21 @@ collect(InputIterator first, InputIterator last) {
return ctx->p.getFuture(); return ctx->p.getFuture();
} }
// collect (variadic)
template <typename... Fs>
typename detail::CollectVariadicContext<
typename std::decay<Fs>::type::value_type...>::type
collect(Fs&&... fs) {
auto ctx = std::make_shared<detail::CollectVariadicContext<
typename std::decay<Fs>::type::value_type...>>();
detail::collectVariadicHelper<detail::CollectVariadicContext>(
ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
return ctx->p.getFuture();
}
// collectAny (iterator)
template <class InputIterator> template <class InputIterator>
Future< Future<
std::pair<size_t, std::pair<size_t,
...@@ -673,6 +690,8 @@ collectAny(InputIterator first, InputIterator last) { ...@@ -673,6 +690,8 @@ collectAny(InputIterator first, InputIterator last) {
return ctx->p.getFuture(); return ctx->p.getFuture();
} }
// collectN (iterator)
template <class InputIterator> template <class InputIterator>
Future<std::vector<std::pair<size_t, Try<typename Future<std::vector<std::pair<size_t, Try<typename
std::iterator_traits<InputIterator>::value_type::value_type>>>> std::iterator_traits<InputIterator>::value_type::value_type>>>>
...@@ -709,6 +728,8 @@ collectN(InputIterator first, InputIterator last, size_t n) { ...@@ -709,6 +728,8 @@ collectN(InputIterator first, InputIterator last, size_t n) {
return ctx->p.getFuture(); return ctx->p.getFuture();
} }
// reduce (iterator)
template <class It, class T, class F> template <class It, class T, class F>
Future<T> reduce(It first, It last, T&& initial, F&& func) { Future<T> reduce(It first, It last, T&& initial, F&& func) {
if (first == last) { if (first == last) {
...@@ -740,6 +761,8 @@ Future<T> reduce(It first, It last, T&& initial, F&& func) { ...@@ -740,6 +761,8 @@ Future<T> reduce(It first, It last, T&& initial, F&& func) {
return f; return f;
} }
// window (collection)
template <class Collection, class F, class ItT, class Result> template <class Collection, class F, class ItT, class Result>
std::vector<Future<Result>> std::vector<Future<Result>>
window(Collection input, F func, size_t n) { window(Collection input, F func, size_t n) {
...@@ -787,6 +810,8 @@ window(Collection input, F func, size_t n) { ...@@ -787,6 +810,8 @@ window(Collection input, F func, size_t n) {
return futures; return futures;
} }
// reduce
template <class T> template <class T>
template <class I, class F> template <class I, class F>
Future<I> Future<T>::reduce(I&& initial, F&& func) { Future<I> Future<T>::reduce(I&& initial, F&& func) {
...@@ -801,6 +826,8 @@ Future<I> Future<T>::reduce(I&& initial, F&& func) { ...@@ -801,6 +826,8 @@ Future<I> Future<T>::reduce(I&& initial, F&& func) {
}); });
} }
// unorderedReduce (iterator)
template <class It, class T, class F, class ItT, class Arg> template <class It, class T, class F, class ItT, class Arg>
Future<T> unorderedReduce(It first, It last, T initial, F func) { Future<T> unorderedReduce(It first, It last, T initial, F func) {
if (first == last) { if (first == last) {
...@@ -849,6 +876,8 @@ Future<T> unorderedReduce(It first, It last, T initial, F func) { ...@@ -849,6 +876,8 @@ Future<T> unorderedReduce(It first, It last, T initial, F func) {
return ctx->promise_.getFuture(); return ctx->promise_.getFuture();
} }
// within
template <class T> template <class T>
Future<T> Future<T>::within(Duration dur, Timekeeper* tk) { Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
return within(dur, TimedOut(), tk); return within(dur, TimedOut(), tk);
...@@ -890,6 +919,8 @@ Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) { ...@@ -890,6 +919,8 @@ Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
return ctx->promise.getFuture().via(getExecutor()); return ctx->promise.getFuture().via(getExecutor());
} }
// delayed
template <class T> template <class T>
Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) { Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
return collectAll(*this, futures::sleep(dur, tk)) return collectAll(*this, futures::sleep(dur, tk))
......
...@@ -41,7 +41,8 @@ struct isTry<Try<T>> : std::true_type {}; ...@@ -41,7 +41,8 @@ struct isTry<Try<T>> : std::true_type {};
namespace detail { namespace detail {
template <class> class Core; template <class> class Core;
template <class...> struct VariadicContext; template <class...> struct CollectAllVariadicContext;
template <class...> struct CollectVariadicContext;
template <class> struct CollectContext; template <class> struct CollectContext;
template<typename F, typename... Args> template<typename F, typename... Args>
......
...@@ -341,34 +341,59 @@ class Core { ...@@ -341,34 +341,59 @@ class Core {
}; };
template <typename... Ts> template <typename... Ts>
struct VariadicContext { struct CollectAllVariadicContext {
VariadicContext() {} CollectAllVariadicContext() {}
~VariadicContext() { template <typename T, size_t I>
inline void setPartialResult(Try<T>& t) {
std::get<I>(results) = std::move(t);
}
~CollectAllVariadicContext() {
p.setValue(std::move(results)); p.setValue(std::move(results));
} }
Promise<std::tuple<Try<Ts>... >> p; Promise<std::tuple<Try<Ts>...>> p;
std::tuple<Try<Ts>... > results; std::tuple<Try<Ts>...> results;
typedef Future<std::tuple<Try<Ts>...>> type; typedef Future<std::tuple<Try<Ts>...>> type;
}; };
template <typename... Ts, typename THead, typename... Fs> template <typename... Ts>
typename std::enable_if<sizeof...(Fs) == 0, void>::type struct CollectVariadicContext {
collectAllVariadicHelper(std::shared_ptr<VariadicContext<Ts...>> ctx, CollectVariadicContext() {}
THead&& head, Fs&&... tail) { template <typename T, size_t I>
head.setCallback_([ctx](Try<typename THead::value_type>&& t) { inline void setPartialResult(Try<T>& t) {
std::get<sizeof...(Ts) - sizeof...(Fs) - 1>(ctx->results) = std::move(t); if (t.hasException()) {
}); if (!threw.exchange(true)) {
p.setException(std::move(t.exception()));
}
} else if (!threw) {
std::get<I>(results) = std::move(t.value());
}
}
~CollectVariadicContext() {
if (!threw.exchange(true)) {
p.setValue(std::move(results));
}
}
Promise<std::tuple<Ts...>> p;
std::tuple<Ts...> results;
std::atomic<bool> threw;
typedef Future<std::tuple<Ts...>> type;
};
template <template <typename ...> class T, typename... Ts>
void collectVariadicHelper(const std::shared_ptr<T<Ts...>>& ctx) {
// base case
} }
template <typename... Ts, typename THead, typename... Fs> template <template <typename ...> class T, typename... Ts,
typename std::enable_if<sizeof...(Fs) != 0, void>::type typename THead, typename... TTail>
collectAllVariadicHelper(std::shared_ptr<VariadicContext<Ts...>> ctx, void collectVariadicHelper(const std::shared_ptr<T<Ts...>>& ctx,
THead&& head, Fs&&... tail) { THead&& head, TTail&&... tail) {
head.setCallback_([ctx](Try<typename THead::value_type>&& t) { head.setCallback_([ctx](Try<typename THead::value_type>&& t) {
std::get<sizeof...(Ts) - sizeof...(Fs) - 1>(ctx->results) = std::move(t); ctx->template setPartialResult<typename THead::value_type,
sizeof...(Ts) - sizeof...(TTail) - 1>(t);
}); });
// template tail-recursion // template tail-recursion
collectAllVariadicHelper(ctx, std::forward<Fs>(tail)...); collectVariadicHelper(ctx, std::forward<TTail>(tail)...);
} }
}} // folly::detail }} // folly::detail
...@@ -155,7 +155,7 @@ auto collectAll(Collection&& c) -> decltype(collectAll(c.begin(), c.end())) { ...@@ -155,7 +155,7 @@ auto collectAll(Collection&& c) -> decltype(collectAll(c.begin(), c.end())) {
/// is a Future<std::tuple<Try<T1>, Try<T2>, ...>>. /// is a Future<std::tuple<Try<T1>, Try<T2>, ...>>.
/// The Futures are moved in, so your copies are invalid. /// The Futures are moved in, so your copies are invalid.
template <typename... Fs> template <typename... Fs>
typename detail::VariadicContext< typename detail::CollectAllVariadicContext<
typename std::decay<Fs>::type::value_type...>::type typename std::decay<Fs>::type::value_type...>::type
collectAll(Fs&&... fs); collectAll(Fs&&... fs);
...@@ -174,6 +174,14 @@ auto collect(Collection&& c) -> decltype(collect(c.begin(), c.end())) { ...@@ -174,6 +174,14 @@ auto collect(Collection&& c) -> decltype(collect(c.begin(), c.end())) {
return collect(c.begin(), c.end()); return collect(c.begin(), c.end());
} }
/// Like collectAll, but will short circuit on the first exception. Thus, the
/// type of the returned Future is std::tuple<T1, T2, ...> instead of
/// std::tuple<Try<T1>, Try<T2>, ...>
template <typename... Fs>
typename detail::CollectVariadicContext<
typename std::decay<Fs>::type::value_type...>::type
collect(Fs&&... fs);
/** The result is a pair of the index of the first Future to complete and /** The result is a pair of the index of the first Future to complete and
the Try. If multiple Futures complete at the same time (or are already the Try. If multiple Futures complete at the same time (or are already
complete when passed in), the "winner" is chosen non-deterministically. complete when passed in), the "winner" is chosen non-deterministically.
......
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <boost/thread/barrier.hpp>
#include <folly/futures/Future.h> #include <folly/futures/Future.h>
#include <folly/Random.h> #include <folly/Random.h>
#include <folly/small_vector.h> #include <folly/small_vector.h>
...@@ -353,6 +355,134 @@ TEST(Collect, alreadyCompleted) { ...@@ -353,6 +355,134 @@ TEST(Collect, alreadyCompleted) {
} }
} }
TEST(Collect, parallel) {
std::vector<Promise<int>> ps(10);
std::vector<Future<int>> fs;
for (size_t i = 0; i < ps.size(); i++) {
fs.emplace_back(ps[i].getFuture());
}
auto f = collect(fs);
std::vector<std::thread> ts;
boost::barrier barrier(ps.size() + 1);
for (size_t i = 0; i < ps.size(); i++) {
ts.emplace_back([&ps, &barrier, i]() {
barrier.wait();
ps[i].setValue(i);
});
}
barrier.wait();
for (size_t i = 0; i < ps.size(); i++) {
ts[i].join();
}
EXPECT_TRUE(f.isReady());
for (size_t i = 0; i < ps.size(); i++) {
EXPECT_EQ(i, f.value()[i]);
}
}
TEST(Collect, parallelWithError) {
std::vector<Promise<int>> ps(10);
std::vector<Future<int>> fs;
for (size_t i = 0; i < ps.size(); i++) {
fs.emplace_back(ps[i].getFuture());
}
auto f = collect(fs);
std::vector<std::thread> ts;
boost::barrier barrier(ps.size() + 1);
for (size_t i = 0; i < ps.size(); i++) {
ts.emplace_back([&ps, &barrier, i]() {
barrier.wait();
if (i == (ps.size()/2)) {
ps[i].setException(eggs);
} else {
ps[i].setValue(i);
}
});
}
barrier.wait();
for (size_t i = 0; i < ps.size(); i++) {
ts[i].join();
}
EXPECT_TRUE(f.isReady());
EXPECT_THROW(f.value(), eggs_t);
}
TEST(Collect, allParallel) {
std::vector<Promise<int>> ps(10);
std::vector<Future<int>> fs;
for (size_t i = 0; i < ps.size(); i++) {
fs.emplace_back(ps[i].getFuture());
}
auto f = collectAll(fs);
std::vector<std::thread> ts;
boost::barrier barrier(ps.size() + 1);
for (size_t i = 0; i < ps.size(); i++) {
ts.emplace_back([&ps, &barrier, i]() {
barrier.wait();
ps[i].setValue(i);
});
}
barrier.wait();
for (size_t i = 0; i < ps.size(); i++) {
ts[i].join();
}
EXPECT_TRUE(f.isReady());
for (size_t i = 0; i < ps.size(); i++) {
EXPECT_TRUE(f.value()[i].hasValue());
EXPECT_EQ(i, f.value()[i].value());
}
}
TEST(Collect, allParallelWithError) {
std::vector<Promise<int>> ps(10);
std::vector<Future<int>> fs;
for (size_t i = 0; i < ps.size(); i++) {
fs.emplace_back(ps[i].getFuture());
}
auto f = collectAll(fs);
std::vector<std::thread> ts;
boost::barrier barrier(ps.size() + 1);
for (size_t i = 0; i < ps.size(); i++) {
ts.emplace_back([&ps, &barrier, i]() {
barrier.wait();
if (i == (ps.size()/2)) {
ps[i].setException(eggs);
} else {
ps[i].setValue(i);
}
});
}
barrier.wait();
for (size_t i = 0; i < ps.size(); i++) {
ts[i].join();
}
EXPECT_TRUE(f.isReady());
for (size_t i = 0; i < ps.size(); i++) {
if (i == (ps.size()/2)) {
EXPECT_THROW(f.value()[i].value(), eggs_t);
} else {
EXPECT_TRUE(f.value()[i].hasValue());
EXPECT_EQ(i, f.value()[i].value());
}
}
}
TEST(Collect, collectN) { TEST(Collect, collectN) {
std::vector<Promise<void>> promises(10); std::vector<Promise<void>> promises(10);
std::vector<Future<void>> futures; std::vector<Future<void>> futures;
...@@ -443,6 +573,59 @@ TEST(Collect, collectAllVariadicReferences) { ...@@ -443,6 +573,59 @@ TEST(Collect, collectAllVariadicReferences) {
EXPECT_TRUE(flag); EXPECT_TRUE(flag);
} }
TEST(Collect, collectAllVariadicWithException) {
Promise<bool> pb;
Promise<int> pi;
Future<bool> fb = pb.getFuture();
Future<int> fi = pi.getFuture();
bool flag = false;
collectAll(std::move(fb), std::move(fi))
.then([&](std::tuple<Try<bool>, Try<int>> tup) {
flag = true;
EXPECT_TRUE(std::get<0>(tup).hasValue());
EXPECT_EQ(std::get<0>(tup).value(), true);
EXPECT_TRUE(std::get<1>(tup).hasException());
EXPECT_THROW(std::get<1>(tup).value(), eggs_t);
});
pb.setValue(true);
EXPECT_FALSE(flag);
pi.setException(eggs);
EXPECT_TRUE(flag);
}
TEST(Collect, collectVariadic) {
Promise<bool> pb;
Promise<int> pi;
Future<bool> fb = pb.getFuture();
Future<int> fi = pi.getFuture();
bool flag = false;
collect(std::move(fb), std::move(fi))
.then([&](std::tuple<bool, int> tup) {
flag = true;
EXPECT_EQ(std::get<0>(tup), true);
EXPECT_EQ(std::get<1>(tup), 42);
});
pb.setValue(true);
EXPECT_FALSE(flag);
pi.setValue(42);
EXPECT_TRUE(flag);
}
TEST(Collect, collectVariadicWithException) {
Promise<bool> pb;
Promise<int> pi;
Future<bool> fb = pb.getFuture();
Future<int> fi = pi.getFuture();
bool flag = false;
auto f = collect(std::move(fb), std::move(fi));
pb.setValue(true);
EXPECT_FALSE(f.isReady());
pi.setException(eggs);
EXPECT_TRUE(f.isReady());
EXPECT_TRUE(f.getTry().hasException());
EXPECT_THROW(f.get(), eggs_t);
}
TEST(Collect, collectAllNone) { TEST(Collect, collectAllNone) {
std::vector<Future<int>> fs; std::vector<Future<int>> fs;
auto f = collectAll(fs); auto f = collectAll(fs);
......
...@@ -413,6 +413,31 @@ TEST(Future, thenFunctionFuture) { ...@@ -413,6 +413,31 @@ TEST(Future, thenFunctionFuture) {
EXPECT_EQ(f.value(), "start;static;class-static;class"); EXPECT_EQ(f.value(), "start;static;class-static;class");
} }
TEST(Future, thenStdFunction) {
{
std::function<int()> fn = [](){ return 42; };
auto f = makeFuture().then(std::move(fn));
EXPECT_EQ(f.value(), 42);
}
{
std::function<int(int)> fn = [](int i){ return i + 23; };
auto f = makeFuture(19).then(std::move(fn));
EXPECT_EQ(f.value(), 42);
}
{
std::function<int(Try<int>&)> fn = [](Try<int>& t){ return t.value() + 2; };
auto f = makeFuture(1).then(std::move(fn));
EXPECT_EQ(f.value(), 3);
}
{
bool flag = false;
std::function<void()> fn = [&flag](){ flag = true; };
auto f = makeFuture().then(std::move(fn));
EXPECT_TRUE(f.isReady());
EXPECT_TRUE(flag);
}
}
TEST(Future, thenBind) { TEST(Future, thenBind) {
auto l = []() { auto l = []() {
return makeFuture("bind"); return makeFuture("bind");
......
...@@ -16,12 +16,17 @@ ...@@ -16,12 +16,17 @@
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <boost/thread/barrier.hpp>
#include <folly/futures/Future.h> #include <folly/futures/Future.h>
#include <vector> #include <vector>
using namespace folly; using namespace folly;
typedef FutureException eggs_t;
static eggs_t eggs("eggs");
TEST(Window, basic) { TEST(Window, basic) {
// int -> Future<int> // int -> Future<int>
auto fn = [](std::vector<int> input, size_t window_size, size_t expect) { auto fn = [](std::vector<int> input, size_t window_size, size_t expect) {
...@@ -79,3 +84,107 @@ TEST(Window, basic) { ...@@ -79,3 +84,107 @@ TEST(Window, basic) {
EXPECT_EQ(6, res); EXPECT_EQ(6, res);
} }
} }
TEST(Window, parallel) {
std::vector<int> input;
std::vector<Promise<int>> ps(10);
for (size_t i = 0; i < ps.size(); i++) {
input.emplace_back(i);
}
auto f = collect(window(input, [&](int i) {
return ps[i].getFuture();
}, 3));
std::vector<std::thread> ts;
boost::barrier barrier(ps.size() + 1);
for (size_t i = 0; i < ps.size(); i++) {
ts.emplace_back([&ps, &barrier, i]() {
barrier.wait();
ps[i].setValue(i);
});
}
barrier.wait();
for (size_t i = 0; i < ps.size(); i++) {
ts[i].join();
}
EXPECT_TRUE(f.isReady());
for (size_t i = 0; i < ps.size(); i++) {
EXPECT_EQ(i, f.value()[i]);
}
}
TEST(Window, parallelWithError) {
std::vector<int> input;
std::vector<Promise<int>> ps(10);
for (size_t i = 0; i < ps.size(); i++) {
input.emplace_back(i);
}
auto f = collect(window(input, [&](int i) {
return ps[i].getFuture();
}, 3));
std::vector<std::thread> ts;
boost::barrier barrier(ps.size() + 1);
for (size_t i = 0; i < ps.size(); i++) {
ts.emplace_back([&ps, &barrier, i]() {
barrier.wait();
if (i == (ps.size()/2)) {
ps[i].setException(eggs);
} else {
ps[i].setValue(i);
}
});
}
barrier.wait();
for (size_t i = 0; i < ps.size(); i++) {
ts[i].join();
}
EXPECT_TRUE(f.isReady());
EXPECT_THROW(f.value(), eggs_t);
}
TEST(Window, allParallelWithError) {
std::vector<int> input;
std::vector<Promise<int>> ps(10);
for (size_t i = 0; i < ps.size(); i++) {
input.emplace_back(i);
}
auto f = collectAll(window(input, [&](int i) {
return ps[i].getFuture();
}, 3));
std::vector<std::thread> ts;
boost::barrier barrier(ps.size() + 1);
for (size_t i = 0; i < ps.size(); i++) {
ts.emplace_back([&ps, &barrier, i]() {
barrier.wait();
if (i == (ps.size()/2)) {
ps[i].setException(eggs);
} else {
ps[i].setValue(i);
}
});
}
barrier.wait();
for (size_t i = 0; i < ps.size(); i++) {
ts[i].join();
}
EXPECT_TRUE(f.isReady());
for (size_t i = 0; i < ps.size(); i++) {
if (i == (ps.size()/2)) {
EXPECT_THROW(f.value()[i].value(), eggs_t);
} else {
EXPECT_TRUE(f.value()[i].hasValue());
EXPECT_EQ(i, f.value()[i].value());
}
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment