Commit 5a7b4437 authored by Dave Watson's avatar Dave Watson Committed by Facebook Github Bot

Add try_take_for to BlockingQueue

Summary:
add try_take_for to BlockingQueue, utilizing new LifoSem::try_wait_for method.
To be used to kill threads after an idle period in CPUThreadPoolExecutor.

Reviewed By: magedm

Differential Revision: D7477268

fbshipit-source-id: 76ea7fa1fc3b4b5f7a7529ce1424a91eef2ac7b1
parent 59e6e586
......@@ -227,6 +227,24 @@ ThreadPoolExecutor::ThreadPtr ThreadPoolExecutor::StoppedThreadQueue::take() {
}
}
folly::Optional<ThreadPoolExecutor::ThreadPtr>
ThreadPoolExecutor::StoppedThreadQueue::try_take_for(
std::chrono::milliseconds time) {
while (true) {
{
std::lock_guard<std::mutex> guard(mutex_);
if (queue_.size() > 0) {
auto item = std::move(queue_.front());
queue_.pop();
return item;
}
}
if (!sem_.try_wait_for(time)) {
return folly::none;
}
}
}
size_t ThreadPoolExecutor::StoppedThreadQueue::size() {
std::lock_guard<std::mutex> guard(mutex_);
return queue_.size();
......
......@@ -237,6 +237,8 @@ class ThreadPoolExecutor : public virtual folly::Executor {
void add(ThreadPtr item) override;
ThreadPtr take() override;
size_t size() override;
folly::Optional<ThreadPtr> try_take_for(
std::chrono::milliseconds /*timeout */) override;
private:
folly::LifoSem sem_;
......
......@@ -21,6 +21,7 @@
#include <glog/logging.h>
#include <folly/CPortability.h>
#include <folly/Optional.h>
namespace folly {
......@@ -45,6 +46,7 @@ class BlockingQueue {
return 1;
}
virtual T take() = 0;
virtual folly::Optional<T> try_take_for(std::chrono::milliseconds time) = 0;
virtual size_t size() = 0;
};
......
......@@ -50,6 +50,16 @@ class LifoSemMPMCQueue : public BlockingQueue<T> {
return item;
}
folly::Optional<T> try_take_for(std::chrono::milliseconds time) override {
T item;
while (!queue_.readIfNotEmpty(item)) {
if (!sem_.try_wait_for(time)) {
return folly::none;
}
}
return std::move(item);
}
size_t capacity() {
return queue_.capacity();
}
......
......@@ -85,6 +85,18 @@ class PriorityLifoSemMPMCQueue : public BlockingQueue<T> {
}
}
folly::Optional<T> try_take_for(std::chrono::milliseconds time) override {
T item;
while (true) {
if (nonBlockingTake(item)) {
return std::move(item);
}
if (!sem_.try_wait_for(time)) {
return folly::none;
}
}
}
bool nonBlockingTake(T& item) {
for (auto it = queues_.rbegin(); it != queues_.rend(); it++) {
if (it->readIfNotEmpty(item)) {
......
......@@ -40,6 +40,16 @@ class UnboundedBlockingQueue : public BlockingQueue<T> {
return item;
}
folly::Optional<T> try_take_for(std::chrono::milliseconds time) override {
T item;
while (!queue_.try_dequeue(item)) {
if (!sem_.try_wait_for(time)) {
return folly::none;
}
}
return item;
}
size_t size() override {
return queue_.size();
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment