Commit 68104e03, authored by Chad Austin, committed by Facebook GitHub Bot

default LockFreeRingBuffer to trivial and make it compatible with TSAN

Summary:
LockFreeRingBuffer is racy for nontrivial types. I could reproduce
ASAN failures with nontrivial types like std::string, and TSAN
failures with trivial types.

Require triviality by default, and sequence the turns, loads, and
stores with atomic_thread_fence.

Reviewed By: yfeldblum

Differential Revision: D27684743

fbshipit-source-id: e0b32855d3ed5c2edbd7ab72092f2c533071a70f
parent 15ee6d71
......@@ -27,12 +27,23 @@
#include <folly/Traits.h>
#include <folly/detail/TurnSequencer.h>
#include <folly/portability/Unistd.h>
#include <folly/synchronization/SanitizeThread.h>
namespace folly {
namespace detail {
template <typename T, template <typename> class Atom>
template <
typename T,
template <typename>
class Atom,
template <typename>
class Storage>
class RingBufferSlot;
template <typename T>
class RingBufferTrivialStorage;
template <typename T>
class RingBufferBrokenStorage;
} // namespace detail
/// LockFreeRingBuffer<T> is a fixed-size, concurrent ring buffer with the
......@@ -54,7 +65,10 @@ class RingBufferSlot;
/// "future" can optionally block but reads from the "past" will always fail.
///
template <typename T, template <typename> class Atom = std::atomic>
template <
typename T,
template <typename> class Atom = std::atomic,
template <typename> class Storage = detail::RingBufferTrivialStorage>
class LockFreeRingBuffer {
static_assert(
std::is_nothrow_default_constructible<T>::value,
......@@ -92,9 +106,7 @@ class LockFreeRingBuffer {
};
explicit LockFreeRingBuffer(uint32_t capacity) noexcept
: capacity_(capacity),
slots_(new detail::RingBufferSlot<T, Atom>[capacity]),
ticket_(0) {}
: capacity_(capacity), slots_(new Slot[capacity]), ticket_(0) {}
LockFreeRingBuffer(const LockFreeRingBuffer&) = delete;
LockFreeRingBuffer& operator=(const LockFreeRingBuffer&) = delete;
......@@ -103,7 +115,7 @@ class LockFreeRingBuffer {
/// Writes can block iff a previous writer has not yet completed a write
/// for the same slot (before the most recent wrap-around).
template <typename V>
void write(V& value) noexcept {
void write(const V& value) noexcept {
uint64_t ticket = ticket_.fetch_add(1);
slots_[idx(ticket)].write(turn(ticket), value);
}
......@@ -113,7 +125,7 @@ class LockFreeRingBuffer {
/// for the same slot (before the most recent wrap-around).
/// Returns a Cursor pointing to the just-written T.
template <typename V>
Cursor writeAndGetCursor(V& value) noexcept {
Cursor writeAndGetCursor(const V& value) noexcept {
uint64_t ticket = ticket_.fetch_add(1);
slots_[idx(ticket)].write(turn(ticket), value);
return Cursor(ticket);
......@@ -166,16 +178,17 @@ class LockFreeRingBuffer {
/// dumped memory regions.
std::pair<void const*, size_t> internalBufferLocation() const {
return std::make_pair(
static_cast<void const*>(slots_.get()),
sizeof(detail::RingBufferSlot<T, Atom>[capacity_]));
static_cast<void const*>(slots_.get()), sizeof(Slot[capacity_]));
}
~LockFreeRingBuffer() {}
private:
using Slot = detail::RingBufferSlot<T, Atom, Storage>;
const uint32_t capacity_;
const std::unique_ptr<detail::RingBufferSlot<T, Atom>[]> slots_;
const std::unique_ptr<Slot[]> slots_;
Atom<uint64_t> ticket_;
......@@ -187,15 +200,15 @@ class LockFreeRingBuffer {
}; // LockFreeRingBuffer
namespace detail {
template <typename T, template <typename> class Atom>
template <
typename T,
template <typename>
class Atom,
template <typename>
class Storage>
class RingBufferSlot {
template <typename V>
void copy(V& dest, T& src) {
dest = src;
}
public:
explicit RingBufferSlot() noexcept : sequencer_(), data() {}
explicit RingBufferSlot() noexcept {}
template <typename V>
void write(const uint32_t turn, const V& value) noexcept {
......@@ -205,7 +218,7 @@ class RingBufferSlot {
// Change to an odd-numbered turn to indicate write in process
sequencer_.completeTurn(turn * 2);
data = value;
storage_.store(value);
sequencer_.completeTurn(turn * 2 + 1);
// At (turn + 1) * 2
}
......@@ -218,7 +231,7 @@ class RingBufferSlot {
TurnSequencer<Atom>::TryWaitResult::SUCCESS) {
return false;
}
copy(dest, data);
storage_.load(dest);
// if it's still the same turn, we read the value successfully
return sequencer_.isTurn(desired_turn);
......@@ -230,7 +243,7 @@ class RingBufferSlot {
if (!sequencer_.isTurn((turn + 1) * 2)) {
return false;
}
copy(dest, data);
storage_.load(dest);
// if it's still the same turn, we read the value successfully
return sequencer_.isTurn((turn + 1) * 2);
......@@ -238,9 +251,53 @@ class RingBufferSlot {
private:
TurnSequencer<Atom> sequencer_;
T data;
}; // RingBufferSlot
Storage<T> storage_;
};
} // namespace detail
/// Storage policy that holds a single T and copies it in and out bytewise
/// with std::memcpy.
///
/// T must be trivially copyable; this is enforced by the static_assert below.
/// The copies themselves are not atomic. Callers are expected to check the
/// slot's sequencer around load() to decide whether the bytes they read are
/// valid (see the message passed to annotate_benign_race_sized).
template <typename T>
class RingBufferTrivialStorage {
  static_assert(is_trivially_copyable_v<T>, "T must be trivially copyable");

  // Note: If T fits in 8 bytes, folly::AtomicStruct could be used instead.

 public:
  /// Registers the bytes of data_ through annotate_benign_race_sized so that
  /// concurrent memcpy access to them is reported as intentional.
  /// NOTE(review): presumably this is what silences TSAN for this storage;
  /// confirm against folly/synchronization/SanitizeThread.h.
  RingBufferTrivialStorage() noexcept {
    annotate_benign_race_sized(
        &data_,
        sizeof(T),
        "T is trivial and sequencer is checked to determine validity",
        __FILE__,
        __LINE__);
  }

  /// Copies the bytes of `src` into the slot.
  ///
  /// The release fence is issued after the copy, so it precedes whatever
  /// atomic operation the caller performs next to publish the write.
  void store(const T& src) {
    std::memcpy(&data_, &src, sizeof(T));
    std::atomic_thread_fence(std::memory_order_release);
  }

  /// Copies the bytes currently in the slot into `dest`.
  ///
  /// The acquire fence is issued before the copy, so it follows whatever
  /// atomic check the caller performed to decide the slot is readable.
  /// `dest` may still hold a torn value if a writer raced with this call;
  /// the caller must re-check validity afterwards.
  void load(T& dest) {
    std::atomic_thread_fence(std::memory_order_acquire);
    std::memcpy(&dest, &data_, sizeof(T));
  }

 private:
  // No initialization is necessary because the sequencer is checked before data
  // is returned.
  T data_;
};
/// Storage policy that keeps a value-initialized T and transfers values with
/// ordinary copy assignment.
///
/// Neither store() nor load() performs any synchronization, which is why the
/// class is marked deprecated: see the attribute text for the rationale and
/// the suggested replacement.
template <typename T>
class [[deprecated(
    "It is UB to race loads and stores across multiple threads. "
    "Use RingBufferTrivialStorage.")]] RingBufferBrokenStorage {
 public:
  /// Replaces the held value with a copy of `src`.
  void store(const T& src) {
    data_ = src;
  }

  /// Copy-assigns the held value into `dest`.
  void load(T& dest) {
    dest = data_;
  }

 private:
  // Value-initialized, so a load() before any store() yields T{}.
  T data_{};
};
} // namespace detail
} // namespace folly
......@@ -250,4 +250,47 @@ TEST(LockFreeRingBuffer, moveBackwardsCanFail) {
EXPECT_FALSE(cursor.moveBackward()); // moving back does nothing
}
// File-local helper type for the contendedReadsAndWrites test below.
namespace {
// Plain aggregate of three scalar fields; used as the element type of
// LockFreeRingBuffer<S> so the test exercises a multi-field trivially
// copyable payload.
struct S {
int x;
float y;
char c;
};
} // namespace
// Hammers a two-slot ring buffer with concurrent writers and readers for one
// second. Every writer stores the same S value, so any successful read must
// observe exactly those field values.
TEST(LockFreeRingBuffer, contendedReadsAndWrites) {
  constexpr int kThreadsPerRole = 8;

  LockFreeRingBuffer<S> rb{2};
  std::atomic<bool> done{false};

  // Writer body: keep publishing the same payload until told to stop.
  const auto writer = [&] {
    while (!done.load(std::memory_order_relaxed)) {
      S value{10, -5.5, 100};
      rb.write(value);
    }
  };

  // Reader body: poll the tail and verify every value that tryRead reports
  // as successfully read.
  const auto reader = [&] {
    S value;
    while (!done.load(std::memory_order_relaxed)) {
      if (!rb.tryRead(value, rb.currentTail())) {
        continue;
      }
      EXPECT_EQ(10, value.x);
      EXPECT_EQ(-5.5, value.y);
      EXPECT_EQ(100, value.c);
    }
  };

  std::vector<std::thread> threads;
  for (int w = 0; w < kThreadsPerRole; ++w) {
    threads.emplace_back(writer);
  }
  for (int r = 0; r < kThreadsPerRole; ++r) {
    threads.emplace_back(reader);
  }

  /* sleep override */ std::this_thread::sleep_for(std::chrono::seconds{1});
  done.store(true, std::memory_order_relaxed);

  for (auto& worker : threads) {
    worker.join();
  }
}
} // namespace folly
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment