Commit b3ef1edc authored by Qi Zhou, committed by Facebook Github Bot

fix folly::FunctionScheduler.cancelFunctionAndWait() hanging issue

Summary:
When
- only one function is scheduled in FunctionScheduler, and
- that function is running while cancelFunctionAndWait() is called,
FunctionScheduler::cancelFunctionAndWait() hangs forever. The root cause is that the wait condition in cancelFunctionAndWait() is incorrect:

```
runningCondvar_.wait(l, [currentFunction, this]() {
  return currentFunction != currentFunction_;
});
```

because currentFunction_ never changes while only one function is in the scheduler, so the predicate never becomes true.

The fix is to:
- set currentFunction_ to nullptr when cancelling. This also makes the internal behavior of cancelFunction() and cancelFunctionAndWait() consistent.
- introduce an additional variable, cancellingCurrentFunction_, that indicates the current function is being cancelled. After running the function, the background thread detects the cancellation of the current function and clears the variable.
- have cancelFunctionAndWait() wait on the condition variable until that variable is cleared.

cancelAllFunctionsAndWait() suffers from the same issue and is fixed the same way.

Unit tests are added to reproduce the issue.

Reviewed By: yfeldblum

Differential Revision: D5271664

fbshipit-source-id: acb223304d3eab23129907ce9ff5e57e55f1e909
parent a5766338
...@@ -213,13 +213,24 @@ void FunctionScheduler::addFunctionInternal( ...@@ -213,13 +213,24 @@ void FunctionScheduler::addFunctionInternal(
runOnce)); runOnce));
} }
bool FunctionScheduler::cancelFunction(StringPiece nameID) { bool FunctionScheduler::cancelFunctionWithLock(
std::unique_lock<std::mutex> l(mutex_); std::unique_lock<std::mutex>& lock,
StringPiece nameID) {
CHECK_EQ(lock.owns_lock(), true);
if (currentFunction_ && currentFunction_->name == nameID) { if (currentFunction_ && currentFunction_->name == nameID) {
// This function is currently being run. Clear currentFunction_ // This function is currently being run. Clear currentFunction_
// The running thread will see this and won't reschedule the function. // The running thread will see this and won't reschedule the function.
currentFunction_ = nullptr; currentFunction_ = nullptr;
cancellingCurrentFunction_ = true;
return true;
}
return false;
}
bool FunctionScheduler::cancelFunction(StringPiece nameID) {
std::unique_lock<std::mutex> l(mutex_);
if (cancelFunctionWithLock(l, nameID)) {
return true; return true;
} }
...@@ -235,11 +246,9 @@ bool FunctionScheduler::cancelFunction(StringPiece nameID) { ...@@ -235,11 +246,9 @@ bool FunctionScheduler::cancelFunction(StringPiece nameID) {
bool FunctionScheduler::cancelFunctionAndWait(StringPiece nameID) { bool FunctionScheduler::cancelFunctionAndWait(StringPiece nameID) {
std::unique_lock<std::mutex> l(mutex_); std::unique_lock<std::mutex> l(mutex_);
auto* currentFunction = currentFunction_; if (cancelFunctionWithLock(l, nameID)) {
if (currentFunction && currentFunction->name == nameID) { runningCondvar_.wait(l, [this]() { return !cancellingCurrentFunction_; });
runningCondvar_.wait(l, [currentFunction, this]() { return true;
return currentFunction != currentFunction_;
});
} }
for (auto it = functions_.begin(); it != functions_.end(); ++it) { for (auto it = functions_.begin(); it != functions_.end(); ++it) {
...@@ -272,18 +281,27 @@ void FunctionScheduler::cancelFunction(const std::unique_lock<std::mutex>& l, ...@@ -272,18 +281,27 @@ void FunctionScheduler::cancelFunction(const std::unique_lock<std::mutex>& l,
} }
} }
void FunctionScheduler::cancelAllFunctions() { bool FunctionScheduler::cancelAllFunctionsWithLock(
std::unique_lock<std::mutex> l(mutex_); std::unique_lock<std::mutex>& lock) {
CHECK_EQ(lock.owns_lock(), true);
functions_.clear(); functions_.clear();
if (currentFunction_) {
cancellingCurrentFunction_ = true;
}
currentFunction_ = nullptr; currentFunction_ = nullptr;
return cancellingCurrentFunction_;
}
void FunctionScheduler::cancelAllFunctions() {
std::unique_lock<std::mutex> l(mutex_);
cancelAllFunctionsWithLock(l);
} }
void FunctionScheduler::cancelAllFunctionsAndWait() { void FunctionScheduler::cancelAllFunctionsAndWait() {
std::unique_lock<std::mutex> l(mutex_); std::unique_lock<std::mutex> l(mutex_);
if (currentFunction_) { if (cancelAllFunctionsWithLock(l)) {
runningCondvar_.wait(l, [this]() { return currentFunction_ == nullptr; }); runningCondvar_.wait(l, [this]() { return !cancellingCurrentFunction_; });
} }
functions_.clear();
} }
bool FunctionScheduler::resetFunctionTimer(StringPiece nameID) { bool FunctionScheduler::resetFunctionTimer(StringPiece nameID) {
...@@ -441,6 +459,7 @@ void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock, ...@@ -441,6 +459,7 @@ void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
if (!currentFunction_) { if (!currentFunction_) {
// The function was cancelled while we were running it. // The function was cancelled while we were running it.
// We shouldn't reschedule it; // We shouldn't reschedule it;
cancellingCurrentFunction_ = false;
return; return;
} }
if (currentFunction_->runOnce) { if (currentFunction_->runOnce) {
......
...@@ -264,6 +264,12 @@ class FunctionScheduler { ...@@ -264,6 +264,12 @@ class FunctionScheduler {
std::chrono::milliseconds startDelay, std::chrono::milliseconds startDelay,
bool runOnce); bool runOnce);
// Return true if the current function is being canceled
bool cancelAllFunctionsWithLock(std::unique_lock<std::mutex>& lock);
bool cancelFunctionWithLock(
std::unique_lock<std::mutex>& lock,
StringPiece nameID);
std::thread thread_; std::thread thread_;
// Mutex to protect our member variables. // Mutex to protect our member variables.
...@@ -285,6 +291,7 @@ class FunctionScheduler { ...@@ -285,6 +291,7 @@ class FunctionScheduler {
std::string threadName_; std::string threadName_;
bool steady_{false}; bool steady_{false};
bool cancellingCurrentFunction_{false};
}; };
} }
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include <cassert> #include <cassert>
#include <random> #include <random>
#include <folly/Baton.h>
#include <folly/Random.h> #include <folly/Random.h>
#include <folly/experimental/FunctionScheduler.h> #include <folly/experimental/FunctionScheduler.h>
#include <folly/portability/GTest.h> #include <folly/portability/GTest.h>
...@@ -567,3 +568,77 @@ TEST(FunctionScheduler, cancelAllFunctionsAndWait) { ...@@ -567,3 +568,77 @@ TEST(FunctionScheduler, cancelAllFunctionsAndWait) {
EXPECT_FALSE(fs.cancelFunction("add2")); // add2 has been canceled EXPECT_FALSE(fs.cancelFunction("add2")); // add2 has been canceled
fs.shutdown(); fs.shutdown();
} }
// Checks that cancelFunctionAndWait() returns when it is asked to cancel the
// only scheduled function. The scenario runs on a helper thread and signals a
// baton when it finishes, so a call that never returns surfaces as a failed
// timed wait on the baton.
TEST(FunctionScheduler, CancelAndWaitOnRunningFunc) {
  folly::Baton<> baton;
  auto scenario = [&baton]() {
    FunctionScheduler fs;
    fs.addFunction([] { delay(10); }, testInterval(2), "func");
    fs.start();
    // delay() is a helper defined elsewhere in this test file; presumably this
    // pause lets "func" start running before it is cancelled -- TODO confirm.
    delay(1);
    EXPECT_TRUE(fs.cancelFunctionAndWait("func"));
    // Reached only after cancelFunctionAndWait() has returned.
    baton.post();
  };
  std::thread worker(scenario);
  ASSERT_TRUE(baton.timed_wait(testInterval(15)));
  worker.join();
}
// Checks that cancelAllFunctionsAndWait() returns when the scheduler holds a
// single function. The scenario runs on a helper thread and signals a baton
// when it finishes, so a call that never returns surfaces as a failed timed
// wait on the baton.
TEST(FunctionScheduler, CancelAllAndWaitWithRunningFunc) {
  folly::Baton<> baton;
  auto scenario = [&baton]() {
    FunctionScheduler fs;
    fs.addFunction([] { delay(10); }, testInterval(2), "func");
    fs.start();
    // delay() is a helper defined elsewhere in this test file; presumably this
    // pause lets "func" start running before everything is cancelled -- TODO
    // confirm.
    delay(1);
    fs.cancelAllFunctionsAndWait();
    // Reached only after cancelAllFunctionsAndWait() has returned.
    baton.post();
  };
  std::thread worker(scenario);
  ASSERT_TRUE(baton.timed_wait(testInterval(15)));
  worker.join();
}
// Checks cancelAllFunctionsAndWait() with two scheduled functions: "func0",
// added without a start delay argument, and "func1", added with a start delay
// of testInterval(5). Both bump a shared counter when invoked. After the
// cancel call returns, exactly one invocation is expected to have happened.
// The scenario runs on a helper thread and signals a baton when it finishes,
// so a call that never returns surfaces as a failed timed wait on the baton.
TEST(FunctionScheduler, CancelAllAndWaitWithOneRunningAndOneWaiting) {
  folly::Baton<> baton;
  auto scenario = [&baton]() {
    // Number of times either scheduled function has been entered.
    std::atomic<int> nExecuted(0);
    FunctionScheduler fs;
    fs.addFunction(
        [&nExecuted] {
          nExecuted++;
          delay(10);
        },
        testInterval(2),
        "func0");
    fs.addFunction(
        [&nExecuted] {
          nExecuted++;
          delay(10);
        },
        testInterval(2),
        "func1",
        testInterval(5));
    fs.start();
    // delay() is a helper defined elsewhere in this test file; presumably this
    // pause lets "func0" start while "func1" is still waiting out its start
    // delay -- TODO confirm.
    delay(1);
    fs.cancelAllFunctionsAndWait();
    EXPECT_EQ(nExecuted, 1);
    // Reached only after cancelAllFunctionsAndWait() has returned.
    baton.post();
  };
  std::thread worker(scenario);
  ASSERT_TRUE(baton.timed_wait(testInterval(15)));
  worker.join();
}
// Issues two cancelFunctionAndWait("func") calls from separate threads, the
// second started one delay(1) after the first. The first call is expected to
// report success and the second is expected to report failure.
TEST(FunctionScheduler, ConcurrentCancelFunctionAndWait) {
  FunctionScheduler fs;
  fs.addFunction([] { delay(10); }, testInterval(2), "func");
  fs.start();
  // delay() is a helper defined elsewhere in this test file; presumably this
  // pause lets "func" start running before the first cancel -- TODO confirm.
  delay(1);

  auto expectCancelled = [&fs] {
    EXPECT_TRUE(fs.cancelFunctionAndWait("func"));
  };
  std::thread firstCanceller(expectCancelled);

  // Stagger the second caller behind the first.
  delay(1);

  auto expectNothingToCancel = [&fs] {
    EXPECT_FALSE(fs.cancelFunctionAndWait("func"));
  };
  std::thread secondCanceller(expectNothingToCancel);

  firstCanceller.join();
  secondCanceller.join();
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment