Commit c2cf8102 authored by Miroslav Crnic's avatar Miroslav Crnic Committed by Facebook Github Bot

NotificationQueueBenchmark multi consumer benchmarks

Summary:
NotificationQueue, although mostly not used as such, is a multi consumer queue and it's missing benchmarks for that part.

There where also a couple of issues with original benchmark which this diff addresses:
1. Benchamrks used to do N * number of producers number of iterations and didn't report increased iteration count back to benchmark
2. Benchmark would not run the same case during whole benchmark time as producers where stopped as soon as they produced enough request and consumer would finish without contention

Since these are very tight loops I added a busy loop param which can be varied the same way as batching parameter.
This param adds some work in a form of a busy loop in both producers and consumer to make benchmarking cases, where producers/consumers are actually doing something except from posting to the queue, easier.

Reviewed By: yfeldblum

Differential Revision: D16071580

fbshipit-source-id: c75f2dd9ae8617be09834cdf153a4b4ae06aaaca
parent 8bb0525b
......@@ -14,17 +14,19 @@
* limitations under the License.
*/
#include <algorithm>
#include <thread>
#include <folly/Benchmark.h>
#include <folly/io/async/EventBase.h>
#include <folly/io/async/NotificationQueue.h>
#include <folly/synchronization/Baton.h>
#include <condition_variable>
#include <mutex>
#include <thread>
using namespace folly;
static size_t constexpr kMaxRead = 20;
static size_t constexpr kProducerWarmup = 1000;
static size_t constexpr kBusyLoopSize = 0;
class MockConsumer : public NotificationQueue<Func>::Consumer {
public:
......@@ -33,53 +35,86 @@ class MockConsumer : public NotificationQueue<Func>::Consumer {
}
};
void runTest(int iters, int numThreads) {
BenchmarkSuspender susp;
EventBase evb;
evb.setMaxReadAtOnce(kMaxRead);
std::mutex m;
std::condition_variable cv;
int numRunning = 0;
int numProcessed = 0;
int numTotal = iters * numThreads;
std::vector<std::thread> threads;
for (int i = 0; i < numThreads; i++) {
threads.push_back(std::thread([&]() mutable {
// wait for all the threads to start up
bool notifyAll = false;
{
std::lock_guard<std::mutex> lk(m);
if (++numRunning == numThreads) {
notifyAll = true;
susp.dismiss();
}
static void burn(size_t n) {
for (size_t i = 0; i < n; ++i) {
folly::doNotOptimizeAway(i);
}
}
if (notifyAll) {
cv.notify_all();
} else {
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, [&]() { return numRunning == numThreads; });
void multiProducerMultiConsumer(
int iters,
size_t numProducers,
size_t numConsumers) {
BenchmarkSuspender susp;
NotificationQueue<Func> queue;
std::vector<std::unique_ptr<EventBase>> consumerEventBases;
std::vector<std::thread> consumerThreads;
// Initialize consumers
for (size_t i = 0; i < numConsumers; ++i) {
// construct base without time measurements
consumerEventBases.emplace_back(std::make_unique<EventBase>(false));
EventBase& base = *consumerEventBases.back();
consumerThreads.emplace_back([&base, &queue]() mutable {
base.setMaxReadAtOnce(kMaxRead);
MockConsumer consumer;
consumer.startConsuming(&base, &queue);
base.loopForever();
});
}
for (auto j = 0; j < iters; j++) {
evb.runInEventBaseThread([&]() mutable {
if (++numProcessed == numTotal) {
evb.terminateLoopSoon();
;
std::vector<std::thread> producerThreads;
std::atomic<size_t> producersWarmedUp{0};
// Needs to be less than 0 so that consumers don't reach 0 during warm up
std::atomic<ssize_t> itemsToProcess{-1};
std::atomic<bool> stop_producing{false};
Baton warmUpBaton;
Baton finishedBaton;
// Initialize producers and warm up both producers and consumers
// during warm up producers produce kProducerWarmup tasks each and consumers
// try to consume as much as possible
for (size_t i = 0; i < numProducers; ++i) {
producerThreads.emplace_back(std::thread([numProducers,
&warmUpBaton,
&queue,
&producersWarmedUp,
&itemsToProcess,
&stop_producing,
&finishedBaton]() mutable {
size_t num_produced{0};
while (!stop_producing.load(std::memory_order_relaxed)) {
burn(kBusyLoopSize);
if (num_produced++ == kProducerWarmup &&
numProducers ==
producersWarmedUp.fetch_add(1, std::memory_order_relaxed) + 1) {
warmUpBaton.post();
}
queue.putMessage([&itemsToProcess, &finishedBaton]() {
burn(kBusyLoopSize);
if (itemsToProcess.fetch_sub(1, std::memory_order_relaxed) == 0) {
finishedBaton.post();
}
});
}
}));
}
evb.loopForever();
warmUpBaton.wait();
susp.dismiss();
// This sets itemsToProcess to desired iterations. Consumers reduce it with
// every task. One which reduces it to 0 notifies via finishedBaton.
itemsToProcess.store(iters, std::memory_order_relaxed);
finishedBaton.wait();
susp.rehire();
for (auto& t : threads) {
t.join();
// Stop producers
stop_producing.store(true, std::memory_order_relaxed);
for (auto& producerThread : producerThreads) {
producerThread.join();
}
// Stop consumers
for (auto& consumerEventBase : consumerEventBases) {
consumerEventBase->terminateLoopSoon();
}
for (auto& consumerThread : consumerThreads) {
consumerThread.join();
}
}
......@@ -114,12 +149,49 @@ BENCHMARK(DequeueBenchmark, n) {
consumer.stopConsuming();
}
BENCHMARK_PARAM(runTest, 1)
BENCHMARK_PARAM(runTest, 2)
BENCHMARK_PARAM(runTest, 4)
BENCHMARK_PARAM(runTest, 8)
BENCHMARK_PARAM(runTest, 16)
BENCHMARK_PARAM(runTest, 32)
BENCHMARK_DRAW_LINE();
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, _1p__1c, 1, 1)
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, _2p__1c, 2, 1)
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, _4p__1c, 4, 1)
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, _8p__1c, 8, 1)
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, 16p__1c, 16, 1)
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, 32p__1c, 32, 1)
BENCHMARK_DRAW_LINE();
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, _1p__2c, 1, 2)
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, _2p__2c, 2, 2)
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, _4p__2c, 4, 2)
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, _8p__2c, 8, 2)
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, 16p__2c, 16, 2)
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, 32p__2c, 32, 2)
BENCHMARK_DRAW_LINE();
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, _1p__4c, 1, 4)
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, _2p__4c, 2, 4)
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, _4p__4c, 4, 4)
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, _8p__4c, 8, 4)
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, 16p__4c, 16, 4)
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, 32p__4c, 32, 4)
BENCHMARK_DRAW_LINE();
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, _1p__8c, 1, 8)
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, _2p__8c, 2, 8)
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, _4p__8c, 4, 8)
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, _8p__8c, 8, 8)
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, 16p__8c, 16, 8)
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, 32p__8c, 32, 8)
BENCHMARK_DRAW_LINE();
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, _1p_16c, 1, 16)
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, _2p_16c, 2, 16)
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, _4p_16c, 4, 16)
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, _8p_16c, 8, 16)
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, 16p_16c, 16, 16)
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, 32p_16c, 32, 16)
BENCHMARK_DRAW_LINE();
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, _1p_32c, 1, 32)
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, _2p_32c, 2, 32)
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, _4p_32c, 4, 32)
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, _8p_32c, 8, 32)
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, 16p_32c, 16, 32)
BENCHMARK_NAMED_PARAM(multiProducerMultiConsumer, 32p_32c, 32, 32)
BENCHMARK_DRAW_LINE();
int main(int argc, char* argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment