Commit 5b474b9d authored by Kirk Shoop's avatar Kirk Shoop Committed by Facebook Github Bot

add take_until

Summary: add take_until algorithm

Reviewed By: ericniebler

Differential Revision: D13697233

fbshipit-source-id: b2e060bc88cb652dc196e4915961d811a97b414d
parent 1689b2f7
...@@ -570,7 +570,7 @@ BENCHMARK(trampoline_flow_many_sender_1000, n) { ...@@ -570,7 +570,7 @@ BENCHMARK(trampoline_flow_many_sender_1000, n) {
BENCHMARK_SUSPEND { BENCHMARK_SUSPEND {
std::iota(values.begin(), values.end(), 1); std::iota(values.begin(), values.end(), 1);
} }
auto f = op::flow_from(values, tr) | op::tap([&](int){ auto f = op::flow_from(values, [&]{return tr;}) | op::tap([&](int){
--counter; --counter;
}); });
FOR_EACH_RANGE (i, 0, n) { FOR_EACH_RANGE (i, 0, n) {
......
...@@ -68,6 +68,7 @@ ...@@ -68,6 +68,7 @@
#define PUSHMI_PP_IGNORE_SHADOW_BEGIN \ #define PUSHMI_PP_IGNORE_SHADOW_BEGIN \
_Pragma("GCC diagnostic push") \ _Pragma("GCC diagnostic push") \
_Pragma("GCC diagnostic ignored \"-Wshadow\"") \ _Pragma("GCC diagnostic ignored \"-Wshadow\"") \
_Pragma("GCC diagnostic ignored \"-Wshadow-local\"") \
/**/ /**/
#define PUSHMI_PP_IGNORE_SHADOW_END \ #define PUSHMI_PP_IGNORE_SHADOW_END \
_Pragma("GCC diagnostic pop") _Pragma("GCC diagnostic pop")
......
...@@ -32,6 +32,15 @@ struct priorityZeroF { ...@@ -32,6 +32,15 @@ struct priorityZeroF {
auto operator()(){ return 0; } auto operator()(){ return 0; }
}; };
PUSHMI_TEMPLATE(class Exec)
(requires Strand<Exec>)
struct strandFactory {
Exec ex_;
strandFactory() = default;
explicit strandFactory(Exec ex) : ex_(std::move(ex)) {}
Exec operator()(){ return ex_; }
};
struct passDNF { struct passDNF {
PUSHMI_TEMPLATE(class Data) PUSHMI_TEMPLATE(class Data)
(requires TimeExecutor<Data>) (requires TimeExecutor<Data>)
......
...@@ -275,6 +275,12 @@ inline detail::nester<E> nested_trampoline() { ...@@ -275,6 +275,12 @@ inline detail::nester<E> nested_trampoline() {
return {}; return {};
} }
PUSHMI_INLINE_VAR constexpr auto trampolines =
strandFactory<detail::delegator<std::exception_ptr>>{};
PUSHMI_INLINE_VAR constexpr auto nested_trampolines =
strandFactory<detail::nester<std::exception_ptr>>{};
namespace detail { namespace detail {
PUSHMI_TEMPLATE(class E) PUSHMI_TEMPLATE(class E)
......
...@@ -117,8 +117,8 @@ template <class Producer> ...@@ -117,8 +117,8 @@ template <class Producer>
struct flow_from_up { struct flow_from_up {
using receiver_category = receiver_tag; using receiver_category = receiver_tag;
explicit flow_from_up(std::shared_ptr<Producer> p_) : p(std::move(p_)) {} explicit flow_from_up(std::shared_ptr<Producer> p) : p_(std::move(p)) {}
std::shared_ptr<Producer> p; std::shared_ptr<Producer> p_;
void value(std::ptrdiff_t requested) { void value(std::ptrdiff_t requested) {
if (requested < 1) { if (requested < 1) {
...@@ -126,8 +126,8 @@ struct flow_from_up { ...@@ -126,8 +126,8 @@ struct flow_from_up {
} }
// submit work to exec // submit work to exec
::folly::pushmi::submit( ::folly::pushmi::submit(
::folly::pushmi::schedule(p->exec), ::folly::pushmi::schedule(p_->exec),
make_receiver([p = p, requested](auto) { make_receiver([p = p_, requested](auto) {
auto remaining = requested; auto remaining = requested;
// this loop is structured to work when there is // this loop is structured to work when there is
// re-entrancy out.value in the loop may call up.value. // re-entrancy out.value in the loop may call up.value.
...@@ -146,17 +146,17 @@ struct flow_from_up { ...@@ -146,17 +146,17 @@ struct flow_from_up {
template <class E> template <class E>
void error(E) noexcept { void error(E) noexcept {
p->stop.store(true); p_->stop.store(true);
::folly::pushmi::submit( ::folly::pushmi::submit(
::folly::pushmi::schedule(p->exec), ::folly::pushmi::schedule(p_->exec),
flow_from_done<Producer>{p}); flow_from_done<Producer>{p_});
} }
void done() { void done() {
p->stop.store(true); p_->stop.store(true);
::folly::pushmi::submit( ::folly::pushmi::submit(
::folly::pushmi::schedule(p->exec), ::folly::pushmi::schedule(p_->exec),
flow_from_done<Producer>{p}); flow_from_done<Producer>{p_});
} }
}; };
...@@ -184,8 +184,8 @@ PUSHMI_INLINE_VAR constexpr struct flow_from_fn { ...@@ -184,8 +184,8 @@ PUSHMI_INLINE_VAR constexpr struct flow_from_fn {
} }
}; };
template <class I, class S, class Exec> template <class I, class S, class EF>
struct task struct sender_impl
: flow_sender_tag::with_values<typename std::iterator_traits<I>::value_type> : flow_sender_tag::with_values<typename std::iterator_traits<I>::value_type>
::no_error ::no_error
, pipeorigin { , pipeorigin {
...@@ -193,23 +193,21 @@ PUSHMI_INLINE_VAR constexpr struct flow_from_fn { ...@@ -193,23 +193,21 @@ PUSHMI_INLINE_VAR constexpr struct flow_from_fn {
I begin_; I begin_;
S end_; S end_;
Exec exec_; EF ef_;
sender_impl(I begin, S end, EF ef) : begin_(begin), end_(end), ef_(ef) {}
task(I begin, S end, Exec exec)
: begin_(begin), end_(end), exec_(exec) {
}
PUSHMI_TEMPLATE(class Out) PUSHMI_TEMPLATE(class Out)
(requires ReceiveValue< (requires ReceiveValue<
Out&, Out&,
typename std::iterator_traits<I>::value_type>) // typename std::iterator_traits<I>::value_type>) //
void submit(Out out) { void submit(Out out) {
auto exec = ::folly::pushmi::make_strand(ef_);
using Exec = decltype(exec);
using Producer = flow_from_producer<I, S, Out, Exec>; using Producer = flow_from_producer<I, S, Out, Exec>;
auto p = std::make_shared<Producer>( auto p = std::make_shared<Producer>(
begin_, end_, std::move(out), exec_, false); begin_, end_, std::move(out), std::move(exec), false);
::folly::pushmi::submit( ::folly::pushmi::submit(
::folly::pushmi::schedule(exec_), receiver_impl<Producer>{p}); ::folly::pushmi::schedule(p->exec), receiver_impl<Producer>{p});
} }
}; };
...@@ -219,27 +217,27 @@ PUSHMI_INLINE_VAR constexpr struct flow_from_fn { ...@@ -219,27 +217,27 @@ PUSHMI_INLINE_VAR constexpr struct flow_from_fn {
typename std::iterator_traits<I>::iterator_category, typename std::iterator_traits<I>::iterator_category,
std::forward_iterator_tag>) // std::forward_iterator_tag>) //
auto operator()(I begin, S end) const { auto operator()(I begin, S end) const {
return (*this)(begin, end, trampoline()); return (*this)(begin, end, trampolines);
} }
PUSHMI_TEMPLATE(class R) PUSHMI_TEMPLATE(class R)
(requires Range<R>) // (requires Range<R>) //
auto operator()(R&& range) const { auto operator()(R&& range) const {
return (*this)(std::begin(range), std::end(range), trampoline()); return (*this)(std::begin(range), std::end(range), trampolines);
} }
PUSHMI_TEMPLATE(class I, class S, class Exec) PUSHMI_TEMPLATE(class I, class S, class EF)
(requires DerivedFrom< (requires DerivedFrom<
typename std::iterator_traits<I>::iterator_category, typename std::iterator_traits<I>::iterator_category,
std::forward_iterator_tag>&& Executor<Exec>) // std::forward_iterator_tag>&& StrandFactory<EF>) //
auto operator()(I begin, S end, Exec exec) const { auto operator()(I begin, S end, EF ef) const {
return task<I, S, Exec>{begin, end, exec}; return sender_impl<I, S, EF>{begin, end, ef};
} }
PUSHMI_TEMPLATE(class R, class Exec) PUSHMI_TEMPLATE(class R, class EF)
(requires Range<R>&& Executor<Exec>) // (requires Range<R>&& StrandFactory<EF>) //
auto operator()(R&& range, Exec exec) const { auto operator()(R&& range, EF ef) const {
return (*this)(std::begin(range), std::end(range), exec); return (*this)(std::begin(range), std::end(range), ef);
} }
} flow_from{}; } flow_from{};
......
This diff is collapsed.
/*
* Copyright 2018-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <type_traits>
#include <chrono>
using namespace std::literals;
#include <folly/experimental/pushmi/sender/flow_sender.h>
#include <folly/experimental/pushmi/o/empty.h>
#include <folly/experimental/pushmi/o/extension_operators.h>
#include <folly/experimental/pushmi/o/for_each.h>
#include <folly/experimental/pushmi/o/from.h>
#include <folly/experimental/pushmi/o/just.h>
#include <folly/experimental/pushmi/o/take_until.h>
#include <folly/experimental/pushmi/o/tap.h>
#include <folly/experimental/pushmi/o/transform.h>
#include <folly/experimental/pushmi/o/via.h>
#include <folly/experimental/pushmi/executor/new_thread.h>
#include <folly/experimental/pushmi/executor/strand.h>
#include <folly/experimental/pushmi/executor/trampoline.h>
using namespace folly::pushmi::aliases;
#include <folly/portability/GMock.h>
#include <folly/portability/GTest.h>
using namespace testing;
namespace detail {
struct receiver_counters {
std::atomic<int> values_{0};
std::atomic<int> errors_{0};
std::atomic<int> dones_{0};
std::atomic<int> startings_{0};
std::atomic<int> finallys_{0};
};
template <class Base = mi::receiver<>>
struct ReceiverSignals_ : Base {
~ReceiverSignals_() {}
ReceiverSignals_(const ReceiverSignals_&) = default;
ReceiverSignals_& operator=(const ReceiverSignals_&) = default;
ReceiverSignals_(ReceiverSignals_&&) = default;
ReceiverSignals_& operator=(ReceiverSignals_&&) = default;
explicit ReceiverSignals_(std::string id) : id_(std::move(id)), counters_(std::make_shared<receiver_counters>()) {}
std::string id_;
std::shared_ptr<receiver_counters> counters_;
void value(mi::detail::any) {
if (mi::FlowReceiver<ReceiverSignals_> != false) {
EXPECT_THAT(counters_->startings_.load(), Eq(1))
<< "[" << id_
<< "]::value() expected the starting signal to be recorded before the value signal";
}
EXPECT_THAT(counters_->finallys_.load(), Eq(0))
<< "[" << id_
<< "]::value() expected the value signal to be recorded before the done/error signal";
++counters_->values_;
}
void error(mi::detail::any) noexcept {
if (mi::FlowReceiver<ReceiverSignals_> != false) {
EXPECT_THAT(counters_->startings_.load(), Eq(1))
<< "[" << id_
<< "]::error() expected the starting signal to be recorded before the error signal";
}
EXPECT_THAT(counters_->finallys_.load(), Eq(0))
<< "[" << id_
<< "]::error() expected only one of done/error signals to be recorded";
++counters_->errors_;
++counters_->finallys_;
}
void done() {
if (mi::FlowReceiver<ReceiverSignals_> != false) {
EXPECT_THAT(counters_->startings_.load(), Eq(1))
<< "[" << id_
<< "]::done() expected the starting signal to be recorded before the done signal";
}
EXPECT_THAT(counters_->finallys_.load(), Eq(0))
<< "[" << id_
<< "]::done() expected only one of done/error signals to be recorded";
++counters_->dones_;
++counters_->finallys_;
}
void starting(mi::detail::any) {
EXPECT_THAT(counters_->startings_.load(), Eq(0))
<< "[" << id_
<< "]::starting() expected the starting signal to be recorded once";
++counters_->startings_;
}
void wait() {
while (counters_->finallys_.load() == 0) {
}
}
void verifyValues(int count) {
EXPECT_THAT(counters_->values_.load(), Eq(count))
<< "[" << id_
<< "]::verifyValues() expected the value signal to be recorded ["
<< count << "] times.";
}
void verifyErrors() {
EXPECT_THAT(counters_->errors_.load(), Eq(1))
<< "[" << id_
<< "]::verifyErrors() expected the error signal to be recorded once";
EXPECT_THAT(counters_->dones_.load(), Eq(0))
<< "[" << id_
<< "]::verifyErrors() expected the dones signal not to be recorded";
EXPECT_THAT(counters_->finallys_.load(), Eq(1))
<< "[" << id_
<< "]::verifyErrors() expected the finally signal to be recorded once";
}
void verifyDones() {
EXPECT_THAT(counters_->dones_.load(), Eq(1))
<< "[" << id_
<< "]::verifyDones() expected the dones signal to be recorded once";
EXPECT_THAT(counters_->errors_.load(), Eq(0))
<< "[" << id_
<< "]::verifyDones() expected the errors signal not to be recorded";
EXPECT_THAT(counters_->finallys_.load(), Eq(1))
<< "[" << id_
<< "]::verifyDones() expected the finally signal to be recorded once";
}
void verifyFinal() {
if (mi::FlowReceiver<ReceiverSignals_> == false) {
EXPECT_THAT(counters_->startings_.load(), Eq(0))
<< "[" << id_
<< "]::verifyFinal() expected the starting signal not to be recorded";
} else {
EXPECT_THAT(counters_->startings_.load(), Eq(1))
<< "[" << id_
<< "]::verifyFinal() expected the starting signal to be recorded once";
}
EXPECT_THAT(counters_->finallys_.load(), Eq(1))
<< "[" << id_
<< "]::verifyFinal() expected the finally signal to be recorded once";
}
};
} // namespace detail
using ReceiverSignals =
detail::ReceiverSignals_<mi::receiver<>>;
using FlowReceiverSignals =
detail::ReceiverSignals_<mi::flow_receiver<>>;
TEST(EmptySourceEmptyTriggerTrampoline, TakeUntil) {
auto e = op::flow_from(std::array<int, 0>{}, mi::trampolines);
FlowReceiverSignals source{"source"};
FlowReceiverSignals trigger{"trigger"};
ReceiverSignals each{"each"};
e | op::tap(source) |
op::take_until(mi::trampolines, e | op::tap(trigger)) |
op::for_each(each);
source.wait();
source.verifyValues(0);
source.verifyDones();
source.verifyFinal();
trigger.wait();
trigger.verifyValues(0);
trigger.verifyDones();
trigger.verifyFinal();
each.wait();
each.verifyValues(0);
each.verifyDones();
each.verifyFinal();
}
TEST(EmptySourceEmptyTrigger, TakeUntil) {
auto nt = mi::new_thread();
auto e = op::flow_from(std::array<int, 0>{}, mi::strands(nt));
FlowReceiverSignals source{"source"};
FlowReceiverSignals trigger{"trigger"};
ReceiverSignals each{"each"};
e | op::tap(source) |
op::take_until(mi::strands(nt), e | op::tap(trigger)) |
op::for_each(each);
source.wait();
source.verifyValues(0);
source.verifyDones();
source.verifyFinal();
trigger.wait();
trigger.verifyValues(0);
trigger.verifyDones();
trigger.verifyFinal();
each.wait();
each.verifyValues(0);
each.verifyDones();
each.verifyFinal();
}
TEST(EmptySourceValueTrigger, TakeUntil) {
auto nt = mi::new_thread();
auto e = op::flow_from(std::array<int, 0>{}, mi::strands(nt));
auto v = op::flow_from(std::array<int, 1>{{42}}, mi::strands(nt));
FlowReceiverSignals source{"source"};
FlowReceiverSignals trigger{"trigger"};
ReceiverSignals each{"each"};
e | op::tap(source) |
op::take_until(mi::strands(nt), v | op::tap(trigger)) |
op::for_each(each);
source.wait();
source.verifyValues(0);
source.verifyDones();
source.verifyFinal();
trigger.wait();
trigger.verifyValues(1);
trigger.verifyDones();
trigger.verifyFinal();
each.wait();
each.verifyValues(0);
each.verifyDones();
each.verifyFinal();
}
TEST(ValueSourceEmptyTrigger, TakeUntil) {
auto nt = mi::new_thread();
auto e = op::flow_from(std::array<int, 0>{}, mi::strands(nt));
auto v = op::flow_from(std::array<int, 1>{{42}}, mi::strands(nt));
FlowReceiverSignals source{"source"};
FlowReceiverSignals trigger{"trigger"};
ReceiverSignals each{"each"};
v | op::tap(source) |
op::take_until(mi::strands(nt), e | op::tap(trigger)) |
op::for_each(each);
source.wait();
source.verifyDones();
source.verifyFinal();
trigger.wait();
trigger.verifyValues(0);
trigger.verifyDones();
trigger.verifyFinal();
each.wait();
each.verifyValues(0);
each.verifyDones();
each.verifyFinal();
}
TEST(ValueSourceValueTrigger, TakeUntil) {
auto nt = mi::new_thread();
auto v = op::flow_from(std::array<int, 1>{{42}}, mi::strands(nt));
FlowReceiverSignals source{"source"};
FlowReceiverSignals trigger{"trigger"};
ReceiverSignals each{"each"};
v | op::tap(source) |
op::take_until(mi::strands(nt), v | op::tap(trigger)) |
op::for_each(each);
source.wait();
source.verifyDones();
source.verifyFinal();
trigger.wait();
trigger.verifyValues(1);
trigger.verifyDones();
trigger.verifyFinal();
each.wait();
each.verifyValues(0);
each.verifyDones();
each.verifyFinal();
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment