Commit fff701bf authored by Delyan Kratunov, committed by Noam Lerner

LockFreeRingBuffer

Summary:
This introduces a lock-free ring buffer with the following expected semantics:

1. Writers can't block on readers
2. Writers are mostly lock-free
3. Readers can detect if they're being too slow
4. The buffer is usable on Android (TBD, but it should work as-is with the armeabi-v7a ABI; armeabi (ARMv5) support is explicitly a non-goal)

Non-goals:
1. Match MPMCQueue in level of optimization. There's no need for that yet.

Test Plan: iloveunittests

Reviewed By: ngbronson@fb.com

Subscribers: trunkagent, folly-diffs@, yfeldblum, chalfant

FB internal diff: D2037718

Signature: t1:2037718:1432850250:c57963510d8cda58edc006f4c3260f5ac34d4996
parent a95dbe43
......@@ -104,6 +104,7 @@ nobase_follyinclude_HEADERS = \
experimental/FutureDAG.h \
experimental/io/FsUtil.h \
experimental/JSONSchema.h \
experimental/LockFreeRingBuffer.h \
experimental/Select64.h \
experimental/SharedMutex.h \
experimental/StringKeyedCommon.h \
......
......@@ -78,6 +78,16 @@ struct TurnSequencer {
return decodeCurrentSturn(state) == (turn << kTurnShift);
}
/// Asserting counterpart of tryWaitForTurn: forwards all three arguments to
/// tryWaitForTurn and asserts that it reported success.
///
/// Requires that `turn` is not a turn in the past. tryWaitForTurn returns
/// false in that case, which trips the assert in debug builds; when asserts
/// are compiled out the failure is ignored and this function simply returns.
void waitForTurn(const uint32_t turn,
                 Atom<uint32_t>& spinCutoff,
                 const bool updateSpinCutoff) noexcept {
  const bool turnReached = tryWaitForTurn(turn, spinCutoff, updateSpinCutoff);
  assert(turnReached);
  // keeps the variable "used" when assert() expands to nothing
  (void) turnReached;
}
// Internally we always work with shifted turn values, which makes the
// truncation and wraparound work correctly. This leaves us bits at
// the bottom to store the number of waiters. We call shifted turns
......@@ -87,7 +97,8 @@ struct TurnSequencer {
/// updateSpinCutoff is true then this will spin for up to kMaxSpins tries
/// before blocking and will adjust spinCutoff based on the results,
/// otherwise it will spin for at most spinCutoff spins.
void waitForTurn(const uint32_t turn,
/// Returns true if the wait succeeded, false if the turn is in the past
bool tryWaitForTurn(const uint32_t turn,
Atom<uint32_t>& spinCutoff,
const bool updateSpinCutoff) noexcept {
uint32_t prevThresh = spinCutoff.load(std::memory_order_relaxed);
......@@ -103,8 +114,11 @@ struct TurnSequencer {
break;
}
// wrap-safe version of assert(current_sturn < sturn)
assert(sturn - current_sturn < std::numeric_limits<uint32_t>::max() / 2);
// wrap-safe version of (current_sturn >= sturn)
if(sturn - current_sturn >= std::numeric_limits<uint32_t>::max() / 2) {
// turn is in the past
return false;
}
// the first effectSpinCutoff tries are spins, after that we will
// record ourself as a waiter and block with futexWait
......@@ -154,6 +168,8 @@ struct TurnSequencer {
prevThresh, prevThresh + int(target - prevThresh) / 8);
}
}
return true;
}
/// Unblocks a thread running waitForTurn(turn + 1)
......
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <string.h>
#include <unistd.h>

#include <algorithm>
#include <atomic>
#include <cassert>
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <memory>
#include <type_traits>
#include <utility>

#include <boost/noncopyable.hpp>

#include <folly/detail/TurnSequencer.h>
#include <folly/Portability.h>
namespace folly {
namespace detail {
// Forward declaration only: LockFreeRingBuffer (below) holds an array of
// these slots, while the full class definition appears later in this header.
template<typename T,
template<typename> class Atom>
class RingBufferSlot;
} // namespace detail
/// LockFreeRingBuffer<T> is a fixed-size, concurrent ring buffer with the
/// following semantics:
///
///  1. Writers cannot block on other writers UNLESS they are <capacity> writes
///     apart from each other (writing to the same slot after a wrap-around)
///  2. Writers cannot block on readers
///  3. Readers can wait for writes that haven't occurred yet
///  4. Readers can detect if they are lagging behind
///
/// In this sense, reads from this buffer are best-effort but writes
/// are guaranteed.
///
/// Another way to think about this is as an unbounded stream of writes. The
/// buffer contains the last <capacity> writes but readers can attempt to read
/// any part of the stream, even outside this window. The read API takes a
/// Cursor that can point anywhere in this stream of writes. Reads from the
/// "future" can optionally block but reads from the "past" will always fail.
///
/// Implementation sketch: every write draws the next value of a 64-bit ticket
/// counter. `ticket % capacity` selects the slot and `ticket / capacity` is
/// the turn (number of wrap-arounds) handed to that slot.
///
template<typename T, template<typename> class Atom = std::atomic>
class LockFreeRingBuffer: boost::noncopyable {
  static_assert(std::is_nothrow_default_constructible<T>::value,
      "Element type must be nothrow default constructible");
  static_assert(FOLLY_IS_TRIVIALLY_COPYABLE(T),
      "Element type must be trivially copyable");

 public:
  /// Opaque pointer to a past or future write.
  /// Can be moved relative to its current location but not in absolute terms.
  struct Cursor {
    explicit Cursor(uint64_t initialTicket) noexcept : ticket(initialTicket) {}

    /// Advance the cursor by `steps` writes (default: one).
    void moveForward(uint64_t steps = 1) noexcept {
      ticket += steps;
    }

    /// Move the cursor back by `steps` writes (default: one), saturating at
    /// the very first write instead of wrapping below zero.
    void moveBackward(uint64_t steps = 1) noexcept {
      if (steps > ticket) {
        ticket = 0;
      } else {
        ticket -= steps;
      }
    }

   protected: // for test visibility reasons
    uint64_t ticket;
    friend class LockFreeRingBuffer;
  };

  /// Create a buffer holding the last `capacity` writes.
  /// `capacity` must be greater than zero: every ticket is divided by it.
  /// Note that the constructor is noexcept, so a failed slot allocation
  /// terminates the program rather than propagating std::bad_alloc.
  explicit LockFreeRingBuffer(size_t capacity) noexcept
    : capacity_(capacity)
    , slots_(new detail::RingBufferSlot<T,Atom>[capacity])
    , ticket_(0)
  {
    // idx() and turn() compute `ticket % capacity_` and `ticket / capacity_`
    assert(capacity > 0);
  }

  /// Perform a single write of an object of type T.
  /// Writes can block iff a previous writer has not yet completed a write
  /// for the same slot (before the most recent wrap-around).
  void write(T& value) noexcept {
    const uint64_t ticket = ticket_.fetch_add(1);
    slots_[idx(ticket)].write(turn(ticket), value);
  }

  /// Read the value at the cursor.
  /// Returns true if the read succeeded, false otherwise. If the return
  /// value is false, dest is to be considered partially read and in an
  /// inconsistent state. Readers are advised to discard it.
  bool tryRead(T& dest, const Cursor& cursor) noexcept {
    return slots_[idx(cursor.ticket)].tryRead(dest, turn(cursor.ticket));
  }

  /// Read the value at the cursor or block if the write has not occurred yet.
  /// Returns true if the read succeeded, false otherwise. If the return
  /// value is false, dest is to be considered partially read and in an
  /// inconsistent state. Readers are advised to discard it.
  bool waitAndTryRead(T& dest, const Cursor& cursor) noexcept {
    return slots_[idx(cursor.ticket)].waitAndTryRead(dest, turn(cursor.ticket));
  }

  /// Returns a Cursor pointing to the first write that has not occurred yet.
  Cursor currentHead() noexcept {
    return Cursor(ticket_.load());
  }

  /// Returns a Cursor pointing to a currently readable write.
  /// skipFraction is a value in the [0, 1] range indicating how far into the
  /// currently readable window to place the cursor. 0 means the
  /// earliest readable write, 1 means the latest readable write (if any).
  Cursor currentTail(double skipFraction = 0.0) noexcept {
    assert(skipFraction >= 0.0 && skipFraction <= 1.0);
    uint64_t ticket = ticket_.load();

    uint64_t backStep = std::llround((1.0 - skipFraction) * capacity_);

    // always try to move at least one step backward to something readable
    backStep = std::max<uint64_t>(1, backStep);

    // can't go back more steps than we've taken
    backStep = std::min(ticket, backStep);

    return Cursor(ticket - backStep);
  }

  // Nothing to do by hand: slots_ is owned by a unique_ptr.
  ~LockFreeRingBuffer() {
  }

 private:
  const size_t capacity_;
  const std::unique_ptr<detail::RingBufferSlot<T,Atom>[]> slots_;

  // Next ticket to hand out; equals the number of writes started so far.
  Atom<uint64_t> ticket_;

  /// Index of the slot that `ticket` maps to. The result is always below
  /// capacity_, which is a size_t, so it is returned as size_t; a narrower
  /// type would truncate the index for capacities above 2^32.
  size_t idx(uint64_t ticket) const noexcept {
    return static_cast<size_t>(ticket % capacity_);
  }

  /// Number of complete passes over the buffer that precede `ticket`.
  /// The narrowing to 32 bits is deliberate here: the slot API takes
  /// uint32_t turns.
  uint32_t turn(uint64_t ticket) const noexcept {
    return static_cast<uint32_t>(ticket / capacity_);
  }
}; // LockFreeRingBuffer
namespace detail {
/// One cell of a ring buffer: a value of type T whose accesses are ordered by
/// a TurnSequencer.
///
/// Turn protocol as used by the methods below: the write for ring turn `t`
/// waits for sequencer turn 2*t, completes it (leaving an odd turn while the
/// value is being stored) and then completes 2*t + 1. Readers treat the value
/// as valid only while the sequencer is at 2*(t + 1).
template<typename T, template<typename> class Atom>
class RingBufferSlot {
 public:
  explicit RingBufferSlot() noexcept
    : sequencer_()
    , data_()
  {
  }

  /// Store `value` as the write belonging to ring turn `turn`.
  /// Waits until the sequencer reaches this write's even turn.
  void write(const uint32_t turn, T& value) noexcept {
    const uint32_t writeTurn = turn * 2;
    Atom<uint32_t> spinCutoff(0);

    sequencer_.waitForTurn(writeTurn, spinCutoff, false);

    // Change to an odd-numbered turn to indicate write in process
    sequencer_.completeTurn(writeTurn);

    data_ = std::move(value);

    // Afterwards the sequencer is at (turn + 1) * 2
    sequencer_.completeTurn(writeTurn + 1);
  }

  /// Wait for the write of ring turn `turn`, then copy it into `dest`.
  /// Returns false if the wait reports failure, or if the sequencer is no
  /// longer at the expected turn once the copy is done; in the latter case
  /// `dest` has already been written to.
  bool waitAndTryRead(T& dest, uint32_t turn) noexcept {
    const uint32_t readableTurn = (turn + 1) * 2;
    Atom<uint32_t> spinCutoff(0);

    if (sequencer_.tryWaitForTurn(readableTurn, spinCutoff, false)) {
      memcpy(&dest, &data_, sizeof(T));
      // if it's still the same turn, we read the value successfully
      return sequencer_.isTurn(readableTurn);
    }
    return false;
  }

  /// Non-blocking read of the write of ring turn `turn`.
  /// Returns false without touching `dest` when the sequencer is not at the
  /// expected turn up front, and false after copying when the turn changed
  /// during the copy.
  bool tryRead(T& dest, uint32_t turn) noexcept {
    // The write that started at turn 0 ended at turn 2
    const uint32_t readableTurn = (turn + 1) * 2;

    if (sequencer_.isTurn(readableTurn)) {
      memcpy(&dest, &data_, sizeof(T));
      // if it's still the same turn, we read the value successfully
      return sequencer_.isTurn(readableTurn);
    }
    return false;
  }

 private:
  TurnSequencer<Atom> sequencer_;
  T data_;
}; // RingBufferSlot
} // namespace detail
} // namespace folly
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <gflags/gflags.h>
#include <gtest/gtest.h>
#include <iostream>
#include <thread>
#include <folly/detail/Futex.h>
#include <folly/experimental/LockFreeRingBuffer.h>
#include <folly/test/DeterministicSchedule.h>
namespace folly {
// Single-threaded round trip: for each of `turns` passes, fill the whole
// buffer and then read every element back in write order through a cursor
// that started at the head of the empty buffer.
TEST(LockFreeRingBuffer, writeReadSequentially) {
  const int capacity = 256;
  const int turns = 4;

  LockFreeRingBuffer<int> rb(capacity);
  LockFreeRingBuffer<int>::Cursor cur = rb.currentHead();

  // Counters are plain int so that they compare against the int constants
  // and the int payload without mixing signed and unsigned operands.
  for (int turn = 0; turn < turns; turn++) {
    for (int write = 0; write < capacity; write++) {
      int val = turn * capacity + write;
      rb.write(val);
    }

    for (int write = 0; write < capacity; write++) {
      int dest = 0;
      ASSERT_TRUE(rb.tryRead(dest, cur));
      ASSERT_EQ(turn * capacity + write, dest);
      cur.moveForward();
    }
  }
}
// Single-threaded round trip in reverse: after each full pass of writes,
// start at the most recent write and walk the cursor backwards over the
// whole buffer, checking every value.
TEST(LockFreeRingBuffer, writeReadSequentiallyBackward) {
  const int capacity = 256;
  const int turns = 4;

  LockFreeRingBuffer<int> rb(capacity);

  // `turn` is int (like `write`) so the expected value below is computed and
  // compared entirely in signed arithmetic.
  for (int turn = 0; turn < turns; turn++) {
    for (int write = 0; write < capacity; write++) {
      int val = turn * capacity + write;
      rb.write(val);
    }

    LockFreeRingBuffer<int>::Cursor cur = rb.currentHead();
    cur.moveBackward(1); /// last write

    for (int write = capacity - 1; write >= 0; write--) {
      int foo = 0;
      ASSERT_TRUE(rb.tryRead(foo, cur));
      ASSERT_EQ(turn * capacity + write, foo);
      // moveBackward saturates at the first write, so the final step of the
      // first pass leaves the cursor where it is
      cur.moveBackward();
    }
  }
}
// A reader parked on a future write must not complete until that write has
// actually happened.
TEST(LockFreeRingBuffer, readsCanBlock) {
  // Start a reader thread, confirm that reading can block
  std::atomic<bool> readerHasRun(false);
  LockFreeRingBuffer<int> rb(1);
  auto cursor = rb.currentHead();
  cursor.moveForward(3); // wait for the 4th write

  // 0xfaceb00c does not fit in an int; convert explicitly instead of relying
  // on an implicit narrowing of the literal.
  const int sentinel = static_cast<int>(0xfaceb00c);

  auto reader = std::thread([&]() {
    int val = 0;
    EXPECT_TRUE(rb.waitAndTryRead(val, cursor));
    readerHasRun = true;
    EXPECT_EQ(sentinel, val);
  });

  for (int i = 0; i < 4; i++) {
    // checked before every write, including the 4th: the reader cannot have
    // finished while the write it waits for is still outstanding
    EXPECT_FALSE(readerHasRun.load());
    int val = sentinel;
    rb.write(val);
  }

  reader.join();
  EXPECT_TRUE(readerHasRun.load());
}
// Test-only accessor for the raw ticket stored inside a Cursor.
// Cursor::ticket is protected, so it is read through a local subclass that is
// copy-constructed from the cursor passed in.
template<typename T, template<typename> class Atom>
uint64_t value(const typename LockFreeRingBuffer<T, Atom>::Cursor&& rbcursor) {
  using RBCursor = typename LockFreeRingBuffer<T, Atom>::Cursor;

  struct TicketReader : RBCursor {
    explicit TicketReader(const RBCursor& base) : RBCursor(base) {}

    uint64_t read() const {
      return this->ticket;
    }
  };

  // rbcursor is const, so it can only ever be copied, never moved from
  return TicketReader(rbcursor).read();
}
// Worker body for the concurrent write test. NOTE: despite its name this
// helper *writes*: it keeps claiming values from the shared countdown
// `writes` and pushes each claimed value into `rb` until the countdown is
// used up.
template<template<typename> class Atom>
void runReader(
    LockFreeRingBuffer<int, Atom>& rb, std::atomic<int32_t>& writes
) {
  for (;;) {
    // atomic post-decrement: returns the value held before the decrement
    int32_t claimed = writes--;
    if (claimed <= 0) {
      break;
    }
    rb.write(claimed);
  }
}
// Spawns `writers` threads that together perform exactly `writes` writes into
// a buffer of the given capacity, then checks that the head cursor has
// advanced by exactly that many tickets.
template<template<typename> class Atom>
void runWritesNeverFail(
    int capacity, int writes, int writers
) {
  using folly::test::DeterministicSchedule;

  DeterministicSchedule sched(DeterministicSchedule::uniform(0));
  LockFreeRingBuffer<int, Atom> rb(capacity);

  // shared countdown from which the workers claim their writes
  std::atomic<int32_t> writes_remaining(writes);
  std::vector<std::thread> threads(writers);

  for (int i = 0; i < writers; i++) {
    threads[i] = DeterministicSchedule::thread(
      std::bind(runReader<Atom>, std::ref(rb), std::ref(writes_remaining))
    );
  }

  for (auto& thread : threads) {
    DeterministicSchedule::join(thread);
  }

  // The cursor value is a uint64_t; convert the expected count so the
  // comparison does not mix signed and unsigned operands.
  EXPECT_EQ(static_cast<uint64_t>(writes),
            (value<int, Atom>)(rb.currentHead()));
}
// Runs the concurrent-writer check with three atomic implementations
// (DeterministicAtomic, std::atomic, EmulatedFutexAtomic), each against the
// same four capacity / write-count / writer-count combinations.
TEST(LockFreeRingBuffer, writesNeverFail) {
  using folly::test::DeterministicAtomic;
  using folly::detail::EmulatedFutexAtomic;

  struct Scenario {
    int capacity;
    int writes;
    int writers;
  };
  const Scenario scenarios[] = {
    {1, 100, 4},
    {10, 100, 4},
    {100, 1000, 8},
    {1000, 10000, 16},
  };

  for (const Scenario& s : scenarios) {
    runWritesNeverFail<DeterministicAtomic>(s.capacity, s.writes, s.writers);
  }

  for (const Scenario& s : scenarios) {
    runWritesNeverFail<std::atomic>(s.capacity, s.writes, s.writers);
  }

  for (const Scenario& s : scenarios) {
    runWritesNeverFail<EmulatedFutexAtomic>(s.capacity, s.writes, s.writers);
  }
}
// A cursor that has been lapped by the writers must fail to read (without
// touching the destination), and currentTail() must hand back cursors that
// are readable again.
TEST(LockFreeRingBuffer, readerCanDetectSkips) {
  const int capacity = 4;
  const int rounds = 4;

  LockFreeRingBuffer<int> rb(capacity);
  auto cursor = rb.currentHead();
  cursor.moveForward(1);

  // Write the values 0 .. capacity*rounds - 1, i.e. `rounds` full passes.
  const int totalWrites = capacity * rounds;
  for (int n = 0; n < totalWrites; n++) {
    int val = n;
    rb.write(val);
  }

  // The cursor still points at the second write ever made, which has been
  // overwritten several times by now.
  int result = -1;
  EXPECT_FALSE(rb.tryRead(result, cursor));
  EXPECT_FALSE(rb.waitAndTryRead(result, cursor));
  EXPECT_EQ(-1, result);

  // earliest readable write: the first value of the last pass
  cursor = rb.currentTail();
  EXPECT_TRUE(rb.tryRead(result, cursor));
  EXPECT_EQ(capacity * (rounds - 1), result);

  // latest readable write: the very last value written
  cursor = rb.currentTail(1.0);
  EXPECT_TRUE(rb.tryRead(result, cursor));
  EXPECT_EQ((capacity * rounds) - 1, result);
}
// Checks which ticket currentTail() lands on for an empty, a half-full and a
// full buffer of capacity 4.
TEST(LockFreeRingBuffer, currentTailRange) {
  const int capacity = 4;
  LockFreeRingBuffer<int> rb(capacity);

  // Workaround for template deduction failure
  auto (&cursorValue)(value<int, std::atomic>);

  // Empty buffer - everything points to 0
  EXPECT_EQ(0, cursorValue(rb.currentTail(0)));
  EXPECT_EQ(0, cursorValue(rb.currentTail(0.5)));
  EXPECT_EQ(0, cursorValue(rb.currentTail(1)));

  int payload = 5;

  // Half-full
  for (int i = 0; i < 2; i++) {
    rb.write(payload);
  }
  EXPECT_EQ(0, cursorValue(rb.currentTail(0)));
  EXPECT_EQ(1, cursorValue(rb.currentTail(1)));

  // Full
  for (int i = 0; i < 2; i++) {
    rb.write(payload);
  }
  EXPECT_EQ(0, cursorValue(rb.currentTail(0)));
  EXPECT_EQ(3, cursorValue(rb.currentTail(1)));

  // both rounding behaviours are acceptable
  const auto middle = cursorValue(rb.currentTail(0.5));
  EXPECT_TRUE(middle == 1 || middle == 2);
}
} // namespace folly
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment