Commit ed74c95c authored by Andrii Grynenko's avatar Andrii Grynenko Committed by Facebook Github Bot

Split ObserverManager in leaky and non-leaky singletons

Summary: This allows makeObserver to work even during shutdown.

Reviewed By: yfeldblum

Differential Revision: D19136597

fbshipit-source-id: 17e946a66743a56d24904697c02ce7165bcaf397
parent 22ede92e
......@@ -43,7 +43,7 @@ constexpr StringPiece kObserverManagerThreadNamePrefix{"ObserverMngr"};
constexpr size_t kNextBatchSize{1024};
} // namespace
class ObserverManager::CurrentQueue {
class ObserverManager::UpdatesManager::CurrentQueue {
public:
CurrentQueue() {
if (FLAGS_observer_manager_pool_size < 1) {
......@@ -96,10 +96,12 @@ class ObserverManager::CurrentQueue {
std::vector<std::thread> threads_;
};
class ObserverManager::NextQueue {
class ObserverManager::UpdatesManager::NextQueue {
public:
explicit NextQueue(ObserverManager& manager) : manager_(manager) {
NextQueue() {
thread_ = std::thread([&]() {
auto& manager = getInstance();
folly::setThreadName(
folly::sformat("{}NQ", kObserverManagerThreadNamePrefix));
......@@ -119,7 +121,7 @@ class ObserverManager::NextQueue {
}
{
SharedMutexReadPriority::WriteHolder wh(manager_.versionMutex_);
SharedMutexReadPriority::WriteHolder wh(manager.versionMutex_);
// We can't pick more tasks from the queue after we bumped the
// version, so we have to do this while holding the lock.
......@@ -137,11 +139,11 @@ class ObserverManager::NextQueue {
corePtr->setForceRefresh();
}
++manager_.version_;
++manager.version_;
}
for (auto& core : cores) {
manager_.scheduleRefresh(std::move(core), manager_.version_);
manager.scheduleRefresh(std::move(core), manager.version_);
}
{
......@@ -182,60 +184,66 @@ class ObserverManager::NextQueue {
}
private:
ObserverManager& manager_;
UMPSCQueue<Core::WeakPtr, true> queue_;
std::thread thread_;
std::atomic<bool> stop_{false};
folly::Synchronized<std::vector<std::promise<void>>> emptyWaiters_;
};
ObserverManager::ObserverManager() {
ObserverManager::UpdatesManager::UpdatesManager() {
currentQueue_ = std::make_unique<CurrentQueue>();
nextQueue_ = std::make_unique<NextQueue>(*this);
nextQueue_ = std::make_unique<NextQueue>();
}
ObserverManager::~ObserverManager() {
ObserverManager::UpdatesManager::~UpdatesManager() {
// Destroy NextQueue, before the rest of this object, since it expects
// ObserverManager to be alive.
nextQueue_.reset();
currentQueue_.reset();
}
void ObserverManager::scheduleCurrent(Function<void()> task) {
void ObserverManager::UpdatesManager::scheduleCurrent(Function<void()> task) {
currentQueue_->add(std::move(task));
}
void ObserverManager::scheduleNext(Core::WeakPtr core) {
void ObserverManager::UpdatesManager::scheduleNext(Core::WeakPtr core) {
nextQueue_->add(std::move(core));
}
void ObserverManager::waitForAllUpdates() {
auto instance = getInstance();
if (!instance) {
return;
if (auto updatesManager = getUpdatesManager()) {
return updatesManager->waitForAllUpdates();
}
}
instance->nextQueue_->waitForEmpty();
void ObserverManager::UpdatesManager::waitForAllUpdates() {
auto& instance = ObserverManager::getInstance();
nextQueue_->waitForEmpty();
// Wait for all readers to release the lock.
SharedMutexReadPriority::WriteHolder wh(instance->versionMutex_);
SharedMutexReadPriority::WriteHolder wh(instance.versionMutex_);
}
struct ObserverManager::Singleton {
static folly::Singleton<ObserverManager> instance;
static folly::Singleton<UpdatesManager> instance;
// MSVC 2015 doesn't let us access ObserverManager's constructor if we
// try to use a lambda to initialize instance, so we have to create
// an actual function instead.
static ObserverManager* createManager() {
return new ObserverManager();
static UpdatesManager* createManager() {
return new UpdatesManager();
}
};
folly::Singleton<ObserverManager> ObserverManager::Singleton::instance(
createManager);
folly::Singleton<ObserverManager::UpdatesManager>
ObserverManager::Singleton::instance(createManager);
std::shared_ptr<ObserverManager> ObserverManager::getInstance() {
std::shared_ptr<ObserverManager::UpdatesManager>
ObserverManager::getUpdatesManager() {
return Singleton::instance.try_get();
}
ObserverManager& ObserverManager::getInstance() {
static auto instance = new ObserverManager();
return *instance;
}
} // namespace observer_detail
} // namespace folly
......@@ -53,13 +53,7 @@ namespace observer_detail {
class ObserverManager {
public:
static size_t getVersion() {
auto instance = getInstance();
if (!instance) {
return 1;
}
return instance->version_;
return getInstance().version_;
}
static bool inManagerThread() {
......@@ -71,13 +65,15 @@ class ObserverManager {
return;
}
auto instance = getInstance();
auto updatesManager = getUpdatesManager();
if (!instance) {
if (!updatesManager) {
return;
}
SharedMutexReadPriority::ReadHolder rh(instance->versionMutex_);
auto& instance = getInstance();
SharedMutexReadPriority::ReadHolder rh(instance.versionMutex_);
// TSAN assumes that the thread that locks the mutex must
// be the one that unlocks it. However, we are passing ownership of
......@@ -86,51 +82,47 @@ class ObserverManager {
// annotate that the thread has released the mutex, and then annotate
// the async thread as acquiring the mutex.
annotate_rwlock_released(
&instance->versionMutex_,
&instance.versionMutex_,
annotate_rwlock_level::rdlock,
__FILE__,
__LINE__);
instance->scheduleCurrent([core = std::move(core),
instancePtr = instance.get(),
rh = std::move(rh)]() {
// Make TSAN know that the current thread owns the read lock now.
annotate_rwlock_acquired(
&instancePtr->versionMutex_,
annotate_rwlock_level::rdlock,
__FILE__,
__LINE__);
core->refresh(instancePtr->version_);
});
updatesManager->scheduleCurrent(
[core = std::move(core), &instance, rh = std::move(rh)]() {
// Make TSAN know that the current thread owns the read lock now.
annotate_rwlock_acquired(
&instance.versionMutex_,
annotate_rwlock_level::rdlock,
__FILE__,
__LINE__);
core->refresh(instance.version_);
});
}
static void scheduleRefreshNewVersion(Core::WeakPtr coreWeak) {
auto instance = getInstance();
auto updatesManager = getUpdatesManager();
if (!instance) {
if (!updatesManager) {
return;
}
instance->scheduleNext(std::move(coreWeak));
updatesManager->scheduleNext(std::move(coreWeak));
}
static void initCore(Core::Ptr core) {
DCHECK(core->getVersion() == 0);
auto instance = getInstance();
if (!instance) {
throw std::logic_error("ObserverManager requested during shutdown");
}
auto& instance = getInstance();
auto inManagerThread = std::exchange(inManagerThread_, true);
SCOPE_EXIT {
inManagerThread_ = inManagerThread;
};
SharedMutexReadPriority::ReadHolder rh(instance->versionMutex_);
SharedMutexReadPriority::ReadHolder rh(instance.versionMutex_);
core->refresh(instance->version_);
core->refresh(instance.version_);
}
static void waitForAllUpdates();
......@@ -168,15 +160,13 @@ class ObserverManager {
return;
}
if (auto instance = getInstance()) {
instance->cycleDetector_.withLock([&](CycleDetector& cycleDetector) {
bool hasCycle =
!cycleDetector.addEdge(&currentDependencies_->core, &core);
if (hasCycle) {
throw std::logic_error("Observer cycle detected.");
}
});
}
getInstance().cycleDetector_.withLock([&](CycleDetector& cycleDetector) {
bool hasCycle =
!cycleDetector.addEdge(&currentDependencies_->core, &core);
if (hasCycle) {
throw std::logic_error("Observer cycle detected.");
}
});
}
static void unmarkRefreshDependency(const Core& core) {
......@@ -184,11 +174,9 @@ class ObserverManager {
return;
}
if (auto instance = getInstance()) {
instance->cycleDetector_.withLock([&](CycleDetector& cycleDetector) {
cycleDetector.removeEdge(&currentDependencies_->core, &core);
});
}
getInstance().cycleDetector_.withLock([&](CycleDetector& cycleDetector) {
cycleDetector.removeEdge(&currentDependencies_->core, &core);
});
}
DependencySet release() {
......@@ -212,23 +200,28 @@ class ObserverManager {
static FOLLY_TLS Dependencies* currentDependencies_;
};
~ObserverManager();
private:
ObserverManager();
struct Singleton;
ObserverManager() {}
void scheduleCurrent(Function<void()>);
void scheduleNext(Core::WeakPtr);
class UpdatesManager {
public:
UpdatesManager();
~UpdatesManager();
void scheduleCurrent(Function<void()>);
void scheduleNext(Core::WeakPtr);
void waitForAllUpdates();
class CurrentQueue;
class NextQueue;
private:
class CurrentQueue;
class NextQueue;
std::unique_ptr<CurrentQueue> currentQueue_;
std::unique_ptr<NextQueue> nextQueue_;
std::unique_ptr<CurrentQueue> currentQueue_;
std::unique_ptr<NextQueue> nextQueue_;
};
struct Singleton;
static std::shared_ptr<ObserverManager> getInstance();
static ObserverManager& getInstance();
static std::shared_ptr<UpdatesManager> getUpdatesManager();
static FOLLY_TLS bool inManagerThread_;
/**
......
......@@ -16,6 +16,7 @@
#include <thread>
#include <folly/Singleton.h>
#include <folly/experimental/observer/SimpleObservable.h>
#include <folly/portability/GTest.h>
#include <folly/synchronization/Baton.h>
......@@ -474,3 +475,9 @@ TEST(Observer, GetSnapshotOnManagerThread) {
finishBaton.post();
destructorBaton.wait();
}
TEST(Observer, Shutdown) {
folly::SingletonVault::singleton()->destroyInstances();
auto observer = folly::observer::makeObserver([] { return 42; });
EXPECT_EQ(42, **observer);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment