Commit e5ccead7 authored by Dan Melnic's avatar Dan Melnic Committed by Facebook GitHub Bot

Add ThreadWheelTimekeeperHighRes class

Summary: Add ThreadWheelTimekeeperHighRes class

Reviewed By: LeeHowes

Differential Revision: D20355093

fbshipit-source-id: b76563a3706d7732b490a88096fed484954820e5
parent 543fef62
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/experimental/ThreadWheelTimekeeperHighRes.h>
#include <folly/Chrono.h>
#include <folly/Singleton.h>
#include <folly/futures/Future.h>
#include <folly/futures/WTCallback.h>
#include <future>
namespace folly {
ThreadWheelTimekeeperHighRes::ThreadWheelTimekeeperHighRes(
std::chrono::microseconds intervalDuration)
: timeoutMgr_(&eventBase_),
thread_([this] { eventBase_.loopForever(); }),
wheelTimer_(
HHWheelTimerHighRes::newTimer(&timeoutMgr_, intervalDuration)) {
eventBase_.waitUntilRunning();
eventBase_.runInEventBaseThread([this] {
// 15 characters max
eventBase_.setName("FutureTimekeepr");
});
}
ThreadWheelTimekeeperHighRes::~ThreadWheelTimekeeperHighRes() {
eventBase_.runInEventBaseThreadAndWait([this] {
wheelTimer_->cancelAll();
eventBase_.terminateLoopSoon();
});
thread_.join();
}
SemiFuture<Unit> ThreadWheelTimekeeperHighRes::after(HighResDuration dur) {
auto cob = WTCallback<HighResDuration>::create(&eventBase_);
auto f = cob->getSemiFuture();
//
// Even shared_ptr of cob is captured in lambda this is still somewhat *racy*
// because it will be released once timeout is scheduled. So technically there
// is no gurantee that EventBase thread can safely call timeout callback.
// However due to fact that we are having circular reference here:
// WTCallback->Promise->Core->WTCallbak, so three of them won't go away until
// we break the circular reference. The break happens either in
// WTCallback::timeoutExpired or WTCallback::interruptHandler. Former means
// timeout callback is being safely executed. Latter captures shared_ptr of
// WTCallback again in another lambda for canceling timeout. The moment
// canceling timeout is executed in EventBase thread, the actual timeout
// callback has either been executed, or will never be executed. So we are
// fine here.
//
eventBase_.runInEventBaseThread(
[this, cob, dur] { wheelTimer_->scheduleTimeout(cob.get(), dur); });
return f;
}
} // namespace folly
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/experimental/STTimerFDTimeoutManager.h>
#include <folly/futures/Future.h>
#include <folly/io/async/EventBase.h>
#include <folly/io/async/HHWheelTimer.h>
#include <thread>
namespace folly {
class ThreadWheelTimekeeperHighRes : public Timekeeper {
public:
explicit ThreadWheelTimekeeperHighRes(
std::chrono::microseconds intervalDuration = std::chrono::microseconds(
HHWheelTimerHighRes::DEFAULT_TICK_INTERVAL));
~ThreadWheelTimekeeperHighRes() override;
/// Implement the Timekeeper interface
SemiFuture<Unit> after(HighResDuration) override;
protected:
folly::EventBase eventBase_;
STTimerFDTimeoutManager timeoutMgr_;
std::thread thread_;
HHWheelTimerHighRes::UniquePtr wheelTimer_;
};
} // namespace folly
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/DefaultKeepAliveExecutor.h>
#include <folly/Singleton.h>
#include <folly/executors/ManualExecutor.h>
#include <folly/experimental/ThreadWheelTimekeeperHighRes.h>
#include <folly/futures/Future.h>
#include <folly/futures/ThreadWheelTimekeeper.h>
#include <folly/portability/GTest.h>
using namespace folly;
using std::chrono::milliseconds;
std::chrono::milliseconds const zero_ms(0);
std::chrono::milliseconds const one_ms(1);
std::chrono::milliseconds const awhile(10);
std::chrono::seconds const too_long(10);
std::chrono::steady_clock::time_point now() {
return std::chrono::steady_clock::now();
}
struct TimekeeperFixture : public testing::Test {
TimekeeperFixture()
: timeLord_(std::make_shared<ThreadWheelTimekeeperHighRes>()) {}
std::shared_ptr<Timekeeper> timeLord_;
};
TEST_F(TimekeeperFixture, after) {
auto t1 = now();
auto f = timeLord_->after(awhile);
EXPECT_FALSE(f.isReady());
std::move(f).get();
auto t2 = now();
EXPECT_GE(t2 - t1, awhile);
}
TEST_F(TimekeeperFixture, afterUnsafe) {
auto t1 = now();
auto f = timeLord_->afterUnsafe(awhile);
EXPECT_FALSE(f.isReady());
std::move(f).get();
auto t2 = now();
EXPECT_GE(t2 - t1, awhile);
}
TEST(Timekeeper, futureGet) {
Promise<int> p;
auto t = std::thread([&] { p.setValue(42); });
EXPECT_EQ(42, p.getFuture().get());
t.join();
}
TEST(Timekeeper, futureGetBeforeTimeout) {
Promise<int> p;
auto t = std::thread([&] { p.setValue(42); });
// Technically this is a race and if the test server is REALLY overloaded
// and it takes more than a second to do that thread it could be flaky. But
// I want a low timeout (in human terms) so if this regresses and someone
// runs it by hand they're not sitting there forever wondering why it's
// blocked, and get a useful error message instead. If it does get flaky,
// empirically increase the timeout to the point where it's very improbable.
EXPECT_EQ(42, p.getFuture().get(std::chrono::seconds(2)));
t.join();
}
TEST(Timekeeper, futureGetTimeout) {
Promise<int> p;
EXPECT_THROW(p.getFuture().get(one_ms), folly::FutureTimeout);
}
TEST(Timekeeper, futureSleep) {
auto t1 = now();
futures::sleep(one_ms).get();
EXPECT_GE(now() - t1, one_ms);
}
FOLLY_PUSH_WARNING
FOLLY_GNU_DISABLE_WARNING("-Wdeprecated-declarations")
TEST(Timekeeper, futureSleepUnsafe) {
auto t1 = now();
futures::sleepUnsafe(one_ms).get();
EXPECT_GE(now() - t1, one_ms);
}
FOLLY_POP_WARNING
TEST(Timekeeper, futureSleepHandlesNullTimekeeperSingleton) {
Singleton<ThreadWheelTimekeeper>::make_mock([] { return nullptr; });
SCOPE_EXIT {
Singleton<ThreadWheelTimekeeper>::make_mock();
};
EXPECT_THROW(futures::sleep(one_ms).get(), FutureNoTimekeeper);
}
TEST(Timekeeper, futureWithinHandlesNullTimekeeperSingleton) {
Singleton<ThreadWheelTimekeeper>::make_mock([] { return nullptr; });
SCOPE_EXIT {
Singleton<ThreadWheelTimekeeper>::make_mock();
};
Promise<int> p;
auto f = p.getFuture().within(one_ms);
EXPECT_THROW(std::move(f).get(), FutureNoTimekeeper);
}
TEST(Timekeeper, semiFutureWithinHandlesNullTimekeeperSingleton) {
Singleton<ThreadWheelTimekeeper>::make_mock([] { return nullptr; });
SCOPE_EXIT {
Singleton<ThreadWheelTimekeeper>::make_mock();
};
Promise<int> p;
auto f = p.getSemiFuture().within(one_ms);
EXPECT_THROW(std::move(f).get(), FutureNoTimekeeper);
}
TEST(Timekeeper, semiFutureWithinCancelsTimeout) {
struct MockTimekeeper : Timekeeper {
MockTimekeeper() {
p_.setInterruptHandler([this](const exception_wrapper& ew) {
ew.handle([this](const FutureCancellation&) { cancelled_ = true; });
p_.setException(ew);
});
}
SemiFuture<Unit> after(HighResDuration) override {
return p_.getSemiFuture();
}
Promise<Unit> p_;
bool cancelled_{false};
};
MockTimekeeper tk;
Promise<int> p;
auto f = p.getSemiFuture().within(too_long, static_cast<Timekeeper*>(&tk));
p.setValue(1);
f.wait();
EXPECT_TRUE(tk.cancelled_);
}
TEST(Timekeeper, semiFutureWithinInlineAfter) {
struct MockTimekeeper : Timekeeper {
SemiFuture<Unit> after(HighResDuration) override {
return folly::makeSemiFuture<folly::Unit>(folly::FutureNoTimekeeper());
}
};
MockTimekeeper tk;
Promise<int> p;
auto f = p.getSemiFuture().within(too_long, static_cast<Timekeeper*>(&tk));
EXPECT_THROW(std::move(f).get(), folly::FutureNoTimekeeper);
}
TEST(Timekeeper, semiFutureWithinReady) {
struct MockTimekeeper : Timekeeper {
SemiFuture<Unit> after(HighResDuration) override {
called_ = true;
return folly::makeSemiFuture<folly::Unit>(folly::FutureNoTimekeeper());
}
bool called_{false};
};
MockTimekeeper tk;
Promise<int> p;
p.setValue(1);
auto f = p.getSemiFuture().within(too_long, static_cast<Timekeeper*>(&tk));
f.wait();
EXPECT_FALSE(tk.called_);
}
TEST(Timekeeper, futureDelayed) {
auto t1 = now();
auto dur = makeFuture()
.delayed(one_ms)
.thenValue([=](auto&&) { return now() - t1; })
.get();
EXPECT_GE(dur, one_ms);
}
TEST(Timekeeper, semiFutureDelayed) {
auto t1 = now();
auto dur = makeSemiFuture()
.delayed(one_ms)
.toUnsafeFuture()
.thenValue([=](auto&&) { return now() - t1; })
.get();
EXPECT_GE(dur, one_ms);
}
TEST(Timekeeper, futureDelayedStickyExecutor) {
// Check that delayed without an executor binds the inline executor.
{
auto t1 = now();
class TimekeeperHelper : public ThreadWheelTimekeeper {
public:
std::thread::id get_thread_id() {
return thread_.get_id();
}
};
TimekeeperHelper tk;
std::thread::id timekeeper_thread_id = tk.get_thread_id();
std::thread::id task_thread_id{};
auto dur = makeFuture()
.delayed(one_ms, &tk)
.thenValue([=, &task_thread_id](auto&&) {
task_thread_id = std::this_thread::get_id();
return now() - t1;
})
.get();
EXPECT_GE(dur, one_ms);
EXPECT_EQ(timekeeper_thread_id, task_thread_id);
}
// Check that delayed applied to an executor returns a future that binds
// to the same executor as was input.
{
auto t1 = now();
std::thread::id driver_thread_id{};
std::thread::id first_task_thread_id{};
std::thread::id second_task_thread_id{};
folly::ManualExecutor me;
std::atomic<bool> stop_signal{false};
std::thread me_driver{[&me, &driver_thread_id, &stop_signal] {
driver_thread_id = std::this_thread::get_id();
while (!stop_signal) {
me.run();
}
}};
auto dur = makeSemiFuture()
.via(&me)
.thenValue([&first_task_thread_id](auto&&) {
first_task_thread_id = std::this_thread::get_id();
})
.delayed(one_ms)
.thenValue([=, &second_task_thread_id](auto&&) {
second_task_thread_id = std::this_thread::get_id();
return now() - t1;
})
.get();
stop_signal = true;
me_driver.join();
EXPECT_GE(dur, one_ms);
EXPECT_EQ(driver_thread_id, first_task_thread_id);
EXPECT_EQ(driver_thread_id, second_task_thread_id);
}
}
TEST(Timekeeper, futureWithinThrows) {
Promise<int> p;
auto f = p.getFuture().within(one_ms).thenError(
tag_t<FutureTimeout>{}, [](auto&&) { return -1; });
EXPECT_EQ(-1, std::move(f).get());
}
TEST(Timekeeper, semiFutureWithinThrows) {
Promise<int> p;
auto f = p.getSemiFuture().within(one_ms).toUnsafeFuture().thenError(
tag_t<FutureTimeout>{}, [](auto&&) { return -1; });
EXPECT_EQ(-1, std::move(f).get());
}
TEST(Timekeeper, futureWithinAlreadyComplete) {
auto f = makeFuture(42).within(one_ms).thenError(
tag_t<FutureTimeout>{}, [&](auto&&) { return -1; });
EXPECT_EQ(42, std::move(f).get());
}
TEST(Timekeeper, semiFutureWithinAlreadyComplete) {
auto f = makeSemiFuture(42).within(one_ms).toUnsafeFuture().thenError(
tag_t<FutureTimeout>{}, [&](auto&&) { return -1; });
EXPECT_EQ(42, std::move(f).get());
}
TEST(Timekeeper, futureWithinFinishesInTime) {
Promise<int> p;
auto f = p.getFuture()
.within(std::chrono::minutes(1))
.thenError(tag_t<FutureTimeout>{}, [&](auto&&) { return -1; });
p.setValue(42);
EXPECT_EQ(42, std::move(f).get());
}
TEST(Timekeeper, semiFutureWithinFinishesInTime) {
Promise<int> p;
auto f = p.getSemiFuture()
.within(std::chrono::minutes(1))
.toUnsafeFuture()
.thenError(tag_t<FutureTimeout>{}, [&](auto&&) { return -1; });
p.setValue(42);
EXPECT_EQ(42, std::move(f).get());
}
TEST(Timekeeper, futureWithinVoidSpecialization) {
makeFuture().within(one_ms);
}
TEST(Timekeeper, semiFutureWithinVoidSpecialization) {
makeSemiFuture().within(one_ms);
}
TEST(Timekeeper, futureWithinException) {
Promise<Unit> p;
auto f = p.getFuture().within(awhile, std::runtime_error("expected"));
EXPECT_THROW(std::move(f).get(), std::runtime_error);
}
TEST(Timekeeper, semiFutureWithinException) {
Promise<Unit> p;
auto f = p.getSemiFuture().within(awhile, std::runtime_error("expected"));
EXPECT_THROW(std::move(f).get(), std::runtime_error);
}
TEST(Timekeeper, onTimeout) {
bool flag = false;
makeFuture(42)
.delayed(10 * one_ms)
.onTimeout(
zero_ms,
[&] {
flag = true;
return -1;
})
.get();
EXPECT_TRUE(flag);
}
TEST(Timekeeper, onTimeoutComplete) {
bool flag = false;
makeFuture(42)
.onTimeout(
zero_ms,
[&] {
flag = true;
return -1;
})
.get();
EXPECT_FALSE(flag);
}
TEST(Timekeeper, onTimeoutReturnsFuture) {
bool flag = false;
makeFuture(42)
.delayed(10 * one_ms)
.onTimeout(
zero_ms,
[&] {
flag = true;
return makeFuture(-1);
})
.get();
EXPECT_TRUE(flag);
}
TEST(Timekeeper, onTimeoutVoid) {
makeFuture().delayed(one_ms).onTimeout(zero_ms, [&] {});
makeFuture().delayed(one_ms).onTimeout(zero_ms, [&] {
return makeFuture<Unit>(std::runtime_error("expected"));
});
// just testing compilation here
}
TEST(Timekeeper, interruptDoesntCrash) {
auto f = futures::sleep(too_long);
f.cancel();
}
TEST(Timekeeper, chainedInterruptTest) {
bool test = false;
auto f = futures::sleep(milliseconds(100)).deferValue([&](auto&&) {
test = true;
});
f.cancel();
f.wait();
EXPECT_FALSE(test);
}
TEST(Timekeeper, futureWithinChainedInterruptTest) {
bool test = false;
Promise<Unit> p;
p.setInterruptHandler([&test, &p](const exception_wrapper& ex) {
ex.handle(
[&test](const FutureCancellation& /* cancellation */) { test = true; });
p.setException(ex);
});
auto f = p.getFuture().within(milliseconds(100));
EXPECT_FALSE(test) << "Sanity check";
f.cancel();
f.wait();
EXPECT_TRUE(test);
}
TEST(Timekeeper, semiFutureWithinChainedInterruptTest) {
bool test = false;
Promise<Unit> p;
p.setInterruptHandler([&test, &p](const exception_wrapper& ex) {
ex.handle(
[&test](const FutureCancellation& /* cancellation */) { test = true; });
p.setException(ex);
});
auto f = p.getSemiFuture().within(milliseconds(100));
EXPECT_FALSE(test) << "Sanity check";
f.cancel();
f.wait();
EXPECT_TRUE(test);
}
TEST(Timekeeper, executor) {
class ExecutorTester : public DefaultKeepAliveExecutor {
public:
~ExecutorTester() override {
joinKeepAlive();
}
virtual void add(Func f) override {
count++;
f();
}
std::atomic<int> count{0};
};
Promise<Unit> p;
ExecutorTester tester;
auto f = p.getFuture()
.via(&tester)
.within(milliseconds(100))
.thenValue([&](auto&&) {});
p.setValue();
f.wait();
EXPECT_EQ(2, tester.count);
}
// TODO(5921764)
/*
TEST(Timekeeper, onTimeoutPropagates) {
bool flag = false;
EXPECT_THROW(
makeFuture(42).delayed(one_ms)
.onTimeout(zero_ms, [&]{ flag = true; })
.get(),
FutureTimeout);
EXPECT_TRUE(flag);
}
*/
TEST_F(TimekeeperFixture, atBeforeNow) {
auto f = timeLord_->at(now() - too_long);
EXPECT_TRUE(f.isReady());
EXPECT_FALSE(f.hasException());
}
TEST_F(TimekeeperFixture, howToCastDuration) {
// I'm not sure whether this rounds up or down but it's irrelevant for the
// purpose of this example.
auto f = timeLord_->after(
std::chrono::duration_cast<Duration>(std::chrono::nanoseconds(1)));
}
TEST_F(TimekeeperFixture, destruction) {
folly::Optional<ThreadWheelTimekeeper> tk;
tk.emplace();
auto f = tk->after(std::chrono::seconds(10));
EXPECT_FALSE(f.isReady());
tk.reset();
EXPECT_TRUE(f.isReady());
EXPECT_TRUE(f.hasException());
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment