Commit d0cb4828 authored by Dave Watson's avatar Dave Watson

Move wangle to folly

Summary:
* git mv
* codemod facebook::wangle folly::wangle
* Change 'runnable' to be a base class in wangle instead of thrift

Justification:
* std::future doesn't have then, whenall, etc.
* boost::future doesn't support executors

@override-unit-failures

Test Plan: contbuild and pray

Reviewed By: hans@fb.com

FB internal diff: D1185194
parent ab5a66e2
......@@ -125,7 +125,22 @@ nobase_follyinclude_HEADERS = \
Unicode.h \
Uri.h \
Uri-inl.h \
Varint.h
Varint.h \
wangle/Executor.h \
wangle/Future-inl.h \
wangle/Future.h \
wangle/GenericThreadGate.h \
wangle/InlineExecutor.h \
wangle/Later-inl.h \
wangle/Later.h \
wangle/ManualExecutor.h \
wangle/Promise-inl.h \
wangle/Promise.h \
wangle/ThreadGate.h \
wangle/Try-inl.h \
wangle/Try.h \
wangle/WangleException.h \
wangle/detail.h
FormatTables.cpp: build/generate_format_tables.py
build/generate_format_tables.py
......@@ -168,7 +183,10 @@ libfolly_la_SOURCES = \
ThreadCachedArena.cpp \
TimeoutQueue.cpp \
Unicode.cpp \
Uri.cpp
Uri.cpp \
wangle/InlineExecutor.cpp \
wangle/ManualExecutor.cpp \
wangle/ThreadGate.cpp
if !HAVE_LINUX
nobase_follyinclude_HEADERS += detail/Clock.h
......@@ -196,4 +214,3 @@ libfollybenchmark_la_LIBADD = libfolly.la
libfollytimeout_queue_la_SOURCES = TimeoutQueue.cpp
libfollytimeout_queue_la_LIBADD = libfolly.la
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <boost/noncopyable.hpp>
#include <functional>
namespace folly { namespace wangle {
class Executor : boost::noncopyable {
public:
virtual ~Executor() = default;
virtual void add(std::function<void()>&&) = 0;
};
}}
This diff is collapsed.
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <algorithm>
#include <exception>
#include <functional>
#include <memory>
#include <type_traits>
#include "folly/MoveWrapper.h"
#include "Promise.h"
#include "Try.h"
namespace folly { namespace wangle {
template <typename T> struct isFuture;
template <class T>
class Future {
public:
typedef T value_type;
// not copyable
Future(Future const&) = delete;
Future& operator=(Future const&) = delete;
// movable
Future(Future&&);
Future& operator=(Future&&);
~Future();
/** Return the reference to result. Should not be called if !isReady().
Will rethrow the exception if an exception has been
captured.
This function is not thread safe - the returned Future can only
be executed from the thread that the executor runs it in.
See below for a thread safe version
*/
typename std::add_lvalue_reference<T>::type
value();
typename std::add_lvalue_reference<const T>::type
value() const;
template <typename Executor>
Future<T> executeWithSameThread(Executor* executor);
/**
Thread-safe version of executeWith
Since an executor would likely start executing the Future chain
right away, it would be a race condition to call:
Future.executeWith(...).then(...), as there would be race
condition between the then and the running Future.
Instead, you may pass in a Promise so that we can set up
the rest of the chain in advance, without any racey
modifications of the continuation
*/
template <typename Executor>
void executeWith(Executor* executor, Promise<T>&& cont_promise);
/** True when the result (or exception) is ready. value() will not block
when this returns true. */
bool isReady() const;
/** Wait until the result (or exception) is ready. Once this returns,
value() will not block, and isReady() will return true.
XXX This implementation is simplistic and inefficient, but it does work
and a fully intelligent implementation is coming down the pipe.
*/
void wait() const {
while (!isReady()) {
// spin
std::this_thread::yield();
}
}
Try<T>& valueTry();
/** When this Future has completed, execute func which is a function that
takes a Try<T>&&. A Future for the return type of func is
returned. e.g.
Future<string> f2 = f1.then([](Try<T>&&) { return string("foo"); });
The functor given may call value() without blocking, which may rethrow if
this has captured an exception. If func throws, the exception will be
captured in the Future that is returned.
*/
/* n3428 has then(scheduler&, F&&), we might want to reorganize to use
similar API. or maybe not */
template <class F>
typename std::enable_if<
!isFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
Future<typename std::result_of<F(Try<T>&&)>::type> >::type
then(F&& func);
template <class F>
typename std::enable_if<
isFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
Future<typename std::result_of<F(Try<T>&&)>::type::value_type> >::type
then(F&& func);
/** Use this method on the Future when we don't really care about the
returned value and want to convert the Future<T> to a Future<void>
Convenience function
*/
Future<void> then();
template <class F>
void setContinuation(F&& func);
private:
/* Eventually this may not be a shared_ptr, but something similar without
expensive thread-safety. */
typedef detail::FutureObject<T>* objPtr;
// shared state object
objPtr obj_;
explicit
Future(objPtr obj) : obj_(obj) {}
void throwIfInvalid() const;
friend class Promise<T>;
};
/** Make a completed Future by moving in a value. e.g.
auto f = makeFuture(string("foo"));
*/
template <class T>
Future<typename std::decay<T>::type> makeFuture(T&& t);
/** Make a completed void Future. */
Future<void> makeFuture();
/** Make a completed Future by executing a function. If the function throws
we capture the exception, otherwise we capture the result. */
template <class F>
auto makeFutureTry(
F&& func,
typename std::enable_if<
!std::is_reference<F>::value, bool>::type sdf = false)
-> Future<decltype(func())>;
template <class F>
auto makeFutureTry(
F const& func)
-> Future<decltype(func())>;
/** Make a completed (error) Future from an exception_ptr. Because the type
can't be inferred you have to give it, e.g.
auto f = makeFuture<string>(std::current_exception());
*/
template <class T>
Future<T> makeFuture(std::exception_ptr const& e);
/** Make a Future from an exception type E that can be passed to
std::make_exception_ptr(). */
template <class T, class E>
typename std::enable_if<std::is_base_of<std::exception, E>::value, Future<T>>::type
makeFuture(E const& e);
/** When all the input Futures complete, the returned Future will complete.
Errors do not cause early termination; this Future will always succeed
after all its Futures have finished (whether successfully or with an
error).
The Futures are moved in, so your copies are invalid. If you need to
chain further from these Futures, use the variant with an output iterator.
This function is thread-safe for Futures running on different threads.
The return type for Future<T> input is a Future<vector<Try<T>>>
*/
template <class InputIterator>
Future<std::vector<Try<
typename std::iterator_traits<InputIterator>::value_type::value_type>>>
whenAll(InputIterator first, InputIterator last);
/** This version takes a varying number of Futures instead of an iterator.
The return type for (Future<T1>, Future<T2>, ...) input
is a Future<tuple<Try<T1>, Try<T2>, ...>>.
*/
template <typename... Fs>
typename detail::VariadicContext<typename Fs::value_type...>::type
whenAll(Fs&... fs);
/** The result is a pair of the index of the first Future to complete and
the Try. If multiple Futures complete at the same time (or are already
complete when passed in), the "winner" is chosen non-deterministically.
This function is thread-safe for Futures running on different threads.
*/
template <class InputIterator>
Future<std::pair<
size_t,
Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
whenAny(InputIterator first, InputIterator last);
/** when n Futures have completed, the Future completes with a vector of
the index and Try of those n Futures (the indices refer to the original
order, but the result vector will be in an arbitrary order)
Not thread safe.
*/
template <class InputIterator>
Future<std::vector<std::pair<
size_t,
Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>>
whenN(InputIterator first, InputIterator last, size_t n);
}} // folly::wangle
#include "Future-inl.h"
/*
TODO
I haven't included a Future<T&> specialization because I don't forsee us
using it, however it is not difficult to add when needed. Refer to
Future<void> for guidance. std::Future and boost::Future code would also be
instructive.
I think that this might be a good candidate for folly, once it has baked for
awhile.
*/
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "ThreadGate.h"
#include "Executor.h"
#include <type_traits>
namespace folly { namespace wangle {
template <
class WestExecutorPtr = Executor*,
class EastExecutorPtr = Executor*,
class WaiterPtr = void*>
class GenericThreadGate : public ThreadGate {
public:
/**
EastExecutor and WestExecutor respond threadsafely to
`add(std::function<void()>&&)`
Waiter responds to `makeProgress()`. It may block, as long as progress
will be made on the west front.
*/
GenericThreadGate(WestExecutorPtr west,
EastExecutorPtr east,
WaiterPtr waiter = nullptr) :
westExecutor(west),
eastExecutor(east),
waiter(waiter)
{}
void addWest(std::function<void()>&& fn) { westExecutor->add(std::move(fn)); }
void addEast(std::function<void()>&& fn) { eastExecutor->add(std::move(fn)); }
virtual void makeProgress() {
makeProgress_(std::is_same<WaiterPtr, void*>());
}
WestExecutorPtr westExecutor;
EastExecutorPtr eastExecutor;
WaiterPtr waiter;
private:
void makeProgress_(std::true_type const&) {
throw std::logic_error("No waiter.");
}
void makeProgress_(std::false_type const&) {
waiter->makeProgress();
}
};
}} // executor
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "folly/wangle/Executor.h"
namespace folly { namespace wangle {
class InlineExecutor : public Executor {
public:
void add(std::function<void()>&& f) override {
f();
}
};
}}
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "folly/wangle/Executor.h"
#include "folly/wangle/Future.h"
#include "folly/Optional.h"
namespace folly { namespace wangle {
template <typename T>
struct isLater {
static const bool value = false;
};
template <typename T>
struct isLater<Later<T> > {
static const bool value = true;
};
template <typename T>
struct isLaterOrFuture {
static const bool value = false;
};
template <typename T>
struct isLaterOrFuture<Later<T>> {
static const bool value = true;
};
template <typename T>
struct isLaterOrFuture<Future<T>> {
static const bool value = true;
};
template <typename T>
template <class U, class Unused, class Unused2>
Later<T>::Later() {
future_ = starter_.getFuture();
}
template <typename T>
Later<T>::Later(Promise<void>&& starter)
: starter_(std::forward<Promise<void>>(starter)) { }
template <class T>
template <class U, class Unused, class Unused2>
Later<T>::Later(U&& input) {
folly::MoveWrapper<Promise<U>> promise;
folly::MoveWrapper<U> inputm(std::forward<U>(input));
future_ = promise->getFuture();
starter_.getFuture().then([=](Try<void>&& t) mutable {
promise->setValue(std::move(*inputm));
});
}
template <class T>
template <class F>
typename std::enable_if<
!isLaterOrFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
Later<typename std::result_of<F(Try<T>&&)>::type> >::type
Later<T>::then(F&& fn) {
typedef typename std::result_of<F(Try<T>&&)>::type B;
Later<B> later(std::move(starter_));
later.future_ = future_->then(std::forward<F>(fn));
return later;
}
template <class T>
template <class F>
typename std::enable_if<
isFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
Later<typename std::result_of<F(Try<T>&&)>::type::value_type> >::type
Later<T>::then(F&& fn) {
typedef typename std::result_of<F(Try<T>&&)>::type::value_type B;
Later<B> later(std::move(starter_));
later.future_ = future_->then(std::move(fn));
return later;
}
template <class T>
template <class F>
typename std::enable_if<
isLater<typename std::result_of<F(Try<T>&&)>::type>::value,
Later<typename std::result_of<F(Try<T>&&)>::type::value_type> >::type
Later<T>::then(F&& fn) {
typedef typename std::result_of<F(Try<T>&&)>::type::value_type B;
folly::MoveWrapper<Promise<B>> promise;
folly::MoveWrapper<F> fnm(std::move(fn));
Later<B> later(std::move(starter_));
later.future_ = promise->getFuture();
future_->then([=](Try<T>&& t) mutable {
(*fnm)(std::move(t))
.then([=](Try<B>&& t2) mutable {
promise->fulfilTry(std::move(t2));
})
.launch();
});
return later;
}
template <class T>
Later<T> Later<T>::via(Executor* executor) {
Promise<T> promise;
Later<T> later(std::move(starter_));
later.future_ = promise.getFuture();
future_->executeWith(executor, std::move(promise));
return later;
}
template <class T>
Future<T> Later<T>::launch() {
starter_.setValue();
return std::move(*future_);
}
template <class T>
void Later<T>::fireAndForget() {
future_->setContinuation([] (Try<T>&& t) {}); // detach
starter_.setValue();
}
}}
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "folly/wangle/Executor.h"
#include "folly/wangle/Future.h"
#include "folly/Optional.h"
namespace folly { namespace wangle {
template <typename T> struct isLaterOrFuture;
template <typename T> struct isLater;
/*
* Since wangle primitives (promise/future) are not thread safe, it is difficult
* to build complex asynchronous workflows. A Later allows you to build such a
* workflow before actually launching it so that continuations can be set in a
* threadsafe manner.
*
* The interface to add additional work is the same as future: a then() method
* that can take either a type T, a Future<T>, or a Later<T>
*
* Thread transitions are done by using executors and calling the via() method.
*
* Here is an example of a workflow:
*
* Later<ClientRequest> later(std::move(request));
*
* auto future = later.
* .via(cpuExecutor)
* .then([=](Try<ClientRequest>&& t) { return doCpuWork(t.value()); })
* .via(diskExecutor)
* .then([=](Try<CpuResponse>&& t) { return doDiskWork(t.value()); })
* .via(serverExecutor)
* .then([=]Try<DiskResponse>&& t) { return sendClientResponse(t.value()); })
* .launch();
*
* Although this workflow traverses many threads, we are able to string
* continuations together in a threadsafe manner.
*
* Laters can also be used to wrap preexisting asynchronous modules that were
* not built with wangle in mind. You can create a Later with a function that
* takes a callback as input. The function will not actually be called until
* launch(), allowing you to string then() statements on top of the callback.
*/
template <class T>
class Later {
public:
typedef T value_type;
template <class U = void,
class = typename std::enable_if<std::is_void<U>::value>::type,
class = typename std::enable_if<std::is_same<T, U>::value>::type>
Later();
template <class U,
class = typename std::enable_if<!std::is_void<U>::value>::type,
class = typename std::enable_if<std::is_same<T, U>::value>::type>
explicit Later(U&& input);
/*
* then() adds additional work to the end of the workflow. If the lambda
* provided to then() returns a future, that future must be fulfilled in the
* same thread of the last set executor (either at constructor or from a call
* to via()).
*/
template <class F>
typename std::enable_if<
!isLaterOrFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
Later<typename std::result_of<F(Try<T>&&)>::type> >::type
then(F&& fn);
template <class F>
typename std::enable_if<
isFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
Later<typename std::result_of<F(Try<T>&&)>::type::value_type> >::type
then(F&& fn);
/*
* If the function passed to then() returns a Later<T>, calls to then() will
* be chained to the new Later before launching the new Later.
*
* This can be used to build asynchronous modules that can be called from a
* user thread and completed in a callback thread. Callbacks can be set up
* ahead of time without thread safety issues.
*
* Using the Later(std::function<void(std::function<void(T&&)>)>&& fn)
* constructor, you can wrap existing asynchronous modules with a Later and
* can chain it to wangle asynchronous workflows via this call.
*/
template <class F>
typename std::enable_if<
isLater<typename std::result_of<F(Try<T>&&)>::type>::value,
Later<typename std::result_of<F(Try<T>&&)>::type::value_type> >::type
then(F&& fn);
/*
* Resets the executor - all then() calls made after the call to via() will be
* made in the new executor.
*/
Later<T> via(Executor* executor);
/*
* Starts the workflow. The function provided in the constructor will be
* called in the executor provided in the constructor. All proximate then()
* calls will be made, potentially changing threads if a via() call is made.
* The future returned will be fulfilled in the last executor.
*
* Thread safety issues of Futures still apply. If you want to wait on the
* Future, it must be done in the thread that will fulfill it. If you do not
* plan to use the result of the Future, use fireAndForget()
*/
Future<T> launch();
/*
* Same as launch, only no Future is returned. This guarantees thread safe
* cleanup of the internal Futures, even if the Later completes in a different
* thread than the thread that calls fireAndForget().
*/
void fireAndForget();
private:
Promise<void> starter_;
folly::Optional<Future<T>> future_;
struct hide { };
explicit Later(Promise<void>&& starter);
template <class U>
friend class Later;
};
}}
#include "Later-inl.h"
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ManualExecutor.h"
#include <string.h>
#include <stdexcept>
namespace folly { namespace wangle {
ManualExecutor::ManualExecutor() {
if (sem_init(&sem_, 0, 0) == -1) {
throw std::runtime_error(std::string("sem_init: ") + strerror(errno));
}
}
void ManualExecutor::add(std::function<void()>&& callback) {
std::lock_guard<std::mutex> lock(lock_);
runnables_.push(callback);
sem_post(&sem_);
}
size_t ManualExecutor::run() {
size_t count;
size_t n;
std::function<void()> runnable;
{
std::lock_guard<std::mutex> lock(lock_);
n = runnables_.size();
}
for (count = 0; count < n; count++) {
{
std::lock_guard<std::mutex> lock(lock_);
if (runnables_.empty()) {
break;
}
// Balance the semaphore so it doesn't grow without bound
// if nobody is calling wait().
// This may fail (with EAGAIN), that's fine.
sem_trywait(&sem_);
runnable = std::move(runnables_.front());
runnables_.pop();
}
runnable();
}
return count;
}
void ManualExecutor::wait() {
while (true) {
{
std::lock_guard<std::mutex> lock(lock_);
if (!runnables_.empty())
break;
}
auto ret = sem_wait(&sem_);
if (ret == 0) {
break;
}
if (errno != EINVAL) {
throw std::runtime_error(std::string("sem_wait: ") + strerror(errno));
}
}
}
}} // namespace
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "folly/wangle/Executor.h"
#include <semaphore.h>
#include <memory>
#include <mutex>
#include <queue>
namespace folly { namespace wangle {
class ManualExecutor : public Executor {
public:
ManualExecutor();
void add(std::function<void()>&&) override;
/// Do work. Returns the number of runnables that were executed (maybe 0).
/// Non-blocking.
size_t run();
/// Wait for work to do.
void wait();
/// Wait for work to do, and do it.
void makeProgress() {
wait();
run();
}
private:
std::mutex lock_;
std::queue<std::function<void()>> runnables_;
sem_t sem_;
};
}}
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <atomic>
#include <thread>
#include "WangleException.h"
#include "detail.h"
namespace folly { namespace wangle {
template <class T>
Promise<T>::Promise() : retrieved_(false), obj_(new detail::FutureObject<T>())
{}
template <class T>
Promise<T>::Promise(Promise<T>&& other) :
retrieved_(other.retrieved_), obj_(other.obj_) {
other.obj_ = nullptr;
}
template <class T>
Promise<T>& Promise<T>::operator=(Promise<T>&& other) {
std::swap(obj_, other.obj_);
std::swap(retrieved_, other.retrieved_);
return *this;
}
template <class T>
void Promise<T>::throwIfFulfilled() {
if (!obj_)
throw PromiseAlreadySatisfied();
}
template <class T>
void Promise<T>::throwIfRetrieved() {
if (retrieved_)
throw FutureAlreadyRetrieved();
}
template <class T>
Promise<T>::~Promise() {
if (obj_) {
setException(BrokenPromise());
}
}
template <class T>
Future<T> Promise<T>::getFuture() {
throwIfRetrieved();
throwIfFulfilled();
retrieved_ = true;
return Future<T>(obj_);
}
template <class T>
template <class E>
void Promise<T>::setException(E const& e) {
throwIfFulfilled();
setException(std::make_exception_ptr<E>(e));
}
template <class T>
void Promise<T>::setException(std::exception_ptr const& e) {
throwIfFulfilled();
obj_->setException(e);
if (!retrieved_) {
delete obj_;
}
obj_ = nullptr;
}
template <class T>
void Promise<T>::fulfilTry(Try<T>&& t) {
throwIfFulfilled();
obj_->fulfil(std::move(t));
if (!retrieved_) {
delete obj_;
}
obj_ = nullptr;
}
template <class T>
template <class M>
void Promise<T>::setValue(M&& v) {
static_assert(!std::is_same<T, void>::value,
"Use setValue() instead");
throwIfFulfilled();
obj_->fulfil(Try<T>(std::forward<M>(v)));
if (!retrieved_) {
delete obj_;
}
obj_ = nullptr;
}
template <class T>
void Promise<T>::setValue() {
static_assert(std::is_same<T, void>::value,
"Use setValue(value) instead");
throwIfFulfilled();
obj_->fulfil(Try<void>());
if (!retrieved_) {
delete obj_;
}
obj_ = nullptr;
}
template <class T>
template <class F>
void Promise<T>::fulfil(const F& func) {
fulfilHelper(func);
}
template <class T>
template <class F>
typename std::enable_if<
std::is_convertible<typename std::result_of<F()>::type, T>::value &&
!std::is_same<T, void>::value>::type
inline Promise<T>::fulfilHelper(const F& func) {
throwIfFulfilled();
try {
setValue(func());
} catch (...) {
setException(std::current_exception());
}
}
template <class T>
template <class F>
typename std::enable_if<
std::is_same<typename std::result_of<F()>::type, void>::value &&
std::is_same<T, void>::value>::type
inline Promise<T>::fulfilHelper(const F& func) {
throwIfFulfilled();
try {
func();
setValue();
} catch (...) {
setException(std::current_exception());
}
}
}}
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "Try.h"
#include "Future.h"
namespace folly { namespace wangle {
// forward declaration
template <class T> class Future;
template <class T>
class Promise {
public:
Promise();
~Promise();
// not copyable
Promise(Promise const&) = delete;
Promise& operator=(Promise const&) = delete;
// movable
Promise(Promise<T>&&);
Promise& operator=(Promise<T>&&);
/** Return a Future tied to the shared state. This can be called only
once, thereafter Future already retrieved exception will be raised. */
Future<T> getFuture();
/** Fulfil the Promise with an exception_ptr, e.g.
try {
...
} catch (...) {
p.setException(std::current_exception());
}
*/
void setException(std::exception_ptr const&);
/** Fulfil the Promise with an exception type E, which can be passed to
std::make_exception_ptr(). Useful for originating exceptions. If you
caught an exception the exception_ptr form is more appropriate.
*/
template <class E> void setException(E const&);
/** Fulfil this Promise (only for Promise<void>) */
void setValue();
/** Set the value (use perfect forwarding for both move and copy) */
template <class M>
void setValue(M&& value);
void fulfilTry(Try<T>&& t);
/** Fulfil this Promise with the result of a function that takes no
arguments and returns something implicitly convertible to T.
Captures exceptions. e.g.
p.fulfil([] { do something that may throw; return a T; });
*/
template <class F>
void fulfil(const F& func);
private:
typedef typename Future<T>::objPtr objPtr;
// Whether the Future has been retrieved (a one-time operation).
bool retrieved_;
// shared state object
objPtr obj_;
void throwIfFulfilled();
void throwIfRetrieved();
template <class F>
typename std::enable_if<
std::is_convertible<typename std::result_of<F()>::type, T>::value &&
!std::is_same<T, void>::value>::type
fulfilHelper(const F& func);
template <class F>
typename std::enable_if<
std::is_same<typename std::result_of<F()>::type, void>::value &&
std::is_same<T, void>::value>::type
fulfilHelper(const F& func);
};
}}
#include "Promise-inl.h"
Please see https://our.intern.facebook.com/intern/dex/wangle/
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ThreadGate.h"
#include <stdexcept>
namespace folly { namespace wangle {
void ThreadGate::makeProgress()
{
throw std::logic_error("This ThreadGate doesn't know how to "
"make progress.");
}
}} // namespace
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <memory>
#include "Future.h"
namespace folly { namespace wangle {
/**
Yo dawg, I heard you like asynchrony so I put asynchrony in your asynchronous
framework.
Wangle's futures and promises are not thread safe. Counterintuitive as this
may seem at first, this is very intentional. Making futures and promises
threadsafe drastically reduces their performance.
On the other hand, an asynchronous framework isn't much use if you can't do
asynchronous things in other threads. So we use the ThreadGate strategy to
decouple the threads and their futures with a form of message passing.
There are two actors, the east thread which does the asynchronous operation
(the server) and the west thread that wants the asynchronous operation done
(the client).
The client calls gate<T>(fn), which returns a Future<T>. Practically speaking
the returned Future<T> is the same as the Future<T> returned by fn. But
there are actually two futures involved - the original Future which will be
generated by fn (called the east Future), and the Future actually returned
by gate<T>(fn) (called the west Future).
These two futures are decoupled, and although the fulfilment of the east
Future eventually causes fulfilment of the west Future, those fulfilments
happen in their own threads.
In order to make and use a ThreadGate, you need to provide a strategy for
executing code in the east and west threads. These strategies may be
different. The only requirement is a threadsafe method
`void add(function<void()>&&)`. You may find the executors in
Executor.h handy, but ensure that you are using them
threadsafely.
In order for your ThreadGate to do anything, you need to drive those
executors somehow. An event loop is a natural fit. A thread pool might be
made to work. You could use a busy loop to make a very expensive space
heater. 0MQ would be pleasant.
Another pattern supported by the ThreadGate is the single-thread pattern. In
this pattern, non-blocking I/O drives the asynchronous operation, and
futures are fulfilled in an event loop callback. In this scenario,
ThreadGate is largely superfluous, and the executors would likely just
execute code immediately and inline (and therefore not need to be driven, or
threadsafe). But a Waiter strategy that makes progress by driving the event
loop one iteration would allow for gate-and-wait code which is agnostic to
the small detail that everything happens in one thread. It would also make
Future change toward a multithreaded architecture easier, as you need only
change the components of the ThreadGate which your client code is already
using.
*/
class ThreadGate {
public:
virtual ~ThreadGate() {}
/**
Returns a Future that will be fulfilled after the Future that will be
returned by fn() has been fulfilled, with the same value or exception
(moved).
There's a lot of nuance in that sentence. Let's break it down.
fn kicks off the asynchronous operation (makes the east Promise), and must
be executed in the east thread because the east thread is where the east
Promise will be fulfilled. Since gate is being called from the west
thread, we must gate fn using the east executor. fn is not executed
immediately, it is queued up and will be executed by the east thread as it
drives the executor.
We create the west Promise and return its Future.
When the east thread executes its task, fn is called and the resulting
Future gets a callback that will gate another task back to the west.
Sometime later, the asynchronous operation completes and the east Promise
is fulfilled. Then the east Future executes its callback, which adds a
task to the west executor that task is to fulfil the west Promise with the
same Try<T>, and it will execute in the west thread.
At this point, the west Future is still unfulfilled, even though the east
Future has been fulfilled and its callback has finished executing. Only
when the west executor is driven to execute that task, the west Future
will be completed and its callbacks called.
In summary, both east and west need to have plans to drive their
executors, or nothing will actually happen. When the executors are driven,
then everything flows. */
template <class T>
Future<T> gate(std::function<Future<T>()>&& fn) {
Promise<T> pWest;
Future<T> fWest = pWest.getFuture();
gate(std::move(fn), std::move(pWest));
return fWest;
}
/**
* This version of gate is to support use cases where the calling thread is
* not the west thread. Here is an example use case.
*
* Promise<T> pWest;
* Future<T> fWest = pWest.getFuture();
*
* // Set up callbacks for west from a thread that is not west.
* fWest.then(...).then(...);
*
* threadGate.gate(..., std::move(pWest));
*
* This function assumes that it is safe to call addEast from a thread that is
* not the west thread.
*/
template <class T>
void gate(std::function<Future<T>()>&& fn,
Promise<T>&& p) {
folly::MoveWrapper<Promise<T>> pWest(std::move(p));
folly::MoveWrapper<std::function<Future<T>()>> fnm(std::move(fn));
this->addEast([pWest, fnm, this]() mutable {
(*fnm)().then([pWest, this](Try<T>&& t) mutable {
folly::MoveWrapper<Try<T>> tm(std::move(t));
this->addWest([pWest, tm]() mutable {
pWest->fulfilTry(std::move(*tm));
});
});
});
}
/**
If your workflow calls for synchronizing with a
west Future, then you may call waitFor, but if your west thread is
event-driven you will probably not need to call waitFor.
In order for waitFor to behave properly, you must ensure that the Waiter's
makeProgress method causes some progress to be made on the west thread,
i.e. drives the west executor either directly or indirectly.
(Naturally, progress needs to be made on the east thread as well. i.e. the
east executor is driven, the asynchronous operation happens, and its
Promise is fulfilled. It is likely that none of this concerns the consumer
of waitFor.)
This is the only function that uses the Waiter. It is never called
internally. Therefore, if you never use waitFor you can safely provide a
DummyWaiter.
*/
template <class T>
void waitFor(Future<T> const& f) {
while (!f.isReady()) {
this->makeProgress();
}
}
template <class T>
typename std::add_lvalue_reference<T>::type
value(Future<T>& f) {
waitFor<T>(f);
return f.value();
}
template <class T>
typename std::add_lvalue_reference<const T>::type
value(Future<T> const& f) {
waitFor<T>(f);
return f.value();
}
virtual void addEast(std::function<void()>&&) = 0;
virtual void addWest(std::function<void()>&&) = 0;
virtual void makeProgress();
};
}} // namespace
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <stdexcept>
#include "WangleException.h"
namespace folly { namespace wangle {
template <class T>
Try<T>::Try(Try<T>&& t) : contains_(t.contains_) {
if (contains_ == VALUE) {
new (&value_)T(std::move(t.value_));
} else if (contains_ == EXCEPTION) {
new (&e_)std::exception_ptr(t.e_);
}
}
template <class T>
Try<T>& Try<T>::operator=(Try<T>&& t) {
this->~Try();
contains_ = t.contains_;
if (contains_ == VALUE) {
new (&value_)T(std::move(t.value_));
} else if (contains_ == EXCEPTION) {
new (&e_)std::exception_ptr(t.e_);
}
return *this;
}
template <class T>
Try<T>::~Try() {
if (contains_ == VALUE) {
value_.~T();
} else if (contains_ == EXCEPTION) {
e_.~exception_ptr();
}
}
template <class T>
T& Try<T>::value() {
throwIfFailed();
return value_;
}
template <class T>
const T& Try<T>::value() const {
throwIfFailed();
return value_;
}
template <class T>
void Try<T>::throwIfFailed() const {
if (contains_ != VALUE) {
if (contains_ == EXCEPTION) {
std::rethrow_exception(e_);
} else {
throw UsingUninitializedTry();
}
}
}
void Try<void>::throwIfFailed() const {
if (!hasValue_) {
std::rethrow_exception(e_);
}
}
template <typename T>
inline T moveFromTry(wangle::Try<T>&& t) {
return std::move(t.value());
}
inline void moveFromTry(wangle::Try<void>&& t) {
return t.value();
}
}}
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
namespace folly { namespace wangle {
template <class T>
class Try {
static_assert(!std::is_reference<T>::value,
"Try may not be used with reference types");
enum Contains {
VALUE,
EXCEPTION,
NOTHING,
};
public:
typedef T element_type;
Try() : contains_(NOTHING) {}
explicit Try(const T& v) : contains_(VALUE), value_(v) {}
explicit Try(T&& v) : contains_(VALUE), value_(std::move(v)) {}
explicit Try(std::exception_ptr e) : contains_(EXCEPTION), e_(e) {}
// move
Try(Try<T>&& t);
Try& operator=(Try<T>&& t);
// no copy
Try(const Try<T>& t) = delete;
Try& operator=(const Try<T>& t) = delete;
~Try();
T& value();
const T& value() const;
void throwIfFailed() const;
const T& operator*() const { return value(); }
T& operator*() { return value(); }
const T* operator->() const { return &value(); }
T* operator->() { return &value(); }
bool hasValue() const { return contains_ == VALUE; }
bool hasException() const { return contains_ == EXCEPTION; }
private:
Contains contains_;
union {
T value_;
std::exception_ptr e_;
};
};
template <>
class Try<void> {
public:
Try() : hasValue_(true) {}
explicit Try(std::exception_ptr e) : hasValue_(false), e_(e) {}
void value() const { throwIfFailed(); }
void operator*() const { return value(); }
inline void throwIfFailed() const;
bool hasValue() const { return hasValue_; }
bool hasException() const { return !hasValue_; }
private:
bool hasValue_;
std::exception_ptr e_;
};
/**
* Extracts value from try and returns it. Throws if try contained an exception.
*/
template <typename T>
T moveFromTry(wangle::Try<T>&& t);
/**
* Throws if try contained an exception.
*/
void moveFromTry(wangle::Try<void>&& t);
}}
#include "Try-inl.h"
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <exception>
namespace folly { namespace wangle {
class WangleException : public std::exception {
public:
explicit WangleException(std::string message_arg)
: message(message_arg) {}
~WangleException() throw(){}
virtual const char *what() const throw() {
return message.c_str();
}
bool operator==(const WangleException &other) const{
return other.message == this->message;
}
bool operator!=(const WangleException &other) const{
return !(*this == other);
}
protected:
std::string message;
};
class BrokenPromise : public WangleException {
public:
explicit BrokenPromise() :
WangleException("Broken promise") { }
};
class NoState : public WangleException {
public:
explicit NoState() : WangleException("No state") { }
};
class PromiseAlreadySatisfied : public WangleException {
public:
explicit PromiseAlreadySatisfied() :
WangleException("Promise already satisfied") { }
};
class FutureAlreadyRetrieved : public WangleException {
public:
explicit FutureAlreadyRetrieved () :
WangleException("Future already retrieved") { }
};
class UsingUninitializedTry : public WangleException {
public:
explicit UsingUninitializedTry() :
WangleException("Using unitialized try") { }
};
}}
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/Optional.h>
#include <stdexcept>
#include <atomic>
#include "Try.h"
#include "Promise.h"
#include "Future.h"
namespace folly { namespace wangle { namespace detail {
/** The shared state object for Future and Promise. */
template<typename T>
class FutureObject {
public:
FutureObject() = default;
// not copyable
FutureObject(FutureObject const&) = delete;
FutureObject& operator=(FutureObject const&) = delete;
// not movable (see comment in the implementation of Future::then)
FutureObject(FutureObject&&) = delete;
FutureObject& operator=(FutureObject&&) = delete;
Try<T>& valueTry() {
return *value_;
}
template <typename F>
void setContinuation(F func) {
if (continuation_) {
throw std::logic_error("setContinuation called twice");
}
if (value_.hasValue()) {
func(std::move(*value_));
delete this;
} else {
continuation_ = std::move(func);
}
}
void fulfil(Try<T>&& t) {
if (value_.hasValue()) {
throw std::logic_error("fulfil called twice");
}
if (continuation_) {
continuation_(std::move(t));
delete this;
} else {
value_ = std::move(t);
}
}
void setException(std::exception_ptr const& e) {
fulfil(Try<T>(e));
}
template <class E> void setException(E const& e) {
fulfil(Try<T>(std::make_exception_ptr<E>(e)));
}
bool ready() const {
return value_.hasValue();
}
typename std::add_lvalue_reference<T>::type value() {
return value_->value();
}
private:
folly::Optional<Try<T>> value_;
std::function<void(Try<T>&&)> continuation_;
};
template <typename... Ts>
struct VariadicContext {
VariadicContext() : total(0), count(0) {}
Promise<std::tuple<Try<Ts>... > > p;
std::tuple<Try<Ts>... > results;
size_t total;
std::atomic<size_t> count;
typedef Future<std::tuple<Try<Ts>...>> type;
};
template <typename... Ts, typename THead, typename... Fs>
typename std::enable_if<sizeof...(Fs) == 0, void>::type
whenAllVariadicHelper(VariadicContext<Ts...> *ctx, THead& head, Fs&... tail) {
head.setContinuation([ctx](Try<typename THead::value_type>&& t) {
const size_t i = sizeof...(Ts) - sizeof...(Fs) - 1;
std::get<i>(ctx->results) = std::move(t);
if (++ctx->count == ctx->total) {
ctx->p.setValue(std::move(ctx->results));
delete ctx;
}
});
}
template <typename... Ts, typename THead, typename... Fs>
typename std::enable_if<sizeof...(Fs) != 0, void>::type
whenAllVariadicHelper(VariadicContext<Ts...> *ctx, THead& head, Fs&... tail) {
head.setContinuation([ctx](Try<typename THead::value_type>&& t) {
const size_t i = sizeof...(Ts) - sizeof...(Fs) - 1;
std::get<i>(ctx->results) = std::move(t);
if (++ctx->count == ctx->total) {
ctx->p.setValue(std::move(ctx->results));
delete ctx;
}
});
whenAllVariadicHelper(ctx, tail...); // recursive template tail call
}
template <typename T>
struct WhenAllContext {
explicit WhenAllContext() : count(0), total(0) {}
Promise<std::vector<Try<T> > > p;
std::vector<Try<T> > results;
std::atomic<size_t> count;
size_t total;
};
template <typename T>
struct WhenAnyContext {
explicit WhenAnyContext(size_t n) : done(false), ref_count(n) {};
Promise<std::pair<size_t, Try<T>>> p;
std::atomic<bool> done;
std::atomic<size_t> ref_count;
void decref() {
if (--ref_count == 0) {
delete this;
}
}
};
}}} // namespace
This diff is collapsed.
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <gtest/gtest.h>
#include <thread>
#include "folly/wangle/ManualExecutor.h"
#include "folly/wangle/InlineExecutor.h"
#include "folly/wangle/Later.h"
using namespace folly::wangle;
struct ManualWaiter {
explicit ManualWaiter(std::shared_ptr<ManualExecutor> ex) : ex(ex) {}
void makeProgress() {
ex->wait();
ex->run();
}
std::shared_ptr<ManualExecutor> ex;
};
struct LaterFixture : public testing::Test {
LaterFixture() :
westExecutor(new ManualExecutor),
eastExecutor(new ManualExecutor),
waiter(new ManualWaiter(westExecutor)),
done(false)
{
t = std::thread([=] {
ManualWaiter eastWaiter(eastExecutor);
while (!done)
eastWaiter.makeProgress();
});
}
~LaterFixture() {
done = true;
eastExecutor->add([=]() { });
t.join();
}
Later<void> later;
std::shared_ptr<ManualExecutor> westExecutor;
std::shared_ptr<ManualExecutor> eastExecutor;
std::shared_ptr<ManualWaiter> waiter;
InlineExecutor inlineExecutor;
bool done;
std::thread t;
};
TEST(Later, construct_and_launch) {
bool fulfilled = false;
auto later = Later<void>().then([&](Try<void>&& t) {
fulfilled = true;
return makeFuture<int>(1);
});
// has not started yet.
EXPECT_FALSE(fulfilled);
EXPECT_EQ(later.launch().value(), 1);
EXPECT_TRUE(fulfilled);
}
TEST(Later, then_value) {
auto future = Later<int>(std::move(1))
.then([](Try<int>&& t) {
return t.value() == 1;
})
.launch();
EXPECT_TRUE(future.value());
}
TEST(Later, then_future) {
auto future = Later<int>(1)
.then([](Try<int>&& t) {
return makeFuture(t.value() == 1);
})
.launch();
EXPECT_TRUE(future.value());
}
TEST_F(LaterFixture, thread_hops) {
auto westThreadId = std::this_thread::get_id();
auto future = later.via(eastExecutor.get()).then([=](Try<void>&& t) {
EXPECT_NE(std::this_thread::get_id(), westThreadId);
return makeFuture<int>(1);
}).via(westExecutor.get()
).then([=](Try<int>&& t) {
EXPECT_EQ(std::this_thread::get_id(), westThreadId);
return t.value();
}).launch();
while (!future.isReady()) {
waiter->makeProgress();
}
EXPECT_EQ(future.value(), 1);
}
TEST_F(LaterFixture, chain_laters) {
auto westThreadId = std::this_thread::get_id();
auto future = later.via(eastExecutor.get()).then([=](Try<void>&& t) {
EXPECT_NE(std::this_thread::get_id(), westThreadId);
return makeFuture<int>(1);
}).then([=](Try<int>&& t) {
int val = t.value();
return Later<int>(std::move(val)).via(westExecutor.get())
.then([=](Try<int>&& t) mutable {
EXPECT_EQ(std::this_thread::get_id(), westThreadId);
return t.value();
});
}).then([=](Try<int>&& t) {
EXPECT_EQ(std::this_thread::get_id(), westThreadId);
return t.value();
}).launch();
while (!future.isReady()) {
waiter->makeProgress();
}
EXPECT_EQ(future.value(), 1);
}
TEST_F(LaterFixture, fire_and_forget) {
auto west = westExecutor.get();
later.via(eastExecutor.get()).then([=](Try<void>&& t) {
west->add([]() {});
}).fireAndForget();
waiter->makeProgress();
}
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <gtest/gtest.h>
#include <thread>
#include <future>
#include "folly/wangle/Executor.h"
#include "folly/wangle/ManualExecutor.h"
#include "folly/wangle/ThreadGate.h"
#include "folly/wangle/GenericThreadGate.h"
using namespace folly::wangle;
using std::make_shared;
using std::shared_ptr;
using std::thread;
using std::vector;
struct ManualWaiter {
explicit ManualWaiter(shared_ptr<ManualExecutor> ex) : ex(ex) {}
void makeProgress() {
ex->wait();
ex->run();
}
shared_ptr<ManualExecutor> ex;
};
struct GenericThreadGateFixture : public testing::Test {
GenericThreadGateFixture() :
westExecutor(new ManualExecutor),
eastExecutor(new ManualExecutor),
waiter(new ManualWaiter(westExecutor)),
tg(westExecutor, eastExecutor, waiter),
done(false)
{
t = thread([=] {
ManualWaiter eastWaiter(eastExecutor);
while (!done)
eastWaiter.makeProgress();
});
}
~GenericThreadGateFixture() {
done = true;
tg.gate<void>([] { return makeFuture(); });
t.join();
}
shared_ptr<ManualExecutor> westExecutor;
shared_ptr<ManualExecutor> eastExecutor;
shared_ptr<ManualWaiter> waiter;
GenericThreadGate<
shared_ptr<ManualExecutor>,
shared_ptr<ManualExecutor>,
shared_ptr<ManualWaiter>> tg;
bool done;
thread t;
};
TEST_F(GenericThreadGateFixture, gate_and_wait) {
auto f = tg.gate<void>([] { return makeFuture(); });
EXPECT_FALSE(f.isReady());
tg.waitFor(f);
EXPECT_TRUE(f.isReady());
}
TEST_F(GenericThreadGateFixture, gate_many) {
vector<Future<void>> fs;
int n = 10;
for (int i = 0; i < n; i++)
fs.push_back(tg.gate<void>([&] { return makeFuture(); }));
for (auto& f : fs)
EXPECT_FALSE(f.isReady());
auto all = whenAll(fs.begin(), fs.end());
tg.waitFor(all);
}
TEST_F(GenericThreadGateFixture, gate_alternating) {
vector<Promise<void>> ps(10);
vector<Future<void>> fs;
size_t count = 0;
for (auto& p : ps) {
auto* pp = &p;
auto f = tg.gate<void>([=] { return pp->getFuture(); });
// abuse the thread gate to do our dirty work in the other thread
tg.gate<void>([=] { pp->setValue(); return makeFuture(); });
fs.push_back(f.then([&](Try<void>&&) { count++; }));
}
for (auto& f : fs)
EXPECT_FALSE(f.isReady());
EXPECT_EQ(0, count);
auto all = whenAll(fs.begin(), fs.end());
tg.waitFor(all);
EXPECT_EQ(ps.size(), count);
}
TEST(GenericThreadGate, noWaiter) {
auto west = make_shared<ManualExecutor>();
auto east = make_shared<ManualExecutor>();
Promise<void> p;
auto dummyFuture = p.getFuture();
GenericThreadGate<ManualExecutor*, ManualExecutor*>
tg(west.get(), east.get());
EXPECT_THROW(tg.waitFor(dummyFuture), std::logic_error);
}
TEST_F(GenericThreadGateFixture, gate_with_promise) {
Promise<int> p;
auto westId = std::this_thread::get_id();
bool westThenCalled = false;
auto f = p.getFuture().then(
[westId, &westThenCalled](Try<int>&& t) {
EXPECT_EQ(t.value(), 1);
EXPECT_EQ(std::this_thread::get_id(), westId);
westThenCalled = true;
return t.value();
});
bool eastPromiseMade = false;
std::async(std::launch::async, [&p, &eastPromiseMade, this]() {
// South thread != west thread. p gets set in west thread.
tg.gate<int>([&p, &eastPromiseMade, this] {
EXPECT_EQ(t.get_id(), std::this_thread::get_id());
Promise<int> eastPromise;
auto eastFuture = eastPromise.getFuture();
eastPromise.setValue(1);
eastPromiseMade = true;
return eastFuture;
},
std::move(p));
});
tg.waitFor(f);
EXPECT_TRUE(westThenCalled);
EXPECT_TRUE(eastPromiseMade);
EXPECT_EQ(f.value(), 1);
}
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <gtest/gtest.h>
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment