Commit ae548cc0 authored by Lewis Baker's avatar Lewis Baker Committed by Facebook Github Bot

Add folly::coro::Baton implementation

Summary:
Adds a fundamental thread-synchronisation primitive for coroutines
that allows one or more coroutines to suspend execution until some thread
signals them.

This differs from the folly::fibers::Baton implementation in that it
does not expose a blocking wait API, only an asynchronous wait API that
must be waited-on from within a coroutine. This implementation also
permits multiple coroutines concurrently waiting on the baton.

Example usage:
```c++
folly::coro::Baton baton;
std::string sharedValue;

folly::coro::Task<void> consumer()
{
  // Wait until the baton is posted.
  co_await baton.waitAsync();

  // Now safe to read shared state.
  std::cout << sharedValue << std::cout;
}

void producer()
{
  // Write to shared state
  sharedValue = "some result";

  // Publish the value by 'posting' the baton.
  // This will resume the consumer if it was currently suspended.
  baton.post();
}
```

Reviewed By: andriigrynenko

Differential Revision: D9208955

fbshipit-source-id: 07aa372b4c6b90dc0565763add716e591d312c65
parent ffbc9664
/*
* Copyright 2018-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/Portability.h>
#if FOLLY_HAS_COROUTINES
#include <folly/experimental/coro/Baton.h>
#include <cassert>
using namespace folly::coro;
Baton::~Baton() {
// Should not be any waiting coroutines when the baton is destruced.
// Caller should ensure the baton is posted before destructing.
assert(
state_.load(std::memory_order_relaxed) == static_cast<void*>(this) ||
state_.load(std::memory_order_relaxed) == nullptr);
}
void Baton::post() noexcept {
void* signalledState = static_cast<void*>(this);
void* oldValue = state_.exchange(signalledState, std::memory_order_acq_rel);
if (oldValue != signalledState && oldValue != nullptr) {
// We are the first thread to set the state to signalled and there is
// a waiting coroutine. We are responsible for resuming it.
WaitOperation* awaiter = static_cast<WaitOperation*>(oldValue);
awaiter->awaitingCoroutine_.resume();
}
}
bool Baton::waitImpl(WaitOperation* awaiter) const noexcept {
void* oldValue = nullptr;
if (!state_.compare_exchange_strong(
oldValue,
static_cast<void*>(awaiter),
std::memory_order_release,
std::memory_order_acquire)) {
// If the compare-exchange fails it should be because the baton was
// set to the signalled state. If this not the case then this could
// indicate that there are two awaiting coroutines.
assert(oldValue == static_cast<const void*>(this));
return false;
}
return true;
}
#endif // FOLLY_HAS_COROUTINES
/*
* Copyright 2018-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <atomic>
#include <experimental/coroutine>
namespace folly {
namespace coro {
/// A baton is a synchronisation primitive for coroutines that allows one
/// coroutine to co_await the baton and suspend until the baton is posted
/// by some other thread via a call to .post().
///
/// The Baton supports multiple awaiting coroutines at a time. When the
/// baton is posted, all awaiting coroutines are resumed.
///
/// Example usage:
///
/// folly::coro::Baton baton;
/// std::string sharedValue;
///
/// folly::coro::Task<void> consumer()
/// {
/// // Wait until the baton is posted.
/// co_await baton.waitAsync().via(executor);
///
/// // Now safe to read shared state.
/// std::cout << sharedValue << std::cout;
/// }
///
/// void producer()
/// {
/// // Write to shared state
/// sharedValue = "some result";
///
/// // Publish the value by 'posting' the baton.
/// // This will resume the consumer if it was currently suspended.
/// baton.post();
/// }
class Baton {
public:
class WaitOperation;
/// Initialise the Baton to either the signalled or non-signalled state.
explicit Baton(bool initiallySignalled = false) noexcept;
~Baton();
/// Query whether the Baton is currently in the signalled state.
bool ready() const noexcept;
/// Asynchronously wait for the Baton to enter the signalled state.
///
/// The returned object must be co_awaited from a coroutine. If the Baton
/// is already signalled then the awaiting coroutine will continue without
/// suspending. Otherwise, if the Baton is not yet signalled then the
/// awaiting coroutine will suspend execution and will be resumed when some
/// thread later calls post().
///
/// You may optionally specify an executor on which to resume executing the
/// awaiting coroutine if the baton was not already in the signalled state
/// by chaining a .via(executor) call. If you do not specify an executor then
/// the behaviour is as if an inline executor was specified.
/// i.e. the coroutine will be resumed inside the call to .post() on the
/// thread that next calls .post().
[[nodiscard]] WaitOperation waitAsync() const noexcept;
/// Set the Baton to the signalled state if it is not already signalled.
///
/// This will resume any coroutines that are currently suspended waiting
/// for the Baton inside 'co_await baton.waitAsync()'.
void post() noexcept;
/// Reset the baton back to the non-signalled state.
///
/// This method is not safe to be called concurrently with any other
/// method on the Baton. The caller must ensure that there are no coroutines
/// currently waiting on the Baton and that there are no threads currently
/// calling .post() when .reset() is called.
void reset() noexcept;
class WaitOperation {
public:
explicit WaitOperation(const Baton& baton) noexcept : baton_(baton) {}
bool await_ready() const noexcept {
return baton_.ready();
}
bool await_suspend(
std::experimental::coroutine_handle<> awaitingCoroutine) noexcept {
awaitingCoroutine_ = awaitingCoroutine;
return baton_.waitImpl(this);
}
void await_resume() noexcept {}
protected:
friend class Baton;
const Baton& baton_;
std::experimental::coroutine_handle<> awaitingCoroutine_;
WaitOperation* next_;
};
private:
// Try to register the awaiter as
bool waitImpl(WaitOperation* awaiter) const noexcept;
// this - Baton is in the signalled/posted state.
// other - Baton is not signalled/posted and this is a pointer to the head
// of a potentially empty linked-list of Awaiter nodes that were
// waiting for the baton to become signalled.
mutable std::atomic<void*> state_;
};
inline Baton::Baton(bool initiallySignalled) noexcept
: state_(initiallySignalled ? static_cast<void*>(this) : nullptr) {}
inline bool Baton::ready() const noexcept {
return state_.load(std::memory_order_acquire) ==
static_cast<const void*>(this);
}
inline Baton::WaitOperation Baton::waitAsync() const noexcept {
return Baton::WaitOperation{*this};
}
inline void Baton::reset() noexcept {
state_.store(nullptr, std::memory_order_relaxed);
}
} // namespace coro
} // namespace folly
/*
* Copyright 2017-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/Portability.h>
#if FOLLY_HAS_COROUTINES
#include <folly/executors/InlineExecutor.h>
#include <folly/experimental/coro/Baton.h>
#include <folly/experimental/coro/Future.h>
#include <folly/experimental/coro/Promise.h>
#include <folly/experimental/coro/Task.h>
#include <folly/portability/GTest.h>
using namespace folly;
TEST(Baton, Ready) {
coro::Baton b;
CHECK(!b.ready());
b.post();
CHECK(b.ready());
b.reset();
CHECK(!b.ready());
}
TEST(Baton, InitiallyReady) {
coro::Baton b{true};
CHECK(b.ready());
b.reset();
CHECK(!b.ready());
}
TEST(Baton, AwaitBaton) {
coro::Baton baton;
bool reachedBeforeWait = false;
bool reachedAfterWait = false;
auto makeTask = [&]() -> coro::Task<void> {
reachedBeforeWait = true;
co_await baton.waitAsync();
reachedAfterWait = true;
};
coro::Task<void> t = makeTask();
CHECK(!reachedBeforeWait);
CHECK(!reachedAfterWait);
auto& executor = InlineExecutor::instance();
coro::Future<void> f = via(&executor, std::move(t));
CHECK(reachedBeforeWait);
CHECK(!reachedAfterWait);
baton.post();
CHECK(reachedAfterWait);
}
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment