Commit d7c3a477 authored by Andrii Grynenko's avatar Andrii Grynenko Committed by facebook-github-bot-4

Fix EventBase destruction race in FiberManagerMap

Summary:
Previously we could be reading from thread-local FiberManagerMap while it was modified.
This is now fixed by keeping a per-thread list of EventBases which need to be removed from local maps. On the fast-path no action is taken, since list will be empty.

This is second try, since D2853921 got reverted.
The new implementation is simpler and does not rely on AtomicLinkedList.

Reviewed By: yfeldblum

Differential Revision: D2908018

fb-gh-sync-id: 4d7aed974c19761f7e2732ddbf8694af57c69bd6
shipit-source-id: 4d7aed974c19761f7e2732ddbf8694af57c69bd6
parent b0a7bdf6
...@@ -15,96 +15,169 @@ ...@@ -15,96 +15,169 @@
*/ */
#include "FiberManagerMap.h" #include "FiberManagerMap.h"
#include <cassert>
#include <memory> #include <memory>
#include <unordered_map> #include <unordered_map>
#include <folly/AtomicLinkedList.h>
#include <folly/ThreadLocal.h> #include <folly/ThreadLocal.h>
#include <folly/Synchronized.h>
namespace folly { namespace fibers { namespace folly { namespace fibers {
namespace { namespace {
// Leak these intentionally. During shutdown, we may call getFiberManager, and class EventBaseOnDestructionCallback : public EventBase::LoopCallback {
// want access to the fiber managers during that time. public:
class LocalFiberManagerMapTag; explicit EventBaseOnDestructionCallback(EventBase& evb) : evb_(evb) {}
typedef folly::ThreadLocal< void runLoopCallback() noexcept override;
std::unordered_map<folly::EventBase*, FiberManager*>,
LocalFiberManagerMapTag>
LocalMapType;
LocalMapType* localFiberManagerMap() {
static auto ret = new LocalMapType();
return ret;
}
typedef private:
std::unordered_map<folly::EventBase*, std::unique_ptr<FiberManager>> EventBase& evb_;
MapType; };
MapType* fiberManagerMap() {
static auto ret = new MapType();
return ret;
}
std::mutex* fiberManagerMapMutex() { class GlobalCache {
static auto ret = new std::mutex(); public:
return ret; static FiberManager& get(EventBase& evb, const FiberManager::Options& opts) {
} return instance().getImpl(evb, opts);
}
static std::unique_ptr<FiberManager> erase(EventBase& evb) {
return instance().eraseImpl(evb);
}
class OnEventBaseDestructionCallback : public folly::EventBase::LoopCallback { private:
public: GlobalCache() {}
explicit OnEventBaseDestructionCallback(folly::EventBase& evb)
: evb_(&evb) {} // Leak this intentionally. During shutdown, we may call getFiberManager,
void runLoopCallback() noexcept override { // and want access to the fiber managers during that time.
for (auto& localMap : localFiberManagerMap()->accessAllThreads()) { static GlobalCache& instance() {
localMap.erase(evb_); static auto ret = new GlobalCache();
return *ret;
}
FiberManager& getImpl(EventBase& evb, const FiberManager::Options& opts) {
std::lock_guard<std::mutex> lg(mutex_);
auto& fmPtrRef = map_[&evb];
if (!fmPtrRef) {
auto loopController = make_unique<EventBaseLoopController>();
loopController->attachEventBase(evb);
evb.runOnDestruction(new EventBaseOnDestructionCallback(evb));
fmPtrRef = make_unique<FiberManager>(std::move(loopController), opts);
} }
std::unique_ptr<FiberManager> fm;
{ return *fmPtrRef;
std::lock_guard<std::mutex> lg(*fiberManagerMapMutex()); }
auto it = fiberManagerMap()->find(evb_);
assert(it != fiberManagerMap()->end()); std::unique_ptr<FiberManager> eraseImpl(EventBase& evb) {
fm = std::move(it->second); std::lock_guard<std::mutex> lg(mutex_);
fiberManagerMap()->erase(it);
DCHECK_EQ(1, map_.count(&evb));
auto ret = std::move(map_[&evb]);
map_.erase(&evb);
return ret;
}
std::mutex mutex_;
std::unordered_map<EventBase*, std::unique_ptr<FiberManager>> map_;
};
constexpr size_t kEraseListMaxSize = 64;
class ThreadLocalCache {
public:
static FiberManager& get(EventBase& evb, const FiberManager::Options& opts) {
return instance()->getImpl(evb, opts);
}
static void erase(EventBase& evb) {
for (auto& localInstance : instance().accessAllThreads()) {
SYNCHRONIZED(info, localInstance.eraseInfo_) {
if (info.eraseList.size() >= kEraseListMaxSize) {
info.eraseAll = true;
} else {
info.eraseList.push_back(&evb);
}
localInstance.eraseRequested_ = true;
}
} }
assert(fm.get() != nullptr);
fm->loopUntilNoReady();
delete this;
} }
private: private:
folly::EventBase* evb_; ThreadLocalCache() {}
};
FiberManager* getFiberManagerThreadSafe(folly::EventBase& evb, struct ThreadLocalCacheTag {};
const FiberManager::Options& opts) { using ThreadThreadLocalCache = ThreadLocal<ThreadLocalCache, ThreadLocalCacheTag>;
std::lock_guard<std::mutex> lg(*fiberManagerMapMutex());
auto it = fiberManagerMap()->find(&evb); // Leak this intentionally. During shutdown, we may call getFiberManager,
if (LIKELY(it != fiberManagerMap()->end())) { // and want access to the fiber managers during that time.
return it->second.get(); static ThreadThreadLocalCache& instance() {
static auto ret = new ThreadThreadLocalCache([]() { return new ThreadLocalCache(); });
return *ret;
} }
auto loopController = folly::make_unique<EventBaseLoopController>(); FiberManager& getImpl(EventBase& evb, const FiberManager::Options& opts) {
loopController->attachEventBase(evb); eraseImpl();
auto fiberManager =
folly::make_unique<FiberManager>(std::move(loopController), opts); auto& fmPtrRef = map_[&evb];
auto result = fiberManagerMap()->emplace(&evb, std::move(fiberManager)); if (!fmPtrRef) {
evb.runOnDestruction(new OnEventBaseDestructionCallback(evb)); fmPtrRef = &GlobalCache::get(evb, opts);
return result.first->second.get(); }
DCHECK(fmPtrRef != nullptr);
return *fmPtrRef;
}
void eraseImpl() {
if (!eraseRequested_.load()) {
return;
}
SYNCHRONIZED(info, eraseInfo_) {
if (info.eraseAll) {
map_.clear();
} else {
for (auto evbPtr : info.eraseList) {
map_.erase(evbPtr);
}
}
info.eraseList.clear();
info.eraseAll = false;
eraseRequested_ = false;
}
}
std::unordered_map<EventBase*, FiberManager*> map_;
std::atomic<bool> eraseRequested_{false};
struct EraseInfo {
bool eraseAll{false};
std::vector<EventBase*> eraseList;
};
folly::Synchronized<EraseInfo> eraseInfo_;
};
void EventBaseOnDestructionCallback::runLoopCallback() noexcept {
auto fm = GlobalCache::erase(evb_);
DCHECK(fm.get() != nullptr);
ThreadLocalCache::erase(evb_);
fm->loopUntilNoReady();
delete this;
} }
} // namespace } // namespace
FiberManager& getFiberManager(folly::EventBase& evb, FiberManager& getFiberManager(EventBase& evb,
const FiberManager::Options& opts) { const FiberManager::Options& opts) {
auto it = (*localFiberManagerMap())->find(&evb); return ThreadLocalCache::get(evb, opts);
if (LIKELY(it != (*localFiberManagerMap())->end())) {
return *(it->second);
}
auto fm = getFiberManagerThreadSafe(evb, opts);
(*localFiberManagerMap())->emplace(&evb, fm);
return *fm;
} }
}} }}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment