Commit ed88a3a3 authored by Lee Howes's avatar Lee Howes Committed by Facebook Github Bot

Make Future<T>::delayed complete on correct executor 5/n: Add back...

Make Future<T>::delayed complete on correct executor 5/n: Add back Future::delayed with correct behaviour

Summary:
Overall plan to modify Future<T>::delayed to complete on the same
executor as the input future.

[folly::Futures] Make Future<T>::delayed complete on correct executor 5/n:
 * Add back future::delayed.
 * Have Future::delayed complete on input executor.
 * r-value qualify Future::delayed

Reviewed By: yfeldblum, marshallcline

Differential Revision: D8238220

fbshipit-source-id: 79afa8cc9a9fe588609ad186ad62f714ee322f7d
parent 36502346
......@@ -1734,6 +1734,16 @@ Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
// delayed
template <class T>
Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) && {
auto e = this->getExecutor();
return collectAllSemiFuture(*this, futures::sleep(dur, tk))
.via(e ? e : &folly::InlineExecutor::instance())
.then([](std::tuple<Try<T>, Try<Unit>>&& tup) {
return makeFuture<T>(std::get<0>(std::move(tup)));
});
}
template <class T>
Future<T> Future<T>::delayedUnsafe(Duration dur, Timekeeper* tk) {
return collectAllSemiFuture(*this, futures::sleep(dur, tk))
......
......@@ -934,6 +934,10 @@ class Future : private futures::detail::FutureBase<T> {
template <class E>
Future<T> within(Duration, E exception, Timekeeper* = nullptr);
/// Delay the completion of this Future for at least this duration from
/// now. The optional Timekeeper is as with futures::sleep().
Future<T> delayed(Duration, Timekeeper* = nullptr) &&;
/// Delay the completion of this Future for at least this duration from
/// now. The optional Timekeeper is as with futures::sleep().
/// NOTE: Deprecated
......
......@@ -15,6 +15,7 @@
*/
#include <folly/Singleton.h>
#include <folly/executors/ManualExecutor.h>
#include <folly/futures/Future.h>
#include <folly/futures/ThreadWheelTimekeeper.h>
#include <folly/portability/GTest.h>
......@@ -98,6 +99,14 @@ TEST(Timekeeper, futureWithinHandlesNullTimekeeperSingleton) {
EXPECT_THROW(f.get(), FutureNoTimekeeper);
}
TEST(Timekeeper, futureDelayed) {
auto t1 = now();
auto dur =
makeFuture().delayed(one_ms).then([=] { return now() - t1; }).get();
EXPECT_GE(dur, one_ms);
}
TEST(Timekeeper, futureDelayedUnsafe) {
auto t1 = now();
auto dur =
......@@ -106,6 +115,65 @@ TEST(Timekeeper, futureDelayedUnsafe) {
EXPECT_GE(dur, one_ms);
}
TEST(Timekeeper, futureDelayedStickyExecutor) {
// Check that delayed without an executor binds the inline executor.
{
auto t1 = now();
class TimekeeperHelper : public ThreadWheelTimekeeper {
public:
std::thread::id get_thread_id() {
return thread_.get_id();
}
};
TimekeeperHelper tk;
std::thread::id timekeeper_thread_id = tk.get_thread_id();
std::thread::id task_thread_id{};
auto dur = makeFuture()
.delayed(one_ms, &tk)
.then([=, &task_thread_id] {
task_thread_id = std::this_thread::get_id();
return now() - t1;
})
.get();
EXPECT_GE(dur, one_ms);
EXPECT_EQ(timekeeper_thread_id, task_thread_id);
}
// Check that delayed applied to an executor returns a future that binds
// to the same executor as was input.
{
auto t1 = now();
std::thread::id driver_thread_id{};
std::thread::id first_task_thread_id{};
std::thread::id second_task_thread_id{};
folly::ManualExecutor me;
std::atomic<bool> stop_signal{false};
std::thread me_driver{[&me, &driver_thread_id, &stop_signal] {
driver_thread_id = std::this_thread::get_id();
while (!stop_signal) {
me.run();
}
}};
auto dur = makeSemiFuture()
.via(&me)
.then([&first_task_thread_id] {
first_task_thread_id = std::this_thread::get_id();
})
.delayed(one_ms)
.then([=, &second_task_thread_id] {
second_task_thread_id = std::this_thread::get_id();
return now() - t1;
})
.get();
stop_signal = true;
me_driver.join();
EXPECT_GE(dur, one_ms);
EXPECT_EQ(driver_thread_id, first_task_thread_id);
EXPECT_EQ(driver_thread_id, second_task_thread_id);
}
}
TEST(Timekeeper, futureWithinThrows) {
Promise<int> p;
auto f =
......@@ -144,7 +212,7 @@ TEST(Timekeeper, futureWithinException) {
TEST(Timekeeper, onTimeout) {
bool flag = false;
makeFuture(42)
.delayedUnsafe(10 * one_ms)
.delayed(10 * one_ms)
.onTimeout(
zero_ms,
[&] {
......@@ -166,7 +234,7 @@ TEST(Timekeeper, onTimeoutComplete) {
TEST(Timekeeper, onTimeoutReturnsFuture) {
bool flag = false;
makeFuture(42)
.delayedUnsafe(10 * one_ms)
.delayed(10 * one_ms)
.onTimeout(
zero_ms,
[&] {
......@@ -178,8 +246,8 @@ TEST(Timekeeper, onTimeoutReturnsFuture) {
}
TEST(Timekeeper, onTimeoutVoid) {
makeFuture().delayedUnsafe(one_ms).onTimeout(zero_ms, [&] {});
makeFuture().delayedUnsafe(one_ms).onTimeout(zero_ms, [&] {
makeFuture().delayed(one_ms).onTimeout(zero_ms, [&] {});
makeFuture().delayed(one_ms).onTimeout(zero_ms, [&] {
return makeFuture<Unit>(std::runtime_error("expected"));
});
// just testing compilation here
......@@ -238,7 +306,7 @@ TEST(Timekeeper, executor) {
TEST(Timekeeper, onTimeoutPropagates) {
bool flag = false;
EXPECT_THROW(
makeFuture(42).delayedUnsafe(one_ms)
makeFuture(42).delayed(one_ms)
.onTimeout(zero_ms, [&]{ flag = true; })
.get(),
FutureTimeout);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment