Commit a745237d authored by Lee Howes's avatar Lee Howes Committed by Facebook Github Bot

Reorder SemiFuture::within to propagate executor

Summary: Tweak futures::within to be completely deferred, including the timeout path, allowing it to support a SemiFuture-returning TimeKeeper.

Reviewed By: yfeldblum

Differential Revision: D17006080

fbshipit-source-id: cbed58579d33f6d6ecf60dce5f9c8b886fc4d4c4
parent 44a17b1f
...@@ -2035,7 +2035,7 @@ SemiFuture<T> SemiFuture<T>::within(Duration dur, E e, Timekeeper* tk) && { ...@@ -2035,7 +2035,7 @@ SemiFuture<T> SemiFuture<T>::within(Duration dur, E e, Timekeeper* tk) && {
explicit Context(E ex) : exception(std::move(ex)) {} explicit Context(E ex) : exception(std::move(ex)) {}
E exception; E exception;
SemiFuture<Unit> thisFuture; SemiFuture<Unit> thisFuture;
Future<Unit> afterFuture; SemiFuture<Unit> afterFuture;
Promise<T> promise; Promise<T> promise;
std::atomic<bool> token{false}; std::atomic<bool> token{false};
}; };
...@@ -2061,7 +2061,7 @@ SemiFuture<T> SemiFuture<T>::within(Duration dur, E e, Timekeeper* tk) && { ...@@ -2061,7 +2061,7 @@ SemiFuture<T> SemiFuture<T>::within(Duration dur, E e, Timekeeper* tk) && {
// Have time keeper use a weak ptr to hold ctx, // Have time keeper use a weak ptr to hold ctx,
// so that ctx can be deallocated as soon as the future job finished. // so that ctx can be deallocated as soon as the future job finished.
ctx->afterFuture = tk->after(dur).thenTry( ctx->afterFuture = tk->after(dur).semi().defer(
[weakCtx = to_weak_ptr(ctx)](Try<Unit>&& t) mutable { [weakCtx = to_weak_ptr(ctx)](Try<Unit>&& t) mutable {
if (t.hasException() && if (t.hasException() &&
t.exception().is_compatible_with<FutureCancellation>()) { t.exception().is_compatible_with<FutureCancellation>()) {
...@@ -2093,8 +2093,15 @@ SemiFuture<T> SemiFuture<T>::within(Duration dur, E e, Timekeeper* tk) && { ...@@ -2093,8 +2093,15 @@ SemiFuture<T> SemiFuture<T>::within(Duration dur, E e, Timekeeper* tk) && {
} }
}); });
// Construct the future to return, assume the deferred executor from
// thisFuture and make afterFuture's executor nested to correctly propagate
// concrete exeutors
auto fut = ctx->promise.getSemiFuture(); auto fut = ctx->promise.getSemiFuture();
fut.setExecutor(ctx->thisFuture.stealDeferredExecutor()); fut.setExecutor(ctx->thisFuture.stealDeferredExecutor());
std::vector<folly::futures::detail::DeferredWrapper> nestedExecutors;
nestedExecutors.emplace_back(ctx->afterFuture.stealDeferredExecutor());
futures::detail::getDeferredExecutor(fut)->setNestedExecutors(
std::move(nestedExecutors));
return std::move(fut); return std::move(fut);
} }
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#include <folly/futures/Future.h> #include <folly/futures/Future.h>
#include <folly/futures/Promise.h> #include <folly/futures/Promise.h>
#include <folly/futures/test/TestExecutor.h>
#include <folly/portability/GTest.h> #include <folly/portability/GTest.h>
#include <folly/synchronization/Baton.h> #include <folly/synchronization/Baton.h>
...@@ -83,10 +84,11 @@ TEST(Interrupt, futureWithinTimedOut) { ...@@ -83,10 +84,11 @@ TEST(Interrupt, futureWithinTimedOut) {
} }
TEST(Interrupt, semiFutureWithinTimedOut) { TEST(Interrupt, semiFutureWithinTimedOut) {
folly::TestExecutor ex(1);
Promise<int> p; Promise<int> p;
Baton<> done; Baton<> done;
p.setInterruptHandler([&](const exception_wrapper& /* e */) { done.post(); }); p.setInterruptHandler([&](const exception_wrapper& /* e */) { done.post(); });
p.getSemiFuture().within(std::chrono::milliseconds(1)); p.getSemiFuture().within(std::chrono::milliseconds(1)).via(&ex);
// Give it 100ms to time out and call the interrupt handler // Give it 100ms to time out and call the interrupt handler
EXPECT_TRUE(done.try_wait_for(std::chrono::milliseconds(100))); EXPECT_TRUE(done.try_wait_for(std::chrono::milliseconds(100)));
} }
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
* limitations under the License. * limitations under the License.
*/ */
#include <folly/DefaultKeepAliveExecutor.h>
#include <folly/Singleton.h> #include <folly/Singleton.h>
#include <folly/executors/ManualExecutor.h> #include <folly/executors/ManualExecutor.h>
#include <folly/futures/Future.h> #include <folly/futures/Future.h>
...@@ -431,9 +432,12 @@ TEST(Timekeeper, semiFutureWithinChainedInterruptTest) { ...@@ -431,9 +432,12 @@ TEST(Timekeeper, semiFutureWithinChainedInterruptTest) {
} }
TEST(Timekeeper, executor) { TEST(Timekeeper, executor) {
class ExecutorTester : public Executor { class ExecutorTester : public DefaultKeepAliveExecutor {
public: public:
void add(Func f) override { ~ExecutorTester() override {
joinKeepAlive();
}
virtual void add(Func f) override {
count++; count++;
f(); f();
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment