Commit 3490aa71 authored by James Sedgwick, committed by JoelMarcey

global io executor

Summary:
This is something we've talked about for a while. It's also an alternative to the mechanism in D1714645.
If we like it, I'll do something similar for a global CPU executor. That functionality should probably just be
baked into Executor itself instead of a separate subclass, which is why the IOExecutor stuff is in Executor.h/.cpp;
it'll be pretty similar. The main exception is that getCPUExecutor() could return a default global
InlineExecutor instead of exploding as getIOExecutor() does.

Test Plan: wangle unit, will start plumbing this into the services in #5003045 if we like it

Reviewed By: davejwatson@fb.com

Subscribers: hannesr, trunkagent, fugalh, alandau, mshneer, folly-diffs@, bmatheny

FB internal diff: D1727894

Tasks: 5002442

Signature: t1:1727894:1418344077:9e54088a6acb3f78e53011a32fd1dfe8b3214c1d
parent 973d5f61
Makefile.am

@@ -82,6 +82,8 @@ nobase_follyinclude_HEADERS = \
   experimental/wangle/concurrent/Codel.h \
   experimental/wangle/concurrent/CPUThreadPoolExecutor.h \
   experimental/wangle/concurrent/FutureExecutor.h \
+  experimental/wangle/concurrent/GlobalExecutor.h \
+  experimental/wangle/concurrent/IOExecutor.h \
   experimental/wangle/concurrent/IOThreadPoolExecutor.h \
   experimental/wangle/concurrent/LifoSemMPMCQueue.h \
   experimental/wangle/concurrent/NamedThreadFactory.h \
@@ -328,6 +330,8 @@ libfolly_la_SOURCES = \
   experimental/TestUtil.cpp \
   experimental/wangle/concurrent/CPUThreadPoolExecutor.cpp \
   experimental/wangle/concurrent/Codel.cpp \
+  experimental/wangle/concurrent/GlobalExecutor.cpp \
+  experimental/wangle/concurrent/IOExecutor.cpp \
   experimental/wangle/concurrent/IOThreadPoolExecutor.cpp \
   experimental/wangle/concurrent/ThreadPoolExecutor.cpp \
   experimental/wangle/ConnectionManager.cpp \
...
experimental/wangle/concurrent/GlobalExecutor.cpp (new file)

/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/experimental/Singleton.h>
#include <folly/experimental/wangle/concurrent/GlobalExecutor.h>
#include <folly/experimental/wangle/concurrent/IOExecutor.h>
#include <folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h>

using namespace folly;
using namespace folly::wangle;

namespace {

// Default global IO pool: constructed lazily, sized to the number of online
// cores.
Singleton<IOThreadPoolExecutor> globalIOThreadPoolSingleton(
    "GlobalIOThreadPool",
    [](){
      return new IOThreadPoolExecutor(
          sysconf(_SC_NPROCESSORS_ONLN),
          std::make_shared<NamedThreadFactory>("GlobalIOThreadPool"));
    });

}

namespace folly { namespace wangle {

IOExecutor* getIOExecutor() {
  auto singleton = IOExecutor::getSingleton();
  auto executor = singleton->load();
  // If no global executor has been set, try to install the default pool.
  // Loop in case a concurrent setIOExecutor() or executor destruction races
  // with us.
  while (!executor) {
    IOExecutor* nullIOExecutor = nullptr;
    singleton->compare_exchange_strong(
        nullIOExecutor,
        Singleton<IOThreadPoolExecutor>::get("GlobalIOThreadPool"));
    executor = singleton->load();
  }
  return executor;
}

void setIOExecutor(IOExecutor* executor) {
  IOExecutor::getSingleton()->store(executor);
}

}} // folly::wangle
experimental/wangle/concurrent/GlobalExecutor.h (new file)

/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

namespace folly { namespace wangle {

class IOExecutor;

// Retrieve the global IOExecutor. If there is none, a default
// IOThreadPoolExecutor will be constructed and returned.
IOExecutor* getIOExecutor();

// Set an IOExecutor to be the global IOExecutor, which will be returned by
// subsequent calls to getIOExecutor(). IOExecutors will uninstall themselves
// as global when they are destructed.
void setIOExecutor(IOExecutor* executor);

}}
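A brief usage sketch to make the API above concrete. It relies only on getIOExecutor()/setIOExecutor() and the IOThreadPoolExecutor constructor used elsewhere in this commit; the function name, pool name, and pool size are illustrative assumptions, not part of the diff.

#include <memory>

#include <folly/experimental/wangle/concurrent/GlobalExecutor.h>
#include <folly/experimental/wangle/concurrent/IOExecutor.h>
#include <folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h>
#include <folly/experimental/wangle/concurrent/NamedThreadFactory.h>

void globalExecutorSketch() {
  // The first call lazily constructs the default global IOThreadPoolExecutor.
  folly::wangle::getIOExecutor()->add([] { /* runs on a global IO thread */ });

  // Install a custom pool as the global IOExecutor. It uninstalls itself when
  // destroyed (see IOExecutor's destructor below).
  folly::wangle::IOThreadPoolExecutor myPool(
      4, std::make_shared<folly::wangle::NamedThreadFactory>("MyIOPool"));
  folly::wangle::setIOExecutor(&myPool);
  folly::wangle::getIOExecutor()->add([] { /* runs on myPool */ });
}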
experimental/wangle/concurrent/IOExecutor.cpp (new file)

/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/experimental/wangle/concurrent/IOExecutor.h>

#include <folly/experimental/Singleton.h>
#include <folly/experimental/wangle/concurrent/GlobalExecutor.h>

using folly::Singleton;
using folly::wangle::IOExecutor;

namespace {

// Holds the currently installed global IOExecutor, or nullptr if none is set.
Singleton<std::atomic<IOExecutor*>> globalIOExecutorSingleton(
    "GlobalIOExecutor",
    [](){
      return new std::atomic<IOExecutor*>(nullptr);
    });

}

namespace folly { namespace wangle {

IOExecutor::~IOExecutor() {
  // If this executor is currently installed as the global IOExecutor,
  // uninstall it so getIOExecutor() never returns a dangling pointer.
  auto thisCopy = this;
  try {
    getSingleton()->compare_exchange_strong(thisCopy, nullptr);
  } catch (const std::runtime_error& e) {
    // The global IOExecutor singleton was already destructed, so there is
    // nothing to uninstall. Ignore.
  }
}

std::atomic<IOExecutor*>* IOExecutor::getSingleton() {
  return Singleton<std::atomic<IOExecutor*>>::get("GlobalIOExecutor");
}

}} // folly::wangle
experimental/wangle/concurrent/IOExecutor.h (new file)

/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <atomic>

#include <folly/Executor.h>

namespace folly {
class EventBase;
}

namespace folly { namespace wangle {

// An IOExecutor is an executor that operates on at least one EventBase. One of
// these EventBases should be accessible via getEventBase(). The EventBase
// returned by a call to getEventBase() is implementation dependent.
//
// Note that IOExecutors don't necessarily loop on the base themselves - for
// instance, EventBase itself is an IOExecutor but doesn't drive itself.
//
// Implementations of IOExecutor are eligible to become the global IO executor,
// returned on every call to getIOExecutor(), via setIOExecutor(). These
// functions are declared in GlobalExecutor.h.
//
// If getIOExecutor() is called and none has been set, a default global
// IOThreadPoolExecutor will be created and returned.
class IOExecutor : public virtual Executor {
 public:
  virtual ~IOExecutor();
  virtual EventBase* getEventBase() = 0;

 private:
  static std::atomic<IOExecutor*>* getSingleton();

  friend IOExecutor* getIOExecutor();
  friend void setIOExecutor(IOExecutor* executor);
};

}}
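A hedged sketch of how a caller might use getEventBase(); the function name is an assumption, and it leans on folly's EventBase::runInEventBaseThread() for posting the callback.

#include <folly/experimental/wangle/concurrent/GlobalExecutor.h>
#include <folly/experimental/wangle/concurrent/IOExecutor.h>
#include <folly/io/async/EventBase.h>

void runOnAnIOThread() {
  // Which EventBase comes back is implementation dependent (see the class
  // comment above); for the default global IOThreadPoolExecutor it is the
  // base of one of the pool's threads.
  folly::EventBase* evb = folly::wangle::getIOExecutor()->getEventBase();
  evb->runInEventBaseThread([] {
    // Now running in the thread that loops on evb.
  });
}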
experimental/wangle/concurrent/IOThreadPoolExecutor.cpp

@@ -88,8 +88,7 @@ void IOThreadPoolExecutor::add(
   if (threadList_.get().empty()) {
     throw std::runtime_error("No threads available");
   }
-  auto thread = threadList_.get()[nextThread_++ % threadList_.get().size()];
-  auto ioThread = std::static_pointer_cast<IOThread>(thread);
+  auto ioThread = pickThread();
   auto moveTask = folly::makeMoveWrapper(
       Task(std::move(func), expiration, std::move(expireCallback)));
@@ -105,6 +104,19 @@ void IOThreadPoolExecutor::add(
   }
 }
 
+std::shared_ptr<IOThreadPoolExecutor::IOThread>
+IOThreadPoolExecutor::pickThread() {
+  if (*thisThread_) {
+    return *thisThread_;
+  }
+  auto thread = threadList_.get()[nextThread_++ % threadList_.get().size()];
+  return std::static_pointer_cast<IOThread>(thread);
+}
+
+EventBase* IOThreadPoolExecutor::getEventBase() {
+  return pickThread()->eventBase;
+}
+
 std::shared_ptr<ThreadPoolExecutor::Thread>
 IOThreadPoolExecutor::makeThread() {
   return std::make_shared<IOThread>(this);
@@ -114,6 +126,7 @@ void IOThreadPoolExecutor::threadRun(ThreadPtr thread) {
   const auto ioThread = std::static_pointer_cast<IOThread>(thread);
   ioThread->eventBase =
       folly::EventBaseManager::get()->getEventBase();
+  thisThread_.reset(new std::shared_ptr<IOThread>(ioThread));
   auto idler = new MemoryIdlerTimeout(ioThread->eventBase);
   ioThread->eventBase->runBeforeLoop(idler);
...
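The new thisThread_ bookkeeping gives nested adds thread affinity. A sketch of the resulting behavior follows; the pool size, thread-factory name, and the use of folly::Baton to keep the pool alive are illustrative assumptions, not part of this commit.

#include <memory>

#include <folly/Baton.h>
#include <folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h>
#include <folly/experimental/wangle/concurrent/NamedThreadFactory.h>

void affinitySketch() {
  folly::wangle::IOThreadPoolExecutor pool(
      4, std::make_shared<folly::wangle::NamedThreadFactory>("ExamplePool"));
  folly::Baton<> done;

  pool.add([&] {
    // Runs on a pool thread, where threadRun() has set thisThread_, so the
    // nested add() short-circuits through pickThread() to this same thread.
    pool.add([&] {
      // Executes on the same EventBase as the outer task instead of being
      // round-robined onto another thread.
      done.post();
    });
  });

  done.wait();  // keep the pool alive until the nested task has run
}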
experimental/wangle/concurrent/IOThreadPoolExecutor.h

@@ -15,6 +15,8 @@
  */
 
 #pragma once
 
+#include <folly/experimental/wangle/concurrent/IOExecutor.h>
 #include <folly/experimental/wangle/concurrent/ThreadPoolExecutor.h>
 #include <folly/io/async/EventBase.h>
@@ -22,7 +24,7 @@ namespace folly { namespace wangle {
 // N.B. For this thread pool, stop() behaves like join() because outstanding
 // tasks belong to the event base and will be executed upon its destruction.
-class IOThreadPoolExecutor : public ThreadPoolExecutor {
+class IOThreadPoolExecutor : public ThreadPoolExecutor, public IOExecutor {
  public:
   explicit IOThreadPoolExecutor(
       size_t numThreads,
@@ -37,12 +39,9 @@ class IOThreadPoolExecutor : public ThreadPoolExecutor {
       std::chrono::milliseconds expiration,
       Func expireCallback = nullptr) override;
 
- private:
-  ThreadPtr makeThread() override;
-  void threadRun(ThreadPtr thread) override;
-  void stopThreads(size_t n) override;
-  uint64_t getPendingTaskCount() override;
+  EventBase* getEventBase() override;
 
+ private:
   struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING IOThread : public Thread {
     IOThread(IOThreadPoolExecutor* pool)
       : Thread(pool),
@@ -53,7 +52,14 @@ class IOThreadPoolExecutor : public ThreadPoolExecutor {
     EventBase* eventBase;
   };
 
+  ThreadPtr makeThread() override;
+  std::shared_ptr<IOThread> pickThread();
+  void threadRun(ThreadPtr thread) override;
+  void stopThreads(size_t n) override;
+  uint64_t getPendingTaskCount() override;
+
   size_t nextThread_;
+  ThreadLocal<std::shared_ptr<IOThread>> thisThread_;
 };
 
 }} // folly::wangle
experimental/wangle/concurrent/ThreadPoolExecutor.h

@@ -31,7 +31,7 @@
 namespace folly { namespace wangle {
 
-class ThreadPoolExecutor : public Executor {
+class ThreadPoolExecutor : public virtual Executor {
  public:
   explicit ThreadPoolExecutor(
       size_t numThreads,
...
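ThreadPoolExecutor now inherits Executor virtually so that IOThreadPoolExecutor, which derives from both ThreadPoolExecutor and IOExecutor (itself a virtual Executor, see IOExecutor.h above), ends up with a single shared Executor base. A minimal standalone sketch of that shape; it is illustrative, not folly code.

#include <functional>

struct Executor {
  virtual ~Executor() {}
  virtual void add(std::function<void()> f) = 0;
};

// Both intermediate bases inherit Executor virtually...
struct ThreadPoolExecutor : public virtual Executor {};
struct IOExecutor : public virtual Executor {};

// ...so the most-derived class contains exactly one Executor subobject and a
// single, unambiguous add() to override.
struct IOThreadPoolExecutor : public ThreadPoolExecutor, public IOExecutor {
  void add(std::function<void()> f) override { f(); }
};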
New test file: GlobalExecutorTest

/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <gtest/gtest.h>

#include <folly/experimental/wangle/concurrent/GlobalExecutor.h>
#include <folly/experimental/wangle/concurrent/IOExecutor.h>

using namespace folly::wangle;

TEST(GlobalExecutorTest, GlobalIOExecutor) {
  class DummyExecutor : public IOExecutor {
   public:
    void add(folly::Func f) override {
      count++;
    }
    folly::EventBase* getEventBase() override {
      return nullptr;
    }
    int count{0};
  };

  auto f = [](){};

  // Don't explode; the default global IOExecutor should be created lazily
  // here.
  getIOExecutor()->add(f);

  {
    DummyExecutor dummy;
    setIOExecutor(&dummy);
    getIOExecutor()->add(f);
    // Make sure we were properly installed.
    EXPECT_EQ(1, dummy.count);
  }

  // Don't explode; the default global IOExecutor should be restored when
  // dummy is destructed.
  getIOExecutor()->add(f);
}