Commit 5de6b458 authored by Rushi Desai's avatar Rushi Desai Committed by afrind

Move mcrouter/lib/fibers to folly/experimental/fibers

Summary:
The mcrouter fibers library is quite general purpose and reusable by other projects. Hence I'm moving it into folly.

Test Plan:
fbconfig -r folly/experimental/fibers && fbmake runtests
fbconfig -r mcrouter && fbmake runtests
fbconfig -r tao && fbmake
fbconfig -r ti && fbmake

Reviewed By: andrii@fb.com

Subscribers: vikas, zhuohuang, jmkaldor, jhunt, pavlo, int, aap, trunkagent, fredemmott, alikhtarov, folly-diffs@, jsedgwick, yfeldblum, chalfant, chip

FB internal diff: D1958061

Signature: t1:1958061:1428005194:b57bfecfe9678e81c48526f57e6197270e2b5a27
parent 0217505d
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <memory>
#include <vector>
#include <folly/experimental/fibers/FiberManager.h>
namespace folly { namespace fibers {
template <typename T>
TaskIterator<T>::TaskIterator(TaskIterator&& other) noexcept
: context_(std::move(other.context_)),
id_(other.id_) {
}
template <typename T>
TaskIterator<T>::TaskIterator(std::shared_ptr<Context> context)
: context_(std::move(context)),
id_(-1) {
assert(context_);
}
template <typename T>
inline bool TaskIterator<T>::hasCompleted() const {
return context_->tasksConsumed < context_->results.size();
}
template <typename T>
inline bool TaskIterator<T>::hasPending() const {
return !context_.unique();
}
template <typename T>
inline bool TaskIterator<T>::hasNext() const {
return hasPending() || hasCompleted();
}
template <typename T>
folly::Try<T> TaskIterator<T>::awaitNextResult() {
assert(hasCompleted() || hasPending());
reserve(1);
size_t i = context_->tasksConsumed++;
id_ = context_->results[i].first;
return std::move(context_->results[i].second);
}
template <typename T>
inline T TaskIterator<T>::awaitNext() {
return std::move(awaitNextResult().value());
}
template <>
inline void TaskIterator<void>::awaitNext() {
awaitNextResult().value();
}
template <typename T>
inline void TaskIterator<T>::reserve(size_t n) {
size_t tasksReady = context_->results.size() - context_->tasksConsumed;
// we don't need to do anything if there are already n or more tasks complete
// or if we have no tasks left to execute.
if (!hasPending() || tasksReady >= n) {
return;
}
n -= tasksReady;
size_t tasksLeft = context_->totalTasks - context_->results.size();
n = std::min(n, tasksLeft);
await(
[this, n](Promise<void> promise) {
context_->tasksToFulfillPromise = n;
context_->promise.assign(std::move(promise));
});
}
template <typename T>
inline size_t TaskIterator<T>::getTaskID() const {
assert(id_ != -1);
return id_;
}
template <class InputIterator>
TaskIterator<typename std::result_of<
typename std::iterator_traits<InputIterator>::value_type()>::type>
addTasks(InputIterator first, InputIterator last) {
typedef typename std::result_of<
typename std::iterator_traits<InputIterator>::value_type()>::type
ResultType;
typedef TaskIterator<ResultType> IteratorType;
auto context = std::make_shared<typename IteratorType::Context>();
context->totalTasks = std::distance(first, last);
context->results.reserve(context->totalTasks);
for (size_t i = 0; first != last; ++i, ++first) {
#ifdef __clang__
#pragma clang diagnostic push // ignore generalized lambda capture warning
#pragma clang diagnostic ignored "-Wc++1y-extensions"
#endif
addTask(
[i, context, f = std::move(*first)]() {
context->results.emplace_back(i, folly::makeTryFunction(std::move(f)));
// Check for awaiting iterator.
if (context->promise.hasValue()) {
if (--context->tasksToFulfillPromise == 0) {
context->promise->setValue();
context->promise.clear();
}
}
}
);
#ifdef __clang__
#pragma clang diagnostic pop
#endif
}
return IteratorType(std::move(context));
}
}}
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <functional>
#include <vector>
#include <folly/Optional.h>
#include <folly/experimental/fibers/Promise.h>
#include <folly/futures/Try.h>
namespace folly { namespace fibers {
template <typename T>
class TaskIterator;
/**
* Schedules several tasks and immediately returns an iterator, that
* allow to traverse tasks in the order of their completion. All results and
* exptions thrown are stored alongside with the task id and are
* accessible via iterator.
*
* @param first Range of tasks to be scheduled
* @param last
*
* @return movable, non-copyable iterator
*/
template <class InputIterator>
TaskIterator<
typename std::result_of<
typename std::iterator_traits<InputIterator>::value_type()>::type>
inline addTasks(InputIterator first, InputIterator last);
template <typename T>
class TaskIterator {
public:
typedef T value_type;
// not copyable
TaskIterator(const TaskIterator& other) = delete;
TaskIterator& operator=(const TaskIterator& other) = delete;
// movable
TaskIterator(TaskIterator&& other) noexcept;
TaskIterator& operator=(TaskIterator&& other) = delete;
/**
* @return True if there are tasks immediately available to be consumed (no
* need to await on them).
*/
bool hasCompleted() const;
/**
* @return True if there are tasks pending execution (need to awaited on).
*/
bool hasPending() const;
/**
* @return True if there are any tasks (hasCompleted() || hasPending()).
*/
bool hasNext() const;
/**
* Await for another task to complete. Will not await if the result is
* already available.
*
* @return result of the task completed.
* @throw exception thrown by the task.
*/
T awaitNext();
/**
* Await until the specified number of tasks completes or there are no
* tasks left to await for.
* Note: Will not await if there are already the specified number of tasks
* available.
*
* @param n Number of tasks to await for completition.
*/
void reserve(size_t n);
/**
* @return id of the last task that was processed by awaitNext().
*/
size_t getTaskID() const;
private:
template <class InputIterator>
friend TaskIterator<
typename std::result_of<
typename std::iterator_traits<InputIterator>::value_type()>::type>
addTasks(InputIterator first, InputIterator last);
struct Context {
std::vector<std::pair<size_t, folly::Try<T>>> results;
folly::Optional<Promise<void>> promise;
size_t totalTasks{0};
size_t tasksConsumed{0};
size_t tasksToFulfillPromise{0};
};
std::shared_ptr<Context> context_;
size_t id_;
explicit TaskIterator(std::shared_ptr<Context> context);
folly::Try<T> awaitNextResult();
};
}}
#include <folly/experimental/fibers/AddTasks-inl.h>
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/experimental/fibers/Fiber.h>
#include <folly/experimental/fibers/FiberManager.h>
namespace folly { namespace fibers {
inline Baton::Baton() : Baton(NO_WAITER) {
assert(Baton(NO_WAITER).futex_.futex == static_cast<uint32_t>(NO_WAITER));
assert(Baton(POSTED).futex_.futex == static_cast<uint32_t>(POSTED));
assert(Baton(TIMEOUT).futex_.futex == static_cast<uint32_t>(TIMEOUT));
assert(Baton(THREAD_WAITING).futex_.futex ==
static_cast<uint32_t>(THREAD_WAITING));
assert(futex_.futex.is_lock_free());
assert(waitingFiber_.is_lock_free());
}
template <typename F>
void Baton::wait(F&& mainContextFunc) {
auto fm = FiberManager::getFiberManagerUnsafe();
if (!fm || !fm->activeFiber_) {
mainContextFunc();
return waitThread();
}
return waitFiber(*fm, std::forward<F>(mainContextFunc));
}
template <typename F>
void Baton::waitFiber(FiberManager& fm, F&& mainContextFunc) {
auto& waitingFiber = waitingFiber_;
auto f = [&mainContextFunc, &waitingFiber](Fiber& fiber) mutable {
auto baton_fiber = waitingFiber.load();
do {
if (LIKELY(baton_fiber == NO_WAITER)) {
continue;
} else if (baton_fiber == POSTED || baton_fiber == TIMEOUT) {
fiber.setData(0);
break;
} else {
throw std::logic_error("Some Fiber is already waiting on this Baton.");
}
} while(!waitingFiber.compare_exchange_weak(
baton_fiber,
reinterpret_cast<intptr_t>(&fiber)));
mainContextFunc();
};
fm.awaitFunc_ = std::ref(f);
fm.activeFiber_->preempt(Fiber::AWAITING);
}
template <typename F>
bool Baton::timed_wait(TimeoutController::Duration timeout,
F&& mainContextFunc) {
auto fm = FiberManager::getFiberManagerUnsafe();
if (!fm || !fm->activeFiber_) {
mainContextFunc();
return timedWaitThread(timeout);
}
auto& baton = *this;
bool canceled = false;
auto timeoutFunc = [&baton, &canceled]() mutable {
baton.postHelper(TIMEOUT);
canceled = true;
};
auto id = fm->timeoutManager_->registerTimeout(
std::ref(timeoutFunc), timeout);
waitFiber(*fm, std::move(mainContextFunc));
auto posted = waitingFiber_ == POSTED;
if (!canceled) {
fm->timeoutManager_->cancel(id);
}
return posted;
}
template<typename C, typename D>
bool Baton::timed_wait(const std::chrono::time_point<C,D>& timeout) {
auto now = C::now();
if (LIKELY(now <= timeout)) {
return timed_wait(
std::chrono::duration_cast<std::chrono::milliseconds>(timeout - now));
} else {
return timed_wait(TimeoutController::Duration(0));
}
}
}}
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "Baton.h"
#include <folly/detail/MemoryIdler.h>
namespace folly { namespace fibers {
void Baton::wait() {
wait([](){});
}
bool Baton::timed_wait(TimeoutController::Duration timeout) {
return timed_wait(timeout, [](){});
}
void Baton::waitThread() {
if (spinWaitForEarlyPost()) {
assert(waitingFiber_.load(std::memory_order_acquire) == POSTED);
return;
}
auto fiber = waitingFiber_.load();
if (LIKELY(fiber == NO_WAITER &&
waitingFiber_.compare_exchange_strong(fiber, THREAD_WAITING))) {
do {
folly::detail::MemoryIdler::futexWait(futex_.futex, THREAD_WAITING);
fiber = waitingFiber_.load(std::memory_order_relaxed);
} while (fiber == THREAD_WAITING);
}
if (LIKELY(fiber == POSTED)) {
return;
}
// Handle errors
if (fiber == TIMEOUT) {
throw std::logic_error("Thread baton can't have timeout status");
}
if (fiber == THREAD_WAITING) {
throw std::logic_error("Other thread is already waiting on this baton");
}
throw std::logic_error("Other fiber is already waiting on this baton");
}
bool Baton::spinWaitForEarlyPost() {
static_assert(PreBlockAttempts > 0,
"isn't this assert clearer than an uninitialized variable warning?");
for (int i = 0; i < PreBlockAttempts; ++i) {
if (try_wait()) {
// hooray!
return true;
}
#if FOLLY_X64
// The pause instruction is the polite way to spin, but it doesn't
// actually affect correctness to omit it if we don't have it.
// Pausing donates the full capabilities of the current core to
// its other hyperthreads for a dozen cycles or so
asm volatile ("pause");
#endif
}
return false;
}
bool Baton::timedWaitThread(TimeoutController::Duration timeout) {
if (spinWaitForEarlyPost()) {
assert(waitingFiber_.load(std::memory_order_acquire) == POSTED);
return true;
}
auto fiber = waitingFiber_.load();
if (LIKELY(fiber == NO_WAITER &&
waitingFiber_.compare_exchange_strong(fiber, THREAD_WAITING))) {
auto deadline = TimeoutController::Clock::now() + timeout;
do {
const auto wait_rv =
futex_.futex.futexWaitUntil(THREAD_WAITING, deadline);
if (wait_rv == folly::detail::FutexResult::TIMEDOUT) {
return false;
}
fiber = waitingFiber_.load(std::memory_order_relaxed);
} while (fiber == THREAD_WAITING);
}
if (LIKELY(fiber == POSTED)) {
return true;
}
// Handle errors
if (fiber == TIMEOUT) {
throw std::logic_error("Thread baton can't have timeout status");
}
if (fiber == THREAD_WAITING) {
throw std::logic_error("Other thread is already waiting on this baton");
}
throw std::logic_error("Other fiber is already waiting on this baton");
}
void Baton::post() {
postHelper(POSTED);
}
void Baton::postHelper(intptr_t new_value) {
auto fiber = waitingFiber_.load();
do {
if (fiber == THREAD_WAITING) {
assert(new_value == POSTED);
return postThread();
}
if (fiber == POSTED || fiber == TIMEOUT) {
return;
}
} while (!waitingFiber_.compare_exchange_weak(fiber, new_value));
if (fiber != NO_WAITER) {
reinterpret_cast<Fiber*>(fiber)->setData(0);
}
}
bool Baton::try_wait() {
auto state = waitingFiber_.load();
return state == POSTED;
}
void Baton::postThread() {
auto expected = THREAD_WAITING;
if (!waitingFiber_.compare_exchange_strong(expected, POSTED)) {
return;
}
futex_.futex.futexWake(1);
}
void Baton::reset() {
waitingFiber_.store(NO_WAITER, std::memory_order_relaxed);;
}
}}
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <atomic>
#include <folly/detail/Futex.h>
#include <folly/experimental/fibers/TimeoutController.h>
namespace folly { namespace fibers {
class Fiber;
/**
* @class Baton
*
* Primitive which allows to put current Fiber to sleep and wake it from another
* Fiber/thread.
*/
class Baton {
public:
Baton();
~Baton() {}
/**
* Puts active fiber to sleep. Returns when post is called.
*/
void wait();
/**
* Puts active fiber to sleep. Returns when post is called.
*
* @param mainContextFunc this function is immediately executed on the main
* context.
*/
template <typename F>
void wait(F&& mainContextFunc);
/**
* This is here only not break tao/locks. Please don't use it, because it is
* inefficient when used on Fibers.
*/
template<typename C, typename D = typename C::duration>
bool timed_wait(const std::chrono::time_point<C,D>& timeout);
/**
* Puts active fiber to sleep. Returns when post is called.
*
* @param timeout Baton will be automatically awaken if timeout is hit
*
* @return true if was posted, false if timeout expired
*/
bool timed_wait(TimeoutController::Duration timeout);
/**
* Puts active fiber to sleep. Returns when post is called.
*
* @param timeout Baton will be automatically awaken if timeout is hit
* @param mainContextFunc this function is immediately executed on the main
* context.
*
* @return true if was posted, false if timeout expired
*/
template <typename F>
bool timed_wait(TimeoutController::Duration timeout, F&& mainContextFunc);
/**
* Checks if the baton has been posted without blocking.
* @return true iff the baton has been posted.
*/
bool try_wait();
/**
* Wakes up Fiber which was waiting on this Baton (or if no Fiber is waiting,
* next wait() call will return immediately).
*/
void post();
/**
* Reset's the baton (equivalent to destroying the object and constructing
* another one in place).
* Caller is responsible for making sure no one is waiting on/posting the
* baton when reset() is called.
*/
void reset();
private:
enum {
/**
* Must be positive. If multiple threads are actively using a
* higher-level data structure that uses batons internally, it is
* likely that the post() and wait() calls happen almost at the same
* time. In this state, we lose big 50% of the time if the wait goes
* to sleep immediately. On circa-2013 devbox hardware it costs about
* 7 usec to FUTEX_WAIT and then be awoken (half the t/iter as the
* posix_sem_pingpong test in BatonTests). We can improve our chances
* of early post by spinning for a bit, although we have to balance
* this against the loss if we end up sleeping any way. Spins on this
* hw take about 7 nanos (all but 0.5 nanos is the pause instruction).
* We give ourself 300 spins, which is about 2 usec of waiting. As a
* partial consolation, since we are using the pause instruction we
* are giving a speed boost to the colocated hyperthread.
*/
PreBlockAttempts = 300,
};
explicit Baton(intptr_t state) : waitingFiber_(state) {};
void postHelper(intptr_t new_value);
void postThread();
void waitThread();
template <typename F>
inline void waitFiber(FiberManager& fm, F&& mainContextFunc);
/**
* Spin for "some time" (see discussion on PreBlockAttempts) waiting
* for a post.
* @return true if we received a post the spin wait, false otherwise. If the
* function returns true then Baton state is guaranteed to be POSTED
*/
bool spinWaitForEarlyPost();
bool timedWaitThread(TimeoutController::Duration timeout);
static constexpr intptr_t NO_WAITER = 0;
static constexpr intptr_t POSTED = -1;
static constexpr intptr_t TIMEOUT = -2;
static constexpr intptr_t THREAD_WAITING = -3;
union {
std::atomic<intptr_t> waitingFiber_;
struct {
folly::detail::Futex<> futex;
int32_t _unused_packing;
} futex_;
};
};
}}
#include <folly/experimental/fibers/Baton-inl.h>
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <boost/context/all.hpp>
#include <boost/version.hpp>
/**
* Wrappers for different versions of boost::context library
* API reference for different versions
* Boost 1.51: http://www.boost.org/doc/libs/1_51_0/libs/context/doc/html/context/context/boost_fcontext.html
* Boost 1.52: http://www.boost.org/doc/libs/1_52_0/libs/context/doc/html/context/context/boost_fcontext.html
* Boost 1.56: http://www.boost.org/doc/libs/1_56_0/libs/context/doc/html/context/context/boost_fcontext.html
*/
namespace folly { namespace fibers {
struct FContext {
public:
#if BOOST_VERSION >= 105200
using ContextStruct = boost::context::fcontext_t;
#else
using ContextStruct = boost::ctx::fcontext_t;
#endif
void* stackLimit() const {
return stackLimit_;
}
void* stackBase() const {
return stackBase_;
}
private:
void* stackLimit_;
void* stackBase_;
#if BOOST_VERSION >= 105600
ContextStruct context_;
#elif BOOST_VERSION >= 105200
ContextStruct* context_;
#else
ContextStruct context_;
#endif
friend intptr_t jumpContext(FContext* oldC, FContext::ContextStruct* newC,
intptr_t p);
friend intptr_t jumpContext(FContext::ContextStruct* oldC, FContext* newC,
intptr_t p);
friend FContext makeContext(void* stackLimit, size_t stackSize,
void(*fn)(intptr_t));
};
inline intptr_t jumpContext(FContext* oldC, FContext::ContextStruct* newC,
intptr_t p) {
#if BOOST_VERSION >= 105600
return boost::context::jump_fcontext(&oldC->context_, *newC, p);
#elif BOOST_VERSION >= 105200
return boost::context::jump_fcontext(oldC->context_, newC, p);
#else
return jump_fcontext(&oldC->context_, newC, p);
#endif
}
inline intptr_t jumpContext(FContext::ContextStruct* oldC, FContext* newC,
intptr_t p) {
#if BOOST_VERSION >= 105200
return boost::context::jump_fcontext(oldC, newC->context_, p);
#else
return jump_fcontext(oldC, &newC->context_, p);
#endif
}
inline FContext makeContext(void* stackLimit, size_t stackSize,
void(*fn)(intptr_t)) {
FContext res;
res.stackLimit_ = stackLimit;
res.stackBase_ = static_cast<unsigned char*>(stackLimit) + stackSize;
#if BOOST_VERSION >= 105200
res.context_ = boost::context::make_fcontext(res.stackBase_, stackSize, fn);
#else
res.context_.fc_stack.limit = stackLimit;
res.context_.fc_stack.base = res.stackBase_;
make_fcontext(&res.context_, fn);
#endif
return res;
}
}} // folly::fibers
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/Memory.h>
#include <folly/experimental/fibers/EventBaseLoopController.h>
#include <folly/experimental/fibers/FiberManager.h>
#include <folly/io/async/EventBase.h>
namespace folly { namespace fibers {
class EventBaseLoopController::ControllerCallback :
public folly::EventBase::LoopCallback {
public:
explicit ControllerCallback(EventBaseLoopController& controller)
: controller_(controller) {}
void runLoopCallback() noexcept override {
controller_.runLoop();
}
private:
EventBaseLoopController& controller_;
};
inline EventBaseLoopController::EventBaseLoopController()
: callback_(folly::make_unique<ControllerCallback>(*this)) {
}
inline EventBaseLoopController::~EventBaseLoopController() {
callback_->cancelLoopCallback();
}
inline void EventBaseLoopController::attachEventBase(
folly::EventBase& eventBase) {
if (eventBase_ != nullptr) {
LOG(ERROR) << "Attempt to reattach EventBase to LoopController";
}
eventBase_ = &eventBase;
eventBaseAttached_ = true;
if (awaitingScheduling_) {
schedule();
}
}
inline void EventBaseLoopController::setFiberManager(FiberManager* fm) {
fm_ = fm;
}
inline void EventBaseLoopController::schedule() {
if (eventBase_ == nullptr) {
// In this case we need to postpone scheduling.
awaitingScheduling_ = true;
} else {
// Schedule it to run in current iteration.
eventBase_->runInLoop(callback_.get(), true);
awaitingScheduling_ = false;
}
}
inline void EventBaseLoopController::cancel() {
callback_->cancelLoopCallback();
}
inline void EventBaseLoopController::runLoop() {
fm_->loopUntilNoReady();
}
inline void EventBaseLoopController::scheduleThreadSafe() {
/* The only way we could end up here is if
1) Fiber thread creates a fiber that awaits (which means we must
have already attached, fiber thread wouldn't be running).
2) We move the promise to another thread (this move is a memory fence)
3) We fulfill the promise from the other thread. */
assert(eventBaseAttached_);
eventBase_->runInEventBaseThread([this] () { runLoop(); });
}
inline void EventBaseLoopController::timedSchedule(std::function<void()> func,
TimePoint time) {
assert(eventBaseAttached_);
// We want upper bound for the cast, thus we just add 1
auto delay_ms = std::chrono::duration_cast<
std::chrono::milliseconds>(time - Clock::now()).count() + 1;
// If clock is not monotonic
delay_ms = std::max(delay_ms, 0L);
eventBase_->tryRunAfterDelay(func, delay_ms);
}
}} // folly::fibers
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <memory>
#include <folly/experimental/fibers/LoopController.h>
namespace folly {
class EventBase;
}
namespace folly { namespace fibers {
class FiberManager;
class EventBaseLoopController : public LoopController {
public:
explicit EventBaseLoopController();
~EventBaseLoopController();
/**
* Attach EventBase after LoopController was created.
*/
void attachEventBase(folly::EventBase& eventBase);
folly::EventBase* getEventBase() {
return eventBase_;
}
private:
class ControllerCallback;
bool awaitingScheduling_{false};
folly::EventBase* eventBase_{nullptr};
std::unique_ptr<ControllerCallback> callback_;
FiberManager* fm_{nullptr};
std::atomic<bool> eventBaseAttached_{false};
/* LoopController interface */
void setFiberManager(FiberManager* fm) override;
void schedule() override;
void cancel() override;
void runLoop();
void scheduleThreadSafe() override;
void timedSchedule(std::function<void()> func, TimePoint time) override;
friend class FiberManager;
};
}} // folly::fibers
#include "EventBaseLoopController-inl.h"
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <cassert>
namespace folly { namespace fibers {
template <typename F>
void Fiber::setFunction(F&& func) {
assert(state_ == INVALID);
func_ = std::move(func);
state_ = NOT_STARTED;
}
template <typename F, typename G>
void Fiber::setFunctionFinally(F&& resultFunc,
G&& finallyFunc) {
assert(state_ == INVALID);
resultFunc_ = std::move(resultFunc);
finallyFunc_ = std::move(finallyFunc);
state_ = NOT_STARTED;
}
inline void* Fiber::getUserBuffer() {
return &userBuffer_;
}
template <typename G>
void Fiber::setReadyFunction(G&& func) {
assert(state_ == INVALID || state_ == NOT_STARTED);
readyFunc_ = std::move(func);
}
}} // folly::fibers
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "Fiber.h"
#include <sys/syscall.h>
#include <unistd.h>
#include <algorithm>
#include <cassert>
#include <cstring>
#include <stdexcept>
#include <folly/Likely.h>
#include <folly/Portability.h>
#include <folly/experimental/fibers/BoostContextCompatibility.h>
#include <folly/experimental/fibers/FiberManager.h>
namespace folly { namespace fibers {
namespace {
static const uint64_t kMagic8Bytes = 0xfaceb00cfaceb00c;
pid_t localThreadId() {
static thread_local pid_t threadId = syscall(SYS_gettid);
return threadId;
}
static void fillMagic(const FContext& context) {
uint64_t* begin = static_cast<uint64_t*>(context.stackLimit());
uint64_t* end = static_cast<uint64_t*>(context.stackBase());
std::fill(begin, end, kMagic8Bytes);
}
/* Size of the region from p + nBytes down to the last non-magic value */
static size_t nonMagicInBytes(const FContext& context) {
uint64_t* begin = static_cast<uint64_t*>(context.stackLimit());
uint64_t* end = static_cast<uint64_t*>(context.stackBase());
auto firstNonMagic = std::find_if(
begin, end,
[](uint64_t val) {
return val != kMagic8Bytes;
}
);
return (end - firstNonMagic) * sizeof(uint64_t);
}
} // anonymous namespace
void Fiber::setData(intptr_t data) {
assert(state_ == AWAITING);
data_ = data;
state_ = READY_TO_RUN;
if (LIKELY(threadId_ == localThreadId())) {
fiberManager_.readyFibers_.push_back(*this);
fiberManager_.ensureLoopScheduled();
} else {
fiberManager_.remoteReadyInsert(this);
}
}
Fiber::Fiber(FiberManager& fiberManager) :
fiberManager_(fiberManager) {
auto size = fiberManager_.options_.stackSize;
auto limit = fiberManager_.stackAllocator_.allocate(size);
fcontext_ = makeContext(limit, size, &Fiber::fiberFuncHelper);
if (UNLIKELY(fiberManager_.options_.debugRecordStackUsed)) {
fillMagic(fcontext_);
}
}
Fiber::~Fiber() {
fiberManager_.stackAllocator_.deallocate(
static_cast<unsigned char*>(fcontext_.stackLimit()),
fiberManager_.options_.stackSize);
}
void Fiber::recordStackPosition() {
int stackDummy;
fiberManager_.stackHighWatermark_ =
std::max(fiberManager_.stackHighWatermark_,
static_cast<size_t>(
static_cast<unsigned char*>(fcontext_.stackBase()) -
static_cast<unsigned char*>(
static_cast<void*>(&stackDummy))));
}
void Fiber::fiberFuncHelper(intptr_t fiber) {
reinterpret_cast<Fiber*>(fiber)->fiberFunc();
}
/*
* Some weird bug in ASAN causes fiberFunc to allocate boundless amounts of
* memory inside __asan_handle_no_return. Work around this in ASAN builds by
* tricking the compiler into thinking it may, someday, return.
*/
#ifdef FOLLY_SANITIZE_ADDRESS
volatile bool loopForever = true;
#else
static constexpr bool loopForever = true;
#endif
void Fiber::fiberFunc() {
while (loopForever) {
assert(state_ == NOT_STARTED);
threadId_ = localThreadId();
state_ = RUNNING;
try {
if (resultFunc_) {
assert(finallyFunc_);
assert(!func_);
resultFunc_();
} else {
assert(func_);
func_();
}
} catch (...) {
fiberManager_.exceptionCallback_(std::current_exception(),
"running Fiber func_/resultFunc_");
}
if (UNLIKELY(fiberManager_.options_.debugRecordStackUsed)) {
fiberManager_.stackHighWatermark_ =
std::max(fiberManager_.stackHighWatermark_,
nonMagicInBytes(fcontext_));
}
state_ = INVALID;
fiberManager_.activeFiber_ = nullptr;
auto fiber = reinterpret_cast<Fiber*>(
jumpContext(&fcontext_, &fiberManager_.mainContext_, 0));
assert(fiber == this);
}
}
intptr_t Fiber::preempt(State state) {
assert(fiberManager_.activeFiber_ == this);
assert(state_ == RUNNING);
assert(state != RUNNING);
fiberManager_.activeFiber_ = nullptr;
state_ = state;
recordStackPosition();
auto ret = jumpContext(&fcontext_, &fiberManager_.mainContext_, 0);
assert(fiberManager_.activeFiber_ == this);
assert(state_ == READY_TO_RUN);
state_ = RUNNING;
return ret;
}
}}
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <functional>
#include <boost/context/all.hpp>
#include <boost/version.hpp>
#include <folly/AtomicLinkedList.h>
#include <folly/IntrusiveList.h>
#include <folly/experimental/fibers/BoostContextCompatibility.h>
namespace folly { namespace fibers {
class Baton;
class FiberManager;
/**
* @class Fiber
* @brief Fiber object used by FiberManager to execute tasks.
*
* Each Fiber object can be executing at most one task at a time. In active
* phase it is running the task function and keeps its context.
* Fiber is also used to pass data to blocked task and thus unblock it.
* Each Fiber may be associated with a single FiberManager.
*/
class Fiber {
public:
/**
* Sets data for the blocked task
*
* @param data this data will be returned by await() when task is resumed.
*/
void setData(intptr_t data);
Fiber(const Fiber&) = delete;
Fiber& operator=(const Fiber&) = delete;
~Fiber();
private:
enum State {
INVALID, /**< Does't have task function */
NOT_STARTED, /**< Has task function, not started */
READY_TO_RUN, /**< Was started, blocked, then unblocked */
RUNNING, /**< Is running right now */
AWAITING, /**< Is currently blocked */
AWAITING_IMMEDIATE, /**< Was preempted to run an immediate function,
and will be resumed right away */
};
State state_{INVALID}; /**< current Fiber state */
friend class Baton;
friend class FiberManager;
explicit Fiber(FiberManager& fiberManager);
template <typename F>
void setFunction(F&& func);
template <typename F, typename G>
void setFunctionFinally(F&& func, G&& finally);
template <typename G>
void setReadyFunction(G&& func);
static void fiberFuncHelper(intptr_t fiber);
void fiberFunc();
/**
* Switch out of fiber context into the main context,
* performing necessary housekeeping for the new state.
*
* @param state New state, must not be RUNNING.
*
* @return The value passed back from the main context.
*/
intptr_t preempt(State state);
/**
* Examines how much of the stack we used at this moment and
* registers with the FiberManager (for monitoring).
*/
void recordStackPosition();
FiberManager& fiberManager_; /**< Associated FiberManager */
FContext fcontext_; /**< current task execution context */
intptr_t data_; /**< Used to keep some data with the Fiber */
std::function<void()> func_; /**< task function */
std::function<void()> readyFunc_; /**< function to be executed before jumping
to this fiber */
/**
* Points to next fiber in remote ready list
*/
folly::AtomicLinkedListHook<Fiber> nextRemoteReady_;
static constexpr size_t kUserBufferSize = 256;
std::aligned_storage<kUserBufferSize>::type userBuffer_;
void* getUserBuffer();
std::function<void()> resultFunc_;
std::function<void()> finallyFunc_;
folly::IntrusiveListHook listHook_; /**< list hook for different FiberManager
queues */
pid_t threadId_{0};
};
}}
#include <folly/experimental/fibers/Fiber-inl.h>
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <cassert>
#include <folly/Memory.h>
#include <folly/Optional.h>
#include <folly/Portability.h>
#include <folly/ScopeGuard.h>
#include <folly/experimental/fibers/Baton.h>
#include <folly/experimental/fibers/Fiber.h>
#include <folly/experimental/fibers/Promise.h>
#include <folly/experimental/fibers/LoopController.h>
#include <folly/futures/Try.h>
namespace folly { namespace fibers {
inline void FiberManager::ensureLoopScheduled() {
if (isLoopScheduled_) {
return;
}
isLoopScheduled_ = true;
loopController_->schedule();
}
inline void FiberManager::runReadyFiber(Fiber* fiber) {
assert(fiber->state_ == Fiber::NOT_STARTED ||
fiber->state_ == Fiber::READY_TO_RUN);
while (fiber->state_ == Fiber::NOT_STARTED ||
fiber->state_ == Fiber::READY_TO_RUN) {
activeFiber_ = fiber;
if (fiber->readyFunc_) {
fiber->readyFunc_();
}
jumpContext(&mainContext_, &fiber->fcontext_, fiber->data_);
if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) {
try {
immediateFunc_();
} catch (...) {
exceptionCallback_(std::current_exception(), "running immediateFunc_");
}
immediateFunc_ = nullptr;
fiber->state_ = Fiber::READY_TO_RUN;
}
}
if (fiber->state_ == Fiber::AWAITING) {
awaitFunc_(*fiber);
awaitFunc_ = nullptr;
} else if (fiber->state_ == Fiber::INVALID) {
assert(fibersActive_ > 0);
--fibersActive_;
// Making sure that task functor is deleted once task is complete.
// NOTE: we must do it on main context, as the fiber is not
// running at this point.
fiber->func_ = nullptr;
fiber->resultFunc_ = nullptr;
if (fiber->finallyFunc_) {
try {
fiber->finallyFunc_();
} catch (...) {
exceptionCallback_(std::current_exception(), "running finallyFunc_");
}
fiber->finallyFunc_ = nullptr;
}
if (fibersPoolSize_ < options_.maxFibersPoolSize) {
fibersPool_.push_front(*fiber);
++fibersPoolSize_;
} else {
delete fiber;
assert(fibersAllocated_ > 0);
--fibersAllocated_;
}
}
}
inline bool FiberManager::loopUntilNoReady() {
SCOPE_EXIT {
isLoopScheduled_ = false;
currentFiberManager_ = nullptr;
};
currentFiberManager_ = this;
bool hadRemoteFiber = true;
while (hadRemoteFiber) {
hadRemoteFiber = false;
while (!readyFibers_.empty()) {
auto& fiber = readyFibers_.front();
readyFibers_.pop_front();
runReadyFiber(&fiber);
}
remoteReadyQueue_.sweep(
[this, &hadRemoteFiber] (Fiber* fiber) {
runReadyFiber(fiber);
hadRemoteFiber = true;
}
);
remoteTaskQueue_.sweep(
[this, &hadRemoteFiber] (RemoteTask* taskPtr) {
std::unique_ptr<RemoteTask> task(taskPtr);
auto fiber = getFiber();
fiber->setFunction(std::move(task->func));
fiber->data_ = reinterpret_cast<intptr_t>(fiber);
runReadyFiber(fiber);
hadRemoteFiber = true;
}
);
}
return fibersActive_ > 0;
}
// We need this to be in a struct, not inlined in addTask, because clang crashes
// otherwise.
template <typename F>
struct FiberManager::AddTaskHelper {
class Func;
static constexpr bool allocateInBuffer =
sizeof(Func) <= Fiber::kUserBufferSize;
class Func {
public:
Func(F&& func, FiberManager& fm) :
func_(std::forward<F>(func)), fm_(fm) {}
void operator()() {
try {
func_();
} catch (...) {
fm_.exceptionCallback_(std::current_exception(),
"running Func functor");
}
if (allocateInBuffer) {
this->~Func();
} else {
delete this;
}
}
private:
F func_;
FiberManager& fm_;
};
};
template <typename F>
void FiberManager::addTask(F&& func) {
typedef AddTaskHelper<F> Helper;
auto fiber = getFiber();
if (Helper::allocateInBuffer) {
auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
new (funcLoc) typename Helper::Func(std::forward<F>(func), *this);
fiber->setFunction(std::ref(*funcLoc));
} else {
auto funcLoc = new typename Helper::Func(std::forward<F>(func), *this);
fiber->setFunction(std::ref(*funcLoc));
}
fiber->data_ = reinterpret_cast<intptr_t>(fiber);
readyFibers_.push_back(*fiber);
ensureLoopScheduled();
}
template <typename F, typename G>
void FiberManager::addTaskReadyFunc(F&& func, G&& readyFunc) {
auto fiber = getFiber();
fiber->setFunction(std::forward<F>(func));
fiber->setReadyFunction(std::forward<G>(readyFunc));
fiber->data_ = reinterpret_cast<intptr_t>(fiber);
readyFibers_.push_back(*fiber);
ensureLoopScheduled();
}
template <typename F>
void FiberManager::addTaskRemote(F&& func) {
auto task = folly::make_unique<RemoteTask>(std::move(func));
if (remoteTaskQueue_.insertHead(task.release())) {
loopController_->scheduleThreadSafe();
}
}
template <typename X>
struct IsRvalueRefTry { static const bool value = false; };
template <typename T>
struct IsRvalueRefTry<folly::Try<T>&&> { static const bool value = true; };
// We need this to be in a struct, not inlined in addTaskFinally, because clang
// crashes otherwise.
template <typename F, typename G>
struct FiberManager::AddTaskFinallyHelper {
class Func;
class Finally;
typedef typename std::result_of<F()>::type Result;
static constexpr bool allocateInBuffer =
sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize;
class Finally {
public:
Finally(G&& finally,
FiberManager& fm) :
finally_(std::move(finally)),
fm_(fm) {
}
void operator()() {
try {
finally_(std::move(*result_));
} catch (...) {
fm_.exceptionCallback_(std::current_exception(),
"running Finally functor");
}
if (allocateInBuffer) {
this->~Finally();
} else {
delete this;
}
}
private:
friend class Func;
G finally_;
folly::Optional<folly::Try<Result>> result_;
FiberManager& fm_;
};
class Func {
public:
Func(F&& func, Finally& finally) :
func_(std::move(func)), result_(finally.result_) {}
void operator()() {
result_ = folly::makeTryFunction(std::move(func_));
if (allocateInBuffer) {
this->~Func();
} else {
delete this;
}
}
private:
F func_;
folly::Optional<folly::Try<Result>>& result_;
};
};
template <typename F, typename G>
void FiberManager::addTaskFinally(F&& func, G&& finally) {
typedef typename std::result_of<F()>::type Result;
static_assert(
IsRvalueRefTry<typename FirstArgOf<G>::type>::value,
"finally(arg): arg must be Try<T>&&");
static_assert(
std::is_convertible<
Result,
typename std::remove_reference<
typename FirstArgOf<G>::type
>::type::element_type
>::value,
"finally(Try<T>&&): T must be convertible from func()'s return type");
auto fiber = getFiber();
typedef AddTaskFinallyHelper<F,G> Helper;
if (Helper::allocateInBuffer) {
auto funcLoc = static_cast<typename Helper::Func*>(
fiber->getUserBuffer());
auto finallyLoc = static_cast<typename Helper::Finally*>(
static_cast<void*>(funcLoc + 1));
new (finallyLoc) typename Helper::Finally(std::move(finally), *this);
new (funcLoc) typename Helper::Func(std::move(func), *finallyLoc);
fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
} else {
auto finallyLoc = new typename Helper::Finally(std::move(finally), *this);
auto funcLoc = new typename Helper::Func(std::move(func), *finallyLoc);
fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
}
fiber->data_ = reinterpret_cast<intptr_t>(fiber);
readyFibers_.push_back(*fiber);
ensureLoopScheduled();
}
template <typename F>
typename std::result_of<F()>::type
FiberManager::runInMainContext(F&& func) {
return runInMainContextHelper(std::forward<F>(func));
}
template <typename F>
inline typename std::enable_if<
!std::is_same<typename std::result_of<F()>::type, void>::value,
typename std::result_of<F()>::type>::type
FiberManager::runInMainContextHelper(F&& func) {
if (UNLIKELY(activeFiber_ == nullptr)) {
return func();
}
typedef typename std::result_of<F()>::type Result;
folly::Try<Result> result;
auto f = [&func, &result]() mutable {
result = folly::makeTryFunction(std::forward<F>(func));
};
immediateFunc_ = std::ref(f);
activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
return std::move(result.value());
}
template <typename F>
inline typename std::enable_if<
std::is_same<typename std::result_of<F()>::type, void>::value,
void>::type
FiberManager::runInMainContextHelper(F&& func) {
if (UNLIKELY(activeFiber_ == nullptr)) {
func();
return;
}
immediateFunc_ = std::ref(func);
activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
}
inline FiberManager& FiberManager::getFiberManager() {
assert(currentFiberManager_ != nullptr);
return *currentFiberManager_;
}
inline FiberManager* FiberManager::getFiberManagerUnsafe() {
return currentFiberManager_;
}
inline bool FiberManager::hasActiveFiber() {
return activeFiber_ != nullptr;
}
template <typename F>
typename FirstArgOf<F>::type::value_type
inline await(F&& func) {
typedef typename FirstArgOf<F>::type::value_type Result;
folly::Try<Result> result;
Baton baton;
baton.wait([&func, &result, &baton]() mutable {
func(Promise<Result>(result, baton));
});
return folly::moveFromTry(std::move(result));
}
}}
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "FiberManager.h"
#include <sys/syscall.h>
#include <unistd.h>
#include <cassert>
#include <stdexcept>
#include <glog/logging.h>
#include <folly/experimental/fibers/Fiber.h>
#include <folly/experimental/fibers/LoopController.h>
namespace folly { namespace fibers {
__thread FiberManager* FiberManager::currentFiberManager_ = nullptr;
FiberManager::FiberManager(std::unique_ptr<LoopController> loopController,
Options options) :
loopController_(std::move(loopController)),
options_(options),
exceptionCallback_([](std::exception_ptr e, std::string context) {
try {
std::rethrow_exception(e);
} catch (const std::exception& e) {
LOG(DFATAL) << "Exception " << typeid(e).name()
<< " with message '" << e.what() << "' was thrown in "
<< "FiberManager with context '" << context << "'";
throw;
} catch (...) {
LOG(DFATAL) << "Unknown exception was thrown in FiberManager with "
<< "context '" << context << "'";
throw;
}
}),
timeoutManager_(std::make_shared<TimeoutController>(*loopController_)) {
loopController_->setFiberManager(this);
}
FiberManager::~FiberManager() {
if (isLoopScheduled_) {
loopController_->cancel();
}
Fiber* fiberIt;
Fiber* fiberItNext;
while (!fibersPool_.empty()) {
fibersPool_.pop_front_and_dispose([] (Fiber* fiber) {
delete fiber;
});
}
assert(readyFibers_.empty());
assert(fibersActive_ == 0);
}
LoopController& FiberManager::loopController() {
return *loopController_;
}
const LoopController& FiberManager::loopController() const {
return *loopController_;
}
bool FiberManager::hasTasks() const {
return fibersActive_ > 0 ||
!remoteReadyQueue_.empty() ||
!remoteTaskQueue_.empty();
}
Fiber* FiberManager::getFiber() {
Fiber* fiber = nullptr;
if (fibersPool_.empty()) {
fiber = new Fiber(*this);
++fibersAllocated_;
} else {
fiber = &fibersPool_.front();
fibersPool_.pop_front();
assert(fibersPoolSize_ > 0);
--fibersPoolSize_;
}
++fibersActive_;
assert(fiber);
return fiber;
}
void FiberManager::setExceptionCallback(FiberManager::ExceptionCallback ec) {
assert(ec);
exceptionCallback_ = std::move(ec);
}
size_t FiberManager::fibersAllocated() const {
return fibersAllocated_;
}
size_t FiberManager::fibersPoolSize() const {
return fibersPoolSize_;
}
size_t FiberManager::stackHighWatermark() const {
return stackHighWatermark_;
}
void FiberManager::remoteReadyInsert(Fiber* fiber) {
if (remoteReadyQueue_.insertHead(fiber)) {
loopController_->scheduleThreadSafe();
}
}
}}
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <functional>
#include <memory>
#include <queue>
#include <unordered_set>
#include <vector>
#include <folly/AtomicLinkedList.h>
#include <folly/Likely.h>
#include <folly/IntrusiveList.h>
#include <folly/futures/Try.h>
#include <folly/experimental/fibers/BoostContextCompatibility.h>
#include <folly/experimental/fibers/Fiber.h>
#include <folly/experimental/fibers/traits.h>
#ifdef USE_GUARD_ALLOCATOR
#include <folly/experimental/fibers/GuardPageAllocator.h>
#endif
namespace folly { namespace fibers {
class Baton;
class Fiber;
class LoopController;
class TimeoutController;
/**
* @class FiberManager
* @brief Single-threaded task execution engine.
*
* FiberManager allows semi-parallel task execution on the same thread. Each
* task can notify FiberManager that it is blocked on something (via await())
* call. This will pause execution of this task and it will be resumed only
* when it is unblocked (via setData()).
*/
class FiberManager {
public:
struct Options {
#ifdef FOLLY_SANITIZE_ADDRESS
/* ASAN needs a lot of extra stack space.
16x is a conservative estimate, 8x also worked with tests
where it mattered. Note that overallocating here does not necessarily
increase RSS, since unused memory is pretty much free. */
static constexpr size_t kDefaultStackSize{16 * 16 * 1024};
#else
static constexpr size_t kDefaultStackSize{16 * 1024};
#endif
/**
* Maximum stack size for fibers which will be used for executing all the
* tasks.
*/
size_t stackSize{kDefaultStackSize};
/**
* Record exact amount of stack used.
*
* This is fairly expensive: we fill each newly allocated stack
* with some known value and find the boundary of unused stack
* with linear search every time we surrender the stack back to fibersPool.
*/
bool debugRecordStackUsed{false};
/**
* Keep at most this many free fibers in the pool.
* This way the total number of fibers in the system is always bounded
* by the number of active fibers + maxFibersPoolSize.
*/
size_t maxFibersPoolSize{1000};
constexpr Options() {}
};
typedef std::function<void(std::exception_ptr, std::string)>
ExceptionCallback;
/**
* Initializes, but doesn't start FiberManager loop
*
* @param options FiberManager options
*/
explicit FiberManager(std::unique_ptr<LoopController> loopController,
Options options = Options());
~FiberManager();
/**
* Controller access.
*/
LoopController& loopController();
const LoopController& loopController() const;
/**
* Keeps running ready tasks until the list of ready tasks is empty.
*
* @return True if there are any waiting tasks remaining.
*/
bool loopUntilNoReady();
/**
* @return true if there are outstanding tasks.
*/
bool hasTasks() const;
/**
* Sets exception callback which will be called if any of the tasks throws an
* exception.
*
* @param ec
*/
void setExceptionCallback(ExceptionCallback ec);
/**
* Add a new task to be executed. Must be called from FiberManager's thread.
*
* @param func Task functor; must have a signature of `void func()`.
* The object will be destroyed once task execution is complete.
*/
template <typename F>
void addTask(F&& func);
/**
* Add a new task to be executed, along with a function readyFunc_ which needs
* to be executed just before jumping to the ready fiber
*
* @param func Task functor; must have a signature of `T func()` for some T.
* @param readyFunc functor that needs to be executed just before jumping to
* ready fiber on the main context. This can for example be
* used to set up state before starting or resuming a fiber.
*/
template <typename F, typename G>
void addTaskReadyFunc(F&& func, G&& readyFunc);
/**
* Add a new task to be executed. Safe to call from other threads.
*
* @param func Task function; must have a signature of `void func()`.
* The object will be destroyed once task execution is complete.
*/
template <typename F>
void addTaskRemote(F&& func);
/**
* Add a new task. When the task is complete, execute finally(Try<Result>&&)
* on the main context.
*
* @param func Task functor; must have a signature of `T func()` for some T.
* @param finally Finally functor; must have a signature of
* `void finally(Try<T>&&)` and will be passed
* the result of func() (including the exception if occurred).
*/
template <typename F, typename G>
void addTaskFinally(F&& func, G&& finally);
/**
* If called from a fiber, immediately switches to the FiberManager's context
* and runs func(), going back to the Fiber's context after completion.
* Outside a fiber, just calls func() directly.
*
* @return value returned by func().
*/
template <typename F>
typename std::result_of<F()>::type
runInMainContext(F&& func);
/**
* @return How many fiber objects (and stacks) has this manager allocated.
*/
size_t fibersAllocated() const;
/**
* @return How many of the allocated fiber objects are currently
* in the free pool.
*/
size_t fibersPoolSize() const;
/**
* return true if running activeFiber_ is not nullptr.
*/
bool hasActiveFiber();
/**
* @return What was the most observed fiber stack usage (in bytes).
*/
size_t stackHighWatermark() const;
static FiberManager& getFiberManager();
static FiberManager* getFiberManagerUnsafe();
private:
friend class Baton;
friend class Fiber;
template <typename F>
struct AddTaskHelper;
template <typename F, typename G>
struct AddTaskFinallyHelper;
struct RemoteTask {
template <typename F>
explicit RemoteTask(F&& f) : func(std::move(f)) {}
std::function<void()> func;
folly::AtomicLinkedListHook<RemoteTask> nextRemoteTask;
};
typedef folly::IntrusiveList<Fiber, &Fiber::listHook_> FiberTailQueue;
Fiber* activeFiber_{nullptr}; /**< active fiber, nullptr on main context */
FiberTailQueue readyFibers_; /**< queue of fibers ready to be executed */
FiberTailQueue fibersPool_; /**< pool of unitialized Fiber objects */
size_t fibersAllocated_{0}; /**< total number of fibers allocated */
size_t fibersPoolSize_{0}; /**< total number of fibers in the free pool */
size_t fibersActive_{0}; /**< number of running or blocked fibers */
FContext::ContextStruct mainContext_; /**< stores loop function context */
std::unique_ptr<LoopController> loopController_;
bool isLoopScheduled_{false}; /**< was the ready loop scheduled to run? */
/**
* When we are inside FiberManager loop this points to FiberManager. Otherwise
* it's nullptr
*/
static __thread FiberManager* currentFiberManager_;
/**
* runInMainContext implementation for non-void functions.
*/
template <typename F>
typename std::enable_if<
!std::is_same<typename std::result_of<F()>::type, void>::value,
typename std::result_of<F()>::type>::type
runInMainContextHelper(F&& func);
/**
* runInMainContext implementation for void functions
*/
template <typename F>
typename std::enable_if<
std::is_same<typename std::result_of<F()>::type, void>::value,
void>::type
runInMainContextHelper(F&& func);
/**
* Allocator used to allocate stack for Fibers in the pool.
* Allocates stack on the stack of the main context.
*/
#ifdef USE_GUARD_ALLOCATOR
/* This is too slow for production use; can be fixed
if we allocated all stack storage once upfront */
GuardPageAllocator stackAllocator_;
#else
std::allocator<unsigned char> stackAllocator_;
#endif
const Options options_; /**< FiberManager options */
/**
* Largest observed individual Fiber stack usage in bytes.
*/
size_t stackHighWatermark_{0};
/**
* Schedules a loop with loopController (unless already scheduled before).
*/
void ensureLoopScheduled();
/**
* @return An initialized Fiber object from the pool
*/
Fiber* getFiber();
/**
* Function passed to the await call.
*/
std::function<void(Fiber&)> awaitFunc_;
/**
* Function passed to the runInMainContext call.
*/
std::function<void()> immediateFunc_;
ExceptionCallback exceptionCallback_; /**< task exception callback */
folly::AtomicLinkedList<Fiber, &Fiber::nextRemoteReady_> remoteReadyQueue_;
folly::AtomicLinkedList<RemoteTask, &RemoteTask::nextRemoteTask>
remoteTaskQueue_;
std::shared_ptr<TimeoutController> timeoutManager_;
void runReadyFiber(Fiber* fiber);
void remoteReadyInsert(Fiber* fiber);
};
/**
* @return true iff we are running in a fiber's context
*/
inline bool onFiber() {
auto fm = FiberManager::getFiberManagerUnsafe();
return fm ? fm->hasActiveFiber() : false;
}
/**
* Add a new task to be executed.
*
* @param func Task functor; must have a signature of `void func()`.
* The object will be destroyed once task execution is complete.
*/
template <typename F>
inline void addTask(F&& func) {
return FiberManager::getFiberManager().addTask(std::forward<F>(func));
}
/**
* Add a new task. When the task is complete, execute finally(Try<Result>&&)
* on the main context.
* Task functor is run and destroyed on the fiber context.
* Finally functor is run and destroyed on the main context.
*
* @param func Task functor; must have a signature of `T func()` for some T.
* @param finally Finally functor; must have a signature of
* `void finally(Try<T>&&)` and will be passed
* the result of func() (including the exception if occurred).
*/
template <typename F, typename G>
inline void addTaskFinally(F&& func, G&& finally) {
return FiberManager::getFiberManager().addTaskFinally(
std::forward<F>(func), std::forward<G>(finally));
}
/**
* Blocks task execution until given promise is fulfilled.
*
* Calls function passing in a Promise<T>, which has to be fulfilled.
*
* @return data which was used to fulfill the promise.
*/
template <typename F>
typename FirstArgOf<F>::type::value_type
inline await(F&& func);
/**
* If called from a fiber, immediately switches to the FiberManager's context
* and runs func(), going back to the Fiber's context after completion.
* Outside a fiber, just calls func() directly.
*
* @return value returned by func().
*/
template <typename F>
typename std::result_of<F()>::type
inline runInMainContext(F&& func) {
auto fm = FiberManager::getFiberManagerUnsafe();
if (UNLIKELY(fm == nullptr)) {
return func();
}
return fm->runInMainContext(std::forward<F>(func));
}
}}
#include "FiberManager-inl.h"
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/experimental/fibers/FiberManagerMap.h>
#include <memory>
#include <unordered_map>
namespace folly { namespace fibers {
namespace detail {
thread_local std::unordered_map<folly::EventBase*, FiberManager*>
localFiberManagerMap;
std::unordered_map<folly::EventBase*, std::unique_ptr<FiberManager>>
fiberManagerMap;
std::mutex fiberManagerMapMutex;
FiberManager* getFiberManagerThreadSafe(folly::EventBase& evb,
const FiberManager::Options& opts) {
std::lock_guard<std::mutex> lg(fiberManagerMapMutex);
auto it = fiberManagerMap.find(&evb);
if (LIKELY(it != fiberManagerMap.end())) {
return it->second.get();
}
auto loopController = folly::make_unique<EventBaseLoopController>();
loopController->attachEventBase(evb);
auto fiberManager =
folly::make_unique<FiberManager>(std::move(loopController), opts);
auto result = fiberManagerMap.emplace(&evb, std::move(fiberManager));
return result.first->second.get();
}
} // detail namespace
FiberManager& getFiberManager(folly::EventBase& evb,
const FiberManager::Options& opts) {
auto it = detail::localFiberManagerMap.find(&evb);
if (LIKELY(it != detail::localFiberManagerMap.end())) {
return *(it->second);
}
return *(detail::localFiberManagerMap[&evb] =
detail::getFiberManagerThreadSafe(evb, opts));
}
}}
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/experimental/fibers/FiberManager.h>
#include <folly/experimental/fibers/EventBaseLoopController.h>
namespace folly { namespace fibers {
FiberManager& getFiberManager(
folly::EventBase& evb,
const FiberManager::Options& opts = FiberManager::Options());
}}
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/experimental/fibers/FiberManager.h>
namespace folly { namespace fibers {
namespace {
template <class F, class G>
typename std::enable_if<
!std::is_same<typename std::result_of<F()>::type, void>::value, void>::type
inline callFuncs(F&& f, G&& g, size_t id) {
g(id, f());
}
template <class F, class G>
typename std::enable_if<
std::is_same<typename std::result_of<F()>::type, void>::value, void>::type
inline callFuncs(F&& f, G&& g, size_t id) {
f();
g(id);
}
} // anonymous namespace
template <class InputIterator, class F>
inline void forEach(InputIterator first, InputIterator last, F&& f) {
if (first == last) {
return;
}
typedef typename std::iterator_traits<InputIterator>::value_type FuncType;
size_t tasksTodo = 1;
std::exception_ptr e;
Baton baton;
#ifdef __clang__
#pragma clang diagnostic push // ignore generalized lambda capture warning
#pragma clang diagnostic ignored "-Wc++1y-extensions"
#endif
auto taskFunc =
[&tasksTodo, &e, &f, &baton] (size_t id, FuncType&& func) {
return [id, &tasksTodo, &e, &f, &baton,
func_ = std::forward<FuncType>(func)]() mutable {
try {
callFuncs(std::forward<FuncType>(func_), f, id);
} catch (...) {
e = std::current_exception();
}
if (--tasksTodo == 0) {
baton.post();
}
};
};
#ifdef __clang__
#pragma clang diagnostic pop
#endif
auto firstTask = first;
++first;
for (size_t i = 1; first != last; ++i, ++first, ++tasksTodo) {
addTask(taskFunc(i, std::move(*first)));
}
taskFunc(0, std::move(*firstTask))();
baton.wait();
if (e != std::exception_ptr()) {
std::rethrow_exception(e);
}
}
}} // folly::fibers
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
namespace folly { namespace fibers {
/**
* Schedules several tasks and blocks until all of them are completed.
* In the process of their successfull completion given callback would be called
* for each of them with the index of the task and the result it returned (if
* not void).
* If any of these n tasks throws an exception, this exception will be
* re-thrown, but only when all tasks are complete. If several tasks throw
* exceptions one of them will be re-thrown. Callback won't be called for
* tasks that throw exception.
*
* @param first Range of tasks to be scheduled
* @param last
* @param F callback to call for each result.
* In case of each task returning void it should be callable
* F(size_t id)
* otherwise should be callable
* F(size_t id, Result)
*/
template <class InputIterator, class F>
inline void forEach(InputIterator first, InputIterator last, F&& f);
}} // folly::fibers
#include <folly/experimental/fibers/ForEach-inl.h>
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/Baton.h>
#include <folly/experimental/fibers/Baton.h>
namespace folly { namespace fibers {
typedef Baton GenericBaton;
}}
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <sys/mman.h>
#include <unistd.h>
#include <glog/logging.h>
namespace folly { namespace fibers {
namespace {
size_t pagesize() {
static const size_t pagesize = sysconf(_SC_PAGESIZE);
return pagesize;
}
/* Returns a multiple of pagesize() enough to store size + one guard page */
size_t allocSize(size_t size) {
return pagesize() * ((size + pagesize() - 1)/pagesize() + 1);
}
}
unsigned char* GuardPageAllocator::allocate(size_t size) {
/* We allocate minimum number of pages required, plus a guard page.
Since we use this for stack storage, requested allocation is aligned
at the top of the allocated pages, while the guard page is at the bottom.
-- increasing addresses -->
Guard page Normal pages
|xxxxxxxxxx|..........|..........|
<- size -------->
return value -^
*/
void* p = nullptr;
PCHECK(!::posix_memalign(&p, pagesize(), allocSize(size)));
/* Try to protect first page
(stack grows downwards from last allocated address), ignore errors */
::mprotect(p, pagesize(), PROT_NONE);
/* Return pointer to top 'size' bytes in allocated storage */
auto up = reinterpret_cast<unsigned char*>(p) + allocSize(size) - size;
assert(up >= reinterpret_cast<unsigned char*>(p) + pagesize());
return up;
}
void GuardPageAllocator::deallocate(unsigned char* up, size_t size) {
/* Get allocation base */
auto p = up + size - allocSize(size);
/* Try to unprotect the page for memory allocator to re-use,
ignore errors (in cases we failed to protect in the first place */
::mprotect(p, pagesize(), PROT_READ|PROT_WRITE);
::free(p);
}
}} // folly::fibers
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
namespace folly { namespace fibers {
/**
* Stack allocator that protects an extra memory page after
* the end of the stack.
*/
class GuardPageAllocator {
public:
inline unsigned char* allocate(size_t size);
inline void deallocate(unsigned char* up, size_t size);
};
}} // folly::fibers
#include "GuardPageAllocator-inl.h"
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <chrono>
#include <functional>
namespace folly { namespace fibers {
class FiberManager;
class LoopController {
public:
typedef std::chrono::steady_clock Clock;
typedef std::chrono::time_point<Clock> TimePoint;
virtual ~LoopController() {}
/**
* Called by FiberManager to associate itself with the LoopController.
*/
virtual void setFiberManager(FiberManager*) = 0;
/**
* Called by FiberManager to schedule the loop function run
* at some point in the future.
*/
virtual void schedule() = 0;
/**
* Same as schedule(), but safe to call from any thread.
*/
virtual void scheduleThreadSafe() = 0;
/**
* Called by FiberManager to cancel a previously scheduled
* loop function run.
*/
virtual void cancel() = 0;
/**
* Called by FiberManager to schedule some function to be run at some time.
*/
virtual void timedSchedule(std::function<void()> func, TimePoint time) = 0;
};
}} // folly::fibers
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/experimental/fibers/Baton.h>
namespace folly { namespace fibers {
template <class T>
Promise<T>::Promise(folly::Try<T>& value, Baton& baton) :
value_(&value), baton_(&baton)
{}
template <class T>
Promise<T>::Promise(Promise&& other) noexcept :
value_(other.value_), baton_(other.baton_) {
other.value_ = nullptr;
other.baton_ = nullptr;
}
template <class T>
Promise<T>& Promise<T>::operator=(Promise&& other) {
std::swap(value_, other.value_);
std::swap(baton_, other.baton_);
return *this;
}
template <class T>
void Promise<T>::throwIfFulfilled() const {
if (!value_) {
throw std::logic_error("promise already fulfilled");
}
}
template <class T>
Promise<T>::~Promise() {
if (value_) {
setException(folly::make_exception_wrapper<std::logic_error>(
"promise not fulfilled"));
}
}
template <class T>
void Promise<T>::setException(folly::exception_wrapper e) {
fulfilTry(folly::Try<T>(e));
}
template <class T>
void Promise<T>::fulfilTry(folly::Try<T>&& t) {
throwIfFulfilled();
*value_ = std::move(t);
baton_->post();
value_ = nullptr;
baton_ = nullptr;
}
template <class T>
template <class M>
void Promise<T>::setValue(M&& v) {
static_assert(!std::is_same<T, void>::value,
"Use setValue() instead");
fulfilTry(folly::Try<T>(std::forward<M>(v)));
}
template <class T>
void Promise<T>::setValue() {
static_assert(std::is_same<T, void>::value,
"Use setValue(value) instead");
fulfilTry(folly::Try<void>());
}
template <class T>
template <class F>
void Promise<T>::fulfil(F&& func) {
fulfilTry(makeTryFunction(std::forward<F>(func)));
}
}}
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/experimental/fibers/traits.h>
#include <folly/futures/Try.h>
namespace folly { namespace fibers {
class Baton;
template <typename F>
typename FirstArgOf<F>::type::value_type
inline await(F&& func);
template <typename T>
class Promise {
public:
typedef T value_type;
~Promise();
// not copyable
Promise(const Promise&) = delete;
Promise& operator=(const Promise&) = delete;
// movable
Promise(Promise&&) noexcept;
Promise& operator=(Promise&&);
/** Fulfil this promise (only for Promise<void>) */
void setValue();
/** Set the value (use perfect forwarding for both move and copy) */
template <class M>
void setValue(M&& value);
/**
* Fulfill the promise with a given try
*
* @param t
*/
void fulfilTry(folly::Try<T>&& t);
/** Fulfil this promise with the result of a function that takes no
arguments and returns something implicitly convertible to T.
Captures exceptions. e.g.
p.fulfil([] { do something that may throw; return a T; });
*/
template <class F>
void fulfil(F&& func);
/** Fulfil the Promise with an exception_wrapper, e.g.
auto ew = folly::try_and_catch<std::exception>([]{ ... });
if (ew) {
p.setException(std::move(ew));
}
*/
void setException(folly::exception_wrapper);
private:
template <typename F>
friend typename FirstArgOf<F>::type::value_type await(F&&);
Promise(folly::Try<T>& value, Baton& baton);
folly::Try<T>* value_;
Baton* baton_;
void throwIfFulfilled() const;
template <class F>
typename std::enable_if<
std::is_convertible<typename std::result_of<F()>::type, T>::value &&
!std::is_same<T, void>::value>::type
fulfilHelper(F&& func);
template <class F>
typename std::enable_if<
std::is_same<typename std::result_of<F()>::type, void>::value &&
std::is_same<T, void>::value>::type
fulfilHelper(F&& func);
};
}}
#include <folly/experimental/fibers/Promise-inl.h>
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/Likely.h>
#include <folly/experimental/fibers/LoopController.h>
namespace folly { namespace fibers {
class FiberManager;
class SimpleLoopController : public LoopController {
public:
SimpleLoopController()
: fm_(nullptr),
stopRequested_(false) {
}
/**
* Run FiberManager loop; if no ready task are present,
* run provided function. Stops after both stop() has been called
* and no waiting tasks remain.
*/
template <typename F>
void loop(F&& func) {
bool waiting = false;
stopRequested_ = false;
while (LIKELY(waiting || !stopRequested_)) {
func();
auto time = Clock::now();
for (size_t i=0; i<scheduledFuncs_.size(); ++i) {
if (scheduledFuncs_[i].first <= time) {
scheduledFuncs_[i].second();
swap(scheduledFuncs_[i], scheduledFuncs_.back());
scheduledFuncs_.pop_back();
--i;
}
}
if (scheduled_) {
scheduled_ = false;
waiting = fm_->loopUntilNoReady();
}
}
}
/**
* Requests exit from loop() as soon as all waiting tasks complete.
*/
void stop() {
stopRequested_ = true;
}
int remoteScheduleCalled() const {
return remoteScheduleCalled_;
}
void schedule() override {
scheduled_ = true;
}
void timedSchedule(std::function<void()> func, TimePoint time) override {
scheduledFuncs_.push_back({time, std::move(func)});
}
private:
FiberManager* fm_;
std::atomic<bool> scheduled_{false};
bool stopRequested_;
std::atomic<int> remoteScheduleCalled_{0};
std::vector<std::pair<TimePoint, std::function<void()>>> scheduledFuncs_;
/* LoopController interface */
void setFiberManager(FiberManager* fm) override {
fm_ = fm;
}
void cancel() override {
scheduled_ = false;
}
void scheduleThreadSafe() override {
++remoteScheduleCalled_;
scheduled_ = true;
}
friend class FiberManager;
};
}} // folly::fibers
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "TimeoutController.h"
#include <folly/Memory.h>
namespace folly { namespace fibers {
TimeoutController::TimeoutController(LoopController& loopController) :
nextTimeout_(TimePoint::max()),
loopController_(loopController) {}
intptr_t TimeoutController::registerTimeout(std::function<void()> f,
Duration duration) {
auto& list = [&]() -> TimeoutHandleList& {
for (auto& bucket : timeoutHandleBuckets_) {
if (bucket.first == duration) {
return *bucket.second;
}
}
timeoutHandleBuckets_.emplace_back(duration,
folly::make_unique<TimeoutHandleList>());
return *timeoutHandleBuckets_.back().second;
}();
auto timeout = Clock::now() + duration;
list.emplace(std::move(f), timeout, list);
if (timeout < nextTimeout_) {
nextTimeout_ = timeout;
scheduleRun();
}
return reinterpret_cast<intptr_t>(&list.back());
}
void TimeoutController::runTimeouts(TimePoint time) {
auto now = Clock::now();
// Make sure we don't skip some events if function was run before actual time.
if (time < now) {
time = now;
}
if (nextTimeout_ > time) {
return;
}
nextTimeout_ = TimePoint::max();
for (auto& bucket : timeoutHandleBuckets_) {
auto& list = *bucket.second;
while (!list.empty()) {
if (!list.front().canceled) {
if (list.front().timeout > time) {
nextTimeout_ = std::min(nextTimeout_, list.front().timeout);
break;
}
list.front().func();
}
list.pop();
}
}
if (nextTimeout_ != TimePoint::max()) {
scheduleRun();
}
}
void TimeoutController::scheduleRun() {
auto time = nextTimeout_;
std::weak_ptr<TimeoutController> timeoutControllerWeak = shared_from_this();
loopController_.timedSchedule([timeoutControllerWeak, time]() {
if (auto timeoutController = timeoutControllerWeak.lock()) {
timeoutController->runTimeouts(time);
}
}, time);
}
void TimeoutController::cancel(intptr_t p) {
auto handle = reinterpret_cast<TimeoutHandle*>(p);
handle->canceled = true;
auto& list = handle->list;
while (!list.empty() && list.front().canceled) {
list.pop();
}
}
}}
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <chrono>
#include <functional>
#include <memory>
#include <queue>
#include <boost/intrusive/list.hpp>
#include <folly/Likely.h>
#include <folly/experimental/fibers/LoopController.h>
namespace folly { namespace fibers {
class TimeoutController :
public std::enable_shared_from_this<TimeoutController> {
public:
typedef std::chrono::steady_clock Clock;
typedef std::chrono::time_point<Clock> TimePoint;
typedef Clock::duration Duration;
explicit TimeoutController(LoopController& loopController);
intptr_t registerTimeout(std::function<void()> f, Duration duration);
void cancel(intptr_t id);
void runTimeouts(TimePoint time);
private:
void scheduleRun();
class TimeoutHandle;
typedef std::queue<TimeoutHandle> TimeoutHandleList;
typedef std::unique_ptr<TimeoutHandleList> TimeoutHandleListPtr;
struct TimeoutHandle {
TimeoutHandle(std::function<void()> func_,
TimePoint timeout_,
TimeoutHandleList& list_) :
func(std::move(func_)), timeout(timeout_), list(list_) {}
std::function<void()> func;
bool canceled{false};
TimePoint timeout;
TimeoutHandleList& list;
};
std::vector<std::pair<Duration, TimeoutHandleListPtr>> timeoutHandleBuckets_;
TimePoint nextTimeout_;
LoopController& loopController_;
};
}}
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/Optional.h>
#include <folly/experimental/fibers/FiberManager.h>
#include <folly/experimental/fibers/ForEach.h>
namespace folly { namespace fibers {
template <class InputIterator>
typename std::vector<
typename std::enable_if<
!std::is_same<
typename std::result_of<
typename std::iterator_traits<InputIterator>::value_type()>::type, void
>::value,
typename std::pair<
size_t,
typename std::result_of<
typename std::iterator_traits<InputIterator>::value_type()>::type>
>::type
>
whenN(InputIterator first, InputIterator last, size_t n) {
typedef typename std::result_of<
typename std::iterator_traits<InputIterator>::value_type()>::type Result;
assert(n > 0);
assert(n <= std::distance(first, last));
struct Context {
std::vector<std::pair<size_t, Result>> results;
size_t tasksTodo;
std::exception_ptr e;
folly::Optional<Promise<void>> promise;
Context(size_t tasksTodo_) : tasksTodo(tasksTodo_) {
this->results.reserve(tasksTodo_);
}
};
auto context = std::make_shared<Context>(n);
await(
[first, last, context](Promise<void> promise) mutable {
context->promise = std::move(promise);
for (size_t i = 0; first != last; ++i, ++first) {
#ifdef __clang__
#pragma clang diagnostic push // ignore generalized lambda capture warning
#pragma clang diagnostic ignored "-Wc++1y-extensions"
#endif
addTask(
[i, context, f = std::move(*first)]() {
try {
auto result = f();
if (context->tasksTodo == 0) {
return;
}
context->results.emplace_back(i, std::move(result));
} catch (...) {
if (context->tasksTodo == 0) {
return;
}
context->e = std::current_exception();
}
if (--context->tasksTodo == 0) {
context->promise->setValue();
}
});
#ifdef __clang__
#pragma clang diagnostic pop
#endif
}
});
if (context->e != std::exception_ptr()) {
std::rethrow_exception(context->e);
}
return std::move(context->results);
}
template <class InputIterator>
typename std::enable_if<
std::is_same<
typename std::result_of<
typename std::iterator_traits<InputIterator>::value_type()>::type, void
>::value, std::vector<size_t>>::type
whenN(InputIterator first, InputIterator last, size_t n) {
assert(n > 0);
assert(n <= std::distance(first, last));
struct Context {
std::vector<size_t> taskIndices;
std::exception_ptr e;
size_t tasksTodo;
folly::Optional<Promise<void>> promise;
Context(size_t tasksTodo_) : tasksTodo(tasksTodo_) {
this->taskIndices.reserve(tasksTodo_);
}
};
auto context = std::make_shared<Context>(n);
await(
[first, last, context](Promise<void> promise) mutable {
context->promise = std::move(promise);
for (size_t i = 0; first != last; ++i, ++first) {
#ifdef __clang__
#pragma clang diagnostic push // ignore generalized lambda capture warning
#pragma clang diagnostic ignored "-Wc++1y-extensions"
#endif
addTask(
[i, context, f = std::move(*first)]() {
try {
f();
if (context->tasksTodo == 0) {
return;
}
context->taskIndices.push_back(i);
} catch (...) {
if (context->tasksTodo == 0) {
return;
}
context->e = std::current_exception();
}
if (--context->tasksTodo == 0) {
context->promise->setValue();
}
});
#ifdef __clang__
#pragma clang diagnostic pop
#endif
}
});
if (context->e != std::exception_ptr()) {
std::rethrow_exception(context->e);
}
return context->taskIndices;
}
template <class InputIterator>
typename std::vector<
typename std::enable_if<
!std::is_same<
typename std::result_of<
typename std::iterator_traits<InputIterator>::value_type()>::type, void
>::value,
typename std::result_of<
typename std::iterator_traits<InputIterator>::value_type()>::type>::type>
inline whenAll(InputIterator first, InputIterator last) {
typedef typename std::result_of<
typename std::iterator_traits<InputIterator>::value_type()>::type Result;
size_t n = std::distance(first, last);
std::vector<Result> results;
std::vector<size_t> order(n);
results.reserve(n);
forEach(first, last,
[&results, &order] (size_t id, Result result) {
order[id] = results.size();
results.emplace_back(std::move(result));
});
assert(results.size() == n);
std::vector<Result> orderedResults;
orderedResults.reserve(n);
for (size_t i = 0; i < n; ++i) {
orderedResults.emplace_back(std::move(results[order[i]]));
}
return orderedResults;
}
template <class InputIterator>
typename std::enable_if<
std::is_same<
typename std::result_of<
typename std::iterator_traits<InputIterator>::value_type()>::type, void
>::value, void>::type
inline whenAll(InputIterator first, InputIterator last) {
forEach(first, last, [] (size_t id) {});
}
template <class InputIterator>
typename std::enable_if<
!std::is_same<
typename std::result_of<
typename std::iterator_traits<InputIterator>::value_type()>::type, void
>::value,
typename std::pair<
size_t,
typename std::result_of<
typename std::iterator_traits<InputIterator>::value_type()>::type>
>::type
inline whenAny(InputIterator first, InputIterator last) {
auto result = whenN(first, last, 1);
assert(result.size() == 1);
return std::move(result[0]);
}
template <class InputIterator>
typename std::enable_if<
std::is_same<
typename std::result_of<
typename std::iterator_traits<InputIterator>::value_type()>::type, void
>::value, size_t>::type
inline whenAny(InputIterator first, InputIterator last) {
auto result = whenN(first, last, 1);
assert(result.size() == 1);
return std::move(result[0]);
}
}}
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
namespace folly { namespace fibers {
/**
* Schedules several tasks and blocks until n of these tasks are completed.
* If any of these n tasks throws an exception, this exception will be
* re-thrown, but only when n tasks are complete. If several tasks throw
* exceptions one of them will be re-thrown.
*
* @param first Range of tasks to be scheduled
* @param last
* @param n Number of tasks to wait for
*
* @return vector of pairs (task index, return value of task)
*/
template <class InputIterator>
typename std::vector<
typename std::enable_if<
!std::is_same<
typename std::result_of<
typename std::iterator_traits<InputIterator>::value_type()>::type,
void>::value,
typename std::pair<
size_t,
typename std::result_of<
typename std::iterator_traits<InputIterator>::value_type()>::type>
>::type
>
inline whenN(InputIterator first, InputIterator last, size_t n);
/**
* whenN specialization for functions returning void
*
* @param first Range of tasks to be scheduled
* @param last
* @param n Number of tasks to wait for
*
* @return vector of completed task indices
*/
template <class InputIterator>
typename std::enable_if<
std::is_same<
typename std::result_of<
typename std::iterator_traits<InputIterator>::value_type()>::type, void
>::value, std::vector<size_t>>::type
inline whenN(InputIterator first, InputIterator last, size_t n);
/**
* Schedules several tasks and blocks until all of these tasks are completed.
* If any of the tasks throws an exception, this exception will be re-thrown,
* but only when all the tasks are complete. If several tasks throw exceptions
* one of them will be re-thrown.
*
* @param first Range of tasks to be scheduled
* @param last
*
* @return vector of values returned by tasks
*/
template <class InputIterator>
typename std::vector<
typename std::enable_if<
!std::is_same<
typename std::result_of<
typename std::iterator_traits<InputIterator>::value_type()>::type,
void>::value,
typename std::result_of<
typename std::iterator_traits<InputIterator>::value_type()>::type>::type
>
inline whenAll(InputIterator first, InputIterator last);
/**
* whenAll specialization for functions returning void
*
* @param first Range of tasks to be scheduled
* @param last
*/
template <class InputIterator>
typename std::enable_if<
std::is_same<
typename std::result_of<
typename std::iterator_traits<InputIterator>::value_type()>::type, void
>::value, void>::type
inline whenAll(InputIterator first, InputIterator last);
/**
* Schedules several tasks and blocks until one of them is completed.
* If this task throws an exception, this exception will be re-thrown.
* Exceptions thrown by all other tasks will be ignored.
*
* @param first Range of tasks to be scheduled
* @param last
*
* @return pair of index of the first completed task and its return value
*/
template <class InputIterator>
typename std::enable_if<
!std::is_same<
typename std::result_of<
typename std::iterator_traits<InputIterator>::value_type()>::type, void
>::value,
typename std::pair<
size_t,
typename std::result_of<
typename std::iterator_traits<InputIterator>::value_type()>::type>
>::type
inline whenAny(InputIterator first, InputIterator last);
/**
* WhenAny specialization for functions returning void.
*
* @param first Range of tasks to be scheduled
* @param last
*
* @return index of the first completed task
*/
template <class InputIterator>
typename std::enable_if<
std::is_same<
typename std::result_of<
typename std::iterator_traits<InputIterator>::value_type()>::type, void
>::value, size_t>::type
inline whenAny(InputIterator first, InputIterator last);
}}
#include <folly/experimental/fibers/WhenN-inl.h>
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <atomic>
#include <thread>
#include <vector>
#include <gtest/gtest.h>
#include <folly/Benchmark.h>
#include <folly/Memory.h>
#include <folly/experimental/fibers/AddTasks.h>
#include <folly/experimental/fibers/EventBaseLoopController.h>
#include <folly/experimental/fibers/FiberManager.h>
#include <folly/experimental/fibers/GenericBaton.h>
#include <folly/experimental/fibers/SimpleLoopController.h>
#include <folly/experimental/fibers/WhenN.h>
using namespace folly::fibers;
using folly::Try;
TEST(FiberManager, batonTimedWaitTimeout) {
bool taskAdded = false;
size_t iterations = 0;
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
dynamic_cast<SimpleLoopController&>(manager.loopController());
auto loopFunc = [&]() {
if (!taskAdded) {
manager.addTask(
[&]() {
Baton baton;
auto res = baton.timed_wait(std::chrono::milliseconds(230));
EXPECT_FALSE(res);
EXPECT_EQ(5, iterations);
loopController.stop();
}
);
manager.addTask(
[&]() {
Baton baton;
auto res = baton.timed_wait(std::chrono::milliseconds(130));
EXPECT_FALSE(res);
EXPECT_EQ(3, iterations);
loopController.stop();
}
);
taskAdded = true;
} else {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
iterations ++;
}
};
loopController.loop(std::move(loopFunc));
}
TEST(FiberManager, batonTimedWaitPost) {
bool taskAdded = false;
size_t iterations = 0;
Baton* baton_ptr;
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
dynamic_cast<SimpleLoopController&>(manager.loopController());
auto loopFunc = [&]() {
if (!taskAdded) {
manager.addTask(
[&]() {
Baton baton;
baton_ptr = &baton;
auto res = baton.timed_wait(std::chrono::milliseconds(130));
EXPECT_TRUE(res);
EXPECT_EQ(2, iterations);
loopController.stop();
}
);
taskAdded = true;
} else {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
iterations ++;
if (iterations == 2) {
baton_ptr->post();
}
}
};
loopController.loop(std::move(loopFunc));
}
TEST(FiberManager, batonTimedWaitTimeoutEvb) {
size_t tasksComplete = 0;
folly::EventBase evb;
FiberManager manager(folly::make_unique<EventBaseLoopController>());
dynamic_cast<EventBaseLoopController&>(
manager.loopController()).attachEventBase(evb);
auto task = [&](size_t timeout_ms) {
Baton baton;
auto start = EventBaseLoopController::Clock::now();
auto res = baton.timed_wait(std::chrono::milliseconds(timeout_ms));
auto finish = EventBaseLoopController::Clock::now();
EXPECT_FALSE(res);
auto duration_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
EXPECT_GT(duration_ms.count(), timeout_ms - 50);
EXPECT_LT(duration_ms.count(), timeout_ms + 50);
if (++tasksComplete == 2) {
evb.terminateLoopSoon();
}
};
evb.runInEventBaseThread([&]() {
manager.addTask(
[&]() {
task(500);
}
);
manager.addTask(
[&]() {
task(250);
}
);
});
evb.loopForever();
EXPECT_EQ(2, tasksComplete);
}
TEST(FiberManager, batonTimedWaitPostEvb) {
size_t tasksComplete = 0;
folly::EventBase evb;
FiberManager manager(folly::make_unique<EventBaseLoopController>());
dynamic_cast<EventBaseLoopController&>(
manager.loopController()).attachEventBase(evb);
evb.runInEventBaseThread([&]() {
manager.addTask([&]() {
Baton baton;
evb.tryRunAfterDelay([&]() {
baton.post();
},
100);
auto start = EventBaseLoopController::Clock::now();
auto res = baton.timed_wait(std::chrono::milliseconds(130));
auto finish = EventBaseLoopController::Clock::now();
EXPECT_TRUE(res);
auto duration_ms = std::chrono::duration_cast<
std::chrono::milliseconds>(finish - start);
EXPECT_TRUE(duration_ms.count() > 95 &&
duration_ms.count() < 110);
if (++tasksComplete == 1) {
evb.terminateLoopSoon();
}
});
});
evb.loopForever();
EXPECT_EQ(1, tasksComplete);
}
TEST(FiberManager, batonTryWait) {
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
dynamic_cast<SimpleLoopController&>(manager.loopController());
// Check if try_wait and post work as expected
Baton b;
manager.addTask([&](){
while (!b.try_wait()) {
}
});
auto thr = std::thread([&](){
std::this_thread::sleep_for(std::chrono::milliseconds(300));
b.post();
});
manager.loopUntilNoReady();
thr.join();
Baton c;
// Check try_wait without post
manager.addTask([&](){
int cnt = 100;
while (cnt && !c.try_wait()) {
cnt--;
}
EXPECT_TRUE(!c.try_wait()); // must still hold
EXPECT_EQ(cnt, 0);
});
manager.loopUntilNoReady();
}
TEST(FiberManager, genericBatonFiberWait) {
FiberManager manager(folly::make_unique<SimpleLoopController>());
GenericBaton b;
bool fiberRunning = false;
manager.addTask([&](){
EXPECT_EQ(manager.hasActiveFiber(), true);
fiberRunning = true;
b.wait();
fiberRunning = false;
});
EXPECT_FALSE(fiberRunning);
manager.loopUntilNoReady();
EXPECT_TRUE(fiberRunning); // ensure fiber still active
auto thr = std::thread([&](){
std::this_thread::sleep_for(std::chrono::milliseconds(300));
b.post();
});
while (fiberRunning) {
manager.loopUntilNoReady();
}
thr.join();
}
TEST(FiberManager, genericBatonThreadWait) {
FiberManager manager(folly::make_unique<SimpleLoopController>());
GenericBaton b;
std::atomic<bool> threadWaiting(false);
auto thr = std::thread([&](){
threadWaiting = true;
b.wait();
threadWaiting = false;
});
while (!threadWaiting) {}
std::this_thread::sleep_for(std::chrono::milliseconds(300));
manager.addTask([&](){
EXPECT_EQ(manager.hasActiveFiber(), true);
EXPECT_TRUE(threadWaiting);
b.post();
while(threadWaiting) {}
});
manager.loopUntilNoReady();
thr.join();
}
TEST(FiberManager, addTasksNoncopyable) {
std::vector<Promise<int>> pendingFibers;
bool taskAdded = false;
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
dynamic_cast<SimpleLoopController&>(manager.loopController());
auto loopFunc = [&]() {
if (!taskAdded) {
manager.addTask(
[&]() {
std::vector<std::function<std::unique_ptr<int>()>> funcs;
for (size_t i = 0; i < 3; ++i) {
funcs.push_back(
[i, &pendingFibers]() {
await([&pendingFibers](Promise<int> promise) {
pendingFibers.push_back(std::move(promise));
});
return folly::make_unique<int>(i*2 + 1);
}
);
}
auto iter = addTasks(funcs.begin(), funcs.end());
size_t n = 0;
while (iter.hasNext()) {
auto result = iter.awaitNext();
EXPECT_EQ(2 * iter.getTaskID() + 1, *result);
EXPECT_GE(2 - n, pendingFibers.size());
++n;
}
EXPECT_EQ(3, n);
}
);
taskAdded = true;
} else if (pendingFibers.size()) {
pendingFibers.back().setValue(0);
pendingFibers.pop_back();
} else {
loopController.stop();
}
};
loopController.loop(std::move(loopFunc));
}
TEST(FiberManager, addTasksThrow) {
std::vector<Promise<int>> pendingFibers;
bool taskAdded = false;
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
dynamic_cast<SimpleLoopController&>(manager.loopController());
auto loopFunc = [&]() {
if (!taskAdded) {
manager.addTask(
[&]() {
std::vector<std::function<int()>> funcs;
for (size_t i = 0; i < 3; ++i) {
funcs.push_back(
[i, &pendingFibers]() {
await([&pendingFibers](Promise<int> promise) {
pendingFibers.push_back(std::move(promise));
});
if (i % 2 == 0) {
throw std::runtime_error("Runtime");
}
return i*2 + 1;
}
);
}
auto iter = addTasks(funcs.begin(), funcs.end());
size_t n = 0;
while (iter.hasNext()) {
try {
int result = iter.awaitNext();
EXPECT_EQ(1, iter.getTaskID() % 2);
EXPECT_EQ(2 * iter.getTaskID() + 1, result);
} catch (...) {
EXPECT_EQ(0, iter.getTaskID() % 2);
}
EXPECT_GE(2 - n, pendingFibers.size());
++n;
}
EXPECT_EQ(3, n);
}
);
taskAdded = true;
} else if (pendingFibers.size()) {
pendingFibers.back().setValue(0);
pendingFibers.pop_back();
} else {
loopController.stop();
}
};
loopController.loop(std::move(loopFunc));
}
TEST(FiberManager, addTasksVoid) {
std::vector<Promise<int>> pendingFibers;
bool taskAdded = false;
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
dynamic_cast<SimpleLoopController&>(manager.loopController());
auto loopFunc = [&]() {
if (!taskAdded) {
manager.addTask(
[&]() {
std::vector<std::function<void()>> funcs;
for (size_t i = 0; i < 3; ++i) {
funcs.push_back(
[i, &pendingFibers]() {
await([&pendingFibers](Promise<int> promise) {
pendingFibers.push_back(std::move(promise));
});
}
);
}
auto iter = addTasks(funcs.begin(), funcs.end());
size_t n = 0;
while (iter.hasNext()) {
iter.awaitNext();
EXPECT_GE(2 - n, pendingFibers.size());
++n;
}
EXPECT_EQ(3, n);
}
);
taskAdded = true;
} else if (pendingFibers.size()) {
pendingFibers.back().setValue(0);
pendingFibers.pop_back();
} else {
loopController.stop();
}
};
loopController.loop(std::move(loopFunc));
}
TEST(FiberManager, addTasksVoidThrow) {
std::vector<Promise<int>> pendingFibers;
bool taskAdded = false;
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
dynamic_cast<SimpleLoopController&>(manager.loopController());
auto loopFunc = [&]() {
if (!taskAdded) {
manager.addTask(
[&]() {
std::vector<std::function<void()>> funcs;
for (size_t i = 0; i < 3; ++i) {
funcs.push_back(
[i, &pendingFibers]() {
await([&pendingFibers](Promise<int> promise) {
pendingFibers.push_back(std::move(promise));
});
if (i % 2 == 0) {
throw std::runtime_error("");
}
}
);
}
auto iter = addTasks(funcs.begin(), funcs.end());
size_t n = 0;
while (iter.hasNext()) {
try {
iter.awaitNext();
EXPECT_EQ(1, iter.getTaskID() % 2);
} catch (...) {
EXPECT_EQ(0, iter.getTaskID() % 2);
}
EXPECT_GE(2 - n, pendingFibers.size());
++n;
}
EXPECT_EQ(3, n);
}
);
taskAdded = true;
} else if (pendingFibers.size()) {
pendingFibers.back().setValue(0);
pendingFibers.pop_back();
} else {
loopController.stop();
}
};
loopController.loop(std::move(loopFunc));
}
TEST(FiberManager, reserve) {
std::vector<Promise<int>> pendingFibers;
bool taskAdded = false;
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
dynamic_cast<SimpleLoopController&>(manager.loopController());
auto loopFunc = [&]() {
if (!taskAdded) {
manager.addTask(
[&]() {
std::vector<std::function<void()>> funcs;
for (size_t i = 0; i < 3; ++i) {
funcs.push_back(
[&pendingFibers]() {
await([&pendingFibers](Promise<int> promise) {
pendingFibers.push_back(std::move(promise));
});
}
);
}
auto iter = addTasks(funcs.begin(), funcs.end());
iter.reserve(2);
EXPECT_TRUE(iter.hasCompleted());
EXPECT_TRUE(iter.hasPending());
EXPECT_TRUE(iter.hasNext());
iter.awaitNext();
EXPECT_TRUE(iter.hasCompleted());
EXPECT_TRUE(iter.hasPending());
EXPECT_TRUE(iter.hasNext());
iter.awaitNext();
EXPECT_FALSE(iter.hasCompleted());
EXPECT_TRUE(iter.hasPending());
EXPECT_TRUE(iter.hasNext());
iter.awaitNext();
EXPECT_FALSE(iter.hasCompleted());
EXPECT_FALSE(iter.hasPending());
EXPECT_FALSE(iter.hasNext());
}
);
taskAdded = true;
} else if (pendingFibers.size()) {
pendingFibers.back().setValue(0);
pendingFibers.pop_back();
} else {
loopController.stop();
}
};
loopController.loop(std::move(loopFunc));
}
TEST(FiberManager, forEach) {
std::vector<Promise<int>> pendingFibers;
bool taskAdded = false;
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
dynamic_cast<SimpleLoopController&>(manager.loopController());
auto loopFunc = [&]() {
if (!taskAdded) {
manager.addTask(
[&]() {
std::vector<std::function<int()>> funcs;
for (size_t i = 0; i < 3; ++i) {
funcs.push_back(
[i, &pendingFibers]() {
await([&pendingFibers](Promise<int> promise) {
pendingFibers.push_back(std::move(promise));
});
return i * 2 + 1;
}
);
}
std::vector<std::pair<size_t, int>> results;
forEach(funcs.begin(), funcs.end(),
[&results](size_t id, int result) {
results.push_back(std::make_pair(id, result));
});
EXPECT_EQ(3, results.size());
EXPECT_TRUE(pendingFibers.empty());
for (size_t i = 0; i < 3; ++i) {
EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
}
}
);
taskAdded = true;
} else if (pendingFibers.size()) {
pendingFibers.back().setValue(0);
pendingFibers.pop_back();
} else {
loopController.stop();
}
};
loopController.loop(std::move(loopFunc));
}
TEST(FiberManager, whenN) {
std::vector<Promise<int>> pendingFibers;
bool taskAdded = false;
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
dynamic_cast<SimpleLoopController&>(manager.loopController());
auto loopFunc = [&]() {
if (!taskAdded) {
manager.addTask(
[&]() {
std::vector<std::function<int()>> funcs;
for (size_t i = 0; i < 3; ++i) {
funcs.push_back(
[i, &pendingFibers]() {
await([&pendingFibers](Promise<int> promise) {
pendingFibers.push_back(std::move(promise));
});
return i*2 + 1;
}
);
}
auto results = whenN(funcs.begin(), funcs.end(), 2);
EXPECT_EQ(2, results.size());
EXPECT_EQ(1, pendingFibers.size());
for (size_t i = 0; i < 2; ++i) {
EXPECT_EQ(results[i].first*2 + 1, results[i].second);
}
}
);
taskAdded = true;
} else if (pendingFibers.size()) {
pendingFibers.back().setValue(0);
pendingFibers.pop_back();
} else {
loopController.stop();
}
};
loopController.loop(std::move(loopFunc));
}
TEST(FiberManager, whenNThrow) {
std::vector<Promise<int>> pendingFibers;
bool taskAdded = false;
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
dynamic_cast<SimpleLoopController&>(manager.loopController());
auto loopFunc = [&]() {
if (!taskAdded) {
manager.addTask(
[&]() {
std::vector<std::function<int()>> funcs;
for (size_t i = 0; i < 3; ++i) {
funcs.push_back(
[i, &pendingFibers]() {
await([&pendingFibers](Promise<int> promise) {
pendingFibers.push_back(std::move(promise));
});
throw std::runtime_error("Runtime");
return i*2+1;
}
);
}
try {
whenN(funcs.begin(), funcs.end(), 2);
} catch (...) {
EXPECT_EQ(1, pendingFibers.size());
}
}
);
taskAdded = true;
} else if (pendingFibers.size()) {
pendingFibers.back().setValue(0);
pendingFibers.pop_back();
} else {
loopController.stop();
}
};
loopController.loop(std::move(loopFunc));
}
TEST(FiberManager, whenNVoid) {
std::vector<Promise<int>> pendingFibers;
bool taskAdded = false;
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
dynamic_cast<SimpleLoopController&>(manager.loopController());
auto loopFunc = [&]() {
if (!taskAdded) {
manager.addTask(
[&]() {
std::vector<std::function<void()>> funcs;
for (size_t i = 0; i < 3; ++i) {
funcs.push_back(
[i, &pendingFibers]() {
await([&pendingFibers](Promise<int> promise) {
pendingFibers.push_back(std::move(promise));
});
}
);
}
auto results = whenN(funcs.begin(), funcs.end(), 2);
EXPECT_EQ(2, results.size());
EXPECT_EQ(1, pendingFibers.size());
}
);
taskAdded = true;
} else if (pendingFibers.size()) {
pendingFibers.back().setValue(0);
pendingFibers.pop_back();
} else {
loopController.stop();
}
};
loopController.loop(std::move(loopFunc));
}
TEST(FiberManager, whenNVoidThrow) {
std::vector<Promise<int>> pendingFibers;
bool taskAdded = false;
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
dynamic_cast<SimpleLoopController&>(manager.loopController());
auto loopFunc = [&]() {
if (!taskAdded) {
manager.addTask(
[&]() {
std::vector<std::function<void()>> funcs;
for (size_t i = 0; i < 3; ++i) {
funcs.push_back(
[i, &pendingFibers]() {
await([&pendingFibers](Promise<int> promise) {
pendingFibers.push_back(std::move(promise));
});
throw std::runtime_error("Runtime");
}
);
}
try {
whenN(funcs.begin(), funcs.end(), 2);
} catch (...) {
EXPECT_EQ(1, pendingFibers.size());
}
}
);
taskAdded = true;
} else if (pendingFibers.size()) {
pendingFibers.back().setValue(0);
pendingFibers.pop_back();
} else {
loopController.stop();
}
};
loopController.loop(std::move(loopFunc));
}
TEST(FiberManager, whenAll) {
std::vector<Promise<int>> pendingFibers;
bool taskAdded = false;
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
dynamic_cast<SimpleLoopController&>(manager.loopController());
auto loopFunc = [&]() {
if (!taskAdded) {
manager.addTask(
[&]() {
std::vector<std::function<int()>> funcs;
for (size_t i = 0; i < 3; ++i) {
funcs.push_back(
[i, &pendingFibers]() {
await([&pendingFibers](Promise<int> promise) {
pendingFibers.push_back(std::move(promise));
});
return i*2+1;
}
);
}
auto results = whenAll(funcs.begin(), funcs.end());
EXPECT_TRUE(pendingFibers.empty());
for (size_t i = 0; i < 3; ++i) {
EXPECT_EQ(i*2+1, results[i]);
}
}
);
taskAdded = true;
} else if (pendingFibers.size()) {
pendingFibers.back().setValue(0);
pendingFibers.pop_back();
} else {
loopController.stop();
}
};
loopController.loop(std::move(loopFunc));
}
TEST(FiberManager, whenAllVoid) {
std::vector<Promise<int>> pendingFibers;
bool taskAdded = false;
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
dynamic_cast<SimpleLoopController&>(manager.loopController());
auto loopFunc = [&]() {
if (!taskAdded) {
manager.addTask(
[&]() {
std::vector<std::function<void()>> funcs;
for (size_t i = 0; i < 3; ++ i) {
funcs.push_back(
[i, &pendingFibers]() {
await([&pendingFibers](Promise<int> promise) {
pendingFibers.push_back(std::move(promise));
});
}
);
}
whenAll(funcs.begin(), funcs.end());
EXPECT_TRUE(pendingFibers.empty());
}
);
taskAdded = true;
} else if (pendingFibers.size()) {
pendingFibers.back().setValue(0);
pendingFibers.pop_back();
} else {
loopController.stop();
}
};
loopController.loop(std::move(loopFunc));
}
TEST(FiberManager, whenAny) {
std::vector<Promise<int>> pendingFibers;
bool taskAdded = false;
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
dynamic_cast<SimpleLoopController&>(manager.loopController());
auto loopFunc = [&]() {
if (!taskAdded) {
manager.addTask(
[&]() {
std::vector<std::function<int()> > funcs;
for (size_t i = 0; i < 3; ++ i) {
funcs.push_back(
[i, &pendingFibers]() {
await([&pendingFibers](Promise<int> promise) {
pendingFibers.push_back(std::move(promise));
});
if (i == 1) {
throw std::runtime_error("This exception will be ignored");
}
return i*2+1;
}
);
}
auto result = whenAny(funcs.begin(), funcs.end());
EXPECT_EQ(2, pendingFibers.size());
EXPECT_EQ(2, result.first);
EXPECT_EQ(2*2+1, result.second);
}
);
taskAdded = true;
} else if (pendingFibers.size()) {
pendingFibers.back().setValue(0);
pendingFibers.pop_back();
} else {
loopController.stop();
}
};
loopController.loop(std::move(loopFunc));
}
namespace {
/* Checks that this function was run from a main context,
by comparing an address on a stack to a known main stack address
and a known related fiber stack address. The assumption
is that fiber stack and main stack will be far enough apart,
while any two values on the same stack will be close. */
void expectMainContext(bool& ran, int* mainLocation, int* fiberLocation) {
int here;
/* 2 pages is a good guess */
constexpr ssize_t DISTANCE = 0x2000 / sizeof(int);
if (fiberLocation) {
EXPECT_TRUE(std::abs(&here - fiberLocation) > DISTANCE);
}
if (mainLocation) {
EXPECT_TRUE(std::abs(&here - mainLocation) < DISTANCE);
}
EXPECT_FALSE(ran);
ran = true;
}
}
TEST(FiberManager, runInMainContext) {
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
dynamic_cast<SimpleLoopController&>(manager.loopController());
bool checkRan = false;
int mainLocation;
manager.runInMainContext(
[&]() {
expectMainContext(checkRan, &mainLocation, nullptr);
});
EXPECT_TRUE(checkRan);
checkRan = false;
manager.addTask(
[&]() {
int stackLocation;
runInMainContext(
[&]() {
expectMainContext(checkRan, &mainLocation, &stackLocation);
});
EXPECT_TRUE(checkRan);
}
);
loopController.loop(
[&]() {
loopController.stop();
}
);
EXPECT_TRUE(checkRan);
}
TEST(FiberManager, addTaskFinally) {
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
dynamic_cast<SimpleLoopController&>(manager.loopController());
bool checkRan = false;
int mainLocation;
manager.addTaskFinally(
[&]() {
return 1234;
},
[&](Try<int>&& result) {
EXPECT_EQ(result.value(), 1234);
expectMainContext(checkRan, &mainLocation, nullptr);
}
);
EXPECT_FALSE(checkRan);
loopController.loop(
[&]() {
loopController.stop();
}
);
EXPECT_TRUE(checkRan);
}
TEST(FiberManager, fibersPoolWithinLimit) {
FiberManager::Options opts;
opts.maxFibersPoolSize = 5;
FiberManager manager(folly::make_unique<SimpleLoopController>(), opts);
auto& loopController =
dynamic_cast<SimpleLoopController&>(manager.loopController());
size_t fibersRun = 0;
for (size_t i = 0; i < 5; ++i) {
manager.addTask(
[&]() {
++fibersRun;
}
);
}
loopController.loop(
[&]() {
loopController.stop();
}
);
EXPECT_EQ(5, fibersRun);
EXPECT_EQ(5, manager.fibersAllocated());
EXPECT_EQ(5, manager.fibersPoolSize());
for (size_t i = 0; i < 5; ++i) {
manager.addTask(
[&]() {
++fibersRun;
}
);
}
loopController.loop(
[&]() {
loopController.stop();
}
);
EXPECT_EQ(10, fibersRun);
EXPECT_EQ(5, manager.fibersAllocated());
EXPECT_EQ(5, manager.fibersPoolSize());
}
TEST(FiberManager, fibersPoolOverLimit) {
FiberManager::Options opts;
opts.maxFibersPoolSize = 5;
FiberManager manager(folly::make_unique<SimpleLoopController>(), opts);
auto& loopController =
dynamic_cast<SimpleLoopController&>(manager.loopController());
size_t fibersRun = 0;
for (size_t i = 0; i < 10; ++i) {
manager.addTask(
[&]() {
++fibersRun;
}
);
}
EXPECT_EQ(0, fibersRun);
EXPECT_EQ(10, manager.fibersAllocated());
EXPECT_EQ(0, manager.fibersPoolSize());
loopController.loop(
[&]() {
loopController.stop();
}
);
EXPECT_EQ(10, fibersRun);
EXPECT_EQ(5, manager.fibersAllocated());
EXPECT_EQ(5, manager.fibersPoolSize());
}
TEST(FiberManager, remoteFiberBasic) {
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
dynamic_cast<SimpleLoopController&>(manager.loopController());
int result[2];
result[0] = result[1] = 0;
folly::Optional<Promise<int>> savedPromise[2];
manager.addTask(
[&] () {
result[0] = await([&] (Promise<int> promise) {
savedPromise[0] = std::move(promise);
});
});
manager.addTask(
[&] () {
result[1] = await([&] (Promise<int> promise) {
savedPromise[1] = std::move(promise);
});
});
manager.loopUntilNoReady();
EXPECT_TRUE(savedPromise[0].hasValue());
EXPECT_TRUE(savedPromise[1].hasValue());
EXPECT_EQ(0, result[0]);
EXPECT_EQ(0, result[1]);
std::thread remoteThread0{
[&] () {
savedPromise[0]->setValue(42);
}
};
std::thread remoteThread1{
[&] () {
savedPromise[1]->setValue(43);
}
};
remoteThread0.join();
remoteThread1.join();
EXPECT_EQ(0, result[0]);
EXPECT_EQ(0, result[1]);
/* Should only have scheduled once */
EXPECT_EQ(1, loopController.remoteScheduleCalled());
manager.loopUntilNoReady();
EXPECT_EQ(42, result[0]);
EXPECT_EQ(43, result[1]);
}
TEST(FiberManager, addTaskRemoteBasic) {
FiberManager manager(folly::make_unique<SimpleLoopController>());
int result[2];
result[0] = result[1] = 0;
folly::Optional<Promise<int>> savedPromise[2];
std::thread remoteThread0{
[&] () {
manager.addTaskRemote(
[&] () {
result[0] = await([&] (Promise<int> promise) {
savedPromise[0] = std::move(promise);
});
});
}
};
std::thread remoteThread1{
[&] () {
manager.addTaskRemote(
[&] () {
result[1] = await([&] (Promise<int> promise) {
savedPromise[1] = std::move(promise);
});
});
}
};
remoteThread0.join();
remoteThread1.join();
manager.loopUntilNoReady();
EXPECT_TRUE(savedPromise[0].hasValue());
EXPECT_TRUE(savedPromise[1].hasValue());
EXPECT_EQ(0, result[0]);
EXPECT_EQ(0, result[1]);
savedPromise[0]->setValue(42);
savedPromise[1]->setValue(43);
EXPECT_EQ(0, result[0]);
EXPECT_EQ(0, result[1]);
manager.loopUntilNoReady();
EXPECT_EQ(42, result[0]);
EXPECT_EQ(43, result[1]);
}
TEST(FiberManager, remoteHasTasks) {
size_t counter = 0;
FiberManager fm(folly::make_unique<SimpleLoopController>());
std::thread remote([&]() {
fm.addTaskRemote([&]() {
++counter;
});
});
remote.join();
while (fm.hasTasks()) {
fm.loopUntilNoReady();
}
EXPECT_FALSE(fm.hasTasks());
EXPECT_EQ(counter, 1);
}
TEST(FiberManager, remoteHasReadyTasks) {
int result = 0;
folly::Optional<Promise<int>> savedPromise;
FiberManager fm(folly::make_unique<SimpleLoopController>());
std::thread remote([&]() {
fm.addTaskRemote([&]() {
result = await([&](Promise<int> promise) {
savedPromise = std::move(promise);
});
EXPECT_TRUE(fm.hasTasks());
});
});
remote.join();
EXPECT_TRUE(fm.hasTasks());
fm.loopUntilNoReady();
EXPECT_TRUE(fm.hasTasks());
std::thread remote2([&](){
savedPromise->setValue(47);
});
remote2.join();
EXPECT_TRUE(fm.hasTasks());
fm.loopUntilNoReady();
EXPECT_FALSE(fm.hasTasks());
EXPECT_EQ(result, 47);
}
static size_t sNumAwaits;
void runBenchmark(size_t numAwaits, size_t toSend) {
sNumAwaits = numAwaits;
FiberManager fiberManager(folly::make_unique<SimpleLoopController>());
auto& loopController =
dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
std::queue<Promise<int>> pendingRequests;
static const size_t maxOutstanding = 5;
auto loop = [&fiberManager, &loopController, &pendingRequests, &toSend]() {
if (pendingRequests.size() == maxOutstanding || toSend == 0) {
if (pendingRequests.empty()) {
return;
}
pendingRequests.front().setValue(0);
pendingRequests.pop();
} else {
fiberManager.addTask([&pendingRequests]() {
for (size_t i = 0; i < sNumAwaits; ++i) {
auto result = await(
[&pendingRequests](Promise<int> promise) {
pendingRequests.push(std::move(promise));
});
assert(result == 0);
}
});
if (--toSend == 0) {
loopController.stop();
}
}
};
loopController.loop(std::move(loop));
}
BENCHMARK(FiberManagerBasicOneAwait, iters) {
runBenchmark(1, iters);
}
BENCHMARK(FiberManagerBasicFiveAwaits, iters) {
runBenchmark(5, iters);
}
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <iostream>
#include <queue>
#include <folly/Memory.h>
#include <folly/experimental/fibers/FiberManager.h>
#include <folly/experimental/fibers/SimpleLoopController.h>
using namespace folly::fibers;
struct Application {
public:
Application ()
: fiberManager(folly::make_unique<SimpleLoopController>()),
toSend(20),
maxOutstanding(5) {
}
void loop() {
if (pendingRequests.size() == maxOutstanding || toSend == 0) {
if (pendingRequests.empty()) {
return;
}
intptr_t value = rand()%1000;
std::cout << "Completing request with data = " << value << std::endl;
pendingRequests.front().setValue(value);
pendingRequests.pop();
} else {
static size_t id_counter = 1;
size_t id = id_counter++;
std::cout << "Adding new request with id = " << id << std::endl;
fiberManager.addTask([this, id]() {
std::cout << "Executing fiber with id = " << id << std::endl;
auto result1 = await(
[this](Promise<int> fiber) {
pendingRequests.push(std::move(fiber));
});
std::cout << "Fiber id = " << id
<< " got result1 = " << result1 << std::endl;
auto result2 = await
([this](Promise<int> fiber) {
pendingRequests.push(std::move(fiber));
});
std::cout << "Fiber id = " << id
<< " got result2 = " << result2 << std::endl;
});
if (--toSend == 0) {
auto& loopController =
dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
loopController.stop();
}
}
}
FiberManager fiberManager;
std::queue<Promise<int>> pendingRequests;
size_t toSend;
size_t maxOutstanding;
};
int main() {
Application app;
auto loop = [&app]() {
app.loop();
};
auto& loopController =
dynamic_cast<SimpleLoopController&>(app.fiberManager.loopController());
loopController.loop(std::move(loop));
return 0;
}
check_PROGRAMS = mcrouter_fibers_test
mcrouter_fibers_test_SOURCES = \
FibersTest.cpp \
main.cpp
mcrouter_fibers_test_CPPFLAGS = -I$(top_srcdir)/oss_include
mcrouter_fibers_test_LDADD = $(top_builddir)/lib/libmcrouter.a -lgtest -lfollybenchmark
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <gtest/gtest.h>
#include <folly/Benchmark.h>
// for backward compatibility with gflags
namespace gflags { }
namespace google { using namespace gflags; }
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
google::ParseCommandLineFlags(&argc, &argv, true);
auto rc = RUN_ALL_TESTS();
folly::runBenchmarksOnFlag();
return rc;
}
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <boost/type_traits.hpp>
namespace folly { namespace fibers {
/**
* For any functor F taking >= 1 argument,
* FirstArgOf<F>::type is the type of F's first parameter.
*
* Rationale: we want to declare a function func(F), where F has the
* signature `void(X)` and func should return T<X> (T and X are some types).
* Solution:
*
* template <typename F>
* T<typename FirstArgOf<F>::type>
* func(F&& f);
*/
namespace detail {
/**
* If F is a pointer-to-member, will contain a typedef type
* with the type of F's first parameter
*/
template<typename>
struct ExtractFirstMemfn;
template <typename Ret, typename T, typename First, typename... Args>
struct ExtractFirstMemfn<Ret (T::*)(First, Args...)> {
typedef First type;
};
template <typename Ret, typename T, typename First, typename... Args>
struct ExtractFirstMemfn<Ret (T::*)(First, Args...) const> {
typedef First type;
};
} // detail
/** Default - use boost */
template <typename F, typename Enable = void>
struct FirstArgOf {
typedef typename boost::function_traits<
typename std::remove_pointer<F>::type>::arg1_type type;
};
/** Specialization for function objects */
template <typename F>
struct FirstArgOf<F, typename std::enable_if<std::is_class<F>::value>::type> {
typedef typename detail::ExtractFirstMemfn<
decltype(&F::operator())>::type type;
};
}} // folly::fibers
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment