Commit c5555259 authored by Kirk Shoop's avatar Kirk Shoop Committed by Facebook Github Bot

add benchmarks for flow and many

fbshipit-source-id: 46b51ab9bbfb6903a07cd0e5ed006a8dd081975f
parent f5ee09a3
...@@ -9,12 +9,18 @@ ...@@ -9,12 +9,18 @@
#include "pushmi/trampoline.h" #include "pushmi/trampoline.h"
#include "pushmi/new_thread.h" #include "pushmi/new_thread.h"
#include "pushmi/none.h"
#include "pushmi/flow_single.h"
#include "pushmi/flow_single_deferred.h"
#include "pushmi/entangle.h"
#include "pool.h" #include "pool.h"
using namespace pushmi::aliases; using namespace pushmi::aliases;
struct countdownsingle { template<class R>
countdownsingle(int& c) struct countdown {
countdown(int& c)
: counter(&c) {} : counter(&c) {}
int* counter; int* counter;
...@@ -23,14 +29,18 @@ struct countdownsingle { ...@@ -23,14 +29,18 @@ struct countdownsingle {
void operator()(ExecutorRef exec) const; void operator()(ExecutorRef exec) const;
}; };
template<class R>
template <class ExecutorRef> template <class ExecutorRef>
void countdownsingle::operator()(ExecutorRef exec) const { void countdown<R>::operator()(ExecutorRef exec) const {
if (--*counter > 0) { if (--*counter > 0) {
//exec | op::submit(mi::make_single(*this)); exec | op::submit(R{}(*this));
exec | op::submit(mi::single<countdownsingle, mi::abortEF, mi::ignoreDF>{*this});
} }
} }
using countdownsingle = countdown<mi::make_single_fn>;
using countdownflowsingle = countdown<mi::make_flow_single_fn>;
using countdownmany = countdown<mi::make_many_fn>;
struct inline_executor { struct inline_executor {
using properties = mi::property_set<mi::is_sender<>, mi::is_single<>>; using properties = mi::property_set<mi::is_sender<>, mi::is_single<>>;
template<class Out> template<class Out>
...@@ -39,11 +49,62 @@ struct inline_executor { ...@@ -39,11 +49,62 @@ struct inline_executor {
} }
}; };
struct inline_executor_flow_single {
using properties = mi::property_set<mi::is_sender<>, mi::is_flow<>, mi::is_single<>>;
template<class Out>
void submit(Out out) {
// boolean cancellation
bool stop = false;
auto set_stop = [](auto& e) {
auto stop = e.lockPointerToDual();
if (!!stop) {
*stop = true;
}
e.unlockPointerToDual();
};
auto tokens = mi::entangle(stop, set_stop);
using Stopper = decltype(tokens.second);
struct Data : mi::none<> {
explicit Data(Stopper stopper) : stopper(std::move(stopper)) {}
Stopper stopper;
};
auto up = mi::MAKE(none)(
Data{std::move(tokens.second)},
[](auto& data, auto e) noexcept {
data.stopper.t(data.stopper);
},
[](auto& data) {
data.stopper.t(data.stopper);
});
// pass reference for cancellation.
::mi::set_starting(out, std::move(up));
if (!tokens.first.t) {
::mi::set_value(out, *this);
} else {
// cancellation is not an error
::mi::set_done(out);
}
}
};
struct inline_executor_many {
using properties = mi::property_set<mi::is_sender<>, mi::is_many<>>;
template<class Out>
void submit(Out out) {
::mi::set_next(out, *this);
::mi::set_done(out);
}
};
#define concept Concept #define concept Concept
#include <nonius/nonius.h++> #include <nonius/nonius.h++>
NONIUS_BENCHMARK("inline 10", [](nonius::chronometer meter){ NONIUS_BENCHMARK("inline 10 single", [](nonius::chronometer meter){
int counter = 0; int counter = 0;
auto ie = inline_executor{}; auto ie = inline_executor{};
using IE = decltype(ie); using IE = decltype(ie);
...@@ -55,6 +116,30 @@ NONIUS_BENCHMARK("inline 10", [](nonius::chronometer meter){ ...@@ -55,6 +116,30 @@ NONIUS_BENCHMARK("inline 10", [](nonius::chronometer meter){
}); });
}) })
NONIUS_BENCHMARK("inline 10 flow_single", [](nonius::chronometer meter){
int counter = 0;
auto ie = inline_executor_flow_single{};
using IE = decltype(ie);
countdownflowsingle flowsingle{counter};
meter.measure([&]{
counter = 10;
ie | op::submit(mi::make_flow_single(on_value(flowsingle)));
return counter;
});
})
NONIUS_BENCHMARK("inline 10 many", [](nonius::chronometer meter){
int counter = 0;
auto ie = inline_executor_many{};
using IE = decltype(ie);
countdownmany many{counter};
meter.measure([&]{
counter = 10;
ie | op::submit(mi::make_many(many));
return counter;
});
})
NONIUS_BENCHMARK("trampoline static derecursion 10", [](nonius::chronometer meter){ NONIUS_BENCHMARK("trampoline static derecursion 10", [](nonius::chronometer meter){
int counter = 0; int counter = 0;
auto tr = mi::trampoline(); auto tr = mi::trampoline();
......
...@@ -3,29 +3,40 @@ target_include_directories(catch PUBLIC ...@@ -3,29 +3,40 @@ target_include_directories(catch PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../external/Catch2/single_include> $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../external/Catch2/single_include>
$<INSTALL_INTERFACE:$<INSTALL_PREFIX>/include/Catch2>) $<INSTALL_INTERFACE:$<INSTALL_PREFIX>/include/Catch2>)
include(../external/Catch2/contrib/Catch.cmake)
if(NOT ${CMAKE_VERSION} VERSION_LESS "3.10.0")
add_executable(FlowTest FlowTest.cpp) add_executable(FlowTest FlowTest.cpp)
target_link_libraries(FlowTest pushmi catch Threads::Threads) target_link_libraries(FlowTest pushmi catch Threads::Threads)
catch_discover_tests(FlowTest)
add_executable(CompileTest CompileTest.cpp) add_executable(CompileTest CompileTest.cpp)
target_link_libraries(CompileTest pushmi catch Threads::Threads) target_link_libraries(CompileTest pushmi catch Threads::Threads)
# catch_discover_tests(CompileTest)
add_executable(NewThreadTest NewThreadTest.cpp) add_executable(NewThreadTest NewThreadTest.cpp)
target_link_libraries(NewThreadTest pushmi catch Threads::Threads) target_link_libraries(NewThreadTest pushmi catch Threads::Threads)
catch_discover_tests(NewThreadTest)
add_executable(TrampolineTest TrampolineTest.cpp) add_executable(TrampolineTest TrampolineTest.cpp)
target_link_libraries(TrampolineTest pushmi catch Threads::Threads) target_link_libraries(TrampolineTest pushmi catch Threads::Threads)
catch_discover_tests(TrampolineTest)
add_executable(PushmiTest PushmiTest.cpp) add_executable(PushmiTest PushmiTest.cpp)
target_link_libraries(PushmiTest pushmi catch Threads::Threads) target_link_libraries(PushmiTest pushmi catch Threads::Threads)
catch_discover_tests(PushmiTest)
if(NOT ${CMAKE_VERSION} VERSION_LESS "3.10.0") else()
include(../external/Catch2/contrib/Catch.cmake) add_executable(PushmiTest PushmiTest.cpp
catch_discover_tests(FlowTest) CompileTest.cpp
# catch_discover_tests(CompileTest) TrampolineTest.cpp
catch_discover_tests(NewThreadTest) NewThreadTest.cpp
catch_discover_tests(TrampolineTest) FlowTest.cpp)
target_link_libraries(PushmiTest pushmi catch Threads::Threads)
catch_discover_tests(PushmiTest) catch_discover_tests(PushmiTest)
endif() endif()
...@@ -273,6 +273,7 @@ SCENARIO("flow single cancellation new thread", "[flow][deferred]") { ...@@ -273,6 +273,7 @@ SCENARIO("flow single cancellation new thread", "[flow][deferred]") {
}); });
WHEN("submit is applied and cancels the producer early") { WHEN("submit is applied and cancels the producer early") {
{
f | f |
op::blocking_submit( op::blocking_submit(
mi::on_value([&](int) { signals += 100; }), mi::on_value([&](int) { signals += 100; }),
...@@ -287,6 +288,10 @@ SCENARIO("flow single cancellation new thread", "[flow][deferred]") { ...@@ -287,6 +288,10 @@ SCENARIO("flow single cancellation new thread", "[flow][deferred]") {
::mi::set_done(up); ::mi::set_done(up);
}); });
})); }));
}
// make sure that the completion signal arrives
std::this_thread::sleep_for(100ms);
THEN( THEN(
"the starting, up.done and out.done signals are each recorded once") { "the starting, up.done and out.done signals are each recorded once") {
...@@ -295,6 +300,7 @@ SCENARIO("flow single cancellation new thread", "[flow][deferred]") { ...@@ -295,6 +300,7 @@ SCENARIO("flow single cancellation new thread", "[flow][deferred]") {
} }
WHEN("submit is applied and cancels the producer late") { WHEN("submit is applied and cancels the producer late") {
{
f | f |
op::blocking_submit( op::blocking_submit(
mi::on_value([&](int) { signals += 100; }), mi::on_value([&](int) { signals += 100; }),
...@@ -309,6 +315,7 @@ SCENARIO("flow single cancellation new thread", "[flow][deferred]") { ...@@ -309,6 +315,7 @@ SCENARIO("flow single cancellation new thread", "[flow][deferred]") {
::mi::set_done(up); ::mi::set_done(up);
}); });
})); }));
}
std::this_thread::sleep_for(100ms); std::this_thread::sleep_for(100ms);
...@@ -328,6 +335,7 @@ SCENARIO("flow single cancellation new thread", "[flow][deferred]") { ...@@ -328,6 +335,7 @@ SCENARIO("flow single cancellation new thread", "[flow][deferred]") {
signals = 0; signals = 0;
// set completion time to be in 100ms // set completion time to be in 100ms
at = nt.now() + 100ms; at = nt.now() + 100ms;
{
f | f |
op::blocking_submit( op::blocking_submit(
mi::on_value([&](int) { signals += 100; }), mi::on_value([&](int) { signals += 100; }),
...@@ -340,6 +348,7 @@ SCENARIO("flow single cancellation new thread", "[flow][deferred]") { ...@@ -340,6 +348,7 @@ SCENARIO("flow single cancellation new thread", "[flow][deferred]") {
::mi::set_done(up); ::mi::set_done(up);
}); });
})); }));
}
// make sure any cancellation signal has completed // make sure any cancellation signal has completed
std::this_thread::sleep_for(10ms); std::this_thread::sleep_for(10ms);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment