Commit 19d7960b authored by Kirk Shoop's avatar Kirk Shoop Committed by Facebook Github Bot

change definition of Executor

Summary: convert all executors in pushmi and all executor concepts and usage in pushmi to use `schedule()`

Reviewed By: ericniebler

Differential Revision: D13463902

fbshipit-source-id: 01eac4fe0b5cd49afbd95118db769db59a3b968c
parent ce0fd826
...@@ -56,12 +56,6 @@ struct construct_deduced<many_sender>; ...@@ -56,12 +56,6 @@ struct construct_deduced<many_sender>;
template<> template<>
struct construct_deduced<flow_single_sender>; struct construct_deduced<flow_single_sender>;
template<>
struct construct_deduced<constrained_single_sender>;
template<>
struct construct_deduced<time_single_sender>;
template<> template<>
struct construct_deduced<flow_many_sender>; struct construct_deduced<flow_many_sender>;
...@@ -115,7 +109,6 @@ struct priorityZeroF { ...@@ -115,7 +109,6 @@ struct priorityZeroF {
struct passDVF { struct passDVF {
PUSHMI_TEMPLATE(class Data, class... VN) PUSHMI_TEMPLATE(class Data, class... VN)
// (requires True<>) //
(requires requires ( (requires requires (
set_value(std::declval<Data&>(), std::declval<VN>()...) set_value(std::declval<Data&>(), std::declval<VN>()...)
) && Receiver<Data>) ) && Receiver<Data>)
...@@ -127,8 +120,8 @@ struct passDVF { ...@@ -127,8 +120,8 @@ struct passDVF {
struct passDEF { struct passDEF {
PUSHMI_TEMPLATE(class E, class Data) PUSHMI_TEMPLATE(class E, class Data)
(requires ReceiveError<Data, E>) (requires ReceiveError<Data, E>)
void operator()(Data& out, E e) const noexcept { void operator()(Data& out, E&& e) const noexcept {
set_error(out, e); set_error(out, (E&&)e);
} }
}; };
...@@ -154,7 +147,7 @@ struct passDEXF { ...@@ -154,7 +147,7 @@ struct passDEXF {
PUSHMI_TEMPLATE(class Data) PUSHMI_TEMPLATE(class Data)
(requires Sender<Data>) (requires Sender<Data>)
auto operator()(Data& in) const noexcept { auto operator()(Data& in) const noexcept {
return executor(in); return get_executor(in);
} }
}; };
...@@ -171,7 +164,7 @@ struct passDSF { ...@@ -171,7 +164,7 @@ struct passDSF {
struct passDNF { struct passDNF {
PUSHMI_TEMPLATE(class Data) PUSHMI_TEMPLATE(class Data)
(requires TimeSender<Data>) (requires TimeExecutor<Data>)
auto operator()(Data& in) const noexcept { auto operator()(Data& in) const noexcept {
return ::folly::pushmi::now(in); return ::folly::pushmi::now(in);
} }
...@@ -179,7 +172,7 @@ struct passDNF { ...@@ -179,7 +172,7 @@ struct passDNF {
struct passDZF { struct passDZF {
PUSHMI_TEMPLATE(class Data) PUSHMI_TEMPLATE(class Data)
(requires ConstrainedSender<Data>) (requires ConstrainedExecutor<Data>)
auto operator()(Data& in) const noexcept { auto operator()(Data& in) const noexcept {
return ::folly::pushmi::top(in); return ::folly::pushmi::top(in);
} }
...@@ -309,6 +302,17 @@ auto on_executor(Fn fn) -> on_executor_fn<Fn> { ...@@ -309,6 +302,17 @@ auto on_executor(Fn fn) -> on_executor_fn<Fn> {
return on_executor_fn<Fn>{std::move(fn)}; return on_executor_fn<Fn>{std::move(fn)};
} }
template <class Fn>
struct on_make_strand_fn : overload_fn<Fn> {
constexpr on_make_strand_fn() = default;
using overload_fn<Fn>::overload_fn;
};
template <class Fn>
auto on_make_strand(Fn fn) -> on_make_strand_fn<Fn> {
return on_make_strand_fn<Fn>{std::move(fn)};
}
template <class... Fns> template <class... Fns>
struct on_submit_fn : overload_fn<Fns...> { struct on_submit_fn : overload_fn<Fns...> {
constexpr on_submit_fn() = default; constexpr on_submit_fn() = default;
...@@ -320,6 +324,17 @@ auto on_submit(Fns... fns) -> on_submit_fn<Fns...> { ...@@ -320,6 +324,17 @@ auto on_submit(Fns... fns) -> on_submit_fn<Fns...> {
return on_submit_fn<Fns...>{std::move(fns)...}; return on_submit_fn<Fns...>{std::move(fns)...};
} }
template <class... Fns>
struct on_schedule_fn : overload_fn<Fns...> {
constexpr on_schedule_fn() = default;
using overload_fn<Fns...>::overload_fn;
};
template <class... Fns>
auto on_schedule(Fns... fns) -> on_schedule_fn<Fns...> {
return on_schedule_fn<Fns...>{std::move(fns)...};
}
template <class Fn> template <class Fn>
struct on_now_fn : overload_fn<Fn> { struct on_now_fn : overload_fn<Fn> {
constexpr on_now_fn() = default; constexpr on_now_fn() = default;
......
This diff is collapsed.
This diff is collapsed.
...@@ -23,7 +23,7 @@ ...@@ -23,7 +23,7 @@
// disable buggy compatibility warning about "requires" and "concept" being // disable buggy compatibility warning about "requires" and "concept" being
// C++20 keywords. // C++20 keywords.
#if defined(__clang__) && not defined(__APPLE__) #if defined(__clang__) && !defined(__APPLE__)
#define PUSHMI_PP_IGNORE_CXX2A_COMPAT_BEGIN \ #define PUSHMI_PP_IGNORE_CXX2A_COMPAT_BEGIN \
_Pragma("GCC diagnostic push") \ _Pragma("GCC diagnostic push") \
_Pragma("GCC diagnostic ignored \"-Wunknown-pragmas\"") \ _Pragma("GCC diagnostic ignored \"-Wunknown-pragmas\"") \
...@@ -419,9 +419,7 @@ PUSHMI_PP_IGNORE_CXX2A_COMPAT_BEGIN ...@@ -419,9 +419,7 @@ PUSHMI_PP_IGNORE_CXX2A_COMPAT_BEGIN
#else #else
#define PUSHMI_BROKEN_SUBSUMPTION(...) __VA_ARGS__ #define PUSHMI_BROKEN_SUBSUMPTION(...) __VA_ARGS__
#define PUSHMI_TYPE_CONSTRAINT(...) class #define PUSHMI_TYPE_CONSTRAINT(...) class
// bool() is used to prevent 'error: pasting "PUSHMI_PP_REQUIRES_PROBE_" and #define PUSHMI_EXP(...) decltype(::folly::pushmi::expAnd(__VA_ARGS__)){}
// "::" does not give a valid preprocessing token'
#define PUSHMI_EXP(...) bool(::folly::pushmi::expAnd(__VA_ARGS__))
#define PUSHMI_AND , #define PUSHMI_AND ,
#endif #endif
...@@ -484,7 +482,7 @@ struct Not { ...@@ -484,7 +482,7 @@ struct Not {
template <class T, class U> template <class T, class U>
struct And { struct And {
explicit constexpr operator bool() const noexcept { explicit constexpr operator bool() const noexcept {
return (bool)std::conditional_t<(bool)T{}, U, std::false_type>{}; return static_cast<bool>(std::conditional_t<static_cast<bool>(T{}), U, std::false_type>{});
} }
PUSHMI_TEMPLATE(class This = And, bool B) // PUSHMI_TEMPLATE(class This = And, bool B) //
(requires B == static_cast<bool>(This{})) // (requires B == static_cast<bool>(This{})) //
...@@ -507,7 +505,7 @@ struct And { ...@@ -507,7 +505,7 @@ struct And {
template <class T, class U> template <class T, class U>
struct Or { struct Or {
explicit constexpr operator bool() const noexcept { explicit constexpr operator bool() const noexcept {
return (bool)std::conditional_t<(bool)T{}, std::true_type, U>{}; return static_cast<bool>(std::conditional_t<static_cast<bool>(T{}), std::true_type, U>{});
} }
PUSHMI_TEMPLATE(class This = Or, bool B) // PUSHMI_TEMPLATE(class This = Or, bool B) //
(requires B == static_cast<bool>(This{})) // (requires B == static_cast<bool>(This{})) //
......
...@@ -28,16 +28,20 @@ PUSHMI_INLINE_VAR constexpr struct invoke_fn { ...@@ -28,16 +28,20 @@ PUSHMI_INLINE_VAR constexpr struct invoke_fn {
using mem_fn_t = decltype(std::mem_fn(std::declval<F>())); using mem_fn_t = decltype(std::mem_fn(std::declval<F>()));
public: public:
template <class F, class... As> PUSHMI_TEMPLATE(class F, class... As)
(requires //
requires(
std::declval<F>()(std::declval<As>()...))) //
auto operator()(F&& f, As&&... as) const auto operator()(F&& f, As&&... as) const
noexcept(noexcept(((F &&) f)((As &&) as...))) noexcept(noexcept(((F &&) f)((As &&) as...))) {
-> decltype(((F &&) f)((As &&) as...)) {
return ((F &&) f)((As &&) as...); return ((F &&) f)((As &&) as...);
} }
template <class F, class... As> PUSHMI_TEMPLATE(class F, class... As)
(requires //
requires(
std::mem_fn(std::declval<F>())(std::declval<As>()...))) //
auto operator()(F&& f, As&&... as) const auto operator()(F&& f, As&&... as) const
noexcept(noexcept(std::declval<mem_fn_t<F>>()((As &&) as...))) noexcept(noexcept(std::declval<mem_fn_t<F>>()((As &&) as...))) {
-> decltype(std::mem_fn(f)((As &&) as...)) {
return std::mem_fn(f)((As &&) as...); return std::mem_fn(f)((As &&) as...);
} }
} invoke{}; } invoke{};
......
...@@ -32,10 +32,11 @@ struct no_fail_fn { ...@@ -32,10 +32,11 @@ struct no_fail_fn {
}; };
template <class In> template <class In>
struct out_impl { struct out_impl {
PUSHMI_TEMPLATE(class Out) PUSHMI_TEMPLATE(class SIn, class Out)
(requires Receiver<Out>)auto operator()(Out out) const { (requires Receiver<Out>) //
return ::folly::pushmi::detail::receiver_from_fn<In>()( void operator()(SIn&& in, Out out) const {
std::move(out), ::folly::pushmi::on_error(on_error_impl{})); submit((In&&)in, ::folly::pushmi::detail::receiver_from_fn<In>()(
std::move(out), ::folly::pushmi::on_error(on_error_impl{})));
} }
}; };
struct in_impl { struct in_impl {
...@@ -43,7 +44,7 @@ struct no_fail_fn { ...@@ -43,7 +44,7 @@ struct no_fail_fn {
(requires Sender<In>)auto operator()(In in) const { (requires Sender<In>)auto operator()(In in) const {
return ::folly::pushmi::detail::sender_from( return ::folly::pushmi::detail::sender_from(
std::move(in), std::move(in),
::folly::pushmi::detail::submit_transform_out<In>(out_impl<In>{})); out_impl<In>{});
} }
}; };
......
...@@ -33,7 +33,7 @@ auto get_setting() { ...@@ -33,7 +33,7 @@ auto get_setting() {
if (setting_exists) { if (setting_exists) {
op::just(42) | op::submit(out); op::just(42) | op::submit(out);
} else { } else {
op::empty<int>() | op::submit(out); op::empty() | op::submit(out);
} }
}); });
} }
...@@ -63,7 +63,7 @@ int main() { ...@@ -63,7 +63,7 @@ int main() {
op::just(42) | op::transform([](int i) { op::just(42) | op::transform([](int i) {
if (i < 42) { if (i < 42) {
return mi::any_single_sender<std::exception_ptr, std::string>{ return mi::any_single_sender<std::exception_ptr, std::string>{
op::empty<std::string>()}; op::empty()};
} }
return mi::any_single_sender<std::exception_ptr, std::string>{ return mi::any_single_sender<std::exception_ptr, std::string>{
op::just(std::to_string(i))}; op::just(std::to_string(i))};
......
...@@ -74,7 +74,7 @@ int main() { ...@@ -74,7 +74,7 @@ int main() {
op::just(42) | op::transform([](auto v) { op::just(42) | op::transform([](auto v) {
using r_t = mi::any_single_sender<std::exception_ptr, int>; using r_t = mi::any_single_sender<std::exception_ptr, int>;
if (v < 40) { if (v < 40) {
return r_t{op::error<int>(std::exception_ptr{})}; return r_t{op::error(std::exception_ptr{})};
} else { } else {
return r_t{op::just(v)}; return r_t{op::just(v)};
} }
......
This diff is collapsed.
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include <folly/experimental/pushmi/flow_receiver.h> #include <folly/experimental/pushmi/flow_receiver.h>
#include <folly/experimental/pushmi/executor.h> #include <folly/experimental/pushmi/executor.h>
#include <folly/experimental/pushmi/trampoline.h> #include <folly/experimental/pushmi/trampoline.h>
#include <type_traits>
namespace folly { namespace folly {
namespace pushmi { namespace pushmi {
...@@ -26,7 +27,7 @@ template <class PE, class PV, class E, class... VN> ...@@ -26,7 +27,7 @@ template <class PE, class PV, class E, class... VN>
class any_flow_many_sender { class any_flow_many_sender {
union data { union data {
void* pobj_ = nullptr; void* pobj_ = nullptr;
char buffer_[sizeof(std::tuple<VN...>)]; // can hold a V in-situ std::aligned_union_t<0, std::tuple<VN...>> buffer_;
} data_{}; } data_{};
template <class Wrapped> template <class Wrapped>
static constexpr bool insitu() { static constexpr bool insitu() {
...@@ -35,10 +36,8 @@ class any_flow_many_sender { ...@@ -35,10 +36,8 @@ class any_flow_many_sender {
} }
struct vtable { struct vtable {
static void s_op(data&, data*) {} static void s_op(data&, data*) {}
static any_executor<E> s_executor(data&) { return {}; }
static void s_submit(data&, any_flow_receiver<PE, PV, E, VN...>) {} static void s_submit(data&, any_flow_receiver<PE, PV, E, VN...>) {}
void (*op_)(data&, data*) = vtable::s_op; void (*op_)(data&, data*) = vtable::s_op;
any_executor<E> (*executor_)(data&) = vtable::s_executor;
void (*submit_)(data&, any_flow_receiver<PE, PV, E, VN...>) = vtable::s_submit; void (*submit_)(data&, any_flow_receiver<PE, PV, E, VN...>) = vtable::s_submit;
}; };
static constexpr vtable const noop_ {}; static constexpr vtable const noop_ {};
...@@ -51,16 +50,12 @@ class any_flow_many_sender { ...@@ -51,16 +50,12 @@ class any_flow_many_sender {
dst->pobj_ = std::exchange(src.pobj_, nullptr); dst->pobj_ = std::exchange(src.pobj_, nullptr);
delete static_cast<Wrapped const*>(src.pobj_); delete static_cast<Wrapped const*>(src.pobj_);
} }
static any_executor<E> executor(data& src) {
return any_executor<E>{
::folly::pushmi::executor(*static_cast<Wrapped*>(src.pobj_))};
}
static void submit(data& src, any_flow_receiver<PE, PV, E, VN...> out) { static void submit(data& src, any_flow_receiver<PE, PV, E, VN...> out) {
::folly::pushmi::submit( ::folly::pushmi::submit(
*static_cast<Wrapped*>(src.pobj_), std::move(out)); *static_cast<Wrapped*>(src.pobj_), std::move(out));
} }
}; };
static const vtable vtbl{s::op, s::executor, s::submit}; static const vtable vtbl{s::op, s::submit};
data_.pobj_ = new Wrapped(std::move(obj)); data_.pobj_ = new Wrapped(std::move(obj));
vptr_ = &vtbl; vptr_ = &vtbl;
} }
...@@ -70,21 +65,17 @@ class any_flow_many_sender { ...@@ -70,21 +65,17 @@ class any_flow_many_sender {
struct s { struct s {
static void op(data& src, data* dst) { static void op(data& src, data* dst) {
if (dst) if (dst)
new (dst->buffer_) Wrapped( new (&dst->buffer_) Wrapped(
std::move(*static_cast<Wrapped*>((void*)src.buffer_))); std::move(*static_cast<Wrapped*>((void*)&src.buffer_)));
static_cast<Wrapped const*>((void*)src.buffer_)->~Wrapped(); static_cast<Wrapped const*>((void*)&src.buffer_)->~Wrapped();
}
static any_executor<E> executor(data& src) {
return any_executor<E>{::folly::pushmi::executor(
*static_cast<Wrapped*>((void*)src.buffer_))};
} }
static void submit(data& src, any_flow_receiver<PE, PV, E, VN...> out) { static void submit(data& src, any_flow_receiver<PE, PV, E, VN...> out) {
::folly::pushmi::submit( ::folly::pushmi::submit(
*static_cast<Wrapped*>((void*)src.buffer_), std::move(out)); *static_cast<Wrapped*>((void*)&src.buffer_), std::move(out));
} }
}; };
static const vtable vtbl{s::op, s::executor, s::submit}; static const vtable vtbl{s::op, s::submit};
new (data_.buffer_) Wrapped(std::move(obj)); new (&data_.buffer_) Wrapped(std::move(obj));
vptr_ = &vtbl; vptr_ = &vtbl;
} }
template <class T, class U = std::decay_t<T>> template <class T, class U = std::decay_t<T>>
...@@ -111,11 +102,10 @@ class any_flow_many_sender { ...@@ -111,11 +102,10 @@ class any_flow_many_sender {
new ((void*)this) any_flow_many_sender(std::move(that)); new ((void*)this) any_flow_many_sender(std::move(that));
return *this; return *this;
} }
any_executor<E> executor() { PUSHMI_TEMPLATE(class Out)
return vptr_->executor_(data_); (requires ReceiveError<Out, E>&& ReceiveValue<Out, VN...>) //
} void submit(Out&& out) {
void submit(any_flow_receiver<PE, PV, E, VN...> out) { vptr_->submit_(data_, any_flow_receiver<PE, PV, E, VN...>{(Out &&) out});
vptr_->submit_(data_, std::move(out));
} }
}; };
...@@ -124,10 +114,9 @@ template <class PE, class PV, class E, class... VN> ...@@ -124,10 +114,9 @@ template <class PE, class PV, class E, class... VN>
constexpr typename any_flow_many_sender<PE, PV, E, VN...>::vtable const constexpr typename any_flow_many_sender<PE, PV, E, VN...>::vtable const
any_flow_many_sender<PE, PV, E, VN...>::noop_; any_flow_many_sender<PE, PV, E, VN...>::noop_;
template <class SF, class EXF> template <class SF>
class flow_many_sender<SF, EXF> { class flow_many_sender<SF> {
SF sf_; SF sf_;
EXF exf_;
public: public:
using properties = property_set<is_sender<>, is_flow<>, is_many<>>; using properties = property_set<is_sender<>, is_flow<>, is_many<>>;
...@@ -135,10 +124,7 @@ class flow_many_sender<SF, EXF> { ...@@ -135,10 +124,7 @@ class flow_many_sender<SF, EXF> {
constexpr flow_many_sender() = default; constexpr flow_many_sender() = default;
constexpr explicit flow_many_sender(SF sf) constexpr explicit flow_many_sender(SF sf)
: sf_(std::move(sf)) {} : sf_(std::move(sf)) {}
constexpr flow_many_sender(SF sf, EXF exf)
: sf_(std::move(sf)), exf_(std::move(exf)) {}
auto executor() { return exf_(); }
PUSHMI_TEMPLATE(class Out) PUSHMI_TEMPLATE(class Out)
(requires FlowReceiver<Out> && Invocable<SF&, Out>) (requires FlowReceiver<Out> && Invocable<SF&, Out>)
void submit(Out out) { void submit(Out out) {
...@@ -146,11 +132,10 @@ class flow_many_sender<SF, EXF> { ...@@ -146,11 +132,10 @@ class flow_many_sender<SF, EXF> {
} }
}; };
template <PUSHMI_TYPE_CONSTRAINT(Sender<is_many<>, is_flow<>>) Data, class DSF, class DEXF> template <PUSHMI_TYPE_CONSTRAINT(Sender<is_many<>, is_flow<>>) Data, class DSF>
class flow_many_sender<Data, DSF, DEXF> { class flow_many_sender<Data, DSF> {
Data data_; Data data_;
DSF sf_; DSF sf_;
DEXF exf_;
public: public:
using properties = property_set_insert_t<properties_t<Data>, property_set<is_sender<>, is_flow<>, is_many<>>>; using properties = property_set_insert_t<properties_t<Data>, property_set<is_sender<>, is_flow<>, is_many<>>>;
...@@ -160,10 +145,7 @@ class flow_many_sender<Data, DSF, DEXF> { ...@@ -160,10 +145,7 @@ class flow_many_sender<Data, DSF, DEXF> {
: data_(std::move(data)) {} : data_(std::move(data)) {}
constexpr flow_many_sender(Data data, DSF sf) constexpr flow_many_sender(Data data, DSF sf)
: data_(std::move(data)), sf_(std::move(sf)) {} : data_(std::move(data)), sf_(std::move(sf)) {}
constexpr flow_many_sender(Data data, DSF sf, DEXF exf)
: data_(std::move(data)), sf_(std::move(sf)), exf_(std::move(exf)) {}
auto executor() { return exf_(data_); }
PUSHMI_TEMPLATE(class Out) PUSHMI_TEMPLATE(class Out)
(requires PUSHMI_EXP(lazy::FlowReceiver<Out> PUSHMI_AND (requires PUSHMI_EXP(lazy::FlowReceiver<Out> PUSHMI_AND
lazy::Invocable<DSF&, Data&, Out>)) lazy::Invocable<DSF&, Data&, Out>))
...@@ -174,7 +156,7 @@ class flow_many_sender<Data, DSF, DEXF> { ...@@ -174,7 +156,7 @@ class flow_many_sender<Data, DSF, DEXF> {
template <> template <>
class flow_many_sender<> class flow_many_sender<>
: public flow_many_sender<ignoreSF, trampolineEXF> { : public flow_many_sender<ignoreSF> {
public: public:
flow_many_sender() = default; flow_many_sender() = default;
}; };
...@@ -183,59 +165,41 @@ public: ...@@ -183,59 +165,41 @@ public:
// make_flow_many_sender // make_flow_many_sender
PUSHMI_INLINE_VAR constexpr struct make_flow_many_sender_fn { PUSHMI_INLINE_VAR constexpr struct make_flow_many_sender_fn {
inline auto operator()() const { inline auto operator()() const {
return flow_many_sender<ignoreSF, trampolineEXF>{}; return flow_many_sender<ignoreSF>{};
} }
PUSHMI_TEMPLATE(class SF) PUSHMI_TEMPLATE(class SF)
(requires True<> PUSHMI_BROKEN_SUBSUMPTION(&& not Sender<SF>)) (requires True<> PUSHMI_BROKEN_SUBSUMPTION(&& not Sender<SF>))
auto operator()(SF sf) const { auto operator()(SF sf) const {
return flow_many_sender<SF, trampolineEXF>{std::move(sf)}; return flow_many_sender<SF>{std::move(sf)};
}
PUSHMI_TEMPLATE(class SF, class EXF)
(requires True<> && Invocable<EXF&> PUSHMI_BROKEN_SUBSUMPTION(&& not Sender<SF>))
auto operator()(SF sf, EXF exf) const {
return flow_many_sender<SF, EXF>{std::move(sf), std::move(exf)};
} }
PUSHMI_TEMPLATE(class Data) PUSHMI_TEMPLATE(class Data)
(requires True<> && Sender<Data, is_many<>, is_flow<>>) (requires True<> && Sender<Data, is_many<>, is_flow<>>)
auto operator()(Data d) const { auto operator()(Data d) const {
return flow_many_sender<Data, passDSF, passDEXF>{std::move(d)}; return flow_many_sender<Data, passDSF>{std::move(d)};
} }
PUSHMI_TEMPLATE(class Data, class DSF) PUSHMI_TEMPLATE(class Data, class DSF)
(requires Sender<Data, is_many<>, is_flow<>>) (requires Sender<Data, is_many<>, is_flow<>>)
auto operator()(Data d, DSF sf) const { auto operator()(Data d, DSF sf) const {
return flow_many_sender<Data, DSF, passDEXF>{std::move(d), std::move(sf)}; return flow_many_sender<Data, DSF>{std::move(d), std::move(sf)};
}
PUSHMI_TEMPLATE(class Data, class DSF, class DEXF)
(requires Sender<Data, is_many<>, is_flow<>> && Invocable<DEXF&, Data&>)
auto operator()(Data d, DSF sf, DEXF exf) const {
return flow_many_sender<Data, DSF, DEXF>{std::move(d), std::move(sf), std::move(exf)};
} }
} const make_flow_many_sender {}; } const make_flow_many_sender {};
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// deduction guides // deduction guides
#if __cpp_deduction_guides >= 201703 #if __cpp_deduction_guides >= 201703
flow_many_sender() -> flow_many_sender<ignoreSF, trampolineEXF>; flow_many_sender() -> flow_many_sender<ignoreSF>;
PUSHMI_TEMPLATE(class SF) PUSHMI_TEMPLATE(class SF)
(requires True<> PUSHMI_BROKEN_SUBSUMPTION(&& not Sender<SF>)) (requires True<> PUSHMI_BROKEN_SUBSUMPTION(&& not Sender<SF>))
flow_many_sender(SF) -> flow_many_sender<SF, trampolineEXF>; flow_many_sender(SF) -> flow_many_sender<SF>;
PUSHMI_TEMPLATE(class SF, class EXF)
(requires True<> && Invocable<EXF&> PUSHMI_BROKEN_SUBSUMPTION(&& not Sender<SF>))
flow_many_sender(SF, EXF) -> flow_many_sender<SF, EXF>;
PUSHMI_TEMPLATE(class Data) PUSHMI_TEMPLATE(class Data)
(requires True<> && Sender<Data, is_many<>, is_flow<>>) (requires True<> && Sender<Data, is_many<>, is_flow<>>)
flow_many_sender(Data) -> flow_many_sender<Data, passDSF, passDEXF>; flow_many_sender(Data) -> flow_many_sender<Data, passDSF>;
PUSHMI_TEMPLATE(class Data, class DSF) PUSHMI_TEMPLATE(class Data, class DSF)
(requires Sender<Data, is_many<>, is_flow<>>) (requires Sender<Data, is_many<>, is_flow<>>)
flow_many_sender(Data, DSF) -> flow_many_sender<Data, DSF, passDEXF>; flow_many_sender(Data, DSF) -> flow_many_sender<Data, DSF>;
PUSHMI_TEMPLATE(class Data, class DSF, class DEXF)
(requires Sender<Data, is_many<>, is_flow<>> && Invocable<DEXF&, Data&>)
flow_many_sender(Data, DSF, DEXF) -> flow_many_sender<Data, DSF, DEXF>;
#endif #endif
template<> template<>
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#pragma once #pragma once
#include <folly/experimental/pushmi/receiver.h> #include <folly/experimental/pushmi/receiver.h>
#include <type_traits>
namespace folly { namespace folly {
namespace pushmi { namespace pushmi {
...@@ -26,7 +27,7 @@ class any_flow_receiver { ...@@ -26,7 +27,7 @@ class any_flow_receiver {
bool started_ = false; bool started_ = false;
union data { union data {
void* pobj_ = nullptr; void* pobj_ = nullptr;
char buffer_[sizeof(std::tuple<VN...>)]; // can hold V in-situ std::aligned_union_t<0, std::tuple<VN...>> buffer_;
} data_{}; } data_{};
template <class Wrapped> template <class Wrapped>
static constexpr bool insitu() { static constexpr bool insitu() {
...@@ -77,25 +78,25 @@ class any_flow_receiver { ...@@ -77,25 +78,25 @@ class any_flow_receiver {
struct s { struct s {
static void op(data& src, data* dst) { static void op(data& src, data* dst) {
if (dst) if (dst)
new (dst->buffer_) Wrapped( new (&dst->buffer_) Wrapped(
std::move(*static_cast<Wrapped*>((void*)src.buffer_))); std::move(*static_cast<Wrapped*>((void*)&src.buffer_)));
static_cast<Wrapped const*>((void*)src.buffer_)->~Wrapped(); static_cast<Wrapped const*>((void*)&src.buffer_)->~Wrapped();
} }
static void done(data& src) { static void done(data& src) {
set_done(*static_cast<Wrapped*>((void*)src.buffer_)); set_done(*static_cast<Wrapped*>((void*)&src.buffer_));
} }
static void error(data& src, E e) noexcept { static void error(data& src, E e) noexcept {
set_error(*static_cast<Wrapped*>((void*)src.buffer_), std::move(e)); set_error(*static_cast<Wrapped*>((void*)&src.buffer_), std::move(e));
} }
static void value(data& src, VN... vn) { static void value(data& src, VN... vn) {
set_value(*static_cast<Wrapped*>((void*)src.buffer_), std::move(vn)...); set_value(*static_cast<Wrapped*>((void*)&src.buffer_), std::move(vn)...);
} }
static void starting(data& src, any_receiver<PE, PV> up) { static void starting(data& src, any_receiver<PE, PV> up) {
set_starting(*static_cast<Wrapped*>((void*)src.buffer_), std::move(up)); set_starting(*static_cast<Wrapped*>((void*)&src.buffer_), std::move(up));
} }
}; };
static const vtable vtbl{s::op, s::done, s::error, s::value, s::starting}; static const vtable vtbl{s::op, s::done, s::error, s::value, s::starting};
new (data_.buffer_) Wrapped(std::move(obj)); new (&data_.buffer_) Wrapped(std::move(obj));
vptr_ = &vtbl; vptr_ = &vtbl;
} }
template <class T, class U = std::decay_t<T>> template <class T, class U = std::decay_t<T>>
...@@ -128,11 +129,12 @@ public: ...@@ -128,11 +129,12 @@ public:
if (done_){ return; } if (done_){ return; }
vptr_->value_(data_, std::move(vn)...); vptr_->value_(data_, std::move(vn)...);
} }
void error(E e) noexcept { template<class A>
void error(A&& e) noexcept {
if (!started_) {std::terminate();} if (!started_) {std::terminate();}
if (done_){ return; } if (done_){ return; }
done_ = true; done_ = true;
vptr_->error_(data_, std::move(e)); vptr_->error_(data_, E{(A&&)e});
} }
void done() { void done() {
if (!started_) {std::terminate();} if (!started_) {std::terminate();}
...@@ -141,10 +143,12 @@ public: ...@@ -141,10 +143,12 @@ public:
vptr_->done_(data_); vptr_->done_(data_);
} }
void starting(any_receiver<PE, PV> up) { PUSHMI_TEMPLATE(class Up)
(requires ReceiveValue<Up, PV> && ReceiveError<Up, PE>)
void starting(Up&& up) {
if (started_) {std::terminate();} if (started_) {std::terminate();}
started_ = true; started_ = true;
vptr_->starting_(data_, std::move(up)); vptr_->starting_(data_, any_receiver<PE, PV>{(Up&&)up});
} }
}; };
...@@ -194,20 +198,20 @@ class flow_receiver<VF, EF, DF, StrtF> { ...@@ -194,20 +198,20 @@ class flow_receiver<VF, EF, DF, StrtF> {
df_(std::move(df)), df_(std::move(df)),
strtf_(std::move(strtf)) {} strtf_(std::move(strtf)) {}
PUSHMI_TEMPLATE (class V) PUSHMI_TEMPLATE (class V)
(requires Invocable<VF&, V>) (requires Invocable<VF&, V>) //
void value(V&& v) { void value(V&& v) {
if (!started_) {std::terminate();} if (!started_) {std::terminate();}
if (done_){ return; } if (done_){ return; }
nf_((V&&) v); nf_((V&&) v);
} }
PUSHMI_TEMPLATE (class E) PUSHMI_TEMPLATE (class E)
(requires Invocable<EF&, E>) (requires Invocable<EF&, E&&>) //
void error(E e) noexcept { void error(E&& e) noexcept {
static_assert(NothrowInvocable<EF&, E>, "error function must be noexcept"); static_assert(NothrowInvocable<EF&, E&&>, "error function must be noexcept");
if (!started_) {std::terminate();} if (!started_) {std::terminate();}
if (done_){ return; } if (done_){ return; }
done_ = true; done_ = true;
ef_(std::move(e)); ef_((E&&)e);
} }
void done() { void done() {
if (!started_) {std::terminate();} if (!started_) {std::terminate();}
...@@ -273,25 +277,27 @@ class flow_receiver<Data, DVF, DEF, DDF, DStrtF> { ...@@ -273,25 +277,27 @@ class flow_receiver<Data, DVF, DEF, DDF, DStrtF> {
Data& data() { return data_; } Data& data() { return data_; }
PUSHMI_TEMPLATE (class V) PUSHMI_TEMPLATE (class... VN)
(requires Invocable<DVF&, Data&, V>) (requires Invocable<DVF&, Data&, VN...>) //
void value(V&& v) { void value(VN&&... vn) {
if (!started_) {std::terminate();} if (!started_) {std::terminate();}
if (done_){ return; } if (done_){ return; }
nf_(data_, (V&&) v); nf_(data_, (VN&&) vn...);
} }
PUSHMI_TEMPLATE (class E) PUSHMI_TEMPLATE (class E)
(requires Invocable<DEF&, Data&, E>) (requires Invocable<DEF&, Data&, E&&>) //
void error(E&& e) noexcept { void error(E&& e) noexcept {
static_assert( static_assert(
NothrowInvocable<DEF&, Data&, E>, "error function must be noexcept"); NothrowInvocable<DEF&, Data&, E&&>, "error function must be noexcept");
if (!started_) {std::terminate();} if (!started_) {std::terminate();}
if (done_){ return; } if (done_){ return; }
done_ = true; done_ = true;
ef_(data_, (E&&) e); ef_(data_, (E&&) e);
} }
void done() { void done() {
if (!started_) {std::terminate();} if (!started_) {
std::terminate();
}
if (done_){ return; } if (done_){ return; }
done_ = true; done_ = true;
df_(data_); df_(data_);
...@@ -340,9 +346,9 @@ PUSHMI_INLINE_VAR constexpr struct make_flow_receiver_fn { ...@@ -340,9 +346,9 @@ PUSHMI_INLINE_VAR constexpr struct make_flow_receiver_fn {
return flow_receiver<ignoreVF, on_error_fn<EFN...>, ignoreDF, ignoreStrtF>{ return flow_receiver<ignoreVF, on_error_fn<EFN...>, ignoreDF, ignoreStrtF>{
std::move(ef)}; std::move(ef)};
} }
template <class... DFN> template <class DF>
auto operator()(on_done_fn<DFN...> df) const { auto operator()(on_done_fn<DF> df) const {
return flow_receiver<ignoreVF, abortEF, on_done_fn<DFN...>, ignoreStrtF>{ return flow_receiver<ignoreVF, abortEF, on_done_fn<DF>, ignoreStrtF>{
std::move(df)}; std::move(df)};
} }
PUSHMI_TEMPLATE (class VF, class EF) PUSHMI_TEMPLATE (class VF, class EF)
...@@ -405,11 +411,11 @@ PUSHMI_INLINE_VAR constexpr struct make_flow_receiver_fn { ...@@ -405,11 +411,11 @@ PUSHMI_INLINE_VAR constexpr struct make_flow_receiver_fn {
return flow_receiver<Data, passDVF, on_error_fn<DEFN...>, passDDF, passDStrtF>{ return flow_receiver<Data, passDVF, on_error_fn<DEFN...>, passDDF, passDStrtF>{
std::move(d), std::move(ef)}; std::move(d), std::move(ef)};
} }
PUSHMI_TEMPLATE(class Data, class... DDFN) PUSHMI_TEMPLATE(class Data, class DDF)
(requires PUSHMI_EXP( (requires PUSHMI_EXP(
lazy::FlowReceiverDataArg<Data>)) lazy::FlowReceiverDataArg<Data>))
auto operator()(Data d, on_done_fn<DDFN...> df) const { auto operator()(Data d, on_done_fn<DDF> df) const {
return flow_receiver<Data, passDVF, passDEF, on_done_fn<DDFN...>, passDStrtF>{ return flow_receiver<Data, passDVF, passDEF, on_done_fn<DDF>, passDStrtF>{
std::move(d), std::move(df)}; std::move(d), std::move(df)};
} }
PUSHMI_TEMPLATE(class Data, class DVF, class DEF) PUSHMI_TEMPLATE(class Data, class DVF, class DEF)
...@@ -463,9 +469,9 @@ template <class... EFN> ...@@ -463,9 +469,9 @@ template <class... EFN>
flow_receiver(on_error_fn<EFN...>) -> flow_receiver(on_error_fn<EFN...>) ->
flow_receiver<ignoreVF, on_error_fn<EFN...>, ignoreDF, ignoreStrtF>; flow_receiver<ignoreVF, on_error_fn<EFN...>, ignoreDF, ignoreStrtF>;
template <class... DFN> template <class DF>
flow_receiver(on_done_fn<DFN...>) -> flow_receiver(on_done_fn<DF>) ->
flow_receiver<ignoreVF, abortEF, on_done_fn<DFN...>, ignoreStrtF>; flow_receiver<ignoreVF, abortEF, on_done_fn<DF>, ignoreStrtF>;
PUSHMI_TEMPLATE(class VF, class EF) PUSHMI_TEMPLATE(class VF, class EF)
(requires PUSHMI_EXP( (requires PUSHMI_EXP(
...@@ -521,19 +527,11 @@ PUSHMI_TEMPLATE(class Data, class... DEFN) ...@@ -521,19 +527,11 @@ PUSHMI_TEMPLATE(class Data, class... DEFN)
flow_receiver(Data d, on_error_fn<DEFN...>) -> flow_receiver(Data d, on_error_fn<DEFN...>) ->
flow_receiver<Data, passDVF, on_error_fn<DEFN...>, passDDF, passDStrtF>; flow_receiver<Data, passDVF, on_error_fn<DEFN...>, passDDF, passDStrtF>;
PUSHMI_TEMPLATE(class Data, class... DDFN)
(requires PUSHMI_EXP(
lazy::FlowReceiverDataArg<Data>))
flow_receiver(Data d, on_done_fn<DDFN...>) ->
flow_receiver<Data, passDVF, passDEF, on_done_fn<DDFN...>, passDStrtF>;
PUSHMI_TEMPLATE(class Data, class DDF) PUSHMI_TEMPLATE(class Data, class DDF)
(requires PUSHMI_EXP( (requires PUSHMI_EXP(
lazy::True<> PUSHMI_AND lazy::FlowReceiverDataArg<Data>))
lazy::FlowReceiverDataArg<Data> PUSHMI_AND flow_receiver(Data d, on_done_fn<DDF>) ->
lazy::Invocable<DDF&, Data&>)) flow_receiver<Data, passDVF, passDEF, on_done_fn<DDF>, passDStrtF>;
flow_receiver(Data d, DDF) ->
flow_receiver<Data, passDVF, passDEF, DDF, passDStrtF>;
PUSHMI_TEMPLATE(class Data, class DVF, class DEF) PUSHMI_TEMPLATE(class Data, class DVF, class DEF)
(requires PUSHMI_EXP( (requires PUSHMI_EXP(
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include <folly/experimental/pushmi/flow_receiver.h> #include <folly/experimental/pushmi/flow_receiver.h>
#include <folly/experimental/pushmi/executor.h> #include <folly/experimental/pushmi/executor.h>
#include <folly/experimental/pushmi/trampoline.h> #include <folly/experimental/pushmi/trampoline.h>
#include <type_traits>
namespace folly { namespace folly {
namespace pushmi { namespace pushmi {
...@@ -26,7 +27,8 @@ template <class PE, class E, class... VN> ...@@ -26,7 +27,8 @@ template <class PE, class E, class... VN>
class any_flow_single_sender { class any_flow_single_sender {
union data { union data {
void* pobj_ = nullptr; void* pobj_ = nullptr;
char buffer_[sizeof(std::tuple<VN...>)]; // can hold a V in-situ std::aligned_storage_t<
sizeof(std::tuple<VN...>), alignof(std::tuple<VN...>)> buffer_;
} data_{}; } data_{};
template <class Wrapped> template <class Wrapped>
static constexpr bool insitu() { static constexpr bool insitu() {
...@@ -35,10 +37,8 @@ class any_flow_single_sender { ...@@ -35,10 +37,8 @@ class any_flow_single_sender {
} }
struct vtable { struct vtable {
static void s_op(data&, data*) {} static void s_op(data&, data*) {}
static any_executor<E> s_executor(data&) { return {}; }
static void s_submit(data&, any_flow_receiver<PE, std::ptrdiff_t, E, VN...>) {} static void s_submit(data&, any_flow_receiver<PE, std::ptrdiff_t, E, VN...>) {}
void (*op_)(data&, data*) = vtable::s_op; void (*op_)(data&, data*) = vtable::s_op;
any_executor<E> (*executor_)(data&) = vtable::s_executor;
void (*submit_)(data&, any_flow_receiver<PE, std::ptrdiff_t, E, VN...>) = void (*submit_)(data&, any_flow_receiver<PE, std::ptrdiff_t, E, VN...>) =
vtable::s_submit; vtable::s_submit;
}; };
...@@ -53,10 +53,6 @@ class any_flow_single_sender { ...@@ -53,10 +53,6 @@ class any_flow_single_sender {
dst->pobj_ = std::exchange(src.pobj_, nullptr); dst->pobj_ = std::exchange(src.pobj_, nullptr);
delete static_cast<Wrapped const*>(src.pobj_); delete static_cast<Wrapped const*>(src.pobj_);
} }
static any_executor<E> executor(data& src) {
return any_executor<E>{
::folly::pushmi::executor(*static_cast<Wrapped*>(src.pobj_))};
}
static void submit( static void submit(
data& src, data& src,
any_flow_receiver<PE, std::ptrdiff_t, E, VN...> out) { any_flow_receiver<PE, std::ptrdiff_t, E, VN...> out) {
...@@ -64,7 +60,7 @@ class any_flow_single_sender { ...@@ -64,7 +60,7 @@ class any_flow_single_sender {
*static_cast<Wrapped*>(src.pobj_), std::move(out)); *static_cast<Wrapped*>(src.pobj_), std::move(out));
} }
}; };
static const vtable vtbl{s::op, s::executor, s::submit}; static const vtable vtbl{s::op, s::submit};
data_.pobj_ = new Wrapped(std::move(obj)); data_.pobj_ = new Wrapped(std::move(obj));
vptr_ = &vtbl; vptr_ = &vtbl;
} }
...@@ -74,23 +70,19 @@ class any_flow_single_sender { ...@@ -74,23 +70,19 @@ class any_flow_single_sender {
struct s { struct s {
static void op(data& src, data* dst) { static void op(data& src, data* dst) {
if (dst) if (dst)
new (dst->buffer_) new (&dst->buffer_)
Wrapped(std::move(*static_cast<Wrapped*>((void*)src.buffer_))); Wrapped(std::move(*static_cast<Wrapped*>((void*)&src.buffer_)));
static_cast<Wrapped const*>((void*)src.buffer_)->~Wrapped(); static_cast<Wrapped const*>((void*)&src.buffer_)->~Wrapped();
}
static any_executor<E> executor(data& src) {
return any_executor<E>{::folly::pushmi::executor(
*static_cast<Wrapped*>((void*)src.buffer_))};
} }
static void submit( static void submit(
data& src, data& src,
any_flow_receiver<PE, std::ptrdiff_t, E, VN...> out) { any_flow_receiver<PE, std::ptrdiff_t, E, VN...> out) {
::folly::pushmi::submit( ::folly::pushmi::submit(
*static_cast<Wrapped*>((void*)src.buffer_), std::move(out)); *static_cast<Wrapped*>((void*)&src.buffer_), std::move(out));
} }
}; };
static const vtable vtbl{s::op, s::executor, s::submit}; static const vtable vtbl{s::op, s::submit};
new (data_.buffer_) Wrapped(std::move(obj)); new (&data_.buffer_) Wrapped(std::move(obj));
vptr_ = &vtbl; vptr_ = &vtbl;
} }
template <class T, class U = std::decay_t<T>> template <class T, class U = std::decay_t<T>>
...@@ -117,11 +109,10 @@ class any_flow_single_sender { ...@@ -117,11 +109,10 @@ class any_flow_single_sender {
new ((void*)this) any_flow_single_sender(std::move(that)); new ((void*)this) any_flow_single_sender(std::move(that));
return *this; return *this;
} }
any_executor<E> executor() { PUSHMI_TEMPLATE(class Out)
return vptr_->executor_(data_); (requires ReceiveError<Out, E>&& ReceiveValue<Out, VN...>) //
} void submit(Out&& out) {
void submit(any_flow_receiver<PE, std::ptrdiff_t, E, VN...> out) { vptr_->submit_(data_, any_flow_receiver<PE, std::ptrdiff_t, E, VN...>{(Out &&) out});
vptr_->submit_(data_, std::move(out));
} }
}; };
...@@ -130,10 +121,9 @@ template <class PE, class E, class... VN> ...@@ -130,10 +121,9 @@ template <class PE, class E, class... VN>
constexpr typename any_flow_single_sender<PE, E, VN...>::vtable const constexpr typename any_flow_single_sender<PE, E, VN...>::vtable const
any_flow_single_sender<PE, E, VN...>::noop_; any_flow_single_sender<PE, E, VN...>::noop_;
template <class SF, class EXF> template <class SF>
class flow_single_sender<SF, EXF> { class flow_single_sender<SF> {
SF sf_; SF sf_;
EXF exf_;
public: public:
using properties = property_set<is_sender<>, is_flow<>, is_single<>>; using properties = property_set<is_sender<>, is_flow<>, is_single<>>;
...@@ -141,10 +131,7 @@ class flow_single_sender<SF, EXF> { ...@@ -141,10 +131,7 @@ class flow_single_sender<SF, EXF> {
constexpr flow_single_sender() = default; constexpr flow_single_sender() = default;
constexpr explicit flow_single_sender(SF sf) constexpr explicit flow_single_sender(SF sf)
: sf_(std::move(sf)) {} : sf_(std::move(sf)) {}
constexpr flow_single_sender(SF sf, EXF exf)
: sf_(std::move(sf)), exf_(std::move(exf)) {}
auto executor() { return exf_(); }
PUSHMI_TEMPLATE(class Out) PUSHMI_TEMPLATE(class Out)
(requires Receiver<Out> && Invocable<SF&, Out>) (requires Receiver<Out> && Invocable<SF&, Out>)
void submit(Out out) { void submit(Out out) {
...@@ -154,12 +141,10 @@ class flow_single_sender<SF, EXF> { ...@@ -154,12 +141,10 @@ class flow_single_sender<SF, EXF> {
template < template <
PUSHMI_TYPE_CONSTRAINT(Sender<is_single<>, is_flow<>>) Data, PUSHMI_TYPE_CONSTRAINT(Sender<is_single<>, is_flow<>>) Data,
class DSF, class DSF>
class DEXF> class flow_single_sender<Data, DSF> {
class flow_single_sender<Data, DSF, DEXF> {
Data data_; Data data_;
DSF sf_; DSF sf_;
DEXF exf_;
public: public:
using properties = property_set_insert_t< using properties = property_set_insert_t<
...@@ -171,10 +156,7 @@ class flow_single_sender<Data, DSF, DEXF> { ...@@ -171,10 +156,7 @@ class flow_single_sender<Data, DSF, DEXF> {
: data_(std::move(data)) {} : data_(std::move(data)) {}
constexpr flow_single_sender(Data data, DSF sf) constexpr flow_single_sender(Data data, DSF sf)
: data_(std::move(data)), sf_(std::move(sf)) {} : data_(std::move(data)), sf_(std::move(sf)) {}
constexpr flow_single_sender(Data data, DSF sf, DEXF exf)
: data_(std::move(data)), sf_(std::move(sf)), exf_(std::move(exf)) {}
auto executor() { return exf_(data_); }
PUSHMI_TEMPLATE(class Out) PUSHMI_TEMPLATE(class Out)
(requires PUSHMI_EXP(lazy::Receiver<Out> PUSHMI_AND (requires PUSHMI_EXP(lazy::Receiver<Out> PUSHMI_AND
lazy::Invocable<DSF&, Data&, Out>)) lazy::Invocable<DSF&, Data&, Out>))
...@@ -185,7 +167,7 @@ class flow_single_sender<Data, DSF, DEXF> { ...@@ -185,7 +167,7 @@ class flow_single_sender<Data, DSF, DEXF> {
template <> template <>
class flow_single_sender<> class flow_single_sender<>
: public flow_single_sender<ignoreSF, trampolineEXF> { : public flow_single_sender<ignoreSF> {
public: public:
flow_single_sender() = default; flow_single_sender() = default;
}; };
...@@ -194,62 +176,41 @@ public: ...@@ -194,62 +176,41 @@ public:
// make_flow_single_sender // make_flow_single_sender
PUSHMI_INLINE_VAR constexpr struct make_flow_single_sender_fn { PUSHMI_INLINE_VAR constexpr struct make_flow_single_sender_fn {
inline auto operator()() const { inline auto operator()() const {
return flow_single_sender<ignoreSF, trampolineEXF>{}; return flow_single_sender<ignoreSF>{};
} }
PUSHMI_TEMPLATE(class SF) PUSHMI_TEMPLATE(class SF)
(requires True<> PUSHMI_BROKEN_SUBSUMPTION(&& not Sender<SF>)) (requires True<> PUSHMI_BROKEN_SUBSUMPTION(&& not Sender<SF>))
auto operator()(SF sf) const { auto operator()(SF sf) const {
return flow_single_sender<SF, trampolineEXF>{std::move(sf)}; return flow_single_sender<SF>{std::move(sf)};
}
PUSHMI_TEMPLATE(class SF, class EXF)
(requires True<> && Invocable<EXF&>
PUSHMI_BROKEN_SUBSUMPTION(&& not Sender<SF>))
auto operator()(SF sf, EXF exf) const {
return flow_single_sender<SF, EXF>{std::move(sf), std::move(exf)};
} }
PUSHMI_TEMPLATE(class Data) PUSHMI_TEMPLATE(class Data)
(requires True<> && Sender<Data, is_single<>, is_flow<>>) (requires True<> && Sender<Data, is_single<>, is_flow<>>)
auto operator()(Data d) const { auto operator()(Data d) const {
return flow_single_sender<Data, passDSF, passDEXF>{std::move(d)}; return flow_single_sender<Data, passDSF>{std::move(d)};
} }
PUSHMI_TEMPLATE(class Data, class DSF) PUSHMI_TEMPLATE(class Data, class DSF)
(requires Sender<Data, is_single<>, is_flow<>>) (requires Sender<Data, is_single<>, is_flow<>>)
auto operator()(Data d, DSF sf) const { auto operator()(Data d, DSF sf) const {
return flow_single_sender<Data, DSF, passDEXF>{std::move(d), std::move(sf)}; return flow_single_sender<Data, DSF>{std::move(d), std::move(sf)};
}
PUSHMI_TEMPLATE(class Data, class DSF, class DEXF)
(requires Sender<Data, is_single<>, is_flow<>> && Invocable<DEXF&, Data&>)
auto operator()(Data d, DSF sf, DEXF exf) const {
return flow_single_sender<Data, DSF, DEXF>{std::move(d),
std::move(sf), std::move(exf)};
} }
} const make_flow_single_sender {}; } const make_flow_single_sender {};
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// deduction guides // deduction guides
#if __cpp_deduction_guides >= 201703 #if __cpp_deduction_guides >= 201703
flow_single_sender() -> flow_single_sender<ignoreSF, trampolineEXF>; flow_single_sender() -> flow_single_sender<ignoreSF>;
PUSHMI_TEMPLATE(class SF) PUSHMI_TEMPLATE(class SF)
(requires True<> PUSHMI_BROKEN_SUBSUMPTION(&& not Sender<SF>)) (requires True<> PUSHMI_BROKEN_SUBSUMPTION(&& not Sender<SF>))
flow_single_sender(SF) -> flow_single_sender<SF, trampolineEXF>; flow_single_sender(SF) -> flow_single_sender<SF>;
PUSHMI_TEMPLATE(class SF, class EXF)
(requires True<> && Invocable<EXF&>
PUSHMI_BROKEN_SUBSUMPTION(&& not Sender<SF>))
flow_single_sender(SF, EXF) -> flow_single_sender<SF, EXF>;
PUSHMI_TEMPLATE(class Data) PUSHMI_TEMPLATE(class Data)
(requires True<> && Sender<Data, is_single<>, is_flow<>>) (requires True<> && Sender<Data, is_single<>, is_flow<>>)
flow_single_sender(Data) -> flow_single_sender<Data, passDSF, passDEXF>; flow_single_sender(Data) -> flow_single_sender<Data, passDSF>;
PUSHMI_TEMPLATE(class Data, class DSF) PUSHMI_TEMPLATE(class Data, class DSF)
(requires Sender<Data, is_single<>, is_flow<>>) (requires Sender<Data, is_single<>, is_flow<>>)
flow_single_sender(Data, DSF) -> flow_single_sender<Data, DSF, passDEXF>; flow_single_sender(Data, DSF) -> flow_single_sender<Data, DSF>;
PUSHMI_TEMPLATE(class Data, class DSF, class DEXF)
(requires Sender<Data, is_single<>, is_flow<>> && Invocable<DEXF&, Data&>)
flow_single_sender(Data, DSF, DEXF) -> flow_single_sender<Data, DSF, DEXF>;
#endif #endif
template<> template<>
......
...@@ -74,22 +74,25 @@ struct is_concurrent_sequence; ...@@ -74,22 +74,25 @@ struct is_concurrent_sequence;
// implementation types // implementation types
template <PUSHMI_TYPE_CONSTRAINT(SemiMovable)... TN> template <PUSHMI_TYPE_CONSTRAINT(SemiMovable)... TN>
class receiver; class executor;
template <PUSHMI_TYPE_CONSTRAINT(SemiMovable)... TN> template <PUSHMI_TYPE_CONSTRAINT(SemiMovable)... TN>
class flow_receiver; class constrained_executor;
template <PUSHMI_TYPE_CONSTRAINT(SemiMovable)... TN> template <PUSHMI_TYPE_CONSTRAINT(SemiMovable)... TN>
class single_sender; class time_executor;
template <PUSHMI_TYPE_CONSTRAINT(SemiMovable)... TN> template <PUSHMI_TYPE_CONSTRAINT(SemiMovable)... TN>
class many_sender; class receiver;
template <PUSHMI_TYPE_CONSTRAINT(SemiMovable)... TN> template <PUSHMI_TYPE_CONSTRAINT(SemiMovable)... TN>
class constrained_single_sender; class flow_receiver;
template <PUSHMI_TYPE_CONSTRAINT(SemiMovable)... TN> template <PUSHMI_TYPE_CONSTRAINT(SemiMovable)... TN>
class time_single_sender; class single_sender;
template <PUSHMI_TYPE_CONSTRAINT(SemiMovable)... TN>
class many_sender;
template <PUSHMI_TYPE_CONSTRAINT(SemiMovable)... TN> template <PUSHMI_TYPE_CONSTRAINT(SemiMovable)... TN>
class flow_single_sender; class flow_single_sender;
...@@ -123,35 +126,28 @@ template < ...@@ -123,35 +126,28 @@ template <
class... VN> class... VN>
class any_flow_many_sender; class any_flow_many_sender;
template <class E = std::exception_ptr, class C = std::ptrdiff_t, class... VN> template <class E = std::exception_ptr, class... VN>
class any_constrained_single_sender; class any_executor;
template <
class E = std::exception_ptr,
class TP = std::chrono::system_clock::time_point,
class... VN>
class any_time_single_sender;
template <class E = std::exception_ptr>
struct any_executor;
template <class E = std::exception_ptr> template <class E = std::exception_ptr, class... VN>
struct any_executor_ref; struct any_executor_ref;
template <class E = std::exception_ptr, class CV = std::ptrdiff_t> template <class E = std::exception_ptr, class CV = std::ptrdiff_t, class... VN>
struct any_constrained_executor; class any_constrained_executor;
template <class E = std::exception_ptr, class TP = std::ptrdiff_t> template <class E = std::exception_ptr, class TP = std::ptrdiff_t, class... VN>
struct any_constrained_executor_ref; struct any_constrained_executor_ref;
template < template <
class E = std::exception_ptr, class E = std::exception_ptr,
class TP = std::chrono::system_clock::time_point> class TP = std::chrono::system_clock::time_point,
struct any_time_executor; class... VN>
class any_time_executor;
template < template <
class E = std::exception_ptr, class E = std::exception_ptr,
class TP = std::chrono::system_clock::time_point> class TP = std::chrono::system_clock::time_point,
class... VN>
struct any_time_executor_ref; struct any_time_executor_ref;
namespace operators {} namespace operators {}
......
...@@ -22,23 +22,34 @@ namespace pushmi { ...@@ -22,23 +22,34 @@ namespace pushmi {
class inline_constrained_executor_t { class inline_constrained_executor_t {
public: public:
using properties = property_set<
is_constrained<>,
is_fifo_sequence<>>;
template<class CV>
struct task {
CV cv_;
using properties = property_set< using properties = property_set<
is_constrained<>, is_sender<>,
is_executor<>,
is_always_blocking<>, is_always_blocking<>,
is_fifo_sequence<>,
is_single<>>; is_single<>>;
std::ptrdiff_t top() { PUSHMI_TEMPLATE(class Out)
(requires Receiver<Out>) //
void submit(Out out) && {
set_value(out, inline_constrained_executor_t{});
set_done(out);
}
};
auto top() {
return 0; return 0;
} }
auto executor() { task<std::ptrdiff_t> schedule() {
return *this; return {top()};
} }
PUSHMI_TEMPLATE(class CV, class Out) template<class CV>
(requires Regular<CV>&& Receiver<Out>)void submit(CV, Out out) { task<std::ptrdiff_t> schedule(CV cv) {
set_value(out, *this); return {cv};
set_done(out);
} }
}; };
...@@ -54,24 +65,36 @@ inline inline_constrained_executor_t inline_constrained_executor() { ...@@ -54,24 +65,36 @@ inline inline_constrained_executor_t inline_constrained_executor() {
class inline_time_executor_t { class inline_time_executor_t {
public: public:
using properties = property_set<
is_time<>,
is_fifo_sequence<>>;
template<class TP>
struct task {
TP tp_;
using properties = property_set< using properties = property_set<
is_time<>, is_sender<>,
is_executor<>,
is_always_blocking<>, is_always_blocking<>,
is_fifo_sequence<>,
is_single<>>; is_single<>>;
PUSHMI_TEMPLATE(class Out)
(requires Receiver<Out>) //
void submit(Out out) && {
std::this_thread::sleep_until(tp_);
set_value(out, inline_time_executor_t{});
set_done(out);
}
};
auto top() { auto top() {
return std::chrono::system_clock::now(); return std::chrono::system_clock::now();
} }
auto executor() { task<std::chrono::system_clock::time_point> schedule() {
return *this; return {top()};
} }
PUSHMI_TEMPLATE(class TP, class Out) template<class TP>
(requires Regular<TP>&& Receiver<Out>)void submit(TP tp, Out out) { task<TP> schedule(TP tp) {
std::this_thread::sleep_until(tp); return {tp};
set_value(out, *this);
set_done(out);
} }
}; };
...@@ -86,21 +109,27 @@ inline inline_time_executor_t inline_time_executor() { ...@@ -86,21 +109,27 @@ inline inline_time_executor_t inline_time_executor() {
} }
class inline_executor_t { class inline_executor_t {
public: public:
using properties = property_set< using properties = property_set<
is_sender<>, is_executor<>,
is_executor<>, is_fifo_sequence<>>;
is_always_blocking<>,
is_fifo_sequence<>,
is_single<>>;
auto executor() { struct task {
return *this; using properties = property_set<
} is_sender<>,
PUSHMI_TEMPLATE(class Out) is_always_blocking<>,
(requires Receiver<Out>)void submit(Out out) { is_single<>>;
set_value(out, *this);
set_done(out); PUSHMI_TEMPLATE(class Out)
(requires Receiver<Out>) //
void submit(Out out) && {
set_value(out, inline_executor_t{});
set_done(out);
}
};
task schedule() {
return {};
} }
}; };
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include <folly/experimental/pushmi/executor.h> #include <folly/experimental/pushmi/executor.h>
#include <folly/experimental/pushmi/receiver.h> #include <folly/experimental/pushmi/receiver.h>
#include <folly/experimental/pushmi/trampoline.h> #include <folly/experimental/pushmi/trampoline.h>
#include <type_traits>
namespace folly { namespace folly {
namespace pushmi { namespace pushmi {
...@@ -26,7 +27,7 @@ template <class E, class... VN> ...@@ -26,7 +27,7 @@ template <class E, class... VN>
class any_many_sender { class any_many_sender {
union data { union data {
void* pobj_ = nullptr; void* pobj_ = nullptr;
char buffer_[sizeof(std::tuple<VN...>)]; // can hold a V in-situ std::aligned_union_t<0, std::tuple<VN...>> buffer_;
} data_{}; } data_{};
template <class Wrapped> template <class Wrapped>
static constexpr bool insitu() { static constexpr bool insitu() {
...@@ -35,12 +36,8 @@ class any_many_sender { ...@@ -35,12 +36,8 @@ class any_many_sender {
} }
struct vtable { struct vtable {
static void s_op(data&, data*) {} static void s_op(data&, data*) {}
static any_executor<E> s_executor(data&) {
return {};
}
static void s_submit(data&, any_receiver<E, VN...>) {} static void s_submit(data&, any_receiver<E, VN...>) {}
void (*op_)(data&, data*) = vtable::s_op; void (*op_)(data&, data*) = vtable::s_op;
any_executor<E> (*executor_)(data&) = vtable::s_executor;
void (*submit_)(data&, any_receiver<E, VN...>) = vtable::s_submit; void (*submit_)(data&, any_receiver<E, VN...>) = vtable::s_submit;
}; };
static constexpr vtable const noop_{}; static constexpr vtable const noop_{};
...@@ -53,16 +50,12 @@ class any_many_sender { ...@@ -53,16 +50,12 @@ class any_many_sender {
dst->pobj_ = std::exchange(src.pobj_, nullptr); dst->pobj_ = std::exchange(src.pobj_, nullptr);
delete static_cast<Wrapped const*>(src.pobj_); delete static_cast<Wrapped const*>(src.pobj_);
} }
static any_executor<E> executor(data& src) {
return any_executor<E>{
::folly::pushmi::executor(*static_cast<Wrapped*>(src.pobj_))};
}
static void submit(data& src, any_receiver<E, VN...> out) { static void submit(data& src, any_receiver<E, VN...> out) {
::folly::pushmi::submit( ::folly::pushmi::submit(
*static_cast<Wrapped*>(src.pobj_), std::move(out)); *static_cast<Wrapped*>(src.pobj_), std::move(out));
} }
}; };
static const vtable vtbl{s::op, s::executor, s::submit}; static const vtable vtbl{s::op, s::submit};
data_.pobj_ = new Wrapped(std::move(obj)); data_.pobj_ = new Wrapped(std::move(obj));
vptr_ = &vtbl; vptr_ = &vtbl;
} }
...@@ -71,21 +64,17 @@ class any_many_sender { ...@@ -71,21 +64,17 @@ class any_many_sender {
struct s { struct s {
static void op(data& src, data* dst) { static void op(data& src, data* dst) {
if (dst) if (dst)
new (dst->buffer_) new (&dst->buffer_)
Wrapped(std::move(*static_cast<Wrapped*>((void*)src.buffer_))); Wrapped(std::move(*static_cast<Wrapped*>((void*)&src.buffer_)));
static_cast<Wrapped const*>((void*)src.buffer_)->~Wrapped(); static_cast<Wrapped const*>((void*)&src.buffer_)->~Wrapped();
}
static any_executor<E> executor(data& src) {
return any_executor<E>{::folly::pushmi::executor(
*static_cast<Wrapped*>((void*)src.buffer_))};
} }
static void submit(data& src, any_receiver<E, VN...> out) { static void submit(data& src, any_receiver<E, VN...> out) {
::folly::pushmi::submit( ::folly::pushmi::submit(
*static_cast<Wrapped*>((void*)src.buffer_), std::move(out)); *static_cast<Wrapped*>((void*)&src.buffer_), std::move(out));
} }
}; };
static const vtable vtbl{s::op, s::executor, s::submit}; static const vtable vtbl{s::op, s::submit};
new (data_.buffer_) Wrapped(std::move(obj)); new (&data_.buffer_) Wrapped(std::move(obj));
vptr_ = &vtbl; vptr_ = &vtbl;
} }
template <class T, class U = std::decay_t<T>> template <class T, class U = std::decay_t<T>>
...@@ -117,11 +106,10 @@ class any_many_sender { ...@@ -117,11 +106,10 @@ class any_many_sender {
new ((void*)this) any_many_sender(std::move(that)); new ((void*)this) any_many_sender(std::move(that));
return *this; return *this;
} }
any_executor<E> executor() { PUSHMI_TEMPLATE(class Out)
return vptr_->executor_(data_); (requires ReceiveError<Out, E>&& ReceiveValue<Out, VN...>) //
} void submit(Out&& out) {
void submit(any_receiver<E, VN...> out) { vptr_->submit_(data_, any_receiver<E, VN...>{(Out &&) out});
vptr_->submit_(data_, std::move(out));
} }
}; };
...@@ -130,22 +118,16 @@ template <class E, class... VN> ...@@ -130,22 +118,16 @@ template <class E, class... VN>
constexpr typename any_many_sender<E, VN...>::vtable const constexpr typename any_many_sender<E, VN...>::vtable const
any_many_sender<E, VN...>::noop_; any_many_sender<E, VN...>::noop_;
template <class SF, class EXF> template <class SF>
class many_sender<SF, EXF> { class many_sender<SF> {
SF sf_; SF sf_;
EXF exf_;
public: public:
using properties = property_set<is_sender<>, is_many<>>; using properties = property_set<is_sender<>, is_many<>>;
constexpr many_sender() = default; constexpr many_sender() = default;
constexpr explicit many_sender(SF sf) : sf_(std::move(sf)) {} constexpr explicit many_sender(SF sf) : sf_(std::move(sf)) {}
constexpr many_sender(SF sf, EXF exf)
: sf_(std::move(sf)), exf_(std::move(exf)) {}
auto executor() {
return exf_();
}
PUSHMI_TEMPLATE(class Out) PUSHMI_TEMPLATE(class Out)
(requires PUSHMI_EXP( (requires PUSHMI_EXP(
lazy::Receiver<Out> PUSHMI_AND lazy::Invocable<SF&, Out>)) // lazy::Receiver<Out> PUSHMI_AND lazy::Invocable<SF&, Out>)) //
...@@ -154,11 +136,10 @@ class many_sender<SF, EXF> { ...@@ -154,11 +136,10 @@ class many_sender<SF, EXF> {
} }
}; };
template <PUSHMI_TYPE_CONSTRAINT(Sender<is_many<>>) Data, class DSF, class DEXF> template <PUSHMI_TYPE_CONSTRAINT(Sender<is_many<>>) Data, class DSF>
class many_sender<Data, DSF, DEXF> { class many_sender<Data, DSF> {
Data data_; Data data_;
DSF sf_; DSF sf_;
DEXF exf_;
public: public:
using properties = property_set_insert_t< using properties = property_set_insert_t<
...@@ -169,22 +150,23 @@ class many_sender<Data, DSF, DEXF> { ...@@ -169,22 +150,23 @@ class many_sender<Data, DSF, DEXF> {
constexpr explicit many_sender(Data data) : data_(std::move(data)) {} constexpr explicit many_sender(Data data) : data_(std::move(data)) {}
constexpr many_sender(Data data, DSF sf) constexpr many_sender(Data data, DSF sf)
: data_(std::move(data)), sf_(std::move(sf)) {} : data_(std::move(data)), sf_(std::move(sf)) {}
constexpr many_sender(Data data, DSF sf, DEXF exf)
: data_(std::move(data)), sf_(std::move(sf)), exf_(std::move(exf)) {}
auto executor() {
return exf_(data_);
}
PUSHMI_TEMPLATE(class Out) PUSHMI_TEMPLATE(class Out)
(requires PUSHMI_EXP( (requires PUSHMI_EXP(
lazy::Receiver<Out> PUSHMI_AND lazy::Invocable<DSF&, Data&, Out>)) // lazy::Receiver<Out> PUSHMI_AND lazy::Invocable<DSF&, Data&, Out>)) //
void submit(Out out) { void submit(Out out) & {
sf_(data_, std::move(out)); sf_(data_, std::move(out));
} }
PUSHMI_TEMPLATE(class Out)
(requires PUSHMI_EXP(
lazy::Receiver<Out> PUSHMI_AND lazy::Invocable<DSF&, Data&&, Out>)) //
void submit(Out out) && {
sf_(std::move(data_), std::move(out));
}
}; };
template <> template <>
class many_sender<> : public many_sender<ignoreSF, trampolineEXF> { class many_sender<> : public many_sender<ignoreSF> {
public: public:
many_sender() = default; many_sender() = default;
}; };
...@@ -193,72 +175,47 @@ class many_sender<> : public many_sender<ignoreSF, trampolineEXF> { ...@@ -193,72 +175,47 @@ class many_sender<> : public many_sender<ignoreSF, trampolineEXF> {
// make_many_sender // make_many_sender
PUSHMI_INLINE_VAR constexpr struct make_many_sender_fn { PUSHMI_INLINE_VAR constexpr struct make_many_sender_fn {
inline auto operator()() const { inline auto operator()() const {
return many_sender<ignoreSF, trampolineEXF>{}; return many_sender<ignoreSF>{};
} }
PUSHMI_TEMPLATE(class SF) PUSHMI_TEMPLATE(class SF)
(requires True<> PUSHMI_BROKEN_SUBSUMPTION(&&not Sender<SF>)) // (requires True<> PUSHMI_BROKEN_SUBSUMPTION(&&not Sender<SF>)) //
auto auto
operator()(SF sf) const { operator()(SF sf) const {
return many_sender<SF, trampolineEXF>{std::move(sf)}; return many_sender<SF>{std::move(sf)};
}
PUSHMI_TEMPLATE(class SF, class EXF)
(requires True<>&& Invocable<EXF&> PUSHMI_BROKEN_SUBSUMPTION(
&&not Sender<SF>)) //
auto
operator()(SF sf, EXF exf) const {
return many_sender<SF, EXF>{std::move(sf), std::move(exf)};
} }
PUSHMI_TEMPLATE(class Data) PUSHMI_TEMPLATE(class Data)
(requires True<>&& Sender<Data, is_many<>>) // (requires True<>&& Sender<Data, is_many<>>) //
auto auto
operator()(Data d) const { operator()(Data d) const {
return many_sender<Data, passDSF, passDEXF>{std::move(d)}; return many_sender<Data, passDSF>{std::move(d)};
} }
PUSHMI_TEMPLATE(class Data, class DSF) PUSHMI_TEMPLATE(class Data, class DSF)
(requires Sender<Data, is_many<>>) // (requires Sender<Data, is_many<>>) //
auto auto
operator()(Data d, DSF sf) const { operator()(Data d, DSF sf) const {
return many_sender<Data, DSF, passDEXF>{std::move(d), std::move(sf)}; return many_sender<Data, DSF>{std::move(d), std::move(sf)};
}
PUSHMI_TEMPLATE(class Data, class DSF, class DEXF)
(requires Sender<Data, is_many<>>&& Invocable<DEXF&, Data&>) //
auto
operator()(Data d, DSF sf, DEXF exf) const {
return many_sender<Data, DSF, DEXF>{
std::move(d), std::move(sf), std::move(exf)};
} }
} const make_many_sender{}; } const make_many_sender{};
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// deduction guides // deduction guides
#if __cpp_deduction_guides >= 201703 #if __cpp_deduction_guides >= 201703
many_sender()->many_sender<ignoreSF, trampolineEXF>; many_sender()->many_sender<ignoreSF>;
PUSHMI_TEMPLATE(class SF) PUSHMI_TEMPLATE(class SF)
(requires True<> PUSHMI_BROKEN_SUBSUMPTION(&&not Sender<SF>)) // (requires True<> PUSHMI_BROKEN_SUBSUMPTION(&&not Sender<SF>)) //
many_sender(SF) many_sender(SF)
->many_sender<SF, trampolineEXF>; ->many_sender<SF>;
PUSHMI_TEMPLATE(class SF, class EXF)
(requires True<>&& Invocable<EXF&> PUSHMI_BROKEN_SUBSUMPTION(
&&not Sender<SF>)) //
many_sender(SF, EXF)
->many_sender<SF, EXF>;
PUSHMI_TEMPLATE(class Data) PUSHMI_TEMPLATE(class Data)
(requires True<>&& Sender<Data, is_many<>>) // (requires True<>&& Sender<Data, is_many<>>) //
many_sender(Data) many_sender(Data)
->many_sender<Data, passDSF, passDEXF>; ->many_sender<Data, passDSF>;
PUSHMI_TEMPLATE(class Data, class DSF) PUSHMI_TEMPLATE(class Data, class DSF)
(requires Sender<Data, is_many<>>) // (requires Sender<Data, is_many<>>) //
many_sender(Data, DSF) many_sender(Data, DSF)
->many_sender<Data, DSF, passDEXF>; ->many_sender<Data, DSF>;
PUSHMI_TEMPLATE(class Data, class DSF, class DEXF)
(requires Sender<Data, is_many<>>&& Invocable<DEXF&, Data&>) //
many_sender(Data, DSF, DEXF)
->many_sender<Data, DSF, DEXF>;
#endif #endif
template <> template <>
......
...@@ -24,29 +24,35 @@ namespace pushmi { ...@@ -24,29 +24,35 @@ namespace pushmi {
// very poor perf example executor. // very poor perf example executor.
// //
struct new_thread_executor { struct new_thread_executor;
struct new_thread_task {
using properties = property_set< using properties = property_set<
is_sender<>, is_sender<>,
is_executor<>,
is_never_blocking<>, is_never_blocking<>,
is_concurrent_sequence<>,
is_single<>>; is_single<>>;
new_thread_executor executor() {
return {};
}
PUSHMI_TEMPLATE(class Out) PUSHMI_TEMPLATE(class Out)
(requires Receiver<Out>) (requires Receiver<Out>)
void submit(Out out) { void submit(Out out) && {
std::thread t{[out = std::move(out)]() mutable { std::thread t{[out = std::move(out)]() mutable {
auto tr = ::folly::pushmi::trampoline(); auto tr = ::folly::pushmi::trampoline();
::folly::pushmi::submit(tr, std::move(out)); ::folly::pushmi::submit(::folly::pushmi::schedule(tr), std::move(out));
}}; }};
// pass ownership of thread to out // pass ownership of thread to out
t.detach(); t.detach();
} }
}; };
struct new_thread_executor {
using properties = property_set<is_executor<>, is_concurrent_sequence<>>;
new_thread_task schedule() {
return {};
}
};
inline new_thread_executor new_thread() { inline new_thread_executor new_thread() {
return {}; return {};
} }
......
...@@ -22,34 +22,23 @@ ...@@ -22,34 +22,23 @@
namespace folly { namespace folly {
namespace pushmi { namespace pushmi {
namespace detail { namespace detail {
struct single_empty_sender_base : single_sender<ignoreSF, inlineEXF> { struct single_empty_impl : pipeorigin {
using properties = property_set< using properties = property_set<
is_sender<>, is_sender<>,
is_single<>, is_single<>,
is_always_blocking<>, is_always_blocking<>>;
is_fifo_sequence<>>;
};
template <class... VN>
struct single_empty_impl {
PUSHMI_TEMPLATE(class Out) PUSHMI_TEMPLATE(class Out)
(requires ReceiveValue<Out, VN...>) // (requires Receiver<Out>&& Invocable<decltype(set_done)&, Out&>) //
void void submit(Out&& out) {
operator()(single_empty_sender_base&, Out out) {
set_done(out); set_done(out);
} }
}; };
} // namespace detail } // namespace detail
namespace operators { namespace operators {
template <class... VN> inline detail::single_empty_impl empty() {
auto empty() { return {};
return make_single_sender(
detail::single_empty_sender_base{}, detail::single_empty_impl<VN...>{});
}
inline auto empty() {
return make_single_sender(
detail::single_empty_sender_base{}, detail::single_empty_impl<>{});
} }
} // namespace operators } // namespace operators
......
...@@ -21,21 +21,20 @@ ...@@ -21,21 +21,20 @@
namespace folly { namespace folly {
namespace pushmi { namespace pushmi {
namespace detail { namespace detail {
struct single_error_sender_base : single_sender<ignoreSF, inlineEXF> { struct single_error_sender_base : single_sender<ignoreSF> {
using properties = property_set< using properties = property_set<
is_sender<>, is_sender<>,
is_single<>, is_single<>,
is_always_blocking<>, is_always_blocking<>>;
is_fifo_sequence<>>;
}; };
template <class E, class... VN> template <class E>
struct single_error_impl { struct single_error_impl {
E e_; E e_;
PUSHMI_TEMPLATE(class Out) PUSHMI_TEMPLATE(class Base, class Out)
(requires ReceiveError<Out, E>&& ReceiveValue<Out, VN...>) (requires ReceiveError<Out, E>)
void operator()( void operator()(
single_error_sender_base&, Base&&,
Out out) { Out&& out) {
set_error(out, std::move(e_)); set_error(out, std::move(e_));
} }
}; };
...@@ -43,12 +42,12 @@ struct single_error_impl { ...@@ -43,12 +42,12 @@ struct single_error_impl {
namespace operators { namespace operators {
PUSHMI_TEMPLATE(class... VN, class E) PUSHMI_TEMPLATE(class E)
(requires And<SemiMovable<VN>...>&& SemiMovable<E>) (requires MoveConstructible<E>)
auto error(E e) { auto error(E e) {
return make_single_sender( return make_single_sender(
detail::single_error_sender_base{}, detail::single_error_sender_base{},
detail::single_error_impl<E, VN...>{std::move(e)}); detail::single_error_impl<E>{std::move(e)});
} }
} // namespace operators } // namespace operators
......
...@@ -39,13 +39,13 @@ struct filter_fn { ...@@ -39,13 +39,13 @@ struct filter_fn {
template <class In, class Predicate> template <class In, class Predicate>
struct submit_impl { struct submit_impl {
Predicate p_; Predicate p_;
PUSHMI_TEMPLATE(class Out) PUSHMI_TEMPLATE(class SIn, class Out)
(requires Receiver<Out>) (requires Receiver<Out>)
auto operator()(Out out) const { auto operator()(SIn&& in, Out out) const {
return ::folly::pushmi::detail::receiver_from_fn<In>()( submit((In&&)in, ::folly::pushmi::detail::receiver_from_fn<In>()(
std::move(out), std::move(out),
// copy 'p' to allow multiple calls to submit // copy 'p' to allow multiple calls to submit
on_value_impl<In, Predicate>{p_}); on_value_impl<In, Predicate>{p_}));
} }
}; };
template <class Predicate> template <class Predicate>
...@@ -56,8 +56,7 @@ struct filter_fn { ...@@ -56,8 +56,7 @@ struct filter_fn {
auto operator()(In in) const { auto operator()(In in) const {
return ::folly::pushmi::detail::sender_from( return ::folly::pushmi::detail::sender_from(
std::move(in), std::move(in),
::folly::pushmi::detail::submit_transform_out<In>( submit_impl<In, Predicate>{p_});
submit_impl<In, Predicate>{p_}));
} }
}; };
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include <folly/experimental/pushmi/o/extension_operators.h> #include <folly/experimental/pushmi/o/extension_operators.h>
#include <folly/experimental/pushmi/o/submit.h> #include <folly/experimental/pushmi/o/submit.h>
#include <folly/Function.h>
namespace folly { namespace folly {
namespace pushmi { namespace pushmi {
...@@ -28,38 +29,46 @@ struct for_each_fn { ...@@ -28,38 +29,46 @@ struct for_each_fn {
struct subset { struct subset {
using properties = property_set<PN...>; using properties = property_set<PN...>;
}; };
template<class Up>
struct request_fn {
Up up_;
explicit request_fn(Up up) : up_(std::move(up)) {}
request_fn(request_fn&& o) : up_(std::move(o.up_)) {}
void operator()(std::ptrdiff_t requested) {
::folly::pushmi::set_value(up_, requested);
}
};
template <class In, class Out> template <class In, class Out>
struct Pull : Out { struct Pull {
explicit Pull(Out out) : Out(std::move(out)) {} Out out_;
explicit Pull(Out out) : out_(std::move(out)) {}
using properties = using properties =
property_set_insert_t<properties_t<Out>, property_set<is_flow<>>>; property_set_insert_t<properties_t<Out>, property_set<is_flow<>>>;
std::function<void(std::ptrdiff_t)> pull; folly::Function<void(std::ptrdiff_t)> pull;
template <class... VN> template <class... VN>
void value(VN&&... vn) { void value(VN&&... vn) {
::folly::pushmi::set_value(static_cast<Out&>(*this), (VN &&) vn...); ::folly::pushmi::set_value(out_, (VN &&) vn...);
pull(1); pull(1);
} }
template <class E> template <class E>
void error(E&& e) { void error(E&& e) noexcept {
// break circular reference // break circular reference
pull = nullptr; pull = nullptr;
::folly::pushmi::set_error(static_cast<Out&>(*this), (E &&) e); ::folly::pushmi::set_error(out_, (E &&) e);
} }
void done() { void done() {
// break circular reference // break circular reference
pull = nullptr; pull = nullptr;
::folly::pushmi::set_done(static_cast<Out&>(*this)); ::folly::pushmi::set_done(out_);
} }
PUSHMI_TEMPLATE(class Up) PUSHMI_TEMPLATE(class Up)
(requires Receiver<Up> && ReceiveValue<Up, std::ptrdiff_t>) (requires ReceiveValue<Up, std::ptrdiff_t>)
void starting(Up up) { void starting(Up up) {
pull = [up = std::move(up)](std::ptrdiff_t requested) mutable { pull = request_fn<Up>{std::move(up)};
::folly::pushmi::set_value(up, requested);
};
pull(1); pull(1);
} }
PUSHMI_TEMPLATE(class Up) PUSHMI_TEMPLATE(class Up)
(requires ReceiveValue<Up>) (requires ReceiveValue<Up> && not ReceiveValue<Up, std::ptrdiff_t>)
void starting(Up) {} void starting(Up) {}
}; };
template <class... AN> template <class... AN>
......
...@@ -34,23 +34,22 @@ namespace operators { ...@@ -34,23 +34,22 @@ namespace operators {
PUSHMI_INLINE_VAR constexpr struct from_fn { PUSHMI_INLINE_VAR constexpr struct from_fn {
private: private:
struct sender_base : many_sender<ignoreSF, inlineEXF> { struct sender_base : many_sender<> {
using properties = property_set< using properties = property_set<
is_sender<>, is_sender<>,
is_many<>, is_many<>,
is_always_blocking<>, is_always_blocking<>>;
is_fifo_sequence<>>;
}; };
template <class I, class S> template <class I, class S>
struct out_impl { struct out_impl {
I begin_; I begin_;
S end_; S end_;
PUSHMI_TEMPLATE(class Out) PUSHMI_TEMPLATE(class In, class Out)
(requires ReceiveValue< (requires ReceiveValue<
Out, Out,
typename std::iterator_traits<I>::value_type>) // typename std::iterator_traits<I>::value_type>) //
void void
operator()(sender_base&, Out out) const { operator()(In&&, Out out) const {
auto c = begin_; auto c = begin_;
for (; c != end_; ++c) { for (; c != end_; ++c) {
set_value(out, *c); set_value(out, *c);
...@@ -105,7 +104,7 @@ struct flow_from_up { ...@@ -105,7 +104,7 @@ struct flow_from_up {
} }
// submit work to exec // submit work to exec
::folly::pushmi::submit( ::folly::pushmi::submit(
p->exec, make_receiver([p = p, requested](auto) { ::folly::pushmi::schedule(p->exec), make_receiver([p = p, requested](auto) {
auto remaining = requested; auto remaining = requested;
// this loop is structured to work when there is // this loop is structured to work when there is
// re-entrancy out.value in the loop may call up.value. // re-entrancy out.value in the loop may call up.value.
...@@ -126,13 +125,13 @@ struct flow_from_up { ...@@ -126,13 +125,13 @@ struct flow_from_up {
void error(E) noexcept { void error(E) noexcept {
p->stop.store(true); p->stop.store(true);
::folly::pushmi::submit( ::folly::pushmi::submit(
p->exec, make_receiver([p = p](auto) { set_done(p->out); })); ::folly::pushmi::schedule(p->exec), make_receiver([p = p](auto) { set_done(p->out); }));
} }
void done() { void done() {
p->stop.store(true); p->stop.store(true);
::folly::pushmi::submit( ::folly::pushmi::submit(
p->exec, make_receiver([p = p](auto) { set_done(p->out); })); ::folly::pushmi::schedule(p->exec), make_receiver([p = p](auto) { set_done(p->out); }));
} }
}; };
...@@ -142,19 +141,19 @@ PUSHMI_INLINE_VAR constexpr struct flow_from_fn { ...@@ -142,19 +141,19 @@ PUSHMI_INLINE_VAR constexpr struct flow_from_fn {
struct out_impl { struct out_impl {
I begin_; I begin_;
S end_; S end_;
mutable Exec exec_; Exec exec_;
PUSHMI_TEMPLATE(class Out) PUSHMI_TEMPLATE(class Out)
(requires ReceiveValue< (requires ReceiveValue<
Out, Out,
typename std::iterator_traits<I>::value_type>) // typename std::iterator_traits<I>::value_type>) //
void void
operator()(Out out) const { operator()(Out out) {
using Producer = flow_from_producer<I, S, Out, Exec>; using Producer = flow_from_producer<I, S, Out, Exec>;
auto p = std::make_shared<Producer>( auto p = std::make_shared<Producer>(
begin_, end_, std::move(out), exec_, false); begin_, end_, std::move(out), exec_, false);
::folly::pushmi::submit( ::folly::pushmi::submit(
exec_, make_receiver([p](auto) { ::folly::pushmi::schedule(exec_), make_receiver([p](auto) {
// pass reference for cancellation. // pass reference for cancellation.
set_starting(p->out, make_receiver(flow_from_up<Producer>{p})); set_starting(p->out, make_receiver(flow_from_up<Producer>{p}));
})); }));
...@@ -181,14 +180,14 @@ PUSHMI_INLINE_VAR constexpr struct flow_from_fn { ...@@ -181,14 +180,14 @@ PUSHMI_INLINE_VAR constexpr struct flow_from_fn {
PUSHMI_TEMPLATE(class I, class S, class Exec) PUSHMI_TEMPLATE(class I, class S, class Exec)
(requires DerivedFrom< (requires DerivedFrom<
typename std::iterator_traits<I>::iterator_category, typename std::iterator_traits<I>::iterator_category,
std::forward_iterator_tag>&& Sender<Exec, is_single<>, is_executor<>>) // std::forward_iterator_tag>&& Executor<Exec>) //
auto auto
operator()(I begin, S end, Exec exec) const { operator()(I begin, S end, Exec exec) const {
return make_flow_many_sender(out_impl<I, S, Exec>{begin, end, exec}); return make_flow_many_sender(out_impl<I, S, Exec>{begin, end, exec});
} }
PUSHMI_TEMPLATE(class R, class Exec) PUSHMI_TEMPLATE(class R, class Exec)
(requires Range<R>&& Sender<Exec, is_single<>, is_executor<>>) // (requires Range<R>&& Executor<Exec>) //
auto auto
operator()(R&& range, Exec exec) const { operator()(R&& range, Exec exec) const {
return (*this)(std::begin(range), std::end(range), exec); return (*this)(std::begin(range), std::end(range), exec);
......
...@@ -26,24 +26,23 @@ namespace operators { ...@@ -26,24 +26,23 @@ namespace operators {
PUSHMI_INLINE_VAR constexpr struct just_fn { PUSHMI_INLINE_VAR constexpr struct just_fn {
private: private:
struct sender_base : single_sender<ignoreSF, inlineEXF> { struct sender_base : single_sender<> {
using properties = property_set< using properties = property_set<
is_sender<>, is_sender<>,
is_single<>, is_single<>,
is_always_blocking<>, is_always_blocking<>>;
is_fifo_sequence<>>;
}; };
template <class... VN> template <class... VN>
struct impl { struct impl {
std::tuple<VN...> vn_; std::tuple<VN...> vn_;
PUSHMI_TEMPLATE(class Out) PUSHMI_TEMPLATE(class In, class Out)
(requires ReceiveValue<Out, VN...>) // (requires ReceiveValue<Out, VN...>) //
void void
operator()(sender_base&, Out out) { operator()(In&&, Out&& out) {
::folly::pushmi::apply( ::folly::pushmi::apply(
::folly::pushmi::set_value, ::folly::pushmi::set_value,
std::tuple_cat(std::tuple<Out&>{out}, std::move(vn_))); std::tuple_cat(std::tuple<Out&>{out}, std::move(vn_)));
set_done(std::move(out)); set_done(out);
} }
}; };
......
...@@ -34,66 +34,38 @@ struct on_fn { ...@@ -34,66 +34,38 @@ struct on_fn {
submit(in_, std::move(out_)); submit(in_, std::move(out_));
} }
}; };
template <class In, class ExecutorFactory> template <class Factory>
struct out_impl { struct submit_impl {
ExecutorFactory ef_; Factory ef_;
PUSHMI_TEMPLATE(class Out) PUSHMI_TEMPLATE(class In, class Out)
(requires SenderTo<In, Out>) // (requires SenderTo<In, Out>) //
void void
operator()(In& in, Out out) const { operator()(In&& in, Out out) const {
auto exec = ef_(); auto exec = ::folly::pushmi::make_strand(ef_);
submit( submit(
exec, ::folly::pushmi::schedule(exec),
::folly::pushmi::make_receiver( ::folly::pushmi::make_receiver(on_value_impl<std::decay_t<In>, Out>{
on_value_impl<In, Out>{in, std::move(out)})); (In&&) in, std::move(out)}));
} }
}; };
template <class In, class TP, class Out> template <class Factory>
struct time_on_value_impl { struct adapt_impl {
In in_; Factory ef_;
TP at_;
Out out_;
void operator()(any) {
submit(in_, at_, std::move(out_));
}
};
template <class In, class ExecutorFactory>
struct time_out_impl {
ExecutorFactory ef_;
PUSHMI_TEMPLATE(class TP, class Out)
(requires TimeSenderTo<In, Out>) //
void
operator()(In& in, TP at, Out out) const {
auto exec = ef_();
submit(
exec,
at,
::folly::pushmi::make_receiver(
time_on_value_impl<In, TP, Out>{in, at, std::move(out)}));
}
};
template <class ExecutorFactory>
struct in_impl {
ExecutorFactory ef_;
PUSHMI_TEMPLATE(class In) PUSHMI_TEMPLATE(class In)
(requires Sender<In>) // (requires Sender<std::decay_t<In>>) //
auto auto
operator()(In in) const { operator()(In&& in) const {
return ::folly::pushmi::detail::sender_from( return ::folly::pushmi::detail::sender_from(
std::move(in), (In&&) in, submit_impl<Factory>{ef_});
detail::submit_transform_out<In>(
out_impl<In, ExecutorFactory>{ef_},
time_out_impl<In, ExecutorFactory>{ef_}));
} }
}; };
public: public:
PUSHMI_TEMPLATE(class ExecutorFactory) PUSHMI_TEMPLATE(class Factory)
(requires Invocable<ExecutorFactory&>&& (requires StrandFactory<Factory>) //
Executor<invoke_result_t<ExecutorFactory&>>) //
auto auto
operator()(ExecutorFactory ef) const { operator()(Factory ef) const {
return in_impl<ExecutorFactory>{std::move(ef)}; return adapt_impl<Factory>{std::move(ef)};
} }
}; };
......
/*
* Copyright 2018-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/experimental/pushmi/boosters.h>
#include <folly/experimental/pushmi/detail/opt.h>
#include <folly/experimental/pushmi/o/extension_operators.h>
#include <folly/experimental/pushmi/trampoline.h>
#include <functional>
namespace folly {
namespace pushmi {
namespace detail {
struct schedule_fn {
private:
// TODO - only move, move-only types..
// if out can be copied, then schedule can be called multiple
// times..
struct fn {
PUSHMI_TEMPLATE(class Exec)
(requires Executor<Exec>) //
auto
operator()(Exec& ex) {
return schedule(ex);
}
};
public:
auto operator()() const {
return schedule_fn::fn{};
}
};
struct schedule_at_fn {
private:
// TODO - only move, move-only types..
// if out can be copied, then schedule can be called multiple
// times..
template <class CV>
struct fn {
CV at_;
PUSHMI_TEMPLATE(class Exec)
(requires ConstrainedExecutor<Exec>) //
auto
operator()(Exec& ex) {
return schedule(ex, std::move(at_));
}
};
public:
template <class CV>
auto operator()(CV&& at) const {
return schedule_at_fn::fn<CV>{(CV &&) at};
}
};
struct schedule_after_fn {
private:
// TODO - only move, move-only types..
// if out can be copied, then schedule can be called multiple
// times..
template <class Dur>
struct fn {
Dur after_;
PUSHMI_TEMPLATE(class Exec)
(requires TimeExecutor<Exec>) //
auto
operator()(Exec& ex) {
return schedule(ex, now(ex) + after_);
}
};
public:
template <class Dur>
auto operator()(Dur&& after) const {
return schedule_after_fn::fn<Dur>{(Dur &&) after};
}
};
} // namespace detail
namespace operators {
PUSHMI_INLINE_VAR constexpr detail::schedule_fn schedule{};
PUSHMI_INLINE_VAR constexpr detail::schedule_at_fn schedule_at{};
PUSHMI_INLINE_VAR constexpr detail::schedule_after_fn schedule_after{};
} // namespace operators
} // namespace pushmi
} // namespace folly
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
...@@ -132,7 +132,8 @@ TEST_F(ImmediateFlowSingleSender, LateCancellation) { ...@@ -132,7 +132,8 @@ TEST_F(ImmediateFlowSingleSender, LateCancellation) {
using NT = decltype(mi::new_thread()); using NT = decltype(mi::new_thread());
inline auto make_time(mi::time_source<>& t, NT& ex) { inline auto make_time(mi::time_source<>& t, NT& ex) {
return t.make(mi::systemNowF{}, [ex]() { return ex; })(); auto strands = t.make(mi::systemNowF{}, ex);
return mi::make_strand(strands);
} }
class ConcurrentFlowSingleSender : public Test { class ConcurrentFlowSingleSender : public Test {
...@@ -185,6 +186,7 @@ class ConcurrentFlowSingleSender : public Test { ...@@ -185,6 +186,7 @@ class ConcurrentFlowSingleSender : public Test {
// make all the signals come from the same thread // make all the signals come from the same thread
tnt_ | tnt_ |
op::schedule() |
op::submit([&, op::submit([&,
stoppee = std::move(tokens.first), stoppee = std::move(tokens.first),
up_not_a_shadow_howtoeven = std::move(up), up_not_a_shadow_howtoeven = std::move(up),
...@@ -194,8 +196,8 @@ class ConcurrentFlowSingleSender : public Test { ...@@ -194,8 +196,8 @@ class ConcurrentFlowSingleSender : public Test {
// submit work to happen later // submit work to happen later
tnt | tnt |
op::submit_at( op::schedule_at(at_) |
at_, op::submit(
[stoppee = std::move(stoppee), [stoppee = std::move(stoppee),
out = std::move(out)](auto) mutable { out = std::move(out)](auto) mutable {
// check boolean to select signal // check boolean to select signal
...@@ -225,7 +227,7 @@ class ConcurrentFlowSingleSender : public Test { ...@@ -225,7 +227,7 @@ class ConcurrentFlowSingleSender : public Test {
// stop producer before it is scheduled to run // stop producer before it is scheduled to run
mi::on_starting([&, at](auto up) { mi::on_starting([&, at](auto up) {
signals_ += 10; signals_ += 10;
tcncl_ | op::submit_at(at, [up = std::move(up)](auto) mutable { tcncl_ | op::schedule_at(at) | op::submit([up = std::move(up)](auto) mutable {
::mi::set_done(up); ::mi::set_done(up);
}); });
})); }));
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment