Commit d7c40bc1 authored by Misha Shneerson's avatar Misha Shneerson Committed by Facebook GitHub Bot

Fix race conditions on shutdown of MeteredExecutor

Summary: Allow scheduling a "poison" task to indicate shutdown

Reviewed By: andriigrynenko

Differential Revision: D25609015

fbshipit-source-id: 8a42b99b402d78db7cfa3bd194730bb6948567c6
parent ed4b5a77
...@@ -44,10 +44,6 @@ void MeteredExecutor::add(Func func) { ...@@ -44,10 +44,6 @@ void MeteredExecutor::add(Func func) {
} }
void MeteredExecutor::loopCallback() { void MeteredExecutor::loopCallback() {
if (UNLIKELY(draining_)) {
return drain();
}
Consumer consumer(*this); Consumer consumer(*this);
if (queue_.drive(consumer) || !queue_.arm()) { if (queue_.drive(consumer) || !queue_.arm()) {
scheduleCallback(); scheduleCallback();
...@@ -57,27 +53,11 @@ void MeteredExecutor::loopCallback() { ...@@ -57,27 +53,11 @@ void MeteredExecutor::loopCallback() {
void MeteredExecutor::scheduleCallback() { void MeteredExecutor::scheduleCallback() {
folly::RequestContextScopeGuard g{std::shared_ptr<RequestContext>()}; folly::RequestContextScopeGuard g{std::shared_ptr<RequestContext>()};
kaInner_->add([this] { loopCallback(); }); kaInner_->add([self = getKeepAliveToken(this)] { self->loopCallback(); });
}
void MeteredExecutor::drain() {
bool keepDriving = true;
while (keepDriving) {
Consumer consumer(*this);
keepDriving = queue_.drive(consumer);
consumer.executeIfNotEmpty();
}
drained_.post();
} }
MeteredExecutor::~MeteredExecutor() { MeteredExecutor::~MeteredExecutor() {
joinKeepAlive(); joinKeepAlive();
// Shutdown sequence deserves explanation.
// After below task is consumed, consumer transitions into "draining"
// state and a loopcallback is rescheduled. Once loopback is executed again,
// it drains the queue and signals the baton.
add([&] { draining_ = true; });
drained_.wait();
} }
MeteredExecutor::Consumer::~Consumer() { MeteredExecutor::Consumer::~Consumer() {
......
...@@ -54,7 +54,6 @@ class MeteredExecutor : public DefaultKeepAliveExecutor { ...@@ -54,7 +54,6 @@ class MeteredExecutor : public DefaultKeepAliveExecutor {
private: private:
void loopCallback(); void loopCallback();
void scheduleCallback(); void scheduleCallback();
void drain();
class Consumer { class Consumer {
Func first_; Func first_;
...@@ -70,8 +69,6 @@ class MeteredExecutor : public DefaultKeepAliveExecutor { ...@@ -70,8 +69,6 @@ class MeteredExecutor : public DefaultKeepAliveExecutor {
folly::AtomicNotificationQueue<Func> queue_; folly::AtomicNotificationQueue<Func> queue_;
std::unique_ptr<Executor> ownedExecutor_; std::unique_ptr<Executor> ownedExecutor_;
KeepAlive kaInner_; KeepAlive kaInner_;
bool draining_{false};
folly::Baton<> drained_;
}; };
} // namespace folly } // namespace folly
...@@ -14,8 +14,10 @@ ...@@ -14,8 +14,10 @@
* limitations under the License. * limitations under the License.
*/ */
#include <folly/executors/MeteredExecutor.h> #include <list>
#include <folly/executors/CPUThreadPoolExecutor.h> #include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/executors/MeteredExecutor.h>
#include <folly/experimental/coro/BlockingWait.h> #include <folly/experimental/coro/BlockingWait.h>
#include <folly/experimental/coro/Task.h> #include <folly/experimental/coro/Task.h>
#include <folly/portability/GTest.h> #include <folly/portability/GTest.h>
...@@ -24,10 +26,12 @@ using namespace folly; ...@@ -24,10 +26,12 @@ using namespace folly;
class MeteredExecutorTest : public testing::Test { class MeteredExecutorTest : public testing::Test {
protected: protected:
void createAdapter(int numLevels, int maxReadAtOnce = 1) { void createAdapter(
int numLevels,
int maxReadAtOnce = 1,
std::unique_ptr<Executor> exc =
std::make_unique<CPUThreadPoolExecutor>(1)) {
executors_.resize(numLevels + 1); executors_.resize(numLevels + 1);
std::unique_ptr<folly::Executor> exc =
std::make_unique<CPUThreadPoolExecutor>(1);
executors_[0] = exc.get(); executors_[0] = exc.get();
for (int i = 0; i < numLevels; i++) { for (int i = 0; i < numLevels; i++) {
auto mlsa = std::make_unique<MeteredExecutor>(std::move(exc)); auto mlsa = std::make_unique<MeteredExecutor>(std::move(exc));
...@@ -252,6 +256,76 @@ TEST_F(MeteredExecutorTest, ResetJoins) { ...@@ -252,6 +256,76 @@ TEST_F(MeteredExecutorTest, ResetJoins) {
EXPECT_EQ(v, 5); EXPECT_EQ(v, 5);
} }
TEST_F(MeteredExecutorTest, ConcurrentShutdown) {
// ensure no data races on shutdown when executor has 2 threads
createAdapter(2, 1, std::make_unique<CPUThreadPoolExecutor>(2));
}
TEST_F(MeteredExecutorTest, CostOfMeteredExecutors) {
// This test is to demonstrate how many tasks are scheduled
// on the primarty executor when it is wrapped by MeteredExecutors
class MyExecutor : public folly::Executor {
public:
int count{0};
bool driveWhenAdded{false};
std::list<folly::Func> queue;
void add(folly::Func f) override {
++count;
queue.push_back(std::move(f));
if (driveWhenAdded) {
drive();
}
}
void drive() {
while (!queue.empty()) {
auto ff = std::move(queue.front());
queue.pop_front();
ff();
}
}
};
auto exc = std::make_unique<MyExecutor>();
auto drive = [exc = exc.get()] { exc->drive(); };
auto getCount = [exc = exc.get()] { return std::exchange(exc->count, 0); };
auto driveOnAdd = [exc = exc.get()] { exc->driveWhenAdded = true; };
createAdapter(3, 1, std::move(exc));
// When queues are empty, we will schedule as many tasks on the main
// executor as there are executors in the chain.
add([&] {}, 0);
drive();
EXPECT_EQ(1, getCount());
add([&] {}, 1);
drive();
EXPECT_EQ(2, getCount());
add([&] {}, 2);
drive();
EXPECT_EQ(3, getCount());
add([&] {}, 3);
drive();
EXPECT_EQ(4, getCount());
add([&] {}, 3);
drive();
EXPECT_EQ(4, getCount());
// However, when queues are not empty, each additional task
// scheduled on any MeteredExecutor, results in a only one more
// task scheduled onto the main executor.
add([&] {}, 3);
add([&] {}, 3);
add([&] {}, 3);
drive();
EXPECT_EQ(6, getCount());
// To allow shutting down properly
driveOnAdd();
}
TEST_F(MeteredExecutorTest, ExceptionHandling) { TEST_F(MeteredExecutorTest, ExceptionHandling) {
createAdapter(2); createAdapter(2);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment