Commit 6e6f1db2 authored by Doron Roberts-Kedes, committed by Facebook Github Bot

Refactor folly::ConcurrentHashMapSegment to delegate internal work to BucketTable impl_ member

Summary:
This sets the stage for D14458269, which adds a second possible ImplT class.

Move code shared between ImplTs into a common area.

Work previously done in the *_internal functions, and in functions that access member fields other than `batch_`, now happens in the impl_ member.

ConcurrentHashMapSegment is now essentially a wrapper that translates operations such as emplace, try_emplace, and assign into the various flavors of insert_internal, which in turn delegate to impl_. The changes to the erase control flow are analogous; a sketch of the pattern follows.
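For illustration only, a minimal compilable sketch of that wrapper pattern (hypothetical, simplified types such as Batch, BucketTableSketch, and SegmentSketch; this is not the code in the diff below):

#include <utility>

// Stand-in for hazptr_obj_batch<Atom>: shared state the segment owns and
// passes down to its table implementation on each call.
struct Batch {};

// One possible table implementation; a second ImplT can be swapped in via
// the template template parameter of the segment.
template <typename Key, typename Value>
class BucketTableSketch {
 public:
  bool insert(const Key& /*k*/, Value&& /*v*/, Batch* /*batch*/) {
    return true; // real code does the bucket/chain work here
  }
  bool erase(const Key& /*k*/, Batch* /*batch*/) {
    return true; // real code does the unlink/retire work here
  }
};

template <
    typename Key,
    typename Value,
    template <typename, typename> class Impl = BucketTableSketch>
class SegmentSketch {
  using ImplT = Impl<Key, Value>;

 public:
  explicit SegmentSketch(Batch* batch) : batch_(batch) {}

  // Public operations translate into *_internal calls...
  template <typename V>
  bool insert(const Key& k, V&& v) {
    return insert_internal(k, std::forward<V>(v));
  }

  bool erase(const Key& k) {
    return impl_.erase(k, batch_);
  }

 private:
  // ...which forward to impl_, supplying the segment-owned batch_.
  template <typename V>
  bool insert_internal(const Key& k, V&& v) {
    return impl_.insert(k, std::forward<V>(v), batch_);
  }

  ImplT impl_;
  Batch* batch_;
};

int main() {
  Batch b;
  SegmentSketch<int, int> seg(&b);
  return (seg.insert(1, 2) && seg.erase(1)) ? 0 : 1;
}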

Reviewed By: davidtgoldblatt

Differential Revision: D14458241

fbshipit-source-id: 41d103141ead1eb306b6fe863932f00f827f68c3
parent 5caa4e1d
@@ -25,28 +25,13 @@ namespace detail {
namespace concurrenthashmap {
// hazptr deleter that can use an allocator.
template <typename Allocator>
class HazptrDeleter {
public:
template <typename Node>
void operator()(Node* node) {
node->~Node();
Allocator().deallocate((uint8_t*)node, sizeof(Node));
}
};
template <typename Allocator>
class HazptrBucketDeleter {
size_t count_;
public:
HazptrBucketDeleter(size_t count) : count_(count) {}
HazptrBucketDeleter() = default;
template <typename Bucket>
void operator()(Bucket* bucket) {
bucket->destroy(count_);
}
enum class InsertType {
DOES_NOT_EXIST, // insert/emplace operations. If key exists, return false.
MUST_EXIST, // assign operations. If key does not exist, return false.
ANY, // insert_or_assign.
MATCH, // assign_if_equal (not in std). For concurrent maps, a
// way to atomically change a value if equal to some other
// value.
};
template <
@@ -118,6 +103,31 @@ class ValueHolder<
mutable bool owned_{true};
};
// hazptr deleter that can use an allocator.
template <typename Allocator>
class HazptrDeleter {
public:
template <typename Node>
void operator()(Node* node) {
node->~Node();
Allocator().deallocate((uint8_t*)node, sizeof(Node));
}
};
class HazptrTableDeleter {
size_t count_;
public:
HazptrTableDeleter(size_t count) : count_(count) {}
HazptrTableDeleter() = default;
template <typename Table>
void operator()(Table* table) {
table->destroy(count_);
}
};
namespace bucket {
template <
typename KeyType,
typename ValueType,
@@ -176,31 +186,6 @@ class NodeT : public hazptr_obj_base_linked<
ValueHolder<KeyType, ValueType, Allocator> item_;
};
} // namespace concurrenthashmap
/* A Segment is a single shard of the ConcurrentHashMap.
* All writes take the lock, while readers are all wait-free.
* Readers always proceed in parallel with the single writer.
*
*
* Possible additional optimizations:
*
* * insert / erase could be lock / wait free. Would need to be
* careful that assign and rehash don't conflict (possibly with
* reader/writer lock, or microlock per node or per bucket, etc).
* Java 8 goes halfway, and does lock per bucket, except for the
* first item, that is inserted with a CAS (which is somewhat
* specific to java having a lock per object)
*
* * I tried using trylock() and find() to warm the cache for insert()
* and erase() similar to Java 7, but didn't have much luck.
*
* * We could order elements using split ordering, for faster rehash,
* and no need to ever copy nodes. Note that a full split ordering
* including dummy nodes increases the memory usage by 2x, but we
* could split the difference and still require a lock to set bucket
* pointers.
*/
template <
typename KeyType,
typename ValueType,
@@ -210,31 +195,21 @@ template <
typename Allocator = std::allocator<uint8_t>,
template <typename> class Atom = std::atomic,
class Mutex = std::mutex>
class alignas(64) ConcurrentHashMapSegment {
enum class InsertType {
DOES_NOT_EXIST, // insert/emplace operations. If key exists, return false.
MUST_EXIST, // assign operations. If key does not exist, return false.
ANY, // insert_or_assign.
MATCH, // assign_if_equal (not in std). For concurrent maps, a
// way to atomically change a value if equal to some other
// value.
};
class alignas(64) BucketTable {
public:
typedef KeyType key_type;
typedef ValueType mapped_type;
typedef std::pair<const KeyType, ValueType> value_type;
typedef std::size_t size_type;
using Node = concurrenthashmap::NodeT<KeyType, ValueType, Allocator, Atom>;
using Node =
concurrenthashmap::bucket::NodeT<KeyType, ValueType, Allocator, Atom>;
using InsertType = concurrenthashmap::InsertType;
class Iterator;
ConcurrentHashMapSegment(
BucketTable(
size_t initial_buckets,
float load_factor,
size_t max_size,
hazptr_obj_batch<Atom>* batch)
: load_factor_(load_factor), max_size_(max_size), batch_(batch) {
: load_factor_(load_factor), max_size_(max_size) {
DCHECK(batch);
initial_buckets = folly::nextPowTwo(initial_buckets);
DCHECK(
@@ -247,7 +222,7 @@ class alignas(64) ConcurrentHashMapSegment {
bucket_count_.store(initial_buckets, std::memory_order_relaxed);
}
~ConcurrentHashMapSegment() {
~BucketTable() {
auto buckets = buckets_.load(std::memory_order_relaxed);
// We can delete and not retire() here, since users must have
// their own synchronization around destruction.
@@ -264,118 +239,14 @@ class alignas(64) ConcurrentHashMapSegment {
return size() == 0;
}
bool insert(Iterator& it, std::pair<key_type, mapped_type>&& foo) {
return insert(it, std::move(foo.first), std::move(foo.second));
}
template <typename Key, typename Value>
bool insert(Iterator& it, Key&& k, Value&& v) {
auto node = (Node*)Allocator().allocate(sizeof(Node));
new (node) Node(batch_, std::forward<Key>(k), std::forward<Value>(v));
auto res = insert_internal(
it,
node->getItem().first,
InsertType::DOES_NOT_EXIST,
[](const ValueType&) { return false; },
node,
node);
if (!res) {
node->~Node();
Allocator().deallocate((uint8_t*)node, sizeof(Node));
}
return res;
}
template <typename Key, typename... Args>
bool try_emplace(Iterator& it, Key&& k, Args&&... args) {
// Note: first key is only ever compared. Second is moved in to
// create the node, and the first key is never touched again.
return insert_internal(
it,
std::forward<Key>(k),
InsertType::DOES_NOT_EXIST,
[](const ValueType&) { return false; },
nullptr,
std::forward<Key>(k),
std::forward<Args>(args)...);
}
template <typename... Args>
bool emplace(Iterator& it, const KeyType& k, Node* node) {
return insert_internal(
it,
k,
InsertType::DOES_NOT_EXIST,
[](const ValueType&) { return false; },
node,
node);
}
template <typename Key, typename Value>
bool insert_or_assign(Iterator& it, Key&& k, Value&& v) {
auto node = (Node*)Allocator().allocate(sizeof(Node));
new (node) Node(batch_, std::forward<Key>(k), std::forward<Value>(v));
auto res = insert_internal(
it,
node->getItem().first,
InsertType::ANY,
[](const ValueType&) { return false; },
node,
node);
if (!res) {
node->~Node();
Allocator().deallocate((uint8_t*)node, sizeof(Node));
}
return res;
}
template <typename Key, typename Value>
bool assign(Iterator& it, Key&& k, Value&& v) {
auto node = (Node*)Allocator().allocate(sizeof(Node));
new (node) Node(batch_, std::forward<Key>(k), std::forward<Value>(v));
auto res = insert_internal(
it,
node->getItem().first,
InsertType::MUST_EXIST,
[](const ValueType&) { return false; },
node,
node);
if (!res) {
node->~Node();
Allocator().deallocate((uint8_t*)node, sizeof(Node));
}
return res;
}
template <typename Key, typename Value>
bool assign_if_equal(
Iterator& it,
Key&& k,
const ValueType& expected,
Value&& desired) {
auto node = (Node*)Allocator().allocate(sizeof(Node));
new (node) Node(batch_, std::forward<Key>(k), std::forward<Value>(desired));
auto res = insert_internal(
it,
node->getItem().first,
InsertType::MATCH,
[&expected](const ValueType& v) { return v == expected; },
node,
node);
if (!res) {
node->~Node();
Allocator().deallocate((uint8_t*)node, sizeof(Node));
}
return res;
}
template <typename MatchFunc, typename... Args>
bool insert_internal(
bool insert(
Iterator& it,
const KeyType& k,
InsertType type,
MatchFunc match,
Node* cur,
hazptr_obj_batch<Atom>* batch,
Args&&... args) {
auto h = HashFn()(k);
std::unique_lock<Mutex> g(m_);
@@ -388,7 +259,7 @@ class alignas(64) ConcurrentHashMapSegment {
// Would exceed max size.
throw std::bad_alloc();
}
rehash(bcount << 1);
rehash(bcount << 1, batch);
buckets = buckets_.load(std::memory_order_relaxed);
bcount = bucket_count_.load(std::memory_order_relaxed);
}
@@ -416,7 +287,7 @@ class alignas(64) ConcurrentHashMapSegment {
} else {
if (!cur) {
cur = (Node*)Allocator().allocate(sizeof(Node));
new (cur) Node(batch_, std::forward<Args>(args)...);
new (cur) Node(batch, std::forward<Args>(args)...);
}
auto next = node->next_.load(std::memory_order_relaxed);
cur->next_.store(next, std::memory_order_relaxed);
@@ -445,7 +316,7 @@ class alignas(64) ConcurrentHashMapSegment {
// Would exceed max size.
throw std::bad_alloc();
}
rehash(bcount << 1);
rehash(bcount << 1, batch);
// Reload correct bucket.
buckets = buckets_.load(std::memory_order_relaxed);
@@ -463,7 +334,7 @@ class alignas(64) ConcurrentHashMapSegment {
// OR DOES_NOT_EXIST, but only in the try_emplace case
DCHECK(type == InsertType::ANY || type == InsertType::DOES_NOT_EXIST);
cur = (Node*)Allocator().allocate(sizeof(Node));
new (cur) Node(batch_, std::forward<Args>(args)...);
new (cur) Node(batch, std::forward<Args>(args)...);
}
cur->next_.store(headnode, std::memory_order_relaxed);
head->store(cur, std::memory_order_release);
@@ -472,9 +343,9 @@ class alignas(64) ConcurrentHashMapSegment {
}
// Must hold lock.
void rehash(size_t bucket_count) {
void rehash(size_t bucket_count, hazptr_obj_batch<Atom>* batch) {
auto buckets = buckets_.load(std::memory_order_relaxed);
auto newbuckets = Buckets::create(bucket_count, batch_);
auto newbuckets = Buckets::create(bucket_count, batch);
load_factor_nodes_ = bucket_count * load_factor_;
@@ -512,7 +383,7 @@ class alignas(64) ConcurrentHashMapSegment {
for (; node != lastrun;
node = node->next_.load(std::memory_order_relaxed)) {
auto newnode = (Node*)Allocator().allocate(sizeof(Node));
new (newnode) Node(batch_, node);
new (newnode) Node(batch, node);
auto k = getIdx(bucket_count, HashFn()(node->getItem().first));
auto prevhead = &newbuckets->buckets_[k]();
newnode->next_.store(prevhead->load(std::memory_order_relaxed));
@@ -525,8 +396,7 @@ class alignas(64) ConcurrentHashMapSegment {
bucket_count_.store(bucket_count, std::memory_order_release);
buckets_.store(newbuckets, std::memory_order_release);
seqlock_.fetch_add(1, std::memory_order_release);
oldbuckets->retire(
concurrenthashmap::HazptrBucketDeleter<Allocator>(oldcount));
oldbuckets->retire(concurrenthashmap::HazptrTableDeleter(oldcount));
}
bool find(Iterator& res, const KeyType& k) {
@@ -551,20 +421,8 @@ class alignas(64) ConcurrentHashMapSegment {
return false;
}
// Listed separately because we need a prev pointer.
size_type erase(const key_type& key) {
return erase_internal(key, nullptr, [](const ValueType&) { return true; });
}
size_type erase_if_equal(const key_type& key, const ValueType& expected) {
return erase_internal(key, nullptr, [&expected](const ValueType& v) {
return v == expected;
});
}
template <typename MatchFunc>
size_type
erase_internal(const key_type& key, Iterator* iter, MatchFunc match) {
std::size_t erase(const KeyType& key, Iterator* iter, MatchFunc match) {
Node* node{nullptr};
auto h = HashFn()(key);
{
@@ -617,29 +475,17 @@ class alignas(64) ConcurrentHashMapSegment {
return 0;
}
// Unfortunately because we are reusing nodes on rehash, we can't
// have prev pointers in the bucket chain. We have to start the
// search from the bucket.
//
// This is a small departure from standard stl containers: erase may
// throw if hash or key_eq functions throw.
void erase(Iterator& res, Iterator& pos) {
erase_internal(pos->first, &res, [](const ValueType&) { return true; });
// Invalidate the iterator.
pos = cend();
}
void clear() {
void clear(hazptr_obj_batch<Atom>* batch) {
size_t bcount = bucket_count_.load(std::memory_order_relaxed);
Buckets* buckets;
auto newbuckets = Buckets::create(bcount, batch_);
auto newbuckets = Buckets::create(bcount, batch);
{
std::lock_guard<Mutex> g(m_);
buckets = buckets_.load(std::memory_order_relaxed);
buckets_.store(newbuckets, std::memory_order_release);
size_ = 0;
}
buckets->retire(concurrenthashmap::HazptrBucketDeleter<Allocator>(bcount));
buckets->retire(concurrenthashmap::HazptrTableDeleter(bcount));
}
void max_load_factor(float factor) {
@@ -663,12 +509,13 @@ class alignas(64) ConcurrentHashMapSegment {
return Iterator(nullptr);
}
private:
// Could be optimized to avoid an extra pointer dereference by
// allocating buckets_ at the same time.
class Buckets : public hazptr_obj_base<
Buckets,
Atom,
concurrenthashmap::HazptrBucketDeleter<Allocator>> {
concurrenthashmap::HazptrTableDeleter> {
using BucketRoot = hazptr_root<Node, Atom>;
Buckets() {}
@@ -838,6 +685,267 @@ class alignas(64) ConcurrentHashMapSegment {
alignas(64) Atom<Buckets*> buckets_{nullptr};
std::atomic<uint64_t> seqlock_{0};
Atom<size_t> bucket_count_;
};
} // namespace bucket
} // namespace concurrenthashmap
/* A Segment is a single shard of the ConcurrentHashMap.
* All writes take the lock, while readers are all wait-free.
* Readers always proceed in parallel with the single writer.
*
*
* Possible additional optimizations:
*
* * insert / erase could be lock / wait free. Would need to be
* careful that assign and rehash don't conflict (possibly with
* reader/writer lock, or microlock per node or per bucket, etc).
* Java 8 goes halfway, and does lock per bucket, except for the
* first item, that is inserted with a CAS (which is somewhat
* specific to java having a lock per object)
*
* * I tried using trylock() and find() to warm the cache for insert()
* and erase() similar to Java 7, but didn't have much luck.
*
* * We could order elements using split ordering, for faster rehash,
* and no need to ever copy nodes. Note that a full split ordering
* including dummy nodes increases the memory usage by 2x, but we
* could split the difference and still require a lock to set bucket
* pointers.
*/
template <
typename KeyType,
typename ValueType,
uint8_t ShardBits = 0,
typename HashFn = std::hash<KeyType>,
typename KeyEqual = std::equal_to<KeyType>,
typename Allocator = std::allocator<uint8_t>,
template <typename> class Atom = std::atomic,
class Mutex = std::mutex,
// TODO is there a better way to do this
template <
typename,
typename,
uint8_t,
typename,
typename,
typename,
template <typename> class,
class> class Impl = concurrenthashmap::bucket::BucketTable>
class alignas(64) ConcurrentHashMapSegment {
using ImplT = Impl<
KeyType,
ValueType,
ShardBits,
HashFn,
KeyEqual,
Allocator,
Atom,
Mutex>;
public:
typedef KeyType key_type;
typedef ValueType mapped_type;
typedef std::pair<const KeyType, ValueType> value_type;
typedef std::size_t size_type;
using InsertType = concurrenthashmap::InsertType;
using Iterator = typename ImplT::Iterator;
using Node = typename ImplT::Node;
ConcurrentHashMapSegment(
size_t initial_buckets,
float load_factor,
size_t max_size,
hazptr_obj_batch<Atom>* batch)
: impl_(initial_buckets, load_factor, max_size, batch), batch_(batch) {
DCHECK(batch);
}
~ConcurrentHashMapSegment() = default;
size_t size() {
return impl_.size();
}
bool empty() {
return impl_.empty();
}
bool insert(Iterator& it, std::pair<key_type, mapped_type>&& foo) {
return insert(it, std::move(foo.first), std::move(foo.second));
}
template <typename Key, typename Value>
bool insert(Iterator& it, Key&& k, Value&& v) {
auto node = (Node*)Allocator().allocate(sizeof(Node));
new (node) Node(batch_, std::forward<Key>(k), std::forward<Value>(v));
auto res = insert_internal(
it,
node->getItem().first,
InsertType::DOES_NOT_EXIST,
[](const ValueType&) { return false; },
node,
node);
if (!res) {
node->~Node();
Allocator().deallocate((uint8_t*)node, sizeof(Node));
}
return res;
}
template <typename Key, typename... Args>
bool try_emplace(Iterator& it, Key&& k, Args&&... args) {
// Note: first key is only ever compared. Second is moved in to
// create the node, and the first key is never touched again.
return insert_internal(
it,
std::forward<Key>(k),
InsertType::DOES_NOT_EXIST,
[](const ValueType&) { return false; },
nullptr,
std::forward<Key>(k),
std::forward<Args>(args)...);
}
template <typename... Args>
bool emplace(Iterator& it, const KeyType& k, Node* node) {
return insert_internal(
it,
k,
InsertType::DOES_NOT_EXIST,
[](const ValueType&) { return false; },
node,
node);
}
template <typename Key, typename Value>
bool insert_or_assign(Iterator& it, Key&& k, Value&& v) {
auto node = (Node*)Allocator().allocate(sizeof(Node));
new (node) Node(batch_, std::forward<Key>(k), std::forward<Value>(v));
auto res = insert_internal(
it,
node->getItem().first,
InsertType::ANY,
[](const ValueType&) { return false; },
node,
node);
if (!res) {
node->~Node();
Allocator().deallocate((uint8_t*)node, sizeof(Node));
}
return res;
}
template <typename Key, typename Value>
bool assign(Iterator& it, Key&& k, Value&& v) {
auto node = (Node*)Allocator().allocate(sizeof(Node));
new (node) Node(batch_, std::forward<Key>(k), std::forward<Value>(v));
auto res = insert_internal(
it,
node->getItem().first,
InsertType::MUST_EXIST,
[](const ValueType&) { return false; },
node,
node);
if (!res) {
node->~Node();
Allocator().deallocate((uint8_t*)node, sizeof(Node));
}
return res;
}
template <typename Key, typename Value>
bool assign_if_equal(
Iterator& it,
Key&& k,
const ValueType& expected,
Value&& desired) {
auto node = (Node*)Allocator().allocate(sizeof(Node));
new (node) Node(batch_, std::forward<Key>(k), std::forward<Value>(desired));
auto res = insert_internal(
it,
node->getItem().first,
InsertType::MATCH,
[&expected](const ValueType& v) { return v == expected; },
node,
node);
if (!res) {
node->~Node();
Allocator().deallocate((uint8_t*)node, sizeof(Node));
}
return res;
}
template <typename MatchFunc, typename... Args>
bool insert_internal(
Iterator& it,
const KeyType& k,
InsertType type,
MatchFunc match,
Node* cur,
Args&&... args) {
return impl_.insert(
it, k, type, match, cur, batch_, std::forward<Args>(args)...);
}
// Must hold lock.
void rehash(size_t bucket_count) {
impl_.rehash(bucket_count, batch_);
}
bool find(Iterator& res, const KeyType& k) {
return impl_.find(res, k);
}
// Listed separately because we need a prev pointer.
size_type erase(const key_type& key) {
return erase_internal(key, nullptr, [](const ValueType&) { return true; });
}
size_type erase_if_equal(const key_type& key, const ValueType& expected) {
return erase_internal(key, nullptr, [&expected](const ValueType& v) {
return v == expected;
});
}
template <typename MatchFunc>
size_type
erase_internal(const key_type& key, Iterator* iter, MatchFunc match) {
return impl_.erase(key, iter, match);
}
// Unfortunately because we are reusing nodes on rehash, we can't
// have prev pointers in the bucket chain. We have to start the
// search from the bucket.
//
// This is a small departure from standard stl containers: erase may
// throw if hash or key_eq functions throw.
void erase(Iterator& res, Iterator& pos) {
erase_internal(pos->first, &res, [](const ValueType&) { return true; });
// Invalidate the iterator.
pos = cend();
}
void clear() {
impl_.clear(batch_);
}
void max_load_factor(float factor) {
impl_.max_load_factor(factor);
}
Iterator cbegin() {
return impl_.cbegin();
}
Iterator cend() {
return impl_.cend();
}
private:
ImplT impl_;
hazptr_obj_batch<Atom>* batch_;
};
} // namespace detail