Commit c93226d1 authored by Andrii Grynenko's avatar Andrii Grynenko Committed by Facebook Github Bot 1

Resolve fibers-futures dependency

Reviewed By: mzlee

Differential Revision: D3780312

fbshipit-source-id: c42c8f0a06b82520ee1b46f105a2a85ad524c442
parent 2dedc14b
......@@ -514,6 +514,8 @@ nobase_follyinclude_HEADERS += \
fibers/Fiber-inl.h \
fibers/FiberManager.h \
fibers/FiberManager-inl.h \
fibers/FiberManagerInternal.h \
fibers/FiberManagerInternal-inl.h \
fibers/FiberManagerMap.h \
fibers/ForEach.h \
fibers/ForEach-inl.h \
......
......@@ -19,7 +19,7 @@
#include <vector>
#include <folly/Optional.h>
#include <folly/fibers/FiberManager.h>
#include <folly/fibers/FiberManagerInternal.h>
#include <folly/fibers/Promise.h>
#include <folly/Try.h>
......
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
#include <folly/fibers/Fiber.h>
#include <folly/fibers/FiberManager.h>
#include <folly/fibers/FiberManagerInternal.h>
namespace folly {
namespace fibers {
......
......@@ -18,7 +18,7 @@
#include <chrono>
#include <folly/detail/MemoryIdler.h>
#include <folly/fibers/FiberManager.h>
#include <folly/fibers/FiberManagerInternal.h>
#include <folly/portability/Asm.h>
namespace folly {
......
......@@ -15,7 +15,7 @@
*/
#pragma once
#include <folly/fibers/FiberManager.h>
#include <folly/fibers/FiberManagerInternal.h>
#include <folly/fibers/LoopController.h>
#include <folly/io/async/EventBase.h>
#include <atomic>
......
......@@ -23,7 +23,7 @@
#include <folly/Likely.h>
#include <folly/Portability.h>
#include <folly/fibers/BoostContextCompatibility.h>
#include <folly/fibers/FiberManager.h>
#include <folly/fibers/FiberManagerInternal.h>
#include <folly/portability/SysSyscall.h>
#include <folly/portability/Unistd.h>
......
......@@ -15,277 +15,11 @@
*/
#pragma once
#include <cassert>
#include <folly/CPortability.h>
#include <folly/Memory.h>
#include <folly/Optional.h>
#include <folly/Portability.h>
#include <folly/ScopeGuard.h>
#ifdef __APPLE__
#include <folly/ThreadLocal.h>
#endif
#include <folly/fibers/Baton.h>
#include <folly/fibers/Fiber.h>
#include <folly/fibers/LoopController.h>
#include <folly/fibers/Promise.h>
#include <folly/futures/Promise.h>
#include <folly/Try.h>
namespace folly {
namespace fibers {
namespace {
inline FiberManager::Options preprocessOptions(FiberManager::Options opts) {
#ifdef FOLLY_SANITIZE_ADDRESS
/* ASAN needs a lot of extra stack space.
16x is a conservative estimate, 8x also worked with tests
where it mattered. Note that overallocating here does not necessarily
increase RSS, since unused memory is pretty much free. */
opts.stackSize *= 16;
#endif
return opts;
}
} // anonymous
inline void FiberManager::ensureLoopScheduled() {
if (isLoopScheduled_) {
return;
}
isLoopScheduled_ = true;
loopController_->schedule();
}
inline intptr_t FiberManager::activateFiber(Fiber* fiber) {
DCHECK_EQ(activeFiber_, (Fiber*)nullptr);
#ifdef FOLLY_SANITIZE_ADDRESS
registerFiberActivationWithAsan(fiber);
#endif
activeFiber_ = fiber;
return jumpContext(&mainContext_, &fiber->fcontext_, fiber->data_);
}
inline intptr_t FiberManager::deactivateFiber(Fiber* fiber) {
DCHECK_EQ(activeFiber_, fiber);
#ifdef FOLLY_SANITIZE_ADDRESS
registerFiberDeactivationWithAsan(fiber);
#endif
activeFiber_ = nullptr;
return jumpContext(&fiber->fcontext_, &mainContext_, 0);
}
inline void FiberManager::runReadyFiber(Fiber* fiber) {
SCOPE_EXIT {
assert(currentFiber_ == nullptr);
assert(activeFiber_ == nullptr);
};
assert(
fiber->state_ == Fiber::NOT_STARTED ||
fiber->state_ == Fiber::READY_TO_RUN);
currentFiber_ = fiber;
fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
if (observer_) {
observer_->starting(reinterpret_cast<uintptr_t>(fiber));
}
while (fiber->state_ == Fiber::NOT_STARTED ||
fiber->state_ == Fiber::READY_TO_RUN) {
activateFiber(fiber);
if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) {
try {
immediateFunc_();
} catch (...) {
exceptionCallback_(std::current_exception(), "running immediateFunc_");
}
immediateFunc_ = nullptr;
fiber->state_ = Fiber::READY_TO_RUN;
}
}
if (fiber->state_ == Fiber::AWAITING) {
awaitFunc_(*fiber);
awaitFunc_ = nullptr;
if (observer_) {
observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
}
currentFiber_ = nullptr;
fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
} else if (fiber->state_ == Fiber::INVALID) {
assert(fibersActive_ > 0);
--fibersActive_;
// Making sure that task functor is deleted once task is complete.
// NOTE: we must do it on main context, as the fiber is not
// running at this point.
fiber->func_ = nullptr;
fiber->resultFunc_ = nullptr;
if (fiber->finallyFunc_) {
try {
fiber->finallyFunc_();
} catch (...) {
exceptionCallback_(std::current_exception(), "running finallyFunc_");
}
fiber->finallyFunc_ = nullptr;
}
// Make sure LocalData is not accessible from its destructor
if (observer_) {
observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
}
currentFiber_ = nullptr;
fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
fiber->localData_.reset();
fiber->rcontext_.reset();
if (fibersPoolSize_ < options_.maxFibersPoolSize ||
options_.fibersPoolResizePeriodMs > 0) {
fibersPool_.push_front(*fiber);
++fibersPoolSize_;
} else {
delete fiber;
assert(fibersAllocated_ > 0);
--fibersAllocated_;
}
} else if (fiber->state_ == Fiber::YIELDED) {
if (observer_) {
observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
}
currentFiber_ = nullptr;
fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
fiber->state_ = Fiber::READY_TO_RUN;
yieldedFibers_.push_back(*fiber);
}
}
inline bool FiberManager::loopUntilNoReady() {
#ifndef _WIN32
if (UNLIKELY(!alternateSignalStackRegistered_)) {
registerAlternateSignalStack();
}
#endif
// Support nested FiberManagers
auto originalFiberManager = this;
std::swap(currentFiberManager_, originalFiberManager);
SCOPE_EXIT {
isLoopScheduled_ = false;
if (!readyFibers_.empty()) {
ensureLoopScheduled();
}
std::swap(currentFiberManager_, originalFiberManager);
CHECK_EQ(this, originalFiberManager);
};
bool hadRemoteFiber = true;
while (hadRemoteFiber) {
hadRemoteFiber = false;
while (!readyFibers_.empty()) {
auto& fiber = readyFibers_.front();
readyFibers_.pop_front();
runReadyFiber(&fiber);
}
remoteReadyQueue_.sweep([this, &hadRemoteFiber](Fiber* fiber) {
runReadyFiber(fiber);
hadRemoteFiber = true;
});
remoteTaskQueue_.sweep([this, &hadRemoteFiber](RemoteTask* taskPtr) {
std::unique_ptr<RemoteTask> task(taskPtr);
auto fiber = getFiber();
if (task->localData) {
fiber->localData_ = *task->localData;
}
fiber->rcontext_ = std::move(task->rcontext);
fiber->setFunction(std::move(task->func));
fiber->data_ = reinterpret_cast<intptr_t>(fiber);
if (observer_) {
observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
}
runReadyFiber(fiber);
hadRemoteFiber = true;
});
}
if (observer_) {
for (auto& yielded : yieldedFibers_) {
observer_->runnable(reinterpret_cast<uintptr_t>(&yielded));
}
}
readyFibers_.splice(readyFibers_.end(), yieldedFibers_);
return fibersActive_ > 0;
}
// We need this to be in a struct, not inlined in addTask, because clang crashes
// otherwise.
template <typename F>
struct FiberManager::AddTaskHelper {
class Func;
static constexpr bool allocateInBuffer =
sizeof(Func) <= Fiber::kUserBufferSize;
class Func {
public:
Func(F&& func, FiberManager& fm) : func_(std::forward<F>(func)), fm_(fm) {}
void operator()() {
try {
func_();
} catch (...) {
fm_.exceptionCallback_(
std::current_exception(), "running Func functor");
}
if (allocateInBuffer) {
this->~Func();
} else {
delete this;
}
}
private:
F func_;
FiberManager& fm_;
};
};
template <typename F>
void FiberManager::addTask(F&& func) {
typedef AddTaskHelper<F> Helper;
auto fiber = getFiber();
initLocalData(*fiber);
if (Helper::allocateInBuffer) {
auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
new (funcLoc) typename Helper::Func(std::forward<F>(func), *this);
fiber->setFunction(std::ref(*funcLoc));
} else {
auto funcLoc = new typename Helper::Func(std::forward<F>(func), *this);
fiber->setFunction(std::ref(*funcLoc));
}
fiber->data_ = reinterpret_cast<intptr_t>(fiber);
readyFibers_.push_back(*fiber);
if (observer_) {
observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
}
ensureLoopScheduled();
}
template <typename F>
auto FiberManager::addTaskFuture(F&& func) -> folly::Future<
typename folly::Unit::Lift<typename std::result_of<F()>::type>::type> {
......@@ -302,23 +36,6 @@ auto FiberManager::addTaskFuture(F&& func) -> folly::Future<
return f;
}
template <typename F>
void FiberManager::addTaskRemote(F&& func) {
auto task = [&]() {
auto currentFm = getFiberManagerUnsafe();
if (currentFm && currentFm->currentFiber_ &&
currentFm->localType_ == localType_) {
return folly::make_unique<RemoteTask>(
std::forward<F>(func), currentFm->currentFiber_->localData_);
}
return folly::make_unique<RemoteTask>(std::forward<F>(func));
}();
auto insertHead = [&]() {
return remoteTaskQueue_.insertHead(task.release());
};
loopController_->scheduleThreadSafe(std::ref(insertHead));
}
template <typename F>
auto FiberManager::addTaskRemoteFuture(F&& func) -> folly::Future<
typename folly::Unit::Lift<typename std::result_of<F()>::type>::type> {
......@@ -333,223 +50,5 @@ auto FiberManager::addTaskRemoteFuture(F&& func) -> folly::Future<
});
return f;
}
template <typename X>
struct IsRvalueRefTry {
static const bool value = false;
};
template <typename T>
struct IsRvalueRefTry<folly::Try<T>&&> {
static const bool value = true;
};
// We need this to be in a struct, not inlined in addTaskFinally, because clang
// crashes otherwise.
template <typename F, typename G>
struct FiberManager::AddTaskFinallyHelper {
class Func;
typedef typename std::result_of<F()>::type Result;
class Finally {
public:
Finally(G finally, FiberManager& fm)
: finally_(std::move(finally)), fm_(fm) {}
void operator()() {
try {
finally_(std::move(*result_));
} catch (...) {
fm_.exceptionCallback_(
std::current_exception(), "running Finally functor");
}
if (allocateInBuffer) {
this->~Finally();
} else {
delete this;
}
}
private:
friend class Func;
G finally_;
folly::Optional<folly::Try<Result>> result_;
FiberManager& fm_;
};
class Func {
public:
Func(F func, Finally& finally)
: func_(std::move(func)), result_(finally.result_) {}
void operator()() {
result_ = folly::makeTryWith(std::move(func_));
if (allocateInBuffer) {
this->~Func();
} else {
delete this;
}
}
private:
F func_;
folly::Optional<folly::Try<Result>>& result_;
};
static constexpr bool allocateInBuffer =
sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize;
};
template <typename F, typename G>
void FiberManager::addTaskFinally(F&& func, G&& finally) {
typedef typename std::result_of<F()>::type Result;
static_assert(
IsRvalueRefTry<typename FirstArgOf<G>::type>::value,
"finally(arg): arg must be Try<T>&&");
static_assert(
std::is_convertible<
Result,
typename std::remove_reference<
typename FirstArgOf<G>::type>::type::element_type>::value,
"finally(Try<T>&&): T must be convertible from func()'s return type");
auto fiber = getFiber();
initLocalData(*fiber);
typedef AddTaskFinallyHelper<
typename std::decay<F>::type,
typename std::decay<G>::type>
Helper;
if (Helper::allocateInBuffer) {
auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
auto finallyLoc =
static_cast<typename Helper::Finally*>(static_cast<void*>(funcLoc + 1));
new (finallyLoc) typename Helper::Finally(std::forward<G>(finally), *this);
new (funcLoc) typename Helper::Func(std::forward<F>(func), *finallyLoc);
fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
} else {
auto finallyLoc =
new typename Helper::Finally(std::forward<G>(finally), *this);
auto funcLoc =
new typename Helper::Func(std::forward<F>(func), *finallyLoc);
fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
}
fiber->data_ = reinterpret_cast<intptr_t>(fiber);
readyFibers_.push_back(*fiber);
if (observer_) {
observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
}
ensureLoopScheduled();
}
template <typename F>
typename std::result_of<F()>::type FiberManager::runInMainContext(F&& func) {
if (UNLIKELY(activeFiber_ == nullptr)) {
return func();
}
typedef typename std::result_of<F()>::type Result;
folly::Try<Result> result;
auto f = [&func, &result]() mutable {
result = folly::makeTryWith(std::forward<F>(func));
};
immediateFunc_ = std::ref(f);
activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
return std::move(result).value();
}
inline FiberManager& FiberManager::getFiberManager() {
assert(currentFiberManager_ != nullptr);
return *currentFiberManager_;
}
inline FiberManager* FiberManager::getFiberManagerUnsafe() {
return currentFiberManager_;
}
inline bool FiberManager::hasActiveFiber() const {
return activeFiber_ != nullptr;
}
inline void FiberManager::yield() {
assert(currentFiberManager_ == this);
assert(activeFiber_ != nullptr);
assert(activeFiber_->state_ == Fiber::RUNNING);
activeFiber_->preempt(Fiber::YIELDED);
}
template <typename T>
T& FiberManager::local() {
if (std::type_index(typeid(T)) == localType_ && currentFiber_) {
return currentFiber_->localData_.get<T>();
}
return localThread<T>();
}
template <typename T>
T& FiberManager::localThread() {
#ifndef __APPLE__
static thread_local T t;
return t;
#else // osx doesn't support thread_local
static ThreadLocal<T> t;
return *t;
#endif
}
inline void FiberManager::initLocalData(Fiber& fiber) {
auto fm = getFiberManagerUnsafe();
if (fm && fm->currentFiber_ && fm->localType_ == localType_) {
fiber.localData_ = fm->currentFiber_->localData_;
}
fiber.rcontext_ = RequestContext::saveContext();
}
template <typename LocalT>
FiberManager::FiberManager(
LocalType<LocalT>,
std::unique_ptr<LoopController> loopController__,
Options options)
: loopController_(std::move(loopController__)),
stackAllocator_(options.useGuardPages),
options_(preprocessOptions(std::move(options))),
exceptionCallback_([](std::exception_ptr eptr, std::string context) {
try {
std::rethrow_exception(eptr);
} catch (const std::exception& e) {
LOG(DFATAL) << "Exception " << typeid(e).name() << " with message '"
<< e.what() << "' was thrown in "
<< "FiberManager with context '" << context << "'";
} catch (...) {
LOG(DFATAL) << "Unknown exception was thrown in FiberManager with "
<< "context '" << context << "'";
}
}),
timeoutManager_(std::make_shared<TimeoutController>(*loopController_)),
fibersPoolResizer_(*this),
localType_(typeid(LocalT)) {
loopController_->setFiberManager(this);
}
template <typename F>
typename FirstArgOf<F>::type::value_type inline await(F&& func) {
typedef typename FirstArgOf<F>::type::value_type Result;
typedef typename FirstArgOf<F>::type::baton_type BatonT;
return Promise<Result, BatonT>::await(std::forward<F>(func));
}
}
}
......@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "FiberManager.h"
#include "FiberManagerInternal.h"
#include <signal.h>
......
......@@ -15,559 +15,5 @@
*/
#pragma once
#include <functional>
#include <memory>
#include <queue>
#include <thread>
#include <type_traits>
#include <typeindex>
#include <unordered_set>
#include <vector>
#include <folly/AtomicIntrusiveLinkedList.h>
#include <folly/Executor.h>
#include <folly/IntrusiveList.h>
#include <folly/Likely.h>
#include <folly/Try.h>
#include <folly/io/async/Request.h>
#include <folly/experimental/ExecutionObserver.h>
#include <folly/fibers/BoostContextCompatibility.h>
#include <folly/fibers/Fiber.h>
#include <folly/fibers/GuardPageAllocator.h>
#include <folly/fibers/TimeoutController.h>
#include <folly/fibers/traits.h>
namespace folly {
template <class T>
class Future;
namespace fibers {
class Baton;
class Fiber;
class LoopController;
class TimeoutController;
template <typename T>
class LocalType {};
class InlineFunctionRunner {
public:
virtual ~InlineFunctionRunner() {}
/**
* func must be executed inline and only once.
*/
virtual void run(folly::Function<void()> func) = 0;
};
/**
* @class FiberManager
* @brief Single-threaded task execution engine.
*
* FiberManager allows semi-parallel task execution on the same thread. Each
* task can notify FiberManager that it is blocked on something (via await())
* call. This will pause execution of this task and it will be resumed only
* when it is unblocked (via setData()).
*/
class FiberManager : public ::folly::Executor {
public:
struct Options {
static constexpr size_t kDefaultStackSize{16 * 1024};
/**
* Maximum stack size for fibers which will be used for executing all the
* tasks.
*/
size_t stackSize{kDefaultStackSize};
/**
* Record exact amount of stack used.
*
* This is fairly expensive: we fill each newly allocated stack
* with some known value and find the boundary of unused stack
* with linear search every time we surrender the stack back to fibersPool.
* 0 disables stack recording.
*/
size_t recordStackEvery{0};
/**
* Keep at most this many free fibers in the pool.
* This way the total number of fibers in the system is always bounded
* by the number of active fibers + maxFibersPoolSize.
*/
size_t maxFibersPoolSize{1000};
/**
* Protect limited amount of fiber stacks with guard pages.
*/
bool useGuardPages{true};
/**
* Free unnecessary fibers in the fibers pool every fibersPoolResizePeriodMs
* milliseconds. If value is 0, periodic resizing of the fibers pool is
* disabled.
*/
uint32_t fibersPoolResizePeriodMs{0};
constexpr Options() {}
};
using ExceptionCallback =
folly::Function<void(std::exception_ptr, std::string)>;
FiberManager(const FiberManager&) = delete;
FiberManager& operator=(const FiberManager&) = delete;
/**
* Initializes, but doesn't start FiberManager loop
*
* @param loopController
* @param options FiberManager options
*/
explicit FiberManager(
std::unique_ptr<LoopController> loopController,
Options options = Options());
/**
* Initializes, but doesn't start FiberManager loop
*
* @param loopController
* @param options FiberManager options
* @tparam LocalT only local of this type may be stored on fibers.
* Locals of other types will be considered thread-locals.
*/
template <typename LocalT>
FiberManager(
LocalType<LocalT>,
std::unique_ptr<LoopController> loopController,
Options options = Options());
~FiberManager();
/**
* Controller access.
*/
LoopController& loopController();
const LoopController& loopController() const;
/**
* Keeps running ready tasks until the list of ready tasks is empty.
*
* @return True if there are any waiting tasks remaining.
*/
bool loopUntilNoReady();
/**
* @return true if there are outstanding tasks.
*/
bool hasTasks() const;
/**
* Sets exception callback which will be called if any of the tasks throws an
* exception.
*
* @param ec
*/
void setExceptionCallback(ExceptionCallback ec);
/**
* Add a new task to be executed. Must be called from FiberManager's thread.
*
* @param func Task functor; must have a signature of `void func()`.
* The object will be destroyed once task execution is complete.
*/
template <typename F>
void addTask(F&& func);
/**
* Add a new task to be executed and return a future that will be set on
* return from func. Must be called from FiberManager's thread.
*
* @param func Task functor; must have a signature of `void func()`.
* The object will be destroyed once task execution is complete.
*/
template <typename F>
auto addTaskFuture(F&& func) -> folly::Future<
typename folly::Unit::Lift<typename std::result_of<F()>::type>::type>;
/**
* Add a new task to be executed. Safe to call from other threads.
*
* @param func Task function; must have a signature of `void func()`.
* The object will be destroyed once task execution is complete.
*/
template <typename F>
void addTaskRemote(F&& func);
/**
* Add a new task to be executed and return a future that will be set on
* return from func. Safe to call from other threads.
*
* @param func Task function; must have a signature of `void func()`.
* The object will be destroyed once task execution is complete.
*/
template <typename F>
auto addTaskRemoteFuture(F&& func) -> folly::Future<
typename folly::Unit::Lift<typename std::result_of<F()>::type>::type>;
// Executor interface calls addTaskRemote
void add(folly::Func f) override {
addTaskRemote(std::move(f));
}
/**
* Add a new task. When the task is complete, execute finally(Try<Result>&&)
* on the main context.
*
* @param func Task functor; must have a signature of `T func()` for some T.
* @param finally Finally functor; must have a signature of
* `void finally(Try<T>&&)` and will be passed
* the result of func() (including the exception if occurred).
*/
template <typename F, typename G>
void addTaskFinally(F&& func, G&& finally);
/**
* If called from a fiber, immediately switches to the FiberManager's context
* and runs func(), going back to the Fiber's context after completion.
* Outside a fiber, just calls func() directly.
*
* @return value returned by func().
*/
template <typename F>
typename std::result_of<F()>::type runInMainContext(F&& func);
/**
* Returns a refference to a fiber-local context for given Fiber. Should be
* always called with the same T for each fiber. Fiber-local context is lazily
* default-constructed on first request.
* When new task is scheduled via addTask / addTaskRemote from a fiber its
* fiber-local context is copied into the new fiber.
*/
template <typename T>
T& local();
template <typename T>
static T& localThread();
/**
* @return How many fiber objects (and stacks) has this manager allocated.
*/
size_t fibersAllocated() const;
/**
* @return How many of the allocated fiber objects are currently
* in the free pool.
*/
size_t fibersPoolSize() const;
/**
* return true if running activeFiber_ is not nullptr.
*/
bool hasActiveFiber() const;
/**
* @return The currently running fiber or null if no fiber is executing.
*/
Fiber* currentFiber() const {
return currentFiber_;
}
/**
* @return What was the most observed fiber stack usage (in bytes).
*/
size_t stackHighWatermark() const;
/**
* Yield execution of the currently running fiber. Must only be called from a
* fiber executing on this FiberManager. The calling fiber will be scheduled
* when all other fibers have had a chance to run and the event loop is
* serviced.
*/
void yield();
/**
* Setup fibers execution observation/instrumentation. Fiber locals are
* available to observer.
*
* @param observer Fiber's execution observer.
*/
void setObserver(ExecutionObserver* observer);
/**
* @return Current observer for this FiberManager. Returns nullptr
* if no observer has been set.
*/
ExecutionObserver* getObserver();
/**
* Setup fibers preempt runner.
*/
void setPreemptRunner(InlineFunctionRunner* preemptRunner);
/**
* Returns an estimate of the number of fibers which are waiting to run (does
* not include fibers or tasks scheduled remotely).
*/
size_t runQueueSize() const {
return readyFibers_.size() + yieldedFibers_.size();
}
static FiberManager& getFiberManager();
static FiberManager* getFiberManagerUnsafe();
private:
friend class Baton;
friend class Fiber;
template <typename F>
struct AddTaskHelper;
template <typename F, typename G>
struct AddTaskFinallyHelper;
struct RemoteTask {
template <typename F>
explicit RemoteTask(F&& f)
: func(std::forward<F>(f)), rcontext(RequestContext::saveContext()) {}
template <typename F>
RemoteTask(F&& f, const Fiber::LocalData& localData_)
: func(std::forward<F>(f)),
localData(folly::make_unique<Fiber::LocalData>(localData_)),
rcontext(RequestContext::saveContext()) {}
folly::Function<void()> func;
std::unique_ptr<Fiber::LocalData> localData;
std::shared_ptr<RequestContext> rcontext;
AtomicIntrusiveLinkedListHook<RemoteTask> nextRemoteTask;
};
intptr_t activateFiber(Fiber* fiber);
intptr_t deactivateFiber(Fiber* fiber);
typedef folly::IntrusiveList<Fiber, &Fiber::listHook_> FiberTailQueue;
typedef folly::IntrusiveList<Fiber, &Fiber::globalListHook_>
GlobalFiberTailQueue;
Fiber* activeFiber_{nullptr}; /**< active fiber, nullptr on main context */
/**
* Same as active fiber, but also set for functions run from fiber on main
* context.
*/
Fiber* currentFiber_{nullptr};
FiberTailQueue readyFibers_; /**< queue of fibers ready to be executed */
FiberTailQueue yieldedFibers_; /**< queue of fibers which have yielded
execution */
FiberTailQueue fibersPool_; /**< pool of unitialized Fiber objects */
GlobalFiberTailQueue allFibers_; /**< list of all Fiber objects owned */
size_t fibersAllocated_{0}; /**< total number of fibers allocated */
size_t fibersPoolSize_{0}; /**< total number of fibers in the free pool */
size_t fibersActive_{0}; /**< number of running or blocked fibers */
size_t fiberId_{0}; /**< id of last fiber used */
/**
* Maximum number of active fibers in the last period lasting
* Options::fibersPoolResizePeriod milliseconds.
*/
size_t maxFibersActiveLastPeriod_{0};
FContext::ContextStruct mainContext_; /**< stores loop function context */
std::unique_ptr<LoopController> loopController_;
bool isLoopScheduled_{false}; /**< was the ready loop scheduled to run? */
/**
* When we are inside FiberManager loop this points to FiberManager. Otherwise
* it's nullptr
*/
static FOLLY_TLS FiberManager* currentFiberManager_;
/**
* Allocator used to allocate stack for Fibers in the pool.
* Allocates stack on the stack of the main context.
*/
GuardPageAllocator stackAllocator_;
const Options options_; /**< FiberManager options */
/**
* Largest observed individual Fiber stack usage in bytes.
*/
size_t stackHighWatermark_{0};
/**
* Schedules a loop with loopController (unless already scheduled before).
*/
void ensureLoopScheduled();
/**
* @return An initialized Fiber object from the pool
*/
Fiber* getFiber();
/**
* Sets local data for given fiber if all conditions are met.
*/
void initLocalData(Fiber& fiber);
/**
* Function passed to the await call.
*/
folly::Function<void(Fiber&)> awaitFunc_;
/**
* Function passed to the runInMainContext call.
*/
folly::Function<void()> immediateFunc_;
/**
* Preempt runner.
*/
InlineFunctionRunner* preemptRunner_{nullptr};
/**
* Fiber's execution observer.
*/
ExecutionObserver* observer_{nullptr};
ExceptionCallback exceptionCallback_; /**< task exception callback */
folly::AtomicIntrusiveLinkedList<Fiber, &Fiber::nextRemoteReady_>
remoteReadyQueue_;
folly::AtomicIntrusiveLinkedList<RemoteTask, &RemoteTask::nextRemoteTask>
remoteTaskQueue_;
std::shared_ptr<TimeoutController> timeoutManager_;
struct FibersPoolResizer {
explicit FibersPoolResizer(FiberManager& fm) : fiberManager_(fm) {}
void operator()();
private:
FiberManager& fiberManager_;
};
FibersPoolResizer fibersPoolResizer_;
bool fibersPoolResizerScheduled_{false};
void doFibersPoolResizing();
/**
* Only local of this type will be available for fibers.
*/
std::type_index localType_;
void runReadyFiber(Fiber* fiber);
void remoteReadyInsert(Fiber* fiber);
#ifdef FOLLY_SANITIZE_ADDRESS
// These methods notify ASAN when a fiber is entered/exited so that ASAN can
// find the right stack extents when it needs to poison/unpoison the stack.
void registerFiberActivationWithAsan(Fiber* fiber);
void registerFiberDeactivationWithAsan(Fiber* fiber);
void unpoisonFiberStack(const Fiber* fiber);
#endif // FOLLY_SANITIZE_ADDRESS
#ifndef _WIN32
bool alternateSignalStackRegistered_{false};
void registerAlternateSignalStack();
#endif
};
/**
* @return true iff we are running in a fiber's context
*/
inline bool onFiber() {
auto fm = FiberManager::getFiberManagerUnsafe();
return fm ? fm->hasActiveFiber() : false;
}
/**
* Add a new task to be executed.
*
* @param func Task functor; must have a signature of `void func()`.
* The object will be destroyed once task execution is complete.
*/
template <typename F>
inline void addTask(F&& func) {
return FiberManager::getFiberManager().addTask(std::forward<F>(func));
}
/**
* Add a new task. When the task is complete, execute finally(Try<Result>&&)
* on the main context.
* Task functor is run and destroyed on the fiber context.
* Finally functor is run and destroyed on the main context.
*
* @param func Task functor; must have a signature of `T func()` for some T.
* @param finally Finally functor; must have a signature of
* `void finally(Try<T>&&)` and will be passed
* the result of func() (including the exception if occurred).
*/
template <typename F, typename G>
inline void addTaskFinally(F&& func, G&& finally) {
return FiberManager::getFiberManager().addTaskFinally(
std::forward<F>(func), std::forward<G>(finally));
}
/**
* Blocks task execution until given promise is fulfilled.
*
* Calls function passing in a Promise<T>, which has to be fulfilled.
*
* @return data which was used to fulfill the promise.
*/
template <typename F>
typename FirstArgOf<F>::type::value_type inline await(F&& func);
/**
* If called from a fiber, immediately switches to the FiberManager's context
* and runs func(), going back to the Fiber's context after completion.
* Outside a fiber, just calls func() directly.
*
* @return value returned by func().
*/
template <typename F>
typename std::result_of<F()>::type inline runInMainContext(F&& func) {
auto fm = FiberManager::getFiberManagerUnsafe();
if (UNLIKELY(fm == nullptr)) {
return func();
}
return fm->runInMainContext(std::forward<F>(func));
}
/**
* Returns a refference to a fiber-local context for given Fiber. Should be
* always called with the same T for each fiber. Fiber-local context is lazily
* default-constructed on first request.
* When new task is scheduled via addTask / addTaskRemote from a fiber its
* fiber-local context is copied into the new fiber.
*/
template <typename T>
T& local() {
auto fm = FiberManager::getFiberManagerUnsafe();
if (fm) {
return fm->local<T>();
}
return FiberManager::localThread<T>();
}
inline void yield() {
auto fm = FiberManager::getFiberManagerUnsafe();
if (fm) {
fm->yield();
} else {
std::this_thread::yield();
}
}
}
}
#include <folly/fibers/FiberManagerInternal.h>
#include <folly/fibers/FiberManager-inl.h>
/*
* Copyright 2016 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/futures/Promise.h>
namespace folly {
namespace fibers {
template <typename F>
auto FiberManager::addTaskFuture(F&& func) -> folly::Future<
typename folly::Unit::Lift<typename std::result_of<F()>::type>::type> {
using T = typename std::result_of<F()>::type;
using FutureT = typename folly::Unit::Lift<T>::type;
folly::Promise<FutureT> p;
auto f = p.getFuture();
addTaskFinally(
[func = std::forward<F>(func)]() mutable { return func(); },
[p = std::move(p)](folly::Try<T> && t) mutable {
p.setTry(std::move(t));
});
return f;
}
template <typename F>
auto FiberManager::addTaskRemoteFuture(F&& func) -> folly::Future<
typename folly::Unit::Lift<typename std::result_of<F()>::type>::type> {
folly::Promise<
typename folly::Unit::Lift<typename std::result_of<F()>::type>::type>
p;
auto f = p.getFuture();
addTaskRemote(
[ p = std::move(p), func = std::forward<F>(func), this ]() mutable {
auto t = folly::makeTryWith(std::forward<F>(func));
runInMainContext([&]() { p.setTry(std::move(t)); });
});
return f;
}
}
}
/*
* Copyright 2016 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <cassert>
#include <folly/CPortability.h>
#include <folly/Memory.h>
#include <folly/Optional.h>
#include <folly/Portability.h>
#include <folly/ScopeGuard.h>
#ifdef __APPLE__
#include <folly/ThreadLocal.h>
#endif
#include <folly/fibers/Baton.h>
#include <folly/fibers/Fiber.h>
#include <folly/fibers/LoopController.h>
#include <folly/fibers/Promise.h>
#include <folly/Try.h>
namespace folly {
namespace fibers {
namespace {
inline FiberManager::Options preprocessOptions(FiberManager::Options opts) {
#ifdef FOLLY_SANITIZE_ADDRESS
/* ASAN needs a lot of extra stack space.
16x is a conservative estimate, 8x also worked with tests
where it mattered. Note that overallocating here does not necessarily
increase RSS, since unused memory is pretty much free. */
opts.stackSize *= 16;
#endif
return opts;
}
} // anonymous
inline void FiberManager::ensureLoopScheduled() {
if (isLoopScheduled_) {
return;
}
isLoopScheduled_ = true;
loopController_->schedule();
}
inline intptr_t FiberManager::activateFiber(Fiber* fiber) {
DCHECK_EQ(activeFiber_, (Fiber*)nullptr);
#ifdef FOLLY_SANITIZE_ADDRESS
registerFiberActivationWithAsan(fiber);
#endif
activeFiber_ = fiber;
return jumpContext(&mainContext_, &fiber->fcontext_, fiber->data_);
}
inline intptr_t FiberManager::deactivateFiber(Fiber* fiber) {
DCHECK_EQ(activeFiber_, fiber);
#ifdef FOLLY_SANITIZE_ADDRESS
registerFiberDeactivationWithAsan(fiber);
#endif
activeFiber_ = nullptr;
return jumpContext(&fiber->fcontext_, &mainContext_, 0);
}
inline void FiberManager::runReadyFiber(Fiber* fiber) {
SCOPE_EXIT {
assert(currentFiber_ == nullptr);
assert(activeFiber_ == nullptr);
};
assert(
fiber->state_ == Fiber::NOT_STARTED ||
fiber->state_ == Fiber::READY_TO_RUN);
currentFiber_ = fiber;
fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
if (observer_) {
observer_->starting(reinterpret_cast<uintptr_t>(fiber));
}
while (fiber->state_ == Fiber::NOT_STARTED ||
fiber->state_ == Fiber::READY_TO_RUN) {
activateFiber(fiber);
if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) {
try {
immediateFunc_();
} catch (...) {
exceptionCallback_(std::current_exception(), "running immediateFunc_");
}
immediateFunc_ = nullptr;
fiber->state_ = Fiber::READY_TO_RUN;
}
}
if (fiber->state_ == Fiber::AWAITING) {
awaitFunc_(*fiber);
awaitFunc_ = nullptr;
if (observer_) {
observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
}
currentFiber_ = nullptr;
fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
} else if (fiber->state_ == Fiber::INVALID) {
assert(fibersActive_ > 0);
--fibersActive_;
// Making sure that task functor is deleted once task is complete.
// NOTE: we must do it on main context, as the fiber is not
// running at this point.
fiber->func_ = nullptr;
fiber->resultFunc_ = nullptr;
if (fiber->finallyFunc_) {
try {
fiber->finallyFunc_();
} catch (...) {
exceptionCallback_(std::current_exception(), "running finallyFunc_");
}
fiber->finallyFunc_ = nullptr;
}
// Make sure LocalData is not accessible from its destructor
if (observer_) {
observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
}
currentFiber_ = nullptr;
fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
fiber->localData_.reset();
fiber->rcontext_.reset();
if (fibersPoolSize_ < options_.maxFibersPoolSize ||
options_.fibersPoolResizePeriodMs > 0) {
fibersPool_.push_front(*fiber);
++fibersPoolSize_;
} else {
delete fiber;
assert(fibersAllocated_ > 0);
--fibersAllocated_;
}
} else if (fiber->state_ == Fiber::YIELDED) {
if (observer_) {
observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
}
currentFiber_ = nullptr;
fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
fiber->state_ = Fiber::READY_TO_RUN;
yieldedFibers_.push_back(*fiber);
}
}
inline bool FiberManager::loopUntilNoReady() {
#ifndef _WIN32
if (UNLIKELY(!alternateSignalStackRegistered_)) {
registerAlternateSignalStack();
}
#endif
// Support nested FiberManagers
auto originalFiberManager = this;
std::swap(currentFiberManager_, originalFiberManager);
SCOPE_EXIT {
isLoopScheduled_ = false;
if (!readyFibers_.empty()) {
ensureLoopScheduled();
}
std::swap(currentFiberManager_, originalFiberManager);
CHECK_EQ(this, originalFiberManager);
};
bool hadRemoteFiber = true;
while (hadRemoteFiber) {
hadRemoteFiber = false;
while (!readyFibers_.empty()) {
auto& fiber = readyFibers_.front();
readyFibers_.pop_front();
runReadyFiber(&fiber);
}
remoteReadyQueue_.sweep([this, &hadRemoteFiber](Fiber* fiber) {
runReadyFiber(fiber);
hadRemoteFiber = true;
});
remoteTaskQueue_.sweep([this, &hadRemoteFiber](RemoteTask* taskPtr) {
std::unique_ptr<RemoteTask> task(taskPtr);
auto fiber = getFiber();
if (task->localData) {
fiber->localData_ = *task->localData;
}
fiber->rcontext_ = std::move(task->rcontext);
fiber->setFunction(std::move(task->func));
fiber->data_ = reinterpret_cast<intptr_t>(fiber);
if (observer_) {
observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
}
runReadyFiber(fiber);
hadRemoteFiber = true;
});
}
if (observer_) {
for (auto& yielded : yieldedFibers_) {
observer_->runnable(reinterpret_cast<uintptr_t>(&yielded));
}
}
readyFibers_.splice(readyFibers_.end(), yieldedFibers_);
return fibersActive_ > 0;
}
// We need this to be in a struct, not inlined in addTask, because clang crashes
// otherwise.
template <typename F>
struct FiberManager::AddTaskHelper {
class Func;
static constexpr bool allocateInBuffer =
sizeof(Func) <= Fiber::kUserBufferSize;
class Func {
public:
Func(F&& func, FiberManager& fm) : func_(std::forward<F>(func)), fm_(fm) {}
void operator()() {
try {
func_();
} catch (...) {
fm_.exceptionCallback_(
std::current_exception(), "running Func functor");
}
if (allocateInBuffer) {
this->~Func();
} else {
delete this;
}
}
private:
F func_;
FiberManager& fm_;
};
};
template <typename F>
void FiberManager::addTask(F&& func) {
typedef AddTaskHelper<F> Helper;
auto fiber = getFiber();
initLocalData(*fiber);
if (Helper::allocateInBuffer) {
auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
new (funcLoc) typename Helper::Func(std::forward<F>(func), *this);
fiber->setFunction(std::ref(*funcLoc));
} else {
auto funcLoc = new typename Helper::Func(std::forward<F>(func), *this);
fiber->setFunction(std::ref(*funcLoc));
}
fiber->data_ = reinterpret_cast<intptr_t>(fiber);
readyFibers_.push_back(*fiber);
if (observer_) {
observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
}
ensureLoopScheduled();
}
template <typename F>
void FiberManager::addTaskRemote(F&& func) {
auto task = [&]() {
auto currentFm = getFiberManagerUnsafe();
if (currentFm && currentFm->currentFiber_ &&
currentFm->localType_ == localType_) {
return folly::make_unique<RemoteTask>(
std::forward<F>(func), currentFm->currentFiber_->localData_);
}
return folly::make_unique<RemoteTask>(std::forward<F>(func));
}();
auto insertHead = [&]() {
return remoteTaskQueue_.insertHead(task.release());
};
loopController_->scheduleThreadSafe(std::ref(insertHead));
}
template <typename X>
struct IsRvalueRefTry {
static const bool value = false;
};
template <typename T>
struct IsRvalueRefTry<folly::Try<T>&&> {
static const bool value = true;
};
// We need this to be in a struct, not inlined in addTaskFinally, because clang
// crashes otherwise.
template <typename F, typename G>
struct FiberManager::AddTaskFinallyHelper {
class Func;
typedef typename std::result_of<F()>::type Result;
class Finally {
public:
Finally(G finally, FiberManager& fm)
: finally_(std::move(finally)), fm_(fm) {}
void operator()() {
try {
finally_(std::move(*result_));
} catch (...) {
fm_.exceptionCallback_(
std::current_exception(), "running Finally functor");
}
if (allocateInBuffer) {
this->~Finally();
} else {
delete this;
}
}
private:
friend class Func;
G finally_;
folly::Optional<folly::Try<Result>> result_;
FiberManager& fm_;
};
class Func {
public:
Func(F func, Finally& finally)
: func_(std::move(func)), result_(finally.result_) {}
void operator()() {
result_ = folly::makeTryWith(std::move(func_));
if (allocateInBuffer) {
this->~Func();
} else {
delete this;
}
}
private:
F func_;
folly::Optional<folly::Try<Result>>& result_;
};
static constexpr bool allocateInBuffer =
sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize;
};
template <typename F, typename G>
void FiberManager::addTaskFinally(F&& func, G&& finally) {
typedef typename std::result_of<F()>::type Result;
static_assert(
IsRvalueRefTry<typename FirstArgOf<G>::type>::value,
"finally(arg): arg must be Try<T>&&");
static_assert(
std::is_convertible<
Result,
typename std::remove_reference<
typename FirstArgOf<G>::type>::type::element_type>::value,
"finally(Try<T>&&): T must be convertible from func()'s return type");
auto fiber = getFiber();
initLocalData(*fiber);
typedef AddTaskFinallyHelper<
typename std::decay<F>::type,
typename std::decay<G>::type>
Helper;
if (Helper::allocateInBuffer) {
auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
auto finallyLoc =
static_cast<typename Helper::Finally*>(static_cast<void*>(funcLoc + 1));
new (finallyLoc) typename Helper::Finally(std::forward<G>(finally), *this);
new (funcLoc) typename Helper::Func(std::forward<F>(func), *finallyLoc);
fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
} else {
auto finallyLoc =
new typename Helper::Finally(std::forward<G>(finally), *this);
auto funcLoc =
new typename Helper::Func(std::forward<F>(func), *finallyLoc);
fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
}
fiber->data_ = reinterpret_cast<intptr_t>(fiber);
readyFibers_.push_back(*fiber);
if (observer_) {
observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
}
ensureLoopScheduled();
}
template <typename F>
typename std::result_of<F()>::type FiberManager::runInMainContext(F&& func) {
if (UNLIKELY(activeFiber_ == nullptr)) {
return func();
}
typedef typename std::result_of<F()>::type Result;
folly::Try<Result> result;
auto f = [&func, &result]() mutable {
result = folly::makeTryWith(std::forward<F>(func));
};
immediateFunc_ = std::ref(f);
activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
return std::move(result).value();
}
inline FiberManager& FiberManager::getFiberManager() {
assert(currentFiberManager_ != nullptr);
return *currentFiberManager_;
}
inline FiberManager* FiberManager::getFiberManagerUnsafe() {
return currentFiberManager_;
}
inline bool FiberManager::hasActiveFiber() const {
return activeFiber_ != nullptr;
}
inline void FiberManager::yield() {
assert(currentFiberManager_ == this);
assert(activeFiber_ != nullptr);
assert(activeFiber_->state_ == Fiber::RUNNING);
activeFiber_->preempt(Fiber::YIELDED);
}
template <typename T>
T& FiberManager::local() {
if (std::type_index(typeid(T)) == localType_ && currentFiber_) {
return currentFiber_->localData_.get<T>();
}
return localThread<T>();
}
template <typename T>
T& FiberManager::localThread() {
#ifndef __APPLE__
static thread_local T t;
return t;
#else // osx doesn't support thread_local
static ThreadLocal<T> t;
return *t;
#endif
}
inline void FiberManager::initLocalData(Fiber& fiber) {
auto fm = getFiberManagerUnsafe();
if (fm && fm->currentFiber_ && fm->localType_ == localType_) {
fiber.localData_ = fm->currentFiber_->localData_;
}
fiber.rcontext_ = RequestContext::saveContext();
}
template <typename LocalT>
FiberManager::FiberManager(
LocalType<LocalT>,
std::unique_ptr<LoopController> loopController__,
Options options)
: loopController_(std::move(loopController__)),
stackAllocator_(options.useGuardPages),
options_(preprocessOptions(std::move(options))),
exceptionCallback_([](std::exception_ptr eptr, std::string context) {
try {
std::rethrow_exception(eptr);
} catch (const std::exception& e) {
LOG(DFATAL) << "Exception " << typeid(e).name() << " with message '"
<< e.what() << "' was thrown in "
<< "FiberManager with context '" << context << "'";
} catch (...) {
LOG(DFATAL) << "Unknown exception was thrown in FiberManager with "
<< "context '" << context << "'";
}
}),
timeoutManager_(std::make_shared<TimeoutController>(*loopController_)),
fibersPoolResizer_(*this),
localType_(typeid(LocalT)) {
loopController_->setFiberManager(this);
}
template <typename F>
typename FirstArgOf<F>::type::value_type inline await(F&& func) {
typedef typename FirstArgOf<F>::type::value_type Result;
typedef typename FirstArgOf<F>::type::baton_type BatonT;
return Promise<Result, BatonT>::await(std::forward<F>(func));
}
}
}
/*
* Copyright 2016 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <functional>
#include <memory>
#include <queue>
#include <thread>
#include <type_traits>
#include <typeindex>
#include <unordered_set>
#include <vector>
#include <folly/AtomicIntrusiveLinkedList.h>
#include <folly/Executor.h>
#include <folly/IntrusiveList.h>
#include <folly/Likely.h>
#include <folly/Try.h>
#include <folly/io/async/Request.h>
#include <folly/experimental/ExecutionObserver.h>
#include <folly/fibers/BoostContextCompatibility.h>
#include <folly/fibers/Fiber.h>
#include <folly/fibers/GuardPageAllocator.h>
#include <folly/fibers/TimeoutController.h>
#include <folly/fibers/traits.h>
namespace folly {
template <class T>
class Future;
namespace fibers {
class Baton;
class Fiber;
class LoopController;
class TimeoutController;
template <typename T>
class LocalType {};
class InlineFunctionRunner {
public:
virtual ~InlineFunctionRunner() {}
/**
* func must be executed inline and only once.
*/
virtual void run(folly::Function<void()> func) = 0;
};
/**
* @class FiberManager
* @brief Single-threaded task execution engine.
*
* FiberManager allows semi-parallel task execution on the same thread. Each
* task can notify FiberManager that it is blocked on something (via await())
* call. This will pause execution of this task and it will be resumed only
* when it is unblocked (via setData()).
*/
class FiberManager : public ::folly::Executor {
public:
struct Options {
static constexpr size_t kDefaultStackSize{16 * 1024};
/**
* Maximum stack size for fibers which will be used for executing all the
* tasks.
*/
size_t stackSize{kDefaultStackSize};
/**
* Record exact amount of stack used.
*
* This is fairly expensive: we fill each newly allocated stack
* with some known value and find the boundary of unused stack
* with linear search every time we surrender the stack back to fibersPool.
* 0 disables stack recording.
*/
size_t recordStackEvery{0};
/**
* Keep at most this many free fibers in the pool.
* This way the total number of fibers in the system is always bounded
* by the number of active fibers + maxFibersPoolSize.
*/
size_t maxFibersPoolSize{1000};
/**
* Protect limited amount of fiber stacks with guard pages.
*/
bool useGuardPages{true};
/**
* Free unnecessary fibers in the fibers pool every fibersPoolResizePeriodMs
* milliseconds. If value is 0, periodic resizing of the fibers pool is
* disabled.
*/
uint32_t fibersPoolResizePeriodMs{0};
constexpr Options() {}
};
using ExceptionCallback =
folly::Function<void(std::exception_ptr, std::string)>;
FiberManager(const FiberManager&) = delete;
FiberManager& operator=(const FiberManager&) = delete;
/**
* Initializes, but doesn't start FiberManager loop
*
* @param loopController
* @param options FiberManager options
*/
explicit FiberManager(
std::unique_ptr<LoopController> loopController,
Options options = Options());
/**
* Initializes, but doesn't start FiberManager loop
*
* @param loopController
* @param options FiberManager options
* @tparam LocalT only local of this type may be stored on fibers.
* Locals of other types will be considered thread-locals.
*/
template <typename LocalT>
FiberManager(
LocalType<LocalT>,
std::unique_ptr<LoopController> loopController,
Options options = Options());
~FiberManager();
/**
* Controller access.
*/
LoopController& loopController();
const LoopController& loopController() const;
/**
* Keeps running ready tasks until the list of ready tasks is empty.
*
* @return True if there are any waiting tasks remaining.
*/
bool loopUntilNoReady();
/**
* @return true if there are outstanding tasks.
*/
bool hasTasks() const;
/**
* Sets exception callback which will be called if any of the tasks throws an
* exception.
*
* @param ec
*/
void setExceptionCallback(ExceptionCallback ec);
/**
* Add a new task to be executed. Must be called from FiberManager's thread.
*
* @param func Task functor; must have a signature of `void func()`.
* The object will be destroyed once task execution is complete.
*/
template <typename F>
void addTask(F&& func);
/**
* Add a new task to be executed and return a future that will be set on
* return from func. Must be called from FiberManager's thread.
*
* @param func Task functor; must have a signature of `void func()`.
* The object will be destroyed once task execution is complete.
*/
template <typename F>
auto addTaskFuture(F&& func) -> folly::Future<
typename folly::Unit::Lift<typename std::result_of<F()>::type>::type>;
/**
* Add a new task to be executed. Safe to call from other threads.
*
* @param func Task function; must have a signature of `void func()`.
* The object will be destroyed once task execution is complete.
*/
template <typename F>
void addTaskRemote(F&& func);
/**
* Add a new task to be executed and return a future that will be set on
* return from func. Safe to call from other threads.
*
* @param func Task function; must have a signature of `void func()`.
* The object will be destroyed once task execution is complete.
*/
template <typename F>
auto addTaskRemoteFuture(F&& func) -> folly::Future<
typename folly::Unit::Lift<typename std::result_of<F()>::type>::type>;
// Executor interface calls addTaskRemote
void add(folly::Func f) override {
addTaskRemote(std::move(f));
}
/**
* Add a new task. When the task is complete, execute finally(Try<Result>&&)
* on the main context.
*
* @param func Task functor; must have a signature of `T func()` for some T.
* @param finally Finally functor; must have a signature of
* `void finally(Try<T>&&)` and will be passed
* the result of func() (including the exception if occurred).
*/
template <typename F, typename G>
void addTaskFinally(F&& func, G&& finally);
/**
* If called from a fiber, immediately switches to the FiberManager's context
* and runs func(), going back to the Fiber's context after completion.
* Outside a fiber, just calls func() directly.
*
* @return value returned by func().
*/
template <typename F>
typename std::result_of<F()>::type runInMainContext(F&& func);
/**
* Returns a refference to a fiber-local context for given Fiber. Should be
* always called with the same T for each fiber. Fiber-local context is lazily
* default-constructed on first request.
* When new task is scheduled via addTask / addTaskRemote from a fiber its
* fiber-local context is copied into the new fiber.
*/
template <typename T>
T& local();
template <typename T>
static T& localThread();
/**
* @return How many fiber objects (and stacks) has this manager allocated.
*/
size_t fibersAllocated() const;
/**
* @return How many of the allocated fiber objects are currently
* in the free pool.
*/
size_t fibersPoolSize() const;
/**
* return true if running activeFiber_ is not nullptr.
*/
bool hasActiveFiber() const;
/**
* @return The currently running fiber or null if no fiber is executing.
*/
Fiber* currentFiber() const {
return currentFiber_;
}
/**
* @return What was the most observed fiber stack usage (in bytes).
*/
size_t stackHighWatermark() const;
/**
* Yield execution of the currently running fiber. Must only be called from a
* fiber executing on this FiberManager. The calling fiber will be scheduled
* when all other fibers have had a chance to run and the event loop is
* serviced.
*/
void yield();
/**
* Setup fibers execution observation/instrumentation. Fiber locals are
* available to observer.
*
* @param observer Fiber's execution observer.
*/
void setObserver(ExecutionObserver* observer);
/**
* @return Current observer for this FiberManager. Returns nullptr
* if no observer has been set.
*/
ExecutionObserver* getObserver();
/**
* Setup fibers preempt runner.
*/
void setPreemptRunner(InlineFunctionRunner* preemptRunner);
/**
* Returns an estimate of the number of fibers which are waiting to run (does
* not include fibers or tasks scheduled remotely).
*/
size_t runQueueSize() const {
return readyFibers_.size() + yieldedFibers_.size();
}
static FiberManager& getFiberManager();
static FiberManager* getFiberManagerUnsafe();
private:
friend class Baton;
friend class Fiber;
template <typename F>
struct AddTaskHelper;
template <typename F, typename G>
struct AddTaskFinallyHelper;
struct RemoteTask {
template <typename F>
explicit RemoteTask(F&& f)
: func(std::forward<F>(f)), rcontext(RequestContext::saveContext()) {}
template <typename F>
RemoteTask(F&& f, const Fiber::LocalData& localData_)
: func(std::forward<F>(f)),
localData(folly::make_unique<Fiber::LocalData>(localData_)),
rcontext(RequestContext::saveContext()) {}
folly::Function<void()> func;
std::unique_ptr<Fiber::LocalData> localData;
std::shared_ptr<RequestContext> rcontext;
AtomicIntrusiveLinkedListHook<RemoteTask> nextRemoteTask;
};
intptr_t activateFiber(Fiber* fiber);
intptr_t deactivateFiber(Fiber* fiber);
typedef folly::IntrusiveList<Fiber, &Fiber::listHook_> FiberTailQueue;
typedef folly::IntrusiveList<Fiber, &Fiber::globalListHook_>
GlobalFiberTailQueue;
Fiber* activeFiber_{nullptr}; /**< active fiber, nullptr on main context */
/**
* Same as active fiber, but also set for functions run from fiber on main
* context.
*/
Fiber* currentFiber_{nullptr};
FiberTailQueue readyFibers_; /**< queue of fibers ready to be executed */
FiberTailQueue yieldedFibers_; /**< queue of fibers which have yielded
execution */
FiberTailQueue fibersPool_; /**< pool of unitialized Fiber objects */
GlobalFiberTailQueue allFibers_; /**< list of all Fiber objects owned */
size_t fibersAllocated_{0}; /**< total number of fibers allocated */
size_t fibersPoolSize_{0}; /**< total number of fibers in the free pool */
size_t fibersActive_{0}; /**< number of running or blocked fibers */
size_t fiberId_{0}; /**< id of last fiber used */
/**
* Maximum number of active fibers in the last period lasting
* Options::fibersPoolResizePeriod milliseconds.
*/
size_t maxFibersActiveLastPeriod_{0};
FContext::ContextStruct mainContext_; /**< stores loop function context */
std::unique_ptr<LoopController> loopController_;
bool isLoopScheduled_{false}; /**< was the ready loop scheduled to run? */
/**
* When we are inside FiberManager loop this points to FiberManager. Otherwise
* it's nullptr
*/
static FOLLY_TLS FiberManager* currentFiberManager_;
/**
* Allocator used to allocate stack for Fibers in the pool.
* Allocates stack on the stack of the main context.
*/
GuardPageAllocator stackAllocator_;
const Options options_; /**< FiberManager options */
/**
* Largest observed individual Fiber stack usage in bytes.
*/
size_t stackHighWatermark_{0};
/**
* Schedules a loop with loopController (unless already scheduled before).
*/
void ensureLoopScheduled();
/**
* @return An initialized Fiber object from the pool
*/
Fiber* getFiber();
/**
* Sets local data for given fiber if all conditions are met.
*/
void initLocalData(Fiber& fiber);
/**
* Function passed to the await call.
*/
folly::Function<void(Fiber&)> awaitFunc_;
/**
* Function passed to the runInMainContext call.
*/
folly::Function<void()> immediateFunc_;
/**
* Preempt runner.
*/
InlineFunctionRunner* preemptRunner_{nullptr};
/**
* Fiber's execution observer.
*/
ExecutionObserver* observer_{nullptr};
ExceptionCallback exceptionCallback_; /**< task exception callback */
folly::AtomicIntrusiveLinkedList<Fiber, &Fiber::nextRemoteReady_>
remoteReadyQueue_;
folly::AtomicIntrusiveLinkedList<RemoteTask, &RemoteTask::nextRemoteTask>
remoteTaskQueue_;
std::shared_ptr<TimeoutController> timeoutManager_;
struct FibersPoolResizer {
explicit FibersPoolResizer(FiberManager& fm) : fiberManager_(fm) {}
void operator()();
private:
FiberManager& fiberManager_;
};
FibersPoolResizer fibersPoolResizer_;
bool fibersPoolResizerScheduled_{false};
void doFibersPoolResizing();
/**
* Only local of this type will be available for fibers.
*/
std::type_index localType_;
void runReadyFiber(Fiber* fiber);
void remoteReadyInsert(Fiber* fiber);
#ifdef FOLLY_SANITIZE_ADDRESS
// These methods notify ASAN when a fiber is entered/exited so that ASAN can
// find the right stack extents when it needs to poison/unpoison the stack.
void registerFiberActivationWithAsan(Fiber* fiber);
void registerFiberDeactivationWithAsan(Fiber* fiber);
void unpoisonFiberStack(const Fiber* fiber);
#endif // FOLLY_SANITIZE_ADDRESS
#ifndef _WIN32
bool alternateSignalStackRegistered_{false};
void registerAlternateSignalStack();
#endif
};
/**
* @return true iff we are running in a fiber's context
*/
inline bool onFiber() {
auto fm = FiberManager::getFiberManagerUnsafe();
return fm ? fm->hasActiveFiber() : false;
}
/**
* Add a new task to be executed.
*
* @param func Task functor; must have a signature of `void func()`.
* The object will be destroyed once task execution is complete.
*/
template <typename F>
inline void addTask(F&& func) {
return FiberManager::getFiberManager().addTask(std::forward<F>(func));
}
/**
* Add a new task. When the task is complete, execute finally(Try<Result>&&)
* on the main context.
* Task functor is run and destroyed on the fiber context.
* Finally functor is run and destroyed on the main context.
*
* @param func Task functor; must have a signature of `T func()` for some T.
* @param finally Finally functor; must have a signature of
* `void finally(Try<T>&&)` and will be passed
* the result of func() (including the exception if occurred).
*/
template <typename F, typename G>
inline void addTaskFinally(F&& func, G&& finally) {
return FiberManager::getFiberManager().addTaskFinally(
std::forward<F>(func), std::forward<G>(finally));
}
/**
* Blocks task execution until given promise is fulfilled.
*
* Calls function passing in a Promise<T>, which has to be fulfilled.
*
* @return data which was used to fulfill the promise.
*/
template <typename F>
typename FirstArgOf<F>::type::value_type inline await(F&& func);
/**
* If called from a fiber, immediately switches to the FiberManager's context
* and runs func(), going back to the Fiber's context after completion.
* Outside a fiber, just calls func() directly.
*
* @return value returned by func().
*/
template <typename F>
typename std::result_of<F()>::type inline runInMainContext(F&& func) {
auto fm = FiberManager::getFiberManagerUnsafe();
if (UNLIKELY(fm == nullptr)) {
return func();
}
return fm->runInMainContext(std::forward<F>(func));
}
/**
* Returns a refference to a fiber-local context for given Fiber. Should be
* always called with the same T for each fiber. Fiber-local context is lazily
* default-constructed on first request.
* When new task is scheduled via addTask / addTaskRemote from a fiber its
* fiber-local context is copied into the new fiber.
*/
template <typename T>
T& local() {
auto fm = FiberManager::getFiberManagerUnsafe();
if (fm) {
return fm->local<T>();
}
return FiberManager::localThread<T>();
}
inline void yield() {
auto fm = FiberManager::getFiberManagerUnsafe();
if (fm) {
fm->yield();
} else {
std::this_thread::yield();
}
}
}
}
#include <folly/fibers/FiberManagerInternal-inl.h>
......@@ -16,7 +16,7 @@
#pragma once
#include <folly/fibers/EventBaseLoopController.h>
#include <folly/fibers/FiberManager.h>
#include <folly/fibers/FiberManagerInternal.h>
namespace folly {
namespace fibers {
......
......@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/fibers/FiberManager.h>
#include <folly/fibers/FiberManagerInternal.h>
namespace folly {
namespace fibers {
......
......@@ -15,7 +15,7 @@
*/
#include <folly/Optional.h>
#include <folly/fibers/FiberManager.h>
#include <folly/fibers/FiberManagerInternal.h>
#include <folly/fibers/ForEach.h>
namespace folly {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment