Commit f8a3d164 authored by Lewis Baker's avatar Lewis Baker Committed by Facebook GitHub Bot

Change folly::fibers::Semaphore to use an intrusive list of waiters

Summary:
Now use an intrusive list structure to store the list of waiters waiting
for the semaphore to be signalled.

This is primarily motivated by the need to support efficient cancellation,
which requires the ability to efficiently remove a specific element from
the waiter list without having to scan the list like we would for the
existing `std::deque`-based list storage.

Another beneift of this change is that it now eliminates memory allocations
that were previously required by the std::deque container.

The `Semaphore::future_wait()` implementation should now also have one fewer
allocations as the allocation of the Baton and the Baton::Waiter are now fused.

Reviewed By: yfeldblum

Differential Revision: D21692257

fbshipit-source-id: 9beff176b250ae3f37df9e2138f93b15d2607353
parent ac31a538
...@@ -73,6 +73,19 @@ using IntrusiveList = boost::intrusive::list< ...@@ -73,6 +73,19 @@ using IntrusiveList = boost::intrusive::list<
using SafeIntrusiveListHook = boost::intrusive::list_member_hook< using SafeIntrusiveListHook = boost::intrusive::list_member_hook<
boost::intrusive::link_mode<boost::intrusive::safe_link>>; boost::intrusive::link_mode<boost::intrusive::safe_link>>;
/**
* A safe intrusive list.
*
* This is like IntrusiveList but always uses the safe-link hook which ensures
* that the hook is initialised to an unlinked state on construction and reset
* an unlinked state upon removing it from a list.
*/
template <typename T, SafeIntrusiveListHook T::*PtrToMember>
using SafeIntrusiveList = boost::intrusive::list<
T,
boost::intrusive::member_hook<T, SafeIntrusiveListHook, PtrToMember>,
boost::intrusive::constant_time_size<false>>;
/** /**
* An intrusive list with const-time size() method. * An intrusive list with const-time size() method.
* *
......
...@@ -20,7 +20,7 @@ namespace folly { ...@@ -20,7 +20,7 @@ namespace folly {
namespace fibers { namespace fibers {
bool Semaphore::signalSlow() { bool Semaphore::signalSlow() {
Baton* waiter = nullptr; Waiter* waiter = nullptr;
{ {
// If we signalled a release, notify the waitlist // If we signalled a release, notify the waitlist
auto waitListLock = waitList_.wlock(); auto waitListLock = waitList_.wlock();
...@@ -38,13 +38,13 @@ bool Semaphore::signalSlow() { ...@@ -38,13 +38,13 @@ bool Semaphore::signalSlow() {
testVal, testVal + 1, std::memory_order_relaxed)); testVal, testVal + 1, std::memory_order_relaxed));
return true; return true;
} }
waiter = waitList.front(); waiter = &waitList.front();
waitList.pop(); waitList.pop_front();
} }
// Trigger waiter if there is one // Trigger waiter if there is one
// Do it after releasing the waitList mutex, in case the waiter // Do it after releasing the waitList mutex, in case the waiter
// eagerly calls signal // eagerly calls signal
waiter->post(); waiter->baton.post();
return true; return true;
} }
...@@ -64,7 +64,7 @@ void Semaphore::signal() { ...@@ -64,7 +64,7 @@ void Semaphore::signal() {
std::memory_order_acquire)); std::memory_order_acquire));
} }
bool Semaphore::waitSlow(folly::fibers::Baton& waitBaton) { bool Semaphore::waitSlow(Waiter& waiter) {
// Slow path, create a baton and acquire a mutex to update the wait list // Slow path, create a baton and acquire a mutex to update the wait list
{ {
auto waitListLock = waitList_.wlock(); auto waitListLock = waitList_.wlock();
...@@ -75,9 +75,10 @@ bool Semaphore::waitSlow(folly::fibers::Baton& waitBaton) { ...@@ -75,9 +75,10 @@ bool Semaphore::waitSlow(folly::fibers::Baton& waitBaton) {
return false; return false;
} }
// prepare baton and add to queue // prepare baton and add to queue
waitList.push(&waitBaton); waitList.push_back(waiter);
assert(!waitList.empty());
} }
// Signal to caller that we managed to push a baton // Signal to caller that we managed to push a waiter
return true; return true;
} }
...@@ -85,11 +86,11 @@ void Semaphore::wait() { ...@@ -85,11 +86,11 @@ void Semaphore::wait() {
auto oldVal = tokens_.load(std::memory_order_acquire); auto oldVal = tokens_.load(std::memory_order_acquire);
do { do {
while (oldVal == 0) { while (oldVal == 0) {
folly::fibers::Baton waitBaton; Waiter waiter;
// If waitSlow fails it is because the token is non-zero by the time // If waitSlow fails it is because the token is non-zero by the time
// the lock is taken, so we can just continue round the loop // the lock is taken, so we can just continue round the loop
if (waitSlow(waitBaton)) { if (waitSlow(waiter)) {
waitBaton.wait(); waiter.baton.wait();
return; return;
} }
oldVal = tokens_.load(std::memory_order_acquire); oldVal = tokens_.load(std::memory_order_acquire);
...@@ -101,11 +102,11 @@ void Semaphore::wait() { ...@@ -101,11 +102,11 @@ void Semaphore::wait() {
std::memory_order_acquire)); std::memory_order_acquire));
} }
bool Semaphore::try_wait(Baton& waitBaton) { bool Semaphore::try_wait(Waiter& waiter) {
auto oldVal = tokens_.load(std::memory_order_acquire); auto oldVal = tokens_.load(std::memory_order_acquire);
do { do {
while (oldVal == 0) { while (oldVal == 0) {
if (waitSlow(waitBaton)) { if (waitSlow(waiter)) {
return false; return false;
} }
oldVal = tokens_.load(std::memory_order_acquire); oldVal = tokens_.load(std::memory_order_acquire);
...@@ -124,11 +125,11 @@ coro::Task<void> Semaphore::co_wait() { ...@@ -124,11 +125,11 @@ coro::Task<void> Semaphore::co_wait() {
auto oldVal = tokens_.load(std::memory_order_acquire); auto oldVal = tokens_.load(std::memory_order_acquire);
do { do {
while (oldVal == 0) { while (oldVal == 0) {
folly::fibers::Baton waitBaton; Waiter waiter;
// If waitSlow fails it is because the token is non-zero by the time // If waitSlow fails it is because the token is non-zero by the time
// the lock is taken, so we can just continue round the loop // the lock is taken, so we can just continue round the loop
if (waitSlow(waitBaton)) { if (waitSlow(waiter)) {
co_await waitBaton; co_await waiter.baton;
co_return; co_return;
} }
oldVal = tokens_.load(std::memory_order_acquire); oldVal = tokens_.load(std::memory_order_acquire);
...@@ -144,15 +145,36 @@ coro::Task<void> Semaphore::co_wait() { ...@@ -144,15 +145,36 @@ coro::Task<void> Semaphore::co_wait() {
#if FOLLY_FUTURE_USING_FIBER #if FOLLY_FUTURE_USING_FIBER
namespace {
class FutureWaiter final : public fibers::Baton::Waiter {
public:
FutureWaiter() {
semaphoreWaiter.baton.setWaiter(*this);
}
void post() override {
std::unique_ptr<FutureWaiter> destroyOnReturn{this};
promise.setValue();
}
Semaphore::Waiter semaphoreWaiter;
folly::Promise<Unit> promise;
};
} // namespace
SemiFuture<Unit> Semaphore::future_wait() { SemiFuture<Unit> Semaphore::future_wait() {
auto oldVal = tokens_.load(std::memory_order_acquire); auto oldVal = tokens_.load(std::memory_order_acquire);
do { do {
while (oldVal == 0) { while (oldVal == 0) {
auto waitBaton = std::make_unique<fibers::Baton>(); auto batonWaiterPtr = std::make_unique<FutureWaiter>();
// If waitSlow fails it is because the token is non-zero by the time // If waitSlow fails it is because the token is non-zero by the time
// the lock is taken, so we can just continue round the loop // the lock is taken, so we can just continue round the loop
if (waitSlow(*waitBaton)) { auto future = batonWaiterPtr->promise.getSemiFuture();
return futures::wait(std::move(waitBaton)); if (waitSlow(batonWaiterPtr->semaphoreWaiter)) {
(void)batonWaiterPtr.release();
return future;
} }
oldVal = tokens_.load(std::memory_order_acquire); oldVal = tokens_.load(std::memory_order_acquire);
} }
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#pragma once #pragma once
#include <folly/IntrusiveList.h>
#include <folly/Synchronized.h> #include <folly/Synchronized.h>
#include <folly/fibers/Baton.h> #include <folly/fibers/Baton.h>
#include <folly/futures/Future.h> #include <folly/futures/Future.h>
...@@ -50,13 +51,24 @@ class Semaphore { ...@@ -50,13 +51,24 @@ class Semaphore {
*/ */
void wait(); void wait();
struct Waiter {
Waiter() noexcept {}
// The baton will be signalled when this waiter acquires the semaphore.
Baton baton;
private:
friend Semaphore;
folly::SafeIntrusiveListHook hook_;
};
/** /**
* Try to wait on the semaphore. * Try to wait on the semaphore.
* Return true on success. * Return true on success.
* On failure, the passed baton is enqueued, it will be posted once the * On failure, the passed waiter is enqueued, its baton will be posted once
* semaphore has capacity. Caller is responsible to wait then signal. * semaphore has capacity. Caller is responsible to wait then signal.
*/ */
bool try_wait(Baton& waitBaton); bool try_wait(Waiter& waiter);
#if FOLLY_HAS_COROUTINES #if FOLLY_HAS_COROUTINES
...@@ -79,13 +91,16 @@ class Semaphore { ...@@ -79,13 +91,16 @@ class Semaphore {
size_t getCapacity() const; size_t getCapacity() const;
private: private:
bool waitSlow(folly::fibers::Baton& waitBaton); bool waitSlow(Waiter& waiter);
bool signalSlow(); bool signalSlow();
size_t capacity_; size_t capacity_;
// Atomic counter // Atomic counter
std::atomic<int64_t> tokens_; std::atomic<int64_t> tokens_;
folly::Synchronized<std::queue<folly::fibers::Baton*>> waitList_;
using WaiterList = folly::SafeIntrusiveList<Waiter, &Waiter::hook_>;
folly::Synchronized<WaiterList> waitList_;
}; };
} // namespace fibers } // namespace fibers
......
...@@ -1737,10 +1737,10 @@ TEST(FiberManager, semaphore) { ...@@ -1737,10 +1737,10 @@ TEST(FiberManager, semaphore) {
#endif #endif
break; break;
case 2: { case 2: {
Baton baton; Semaphore::Waiter waiter;
bool acquired = sem.try_wait(baton); bool acquired = sem.try_wait(waiter);
if (!acquired) { if (!acquired) {
baton.wait(); waiter.baton.wait();
} }
break; break;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment