Commit 1ac421a5 authored by Andrii Grynenko's avatar Andrii Grynenko Committed by facebook-github-bot-1

Fix EventBase destruction race in FiberManagerMap

Summary:
Previously we could be reading from thread-local FiberManagerMap while it was modified.
This is now fixed by keeping a per-thread list of EventBases which need to be removed from local maps. On the fast-path no action is taken, since list will be empty.

Reviewed By: yfeldblum

Differential Revision: D2853921

fb-gh-sync-id: f05e1924dd2b97bfb359537de1909bbe193e0cb9
parent dbf7c3d2
/*
* Copyright 2016 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <atomic>
#include <glog/logging.h>
namespace folly {
/**
* A very simple atomic single-linked list primitive.
*
* Usage:
*
* class MyClass {
* AtomicIntrusiveLinkedListHook<MyClass> hook_;
* }
*
* AtomicIntrusiveLinkedList<MyClass, &MyClass::hook_> list;
* list.insert(&a);
* list.sweep([] (MyClass* c) { doSomething(c); }
*/
template <class T>
struct AtomicIntrusiveLinkedListHook {
T* next{nullptr};
};
template <class T, AtomicIntrusiveLinkedListHook<T> T::*HookMember>
class AtomicIntrusiveLinkedList {
public:
AtomicIntrusiveLinkedList() {}
AtomicIntrusiveLinkedList(const AtomicIntrusiveLinkedList&) = delete;
AtomicIntrusiveLinkedList& operator=(const AtomicIntrusiveLinkedList&) =
delete;
AtomicIntrusiveLinkedList(AtomicIntrusiveLinkedList&& other) noexcept {
*this = std::move(other);
}
AtomicIntrusiveLinkedList& operator=(
AtomicIntrusiveLinkedList&& other) noexcept {
auto tmp = other.head_.load();
other.head_ = head_.load();
head_ = tmp;
return *this;
}
/**
* Note: list must be empty on destruction.
*/
~AtomicIntrusiveLinkedList() { DCHECK(empty()); }
bool empty() const { return head_ == nullptr; }
/**
* Atomically insert t at the head of the list.
* @return True if the inserted element is the only one in the list
* after the call.
*/
bool insertHead(T* t) {
DCHECK(next(t) == nullptr);
auto oldHead = head_.load(std::memory_order_relaxed);
do {
next(t) = oldHead;
/* oldHead is updated by the call below.
NOTE: we don't use next(t) instead of oldHead directly due to
compiler bugs (GCC prior to 4.8.3 (bug 60272), clang (bug 18899),
MSVC (bug 819819); source:
http://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange */
} while (!head_.compare_exchange_weak(
oldHead, t, std::memory_order_release, std::memory_order_relaxed));
return oldHead == nullptr;
}
/**
* Repeatedly replaces the head with nullptr,
* and calls func() on the removed elements in the order from tail to head.
* Stops when the list is empty.
*/
template <typename F>
void sweep(F&& func) {
while (auto head = head_.exchange(nullptr)) {
auto rhead = reverse(head);
while (rhead != nullptr) {
auto t = rhead;
rhead = next(t);
next(t) = nullptr;
func(t);
}
}
}
private:
std::atomic<T*> head_{nullptr};
static T*& next(T* t) { return (t->*HookMember).next; }
/* Reverses a linked list, returning the pointer to the new head
(old tail) */
static T* reverse(T* head) {
T* rhead = nullptr;
while (head != nullptr) {
auto t = head;
head = next(t);
next(t) = rhead;
rhead = t;
}
return rhead;
}
};
} // namespace folly
...@@ -13,13 +13,14 @@ ...@@ -13,13 +13,14 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
#pragma once
#ifndef FOLLY_ATOMIC_LINKED_LIST_H_
#define FOLLY_ATOMIC_LINKED_LIST_H_
#include <atomic> #include <atomic>
#include <cassert> #include <cassert>
#include <folly/AtomicIntrusiveLinkedList.h>
#include <folly/Memory.h>
namespace folly { namespace folly {
/** /**
...@@ -27,112 +28,59 @@ namespace folly { ...@@ -27,112 +28,59 @@ namespace folly {
* *
* Usage: * Usage:
* *
* class MyClass { * AtomicLinkedList<MyClass> list;
* AtomicLinkedListHook<MyClass> hook_; * list.insert(a);
* } * list.sweep([] (MyClass& c) { doSomething(c); }
*
* AtomicLinkedList<MyClass, &MyClass::hook_> list;
* list.insert(&a);
* list.sweep([] (MyClass* c) { doSomething(c); }
*/ */
template <class T>
struct AtomicLinkedListHook {
T* next{nullptr};
};
template <class T, AtomicLinkedListHook<T> T::* HookMember> template <class T>
class AtomicLinkedList { class AtomicLinkedList {
public: public:
AtomicLinkedList() {} AtomicLinkedList() {}
AtomicLinkedList(const AtomicLinkedList&) = delete; AtomicLinkedList(const AtomicLinkedList&) = delete;
AtomicLinkedList& operator=(const AtomicLinkedList&) = delete; AtomicLinkedList& operator=(const AtomicLinkedList&) = delete;
AtomicLinkedList(AtomicLinkedList&& other) noexcept { AtomicLinkedList(AtomicLinkedList&& other) noexcept = default;
auto tmp = other.head_.load(); AtomicLinkedList& operator=(AtomicLinkedList&& other) = default;
other.head_ = head_.load();
head_ = tmp;
}
AtomicLinkedList& operator=(AtomicLinkedList&& other) noexcept {
auto tmp = other.head_.load();
other.head_ = head_.load();
head_ = tmp;
return *this;
}
/**
* Note: list must be empty on destruction.
*/
~AtomicLinkedList() { ~AtomicLinkedList() {
assert(empty()); sweep([](T&&) {});
} }
bool empty() const { bool empty() const { return list_.empty(); }
return head_ == nullptr;
}
/** /**
* Atomically insert t at the head of the list. * Atomically insert t at the head of the list.
* @return True if the inserted element is the only one in the list * @return True if the inserted element is the only one in the list
* after the call. * after the call.
*/ */
bool insertHead(T* t) { bool insertHead(T t) {
assert(next(t) == nullptr); auto wrapper = folly::make_unique<Wrapper>(std::move(t));
auto oldHead = head_.load(std::memory_order_relaxed);
do {
next(t) = oldHead;
/* oldHead is updated by the call below.
NOTE: we don't use next(t) instead of oldHead directly due to return list_.insertHead(wrapper.release());
compiler bugs (GCC prior to 4.8.3 (bug 60272), clang (bug 18899),
MSVC (bug 819819); source:
http://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange */
} while (!head_.compare_exchange_weak(oldHead, t,
std::memory_order_release,
std::memory_order_relaxed));
return oldHead == nullptr;
} }
/** /**
* Repeatedly replaces the head with nullptr, * Repeatedly pops element from head,
* and calls func() on the removed elements in the order from tail to head. * and calls func() on the removed elements in the order from tail to head.
* Stops when the list is empty. * Stops when the list is empty.
*/ */
template <typename F> template <typename F>
void sweep(F&& func) { void sweep(F&& func) {
while (auto head = head_.exchange(nullptr)) { list_.sweep([&](Wrapper* wrapperPtr) mutable {
auto rhead = reverse(head); std::unique_ptr<Wrapper> wrapper(wrapperPtr);
while (rhead != nullptr) {
auto t = rhead; func(std::move(wrapper->data));
rhead = next(t); });
next(t) = nullptr;
func(t);
}
}
} }
private: private:
std::atomic<T*> head_{nullptr}; struct Wrapper {
explicit Wrapper(T&& t) : data(std::move(t)) {}
static T*& next(T* t) { AtomicIntrusiveLinkedListHook<Wrapper> hook;
return (t->*HookMember).next; T data;
} };
AtomicIntrusiveLinkedList<Wrapper, &Wrapper::hook> list_;
/* Reverses a linked list, returning the pointer to the new head
(old tail) */
static T* reverse(T* head) {
T* rhead = nullptr;
while (head != nullptr) {
auto t = head;
head = next(t);
next(t) = rhead;
rhead = t;
}
return rhead;
}
}; };
} // namespace folly } // namespace folly
#endif
...@@ -27,6 +27,7 @@ nobase_follyinclude_HEADERS = \ ...@@ -27,6 +27,7 @@ nobase_follyinclude_HEADERS = \
AtomicHashArray-inl.h \ AtomicHashArray-inl.h \
AtomicHashMap.h \ AtomicHashMap.h \
AtomicHashMap-inl.h \ AtomicHashMap-inl.h \
AtomicIntrusiveLinkedList.h \
AtomicLinkedList.h \ AtomicLinkedList.h \
AtomicStruct.h \ AtomicStruct.h \
AtomicUnorderedMap.h \ AtomicUnorderedMap.h \
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
#include <boost/context/all.hpp> #include <boost/context/all.hpp>
#include <boost/version.hpp> #include <boost/version.hpp>
#include <folly/AtomicLinkedList.h> #include <folly/AtomicIntrusiveLinkedList.h>
#include <folly/CPortability.h> #include <folly/CPortability.h>
#include <folly/IntrusiveList.h> #include <folly/IntrusiveList.h>
#include <folly/experimental/fibers/BoostContextCompatibility.h> #include <folly/experimental/fibers/BoostContextCompatibility.h>
...@@ -126,7 +126,7 @@ class Fiber { ...@@ -126,7 +126,7 @@ class Fiber {
/** /**
* Points to next fiber in remote ready list * Points to next fiber in remote ready list
*/ */
folly::AtomicLinkedListHook<Fiber> nextRemoteReady_; folly::AtomicIntrusiveLinkedListHook<Fiber> nextRemoteReady_;
static constexpr size_t kUserBufferSize = 256; static constexpr size_t kUserBufferSize = 256;
std::aligned_storage<kUserBufferSize>::type userBuffer_; std::aligned_storage<kUserBufferSize>::type userBuffer_;
......
...@@ -23,7 +23,7 @@ ...@@ -23,7 +23,7 @@
#include <unordered_set> #include <unordered_set>
#include <vector> #include <vector>
#include <folly/AtomicLinkedList.h> #include <folly/AtomicIntrusiveLinkedList.h>
#include <folly/Executor.h> #include <folly/Executor.h>
#include <folly/Likely.h> #include <folly/Likely.h>
#include <folly/IntrusiveList.h> #include <folly/IntrusiveList.h>
...@@ -307,7 +307,7 @@ class FiberManager : public ::folly::Executor { ...@@ -307,7 +307,7 @@ class FiberManager : public ::folly::Executor {
std::function<void()> func; std::function<void()> func;
std::unique_ptr<Fiber::LocalData> localData; std::unique_ptr<Fiber::LocalData> localData;
std::shared_ptr<RequestContext> rcontext; std::shared_ptr<RequestContext> rcontext;
AtomicLinkedListHook<RemoteTask> nextRemoteTask; AtomicIntrusiveLinkedListHook<RemoteTask> nextRemoteTask;
}; };
typedef folly::IntrusiveList<Fiber, &Fiber::listHook_> FiberTailQueue; typedef folly::IntrusiveList<Fiber, &Fiber::listHook_> FiberTailQueue;
...@@ -414,9 +414,10 @@ class FiberManager : public ::folly::Executor { ...@@ -414,9 +414,10 @@ class FiberManager : public ::folly::Executor {
ExceptionCallback exceptionCallback_; /**< task exception callback */ ExceptionCallback exceptionCallback_; /**< task exception callback */
folly::AtomicLinkedList<Fiber, &Fiber::nextRemoteReady_> remoteReadyQueue_; folly::AtomicIntrusiveLinkedList<Fiber, &Fiber::nextRemoteReady_>
remoteReadyQueue_;
folly::AtomicLinkedList<RemoteTask, &RemoteTask::nextRemoteTask> folly::AtomicIntrusiveLinkedList<RemoteTask, &RemoteTask::nextRemoteTask>
remoteTaskQueue_; remoteTaskQueue_;
std::shared_ptr<TimeoutController> timeoutManager_; std::shared_ptr<TimeoutController> timeoutManager_;
......
...@@ -15,96 +15,140 @@ ...@@ -15,96 +15,140 @@
*/ */
#include "FiberManagerMap.h" #include "FiberManagerMap.h"
#include <cassert>
#include <memory> #include <memory>
#include <unordered_map> #include <unordered_map>
#include <folly/AtomicLinkedList.h>
#include <folly/ThreadLocal.h> #include <folly/ThreadLocal.h>
namespace folly { namespace fibers { namespace folly { namespace fibers {
namespace { namespace {
// Leak these intentionally. During shutdown, we may call getFiberManager, and class OnEventBaseDestructionCallback : public EventBase::LoopCallback {
// want access to the fiber managers during that time. public:
class LocalFiberManagerMapTag; explicit OnEventBaseDestructionCallback(EventBase& evb) : evb_(evb) {}
typedef folly::ThreadLocal< void runLoopCallback() noexcept override;
std::unordered_map<folly::EventBase*, FiberManager*>,
LocalFiberManagerMapTag>
LocalMapType;
LocalMapType* localFiberManagerMap() {
static auto ret = new LocalMapType();
return ret;
}
typedef private:
std::unordered_map<folly::EventBase*, std::unique_ptr<FiberManager>> EventBase& evb_;
MapType; };
MapType* fiberManagerMap() {
static auto ret = new MapType();
return ret;
}
std::mutex* fiberManagerMapMutex() { class GlobalCache {
static auto ret = new std::mutex(); public:
return ret; static FiberManager& get(EventBase& evb, const FiberManager::Options& opts) {
} return instance().getImpl(evb, opts);
}
static std::unique_ptr<FiberManager> erase(EventBase& evb) {
return instance().eraseImpl(evb);
}
private:
GlobalCache() {}
class OnEventBaseDestructionCallback : public folly::EventBase::LoopCallback { // Leak this intentionally. During shutdown, we may call getFiberManager,
public: // and want access to the fiber managers during that time.
explicit OnEventBaseDestructionCallback(folly::EventBase& evb) static GlobalCache& instance() {
: evb_(&evb) {} static auto ret = new GlobalCache();
void runLoopCallback() noexcept override { return *ret;
for (auto& localMap : localFiberManagerMap()->accessAllThreads()) { }
localMap.erase(evb_);
FiberManager& getImpl(EventBase& evb, const FiberManager::Options& opts) {
std::lock_guard<std::mutex> lg(mutex_);
auto& fmPtrRef = map_[&evb];
if (!fmPtrRef) {
auto loopController = make_unique<EventBaseLoopController>();
loopController->attachEventBase(evb);
evb.runOnDestruction(new OnEventBaseDestructionCallback(evb));
fmPtrRef = make_unique<FiberManager>(std::move(loopController), opts);
} }
std::unique_ptr<FiberManager> fm;
{ return *fmPtrRef;
std::lock_guard<std::mutex> lg(*fiberManagerMapMutex()); }
auto it = fiberManagerMap()->find(evb_);
assert(it != fiberManagerMap()->end()); std::unique_ptr<FiberManager> eraseImpl(EventBase& evb) {
fm = std::move(it->second); std::lock_guard<std::mutex> lg(mutex_);
fiberManagerMap()->erase(it);
DCHECK(map_.find(&evb) != map_.end());
auto ret = std::move(map_[&evb]);
map_.erase(&evb);
return ret;
}
std::mutex mutex_;
std::unordered_map<EventBase*, std::unique_ptr<FiberManager>> map_;
};
class LocalCache {
public:
static FiberManager& get(EventBase& evb, const FiberManager::Options& opts) {
return instance()->getImpl(evb, opts);
}
static void erase(EventBase& evb) {
for (auto& localInstance : instance().accessAllThreads()) {
localInstance.removedEvbs_.insertHead(&evb);
} }
assert(fm.get() != nullptr);
fm->loopUntilNoReady();
delete this;
} }
private: private:
folly::EventBase* evb_; LocalCache() {}
};
FiberManager* getFiberManagerThreadSafe(folly::EventBase& evb, struct LocalCacheTag {};
const FiberManager::Options& opts) { using ThreadLocalCache = ThreadLocal<LocalCache, LocalCacheTag>;
std::lock_guard<std::mutex> lg(*fiberManagerMapMutex());
auto it = fiberManagerMap()->find(&evb); // Leak this intentionally. During shutdown, we may call getFiberManager,
if (LIKELY(it != fiberManagerMap()->end())) { // and want access to the fiber managers during that time.
return it->second.get(); static ThreadLocalCache& instance() {
static auto ret = new ThreadLocalCache([]() { return new LocalCache(); });
return *ret;
} }
auto loopController = folly::make_unique<EventBaseLoopController>(); FiberManager& getImpl(EventBase& evb, const FiberManager::Options& opts) {
loopController->attachEventBase(evb); eraseImpl();
auto fiberManager =
folly::make_unique<FiberManager>(std::move(loopController), opts); auto& fmPtrRef = map_[&evb];
auto result = fiberManagerMap()->emplace(&evb, std::move(fiberManager)); if (!fmPtrRef) {
evb.runOnDestruction(new OnEventBaseDestructionCallback(evb)); fmPtrRef = &GlobalCache::get(evb, opts);
return result.first->second.get(); }
DCHECK(fmPtrRef != nullptr);
return *fmPtrRef;
}
void eraseImpl() {
if (removedEvbs_.empty()) {
return;
}
removedEvbs_.sweep([&](EventBase* evb) { map_.erase(evb); });
}
std::unordered_map<EventBase*, FiberManager*> map_;
AtomicLinkedList<EventBase*> removedEvbs_;
};
void OnEventBaseDestructionCallback::runLoopCallback() noexcept {
auto fm = GlobalCache::erase(evb_);
DCHECK(fm.get() != nullptr);
LocalCache::erase(evb_);
fm->loopUntilNoReady();
delete this;
} }
} // namespace } // namespace
FiberManager& getFiberManager(folly::EventBase& evb, FiberManager& getFiberManager(EventBase& evb,
const FiberManager::Options& opts) { const FiberManager::Options& opts) {
auto it = (*localFiberManagerMap())->find(&evb); return LocalCache::get(evb, opts);
if (LIKELY(it != (*localFiberManagerMap())->end())) {
return *(it->second);
}
auto fm = getFiberManagerThreadSafe(evb, opts);
(*localFiberManagerMap())->emplace(&evb, fm);
return *fm;
} }
}} }}
/*
* Copyright 2016 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <thread>
#include <gtest/gtest.h>
#include <folly/AtomicLinkedList.h>
class TestIntrusiveObject {
public:
explicit TestIntrusiveObject(size_t id__) : id_(id__) {}
size_t id() { return id_; }
private:
folly::AtomicIntrusiveLinkedListHook<TestIntrusiveObject> hook_;
size_t id_;
public:
using List = folly::AtomicIntrusiveLinkedList<TestIntrusiveObject,
&TestIntrusiveObject::hook_>;
};
TEST(AtomicIntrusiveLinkedList, Basic) {
TestIntrusiveObject a(1), b(2), c(3);
TestIntrusiveObject::List list;
EXPECT_TRUE(list.empty());
{
EXPECT_TRUE(list.insertHead(&a));
EXPECT_FALSE(list.insertHead(&b));
EXPECT_FALSE(list.empty());
size_t id = 0;
list.sweep(
[&](TestIntrusiveObject* obj) mutable { EXPECT_EQ(++id, obj->id()); });
EXPECT_TRUE(list.empty());
}
// Try re-inserting the same item (b) and a new item (c)
{
EXPECT_TRUE(list.insertHead(&b));
EXPECT_FALSE(list.insertHead(&c));
EXPECT_FALSE(list.empty());
size_t id = 1;
list.sweep(
[&](TestIntrusiveObject* obj) mutable { EXPECT_EQ(++id, obj->id()); });
EXPECT_TRUE(list.empty());
}
TestIntrusiveObject::List movedList = std::move(list);
}
TEST(AtomicIntrusiveLinkedList, Move) {
TestIntrusiveObject a(1), b(2);
TestIntrusiveObject::List list1;
EXPECT_TRUE(list1.insertHead(&a));
EXPECT_FALSE(list1.insertHead(&b));
EXPECT_FALSE(list1.empty());
TestIntrusiveObject::List list2(std::move(list1));
EXPECT_TRUE(list1.empty());
EXPECT_FALSE(list2.empty());
TestIntrusiveObject::List list3;
EXPECT_TRUE(list3.empty());
list3 = std::move(list2);
EXPECT_TRUE(list2.empty());
EXPECT_FALSE(list3.empty());
size_t id = 0;
list3.sweep(
[&](TestIntrusiveObject* obj) mutable { EXPECT_EQ(++id, obj->id()); });
}
TEST(AtomicIntrusiveLinkedList, Stress) {
constexpr size_t kNumThreads = 32;
constexpr size_t kNumElements = 100000;
std::vector<TestIntrusiveObject> elements;
for (size_t i = 0; i < kNumThreads * kNumElements; ++i) {
elements.emplace_back(i);
}
TestIntrusiveObject::List list;
std::vector<std::thread> threads;
for (size_t threadId = 0; threadId < kNumThreads; ++threadId) {
threads.emplace_back(
[threadId, kNumThreads, kNumElements, &list, &elements]() {
for (size_t id = 0; id < kNumElements; ++id) {
list.insertHead(&elements[threadId + kNumThreads * id]);
}
});
}
std::vector<size_t> ids;
TestIntrusiveObject* prev{nullptr};
while (ids.size() < kNumThreads * kNumThreads) {
list.sweep([&](TestIntrusiveObject* current) {
ids.push_back(current->id());
if (prev && prev->id() % kNumThreads == current->id() % kNumThreads) {
EXPECT_EQ(prev->id() + kNumThreads, current->id());
}
prev = current;
});
}
std::sort(ids.begin(), ids.end());
for (size_t i = 0; i < kNumThreads * kNumElements; ++i) {
EXPECT_EQ(i, ids[i]);
}
for (auto& thread : threads) {
thread.join();
}
}
class TestObject {
public:
TestObject(size_t id__, std::shared_ptr<void> ptr) : id_(id__), ptr_(ptr) {}
size_t id() { return id_; }
private:
size_t id_;
std::shared_ptr<void> ptr_;
};
TEST(AtomicLinkedList, Basic) {
constexpr size_t kNumElements = 10;
using List = folly::AtomicLinkedList<TestObject>;
List list;
std::shared_ptr<void> ptr = std::make_shared<int>(42);
for (size_t id = 0; id < kNumElements; ++id) {
list.insertHead({id, ptr});
}
size_t counter = 0;
list.sweep([&](TestObject object) {
EXPECT_EQ(counter, object.id());
EXPECT_EQ(1 + kNumElements - counter, ptr.use_count());
++counter;
});
EXPECT_TRUE(ptr.unique());
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment