Commit 142c1e1a authored by Jeffrey Shen's avatar Jeffrey Shen Committed by Facebook Github Bot

Add a way to schedule runs for startDelay modulo interval

Summary:
Currently, scheduling only allows specifying an interval, not the next run time. This makes it impossible to schedule for a given startDelay modulo the interval. This refactors things internally to use a customizable NextRunTimeFunc, and exposes a way to run at a consistent startDelay or pass in a NextRunTimeFunc.

A use case: if we have n FunctionSchedulers schedule with different startDelays but the same interval, and they sync up (e.g. they all get blocked on some IO call, which eventually gets fixed), then they won't unsync or will unsync slowly.

Reviewed By: yfeldblum

Differential Revision: D7651111

fbshipit-source-id: 3c7a49dbfaa073b913107474f039a53a0a03cfa0
parent aaa4b610
......@@ -30,6 +30,26 @@ namespace folly {
namespace {
struct ConsistentDelayFunctor {
const milliseconds constInterval;
explicit ConsistentDelayFunctor(milliseconds interval)
: constInterval(interval) {
if (interval < milliseconds::zero()) {
throw std::invalid_argument(
"FunctionScheduler: "
"time interval must be non-negative");
}
}
steady_clock::time_point operator()(
steady_clock::time_point curNextRunTime,
steady_clock::time_point curTime) const {
auto intervalsPassed = (curTime - curNextRunTime) / constInterval;
return (intervalsPassed + 1) * constInterval + curNextRunTime;
}
};
struct ConstIntervalFunctor {
const milliseconds constInterval;
......@@ -152,6 +172,20 @@ void FunctionScheduler::addFunctionUniformDistribution(
false /*runOnce*/);
}
void FunctionScheduler::addFunctionConsistentDelay(
Function<void()>&& cb,
milliseconds interval,
StringPiece nameID,
milliseconds startDelay) {
addFunctionInternal(
std::move(cb),
ConsistentDelayFunctor(interval),
nameID.str(),
to<std::string>(interval.count(), "ms"),
startDelay,
false /*runOnce*/);
}
void FunctionScheduler::addFunctionGenericDistribution(
Function<void()>&& cb,
IntervalDistributionFunc&& intervalFunc,
......@@ -167,9 +201,25 @@ void FunctionScheduler::addFunctionGenericDistribution(
false /*runOnce*/);
}
void FunctionScheduler::addFunctionInternal(
void FunctionScheduler::addFunctionGenericNextRunTimeFunctor(
Function<void()>&& cb,
IntervalDistributionFunc&& intervalFunc,
NextRunTimeFunc&& fn,
const std::string& nameID,
const std::string& intervalDescr,
milliseconds startDelay) {
addFunctionInternal(
std::move(cb),
std::move(fn),
nameID,
intervalDescr,
startDelay,
false /*runOnce*/);
}
template <typename RepeatFuncNextRunTimeFunc>
void FunctionScheduler::addFunctionToHeapChecked(
Function<void()>&& cb,
RepeatFuncNextRunTimeFunc&& fn,
const std::string& nameID,
const std::string& intervalDescr,
milliseconds startDelay,
......@@ -178,9 +228,10 @@ void FunctionScheduler::addFunctionInternal(
throw std::invalid_argument(
"FunctionScheduler: Scheduled function must be set");
}
if (!intervalFunc) {
if (!fn) {
throw std::invalid_argument(
"FunctionScheduler: interval distribution function must be set");
"FunctionScheduler: "
"interval distribution or next run time function must be set");
}
if (startDelay < milliseconds::zero()) {
throw std::invalid_argument(
......@@ -204,13 +255,35 @@ void FunctionScheduler::addFunctionInternal(
l,
std::make_unique<RepeatFunc>(
std::move(cb),
std::move(intervalFunc),
std::forward<RepeatFuncNextRunTimeFunc>(fn),
nameID,
intervalDescr,
startDelay,
runOnce));
}
void FunctionScheduler::addFunctionInternal(
Function<void()>&& cb,
NextRunTimeFunc&& fn,
const std::string& nameID,
const std::string& intervalDescr,
milliseconds startDelay,
bool runOnce) {
return addFunctionToHeapChecked(
std::move(cb), std::move(fn), nameID, intervalDescr, startDelay, runOnce);
}
void FunctionScheduler::addFunctionInternal(
Function<void()>&& cb,
IntervalDistributionFunc&& fn,
const std::string& nameID,
const std::string& intervalDescr,
milliseconds startDelay,
bool runOnce) {
return addFunctionToHeapChecked(
std::move(cb), std::move(fn), nameID, intervalDescr, startDelay, runOnce);
}
bool FunctionScheduler::cancelFunctionWithLock(
std::unique_lock<std::mutex>& lock,
StringPiece nameID) {
......
......@@ -133,11 +133,31 @@ class FunctionScheduler {
StringPiece nameID,
std::chrono::milliseconds startDelay);
/**
* Add a new function to the FunctionScheduler whose start times are attempted
* to be scheduled so that they are congruent modulo the interval.
* Note: The scheduling of the next run time happens right before the function
* invocation, so the first time a function takes more time than the interval,
* it will be reinvoked immediately.
*/
void addFunctionConsistentDelay(
Function<void()>&& cb,
std::chrono::milliseconds interval,
StringPiece nameID = StringPiece(),
std::chrono::milliseconds startDelay = std::chrono::milliseconds(0));
/**
* A type alias for function that is called to determine the time
* interval for the next scheduled run.
*/
using IntervalDistributionFunc = Function<std::chrono::milliseconds()>;
/**
* A type alias for function that returns the next run time, given the current
* run time and the current start time.
*/
using NextRunTimeFunc = Function<std::chrono::steady_clock::time_point(
std::chrono::steady_clock::time_point,
std::chrono::steady_clock::time_point)>;
/**
* Add a new function to the FunctionScheduler. The scheduling interval
......@@ -155,6 +175,18 @@ class FunctionScheduler {
const std::string& intervalDescr,
std::chrono::milliseconds startDelay);
/**
* Like addFunctionGenericDistribution, adds a new function to the
* FunctionScheduler, but the next run time is determined directly by the
* given functor, rather than by adding an interval.
*/
void addFunctionGenericNextRunTimeFunctor(
Function<void()>&& cb,
NextRunTimeFunc&& fn,
const std::string& nameID,
const std::string& intervalDescr,
std::chrono::milliseconds startDelay);
/**
* Cancels the function with the specified name, so it will no longer be run.
*
......@@ -203,7 +235,7 @@ class FunctionScheduler {
private:
struct RepeatFunc {
Function<void()> cb;
IntervalDistributionFunc intervalFunc;
NextRunTimeFunc nextRunTimeFunc;
std::chrono::steady_clock::time_point nextRunTime;
std::string name;
std::chrono::milliseconds startDelay;
......@@ -217,21 +249,47 @@ class FunctionScheduler {
const std::string& intervalDistDescription,
std::chrono::milliseconds delay,
bool once)
: RepeatFunc(
std::move(cback),
getNextRunTimeFunc(std::move(intervalFn)),
nameID,
intervalDistDescription,
delay,
once) {}
RepeatFunc(
Function<void()>&& cback,
NextRunTimeFunc&& nextRunTimeFn,
const std::string& nameID,
const std::string& intervalDistDescription,
std::chrono::milliseconds delay,
bool once)
: cb(std::move(cback)),
intervalFunc(std::move(intervalFn)),
nextRunTimeFunc(std::move(nextRunTimeFn)),
nextRunTime(),
name(nameID),
startDelay(delay),
intervalDescr(intervalDistDescription),
runOnce(once) {}
static NextRunTimeFunc getNextRunTimeFunc(
IntervalDistributionFunc&& intervalFn) {
return [intervalFn = std::move(intervalFn)](
std::chrono::steady_clock::time_point /* curNextRunTime */,
std::chrono::steady_clock::time_point curTime) mutable {
return curTime + intervalFn();
};
}
std::chrono::steady_clock::time_point getNextRunTime() const {
return nextRunTime;
}
void setNextRunTimeStrict(std::chrono::steady_clock::time_point curTime) {
nextRunTime = curTime + intervalFunc();
nextRunTime = nextRunTimeFunc(nextRunTime, curTime);
}
void setNextRunTimeSteady() {
nextRunTime = nextRunTimeFunc(nextRunTime, nextRunTime);
}
void setNextRunTimeSteady() { nextRunTime += intervalFunc(); }
void resetNextRunTime(std::chrono::steady_clock::time_point curTime) {
nextRunTime = curTime + startDelay;
}
......@@ -259,9 +317,25 @@ class FunctionScheduler {
void addFunctionToHeap(const std::unique_lock<std::mutex>& lock,
std::unique_ptr<RepeatFunc> func);
template <typename RepeatFuncNextRunTimeFunc>
void addFunctionToHeapChecked(
Function<void()>&& cb,
RepeatFuncNextRunTimeFunc&& fn,
const std::string& nameID,
const std::string& intervalDescr,
std::chrono::milliseconds startDelay,
bool runOnce);
void addFunctionInternal(
Function<void()>&& cb,
IntervalDistributionFunc&& intervalFunc,
NextRunTimeFunc&& fn,
const std::string& nameID,
const std::string& intervalDescr,
std::chrono::milliseconds startDelay,
bool runOnce);
void addFunctionInternal(
Function<void()>&& cb,
IntervalDistributionFunc&& fn,
const std::string& nameID,
const std::string& intervalDescr,
std::chrono::milliseconds startDelay,
......
......@@ -35,6 +35,7 @@ using std::atomic;
using std::chrono::duration_cast;
using std::chrono::microseconds;
using std::chrono::milliseconds;
using std::chrono::steady_clock;
namespace {
......@@ -465,6 +466,41 @@ TEST(FunctionScheduler, UniformDistribution) {
EXPECT_EQ(6, total);
}
TEST(FunctionScheduler, ConsistentDelay) {
std::atomic<int> ticks(0);
FunctionScheduler fs;
std::atomic<long long> epoch(0);
epoch = duration_cast<milliseconds>(steady_clock::now().time_since_epoch())
.count();
// We should have runs at t = 0, 600, 800, 1200, or 4 total.
// If at const interval, it would be t = 0, 600, 1000, or 3 total.
fs.addFunctionConsistentDelay(
[&ticks, &epoch] {
auto now =
duration_cast<milliseconds>(steady_clock::now().time_since_epoch())
.count();
int t = ++ticks;
if (t != 2) {
// Sensitive to delays above 100ms.
EXPECT_NEAR((now - epoch) - (t - 1) * 400, 0, 100);
}
if (t == 1) {
/* sleep override */
std::this_thread::sleep_for(std::chrono::milliseconds(600));
}
},
milliseconds(400),
"ConsistentDelay");
fs.start();
/* sleep override */
std::this_thread::sleep_for(std::chrono::milliseconds(1300));
EXPECT_EQ(ticks.load(), 4);
}
TEST(FunctionScheduler, ExponentialBackoff) {
atomic<int> total{0};
atomic<int> expectedInterval{0};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment