Commit fbe60fe1 authored by Lewis Baker's avatar Lewis Baker Committed by Facebook Github Bot

Add simple SharedMutex to folly::coro

Summary:
Adds a simple SharedMutex type to the folly::coro namespace.

The `co_[scoped_]lock[_shared]()` methods return semi-awaitables that require the caller to provide an executor to resume on in the case that the lock could not be acquired synchronously. This avoids some potential issues that could occur if the `.unlock()` operation were to resume awaiting coroutines inline.

If you are awaiting within a `folly::coro::Task` then the current executor is implicitly provided. Otherwise, the caller can explicitly provide an executor by calling `.viaIfAsync()`.

The implementation has not been optimised and currently just relies on a `SpinLock` to synchronise access to internal state.

The main aim for this change is to make available a SharedMutex abstraction with the desired API that applications can start writing against which we can later optimise as required.

Reviewed By: andriigrynenko

Differential Revision: D9995286

fbshipit-source-id: aa141ad241d29daff2df5f7296161517c99ab8ef
parent 61a8ac33
/*
* Copyright 2018-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <memory>
#include <mutex>
#include <utility>
#include <glog/logging.h>
#include <folly/Portability.h>
namespace folly {
namespace coro {
/// This type mirrors the interface of std::shared_lock as much as possible.
///
/// The main difference between this type and std::shared_lock is that this
/// type is designed to be used with asynchronous shared-mutex types where
/// the lock acquisition is an asynchronous operation.
///
/// TODO: Actually implement the .co_lock() method on this class.
///
/// Workaround for now is to use:
/// SharedLock<SharedMutex> lock{mutex, std::defer_lock};
/// ...
/// lock = co_await lock.mutex()->co_scoped_lock_shared();
template <typename Mutex>
class FOLLY_NODISCARD SharedLock {
public:
SharedLock() noexcept : mutex_(nullptr), locked_(false) {}
explicit SharedLock(Mutex& mutex, std::defer_lock_t) noexcept
: mutex_(std::addressof(mutex)), locked_(false) {}
explicit SharedLock(Mutex& mutex, std::adopt_lock_t) noexcept
: mutex_(std::addressof(mutex)), locked_(true) {}
explicit SharedLock(Mutex& mutex, std::try_to_lock_t) noexcept(
noexcept(mutex.try_lock_shared()))
: mutex_(std::addressof(mutex)), locked_(mutex.try_lock_shared()) {}
SharedLock(SharedLock&& other) noexcept
: mutex_(std::exchange(other.mutex_, nullptr)),
locked_(std::exchange(other.locked_, false)) {}
SharedLock(const SharedLock&) = delete;
SharedLock& operator=(const SharedLock&) = delete;
~SharedLock() {
if (locked_) {
mutex_->unlock_shared();
}
}
SharedLock& operator=(SharedLock&& other) noexcept {
SharedLock temp(std::move(other));
swap(temp);
return *this;
}
Mutex* mutex() const noexcept {
return mutex_;
}
Mutex* release() noexcept {
locked_ = false;
return std::exchange(mutex_, nullptr);
}
bool owns_lock() const noexcept {
return locked_;
}
explicit operator bool() const noexcept {
return owns_lock();
}
bool try_lock() noexcept(noexcept(mutex_->try_lock_shared())) {
DCHECK(!locked_);
DCHECK(mutex_ != nullptr);
locked_ = mutex_->try_lock_shared();
return locked_;
}
void unlock() noexcept(noexcept(mutex_->unlock_shared())) {
DCHECK(locked_);
locked_ = false;
mutex_->unlock_shared();
}
void swap(SharedLock& other) noexcept {
std::swap(mutex_, other.mutex_);
std::swap(locked_, other.locked_);
}
private:
Mutex* mutex_;
bool locked_;
};
} // namespace coro
} // namespace folly
/*
* Copyright 2018-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/Portability.h>
#if FOLLY_HAS_COROUTINES
#include <folly/experimental/coro/SharedMutex.h>
using namespace folly::coro;
SharedMutexFair::~SharedMutexFair() {
assert(state_->lockedFlagAndReaderCount_ == kUnlocked);
assert(state_->waitersHead_ == nullptr);
}
bool SharedMutexFair::try_lock() noexcept {
auto lock = state_.contextualLock();
if (lock->lockedFlagAndReaderCount_ == kUnlocked) {
lock->lockedFlagAndReaderCount_ = kExclusiveLockFlag;
return true;
}
return false;
}
bool SharedMutexFair::try_lock_shared() noexcept {
auto lock = state_.contextualLock();
if (lock->lockedFlagAndReaderCount_ == kUnlocked ||
(lock->lockedFlagAndReaderCount_ >= kSharedLockCountIncrement &&
lock->waitersHead_ == nullptr)) {
lock->lockedFlagAndReaderCount_ += kSharedLockCountIncrement;
return true;
}
return false;
}
void SharedMutexFair::unlock() noexcept {
LockAwaiterBase* awaitersToResume = nullptr;
{
auto lockedState = state_.contextualLock();
assert(lockedState->lockedFlagAndReaderCount_ == kExclusiveLockFlag);
awaitersToResume = unlockOrGetNextWaitersToResume(*lockedState);
}
resumeWaiters(awaitersToResume);
}
void SharedMutexFair::unlock_shared() noexcept {
LockAwaiterBase* awaitersToResume = nullptr;
{
auto lockedState = state_.contextualLock();
assert(lockedState->lockedFlagAndReaderCount_ >= kSharedLockCountIncrement);
lockedState->lockedFlagAndReaderCount_ -= kSharedLockCountIncrement;
if (lockedState->lockedFlagAndReaderCount_ != kUnlocked) {
return;
}
awaitersToResume = unlockOrGetNextWaitersToResume(*lockedState);
}
resumeWaiters(awaitersToResume);
}
SharedMutexFair::LockAwaiterBase*
SharedMutexFair::unlockOrGetNextWaitersToResume(
SharedMutexFair::State& state) noexcept {
auto* head = state.waitersHead_;
if (head != nullptr) {
if (head->lockType_ == LockType::EXCLUSIVE) {
state.waitersHead_ = std::exchange(head->nextAwaiter_, nullptr);
state.lockedFlagAndReaderCount_ = kExclusiveLockFlag;
} else {
std::size_t newState = kSharedLockCountIncrement;
// Scan for a run of SHARED lock types.
auto* last = head;
auto* next = last->nextAwaiter_;
while (next != nullptr && next->lockType_ == LockType::SHARED) {
last = next;
next = next->nextAwaiter_;
newState += kSharedLockCountIncrement;
}
last->nextAwaiter_ = nullptr;
state.lockedFlagAndReaderCount_ = newState;
state.waitersHead_ = next;
}
if (state.waitersHead_ == nullptr) {
state.waitersTailNext_ = &state.waitersHead_;
}
} else {
state.lockedFlagAndReaderCount_ = kUnlocked;
}
return head;
}
void SharedMutexFair::resumeWaiters(LockAwaiterBase* awaiters) noexcept {
while (awaiters != nullptr) {
std::exchange(awaiters, awaiters->nextAwaiter_)->resume();
}
}
#endif
This diff is collapsed.
/*
* Copyright 2017-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/Portability.h>
#if FOLLY_HAS_COROUTINES
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/executors/InlineExecutor.h>
#include <folly/executors/ManualExecutor.h>
#include <folly/experimental/coro/Baton.h>
#include <folly/experimental/coro/BlockingWait.h>
#include <folly/experimental/coro/SharedMutex.h>
#include <folly/experimental/coro/Task.h>
#include <folly/portability/GTest.h>
#include <mutex>
using namespace folly;
TEST(SharedMutex, TryLock) {
coro::SharedMutex m;
CHECK(m.try_lock());
CHECK(!m.try_lock());
CHECK(!m.try_lock_shared());
m.unlock();
CHECK(m.try_lock_shared());
CHECK(!m.try_lock());
CHECK(m.try_lock_shared());
CHECK(!m.try_lock());
m.unlock_shared();
CHECK(!m.try_lock());
CHECK(m.try_lock_shared());
m.unlock_shared();
m.unlock_shared();
CHECK(m.try_lock());
m.unlock();
}
TEST(SharedMutex, ManualLockAsync) {
coro::SharedMutex mutex;
int value = 0;
auto makeReaderTask = [&](coro::Baton& b) -> coro::Task<int> {
co_await mutex.co_lock_shared();
int valueCopy = value;
co_await b;
mutex.unlock_shared();
co_return valueCopy;
};
auto makeWriterTask = [&](coro::Baton& b) -> coro::Task<void> {
co_await mutex.co_lock();
co_await b;
value += 1;
mutex.unlock();
};
auto& executor = InlineExecutor::instance();
{
coro::Baton b1;
coro::Baton b2;
coro::Baton b3;
coro::Baton b4;
coro::Baton b5;
auto r1 = makeReaderTask(b1).scheduleOn(&executor).start();
auto r2 = makeReaderTask(b2).scheduleOn(&executor).start();
auto w1 = makeWriterTask(b3).scheduleOn(&executor).start();
auto w2 = makeWriterTask(b4).scheduleOn(&executor).start();
auto r3 = makeReaderTask(b5).scheduleOn(&executor).start();
b1.post();
CHECK_EQ(0, std::move(r1).get());
b2.post();
CHECK_EQ(0, std::move(r2).get());
b3.post();
CHECK_EQ(1, value);
b4.post();
CHECK_EQ(2, value);
// This reader should have had to wait for the prior two write locks
// to complete before it acquired the read-lock.
b5.post();
CHECK_EQ(2, std::move(r3).get());
}
}
TEST(SharedMutex, ScopedLockAsync) {
coro::SharedMutex mutex;
int value = 0;
auto makeReaderTask = [&](coro::Baton& b) -> coro::Task<int> {
auto lock = co_await mutex.co_scoped_lock_shared();
co_await b;
co_return value;
};
auto makeWriterTask = [&](coro::Baton& b) -> coro::Task<void> {
auto lock = co_await mutex.co_scoped_lock();
co_await b;
value += 1;
};
auto& executor = InlineExecutor::instance();
{
coro::Baton b1;
coro::Baton b2;
coro::Baton b3;
coro::Baton b4;
coro::Baton b5;
auto r1 = makeReaderTask(b1).scheduleOn(&executor).start();
auto r2 = makeReaderTask(b2).scheduleOn(&executor).start();
auto w1 = makeWriterTask(b3).scheduleOn(&executor).start();
auto w2 = makeWriterTask(b4).scheduleOn(&executor).start();
auto r3 = makeReaderTask(b5).scheduleOn(&executor).start();
b1.post();
CHECK_EQ(0, std::move(r1).get());
b2.post();
CHECK_EQ(0, std::move(r2).get());
b3.post();
CHECK_EQ(1, value);
b4.post();
CHECK_EQ(2, value);
// This reader should have had to wait for the prior two write locks
// to complete before it acquired the read-lock.
b5.post();
CHECK_EQ(2, std::move(r3).get());
}
}
TEST(SharedMutex, ThreadSafety) {
// Spin up a thread-pool with 3 threads and 6 coroutines
// (2 writers, 4 readers) that are constantly spinning in a loop reading
// and modifying some shared state.
CPUThreadPoolExecutor threadPool{
3, std::make_shared<NamedThreadFactory>("TestThreadPool")};
static constexpr int iterationCount = 100'000;
coro::SharedMutex mutex;
int value1 = 0;
int value2 = 0;
auto makeWriterTask = [&]() -> coro::Task<void> {
for (int i = 0; i < iterationCount; ++i) {
auto lock = co_await mutex.co_scoped_lock();
++value1;
++value2;
}
};
auto makeReaderTask = [&]() -> coro::Task<void> {
for (int i = 0; i < iterationCount; ++i) {
auto lock = co_await mutex.co_scoped_lock_shared();
CHECK_EQ(value1, value2);
}
};
auto w1 = makeWriterTask().scheduleOn(&threadPool).start();
auto w2 = makeWriterTask().scheduleOn(&threadPool).start();
auto r1 = makeReaderTask().scheduleOn(&threadPool).start();
auto r2 = makeReaderTask().scheduleOn(&threadPool).start();
auto r3 = makeReaderTask().scheduleOn(&threadPool).start();
auto r4 = makeReaderTask().scheduleOn(&threadPool).start();
std::move(w1).get();
std::move(w2).get();
std::move(r1).get();
std::move(r2).get();
std::move(r3).get();
std::move(r4).get();
CHECK_EQ(value1, 2 * iterationCount);
CHECK_EQ(value2, 2 * iterationCount);
}
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment