Commit 49cf5372 authored by Lee Howes's avatar Lee Howes Committed by Facebook Github Bot

Fix for deferredexecutor behaviour.

Summary: DeferredExecutor suffered from a case where boosting would not work if no work was present. This change modifies is so that on a call to via the next executor will be set in the DeferredExecutor. When add is called the callback will be stored. Whichever happens second actually enqueues the work on the chained executor. get uses via to correctly implement this.

Reviewed By: andriigrynenko

Differential Revision: D6663244

fbshipit-source-id: 74600d511373e0f6f7ca4128f25f3947a4981200
parent 3c49c7d6
......@@ -378,6 +378,28 @@ FutureBase<T>::thenImplementation(
return f;
}
struct TimedDrivableExecutorWrapperMaker;
class TimedDrivableExecutorWrapper : public TimedDrivableExecutor {
public:
~TimedDrivableExecutorWrapper() = default;
// Returns a KeepAlive that owns the executor and a pointer to the object
// cast to TimedDrivableExecutor that can be passed into operations
static std::pair<KeepAlive, TimedDrivableExecutor*> get();
protected:
TimedDrivableExecutorWrapper() = default;
void keepAliveAcquire() override;
void keepAliveRelease() override;
KeepAlive getKeepAliveToken() override;
std::atomic<ssize_t> keepAliveCount_{0};
friend struct TimedDrivableExecutorWrapperMaker;
};
} // namespace detail
} // namespace futures
......@@ -479,19 +501,6 @@ SemiFuture<T>& SemiFuture<T>::operator=(Future<T>&& other) noexcept {
return *this;
}
template <class T>
void SemiFuture<T>::boost_() {
// If a SemiFuture has an executor it should be deferred, so boost it
if (auto e = this->getExecutor()) {
// We know in a SemiFuture that if we have an executor it should be
// DeferredExecutor. Verify this in debug mode.
DCHECK(nullptr != dynamic_cast<DeferredExecutor*>(e));
auto ka = static_cast<DeferredExecutor*>(e)->getKeepAliveToken();
static_cast<DeferredExecutor*>(e)->boost();
}
}
template <class T>
inline Future<T> SemiFuture<T>::via(Executor* executor, int8_t priority) && {
throwIfInvalid();
......@@ -499,17 +508,15 @@ inline Future<T> SemiFuture<T>::via(Executor* executor, int8_t priority) && {
throwNoExecutor();
}
// If current executor is deferred, boost block to ensure that work
// progresses and is run on the new executor.
// If current executor is deferred, try to set the new executor on it to
// ensure boost blocking happens correctly
auto oldExecutor = this->getExecutor();
if (oldExecutor && executor && (executor != oldExecutor)) {
// We know in a SemiFuture that if we have an executor it should be
// DeferredExecutor. Verify this in debug mode.
DCHECK(nullptr != dynamic_cast<DeferredExecutor*>(this->getExecutor()));
if (static_cast<DeferredExecutor*>(oldExecutor)) {
executor->add([oldExecutorKA = oldExecutor->getKeepAliveToken()]() {
static_cast<DeferredExecutor*>(oldExecutorKA.get())->boost();
});
if (auto defExecutor = static_cast<DeferredExecutor*>(oldExecutor)) {
defExecutor->setExecutor(executor);
}
}
......@@ -1358,14 +1365,6 @@ Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
namespace futures {
namespace detail {
template <class T>
void doBoost(folly::Future<T>& /* usused */) {}
template <class T>
void doBoost(folly::SemiFuture<T>& f) {
f.boost_();
}
template <class FutureType, typename T = typename FutureType::value_type>
void waitImpl(FutureType& f) {
// short-circuit if there's nothing to do
......@@ -1375,7 +1374,6 @@ void waitImpl(FutureType& f) {
FutureBatonType baton;
f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
doBoost(f);
baton.wait();
assert(f.isReady());
}
......@@ -1394,7 +1392,6 @@ void waitImpl(FutureType& f, Duration dur) {
promise.setTry(std::move(t));
baton->post();
});
doBoost(f);
f = std::move(ret);
if (baton->try_wait_for(dur)) {
assert(f.isReady());
......@@ -1427,7 +1424,9 @@ void waitViaImpl(
if (f.isReady()) {
return;
}
f = std::move(f).via(e).then([](T&& t) { return std::move(t); });
// Chain operations, ensuring that the executor is kept alive for the duration
f = std::move(f).via(e).then(
[keepAlive = e->getKeepAliveToken()](T&& t) { return std::move(t); });
auto now = std::chrono::steady_clock::now();
auto deadline = now + timeout;
while (!f.isReady() && (now < deadline)) {
......@@ -1454,44 +1453,40 @@ SemiFuture<T>&& SemiFuture<T>::wait() && {
template <class T>
SemiFuture<T>& SemiFuture<T>::wait(Duration dur) & {
futures::detail::waitImpl(*this, dur);
auto tde = futures::detail::TimedDrivableExecutorWrapper::get();
*this = std::move(*this).via(tde.second).waitVia(tde.second, dur).semi();
return *this;
}
template <class T>
SemiFuture<T>&& SemiFuture<T>::wait(Duration dur) && {
futures::detail::waitImpl(*this, dur);
auto tde = futures::detail::TimedDrivableExecutorWrapper::get();
*this = std::move(*this).via(tde.second).waitVia(tde.second, dur).semi();
return std::move(*this);
}
template <class T>
T SemiFuture<T>::get() && {
return std::move(wait()).value();
auto tde = futures::detail::TimedDrivableExecutorWrapper::get();
return std::move(*this).via(tde.second).getVia(tde.second);
}
template <class T>
T SemiFuture<T>::get(Duration dur) && {
wait(dur);
if (this->isReady()) {
return std::move(this->value());
} else {
throwTimedOut();
}
auto tde = futures::detail::TimedDrivableExecutorWrapper::get();
return std::move(*this).via(tde.second).getVia(tde.second, dur);
}
template <class T>
Try<T> SemiFuture<T>::getTry() && {
return std::move(wait()).result();
auto tde = futures::detail::TimedDrivableExecutorWrapper::get();
return std::move(*this).via(tde.second).getTryVia(tde.second);
}
template <class T>
Try<T> SemiFuture<T>::getTry(Duration dur) && {
wait(dur);
if (this->isReady()) {
return std::move(this->result());
} else {
throwTimedOut();
}
auto tde = futures::detail::TimedDrivableExecutorWrapper::get();
return std::move(*this).via(tde.second).getTryVia(tde.second, dur);
}
template <class T>
......
......@@ -13,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
// included by Future.h, do not include directly.
......@@ -167,7 +166,7 @@ struct Extract<R (&)(Args...)> {
};
/**
* Defer work until executor is actively boosted.
* Defer work until executor is chained.
*
* NOTE: that this executor is a private implementation detail belonging to the
* Folly Futures library and not intended to be used elsewhere. It is designed
......@@ -207,31 +206,46 @@ class DeferredExecutor final : public Executor {
/// Enqueue a function to executed by this executor. This is not thread-safe.
void add(Func func) override {
// If we already have a function, wrap and chain. Otherwise assign.
if (func_) {
func_ = [oldFunc = std::move(func_), func = std::move(func)]() mutable {
oldFunc();
// We should never have a function here already. Either we are RUNNING,
// in which case we are on one and it should have been removed from the
// executor, or we are not in which case it should be the first.
assert(!func_);
// If we are already running, must be reentrant. Just call func.
if (state_.load() == State::RUNNING) {
func();
};
} else {
return;
}
// If we already have a function, wrap and chain. Otherwise assign.
func_ = std::move(func);
State expected = State::NEW;
// If the state is new, then attempt to change it to HAS_CALLBACK, set the
// executor and return.
if (state_.load() == expected) {
if (state_.compare_exchange_strong(expected, State::HAS_CALLBACK)) {
return;
}
}
// If we have the executor set, we now have the callback too.
// Enqueue the callback on the executor and change to the RUNNING state.
enqueueWork();
}
// Boost is like drive for certain types of deferred work
// Unlike drive it is safe to run on another executor because it
// will only be implemented on deferred-safe executors
void boost() {
// Ensure that the DeferredExecutor outlives its run operation
++keepAliveCount_;
SCOPE_EXIT {
releaseAndTryFree();
};
// Drain the executor
while (auto func = std::move(func_)) {
func();
void setExecutor(Executor* exec) {
executorKeepAlive_ = exec->getKeepAliveToken();
State expected = State::NEW;
// If the state is new, then attempt to change it to HAS_EXECUTOR, set the
// executor and return.
if (state_.load() == expected) {
if (state_.compare_exchange_strong(expected, State::HAS_EXECUTOR)) {
return;
}
}
// If we have the callback set, we now have the executor too.
// Enqueue the callback on the executor and change to the RUNNING state.
enqueueWork();
}
KeepAlive getKeepAliveToken() override {
......@@ -265,10 +279,26 @@ class DeferredExecutor final : public Executor {
}
private:
enum class State {
NEW,
HAS_EXECUTOR,
HAS_CALLBACK,
RUNNING,
};
Func func_;
ssize_t keepAliveCount_{0};
std::atomic<State> state_{State::NEW};
KeepAlive executorKeepAlive_;
DeferredExecutor() = default;
void enqueueWork() {
DCHECK(func_);
state_.store(State::RUNNING);
executorKeepAlive_.get()->add(std::move(func_));
return;
}
};
} // namespace detail
......
......@@ -16,6 +16,7 @@
#include <folly/futures/Future.h>
#include <folly/Likely.h>
#include <folly/SingletonThreadLocal.h>
#include <folly/futures/ThreadWheelTimekeeper.h>
namespace folly {
......@@ -52,5 +53,52 @@ Future<Unit> sleep(Duration dur, Timekeeper* tk) {
return tk->after(dur);
}
namespace detail {
struct TimedDrivableExecutorWrapperTag {};
struct TimedDrivableExecutorWrapperMaker {
std::pair<folly::Executor::KeepAlive, TimedDrivableExecutor*> operator()() {
std::unique_ptr<futures::detail::TimedDrivableExecutorWrapper> devb{
new futures::detail::TimedDrivableExecutorWrapper{}};
std::pair<folly::Executor::KeepAlive, TimedDrivableExecutor*> ret{
devb->getKeepAliveToken(),
static_cast<TimedDrivableExecutor*>(devb.get())};
devb.release();
return ret;
}
};
std::pair<folly::Executor::KeepAlive, TimedDrivableExecutor*>
TimedDrivableExecutorWrapper::get() {
// Thread-local is a pair of a KeepAlive that owns the executor safely
// relative to later created keepalives, and a pre-cast pointer to avoid
// recasting later (which would have to be dynamic from Executor*).
auto& p = folly::SingletonThreadLocal<
std::pair<folly::Executor::KeepAlive, TimedDrivableExecutor*>,
TimedDrivableExecutorWrapperTag,
TimedDrivableExecutorWrapperMaker>::get();
// Reconstruct pair from a new keepalive token and the pointer because
// the keepalive is not copyable
return {p.second->getKeepAliveToken(), p.second};
}
void TimedDrivableExecutorWrapper::keepAliveAcquire() {
keepAliveCount_.fetch_add(1, std::memory_order_relaxed);
}
void TimedDrivableExecutorWrapper::keepAliveRelease() {
if (keepAliveCount_.fetch_sub(1, std::memory_order_release) == 1) {
std::atomic_thread_fence(std::memory_order_acquire);
delete this;
}
}
folly::Executor::KeepAlive TimedDrivableExecutorWrapper::getKeepAliveToken() {
keepAliveAcquire();
return makeKeepAlive();
}
} // namespace detail
} // namespace futures
} // namespace folly
......@@ -319,11 +319,6 @@ class SemiFuture : private futures::detail::FutureBase<T> {
SemiFuture<typename futures::detail::callableResult<T, F>::Return::value_type>
defer(F&& func) &&;
// Public as for setCallback_
// Ensure that a boostable executor performs work to chain deferred work
// cleanly
void boost_();
private:
friend class Promise<T>;
template <class>
......
......@@ -24,6 +24,7 @@
#include <algorithm>
#include <atomic>
#include <future>
#include <memory>
#include <numeric>
#include <string>
......@@ -284,7 +285,31 @@ TEST(SemiFuture, SimpleGetTry) {
TEST(SemiFuture, SimpleTimedGet) {
Promise<folly::Unit> p;
auto sf = p.getSemiFuture();
EXPECT_THROW(std::move(sf).get(std::chrono::seconds(1)), TimedOut);
EXPECT_THROW(std::move(sf).get(std::chrono::milliseconds(100)), TimedOut);
}
TEST(SemiFuture, SimpleTimedWait) {
Promise<folly::Unit> p;
auto sf = p.getSemiFuture();
sf.wait(std::chrono::milliseconds(100));
EXPECT_FALSE(sf.isReady());
p.setValue();
EXPECT_FALSE(sf.isReady());
// The internals of wait mean that there is an executor in the way. We
// cannot expect that the promise immediately statisfies the future.
sf.wait(std::chrono::milliseconds(100));
EXPECT_TRUE(sf.isReady());
}
TEST(SemiFuture, SimpleTimedMultipleWait) {
Promise<folly::Unit> p;
auto sf = p.getSemiFuture();
sf.wait(std::chrono::milliseconds(100));
sf.wait(std::chrono::milliseconds(100));
EXPECT_FALSE(sf.isReady());
p.setValue();
sf.wait(std::chrono::milliseconds(100));
EXPECT_TRUE(sf.isReady());
}
TEST(SemiFuture, SimpleTimedGetViaFromSemiFuture) {
......@@ -292,13 +317,14 @@ TEST(SemiFuture, SimpleTimedGetViaFromSemiFuture) {
Promise<folly::Unit> p;
auto sf = p.getSemiFuture();
EXPECT_THROW(
std::move(sf).via(&e2).getVia(&e2, std::chrono::seconds(1)), TimedOut);
std::move(sf).via(&e2).getVia(&e2, std::chrono::milliseconds(100)),
TimedOut);
}
TEST(SemiFuture, SimpleTimedGetTry) {
Promise<folly::Unit> p;
auto sf = p.getSemiFuture();
EXPECT_THROW(std::move(sf).getTry(std::chrono::seconds(1)), TimedOut);
EXPECT_THROW(std::move(sf).getTry(std::chrono::milliseconds(100)), TimedOut);
}
TEST(SemiFuture, SimpleTimedGetTryViaFromSemiFuture) {
......@@ -306,7 +332,8 @@ TEST(SemiFuture, SimpleTimedGetTryViaFromSemiFuture) {
Promise<folly::Unit> p;
auto sf = p.getSemiFuture();
EXPECT_THROW(
std::move(sf).via(&e2).getTryVia(&e2, std::chrono::seconds(1)), TimedOut);
std::move(sf).via(&e2).getTryVia(&e2, std::chrono::milliseconds(100)),
TimedOut);
}
TEST(SemiFuture, SimpleValue) {
......@@ -350,6 +377,79 @@ TEST(SemiFuture, SimpleDefer) {
ASSERT_EQ(innerResult, 17);
}
TEST(SemiFuture, DeferWithDelayedSetValue) {
EventBase e2;
Promise<folly::Unit> p;
auto f = p.getFuture();
auto sf = std::move(f).semi().defer([&]() { return 17; });
// Start thread and have it blocking in the semifuture before we satisfy the
// promise
auto resultF =
std::async(std::launch::async, [&]() { return std::move(sf).get(); });
// Check that future is not already satisfied before setting the promise
// Async task should be blocked on sf.
ASSERT_EQ(
resultF.wait_for(std::chrono::milliseconds(100)),
std::future_status::timeout);
p.setValue();
ASSERT_EQ(resultF.get(), 17);
}
TEST(SemiFuture, DeferWithViaAndDelayedSetValue) {
EventBase e2;
Promise<folly::Unit> p;
auto f = p.getFuture();
auto sf = std::move(f).semi().defer([&]() { return 17; }).via(&e2);
// Start thread and have it blocking in the semifuture before we satisfy the
// promise.
auto resultF =
std::async(std::launch::async, [&]() { return std::move(sf).get(); });
std::thread t([&]() { e2.loopForever(); });
// Check that future is not already satisfied before setting the promise
// Async task should be blocked on sf.
ASSERT_EQ(
resultF.wait_for(std::chrono::milliseconds(100)),
std::future_status::timeout);
p.setValue();
e2.terminateLoopSoon();
t.join();
ASSERT_EQ(resultF.get(), 17);
}
TEST(SemiFuture, DeferWithGetTimedGet) {
std::atomic<int> innerResult{0};
Promise<folly::Unit> p;
auto f = p.getFuture();
auto sf = std::move(f).semi().defer([&]() { innerResult = 17; });
EXPECT_THROW(std::move(sf).get(std::chrono::milliseconds(100)), TimedOut);
ASSERT_EQ(innerResult, 0);
}
TEST(SemiFuture, DeferWithGetTimedWait) {
Promise<folly::Unit> p;
auto f = p.getFuture();
auto sf = std::move(f).semi().defer([&]() { return 17; });
ASSERT_FALSE(sf.isReady());
sf.wait(std::chrono::milliseconds(100));
ASSERT_FALSE(sf.isReady());
p.setValue();
ASSERT_EQ(std::move(sf).get(), 17);
}
TEST(SemiFuture, DeferWithGetMultipleTimedWait) {
Promise<folly::Unit> p;
auto f = p.getFuture();
auto sf = std::move(f).semi().defer([&]() { return 17; });
sf.wait(std::chrono::milliseconds(100));
sf.wait(std::chrono::milliseconds(100));
ASSERT_FALSE(sf.isReady());
p.setValue();
sf.wait(std::chrono::milliseconds(100));
ASSERT_EQ(std::move(sf).get(), 17);
}
TEST(SemiFuture, DeferWithVia) {
std::atomic<int> innerResult{0};
EventBase e2;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment