Commit 1cc6c6d9 authored by Andrii Grynenko's avatar Andrii Grynenko Committed by Facebook GitHub Bot

Make Observable update code run on the observer internal thread

Summary: Previously update() could be run both directly from within the update notification callback (that can happen in any context while holding some application locks), but also while observer is initially created from the observable. This change moves the update logic to be called on the observer internal thread, making the implementation even more resilient to various locks that may be held by the application.

Reviewed By: yfeldblum

Differential Revision: D32338088

fbshipit-source-id: 557f47c98469a951ce4b5513ce96e2b3a5d8ba6b
parent 3a9d2d47
......@@ -48,7 +48,7 @@ class ObserverCreatorContext {
return state->value;
}
void update() {
observer_detail::Core::Ptr update() {
// This mutex ensures there's no race condition between initial update()
// call and update() calls from the subsciption callback.
//
......@@ -66,12 +66,13 @@ class ObserverCreatorContext {
auto state = state_.lock();
if (!state->updateValue(std::move(newValue))) {
// Value didn't change, so we can skip the version update.
return;
return nullptr;
}
if (!std::exchange(state->updateRequested, true)) {
observer_detail::ObserverManager::scheduleRefreshNewVersion(coreWeak_);
return coreWeak_.lock();
}
return nullptr;
}
template <typename F>
......@@ -157,16 +158,22 @@ ObserverCreator<Observable, Traits>::getObserver() && {
[context = std::move(contextPrimary)]() { return context->get(); });
context_->setCore(observer.core_);
context_->subscribe([contextWeak = std::move(contextWeak)] {
if (auto context = contextWeak.lock()) {
context->update();
}
});
auto scheduleUpdate = [contextWeak = std::move(contextWeak)] {
observer_detail::ObserverManager::scheduleRefreshNewVersion(
[contextWeak]() -> observer_detail::Core::Ptr {
if (auto context = contextWeak.lock()) {
return context->update();
}
return nullptr;
});
};
context_->subscribe(scheduleUpdate);
// Do an extra update in case observable was updated between observer creation
// and setting updates callback.
context_->update();
context_.reset();
scheduleUpdate();
return observer;
}
......
......@@ -103,35 +103,29 @@ class ObserverManager::UpdatesManager::NextQueue {
folly::setThreadName(
folly::sformat("{}NQ", kObserverManagerThreadNamePrefix));
Core::WeakPtr queueCoreWeak;
Function<Core::Ptr()> queueCoreFunc;
while (true) {
queue_.dequeue(queueCoreWeak);
if (stop_) {
return;
}
std::vector<Core::Ptr> cores;
{
if (auto queueCore = queueCoreWeak.lock()) {
cores.emplace_back(std::move(queueCore));
queue_.dequeue(queueCoreFunc);
do {
if (stop_) {
return;
}
}
{
if (auto queueCore = queueCoreFunc()) {
cores.emplace_back(std::move(queueCore));
}
}
} while (cores.size() < kNextBatchSize &&
queue_.try_dequeue(queueCoreFunc));
{
SharedMutexReadPriority::WriteHolder wh(manager.versionMutex_);
// We can't pick more tasks from the queue after we bumped the
// version, so we have to do this while holding the lock.
while (cores.size() < kNextBatchSize &&
queue_.try_dequeue(queueCoreWeak)) {
if (stop_) {
return;
}
if (auto queueCore = queueCoreWeak.lock()) {
cores.emplace_back(std::move(queueCore));
}
}
for (auto& corePtr : cores) {
corePtr->setForceRefresh();
......@@ -159,12 +153,14 @@ class ObserverManager::UpdatesManager::NextQueue {
});
}
void add(Core::WeakPtr core) { queue_.enqueue(std::move(core)); }
void add(Function<Core::Ptr()> coreFunc) {
queue_.enqueue(std::move(coreFunc));
}
~NextQueue() {
stop_ = true;
// Write to the queue to notify the thread.
queue_.enqueue(Core::WeakPtr());
queue_.enqueue([]() -> Core::Ptr { return nullptr; });
thread_.join();
}
......@@ -174,13 +170,13 @@ class ObserverManager::UpdatesManager::NextQueue {
emptyWaiters_.wlock()->push_back(std::move(promise));
// Write to the queue to notify the thread.
queue_.enqueue(Core::WeakPtr());
queue_.enqueue([]() -> Core::Ptr { return nullptr; });
future.get();
}
private:
UMPSCQueue<Core::WeakPtr, true> queue_;
UMPSCQueue<Function<Core::Ptr()>, true> queue_;
std::thread thread_;
std::atomic<bool> stop_{false};
folly::Synchronized<std::vector<std::promise<void>>> emptyWaiters_;
......@@ -202,8 +198,9 @@ void ObserverManager::UpdatesManager::scheduleCurrent(Function<void()> task) {
currentQueue_->add(std::move(task));
}
void ObserverManager::UpdatesManager::scheduleNext(Core::WeakPtr core) {
nextQueue_->add(std::move(core));
void ObserverManager::UpdatesManager::scheduleNext(
Function<Core::Ptr()> coreFunc) {
nextQueue_->add(std::move(coreFunc));
}
void ObserverManager::waitForAllUpdates() {
......
......@@ -80,14 +80,14 @@ class ObserverManager {
});
}
static void scheduleRefreshNewVersion(Core::WeakPtr coreWeak) {
static void scheduleRefreshNewVersion(Function<Core::Ptr()> coreFunc) {
auto updatesManager = getUpdatesManager();
if (!updatesManager) {
return;
}
updatesManager->scheduleNext(std::move(coreWeak));
updatesManager->scheduleNext(std::move(coreFunc));
}
static void initCore(Core::Ptr core) {
......@@ -200,7 +200,7 @@ class ObserverManager {
UpdatesManager();
~UpdatesManager();
void scheduleCurrent(Function<void()>);
void scheduleNext(Core::WeakPtr);
void scheduleNext(Function<Core::Ptr()>);
void waitForAllUpdates();
private:
......
......@@ -320,15 +320,17 @@ TEST(Observer, SubscribeCallback) {
folly::observer::ObserverCreator<Observable, Traits>().getObserver();
EXPECT_TRUE(updatesCob);
EXPECT_EQ(2, getCallsStart);
EXPECT_EQ(2, getCallsFinish);
EXPECT_GE(2, getCallsStart);
EXPECT_GE(2, getCallsFinish);
updatesCob();
EXPECT_EQ(3, getCallsStart);
EXPECT_EQ(3, getCallsFinish);
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(3, getCallsStart);
EXPECT_EQ(3, getCallsFinish);
slowGet = true;
cobThread = std::thread([] { updatesCob(); });
/* sleep override */ std::this_thread::sleep_for(std::chrono::seconds{1});
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment