Commit 596aa895 authored by Andrii Grynenko, committed by facebook-github-bot-4

New ReadMostlySharedPtr implementation

Summary: This changes the ReadMostlySharedPtr API to have three types: MainPtr, WeakPtr, and SharedPtr. MainPtr and SharedPtr are equivalents of std::shared_ptr, and WeakPtr is an equivalent of std::weak_ptr. The only difference is that there can only be a single MainPtr, and while it's alive, copying a SharedPtr/WeakPtr or locking a WeakPtr doesn't require atomic operations (and thus can be more performant than std::shared_ptr). Unlike the original ReadMostlySharedPtr API, there are no thread-safety guarantees between reset() and getShared() for ReadMostlySharedPtr.

ReadMostlySharedPtr can work with different RefCount implementations. This diff introduces RCURefCount (which currently uses liburcu) and TLRefCount.
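A minimal usage sketch of the new API (type names are taken from the headers in this diff; the int payload is illustrative):

  folly::ReadMostlyMainPtr<int> mainPtr(std::make_shared<int>(42));
  folly::ReadMostlyWeakPtr<int> weakPtr(mainPtr);
  folly::ReadMostlySharedPtr<int> sharedPtr = weakPtr.lock();  // no atomics while mainPtr is alive
  // ... read through sharedPtr ...
  mainPtr.reset();  // switches ref counting to global mode and drops the main reference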

Reviewed By: djwatson

Differential Revision: D2683572

fb-gh-sync-id: a7a03af4b1cf5f81a613368c6eebe70b2eaef064
parent 7ee590ed
@@ -249,7 +249,6 @@ nobase_follyinclude_HEADERS = \
Random.h \
Random-inl.h \
Range.h \
ReadMostlySharedPtr.h \
RWSpinLock.h \
ScopeGuard.h \
SharedMutex.h \
......
@@ -116,11 +116,6 @@ void Random::secureRandom(void* data, size_t size) {
bufferedRandomDevice->get(data, size);
}
ThreadLocalPRNG::ThreadLocalPRNG() {
static folly::ThreadLocal<ThreadLocalPRNG::LocalInstancePRNG> localInstance;
local_ = localInstance.get();
}
class ThreadLocalPRNG::LocalInstancePRNG {
public:
LocalInstancePRNG() : rng(Random::create()) { }
@@ -128,6 +123,11 @@ class ThreadLocalPRNG::LocalInstancePRNG {
Random::DefaultGenerator rng;
};
ThreadLocalPRNG::ThreadLocalPRNG() {
static folly::ThreadLocal<ThreadLocalPRNG::LocalInstancePRNG> localInstance;
local_ = localInstance.get();
}
uint32_t ThreadLocalPRNG::getImpl(LocalInstancePRNG* local) {
return local->rng();
}
......
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* -*- Mode: C++; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
#pragma once
#include <atomic>
#include <memory>
#include <folly/Optional.h>
#include <folly/ThreadLocal.h>
#include <folly/SpinLock.h>
namespace folly {
/**
* @file ReadMostlySharedPtr is a smart pointer that allows for high
* performance shared ownership of an object. In order to provide
* this, ReadMostlySharedPtr may potentially delay the destruction of
* a shared object for longer than a std::shared_ptr would, and
* depending on the implementation, may have slower updates.
*
* The load() method allows a reader to acquire a ReadPtr that
* maintains a reference to a single version of the object. Even if a
* writer calls store(), the ReadPtr will point to the version of the
* object that was in use at the time of the read. The old version of
* the object will only be destroyed after all outstanding ReadPtrs to
* that version have been destroyed.
*/
template<typename T,
typename Tag = void>
class ReadMostlySharedPtr {
public:
constexpr explicit ReadMostlySharedPtr(std::unique_ptr<T>&& ptr = nullptr)
: masterPtr_(std::move(ptr)) {}
/**
* Replaces the managed object.
*/
void store(std::unique_ptr<T>&& uptr) {
{
std::shared_ptr<T> ptr(std::move(uptr));
std::lock_guard<std::mutex> lock(mutex_);
// Swap to avoid calling ~T() under the lock
std::swap(masterPtr_, ptr);
}
{
// This also holds a lock that prevents destruction of thread cache
// entries, but not creation. If creating a thread cache entry for a new
// thread happens during iteration, the entry is not guaranteed to
// be seen. That's fine for us: if load() created a new cache entry after
// we got the accessor, it will see the updated pointer, so we don't need
// to clear the cache.
auto accessor = threadLocalCache_.accessAllThreads();
for (CachedPointer& local: accessor) {
std::lock_guard<folly::SpinLock> local_lock(local.lock);
// We could instead just assign masterPtr_ to local.ptr, but it's better
// if the thread allocates the Ptr for itself - the allocator is more
// likely to place its reference counter in a region optimal for access
// from that thread.
local.ptr.clear();
}
}
}
class ReadPtr {
friend class ReadMostlySharedPtr;
public:
ReadPtr() {}
void reset() {
ref_ = nullptr;
ptr_.reset();
}
explicit operator bool() const {
return (ref_ != nullptr);
}
bool operator ==(T* ptr) const {
return ref_ == ptr;
}
bool operator ==(std::nullptr_t) const {
return ref_ == nullptr;
}
T* operator->() const { return ref_; }
T& operator*() const { return *ref_; }
T* get() const { return ref_; }
private:
explicit ReadPtr(std::shared_ptr<T>& ptr)
: ptr_(ptr)
, ref_(ptr.get()) {}
std::shared_ptr<T> ptr_;
T* ref_{nullptr};
};
/**
* Returns a shared_ptr to the managed object.
*/
ReadPtr load() const {
auto& local = *threadLocalCache_;
std::lock_guard<folly::SpinLock> local_lock(local.lock);
if (!local.ptr.hasValue()) {
std::lock_guard<std::mutex> lock(mutex_);
if (!masterPtr_) {
local.ptr.emplace(nullptr);
} else {
// The following expression is tricky.
//
// It creates a shared_ptr<shared_ptr<T>> that points to a copy of
// masterPtr_. The reference counter of this shared_ptr<shared_ptr<T>>
// will normally only be modified from this thread, which avoids
// cache line bouncing. (Though the caller is free to pass the pointer
// to other threads and bump reference counter from there)
//
// Then this shared_ptr<shared_ptr<T>> is turned into shared_ptr<T>.
// This means that the returned shared_ptr<T> will internally point to
// control block of the shared_ptr<shared_ptr<T>>, but will dereference
// to T, not shared_ptr<T>.
local.ptr = makeCachedCopy(masterPtr_);
}
}
// The return statement makes the copy before destroying local variables,
// so local.ptr is only accessed under local.lock here.
return ReadPtr(local.ptr.value());
}
private:
// non copyable
ReadMostlySharedPtr(const ReadMostlySharedPtr&) = delete;
ReadMostlySharedPtr& operator=(const ReadMostlySharedPtr&) = delete;
struct CachedPointer {
folly::Optional<std::shared_ptr<T>> ptr;
folly::SpinLock lock;
};
std::shared_ptr<T> masterPtr_;
// Instead of using Tag as the tag for ThreadLocal, effectively use the
// pair (T, Tag), which is more granular.
struct ThreadLocalTag {};
mutable folly::ThreadLocal<CachedPointer, ThreadLocalTag> threadLocalCache_;
// Ensures safety between concurrent store() and load() calls
mutable std::mutex mutex_;
std::shared_ptr<T>
makeCachedCopy(const std::shared_ptr<T> &ptr) const {
// For std::shared_ptr wrap a copy in another std::shared_ptr to
// avoid cache line bouncing.
//
// The following expression is tricky.
//
// It creates a shared_ptr<shared_ptr<T>> that points to a copy of
// masterPtr_. The reference counter of this shared_ptr<shared_ptr<T>>
// will normally only be modified from this thread, which avoids
// cache line bouncing. (Though the caller is free to pass the pointer
// to other threads and bump reference counter from there)
//
// Then this shared_ptr<shared_ptr<T>> is turned into shared_ptr<T>.
// This means that the returned shared_ptr<T> will internally point to
// control block of the shared_ptr<shared_ptr<T>>, but will dereference
// to T, not shared_ptr<T>.
return std::shared_ptr<T>(
std::make_shared<std::shared_ptr<T>>(ptr), ptr.get());
}
};
}
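The "tricky expression" documented in makeCachedCopy() above is std::shared_ptr's aliasing constructor. A standalone sketch of the same trick (values are illustrative):

  #include <cassert>
  #include <memory>

  int main() {
    auto inner = std::make_shared<int>(7);
    // The outer pointer owns a copy of the inner one; its control block is
    // normally touched only by the thread that created it.
    auto outer = std::make_shared<std::shared_ptr<int>>(inner);
    // Aliasing constructor: shares ownership with 'outer', dereferences to the int.
    std::shared_ptr<int> aliased(outer, inner.get());
    assert(*aliased == 7);
  }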
@@ -59,7 +59,13 @@ template<class T, class Tag> class ThreadLocalPtr;
template<class T, class Tag=void>
class ThreadLocal {
public:
constexpr ThreadLocal() {}
constexpr ThreadLocal() : constructor_([]() {
return new T();
}) {}
explicit ThreadLocal(std::function<T*()> constructor) :
constructor_(constructor) {
}
T* get() const {
T* ptr = tlp_.get();
@@ -98,12 +104,13 @@ class ThreadLocal {
ThreadLocal& operator=(const ThreadLocal&) = delete;
T* makeTlp() const {
T* ptr = new T();
auto ptr = constructor_();
tlp_.reset(ptr);
return ptr;
}
mutable ThreadLocalPtr<T,Tag> tlp_;
std::function<T*()> constructor_;
};
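With this change a ThreadLocal can be given a custom per-thread factory; a brief sketch (Widget is a hypothetical type):

  struct Widget { int x; };
  folly::ThreadLocal<Widget> tls([] { return new Widget{42}; });
  // The lambda runs lazily, once per thread, on first access:
  int v = tls.get()->x;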
/*
......
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/ThreadLocal.h>
#include <folly/experimental/RCUUtils.h>
namespace folly {
class RCURefCount {
public:
using Int = int64_t;
RCURefCount() :
localCount_([&]() {
return new LocalRefCount(globalCount_);
}) {
}
~RCURefCount() noexcept {
assert(state_ == State::GLOBAL);
assert(globalCount_.load() == 0);
}
// This can't increment from 0.
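// While the count is in LOCAL or GLOBAL_TRANSITION mode its exact value is
// not tracked, so a positive sentinel (42) is returned below; callers may
// only rely on the zero vs. non-zero distinction.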
Int operator++() noexcept {
auto& localCount = *localCount_;
std::lock_guard<RCUReadLock> lg(RCUReadLock::instance());
if (LIKELY(state_ == State::LOCAL)) {
++localCount;
return 42;
} else if (state_ == State::GLOBAL_TRANSITION) {
++globalCount_;
return 42;
} else {
auto globalCount = globalCount_.load();
do {
if (!globalCount) {
return 0;
}
} while (!globalCount_.compare_exchange_weak(globalCount,
globalCount + 1));
return globalCount + 1;
}
}
Int operator--() noexcept {
auto& localCount = *localCount_;
std::lock_guard<RCUReadLock> lg(RCUReadLock::instance());
if (LIKELY(state_ == State::LOCAL)) {
--localCount;
return 42;
} else {
auto value = --globalCount_;
if (state_ == State::GLOBAL) {
assert(value >= 0);
return value;
} else {
return 42;
}
}
}
Int operator*() const {
std::lock_guard<RCUReadLock> lg(RCUReadLock::instance());
if (state_ == State::GLOBAL) {
return globalCount_;
}
return 42;
}
void useGlobal() noexcept {
state_ = State::GLOBAL_TRANSITION;
synchronize_rcu();
// At this point everyone is using the global count
auto accessor = localCount_.accessAllThreads();
for (auto& count : accessor) {
count.collect();
}
state_ = State::GLOBAL;
synchronize_rcu();
// After this ++ or -- can return 0.
}
private:
using AtomicInt = std::atomic<Int>;
enum class State {
LOCAL,
GLOBAL_TRANSITION,
GLOBAL
};
class LocalRefCount {
public:
explicit LocalRefCount(AtomicInt& globalCount) :
count_(0),
globalCount_(globalCount) {
RCURegisterThread();
}
~LocalRefCount() {
collect();
}
void collect() {
globalCount_ += count_;
count_ = 0;
}
void operator++() {
++count_;
}
void operator--() {
--count_;
}
private:
Int count_;
AtomicInt& globalCount_;
};
std::atomic<State> state_{State::LOCAL};
folly::ThreadLocal<LocalRefCount, RCURefCount> localCount_;
std::atomic<int64_t> globalCount_{1};
};
}
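A sketch of the intended lifecycle, based on the class above (the count starts at 1 for the initial owner; ReadMostlyMainPtr::reset() below follows the same sequence):

  folly::RCURefCount rc;  // starts at 1
  ++rc;                   // fast path: per-thread increment, no shared atomics
  --rc;
  rc.useGlobal();         // flush all per-thread deltas into the global counter
  if (--rc == 0) {
    // last reference released; the protected object may be destroyed
  }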
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/experimental/RCUUtils.h>
#include <folly/Portability.h>
#include <folly/ThreadLocal.h>
namespace folly {
namespace {
struct RCURegisterThreadHelper {
RCURegisterThreadHelper() {
rcu_register_thread();
}
~RCURegisterThreadHelper() {
rcu_unregister_thread();
}
bool alive{false};
};
}
bool RCURegisterThread() {
static folly::ThreadLocal<RCURegisterThreadHelper>* rcuRegisterThreadHelper =
new folly::ThreadLocal<RCURegisterThreadHelper>();
auto& helper = **rcuRegisterThreadHelper;
auto ret = !helper.alive;
helper.alive = true;
return ret;
}
}
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <urcu.h>
namespace folly {
/**
 * This must be called at least once from any thread that uses RCUReadLock.
 * The first call should happen before RCUReadLock is used for the first time,
 * and it can safely be called more than once.
 *
 * Returns true when called for the first time from the current thread.
 */
bool RCURegisterThread();
class RCUReadLock {
public:
static RCUReadLock& instance() {
// Both lock and unlock are static, so no need to worry about destruction
// order
static RCUReadLock instance;
return instance;
}
static void lock() {
assert(RCURegisterThread() == false);
rcu_read_lock();
}
static void unlock() {
rcu_read_unlock();
}
private:
RCUReadLock() {}
};
}
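A sketch of the intended read-side usage, mirroring how RCURefCount uses this class above:

  folly::RCURegisterThread();  // once per thread, before the first lock
  {
    std::lock_guard<folly::RCUReadLock> guard(folly::RCUReadLock::instance());
    // Read-side critical section: RCU-protected objects observed here stay
    // alive until a matching synchronize_rcu() completes after unlock.
  }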
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* -*- Mode: C++; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
#pragma once
#include <atomic>
#include <folly/experimental/RCURefCount.h>
#include <folly/experimental/TLRefCount.h>
namespace folly {
template <typename T, typename RefCount>
class ReadMostlyMainPtr;
template <typename T, typename RefCount>
class ReadMostlyWeakPtr;
template <typename T, typename RefCount>
class ReadMostlySharedPtr;
using DefaultRefCount = TLRefCount;
namespace detail {
template <typename T, typename RefCount = DefaultRefCount>
class ReadMostlySharedPtrCore {
public:
T* get() {
return ptrRaw_;
}
std::shared_ptr<T> getShared() {
return ptr_;
}
bool incref() {
return ++count_ > 0;
}
void decref() {
if (--count_ == 0) {
ptrRaw_ = nullptr;
ptr_.reset();
decrefWeak();
}
}
void increfWeak() {
auto value = ++weakCount_;
assert(value > 0);
}
void decrefWeak() {
if (--weakCount_ == 0) {
delete this;
}
}
size_t useCount() const {
return *count_;
}
~ReadMostlySharedPtrCore() noexcept {
assert(*count_ == 0);
assert(*weakCount_ == 0);
}
private:
friend class ReadMostlyMainPtr<T, RefCount>;
explicit ReadMostlySharedPtrCore(std::shared_ptr<T> ptr) :
ptrRaw_(ptr.get()),
ptr_(std::move(ptr)) {
}
T* ptrRaw_;
RefCount count_;
RefCount weakCount_;
std::shared_ptr<T> ptr_;
};
}
template <typename T, typename RefCount = DefaultRefCount>
class ReadMostlyMainPtr {
public:
ReadMostlyMainPtr() {
}
explicit ReadMostlyMainPtr(std::shared_ptr<T> ptr) {
reset(std::move(ptr));
}
ReadMostlyMainPtr(const ReadMostlyMainPtr&) = delete;
ReadMostlyMainPtr& operator=(const ReadMostlyMainPtr&) = delete;
ReadMostlyMainPtr(ReadMostlyMainPtr&& other) noexcept {
*this = std::move(other);
}
ReadMostlyMainPtr& operator=(ReadMostlyMainPtr&& other) noexcept {
std::swap(impl_, other.impl_);
return *this;
}
bool operator==(const ReadMostlyMainPtr<T, RefCount>& other) const {
return get() == other.get();
}
bool operator==(T* other) const {
return get() == other;
}
bool operator==(const ReadMostlySharedPtr<T, RefCount>& other) const {
return get() == other.get();
}
~ReadMostlyMainPtr() noexcept {
reset();
}
void reset() noexcept {
if (impl_) {
impl_->count_.useGlobal();
impl_->weakCount_.useGlobal();
impl_->decref();
impl_ = nullptr;
}
}
void reset(std::shared_ptr<T> ptr) {
reset();
if (ptr) {
impl_ = new detail::ReadMostlySharedPtrCore<T, RefCount>(std::move(ptr));
}
}
T* get() const {
if (impl_) {
return impl_->ptrRaw_;
} else {
return nullptr;
}
}
std::shared_ptr<T> getStdShared() {
if (impl_) {
return impl_->ptr_;
} else {
return {};
}
}
T& operator*() const {
return *get();
}
T* operator->() const {
return get();
}
ReadMostlySharedPtr<T, RefCount> getShared() const {
return ReadMostlySharedPtr<T, RefCount>(*this);
}
explicit operator bool() const {
return impl_ != nullptr;
}
private:
friend class ReadMostlyWeakPtr<T, RefCount>;
friend class ReadMostlySharedPtr<T, RefCount>;
detail::ReadMostlySharedPtrCore<T, RefCount>* impl_{nullptr};
};
template <typename T, typename RefCount = DefaultRefCount>
class ReadMostlyWeakPtr {
public:
ReadMostlyWeakPtr() {}
explicit ReadMostlyWeakPtr(const ReadMostlyMainPtr<T, RefCount>& mainPtr) {
reset(mainPtr.impl_);
}
ReadMostlyWeakPtr(const ReadMostlyWeakPtr& other) {
*this = other;
}
ReadMostlyWeakPtr& operator=(const ReadMostlyWeakPtr& other) {
reset(other.impl_);
return *this;
}
ReadMostlyWeakPtr(ReadMostlyWeakPtr&& other) noexcept {
*this = other;
}
ReadMostlyWeakPtr& operator=(ReadMostlyWeakPtr&& other) noexcept {
std::swap(impl_, other.impl_);
return *this;
}
~ReadMostlyWeakPtr() noexcept {
reset(nullptr);
}
ReadMostlySharedPtr<T, RefCount> lock() {
return ReadMostlySharedPtr<T, RefCount>(*this);
}
private:
friend class ReadMostlySharedPtr<T, RefCount>;
void reset(detail::ReadMostlySharedPtrCore<T, RefCount>* impl) {
if (impl_) {
impl_->decrefWeak();
}
impl_ = impl;
if (impl_) {
impl_->increfWeak();
}
}
detail::ReadMostlySharedPtrCore<T, RefCount>* impl_{nullptr};
};
template <typename T, typename RefCount = DefaultRefCount>
class ReadMostlySharedPtr {
public:
ReadMostlySharedPtr() {}
explicit ReadMostlySharedPtr(const ReadMostlyWeakPtr<T, RefCount>& weakPtr) {
reset(weakPtr.impl_);
}
// Generally, this shouldn't be used.
explicit ReadMostlySharedPtr(const ReadMostlyMainPtr<T, RefCount>& mainPtr) {
reset(mainPtr.impl_);
}
ReadMostlySharedPtr(const ReadMostlySharedPtr& other) {
*this = other;
}
ReadMostlySharedPtr& operator=(const ReadMostlySharedPtr& other) {
reset(other.impl_);
return *this;
}
ReadMostlySharedPtr& operator=(const ReadMostlyWeakPtr<T, RefCount>& other) {
reset(other.impl_);
return *this;
}
ReadMostlySharedPtr& operator=(const ReadMostlyMainPtr<T, RefCount>& other) {
reset(other.impl_);
return *this;
}
ReadMostlySharedPtr(ReadMostlySharedPtr&& other) noexcept {
*this = std::move(other);
}
~ReadMostlySharedPtr() noexcept {
reset(nullptr);
}
ReadMostlySharedPtr& operator=(ReadMostlySharedPtr&& other) noexcept {
std::swap(ptr_, other.ptr_);
std::swap(impl_, other.impl_);
return *this;
}
bool operator==(const ReadMostlyMainPtr<T, RefCount>& other) const {
return get() == other.get();
}
bool operator==(T* other) const {
return get() == other;
}
bool operator==(const ReadMostlySharedPtr<T, RefCount>& other) const {
return get() == other.get();
}
void reset() {
reset(nullptr);
}
T* get() const {
return ptr_;
}
std::shared_ptr<T> getStdShared() const {
if (impl_) {
return impl_->ptr_;
} else {
return {};
}
}
T& operator*() const {
return *get();
}
T* operator->() const {
return get();
}
size_t use_count() const {
return impl_->useCount();
}
bool unique() const {
return use_count() == 1;
}
explicit operator bool() const {
return impl_ != nullptr;
}
private:
void reset(detail::ReadMostlySharedPtrCore<T, RefCount>* impl) {
if (impl_) {
impl_->decref();
impl_ = nullptr;
ptr_ = nullptr;
}
if (impl && impl->incref()) {
impl_ = impl;
ptr_ = impl->get();
}
}
T* ptr_{nullptr};
detail::ReadMostlySharedPtrCore<T, RefCount>* impl_{nullptr};
};
}
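A brief sketch of the ownership semantics defined above (the assert is only for illustration):

  folly::ReadMostlyMainPtr<int> mainPtr(std::make_shared<int>(1));
  folly::ReadMostlyWeakPtr<int> weak(mainPtr);
  auto shared = weak.lock();  // non-empty while the object is alive
  mainPtr.reset();            // counters go global; main reference dropped
  // 'shared' still keeps the object alive here.
  shared.reset();             // last strong reference: the object is destroyed
  assert(!weak.lock());       // lock() now returns an empty SharedPtr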
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/ThreadLocal.h>
namespace folly {
class TLRefCount {
public:
using Int = int64_t;
TLRefCount() :
localCount_([&]() {
return new LocalRefCount(*this);
}) {
}
~TLRefCount() noexcept {
assert(globalCount_.load() == 0);
assert(state_.load() == State::GLOBAL);
}
// This can't increment from 0.
Int operator++() noexcept {
auto& localCount = *localCount_;
if (++localCount) {
return 42;
}
if (state_.load() == State::GLOBAL_TRANSITION) {
std::lock_guard<std::mutex> lg(globalMutex_);
}
assert(state_.load() == State::GLOBAL);
auto value = globalCount_.load();
do {
if (value == 0) {
return 0;
}
} while (!globalCount_.compare_exchange_weak(value, value+1));
return value + 1;
}
Int operator--() noexcept {
auto& localCount = *localCount_;
if (--localCount) {
return 42;
}
if (state_.load() == State::GLOBAL_TRANSITION) {
std::lock_guard<std::mutex> lg(globalMutex_);
}
assert(state_.load() == State::GLOBAL);
return --globalCount_;
}
Int operator*() const {
if (state_ != State::GLOBAL) {
return 42;
}
return globalCount_.load();
}
void useGlobal() noexcept {
std::lock_guard<std::mutex> lg(globalMutex_);
state_ = State::GLOBAL_TRANSITION;
auto accessor = localCount_.accessAllThreads();
for (auto& count : accessor) {
count.collect();
}
state_ = State::GLOBAL;
}
private:
using AtomicInt = std::atomic<Int>;
enum class State {
LOCAL,
GLOBAL_TRANSITION,
GLOBAL
};
class LocalRefCount {
public:
explicit LocalRefCount(TLRefCount& refCount) :
refCount_(refCount) {}
~LocalRefCount() {
collect();
}
void collect() {
std::lock_guard<std::mutex> lg(collectMutex_);
if (collectDone_) {
return;
}
collectCount_ = count_;
refCount_.globalCount_ += collectCount_;
collectDone_ = true;
}
bool operator++() {
return update(1);
}
bool operator--() {
return update(-1);
}
private:
bool update(Int delta) {
if (UNLIKELY(refCount_.state_.load() != State::LOCAL)) {
return false;
}
auto count = count_ += delta;
if (UNLIKELY(refCount_.state_.load() != State::LOCAL)) {
std::lock_guard<std::mutex> lg(collectMutex_);
if (!collectDone_) {
return true;
}
if (collectCount_ != count) {
return false;
}
}
return true;
}
Int count_{0};
TLRefCount& refCount_;
std::mutex collectMutex_;
Int collectCount_{0};
bool collectDone_{false};
};
std::atomic<State> state_{State::LOCAL};
folly::ThreadLocal<LocalRefCount, TLRefCount> localCount_;
std::atomic<int64_t> globalCount_{1};
std::mutex globalMutex_;
};
}
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* -*- Mode: C++; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
#include <thread>
#include <iostream>
#include <folly/Benchmark.h>
#include <folly/Memory.h>
#include <gflags/gflags.h>
#include <folly/experimental/ReadMostlySharedPtr.h>
template <template<typename> class MainPtr,
template<typename> class WeakPtr,
size_t threadCount>
void benchmark(size_t n) {
MainPtr<int> mainPtr(folly::make_unique<int>(42));
std::vector<std::thread> ts;
for (size_t t = 0; t < threadCount; ++t) {
ts.emplace_back([&]() {
WeakPtr<int> weakPtr(mainPtr);
for (size_t i = 0; i < n; ++i) {
weakPtr.lock();
}
});
}
for (auto& t: ts) {
t.join();
}
}
template <typename T>
using RCUMainPtr = folly::ReadMostlyMainPtr<T, folly::RCURefCount>;
template <typename T>
using RCUWeakPtr = folly::ReadMostlyWeakPtr<T, folly::RCURefCount>;
template <typename T>
using TLMainPtr = folly::ReadMostlyMainPtr<T, folly::TLRefCount>;
template <typename T>
using TLWeakPtr = folly::ReadMostlyWeakPtr<T, folly::TLRefCount>;
BENCHMARK(WeakPtrOneThread, n) {
benchmark<std::shared_ptr, std::weak_ptr, 1>(n);
}
BENCHMARK(WeakPtrFourThreads, n) {
benchmark<std::shared_ptr, std::weak_ptr, 4>(n);
}
BENCHMARK(RCUReadMostlyWeakPtrOneThread, n) {
benchmark<RCUMainPtr, RCUWeakPtr, 1>(n);
}
BENCHMARK(RCUReadMostlyWeakPtrFourThreads, n) {
benchmark<RCUMainPtr, RCUWeakPtr, 4>(n);
}
BENCHMARK(TLReadMostlyWeakPtrOneThread, n) {
benchmark<TLMainPtr, TLWeakPtr, 1>(n);
}
BENCHMARK(TLReadMostlyWeakPtrFourThreads, n) {
benchmark<TLMainPtr, TLWeakPtr, 4>(n);
}
int main(int argc, char** argv) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
gflags::SetCommandLineOptionWithMode(
"bm_min_usec", "100000", gflags::SET_FLAG_IF_DEFAULT
);
folly::runBenchmarks();
return 0;
}
@@ -22,8 +22,11 @@
#include <condition_variable>
#include <gtest/gtest.h>
#include <folly/ReadMostlySharedPtr.h>
#include <folly/Baton.h>
#include <folly/experimental/ReadMostlySharedPtr.h>
using folly::ReadMostlyMainPtr;
using folly::ReadMostlyWeakPtr;
using folly::ReadMostlySharedPtr;
// send SIGALRM to test process after this many seconds
@@ -56,86 +59,69 @@ struct TestObject {
class Coordinator {
public:
void requestAndWait() {
{
std::lock_guard<std::mutex> lock(mutex);
assert(!is_requested);
assert(!is_completed);
is_requested = true;
}
cv.notify_all();
{
std::unique_lock<std::mutex> lock(mutex);
cv.wait(lock, [&] { return is_completed; });
}
requestBaton_.post();
completeBaton_.wait();
}
void waitForRequest() {
std::unique_lock<std::mutex> lock(mutex);
assert(!is_completed);
cv.wait(lock, [&] { return is_requested; });
folly::RCURegisterThread();
requestBaton_.wait();
}
void completed() {
{
std::lock_guard<std::mutex> lock(mutex);
assert(is_requested);
is_completed = true;
}
cv.notify_all();
completeBaton_.post();
}
private:
bool is_requested = false;
bool is_completed = false;
std::condition_variable cv;
std::mutex mutex;
folly::Baton<> requestBaton_;
folly::Baton<> completeBaton_;
};
TEST_F(ReadMostlySharedPtrTest, BasicStores) {
ReadMostlySharedPtr<TestObject> ptr;
ReadMostlyMainPtr<TestObject> ptr;
// Store 1.
std::atomic<int> cnt1{0};
ptr.store(folly::make_unique<TestObject>(1, cnt1));
ptr.reset(folly::make_unique<TestObject>(1, cnt1));
EXPECT_EQ(1, cnt1.load());
// Store 2, check that 1 is destroyed.
std::atomic<int> cnt2{0};
ptr.store(folly::make_unique<TestObject>(2, cnt2));
ptr.reset(folly::make_unique<TestObject>(2, cnt2));
EXPECT_EQ(1, cnt2.load());
EXPECT_EQ(0, cnt1.load());
// Store nullptr, check that 2 is destroyed.
ptr.store(nullptr);
ptr.reset(nullptr);
EXPECT_EQ(0, cnt2.load());
}
TEST_F(ReadMostlySharedPtrTest, BasicLoads) {
std::atomic<int> cnt2{0};
ReadMostlySharedPtr<TestObject>::ReadPtr x;
ReadMostlySharedPtr<TestObject> x;
{
ReadMostlySharedPtr<TestObject> ptr;
ReadMostlyMainPtr<TestObject> ptr;
// Check that ptr is initially nullptr.
EXPECT_EQ(ptr.load(), nullptr);
EXPECT_EQ(ptr.get(), nullptr);
std::atomic<int> cnt1{0};
ptr.store(folly::make_unique<TestObject>(1, cnt1));
ptr.reset(folly::make_unique<TestObject>(1, cnt1));
EXPECT_EQ(1, cnt1.load());
x = ptr.load();
x = ptr;
EXPECT_EQ(1, x->value);
ptr.store(folly::make_unique<TestObject>(2, cnt2));
ptr.reset(folly::make_unique<TestObject>(2, cnt2));
EXPECT_EQ(1, cnt2.load());
EXPECT_EQ(1, cnt1.load());
x = ptr.load();
x = ptr;
EXPECT_EQ(2, x->value);
EXPECT_EQ(0, cnt1.load());
ptr.store(nullptr);
ptr.reset(nullptr);
EXPECT_EQ(1, cnt2.load());
}
@@ -149,55 +135,55 @@ TEST_F(ReadMostlySharedPtrTest, LoadsFromThreads) {
std::atomic<int> cnt{0};
{
ReadMostlySharedPtr<TestObject> ptr;
ReadMostlyMainPtr<TestObject> ptr;
Coordinator loads[7];
std::thread t1([&] {
loads[0].waitForRequest();
EXPECT_EQ(ptr.load(), nullptr);
EXPECT_EQ(ptr.getShared(), nullptr);
loads[0].completed();
loads[3].waitForRequest();
EXPECT_EQ(2, ptr.load()->value);
EXPECT_EQ(2, ptr.getShared()->value);
loads[3].completed();
loads[4].waitForRequest();
EXPECT_EQ(4, ptr.load()->value);
EXPECT_EQ(4, ptr.getShared()->value);
loads[4].completed();
loads[5].waitForRequest();
EXPECT_EQ(5, ptr.load()->value);
EXPECT_EQ(5, ptr.getShared()->value);
loads[5].completed();
});
std::thread t2([&] {
loads[1].waitForRequest();
EXPECT_EQ(1, ptr.load()->value);
EXPECT_EQ(1, ptr.getShared()->value);
loads[1].completed();
loads[2].waitForRequest();
EXPECT_EQ(2, ptr.load()->value);
EXPECT_EQ(2, ptr.getShared()->value);
loads[2].completed();
loads[6].waitForRequest();
EXPECT_EQ(5, ptr.load()->value);
EXPECT_EQ(5, ptr.getShared()->value);
loads[6].completed();
});
loads[0].requestAndWait();
ptr.store(folly::make_unique<TestObject>(1, cnt));
ptr.reset(folly::make_unique<TestObject>(1, cnt));
loads[1].requestAndWait();
ptr.store(folly::make_unique<TestObject>(2, cnt));
ptr.reset(folly::make_unique<TestObject>(2, cnt));
loads[2].requestAndWait();
loads[3].requestAndWait();
ptr.store(folly::make_unique<TestObject>(3, cnt));
ptr.store(folly::make_unique<TestObject>(4, cnt));
ptr.reset(folly::make_unique<TestObject>(3, cnt));
ptr.reset(folly::make_unique<TestObject>(4, cnt));
loads[4].requestAndWait();
ptr.store(folly::make_unique<TestObject>(5, cnt));
ptr.reset(folly::make_unique<TestObject>(5, cnt));
loads[5].requestAndWait();
loads[6].requestAndWait();
@@ -213,27 +199,27 @@ TEST_F(ReadMostlySharedPtrTest, LoadsFromThreads) {
TEST_F(ReadMostlySharedPtrTest, Ctor) {
std::atomic<int> cnt1{0};
{
ReadMostlySharedPtr<TestObject> ptr(
ReadMostlyMainPtr<TestObject> ptr(
folly::make_unique<TestObject>(1, cnt1));
EXPECT_EQ(1, ptr.load()->value);
EXPECT_EQ(1, ptr.getShared()->value);
}
EXPECT_EQ(0, cnt1.load());
}
TEST_F(ReadMostlySharedPtrTest, ClearingCache) {
ReadMostlySharedPtr<TestObject> ptr;
ReadMostlyMainPtr<TestObject> ptr;
// Store 1.
std::atomic<int> cnt1{0};
ptr.store(folly::make_unique<TestObject>(1, cnt1));
ptr.reset(folly::make_unique<TestObject>(1, cnt1));
Coordinator c;
std::thread t([&] {
// Cache the pointer for this thread.
ptr.load();
ptr.getShared();
c.requestAndWait();
});
@@ -243,140 +229,10 @@ TEST_F(ReadMostlySharedPtrTest, ClearingCache) {
// Store 2 and check that 1 is destroyed.
std::atomic<int> cnt2{0};
ptr.store(folly::make_unique<TestObject>(2, cnt2));
ptr.reset(folly::make_unique<TestObject>(2, cnt2));
EXPECT_EQ(0, cnt1.load());
// Unblock thread.
c.completed();
t.join();
}
TEST_F(ReadMostlySharedPtrTest, SlowDestructor) {
struct Thingy {
Coordinator* dtor;
Thingy(Coordinator* dtor = nullptr) : dtor(dtor) {}
~Thingy() {
if (dtor) {
dtor->requestAndWait();
}
}
};
Coordinator dtor;
ReadMostlySharedPtr<Thingy> ptr;
ptr.store(folly::make_unique<Thingy>(&dtor));
std::thread t([&] {
// This will block in ~Thingy().
ptr.store(folly::make_unique<Thingy>());
});
// Wait until store() in thread calls ~T().
dtor.waitForRequest();
// Do a store while another store() is stuck in ~T().
ptr.store(folly::make_unique<Thingy>());
// Let the other store() go.
dtor.completed();
t.join();
}
TEST_F(ReadMostlySharedPtrTest, StressTest) {
const int ptr_count = 2;
const int thread_count = 5;
const std::chrono::milliseconds duration(100);
const std::chrono::milliseconds upd_delay(1);
const std::chrono::milliseconds respawn_delay(1);
struct Instance {
std::atomic<int> value{0};
std::atomic<int> prev_value{0};
ReadMostlySharedPtr<TestObject> ptr;
};
struct Thread {
std::thread t;
std::atomic<bool> shutdown{false};
};
std::atomic<int> counter(0);
std::vector<Instance> instances(ptr_count);
std::vector<Thread> threads(thread_count);
std::atomic<int> seed(0);
// Threads that call load() and checking value.
auto thread_func = [&](int t) {
pthread_setname_np(pthread_self(),
("load" + folly::to<std::string>(t)).c_str());
std::mt19937 rnd(++seed);
while (!threads[t].shutdown.load()) {
Instance& instance = instances[rnd() % instances.size()];
int val1 = instance.prev_value.load();
auto p = instance.ptr.load();
int val = p ? p->value : 0;
int val2 = instance.value.load();
EXPECT_LE(val1, val);
EXPECT_LE(val, val2);
}
};
for (size_t t = 0; t < threads.size(); ++t) {
threads[t].t = std::thread(thread_func, t);
}
std::atomic<bool> shutdown(false);
// Thread that calls store() occasionally.
std::thread update_thread([&] {
pthread_setname_np(pthread_self(), "store");
std::mt19937 rnd(++seed);
while (!shutdown.load()) {
Instance& instance = instances[rnd() % instances.size()];
int val = ++instance.value;
instance.ptr.store(folly::make_unique<TestObject>(val, counter));
++instance.prev_value;
/* sleep override */
std::this_thread::sleep_for(upd_delay);
}
});
// Thread that joins and spawns load() threads occasionally.
std::thread respawn_thread([&] {
pthread_setname_np(pthread_self(), "respawn");
std::mt19937 rnd(++seed);
while (!shutdown.load()) {
int t = rnd() % threads.size();
threads[t].shutdown.store(true);
threads[t].t.join();
threads[t].shutdown.store(false);
threads[t].t = std::thread(thread_func, t);
/* sleep override */
std::this_thread::sleep_for(respawn_delay);
}
});
// Let all of this run for some time.
/* sleep override */
std::this_thread::sleep_for(duration);
// Shut all of this down.
shutdown.store(true);
update_thread.join();
respawn_thread.join();
for (auto& t: threads) {
t.shutdown.store(true);
t.t.join();
}
for (auto& instance: instances) {
instance.ptr.store(nullptr);
EXPECT_EQ(instance.value.load(), instance.prev_value.load());
}
EXPECT_EQ(0, counter.load());
}
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <thread>
#include <folly/Benchmark.h>
#include <folly/experimental/RCURefCount.h>
#include <folly/experimental/TLRefCount.h>
namespace folly {
template <typename Counter>
void shutdown(Counter&) {
}
void shutdown(RCURefCount& c) {
c.useGlobal();
--c;
}
void shutdown(TLRefCount& c) {
c.useGlobal();
--c;
}
template <typename Counter, size_t threadCount>
void benchmark(size_t n) {
Counter x;
std::vector<std::thread> ts;
for (size_t t = 0; t < threadCount; ++t) {
ts.emplace_back([&]() {
for (size_t i = 0; i < n; ++i) {
++x;
}
for (size_t i = 0; i < n; ++i) {
--x;
}
});
}
for (auto& t: ts) {
t.join();
}
shutdown(x);
}
BENCHMARK(atomicOneThread, n) {
benchmark<std::atomic<RCURefCount::Int>, 1>(n);
}
BENCHMARK(atomicFourThreads, n) {
benchmark<std::atomic<RCURefCount::Int>, 4>(n);
}
BENCHMARK(RCURefCountOneThread, n) {
benchmark<RCURefCount, 1>(n);
}
BENCHMARK(RCURefCountFourThreads, n) {
benchmark<RCURefCount, 4>(n);
}
BENCHMARK(TLRefCountOneThread, n) {
benchmark<TLRefCount, 1>(n);
}
BENCHMARK(TLRefCountFourThreads, n) {
benchmark<TLRefCount, 4>(n);
}
}
int main(int argc, char** argv) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
gflags::SetCommandLineOptionWithMode(
"bm_min_usec", "100000", gflags::SET_FLAG_IF_DEFAULT
);
folly::runBenchmarks();
return 0;
}
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <thread>
#include <folly/Baton.h>
#include <folly/experimental/RCURefCount.h>
#include <folly/experimental/TLRefCount.h>
#include <gtest/gtest.h>
namespace folly {
template <typename RefCount>
void basicTest() {
constexpr size_t numIters = 100000;
constexpr size_t numThreads = 10;
size_t got0 = 0;
RefCount count;
folly::Baton<> b;
std::vector<std::thread> ts;
for (size_t t = 0; t < numThreads; ++t) {
ts.emplace_back([&count, &b, &got0, numIters, t]() {
for (size_t i = 0; i < numIters; ++i) {
auto ret = ++count;
EXPECT_TRUE(ret > 1);
}
if (t == 0) {
b.post();
}
for (size_t i = 0; i < numIters; ++i) {
auto ret = --count;
if (ret == 0) {
++got0;
EXPECT_EQ(numIters - 1, i);
}
}
});
}
b.wait();
count.useGlobal();
EXPECT_TRUE(--count > 0);
for (auto& t: ts) {
t.join();
}
EXPECT_EQ(1, got0);
EXPECT_EQ(0, ++count);
EXPECT_EQ(0, ++count);
}
TEST(RCURefCount, Basic) {
basicTest<RCURefCount>();
}
TEST(TLRefCount, Basic) {
basicTest<TLRefCount>();
}
}