Commit 0415fc2a authored by Kirk Shoop's avatar Kirk Shoop Committed by Facebook GitHub Bot

remove pushmi from folly

Summary: remove pushmi from folly

Reviewed By: yfeldblum

Differential Revision: D22531090

fbshipit-source-id: 3e6561f220cc3fd227dec321112ec848c4f17c6f
parent 9683c8db
......@@ -145,7 +145,6 @@ REMOVE_MATCHES_FROM_LISTS(files hfiles
MATCHES
"^${FOLLY_DIR}/build/"
"^${FOLLY_DIR}/experimental/exception_tracer/"
"^${FOLLY_DIR}/experimental/pushmi/"
"^${FOLLY_DIR}/futures/exercises/"
"^${FOLLY_DIR}/logging/example/"
"^${FOLLY_DIR}/(.*/)?test/"
......
......@@ -26,10 +26,6 @@
#include <folly/Utility.h>
namespace folly {
namespace pushmi {
// derive from this for types that need to find operator|() overloads by ADL
struct folly_pipeorigin {};
} // namespace pushmi
using Func = Function<void()>;
......@@ -89,8 +85,7 @@ class Executor {
* preserve the original Executor type.
*/
template <typename ExecutorT = Executor>
class KeepAlive : pushmi::folly_pipeorigin,
private detail::ExecutorKeepAliveBase {
class KeepAlive : private detail::ExecutorKeepAliveBase {
public:
using KeepAliveFunc = Function<void(KeepAlive&&)>;
......
---
DisableFormat: true
SortIncludes: false
...
# Pushmi
## pushing values around
This library is counterpart to [P1055 - *A Modest Executor Proposal*](http://wg21.link/p1055r0).
*pushmi* is a header-only library that uses git submodules for dependencies (`git clone --recursive`), uses CMake to build, requires compliant C++14 compiler to build and has dependencies on meta and catch2 and some other libraries for testing and examples.
[![godbolt](https://img.shields.io/badge/godbolt-master-brightgreen.svg?style=flat-square)](https://godbolt.org/z/vCUK0M)
*pushmi* is an implementation for prototyping how Futures, Executors can be defined with shared Concepts. These Concepts can be implemented over and over again to solve different problems and make different tradeoffs. User implementations of the Concepts are first-class citizens due to the attention to composition. Composition also enables each implementation of the Concepts to focus on one concern and then be composed to build more complex solutions.
## Build status
Travis-CI: [![Travis Build Status](https://travis-ci.org/facebookresearch/pushmi.svg?branch=master)](https://travis-ci.org/facebookresearch/pushmi)
## Callbacks
*Callbacks* are very familiar though they take many forms. It is precisely the multiplicity of forms that make Callbacks difficult to compose.
A minimal callback might be passed some state. The state might include an error or might only be a result code. Maybe this is delivered as one parameter, or as many parameters. Maybe the Callback is called once, or many or even zero times.
*Promises* provide a stable contract for Callbacks of a single result or error.
## `std::promise<>`
The interface for `std::promise<void>` is fairly straightforward.
```cpp
struct promise<void> {
void set_value();
void set_exception(std::exception_ptr);
future<void> get_future();
};
```
usage is also simple, but a bit convoluted (the promise produces the future, has the result set_ function called, and only then is future::get called to get the result).
```cpp
std::promise<void> p;
auto f = p.get_future();
p.set_value();
// or
// p.set_exception(std::exception_ptr{});
f.get();
```
it is this convolution that creates the race between the producer and consumer that requires expensive internal state to resolve.
## `receiver`
The `receiver` type in the library provides simple ways to construct new implementations of the Receiver concept.
construct a receiver_tag type that accepts any value or error type (and aborts on error)
```cpp
receiver<> s;
```
construct new type using one or more lambdas, or with designated initializers, use multiple lambdas to build overload sets
```cpp
// provide done
auto s0 = receiver{on_done{[](){}}};
// provide value
auto s1 = receiver{[](auto v){}};
auto s2 = receiver{on_value{[](int){}, [](auto v){}}};
// these are quite dangerous as they suppress errors
// provide error
auto s3 = receiver{[](auto v){}, [](std::exception_ptr){}, [](){}};
auto s4 = receiver{on_error{[](std::exception_ptr){}}, on_done{[](){}}};
auto s5 = receiver{on_error{[](std::exception_ptr){}, [](auto){}}};
auto s6 = receiver{on_error{[](std::exception_ptr){}}};
```
construct a new type with shared state across the lambdas. very useful for building a filter on top of an existing receiver. The state must be a Receiver, but can be a super-set with additional state for this filter.
```cpp
auto s0 = receiver{receiver{}};
auto s1 = receiver{receiver{}, on_done{
[](receiver<>& out, std::exception_ptr ep){out | set_done();}}};
auto s2 = receiver{receiver{},
[](receiver<>& out, auto v){out | set_value(v);};
auto s3 = receiver{receiver{}, on_value{
[](receiver<>& out, int v){out | set_value(v);},
[](receiver<>& out, auto v){out | set_value(v);}}};
// these are quite dangerous as they suppress errors
auto s4 = receiver{receiver{},
[](){}
[](receiver<>& out, std::exception_ptr ep){out | set_done();},
[](receiver<>&){out | set_done();}};
auto s5 = receiver{receiver{}, on_error{
[](receiver<>& out, std::exception_ptr ep){out | set_done();},
[](receiver<>& out, auto e){out | set_done();}}};
```
construct a type-erased type for a particular T & E (each of which could be a std::variant of supported types). I have a plan to provide operators to collapse values and errors to variant or tuple and then expand from variant or tuple back to their constituent values/errors.
```cpp
auto s0 = any_receiver<std::exception_ptr, int>{receiver{}};
auto s1 = any_receiver<std::exception_ptr, int>{receiver{}};
```
## `single_sender`
The `single_sender` type in the library provides simple ways to construct new implementations of the SingleSender concept.
construct a producer of nothing, aka `never()`
```cpp
single_sender<> sd;
```
construct new type using one or more lambdas, or with designated initializers, use multiple lambdas to build overload sets
```cpp
auto sd0 = single_sender{on_submit{[](auto out){}}};
auto sd1 = single_sender{[](auto out){}};
auto sd2 = single_sender{on_submit{[](receiver<> out){}, [](auto out){}}};
```
construct a new type with shared state across the lambdas. very useful for building a filter on top of an existing single_sender. The state must be a SingleSender, but can be a super-set with additional state for this filter.
```cpp
auto sd0 = single_sender{single_sender{}};
auto sd1 = single_sender{single_sender{}, on_submit{
[](single_sender<>& in, auto out){in | submit(out);}}};
auto sd2 = single_sender{single_sender{},
[](single_sender<>& in, auto out){in | submit(out);}};
```
construct a type-erased type for a particular T & E (which could be a std::variant of supported types). I have a plan to provide operators to collapse values and errors to variant or tuple and then expand from variant or tuple back to their constituent values/errors.
```cpp
auto sd0 = any_single_sender<std::exception_ptr, int>{single_sender{}};
auto sd1 = any_single_sender<std::exception_ptr, int>{single_sender{}};
```
## `time_single_sender`
The `time_single_sender` type in the library provides simple ways to construct new implementations of the TimeSingleSender concept.
construct a producer of nothing, aka `never()`
```cpp
time_single_sender<> tsd;
```
construct new type using one or more lambdas, or with designated initializers, use multiple lambdas to build overload sets
```cpp
auto tsd0 = time_single_sender{on_submit{[](auto at, auto out){}}};
auto tsd1 = time_single_sender{[](auto at, auto out){}};
auto tsd2 = time_single_sender{on_submit{[](auto at, receiver<> out){}, [](auto at, auto out){}}};
```
construct a new type with shared state across the lambdas. very useful for building a filter on top of an existing time_single_sender. The state must be a SingleSender, but can be a super-set with additional state for this filter.
```cpp
auto tsd0 = time_single_sender{single_sender{}};
auto tsd1 = time_single_sender{single_sender{}, on_submit{
[](time_single_sender<>& in, auto at, auto out){in | submit(at, out);}}};
auto tsd2 = time_single_sender{single_sender{},
[](time_single_sender<>& in, auto at, auto out){in | submit(at, out);}};
```
construct a type-erased type for a particular T & E (which could be a std::variant of supported types). I have a plan to provide operators to collapse values and errors to variant or tuple and then expand from variant or tuple back to their constituent values/errors.
```cpp
auto tsd0 = any_time_single_sender<std::exception_ptr, std::system_clock::time_point, int>{time_single_sender{}};
auto tsd1 = any_time_single_sender<std::exception_ptr, std::system_clock::time_point, int>{time_single_sender{}};
```
## put it all together with some algorithms
[![godbolt](https://img.shields.io/badge/godbolt-master-brightgreen.svg?style=flat-square)](https://godbolt.org/z/vCUK0M)
### Executor
```cpp
auto nt = new_thread();
nt | blocking_submit([](auto nt){
nt |
transform([](auto nt){ return 42; }) | submit([](int){}) |
transform([](int fortyTwo){ return "42"s; }) | submit([](std::string){});
});
```
### Single
```cpp
auto fortyTwo = just(42) |
transform([](auto v){ return std::to_string(v); }) |
on(new_thread) |
via(new_thread) |
get<std::string>;
just(42) |
transform([](auto v){ return std::to_string(v); }) |
on(new_thread) |
via(new_thread) |
blocking_submit([](std::string){});
```
### Many
[![godbolt](https://img.shields.io/badge/godbolt-master-brightgreen.svg?style=flat-square)](https://godbolt.org/z/woVAi9)
```cpp
auto values = std::array<int, 5>{4, 20, 7, 3, 8};
auto f = op::from(values) |
op::submit([&](int){});
```
### FlowMany
[![godbolt](https://img.shields.io/badge/godbolt-master-brightgreen.svg?style=flat-square)](https://godbolt.org/z/woVAi9)
```cpp
auto values = std::array<int, 5>{4, 20, 7, 3, 8};
auto f = op::flow_from(values) |
op::for_each([&](int){});
```
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <utility>
#include <type_traits>
#include <folly/Portability.h>
#include <folly/Traits.h>
#if FOLLY_HAS_COROUTINES
#include <folly/experimental/coro/Traits.h>
#endif
#include <folly/experimental/pushmi/detail/concept_def.h>
#include <folly/experimental/pushmi/forwards.h>
#include <folly/experimental/pushmi/sender/detail/concepts.h>
#include <folly/experimental/pushmi/traits.h>
namespace folly {
namespace pushmi {
#if FOLLY_HAS_COROUTINES
namespace detail {
PUSHMI_CONCEPT_DEF(
template(typename Tp)
concept AwaitableLike_,
SemiMovable<Tp> && coro::is_awaitable_v<Tp>
);
template<typename T>
struct awaitable_sender_traits_impl : awaitable_senders::sender_adl_hook {
template<template<class...> class Tuple, template<class...> class Variant>
using value_types = Variant<Tuple<T>>;
template<template<class...> class Variant>
using error_type = Variant<std::exception_ptr>;
};
template<>
struct awaitable_sender_traits_impl<void> : awaitable_senders::sender_adl_hook {
template<template<class...> class Tuple, template<class...> class Variant>
using value_types = Variant<Tuple<>>;
template<template<class...> class Variant>
using error_type = Variant<std::exception_ptr>;
};
template<class T>
struct IsAwaitableLike_
: std::integral_constant<bool, detail::AwaitableLike_<T>> {
};
std::true_type safe_to_test_awaitable(void const*);
template<
typename T,
// Don't ask if a type is awaitable if it inherits from
// awaitable_senders::sender_adl_hook, because that will find the
// default operator co_await that is constrained with TypedSingleSender
// and cause a recursive template instantiation.
bool = Conjunction<
decltype(safe_to_test_awaitable(static_cast<T*>(nullptr))),
IsAwaitableLike_<T>>::value>
struct awaitable_sender_traits {
};
template<typename T>
struct awaitable_sender_traits<T, true>
: awaitable_sender_traits_impl<coro::await_result_t<T>> {
using sender_category = single_sender_tag;
};
} // namespace detail
PUSHMI_CONCEPT_DEF(
template(typename Tp)
concept Awaiter,
coro::is_awaiter_v<Tp>
);
PUSHMI_CONCEPT_DEF(
template(typename Tp)
concept Awaitable,
detail::AwaitableLike_<Tp> &&
SingleTypedSender<Tp>
);
PUSHMI_CONCEPT_DEF(
template(typename Tp, typename Result)
concept AwaitableOf,
Awaitable<Tp> && ConvertibleTo<coro::await_result_t<Tp>, Result>
);
#else // if FOLLY_HAS_COROUTINES
namespace detail {
template <class>
using awaitable_sender_traits = awaitable_senders::sender_adl_hook;
} // namespace detail
#endif // if FOLLY_HAS_COROUTINES
namespace detail {
template<class S>
struct basic_typed_sender_traits : awaitable_senders::sender_adl_hook {
template<template<class...> class Tuple, template<class...> class Variant>
using value_types = typename S::template value_types<Tuple, Variant>;
template<template<class...> class Variant>
using error_type = typename S::template error_type<Variant>;
};
template<class S, bool = SenderLike_<S>>
struct basic_sender_traits : awaitable_sender_traits<S> {
};
template<class S>
struct basic_sender_traits<S, true>
: std::conditional_t<
TypedSenderLike_<S>,
basic_typed_sender_traits<S>,
awaitable_sender_traits<S>> {
using sender_category = typename S::sender_category;
};
} // namespace detail
template<typename S, typename>
struct sender_traits
: std::conditional_t<
std::is_same<std::decay_t<S>, S>::value,
detail::basic_sender_traits<S>,
sender_traits<std::decay_t<S>>> {
using _not_specialized = void;
};
} // namespace pushmi
} // namespace folly
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <cassert>
#include <exception>
#include <type_traits>
#include <utility>
#include <folly/Portability.h>
#include <folly/experimental/pushmi/awaitable/concepts.h>
#include <folly/experimental/pushmi/sender/detail/concepts.h>
#include <folly/experimental/pushmi/sender/primitives.h>
#include <folly/experimental/pushmi/sender/properties.h>
#include <folly/experimental/pushmi/traits.h>
#include <folly/experimental/pushmi/receiver/tags.h>
#if FOLLY_HAS_COROUTINES
#include <experimental/coroutine>
#include <folly/experimental/coro/detail/ManualLifetime.h>
#endif
namespace folly {
namespace pushmi {
struct operation_cancelled : std::exception {
virtual const char* what() const noexcept {
return "operation cancelled";
}
};
#if FOLLY_HAS_COROUTINES
PUSHMI_TEMPLATE(class From)( //
requires SingleTypedSender<From> //
) //
struct sender_awaiter {
private:
using value_type = sender_values_t<From, detail::identity_or_void_t>;
using coro_handle = std::experimental::coroutine_handle<>;
std::add_pointer_t<From> sender_{};
enum class state { empty, value, exception };
coro_handle continuation_{};
state state_ = state::empty;
union {
coro::detail::ManualLifetime<value_type> value_{};
coro::detail::ManualLifetime<std::exception_ptr> exception_;
};
using is_always_blocking = property_query<From, is_always_blocking<>>;
struct internal_receiver {
sender_awaiter *this_;
using receiver_category = receiver_tag;
PUSHMI_TEMPLATE(class U)( //
requires ConvertibleTo<U, value_type> //
) //
void value(U&& value)
noexcept(std::is_nothrow_constructible<value_type, U>::value) {
this_->value_.construct(static_cast<U&&>(value));
this_->state_ = state::value;
}
PUSHMI_TEMPLATE(class V = value_type)( //
requires std::is_void<V>::value //
) //
void value() noexcept {
this_->value_.construct();
this_->state_ = state::value;
}
void done() noexcept {
if (!is_always_blocking::value)
this_->continuation_.resume();
}
template<typename Error>
void error(Error error) noexcept {
assert(this_->state_ != state::value);
PUSHMI_IF_CONSTEXPR( (std::is_same<Error, std::exception_ptr>::value) (
this_->exception_.construct(std::move(error));
) else (
this_->exception_.construct(std::make_exception_ptr(std::move(error)));
))
this_->state_ = state::exception;
if (!is_always_blocking::value)
this_->continuation_.resume();
}
};
public:
sender_awaiter() {}
sender_awaiter(From&& sender) noexcept
: sender_(std::addressof(sender))
{}
sender_awaiter(sender_awaiter &&that)
noexcept(std::is_nothrow_move_constructible<value_type>::value ||
std::is_void<value_type>::value)
: sender_(std::exchange(that.sender_, nullptr))
, continuation_{std::exchange(that.continuation_, {})}
, state_(that.state_) {
if (that.state_ == state::value) {
PUSHMI_IF_CONSTEXPR( (!std::is_void<value_type>::value) (
id(value_).construct(std::move(that.value_).get());
) else (
))
that.value_.destruct();
that.state_ = state::empty;
} else if (that.state_ == state::exception) {
exception_.construct(std::move(that.exception_).get());
that.exception_.destruct();
that.state_ = state::empty;
}
}
~sender_awaiter() {
if (state_ == state::value) {
value_.destruct();
} else if (state_ == state::exception) {
exception_.destruct();
}
}
static constexpr bool await_ready() noexcept {
return false;
}
// Add detection and handling of blocking completion here, and
// return 'false' from await_suspend() in that case rather than
// potentially recursively resuming the awaiting coroutine which
// could eventually lead to a stack-overflow.
using await_suspend_result_t =
std::conditional_t<is_always_blocking::value, bool, void>;
await_suspend_result_t await_suspend(coro_handle continuation) noexcept {
continuation_ = continuation;
pushmi::submit(static_cast<From&&>(*sender_), internal_receiver{this});
return await_suspend_result_t(); // return false or void
}
decltype(auto) await_resume() {
if (state_ == state::exception) {
std::rethrow_exception(std::move(exception_).get());
} else if (state_ == state::empty) {
throw operation_cancelled{};
} else {
return std::move(value_).get();
}
}
};
#if __cpp_deduction_guides >= 201703
template<typename From>
sender_awaiter(From&&) -> sender_awaiter<From>;
#endif
namespace awaitable_senders {
// Any TypedSender that inherits from `sender` or `sender_traits` is
// automatically awaitable with the following operator co_await through the
// magic of associated namespaces. To make any other TypedSender awaitable,
// from within the body of the awaiting coroutine, do:
// `using namespace ::folly::pushmi::awaitable_senders;`
PUSHMI_TEMPLATE(class From)( //
requires not Awaiter<From> && SingleTypedSender<From> //
) //
sender_awaiter<From> operator co_await(From&& from) {
return static_cast<From&&>(from);
}
} // namespace awaitable_senders
#endif // FOLLY_HAS_COROUTINES
} // namespace pushmi
} // namespace folly
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/experimental/pushmi/customizations/ScheduledExecutor.h>
#include <folly/experimental/pushmi/customizations/SequencedExecutor.h>
#include <folly/experimental/pushmi/executor/executor.h>
#include <folly/io/async/EventBase.h>
namespace folly {
namespace pushmi {
template <class T>
struct property_set_traits_disable<
T,
::folly::Executor,
typename std::enable_if<
std::is_base_of<::folly::EventBase, T>::value>::type> : std::true_type {
};
template <class T>
struct property_set_traits_disable<
T,
::folly::SequencedExecutor,
typename std::enable_if<
std::is_base_of<::folly::EventBase, T>::value>::type> : std::true_type {
};
template <class T>
struct property_set_traits_disable<
T,
::folly::ScheduledExecutor,
typename std::enable_if<
std::is_base_of<::folly::EventBase, T>::value>::type> : std::true_type {
};
template <class T>
struct property_set_traits<
::folly::Executor::KeepAlive<T>,
typename std::enable_if<
std::is_base_of<::folly::EventBase, T>::value>::type> {
using properties = property_set<is_fifo_sequence<>>;
};
} // namespace pushmi
} // namespace folly
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/Executor.h>
#include <folly/experimental/pushmi/executor/executor.h>
namespace folly {
template <typename ExecutorT>
class ExecutorTask : public pushmi::single_sender_tag::with_values<
Executor::KeepAlive<ExecutorT>>::no_error,
public pushmi::pipeorigin {
Executor::KeepAlive<ExecutorT> ex_;
public:
explicit ExecutorTask(Executor::KeepAlive<ExecutorT> ex)
: ex_(std::move(ex)) {}
// assume the worst - specialize pushmi::property_set_traits<> to strengthen
using properties = pushmi::property_set<pushmi::is_maybe_blocking<>>;
template <class TaskReceiver>
void submit(TaskReceiver&& out) && {
// capturing ex into a function stored in ex is a circular reference.
// the only way to break the reference is to destroy the function
ExecutorT* ep = ex_.get();
ep->add([ex = std::move(ex_), pout = (TaskReceiver &&) out]() mutable {
pushmi::set_value(pout, std::move(ex));
pushmi::set_done(pout);
});
}
};
// a derived class can disable the traits definitions by adding the following
//
// namespace pushmi {
// template <class T>
// struct property_set_traits_disable<
// T,
// ::folly::Executor,
// typename std::enable_if<
// std::is_base_of<Derived, T>::value>::type> : std::true_type {
// };
// } // namespace pushmi
//
namespace pushmi {
template <class T>
struct property_set_traits<
::folly::Executor::KeepAlive<T>,
typename std::enable_if<
std::is_base_of<::folly::Executor, T>::value &&
!property_set_traits_disable_v<T, ::folly::Executor>>::type> {
// assume the worst - specialize pushmi::property_set_traits<> to strengthen
// these
using properties = property_set<is_concurrent_sequence<>>;
};
} // namespace pushmi
/// create a sender that will enqueue a call to value() and done() of the
/// receiver on this Executor's execution context. Value will be passed a
/// copy of this Executor. This adds support for Executors to compose with
/// other sender/receiver implementations including algorithms
template <
class ExecutorT,
typename std::enable_if<
std::is_base_of<::folly::Executor, ExecutorT>::value,
int>::type = 0>
ExecutorTask<ExecutorT> schedule(::folly::Executor::KeepAlive<ExecutorT>& se) {
return ExecutorTask<ExecutorT>{se};
}
template <
class ExecutorT,
typename std::enable_if<
std::is_base_of<::folly::Executor, ExecutorT>::value,
int>::type = 0>
ExecutorTask<ExecutorT> schedule(::folly::Executor::KeepAlive<ExecutorT>&& se) {
return ExecutorTask<ExecutorT>{std::move(se)};
}
} // namespace folly
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/executors/InlineExecutor.h>
#include <folly/experimental/pushmi/customizations/Executor.h>
#include <folly/experimental/pushmi/executor/executor.h>
namespace folly {
namespace pushmi {
template <>
struct property_set_traits<
::folly::Executor::KeepAlive<::folly::InlineExecutor>> {
using properties = property_set<is_fifo_sequence<>>;
};
template <>
struct property_set_traits<::folly::ExecutorTask<::folly::InlineExecutor>> {
using properties = property_set<is_always_blocking<>>;
};
} // namespace pushmi
} // namespace folly
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/executors/QueuedImmediateExecutor.h>
#include <folly/experimental/pushmi/customizations/Executor.h>
#include <folly/experimental/pushmi/executor/executor.h>
namespace folly {
namespace pushmi {
template <>
struct property_set_traits<
::folly::Executor::KeepAlive<::folly::QueuedImmediateExecutor>> {
using properties = property_set<is_fifo_sequence<>>;
};
template <>
struct property_set_traits<
::folly::ExecutorTask<::folly::QueuedImmediateExecutor>> {
using properties = property_set<is_maybe_blocking<>>;
};
} // namespace pushmi
} // namespace folly
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/executors/ScheduledExecutor.h>
#include <folly/experimental/pushmi/customizations/Executor.h>
#include <folly/experimental/pushmi/executor/executor.h>
#include <folly/experimental/pushmi/executor/time_executor.h>
namespace folly {
template <typename ExecutorT>
class ScheduledExecutorTask : public pushmi::single_sender_tag::with_values<
Executor::KeepAlive<ExecutorT>>::no_error,
public pushmi::pipeorigin {
using TimePoint = typename ScheduledExecutor::TimePoint;
Executor::KeepAlive<ExecutorT> ex_;
TimePoint at_;
public:
explicit ScheduledExecutorTask(
::folly::Executor::KeepAlive<ExecutorT> ex,
TimePoint at)
: ex_(std::move(ex)), at_(at) {}
using properties = pushmi::property_set<pushmi::is_never_blocking<>>;
template <class TaskReceiver>
void submit(TaskReceiver&& out) && {
// capturing ex into a function stored in ex is a circular reference.
// the only way to break the reference is to destroy the function
ExecutorT* ep = ex_.get();
ep->scheduleAt(
Func{[ex = std::move(ex_), pout = (TaskReceiver &&) out]() mutable {
pushmi::set_value(pout, std::move(ex));
pushmi::set_done(pout);
}},
std::move(at_));
}
};
template <
class ExecutorT,
typename std::enable_if<
std::is_base_of<::folly::ScheduledExecutor, ExecutorT>::value,
int>::type = 0>
auto top(::folly::Executor::KeepAlive<ExecutorT>& se) {
return se->now();
}
template <
class ExecutorT,
typename std::enable_if<
std::is_base_of<::folly::ScheduledExecutor, ExecutorT>::value,
int>::type = 0>
auto schedule(
::folly::Executor::KeepAlive<ExecutorT>& se,
typename ScheduledExecutor::TimePoint at) {
return ScheduledExecutorTask<ExecutorT>{se, std::move(at)};
}
template <
class ExecutorT,
typename std::enable_if<
std::is_base_of<::folly::ScheduledExecutor, ExecutorT>::value,
int>::type = 0>
auto schedule(
::folly::Executor::KeepAlive<ExecutorT>&& se,
typename ScheduledExecutor::TimePoint at) {
return ScheduledExecutorTask<ExecutorT>{std::move(se), std::move(at)};
}
// a derived class can disable the traits definitions by adding the following
//
// namespace pushmi {
// template <class T>
// struct property_set_traits_disable<
// T,
// ::folly::ScheduledExecutor,
// typename std::enable_if<
// std::is_base_of<Derived, T>::value>::type> : std::true_type {
// };
// } // namespace pushmi
//
namespace pushmi {
template <class T>
struct property_set_traits_disable<
T,
::folly::Executor,
typename std::enable_if<
std::is_base_of<::folly::ScheduledExecutor, T>::value &&
!property_set_traits_disable_v<T, ::folly::ScheduledExecutor>>::type>
: std::true_type {};
template <class T>
struct property_set_traits<
::folly::Executor::KeepAlive<T>,
typename std::enable_if<
std::is_base_of<::folly::ScheduledExecutor, T>::value &&
!property_set_traits_disable_v<T, ::folly::ScheduledExecutor>>::type> {
using properties = property_set<is_fifo_sequence<>>;
};
} // namespace pushmi
} // namespace folly
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/executors/SequencedExecutor.h>
#include <folly/experimental/pushmi/customizations/Executor.h>
#include <folly/experimental/pushmi/executor/executor.h>
namespace folly {
// a derived class can disable the traits definitions by adding the following
//
// namespace pushmi {
// template <class T>
// struct property_set_traits_disable<
// T,
// ::folly::SequencedExecutor,
// typename std::enable_if<
// std::is_base_of<Derived, T>::value>::type> : std::true_type {
// };
// } // namespace pushmi
//
namespace pushmi {
template <class T>
struct property_set_traits_disable<
T,
::folly::Executor,
typename std::enable_if<
std::is_base_of<::folly::SequencedExecutor, T>::value &&
!property_set_traits_disable_v<T, ::folly::SequencedExecutor>>::type>
: std::true_type {};
template <class T>
struct property_set_traits<
::folly::Executor::KeepAlive<T>,
typename std::enable_if<
std::is_base_of<::folly::SequencedExecutor, T>::value &&
!property_set_traits_disable_v<T, ::folly::SequencedExecutor>>::type> {
using properties = property_set<is_fifo_sequence<>>;
};
template <class T>
struct property_set_traits<
::folly::ExecutorTask<T>,
typename std::enable_if<
std::is_base_of<::folly::SequencedExecutor, T>::value &&
!property_set_traits_disable_v<T, ::folly::SequencedExecutor>>::type> {
using properties = property_set<is_maybe_blocking<>>;
};
} // namespace pushmi
} // namespace folly
This diff is collapsed.
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <utility>
#include <folly/experimental/pushmi/detail/concept_def.h>
#include <folly/experimental/pushmi/detail/traits.h>
#include <folly/functional/Invoke.h>
namespace folly {
namespace pushmi {
/* using override */ using folly::invoke;
/* using override */ using folly::invoke_result;
/* using override */ using folly::invoke_result_t;
/* using override */ using folly::is_invocable;
/* using override */ using folly::is_invocable_r;
/* using override */ using folly::is_nothrow_invocable;
/* using override */ using folly::is_nothrow_invocable_r;
PUSHMI_CONCEPT_DEF(
template(class F, class... Args) //
(concept Invocable)(F, Args...), //
requires(F&& f)( //
pushmi::invoke((F &&) f, std::declval<Args>()...) //
) //
);
PUSHMI_CONCEPT_DEF(
template(class F, class Ret, class... Args) //
(concept _InvocableR)(F, Ret, Args...), //
Invocable<F, Args...>&& ConvertibleTo<invoke_result_t<F, Args...>, Ret> //
);
PUSHMI_CONCEPT_DEF(
template(class F, class... Args) //
(concept NothrowInvocable)(F, Args...), //
requires(F&& f)( //
requires_<
noexcept(pushmi::invoke((F &&) f, std::declval<Args>()...))> //
) &&
Invocable<F, Args...> //
);
//
// construct_deduced
//
// For emulating CTAD on compilers that don't support it. Must be specialized.
template <template <class...> class T>
struct construct_deduced;
template <template <class...> class T, class... AN>
using deduced_type_t = invoke_result_t<construct_deduced<T>, AN...>;
//
// overload
//
// inspired by Ovrld - shown in a presentation by Nicolai Josuttis
#if __cpp_variadic_using >= 201611 && __cpp_concepts
template <SemiMovable... Fns>
requires sizeof...(Fns) > 0 //
struct overload_fn : Fns... {
constexpr overload_fn() = default;
constexpr explicit overload_fn(Fns... fns) //
requires sizeof...(Fns) == 1 : Fns(std::move(fns))... {}
constexpr overload_fn(Fns... fns) requires sizeof...(Fns) > 1
: Fns(std::move(fns))... {}
using Fns::operator()...;
};
#else
template <PUSHMI_TYPE_CONSTRAINT(SemiMovable)... Fns>
#if __cpp_concepts
requires sizeof...(Fns) > 0
#endif
struct overload_fn;
template <class Fn>
struct overload_fn<Fn> : Fn {
constexpr overload_fn() = default;
constexpr explicit overload_fn(Fn fn) : Fn(std::move(fn)) {}
constexpr overload_fn(overload_fn&&) = default;
constexpr overload_fn& operator=(overload_fn&&) = default;
constexpr overload_fn(const overload_fn&) = default;
constexpr overload_fn& operator=(const overload_fn&) = default;
using Fn::operator();
};
#if !defined(__GNUC__) || __GNUC__ >= 8
template <class Fn, class... Fns>
struct overload_fn<Fn, Fns...> : Fn, overload_fn<Fns...> {
constexpr overload_fn() = default;
constexpr overload_fn(Fn fn, Fns... fns)
: Fn(std::move(fn)), overload_fn<Fns...>{std::move(fns)...} {}
constexpr overload_fn(overload_fn&&) = default;
constexpr overload_fn& operator=(overload_fn&&) = default;
constexpr overload_fn(const overload_fn&) = default;
constexpr overload_fn& operator=(const overload_fn&) = default;
using Fn::operator();
using overload_fn<Fns...>::operator();
};
#else
template <class Fn, class... Fns>
struct overload_fn<Fn, Fns...> {
private:
std::pair<Fn, overload_fn<Fns...>> fns_;
template <bool B>
using _which_t = std::conditional_t<B, Fn, overload_fn<Fns...>>;
public:
constexpr overload_fn() = default;
constexpr overload_fn(Fn fn, Fns... fns)
: fns_{std::move(fn), overload_fn<Fns...>{std::move(fns)...}} {}
constexpr overload_fn(overload_fn&&) = default;
constexpr overload_fn& operator=(overload_fn&&) = default;
constexpr overload_fn(const overload_fn&) = default;
constexpr overload_fn& operator=(const overload_fn&) = default;
PUSHMI_TEMPLATE(class... Args) //
(requires lazy::Invocable<Fn&, Args...> ||
lazy::Invocable<overload_fn<Fns...>&, Args...>) //
decltype(auto)
operator()(Args&&... args) noexcept(
noexcept(std::declval<_which_t<Invocable<Fn&, Args...>>&>()(
std::declval<Args>()...))) {
return std::get<!Invocable<Fn&, Args...>>(fns_)((Args &&) args...);
}
PUSHMI_TEMPLATE(class... Args) //
(requires lazy::Invocable<const Fn&, Args...> ||
lazy::Invocable<const overload_fn<Fns...>&, Args...>) //
decltype(auto)
operator()(Args&&... args) const noexcept(noexcept(
std::declval<const _which_t<Invocable<const Fn&, Args...>>&>()(
std::declval<Args>()...))) {
return std::get<!Invocable<const Fn&, Args...>>(fns_)((Args &&) args...);
}
};
#endif
#endif
template <class... Fns>
auto overload(Fns... fns) -> overload_fn<Fns...> {
return overload_fn<Fns...>{std::move(fns)...};
}
} // namespace pushmi
} // namespace folly
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
// Usage:
//
// PUSHMI_IF_CONSTEXPR((condition)(
// stmt1;
// stmt2;
// ) else (
// stmt3;
// stmt4;
// ))
//
// If the statements could potentially be ill-formed, you can give some
// part of the expression a dependent type by wrapping it in `id`. For
/**
* Maybe_unused indicates that a function, variable or parameter might or
* might not be used, e.g.
*
* int foo(FOLLY_MAYBE_UNUSED int x) {
* #ifdef USE_X
* return x;
* #else
* return 0;
* #endif
* }
*/
#if defined(__GNUC__)
#define PUSHMI_CEXP_NOT_ON_WINDOWS 1
#else
#define PUSHMI_CEXP_NOT_ON_WINDOWS 0
#endif
#ifndef __has_attribute
#define PUSHMI_CEXP_HAS_ATTRIBUTE(x) 0
#else
#define PUSHMI_CEXP_HAS_ATTRIBUTE(x) __has_attribute(x)
#endif
#ifndef __has_cpp_attribute
#define PUSHMI_CEXP_HAS_CPP_ATTRIBUTE(x) 0
#else
#define PUSHMI_CEXP_HAS_CPP_ATTRIBUTE(x) __has_cpp_attribute(x)
#endif
#if PUSHMI_CEXP_HAS_CPP_ATTRIBUTE(maybe_unused)
#define PUSHMI_CEXP_MAYBE_UNUSED [[maybe_unused]]
#elif PUSHMI_CEXP_HAS_ATTRIBUTE(__unused__) || __GNUC__
#define PUSHMI_CEXP_MAYBE_UNUSED __attribute__((__unused__))
#else
#define PUSHMI_CEXP_MAYBE_UNUSED
#endif
// disable buggy compatibility warning about "requires" and "concept" being
// C++20 keywords.
#if PUSHMI_CEXP_NOT_ON_WINDOWS
#define PUSHMI_CEXP_IGNORE_SHADOW_BEGIN \
_Pragma("GCC diagnostic push") \
_Pragma("GCC diagnostic ignored \"-Wshadow\"") \
_Pragma("GCC diagnostic ignored \"-Wshadow-local\"") \
/**/
#define PUSHMI_CEXP_IGNORE_SHADOW_END \
_Pragma("GCC diagnostic pop")
#else
#define PUSHMI_CEXP_IGNORE_SHADOW_BEGIN
#define PUSHMI_CEXP_IGNORE_SHADOW_END
#endif
#define PUSHMI_CEXP_COMMA ,
#if PUSHMI_CEXP_NOT_ON_WINDOWS
#define PUSHMI_CEXP_EVAL(X, ...) X(__VA_ARGS__)
#define PUSHMI_CEXP_STRIP(...) __VA_ARGS__
#else
// https://stackoverflow.com/questions/5134523/msvc-doesnt-expand-va-args-correctly
#define PUSHMI_CEXP_VA_ARGS_EXPANDER_INNER(X) X
#define PUSHMI_CEXP_VA_ARGS_EXPANDER(X) PUSHMI_CEXP_VA_ARGS_EXPANDER_INNER(X)
#define PUSHMI_CEXP_EVAL(X, ...) PUSHMI_CEXP_VA_ARGS_EXPANDER(X(__VA_ARGS__))
#define PUSHMI_CEXP_STRIP(...) PUSHMI_CEXP_VA_ARGS_EXPANDER(__VA_ARGS__)
#endif
namespace folly {
namespace pushmi {
namespace detail {
struct id_fn {
constexpr explicit operator bool() const noexcept {
return false;
}
template <class T>
constexpr T&& operator()(T&& t) const noexcept {
return (T&&) t;
}
};
} // namespace detail
} // namespace pushmi
} // namespace folly
#if __cpp_if_constexpr >= 201606
#define PUSHMI_IF_CONSTEXPR(LIST) \
if constexpr (::folly::pushmi::detail::id_fn id = {}) { \
} else if constexpr \
PUSHMI_CEXP_EVAL(PUSHMI_IF_CONSTEXPR_ELSE_, PUSHMI_IF_CONSTEXPR_IF_ LIST)
#define PUSHMI_IF_CONSTEXPR_RETURN(LIST)\
PUSHMI_CEXP_IGNORE_SHADOW_BEGIN \
PUSHMI_IF_CONSTEXPR(LIST)\
PUSHMI_CEXP_IGNORE_SHADOW_END \
/**/
#define PUSHMI_IF_CONSTEXPR_IF_(...) \
(__VA_ARGS__) PUSHMI_CEXP_COMMA PUSHMI_IF_CONSTEXPR_THEN_
#define PUSHMI_IF_CONSTEXPR_THEN_(...) \
({__VA_ARGS__}) PUSHMI_CEXP_COMMA
#define PUSHMI_IF_CONSTEXPR_ELSE_(A, B, C) \
A PUSHMI_CEXP_STRIP B PUSHMI_IF_CONSTEXPR_ ## C
#define PUSHMI_IF_CONSTEXPR_else(...) \
else {__VA_ARGS__}
#else
#include <type_traits>
#define PUSHMI_IF_CONSTEXPR(LIST)\
PUSHMI_CEXP_IGNORE_SHADOW_BEGIN \
PUSHMI_CEXP_EVAL(PUSHMI_IF_CONSTEXPR_ELSE_, PUSHMI_IF_CONSTEXPR_IF_ LIST)\
PUSHMI_CEXP_IGNORE_SHADOW_END \
/**/
#define PUSHMI_IF_CONSTEXPR_RETURN(LIST)\
PUSHMI_CEXP_IGNORE_SHADOW_BEGIN \
return PUSHMI_CEXP_EVAL(PUSHMI_IF_CONSTEXPR_ELSE_, PUSHMI_IF_CONSTEXPR_IF_ LIST)\
PUSHMI_CEXP_IGNORE_SHADOW_END \
/**/
#define PUSHMI_IF_CONSTEXPR_IF_(...) \
(::folly::pushmi::detail::select<bool(__VA_ARGS__)>() ->* PUSHMI_IF_CONSTEXPR_THEN_ \
/**/
#define PUSHMI_IF_CONSTEXPR_THEN_(...) \
([&](PUSHMI_CEXP_MAYBE_UNUSED auto id)mutable->decltype(auto){__VA_ARGS__})) PUSHMI_CEXP_COMMA \
/**/
#define PUSHMI_IF_CONSTEXPR_ELSE_(A, B) \
A ->* PUSHMI_IF_CONSTEXPR_ ## B \
/**/
#define PUSHMI_IF_CONSTEXPR_else(...) \
([&](PUSHMI_CEXP_MAYBE_UNUSED auto id)mutable->decltype(auto){__VA_ARGS__});\
/**/
namespace folly {
namespace pushmi {
namespace detail {
template <bool>
struct select {
template <class R, class = std::enable_if_t<!std::is_void<R>::value>>
struct eat_return {
R value_;
template <class T>
constexpr R operator->*(T&&) {
return static_cast<R&&>(value_);
}
};
struct eat {
template <class T>
constexpr void operator->*(T&&) {}
};
template <class T>
constexpr auto operator->*(T&& t)
-> eat_return<decltype(t(::folly::pushmi::detail::id_fn{}))> {
return {t(::folly::pushmi::detail::id_fn{})};
}
template <class T>
constexpr auto operator->*(T&& t) const -> eat {
return t(::folly::pushmi::detail::id_fn{}), void(), eat{};
}
};
template <>
struct select<false> {
struct eat {
template <class T>
constexpr auto operator->*(T&& t) -> decltype(auto) {
return t(::folly::pushmi::detail::id_fn{});
}
};
template <class T>
constexpr eat operator->*(T&&) {
return {};
}
};
} // namespace detail
} // namespace pushmi
} // namespace folly
#endif
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#if __cpp_lib_optional >= 201606
#include <optional> // @manual
#endif
#include <type_traits>
#include <utility>
namespace folly {
namespace pushmi {
namespace detail {
#if __cpp_lib_optional >= 201606
template <class T>
struct opt : private std::optional<T> {
opt() = default;
opt& operator=(T&& t) {
this->std::optional<T>::operator=(std::move(t));
return *this;
}
using std::optional<T>::operator*;
using std::optional<T>::operator bool;
};
#else
template <class T>
struct opt {
private:
bool empty_ = true;
std::aligned_union_t<0, T> data_;
T* ptr() {
return static_cast<T*>((void*)&data_);
}
const T* ptr() const {
return static_cast<const T*>((const void*)&data_);
}
void reset() {
if (!empty_) {
ptr()->~T();
empty_ = true;
}
}
public:
opt() = default;
opt(T&& t) noexcept(std::is_nothrow_move_constructible<T>::value) {
::new (ptr()) T(std::move(t));
empty_ = false;
}
opt(const T& t) {
::new (ptr()) T(t);
empty_ = false;
}
opt(opt&& that) noexcept(std::is_nothrow_move_constructible<T>::value) {
if (that) {
::new (ptr()) T(std::move(*that));
empty_ = false;
that.reset();
}
}
opt(const opt& that) {
if (that) {
::new (ptr()) T(*that);
empty_ = false;
}
}
~opt() {
reset();
}
opt& operator=(opt&& that) noexcept(
std::is_nothrow_move_constructible<T>::value&&
std::is_nothrow_move_assignable<T>::value) {
if (*this && that) {
**this = std::move(*that);
that.reset();
} else if (*this) {
reset();
} else if (that) {
::new (ptr()) T(std::move(*that));
empty_ = false;
}
return *this;
}
opt& operator=(const opt& that) {
if (*this && that) {
**this = *that;
} else if (*this) {
reset();
} else if (that) {
::new (ptr()) T(*that);
empty_ = false;
}
return *this;
}
opt& operator=(T&& t) noexcept(
std::is_nothrow_move_constructible<T>::value&&
std::is_nothrow_move_assignable<T>::value) {
if (*this)
**this = std::move(t);
else {
::new (ptr()) T(std::move(t));
empty_ = false;
}
return *this;
}
opt& operator=(const T& t) {
if (*this)
**this = t;
else {
::new (ptr()) T(t);
empty_ = false;
}
return *this;
}
explicit operator bool() const noexcept {
return !empty_;
}
T& operator*() noexcept {
return *ptr();
}
const T& operator*() const noexcept {
return *ptr();
}
};
#endif
} // namespace detail
} // namespace pushmi
} // namespace folly
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <type_traits>
#include <folly/experimental/pushmi/detail/concept_def.h>
namespace folly {
namespace pushmi {
PUSHMI_CONCEPT_DEF(
template(class T) //
concept Object,
requires(T* p) ( //
*p, implicitly_convertible_to<const volatile void*>(p))//
);
PUSHMI_CONCEPT_DEF(
template(class T, class... Args) //
(concept Constructible)(T, Args...),
PUSHMI_PP_IS_CONSTRUCTIBLE(T, Args...));
PUSHMI_CONCEPT_DEF(
template(class From, class To) //
concept ExplicitlyConvertibleTo,
requires(From (&f)()) ( //
static_cast<To>(f()))
);
PUSHMI_CONCEPT_DEF(
template(class From, class To) //
concept ConvertibleTo,
ExplicitlyConvertibleTo<From, To>&& std::is_convertible<From, To>::value);
PUSHMI_CONCEPT_DEF(
template(class T) //
concept SemiMovable,
Object<T>&& Constructible<T, T>&& ConvertibleTo<T, T>);
} // namespace pushmi
} // namespace folly
This diff is collapsed.
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/experimental/pushmi/sender/single_sender.h>
namespace folly {
namespace pushmi {
namespace operators {
PUSHMI_INLINE_VAR constexpr struct bulk_fn {
template <
class F,
class ShapeBegin,
class ShapeEnd,
class Target,
class IF,
class RS>
auto operator()(
F&& func,
ShapeBegin sb,
ShapeEnd se,
Target&& driver,
IF&& initFunc,
RS&& selector) const {
return [func, sb, se, driver, initFunc, selector](auto in) mutable {
return make_single_sender(
[in, func, sb, se, driver, initFunc, selector](auto out) mutable {
using Out = decltype(out);
struct data : Out {
data(Out out) : Out(std::move(out)) {}
bool empty = true;
};
submit(
in,
make_receiver(
data{std::move(out)},
[func, sb, se, driver, initFunc, selector](
auto& out_, auto input) mutable noexcept {
out_.empty = false;
driver(
initFunc,
selector,
std::move(input),
func,
sb,
se,
std::move(static_cast<Out&>(out_)));
},
// forward to output
[](auto o, auto e) noexcept {set_error(o, e);},
// only pass done through when empty
[](auto o){ if (o.empty) { set_done(o); }}));
});
};
}
} bulk{};
} // namespace operators
} // namespace pushmi
} // namespace folly
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <algorithm>
#include <cassert>
#include <iostream>
#include <vector>
#include <folly/experimental/pushmi/examples/pool.h>
#include <folly/experimental/pushmi/o/transform.h>
#include <folly/experimental/pushmi/o/via.h>
#include <folly/experimental/pushmi/executor/strand.h>
using namespace folly::pushmi::aliases;
struct f_t {};
f_t f() {
return {};
}
struct g_t {};
g_t g(f_t) {
return {};
}
// these expressions are read backward, bottom-right to top-left
template <class CPUExecutor, class IOExecutor>
void lisp(CPUExecutor cpu, IOExecutor io) {
// f on cpu - g on cpu (implicit: a single task on the cpu executor runs all
// the functions)
op::submit([](g_t) {})(op::transform([](f_t ft) { return g(ft); })(
op::transform([](auto) { return f(); })(cpu.schedule())));
// f on cpu - g on cpu (explicit: the first cpu task runs f and a second cpu
// task runs g)
op::submit([](g_t) {})(op::transform([](f_t ft) { return g(ft); })(
op::via(mi::strands(cpu))(op::transform([](auto) { return f(); })(cpu.schedule()))));
// f on io - g on cpu
op::submit([](g_t) {})(op::transform([](f_t ft) { return g(ft); })(
op::via(mi::strands(cpu))(op::transform([](auto) { return f(); })(io.schedule()))));
}
template <class CPUExecutor, class IOExecutor>
void sugar(CPUExecutor cpu, IOExecutor io) {
// f on cpu - g on cpu (implicit: a single task on the cpu executor runs all
// the functions)
cpu.schedule() | op::transform([](auto) { return f(); }) |
op::transform([](f_t ft) { return g(ft); }) | op::submit([](g_t) {});
// f on cpu - g on cpu (explicit: the first cpu task runs f and a second cpu
// task runs g)
cpu.schedule() | op::transform([](auto) { return f(); }) | op::via(mi::strands(cpu)) |
op::transform([](f_t ft) { return g(ft); }) | op::submit([](g_t) {});
// f on io - g on cpu
io.schedule() | op::transform([](auto) { return f(); }) | op::via(mi::strands(cpu)) |
op::transform([](f_t ft) { return g(ft); }) | op::submit([](g_t) {});
}
template <class CPUExecutor, class IOExecutor>
void pipe(CPUExecutor cpu, IOExecutor io) {
// f on cpu - g on cpu (implicit: a single task on the cpu executor runs all
// the functions)
mi::pipe(
cpu.schedule(),
op::transform([](auto) { return f(); }),
op::transform([](f_t ft) { return g(ft); }),
op::submit([](g_t) {}));
// f on cpu - g on cpu (explicit: the first cpu task runs f and a second cpu
// task runs g)
mi::pipe(
cpu.schedule(),
op::transform([](auto) { return f(); }),
op::via(mi::strands(cpu)),
op::transform([](f_t ft) { return g(ft); }),
op::submit([](g_t) {}));
// f on io - g on cpu
mi::pipe(
io.schedule(),
op::transform([](auto) { return f(); }),
op::via(mi::strands(cpu)),
op::transform([](f_t ft) { return g(ft); }),
op::submit([](g_t) {}));
}
int main() {
mi::pool cpuPool{std::max(1u, std::thread::hardware_concurrency())};
mi::pool ioPool{std::max(1u, std::thread::hardware_concurrency())};
lisp(cpuPool.executor(), ioPool.executor());
sugar(cpuPool.executor(), ioPool.executor());
pipe(cpuPool.executor(), ioPool.executor());
ioPool.wait();
cpuPool.wait();
std::cout << "OK" << std::endl;
}
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <algorithm>
#include <cassert>
#include <iostream>
#include <vector>
#include <folly/experimental/pushmi/o/defer.h>
#include <folly/experimental/pushmi/o/share.h>
#include <folly/experimental/pushmi/o/just.h>
#include <folly/experimental/pushmi/o/tap.h>
// https://godbolt.org/g/rVLMTu
using namespace folly::pushmi::aliases;
// three models of submission deferral
// (none of these use an executor, they are all running
// synchronously on the main thread)
// this constructs eagerly and submits just() lazily
auto defer_execution() {
printf("construct just\n");
return op::just(42) | op::tap([](int v) { printf("just - %d\n", v); });
}
// this constructs defer() eagerly, constructs just() and submits just() lazily
auto defer_construction() {
return op::defer([] { return defer_execution(); });
}
// this constructs defer(), constructs just() and submits just() eagerly
auto eager_execution() {
return defer_execution() | op::share<int>();
}
int main() {
printf("\ncall defer_execution\n");
auto de = defer_execution();
printf("submit defer_execution\n");
de | op::submit();
// call defer_execution
// construct just
// submit defer_execution
// just - 42
printf("\ncall defer_construction\n");
auto dc = defer_construction();
printf("submit defer_construction\n");
dc | op::submit();
// call defer_construction
// submit defer_construction
// construct just
// just - 42
printf("\ncall eager_execution\n");
auto ee = eager_execution();
printf("submit eager_execution\n");
ee | op::submit();
// call eager_execution
// construct just
// just - 42
// submit eager_execution
std::cout << "OK" << std::endl;
// OK
}
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <cstdio>
#include <iostream>
#include <folly/experimental/pushmi/executor/strand.h>
#include <folly/experimental/pushmi/o/request_via.h>
#include <folly/experimental/pushmi/o/tap.h>
#include <folly/experimental/pushmi/o/transform.h>
#include <folly/experimental/pushmi/examples/pool.h>
using namespace folly::pushmi::aliases;
template <class Io>
auto io_operation(Io io) {
return io.schedule() | op::transform([](auto) { return 42; }) |
op::tap([](int v) { std::printf("io pool producing, %d\n", v); }) |
op::request_via();
}
int main() {
mi::pool cpuPool{std::max(1u, std::thread::hardware_concurrency())};
mi::pool ioPool{std::max(1u, std::thread::hardware_concurrency())};
auto io = ioPool.executor();
auto cpu = cpuPool.executor();
io_operation(io).via(mi::strands(cpu)) |
op::tap([](int v) { std::printf("cpu pool processing, %d\n", v); }) |
op::submit();
// when the caller is not going to process the result (only side-effect
// matters) or the caller is just going to push the result into a queue,
// provide a way to skip the transition to a different executor and make it
// stand out so that it has to be justified in code reviews.
mi::via_cast(io_operation(io)) | op::submit();
io = mi::pool_executor{};
cpu = mi::pool_executor{};
ioPool.wait();
cpuPool.wait();
std::cout << "OK" << std::endl;
}
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/experimental/pushmi/examples/bulk.h>
#include <folly/experimental/pushmi/o/just.h>
#include <folly/experimental/pushmi/o/submit.h>
namespace folly {
namespace pushmi {
PUSHMI_INLINE_VAR constexpr struct for_each_fn {
private:
template <class Function>
struct fn {
Function f_;
template <class Cursor>
void operator()(detail::any, Cursor cursor) const {
f_(*cursor);
}
};
struct identity {
template <class T>
auto operator()(T&& t) const {
return (T &&) t;
}
};
struct zero {
int operator()(detail::any) const noexcept {
return 0;
}
};
public:
template <class ExecutionPolicy, class RandomAccessIterator, class Function>
void operator()(
ExecutionPolicy&& policy,
RandomAccessIterator begin,
RandomAccessIterator end,
Function f) const {
operators::just(0) |
operators::bulk(
fn<Function>{f}, begin, end, policy, identity{}, zero{}) |
operators::blocking_submit();
}
} for_each{};
} // namespace pushmi
} // namespace folly
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <algorithm>
#include <cassert>
#include <iostream>
#include <vector>
#include <folly/experimental/pushmi/examples/for_each.h>
#include <folly/experimental/pushmi/examples/pool.h>
using namespace folly::pushmi::aliases;
template <class Executor, class Allocator = std::allocator<char>>
auto naive_executor_bulk_target(Executor e, Allocator a = Allocator{}) {
return [e, a](
auto init,
auto selector,
auto input,
auto&& func,
auto sb,
auto se,
auto out) mutable {
using RS = decltype(selector);
using F = std::conditional_t<
std::is_lvalue_reference<decltype(func)>::value,
decltype(func),
typename std::remove_reference<decltype(func)>::type>;
using Out = decltype(out);
try {
typename std::allocator_traits<Allocator>::template rebind_alloc<char>
allocState(a);
using Acc = decltype(init(input));
struct shared_state_type {
std::exception_ptr first_exception_{};
Out destination_;
RS selector_;
F func_;
std::atomic<Acc> accumulation_;
std::atomic<std::size_t> pending_{1};
std::atomic<std::size_t> exception_count_{0}; // protects assignment to
// first exception
shared_state_type(Out&& destination, RS&& selector, F&& func, Acc acc)
: destination_((Out&&) destination)
, selector_((RS&&) selector)
, func_((F&&) func)
, accumulation_(acc)
{}
};
auto shared_state = std::allocate_shared<shared_state_type>(
allocState,
std::move(out),
std::move(selector),
(decltype(func)&&)func,
init(std::move(input)));
e.schedule() | op::submit([e, sb, se, shared_state](auto) mutable {
auto stepDone = [](auto shared_state_) {
// pending
if (--shared_state_->pending_ == 0) {
// first exception
if (shared_state_->first_exception_) {
mi::set_error(
shared_state_->destination_, shared_state_->first_exception_);
return;
}
try {
// selector(accumulation)
auto result = shared_state_->selector_(
std::move(shared_state_->accumulation_.load()));
mi::set_value(shared_state_->destination_, std::move(result));
mi::set_done(shared_state_->destination_);
} catch (...) {
mi::set_error(
shared_state_->destination_, std::current_exception());
}
}
};
for (decltype(sb) idx{sb}; idx != se; ++idx) {
++shared_state->pending_;
e.schedule() | op::submit([shared_state, idx, stepDone](auto) {
try {
// this indicates to me that bulk is not the right abstraction
auto old = shared_state->accumulation_.load();
Acc step;
do {
step = old;
// func(accumulation, idx)
shared_state->func_(step, idx);
} while (!shared_state->accumulation_
.compare_exchange_strong(old, step));
} catch (...) {
// exception count
if (shared_state->exception_count_++ == 0) {
// store first exception
shared_state->first_exception_ = std::current_exception();
} // else eat the exception
}
stepDone(shared_state);
});
}
stepDone(shared_state);
});
} catch (...) {
e.schedule() |
op::submit([out = std::move(out), ep = std::current_exception()](
auto) mutable { mi::set_error(out, ep); });
}
};
}
int main() {
mi::pool p{std::max(1u, std::thread::hardware_concurrency())};
std::vector<int> vec(10);
mi::for_each(
naive_executor_bulk_target(p.executor()),
vec.begin(),
vec.end(),
[](int& x) { x = 42; });
assert(
std::count(vec.begin(), vec.end(), 42) == static_cast<int>(vec.size()));
std::cout << "OK" << std::endl;
p.stop();
p.wait();
}
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <algorithm>
#include <cassert>
#include <iostream>
#include <vector>
#include <folly/experimental/pushmi/examples/for_each.h>
using namespace folly::pushmi::aliases;
auto inline_bulk_target() {
return [](auto init,
auto selector,
auto input,
auto&& func,
auto sb,
auto se,
auto out) {
try {
auto acc = init(input);
for (decltype(sb) idx{sb}; idx != se; ++idx) {
func(acc, idx);
}
auto result = selector(std::move(acc));
mi::set_value(out, std::move(result));
mi::set_done(out);
} catch (...) {
mi::set_error(out, std::current_exception());
}
};
}
int main() {
std::vector<int> vec(10);
mi::for_each(
inline_bulk_target(), vec.begin(), vec.end(), [](int& x) { x = 42; });
assert(
std::count(vec.begin(), vec.end(), 42) == static_cast<int>(vec.size()));
std::cout << "OK" << std::endl;
}
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/experimental/pushmi/o/submit.h>
#include <folly/experimental/pushmi/sender/single_sender.h>
namespace folly {
namespace pushmi {
namespace detail {
struct no_fail_fn {
private:
struct on_error_impl {
[[noreturn]] void operator()(any, any) noexcept {
std::terminate();
}
};
template <class In>
struct out_impl {
PUSHMI_TEMPLATE(class SIn, class Out)
(requires Receiver<Out>) //
void operator()(SIn&& in, Out out) const {
submit(
(In&&)in,
receiver_from_fn<In>()(
std::move(out),
::folly::pushmi::on_error(on_error_impl{})));
}
};
struct in_impl {
PUSHMI_TEMPLATE(class In)
(requires Sender<In>) //
auto operator()(In in) const {
return ::folly::pushmi::detail::sender_from(
std::move(in),
out_impl<In>{});
}
};
public:
auto operator()() const {
return in_impl{};
}
};
} // namespace detail
namespace operators {
PUSHMI_INLINE_VAR constexpr detail::no_fail_fn no_fail{};
} // namespace operators
} // namespace pushmi
} // namespace folly
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/experimental/pushmi/properties.h>
#include <folly/experimental/pushmi/receiver/concepts.h>
#include <folly/experimental/pushmi/sender/tags.h>
#include <folly/experimental/pushmi/sender/properties.h>
#include <folly/experimental/pushmi/executor/properties.h>
#include <folly/Executor.h>
#include <folly/executors/CPUThreadPoolExecutor.h>
namespace folly {
namespace pushmi {
class pool;
class pool_executor {
struct task;
Executor::KeepAlive<CPUThreadPoolExecutor> exec_ {};
public:
using properties = property_set<is_concurrent_sequence<>>;
pool_executor() = default;
explicit pool_executor(pool &e);
task schedule();
};
struct pool_executor::task
: single_sender_tag::with_values<pool_executor&>::no_error {
using properties = property_set<is_never_blocking<>>;
explicit task(pool_executor e)
: pool_ex_(std::move(e))
{}
PUSHMI_TEMPLATE(class Out)
(requires ReceiveValue<Out, pool_executor&>)
void submit(Out out) && {
pool_ex_.exec_->add([e = pool_ex_, out = std::move(out)]() mutable {
set_value(out, e);
set_done(out);
});
}
private:
pool_executor pool_ex_;
};
class pool {
friend pool_executor;
CPUThreadPoolExecutor pool_;
public:
explicit pool(std::size_t threads) : pool_(threads) {}
auto executor() {
return pool_executor{*this};
}
void stop() {
pool_.stop();
}
void wait() {
pool_.join();
}
};
inline pool_executor::pool_executor(pool &e)
: exec_(Executor::getKeepAliveToken(e.pool_))
{}
inline pool_executor::task pool_executor::schedule() {
return task{*this};
}
} // namespace pushmi
} // namespace folly
# things I learned from std::for_each and std::reduce
I wrote an operator called `bulk()` and implemented for_each and reduce in terms of it. I departed from the `bulk_execute()` signature and tried to model the reduce signature on my `bulk` operator. I am not satisfied with the result and would need to invest more to get an abstraction for bulk that I was confident was minimal and efficient.
# Background
## bulk_execute
The `bulk_execute()` function is intended to be an abstraction that allows efficient implementation of the parallel std algorithms on both CPU and GPU executors.
```cpp
template<class Function, class SharedFactory>
void bulk_execute(Function&& f, size_t n, SharedFactory&& sf) const;
```
A sequential implementation might look like:
```cpp
template<class Function, class SharedFactory>
void bulk_execute(Function&& f, size_t n, SharedFactory&& sf)
{
auto state = sf();
for(size_t idx = 0; idx < n; ++idx) {
f(state, idx);
}
}
```
The `Function f` already appears to be similar to the accumulate function passed to reduce. It takes the shared state and the index indicating the current value. The SharedFactory is very similar to the initialValue parameter to reduce. The Shape parameter is very similar to the Range parameter to reduce. These similarities motivated me to modify the signature to more explicitly match the reduce pattern.
## bulk operator
```cpp
template<class F, class ShapeBegin, class ShapeEnd, class Target, class IF, class RS>
auto bulk(
F&& func,
ShapeBegin sb,
ShapeEnd se,
Target&& driver,
IF&& initFunc,
RS&& selector);
```
The `bulk` function packages the parameters and returns an adapter function.
> A Sender is an object with a `submit()` method
> An Adapter is a function that takes a Sender and returns a Sender. Adapters are used for composition.
When called, the Adapter from `bulk()` will package the Adapter parameter with the original parameters and return a Sender.
> A Receiver is an object that has methods like `value()`, `error()` and `done()`. A Receiver is like a Promise.
The `submit()` method takes a Receiver. When called, the Sender from the bulk Adapter will create a Receiver with the original parameters to `bulk()` and the Receiver parameter. This new Receiver will be passed to `submit()` on the Sender that the bulk Adapter stored in this bulk Sender.
When called, the `value()` method on the bulk Receiver will pass all the packaged parameters to the Target.
> A Target is a function that orchestrates the bulk operation using the parameters. There would be different Target implementations for device, sequential, concurrent execution patterns.
A Target implementation might look like:
```cpp
template<class IF, class RS, class Input, class F, class ShapeBegin, class ShapeEnd, class Out>
void inline_driver(
IF init,
RS selector,
Input input,
F&& func,
ShapeBegin sb,
ShapeEnd se,
Out out) {
try {
auto acc = init(input);
for (decltype(sb) idx{sb}; idx != se; ++idx){
func(acc, idx);
}
auto result = selector(std::move(acc));
mi::set_value(out, std::move(result));
mi::set_done(out);
} catch(...) {
mi::set_error(out, std::current_exception());
}
};
```
> ways to improve bulk:
> - merge ShapeBegin and ShapeEnd into a Range.
> - pass out to selector so that it can deliver an error or a success.
> - initFunc multiple times to have context local state that does not need locking or CAS loop.
> - compose the driver implementations from operators rather than each having a bespoke implementation
# for_each
implementing for_each was straight-forward with the interface.
```cpp
template<class ExecutionPolicy, class RandomAccessIterator, class Function>
void for_each(
ExecutionPolicy&& policy,
RandomAccessIterator begin,
RandomAccessIterator end,
Function f)
{
operators::just(0) |
operators::bulk(
[f](auto& acc, auto cursor){ f(*cursor); },
begin,
end,
policy,
[](auto&& args){ return args; },
[](auto&& acc){ return 0; }) |
operators::blocking_submit();
}
```
The oddity is that bulk is expecting a shared state value and a value as input and a value result. Since for_each does not have shared state, this is overhead that becomes obvious and disturbing when looking at the naive concurrent driver in the code (there is a CAS loop around the call to the state update function even though the state is not updated here).
# reduce
implementing reduce took more effort and some of the code in the drivers and parameters to the driver were modified to get reduce working.
```cpp
template<class ExecutionPolicy, class ForwardIt, class T, class BinaryOp>
T reduce(
ExecutionPolicy&& policy,
ForwardIt begin,
ForwardIt end,
T init,
BinaryOp binary_op){
return operators::just(std::move(init)) |
operators::bulk(
[binary_op](auto& acc, auto cursor){ acc = binary_op(acc, *cursor); },
begin,
end,
policy,
[](auto&& args){ return args; },
[](auto&& acc){ return acc; }) |
operators::get<T>;
}
```
Based on examples that I have been shown, the existing bulk_execute would expect the bulk_execute caller to provide the synchronization for the shared state. In the case of reduce it is important to synchronize when the execution is concurrent and equally important not to synchronize when it is not concurrent. Using if constexpr to implement reduce with and without sync in the parameters to bulk would add a lot of unsafe bespoke work and complication into every algorithm using bulk. In this bulk design the driver owns synchronization instead.
> NOTE - in any case, if a high-level async library design requires manual lock or lock-free primitive usage for correct behavior, then the design needs to be changed.
I am dissatisfied with the expected perf results from the naive concurrent driver (expectation from looking at the code, will need to measure). For instance, here is the CAS loop over the accumulator function from the naive concurrent driver:
```cpp
// this indicates to me that bulk is not the right abstraction
auto old = std::get<4>(*shared_state).load();
auto step = old;
do {
step = old;
// func(accumulation, idx)
std::get<3>(*shared_state)(step, idx);
} while(!std::get<4>(*shared_state).compare_exchange_strong(old, step));
```
This is due to having a single shared_state being shared concurrently. I would much prefer having multiple states that are never used concurrently and then composing them all into one final result.
> creating factor * hardware_concurrency() number of states would allow user controlled granularity (factor) for work stealing. each state would only be used from one `hardware_concurrency` context and thus would have no synchronization when it was modified.
# static_thread_pool
this bonus section is to mention the bulk_execute implementation in the static_thread_pool. The static thread pool is a cool piece of tech. in the bulk_execute method I had two observations.
1. every index in the range from 0-N is allocated as a task node
2. this list of nodes is built locally and then inserted in one lock operation
For #1, I expect that there is a desire to reduce the number of task nodes allocated in bulk.
There are multiple ways to achieve #2 on P1055.
one way to achieve this is to add a type that is an executor but just accumulates a local queue. usage would be similar to..
```cpp
auto pool = thread_pool();
auto e = pool.bulk_executor();
my_bulk_generator(e, . . .); // lots of calls to submit
pool.bulk_enqueue(e);
```
# ExecutionPolicy
In building these algorithms I observed that the parallel std algorithms do not really depend on executor, they depend on ExecutionPolicy. GPU and CPU can have different execution policies and it does not affect the implementation or expression of the parallel algorithms (rather than passing an executor around pass an ExecutionPolicy).
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/experimental/pushmi/examples/bulk.h>
#include <folly/experimental/pushmi/o/just.h>
#include <folly/experimental/pushmi/o/submit.h>
namespace folly {
namespace pushmi {
PUSHMI_INLINE_VAR constexpr struct reduce_fn {
private:
template <class BinaryOp>
struct fn {
BinaryOp binary_op_;
template <class Acc, class Cursor>
void operator()(Acc& acc, Cursor cursor) const {
acc = binary_op_(acc, *cursor);
}
};
struct identity {
template <class T>
auto operator()(T&& t) const {
return (T &&) t;
}
};
public:
template <class ExecutionPolicy, class ForwardIt, class T, class BinaryOp>
T operator()(
ExecutionPolicy&& policy,
ForwardIt begin,
ForwardIt end,
T init,
BinaryOp binary_op) const {
return operators::just(std::move(init)) |
operators::bulk(
fn<BinaryOp>{binary_op},
begin,
end,
policy,
identity{},
identity{}) |
operators::get<T>;
}
} reduce{};
} // namespace pushmi
} // namespace folly
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <algorithm>
#include <cassert>
#include <exception>
#include <iostream>
#include <numeric>
#include <vector>
#include <folly/experimental/pushmi/examples/pool.h>
#include <folly/experimental/pushmi/examples/reduce.h>
using namespace folly::pushmi::aliases;
template <class Executor, class Allocator = std::allocator<char>>
auto naive_executor_bulk_target(Executor e, Allocator a = Allocator{}) {
return [e, a](
auto init,
auto selector,
auto input,
auto&& func,
auto sb,
auto se,
auto out) mutable {
using RS = decltype(selector);
using F = std::conditional_t<
std::is_lvalue_reference<decltype(func)>::value,
decltype(func),
typename std::remove_reference<decltype(func)>::type>;
using Out = decltype(out);
try {
typename std::allocator_traits<Allocator>::template rebind_alloc<char>
allocState(a);
using Acc = decltype(init(input));
struct shared_state_type {
std::exception_ptr first_exception_{};
Out destination_;
RS selector_;
F func_;
std::atomic<Acc> accumulation_;
std::atomic<std::size_t> pending_{1};
std::atomic<std::size_t> exception_count_{0}; // protects assignment to
// first exception
shared_state_type(Out&& destination, RS&& selector, F&& func, Acc acc)
: destination_((Out&&) destination)
, selector_((RS&&) selector)
, func_((F&&) func)
, accumulation_(acc)
{}
};
auto shared_state = std::allocate_shared<shared_state_type>(
allocState,
std::move(out),
std::move(selector),
(decltype(func)&&)func,
init(std::move(input)));
e.schedule() | op::submit([e, sb, se, shared_state](auto) mutable {
auto stepDone = [](auto shared_state_) {
// pending
if (--shared_state_->pending_ == 0) {
// first exception
if (shared_state_->first_exception_) {
mi::set_error(
shared_state_->destination_, shared_state_->first_exception_);
return;
}
try {
// selector(accumulation)
auto result = shared_state_->selector_(
std::move(shared_state_->accumulation_.load()));
mi::set_value(shared_state_->destination_, std::move(result));
mi::set_done(shared_state_->destination_);
} catch (...) {
mi::set_error(
shared_state_->destination_, std::current_exception());
}
}
};
for (decltype(sb) idx{sb}; idx != se; ++idx) {
++shared_state->pending_;
e.schedule() | op::submit([shared_state, idx, stepDone](auto) {
try {
// this indicates to me that bulk is not the right abstraction
auto old = shared_state->accumulation_.load();
Acc step;
do {
step = old;
// func(accumulation, idx)
shared_state->func_(step, idx);
} while (!shared_state->accumulation_
.compare_exchange_strong(old, step));
} catch (...) {
// exception count
if (shared_state->exception_count_++ == 0) {
// store first exception
shared_state->first_exception_ = std::current_exception();
} // else eat the exception
}
stepDone(shared_state);
});
}
stepDone(shared_state);
});
} catch (...) {
e.schedule() |
op::submit([out = std::move(out), ep = std::current_exception()](
auto) mutable { mi::set_error(out, ep); });
}
};
}
int main() {
mi::pool p{std::max(1u, std::thread::hardware_concurrency())};
std::vector<int> vec(10);
std::fill(vec.begin(), vec.end(), 4);
auto fortyTwo = mi::reduce(
naive_executor_bulk_target(p.executor()),
vec.begin(),
vec.end(),
2,
std::plus<>{});
std::ignore = fortyTwo;
assert(std::accumulate(vec.begin(), vec.end(), 2) == fortyTwo);
std::cout << "OK" << std::endl;
p.wait();
}
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <cassert>
#include <exception>
#include <iostream>
#include <numeric>
#include <vector>
#include <folly/experimental/pushmi/examples/reduce.h>
using namespace folly::pushmi::aliases;
auto inline_bulk_target() {
return [](auto init,
auto selector,
auto input,
auto&& func,
auto sb,
auto se,
auto out) {
try {
auto acc = init(input);
for (decltype(sb) idx{sb}; idx != se; ++idx) {
func(acc, idx);
}
auto result = selector(std::move(acc));
mi::set_value(out, std::move(result));
mi::set_done(out);
} catch (...) {
mi::set_error(out, std::current_exception());
}
};
}
int main() {
std::vector<int> vec(10);
std::fill(vec.begin(), vec.end(), 4);
auto fortyTwo = mi::reduce(
inline_bulk_target(), vec.begin(), vec.end(), 2, std::plus<>{});
assert(std::accumulate(vec.begin(), vec.end(), 2) == fortyTwo);
std::cout << fortyTwo << std::endl;
std::cout << "OK" << std::endl;
}
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <algorithm>
#include <cassert>
#include <iostream>
#include <vector>
#include <folly/experimental/pushmi/o/empty.h>
#include <folly/experimental/pushmi/o/filter.h>
#include <folly/experimental/pushmi/o/just.h>
#include <folly/experimental/pushmi/o/tap.h>
#include <folly/experimental/pushmi/o/transform.h>
using namespace folly::pushmi::aliases;
const bool setting_exists = false;
auto get_setting() {
return mi::make_single_sender([](auto out) {
if (setting_exists) {
op::just(42) | op::submit(out);
} else {
op::empty() | op::submit(out);
}
});
}
auto println = [](auto v) { std::cout << v << std::endl; };
// concat not yet implemented
template <class T, class E = std::exception_ptr>
auto concat() {
return [](auto in) {
return mi::make_single_sender([in](auto out) mutable {
mi::submit(in, mi::make_receiver(out, [](auto out_, auto v) {
mi::submit(v, mi::any_receiver<E, T>(out_));
}));
});
};
}
int main() {
get_setting() | op::transform([](int i) { return std::to_string(i); }) |
op::submit(println);
op::just(42) | op::filter([](int i) { return i < 42; }) |
op::transform([](int i) { return std::to_string(i); }) |
op::submit(println);
op::just(42) | op::transform([](int i) {
if (i < 42) {
return mi::any_single_sender<std::exception_ptr, std::string>{
op::empty()};
}
return mi::any_single_sender<std::exception_ptr, std::string>{
op::just(std::to_string(i))};
}) | concat<std::string>() |
op::submit(println);
std::cout << "OK" << std::endl;
}
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <algorithm>
#include <cassert>
#include <iostream>
#include <vector>
#include <folly/experimental/pushmi/examples/no_fail.h>
#include <folly/experimental/pushmi/o/empty.h>
#include <folly/experimental/pushmi/o/error.h>
#include <folly/experimental/pushmi/o/just.h>
#include <folly/experimental/pushmi/o/switch_on_error.h>
#include <folly/experimental/pushmi/o/transform.h>
using namespace folly::pushmi::aliases;
// concat not yet implemented
template <class T, class E = std::exception_ptr>
auto concat() {
return [](auto in) {
return mi::make_single_sender([in](auto out) mutable {
mi::submit(in, mi::make_receiver(out, [](auto out_, auto v) {
mi::submit(v, mi::any_receiver<E, T>(out_));
}));
});
};
}
int main() {
auto stop_abort = mi::on_error([](auto) noexcept {});
// support all error value types
op::error(std::exception_ptr{}) | op::submit(stop_abort);
op::error(std::errc::argument_list_too_long) | op::submit(stop_abort);
// transform an error
op::error(std::errc::argument_list_too_long) | op::switch_on_error([
](auto) noexcept { return op::error(std::exception_ptr{}); }) |
op::submit(stop_abort);
// use default value if an error occurs
op::just(42) |
op::switch_on_error([](auto) noexcept { return op::just(0); }) |
op::submit();
// suppress if an error occurs
op::error(std::errc::argument_list_too_long) |
op::switch_on_error([](auto) noexcept { return op::empty(); }) |
op::submit();
// abort if an error occurs
op::just(42) | op::no_fail() | op::submit();
// transform value to error_
op::just(42) | op::transform([](auto v) {
using r_t = mi::any_single_sender<std::exception_ptr, int>;
if (v < 40) {
return r_t{op::error(std::exception_ptr{})};
} else {
return r_t{op::just(v)};
}
}) | concat<int>() |
op::submit();
// retry on error
// http.get(ex) |
// op::timeout(ex, 1s) |
// op::switch_on_error([](auto e) noexcept { return op::timer(ex, 1s); }) |
// op::repeat() |
// op::timeout(ex, 10s) |
// op::submit();
std::cout << "OK" << std::endl;
}
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <algorithm>
#include <atomic>
#include <cassert>
#include <functional>
#include <iostream>
#include <memory>
#include <thread>
#include <utility>
#include <vector>
#include <folly/experimental/pushmi/executor/strand.h>
#include <folly/experimental/pushmi/o/just.h>
#include <folly/experimental/pushmi/o/transform.h>
#include <folly/experimental/pushmi/o/via.h>
#include <folly/experimental/pushmi/examples/pool.h>
// // See https://github.com/executors/futures-impl
// #include <futures.h>
// #include <futures_static_thread_pool.h>
using namespace folly::pushmi::aliases;
struct inline_executor {
public:
friend bool operator==(
const inline_executor&,
const inline_executor&) noexcept {
return true;
}
friend bool operator!=(
const inline_executor&,
const inline_executor&) noexcept {
return false;
}
template <class Function>
void execute(Function f) const noexcept {
f();
}
// constexpr bool query(std::experimental::execution::oneway_t) {
// return true;
// }
// constexpr bool query(std::experimental::execution::twoway_t) {
// return false;
// }
// constexpr bool query(std::experimental::execution::single_t) {
// return true;
// }
};
// namespace p1054 {
// // A promise refers to a promise and is associated with a future,
// // either through type-erasure or through construction of an
// // underlying promise with an overload of make_promise_contract().
//
// // make_promise_contract() cannot be written to produce a lazy future.
// // The promise has to exist prior to .then() getting a continuation.
// // there must be a shared allocation to connect the promise and future.
// template <class T, class Executor>
// std::pair<
// std::experimental::standard_promise<T>,
// std::experimental::standard_future<T, std::decay_t<Executor>>>
// make_promise_contract(const Executor& e) {
// std::experimental::standard_promise<T> promise;
// auto ex = e;
// return {promise, promise.get_future(std::move(ex))};
// }
//
// template <class Executor, class Function, class Future>
// std::experimental::standard_future<
// std::result_of_t<
// Function(std::decay_t<typename std::decay_t<Future>::value_type>&&)>,
// std::decay_t<Executor>>
// then_execute(Executor&& e, Function&& f, Future&& pred) {
// using V = std::decay_t<typename std::decay_t<Future>::value_type>;
// using T = std::result_of_t<Function(V &&)>;
// auto pc = make_promise_contract<T>(e);
// auto p = std::get<0>(pc);
// auto r = std::get<1>(pc);
// ((Future &&) pred).then([e, p, f](V v) mutable {
// e.execute([p, f, v]() mutable { p.set_value(f(v)); });
// return 0;
// });
// return r;
// }
//
// } // namespace p1054
namespace p1055 {
template <class Executor, class Function, class Future>
auto then_execute(Executor&& e, Function&& f, Future&& pred) {
return pred | op::via(mi::strands(e)) |
op::transform([f](auto v) { return f(v); });
}
} // namespace p1055
int main() {
mi::pool p{std::max(1u, std::thread::hardware_concurrency())};
p1055::then_execute(p.executor(), [](int v) { return v * 2; }, op::just(21)) |
op::get<int>;
p.stop();
p.wait();
// std::experimental::futures_static_thread_pool sp{
// std::max(1u, std::thread::hardware_concurrency())};
//
// auto pc = p1054::make_promise_contract<int>(inline_executor{});
// auto& pr = std::get<0>(pc);
// auto& r = std::get<1>(pc);
// auto f = p1054::then_execute(
// sp.executor(), [](int v) { return v * 2; }, std::move(r));
// pr.set_value(42);
// f.get();
//
// sp.stop();
// sp.wait();
std::cout << "OK" << std::endl;
}
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <algorithm>
#include <atomic>
#include <cassert>
#include <functional>
#include <iostream>
#include <memory>
#include <thread>
#include <utility>
#include <vector>
#include <folly/experimental/pushmi/o/transform.h>
#include <folly/experimental/pushmi/examples/pool.h>
using namespace folly::pushmi::aliases;
// // See https://github.com/executors/futures-impl
// #include <futures.h>
//
// namespace p1054 {
// // A promise refers to a promise and is associated with a future,
// // either through type-erasure or through construction of an
// // underlying promise with an overload of make_promise_contract().
//
// // make_promise_contract() cannot be written to produce a lazy future.
// // the promise has to exist prior to .then() getting a continuation.
// // there must be a shared allocation to connect the promise and future.
// template <class T, class Executor>
// std::pair<
// std::experimental::standard_promise<T>,
// std::experimental::standard_future<T, std::decay_t<Executor>>>
// make_promise_contract(const Executor& e) {
// std::experimental::standard_promise<T> promise;
// auto ex = e;
// return {promise, promise.get_future(std::move(ex))};
// }
//
// template <class Executor, class Function>
// std::experimental::standard_future<
// std::result_of_t<std::decay_t<Function>()>,
// std::decay_t<Executor>>
// twoway_execute(Executor&& e, Function&& f) {
// using T = std::result_of_t<std::decay_t<Function>()>;
// auto pc = make_promise_contract<T>(e);
// auto p = std::get<0>(pc);
// auto r = std::get<1>(pc);
// e.execute([p, f]() mutable { p.set_value(f()); });
// return r;
// }
// } // namespace p1054
namespace p1055 {
template <class Executor, class Function>
auto twoway_execute(Executor&& e, Function&& f) {
return e.schedule() | op::transform([f](auto) { return f(); });
}
} // namespace p1055
int main() {
mi::pool p{std::max(1u, std::thread::hardware_concurrency())};
p1055::twoway_execute(p.executor(), []() { return 42; }) | op::get<int>;
p.stop();
p.wait();
// std::experimental::static_thread_pool sp{
// std::max(1u, std::thread::hardware_concurrency())};
//
// p1054::twoway_execute(sp.executor(), []() { return 42; }).get();
//
// sp.stop();
// sp.wait();
std::cout << "OK" << std::endl;
}
This diff is collapsed.
This diff is collapsed.
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <chrono>
#include <folly/experimental/pushmi/detail/functional.h>
#include <folly/experimental/pushmi/executor/concepts.h>
#include <folly/experimental/pushmi/executor/primitives.h>
namespace folly {
namespace pushmi {
struct systemNowF {
auto operator()() { return std::chrono::system_clock::now(); }
};
struct priorityZeroF {
auto operator()(){ return 0; }
};
PUSHMI_TEMPLATE(class Exec)
(requires Strand<Exec>)
struct strandFactory {
Exec ex_;
strandFactory() = default;
explicit strandFactory(Exec ex) : ex_(std::move(ex)) {}
Exec operator()(){ return ex_; }
};
struct passDNF {
PUSHMI_TEMPLATE(class Data)
(requires TimeExecutor<Data>)
auto operator()(Data& in) const noexcept {
return ::folly::pushmi::now(in);
}
};
struct passDZF {
PUSHMI_TEMPLATE(class Data)
(requires ConstrainedExecutor<Data>)
auto operator()(Data& in) const noexcept {
return ::folly::pushmi::top(in);
}
};
template <class Fn>
struct on_executor_fn : overload_fn<Fn> {
constexpr on_executor_fn() = default;
using overload_fn<Fn>::overload_fn;
};
template <class Fn>
auto on_executor(Fn fn) -> on_executor_fn<Fn> {
return on_executor_fn<Fn>{std::move(fn)};
}
template <class Fn>
struct on_make_strand_fn : overload_fn<Fn> {
constexpr on_make_strand_fn() = default;
using overload_fn<Fn>::overload_fn;
};
template <class Fn>
auto on_make_strand(Fn fn) -> on_make_strand_fn<Fn> {
return on_make_strand_fn<Fn>{std::move(fn)};
}
template <class... Fns>
struct on_schedule_fn : overload_fn<Fns...> {
constexpr on_schedule_fn() = default;
using overload_fn<Fns...>::overload_fn;
};
template <class... Fns>
auto on_schedule(Fns... fns) -> on_schedule_fn<Fns...> {
return on_schedule_fn<Fns...>{std::move(fns)...};
}
template <class Fn>
struct on_now_fn : overload_fn<Fn> {
constexpr on_now_fn() = default;
using overload_fn<Fn>::overload_fn;
};
template <class Fn>
auto on_now(Fn fn) -> on_now_fn<Fn> {
return on_now_fn<Fn>{std::move(fn)};
}
} // namespace pushmi
} // namespace folly
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
namespace folly {
namespace pushmi {
struct receiver_tag {
};
struct flow_receiver_tag : receiver_tag {
};
} // namespace pushmi
} // namespace folly
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment