Commit 3dbba689 authored by gabime's avatar gabime

replaced the lockfree queue with bounded, locked queue

parent b1a58cd3
......@@ -7,8 +7,7 @@
// bench.cpp : spdlog benchmarks
//
#include "spdlog/async_logger.h"
#include "spdlog/sinks/file_sinks.h"
#include "spdlog/sinks/null_sink.h"
#include "spdlog/sinks/test_sink.h"
#include "spdlog/spdlog.h"
#include "utils.h"
#include <atomic>
......@@ -30,12 +29,10 @@ void bench_mt(int howmany, std::shared_ptr<spdlog::logger> log, int thread_count
int main(int argc, char *argv[])
{
int queue_size = 1048576;
int queue_size = 1024*1024;
int howmany = 1000000;
int threads = 10;
int file_size = 30 * 1024 * 1024;
int rotating_files = 5;
try
{
......@@ -45,7 +42,7 @@ int main(int argc, char *argv[])
threads = atoi(argv[2]);
if (argc > 3)
queue_size = atoi(argv[3]);
/*
cout << "*******************************************************************************\n";
cout << "Single thread, " << format(howmany) << " iterations" << endl;
cout << "*******************************************************************************\n";
......@@ -67,17 +64,32 @@ int main(int argc, char *argv[])
bench_mt(howmany, daily_mt, threads);
bench(howmany, spdlog::create<null_sink_st>("null_mt"));
*/
cout << "\n*******************************************************************************\n";
cout << "async logging.. " << threads << " threads sharing same logger, " << format(howmany) << " iterations " << endl;
cout << "*******************************************************************************\n";
spdlog::set_async_mode(queue_size);
for (int i = 0; i < 3; ++i)
for (int i = 0; i < 300; ++i)
{
auto as = spdlog::daily_logger_st("as", "logs/daily_async.log");
bench_mt(howmany, as, threads);
//auto as = spdlog::daily_logger_mt("as", "logs/daily_async.log");
auto test_sink = std::make_shared<spdlog::sinks::test_sink_mt>();
//auto as = spdlog::basic_logger_mt("as", "logs/async.log", true);
auto as = std::make_shared<spdlog::async_logger>("as", test_sink, queue_size, async_overflow_policy::block_retry, nullptr, std::chrono::milliseconds(2000));
bench_mt(howmany, as, threads);
as.reset();
spdlog::drop("as");
auto msg_counter = test_sink->msg_counter();
cout << "Count:" << msg_counter << endl;
if (msg_counter != howmany)
{
cout << "ERROR! Expected " << howmany;
exit(0);
}
}
}
catch (std::exception &ex)
......@@ -119,6 +131,7 @@ void bench_mt(int howmany, std::shared_ptr<spdlog::logger> log, int thread_count
if (counter > howmany)
break;
log->info("Hello logger: msg number {}", counter);
//std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
}));
}
......
<?xml version="1.0" encoding="utf-8"?>
<Project DefaultTargets="Build" ToolsVersion="14.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Project DefaultTargets="Build" ToolsVersion="15.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup Label="ProjectConfigurations">
<ProjectConfiguration Include="Debug|Win32">
<Configuration>Debug</Configuration>
......@@ -10,9 +10,6 @@
<Platform>Win32</Platform>
</ProjectConfiguration>
</ItemGroup>
<ItemGroup>
<ClCompile Include="example.cpp" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\include\spdlog\async_logger.h" />
<ClInclude Include="..\include\spdlog\common.h" />
......@@ -21,7 +18,6 @@
<ClInclude Include="..\include\spdlog\details\file_helper.h" />
<ClInclude Include="..\include\spdlog\details\logger_impl.h" />
<ClInclude Include="..\include\spdlog\details\log_msg.h" />
<ClInclude Include="..\include\spdlog\details\mpmc_bounded_q.h" />
<ClInclude Include="..\include\spdlog\details\null_mutex.h" />
<ClInclude Include="..\include\spdlog\details\os.h" />
<ClInclude Include="..\include\spdlog\details\pattern_formatter_impl.h" />
......@@ -46,23 +42,26 @@
<ClInclude Include="..\include\spdlog\spdlog.h" />
<ClInclude Include="..\include\spdlog\tweakme.h" />
</ItemGroup>
<ItemGroup>
<ClCompile Include="bench.cpp" />
</ItemGroup>
<PropertyGroup Label="Globals">
<ProjectGuid>{9E5AB93A-0CCE-4BAC-9FCB-0FC9CB5EB8D2}</ProjectGuid>
<Keyword>Win32Proj</Keyword>
<RootNamespace>.</RootNamespace>
<WindowsTargetPlatformVersion>8.1</WindowsTargetPlatformVersion>
<WindowsTargetPlatformVersion>10.0.16299.0</WindowsTargetPlatformVersion>
</PropertyGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>true</UseDebugLibraries>
<PlatformToolset>v120</PlatformToolset>
<PlatformToolset>v141</PlatformToolset>
<CharacterSet>Unicode</CharacterSet>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>false</UseDebugLibraries>
<PlatformToolset>v120</PlatformToolset>
<PlatformToolset>v141</PlatformToolset>
<WholeProgramOptimization>true</WholeProgramOptimization>
<CharacterSet>Unicode</CharacterSet>
</PropertyGroup>
......
......@@ -14,11 +14,12 @@
#include "../common.h"
#include "../details/log_msg.h"
#include "../details/mpmc_bounded_q.h"
#include "../details/mpmc_blocking_q.h"
#include "../details/os.h"
#include "../formatter.h"
#include "../sinks/sink.h"
#include<iostream>
#include <chrono>
#include <exception>
#include <functional>
......@@ -27,367 +28,333 @@
#include <thread>
#include <utility>
#include <vector>
#include <condition_variable>
namespace spdlog {
namespace details {
class async_log_helper
{
// Async msg to move to/from the queue
// Movable only. should never be copied
enum class async_msg_type
{
log,
flush,
terminate
};
struct async_msg
{
std::string logger_name;
level::level_enum level;
log_clock::time_point time;
size_t thread_id;
std::string txt;
async_msg_type msg_type;
size_t msg_id;
async_msg() = default;
~async_msg() = default;
explicit async_msg(async_msg_type m_type)
: level(level::info)
, thread_id(0)
, msg_type(m_type)
, msg_id(0)
{
}
async_msg(async_msg &&other) SPDLOG_NOEXCEPT : logger_name(std::move(other.logger_name)),
level(std::move(other.level)),
time(std::move(other.time)),
thread_id(other.thread_id),
txt(std::move(other.txt)),
msg_type(std::move(other.msg_type)),
msg_id(other.msg_id)
{
}
async_msg &operator=(async_msg &&other) SPDLOG_NOEXCEPT
{
logger_name = std::move(other.logger_name);
level = other.level;
time = std::move(other.time);
thread_id = other.thread_id;
txt = std::move(other.txt);
msg_type = other.msg_type;
msg_id = other.msg_id;
return *this;
}
// never copy or assign. should only be moved..
async_msg(const async_msg &) = delete;
async_msg &operator=(const async_msg &other) = delete;
// construct from log_msg
explicit async_msg(const details::log_msg &m)
: level(m.level)
, time(m.time)
, thread_id(m.thread_id)
, txt(m.raw.data(), m.raw.size())
, msg_type(async_msg_type::log)
, msg_id(m.msg_id)
{
namespace details {
class async_log_helper
{
// Async msg to move to/from the queue
// Movable only. should never be copied
enum class async_msg_type
{
log,
flush,
terminate
};
struct async_msg
{
std::string logger_name;
level::level_enum level;
log_clock::time_point time;
size_t thread_id;
std::string txt;
async_msg_type msg_type;
size_t msg_id;
async_msg() = default;
~async_msg() = default;
explicit async_msg(async_msg_type m_type)
: level(level::info)
, thread_id(0)
, msg_type(m_type)
, msg_id(0)
{
}
async_msg(async_msg &&other) SPDLOG_NOEXCEPT : logger_name(std::move(other.logger_name)),
level(std::move(other.level)),
time(std::move(other.time)),
thread_id(other.thread_id),
txt(std::move(other.txt)),
msg_type(std::move(other.msg_type)),
msg_id(other.msg_id)
{
}
async_msg &operator=(async_msg &&other) SPDLOG_NOEXCEPT
{
logger_name = std::move(other.logger_name);
level = other.level;
time = std::move(other.time);
thread_id = other.thread_id;
txt = std::move(other.txt);
msg_type = other.msg_type;
msg_id = other.msg_id;
return *this;
}
// never copy or assign. should only be moved..
async_msg(const async_msg &) = delete;
async_msg &operator=(const async_msg &other) = delete;
// construct from log_msg
explicit async_msg(const details::log_msg &m)
: level(m.level)
, time(m.time)
, thread_id(m.thread_id)
, txt(m.raw.data(), m.raw.size())
, msg_type(async_msg_type::log)
, msg_id(m.msg_id)
{
#ifndef SPDLOG_NO_NAME
logger_name = *m.logger_name;
logger_name = *m.logger_name;
#endif
}
}
// copy into log_msg
void fill_log_msg(log_msg &msg)
{
msg.logger_name = &logger_name;
msg.level = level;
msg.time = time;
msg.thread_id = thread_id;
msg.raw << txt;
msg.msg_id = msg_id;
}
};
// copy into log_msg
void fill_log_msg(log_msg &msg)
{
msg.logger_name = &logger_name;
msg.level = level;
msg.time = time;
msg.thread_id = thread_id;
msg.raw << txt;
msg.msg_id = msg_id;
}
};
public:
using item_type = async_msg;
using q_type = details::mpmc_bounded_queue<item_type>;
public:
using item_type = async_msg;
using q_type = details::mpmc_bounded_queue <item_type>;
using clock = std::chrono::steady_clock;
using clock = std::chrono::steady_clock;
async_log_helper(formatter_ptr formatter, std::vector<sink_ptr> sinks, size_t queue_size, const log_err_handler err_handler,
const async_overflow_policy overflow_policy = async_overflow_policy::block_retry, std::function<void()> worker_warmup_cb = nullptr,
const std::chrono::milliseconds &flush_interval_ms = std::chrono::milliseconds::zero(),
std::function<void()> worker_teardown_cb = nullptr);
async_log_helper(formatter_ptr formatter, std::vector<sink_ptr> sinks, size_t queue_size, const log_err_handler err_handler,
const async_overflow_policy overflow_policy = async_overflow_policy::block_retry, std::function<void()> worker_warmup_cb = nullptr,
const std::chrono::milliseconds &flush_interval_ms = std::chrono::milliseconds::zero(),
std::function<void()> worker_teardown_cb = nullptr);
void log(const details::log_msg &msg);
void log(const details::log_msg &msg);
// stop logging and join the back thread
~async_log_helper();
// stop logging and join the back thread
~async_log_helper();
async_log_helper(const async_log_helper &) = delete;
async_log_helper &operator=(const async_log_helper &) = delete;
async_log_helper(const async_log_helper &) = delete;
async_log_helper &operator=(const async_log_helper &) = delete;
void set_formatter(formatter_ptr msg_formatter);
void set_formatter(formatter_ptr msg_formatter);
void flush(bool wait_for_q);
void flush();
void set_error_handler(spdlog::log_err_handler err_handler);
void set_error_handler(spdlog::log_err_handler err_handler);
private:
formatter_ptr _formatter;
std::vector<std::shared_ptr<sinks::sink>> _sinks;
private:
formatter_ptr _formatter;
std::vector<std::shared_ptr<sinks::sink>> _sinks;
// queue of messages to log
q_type _q;
// queue of messages to log
q_type _q;
log_err_handler _err_handler;
log_err_handler _err_handler;
bool _flush_requested;
std::chrono::time_point<log_clock> _last_flush;
bool _terminate_requested;
// overflow policy
const async_overflow_policy _overflow_policy;
// overflow policy
const async_overflow_policy _overflow_policy;
// worker thread warmup callback - one can set thread priority, affinity, etc
const std::function<void()> _worker_warmup_cb;
// worker thread warmup callback - one can set thread priority, affinity, etc
const std::function<void()> _worker_warmup_cb;
// auto periodic sink flush parameter
const std::chrono::milliseconds _flush_interval_ms;
// auto periodic sink flush parameter
const std::chrono::milliseconds _flush_interval_ms;
// worker thread teardown callback
const std::function<void()> _worker_teardown_cb;
// worker thread teardown callback
const std::function<void()> _worker_teardown_cb;
std::mutex null_mutex_;
//null_mutex null_mutex_;
std::condition_variable_any not_empty_cv_;
std::condition_variable_any not_full_cv_;
// worker thread
std::thread _worker_thread;
// worker thread
std::thread _worker_thread;
void push_msg(async_msg &&new_msg);
void enqueue_msg(async_msg &&new_msg, async_overflow_policy policy);
// worker thread main loop
void worker_loop();
// worker thread main loop
void worker_loop();
// pop next message from the queue and process it. will set the last_pop to the pop time
// return false if termination of the queue is required
bool process_next_msg(log_clock::time_point &last_pop, log_clock::time_point &last_flush);
// dequeue next message from the queue and process it.
// return false if termination of the queue is required
bool process_next_msg();
void handle_flush_interval(log_clock::time_point &now, log_clock::time_point &last_flush);
void handle_flush_interval();
// sleep,yield or return immediately using the time passed since last message as a hint
static void sleep_or_yield(const spdlog::log_clock::time_point &now, const log_clock::time_point &last_op_time);
// wait until the queue is empty
void wait_empty_q();
};
} // namespace details
void flush_sinks();
};
} // namespace details
} // namespace spdlog
///////////////////////////////////////////////////////////////////////////////
// async_sink class implementation
///////////////////////////////////////////////////////////////////////////////
inline spdlog::details::async_log_helper::async_log_helper(formatter_ptr formatter, std::vector<sink_ptr> sinks, size_t queue_size,
log_err_handler err_handler, const async_overflow_policy overflow_policy, std::function<void()> worker_warmup_cb,
const std::chrono::milliseconds &flush_interval_ms, std::function<void()> worker_teardown_cb)
: _formatter(std::move(formatter))
, _sinks(std::move(sinks))
, _q(queue_size)
, _err_handler(std::move(err_handler))
, _flush_requested(false)
, _terminate_requested(false)
, _overflow_policy(overflow_policy)
, _worker_warmup_cb(std::move(worker_warmup_cb))
, _flush_interval_ms(flush_interval_ms)
, _worker_teardown_cb(std::move(worker_teardown_cb))
log_err_handler err_handler, const async_overflow_policy overflow_policy, std::function<void()> worker_warmup_cb,
const std::chrono::milliseconds &flush_interval_ms, std::function<void()> worker_teardown_cb)
: _formatter(std::move(formatter))
, _sinks(std::move(sinks))
, _q(queue_size)
, _err_handler(std::move(err_handler))
, _last_flush(os::now())
, _overflow_policy(overflow_policy)
, _worker_warmup_cb(std::move(worker_warmup_cb))
, _flush_interval_ms(flush_interval_ms)
, _worker_teardown_cb(std::move(worker_teardown_cb))
{
_worker_thread = std::thread(&async_log_helper::worker_loop, this);
_worker_thread = std::thread(&async_log_helper::worker_loop, this);
}
// Send to the worker thread termination message(level=off)
// and wait for it to finish gracefully
inline spdlog::details::async_log_helper::~async_log_helper()
{
try
{
push_msg(async_msg(async_msg_type::terminate));
_worker_thread.join();
}
catch (...) // don't crash in destructor
{
}
try
{
enqueue_msg(async_msg(async_msg_type::terminate), async_overflow_policy::block_retry);
_worker_thread.join();
}
catch (...) // don't crash in destructor
{
}
}
// Try to push and block until succeeded (if the policy is not to discard when the queue is full)
inline void spdlog::details::async_log_helper::log(const details::log_msg &msg)
{
push_msg(async_msg(msg));
{
enqueue_msg(async_msg(msg), _overflow_policy);
}
inline void spdlog::details::async_log_helper::push_msg(details::async_log_helper::async_msg &&new_msg)
inline void spdlog::details::async_log_helper::enqueue_msg(details::async_log_helper::async_msg &&new_msg, async_overflow_policy policy)
{
if (!_q.enqueue(std::move(new_msg)) && _overflow_policy != async_overflow_policy::discard_log_msg)
{
auto last_op_time = details::os::now();
auto now = last_op_time;
do
{
now = details::os::now();
sleep_or_yield(now, last_op_time);
} while (!_q.enqueue(std::move(new_msg)));
}
// block until succeeded pushing to the queue
if (policy == async_overflow_policy::block_retry)
{
_q.enqueue(std::move(new_msg));
}
else
{
_q.enqueue_nowait(std::move(new_msg));
}
}
// optionally wait for the queue be empty and request flush from the sinks
inline void spdlog::details::async_log_helper::flush(bool wait_for_q)
inline void spdlog::details::async_log_helper::flush()
{
push_msg(async_msg(async_msg_type::flush));
if (wait_for_q)
{
wait_empty_q(); // return when queue is empty
}
enqueue_msg(async_msg(async_msg_type::flush), _overflow_policy);
}
inline void spdlog::details::async_log_helper::worker_loop()
{
if (_worker_warmup_cb)
{
_worker_warmup_cb();
}
auto last_pop = details::os::now();
auto last_flush = last_pop;
auto active = true;
while (active)
{
try
{
active = process_next_msg(last_pop, last_flush);
}
catch (const std::exception &ex)
{
_err_handler(ex.what());
}
catch (...)
{
_err_handler("Unknown exeption in async logger worker loop.");
}
}
if (_worker_teardown_cb)
{
_worker_teardown_cb();
}
if (_worker_warmup_cb)
{
_worker_warmup_cb();
}
auto active = true;
while (active)
{
try
{
active = process_next_msg();
}
catch (const std::exception &ex)
{
_err_handler(ex.what());
}
catch (...)
{
_err_handler("Unknown exeption in async logger worker loop.");
}
}
if (_worker_teardown_cb)
{
_worker_teardown_cb();
}
}
// process next message in the queue
// return true if this thread should still be active (while no terminate msg was received)
inline bool spdlog::details::async_log_helper::process_next_msg(log_clock::time_point &last_pop, log_clock::time_point &last_flush)
inline bool spdlog::details::async_log_helper::process_next_msg()
{
async_msg incoming_async_msg;
if (_q.dequeue(incoming_async_msg))
{
last_pop = details::os::now();
switch (incoming_async_msg.msg_type)
{
case async_msg_type::flush:
_flush_requested = true;
break;
case async_msg_type::terminate:
_flush_requested = true;
_terminate_requested = true;
break;
default:
log_msg incoming_log_msg;
incoming_async_msg.fill_log_msg(incoming_log_msg);
_formatter->format(incoming_log_msg);
for (auto &s : _sinks)
{
if (s->should_log(incoming_log_msg.level))
{
s->log(incoming_log_msg);
}
}
}
return true;
}
// Handle empty queue..
// This is the only place where the queue can terminate or flush to avoid losing messages already in the queue
auto now = details::os::now();
handle_flush_interval(now, last_flush);
sleep_or_yield(now, last_pop);
return !_terminate_requested;
async_msg incoming_async_msg;
bool dequeued = _q.dequeue_for(incoming_async_msg, std::chrono::milliseconds(1000));
if (!dequeued)
{
handle_flush_interval();
return true;
}
switch (incoming_async_msg.msg_type)
{
case async_msg_type::flush:
flush_sinks();
return true;
case async_msg_type::terminate:
//flush_sinks();
return false;
default:
log_msg incoming_log_msg;
incoming_async_msg.fill_log_msg(incoming_log_msg);
_formatter->format(incoming_log_msg);
for (auto &s : _sinks)
{
if (s->should_log(incoming_log_msg.level))
{
s->log(incoming_log_msg);
}
}
return true;
}
assert(false);
return true; // should not be reached
}
// flush all sinks if _flush_interval_ms has expired
inline void spdlog::details::async_log_helper::handle_flush_interval(log_clock::time_point &now, log_clock::time_point &last_flush)
{
auto should_flush =
_flush_requested || (_flush_interval_ms != std::chrono::milliseconds::zero() && now - last_flush >= _flush_interval_ms);
if (should_flush)
{
for (auto &s : _sinks)
{
s->flush();
}
now = last_flush = details::os::now();
_flush_requested = false;
}
// flush all sinks if _flush_interval_ms has expired. only called if queue is empty
inline void spdlog::details::async_log_helper::handle_flush_interval()
{
if (_flush_interval_ms == std::chrono::milliseconds::zero())
{
return;
}
auto delta = details::os::now() - _last_flush;;
if (delta >= _flush_interval_ms)
{
flush_sinks();
}
}
inline void spdlog::details::async_log_helper::set_formatter(formatter_ptr msg_formatter)
{
_formatter = std::move(msg_formatter);
_formatter = std::move(msg_formatter);
}
// spin, yield or sleep. use the time passed since last message as a hint
inline void spdlog::details::async_log_helper::sleep_or_yield(
const spdlog::log_clock::time_point &now, const spdlog::log_clock::time_point &last_op_time)
{
using std::chrono::microseconds;
using std::chrono::milliseconds;
auto time_since_op = now - last_op_time;
// spin upto 50 micros
if (time_since_op <= microseconds(50))
{
return;
}
// yield upto 150 micros
if (time_since_op <= microseconds(100))
{
return std::this_thread::yield();
}
// sleep for 20 ms upto 200 ms
if (time_since_op <= milliseconds(200))
{
return details::os::sleep_for_millis(20);
}
// sleep for 500 ms
return details::os::sleep_for_millis(500);
}
// wait for the queue to be empty
inline void spdlog::details::async_log_helper::wait_empty_q()
inline void spdlog::details::async_log_helper::set_error_handler(spdlog::log_err_handler err_handler)
{
auto last_op = details::os::now();
while (!_q.is_empty())
{
sleep_or_yield(details::os::now(), last_op);
}
_err_handler = std::move(err_handler);
}
inline void spdlog::details::async_log_helper::set_error_handler(spdlog::log_err_handler err_handler)
// flush all sinks if _flush_interval_ms has expired. only called if queue is empty
inline void spdlog::details::async_log_helper::flush_sinks()
{
_err_handler = std::move(err_handler);
printf("FLUSH!\n");
for (auto &s : _sinks)
{
s->flush();
}
_last_flush = os::now();
}
......@@ -44,7 +44,7 @@ inline spdlog::async_logger::async_logger(const std::string &logger_name, sink_p
inline void spdlog::async_logger::flush()
{
_async_log_helper->flush(false);//dont wat for the q to draing before returning from flush
_async_log_helper->flush();
}
// Error handler
......@@ -80,7 +80,7 @@ inline void spdlog::async_logger::_sink_it(details::log_msg &msg)
_async_log_helper->log(msg);
if (_should_flush_on(msg))
{
_async_log_helper->flush(false); // do async flush
_async_log_helper->flush(); // do async flush
}
}
catch (const std::exception &ex)
......
#pragma once
//
// Copyright(c) 2018 Gabi Melman.
// Distributed under the MIT License (http://opensource.org/licenses/MIT)
//
// async log helper :
// multi producer-multi consumer blocking queue
// enqueue(..) - will block until room found to put the new message
// enqueue_nowait(..) - will return immediatly with false if no room left in the queue
// dequeue_for(..) - will block until the queue is not empty or timeout passed
#include <condition_variable>
#include <mutex>
#include <queue>
namespace spdlog {
namespace details {
template<typename T>
class mpmc_bounded_queue
{
public:
using item_type = T;
explicit mpmc_bounded_queue(size_t max_items) : max_items_(max_items) {}
// try to enqueue and block if no room left
void enqueue(T &&item)
{
{
std::unique_lock<std::mutex> lock(queue_mutex_);
pop_cv_.wait(lock, [this] {return this->q_.size() <= this->max_items_; });
q_.push(std::forward<T>(item));
}
push_cv_.notify_one();
}
// try to enqueue and return immdeialty false if no room left
bool enqueue_nowait(T &&item)
{
{
std::unique_lock<std::mutex> lock(queue_mutex_);
if (this->q_.size() >= this->max_items_)
{
return false;
}
q_.push(std::forward<T>(item));
}
push_cv_.notify_one();
return true;
}
// try to dequeue item. if no item found. wait upto timeout and try again
// Return true, if succeeded dequeue item, false otherwise
bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration)
{
{
std::unique_lock<std::mutex> lock(queue_mutex_);
//push_cv_.wait(lock, [this] {return this->q_.size() > 0; });
bool found_msg = push_cv_.wait_for(lock, wait_duration, [this] {return this->q_.size() > 0; });
if (!found_msg)
{
return false;
}
popped_item = std::move(q_.front());
q_.pop();
}
pop_cv_.notify_one();
return true;
}
private:
size_t max_items_;
std::mutex queue_mutex_;
std::condition_variable push_cv_;
std::condition_variable pop_cv_;
std::queue<T> q_;
};
}
}
/*
A modified version of Bounded MPMC queue by Dmitry Vyukov.
Original code from:
http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
licensed by Dmitry Vyukov under the terms below:
Simplified BSD license
Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this list of
conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice, this list
of conditions and the following disclaimer in the documentation and/or other materials
provided with the distribution.
THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
The views and conclusions contained in the software and documentation are those of the authors and
should not be interpreted as representing official policies, either expressed or implied, of Dmitry Vyukov.
*/
/*
The code in its current form adds the license below:
Copyright(c) 2015 Gabi Melman.
Distributed under the MIT License (http://opensource.org/licenses/MIT)
*/
#pragma once
#include "../common.h"
#include <atomic>
#include <utility>
namespace spdlog {
namespace details {
template<typename T>
class mpmc_bounded_queue
{
public:
using item_type = T;
explicit mpmc_bounded_queue(size_t buffer_size)
: max_size_(buffer_size)
, buffer_(new cell_t[buffer_size])
, buffer_mask_(buffer_size - 1)
{
// queue size must be power of two
if (!((buffer_size >= 2) && ((buffer_size & (buffer_size - 1)) == 0)))
{
throw spdlog_ex("async logger queue size must be power of two");
}
for (size_t i = 0; i != buffer_size; i += 1)
{
buffer_[i].sequence_.store(i, std::memory_order_relaxed);
}
enqueue_pos_.store(0, std::memory_order_relaxed);
dequeue_pos_.store(0, std::memory_order_relaxed);
}
~mpmc_bounded_queue()
{
delete[] buffer_;
}
mpmc_bounded_queue(mpmc_bounded_queue const &) = delete;
void operator=(mpmc_bounded_queue const &) = delete;
bool enqueue(T &&data)
{
cell_t *cell;
size_t pos = enqueue_pos_.load(std::memory_order_relaxed);
for (;;)
{
cell = &buffer_[pos & buffer_mask_];
size_t seq = cell->sequence_.load(std::memory_order_acquire);
intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos);
if (dif == 0)
{
if (enqueue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
{
break;
}
}
else if (dif < 0)
{
return false;
}
else
{
pos = enqueue_pos_.load(std::memory_order_relaxed);
}
}
cell->data_ = std::move(data);
cell->sequence_.store(pos + 1, std::memory_order_release);
return true;
}
bool dequeue(T &data)
{
cell_t *cell;
size_t pos = dequeue_pos_.load(std::memory_order_relaxed);
for (;;)
{
cell = &buffer_[pos & buffer_mask_];
size_t seq = cell->sequence_.load(std::memory_order_acquire);
intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
if (dif == 0)
{
if (dequeue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
{
break;
}
}
else if (dif < 0)
{
return false;
}
else
{
pos = dequeue_pos_.load(std::memory_order_relaxed);
}
}
data = std::move(cell->data_);
cell->sequence_.store(pos + buffer_mask_ + 1, std::memory_order_release);
return true;
}
bool is_empty()
{
size_t front, front1, back;
// try to take a consistent snapshot of front/tail.
do
{
front = enqueue_pos_.load(std::memory_order_acquire);
back = dequeue_pos_.load(std::memory_order_acquire);
front1 = enqueue_pos_.load(std::memory_order_relaxed);
} while (front != front1);
return back == front;
}
private:
struct cell_t
{
std::atomic<size_t> sequence_;
T data_;
};
size_t const max_size_;
static size_t const cacheline_size = 64;
using cacheline_pad_t = char[cacheline_size];
cacheline_pad_t pad0_;
cell_t *const buffer_;
size_t const buffer_mask_;
cacheline_pad_t pad1_;
std::atomic<size_t> enqueue_pos_;
cacheline_pad_t pad2_;
std::atomic<size_t> dequeue_pos_;
cacheline_pad_t pad3_;
};
} // namespace details
} // namespace spdlog
//
// Copyright(c) 2015 Gabi Melman.
// Distributed under the MIT License (http://opensource.org/licenses/MIT)
//
#pragma once
#include "../details/null_mutex.h"
#include "base_sink.h"
#include <mutex>
namespace spdlog {
namespace sinks {
template<class Mutex>
class test_sink : public base_sink<Mutex>
{
public:
size_t msg_counter()
{
return msg_counter_;
}
protected:
void _sink_it(const details::log_msg &) override
{
msg_counter_++;
}
void _flush() override {}
size_t msg_counter_{ 0 };
};
using test_sink_mt = test_sink<std::mutex>;
using test_sink_st = test_sink<details::null_mutex>;
} // namespace sinks
} // namespace spdlog
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment