Commit 6b01128d authored by Giuseppe Ottaviano's avatar Giuseppe Ottaviano Committed by Facebook GitHub Bot

Optimize the storage of the interrupt handler

Summary:
`std::function` has a footprint of 32 bytes and (almost) always allocates the interrupt handler. By using an intrusively reference-counted atomic pointer the footprint is just 8 bytes, and we save further 8 bytes by eliminating `interruptHandlerSet_` (it's a `bool`, but poorly aligned). This also allows to share the handler along the continuation chain, instead of copying for every core.

In addition, the `getInterruptHandler()`/`setInterruptHandlerNoLock()` API was replaced by a single `initializeInterruptHandlerFrom()`, so we don't need to expose the internal storage details anymore.

Reviewed By: yfeldblum

Differential Revision: D22474230

fbshipit-source-id: 059828de3b89c25684465baf8e94bc1b68dac0da
parent dceeeb92
...@@ -370,7 +370,7 @@ FutureBase<T>::thenImplementation( ...@@ -370,7 +370,7 @@ FutureBase<T>::thenImplementation(
typedef typename R::ReturnsFuture::Inner B; typedef typename R::ReturnsFuture::Inner B;
Promise<B> p; Promise<B> p;
p.core_->setInterruptHandlerNoLock(this->getCore().getInterruptHandler()); p.core_->initCopyInterruptHandlerFrom(this->getCore());
// grab the Future now before we lose our handle on the Promise // grab the Future now before we lose our handle on the Promise
auto sf = p.getSemiFuture(); auto sf = p.getSemiFuture();
...@@ -453,7 +453,7 @@ FutureBase<T>::thenImplementation( ...@@ -453,7 +453,7 @@ FutureBase<T>::thenImplementation(
typedef typename R::ReturnsFuture::Inner B; typedef typename R::ReturnsFuture::Inner B;
Promise<B> p; Promise<B> p;
p.core_->setInterruptHandlerNoLock(this->getCore().getInterruptHandler()); p.core_->initCopyInterruptHandlerFrom(this->getCore());
// grab the Future now before we lose our handle on the Promise // grab the Future now before we lose our handle on the Promise
auto sf = p.getSemiFuture(); auto sf = p.getSemiFuture();
...@@ -1098,7 +1098,7 @@ typename std::enable_if< ...@@ -1098,7 +1098,7 @@ typename std::enable_if<
Future<T>>::type Future<T>>::type
Future<T>::thenError(tag_t<ExceptionType>, F&& func) && { Future<T>::thenError(tag_t<ExceptionType>, F&& func) && {
Promise<T> p; Promise<T> p;
p.core_->setInterruptHandlerNoLock(this->getCore().getInterruptHandler()); p.core_->initCopyInterruptHandlerFrom(this->getCore());
auto sf = p.getSemiFuture(); auto sf = p.getSemiFuture();
auto* ePtr = this->getExecutor(); auto* ePtr = this->getExecutor();
auto e = folly::getKeepAliveToken(ePtr ? *ePtr : InlineExecutor::instance()); auto e = folly::getKeepAliveToken(ePtr ? *ePtr : InlineExecutor::instance());
...@@ -1133,7 +1133,7 @@ typename std::enable_if< ...@@ -1133,7 +1133,7 @@ typename std::enable_if<
Future<T>>::type Future<T>>::type
Future<T>::thenError(tag_t<ExceptionType>, F&& func) && { Future<T>::thenError(tag_t<ExceptionType>, F&& func) && {
Promise<T> p; Promise<T> p;
p.core_->setInterruptHandlerNoLock(this->getCore().getInterruptHandler()); p.core_->initCopyInterruptHandlerFrom(this->getCore());
auto sf = p.getSemiFuture(); auto sf = p.getSemiFuture();
auto* ePtr = this->getExecutor(); auto* ePtr = this->getExecutor();
auto e = folly::getKeepAliveToken(ePtr ? *ePtr : InlineExecutor::instance()); auto e = folly::getKeepAliveToken(ePtr ? *ePtr : InlineExecutor::instance());
...@@ -1164,7 +1164,7 @@ Future<T>::thenError(F&& func) && { ...@@ -1164,7 +1164,7 @@ Future<T>::thenError(F&& func) && {
auto e = folly::getKeepAliveToken(ePtr ? *ePtr : InlineExecutor::instance()); auto e = folly::getKeepAliveToken(ePtr ? *ePtr : InlineExecutor::instance());
Promise<T> p; Promise<T> p;
p.core_->setInterruptHandlerNoLock(this->getCore().getInterruptHandler()); p.core_->initCopyInterruptHandlerFrom(this->getCore());
auto sf = p.getSemiFuture(); auto sf = p.getSemiFuture();
this->setCallback_([state = futures::detail::makeCoreCallbackState( this->setCallback_([state = futures::detail::makeCoreCallbackState(
std::move(p), std::forward<F>(func))]( std::move(p), std::forward<F>(func))](
...@@ -1198,7 +1198,7 @@ Future<T>::thenError(F&& func) && { ...@@ -1198,7 +1198,7 @@ Future<T>::thenError(F&& func) && {
auto e = folly::getKeepAliveToken(ePtr ? *ePtr : InlineExecutor::instance()); auto e = folly::getKeepAliveToken(ePtr ? *ePtr : InlineExecutor::instance());
Promise<T> p; Promise<T> p;
p.core_->setInterruptHandlerNoLock(this->getCore().getInterruptHandler()); p.core_->initCopyInterruptHandlerFrom(this->getCore());
auto sf = p.getSemiFuture(); auto sf = p.getSemiFuture();
this->setCallback_([state = futures::detail::makeCoreCallbackState( this->setCallback_([state = futures::detail::makeCoreCallbackState(
std::move(p), std::forward<F>(func))]( std::move(p), std::forward<F>(func))](
...@@ -2143,7 +2143,7 @@ Future<T> convertFuture(SemiFuture<T>&& sf, const Future<T>& f) { ...@@ -2143,7 +2143,7 @@ Future<T> convertFuture(SemiFuture<T>&& sf, const Future<T>& f) {
// Carry executor from f, inserting an inline executor if it did not have one // Carry executor from f, inserting an inline executor if it did not have one
auto* exe = f.getExecutor(); auto* exe = f.getExecutor();
auto newFut = std::move(sf).via(exe ? exe : &InlineExecutor::instance()); auto newFut = std::move(sf).via(exe ? exe : &InlineExecutor::instance());
newFut.core_->setInterruptHandlerNoLock(f.core_->getInterruptHandler()); newFut.core_->initCopyInterruptHandlerFrom(*f.core_);
return newFut; return newFut;
} }
......
...@@ -246,20 +246,34 @@ DeferredWrapper DeferredExecutor::copy() { ...@@ -246,20 +246,34 @@ DeferredWrapper DeferredExecutor::copy() {
DeferredExecutor::DeferredExecutor() {} DeferredExecutor::DeferredExecutor() {}
bool DeferredExecutor::acquire() { void DeferredExecutor::acquire() {
auto keepAliveCount = keepAliveCount_.fetch_add(1, std::memory_order_relaxed); auto keepAliveCount = keepAliveCount_.fetch_add(1, std::memory_order_relaxed);
DCHECK(keepAliveCount > 0); DCHECK_GT(keepAliveCount, 0);
return true;
} }
void DeferredExecutor::release() { void DeferredExecutor::release() {
auto keepAliveCount = keepAliveCount_.fetch_sub(1, std::memory_order_acq_rel); auto keepAliveCount = keepAliveCount_.fetch_sub(1, std::memory_order_acq_rel);
DCHECK(keepAliveCount > 0); DCHECK_GT(keepAliveCount, 0);
if (keepAliveCount == 1) { if (keepAliveCount == 1) {
delete this; delete this;
} }
} }
InterruptHandler::~InterruptHandler() = default;
void InterruptHandler::acquire() {
auto refCount = refCount_.fetch_add(1, std::memory_order_relaxed);
DCHECK_GT(refCount, 0);
}
void InterruptHandler::release() {
auto refCount = refCount_.fetch_sub(1, std::memory_order_acq_rel);
DCHECK_GT(refCount, 0);
if (refCount == 1) {
delete this;
}
}
bool CoreBase::hasResult() const noexcept { bool CoreBase::hasResult() const noexcept {
constexpr auto allowed = State::OnlyResult | State::Done; constexpr auto allowed = State::OnlyResult | State::Done;
auto core = this; auto core = this;
...@@ -298,18 +312,22 @@ void CoreBase::raise(exception_wrapper e) { ...@@ -298,18 +312,22 @@ void CoreBase::raise(exception_wrapper e) {
std::lock_guard<SpinLock> lock(interruptLock_); std::lock_guard<SpinLock> lock(interruptLock_);
if (!interrupt_ && !hasResult()) { if (!interrupt_ && !hasResult()) {
interrupt_ = std::make_unique<exception_wrapper>(std::move(e)); interrupt_ = std::make_unique<exception_wrapper>(std::move(e));
if (interruptHandler_) { auto interruptHandler = interruptHandler_.load(std::memory_order_relaxed);
interruptHandler_(*interrupt_); if (interruptHandler) {
interruptHandler->handle(*interrupt_);
} }
} }
} }
std::function<void(exception_wrapper const&)> CoreBase::getInterruptHandler() { void CoreBase::initCopyInterruptHandlerFrom(const CoreBase& other) {
if (!interruptHandlerSet_.load(std::memory_order_acquire)) { auto interruptHandler =
return nullptr; other.interruptHandler_.load(std::memory_order_acquire);
if (interruptHandler != nullptr) {
interruptHandler->acquire();
} }
std::lock_guard<SpinLock> lock(interruptLock_); auto oldInterruptHandler =
return interruptHandler_; interruptHandler_.exchange(interruptHandler, std::memory_order_release);
DCHECK(oldInterruptHandler == nullptr);
} }
class CoreBase::CoreAndCallbackReference { class CoreBase::CoreAndCallbackReference {
...@@ -346,7 +364,12 @@ class CoreBase::CoreAndCallbackReference { ...@@ -346,7 +364,12 @@ class CoreBase::CoreAndCallbackReference {
CoreBase::CoreBase(State state, unsigned char attached) CoreBase::CoreBase(State state, unsigned char attached)
: state_(state), attached_(attached) {} : state_(state), attached_(attached) {}
CoreBase::~CoreBase() {} CoreBase::~CoreBase() {
auto interruptHandler = interruptHandler_.load(std::memory_order_relaxed);
if (interruptHandler != nullptr) {
interruptHandler->release();
}
}
void CoreBase::setCallback_( void CoreBase::setCallback_(
Callback&& callback, Callback&& callback,
......
...@@ -167,8 +167,7 @@ class DeferredExecutor final { ...@@ -167,8 +167,7 @@ class DeferredExecutor final {
DeferredExecutor(); DeferredExecutor();
bool acquire(); void acquire();
void release(); void release();
enum class State { EMPTY, HAS_FUNCTION, HAS_EXECUTOR, DETACHED }; enum class State { EMPTY, HAS_FUNCTION, HAS_EXECUTOR, DETACHED };
...@@ -180,6 +179,32 @@ class DeferredExecutor final { ...@@ -180,6 +179,32 @@ class DeferredExecutor final {
std::atomic<ssize_t> keepAliveCount_{1}; std::atomic<ssize_t> keepAliveCount_{1};
}; };
class InterruptHandler {
public:
virtual ~InterruptHandler();
virtual void handle(const folly::exception_wrapper& ew) const = 0;
void acquire();
void release();
private:
std::atomic<ssize_t> refCount_{1};
};
template <class F>
class InterruptHandlerImpl : public InterruptHandler {
public:
explicit InterruptHandlerImpl(F f) : f_(std::move(f)) {}
void handle(const folly::exception_wrapper& ew) const override {
f_(ew);
}
private:
F f_;
};
/// The shared state object for Future and Promise. /// The shared state object for Future and Promise.
/// ///
/// Nomenclature: /// Nomenclature:
...@@ -393,7 +418,12 @@ class CoreBase { ...@@ -393,7 +418,12 @@ class CoreBase {
/// Has no effect if State is OnlyResult or Done. /// Has no effect if State is OnlyResult or Done.
void raise(exception_wrapper e); void raise(exception_wrapper e);
std::function<void(exception_wrapper const&)> getInterruptHandler(); /// Copy the interrupt handler from another core. This should be done only
/// when initializing a new core:
///
/// - interruptHandler_ must be nullptr
/// - interruptLock_ is not acquired.
void initCopyInterruptHandlerFrom(const CoreBase& other);
/// Call only from producer thread /// Call only from producer thread
/// ///
...@@ -413,17 +443,17 @@ class CoreBase { ...@@ -413,17 +443,17 @@ class CoreBase {
if (interrupt_) { if (interrupt_) {
fn(as_const(*interrupt_)); fn(as_const(*interrupt_));
} else { } else {
setInterruptHandlerNoLock(std::forward<F>(fn)); auto oldInterruptHandler = interruptHandler_.exchange(
new InterruptHandlerImpl<typename std::decay<F>::type>(
std::forward<F>(fn)),
std::memory_order_relaxed);
if (oldInterruptHandler) {
oldInterruptHandler->release();
}
} }
} }
} }
void setInterruptHandlerNoLock(
std::function<void(exception_wrapper const&)> fn) {
interruptHandlerSet_.store(true, std::memory_order_relaxed);
interruptHandler_ = std::move(fn);
}
protected: protected:
CoreBase(State state, unsigned char attached); CoreBase(State state, unsigned char attached);
...@@ -453,14 +483,13 @@ class CoreBase { ...@@ -453,14 +483,13 @@ class CoreBase {
std::atomic<State> state_; std::atomic<State> state_;
std::atomic<unsigned char> attached_; std::atomic<unsigned char> attached_;
std::atomic<unsigned char> callbackReferences_{0}; std::atomic<unsigned char> callbackReferences_{0};
std::atomic<bool> interruptHandlerSet_{false};
SpinLock interruptLock_; SpinLock interruptLock_;
KeepAliveOrDeferred executor_; KeepAliveOrDeferred executor_;
union { union {
Context context_; Context context_;
}; };
std::unique_ptr<exception_wrapper> interrupt_{}; std::unique_ptr<exception_wrapper> interrupt_{};
std::function<void(exception_wrapper const&)> interruptHandler_{nullptr}; std::atomic<InterruptHandler*> interruptHandler_{};
CoreBase* proxy_; CoreBase* proxy_;
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment