Commit 843b72c0 authored by Dave Watson's avatar Dave Watson

Move thrift/lib/cpp/async to folly.

Summary:
Move the minimum amount of stuff and still have everything compile.  Would like to move TAsyncSocket/ServerSocket/SSL/UDP eventually, but not this round.

thrift async is used very widely now: thrift, proxygen, newer mysql async interface, even trying it out in memcache.  A common complaint is that it doesn't get wide enough notice under thrift/, so let's move it to folly/.  After moving TAsyncSocket, both HHVM and proxygen could avoid a dep on thrift as well.

Changes:
* mv files to folly/io/async
* remove 'T' prefix on classes/filenames
* change namespace to 'folly'
* remove any thrift references.

Tried this before in D470080, this time doesn't attempt to abstract libevent

@override-unit-failures

Test Plan:
fbconfig -r thrift; fbmake dev.
Will iterate on any other contbuild failures

Reviewed By: pgriess@fb.com

FB internal diff: D1195393
parent e22f0a67
...@@ -79,6 +79,14 @@ nobase_follyinclude_HEADERS = \ ...@@ -79,6 +79,14 @@ nobase_follyinclude_HEADERS = \
io/RecordIO.h \ io/RecordIO.h \
io/RecordIO-inl.h \ io/RecordIO-inl.h \
io/TypedIOBuf.h \ io/TypedIOBuf.h \
io/async/AsyncTimeout.h \
io/async/EventBase.h \
io/async/EventFDWrapper.h \
io/async/EventHandler.h \
io/async/EventUtil.h \
io/async/NotificationQueue.h \
io/async/Request.h \
io/async/TimeoutManager.h \
json.h \ json.h \
Lazy.h \ Lazy.h \
Likely.h \ Likely.h \
...@@ -175,6 +183,10 @@ libfolly_la_SOURCES = \ ...@@ -175,6 +183,10 @@ libfolly_la_SOURCES = \
io/IOBuf.cpp \ io/IOBuf.cpp \
io/IOBufQueue.cpp \ io/IOBufQueue.cpp \
io/RecordIO.cpp \ io/RecordIO.cpp \
io/async/AsyncTimeout.cpp \
io/async/EventBase.cpp \
io/async/EventHandler.cpp \
io/async/Request.cpp \
json.cpp \ json.cpp \
detail/MemoryIdler.cpp \ detail/MemoryIdler.cpp \
MemoryMapping.cpp \ MemoryMapping.cpp \
......
...@@ -60,6 +60,8 @@ AC_CHECK_HEADER(double-conversion/double-conversion.h, [], [AC_MSG_ERROR( ...@@ -60,6 +60,8 @@ AC_CHECK_HEADER(double-conversion/double-conversion.h, [], [AC_MSG_ERROR(
AC_CHECK_LIB([double-conversion],[ceil],[],[AC_MSG_ERROR( AC_CHECK_LIB([double-conversion],[ceil],[],[AC_MSG_ERROR(
[Please install double-conversion library])]) [Please install double-conversion library])])
AC_CHECK_LIB([event], [event_set], [], [AC_MSG_ERROR([Unable to find libevent])])
# Checks for typedefs, structures, and compiler characteristics. # Checks for typedefs, structures, and compiler characteristics.
AC_HEADER_STDBOOL AC_HEADER_STDBOOL
AC_C_CONST AC_C_CONST
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "folly/io/async/AsyncTimeout.h"
#include "folly/io/async/EventBase.h"
#include "folly/io/async/EventUtil.h"
#include "folly/io/async/Request.h"
#include <assert.h>
#include <glog/logging.h>
namespace folly {
AsyncTimeout::AsyncTimeout(TimeoutManager* timeoutManager)
: timeoutManager_(timeoutManager) {
event_set(&event_, -1, EV_TIMEOUT, &AsyncTimeout::libeventCallback, this);
event_.ev_base = nullptr;
timeoutManager_->attachTimeoutManager(
this,
TimeoutManager::InternalEnum::NORMAL);
RequestContext::getStaticContext();
}
AsyncTimeout::AsyncTimeout(EventBase* eventBase)
: timeoutManager_(eventBase) {
event_set(&event_, -1, EV_TIMEOUT, &AsyncTimeout::libeventCallback, this);
event_.ev_base = nullptr;
timeoutManager_->attachTimeoutManager(
this,
TimeoutManager::InternalEnum::NORMAL);
RequestContext::getStaticContext();
}
AsyncTimeout::AsyncTimeout(TimeoutManager* timeoutManager,
InternalEnum internal)
: timeoutManager_(timeoutManager) {
event_set(&event_, -1, EV_TIMEOUT, &AsyncTimeout::libeventCallback, this);
event_.ev_base = nullptr;
timeoutManager_->attachTimeoutManager(this, internal);
RequestContext::getStaticContext();
}
AsyncTimeout::AsyncTimeout(EventBase* eventBase, InternalEnum internal)
: timeoutManager_(eventBase) {
event_set(&event_, -1, EV_TIMEOUT, &AsyncTimeout::libeventCallback, this);
event_.ev_base = nullptr;
timeoutManager_->attachTimeoutManager(this, internal);
RequestContext::getStaticContext();
}
AsyncTimeout::AsyncTimeout(): timeoutManager_(nullptr) {
event_set(&event_, -1, EV_TIMEOUT, &AsyncTimeout::libeventCallback, this);
event_.ev_base = nullptr;
RequestContext::getStaticContext();
}
AsyncTimeout::~AsyncTimeout() {
cancelTimeout();
}
bool AsyncTimeout::scheduleTimeout(std::chrono::milliseconds timeout) {
assert(timeoutManager_ != nullptr);
context_ = RequestContext::saveContext();
return timeoutManager_->scheduleTimeout(this, timeout);
}
bool AsyncTimeout::scheduleTimeout(uint32_t milliseconds) {
return scheduleTimeout(std::chrono::milliseconds(milliseconds));
}
void AsyncTimeout::cancelTimeout() {
if (isScheduled()) {
timeoutManager_->cancelTimeout(this);
}
}
bool AsyncTimeout::isScheduled() const {
return EventUtil::isEventRegistered(&event_);
}
void AsyncTimeout::attachTimeoutManager(
TimeoutManager* timeoutManager,
InternalEnum internal) {
// This also implies no timeout is scheduled.
assert(timeoutManager_ == nullptr);
assert(timeoutManager->isInTimeoutManagerThread());
timeoutManager_ = timeoutManager;
timeoutManager_->attachTimeoutManager(this, internal);
}
void AsyncTimeout::attachEventBase(
EventBase* eventBase,
InternalEnum internal) {
attachTimeoutManager(eventBase, internal);
}
void AsyncTimeout::detachTimeoutManager() {
// Only allow the event base to be changed if the timeout is not
// currently installed.
if (isScheduled()) {
// Programmer bug. Abort the program.
LOG(ERROR) << "detachEventBase() called on scheduled timeout; aborting";
abort();
return;
}
if (timeoutManager_) {
timeoutManager_->detachTimeoutManager(this);
timeoutManager_ = nullptr;
}
}
void AsyncTimeout::detachEventBase() {
detachTimeoutManager();
}
void AsyncTimeout::libeventCallback(int fd, short events, void* arg) {
AsyncTimeout* timeout = reinterpret_cast<AsyncTimeout*>(arg);
assert(fd == -1);
assert(events == EV_TIMEOUT);
// double check that ev_flags gets reset when the timeout is not running
assert((timeout->event_.ev_flags & ~EVLIST_INTERNAL) == EVLIST_INIT);
// this can't possibly fire if timeout->eventBase_ is nullptr
(void) timeout->timeoutManager_->bumpHandlingTime();
auto old_ctx =
RequestContext::setContext(timeout->context_);
timeout->timeoutExpired();
RequestContext::setContext(old_ctx);
}
} // folly
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once
#include "folly/io/async/TimeoutManager.h"
#include <boost/noncopyable.hpp>
#include <event.h>
#include <memory>
namespace folly {
class EventBase;
class RequestContext;
class TimeoutManager;
/**
* AsyncTimeout is used to asynchronously wait for a timeout to occur.
*/
class AsyncTimeout : private boost::noncopyable {
public:
typedef TimeoutManager::InternalEnum InternalEnum;
/**
* Create a new AsyncTimeout object, driven by the specified TimeoutManager.
*/
explicit AsyncTimeout(TimeoutManager* timeoutManager);
explicit AsyncTimeout(EventBase* eventBase);
/**
* Create a new internal AsyncTimeout object.
*
* Internal timeouts are like regular timeouts, but will not stop the
* TimeoutManager loop from exiting if the only remaining events are internal
* timeouts.
*
* This is useful for implementing fallback timeouts to abort the
* TimeoutManager loop if the other events have not been processed within a
* specified time period: if the event loop takes too long the timeout will
* fire and can stop the event loop. However, if all other events complete,
* the event loop will exit even though the internal timeout is still
* installed.
*/
AsyncTimeout(TimeoutManager* timeoutManager, InternalEnum internal);
AsyncTimeout(EventBase* eventBase, InternalEnum internal);
/**
* Create a new AsyncTimeout object, not yet assigned to a TimeoutManager.
*
* attachEventBase() must be called prior to scheduling the timeout.
*/
AsyncTimeout();
/**
* AsyncTimeout destructor.
*
* The timeout will be automatically cancelled if it is running.
*/
virtual ~AsyncTimeout();
/**
* timeoutExpired() is invoked when the timeout period has expired.
*/
virtual void timeoutExpired() noexcept = 0;
/**
* Schedule the timeout to fire in the specified number of milliseconds.
*
* After the specified number of milliseconds has elapsed, timeoutExpired()
* will be invoked by the TimeoutManager's main loop.
*
* If the timeout is already running, it will be rescheduled with the
* new timeout value.
*
* @param milliseconds The timeout duration, in milliseconds.
*
* @return Returns true if the timeout was successfully scheduled,
* and false if an error occurred. After an error, the timeout is
* always unscheduled, even if scheduleTimeout() was just
* rescheduling an existing timeout.
*/
bool scheduleTimeout(uint32_t milliseconds);
bool scheduleTimeout(std::chrono::milliseconds timeout);
/**
* Cancel the timeout, if it is running.
*/
void cancelTimeout();
/**
* Returns true if the timeout is currently scheduled.
*/
bool isScheduled() const;
/**
* Attach the timeout to a TimeoutManager.
*
* This may only be called if the timeout is not currently attached to a
* TimeoutManager (either by using the default constructor, or by calling
* detachTimeoutManager()).
*
* This method must be invoked in the TimeoutManager's thread.
*
* The internal parameter specifies if this timeout should be treated as an
* internal event. TimeoutManager::loop() will return when there are no more
* non-internal events remaining.
*/
void attachTimeoutManager(TimeoutManager* timeoutManager,
InternalEnum internal = InternalEnum::NORMAL);
void attachEventBase(EventBase* eventBase,
InternalEnum internal = InternalEnum::NORMAL);
/**
* Detach the timeout from its TimeoutManager.
*
* This may only be called when the timeout is not running.
* Once detached, the timeout may not be scheduled again until it is
* re-attached to a EventBase by calling attachEventBase().
*
* This method must be called from the current TimeoutManager's thread.
*/
void detachTimeoutManager();
void detachEventBase();
/**
* Returns the internal handle to the event
*/
struct event* getEvent() {
return &event_;
}
private:
static void libeventCallback(int fd, short events, void* arg);
struct event event_;
/*
* Store a pointer to the TimeoutManager. We only use this
* for some assert() statements, to make sure that AsyncTimeout is always
* used from the correct thread.
*/
TimeoutManager* timeoutManager_;
// Save the request context for when the timeout fires.
std::shared_ptr<RequestContext> context_;
};
} // folly
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include "folly/io/async/EventBase.h"
#include "folly/io/async/NotificationQueue.h"
#include <boost/static_assert.hpp>
#include <fcntl.h>
#include <pthread.h>
#include <unistd.h>
namespace {
using folly::Cob;
using folly::EventBase;
class Tr1FunctionLoopCallback : public EventBase::LoopCallback {
public:
explicit Tr1FunctionLoopCallback(const Cob& function)
: function_(function) {}
virtual void runLoopCallback() noexcept {
function_();
delete this;
}
private:
Cob function_;
};
}
namespace folly {
const int kNoFD = -1;
/*
* EventBase::FunctionRunner
*/
class EventBase::FunctionRunner
: public NotificationQueue<std::pair<void (*)(void*), void*>>::Consumer {
public:
void messageAvailable(std::pair<void (*)(void*), void*>&& msg) {
// In libevent2, internal events do not break the loop.
// Most users would expect loop(), followed by runInEventBaseThread(),
// to break the loop and check if it should exit or not.
// To have similar bejaviour to libevent1.4, tell the loop to break here.
// Note that loop() may still continue to loop, but it will also check the
// stop_ flag as well as runInLoop callbacks, etc.
event_base_loopbreak(getEventBase()->evb_);
if (msg.first == nullptr && msg.second == nullptr) {
// terminateLoopSoon() sends a null message just to
// wake up the loop. We can ignore these messages.
return;
}
// If function is nullptr, just log and move on
if (!msg.first) {
LOG(ERROR) << "nullptr callback registered to be run in "
<< "event base thread";
return;
}
// The function should never throw an exception, because we have no
// way of knowing what sort of error handling to perform.
//
// If it does throw, log a message and abort the program.
try {
msg.first(msg.second);
} catch (const std::exception& ex) {
LOG(ERROR) << "runInEventBaseThread() function threw a "
<< typeid(ex).name() << " exception: " << ex.what();
abort();
} catch (...) {
LOG(ERROR) << "runInEventBaseThread() function threw an exception";
abort();
}
}
};
/*
* EventBase::CobTimeout methods
*/
void EventBase::CobTimeout::timeoutExpired() noexcept {
// For now, we just swallow any exceptions that the callback threw.
try {
cob_();
} catch (const std::exception& ex) {
LOG(ERROR) << "EventBase::runAfterDelay() callback threw "
<< typeid(ex).name() << " exception: " << ex.what();
} catch (...) {
LOG(ERROR) << "EventBase::runAfterDelay() callback threw non-exception "
<< "type";
}
// The CobTimeout object was allocated on the heap by runAfterDelay(),
// so delete it now that the it has fired.
delete this;
}
/*
* EventBase methods
*/
EventBase::EventBase()
: runOnceCallbacks_(nullptr)
, stop_(false)
, loopThread_(0)
, evb_(static_cast<event_base*>(event_init()))
, queue_(nullptr)
, fnRunner_(nullptr)
, maxLatency_(0)
, avgLoopTime_(2000000)
, maxLatencyLoopTime_(avgLoopTime_)
, nextLoopCnt_(-40) // Early wrap-around so bugs will manifest soon
, latestLoopCnt_(nextLoopCnt_)
, startWork_(0)
, observer_(nullptr)
, observerSampleCount_(0) {
VLOG(5) << "EventBase(): Created.";
initNotificationQueue();
RequestContext::getStaticContext();
}
// takes ownership of the event_base
EventBase::EventBase(event_base* evb)
: runOnceCallbacks_(nullptr)
, stop_(false)
, loopThread_(0)
, evb_(evb)
, queue_(nullptr)
, fnRunner_(nullptr)
, maxLatency_(0)
, avgLoopTime_(2000000)
, maxLatencyLoopTime_(avgLoopTime_)
, nextLoopCnt_(-40) // Early wrap-around so bugs will manifest soon
, latestLoopCnt_(nextLoopCnt_)
, startWork_(0)
, observer_(nullptr)
, observerSampleCount_(0) {
initNotificationQueue();
RequestContext::getStaticContext();
}
EventBase::~EventBase() {
// Delete any unfired CobTimeout objects, so that we don't leak memory
// (Note that we don't fire them. The caller is responsible for cleaning up
// its own data structures if it destroys the EventBase with unfired events
// remaining.)
while (!pendingCobTimeouts_.empty()) {
CobTimeout* timeout = &pendingCobTimeouts_.front();
delete timeout;
}
(void) runLoopCallbacks(false);
// Stop consumer before deleting NotificationQueue
fnRunner_->stopConsuming();
event_base_free(evb_);
VLOG(5) << "EventBase(): Destroyed.";
}
int EventBase::getNotificationQueueSize() const {
return queue_->size();
}
// Set smoothing coefficient for loop load average; input is # of milliseconds
// for exp(-1) decay.
void EventBase::setLoadAvgMsec(uint32_t ms) {
uint64_t us = 1000 * ms;
if (ms > 0) {
maxLatencyLoopTime_.setTimeInterval(us);
avgLoopTime_.setTimeInterval(us);
} else {
LOG(ERROR) << "non-positive arg to setLoadAvgMsec()";
}
}
void EventBase::resetLoadAvg(double value) {
avgLoopTime_.reset(value);
maxLatencyLoopTime_.reset(value);
}
static int64_t getTimeDelta(int64_t *prev) {
int64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch()).count();
int64_t delta = now - *prev;
*prev = now;
return delta;
}
void EventBase::waitUntilRunning() {
while (!isRunning()) {
sched_yield();
}
}
// enters the event_base loop -- will only exit when forced to
bool EventBase::loop() {
VLOG(5) << "EventBase(): Starting loop.";
int res = 0;
bool ranLoopCallbacks;
int nonBlocking;
loopThread_.store(pthread_self(), std::memory_order_release);
#if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
if (!name_.empty()) {
pthread_setname_np(pthread_self(), name_.c_str());
}
#endif
int64_t prev = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch()).count();
int64_t idleStart = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch()).count();
// TODO: Read stop_ atomically with an acquire barrier.
while (!stop_) {
++nextLoopCnt_;
// nobody can add loop callbacks from within this thread if
// we don't have to handle anything to start with...
nonBlocking = (loopCallbacks_.empty() ? 0 : EVLOOP_NONBLOCK);
res = event_base_loop(evb_, EVLOOP_ONCE | nonBlocking);
ranLoopCallbacks = runLoopCallbacks();
int64_t busy = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch()).count() - startWork_;
int64_t idle = startWork_ - idleStart;
avgLoopTime_.addSample(idle, busy);
maxLatencyLoopTime_.addSample(idle, busy);
if (observer_) {
if (observerSampleCount_++ == observer_->getSampleRate()) {
observerSampleCount_ = 0;
observer_->loopSample(busy, idle);
}
}
VLOG(11) << "EventBase " << this << " did not timeout "
" loop time guess: " << busy + idle <<
" idle time: " << idle <<
" busy time: " << busy <<
" avgLoopTime: " << avgLoopTime_.get() <<
" maxLatencyLoopTime: " << maxLatencyLoopTime_.get() <<
" maxLatency_: " << maxLatency_ <<
" nothingHandledYet(): "<< nothingHandledYet();
// see if our average loop time has exceeded our limit
if ((maxLatency_ > 0) &&
(maxLatencyLoopTime_.get() > double(maxLatency_))) {
maxLatencyCob_();
// back off temporarily -- don't keep spamming maxLatencyCob_
// if we're only a bit over the limit
maxLatencyLoopTime_.dampen(0.9);
}
// Our loop run did real work; reset the idle timer
idleStart = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch()).count();
// If the event loop indicate that there were no more events, and
// we also didn't have any loop callbacks to run, there is nothing left to
// do.
if (res != 0 && !ranLoopCallbacks) {
// Since Notification Queue is marked 'internal' some events may not have
// run. Run them manually if so, and continue looping.
//
if (getNotificationQueueSize() > 0) {
fnRunner_->handlerReady(0);
} else {
break;
}
}
VLOG(5) << "EventBase " << this << " loop time: " << getTimeDelta(&prev);
}
// Reset stop_ so loop() can be called again
stop_ = false;
if (res < 0) {
LOG(ERROR) << "EventBase: -- error in event loop, res = " << res;
return false;
} else if (res == 1) {
VLOG(5) << "EventBase: ran out of events (exiting loop)!";
} else if (res > 1) {
LOG(ERROR) << "EventBase: unknown event loop result = " << res;
return false;
}
loopThread_.store(0, std::memory_order_release);
VLOG(5) << "EventBase(): Done with loop.";
return true;
}
void EventBase::loopForever() {
// Update the notification queue event to treat it as a normal (non-internal)
// event. The notification queue event always remains installed, and the main
// loop won't exit with it installed.
fnRunner_->stopConsuming();
fnRunner_->startConsuming(this, queue_.get());
bool ret = loop();
// Restore the notification queue internal flag
fnRunner_->stopConsuming();
fnRunner_->startConsumingInternal(this, queue_.get());
if (!ret) {
folly::throwSystemError("error in EventBase::loopForever()");
}
}
bool EventBase::bumpHandlingTime() {
VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ <<
" (loop) latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
if(nothingHandledYet()) {
latestLoopCnt_ = nextLoopCnt_;
// set the time
startWork_ = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch()).count();
VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ <<
" (loop) startWork_ " << startWork_;
return true;
}
return false;
}
void EventBase::terminateLoopSoon() {
VLOG(5) << "EventBase(): Received terminateLoopSoon() command.";
if (!isRunning()) {
return;
}
// Set stop to true, so the event loop will know to exit.
// TODO: We should really use an atomic operation here with a release
// barrier.
stop_ = true;
// Call event_base_loopbreak() so that libevent will exit the next time
// around the loop.
event_base_loopbreak(evb_);
// If terminateLoopSoon() is called from another thread,
// the EventBase thread might be stuck waiting for events.
// In this case, it won't wake up and notice that stop_ is set until it
// receives another event. Send an empty frame to the notification queue
// so that the event loop will wake up even if there are no other events.
//
// We don't care about the return value of trySendFrame(). If it fails
// this likely means the EventBase already has lots of events waiting
// anyway.
try {
queue_->putMessage(std::make_pair(nullptr, nullptr));
} catch (...) {
// We don't care if putMessage() fails. This likely means
// the EventBase already has lots of events waiting anyway.
}
}
void EventBase::runInLoop(LoopCallback* callback, bool thisIteration) {
DCHECK(isInEventBaseThread());
callback->cancelLoopCallback();
callback->context_ = RequestContext::saveContext();
if (runOnceCallbacks_ != nullptr && thisIteration) {
runOnceCallbacks_->push_back(*callback);
} else {
loopCallbacks_.push_back(*callback);
}
}
void EventBase::runInLoop(const Cob& cob, bool thisIteration) {
DCHECK(isInEventBaseThread());
Tr1FunctionLoopCallback* wrapper = new Tr1FunctionLoopCallback(cob);
wrapper->context_ = RequestContext::saveContext();
if (runOnceCallbacks_ != nullptr && thisIteration) {
runOnceCallbacks_->push_back(*wrapper);
} else {
loopCallbacks_.push_back(*wrapper);
}
}
bool EventBase::runInEventBaseThread(void (*fn)(void*), void* arg) {
// Send the message.
// It will be received by the FunctionRunner in the EventBase's thread.
// We try not to schedule nullptr callbacks
if (!fn) {
LOG(ERROR) << "EventBase " << this
<< ": Scheduling nullptr callbacks is not allowed";
return false;
}
// Short-circuit if we are already in our event base
if (inRunningEventBaseThread()) {
runInLoop(new RunInLoopCallback(fn, arg));
return true;
}
try {
queue_->putMessage(std::make_pair(fn, arg));
} catch (const std::exception& ex) {
LOG(ERROR) << "EventBase " << this << ": failed to schedule function "
<< fn << "for EventBase thread: " << ex.what();
return false;
}
return true;
}
bool EventBase::runInEventBaseThread(const Cob& fn) {
// Short-circuit if we are already in our event base
if (inRunningEventBaseThread()) {
runInLoop(fn);
return true;
}
Cob* fnCopy;
// Allocate a copy of the function so we can pass it to the other thread
// The other thread will delete this copy once the function has been run
try {
fnCopy = new Cob(fn);
} catch (const std::bad_alloc& ex) {
LOG(ERROR) << "failed to allocate tr::function copy "
<< "for runInEventBaseThread()";
return false;
}
if (!runInEventBaseThread(&EventBase::runTr1FunctionPtr, fnCopy)) {
delete fnCopy;
return false;
}
return true;
}
bool EventBase::runAfterDelay(const Cob& cob,
int milliseconds,
TimeoutManager::InternalEnum in) {
CobTimeout* timeout = new CobTimeout(this, cob, in);
if (!timeout->scheduleTimeout(milliseconds)) {
delete timeout;
return false;
}
pendingCobTimeouts_.push_back(*timeout);
return true;
}
bool EventBase::runLoopCallbacks(bool setContext) {
if (!loopCallbacks_.empty()) {
bumpHandlingTime();
// Swap the loopCallbacks_ list with a temporary list on our stack.
// This way we will only run callbacks scheduled at the time
// runLoopCallbacks() was invoked.
//
// If any of these callbacks in turn call runInLoop() to schedule more
// callbacks, those new callbacks won't be run until the next iteration
// around the event loop. This prevents runInLoop() callbacks from being
// able to start file descriptor and timeout based events.
LoopCallbackList currentCallbacks;
currentCallbacks.swap(loopCallbacks_);
runOnceCallbacks_ = &currentCallbacks;
while (!currentCallbacks.empty()) {
LoopCallback* callback = &currentCallbacks.front();
currentCallbacks.pop_front();
if (setContext) {
RequestContext::setContext(callback->context_);
}
callback->runLoopCallback();
}
runOnceCallbacks_ = nullptr;
return true;
}
return false;
}
void EventBase::initNotificationQueue() {
// Infinite size queue
queue_.reset(new NotificationQueue<std::pair<void (*)(void*), void*>>());
// We allocate fnRunner_ separately, rather than declaring it directly
// as a member of EventBase solely so that we don't need to include
// NotificationQueue.h from EventBase.h
fnRunner_.reset(new FunctionRunner());
// Mark this as an internal event, so event_base_loop() will return if
// there are no other events besides this one installed.
//
// Most callers don't care about the internal notification queue used by
// EventBase. The queue is always installed, so if we did count the queue as
// an active event, loop() would never exit with no more events to process.
// Users can use loopForever() if they do care about the notification queue.
// (This is useful for EventBase threads that do nothing but process
// runInEventBaseThread() notifications.)
fnRunner_->startConsumingInternal(this, queue_.get());
}
void EventBase::SmoothLoopTime::setTimeInterval(uint64_t timeInterval) {
expCoeff_ = -1.0/timeInterval;
VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__;
}
void EventBase::SmoothLoopTime::reset(double value) {
value_ = value;
}
void EventBase::SmoothLoopTime::addSample(int64_t idle, int64_t busy) {
/*
* Position at which the busy sample is considered to be taken.
* (Allows to quickly skew our average without editing much code)
*/
enum BusySamplePosition {
RIGHT = 0, // busy sample placed at the end of the iteration
CENTER = 1, // busy sample placed at the middle point of the iteration
LEFT = 2, // busy sample placed at the beginning of the iteration
};
VLOG(11) << "idle " << idle << " oldBusyLeftover_ " << oldBusyLeftover_ <<
" idle + oldBusyLeftover_ " << idle + oldBusyLeftover_ <<
" busy " << busy << " " << __PRETTY_FUNCTION__;
idle += oldBusyLeftover_ + busy;
oldBusyLeftover_ = (busy * BusySamplePosition::CENTER) / 2;
idle -= oldBusyLeftover_;
double coeff = exp(idle * expCoeff_);
value_ *= coeff;
value_ += (1.0 - coeff) * busy;
}
bool EventBase::nothingHandledYet() {
VLOG(11) << "latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
return (nextLoopCnt_ != latestLoopCnt_);
}
/* static */
void EventBase::runTr1FunctionPtr(Cob* fn) {
// The function should never throw an exception, because we have no
// way of knowing what sort of error handling to perform.
//
// If it does throw, log a message and abort the program.
try {
(*fn)();
} catch (const std::exception &ex) {
LOG(ERROR) << "runInEventBaseThread() std::function threw a "
<< typeid(ex).name() << " exception: " << ex.what();
abort();
} catch (...) {
LOG(ERROR) << "runInEventBaseThread() std::function threw an exception";
abort();
}
// The function object was allocated by runInEventBaseThread().
// Delete it once it has been run.
delete fn;
}
EventBase::RunInLoopCallback::RunInLoopCallback(void (*fn)(void*), void* arg)
: fn_(fn)
, arg_(arg) {}
void EventBase::RunInLoopCallback::runLoopCallback() noexcept {
fn_(arg_);
delete this;
}
void EventBase::attachTimeoutManager(AsyncTimeout* obj,
InternalEnum internal) {
struct event* ev = obj->getEvent();
assert(ev->ev_base == nullptr);
event_base_set(getLibeventBase(), ev);
if (internal == AsyncTimeout::InternalEnum::INTERNAL) {
// Set the EVLIST_INTERNAL flag
ev->ev_flags |= EVLIST_INTERNAL;
}
}
void EventBase::detachTimeoutManager(AsyncTimeout* obj) {
cancelTimeout(obj);
struct event* ev = obj->getEvent();
ev->ev_base = nullptr;
}
bool EventBase::scheduleTimeout(AsyncTimeout* obj,
std::chrono::milliseconds timeout) {
assert(isInEventBaseThread());
// Set up the timeval and add the event
struct timeval tv;
tv.tv_sec = timeout.count() / 1000LL;
tv.tv_usec = (timeout.count() % 1000LL) * 1000LL;
struct event* ev = obj->getEvent();
if (event_add(ev, &tv) < 0) {
LOG(ERROR) << "EventBase: failed to schedule timeout: " << strerror(errno);
return false;
}
return true;
}
void EventBase::cancelTimeout(AsyncTimeout* obj) {
assert(isInEventBaseThread());
struct event* ev = obj->getEvent();
if (EventUtil::isEventRegistered(ev)) {
event_del(ev);
}
}
void EventBase::setName(const std::string& name) {
assert(isInEventBaseThread());
name_ = name;
#if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
if (isRunning()) {
pthread_setname_np(loopThread_.load(std::memory_order_relaxed),
name_.c_str());
}
#endif
}
const std::string& EventBase::getName() {
assert(isInEventBaseThread());
return name_;
}
} // folly
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once
#include <glog/logging.h>
#include "folly/io/async/AsyncTimeout.h"
#include "folly/io/async/TimeoutManager.h"
#include <memory>
#include <stack>
#include <list>
#include <queue>
#include <cstdlib>
#include <set>
#include <utility>
#include <boost/intrusive/list.hpp>
#include <boost/utility.hpp>
#include <functional>
#include <event.h> // libevent
#include <errno.h>
#include <math.h>
#include <atomic>
namespace folly {
typedef std::function<void()> Cob;
template <typename MessageT>
class NotificationQueue;
class EventBaseObserver {
public:
virtual ~EventBaseObserver() {}
virtual uint32_t getSampleRate() const = 0;
virtual void loopSample(
int64_t busyTime, int64_t idleTime) = 0;
};
/**
* This class is a wrapper for all asynchronous I/O processing functionality
*
* EventBase provides a main loop that notifies EventHandler callback objects
* when I/O is ready on a file descriptor, and notifies AsyncTimeout objects
* when a specified timeout has expired. More complex, higher-level callback
* mechanisms can then be built on top of EventHandler and AsyncTimeout.
*
* A EventBase object can only drive an event loop for a single thread. To
* take advantage of multiple CPU cores, most asynchronous I/O servers have one
* thread per CPU, and use a separate EventBase for each thread.
*
* In general, most EventBase methods may only be called from the thread
* running the EventBase's loop. There are a few exceptions to this rule, for
* methods that are explicitly intended to allow communication with a
* EventBase from other threads. When it is safe to call a method from
* another thread it is explicitly listed in the method comments.
*/
class EventBase : private boost::noncopyable, public TimeoutManager {
public:
/**
* A callback interface to use with runInLoop()
*
* Derive from this class if you need to delay some code execution until the
* next iteration of the event loop. This allows you to schedule code to be
* invoked from the top-level of the loop, after your immediate callers have
* returned.
*
* If a LoopCallback object is destroyed while it is scheduled to be run in
* the next loop iteration, it will automatically be cancelled.
*/
class LoopCallback {
public:
virtual ~LoopCallback() {}
virtual void runLoopCallback() noexcept = 0;
void cancelLoopCallback() {
hook_.unlink();
}
bool isLoopCallbackScheduled() const {
return hook_.is_linked();
}
private:
typedef boost::intrusive::list_member_hook<
boost::intrusive::link_mode<boost::intrusive::auto_unlink> > ListHook;
ListHook hook_;
typedef boost::intrusive::list<
LoopCallback,
boost::intrusive::member_hook<LoopCallback, ListHook,
&LoopCallback::hook_>,
boost::intrusive::constant_time_size<false> > List;
// EventBase needs access to LoopCallbackList (and therefore to hook_)
friend class EventBase;
std::shared_ptr<RequestContext> context_;
};
/**
* Create a new EventBase object.
*/
EventBase();
/**
* Create a new EventBase object that will use the specified libevent
* event_base object to drive the event loop.
*
* The EventBase will take ownership of this event_base, and will call
* event_base_free(evb) when the EventBase is destroyed.
*/
explicit EventBase(event_base* evb);
~EventBase();
/**
* Runs the event loop.
*
* loop() will loop waiting for I/O or timeouts and invoking EventHandler
* and AsyncTimeout callbacks as their events become ready. loop() will
* only return when there are no more events remaining to process, or after
* terminateLoopSoon() has been called.
*
* loop() may be called again to restart event processing after a previous
* call to loop() or loopForever() has returned.
*
* Returns true if the loop completed normally (if it processed all
* outstanding requests, or if terminateLoopSoon() was called). If an error
* occurs waiting for events, false will be returned.
*/
bool loop();
/**
* Runs the event loop.
*
* loopForever() behaves like loop(), except that it keeps running even if
* when there are no more user-supplied EventHandlers or AsyncTimeouts
* registered. It will only return after terminateLoopSoon() has been
* called.
*
* This is useful for callers that want to wait for other threads to call
* runInEventBaseThread(), even when there are no other scheduled events.
*
* loopForever() may be called again to restart event processing after a
* previous call to loop() or loopForever() has returned.
*
* Throws a std::system_error if an error occurs.
*/
void loopForever();
/**
* Causes the event loop to exit soon.
*
* This will cause an existing call to loop() or loopForever() to stop event
* processing and return, even if there are still events remaining to be
* processed.
*
* It is safe to call terminateLoopSoon() from another thread to cause loop()
* to wake up and return in the EventBase loop thread. terminateLoopSoon()
* may also be called from the loop thread itself (for example, a
* EventHandler or AsyncTimeout callback may call terminateLoopSoon() to
* cause the loop to exit after the callback returns.)
*
* Note that the caller is responsible for ensuring that cleanup of all event
* callbacks occurs properly. Since terminateLoopSoon() causes the loop to
* exit even when there are pending events present, there may be remaining
* callbacks present waiting to be invoked. If the loop is later restarted
* pending events will continue to be processed normally, however if the
* EventBase is destroyed after calling terminateLoopSoon() it is the
* caller's responsibility to ensure that cleanup happens properly even if
* some outstanding events are never processed.
*/
void terminateLoopSoon();
/**
* Adds the given callback to a queue of things run after the current pass
* through the event loop completes. Note that if this callback calls
* runInLoop() the new callback won't be called until the main event loop
* has gone through a cycle.
*
* This method may only be called from the EventBase's thread. This
* essentially allows an event handler to schedule an additional callback to
* be invoked after it returns.
*
* Use runInEventBaseThread() to schedule functions from another thread.
*
* The thisIteration parameter makes this callback run in this loop
* iteration, instead of the next one, even if called from a
* runInLoop callback (normal io callbacks that call runInLoop will
* always run in this iteration). This was originally added to
* support detachEventBase, as a user callback may have called
* terminateLoopSoon(), but we want to make sure we detach. Also,
* detachEventBase almost always must be called from the base event
* loop to ensure the stack is unwound, since most users of
* EventBase are not thread safe.
*
* Ideally we would not need thisIteration, and instead just use
* runInLoop with loop() (instead of terminateLoopSoon).
*/
void runInLoop(LoopCallback* callback, bool thisIteration = false);
/**
* Convenience function to call runInLoop() with a std::function.
*
* This creates a LoopCallback object to wrap the std::function, and invoke
* the std::function when the loop callback fires. This is slightly more
* expensive than defining your own LoopCallback, but more convenient in
* areas that aren't performance sensitive where you just want to use
* std::bind. (std::bind is fairly slow on even by itself.)
*
* This method may only be called from the EventBase's thread. This
* essentially allows an event handler to schedule an additional callback to
* be invoked after it returns.
*
* Use runInEventBaseThread() to schedule functions from another thread.
*/
void runInLoop(const Cob& c, bool thisIteration = false);
/**
* Run the specified function in the EventBase's thread.
*
* This method is thread-safe, and may be called from another thread.
*
* If runInEventBaseThread() is called when the EventBase loop is not
* running, the function call will be delayed until the next time the loop is
* started.
*
* If runInEventBaseThread() returns true the function has successfully been
* scheduled to run in the loop thread. However, if the loop is terminated
* (and never later restarted) before it has a chance to run the requested
* function, the function may never be run at all. The caller is responsible
* for handling this situation correctly if they may terminate the loop with
* outstanding runInEventBaseThread() calls pending.
*
* If two calls to runInEventBaseThread() are made from the same thread, the
* functions will always be run in the order that they were scheduled.
* Ordering between functions scheduled from separate threads is not
* guaranteed.
*
* @param fn The function to run. The function must not throw any
* exceptions.
* @param arg An argument to pass to the function.
*
* @return Returns true if the function was successfully scheduled, or false
* if there was an error scheduling the function.
*/
template<typename T>
bool runInEventBaseThread(void (*fn)(T*), T* arg) {
return runInEventBaseThread(reinterpret_cast<void (*)(void*)>(fn),
reinterpret_cast<void*>(arg));
}
bool runInEventBaseThread(void (*fn)(void*), void* arg);
/**
* Run the specified function in the EventBase's thread
*
* This version of runInEventBaseThread() takes a std::function object.
* Note that this is less efficient than the version that takes a plain
* function pointer and void* argument, as it has to allocate memory to copy
* the std::function object.
*
* If the EventBase loop is terminated before it has a chance to run this
* function, the allocated memory will be leaked. The caller is responsible
* for ensuring that the EventBase loop is not terminated before this
* function can run.
*
* The function must not throw any exceptions.
*/
bool runInEventBaseThread(const Cob& fn);
/**
* Runs the given Cob at some time after the specified number of
* milliseconds. (No guarantees exactly when.)
*
* @return true iff the cob was successfully registered.
*/
bool runAfterDelay(
const Cob& c,
int milliseconds,
TimeoutManager::InternalEnum = TimeoutManager::InternalEnum::NORMAL);
/**
* Set the maximum desired latency in us and provide a callback which will be
* called when that latency is exceeded.
*/
void setMaxLatency(int64_t maxLatency, const Cob& maxLatencyCob) {
maxLatency_ = maxLatency;
maxLatencyCob_ = maxLatencyCob;
}
/**
* Set smoothing coefficient for loop load average; # of milliseconds
* for exp(-1) (1/2.71828...) decay.
*/
void setLoadAvgMsec(uint32_t ms);
/**
* reset the load average to a desired value
*/
void resetLoadAvg(double value = 0.0);
/**
* Get the average loop time in microseconds (an exponentially-smoothed ave)
*/
double getAvgLoopTime() const {
return avgLoopTime_.get();
}
/**
* check if the event base loop is running.
*/
bool isRunning() const {
return loopThread_.load(std::memory_order_relaxed) != 0;
}
/**
* wait until the event loop starts (after starting the event loop thread).
*/
void waitUntilRunning();
int getNotificationQueueSize() const;
/**
* Verify that current thread is the EventBase thread, if the EventBase is
* running.
*/
bool isInEventBaseThread() const {
auto tid = loopThread_.load(std::memory_order_relaxed);
return tid == 0 || pthread_equal(tid, pthread_self());
}
bool inRunningEventBaseThread() const {
return pthread_equal(
loopThread_.load(std::memory_order_relaxed), pthread_self());
}
// --------- interface to underlying libevent base ------------
// Avoid using these functions if possible. These functions are not
// guaranteed to always be present if we ever provide alternative EventBase
// implementations that do not use libevent internally.
event_base* getLibeventBase() const { return evb_; }
static const char* getLibeventVersion() { return event_get_version(); }
static const char* getLibeventMethod() { return event_get_method(); }
/**
* only EventHandler/AsyncTimeout subclasses and ourselves should
* ever call this.
*
* This is used to mark the beginning of a new loop cycle by the
* first handler fired within that cycle.
*
*/
bool bumpHandlingTime();
class SmoothLoopTime {
public:
explicit SmoothLoopTime(uint64_t timeInterval)
: expCoeff_(-1.0/timeInterval)
, value_(0.0)
, oldBusyLeftover_(0) {
VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__;
}
void setTimeInterval(uint64_t timeInterval);
void reset(double value = 0.0);
void addSample(int64_t idle, int64_t busy);
double get() const {
return value_;
}
void dampen(double factor) {
value_ *= factor;
}
private:
double expCoeff_;
double value_;
int64_t oldBusyLeftover_;
};
void setObserver(
const std::shared_ptr<EventBaseObserver>& observer) {
observer_ = observer;
}
const std::shared_ptr<EventBaseObserver>& getObserver() {
return observer_;
}
/**
* Set the name of the thread that runs this event base.
*/
void setName(const std::string& name);
/**
* Returns the name of the thread that runs this event base.
*/
const std::string& getName();
private:
// TimeoutManager
void attachTimeoutManager(AsyncTimeout* obj,
TimeoutManager::InternalEnum internal);
void detachTimeoutManager(AsyncTimeout* obj);
bool scheduleTimeout(AsyncTimeout* obj, std::chrono::milliseconds timeout);
void cancelTimeout(AsyncTimeout* obj);
bool isInTimeoutManagerThread() {
return isInEventBaseThread();
}
// Helper class used to short circuit runInEventBaseThread
class RunInLoopCallback : public LoopCallback {
public:
RunInLoopCallback(void (*fn)(void*), void* arg);
void runLoopCallback() noexcept;
private:
void (*fn_)(void*);
void* arg_;
};
/*
* Helper function that tells us whether we have already handled
* some event/timeout/callback in this loop iteration.
*/
bool nothingHandledYet();
// --------- libevent callbacks (not for client use) ------------
static void runTr1FunctionPtr(std::function<void()>* fn);
// small object used as a callback arg with enough info to execute the
// appropriate client-provided Cob
class CobTimeout : public AsyncTimeout {
public:
CobTimeout(EventBase* b, const Cob& c, TimeoutManager::InternalEnum in)
: AsyncTimeout(b, in), cob_(c) {}
virtual void timeoutExpired() noexcept;
private:
Cob cob_;
public:
typedef boost::intrusive::list_member_hook<
boost::intrusive::link_mode<boost::intrusive::auto_unlink> > ListHook;
ListHook hook;
typedef boost::intrusive::list<
CobTimeout,
boost::intrusive::member_hook<CobTimeout, ListHook, &CobTimeout::hook>,
boost::intrusive::constant_time_size<false> > List;
};
typedef LoopCallback::List LoopCallbackList;
class FunctionRunner;
// executes any callbacks queued by runInLoop(); returns false if none found
bool runLoopCallbacks(bool setContext = true);
void initNotificationQueue();
CobTimeout::List pendingCobTimeouts_;
LoopCallbackList loopCallbacks_;
// This will be null most of the time, but point to currentCallbacks
// if we are in the middle of running loop callbacks, such that
// runInLoop(..., true) will always run in the current loop
// iteration.
LoopCallbackList* runOnceCallbacks_;
// stop_ is set by terminateLoopSoon() and is used by the main loop
// to determine if it should exit
bool stop_;
// The ID of the thread running the main loop.
// 0 if loop is not running.
// Note: POSIX doesn't guarantee that 0 is an invalid pthread_t (or
// even that atomic<pthread_t> is valid), but that's how it is
// everywhere (at least on Linux, FreeBSD, and OSX).
std::atomic<pthread_t> loopThread_;
// pointer to underlying event_base class doing the heavy lifting
event_base* evb_;
// A notification queue for runInEventBaseThread() to use
// to send function requests to the EventBase thread.
std::unique_ptr<NotificationQueue<std::pair<void (*)(void*), void*>>> queue_;
std::unique_ptr<FunctionRunner> fnRunner_;
// limit for latency in microseconds (0 disables)
int64_t maxLatency_;
// exponentially-smoothed average loop time for latency-limiting
SmoothLoopTime avgLoopTime_;
// smoothed loop time used to invoke latency callbacks; differs from
// avgLoopTime_ in that it's scaled down after triggering a callback
// to reduce spamminess
SmoothLoopTime maxLatencyLoopTime_;
// callback called when latency limit is exceeded
Cob maxLatencyCob_;
// we'll wait this long before running deferred callbacks if the event
// loop is idle.
static const int kDEFAULT_IDLE_WAIT_USEC = 20000; // 20ms
// Wrap-around loop counter to detect beginning of each loop
uint64_t nextLoopCnt_;
uint64_t latestLoopCnt_;
uint64_t startWork_;
// Observer to export counters
std::shared_ptr<EventBaseObserver> observer_;
uint32_t observerSampleCount_;
// Name of the thread running this EventBase
std::string name_;
};
} // folly
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* Work around the lack of <sys/eventfd.h> on glibc 2.5.1 which we still
* need to support, sigh.
*/
#pragma once
#include <features.h>
// <sys/eventfd.h> doesn't exist on older glibc versions
#if (defined(__GLIBC__) && __GLIBC_PREREQ(2, 9))
#include <sys/eventfd.h>
#else /* !(defined(__GLIBC__) && __GLIBC_PREREQ(2, 9)) */
#include <sys/syscall.h>
#include <unistd.h>
#include <fcntl.h>
// Use existing __NR_eventfd2 if already defined
// Values from the Linux kernel source:
// arch/x86/include/asm/unistd_{32,64}.h
#ifndef __NR_eventfd2
#if defined(__x86_64__)
#define __NR_eventfd2 290
#elif defined(__i386__)
#define __NR_eventfd2 328
#else
#error "Can't define __NR_eventfd2 for your architecture."
#endif
#endif
enum
{
EFD_SEMAPHORE = 1,
#define EFD_SEMAPHORE EFD_SEMAPHORE
EFD_CLOEXEC = 02000000,
#define EFD_CLOEXEC EFD_CLOEXEC
EFD_NONBLOCK = 04000
#define EFD_NONBLOCK EFD_NONBLOCK
};
// http://www.kernel.org/doc/man-pages/online/pages/man2/eventfd.2.html
// Use the eventfd2 system call, as in glibc 2.9+
// (requires kernel 2.6.30+)
#define eventfd(initval, flags) syscall(__NR_eventfd2,(initval),(flags))
#endif /* !(defined(__GLIBC__) && __GLIBC_PREREQ(2, 9)) */
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "folly/io/async/EventHandler.h"
#include "folly/io/async/EventBase.h"
#include <assert.h>
namespace folly {
EventHandler::EventHandler(EventBase* eventBase, int fd) {
event_set(&event_, fd, 0, &EventHandler::libeventCallback, this);
if (eventBase != nullptr) {
setEventBase(eventBase);
} else {
// Callers must set the EventBase and fd before using this timeout.
// Set event_->ev_base to nullptr to ensure that this happens.
// (otherwise libevent will initialize it to the "default" event_base)
event_.ev_base = nullptr;
eventBase_ = nullptr;
}
}
EventHandler::~EventHandler() {
unregisterHandler();
}
bool EventHandler::registerImpl(uint16_t events, bool internal) {
assert(event_.ev_base != nullptr);
// We have to unregister the event before we can change the event flags
if (isHandlerRegistered()) {
// If the new events are the same are the same as the already registered
// flags, we don't have to do anything. Just return.
if (events == event_.ev_events &&
static_cast<bool>(event_.ev_flags & EVLIST_INTERNAL) == internal) {
return true;
}
event_del(&event_);
}
// Update the event flags
// Unfortunately, event_set() resets the event_base, so we have to remember
// it before hand, then pass it back into event_base_set() afterwards
struct event_base* evb = event_.ev_base;
event_set(&event_, event_.ev_fd, events,
&EventHandler::libeventCallback, this);
event_base_set(evb, &event_);
// Set EVLIST_INTERNAL if this is an internal event
if (internal) {
event_.ev_flags |= EVLIST_INTERNAL;
}
// Add the event.
//
// Although libevent allows events to wait on both I/O and a timeout,
// we intentionally don't allow an EventHandler to also use a timeout.
// Callers must maintain a separate AsyncTimeout object if they want a
// timeout.
//
// Otherwise, it is difficult to handle persistent events properly. (The I/O
// event and timeout may both fire together the same time around the event
// loop. Normally we would want to inform the caller of the I/O event first,
// then the timeout. However, it is difficult to do this properly since the
// I/O callback could delete the EventHandler.) Additionally, if a caller
// uses the same struct event for both I/O and timeout, and they just want to
// reschedule the timeout, libevent currently makes an epoll_ctl() call even
// if the I/O event flags haven't changed. Using a separate event struct is
// therefore slightly more efficient in this case (although it does take up
// more space).
if (event_add(&event_, nullptr) < 0) {
LOG(ERROR) << "EventBase: failed to register event handler for fd "
<< event_.ev_fd << ": " << strerror(errno);
// Call event_del() to make sure the event is completely uninstalled
event_del(&event_);
return false;
}
return true;
}
void EventHandler::unregisterHandler() {
if (isHandlerRegistered()) {
event_del(&event_);
}
}
void EventHandler::attachEventBase(EventBase* eventBase) {
// attachEventBase() may only be called on detached handlers
assert(event_.ev_base == nullptr);
assert(!isHandlerRegistered());
// This must be invoked from the EventBase's thread
assert(eventBase->isInEventBaseThread());
setEventBase(eventBase);
}
void EventHandler::detachEventBase() {
ensureNotRegistered(__func__);
event_.ev_base = nullptr;
}
void EventHandler::changeHandlerFD(int fd) {
ensureNotRegistered(__func__);
// event_set() resets event_base.ev_base, so manually restore it afterwards
struct event_base* evb = event_.ev_base;
event_set(&event_, fd, 0, &EventHandler::libeventCallback, this);
event_.ev_base = evb; // don't use event_base_set(), since evb may be nullptr
}
void EventHandler::initHandler(EventBase* eventBase, int fd) {
ensureNotRegistered(__func__);
event_set(&event_, fd, 0, &EventHandler::libeventCallback, this);
setEventBase(eventBase);
}
void EventHandler::ensureNotRegistered(const char* fn) {
// Neither the EventBase nor file descriptor may be changed while the
// handler is registered. Treat it as a programmer bug and abort the program
// if this requirement is violated.
if (isHandlerRegistered()) {
LOG(ERROR) << fn << " called on registered handler; aborting";
abort();
}
}
void EventHandler::libeventCallback(int fd, short events, void* arg) {
EventHandler* handler = reinterpret_cast<EventHandler*>(arg);
assert(fd == handler->event_.ev_fd);
// this can't possibly fire if handler->eventBase_ is nullptr
(void) handler->eventBase_->bumpHandlingTime();
handler->handlerReady(events);
}
void EventHandler::setEventBase(EventBase* eventBase) {
event_base_set(eventBase->getLibeventBase(), &event_);
eventBase_ = eventBase;
}
bool EventHandler::isPending() {
if (event_.ev_flags & EVLIST_ACTIVE) {
if (event_.ev_res & EV_READ) {
return true;
}
}
return false;
}
} // folly
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once
#include <glog/logging.h>
#include "folly/io/async/EventUtil.h"
#include <boost/noncopyable.hpp>
#include <stddef.h>
namespace folly {
class EventBase;
/**
* The EventHandler class is used to asynchronously wait for events on a file
* descriptor.
*
* Users that wish to wait on I/O events should derive from EventHandler and
* implement the handlerReady() method.
*/
class EventHandler : private boost::noncopyable {
public:
enum EventFlags {
NONE = 0,
READ = EV_READ,
WRITE = EV_WRITE,
READ_WRITE = (READ | WRITE),
PERSIST = EV_PERSIST
};
/**
* Create a new EventHandler object.
*
* @param eventBase The EventBase to use to drive this event handler.
* This may be nullptr, in which case the EventBase must be
* set separately using initHandler() or attachEventBase()
* before the handler can be registered.
* @param fd The file descriptor that this EventHandler will
* monitor. This may be -1, in which case the file
* descriptor must be set separately using initHandler() or
* changeHandlerFD() before the handler can be registered.
*/
explicit EventHandler(EventBase* eventBase = nullptr, int fd = -1);
/**
* EventHandler destructor.
*
* The event will be automatically unregistered if it is still registered.
*/
virtual ~EventHandler();
/**
* handlerReady() is invoked when the handler is ready.
*
* @param events A bitset indicating the events that are ready.
*/
virtual void handlerReady(uint16_t events) noexcept = 0;
/**
* Register the handler.
*
* If the handler is already registered, the registration will be updated
* to wait on the new set of events.
*
* @param events A bitset specifying the events to monitor.
* If the PERSIST bit is set, the handler will remain
* registered even after handlerReady() is called.
*
* @return Returns true if the handler was successfully registered,
* or false if an error occurred. After an error, the handler is
* always unregistered, even if it was already registered prior to
* this call to registerHandler().
*/
bool registerHandler(uint16_t events) {
return registerImpl(events, false);
}
/**
* Unregister the handler, if it is registered.
*/
void unregisterHandler();
/**
* Returns true if the handler is currently registered.
*/
bool isHandlerRegistered() const {
return EventUtil::isEventRegistered(&event_);
}
/**
* Attach the handler to a EventBase.
*
* This may only be called if the handler is not currently attached to a
* EventBase (either by using the default constructor, or by calling
* detachEventBase()).
*
* This method must be invoked in the EventBase's thread.
*/
void attachEventBase(EventBase* eventBase);
/**
* Detach the handler from its EventBase.
*
* This may only be called when the handler is not currently registered.
* Once detached, the handler may not be registered again until it is
* re-attached to a EventBase by calling attachEventBase().
*
* This method must be called from the current EventBase's thread.
*/
void detachEventBase();
/**
* Change the file descriptor that this handler is associated with.
*
* This may only be called when the handler is not currently registered.
*/
void changeHandlerFD(int fd);
/**
* Attach the handler to a EventBase, and change the file descriptor.
*
* This method may only be called if the handler is not currently attached to
* a EventBase. This is primarily intended to be used to initialize
* EventHandler objects created using the default constructor.
*/
void initHandler(EventBase* eventBase, int fd);
/**
* Return the set of events that we're currently registered for.
*/
uint16_t getRegisteredEvents() const {
return (isHandlerRegistered()) ?
event_.ev_events : 0;
}
/**
* Register the handler as an internal event.
*
* This event will not count as an active event for determining if the
* EventBase loop has more events to process. The EventBase loop runs
* only as long as there are active EventHandlers, however "internal" event
* handlers are not counted. Therefore this event handler will not prevent
* EventBase loop from exiting with no more work to do if there are no other
* non-internal event handlers registered.
*
* This is intended to be used only in very rare cases by the internal
* EventBase code. This API is not guaranteed to remain stable or portable
* in the future.
*/
bool registerInternalHandler(uint16_t events) {
return registerImpl(events, true);
}
bool isPending();
private:
bool registerImpl(uint16_t events, bool internal);
void ensureNotRegistered(const char* fn);
void setEventBase(EventBase* eventBase);
static void libeventCallback(int fd, short events, void* arg);
struct event event_;
EventBase* eventBase_;
};
} // folly
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once
#include <event.h> // libevent
namespace folly {
/**
* low-level libevent utility functions
*/
class EventUtil {
public:
static bool isEventRegistered(const struct event* ev) {
// If any of these flags are set, the event is registered.
enum {
EVLIST_REGISTERED = (EVLIST_INSERTED | EVLIST_ACTIVE |
EVLIST_TIMEOUT | EVLIST_SIGNAL)
};
return (ev->ev_flags & EVLIST_REGISTERED);
}
};
} // folly
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once
#include <fcntl.h>
#include <unistd.h>
#include "folly/io/async/EventBase.h"
#include "folly/io/async/EventFDWrapper.h"
#include "folly/io/async/EventHandler.h"
#include "folly/io/async/Request.h"
#include "folly/Likely.h"
#include "folly/SmallLocks.h"
#include <glog/logging.h>
#include <deque>
namespace folly {
/**
* A producer-consumer queue for passing messages between EventBase threads.
*
* Messages can be added to the queue from any thread. Multiple consumers may
* listen to the queue from multiple EventBase threads.
*
* A NotificationQueue may not be destroyed while there are still consumers
* registered to receive events from the queue. It is the user's
* responsibility to ensure that all consumers are unregistered before the
* queue is destroyed.
*
* MessageT should be MoveConstructible (i.e., must support either a move
* constructor or a copy constructor, or both). Ideally it's move constructor
* (or copy constructor if no move constructor is provided) should never throw
* exceptions. If the constructor may throw, the consumers could end up
* spinning trying to move a message off the queue and failing, and then
* retrying.
*/
template<typename MessageT>
class NotificationQueue {
public:
/**
* A callback interface for consuming messages from the queue as they arrive.
*/
class Consumer : private EventHandler {
public:
enum : uint16_t { kDefaultMaxReadAtOnce = 10 };
Consumer()
: queue_(nullptr),
destroyedFlagPtr_(nullptr),
maxReadAtOnce_(kDefaultMaxReadAtOnce) {}
virtual ~Consumer();
/**
* messageAvailable() will be invoked whenever a new
* message is available from the pipe.
*/
virtual void messageAvailable(MessageT&& message) = 0;
/**
* Begin consuming messages from the specified queue.
*
* messageAvailable() will be called whenever a message is available. This
* consumer will continue to consume messages until stopConsuming() is
* called.
*
* A Consumer may only consume messages from a single NotificationQueue at
* a time. startConsuming() should not be called if this consumer is
* already consuming.
*/
void startConsuming(EventBase* eventBase, NotificationQueue* queue) {
init(eventBase, queue);
registerHandler(READ | PERSIST);
}
/**
* Same as above but registers this event handler as internal so that it
* doesn't count towards the pending reader count for the IOLoop.
*/
void startConsumingInternal(
EventBase* eventBase, NotificationQueue* queue) {
init(eventBase, queue);
registerInternalHandler(READ | PERSIST);
}
/**
* Stop consuming messages.
*
* startConsuming() may be called again to resume consumption of messages
* at a later point in time.
*/
void stopConsuming();
/**
* Get the NotificationQueue that this consumer is currently consuming
* messages from. Returns nullptr if the consumer is not currently
* consuming events from any queue.
*/
NotificationQueue* getCurrentQueue() const {
return queue_;
}
/**
* Set a limit on how many messages this consumer will read each iteration
* around the event loop.
*
* This helps rate-limit how much work the Consumer will do each event loop
* iteration, to prevent it from starving other event handlers.
*
* A limit of 0 means no limit will be enforced. If unset, the limit
* defaults to kDefaultMaxReadAtOnce (defined to 10 above).
*/
void setMaxReadAtOnce(uint32_t maxAtOnce) {
maxReadAtOnce_ = maxAtOnce;
}
uint32_t getMaxReadAtOnce() const {
return maxReadAtOnce_;
}
EventBase* getEventBase() {
return base_;
}
virtual void handlerReady(uint16_t events) noexcept;
private:
void init(EventBase* eventBase, NotificationQueue* queue);
NotificationQueue* queue_;
bool* destroyedFlagPtr_;
uint32_t maxReadAtOnce_;
EventBase* base_;
};
enum class FdType {
EVENTFD,
PIPE
};
/**
* Create a new NotificationQueue.
*
* If the maxSize parameter is specified, this sets the maximum queue size
* that will be enforced by tryPutMessage(). (This size is advisory, and may
* be exceeded if producers explicitly use putMessage() instead of
* tryPutMessage().)
*
* The fdType parameter determines the type of file descriptor used
* internally to signal message availability. The default (eventfd) is
* preferable for performance and because it won't fail when the queue gets
* too long. It is not available on on older and non-linux kernels, however.
* In this case the code will fall back to using a pipe, the parameter is
* mostly for testing purposes.
*/
explicit NotificationQueue(uint32_t maxSize = 0,
FdType fdType = FdType::EVENTFD)
: eventfd_(-1),
pipeFds_{-1, -1},
advisoryMaxQueueSize_(maxSize),
pid_(getpid()),
queue_() {
spinlock_.init();
RequestContext::getStaticContext();
if (fdType == FdType::EVENTFD) {
eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE);
if (eventfd_ == -1) {
if (errno == ENOSYS || errno == EINVAL) {
// eventfd not availalble
LOG(ERROR) << "failed to create eventfd for NotificationQueue: "
<< errno << ", falling back to pipe mode (is your kernel "
<< "> 2.6.30?)";
fdType = FdType::PIPE;
} else {
// some other error
folly::throwSystemError("Failed to create eventfd for "
"NotificationQueue", errno);
}
}
}
if (fdType == FdType::PIPE) {
if (pipe(pipeFds_)) {
folly::throwSystemError("Failed to create pipe for NotificationQueue",
errno);
}
try {
// put both ends of the pipe into non-blocking mode
if (fcntl(pipeFds_[0], F_SETFL, O_RDONLY | O_NONBLOCK) != 0) {
folly::throwSystemError("failed to put NotificationQueue pipe read "
"endpoint into non-blocking mode", errno);
}
if (fcntl(pipeFds_[1], F_SETFL, O_WRONLY | O_NONBLOCK) != 0) {
folly::throwSystemError("failed to put NotificationQueue pipe write "
"endpoint into non-blocking mode", errno);
}
} catch (...) {
::close(pipeFds_[0]);
::close(pipeFds_[1]);
throw;
}
}
}
~NotificationQueue() {
if (eventfd_ >= 0) {
::close(eventfd_);
eventfd_ = -1;
}
if (pipeFds_[0] >= 0) {
::close(pipeFds_[0]);
pipeFds_[0] = -1;
}
if (pipeFds_[1] >= 0) {
::close(pipeFds_[1]);
pipeFds_[1] = -1;
}
}
/**
* Set the advisory maximum queue size.
*
* This maximum queue size affects calls to tryPutMessage(). Message
* producers can still use the putMessage() call to unconditionally put a
* message on the queue, ignoring the configured maximum queue size. This
* can cause the queue size to exceed the configured maximum.
*/
void setMaxQueueSize(uint32_t max) {
advisoryMaxQueueSize_ = max;
}
/**
* Attempt to put a message on the queue if the queue is not already full.
*
* If the queue is full, a std::overflow_error will be thrown. The
* setMaxQueueSize() function controls the maximum queue size.
*
* This method may contend briefly on a spinlock if many threads are
* concurrently accessing the queue, but for all intents and purposes it will
* immediately place the message on the queue and return.
*
* tryPutMessage() may throw std::bad_alloc if memory allocation fails, and
* may throw any other exception thrown by the MessageT move/copy
* constructor.
*/
void tryPutMessage(MessageT&& message) {
putMessageImpl(std::move(message), advisoryMaxQueueSize_);
}
void tryPutMessage(const MessageT& message) {
putMessageImpl(message, advisoryMaxQueueSize_);
}
/**
* No-throw versions of the above. Instead returns true on success, false on
* failure.
*
* Only std::overflow_error is prevented from being thrown (since this is the
* common exception case), user code must still catch std::bad_alloc errors.
*/
bool tryPutMessageNoThrow(MessageT&& message) {
return putMessageImpl(std::move(message), advisoryMaxQueueSize_, false);
}
bool tryPutMessageNoThrow(const MessageT& message) {
return putMessageImpl(message, advisoryMaxQueueSize_, false);
}
/**
* Unconditionally put a message on the queue.
*
* This method is like tryPutMessage(), but ignores the maximum queue size
* and always puts the message on the queue, even if the maximum queue size
* would be exceeded.
*
* putMessage() may throw std::bad_alloc if memory allocation fails, and may
* throw any other exception thrown by the MessageT move/copy constructor.
*/
void putMessage(MessageT&& message) {
putMessageImpl(std::move(message), 0);
}
void putMessage(const MessageT& message) {
putMessageImpl(message, 0);
}
/**
* Put several messages on the queue.
*/
template<typename InputIteratorT>
void putMessages(InputIteratorT first, InputIteratorT last) {
typedef typename std::iterator_traits<InputIteratorT>::iterator_category
IterCategory;
putMessagesImpl(first, last, IterCategory());
}
/**
* Try to immediately pull a message off of the queue, without blocking.
*
* If a message is immediately available, the result parameter will be
* updated to contain the message contents and true will be returned.
*
* If no message is available, false will be returned and result will be left
* unmodified.
*/
bool tryConsume(MessageT& result) {
checkPid();
if (!tryConsumeEvent()) {
return false;
}
try {
folly::MSLGuard g(spinlock_);
// This shouldn't happen normally. See the comments in
// Consumer::handlerReady() for more details.
if (UNLIKELY(queue_.empty())) {
LOG(ERROR) << "found empty queue after signalled event";
return false;
}
auto data = std::move(queue_.front());
result = data.first;
RequestContext::setContext(data.second);
queue_.pop_front();
} catch (...) {
// Handle an exception if the assignment operator happens to throw.
// We consumed an event but weren't able to pop the message off the
// queue. Signal the event again since the message is still in the
// queue.
signalEvent(1);
throw;
}
return true;
}
int size() {
folly::MSLGuard g(spinlock_);
return queue_.size();
}
/**
* Check that the NotificationQueue is being used from the correct process.
*
* If you create a NotificationQueue in one process, then fork, and try to
* send messages to the queue from the child process, you're going to have a
* bad time. Unfortunately users have (accidentally) run into this.
*
* Because we use an eventfd/pipe, the child process can actually signal the
* parent process that an event is ready. However, it can't put anything on
* the parent's queue, so the parent wakes up and finds an empty queue. This
* check ensures that we catch the problem in the misbehaving child process
* code, and crash before signalling the parent process.
*/
void checkPid() const {
CHECK_EQ(pid_, getpid());
}
private:
// Forbidden copy constructor and assignment operator
NotificationQueue(NotificationQueue const &) = delete;
NotificationQueue& operator=(NotificationQueue const &) = delete;
inline bool checkQueueSize(size_t maxSize, bool throws=true) const {
DCHECK(0 == spinlock_.try_lock());
if (maxSize > 0 && queue_.size() >= maxSize) {
if (throws) {
throw std::overflow_error("unable to add message to NotificationQueue: "
"queue is full");
}
return false;
}
return true;
}
inline void signalEvent(size_t numAdded = 1) const {
static const uint8_t kPipeMessage[] = {
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1
};
ssize_t bytes_written = 0;
ssize_t bytes_expected = 0;
if (eventfd_ >= 0) {
// eventfd(2) dictates that we must write a 64-bit integer
uint64_t numAdded64(numAdded);
bytes_expected = static_cast<ssize_t>(sizeof(numAdded64));
bytes_written = ::write(eventfd_, &numAdded64, sizeof(numAdded64));
} else {
// pipe semantics, add one message for each numAdded
bytes_expected = numAdded;
do {
size_t messageSize = std::min(numAdded, sizeof(kPipeMessage));
ssize_t rc = ::write(pipeFds_[1], kPipeMessage, messageSize);
if (rc < 0) {
// TODO: if the pipe is full, write will fail with EAGAIN.
// See task #1044651 for how this could be handled
break;
}
numAdded -= rc;
bytes_written += rc;
} while (numAdded > 0);
}
if (bytes_written != bytes_expected) {
folly::throwSystemError("failed to signal NotificationQueue after "
"write", errno);
}
}
bool tryConsumeEvent() {
uint64_t value = 0;
ssize_t rc = -1;
if (eventfd_ >= 0) {
rc = ::read(eventfd_, &value, sizeof(value));
} else {
uint8_t value8;
rc = ::read(pipeFds_[0], &value8, sizeof(value8));
value = value8;
}
if (rc < 0) {
// EAGAIN should pretty much be the only error we can ever get.
// This means someone else already processed the only available message.
assert(errno == EAGAIN);
return false;
}
assert(value == 1);
return true;
}
bool putMessageImpl(MessageT&& message, size_t maxSize, bool throws=true) {
checkPid();
{
folly::MSLGuard g(spinlock_);
if (!checkQueueSize(maxSize, throws)) {
return false;
}
queue_.push_back(
std::make_pair(std::move(message),
RequestContext::saveContext()));
}
signalEvent();
return true;
}
bool putMessageImpl(
const MessageT& message, size_t maxSize, bool throws=true) {
checkPid();
{
folly::MSLGuard g(spinlock_);
if (!checkQueueSize(maxSize, throws)) {
return false;
}
queue_.push_back(std::make_pair(message, RequestContext::saveContext()));
}
signalEvent();
return true;
}
template<typename InputIteratorT>
void putMessagesImpl(InputIteratorT first, InputIteratorT last,
std::input_iterator_tag) {
checkPid();
size_t numAdded = 0;
{
folly::MSLGuard g(spinlock_);
while (first != last) {
queue_.push_back(std::make_pair(*first, RequestContext::saveContext()));
++first;
++numAdded;
}
}
signalEvent(numAdded);
}
mutable folly::MicroSpinLock spinlock_;
int eventfd_;
int pipeFds_[2]; // to fallback to on older/non-linux systems
uint32_t advisoryMaxQueueSize_;
pid_t pid_;
std::deque<std::pair<MessageT, std::shared_ptr<RequestContext>>> queue_;
};
template<typename MessageT>
NotificationQueue<MessageT>::Consumer::~Consumer() {
// If we are in the middle of a call to handlerReady(), destroyedFlagPtr_
// will be non-nullptr. Mark the value that it points to, so that
// handlerReady() will know the callback is destroyed, and that it cannot
// access any member variables anymore.
if (destroyedFlagPtr_) {
*destroyedFlagPtr_ = true;
}
}
template<typename MessageT>
void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t events)
noexcept {
uint32_t numProcessed = 0;
while (true) {
// Try to decrement the eventfd.
//
// We decrement the eventfd before checking the queue, and only pop a
// message off the queue if we read from the eventfd.
//
// Reading the eventfd first allows us to not have to hold the spinlock
// while accessing the eventfd. If we popped from the queue first, we
// would have to hold the lock while reading from or writing to the
// eventfd. (Multiple consumers may be woken up from a single eventfd
// notification. If we popped from the queue first, we could end up
// popping a message from the queue before the eventfd has been notified by
// the producer, unless the consumer and producer both held the spinlock
// around the entire operation.)
if (!queue_->tryConsumeEvent()) {
// no message available right now
return;
}
// Now pop the message off of the queue.
// We successfully consumed the eventfd notification.
// There should be a message available for us to consume.
//
// We have to manually acquire and release the spinlock here, rather than
// using SpinLockHolder since the MessageT has to be constructed while
// holding the spinlock and available after we release it. SpinLockHolder
// unfortunately doesn't provide a release() method. (We can't construct
// MessageT first since we have no guarantee that MessageT has a default
// constructor.
queue_->spinlock_.lock();
bool locked = true;
try {
// The eventfd is incremented once for every message, and only
// decremented when a message is popped off. There should always be a
// message here to read.
if (UNLIKELY(queue_->queue_.empty())) {
// Unfortunately we have seen this happen in practice if a user forks
// the process, and then the child tries to send a message to a
// NotificationQueue being monitored by a thread in the parent.
// The child can signal the parent via the eventfd, but won't have been
// able to put anything on the parent's queue since it has a separate
// address space.
//
// This is a bug in the sender's code. putMessagesImpl() should cause
// the sender to crash now before trying to send a message from the
// wrong process. However, just in case let's handle this case in the
// consumer without crashing.
LOG(ERROR) << "found empty queue after signalled event";
queue_->spinlock_.unlock();
return;
}
// Pull a message off the queue.
auto& data = queue_->queue_.front();
MessageT msg(std::move(data.first));
auto old_ctx =
RequestContext::setContext(data.second);
queue_->queue_.pop_front();
// Check to see if the queue is empty now.
// We use this as an optimization to see if we should bother trying to
// loop again and read another message after invoking this callback.
bool wasEmpty = queue_->queue_.empty();
// Now unlock the spinlock before we invoke the callback.
queue_->spinlock_.unlock();
locked = false;
// Call the callback
bool callbackDestroyed = false;
CHECK(destroyedFlagPtr_ == nullptr);
destroyedFlagPtr_ = &callbackDestroyed;
messageAvailable(std::move(msg));
RequestContext::setContext(old_ctx);
// If the callback was destroyed before it returned, we are done
if (callbackDestroyed) {
return;
}
destroyedFlagPtr_ = nullptr;
// If the callback is no longer installed, we are done.
if (queue_ == nullptr) {
return;
}
// If we have hit maxReadAtOnce_, we are done.
++numProcessed;
if (maxReadAtOnce_ > 0 && numProcessed >= maxReadAtOnce_) {
return;
}
// If the queue was empty before we invoked the callback, it's probable
// that it is still empty now. Just go ahead and return, rather than
// looping again and trying to re-read from the eventfd. (If a new
// message had in fact arrived while we were invoking the callback, we
// will simply be woken up the next time around the event loop and will
// process the message then.)
if (wasEmpty) {
return;
}
} catch (const std::exception& ex) {
// This catch block is really just to handle the case where the MessageT
// constructor throws. The messageAvailable() callback itself is
// declared as noexcept and should never throw.
//
// If the MessageT constructor does throw we try to handle it as best as
// we can, but we can't work miracles. We will just ignore the error for
// now and return. The next time around the event loop we will end up
// trying to read the message again. If MessageT continues to throw we
// will never make forward progress and will keep trying each time around
// the event loop.
if (locked) {
// Unlock the spinlock.
queue_->spinlock_.unlock();
// Push a notification back on the eventfd since we didn't actually
// read the message off of the queue.
queue_->signalEvent(1);
}
return;
}
}
}
template<typename MessageT>
void NotificationQueue<MessageT>::Consumer::init(
EventBase* eventBase,
NotificationQueue* queue) {
assert(eventBase->isInEventBaseThread());
assert(queue_ == nullptr);
assert(!isHandlerRegistered());
queue->checkPid();
base_ = eventBase;
queue_ = queue;
if (queue_->eventfd_ >= 0) {
initHandler(eventBase, queue_->eventfd_);
} else {
initHandler(eventBase, queue_->pipeFds_[0]);
}
}
template<typename MessageT>
void NotificationQueue<MessageT>::Consumer::stopConsuming() {
if (queue_ == nullptr) {
assert(!isHandlerRegistered());
return;
}
assert(isHandlerRegistered());
unregisterHandler();
detachEventBase();
queue_ = nullptr;
}
} // folly
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "folly/io/async/Request.h"
#ifndef NO_LIB_GFLAGS
DEFINE_bool(enable_request_context, true,
"Enable collection of per-request queueing stats for thrift");
#endif
namespace folly {
#ifdef NO_LIB_GFLAGS
bool FLAGS_enable_thrift_request_context = true;
#endif
RequestContext* defaultContext;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once
#include <map>
#include <memory>
#include <glog/logging.h>
#include "folly/ThreadLocal.h"
#include "folly/RWSpinLock.h"
/**
* In many cases this header is included as a
* dependency to libraries which do not need
* command line flags. GFLAGS is a large binary
* and thus we do this so that a library which
* is size sensitive doesn't have to pull in
* GFLAGS if it doesn't want to.
*/
#ifndef NO_LIB_GFLAGS
#include <gflags/gflags.h>
DECLARE_bool(enable_request_context);
#endif
namespace folly {
#ifdef NO_LIB_GFLAGS
extern bool FLAGS_enable_request_context;
#endif
// Some request context that follows an async request through a process
// Everything in the context must be thread safe
class RequestData {
public:
virtual ~RequestData() {}
};
class RequestContext;
// If you do not call create() to create a unique request context,
// this default request context will always be returned, and is never
// copied between threads.
extern RequestContext* defaultContext;
class RequestContext {
public:
// Create a unique requext context for this request.
// It will be passed between queues / threads (where implemented),
// so it should be valid for the lifetime of the request.
static bool create() {
if(!FLAGS_enable_request_context) {
return false;
}
bool prev = getStaticContext().get() != nullptr;
getStaticContext().reset(new std::shared_ptr<RequestContext>(
std::make_shared<RequestContext>()));
return prev;
}
// Get the current context.
static RequestContext* get() {
if (!FLAGS_enable_request_context ||
getStaticContext().get() == nullptr) {
if (defaultContext == nullptr) {
defaultContext = new RequestContext;
}
return defaultContext;
}
return getStaticContext().get()->get();
}
// The following API may be used to set per-request data in a thread-safe way.
// This access is still performance sensitive, so please ask if you need help
// profiling any use of these functions.
void setContextData(
const std::string& val, std::unique_ptr<RequestData> data) {
if (!FLAGS_enable_request_context) {
return;
}
folly::RWSpinLock::WriteHolder guard(lock);
if (data_.find(val) != data_.end()) {
LOG_FIRST_N(WARNING, 1) <<
"Called RequestContext::setContextData with data already set";
data_[val] = nullptr;
} else {
data_[val] = std::move(data);
}
}
bool hasContextData(const std::string& val) {
folly::RWSpinLock::ReadHolder guard(lock);
return data_.find(val) != data_.end();
}
RequestData* getContextData(const std::string& val) {
folly::RWSpinLock::ReadHolder guard(lock);
auto r = data_.find(val);
if (r == data_.end()) {
return nullptr;
} else {
return r->second.get();
}
}
void clearContextData(const std::string& val) {
folly::RWSpinLock::WriteHolder guard(lock);
data_.erase(val);
}
// The following API is used to pass the context through queues / threads.
// saveContext is called to geta shared_ptr to the context, and
// setContext is used to reset it on the other side of the queue.
//
// A shared_ptr is used, because many request may fan out across
// multiple threads, or do post-send processing, etc.
static std::shared_ptr<RequestContext>
setContext(std::shared_ptr<RequestContext> ctx) {
if (FLAGS_enable_request_context) {
std::shared_ptr<RequestContext> old_ctx;
if (getStaticContext().get()) {
old_ctx = *getStaticContext().get();
}
if (ctx == nullptr) {
getStaticContext().reset(nullptr);
} else {
getStaticContext().reset(new std::shared_ptr<RequestContext>(ctx));
}
return old_ctx;
}
return std::shared_ptr<RequestContext>();
}
static std::shared_ptr<RequestContext> saveContext() {
if (!FLAGS_enable_request_context) {
return std::shared_ptr<RequestContext>();
}
if (getStaticContext().get() == nullptr) {
return std::shared_ptr<RequestContext>();
} else {
return *getStaticContext().get();
}
}
// Used to solve static destruction ordering issue. Any static object
// that uses RequestContext must call this function in its constructor.
//
// See below link for more details.
// http://stackoverflow.com/questions/335369/
// finding-c-static-initialization-order-problems#335746
static folly::ThreadLocalPtr<std::shared_ptr<RequestContext>>&
getStaticContext() {
static folly::ThreadLocalPtr<std::shared_ptr<RequestContext> > context;
return context;
}
private:
folly::RWSpinLock lock;
std::map<std::string, std::unique_ptr<RequestData>> data_;
};
/**
* Set the request context for a specific scope. For example,
* if you ran a part of a request in another thread you could
* use RequestContextGuard to copy apply the request context
* inside the other therad.
*/
class RequestContextGuard {
public:
explicit RequestContextGuard(std::shared_ptr<RequestContext> ctx) {
oldctx_ = RequestContext::setContext(std::move(ctx));
}
~RequestContextGuard() {
RequestContext::setContext(std::move(oldctx_));
}
private:
std::shared_ptr<RequestContext> oldctx_;
};
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once
#include <chrono>
#include <stdint.h>
namespace folly {
class AsyncTimeout;
/**
* Base interface to be implemented by all classes expecting to manage
* timeouts. AsyncTimeout will use implementations of this interface
* to schedule/cancel timeouts.
*/
class TimeoutManager {
public:
enum class InternalEnum {
INTERNAL,
NORMAL
};
virtual ~TimeoutManager() {}
/**
* Attaches/detaches TimeoutManager to AsyncTimeout
*/
virtual void attachTimeoutManager(AsyncTimeout* obj,
InternalEnum internal) = 0;
virtual void detachTimeoutManager(AsyncTimeout* obj) = 0;
/**
* Schedules AsyncTimeout to fire after `timeout` milliseconds
*/
virtual bool scheduleTimeout(AsyncTimeout* obj,
std::chrono::milliseconds timeout) = 0;
/**
* Cancels the AsyncTimeout, if scheduled
*/
virtual void cancelTimeout(AsyncTimeout* obj) = 0;
/**
* This is used to mark the beginning of a new loop cycle by the
* first handler fired within that cycle.
*/
virtual bool bumpHandlingTime() = 0;
/**
* Helper method to know whether we are running in the timeout manager
* thread
*/
virtual bool isInTimeoutManagerThread() = 0;
};
} // folly
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment