Commit 2ae32be7 authored by Pranjal Raihan's avatar Pranjal Raihan Committed by Facebook GitHub Bot

Add AtomicObserver<T>

Summary:
An `AtomicObserver` is a read-optimized cache for an `Observer` value using `std::atomic`. This avoids creating a `shared_ptr` on every read (atomic increment/decrement on control block). The functionality is similar to that of `TLObserver` but uses significantly less memory if `std::atomic<T>` is valid.

The read path involves 2 atomic loads and a cache staleness check. A lock is used when the cache needs to be updated.

Reviewed By: zhxchen17

Differential Revision: D25254487

fbshipit-source-id: cfaf7c328c075f9f06c5c9d2d7bb2e1987285616
parent 48380667
......@@ -108,6 +108,54 @@ Observer<T> makeStaticObserver(std::shared_ptr<T> value) {
return makeObserver([value = std::move(value)] { return value; });
}
template <typename T>
AtomicObserver<T>::AtomicObserver(Observer<T> observer)
: observer_(std::move(observer)) {
refreshLock_.init();
}
template <typename T>
AtomicObserver<T>::AtomicObserver(const AtomicObserver<T>& other)
: AtomicObserver(other.observer_) {}
template <typename T>
AtomicObserver<T>::AtomicObserver(AtomicObserver<T>&& other) noexcept
: AtomicObserver(std::move(other.observer_)) {}
template <typename T>
AtomicObserver<T>& AtomicObserver<T>::operator=(
const AtomicObserver<T>& other) {
return *this = other.observer_;
}
template <typename T>
AtomicObserver<T>& AtomicObserver<T>::operator=(
AtomicObserver<T>&& other) noexcept {
return *this = std::move(other.observer_);
}
template <typename T>
AtomicObserver<T>& AtomicObserver<T>::operator=(Observer<T> observer) {
observer_ = std::move(observer);
cachedVersion_.store(0, std::memory_order_release);
return *this;
}
template <typename T>
T AtomicObserver<T>::get() const {
auto version = cachedVersion_.load(std::memory_order_acquire);
if (UNLIKELY(observer_.needRefresh(version))) {
std::lock_guard<folly::MicroLock> guard{refreshLock_};
version = cachedVersion_.load(std::memory_order_acquire);
if (LIKELY(observer_.needRefresh(version))) {
auto snapshot = *observer_;
cachedValue_.store(*snapshot, std::memory_order_relaxed);
cachedVersion_.store(snapshot.getVersion(), std::memory_order_release);
}
}
return cachedValue_.load(std::memory_order_relaxed);
}
template <typename T>
TLObserver<T>::TLObserver(Observer<T> observer)
: observer_(observer),
......
......@@ -16,6 +16,7 @@
#pragma once
#include <folly/MicroLock.h>
#include <folly/ThreadLocal.h>
#include <folly/experimental/ReadMostlySharedPtr.h>
#include <folly/experimental/observer/Observer-pre.h>
......@@ -63,20 +64,48 @@ namespace observer {
* Notice that a + b will be only called when either a or b is changed. Getting
* a snapshot from sumObserver won't trigger any re-computation.
*
* See AtomicObserver and TLObserver for optimized reads.
*
* TLObserver is very similar to Observer, but it also keeps a thread-local
* cache for the observed object.
* See ObserverCreator class if you want to wrap any existing subscription API
* in an Observer object.
*/
template <typename T>
class Observer;
/**
* An AtomicObserver provides read-optimized caching for an Observer using
* `std::atomic`. Reading only requires atomic loads unless the cached value
* is stale. If the cache needs to be refreshed, a mutex is used to
* synchronize the update. This avoids creating a shared_ptr for every read.
*
* AtomicObserver is ideal when there are lots of reads on a trivially-copyable
* type. if `std::atomic<T>` is not possible but you still want to optimize
* reads, consider a TLObserver.
*
* Observer<int> observer = ...;
* TLObserver<int> tlObserver(observer);
* auto& snapshot = *tlObserver;
* AtomicObserver<int> atomicObserver(observer);
* auto value = *atomicObserver;
*/
template <typename T>
class AtomicObserver;
/**
* A TLObserver provides read-optimized caching for an Observer using
* thread-local storage. This avoids creating a shared_ptr for every read.
*
* The functionality is similar to that of AtomicObserver except it allows types
* that don't support atomics. If possible, use AtomicObserver instead.
*
* See ObserverCreator class if you want to wrap any existing subscription API
* in an Observer object.
* TLObserver can consume significant amounts of memory if accessed from many
* threads. The problem is exacerbated if you chain several TLObservers.
* Therefore, TLObserver should be used sparingly.
*
* Observer<int> observer = ...;
* TLObserver<int> tlObserver(observer);
* auto& snapshot = *tlObserver;
*/
template <typename T>
class Observer;
class TLObserver;
template <typename T>
class Snapshot {
......@@ -151,7 +180,11 @@ class Observer {
*/
bool needRefresh(const Snapshot<T>& snapshot) const {
DCHECK_EQ(core_.get(), snapshot.core_);
return snapshot.getVersion() < core_->getVersionLastChange();
return needRefresh(snapshot.getVersion());
}
bool needRefresh(size_t version) const {
return version < core_->getVersionLastChange();
}
CallbackHandle addCallback(folly::Function<void(Snapshot<T>)> callback) const;
......@@ -222,6 +255,26 @@ Observer<T> makeStaticObserver(T value);
template <typename T>
Observer<T> makeStaticObserver(std::shared_ptr<T> value);
template <typename T>
class AtomicObserver {
public:
explicit AtomicObserver(Observer<T> observer);
AtomicObserver(const AtomicObserver<T>& other);
AtomicObserver(AtomicObserver<T>&& other) noexcept;
AtomicObserver<T>& operator=(const AtomicObserver<T>& other);
AtomicObserver<T>& operator=(AtomicObserver<T>&& other) noexcept;
AtomicObserver<T>& operator=(Observer<T> observer);
T get() const;
T operator*() const { return get(); }
private:
mutable std::atomic<T> cachedValue_{};
mutable std::atomic<size_t> cachedVersion_{};
mutable folly::MicroLock refreshLock_;
Observer<T> observer_;
};
template <typename T>
class TLObserver {
public:
......@@ -300,6 +353,19 @@ auto makeTLObserver(F&& creator) {
return makeTLObserver(makeObserver(std::forward<F>(creator)));
}
/**
* Same as makeObserver(...), but creates AtomicObserver.
*/
template <typename T>
AtomicObserver<T> makeAtomicObserver(Observer<T> observer) {
return AtomicObserver<T>(std::move(observer));
}
template <typename F>
auto makeAtomicObserver(F&& creator) {
return makeAtomicObserver(makeObserver(std::forward<F>(creator)));
}
template <typename T, bool CacheInThreadLocal>
struct ObserverTraits {};
......
......@@ -578,6 +578,39 @@ TEST(Observer, MakeStaticObserver) {
EXPECT_EQ(**implicitSharedPtrObserver, 5);
}
TEST(Observer, AtomicObserver) {
SimpleObservable<int> observable{42};
SimpleObservable<int> observable2{12};
AtomicObserver<int> observer{observable.getObserver()};
AtomicObserver<int> observerCopy{observer};
EXPECT_EQ(*observer, 42);
EXPECT_EQ(*observerCopy, 42);
observable.setValue(24);
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(*observer, 24);
EXPECT_EQ(*observerCopy, 24);
observer = observable2.getObserver();
EXPECT_EQ(*observer, 12);
EXPECT_EQ(*observerCopy, 24);
observable2.setValue(15);
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(*observer, 15);
EXPECT_EQ(*observerCopy, 24);
observerCopy = observer;
EXPECT_EQ(*observerCopy, 15);
auto dependentObserver =
makeAtomicObserver([o = observer] { return *o + 1; });
EXPECT_EQ(*dependentObserver, 16);
observable2.setValue(20);
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(*dependentObserver, 21);
}
TEST(Observer, Unwrap) {
SimpleObservable<bool> selectorObservable{true};
SimpleObservable<int> trueObservable{1};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment