Commit aa605f8e authored by Yedidya Feldblum's avatar Yedidya Feldblum Committed by Facebook GitHub Bot

fix lock order inversion in SharedPromise interrupt handler setting exn

Summary:
The approach taken to fixing the lock inversion is not to use locks. Instead, we take a page from the existing futures playbook and use an atomic acyclic finite state machine! In this case, we have a single atomic pointer which may own either a raised object of type `exception_wrapper` or an interrupt-handler. Both are at least 4-byte aligned (8-byte aligned on 64-bit architectures) so both have the bottom two bits free. We use the bottom two bits as the state machine to identify whether `0x0u` initial where nothing is owned, `0x1u` where an interrupt-handler is owned, `0x2u` where a raised object is owned, or `0x3u` terminal where both an interrupt-handler and a raised object have been set and the handler has been invoked on the object.

As choices, we forbid repeated calls to set the interrupt handler but permit repeated calls to raise an object. Calls after the first to set an interrupt handler terminate while calls to raise an object after the first are ignored. Existing tests demonstrate raising twice so we may like to be cautious about breaking that behavior.

Some semantics are changed. Raised objects and interrupt handlers are destroyed earlier than before: they are now destroyed immediately after invoking handlers on objects.

The lock order inversion is observed by thread sanitizer (tsan) in the new shared-promise test.

Differential Revision: D29250149

fbshipit-source-id: 63e257d03cebbf8dba95a514b7e89680cae569a7
parent 6a6e02c2
......@@ -310,25 +310,63 @@ DeferredWrapper CoreBase::stealDeferredExecutor() {
}
void CoreBase::raise(exception_wrapper e) {
std::lock_guard<SpinLock> lock(interruptLock_);
if (!interrupt_ && !hasResult()) {
interrupt_ = std::make_unique<exception_wrapper>(std::move(e));
auto interruptHandler = interruptHandler_.load(std::memory_order_relaxed);
if (interruptHandler) {
interruptHandler->handle(*interrupt_);
if (hasResult()) {
return;
}
auto interrupt = interrupt_.load(std::memory_order_acquire);
switch (interrupt & InterruptMask) {
case InterruptInitial: { // store the object
assert(!interrupt);
auto object = new exception_wrapper(std::move(e));
auto exchanged = folly::atomic_compare_exchange_strong_explicit(
&interrupt_,
&interrupt,
reinterpret_cast<uintptr_t>(object) | InterruptHasObject,
std::memory_order_release,
std::memory_order_acquire);
if (exchanged) {
return;
}
// lost the race!
e = std::move(*object);
delete object;
if (interrupt & InterruptHasObject) { // ignore all calls after the first
return;
}
assert(interrupt & InterruptHasHandler);
FOLLY_FALLTHROUGH;
}
case InterruptHasHandler: { // invoke the stored handler
auto exchanged = interrupt_.compare_exchange_strong(
interrupt, InterruptTerminal, std::memory_order_relaxed);
if (!exchanged) { // ignore all calls after the first
return;
}
auto handler =
reinterpret_cast<InterruptHandler*>(interrupt & ~InterruptHasHandler);
handler->handle(e);
handler->release();
return;
}
case InterruptHasObject: // ignore all calls after the first
return;
case InterruptTerminal: // ignore all calls after the first
return;
}
}
void CoreBase::initCopyInterruptHandlerFrom(const CoreBase& other) {
auto interruptHandler =
other.interruptHandler_.load(std::memory_order_acquire);
if (interruptHandler != nullptr) {
interruptHandler->acquire();
assert(!interrupt_.load(std::memory_order_relaxed));
auto interrupt = other.interrupt_.load(std::memory_order_acquire);
switch (interrupt & InterruptMask) {
case InterruptHasHandler: { // copy the handler
auto handler =
reinterpret_cast<InterruptHandler*>(interrupt & ~InterruptHasHandler);
handler->acquire();
interrupt_.store(interrupt, std::memory_order_release);
break;
}
}
auto oldInterruptHandler =
interruptHandler_.exchange(interruptHandler, std::memory_order_release);
DCHECK(oldInterruptHandler == nullptr);
}
class CoreBase::CoreAndCallbackReference {
......@@ -362,9 +400,20 @@ CoreBase::CoreBase(State state, unsigned char attached)
: state_(state), attached_(attached) {}
CoreBase::~CoreBase() {
auto interruptHandler = interruptHandler_.load(std::memory_order_relaxed);
if (interruptHandler != nullptr) {
interruptHandler->release();
auto interrupt = interrupt_.load(std::memory_order_acquire);
switch (interrupt & InterruptMask) {
case InterruptHasHandler: {
auto handler =
reinterpret_cast<InterruptHandler*>(interrupt & ~InterruptHasHandler);
handler->release();
break;
}
case InterruptHasObject: {
auto object =
reinterpret_cast<exception_wrapper*>(interrupt & ~InterruptHasObject);
delete object;
break;
}
}
}
......
......@@ -432,19 +432,48 @@ class CoreBase {
/// `setResult()`.
template <typename F>
void setInterruptHandler(F&& fn) {
std::lock_guard<SpinLock> lock(interruptLock_);
if (!hasResult()) {
if (interrupt_) {
fn(as_const(*interrupt_));
} else {
auto oldInterruptHandler = interruptHandler_.exchange(
new InterruptHandlerImpl<typename std::decay<F>::type>(
static_cast<F&&>(fn)),
std::memory_order_relaxed);
if (oldInterruptHandler) {
oldInterruptHandler->release();
using handler_type = InterruptHandlerImpl<std::decay_t<F>>;
if (hasResult()) {
return;
}
auto interrupt = interrupt_.load(std::memory_order_acquire);
switch (interrupt & InterruptMask) {
case InterruptInitial: { // store the handler
assert(!interrupt);
auto handler = new handler_type(static_cast<F&&>(fn));
auto exchanged = folly::atomic_compare_exchange_strong_explicit(
&interrupt_,
&interrupt,
reinterpret_cast<uintptr_t>(handler) | InterruptHasHandler,
std::memory_order_release,
std::memory_order_acquire);
if (exchanged) {
return;
}
// lost the race!
delete handler;
if (interrupt & InterruptHasHandler) {
terminate_with<std::logic_error>("set-interrupt-handler race");
}
assert(interrupt & InterruptHasObject);
FOLLY_FALLTHROUGH;
}
case InterruptHasObject: { // invoke over the stored object
auto exchanged = interrupt_.compare_exchange_strong(
interrupt, InterruptTerminal, std::memory_order_relaxed);
if (!exchanged) {
terminate_with<std::logic_error>("set-interrupt-handler race");
}
auto object = reinterpret_cast<exception_wrapper*>(
interrupt & ~InterruptHasObject);
fn(as_const(*object));
delete object;
return;
}
case InterruptHasHandler: // fail all calls after the first
terminate_with<std::logic_error>("set-interrupt-handler duplicate");
case InterruptTerminal: // fail all calls after the first
terminate_with<std::logic_error>("set-interrupt-handler after done");
}
}
......@@ -457,6 +486,23 @@ class CoreBase {
// `derefCallback` and `detachOne` in the destructor.
class CoreAndCallbackReference;
// interrupt_ is an atomic acyclic finite state machine with guarded state
// which takes the form of either a pointer to a copy of the object passed to
// raise or a pointer to a copy of the handler passed to setInterruptHandler
//
// the object and the handler values are both at least pointer-aligned so they
// leave the bottom 2 bits free on all supported platforms; these bits are
// stolen for the state machine
enum : uintptr_t {
InterruptMask = 0x3u,
};
enum InterruptState : uintptr_t {
InterruptInitial = 0x0u,
InterruptHasHandler = 0x1u,
InterruptHasObject = 0x2u,
InterruptTerminal = 0x3u,
};
void setCallback_(
Callback&& callback,
std::shared_ptr<folly::RequestContext>&& context,
......@@ -477,13 +523,11 @@ class CoreBase {
std::atomic<State> state_;
std::atomic<unsigned char> attached_;
std::atomic<unsigned char> callbackReferences_{0};
SpinLock interruptLock_;
KeepAliveOrDeferred executor_;
union {
Context context_;
};
std::unique_ptr<exception_wrapper> interrupt_{};
std::atomic<InterruptHandler*> interruptHandler_{};
std::atomic<uintptr_t> interrupt_{}; // see InterruptMask, InterruptState
CoreBase* proxy_;
};
......
......@@ -36,11 +36,9 @@ TEST(Core, size) {
std::atomic<futures::detail::State> state_;
std::atomic<unsigned char> attached_;
std::atomic<unsigned char> callbackReferences_;
futures::detail::SpinLock interruptLock_;
KeepAliveOrDeferredGold executor_;
std::shared_ptr<RequestContext> context_;
std::unique_ptr<exception_wrapper> interrupt_;
std::atomic<futures::detail::InterruptHandler*> interruptHandler_{};
std::atomic<uintptr_t> interrupt_;
CoreBaseGold* proxy_;
};
class CoreGold : Try<Unit>, public CoreBaseGold {};
......
......@@ -157,3 +157,12 @@ TEST(SharedPromise, ConstMethods) {
EXPECT_TRUE(fut.isReady());
EXPECT_EQ(42, std::move(fut).get());
}
TEST(SharedPromise, InterruptHandlerSetsException) {
folly::SharedPromise<int> p;
p.setInterruptHandler([&](auto&& ew) { p.setException(ew); });
auto f = p.getSemiFuture();
f.cancel();
ASSERT_TRUE(f.isReady());
EXPECT_THROW(std::move(f).get(), FutureCancellation);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment