Commit 76745f68 authored by Andrii Grynenko's avatar Andrii Grynenko Committed by Facebook Github Bot 4

Cycle detection

Summary:
1. This implements a GraphCycleDetector which can check if newly added edge belongs to a cycle in a directed graph. GraphCycleDetector is used to detect cycles between Observers when creator function is run.
2. This also fixes a bug where new dependencies could be saved even if Observer creator failed.

Reviewed By: yfeldblum

Differential Revision: D3746743

fbshipit-source-id: 99d10446c56fa4d8f7485f38309e8a282cd21bdf
parent 9ae9af6f
......@@ -107,6 +107,7 @@ nobase_follyinclude_HEADERS = \
experimental/LockFreeRingBuffer.h \
experimental/NestedCommandLineApp.h \
experimental/observer/detail/Core.h \
experimental/observer/detail/GraphCycleDetector.h \
experimental/observer/detail/ObserverManager.h \
experimental/observer/detail/Observer-pre.h \
experimental/observer/Observable.h \
......
......@@ -41,12 +41,15 @@ Core::VersionedData Core::getData() {
size_t Core::refresh(size_t version, bool force) {
CHECK(ObserverManager::inManagerThread());
ObserverManager::DependencyRecorder::markRefreshDependency(*this);
SCOPE_EXIT {
ObserverManager::DependencyRecorder::unmarkRefreshDependency(*this);
};
if (version_ >= version) {
return versionLastChange_;
}
bool refreshDependents = false;
{
std::lock_guard<std::mutex> lgRefresh(refreshMutex_);
......@@ -57,11 +60,21 @@ size_t Core::refresh(size_t version, bool force) {
bool needRefresh = force || version_ == 0;
ObserverManager::DependencyRecorder dependencyRecorder(*this);
// This can be run in parallel, but we expect most updates to propagate
// bottom to top.
dependencies_.withRLock([&](const Dependencies& dependencies) {
for (const auto& dependency : dependencies) {
if (dependency->refresh(version) > version_) {
try {
if (dependency->refresh(version) > version_) {
needRefresh = true;
break;
}
} catch (...) {
LOG(ERROR) << "Exception while checking dependencies for updates: "
<< exceptionStr(std::current_exception());
needRefresh = true;
break;
}
......@@ -73,8 +86,6 @@ size_t Core::refresh(size_t version, bool force) {
return versionLastChange_;
}
ObserverManager::DependencyRecorder dependencyRecorder;
try {
{
VersionedData newData{creator_(), version};
......@@ -85,7 +96,6 @@ size_t Core::refresh(size_t version, bool force) {
}
versionLastChange_ = version;
refreshDependents = true;
} catch (...) {
LOG(ERROR) << "Exception while refreshing Observer: "
<< exceptionStr(std::current_exception());
......@@ -98,6 +108,10 @@ size_t Core::refresh(size_t version, bool force) {
version_ = version;
if (versionLastChange_ != version) {
return versionLastChange_;
}
auto newDependencies = dependencyRecorder.release();
dependencies_.withWLock([&](Dependencies& dependencies) {
for (const auto& dependency : newDependencies) {
......@@ -116,13 +130,11 @@ size_t Core::refresh(size_t version, bool force) {
});
}
if (refreshDependents) {
auto dependents = dependents_.copy();
auto dependents = dependents_.copy();
for (const auto& dependentWeak : dependents) {
if (auto dependent = dependentWeak.lock()) {
ObserverManager::scheduleRefresh(std::move(dependent), version);
}
for (const auto& dependentWeak : dependents) {
if (auto dependent = dependentWeak.lock()) {
ObserverManager::scheduleRefresh(std::move(dependent), version);
}
}
......
/*
* Copyright 2016 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <unordered_map>
#include <unordered_set>
namespace folly {
namespace observer_detail {
template <typename NodeId>
class GraphCycleDetector {
using NodeSet = std::unordered_set<NodeId>;
public:
/**
* Add new edge. If edge creates a cycle then it's not added and false is
* returned.
*/
bool addEdge(NodeId from, NodeId to) {
// In general case DFS may be expensive here, but in most cases to-node will
// have no edges, so it should be O(1).
NodeSet visitedNodes;
dfs(visitedNodes, to);
if (visitedNodes.count(from)) {
return false;
}
auto& nodes = edges_[from];
DCHECK_EQ(0, nodes.count(to));
nodes.insert(to);
return true;
}
void removeEdge(NodeId from, NodeId to) {
auto& nodes = edges_[from];
DCHECK(nodes.count(to));
nodes.erase(to);
if (nodes.empty()) {
edges_.erase(from);
}
}
private:
void dfs(NodeSet& visitedNodes, NodeId node) {
// We don't terminate early if cycle is detected, because this is considered
// an error condition, so not worth optimizing for.
if (visitedNodes.count(node)) {
return;
}
visitedNodes.insert(node);
if (!edges_.count(node)) {
return;
}
for (const auto& to : edges_[node]) {
dfs(visitedNodes, to);
}
}
std::unordered_map<NodeId, NodeSet> edges_;
};
}
}
......@@ -16,6 +16,7 @@
#pragma once
#include <folly/experimental/observer/detail/Core.h>
#include <folly/experimental/observer/detail/GraphCycleDetector.h>
#include <folly/futures/Future.h>
namespace folly {
......@@ -109,9 +110,15 @@ class ObserverManager {
class DependencyRecorder {
public:
using Dependencies = std::unordered_set<Core::Ptr>;
using DependencySet = std::unordered_set<Core::Ptr>;
struct Dependencies {
explicit Dependencies(const Core& core_) : core(core_) {}
DependencyRecorder() {
DependencySet dependencies;
const Core& core;
};
explicit DependencyRecorder(const Core& core) : dependencies_(core) {
DCHECK(inManagerThread());
previousDepedencies_ = currentDependencies_;
......@@ -122,19 +129,47 @@ class ObserverManager {
DCHECK(inManagerThread());
DCHECK(currentDependencies_);
currentDependencies_->insert(std::move(dependency));
currentDependencies_->dependencies.insert(std::move(dependency));
}
static void markRefreshDependency(const Core& core) {
if (!currentDependencies_) {
return;
}
if (auto instance = getInstance()) {
instance->cycleDetector_.withLock([&](CycleDetector& cycleDetector) {
bool hasCycle =
!cycleDetector.addEdge(&currentDependencies_->core, &core);
if (hasCycle) {
throw std::logic_error("Observer cycle detected.");
}
});
}
}
static void unmarkRefreshDependency(const Core& core) {
if (!currentDependencies_) {
return;
}
if (auto instance = getInstance()) {
instance->cycleDetector_.withLock([&](CycleDetector& cycleDetector) {
cycleDetector.removeEdge(&currentDependencies_->core, &core);
});
}
}
Dependencies release() {
DependencySet release() {
DCHECK(currentDependencies_ == &dependencies_);
std::swap(currentDependencies_, previousDepedencies_);
previousDepedencies_ = nullptr;
return std::move(dependencies_);
return std::move(dependencies_.dependencies);
}
~DependencyRecorder() {
if (previousDepedencies_) {
if (currentDependencies_ == &dependencies_) {
release();
}
}
......@@ -176,6 +211,9 @@ class ObserverManager {
*/
SharedMutexReadPriority versionMutex_;
std::atomic<size_t> version_{1};
using CycleDetector = GraphCycleDetector<const Core*>;
folly::Synchronized<CycleDetector, std::mutex> cycleDetector_;
};
}
}
......@@ -148,6 +148,64 @@ TEST(Observer, NullValue) {
EXPECT_EQ(46, **oddObserver);
}
TEST(Observer, Cycle) {
SimpleObservable<int> observable(0);
auto observer = observable.getObserver();
folly::Optional<Observer<int>> observerB;
auto observerA = makeObserver([observer, &observerB]() {
auto value = **observer;
if (value == 1) {
**observerB;
}
return value;
});
observerB = makeObserver([observerA]() { return **observerA; });
auto collectObserver = makeObserver([observer, observerA, &observerB]() {
auto value = **observer;
auto valueA = **observerA;
auto valueB = ***observerB;
if (value == 1) {
if (valueA == 0) {
EXPECT_EQ(0, valueB);
} else {
EXPECT_EQ(1, valueA);
EXPECT_EQ(0, valueB);
}
} else if (value == 2) {
EXPECT_EQ(value, valueA);
EXPECT_TRUE(valueB == 0 || valueB == 2);
} else {
EXPECT_EQ(value, valueA);
EXPECT_EQ(value, valueB);
}
return value;
});
folly::Baton<> baton;
auto waitingObserver = makeObserver([collectObserver, &baton]() {
*collectObserver;
baton.post();
return folly::Unit();
});
baton.reset();
EXPECT_EQ(0, **collectObserver);
for (size_t i = 1; i <= 3; ++i) {
observable.setValue(i);
EXPECT_TRUE(baton.timed_wait(std::chrono::seconds{1}));
baton.reset();
EXPECT_EQ(i, **collectObserver);
}
}
TEST(Observer, Stress) {
SimpleObservable<int> observable(0);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment