Commit c746867a authored by Aaryaman Sagar, committed by Facebook GitHub Bot

DistributedMutex - A slightly different small exclusive-only mutex

Summary:
DistributedMutex is a small, exclusive-only mutex that distributes the
bookkeeping required for mutual exclusion in the stacks of threads that are
contending for it.  It tries to come at a lower space cost than std::mutex
while still trying to maintain the fairness benefits that come from using
std::mutex.  DistributedMutex provides the entire API included in
std::mutex, and more, with slight modifications.  DistributedMutex is the
same width as a single pointer (8 bytes on most platforms), where on the
other hand, std::mutex and pthread_mutex_t are both 40 bytes.  It is larger
than some of the other smaller locks, but the wide majority of cases using
the small locks are wasting the difference in alignment padding anyway

Benchmark results are good - at the time of writing in the common
uncontended case, it is 30% faster than some of the other small mutexes in
folly and as fast as std::mutex, which recently optimized its uncontended
path.  In the contended case, it is about 4-5x faster than some of the
smaller locks in folly, ~2x faster than std::mutex in clang and ~1.8x
faster in gcc.  DistributedMutex is also resistant to tail latency
pathologies, unlike many of the other small mutexes, which sleep for large
time quanta to reduce spin churn; this causes elevated latencies for
threads that enter the sleep cycle.  The tail latency of lock acquisition
is on average up to 10x better with DistributedMutex

DistributedMutex reduces cache line contention by making each thread wait
on a thread local spinlock and futex.  This allows threads to keep working
only on their own cache lines without requiring cache coherence operations
when a mutex is under heavy contention.  This strategy does not require
sequential ordering on the centralized atomic storage for wakeup operations
as each thread is assigned its own wait state
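
As a rough sketch of this idea (the WaitNode type and functions here are
illustrative only, not the actual folly internals in this diff), each waiter
spins on its own cache-line-aligned word instead of on the shared mutex word,
so a wakeup touches exactly one waiter's cache line:

#include <atomic>
#include <cstdint>

// hypothetical per-thread wait node, allocated on the waiting thread's stack
// and padded out to its own cache line
struct alignas(128) WaitNode {
  std::atomic<std::uint32_t> signal{0}; // 0 = waiting, 1 = woken
};

// the waiting thread spins only on its own node, never on the central word
inline void waitForHandoff(WaitNode& node) {
  while (node.signal.load(std::memory_order_acquire) == 0) {
    // spin; the real implementation pauses and eventually sleeps on a futex
  }
}

// the waking thread writes to exactly one waiter's cache line
inline void handOff(WaitNode& node) {
  node.signal.store(1, std::memory_order_release);
}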

Non-timed mutex acquisitions are scheduled through intrusive LIFO
contention chains.  Each thread starts by spinning for a short quantum and
falls back to two-phase sleeping.  Enqueue operations are lock free and
are piggybacked off mutex acquisition attempts.  The LIFO behavior of a
contention chain is good in the case where the mutex is held for a short
amount of time, as the head of the chain is likely to not have slept on
futex() after exhausting its spin quantum.  This allows us to avoid
unnecessary traversal and syscalls in the fast path with a higher
probability.  Even though the contention chains are LIFO, the mutex itself
does not adhere to that scheduling policy globally.  During contention,
threads that fail to lock the mutex form a LIFO chain on the central mutex
state; this chain is broken when a wakeup is scheduled, and future enqueue
operations form a new chain.  This makes the chains themselves LIFO, but
preserves global fairness through a constant factor which is limited to the
number of concurrent failed mutex acquisition attempts.  This binds the
last in first out behavior to the number of contending threads and helps
prevent starvation and latency outliers
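
The enqueue-on-acquisition-attempt idea can be sketched roughly as follows (a
simplified illustration; the real wait node, constants, and lock()
implementation appear later in this diff): a single atomic exchange both
attempts the acquisition and, on failure, pushes the caller's stack-allocated
node onto the LIFO chain.

#include <atomic>
#include <cstdint>

constexpr std::uintptr_t kUnlocked = 0b0;
constexpr std::uintptr_t kLocked = 0b1;

// hypothetical stack-allocated wait node; the real Waiter type is below
struct Node {
  std::uintptr_t next{0}; // previous chain head (or kUnlocked/kLocked)
};

// returns true if the mutex was acquired, false if the node was enqueued
inline bool lockOrEnqueue(std::atomic<std::uintptr_t>& state, Node& node) {
  auto address = reinterpret_cast<std::uintptr_t>(&node) | kLocked;
  auto previous = state.exchange(address, std::memory_order_acq_rel);
  node.next = previous;          // link to whatever was at the head before us
  return previous == kUnlocked;  // zero means the mutex was free
}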

This strategy of waking up waiters one by one in a queue does not scale well
when the number of threads goes past the number of cores, at which point
preemption causes elevated lock acquisition latencies.  DistributedMutex
implements a hardware timestamp publishing heuristic to detect and adapt to
preemption.
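
A rough sketch of the heuristic (simplified; the real logic lives in spin()
and preempted() further down in this diff): spinning waiters repeatedly
publish a fresh timestamp, and the waking thread skips any waiter whose
published timestamp is stale, since that waiter has most likely been
scheduled off the processor.

#include <chrono>
#include <cstdint>

// threshold past which a published timestamp is considered stale (matches
// the kScheduledAwaySpinThreshold constant defined below)
constexpr auto kStaleThreshold = std::chrono::nanoseconds{200};

// waker side: a waiter whose last published timestamp is older than the
// threshold has likely been scheduled away and should be skipped
inline bool looksPreempted(std::uint64_t publishedNanos, std::uint64_t nowNanos) {
  return nowNanos >
      publishedNanos + static_cast<std::uint64_t>(kStaleThreshold.count());
}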

DistributedMutex does not have the typical mutex API - it does not satisfy
the Lockable concept.  It requires the user to maintain ephemeral
bookkeeping and pass that bookkeeping around to unlock() calls.  The API
overhead, however, comes for free when you wrap this mutex for usage with
folly::Synchronized or std::unique_lock, which is the recommended usage
(std::lock_guard, in optimized mode, has no performance benefit over
std::unique_lock, so has been omitted).  A benefit of this API is that it
disallows incorrect usage where a thread unlocks a mutex that it does not
own, thinking a mutex is functionally identical to a binary semaphore,
which, unlike a mutex, is a suitable primitive for that usage
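
A minimal usage sketch of the proxy-based API, mirroring the unit tests
included later in this diff:

#include <folly/synchronization/DistributedMutex.h>
#include <utility>

folly::DistributedMutex mutex;

void withLock() {
  auto proxy = mutex.lock();       // returns the ephemeral state proxy
  // ... critical section ...
  mutex.unlock(std::move(proxy));  // the move-only proxy must be handed back
}

void withTryLock() {
  // try_lock() returns a proxy that is contextually convertible to bool
  if (auto proxy = mutex.try_lock()) {
    // ... critical section ...
    mutex.unlock(std::move(proxy));
  }
}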

Timed locking through DistributedMutex is implemented through a centralized
algorithm - all waiters wait on the central mutex state, by setting and
resetting bits within the pointer-length word.  Since pointer length atomic
integers are incompatible with futex(FUTEX_WAIT) on most systems, a
non-standard implementation of futex() is used, where wait queues are
managed in user-space.  See p1135r0 and folly::ParkingLot
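
The bit manipulation on the central word can be sketched roughly as follows
(the constants mirror those defined later in this diff; the real logic is in
timedLock()): a timed waiter ORs both low bits into the word and, if the lock
was already held, blocks on the word itself.

#include <atomic>
#include <cstdint>

constexpr std::uintptr_t kLocked = 0b1;       // the mutex is currently held
constexpr std::uintptr_t kTimedWaiter = 0b10; // at least one timed waiter

// returns true if the fetch_or itself acquired the mutex; otherwise the
// caller blocks on the central word until woken up or timed out
inline bool timedAcquireAttempt(std::atomic<std::uintptr_t>& state) {
  auto previous =
      state.fetch_or(kTimedWaiter | kLocked, std::memory_order_acquire);
  return !(previous & kLocked);
}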

Reviewed By: djwatson

Differential Revision: D8949918

fbshipit-source-id: a772a70114e943ff68525c990da45e32ad1a5077
parent 87229af6
......@@ -18,6 +18,9 @@
#include <folly/Portability.h>
#include <chrono>
#include <cstdint>
#ifdef _MSC_VER
#include <intrin.h>
#endif
......@@ -42,4 +45,18 @@ inline void asm_volatile_pause() {
asm volatile("or 27,27,27");
#endif
}
inline std::uint64_t asm_rdtsc() {
#if FOLLY_X64
// read the timestamp counter on x86
auto hi = std::uint32_t{};
auto lo = std::uint32_t{};
asm volatile("rdtsc" : "=a"(lo), "=d"(hi));
return (((std::uint64_t)lo) + (((std::uint64_t)hi) << 32));
#else
// use steady_clock::now() as an approximation for the timestamp counter on
// non-x86 systems
return std::chrono::steady_clock::now().time_since_epoch().count();
#endif
}
} // namespace folly
/*
* Copyright 2004-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/synchronization/DistributedMutex.h>
#include <folly/CachelinePadded.h>
#include <folly/Likely.h>
#include <folly/Portability.h>
#include <folly/ScopeGuard.h>
#include <folly/Utility.h>
#include <folly/detail/Futex.h>
#include <folly/lang/Align.h>
#include <folly/portability/Asm.h>
#include <folly/synchronization/AtomicNotification.h>
#include <folly/synchronization/AtomicUtil.h>
#include <folly/synchronization/WaitOptions.h>
#include <folly/synchronization/detail/Sleeper.h>
#include <folly/synchronization/detail/Spin.h>
#include <glog/logging.h>
#include <atomic>
#include <cstdint>
#include <limits>
#include <stdexcept>
#include <thread>
namespace folly {
namespace detail {
namespace distributed_mutex {
// kUnlocked is used to show unlocked state
//
// When locking threads encounter kUnlocked in the underlying storage, they
// can just acquire the lock without any further effort
constexpr auto kUnlocked = std::uintptr_t{0b0};
// kLocked is used to show that the mutex is currently locked, and future
// attempts to lock the mutex should enqueue on the central storage
//
// Locking threads find this on central storage only when there is a
// contention chain that is undergoing wakeups, in every other case, a locker
// will either find kUnlocked or an arbitrary address with the kLocked bit set
constexpr auto kLocked = std::uintptr_t{0b1};
// kTimedWaiter is set when there is at least one timed waiter on the mutex
//
// Timed waiters do not follow the sleeping strategy employed by regular,
// non-timed threads. They sleep on the central mutex atomic through an
// extended futex() interface that allows sleeping with the same semantics for
// non-standard integer widths
//
// When a regular non-timed thread unlocks or enqueues on the mutex, and sees
// a timed waiter, it takes ownership of all the timed waiters. The thread
// that has taken ownership of the timed waiter releases the timed waiters
// when it gets a chance at the critical section, at which point it issues a
// wakeup to a single timed waiter; timed waiters always issue wake() calls to
// other timed waiters
constexpr auto kTimedWaiter = std::uintptr_t{0b10};
// kUninitialized means that the thread has just enqueued, and has not yet
// gotten to initializing itself with the address of its successor
//
// this becomes significant for threads that are trying to wake up the
// uninitialized thread, if they see that the thread is not yet initialized,
// they can do nothing but spin, and wait for the thread to get initialized
constexpr auto kUninitialized = std::uint32_t{0b0};
// kWaiting will be set in the waiter's futex structs while they are spinning
// while waiting for the mutex
constexpr auto kWaiting = std::uint32_t{0b1};
// kWake will be set by threads that are waking up waiters that have enqueued
constexpr auto kWake = std::uint32_t{0b10};
// kSkipped will be set by a waker when they see that a waiter has been
// preempted away by the kernel, in this case the thread that got skipped will
// have to wake up and put itself back on the queue
constexpr auto kSkipped = std::uint32_t{0b11};
// kAboutToWait will be set by a waiter that enqueues itself with the purpose
// of waiting on a futex
constexpr auto kAboutToWait = std::uint32_t{0b100};
// kSleeping will be set by a waiter right before enqueueing on a futex. When
// a thread wants to wake up a waiter that has enqueued on a futex, it should
// set the futex to contain kWake
//
// a thread that is unlocking and wants to skip over a sleeping thread also
// calls futex_.exchange(kSleeping) on the sleeping thread's futex word. It
// does this to 1. detect whether the sleeping thread had actually gone to
// sleep on the futex word so it can skip it, and 2. to synchronize with
// other non atomic writes in the sleeping thread's context (such as the write
// to track the next waiting thread).
//
// We reuse kSleeping instead of say using another constant kEarlyDelivery to
// avoid situations where a thread has to enter kernel mode due to calling
// futexWait() twice because of the presence of a waking thread. This
// situation can arise when an unlocking thread goes to skip over a sleeping
// thread, sees that the thread has slept and moves on, but the sleeping thread
// had not yet entered futex(). This interleaving causes the thread calling
// futex() to return spuriously, as the futex word is not what it should be
constexpr auto kSleeping = std::uint32_t{0b101};
// The number of spins that we are allowed to do before we resort to marking a
// thread as having slept
//
// This is just a magic number from benchmarks
constexpr auto kScheduledAwaySpinThreshold = std::chrono::nanoseconds{200};
// The maximum number of spins before a thread starts yielding its processor
// in hopes of getting skipped
constexpr auto kMaxSpins = 4000;
/**
* Write only data that is available to the thread that is waking up another.
* Only the waking thread is allowed to write to this, the thread to be woken
* is allowed to read from this after a wakeup has been issued
*
* Because of the write only semantics of the data here, acquire-release (or
* stronger) memory ordering is needed to write to this
*/
class WakerMetadata {
public:
// This is the thread that initiated wakeups for the contention chain.
// There can only ever be one thread that initiates the wakeup for a
// chain in the spin only version of this mutex. When a thread that just
// woke up sees this as the next thread to wake up, it knows that it is the
// terminal node in the contention chain. This means that it was the one
// that took the thread that had acquired the mutex off the centralized
// state. Therefore, the current thread is the last in its contention
// chain. It will fall back to centralized storage to pick up the next
// waiter or release the mutex
//
// When we move to a full sleeping implementation, this might need to change
// to a small_vector<> to account for failed wakeups, or we can put threads
// to sleep on the central futex, which is an easier implementation
// strategy. Although, since this is allocated on the stack, we can set a
// prohibitively large threshold to avoid heap allocations; this strategy
// however, might cause increased cache misses on wakeup signalling
std::uintptr_t waker_{0};
};
/**
* Waiter encapsulates the state required for waiting on the mutex, this
* contains potentially heavy state and is intended to be allocated on the
* stack as part of a lock() function call
*/
template <template <typename> class Atomic>
class Waiter {
public:
explicit Waiter(std::uint64_t futex) : futex_{futex} {}
// the atomic that this thread will spin on while waiting for the mutex to
// be unlocked
Atomic<std::uint64_t> futex_{kUninitialized};
// metadata for the waker
WakerMetadata wakerMetadata_{};
// The successor of this node. This will be the thread that had its address
// on the mutex previously
std::uintptr_t next_{0};
// the list of threads that the waker had previously seen to be sleeping on
// a futex(),
//
// this is given to the current thread as a means to pass on
// information. When the current thread goes to unlock the mutex and does
// not see contention, it should go and wake up the head of this list. If
// the current thread sees a contention chain on the mutex, it should pass
// on this list to the next thread that gets woken up
std::uintptr_t waiters_{0};
// The futex that this waiter will sleep on
//
// how can we reuse futex_ from above for futex management?
Futex<Atomic> sleeper_{kUninitialized};
};
/**
* Get the time since epoch in nanoseconds
*
* This is faster than std::chrono::steady_clock because it avoids a VDSO
* access to get the timestamp counter
*
* Note that the hardware timestamp counter on x86, like std::steady_clock, is
* guaranteed to be monotonically increasing -
* https://c9x.me/x86/html/file_module_x86_id_278.html
*/
inline std::chrono::nanoseconds time() {
return std::chrono::nanoseconds{asm_rdtsc()};
}
/**
* Zero out the other bits used by the implementation and return just an
* address from a uintptr_t
*/
template <typename Type>
Type* extractAddress(std::uintptr_t from) {
// shift one bit off the end, to get all 1s followed by a single 0
auto mask = std::numeric_limits<std::uintptr_t>::max();
mask >>= 1;
mask <<= 1;
CHECK(!(mask & 0b1));
return reinterpret_cast<Type*>(from & mask);
}
/**
* Strips the given timestamp down to its least significant 56 bits by
* shifting it left by 8, leaving the bottom 8 bits zeroed out to be used as
* a medium of information transfer for the thread wait nodes
*/
inline std::uint64_t strip(std::chrono::nanoseconds t) {
auto time = t.count();
return time << 8;
}
/**
* Recover the timestamp value from an integer that has the timestamp encoded
* in it
*/
inline std::uint64_t recover(std::uint64_t from) {
return from >> 8;
}
template <template <typename> class Atomic, bool TimePublishing>
class DistributedMutex<Atomic, TimePublishing>::DistributedMutexStateProxy {
public:
/// DistributedMutexStateProxy is movable, so the caller can contain their
/// critical section by moving the proxy around
DistributedMutexStateProxy(DistributedMutexStateProxy&& other)
: next_{exchange(other.next_, nullptr)},
expected_{exchange(other.expected_, 0)},
wakerMetadata_{exchange(other.wakerMetadata_, {})},
waiters_{exchange(other.waiters_, nullptr)},
ready_{exchange(other.ready_, nullptr)} {}
/// The proxy is valid when a mutex acquisition attempt was successful,
/// lock() is guaranteed to return a valid proxy, try_lock() is not
explicit operator bool() const {
return expected_;
}
// private:
/// friend the mutex class, since that will be accessing state private to
/// this class
friend class DistributedMutex<Atomic, TimePublishing>;
DistributedMutexStateProxy(
CachelinePadded<Waiter<Atomic>>* next,
std::uintptr_t expected,
bool timedWaiter = false,
WakerMetadata wakerMetadata = {},
CachelinePadded<Waiter<Atomic>>* waiters = nullptr,
CachelinePadded<Waiter<Atomic>>* ready = nullptr)
: next_{next},
expected_{expected},
timedWaiters_{timedWaiter},
wakerMetadata_{wakerMetadata},
waiters_{waiters},
ready_{ready} {}
// the next thread that is to be woken up; this being null at the time of
// unlock() shows that the current thread acquired the mutex without
// contention, or that it was the terminal thread in the queue of threads
// waking up
CachelinePadded<Waiter<Atomic>>* next_{nullptr};
// this is the value that the current thread should expect to find on
// unlock, and if this value is not there on unlock, the current thread
// should assume that other threads are enqueued waiting for the mutex
//
// note that if the mutex has the same state set at unlock time, and this is
// set to an address (and not say kLocked in the case of a terminal waker)
// then it must have been the case that no other thread had enqueued itself,
// since threads in the domain of this mutex do not share stack space
//
// if we want to support stack sharing, we can solve the problem by looping
// at lock time, and setting a variable that says whether we have acquired
// the lock or not perhaps
std::uintptr_t expected_{0};
// a boolean that will be set when the mutex has timed waiters that the
// current thread is responsible for waking, in such a case, the current
// thread will issue an atomic_notify_one() call after unlocking the mutex
//
// note that a timed waiter will itself always have this flag set. This is
// done so we can avoid having to issue an atomic_notify_all() call (and
// subsequently a thundering herd) when waking up timed-wait threads
bool timedWaiters_{false};
// metadata passed along from the thread that woke this thread up
WakerMetadata wakerMetadata_{};
// the list of threads that are waiting on a futex
//
// the current thread is meant to wake up this list of waiters if it is
// able to commit an unlock() on the mutex without seeing a contention chain
CachelinePadded<Waiter<Atomic>>* waiters_{nullptr};
// after a thread has woken up from a futex() call, it will have the rest of
// the threads that were waiting behind it in this list; a thread that
// unlocks has to wake up threads from this list if it has any, before it
// goes to sleep to prevent pathological unfairness
CachelinePadded<Waiter<Atomic>>* ready_{nullptr};
};
template <template <typename> class Atomic, bool TimePublishing>
DistributedMutex<Atomic, TimePublishing>::DistributedMutex()
: state_{kUnlocked} {}
template <typename Waiter>
bool spin(Waiter& waiter) {
auto spins = 0;
while (true) {
// publish our current time in the futex as a part of the spin waiting
// process
//
// if we are under the maximum number of spins allowed before sleeping, we
// publish the exact timestamp, otherwise we publish the minimum possible
// timestamp to force the waking thread to skip us
++spins;
auto now = (spins < kMaxSpins) ? time() : decltype(time())::zero();
auto data = strip(now) | kWaiting;
auto signal = waiter.futex_.exchange(data, std::memory_order_acq_rel);
signal &= std::numeric_limits<std::uint8_t>::max();
// if we got skipped, make a note of it and return if we got a skipped
// signal or a signal to wake up
auto skipped = signal == kSkipped;
if (skipped || (signal == kWake)) {
return !skipped;
}
// if we are under the spin threshold, pause to allow the other
// hyperthread to run. If not, then sleep
if (spins < kMaxSpins) {
asm_volatile_pause();
} else {
Sleeper::sleep();
}
}
}
template <typename Waiter>
void doFutexWake(Waiter* waiter) {
if (waiter) {
// We can use a simple store operation here and not worry about checking
// to see if the thread had actually started waiting on the futex, that is
// already done in tryWake() when a sleeping thread is collected
//
// We now do not know whether the waiter had already enqueued on the futex
// or whether it had just stored kSleeping in its futex and was about to
// call futexWait(). We treat both these scenarios the same
//
// the below can theoretically cause a problem if we set the
// wake signal and the waiter was in between setting kSleeping in its
// futex and enqueueing on the futex. In this case the waiter will just
// return from futexWait() immediately. This leaves the address that the
// waiter was using for futexWait() possibly dangling, and the thread that
// we woke in the exchange above might have used that address for some
// other object
//
// however, even if the thread had indeed woken up simply because of the
// above exchange(), the futexWake() below is not incorrect. It is not
// incorrect because futexWake() does not actually change the memory of
// the futex word. It just uses the address to do a lookup in the kernel
// futex table. And even if we call futexWake() on some other address,
// and that address was being used to wait on futex() that thread will
// protect itself from spurious wakeups, check the value in the futex word
// and enqueue itself back on the futex
//
// this dangling pointer possibility is why we use a pointer to the futex
// word, and avoid dereferencing after the store() operation
auto sleeper = &(*waiter)->sleeper_;
sleeper->store(kWake, std::memory_order_release);
futexWake(sleeper, 1);
}
}
template <typename Waiter>
bool doFutexWait(Waiter* waiter, Waiter*& next) {
// first we get ready to sleep by calling exchange() on the futex with a
// kSleeping value
DCHECK((*waiter)->futex_.load(std::memory_order_relaxed) == kAboutToWait);
// note the semantics of using a futex here: when we exchange the sleeper_
// with kSleeping, we are getting ready to sleep, and we return from
// futexWait() when the value of sleeper_ might have changed. We can also
// wake up because of a spurious wakeup, so we always check against the
// value in sleeper_ after returning from futexWait(); if the value is not
// kWake, then we continue
auto pre = (*waiter)->sleeper_.exchange(kSleeping, std::memory_order_acq_rel);
// Seeing a kSleeping on a futex word before we set it ourselves means only
// one thing - an unlocking thread caught us before we went to futex(), and
// we now have the lock, so we abort
//
// if we were given an early delivery, we can return from this function with
// a true, meaning that we now have the lock
if (pre == kSleeping) {
return true;
}
// if we reach here then we were not given an early delivery, and any
// thread that goes to wake us up will see a consistent view of the rest of
// the contention chain (since the next_ variable is set before the
// kSleeping exchange above)
while (pre != kWake) {
// before enqueueing on the futex, we wake any waiters that we were
// possibly responsible for
doFutexWake(exchange(next, nullptr));
// then we wait on the futex
//
// note that we have to protect ourselves against spurious wakeups here,
// because the corresponding futexWake() above does not synchronize wakeups
// around the futex word, as doing so would be inefficient
futexWait(&(*waiter)->sleeper_, kSleeping);
pre = (*waiter)->sleeper_.load(std::memory_order_acquire);
DCHECK((pre == kSleeping) || (pre == kWake));
}
// when coming out of a futex, we might have some other sleeping threads
// that we were supposed to wake up, assign that to the next pointer
DCHECK(next == nullptr);
next = extractAddress<Waiter>((*waiter)->next_);
return false;
}
template <typename Waiter>
bool wait(Waiter* waiter, bool shouldSleep, Waiter*& next) {
if (shouldSleep) {
return doFutexWait(waiter, next);
}
return spin(**waiter);
}
inline void recordTimedWaiterAndClearTimedBit(
bool& timedWaiter,
std::uintptr_t& previous) {
// the previous value in the mutex can never be kTimedWaiter, timed waiters
// always set (kTimedWaiter | kLocked) in the mutex word when they try and
// acquire the mutex
DCHECK(previous != kTimedWaiter);
if (UNLIKELY(previous & kTimedWaiter)) {
// record whether there was a timed waiter in the previous mutex state, and
// clear the timed bit from the previous state
timedWaiter = true;
previous = previous & (~kTimedWaiter);
}
}
template <template <typename> class Atomic, bool TimePublishing>
typename DistributedMutex<Atomic, TimePublishing>::DistributedMutexStateProxy
DistributedMutex<Atomic, TimePublishing>::lock() {
// first try and acquire the lock as a fast path, the underlying
// implementation is slightly faster than using std::atomic::exchange() as
// is used in this function. So we get a small perf boost in the
// uncontended case
if (auto state = try_lock()) {
return state;
}
auto previous = std::uintptr_t{0};
auto waitMode = kUninitialized;
auto nextWaitMode = kAboutToWait;
auto timedWaiter = false;
CachelinePadded<Waiter<Atomic>>* nextSleeper = nullptr;
while (true) {
// construct the state needed to wait
auto&& state = CachelinePadded<Waiter<Atomic>>{waitMode};
auto&& address = reinterpret_cast<std::uintptr_t>(&state);
DCHECK(!(address & 0b1));
// set the locked bit in the address we will be persisting in the mutex
address |= kLocked;
// attempt to acquire the mutex, mutex acquisition is successful if the
// previous value is zeroed out
//
// we use memory_order_acq_rel here because we want the read-modify-write
// operation to be both acquire and release. Acquire because if this is a
// successful lock acquisition, we want to acquire state any other thread
// has released from a prior unlock. We want release semantics because
// other threads that read the address of this value should see the full
// well-initialized node we are going to wait on if the mutex acquisition
// was unsuccessful
previous = state_.exchange(address, std::memory_order_acq_rel);
recordTimedWaiterAndClearTimedBit(timedWaiter, previous);
state->next_ = previous;
if (previous == kUnlocked) {
return {nullptr, address, timedWaiter, {}, nullptr, nextSleeper};
}
DCHECK(previous & kLocked);
// wait until we get a signal from another thread, if this returns false,
// we got skipped and had probably been scheduled out, so try again
if (!wait(&state, (waitMode == kAboutToWait), nextSleeper)) {
std::swap(waitMode, nextWaitMode);
continue;
}
// at this point it is safe to access the other fields in the waiter state,
// since the thread that woke us up is gone and nobody will be touching this
// state again; note that this requires memory ordering, and this is why we
// use memory_order_acquire (among other reasons) in the above wait
//
// first we see if the value we took off the mutex state was the thread that
// initiated the wakeups; if so, we are the terminal node of the current
// contention chain. If we are the terminal node, then we should expect to
// see a kLocked in the mutex state when we unlock, if we see that, we can
// commit the unlock to the centralized mutex state. If not, we need to
// continue wakeups
//
// a nice consequence of passing kLocked as the current address if we are
// the terminal node is that it naturally just works with the algorithm. If
// we get a contention chain when coming out of a contention chain, the tail
// of the new contention chain will have kLocked set as the previous, which,
// as it happens "just works", since we have now established a recursive
// relationship until broken
auto next = previous;
auto expected = address;
if (previous == state->wakerMetadata_.waker_) {
next = 0;
expected = kLocked;
}
// if we are just coming out of a futex call, then it means that the next
// waiter we are responsible for is also a waiter waiting on a futex, so
// we return that list in the list of ready threads. We will be waking up
// the ready threads on unlock no matter what
return {extractAddress<CachelinePadded<Waiter<Atomic>>>(next),
expected,
timedWaiter,
state->wakerMetadata_,
extractAddress<CachelinePadded<Waiter<Atomic>>>(state->waiters_),
nextSleeper};
}
}
inline bool preempted(std::uint64_t value) {
auto currentTime = recover(strip(time()));
auto nodeTime = recover(value);
auto preempted = currentTime > nodeTime + kScheduledAwaySpinThreshold.count();
// we say that the thread has been preempted if its timestamp says so, and
// also if it is neither uninitialized nor skipped
DCHECK(value != kSkipped);
return (preempted) && (value != kUninitialized);
}
inline bool isSleeper(std::uintptr_t value) {
return (value == kAboutToWait);
}
template <typename Waiter>
std::uintptr_t tryWake(
bool publishing,
Waiter* waiter,
std::uintptr_t value,
WakerMetadata metadata,
Waiter*& sleepers) {
// first we see if we can wake the current thread that is spinning
if ((!publishing || !preempted(value)) && !isSleeper(value)) {
// we need release here because of the write to wakerMetadata_
(*waiter)->wakerMetadata_ = metadata;
(*waiter)->waiters_ = reinterpret_cast<std::uintptr_t>(sleepers);
(*waiter)->futex_.store(kWake, std::memory_order_release);
return 0;
}
// if the thread is not a sleeper, and we were not able to catch it before
// preemption, we can just skip it and move on; it is safe to read next_ because
// the thread was preempted. Preemption signals can only come after the
// thread has set the next_ pointer, since the timestamp writes only start
// occurring after that point
//
// if a thread was preempted it must have stored next_ in the waiter struct,
// as the store to futex_ that resets the value from kUninitialized happens
// after the write to next
CHECK(publishing);
if (!isSleeper(value)) {
// go on to the next one
//
// Also, we need a memory_order_release here to prevent missed wakeups. A
// missed wakeup here can happen when we see that a thread had been
// preempted and skip it, then go on to release the lock; the thread that
// got skipped then does an exchange on the central storage, still sees the
// locked bit, and never gets woken up
//
// Can we relax this?
DCHECK(preempted(value));
auto next = (*waiter)->next_;
(*waiter)->futex_.store(kSkipped, std::memory_order_release);
return next;
}
// if we are here the thread is a sleeper
//
// we attempt to catch the thread before it goes to futex(). If we are able
// to catch the thread before it sleeps on a futex, we are done, and don't
// need to go any further
//
// if we are not able to catch the thread before it goes to futex, we
// collect the current thread in the list of sleeping threads represented by
// sleepers, and return the next thread in the list (the waiter's previous
// next value) so the waker can move on
//
// it is safe to read the next_ pointer in the waiter struct if we were
// unable to catch the thread before it went to futex() because we use
// acquire-release ordering for the exchange operation below. And if we see
// that the thread was already sleeping, we have synchronized with the write
// to next_ in the context of the sleeping thread
//
// Also we need to set the value of waiters_ and wakerMetadata_ in the
// thread before doing the exchange because we need to pass on the list of
// sleepers in the event that we were able to catch the thread before it
// went to futex(). If we were unable to catch the thread before it slept,
// these fields will be ignored when the thread wakes up anyway
DCHECK(isSleeper(value));
(*waiter)->wakerMetadata_ = metadata;
(*waiter)->waiters_ = reinterpret_cast<std::uintptr_t>(sleepers);
auto pre = (*waiter)->sleeper_.exchange(kSleeping, std::memory_order_acq_rel);
// we were able to catch the thread before it went to sleep, so we are done
if (pre != kSleeping) {
return 0;
}
// otherwise return the value of next_; it is safe to read next
// because of the same logic as when a thread was preempted
//
// we also need to collect this sleeper in the list of sleepers being built
// up
auto next = (*waiter)->next_;
(*waiter)->next_ = reinterpret_cast<std::uintptr_t>(sleepers);
sleepers = waiter;
return next;
}
template <typename Waiter>
bool wake(
bool publishing,
Waiter& waiter,
WakerMetadata metadata,
Waiter*& sleepers) {
// loop till we find a node that is either at the end of the list (as
// specified by metadata) or we find a node that is active (as specified by
// the last published timestamp of the node)
auto current = &waiter;
while (current) {
auto value = (*current)->futex_.load(std::memory_order_acquire);
auto next = tryWake(publishing, current, value, metadata, sleepers);
if (!next) {
return true;
}
// we need to read the value of the next node in the list before skipping
// it, this is because after we skip it the node might wake up and enqueue
// itself, and thereby gain a new next node
CHECK(publishing);
current =
(next == metadata.waker_) ? nullptr : extractAddress<Waiter>(next);
}
return false;
}
template <typename Atomic>
void wakeTimedWaiters(Atomic* state, bool timedWaiters) {
if (UNLIKELY(timedWaiters)) {
atomic_notify_one(state);
}
}
template <typename Atomic, typename Proxy, typename Sleepers>
bool tryUnlockClean(Atomic& state, Proxy& proxy, Sleepers sleepers) {
auto expected = proxy.expected_;
while (true) {
if (state.compare_exchange_strong(
expected,
kUnlocked,
std::memory_order_release,
std::memory_order_relaxed)) {
// if we were able to commit an unlock, we need to wake up the futex
// waiters, if any
doFutexWake(sleepers);
return true;
}
// if we failed the compare_exchange_strong() above, we check to see if
// the failure was because of the presence of a timed waiter. If that
// was the case then we try one more time with the kTimedWaiter bit set
if (UNLIKELY(expected == (proxy.expected_ | kTimedWaiter))) {
proxy.timedWaiters_ = true;
continue;
}
// otherwise break, we have a contention chain
return false;
}
}
template <template <typename> class Atomic, bool Publish>
void DistributedMutex<Atomic, Publish>::unlock(
DistributedMutex::DistributedMutexStateProxy proxy) {
// we always wake up ready threads and timed waiters if we saw either
DCHECK(proxy) << "Invalid proxy passed to DistributedMutex::unlock()";
SCOPE_EXIT {
doFutexWake(proxy.ready_);
wakeTimedWaiters(&state_, proxy.timedWaiters_);
};
// if there is a wait queue we are responsible for, try and start wakeups,
// don't bother with the mutex state
auto sleepers = proxy.waiters_;
if (proxy.next_) {
if (wake(Publish, *proxy.next_, proxy.wakerMetadata_, sleepers)) {
return;
}
// At this point, if we are in the if statement, we were not the terminal
// node of the wakeup chain. Terminal nodes have the next_ pointer set to
// null in lock()
//
// So we need to pretend we were the end of the contention chain. Coming
// out of a contention chain always has the kLocked state set in the
// mutex. Unless there is another contention chain lined up, which does
// not matter since we are the terminal node anyway
proxy.expected_ = kLocked;
}
while (true) {
// otherwise, since we don't have anyone we need to wake up, we try and
// release the mutex just as is
//
// if this is successful, we can return, the unlock was successful, we have
// committed a nice kUnlocked to the central storage, yay
if (tryUnlockClean(state_, proxy, sleepers)) {
return;
}
// here we have a contention chain built up on the mutex. We grab the
// wait queue and start executing wakeups. We leave a locked bit on the
// centralized storage and handoff control to the head of the queue
//
// we use memory_order_acq_rel here because we want to see the
// full well-initialized node that the other thread is waiting on
//
// If we are unable to wake the contention chain, it is possible that when
// we come back to looping here, a new contention chain will form. In
// that case we need to use kLocked as the waker_ value because the
// terminal node of the new chain will see kLocked in the central storage
auto head = state_.exchange(kLocked, std::memory_order_acq_rel);
recordTimedWaiterAndClearTimedBit(proxy.timedWaiters_, head);
auto next = extractAddress<CachelinePadded<Waiter<Atomic>>>(head);
DCHECK((head & kLocked) && (head != kLocked)) << "incorrect state " << head;
if (wake(Publish, *next, {exchange(proxy.expected_, kLocked)}, sleepers)) {
break;
}
}
}
template <template <typename> class Atomic, bool TimePublishing>
typename DistributedMutex<Atomic, TimePublishing>::DistributedMutexStateProxy
DistributedMutex<Atomic, TimePublishing>::try_lock() {
// Try and set the least significant bit of the centralized lock state to 1,
// indicating locked.
//
// If this succeeds, it must have been the case that we had a kUnlocked (or
// 0) in the centralized storage before, since that is the only case where a
// 0 can be found. So we assert that in debug mode
//
// If this fails, then it is a no-op
auto previous = atomic_fetch_set(state_, 0, std::memory_order_acquire);
if (!previous) {
return {nullptr, kLocked};
}
return {nullptr, 0};
}
template <typename Atomic, typename Deadline, typename MakeProxy>
auto timedLock(Atomic& state, Deadline deadline, MakeProxy proxy) {
while (true) {
// we put a bit on the central state to show that there is a timed waiter
// and go to sleep on the central state
//
// when this thread goes to unlock the mutex, it will expect a 0b1 in the
// mutex state (0b1, not 0b11), but then it will see that the value in the
// mutex state is 0b11 and not 0b1, meaning that there might have been
// another timed waiter, even though there might not actually have been one
// in the meantime. This sort of missed wakeup is
// desirable for timed waiters; it helps avoid thundering herds of timed
// waiters. Because the mutex is packed in 8 bytes, and we need an
// address to be stored in those 8 bytes, we don't have much room to play
// with. The only other solution is to issue a futexWake(INT_MAX) to wake
// up all waiters when a clean unlock is committed, when a thread saw a
// timed waiter in the mutex previously.
//
// putting a 0b11 here works for a set of reasons that is a superset of
// the set of reasons that make it okay to put a kLocked (0b1) in the
// mutex state. Now that the thread has put (kTimedWaiter | kLocked)
// (0b11) in the mutex state and it expects a kLocked (0b1), there are two
// scenarios possible. The first is when there is no contention chain
// formation on the mutex from the time a timed waiter acquired the lock to
// its unlock. In this case, the unlocker sees a 0b11 in the mutex state,
// adjusts to the presence of a timed waiter and cleanly unlocks with a
// kUnlocked (0b0). The second is when there is a contention chain.
// When a thread puts its address in the mutex and sees the timed bit, it
// records the presence of a timed waiter, and then pretends as if it
// hadn't seen the timed bit. So future contention chain releases, will
// terminate with a kLocked (0b1) and not a (kLocked | kTimedWaiter)
// (0b11). This just works naturally with the rest of the algorithm
// without incurring a perf hit for the regular non-timed case
//
// this strategy does, however, mean that when threads try to acquire the
// mutex and all time out, there will be a wasteful syscall to issue wakeups
// to waiting threads. We don't do anything to try and minimize this
//
// we need to use a fetch_or() here because we need to convey two bits of
// information - 1, whether the mutex is locked or not, and 2, whether
// there is a timed waiter. The alternative here is to use the second bit
// to convey information only; we could use a fetch_set() on the second bit
// to make this faster, but that comes at the expense of the regular fast
// path lock attempts, which use a single bit read-modify-write for better
// performance
auto data = kTimedWaiter | kLocked;
auto previous = state.fetch_or(data, std::memory_order_acquire);
if (!(previous & 0b1)) {
DCHECK(!previous);
return proxy(nullptr, kLocked, true);
}
// wait on the futex until signalled, if we get a timeout, the try_lock
// fails
auto result = atomic_wait_until(&state, previous | data, deadline);
if (result == std::cv_status::timeout) {
return proxy(nullptr, std::uintptr_t{0}, false);
}
}
}
template <template <typename> class Atomic, bool TimePublishing>
template <typename Clock, typename Duration>
typename DistributedMutex<Atomic, TimePublishing>::DistributedMutexStateProxy
DistributedMutex<Atomic, TimePublishing>::try_lock_until(
const std::chrono::time_point<Clock, Duration>& deadline) {
// fast path for the uncontended case
//
// we get the time after trying to acquire the mutex because in the
// uncontended case, the price of getting the time is about 1/3 of the
// actual mutex acquisition. So we only pay the price of that extra bit of
// latency when needed
//
// this is even higher when VDSO is involved on architectures that do not
// offer a direct interface to the timestamp counter
if (auto state = try_lock()) {
return state;
}
// fall back to the timed locking algorithm
using Proxy = DistributedMutexStateProxy;
return timedLock(state_, deadline, [](auto... as) { return Proxy{as...}; });
}
template <template <typename> class Atomic, bool TimePublishing>
template <typename Rep, typename Period>
typename DistributedMutex<Atomic, TimePublishing>::DistributedMutexStateProxy
DistributedMutex<Atomic, TimePublishing>::try_lock_for(
const std::chrono::duration<Rep, Period>& duration) {
// fast path for the uncontended case. Reasoning for doing this here is the
// same as in try_lock_until()
if (auto state = try_lock()) {
return state;
}
// fall back to the timed locking algorithm
using Proxy = DistributedMutexStateProxy;
auto deadline = std::chrono::steady_clock::now() + duration;
return timedLock(state_, deadline, [](auto... as) { return Proxy{as...}; });
}
} // namespace distributed_mutex
} // namespace detail
} // namespace folly
/*
* Copyright 2004-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <atomic>
#include <chrono>
#include <cstdint>
namespace folly {
namespace detail {
namespace distributed_mutex {
/**
* DistributedMutex is a small, exclusive-only mutex that distributes the
* bookkeeping required for mutual exclusion in the stacks of threads that are
* contending for it. It tries to come at a lower space cost than std::mutex
* while still trying to maintain the fairness benefits that come from using
* std::mutex. DistributedMutex provides the entire API included in
* std::mutex, and more, with slight modifications. DistributedMutex is the
* same width as a single pointer (8 bytes on most platforms), where on the
* other hand, std::mutex and pthread_mutex_t are both 40 bytes. It is larger
* than some of the other smaller locks, but the wide majority of cases using
* the small locks are wasting the difference in alignment padding anyway
*
* Benchmark results are good - at the time of writing in the common
* uncontended case, it is 30% faster than some of the other small mutexes in
* folly and as fast as std::mutex, which recently optimized its uncontended
* path. In the contended case, it is about 4-5x faster than some of the
* smaller locks in folly, ~2x faster than std::mutex in clang and ~1.8x
* faster in gcc. DistributedMutex is also resistant to tail latency
* pathologies, unlike many of the other small mutexes, which sleep for large
* time quanta to reduce spin churn; this causes elevated latencies for
* threads that enter the sleep cycle. The tail latency of lock acquisition
* is on average up to 10x better with DistributedMutex
*
* DistributedMutex reduces cache line contention by making each thread wait
* on a thread local spinlock and futex. This allows threads to keep working
* only on their own cache lines without requiring cache coherence operations
* when a mutex is under heavy contention. This strategy does not require
* sequential ordering on the centralized atomic storage for wakeup operations
* as each thread is assigned its own wait state
*
* Non-timed mutex acquisitions are scheduled through intrusive LIFO
* contention chains. Each thread starts by spinning for a short quantum and
* falls back to two-phase sleeping. Enqueue operations are lock free and
* are piggybacked off mutex acquisition attempts. The LIFO behavior of a
* contention chain is good in the case where the mutex is held for a short
* amount of time, as the head of the chain is likely to not have slept on
* futex() after exhausting its spin quantum. This allows us to avoid
* unnecessary traversal and syscalls in the fast path with a higher
* probability. Even though the contention chains are LIFO, the mutex itself
* does not adhere to that scheduling policy globally. During contention,
* threads that fail to lock the mutex form a LIFO chain on the central mutex
* state; this chain is broken when a wakeup is scheduled, and future enqueue
* operations form a new chain. This makes the chains themselves LIFO, but
* preserves global fairness through a constant factor which is limited to the
* number of concurrent failed mutex acquisition attempts. This binds the
* last in first out behavior to the number of contending threads and helps
* prevent starvation and latency outliers
*
* This strategy of waking up waiters one by one in a queue does not scale well
* when the number of threads goes past the number of cores, at which point
* preemption causes elevated lock acquisition latencies. DistributedMutex
* implements a hardware timestamp publishing heuristic to detect and adapt to
* preemption.
*
* DistributedMutex does not have the typical mutex API - it does not satisfy
* the Lockable concept. It requires the user to maintain ephemeral
* bookkeeping and pass that bookkeeping around to unlock() calls. The API
* overhead, however, comes for free when you wrap this mutex for usage with
* folly::Synchronized or std::unique_lock, which is the recommended usage
* (std::lock_guard, in optimized mode, has no performance benefit over
* std::unique_lock, so has been omitted). A benefit of this API is that it
* disallows incorrect usage where a thread unlocks a mutex that it does not
* own, thinking a mutex is functionally identical to a binary semaphore,
* which, unlike a mutex, is a suitable primitive for that usage
*
* Timed locking through DistributedMutex is implemented through a centralized
* algorithm - all waiters wait on the central mutex state, by setting and
* resetting bits within the pointer-length word. Since pointer length atomic
* integers are incompatible with futex(FUTEX_WAIT) on most systems, a
* non-standard implementation of futex() is used, where wait queues are
* managed in user-space. See p1135r0 and folly::ParkingLot
*/
template <
template <typename> class Atomic = std::atomic,
bool TimePublishing = true>
class DistributedMutex {
public:
class DistributedMutexStateProxy;
/**
* DistributedMutex is only default constructible, it can neither be moved
* nor copied
*/
DistributedMutex();
DistributedMutex(DistributedMutex&&) = delete;
DistributedMutex(const DistributedMutex&) = delete;
DistributedMutex& operator=(DistributedMutex&&) = delete;
DistributedMutex& operator=(const DistributedMutex&) = delete;
/**
* Acquires the mutex in exclusive mode
*
* This returns an ephemeral proxy that contains internal mutex state. This
* must be kept around for the duration of the critical section and passed
* subsequently to unlock() as an rvalue
*
* The proxy has no public API and is intended to be for internal usage only
*
* This is not a recursive mutex - trying to acquire the mutex twice from
* the same thread without unlocking it results in undefined behavior
*/
DistributedMutexStateProxy lock();
/**
* Unlocks the mutex
*
* The proxy returned by lock must be passed to unlock as an rvalue. No
* other option is possible here, since the proxy is only movable and not
* copyable
*/
void unlock(DistributedMutexStateProxy);
/**
* Try to acquire the mutex
*
* A non blocking version of the lock() function. The returned object is
* contextually convertible to bool. And has the value true when the mutex
* was successfully acquired, false otherwise
*
* This is allowed to return false spuriously, i.e. this is not guaranteed
* to return true even when the mutex is currently unlocked. In the event
* of a failed acquisition, this does not impose any memory ordering
* constraints for other threads
*/
DistributedMutexStateProxy try_lock();
/**
* Try to acquire the mutex, blocking for the given time
*
* Like try_lock(), this is allowed to fail spuriously and is not guaranteed
* to return true even when the mutex is currently unlocked, but it will only
* return a failure after the given time has elapsed
*
* try_lock_for() accepts a duration to block for, and try_lock_until()
* accepts an absolute wall clock time point
*/
template <typename Rep, typename Period>
DistributedMutexStateProxy try_lock_for(
const std::chrono::duration<Rep, Period>& duration);
/**
* Try to acquire the lock, blocking until the given deadline
*
* Other than the difference in the meaning of the second argument, the
* semantics of this function are identical to try_lock_for()
*/
template <typename Clock, typename Duration>
DistributedMutexStateProxy try_lock_until(
const std::chrono::time_point<Clock, Duration>& deadline);
private:
Atomic<std::uintptr_t> state_{0};
};
} // namespace distributed_mutex
} // namespace detail
/**
* Bring the default instantiation of DistributedMutex into the folly
* namespace without requiring any template arguments for public usage
*/
using DistributedMutex = detail::distributed_mutex::DistributedMutex<>;
} // namespace folly
#include <folly/synchronization/DistributedMutex-inl.h>
......@@ -44,11 +44,7 @@ class Sleeper {
public:
Sleeper() : spinCount(0) {}
void wait() {
if (spinCount < kMaxActiveSpin) {
++spinCount;
asm_volatile_pause();
} else {
static void sleep() {
/*
* Always sleep 0.5ms, assuming this will make the kernel put
* us down for whatever its minimum timer resolution is (in
......@@ -57,6 +53,14 @@ class Sleeper {
struct timespec ts = {0, 500000};
nanosleep(&ts, nullptr);
}
void wait() {
if (spinCount < kMaxActiveSpin) {
++spinCount;
asm_volatile_pause();
} else {
sleep();
}
}
};
/*
* Copyright 2018-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/synchronization/DistributedMutex.h>
#include <folly/MapUtil.h>
#include <folly/Synchronized.h>
#include <folly/container/Foreach.h>
#include <folly/portability/GTest.h>
#include <folly/synchronization/Baton.h>
#include <folly/test/DeterministicSchedule.h>
#include <chrono>
#include <thread>
using namespace std::literals;
namespace folly {
namespace test {
/**
* Like DeterministicSchedule, but allows setting callbacks that can be run
* for the current thread when an atomic access occurs, and after. This
* allows us to construct thread interleavings by hand
*
* Constructing a ManualSchedule is required to ensure that we maintain
* per-test state for threads
*
* This can also be used to order thread movement, as an alternative to
* maintaining condition variables and/or semaphores for the purposes of
* testing, for example
*
* auto one = std::thread{[&]() {
* schedule.wait(1);
* two();
* schedule.post(2);
* }};
*
* auto two = std::thread{[&]() {
* one();
* schedule.post(1);
* schedule.wait(2);
* three();
* }};
*
* The code above is guaranteed to call one(), then two(), and then three()
*/
class ManualSchedule {
public:
ManualSchedule() = default;
~ManualSchedule() {
// delete this schedule from the global map
auto schedules = schedules_.wlock();
for_each(*schedules, [&](auto& schedule, auto, auto iter) {
if (schedule.second == this) {
schedules->erase(iter);
}
});
}
/**
* These will be invoked by DeterministicAtomic to signal atomic access
* before and after the operation
*/
static void beforeSharedAccess() {
if (folly::kIsDebug) {
auto id = std::this_thread::get_id();
// get the schedule assigned for the current thread, if one exists,
// otherwise proceed as normal
auto schedule = get_ptr(*schedules_.wlock(), id);
if (!schedule) {
return;
}
// now try and get the callback for this thread; if there is one
// registered for the test, invoke it
auto callback = get_ptr((*(*schedule)->callbacks_.wlock()), id);
if (!callback) {
return;
}
(*callback)();
}
}
static void afterSharedAccess(bool) {
beforeSharedAccess();
}
/**
* Set a callback that will be called on every subsequent atomic access.
* This will be invoked before and after every atomic access, for the thread
* that called setCallback
*/
void setCallback(std::function<void()> callback) {
schedules_.wlock()->insert({std::this_thread::get_id(), this});
callbacks_.wlock()->insert({std::this_thread::get_id(), callback});
}
/**
* Delete the callback set for this thread on atomic accesses
*/
void removeCallbacks() {
callbacks_.wlock()->erase(std::this_thread::get_id());
}
/**
* wait() and post() for easy testing
*/
void wait(int id) {
if (folly::kIsDebug) {
auto& baton = (*batons_.wlock())[id];
baton.wait();
}
}
void post(int id) {
if (folly::kIsDebug) {
auto& baton = (*batons_.wlock())[id];
baton.post();
}
}
private:
// the map of threads to the schedule started for that test
static Synchronized<std::unordered_map<std::thread::id, ManualSchedule*>>
schedules_;
// the map of callbacks to be executed for a thread's atomic accesses
Synchronized<std::unordered_map<std::thread::id, std::function<void()>>>
callbacks_;
// batons for testing, this map will only ever be written to, so it is safe
// to hold references outside lock
Synchronized<std::unordered_map<int, folly::Baton<>>> batons_;
};
Synchronized<std::unordered_map<std::thread::id, ManualSchedule*>>
ManualSchedule::schedules_;
template <typename T>
using ManualAtomic = test::DeterministicAtomicImpl<T, ManualSchedule>;
template <template <typename> class Atomic>
using TestDistributedMutex =
detail::distributed_mutex::DistributedMutex<Atomic, false>;
/**
* Futex extensions for ManualAtomic
*
* Note that doing nothing in these should still result in a program that is
* well defined, since futex wait calls should be tolerant to spurious wakeups
*/
int futexWakeImpl(const detail::Futex<ManualAtomic>*, int, uint32_t) {
ManualSchedule::beforeSharedAccess();
return 1;
}
detail::FutexResult futexWaitImpl(
const detail::Futex<ManualAtomic>*,
uint32_t,
std::chrono::system_clock::time_point const*,
std::chrono::steady_clock::time_point const*,
uint32_t) {
ManualSchedule::beforeSharedAccess();
return detail::FutexResult::AWOKEN;
}
template <typename Clock, typename Duration>
std::cv_status atomic_wait_until(
const ManualAtomic<std::uintptr_t>*,
std::uintptr_t,
const std::chrono::time_point<Clock, Duration>&) {
ManualSchedule::beforeSharedAccess();
return std::cv_status::no_timeout;
}
void atomic_notify_one(const ManualAtomic<std::uintptr_t>*) {
ManualSchedule::beforeSharedAccess();
}
} // namespace test
namespace {
DEFINE_int32(stress_factor, 1000, "The stress test factor for tests");
constexpr auto kForever = 100h;
using DSched = test::DeterministicSchedule;
int sum(int n) {
return (n * (n + 1)) / 2;
}
template <template <typename> class Atom = std::atomic>
void basicNThreads(int numThreads, int iterations = FLAGS_stress_factor) {
auto&& mutex = detail::distributed_mutex::DistributedMutex<Atom>{};
auto&& barrier = std::atomic<int>{0};
auto&& threads = std::vector<std::thread>{};
auto&& result = std::vector<int>{};
auto&& function = [&](auto id) {
return [&, id] {
for (auto j = 0; j < iterations; ++j) {
auto state = mutex.lock();
EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
result.push_back(id);
EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
mutex.unlock(std::move(state));
}
};
};
for (auto i = 1; i <= numThreads; ++i) {
threads.push_back(DSched::thread(function(i)));
}
for (auto& thread : threads) {
DSched::join(thread);
}
auto total = 0;
for (auto value : result) {
total += value;
}
EXPECT_EQ(total, sum(numThreads) * iterations);
}
} // namespace
TEST(DistributedMutex, InternalDetailTestOne) {
auto value = 0;
auto ptr = reinterpret_cast<std::uintptr_t>(&value);
EXPECT_EQ(detail::distributed_mutex::extractAddress<int>(ptr), &value);
ptr = ptr | 0b1;
EXPECT_EQ(detail::distributed_mutex::extractAddress<int>(ptr), &value);
}
TEST(DistributedMutex, Basic) {
auto&& mutex = DistributedMutex{};
auto state = mutex.lock();
mutex.unlock(std::move(state));
}
TEST(DistributedMutex, BasicTryLock) {
auto&& mutex = DistributedMutex{};
while (true) {
auto state = mutex.try_lock();
if (state) {
mutex.unlock(std::move(state));
break;
}
}
}
TEST(DistributedMutex, TestSingleElementContentionChain) {
using namespace folly::detail;
// Acquire the mutex once, let another thread form a contention chain on the
// mutex, and then release it. Observe the other thread grab the lock
auto&& schedule = test::ManualSchedule{};
auto&& mutex = test::TestDistributedMutex<test::ManualAtomic>{};
auto&& waiter = std::thread{[&]() {
schedule.setCallback([&, i = 0]() mutable {
if (i == 2) {
schedule.post(1);
}
++i;
});
schedule.wait(0);
auto state = mutex.lock();
mutex.unlock(std::move(state));
}};
// lock the mutex, signal the waiter, and then wait till the first thread
// has gotten on the wait list
auto state = mutex.lock();
schedule.post(0);
schedule.wait(1);
// release the mutex, and then wait for the waiter to acquire the lock
mutex.unlock(std::move(state));
waiter.join();
}
TEST(DistributedMutex, TestTwoElementContentionChain) {
using namespace folly::detail;
// Acquire the mutex once, let two other threads form a contention chain on
// the mutex, and then release it.  Observe the other threads grab the lock
auto&& schedule = test::ManualSchedule{};
auto&& mutex = test::TestDistributedMutex<test::ManualAtomic>{};
auto&& one = std::thread{[&]() {
schedule.setCallback([&, i = 0]() mutable {
if (i == 2) {
schedule.post(3);
}
++i;
});
schedule.wait(0);
auto state = mutex.lock();
mutex.unlock(std::move(state));
}};
auto&& two = std::thread{[&]() {
schedule.setCallback([&, i = 0]() mutable {
if (i == 2) {
schedule.post(2);
}
++i;
});
schedule.wait(1);
auto state = mutex.lock();
mutex.unlock(std::move(state));
}};
// lock the mutex, signal both waiters, and then wait until both threads have
// gotten on the wait list
auto state = mutex.lock();
schedule.post(0);
schedule.post(1);
schedule.wait(2);
schedule.wait(3);
// release the mutex, and then wait for both waiters to acquire the lock
mutex.unlock(std::move(state));
one.join();
two.join();
}
TEST(DistributedMutex, TestTwoContentionChains) {
using namespace folly::detail;
auto&& schedule = test::ManualSchedule{};
auto&& mutex = test::TestDistributedMutex<test::ManualAtomic>{};
auto&& one = std::thread{[&]() {
schedule.setCallback([&, i = 0]() mutable {
if (i == 2) {
schedule.post(0);
}
++i;
});
schedule.wait(1);
auto state = mutex.lock();
schedule.wait(4);
mutex.unlock(std::move(state));
}};
auto&& two = std::thread{[&]() {
schedule.setCallback([&, i = 0]() mutable {
if (i == 2) {
schedule.post(2);
}
++i;
});
schedule.wait(3);
auto state = mutex.lock();
schedule.wait(5);
mutex.unlock(std::move(state));
}};
auto state = mutex.lock();
schedule.post(1);
schedule.post(3);
schedule.wait(0);
schedule.wait(2);
// at this point there is one contention chain. Release it
mutex.unlock(std::move(state));
// then start a new contention chain
auto&& three = std::thread{[&]() {
schedule.setCallback([&, i = 0]() mutable {
if (i == 2) {
schedule.post(4);
schedule.post(5);
}
++i;
});
auto lockState = mutex.lock();
schedule.post(6);
mutex.unlock(std::move(lockState));
}};
// wait for the third thread to pick up the lock
schedule.wait(6);
one.join();
two.join();
three.join();
}
TEST(DistributedMutex, StressTwoThreads) {
basicNThreads(2);
}
TEST(DistributedMutex, StressThreeThreads) {
basicNThreads(3);
}
TEST(DistributedMutex, StressFourThreads) {
basicNThreads(4);
}
TEST(DistributedMutex, StressFiveThreads) {
basicNThreads(5);
}
TEST(DistributedMutex, StressSixThreads) {
basicNThreads(6);
}
TEST(DistributedMutex, StressSevenThreads) {
basicNThreads(7);
}
TEST(DistributedMutex, StressEightThreads) {
basicNThreads(8);
}
TEST(DistributedMutex, StressSixteenThreads) {
basicNThreads(16);
}
TEST(DistributedMutex, StressThirtyTwoThreads) {
basicNThreads(32);
}
TEST(DistributedMutex, StressSixtyFourThreads) {
basicNThreads(64);
}
TEST(DistributedMutex, StressHundredThreads) {
basicNThreads(100);
}
TEST(DistributedMutex, StressHardwareConcurrencyThreads) {
basicNThreads(std::thread::hardware_concurrency());
}
TEST(DistributedMutex, StressTryLock) {
auto&& mutex = DistributedMutex{};
for (auto i = 0; i < FLAGS_stress_factor; ++i) {
while (true) {
auto state = mutex.try_lock();
if (state) {
mutex.unlock(std::move(state));
break;
}
}
}
}
namespace {
constexpr auto numIterationsDeterministicTest(int threads) {
if (threads <= 8) {
return 100;
}
return 10;
}
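// run the deterministic variant three times, each pass with a uniform random
// schedule seeded differently, so different interleavings are explored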
void runBasicNThreadsDeterministic(int threads, int iterations) {
for (auto pass = 0; pass < 3; ++pass) {
auto&& schedule = DSched{DSched::uniform(pass)};
basicNThreads<test::DeterministicAtomic>(threads, iterations);
static_cast<void>(schedule);
}
}
} // namespace
TEST(DistributedMutex, DeterministicStressTwoThreads) {
runBasicNThreadsDeterministic(2, numIterationsDeterministicTest(2));
}
TEST(DistributedMutex, DeterministicStressFourThreads) {
runBasicNThreadsDeterministic(4, numIterationsDeterministicTest(4));
}
TEST(DistributedMutex, DeterministicStressEightThreads) {
runBasicNThreadsDeterministic(8, numIterationsDeterministicTest(8));
}
TEST(DistributedMutex, DeterministicStressSixteenThreads) {
runBasicNThreadsDeterministic(16, numIterationsDeterministicTest(16));
}
TEST(DistributedMutex, DeterministicStressThirtyTwoThreads) {
runBasicNThreadsDeterministic(32, numIterationsDeterministicTest(32));
}
TEST(DistributedMutex, TimedLockTimeout) {
auto&& mutex = DistributedMutex{};
auto&& start = folly::Baton<>{};
auto&& done = folly::Baton<>{};
auto thread = std::thread{[&]() {
auto state = mutex.lock();
start.post();
done.wait();
mutex.unlock(std::move(state));
}};
start.wait();
auto result = mutex.try_lock_for(10ms);
EXPECT_FALSE(result);
done.post();
thread.join();
}
TEST(DistributedMutex, TimedLockAcquireAfterUnlock) {
auto&& mutex = DistributedMutex{};
auto&& start = folly::Baton<>{};
auto thread = std::thread{[&]() {
auto state = mutex.lock();
start.post();
/* sleep override */
std::this_thread::sleep_for(10ms);
mutex.unlock(std::move(state));
}};
start.wait();
auto result = mutex.try_lock_for(kForever);
EXPECT_TRUE(result);
thread.join();
}
TEST(DistributedMutex, TimedLockAcquireAfterLock) {
auto&& mutex = test::TestDistributedMutex<test::ManualAtomic>{};
auto&& schedule = test::ManualSchedule{};
auto thread = std::thread{[&] {
schedule.setCallback([&, i = 0]() mutable {
if (i == 1) {
schedule.post(0);
schedule.wait(1);
}
// when this thread goes into the atomic_notify_one() we let the other
// thread wake up
if (i == 3) {
schedule.post(2);
}
++i;
});
auto state = mutex.lock();
mutex.unlock(std::move(state));
}};
schedule.setCallback([&, i = 0]() mutable {
// allow the other thread to unlock after the current thread has installed
// the timed waiter state in the mutex
if (i == 2) {
schedule.post(1);
schedule.wait(2);
}
++i;
});
schedule.wait(0);
auto state = mutex.try_lock_for(kForever);
EXPECT_TRUE(state);
mutex.unlock(std::move(state));
thread.join();
}
TEST(DistributedMutex, TimedLockAcquireAfterContentionChain) {
auto&& mutex = test::TestDistributedMutex<test::ManualAtomic>{};
auto&& schedule = test::ManualSchedule{};
auto one = std::thread{[&] {
schedule.setCallback([&, i = 0]() mutable {
if (i == 1) {
schedule.post(0);
schedule.wait(1);
schedule.wait(2);
}
++i;
});
auto state = mutex.lock();
mutex.unlock(std::move(state));
}};
auto two = std::thread{[&] {
schedule.setCallback([&, i = 0]() mutable {
// block the current thread until the first thread has acquired the
// lock
if (i == 0) {
schedule.wait(0);
}
// when the current thread enqueues, let the first thread unlock so we
// get woken up
//
// then wait for the first thread to unlock
if (i == 2) {
schedule.post(1);
}
++i;
});
auto state = mutex.lock();
mutex.unlock(std::move(state));
}};
// make the current thread wait for the first thread to unlock
schedule.setCallback([&, i = 0]() mutable {
// let the first thread unlock after we have enqueued ourselves on the
// mutex
if (i == 2) {
schedule.post(2);
}
++i;
});
auto state = mutex.try_lock_for(kForever);
EXPECT_TRUE(state);
mutex.unlock(std::move(state));
one.join();
two.join();
}
namespace {
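// one set of threads acquires the mutex with lock() while the main thread
// hammers try_lock(); the relaxed counter checks that a successful try_lock()
// still excludes the concurrent lock() holders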
template <template <typename> class Atom = std::atomic>
void stressTryLockWithConcurrentLocks(
int numThreads,
int iterations = FLAGS_stress_factor) {
auto&& threads = std::vector<std::thread>{};
auto&& mutex = detail::distributed_mutex::DistributedMutex<Atom>{};
auto&& atomic = std::atomic<std::uint64_t>{0};
for (auto i = 0; i < numThreads; ++i) {
threads.push_back(DSched::thread([&] {
for (auto j = 0; j < iterations; ++j) {
auto state = mutex.lock();
EXPECT_EQ(atomic.fetch_add(1, std::memory_order_relaxed), 0);
EXPECT_EQ(atomic.fetch_sub(1, std::memory_order_relaxed), 1);
mutex.unlock(std::move(state));
}
}));
}
for (auto i = 0; i < iterations; ++i) {
if (auto state = mutex.try_lock()) {
EXPECT_EQ(atomic.fetch_add(1, std::memory_order_relaxed), 0);
EXPECT_EQ(atomic.fetch_sub(1, std::memory_order_relaxed), 1);
mutex.unlock(std::move(state));
}
}
for (auto& thread : threads) {
DSched::join(thread);
}
}
} // namespace
TEST(DistributedMutex, StressTryLockWithConcurrentLocksTwoThreads) {
stressTryLockWithConcurrentLocks(2);
}
TEST(DistributedMutex, StressTryLockWithConcurrentLocksFourThreads) {
stressTryLockWithConcurrentLocks(4);
}
TEST(DistributedMutex, StressTryLockWithConcurrentLocksEightThreads) {
stressTryLockWithConcurrentLocks(8);
}
TEST(DistributedMutex, StressTryLockWithConcurrentLocksSixteenThreads) {
stressTryLockWithConcurrentLocks(16);
}
TEST(DistributedMutex, StressTryLockWithConcurrentLocksThirtyTwoThreads) {
stressTryLockWithConcurrentLocks(32);
}
TEST(DistributedMutex, StressTryLockWithConcurrentLocksSixtyFourThreads) {
stressTryLockWithConcurrentLocks(64);
}
TEST(DistributedMutex, DeterministicTryLockWithLocksTwoThreads) {
auto iterations = numIterationsDeterministicTest(2);
stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(2, iterations);
for (auto pass = 0; pass < 3; ++pass) {
auto&& schedule = DSched{DSched::uniform(pass)};
stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(2, iterations);
static_cast<void>(schedule);
}
}
TEST(DistributedMutex, DeterministicTryLockWithFourThreads) {
auto iterations = numIterationsDeterministicTest(4);
stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(4, iterations);
for (auto pass = 0; pass < 3; ++pass) {
auto&& schedule = DSched{DSched::uniform(pass)};
stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(4, iterations);
static_cast<void>(schedule);
}
}
TEST(DistributedMutex, DeterministicTryLockWithLocksEightThreads) {
auto iterations = numIterationsDeterministicTest(8);
stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(8, iterations);
for (auto pass = 0; pass < 3; ++pass) {
auto&& schedule = DSched{DSched::uniform(pass)};
stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(8, iterations);
static_cast<void>(schedule);
}
}
TEST(DistributedMutex, DeterministicTryLockWithLocksSixteenThreads) {
auto iterations = numIterationsDeterministicTest(16);
stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(16, iterations);
for (auto pass = 0; pass < 3; ++pass) {
auto&& schedule = DSched{DSched::uniform(pass)};
stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(16, iterations);
static_cast<void>(schedule);
}
}
TEST(DistributedMutex, DeterministicTryLockWithLocksThirtyTwoThreads) {
auto iterations = numIterationsDeterministicTest(32);
stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(32, iterations);
for (auto pass = 0; pass < 3; ++pass) {
auto&& schedule = DSched{DSched::uniform(pass)};
stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(32, iterations);
static_cast<void>(schedule);
}
}
TEST(DistributedMutex, DeterministicTryLockWithLocksSixtyFourThreads) {
stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(64, 5);
for (auto pass = 0; pass < 3; ++pass) {
auto&& schedule = DSched{DSched::uniform(pass)};
stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(64, 5);
static_cast<void>(schedule);
}
}
namespace {
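// all threads contend purely through try_lock(); acquisitions are allowed to
// fail, but any success must still be mutually exclusive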
template <template <typename> class Atom = std::atomic>
void concurrentTryLocks(int numThreads, int iterations = FLAGS_stress_factor) {
auto&& threads = std::vector<std::thread>{};
auto&& mutex = detail::distributed_mutex::DistributedMutex<Atom>{};
auto&& atomic = std::atomic<std::uint64_t>{0};
for (auto i = 0; i < numThreads; ++i) {
threads.push_back(DSched::thread([&] {
for (auto j = 0; j < iterations; ++j) {
if (auto state = mutex.try_lock()) {
EXPECT_EQ(atomic.fetch_add(1, std::memory_order_relaxed), 0);
EXPECT_EQ(atomic.fetch_sub(1, std::memory_order_relaxed), 1);
mutex.unlock(std::move(state));
}
}
}));
}
for (auto& thread : threads) {
DSched::join(thread);
}
}
} // namespace
TEST(DistributedMutex, StressTryLockWithTwoThreads) {
concurrentTryLocks(2);
}
TEST(DistributedMutex, StressTryLockFourThreads) {
concurrentTryLocks(4);
}
TEST(DistributedMutex, StressTryLockEightThreads) {
concurrentTryLocks(8);
}
TEST(DistributedMutex, StressTryLockSixteenThreads) {
concurrentTryLocks(16);
}
TEST(DistributedMutex, StressTryLockThirtyTwoThreads) {
concurrentTryLocks(32);
}
TEST(DistributedMutex, StressTryLockSixtyFourThreads) {
concurrentTryLocks(64);
}
TEST(DistributedMutex, DeterministicTryLockTwoThreads) {
auto iterations = numIterationsDeterministicTest(2);
concurrentTryLocks<test::DeterministicAtomic>(2, iterations);
for (auto pass = 0; pass < 3; ++pass) {
auto&& schedule = DSched{DSched::uniform(pass)};
concurrentTryLocks<test::DeterministicAtomic>(2, iterations);
static_cast<void>(schedule);
}
}
TEST(DistributedMutex, DeterministicTryLockFourThreads) {
auto iterations = numIterationsDeterministicTest(4);
concurrentTryLocks<test::DeterministicAtomic>(4, iterations);
for (auto pass = 0; pass < 3; ++pass) {
auto&& schedule = DSched{DSched::uniform(pass)};
concurrentTryLocks<test::DeterministicAtomic>(4, iterations);
static_cast<void>(schedule);
}
}
TEST(DistributedMutex, DeterministicTryLockEightThreads) {
auto iterations = numIterationsDeterministicTest(8);
concurrentTryLocks<test::DeterministicAtomic>(8, iterations);
for (auto pass = 0; pass < 3; ++pass) {
auto&& schedule = DSched{DSched::uniform(pass)};
concurrentTryLocks<test::DeterministicAtomic>(8, iterations);
static_cast<void>(schedule);
}
}
TEST(DistributedMutex, DeterministicTryLockSixteenThreads) {
auto iterations = numIterationsDeterministicTest(16);
concurrentTryLocks<test::DeterministicAtomic>(16, iterations);
for (auto pass = 0; pass < 3; ++pass) {
auto&& schedule = DSched{DSched::uniform(pass)};
concurrentTryLocks<test::DeterministicAtomic>(16, iterations);
static_cast<void>(schedule);
}
}
TEST(DistributedMutex, DeterministicTryLockThirtyTwoThreads) {
auto iterations = numIterationsDeterministicTest(32);
concurrentTryLocks<test::DeterministicAtomic>(32, iterations);
for (auto pass = 0; pass < 3; ++pass) {
auto&& schedule = DSched{DSched::uniform(pass)};
concurrentTryLocks<test::DeterministicAtomic>(32, iterations);
static_cast<void>(schedule);
}
}
TEST(DistributedMutex, DeterministicTryLockSixtyFourThreads) {
concurrentTryLocks<test::DeterministicAtomic>(64, 5);
for (auto pass = 0; pass < 3; ++pass) {
auto&& schedule = DSched{DSched::uniform(pass)};
concurrentTryLocks<test::DeterministicAtomic>(64, 5);
static_cast<void>(schedule);
}
}
} // namespace folly
@@ -24,6 +24,8 @@
#include <google/base/spinlock.h>
#include <folly/Benchmark.h>
#include <folly/SharedMutex.h>
#include <folly/synchronization/DistributedMutex.h>
#include <folly/synchronization/SmallLocks.h>
/* "Work cycle" is just an additional nop loop iteration.
@@ -47,23 +49,36 @@ static void burn(size_t n) {
namespace {
struct SimpleBarrier {
explicit SimpleBarrier(size_t count) : lock_(), cv_(), count_(count) {}
template <typename Mutex>
std::unique_lock<Mutex> lock(Mutex& mutex) {
return std::unique_lock<Mutex>{mutex};
}
template <typename Mutex, typename Other>
void unlock(Mutex&, Other) {}
auto lock(folly::DistributedMutex& mutex) {
return mutex.lock();
}
template <typename State>
void unlock(folly::DistributedMutex& mutex, State state) {
mutex.unlock(std::move(state));
}
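// the lock()/unlock() overloads above let the benchmark templates treat
// std-style mutexes and DistributedMutex uniformly: the generic versions
// return a std::unique_lock, whose destruction in unlock() releases the lock,
// while the DistributedMutex versions thread its state token from lock()
// through to unlock()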
struct SimpleBarrier {
explicit SimpleBarrier(int count) : count_(count) {}
void wait() {
std::unique_lock<std::mutex> lockHeld(lock_);
if (++num_ == count_) {
cv_.notify_all();
} else {
cv_.wait(lockHeld, [&]() { return num_ >= count_; });
// we spin for a bit to try and get the kernel to schedule threads on
// different cores
for (auto i = 0; i < 100000; ++i) {
folly::doNotOptimizeAway(i);
}
num_.fetch_add(1);
while (num_.load() != count_) {
}
}
private:
std::mutex lock_;
std::condition_variable cv_;
size_t num_{0};
size_t count_;
std::atomic<int> num_{0};
const int count_;
};
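// the busy-wait barrier above replaces the older condition-variable barrier;
// as the comment in wait() notes, the extra spinning tries to get the kernel
// to place the contending threads on different cores before measurement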
} // namespace
@@ -106,7 +121,7 @@ static void runContended(size_t numOps, size_t numThreads) {
size_t threadgroups = totalthreads / numThreads;
struct lockstruct {
char padding1[128];
Lock lock;
Lock mutex;
char padding2[128];
long value = 1;
};
@@ -122,13 +137,13 @@ static void runContended(size_t numOps, size_t numThreads) {
for (size_t t = 0; t < totalthreads; ++t) {
threads[t] = std::thread([&, t] {
lockstruct* lock = &locks[t % threadgroups];
lockstruct* mutex = &locks[t % threadgroups];
runbarrier.wait();
for (size_t op = 0; op < numOps; op += 1) {
lock->lock.lock();
auto state = lock(mutex->mutex);
burn(FLAGS_work);
lock->value++;
lock->lock.unlock();
mutex->value++;
unlock(mutex->mutex, std::move(state));
burn(FLAGS_unlocked_work);
}
});
@@ -175,7 +190,7 @@ static void runFairness() {
for (size_t t = 0; t < totalthreads; ++t) {
threads[t] = std::thread([&, t] {
lockstruct* lock = &locks[t % threadgroups];
lockstruct* mutex = &locks[t % threadgroups];
long value = 0;
std::chrono::microseconds max(0);
std::chrono::microseconds time(0);
@@ -184,7 +199,7 @@ static void runFairness() {
while (!stop) {
std::chrono::steady_clock::time_point prelock =
std::chrono::steady_clock::now();
lock->lock.lock();
auto state = lock(mutex->lock);
std::chrono::steady_clock::time_point postlock =
std::chrono::steady_clock::now();
auto diff = std::chrono::duration_cast<std::chrono::microseconds>(
@@ -196,7 +211,7 @@ static void runFairness() {
}
burn(FLAGS_work);
value++;
lock->lock.unlock();
unlock(mutex->lock, std::move(state));
burn(FLAGS_unlocked_work);
}
{
@@ -245,47 +260,47 @@ static void runFairness() {
mx.count());
}
BENCHMARK(StdMutexUncontendedBenchmark, iters) {
std::mutex lock;
while (iters--) {
lock.lock();
lock.unlock();
template <typename Mutex>
void runUncontended(std::size_t iters) {
auto&& mutex = Mutex{};
for (auto i = std::size_t{0}; i < iters; ++i) {
auto state = lock(mutex);
unlock(mutex, std::move(state));
}
}
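// each uncontended benchmark below is runUncontended instantiated with the
// corresponding lock type, wrapped in an adapter where the lock needs init()
// or exposes a non-standard interface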
BENCHMARK(StdMutexUncontendedBenchmark, iters) {
runUncontended<std::mutex>(iters);
}
BENCHMARK(GoogleSpinUncontendedBenchmark, iters) {
SpinLock lock;
while (iters--) {
lock.Lock();
lock.Unlock();
}
runUncontended<GoogleSpinLockAdapter>(iters);
}
BENCHMARK(MicroSpinLockUncontendedBenchmark, iters) {
folly::MicroSpinLock lock;
lock.init();
while (iters--) {
lock.lock();
lock.unlock();
}
runUncontended<InitLock<folly::MicroSpinLock>>(iters);
}
BENCHMARK(PicoSpinLockUncontendedBenchmark, iters) {
// uint8_t would be more fair, but PicoSpinLock needs at least two bytes
folly::PicoSpinLock<uint16_t> lock;
lock.init();
while (iters--) {
lock.lock();
lock.unlock();
}
runUncontended<InitLock<folly::PicoSpinLock<std::uint16_t>>>(iters);
}
BENCHMARK(MicroLockUncontendedBenchmark, iters) {
folly::MicroLock lock;
lock.init();
runUncontended<InitLock<folly::MicroLock>>(iters);
}
BENCHMARK(SharedMutexUncontendedBenchmark, iters) {
runUncontended<folly::SharedMutex>(iters);
}
BENCHMARK(DistributedMutexUncontendedBenchmark, iters) {
runUncontended<folly::DistributedMutex>(iters);
}
BENCHMARK(AtomicFetchAddUncontendedBenchmark, iters) {
auto&& atomic = std::atomic<uint64_t>{0};
while (iters--) {
lock.lock();
lock.unlock();
folly::doNotOptimizeAway(atomic.fetch_add(1));
}
}
@@ -334,6 +349,12 @@ static void folly_picospin(size_t numOps, size_t numThreads) {
static void folly_microlock(size_t numOps, size_t numThreads) {
runContended<folly::MicroLock>(numOps, numThreads);
}
static void folly_sharedmutex(size_t numOps, size_t numThreads) {
runContended<folly::SharedMutex>(numOps, numThreads);
}
static void folly_distributedmutex(size_t numOps, size_t numThreads) {
runContended<folly::DistributedMutex>(numOps, numThreads);
}
BENCHMARK_DRAW_LINE();
BENCH_BASE(std_mutex, 1thread, 1)
@@ -341,42 +362,64 @@ BENCH_REL(google_spin, 1thread, 1)
BENCH_REL(folly_microspin, 1thread, 1)
BENCH_REL(folly_picospin, 1thread, 1)
BENCH_REL(folly_microlock, 1thread, 1)
BENCH_REL(folly_sharedmutex, 1thread, 1)
BENCH_REL(folly_distributedmutex, 1thread, 1)
BENCHMARK_DRAW_LINE();
BENCH_BASE(std_mutex, 2thread, 2)
BENCH_REL(google_spin, 2thread, 2)
BENCH_REL(folly_microspin, 2thread, 2)
BENCH_REL(folly_picospin, 2thread, 2)
BENCH_REL(folly_microlock, 2thread, 2)
BENCH_REL(folly_sharedmutex, 2thread, 2)
BENCH_REL(folly_distributedmutex, 2thread, 2)
BENCHMARK_DRAW_LINE();
BENCH_BASE(std_mutex, 4thread, 4)
BENCH_REL(google_spin, 4thread, 4)
BENCH_REL(folly_microspin, 4thread, 4)
BENCH_REL(folly_picospin, 4thread, 4)
BENCH_REL(folly_microlock, 4thread, 4)
BENCH_REL(folly_sharedmutex, 4thread, 4)
BENCH_REL(folly_distributedmutex, 4thread, 4)
BENCHMARK_DRAW_LINE();
BENCH_BASE(std_mutex, 8thread, 8)
BENCH_REL(google_spin, 8thread, 8)
BENCH_REL(folly_microspin, 8thread, 8)
BENCH_REL(folly_picospin, 8thread, 8)
BENCH_REL(folly_microlock, 8thread, 8)
BENCH_REL(folly_sharedmutex, 8thread, 8)
BENCH_REL(folly_distributedmutex, 8thread, 8)
BENCHMARK_DRAW_LINE();
BENCH_BASE(std_mutex, 16thread, 16)
BENCH_REL(google_spin, 16thread, 16)
BENCH_REL(folly_microspin, 16thread, 16)
BENCH_REL(folly_picospin, 16thread, 16)
BENCH_REL(folly_microlock, 16thread, 16)
BENCH_REL(folly_sharedmutex, 16thread, 16)
BENCH_REL(folly_distributedmutex, 16thread, 16)
BENCHMARK_DRAW_LINE();
BENCH_BASE(std_mutex, 32thread, 32)
BENCH_REL(google_spin, 32thread, 32)
BENCH_REL(folly_microspin, 32thread, 32)
BENCH_REL(folly_picospin, 32thread, 32)
BENCH_REL(folly_microlock, 32thread, 32)
BENCH_REL(folly_sharedmutex, 32thread, 32)
BENCH_REL(folly_distributedmutex, 32thread, 32)
BENCHMARK_DRAW_LINE();
BENCH_BASE(std_mutex, 64thread, 64)
BENCH_REL(google_spin, 64thread, 64)
BENCH_REL(folly_microspin, 64thread, 64)
BENCH_REL(folly_picospin, 64thread, 64)
BENCH_REL(folly_microlock, 64thread, 64)
BENCH_REL(folly_sharedmutex, 64thread, 64)
BENCH_REL(folly_distributedmutex, 64thread, 64)
BENCHMARK_DRAW_LINE();
BENCH_BASE(std_mutex, 128thread, 128)
BENCH_REL(google_spin, 128thread, 128)
BENCH_REL(folly_microspin, 128thread, 128)
BENCH_REL(folly_picospin, 128thread, 128)
BENCH_REL(folly_microlock, 128thread, 128)
BENCH_REL(folly_sharedmutex, 128thread, 128)
BENCH_REL(folly_distributedmutex, 128thread, 128)
#define FairnessTest(type) \
{ \
@@ -391,7 +434,9 @@ int main(int argc, char** argv) {
FairnessTest(GoogleSpinLockAdapter);
FairnessTest(InitLock<folly::MicroSpinLock>);
FairnessTest(InitLock<folly::PicoSpinLock<uint16_t>>);
FairnessTest(folly::MicroLock);
FairnessTest(InitLock<folly::MicroLock>);
FairnessTest(folly::SharedMutex);
FairnessTest(folly::DistributedMutex);
folly::runBenchmarks();
@@ -403,70 +448,101 @@ int main(int argc, char** argv) {
Intel(R) Xeon(R) CPU E5-2680 v4 @ 2.40GHz
std::mutex:
Sum: 3762980 Mean: 67196 stddev: 916
Lock time stats in us: mean 15 stddev 1141 max 22384
Sum: 3645010 Mean: 65089 stddev: 841
Lock time stats in us: mean 16 stddev 1178 max 21361
GoogleSpinLockAdapter:
Sum: 4263945 Mean: 76141 stddev: 2072
Lock time stats in us: mean 10 stddev 1007 max 10132
Sum: 4329140 Mean: 77306 stddev: 2338
Lock time stats in us: mean 10 stddev 16 max 19860
InitLock<folly::MicroSpinLock>:
Sum: 3469284 Mean: 61951 stddev: 15693
Lock time stats in us: mean 31 stddev 1237 max 334200
Sum: 3513882 Mean: 62747 stddev: 27713
Lock time stats in us: mean 31 stddev 1222 max 211624
InitLock<folly::PicoSpinLock<uint16_t>>:
Sum: 1011034 Mean: 18054 stddev: 3819
Lock time stats in us: mean 108 stddev 4247 max 691707
folly::MicroLock:
Sum: 1712173 Mean: 30574 stddev: 4032
Lock time stats in us: mean 53 stddev 2508 max 11554
Sum: 2182472 Mean: 38972 stddev: 41789
Lock time stats in us: mean 49 stddev 1967 max 228875
InitLock<folly::MicroLock>:
Sum: 1868601 Mean: 33367 stddev: 4836
Lock time stats in us: mean 48 stddev 2298 max 12235
folly::SharedMutex:
Sum: 2037742 Mean: 36388 stddev: 18204
Lock time stats in us: mean 53 stddev 2107 max 132469
folly::DistributedMutex:
Sum: 6793764 Mean: 121317 stddev: 20791
Lock time stats in us: mean 15 stddev 8 max 55696
============================================================================
folly/synchronization/test/SmallLocksBenchmark.cpp    relative  time/iter  iters/s
============================================================================
StdMutexUncontendedBenchmark 16.74ns 59.72M
GoogleSpinUncontendedBenchmark 11.26ns 88.78M
MicroSpinLockUncontendedBenchmark 10.62ns 94.16M
PicoSpinLockUncontendedBenchmark 11.83ns 84.54M
MicroLockUncontendedBenchmark 20.62ns 48.50M
StdMutexUncontendedBenchmark 16.73ns 59.77M
GoogleSpinUncontendedBenchmark 11.26ns 88.80M
MicroSpinLockUncontendedBenchmark 10.06ns 99.44M
PicoSpinLockUncontendedBenchmark 11.25ns 88.89M
MicroLockUncontendedBenchmark 19.20ns 52.09M
SharedMutexUncontendedBenchmark 19.45ns 51.40M
DistributedMutexUncontendedBenchmark 17.02ns 58.75M
AtomicFetchAddUncontendedBenchmark 5.47ns 182.91M
----------------------------------------------------------------------------
----------------------------------------------------------------------------
std_mutex(1thread) 802.21ns 1.25M
google_spin(1thread) 109.81% 730.52ns 1.37M
folly_microspin(1thread) 119.16% 673.22ns 1.49M
folly_picospin(1thread) 119.02% 673.99ns 1.48M
folly_microlock(1thread) 131.67% 609.28ns 1.64M
folly_sharedmutex(1thread) 118.41% 677.46ns 1.48M
folly_distributedmutex(1thread) 100.27% 800.02ns 1.25M
----------------------------------------------------------------------------
std_mutex(1thread) 745.50ns 1.34M
google_spin(1thread) 103.88% 717.66ns 1.39M
folly_microspin(1thread) 102.61% 726.55ns 1.38M
folly_picospin(1thread) 90.78% 821.20ns 1.22M
folly_microlock(1thread) 96.64% 771.42ns 1.30M
std_mutex(2thread) 1.30us 769.21K
google_spin(2thread) 129.59% 1.00us 996.85K
folly_microspin(2thread) 158.13% 822.13ns 1.22M
folly_picospin(2thread) 150.43% 864.23ns 1.16M
folly_microlock(2thread) 144.94% 896.92ns 1.11M
folly_sharedmutex(2thread) 120.36% 1.08us 925.83K
folly_distributedmutex(2thread) 112.98% 1.15us 869.08K
----------------------------------------------------------------------------
std_mutex(2thread) 1.26us 796.48K
google_spin(2thread) 136.08% 922.64ns 1.08M
folly_microspin(2thread) 145.00% 865.87ns 1.15M
folly_picospin(2thread) 163.80% 766.49ns 1.30M
folly_microlock(2thread) 137.96% 910.06ns 1.10M
std_mutex(4thread) 2.36us 424.08K
google_spin(4thread) 120.20% 1.96us 509.75K
folly_microspin(4thread) 109.07% 2.16us 462.53K
folly_picospin(4thread) 113.37% 2.08us 480.78K
folly_microlock(4thread) 83.88% 2.81us 355.71K
folly_sharedmutex(4thread) 90.47% 2.61us 383.65K
folly_distributedmutex(4thread) 121.82% 1.94us 516.63K
----------------------------------------------------------------------------
std_mutex(4thread) 2.16us 462.09K
google_spin(4thread) 107.52% 2.01us 496.84K
folly_microspin(4thread) 103.81% 2.08us 479.71K
folly_picospin(4thread) 105.20% 2.06us 486.14K
folly_microlock(4thread) 77.07% 2.81us 356.15K
std_mutex(8thread) 5.39us 185.64K
google_spin(8thread) 127.72% 4.22us 237.10K
folly_microspin(8thread) 106.70% 5.05us 198.08K
folly_picospin(8thread) 88.02% 6.12us 163.41K
folly_microlock(8thread) 79.78% 6.75us 148.11K
folly_sharedmutex(8thread) 78.25% 6.88us 145.26K
folly_distributedmutex(8thread) 162.74% 3.31us 302.12K
----------------------------------------------------------------------------
std_mutex(8thread) 5.55us 180.33K
google_spin(8thread) 110.44% 5.02us 199.16K
folly_microspin(8thread) 105.13% 5.27us 189.58K
folly_picospin(8thread) 98.81% 5.61us 178.19K
folly_microlock(8thread) 81.95% 6.77us 147.78K
std_mutex(16thread) 11.74us 85.16K
google_spin(16thread) 109.91% 10.68us 93.60K
folly_microspin(16thread) 103.93% 11.30us 88.50K
folly_picospin(16thread) 50.36% 23.32us 42.89K
folly_microlock(16thread) 55.85% 21.03us 47.56K
folly_sharedmutex(16thread) 64.27% 18.27us 54.74K
folly_distributedmutex(16thread) 181.32% 6.48us 154.41K
----------------------------------------------------------------------------
std_mutex(16thread) 11.30us 88.48K
google_spin(16thread) 109.33% 10.34us 96.74K
folly_microspin(16thread) 105.86% 10.68us 93.67K
folly_picospin(16thread) 43.61% 25.92us 38.58K
folly_microlock(16thread) 52.82% 21.40us 46.73K
std_mutex(32thread) 31.56us 31.68K
google_spin(32thread) 95.17% 33.17us 30.15K
folly_microspin(32thread) 100.60% 31.38us 31.87K
folly_picospin(32thread) 31.30% 100.84us 9.92K
folly_microlock(32thread) 55.04% 57.35us 17.44K
folly_sharedmutex(32thread) 65.09% 48.49us 20.62K
folly_distributedmutex(32thread) 177.39% 17.79us 56.20K
----------------------------------------------------------------------------
std_mutex(32thread) 32.24us 31.02K
google_spin(32thread) 100.57% 32.06us 31.19K
folly_microspin(32thread) 102.32% 31.51us 31.73K
folly_picospin(32thread) 36.63% 88.02us 11.36K
folly_microlock(32thread) 57.61% 55.97us 17.87K
std_mutex(64thread) 39.90us 25.06K
google_spin(64thread) 110.92% 35.98us 27.80K
folly_microspin(64thread) 105.98% 37.65us 26.56K
folly_picospin(64thread) 33.03% 120.80us 8.28K
folly_microlock(64thread) 58.02% 68.78us 14.54K
folly_sharedmutex(64thread) 68.43% 58.32us 17.15K
folly_distributedmutex(64thread) 200.38% 19.91us 50.22K
----------------------------------------------------------------------------
std_mutex(64thread) 35.67us 28.04K
google_spin(64thread) 111.44% 32.01us 31.24K
folly_microspin(64thread) 94.45% 37.76us 26.48K
folly_picospin(64thread) 36.01% 99.05us 10.10K
folly_microlock(64thread) 54.11% 65.92us 15.17K
std_mutex(128thread) 75.67us 13.21K
google_spin(128thread) 116.14% 65.16us 15.35K
folly_microspin(128thread) 100.82% 75.06us 13.32K
folly_picospin(128thread) 44.99% 168.21us 5.94K
folly_microlock(128thread) 53.93% 140.31us 7.13K
folly_sharedmutex(128thread) 64.37% 117.55us 8.51K
folly_distributedmutex(128thread) 185.71% 40.75us 24.54K
============================================================================
*/
@@ -222,16 +222,17 @@ class DeterministicSchedule : boost::noncopyable {
* DeterministicAtomic<T> is a drop-in replacement for std::atomic<T> that
* cooperates with DeterministicSchedule.
*/
template <typename T>
struct DeterministicAtomic {
template <typename T, typename Schedule = DeterministicSchedule>
struct DeterministicAtomicImpl {
std::atomic<T> data;
DeterministicAtomic() = default;
~DeterministicAtomic() = default;
DeterministicAtomic(DeterministicAtomic<T> const&) = delete;
DeterministicAtomic<T>& operator=(DeterministicAtomic<T> const&) = delete;
DeterministicAtomicImpl() = default;
~DeterministicAtomicImpl() = default;
DeterministicAtomicImpl(DeterministicAtomicImpl<T> const&) = delete;
DeterministicAtomicImpl<T>& operator=(DeterministicAtomicImpl<T> const&) =
delete;
constexpr /* implicit */ DeterministicAtomic(T v) noexcept : data(v) {}
constexpr /* implicit */ DeterministicAtomicImpl(T v) noexcept : data(v) {}
bool is_lock_free() const noexcept {
return data.is_lock_free();
@@ -249,13 +250,13 @@ struct DeterministicAtomic {
T v1,
std::memory_order success,
std::memory_order failure) noexcept {
DeterministicSchedule::beforeSharedAccess();
Schedule::beforeSharedAccess();
auto orig = v0;
bool rv = data.compare_exchange_strong(v0, v1, success, failure);
FOLLY_TEST_DSCHED_VLOG(
this << ".compare_exchange_strong(" << std::hex << orig << ", "
<< std::hex << v1 << ") -> " << rv << "," << std::hex << v0);
DeterministicSchedule::afterSharedAccess(rv);
Schedule::afterSharedAccess(rv);
return rv;
}
@@ -271,175 +272,175 @@ struct DeterministicAtomic {
T v1,
std::memory_order success,
std::memory_order failure) noexcept {
DeterministicSchedule::beforeSharedAccess();
Schedule::beforeSharedAccess();
auto orig = v0;
bool rv = data.compare_exchange_weak(v0, v1, success, failure);
FOLLY_TEST_DSCHED_VLOG(
this << ".compare_exchange_weak(" << std::hex << orig << ", "
<< std::hex << v1 << ") -> " << rv << "," << std::hex << v0);
DeterministicSchedule::afterSharedAccess(rv);
Schedule::afterSharedAccess(rv);
return rv;
}
T exchange(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept {
DeterministicSchedule::beforeSharedAccess();
Schedule::beforeSharedAccess();
T rv = data.exchange(v, mo);
FOLLY_TEST_DSCHED_VLOG(
this << ".exchange(" << std::hex << v << ") -> " << std::hex << rv);
DeterministicSchedule::afterSharedAccess(true);
Schedule::afterSharedAccess(true);
return rv;
}
/* implicit */ operator T() const noexcept {
DeterministicSchedule::beforeSharedAccess();
Schedule::beforeSharedAccess();
T rv = data.operator T();
FOLLY_TEST_DSCHED_VLOG(this << "() -> " << std::hex << rv);
DeterministicSchedule::afterSharedAccess(true);
Schedule::afterSharedAccess(true);
return rv;
}
T load(std::memory_order mo = std::memory_order_seq_cst) const noexcept {
DeterministicSchedule::beforeSharedAccess();
Schedule::beforeSharedAccess();
T rv = data.load(mo);
FOLLY_TEST_DSCHED_VLOG(this << ".load() -> " << std::hex << rv);
DeterministicSchedule::afterSharedAccess(true);
Schedule::afterSharedAccess(true);
return rv;
}
T operator=(T v) noexcept {
DeterministicSchedule::beforeSharedAccess();
Schedule::beforeSharedAccess();
T rv = (data = v);
FOLLY_TEST_DSCHED_VLOG(this << " = " << std::hex << v);
DeterministicSchedule::afterSharedAccess(true);
Schedule::afterSharedAccess(true);
return rv;
}
void store(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept {
DeterministicSchedule::beforeSharedAccess();
Schedule::beforeSharedAccess();
data.store(v, mo);
FOLLY_TEST_DSCHED_VLOG(this << ".store(" << std::hex << v << ")");
DeterministicSchedule::afterSharedAccess(true);
Schedule::afterSharedAccess(true);
}
T operator++() noexcept {
DeterministicSchedule::beforeSharedAccess();
Schedule::beforeSharedAccess();
T rv = ++data;
FOLLY_TEST_DSCHED_VLOG(this << " pre++ -> " << std::hex << rv);
DeterministicSchedule::afterSharedAccess(true);
Schedule::afterSharedAccess(true);
return rv;
}
T operator++(int /* postDummy */) noexcept {
DeterministicSchedule::beforeSharedAccess();
Schedule::beforeSharedAccess();
T rv = data++;
FOLLY_TEST_DSCHED_VLOG(this << " post++ -> " << std::hex << rv);
DeterministicSchedule::afterSharedAccess(true);
Schedule::afterSharedAccess(true);
return rv;
}
T operator--() noexcept {
DeterministicSchedule::beforeSharedAccess();
Schedule::beforeSharedAccess();
T rv = --data;
FOLLY_TEST_DSCHED_VLOG(this << " pre-- -> " << std::hex << rv);
DeterministicSchedule::afterSharedAccess(true);
Schedule::afterSharedAccess(true);
return rv;
}
T operator--(int /* postDummy */) noexcept {
DeterministicSchedule::beforeSharedAccess();
Schedule::beforeSharedAccess();
T rv = data--;
FOLLY_TEST_DSCHED_VLOG(this << " post-- -> " << std::hex << rv);
DeterministicSchedule::afterSharedAccess(true);
Schedule::afterSharedAccess(true);
return rv;
}
T operator+=(T v) noexcept {
DeterministicSchedule::beforeSharedAccess();
Schedule::beforeSharedAccess();
T rv = (data += v);
FOLLY_TEST_DSCHED_VLOG(
this << " += " << std::hex << v << " -> " << std::hex << rv);
DeterministicSchedule::afterSharedAccess(true);
Schedule::afterSharedAccess(true);
return rv;
}
T fetch_add(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept {
DeterministicSchedule::beforeSharedAccess();
Schedule::beforeSharedAccess();
T rv = data.fetch_add(v, mo);
FOLLY_TEST_DSCHED_VLOG(
this << ".fetch_add(" << std::hex << v << ") -> " << std::hex << rv);
DeterministicSchedule::afterSharedAccess(true);
Schedule::afterSharedAccess(true);
return rv;
}
T operator-=(T v) noexcept {
DeterministicSchedule::beforeSharedAccess();
Schedule::beforeSharedAccess();
T rv = (data -= v);
FOLLY_TEST_DSCHED_VLOG(
this << " -= " << std::hex << v << " -> " << std::hex << rv);
DeterministicSchedule::afterSharedAccess(true);
Schedule::afterSharedAccess(true);
return rv;
}
T fetch_sub(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept {
DeterministicSchedule::beforeSharedAccess();
Schedule::beforeSharedAccess();
T rv = data.fetch_sub(v, mo);
FOLLY_TEST_DSCHED_VLOG(
this << ".fetch_sub(" << std::hex << v << ") -> " << std::hex << rv);
DeterministicSchedule::afterSharedAccess(true);
Schedule::afterSharedAccess(true);
return rv;
}
T operator&=(T v) noexcept {
DeterministicSchedule::beforeSharedAccess();
Schedule::beforeSharedAccess();
T rv = (data &= v);
FOLLY_TEST_DSCHED_VLOG(
this << " &= " << std::hex << v << " -> " << std::hex << rv);
DeterministicSchedule::afterSharedAccess(true);
Schedule::afterSharedAccess(true);
return rv;
}
T fetch_and(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept {
DeterministicSchedule::beforeSharedAccess();
Schedule::beforeSharedAccess();
T rv = data.fetch_and(v, mo);
FOLLY_TEST_DSCHED_VLOG(
this << ".fetch_and(" << std::hex << v << ") -> " << std::hex << rv);
DeterministicSchedule::afterSharedAccess(true);
Schedule::afterSharedAccess(true);
return rv;
}
T operator|=(T v) noexcept {
DeterministicSchedule::beforeSharedAccess();
Schedule::beforeSharedAccess();
T rv = (data |= v);
FOLLY_TEST_DSCHED_VLOG(
this << " |= " << std::hex << v << " -> " << std::hex << rv);
DeterministicSchedule::afterSharedAccess(true);
Schedule::afterSharedAccess(true);
return rv;
}
T fetch_or(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept {
DeterministicSchedule::beforeSharedAccess();
Schedule::beforeSharedAccess();
T rv = data.fetch_or(v, mo);
FOLLY_TEST_DSCHED_VLOG(
this << ".fetch_or(" << std::hex << v << ") -> " << std::hex << rv);
DeterministicSchedule::afterSharedAccess(true);
Schedule::afterSharedAccess(true);
return rv;
}
T operator^=(T v) noexcept {
DeterministicSchedule::beforeSharedAccess();
Schedule::beforeSharedAccess();
T rv = (data ^= v);
FOLLY_TEST_DSCHED_VLOG(
this << " ^= " << std::hex << v << " -> " << std::hex << rv);
DeterministicSchedule::afterSharedAccess(true);
Schedule::afterSharedAccess(true);
return rv;
}
T fetch_xor(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept {
DeterministicSchedule::beforeSharedAccess();
Schedule::beforeSharedAccess();
T rv = data.fetch_xor(v, mo);
FOLLY_TEST_DSCHED_VLOG(
this << ".fetch_xor(" << std::hex << v << ") -> " << std::hex << rv);
DeterministicSchedule::afterSharedAccess(true);
Schedule::afterSharedAccess(true);
return rv;
}
@@ -449,6 +450,9 @@ struct DeterministicAtomic {
}
};
template <typename T>
using DeterministicAtomic = DeterministicAtomicImpl<T, DeterministicSchedule>;
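// the original DeterministicAtomic name is preserved as an alias bound to
// DeterministicSchedule; tests that need their own schedule (for example the
// DistributedMutex tests) instantiate DeterministicAtomicImpl directly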
/* Futex extensions for DeterministicSchedule based Futexes */
int futexWakeImpl(
const detail::Futex<test::DeterministicAtomic>* futex,
@@ -461,6 +465,26 @@ detail::FutexResult futexWaitImpl(
std::chrono::steady_clock::time_point const* absSteadyTime,
uint32_t waitMask);
/**
* Implementations of the atomic_wait API for DeterministicAtomic; these are
* no-ops here.  For a correct implementation this should not make a
* difference, because callers are required to surround their waits and wakes
* with operations on the atomic
*/
template <typename Integer>
void atomic_wait(const DeterministicAtomic<Integer>*, Integer) {}
template <typename Integer, typename Clock, typename Duration>
std::cv_status atomic_wait_until(
const DeterministicAtomic<Integer>*,
Integer,
const std::chrono::time_point<Clock, Duration>&) {
return std::cv_status::no_timeout;
}
template <typename Integer>
void atomic_notify_one(const DeterministicAtomic<Integer>*) {}
template <typename Integer>
void atomic_notify_all(const DeterministicAtomic<Integer>*) {}
/**
* DeterministicMutex is a drop-in replacement for std::mutex that
* cooperates with DeterministicSchedule.
......