Commit b5338e82 authored by Yedidya Feldblum's avatar Yedidya Feldblum Committed by facebook-github-bot-4

Fix ASAN failure in folly/io/async/test/NotificationQueueTest.cpp.

Summary: [Folly] Fix ASAN failure in folly/io/async/test/NotificationQueueTest.cpp.

Reviewed By: @djwatson

Differential Revision: D2299112
parent ace10485
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include <folly/io/async/EventBase.h> #include <folly/io/async/EventBase.h>
#include <folly/io/async/EventHandler.h> #include <folly/io/async/EventHandler.h>
#include <folly/io/async/DelayedDestruction.h>
#include <folly/io/async/Request.h> #include <folly/io/async/Request.h>
#include <folly/Likely.h> #include <folly/Likely.h>
#include <folly/ScopeGuard.h> #include <folly/ScopeGuard.h>
...@@ -60,7 +61,7 @@ class NotificationQueue { ...@@ -60,7 +61,7 @@ class NotificationQueue {
/** /**
* A callback interface for consuming messages from the queue as they arrive. * A callback interface for consuming messages from the queue as they arrive.
*/ */
class Consumer : private EventHandler { class Consumer : public DelayedDestruction, private EventHandler {
public: public:
enum : uint16_t { kDefaultMaxReadAtOnce = 10 }; enum : uint16_t { kDefaultMaxReadAtOnce = 10 };
...@@ -69,8 +70,6 @@ class NotificationQueue { ...@@ -69,8 +70,6 @@ class NotificationQueue {
destroyedFlagPtr_(nullptr), destroyedFlagPtr_(nullptr),
maxReadAtOnce_(kDefaultMaxReadAtOnce) {} maxReadAtOnce_(kDefaultMaxReadAtOnce) {}
virtual ~Consumer();
/** /**
* messageAvailable() will be invoked whenever a new * messageAvailable() will be invoked whenever a new
* message is available from the pipe. * message is available from the pipe.
...@@ -152,7 +151,13 @@ class NotificationQueue { ...@@ -152,7 +151,13 @@ class NotificationQueue {
return base_; return base_;
} }
virtual void handlerReady(uint16_t events) noexcept; void handlerReady(uint16_t events) noexcept override;
protected:
void destroy() override;
virtual ~Consumer() {}
private: private:
/** /**
...@@ -577,7 +582,7 @@ class NotificationQueue { ...@@ -577,7 +582,7 @@ class NotificationQueue {
}; };
template<typename MessageT> template<typename MessageT>
NotificationQueue<MessageT>::Consumer::~Consumer() { void NotificationQueue<MessageT>::Consumer::destroy() {
// If we are in the middle of a call to handlerReady(), destroyedFlagPtr_ // If we are in the middle of a call to handlerReady(), destroyedFlagPtr_
// will be non-nullptr. Mark the value that it points to, so that // will be non-nullptr. Mark the value that it points to, so that
// handlerReady() will know the callback is destroyed, and that it cannot // handlerReady() will know the callback is destroyed, and that it cannot
...@@ -585,6 +590,8 @@ NotificationQueue<MessageT>::Consumer::~Consumer() { ...@@ -585,6 +590,8 @@ NotificationQueue<MessageT>::Consumer::~Consumer() {
if (destroyedFlagPtr_) { if (destroyedFlagPtr_) {
*destroyedFlagPtr_ = true; *destroyedFlagPtr_ = true;
} }
stopConsuming();
DelayedDestruction::destroy();
} }
template<typename MessageT> template<typename MessageT>
...@@ -596,6 +603,7 @@ void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t /*events*/) ...@@ -596,6 +603,7 @@ void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t /*events*/)
template<typename MessageT> template<typename MessageT>
void NotificationQueue<MessageT>::Consumer::consumeMessages( void NotificationQueue<MessageT>::Consumer::consumeMessages(
bool isDrain, size_t* numConsumed) noexcept { bool isDrain, size_t* numConsumed) noexcept {
DestructorGuard dg(this);
uint32_t numProcessed = 0; uint32_t numProcessed = 0;
bool firstRun = true; bool firstRun = true;
setActive(true); setActive(true);
...@@ -661,6 +669,7 @@ void NotificationQueue<MessageT>::Consumer::consumeMessages( ...@@ -661,6 +669,7 @@ void NotificationQueue<MessageT>::Consumer::consumeMessages(
CHECK(destroyedFlagPtr_ == nullptr); CHECK(destroyedFlagPtr_ == nullptr);
destroyedFlagPtr_ = &callbackDestroyed; destroyedFlagPtr_ = &callbackDestroyed;
messageAvailable(std::move(msg)); messageAvailable(std::move(msg));
destroyedFlagPtr_ = nullptr;
RequestContext::setContext(old_ctx); RequestContext::setContext(old_ctx);
...@@ -668,7 +677,6 @@ void NotificationQueue<MessageT>::Consumer::consumeMessages( ...@@ -668,7 +677,6 @@ void NotificationQueue<MessageT>::Consumer::consumeMessages(
if (callbackDestroyed) { if (callbackDestroyed) {
return; return;
} }
destroyedFlagPtr_ = nullptr;
// If the callback is no longer installed, we are done. // If the callback is no longer installed, we are done.
if (queue_ == nullptr) { if (queue_ == nullptr) {
...@@ -767,6 +775,7 @@ void NotificationQueue<MessageT>::Consumer::stopConsuming() { ...@@ -767,6 +775,7 @@ void NotificationQueue<MessageT>::Consumer::stopConsuming() {
template<typename MessageT> template<typename MessageT>
bool NotificationQueue<MessageT>::Consumer::consumeUntilDrained( bool NotificationQueue<MessageT>::Consumer::consumeUntilDrained(
size_t* numConsumed) noexcept { size_t* numConsumed) noexcept {
DestructorGuard dg(this);
{ {
folly::SpinLockGuard g(queue_->spinlock_); folly::SpinLockGuard g(queue_->spinlock_);
if (queue_->draining_) { if (queue_->draining_) {
......
...@@ -360,15 +360,16 @@ void QueueTest::destroyCallback() { ...@@ -360,15 +360,16 @@ void QueueTest::destroyCallback() {
// avoid destroying the function object. // avoid destroying the function object.
class DestroyTestConsumer : public IntQueue::Consumer { class DestroyTestConsumer : public IntQueue::Consumer {
public: public:
DestroyTestConsumer() {}
void messageAvailable(int&& value) override { void messageAvailable(int&& value) override {
DestructorGuard g(this);
if (fn && *fn) { if (fn && *fn) {
(*fn)(value); (*fn)(value);
} }
} }
std::function<void(int)> *fn; std::function<void(int)> *fn;
protected:
virtual ~DestroyTestConsumer() = default;
}; };
EventBase eventBase; EventBase eventBase;
...@@ -381,11 +382,13 @@ void QueueTest::destroyCallback() { ...@@ -381,11 +382,13 @@ void QueueTest::destroyCallback() {
// This way one consumer will be destroyed from inside its messageAvailable() // This way one consumer will be destroyed from inside its messageAvailable()
// callback, and one consume will be destroyed when it isn't inside // callback, and one consume will be destroyed when it isn't inside
// messageAvailable(). // messageAvailable().
std::unique_ptr<DestroyTestConsumer> consumer1(new DestroyTestConsumer); std::unique_ptr<DestroyTestConsumer, DelayedDestruction::Destructor>
std::unique_ptr<DestroyTestConsumer> consumer2(new DestroyTestConsumer); consumer1(new DestroyTestConsumer);
std::unique_ptr<DestroyTestConsumer, DelayedDestruction::Destructor>
consumer2(new DestroyTestConsumer);
std::function<void(int)> fn = [&](int) { std::function<void(int)> fn = [&](int) {
consumer1.reset(); consumer1 = nullptr;
consumer2.reset(); consumer2 = nullptr;
}; };
consumer1->fn = &fn; consumer1->fn = &fn;
consumer2->fn = &fn; consumer2->fn = &fn;
...@@ -617,6 +620,7 @@ TEST(NotificationQueueTest, UseAfterFork) { ...@@ -617,6 +620,7 @@ TEST(NotificationQueueTest, UseAfterFork) {
// We shouldn't reach here. // We shouldn't reach here.
_exit(0); _exit(0);
} }
PCHECK(pid > 0);
// Parent. Wait for the child to exit. // Parent. Wait for the child to exit.
auto waited = waitpid(pid, &childStatus, 0); auto waited = waitpid(pid, &childStatus, 0);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment