Commit 8a14ddfa authored by Dan Melnic's avatar Dan Melnic Committed by Facebook Github Bot

Add folly::Optional<T> UnboundedQueue::try_dequeue() method

Summary: Add folly::Optional<T> UnboundedQueue::try_dequeue() method

Reviewed By: magedm

Differential Revision: D7164049

fbshipit-source-id: 659de2d26067bb1a5bdea50f4cd7711e6955ba61
parent 9b55b900
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include <glog/logging.h> #include <glog/logging.h>
#include <folly/ConstexprMath.h> #include <folly/ConstexprMath.h>
#include <folly/Optional.h>
#include <folly/concurrency/CacheLocality.h> #include <folly/concurrency/CacheLocality.h>
#include <folly/experimental/hazptr/hazptr.h> #include <folly/experimental/hazptr/hazptr.h>
#include <folly/lang/Align.h> #include <folly/lang/Align.h>
...@@ -266,7 +267,16 @@ class UnboundedQueue { ...@@ -266,7 +267,16 @@ class UnboundedQueue {
/** try_dequeue */ /** try_dequeue */
FOLLY_ALWAYS_INLINE bool try_dequeue(T& item) noexcept { FOLLY_ALWAYS_INLINE bool try_dequeue(T& item) noexcept {
return tryDequeueUntil(item, std::chrono::steady_clock::time_point::min()); auto o = try_dequeue();
if (LIKELY(o.has_value())) {
item = std::move(*o);
return true;
}
return false;
}
FOLLY_ALWAYS_INLINE folly::Optional<T> try_dequeue() noexcept {
return tryDequeueUntil(std::chrono::steady_clock::time_point::min());
} }
/** try_dequeue_until */ /** try_dequeue_until */
...@@ -274,7 +284,20 @@ class UnboundedQueue { ...@@ -274,7 +284,20 @@ class UnboundedQueue {
FOLLY_ALWAYS_INLINE bool try_dequeue_until( FOLLY_ALWAYS_INLINE bool try_dequeue_until(
T& item, T& item,
const std::chrono::time_point<Clock, Duration>& deadline) noexcept { const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
return tryDequeueUntil(item, deadline); folly::Optional<T> o = try_dequeue_until(deadline);
if (LIKELY(o.has_value())) {
item = std::move(*o);
return true;
}
return false;
}
template <typename Clock, typename Duration>
FOLLY_ALWAYS_INLINE folly::Optional<T> try_dequeue_until(
const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
return tryDequeueUntil(deadline);
} }
/** try_dequeue_for */ /** try_dequeue_for */
...@@ -282,10 +305,24 @@ class UnboundedQueue { ...@@ -282,10 +305,24 @@ class UnboundedQueue {
FOLLY_ALWAYS_INLINE bool try_dequeue_for( FOLLY_ALWAYS_INLINE bool try_dequeue_for(
T& item, T& item,
const std::chrono::duration<Rep, Period>& duration) noexcept { const std::chrono::duration<Rep, Period>& duration) noexcept {
if (LIKELY(try_dequeue(item))) { folly::Optional<T> o = try_dequeue_for(duration);
if (LIKELY(o.has_value())) {
item = std::move(*o);
return true; return true;
} }
return tryDequeueUntil(item, std::chrono::steady_clock::now() + duration);
return false;
}
template <typename Rep, typename Period>
FOLLY_ALWAYS_INLINE folly::Optional<T> try_dequeue_for(
const std::chrono::duration<Rep, Period>& duration) noexcept {
folly::Optional<T> o = try_dequeue();
if (LIKELY(o.has_value())) {
return o;
}
return tryDequeueUntil(std::chrono::steady_clock::now() + duration);
} }
/** size */ /** size */
...@@ -369,26 +406,24 @@ class UnboundedQueue { ...@@ -369,26 +406,24 @@ class UnboundedQueue {
/** tryDequeueUntil */ /** tryDequeueUntil */
template <typename Clock, typename Duration> template <typename Clock, typename Duration>
FOLLY_ALWAYS_INLINE bool tryDequeueUntil( FOLLY_ALWAYS_INLINE folly::Optional<T> tryDequeueUntil(
T& item,
const std::chrono::time_point<Clock, Duration>& deadline) noexcept { const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
if (SingleConsumer) { if (SingleConsumer) {
Segment* s = head(); Segment* s = head();
return tryDequeueUntilSC(s, item, deadline); return tryDequeueUntilSC(s, deadline);
} else { } else {
// Using hazptr_holder instead of hazptr_local because it is // Using hazptr_holder instead of hazptr_local because it is
// possible to call ~T() and it may happen to use hazard pointers. // possible to call ~T() and it may happen to use hazard pointers.
folly::hazptr::hazptr_holder hptr; folly::hazptr::hazptr_holder hptr;
Segment* s = hptr.get_protected(c_.head); Segment* s = hptr.get_protected(c_.head);
return tryDequeueUntilMC(s, item, deadline); return tryDequeueUntilMC(s, deadline);
} }
} }
/** tryDequeueUntilSC */ /** tryDequeueUntilSC */
template <typename Clock, typename Duration> template <typename Clock, typename Duration>
FOLLY_ALWAYS_INLINE bool tryDequeueUntilSC( FOLLY_ALWAYS_INLINE folly::Optional<T> tryDequeueUntilSC(
Segment* s, Segment* s,
T& item,
const std::chrono::time_point<Clock, Duration>& deadline) noexcept { const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
Ticket t = consumerTicket(); Ticket t = consumerTicket();
DCHECK_GE(t, s->minTicket()); DCHECK_GE(t, s->minTicket());
...@@ -396,45 +431,44 @@ class UnboundedQueue { ...@@ -396,45 +431,44 @@ class UnboundedQueue {
size_t idx = index(t); size_t idx = index(t);
Entry& e = s->entry(idx); Entry& e = s->entry(idx);
if (UNLIKELY(!tryDequeueWaitElem(e, t, deadline))) { if (UNLIKELY(!tryDequeueWaitElem(e, t, deadline))) {
return false; return folly::Optional<T>();
} }
setConsumerTicket(t + 1); setConsumerTicket(t + 1);
e.takeItem(item); auto ret = e.takeItem();
if (responsibleForAdvance(t)) { if (responsibleForAdvance(t)) {
advanceHead(s); advanceHead(s);
} }
return true; return ret;
} }
/** tryDequeueUntilMC */ /** tryDequeueUntilMC */
template <typename Clock, typename Duration> template <typename Clock, typename Duration>
FOLLY_ALWAYS_INLINE bool tryDequeueUntilMC( FOLLY_ALWAYS_INLINE folly::Optional<T> tryDequeueUntilMC(
Segment* s, Segment* s,
T& item,
const std::chrono::time_point<Clock, Duration>& deadline) noexcept { const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
while (true) { while (true) {
Ticket t = consumerTicket(); Ticket t = consumerTicket();
if (UNLIKELY(t >= (s->minTicket() + SegmentSize))) { if (UNLIKELY(t >= (s->minTicket() + SegmentSize))) {
s = tryGetNextSegmentUntil(s, deadline); s = tryGetNextSegmentUntil(s, deadline);
if (s == nullptr) { if (s == nullptr) {
return false; // timed out return folly::Optional<T>(); // timed out
} }
continue; continue;
} }
size_t idx = index(t); size_t idx = index(t);
Entry& e = s->entry(idx); Entry& e = s->entry(idx);
if (UNLIKELY(!tryDequeueWaitElem(e, t, deadline))) { if (UNLIKELY(!tryDequeueWaitElem(e, t, deadline))) {
return false; return folly::Optional<T>();
} }
if (!c_.ticket.compare_exchange_weak( if (!c_.ticket.compare_exchange_weak(
t, t + 1, std::memory_order_acq_rel, std::memory_order_acquire)) { t, t + 1, std::memory_order_acq_rel, std::memory_order_acquire)) {
continue; continue;
} }
e.takeItem(item); auto ret = e.takeItem();
if (responsibleForAdvance(t)) { if (responsibleForAdvance(t)) {
advanceHead(s); advanceHead(s);
} }
return true; return ret;
} }
} }
...@@ -643,6 +677,11 @@ class UnboundedQueue { ...@@ -643,6 +677,11 @@ class UnboundedQueue {
getItem(item); getItem(item);
} }
FOLLY_ALWAYS_INLINE folly::Optional<T> takeItem() noexcept {
flag_.wait();
return getItem();
}
template <typename Clock, typename Duration> template <typename Clock, typename Duration>
FOLLY_ALWAYS_INLINE bool tryWaitUntil( FOLLY_ALWAYS_INLINE bool tryWaitUntil(
const std::chrono::time_point<Clock, Duration>& deadline) noexcept { const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
...@@ -658,6 +697,13 @@ class UnboundedQueue { ...@@ -658,6 +697,13 @@ class UnboundedQueue {
destroyItem(); destroyItem();
} }
FOLLY_ALWAYS_INLINE folly::Optional<T> getItem() noexcept {
folly::Optional<T> ret = std::move(*(itemPtr()));
destroyItem();
return ret;
}
FOLLY_ALWAYS_INLINE T* itemPtr() noexcept { FOLLY_ALWAYS_INLINE T* itemPtr() noexcept {
return static_cast<T*>(static_cast<void*>(&item_)); return static_cast<T*>(static_cast<void*>(&item_));
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment