Commit f5a52d63 authored by Matthieu Martin, committed by Facebook Github Bot

Allow an EventBase to own multiple FiberManager

Summary:
See the test for a concrete example.
We have a scenario where both a library and a service using that library are using fibers. Managing a separate, similarly sized thread and event-base pool for each costs us complexity and performance, so this seems like a legitimate need.

The overall solution is to hash the Options struct. Because that hashing has some cost, I made it opt-in by introducing a new FrozenOptions struct, which computes the hash once and carries it as a token.
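A rough sketch of the intended usage, mirroring the new test at the bottom of this diff (the option values here are illustrative):

#include <folly/fibers/FiberManagerMap.h>
#include <folly/io/async/EventBase.h>

int main() {
  folly::EventBase evb;

  // Existing behavior: one cached manager per EventBase.
  auto& fmDefault = folly::fibers::getFiberManager(evb);

  // New in this diff: FrozenOptions hashes the Options once, so a second
  // manager with different options can share the same EventBase.
  folly::fibers::FiberManager::Options opts;
  opts.stackSize = 1024;  // illustrative, matches the value used in the test
  auto& fmSmallStack = folly::fibers::getFiberManager(
      evb, folly::fibers::FiberManager::FrozenOptions{opts});

  fmDefault.addTask([] { /* library work */ });
  fmSmallStack.addTask([] { /* service work */ });
  evb.loop();
}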

Reviewed By: andriigrynenko

Differential Revision: D15305274

fbshipit-source-id: 5c53d8c4ed321ae88089c64de41af8229f877d36
parent 97e9c15e
@@ -65,11 +65,24 @@ static AsanUnpoisonMemoryRegionFuncPtr getUnpoisonMemoryRegionFunc();
#endif
namespace std {
template <>
struct hash<folly::fibers::FiberManager::Options> {
ssize_t operator()(const folly::fibers::FiberManager::Options& opts) const {
return hash<decltype(opts.hash())>()(opts.hash());
}
};
} // namespace std
namespace folly {
namespace fibers {
FOLLY_TLS FiberManager* FiberManager::currentFiberManager_ = nullptr;
auto FiberManager::FrozenOptions::create(const Options& options) -> ssize_t {
return std::hash<Options>()(options);
}
FiberManager::FiberManager(
std::unique_ptr<LoopController> loopController,
Options options)
......
@@ -122,6 +122,33 @@ class FiberManager : public ::folly::Executor {
uint32_t fibersPoolResizePeriodMs{0};
constexpr Options() {}
auto hash() const {
return std::make_tuple(
stackSize,
stackSizeMultiplier,
recordStackEvery,
maxFibersPoolSize,
useGuardPages,
fibersPoolResizePeriodMs);
}
};
/**
* A (const) Options instance with a dedicated unique identifier,
* which is used as a key in FiberManagerMap.
This is relevant if you want to run different FiberManagers,
with different Options, on the same EventBase.
*/
struct FrozenOptions {
explicit FrozenOptions(Options options_)
: options(std::move(options_)), token(create(options)) {}
const Options options;
const ssize_t token;
private:
static ssize_t create(const Options&);
};
using ExceptionCallback =
......
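Taken together, FrozenOptions::create just feeds the field tuple from Options::hash() through std::hash (folly ships std::hash support for tuples, which is presumably what lets the specialization in the first hunk compile). A standard-C++-only sketch of the same idea, using a subset of the real fields; the default values and the hand-rolled combiner are illustrative, not folly's:

#include <cstddef>
#include <functional>
#include <tuple>

// Illustrative stand-in for FiberManager::Options (subset of the real fields).
struct Options {
  std::size_t stackSize{16 * 1024};
  std::size_t maxFibersPoolSize{1000};
  bool useGuardPages{true};

  auto hash() const {
    return std::make_tuple(stackSize, maxFibersPoolSize, useGuardPages);
  }
};

// Fold the field hashes into one token, the way FrozenOptions::create does via
// std::hash on the tuple. Done by hand here to stay standard-only.
std::size_t freeze(const Options& opts) {
  std::size_t seed = 0;
  auto mix = [&seed](std::size_t h) {
    seed ^= h + 0x9e3779b9 + (seed << 6) + (seed >> 2);
  };
  auto t = opts.hash();
  mix(std::hash<std::size_t>{}(std::get<0>(t)));
  mix(std::hash<std::size_t>{}(std::get<1>(t)));
  mix(std::hash<bool>{}(std::get<2>(t)));
  return seed;  // the real code stores this token as ssize_t
}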
@@ -28,18 +28,25 @@ namespace fibers {
namespace {
// ssize_t is a hash of FiberManager::Options
template <typename EventBaseT>
Function<void()> makeOnEventBaseDestructionCallback(EventBaseT& evb);
using Key = std::pair<EventBaseT*, ssize_t>;
template <typename EventBaseT>
Function<void()> makeOnEventBaseDestructionCallback(const Key<EventBaseT>& key);
template <typename EventBaseT>
class GlobalCache {
public:
static FiberManager& get(EventBaseT& evb, const FiberManager::Options& opts) {
return instance().getImpl(evb, opts);
static FiberManager& get(
const Key<EventBaseT>& key,
EventBaseT& evb,
const FiberManager::Options& opts) {
return instance().getImpl(key, evb, opts);
}
static std::unique_ptr<FiberManager> erase(EventBaseT& evb) {
return instance().eraseImpl(evb);
static std::unique_ptr<FiberManager> erase(const Key<EventBaseT>& key) {
return instance().eraseImpl(key);
}
private:
@@ -52,17 +59,20 @@ class GlobalCache {
return *ret;
}
FiberManager& getImpl(EventBaseT& evb, const FiberManager::Options& opts) {
FiberManager& getImpl(
const Key<EventBaseT>& key,
EventBaseT& evb,
const FiberManager::Options& opts) {
bool constructed = false;
SCOPE_EXIT {
if (constructed) {
evb.runOnDestruction(makeOnEventBaseDestructionCallback(evb));
evb.runOnDestruction(makeOnEventBaseDestructionCallback(key));
}
};
std::lock_guard<std::mutex> lg(mutex_);
auto& fmPtrRef = map_[&evb];
auto& fmPtrRef = map_[key];
if (!fmPtrRef) {
constructed = true;
@@ -75,18 +85,18 @@ class GlobalCache {
return *fmPtrRef;
}
std::unique_ptr<FiberManager> eraseImpl(EventBaseT& evb) {
std::unique_ptr<FiberManager> eraseImpl(const Key<EventBaseT>& key) {
std::lock_guard<std::mutex> lg(mutex_);
DCHECK_EQ(map_.count(&evb), 1u);
DCHECK_EQ(map_.count(key), 1u);
auto ret = std::move(map_[&evb]);
map_.erase(&evb);
auto ret = std::move(map_[key]);
map_.erase(key);
return ret;
}
std::mutex mutex_;
std::unordered_map<EventBaseT*, std::unique_ptr<FiberManager>> map_;
std::unordered_map<Key<EventBaseT>, std::unique_ptr<FiberManager>> map_;
};
constexpr size_t kEraseListMaxSize = 64;
@@ -94,17 +104,20 @@ constexpr size_t kEraseListMaxSize = 64;
template <typename EventBaseT>
class ThreadLocalCache {
public:
static FiberManager& get(EventBaseT& evb, const FiberManager::Options& opts) {
return instance()->getImpl(evb, opts);
static FiberManager& get(
const Key<EventBaseT>& key,
EventBaseT& evb,
const FiberManager::Options& opts) {
return instance()->getImpl(key, evb, opts);
}
static void erase(EventBaseT& evb) {
static void erase(const Key<EventBaseT>& key) {
for (auto& localInstance : instance().accessAllThreads()) {
localInstance.eraseInfo_.withWLock([&](auto& info) {
if (info.eraseList.size() >= kEraseListMaxSize) {
info.eraseAll = true;
} else {
info.eraseList.push_back(&evb);
info.eraseList.push_back(key);
}
localInstance.eraseRequested_ = true;
});
@@ -126,12 +139,15 @@ class ThreadLocalCache {
return *ret;
}
FiberManager& getImpl(EventBaseT& evb, const FiberManager::Options& opts) {
FiberManager& getImpl(
const Key<EventBaseT>& key,
EventBaseT& evb,
const FiberManager::Options& opts) {
eraseImpl();
auto& fmPtrRef = map_[&evb];
auto& fmPtrRef = map_[key];
if (!fmPtrRef) {
fmPtrRef = &GlobalCache<EventBaseT>::get(evb, opts);
fmPtrRef = &GlobalCache<EventBaseT>::get(key, evb, opts);
}
DCHECK(fmPtrRef != nullptr);
@@ -148,8 +164,8 @@ class ThreadLocalCache {
if (info.eraseAll) {
map_.clear();
} else {
for (auto evbPtr : info.eraseList) {
map_.erase(evbPtr);
for (auto& key : info.eraseList) {
map_.erase(key);
}
}
@@ -159,23 +175,24 @@ class ThreadLocalCache {
});
}
std::unordered_map<EventBaseT*, FiberManager*> map_;
std::unordered_map<Key<EventBaseT>, FiberManager*> map_;
std::atomic<bool> eraseRequested_{false};
struct EraseInfo {
bool eraseAll{false};
std::vector<EventBaseT*> eraseList;
std::vector<Key<EventBaseT>> eraseList;
};
folly::Synchronized<EraseInfo> eraseInfo_;
};
template <typename EventBaseT>
Function<void()> makeOnEventBaseDestructionCallback(EventBaseT& evb) {
return [&evb] {
auto fm = GlobalCache<EventBaseT>::erase(evb);
Function<void()> makeOnEventBaseDestructionCallback(
const Key<EventBaseT>& key) {
return [key] {
auto fm = GlobalCache<EventBaseT>::erase(key);
DCHECK(fm.get() != nullptr);
ThreadLocalCache<EventBaseT>::erase(evb);
ThreadLocalCache<EventBaseT>::erase(key);
};
}
@@ -184,13 +201,22 @@ Function<void()> makeOnEventBaseDestructionCallback(EventBaseT& evb) {
FiberManager& getFiberManager(
EventBase& evb,
const FiberManager::Options& opts) {
return ThreadLocalCache<EventBase>::get(evb, opts);
return ThreadLocalCache<EventBase>::get(std::make_pair(&evb, 0), evb, opts);
}
FiberManager& getFiberManager(
VirtualEventBase& evb,
const FiberManager::Options& opts) {
return ThreadLocalCache<VirtualEventBase>::get(evb, opts);
return ThreadLocalCache<VirtualEventBase>::get(
std::make_pair(&evb, 0), evb, opts);
}
FiberManager& getFiberManager(
folly::EventBase& evb,
const FiberManager::FrozenOptions& opts) {
return ThreadLocalCache<EventBase>::get(
std::make_pair(&evb, opts.token), evb, opts.options);
}
} // namespace fibers
} // namespace folly
@@ -29,5 +29,10 @@ FiberManager& getFiberManager(
FiberManager& getFiberManager(
folly::VirtualEventBase& evb,
const FiberManager::Options& opts = FiberManager::Options());
FiberManager& getFiberManager(
folly::EventBase& evb,
const FiberManager::FrozenOptions& opts);
} // namespace fibers
} // namespace folly
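The FiberManagerMap changes above amount to keying the global and thread-local caches on an (EventBase*, token) pair instead of the bare EventBase pointer, with token 0 used by the plain-Options overloads. A stand-alone sketch of that keying scheme, with stub types and a hand-rolled pair hash (the real map presumably relies on folly's std::hash support for pairs):

#include <cstddef>
#include <functional>
#include <memory>
#include <unordered_map>
#include <utility>

struct EventBaseStub {};     // stand-in for folly::EventBase
struct FiberManagerStub {};  // stand-in for folly::fibers::FiberManager

using Key = std::pair<EventBaseStub*, long>;  // (event base, frozen-options token)

struct KeyHash {
  std::size_t operator()(const Key& k) const {
    auto a = std::hash<EventBaseStub*>{}(k.first);
    auto b = std::hash<long>{}(k.second);
    return a ^ (b + 0x9e3779b9 + (a << 6) + (a >> 2));
  }
};

int main() {
  std::unordered_map<Key, std::unique_ptr<FiberManagerStub>, KeyHash> cache;
  EventBaseStub evb;

  cache[{&evb, 0}] = std::make_unique<FiberManagerStub>();   // default Options
  cache[{&evb, 42}] = std::make_unique<FiberManagerStub>();  // some FrozenOptions token
  return cache.size() == 2 ? 0 : 1;  // one event base, two managers
}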
@@ -1570,6 +1570,50 @@ TEST(FiberManager, nestedFiberManagers) {
outerEvb.loopForever();
}
TEST(FiberManager, nestedFiberManagersSameEvb) {
folly::EventBase evb;
auto& fm1 = getFiberManager(evb);
EXPECT_EQ(&fm1, &getFiberManager(evb));
// Always return the same fm by default
FiberManager::Options unused;
unused.stackSize = 1024;
EXPECT_EQ(&fm1, &getFiberManager(evb, unused));
// Use frozen options
FiberManager::Options used;
used.stackSize = 1024;
FiberManager::FrozenOptions options{used};
auto& fm2 = getFiberManager(evb, options);
EXPECT_NE(&fm1, &fm2);
// Same option
EXPECT_EQ(&fm2, &getFiberManager(evb, options));
EXPECT_EQ(&fm2, &getFiberManager(evb, FiberManager::FrozenOptions{used}));
FiberManager::Options same;
same.stackSize = 1024;
EXPECT_EQ(&fm2, &getFiberManager(evb, FiberManager::FrozenOptions{same}));
// Different option
FiberManager::Options differ;
differ.stackSize = 2048;
auto& fm3 = getFiberManager(evb, FiberManager::FrozenOptions{differ});
EXPECT_NE(&fm1, &fm3);
EXPECT_NE(&fm2, &fm3);
// Nested usage
getFiberManager(evb)
.addTaskFuture([&] {
EXPECT_EQ(&fm1, FiberManager::getFiberManagerUnsafe());
getFiberManager(evb, options)
.addTaskFuture(
[&] { EXPECT_EQ(&fm2, FiberManager::getFiberManagerUnsafe()); })
.wait();
})
.waitVia(&evb);
}
TEST(FiberManager, semaphore) {
static constexpr size_t kTasks = 10;
static constexpr size_t kIterations = 10000;
......