Commit eb464ae8 authored by Kirk Shoop's avatar Kirk Shoop Committed by Facebook Github Bot

remove stopping signal

fbshipit-source-id: b71f93b93d0915a5c913bddb8e4bdbbe7072bce7
parent 0d4b1a6c
This diff is collapsed.
...@@ -56,10 +56,6 @@ struct ignoreDF { ...@@ -56,10 +56,6 @@ struct ignoreDF {
void operator()() {} void operator()() {}
}; };
struct ignoreStpF {
void operator()() {}
};
struct ignoreStrtF { struct ignoreStrtF {
template <class Up> template <class Up>
void operator()(Up&&) {} void operator()(Up&&) {}
...@@ -103,14 +99,6 @@ struct passDDF { ...@@ -103,14 +99,6 @@ struct passDDF {
} }
}; };
struct passDStpF {
PUSHMI_TEMPLATE(class Data)
(requires Receiver<Data>)
void operator()(Data& out) const {
::pushmi::set_stopping(out);
}
};
struct passDStrtF { struct passDStrtF {
PUSHMI_TEMPLATE(class Up, class Data) PUSHMI_TEMPLATE(class Up, class Data)
(requires requires ( (requires requires (
...@@ -243,18 +231,6 @@ auto on_done(Fn fn) -> on_done_fn<Fn> { ...@@ -243,18 +231,6 @@ auto on_done(Fn fn) -> on_done_fn<Fn> {
return on_done_fn<Fn>{std::move(fn)}; return on_done_fn<Fn>{std::move(fn)};
} }
template <class Fn>
struct on_stopping_fn : Fn {
constexpr on_stopping_fn() = default;
constexpr explicit on_stopping_fn(Fn fn) : Fn(std::move(fn)) {}
using Fn::operator();
};
template <class Fn>
auto on_stopping(Fn fn) -> on_stopping_fn<Fn> {
return on_stopping_fn<Fn>{std::move(fn)};
}
template <class... Fns> template <class... Fns>
struct on_starting_fn : overload_fn<Fns...> { struct on_starting_fn : overload_fn<Fns...> {
constexpr on_starting_fn() = default; constexpr on_starting_fn() = default;
......
...@@ -256,9 +256,6 @@ PUSHMI_CONCEPT_DEF( ...@@ -256,9 +256,6 @@ PUSHMI_CONCEPT_DEF(
PUSHMI_CONCEPT_DEF( PUSHMI_CONCEPT_DEF(
template (class S, class... PropertyN) template (class S, class... PropertyN)
(concept FlowReceiver)(S, PropertyN...), (concept FlowReceiver)(S, PropertyN...),
requires(S& s) (
::pushmi::set_stopping(s)
) &&
Receiver<S> && Receiver<S> &&
property_query_v<S, PropertyN...> && property_query_v<S, PropertyN...> &&
Flow<S> Flow<S>
......
...@@ -27,11 +27,6 @@ void set_value(S& s, V&& v) noexcept(noexcept(s.value((V&&) v))) { ...@@ -27,11 +27,6 @@ void set_value(S& s, V&& v) noexcept(noexcept(s.value((V&&) v))) {
s.value((V&&) v); s.value((V&&) v);
} }
PUSHMI_TEMPLATE (class S)
(requires requires (std::declval<S&>().stopping()))
void set_stopping(S& s) noexcept(noexcept(s.stopping())) {
s.stopping();
}
PUSHMI_TEMPLATE (class S, class Up) PUSHMI_TEMPLATE (class S, class Up)
(requires requires (std::declval<S&>().starting(std::declval<Up>()))) (requires requires (std::declval<S&>().starting(std::declval<Up>())))
void set_starting(S& s, Up up) noexcept(noexcept(s.starting(std::move(up)))) { void set_starting(S& s, Up up) noexcept(noexcept(s.starting(std::move(up)))) {
...@@ -100,12 +95,6 @@ void set_value(std::reference_wrapper<S> s, V&& v) noexcept( ...@@ -100,12 +95,6 @@ void set_value(std::reference_wrapper<S> s, V&& v) noexcept(
noexcept(set_value(s.get(), (V&&) v))) { noexcept(set_value(s.get(), (V&&) v))) {
set_value(s.get(), (V&&) v); set_value(s.get(), (V&&) v);
} }
PUSHMI_TEMPLATE (class S)
(requires requires ( set_stopping(std::declval<S&>()) ))
void set_stopping(std::reference_wrapper<S> s) noexcept(
noexcept(set_stopping(s.get()))) {
set_stopping(s.get());
}
PUSHMI_TEMPLATE (class S, class Up) PUSHMI_TEMPLATE (class S, class Up)
(requires requires ( set_starting(std::declval<S&>(), std::declval<Up>()) )) (requires requires ( set_starting(std::declval<S&>(), std::declval<Up>()) ))
void set_starting(std::reference_wrapper<S> s, Up up) noexcept( void set_starting(std::reference_wrapper<S> s, Up up) noexcept(
...@@ -176,15 +165,6 @@ struct set_value_fn { ...@@ -176,15 +165,6 @@ struct set_value_fn {
} }
}; };
struct set_stopping_fn {
PUSHMI_TEMPLATE (class S)
(requires requires (
set_stopping(std::declval<S&>())
))
void operator()(S&& s) const noexcept(noexcept(set_stopping(s))) {
set_stopping(s);
}
};
struct set_starting_fn { struct set_starting_fn {
PUSHMI_TEMPLATE (class S, class Up) PUSHMI_TEMPLATE (class S, class Up)
(requires requires ( (requires requires (
...@@ -239,7 +219,6 @@ struct get_now_fn { ...@@ -239,7 +219,6 @@ struct get_now_fn {
PUSHMI_INLINE_VAR constexpr __adl::set_done_fn set_done{}; PUSHMI_INLINE_VAR constexpr __adl::set_done_fn set_done{};
PUSHMI_INLINE_VAR constexpr __adl::set_error_fn set_error{}; PUSHMI_INLINE_VAR constexpr __adl::set_error_fn set_error{};
PUSHMI_INLINE_VAR constexpr __adl::set_value_fn set_value{}; PUSHMI_INLINE_VAR constexpr __adl::set_value_fn set_value{};
PUSHMI_INLINE_VAR constexpr __adl::set_stopping_fn set_stopping{};
PUSHMI_INLINE_VAR constexpr __adl::set_starting_fn set_starting{}; PUSHMI_INLINE_VAR constexpr __adl::set_starting_fn set_starting{};
PUSHMI_INLINE_VAR constexpr __adl::do_submit_fn submit{}; PUSHMI_INLINE_VAR constexpr __adl::do_submit_fn submit{};
PUSHMI_INLINE_VAR constexpr __adl::get_now_fn now{}; PUSHMI_INLINE_VAR constexpr __adl::get_now_fn now{};
......
...@@ -214,16 +214,6 @@ struct set_done_fn { ...@@ -214,16 +214,6 @@ struct set_done_fn {
} }
}; };
struct set_stopping_fn {
auto operator()() const {
return constrain(lazy::Receiver<_1>,
[](auto out) {
::pushmi::set_stopping(out);
}
);
}
};
struct set_starting_fn { struct set_starting_fn {
PUSHMI_TEMPLATE(class Up) PUSHMI_TEMPLATE(class Up)
(requires Receiver<Up>) (requires Receiver<Up>)
...@@ -274,7 +264,6 @@ namespace extension_operators { ...@@ -274,7 +264,6 @@ namespace extension_operators {
PUSHMI_INLINE_VAR constexpr detail::set_done_fn set_done{}; PUSHMI_INLINE_VAR constexpr detail::set_done_fn set_done{};
PUSHMI_INLINE_VAR constexpr detail::set_error_fn set_error{}; PUSHMI_INLINE_VAR constexpr detail::set_error_fn set_error{};
PUSHMI_INLINE_VAR constexpr detail::set_value_fn set_value{}; PUSHMI_INLINE_VAR constexpr detail::set_value_fn set_value{};
PUSHMI_INLINE_VAR constexpr detail::set_stopping_fn set_stopping{};
PUSHMI_INLINE_VAR constexpr detail::set_starting_fn set_starting{}; PUSHMI_INLINE_VAR constexpr detail::set_starting_fn set_starting{};
PUSHMI_INLINE_VAR constexpr detail::do_submit_fn submit{}; PUSHMI_INLINE_VAR constexpr detail::do_submit_fn submit{};
PUSHMI_INLINE_VAR constexpr detail::now_fn now{}; PUSHMI_INLINE_VAR constexpr detail::now_fn now{};
......
...@@ -147,15 +147,6 @@ class fsdon { ...@@ -147,15 +147,6 @@ class fsdon {
out_.error(std::move(e)); out_.error(std::move(e));
} }
void stopping() {
if (done_) {
return;
}
done_ = true;
*stopped_ = true;
out_.stopping();
}
template <class Producer> template <class Producer>
void starting(RefWrapper<Producer> up) { void starting(RefWrapper<Producer> up) {
upProxy_ = upProxy_ =
......
...@@ -183,24 +183,6 @@ class fsdvia { ...@@ -183,24 +183,6 @@ class fsdvia {
}); });
} }
void stopping() {
if (done_) {
return;
}
if (!upProxy_) {
std::abort();
}
done_ = true;
if (!shared_->stopped_.exchange(true)) {
exec_ |
// must keep out and upProxy alive until out is notified that it
// is unsafe
execute([shared = shared_](auto) mutable {
shared->out_.stopping();
});
}
}
template <class Producer> template <class Producer>
void starting(RefWrapper<Producer> up) { void starting(RefWrapper<Producer> up) {
if (!!upProxy_) { if (!!upProxy_) {
......
...@@ -257,13 +257,6 @@ void flow_single_test() { ...@@ -257,13 +257,6 @@ void flow_single_test() {
pushmi::ignoreVF{}, pushmi::ignoreVF{},
pushmi::abortEF{}, pushmi::abortEF{},
pushmi::ignoreDF{}, pushmi::ignoreDF{},
pushmi::ignoreStpF{});
auto out9 =
pushmi::MAKE(flow_single)(
pushmi::ignoreVF{},
pushmi::abortEF{},
pushmi::ignoreDF{},
pushmi::ignoreStpF{},
pushmi::ignoreStrtF{}); pushmi::ignoreStrtF{});
using Out0 = decltype(out0); using Out0 = decltype(out0);
...@@ -297,11 +290,6 @@ void flow_single_test() { ...@@ -297,11 +290,6 @@ void flow_single_test() {
pushmi::passDVF{}, pushmi::passDVF{},
pushmi::passDEF{}, pushmi::passDEF{},
pushmi::passDDF{}); pushmi::passDDF{});
auto proxy9 = pushmi::MAKE(flow_single)(out0,
pushmi::passDVF{},
pushmi::passDEF{},
pushmi::passDDF{},
pushmi::passDStpF{});
auto any2 = pushmi::any_flow_single<int>(out0); auto any2 = pushmi::any_flow_single<int>(out0);
auto any3 = pushmi::any_flow_single<int>(proxy0); auto any3 = pushmi::any_flow_single<int>(proxy0);
......
...@@ -46,11 +46,11 @@ SCENARIO("flow single immediate cancellation", "[flow][deferred]") { ...@@ -46,11 +46,11 @@ SCENARIO("flow single immediate cancellation", "[flow][deferred]") {
auto up = mi::MAKE(none)( auto up = mi::MAKE(none)(
Data{std::move(tokens.second)}, Data{std::move(tokens.second)},
[&](auto& data, auto e) noexcept { [&](auto& data, auto e) noexcept {
signals += 1000000; signals += 100000;
data.stopper.t(data.stopper); data.stopper.t(data.stopper);
}, },
[&](auto& data) { [&](auto& data) {
signals += 100000; signals += 10000;
data.stopper.t(data.stopper); data.stopper.t(data.stopper);
}); });
...@@ -64,9 +64,6 @@ SCENARIO("flow single immediate cancellation", "[flow][deferred]") { ...@@ -64,9 +64,6 @@ SCENARIO("flow single immediate cancellation", "[flow][deferred]") {
// cancellation is not an error // cancellation is not an error
::mi::set_done(out); ::mi::set_done(out);
} }
// I want to get rid of this signal it makes usage harder and
// messes up r-value qualifing done, error and value.
::mi::set_stopping(out);
}); });
WHEN("submit is applied and cancels the producer") { WHEN("submit is applied and cancels the producer") {
...@@ -75,7 +72,6 @@ SCENARIO("flow single immediate cancellation", "[flow][deferred]") { ...@@ -75,7 +72,6 @@ SCENARIO("flow single immediate cancellation", "[flow][deferred]") {
mi::on_value([&](int) { signals += 100; }), mi::on_value([&](int) { signals += 100; }),
mi::on_error([&](auto) noexcept { signals += 1000; }), mi::on_error([&](auto) noexcept { signals += 1000; }),
mi::on_done([&]() { signals += 1; }), mi::on_done([&]() { signals += 1; }),
mi::on_stopping([&]() { signals += 10000; }),
// immediately stop producer // immediately stop producer
mi::on_starting([&](auto up) { mi::on_starting([&](auto up) {
signals += 10; signals += 10;
...@@ -83,8 +79,8 @@ SCENARIO("flow single immediate cancellation", "[flow][deferred]") { ...@@ -83,8 +79,8 @@ SCENARIO("flow single immediate cancellation", "[flow][deferred]") {
})); }));
THEN( THEN(
"the starting, up.done, out.done and out.stopping signals are each recorded once") { "the starting, up.done and out.done signals are each recorded once") {
REQUIRE(signals == 110011); REQUIRE(signals == 10011);
} }
} }
...@@ -94,13 +90,12 @@ SCENARIO("flow single immediate cancellation", "[flow][deferred]") { ...@@ -94,13 +90,12 @@ SCENARIO("flow single immediate cancellation", "[flow][deferred]") {
mi::on_value([&](int) { signals += 100; }), mi::on_value([&](int) { signals += 100; }),
mi::on_error([&](auto) noexcept { signals += 1000; }), mi::on_error([&](auto) noexcept { signals += 1000; }),
mi::on_done([&]() { signals += 1; }), mi::on_done([&]() { signals += 1; }),
mi::on_stopping([&]() { signals += 10000; }),
// do not stop producer before it is scheduled to run // do not stop producer before it is scheduled to run
mi::on_starting([&](auto up) { signals += 10; })); mi::on_starting([&](auto up) { signals += 10; }));
THEN( THEN(
"the starting, out.value and out.stopping signals are each recorded once") { "the starting and out.value signals are each recorded once") {
REQUIRE(signals == 10110); REQUIRE(signals == 110);
} }
} }
} }
...@@ -132,11 +127,11 @@ SCENARIO("flow single cancellation trampoline", "[flow][deferred]") { ...@@ -132,11 +127,11 @@ SCENARIO("flow single cancellation trampoline", "[flow][deferred]") {
auto up = mi::MAKE(none)( auto up = mi::MAKE(none)(
Data{std::move(tokens.second)}, Data{std::move(tokens.second)},
[&](auto& data, auto e) noexcept { [&](auto& data, auto e) noexcept {
signals += 1000000; signals += 100000;
data.stopper.t(data.stopper); data.stopper.t(data.stopper);
}, },
[&](auto& data) { [&](auto& data) {
signals += 100000; signals += 10000;
data.stopper.t(data.stopper); data.stopper.t(data.stopper);
}); });
...@@ -160,9 +155,6 @@ SCENARIO("flow single cancellation trampoline", "[flow][deferred]") { ...@@ -160,9 +155,6 @@ SCENARIO("flow single cancellation trampoline", "[flow][deferred]") {
// cancellation is not an error // cancellation is not an error
::mi::set_done(out); ::mi::set_done(out);
} }
// I want to get rid of this signal it makes usage harder
// and messes up r-value qualifing done, error and value.
::mi::set_stopping(out);
}); });
}); });
}); });
...@@ -173,7 +165,6 @@ SCENARIO("flow single cancellation trampoline", "[flow][deferred]") { ...@@ -173,7 +165,6 @@ SCENARIO("flow single cancellation trampoline", "[flow][deferred]") {
mi::on_value([&](int) { signals += 100; }), mi::on_value([&](int) { signals += 100; }),
mi::on_error([&](auto) noexcept { signals += 1000; }), mi::on_error([&](auto) noexcept { signals += 1000; }),
mi::on_done([&]() { signals += 1; }), mi::on_done([&]() { signals += 1; }),
mi::on_stopping([&]() { signals += 10000; }),
// stop producer before it is scheduled to run // stop producer before it is scheduled to run
mi::on_starting([&](auto up) { mi::on_starting([&](auto up) {
signals += 10; signals += 10;
...@@ -183,8 +174,8 @@ SCENARIO("flow single cancellation trampoline", "[flow][deferred]") { ...@@ -183,8 +174,8 @@ SCENARIO("flow single cancellation trampoline", "[flow][deferred]") {
})); }));
THEN( THEN(
"the starting, up.done, out.done and out.stopping signals are each recorded once") { "the starting, up.done and out.done signals are each recorded once") {
REQUIRE(signals == 110011); REQUIRE(signals == 10011);
} }
} }
...@@ -194,7 +185,6 @@ SCENARIO("flow single cancellation trampoline", "[flow][deferred]") { ...@@ -194,7 +185,6 @@ SCENARIO("flow single cancellation trampoline", "[flow][deferred]") {
mi::on_value([&](int) { signals += 100; }), mi::on_value([&](int) { signals += 100; }),
mi::on_error([&](auto) noexcept { signals += 1000; }), mi::on_error([&](auto) noexcept { signals += 1000; }),
mi::on_done([&]() { signals += 1; }), mi::on_done([&]() { signals += 1; }),
mi::on_stopping([&]() { signals += 10000; }),
// do not stop producer before it is scheduled to run // do not stop producer before it is scheduled to run
mi::on_starting([&](auto up) { mi::on_starting([&](auto up) {
signals += 10; signals += 10;
...@@ -205,8 +195,8 @@ SCENARIO("flow single cancellation trampoline", "[flow][deferred]") { ...@@ -205,8 +195,8 @@ SCENARIO("flow single cancellation trampoline", "[flow][deferred]") {
})); }));
THEN( THEN(
"the starting, up.done, out.value and out.stopping signals are each recorded once") { "the starting, up.done and out.value signals are each recorded once") {
REQUIRE(signals == 110110); REQUIRE(signals == 10110);
} }
} }
} }
...@@ -248,11 +238,11 @@ SCENARIO("flow single cancellation new thread", "[flow][deferred]") { ...@@ -248,11 +238,11 @@ SCENARIO("flow single cancellation new thread", "[flow][deferred]") {
auto up = mi::MAKE(none)( auto up = mi::MAKE(none)(
Data{std::move(tokens.second)}, Data{std::move(tokens.second)},
[&](auto& data, auto e) noexcept { [&](auto& data, auto e) noexcept {
signals += 1000000; signals += 100000;
data.stopper.t(data.stopper); data.stopper.t(data.stopper);
}, },
[&](auto& data) { [&](auto& data) {
signals += 100000; signals += 10000;
data.stopper.t(data.stopper); data.stopper.t(data.stopper);
}); });
...@@ -278,9 +268,6 @@ SCENARIO("flow single cancellation new thread", "[flow][deferred]") { ...@@ -278,9 +268,6 @@ SCENARIO("flow single cancellation new thread", "[flow][deferred]") {
// cancellation is not an error // cancellation is not an error
::mi::set_done(out); ::mi::set_done(out);
} }
// I want to get rid of this signal it makes usage harder
// and messes up r-value qualifing done, error and value.
::mi::set_stopping(out);
}); });
}); });
}); });
...@@ -291,7 +278,6 @@ SCENARIO("flow single cancellation new thread", "[flow][deferred]") { ...@@ -291,7 +278,6 @@ SCENARIO("flow single cancellation new thread", "[flow][deferred]") {
mi::on_value([&](int) { signals += 100; }), mi::on_value([&](int) { signals += 100; }),
mi::on_error([&](auto) noexcept { signals += 1000; }), mi::on_error([&](auto) noexcept { signals += 1000; }),
mi::on_done([&]() { signals += 1; }), mi::on_done([&]() { signals += 1; }),
mi::on_stopping([&]() { signals += 10000; }),
// stop producer before it is scheduled to run // stop producer before it is scheduled to run
mi::on_starting([&](auto up) { mi::on_starting([&](auto up) {
signals += 10; signals += 10;
...@@ -303,8 +289,8 @@ SCENARIO("flow single cancellation new thread", "[flow][deferred]") { ...@@ -303,8 +289,8 @@ SCENARIO("flow single cancellation new thread", "[flow][deferred]") {
})); }));
THEN( THEN(
"the starting, up.done, out.done and out.stopping signals are each recorded once") { "the starting, up.done and out.done signals are each recorded once") {
REQUIRE(signals == 110011); REQUIRE(signals == 10011);
} }
} }
...@@ -314,7 +300,6 @@ SCENARIO("flow single cancellation new thread", "[flow][deferred]") { ...@@ -314,7 +300,6 @@ SCENARIO("flow single cancellation new thread", "[flow][deferred]") {
mi::on_value([&](int) { signals += 100; }), mi::on_value([&](int) { signals += 100; }),
mi::on_error([&](auto) noexcept { signals += 1000; }), mi::on_error([&](auto) noexcept { signals += 1000; }),
mi::on_done([&]() { signals += 1; }), mi::on_done([&]() { signals += 1; }),
mi::on_stopping([&]() { signals += 10000; }),
// do not stop producer before it is scheduled to run // do not stop producer before it is scheduled to run
mi::on_starting([&](auto up) { mi::on_starting([&](auto up) {
signals += 10; signals += 10;
...@@ -328,16 +313,16 @@ SCENARIO("flow single cancellation new thread", "[flow][deferred]") { ...@@ -328,16 +313,16 @@ SCENARIO("flow single cancellation new thread", "[flow][deferred]") {
std::this_thread::sleep_for(100ms); std::this_thread::sleep_for(100ms);
THEN( THEN(
"the starting, up.done, out.value and out.stopping signals are each recorded once") { "the starting, up.done and out.value signals are each recorded once") {
REQUIRE(signals == 110110); REQUIRE(signals == 10110);
} }
} }
WHEN("submit is applied and cancels the producer at the same time") { WHEN("submit is applied and cancels the producer at the same time") {
// count known results // count known results
int total = 0; int total = 0;
int cancellostrace = 0; // 110110 int cancellostrace = 0; // 10110
int cancelled = 0; // 110011 int cancelled = 0; // 10011
for (;;) { for (;;) {
signals = 0; signals = 0;
...@@ -348,7 +333,6 @@ SCENARIO("flow single cancellation new thread", "[flow][deferred]") { ...@@ -348,7 +333,6 @@ SCENARIO("flow single cancellation new thread", "[flow][deferred]") {
mi::on_value([&](int) { signals += 100; }), mi::on_value([&](int) { signals += 100; }),
mi::on_error([&](auto) noexcept { signals += 1000; }), mi::on_error([&](auto) noexcept { signals += 1000; }),
mi::on_done([&]() { signals += 1; }), mi::on_done([&]() { signals += 1; }),
mi::on_stopping([&]() { signals += 10000; }),
// stop producer at the same time that it is scheduled to run // stop producer at the same time that it is scheduled to run
mi::on_starting([&](auto up) { mi::on_starting([&](auto up) {
signals += 10; signals += 10;
...@@ -362,8 +346,8 @@ SCENARIO("flow single cancellation new thread", "[flow][deferred]") { ...@@ -362,8 +346,8 @@ SCENARIO("flow single cancellation new thread", "[flow][deferred]") {
// accumulate known signals // accumulate known signals
++total; ++total;
cancellostrace += signals == 110110; cancellostrace += signals == 10110;
cancelled += signals == 110011; cancelled += signals == 10011;
if (total != cancellostrace + cancelled) { if (total != cancellostrace + cancelled) {
// display the unrecognized signals recorded // display the unrecognized signals recorded
...@@ -376,7 +360,7 @@ SCENARIO("flow single cancellation new thread", "[flow][deferred]") { ...@@ -376,7 +360,7 @@ SCENARIO("flow single cancellation new thread", "[flow][deferred]") {
<< ", cancelled " << cancelled); << ", cancelled " << cancelled);
break; break;
} }
if (!!cancellostrace && !!cancelled) { if (cancellostrace > 4 && cancelled > 4) {
// yay all known outcomes were observed! // yay all known outcomes were observed!
break; break;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment