Commit 9103c1fe authored by Lee Howes's avatar Lee Howes Committed by facebook-github-bot-4

Adding addTaskFuture and addTaskRemoteFuture to FiberManager.

Summary: Adds functionality to fibermanager to add tasks locally and remotely that will return futures. As a side effect, also wraps the function in addTaskRemote in a move wrapper so that it behaves correctly with move-only types such as promises.

Reviewed By: andriigrynenko, yfeldblum

Differential Revision: D2892898

fb-gh-sync-id: 1666c103db35d9c149c36f8b8c3058cd6e0465fc
parent fc353ba6
......@@ -29,6 +29,7 @@
#include <folly/experimental/fibers/Fiber.h>
#include <folly/experimental/fibers/LoopController.h>
#include <folly/experimental/fibers/Promise.h>
#include <folly/futures/Promise.h>
#include <folly/futures/Try.h>
namespace folly { namespace fibers {
......@@ -258,24 +259,57 @@ void FiberManager::addTask(F&& func) {
ensureLoopScheduled();
}
template <typename F>
auto FiberManager::addTaskFuture(F&& func)
-> folly::Future<typename std::result_of<F()>::type> {
using T = typename std::result_of<F()>::type;
folly::Promise<T> p;
auto f = p.getFuture();
addTaskFinally([func = std::forward<F>(func)]() mutable { return func(); },
[p = std::move(p)](folly::Try<T> && t) mutable {
p.setTry(std::move(t));
});
return f;
}
template <typename F>
void FiberManager::addTaskRemote(F&& func) {
// addTaskRemote indirectly requires wrapping the function in a
// std::function, which must be copyable. As move-only lambdas may be
// passed in we wrap it first in a move wrapper and then capture the wrapped
// version.
auto functionWrapper = [f = folly::makeMoveWrapper(
std::forward<F>(func))]() mutable {
return (*f)();
};
auto task = [&]() {
auto currentFm = getFiberManagerUnsafe();
if (currentFm &&
currentFm->currentFiber_ &&
currentFm->localType_ == localType_) {
return folly::make_unique<RemoteTask>(
std::forward<F>(func),
currentFm->currentFiber_->localData_);
std::move(functionWrapper), currentFm->currentFiber_->localData_);
}
return folly::make_unique<RemoteTask>(std::forward<F>(func));
return folly::make_unique<RemoteTask>(std::move(functionWrapper));
}();
auto insertHead =
[&]() { return remoteTaskQueue_.insertHead(task.release()); };
loopController_->scheduleThreadSafe(std::ref(insertHead));
}
template <typename F>
auto FiberManager::addTaskRemoteFuture(F&& func)
-> folly::Future<typename std::result_of<F()>::type> {
folly::Promise<typename std::result_of<F()>::type> p;
auto f = p.getFuture();
addTaskRemote(
[ p = std::move(p), func = std::forward<F>(func), this ]() mutable {
auto t = folly::makeTryWith(std::forward<F>(func));
runInMainContext([&]() { p.setTry(std::move(t)); });
});
return f;
}
template <typename X>
struct IsRvalueRefTry { static const bool value = false; };
template <typename T>
......
......@@ -20,6 +20,7 @@
#include <queue>
#include <thread>
#include <typeindex>
#include <type_traits>
#include <unordered_set>
#include <vector>
......@@ -37,7 +38,12 @@
#include <folly/experimental/fibers/TimeoutController.h>
#include <folly/experimental/fibers/traits.h>
namespace folly { namespace fibers {
namespace folly {
template <class T>
class Future;
namespace fibers {
class Baton;
class Fiber;
......@@ -176,6 +182,16 @@ class FiberManager : public ::folly::Executor {
template <typename F>
void addTask(F&& func);
/**
* Add a new task to be executed and return a future that will be set on
* return from func. Must be called from FiberManager's thread.
*
* @param func Task functor; must have a signature of `void func()`.
* The object will be destroyed once task execution is complete.
*/
template <typename F>
auto addTaskFuture(F&& func)
-> folly::Future<typename std::result_of<F()>::type>;
/**
* Add a new task to be executed. Safe to call from other threads.
*
......@@ -185,6 +201,17 @@ class FiberManager : public ::folly::Executor {
template <typename F>
void addTaskRemote(F&& func);
/**
* Add a new task to be executed and return a future that will be set on
* return from func. Safe to call from other threads.
*
* @param func Task function; must have a signature of `void func()`.
* The object will be destroyed once task execution is complete.
*/
template <typename F>
auto addTaskRemoteFuture(F&& func)
-> folly::Future<typename std::result_of<F()>::type>;
// Executor interface calls addTaskRemote
void add(std::function<void()> f) {
addTaskRemote(std::move(f));
......
......@@ -21,6 +21,7 @@
#include <folly/Benchmark.h>
#include <folly/Memory.h>
#include <folly/futures/Future.h>
#include <folly/experimental/fibers/AddTasks.h>
#include <folly/experimental/fibers/EventBaseLoopController.h>
......@@ -1528,6 +1529,23 @@ TEST(FiberManager, batonWaitTimeoutMany) {
evb.loopForever();
}
TEST(FiberManager, remoteFutureTest) {
FiberManager fiberManager(folly::make_unique<SimpleLoopController>());
auto& loopController =
dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
int testValue1 = 5;
int testValue2 = 7;
auto f1 = fiberManager.addTaskFuture([&]() { return testValue1; });
auto f2 = fiberManager.addTaskRemoteFuture([&]() { return testValue2; });
loopController.loop([&]() { loopController.stop(); });
auto v1 = f1.get();
auto v2 = f2.get();
EXPECT_EQ(v1, testValue1);
EXPECT_EQ(v2, testValue2);
}
static size_t sNumAwaits;
void runBenchmark(size_t numAwaits, size_t toSend) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment