Commit ea244c69 authored by Maged Michael, committed by Facebook Github Bot

ConcurrentHashMap: Use synchronization/hazptr and hazptr_obj_base_linked

Summary:
- Use the hazard pointer library under folly/synchronization.
  - NodeT is now derived from hazptr_obj_base_linked (see the sketch below).
  - Bucket heads are hazptr_root-s whose destructors automatically unlink nodes.
  - Supports unlinking NodeT-s from both NodeT-s and Buckets.
  - As intended, hazptr_cleanup reclaims all unprotected removed objects, even those that had not yet been determined to be retirable when cleanup started.
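
A minimal sketch of the linked-node structure this enables (the names
LinkedNode and value_ are hypothetical; the hazptr calls mirror the
NodeT changes in the diff below):

    #include <folly/synchronization/Hazptr.h>
    #include <atomic>
    #include <utility>

    template <typename T, template <typename> class Atom = std::atomic>
    class LinkedNode
        : public folly::hazptr_obj_base_linked<LinkedNode<T, Atom>, Atom> {
     public:
      explicit LinkedNode(T v) : value_(std::move(v)) {
        // Count the incoming link before the node is shared with other
        // threads (the "safe" variant needs no extra synchronization).
        this->acquire_link_safe();
      }
      // Logical removal: drop the link; the library reclaims the node
      // (and nodes reachable only through it) once no hazard pointer
      // protects them.
      void release() {
        this->unlink();
      }
      // Hook called by the library to enumerate outgoing links.
      template <typename S>
      void push_links(bool mutable_links, S& s) {
        if (mutable_links) {
          if (auto* p = next_.load(std::memory_order_acquire)) {
            s.push(p);
          }
        }
      }
      Atom<LinkedNode*> next_{nullptr};

     private:
      T value_;
    };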

Fixes the following:
- The pattern of calling h.get_protected(node->next_) while hazptr_holder h is protecting node is unsafe, because try_protect (inside get_protected) may fail, leaving neither node nor next protected. This diff uses a different hazptr_holder to protect the next node. A correct pattern is to call h2.get_protected(node->next_), then h.swap(h2) and h2.reset(), as sketched below.
- The pattern of calling h2.reset(node) and then h1.reset() to continue protecting a node already protected by h1 is unsafe, because h2's protection may come too late, after node has already been retired. A correct pattern is h2.swap(h1).
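
For illustration, a sketch of the corrected hand-over-hand protection
(the helper contains_matching and its parameters are hypothetical; the
hazptr calls are the ones find() uses in the diff below):

    #include <folly/synchronization/Hazptr.h>
    #include <atomic>

    template <typename Node, typename Pred>
    bool contains_matching(std::atomic<Node*>& head, Pred match) {
      folly::hazptr_holder<> hazcurr;
      folly::hazptr_holder<> haznext;
      // Protect the head node first.
      Node* node = hazcurr.get_protected(head);
      while (node) {
        if (match(node)) {
          return true; // node was protected throughout the check
        }
        // Fix 1: protect the successor with a *different* holder, so a
        // failed try_protect cannot leave both nodes unprotected.
        node = haznext.get_protected(node->next_);
        // Fix 2: hand protection over with swap, never by resetting
        // the protecting holder before the other one is in place.
        hazcurr.swap(haznext);
      }
      return false;
    }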

Reviewed By: djwatson

Differential Revision: D7708325

fbshipit-source-id: 617dcfe19410071888abafa32f0e1ada9d123983
parent 8fb5c152
@@ -17,7 +17,7 @@
 #include <folly/Optional.h>
 #include <folly/concurrency/detail/ConcurrentHashMap-detail.h>
-#include <folly/experimental/hazptr/hazptr.h>
+#include <folly/synchronization/Hazptr.h>
 #include <atomic>
 #include <mutex>
...
@@ -15,7 +15,7 @@
  */
 #pragma once
-#include <folly/experimental/hazptr/hazptr.h>
+#include <folly/synchronization/Hazptr.h>
 #include <atomic>
 #include <mutex>
@@ -25,7 +25,7 @@ namespace detail {
 namespace concurrenthashmap {
-// hazptr retire() that can use an allocator.
+// hazptr deleter that can use an allocator.
 template <typename Allocator>
 class HazptrDeleter {
  public:
@@ -123,38 +123,48 @@ template <
     typename ValueType,
     typename Allocator,
     template <typename> class Atom = std::atomic>
-class NodeT : public folly::hazptr::hazptr_obj_base_refcounted<
+class NodeT : public hazptr_obj_base_linked<
                   NodeT<KeyType, ValueType, Allocator, Atom>,
+                  Atom,
                   concurrenthashmap::HazptrDeleter<Allocator>> {
  public:
   typedef std::pair<const KeyType, ValueType> value_type;
 
-  explicit NodeT(NodeT* other) : item_(other->item_) {}
+  explicit NodeT(NodeT* other) : item_(other->item_) {
+    this->set_deleter( // defined in hazptr_obj
+        concurrenthashmap::HazptrDeleter<Allocator>());
+    this->acquire_link_safe(); // defined in hazptr_obj_base_linked
+  }
 
   template <typename Arg, typename... Args>
   NodeT(Arg&& k, Args&&... args)
       : item_(
             std::piecewise_construct,
             std::forward<Arg>(k),
-            std::forward<Args>(args)...) {}
+            std::forward<Args>(args)...) {
+    this->set_deleter( // defined in hazptr_obj
+        concurrenthashmap::HazptrDeleter<Allocator>());
+    this->acquire_link_safe(); // defined in hazptr_obj_base_linked
+  }
 
   void release() {
-    this->retire(
-        folly::hazptr::default_hazptr_domain(),
-        concurrenthashmap::HazptrDeleter<Allocator>());
-  }
-  ~NodeT() {
-    auto next = next_.load(std::memory_order_acquire);
-    if (next) {
-      if (next->release_ref()) {
-        delete next;
-      }
-    }
+    this->unlink();
   }
 
   value_type& getItem() {
     return item_.getItem();
   }
 
+  template <typename S>
+  void push_links(bool m, S& s) {
+    if (m) {
+      auto p = next_.load(std::memory_order_acquire);
+      if (p) {
+        s.push(p);
+      }
+    }
+  }
+
   Atom<NodeT*> next_{nullptr};
 
  private:
@@ -185,10 +195,6 @@ class NodeT : public folly::hazptr::hazptr_obj_base_refcounted<
  * including dummy nodes increases the memory usage by 2x, but we
  * could split the difference and still require a lock to set bucket
  * pointers.
- *
- * * hazptr acquire/release could be optimized more, in
- * single-threaded case, hazptr overhead is ~30% for a hot find()
- * loop.
  */
 template <
     typename KeyType,
@@ -379,7 +385,7 @@ class alignas(64) ConcurrentHashMapSegment {
     }
     auto idx = getIdx(bcount, h);
-    auto head = &buckets->buckets_[idx];
+    auto head = &buckets->buckets_[idx]();
     auto node = head->load(std::memory_order_relaxed);
     auto headnode = node;
     auto prev = head;
@@ -406,7 +412,7 @@ class alignas(64) ConcurrentHashMapSegment {
     auto next = node->next_.load(std::memory_order_relaxed);
     cur->next_.store(next, std::memory_order_relaxed);
     if (next) {
-      next->acquire_ref();
+      next->acquire_link(); // defined in hazptr_obj_base_linked
     }
     prev->store(cur, std::memory_order_release);
     g.unlock();
@@ -437,7 +443,7 @@ class alignas(64) ConcurrentHashMapSegment {
       bcount <<= 1;
       hazbuckets.reset(buckets);
       idx = getIdx(bcount, h);
-      head = &buckets->buckets_[idx];
+      head = &buckets->buckets_[idx]();
       headnode = head->load(std::memory_order_relaxed);
     }
@@ -465,7 +471,7 @@ class alignas(64) ConcurrentHashMapSegment {
     auto oldcount = bucket_count_.load(std::memory_order_relaxed);
     for (size_t i = 0; i < oldcount; i++) {
-      auto bucket = &buckets->buckets_[i];
+      auto bucket = &buckets->buckets_[i]();
       auto node = bucket->load(std::memory_order_relaxed);
       if (!node) {
         continue;
@@ -491,15 +497,15 @@ class alignas(64) ConcurrentHashMapSegment {
       count++;
     }
     // Set longest last run in new bucket, incrementing the refcount.
-    lastrun->acquire_ref();
-    newbuckets->buckets_[lastidx].store(lastrun, std::memory_order_relaxed);
+    lastrun->acquire_link(); // defined in hazptr_obj_base_linked
+    newbuckets->buckets_[lastidx]().store(lastrun, std::memory_order_relaxed);
     // Clone remaining nodes
     for (; node != lastrun;
          node = node->next_.load(std::memory_order_relaxed)) {
       auto newnode = (Node*)Allocator().allocate(sizeof(Node));
       new (newnode) Node(node);
       auto k = getIdx(bucket_count, HashFn()(node->getItem().first));
-      auto prevhead = &newbuckets->buckets_[k];
+      auto prevhead = &newbuckets->buckets_[k]();
       newnode->next_.store(prevhead->load(std::memory_order_relaxed));
       prevhead->store(newnode, std::memory_order_relaxed);
     }
@@ -511,31 +517,27 @@ class alignas(64) ConcurrentHashMapSegment {
     buckets_.store(newbuckets, std::memory_order_release);
     seqlock_.fetch_add(1, std::memory_order_release);
     oldbuckets->retire(
-        folly::hazptr::default_hazptr_domain(),
         concurrenthashmap::HazptrBucketDeleter<Allocator>(oldcount));
   }
 
   bool find(Iterator& res, const KeyType& k) {
-    auto hazcurr = &res.hazptrs_[1];
-    folly::hazptr::hazptr_local<1> hlocal;
-    auto haznext = &hlocal[0];
+    auto& hazcurr = res.hazptrs_[1];
+    auto& haznext = res.hazptrs_[2];
     auto h = HashFn()(k);
     size_t bcount;
     Buckets* buckets;
     getBucketsAndCount(bcount, buckets, res.hazptrs_[0]);
 
     auto idx = getIdx(bcount, h);
-    auto prev = &buckets->buckets_[idx];
-    auto node = hazcurr->get_protected(*prev);
+    auto prev = &buckets->buckets_[idx]();
+    auto node = hazcurr.get_protected(*prev);
     while (node) {
       if (KeyEqual()(k, node->getItem().first)) {
-        // We may be using hlocal, make sure we are using hazptrs_
-        res.hazptrs_[1].reset(node);
         res.setNode(node, buckets, bcount, idx);
         return true;
       }
-      node = haznext[0].get_protected(node->next_);
-      std::swap(hazcurr, haznext);
+      node = haznext.get_protected(node->next_);
+      hazcurr.swap(haznext);
     }
     return false;
   }
@@ -554,14 +556,14 @@ class alignas(64) ConcurrentHashMapSegment {
     size_t bcount = bucket_count_.load(std::memory_order_relaxed);
     auto buckets = buckets_.load(std::memory_order_relaxed);
     auto idx = getIdx(bcount, h);
-    auto head = &buckets->buckets_[idx];
+    auto head = &buckets->buckets_[idx]();
     node = head->load(std::memory_order_relaxed);
     Node* prev = nullptr;
     while (node) {
       if (KeyEqual()(key, node->getItem().first)) {
         auto next = node->next_.load(std::memory_order_relaxed);
         if (next) {
-          next->acquire_ref();
+          next->acquire_link(); // defined in hazptr_obj_base_linked
         }
         if (prev) {
           prev->next_.store(next, std::memory_order_release);
@@ -618,7 +620,6 @@ class alignas(64) ConcurrentHashMapSegment {
       size_ = 0;
     }
     buckets->retire(
-        folly::hazptr::default_hazptr_domain(),
         concurrenthashmap::HazptrBucketDeleter<Allocator>(bcount));
   }
@@ -645,44 +646,36 @@ class alignas(64) ConcurrentHashMapSegment {
   // Could be optimized to avoid an extra pointer dereference by
   // allocating buckets_ at the same time.
-  class Buckets : public folly::hazptr::hazptr_obj_base<
+  class Buckets : public hazptr_obj_base<
                       Buckets,
+                      Atom,
                       concurrenthashmap::HazptrBucketDeleter<Allocator>> {
+    using BucketRoot = hazptr_root<Node, Atom>;
+
     Buckets() {}
     ~Buckets() {}
 
    public:
     static Buckets* create(size_t count) {
       auto buf =
-          Allocator().allocate(sizeof(Buckets) + sizeof(Atom<Node*>) * count);
+          Allocator().allocate(sizeof(Buckets) + sizeof(BucketRoot) * count);
       auto buckets = new (buf) Buckets();
       for (size_t i = 0; i < count; i++) {
-        auto bucket = new (&buckets->buckets_[i]) Atom<Node*>();
-        bucket->store(nullptr, std::memory_order_relaxed);
+        new (&buckets->buckets_[i]) BucketRoot;
       }
       return buckets;
     }
 
     void destroy(size_t count) {
       for (size_t i = 0; i < count; i++) {
-        auto elem = buckets_[i].load(std::memory_order_relaxed);
-        if (elem) {
-          // Buckets may not own the chain: this could be a Buckets
-          // struct freed after resize, and we only have a refcount
-          // on the chain.
-          if (elem->release_ref()) {
-            concurrenthashmap::HazptrDeleter<Allocator>()(elem);
-          }
-        }
-        typedef Atom<Node*> Element;
-        buckets_[i].~Element();
+        buckets_[i].~BucketRoot();
       }
       this->~Buckets();
       Allocator().deallocate(
-          (uint8_t*)this, sizeof(Atom<Node*>) * count + sizeof(*this));
+          (uint8_t*)this, sizeof(BucketRoot) * count + sizeof(*this));
     }
 
-    Atom<Node*> buckets_[0];
+    BucketRoot buckets_[0];
   };
 
  public:
@@ -712,7 +705,8 @@ class alignas(64) ConcurrentHashMapSegment {
     const Iterator& operator++() {
       DCHECK(node_);
-      node_ = hazptrs_[1].get_protected(node_->next_);
+      node_ = hazptrs_[2].get_protected(node_->next_);
+      hazptrs_[1].swap(hazptrs_[2]);
       if (!node_) {
         ++idx_;
         next();
@@ -726,7 +720,7 @@ class alignas(64) ConcurrentHashMapSegment {
         break;
       }
       DCHECK(buckets_);
-      node_ = hazptrs_[1].get_protected(buckets_->buckets_[idx_]);
+      node_ = hazptrs_[1].get_protected(buckets_->buckets_[idx_]());
       if (node_) {
         break;
       }
@@ -776,7 +770,7 @@ class alignas(64) ConcurrentHashMapSegment {
     }
 
     // These are accessed directly from the functions above
-    folly::hazptr::hazptr_array<2> hazptrs_;
+    hazptr_array<3, Atom> hazptrs_;
 
    private:
     Node* node_{nullptr};
@@ -794,7 +788,7 @@ class alignas(64) ConcurrentHashMapSegment {
   void getBucketsAndCount(
       size_t& bcount,
       Buckets*& buckets,
-      folly::hazptr::hazptr_holder& hazptr) {
+      hazptr_holder<Atom>& hazptr) {
     while (true) {
       auto seqlock = seqlock_.load(std::memory_order_acquire);
       bcount = bucket_count_.load(std::memory_order_acquire);
...
@@ -25,7 +25,6 @@
 #include <folly/test/DeterministicSchedule.h>
 
 using namespace folly::test;
-using namespace folly::hazptr;
 using namespace folly;
 using namespace std;
@@ -610,6 +609,8 @@ TEST(ConcurrentHashMap, Deletion) {
     map.insert(0, std::make_shared<Wrapper>(del));
   }
 
+  folly::hazptr_cleanup();
+
   EXPECT_TRUE(del);
 }
@@ -623,7 +624,7 @@ TEST(ConcurrentHashMap, DeletionWithErase) {
     map.erase(0);
   }
 
-  hazptr_cleanup();
+  folly::hazptr_cleanup();
 
   EXPECT_TRUE(del);
 }
@@ -639,7 +640,7 @@ TEST(ConcurrentHashMap, DeletionWithIterator) {
     map.erase(it);
   }
 
-  hazptr_cleanup();
+  folly::hazptr_cleanup();
 
   EXPECT_TRUE(del);
 }
@@ -656,6 +657,8 @@ TEST(ConcurrentHashMap, DeletionWithForLoop) {
     }
   }
 
+  folly::hazptr_cleanup();
+
   EXPECT_TRUE(del);
 }
@@ -669,6 +672,8 @@ TEST(ConcurrentHashMap, DeletionMultiple) {
     map.insert(1, std::make_shared<Wrapper>(del2));
   }
 
+  folly::hazptr_cleanup();
+
   EXPECT_TRUE(del1);
   EXPECT_TRUE(del2);
 }
@@ -683,7 +688,7 @@ TEST(ConcurrentHashMap, DeletionAssigned) {
     map.insert_or_assign(0, std::make_shared<Wrapper>(del2));
   }
 
-  hazptr_cleanup();
+  folly::hazptr_cleanup();
 
   EXPECT_TRUE(del1);
   EXPECT_TRUE(del2);
@@ -700,7 +705,7 @@ TEST(ConcurrentHashMap, DeletionMultipleMaps) {
     map2.insert(0, std::make_shared<Wrapper>(del2));
   }
 
-  hazptr_cleanup();
+  folly::hazptr_cleanup();
 
   EXPECT_TRUE(del1);
   EXPECT_TRUE(del2);
...