Commit 4ebfdff3 authored by Dave Watson's avatar Dave Watson Committed by dcsommer

Add MemoryIdler suppot to IOThreadPoolExecutor

Summary:
Idle memory in IO threads.   If loop is unused for a period of time, free associated memory, and call epoll again.

Had to add a new list of callbacks that don't make the loop nonblocking (i.e. using runInLoop() instead would use the nonblocking version of epoll).

Could bake this in to EventBase directly, but that seems like the wrong abstraction, since EventBase doesn't actually control the thread - for example, that approach would also free up memory for stack-allocated EventBases where they are used synchronously by clients.

This diff doesn't change IO scheduling at all - current IO work is round robin, so this probably only helps if the whole server is idle (at least until we add smarter scheduling)

Test Plan:
Based on top of D1585087.

fbconfig thrift/perf/cpp; fbmake dbg
_bin/thrift/perf/cpp/ThriftServer
_bin/thrift/perf/cpp/loadgen -num_threads=100 -weight_sendrecv=1 -cpp2 -async

Ran loadgen for a while, watched res memory in top.  Stopped loadgen.  After ~5 sec, res memory was much reduced.

Reviewed By: jsedgwick@fb.com

Subscribers: trunkagent, doug, fugalh, njormrod, folly-diffs@

FB internal diff: D1641057

Tasks: 5002425
parent 889093b2
...@@ -74,6 +74,31 @@ struct MemoryIdler { ...@@ -74,6 +74,31 @@ struct MemoryIdler {
/// avoid synchronizing their flushes. /// avoid synchronizing their flushes.
static AtomicStruct<std::chrono::steady_clock::duration> defaultIdleTimeout; static AtomicStruct<std::chrono::steady_clock::duration> defaultIdleTimeout;
/// Selects a timeout pseudo-randomly chosen to be between
/// idleTimeout and idleTimeout * (1 + timeoutVariationFraction), to
/// smooth out the behavior in a bursty system
template <typename Clock = std::chrono::steady_clock>
static typename Clock::duration getVariationTimeout(
typename Clock::duration idleTimeout
= defaultIdleTimeout.load(std::memory_order_acquire),
float timeoutVariationFrac = 0.5) {
if (idleTimeout.count() > 0 && timeoutVariationFrac > 0) {
// hash the pthread_t and the time to get the adjustment.
// Standard hash func isn't very good, so bit mix the result
auto pr = std::make_pair(pthread_self(),
Clock::now().time_since_epoch().count());
std::hash<decltype(pr)> hash_fn;
uint64_t h = folly::hash::twang_mix64(hash_fn(pr));
// multiplying the duration by a floating point doesn't work, grr..
auto extraFrac =
timeoutVariationFrac / std::numeric_limits<uint64_t>::max() * h;
uint64_t tics = idleTimeout.count() * (1 + extraFrac);
idleTimeout = typename Clock::duration(tics);
}
return idleTimeout;
}
/// Equivalent to fut.futexWait(expected, waitMask), but calls /// Equivalent to fut.futexWait(expected, waitMask), but calls
/// flushLocalMallocCaches() and unmapUnusedStack(stackToRetain) /// flushLocalMallocCaches() and unmapUnusedStack(stackToRetain)
...@@ -100,26 +125,11 @@ struct MemoryIdler { ...@@ -100,26 +125,11 @@ struct MemoryIdler {
return fut.futexWait(expected, waitMask); return fut.futexWait(expected, waitMask);
} }
idleTimeout = getVariationTimeout(idleTimeout, timeoutVariationFrac);
if (idleTimeout.count() > 0) { if (idleTimeout.count() > 0) {
auto begin = Clock::now();
if (timeoutVariationFrac > 0) {
// hash the pthread_t and the time to get the adjustment.
// Standard hash func isn't very good, so bit mix the result
auto pr = std::make_pair(pthread_self(),
begin.time_since_epoch().count());
std::hash<decltype(pr)> hash_fn;
uint64_t h = folly::hash::twang_mix64(hash_fn(pr));
// multiplying the duration by a floating point doesn't work, grr..
auto extraFrac =
timeoutVariationFrac / std::numeric_limits<uint64_t>::max() * h;
uint64_t tics = idleTimeout.count() * (1 + extraFrac);
idleTimeout = typename Clock::duration(tics);
}
while (true) { while (true) {
auto rv = fut.futexWaitUntil(expected, begin + idleTimeout, waitMask); auto rv = fut.futexWaitUntil(
expected, Clock::now() + idleTimeout, waitMask);
if (rv == FutexResult::TIMEDOUT) { if (rv == FutexResult::TIMEDOUT) {
// timeout is over // timeout is over
break; break;
......
...@@ -20,8 +20,49 @@ ...@@ -20,8 +20,49 @@
#include <glog/logging.h> #include <glog/logging.h>
#include <folly/io/async/EventBaseManager.h> #include <folly/io/async/EventBaseManager.h>
#include <folly/detail/MemoryIdler.h>
namespace folly { namespace wangle { namespace folly { namespace wangle {
using folly::detail::MemoryIdler;
/* Class that will free jemalloc caches and madvise the stack away
* if the event loop is unused for some period of time
*/
class MemoryIdlerTimeout
: public AsyncTimeout , public EventBase::LoopCallback {
public:
explicit MemoryIdlerTimeout(EventBase* b) : AsyncTimeout(b), base_(b) {}
virtual void timeoutExpired() noexcept {
idled = true;
}
virtual void runLoopCallback() noexcept {
if (idled) {
MemoryIdler::flushLocalMallocCaches();
MemoryIdler::unmapUnusedStack(MemoryIdler::kDefaultStackToRetain);
idled = false;
} else {
std::chrono::steady_clock::duration idleTimeout =
MemoryIdler::defaultIdleTimeout.load(
std::memory_order_acquire);
idleTimeout = MemoryIdler::getVariationTimeout(idleTimeout);
scheduleTimeout(std::chrono::duration_cast<std::chrono::milliseconds>(
idleTimeout).count());
}
// reschedule this callback for the next event loop.
base_->runBeforeLoop(this);
}
private:
EventBase* base_;
bool idled{false};
} ;
IOThreadPoolExecutor::IOThreadPoolExecutor( IOThreadPoolExecutor::IOThreadPoolExecutor(
size_t numThreads, size_t numThreads,
std::shared_ptr<ThreadFactory> threadFactory) std::shared_ptr<ThreadFactory> threadFactory)
...@@ -73,6 +114,10 @@ void IOThreadPoolExecutor::threadRun(ThreadPtr thread) { ...@@ -73,6 +114,10 @@ void IOThreadPoolExecutor::threadRun(ThreadPtr thread) {
const auto ioThread = std::static_pointer_cast<IOThread>(thread); const auto ioThread = std::static_pointer_cast<IOThread>(thread);
ioThread->eventBase = ioThread->eventBase =
folly::EventBaseManager::get()->getEventBase(); folly::EventBaseManager::get()->getEventBase();
auto idler = new MemoryIdlerTimeout(ioThread->eventBase);
ioThread->eventBase->runBeforeLoop(idler);
thread->startupBaton.post(); thread->startupBaton.post();
while (ioThread->shouldRun) { while (ioThread->shouldRun) {
ioThread->eventBase->loopForever(); ioThread->eventBase->loopForever();
......
...@@ -185,7 +185,7 @@ EventBase::~EventBase() { ...@@ -185,7 +185,7 @@ EventBase::~EventBase() {
callback->runLoopCallback(); callback->runLoopCallback();
} }
// Delete any unfired CobTimeout objects, so that we don't leak memory // Delete any unfired callback objects, so that we don't leak memory
// (Note that we don't fire them. The caller is responsible for cleaning up // (Note that we don't fire them. The caller is responsible for cleaning up
// its own data structures if it destroys the EventBase with unfired events // its own data structures if it destroys the EventBase with unfired events
// remaining.) // remaining.)
...@@ -194,6 +194,10 @@ EventBase::~EventBase() { ...@@ -194,6 +194,10 @@ EventBase::~EventBase() {
delete timeout; delete timeout;
} }
while (!noWaitLoopCallbacks_.empty()) {
delete &noWaitLoopCallbacks_.front();
}
(void) runLoopCallbacks(false); (void) runLoopCallbacks(false);
// Stop consumer before deleting NotificationQueue // Stop consumer before deleting NotificationQueue
...@@ -274,10 +278,20 @@ bool EventBase::loopBody(int flags) { ...@@ -274,10 +278,20 @@ bool EventBase::loopBody(int flags) {
// nobody can add loop callbacks from within this thread if // nobody can add loop callbacks from within this thread if
// we don't have to handle anything to start with... // we don't have to handle anything to start with...
if (blocking && loopCallbacks_.empty()) { if (blocking && loopCallbacks_.empty()) {
LoopCallbackList callbacks;
callbacks.swap(noWaitLoopCallbacks_);
while(!callbacks.empty()) {
auto* item = &callbacks.front();
callbacks.pop_front();
item->runLoopCallback();
}
res = event_base_loop(evb_, EVLOOP_ONCE); res = event_base_loop(evb_, EVLOOP_ONCE);
} else { } else {
res = event_base_loop(evb_, EVLOOP_ONCE | EVLOOP_NONBLOCK); res = event_base_loop(evb_, EVLOOP_ONCE | EVLOOP_NONBLOCK);
} }
ranLoopCallbacks = runLoopCallbacks(); ranLoopCallbacks = runLoopCallbacks();
int64_t busy = std::chrono::duration_cast<std::chrono::microseconds>( int64_t busy = std::chrono::duration_cast<std::chrono::microseconds>(
...@@ -458,6 +472,12 @@ void EventBase::runOnDestruction(LoopCallback* callback) { ...@@ -458,6 +472,12 @@ void EventBase::runOnDestruction(LoopCallback* callback) {
onDestructionCallbacks_.push_back(*callback); onDestructionCallbacks_.push_back(*callback);
} }
void EventBase::runBeforeLoop(LoopCallback* callback) {
DCHECK(isInEventBaseThread());
callback->cancelLoopCallback();
noWaitLoopCallbacks_.push_back(*callback);
}
bool EventBase::runInEventBaseThread(void (*fn)(void*), void* arg) { bool EventBase::runInEventBaseThread(void (*fn)(void*), void* arg) {
// Send the message. // Send the message.
// It will be received by the FunctionRunner in the EventBase's thread. // It will be received by the FunctionRunner in the EventBase's thread.
......
...@@ -259,6 +259,8 @@ class EventBase : private boost::noncopyable, public TimeoutManager { ...@@ -259,6 +259,8 @@ class EventBase : private boost::noncopyable, public TimeoutManager {
*/ */
void runOnDestruction(LoopCallback* callback); void runOnDestruction(LoopCallback* callback);
void runBeforeLoop(LoopCallback* callback);
/** /**
* Run the specified function in the EventBase's thread. * Run the specified function in the EventBase's thread.
* *
...@@ -519,6 +521,7 @@ class EventBase : private boost::noncopyable, public TimeoutManager { ...@@ -519,6 +521,7 @@ class EventBase : private boost::noncopyable, public TimeoutManager {
CobTimeout::List pendingCobTimeouts_; CobTimeout::List pendingCobTimeouts_;
LoopCallbackList loopCallbacks_; LoopCallbackList loopCallbacks_;
LoopCallbackList noWaitLoopCallbacks_;
LoopCallbackList onDestructionCallbacks_; LoopCallbackList onDestructionCallbacks_;
// This will be null most of the time, but point to currentCallbacks // This will be null most of the time, but point to currentCallbacks
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment