Commit e872b440 authored by Hannes Roth's avatar Hannes Roth Committed by Viswanath Sivakumar

(Wangle) unorderedReduce

Summary:
Use this if you don't need the order of the input, e.g. summing up
values. This constructs a separate Future chain to do the reducing,
because we don't want to add locking while reducing. The only lock
necessary is when adding a new Future to the chain, which should be
really quick.

Test Plan: Run all the tests.

Reviewed By: hans@fb.com

Subscribers: folly-diffs@, jsedgwick, yfeldblum, chalfant

FB internal diff: D2015326

Tasks: 6025252

Signature: t1:2015326:1431557191:9ea2edccb0162dedf067b5b3300de2fe72a1a4c9
parent 0593c224
......@@ -800,6 +800,54 @@ Future<I> Future<T>::reduce(I&& initial, F&& func) {
});
}
template <class It, class T, class F, class ItT, class Arg>
Future<T> unorderedReduce(It first, It last, T initial, F func) {
if (first == last) {
return makeFuture(std::move(initial));
}
typedef isTry<Arg> IsTry;
struct UnorderedReduceContext {
UnorderedReduceContext(T&& memo, F&& fn, size_t n)
: lock_(), memo_(makeFuture<T>(std::move(memo))),
func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
{};
folly::MicroSpinLock lock_; // protects memo_ and numThens_
Future<T> memo_;
F func_;
size_t numThens_; // how many Futures completed and called .then()
size_t numFutures_; // how many Futures in total
Promise<T> promise_;
};
auto ctx = std::make_shared<UnorderedReduceContext>(
std::move(initial), std::move(func), std::distance(first, last));
mapSetCallback<ItT>(first, last, [ctx](size_t i, Try<ItT>&& t) {
folly::MoveWrapper<Try<ItT>> mt(std::move(t));
// Futures can be completed in any order, simultaneously.
// To make this non-blocking, we create a new Future chain in
// the order of completion to reduce the values.
// The spinlock just protects chaining a new Future, not actually
// executing the reduce, which should be really fast.
folly::MSLGuard lock(ctx->lock_);
ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
// Either return a ItT&& or a Try<ItT>&& depending
// on the type of the argument of func.
return ctx->func_(std::move(v), mt->template get<IsTry::value, Arg&&>());
});
if (++ctx->numThens_ == ctx->numFutures_) {
// After reducing the value of the last Future, fulfill the Promise
ctx->memo_.setCallback_([ctx](Try<T>&& t2) {
ctx->promise_.setValue(std::move(t2));
});
}
});
return ctx->promise_.getFuture();
}
template <class T>
Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
return within(dur, TimedOut(), tk);
......
......@@ -239,6 +239,9 @@ using isFutureResult = isFuture<typename std::result_of<F(T&&, Arg&&)>::type>;
The type of the final result is a Future of the type of the initial value.
Func can either return a T, or a Future<T>
func is called in order of the input, see unorderedReduce if that is not
a requirement
*/
template <class It, class T, class F>
Future<T> reduce(It first, It last, T&& initial, F&& func);
......@@ -255,4 +258,24 @@ auto reduce(Collection&& c, T&& initial, F&& func)
std::forward<F>(func));
}
/** like reduce, but calls func on finished futures as they complete
does NOT keep the order of the input
*/
template <class It, class T, class F,
class ItT = typename std::iterator_traits<It>::value_type::value_type,
class Arg = MaybeTryArg<F, T, ItT>>
Future<T> unorderedReduce(It first, It last, T initial, F func);
/// Sugar for the most common case
template <class Collection, class T, class F>
auto unorderedReduce(Collection&& c, T&& initial, F&& func)
-> decltype(unorderedReduce(c.begin(), c.end(), std::forward<T>(initial),
std::forward<F>(func))) {
return unorderedReduce(
c.begin(),
c.end(),
std::forward<T>(initial),
std::forward<F>(func));
}
} // namespace folly
......@@ -1807,6 +1807,60 @@ TEST(Reduce, Chain) {
}
}
TEST(Reduce, Streaming) {
{
std::vector<Future<int>> fs;
fs.push_back(makeFuture(1));
fs.push_back(makeFuture(2));
fs.push_back(makeFuture(3));
Future<double> f = unorderedReduce(fs.begin(), fs.end(), 0.0,
[](double a, int&& b){
return double(b);
});
EXPECT_EQ(3.0, f.get());
}
{
Promise<int> p1;
Promise<int> p2;
Promise<int> p3;
std::vector<Future<int>> fs;
fs.push_back(p1.getFuture());
fs.push_back(p2.getFuture());
fs.push_back(p3.getFuture());
Future<double> f = unorderedReduce(fs.begin(), fs.end(), 0.0,
[](double a, int&& b){
return double(b);
});
p3.setValue(3);
p2.setValue(2);
p1.setValue(1);
EXPECT_EQ(1.0, f.get());
}
}
TEST(Reduce, StreamingException) {
Promise<int> p1;
Promise<int> p2;
Promise<int> p3;
std::vector<Future<int>> fs;
fs.push_back(p1.getFuture());
fs.push_back(p2.getFuture());
fs.push_back(p3.getFuture());
Future<double> f = unorderedReduce(fs.begin(), fs.end(), 0.0,
[](double a, int&& b){
return b + 0.0;
});
p3.setValue(3);
p2.setException(exception_wrapper(std::runtime_error("blah")));
p1.setValue(1);
EXPECT_THROW(f.get(), std::runtime_error);
}
TEST(Map, Basic) {
Promise<int> p1;
Promise<int> p2;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment