Commit 9f916e14 authored by James Sedgwick's avatar James Sedgwick Committed by woo

getVia() and waitVia()

Summary:
Introduce ProgressableExecutor, which is an Executor that can be driven somehow. Examples include EventBase and ManualExecutor
Then introduce Future<T>::getVia(ProgressableExecutor*) and Future<T>::waitVia(ProgressableExecutor*) that drive the given executor until the Future is complete, with the usual semantics of get and wait respectively
This is a really common pattern in tests and you can see in the various changes to other projects lends sopme nice redness and cleanliness

Some notes:
1. I don't like ProgressableExecutor::makeProgress() that much. Too verbose. Maybe DrivableExecutor::drive()? Something else? Thoughts?
2. I still need to integrate this with some stuff in Zeus (SessionFuture) and Zookeeper (ZookeeperFuture) but I'm going to do that in a separate diff because it's going to be a little more intrusive
3. These APIs take a raw ptr so that they are consistent with via()
4. See inline note on ManualExecutor
5. See inline note in added unit tests

Test Plan: add unit for new API, wait for contbuild

Reviewed By: hans@fb.com

Subscribers: trunkagent, dresende, pzq, tdimson, fbcode-common-diffs@, targeting-diff-backend@, alandau, apollo-diffs@, bmatheny, everstore-dev@, zhuohuang, laser-diffs@, mshneer, folly-diffs@, hannesr, jsedgwick

FB internal diff: D1789122

Tasks: 5940008

Signature: t1:1789122:1421868315:6ea2fc2702be1dc283c24a46d345fb5da3935b32
parent ff1b7be8
......@@ -85,6 +85,7 @@ nobase_follyinclude_HEADERS = \
Format.h \
Format-inl.h \
futures/Deprecated.h \
futures/DrivableExecutor.h \
futures/Future-inl.h \
futures/Future.h \
futures/FutureException.h \
......
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/Executor.h>
namespace folly {
/*
* A DrivableExecutor can be driven via its drive() method
* Examples include EventBase (via loopOnce()) and ManualExecutor
* (via makeProgress()).
*
* This interface is most handy in conjunction with
* Future<T>::getVia(DrivableExecutor*) and
* Future<T>::waitVia(DrivableExecutor*)
*
* These call drive() * repeatedly until the Future is fulfilled.
* getVia() returns the value (or throws the exception) and waitVia() returns
* the same Future for chainability.
*
* These will be most helpful in tests, for instance if you need to pump a mock
* EventBase until Futures complete.
*/
class DrivableExecutor : public virtual Executor {
public:
virtual ~DrivableExecutor() = default;
// Make progress on this Executor's work.
virtual void drive() = 0;
};
} // folly
......@@ -720,6 +720,22 @@ inline void Future<void>::get(Duration dur) {
getWaitTimeoutHelper(this, dur).value();
}
template <class T>
T Future<T>::getVia(DrivableExecutor* e) {
while (!isReady()) {
e->drive();
}
return std::move(value());
}
template <>
inline void Future<void>::getVia(DrivableExecutor* e) {
while (!isReady()) {
e->drive();
}
value();
}
template <class T>
Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
return within(dur, TimedOut(), tk);
......@@ -803,6 +819,22 @@ Future<T> Future<T>::wait(Duration dur) {
return done;
}
template <class T>
Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
while (!isReady()) {
e->drive();
}
return *this;
}
template <class T>
Future<T> Future<T>::waitVia(DrivableExecutor* e) && {
while (!isReady()) {
e->drive();
}
return std::move(*this);
}
}
// I haven't included a Future<T&> specialization because I don't forsee us
......
......@@ -25,6 +25,7 @@
#include <folly/MoveWrapper.h>
#include <folly/futures/Deprecated.h>
#include <folly/futures/DrivableExecutor.h>
#include <folly/futures/Promise.h>
#include <folly/futures/Try.h>
#include <folly/futures/FutureException.h>
......@@ -90,7 +91,6 @@ struct Extract<R(Class::*)(Args...)> {
typedef typename ArgType<Args...>::FirstArg FirstArg;
};
} // detail
struct Timekeeper;
......@@ -190,6 +190,11 @@ class Future {
/// exception).
T get(Duration dur);
/// Call e->drive() repeatedly until the future is fulfilled. Examples
/// of DrivableExecutor include EventBase and ManualExecutor. Returns the
/// value (moved out), or throws the exception.
T getVia(DrivableExecutor* e);
/** When this Future has completed, execute func which is a function that
takes a Try<T>&&. A Future for the return type of func is
returned. e.g.
......@@ -487,6 +492,15 @@ class Future {
/// depending on whether the Duration passed.
Future<T> wait(Duration);
/// Call e->drive() repeatedly until the future is fulfilled. Examples
/// of DrivableExecutor include EventBase and ManualExecutor. Returns a
/// reference to this Future so that you can chain calls if desired.
/// value (moved out), or throws the exception.
Future<T>& waitVia(DrivableExecutor* e) &;
/// Overload of waitVia() for rvalue Futures
Future<T> waitVia(DrivableExecutor* e) &&;
private:
typedef detail::Core<T>* corePtr;
......
......@@ -15,6 +15,7 @@
*/
#pragma once
#include <folly/futures/DrivableExecutor.h>
#include <folly/futures/ScheduledExecutor.h>
#include <semaphore.h>
#include <memory>
......@@ -31,7 +32,8 @@ namespace folly {
///
/// NB No attempt has been made to make anything other than add and schedule
/// threadsafe.
class ManualExecutor : public ScheduledExecutor {
class ManualExecutor : public DrivableExecutor,
public ScheduledExecutor {
public:
ManualExecutor();
......@@ -54,6 +56,11 @@ namespace folly {
run();
}
/// Implements DrivableExecutor
void drive() override {
makeProgress();
}
/// makeProgress until this Future is ready.
template <class F> void waitFor(F const& f) {
// TODO(5427828)
......
......@@ -23,7 +23,7 @@
namespace folly {
// An executor that supports timed scheduling. Like RxScheduler.
class ScheduledExecutor : public Executor {
class ScheduledExecutor : public virtual Executor {
public:
// Reality is that better than millisecond resolution is very hard to
// achieve. However, we reserve the right to be incredible.
......
......@@ -27,6 +27,7 @@
#include <folly/Executor.h>
#include <folly/futures/Future.h>
#include <folly/futures/ManualExecutor.h>
#include <folly/futures/DrivableExecutor.h>
#include <folly/MPMCQueue.h>
#include <folly/io/async/Request.h>
......@@ -969,6 +970,59 @@ TEST(Future, waitWithDuration) {
}
}
class DummyDrivableExecutor : public DrivableExecutor {
public:
void add(Func f) override {}
void drive() override { ran = true; }
bool ran{false};
};
TEST(Future, getVia) {
{
// non-void
ManualExecutor x;
auto f = via(&x).then([]{ return true; });
EXPECT_TRUE(f.getVia(&x));
}
{
// void
ManualExecutor x;
auto f = via(&x).then();
f.getVia(&x);
}
{
DummyDrivableExecutor x;
auto f = makeFuture(true);
EXPECT_TRUE(f.getVia(&x));
EXPECT_FALSE(x.ran);
}
}
TEST(Future, waitVia) {
{
ManualExecutor x;
auto f = via(&x).then();
EXPECT_FALSE(f.isReady());
f.waitVia(&x);
EXPECT_TRUE(f.isReady());
}
{
// try rvalue as well
ManualExecutor x;
auto f = via(&x).activate().then().waitVia(&x);
EXPECT_TRUE(f.isReady());
}
{
DummyDrivableExecutor x;
makeFuture(true).waitVia(&x);
EXPECT_FALSE(x.ran);
}
}
TEST(Future, callbackAfterActivate) {
Promise<void> p;
auto f = p.getFuture();
......
......@@ -20,13 +20,18 @@
#include <folly/futures/Future.h>
#include <folly/futures/InlineExecutor.h>
#include <folly/futures/ManualExecutor.h>
#include <folly/futures/DrivableExecutor.h>
using namespace folly;
struct ManualWaiter {
struct ManualWaiter : public DrivableExecutor {
explicit ManualWaiter(std::shared_ptr<ManualExecutor> ex) : ex(ex) {}
void makeProgress() {
void add(Func f) {
ex->add(f);
}
void drive() override {
ex->wait();
ex->run();
}
......@@ -44,7 +49,7 @@ struct ViaFixture : public testing::Test {
t = std::thread([=] {
ManualWaiter eastWaiter(eastExecutor);
while (!done)
eastWaiter.makeProgress();
eastWaiter.drive();
});
}
......@@ -146,10 +151,7 @@ TEST_F(ViaFixture, thread_hops) {
EXPECT_EQ(std::this_thread::get_id(), westThreadId);
return t.value();
});
while (!f.isReady()) {
waiter->makeProgress();
}
EXPECT_EQ(f.value(), 1);
EXPECT_EQ(f.getVia(waiter.get()), 1);
}
TEST_F(ViaFixture, chain_vias) {
......@@ -169,10 +171,7 @@ TEST_F(ViaFixture, chain_vias) {
return t.value();
});
while (!f.isReady()) {
waiter->makeProgress();
}
EXPECT_EQ(f.value(), 1);
EXPECT_EQ(f.getVia(waiter.get()), 1);
}
TEST_F(ViaFixture, bareViaAssignment) {
......
......@@ -21,6 +21,7 @@
#include <folly/io/async/TimeoutManager.h>
#include <folly/io/async/Request.h>
#include <folly/Executor.h>
#include <folly/futures/DrivableExecutor.h>
#include <memory>
#include <stack>
#include <list>
......@@ -95,9 +96,9 @@ class RequestEventBase : public RequestData {
* EventBase from other threads. When it is safe to call a method from
* another thread it is explicitly listed in the method comments.
*/
class EventBase :
private boost::noncopyable, public TimeoutManager, public Executor
{
class EventBase : private boost::noncopyable,
public TimeoutManager,
public DrivableExecutor {
public:
/**
* A callback interface to use with runInLoop()
......@@ -485,6 +486,11 @@ class EventBase :
runInEventBaseThread(fn);
}
/// Implements the DrivableExecutor interface
void drive() override {
loopOnce();
}
private:
// TimeoutManager
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment