Commit bde0b49f authored by Andrii Grynenko's avatar Andrii Grynenko Committed by Facebook Github Bot

Fix a race when a leaf update is missed

Summary:
If multiple leaves are updated at the same time (or a new observer is created concurrently that depends on the updated leaf) it's possible for refresh to be called concurrently with force=false which will not update the leaf value, but also ignore the refresh with force=true.
This is fixed by making force to be the property of the leaf observer itself, which is set when the version is bumped.

Reviewed By: yfeldblum

Differential Revision: D14222400

fbshipit-source-id: 657c2c273002f576dbe48a232eaef2d530cea07d
parent 017cd274
...@@ -41,7 +41,7 @@ Core::VersionedData Core::getData() { ...@@ -41,7 +41,7 @@ Core::VersionedData Core::getData() {
return data_.copy(); return data_.copy();
} }
size_t Core::refresh(size_t version, bool force) { size_t Core::refresh(size_t version) {
CHECK(ObserverManager::inManagerThread()); CHECK(ObserverManager::inManagerThread());
ObserverManager::DependencyRecorder::markRefreshDependency(*this); ObserverManager::DependencyRecorder::markRefreshDependency(*this);
...@@ -61,7 +61,7 @@ size_t Core::refresh(size_t version, bool force) { ...@@ -61,7 +61,7 @@ size_t Core::refresh(size_t version, bool force) {
return versionLastChange_; return versionLastChange_;
} }
bool needRefresh = force || version_ == 0; bool needRefresh = std::exchange(forceRefresh_, false) || version_ == 0;
ObserverManager::DependencyRecorder dependencyRecorder(*this); ObserverManager::DependencyRecorder dependencyRecorder(*this);
...@@ -143,6 +143,10 @@ size_t Core::refresh(size_t version, bool force) { ...@@ -143,6 +143,10 @@ size_t Core::refresh(size_t version, bool force) {
return versionLastChange_; return versionLastChange_;
} }
void Core::setForceRefresh() {
forceRefresh_ = true;
}
Core::Core(folly::Function<std::shared_ptr<const void>()> creator) Core::Core(folly::Function<std::shared_ptr<const void>()> creator)
: creator_(std::move(creator)) {} : creator_(std::move(creator)) {}
......
...@@ -81,12 +81,17 @@ class Core : public std::enable_shared_from_this<Core> { ...@@ -81,12 +81,17 @@ class Core : public std::enable_shared_from_this<Core> {
/** /**
* Check if the observed object needs to be re-computed. Returns the version * Check if the observed object needs to be re-computed. Returns the version
* of last change. If force is true, re-computes the observed object, even if * of last change.
* dependencies didn't change.
* *
* This should be only called from ObserverManager thread. * This should be only called from ObserverManager thread.
*/ */
size_t refresh(size_t version, bool force = false); size_t refresh(size_t version);
/**
* Force the next call to refresh to unconditionally re-compute the observed
* object, even if dependencies didn't change.
*/
void setForceRefresh();
~Core(); ~Core();
...@@ -110,6 +115,8 @@ class Core : public std::enable_shared_from_this<Core> { ...@@ -110,6 +115,8 @@ class Core : public std::enable_shared_from_this<Core> {
folly::Function<std::shared_ptr<const void>()> creator_; folly::Function<std::shared_ptr<const void>()> creator_;
std::mutex refreshMutex_; std::mutex refreshMutex_;
bool forceRefresh_{false};
}; };
} // namespace observer_detail } // namespace observer_detail
} // namespace folly } // namespace folly
...@@ -137,11 +137,15 @@ class ObserverManager::NextQueue { ...@@ -137,11 +137,15 @@ class ObserverManager::NextQueue {
} }
} }
for (auto& corePtr : cores) {
corePtr->setForceRefresh();
}
++manager_.version_; ++manager_.version_;
} }
for (auto& core : cores) { for (auto& core : cores) {
manager_.scheduleRefresh(std::move(core), manager_.version_, true); manager_.scheduleRefresh(std::move(core), manager_.version_);
} }
{ {
......
...@@ -65,8 +65,7 @@ class ObserverManager { ...@@ -65,8 +65,7 @@ class ObserverManager {
return inManagerThread_; return inManagerThread_;
} }
static void static void scheduleRefresh(Core::Ptr core, size_t minVersion) {
scheduleRefresh(Core::Ptr core, size_t minVersion, bool force = false) {
if (core->getVersion() >= minVersion) { if (core->getVersion() >= minVersion) {
return; return;
} }
...@@ -93,8 +92,7 @@ class ObserverManager { ...@@ -93,8 +92,7 @@ class ObserverManager {
instance->scheduleCurrent([core = std::move(core), instance->scheduleCurrent([core = std::move(core),
instancePtr = instance.get(), instancePtr = instance.get(),
rh = std::move(rh), rh = std::move(rh)]() {
force]() {
// Make TSAN know that the current thread owns the read lock now. // Make TSAN know that the current thread owns the read lock now.
annotate_rwlock_acquired( annotate_rwlock_acquired(
&instancePtr->versionMutex_, &instancePtr->versionMutex_,
...@@ -102,7 +100,7 @@ class ObserverManager { ...@@ -102,7 +100,7 @@ class ObserverManager {
__FILE__, __FILE__,
__LINE__); __LINE__);
core->refresh(instancePtr->version_, force); core->refresh(instancePtr->version_);
}); });
} }
...@@ -131,7 +129,7 @@ class ObserverManager { ...@@ -131,7 +129,7 @@ class ObserverManager {
SharedMutexReadPriority::ReadHolder rh(instance->versionMutex_); SharedMutexReadPriority::ReadHolder rh(instance->versionMutex_);
core->refresh(instance->version_, false); core->refresh(instance->version_);
} }
static void waitForAllUpdates(); static void waitForAllUpdates();
......
...@@ -248,6 +248,27 @@ TEST(Observer, Stress) { ...@@ -248,6 +248,27 @@ TEST(Observer, Stress) {
}); });
} }
TEST(Observer, StressMultipleUpdates) {
SimpleObservable<int> observable1(0);
SimpleObservable<int> observable2(0);
auto observer = makeObserver(
[o1 = observable1.getObserver(), o2 = observable2.getObserver()]() {
return (**o1) * (**o2);
});
EXPECT_EQ(0, **observer);
constexpr size_t numIters = 10000;
for (size_t i = 1; i <= numIters; ++i) {
observable1.setValue(i);
observable2.setValue(i);
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(i * i, **observer);
}
}
TEST(Observer, TLObserver) { TEST(Observer, TLObserver) {
auto createTLObserver = [](int value) { auto createTLObserver = [](int value) {
return folly::observer::makeTLObserver([=] { return value; }); return folly::observer::makeTLObserver([=] { return value; });
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment