Commit 0f0583e4 authored by Kirk Shoop's avatar Kirk Shoop Committed by Facebook Github Bot

add many, many_deferred and from

fbshipit-source-id: 71a8c1998a0688e268a2a9a33aa601931e93e8c6
parent 7b28fc97
......@@ -41,12 +41,15 @@ set(header_files
"${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/executor.h"
"${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/flow_single.h"
"${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/flow_single_deferred.h"
"${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/many.h"
"${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/many_deferred.h"
"${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/trampoline.h"
"${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/new_thread.h"
"${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/o/extension_operators.h"
"${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/o/submit.h"
"${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/subject.h"
"${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/o/empty.h"
"${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/o/from.h"
"${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/o/just.h"
"${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/o/error.h"
"${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/o/defer.h"
......
This diff is collapsed.
......@@ -34,6 +34,9 @@ struct construct_deduced<none>;
template<>
struct construct_deduced<single>;
template<>
struct construct_deduced<many>;
template<>
struct construct_deduced<flow_single>;
......
......@@ -56,9 +56,15 @@ class deferred;
template <PUSHMI_TYPE_CONSTRAINT(SemiMovable)... TN>
class single;
template <PUSHMI_TYPE_CONSTRAINT(SemiMovable)... TN>
class many;
template <PUSHMI_TYPE_CONSTRAINT(SemiMovable)... TN>
class single_deferred;
template <PUSHMI_TYPE_CONSTRAINT(SemiMovable)... TN>
class many_deferred;
template <PUSHMI_TYPE_CONSTRAINT(SemiMovable)... TN>
class time_single_deferred;
......
This diff is collapsed.
#pragma once
// Copyright (c) 2018-present, Facebook, Inc.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.
#include "many.h"
namespace pushmi {
template <class V, class E = std::exception_ptr>
class any_many_deferred {
union data {
void* pobj_ = nullptr;
char buffer_[sizeof(V)]; // can hold a V in-situ
} data_{};
template <class Wrapped>
static constexpr bool insitu() {
return sizeof(Wrapped) <= sizeof(data::buffer_) &&
std::is_nothrow_move_constructible<Wrapped>::value;
}
struct vtable {
static void s_op(data&, data*) {}
static void s_submit(data&, many<V, E>) {}
void (*op_)(data&, data*) = vtable::s_op;
void (*submit_)(data&, many<V, E>) = vtable::s_submit;
};
static constexpr vtable const noop_ {};
vtable const* vptr_ = &noop_;
template <class Wrapped>
any_many_deferred(Wrapped obj, std::false_type) : any_many_deferred() {
struct s {
static void op(data& src, data* dst) {
if (dst)
dst->pobj_ = std::exchange(src.pobj_, nullptr);
delete static_cast<Wrapped const*>(src.pobj_);
}
static void submit(data& src, many<V, E> out) {
::pushmi::submit(*static_cast<Wrapped*>(src.pobj_), std::move(out));
}
};
static const vtable vtbl{s::op, s::submit};
data_.pobj_ = new Wrapped(std::move(obj));
vptr_ = &vtbl;
}
template <class Wrapped>
any_many_deferred(Wrapped obj, std::true_type) noexcept
: any_many_deferred() {
struct s {
static void op(data& src, data* dst) {
if (dst)
new (dst->buffer_) Wrapped(
std::move(*static_cast<Wrapped*>((void*)src.buffer_)));
static_cast<Wrapped const*>((void*)src.buffer_)->~Wrapped();
}
static void submit(data& src, many<V, E> out) {
::pushmi::submit(
*static_cast<Wrapped*>((void*)src.buffer_), std::move(out));
}
};
static const vtable vtbl{s::op, s::submit};
new (data_.buffer_) Wrapped(std::move(obj));
vptr_ = &vtbl;
}
template <class T, class U = std::decay_t<T>>
using wrapped_t =
std::enable_if_t<!std::is_same<U, any_many_deferred>::value, U>;
public:
using properties = property_set<is_sender<>, is_many<>>;
any_many_deferred() = default;
any_many_deferred(any_many_deferred&& that) noexcept
: any_many_deferred() {
that.vptr_->op_(that.data_, &data_);
std::swap(that.vptr_, vptr_);
}
PUSHMI_TEMPLATE(class Wrapped)
(requires SenderTo<wrapped_t<Wrapped>, many<V, E>, is_many<>>)
explicit any_many_deferred(Wrapped obj) noexcept(insitu<Wrapped>())
: any_many_deferred{std::move(obj), bool_<insitu<Wrapped>()>{}} {}
~any_many_deferred() {
vptr_->op_(data_, nullptr);
}
any_many_deferred& operator=(any_many_deferred&& that) noexcept {
this->~any_many_deferred();
new ((void*)this) any_many_deferred(std::move(that));
return *this;
}
void submit(many<V, E> out) {
vptr_->submit_(data_, std::move(out));
}
};
// Class static definitions:
template <class V, class E>
constexpr typename any_many_deferred<V, E>::vtable const
any_many_deferred<V, E>::noop_;
template <class SF>
class many_deferred<SF> {
SF sf_;
public:
using properties = property_set<is_sender<>, is_many<>>;
constexpr many_deferred() = default;
constexpr explicit many_deferred(SF sf)
: sf_(std::move(sf)) {}
PUSHMI_TEMPLATE(class Out)
(requires PUSHMI_EXP(defer::Receiver<Out, is_many<>> PUSHMI_AND defer::Invocable<SF&, Out>))
void submit(Out out) {
sf_(std::move(out));
}
};
namespace detail {
template <PUSHMI_TYPE_CONSTRAINT(Sender<is_many<>>) Data, class DSF>
class many_deferred_2 {
Data data_;
DSF sf_;
public:
using properties = property_set<is_sender<>, is_many<>>;
constexpr many_deferred_2() = default;
constexpr explicit many_deferred_2(Data data)
: data_(std::move(data)) {}
constexpr many_deferred_2(Data data, DSF sf)
: data_(std::move(data)), sf_(std::move(sf)) {}
PUSHMI_TEMPLATE(class Out)
(requires PUSHMI_EXP(defer::Receiver<Out, is_many<>> PUSHMI_AND
defer::Invocable<DSF&, Data&, Out>))
void submit(Out out) {
sf_(data_, std::move(out));
}
};
template <class A, class B>
using many_deferred_base =
std::conditional_t<
(bool)Sender<A, is_many<>>,
many_deferred_2<A, B>,
any_many_deferred<A, B>>;
} // namespace detail
template <class A, class B>
struct many_deferred<A, B>
: detail::many_deferred_base<A, B> {
constexpr many_deferred() = default;
using detail::many_deferred_base<A, B>::many_deferred_base;
};
////////////////////////////////////////////////////////////////////////////////
// make_many_deferred
PUSHMI_INLINE_VAR constexpr struct make_many_deferred_fn {
inline auto operator()() const {
return many_deferred<ignoreSF>{};
}
PUSHMI_TEMPLATE(class SF)
(requires True<> PUSHMI_BROKEN_SUBSUMPTION(&& not Sender<SF>))
auto operator()(SF sf) const {
return many_deferred<SF>{std::move(sf)};
}
PUSHMI_TEMPLATE(class Data)
(requires True<> && Sender<Data, is_many<>>)
auto operator()(Data d) const {
return many_deferred<Data, passDSF>{std::move(d)};
}
PUSHMI_TEMPLATE(class Data, class DSF)
(requires Sender<Data, is_many<>>)
auto operator()(Data d, DSF sf) const {
return many_deferred<Data, DSF>{std::move(d), std::move(sf)};
}
} const make_many_deferred {};
////////////////////////////////////////////////////////////////////////////////
// deduction guides
#if __cpp_deduction_guides >= 201703
many_deferred() -> many_deferred<ignoreSF>;
PUSHMI_TEMPLATE(class SF)
(requires True<> PUSHMI_BROKEN_SUBSUMPTION(&& not Sender<SF>))
many_deferred(SF) -> many_deferred<SF>;
PUSHMI_TEMPLATE(class Data)
(requires True<> && Sender<Data, is_many<>>)
many_deferred(Data) -> many_deferred<Data, passDSF>;
PUSHMI_TEMPLATE(class Data, class DSF)
(requires Sender<Data, is_many<>>)
many_deferred(Data, DSF) -> many_deferred<Data, DSF>;
#endif
// template <
// class V,
// class E = std::exception_ptr,
// SenderTo<many<V, E>, is_many<>> Wrapped>
// auto erase_cast(Wrapped w) {
// return many_deferred<V, E>{std::move(w)};
// }
} // namespace pushmi
......@@ -12,6 +12,8 @@
#include "../single.h"
#include "../deferred.h"
#include "../single_deferred.h"
#include "../many.h"
#include "../many_deferred.h"
#include "../time_single_deferred.h"
#include "../flow_single.h"
#include "../flow_single_deferred.h"
......@@ -53,6 +55,8 @@ struct make_receiver<is_none<>, void> : construct_deduced<none> {};
template <>
struct make_receiver<is_single<>, void> : construct_deduced<single> {};
template <>
struct make_receiver<is_many<>, void> : construct_deduced<many> {};
template <>
struct make_receiver<is_single<>, is_flow<>> : construct_deduced<flow_single> {};
template <PUSHMI_TYPE_CONSTRAINT(Sender) In>
......@@ -130,18 +134,23 @@ auto submit_transform_out(SDSF sdsf, TSDSF tsdsf) {
);
}
PUSHMI_TEMPLATE(class In, class Out)
(requires Sender<In> && Receiver<Out>)
PUSHMI_TEMPLATE(class In)
(requires Sender<In>)
auto deferred_from_maker() {
PUSHMI_IF_CONSTEXPR_RETURN( ((bool) TimeSenderTo<In, Out, is_single<>>) (
PUSHMI_IF_CONSTEXPR_RETURN( ((bool) Sender<In, is_flow<>, is_single<>>) (
return make_flow_single_deferred;
) else (
PUSHMI_IF_CONSTEXPR_RETURN( ((bool) Sender<In, is_time<>, is_single<>>) (
return make_time_single_deferred;
) else (
PUSHMI_IF_CONSTEXPR_RETURN( ((bool) SenderTo<In, Out, is_single<>>) (
PUSHMI_IF_CONSTEXPR_RETURN( ((bool) Sender<In, is_single<>>) (
return make_single_deferred;
) else (
PUSHMI_IF_CONSTEXPR_RETURN( ((bool) SenderTo<In, Out>) (
return make_deferred;
PUSHMI_IF_CONSTEXPR_RETURN( ((bool) Sender<In, is_many<>>) (
return make_many_deferred;
) else (
return make_deferred;
))
))
))
))
......@@ -150,13 +159,25 @@ auto deferred_from_maker() {
PUSHMI_TEMPLATE(class In, class Out, class... FN)
(requires Sender<In> && Receiver<Out>)
auto deferred_from(FN&&... fn) {
return deferred_from_maker<In, Out>()((FN&&) fn...);
return deferred_from_maker<In>()((FN&&) fn...);
}
PUSHMI_TEMPLATE(class In, class Out, class... FN)
(requires Sender<In> && Receiver<Out>)
auto deferred_from(In in, FN&&... fn) {
return deferred_from_maker<In, Out>()(std::move(in), (FN&&) fn...);
return deferred_from_maker<In>()(std::move(in), (FN&&) fn...);
}
PUSHMI_TEMPLATE(class In, class... FN)
(requires Sender<In>)
auto deferred_from(FN&&... fn) {
return deferred_from_maker<In>()((FN&&) fn...);
}
PUSHMI_TEMPLATE(class In, class... FN)
(requires Sender<In>)
auto deferred_from(In in, FN&&... fn) {
return deferred_from_maker<In>()(std::move(in), (FN&&) fn...);
}
PUSHMI_TEMPLATE(
......@@ -214,6 +235,17 @@ struct set_done_fn {
}
};
struct set_next_fn {
template<class V>
auto operator()(V&& v) const {
return constrain(lazy::Receiver<_1, is_many<>>,
[v = (V&&) v](auto out) mutable {
::pushmi::set_next(out, (V&&) v);
}
);
}
};
struct set_starting_fn {
PUSHMI_TEMPLATE(class Up)
(requires Receiver<Up>)
......@@ -264,6 +296,7 @@ namespace extension_operators {
PUSHMI_INLINE_VAR constexpr detail::set_done_fn set_done{};
PUSHMI_INLINE_VAR constexpr detail::set_error_fn set_error{};
PUSHMI_INLINE_VAR constexpr detail::set_value_fn set_value{};
PUSHMI_INLINE_VAR constexpr detail::set_next_fn set_next{};
PUSHMI_INLINE_VAR constexpr detail::set_starting_fn set_starting{};
PUSHMI_INLINE_VAR constexpr detail::do_submit_fn submit{};
PUSHMI_INLINE_VAR constexpr detail::now_fn now{};
......
#pragma once
// Copyright (c) 2018-present, Facebook, Inc.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.
#include "../many_deferred.h"
#include "extension_operators.h"
#include "submit.h"
namespace pushmi {
namespace operators {
PUSHMI_TEMPLATE(class O, class S)
(requires
ConvertibleTo<
typename std::iterator_traits<O>::iterator_category,
std::forward_iterator_tag> &&
ConvertibleTo<
typename std::iterator_traits<S>::iterator_category,
std::forward_iterator_tag>)
auto from(O begin, S end) {
return make_many_deferred(constrain(
lazy::ManyReceiver<_1, typename std::iterator_traits<O>::value_type>,
[begin = std::move(begin), end = std::move(end)](auto out) {
auto c = begin;
for (; c != end; ++c) {
::pushmi::set_next(out, *c);
}
::pushmi::set_done(out);
}));
}
PUSHMI_TEMPLATE(class R)
(requires requires (
std::begin(std::declval<R&&>()),
std::end(std::declval<R&&>())
))
auto from(R&& range) {
return from(std::begin(range), std::end(range));
}
} // namespace operators
} // namespace pushmi
......@@ -7,6 +7,7 @@
// LICENSE file in the root directory of this source tree.
#include "../single.h"
#include "../many.h"
#include "submit.h"
#include "extension_operators.h"
......@@ -14,13 +15,19 @@ namespace pushmi {
namespace detail {
// extracted this to workaround cuda compiler failure to compute the static_asserts in the nested lambda context
template<class F, class Tag>
struct transform_on;
template<class F>
struct transform_on_value {
struct transform_on<F, is_single<>> {
F f_;
transform_on_value() = default;
constexpr explicit transform_on_value(F f)
transform_on() = default;
constexpr explicit transform_on(F f)
: f_(std::move(f)) {}
template<class Out>
auto operator()(Out out) const {
return make_single(std::move(out), on_value(*this));
}
template<class Out, class V>
auto operator()(Out& out, V&& v) {
using Result = decltype(f_((V&&) v));
......@@ -32,6 +39,27 @@ struct transform_on_value {
}
};
template<class F>
struct transform_on<F, is_many<>> {
F f_;
transform_on() = default;
constexpr explicit transform_on(F f)
: f_(std::move(f)) {}
template<class Out>
auto operator()(Out out) const {
return make_many(std::move(out), on_next(*this));
}
template<class Out, class V>
auto operator()(Out& out, V&& v) {
using Result = decltype(f_((V&&) v));
static_assert(::pushmi::SemiMovable<Result>,
"none of the functions supplied to transform can convert this value");
static_assert(::pushmi::ManyReceiver<Out, Result>,
"Result of value transform cannot be delivered to Out");
::pushmi::set_next(out, f_((V&&) v));
}
};
struct transform_fn {
template <class... FN>
auto operator()(FN... fn) const;
......@@ -44,28 +72,11 @@ auto transform_fn::operator()(FN... fn) const {
using In = decltype(in);
// copy 'f' to allow multiple calls to connect to multiple 'in'
using F = decltype(f);
return ::pushmi::detail::deferred_from<In, ::pushmi::single<>>(
using Cardinality = property_set_index_t<properties_t<In>, is_silent<>>;
return ::pushmi::detail::deferred_from<In>(
std::move(in),
::pushmi::detail::submit_transform_out<In>(
::pushmi::constrain(::pushmi::lazy::Receiver<::pushmi::_1>, [f](auto out) {
using Out = decltype(out);
return ::pushmi::detail::out_from_fn<In>()(
std::move(out),
// copy 'f' to allow multiple calls to submit
::pushmi::on_value(
transform_on_value<F>(f)
// [f](Out& out, auto&& v) {
// using V = decltype(v);
// using Result = decltype(f((V&&) v));
// static_assert(::pushmi::SemiMovable<Result>,
// "none of the functions supplied to transform can convert this value");
// static_assert(::pushmi::SingleReceiver<Out, Result>,
// "Result of value transform cannot be delivered to Out");
// ::pushmi::set_value(out, f((V&&) v));
// }
)
);
})
transform_on<F, Cardinality>{f}
)
);
});
......
......@@ -137,6 +137,58 @@ void single_test() {
auto any3 = pushmi::any_single<int>(proxy0);
}
void many_test() {
auto out0 = pushmi::MAKE(many)();
auto out1 = pushmi::MAKE(many)(pushmi::ignoreNF{});
auto out2 = pushmi::MAKE(many)(pushmi::ignoreNF{}, pushmi::abortEF{});
auto out3 =
pushmi::MAKE(many)(pushmi::ignoreNF{}, pushmi::abortEF{}, pushmi::ignoreDF{});
auto out4 = pushmi::MAKE(many)([](auto v) { v.get(); });
auto out5 = pushmi::MAKE(many)(
pushmi::on_next([](auto v) { v.get(); }, [](int v) {}),
pushmi::on_error(
[](std::exception_ptr e) noexcept {},
[](auto e)noexcept { e.get(); }
));
auto out6 = pushmi::MAKE(many)(
pushmi::on_error(
[](std::exception_ptr e) noexcept {},
[](auto e) noexcept { e.get(); }
));
auto out7 = pushmi::MAKE(many)(
pushmi::on_done([]() { }));
using Out0 = decltype(out0);
auto proxy0 = pushmi::MAKE(many)(out0);
auto proxy1 = pushmi::MAKE(many)(out0, pushmi::passDNXF{});
auto proxy2 = pushmi::MAKE(many)(out0, pushmi::passDNXF{}, pushmi::passDEF{});
auto proxy3 = pushmi::MAKE(many)(
out0, pushmi::passDNXF{}, pushmi::passDEF{}, pushmi::passDDF{});
auto proxy4 = pushmi::MAKE(many)(out0, [](auto d, auto v) {
pushmi::set_next(d, v.get());
});
auto proxy5 = pushmi::MAKE(many)(
out0,
pushmi::on_next([](Out0&, auto v) { v.get(); }, [](Out0&, int v) {}),
pushmi::on_error(
[](Out0&, std::exception_ptr e) noexcept {},
[](Out0&, auto e) noexcept { e.get(); }
));
auto proxy6 = pushmi::MAKE(many)(
out0,
pushmi::on_error(
[](Out0&, std::exception_ptr e) noexcept {},
[](Out0&, auto e) noexcept { e.get(); }
));
auto proxy7 = pushmi::MAKE(many)(
out0,
pushmi::on_done([](Out0&) { }));
auto any0 = pushmi::any_many<int>(out0);
auto any1 = pushmi::any_many<int>(proxy0);
}
void single_deferred_test(){
auto in0 = pushmi::MAKE(single_deferred)();
auto in1 = pushmi::MAKE(single_deferred)(pushmi::ignoreSF{});
......@@ -159,6 +211,24 @@ void single_deferred_test(){
auto any0 = pushmi::any_single_deferred<int>(in0);
}
void many_deferred_test(){
auto in0 = pushmi::MAKE(many_deferred)();
auto in1 = pushmi::MAKE(many_deferred)(pushmi::ignoreSF{});
auto in3 = pushmi::MAKE(many_deferred)([&](auto out){
in0.submit(pushmi::MAKE(many)(std::move(out),
pushmi::on_next([](auto d, int v){ pushmi::set_next(d, v); })
));
});
auto out0 = pushmi::MAKE(many)();
auto out1 = pushmi::MAKE(many)(out0, pushmi::on_next([](auto d, int v){
pushmi::set_next(d, v);
}));
in3.submit(out1);
auto any0 = pushmi::any_many_deferred<int>(in0);
}
void time_single_deferred_test(){
auto in0 = pushmi::MAKE(time_single_deferred)();
auto in1 = pushmi::MAKE(time_single_deferred)(pushmi::ignoreSF{});
......
......@@ -7,6 +7,7 @@ using namespace std::literals;
#include "pushmi/flow_single_deferred.h"
#include "pushmi/o/empty.h"
#include "pushmi/o/from.h"
#include "pushmi/o/just.h"
#include "pushmi/o/on.h"
#include "pushmi/o/transform.h"
......@@ -117,3 +118,35 @@ SCENARIO( "just() can be used with transform and submit", "[just][deferred]" ) {
}
}
}
SCENARIO( "from() can be used with transform and submit", "[from][deferred]" ) {
GIVEN( "A from int many_deferred" ) {
int arr[] = {0, 9, 99};
auto m = op::from(arr);
using M = decltype(m);
REQUIRE( v::SenderTo<M, v::any_many<int>, v::is_many<>> );
WHEN( "transform and submit are applied" ) {
int signals = 0;
int value = 0;
m |
op::transform(
[&](int v){ signals += 10000; return v + 1; },
[&](auto v){ std:abort(); return v; }) |
op::transform(
[&](int v){ signals += 10000; return v * 2; }) |
op::submit(
[&](auto v){ value += v; signals += 100; },
[&](auto e) noexcept { signals += 1000; },
[&](){ signals += 10; });
THEN( "the transform signal is recorded twice, the value signal once and the result is correct" ) {
REQUIRE( signals == 60310 );
REQUIRE( value == 222 );
}
}
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment