Commit 59ea1768 authored by Maged Michael, committed by Facebook Github Bot 4

Dynamic expansion of folly MPMC queue

Summary:
This diff allows queues to start with a small capacity and expand as needed up to the specified capacity.
The main additions and changes:
- Extra template parameter `Dynamic` that enables dynamic expansion (default `false`).
- `ClosedArray` type.
- Extra members:
  -- `dstate_`: a packed 64-bit unsigned integer that contains a seqlock (which implicitly indicates the number of expansions) and the lowest ticket for the current `dslots_/dcapacity_/dstride_` configuration.
  -- `dcapacity_`: current dynamic capacity.
  -- `dslots_`: current dynamic slots array (in anonymous union with `slots_`).
  -- `dstride_`: current dynamic stride (in anonymous union with `stride_`).
  -- `closed_`: a logarithmic-sized array of `ClosedArray` to hold information about earlier, smaller queue arrays for use by lagging consumers.

Design sketch:
- Reallocate a new, larger array on expansion.
- Expansion uses a seqlock. The common-case critical path includes a seqlock read-only section.
- Lagging consumers and lagging blocking producers use a logarithmic-sized array for info about closed arrays.
- Tickets are adjusted by an offset (to account for the tickets associated with earlier closed arrays) in order to calculate the appropriate index and turn.
- The synchronization of `pushTicket_` with the ticket offset packed in `dstate_` is tricky, because `pushTicket_` is accessed outside `dstate_`'s seqlock. See the packing sketch below.
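For reference, a minimal sketch of the `dstate_` packing, matching `getOffset()`, `getNumClosed()`, and `kSeqlockBits = 6` in the diff below:

  // dstate_ (64 bits):
  //   bit  0      seqlock lock bit (odd value => expansion in progress)
  //   bits 1..5   2 * number of closed arrays (when unlocked)
  //   bits 6..63  ticket offset of the current slots array
  //
  // E.g., after the first expansion, which closes the initial array and
  // records the new offset T: dstate_ == (T << 6) + 2.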

Reviewed By: djwatson

Differential Revision: D3462592

fbshipit-source-id: d442a7694190cca3c33753409ffac941d7463f83
parent 1cd90b1d
@@ -38,6 +38,9 @@ struct SingleElementQueue;
template <typename T> class MPMCPipelineStageImpl;
/// MPMCQueue base CRTP template
template <typename> class MPMCQueueBase;
} // namespace detail
/// MPMCQueue<T> is a high-performance bounded concurrent queue that
@@ -93,34 +96,536 @@ template <typename T> class MPMCPipelineStageImpl;
/// are you can enqueue one sentinel and then have each consumer requeue
/// two sentinels after it receives it (by requeuing 2 the shutdown can
/// complete in O(log P) time instead of O(P)).
template<typename T,
template<typename> class Atom = std::atomic>
class MPMCQueue : boost::noncopyable {
template<typename T, template<typename> class Atom = std::atomic,
bool Dynamic = false>
class MPMCQueue : public detail::MPMCQueueBase<MPMCQueue<T,Atom,Dynamic>> {
friend class detail::MPMCPipelineStageImpl<T>;
using Slot = detail::SingleElementQueue<T,Atom>;
public:
explicit MPMCQueue(size_t queueCapacity)
: detail::MPMCQueueBase<MPMCQueue<T,Atom,Dynamic>>(queueCapacity)
{
this->stride_ = this->computeStride(queueCapacity);
this->slots_ = new Slot[queueCapacity + 2 * this->kSlotPadding];
}
MPMCQueue() noexcept { }
};
/// The dynamic version of MPMCQueue allows dynamic expansion of queue
/// capacity, such that a queue may start with a smaller capacity than
/// specified and expand only if needed. Users may optionally specify
/// the initial capacity and the expansion multiplier.
///
/// The design uses a seqlock to enforce mutual exclusion among
/// expansion attempts. Regular operations read up-to-date queue
/// information (slots array, capacity, stride) inside read-only
/// seqlock sections, which are unimpeded when no expansion is in
/// progress.
///
/// An expansion computes a new capacity, allocates a new slots array,
/// and updates stride. No information needs to be copied from the
/// current slots array to the new one. When this happens, new slots
/// will not have sequence numbers that match ticket numbers. The
/// expansion needs to compute a ticket offset such that operations
/// that use the new array can adjust their slot index and sequence
/// number calculations to account for the new slots starting with
/// sequence numbers of zero. The current ticket offset is
/// packed with the seqlock in an atomic 64-bit integer. The initial
/// offset is zero.
///
/// Lagging write and read operations with tickets lower than the
/// ticket offset of the current slots array (i.e., the minimum ticket
/// number that can be served by the current array) must use earlier
/// closed arrays instead of the current one. Information about closed
/// slots arrays (array address, capacity, stride, and offset) is
/// maintained in a logarithmic-sized structure. Each entry in that
/// structure never needs to be changed once set. The number of closed
/// arrays is half the value of the seqlock (when unlocked).
///
/// The acquisition of the seqlock to perform an expansion does not
/// prevent the issuing of new push and pop tickets concurrently. The
/// expansion must set the new ticket offset to a value that couldn't
/// have been issued to an operation that has already gone through a
/// seqlock read-only section (and hence obtained information for
/// older closed arrays).
///
/// Note that the total queue capacity can temporarily exceed the
/// specified capacity when there are lagging consumers that haven't
/// yet consumed all the elements in closed arrays. Users should not
/// rely on the capacity of dynamic queues for synchronization, e.g.,
/// they should not expect that a thread will definitely block on a
/// call to blockingWrite() when the queue size is known to be equal
/// to its capacity.
///
/// The dynamic version is a partial specialization of MPMCQueue with
/// Dynamic == true
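///
/// A usage sketch (the capacity values are illustrative):
///
///   MPMCQueue<int, std::atomic, true> q(1000);         // grows up to 1000
///   MPMCQueue<int, std::atomic, true> q2(1000, 16, 4);  // start at 16, grow 4x
///   q.blockingWrite(7);
///   int v;
///   q.blockingRead(v);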
template <typename T, template<typename> class Atom>
class MPMCQueue<T,Atom,true> :
public detail::MPMCQueueBase<MPMCQueue<T,Atom,true>> {
friend class detail::MPMCQueueBase<MPMCQueue<T,Atom,true>>;
using Slot = detail::SingleElementQueue<T,Atom>;
struct ClosedArray {
uint64_t offset_ {0};
Slot* slots_ {nullptr};
size_t capacity_ {0};
int stride_ {0};
};
public:
explicit MPMCQueue(size_t queueCapacity)
: detail::MPMCQueueBase<MPMCQueue<T,Atom,true>>(queueCapacity)
{
size_t cap = std::min<size_t>(kDefaultMinDynamicCapacity, queueCapacity);
initQueue(cap, kDefaultExpansionMultiplier);
}
explicit MPMCQueue(size_t queueCapacity,
size_t minCapacity,
size_t expansionMultiplier)
: detail::MPMCQueueBase<MPMCQueue<T,Atom,true>>(queueCapacity)
{
minCapacity = std::max<size_t>(1, minCapacity);
size_t cap = std::min<size_t>(minCapacity, queueCapacity);
expansionMultiplier = std::max<size_t>(2, expansionMultiplier);
initQueue(cap, expansionMultiplier);
}
MPMCQueue() noexcept {
dmult_ = 0;
closed_ = nullptr;
}
MPMCQueue(MPMCQueue<T,Atom,true>&& rhs) noexcept {
this->capacity_ = rhs.capacity_;
this->slots_ = rhs.slots_;
this->stride_ = rhs.stride_;
this->dstate_.store(rhs.dstate_.load(std::memory_order_relaxed),
std::memory_order_relaxed);
this->dcapacity_.store(rhs.dcapacity_.load(std::memory_order_relaxed),
std::memory_order_relaxed);
this->pushTicket_.store(rhs.pushTicket_.load(std::memory_order_relaxed),
std::memory_order_relaxed);
this->popTicket_.store(rhs.popTicket_.load(std::memory_order_relaxed),
std::memory_order_relaxed);
this->pushSpinCutoff_.store(
rhs.pushSpinCutoff_.load(std::memory_order_relaxed),
std::memory_order_relaxed);
this->popSpinCutoff_.store(
rhs.popSpinCutoff_.load(std::memory_order_relaxed),
std::memory_order_relaxed);
dmult_ = rhs.dmult_;
closed_ = rhs.closed_;
rhs.capacity_ = 0;
rhs.slots_ = nullptr;
rhs.stride_ = 0;
rhs.dstate_.store(0, std::memory_order_relaxed);
rhs.dcapacity_.store(0, std::memory_order_relaxed);
rhs.pushTicket_.store(0, std::memory_order_relaxed);
rhs.popTicket_.store(0, std::memory_order_relaxed);
rhs.pushSpinCutoff_.store(0, std::memory_order_relaxed);
rhs.popSpinCutoff_.store(0, std::memory_order_relaxed);
rhs.dmult_ = 0;
rhs.closed_ = nullptr;
}
MPMCQueue<T,Atom, true> const& operator= (MPMCQueue<T,Atom, true>&& rhs) {
if (this != &rhs) {
this->~MPMCQueue();
new (this) MPMCQueue(std::move(rhs));
}
return *this;
}
~MPMCQueue() {
if (closed_ != nullptr) {
for (int i = getNumClosed(this->dstate_.load()) - 1; i >= 0; --i) {
delete[] closed_[i].slots_;
}
delete[] closed_;
}
}
size_t allocatedCapacity() const noexcept {
return this->dcapacity_.load(std::memory_order_relaxed);
}
template <typename ...Args>
void blockingWrite(Args&&... args) noexcept {
uint64_t ticket = this->pushTicket_++;
Slot* slots;
size_t cap;
int stride;
uint64_t state;
uint64_t offset;
do {
if (!trySeqlockReadSection(state, slots, cap, stride)) {
continue;
}
offset = getOffset(state);
if (ticket < offset) {
// There was an expansion after this ticket was issued.
updateFromClosed(state, ticket, offset, slots, cap, stride);
break;
}
if (slots[this->idx((ticket-offset), cap, stride)]
.mayEnqueue(this->turn(ticket-offset, cap))) {
// A slot is ready. No need to expand.
break;
} else if (this->popTicket_.load(std::memory_order_relaxed) + cap
> ticket) {
// May block, but a pop is in progress. No need to expand.
// Get seqlock read section info again in case an expansion
// occurred with an equal or higher ticket.
continue;
} else {
// May block. See if we can expand.
if (tryExpand(state, cap)) {
// This or another thread started an expansion. Get updated info.
continue;
} else {
// Can't expand.
break;
}
}
} while (true);
this->enqueueWithTicketBase(ticket-offset, slots, cap, stride,
std::forward<Args>(args)...);
}
void blockingReadWithTicket(uint64_t& ticket, T& elem) noexcept {
ticket = this->popTicket_++;
Slot* slots;
size_t cap;
int stride;
uint64_t state;
uint64_t offset;
while (!trySeqlockReadSection(state, slots, cap, stride));
offset = getOffset(state);
if (ticket < offset) {
// There was an expansion after the corresponding push ticket
// was issued.
updateFromClosed(state, ticket, offset, slots, cap, stride);
}
this->dequeueWithTicketBase(ticket-offset, slots, cap, stride, elem);
}
private:
enum {
kSeqlockBits = 6,
kDefaultMinDynamicCapacity = 10,
kDefaultExpansionMultiplier = 10,
};
size_t dmult_;
// Info about closed slots arrays for use by lagging operations
ClosedArray* closed_;
void initQueue(const size_t cap, const size_t mult) {
this->stride_ = this->computeStride(cap);
this->slots_ = new Slot[cap + 2 * this->kSlotPadding];
this->dstate_.store(0);
this->dcapacity_.store(cap);
dmult_ = mult;
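// Upper bound on the number of expansions, and hence on closed arrays.
// E.g., cap = 10, mult = 10, capacity_ = 10000 gives maxClosed = 3
// (10 -> 100 -> 1000 -> 10000).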
size_t maxClosed = 0;
for (size_t expanded = cap;
expanded < this->capacity_;
expanded *= mult) {
++maxClosed;
}
closed_ = (maxClosed > 0) ? new ClosedArray[maxClosed] : nullptr;
}
bool tryObtainReadyPushTicket(
uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
) noexcept {
uint64_t state;
do {
ticket = this->pushTicket_.load(std::memory_order_acquire); // A
if (!trySeqlockReadSection(state, slots, cap, stride)) {
continue;
}
uint64_t offset = getOffset(state);
if (ticket < offset) {
// There was an expansion with offset greater than this ticket
updateFromClosed(state, ticket, offset, slots, cap, stride);
}
if (slots[this->idx((ticket-offset), cap, stride)]
.mayEnqueue(this->turn(ticket-offset, cap))) {
// A slot is ready.
if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
// Adjust ticket
ticket -= offset;
return true;
} else {
continue;
}
} else {
if (ticket != this->pushTicket_.load(std::memory_order_relaxed)) { // B
// Try again. Ticket changed.
continue;
}
// Likely to block.
// Try to expand unless the ticket is for a closed array
if (offset == getOffset(state)) {
if (tryExpand(state, cap)) {
// This or another thread started an expansion. Get up-to-date info.
continue;
}
}
return false;
}
} while (true);
}
bool tryObtainPromisedPushTicket(
uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
) noexcept {
uint64_t state;
do {
ticket = this->pushTicket_.load(std::memory_order_acquire);
auto numPops = this->popTicket_.load(std::memory_order_acquire);
if (!trySeqlockReadSection(state, slots, cap, stride)) {
continue;
}
int64_t n = ticket - numPops;
if (n >= static_cast<ssize_t>(this->capacity_)) {
return false;
}
if ((n >= static_cast<ssize_t>(cap))) {
if (tryExpand(state, cap)) {
// This or another thread started an expansion. Start over
// with a new state.
continue;
} else {
// Can't expand.
return false;
}
}
uint64_t offset = getOffset(state);
if (ticket < offset) {
// There was an expansion with offset greater than this ticket
updateFromClosed(state, ticket, offset, slots, cap, stride);
}
if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
// Adjust ticket
ticket -= offset;
return true;
}
} while (true);
}
bool tryObtainReadyPopTicket(
uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
) noexcept {
uint64_t state;
do {
ticket = this->popTicket_.load(std::memory_order_relaxed);
if (!trySeqlockReadSection(state, slots, cap, stride)) {
continue;
}
uint64_t offset = getOffset(state);
if (ticket < offset) {
// There was an expansion after the corresponding push ticket
// was issued.
updateFromClosed(state, ticket, offset, slots, cap, stride);
}
if (slots[this->idx((ticket-offset), cap, stride)]
.mayDequeue(this->turn(ticket-offset, cap))) {
if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
// Adjust ticket
ticket -= offset;
return true;
}
} else {
return false;
}
} while (true);
}
bool tryObtainPromisedPopTicket(
uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
) noexcept {
uint64_t state;
do {
ticket = this->popTicket_.load(std::memory_order_acquire);
auto numPushes = this->pushTicket_.load(std::memory_order_acquire);
if (!trySeqlockReadSection(state, slots, cap, stride)) {
continue;
}
if (ticket >= numPushes) {
return false;
}
if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
uint64_t offset = getOffset(state);
if (ticket < offset) {
// There was an expansion after the corresponding push
// ticket was issued.
updateFromClosed(state, ticket, offset, slots, cap, stride);
}
// Adjust ticket
ticket -= offset;
return true;
}
} while (true);
}
/// Enqueues an element with a specific ticket number
template <typename ...Args>
void enqueueWithTicket(const uint64_t ticket, Args&&... args) noexcept {
Slot* slots;
size_t cap;
int stride;
uint64_t state;
uint64_t offset;
while (!trySeqlockReadSection(state, slots, cap, stride)) {}
offset = getOffset(state);
if (ticket < offset) {
// There was an expansion after this ticket was issued.
updateFromClosed(state, ticket, offset, slots, cap, stride);
}
this->enqueueWithTicketBase(ticket-offset, slots, cap, stride,
std::forward<Args>(args)...);
}
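// dstate_ packs, from low to high bits: the seqlock lock bit (bit 0),
// the number of closed arrays stored doubled (the remaining seqlock
// bits), and the ticket offset of the current slots array (bits
// kSeqlockBits and above).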
uint64_t getOffset(const uint64_t state) const noexcept {
return state >> kSeqlockBits;
}
int getNumClosed(const uint64_t state) const noexcept {
return (state & ((1 << kSeqlockBits) - 1)) >> 1;
}
/// Try to expand the queue. Returns true if this expansion was
/// successful or a concurrent expansion is in progress. Returns
/// false if the queue has reached its maximum capacity or
/// allocation has failed.
bool tryExpand(const uint64_t state, const size_t cap) noexcept {
if (cap == this->capacity_) {
return false;
}
// Acquire seqlock
uint64_t oldval = state;
assert((state & 1) == 0);
if (this->dstate_.compare_exchange_strong(oldval, state + 1)) {
assert(cap == this->dcapacity_.load());
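// The new offset must exceed every ticket issued so far, so that any
// operation that already read pre-expansion info inside a seqlock
// read-only section holds a ticket below the new offset and will be
// served by a closed array.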
uint64_t ticket = 1 + std::max(this->pushTicket_.load(),
this->popTicket_.load());
size_t newCapacity =
std::min(dmult_ * cap, this->capacity_);
Slot* newSlots =
new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding];
if (newSlots == nullptr) {
// Expansion failed. Restore the seqlock
this->dstate_.store(state);
return false;
}
// Successful expansion
// calculate the current ticket offset
uint64_t offset = getOffset(state);
// calculate index in closed array
int index = getNumClosed(state);
assert((index << 1) < (1 << kSeqlockBits));
// fill the info for the closed slots array
closed_[index].offset_ = offset;
closed_[index].slots_ = this->dslots_.load();
closed_[index].capacity_ = cap;
closed_[index].stride_ = this->dstride_.load();
// update the new slots array info
this->dslots_.store(newSlots);
this->dcapacity_.store(newCapacity);
this->dstride_.store(this->computeStride(newCapacity));
// Release the seqlock and record the new ticket offset
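// New dstate_: ticket offset = ticket; seqlock value = 2 * (index + 1),
// which is even (unlocked) and makes getNumClosed() return index + 1.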
this->dstate_.store((ticket << kSeqlockBits) + (2 * (index + 1)));
return true;
} else { // failed to acquire seqlock
// Someone acquired the seqlock. Go back to the caller and get
// up-to-date info.
return true;
}
}
/// Seqlock read-only section
bool trySeqlockReadSection(
uint64_t& state, Slot*& slots, size_t& cap, int& stride
) noexcept {
state = this->dstate_.load(std::memory_order_acquire);
if (state & 1) {
// Locked.
return false;
}
// Start read-only section.
slots = this->dslots_.load(std::memory_order_relaxed);
cap = this->dcapacity_.load(std::memory_order_relaxed);
stride = this->dstride_.load(std::memory_order_relaxed);
// End of read-only section. Validate seqlock.
std::atomic_thread_fence(std::memory_order_acquire);
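// If dstate_ changed since it was read above, an expansion may have
// overlapped the relaxed loads, so the values read cannot be trusted.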
return (state == this->dstate_.load(std::memory_order_relaxed));
}
/// Update local variables of a lagging operation using the
/// most recent closed array with offset <= ticket
void updateFromClosed(
const uint64_t state, const uint64_t ticket,
uint64_t& offset, Slot*& slots, size_t& cap, int& stride
) noexcept {
for (int i = getNumClosed(state) - 1; i >= 0; --i) {
offset = closed_[i].offset_;
if (offset <= ticket) {
slots = closed_[i].slots_;
cap = closed_[i].capacity_;
stride = closed_[i].stride_;
return;
}
}
// A closed array with offset <= ticket should have been found
assert(false);
}
};
namespace detail {
/// CRTP specialization of MPMCQueueBase
template<
template<
typename T, template<typename> class Atom, bool Dynamic> class Derived,
typename T, template<typename> class Atom, bool Dynamic>
class MPMCQueueBase<Derived<T, Atom, Dynamic>> : boost::noncopyable {
// Note: Using CRTP static casts in several functions of this base
// template instead of making called functions virtual or duplicating
// the code of calling functions in the derived partially specialized
// template
static_assert(std::is_nothrow_constructible<T,T&&>::value ||
folly::IsRelocatable<T>::value,
"T must be relocatable or have a noexcept move constructor");
friend class detail::MPMCPipelineStageImpl<T>;
public:
typedef T value_type;
explicit MPMCQueue(size_t queueCapacity)
using Slot = detail::SingleElementQueue<T,Atom>;
explicit MPMCQueueBase(size_t queueCapacity)
: capacity_(queueCapacity)
, pushTicket_(0)
, popTicket_(0)
, pushSpinCutoff_(0)
, popSpinCutoff_(0)
{
if (queueCapacity == 0)
if (queueCapacity == 0) {
throw std::invalid_argument(
"MPMCQueue with explicit capacity 0 is impossible"
// Stride computation in derived classes would sigfpe if capacity is 0
);
// would sigfpe if capacity is 0
stride_ = computeStride(queueCapacity);
slots_ = new detail::SingleElementQueue<T,Atom>[queueCapacity +
2 * kSlotPadding];
}
// ideally this would be a static assert, but g++ doesn't allow it
assert(alignof(MPMCQueue<T,Atom>)
@@ -132,10 +637,12 @@ class MPMCQueue : boost::noncopyable {
/// A default-constructed queue is useful because a usable (non-zero
/// capacity) queue can be moved onto it or swapped with it
MPMCQueue() noexcept
MPMCQueueBase() noexcept
: capacity_(0)
, slots_(nullptr)
, stride_(0)
, dstate_(0)
, dcapacity_(0)
, pushTicket_(0)
, popTicket_(0)
, pushSpinCutoff_(0)
@@ -145,10 +652,12 @@ class MPMCQueue : boost::noncopyable {
/// IMPORTANT: The move constructor is here to make it easier to perform
/// the initialization phase, it is not safe to use when there are any
/// concurrent accesses (this is not checked).
MPMCQueue(MPMCQueue<T,Atom>&& rhs) noexcept
MPMCQueueBase(MPMCQueueBase<Derived<T,Atom,Dynamic>>&& rhs) noexcept
: capacity_(rhs.capacity_)
, slots_(rhs.slots_)
, stride_(rhs.stride_)
, dstate_(rhs.dstate_.load(std::memory_order_relaxed))
, dcapacity_(rhs.dcapacity_.load(std::memory_order_relaxed))
, pushTicket_(rhs.pushTicket_.load(std::memory_order_relaxed))
, popTicket_(rhs.popTicket_.load(std::memory_order_relaxed))
, pushSpinCutoff_(rhs.pushSpinCutoff_.load(std::memory_order_relaxed))
@@ -161,6 +670,8 @@ class MPMCQueue : boost::noncopyable {
rhs.capacity_ = 0;
rhs.slots_ = nullptr;
rhs.stride_ = 0;
rhs.dstate_.store(0, std::memory_order_relaxed);
rhs.dcapacity_.store(0, std::memory_order_relaxed);
rhs.pushTicket_.store(0, std::memory_order_relaxed);
rhs.popTicket_.store(0, std::memory_order_relaxed);
rhs.pushSpinCutoff_.store(0, std::memory_order_relaxed);
@@ -170,17 +681,18 @@ class MPMCQueue : boost::noncopyable {
/// IMPORTANT: The move operator is here to make it easier to perform
/// the initialization phase, it is not safe to use when there are any
/// concurrent accesses (this is not checked).
MPMCQueue<T,Atom> const& operator= (MPMCQueue<T,Atom>&& rhs) {
MPMCQueueBase<Derived<T,Atom,Dynamic>> const& operator=
(MPMCQueueBase<Derived<T,Atom,Dynamic>>&& rhs) {
if (this != &rhs) {
this->~MPMCQueue();
new (this) MPMCQueue(std::move(rhs));
this->~MPMCQueueBase();
new (this) MPMCQueueBase(std::move(rhs));
}
return *this;
}
/// MPMCQueue can only be safely destroyed when there are no
/// pending enqueuers or dequeuers (this is not checked).
~MPMCQueue() {
~MPMCQueueBase() {
delete[] slots_;
}
@@ -235,6 +747,11 @@ class MPMCQueue : boost::noncopyable {
return capacity_;
}
/// Doesn't change for non-dynamic
size_t allocatedCapacity() const noexcept {
return capacity_;
}
/// Returns the total number of calls to blockingWrite or successful
/// calls to write, including those blockingWrite calls that are
/// currently blocking
@@ -256,7 +773,8 @@ class MPMCQueue : boost::noncopyable {
/// to a T constructor.
template <typename ...Args>
void blockingWrite(Args&&... args) noexcept {
enqueueWithTicket(pushTicket_++, std::forward<Args>(args)...);
enqueueWithTicketBase(pushTicket_++, slots_, capacity_, stride_,
std::forward<Args>(args)...);
}
/// If an item can be enqueued with no blocking, does so and returns
@@ -275,9 +793,14 @@ class MPMCQueue : boost::noncopyable {
template <typename ...Args>
bool write(Args&&... args) noexcept {
uint64_t ticket;
if (tryObtainReadyPushTicket(ticket)) {
Slot* slots;
size_t cap;
int stride;
if (static_cast<Derived<T,Atom,Dynamic>*>(this)->
tryObtainReadyPushTicket(ticket, slots, cap, stride)) {
// we have pre-validated that the ticket won't block
enqueueWithTicket(ticket, std::forward<Args>(args)...);
enqueueWithTicketBase(ticket, slots, cap, stride,
std::forward<Args>(args)...);
return true;
} else {
return false;
@@ -288,11 +811,15 @@ class MPMCQueue : boost::noncopyable {
bool tryWriteUntil(const std::chrono::time_point<Clock>& when,
Args&&... args) noexcept {
uint64_t ticket;
if (tryObtainPromisedPushTicketUntil(ticket, when)) {
Slot* slots;
size_t cap;
int stride;
if (tryObtainPromisedPushTicketUntil(ticket, slots, cap, stride, when)) {
// we have pre-validated that the ticket won't block, or rather that
// it won't block longer than it takes another thread to dequeue an
// element from the slot it identifies.
enqueueWithTicket(ticket, std::forward<Args>(args)...);
enqueueWithTicketBase(ticket, slots, cap, stride,
std::forward<Args>(args)...);
return true;
} else {
return false;
@@ -315,10 +842,15 @@ class MPMCQueue : boost::noncopyable {
template <typename ...Args>
bool writeIfNotFull(Args&&... args) noexcept {
uint64_t ticket;
if (tryObtainPromisedPushTicket(ticket)) {
Slot* slots;
size_t cap;
int stride;
if (static_cast<Derived<T,Atom,Dynamic>*>(this)->
tryObtainPromisedPushTicket(ticket, slots, cap, stride)) {
// some other thread is already dequeuing the slot into which we
// are going to enqueue, but we might have to wait for them to finish
enqueueWithTicket(ticket, std::forward<Args>(args)...);
enqueueWithTicketBase(ticket, slots, cap, stride,
std::forward<Args>(args)...);
return true;
} else {
return false;
@@ -328,16 +860,33 @@ class MPMCQueue : boost::noncopyable {
/// Moves a dequeued element onto elem, blocking until an element
/// is available
void blockingRead(T& elem) noexcept {
dequeueWithTicket(popTicket_++, elem);
uint64_t ticket;
static_cast<Derived<T,Atom,Dynamic>*>(this)->
blockingReadWithTicket(ticket, elem);
}
/// Same as blockingRead() but also records the ticket number
void blockingReadWithTicket(uint64_t& ticket, T& elem) noexcept {
ticket = popTicket_++;
dequeueWithTicketBase(ticket, slots_, capacity_, stride_, elem);
}
/// If an item can be dequeued with no blocking, does so and returns
/// true, otherwise returns false.
bool read(T& elem) noexcept {
uint64_t ticket;
if (tryObtainReadyPopTicket(ticket)) {
return readAndGetTicket(ticket, elem);
}
/// Same as read() but also records the ticket number
bool readAndGetTicket(uint64_t& ticket, T& elem) noexcept {
Slot* slots;
size_t cap;
int stride;
if (static_cast<Derived<T,Atom,Dynamic>*>(this)->
tryObtainReadyPopTicket(ticket, slots, cap, stride)) {
// the ticket has been pre-validated to not block
dequeueWithTicket(ticket, elem);
dequeueWithTicketBase(ticket, slots, cap, stride, elem);
return true;
} else {
return false;
@@ -351,16 +900,20 @@ class MPMCQueue : boost::noncopyable {
/// prefer read.
bool readIfNotEmpty(T& elem) noexcept {
uint64_t ticket;
if (tryObtainPromisedPopTicket(ticket)) {
Slot* slots;
size_t cap;
int stride;
if (static_cast<Derived<T,Atom,Dynamic>*>(this)->
tryObtainPromisedPopTicket(ticket, slots, cap, stride)) {
// the matching enqueue already has a ticket, but might not be done
dequeueWithTicket(ticket, elem);
dequeueWithTicketBase(ticket, slots, cap, stride, elem);
return true;
} else {
return false;
}
}
private:
protected:
enum {
/// Once every kAdaptationFreq we will spin longer, to try to estimate
/// the proper spin backoff
@@ -370,21 +923,41 @@ class MPMCQueue : boost::noncopyable {
/// allocations, we pad it with this many SingleElementQueue-s at
/// each end
kSlotPadding = (detail::CacheLocality::kFalseSharingRange - 1)
/ sizeof(detail::SingleElementQueue<T,Atom>) + 1
/ sizeof(Slot) + 1
};
/// The maximum number of items in the queue at once
size_t FOLLY_ALIGN_TO_AVOID_FALSE_SHARING capacity_;
/// Anonymous union for use when Dynamic = false and true, respectively
union {
/// An array of capacity_ SingleElementQueue-s, each of which holds
/// either 0 or 1 item. We over-allocate by 2 * kSlotPadding and don't
/// touch the slots at either end, to avoid false sharing
detail::SingleElementQueue<T,Atom>* slots_;
Slot* slots_;
/// Current dynamic slots array of dcapacity_ SingleElementQueue-s
Atom<Slot*> dslots_;
};
/// Anonymous union for use when Dynamic = false and true, respectively
union {
/// The number of slots_ indices that we advance for each ticket, to
/// avoid false sharing. Ideally slots_[i] and slots_[i + stride_]
/// aren't on the same cache line
int stride_;
/// Current stride
Atom<int> dstride_;
};
/// The following two members are used by dynamic MPMCQueue.
/// Ideally they should be in MPMCQueue<T,Atom,true>, but we get
/// better cache locality if they are in the same cache line as
/// dslots_ and dstride_.
///
/// Dynamic state. A packed seqlock and ticket offset
Atom<uint64_t> dstate_;
/// Dynamic capacity
Atom<size_t> dcapacity_;
/// Enqueuers get tickets from here
Atom<uint64_t> FOLLY_ALIGN_TO_AVOID_FALSE_SHARING pushTicket_;
@@ -405,7 +978,6 @@ class MPMCQueue : boost::noncopyable {
char padding_[detail::CacheLocality::kFalseSharingRange -
sizeof(Atom<uint32_t>)];
/// We assign tickets in increasing order, but we don't want to
/// access neighboring elements of slots_ because that will lead to
/// false sharing (multiple cores accessing the same cache line even
@@ -446,23 +1018,29 @@ class MPMCQueue : boost::noncopyable {
/// Returns the index into slots_ that should be used when enqueuing or
/// dequeuing with the specified ticket
size_t idx(uint64_t ticket) noexcept {
return ((ticket * stride_) % capacity_) + kSlotPadding;
size_t idx(uint64_t ticket, size_t cap, int stride) noexcept {
return ((ticket * stride) % cap) + kSlotPadding;
}
/// Maps an enqueue or dequeue ticket to the turn that should be used at the
/// corresponding SingleElementQueue
uint32_t turn(uint64_t ticket) noexcept {
return ticket / capacity_;
uint32_t turn(uint64_t ticket, size_t cap) noexcept {
return ticket / cap;
}
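// Example with illustrative values cap = 10 and stride = 3: ticket 14
// maps to index ((14 * 3) % 10) + kSlotPadding = 2 + kSlotPadding and
// to turn 14 / 10 = 1.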
/// Tries to obtain a push ticket for which SingleElementQueue::enqueue
/// won't block. Returns true on immediate success, false on immediate
/// failure.
bool tryObtainReadyPushTicket(uint64_t& rv) noexcept {
auto ticket = pushTicket_.load(std::memory_order_acquire); // A
bool tryObtainReadyPushTicket(
uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
) noexcept {
ticket = pushTicket_.load(std::memory_order_acquire); // A
slots = slots_;
cap = capacity_;
stride = stride_;
while (true) {
if (!slots_[idx(ticket)].mayEnqueue(turn(ticket))) {
if (!slots[idx(ticket, cap, stride)]
.mayEnqueue(turn(ticket, cap))) {
// if we call enqueue(ticket, ...) on the SingleElementQueue
// right now it would block, but this might no longer be the next
// ticket. We can increase the chance of tryEnqueue success under
@@ -479,7 +1057,6 @@ class MPMCQueue : boost::noncopyable {
// or prev failing CAS) and the following CAS. If the CAS fails
// it will effect a load of pushTicket_
if (pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
rv = ticket;
return true;
}
}
@@ -492,18 +1069,22 @@ class MPMCQueue : boost::noncopyable {
/// ticket is filled on success AND failure.
template <class Clock>
bool tryObtainPromisedPushTicketUntil(
uint64_t& ticket, const std::chrono::time_point<Clock>& when) noexcept {
uint64_t& ticket, Slot*& slots, size_t& cap, int& stride,
const std::chrono::time_point<Clock>& when
) noexcept {
bool deadlineReached = false;
while (!deadlineReached) {
if (tryObtainPromisedPushTicket(ticket)) {
if (static_cast<Derived<T,Atom,Dynamic>*>(this)->
tryObtainPromisedPushTicket(ticket, slots, cap, stride)) {
return true;
}
// ticket is a blocking ticket until the preceding ticket has been
// processed: wait until this ticket's turn arrives. We have not reserved
// this ticket so we will have to re-attempt to get a non-blocking ticket
// if we wake up before we time out.
deadlineReached = !slots_[idx(ticket)].tryWaitForEnqueueTurnUntil(
turn(ticket), pushSpinCutoff_, (ticket % kAdaptationFreq) == 0, when);
deadlineReached = !slots[idx(ticket, cap, stride)]
.tryWaitForEnqueueTurnUntil(turn(ticket, cap), pushSpinCutoff_,
(ticket % kAdaptationFreq) == 0, when);
}
return false;
}
@@ -513,13 +1094,18 @@ class MPMCQueue : boost::noncopyable {
/// blocking may be required when using the returned ticket if some
/// other thread's pop is still in progress (ticket has been granted but
/// pop has not yet completed).
bool tryObtainPromisedPushTicket(uint64_t& rv) noexcept {
bool tryObtainPromisedPushTicket(
uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
) noexcept {
auto numPushes = pushTicket_.load(std::memory_order_acquire); // A
slots = slots_;
cap = capacity_;
stride = stride_;
while (true) {
auto numPops = popTicket_.load(std::memory_order_acquire); // B
// n will be negative if pops are pending
int64_t n = numPushes - numPops;
rv = numPushes;
ticket = numPushes;
if (n >= static_cast<ssize_t>(capacity_)) {
// Full, linearize at B. We don't need to recheck the read we
// performed at A, because if numPushes was stale at B then the
@@ -535,10 +1121,16 @@ class MPMCQueue : boost::noncopyable {
/// Tries to obtain a pop ticket for which SingleElementQueue::dequeue
/// won't block. Returns true on immediate success, false on immediate
/// failure.
bool tryObtainReadyPopTicket(uint64_t& rv) noexcept {
auto ticket = popTicket_.load(std::memory_order_acquire);
bool tryObtainReadyPopTicket(
uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
) noexcept {
ticket = popTicket_.load(std::memory_order_acquire);
slots = slots_;
cap = capacity_;
stride = stride_;
while (true) {
if (!slots_[idx(ticket)].mayDequeue(turn(ticket))) {
if (!slots[idx(ticket, cap, stride)]
.mayDequeue(turn(ticket, cap))) {
auto prev = ticket;
ticket = popTicket_.load(std::memory_order_acquire);
if (prev == ticket) {
@@ -546,7 +1138,6 @@ class MPMCQueue : boost::noncopyable {
}
} else {
if (popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
rv = ticket;
return true;
}
}
@@ -563,7 +1154,9 @@ class MPMCQueue : boost::noncopyable {
/// to block waiting for someone to call enqueue, although we might
/// have to block waiting for them to finish executing code inside the
/// MPMCQueue itself.
bool tryObtainPromisedPopTicket(uint64_t& rv) noexcept {
bool tryObtainPromisedPopTicket(
uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
) noexcept {
auto numPops = popTicket_.load(std::memory_order_acquire); // A
while (true) {
auto numPushes = pushTicket_.load(std::memory_order_acquire); // B
@@ -574,7 +1167,10 @@ class MPMCQueue : boost::noncopyable {
return false;
}
if (popTicket_.compare_exchange_strong(numPops, numPops + 1)) {
rv = numPops;
ticket = numPops;
slots = slots_;
cap = capacity_;
stride = stride_;
return true;
}
}
@@ -582,25 +1178,35 @@ class MPMCQueue : boost::noncopyable {
// Given a ticket, constructs an enqueued item using args
template <typename ...Args>
void enqueueWithTicket(uint64_t ticket, Args&&... args) noexcept {
slots_[idx(ticket)].enqueue(turn(ticket),
void enqueueWithTicketBase(
uint64_t ticket, Slot* slots, size_t cap, int stride, Args&&... args
) noexcept {
slots[idx(ticket, cap, stride)]
.enqueue(turn(ticket, cap),
pushSpinCutoff_,
(ticket % kAdaptationFreq) == 0,
std::forward<Args>(args)...);
}
// To support tracking ticket numbers in MPMCPipelineStageImpl
template <typename ...Args>
void enqueueWithTicket(uint64_t ticket, Args&&... args) noexcept {
enqueueWithTicketBase(ticket, slots_, capacity_, stride_,
std::forward<Args>(args)...);
}
// Given a ticket, dequeues the corresponding element
void dequeueWithTicket(uint64_t ticket, T& elem) noexcept {
slots_[idx(ticket)].dequeue(turn(ticket),
void dequeueWithTicketBase(
uint64_t ticket, Slot* slots, size_t cap, int stride, T& elem
) noexcept {
slots[idx(ticket, cap, stride)]
.dequeue(turn(ticket, cap),
popSpinCutoff_,
(ticket % kAdaptationFreq) == 0,
elem);
}
};
namespace detail {
/// SingleElementQueue implements a blocking queue that holds at most one
/// item, and that requires its users to assign incrementing identifiers
/// (turns) to each enqueue and dequeue operation. Note that the turns
@@ -76,8 +76,8 @@ class MPMCPipelineStageImpl {
}
uint64_t blockingRead(T& elem) noexcept {
uint64_t ticket = queue_.popTicket_++;
queue_.dequeueWithTicket(ticket, elem);
uint64_t ticket;
queue_.blockingReadWithTicket(ticket, elem);
return ticket;
}
@@ -87,12 +87,7 @@ class MPMCPipelineStageImpl {
template <class... Args>
bool readAndGetTicket(uint64_t& ticket, T& elem) noexcept {
if (queue_.tryObtainReadyPopTicket(ticket)) {
queue_.dequeueWithTicket(ticket, elem);
return true;
} else {
return false;
}
return queue_.readAndGetTicket(ticket, elem);
}
// See MPMCQueue<T>::writeCount; only works for the first stage
@@ -101,9 +101,9 @@ TEST(MPMCQueue, sequencer_deterministic) {
run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
}
template <typename T>
template <bool Dynamic = false, typename T>
void runElementTypeTest(T&& src) {
MPMCQueue<T> cq(10);
MPMCQueue<T, std::atomic, Dynamic> cq(10);
cq.blockingWrite(std::forward<T>(src));
T dest;
cq.blockingRead(dest);
@@ -153,7 +153,21 @@ TEST(MPMCQueue, lots_of_element_types) {
EXPECT_EQ(RefCounted::active_instances, 0);
}
TEST(MPMCQueue, lots_of_element_types_dynamic) {
runElementTypeTest<true>(10);
runElementTypeTest<true>(string("abc"));
runElementTypeTest<true>(std::make_pair(10, string("def")));
runElementTypeTest<true>(vector<string>{{"abc"}});
runElementTypeTest<true>(std::make_shared<char>('a'));
runElementTypeTest<true>(folly::make_unique<char>('a'));
runElementTypeTest<true>(boost::intrusive_ptr<RefCounted>(new RefCounted));
EXPECT_EQ(RefCounted::active_instances, 0);
}
TEST(MPMCQueue, single_thread_enqdeq) {
// Non-dynamic version only.
// False positive for dynamic version. Capacity can be temporarily
// higher than specified.
MPMCQueue<int> cq(10);
for (int pass = 0; pass < 10; ++pass) {
@@ -184,6 +198,9 @@ TEST(MPMCQueue, single_thread_enqdeq) {
}
TEST(MPMCQueue, tryenq_capacity_test) {
// Non-dynamic version only.
// False positive for dynamic version. Capacity can be temporarily
// higher than specified.
for (size_t cap = 1; cap < 100; ++cap) {
MPMCQueue<int> cq(cap);
for (size_t i = 0; i < cap; ++i) {
@@ -194,6 +211,9 @@ TEST(MPMCQueue, tryenq_capacity_test) {
}
TEST(MPMCQueue, enq_capacity_test) {
// Non-dynamic version only.
// False positive for dynamic version. Capacity can be temporarily
// higher than specified.
for (auto cap : { 1, 100, 10000 }) {
MPMCQueue<int> cq(cap);
for (int i = 0; i < cap; ++i) {
@@ -214,11 +234,11 @@ TEST(MPMCQueue, enq_capacity_test) {
}
}
template <template<typename> class Atom>
template <template<typename> class Atom, bool Dynamic = false>
void runTryEnqDeqThread(
int numThreads,
int n, /*numOps*/
MPMCQueue<int, Atom>& cq,
MPMCQueue<int, Atom, Dynamic>& cq,
std::atomic<uint64_t>& sum,
int t) {
uint64_t threadSum = 0;
@@ -241,18 +261,18 @@ void runTryEnqDeqThread(
sum += threadSum;
}
template <template<typename> class Atom>
template <template<typename> class Atom, bool Dynamic = false>
void runTryEnqDeqTest(int numThreads, int numOps) {
// write and read aren't linearizable, so we don't have
// hard guarantees on their individual behavior. We can still test
// correctness in aggregate
MPMCQueue<int,Atom> cq(numThreads);
MPMCQueue<int,Atom, Dynamic> cq(numThreads);
uint64_t n = numOps;
vector<std::thread> threads(numThreads);
std::atomic<uint64_t> sum(0);
for (int t = 0; t < numThreads; ++t) {
threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom>,
threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom, Dynamic>,
numThreads, n, std::ref(cq), std::ref(sum), t));
}
for (auto& t : threads) {
@@ -271,6 +291,15 @@ TEST(MPMCQueue, mt_try_enq_deq) {
}
}
TEST(MPMCQueue, mt_try_enq_deq_dynamic) {
int nts[] = { 1, 3, 100 };
int n = 100000;
for (int nt : nts) {
runTryEnqDeqTest<std::atomic, /* Dynamic = */ true>(nt, n);
}
}
TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
int nts[] = { 1, 3, 100 };
@@ -280,6 +309,15 @@ TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
}
}
TEST(MPMCQueue, mt_try_enq_deq_emulated_futex_dynamic) {
int nts[] = { 1, 3, 100 };
int n = 100000;
for (int nt : nts) {
runTryEnqDeqTest<EmulatedFutexAtomic, /* Dynamic = */ true>(nt, n);
}
}
TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
int nts[] = { 3, 10 };
@@ -296,6 +334,14 @@ TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
DSched sched(DSched::uniformSubset(seed, 2));
runTryEnqDeqTest<DeterministicAtomic>(nt, n);
}
{
DSched sched(DSched::uniform(seed));
runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
}
{
DSched sched(DSched::uniformSubset(seed, 2));
runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
}
}
}
@@ -414,10 +460,11 @@ string producerConsumerBench(Q&& queue,
long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
(beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
uint64_t failures = failed;
size_t allocated = q.allocatedCapacity();
return folly::sformat(
"{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} "
"handoff, {} failures",
"handoff, {} failures, {} allocated",
qName,
numProducers,
writer.methodName(),
@@ -425,59 +472,76 @@ string producerConsumerBench(Q&& queue,
nanosPer,
csw,
n,
failures);
failures,
allocated);
}
TEST(MPMCQueue, mt_prod_cons_deterministic) {
template <bool Dynamic = false>
void runMtProdConsDeterministic(long seed) {
// we use the Bench method, but perf results are meaningless under DSched
DSched sched(DSched::uniform(0));
DSched sched(DSched::uniform(seed));
vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic>>>>
callers;
callers.emplace_back(
make_unique<BlockingWriteCaller<MPMCQueue<int, DeterministicAtomic>>>());
callers.emplace_back(
make_unique<WriteIfNotFullCaller<MPMCQueue<int, DeterministicAtomic>>>());
callers.emplace_back(
make_unique<WriteCaller<MPMCQueue<int, DeterministicAtomic>>>());
callers.emplace_back(
make_unique<TryWriteUntilCaller<MPMCQueue<int, DeterministicAtomic>>>(
milliseconds(1)));
callers.emplace_back(
make_unique<TryWriteUntilCaller<MPMCQueue<int, DeterministicAtomic>>>(
seconds(2)));
vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic,
Dynamic>>>> callers;
callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
DeterministicAtomic, Dynamic>>>());
callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
DeterministicAtomic, Dynamic>>>());
callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
DeterministicAtomic, Dynamic>>>());
callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
DeterministicAtomic, Dynamic>>>(milliseconds(1)));
callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
DeterministicAtomic, Dynamic>>>(seconds(2)));
size_t cap;
for (const auto& caller : callers) {
LOG(INFO)
<< producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(10),
"MPMCQueue<int, DeterministicAtomic>(10)",
cap = 10;
LOG(INFO) <<
producerConsumerBench(
MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
"MPMCQueue<int, DeterministicAtomic, Dynamic>("
+ folly::to<std::string>(cap)+")",
1,
1,
1000,
*caller);
LOG(INFO)
<< producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(100),
"MPMCQueue<int, DeterministicAtomic>(100)",
cap = 100;
LOG(INFO) <<
producerConsumerBench(
MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
"MPMCQueue<int, DeterministicAtomic, Dynamic>("
+ folly::to<std::string>(cap)+")",
10,
10,
1000,
*caller);
LOG(INFO)
<< producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(10),
"MPMCQueue<int, DeterministicAtomic>(10)",
cap = 10;
LOG(INFO) <<
producerConsumerBench(
MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
"MPMCQueue<int, DeterministicAtomic, Dynamic>("
+ folly::to<std::string>(cap)+")",
1,
1,
1000,
*caller);
LOG(INFO)
<< producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(100),
"MPMCQueue<int, DeterministicAtomic>(100)",
cap = 100;
LOG(INFO) <<
producerConsumerBench(
MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
"MPMCQueue<int, DeterministicAtomic, Dynamic>("
+ folly::to<std::string>(cap)+")",
10,
10,
1000,
*caller);
LOG(INFO) << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(1),
"MPMCQueue<int, DeterministicAtomic>(1)",
cap = 1;
LOG(INFO) <<
producerConsumerBench(
MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
"MPMCQueue<int, DeterministicAtomic, Dynamic>("
+ folly::to<std::string>(cap)+")",
10,
10,
1000,
@@ -485,74 +549,177 @@ TEST(MPMCQueue, mt_prod_cons_deterministic) {
}
}
void runMtProdConsDeterministicDynamic(
long seed,
uint32_t prods,
uint32_t cons,
uint32_t numOps,
size_t cap,
size_t minCap,
size_t mult
) {
// we use the Bench method, but perf results are meaningless under DSched
DSched sched(DSched::uniform(seed));
vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic,
true>>>> callers;
callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
DeterministicAtomic, true>>>());
callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
DeterministicAtomic, true>>>());
callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
DeterministicAtomic, true>>>());
callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
DeterministicAtomic, true>>>(milliseconds(1)));
callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
DeterministicAtomic, true>>>(seconds(2)));
for (const auto& caller : callers) {
LOG(INFO) <<
producerConsumerBench(
MPMCQueue<int, DeterministicAtomic, true>(cap, minCap, mult),
"MPMCQueue<int, DeterministicAtomic, true>("
+ folly::to<std::string>(cap) + ", "
+ folly::to<std::string>(minCap) + ", "
+ folly::to<std::string>(mult)+")",
prods,
cons,
numOps,
*caller);
}
}
TEST(MPMCQueue, mt_prod_cons_deterministic) {
runMtProdConsDeterministic(0);
}
TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic) {
runMtProdConsDeterministic<true>(0);
}
template <typename T>
void setFromEnv(T& var, const char* envvar) {
char* str = std::getenv(envvar);
if (str) { var = atoi(str); }
}
TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic_with_arguments) {
long seed = 0;
uint32_t prods = 10;
uint32_t cons = 10;
uint32_t numOps = 1000;
size_t cap = 10000;
size_t minCap = 9;
size_t mult = 3;
setFromEnv(seed, "SEED");
setFromEnv(prods, "PRODS");
setFromEnv(cons, "CONS");
setFromEnv(numOps, "NUM_OPS");
setFromEnv(cap, "CAP");
setFromEnv(minCap, "MIN_CAP");
setFromEnv(mult, "MULT");
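// Hypothetical invocation overriding the defaults via the environment
// (the test binary name is illustrative):
//   MIN_CAP=1000 MULT=2 NUM_OPS=10000 ./mpmc_queue_test \
//     --gtest_filter=MPMCQueue.mt_prod_cons_deterministic_dynamic_with_arguments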
runMtProdConsDeterministicDynamic(
seed, prods, cons, numOps, cap, minCap, mult);
}
#define PC_BENCH(q, np, nc, ...) \
producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
TEST(MPMCQueue, mt_prod_cons) {
template <bool Dynamic = false>
void runMtProdCons() {
int n = 100000;
vector<unique_ptr<WriteMethodCaller<MPMCQueue<int>>>> callers;
callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int>>>());
callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int>>>());
callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int>>>());
callers.emplace_back(
make_unique<TryWriteUntilCaller<MPMCQueue<int>>>(milliseconds(1)));
callers.emplace_back(
make_unique<TryWriteUntilCaller<MPMCQueue<int>>>(seconds(2)));
setFromEnv(n, "NUM_OPS");
vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, std::atomic, Dynamic>>>>
callers;
callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
std::atomic, Dynamic>>>());
callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
std::atomic, Dynamic>>>());
callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int, std::atomic,
Dynamic>>>());
callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
std::atomic, Dynamic>>>(milliseconds(1)));
callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
std::atomic, Dynamic>>>(seconds(2)));
for (const auto& caller : callers) {
LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 1, n, *caller);
LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 1, n, *caller);
LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 10, n, *caller);
LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 10, n, *caller);
LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 1, n, *caller);
LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 1, n, *caller);
LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 10, n, *caller);
LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 10, n, *caller);
LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n, *caller);
LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
1, 1, n, *caller);
LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
10, 1, n, *caller);
LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
1, 10, n, *caller);
LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
10, 10, n, *caller);
LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
1, 1, n, *caller);
LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
10, 1, n, *caller);
LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
1, 10, n, *caller);
LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
10, 10, n, *caller);
LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(100000)),
32, 100, n, *caller);
}
}
TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
TEST(MPMCQueue, mt_prod_cons) {
runMtProdCons();
}
TEST(MPMCQueue, mt_prod_cons_dynamic) {
runMtProdCons</* Dynamic = */ true>();
}
template <bool Dynamic = false>
void runMtProdConsEmulatedFutex() {
int n = 100000;
vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, EmulatedFutexAtomic>>>>
callers;
callers.emplace_back(
make_unique<BlockingWriteCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
callers.emplace_back(
make_unique<WriteIfNotFullCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
callers.emplace_back(
make_unique<WriteCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
callers.emplace_back(
make_unique<TryWriteUntilCaller<MPMCQueue<int, EmulatedFutexAtomic>>>(
milliseconds(1)));
callers.emplace_back(
make_unique<TryWriteUntilCaller<MPMCQueue<int, EmulatedFutexAtomic>>>(
seconds(2)));
vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, EmulatedFutexAtomic,
Dynamic>>>> callers;
callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
EmulatedFutexAtomic, Dynamic>>>());
callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
EmulatedFutexAtomic, Dynamic>>>());
callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
EmulatedFutexAtomic, Dynamic>>>());
callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
EmulatedFutexAtomic, Dynamic>>>(milliseconds(1)));
callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
EmulatedFutexAtomic, Dynamic>>>(seconds(2)));
for (const auto& caller : callers) {
LOG(INFO) << PC_BENCH(
(MPMCQueue<int, EmulatedFutexAtomic>(10)), 1, 1, n, *caller);
LOG(INFO) << PC_BENCH(
(MPMCQueue<int, EmulatedFutexAtomic>(10)), 10, 1, n, *caller);
(MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 1, 1, n, *caller);
LOG(INFO) << PC_BENCH(
(MPMCQueue<int, EmulatedFutexAtomic>(10)), 1, 10, n, *caller);
(MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 10, 1, n, *caller);
LOG(INFO) << PC_BENCH(
(MPMCQueue<int, EmulatedFutexAtomic>(10)), 10, 10, n, *caller);
(MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 1, 10, n, *caller);
LOG(INFO) << PC_BENCH(
(MPMCQueue<int, EmulatedFutexAtomic>(10000)), 1, 1, n, *caller);
(MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 10, 10, n, *caller);
LOG(INFO) << PC_BENCH(
(MPMCQueue<int, EmulatedFutexAtomic>(10000)), 10, 1, n, *caller);
(MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 1, 1, n, *caller);
LOG(INFO) << PC_BENCH(
(MPMCQueue<int, EmulatedFutexAtomic>(10000)), 1, 10, n, *caller);
(MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 10, 1, n, *caller);
LOG(INFO) << PC_BENCH(
(MPMCQueue<int, EmulatedFutexAtomic>(10000)), 10, 10, n, *caller);
LOG(INFO) << PC_BENCH(
(MPMCQueue<int, EmulatedFutexAtomic>(100000)), 32, 100, n, *caller);
(MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 1, 10, n, *caller);
LOG(INFO) << PC_BENCH((MPMCQueue<int, EmulatedFutexAtomic, Dynamic>
(10000)), 10, 10, n, *caller);
LOG(INFO) << PC_BENCH((MPMCQueue<int, EmulatedFutexAtomic, Dynamic>
(100000)), 32, 100, n, *caller);
}
}
template <template <typename> class Atom>
TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
runMtProdConsEmulatedFutex();
}
TEST(MPMCQueue, mt_prod_cons_emulated_futex_dynamic) {
runMtProdConsEmulatedFutex</* Dynamic = */ true>();
}
template <template <typename> class Atom, bool Dynamic = false>
void runNeverFailThread(int numThreads,
int n, /*numOps*/
MPMCQueue<int, Atom>& cq,
MPMCQueue<int, Atom, Dynamic>& cq,
std::atomic<uint64_t>& sum,
int t) {
uint64_t threadSum = 0;
@@ -568,10 +735,10 @@ void runNeverFailThread(int numThreads,
sum += threadSum;
}
template <template <typename> class Atom>
template <template <typename> class Atom, bool Dynamic = false>
uint64_t runNeverFailTest(int numThreads, int numOps) {
// always #enq >= #deq
MPMCQueue<int, Atom> cq(numThreads);
MPMCQueue<int, Atom, Dynamic> cq(numThreads);
uint64_t n = numOps;
auto beginMicro = nowMicro();
@@ -579,7 +746,7 @@ uint64_t runNeverFailTest(int numThreads, int numOps) {
vector<std::thread> threads(numThreads);
std::atomic<uint64_t> sum(0);
for (int t = 0; t < numThreads; ++t) {
threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom>,
threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom, Dynamic>,
numThreads,
n,
std::ref(cq),
@@ -595,51 +762,72 @@ uint64_t runNeverFailTest(int numThreads, int numOps) {
return nowMicro() - beginMicro;
}
TEST(MPMCQueue, mt_never_fail) {
int nts[] = {1, 3, 100};
int n = 100000;
template <template<typename> class Atom, bool Dynamic = false>
void runMtNeverFail(std::vector<int>& nts, int n) {
for (int nt : nts) {
uint64_t elapsed = runNeverFailTest<std::atomic>(nt, n);
uint64_t elapsed = runNeverFailTest<Atom, Dynamic>(nt, n);
LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
<< " threads";
}
}
TEST(MPMCQueue, mt_never_fail_emulated_futex) {
int nts[] = {1, 3, 100};
TEST(MPMCQueue, mt_never_fail) {
std::vector<int> nts {1, 3, 100};
int n = 100000;
runMtNeverFail<std::atomic>(nts, n);
}
TEST(MPMCQueue, mt_never_fail_dynamic) {
std::vector<int> nts {1, 3, 100};
int n = 100000;
for (int nt : nts) {
uint64_t elapsed = runNeverFailTest<EmulatedFutexAtomic>(nt, n);
LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
<< " threads";
}
runMtNeverFail<std::atomic, true>(nts, n);
}
TEST(MPMCQueue, mt_never_fail_deterministic) {
int nts[] = {3, 10};
TEST(MPMCQueue, mt_never_fail_emulated_futex) {
std::vector<int> nts {1, 3, 100};
int n = 100000;
runMtNeverFail<EmulatedFutexAtomic>(nts, n);
}
long seed = 0; // nowMicro() % 10000;
LOG(INFO) << "using seed " << seed;
TEST(MPMCQueue, mt_never_fail_emulated_futex_dynamic) {
std::vector<int> nts {1, 3, 100};
int n = 100000;
runMtNeverFail<EmulatedFutexAtomic, true>(nts, n);
}
int n = 1000;
template<bool Dynamic = false>
void runMtNeverFailDeterministic(std::vector<int>& nts, int n, long seed) {
LOG(INFO) << "using seed " << seed;
for (int nt : nts) {
{
DSched sched(DSched::uniform(seed));
runNeverFailTest<DeterministicAtomic>(nt, n);
runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
}
{
DSched sched(DSched::uniformSubset(seed, 2));
runNeverFailTest<DeterministicAtomic>(nt, n);
runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
}
}
}
template <class Clock, template <typename> class Atom>
TEST(MPMCQueue, mt_never_fail_deterministic) {
std::vector<int> nts {3, 10};
long seed = 0; // nowMicro() % 10000;
int n = 1000;
runMtNeverFailDeterministic(nts, n, seed);
}
TEST(MPMCQueue, mt_never_fail_deterministic_dynamic) {
std::vector<int> nts {3, 10};
long seed = 0; // nowMicro() % 10000;
int n = 1000;
runMtNeverFailDeterministic<true>(nts, n, seed);
}
template <class Clock, template <typename> class Atom, bool Dynamic>
void runNeverFailUntilThread(int numThreads,
int n, /*numOps*/
MPMCQueue<int, Atom>& cq,
MPMCQueue<int, Atom, Dynamic>& cq,
std::atomic<uint64_t>& sum,
int t) {
uint64_t threadSum = 0;
@@ -656,10 +844,10 @@ void runNeverFailUntilThread(int numThreads,
sum += threadSum;
}
template <class Clock, template <typename> class Atom>
template <class Clock, template <typename> class Atom, bool Dynamic = false>
uint64_t runNeverFailTest(int numThreads, int numOps) {
// always #enq >= #deq
MPMCQueue<int, Atom> cq(numThreads);
MPMCQueue<int, Atom, Dynamic> cq(numThreads);
uint64_t n = numOps;
auto beginMicro = nowMicro();
@@ -667,7 +855,8 @@ uint64_t runNeverFailTest(int numThreads, int numOps) {
vector<std::thread> threads(numThreads);
std::atomic<uint64_t> sum(0);
for (int t = 0; t < numThreads; ++t) {
threads[t] = DSched::thread(std::bind(runNeverFailUntilThread<Clock, Atom>,
threads[t] = DSched::thread(std::bind(
runNeverFailUntilThread<Clock, Atom, Dynamic>,
numThreads,
n,
std::ref(cq),
@@ -683,30 +872,50 @@ uint64_t runNeverFailTest(int numThreads, int numOps) {
return nowMicro() - beginMicro;
}
TEST(MPMCQueue, mt_never_fail_until_system) {
int nts[] = {1, 3, 100};
int n = 100000;
template <bool Dynamic = false>
void runMtNeverFailUntilSystem(std::vector<int>& nts, int n) {
for (int nt : nts) {
uint64_t elapsed =
runNeverFailTest<std::chrono::system_clock, std::atomic>(nt, n);
runNeverFailTest<std::chrono::system_clock, std::atomic, Dynamic>(nt, n);
LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
<< " threads";
}
}
TEST(MPMCQueue, mt_never_fail_until_steady) {
int nts[] = {1, 3, 100};
TEST(MPMCQueue, mt_never_fail_until_system) {
std::vector<int> nts {1, 3, 100};
int n = 100000;
runMtNeverFailUntilSystem(nts, n);
}
TEST(MPMCQueue, mt_never_fail_until_system_dynamic) {
std::vector<int> nts {1, 3, 100};
int n = 100000;
runMtNeverFailUntilSystem<true>(nts, n);
}
template <bool Dynamic = false>
void runMtNeverFailUntilSteady(std::vector<int>& nts, int n) {
for (int nt : nts) {
uint64_t elapsed =
runNeverFailTest<std::chrono::steady_clock, std::atomic>(nt, n);
runNeverFailTest<std::chrono::steady_clock, std::atomic, Dynamic>(nt, n);
LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
<< " threads";
}
}
TEST(MPMCQueue, mt_never_fail_until_steady) {
std::vector<int> nts {1, 3, 100};
int n = 100000;
runMtNeverFailUntilSteady(nts, n);
}
TEST(MPMCQueue, mt_never_fail_until_steady_dynamic) {
std::vector<int> nts {1, 3, 100};
int n = 100000;
runMtNeverFailUntilSteady<true>(nts, n);
}
enum LifecycleEvent {
NOTHING = -1,
DEFAULT_CONSTRUCTOR,
@@ -794,7 +1003,8 @@ void runPerfectForwardingTest() {
EXPECT_EQ(lc_outstanding(), 0);
{
MPMCQueue<Lifecycle<R>> queue(50);
// Non-dynamic only. False positive for dynamic.
MPMCQueue<Lifecycle<R>, std::atomic> queue(50);
LIFECYCLE_STEP(NOTHING);
for (int pass = 0; pass < 10; ++pass) {
@@ -870,19 +1080,21 @@ TEST(MPMCQueue, perfect_forwarding_relocatable) {
runPerfectForwardingTest<std::true_type>();
}
TEST(MPMCQueue, queue_moving) {
template <bool Dynamic = false>
void run_queue_moving() {
lc_snap();
EXPECT_EQ(lc_outstanding(), 0);
{
MPMCQueue<Lifecycle<std::false_type>> a(50);
MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> a(50);
LIFECYCLE_STEP(NOTHING);
a.blockingWrite();
LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
// move constructor
MPMCQueue<Lifecycle<std::false_type>> b = std::move(a);
MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> b
= std::move(a);
LIFECYCLE_STEP(NOTHING);
EXPECT_EQ(a.capacity(), 0);
EXPECT_EQ(a.size(), 0);
@@ -893,7 +1105,7 @@ TEST(MPMCQueue, queue_moving) {
LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
// move operator
MPMCQueue<Lifecycle<std::false_type>> c;
MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> c;
LIFECYCLE_STEP(NOTHING);
c = std::move(b);
LIFECYCLE_STEP(NOTHING);
@@ -908,7 +1120,7 @@ TEST(MPMCQueue, queue_moving) {
{
// swap
MPMCQueue<Lifecycle<std::false_type>> d(10);
MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> d(10);
LIFECYCLE_STEP(NOTHING);
std::swap(c, d);
LIFECYCLE_STEP(NOTHING);
@@ -933,6 +1145,17 @@ TEST(MPMCQueue, queue_moving) {
LIFECYCLE_STEP(DESTRUCTOR);
}
TEST(MPMCQueue, queue_moving) {
run_queue_moving();
}
TEST(MPMCQueue, queue_moving_dynamic) {
run_queue_moving<true>();
}
TEST(MPMCQueue, explicit_zero_capacity_fail) {
ASSERT_THROW(MPMCQueue<int> cq(0), std::invalid_argument);
using DynamicMPMCQueueInt = MPMCQueue<int, std::atomic, true>;
ASSERT_THROW(DynamicMPMCQueueInt cq(0), std::invalid_argument);
}