Commit 12342f02 authored by Tingzhe Zhou's avatar Tingzhe Zhou Committed by Facebook Github Bot

FlatCombiningPriorityQueue: Add interface

Summary:
Make the interface consistent with the other queue algorithms.
push
try_push
try_push_for
try_push_until
pop
try_pop
try_pop_for
try_pop_until
peek
try_peek
try_peek_for
try_peek_until

Reviewed By: yfeldblum, djwatson

Differential Revision: D8262170

fbshipit-source-id: 0299e947a0069eacafd1315c920ec6e1b36675f3
parent 2825daef
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include <mutex> #include <mutex>
#include <queue> #include <queue>
#include <folly/Optional.h>
#include <folly/detail/Futex.h> #include <folly/detail/Futex.h>
#include <folly/experimental/flat_combining/FlatCombining.h> #include <folly/experimental/flat_combining/FlatCombining.h>
#include <glog/logging.h> #include <glog/logging.h>
...@@ -53,15 +54,15 @@ namespace folly { ...@@ -53,15 +54,15 @@ namespace folly {
/// CHECK(pq.empty()); /// CHECK(pq.empty());
/// CHECK(pq.size() == 0); /// CHECK(pq.size() == 0);
/// int v; /// int v;
/// CHECK(!tryPop(v)); /// CHECK(!try_pop(v));
/// CHECK(!tryPop(v, now() + seconds(1))); /// CHECK(!try_pop_until(v, now() + seconds(1)));
/// CHECK(!tryPeek(v)); /// CHECK(!try_peek(v));
/// CHECK(!tryPeek(v, now() + seconds(1))); /// CHECK(!try_peek_until(v, now() + seconds(1)));
/// pq.push(10); /// pq.push(10);
/// CHECK(!pq.empty()); /// CHECK(!pq.empty());
/// CHECK(pq.size() == 1); /// CHECK(pq.size() == 1);
/// CHECK(!pq.tryPush(20)); /// CHECK(!pq.try_push(20));
/// CHECK(!pq.tryPush(20), now() + seconds(1))); /// CHECK(!pq.try_push_until(20), now() + seconds(1)));
/// peek(v); /// peek(v);
/// CHECK_EQ(v, 10); /// CHECK_EQ(v, 10);
/// CHECK(pq.size() == 1); /// CHECK(pq.size() == 1);
...@@ -121,51 +122,125 @@ class FlatCombiningPriorityQueue ...@@ -121,51 +122,125 @@ class FlatCombiningPriorityQueue
/// provided or until the provided time_point is reached. If /// provided or until the provided time_point is reached. If
/// successful, inserts the provided item in the priority queue /// successful, inserts the provided item in the priority queue
/// according to its priority. /// according to its priority.
template <class Clock = std::chrono::steady_clock> bool try_push(const T& val) {
bool tryPush( return try_push_impl(
const T& val, val, std::chrono::time_point<std::chrono::steady_clock>::min());
const std::chrono::time_point<Clock>& when = }
std::chrono::time_point<Clock>::min());
/// Non-blocking pop. Succeeds if the priority queue is /// Non-blocking pop. Succeeds if the priority queue is
/// nonempty. Tries once if no time point is provided or until the /// nonempty. Tries once if no time point is provided or until the
/// provided time_point is reached. If successful, copies the /// provided time_point is reached. If successful, copies the
/// highest priority item and removes it from the priority queue. /// highest priority item and removes it from the priority queue.
template <class Clock = std::chrono::steady_clock> bool try_pop(T& val) {
bool tryPop( return try_pop_impl(
T& val, val, std::chrono::time_point<std::chrono::steady_clock>::min());
const std::chrono::time_point<Clock>& when = }
std::chrono::time_point<Clock>::min());
/// Non-blocking peek. Succeeds if the priority queue is /// Non-blocking peek. Succeeds if the priority queue is
/// nonempty. Tries once if no time point is provided or until the /// nonempty. Tries once if no time point is provided or until the
/// provided time_point is reached. If successful, copies the /// provided time_point is reached. If successful, copies the
/// highest priority item without removing it. /// highest priority item without removing it.
template <class Clock = std::chrono::steady_clock> bool try_peek(T& val) {
bool tryPeek( return try_peek_impl(
T& val, val, std::chrono::time_point<std::chrono::steady_clock>::min());
const std::chrono::time_point<Clock>& when = }
std::chrono::time_point<Clock>::min());
/// Blocking push. Inserts the provided item in the priority /// Blocking push. Inserts the provided item in the priority
/// queue. If it is full, this function blocks until there is space /// queue. If it is full, this function blocks until there is space
/// for the new item. /// for the new item.
void push(const T& val) { void push(const T& val) {
tryPush(val, std::chrono::time_point<std::chrono::steady_clock>::max()); try_push_impl(
val, std::chrono::time_point<std::chrono::steady_clock>::max());
} }
/// Blocking pop. Copies the highest priority item and removes /// Blocking pop. Copies the highest priority item and removes
/// it. If the priority queue is empty, this function blocks until /// it. If the priority queue is empty, this function blocks until
/// it is nonempty. /// it is nonempty.
void pop(T& val) { void pop(T& val) {
tryPop(val, std::chrono::time_point<std::chrono::steady_clock>::max()); try_pop_impl(
val, std::chrono::time_point<std::chrono::steady_clock>::max());
} }
/// Blocking peek. Copies the highest priority item without /// Blocking peek. Copies the highest priority item without
/// removing it. If the priority queue is empty, this function /// removing it. If the priority queue is empty, this function
/// blocks until it is nonempty. /// blocks until it is nonempty.
void peek(T& val) { void peek(T& val) {
tryPeek(val, std::chrono::time_point<std::chrono::steady_clock>::max()); try_peek_impl(
val, std::chrono::time_point<std::chrono::steady_clock>::max());
}
folly::Optional<T> try_pop() {
T val;
if (try_pop(val)) {
return std::move(val);
}
return folly::none;
}
folly::Optional<T> try_peek() {
T val;
if (try_peek(val)) {
return std::move(val);
}
return folly::none;
}
template <typename Rep, typename Period>
folly::Optional<T> try_pop_for(
const std::chrono::duration<Rep, Period>& timeout) {
T val;
if (try_pop(val) ||
try_pop_impl(val, std::chrono::steady_clock::now() + timeout)) {
return std::move(val);
}
return folly::none;
}
template <typename Rep, typename Period>
bool try_push_for(
const T& val,
const std::chrono::duration<Rep, Period>& timeout) {
return (
try_push(val) ||
try_push_impl(val, std::chrono::steady_clock::now() + timeout));
}
template <typename Rep, typename Period>
folly::Optional<T> try_peek_for(
const std::chrono::duration<Rep, Period>& timeout) {
T val;
if (try_peek(val) ||
try_peek_impl(val, std::chrono::steady_clock::now() + timeout)) {
return std::move(val);
}
return folly::none;
}
template <typename Clock, typename Duration>
folly::Optional<T> try_pop_until(
const std::chrono::time_point<Clock, Duration>& deadline) {
T val;
if (try_pop_impl(val, deadline)) {
return std::move(val);
}
return folly::none;
}
template <typename Clock, typename Duration>
bool try_push_until(
const T& val,
const std::chrono::time_point<Clock, Duration>& deadline) {
return try_push_impl(val, deadline);
}
template <typename Clock, typename Duration>
folly::Optional<T> try_peek_until(
const std::chrono::time_point<Clock, Duration>& deadline) {
T val;
if (try_peek_impl(val, deadline)) {
return std::move(val);
}
return folly::none;
} }
private: private:
...@@ -190,6 +265,21 @@ class FlatCombiningPriorityQueue ...@@ -190,6 +265,21 @@ class FlatCombiningPriorityQueue
return false; return false;
} }
} }
template <typename Clock, typename Duration>
bool try_push_impl(
const T& val,
const std::chrono::time_point<Clock, Duration>& when);
template <typename Clock, typename Duration>
bool try_pop_impl(
T& val,
const std::chrono::time_point<Clock, Duration>& when);
template <typename Clock, typename Duration>
bool try_peek_impl(
T& val,
const std::chrono::time_point<Clock, Duration>& when);
}; };
/// Implementation /// Implementation
...@@ -199,10 +289,11 @@ template < ...@@ -199,10 +289,11 @@ template <
typename PriorityQueue, typename PriorityQueue,
typename Mutex, typename Mutex,
template <typename> class Atom> template <typename> class Atom>
template <class Clock> template <typename Clock, typename Duration>
inline bool FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>::tryPush( inline bool
FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>::try_push_impl(
const T& val, const T& val,
const std::chrono::time_point<Clock>& when) { const std::chrono::time_point<Clock, Duration>& when) {
while (true) { while (true) {
bool res; bool res;
bool wake; bool wake;
...@@ -255,10 +346,11 @@ template < ...@@ -255,10 +346,11 @@ template <
typename PriorityQueue, typename PriorityQueue,
typename Mutex, typename Mutex,
template <typename> class Atom> template <typename> class Atom>
template <class Clock> template <typename Clock, typename Duration>
inline bool FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>::tryPop( inline bool
FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>::try_pop_impl(
T& val, T& val,
const std::chrono::time_point<Clock>& when) { const std::chrono::time_point<Clock, Duration>& when) {
while (true) { while (true) {
bool res; bool res;
bool wake; bool wake;
...@@ -300,10 +392,11 @@ template < ...@@ -300,10 +392,11 @@ template <
typename PriorityQueue, typename PriorityQueue,
typename Mutex, typename Mutex,
template <typename> class Atom> template <typename> class Atom>
template <class Clock> template <typename Clock, typename Duration>
inline bool FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>::tryPeek( inline bool
FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>::try_peek_impl(
T& val, T& val,
const std::chrono::time_point<Clock>& when) { const std::chrono::time_point<Clock, Duration>& when) {
while (true) { while (true) {
bool res; bool res;
......
...@@ -61,7 +61,7 @@ class BaselinePQ { ...@@ -61,7 +61,7 @@ class BaselinePQ {
return pq_.size(); return pq_.size();
} }
bool tryPush(const T& val) { bool try_push(const T& val) {
std::lock_guard<Mutex> g(m_); std::lock_guard<Mutex> g(m_);
if (maxSize_ > 0 && pq_.size() == maxSize_) { if (maxSize_ > 0 && pq_.size() == maxSize_) {
return false; return false;
...@@ -76,7 +76,7 @@ class BaselinePQ { ...@@ -76,7 +76,7 @@ class BaselinePQ {
} }
} }
bool tryPop(T& val) { bool try_pop(T& val) {
std::lock_guard<Mutex> g(m_); std::lock_guard<Mutex> g(m_);
if (!pq_.empty()) { if (!pq_.empty()) {
val = pq_.top(); val = pq_.top();
...@@ -87,7 +87,7 @@ class BaselinePQ { ...@@ -87,7 +87,7 @@ class BaselinePQ {
return false; return false;
} }
bool tryPeek(T& val) { bool try_peek(T& val) {
std::lock_guard<Mutex> g(m_); std::lock_guard<Mutex> g(m_);
if (!pq_.empty()) { if (!pq_.empty()) {
val = pq_.top(); val = pq_.top();
...@@ -122,7 +122,7 @@ static uint64_t run_once(PriorityQueue& pq, const Func& fn) { ...@@ -122,7 +122,7 @@ static uint64_t run_once(PriorityQueue& pq, const Func& fn) {
std::atomic<uint32_t> started{0}; std::atomic<uint32_t> started{0};
for (int i = 0; i < size; ++i) { for (int i = 0; i < size; ++i) {
CHECK(pq.tryPush(i * (ops / size))); CHECK(pq.try_push(i * (ops / size)));
} }
std::vector<std::thread> threads(nthreads); std::vector<std::thread> threads(nthreads);
...@@ -161,39 +161,53 @@ TEST(FCPriQueue, basic) { ...@@ -161,39 +161,53 @@ TEST(FCPriQueue, basic) {
CHECK(pq.empty()); CHECK(pq.empty());
CHECK_EQ(pq.size(), 0); CHECK_EQ(pq.size(), 0);
int v; int v;
CHECK(!pq.tryPop(v)); CHECK(!pq.try_pop(v));
// try_pop() returns an Optional
EXPECT_FALSE(bool(pq.try_pop()));
CHECK(pq.tryPush(1)); CHECK(pq.try_push(1));
CHECK(pq.tryPush(2)); CHECK(pq.try_push(2));
CHECK(!pq.empty()); CHECK(!pq.empty());
CHECK_EQ(pq.size(), 2); CHECK_EQ(pq.size(), 2);
pq.peek(v); pq.peek(v);
CHECK_EQ(v, 2); // higher value has higher priority CHECK_EQ(v, 2); // higher value has higher priority
CHECK(pq.tryPeek(v)); CHECK(pq.try_peek(v));
CHECK_EQ(v, 2); CHECK_EQ(v, 2);
CHECK(!pq.empty()); CHECK(!pq.empty());
CHECK_EQ(pq.size(), 2); CHECK_EQ(pq.size(), 2);
CHECK(pq.tryPop(v)); CHECK(pq.try_pop(v));
CHECK_EQ(v, 2); CHECK_EQ(v, 2);
CHECK(!pq.empty()); CHECK(!pq.empty());
CHECK_EQ(pq.size(), 1); CHECK_EQ(pq.size(), 1);
CHECK(pq.tryPop(v)); CHECK(pq.try_pop(v));
CHECK_EQ(v, 1); CHECK_EQ(v, 1);
CHECK(pq.empty()); CHECK(pq.empty());
CHECK_EQ(pq.size(), 0); CHECK_EQ(pq.size(), 0);
CHECK(pq.try_push(1));
CHECK(pq.try_push(2));
// check successful try_pop()
EXPECT_EQ(*pq.try_pop(), 2);
CHECK(!pq.empty());
CHECK_EQ(pq.size(), 1);
EXPECT_EQ(*pq.try_pop(), 1);
CHECK(pq.empty());
CHECK_EQ(pq.size(), 0);
} }
TEST(FCPriQueue, bounded) { TEST(FCPriQueue, bounded) {
FCPQ pq(1); FCPQ pq(1);
CHECK(pq.tryPush(1)); CHECK(pq.try_push(1));
CHECK(!pq.tryPush(1)); CHECK(!pq.try_push(1));
CHECK_EQ(pq.size(), 1); CHECK_EQ(pq.size(), 1);
CHECK(!pq.empty()); CHECK(!pq.empty());
int v; int v;
CHECK(pq.tryPop(v)); CHECK(pq.try_pop(v));
CHECK_EQ(v, 1); CHECK_EQ(v, 1);
CHECK_EQ(pq.size(), 0); CHECK_EQ(pq.size(), 0);
CHECK(pq.empty()); CHECK(pq.empty());
...@@ -202,16 +216,32 @@ TEST(FCPriQueue, bounded) { ...@@ -202,16 +216,32 @@ TEST(FCPriQueue, bounded) {
TEST(FCPriQueue, timeout) { TEST(FCPriQueue, timeout) {
FCPQ pq(1); FCPQ pq(1);
int v; int v;
CHECK(!pq.tryPeek( CHECK(!pq.try_peek(v));
v, CHECK(!pq.try_pop(v));
std::chrono::steady_clock::now() + std::chrono::microseconds(1000)));
CHECK(!pq.tryPop(
v,
std::chrono::steady_clock::now() + std::chrono::microseconds(1000)));
pq.push(10); pq.push(10);
CHECK(!pq.tryPush( CHECK(!pq.try_push(20));
20,
std::chrono::steady_clock::now() + std::chrono::microseconds(1000))); auto dur = std::chrono::microseconds(1000);
EXPECT_EQ(*pq.try_pop(), 10);
CHECK(pq.empty());
// check try_***_for
EXPECT_FALSE(bool(pq.try_pop_for(dur)));
EXPECT_FALSE(bool(pq.try_peek_for(dur)));
CHECK(pq.try_push_for(10, dur));
CHECK(!pq.try_push_for(20, dur));
EXPECT_EQ(*pq.try_peek_for(dur), 10);
EXPECT_EQ(*pq.try_pop_for(dur), 10);
CHECK(pq.empty());
// check try_***_until
EXPECT_FALSE(bool(pq.try_pop_until(std::chrono::steady_clock::now() + dur)));
EXPECT_FALSE(bool(pq.try_peek_until(std::chrono::steady_clock::now() + dur)));
CHECK(pq.try_push_until(10, std::chrono::steady_clock::now() + dur));
CHECK(!pq.try_push_until(20, std::chrono::steady_clock::now() + dur));
EXPECT_EQ(*pq.try_peek_until(std::chrono::steady_clock::now() + dur), 10);
EXPECT_EQ(*pq.try_pop_until(std::chrono::steady_clock::now() + dur), 10);
CHECK(pq.empty());
} }
TEST(FCPriQueue, push_pop) { TEST(FCPriQueue, push_pop) {
...@@ -224,13 +254,13 @@ TEST(FCPriQueue, push_pop) { ...@@ -224,13 +254,13 @@ TEST(FCPriQueue, push_pop) {
FCPQ pq(10000); FCPQ pq(10000);
auto fn = [&](uint32_t tid) { auto fn = [&](uint32_t tid) {
for (int i = tid; i < ops; i += nthreads) { for (int i = tid; i < ops; i += nthreads) {
CHECK(pq.tryPush(i)); CHECK(pq.try_push(i));
CHECK(pq.tryPush(i, when)); CHECK(pq.try_push_until(i, when));
pq.push(i); pq.push(i);
doWork(work); doWork(work);
int v; int v;
CHECK(pq.tryPop(v)); CHECK(pq.try_pop(v));
CHECK(pq.tryPop(v, when)); EXPECT_NE(pq.try_pop_until(when), folly::none);
pq.pop(v); pq.pop(v);
doWork(work); doWork(work);
} }
...@@ -261,10 +291,10 @@ static uint64_t test(std::string name, Exp exp, uint64_t base) { ...@@ -261,10 +291,10 @@ static uint64_t test(std::string name, Exp exp, uint64_t base) {
Baseline pq; Baseline pq;
auto fn = [&](uint32_t tid) { auto fn = [&](uint32_t tid) {
for (int i = tid; i < ops; i += nthreads) { for (int i = tid; i < ops; i += nthreads) {
CHECK(pq.tryPush(i)); CHECK(pq.try_push(i));
doWork(work); doWork(work);
int v; int v;
CHECK(pq.tryPop(v)); CHECK(pq.try_pop(v));
doWork(work); doWork(work);
} }
}; };
...@@ -274,10 +304,10 @@ static uint64_t test(std::string name, Exp exp, uint64_t base) { ...@@ -274,10 +304,10 @@ static uint64_t test(std::string name, Exp exp, uint64_t base) {
FCPQ pq; FCPQ pq;
auto fn = [&](uint32_t tid) { auto fn = [&](uint32_t tid) {
for (int i = tid; i < ops; i += nthreads) { for (int i = tid; i < ops; i += nthreads) {
CHECK(pq.tryPush(i)); CHECK(pq.try_push(i));
doWork(work); doWork(work);
int v; int v;
CHECK(pq.tryPop(v)); CHECK(pq.try_pop(v));
doWork(work); doWork(work);
} }
}; };
...@@ -302,10 +332,9 @@ static uint64_t test(std::string name, Exp exp, uint64_t base) { ...@@ -302,10 +332,9 @@ static uint64_t test(std::string name, Exp exp, uint64_t base) {
std::chrono::steady_clock::time_point when = std::chrono::steady_clock::time_point when =
std::chrono::steady_clock::now() + std::chrono::hours(24); std::chrono::steady_clock::now() + std::chrono::hours(24);
for (int i = tid; i < ops; i += nthreads) { for (int i = tid; i < ops; i += nthreads) {
CHECK(pq.tryPush(i, when)); CHECK(pq.try_push_until(i, when));
doWork(work); doWork(work);
int v; EXPECT_NE(pq.try_pop_until(when), folly::none);
CHECK(pq.tryPop(v, when));
doWork(work); doWork(work);
} }
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment