Commit 49f1f482 authored by Maged Michael's avatar Maged Michael Committed by Facebook Github Bot

UnboundedQueue: Use synchronization/hazptr and hazptr_obj_base_linked

Summary:
- Use the hazptr library under folly/synchronization.
  - Use hazptr_obj_base_linked to manage counted links.

Reviewed By: djwatson

Differential Revision: D7674822

fbshipit-source-id: a87ece5dd1c7670677486af59988af8518abaf92
parent 8d0571ca
...@@ -25,8 +25,8 @@ ...@@ -25,8 +25,8 @@
#include <folly/ConstexprMath.h> #include <folly/ConstexprMath.h>
#include <folly/Optional.h> #include <folly/Optional.h>
#include <folly/concurrency/CacheLocality.h> #include <folly/concurrency/CacheLocality.h>
#include <folly/experimental/hazptr/hazptr.h>
#include <folly/lang/Align.h> #include <folly/lang/Align.h>
#include <folly/synchronization/Hazptr.h>
#include <folly/synchronization/SaturatingSemaphore.h> #include <folly/synchronization/SaturatingSemaphore.h>
namespace folly { namespace folly {
...@@ -360,7 +360,7 @@ class UnboundedQueue { ...@@ -360,7 +360,7 @@ class UnboundedQueue {
} else { } else {
// Using hazptr_holder instead of hazptr_local because it is // Using hazptr_holder instead of hazptr_local because it is
// possible that the T ctor happens to use hazard pointers. // possible that the T ctor happens to use hazard pointers.
folly::hazptr::hazptr_holder hptr; hazptr_holder<Atom> hptr;
Segment* s = hptr.get_protected(p_.tail); Segment* s = hptr.get_protected(p_.tail);
enqueueCommon(s, std::forward<Arg>(arg)); enqueueCommon(s, std::forward<Arg>(arg));
} }
...@@ -395,7 +395,7 @@ class UnboundedQueue { ...@@ -395,7 +395,7 @@ class UnboundedQueue {
// Using hazptr_holder instead of hazptr_local because it is // Using hazptr_holder instead of hazptr_local because it is
// possible to call the T dtor and it may happen to use hazard // possible to call the T dtor and it may happen to use hazard
// pointers. // pointers.
folly::hazptr::hazptr_holder hptr; hazptr_holder<Atom> hptr;
Segment* s = hptr.get_protected(c_.head); Segment* s = hptr.get_protected(c_.head);
dequeueCommon(s, item); dequeueCommon(s, item);
} }
...@@ -425,7 +425,7 @@ class UnboundedQueue { ...@@ -425,7 +425,7 @@ class UnboundedQueue {
} else { } else {
// Using hazptr_holder instead of hazptr_local because it is // Using hazptr_holder instead of hazptr_local because it is
// possible to call ~T() and it may happen to use hazard pointers. // possible to call ~T() and it may happen to use hazard pointers.
folly::hazptr::hazptr_holder hptr; hazptr_holder<Atom> hptr;
Segment* s = hptr.get_protected(c_.head); Segment* s = hptr.get_protected(c_.head);
return tryDequeueUntilMC(s, deadline); return tryDequeueUntilMC(s, deadline);
} }
...@@ -548,7 +548,7 @@ class UnboundedQueue { ...@@ -548,7 +548,7 @@ class UnboundedQueue {
void allocNextSegment(Segment* s, const Ticket t) { void allocNextSegment(Segment* s, const Ticket t) {
Segment* next = new Segment(t); Segment* next = new Segment(t);
if (!SPSC) { if (!SPSC) {
next->acquire_ref_safe(); // hazptr next->acquire_ref_safe(); // defined in hazptr_obj_base_linked
} }
DCHECK(s->nextSegment() == nullptr); DCHECK(s->nextSegment() == nullptr);
s->setNextSegment(next); s->setNextSegment(next);
...@@ -592,7 +592,7 @@ class UnboundedQueue { ...@@ -592,7 +592,7 @@ class UnboundedQueue {
if (SPSC) { if (SPSC) {
delete s; delete s;
} else { } else {
s->retire(); // hazptr s->retire(); // defined in hazptr_obj_base_linked
} }
} }
...@@ -722,30 +722,13 @@ class UnboundedQueue { ...@@ -722,30 +722,13 @@ class UnboundedQueue {
/** /**
* Segment * Segment
*/ */
class Segment : public folly::hazptr::hazptr_obj_base_refcounted<Segment> { class Segment : public hazptr_obj_base_linked<Segment, Atom> {
Atom<Segment*> next_; Atom<Segment*> next_{nullptr};
const Ticket min_; const Ticket min_;
bool marked_; // used for iterative deletion
alignas(Align) Entry b_[SegmentSize]; alignas(Align) Entry b_[SegmentSize];
public: public:
explicit Segment(const Ticket t) explicit Segment(const Ticket t) noexcept : min_(t) {}
: next_(nullptr), min_(t), marked_(false) {}
~Segment() {
if (!SPSC && !marked_) {
Segment* next = nextSegment();
while (next) {
if (!next->release_ref()) { // hazptr
return;
}
Segment* s = next;
next = s->nextSegment();
s->marked_ = true;
delete s;
}
}
}
Segment* nextSegment() const noexcept { Segment* nextSegment() const noexcept {
return next_.load(std::memory_order_acquire); return next_.load(std::memory_order_acquire);
...@@ -763,6 +746,16 @@ class UnboundedQueue { ...@@ -763,6 +746,16 @@ class UnboundedQueue {
FOLLY_ALWAYS_INLINE Entry& entry(size_t index) noexcept { FOLLY_ALWAYS_INLINE Entry& entry(size_t index) noexcept {
return b_[index]; return b_[index];
} }
template <typename S>
void push_links(bool m, S& s) {
if (m == false) { // next_ is immutable
auto p = nextSegment();
if (p) {
s.push(p);
}
}
}
}; // Segment }; // Segment
}; // UnboundedQueue }; // UnboundedQueue
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment