Commit 77e7e33f authored by Andrii Grynenko's avatar Andrii Grynenko Committed by Facebook Github Bot

SemiFuture wait() and timed_wait() without TimedDrivableExecutor

Reviewed By: yfeldblum

Differential Revision: D6893567

fbshipit-source-id: 054e109e3c3c3b2b20ac6ffe8f2ee05335e3dd89
parent 7c1e9ed8
......@@ -379,26 +379,164 @@ FutureBase<T>::thenImplementation(
return f;
}
struct TimedDrivableExecutorWrapperMaker;
class TimedDrivableExecutorWrapper : public TimedDrivableExecutor {
/**
* Defer work until executor is actively boosted.
*
* NOTE: that this executor is a private implementation detail belonging to the
* Folly Futures library and not intended to be used elsewhere. It is designed
* specifically for the use case of deferring work on a SemiFuture. It is NOT
* thread safe. Please do not use for any other purpose without great care.
*/
class DeferredExecutor final : public Executor {
public:
~TimedDrivableExecutorWrapper() = default;
void add(Func func) override {
auto state = state_.load(std::memory_order_acquire);
if (state == State::HAS_FUNCTION) {
// This means we are inside runAndDestroy, just run the function inline
func();
return;
}
// Returns a KeepAlive that owns the executor and a pointer to the object
// cast to TimedDrivableExecutor that can be passed into operations
static std::pair<KeepAlive, TimedDrivableExecutor*> get();
func_ = std::move(func);
std::shared_ptr<FutureBatonType> baton;
do {
if (state == State::HAS_EXECUTOR) {
state_.store(State::HAS_FUNCTION, std::memory_order_release);
executor_->add([this] { this->runAndDestroy(); });
return;
}
if (state == State::DETACHED) {
// Function destructor may trigger more functions to be added to the
// Executor. They should be run inline.
state = State::HAS_FUNCTION;
func_ = nullptr;
delete this;
return;
}
if (state == State::HAS_BATON) {
baton = baton_.copy();
}
assert(state == State::EMPTY || state == State::HAS_BATON);
} while (!state_.compare_exchange_weak(
state,
State::HAS_FUNCTION,
std::memory_order_release,
std::memory_order_acquire));
protected:
TimedDrivableExecutorWrapper() = default;
// After compare_exchange_weak is complete, we can no longer use this
// object since it may be destroyed from another thread.
if (baton) {
baton->post();
}
}
void keepAliveAcquire() override;
void setExecutor(folly::Executor* executor) {
executor_ = executor;
auto state = state_.load(std::memory_order_acquire);
do {
if (state == State::HAS_FUNCTION) {
executor_->add([this] { this->runAndDestroy(); });
return;
}
assert(state == State::EMPTY);
} while (!state_.compare_exchange_weak(
state,
State::HAS_EXECUTOR,
std::memory_order_release,
std::memory_order_acquire));
}
void keepAliveRelease() override;
void runAndDestroy() {
assert(state_.load(std::memory_order_relaxed) == State::HAS_FUNCTION);
func_();
delete this;
}
KeepAlive getKeepAliveToken() override;
void detach() {
auto state = state_.load(std::memory_order_acquire);
do {
if (state == State::HAS_FUNCTION) {
// Function destructor may trigger more functions to be added to the
// Executor. They should be run inline.
func_ = nullptr;
delete this;
return;
}
std::atomic<ssize_t> keepAliveCount_{0};
friend struct TimedDrivableExecutorWrapperMaker;
assert(state == State::EMPTY);
} while (!state_.compare_exchange_weak(
state,
State::DETACHED,
std::memory_order_release,
std::memory_order_acquire));
}
void wait() {
auto state = state_.load(std::memory_order_acquire);
auto baton = std::make_shared<FutureBatonType>();
baton_ = baton;
do {
if (state == State::HAS_FUNCTION) {
return;
}
assert(state == State::EMPTY);
} while (!state_.compare_exchange_weak(
state,
State::HAS_BATON,
std::memory_order_release,
std::memory_order_acquire));
baton->wait();
assert(state_.load(std::memory_order_relaxed) == State::HAS_FUNCTION);
}
bool wait(Duration duration) {
auto state = state_.load(std::memory_order_acquire);
auto baton = std::make_shared<FutureBatonType>();
baton_ = baton;
do {
if (state == State::HAS_FUNCTION) {
return true;
}
assert(state == State::EMPTY);
} while (!state_.compare_exchange_weak(
state,
State::HAS_BATON,
std::memory_order_release,
std::memory_order_acquire));
if (baton->try_wait_for(duration)) {
assert(state_.load(std::memory_order_relaxed) == State::HAS_FUNCTION);
return true;
}
state = state_.load(std::memory_order_acquire);
do {
if (state == State::HAS_FUNCTION) {
return true;
}
assert(state == State::HAS_BATON);
} while (!state_.compare_exchange_weak(
state,
State::EMPTY,
std::memory_order_release,
std::memory_order_acquire));
return false;
}
private:
enum class State {
EMPTY,
HAS_FUNCTION,
HAS_EXECUTOR,
HAS_BATON,
DETACHED,
};
std::atomic<State> state_{State::EMPTY};
Func func_;
Executor* executor_;
folly::Synchronized<std::shared_ptr<FutureBatonType>> baton_;
};
} // namespace detail
} // namespace futures
......@@ -472,6 +610,33 @@ SemiFuture<T> SemiFuture<T>::makeEmpty() {
return SemiFuture<T>(futures::detail::EmptyConstruct{});
}
template <class T>
typename SemiFuture<T>::DeferredExecutor* SemiFuture<T>::getDeferredExecutor()
const {
if (auto executor = this->core_->getExecutor()) {
assert(dynamic_cast<DeferredExecutor*>(executor) != nullptr);
return static_cast<DeferredExecutor*>(executor);
}
return nullptr;
}
template <class T>
void SemiFuture<T>::releaseDeferredExecutor(corePtr core) {
if (!core) {
return;
}
if (auto executor = core->getExecutor()) {
assert(dynamic_cast<DeferredExecutor*>(executor) != nullptr);
static_cast<DeferredExecutor*>(executor)->detach();
core->setExecutor(nullptr);
}
}
template <class T>
SemiFuture<T>::~SemiFuture() {
releaseDeferredExecutor(this->core_);
}
template <class T>
SemiFuture<T>::SemiFuture(SemiFuture<T>&& other) noexcept
: futures::detail::FutureBase<T>(std::move(other)) {}
......@@ -487,12 +652,14 @@ SemiFuture<T>::SemiFuture(Future<T>&& other) noexcept
template <class T>
SemiFuture<T>& SemiFuture<T>::operator=(SemiFuture<T>&& other) noexcept {
releaseDeferredExecutor(this->core_);
this->assign(other);
return *this;
}
template <class T>
SemiFuture<T>& SemiFuture<T>::operator=(Future<T>&& other) noexcept {
releaseDeferredExecutor(this->core_);
this->assign(other);
// SemiFuture should not have an executor on construction
if (this->core_) {
......@@ -508,22 +675,14 @@ inline Future<T> SemiFuture<T>::via(Executor* executor, int8_t priority) && {
throwNoExecutor();
}
// If current executor is deferred, try to set the new executor on it to
// ensure boost blocking happens correctly
auto oldExecutor = this->getExecutor();
if (oldExecutor && executor && (executor != oldExecutor)) {
// We know in a SemiFuture that if we have an executor it should be
// DeferredExecutor. Verify this in debug mode.
DCHECK(nullptr != dynamic_cast<DeferredExecutor*>(this->getExecutor()));
if (auto defExecutor = static_cast<DeferredExecutor*>(oldExecutor)) {
defExecutor->setExecutor(executor);
if (auto deferredExecutor = getDeferredExecutor()) {
deferredExecutor->setExecutor(executor);
}
}
this->setExecutor(executor, priority);
auto newFuture = Future<T>(this->core_);
this->core_ = nullptr;
newFuture.setExecutor(executor, priority);
return newFuture;
}
......@@ -531,30 +690,17 @@ template <class T>
template <typename F>
SemiFuture<typename futures::detail::callableResult<T, F>::Return::value_type>
SemiFuture<T>::defer(F&& func) && {
// If we already have a deferred executor, use it, otherwise create one
auto defKeepAlive = this->getExecutor()
? this->getExecutor()->getKeepAliveToken()
: DeferredExecutor::create();
auto e = defKeepAlive.get();
// We know in a SemiFuture that if we have an executor it should be
// DeferredExecutor (either it was that way before, or we just created it).
// Verify this in debug mode.
DCHECK(nullptr != dynamic_cast<DeferredExecutor*>(e));
// Convert to a folly::future with a deferred executor
// Will be low-cost if this is not a new executor as via optimises for that
// case
auto sf =
std::move(*this)
.via(e)
// Then add the work, with a wrapper function that captures the
// keepAlive so the executor is destroyed at the right time.
.then(
DeferredExecutor::wrap(std::move(defKeepAlive), std::move(func)))
// Finally, convert back o a folly::SemiFuture to hide the executor
.semi();
DeferredExecutor* deferredExecutor = getDeferredExecutor();
if (!deferredExecutor) {
deferredExecutor = new DeferredExecutor();
this->setExecutor(deferredExecutor);
}
auto sf = Future<T>(this->core_).then(std::forward<F>(func)).semi();
this->core_ = nullptr;
// Carry deferred executor through chain as constructor from Future will
// nullify it
sf.setExecutor(e);
sf.setExecutor(deferredExecutor);
return sf;
}
......@@ -1441,52 +1587,71 @@ void waitViaImpl(
template <class T>
SemiFuture<T>& SemiFuture<T>::wait() & {
if (auto deferredExecutor = getDeferredExecutor()) {
deferredExecutor->wait();
deferredExecutor->runAndDestroy();
this->core_->setExecutor(nullptr);
} else {
futures::detail::waitImpl(*this);
}
return *this;
}
template <class T>
SemiFuture<T>&& SemiFuture<T>::wait() && {
futures::detail::waitImpl(*this);
return std::move(*this);
return std::move(wait());
}
template <class T>
SemiFuture<T>& SemiFuture<T>::wait(Duration dur) & {
auto tde = futures::detail::TimedDrivableExecutorWrapper::get();
*this = std::move(*this).via(tde.second).waitVia(tde.second, dur).semi();
if (auto deferredExecutor = getDeferredExecutor()) {
if (deferredExecutor->wait(dur)) {
deferredExecutor->runAndDestroy();
this->core_->setExecutor(nullptr);
}
} else {
futures::detail::waitImpl(*this, dur);
}
return *this;
}
template <class T>
SemiFuture<T>&& SemiFuture<T>::wait(Duration dur) && {
auto tde = futures::detail::TimedDrivableExecutorWrapper::get();
*this = std::move(*this).via(tde.second).waitVia(tde.second, dur).semi();
return std::move(*this);
return std::move(wait(dur));
}
template <class T>
T SemiFuture<T>::get() && {
auto tde = futures::detail::TimedDrivableExecutorWrapper::get();
return std::move(*this).via(tde.second).getVia(tde.second);
return std::move(*this).getTry().value();
}
template <class T>
T SemiFuture<T>::get(Duration dur) && {
auto tde = futures::detail::TimedDrivableExecutorWrapper::get();
return std::move(*this).via(tde.second).getVia(tde.second, dur);
return std::move(*this).getTry(dur).value();
}
template <class T>
Try<T> SemiFuture<T>::getTry() && {
auto tde = futures::detail::TimedDrivableExecutorWrapper::get();
return std::move(*this).via(tde.second).getTryVia(tde.second);
wait();
auto future = folly::Future<T>(this->core_);
this->core_ = nullptr;
return std::move(std::move(future).getTry());
}
template <class T>
Try<T> SemiFuture<T>::getTry(Duration dur) && {
auto tde = futures::detail::TimedDrivableExecutorWrapper::get();
return std::move(*this).via(tde.second).getTryVia(tde.second, dur);
wait(dur);
if (auto deferredExecutor = getDeferredExecutor()) {
deferredExecutor->detach();
}
this->core_->setExecutor(nullptr);
auto future = folly::Future<T>(this->core_);
this->core_ = nullptr;
if (!future.isReady()) {
throwTimedOut();
}
return std::move(std::move(future).getTry());
}
template <class T>
......
......@@ -165,141 +165,7 @@ struct Extract<R (&)(Args...)> {
typedef typename ArgType<Args...>::FirstArg FirstArg;
};
/**
* Defer work until executor is chained.
*
* NOTE: that this executor is a private implementation detail belonging to the
* Folly Futures library and not intended to be used elsewhere. It is designed
* specifically for the use case of deferring work on a SemiFuture. It is NOT
* thread safe. Please do not use for any other purpose without great care.
*/
class DeferredExecutor final : public Executor {
public:
template <typename Class, typename F>
struct DeferredWorkWrapper;
/**
* Work wrapper class to capture the keepalive and forward the argument
* list to the captured function.
*/
template <typename F, typename R, typename... Args>
struct DeferredWorkWrapper<F, R (F::*)(Args...) const> {
R operator()(Args... args) {
return func(std::forward<Args>(args)...);
}
Executor::KeepAlive a;
F func;
};
/**
* Construction is private to ensure that creation and deletion are
* symmetric
*/
static KeepAlive create() {
std::unique_ptr<futures::detail::DeferredExecutor> devb{
new futures::detail::DeferredExecutor{}};
auto keepAlive = devb->getKeepAliveToken();
devb.release();
return keepAlive;
}
/// Enqueue a function to executed by this executor. This is not thread-safe.
void add(Func func) override {
// We should never have a function here already. Either we are RUNNING,
// in which case we are on one and it should have been removed from the
// executor, or we are not in which case it should be the first.
assert(!func_);
// If we are already running, must be reentrant. Just call func.
if (state_.load() == State::RUNNING) {
func();
return;
}
// If we already have a function, wrap and chain. Otherwise assign.
func_ = std::move(func);
State expected = State::NEW;
// If the state is new, then attempt to change it to HAS_CALLBACK, set the
// executor and return.
if (state_.load() == expected) {
if (state_.compare_exchange_strong(expected, State::HAS_CALLBACK)) {
return;
}
}
// If we have the executor set, we now have the callback too.
// Enqueue the callback on the executor and change to the RUNNING state.
enqueueWork();
}
void setExecutor(Executor* exec) {
executorKeepAlive_ = exec->getKeepAliveToken();
State expected = State::NEW;
// If the state is new, then attempt to change it to HAS_EXECUTOR, set the
// executor and return.
if (state_.load() == expected) {
if (state_.compare_exchange_strong(expected, State::HAS_EXECUTOR)) {
return;
}
}
// If we have the callback set, we now have the executor too.
// Enqueue the callback on the executor and change to the RUNNING state.
enqueueWork();
}
KeepAlive getKeepAliveToken() override {
keepAliveAcquire();
return makeKeepAlive();
}
~DeferredExecutor() = default;
template <class F>
static auto wrap(Executor::KeepAlive keepAlive, F&& func)
-> DeferredWorkWrapper<F, decltype(&F::operator())> {
return DeferredExecutor::DeferredWorkWrapper<F, decltype(&F::operator())>{
std::move(keepAlive), std::forward<F>(func)};
}
protected:
void keepAliveAcquire() override {
++keepAliveCount_;
}
void keepAliveRelease() override {
releaseAndTryFree();
}
void releaseAndTryFree() {
--keepAliveCount_;
if (keepAliveCount_ == 0) {
delete this;
}
}
private:
enum class State {
NEW,
HAS_EXECUTOR,
HAS_CALLBACK,
RUNNING,
};
Func func_;
ssize_t keepAliveCount_{0};
std::atomic<State> state_{State::NEW};
KeepAlive executorKeepAlive_;
DeferredExecutor() = default;
void enqueueWork() {
DCHECK(func_);
state_.store(State::RUNNING);
executorKeepAlive_.get()->add(std::move(func_));
return;
}
};
class DeferredExecutor;
} // namespace detail
} // namespace futures
......
......@@ -53,52 +53,5 @@ Future<Unit> sleep(Duration dur, Timekeeper* tk) {
return tk->after(dur);
}
namespace detail {
struct TimedDrivableExecutorWrapperTag {};
struct TimedDrivableExecutorWrapperMaker {
std::pair<folly::Executor::KeepAlive, TimedDrivableExecutor*> operator()() {
std::unique_ptr<futures::detail::TimedDrivableExecutorWrapper> devb{
new futures::detail::TimedDrivableExecutorWrapper{}};
std::pair<folly::Executor::KeepAlive, TimedDrivableExecutor*> ret{
devb->getKeepAliveToken(),
static_cast<TimedDrivableExecutor*>(devb.get())};
devb.release();
return ret;
}
};
std::pair<folly::Executor::KeepAlive, TimedDrivableExecutor*>
TimedDrivableExecutorWrapper::get() {
// Thread-local is a pair of a KeepAlive that owns the executor safely
// relative to later created keepalives, and a pre-cast pointer to avoid
// recasting later (which would have to be dynamic from Executor*).
auto& p = folly::SingletonThreadLocal<
std::pair<folly::Executor::KeepAlive, TimedDrivableExecutor*>,
TimedDrivableExecutorWrapperTag,
TimedDrivableExecutorWrapperMaker>::get();
// Reconstruct pair from a new keepalive token and the pointer because
// the keepalive is not copyable
return {p.second->getKeepAliveToken(), p.second};
}
void TimedDrivableExecutorWrapper::keepAliveAcquire() {
keepAliveCount_.fetch_add(1, std::memory_order_relaxed);
}
void TimedDrivableExecutorWrapper::keepAliveRelease() {
if (keepAliveCount_.fetch_sub(1, std::memory_order_release) == 1) {
std::atomic_thread_fence(std::memory_order_acquire);
delete this;
}
}
folly::Executor::KeepAlive TimedDrivableExecutorWrapper::getKeepAliveToken() {
keepAliveAcquire();
return makeKeepAlive();
}
} // namespace detail
} // namespace futures
} // namespace folly
......@@ -204,6 +204,8 @@ class SemiFuture : private futures::detail::FutureBase<T> {
using TimePoint = std::chrono::system_clock::time_point;
public:
~SemiFuture();
static SemiFuture<T> makeEmpty(); // equivalent to moved-from
// Export public interface of FutureBase
......@@ -337,6 +339,10 @@ class SemiFuture : private futures::detail::FutureBase<T> {
explicit SemiFuture(futures::detail::EmptyConstruct) noexcept
: Base(futures::detail::EmptyConstruct{}) {}
DeferredExecutor* getDeferredExecutor() const;
static void releaseDeferredExecutor(corePtr core);
};
template <class T>
......
......@@ -294,10 +294,6 @@ TEST(SemiFuture, SimpleTimedWait) {
sf.wait(std::chrono::milliseconds(100));
EXPECT_FALSE(sf.isReady());
p.setValue();
EXPECT_FALSE(sf.isReady());
// The internals of wait mean that there is an executor in the way. We
// cannot expect that the promise immediately statisfies the future.
sf.wait(std::chrono::milliseconds(100));
EXPECT_TRUE(sf.isReady());
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment