Commit 37515540 authored by Kevin Doherty's avatar Kevin Doherty Committed by Facebook Github Bot

Avoid calling emplace_back from collectN future callbacks

Summary:
Currently, `collectN` attaches a callback to each input future. When the callback fires, it calls `emplace_back` on vector `ctx->v` with the result of the future. As the added unit test shows, this does not work if the futures are completed in parallel.

See https://fb.facebook.com/groups/474291069286180/permalink/1838209692894304/ for more discussion.

Reviewed By: yfeldblum

Differential Revision: D7372982

fbshipit-source-id: 875eff19172bdbfb7d4f687911f4fa22061e4ed8
parent 1d4b2e2e
......@@ -1367,21 +1367,46 @@ collectAnyWithoutException(InputIterator first, InputIterator last) {
// TODO(T26439406): Make return SemiFuture
template <class InputIterator>
Future<std::vector<std::pair<size_t, Try<typename
std::iterator_traits<InputIterator>::value_type::value_type>>>>
Future<std::vector<std::pair<
size_t,
Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>>
collectN(InputIterator first, InputIterator last, size_t n) {
typedef typename
std::iterator_traits<InputIterator>::value_type::value_type T;
typedef std::vector<std::pair<size_t, Try<T>>> V;
using T =
typename std::iterator_traits<InputIterator>::value_type::value_type;
using V = std::vector<Optional<Try<T>>>;
using Result = std::vector<std::pair<size_t, Try<T>>>;
assert(n > 0);
assert(std::distance(first, last) >= 0);
struct CollectNContext {
explicit CollectNContext(size_t numFutures) : v(numFutures) {}
V v;
std::atomic<size_t> completed = {0};
Promise<V> p;
Promise<Result> p;
inline void setPartialResult(size_t index, Try<T>&& t) {
v[index] = std::move(t);
}
inline void complete() {
Result result;
result.reserve(completed.load());
for (size_t i = 0; i < v.size(); ++i) {
auto& entry = v[i];
if (entry.hasValue()) {
result.emplace_back(i, std::move(entry).value());
}
}
p.setTry(Try<Result>(std::move(result)));
}
};
auto ctx = std::make_shared<CollectNContext>();
if (size_t(std::distance(first, last)) < n) {
auto numFutures = static_cast<size_t>(std::distance(first, last));
auto ctx = std::make_shared<CollectNContext>(numFutures);
if (numFutures < n) {
ctx->p.setException(std::runtime_error("Not enough futures"));
} else {
// for each completed Future, increase count and add to vector, until we
......@@ -1390,10 +1415,9 @@ collectN(InputIterator first, InputIterator last, size_t n) {
mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
auto c = ++ctx->completed;
if (c <= n) {
assert(ctx->v.size() < n);
ctx->v.emplace_back(i, std::move(t));
ctx->setPartialResult(i, std::move(t));
if (c == n) {
ctx->p.setTry(Try<V>(std::move(ctx->v)));
ctx->complete();
}
}
});
......
......@@ -593,6 +593,42 @@ TEST(Collect, collectN) {
EXPECT_TRUE(flag);
}
TEST(Collect, collectNParallel) {
std::vector<Promise<Unit>> ps(100);
std::vector<Future<Unit>> futures;
for (auto& p : ps) {
futures.push_back(p.getFuture());
}
bool flag = false;
size_t n = 90;
collectN(futures, n).then([&](std::vector<std::pair<size_t, Try<Unit>>> v) {
flag = true;
EXPECT_EQ(n, v.size());
for (auto& tt : v) {
EXPECT_TRUE(tt.second.hasValue());
}
});
std::vector<std::thread> ts;
boost::barrier barrier(ps.size() + 1);
for (size_t i = 0; i < ps.size(); i++) {
ts.emplace_back([&ps, &barrier, i]() {
barrier.wait();
ps[i].setValue();
});
}
barrier.wait();
for (size_t i = 0; i < ps.size(); i++) {
ts[i].join();
}
EXPECT_TRUE(flag);
}
/// Ensure that we can compile collectAll/Any with folly::small_vector
TEST(Collect, smallVector) {
static_assert(!FOLLY_IS_TRIVIALLY_COPYABLE(Future<Unit>),
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment