Commit b8c06335 authored by Hans Fugal's avatar Hans Fugal Committed by dcsommer

kill threadgate

Test Plan: This diff is actually the test plan. It was contbuild bait. I fixed things that broke (in separate diffs), and then I rebased this on top, everything is green, and we pulled the trigger.

Reviewed By: davejwatson@fb.com

Subscribers: trunkagent, net-systems@, fugalh, exa, njormrod, folly-diffs@

FB internal diff: D1632937

Tasks: 5409538

Signature: t1:1632937:1414531009:4dbaa9513fbfba45a9c63e7be3de3646a6e926a2
parent 4ebfdff3
......@@ -203,7 +203,6 @@ nobase_follyinclude_HEADERS = \
wangle/Executor.h \
wangle/Future-inl.h \
wangle/Future.h \
wangle/GenericThreadGate.h \
wangle/InlineExecutor.h \
wangle/Later-inl.h \
wangle/Later.h \
......@@ -213,7 +212,6 @@ nobase_follyinclude_HEADERS = \
wangle/Promise.h \
wangle/QueuedImmediateExecutor.h \
wangle/ScheduledExecutor.h \
wangle/ThreadGate.h \
wangle/Try-inl.h \
wangle/Try.h \
wangle/WangleException.h \
......@@ -286,7 +284,6 @@ libfolly_la_SOURCES = \
Version.cpp \
wangle/InlineExecutor.cpp \
wangle/ManualExecutor.cpp \
wangle/ThreadGate.cpp \
experimental/io/FsUtil.cpp \
experimental/Singleton.cpp \
experimental/TestUtil.cpp \
......
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/wangle/ThreadGate.h>
#include <folly/wangle/Executor.h>
#include <type_traits>
namespace folly { namespace wangle {
/// This generic threadgate takes two executors and an optional waiter (if you
/// need to support waiting). Hint: use executors that inherit from Executor
/// (in Executor.h), then you just do
///
/// GenericThreadGate tg(westExecutor, eastExecutor, waiter);
template <
class WestExecutorPtr = Executor*,
class EastExecutorPtr = Executor*,
class WaiterPtr = void*>
class GenericThreadGate : public ThreadGate {
public:
/**
EastExecutor and WestExecutor respond threadsafely to
`add(std::function<void()>&&)`
Waiter responds to `makeProgress()`. It may block, as long as progress
will be made on the west front.
*/
GenericThreadGate(WestExecutorPtr west,
EastExecutorPtr east,
WaiterPtr waiter = nullptr) :
westExecutor(west),
eastExecutor(east),
waiter(waiter)
{}
void addWest(std::function<void()>&& fn) { westExecutor->add(std::move(fn)); }
void addEast(std::function<void()>&& fn) { eastExecutor->add(std::move(fn)); }
virtual void makeProgress() {
makeProgress_(std::is_same<WaiterPtr, void*>());
}
WestExecutorPtr westExecutor;
EastExecutorPtr eastExecutor;
WaiterPtr waiter;
private:
void makeProgress_(std::true_type const&) {
throw std::logic_error("No waiter.");
}
void makeProgress_(std::false_type const&) {
waiter->makeProgress();
}
};
}} // executor
......@@ -215,17 +215,6 @@ later = later.then(y3); // nor here
later.launch(); // explicit launch
```
The third and least flexible (but sometimes very useful) method assumes only two threads and that you want to do something in the far thread, then come back to the current thread. `ThreadGate` is an interface for a bidirectional gateway between two threads. It's usually easier to use a Later, but ThreadGate can be more efficient, and if the pattern is used often in your code it can be more convenient.
```C++
// Using a ThreadGate (which has two executors xe and xw)
tg.gate(a).then(b);
// Using via
makeFuture()
.via(xe).then(a)
.via(xw).then(b);
```
## You make me Promises, Promises
If you are wrapping an asynchronous operation, or providing an asynchronous API to users, then you will want to make Promises. Every Future has a corresponding Promise (except Futures that spring into existence already completed, with `makeFuture()`). Promises are simple, you make one, you extract the Future, and you fulfil it with a value or an exception. Example:
......
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/wangle/ThreadGate.h>
#include <stdexcept>
namespace folly { namespace wangle {
void ThreadGate::makeProgress()
{
throw std::logic_error("This ThreadGate doesn't know how to "
"make progress.");
}
}} // namespace
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <memory>
#include <folly/wangle/Future.h>
#include <folly/wangle/Deprecated.h>
namespace folly { namespace wangle {
/**
The ThreadGate strategy encapsulates a bidirectional gate via two Executors,
kind of like a stargate for wangle Future chains. Its implementation is
slightly more efficient then using Future::via in both directions, and if
the pattern is common it can be more convenient (although the overhead of
setting up a ThreadGate is less convenient in most situations).
// Using a ThreadGate (which has two executors xe and xw)
tg.gate(a).then(b);
// Using via
makeFuture()
.via(xe).then(a)
.via(xw).then(b);
If you're not sure whether you want a ThreadGate, you don't. Use via.
There are two actors, the east thread which does the asynchronous operation
(the server) and the west thread that wants the asynchronous operation done
(the client).
The client calls gate<T>(fn), which returns a Future<T>. Practically speaking
the returned Future<T> is the same as the Future<T> returned by fn. But
there are actually two futures involved - the original Future which will be
generated by fn (called the east Future), and the Future actually returned
by gate<T>(fn) (called the west Future).
These two futures are decoupled, and although the fulfilment of the east
Future eventually causes fulfilment of the west Future, those fulfilments
happen in their own threads.
In order to make and use a ThreadGate, you need to provide a strategy for
executing code in the east and west threads. These strategies may be
different. The only requirement is a threadsafe method
`void add(function<void()>&&)`.
In order for your ThreadGate to do anything, you need to drive those
executors somehow. An event loop is a natural fit. A thread pool might be
made to work. You could use a busy loop to make a very expensive space
heater. 0MQ would be pleasant.
Another pattern supported by the ThreadGate is the single-thread pattern. In
this pattern, non-blocking I/O drives the asynchronous operation, and
futures are fulfilled in an event loop callback. In this scenario,
ThreadGate is largely superfluous, and the executors would likely just
execute code immediately and inline (and therefore not need to be driven, or
threadsafe). But a Waiter strategy that makes progress by driving the event
loop one iteration would allow for gate-and-wait code which is agnostic to
the small detail that everything happens in one thread. It would also make
Future change toward a multithreaded architecture easier, as you need only
change the components of the ThreadGate which your client code is already
using.
*/
// DEPRECATED. Just use Future::via() to accomplish the same thing. If it's
// not obvious how, feel free to reach out.
class DEPRECATED ThreadGate {
public:
virtual ~ThreadGate() {}
/**
Returns a Future that will be fulfilled after the Future that will be
returned by fn() has been fulfilled, with the same value or exception
(moved).
There's a lot of nuance in that sentence. Let's break it down.
fn kicks off the asynchronous operation (makes the east Promise), and must
be executed in the east thread because the east thread is where the east
Promise will be fulfilled. Since gate is being called from the west
thread, we must gate fn using the east executor. fn is not executed
immediately, it is queued up and will be executed by the east thread as it
drives the executor.
We create the west Promise and return its Future.
When the east thread executes its task, fn is called and the resulting
Future gets a callback that will gate another task back to the west.
Sometime later, the asynchronous operation completes and the east Promise
is fulfilled. Then the east Future executes its callback, which adds a
task to the west executor that task is to fulfil the west Promise with the
same Try<T>, and it will execute in the west thread.
At this point, the west Future is still unfulfilled, even though the east
Future has been fulfilled and its callback has finished executing. Only
when the west executor is driven to execute that task, the west Future
will be completed and its callbacks called.
In summary, both east and west need to have plans to drive their
executors, or nothing will actually happen. When the executors are driven,
then everything flows. */
template <class T>
Future<T> gate(std::function<Future<T>()>&& fn) {
Promise<T> pWest;
Future<T> fWest = pWest.getFuture();
gate(std::move(fn), std::move(pWest));
return fWest;
}
/**
* This version of gate is to support use cases where the calling thread is
* not the west thread. Here is an example use case.
*
* Promise<T> pWest;
* Future<T> fWest = pWest.getFuture();
*
* // Set up callbacks for west from a thread that is not west.
* fWest.then(...).then(...);
*
* threadGate.gate(..., std::move(pWest));
*
* This function assumes that it is safe to call addEast from a thread that is
* not the west thread.
*/
template <class T>
void gate(std::function<Future<T>()>&& fn,
Promise<T>&& p) {
folly::MoveWrapper<Promise<T>> pWest(std::move(p));
folly::MoveWrapper<std::function<Future<T>()>> fnm(std::move(fn));
this->addEast([pWest, fnm, this]() mutable {
(*fnm)().then([pWest, this](Try<T>&& t) mutable {
folly::MoveWrapper<Try<T>> tm(std::move(t));
this->addWest([pWest, tm]() mutable {
pWest->fulfilTry(std::move(*tm));
});
});
});
}
/**
If your workflow calls for synchronizing with a
west Future, then you may call waitFor, but if your west thread is
event-driven you will probably not need to call waitFor.
In order for waitFor to behave properly, you must ensure that the Waiter's
makeProgress method causes some progress to be made on the west thread,
i.e. drives the west executor either directly or indirectly.
(Naturally, progress needs to be made on the east thread as well. i.e. the
east executor is driven, the asynchronous operation happens, and its
Promise is fulfilled. It is likely that none of this concerns the consumer
of waitFor.)
This is the only function that uses the Waiter. It is never called
internally. Therefore, if you never use waitFor you can safely provide a
DummyWaiter.
*/
template <class T>
void waitFor(Future<T> const& f) {
while (!f.isReady()) {
this->makeProgress();
}
}
template <class T>
typename std::add_lvalue_reference<T>::type
value(Future<T>& f) {
waitFor<T>(f);
return f.value();
}
template <class T>
typename std::add_lvalue_reference<const T>::type
value(Future<T> const& f) {
waitFor<T>(f);
return f.value();
}
virtual void addEast(std::function<void()>&&) = 0;
virtual void addWest(std::function<void()>&&) = 0;
virtual void makeProgress();
};
}} // namespace
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <gtest/gtest.h>
#include <thread>
#include <future>
#include <folly/wangle/Executor.h>
#include <folly/wangle/ManualExecutor.h>
#include <folly/wangle/ThreadGate.h>
#include <folly/wangle/GenericThreadGate.h>
using namespace folly::wangle;
using std::make_shared;
using std::shared_ptr;
using std::thread;
using std::vector;
struct ManualWaiter {
explicit ManualWaiter(shared_ptr<ManualExecutor> ex) : ex(ex) {}
void makeProgress() {
ex->wait();
ex->run();
}
shared_ptr<ManualExecutor> ex;
};
struct GenericThreadGateFixture : public testing::Test {
GenericThreadGateFixture() :
westExecutor(new ManualExecutor),
eastExecutor(new ManualExecutor),
waiter(new ManualWaiter(westExecutor)),
tg(westExecutor, eastExecutor, waiter),
done(false)
{
t = thread([=] {
ManualWaiter eastWaiter(eastExecutor);
while (!done)
eastWaiter.makeProgress();
});
}
~GenericThreadGateFixture() {
done = true;
tg.gate<void>([] { return makeFuture(); });
t.join();
}
shared_ptr<ManualExecutor> westExecutor;
shared_ptr<ManualExecutor> eastExecutor;
shared_ptr<ManualWaiter> waiter;
GenericThreadGate<
shared_ptr<ManualExecutor>,
shared_ptr<ManualExecutor>,
shared_ptr<ManualWaiter>> tg;
bool done;
thread t;
};
TEST_F(GenericThreadGateFixture, gate_and_wait) {
auto f = tg.gate<void>([] { return makeFuture(); });
EXPECT_FALSE(f.isReady());
tg.waitFor(f);
EXPECT_TRUE(f.isReady());
}
TEST_F(GenericThreadGateFixture, gate_many) {
vector<Future<void>> fs;
int n = 10;
for (int i = 0; i < n; i++)
fs.push_back(tg.gate<void>([&] { return makeFuture(); }));
for (auto& f : fs)
EXPECT_FALSE(f.isReady());
auto all = whenAll(fs.begin(), fs.end());
tg.waitFor(all);
}
TEST_F(GenericThreadGateFixture, gate_alternating) {
vector<Promise<void>> ps(10);
vector<Future<void>> fs;
size_t count = 0;
for (auto& p : ps) {
auto* pp = &p;
auto f = tg.gate<void>([=] { return pp->getFuture(); });
// abuse the thread gate to do our dirty work in the other thread
tg.gate<void>([=] { pp->setValue(); return makeFuture(); });
fs.push_back(f.then([&](Try<void>&&) { count++; }));
}
for (auto& f : fs)
EXPECT_FALSE(f.isReady());
EXPECT_EQ(0, count);
auto all = whenAll(fs.begin(), fs.end());
tg.waitFor(all);
EXPECT_EQ(ps.size(), count);
}
TEST(GenericThreadGate, noWaiter) {
auto west = make_shared<ManualExecutor>();
auto east = make_shared<ManualExecutor>();
Promise<void> p;
auto dummyFuture = p.getFuture();
GenericThreadGate<ManualExecutor*, ManualExecutor*>
tg(west.get(), east.get());
EXPECT_THROW(tg.waitFor(dummyFuture), std::logic_error);
}
TEST_F(GenericThreadGateFixture, gate_with_promise) {
Promise<int> p;
auto westId = std::this_thread::get_id();
bool westThenCalled = false;
auto f = p.getFuture().then(
[westId, &westThenCalled](Try<int>&& t) {
EXPECT_EQ(t.value(), 1);
EXPECT_EQ(std::this_thread::get_id(), westId);
westThenCalled = true;
return t.value();
});
bool eastPromiseMade = false;
auto thread = std::thread([&p, &eastPromiseMade, this]() {
EXPECT_NE(t.get_id(), std::this_thread::get_id());
// South thread != west thread. p gets set in west thread.
tg.gate<int>([&p, &eastPromiseMade, this] {
EXPECT_EQ(t.get_id(), std::this_thread::get_id());
Promise<int> eastPromise;
auto eastFuture = eastPromise.getFuture();
eastPromise.setValue(1);
eastPromiseMade = true;
return eastFuture;
},
std::move(p));
});
tg.waitFor(f);
EXPECT_TRUE(westThenCalled);
EXPECT_TRUE(eastPromiseMade);
EXPECT_EQ(f.value(), 1);
thread.join();
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment