Commit 204ae3a6 authored by Yingnan Li's avatar Yingnan Li Committed by Facebook GitHub Bot

Use ExecutionObserver in EventBase to monitor function not executed...

Use ExecutionObserver in EventBase to monitor function not executed EventBaseAtomicNotificationQueueby in EventBase

Summary:
Our service schedules tasks (eSemiFuture/coroutine) on SerialExecutor on top of IOThreadPoolExecutor. When using EventBaseWatchdog to locate long running tasks, I found some long running tasks were not observed by PerThreadExecController. After some investigation, I found there are 2 cases in EventBase that tasks are not executed by handlerReady in EventBaseAtomicNotificationQueue.

1. Short-circuit scenario in runInEventBaseThread which bypasses EventBaseAtomicNotificationQueue.
2. If notification Queue is marked 'internal', some events may be ran "manually" in loopBody.

As a result, the observer logic in EventHandler (EventBaseAtomicNotificationQueue derives from EventHandler) is passed.

Reviewed By: andriigrynenko

Differential Revision: D26563459

fbshipit-source-id: e2741cff373c89a5cd438904f910b23a64fad653
parent 842ecea5
......@@ -1185,7 +1185,8 @@ REGISTER_TYPED_TEST_CASE_P(
RunOnDestructionAfterHandleDestroyed,
RunOnDestructionAddCallbackWithinCallback,
InternalExternalCallbackOrderTest,
pidCheck);
pidCheck,
EventBaseExecutionObserver);
// Instantiate the non registered fd tests
INSTANTIATE_TYPED_TEST_CASE_P(IoUring, EventBaseTest, IoUringBackendProvider);
......
......@@ -128,6 +128,25 @@ EventBaseBackend::~EventBaseBackend() {
event_base_free(evb_);
}
class ExecutionObserverScopeGuard {
public:
ExecutionObserverScopeGuard(folly::ExecutionObserver* observer, void* id)
: observer_(observer), id_{reinterpret_cast<uintptr_t>(id)} {
if (observer_) {
observer_->starting(id_);
}
}
~ExecutionObserverScopeGuard() {
if (observer_) {
observer_->stopped(id_);
}
}
private:
folly::ExecutionObserver* observer_;
uintptr_t id_;
};
} // namespace
namespace folly {
......@@ -367,11 +386,7 @@ bool EventBase::loopBody(int flags, bool ignoreKeepAlive) {
LoopCallbackList callbacks;
callbacks.swap(runBeforeLoopCallbacks_);
while (!callbacks.empty()) {
auto* item = &callbacks.front();
callbacks.pop_front();
item->runLoopCallback();
}
runLoopCallbacks(callbacks);
// nobody can add loop callbacks from within this thread if
// we don't have to handle anything to start with...
......@@ -434,6 +449,7 @@ bool EventBase::loopBody(int flags, bool ignoreKeepAlive) {
// run. Run them manually if so, and continue looping.
//
if (!queue_->empty()) {
ExecutionObserverScopeGuard guard(executionObserver_, queue_.get());
queue_->execute();
} else if (!ranLoopCallbacks) {
// If there were no more events and we also didn't have any loop
......@@ -660,6 +676,16 @@ void EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(Func fn) noexcept {
}
}
void EventBase::runLoopCallbacks(LoopCallbackList& currentCallbacks) {
while (!currentCallbacks.empty()) {
LoopCallback* callback = &currentCallbacks.front();
currentCallbacks.pop_front();
folly::RequestContextScopeGuard rctx(std::move(callback->context_));
ExecutionObserverScopeGuard guard(executionObserver_, callback);
callback->runLoopCallback();
}
}
bool EventBase::runLoopCallbacks() {
bumpHandlingTime();
if (!loopCallbacks_.empty()) {
......@@ -675,12 +701,7 @@ bool EventBase::runLoopCallbacks() {
currentCallbacks.swap(loopCallbacks_);
runOnceCallbacks_ = &currentCallbacks;
while (!currentCallbacks.empty()) {
LoopCallback* callback = &currentCallbacks.front();
currentCallbacks.pop_front();
folly::RequestContextScopeGuard rctx(std::move(callback->context_));
callback->runLoopCallback();
}
runLoopCallbacks(currentCallbacks);
runOnceCallbacks_ = nullptr;
return true;
......
......@@ -873,6 +873,8 @@ class EventBase : public TimeoutManager,
bool loopBody(int flags = 0, bool ignoreKeepAlive = false);
void runLoopCallbacks(LoopCallbackList& currentCallbacks);
// executes any callbacks queued by runInLoop(); returns false if none found
bool runLoopCallbacks();
......
......@@ -86,7 +86,8 @@ REGISTER_TYPED_TEST_CASE_P(
RunOnDestructionAfterHandleDestroyed,
RunOnDestructionAddCallbackWithinCallback,
InternalExternalCallbackOrderTest,
pidCheck);
pidCheck,
EventBaseExecutionObserver);
struct DefaultBackendProvider {
static std::unique_ptr<folly::EventBaseBackendBase> getBackend() {
......
......@@ -204,6 +204,27 @@ FOLLY_ALWAYS_INLINE void scheduleEvents(
}
}
class TestObserver : public folly::ExecutionObserver {
public:
virtual void starting(uintptr_t /* id */) noexcept override {
if (nestedStart_ == 0) {
nestedStart_ = 1;
}
numStartingCalled_++;
}
virtual void stopped(uintptr_t /* id */) noexcept override {
nestedStart_--;
numStoppedCalled_++;
}
virtual void runnable(uintptr_t /* id */) noexcept override {
// Unused
}
int nestedStart_{0};
int numStartingCalled_{0};
int numStoppedCalled_{0};
};
class TestHandler : public folly::EventHandler {
public:
TestHandler(folly::EventBase* eventBase, int fd)
......@@ -1993,7 +2014,6 @@ TYPED_TEST_P(EventBaseTest, EventBaseThreadLoop) {
SKIP_IF(!eventBasePtr) << "Backend not available";
folly::EventBase& base = *eventBasePtr;
bool ran = false;
base.runInEventBaseThread([&]() { ran = true; });
base.loop();
......@@ -2405,5 +2425,28 @@ TYPED_TEST_P(EventBaseTest1, RunOnDestructionAddCallbackWithinCallback) {
}
EXPECT_EQ(2, callbacksCalled);
}
TYPED_TEST_P(EventBaseTest1, EventBaseExecutionObserver) {
auto eventBasePtr = getEventBase<TypeParam>();
SKIP_IF(!eventBasePtr) << "Backend not available";
folly::EventBase& base = *eventBasePtr;
bool ranBeforeLoop = false;
bool ran = false;
TestObserver observer;
base.setExecutionObserver(&observer);
CountedLoopCallback cb(&base, 1, [&]() { ranBeforeLoop = true; });
base.runBeforeLoop(&cb);
base.runInEventBaseThread(
[&]() { base.runInEventBaseThread([&]() { ran = true; }); });
base.loop();
ASSERT_TRUE(ranBeforeLoop);
ASSERT_TRUE(ran);
ASSERT_EQ(0, observer.nestedStart_);
ASSERT_EQ(4, observer.numStartingCalled_);
ASSERT_EQ(4, observer.numStoppedCalled_);
}
} // namespace test
} // namespace folly
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment