Commit 5e4b2f87 authored by Brandon Schlinker's avatar Brandon Schlinker Committed by Facebook GitHub Bot

Lifecycle observer

Summary:
Adds `AsyncTransport::LifecycleCallback`, an observer that is notified when a transport is closed and destroyed.

Currently only supported for `AsyncSocket` and derived types (e.g., `AsyncSSLSocket`).

`AsyncSocket::LifecycleCallback` is derived from `AsyncTransport::LifecycleCallback` and adds support for additional lifecycle events relevant to `AsyncSocket`.

Details:
- Can be used by instrumentation that ties its lifetime to that of the transport.
- Intentionally separate from all existing callbacks that may be added / used by application logic because it is designed to be used by instrumentation that is generic, and thus separate / unaware of application logic.
- Multiple observer can be registered, but a `folly::small_vector` is used to minimize alloc overhead in the common case of 0 - 2 being registered.
- The observer implementation is responsible for calling `removeLifecycleObserver` to remove itself if it is destroyed before the socket is destroyed.

Differential Revision: D21613750

fbshipit-source-id: 92bb5de30bc8bab56fa29e62800bf58e47486f1e
parent 17df5922
...@@ -350,6 +350,16 @@ AsyncSocket::AsyncSocket(AsyncSocket::UniquePtr oldAsyncSocket) ...@@ -350,6 +350,16 @@ AsyncSocket::AsyncSocket(AsyncSocket::UniquePtr oldAsyncSocket)
oldAsyncSocket->detachNetworkSocket(), oldAsyncSocket->detachNetworkSocket(),
oldAsyncSocket->getZeroCopyBufId()) { oldAsyncSocket->getZeroCopyBufId()) {
preReceivedData_ = std::move(oldAsyncSocket->preReceivedData_); preReceivedData_ = std::move(oldAsyncSocket->preReceivedData_);
// inform lifecycle observers to give them an opportunity to unsubscribe from
// events for the old socket and subscribe to the new socket; we do not move
// the subscription ourselves
for (const auto& cb : oldAsyncSocket->lifecycleObservers_) {
// only available for observers derived from AsyncSocket::LifecycleObserver
if (auto dCb = dynamic_cast<AsyncSocket::LifecycleObserver*>(cb)) {
dCb->move(oldAsyncSocket.get(), this);
}
}
} }
// init() method, since constructor forwarding isn't supported in most // init() method, since constructor forwarding isn't supported in most
...@@ -380,6 +390,9 @@ AsyncSocket::~AsyncSocket() { ...@@ -380,6 +390,9 @@ AsyncSocket::~AsyncSocket() {
VLOG(7) << "actual destruction of AsyncSocket(this=" << this VLOG(7) << "actual destruction of AsyncSocket(this=" << this
<< ", evb=" << eventBase_ << ", fd=" << fd_ << ", state=" << state_ << ", evb=" << eventBase_ << ", fd=" << fd_ << ", state=" << state_
<< ")"; << ")";
for (const auto& cb : lifecycleObservers_) {
cb->destroy(this);
}
} }
void AsyncSocket::destroy() { void AsyncSocket::destroy() {
...@@ -397,6 +410,12 @@ NetworkSocket AsyncSocket::detachNetworkSocket() { ...@@ -397,6 +410,12 @@ NetworkSocket AsyncSocket::detachNetworkSocket() {
VLOG(6) << "AsyncSocket::detachFd(this=" << this << ", fd=" << fd_ VLOG(6) << "AsyncSocket::detachFd(this=" << this << ", fd=" << fd_
<< ", evb=" << eventBase_ << ", state=" << state_ << ", evb=" << eventBase_ << ", state=" << state_
<< ", events=" << std::hex << eventFlags_ << ")"; << ", events=" << std::hex << eventFlags_ << ")";
for (const auto& cb : lifecycleObservers_) {
// only available for observers derived from AsyncSocket::LifecycleObserver
if (auto dCb = dynamic_cast<AsyncSocket::LifecycleObserver*>(cb)) {
dCb->fdDetach(this);
}
}
// Extract the fd, and set fd_ to -1 first, so closeNow() won't // Extract the fd, and set fd_ to -1 first, so closeNow() won't
// actually close the descriptor. // actually close the descriptor.
if (const auto socketSet = wShutdownSocketSet_.lock()) { if (const auto socketSet = wShutdownSocketSet_.lock()) {
...@@ -1977,6 +1996,39 @@ bool AsyncSocket::processZeroCopyWriteInProgress() noexcept { ...@@ -1977,6 +1996,39 @@ bool AsyncSocket::processZeroCopyWriteInProgress() noexcept {
return idZeroCopyBufPtrMap_.empty(); return idZeroCopyBufPtrMap_.empty();
} }
void AsyncSocket::addLifecycleObserver(
AsyncTransport::LifecycleObserver* observer) {
if (eventBase_) {
eventBase_->dcheckIsInEventBaseThread();
}
lifecycleObservers_.push_back(observer);
observer->observerAttach(this);
}
bool AsyncSocket::removeLifecycleObserver(
AsyncTransport::LifecycleObserver* observer) {
const auto eraseIt = std::remove(
lifecycleObservers_.begin(), lifecycleObservers_.end(), observer);
if (eraseIt == lifecycleObservers_.end()) {
return false;
}
for (auto it = eraseIt; it != lifecycleObservers_.end(); it++) {
(*it)->observerDetach(this);
}
lifecycleObservers_.erase(eraseIt, lifecycleObservers_.end());
return true;
}
std::vector<AsyncTransport::LifecycleObserver*>
AsyncSocket::getLifecycleObservers() const {
if (eventBase_) {
eventBase_->dcheckIsInEventBaseThread();
}
return std::vector<AsyncTransport::LifecycleObserver*>(
lifecycleObservers_.begin(), lifecycleObservers_.end());
}
void AsyncSocket::handleRead() noexcept { void AsyncSocket::handleRead() noexcept {
VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_ VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
<< ", state=" << state_; << ", state=" << state_;
...@@ -2838,6 +2890,9 @@ void AsyncSocket::invokeConnectErr(const AsyncSocketException& ex) { ...@@ -2838,6 +2890,9 @@ void AsyncSocket::invokeConnectErr(const AsyncSocketException& ex) {
void AsyncSocket::invokeConnectSuccess() { void AsyncSocket::invokeConnectSuccess() {
connectEndTime_ = std::chrono::steady_clock::now(); connectEndTime_ = std::chrono::steady_clock::now();
for (const auto& cb : lifecycleObservers_) {
cb->connect(this);
}
if (connectCallback_) { if (connectCallback_) {
ConnectCallback* callback = connectCallback_; ConnectCallback* callback = connectCallback_;
connectCallback_ = nullptr; connectCallback_ = nullptr;
...@@ -2888,6 +2943,9 @@ void AsyncSocket::invalidState(WriteCallback* callback) { ...@@ -2888,6 +2943,9 @@ void AsyncSocket::invalidState(WriteCallback* callback) {
} }
void AsyncSocket::doClose() { void AsyncSocket::doClose() {
for (const auto& cb : lifecycleObservers_) {
cb->close(this);
}
if (fd_ == NetworkSocket()) { if (fd_ == NetworkSocket()) {
return; return;
} }
......
...@@ -28,6 +28,7 @@ ...@@ -28,6 +28,7 @@
#include <folly/io/async/DelayedDestruction.h> #include <folly/io/async/DelayedDestruction.h>
#include <folly/io/async/EventHandler.h> #include <folly/io/async/EventHandler.h>
#include <folly/portability/Sockets.h> #include <folly/portability/Sockets.h>
#include <folly/small_vector.h>
#include <sys/types.h> #include <sys/types.h>
...@@ -966,6 +967,90 @@ class AsyncSocket : public AsyncTransport { ...@@ -966,6 +967,90 @@ class AsyncSocket : public AsyncTransport {
uint32_t totalBytesWritten_{0}; ///< total bytes written uint32_t totalBytesWritten_{0}; ///< total bytes written
}; };
class LifecycleObserver : public AsyncTransport::LifecycleObserver {
public:
using AsyncTransport::LifecycleObserver::LifecycleObserver;
/**
* fdDetach() is invoked if the socket file descriptor is detached.
*
* detachNetworkSocket() will be triggered when a new AsyncSocket is being
* constructed from an old one. See the moved() event for details about
* this special case.
*
* @param socket Socket for which detachNetworkSocket was invoked.
*/
virtual void fdDetach(AsyncSocket* /* socket */) noexcept = 0;
/**
* move() will be invoked when a new AsyncSocket is being constructed via
* constructor AsyncSocket(AsyncSocket* oldAsyncSocket) from an AsyncSocket
* that has an observer attached.
*
* This type of construction is common during TLS/SSL accept process.
* wangle::Acceptor may transform an AsyncSocket to an AsyncFizzServer, and
* then transform the AsyncFizzServer to an AsyncSSLSocket on fallback.
* AsyncFizzServer and AsyncSSLSocket derive from AsyncSocket and at each
* stage the aforementioned constructor will be called.
*
* Observers may be attached when the initial AsyncSocket is created, before
* TLS/SSL accept handling has completed. As a result, AsyncSocket must
* notify the observer during each transformation so that:
* (1) The observer can track these transformations for debugging.
* (2) The observer does not become separated from the underlying
* operating system socket and corresponding file descriptor.
*
* When a new AsyncSocket is being constructed via the aforementioned
* constructor, the following observer events will be triggered:
* (1) fdDetach
* (2) move
*
* When move is triggered, the observer can CHOOSE to detach the old socket
* and attach to the new socket. This process will not happen automatically;
* the observer must explicitly perform these steps.
*
* @param oldSocket Old socket that fd was detached from.
* @param newSocket New socket being constructed with fd attached.
*/
virtual void move(
AsyncSocket* /* oldSocket */,
AsyncSocket* /* newSocket */) noexcept = 0;
};
/**
* Adds a lifecycle observer.
*
* Observers can tie their lifetime to aspects of this socket's lifecycle /
* lifetime and perform inspection at various states.
*
* This enables instrumentation to be added without changing / interfering
* with how the application uses the socket.
*
* Observer should implement AsyncTransport::LifecycleObserver to receive
* additional lifecycle events specific to AsyncSocket.
*
* @param observer Observer to add (implements LifecycleObserver).
*/
void addLifecycleObserver(
AsyncTransport::LifecycleObserver* observer) override;
/**
* Removes a lifecycle observer.
*
* @param observer Observer to remove.
* @return Whether observer found and removed from list.
*/
bool removeLifecycleObserver(
AsyncTransport::LifecycleObserver* observer) override;
/**
* Returns installed lifecycle observers.
*
* @return Vector with installed observers.
*/
FOLLY_NODISCARD virtual std::vector<AsyncTransport::LifecycleObserver*>
getLifecycleObservers() const override;
protected: protected:
enum ReadResultEnum { enum ReadResultEnum {
READ_EOF = 0, READ_EOF = 0,
...@@ -1293,6 +1378,19 @@ class AsyncSocket : public AsyncTransport { ...@@ -1293,6 +1378,19 @@ class AsyncSocket : public AsyncTransport {
// include failed writes, but it does include buffered writes. // include failed writes, but it does include buffered writes.
size_t totalAppBytesScheduledForWrite_; size_t totalAppBytesScheduledForWrite_;
// Lifecycle observers.
//
// Use small_vector to avoid heap allocation for up to two observers, unless
// mobile, in which case we fallback to std::vector to prioritize code size.
#if !FOLLY_MOBILE
using LifecycleObserverVecImpl =
folly::small_vector<AsyncTransport::LifecycleObserver*, 2>;
#else
using LifecycleObserverVecImpl =
std::vector<AsyncTransport::LifecycleObserver*>;
#endif
LifecycleObserverVecImpl lifecycleObservers_;
// Pre-received data, to be returned to read callback before any data from the // Pre-received data, to be returned to read callback before any data from the
// socket. // socket.
std::unique_ptr<IOBuf> preReceivedData_; std::unique_ptr<IOBuf> preReceivedData_;
......
...@@ -732,6 +732,92 @@ class AsyncTransport : public DelayedDestruction, ...@@ -732,6 +732,92 @@ class AsyncTransport : public DelayedDestruction,
} }
} }
class LifecycleObserver {
public:
virtual ~LifecycleObserver() = default;
/**
* observerAttach() will be invoked when an observer is added.
*
* @param transport Transport where observer was installed.
*/
virtual void observerAttach(AsyncTransport* /* transport */) noexcept = 0;
/**
* observerDetached() will be invoked if the observer is uninstalled prior
* to transport destruction.
*
* No further events will be invoked after observerDetach().
*
* @param transport Transport where observer was uninstalled.
*/
virtual void observerDetach(AsyncTransport* /* transport */) noexcept = 0;
/**
* destroy() will be invoked when the transport's destructor is invoked.
*
* No further events will be invoked after destroy().
*
* @param transport Transport being destroyed.
*/
virtual void destroy(AsyncTransport* /* transport */) noexcept = 0;
/**
* close() will be invoked when the transport is being closed.
*
* Can be called multiple times during shutdown / destruction for the same
* transport. Observers may detach after first call or track if event
* previously observed.
*
* @param transport Transport being closed.
*/
virtual void close(AsyncTransport* /* transport */) noexcept = 0;
/**
* connect() will be invoked when connect() returns successfully.
*
* Triggered before any application connection callback.
*
* @param transport Transport that has connected.
*/
virtual void connect(AsyncTransport* /* transport */) noexcept = 0;
};
/**
* Adds a lifecycle observer.
*
* Observers can tie their lifetime to aspects of this socket's lifecycle /
* lifetime and perform inspection at various states.
*
* This enables instrumentation to be added without changing / interfering
* with how the application uses the socket.
*
* @param observer Observer to add (implements LifecycleObserver).
*/
virtual void addLifecycleObserver(LifecycleObserver* /* observer */) {
CHECK(false) << "addLifecycleObserver() not supported";
}
/**
* Removes a lifecycle observer.
*
* @param observer Observer to remove.
* @return Whether observer found and removed from list.
*/
virtual bool removeLifecycleObserver(LifecycleObserver* /* observer */) {
CHECK(false) << "removeLifecycleObserver() not supported";
}
/**
* Returns installed lifecycle observers.
*
* @return Vector with installed observers.
*/
FOLLY_NODISCARD virtual std::vector<LifecycleObserver*>
getLifecycleObservers() const {
CHECK(false) << "getLifecycleObservers() not supported";
}
/** /**
* AsyncTransports may wrap other AsyncTransport. This returns the * AsyncTransports may wrap other AsyncTransport. This returns the
* transport that is wrapped. It returns nullptr if there is no wrapped * transport that is wrapped. It returns nullptr if there is no wrapped
......
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment