Commit 2f06c17f authored by Yedidya Feldblum's avatar Yedidya Feldblum Committed by Facebook Github Bot

Dead Code: PriorityMPMCQueue

Summary: [Folly] Dead Code: `PriorityMPMCQueue`.

Reviewed By: LeeHowes

Differential Revision: D17718372

fbshipit-source-id: c79c376abadf7149b94d343cd6705f030fd71666
parent be4a3034
/*
* Copyright 2017-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <glog/logging.h>
#include <algorithm>
#include <vector>
#include <folly/MPMCQueue.h>
namespace folly {
/// PriorityMPMCQueue is a thin wrapper on MPMCQueue, providing priorities
/// by managing multiple underlying MPMCQueues. As of now, this does
/// not implement a blocking interface. For the purposes of this
/// class, lower number is higher priority
template <
typename T,
template <typename> class Atom = std::atomic,
bool Dynamic = false>
class PriorityMPMCQueue {
public:
PriorityMPMCQueue(size_t numPriorities, size_t capacity) {
CHECK_GT(numPriorities, 0);
queues_.reserve(numPriorities);
for (size_t i = 0; i < numPriorities; i++) {
queues_.emplace_back(capacity);
}
}
size_t getNumPriorities() {
return queues_.size();
}
// Add at medium priority by default
bool write(T&& item) {
return writeWithPriority(std::move(item), getNumPriorities() / 2);
}
bool writeWithPriority(T&& item, size_t priority) {
size_t queue = std::min(getNumPriorities() - 1, priority);
CHECK_LT(queue, queues_.size());
return queues_.at(queue).write(std::move(item));
}
bool writeWithPriority(
T&& item,
size_t priority,
std::chrono::milliseconds timeout) {
size_t queue = std::min(getNumPriorities() - 1, priority);
CHECK_LT(queue, queues_.size());
return queues_.at(queue).tryWriteUntil(
std::chrono::steady_clock::now() + timeout, std::move(item));
}
bool read(T& item) {
for (auto& q : queues_) {
if (q.readIfNotEmpty(item)) {
return true;
}
}
return false;
}
bool readWithPriority(T& item, size_t priority) {
return queues_[priority].readIfNotEmpty(item);
}
size_t size() const {
size_t total_size = 0;
for (auto& q : queues_) {
// MPMCQueue can have a negative size if there are pending readers.
// Since we don't expose a blocking interface this shouldn't happen,
// But just in case we put a floor at 0
total_size += std::max<ssize_t>(0, q.size());
}
return total_size;
}
size_t sizeGuess() const {
size_t total_size = 0;
for (auto& q : queues_) {
// MPMCQueue can have a negative size if there are pending readers.
// Since we don't expose a blocking interface this shouldn't happen,
// But just in case we put a floor at 0
total_size += std::max<ssize_t>(0, q.sizeGuess());
}
return total_size;
}
/// Returns true if there are no items available for dequeue
bool isEmpty() const {
return size() == 0;
}
private:
std::vector<folly::MPMCQueue<T, Atom, Dynamic>> queues_;
};
} // namespace folly
/*
* Copyright 2017-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/PriorityMPMCQueue.h>
#include <folly/portability/GTest.h>
using namespace folly;
TEST(PriorityMPMCQueue, BasicOps) {
// With just one priority, this should behave like a normal MPMCQueue
PriorityMPMCQueue<size_t> queue(1, 10);
EXPECT_TRUE(queue.isEmpty());
EXPECT_EQ(1, queue.getNumPriorities());
queue.write(9);
queue.write(8);
EXPECT_FALSE(queue.isEmpty());
EXPECT_EQ(2, queue.size());
EXPECT_EQ(2, queue.sizeGuess());
size_t item;
queue.read(item);
EXPECT_EQ(9, item);
EXPECT_FALSE(queue.isEmpty());
EXPECT_EQ(1, queue.size());
EXPECT_EQ(1, queue.sizeGuess());
queue.read(item);
EXPECT_EQ(8, item);
EXPECT_TRUE(queue.isEmpty());
EXPECT_EQ(0, queue.size());
EXPECT_EQ(0, queue.sizeGuess());
}
TEST(PriorityMPMCQueue, TestPriorities) {
PriorityMPMCQueue<size_t> queue(3, 10);
EXPECT_TRUE(queue.isEmpty());
EXPECT_EQ(3, queue.getNumPriorities());
// This should go to the lowpri queue, as we only
// have 3 priorities
queue.writeWithPriority(5, 50);
// unqualified writes should be mid-pri
queue.write(3);
queue.writeWithPriority(6, 2);
queue.writeWithPriority(1, 0);
queue.write(4);
queue.writeWithPriority(2, 0);
EXPECT_FALSE(queue.isEmpty());
EXPECT_EQ(6, queue.size());
EXPECT_EQ(6, queue.sizeGuess());
size_t item;
for (int i = 1; i <= 6; i++) {
queue.read(item);
EXPECT_EQ(i, item);
EXPECT_EQ(6 - i, queue.size());
EXPECT_EQ(6 - i, queue.sizeGuess());
}
}
TEST(PriorityMPMCQueue, TestReadWithPriority) {
PriorityMPMCQueue<size_t> queue(3, 10);
EXPECT_TRUE(queue.isEmpty());
EXPECT_EQ(3, queue.getNumPriorities());
queue.writeWithPriority(2, 2);
queue.writeWithPriority(1, 1);
queue.writeWithPriority(0, 0);
EXPECT_FALSE(queue.isEmpty());
EXPECT_EQ(3, queue.size());
EXPECT_EQ(3, queue.sizeGuess());
size_t item;
for (int i = 0; i < 3; i++) {
EXPECT_TRUE(queue.readWithPriority(item, i));
EXPECT_EQ(i, item);
EXPECT_FALSE(queue.readWithPriority(item, i));
}
}
TEST(PriorityMPMCQueue, TestWriteWithPriorityAndTimeout) {
PriorityMPMCQueue<size_t> queue(5, 1);
EXPECT_TRUE(queue.isEmpty());
EXPECT_EQ(5, queue.getNumPriorities());
const auto timeout = std::chrono::milliseconds{30};
for (int i = 0; i < 5; i++) {
auto time_before = std::chrono::steady_clock::now();
EXPECT_TRUE(queue.writeWithPriority(i, i, timeout));
auto time_after = std::chrono::steady_clock::now();
EXPECT_LE(time_after - time_before, timeout);
}
// check writeWithPriority will wait for at least timeout if the queue is
// full.
auto time_before = std::chrono::steady_clock::now();
EXPECT_FALSE(queue.writeWithPriority(5, 0, timeout));
auto time_after = std::chrono::steady_clock::now();
EXPECT_GE(time_after - time_before, timeout);
EXPECT_FALSE(queue.isEmpty());
EXPECT_EQ(5, queue.size());
EXPECT_EQ(5, queue.sizeGuess());
size_t item;
for (int i = 0; i < 5; i++) {
queue.read(item);
EXPECT_EQ(i, item);
EXPECT_EQ(4 - i, queue.size());
EXPECT_EQ(4 - i, queue.sizeGuess());
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment