Commit f96d7622 authored by Maged Michael's avatar Maged Michael Committed by Facebook Github Bot

UnboundedQueue: Add cleanup of remaining items at destruction.

Summary:
- Add cleanup of remaining items, if any, at destruction.
- Add test.

Reviewed By: davidtgoldblatt

Differential Revision: D8860966

fbshipit-source-id: e3ab2e6ff31e08d91aa20c8c058471823c722a38
parent c5f46707
...@@ -251,11 +251,8 @@ class UnboundedQueue { ...@@ -251,11 +251,8 @@ class UnboundedQueue {
/** destructor */ /** destructor */
~UnboundedQueue() { ~UnboundedQueue() {
Segment* next; cleanUpRemainingItems();
for (Segment* s = head(); s; s = next) { reclaimRemainingSegments();
next = s->nextSegment();
reclaimSegment(s);
}
} }
/** enqueue */ /** enqueue */
...@@ -635,6 +632,34 @@ class UnboundedQueue { ...@@ -635,6 +632,34 @@ class UnboundedQueue {
} }
} }
/** cleanUpRemainingItems */
void cleanUpRemainingItems() {
auto end = producerTicket();
auto s = head();
for (auto t = consumerTicket(); t < end; ++t) {
if (t >= s->minTicket() + SegmentSize) {
s = s->nextSegment();
}
DCHECK_LT(t, (s->minTicket() + SegmentSize));
auto idx = index(t);
auto& e = s->entry(idx);
e.destroyItem();
}
}
/** reclaimRemainingSegments */
void reclaimRemainingSegments() {
auto h = head();
auto s = h->nextSegment();
h->setNextSegment(nullptr);
reclaimSegment(h);
while (s) {
auto next = s->nextSegment();
delete s;
s = next;
}
}
FOLLY_ALWAYS_INLINE size_t index(Ticket t) const noexcept { FOLLY_ALWAYS_INLINE size_t index(Ticket t) const noexcept {
return (t * Stride) & (SegmentSize - 1); return (t * Stride) & (SegmentSize - 1);
} }
...@@ -751,6 +776,10 @@ class UnboundedQueue { ...@@ -751,6 +776,10 @@ class UnboundedQueue {
return flag_.try_wait_until(deadline, opt); return flag_.try_wait_until(deadline, opt);
} }
/// Runs the stored item's destructor in place. The entry's raw
/// storage is not released; only the T object is ended.
FOLLY_ALWAYS_INLINE void destroyItem() noexcept {
  itemPtr()->~T();
}
private: private:
FOLLY_ALWAYS_INLINE void getItem(T& item) noexcept { FOLLY_ALWAYS_INLINE void getItem(T& item) noexcept {
item = std::move(*(itemPtr())); item = std::move(*(itemPtr()));
...@@ -766,10 +795,6 @@ class UnboundedQueue { ...@@ -766,10 +795,6 @@ class UnboundedQueue {
FOLLY_ALWAYS_INLINE T* itemPtr() noexcept { FOLLY_ALWAYS_INLINE T* itemPtr() noexcept {
return static_cast<T*>(static_cast<void*>(&item_)); return static_cast<T*>(static_cast<void*>(&item_));
} }
FOLLY_ALWAYS_INLINE void destroyItem() noexcept {
itemPtr()->~T();
}
}; // Entry }; // Entry
/** /**
...@@ -787,6 +812,10 @@ class UnboundedQueue { ...@@ -787,6 +812,10 @@ class UnboundedQueue {
return next_.load(std::memory_order_acquire); return next_.load(std::memory_order_acquire);
} }
/// Unconditionally overwrites the next-segment pointer (contrast with
/// casNextSegment, which only installs over nullptr).
/// NOTE(review): the store is memory_order_relaxed, so this is safe
/// only when no other thread can observe the queue (e.g. from the
/// destructor path) -- confirm all callers satisfy that.
void setNextSegment(Segment* next) {
  next_.store(next, std::memory_order_relaxed);
}
bool casNextSegment(Segment* next) noexcept { bool casNextSegment(Segment* next) noexcept {
Segment* expected = nullptr; Segment* expected = nullptr;
return next_.compare_exchange_strong( return next_.compare_exchange_strong(
......
...@@ -127,6 +127,33 @@ TEST(UnboundedQueue, peek) { ...@@ -127,6 +127,33 @@ TEST(UnboundedQueue, peek) {
peek_test<UMPSC, true>(); peek_test<UMPSC, true>();
} }
/**
 * Verifies that items still sitting in the queue when it is destroyed
 * have their destructors run: enqueue `num` Foo objects, never dequeue
 * them, and check that destroying the queue bumped `count` once per
 * live Foo.
 */
TEST(UnboundedQueue, cleanup_on_destruction) {
  // Move-only counter token: a live (non-moved-from) Foo increments
  // *p_ exactly once when destroyed.
  struct Foo {
    int* p_{nullptr};
    explicit Foo(int* p) : p_(p) {}
    // Copying would double-count destructions; make Foo move-only.
    Foo(const Foo&) = delete;
    Foo& operator=(const Foo&) = delete;
    Foo(Foo&& o) noexcept : p_(std::exchange(o.p_, nullptr)) {}
    ~Foo() {
      if (p_) {
        ++(*p_);
      }
    }
    Foo& operator=(Foo&& o) noexcept {
      if (this != &o) {
        // Record the destruction of the currently held target before
        // overwriting it; otherwise a move-assignment over a live Foo
        // would silently lose one count.
        if (p_) {
          ++(*p_);
        }
        p_ = std::exchange(o.p_, nullptr);
      }
      return *this;
    }
  };
  int count = 0;
  int num = 3;
  {
    folly::UMPMCQueue<Foo, false> q;
    for (int i = 0; i < num; ++i) {
      Foo foo(&count);
      q.enqueue(std::move(foo));
    }
  } // queue destroyed here; remaining items must be cleaned up
  EXPECT_EQ(count, num);
}
template <typename ProdFunc, typename ConsFunc, typename EndFunc> template <typename ProdFunc, typename ConsFunc, typename EndFunc>
inline uint64_t run_once( inline uint64_t run_once(
int nprod, int nprod,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment