Commit d5d78479 authored by Margot Leibold's avatar Margot Leibold Committed by Facebook GitHub Bot

Implement ReadMostlyTLObserver

Summary: Implement a thread local Observer that optimizes on the shared_ptr copy by using ReadMostlySharedPtr

Reviewed By: andriigrynenko

Differential Revision: D24439652

fbshipit-source-id: 9174be7f3f2fc34bdc79bd83eab571972b0c057d
parent 31563f4c
......@@ -118,6 +118,38 @@ const Snapshot<T>& TLObserver<T>::getSnapshotRef() const {
return snapshot;
}
template <typename T>
ReadMostlyTLObserver<T>::ReadMostlyTLObserver(Observer<T> observer)
: observer_(observer),
callback_(
observer_.addCallback([this](folly::observer::Snapshot<T> snapshot) {
globalData_.lock()->reset(snapshot.getShared());
globalVersion_ = snapshot.getVersion();
})) {}
template <typename T>
ReadMostlyTLObserver<T>::ReadMostlyTLObserver(
const ReadMostlyTLObserver<T>& other)
: ReadMostlyTLObserver(other.observer_) {}
template <typename T>
folly::ReadMostlySharedPtr<const T> ReadMostlyTLObserver<T>::getShared() const {
if (localSnapshot_->version_ == globalVersion_.load()) {
if (auto data = localSnapshot_->data_.lock()) {
return data;
}
}
return refresh();
}
template <typename T>
folly::ReadMostlySharedPtr<const T> ReadMostlyTLObserver<T>::refresh() const {
auto version = globalVersion_.load();
auto globalData = globalData_.lock();
*localSnapshot_ = LocalSnapshot(*globalData, version);
return globalData->getShared();
}
struct CallbackHandle::Context {
Optional<Observer<folly::Unit>> observer;
Synchronized<bool> canceled{false};
......
......@@ -17,6 +17,7 @@
#pragma once
#include <folly/ThreadLocal.h>
#include <folly/experimental/ReadMostlySharedPtr.h>
#include <folly/experimental/observer/Observer-pre.h>
#include <folly/experimental/observer/detail/Core.h>
......@@ -226,6 +227,57 @@ class TLObserver {
folly::ThreadLocal<Snapshot<T>> snapshot_;
};
/**
* A TLObserver that optimizes for getting shared_ptr to data
*/
template <typename T>
class ReadMostlyTLObserver {
public:
explicit ReadMostlyTLObserver(Observer<T> observer);
ReadMostlyTLObserver(const ReadMostlyTLObserver<T>& other);
folly::ReadMostlySharedPtr<const T> getShared() const;
private:
folly::ReadMostlySharedPtr<const T> refresh() const;
struct LocalSnapshot {
LocalSnapshot() {}
LocalSnapshot(
const folly::ReadMostlyMainPtr<const T>& data,
int64_t version)
: data_(data), version_(version) {}
folly::ReadMostlyWeakPtr<const T> data_;
int64_t version_;
};
Observer<T> observer_;
folly::Synchronized<folly::ReadMostlyMainPtr<const T>, std::mutex>
globalData_;
std::atomic<int64_t> globalVersion_;
folly::ThreadLocal<LocalSnapshot> localSnapshot_;
// Construct callback last so that it's joined before members it may
// be accessing are destructed
CallbackHandle callback_;
};
/**
* Same as makeObserver(...), but creates ReadMostlyTLObserver.
*/
template <typename T>
ReadMostlyTLObserver<T> makeReadMostlyTLObserver(Observer<T> observer) {
return ReadMostlyTLObserver<T>(std::move(observer));
}
template <typename F>
auto makeReadMostlyTLObserver(F&& creator) {
return makeReadMostlyTLObserver(makeObserver(std::forward<F>(creator)));
}
/**
* Same as makeObserver(...), but creates TLObserver.
*/
......
......@@ -283,6 +283,33 @@ TEST(Observer, TLObserver) {
EXPECT_EQ(41, ***k);
}
TEST(ReadMostlyTLObserver, ReadMostlyTLObserver) {
auto createReadMostlyTLObserver = [](int value) {
return folly::observer::makeReadMostlyTLObserver([=] { return value; });
};
auto k = std::make_unique<folly::observer::ReadMostlyTLObserver<int>>(
createReadMostlyTLObserver(42));
EXPECT_EQ(42, *k->getShared());
k = std::make_unique<folly::observer::ReadMostlyTLObserver<int>>(
createReadMostlyTLObserver(41));
EXPECT_EQ(41, *k->getShared());
}
TEST(ReadMostlyTLObserver, Update) {
SimpleObservable<int> observable(42);
auto observer = observable.getObserver();
ReadMostlyTLObserver readMostlyObserver(observer);
EXPECT_EQ(*readMostlyObserver.getShared(), 42);
observable.setValue(24);
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(*readMostlyObserver.getShared(), 24);
}
TEST(Observer, SubscribeCallback) {
static auto mainThreadId = std::this_thread::get_id();
static std::function<void()> updatesCob;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment