Commit ea91c9bc authored by Aaryaman Sagar's avatar Aaryaman Sagar Committed by Facebook GitHub Bot

Have collect() handle the case of a not-ready future

Summary:
If one of the input futures is off the end of a folly::Executor::weakRef()
executor, then there is a chance that it may never complete with a value or an
ecception.  In this case, collect() would crash because it assumes that the
folly::Try instances for all input futures have either a value or an exception.

Fix that case by injecting a BrokenPromise exception for the case where a future
never has an exception or a value.

Reviewed By: yfeldblum

Differential Revision: D26989091

fbshipit-source-id: b810fe4d5d071233da1f453b3759991e057d78c6
parent 74f3c043
......@@ -25,12 +25,14 @@
#include <folly/Optional.h>
#include <folly/Traits.h>
#include <folly/container/Foreach.h>
#include <folly/detail/AsyncTrace.h>
#include <folly/executors/ExecutorWithPriority.h>
#include <folly/executors/GlobalExecutor.h>
#include <folly/executors/InlineExecutor.h>
#include <folly/executors/QueuedImmediateExecutor.h>
#include <folly/futures/detail/Core.h>
#include <folly/lang/Pretty.h>
namespace folly {
......@@ -1451,11 +1453,18 @@ collect(InputIterator first, InputIterator last) {
~Context() {
if (!threw.load(std::memory_order_relaxed)) {
// map Optional<T> -> T
std::transform(
result.begin(),
result.end(),
std::back_inserter(finalResult),
[](Optional<T>& o) { return std::move(o.value()); });
for (auto& value : result) {
// if any of the input futures were off the end of a weakRef(), the
// logic added in setCallback_ will not execute as an executor
// weakRef() drops all callbacks added silently without executing them
if (!value.has_value()) {
p.setException(BrokenPromise{pretty_name<std::vector<T>>()});
return;
}
finalResult.push_back(std::move(value.value()));
}
p.setValue(std::move(finalResult));
}
}
......@@ -1509,7 +1518,21 @@ SemiFuture<std::tuple<typename remove_cvref_t<Fs>::value_type...>> collect(
struct Context {
~Context() {
if (!threw.load(std::memory_order_relaxed)) {
p.setValue(unwrapTryTuple(std::move(results)));
// if any of the input futures were off the end of a weakRef(), the
// logic added in setCallback_ will not execute as an executor
// weakRef() drops all callbacks added silently without executing them
auto brokenPromise = false;
folly::for_each(results, [&](auto& result) {
if (!result.hasValue() && !result.hasException()) {
brokenPromise = true;
}
});
if (brokenPromise) {
p.setException(BrokenPromise{pretty_name<Result>()});
} else {
p.setValue(unwrapTryTuple(std::move(results)));
}
}
}
Promise<Result> p;
......
......@@ -18,11 +18,14 @@
#include <boost/thread/barrier.hpp>
#include <folly/DefaultKeepAliveExecutor.h>
#include <folly/Random.h>
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/executors/ManualExecutor.h>
#include <folly/futures/Future.h>
#include <folly/portability/GTest.h>
#include <folly/small_vector.h>
#include <folly/synchronization/Baton.h>
using namespace folly;
......@@ -988,3 +991,54 @@ TEST(Collect, noDefaultConstructor) {
auto f = collect(std::move(f1), std::move(f2));
}
TEST(Collect, CollectVariadicWithDestroyedWeakRef) {
auto one = std::make_unique<folly::CPUThreadPoolExecutor>(1);
auto two = std::make_unique<folly::CPUThreadPoolExecutor>(1);
auto reachedFirstCallback = folly::Baton<>{};
auto hasExecutorBeenDestroyed = folly::Baton<>{};
auto future = folly::collect(
folly::makeSemiFuture(),
folly::makeSemiFuture()
.via(one.get())
.thenValue([&](auto) {
reachedFirstCallback.post();
hasExecutorBeenDestroyed.wait();
})
.via(two->weakRef())
.thenValue([](auto) {}),
folly::makeSemiFuture());
reachedFirstCallback.wait();
two.reset();
hasExecutorBeenDestroyed.post();
EXPECT_THROW(std::move(future).get(), folly::BrokenPromise);
}
TEST(Collect, CollectRangeWithDestroyedWeakRef) {
auto one = std::make_unique<folly::CPUThreadPoolExecutor>(1);
auto two = std::make_unique<folly::CPUThreadPoolExecutor>(1);
auto reachedFirstCallback = folly::Baton<>{};
auto hasExecutorBeenDestroyed = folly::Baton<>{};
auto futures = std::vector<folly::SemiFuture<folly::Unit>>{};
futures.push_back(folly::makeSemiFuture());
futures.push_back(folly::makeSemiFuture()
.via(one.get())
.thenValue([&](auto) {
reachedFirstCallback.post();
hasExecutorBeenDestroyed.wait();
})
.via(two->weakRef())
.thenValue([](auto) {}));
futures.push_back(folly::makeSemiFuture());
auto future = folly::collect(futures.begin(), futures.end());
reachedFirstCallback.wait();
two.reset();
hasExecutorBeenDestroyed.post();
EXPECT_THROW(std::move(future).get(), folly::BrokenPromise);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment