Commit 48a1dd79 authored by Lewis Baker's avatar Lewis Baker Committed by Facebook Github Bot

Add folly::coro::Mutex

Summary:
Adds an async Mutex that allows a coroutine to `co_await` acquiring the lock on a mutex.

This mutex is not cancellable and does not support timeouts, but is very efficient
and does not require any per-operation memory allocations (memory for tracking the list of waiting coroutines is borrowed from the awaiting coroutine frame).

The Mutex class provides fair locking by implementing a FIFO queue of waiting coroutines.
ie. coroutines will acquire the lock in the order that they await the lock operation.
This could be relaxed in the future if needed to provide bounded wait-times but not necessarily FIFO order.

Example usage:
```c++
template<typename T>
class ConcurrentStack
{
  folly::coro::Mutex mutex_;
  std::vector<T> values_;
public:
  coro::Task<void> pushAsync(T value)
  {
    auto lock = co_await mutex_.scopedLockAsync();
    values_.push_back(value);
  }

  coro::Task<Optional<T>> tryPopAsync()
  {
    auto lock = co_await mutex_.scopedLockAsync();
    if (values_.empty()) co_return Optional<T>{};
    T result = values_.back();
    values_.pop_back();
    co_return result;
  }
};
```

Reviewed By: andriigrynenko

Differential Revision: D9209637

fbshipit-source-id: 4b81c8f33b9fc39781f4237d1821b0e0b2e4f3c5
parent db0bf37e
/*
* Copyright 2018-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/Portability.h>
#if FOLLY_HAS_COROUTINES
#include <folly/experimental/coro/Mutex.h>
#include <cassert>
using namespace folly::coro;
Mutex::~Mutex() {
// Check there are no waiters waiting to acquire the lock.
assert(
state_.load(std::memory_order_relaxed) == unlockedState() ||
state_.load(std::memory_order_relaxed) == nullptr);
assert(waiters_ == nullptr);
}
void Mutex::unlock() noexcept {
assert(state_.load(std::memory_order_relaxed) != unlockedState());
auto* waitersHead = waiters_;
if (waitersHead == nullptr) {
void* currentState = state_.load(std::memory_order_relaxed);
if (currentState == nullptr) {
// Looks like there are no waiters waiting to acquire the lock.
// Try to unlock it - use a compare-exchange to decide the race between
// unlocking the mutex and another thread enqueueing another waiter.
const bool releasedLock = state_.compare_exchange_strong(
currentState,
unlockedState(),
std::memory_order_release,
std::memory_order_relaxed);
if (releasedLock) {
return;
}
}
// There are some awaiters that have been newly queued.
// Dequeue them and reverse their order from LIFO to FIFO.
currentState = state_.exchange(nullptr, std::memory_order_acquire);
assert(currentState != unlockedState());
assert(currentState != nullptr);
auto* waiter = static_cast<LockOperation*>(currentState);
do {
auto* temp = waiter->next_;
waiter->next_ = waitersHead;
waitersHead = waiter;
waiter = temp;
} while (waiter != nullptr);
}
assert(waitersHead != nullptr);
waiters_ = waitersHead->next_;
waitersHead->awaitingCoroutine_.resume();
}
bool Mutex::lockAsyncImpl(LockOperation* awaiter) {
void* oldValue = state_.load(std::memory_order_relaxed);
while (true) {
if (oldValue == unlockedState()) {
// It looks like the mutex is currently unlocked.
// Try to acquire it synchronously.
void* newValue = nullptr;
if (state_.compare_exchange_weak(
oldValue,
newValue,
std::memory_order_acquire,
std::memory_order_relaxed)) {
// Acquired synchronously, don't suspend.
return false;
}
} else {
// It looks like the mutex is currently locked.
// Try to queue this waiter to the list of waiters.
void* newValue = awaiter;
awaiter->next_ = static_cast<LockOperation*>(oldValue);
if (state_.compare_exchange_weak(
oldValue,
newValue,
std::memory_order_release,
std::memory_order_relaxed)) {
// Queued waiter successfully. Awaiting coroutine should suspend.
return true;
}
}
}
}
#endif
/*
* Copyright 2018-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <atomic>
#include <experimental/coroutine>
#include <mutex>
namespace folly {
namespace coro {
/// A mutex that can be locked asynchronously using 'co_await'.
///
/// Ownership of the mutex is not tied to any particular thread.
/// This allows the coroutine owning the lock to transition from one thread
/// to another while holding the lock and then perform the unlock() operation
/// on another thread.
///
/// This mutex guarantees a FIFO scheduling algorithm - coroutines acquire the
/// lock in the order that they execute the 'co_await mutex.lockAsync()'
/// operation.
///
/// Note that you cannot use std::scoped_lock/std::lock_guard to acquire the
/// lock as the lock must be acquired with use of 'co_await' which cannot be
/// used in a constructor.
///
/// You can still use the std::scoped_lock/std::lock_guard in conjunction with
/// std::adopt_lock to automatically unlock the mutex when the current scope
/// exits after having locked the mutex using either co_await m.lock_async()
/// or try_lock() .
///
/// You can also attempt to acquire the lock using std::unique_lock in
/// conjunction with std::try_to_lock.
///
/// For example:
/// folly::coro::Mutex m;
/// folly::Executor& executor;
///
/// folly::coro::Task<> asyncScopedLockExample()
/// {
/// std::unique_lock<folly::coro::Mutex> lock{co_await m.co_scoped_lock()};
/// ...
/// }
///
/// folly::coro::Task<> asyncManualLockAndUnlock()
/// {
/// co_await m.co_lock(executor);
/// ...
/// m.unlock();
/// }
///
/// void nonAsyncTryLock()
/// {
/// if (m.try_lock())
/// {
/// // Once the lock is acquired you can pass ownership of the lock to
/// // a std::lock_guard object.
/// std::lock_guard<folly::coro::Mutex> lock{m, std::adopt_lock};
/// ...
/// }
/// }
///
/// void nonAsyncScopedTryLock()
/// {
/// std::unique_lock<folly::coro::Mutex> lock{m, std::try_to_lock};
/// if (lock)
/// {
/// ...
/// }
/// }
class Mutex {
class ScopedLockOperation;
class LockOperation;
public:
/// Construct a new async mutex that is initially unlocked.
Mutex() noexcept : state_(unlockedState()), waiters_(nullptr) {}
Mutex(const Mutex&) = delete;
Mutex(Mutex&&) = delete;
Mutex& operator=(const Mutex&) = delete;
Mutex& operator=(Mutex&&) = delete;
~Mutex();
/// Try to lock the mutex synchronously.
///
/// Returns true if the lock was able to be acquired synchronously, false
/// if the lock could not be acquired because it was already locked.
///
/// If this method returns true then the caller is responsible for ensuring
/// that unlock() is called to release the lock.
bool try_lock() noexcept {
void* oldValue = unlockedState();
return state_.compare_exchange_strong(
oldValue,
nullptr,
std::memory_order_acquire,
std::memory_order_relaxed);
}
/// Lock the mutex asynchronously, returning an RAII object that will release
/// the lock at the end of the scope.
///
/// You must co_await the return value to wait until the lock is acquired.
///
/// Chain a call to .via() to specify the executor to resume on when the lock
/// is eventually acquired in the case that the lock could not be acquired
/// synchronously. The awaiting coroutine will continue without suspending
/// if the lock could be acquired synchronously.
///
/// If you do not specify an executor by calling .via() then the inline
/// executor is used and the awaiting coroutine is resumed inline inside the
/// call to .unlock() by the previous lock holder.
[[nodiscard]] ScopedLockOperation co_scoped_lock() noexcept;
/// Lock the mutex asynchronously.
///
/// You must co_await the return value to wait until the lock is acquired.
///
/// Chain a call to .via() to specify the executor to resume on when the lock
/// is eventually acquired in the case that the lock could not be acquired
/// synchronously. The awaiting coroutine will continue without suspending
/// if the lock could be acquired synchronously.
///
/// Once the 'co_await m.lockAsync()' operation completes, the awaiting
/// coroutine is responsible for ensuring that .unlock() is called to release
/// the lock.
///
/// Consider using scopedLockAsync() instead to obtain a std::scoped_lock
/// that handles releasing the lock at the end of the scope.
[[nodiscard]] LockOperation co_lock() noexcept;
/// Unlock the mutex.
///
/// If there are other coroutines waiting to lock the mutex then this will
/// schedule the resumption of the next coroutine in the queue.
void unlock() noexcept;
private:
class LockOperation {
public:
bool await_ready() noexcept {
return mutex_.try_lock();
}
bool await_suspend(
std::experimental::coroutine_handle<> awaitingCoroutine) noexcept {
awaitingCoroutine_ = awaitingCoroutine;
return mutex_.lockAsyncImpl(this);
}
void await_resume() noexcept {}
protected:
friend class Mutex;
explicit LockOperation(Mutex& mutex) noexcept : mutex_(mutex) {}
Mutex& mutex_;
std::experimental::coroutine_handle<> awaitingCoroutine_;
LockOperation* next_;
};
class ScopedLockOperation : public LockOperation {
public:
std::unique_lock<Mutex> await_resume() noexcept {
return std::unique_lock<Mutex>{mutex_, std::adopt_lock};
}
private:
friend class Mutex;
using LockOperation::LockOperation;
};
friend class LockOperation;
// Special value for state_ that indicates the mutex is not locked.
void* unlockedState() noexcept {
return this;
}
// Try to lock the mutex.
//
// Returns true if the lock could not be acquired synchronously and awaiting
// coroutine should suspend. In this case the coroutine will be resumed later
// once it acquires the mutex. Returns false if the lock was acquired
// synchronously and the awaiting coroutine should continue without
// suspending.
bool lockAsyncImpl(LockOperation* awaiter);
// This contains either:
// - this => Not locked
// - nullptr => Locked, no newly queued waiters (ie. empty list of waiters)
// - other => Pointer to first LockAwaiterBase* in a linked-list of newly
// queued awaiters in LIFO order.
std::atomic<void*> state_;
// Linked-list of waiters in FIFO order.
// Only the current lock holder is allowed to access this member.
LockOperation* waiters_;
};
inline Mutex::ScopedLockOperation Mutex::co_scoped_lock() noexcept {
return ScopedLockOperation{*this};
}
inline Mutex::LockOperation Mutex::co_lock() noexcept {
return LockOperation{*this};
}
} // namespace coro
} // namespace folly
/*
* Copyright 2017-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/Portability.h>
#if FOLLY_HAS_COROUTINES
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/executors/InlineExecutor.h>
#include <folly/executors/ManualExecutor.h>
#include <folly/experimental/coro/Baton.h>
#include <folly/experimental/coro/Future.h>
#include <folly/experimental/coro/Mutex.h>
#include <folly/experimental/coro/Promise.h>
#include <folly/experimental/coro/Task.h>
#include <folly/portability/GTest.h>
#include <mutex>
using namespace folly;
TEST(Mutex, TryLock) {
coro::Mutex m;
CHECK(m.try_lock());
CHECK(!m.try_lock());
m.unlock();
CHECK(m.try_lock());
}
TEST(Mutex, ScopedLock) {
coro::Mutex m;
{
std::unique_lock<coro::Mutex> lock{m, std::try_to_lock};
CHECK(lock.owns_lock());
{
std::unique_lock<coro::Mutex> lock2{m, std::try_to_lock};
CHECK(!lock2.owns_lock());
}
}
CHECK(m.try_lock());
m.unlock();
}
TEST(Mutex, LockAsync) {
coro::Mutex m;
coro::Baton b1;
coro::Baton b2;
int value = 0;
auto makeTask = [&](coro::Baton& b) -> coro::Task<void> {
co_await m.co_lock();
++value;
co_await b.waitAsync();
++value;
m.unlock();
};
auto& inlineExecutor = InlineExecutor::instance();
auto f1 = makeTask(b1).scheduleVia(&inlineExecutor);
CHECK_EQ(1, value);
CHECK(!m.try_lock());
auto f2 = makeTask(b2).scheduleVia(&inlineExecutor);
CHECK_EQ(1, value);
// This will resume f1 coroutine and let it release the
// lock. This will in turn resume f2 which was suspended
// at co_await m.lockAsync() which will then increment the value
// before becoming blocked on
b1.post();
CHECK_EQ(3, value);
CHECK(!m.try_lock());
b2.post();
CHECK_EQ(4, value);
CHECK(m.try_lock());
}
TEST(Mutex, ScopedLockAsync) {
coro::Mutex m;
coro::Baton b1;
coro::Baton b2;
int value = 0;
auto makeTask = [&](coro::Baton& b) -> coro::Task<void> {
auto lock = co_await m.co_scoped_lock();
++value;
co_await b.waitAsync();
++value;
};
auto& inlineExecutor = InlineExecutor::instance();
auto f1 = makeTask(b1).scheduleVia(&inlineExecutor);
CHECK_EQ(1, value);
CHECK(!m.try_lock());
auto f2 = makeTask(b2).scheduleVia(&inlineExecutor);
CHECK_EQ(1, value);
// This will resume f1 coroutine and let it release the
// lock. This will in turn resume f2 which was suspended
// at co_await m.lockAsync() which will then increment the value
// before becoming blocked on
b1.post();
CHECK_EQ(3, value);
CHECK(!m.try_lock());
b2.post();
CHECK_EQ(4, value);
CHECK(m.try_lock());
}
TEST(Mutex, ThreadSafety) {
CPUThreadPoolExecutor threadPool{
2, std::make_shared<NamedThreadFactory>("CPUThreadPool")};
int value = 0;
coro::Mutex mutex;
auto makeTask = [&]() -> coro::Task<void> {
for (int i = 0; i < 10'000; ++i) {
auto lock = co_await mutex.co_scoped_lock();
++value;
}
};
auto f1 = makeTask().scheduleVia(&threadPool);
auto f2 = makeTask().scheduleVia(&threadPool);
auto f3 = makeTask().scheduleVia(&threadPool);
f1.wait();
f2.wait();
f3.wait();
CHECK_EQ(30'000, value);
}
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment