Commit 74f36602 authored by Yedidya Feldblum's avatar Yedidya Feldblum Committed by Facebook Github Bot

PriorityUnboundedQueueSet, wrapping UnboundedQueue

Summary: [Folly] `PriorityUnboundedQueueSet`, wrapping `UnboundedQueue`.

Reviewed By: andriigrynenko

Differential Revision: D14719777

fbshipit-source-id: 529ddd8de5879908e36f6fc26e7a091230cb03da
parent 8ea8b06a
...@@ -509,6 +509,8 @@ if (BUILD_TESTS) ...@@ -509,6 +509,8 @@ if (BUILD_TESTS)
TEST concurrent_hash_map_test SOURCES ConcurrentHashMapTest.cpp TEST concurrent_hash_map_test SOURCES ConcurrentHashMapTest.cpp
TEST dynamic_bounded_queue_test WINDOWS_DISABLED TEST dynamic_bounded_queue_test WINDOWS_DISABLED
SOURCES DynamicBoundedQueueTest.cpp SOURCES DynamicBoundedQueueTest.cpp
TEST priority_unbounded_queue_set_test
SOURCES PriorityUnboundedQueueSetTest.cpp
TEST unbounded_queue_test SOURCES UnboundedQueueTest.cpp TEST unbounded_queue_test SOURCES UnboundedQueueTest.cpp
DIRECTORY detail/test/ DIRECTORY detail/test/
......
/*
* Copyright 2019-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <atomic>
#include <vector>
#include <folly/concurrency/UnboundedQueue.h>
#include <folly/lang/Align.h>
namespace folly {
/// PriorityUnboundedQueueSet
///
/// A set of per-priority queues, and an interface for accessing them.
///
/// Functions:
/// Consumer operations:
/// bool try_dequeue(T&);
/// Optional<T> try_dequeue();
/// Tries to extract an element from the front of the least-priority
/// backing queue which has an element, if any.
/// T const* try_peek();
/// Returns a pointer to the element at the front of the least-priority
/// backing queue which has an element, if any. Only allowed when
/// SingleConsumer is true.
/// Note:
/// Queues at lower priority are tried before queues at higher priority.
///
/// Secondary functions:
/// queue& at_priority(size_t);
/// queue const& at_priority(size_t) const;
/// Returns a reference to the owned queue at the given priority.
/// size_t size() const;
/// Returns an estimate of the total size of the owned queues.
/// bool empty() const;
/// Returns true only if all of the owned queues were empty during the
/// call.
/// Note: size() and empty() are guaranteed to be accurate only if the
/// owned queues are not changed concurrently.
template <
typename T,
bool SingleProducer,
bool SingleConsumer,
bool MayBlock,
size_t LgSegmentSize = 8,
size_t LgAlign = constexpr_log2(hardware_destructive_interference_size),
template <typename> class Atom = std::atomic>
class PriorityUnboundedQueueSet {
public:
using queue = UnboundedQueue<
T,
SingleProducer,
SingleConsumer,
MayBlock,
LgSegmentSize,
LgAlign,
Atom>;
explicit PriorityUnboundedQueueSet(size_t priorities) : queues_(priorities) {}
PriorityUnboundedQueueSet(PriorityUnboundedQueueSet const&) = delete;
PriorityUnboundedQueueSet(PriorityUnboundedQueueSet&&) = delete;
PriorityUnboundedQueueSet& operator=(PriorityUnboundedQueueSet const&) =
delete;
PriorityUnboundedQueueSet& operator=(PriorityUnboundedQueueSet&&) = delete;
queue& at_priority(size_t priority) {
return queues_.at(priority);
}
queue const& at_priority(size_t priority) const {
return queues_.at(priority);
}
bool try_dequeue(T& item) noexcept {
for (auto& q : queues_) {
if (q.try_dequeue(item)) {
return true;
}
}
return false;
}
Optional<T> try_dequeue() noexcept {
for (auto& q : queues_) {
if (auto item = q.try_dequeue()) {
return item;
}
}
return none;
}
T const* try_peek() noexcept {
DCHECK(SingleConsumer);
for (auto& q : queues_) {
if (auto ptr = q.try_peek()) {
return ptr;
}
}
return nullptr;
}
size_t size() const noexcept {
size_t size = 0;
for (auto& q : queues_) {
size += q.size();
}
return size;
}
bool empty() const noexcept {
for (auto& q : queues_) {
if (!q.empty()) {
return false;
}
}
return true;
}
size_t priorities() const noexcept {
return queues_.size();
}
private:
std::vector<queue> queues_;
}; // PriorityUnboundedQueueSet
/* Aliases */
template <
typename T,
bool MayBlock,
size_t LgSegmentSize = 8,
size_t LgAlign = constexpr_log2(hardware_destructive_interference_size),
template <typename> class Atom = std::atomic>
using PriorityUSPSCQueueSet = PriorityUnboundedQueueSet<
T,
true,
true,
MayBlock,
LgSegmentSize,
LgAlign,
Atom>;
template <
typename T,
bool MayBlock,
size_t LgSegmentSize = 8,
size_t LgAlign = constexpr_log2(hardware_destructive_interference_size),
template <typename> class Atom = std::atomic>
using PriorityUMPSCQueueSet = PriorityUnboundedQueueSet<
T,
false,
true,
MayBlock,
LgSegmentSize,
LgAlign,
Atom>;
template <
typename T,
bool MayBlock,
size_t LgSegmentSize = 8,
size_t LgAlign = constexpr_log2(hardware_destructive_interference_size),
template <typename> class Atom = std::atomic>
using PriorityUSPMCQueueSet = PriorityUnboundedQueueSet<
T,
true,
false,
MayBlock,
LgSegmentSize,
LgAlign,
Atom>;
template <
typename T,
bool MayBlock,
size_t LgSegmentSize = 8,
size_t LgAlign = constexpr_log2(hardware_destructive_interference_size),
template <typename> class Atom = std::atomic>
using PriorityUMPMCQueueSet = PriorityUnboundedQueueSet<
T,
false,
false,
MayBlock,
LgSegmentSize,
LgAlign,
Atom>;
} // namespace folly
/*
* Copyright 2019-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <vector>
#include <folly/concurrency/PriorityUnboundedQueueSet.h>
#include <folly/container/Enumerate.h>
#include <folly/portability/GTest.h>
#include <folly/synchronization/Baton.h>
using namespace folly;
class PriorityUnboundedQueueSetTest : public testing::Test {};
TEST_F(PriorityUnboundedQueueSetTest, try_dequeue) {
PriorityUSPSCQueueSet<int, false> q(3);
q.at_priority(1).enqueue(42);
EXPECT_EQ(42, q.try_dequeue().value());
}
TEST_F(PriorityUnboundedQueueSetTest, try_peek) {
PriorityUSPSCQueueSet<int, false> q(3);
q.at_priority(1).enqueue(42);
EXPECT_EQ(42, *q.try_peek());
q.at_priority(1).enqueue(42);
EXPECT_EQ(42, q.try_dequeue().value());
}
TEST_F(PriorityUnboundedQueueSetTest, size_empty) {
PriorityUSPSCQueueSet<int, false> q(3);
EXPECT_TRUE(q.empty());
EXPECT_EQ(0, q.size());
q.at_priority(1).enqueue(42);
EXPECT_FALSE(q.empty());
EXPECT_EQ(1, q.size());
EXPECT_EQ(42, *q.try_peek());
EXPECT_FALSE(q.empty());
EXPECT_EQ(1, q.size());
EXPECT_EQ(42, q.try_dequeue().value());
EXPECT_TRUE(q.empty());
EXPECT_EQ(0, q.size());
}
TEST_F(PriorityUnboundedQueueSetTest, priority_order) {
PriorityUSPSCQueueSet<int, false> q(3);
EXPECT_EQ(0, q.size());
q.at_priority(1).enqueue(55);
q.at_priority(2).enqueue(42);
q.at_priority(0).enqueue(12);
q.at_priority(1).enqueue(27);
EXPECT_EQ(4, q.size());
EXPECT_EQ(12, q.try_dequeue().value());
EXPECT_EQ(55, q.try_dequeue().value());
EXPECT_EQ(27, q.try_dequeue().value());
EXPECT_EQ(42, q.try_dequeue().value());
EXPECT_EQ(0, q.size());
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment