Commit 3d63fc96 authored by Dave Watson, committed by Facebook GitHub Bot

AtomicCoreCachedSharedPtr

Summary: A folly::atomic_shared_ptr version of CoreCachedSharedPtr.

Reviewed By: yfeldblum

Differential Revision: D5389603

fbshipit-source-id: 942700cd66f5f5219418f4c6112146dc40351aa0
parent e6fa347e
@@ -20,7 +20,9 @@
#include <memory>

#include <folly/Enumerate.h>
#include <folly/concurrency/AtomicSharedPtr.h>
#include <folly/concurrency/CacheLocality.h>
#include <folly/experimental/hazptr/hazptr.h>

namespace folly {
@@ -82,4 +84,66 @@ class CoreCachedWeakPtr {
  std::array<std::weak_ptr<T>, kNumSlots> slots_;
};

/**
* This class creates core-local caches for a given shared_ptr, to
* mitigate contention when acquiring/releasing it.
*
* All methods are threadsafe. Hazard pointers are used to avoid
* use-after-free for concurrent reset() and get() operations.
*
* Concurrent reset()s are sequenced with respect to each other: the
* sharded shared_ptrs will always all be set to the same value.
* get()s will never see a newer pointer on one core, and an older
* pointer on another after a subsequent thread migration.
*/
template <class T, size_t kNumSlots = 64>
class AtomicCoreCachedSharedPtr {
 public:
  explicit AtomicCoreCachedSharedPtr(const std::shared_ptr<T>& p = nullptr) {
    reset(p);
  }

  ~AtomicCoreCachedSharedPtr() {
    auto slots = slots_.load(std::memory_order_acquire);
    // Destruction of the AtomicCoreCachedSharedPtr must be synchronized
    // externally with all other accesses, so there is no need for
    // slots->retire().
    if (slots) {
      delete slots;
    }
  }
  void reset(const std::shared_ptr<T>& p = nullptr) {
    auto newslots = folly::make_unique<Slots>();
    // Allocate each Holder in a different CoreAllocator stripe to
    // prevent false sharing. Their control blocks will be adjacent
    // thanks to allocate_shared().
    for (auto slot : folly::enumerate(newslots->slots_)) {
      auto alloc = getCoreAllocatorStl<Holder, kNumSlots>(slot.index);
      auto holder = std::allocate_shared<Holder>(alloc, p);
      // Aliasing constructor: the slot shares ownership with its
      // core-local Holder but points at the single underlying T.
      *slot = std::shared_ptr<T>(holder, p.get());
    }
    auto oldslots = slots_.exchange(newslots.release());
    if (oldslots) {
      // Defer reclamation until no hazard pointer still protects the
      // old Slots array.
      oldslots->retire();
    }
  }
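  // Returns the shared_ptr cached for the calling thread's current core.
  // The hazptr_holder protects the Slots array from being reclaimed by a
  // concurrent reset().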
  std::shared_ptr<T> get() const {
    folly::hazptr::hazptr_holder hazptr;
    auto slots = hazptr.get_protected(slots_);
    if (!slots) {
      return nullptr;
    }
    return (slots->slots_)[AccessSpreader<>::current(kNumSlots)];
  }

 private:
  using Holder = std::shared_ptr<T>;
  // Deriving from hazptr_obj_base provides the retire() used in reset().
  struct Slots : folly::hazptr::hazptr_obj_base<Slots> {
    std::array<std::shared_ptr<T>, kNumSlots> slots_;
  };
  std::atomic<Slots*> slots_{nullptr};
};
} // namespace
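Not shown in the diff: a minimal usage sketch for the new class. It is hypothetical, assuming the header being modified is folly/concurrency/CoreCachedSharedPtr.h and that the experimental hazptr library is available; the thread and iteration counts are arbitrary.

#include <folly/concurrency/CoreCachedSharedPtr.h>

#include <memory>
#include <thread>
#include <vector>

int main() {
  folly::AtomicCoreCachedSharedPtr<int> cached(std::make_shared<int>(1));

  // Readers call get() concurrently; each core's cached copy keeps
  // refcount traffic local to that core.
  std::vector<std::thread> readers;
  for (int i = 0; i < 4; ++i) {
    readers.emplace_back([&] {
      for (int j = 0; j < 1000; ++j) {
        std::shared_ptr<int> p = cached.get();
        (void)*p;
      }
    });
  }

  // A concurrent reset() is safe: hazard pointers keep the old Slots
  // array alive until no reader can still observe it.
  cached.reset(std::make_shared<int>(2));

  for (auto& t : readers) {
    t.join();
  }
  return 0;
}

The trade-off is that reset() rebuilds all kNumSlots cached copies, which is what the new *SingleThreadReset benchmarks below measure against plain shared_ptr and atomic_shared_ptr.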
@@ -80,6 +80,13 @@ void benchmarkWeakPtrLock(size_t numThreads, size_t iters) {
  parallelRun([&] { return wp.lock(); }, numThreads, iters);
}

void benchmarkAtomicSharedPtrCopy(size_t numThreads, size_t iters) {
  auto s = std::make_shared<int>(1);
  folly::atomic_shared_ptr<int> p;
  p.store(s);
  parallelRun([&] { return p.load(); }, numThreads, iters);
}

void benchmarkCoreCachedSharedPtrGet(size_t numThreads, size_t iters) {
  folly::CoreCachedSharedPtr<int> p(std::make_shared<int>(1));
  parallelRun([&] { return p.get(); }, numThreads, iters);
@@ -91,6 +98,11 @@ void benchmarkCoreCachedWeakPtrLock(size_t numThreads, size_t iters) {
  parallelRun([&] { return wp.get().lock(); }, numThreads, iters);
}

void benchmarkAtomicCoreCachedSharedPtrGet(size_t numThreads, size_t iters) {
  folly::AtomicCoreCachedSharedPtr<int> p(std::make_shared<int>(1));
  parallelRun([&] { return p.get(); }, numThreads, iters);
}
} // namespace

BENCHMARK(SharedPtrSingleThread, n) {
@@ -99,12 +111,18 @@ BENCHMARK(SharedPtrSingleThread, n) {
BENCHMARK(WeakPtrSingleThread, n) {
  benchmarkWeakPtrLock(1, n);
}

BENCHMARK(AtomicSharedPtrSingleThread, n) {
  benchmarkAtomicSharedPtrCopy(1, n);
}

BENCHMARK(CoreCachedSharedPtrSingleThread, n) {
  benchmarkCoreCachedSharedPtrGet(1, n);
}

BENCHMARK(CoreCachedWeakPtrSingleThread, n) {
  benchmarkCoreCachedWeakPtrLock(1, n);
}

BENCHMARK(AtomicCoreCachedSharedPtrSingleThread, n) {
  benchmarkAtomicCoreCachedSharedPtrGet(1, n);
}
BENCHMARK_DRAW_LINE();
@@ -114,12 +132,18 @@ BENCHMARK(SharedPtr4Threads, n) {
BENCHMARK(WeakPtr4Threads, n) {
  benchmarkWeakPtrLock(4, n);
}

BENCHMARK(AtomicSharedPtr4Threads, n) {
  benchmarkAtomicSharedPtrCopy(4, n);
}

BENCHMARK(CoreCachedSharedPtr4Threads, n) {
  benchmarkCoreCachedSharedPtrGet(4, n);
}

BENCHMARK(CoreCachedWeakPtr4Threads, n) {
  benchmarkCoreCachedWeakPtrLock(4, n);
}

BENCHMARK(AtomicCoreCachedSharedPtr4Threads, n) {
  benchmarkAtomicCoreCachedSharedPtrGet(4, n);
}
BENCHMARK_DRAW_LINE();
@@ -129,12 +153,39 @@ BENCHMARK(SharedPtr16Threads, n) {
BENCHMARK(WeakPtr16Threads, n) {
  benchmarkWeakPtrLock(16, n);
}

BENCHMARK(AtomicSharedPtr16Threads, n) {
  benchmarkAtomicSharedPtrCopy(16, n);
}

BENCHMARK(CoreCachedSharedPtr16Threads, n) {
  benchmarkCoreCachedSharedPtrGet(16, n);
}

BENCHMARK(CoreCachedWeakPtr16Threads, n) {
  benchmarkCoreCachedWeakPtrLock(16, n);
}

BENCHMARK(AtomicCoreCachedSharedPtr16Threads, n) {
  benchmarkAtomicCoreCachedSharedPtrGet(16, n);
}
BENCHMARK_DRAW_LINE();
BENCHMARK(SharedPtrSingleThreadReset, n) {
  auto p = std::make_shared<int>(1);
  parallelRun([&] { p = std::make_shared<int>(1); }, 1, n);
}

BENCHMARK(AtomicSharedPtrSingleThreadReset, n) {
  auto s = std::make_shared<int>(1);
  folly::atomic_shared_ptr<int> p;
  p.store(s);
  parallelRun([&] { p.store(std::make_shared<int>(1)); }, 1, n);
}

BENCHMARK(CoreCachedSharedPtrSingleThreadReset, n) {
  folly::CoreCachedSharedPtr<int> p(std::make_shared<int>(1));
  parallelRun([&] { p.reset(std::make_shared<int>(1)); }, 1, n);
}

BENCHMARK(AtomicCoreCachedSharedPtrSingleThreadReset, n) {
  folly::AtomicCoreCachedSharedPtr<int> p(std::make_shared<int>(1));
  parallelRun([&] { p.reset(std::make_shared<int>(1)); }, 1, n);
}
int main(int argc, char** argv) {
  testing::InitGoogleTest(&argc, argv);
...