Commit cfc602df authored by Giuseppe Ottaviano's avatar Giuseppe Ottaviano Committed by Facebook Github Bot

Make global executors shutdown-safe

Summary:
The `get*Executor()` APIs don't check whether the singletons
haven't been destroyed already. Add a check and allow to return
`nullptr` during shutdown.

Also do a general clean up of the code, there was no reason to use
three independent singletons (non-atomically destroyed) for each
executor.

Reviewed By: philippv, luciang

Differential Revision: D6589486

fbshipit-source-id: 20fb835db7e446bd811bbd6d5ddbc41db9e98b54
parent 3975849d
......@@ -14,6 +14,11 @@
* limitations under the License.
*/
#include <memory>
#include <thread>
#include <folly/Function.h>
#include <folly/SharedMutex.h>
#include <folly/Singleton.h>
#include <folly/executors/IOExecutor.h>
#include <folly/executors/IOThreadPoolExecutor.h>
......@@ -23,90 +28,93 @@ using namespace folly;
namespace {
// lock protecting global CPU executor
struct CPUExecutorLock {};
Singleton<RWSpinLock, CPUExecutorLock> globalCPUExecutorLock;
// global CPU executor
Singleton<std::weak_ptr<Executor>> globalCPUExecutor;
// default global CPU executor is an InlineExecutor
Singleton<std::shared_ptr<InlineExecutor>> globalInlineExecutor([] {
return new std::shared_ptr<InlineExecutor>(
std::make_shared<InlineExecutor>());
});
// lock protecting global IO executor
struct IOExecutorLock {};
Singleton<RWSpinLock, IOExecutorLock> globalIOExecutorLock;
// global IO executor
Singleton<std::weak_ptr<IOExecutor>> globalIOExecutor;
// default global IO executor is an IOThreadPoolExecutor
Singleton<std::shared_ptr<IOThreadPoolExecutor>> globalIOThreadPool([] {
return new std::shared_ptr<IOThreadPoolExecutor>(
std::make_shared<IOThreadPoolExecutor>(
sysconf(_SC_NPROCESSORS_ONLN),
std::make_shared<NamedThreadFactory>("GlobalIOThreadPool")));
});
} // namespace
namespace folly {
template <class ExecutorBase>
class GlobalExecutor {
public:
explicit GlobalExecutor(
Function<std::unique_ptr<ExecutorBase>()> constructDefault)
: constructDefault_(std::move(constructDefault)) {}
std::shared_ptr<ExecutorBase> get() {
{
SharedMutex::ReadHolder guard(mutex_);
if (auto executor = executor_.lock()) {
return executor; // Fast path.
}
}
template <class Exe, class DefaultExe, class LockTag>
std::shared_ptr<Exe> getExecutor(
Singleton<std::weak_ptr<Exe>>& sExecutor,
Singleton<std::shared_ptr<DefaultExe>>& sDefaultExecutor,
Singleton<RWSpinLock, LockTag>& sExecutorLock) {
std::shared_ptr<Exe> executor;
auto singleton = sExecutor.try_get();
auto lock = sExecutorLock.try_get();
{
RWSpinLock::ReadHolder guard(lock.get());
if ((executor = sExecutor.try_get()->lock())) {
SharedMutex::WriteHolder guard(mutex_);
if (auto executor = executor_.lock()) {
return executor;
}
if (!defaultExecutor_) {
defaultExecutor_ = constructDefault_();
}
return defaultExecutor_;
}
RWSpinLock::WriteHolder guard(lock.get());
executor = singleton->lock();
if (!executor) {
std::weak_ptr<Exe> defaultExecutor = *sDefaultExecutor.try_get().get();
executor = defaultExecutor.lock();
sExecutor.try_get().get()->swap(defaultExecutor);
void set(std::weak_ptr<ExecutorBase> executor) {
SharedMutex::WriteHolder guard(mutex_);
executor_.swap(executor);
}
return executor;
}
template <class Exe, class LockTag>
void setExecutor(
std::weak_ptr<Exe> executor,
Singleton<std::weak_ptr<Exe>>& sExecutor,
Singleton<RWSpinLock, LockTag>& sExecutorLock) {
auto lock = sExecutorLock.try_get();
RWSpinLock::WriteHolder guard(*lock);
std::weak_ptr<Exe> executor_weak = std::move(executor);
sExecutor.try_get().get()->swap(executor_weak);
}
private:
SharedMutex mutex_;
std::weak_ptr<ExecutorBase> executor_;
std::shared_ptr<ExecutorBase> defaultExecutor_;
Function<std::unique_ptr<ExecutorBase>()> constructDefault_;
};
Singleton<GlobalExecutor<Executor>> gGlobalCPUExecutor([] {
return new GlobalExecutor<Executor>(
// Default global CPU executor is an InlineExecutor.
[] { return std::make_unique<InlineExecutor>(); });
});
Singleton<GlobalExecutor<IOExecutor>> gGlobalIOExecutor([] {
return new GlobalExecutor<IOExecutor>(
// Default global IO executor is an IOThreadPoolExecutor.
[] {
return std::make_unique<IOThreadPoolExecutor>(
std::thread::hardware_concurrency(),
std::make_shared<NamedThreadFactory>("GlobalIOThreadPool"));
});
});
} // namespace
namespace folly {
std::shared_ptr<Executor> getCPUExecutor() {
return getExecutor(
globalCPUExecutor, globalInlineExecutor, globalCPUExecutorLock);
if (auto singleton = gGlobalCPUExecutor.try_get()) {
return singleton->get();
}
return nullptr;
}
void setCPUExecutor(std::weak_ptr<Executor> executor) {
setExecutor(std::move(executor), globalCPUExecutor, globalCPUExecutorLock);
if (auto singleton = gGlobalCPUExecutor.try_get()) {
singleton->set(std::move(executor));
}
}
std::shared_ptr<IOExecutor> getIOExecutor() {
return getExecutor(
globalIOExecutor, globalIOThreadPool, globalIOExecutorLock);
if (auto singleton = gGlobalIOExecutor.try_get()) {
return singleton->get();
}
return nullptr;
}
EventBase* getEventBase() {
return getIOExecutor()->getEventBase();
void setIOExecutor(std::weak_ptr<IOExecutor> executor) {
if (auto singleton = gGlobalIOExecutor.try_get()) {
singleton->set(std::move(executor));
}
}
void setIOExecutor(std::weak_ptr<IOExecutor> executor) {
setExecutor(std::move(executor), globalIOExecutor, globalIOExecutorLock);
EventBase* getEventBase() {
return getIOExecutor()->getEventBase();
}
} // namespace folly
......@@ -23,27 +23,44 @@
namespace folly {
// Retrieve the global Executor. If there is none, a default InlineExecutor
// will be constructed and returned. This is named CPUExecutor to distinguish
// it from IOExecutor below and to hint that it's intended for CPU-bound tasks.
/**
* Retrieve the global Executor. If there is none, a default InlineExecutor
* will be constructed and returned. This is named CPUExecutor to distinguish
* it from IOExecutor below and to hint that it's intended for CPU-bound tasks.
*
* Can return nullptr on shutdown.
*/
std::shared_ptr<folly::Executor> getCPUExecutor();
// Set an Executor to be the global Executor which will be returned by
// subsequent calls to getCPUExecutor().
/**
* Set an Executor to be the global Executor which will be returned by
* subsequent calls to getCPUExecutor().
*/
void setCPUExecutor(std::weak_ptr<folly::Executor> executor);
// Retrieve the global IOExecutor. If there is none, a default
// IOThreadPoolExecutor will be constructed and returned.
//
// IOExecutors differ from Executors in that they drive and provide access to
// one or more EventBases.
/**
* Retrieve the global IOExecutor. If there is none, a default
* IOThreadPoolExecutor will be constructed and returned.
*
* IOExecutors differ from Executors in that they drive and provide access to
* one or more EventBases.
*
* Can return nullptr on shutdown.
*/
std::shared_ptr<IOExecutor> getIOExecutor();
// Retrieve an event base from the global IOExecutor
folly::EventBase* getEventBase();
// Set an IOExecutor to be the global IOExecutor which will be returned by
// subsequent calls to getIOExecutor().
/**
* Set an IOExecutor to be the global IOExecutor which will be returned by
* subsequent calls to getIOExecutor().
*/
void setIOExecutor(std::weak_ptr<IOExecutor> executor);
/**
* Retrieve an event base from the global IOExecutor
*
* NOTE: This is not shutdown-safe, the returned pointer may be
* invalid during shutdown.
*/
folly::EventBase* getEventBase();
} // namespace folly
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment