Commit 951a822c authored by Patryk Zaryjewski's avatar Patryk Zaryjewski Committed by Facebook Github Bot

Make cancelling and rescheduling of functions O(1)

Summary: Currently FunctionScheduler calls that cancel/restart timer for a function of particular id are O(n). By introducing hashmap that translate id to pointer of particular RepeatFunc, we make it O(1).

Reviewed By: simpkins

Differential Revision: D5668557

fbshipit-source-id: e5e8bf9bd75b6d5d42f0bfa398d476703e5801fa
parent db9fd491
......@@ -188,15 +188,13 @@ void FunctionScheduler::addFunctionInternal(
}
std::unique_lock<std::mutex> l(mutex_);
auto it = functionsMap_.find(nameID);
// check if the nameID is unique
for (const auto& f : functions_) {
if (f.isValid() && f.name == nameID) {
throw std::invalid_argument(
to<std::string>("FunctionScheduler: a function named \"",
nameID,
"\" already exists"));
}
if (it != functionsMap_.end() && it->second->isValid()) {
throw std::invalid_argument(to<std::string>(
"FunctionScheduler: a function named \"", nameID, "\" already exists"));
}
if (currentFunction_ && currentFunction_->name == nameID) {
throw std::invalid_argument(to<std::string>(
"FunctionScheduler: a function named \"", nameID, "\" already exists"));
......@@ -204,7 +202,7 @@ void FunctionScheduler::addFunctionInternal(
addFunctionToHeap(
l,
RepeatFunc(
std::make_unique<RepeatFunc>(
std::move(cb),
std::move(intervalFunc),
nameID,
......@@ -218,6 +216,7 @@ bool FunctionScheduler::cancelFunctionWithLock(
StringPiece nameID) {
CHECK_EQ(lock.owns_lock(), true);
if (currentFunction_ && currentFunction_->name == nameID) {
functionsMap_.erase(currentFunction_->name);
// This function is currently being run. Clear currentFunction_
// The running thread will see this and won't reschedule the function.
currentFunction_ = nullptr;
......@@ -229,17 +228,15 @@ bool FunctionScheduler::cancelFunctionWithLock(
bool FunctionScheduler::cancelFunction(StringPiece nameID) {
std::unique_lock<std::mutex> l(mutex_);
if (cancelFunctionWithLock(l, nameID)) {
return true;
}
for (auto it = functions_.begin(); it != functions_.end(); ++it) {
if (it->isValid() && it->name == nameID) {
cancelFunction(l, it);
auto it = functionsMap_.find(nameID);
if (it != functionsMap_.end() && it->second->isValid()) {
cancelFunction(l, it->second);
return true;
}
}
return false;
}
......@@ -251,40 +248,28 @@ bool FunctionScheduler::cancelFunctionAndWait(StringPiece nameID) {
return true;
}
for (auto it = functions_.begin(); it != functions_.end(); ++it) {
if (it->isValid() && it->name == nameID) {
cancelFunction(l, it);
auto it = functionsMap_.find(nameID);
if (it != functionsMap_.end() && it->second->isValid()) {
cancelFunction(l, it->second);
return true;
}
}
return false;
}
void FunctionScheduler::cancelFunction(const std::unique_lock<std::mutex>& l,
FunctionHeap::iterator it) {
RepeatFunc* it) {
// This function should only be called with mutex_ already locked.
DCHECK(l.mutex() == &mutex_);
DCHECK(l.owns_lock());
if (running_) {
// Internally gcc has an __adjust_heap() function to fill in a hole in the
// heap. Unfortunately it isn't part of the standard API.
//
// For now we just leave the RepeatFunc in our heap, but mark it as unused.
// When its nextTimeInterval comes up, the runner thread will pop it from
// the heap and simply throw it away.
functionsMap_.erase(it->name);
it->cancel();
} else {
// We're not running, so functions_ doesn't need to be maintained in heap
// order.
functions_.erase(it);
}
}
bool FunctionScheduler::cancelAllFunctionsWithLock(
std::unique_lock<std::mutex>& lock) {
CHECK_EQ(lock.owns_lock(), true);
functions_.clear();
functionsMap_.clear();
if (currentFunction_) {
cancellingCurrentFunction_ = true;
}
......@@ -307,26 +292,30 @@ void FunctionScheduler::cancelAllFunctionsAndWait() {
bool FunctionScheduler::resetFunctionTimer(StringPiece nameID) {
std::unique_lock<std::mutex> l(mutex_);
if (currentFunction_ && currentFunction_->name == nameID) {
RepeatFunc* funcPtrCopy = currentFunction_;
// TODO: This moves out of RepeatFunc object while folly:Function can
// potentially be executed. This might be unsafe.
auto funcPtrCopy = std::make_unique<RepeatFunc>(std::move(*currentFunction_));
// This function is currently being run. Clear currentFunction_
// to avoid rescheduling it, and add the function again to honor the
// startDelay.
currentFunction_ = nullptr;
addFunctionToHeap(l, std::move(*funcPtrCopy));
addFunctionToHeap(l, std::move(funcPtrCopy));
return true;
}
// Since __adjust_heap() isn't a part of the standard API, there's no way to
// fix the heap ordering if we adjust the key (nextRunTime) for the existing
// RepeatFunc. Instead, we just cancel it and add an identical object.
for (auto it = functions_.begin(); it != functions_.end(); ++it) {
if (it->isValid() && it->name == nameID) {
RepeatFunc funcCopy(std::move(*it));
cancelFunction(l, it);
auto it = functionsMap_.find(nameID);
if (it != functionsMap_.end() && it->second->isValid()) {
auto funcCopy = std::make_unique<RepeatFunc>(std::move(*(it->second)));
it->second->cancel();
// This will take care of making sure that functionsMap_[it->first] =
// funcCopy.
addFunctionToHeap(l, std::move(funcCopy));
return true;
}
}
return false;
}
......@@ -341,11 +330,11 @@ bool FunctionScheduler::start() {
auto now = steady_clock::now();
// Reset the next run time. for all functions.
// note: this is needed since one can shutdown() and start() again
for (auto& f : functions_) {
f.resetNextRunTime(now);
VLOG(1) << " - func: " << (f.name.empty() ? "(anon)" : f.name.c_str())
<< ", period = " << f.intervalDescr
<< ", delay = " << f.startDelay.count() << "ms";
for (const auto& f : functions_) {
f->resetNextRunTime(now);
VLOG(1) << " - func: " << (f->name.empty() ? "(anon)" : f->name.c_str())
<< ", period = " << f->intervalDescr
<< ", delay = " << f->startDelay.count() << "ms";
}
std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
......@@ -391,12 +380,12 @@ void FunctionScheduler::run() {
// Check to see if the function was cancelled.
// If so, just remove it and continue around the loop.
if (!functions_.back().isValid()) {
if (!functions_.back()->isValid()) {
functions_.pop_back();
continue;
}
auto sleepTime = functions_.back().getNextRunTime() - now;
auto sleepTime = functions_.back()->getNextRunTime() - now;
if (sleepTime < milliseconds::zero()) {
// We need to run this function now
runOneFunction(lock, now);
......@@ -420,25 +409,24 @@ void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
// Fully remove it from functions_ now.
// We need to release mutex_ while we invoke this function, and we need to
// maintain the heap property on functions_ while mutex_ is unlocked.
RepeatFunc func(std::move(functions_.back()));
auto func = std::move(functions_.back());
functions_.pop_back();
if (!func.cb) {
VLOG(5) << func.name << "function has been canceled while waiting";
if (!func->cb) {
VLOG(5) << func->name << "function has been canceled while waiting";
return;
}
currentFunction_ = &func;
currentFunction_ = func.get();
// Update the function's next run time.
if (steady_) {
// This allows scheduler to catch up
func.setNextRunTimeSteady();
func->setNextRunTimeSteady();
} else {
// Note that we set nextRunTime based on the current time where we started
// the function call, rather than the time when the function finishes.
// This ensures that we call the function once every time interval, as
// opposed to waiting time interval seconds between calls. (These can be
// different if the function takes a significant amount of time to run.)
func.setNextRunTimeStrict(now);
func->setNextRunTimeStrict(now);
}
// Release the lock while we invoke the user's function
......@@ -446,11 +434,11 @@ void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
// Invoke the function
try {
VLOG(5) << "Now running " << func.name;
func.cb();
VLOG(5) << "Now running " << func->name;
func->cb();
} catch (const std::exception& ex) {
LOG(ERROR) << "Error running the scheduled function <"
<< func.name << ">: " << exceptionStr(ex);
<< func->name << ">: " << exceptionStr(ex);
}
// Re-acquire the lock
......@@ -464,17 +452,19 @@ void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
}
if (currentFunction_->runOnce) {
// Don't reschedule if the function only needed to run once.
functionsMap_.erase(currentFunction_->name);
currentFunction_ = nullptr;
return;
}
// Clear currentFunction_
CHECK_EQ(currentFunction_, &func);
currentFunction_ = nullptr;
// Re-insert the function into our functions_ heap.
// We only maintain the heap property while running_ is set. (running_ may
// have been cleared while we were invoking the user's function.)
functions_.push_back(std::move(func));
// Clear currentFunction_
currentFunction_ = nullptr;
if (running_) {
std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
}
......@@ -482,14 +472,15 @@ void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
void FunctionScheduler::addFunctionToHeap(
const std::unique_lock<std::mutex>& lock,
RepeatFunc&& func) {
std::unique_ptr<RepeatFunc> func) {
// This function should only be called with mutex_ already locked.
DCHECK(lock.mutex() == &mutex_);
DCHECK(lock.owns_lock());
functions_.emplace_back(std::move(func));
functions_.push_back(std::move(func));
functionsMap_[functions_.back()->name] = functions_.back().get();
if (running_) {
functions_.back().resetNextRunTime(steady_clock::now());
functions_.back()->resetNextRunTime(steady_clock::now());
std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
// Signal the running thread to wake up and see if it needs to change
// its current scheduling decision.
......
......@@ -18,11 +18,13 @@
#include <folly/Function.h>
#include <folly/Range.h>
#include <folly/Hash.h>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>
#include <unordered_map>
namespace folly {
......@@ -241,20 +243,21 @@ class FunctionScheduler {
};
struct RunTimeOrder {
bool operator()(const RepeatFunc& f1, const RepeatFunc& f2) const {
return f1.getNextRunTime() > f2.getNextRunTime();
bool operator()(const std::unique_ptr<RepeatFunc>& f1, const std::unique_ptr<RepeatFunc>& f2) const {
return f1->getNextRunTime() > f2->getNextRunTime();
}
};
typedef std::vector<RepeatFunc> FunctionHeap;
typedef std::vector<std::unique_ptr<RepeatFunc>> FunctionHeap;
typedef std::unordered_map<StringPiece, RepeatFunc*, Hash> FunctionMap;
void run();
void runOneFunction(std::unique_lock<std::mutex>& lock,
std::chrono::steady_clock::time_point now);
void cancelFunction(const std::unique_lock<std::mutex>& lock,
FunctionHeap::iterator it);
RepeatFunc* it);
void addFunctionToHeap(const std::unique_lock<std::mutex>& lock,
RepeatFunc&& func);
std::unique_ptr<RepeatFunc> func);
void addFunctionInternal(
Function<void()>&& cb,
......@@ -279,6 +282,7 @@ class FunctionScheduler {
// The functions to run.
// This is a heap, ordered by next run time.
FunctionHeap functions_;
FunctionMap functionsMap_;
RunTimeOrder fnCmp_;
// The function currently being invoked by the running thread.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment