Commit 7e6a92c1 authored by Matthieu Martin's avatar Matthieu Martin Committed by Facebook GitHub Bot

Async interface for addTaskRemoteFuture

Summary:
This provides a set of three functions under folly::fibers::async, to schedule then wait for work on a remote fiber manager.
The functions cover those 3 (legitimate) needs:

- Blocking current thread: `T executeOnRemoteFiberAndWait`
- Blocking current fiber: `Async<T> executeOnRemoteFiber`
- Awaitable (barebone version for now): `Future<T> addFiberRemoteFuture`

See code docblock for more details

Reviewed By: pranavtbhat

Differential Revision: D22252598

fbshipit-source-id: add9c23f5bdacb233e47f30b2639d376027167e3
parent 5b8e0513
...@@ -96,6 +96,16 @@ auto executeOnNewFiber(F&& func) { ...@@ -96,6 +96,16 @@ auto executeOnNewFiber(F&& func) {
addFiberFuture(std::forward<F>(func), FiberManager::getFiberManager())); addFiberFuture(std::forward<F>(func), FiberManager::getFiberManager()));
} }
/*
* Run an async-annotated functor on a new fiber on remote thread,
* blocking the current fiber.
*/
template <typename F>
auto executeOnRemoteFiber(F&& func, FiberManager& fm) {
DCHECK(detail::onFiber());
return futureWait(addFiberRemoteFuture(std::forward<F>(func), fm));
}
} // namespace async } // namespace async
} // namespace fibers } // namespace fibers
} // namespace folly } // namespace folly
......
...@@ -62,6 +62,21 @@ auto addFiberFuture(F&& func, FiberManager& fm) { ...@@ -62,6 +62,21 @@ auto addFiberFuture(F&& func, FiberManager& fm) {
[func = std::forward<F>(func)]() mutable { return init_await(func()); }); [func = std::forward<F>(func)]() mutable { return init_await(func()); });
} }
/**
* Schedule an async-annotated functor to run on a remote thread's
* fiber manager.
* Returns a future for the result.
*
* In most cases, prefer those options instead:
* - executeOnRemoteFiber: wait on remote fiber from (local) fiber context
* - executeOnRemoteFiberAndWait: wait on remote fiber from (local) main context
*/
template <typename F>
auto addFiberRemoteFuture(F&& func, FiberManager& fm) {
return fm.addTaskRemoteFuture(
[func = std::forward<F>(func)]() mutable { return init_await(func()); });
}
} // namespace async } // namespace async
} // namespace fibers } // namespace fibers
} // namespace folly } // namespace folly
...@@ -76,6 +76,22 @@ auto executeOnFiberAndWait( ...@@ -76,6 +76,22 @@ auto executeOnFiberAndWait(
std::forward<F>(func), evb, getFiberManager(evb, opts)); std::forward<F>(func), evb, getFiberManager(evb, opts));
} }
/**
* Run an async-annotated functor `func`, to completion on a remote
* fiber manager, blocking the current thread.
*
* Should not be called from fiber context.
*
* This is similar to the above functions (sugar to initialize
* Async annotated fiber context) but handle the case where the
* library uses a dedicated thread pool to run fibers.
*/
template <typename F>
auto executeOnRemoteFiberAndWait(F&& func, FiberManager& fm) {
DCHECK(!detail::onFiber());
return addFiberRemoteFuture(std::forward<F>(func), fm).get();
}
} // namespace async } // namespace async
} // namespace fibers } // namespace fibers
} // namespace folly } // namespace folly
...@@ -345,3 +345,39 @@ TEST(AsyncTest, collect) { ...@@ -345,3 +345,39 @@ TEST(AsyncTest, collect) {
return {}; return {};
}); });
} }
TEST(FiberManager, remoteFiberManager) {
folly::EventBase evb;
auto& fm = getFiberManager(evb);
bool test1Finished = false;
bool test2Finished = false;
std::atomic<bool> remoteFinished = false;
std::thread remote([&]() {
async::executeOnRemoteFiberAndWait(
[&]() -> async::Async<void> {
test1Finished = true;
return {};
},
fm);
async::executeOnFiberAndWait([&] {
return async::executeOnRemoteFiber(
[&]() -> async::Async<void> {
test2Finished = true;
return {};
},
fm);
});
remoteFinished = true;
});
while (!remoteFinished) {
evb.loopOnce();
}
remote.join();
EXPECT_TRUE(test1Finished);
EXPECT_TRUE(test2Finished);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment