Commit ab0aeeea authored by Lewis Baker's avatar Lewis Baker Committed by Facebook Github Bot

Make folly::coro::Mutex lock operations SemiAwaitable

Summary:
Making the `co_scoped_lock()` and `co_lock()` operations semi-awatitable forces the caller to provide an executor on which to resume the awaiting coroutine in the case that the lock could not be acquire synchronously.

This improves the safety of folly::coro::Mutex when used from within an InlineTask which does not implicitly wrap all co_await expressions to ensure transitions back to the executor and would otherwise implicitly resume the next lock-acquirer inline inside the call to .unlock().

Reviewed By: andriigrynenko

Differential Revision: D14708636

fbshipit-source-id: bb9cda1a4a6e53978071a7b98ebf9affeef47c7d
parent 20ddb17b
......@@ -59,7 +59,7 @@ void Mutex::unlock() noexcept {
assert(currentState != unlockedState());
assert(currentState != nullptr);
auto* waiter = static_cast<LockOperation*>(currentState);
auto* waiter = static_cast<LockAwaiter*>(currentState);
do {
auto* temp = waiter->next_;
waiter->next_ = waitersHead;
......@@ -75,7 +75,7 @@ void Mutex::unlock() noexcept {
waitersHead->awaitingCoroutine_.resume();
}
bool Mutex::lockAsyncImpl(LockOperation* awaiter) {
bool Mutex::lockAsyncImpl(LockAwaiter* awaiter) {
void* oldValue = state_.load(std::memory_order_relaxed);
while (true) {
if (oldValue == unlockedState()) {
......@@ -94,7 +94,7 @@ bool Mutex::lockAsyncImpl(LockOperation* awaiter) {
// It looks like the mutex is currently locked.
// Try to queue this waiter to the list of waiters.
void* newValue = awaiter;
awaiter->next_ = static_cast<LockOperation*>(oldValue);
awaiter->next_ = static_cast<LockAwaiter*>(oldValue);
if (state_.compare_exchange_weak(
oldValue,
newValue,
......
......@@ -15,6 +15,9 @@
*/
#pragma once
#include <folly/Executor.h>
#include <folly/experimental/coro/ViaIfAsync.h>
#include <atomic>
#include <experimental/coroutine>
#include <mutex>
......@@ -30,7 +33,7 @@ namespace coro {
/// on another thread.
///
/// This mutex guarantees a FIFO scheduling algorithm - coroutines acquire the
/// lock in the order that they execute the 'co_await mutex.lockAsync()'
/// lock in the order that they execute the 'co_await mutex.co_lock()'
/// operation.
///
/// Note that you cannot use std::scoped_lock/std::lock_guard to acquire the
......@@ -39,8 +42,8 @@ namespace coro {
///
/// You can still use the std::scoped_lock/std::lock_guard in conjunction with
/// std::adopt_lock to automatically unlock the mutex when the current scope
/// exits after having locked the mutex using either co_await m.lock_async()
/// or try_lock() .
/// exits after having locked the mutex using either 'co_await m.co_lock()'
/// or 'm.try_lock()'.
///
/// You can also attempt to acquire the lock using std::unique_lock in
/// conjunction with std::try_to_lock.
......@@ -82,7 +85,9 @@ namespace coro {
/// }
/// }
class Mutex {
class ScopedLockOperation;
class ScopedLockAwaiter;
class LockAwaiter;
template <typename Awaiter>
class LockOperation;
public:
......@@ -117,32 +122,30 @@ class Mutex {
///
/// You must co_await the return value to wait until the lock is acquired.
///
/// Chain a call to .via() to specify the executor to resume on when the lock
/// is eventually acquired in the case that the lock could not be acquired
/// synchronously. The awaiting coroutine will continue without suspending
/// if the lock could be acquired synchronously.
///
/// If you do not specify an executor by calling .via() then the inline
/// executor is used and the awaiting coroutine is resumed inline inside the
/// call to .unlock() by the previous lock holder.
[[nodiscard]] ScopedLockOperation co_scoped_lock() noexcept;
/// Chain a call to .viaIfAsync() to specify the executor to resume on when
/// the lock is eventually acquired in the case that the lock could not be
/// acquired synchronously. Note that the executor will be passed implicitly
/// if awaiting from a Task or AsyncGenerator coroutine. The awaiting
/// coroutine will continue without suspending if the lock could be acquired
/// synchronously.
[[nodiscard]] LockOperation<ScopedLockAwaiter> co_scoped_lock() noexcept;
/// Lock the mutex asynchronously.
///
/// You must co_await the return value to wait until the lock is acquired.
///
/// Chain a call to .via() to specify the executor to resume on when the lock
/// is eventually acquired in the case that the lock could not be acquired
/// synchronously. The awaiting coroutine will continue without suspending
/// if the lock could be acquired synchronously.
/// Chain a call to .viaIfAsync() to specify the executor to resume on when
/// the lock is eventually acquired in the case that the lock could not be
/// acquired synchronously. The awaiting coroutine will continue without
/// suspending if the lock could be acquired synchronously.
///
/// Once the 'co_await m.lockAsync()' operation completes, the awaiting
/// Once the 'co_await m.co_lock()' operation completes, the awaiting
/// coroutine is responsible for ensuring that .unlock() is called to release
/// the lock.
///
/// Consider using scopedLockAsync() instead to obtain a std::scoped_lock
/// Consider using co_scoped_lock() instead to obtain a std::scoped_lock
/// that handles releasing the lock at the end of the scope.
[[nodiscard]] LockOperation co_lock() noexcept;
[[nodiscard]] LockOperation<LockAwaiter> co_lock() noexcept;
/// Unlock the mutex.
///
......@@ -151,8 +154,10 @@ class Mutex {
void unlock() noexcept;
private:
class LockOperation {
class LockAwaiter {
public:
explicit LockAwaiter(Mutex& mutex) noexcept : mutex_(mutex) {}
bool await_ready() noexcept {
return mutex_.try_lock();
}
......@@ -166,28 +171,37 @@ class Mutex {
void await_resume() noexcept {}
protected:
friend class Mutex;
Mutex& mutex_;
explicit LockOperation(Mutex& mutex) noexcept : mutex_(mutex) {}
private:
friend Mutex;
Mutex& mutex_;
std::experimental::coroutine_handle<> awaitingCoroutine_;
LockOperation* next_;
LockAwaiter* next_;
};
class ScopedLockOperation : public LockOperation {
class ScopedLockAwaiter : public LockAwaiter {
public:
using LockAwaiter::LockAwaiter;
std::unique_lock<Mutex> await_resume() noexcept {
return std::unique_lock<Mutex>{mutex_, std::adopt_lock};
}
};
template <typename Awaiter>
class LockOperation {
public:
explicit LockOperation(Mutex& mutex) noexcept : mutex_(mutex) {}
auto viaIfAsync(folly::Executor::KeepAlive<> executor) const {
return folly::coro::co_viaIfAsync(std::move(executor), Awaiter{mutex_});
}
private:
friend class Mutex;
using LockOperation::LockOperation;
Mutex& mutex_;
};
friend class LockOperation;
// Special value for state_ that indicates the mutex is not locked.
void* unlockedState() noexcept {
return this;
......@@ -200,26 +214,27 @@ class Mutex {
// once it acquires the mutex. Returns false if the lock was acquired
// synchronously and the awaiting coroutine should continue without
// suspending.
bool lockAsyncImpl(LockOperation* awaiter);
bool lockAsyncImpl(LockAwaiter* awaiter);
// This contains either:
// - this => Not locked
// - nullptr => Locked, no newly queued waiters (ie. empty list of waiters)
// - other => Pointer to first LockAwaiterBase* in a linked-list of newly
// - other => Pointer to first LockAwaiter* in a linked-list of newly
// queued awaiters in LIFO order.
std::atomic<void*> state_;
// Linked-list of waiters in FIFO order.
// Only the current lock holder is allowed to access this member.
LockOperation* waiters_;
LockAwaiter* waiters_;
};
inline Mutex::ScopedLockOperation Mutex::co_scoped_lock() noexcept {
return ScopedLockOperation{*this};
inline Mutex::LockOperation<Mutex::ScopedLockAwaiter>
Mutex::co_scoped_lock() noexcept {
return LockOperation<ScopedLockAwaiter>{*this};
}
inline Mutex::LockOperation Mutex::co_lock() noexcept {
return LockOperation{*this};
inline Mutex::LockOperation<Mutex::LockAwaiter> Mutex::co_lock() noexcept {
return LockOperation<LockAwaiter>{*this};
}
} // namespace coro
......
......@@ -156,37 +156,4 @@ TEST(Mutex, ThreadSafety) {
CHECK_EQ(30'000, value);
}
TEST(Mutex, InlineTaskDeadlock) {
coro::Mutex coroMutex;
std::timed_mutex stdMutex;
std::thread thread1([&] {
coro::blockingWait(
[](auto& coroMutex, auto& stdMutex) -> coro::detail::InlineTask<void> {
co_await coroMutex.co_lock();
std::this_thread::sleep_for(std::chrono::milliseconds{200});
stdMutex.lock();
// At this point the other coroutine is suspended waiting on
// coroMutex.co_lock(). coroMutex.unlock() will unlock the mutex and
// run the other coroutine *inline*. That coroutine will
// try to acquire stdMutex resulting in a deadlock.
coroMutex.unlock();
stdMutex.unlock();
}(coroMutex, stdMutex));
});
std::thread thread2([&] {
coro::blockingWait(
[](auto& coroMutex, auto& stdMutex) -> coro::detail::InlineTask<void> {
std::this_thread::sleep_for(std::chrono::milliseconds{100});
co_await coroMutex.co_lock();
EXPECT_FALSE(stdMutex.try_lock_for(std::chrono::milliseconds{500}));
coroMutex.unlock();
}(coroMutex, stdMutex));
});
thread1.join();
thread2.join();
}
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment