Commit f22f6d96 authored by Lee Howes's avatar Lee Howes Committed by Facebook Github Bot

Make Future::delayed complete on Future's executor.

Summary: Future::delayed could complete on Future::sleep's executor. Ensure instead that it completes on the future's executor.

Reviewed By: yfeldblum

Differential Revision: D8052038

fbshipit-source-id: adcd387e237dbd724d5f9deb5c80d1418987482d
parent a0b0d9b5
......@@ -1743,8 +1743,11 @@ Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
template <class T>
Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
auto* currentExecutor = this->getExecutor();
return collectAllSemiFuture(*this, futures::sleep(dur, tk))
.toUnsafeFuture()
.via(
currentExecutor ? currentExecutor
: &folly::InlineExecutor::instance())
.then([](std::tuple<Try<T>, Try<Unit>> tup) {
Try<T>& t = std::get<0>(tup);
return makeFuture<T>(std::move(t));
......
......@@ -15,6 +15,7 @@
*/
#include <folly/Singleton.h>
#include <folly/executors/ManualExecutor.h>
#include <folly/futures/Future.h>
#include <folly/futures/ThreadWheelTimekeeper.h>
#include <folly/portability/GTest.h>
......@@ -108,6 +109,65 @@ TEST(Timekeeper, futureDelayed) {
EXPECT_GE(dur, one_ms);
}
TEST(Timekeeper, futureDelayedStickyExecutor) {
auto t1 = now();
// Check that delayed without an executor binds the inline executor.
{
class TimekeeperHelper : public ThreadWheelTimekeeper {
public:
std::thread::id get_thread_id() {
return thread_.get_id();
}
};
TimekeeperHelper tk;
std::thread::id timekeeper_thread_id = tk.get_thread_id();
std::thread::id task_thread_id{};
auto dur = makeFuture()
.delayed(one_ms, &tk)
.then([=, &task_thread_id] {
task_thread_id = std::this_thread::get_id();
return now() - t1;
})
.get();
EXPECT_GE(dur, one_ms);
EXPECT_EQ(timekeeper_thread_id, task_thread_id);
}
// Check that delayed applied to an executor returns a future that binds
// to the same executor as was input.
{
std::thread::id driver_thread_id{};
std::thread::id first_task_thread_id{};
std::thread::id second_task_thread_id{};
folly::ManualExecutor me;
std::atomic<bool> stop_signal{false};
std::thread me_driver{[&me, &driver_thread_id, &stop_signal] {
driver_thread_id = std::this_thread::get_id();
while (!stop_signal) {
me.run();
}
}};
auto dur = makeSemiFuture()
.via(&me)
.then([&first_task_thread_id] {
first_task_thread_id = std::this_thread::get_id();
})
.delayed(one_ms)
.then([=, &second_task_thread_id] {
second_task_thread_id = std::this_thread::get_id();
return now() - t1;
})
.get();
stop_signal = true;
me_driver.join();
EXPECT_GE(dur, one_ms);
EXPECT_EQ(driver_thread_id, first_task_thread_id);
EXPECT_EQ(driver_thread_id, second_task_thread_id);
}
}
TEST(Timekeeper, futureWithinThrows) {
Promise<int> p;
auto f =
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment