Commit 13c58db9 authored by Jimmy Saade's avatar Jimmy Saade Committed by Facebook Github Bot 7

folly::FunctionScheduler: Adding capability to reset a function's timer

Summary:Adding support for resetting a specified function's timer.

"Resetting a function's timer" effectively means "canceling whatever next runs it would have had, and treating it as though it were just added".
When `resetFunctionTimer` is called, the specified function's interval (timer) will be reset, and it will execute after its initially-specified `startDelay`. If the `startDelay` is zero, the function will execute immediately, and then be scheduled as before - once every `interval` milliseconds.

Motivation: batch processing of updates, where both a size and time limit are in play. If the size limit is reached, it makes sense to reset the timer for the scheduled function.

Differential Revision: D3045868

fb-gh-sync-id: a5ceb0069c04a77fdab16b61679987ee55484e89
shipit-source-id: a5ceb0069c04a77fdab16b61679987ee55484e89
parent 1c15b5bb
...@@ -157,7 +157,7 @@ void FunctionScheduler::addFunctionGenericDistribution( ...@@ -157,7 +157,7 @@ void FunctionScheduler::addFunctionGenericDistribution(
"FunctionScheduler: start delay must be non-negative"); "FunctionScheduler: start delay must be non-negative");
} }
std::lock_guard<std::mutex> l(mutex_); std::unique_lock<std::mutex> l(mutex_);
// check if the nameID is unique // check if the nameID is unique
for (const auto& f : functions_) { for (const auto& f : functions_) {
if (f.isValid() && f.name == nameID) { if (f.isValid() && f.name == nameID) {
...@@ -172,21 +172,15 @@ void FunctionScheduler::addFunctionGenericDistribution( ...@@ -172,21 +172,15 @@ void FunctionScheduler::addFunctionGenericDistribution(
"FunctionScheduler: a function named \"", nameID, "\" already exists")); "FunctionScheduler: a function named \"", nameID, "\" already exists"));
} }
functions_.emplace_back(cb, intervalFunc, nameID, intervalDescr, startDelay); addFunctionToHeap(
if (running_) { l, RepeatFunc(cb, intervalFunc, nameID, intervalDescr, startDelay));
functions_.back().resetNextRunTime(steady_clock::now());
std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
// Signal the running thread to wake up and see if it needs to change it's
// current scheduling decision.
runningCondvar_.notify_one();
}
} }
bool FunctionScheduler::cancelFunction(StringPiece nameID) { bool FunctionScheduler::cancelFunction(StringPiece nameID) {
std::unique_lock<std::mutex> l(mutex_); std::unique_lock<std::mutex> l(mutex_);
if (currentFunction_ && currentFunction_->name == nameID) { if (currentFunction_ && currentFunction_->name == nameID) {
// This function is currently being run. Clear currentFunction_ // This function is currently being run. Clear currentFunction_
// The running thread will see this and won't reschedule the function. // The running thread will see this and won't reschedule the function.
currentFunction_ = nullptr; currentFunction_ = nullptr;
return true; return true;
...@@ -212,7 +206,7 @@ void FunctionScheduler::cancelFunction(const std::unique_lock<std::mutex>& l, ...@@ -212,7 +206,7 @@ void FunctionScheduler::cancelFunction(const std::unique_lock<std::mutex>& l,
// heap. Unfortunately it isn't part of the standard API. // heap. Unfortunately it isn't part of the standard API.
// //
// For now we just leave the RepeatFunc in our heap, but mark it as unused. // For now we just leave the RepeatFunc in our heap, but mark it as unused.
// When it's nextTimeInterval comes up, the runner thread will pop it from // When its nextTimeInterval comes up, the runner thread will pop it from
// the heap and simply throw it away. // the heap and simply throw it away.
it->cancel(); it->cancel();
} else { } else {
...@@ -227,6 +221,32 @@ void FunctionScheduler::cancelAllFunctions() { ...@@ -227,6 +221,32 @@ void FunctionScheduler::cancelAllFunctions() {
functions_.clear(); functions_.clear();
} }
bool FunctionScheduler::resetFunctionTimer(StringPiece nameID) {
std::unique_lock<std::mutex> l(mutex_);
if (currentFunction_ && currentFunction_->name == nameID) {
RepeatFunc* funcPtrCopy = currentFunction_;
// This function is currently being run. Clear currentFunction_
// to avoid rescheduling it, and add the function again to honor the
// startDelay.
currentFunction_ = nullptr;
addFunctionToHeap(l, std::move(*funcPtrCopy));
return true;
}
// Since __adjust_heap() isn't a part of the standard API, there's no way to
// fix the heap ordering if we adjust the key (nextRunTime) for the existing
// RepeatFunc. Instead, we just cancel it and add an identical object.
for (auto it = functions_.begin(); it != functions_.end(); ++it) {
if (it->isValid() && it->name == nameID) {
RepeatFunc funcCopy(std::move(*it));
cancelFunction(l, it);
addFunctionToHeap(l, std::move(funcCopy));
return true;
}
}
return false;
}
bool FunctionScheduler::start() { bool FunctionScheduler::start() {
std::unique_lock<std::mutex> l(mutex_); std::unique_lock<std::mutex> l(mutex_);
if (running_) { if (running_) {
...@@ -369,6 +389,23 @@ void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock, ...@@ -369,6 +389,23 @@ void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
} }
} }
void FunctionScheduler::addFunctionToHeap(
const std::unique_lock<std::mutex>& lock,
RepeatFunc&& func) {
// This function should only be called with mutex_ already locked.
DCHECK(lock.mutex() == &mutex_);
DCHECK(lock.owns_lock());
functions_.emplace_back(std::move(func));
if (running_) {
functions_.back().resetNextRunTime(steady_clock::now());
std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
// Signal the running thread to wake up and see if it needs to change
// its current scheduling decision.
runningCondvar_.notify_one();
}
}
void FunctionScheduler::setThreadName(StringPiece threadName) { void FunctionScheduler::setThreadName(StringPiece threadName) {
std::unique_lock<std::mutex> l(mutex_); std::unique_lock<std::mutex> l(mutex_);
threadName_ = threadName.str(); threadName_ = threadName.str();
......
...@@ -155,6 +155,17 @@ class FunctionScheduler { ...@@ -155,6 +155,17 @@ class FunctionScheduler {
*/ */
void cancelAllFunctions(); void cancelAllFunctions();
/**
* Resets the specified function's timer.
* When resetFunctionTimer is called, the specified function's timer will
* be reset with the same parameters it was passed initially, including
* its startDelay. If the startDelay was 0, the function will be invoked
* immediately.
*
* Returns false if no function exists with the specified name.
*/
bool resetFunctionTimer(StringPiece nameID);
/** /**
* Starts the scheduler. * Starts the scheduler.
* *
...@@ -225,6 +236,8 @@ class FunctionScheduler { ...@@ -225,6 +236,8 @@ class FunctionScheduler {
std::chrono::steady_clock::time_point now); std::chrono::steady_clock::time_point now);
void cancelFunction(const std::unique_lock<std::mutex>& lock, void cancelFunction(const std::unique_lock<std::mutex>& lock,
FunctionHeap::iterator it); FunctionHeap::iterator it);
void addFunctionToHeap(const std::unique_lock<std::mutex>& lock,
RepeatFunc&& func);
std::thread thread_; std::thread thread_;
......
...@@ -211,6 +211,27 @@ TEST(FunctionScheduler, ShutdownStart) { ...@@ -211,6 +211,27 @@ TEST(FunctionScheduler, ShutdownStart) {
EXPECT_EQ(6, total); EXPECT_EQ(6, total);
} }
TEST(FunctionScheduler, ResetFunc) {
int total = 0;
FunctionScheduler fs;
fs.addFunction([&] { total += 2; }, testInterval(3), "add2");
fs.addFunction([&] { total += 3; }, testInterval(3), "add3");
fs.start();
delay(1);
EXPECT_EQ(5, total);
EXPECT_FALSE(fs.resetFunctionTimer("NON_EXISTING"));
EXPECT_TRUE(fs.resetFunctionTimer("add2"));
delay(1);
// t2: after the reset, add2 should have been invoked immediately
EXPECT_EQ(7, total);
usleep(150000);
// t3.5: add3 should have been invoked. add2 should not
EXPECT_EQ(10, total);
delay(1);
// t4.5: add2 should have been invoked once more (it was reset at t1)
EXPECT_EQ(12, total);
}
TEST(FunctionScheduler, AddInvalid) { TEST(FunctionScheduler, AddInvalid) {
int total = 0; int total = 0;
FunctionScheduler fs; FunctionScheduler fs;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment