Commit 7fbec719 authored by Marc Celani, committed by Sara Golemon

Introduce LifoSem

Summary: A LIFO semaphore is useful for building a thread pool because we can keep CPU caches warmer by continually reusing the most recently used thread. This diff introduces a LIFO semaphore to folly.

Test Plan: unit tests

Reviewed By: davejwatson@fb.com

FB internal diff: D1280405
parent 59c7c8ea
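The cache-warmth idea in the summary can be illustrated with a toy dispatcher that keeps idle worker ids on a stack (this is a hypothetical sketch, not folly code): the worker that went idle most recently, whose stack and working set are most likely still cached, is always the first to be reused.

```cpp
#include <cassert>
#include <vector>

// Toy model of LIFO wakeup order (illustration only). Idle worker ids
// are kept on a stack; the dispatcher always reuses the worker that
// went idle most recently, keeping its CPU caches warm.
struct LifoIdleList {
  std::vector<int> stack_;
  void workerIdle(int id) { stack_.push_back(id); }
  int pickWorker() {
    assert(!stack_.empty());
    int id = stack_.back();  // most recently idled worker
    stack_.pop_back();
    return id;
  }
};
```

With a FIFO policy the oldest idle worker (coldest caches) would be chosen instead; LifoSem applies the same reuse idea to semaphore waiters.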
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "LifoSem.h"
/// Raw node storage is preallocated in a contiguous memory segment,
/// but we use an anonymous mmap so the physical memory used (RSS) will
/// only reflect the maximum number of waiters that actually existed
/// concurrently. For blocked threads the max node count is limited by the
/// number of threads, so we can conservatively estimate that this will be
/// < 10k. For LifoEventSem, however, we could potentially have many more.
///
/// On a 64-bit architecture each LifoSemRawNode takes 16 bytes. We make
/// the pool 1 million entries.
LIFOSEM_DECLARE_POOL(std::atomic, 1000000)
namespace folly {
ShutdownSemError::ShutdownSemError(const std::string& msg)
: std::runtime_error(msg)
{}
ShutdownSemError::~ShutdownSemError() noexcept {
}
}
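The pool comment above relies on a property of anonymous mmap: virtual address space is reserved eagerly, but physical pages (and thus RSS) are only faulted in when first touched. A minimal sketch of that reservation pattern, not the actual IndexedMemPool code:

```cpp
#include <cstddef>
#include <sys/mman.h>

// Reserve room for many fixed-size nodes up front. For 1 million
// 16-byte nodes this is ~16 MB of virtual address space, but RSS stays
// near zero until individual pages are first written.
void* reserveNodeStorage(std::size_t count, std::size_t nodeSize) {
  void* p = mmap(nullptr, count * nodeSize, PROT_READ | PROT_WRITE,
                 MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
  return p == MAP_FAILED ? nullptr : p;
}
```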
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FOLLY_LIFOSEM_H
#define FOLLY_LIFOSEM_H
#include <string.h>
#include <stdint.h>
#include <atomic>
#include <algorithm>
#include <memory>
#include <system_error>
#include <boost/noncopyable.hpp>
#include "folly/AtomicStruct.h"
#include "folly/Baton.h"
#include "folly/IndexedMemPool.h"
#include "folly/Likely.h"
#include "folly/detail/CacheLocality.h"
namespace folly {
template <template<typename> class Atom = std::atomic,
class BatonType = Baton<Atom>>
struct LifoSemImpl;
/// LifoSem is a semaphore that wakes its waiters in a manner intended to
/// maximize performance rather than fairness. It should be preferred
/// to a mutex+condvar or POSIX sem_t solution when all of the waiters
/// are equivalent. It is faster than a condvar or sem_t, and it has a
/// shutdown state that might save you a lot of complexity when it comes
/// time to shut down your work pipelines. LifoSem is larger than sem_t,
/// but that is only because it uses padding and alignment to avoid
/// false sharing.
///
/// LifoSem allows multi-post and multi-tryWait, and provides a shutdown
/// state that awakens all waiters. LifoSem is faster than sem_t because
/// it performs exact wakeups, so it often requires fewer system calls.
/// It provides all of the functionality of sem_t except for timed waiting.
/// It is called LifoSem because its wakeup policy is approximately LIFO,
/// rather than the usual FIFO.
///
/// The core semaphore operations provided are:
///
/// -- post() -- if there is a pending waiter, wake it up, otherwise
/// increment the value of the semaphore. If the value of the semaphore
/// is already 2^32-1, does nothing. Compare to sem_post().
///
/// -- post(n) -- equivalent to n calls to post(), but much more efficient.
/// sem_t has no equivalent to this method.
///
/// -- bool tryWait() -- if the semaphore's value is positive, decrements it
/// and returns true, otherwise returns false. Compare to sem_trywait().
///
/// -- uint32_t tryWait(uint32_t n) -- attempts to decrement the semaphore's
/// value by n, returning the amount by which it actually was decremented
/// (a value from 0 to n inclusive). Not atomic. Equivalent to n calls
/// to tryWait(). sem_t has no equivalent to this method.
///
/// -- wait() -- waits until tryWait() can succeed. Compare to sem_wait().
///
/// LifoSem also has the notion of a shutdown state, in which any calls
/// that would block (or are already blocked) throw ShutdownSemError.
/// Note the difference between a call to wait() and a call to wait()
/// that might block. In the former case tryWait() would succeed, and no
/// isShutdown() check is performed. In the latter case an exception is
/// thrown. This behavior allows a LifoSem controlling work distribution
/// to drain. If you want to immediately stop all waiting on shutdown,
/// you can just check isShutdown() yourself (preferably wrapped in
/// an UNLIKELY). This fast-stop behavior is easy to add, but difficult
/// to remove if you want the draining behavior, which is why we have
/// chosen the latter. Since wait() is the only method that can block,
/// it is the only one that is affected by the shutdown state.
///
/// All LifoSem operations except valueGuess() are guaranteed
/// to be linearizable.
typedef LifoSemImpl<> LifoSem;
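The drain-on-shutdown contract described above can be sketched with a plain mutex+condvar stand-in rather than the lock-free implementation below (SimpleDrainSem is hypothetical, for illustration only): wait() keeps consuming any remaining value after shutdown() and only throws once it would actually block.

```cpp
#include <condition_variable>
#include <mutex>
#include <stdexcept>

// Hypothetical stand-in that mirrors LifoSem's shutdown semantics:
// post()ed value is still drained after shutdown(); wait() throws only
// when it would otherwise block.
class SimpleDrainSem {
  std::mutex m_;
  std::condition_variable cv_;
  unsigned value_ = 0;
  bool shutdown_ = false;
 public:
  void post() {
    std::lock_guard<std::mutex> g(m_);
    ++value_;
    cv_.notify_one();
  }
  void shutdown() {
    std::lock_guard<std::mutex> g(m_);
    shutdown_ = true;
    cv_.notify_all();
  }
  void wait() {
    std::unique_lock<std::mutex> g(m_);
    cv_.wait(g, [&] { return value_ > 0 || shutdown_; });
    if (value_ > 0) {
      --value_;  // a matching post() already happened: no exception
    } else {
      throw std::runtime_error("wait() would block but sem is shut down");
    }
  }
};
```

LifoSem implements the same contract lock-free, with ShutdownSemError in place of the runtime_error used here.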
/// The exception thrown when wait()ing on an isShutdown() LifoSem
struct ShutdownSemError : public std::runtime_error {
explicit ShutdownSemError(const std::string& msg);
virtual ~ShutdownSemError() noexcept;
};
namespace detail {
// Internally, a LifoSem is either a value or a linked list of wait nodes.
// This union is captured in the LifoSemHead type, which holds either a
// value or an indexed pointer to the list. LifoSemHead itself is a value
// type; the head is a mutable atomic box containing a LifoSemHead value.
// Each wait node corresponds to exactly one waiter. Values can flow
// through the semaphore either by going into and out of the head's value,
// or by direct communication from a poster to a waiter. The former path
// is taken when there are no pending waiters, the latter otherwise. The
// general flow of a post is to try to increment the value or pop-and-post
// a wait node. Either of those has the effect of conveying one semaphore
// unit. Waiting is the opposite, either a decrement of the value or
// push-and-wait of a wait node. The generic LifoSemBase abstracts the
// actual mechanism by which a wait node's post->wait communication is
// performed, which is why we have LifoSemRawNode and LifoSemNode.
/// LifoSemRawNode is the actual pooled storage that backs LifoSemNode
/// for user-specified Handoff types. This is done so that we can have
/// a large static IndexedMemPool of nodes, instead of per-type pools
template <template<typename> class Atom>
struct LifoSemRawNode {
std::aligned_storage<sizeof(void*),alignof(void*)>::type raw;
/// The IndexedMemPool index of the next node in this chain, or 0
/// if none. This will be set to uint32_t(-1) if the node is being
/// posted due to a shutdown-induced wakeup
uint32_t next;
bool isShutdownNotice() const { return next == uint32_t(-1); }
void clearShutdownNotice() { next = 0; }
void setShutdownNotice() { next = uint32_t(-1); }
typedef folly::IndexedMemPool<LifoSemRawNode<Atom>,32,200,Atom> Pool;
/// Storage for all of the waiter nodes for LifoSem-s that use Atom
static Pool pool;
};
/// Use this macro to declare the static storage that backs the raw nodes
/// for the specified atomic type
#define LIFOSEM_DECLARE_POOL(Atom, capacity) \
template<> \
folly::detail::LifoSemRawNode<Atom>::Pool \
folly::detail::LifoSemRawNode<Atom>::pool((capacity));
/// Handoff is a type not bigger than a void* that knows how to perform a
/// single post() -> wait() communication. It must have a post() method.
/// If it has a wait() method then LifoSemBase's wait() implementation
/// will work out of the box, otherwise you will need to specialize
/// LifoSemBase::wait accordingly.
template <typename Handoff, template<typename> class Atom>
struct LifoSemNode : public LifoSemRawNode<Atom> {
static_assert(sizeof(Handoff) <= sizeof(LifoSemRawNode<Atom>::raw),
"Handoff too big for small-object optimization, use indirection");
static_assert(alignof(Handoff) <=
alignof(decltype(LifoSemRawNode<Atom>::raw)),
"Handoff alignment constraint not satisfied");
template <typename ...Args>
void init(Args&&... args) {
new (&this->raw) Handoff(std::forward<Args>(args)...);
}
void destroy() {
handoff().~Handoff();
#ifndef NDEBUG
memset(&this->raw, 'F', sizeof(this->raw));
#endif
}
Handoff& handoff() {
return *static_cast<Handoff*>(static_cast<void*>(&this->raw));
}
const Handoff& handoff() const {
return *static_cast<const Handoff*>(static_cast<const void*>(&this->raw));
}
};
template <typename Handoff, template<typename> class Atom>
struct LifoSemNodeRecycler {
void operator()(LifoSemNode<Handoff,Atom>* elem) const {
elem->destroy();
auto idx = LifoSemRawNode<Atom>::pool.locateElem(elem);
LifoSemRawNode<Atom>::pool.recycleIndex(idx);
}
};
/// LifoSemHead is a 64-bit struct that holds a 32-bit value, some state
/// bits, and a sequence number used to avoid ABA problems in the lock-free
/// management of the LifoSem's wait lists. The value can either hold
/// an integral semaphore value (if there are no waiters) or a node index
/// (see IndexedMemPool) for the head of a list of wait nodes
class LifoSemHead {
// What we really want are bitfields:
// uint64_t data : 32; uint64_t isNodeIdx : 1; uint64_t seq : 31;
// Unfortunately g++ generates pretty bad code for this sometimes (I saw
// -O3 code from gcc 4.7.1 copying the bitfields one at a time instead of
// in bulk, for example). We can generate better code anyway by assuming
// that setters won't be given values that cause under/overflow, and
// putting the sequence at the end where its planned overflow doesn't
// need any masking.
//
// data == 0 (empty list) with isNodeIdx is conceptually the same
// as data == 0 (no unclaimed increments) with !isNodeIdx, we always
// convert the former into the latter to make the logic simpler.
enum {
IsNodeIdxShift = 32,
IsShutdownShift = 33,
SeqShift = 34,
};
enum : uint64_t {
IsNodeIdxMask = uint64_t(1) << IsNodeIdxShift,
IsShutdownMask = uint64_t(1) << IsShutdownShift,
SeqIncr = uint64_t(1) << SeqShift,
SeqMask = ~(SeqIncr - 1),
};
public:
uint64_t bits;
//////// getters
inline uint32_t idx() const {
assert(isNodeIdx());
assert(uint32_t(bits) != 0);
return uint32_t(bits);
}
inline uint32_t value() const {
assert(!isNodeIdx());
return uint32_t(bits);
}
inline constexpr bool isNodeIdx() const {
return (bits & IsNodeIdxMask) != 0;
}
inline constexpr bool isShutdown() const {
return (bits & IsShutdownMask) != 0;
}
inline constexpr uint32_t seq() const {
return uint32_t(bits >> SeqShift);
}
//////// setter-like things return a new struct
/// This should only be used for initial construction, not for setting
/// the value, because it clears the sequence number
static inline constexpr LifoSemHead fresh(uint32_t value) {
return LifoSemHead{ value };
}
/// Returns the LifoSemHead that results from popping a waiter node,
/// given the current waiter node's next ptr
inline LifoSemHead withPop(uint32_t idxNext) const {
assert(isNodeIdx());
if (idxNext == 0) {
// no isNodeIdx bit or data bits. Wraparound of seq bits is okay
return LifoSemHead{ (bits & (SeqMask | IsShutdownMask)) + SeqIncr };
} else {
// preserve sequence bits (incremented with wraparound okay) and
// isNodeIdx bit, replace all data bits
return LifoSemHead{
(bits & (SeqMask | IsShutdownMask | IsNodeIdxMask)) +
SeqIncr + idxNext };
}
}
/// Returns the LifoSemHead that results from pushing a new waiter node
inline LifoSemHead withPush(uint32_t idx) const {
assert(isNodeIdx() || value() == 0);
assert(!isShutdown());
assert(idx != 0);
return LifoSemHead{ (bits & SeqMask) | IsNodeIdxMask | idx };
}
/// Returns the LifoSemHead with value increased by delta, with
/// saturation if the maximum value is reached
inline LifoSemHead withValueIncr(uint32_t delta) const {
assert(!isNodeIdx());
auto rv = LifoSemHead{ bits + SeqIncr + delta };
if (UNLIKELY(rv.isNodeIdx())) {
// value has overflowed into the isNodeIdx bit
rv = LifoSemHead{ (rv.bits & ~IsNodeIdxMask) | (IsNodeIdxMask - 1) };
}
return rv;
}
/// Returns the LifoSemHead that results from decrementing the value
inline LifoSemHead withValueDecr(uint32_t delta) const {
assert(delta > 0 && delta <= value());
return LifoSemHead{ bits + SeqIncr - delta };
}
/// Returns the LifoSemHead with the same state as the current node,
/// but with the shutdown bit set
inline LifoSemHead withShutdown() const {
return LifoSemHead{ bits | IsShutdownMask };
}
inline constexpr bool operator== (const LifoSemHead& rhs) const {
return bits == rhs.bits;
}
inline constexpr bool operator!= (const LifoSemHead& rhs) const {
return !(*this == rhs);
}
};
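The 64-bit packing used by LifoSemHead can be sketched in isolation: the low 32 bits hold the value or node index, bits 32 and 33 are flags, and the top 30 bits are a sequence number whose planned overflow wraps harmlessly. PackedHead below is a hypothetical mirror of that layout, not the real class:

```cpp
#include <cstdint>

// Hypothetical mirror of LifoSemHead's bit layout: data in bits 0..31,
// isNodeIdx at bit 32, isShutdown at bit 33, sequence number above.
struct PackedHead {
  static constexpr uint64_t kIsNodeIdx = uint64_t(1) << 32;
  static constexpr uint64_t kIsShutdown = uint64_t(1) << 33;
  static constexpr uint64_t kSeqIncr = uint64_t(1) << 34;

  uint64_t bits;

  uint32_t data() const { return static_cast<uint32_t>(bits); }
  bool isNodeIdx() const { return (bits & kIsNodeIdx) != 0; }
  bool isShutdown() const { return (bits & kIsShutdown) != 0; }
  uint32_t seq() const { return static_cast<uint32_t>(bits >> 34); }

  // Adding delta also bumps the sequence number (ABA protection).
  // Unchecked, the data field could overflow into the isNodeIdx flag,
  // which is why the real withValueIncr() saturates.
  PackedHead withIncr(uint32_t delta) const {
    return PackedHead{bits + kSeqIncr + delta};
  }
};
```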
/// LifoSemBase is the engine for several different types of LIFO
/// semaphore. LifoSemBase handles storage of positive semaphore values
/// and wait nodes, but the actual waiting and notification mechanism is
/// up to the client.
///
/// The Handoff type is responsible for arranging one wakeup notification.
/// See LifoSemNode for more information on how to make your own.
template <typename Handoff,
template<typename> class Atom = std::atomic>
struct LifoSemBase : boost::noncopyable {
/// Constructor
explicit LifoSemBase(uint32_t initialValue = 0)
: head_(LifoSemHead::fresh(initialValue)) {}
/// Silently saturates if value is already 2^32-1
/// If there is a pending waiter, wakes it, otherwise increments the value
void post() {
auto idx = incrOrPop(1);
if (idx != 0) {
idxToNode(idx).handoff().post();
}
}
/// Equivalent to n calls to post(), except may be much more efficient.
/// At any point in time at which the semaphore's value would exceed
/// 2^32-1 if tracked with infinite precision, it may be silently
/// truncated to 2^32-1. This saturation is not guaranteed to be exact,
/// although it is guaranteed that overflow won't result in wrap-around.
/// There would be a substantial performance and complexity cost in
/// guaranteeing exact saturation (similar to the cost of maintaining
/// linearizability near the zero value, but without as much of
/// a benefit).
void post(uint32_t n) {
uint32_t idx;
while (n > 0 && (idx = incrOrPop(n)) != 0) {
// pop accounts for only 1
idxToNode(idx).handoff().post();
--n;
}
}
/// Returns true iff shutdown() has been called
bool isShutdown() const {
return UNLIKELY(head_.load(std::memory_order_acquire).isShutdown());
}
/// Prevents blocking on this semaphore, causing all blocking wait()
/// calls to throw ShutdownSemError. Both currently blocked wait() and
/// future calls to wait() for which tryWait() would return false will
/// cause an exception. Calls to wait() for which the matching post()
/// has already occurred will proceed normally.
void shutdown() {
// first set the shutdown bit
auto h = head_.load(std::memory_order_acquire);
while (!h.isShutdown()) {
if (head_.compare_exchange_strong(h, h.withShutdown())) {
// success
h = h.withShutdown();
break;
}
// compare_exchange_strong rereads h, retry
}
// now wake up any waiters
while (h.isNodeIdx()) {
auto& node = idxToNode(h.idx());
auto repl = h.withPop(node.next);
if (head_.compare_exchange_strong(h, repl)) {
// successful pop, wake up the waiter and move on. The next
// field is used to convey that this wakeup didn't consume a value
node.setShutdownNotice();
node.handoff().post();
h = repl;
}
}
}
/// Returns true iff value was decremented
bool tryWait() {
uint32_t n = 1;
auto rv = decrOrPush(n, 0);
assert((rv == WaitResult::DECR && n == 0) ||
(rv != WaitResult::DECR && n == 1));
// SHUTDOWN is okay here, since we don't actually wait
return rv == WaitResult::DECR;
}
/// Equivalent to (but may be much more efficient than) n calls to
/// tryWait(). Returns the total amount by which the semaphore's value
/// was decreased
uint32_t tryWait(uint32_t n) {
auto const orig = n;
while (n > 0) {
#ifndef NDEBUG
auto prev = n;
#endif
auto rv = decrOrPush(n, 0);
assert((rv == WaitResult::DECR && n < prev) ||
(rv != WaitResult::DECR && n == prev));
if (rv != WaitResult::DECR) {
break;
}
}
return orig - n;
}
/// Blocks the current thread until there is a matching post or the
/// semaphore is shut down. Throws ShutdownSemError if the semaphore
/// has been shut down and this method would otherwise be blocking.
/// Note that wait() doesn't throw during shutdown if tryWait() would
/// return true
void wait() {
// early check isn't required for correctness, but is an important
// perf win if we can avoid allocating and deallocating a node
if (tryWait()) {
return;
}
// allocateNode() won't compile unless Handoff has a default
// constructor
UniquePtr node = allocateNode();
auto rv = tryWaitOrPush(*node);
if (UNLIKELY(rv == WaitResult::SHUTDOWN)) {
assert(isShutdown());
throw ShutdownSemError("wait() would block but semaphore is shut down");
}
if (rv == WaitResult::PUSH) {
node->handoff().wait();
if (UNLIKELY(node->isShutdownNotice())) {
// this wait() didn't consume a value, it was triggered by shutdown
assert(isShutdown());
throw ShutdownSemError(
"blocking wait() interrupted by semaphore shutdown");
}
// node->handoff().wait() can't return until after the node has
// been popped and post()ed, so it is okay for the UniquePtr to
// recycle the node now
}
// else node wasn't pushed, so it is safe to recycle
}
/// Returns a guess at the current value, designed for debugging.
/// If there are no concurrent posters or waiters then this will
/// be correct
uint32_t valueGuess() const {
// this is actually linearizable, but we don't promise that because
// we may want to add striping in the future to help under heavy
// contention
auto h = head_.load(std::memory_order_acquire);
return h.isNodeIdx() ? 0 : h.value();
}
protected:
enum class WaitResult {
PUSH,
DECR,
SHUTDOWN,
};
/// The type of a std::unique_ptr that will automatically return a
/// LifoSemNode to the appropriate IndexedMemPool
typedef std::unique_ptr<LifoSemNode<Handoff, Atom>,
LifoSemNodeRecycler<Handoff, Atom>> UniquePtr;
/// Returns a node that can be passed to decrOrLink
template <typename... Args>
UniquePtr allocateNode(Args&&... args) {
auto idx = LifoSemRawNode<Atom>::pool.allocIndex();
if (idx != 0) {
auto& node = idxToNode(idx);
node.clearShutdownNotice();
try {
node.init(std::forward<Args>(args)...);
} catch (...) {
LifoSemRawNode<Atom>::pool.recycleIndex(idx);
throw;
}
return UniquePtr(&node);
} else {
return UniquePtr();
}
}
/// Returns DECR if the semaphore value was decremented (and waiterNode
/// was untouched), PUSH if a reference to the wait node was pushed,
/// or SHUTDOWN if decrement was not possible and push wasn't allowed
/// because isShutdown(). Ownership of the wait node remains the
/// responsibility of the caller, who must not release it until after
/// the node's Handoff has been posted.
WaitResult tryWaitOrPush(LifoSemNode<Handoff, Atom>& waiterNode) {
uint32_t n = 1;
return decrOrPush(n, nodeToIdx(waiterNode));
}
private:
folly::AtomicStruct<LifoSemHead,Atom> head_
FOLLY_ALIGN_TO_AVOID_FALSE_SHARING;
char padding_[folly::detail::CacheLocality::kFalseSharingRange -
sizeof(LifoSemHead)];
static LifoSemNode<Handoff, Atom>& idxToNode(uint32_t idx) {
auto raw = &LifoSemRawNode<Atom>::pool[idx];
return *static_cast<LifoSemNode<Handoff, Atom>*>(raw);
}
static uint32_t nodeToIdx(const LifoSemNode<Handoff, Atom>& node) {
return LifoSemRawNode<Atom>::pool.locateElem(&node);
}
/// Either increments by n and returns 0, or pops a node and returns it.
/// If n + the stripe's value overflows, then the stripe's value
/// saturates silently at 2^32-1
uint32_t incrOrPop(uint32_t n) {
while (true) {
assert(n > 0);
auto head = head_.load(std::memory_order_acquire);
if (head.isNodeIdx()) {
auto& node = idxToNode(head.idx());
if (head_.compare_exchange_strong(head, head.withPop(node.next))) {
// successful pop
return head.idx();
}
} else {
auto after = head.withValueIncr(n);
if (head_.compare_exchange_strong(head, after)) {
// successful incr
return 0;
}
}
// retry
}
}
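incrOrPop() follows the classic lock-free read-modify-CAS loop: load the head, compute a replacement, attempt the compare-exchange, and retry if another thread won the race. The skeleton of that loop, stripped of the node handling (saturatingIncr is a hypothetical example on a bare atomic counter, not folly code):

```cpp
#include <atomic>
#include <cstdint>

// Skeleton of the compare-exchange retry loop used by incrOrPop():
// load, compute a replacement, CAS, retry on contention. Here the
// replacement is just a saturating add (illustration only).
uint32_t saturatingIncr(std::atomic<uint32_t>& v, uint32_t n) {
  while (true) {
    uint32_t cur = v.load(std::memory_order_acquire);
    uint32_t next = (cur > UINT32_MAX - n) ? UINT32_MAX : cur + n;
    // On failure compare_exchange_strong reloads cur; we just retry.
    if (v.compare_exchange_strong(cur, next)) {
      return next;
    }
  }
}
```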
/// Returns DECR if some amount was decremented, with that amount
/// subtracted from n. If n is 1 and this function returns DECR then n
/// must be 0 afterward. Returns PUSH if no value could be decremented
/// and idx was pushed, or if idx was zero and no push was performed but
/// a push would have been performed with a valid node. Returns SHUTDOWN
/// if the caller should have blocked but isShutdown(). If idx == 0,
/// may return PUSH even after isShutdown() or may return SHUTDOWN
WaitResult decrOrPush(uint32_t& n, uint32_t idx) {
assert(n > 0);
while (true) {
auto head = head_.load(std::memory_order_acquire);
if (!head.isNodeIdx() && head.value() > 0) {
// decr
auto delta = std::min(n, head.value());
if (head_.compare_exchange_strong(head, head.withValueDecr(delta))) {
n -= delta;
return WaitResult::DECR;
}
} else {
// push
if (idx == 0) {
return WaitResult::PUSH;
}
if (UNLIKELY(head.isShutdown())) {
return WaitResult::SHUTDOWN;
}
auto& node = idxToNode(idx);
node.next = head.isNodeIdx() ? head.idx() : 0;
if (head_.compare_exchange_strong(head, head.withPush(idx))) {
// push succeeded
return WaitResult::PUSH;
}
}
// retry
}
}
};
} // namespace detail
template <template<typename> class Atom, class BatonType>
struct LifoSemImpl : public detail::LifoSemBase<BatonType, Atom> {
explicit LifoSemImpl(uint32_t v = 0)
: detail::LifoSemBase<BatonType, Atom>(v) {}
};
} // namespace folly
#endif
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "folly/LifoSem.h"
#include "folly/test/DeterministicSchedule.h"
#include <thread>
#include <semaphore.h>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "folly/Benchmark.h"
#include "folly/Random.h"
using namespace folly;
using namespace folly::test;
typedef LifoSemImpl<DeterministicAtomic> DLifoSem;
typedef DeterministicSchedule DSched;
LIFOSEM_DECLARE_POOL(DeterministicAtomic, 100000)
TEST(LifoSem, basic) {
LifoSem sem;
EXPECT_FALSE(sem.tryWait());
sem.post();
EXPECT_TRUE(sem.tryWait());
sem.post();
sem.wait();
}
TEST(LifoSem, multi) {
LifoSem sem;
const int opsPerThread = 10000;
std::thread threads[10];
std::atomic<int> blocks(0);
for (auto& thr : threads) {
thr = std::thread([&]{
int b = 0;
for (int i = 0; i < opsPerThread; ++i) {
if (!sem.tryWait()) {
sem.wait();
++b;
}
sem.post();
}
blocks += b;
});
}
// start the flood
sem.post();
for (auto& thr : threads) {
thr.join();
}
LOG(INFO) << opsPerThread * sizeof(threads)/sizeof(threads[0])
<< " post/wait pairs, " << blocks << " blocked";
}
TEST(LifoSem, pingpong) {
DSched sched(DSched::uniform(0));
const int iters = 100;
for (int pass = 0; pass < 10; ++pass) {
DLifoSem a;
DLifoSem b;
auto thr = DSched::thread([&]{
for (int i = 0; i < iters; ++i) {
a.wait();
// main thread can't be running here
EXPECT_EQ(a.valueGuess(), 0);
EXPECT_EQ(b.valueGuess(), 0);
b.post();
}
});
for (int i = 0; i < iters; ++i) {
a.post();
b.wait();
// child thread can't be running here
EXPECT_EQ(a.valueGuess(), 0);
EXPECT_EQ(b.valueGuess(), 0);
}
DSched::join(thr);
}
}
TEST(LifoSem, mutex) {
DSched sched(DSched::uniform(0));
const int iters = 100;
for (int pass = 0; pass < 10; ++pass) {
DLifoSem a;
auto thr = DSched::thread([&]{
for (int i = 0; i < iters; ++i) {
a.wait();
a.post();
}
});
for (int i = 0; i < iters; ++i) {
a.post();
a.wait();
}
a.post();
DSched::join(thr);
a.wait();
}
}
TEST(LifoSem, no_blocking) {
long seed = folly::randomNumberSeed() % 10000;
LOG(INFO) << "seed=" << seed;
DSched sched(DSched::uniform(seed));
const int iters = 100;
const int numThreads = 2;
const int width = 10;
for (int pass = 0; pass < 10; ++pass) {
DLifoSem a;
std::vector<std::thread> threads;
while (threads.size() < numThreads) {
threads.emplace_back(DSched::thread([&]{
for (int i = 0; i < iters; ++i) {
a.post(width);
for (int w = 0; w < width; ++w) {
a.wait();
}
}
}));
}
for (auto& thr : threads) {
DSched::join(thr);
}
}
}
TEST(LifoSem, one_way) {
long seed = folly::randomNumberSeed() % 10000;
LOG(INFO) << "seed=" << seed;
DSched sched(DSched::uniformSubset(seed, 1, 6));
const int iters = 1000;
for (int pass = 0; pass < 10; ++pass) {
DLifoSem a;
auto thr = DSched::thread([&]{
for (int i = 0; i < iters; ++i) {
a.wait();
}
});
for (int i = 0; i < iters; ++i) {
a.post();
}
DSched::join(thr);
}
}
TEST(LifoSem, shutdown_race) {
long seed = folly::randomNumberSeed() % 10000;
LOG(INFO) << "seed=" << seed;
bool shutdownWon = false;
bool shutdownLost = false;
for (int pass = 0; pass < 1000; ++pass) {
DSched sched(DSched::uniformSubset(seed + pass, 1, 1 + (pass % 50)));
DLifoSem a;
int waitCount = 0;
auto thr = DSched::thread([&]{
try {
while (true) {
a.wait();
++waitCount;
}
} catch (ShutdownSemError& x) {
// expected
EXPECT_TRUE(a.isShutdown());
}
});
EXPECT_TRUE(!a.isShutdown());
a.shutdown();
EXPECT_TRUE(a.isShutdown());
a.post();
DSched::join(thr);
EXPECT_EQ(1, waitCount + a.valueGuess());
if (waitCount == 0) {
shutdownWon = true;
} else {
shutdownLost = true;
}
}
EXPECT_TRUE(shutdownWon);
EXPECT_TRUE(shutdownLost);
}
TEST(LifoSem, shutdown_multi) {
DSched sched(DSched::uniform(0));
for (int pass = 0; pass < 10; ++pass) {
DLifoSem a;
std::vector<std::thread> threads;
while (threads.size() < 20) {
threads.push_back(DSched::thread([&]{
try {
a.wait();
EXPECT_TRUE(false);
} catch (ShutdownSemError& x) {
// expected
EXPECT_TRUE(a.isShutdown());
}
}));
}
a.shutdown();
for (auto& thr : threads) {
DSched::join(thr);
}
}
}
TEST(LifoSem, multi_try_wait_simple) {
LifoSem sem;
sem.post(5);
auto n = sem.tryWait(10); // this used to trigger an assert
ASSERT_EQ(5, n);
}
TEST(LifoSem, multi_try_wait) {
long seed = folly::randomNumberSeed() % 10000;
LOG(INFO) << "seed=" << seed;
DSched sched(DSched::uniform(seed));
DLifoSem sem;
const int NPOSTS = 1000;
auto producer = [&]{
for (int i=0; i<NPOSTS; ++i) {
sem.post();
}
};
std::atomic<bool> consumer_stop(false);
int consumed = 0;
auto consumer = [&]{
bool stop;
do {
stop = consumer_stop.load();
int n;
do {
n = sem.tryWait(10);
consumed += n;
} while (n > 0);
} while (!stop);
};
std::thread producer_thread(DSched::thread(producer));
std::thread consumer_thread(DSched::thread(consumer));
DSched::join(producer_thread);
consumer_stop.store(true);
DSched::join(consumer_thread);
ASSERT_EQ(NPOSTS, consumed);
}
BENCHMARK(lifo_sem_pingpong, iters) {
LifoSem a;
LifoSem b;
auto thr = std::thread([&]{
for (int i = 0; i < iters; ++i) {
a.wait();
b.post();
}
});
for (int i = 0; i < iters; ++i) {
a.post();
b.wait();
}
thr.join();
}
BENCHMARK(lifo_sem_oneway, iters) {
LifoSem a;
auto thr = std::thread([&]{
for (int i = 0; i < iters; ++i) {
a.wait();
}
});
for (int i = 0; i < iters; ++i) {
a.post();
}
thr.join();
}
BENCHMARK(single_thread_lifo_post, iters) {
LifoSem sem;
for (int n = 0; n < iters; ++n) {
sem.post();
asm volatile ("":::"memory");
}
}
BENCHMARK(single_thread_lifo_wait, iters) {
LifoSem sem(iters);
for (int n = 0; n < iters; ++n) {
sem.wait();
asm volatile ("":::"memory");
}
}
BENCHMARK(single_thread_lifo_postwait, iters) {
LifoSem sem;
for (int n = 0; n < iters; ++n) {
sem.post();
asm volatile ("":::"memory");
sem.wait();
asm volatile ("":::"memory");
}
}
BENCHMARK(single_thread_lifo_trywait, iters) {
LifoSem sem;
for (int n = 0; n < iters; ++n) {
EXPECT_FALSE(sem.tryWait());
asm volatile ("":::"memory");
}
}
BENCHMARK(single_thread_posix_postwait, iters) {
sem_t sem;
EXPECT_EQ(sem_init(&sem, 0, 0), 0);
for (int n = 0; n < iters; ++n) {
EXPECT_EQ(sem_post(&sem), 0);
EXPECT_EQ(sem_wait(&sem), 0);
}
EXPECT_EQ(sem_destroy(&sem), 0);
}
BENCHMARK(single_thread_posix_trywait, iters) {
sem_t sem;
EXPECT_EQ(sem_init(&sem, 0, 0), 0);
for (int n = 0; n < iters; ++n) {
EXPECT_EQ(sem_trywait(&sem), -1);
}
EXPECT_EQ(sem_destroy(&sem), 0);
}
static void contendedUse(uint n, int posters, int waiters) {
LifoSemImpl<std::atomic> sem;
std::vector<std::thread> threads;
std::atomic<bool> go(false);
BENCHMARK_SUSPEND {
for (int t = 0; t < waiters; ++t) {
threads.emplace_back([=,&sem] {
for (uint i = t; i < n; i += waiters) {
sem.wait();
}
});
}
for (int t = 0; t < posters; ++t) {
threads.emplace_back([=,&sem,&go] {
while (!go.load()) {
std::this_thread::yield();
}
for (uint i = t; i < n; i += posters) {
sem.post();
}
});
}
}
go.store(true);
for (auto& thr : threads) {
thr.join();
}
}
BENCHMARK_DRAW_LINE()
BENCHMARK_NAMED_PARAM(contendedUse, 1_to_1, 1, 1)
BENCHMARK_NAMED_PARAM(contendedUse, 1_to_32, 1, 32)
BENCHMARK_NAMED_PARAM(contendedUse, 32_to_1, 32, 1)
BENCHMARK_NAMED_PARAM(contendedUse, 16_to_16, 16, 16)
BENCHMARK_NAMED_PARAM(contendedUse, 32_to_32, 32, 32)
BENCHMARK_NAMED_PARAM(contendedUse, 32_to_1000, 32, 1000)
// sudo nice -n -20 tao/queues/LifoSemTests --benchmark --bm_min_iters=10000000
// ============================================================================
// tao/queues/LifoSemTests.cpp relative time/iter iters/s
// ============================================================================
// lifo_sem_pingpong 1.91us 522.92K
// lifo_sem_oneway 211.18ns 4.74M
// single_thread_lifo_post 19.71ns 50.75M
// single_thread_lifo_wait 18.84ns 53.09M
// single_thread_lifo_postwait 39.41ns 25.37M
// single_thread_lifo_trywait 912.10ps 1.10G
// single_thread_posix_postwait 32.93ns 30.37M
// single_thread_posix_trywait 10.06ns 99.36M
// ----------------------------------------------------------------------------
// contendedUse(1_to_1) 208.21ns 4.80M
// contendedUse(1_to_32) 532.41ns 1.88M
// contendedUse(32_to_1) 153.74ns 6.50M
// contendedUse(16_to_16) 301.86ns 3.31M
// contendedUse(32_to_32) 268.32ns 3.73M
// contendedUse(32_to_1000) 966.27ns 1.03M
// ============================================================================
int main(int argc, char ** argv) {
testing::InitGoogleTest(&argc, argv);
google::ParseCommandLineFlags(&argc, &argv, true);
int rv = RUN_ALL_TESTS();
folly::runBenchmarksOnFlag();
return rv;
}