Commit 44ecd1ef authored by Marc Celani's avatar Marc Celani Committed by Dave Watson

Make it easy to wrap pre-existing cob-style async apis

Summary:
Tao neesd a way to wrap its c-style async apis with Later. Although my comments suggest that you can do this, it turns out I never got around to implementing it. Well, here is the implementation.

Basically, we supply the callback to the pre-existing api, and that callback will fulfill a promise that is used internally within Later. This is thread safe because the async call is not made until the starter is fired, and we can use the future immediately for chaining then() calls.

Test Plan: unit test

Reviewed By: hannesr@fb.com

FB internal diff: D1197721
parent 649cb97d
......@@ -68,6 +68,18 @@ Later<T>::Later(U&& input) {
});
}
template <class T>
template <class U, class Unused, class Unused2>
Later<T>::Later(std::function<void(std::function<void(U&&)>&&)>&& fn) {
folly::MoveWrapper<Promise<U>> promise;
future_ = promise->getFuture();
starter_.getFuture().then([=](Try<void>&& t) mutable {
fn([=](U&& output) mutable {
promise->setValue(std::move(output));
});
});
}
template <class T>
template <class F>
typename std::enable_if<
......
......@@ -32,7 +32,8 @@ template <typename T> struct isLater;
* threadsafe manner.
*
* The interface to add additional work is the same as future: a then() method
* that can take either a type T, a Future<T>, or a Later<T>
* that takes a function that can return either a type T, a Future<T>, or a
* Later<T>
*
* Thread transitions are done by using executors and calling the via() method.
*
......@@ -62,16 +63,44 @@ class Later {
public:
typedef T value_type;
/*
* This default constructor is used to build an asynchronous workflow that
* takes no input.
*/
template <class U = void,
class = typename std::enable_if<std::is_void<U>::value>::type,
class = typename std::enable_if<std::is_same<T, U>::value>::type>
Later();
/*
* This constructor is used to build an asynchronous workflow that takes a
* value as input, and that value is passed in.
*/
template <class U,
class = typename std::enable_if<!std::is_void<U>::value>::type,
class = typename std::enable_if<std::is_same<T, U>::value>::type>
explicit Later(U&& input);
/*
* This constructor is used to wrap a pre-existing cob-style asynchronous api
* so that it can be used in wangle in a threadsafe manner. wangle provides
* the callback to this pre-existing api, and this callback will fulfill a
* promise so as to incorporate this api into the workflow.
*
* Example usage:
*
* // This adds two ints asynchronously. cob is called in another thread.
* void addAsync(int a, int b, std::function<void(int&&)>&& cob);
*
* Later<int> asyncWrapper([=](std::function<void(int&&)>&& fn) {
* addAsync(1, 2, std::move(fn));
* });
*/
template <class U,
class = typename std::enable_if<!std::is_void<U>::value>::type,
class = typename std::enable_if<std::is_same<T, U>::value>::type>
explicit Later(std::function<void(std::function<void(U&&)>&&)>&& fn);
/*
* then() adds additional work to the end of the workflow. If the lambda
* provided to then() returns a future, that future must be fulfilled in the
......
......@@ -54,6 +54,12 @@ struct LaterFixture : public testing::Test {
t.join();
}
void addAsync(int a, int b, std::function<void(int&&)>&& cob) {
eastExecutor->add([=]() {
cob(a + b);
});
}
Later<void> later;
std::shared_ptr<ManualExecutor> westExecutor;
std::shared_ptr<ManualExecutor> eastExecutor;
......@@ -112,6 +118,25 @@ TEST_F(LaterFixture, thread_hops) {
EXPECT_EQ(future.value(), 1);
}
TEST_F(LaterFixture, wrapping_preexisting_async_modules) {
auto westThreadId = std::this_thread::get_id();
std::function<void(std::function<void(int&&)>&&)> wrapper =
[=](std::function<void(int&&)>&& fn) {
addAsync(2, 2, std::move(fn));
};
auto future = Later<int>(std::move(wrapper))
.via(westExecutor.get())
.then([=](Try<int>&& t) {
EXPECT_EQ(std::this_thread::get_id(), westThreadId);
return t.value();
})
.launch();
while (!future.isReady()) {
waiter->makeProgress();
}
EXPECT_EQ(future.value(), 4);
}
TEST_F(LaterFixture, chain_laters) {
auto westThreadId = std::this_thread::get_id();
auto future = later.via(eastExecutor.get()).then([=](Try<void>&& t) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment