Commit cafe469a authored by Joseph Griego's avatar Joseph Griego Committed by Facebook Github Bot 9

EventBase keepAlive counter is not atomic

Summary: Since loopKeepAlive() is always used from the EventBase thread, there's no need for the overhead of an shared_ptr (and therefore, an atomic ref counter); we can get away without thread safety. This also allows us to discard the (sometimes incorrect) optimization of not returning a handle when it appears the loop will continue running anyways

Reviewed By: andriigrynenko

Differential Revision: D3375503

fbshipit-source-id: 474e4fcf992bdc4fcca9370d3c57bdcc4e042386
parent f04cf0cd
...@@ -198,7 +198,7 @@ EventBase::~EventBase() { ...@@ -198,7 +198,7 @@ EventBase::~EventBase() {
// Keep looping until all keep-alive handles are released. Each keep-alive // Keep looping until all keep-alive handles are released. Each keep-alive
// handle signals that some external code will still schedule some work on // handle signals that some external code will still schedule some work on
// this EventBase (so it's not safe to destroy it). // this EventBase (so it's not safe to destroy it).
while (!loopKeepAlive_.unique()) { while (loopKeepAliveCount_ > 0) {
applyLoopKeepAlive(); applyLoopKeepAlive();
loopOnce(); loopOnce();
} }
...@@ -448,12 +448,12 @@ bool EventBase::loopBody(int flags) { ...@@ -448,12 +448,12 @@ bool EventBase::loopBody(int flags) {
} }
void EventBase::applyLoopKeepAlive() { void EventBase::applyLoopKeepAlive() {
if (loopKeepAliveActive_ && loopKeepAlive_.unique()) { if (loopKeepAliveActive_ && loopKeepAliveCount_ == 0) {
// Restore the notification queue internal flag // Restore the notification queue internal flag
fnRunner_->stopConsuming(); fnRunner_->stopConsuming();
fnRunner_->startConsumingInternal(this, queue_.get()); fnRunner_->startConsumingInternal(this, queue_.get());
loopKeepAliveActive_ = false; loopKeepAliveActive_ = false;
} else if (!loopKeepAliveActive_ && !loopKeepAlive_.unique()) { } else if (!loopKeepAliveActive_ && loopKeepAliveCount_ > 0) {
// Update the notification queue event to treat it as a normal // Update the notification queue event to treat it as a normal
// (non-internal) event. The notification queue event always remains // (non-internal) event. The notification queue event always remains
// installed, and the main loop won't exit with it installed. // installed, and the main loop won't exit with it installed.
...@@ -468,11 +468,9 @@ void EventBase::loopForever() { ...@@ -468,11 +468,9 @@ void EventBase::loopForever() {
{ {
SCOPE_EXIT { SCOPE_EXIT {
applyLoopKeepAlive(); applyLoopKeepAlive();
loopForeverActive_ = false;
}; };
loopForeverActive_ = true;
// Make sure notification queue events are treated as normal events. // Make sure notification queue events are treated as normal events.
auto loopKeepAlive = loopKeepAlive_; auto keepAlive = loopKeepAlive();
ret = loop(); ret = loop();
} }
......
...@@ -587,7 +587,13 @@ class EventBase : private boost::noncopyable, ...@@ -587,7 +587,13 @@ class EventBase : private boost::noncopyable,
loopOnce(); loopOnce();
} }
using LoopKeepAlive = std::shared_ptr<void>; struct LoopKeepAliveDeleter {
void operator()(EventBase* evb) {
DCHECK(evb->isInEventBaseThread());
evb->loopKeepAliveCount_--;
}
};
using LoopKeepAlive = std::unique_ptr<EventBase, LoopKeepAliveDeleter>;
/// Returns you a handle which make loop() behave like loopForever() until /// Returns you a handle which make loop() behave like loopForever() until
/// destroyed. loop() will return to its original behavior only when all /// destroyed. loop() will return to its original behavior only when all
...@@ -596,11 +602,9 @@ class EventBase : private boost::noncopyable, ...@@ -596,11 +602,9 @@ class EventBase : private boost::noncopyable,
/// ///
/// May return no op LoopKeepAlive if loopForever() is already running. /// May return no op LoopKeepAlive if loopForever() is already running.
LoopKeepAlive loopKeepAlive() { LoopKeepAlive loopKeepAlive() {
if (loopForeverActive_) { DCHECK(isInEventBaseThread());
return nullptr; loopKeepAliveCount_++;
} else { return LoopKeepAlive(this);
return loopKeepAlive_;
}
} }
private: private:
...@@ -692,9 +696,8 @@ class EventBase : private boost::noncopyable, ...@@ -692,9 +696,8 @@ class EventBase : private boost::noncopyable,
// to send function requests to the EventBase thread. // to send function requests to the EventBase thread.
std::unique_ptr<NotificationQueue<Func>> queue_; std::unique_ptr<NotificationQueue<Func>> queue_;
std::unique_ptr<FunctionRunner> fnRunner_; std::unique_ptr<FunctionRunner> fnRunner_;
LoopKeepAlive loopKeepAlive_{std::make_shared<int>(42)}; size_t loopKeepAliveCount_{0};
bool loopKeepAliveActive_{false}; bool loopKeepAliveActive_{false};
std::atomic<bool> loopForeverActive_{false};
// limit for latency in microseconds (0 disables) // limit for latency in microseconds (0 disables)
int64_t maxLatency_; int64_t maxLatency_;
......
...@@ -1733,7 +1733,7 @@ TEST(EventBaseTest, LoopKeepAlive) { ...@@ -1733,7 +1733,7 @@ TEST(EventBaseTest, LoopKeepAlive) {
EventBase evb; EventBase evb;
bool done = false; bool done = false;
std::thread t([&, loopKeepAlive = evb.loopKeepAlive() ] { std::thread t([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable {
/* sleep override */ std::this_thread::sleep_for( /* sleep override */ std::this_thread::sleep_for(
std::chrono::milliseconds(100)); std::chrono::milliseconds(100));
evb.runInEventBaseThread( evb.runInEventBaseThread(
...@@ -1754,7 +1754,7 @@ TEST(EventBaseTest, LoopKeepAliveInLoop) { ...@@ -1754,7 +1754,7 @@ TEST(EventBaseTest, LoopKeepAliveInLoop) {
std::thread t; std::thread t;
evb.runInEventBaseThread([&] { evb.runInEventBaseThread([&] {
t = std::thread([&, loopKeepAlive = evb.loopKeepAlive() ] { t = std::thread([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable {
/* sleep override */ std::this_thread::sleep_for( /* sleep override */ std::this_thread::sleep_for(
std::chrono::milliseconds(100)); std::chrono::milliseconds(100));
evb.runInEventBaseThread( evb.runInEventBaseThread(
...@@ -1769,20 +1769,49 @@ TEST(EventBaseTest, LoopKeepAliveInLoop) { ...@@ -1769,20 +1769,49 @@ TEST(EventBaseTest, LoopKeepAliveInLoop) {
t.join(); t.join();
} }
TEST(EventBaseTest, LoopKeepAliveWithLoopForever) {
std::unique_ptr<EventBase> evb = folly::make_unique<EventBase>();
bool done = false;
std::thread evThread([&] {
evb->loopForever();
evb.reset();
done = true;
});
{
auto* ev = evb.get();
EventBase::LoopKeepAlive keepAlive;
ev->runInEventBaseThreadAndWait(
[&ev, &keepAlive] { keepAlive = ev->loopKeepAlive(); });
ASSERT_FALSE(done) << "Loop finished before we asked it to";
ev->terminateLoopSoon();
/* sleep override */
std::this_thread::sleep_for(std::chrono::milliseconds(30));
ASSERT_FALSE(done) << "Loop terminated early";
ev->runInEventBaseThread([&ev, keepAlive = std::move(keepAlive) ]{});
}
evThread.join();
ASSERT_TRUE(done);
}
TEST(EventBaseTest, LoopKeepAliveShutdown) { TEST(EventBaseTest, LoopKeepAliveShutdown) {
auto evb = folly::make_unique<EventBase>(); auto evb = folly::make_unique<EventBase>();
bool done = false; bool done = false;
std::thread t( std::thread t([
[&done, loopKeepAlive = evb->loopKeepAlive(), evbPtr = evb.get() ] { &done,
/* sleep override */ std::this_thread::sleep_for( loopKeepAlive = evb->loopKeepAlive(),
std::chrono::milliseconds(100)); evbPtr = evb.get()
evbPtr->runInEventBaseThread( ]() mutable {
[&done, loopKeepAlive = std::move(loopKeepAlive) ] { /* sleep override */ std::this_thread::sleep_for(
done = true; std::chrono::milliseconds(100));
}); evbPtr->runInEventBaseThread(
}); [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
});
evb.reset(); evb.reset();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment