Commit a8990a74 authored by Yedidya Feldblum's avatar Yedidya Feldblum Committed by Facebook Github Bot

Refactor non-variadic collect variants

Summary:
[Folly] Refactor non-variadic `collect` variants.

No change in behavior - just moving code around.

Reviewed By: marshallcline

Differential Revision: D8440630

fbshipit-source-id: a6349c73c2109237f1be174c2260e9c9eb2d1c0e
parent 698d1e78
......@@ -1315,16 +1315,6 @@ FOLLY_ALWAYS_INLINE FOLLY_ATTR_VISIBILITY_HIDDEN void foreach(
} // namespace detail
} // namespace futures
// mapSetCallback calls func(i, Try<T>) when every future completes
template <class T, class InputIterator, class F>
void mapSetCallback(InputIterator first, InputIterator last, F func) {
for (size_t i = 0; first != last; ++first, ++i) {
first->setCallback_([func, i](Try<T>&& t) {
func(i, std::move(t));
});
}
}
// collectAll (variadic)
template <typename... Fs>
......@@ -1362,23 +1352,23 @@ template <class InputIterator>
SemiFuture<std::vector<
Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
collectAllSemiFuture(InputIterator first, InputIterator last) {
typedef
typename std::iterator_traits<InputIterator>::value_type::value_type T;
using F = typename std::iterator_traits<InputIterator>::value_type;
using T = typename F::value_type;
struct CollectAllContext {
CollectAllContext(size_t n) : results(n) {}
~CollectAllContext() {
struct Context {
explicit Context(size_t n) : results(n) {}
~Context() {
p.setValue(std::move(results));
}
Promise<std::vector<Try<T>>> p;
std::vector<Try<T>> results;
};
auto ctx =
std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
ctx->results[i] = std::move(t);
});
auto ctx = std::make_shared<Context>(size_t(std::distance(first, last)));
for (size_t i = 0; first != last; ++first, ++i) {
first->setCallback_(
[i, ctx](Try<T>&& t) { ctx->results[i] = std::move(t); });
}
return ctx->p.getSemiFuture();
}
......@@ -1391,68 +1381,47 @@ collectAll(InputIterator first, InputIterator last) {
// collect (iterator)
namespace futures {
namespace detail {
// TODO(T26439406): Make return SemiFuture
template <class InputIterator>
Future<std::vector<
typename std::iterator_traits<InputIterator>::value_type::value_type>>
collect(InputIterator first, InputIterator last) {
using F = typename std::iterator_traits<InputIterator>::value_type;
using T = typename F::value_type;
template <typename T>
struct CollectContext {
struct Nothing {
explicit Nothing(int /* n */) {}
struct Context {
explicit Context(size_t n) : result(n) {
finalResult.reserve(n);
}
~Context() {
if (!threw.exchange(true)) {
// map Optional<T> -> T
std::transform(
result.begin(),
result.end(),
std::back_inserter(finalResult),
[](Optional<T>& o) { return std::move(o.value()); });
p.setValue(std::move(finalResult));
}
}
Promise<std::vector<T>> p;
std::vector<Optional<T>> result;
std::vector<T> finalResult;
std::atomic<bool> threw{false};
};
using Result = typename std::conditional<
std::is_void<T>::value,
void,
std::vector<T>>::type;
using InternalResult = typename std::conditional<
std::is_void<T>::value,
Nothing,
std::vector<Optional<T>>>::type;
explicit CollectContext(size_t n) : result(n) {
finalResult.reserve(n);
}
~CollectContext() {
if (!threw.exchange(true)) {
// map Optional<T> -> T
std::transform(result.begin(), result.end(),
std::back_inserter(finalResult),
[](Optional<T>& o) { return std::move(o.value()); });
p.setValue(std::move(finalResult));
}
}
void setPartialResult(size_t i, Try<T>& t) {
result[i] = std::move(t.value());
auto ctx = std::make_shared<Context>(std::distance(first, last));
for (size_t i = 0; first != last; ++first, ++i) {
first->setCallback_([i, ctx](Try<T>&& t) {
if (t.hasException()) {
if (!ctx->threw.exchange(true)) {
ctx->p.setException(std::move(t.exception()));
}
} else if (!ctx->threw) {
ctx->result[i] = std::move(t.value());
}
});
}
Promise<Result> p;
InternalResult result;
Result finalResult;
std::atomic<bool> threw {false};
};
} // namespace detail
} // namespace futures
// TODO(T26439406): Make return SemiFuture
template <class InputIterator>
Future<typename futures::detail::CollectContext<typename std::iterator_traits<
InputIterator>::value_type::value_type>::Result>
collect(InputIterator first, InputIterator last) {
typedef
typename std::iterator_traits<InputIterator>::value_type::value_type T;
auto ctx = std::make_shared<futures::detail::CollectContext<T>>(
std::distance(first, last));
mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
if (t.hasException()) {
if (!ctx->threw.exchange(true)) {
ctx->p.setException(std::move(t.exception()));
}
} else if (!ctx->threw) {
ctx->setPartialResult(i, t);
}
});
return ctx->p.getSemiFuture().via(&InlineExecutor::instance());
}
......@@ -1501,21 +1470,22 @@ Future<
typename
std::iterator_traits<InputIterator>::value_type::value_type>>>
collectAny(InputIterator first, InputIterator last) {
typedef
typename std::iterator_traits<InputIterator>::value_type::value_type T;
using F = typename std::iterator_traits<InputIterator>::value_type;
using T = typename F::value_type;
struct CollectAnyContext {
CollectAnyContext() {}
struct Context {
Promise<std::pair<size_t, Try<T>>> p;
std::atomic<bool> done {false};
};
auto ctx = std::make_shared<CollectAnyContext>();
mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
if (!ctx->done.exchange(true)) {
ctx->p.setValue(std::make_pair(i, std::move(t)));
}
});
auto ctx = std::make_shared<Context>();
for (size_t i = 0; first != last; ++first, ++i) {
first->setCallback_([i, ctx](Try<T>&& t) {
if (!ctx->done.exchange(true)) {
ctx->p.setValue(std::make_pair(i, std::move(t)));
}
});
}
return ctx->p.getSemiFuture().via(&InlineExecutor::instance());
}
......@@ -1527,27 +1497,27 @@ Future<std::pair<
size_t,
typename std::iterator_traits<InputIterator>::value_type::value_type>>
collectAnyWithoutException(InputIterator first, InputIterator last) {
typedef
typename std::iterator_traits<InputIterator>::value_type::value_type T;
using F = typename std::iterator_traits<InputIterator>::value_type;
using T = typename F::value_type;
struct CollectAnyWithoutExceptionContext {
CollectAnyWithoutExceptionContext(){}
struct Context {
Context(size_t n) : nTotal(n) {}
Promise<std::pair<size_t, T>> p;
std::atomic<bool> done{false};
std::atomic<size_t> nFulfilled{0};
size_t nTotal;
};
auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
ctx->nTotal = size_t(std::distance(first, last));
mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
if (!t.hasException() && !ctx->done.exchange(true)) {
ctx->p.setValue(std::make_pair(i, std::move(t.value())));
} else if (++ctx->nFulfilled == ctx->nTotal) {
ctx->p.setException(t.exception());
}
});
auto ctx = std::make_shared<Context>(size_t(std::distance(first, last)));
for (size_t i = 0; first != last; ++first, ++i) {
first->setCallback_([i, ctx](Try<T>&& t) {
if (!t.hasException() && !ctx->done.exchange(true)) {
ctx->p.setValue(std::make_pair(i, std::move(t.value())));
} else if (++ctx->nFulfilled == ctx->nTotal) {
ctx->p.setException(t.exception());
}
});
}
return ctx->p.getSemiFuture().via(&InlineExecutor::instance());
}
......@@ -1558,62 +1528,57 @@ SemiFuture<std::vector<std::pair<
size_t,
Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>>
collectN(InputIterator first, InputIterator last, size_t n) {
using T =
typename std::iterator_traits<InputIterator>::value_type::value_type;
using V = std::vector<Optional<Try<T>>>;
using F = typename std::iterator_traits<InputIterator>::value_type;
using T = typename F::value_type;
using Result = std::vector<std::pair<size_t, Try<T>>>;
assert(n > 0);
assert(std::distance(first, last) >= 0);
struct CollectNContext {
explicit CollectNContext(size_t numFutures) : v(numFutures) {}
struct Context {
explicit Context(size_t numFutures, size_t min_)
: v(numFutures), min(min_) {}
V v;
std::vector<Optional<Try<T>>> v;
size_t min;
std::atomic<size_t> completed = {0}; // # input futures completed
std::atomic<size_t> stored = {0}; // # output values stored
Promise<Result> p;
void setPartialResult(size_t index, Try<T>&& t) {
v[index] = std::move(t);
}
void complete() {
Result result;
result.reserve(completed.load());
for (size_t i = 0; i < v.size(); ++i) {
auto& entry = v[i];
if (entry.hasValue()) {
result.emplace_back(i, std::move(entry).value());
}
}
p.setTry(Try<Result>(std::move(result)));
}
};
auto numFutures = static_cast<size_t>(std::distance(first, last));
auto ctx = std::make_shared<CollectNContext>(numFutures);
assert(n > 0);
assert(std::distance(first, last) >= 0);
if (numFutures < n) {
ctx->p.setException(std::runtime_error("Not enough futures"));
} else {
// for each completed Future, increase count and add to vector, until we
// have n completed futures at which point we fulfil our Promise with the
// vector
mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
if (size_t(std::distance(first, last)) < n) {
return SemiFuture<Result>(
exception_wrapper(std::runtime_error("Not enough futures")));
}
// for each completed Future, increase count and add to vector, until we
// have n completed futures at which point we fulfil our Promise with the
// vector
auto ctx = std::make_shared<Context>(size_t(std::distance(first, last)), n);
for (size_t i = 0; first != last; ++first, ++i) {
first->setCallback_([i, ctx](Try<T>&& t) {
// relaxed because this guards control but does not guard data
auto const c = 1 + ctx->completed.fetch_add(1, std::memory_order_relaxed);
if (c > n) {
if (c > ctx->min) {
return;
}
ctx->setPartialResult(i, std::move(t));
ctx->v[i] = std::move(t);
// release because the stored values in all threads must be visible below
// acquire because no stored value is permitted to be fetched early
auto const s = 1 + ctx->stored.fetch_add(1, std::memory_order_acq_rel);
if (s < n) {
if (s < ctx->min) {
return;
}
ctx->complete();
Result result;
result.reserve(ctx->completed.load());
for (size_t j = 0; j < ctx->v.size(); ++j) {
auto& entry = ctx->v[j];
if (entry.hasValue()) {
result.emplace_back(j, std::move(entry).value());
}
}
ctx->p.setTry(Try<Result>(std::move(result)));
});
}
......@@ -1741,28 +1706,26 @@ Future<I> Future<T>::reduce(I&& initial, F&& func) {
// unorderedReduce (iterator)
// TODO(T26439406): Make return SemiFuture
template <class It, class T, class F, class ItT, class Arg>
template <class It, class T, class F>
Future<T> unorderedReduce(It first, It last, T initial, F func) {
using ItF = typename std::iterator_traits<It>::value_type;
using ItT = typename ItF::value_type;
using Arg = MaybeTryArg<F, T, ItT>;
if (first == last) {
return makeFuture(std::move(initial));
}
typedef isTry<Arg> IsTry;
struct UnorderedReduceContext {
UnorderedReduceContext(T&& memo, F&& fn, size_t n)
: lock_(), memo_(makeFuture<T>(std::move(memo))),
func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
{}
static void fulfillWithValueOrFuture(Promise<T>&& p, T&& v) {
p.setValue(std::move(v));
}
static void fulfillWithValueOrFuture(Promise<T>&& p, Future<T>&& f) {
f.setCallback_(
[p = std::move(p)](Try<T>&& t) mutable { p.setTry(std::move(t)); });
}
struct Context {
Context(T&& memo, F&& fn, size_t n)
: lock_(),
memo_(makeFuture<T>(std::move(memo))),
func_(std::move(fn)),
numThens_(0),
numFutures_(n),
promise_() {}
folly::MicroSpinLock lock_; // protects memo_ and numThens_
Future<T> memo_;
......@@ -1772,49 +1735,57 @@ Future<T> unorderedReduce(It first, It last, T initial, F func) {
Promise<T> promise_;
};
auto ctx = std::make_shared<UnorderedReduceContext>(
std::move(initial), std::move(func), std::distance(first, last));
mapSetCallback<ItT>(
first,
last,
[ctx](size_t /* i */, Try<ItT>&& t) {
// Futures can be completed in any order, simultaneously.
// To make this non-blocking, we create a new Future chain in
// the order of completion to reduce the values.
// The spinlock just protects chaining a new Future, not actually
// executing the reduce, which should be really fast.
Promise<T> p;
auto f = p.getFuture();
{
folly::MSLGuard lock(ctx->lock_);
f = exchange(ctx->memo_, std::move(f));
if (++ctx->numThens_ == ctx->numFutures_) {
// After reducing the value of the last Future, fulfill the Promise
ctx->memo_.setCallback_(
[ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
}
struct Fulfill {
void operator()(Promise<T>&& p, T&& v) const {
p.setValue(std::move(v));
}
void operator()(Promise<T>&& p, Future<T>&& f) const {
f.setCallback_(
[p = std::move(p)](Try<T>&& t) mutable { p.setTry(std::move(t)); });
}
};
auto ctx = std::make_shared<Context>(
std::move(initial), std::move(func), std::distance(first, last));
for (size_t i = 0; first != last; ++first, ++i) {
first->setCallback_([i, ctx](Try<ItT>&& t) {
(void)i;
// Futures can be completed in any order, simultaneously.
// To make this non-blocking, we create a new Future chain in
// the order of completion to reduce the values.
// The spinlock just protects chaining a new Future, not actually
// executing the reduce, which should be really fast.
Promise<T> p;
auto f = p.getFuture();
{
folly::MSLGuard lock(ctx->lock_);
f = exchange(ctx->memo_, std::move(f));
if (++ctx->numThens_ == ctx->numFutures_) {
// After reducing the value of the last Future, fulfill the Promise
ctx->memo_.setCallback_(
[ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
}
f.setCallback_([ctx, mp = std::move(p), mt = std::move(t)](
Try<T>&& v) mutable {
if (v.hasValue()) {
try {
ctx->fulfillWithValueOrFuture(
std::move(mp),
ctx->func_(
std::move(v.value()),
mt.template get<IsTry::value, Arg&&>()));
} catch (std::exception& e) {
mp.setException(exception_wrapper(std::current_exception(), e));
} catch (...) {
mp.setException(exception_wrapper(std::current_exception()));
}
f.setCallback_(
[ctx, mp = std::move(p), mt = std::move(t)](Try<T>&& v) mutable {
if (v.hasValue()) {
try {
Fulfill{}(
std::move(mp),
ctx->func_(
std::move(v.value()),
mt.template get<IsTry::value, Arg&&>()));
} catch (std::exception& e) {
mp.setException(exception_wrapper(std::current_exception(), e));
} catch (...) {
mp.setException(exception_wrapper(std::current_exception()));
}
} else {
mp.setTry(std::move(v));
}
} else {
mp.setTry(std::move(v));
}
});
});
});
});
}
return ctx->promise_.getSemiFuture().via(&InlineExecutor::instance());
}
......
......@@ -72,7 +72,6 @@ namespace futures {
namespace detail {
template <class> class Core;
template <class> struct CollectContext;
template <typename...>
struct ArgType;
......
......@@ -307,8 +307,8 @@ Future<std::tuple<Try<typename remove_cvref_t<Fs>::value_type>...>> collectAll(
/// type of the returned Future is std::vector<T> instead of
/// std::vector<Try<T>>
template <class InputIterator>
Future<typename futures::detail::CollectContext<typename std::iterator_traits<
InputIterator>::value_type::value_type>::result_type>
Future<std::vector<
typename std::iterator_traits<InputIterator>::value_type::value_type>>
collect(InputIterator first, InputIterator last);
/// Sugar for the most common case
......@@ -435,12 +435,7 @@ auto reduce(Collection&& c, T&& initial, F&& func)
/** like reduce, but calls func on finished futures as they complete
does NOT keep the order of the input
*/
template <
class It,
class T,
class F,
class ItT = typename std::iterator_traits<It>::value_type::value_type,
class Arg = MaybeTryArg<F, T, ItT>>
template <class It, class T, class F>
Future<T> unorderedReduce(It first, It last, T initial, F func);
/// Sugar for the most common case
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment