Commit 3c383a2d authored by Lee Howes's avatar Lee Howes Committed by Facebook Github Bot

Add new immutable global CPU executor that is guaranteed to not be changed.

Summary:
The mutable global executor is problematic: it defaults to Inline, and it is constructed off of an error-prone weak_ptr and it returns a shared_ptr. A KeepAlive-based global immutable executor is a cleaner default.

This change adds immutable global executors, and moves the default executors into separate singletons for cleaner interaction between the mutable executor and the underlying default immutable ones, including the default inline global cpu executor.

Reviewed By: andriigrynenko

Differential Revision: D18513433

fbshipit-source-id: 0ad825c34cc7ba935f57ff81adb8cff3bf001a45
parent 7f4492d6
......@@ -20,40 +20,78 @@
#include <folly/Function.h>
#include <folly/SharedMutex.h>
#include <folly/Singleton.h>
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/executors/GlobalExecutor.h>
#include <folly/executors/IOExecutor.h>
#include <folly/executors/IOThreadPoolExecutor.h>
#include <folly/executors/InlineExecutor.h>
#include <folly/system/HardwareConcurrency.h>
using namespace folly;
namespace {
class GlobalTag {};
// aka InlineExecutor
class DefaultCPUExecutor : public Executor {
public:
FOLLY_NOINLINE void add(Func f) override {
f();
}
};
Singleton<std::shared_ptr<DefaultCPUExecutor>> gDefaultGlobalCPUExecutor([] {
return new std::shared_ptr<DefaultCPUExecutor>(new DefaultCPUExecutor{});
});
Singleton<std::shared_ptr<CPUThreadPoolExecutor>, GlobalTag>
gImmutableGlobalCPUExecutor([] {
return new std::shared_ptr<CPUThreadPoolExecutor>(
new CPUThreadPoolExecutor(
folly::hardware_concurrency(),
std::make_shared<NamedThreadFactory>("GlobalCPUThreadPool")));
});
Singleton<std::shared_ptr<IOThreadPoolExecutor>, GlobalTag>
gImmutableGlobalIOExecutor([] {
return new std::shared_ptr<IOThreadPoolExecutor>(new IOThreadPoolExecutor(
folly::hardware_concurrency(),
std::make_shared<NamedThreadFactory>("GlobalIOThreadPool")));
});
template <class ExecutorBase>
std::shared_ptr<ExecutorBase> getImmutable();
template <>
std::shared_ptr<Executor> getImmutable() {
if (auto executorPtrPtr = gImmutableGlobalCPUExecutor.try_get()) {
return *executorPtrPtr;
}
return nullptr;
}
template <>
std::shared_ptr<IOExecutor> getImmutable() {
if (auto executorPtrPtr = gImmutableGlobalIOExecutor.try_get()) {
return *executorPtrPtr;
}
return nullptr;
}
template <class ExecutorBase>
class GlobalExecutor {
public:
explicit GlobalExecutor(
Function<std::unique_ptr<ExecutorBase>()> constructDefault)
: constructDefault_(std::move(constructDefault)) {}
Function<std::shared_ptr<ExecutorBase>()> constructDefault)
: getDefault_(std::move(constructDefault)) {}
std::shared_ptr<ExecutorBase> get() {
{
SharedMutex::ReadHolder guard(mutex_);
if (auto executor = executor_.lock()) {
return executor; // Fast path.
}
}
SharedMutex::WriteHolder guard(mutex_);
if (auto executor = executor_.lock()) {
return executor;
}
if (!defaultExecutor_) {
defaultExecutor_ = constructDefault_();
}
return defaultExecutor_;
return getDefault_();
}
void set(std::weak_ptr<ExecutorBase> executor) {
......@@ -61,65 +99,77 @@ class GlobalExecutor {
executor_.swap(executor);
}
// Replace the constructDefault function to use the immutable singleton
// rather than the default singleton
void setFromImmutable() {
SharedMutex::WriteHolder guard(mutex_);
getDefault_ = [] { return getImmutable<ExecutorBase>(); };
executor_ = std::weak_ptr<ExecutorBase>{};
}
private:
SharedMutex mutex_;
std::weak_ptr<ExecutorBase> executor_;
std::shared_ptr<ExecutorBase> defaultExecutor_;
Function<std::unique_ptr<ExecutorBase>()> constructDefault_;
Function<std::shared_ptr<ExecutorBase>()> getDefault_;
};
// aka InlineExecutor
class DefaultCPUExecutor : public Executor {
public:
FOLLY_NOINLINE void add(Func f) override {
f();
}
};
Singleton<GlobalExecutor<Executor>> gGlobalCPUExecutor([] {
LeakySingleton<GlobalExecutor<Executor>> gGlobalCPUExecutor([] {
return new GlobalExecutor<Executor>(
// Default global CPU executor is an InlineExecutor.
[] { return std::make_unique<DefaultCPUExecutor>(); });
[] {
if (auto executorPtrPtr = gDefaultGlobalCPUExecutor.try_get()) {
return *executorPtrPtr;
}
return std::shared_ptr<DefaultCPUExecutor>{};
});
});
Singleton<GlobalExecutor<IOExecutor>> gGlobalIOExecutor([] {
LeakySingleton<GlobalExecutor<IOExecutor>> gGlobalIOExecutor([] {
return new GlobalExecutor<IOExecutor>(
// Default global IO executor is an IOThreadPoolExecutor.
[] {
return std::make_unique<IOThreadPoolExecutor>(
folly::hardware_concurrency(),
std::make_shared<NamedThreadFactory>("GlobalIOThreadPool"));
});
[] { return getImmutable<IOExecutor>(); });
});
} // namespace
namespace folly {
std::shared_ptr<Executor> getCPUExecutor() {
if (auto singleton = gGlobalCPUExecutor.try_get()) {
return singleton->get();
Executor::KeepAlive<> getGlobalCPUExecutor() {
auto executorPtr = getImmutable<Executor>();
if (!executorPtr) {
throw std::runtime_error("Requested global CPU executor during shutdown.");
}
return nullptr;
return folly::getKeepAliveToken(executorPtr.get());
}
void setCPUExecutor(std::weak_ptr<Executor> executor) {
if (auto singleton = gGlobalCPUExecutor.try_get()) {
singleton->set(std::move(executor));
Executor::KeepAlive<> getGlobalIOExecutor() {
auto executorPtr = getImmutable<IOExecutor>();
if (!executorPtr) {
throw std::runtime_error("Requested global IO executor during shutdown.");
}
return folly::getKeepAliveToken(executorPtr.get());
}
std::shared_ptr<Executor> getCPUExecutor() {
auto& singleton = gGlobalCPUExecutor.get();
return singleton.get();
}
void setCPUExecutorToGlobalCPUExecutor() {
gGlobalCPUExecutor.get().setFromImmutable();
}
void setCPUExecutor(std::weak_ptr<Executor> executor) {
gGlobalCPUExecutor.get().set(std::move(executor));
}
std::shared_ptr<IOExecutor> getIOExecutor() {
if (auto singleton = gGlobalIOExecutor.try_get()) {
return singleton->get();
}
return nullptr;
return gGlobalIOExecutor.get().get();
}
void setIOExecutor(std::weak_ptr<IOExecutor> executor) {
if (auto singleton = gGlobalIOExecutor.try_get()) {
singleton->set(std::move(executor));
}
gGlobalIOExecutor.get().set(std::move(executor));
}
EventBase* getEventBase() {
......
......@@ -23,6 +23,22 @@
namespace folly {
/**
* Return the global executor.
* The global executor is a CPU thread pool and is immutable.
*
* May return an invalid KeepAlive on shutdown.
*/
folly::Executor::KeepAlive<> getGlobalCPUExecutor();
/**
* Return the global IO executor.
* The global executor is an IO thread pool and is immutable.
*
* May return an invalid KeepAlive on shutdown.
*/
folly::Executor::KeepAlive<> getGlobalIOExecutor();
/**
* Retrieve the global Executor. If there is none, a default InlineExecutor
* will be constructed and returned. This is named CPUExecutor to distinguish
......@@ -38,6 +54,12 @@ std::shared_ptr<folly::Executor> getCPUExecutor();
*/
void setCPUExecutor(std::weak_ptr<folly::Executor> executor);
/**
* Set the CPU executor to the immutable default returned by
* getGlobalCPUExecutor.
*/
void setCPUExecutorToGlobalCPUExecutor();
/**
* Retrieve the global IOExecutor. If there is none, a default
* IOThreadPoolExecutor will be constructed and returned.
......
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/Singleton.h>
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/executors/GlobalExecutor.h>
#include <folly/executors/IOExecutor.h>
#include <folly/executors/InlineExecutor.h>
#include <folly/portability/GTest.h>
#include <folly/synchronization/Baton.h>
using namespace folly;
TEST(GlobalExecutorAssignmentTest, GlobalCPUExecutorAsImmutable) {
folly::Baton b;
// The default CPU executor is a synchronous inline executor, lets verify
// that work we add is executed
auto count = 0;
auto f = [&]() {
count++;
b.post();
};
{
auto inlineExec = getCPUExecutor();
EXPECT_EQ(
dynamic_cast<folly::CPUThreadPoolExecutor*>(inlineExec.get()), nullptr);
setCPUExecutorToGlobalCPUExecutor();
auto cpuExec = getCPUExecutor();
EXPECT_NE(
dynamic_cast<folly::CPUThreadPoolExecutor*>(cpuExec.get()), nullptr);
// Verify execution
getCPUExecutor()->add(f);
b.wait();
EXPECT_EQ(1, count);
}
}
......@@ -15,29 +15,58 @@
*/
#include <folly/executors/GlobalExecutor.h>
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/executors/IOExecutor.h>
#include <folly/portability/GTest.h>
#include <folly/synchronization/Baton.h>
using namespace folly;
TEST(GlobalExecutorTest, GlobalImmutableCPUExecutor) {
folly::Baton<> b;
auto count = 0;
auto f = [&]() {
count++;
b.post();
};
// Don't explode, we should create the default global CPUExecutor lazily here.
getGlobalCPUExecutor()->add(f);
b.wait();
EXPECT_EQ(1, count);
}
TEST(GlobalExecutorTest, GlobalImmutableIOExecutor) {
folly::Baton<> b;
auto count = 0;
auto f = [&]() {
count++;
b.post();
};
// Don't explode, we should create the default global CPUExecutor lazily here.
getGlobalIOExecutor()->add(f);
b.wait();
EXPECT_EQ(1, count);
}
TEST(GlobalExecutorTest, GlobalCPUExecutor) {
class DummyExecutor : public folly::Executor {
public:
void add(Func f) override {
f();
void add(Func fn) override {
fn();
count++;
}
int count{0};
};
// The default CPU executor is a synchronous inline executor, lets verify
// that work we add is executed
auto count = 0;
auto f = [&]() { count++; };
// The default CPU executor is a thread pool
auto f = [&]() {};
// Don't explode, we should create the default global CPUExecutor lazily here.
getCPUExecutor()->add(f);
EXPECT_EQ(1, count);
{
auto dummy = std::make_shared<DummyExecutor>();
......@@ -45,13 +74,11 @@ TEST(GlobalExecutorTest, GlobalCPUExecutor) {
getCPUExecutor()->add(f);
// Make sure we were properly installed.
EXPECT_EQ(1, dummy->count);
EXPECT_EQ(2, count);
}
// Don't explode, we should restore the default global CPUExecutor because our
// weak reference to dummy has expired
getCPUExecutor()->add(f);
EXPECT_EQ(3, count);
}
TEST(GlobalExecutorTest, GlobalIOExecutor) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment