Commit b3e7e89c authored by Andrii Grynenko's avatar Andrii Grynenko Committed by Facebook Github Bot

Use Executor::KeepAlive to store Executor in Core

Reviewed By: yfeldblum

Differential Revision: D8022229

fbshipit-source-id: 1ba1d777cbbeb1d1ad95dc3f4bae6b3d431b9440
parent 360d0d32
......@@ -1733,6 +1733,9 @@ namespace detail {
template <class FutureType, typename T = typename FutureType::value_type>
void waitImpl(FutureType& f) {
if (std::is_base_of<Future<T>, FutureType>::value) {
f = std::move(f).via(nullptr);
}
// short-circuit if there's nothing to do
if (f.isReady()) {
return;
......@@ -1759,6 +1762,9 @@ void convertFuture(SemiFuture<T>&& sf, SemiFuture<T>& f) {
template <class FutureType, typename T = typename FutureType::value_type>
void waitImpl(FutureType& f, Duration dur) {
if (std::is_base_of<Future<T>, FutureType>::value) {
f = std::move(f).via(nullptr);
}
// short-circuit if there's nothing to do
if (f.isReady()) {
return;
......@@ -1779,6 +1785,7 @@ void waitImpl(FutureType& f, Duration dur) {
template <class T>
void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
f = std::move(f).via(nullptr);
// Set callback so to ensure that the via executor has something on it
// so that once the preceding future triggers this callback, drive will
// always have a callback to satisfy it
......@@ -1797,6 +1804,7 @@ void waitViaImpl(
Future<T>& f,
TimedDrivableExecutor* e,
const std::chrono::duration<Rep, Period>& timeout) {
f = std::move(f).via(nullptr);
// Set callback so to ensure that the via executor has something on it
// so that once the preceding future triggers this callback, drive will
// always have a callback to satisfy it
......
......@@ -196,12 +196,12 @@ class Core final {
/// which might trigger invocation of the callback
void setExecutor(Executor* x, int8_t priority = Executor::MID_PRI) {
DCHECK(fsm_.getState() != State::OnlyCallback);
executor_ = x;
executor_ = x ? getKeepAliveToken(x) : Executor::KeepAlive<>();
priority_ = priority;
}
Executor* getExecutor() const {
return executor_;
return executor_.get();
}
/// Call only from Future thread
......@@ -295,7 +295,7 @@ class Core final {
void doCallback() {
DCHECK(fsm_.getState() == State::Done);
Executor* x = executor_;
auto x = exchange(executor_, Executor::KeepAlive<>());
int8_t priority = priority_;
if (x) {
......@@ -314,16 +314,19 @@ class Core final {
CoreAndCallbackReference guard_local_scope(this);
CoreAndCallbackReference guard_lambda(this);
try {
auto xPtr = x.get();
if (LIKELY(x->getNumPriorities() == 1)) {
x->add([core_ref = std::move(guard_lambda)]() mutable {
xPtr->add([core_ref = std::move(guard_lambda),
keepAlive = std::move(x)]() mutable {
auto cr = std::move(core_ref);
Core* const core = cr.getCore();
RequestContextScopeGuard rctx(core->context_);
core->callback_(std::move(*core->result_));
});
} else {
x->addWithPriority(
[core_ref = std::move(guard_lambda)]() mutable {
xPtr->addWithPriority(
[core_ref = std::move(guard_lambda),
keepAlive = std::move(x)]() mutable {
auto cr = std::move(core_ref);
Core* const core = cr.getCore();
RequestContextScopeGuard rctx(core->context_);
......@@ -382,7 +385,7 @@ class Core final {
std::atomic<bool> interruptHandlerSet_ {false};
SpinLock interruptLock_;
int8_t priority_ {-1};
Executor* executor_ {nullptr};
Executor::KeepAlive<> executor_;
std::shared_ptr<RequestContext> context_ {nullptr};
std::unique_ptr<exception_wrapper> interrupt_ {};
std::function<void(exception_wrapper const&)> interruptHandler_ {nullptr};
......
......@@ -390,11 +390,11 @@ TEST(SemiFuture, MakeFutureFromSemiFuture) {
result = value;
return value;
});
e.loop();
e.loopOnce();
EXPECT_EQ(result, 0);
EXPECT_FALSE(future.isReady());
p.setValue(42);
e.loop();
e.loopOnce();
EXPECT_TRUE(future.isReady());
ASSERT_EQ(future.value(), 42);
ASSERT_EQ(result, 42);
......@@ -409,11 +409,11 @@ TEST(SemiFuture, MakeFutureFromSemiFutureReturnFuture) {
result = value;
return folly::makeFuture(std::move(value));
});
e.loop();
e.loopOnce();
EXPECT_EQ(result, 0);
EXPECT_FALSE(future.isReady());
p.setValue(42);
e.loop();
e.loopOnce();
EXPECT_TRUE(future.isReady());
ASSERT_EQ(future.value(), 42);
ASSERT_EQ(result, 42);
......@@ -433,11 +433,13 @@ TEST(SemiFuture, MakeFutureFromSemiFutureReturnSemiFuture) {
.then([&](int value) {
return folly::makeSemiFuture(std::move(value));
});
e.loop();
e.loopOnce();
EXPECT_EQ(result, 0);
EXPECT_FALSE(future.isReady());
p.setValue(42);
e.loop();
e.loopOnce();
e.loopOnce();
e.loopOnce();
EXPECT_TRUE(future.isReady());
ASSERT_EQ(future.value(), 42);
ASSERT_EQ(result, 42);
......@@ -452,11 +454,11 @@ TEST(SemiFuture, MakeFutureFromSemiFutureLValue) {
result = value;
return value;
});
e.loop();
e.loopOnce();
EXPECT_EQ(result, 0);
EXPECT_FALSE(future.isReady());
p.setValue(42);
e.loop();
e.loopOnce();
EXPECT_TRUE(future.isReady());
ASSERT_EQ(future.value(), 42);
ASSERT_EQ(result, 42);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment