Commit 4309d2bb authored by Eric Niebler's avatar Eric Niebler Committed by Facebook Github Bot

initial commit

fbshipit-source-id: f5fba149a4200b522bf90700f320685cbf0737f9
parent 799a718f
cmake_minimum_required(VERSION 3.7)
project(pushmi-project CXX)
FIND_PACKAGE (Threads)
add_library(pushmi INTERFACE)
target_include_directories(pushmi INTERFACE
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/external/networking-ts-impl/include>
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/external/meta/include>
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/external/Catch2/single_include>
$<INSTALL_INTERFACE:$<INSTALL_PREFIX>/include>
$<INSTALL_INTERFACE:$<INSTALL_PREFIX>/include/net>
$<INSTALL_INTERFACE:$<INSTALL_PREFIX>/include/meta>
$<INSTALL_INTERFACE:$<INSTALL_PREFIX>/include/Catch2>)
#target_compile_features(pushmi INTERFACE cxx_std_17)
target_compile_options(pushmi INTERFACE
$<$<CXX_COMPILER_ID:GNU>:-std=c++2a>
$<$<CXX_COMPILER_ID:GNU>:-fconcepts>)
install(
DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/include/
DESTINATION include)
install(
DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/external/networking-ts-impl/include/
DESTINATION include/net)
install(
DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/external/meta/include/
DESTINATION include/meta)
install(
DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/external/Catch2/
DESTINATION include/Catch2)
install(TARGETS pushmi EXPORT pushmi-project)
install(EXPORT pushmi-project DESTINATION pushmi-project)
enable_testing()
include(CTest)
add_subdirectory(test)
# Code of Conduct
Facebook has adopted a Code of Conduct that we expect project participants to adhere to. Please read the [full text](https://code.facebook.com/pages/876921332402685/open-source-code-of-conduct) so that you can understand what actions will and will not be tolerated.
# Contributing to pushmi
We want to make contributing to this project as easy and transparent as
possible.
## Our Development Process
All Development on this project is public.
## Pull Requests
We actively welcome your pull requests.
1. Fork the repo and create your branch from `master`.
2. If you've added code that should be tested, add tests.
3. If you've changed APIs, update the documentation.
4. Ensure the test suite passes.
5. Make sure your code lints.
6. If you haven't already, complete the Contributor License Agreement ("CLA").
## Contributor License Agreement ("CLA")
In order to accept your pull request, we need you to submit a CLA. You only need
to do this once to work on any of Facebook's open source projects.
Complete your CLA here: <https://code.facebook.com/cla>
## Issues
We use GitHub issues to track public bugs. Please ensure your description is
clear and has sufficient instructions to be able to reproduce the issue.
Facebook has a [bounty program](https://www.facebook.com/whitehat/) for the safe
disclosure of security bugs. In those cases, please go through the process
outlined on that page and do not file a public issue.
## Coding Style
* 2 spaces for indentation rather than tabs
* 80 character line length
## License
By contributing to pushmi, you agree that your contributions will be licensed
under the LICENSE file in the root directory of this source tree.
MIT License
Copyright (c) 2018-present, Facebook, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
This diff is collapsed.
#pragma once
// Copyright (c) 2018-present, Facebook, Inc.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <exception>
#include <functional>
#include <utility>
#include "concepts.h"
namespace pushmi {
template<class T>
struct construct {
template<class... AN>
requires Constructible<T, AN...>
auto operator()(AN&&... an) const {
return T{std::forward<AN>(an)...};
}
};
template<template <class...> class T>
struct construct_deduced {
template<class... AN>
requires requires (AN&&... an) { T{(AN&&) an...}; }
auto operator()(AN&&... an) const {
return T{std::forward<AN>(an)...};
}
};
template <template <class...> class T, class... AN>
using deduced_type_t = std::invoke_result_t<construct_deduced<T>, AN...>;
// template <class Fn>
// struct apply {
// template <class Tup>
// requires requires (Tup&& tup) { std::apply(Fn{}, (Tup&&) tup); }
// decltype(auto) operator()(Tup&& tup) const {
// return std::apply(Fn{}, (Tup&&) tup);
// }
// };
//
// template <class Fn, class Gn>
// struct compose {
// template <class... AN>
// requires Invocable<Gn, AN...> &&
// Invocable<Fn, std::invoke_result_t<Gn, AN...>>
// decltype(auto) operator()(AN&&... an) const {
// return std::invoke(Fn{}, std::invoke(Gn{}, (AN&&) an...));
// }
// };
//
// template <class T>
// inline constexpr apply<construct<T>> from_tuple {};
//
// template <template <class...> class T>
// inline constexpr apply<construct_deduced<T>> from_tuple_deduced {};
template <class T, class... AN>
auto from_tuple(std::tuple<AN...>&& t) {
return std::apply(construct<T>{}, std::move(t));
}
template <template<class...> class T, class... AN>
auto from_tuple(std::tuple<AN...>&& t) {
using Deduced = decltype(T{std::declval<AN&&>()...});
return std::apply(construct<Deduced>{}, std::move(t));
}
template <class T>
void sfinae_from_tuple(...);
template <template<class... TN> class T>
void sfinae_from_tuple(...);
template <class T, class... AN,
class Constructor = construct<T>>
auto sfinae_from_tuple(std::tuple<AN...>&& t) ->
decltype(std::apply(Constructor{}, std::move(t))) {
return std::apply(Constructor{}, std::move(t));
}
template <template<class...> class T, class... AN,
class Deduced = decltype(T{std::declval<AN>()...}),
class Constructor = construct<Deduced>>
auto sfinae_from_tuple(std::tuple<AN...>&& t) ->
decltype(std::apply(Constructor{}, std::move(t))) {
return std::apply(Constructor{}, std::move(t));
}
struct ignoreVF {
template <class V>
void operator()(V&&) {}
};
struct abortEF {
template <class E>
void operator()(E) noexcept {
std::abort();
}
};
struct ignoreDF {
void operator()() {}
};
struct ignoreStpF {
void operator()() {}
};
struct ignoreStrtF {
template <class Up>
void operator()(Up&) {}
};
struct ignoreSF {
template <class Out>
void operator()(Out) {}
template <class TP, class Out>
void operator()(TP, Out) {}
};
struct systemNowF {
auto operator()() { return std::chrono::system_clock::now(); }
};
struct passDVF {
template <class V, Receiver Data>
requires requires(Data& out, V&& v) {
::pushmi::set_value(out, (V&&) v);
}
void operator()(Data& out, V&& v) const {
::pushmi::set_value(out, (V&&) v);
}
};
struct passDEF {
template <class E, NoneReceiver<E> Data>
void operator()(Data& out, E e) const noexcept {
::pushmi::set_error(out, e);
}
};
struct passDDF {
template <Receiver Data>
void operator()(Data& out) const {
::pushmi::set_done(out);
}
};
struct passDStpF {
template <Receiver Data>
void operator()(Data& out) const {
::pushmi::set_stopping(out);
}
};
struct passDStrtF {
template <class Up, Receiver Data>
requires requires(Data& out, Up& up) {
::pushmi::set_starting(out, up);
}
void operator()(Data& out, Up& up) const {
::pushmi::set_starting(out, up);
}
};
struct passDSF {
template <class Data, class Out>
void operator()(Data& in, Out out) {
::pushmi::submit(in, std::move(out));
}
template <class Data, class TP, class Out>
void operator()(Data& in, TP at, Out out) {
::pushmi::submit(in, std::move(at), std::move(out));
}
};
struct passDNF {
template <TimeSender Data>
auto operator()(Data& in) const noexcept {
return ::pushmi::now(in);
}
};
// inspired by Ovrld - shown in a presentation by Nicolai Josuttis
template <SemiMovable... Fns>
struct overload : Fns... {
constexpr overload() = default;
constexpr explicit overload(Fns... fns) requires sizeof...(Fns) == 1
: Fns(std::move(fns))... {}
constexpr overload(Fns... fns) requires sizeof...(Fns) > 1
: Fns(std::move(fns))... {}
using Fns::operator()...;
};
template <class... F>
overload(F...) -> overload<F...>;
template <class... Fns>
struct on_value : overload<Fns...> {
constexpr on_value() = default;
using overload<Fns...>::overload;
using Fns::operator()...;
};
template <class... F>
on_value(F...)->on_value<F...>;
template <class... Fns>
struct on_error : overload<Fns...> {
constexpr on_error() = default;
using overload<Fns...>::overload;
using Fns::operator()...;
};
template <class... F>
on_error(F...)->on_error<F...>;
template <class... Fns>
struct on_done : overload<Fns...> {
constexpr on_done() = default;
using overload<Fns...>::overload;
using Fns::operator()...;
};
template <class F>
on_done(F)->on_done<F>;
template <class... Fns>
struct on_stopping : overload<Fns...> {
constexpr on_stopping() = default;
using overload<Fns...>::overload;
using Fns::operator()...;
};
template <class F>
on_stopping(F)->on_stopping<F>;
template <class... Fns>
struct on_starting : overload<Fns...> {
constexpr on_starting() = default;
using overload<Fns...>::overload;
using Fns::operator()...;
};
template <class... F>
on_starting(F...)->on_starting<F...>;
template <class... Fns>
struct on_submit : overload<Fns...> {
constexpr on_submit() = default;
using overload<Fns...>::overload;
using Fns::operator()...;
};
template <class... F>
on_submit(F...)->on_submit<F...>;
} // namespace pushmi
// clang-format off
#pragma once
// Copyright (c) 2018-present, Facebook, Inc.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.
#include "forwards.h"
#include "extension_points.h"
namespace pushmi {
// tag types
struct silent_tag {};
struct none_tag : silent_tag {};
struct single_tag : none_tag {};
struct flow_tag : single_tag {};
template <class Tag>
concept bool Silent = Derived<Tag, silent_tag>;
template <class Tag>
concept bool None = Silent<Tag> && Derived<Tag, none_tag>;
template <class Tag>
concept bool Single = None<Tag> && Derived<Tag, single_tag>;
template <class Tag>
concept bool Flow = Single<Tag> && Derived<Tag, flow_tag>;
template <class T>
using __sender_category_t = typename T::sender_category;
template <class T>
struct sender_traits : sender_traits<std::decay_t<T>> {
};
template <Decayed T>
struct sender_traits<T> {
};
template <Decayed T>
requires Valid<T, __sender_category_t>
struct sender_traits<T> {
using sender_category = __sender_category_t<T>;
};
template <class T>
using sender_category_t = __sender_category_t<sender_traits<T>>;
template <class T>
using __receiver_category_t = typename T::receiver_category;
template <class T>
struct receiver_traits : receiver_traits<std::decay_t<T>> {
};
template <Decayed T>
struct receiver_traits<T> {
};
template <Decayed T>
requires Valid<T, __receiver_category_t>
struct receiver_traits<T> {
using receiver_category = __receiver_category_t<T>;
};
template <class T>
using receiver_category_t = __receiver_category_t<receiver_traits<T>>;
template <class S, class Tag = silent_tag>
concept bool Receiver = Valid<receiver_traits<S>, __receiver_category_t> &&
Derived<receiver_category_t<S>, Tag> &&
SemiMovable<S> && requires (S& s) {
::pushmi::set_done(s);
};
template <class S, class E = std::exception_ptr>
concept bool NoneReceiver = Receiver<S> &&
Derived<receiver_category_t<S>, none_tag> &&
requires(S& s, E&& e) {
::pushmi::set_error(s, (E &&) e);
};
template <class S, class T, class E = std::exception_ptr>
concept bool SingleReceiver = NoneReceiver<S, E> &&
Derived<receiver_category_t<S>, single_tag> &&
requires(S& s, T&& t) {
::pushmi::set_value(s, (T &&) t); // Semantics: called exactly once.
};
template <class D, class Tag = silent_tag>
concept bool Sender = Valid<sender_traits<D>, __sender_category_t> &&
Derived<sender_category_t<D>, Tag> && SemiMovable<D>;
template <class D, class S, class Tag = silent_tag>
concept bool SenderTo = Sender<D, Tag> &&
Derived<sender_category_t<D>, Tag> &&
Receiver<S, Tag> && requires(D& d, S&& s) {
::pushmi::submit(d, (S &&) s);
};
template <class D, class Tag = silent_tag>
concept bool TimeSender = Sender<D, Tag> && requires(D& d) {
{ ::pushmi::now(d) } -> Regular
};
template <class D, class S, class Tag = silent_tag>
concept bool TimeSenderTo = Receiver<S, Tag> && TimeSender<D, Tag> &&
requires(D& d, S&& s) {
::pushmi::submit(d, ::pushmi::now(d), (S &&) s);
};
template <TimeSender D>
using time_point_t = decltype(::pushmi::now(std::declval<D&>()));
// // this is a more general form where C (Constraint) could be time or priority
// // enum or any other ordering constraint value-type.
// //
// // top() returns the constraint value that will push the item as high in the
// // queue as currently possible. So now() for time and HIGH for priority.
// //
// // I would like to replace Time.. with Priority.. but not sure if it will
// // obscure too much.
// template <class D>
// concept bool PrioritySource = requires(D& d) {
// { ::pushmi::top(d) } -> Regular
// };
//
// template <PrioritySource D>
// using constraint_t = decltype(::pushmi::top(std::declval<D&>()));
//
// template <class D, class S>
// concept bool SemiPrioritySender = requires(D& d, S&& s) {
// ::pushmi::submit(d, ::pushmi::top(d), (S &&) s);
// };
//
// template <class D, class S, class E = std::exception_ptr>
// concept bool PrioritySender =
// NoneReceiver<S, E> && SemiPrioritySender<D, S> &&
// PrioritySource<D> && requires(D& d, S& s) {
// { ::pushmi::top(d) } -> constraint_t<D>;
// };
//
// template <class D, class S, class T, class E = std::exception_ptr>
// concept bool PrioritySingleSender = SingleReceiver<S, T, E> &&
// PrioritySender<D, S, E>;
//
// template <class D, class S>
// concept bool SemiPrioritySingleSender = SemiPrioritySender<D, S>;
// add concepts to support cancellation
//
template <class N, class Up, class PE = std::exception_ptr>
concept bool FlowNone = NoneReceiver<Up, PE> && requires(N& n, Up& up) {
::pushmi::set_stopping(n);
::pushmi::set_starting(n, up);
};
template <
class D,
class N,
class Up,
class PE = std::exception_ptr,
class E = PE>
concept bool FlowNoneSender = FlowNone<N, Up, PE> &&
SenderTo<D, N> && NoneReceiver<N, E>;
template <
class S,
class Up,
class T,
class PE = std::exception_ptr,
class E = PE>
concept bool FlowSingle = SingleReceiver<S, T, E> && FlowNone<S, Up, PE>;
template <
class D,
class S,
class Up,
class T,
class PE = std::exception_ptr,
class E = PE>
concept bool FlowSingleSender =
FlowSingle<S, Up, T, PE, E> && FlowNoneSender<D, S, Up, PE, E>;
template <class D, class S>
concept bool SemiFlowSingleSender = SenderTo<D, S, single_tag>;
} // namespace pushmi
#pragma once
// Copyright (c) 2018-present, Facebook, Inc.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.
#include "none.h"
namespace pushmi {
namespace detail {
struct erase_deferred_t {};
} // namespace detail
template <class E>
class deferred<detail::erase_deferred_t, E> {
union data {
void* pobj_ = nullptr;
char buffer_[sizeof(std::promise<void>)]; // can hold a void promise in-situ
} data_{};
template <class Wrapped>
static constexpr bool insitu() {
return sizeof(Wrapped) <= sizeof(data::buffer_) &&
std::is_nothrow_move_constructible_v<Wrapped>;
}
struct vtable {
void (*op_)(data&, data*) = +[](data&, data*) {};
void (*submit_)(data&, any_none<E>) = +[](data&, any_none<E>) {};
static constexpr vtable const noop_ = {};
} const* vptr_ = &vtable::noop_;
template <class Wrapped, bool = insitu<Wrapped>()>
static constexpr vtable const vtable_v = {
+[](data& src, data* dst) {
if (dst)
dst->pobj_ = std::exchange(src.pobj_, nullptr);
delete static_cast<Wrapped const*>(src.pobj_);
},
+[](data& src, any_none<E> out) {
::pushmi::submit(*static_cast<Wrapped*>(src.pobj_), std::move(out));
}
};
template <class T, class U = std::decay_t<T>>
using wrapped_t =
std::enable_if_t<!std::is_same_v<U, deferred>, U>;
public:
using sender_category = none_tag;
deferred() = default;
deferred(deferred&& that) noexcept : deferred() {
that.vptr_->op_(that.data_, &data_);
std::swap(that.vptr_, vptr_);
}
template <class Wrapped>
requires SenderTo<wrapped_t<Wrapped>, any_none<E>, none_tag>
explicit deferred(Wrapped obj)
: deferred() {
if constexpr (insitu<Wrapped>())
new (data_.buffer_) Wrapped(std::move(obj));
else
data_.pobj_ = new Wrapped(std::move(obj));
vptr_ = &vtable_v<Wrapped>;
}
~deferred() {
vptr_->op_(data_, nullptr);
}
deferred& operator=(deferred&& that) noexcept {
this->~deferred();
new ((void*)this) deferred(std::move(that));
return *this;
}
void submit(any_none<E> out) {
vptr_->submit_(data_, std::move(out));
}
};
// Class static definitions:
template <class E>
constexpr typename deferred<detail::erase_deferred_t, E>::vtable const
deferred<detail::erase_deferred_t, E>::vtable::noop_;
template <class E>
template <class Wrapped, bool Big>
constexpr typename deferred<detail::erase_deferred_t, E>::vtable const
deferred<detail::erase_deferred_t, E>::vtable_v;
template <class E>
template <class Wrapped>
constexpr typename deferred<detail::erase_deferred_t, E>::vtable const
deferred<detail::erase_deferred_t, E>::vtable_v<Wrapped, true> = {
+[](data& src, data* dst) {
if (dst)
new (dst->buffer_) Wrapped(
std::move(*static_cast<Wrapped*>((void*)src.buffer_)));
static_cast<Wrapped const*>((void*)src.buffer_)->~Wrapped();
},
+[](data& src, any_none<E> out) {
::pushmi::submit(
*static_cast<Wrapped*>((void*)src.buffer_), std::move(out));
}
};
template <class SF>
class deferred<SF> {
SF sf_;
public:
using sender_category = none_tag;
constexpr deferred() = default;
constexpr explicit deferred(SF sf) : sf_(std::move(sf)) {}
template <Receiver<none_tag> Out>
requires Invocable<SF&, Out>
void submit(Out out) {
sf_(std::move(out));
}
};
template <Sender<none_tag> Data, class DSF>
class deferred<Data, DSF> {
Data data_;
DSF sf_;
public:
using sender_category = none_tag;
constexpr deferred() = default;
constexpr explicit deferred(Data data)
: data_(std::move(data)) {}
constexpr deferred(Data data, DSF sf)
: data_(std::move(data)), sf_(std::move(sf)) {}
template <Receiver<none_tag> Out>
requires Invocable<DSF&, Data&, Out>
void submit(Out out) {
sf_(data_, std::move(out));
}
};
deferred() -> deferred<ignoreSF>;
template <class SF>
deferred(SF) -> deferred<SF>;
template <Sender<none_tag> Wrapped>
deferred(Wrapped) ->
deferred<detail::erase_deferred_t, std::exception_ptr>;
template <Sender<none_tag> Data, class DSF>
deferred(Data, DSF) -> deferred<Data, DSF>;
template <class E = std::exception_ptr>
using any_deferred = deferred<detail::erase_deferred_t, E>;
// template <SenderTo<any_none<std::exception_ptr>, none_tag> Wrapped>
// auto erase_cast(Wrapped w) {
// return deferred<detail::erase_deferred_t, std::exception_ptr>{std::move(w)};
// }
//
// template <class E, SenderTo<any_none<E>, none_tag> Wrapped>
// requires Same<none_tag, sender_category_t<Wrapped>>
// auto erase_cast(Wrapped w) {
// return deferred<detail::erase_deferred_t, E>{std::move(w)};
// }
} // namespace pushmi
#pragma once
// Copyright (c) 2018-present, Facebook, Inc.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.
#include <chrono>
#include <functional>
#include "time_single_deferred.h"
namespace pushmi {
namespace detail {
template<class TP>
struct any_time_executor_ref_vtable {
TP (*now_)(void*);
void (*submit_)(void*, TP, void*);
};
template <class E, class TP, class Other, class Wrapped>
auto any_time_executor_ref_vtable_v() {
static constexpr any_time_executor_ref_vtable<TP> const vtable_v {
+[](void* pobj) { return ::pushmi::now(*static_cast<Wrapped*>(pobj)); },
+[](void* pobj, TP tp, void* s) {
return ::pushmi::submit(
*static_cast<Wrapped*>(pobj),
tp,
std::move(*static_cast<single<Other, E>*>(s)));
}
};
return &vtable_v;
};
} // namespace detail
template<class E, class TP, int i>
struct any_time_executor_ref {
private:
// use two instances to resolve recurive type definition.
using This = any_time_executor_ref<E, TP, i>;
using Other = any_time_executor_ref<E, TP, i == 0 ? 1 : 0>;
template <class T, class U = std::decay_t<T>>
using wrapped_t =
std::enable_if_t<
!std::is_same_v<U, This> &&
!std::is_same_v<U, Other>, U>;
void* pobj_;
detail::any_time_executor_ref_vtable<TP> const *vptr_;
public:
using sender_category = single_tag;
any_time_executor_ref() = delete;
template<int n>
any_time_executor_ref(any_time_executor_ref<E, TP, n>&& o) :
pobj_(o.pobj_), vptr_(o.vptr_) {
o.pobj_ = nullptr;
o.vptr_ = nullptr;
};
template<int n>
any_time_executor_ref(const any_time_executor_ref<E, TP, n>& o) :
pobj_(o.pobj_), vptr_(o.vptr_) {};
template <class Wrapped, TimeSender<single_tag> W = wrapped_t<Wrapped>>
// requires TimeSenderTo<W, single<Other, E>>
any_time_executor_ref(Wrapped& w) {
// This can't be a requirement because it asks if submit(w, now(w), single<T,E>)
// is well-formed (where T is an alias for any_time_executor_ref). If w
// has a submit that is constrained with SingleReceiver<single<T, E>, T'&, E'>, that
// will ask whether value(single<T,E>, T'&) is well-formed. And *that* will
// ask whether T'& is convertible to T. That brings us right back to this
// constructor. Constraint recursion!
static_assert(TimeSenderTo<W, single<Other, E>>);
if constexpr ((bool)TimeSenderTo<W, single<Other, E>>) {
pobj_ = std::addressof(w);
vptr_ = detail::any_time_executor_ref_vtable_v<E, TP, Other, Wrapped>();
}
}
std::chrono::system_clock::time_point now() {
return vptr_->now_(pobj_);
}
template<class SingleReceiver>
void submit(TP tp, SingleReceiver&& sa) {
// static_assert(
// ConvertibleTo<SingleReceiver, any_single<Other, E>>,
// "requires any_single<any_time_executor_ref<E, TP>, E>");
any_single<Other, E> s{(SingleReceiver&&) sa};
vptr_->submit_(pobj_, tp, &s);
}
};
using archtype_any_time_executor_ref = any_time_executor_ref<std::exception_ptr, std::chrono::system_clock::time_point, 0>;
template <class E = std::exception_ptr, class TP = std::chrono::system_clock::time_point>
any_time_executor_ref()->any_time_executor_ref<E, TP, 0>;
template <class Wrapped, class E = std::exception_ptr, class TP = std::chrono::system_clock::time_point>
any_time_executor_ref(Wrapped)->any_time_executor_ref<E, TP, 0>;
template<class E, class TP>
struct any_time_executor :
any_time_single_deferred<any_time_executor_ref<E, TP>, E, TP> {
constexpr any_time_executor() = default;
using any_time_single_deferred<any_time_executor_ref<E, TP>, E, TP>::
any_time_single_deferred;
};
template <class E = std::exception_ptr, class TP = std::chrono::system_clock::time_point>
any_time_executor()->any_time_executor<E, TP>;
template <class E = std::exception_ptr, class TP = std::chrono::system_clock::time_point, class Wrapped>
any_time_executor(Wrapped)->any_time_executor<E, TP>;
} // namespace pushmi
#pragma once
// Copyright (c) 2018-present, Facebook, Inc.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.
#include <future>
#include <functional>
#include "traits.h"
namespace pushmi {
namespace __adl {
template <class S>
requires requires(S& s) {
s.done();
}
void set_done(S& s) noexcept(noexcept(s.done())) {
s.done();
}
template <class S, class E>
requires requires(S& s, E e) {
s.error(std::move(e));
}
void set_error(S& s, E e) noexcept(noexcept(s.error(std::move(e)))) {
s.error(std::move(e));
}
template <class S, class V>
requires requires(S& s, V&& v) {
s.value((V&&) v);
}
void set_value(S& s, V&& v) noexcept(noexcept(s.value((V&&) v))) {
s.value((V&&) v);
}
template <class S>
requires requires(S& s) {
s.stopping();
}
void set_stopping(S& s) noexcept(noexcept(s.stopping())) {
s.stopping();
}
template <class S, class Up>
requires requires(S& s, Up& up) {
s.starting(up);
}
void set_starting(S& s, Up& up) noexcept(noexcept(s.starting(up))) {
s.starting(up);
}
template <class SD, class Out>
requires requires(SD& sd, Out out) {
sd.submit(std::move(out));
}
void submit(SD& sd, Out out) noexcept(noexcept(sd.submit(std::move(out)))) {
sd.submit(std::move(out));
}
template <class SD>
requires requires(SD& sd) {
sd.now();
}
auto now(SD& sd) noexcept(noexcept(sd.now())) {
return sd.now();
}
template <class SD, class TP, class Out>
requires requires(SD& sd, TP tp, Out out) {
{ sd.now() } -> TP;
sd.submit(std::move(tp), std::move(out));
}
void submit(SD& sd, TP tp, Out out)
noexcept(noexcept(sd.submit(std::move(tp), std::move(out)))) {
sd.submit(std::move(tp), std::move(out));
}
template <class T>
void set_done(std::promise<T>& p) noexcept(
noexcept(p.set_exception(std::make_exception_ptr(0)))) {
p.set_exception(std::make_exception_ptr(
std::logic_error("std::promise does not support done.")));
}
inline void set_done(std::promise<void>& p) noexcept(noexcept(p.set_value())) {
p.set_value();
}
template <class T>
void set_error(std::promise<T>& s, std::exception_ptr e) noexcept {
s.set_exception(std::move(e));
}
template <class T, class E>
void set_error(std::promise<T>& s, E e) noexcept {
s.set_exception(std::make_exception_ptr(std::move(e)));
}
template <class T>
requires !Same<T, void>
void set_value(std::promise<T>& s, T t) {
s.set_value(std::move(t));
}
template <class S>
requires requires (S& s) { set_done(s); }
void set_done(std::reference_wrapper<S> s) noexcept(
noexcept(set_done(s.get()))) {
set_done(s.get());
}
template <class S, class E>
requires requires (S& s, E e) { set_error(s, std::move(e)); }
void set_error(std::reference_wrapper<S> s, E e) noexcept {
set_error(s.get(), std::move(e));
}
template <class S, class V>
requires requires (S& s, V&& v) { set_value(s, (V&&) v); }
void set_value(std::reference_wrapper<S> s, V&& v) noexcept(
noexcept(set_value(s.get(), (V&&) v))) {
set_value(s.get(), (V&&) v);
}
template <class S>
requires requires(S& s) { set_stopping(s); }
void set_stopping(std::reference_wrapper<S> s) noexcept(
noexcept(set_stopping(s.get()))) {
set_stopping(s.get());
}
template <class S, class Up>
requires requires(S& s, Up& up) { set_starting(s, up); }
void set_starting(std::reference_wrapper<S> s, Up& up) noexcept(
noexcept(set_starting(s.get(), up))) {
set_starting(s.get(), up);
}
template <class SD, class Out>
requires requires(SD& sd, Out out) { submit(sd, std::move(out)); }
void submit(std::reference_wrapper<SD> sd, Out out) noexcept(
noexcept(submit(sd.get(), std::move(out)))) {
submit(sd.get(), std::move(out));
}
template <class SD>
requires requires(SD& sd) { now(sd); }
auto now(std::reference_wrapper<SD> sd) noexcept(noexcept(now(sd.get()))) {
return now(sd.get());
}
template <class SD, class TP, class Out>
requires requires(SD& sd, TP tp, Out out) {
submit(sd, std::move(tp), std::move(out));
}
void submit(std::reference_wrapper<SD> sd, TP tp, Out out)
noexcept(noexcept(submit(sd.get(), std::move(tp), std::move(out)))) {
submit(sd.get(), std::move(tp), std::move(out));
}
struct set_done_fn {
template <class S>
requires requires(S& s) {
set_done(s);
set_error(s, std::current_exception());
}
void operator()(S&& s) const noexcept(noexcept(set_done(s))) {
try {
set_done(s);
} catch (...) {
set_error(s, std::current_exception());
}
}
};
struct set_error_fn {
template <class S, class E>
requires requires(S& s, E e) {
{ set_error(s, std::move(e)) } noexcept;
}
void operator()(S&& s, E e) const
noexcept(noexcept(set_error(s, std::move(e)))) {
set_error(s, std::move(e));
}
};
struct set_value_fn {
template <class S, class V>
requires requires(S& s, V&& v) {
set_value(s, (V&&) v);
set_error(s, std::current_exception());
}
void operator()(S&& s, V&& v) const
noexcept(noexcept(set_value(s, (V&&) v))) {
try {
set_value(s, (V&&) v);
} catch (...) {
set_error(s, std::current_exception());
}
}
};
struct set_stopping_fn {
template <class S>
requires requires(S& s) {
set_stopping(s);
}
void operator()(S&& s) const noexcept(noexcept(set_stopping(s))) {
set_stopping(s);
}
};
struct set_starting_fn {
template <class S, class Up>
requires requires(S& s, Up& up) {
set_starting(s, up);
set_error(s, std::current_exception());
}
void operator()(S&& s, Up& up) const
noexcept(noexcept(set_starting(s, up))) {
try {
set_starting(s, up);
} catch (...) {
set_error(s, std::current_exception());
}
}
};
struct do_submit_fn {
template <class SD, class Out>
requires requires(SD& s, Out out) {
submit(s, std::move(out));
}
void operator()(SD&& s, Out out) const
noexcept(noexcept(submit(s, std::move(out)))) {
submit(s, std::move(out));
}
template <class SD, class TP, class Out>
requires requires(SD& s, TP tp, Out out) {
submit(s, std::move(tp), std::move(out));
}
void operator()(SD&& s, TP tp, Out out) const
noexcept(noexcept(submit(s, std::move(tp), std::move(out)))) {
submit(s, std::move(tp), std::move(out));
}
};
struct get_now_fn {
template <class SD>
requires requires(SD& sd) {
now(sd);
}
auto operator()(SD&& sd) const noexcept(noexcept(now(sd))) {
return now(sd);
}
};
} // namespace __adl
inline constexpr __adl::set_done_fn set_done{};
inline constexpr __adl::set_error_fn set_error{};
inline constexpr __adl::set_value_fn set_value{};
inline constexpr __adl::set_stopping_fn set_stopping{};
inline constexpr __adl::set_starting_fn set_starting{};
inline constexpr __adl::do_submit_fn submit{};
inline constexpr __adl::get_now_fn now{};
inline constexpr __adl::get_now_fn top{};
template <class T>
struct receiver_traits<std::promise<T>> {
using receiver_category = single_tag;
};
template <>
struct receiver_traits<std::promise<void>> {
using receiver_category = none_tag;
};
} // namespace pushmi
This diff is collapsed.
#pragma once
// Copyright (c) 2018-present, Facebook, Inc.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.
#include "flow_single.h"
namespace pushmi {
template <class V, class PE, class E>
class flow_single_deferred<V, PE, E> {
union data {
void* pobj_ = nullptr;
char buffer_[sizeof(V)]; // can hold a V in-situ
} data_{};
template <class Wrapped>
static constexpr bool insitu() {
return sizeof(Wrapped) <= sizeof(data::buffer_) &&
std::is_nothrow_move_constructible_v<Wrapped>;
}
enum struct op { destroy, move };
struct vtable {
void (*op_)(op, data&, data*) = +[](op, data&, data*) {};
void (*submit_)(data&, flow_single<V, PE, E>) =
+[](data&, flow_single<V, PE, E>) {};
static constexpr vtable const noop_ = {};
} const* vptr_ = &vtable::noop_;
template <class Wrapped, bool = insitu<Wrapped>()>
static constexpr vtable const vtable_v = {
+[](op o, data& src, data* dst) {
switch (o) {
case op::move:
dst->pobj_ = std::exchange(src.pobj_, nullptr);
case op::destroy:
delete static_cast<Wrapped const*>(src.pobj_);
}
},
+[](data& src, flow_single<V, PE, E> out) {
::pushmi::submit(*static_cast<Wrapped*>(src.pobj_), std::move(out));
}
};
template <class T, class U = std::decay_t<T>>
using wrapped_t =
std::enable_if_t<!std::is_same_v<U, flow_single_deferred>, U>;
public:
using sender_category = flow_tag;
flow_single_deferred() = default;
flow_single_deferred(flow_single_deferred&& that) noexcept
: flow_single_deferred() {
that.vptr_->op_(op::move, that.data_, &data_);
std::swap(that.vptr_, vptr_);
}
template <class Wrapped>
requires FlowSingleSender<
wrapped_t<Wrapped>,
flow_single<V, PE, E>,
any_none<PE>,
V,
E>
explicit flow_single_deferred(Wrapped obj)
: flow_single_deferred() {
if constexpr (insitu<Wrapped>())
new (data_.buffer_) Wrapped(std::move(obj));
else
data_.pobj_ = new Wrapped(std::move(obj));
vptr_ = &vtable_v<Wrapped>;
}
~flow_single_deferred() {
vptr_->op_(op::destroy, data_, nullptr);
}
flow_single_deferred& operator=(flow_single_deferred&& that) noexcept {
this->~flow_single_deferred();
new ((void*)this) flow_single_deferred(std::move(that));
return *this;
}
void submit(flow_single<V, PE, E> out) {
vptr_->submit_(data_, std::move(out));
}
};
// Class static definitions:
template <class V, class PE, class E>
constexpr typename flow_single_deferred<V, PE, E>::vtable const
flow_single_deferred<V, PE, E>::vtable::noop_;
template <class V, class PE, class E>
template <class Wrapped, bool Big>
constexpr typename flow_single_deferred<V, PE, E>::vtable const
flow_single_deferred<V, PE, E>::vtable_v;
template <class V, class PE, class E>
template <class Wrapped>
constexpr typename flow_single_deferred<V, PE, E>::vtable const
flow_single_deferred<V, PE, E>::vtable_v<Wrapped, true> = {
+[](op o, data& src, data* dst) {
switch (o) {
case op::move:
new (dst->buffer_) Wrapped(
std::move(*static_cast<Wrapped*>((void*)src.buffer_)));
case op::destroy:
static_cast<Wrapped const*>((void*)src.buffer_)->~Wrapped();
}
},
+[](data& src, flow_single<V, PE, E> out) {
::pushmi::submit(
*static_cast<Wrapped*>((void*)src.buffer_), std::move(out));
}
};
template <class SF>
class flow_single_deferred<SF> {
SF sf_;
public:
using sender_category = flow_tag;
constexpr flow_single_deferred() = default;
constexpr explicit flow_single_deferred(SF sf)
: sf_(std::move(sf)) {}
template <Receiver<flow_tag> Out>
requires Invocable<SF&, Out>
void submit(Out out) {
sf_(std::move(out));
}
};
flow_single_deferred() -> flow_single_deferred<ignoreSF>;
template <class SF>
flow_single_deferred(SF) -> flow_single_deferred<SF>;
template <class V, class PE = std::exception_ptr, class E = PE>
using any_flow_single_deferred = flow_single_deferred<V, PE, E>;
// // TODO constrain me
// template <class V, class E = std::exception_ptr, Sender Wrapped>
// auto erase_cast(Wrapped w) {
// return flow_single_deferred<V, E>{std::move(w)};
// }
} // namespace pushmi
#pragma once
// Copyright (c) 2018-present, Facebook, Inc.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.
#include "traits.h"
namespace pushmi {
// tag types
struct silent_tag;
struct none_tag;
struct single_tag;
struct flow_tag;
template <class>
struct sender_traits;
template <class>
struct receiver_traits;
template <SemiMovable... TN>
class none;
template <SemiMovable... TN>
class deferred;
template <SemiMovable... TN>
class single;
template <SemiMovable... TN>
class single_deferred;
template <SemiMovable... TN>
class time_single_deferred;
template <SemiMovable... TN>
class flow_single;
template <SemiMovable... TN>
class flow_single_deferred;
template<class E, class TP, int i = 0>
struct any_time_executor_ref;
namespace operators {}
namespace extension_operators {}
namespace aliases {
namespace v = ::pushmi;
namespace op = ::pushmi::operators;
namespace ep = ::pushmi::extension_operators;
}
} // namespace pushmi
#pragma once
// Copyright (c) 2018-present, Facebook, Inc.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.
#include "executor.h"
#include "trampoline.h"
namespace pushmi {
// very poor perf example executor.
//
struct __new_thread_submit {
template<Regular TP, Receiver Out>
void operator()(TP at, Out out) const;
};
template<Regular TP, Receiver Out>
void __new_thread_submit::operator()(TP at, Out out) const {
std::thread t{[at = std::move(at), out = std::move(out)]() mutable {
auto tr = trampoline();
::pushmi::submit(tr, std::move(at), std::move(out));
}};
// pass ownership of thread to out
t.detach();
}
inline auto new_thread() {
return time_single_deferred{__new_thread_submit{}};
}
}
#pragma once
// Copyright (c) 2018-present, Facebook, Inc.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.
#include "boosters.h"
namespace pushmi {
template <class E>
class none<E> {
bool done_ = false;
union data {
void* pobj_ = nullptr;
char buffer_[sizeof(std::promise<int>)]; // can hold a std::promise in-situ
} data_{};
template <class Wrapped>
static constexpr bool insitu() {
return sizeof(Wrapped) <= sizeof(data::buffer_) &&
std::is_nothrow_move_constructible_v<Wrapped>;
}
enum struct op { destroy, move };
struct vtable {
void (*op_)(op, data&, data*) = +[](op, data&, data*) {};
void (*done_)(data&) = +[](data&) {};
void (*error_)(data&, E) noexcept = +[](data&, E) noexcept {
std::terminate();
};
static constexpr vtable const noop_ = {};
} const* vptr_ = &vtable::noop_;
template <class Wrapped, bool = insitu<Wrapped>()>
static constexpr vtable const vtable_v = {
+[](op o, data& src, data* dst) {
switch (o) {
case op::move:
dst->pobj_ = std::exchange(src.pobj_, nullptr);
case op::destroy:
delete static_cast<Wrapped const*>(src.pobj_);
}
},
+[](data& src) { ::pushmi::set_done(*static_cast<Wrapped*>(src.pobj_)); },
+[](data& src, E e) noexcept {
::pushmi::set_error(*static_cast<Wrapped*>(src.pobj_), std::move(e));
}
}; // namespace pushmi
template <class T, class U = std::decay_t<T>>
using wrapped_t =
std::enable_if_t<!std::is_same_v<U, none>, U>;
public:
using receiver_category = none_tag;
none() = default;
none(none&& that) noexcept : none() {
that.vptr_->op_(op::move, that.data_, &data_);
std::swap(that.vptr_, vptr_);
}
template <class Wrapped>
requires NoneReceiver<wrapped_t<Wrapped>, E>
explicit none(Wrapped obj) : none() {
if constexpr (insitu<Wrapped>())
new (data_.buffer_) Wrapped(std::move(obj));
else
data_.pobj_ = new Wrapped(std::move(obj));
vptr_ = &vtable_v<Wrapped>;
}
~none() {
vptr_->op_(op::destroy, data_, nullptr);
}
none& operator=(none&& that) noexcept {
this->~none();
new ((void*)this) none(std::move(that));
return *this;
}
void error(E e) noexcept {
if (done_) {return;}
done_ = true;
vptr_->error_(data_, std::move(e));
}
void done() {
if (done_) {return;}
done_ = true;
vptr_->done_(data_);
}
};
// Class static definitions:
template <class E>
constexpr typename none<E>::vtable const none<E>::vtable::noop_;
template <class E>
template <class Wrapped, bool Big>
constexpr typename none<E>::vtable const none<E>::vtable_v;
template <class E>
template <class Wrapped>
constexpr typename none<E>::vtable const none<E>::vtable_v<Wrapped, true> = {
+[](op o, data& src, data* dst) {
switch (o) {
case op::move:
new (dst->buffer_)
Wrapped(std::move(*static_cast<Wrapped*>((void*)src.buffer_)));
case op::destroy:
static_cast<Wrapped const*>((void*)src.buffer_)->~Wrapped();
}
},
+[](data& src) {
::pushmi::set_done(*static_cast<Wrapped*>((void*)src.buffer_));
},
+[](data& src, E e) noexcept {::pushmi::set_error(
*static_cast<Wrapped*>((void*)src.buffer_),
std::move(e));
}
}
;
template <class EF, class DF>
requires Invocable<DF&> && !detail::is_v<EF, on_value> && !detail::is_v<EF, single>
class none<EF, DF> {
bool done_ = false;
EF ef_;
DF df_;
public:
using receiver_category = none_tag;
// static_assert(
// !detail::is_v<EF, on_value>,
// "the first parameter is the error implementation, but on_value{} was passed");
//
none() = default;
constexpr explicit none(EF ef)
: none(std::move(ef), DF{}) {}
constexpr explicit none(DF df)
: none(EF{}, std::move(df)) {}
constexpr none(EF ef, DF df)
: done_(false), ef_(std::move(ef)), df_(std::move(df)) {}
template <class E>
requires Invocable<EF&, E>
void error(E e) noexcept {
static_assert(NothrowInvocable<EF&, E>, "error function must be noexcept");
if (done_) {return;}
done_ = true;
ef_(e);
}
void done() {
if (done_) {return;}
done_ = true;
df_();
}
};
template <Receiver Data, class DEF, class DDF>
requires Invocable<DDF&, Data&> && !detail::is_v<DEF, on_value> && !detail::is_v<Data, single>
class none<Data, DEF, DDF> {
bool done_ = false;
Data data_;
DEF ef_;
DDF df_;
public:
using receiver_category = none_tag;
// static_assert(
// !detail::is_v<DEF, on_value>,
// "the first parameter is the error implementation, but on_value{} was passed");
//
constexpr explicit none(Data d) : none(std::move(d), DEF{}, DDF{}) {}
constexpr none(Data d, DDF df)
: done_(false), data_(std::move(d)), ef_(), df_(std::move(df)) {}
constexpr none(Data d, DEF ef, DDF df = DDF{})
: done_(false), data_(std::move(d)), ef_(std::move(ef)), df_(std::move(df)) {}
template <class E>
requires Invocable<DEF&, Data&, E> void error(E e) noexcept {
static_assert(
NothrowInvocable<DEF&, Data&, E>, "error function must be noexcept");
if (done_) {return;}
done_ = true;
ef_(data_, e);
}
void done() {
if (done_) {return;}
done_ = true;
df_(data_);
}
};
template <> class none<> : public none<abortEF, ignoreDF> {};
using archetype_none = none<>;
none()->archetype_none;
template <class EF>
requires !Receiver<EF> &&
!detail::is_v<EF, single> &&
!detail::is_v<EF, on_value> &&
!detail::is_v<EF, on_done>
none(EF)->none<EF, ignoreDF>;
template <class DF>
requires !Receiver<DF> &&
!detail::is_v<DF, on_value> &&
!detail::is_v<DF, single>
none(on_done<DF>)->none<abortEF, on_done<DF>>;
template <class E, class Wrapped>
requires NoneReceiver<Wrapped, E> &&
!detail::is_v<E, on_value> &&
!detail::is_v<Wrapped, single>
none(Wrapped)->none<E>;
template <class EF, class DF>
requires Invocable<DF&> none(EF, DF)->none<EF, DF>;
template <Receiver Data>
requires !detail::is_v<Data, on_value> &&
!detail::is_v<Data, single>
none(Data)->none<Data, passDEF, passDDF>;
template <Receiver Data, class DEF>
requires !detail::is_v<DEF, on_done> &&
!detail::is_v<Data, on_value> &&
!detail::is_v<Data, single>
none(Data, DEF)->none<Data, DEF, passDDF>;
template <Receiver Data, class DDF>
requires !detail::is_v<Data, on_value> &&
!detail::is_v<Data, single>
none(Data, on_done<DDF>)->none<Data, passDEF, on_done<DDF>>;
template <Receiver Data, class DEF, class DDF>
requires Invocable<DDF&, Data&> &&
!detail::is_v<Data, on_value> &&
!detail::is_v<Data, single>
none(Data, DEF, DDF)->none<Data, DEF, DDF>;
template <class E = std::exception_ptr>
using any_none = none<E>;
// // this is ambiguous because NoneReceiver and SingleReceiver only constrain the done() method.
// // template <class E = std::exception_ptr, NoneReceiver<E> Wrapped>
// // auto erase_cast(Wrapped w) {
// // return none<erase_cast_t, E>{std::move(w)};
// // }
// template <class E = std::exception_ptr, class... TN>
// auto erase_cast(none<TN...> w) {
// return none<E>{std::move(w)};
// }
// template <class E = std::exception_ptr>
// auto erase_cast(std::promise<void> w) {
// return none<E>{std::move(w)};
// }
template <SenderTo<std::promise<void>, none_tag> S>
std::future<void> future_from(S sender) {
std::promise<void> p;
auto result = p.get_future();
submit(sender, std::move(p));
return result;
}
} // namespace values
// clang-format off
// clang format does not support the '<>' in the lambda syntax yet.. []<>()->{}
#pragma once
// Copyright (c) 2018-present, Facebook, Inc.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.
#include "../deferred.h"
#include "../single_deferred.h"
namespace pushmi {
namespace operators {
template <class V>
auto empty() {
return single_deferred{
[]<class Out>(Out out) mutable PUSHMI_VOID_LAMBDA_REQUIRES(
SingleReceiver<Out, V>){
::pushmi::set_done(out);
}};
}
inline auto empty() {
return deferred{[]<class Out>(Out out) mutable PUSHMI_VOID_LAMBDA_REQUIRES(
NoneReceiver<Out>){
::pushmi::set_done(out);
}};
}
} // namespace operators
} // namespace pushmi
// clang-format off
// clang format does not support the '<>' in the lambda syntax yet.. []<>()->{}
#pragma once
// Copyright (c) 2018-present, Facebook, Inc.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.
#include "../piping.h"
#include "../boosters.h"
#include "../single.h"
namespace pushmi {
namespace detail{
// template <Sender In>
// struct out_from_fn {
//
// };
template<Sender In, class... AN>
auto out_from(std::tuple<AN...>&& args) {
using SingleReceiver =
decltype(pushmi::sfinae_from_tuple<single>(std::move(args)));
using NoneReceiver =
decltype(pushmi::sfinae_from_tuple<none>(std::move(args)));
if constexpr ((bool)TimeSenderTo<In, SingleReceiver, single_tag> ||
SenderTo<In, SingleReceiver, single_tag>) {
return pushmi::from_tuple<single>(std::move(args));
} else if constexpr ((bool)TimeSenderTo<In, NoneReceiver, none_tag> ||
SenderTo<In, NoneReceiver, none_tag>) {
return pushmi::from_tuple<none>(std::move(args));
}
}
template<Sender In, class... AN, class... DVFN, class... DEFN, class... DDFN>
auto out_from(
std::tuple<AN...>&& args,
on_value<DVFN...> vf,
on_error<DEFN...> ef,
on_done<DDFN...> df) {
auto out = out_from<In>(std::move(args));
if constexpr (::pushmi::detail::is_v<decltype(out), single>) {
return single{std::move(out), std::move(vf), std::move(ef), std::move(df)};
} else if constexpr (::pushmi::detail::is_v<decltype(out), none>) {
return none{std::move(out), std::move(ef), std::move(df)};
}
}
template<Sender In, Receiver Out, class... DVFN, class... DEFN, class... DDFN>
auto out_from(
Out&& out,
on_value<DVFN...> vf,
on_error<DEFN...> ef,
on_done<DDFN...> df) {
using SingleReceiver =
decltype(pushmi::sfinae_from_tuple<single>(std::tuple{std::move(out)}));
using NoneReceiver =
decltype(pushmi::sfinae_from_tuple<none>(std::tuple{std::move(out)}));
if constexpr ((bool)TimeSenderTo<In, SingleReceiver, single_tag> ||
SenderTo<In, SingleReceiver, single_tag>) {
return single{std::move(out), std::move(vf), std::move(ef), std::move(df)};
} else if constexpr ((bool)TimeSenderTo<In, NoneReceiver, none_tag> ||
SenderTo<In, NoneReceiver, none_tag>) {
return none{std::move(out), std::move(ef), std::move(df)};
}
}
template<Sender In, Receiver Out, class... DEFN, class... DDFN>
auto out_from(Out&& out, on_error<DEFN...> ef, on_done<DDFN...> df) {
using SingleReceiver =
decltype(pushmi::sfinae_from_tuple<single>(std::tuple{std::move(out)}));
using NoneReceiver =
decltype(pushmi::sfinae_from_tuple<none>(std::tuple{std::move(out)}));
if constexpr ((bool)TimeSenderTo<In, SingleReceiver, single_tag> ||
SenderTo<In, SingleReceiver, single_tag>) {
return single{std::move(out), std::move(ef), std::move(df)};
} else if constexpr ((bool)TimeSenderTo<In, NoneReceiver, none_tag> ||
SenderTo<In, NoneReceiver, none_tag>) {
return none{std::move(out), std::move(ef), std::move(df)};
}
}
template<Sender In, Receiver Out, class... DVFN>
auto out_from(Out&& out, on_value<DVFN...> vf) {
using SingleReceiver =
decltype(pushmi::sfinae_from_tuple<single>(std::tuple{std::move(out)}));
using NoneReceiver =
decltype(pushmi::sfinae_from_tuple<none>(std::tuple{std::move(out)}));
if constexpr ((bool)TimeSenderTo<In, SingleReceiver, single_tag> ||
SenderTo<In, SingleReceiver, single_tag>) {
return single{std::move(out), std::move(vf)};
} else if constexpr ((bool)TimeSenderTo<In, NoneReceiver, none_tag> ||
SenderTo<In, NoneReceiver, none_tag>) {
return none{std::move(out)};
}
}
template<Sender In, Receiver Out>
auto out_from(Out&& out) {
using SingleReceiver =
decltype(pushmi::sfinae_from_tuple<single>(std::tuple{std::move(out)}));
using NoneReceiver =
decltype(pushmi::sfinae_from_tuple<none>(std::tuple{std::move(out)}));
if constexpr ((bool)TimeSenderTo<In, SingleReceiver, single_tag> ||
SenderTo<In, SingleReceiver, single_tag>) {
return single{std::move(out)};
} else if constexpr ((bool)TimeSenderTo<In, NoneReceiver, none_tag> ||
SenderTo<In, NoneReceiver, none_tag>) {
return none{std::move(out)};
}
}
template<Sender In, class FN>
auto submit_transform_out(FN fn){
if constexpr ((bool)TimeSender<In>) {
return on_submit{
[fn = std::move(fn)]<class TP, class Out>(In& in, TP tp, Out out) {
::pushmi::submit(in, tp, fn(std::move(out)));
}
};
} else {
return on_submit{
[fn = std::move(fn)]<class Out>(In& in, Out out) {
::pushmi::submit(in, fn(std::move(out)));
}
};
}
}
template<Sender In, class SDSF, class TSDSF>
auto submit_transform_out(SDSF sdsf, TSDSF tsdsf){
if constexpr ((bool)TimeSender<In>) {
return on_submit{
[tsdsf = std::move(tsdsf)]<class TP, class Out>(In& in, TP tp, Out out) {
tsdsf(in, tp, std::move(out));
}
};
} else {
return on_submit{
[sdsf = std::move(sdsf)]<class Out>(In& in, Out out) {
sdsf(in, std::move(out));
}
};
}
}
template<Sender In, class Out, class... FN>
auto deferred_from(FN&&... fn) {
if constexpr ((bool)TimeSenderTo<In, Out, single_tag>) {
return time_single_deferred{(FN&&) fn...};
} else if constexpr ((bool)SenderTo<In, Out, single_tag>) {
return single_deferred{(FN&&) fn...};
} else if constexpr ((bool)SenderTo<In, Out>) {
return deferred{(FN&&) fn...};
}
}
template<Sender In, class Out, class... FN>
auto deferred_from(In in, FN&&... fn) {
if constexpr ((bool)TimeSenderTo<In, Out, single_tag>) {
return time_single_deferred{std::move(in), (FN&&) fn...};
} else if constexpr ((bool)SenderTo<In, Out, single_tag>) {
return single_deferred{std::move(in), (FN&&) fn...};
} else if constexpr ((bool)SenderTo<In, Out>) {
return deferred{std::move(in), (FN&&) fn...};
}
}
template<
Sender In,
class Out,
bool SenderRequires,
bool SingleSenderRequires,
bool TimeSingleSenderRequires>
constexpr bool deferred_requires_from() {
if constexpr ((bool)TimeSenderTo<In, Out, single_tag>) {
return TimeSingleSenderRequires;
} else if constexpr ((bool)SenderTo<In, Out, single_tag>) {
return SingleSenderRequires;
} else if constexpr ((bool)SenderTo<In, Out>) {
return SenderRequires;
}
}
} // namespace detail
namespace extension_operators {
namespace detail{
struct set_value_fn {
template<class V>
auto operator()(V&& v) const {
return [v = (V&&) v]<class Out>(Out out) mutable PUSHMI_VOID_LAMBDA_REQUIRES(Receiver<Out>) {
::pushmi::set_value(out, (V&&) v);
};
}
};
struct set_error_fn {
template<class E>
auto operator()(E e) const {
return [e = std::move(e)]<class Out>(Out out) mutable noexcept PUSHMI_VOID_LAMBDA_REQUIRES(Receiver<Out>) {
::pushmi::set_error(out, std::move(e));
};
}
};
struct set_done_fn {
auto operator()() const {
return []<class Out>(Out out) PUSHMI_VOID_LAMBDA_REQUIRES(Receiver<Out>) {
::pushmi::set_done(out);
};
}
};
struct set_stopping_fn {
auto operator()() const {
return []<class Out>(Out out) PUSHMI_VOID_LAMBDA_REQUIRES(Receiver<Out>) {
::pushmi::set_stopping(out);
};
}
};
struct set_starting_fn {
template<class Up>
auto operator()(Up up) const {
return [up = std::move(up)]<class Out>(Out out) PUSHMI_VOID_LAMBDA_REQUIRES(Receiver<Out>) {
::pushmi::set_starting(out, std::move(up));
};
}
};
struct do_submit_fn {
template <class Out>
auto operator()(Out out) const {
static_assert(Receiver<Out>, "'Out' must be a model of Receiver");
return [out = std::move(out)]<class In>(In in) mutable {
::pushmi::submit(in, std::move(out));
};
}
template <class TP, class Out>
auto operator()(TP tp, Out out) const {
static_assert(Receiver<Out>, "'Out' must be a model of Receiver");
return [tp = std::move(tp), out = std::move(out)]<class In>(In in) mutable {
::pushmi::submit(in, std::move(tp), std::move(out));
};
}
};
struct get_now_fn {
auto operator()() const {
return []<class In>(In in) PUSHMI_T_LAMBDA_REQUIRES(decltype(::pushmi::now(in)), TimeSender<In>) {
return ::pushmi::now(in);
};
}
};
} // namespace detail
inline constexpr detail::set_done_fn set_done{};
inline constexpr detail::set_error_fn set_error{};
inline constexpr detail::set_value_fn set_value{};
inline constexpr detail::set_stopping_fn set_stopping{};
inline constexpr detail::set_starting_fn set_starting{};
inline constexpr detail::do_submit_fn submit{};
inline constexpr detail::get_now_fn now{};
inline constexpr detail::get_now_fn top{};
} // namespace extension_operators
} // namespace pushmi
// clang-format off
// clang format does not support the '<>' in the lambda syntax yet.. []<>()->{}
#pragma once
// Copyright (c) 2018-present, Facebook, Inc.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.
#include "../single.h"
namespace pushmi {
namespace operators {
template <class V>
auto just(V v) {
return single_deferred{[v = std::move(v)]<class Out>(
Out out) mutable PUSHMI_VOID_LAMBDA_REQUIRES(SingleReceiver<Out, V>){
::pushmi::set_value(out, std::move(v));
}};
}
} // namespace operators
} // namespace pushmi
#pragma once
// Copyright (c) 2018-present, Facebook, Inc.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.
#include "../piping.h"
#include "../executor.h"
#include "extension_operators.h"
namespace pushmi {
namespace operators {
namespace detail {
struct on_fn {
template <class ExecutorFactory>
auto operator()(ExecutorFactory ef) const;
};
template <class ExecutorFactory>
auto on_fn::operator()(ExecutorFactory ef) const {
return [ef = std::move(ef)]<class In>(In in) {
return ::pushmi::detail::deferred_from<In, archetype_single>(
std::move(in),
::pushmi::detail::submit_transform_out<In>(
[ef]<class Out>(In& in, Out out) {
auto exec = ef();
::pushmi::submit(exec, ::pushmi::now(exec), ::pushmi::single{[in = in, out = std::move(out)](auto) mutable {
::pushmi::submit(in, std::move(out));
}});
},
[ef]<class TP, class Out>(In& in, TP at, Out out) {
auto exec = ef();
::pushmi::submit(exec, at, ::pushmi::on_value{[in = in, at, out = std::move(out)](auto) mutable {
::pushmi::submit(in, at, std::move(out));
}});
}
)
);
};
}
} // namespace detail
inline constexpr detail::on_fn on{};
} // namespace operators
#if 0
namespace detail {
template <class ExecutorFactory>
class fsdon {
using executor_factory_type = std::decay_t<ExecutorFactory>;
executor_factory_type factory_;
template <class In>
class start_on {
using in_type = std::decay_t<In>;
executor_factory_type factory_;
in_type in_;
template <class Out, class Executor>
class out_on {
using out_type = std::decay_t<Out>;
using exec_type = std::decay_t<Executor>;
template <class Producer>
struct producer_proxy {
RefWrapper<Producer> up_;
std::shared_ptr<std::atomic_bool> stopped_;
exec_type exec_;
producer_proxy(
RefWrapper<Producer> p,
std::shared_ptr<std::atomic_bool> stopped,
exec_type exec)
: up_(std::move(p)),
stopped_(std::move(stopped)),
exec_(std::move(exec)) {}
template <class V>
void value(V v) {
auto up = wrap_ref(up_.get());
exec_ |
execute([up = std::move(up),
v = std::move(v),
stopped = std::move(stopped_)](auto) mutable {
if (*stopped) {
return;
}
up.get().value(std::move(v));
});
}
template <class E>
void error(E e) {
auto up = wrap_ref(up_.get());
exec_ |
execute([up = std::move(up),
e = std::move(e),
stopped = std::move(stopped_)](auto) mutable {
if (*stopped) {
return;
}
up.get().error(std::move(e));
});
}
};
bool done_;
std::shared_ptr<std::atomic_bool> stopped_;
out_type out_;
exec_type exec_;
AnyNone<> upProxy_;
public:
out_on(out_type out, exec_type exec)
: done_(false),
stopped_(std::make_shared<std::atomic_bool>(false)),
out_(std::move(out)),
exec_(std::move(exec)),
upProxy_() {}
template <class T>
void value(T t) {
if (done_) {
return;
}
done_ = true;
out_.value(std::move(t));
}
template <class E>
void error(E e) {
if (done_) {
return;
}
done_ = true;
out_.error(std::move(e));
}
void stopping() {
if (done_) {
return;
}
done_ = true;
*stopped_ = true;
out_.stopping();
}
template <class Producer>
void starting(RefWrapper<Producer> up) {
upProxy_ =
producer_proxy<Producer>{std::move(up), stopped_, std::move(exec_)};
out_.starting(wrap_ref(upProxy_));
}
};
public:
start_on(executor_factory_type&& ef, in_type&& in)
: factory_(std::move(ef)), in_(std::move(in)) {}
template <class Out>
auto then(Out out) {
auto exec = factory_();
auto myout = out_on<Out, decltype(exec)>{std::move(out), exec};
exec | execute([in = in_, myout = std::move(myout)](auto) mutable {
in.then(std::move(myout));
});
}
};
public:
explicit fsdon(executor_factory_type&& ef) : factory_(std::move(ef)) {}
template <class In>
auto operator()(In in) {
return start_on<In>{std::move(factory_), std::move(in)};
}
};
} // namespace detail
namespace fsd {
template <class ExecutorFactory>
auto on(ExecutorFactory factory) {
return detail::fsdon<ExecutorFactory>{std::move(factory)};
}
} // namespace fsd
#endif
} // namespace pushmi
// clang-format off
// clang format does not support the '<>' in the lambda syntax yet.. []<>()->{}
#pragma once
// Copyright (c) 2018-present, Facebook, Inc.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.
#include <functional>
#include "../single.h"
#include "../boosters.h"
#include "extension_operators.h"
#include "../trampoline.h"
namespace pushmi {
namespace operators {
namespace detail {
inline constexpr struct make_receiver_fn {
template <class... AN>
deduced_type_t<none, AN...> operator()(none_tag, AN&& ...an) const {
return none{(AN&&) an...};
}
template <class... AN>
deduced_type_t<single, AN...> operator()(single_tag, AN&& ...an) const {
return single{(AN&&) an...};
}
} make_receiver {};
template <Sender In, class ...AN>
using receiver_type_t =
std::invoke_result_t<make_receiver_fn, sender_category_t<In>, AN...>;
template <class In, class ... AN>
concept bool AutoSenderTo = SenderTo<In, receiver_type_t<In, AN...>>;
template <class In, class ... AN>
concept bool AutoTimeSenderTo = TimeSenderTo<In, receiver_type_t<In, AN...>>;
struct submit_fn {
private:
// TODO - only move, move-only types..
// if out can be copied, then submit can be called multiple
// times..
template <class... AN>
struct fn {
std::tuple<AN...> args_;
template <AutoSenderTo<AN...> In>
In operator()(In in) {
auto out{::pushmi::detail::out_from<In>(std::move(args_))};
::pushmi::submit(in, std::move(out));
return in;
}
template <AutoTimeSenderTo<AN...> In>
In operator()(In in) {
auto out{::pushmi::detail::out_from<In>(std::move(args_))};
::pushmi::submit(in, ::pushmi::now(in), std::move(out));
return in;
}
};
public:
template <class... AN>
auto operator()(AN&&... an) const {
return submit_fn::fn<AN...>{{(AN&&) an...}};
}
};
struct submit_at_fn {
private:
template <Regular TP, class... AN>
struct fn {
TP at_;
std::tuple<AN...> args_;
template <AutoTimeSenderTo<AN...> In>
In operator()(In in) {
auto out{::pushmi::detail::out_from<In>(std::move(args_))};
::pushmi::submit(in, std::move(at_), std::move(out));
return in;
}
};
public:
template <Regular TP, class... AN>
auto operator()(TP at, AN... an) const {
return submit_at_fn::fn<TP, AN...>{std::move(at), {(AN&&) an...}};
}
};
struct submit_after_fn {
private:
template <Regular D, class... AN>
struct fn {
D after_;
std::tuple<AN...> args_;
template <AutoTimeSenderTo<AN...> In>
In operator()(In in) {
// TODO - only move, move-only types..
// if out can be copied, then submit can be called multiple
// times..
auto out{::pushmi::detail::out_from<In>(std::move(args_))};
auto at = ::pushmi::now(in) + std::move(after_);
::pushmi::submit(in, std::move(at), std::move(out));
return in;
}
};
public:
template <Regular D, class... AN>
auto operator()(D after, AN... an) const {
return submit_after_fn::fn<D, AN...>{std::move(after), {(AN&&) an...}};
}
};
struct blocking_submit_fn {
private:
// TODO - only move, move-only types..
// if out can be copied, then submit can be called multiple
// times..
template <class... AN>
struct fn {
std::tuple<AN...> args_;
template <bool IsTimeSender, class In>
In impl_(In in) {
bool done = false;
std::condition_variable signaled;
auto out{::pushmi::detail::out_from<In>(
std::move(args_),
on_value{[&]<class Out, class V>(Out out, V&& v){
if constexpr ((bool)TimeSender<std::remove_cvref_t<V>>) {
// to keep the blocking semantics, make sure that the
// nested submits block here to prevent a spurious
// completion signal
auto nest = ::pushmi::nested_trampoline();
::pushmi::submit(nest, ::pushmi::now(nest), std::move(out));
} else {
::pushmi::set_value(out, (V&&) v);
}
done = true;
signaled.notify_all();
}},
on_error{[&](auto out, auto e) noexcept {
::pushmi::set_error(out, std::move(e));
done = true;
signaled.notify_all();
}},
on_done{[&](auto out){
::pushmi::set_done(out);
done = true;
signaled.notify_all();
}}
)};
if constexpr ((bool)IsTimeSender) {
::pushmi::submit(in, ::pushmi::now(in), std::move(out));
} else {
::pushmi::submit(in, std::move(out));
}
std::mutex lock;
std::unique_lock<std::mutex> guard{lock};
signaled.wait(guard, [&](){
return done;
});
return in;
}
template <AutoSenderTo<AN...> In>
In operator()(In in) {
return this->impl_<false>(std::move(in));
}
template <AutoTimeSenderTo<AN...> In>
In operator()(In in) {
return this->impl_<true>(std::move(in));
}
};
public:
template <class... AN>
auto operator()(AN... an) const {
return blocking_submit_fn::fn<AN...>{{(AN&&) an...}};
}
};
template <class T>
struct get_fn {
// TODO constrain this better
template <Sender In>
T operator()(In in) const {
std::optional<T> result_;
std::exception_ptr ep_;
auto out = single{
on_value{[&](T t){ result_ = std::move(t); }},
on_error{
[&](auto e) noexcept { ep_ = std::make_exception_ptr(e); },
[&](std::exception_ptr ep) noexcept { ep_ = ep; }}
};
using Out = decltype(out);
static_assert(SenderTo<In, Out, single_tag> ||
TimeSenderTo<In, Out, single_tag>,
"'In' does not deliver value compatible with 'T' to 'Out'");
blocking_submit_fn{}(std::move(out))(in);
if (!!ep_) { std::rethrow_exception(ep_); }
return std::move(*result_);
}
};
} // namespace detail
inline constexpr detail::submit_fn submit{};
inline constexpr detail::submit_at_fn submit_at{};
inline constexpr detail::submit_after_fn submit_after{};
inline constexpr detail::blocking_submit_fn blocking_submit{};
template <class T>
inline constexpr detail::get_fn<T> get{};
} // namespace operators
} // namespace pushmi
// clang-format off
// clang format does not support the '<>' in the lambda syntax yet.. []<>()->{}
#pragma once
// Copyright (c) 2018-present, Facebook, Inc.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.
#include "extension_operators.h"
#include "../deferred.h"
#include "../single_deferred.h"
#include "../time_single_deferred.h"
namespace pushmi {
namespace operators {
namespace detail {
template <class SideEffects, class Out>
struct tap_ {
SideEffects sideEffects;
Out out;
using receiver_category = single_tag;
template <class V>
requires SingleReceiver<SideEffects, const V&>&& SingleReceiver<Out, V>
void value(V&& v) {
::pushmi::set_value(sideEffects, const_cast<const V&>(v));
::pushmi::set_value(out, (V&&) v);
}
template <class E>
requires NoneReceiver<SideEffects, E>&& NoneReceiver<Out, E>
void error(E e) noexcept {
::pushmi::set_error(sideEffects, const_cast<const E&>(e));
::pushmi::set_error(out, std::move(e));
}
void done() {
::pushmi::set_done(sideEffects);
::pushmi::set_done(out);
}
};
template <Receiver SideEffects, Receiver Out>
tap_(SideEffects, Out)->tap_<SideEffects, Out>;
struct tap_fn {
template <class... AN>
auto operator()(AN... an) const;
};
template <class... AN>
auto tap_fn::operator()(AN... an) const {
auto args = std::tuple{std::move(an)...};
return [args = std::move(args)]<class In>(In in) mutable {
auto sideEffects{::pushmi::detail::out_from<In>(std::move(args))};
using SideEffects = decltype(sideEffects);
static_assert(
::pushmi::detail::deferred_requires_from<In, SideEffects,
Receiver<SideEffects> &&
SenderTo<In, SideEffects>,
Receiver<SideEffects> &&
SenderTo<In, SideEffects, single_tag>,
Receiver<SideEffects> &&
TimeSenderTo<In, SideEffects, single_tag> >(),
"'In' is not deliverable to 'SideEffects'");
return ::pushmi::detail::deferred_from<In, SideEffects>(
std::move(in),
::pushmi::detail::submit_transform_out<In>(
[sideEffects = std::move(sideEffects)]<class Out>(Out out) {
static_assert(
::pushmi::detail::deferred_requires_from<In, SideEffects,
Receiver<Out> &&
SenderTo<In, Out>,
Receiver<Out> &&
SenderTo<In, Out, single_tag>,
Receiver<Out> &&
TimeSenderTo<In, Out, single_tag> >(),
"'In' is not deliverable to 'Out'");
auto gang{::pushmi::detail::out_from<In>(detail::tap_{sideEffects, std::move(out)})};
using Gang = decltype(gang);
static_assert(
::pushmi::detail::deferred_requires_from<In, SideEffects,
Receiver<Gang> &&
SenderTo<In, Gang>,
Receiver<Gang> &&
SenderTo<In, Gang, single_tag>,
Receiver<Gang> &&
TimeSenderTo<In, Gang, single_tag> >(),
"'In' is not deliverable to 'Out' & 'SideEffects'");
return gang;
}
)
);
};
}
} // namespace detail
inline constexpr detail::tap_fn tap{};
} // namespace operators
} // namespace pushmi
// clang-format off
// clang format does not support the '<>' in the lambda syntax yet.. []<>()->{}
#pragma once
// Copyright (c) 2018-present, Facebook, Inc.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.
#include "../single.h"
#include "submit.h"
#include "extension_operators.h"
namespace pushmi {
namespace operators {
namespace detail {
struct transform_fn {
template <class... FN>
auto operator()(FN... fn) const;
};
template <class... FN>
auto transform_fn::operator()(FN... fn) const {
auto f = overload{std::move(fn)...};
return [f = std::move(f)]<class In>(In in) {
// copy 'f' to allow multiple calls to connect to multiple 'in'
return ::pushmi::detail::deferred_from<In, archetype_single>(
std::move(in),
::pushmi::detail::submit_transform_out<In>(
[f]<class Out>(Out out) {
return ::pushmi::detail::out_from<In>(
std::move(out),
// copy 'f' to allow multiple calls to submit
on_value{
[f]<class V>(Out& out, V&& v) {
using Result = decltype(f((V&&) v));
static_assert(SemiMovable<Result>,
"none of the functions supplied to transform can convert this value");
static_assert(SingleReceiver<Out, Result>,
"Result of value transform cannot be delivered to Out");
::pushmi::set_value(out, f((V&&) v));
}
}
);
}
)
);
};
}
} // namespace detail
inline constexpr detail::transform_fn transform{};
} // namespace operators
} // namespace pushmi
#pragma once
// Copyright (c) 2018-present, Facebook, Inc.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.
#include "../piping.h"
#include "../executor.h"
#include "extension_operators.h"
namespace pushmi {
namespace operators {
namespace detail {
class via_fn {
template <class ExecutorFactory>
auto operator()(ExecutorFactory ef) const;
};
template<class Executor, class Out>
struct via_fn_data : public Out {
Executor exec;
via_fn_data(Out out, Executor exec) :
Out(std::move(out)), exec(std::move(exec)) {}
};
template<class Executor, class Out>
via_fn_data(Out, Executor) -> via_fn_data<Executor, Out>;
template <class ExecutorFactory>
auto via_fn::operator()(ExecutorFactory ef) const {
return [ef = std::move(ef)]<class In>(In in) {
return ::pushmi::detail::deferred_from<In, archetype_single>(
std::move(in),
::pushmi::detail::submit_transform_out<In>(
[ef]<class Out>(Out out) {
auto exec = ef();
return ::pushmi::detail::out_from<In>(
via_fn_data{std::move(out), std::move(exec)},
// copy 'f' to allow multiple calls to submit
::pushmi::on_value{[]<class V>(auto& data, V&& v){
::pushmi::submit(
data.exec,
::pushmi::now(data.exec),
::pushmi::single([v = (V&&)v, out = std::move(static_cast<Out&>(data))](auto) mutable {
::pushmi::set_value(out, std::move(v));
}));
}},
::pushmi::on_error{[](auto& data, auto e) noexcept {
::pushmi::submit(
data.exec,
::pushmi::now(data.exec),
::pushmi::single([e = std::move(e), out = std::move(static_cast<Out&>(data))](auto) mutable {
::pushmi::set_error(out, std::move(e));
}));
}},
::pushmi::on_done{[](auto& data){
::pushmi::submit(
data.exec,
::pushmi::now(data.exec),
::pushmi::single([out = std::move(static_cast<Out&>(data))](auto) mutable {
::pushmi::set_done(out);
}));
}}
);
}
)
);
};
}
} // namespace detail
inline constexpr detail::via_fn via{};
} // namespace operators
#if 0
namespace detail {
template <class ExecutorFactory>
class fsdvia {
using executor_factory_type = std::decay_t<ExecutorFactory>;
executor_factory_type factory_;
template <class In>
class start_via {
using in_type = std::decay_t<In>;
executor_factory_type factory_;
in_type in_;
template <class Out, class Executor>
class out_via {
using out_type = std::decay_t<Out>;
using executor_type = std::decay_t<Executor>;
struct shared_type {
shared_type(out_type&& out) : out_(std::move(out)), stopped_(false) {}
out_type out_;
std::atomic_bool stopped_;
};
template <class Producer>
struct producer_proxy {
RefWrapper<Producer> up_;
std::shared_ptr<shared_type> shared_;
producer_proxy(RefWrapper<Producer> p, std::shared_ptr<shared_type> s)
: up_(std::move(p)), shared_(std::move(s)) {}
template <class V>
void value(V v) {
if (!!shared_->stopped_.exchange(true)) {
return;
}
up_.get().value(std::move(v));
}
template <class E>
void error(E e) {
if (!!shared_->stopped_.exchange(true)) {
return;
}
up_.get().error(std::move(e));
}
};
bool done_;
std::shared_ptr<shared_type> shared_;
executor_type exec_;
std::shared_ptr<AnyNone<>> upProxy_;
public:
explicit out_via(out_type&& out, executor_type&& exec)
: done_(false),
shared_(std::make_shared<shared_type>(std::move(out))),
exec_(std::move(exec)),
upProxy_() {}
template <class T>
void value(T t) {
if (done_ || shared_->stopped_) {
done_ = true;
return;
}
if (!upProxy_) {
std::abort();
}
done_ = true;
exec_ | execute([t = std::move(t), shared = shared_](auto) mutable {
shared->out_.value(std::move(t));
});
}
template <class E>
void error(E e) {
if (done_ || shared_->stopped_) {
done_ = true;
return;
}
if (!upProxy_) {
std::abort();
}
done_ = true;
exec_ | execute([e = std::move(e), shared = shared_](auto) mutable {
shared->out_.error(std::move(e));
});
}
void stopping() {
if (done_) {
return;
}
if (!upProxy_) {
std::abort();
}
done_ = true;
if (!shared_->stopped_.exchange(true)) {
exec_ |
// must keep out and upProxy alive until out is notified that it
// is unsafe
execute([shared = shared_](auto) mutable {
shared->out_.stopping();
});
}
}
template <class Producer>
void starting(RefWrapper<Producer> up) {
if (!!upProxy_) {
std::abort();
}
upProxy_ = std::make_shared<AnyNone<>>(AnyNone<>{
producer_proxy<Producer>{std::move(up), shared_}});
// must keep out and upProxy alive until out is notified that it is
// starting
exec_ | execute([shared = shared_, upProxy = upProxy_](auto) mutable {
shared->out_.starting(wrap_ref(*upProxy));
});
}
};
public:
start_via(executor_factory_type&& ef, in_type&& in)
: factory_(ef), in_(in) {}
template <class Out>
auto then(Out out) {
auto exec = factory_();
in_.then(out_via<Out, decltype(exec)>{std::move(out), std::move(exec)});
}
};
public:
explicit fsdvia(executor_factory_type&& ef) : factory_(std::move(ef)) {}
template <class In>
auto operator()(In in) {
return start_via<In>{std::move(factory_), std::move(in)};
}
};
} // namespace detail
namespace fsd {
template <class ExecutorFactory>
auto via(ExecutorFactory factory) {
return detail::fsdvia<ExecutorFactory>{std::move(factory)};
}
} // namespace fsd
#endif
} // namespace pushmi
#pragma once
// Copyright (c) 2018-present, Facebook, Inc.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.
template <class In, class Operator>
auto operator|(In&& in, Operator op) -> decltype(op(std::forward<In>(in))) {
return op(std::forward<In>(in));
}
namespace pushmi {
template<class T, class... FN>
auto pipe(T t, FN... fn) {
return (t | ... | fn);
}
} // namespace pushmi
This diff is collapsed.
#pragma once
// Copyright (c) 2018-present, Facebook, Inc.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.
#include "single.h"
namespace pushmi {
template <class V, class E>
class single_deferred<V, E> {
union data {
void* pobj_ = nullptr;
char buffer_[sizeof(V)]; // can hold a V in-situ
} data_{};
template <class Wrapped>
static constexpr bool insitu() {
return sizeof(Wrapped) <= sizeof(data::buffer_) &&
std::is_nothrow_move_constructible_v<Wrapped>;
}
enum struct op { destroy, move };
struct vtable {
void (*op_)(op, data&, data*) = +[](op, data&, data*) {};
void (*submit_)(data&, single<V, E>) = +[](data&, single<V, E>) {};
static constexpr vtable const noop_ = {};
} const* vptr_ = &vtable::noop_;
template <class Wrapped, bool = insitu<Wrapped>()>
static constexpr vtable const vtable_v = {
+[](op o, data& src, data* dst) {
switch (o) {
case op::move:
dst->pobj_ = std::exchange(src.pobj_, nullptr);
case op::destroy:
delete static_cast<Wrapped const*>(src.pobj_);
}
},
+[](data& src, single<V, E> out) {
::pushmi::submit(*static_cast<Wrapped*>(src.pobj_), std::move(out));
}};
template <class T, class U = std::decay_t<T>>
using wrapped_t =
std::enable_if_t<!std::is_same_v<U, single_deferred>, U>;
public:
using sender_category = single_tag;
single_deferred() = default;
single_deferred(single_deferred&& that) noexcept : single_deferred() {
that.vptr_->op_(op::move, that.data_, &data_);
std::swap(that.vptr_, vptr_);
}
template <class Wrapped>
requires SenderTo<wrapped_t<Wrapped>, single<V, E>, single_tag>
explicit single_deferred(Wrapped obj)
: single_deferred() {
if constexpr (insitu<Wrapped>())
new (data_.buffer_) Wrapped(std::move(obj));
else
data_.pobj_ = new Wrapped(std::move(obj));
vptr_ = &vtable_v<Wrapped>;
}
~single_deferred() {
vptr_->op_(op::destroy, data_, nullptr);
}
single_deferred& operator=(single_deferred&& that) noexcept {
this->~single_deferred();
new ((void*)this) single_deferred(std::move(that));
return *this;
}
void submit(single<V, E> out) {
vptr_->submit_(data_, std::move(out));
}
};
// Class static definitions:
template <class V, class E>
constexpr typename single_deferred<V, E>::vtable const
single_deferred<V, E>::vtable::noop_;
template <class V, class E>
template <class Wrapped, bool Big>
constexpr typename single_deferred<V, E>::vtable const
single_deferred<V, E>::vtable_v;
template <class V, class E>
template <class Wrapped>
constexpr typename single_deferred<V, E>::vtable const
single_deferred<V, E>::vtable_v<Wrapped, true> = {
+[](op o, data& src, data* dst) {
switch (o) {
case op::move:
new (dst->buffer_) Wrapped(
std::move(*static_cast<Wrapped*>((void*)src.buffer_)));
case op::destroy:
static_cast<Wrapped const*>((void*)src.buffer_)->~Wrapped();
}
},
+[](data& src, single<V, E> out) {
::pushmi::submit(
*static_cast<Wrapped*>((void*)src.buffer_), std::move(out));
}
};
template <class SF>
class single_deferred<SF> {
SF sf_;
public:
using sender_category = single_tag;
constexpr single_deferred() = default;
constexpr explicit single_deferred(SF sf)
: sf_(std::move(sf)) {}
template <Receiver<single_tag> Out>
requires Invocable<SF&, Out>
void submit(Out out) {
sf_(std::move(out));
}
};
template <Sender<single_tag> Data, class DSF>
class single_deferred<Data, DSF> {
Data data_;
DSF sf_;
public:
using sender_category = single_tag;
constexpr single_deferred() = default;
constexpr explicit single_deferred(Data data)
: data_(std::move(data)) {}
constexpr single_deferred(Data data, DSF sf)
: data_(std::move(data)), sf_(std::move(sf)) {}
template <Receiver<single_tag> Out>
requires Invocable<DSF&, Data&, Out>
void submit(Out out) {
sf_(data_, std::move(out));
}
};
single_deferred() -> single_deferred<ignoreSF>;
template <class SF>
single_deferred(SF) -> single_deferred<SF>;
template <Sender<single_tag> Data>
single_deferred(Data) -> single_deferred<Data, passDSF>;
template <Sender<single_tag> Data, class DSF>
single_deferred(Data, DSF) -> single_deferred<Data, DSF>;
template <class V, class E = std::exception_ptr>
using any_single_deferred = single_deferred<V, E>;
// template <
// class V,
// class E = std::exception_ptr,
// SenderTo<single<V, E>, single_tag> Wrapped>
// auto erase_cast(Wrapped w) {
// return single_deferred<V, E>{std::move(w)};
// }
} // namespace pushmi
#pragma once
// Copyright (c) 2018-present, Facebook, Inc.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.
#include "single.h"
namespace pushmi {
template <class V, class E, class TP>
class time_single_deferred<V, E, TP> {
union data {
void* pobj_ = nullptr;
char buffer_[sizeof(std::promise<int>)]; // can hold a V in-situ
} data_{};
template <class Wrapped>
static constexpr bool insitu() {
return sizeof(Wrapped) <= sizeof(data::buffer_) &&
std::is_nothrow_move_constructible_v<Wrapped>;
}
enum struct op { destroy, move };
struct vtable {
void (*op_)(op, data&, data*) = +[](op, data&, data*) {};
TP (*now_)(data&) = +[](data&) { return TP{}; };
void (*submit_)(data&, TP, single<V, E>) = +[](data&, TP, single<V, E>) {};
static constexpr vtable const noop_ = {};
} const* vptr_ = &vtable::noop_;
template <class Wrapped, bool = insitu<Wrapped>()>
static constexpr vtable const vtable_v = {
+[](op o, data& src, data* dst) {
switch (o) {
case op::move:
dst->pobj_ = std::exchange(src.pobj_, nullptr);
case op::destroy:
delete static_cast<Wrapped const*>(src.pobj_);
}
},
+[](data& src) -> TP {
return ::pushmi::now(*static_cast<Wrapped*>(src.pobj_));
},
+[](data& src, TP at, single<V, E> out) {
::pushmi::submit(*static_cast<Wrapped*>(src.pobj_), std::move(at), std::move(out));
}};
public:
using sender_category = single_tag;
time_single_deferred() = default;
time_single_deferred(time_single_deferred&& that) noexcept
: time_single_deferred() {
that.vptr_->op_(op::move, that.data_, &data_);
std::swap(that.vptr_, vptr_);
}
template <class T, class U = std::decay_t<T>>
using wrapped_t =
std::enable_if_t<!std::is_same_v<U, time_single_deferred>, U>;
template <class Wrapped, Sender<single_tag> W = wrapped_t<Wrapped>>
requires TimeSenderTo<W, single<V, E>>
explicit time_single_deferred(Wrapped obj)
: time_single_deferred() {
if constexpr (insitu<Wrapped>())
new (data_.buffer_) Wrapped(std::move(obj));
else
data_.pobj_ = new Wrapped(std::move(obj));
vptr_ = &vtable_v<Wrapped>;
}
~time_single_deferred() {
vptr_->op_(op::destroy, data_, nullptr);
}
time_single_deferred& operator=(time_single_deferred&& that) noexcept {
this->~time_single_deferred();
new ((void*)this) time_single_deferred(std::move(that));
return *this;
}
TP now() {
vptr_->now_(data_);
}
void submit(TP at, single<V, E> out) {
vptr_->submit_(data_, std::move(at), std::move(out));
}
};
// Class static definitions:
template <class V, class E, class TP>
constexpr typename time_single_deferred<V, E, TP>::vtable const
time_single_deferred<V, E, TP>::vtable::noop_;
template <class V, class E, class TP>
template <class Wrapped, bool Big>
constexpr typename time_single_deferred<V, E, TP>::vtable const
time_single_deferred<V, E, TP>::vtable_v;
template <class V, class E, class TP>
template <class Wrapped>
constexpr typename time_single_deferred<V, E, TP>::vtable const
time_single_deferred<V, E, TP>::vtable_v<Wrapped, true> = {
+[](op o, data& src, data* dst) {
switch (o) {
case op::move:
new (dst->buffer_) Wrapped(
std::move(*static_cast<Wrapped*>((void*)src.buffer_)));
case op::destroy:
static_cast<Wrapped const*>((void*)src.buffer_)->~Wrapped();
}
},
+[](data& src) -> TP {
return ::pushmi::now(*static_cast<Wrapped*>((void*)src.buffer_));
},
+[](data& src, TP tp, single<V, E> out) {
::pushmi::submit(
*static_cast<Wrapped*>((void*)src.buffer_),
std::move(tp),
std::move(out));
}
};
template <class SF, Invocable NF>
class time_single_deferred<SF, NF> {
SF sf_{};
NF nf_{};
public:
using sender_category = single_tag;
constexpr time_single_deferred() = default;
constexpr explicit time_single_deferred(SF sf)
: sf_(std::move(sf)) {}
constexpr time_single_deferred(SF sf, NF nf)
: sf_(std::move(sf)), nf_(std::move(nf)) {}
auto now() {
return nf_();
}
template <Regular TP, Receiver<single_tag> Out>
requires Invocable<SF&, TP, Out>
void submit(TP tp, Out out) {
sf_(std::move(tp), std::move(out));
}
};
template <TimeSender<single_tag> Data, class DSF, Invocable<Data&> DNF>
class time_single_deferred<Data, DSF, DNF> {
Data data_{};
DSF sf_{};
DNF nf_{};
public:
using sender_category = single_tag;
constexpr time_single_deferred() = default;
constexpr explicit time_single_deferred(Data data)
: data_(std::move(data)) {}
constexpr time_single_deferred(Data data, DSF sf, DNF nf = DNF{})
: data_(std::move(data)), sf_(std::move(sf)), nf_(std::move(nf)) {}
auto now() {
return nf_(data_);
}
template <class TP, Receiver<single_tag> Out>
requires Invocable<DSF&, Data&, TP, Out>
void submit(TP tp, Out out) {
sf_(data_, std::move(tp), std::move(out));
}
};
time_single_deferred()->time_single_deferred<ignoreSF, systemNowF>;
template <class SF>
time_single_deferred(SF)->time_single_deferred<SF, systemNowF>;
template <class SF, Invocable NF>
time_single_deferred(SF, NF)->time_single_deferred<SF, NF>;
template <TimeSender<single_tag> Data, class DSF>
time_single_deferred(Data, DSF)->time_single_deferred<Data, DSF, passDNF>;
template <TimeSender<single_tag> Data, class DSF, class DNF>
time_single_deferred(Data, DSF, DNF)->time_single_deferred<Data, DSF, DNF>;
template <
class V,
class E = std::exception_ptr,
class TP = std::chrono::system_clock::time_point>
using any_time_single_deferred = time_single_deferred<V, E, TP>;
// template <
// class V,
// class E = std::exception_ptr,
// class TP = std::chrono::system_clock::time_point,
// TimeSenderTo<single<V, E>, single_tag> Wrapped>
// auto erase_cast(Wrapped w) {
// return time_single_deferred<V, E>{std::move(w)};
// }
} // namespace pushmi
#pragma once
// Copyright (c) 2018-present, Facebook, Inc.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.
#include <type_traits>
namespace pushmi {
template <class T, template <class> class C>
concept bool Valid = requires { typename C<T>; };
template <class T, template<class...> class Trait, class... Args>
concept bool Satisfies = bool(Trait<T>::type::value);
template <class T>
concept bool Object = requires(T* p) {
*p;
{ p } -> const volatile void*;
};
template <class T, class... Args>
concept bool Constructible = __is_constructible(T, Args...);
template <class T>
concept bool MoveConstructible = Constructible<T, T>;
template <class From, class To>
concept bool ConvertibleTo =
std::is_convertible_v<From, To>&& requires(From (&f)()) {
static_cast<To>(f());
};
template <class T, class U>
concept bool Same = __is_same_as(T, U) && __is_same_as(U, T);
template <class A, class B>
concept bool Derived = __is_base_of(B, A);
template <class A>
concept bool Decayed = Same<A, std::decay_t<A>>;
template <class T, class U>
concept bool Assignable = Same<T, T&>&& requires(T t, U&& u) {
{ t = (U &&) u } -> Same<T>&&;
};
template <class T>
concept bool EqualityComparable = requires(std::remove_cvref_t<T> const & t) {
{ t == t } -> bool;
{ t != t } -> bool;
};
template <class T>
concept bool SemiMovable =
Object<T>&& Constructible<T, T>&& ConvertibleTo<T, T>;
template <class T>
concept bool Movable = SemiMovable<T>&& Assignable<T&, T>;
template <class T>
concept bool Copyable = Movable<T>&&
Assignable<T&, const T&> &&
ConvertibleTo<const T&, T>;
template <class T>
concept bool Semiregular = Copyable<T>&& Constructible<T>;
template <class T>
concept bool Regular = Semiregular<T>&& EqualityComparable<T>;
template <class F, class... Args>
concept bool Invocable = requires(F&& f, Args&&... args) {
std::invoke((F &&) f, (Args &&) args...);
};
template <class F, class... Args>
concept bool NothrowInvocable =
Invocable<F, Args...> && requires(F&& f, Args&&... args) {
{ std::invoke((F &&) f, (Args &&) args...) } noexcept;
};
namespace detail {
// is_ taken from meta library
template <typename, template <typename...> class>
struct is_ : std::false_type {};
template <typename... Ts, template <typename...> class C>
struct is_<C<Ts...>, C> : std::true_type {};
template <typename T, template <typename...> class C>
constexpr bool is_v = is_<T, C>::value;
template <bool B, class T = void>
using requires_ = std::enable_if_t<B, T>;
} // namespace detail
} // namespace pushmi
#if 1
#define PUSHMI_VOID_LAMBDA_REQUIRES(RequiresExp...) \
->::pushmi::detail::requires_<(RequiresExp)>
#define PUSHMI_T_LAMBDA_REQUIRES(T, RequiresExp...) \
->::pushmi::detail::requires_<(RequiresExp), T>
#elif 0
// unsupported syntax..
#define PUSHMI_VOID_LAMBDA_REQUIRES(RequiresExp...) ->void requires(RequiresExp)
#define PUSHMI_T_LAMBDA_REQUIRES(T, RequiresExp...) ->T requires(RequiresExp)
#else
#define PUSHMI_VOID_LAMBDA_REQUIRES(RequiresExp...) ->void
#define PUSHMI_T_LAMBDA_REQUIRES(T, RequiresExp...) ->T
#endif
#pragma once
// Copyright (c) 2018-present, Facebook, Inc.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.
#include <chrono>
#include <deque>
#include <thread>
#include "executor.h"
#include "time_single_deferred.h"
namespace pushmi {
struct recurse_t {};
constexpr const recurse_t recurse{};
namespace detail {
struct ownordelegate_t {};
constexpr const ownordelegate_t ownordelegate{};
struct ownornest_t {};
constexpr const ownornest_t ownornest{};
class trampoline_id {
std::thread::id threadid;
uintptr_t trampolineid;
public:
template <class T>
explicit trampoline_id(T* trampoline)
: threadid(std::this_thread::get_id()), trampolineid(trampoline) {}
};
template <class E = std::exception_ptr>
class trampoline {
public:
using time_point = std::chrono::system_clock::time_point;
private:
using error_type = std::decay_t<E>;
using work_type =
any_single<any_time_executor_ref<error_type, time_point>, error_type>;
using queue_type = std::deque<std::tuple<time_point, work_type>>;
using pending_type = std::tuple<int, queue_type, time_point>;
inline static pending_type*& owner() {
static thread_local pending_type* pending = nullptr;
return pending;
}
inline static int& depth(pending_type& p) {
return std::get<0>(p);
}
inline static queue_type& pending(pending_type& p) {
return std::get<1>(p);
}
inline static time_point& next(pending_type& p) {
return std::get<2>(p);
}
public:
inline static trampoline_id get_id() {
return {owner()};
}
inline static bool is_owned() {
return owner() != nullptr;
}
inline static time_point now() {
return std::chrono::system_clock::now();
}
template <class Selector, class Derived>
static void submit(Selector, Derived&, time_point awhen, recurse_t) {
if (!is_owned()) {
abort();
}
next(*owner()) = awhen;
}
template <class SingleReceiver>
requires !Same<SingleReceiver, recurse_t>
static void submit(ownordelegate_t, time_point awhen, SingleReceiver awhat);
template <class SingleReceiver>
requires !Same<SingleReceiver, recurse_t>
static void submit(ownornest_t, time_point awhen, SingleReceiver awhat);
};
template <class E = std::exception_ptr>
class delegator {
using time_point = typename trampoline<E>::time_point;
public:
using sender_category = single_tag;
time_point now() {
return trampoline<E>::now();
}
template <class SingleReceiver>
requires Receiver<std::remove_cvref_t<SingleReceiver>, single_tag>
void submit(time_point when, SingleReceiver&& what) {
trampoline<E>::submit(
ownordelegate, when, std::forward<SingleReceiver>(what));
}
};
template <class E = std::exception_ptr>
class nester {
using time_point = typename trampoline<E>::time_point;
public:
using sender_category = single_tag;
time_point now() {
return trampoline<E>::now();
}
template <class SingleReceiver>
void submit(time_point when, SingleReceiver&& what) {
trampoline<E>::submit(ownornest, when, std::forward<SingleReceiver>(what));
}
};
template <class E>
template <class SingleReceiver>
requires !Same<SingleReceiver, recurse_t>
// static
void trampoline<E>::submit(
ownordelegate_t,
time_point awhen,
SingleReceiver awhat) {
delegator<E> that;
if (is_owned()) {
// thread already owned
// poor mans scope guard
try {
if (++depth(*owner()) > 100 || awhen > trampoline<E>::now()) {
// defer work to owner
pending(*owner()).push_back(
std::make_tuple(awhen, work_type{std::move(awhat)}));
} else {
// dynamic recursion - optimization to balance queueing and
// stack usage and value interleaving on the same thread.
::pushmi::set_value(awhat, that);
}
} catch(...){
--depth(*owner());
throw;
}
--depth(*owner());
return;
}
// take over the thread
pending_type pending_store;
owner() = &pending_store;
depth(pending_store) = 0;
// poor mans scope guard
try {
trampoline<E>::submit(ownornest, awhen, std::move(awhat));
} catch(...) {
// ignore exceptions while delivering the exception
try {
::pushmi::set_error(awhat, std::current_exception());
for (auto& item : pending(pending_store)) {
auto& what = std::get<1>(item);
::pushmi::set_error(what, std::current_exception());
}
} catch (...) {
}
pending(pending_store).clear();
if(!is_owned()) { std::abort(); }
if(!pending(pending_store).empty()) { std::abort(); }
owner() = nullptr;
throw;
}
if(!is_owned()) { std::abort(); }
if(!pending(pending_store).empty()) { std::abort(); }
owner() = nullptr;
}
template <class E>
template <class SingleReceiver>
requires !Same<SingleReceiver, recurse_t>
// static
void trampoline<E>::submit(
ownornest_t,
time_point awhen,
SingleReceiver awhat) {
delegator<E> that;
if (!is_owned()) {
trampoline<E>::submit(ownordelegate, awhen, std::move(awhat));
return;
}
auto& pending_store = *owner();
// static recursion - tail call optimization
if (pending(pending_store).empty()) {
auto when = awhen;
while (when != time_point{}) {
if (when > trampoline<E>::now()) {
std::this_thread::sleep_until(when);
}
next(pending_store) = time_point{};
::pushmi::set_value(awhat, that);
when = next(pending_store);
}
} else {
// ensure work is sorted by time
pending(pending_store)
.push_back(std::make_tuple(awhen, work_type{std::move(awhat)}));
}
if (pending(pending_store).empty()) {
return;
}
while (!pending(pending_store).empty()) {
std::stable_sort(
pending(pending_store).begin(),
pending(pending_store).end(),
[](auto& lhs, auto& rhs) {
auto& lwhen = std::get<0>(lhs);
auto& rwhen = std::get<0>(rhs);
return lwhen < rwhen;
});
auto item = std::move(pending(pending_store).front());
pending(pending_store).pop_front();
auto& when = std::get<0>(item);
auto& what = std::get<1>(item);
any_time_executor_ref<error_type, time_point> anythis{that};
::pushmi::set_value(what, anythis);
}
}
} // namespace detail
template <class E = std::exception_ptr>
detail::trampoline_id get_trampoline_id() {
if(!detail::trampoline<E>::is_owned()) { std::abort(); }
return detail::trampoline<E>::get_id();
}
template <class E = std::exception_ptr>
bool owned_by_trampoline() {
return detail::trampoline<E>::is_owned();
}
template <class E = std::exception_ptr>
inline detail::delegator<E> trampoline() {
return {};
}
template <class E = std::exception_ptr>
inline detail::nester<E> nested_trampoline() {
return {};
}
namespace detail {
template <class E>
requires TimeSenderTo<delegator<E>, recurse_t>
decltype(auto) repeat(delegator<E>& exec) {
::pushmi::submit(exec, ::pushmi::now(exec), recurse);
}
template <class AnyExec>
void repeat(AnyExec& exec) {
std::abort();
}
} // namespace detail
inline auto repeat() {
return [](auto& exec) { detail::repeat(exec); };
}
} // namespace pushmi
add_executable(PushmiTest catch.cpp PushmiTest.cpp)
target_link_libraries(PushmiTest
pushmi
Threads::Threads)
#include "catch.hpp"
#include <type_traits>
#include <chrono>
using namespace std::literals;
#include "pushmi/flow_single_deferred.h"
#include "pushmi/o/empty.h"
#include "pushmi/o/just.h"
#include "pushmi/o/on.h"
#include "pushmi/o/transform.h"
#include "pushmi/o/tap.h"
#include "pushmi/o/via.h"
#include "pushmi/o/submit.h"
#include "pushmi/o/extension_operators.h"
#include "pushmi/trampoline.h"
#include "pushmi/new_thread.h"
using namespace pushmi::aliases;
SCENARIO( "empty can be used with tap and submit", "[empty][deferred]" ) {
GIVEN( "An empty deferred" ) {
auto e = op::empty();
using E = decltype(e);
REQUIRE( v::SenderTo<E, v::any_none<>, v::none_tag> );
WHEN( "tap and submit are applied" ) {
int signals = 0;
e |
op::tap(
[&](auto e) noexcept { signals += 1000; },
[&](){ signals += 10; }) |
op::submit(
[&](auto e) noexcept { signals += 1000; },
[&](){ signals += 10; });
THEN( "the done signal is recorded twice" ) {
REQUIRE( signals == 20 );
}
WHEN( "future_from is applied" ) {
v::future_from(e).get();
THEN( "future_from(e) returns std::future<void>" ) {
REQUIRE( std::is_same_v<std::future<void>, decltype(v::future_from(e))> );
}
}
}
}
GIVEN( "An empty int single_deferred" ) {
auto e = op::empty<int>();
using E = decltype(e);
REQUIRE( v::SenderTo<E, v::any_single<int>, v::single_tag> );
WHEN( "tap and submit are applied" ) {
int signals = 0;
e |
op::tap(
[&](auto v){ signals += 100; },
[&](auto e) noexcept { signals += 1000; },
[&](){ signals += 10; }) |
op::submit(
[&](auto v){ signals += 100; },
[&](auto e) noexcept { signals += 1000; },
[&](){ signals += 10; });
THEN( "the done signal is recorded twice" ) {
REQUIRE( signals == 20 );
}
}
}
}
SCENARIO( "just() can be used with transform and submit", "[just][deferred]" ) {
GIVEN( "A just int single_deferred" ) {
auto j = op::just(20);
using J = decltype(j);
REQUIRE( v::SenderTo<J, v::any_single<int>, v::single_tag> );
WHEN( "transform and submit are applied" ) {
int signals = 0;
int value = 0;
j |
op::transform(
[&](int v){ signals += 10000; return v + 1; },
[&](auto v){ std:abort(); return v; }) |
op::transform(
[&](int v){ signals += 10000; return v * 2; }) |
op::submit(
[&](auto v){ value = v; signals += 100; },
[&](auto e) noexcept { signals += 1000; },
[&](){ signals += 10; });
THEN( "the transform signal is recorded twice, the value signal once and the result is correct" ) {
REQUIRE( signals == 20100 );
REQUIRE( value == 42 );
}
}
WHEN( "future_from<int> is applied" ) {
auto twenty = v::future_from<int>(j).get();
THEN( "the value signal is recorded once and the result is correct" ) {
REQUIRE( twenty == 20 );
REQUIRE( std::is_same_v<std::future<int>, decltype(v::future_from<int>(j))> );
}
}
}
}
#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file
#include "catch.hpp"
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment