Commit 5a500c70 authored by Dave Watson's avatar Dave Watson Committed by Facebook Github Bot

Add timed_wait

Summary:
Add try_wait_for and try_wait_until.

Algorithm -

We could use a doubly-linked list, but this would double the number of contended
CAS operations for push/post.  Instead, assume timeouts are infrequent, and on
timeout just walk the list from the head to find and remove the timed-out node.

A lock bit is added to the head; it is taken only on timeout, and while it is
held, concurrent pushes and pops yield and retry.  We assume timeouts are
infrequent.

If a concurrent post() removes the node before the timeout can remove it,
we must wait for the corresponding post() (but it is likely to arrive soon),
instead of timing out.

Reviewed By: magedm

Differential Revision: D7167894

fbshipit-source-id: ea5242098d9ccd286a72fade6292223e95c44a81
parent c9dd3e5e
...@@ -26,14 +26,15 @@ ...@@ -26,14 +26,15 @@
#include <folly/CachelinePadded.h> #include <folly/CachelinePadded.h>
#include <folly/IndexedMemPool.h> #include <folly/IndexedMemPool.h>
#include <folly/Likely.h> #include <folly/Likely.h>
#include <folly/lang/SafeAssert.h>
#include <folly/synchronization/AtomicStruct.h> #include <folly/synchronization/AtomicStruct.h>
#include <folly/synchronization/Baton.h> #include <folly/synchronization/SaturatingSemaphore.h>
namespace folly { namespace folly {
template < template <
template <typename> class Atom = std::atomic, template <typename> class Atom = std::atomic,
class BatonType = Baton<true, Atom>> class BatonType = SaturatingSemaphore<true, Atom>>
struct LifoSemImpl; struct LifoSemImpl;
/// LifoSem is a semaphore that wakes its waiters in a manner intended to /// LifoSem is a semaphore that wakes its waiters in a manner intended to
...@@ -71,6 +72,11 @@ struct LifoSemImpl; ...@@ -71,6 +72,11 @@ struct LifoSemImpl;
/// ///
/// -- wait() -- waits until tryWait() can succeed. Compare to sem_wait(). /// -- wait() -- waits until tryWait() can succeed. Compare to sem_wait().
/// ///
/// -- timed wait variants (try_wait_for(), try_wait_until()) -- wait
/// until the timeout expires. Note that when these time out, the
/// current implementation takes a lock, blocking concurrent pushes
/// and pops. (If timed wait calls are substantial, consider
/// re-working this code to be lock-free).
///
/// LifoSem also has the notion of a shutdown state, in which any calls /// LifoSem also has the notion of a shutdown state, in which any calls
/// that would block (or are already blocked) throw ShutdownSemError. /// that would block (or are already blocked) throw ShutdownSemError.
/// Note the difference between a call to wait() and a call to wait() /// Note the difference between a call to wait() and a call to wait()
...@@ -81,11 +87,10 @@ struct LifoSemImpl; ...@@ -81,11 +87,10 @@ struct LifoSemImpl;
/// you can just check isShutdown() yourself (preferably wrapped in /// you can just check isShutdown() yourself (preferably wrapped in
/// an UNLIKELY). This fast-stop behavior is easy to add, but difficult /// an UNLIKELY). This fast-stop behavior is easy to add, but difficult
/// to remove if you want the draining behavior, which is why we have /// to remove if you want the draining behavior, which is why we have
/// chosen the former. Since wait() is the only method that can block, /// chosen the former.
/// it is the only one that is affected by the shutdown state.
/// ///
/// All LifoSem operations operations except valueGuess() are guaranteed /// All LifoSem operations except valueGuess() are guaranteed to be
/// to be linearizable. /// linearizable.
typedef LifoSemImpl<> LifoSem; typedef LifoSemImpl<> LifoSem;
...@@ -212,11 +217,13 @@ class LifoSemHead { ...@@ -212,11 +217,13 @@ class LifoSemHead {
enum { enum {
IsNodeIdxShift = 32, IsNodeIdxShift = 32,
IsShutdownShift = 33, IsShutdownShift = 33,
SeqShift = 34, IsLockedShift = 34,
SeqShift = 35,
}; };
enum : uint64_t { enum : uint64_t {
IsNodeIdxMask = uint64_t(1) << IsNodeIdxShift, IsNodeIdxMask = uint64_t(1) << IsNodeIdxShift,
IsShutdownMask = uint64_t(1) << IsShutdownShift, IsShutdownMask = uint64_t(1) << IsShutdownShift,
IsLockedMask = uint64_t(1) << IsLockedShift,
SeqIncr = uint64_t(1) << SeqShift, SeqIncr = uint64_t(1) << SeqShift,
SeqMask = ~(SeqIncr - 1), SeqMask = ~(SeqIncr - 1),
}; };
...@@ -242,6 +249,9 @@ class LifoSemHead { ...@@ -242,6 +249,9 @@ class LifoSemHead {
inline constexpr bool isShutdown() const { inline constexpr bool isShutdown() const {
return (bits & IsShutdownMask) != 0; return (bits & IsShutdownMask) != 0;
} }
/// Reports whether the lock bit (IsLockedMask) is set in this head value.
inline constexpr bool isLocked() const {
  return 0 != (bits & IsLockedMask);
}
inline constexpr uint32_t seq() const { inline constexpr uint32_t seq() const {
return uint32_t(bits >> SeqShift); return uint32_t(bits >> SeqShift);
} }
...@@ -257,6 +267,7 @@ class LifoSemHead { ...@@ -257,6 +267,7 @@ class LifoSemHead {
/// Returns the LifoSemHead that results from popping a waiter node, /// Returns the LifoSemHead that results from popping a waiter node,
/// given the current waiter node's next ptr /// given the current waiter node's next ptr
inline LifoSemHead withPop(uint32_t idxNext) const { inline LifoSemHead withPop(uint32_t idxNext) const {
assert(!isLocked());
assert(isNodeIdx()); assert(isNodeIdx());
if (idxNext == 0) { if (idxNext == 0) {
// no isNodeIdx bit or data bits. Wraparound of seq bits is okay // no isNodeIdx bit or data bits. Wraparound of seq bits is okay
...@@ -272,6 +283,7 @@ class LifoSemHead { ...@@ -272,6 +283,7 @@ class LifoSemHead {
/// Returns the LifoSemHead that results from pushing a new waiter node /// Returns the LifoSemHead that results from pushing a new waiter node
inline LifoSemHead withPush(uint32_t _idx) const { inline LifoSemHead withPush(uint32_t _idx) const {
assert(!isLocked());
assert(isNodeIdx() || value() == 0); assert(isNodeIdx() || value() == 0);
assert(!isShutdown()); assert(!isShutdown());
assert(_idx != 0); assert(_idx != 0);
...@@ -281,6 +293,7 @@ class LifoSemHead { ...@@ -281,6 +293,7 @@ class LifoSemHead {
/// Returns the LifoSemHead with value increased by delta, with /// Returns the LifoSemHead with value increased by delta, with
/// saturation if the maximum value is reached /// saturation if the maximum value is reached
inline LifoSemHead withValueIncr(uint32_t delta) const { inline LifoSemHead withValueIncr(uint32_t delta) const {
assert(!isLocked());
assert(!isNodeIdx()); assert(!isNodeIdx());
auto rv = LifoSemHead{ bits + SeqIncr + delta }; auto rv = LifoSemHead{ bits + SeqIncr + delta };
if (UNLIKELY(rv.isNodeIdx())) { if (UNLIKELY(rv.isNodeIdx())) {
...@@ -292,6 +305,7 @@ class LifoSemHead { ...@@ -292,6 +305,7 @@ class LifoSemHead {
/// Returns the LifoSemHead that results from decrementing the value /// Returns the LifoSemHead that results from decrementing the value
inline LifoSemHead withValueDecr(uint32_t delta) const { inline LifoSemHead withValueDecr(uint32_t delta) const {
assert(!isLocked());
assert(delta > 0 && delta <= value()); assert(delta > 0 && delta <= value());
return LifoSemHead{ bits + SeqIncr - delta }; return LifoSemHead{ bits + SeqIncr - delta };
} }
...@@ -302,6 +316,20 @@ class LifoSemHead { ...@@ -302,6 +316,20 @@ class LifoSemHead {
return LifoSemHead{ bits | IsShutdownMask }; return LifoSemHead{ bits | IsShutdownMask };
} }
// Produces a copy of this head whose lock bit is set; every other bit is
// carried over untouched. Must only be called on a head that is not
// already locked.
inline LifoSemHead withLock() const {
  assert(!isLocked());
  auto const lockedBits = bits | IsLockedMask;
  return LifoSemHead{lockedBits};
}
// Produces the head that results from releasing the lock bit. Must only be
// called on a locked head. idxNext is the node index to hand to withPop()
// once the lock bit has been cleared.
inline LifoSemHead withoutLock(uint32_t idxNext) const {
  assert(isLocked());
  // Clear the lock bit first, since withPop() asserts the head is unlocked.
  LifoSemHead const unlocked{bits & ~IsLockedMask};
  // The unlock is routed through withPop() because the list head may have
  // been changed while the lock was held, so it must be treated as a pop.
  return unlocked.withPop(idxNext);
}
inline constexpr bool operator== (const LifoSemHead& rhs) const { inline constexpr bool operator== (const LifoSemHead& rhs) const {
return bits == rhs.bits; return bits == rhs.bits;
} }
...@@ -377,6 +405,11 @@ struct LifoSemBase { ...@@ -377,6 +405,11 @@ struct LifoSemBase {
// now wake up any waiters // now wake up any waiters
while (h.isNodeIdx()) { while (h.isNodeIdx()) {
if (h.isLocked()) {
std::this_thread::yield();
h = head_->load(std::memory_order_acquire);
continue;
}
auto& node = idxToNode(h.idx()); auto& node = idxToNode(h.idx());
auto repl = h.withPop(node.next); auto repl = h.withPop(node.next);
if (head_->compare_exchange_strong(h, repl)) { if (head_->compare_exchange_strong(h, repl)) {
...@@ -424,10 +457,23 @@ struct LifoSemBase { ...@@ -424,10 +457,23 @@ struct LifoSemBase {
/// Note that wait() doesn't throw during shutdown if tryWait() would /// Note that wait() doesn't throw during shutdown if tryWait() would
/// return true /// return true
void wait() { void wait() {
// A blocking wait is expressed as a timed wait whose deadline is the
// largest representable steady_clock time point.
auto const deadline = std::chrono::steady_clock::time_point::max();
auto res = try_wait_until(deadline);
// try_wait_until() only returns false after its deadline has expired, which
// is not expected for the maximal deadline used here.
FOLLY_SAFE_DCHECK(res, "infinity time has passed");
}
/// Relative-timeout form of try_wait_until(): the deadline is computed as
/// `timeout` past the current std::chrono::steady_clock time, and the result
/// of try_wait_until() for that deadline is returned unchanged.
template <typename Rep, typename Period>
bool try_wait_for(const std::chrono::duration<Rep, Period>& timeout) {
  auto const deadline = timeout + std::chrono::steady_clock::now();
  return try_wait_until(deadline);
}
template <typename Clock, typename Duration>
bool try_wait_until(
const std::chrono::time_point<Clock, Duration>& deadline) {
// early check isn't required for correctness, but is an important // early check isn't required for correctness, but is an important
// perf win if we can avoid allocating and deallocating a node // perf win if we can avoid allocating and deallocating a node
if (tryWait()) { if (tryWait()) {
return; return true;
} }
// allocateNode() won't compile unless Handoff has a default // allocateNode() won't compile unless Handoff has a default
...@@ -441,10 +487,23 @@ struct LifoSemBase { ...@@ -441,10 +487,23 @@ struct LifoSemBase {
} }
if (rv == WaitResult::PUSH) { if (rv == WaitResult::PUSH) {
node->handoff().wait(); if (!node->handoff().try_wait_until(deadline)) {
if (tryRemoveNode(*node)) {
return false;
} else {
// We could not remove our node. Return to waiting.
//
// This only happens if we lose a removal race with post(),
// so we are not likely to wait long. This is only
// necessary to ensure we don't return node's memory back to
// IndexedMemPool before post() has had a chance to post to
// handoff(). In a stronger memory reclamation scheme, such
// as hazptr or rcu, this would not be necessary.
node->handoff().wait();
}
}
if (UNLIKELY(node->isShutdownNotice())) { if (UNLIKELY(node->isShutdownNotice())) {
// this wait() didn't consume a value, it was triggered by shutdown // this wait() didn't consume a value, it was triggered by shutdown
assert(isShutdown());
throw ShutdownSemError( throw ShutdownSemError(
"blocking wait() interrupted by semaphore shutdown"); "blocking wait() interrupted by semaphore shutdown");
} }
...@@ -454,6 +513,7 @@ struct LifoSemBase { ...@@ -454,6 +513,7 @@ struct LifoSemBase {
// recycle the node now // recycle the node now
} }
// else node wasn't pushed, so it is safe to recycle // else node wasn't pushed, so it is safe to recycle
return true;
} }
/// Returns a guess at the current value, designed for debugging. /// Returns a guess at the current value, designed for debugging.
...@@ -522,6 +582,57 @@ struct LifoSemBase { ...@@ -522,6 +582,57 @@ struct LifoSemBase {
return LifoSemRawNode<Atom>::pool().locateElem(&node); return LifoSemRawNode<Atom>::pool().locateElem(&node);
} }
// Locks the list head (blocking concurrent pushes and pops)
// and attempts to remove this node. Returns true if node was
// found and removed, false if not found.
//
// The lock is the IsLocked bit of head_: it is acquired with a CAS that
// sets the bit and released by storing a head value with the bit cleared.
// If the head does not hold a node index, false is returned without the
// lock ever being taken.
bool tryRemoveNode(const LifoSemNode<Handoff, Atom>& removenode) {
auto removeidx = nodeToIdx(removenode);
auto head = head_->load(std::memory_order_acquire);
// Try to lock the head.
while (true) {
if (head.isLocked()) {
// Another thread holds the lock bit; yield and re-read until it clears.
std::this_thread::yield();
head = head_->load(std::memory_order_acquire);
continue;
}
if (!head.isNodeIdx()) {
// The head does not reference a waiter node, so there is no list to
// search and removenode cannot be on it.
return false;
}
// On failure the loop runs again and re-evaluates `head` (as with
// std::atomic, compare_exchange_weak is expected to refresh it).
if (head_->compare_exchange_weak(
head,
head.withLock(),
std::memory_order_acquire,
std::memory_order_relaxed)) {
break;
}
}
// Update local var to what head_ is, for better assert() checking.
head = head.withLock();
bool result = false;
auto idx = head.idx();
if (idx == removeidx) {
// pop from head. Head seqno is updated.
// The same store also releases the lock bit.
head_->store(
head.withoutLock(removenode.next), std::memory_order_release);
return true;
}
// Walk the list from the head, keeping `node` one step behind `idx` so
// that the predecessor's next link can be rewritten.
auto node = &idxToNode(idx);
idx = node->next;
while (idx) {
if (idx == removeidx) {
// Pop from mid-list.
// The predecessor is relinked past removenode while the lock bit is held.
node->next = removenode.next;
result = true;
break;
}
node = &idxToNode(idx);
idx = node->next;
}
// Unlock and return result
// The head node itself was not removed, so its own index is passed back.
head_->store(head.withoutLock(head.idx()), std::memory_order_release);
return result;
}
/// Either increments by n and returns 0, or pops a node and returns it. /// Either increments by n and returns 0, or pops a node and returns it.
/// If n + the stripe's value overflows, then the stripe's value /// If n + the stripe's value overflows, then the stripe's value
/// saturates silently at 2^32-1 /// saturates silently at 2^32-1
...@@ -530,6 +641,10 @@ struct LifoSemBase { ...@@ -530,6 +641,10 @@ struct LifoSemBase {
assert(n > 0); assert(n > 0);
auto head = head_->load(std::memory_order_acquire); auto head = head_->load(std::memory_order_acquire);
if (head.isLocked()) {
std::this_thread::yield();
continue;
}
if (head.isNodeIdx()) { if (head.isNodeIdx()) {
auto& node = idxToNode(head.idx()); auto& node = idxToNode(head.idx());
if (head_->compare_exchange_strong(head, head.withPop(node.next))) { if (head_->compare_exchange_strong(head, head.withPop(node.next))) {
...@@ -560,6 +675,11 @@ struct LifoSemBase { ...@@ -560,6 +675,11 @@ struct LifoSemBase {
while (true) { while (true) {
auto head = head_->load(std::memory_order_acquire); auto head = head_->load(std::memory_order_acquire);
if (head.isLocked()) {
std::this_thread::yield();
continue;
}
if (!head.isNodeIdx() && head.value() > 0) { if (!head.isNodeIdx() && head.value() > 0) {
// decr // decr
auto delta = std::min(n, head.value()); auto delta = std::min(n, head.value());
......
...@@ -286,6 +286,52 @@ TEST(LifoSem, multi_try_wait) { ...@@ -286,6 +286,52 @@ TEST(LifoSem, multi_try_wait) {
ASSERT_EQ(NPOSTS, consumed); ASSERT_EQ(NPOSTS, consumed);
} }
// Exercises try_wait_for() against concurrent post() calls under the DSched
// scheduler, seeded with a logged random seed.
//
// Each of the 10 passes creates a fresh DLifoSem, starts 20 waiter threads
// (10 one-millisecond timed waits each) and 20 poster threads (10 posts
// each), and joins them all. Passes 6 through 9 additionally call shutdown()
// after the threads have been started.
TEST(LifoSem, timeout) {
long seed = folly::randomNumberSeed() % 10000;
LOG(INFO) << "seed=" << seed;
DSched sched(DSched::uniform(seed));
// Net count of posts minus successful waits. Declared outside the pass
// loop, so it accumulates across passes and is never reset.
DeterministicAtomic<uint32_t> handoffs{0};
for (int pass = 0; pass < 10; ++pass) {
DLifoSem a;
// Waiter threads.
std::vector<std::thread> threads;
while (threads.size() < 20) {
threads.push_back(DSched::thread([&] {
for (int i = 0; i < 10; i++) {
try {
// A true result means the wait succeeded before the timeout.
if (a.try_wait_for(std::chrono::milliseconds(1))) {
handoffs--;
}
} catch (ShutdownSemError&) {
// expected
EXPECT_TRUE(a.isShutdown());
}
}
}));
}
// Poster threads.
std::vector<std::thread> threads2;
while (threads2.size() < 20) {
threads2.push_back(DSched::thread([&] {
for (int i = 0; i < 10; i++) {
a.post();
handoffs++;
}
}));
}
// Later passes also shut the semaphore down after the threads are started.
if (pass > 5) {
a.shutdown();
}
for (auto& thr : threads) {
DSched::join(thr);
}
for (auto& thr : threads2) {
DSched::join(thr);
}
// At least one timeout must occur.
// Checked as: the accumulated number of posts exceeds the accumulated
// number of successful waits.
EXPECT_GT(handoffs.load(), 0);
}
}
BENCHMARK(lifo_sem_pingpong, iters) { BENCHMARK(lifo_sem_pingpong, iters) {
LifoSem a; LifoSem a;
LifoSem b; LifoSem b;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment