Commit 20808864 authored by Lee Howes's avatar Lee Howes Committed by Facebook Github Bot

Add TimedDrivableExecutor to folly.

Summary: Adds a TimedDrivableExecutor implementation of DrivableExecutor that adds a driveUntil method. driveUntil is as drive, except that it takes a timepoint and will stop driving after that time to allow callers to time out more easily.

Reviewed By: yfeldblum

Differential Revision: D6658320

fbshipit-source-id: a75145748e78497ce107ae152f25729547883835
parent f57db697
......@@ -371,6 +371,7 @@ if (BUILD_TESTS)
TEST serial_executor_test SOURCES SerialExecutorTest.cpp
TEST thread_pool_executor_test SOURCES ThreadPoolExecutorTest.cpp
TEST threaded_executor_test SOURCES ThreadedExecutorTest.cpp
TEST timed_drivable_executor_test SOURCES TimedDrivableExecutorTest.cpp
DIRECTORY executors/task_queue/test/
TEST unbounded_blocking_queue_test SOURCES UnboundedBlockingQueueTest.cpp
......
......@@ -103,6 +103,7 @@ nobase_follyinclude_HEADERS = \
executors/SerialExecutor.h \
executors/ThreadPoolExecutor.h \
executors/ThreadedExecutor.h \
executors/TimedDrivableExecutor.h \
executors/task_queue/BlockingQueue.h \
executors/task_queue/LifoSemMPMCQueue.h \
executors/task_queue/PriorityLifoSemMPMCQueue.h \
......@@ -541,6 +542,7 @@ libfolly_la_SOURCES = \
executors/SerialExecutor.cpp \
executors/ThreadPoolExecutor.cpp \
executors/ThreadedExecutor.cpp \
executors/TimedDrivableExecutor.cpp \
executors/QueuedImmediateExecutor.cpp \
experimental/hazptr/hazptr.cpp \
experimental/hazptr/memory_resource.cpp \
......
/*
* Copyright 2018-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/executors/TimedDrivableExecutor.h>
#include <cstring>
#include <ctime>
#include <string>
#include <tuple>
namespace folly {
void TimedDrivableExecutor::add(Func callback) {
queue_.enqueue(std::move(callback));
}
void TimedDrivableExecutor::drive() {
wait();
run();
}
size_t TimedDrivableExecutor::run() {
size_t count = 0;
size_t n = queue_.size();
// If we have waited already, then func_ may have a value
if (func_) {
auto f = std::move(func_);
f();
count = 1;
}
while (count < n && queue_.try_dequeue(func_)) {
auto f = std::move(func_);
f();
++count;
}
return count;
}
size_t TimedDrivableExecutor::drain() {
size_t tasksRun = 0;
size_t tasksForSingleRun = 0;
while ((tasksForSingleRun = run()) != 0) {
tasksRun += tasksForSingleRun;
}
return tasksRun;
}
void TimedDrivableExecutor::wait() {
if (!func_) {
queue_.dequeue(func_);
}
}
} // namespace folly
/*
* Copyright 2018-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <chrono>
#include <folly/concurrency/UnboundedQueue.h>
#include <folly/executors/DrivableExecutor.h>
namespace folly {
/*
* A DrivableExecutor can be driven via its drive() method or its driveUntil()
* that drives until some time point.
*/
class TimedDrivableExecutor : public DrivableExecutor {
public:
/// Implements DrivableExecutor
void drive() override;
// Make progress if there is work to do and return true. Otherwise return
// false.
bool try_drive() {
return try_wait() && run() > 0;
}
// Make progress on this Executor's work. Acts as drive, except it will only
// wait for a period of timeout for work to be enqueued. If no work is
// enqueued by that point, it will return.
template <typename Rep, typename Period>
bool try_drive_for(const std::chrono::duration<Rep, Period>& timeout) {
return try_wait_for(timeout) && run() > 0;
}
// Make progress on this Executor's work. Acts as drive, except it will only
// wait until deadline for work to be enqueued. If no work is enqueued by
// that point, it will return.
template <typename Clock, typename Duration>
bool try_drive_until(
const std::chrono::time_point<Clock, Duration>& deadline) {
return try_wait_until(deadline) && run() > 0;
}
void add(Func) override;
/// Do work. Returns the number of functions that were executed (maybe 0).
/// Non-blocking, in the sense that we don't wait for work (we can't
/// control whether one of the functions blocks).
/// This is stable, it will not chase an ever-increasing tail of work.
/// This also means, there may be more work available to perform at the
/// moment that this returns.
size_t run();
// Do work until there is no more work to do.
// Returns the number of functions that were executed (maybe 0).
// Unlike run, this method is not stable. It will chase an infinite tail of
// work so should be used with care.
// There will be no work available to perform at the moment that this
// returns.
size_t drain();
/// Wait for work to do.
void wait();
// Return true if there is work to do, false otherwise
bool try_wait() {
return func_ || queue_.try_dequeue(func_);
}
/// Wait for work to do or for a period of timeout, whichever is sooner.
template <typename Rep, typename Period>
bool try_wait_for(const std::chrono::duration<Rep, Period>& timeout) {
return func_ || queue_.try_dequeue_for(func_, timeout);
}
/// Wait for work to do or until deadline passes, whichever is sooner.
template <typename Clock, typename Duration>
bool try_wait_until(
const std::chrono::time_point<Clock, Duration>& deadline) {
return func_ || queue_.try_dequeue_until(func_, deadline);
}
private:
UMPSCQueue<Func, true> queue_;
Func func_;
};
} // namespace folly
/*
* Copyright 2018-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/executors/TimedDrivableExecutor.h>
#include <folly/futures/Future.h>
#include <folly/portability/GTest.h>
#include <folly/synchronization/Baton.h>
using namespace folly;
TEST(TimedDrivableExecutor, runIsStable) {
TimedDrivableExecutor x;
size_t count = 0;
auto f1 = [&]() { count++; };
auto f2 = [&]() {
x.add(f1);
x.add(f1);
};
x.add(f2);
x.run();
EXPECT_EQ(count, 0);
}
TEST(TimedDrivableExecutor, drainIsNotStable) {
TimedDrivableExecutor x;
size_t count = 0;
auto f1 = [&]() { count++; };
auto f2 = [&]() {
x.add(f1);
x.add(f1);
};
x.add(f2);
x.drain();
EXPECT_EQ(count, 2);
}
TEST(TimedDrivableExecutor, try_drive) {
TimedDrivableExecutor x;
size_t count = 0;
auto f1 = [&]() { count++; };
x.try_drive();
EXPECT_EQ(count, 0);
x.add(f1);
x.try_drive();
EXPECT_EQ(count, 1);
}
TEST(TimedDrivableExecutor, try_drive_for) {
TimedDrivableExecutor x;
size_t count = 0;
auto f1 = [&]() { count++; };
x.try_drive_for(std::chrono::milliseconds(100));
EXPECT_EQ(count, 0);
x.add(f1);
x.try_drive_for(std::chrono::milliseconds(100));
EXPECT_EQ(count, 1);
}
TEST(TimedDrivableExecutor, try_drive_until) {
TimedDrivableExecutor x;
size_t count = 0;
auto f1 = [&]() { count++; };
x.try_drive_until(
std::chrono::system_clock::now() + std::chrono::milliseconds(100));
EXPECT_EQ(count, 0);
x.add(f1);
x.try_drive_until(
std::chrono::system_clock::now() + std::chrono::milliseconds(100));
EXPECT_EQ(count, 1);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment