Commit 8e5c2b92 authored by Adam Norton's avatar Adam Norton Committed by Facebook Github Bot

Avoid executor re-queue in collect[All]SemiFuture

Summary:
This is a performance optimization to avoid an executor re-queue for the collect[All] continuations which just move the result.

The biggest problem with this now is that these futures are often on a thread pool backed with a blocking MPMCQueue which means the add can potentially deadlock. This diff is not meant as a true fix to the problem though; just an improvement.

Reviewed By: yfeldblum

Differential Revision: D14754908

fbshipit-source-id: e533bfa6b9d2d820d50422b612c1489b560ec9d5
parent 4fa5101b
...@@ -1548,7 +1548,8 @@ collectAllSemiFuture(Fs&&... fs) { ...@@ -1548,7 +1548,8 @@ collectAllSemiFuture(Fs&&... fs) {
auto ctx = std::make_shared<Context>(); auto ctx = std::make_shared<Context>();
futures::detail::foreach( futures::detail::foreach(
[&](auto i, auto&& f) { [&](auto i, auto&& f) {
f.setCallback_([i, ctx](auto&& t) { auto g = std::move(f).via(&InlineExecutor::instance());
g.setCallback_([i, ctx](auto&& t) {
std::get<i.value>(ctx->results) = std::move(t); std::get<i.value>(ctx->results) = std::move(t);
}); });
}, },
...@@ -1597,8 +1598,8 @@ collectAllSemiFuture(InputIterator first, InputIterator last) { ...@@ -1597,8 +1598,8 @@ collectAllSemiFuture(InputIterator first, InputIterator last) {
auto ctx = std::make_shared<Context>(size_t(std::distance(first, last))); auto ctx = std::make_shared<Context>(size_t(std::distance(first, last)));
for (size_t i = 0; first != last; ++first, ++i) { for (size_t i = 0; first != last; ++first, ++i) {
first->setCallback_( auto f = std::move(*first).via(&InlineExecutor::instance());
[i, ctx](Try<T>&& t) { ctx->results[i] = std::move(t); }); f.setCallback_([i, ctx](Try<T>&& t) { ctx->results[i] = std::move(t); });
} }
auto future = ctx->p.getSemiFuture(); auto future = ctx->p.getSemiFuture();
...@@ -1657,7 +1658,8 @@ collectSemiFuture(InputIterator first, InputIterator last) { ...@@ -1657,7 +1658,8 @@ collectSemiFuture(InputIterator first, InputIterator last) {
auto ctx = std::make_shared<Context>(std::distance(first, last)); auto ctx = std::make_shared<Context>(std::distance(first, last));
for (size_t i = 0; first != last; ++first, ++i) { for (size_t i = 0; first != last; ++first, ++i) {
first->setCallback_([i, ctx](Try<T>&& t) { auto f = std::move(*first).via(&InlineExecutor::instance());
f.setCallback_([i, ctx](Try<T>&& t) {
if (t.hasException()) { if (t.hasException()) {
if (!ctx->threw.exchange(true, std::memory_order_relaxed)) { if (!ctx->threw.exchange(true, std::memory_order_relaxed)) {
ctx->p.setException(std::move(t.exception())); ctx->p.setException(std::move(t.exception()));
...@@ -1712,7 +1714,8 @@ collectSemiFuture(Fs&&... fs) { ...@@ -1712,7 +1714,8 @@ collectSemiFuture(Fs&&... fs) {
auto ctx = std::make_shared<Context>(); auto ctx = std::make_shared<Context>();
futures::detail::foreach( futures::detail::foreach(
[&](auto i, auto&& f) { [&](auto i, auto&& f) {
f.setCallback_([i, ctx](auto&& t) { auto g = std::move(f).via(&InlineExecutor::instance());
g.setCallback_([i, ctx](auto&& t) {
if (t.hasException()) { if (t.hasException()) {
if (!ctx->threw.exchange(true, std::memory_order_relaxed)) { if (!ctx->threw.exchange(true, std::memory_order_relaxed)) {
ctx->p.setException(std::move(t.exception())); ctx->p.setException(std::move(t.exception()));
......
...@@ -30,6 +30,13 @@ static eggs_t eggs("eggs"); ...@@ -30,6 +30,13 @@ static eggs_t eggs("eggs");
auto rng = std::mt19937(folly::randomNumberSeed()); auto rng = std::mt19937(folly::randomNumberSeed());
class NoAddExecutor : public folly::Executor {
public:
void add(Func) override {
throw eggs;
}
};
TEST(Collect, collectAll) { TEST(Collect, collectAll) {
// returns a vector variant // returns a vector variant
{ {
...@@ -779,6 +786,43 @@ TEST(Collect, collectAllNone) { ...@@ -779,6 +786,43 @@ TEST(Collect, collectAllNone) {
EXPECT_TRUE(f.isReady()); EXPECT_TRUE(f.isReady());
} }
TEST(Collect, collectNoRequeue) {
NoAddExecutor executor;
std::vector<Future<int>> futures;
for (int i = 0; i < 10; ++i) {
futures.emplace_back(makeFuture(i).via(&executor));
}
EXPECT_NO_THROW(collect(futures).get());
}
TEST(Collect, collectVariadicNoRequeue) {
NoAddExecutor executor;
auto f1 = makeFuture(0).via(&executor);
auto f2 = makeFuture(1).via(&executor);
EXPECT_NO_THROW(collect(f1, f2).get());
}
TEST(Collect, collectAllNoRequeue) {
NoAddExecutor executor;
std::vector<Future<int>> futures;
for (int i = 0; i < 10; ++i) {
futures.emplace_back(makeFuture(i).via(&executor));
}
auto results = collectAll(futures).get();
for (auto& t : results) {
EXPECT_FALSE(t.hasException());
}
}
TEST(Collect, collectAllVariadicNoRequeue) {
NoAddExecutor executor;
auto f1 = makeFuture(0).via(&executor);
auto f2 = makeFuture(1).via(&executor);
auto results = collectAll(f1, f2).get();
EXPECT_FALSE(std::get<0>(results).hasException());
EXPECT_FALSE(std::get<1>(results).hasException());
}
TEST(Collect, noDefaultConstructor) { TEST(Collect, noDefaultConstructor) {
struct A { struct A {
explicit A(size_t /* x */) {} explicit A(size_t /* x */) {}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment