Commit 148e1b8f authored by Tingzhe Zhou, committed by Facebook Github Bot

Concurrent Priority Queue: Fast and Scalable

Summary:
This is the implementation of the unbounded concurrent priority queue based on Mound.
 - The data structure supports push, pop, size, and empty operations. [TODO: timed pop]
 - Users can select either the strict priority queue (PopBatch = 0) or the relaxed implementation (PopBatch > 0).
 - Supports blocking.

Reviewed By: magedm

Differential Revision: D9028417

fbshipit-source-id: e12dc43aef4fb5b1195d9e27807cbd9f0ad1a401
parent 8daf15ad
/*
* Copyright 2018-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <algorithm>
#include <atomic>
#include <climits>
#include <cmath>
#include <iomanip>
#include <iostream>
#include <limits>
#include <mutex>
#include <glog/logging.h>
#include <folly/Likely.h>
#include <folly/Portability.h>
#include <folly/Random.h>
#include <folly/SpinLock.h>
#include <folly/ThreadLocal.h>
#include <folly/detail/Futex.h>
#include <folly/lang/Align.h>
#include <folly/synchronization/Hazptr.h>
#include <folly/synchronization/WaitOptions.h>
#include <folly/synchronization/detail/Spin.h>
/// ------ Concurrent Priority Queue Implementation ------
// The concurrent priority queue implementation is based on the
// Mound data structure (Mounds: Array-Based Concurrent Priority Queues
// by Yujie Liu and Michael Spear, ICPP 2012)
//
/// --- Overview ---
// This relaxed implementation extends the Mound algorithm and provides
// the following features:
// - Arbitrary priorities.
// - Unbounded size.
// - Push, pop, empty, size functions. [TODO: non-waiting and timed-wait pop]
// - Support for blocking.
// - Fast and scalable.
//
/// --- Mound ---
// A Mound is a heap where each element holds a sorted linked list.
// The first nodes of the lists maintain the heap property. Push
// randomly selects a leaf at the bottom level, then uses binary
// search on the path to the root to find a place to insert the new
// node at the head of a list. Pop takes the node at the head of the
// root's list, then swaps lists down the tree until the heap
// property holds again. To use Mound in our implementation, we need
// to solve the following problems:
// - 1. Lack of a general relaxed implementation. Mound is appealing
// for a relaxed priority queue because popping the whole list from
// the root is straightforward: one thread pops the list, and
// subsequent threads pop from that list until it is empty. Those
// pops trigger only one swap-down operation, which reduces both the
// latency of pop and the contention on the Mound. The difficulty is
// providing a scalable and fast mechanism that lets threads
// concurrently take elements from the list.
// - 2. Lack of control over list length. The length of each list is
// critical for performance. Mound suffers not only in the extreme
// cases (pushing with increasing priorities turns the Mound into a
// sorted linked list; pushing with decreasing priorities turns it
// into a regular heap), but also in the common case (with randomly
// generated priorities, the Mound degrades to a regular heap after
// millions of push/pop operations). The difficulty is stabilizing
// the list length without losing accuracy or performance.
// - 3. No support for blocking. Blocking is an important feature
// that the Mound paper does not mention. Designing a new algorithm
// for efficient blocking is challenging.
// - 4. Memory management. Mound allows optimistic reads, so we need
// to protect nodes from being reclaimed while readers may still
// hold references to them.
//
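// For illustration, a small hypothetical Mound (values are made
// up): each tree element holds a sorted list, and the list heads
// satisfy the max-heap property (parent head >= child heads):
//
//                  [17 -> 12 -> 9]
//                 /               \
//       [14 -> 8 -> 2]         [16 -> 5]
//        /          \           /      \
//   [7 -> 1]       [11]    [10 -> 4]   []
//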
/// --- Design ---
// Our implementation extends the Mound algorithm to support
// efficient relaxed pops. We employ a shared-buffer algorithm to
// share a popped list among consumers; popping from the shared
// buffer is as fast as a fetch_and_add. We improve performance and
// keep the heap structure compact by stabilizing the size of each
// list. The implementation exposes a template parameter to set the
// preferred list length. Under the hood, we provide algorithms for
// fast inserting, pruning, and merging. The blocking algorithm is
// tricky: a producer wakes at most one consumer at a time, and the
// producer itself never blocks. For optimistic reads, we use hazard
// pointers to protect nodes from being reclaimed. We optimize the
// check-lock-check pattern by using a test-and-test-and-set spin
// lock.
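//
// As a minimal sketch (simplified from tryPopFromSharedBuffer
// below), the consume step on the shared buffer is a single
// fetch_sub that claims one slot:
//
//   int loc = top_loc_.fetch_sub(1, std::memory_order_acq_rel);
//   if (loc >= 0) { // this thread now exclusively owns slot loc
//     Node* n = shared_buffer_[loc].pnode.load(std::memory_order_relaxed);
//     shared_buffer_[loc].pnode.store(nullptr, std::memory_order_release);
//     val = n->val;
//     n->retire(); // safe reclamation via hazard pointers
//   }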
/// --- Template Parameters: ---
// 1. PopBatch can be 0 or a positive integer.
// If it is 0, we pop only one node at a time. This is the
// strict implementation: it guarantees that the returned
// priority is always the highest. If it is > 0, we keep up to
// that number of nodes in a shared buffer to be consumed by
// subsequent pop operations.
//
// 2. ListTargetSize represents the minimal length of each list. It
// addresses the degradation to a regular heap when pushing to
// the Mound in decreasing priority order. Moreover, it keeps
// the Mound structure stable after trillions of operations,
// which cause an imbalance problem in the original Mound
// algorithm. We set the pruning length and the merging length
// based on this parameter.
//
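// For example (an illustrative sketch; the element type and the
// variable names are hypothetical), the two modes can be
// instantiated as:
//
//   // Strict: PopBatch = 0 always pops the global maximum.
//   RelaxedConcurrentPriorityQueue<int, false, false, 0> strict_pq;
//   // Relaxed: defaults, PopBatch = 16 and ListTargetSize = 25.
//   RelaxedConcurrentPriorityQueue<int> relaxed_pq;
//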
/// --- Interface ---
// void push(const T& val)
// void pop(T& val)
// size_t size()
// bool empty()
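//
/// --- Example ---
// A brief usage sketch (values are hypothetical). This is a
// max-priority queue, so pop returns the highest priority first:
//
//   RelaxedConcurrentPriorityQueue<int, /* MayBlock = */ true> pq;
//   pq.push(3);
//   pq.push(7);
//   int val;
//   pq.pop(val); // val == 7
//   pq.pop(val); // val == 3; the next pop would block until a push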
template <
typename T,
bool MayBlock = false,
bool SupportsSize = false,
size_t PopBatch = 16,
size_t ListTargetSize = 25,
typename Mutex = folly::SpinLock,
template <typename> class Atom = std::atomic>
class RelaxedConcurrentPriorityQueue {
// Max height of the tree
static constexpr uint32_t MAX_LEVELS = 32;
// The default minimum value
static constexpr T MIN_VALUE = std::numeric_limits<T>::min();
// Align size for the shared buffer node
static constexpr size_t Align = 1u << 7;
static_assert(PopBatch <= 256, "PopBatch must be <= 256");
static_assert(
ListTargetSize >= 1 && ListTargetSize <= 256,
"TargetSize must be in the range [1, 256]");
// The maximum length of a list before pruning is triggered
static constexpr size_t PruningSize = ListTargetSize * 2;
// When popping from the Mound, tree elements near the leaf
// level are likely to hold very short lists. When swapping
// down after popping a list, we check the sizes of the
// children to decide whether to merge them into their parent.
static constexpr size_t MergingSize = ListTargetSize;
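// For example, with the default ListTargetSize = 25, a list is
// pruned once it grows beyond PruningSize = 50 nodes, and an
// element is merged with its two children when the three lists
// together hold at most MergingSize = 25 nodes.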
/// List Node structure
struct Node : public folly::hazptr_obj_base<Node, Atom> {
Node* next;
T val;
};
/// Mound Element (Tree node), head points to a linked list
struct MoundElement {
// Reading (head, size) without acquiring the lock
Atom<Node*> head;
Atom<size_t> size;
alignas(Align) Mutex lock;
MoundElement() { // initializer
head.store(nullptr, std::memory_order_relaxed);
size.store(0, std::memory_order_relaxed);
}
};
/// The Position structure simplifies the implementation
struct Position {
uint32_t level;
uint32_t index;
};
/// Nodes for the shared buffer should be aligned
struct BufferNode {
alignas(Align) Atom<Node*> pnode;
};
/// Data members
// Mound structure -> 2D array to represent a tree
MoundElement* levels_[MAX_LEVELS];
// Record the current leaf level (root is 0)
Atom<uint32_t> bottom_;
// It is used when expanding the tree
Atom<uint32_t> guard_;
// Mound with shared buffer
// Following two members are accessed by consumers
std::unique_ptr<BufferNode[]> shared_buffer_;
alignas(Align) Atom<int> top_loc_;
// Numbers of active waiting threads
// (Written by consumer; read by producer)
alignas(Align) Atom<int> active_waiting_;
// Monotonically increasing value (overflow to 0 is allowed)
// (Written by producer; read by consumer)
folly::detail::Futex<Atom> seq_number_;
// Two counters to calculate size of the queue
alignas(Align) Atom<size_t> counter_p_;
alignas(Align) Atom<size_t> counter_c_;
public:
/// Constructor
RelaxedConcurrentPriorityQueue()
: active_waiting_(0), seq_number_(0), counter_p_(0), counter_c_(0) {
if (PopBatch > 0) {
top_loc_ = -1;
shared_buffer_.reset(new BufferNode[PopBatch]);
for (size_t i = 0; i < PopBatch; i++) {
shared_buffer_[i].pnode = nullptr;
}
}
bottom_.store(0, std::memory_order_relaxed);
guard_.store(0, std::memory_order_relaxed);
// allocate the root MoundElement and initialize Mound
levels_[0] = new MoundElement[1]; // default MM for MoundElement
for (uint32_t i = 1; i < MAX_LEVELS; i++) {
levels_[i] = nullptr;
}
}
~RelaxedConcurrentPriorityQueue() {
if (PopBatch > 0) {
deleteSharedBuffer();
}
Position pos;
pos.level = pos.index = 0;
deleteAllNodes(pos);
// default MM for MoundElement
for (int i = getBottomLevel(); i >= 0; i--) {
delete[] levels_[i];
}
}
void push(const T& val) {
moundPush(val);
if (SupportsSize) {
counter_p_.fetch_add(1, std::memory_order_relaxed);
}
}
void pop(T& val) {
moundPop(val);
if (SupportsSize) {
counter_c_.fetch_add(1, std::memory_order_relaxed);
}
}
/// Note: size() and empty() are guaranteed to be accurate only if
/// the queue is not changed concurrently.
/// Returns an estimate of the size of the queue
size_t size() {
DCHECK(SupportsSize);
size_t p = counter_p_.load(std::memory_order_acquire);
size_t c = counter_c_.load(std::memory_order_acquire);
return (p > c) ? p - c : 0;
}
/// Returns true only if the queue was empty during the call.
bool empty() {
return isEmpty();
}
private:
uint32_t getBottomLevel() {
return bottom_.load(std::memory_order_acquire);
}
/// This function is only called by the destructor
void deleteSharedBuffer() {
DCHECK(PopBatch > 0);
// delete nodes in the buffer
int loc = top_loc_.load(std::memory_order_relaxed);
while (loc >= 0) {
Node* n = shared_buffer_[loc--].pnode.load(std::memory_order_relaxed);
delete n;
}
// delete buffer
shared_buffer_.reset();
}
/// This function is only called by the destructor
void deleteAllNodes(const Position& pos) {
if (getElementSize(pos) == 0) {
// The current list is empty; no need to check
// its children.
return;
}
Node* curList = getList(pos);
setTreeNode(pos, nullptr);
while (curList != nullptr) { // reclaim nodes
Node* n = curList;
curList = curList->next;
delete n;
}
if (!isLeaf(pos)) {
deleteAllNodes(leftOf(pos));
deleteAllNodes(rightOf(pos));
}
}
/// Check that the first nodes of the lists maintain the heap property.
bool isHeap(const Position& pos) {
if (isLeaf(pos)) {
return true;
}
Position lchild = leftOf(pos);
Position rchild = rightOf(pos);
return isHeap(lchild) && isHeap(rchild) &&
readValue(pos) >= readValue(lchild) &&
readValue(pos) >= readValue(rchild);
}
/// Is the current position a leaf?
FOLLY_ALWAYS_INLINE bool isLeaf(const Position& pos) {
return pos.level == getBottomLevel();
}
/// Is the current element the root?
FOLLY_ALWAYS_INLINE bool isRoot(const Position& pos) {
return pos.level == 0;
}
/// Locate the parent node
FOLLY_ALWAYS_INLINE Position parentOf(const Position& pos) {
Position res;
res.level = pos.level - 1;
res.index = pos.index / 2;
return res;
}
/// Locate the left child
FOLLY_ALWAYS_INLINE Position leftOf(const Position& pos) {
Position res;
res.level = pos.level + 1;
res.index = pos.index * 2;
return res;
}
/// Locate the right child
FOLLY_ALWAYS_INLINE Position rightOf(const Position& pos) {
Position res;
res.level = pos.level + 1;
res.index = pos.index * 2 + 1;
return res;
}
/// Get the list size of the current MoundElement
FOLLY_ALWAYS_INLINE size_t getElementSize(const Position& pos) {
return levels_[pos.level][pos.index].size.load(std::memory_order_relaxed);
}
/// Set the size of the current MoundElement
FOLLY_ALWAYS_INLINE void setElementSize(
const Position& pos,
const uint32_t& v) {
levels_[pos.level][pos.index].size.store(v, std::memory_order_relaxed);
}
/// Extend the tree level
void grow(uint32_t btm) {
while (true) {
if (guard_.fetch_add(1, std::memory_order_acq_rel) == 0) {
break;
}
// someone already expanded the tree
if (btm != getBottomLevel()) {
return;
}
std::this_thread::yield();
}
// double-check that the bottom level has not changed
if (btm != getBottomLevel()) {
guard_.store(0, std::memory_order_release);
return;
}
// create and initialize the new level
uint32_t tmp_btm = getBottomLevel();
uint32_t size = 1 << (tmp_btm + 1);
MoundElement* new_level = new MoundElement[size]; // MM
levels_[tmp_btm + 1] = new_level;
bottom_.store(tmp_btm + 1, std::memory_order_release);
guard_.store(0, std::memory_order_release);
}
/// TODO: optimization
// This function is important: it selects the position at which to
// insert the new node. There are two execution paths when it
// returns:
// 1. It returns a position whose head node has a lower priority
// than the target. This position can be used as the starting
// element of the binary search for the fitting position. (slow path)
// 2. It returns a position that is not the best fit, but that
// prevents the Mound from growing aggressively. (fast path)
Position selectPosition(
const T& val,
bool& path,
uint32_t& seed,
folly::hazptr_holder<Atom>& hptr) {
while (true) {
uint32_t b = getBottomLevel();
int bound = 1 << b; // number of elements in this level
int steps = 1 + b * b; // number of probe steps
++seed;
uint32_t index = seed % bound;
for (int i = 0; i < steps; i++) {
int loc = (index + i) % bound;
Position pos;
pos.level = b;
pos.index = loc;
// on the first pass, do a quick check
if (optimisticReadValue(pos, hptr) <= val) {
path = false;
seed = ++loc;
return pos;
} else if (getElementSize(pos) < ListTargetSize) {
// [fast path] conservative implementation:
// it makes sure every tree element holds at
// least the target number of nodes.
seed = ++loc;
path = true;
return pos;
}
if (b != getBottomLevel()) {
break;
}
}
// failed too many times; grow the Mound
if (b == getBottomLevel()) {
grow(b);
}
}
}
/// Swap two Tree Elements (head, size)
void swapList(const Position& a, const Position& b) {
Node* tmp = getList(a);
setTreeNode(a, getList(b));
setTreeNode(b, tmp);
// need to swap the tree node meta-data
uint32_t sa = getElementSize(a);
uint32_t sb = getElementSize(b);
setElementSize(a, sb);
setElementSize(b, sa);
}
FOLLY_ALWAYS_INLINE void lockNode(const Position& pos) {
levels_[pos.level][pos.index].lock.lock();
}
FOLLY_ALWAYS_INLINE void unlockNode(const Position& pos) {
levels_[pos.level][pos.index].lock.unlock();
}
FOLLY_ALWAYS_INLINE bool trylockNode(const Position& pos) {
return levels_[pos.level][pos.index].lock.try_lock();
}
FOLLY_ALWAYS_INLINE T
optimisticReadValue(const Position& pos, folly::hazptr_holder<Atom>& hptr) {
Node* tmp = hptr.get_protected(levels_[pos.level][pos.index].head);
return (tmp == nullptr) ? MIN_VALUE : tmp->val;
}
// Get the value from the head of the list as the element value
FOLLY_ALWAYS_INLINE T readValue(const Position& pos) {
Node* tmp = getList(pos);
return (tmp == nullptr) ? MIN_VALUE : tmp->val;
}
FOLLY_ALWAYS_INLINE Node* getList(const Position& pos) {
return levels_[pos.level][pos.index].head.load(std::memory_order_relaxed);
}
FOLLY_ALWAYS_INLINE void setTreeNode(const Position& pos, Node* t) {
levels_[pos.level][pos.index].head.store(t, std::memory_order_relaxed);
}
// Merge two sorted lists
Node* mergeList(Node* base, Node* source) {
if (base == nullptr) {
return source;
} else if (source == nullptr) {
return base;
}
Node *res, *p;
// choose the head node
if (base->val >= source->val) {
res = base;
base = base->next;
p = res;
} else {
res = source;
source = source->next;
p = res;
}
while (base != nullptr && source != nullptr) {
if (base->val >= source->val) {
p->next = base;
base = base->next;
} else {
p->next = source;
source = source->next;
}
p = p->next;
}
if (base == nullptr) {
p->next = source;
} else {
p->next = base;
}
return res;
}
/// Merge list t into the element at position pos
void mergeListTo(const Position& pos, Node* t, const size_t& list_length) {
Node* head = getList(pos);
setTreeNode(pos, mergeList(head, t));
uint32_t ns = getElementSize(pos) + list_length;
setElementSize(pos, ns);
}
bool pruningLeaf(const Position& pos) {
if (getElementSize(pos) <= PruningSize) {
unlockNode(pos);
return true;
}
int b = getBottomLevel();
int leaves = 1 << b;
int cnodes = 0;
for (int i = 0; i < leaves; i++) {
Position tmp;
tmp.level = b;
tmp.index = i;
if (getElementSize(tmp) != 0) {
cnodes++;
}
if (cnodes > leaves * 2 / 3) {
break;
}
}
if (cnodes <= leaves * 2 / 3) {
unlockNode(pos);
return true;
}
return false;
}
/// Split the current list into two lists, then split the tail
/// list and merge the halves into the two children.
void startPruning(const Position& pos) {
if (isLeaf(pos) && pruningLeaf(pos)) {
return;
}
// split the list, record the tail
Node* pruning_head = getList(pos);
int steps = ListTargetSize; // number of nodes kept in the original list
for (int i = 0; i < steps - 1; i++) {
pruning_head = pruning_head->next;
}
Node* t = pruning_head;
pruning_head = pruning_head->next;
t->next = nullptr;
int tail_length = getElementSize(pos) - steps;
setElementSize(pos, steps);
// split the tail list into two lists and
// merge them evenly into the two children
if (pos.level != getBottomLevel()) {
// split the rest into two lists
int left_length = (tail_length + 1) / 2;
int right_length = tail_length - left_length;
Node *to_right, *to_left = pruning_head;
for (int i = 0; i < left_length - 1; i++) {
pruning_head = pruning_head->next;
}
to_right = pruning_head->next;
pruning_head->next = nullptr;
Position lchild = leftOf(pos);
Position rchild = rightOf(pos);
if (left_length != 0) {
lockNode(lchild);
mergeListTo(lchild, to_left, left_length);
}
if (right_length != 0) {
lockNode(rchild);
mergeListTo(rchild, to_right, right_length);
}
unlockNode(pos);
if (left_length != 0 && getElementSize(lchild) > PruningSize) {
startPruning(lchild);
} else if (left_length != 0) {
unlockNode(lchild);
}
if (right_length != 0 && getElementSize(rchild) > PruningSize) {
startPruning(rchild);
} else if (right_length != 0) {
unlockNode(rchild);
}
} else { // time to grow the Mound
grow(pos.level);
// pick one child to receive the tail list
if (steps % 2 == 1) {
Position rchild = rightOf(pos);
lockNode(rchild);
mergeListTo(rchild, pruning_head, tail_length);
unlockNode(pos);
unlockNode(rchild);
} else {
Position lchild = leftOf(pos);
lockNode(lchild);
mergeListTo(lchild, pruning_head, tail_length);
unlockNode(pos);
unlockNode(lchild);
}
}
}
// This function inserts the new node (always) at the head of the
// current list. It needs to lock both the parent and the current
// position. Since this may cause the list to become too long, we
// provide the pruning algorithm above.
bool regularInsert(const Position& pos, const T& val, Node* newNode) {
// insert to the root node
if (isRoot(pos)) {
lockNode(pos);
T nv = readValue(pos);
if (LIKELY(nv <= val)) {
newNode->next = getList(pos);
setTreeNode(pos, newNode);
uint32_t sz = getElementSize(pos);
setElementSize(pos, sz + 1);
if (UNLIKELY(sz > PruningSize)) {
startPruning(pos);
} else {
unlockNode(pos);
}
return true;
}
unlockNode(pos);
return false;
}
// TODO a good place to improve accuracy
// insert to an inner node
Position parent = parentOf(pos);
if (!trylockNode(parent)) {
return false;
}
if (!trylockNode(pos)) {
unlockNode(parent);
return false;
}
T pv = readValue(parent);
T nv = readValue(pos);
if (LIKELY(pv > val && nv <= val)) {
newNode->next = getList(pos);
setTreeNode(pos, newNode);
uint32_t sz = getElementSize(pos);
setElementSize(pos, sz + 1);
unlockNode(parent);
if (UNLIKELY(sz > PruningSize)) {
startPruning(pos);
} else {
unlockNode(pos);
}
return true;
}
unlockNode(parent);
unlockNode(pos);
return false;
}
bool forceInsertToRoot(Node* newNode) {
Position pos;
pos.level = pos.index = 0;
std::unique_lock<Mutex> lck(
levels_[pos.level][pos.index].lock, std::try_to_lock);
if (!lck.owns_lock()) {
return false;
}
uint32_t sz = getElementSize(pos);
if (sz >= ListTargetSize) {
return false;
}
Node* p = getList(pos);
newNode->next = p;
setTreeNode(pos, newNode);
p = newNode;
while (p != nullptr && p->next != nullptr && p->val <= p->next->val) {
std::swap(p->val, p->next->val);
p = p->next;
}
setElementSize(pos, sz + 1);
return true;
}
// This function force-inserts the new node at the current position
// if the element does not yet hold enough nodes. It is safe to
// lock just this one position, because the new node will never
// become the first node, which is what sustains the heap property.
bool forceInsert(const Position& pos, const T& val, Node* newNode) {
if (isRoot(pos)) {
return forceInsertToRoot(newNode);
}
while (true) {
std::unique_lock<Mutex> lck(
levels_[pos.level][pos.index].lock, std::try_to_lock);
if (!lck.owns_lock()) {
if (getElementSize(pos) < ListTargetSize && readValue(pos) >= val) {
continue;
} else {
return false;
}
}
T nv = readValue(pos);
uint32_t sz = getElementSize(pos);
// do not allow the new node to become the first one
// do not allow the list to grow too big
if (UNLIKELY(nv < val || sz >= ListTargetSize)) {
return false;
}
Node* p = getList(pos);
// find a place to insert the node
while (p->next != nullptr && p->next->val > val) {
p = p->next;
}
newNode->next = p->next;
p->next = newNode;
// do not forget to change the metadata
setElementSize(pos, sz + 1);
return true;
}
}
void binarySearchPosition(
Position& cur,
const T& val,
folly::hazptr_holder<Atom>& hptr) {
Position parent, mid;
if (cur.level == 0) {
return;
}
// start from the root
parent.level = parent.index = 0;
while (true) { // binary search
mid.level = (cur.level + parent.level) / 2;
mid.index = cur.index >> (cur.level - mid.level);
T mv = optimisticReadValue(mid, hptr);
if (val < mv) {
parent = mid;
} else {
cur = mid;
}
if (mid.level == 0 || // the root
((parent.level + 1 == cur.level) && parent.level != 0)) {
return;
}
}
}
// Push keeps the length of each element's list stable
void moundPush(const T& val) {
Position cur;
folly::hazptr_holder<Atom> hptr;
Node* newNode = new Node;
newNode->val = val;
uint32_t seed = folly::Random::rand32() % (1 << 21);
while (true) {
// shall we take the fast path?
bool go_fast_path = false;
// choose the right node to start from
cur = selectPosition(val, go_fast_path, seed, hptr);
if (go_fast_path) {
if (LIKELY(forceInsert(cur, val, newNode))) {
if (MayBlock) {
blockingPushImpl();
}
return;
} else {
continue;
}
}
binarySearchPosition(cur, val, hptr);
if (LIKELY(regularInsert(cur, val, newNode))) {
if (MayBlock) {
blockingPushImpl();
}
return;
}
}
}
int popToSharedBuffer(const uint32_t rsize, Node* head) {
Position pos;
pos.level = pos.index = 0;
int num = std::min(rsize, (uint32_t)PopBatch);
for (int i = num - 1; i >= 0; i--) {
// wait until this buffer slot is empty
while (shared_buffer_[i].pnode.load(std::memory_order_relaxed) != nullptr)
;
shared_buffer_[i].pnode.store(head, std::memory_order_relaxed);
head = head->next;
}
if (num > 0) {
top_loc_.store(num - 1, std::memory_order_release);
}
setTreeNode(pos, head);
return rsize - num;
}
void mergeDown(const Position& pos) {
if (isLeaf(pos)) {
unlockNode(pos);
return;
}
// acquire locks for L and R and compare
Position lchild = leftOf(pos);
Position rchild = rightOf(pos);
lockNode(lchild);
lockNode(rchild);
// read values
T nv = readValue(pos);
T lv = readValue(lchild);
T rv = readValue(rchild);
if (nv >= lv && nv >= rv) {
unlockNode(pos);
unlockNode(lchild);
unlockNode(rchild);
return;
}
// If the element and its two children together contain
// no more nodes than the threshold, we merge both
// children into the parent and merge down from each child.
size_t sum =
getElementSize(rchild) + getElementSize(lchild) + getElementSize(pos);
if (sum <= MergingSize) {
Node* l1 = mergeList(getList(rchild), getList(lchild));
setTreeNode(pos, mergeList(l1, getList(pos)));
setElementSize(pos, sum);
setTreeNode(lchild, nullptr);
setElementSize(lchild, 0);
setTreeNode(rchild, nullptr);
setElementSize(rchild, 0);
unlockNode(pos);
mergeDown(lchild);
mergeDown(rchild);
return;
}
// pull from right
if (rv >= lv && rv > nv) {
swapList(rchild, pos);
unlockNode(pos);
unlockNode(lchild);
mergeDown(rchild);
} else if (lv >= rv && lv > nv) {
// pull from left
swapList(lchild, pos);
unlockNode(pos);
unlockNode(rchild);
mergeDown(lchild);
}
}
bool deferSettingRootSize(Position& pos) {
if (isLeaf(pos)) {
setElementSize(pos, 0);
unlockNode(pos);
return true;
}
// acquire locks for L and R and compare
Position lchild = leftOf(pos);
Position rchild = rightOf(pos);
lockNode(lchild);
lockNode(rchild);
if (getElementSize(lchild) == 0 && getElementSize(rchild) == 0) {
setElementSize(pos, 0);
unlockNode(pos);
unlockNode(lchild);
unlockNode(rchild);
return true;
} else {
// read values
T lv = readValue(lchild);
T rv = readValue(rchild);
if (lv >= rv) {
swapList(lchild, pos);
setElementSize(lchild, 0);
unlockNode(pos);
unlockNode(rchild);
pos = lchild;
} else {
swapList(rchild, pos);
setElementSize(rchild, 0);
unlockNode(pos);
unlockNode(lchild);
pos = rchild;
}
return false;
}
}
bool moundPopMany(T& val) {
// pop from the root
Position pos;
pos.level = pos.index = 0;
// if the root's list is null, return false
Node* head = getList(pos);
if (head == nullptr) {
unlockNode(pos);
return false;
}
// shared buffer already filled by other threads
if (PopBatch > 0 && top_loc_.load(std::memory_order_acquire) >= 0) {
unlockNode(pos);
return false;
}
uint32_t sz = getElementSize(pos);
// take the first node
val = head->val;
Node* p = head;
head = head->next;
sz--;
if (PopBatch > 0) {
sz = popToSharedBuffer(sz, head);
} else {
setTreeNode(pos, head);
}
bool done = false;
if (LIKELY(sz == 0)) {
done = deferSettingRootSize(pos);
} else {
setElementSize(pos, sz);
}
if (LIKELY(!done)) {
mergeDown(pos);
}
p->retire();
return true;
}
FOLLY_ALWAYS_INLINE bool toWakeUp() {
std::atomic_thread_fence(std::memory_order_seq_cst);
auto c = active_waiting_.load(std::memory_order_acquire);
return c > 0;
}
void wakeUp() {
seq_number_.fetch_add(1, std::memory_order_seq_cst);
seq_number_.futexWake(1);
}
void blockingPushImpl() {
if (LIKELY(!toWakeUp())) {
return;
}
// slow path, likely to have a futexWake call
wakeUp();
}
// Return true only if the Mound is empty during the call
FOLLY_ALWAYS_INLINE bool isMoundEmpty() {
Position pos;
pos.level = pos.index = 0;
return getElementSize(pos) == 0;
}
// Return true if the shared buffer is empty
FOLLY_ALWAYS_INLINE bool isSharedBufferEmpty() {
return top_loc_.load(std::memory_order_acquire) < 0;
}
bool isEmpty() {
if (PopBatch > 0) {
return isMoundEmpty() && isSharedBufferEmpty();
}
return isMoundEmpty();
}
void blockingPopImpl() {
active_waiting_.fetch_add(1, std::memory_order_seq_cst);
auto w = seq_number_.load(std::memory_order_acquire);
std::atomic_thread_fence(std::memory_order_seq_cst);
if (!isEmpty()) { // not empty
active_waiting_.fetch_sub(1, std::memory_order_seq_cst);
return;
}
seq_number_.futexWait(w);
active_waiting_.fetch_sub(1, std::memory_order_seq_cst);
}
bool tryPopFromMound(T& val) {
if (isMoundEmpty()) {
return false;
}
Position pos;
pos.level = pos.index = 0;
// lock the root
if (trylockNode(pos)) {
return moundPopMany(val);
}
return false;
}
FOLLY_ALWAYS_INLINE static folly::WaitOptions wait_options() {
return {};
}
template <typename Clock, typename Duration>
FOLLY_NOINLINE bool tryWait(
const std::chrono::time_point<Clock, Duration>& deadline,
const folly::WaitOptions& opt = wait_options()) {
// Fast path: quickly check the status
switch (folly::detail::spin_pause_until(
deadline, opt, [=] { return !isEmpty(); })) {
case folly::detail::spin_result::success:
return true;
case folly::detail::spin_result::timeout:
return false;
case folly::detail::spin_result::advance:
break;
}
// Spinning strategy
if (!MayBlock) {
while (true) {
auto res = folly::detail::spin_yield_until(
deadline, [=] { return !isEmpty(); });
if (res == folly::detail::spin_result::success) {
return true;
} else if (res == folly::detail::spin_result::timeout) {
return false;
}
}
} else { // Blocking strategy
blockingPopImpl();
}
return true;
}
bool tryPopFromSharedBuffer(T& val) {
int get_or = -1;
if (!isSharedBufferEmpty()) {
get_or = top_loc_.fetch_sub(1, std::memory_order_acq_rel);
if (get_or >= 0) {
Node* c = shared_buffer_[get_or].pnode.load(std::memory_order_relaxed);
shared_buffer_[get_or].pnode.store(nullptr, std::memory_order_release);
val = c->val;
c->retire();
return true;
}
}
return false;
}
void moundPop(T& val) {
if (PopBatch > 0) {
if (tryPopFromSharedBuffer(val)) {
return;
}
}
while (true) {
if (LIKELY(tryPopFromMound(val))) {
return;
}
tryWait(std::chrono::time_point<std::chrono::steady_clock>::max());
if (PopBatch > 0 && tryPopFromSharedBuffer(val)) {
return;
}
}
}
};
/*
* Copyright 2018-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <thread>
#include <boost/thread.hpp>
#include <folly/Random.h>
#include <folly/SpinLock.h>
#include <folly/experimental/ConcurrentPriorityQueue.h>
#include <folly/portability/GFlags.h>
#include <folly/portability/GTest.h>
#include <folly/test/DeterministicSchedule.h>
#include <glog/logging.h>
DEFINE_int32(reps, 1, "number of reps");
DEFINE_int64(ops, 32, "number of operations per rep");
DEFINE_int64(elems, 64, "number of elements");
static std::vector<int> nthr = {1, 2, 4, 8};
// number of threads
static uint32_t nthreads;
template <class PriorityQueue>
void basicOpsTest() {
int res;
PriorityQueue pq;
EXPECT_TRUE(pq.empty());
EXPECT_EQ(pq.size(), 0);
pq.push(1);
pq.push(2);
pq.push(3);
EXPECT_FALSE(pq.empty());
EXPECT_EQ(pq.size(), 3);
pq.pop(res);
EXPECT_EQ(res, 3);
pq.pop(res);
EXPECT_EQ(res, 2);
pq.pop(res);
EXPECT_EQ(res, 1);
EXPECT_TRUE(pq.empty());
EXPECT_EQ(pq.size(), 0);
pq.push(3);
pq.push(2);
pq.push(1);
pq.pop(res);
EXPECT_EQ(res, 3);
pq.pop(res);
EXPECT_EQ(res, 2);
pq.pop(res);
EXPECT_EQ(res, 1);
EXPECT_TRUE(pq.empty());
EXPECT_EQ(pq.size(), 0);
}
TEST(CPQ, BasicOpsTest) {
// Spinning
basicOpsTest<RelaxedConcurrentPriorityQueue<int, false, true>>();
// Strict
basicOpsTest<RelaxedConcurrentPriorityQueue<int, false, true, 0>>();
basicOpsTest<RelaxedConcurrentPriorityQueue<int, false, true, 0, 1>>();
basicOpsTest<RelaxedConcurrentPriorityQueue<int, false, true, 0, 3>>();
// Relaxed
basicOpsTest<RelaxedConcurrentPriorityQueue<int, false, true, 1, 1>>();
basicOpsTest<RelaxedConcurrentPriorityQueue<int, false, true, 2, 1>>();
basicOpsTest<RelaxedConcurrentPriorityQueue<int, false, true, 3, 3>>();
// Blocking
basicOpsTest<RelaxedConcurrentPriorityQueue<int, true, true>>();
basicOpsTest<RelaxedConcurrentPriorityQueue<int, true, true, 0>>();
basicOpsTest<RelaxedConcurrentPriorityQueue<int, true, true, 2>>();
}
/// Execute the function on nthreads threads
template <typename Func>
static void run_once(const Func& fn) {
boost::barrier barrier_start{nthreads + 1};
std::vector<std::thread> threads(nthreads);
for (uint32_t tid = 0; tid < nthreads; ++tid) {
threads[tid] = std::thread([&, tid] {
barrier_start.wait();
fn(tid);
});
}
barrier_start.wait(); // start the execution
for (auto& t : threads) {
t.join();
}
}
template <class PriorityQueue>
void singleThreadTest() {
PriorityQueue pq;
folly::Random::DefaultGenerator rng;
rng.seed(FLAGS_elems);
uint64_t expect_sum = 0;
// random push
for (int i = 0; i < FLAGS_elems; i++) {
int val = folly::Random::rand32(rng) % FLAGS_elems + 1;
expect_sum += val;
pq.push(val);
}
int val = 0;
int counter = 0;
uint64_t sum = 0;
while (counter < FLAGS_elems) {
pq.pop(val);
sum += val;
counter++;
}
EXPECT_EQ(sum, expect_sum);
}
TEST(CPQ, SingleThrStrictImplTest) {
// spinning
singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 0>>();
singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 0, 1>>();
singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 0, 8>>();
// blocking
singleThreadTest<RelaxedConcurrentPriorityQueue<int, true, false, 0>>();
singleThreadTest<RelaxedConcurrentPriorityQueue<int, true, false, 0, 1>>();
singleThreadTest<RelaxedConcurrentPriorityQueue<int, true, false, 0, 8>>();
}
TEST(CPQ, SingleThrRelaxedImplTest) {
// spinning
singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 1>>();
singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 8>>();
singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 8, 2>>();
singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 8, 8>>();
singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 2, 128>>();
singleThreadTest<RelaxedConcurrentPriorityQueue<int, false, false, 100, 8>>();
// blocking
singleThreadTest<RelaxedConcurrentPriorityQueue<int, true, false, 1>>();
singleThreadTest<RelaxedConcurrentPriorityQueue<int, true, false, 8>>();
}
/// Concurrent pops should leave the queue empty after
/// executing exactly as many pops as there are elements.
template <class PriorityQueue>
void concurrentPopforSharedBuffer() {
for (int t : nthr) {
PriorityQueue pq;
folly::Random::DefaultGenerator rng;
rng.seed(FLAGS_elems);
uint64_t check_sum = 0;
// random push
for (int i = 0; i < FLAGS_elems; i++) {
int val = folly::Random::rand32(rng) % FLAGS_elems + 1;
pq.push(val);
check_sum += val;
}
std::atomic<uint64_t> pop_sum(0);
std::atomic<int> to_end(0);
nthreads = t;
auto fn = [&](uint32_t tid) {
int val = tid;
while (true) {
int index = to_end.fetch_add(1, std::memory_order_acq_rel);
if (index < FLAGS_elems) {
pq.pop(val);
} else {
break;
}
pop_sum.fetch_add(val, std::memory_order_acq_rel);
}
};
run_once(fn);
// check the sum of returned values of successful pop
EXPECT_EQ(pop_sum, check_sum);
}
}
TEST(CPQ, ConcurrentPopStrictImplTest) {
// spinning
concurrentPopforSharedBuffer<
RelaxedConcurrentPriorityQueue<int, false, false, 0>>();
concurrentPopforSharedBuffer<
RelaxedConcurrentPriorityQueue<int, false, false, 0, 1>>();
concurrentPopforSharedBuffer<
RelaxedConcurrentPriorityQueue<int, false, false, 0, 128>>();
// blocking
concurrentPopforSharedBuffer<
RelaxedConcurrentPriorityQueue<int, true, false, 0>>();
concurrentPopforSharedBuffer<
RelaxedConcurrentPriorityQueue<int, true, false, 0, 1>>();
concurrentPopforSharedBuffer<
RelaxedConcurrentPriorityQueue<int, true, false, 0, 128>>();
}
TEST(CPQ, ConcurrentPopRelaxedImplTest) {
// spinning
concurrentPopforSharedBuffer<
RelaxedConcurrentPriorityQueue<int, false, false>>();
concurrentPopforSharedBuffer<
RelaxedConcurrentPriorityQueue<int, false, false, 1, 8>>();
concurrentPopforSharedBuffer<
RelaxedConcurrentPriorityQueue<int, false, false, 8, 2>>();
concurrentPopforSharedBuffer<
RelaxedConcurrentPriorityQueue<int, false, false, 128, 2>>();
// blocking
concurrentPopforSharedBuffer<
RelaxedConcurrentPriorityQueue<int, true, false, 1>>();
concurrentPopforSharedBuffer<
RelaxedConcurrentPriorityQueue<int, true, false, 128>>();
concurrentPopforSharedBuffer<
RelaxedConcurrentPriorityQueue<int, true, false, 8, 128>>();
}
/// Execute a fixed number of pushes, tracking the number of
/// elements and the total pushed value.
template <class PriorityQueue>
void concurrentPush() {
for (int t : nthr) {
PriorityQueue pq;
std::atomic<uint64_t> counter_sum(0);
nthreads = t;
auto fn = [&](uint32_t tid) {
folly::Random::DefaultGenerator rng;
rng.seed(tid);
uint64_t local_sum = 0;
for (int i = tid; i < FLAGS_elems; i += nthreads) {
int val = folly::Random::rand32(rng) % FLAGS_elems + 1;
local_sum += val;
pq.push(val);
}
counter_sum.fetch_add(local_sum, std::memory_order_acq_rel);
};
run_once(fn);
// check the total sum of the elements
uint64_t actual_sum = 0;
for (int i = 0; i < FLAGS_elems; i++) {
int res = 0;
pq.pop(res);
actual_sum += res;
}
EXPECT_EQ(actual_sum, counter_sum);
}
}
TEST(CPQ, ConcurrentPushStrictImplTest) {
concurrentPush<RelaxedConcurrentPriorityQueue<int, false, false, 0>>();
concurrentPush<RelaxedConcurrentPriorityQueue<int, false, false, 0, 8>>();
concurrentPush<RelaxedConcurrentPriorityQueue<int, false, false, 0, 128>>();
}
TEST(CPQ, ConcurrentPushRelaxedImplTest) {
concurrentPush<RelaxedConcurrentPriorityQueue<int, false, false>>();
concurrentPush<RelaxedConcurrentPriorityQueue<int, false, false, 1, 8>>();
concurrentPush<RelaxedConcurrentPriorityQueue<int, false, false, 2, 128>>();
concurrentPush<RelaxedConcurrentPriorityQueue<int, false, false, 128, 8>>();
concurrentPush<RelaxedConcurrentPriorityQueue<int, false, false, 8, 8>>();
}
template <class PriorityQueue>
void concurrentOps(int ops) {
for (int t : nthr) {
PriorityQueue pq;
std::atomic<uint64_t> counter_push(0);
std::atomic<uint64_t> counter_pop(0);
nthreads = t;
boost::barrier rb0{nthreads};
boost::barrier rb1{nthreads};
boost::barrier rb2{nthreads};
std::atomic<int> to_end(0);
auto fn = [&](uint32_t tid) {
folly::Random::DefaultGenerator rng;
rng.seed(tid);
uint64_t local_push = 0;
uint64_t local_pop = 0;
int res;
/// initialize the queue
for (int i = tid; i < FLAGS_elems; i += nthreads) {
int val = folly::Random::rand32(rng) % FLAGS_elems + 1;
local_push++;
pq.push(val);
}
rb0.wait();
/// operations
for (int i = 0; i < ops; i++) {
if (i % 2 == 0) { // alternate pushes and pops
int val = folly::Random::rand32(rng) % FLAGS_elems + 1;
local_push++;
pq.push(val);
} else {
pq.pop(res);
local_pop++;
}
}
rb1.wait();
// collect the op counts to compute the remaining elements
counter_push.fetch_add(local_push, std::memory_order_seq_cst);
counter_pop.fetch_add(local_pop, std::memory_order_seq_cst);
rb2.wait();
/// clean up work
uint64_t r = counter_push.load(std::memory_order_seq_cst) -
counter_pop.load(std::memory_order_seq_cst);
while (true) {
uint64_t index = to_end.fetch_add(1, std::memory_order_acq_rel);
if (index < r) {
pq.pop(res);
} else {
break;
}
}
};
run_once(fn);
// the total push and pop ops should be the same
}
}
template <class PriorityQueue>
void concurrentSizeTest(int ops) {
for (int t : nthr) {
PriorityQueue pq;
nthreads = t;
EXPECT_TRUE(pq.empty());
auto fn = [&](uint32_t tid) {
folly::Random::DefaultGenerator rng;
rng.seed(tid);
uint64_t local_push = 0;
int res;
/// initialize the queue
for (int i = tid; i < FLAGS_elems; i += nthreads) {
int val = folly::Random::rand32(rng) % FLAGS_elems + 1;
local_push++;
pq.push(val);
}
/// operations
for (int i = 0; i < ops; i++) {
int val = folly::Random::rand32(rng) % FLAGS_elems + 1;
pq.push(val);
pq.pop(res);
}
};
run_once(fn);
// the total push and pop ops should be the same
EXPECT_EQ(pq.size(), FLAGS_elems);
}
}
static std::vector<int> sizes = {0, 1024};
TEST(CPQ, ConcurrentMixedStrictImplTest) {
for (auto size : sizes) {
concurrentOps<RelaxedConcurrentPriorityQueue<int, false, false, 0>>(size);
concurrentOps<RelaxedConcurrentPriorityQueue<int, false, false, 0, 1>>(
size);
concurrentOps<RelaxedConcurrentPriorityQueue<int, false, false, 0, 32>>(
size);
}
}
TEST(CPQ, ConcurrentMixedRelaxedImplTest) {
for (auto size : sizes) {
concurrentOps<RelaxedConcurrentPriorityQueue<int, false, false>>(size);
concurrentOps<RelaxedConcurrentPriorityQueue<int, false, false, 1, 32>>(
size);
concurrentOps<RelaxedConcurrentPriorityQueue<int, false, false, 32, 1>>(
size);
concurrentOps<RelaxedConcurrentPriorityQueue<int, false, false, 8, 8>>(
size);
concurrentOps<RelaxedConcurrentPriorityQueue<int, true, false, 8, 8>>(size);
}
}
TEST(CPQ, StrictImplSizeTest) {
for (auto size : sizes) {
concurrentSizeTest<RelaxedConcurrentPriorityQueue<int, false, true, 0>>(
size);
concurrentSizeTest<RelaxedConcurrentPriorityQueue<int, true, true, 0>>(
size);
}
}
TEST(CPQ, RelaxedImplSizeTest) {
for (auto size : sizes) {
concurrentSizeTest<RelaxedConcurrentPriorityQueue<int, false, true>>(size);
concurrentSizeTest<RelaxedConcurrentPriorityQueue<int, true, true, 2, 8>>(
size);
concurrentSizeTest<RelaxedConcurrentPriorityQueue<int, false, true, 8, 2>>(
size);
}
}
template <class PriorityQueue>
void multiPusherPopper(int PushThr, int PopThr) {
int ops = FLAGS_ops;
uint32_t total_threads = PushThr + PopThr;
PriorityQueue pq;
std::atomic<uint64_t> sum_push(0);
std::atomic<uint64_t> sum_pop(0);
auto fn_popthr = [&](uint32_t tid) {
for (int i = tid; i < ops; i += PopThr) {
int val;
pq.pop(val);
sum_pop.fetch_add(val, std::memory_order_acq_rel);
}
};
auto fn_pushthr = [&](uint32_t tid) {
folly::Random::DefaultGenerator rng_t;
rng_t.seed(tid);
for (int i = tid; i < ops; i += PushThr) {
int val = folly::Random::rand32(rng_t);
pq.push(val);
sum_push.fetch_add(val, std::memory_order_acq_rel);
}
};
boost::barrier barrier_start{total_threads + 1};
std::vector<std::thread> threads_push(PushThr);
for (int tid = 0; tid < PushThr; ++tid) {
threads_push[tid] = std::thread([&, tid] {
barrier_start.wait();
fn_pushthr(tid);
});
}
std::vector<std::thread> threads_pop(PopThr);
for (int tid = 0; tid < PopThr; ++tid) {
threads_pop[tid] = std::thread([&, tid] {
barrier_start.wait();
fn_popthr(tid);
});
}
barrier_start.wait(); // start the execution
for (auto& t : threads_push) {
t.join();
}
for (auto& t : threads_pop) {
t.join();
}
EXPECT_EQ(sum_pop, sum_push);
}
TEST(CPQ, PusherPopperBlockingTest) {
for (auto i : nthr) {
for (auto j : nthr) {
// Original
multiPusherPopper<RelaxedConcurrentPriorityQueue<int, true, false, 0, 1>>(
i, j);
// Relaxed
multiPusherPopper<RelaxedConcurrentPriorityQueue<int, true, false, 1, 8>>(
i, j);
multiPusherPopper<
RelaxedConcurrentPriorityQueue<int, true, false, 8, 128>>(i, j);
multiPusherPopper<
RelaxedConcurrentPriorityQueue<int, true, false, 128, 8>>(i, j);
multiPusherPopper<RelaxedConcurrentPriorityQueue<int, true, false>>(i, j);
multiPusherPopper<
RelaxedConcurrentPriorityQueue<int, true, false, 16, 16>>(i, j);
}
}
}
TEST(CPQ, PusherPopperSpinningTest) {
for (auto i : nthr) {
for (auto j : nthr) {
// Original
multiPusherPopper<
RelaxedConcurrentPriorityQueue<int, false, false, 0, 1>>(i, j);
// Relaxed
multiPusherPopper<
RelaxedConcurrentPriorityQueue<int, false, false, 1, 8>>(i, j);
multiPusherPopper<
RelaxedConcurrentPriorityQueue<int, false, false, 8, 128>>(i, j);
multiPusherPopper<
RelaxedConcurrentPriorityQueue<int, false, false, 128, 8>>(i, j);
multiPusherPopper<RelaxedConcurrentPriorityQueue<int, false, false>>(
i, j);
multiPusherPopper<
RelaxedConcurrentPriorityQueue<int, false, false, 16, 16>>(i, j);
}
}
}
template <class PriorityQueue>
void blockingFirst() {
PriorityQueue pq;
int nPop = 16;
boost::barrier b{static_cast<unsigned int>(nPop + 1)};
std::atomic<int> finished{0};
std::vector<std::thread> threads_pop(nPop);
for (int tid = 0; tid < nPop; ++tid) {
threads_pop[tid] = std::thread([&] {
int val;
b.wait();
pq.pop(val);
finished.fetch_add(1, std::memory_order_acq_rel);
});
}
b.wait();
int c = 0;
// push to Mound one by one
// the popping threads should wake up one by one
do {
pq.push(1);
c++;
while (finished.load(std::memory_order_acquire) != c)
;
EXPECT_EQ(finished.load(std::memory_order_acquire), c);
} while (c < nPop);
for (auto& t : threads_pop) {
t.join();
}
}
template <class PriorityQueue>
void concurrentBlocking() {
uint32_t nThrs = 16;
for (int iter = 0; iter < FLAGS_reps * 10; iter++) {
PriorityQueue pq;
boost::barrier b{static_cast<unsigned int>(nThrs + nThrs + 1)};
std::atomic<uint32_t> finished{0};
std::vector<std::thread> threads_pop(nThrs);
for (uint32_t tid = 0; tid < nThrs; ++tid) {
threads_pop[tid] = std::thread([&] {
b.wait();
int val;
pq.pop(val);
finished.fetch_add(1, std::memory_order_acq_rel);
});
}
std::vector<std::thread> threads_push(nThrs);
for (uint32_t tid = 0; tid < nThrs; ++tid) {
threads_push[tid] = std::thread([&, tid] {
b.wait();
pq.push(tid);
while (finished.load(std::memory_order_acquire) != nThrs)
;
EXPECT_EQ(finished.load(std::memory_order_acquire), nThrs);
});
}
b.wait();
for (auto& t : threads_pop) {
t.join();
}
for (auto& t : threads_push) {
t.join();
}
}
}
TEST(CPQ, PopBlockingTest) {
// strict
blockingFirst<RelaxedConcurrentPriorityQueue<int, true, false, 0, 1>>();
blockingFirst<RelaxedConcurrentPriorityQueue<int, true, false, 0, 16>>();
// relaxed
blockingFirst<RelaxedConcurrentPriorityQueue<int, true, false>>();
blockingFirst<RelaxedConcurrentPriorityQueue<int, true, false, 8, 8>>();
blockingFirst<RelaxedConcurrentPriorityQueue<int, true, false, 16, 1>>();
// Spinning
blockingFirst<RelaxedConcurrentPriorityQueue<int, false, false>>();
blockingFirst<RelaxedConcurrentPriorityQueue<int, false, false, 8, 8>>();
blockingFirst<RelaxedConcurrentPriorityQueue<int, false, false, 16, 1>>();
}
TEST(CPQ, MixedBlockingTest) {
// strict
concurrentBlocking<RelaxedConcurrentPriorityQueue<int, true, false, 0, 1>>();
concurrentBlocking<RelaxedConcurrentPriorityQueue<int, true, false, 0, 16>>();
// relaxed
concurrentBlocking<RelaxedConcurrentPriorityQueue<int, true, false>>();
concurrentBlocking<RelaxedConcurrentPriorityQueue<int, true, false, 8, 8>>();
concurrentBlocking<RelaxedConcurrentPriorityQueue<int, true, false, 16, 1>>();
// Spinning
concurrentBlocking<RelaxedConcurrentPriorityQueue<int, false, false>>();
concurrentBlocking<RelaxedConcurrentPriorityQueue<int, false, false, 8, 8>>();
concurrentBlocking<
RelaxedConcurrentPriorityQueue<int, false, false, 16, 1>>();
}
using DSched = folly::test::DeterministicSchedule;
using folly::test::DeterministicAtomic;
using folly::test::DeterministicMutex;
template <class PriorityQueue, template <typename> class Atom = std::atomic>
static void DSchedMixedTest() {
for (int r = 0; r < FLAGS_reps; r++) {
// the number of threads is 8
int thr = 8;
PriorityQueue pq;
folly::Random::DefaultGenerator rng;
rng.seed(thr);
uint64_t pre_sum = 0;
uint64_t pre_size = 0;
for (int i = 0; i < FLAGS_elems; i++) {
int val = folly::Random::rand32(rng) % FLAGS_elems + 1;
pq.push(val);
pre_sum += val;
}
pre_size = FLAGS_elems;
Atom<uint64_t> atom_push_sum{0};
Atom<uint64_t> atom_pop_sum{0};
std::vector<std::thread> threads(thr);
for (int tid = 0; tid < thr; ++tid) {
threads[tid] = DSched::thread([&]() {
folly::Random::DefaultGenerator tl_rng;
tl_rng.seed(thr);
uint64_t pop_sum = 0;
uint64_t push_sum = 0;
for (int i = 0; i < FLAGS_ops; i++) {
int val = folly::Random::rand32(tl_rng) % FLAGS_elems + 1;
pq.push(val);
push_sum += val;
pq.pop(val);
pop_sum += val;
}
atom_push_sum.fetch_add(push_sum, std::memory_order_acq_rel);
atom_pop_sum.fetch_add(pop_sum, std::memory_order_acq_rel);
});
}
for (auto& t : threads) {
DSched::join(t);
}
// Pop the elements remaining in the Mound
while (pre_size > 0) {
pre_size--;
int val = -1;
pq.pop(val);
atom_pop_sum += val;
}
// Check the accumulation of popped and pushed priorities
EXPECT_EQ(atom_pop_sum, pre_sum + atom_push_sum);
}
}
TEST(CPQ, DSchedMixedStrictTest) {
DSched sched(DSched::uniform(0));
DSchedMixedTest<
RelaxedConcurrentPriorityQueue<
int,
false,
false,
0,
25,
DeterministicMutex,
DeterministicAtomic>,
DeterministicAtomic>();
DSchedMixedTest<
RelaxedConcurrentPriorityQueue<
int,
true,
false,
0,
25,
DeterministicMutex,
DeterministicAtomic>,
DeterministicAtomic>();
DSchedMixedTest<
RelaxedConcurrentPriorityQueue<
int,
false,
false,
0,
1,
DeterministicMutex,
DeterministicAtomic>,
DeterministicAtomic>();
DSchedMixedTest<
RelaxedConcurrentPriorityQueue<
int,
true,
false,
0,
1,
DeterministicMutex,
DeterministicAtomic>,
DeterministicAtomic>();
DSchedMixedTest<
RelaxedConcurrentPriorityQueue<
int,
false,
false,
0,
128,
DeterministicMutex,
DeterministicAtomic>,
DeterministicAtomic>();
DSchedMixedTest<
RelaxedConcurrentPriorityQueue<
int,
true,
false,
0,
128,
DeterministicMutex,
DeterministicAtomic>,
DeterministicAtomic>();
}
TEST(CPQ, DSchedMixedRelaxedTest) {
DSched sched(DSched::uniform(0));
DSchedMixedTest<
RelaxedConcurrentPriorityQueue<
int,
false,
false,
16,
25,
DeterministicMutex,
DeterministicAtomic>,
DeterministicAtomic>();
DSchedMixedTest<
RelaxedConcurrentPriorityQueue<
int,
true,
false,
16,
25,
DeterministicMutex,
DeterministicAtomic>,
DeterministicAtomic>();
DSchedMixedTest<
RelaxedConcurrentPriorityQueue<
int,
false,
false,
1,
16,
DeterministicMutex,
DeterministicAtomic>,
DeterministicAtomic>();
DSchedMixedTest<
RelaxedConcurrentPriorityQueue<
int,
true,
false,
1,
16,
DeterministicMutex,
DeterministicAtomic>,
DeterministicAtomic>();
DSchedMixedTest<
RelaxedConcurrentPriorityQueue<
int,
false,
false,
16,
1,
DeterministicMutex,
DeterministicAtomic>,
DeterministicAtomic>();
DSchedMixedTest<
RelaxedConcurrentPriorityQueue<
int,
true,
false,
16,
1,
DeterministicMutex,
DeterministicAtomic>,
DeterministicAtomic>();
DSchedMixedTest<
RelaxedConcurrentPriorityQueue<
int,
false,
false,
16,
16,
DeterministicMutex,
DeterministicAtomic>,
DeterministicAtomic>();
DSchedMixedTest<
RelaxedConcurrentPriorityQueue<
int,
true,
false,
16,
16,
DeterministicMutex,
DeterministicAtomic>,
DeterministicAtomic>();
}