Commit f51b0466 authored by James Sedgwick's avatar James Sedgwick Committed by Viswanath Sivakumar

RFC: FutureDAG

Summary:
See task. Set up a DAG of Future-returning tasks (optionally with executors) and eventually kick them off.
One big question is ownership. Currently the user would be responsible for ensuring that the FutureDAG outlives its own completion. This requirement could go away with shared_from_this magic maybe

Test Plan: unit. I didn't bother to test via() functionality because it's too much work for now - the functionality is trivial. Same for "true-async" dags...

Reviewed By: hans@fb.com

Subscribers: folly-diffs@, jsedgwick, yfeldblum, chalfant

FB internal diff: D2073481

Signature: t1:2073481:1431961131:82a8898502d5308f6ab3cc8cc5b84b016d3998fe
parent 4ecf6e3f
......@@ -101,6 +101,7 @@ nobase_follyinclude_HEADERS = \
experimental/fibers/WhenN.h \
experimental/fibers/WhenN-inl.h \
experimental/FunctionScheduler.h \
experimental/FutureDAG.h \
experimental/io/FsUtil.h \
experimental/JSONSchema.h \
experimental/Select64.h \
......
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/futures/Future.h>
#include <folly/futures/SharedPromise.h>
namespace folly {
class FutureDAG : public std::enable_shared_from_this<FutureDAG> {
public:
static std::shared_ptr<FutureDAG> create() {
return std::shared_ptr<FutureDAG>(new FutureDAG());
}
typedef size_t Handle;
typedef std::function<Future<void>()> FutureFunc;
Handle add(FutureFunc func, Executor* executor = nullptr) {
nodes.emplace_back(std::move(func), executor);
return nodes.size() - 1;
}
void dependency(Handle a, Handle b) {
nodes[b].dependencies.push_back(a);
nodes[a].hasDependents = true;
}
Future<void> go() {
if (hasCycle()) {
return makeFuture<void>(std::runtime_error("Cycle in FutureDAG graph"));
}
std::vector<Handle> rootNodes;
std::vector<Handle> leafNodes;
for (Handle handle = 0; handle < nodes.size(); handle++) {
if (nodes[handle].dependencies.empty()) {
rootNodes.push_back(handle);
}
if (!nodes[handle].hasDependents) {
leafNodes.push_back(handle);
}
}
auto sinkHandle = add([] { return Future<void>(); });
for (auto handle : leafNodes) {
dependency(handle, sinkHandle);
}
auto sourceHandle = add(nullptr);
for (auto handle : rootNodes) {
dependency(sourceHandle, handle);
}
for (Handle handle = 0; handle < nodes.size() - 1; handle++) {
std::vector<Future<void>> dependencies;
for (auto depHandle : nodes[handle].dependencies) {
dependencies.push_back(nodes[depHandle].promise.getFuture());
}
collect(dependencies)
.via(nodes[handle].executor)
.then([this, handle] {
nodes[handle].func()
.then([this, handle] (Try<void>&& t) {
nodes[handle].promise.setTry(std::move(t));
});
})
.onError([this, handle] (exception_wrapper ew) {
nodes[handle].promise.setException(std::move(ew));
});
}
nodes[sourceHandle].promise.setValue();
auto that = shared_from_this();
return nodes[sinkHandle].promise.getFuture().ensure([that]{});
}
private:
FutureDAG() = default;
bool hasCycle() {
// Perform a modified topological sort to detect cycles
std::vector<std::vector<Handle>> dependencies;
for (auto& node : nodes) {
dependencies.push_back(node.dependencies);
}
std::vector<size_t> dependents(nodes.size());
for (auto& dependencyEdges : dependencies) {
for (auto handle : dependencyEdges) {
dependents[handle]++;
}
}
std::vector<Handle> handles;
for (Handle handle = 0; handle < nodes.size(); handle++) {
if (!nodes[handle].hasDependents) {
handles.push_back(handle);
}
}
while (!handles.empty()) {
auto handle = handles.back();
handles.pop_back();
while (!dependencies[handle].empty()) {
auto dependency = dependencies[handle].back();
dependencies[handle].pop_back();
if (--dependents[dependency] == 0) {
handles.push_back(dependency);
}
}
}
for (auto& dependencyEdges : dependencies) {
if (!dependencyEdges.empty()) {
return true;
}
}
return false;
}
struct Node {
Node(FutureFunc&& funcArg, Executor* executorArg) :
func(std::move(funcArg)), executor(executorArg) {}
FutureFunc func{nullptr};
Executor* executor{nullptr};
SharedPromise<void> promise;
std::vector<Handle> dependencies;
bool hasDependents{false};
bool visited{false};
};
std::vector<Node> nodes;
};
} // folly
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/experimental/FutureDAG.h>
#include <gtest/gtest.h>
#include <boost/thread/barrier.hpp>
using namespace folly;
struct FutureDAGTest : public testing::Test {
typedef FutureDAG::Handle Handle;
Handle add() {
auto node = folly::make_unique<TestNode>(this);
auto handle = node->handle;
nodes.emplace(handle, std::move(node));
return handle;
}
void dependency(Handle a, Handle b) {
nodes.at(b)->dependencies.push_back(a);
dag->dependency(a, b);
}
void checkOrder() {
EXPECT_EQ(nodes.size(), order.size());
for (auto& kv : nodes) {
auto handle = kv.first;
auto& node = kv.second;
auto it = order.begin();
while (*it != handle) {
it++;
}
for (auto dep : node->dependencies) {
EXPECT_TRUE(std::find(it, order.end(), dep) == order.end());
}
}
}
struct TestNode {
explicit TestNode(FutureDAGTest* test) {
func = [this, test] {
test->order.push_back(handle);
return Future<void>();
};
handle = test->dag->add(func);
}
FutureDAG::FutureFunc func;
Handle handle;
std::vector<Handle> dependencies;
};
std::shared_ptr<FutureDAG> dag = FutureDAG::create();
std::map<Handle, std::unique_ptr<TestNode>> nodes;
std::vector<Handle> order;
};
TEST_F(FutureDAGTest, SingleNode) {
add();
ASSERT_NO_THROW(dag->go().get());
checkOrder();
}
TEST_F(FutureDAGTest, FanOut) {
auto h1 = add();
auto h2 = add();
auto h3 = add();
dependency(h1, h2);
dependency(h1, h3);
ASSERT_NO_THROW(dag->go().get());
checkOrder();
}
TEST_F(FutureDAGTest, FanIn) {
auto h1 = add();
auto h2 = add();
auto h3 = add();
dependency(h1, h3);
dependency(h2, h3);
ASSERT_NO_THROW(dag->go().get());
checkOrder();
}
TEST_F(FutureDAGTest, FanOutFanIn) {
auto h1 = add();
auto h2 = add();
auto h3 = add();
auto h4 = add();
dependency(h1, h3);
dependency(h1, h2);
dependency(h2, h4);
dependency(h3, h4);
ASSERT_NO_THROW(dag->go().get());
checkOrder();
}
TEST_F(FutureDAGTest, Complex) {
auto A = add();
auto B = add();
auto C = add();
auto D = add();
auto E = add();
auto F = add();
auto G = add();
auto H = add();
auto I = add();
auto J = add();
auto K = add();
auto L = add();
auto M = add();
auto N = add();
dependency(A, B);
dependency(A, C);
dependency(A, D);
dependency(A, J);
dependency(C, H);
dependency(D, E);
dependency(E, F);
dependency(E, G);
dependency(F, H);
dependency(G, H);
dependency(H, I);
dependency(J, K);
dependency(K, L);
dependency(K, M);
dependency(L, N);
dependency(I, N);
ASSERT_NO_THROW(dag->go().get());
checkOrder();
}
FutureDAG::FutureFunc makeFutureFunc = []{
return makeFuture();
};
FutureDAG::FutureFunc throwFunc = []{
return makeFuture<void>(std::runtime_error("oops"));
};
TEST_F(FutureDAGTest, ThrowBegin) {
auto h1 = dag->add(throwFunc);
auto h2 = dag->add(makeFutureFunc);
dag->dependency(h1, h2);
EXPECT_THROW(dag->go().get(), std::runtime_error);
}
TEST_F(FutureDAGTest, ThrowEnd) {
auto h1 = dag->add(makeFutureFunc);
auto h2 = dag->add(throwFunc);
dag->dependency(h1, h2);
EXPECT_THROW(dag->go().get(), std::runtime_error);
}
TEST_F(FutureDAGTest, Cycle1) {
auto h1 = add();
dependency(h1, h1);
EXPECT_THROW(dag->go().get(), std::runtime_error);
}
TEST_F(FutureDAGTest, Cycle2) {
auto h1 = add();
auto h2 = add();
dependency(h1, h2);
dependency(h2, h1);
EXPECT_THROW(dag->go().get(), std::runtime_error);
}
TEST_F(FutureDAGTest, Cycle3) {
auto h1 = add();
auto h2 = add();
auto h3 = add();
dependency(h1, h2);
dependency(h2, h3);
dependency(h3, h1);
EXPECT_THROW(dag->go().get(), std::runtime_error);
}
TEST_F(FutureDAGTest, DestroyBeforeComplete) {
auto barrier = std::make_shared<boost::barrier>(2);
Future<void> f;
{
auto dag = FutureDAG::create();
auto h1 = dag->add([barrier] {
auto p = std::make_shared<Promise<void>>();
std::thread t([p, barrier]{
barrier->wait();
p->setValue();
});
t.detach();
return p->getFuture();
});
auto h2 = dag->add(makeFutureFunc);
dag->dependency(h1, h2);
f = dag->go();
}
barrier->wait();
ASSERT_NO_THROW(f.get());
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment