Commit d1be08e3 authored by Hans Fugal's avatar Hans Fugal Committed by Dave Watson

(wangle) Timeouts basic

Summary:
Add basic timeout functionality. This adds `futures::sleep` which makes an async Future that finishes after the given duration, and `Future::get` which blocks on the result and takes an optional timeout.

Introducing the folly::wangle::futures namespace (soon to be just folly::futures) which will hold our wangle utility functions, the things that live in the Future object in Twitter's scala code. We'll probably move when* and wait-ish methods in here too, and perhaps alias makeFuture-ish methods too, though James has me mostly convinced not to deprecate them at the folly::wangle level (because they're basically Future constructors and Future lives at folly::wangle)

`Future::delayed` after Twitter's helper of the same name

Test Plan: new and old unit tests

Reviewed By: davejwatson@fb.com

Subscribers: trunkagent, fugalh, exa, folly-diffs@

FB internal diff: D1748894

Tasks: 4548494

Signature: t1:1748894:1419363496:1f4a62ec8455989c1fcce845695ace1d01c101c8
parent 062bc87d
......@@ -19,11 +19,18 @@
#include <chrono>
#include <thread>
#include <folly/wangle/futures/detail/Core.h>
#include <folly/Baton.h>
#include <folly/wangle/futures/detail/Core.h>
#include <folly/wangle/futures/Timekeeper.h>
namespace folly { namespace wangle {
class Timekeeper;
namespace detail {
Timekeeper* getTimekeeperSingleton();
}
template <typename T>
struct isFuture {
static const bool value = false;
......@@ -670,9 +677,9 @@ inline Future<void> waitWithSemaphore<void>(Future<void>&& f) {
return done;
}
template <typename T, class Duration>
template <typename T, class Dur>
Future<T>
waitWithSemaphore(Future<T>&& f, Duration timeout) {
waitWithSemaphore(Future<T>&& f, Dur timeout) {
auto baton = std::make_shared<Baton<>>();
auto done = f.then([baton](Try<T> &&t) {
baton->post();
......@@ -682,9 +689,9 @@ waitWithSemaphore(Future<T>&& f, Duration timeout) {
return done;
}
template <class Duration>
template <class Dur>
Future<void>
waitWithSemaphore(Future<void>&& f, Duration timeout) {
waitWithSemaphore(Future<void>&& f, Dur timeout) {
auto baton = std::make_shared<Baton<>>();
auto done = f.then([baton](Try<void> &&t) {
baton->post();
......@@ -694,6 +701,93 @@ waitWithSemaphore(Future<void>&& f, Duration timeout) {
return done;
}
namespace {
template <class T>
void getWaitHelper(Future<T>* f) {
// If we already have a value do the cheap thing
if (f->isReady()) {
return;
}
folly::Baton<> baton;
f->then([&](Try<T> const&) {
baton.post();
});
baton.wait();
}
template <class T>
Future<T> getWaitTimeoutHelper(Future<T>* f, Duration dur) {
// TODO make and use variadic whenAny #5877971
Promise<T> p;
auto token = std::make_shared<std::atomic<bool>>();
folly::Baton<> baton;
folly::wangle::detail::getTimekeeperSingleton()->after(dur)
.then([&,token](Try<void> const& t) {
try {
t.value();
if (token->exchange(true) == false) {
p.setException(TimedOut());
baton.post();
}
} catch (std::exception const& e) {
if (token->exchange(true) == false) {
p.setException(std::current_exception());
baton.post();
}
}
});
f->then([&, token](Try<T>&& t) {
if (token->exchange(true) == false) {
p.fulfilTry(std::move(t));
baton.post();
}
});
baton.wait();
return p.getFuture();
}
}
template <class T>
T Future<T>::get() {
getWaitHelper(this);
// Big assumption here: the then() call above, since it doesn't move out
// the value, leaves us with a value to return here. This would be a big
// no-no in user code, but I'm invoking internal developer privilege. This
// is slightly more efficient (save a move()) especially if there's an
// exception (save a throw).
return std::move(value());
}
template <>
inline void Future<void>::get() {
getWaitHelper(this);
}
template <class T>
T Future<T>::get(Duration dur) {
return std::move(getWaitTimeoutHelper(this, dur).value());
}
template <>
inline void Future<void>::get(Duration dur) {
getWaitTimeoutHelper(this, dur).value();
}
template <class T>
Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk)
{
return whenAll(*this, futures::sleep(dur, tk))
.then([](Try<std::tuple<Try<T>, Try<void>>>&& tup) {
Try<T>& t = std::get<0>(tup.value());
return makeFuture<T>(std::move(t));
});
}
}}
// I haven't included a Future<T&> specialization because I don't forsee us
......
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/wangle/futures/Future.h>
#include <folly/wangle/futures/detail/ThreadWheelTimekeeper.h>
#include <folly/Likely.h>
namespace folly { namespace wangle { namespace futures {
Future<void> sleep(Duration dur, Timekeeper* tk) {
if (LIKELY(!tk)) {
tk = detail::getTimekeeperSingleton();
}
return tk->after(dur);
}
}}}
......@@ -26,9 +26,13 @@
#include <folly/MoveWrapper.h>
#include <folly/wangle/futures/Promise.h>
#include <folly/wangle/futures/Try.h>
#include <folly/wangle/futures/WangleException.h>
#include <folly/wangle/futures/detail/Types.h>
namespace folly { namespace wangle {
template <class> struct Promise;
namespace detail {
template <class> struct Core;
......@@ -85,12 +89,34 @@ struct Extract<R(Class::*)(Args...)> {
typedef typename ArgType<Args...>::FirstArg FirstArg;
};
} // detail
template <class> struct Promise;
struct Timekeeper;
template <typename T> struct isFuture;
/// This namespace is for utility functions that would usually be static
/// members of Future, except they don't make sense there because they don't
/// depend on the template type (rather, on the type of their arguments in
/// some cases). This is the least-bad naming scheme we could think of. Some
/// of the functions herein have really-likely-to-collide names, like "map"
/// and "sleep".
namespace futures {
/// Returns a Future that will complete after the specified duration. The
/// Duration typedef of a `std::chrono` duration type indicates the
/// resolution you can expect to be meaningful (milliseconds at the time of
/// writing). Normally you wouldn't need to specify a Timekeeper, we will
/// use the global wangle timekeeper (we run a thread whose job it is to
/// keep time for wangle timeouts) but we provide the option for power
/// users.
///
/// The Timekeeper thread will be lazily created the first time it is
/// needed. If your program never uses any timeouts or other time-based
/// Futures you will pay no Timekeeper thread overhead.
Future<void> sleep(Duration, Timekeeper* = nullptr);
}
template <class T>
class Future {
public:
......@@ -154,6 +180,15 @@ class Future {
/** A reference to the Try of the value */
Try<T>& getTry();
/// Block until the future is fulfilled. Returns the value (moved out), or
/// throws the exception. The future must not already have a callback.
T get();
/// Block until the future is fulfilled, or until timed out. Returns the
/// value (moved out), or throws the exception (which might be a TimedOut
/// exception).
T get(Duration dur);
/** When this Future has completed, execute func which is a function that
takes a Try<T>&&. A Future for the return type of func is
returned. e.g.
......@@ -427,6 +462,10 @@ class Future {
raise(FutureCancellation());
}
/// Delay the completion of this Future for at least this duration from
/// now. The optional Timekeeper is as with futures::sleep().
Future<T> delayed(Duration, Timekeeper* = nullptr);
private:
typedef detail::Core<T>* corePtr;
......@@ -568,8 +607,8 @@ Future<T> waitWithSemaphore(Future<T>&& f);
*
* Note: each call to this starts a (short-lived) thread and allocates memory.
*/
template <typename T, class Duration>
Future<T> waitWithSemaphore(Future<T>&& f, Duration timeout);
template <typename T, class Dur>
Future<T> waitWithSemaphore(Future<T>&& f, Dur timeout);
}} // folly::wangle
......
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/wangle/futures/detail/Types.h>
namespace folly { namespace wangle {
template <class> struct Future;
/// A Timekeeper handles the details of keeping time and fulfilling delay
/// promises. The returned Future<void> will either complete after the
/// elapsed time, or in the event of some kind of exceptional error may hold
/// an exception. These Futures respond to cancellation. If you use a lot of
/// Delays and many of them ultimately are unneeded (as would be the case for
/// Delays that are used to trigger timeouts of async operations), then you
/// can and should cancel them to reclaim resources.
///
/// Users will typically get one of these via Future::sleep(Duration) or
/// use them implicitly behind the scenes by passing a timeout to some Future
/// operation.
///
/// Although we don't formally alias Delay = Future<void>,
/// that's an appropriate term for it. People will probably also call these
/// Timeouts, and that's ok I guess, but that term is so overloaded I thought
/// it made sense to introduce a cleaner term.
///
/// Remember that Duration is a std::chrono duration (millisecond resolution
/// at the time of writing).
class Timekeeper {
public:
virtual ~Timekeeper() = default;
/// Returns a future that will complete after the given duration with the
/// elapsed time. Exceptional errors can happen but they must be
/// exceptional. Use the steady (monotonic) clock.
///
/// You may cancel this Future to reclaim resources.
///
/// This future probably completes on the timer thread. You should almost
/// certainly follow it with a via() call or the accuracy of other timers
/// will suffer.
virtual Future<void> after(Duration) = 0;
/// Returns a future that will complete at the requested time.
///
/// You may cancel this Future to reclaim resources.
///
/// NB This is sugar for `after(when - now)`, so while you are welcome to
/// use a std::chrono::system_clock::time_point it will not track changes to
/// the system clock but rather execute that many milliseconds in the future
/// according to the steady clock.
template <class Clock>
Future<void> at(std::chrono::time_point<Clock> when);
};
}}
// now get those definitions
#include <folly/wangle/futures/Future.h>
// finally we can use Future
namespace folly { namespace wangle {
template <class Clock>
Future<void> Timekeeper::at(std::chrono::time_point<Clock> when) {
auto now = Clock::now();
if (when <= now) {
return makeFuture();
}
return after(when - now);
}
}}
......@@ -86,4 +86,9 @@ class FutureCancellation : public WangleException {
FutureCancellation() : WangleException("Future was cancelled") {}
};
class TimedOut : public WangleException {
public:
TimedOut() : WangleException("Timed out") {}
};
}}
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ThreadWheelTimekeeper.h"
#include <folly/experimental/Singleton.h>
#include <folly/wangle/futures/Future.h>
#include <future>
namespace folly { namespace wangle { namespace detail {
namespace {
Singleton<ThreadWheelTimekeeper> timekeeperSingleton_;
// Our Callback object for HHWheelTimer
struct WTCallback : public folly::HHWheelTimer::Callback {
// Only allow creation by this factory, to ensure heap allocation.
static WTCallback* create() {
// optimization opportunity: memory pool
return new WTCallback();
}
Future<void> getFuture() {
return promise_.getFuture();
}
protected:
Promise<void> promise_;
explicit WTCallback() {
promise_.setInterruptHandler(
std::bind(&WTCallback::interruptHandler, this));
}
void timeoutExpired() noexcept override {
promise_.setValue();
delete this;
}
void interruptHandler() {
cancelTimeout();
delete this;
}
};
} // namespace
ThreadWheelTimekeeper::ThreadWheelTimekeeper() :
thread_([this]{ eventBase_.loopForever(); }),
wheelTimer_(new HHWheelTimer(&eventBase_, std::chrono::milliseconds(1)))
{
eventBase_.waitUntilRunning();
eventBase_.runInEventBaseThread([this]{
// 15 characters max
eventBase_.setName("FutureTimekeepr");
});
}
ThreadWheelTimekeeper::~ThreadWheelTimekeeper() {
eventBase_.runInEventBaseThread([this]{
wheelTimer_->cancelAll();
});
eventBase_.terminateLoopSoon();
thread_.join();
}
Future<void> ThreadWheelTimekeeper::after(Duration dur) {
auto cob = WTCallback::create();
auto f = cob->getFuture();
eventBase_.runInEventBaseThread([=]{
wheelTimer_->scheduleTimeout(cob, dur);
});
return f;
}
Timekeeper* getTimekeeperSingleton() {
return timekeeperSingleton_.get_fast();
}
}}}
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/wangle/futures/Future.h>
#include <folly/wangle/futures/Timekeeper.h>
#include <folly/io/async/EventBase.h>
#include <folly/io/async/HHWheelTimer.h>
#include <thread>
namespace folly { namespace wangle { namespace detail {
/// The default Timekeeper implementation which uses a HHWheelTimer on an
/// EventBase in a dedicated thread. Users needn't deal with this directly, it
/// is used by default by Future methods that work with timeouts.
class ThreadWheelTimekeeper : public Timekeeper {
public:
/// But it doesn't *have* to be a singleton.
ThreadWheelTimekeeper();
~ThreadWheelTimekeeper();
/// Implement the Timekeeper interface
/// This future *does* complete on the timer thread. You should almost
/// certainly follow it with a via() call or the accuracy of other timers
/// will suffer.
Future<void> after(Duration) override;
protected:
folly::EventBase eventBase_;
std::thread thread_;
HHWheelTimer::UniquePtr wheelTimer_;
};
Timekeeper* getTimekeeperSingleton();
}}}
......@@ -14,6 +14,12 @@
* limitations under the License.
*/
// fbbuild is too dumb to know that .h files in the directory affect
// our project, unless we have a .cpp file in the target, in the same
// directory.
#pragma once
#include <chrono>
namespace folly { namespace wangle {
using Duration = std::chrono::milliseconds;
}}
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <gtest/gtest.h>
#include <folly/wangle/futures/Timekeeper.h>
#include <unistd.h>
using namespace folly::wangle;
using namespace std::chrono;
using folly::wangle::Timekeeper;
using Duration = folly::wangle::Duration;
std::chrono::milliseconds const one_ms(1);
std::chrono::milliseconds const awhile(10);
std::chrono::steady_clock::time_point now() {
return std::chrono::steady_clock::now();
}
struct TimekeeperFixture : public testing::Test {
TimekeeperFixture() :
timeLord_(folly::wangle::detail::getTimekeeperSingleton())
{}
Timekeeper* timeLord_;
};
TEST_F(TimekeeperFixture, after) {
Duration waited(0);
auto t1 = now();
auto f = timeLord_->after(awhile);
EXPECT_FALSE(f.isReady());
f.get();
auto t2 = now();
EXPECT_GE(t2 - t1, awhile);
}
TEST(Timekeeper, futureGet) {
Promise<int> p;
std::thread([&]{ p.setValue(42); }).detach();
EXPECT_EQ(42, p.getFuture().get());
}
TEST(Timekeeper, futureGetBeforeTimeout) {
Promise<int> p;
std::thread([&]{ p.setValue(42); }).detach();
// Technically this is a race and if the test server is REALLY overloaded
// and it takes more than a second to do that thread it could be flaky. But
// I want a low timeout (in human terms) so if this regresses and someone
// runs it by hand they're not sitting there forever wondering why it's
// blocked, and get a useful error message instead. If it does get flaky,
// empirically increase the timeout to the point where it's very improbable.
EXPECT_EQ(42, p.getFuture().get(seconds(2)));
}
TEST(Timekeeper, futureGetTimeout) {
Promise<int> p;
EXPECT_THROW(p.getFuture().get(Duration(1)), folly::wangle::TimedOut);
}
TEST(Timekeeper, futureSleep) {
auto t1 = now();
futures::sleep(one_ms).get();
EXPECT_GE(now() - t1, one_ms);
}
TEST(Timekeeper, futureDelayed) {
auto t1 = now();
auto dur = makeFuture()
.delayed(one_ms)
.then([=]{ return now() - t1; })
.get();
EXPECT_GE(dur, one_ms);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment