Commit d43e710c authored by Phil Willoughby's avatar Phil Willoughby Committed by Facebook Github Bot 6

Convert a polling loop to a futex wait

Summary:Add a new method to MPMCQueue:
```
template <class Clock, typename... Args>
  bool tryWriteUntil(const std::chrono::time_point<Clock>& when,
                     Args&&... args) noexcept
```
This allows you to write producers which terminate reliably in the absence of consumers.

Returns `true` if `args` was enqueued, `false` otherwise.

`Clock` must be one of the types supported by the underlying call to `folly::detail::Futex::futexWaitUntil`; at time of writing these are `std::chrono::steady_clock` and `std::chrono::system_clock`.

Reviewed By: nbronson

Differential Revision: D2895574

fb-gh-sync-id: bdfabcd043191c149f1271e30ffc28476cc8a36e
shipit-source-id: bdfabcd043191c149f1271e30ffc28476cc8a36e
parent f509d73d
...@@ -284,6 +284,21 @@ class MPMCQueue : boost::noncopyable { ...@@ -284,6 +284,21 @@ class MPMCQueue : boost::noncopyable {
} }
} }
template <class Clock, typename... Args>
bool tryWriteUntil(const std::chrono::time_point<Clock>& when,
Args&&... args) noexcept {
uint64_t ticket;
if (tryObtainPromisedPushTicketUntil(ticket, when)) {
// we have pre-validated that the ticket won't block, or rather that
// it won't block longer than it takes another thread to dequeue an
// element from the slot it identifies.
enqueueWithTicket(ticket, std::forward<Args>(args)...);
return true;
} else {
return false;
}
}
/// If the queue is not full, enqueues and returns true, otherwise /// If the queue is not full, enqueues and returns true, otherwise
/// returns false. Unlike write this method can be blocked by another /// returns false. Unlike write this method can be blocked by another
/// thread, specifically a read that has linearized (been assigned /// thread, specifically a read that has linearized (been assigned
...@@ -471,6 +486,28 @@ class MPMCQueue : boost::noncopyable { ...@@ -471,6 +486,28 @@ class MPMCQueue : boost::noncopyable {
} }
} }
/// Tries until when to obtain a push ticket for which
/// SingleElementQueue::enqueue won't block. Returns true on success, false
/// on failure.
/// ticket is filled on success AND failure.
template <class Clock>
bool tryObtainPromisedPushTicketUntil(
uint64_t& ticket, const std::chrono::time_point<Clock>& when) noexcept {
bool deadlineReached = false;
while (!deadlineReached) {
if (tryObtainPromisedPushTicket(ticket)) {
return true;
}
// ticket is a blocking ticket until the preceding ticket has been
// processed: wait until this ticket's turn arrives. We have not reserved
// this ticket so we will have to re-attempt to get a non-blocking ticket
// if we wake up before we time-out.
deadlineReached = !slots_[idx(ticket)].tryWaitForEnqueueTurnUntil(
turn(ticket), pushSpinCutoff_, (ticket % kAdaptationFreq) == 0, when);
}
return false;
}
/// Tries to obtain a push ticket which can be satisfied if all /// Tries to obtain a push ticket which can be satisfied if all
/// in-progress pops complete. This function does not block, but /// in-progress pops complete. This function does not block, but
/// blocking may be required when using the returned ticket if some /// blocking may be required when using the returned ticket if some
...@@ -482,6 +519,7 @@ class MPMCQueue : boost::noncopyable { ...@@ -482,6 +519,7 @@ class MPMCQueue : boost::noncopyable {
auto numPops = popTicket_.load(std::memory_order_acquire); // B auto numPops = popTicket_.load(std::memory_order_acquire); // B
// n will be negative if pops are pending // n will be negative if pops are pending
int64_t n = numPushes - numPops; int64_t n = numPushes - numPops;
rv = numPushes;
if (n >= static_cast<ssize_t>(capacity_)) { if (n >= static_cast<ssize_t>(capacity_)) {
// Full, linearize at B. We don't need to recheck the read we // Full, linearize at B. We don't need to recheck the read we
// performed at A, because if numPushes was stale at B then the // performed at A, because if numPushes was stale at B then the
...@@ -489,7 +527,6 @@ class MPMCQueue : boost::noncopyable { ...@@ -489,7 +527,6 @@ class MPMCQueue : boost::noncopyable {
return false; return false;
} }
if (pushTicket_.compare_exchange_strong(numPushes, numPushes + 1)) { if (pushTicket_.compare_exchange_strong(numPushes, numPushes + 1)) {
rv = numPushes;
return true; return true;
} }
} }
...@@ -597,7 +634,7 @@ struct SingleElementQueue { ...@@ -597,7 +634,7 @@ struct SingleElementQueue {
template <typename = typename std::enable_if< template <typename = typename std::enable_if<
(folly::IsRelocatable<T>::value && (folly::IsRelocatable<T>::value &&
boost::has_nothrow_constructor<T>::value) || boost::has_nothrow_constructor<T>::value) ||
std::is_nothrow_constructible<T,T&&>::value>::type> std::is_nothrow_constructible<T, T&&>::value>::type>
void enqueue(const uint32_t turn, void enqueue(const uint32_t turn,
Atom<uint32_t>& spinCutoff, Atom<uint32_t>& spinCutoff,
const bool updateSpinCutoff, const bool updateSpinCutoff,
...@@ -611,6 +648,20 @@ struct SingleElementQueue { ...@@ -611,6 +648,20 @@ struct SingleElementQueue {
ImplByMove, ImplByRelocation>::type()); ImplByMove, ImplByRelocation>::type());
} }
/// Waits until either:
/// 1: the dequeue turn preceding the given enqueue turn has arrived
/// 2: the given deadline has arrived
/// Case 1 returns true, case 2 returns false.
template <class Clock>
bool tryWaitForEnqueueTurnUntil(
const uint32_t turn,
Atom<uint32_t>& spinCutoff,
const bool updateSpinCutoff,
const std::chrono::time_point<Clock>& when) noexcept {
return sequencer_.tryWaitForTurn(
turn * 2, spinCutoff, updateSpinCutoff, &when);
}
bool mayEnqueue(const uint32_t turn) const noexcept { bool mayEnqueue(const uint32_t turn) const noexcept {
return sequencer_.isTurn(turn * 2); return sequencer_.isTurn(turn * 2);
} }
......
...@@ -82,10 +82,10 @@ struct TurnSequencer { ...@@ -82,10 +82,10 @@ struct TurnSequencer {
/// See tryWaitForTurn /// See tryWaitForTurn
/// Requires that `turn` is not a turn in the past. /// Requires that `turn` is not a turn in the past.
void waitForTurn(const uint32_t turn, void waitForTurn(const uint32_t turn,
Atom<uint32_t>& spinCutoff, Atom<uint32_t>& spinCutoff,
const bool updateSpinCutoff) noexcept { const bool updateSpinCutoff) noexcept {
bool success = tryWaitForTurn(turn, spinCutoff, updateSpinCutoff); bool success = tryWaitForTurn(turn, spinCutoff, updateSpinCutoff);
(void) success; (void)success;
assert(success); assert(success);
} }
...@@ -99,9 +99,15 @@ struct TurnSequencer { ...@@ -99,9 +99,15 @@ struct TurnSequencer {
/// before blocking and will adjust spinCutoff based on the results, /// before blocking and will adjust spinCutoff based on the results,
/// otherwise it will spin for at most spinCutoff spins. /// otherwise it will spin for at most spinCutoff spins.
/// Returns true if the wait succeeded, false if the turn is in the past /// Returns true if the wait succeeded, false if the turn is in the past
/// or the absTime time value is not nullptr and is reached before the turn
/// arrives
template <class Clock = std::chrono::steady_clock,
class Duration = typename Clock::duration>
bool tryWaitForTurn(const uint32_t turn, bool tryWaitForTurn(const uint32_t turn,
Atom<uint32_t>& spinCutoff, Atom<uint32_t>& spinCutoff,
const bool updateSpinCutoff) noexcept { const bool updateSpinCutoff,
const std::chrono::time_point<Clock, Duration>* absTime =
nullptr) noexcept {
uint32_t prevThresh = spinCutoff.load(std::memory_order_relaxed); uint32_t prevThresh = spinCutoff.load(std::memory_order_relaxed);
const uint32_t effectiveSpinCutoff = const uint32_t effectiveSpinCutoff =
updateSpinCutoff || prevThresh == 0 ? kMaxSpins : prevThresh; updateSpinCutoff || prevThresh == 0 ? kMaxSpins : prevThresh;
...@@ -142,7 +148,15 @@ struct TurnSequencer { ...@@ -142,7 +148,15 @@ struct TurnSequencer {
continue; continue;
} }
} }
state_.futexWait(new_state, futexChannel(turn)); if (absTime) {
auto futexResult =
state_.futexWaitUntil(new_state, *absTime, futexChannel(turn));
if (futexResult == FutexResult::TIMEDOUT) {
return false;
}
} else {
state_.futexWait(new_state, futexChannel(turn));
}
} }
if (updateSpinCutoff || prevThresh == 0) { if (updateSpinCutoff || prevThresh == 0) {
...@@ -179,9 +193,9 @@ struct TurnSequencer { ...@@ -179,9 +193,9 @@ struct TurnSequencer {
while (true) { while (true) {
assert(state == encode(turn << kTurnShift, decodeMaxWaitersDelta(state))); assert(state == encode(turn << kTurnShift, decodeMaxWaitersDelta(state)));
uint32_t max_waiter_delta = decodeMaxWaitersDelta(state); uint32_t max_waiter_delta = decodeMaxWaitersDelta(state);
uint32_t new_state = encode( uint32_t new_state =
(turn + 1) << kTurnShift, encode((turn + 1) << kTurnShift,
max_waiter_delta == 0 ? 0 : max_waiter_delta - 1); max_waiter_delta == 0 ? 0 : max_waiter_delta - 1);
if (state_.compare_exchange_strong(state, new_state)) { if (state_.compare_exchange_strong(state, new_state)) {
if (max_waiter_delta != 0) { if (max_waiter_delta != 0) {
state_.futexWake(std::numeric_limits<int>::max(), state_.futexWake(std::numeric_limits<int>::max(),
...@@ -227,9 +241,7 @@ struct TurnSequencer { ...@@ -227,9 +241,7 @@ struct TurnSequencer {
/// Returns the bitmask to pass futexWait or futexWake when communicating /// Returns the bitmask to pass futexWait or futexWake when communicating
/// about the specified turn /// about the specified turn
int futexChannel(uint32_t turn) const noexcept { int futexChannel(uint32_t turn) const noexcept { return 1 << (turn & 31); }
return 1 << (turn & 31);
}
uint32_t decodeCurrentSturn(uint32_t state) const noexcept { uint32_t decodeCurrentSturn(uint32_t state) const noexcept {
return state & ~kWaitersMask; return state & ~kWaitersMask;
...@@ -240,7 +252,7 @@ struct TurnSequencer { ...@@ -240,7 +252,7 @@ struct TurnSequencer {
} }
uint32_t encode(uint32_t currentSturn, uint32_t maxWaiterD) const noexcept { uint32_t encode(uint32_t currentSturn, uint32_t maxWaiterD) const noexcept {
return currentSturn | std::min(uint32_t{ kWaitersMask }, maxWaiterD); return currentSturn | std::min(uint32_t{kWaitersMask}, maxWaiterD);
} }
}; };
......
...@@ -35,6 +35,14 @@ FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr); ...@@ -35,6 +35,14 @@ FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);
using namespace folly; using namespace folly;
using namespace detail; using namespace detail;
using namespace test; using namespace test;
using std::chrono::time_point;
using std::chrono::steady_clock;
using std::chrono::seconds;
using std::chrono::milliseconds;
using std::string;
using std::make_unique;
using std::unique_ptr;
using std::vector;
typedef DeterministicSchedule DSched; typedef DeterministicSchedule DSched;
...@@ -61,7 +69,7 @@ void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) { ...@@ -61,7 +69,7 @@ void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
Atom<uint32_t> spinThreshold(0); Atom<uint32_t> spinThreshold(0);
int prev = -1; int prev = -1;
std::vector<std::thread> threads(numThreads); vector<std::thread> threads(numThreads);
for (int i = 0; i < numThreads; ++i) { for (int i = 0; i < numThreads; ++i) {
threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread<Atom>, threads[i] = DSched::thread(std::bind(run_mt_sequencer_thread<Atom>,
numThreads, numOps, init, std::ref(seq), std::ref(spinThreshold), numThreads, numOps, init, std::ref(seq), std::ref(spinThreshold),
...@@ -102,6 +110,12 @@ void runElementTypeTest(T&& src) { ...@@ -102,6 +110,12 @@ void runElementTypeTest(T&& src) {
cq.blockingRead(dest); cq.blockingRead(dest);
EXPECT_TRUE(cq.write(std::move(dest))); EXPECT_TRUE(cq.write(std::move(dest)));
EXPECT_TRUE(cq.read(dest)); EXPECT_TRUE(cq.read(dest));
auto soon1 = std::chrono::system_clock::now() + std::chrono::seconds(1);
EXPECT_TRUE(cq.tryWriteUntil(soon1, std::move(dest)));
EXPECT_TRUE(cq.read(dest));
auto soon2 = std::chrono::steady_clock::now() + std::chrono::seconds(1);
EXPECT_TRUE(cq.tryWriteUntil(soon2, std::move(dest)));
EXPECT_TRUE(cq.read(dest));
} }
struct RefCounted { struct RefCounted {
...@@ -132,9 +146,9 @@ void intrusive_ptr_release(RefCounted const* p) { ...@@ -132,9 +146,9 @@ void intrusive_ptr_release(RefCounted const* p) {
TEST(MPMCQueue, lots_of_element_types) { TEST(MPMCQueue, lots_of_element_types) {
runElementTypeTest(10); runElementTypeTest(10);
runElementTypeTest(std::string("abc")); runElementTypeTest(string("abc"));
runElementTypeTest(std::make_pair(10, std::string("def"))); runElementTypeTest(std::make_pair(10, string("def")));
runElementTypeTest(std::vector<std::string>{ { "abc" } }); runElementTypeTest(vector<string>{{"abc"}});
runElementTypeTest(std::make_shared<char>('a')); runElementTypeTest(std::make_shared<char>('a'));
runElementTypeTest(folly::make_unique<char>('a')); runElementTypeTest(folly::make_unique<char>('a'));
runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted)); runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
...@@ -237,7 +251,7 @@ void runTryEnqDeqTest(int numThreads, int numOps) { ...@@ -237,7 +251,7 @@ void runTryEnqDeqTest(int numThreads, int numOps) {
MPMCQueue<int,Atom> cq(numThreads); MPMCQueue<int,Atom> cq(numThreads);
uint64_t n = numOps; uint64_t n = numOps;
std::vector<std::thread> threads(numThreads); vector<std::thread> threads(numThreads);
std::atomic<uint64_t> sum(0); std::atomic<uint64_t> sum(0);
for (int t = 0; t < numThreads; ++t) { for (int t = 0; t < numThreads; ++t) {
threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom>, threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom>,
...@@ -294,9 +308,59 @@ uint64_t nowMicro() { ...@@ -294,9 +308,59 @@ uint64_t nowMicro() {
} }
template <typename Q> template <typename Q>
std::string producerConsumerBench(Q&& queue, std::string qName, struct WriteMethodCaller {
int numProducers, int numConsumers, WriteMethodCaller() {}
int numOps, bool ignoreContents = false) { virtual ~WriteMethodCaller() = default;
virtual bool callWrite(Q& q, int i) = 0;
virtual string methodName() = 0;
};
template <typename Q>
struct BlockingWriteCaller : public WriteMethodCaller<Q> {
bool callWrite(Q& q, int i) override {
q.blockingWrite(i);
return true;
}
string methodName() override { return "blockingWrite"; }
};
template <typename Q>
struct WriteIfNotFullCaller : public WriteMethodCaller<Q> {
bool callWrite(Q& q, int i) override { return q.writeIfNotFull(i); }
string methodName() override { return "writeIfNotFull"; }
};
template <typename Q>
struct WriteCaller : public WriteMethodCaller<Q> {
bool callWrite(Q& q, int i) override { return q.write(i); }
string methodName() override { return "write"; }
};
template <typename Q,
class Clock = steady_clock,
class Duration = typename Clock::duration>
struct TryWriteUntilCaller : public WriteMethodCaller<Q> {
const Duration duration_;
explicit TryWriteUntilCaller(Duration&& duration) : duration_(duration) {}
bool callWrite(Q& q, int i) override {
auto then = Clock::now() + duration_;
return q.tryWriteUntil(then, i);
}
string methodName() override {
return folly::sformat(
"tryWriteUntil({}ms)",
std::chrono::duration_cast<milliseconds>(duration_).count());
}
};
template <typename Q>
string producerConsumerBench(Q&& queue,
string qName,
int numProducers,
int numConsumers,
int numOps,
WriteMethodCaller<Q>& writer,
bool ignoreContents = false) {
Q& q = queue; Q& q = queue;
struct rusage beginUsage; struct rusage beginUsage;
...@@ -306,17 +370,20 @@ std::string producerConsumerBench(Q&& queue, std::string qName, ...@@ -306,17 +370,20 @@ std::string producerConsumerBench(Q&& queue, std::string qName,
uint64_t n = numOps; uint64_t n = numOps;
std::atomic<uint64_t> sum(0); std::atomic<uint64_t> sum(0);
std::atomic<uint64_t> failed(0);
std::vector<std::thread> producers(numProducers); vector<std::thread> producers(numProducers);
for (int t = 0; t < numProducers; ++t) { for (int t = 0; t < numProducers; ++t) {
producers[t] = DSched::thread([&,t]{ producers[t] = DSched::thread([&,t]{
for (int i = t; i < numOps; i += numProducers) { for (int i = t; i < numOps; i += numProducers) {
q.blockingWrite(i); while (!writer.callWrite(q, i)) {
++failed;
}
} }
}); });
} }
std::vector<std::thread> consumers(numConsumers); vector<std::thread> consumers(numConsumers);
for (int t = 0; t < numConsumers; ++t) { for (int t = 0; t < numConsumers; ++t) {
consumers[t] = DSched::thread([&,t]{ consumers[t] = DSched::thread([&,t]{
uint64_t localSum = 0; uint64_t localSum = 0;
...@@ -348,27 +415,76 @@ std::string producerConsumerBench(Q&& queue, std::string qName, ...@@ -348,27 +415,76 @@ std::string producerConsumerBench(Q&& queue, std::string qName,
uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n; uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw - long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
(beginUsage.ru_nvcsw + beginUsage.ru_nivcsw); (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
uint64_t failures = failed;
return folly::format( return folly::sformat(
"{}, {} producers, {} consumers => {} nanos/handoff, {} csw / {} handoff", "{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} "
qName, numProducers, numConsumers, nanosPer, csw, n).str(); "handoff, {} failures",
qName,
numProducers,
writer.methodName(),
numConsumers,
nanosPer,
csw,
n,
failures);
} }
TEST(MPMCQueue, mt_prod_cons_deterministic) { TEST(MPMCQueue, mt_prod_cons_deterministic) {
// we use the Bench method, but perf results are meaningless under DSched // we use the Bench method, but perf results are meaningless under DSched
DSched sched(DSched::uniform(0)); DSched sched(DSched::uniform(0));
producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(10), vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic>>>>
"", 1, 1, 1000); callers;
producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(100), callers.emplace_back(
"", 10, 10, 1000); make_unique<BlockingWriteCaller<MPMCQueue<int, DeterministicAtomic>>>());
producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(10), callers.emplace_back(
"", 1, 1, 1000); make_unique<WriteIfNotFullCaller<MPMCQueue<int, DeterministicAtomic>>>());
producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(100), callers.emplace_back(
"", 10, 10, 1000); make_unique<WriteCaller<MPMCQueue<int, DeterministicAtomic>>>());
producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(1), callers.emplace_back(
"", 10, 10, 1000); make_unique<TryWriteUntilCaller<MPMCQueue<int, DeterministicAtomic>>>(
milliseconds(1)));
callers.emplace_back(
make_unique<TryWriteUntilCaller<MPMCQueue<int, DeterministicAtomic>>>(
seconds(2)));
for (const auto& caller : callers) {
LOG(INFO)
<< producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(10),
"MPMCQueue<int, DeterministicAtomic>(10)",
1,
1,
1000,
*caller);
LOG(INFO)
<< producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(100),
"MPMCQueue<int, DeterministicAtomic>(100)",
10,
10,
1000,
*caller);
LOG(INFO)
<< producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(10),
"MPMCQueue<int, DeterministicAtomic>(10)",
1,
1,
1000,
*caller);
LOG(INFO)
<< producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(100),
"MPMCQueue<int, DeterministicAtomic>(100)",
10,
10,
1000,
*caller);
LOG(INFO) << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(1),
"MPMCQueue<int, DeterministicAtomic>(1)",
10,
10,
1000,
*caller);
}
} }
#define PC_BENCH(q, np, nc, ...) \ #define PC_BENCH(q, np, nc, ...) \
...@@ -376,38 +492,71 @@ TEST(MPMCQueue, mt_prod_cons_deterministic) { ...@@ -376,38 +492,71 @@ TEST(MPMCQueue, mt_prod_cons_deterministic) {
TEST(MPMCQueue, mt_prod_cons) { TEST(MPMCQueue, mt_prod_cons) {
int n = 100000; int n = 100000;
LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 1, n); vector<unique_ptr<WriteMethodCaller<MPMCQueue<int>>>> callers;
LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 1, n); callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int>>>());
LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 10, n); callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int>>>());
LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 10, n); callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int>>>());
LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 1, n); callers.emplace_back(
LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 1, n); make_unique<TryWriteUntilCaller<MPMCQueue<int>>>(milliseconds(1)));
LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 10, n); callers.emplace_back(
LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 10, n); make_unique<TryWriteUntilCaller<MPMCQueue<int>>>(seconds(2)));
LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n); for (const auto& caller : callers) {
LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 1, n, *caller);
LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 1, n, *caller);
LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 10, n, *caller);
LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 10, n, *caller);
LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 1, n, *caller);
LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 1, n, *caller);
LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 10, n, *caller);
LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 10, n, *caller);
LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n, *caller);
}
} }
TEST(MPMCQueue, mt_prod_cons_emulated_futex) { TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
int n = 100000; int n = 100000;
LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 1, 1, n); vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, EmulatedFutexAtomic>>>>
LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 10, 1, n); callers;
LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 1, 10, n); callers.emplace_back(
LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 10, 10, n); make_unique<BlockingWriteCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 1, 1, n); callers.emplace_back(
LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 10, 1, n); make_unique<WriteIfNotFullCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 1, 10, n); callers.emplace_back(
LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 10, 10, n); make_unique<WriteCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
LOG(INFO) callers.emplace_back(
<< PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(100000)), 32, 100, n); make_unique<TryWriteUntilCaller<MPMCQueue<int, EmulatedFutexAtomic>>>(
} milliseconds(1)));
callers.emplace_back(
template <template<typename> class Atom> make_unique<TryWriteUntilCaller<MPMCQueue<int, EmulatedFutexAtomic>>>(
void runNeverFailThread( seconds(2)));
int numThreads, for (const auto& caller : callers) {
int n, /*numOps*/ LOG(INFO) << PC_BENCH(
MPMCQueue<int, Atom>& cq, (MPMCQueue<int, EmulatedFutexAtomic>(10)), 1, 1, n, *caller);
std::atomic<uint64_t>& sum, LOG(INFO) << PC_BENCH(
int t) { (MPMCQueue<int, EmulatedFutexAtomic>(10)), 10, 1, n, *caller);
LOG(INFO) << PC_BENCH(
(MPMCQueue<int, EmulatedFutexAtomic>(10)), 1, 10, n, *caller);
LOG(INFO) << PC_BENCH(
(MPMCQueue<int, EmulatedFutexAtomic>(10)), 10, 10, n, *caller);
LOG(INFO) << PC_BENCH(
(MPMCQueue<int, EmulatedFutexAtomic>(10000)), 1, 1, n, *caller);
LOG(INFO) << PC_BENCH(
(MPMCQueue<int, EmulatedFutexAtomic>(10000)), 10, 1, n, *caller);
LOG(INFO) << PC_BENCH(
(MPMCQueue<int, EmulatedFutexAtomic>(10000)), 1, 10, n, *caller);
LOG(INFO) << PC_BENCH(
(MPMCQueue<int, EmulatedFutexAtomic>(10000)), 10, 10, n, *caller);
LOG(INFO) << PC_BENCH(
(MPMCQueue<int, EmulatedFutexAtomic>(100000)), 32, 100, n, *caller);
}
}
template <template <typename> class Atom>
void runNeverFailThread(int numThreads,
int n, /*numOps*/
MPMCQueue<int, Atom>& cq,
std::atomic<uint64_t>& sum,
int t) {
uint64_t threadSum = 0; uint64_t threadSum = 0;
for (int i = t; i < n; i += numThreads) { for (int i = t; i < n; i += numThreads) {
// enq + deq // enq + deq
...@@ -421,19 +570,23 @@ void runNeverFailThread( ...@@ -421,19 +570,23 @@ void runNeverFailThread(
sum += threadSum; sum += threadSum;
} }
template <template<typename> class Atom> template <template <typename> class Atom>
uint64_t runNeverFailTest(int numThreads, int numOps) { uint64_t runNeverFailTest(int numThreads, int numOps) {
// always #enq >= #deq // always #enq >= #deq
MPMCQueue<int,Atom> cq(numThreads); MPMCQueue<int, Atom> cq(numThreads);
uint64_t n = numOps; uint64_t n = numOps;
auto beginMicro = nowMicro(); auto beginMicro = nowMicro();
std::vector<std::thread> threads(numThreads); vector<std::thread> threads(numThreads);
std::atomic<uint64_t> sum(0); std::atomic<uint64_t> sum(0);
for (int t = 0; t < numThreads; ++t) { for (int t = 0; t < numThreads; ++t) {
threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom>, threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom>,
numThreads, n, std::ref(cq), std::ref(sum), t)); numThreads,
n,
std::ref(cq),
std::ref(sum),
t));
} }
for (auto& t : threads) { for (auto& t : threads) {
DSched::join(t); DSched::join(t);
...@@ -445,29 +598,29 @@ uint64_t runNeverFailTest(int numThreads, int numOps) { ...@@ -445,29 +598,29 @@ uint64_t runNeverFailTest(int numThreads, int numOps) {
} }
TEST(MPMCQueue, mt_never_fail) { TEST(MPMCQueue, mt_never_fail) {
int nts[] = { 1, 3, 100 }; int nts[] = {1, 3, 100};
int n = 100000; int n = 100000;
for (int nt : nts) { for (int nt : nts) {
uint64_t elapsed = runNeverFailTest<std::atomic>(nt, n); uint64_t elapsed = runNeverFailTest<std::atomic>(nt, n);
LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
<< nt << " threads"; << " threads";
} }
} }
TEST(MPMCQueue, mt_never_fail_emulated_futex) { TEST(MPMCQueue, mt_never_fail_emulated_futex) {
int nts[] = { 1, 3, 100 }; int nts[] = {1, 3, 100};
int n = 100000; int n = 100000;
for (int nt : nts) { for (int nt : nts) {
uint64_t elapsed = runNeverFailTest<EmulatedFutexAtomic>(nt, n); uint64_t elapsed = runNeverFailTest<EmulatedFutexAtomic>(nt, n);
LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
<< nt << " threads"; << " threads";
} }
} }
TEST(MPMCQueue, mt_never_fail_deterministic) { TEST(MPMCQueue, mt_never_fail_deterministic) {
int nts[] = { 3, 10 }; int nts[] = {3, 10};
long seed = 0; // nowMicro() % 10000; long seed = 0; // nowMicro() % 10000;
LOG(INFO) << "using seed " << seed; LOG(INFO) << "using seed " << seed;
...@@ -485,6 +638,77 @@ TEST(MPMCQueue, mt_never_fail_deterministic) { ...@@ -485,6 +638,77 @@ TEST(MPMCQueue, mt_never_fail_deterministic) {
} }
} }
template <class Clock, template <typename> class Atom>
void runNeverFailUntilThread(int numThreads,
int n, /*numOps*/
MPMCQueue<int, Atom>& cq,
std::atomic<uint64_t>& sum,
int t) {
uint64_t threadSum = 0;
for (int i = t; i < n; i += numThreads) {
// enq + deq
auto soon = Clock::now() + std::chrono::seconds(1);
EXPECT_TRUE(cq.tryWriteUntil(soon, i));
int dest = -1;
EXPECT_TRUE(cq.readIfNotEmpty(dest));
EXPECT_TRUE(dest >= 0);
threadSum += dest;
}
sum += threadSum;
}
template <class Clock, template <typename> class Atom>
uint64_t runNeverFailTest(int numThreads, int numOps) {
// always #enq >= #deq
MPMCQueue<int, Atom> cq(numThreads);
uint64_t n = numOps;
auto beginMicro = nowMicro();
vector<std::thread> threads(numThreads);
std::atomic<uint64_t> sum(0);
for (int t = 0; t < numThreads; ++t) {
threads[t] = DSched::thread(std::bind(runNeverFailUntilThread<Clock, Atom>,
numThreads,
n,
std::ref(cq),
std::ref(sum),
t));
}
for (auto& t : threads) {
DSched::join(t);
}
EXPECT_TRUE(cq.isEmpty());
EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
return nowMicro() - beginMicro;
}
TEST(MPMCQueue, mt_never_fail_until_system) {
int nts[] = {1, 3, 100};
int n = 100000;
for (int nt : nts) {
uint64_t elapsed =
runNeverFailTest<std::chrono::system_clock, std::atomic>(nt, n);
LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
<< " threads";
}
}
TEST(MPMCQueue, mt_never_fail_until_steady) {
int nts[] = {1, 3, 100};
int n = 100000;
for (int nt : nts) {
uint64_t elapsed =
runNeverFailTest<std::chrono::steady_clock, std::atomic>(nt, n);
LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
<< " threads";
}
}
enum LifecycleEvent { enum LifecycleEvent {
NOTHING = -1, NOTHING = -1,
DEFAULT_CONSTRUCTOR, DEFAULT_CONSTRUCTOR,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment