diff --git a/docs/sender_adaptors.adoc b/docs/sender_adaptors.adoc index 060cc2e..3b9881e 100644 --- a/docs/sender_adaptors.adoc +++ b/docs/sender_adaptors.adoc @@ -18,6 +18,51 @@ auto transferred = async::then(t, [] (int i) { return std::to_string(i); }); // then on s2 it will convert 42 to a string, producing "42" ---- +WARNING: If the upstream sender _causes_ the downstream scheduler to run, using +`continue_on` is incorrect: xref:sender_adaptors.adoc#_incite_on[`incite_on`] +should be used instead. + +=== `incite_on` + +Found in the header: `async/incite_on.hpp` + +`incite_on` is like xref:sender_adaptors.adoc#_continue_on[`continue_on`] but is +for the use case where running a sender causes a scheduler to be triggered. It +is intended for use with xref:schedulers.adoc#_trigger_scheduler[`trigger_scheduler`]. + +[source,cpp] +---- +auto sndr = async::just([] { send_request(); }); + | async::incite_on(trigger_scheduler<"msg">{}) + | async::then([] (auto msg) { /* handle response */ }); + +auto on_recv_message(auto msg) { + async::run_triggers<"msg">(msg); +} + +// when sndr runs: +// send_message will send a request +// time passes... +// when the response is received, its handler calls run_triggers +// sndr continues on the trigger scheduler +---- + +The sender upstream of `incite_on` must complete by sending a function. When +that function is called it will in some way cause the downstream scheduled +sender to run. This may happen indirectly (e.g. as above, via another +asynchronous mechanism like message reception), or directly. The upstream sender +must complete successfully in one way -- with this function -- although it may +still send an error, or be cancelled. + +`continue_on` would be incorrect in this circumstance, because once the `just` +completes and sends a message, `continue_on(trigger_scheduler{})` is racing with +message reception. If the message is received before the `trigger_scheduler` is +ready to fire, the trigger would be missed. + +NOTE: The incited scheduler must produce a sender which completes +asynchronously. A synchronous scheduler would require no incitement, and +`continue_on` would be correct. + === `let_error` Found in the header: `async/let_error.hpp` diff --git a/docs/synopsis.adoc b/docs/synopsis.adoc index 267f1e5..a142e1f 100644 --- a/docs/synopsis.adoc +++ b/docs/synopsis.adoc @@ -58,6 +58,9 @@ in pipe-composition syntax. ==== https://github.com/intel/cpp-baremetal-senders-and-receivers/blob/main/include/async/get_scheduler.hpp[get_scheduler.hpp] * `get_scheduler` - a query used to retrieve a scheduler from a sender's attributes +==== https://github.com/intel/cpp-baremetal-senders-and-receivers/blob/main/include/async/incite_on.hpp[incite_on.hpp] +* `incite_on` - a xref:sender_adaptors.adoc#_incite_on[sender adaptor] that incites execution on another scheduler + ==== https://github.com/intel/cpp-baremetal-senders-and-receivers/blob/main/include/async/just.hpp[just.hpp] * `just` - a xref:sender_factories.adoc#_just[sender factory] that sends on the value channel * `just_error` - a xref:sender_factories.adoc#_just_error[sender factory] that sends on the error channel @@ -220,6 +223,7 @@ contains traits and metaprogramming constructs used by many senders. * xref:environments.adoc#_environments[`get_env`] - https://github.com/intel/cpp-baremetal-senders-and-receivers/blob/main/include/async/env.hpp[`#include `] * `get_scheduler` - https://github.com/intel/cpp-baremetal-senders-and-receivers/blob/main/include/async/read_env.hpp[`#include `] * `get_stop_token` - https://github.com/intel/cpp-baremetal-senders-and-receivers/blob/main/include/async/read_env.hpp[`#include `] +* xref:sender_adaptors.adoc#_incite_on[`incite_on`] - https://github.com/intel/cpp-baremetal-senders-and-receivers/blob/main/include/async/incite_on.hpp[`#include `] * xref:debug.adoc#_handling_a_debug_signal[`injected_debug_handler<>`] - https://github.com/intel/cpp-baremetal-senders-and-receivers/blob/main/include/async/debug.hpp[`#include `] * `injected_task_manager<>` - https://github.com/intel/cpp-baremetal-senders-and-receivers/blob/main/include/async/schedulers/task_manager_interface.hpp[`#include `] * `injected_timer_manager<>` - https://github.com/intel/cpp-baremetal-senders-and-receivers/blob/main/include/async/schedulers/timer_manager_interface.hpp[`#include `] diff --git a/include/async/continue_on.hpp b/include/async/continue_on.hpp index 1346286..5b994e8 100644 --- a/include/async/continue_on.hpp +++ b/include/async/continue_on.hpp @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include diff --git a/include/async/incite_on.hpp b/include/async/incite_on.hpp new file mode 100644 index 0000000..872ea85 --- /dev/null +++ b/include/async/incite_on.hpp @@ -0,0 +1,183 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace async { +namespace _incite_on { +template struct receiver { + using is_receiver = void; + + Ops *ops; + + [[nodiscard]] constexpr auto + query(async::get_env_t) const -> forwarding_env> { + return forward_env_of(ops->rcvr); + } + + template auto set_value(F &&f) const && -> void { + static_assert(stdx::invocable>, + "Sender passed to incite_on must send a function " + "that incites the scheduler!"); + ops->complete_first(std::forward(f)); + } + template + auto set_error(Args &&...args) const && -> void { + ops->template passthrough(std::forward(args)...); + } + auto set_stopped() const && -> void { + ops->template passthrough(); + } +}; + +template +concept single_sender = + stdx::tuple_size_v, stdx::tuple, stdx::tuple>> <= 1; + +template +// NOLINTNEXTLINE(cppcoreguidelines-special-member-functions) +struct op_state { + using first_rcvr = receiver; + static_assert(single_sender>, + "Sender passed to incite_on must send a single function " + "that incites the scheduler!"); + + template Sch, + stdx::same_as_unqualified S, typename R> + constexpr op_state(Sch &&sch, S &&s, R &&r) + : sched{std::forward(sch)}, rcvr{std::forward(r)}, + state{std::in_place_index<0>, stdx::with_result_of{[&] { + return connect(std::forward(s), first_rcvr{this}); + }}} {} + constexpr op_state(op_state &&) = delete; + + template auto complete_first(F &&f) -> void { + debug_signal>( + get_env(rcvr)); + auto &op = state.template emplace<1>(stdx::with_result_of{ + [&] { return connect(sched.schedule(), std::move(rcvr)); }}); + async::start(op); + std::forward(f)(); + } + + template + auto passthrough(Args &&...args) -> void { + debug_signal>( + get_env(rcvr)); + Tag{}(std::move(rcvr), std::forward(args)...); + } + + constexpr auto start() & -> void { + debug_signal<"start", debug::erased_context_for>( + get_env(rcvr)); + async::start(std::get<0>(state)); + } + + [[no_unique_address]] Sched sched; + [[no_unique_address]] Rcvr rcvr; + + using dependent_sender = decltype(sched.schedule()); + using first_ops = connect_result_t; + using second_ops = connect_result_t; + std::variant state; +}; + +template struct sender { + using is_sender = void; + + [[no_unique_address]] Sched sched; + [[no_unique_address]] S s; + + private: + using dependent_sender = decltype(sched.schedule()); + static_assert( + not synchronous, + "The scheduler passed to incite_on cannot have a synchronous sender!"); + + template + using dependent_completions = stdx::conditional_t< + boost::mp11::mp_empty>::value, + completion_signatures<>, + completion_signatures_of_t>; + template + using unchanged_completions = + boost::mp11::mp_append, + stopped_signatures_of_t>; + + public: + template + [[nodiscard]] constexpr auto + connect(R &&r) && -> op_state, S> { + check_connect(); + return {std::move(sched), std::move(s), std::forward(r)}; + } + + template + requires multishot_sender and std::copy_constructible and + std::copy_constructible + [[nodiscard]] constexpr auto + connect(R &&r) const & -> op_state, S> { + check_connect(); + return {sched, s, std::forward(r)}; + } + + template + [[nodiscard]] constexpr static auto get_completion_signatures(Env const &) + -> boost::mp11::mp_unique, dependent_completions>> { + return {}; + } +}; + +template struct pipeable { + Sched sched; + + private: + template Self> + friend constexpr auto operator|(S &&s, Self &&self) -> async::sender auto { + return sender>{ + std::forward(self).sched, std::forward(s)}; + } +}; +} // namespace _incite_on + +template +[[nodiscard]] constexpr auto incite_on(Sched &&sched) { + return _compose::adaptor{_incite_on::pipeable>{ + std::forward(sched)}}; +} + +template +[[nodiscard]] constexpr auto incite_on(S &&s, Sched &&sched) -> sender auto { + return std::forward(s) | incite_on(std::forward(sched)); +} + +struct incite_on_t; + +template +struct debug::context_for<_incite_on::op_state> { + using tag = incite_on_t; + constexpr static auto name = stdx::ct_string{"incite_on"}; + using type = _incite_on::op_state; + using children = + stdx::type_list, + debug::erased_context_for>; +}; +} // namespace async diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 9f89407..57ef80a 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -28,6 +28,7 @@ add_tests( forwarding_query freestanding_sync_wait hosted_sync_wait + incite_on just just_error just_error_result_of diff --git a/test/fail/CMakeLists.txt b/test/fail/CMakeLists.txt index c47d734..698abc2 100644 --- a/test/fail/CMakeLists.txt +++ b/test/fail/CMakeLists.txt @@ -1,3 +1,8 @@ +add_compile_fail_test("incite_on_multiple_completions.cpp" LIBRARIES async + pthread) +add_compile_fail_test("incite_on_synchronous_scheduler.cpp" LIBRARIES async + pthread) +add_compile_fail_test("incite_on_without_function.cpp" LIBRARIES async pthread) add_compile_fail_test("just_connect.cpp" LIBRARIES async pthread) add_compile_fail_test("just_error_connect.cpp" LIBRARIES async pthread) add_compile_fail_test("just_error_result_of_connect.cpp" LIBRARIES async diff --git a/test/fail/incite_on_multiple_completions.cpp b/test/fail/incite_on_multiple_completions.cpp new file mode 100644 index 0000000..9913a37 --- /dev/null +++ b/test/fail/incite_on_multiple_completions.cpp @@ -0,0 +1,16 @@ +#include +#include +#include +#include +#include +#include + +// EXPECT: Sender passed to incite_on must send a single function + +auto main() -> int { + auto const v = async::make_variant_sender( + true, [] { return async::just(42); }, [] { return async::just(1.0f); }); + auto const s = async::incite_on(v, async::trigger_scheduler<"basic">{}); + [[maybe_unused]] auto op = + async::connect(s, async::detail::universal_receiver{}); +} diff --git a/test/fail/incite_on_synchronous_scheduler.cpp b/test/fail/incite_on_synchronous_scheduler.cpp new file mode 100644 index 0000000..a4513b9 --- /dev/null +++ b/test/fail/incite_on_synchronous_scheduler.cpp @@ -0,0 +1,10 @@ +#include +#include +#include + +// EXPECT: The scheduler passed to incite_on cannot have a synchronous sender + +auto main() -> int { + [[maybe_unused]] auto const s = + async::incite_on(async::just([] {}), async::inline_scheduler{}); +} diff --git a/test/fail/incite_on_without_function.cpp b/test/fail/incite_on_without_function.cpp new file mode 100644 index 0000000..9149cb3 --- /dev/null +++ b/test/fail/incite_on_without_function.cpp @@ -0,0 +1,12 @@ +#include +#include +#include +#include + +// EXPECT: Sender passed to incite_on must send a function + +auto main() -> int { + auto const s = + async::incite_on(async::just(42), async::trigger_scheduler<"basic">{}); + auto x = async::sync_wait(s); +} diff --git a/test/incite_on.cpp b/test/incite_on.cpp new file mode 100644 index 0000000..b047c98 --- /dev/null +++ b/test/incite_on.cpp @@ -0,0 +1,139 @@ +#include "detail/common.hpp" + +#include +#include +#include +#include +#include +#include + +#include + +#include +#include + +#include +#include +#include +#include + +TEST_CASE("incite_on basic functionality", "[incite_on]") { + int value{}; + auto const s = + async::incite_on(async::just([] { async::run_triggers<"basic">(); }), + async::trigger_scheduler<"basic">{}); + auto op = async::connect(s, receiver{[&] { value = 42; }}); + async::start(op); + CHECK(value == 42); +} + +TEST_CASE("incite_on error", "[incite_on]") { + int value{}; + auto const s = async::incite_on(async::just_error(42), + async::trigger_scheduler<"error">{}); + auto op = async::connect(s, error_receiver{[&](auto i) { value = i; }}); + async::start(op); + CHECK(value == 42); +} + +TEST_CASE("incite_on stopped", "[incite_on]") { + int value{}; + auto const s = async::incite_on(async::just_stopped(), + async::trigger_scheduler<"stopped">{}); + auto op = async::connect(s, stopped_receiver{[&] { value = 42; }}); + async::start(op); + CHECK(value == 42); +} + +TEST_CASE("incite_on send a value with trigger", "[incite_on]") { + int value{}; + auto const s = + async::incite_on(async::just([] { async::run_triggers<"value">(42); }), + async::trigger_scheduler<"value", int>{}); + auto op = async::connect(s, receiver{[&](auto i) { value = i; }}); + async::start(op); + CHECK(value == 42); +} + +TEST_CASE("incite_on advertises what it sends", "[incite_on]") { + [[maybe_unused]] auto const s = + async::incite_on(async::just([] { async::run_triggers<"basic">(); }), + async::trigger_scheduler<"basic">{}); + static_assert(async::sender_of); +} + +TEST_CASE("incite_on advertises pass-throughs", "[incite_on]") { + [[maybe_unused]] auto s = async::incite_on( + async::just_error(17), async::trigger_scheduler<"error">{}); + static_assert(async::sender_of); +} + +TEST_CASE("incite_on is pipeable", "[incite_on]") { + int value{}; + auto s = async::just([] { async::run_triggers<"pipeable">(); }) | + async::incite_on(async::trigger_scheduler<"pipeable">{}); + auto op = async::connect(s, receiver{[&] { value = 42; }}); + async::start(op); + CHECK(value == 42); +} + +TEST_CASE("incite_on is adaptor-pipeable", "[incite_on]") { + int value{}; + auto a = async::then([] { + return [] { async::run_triggers<"adaptor-pipeable">(); }; + }) | + async::incite_on(async::trigger_scheduler<"adaptor-pipeable">{}); + auto op = async::connect(async::just() | a, receiver{[&] { value = 42; }}); + async::start(op); + CHECK(value == 42); +} + +TEST_CASE("move-only first sender", "[incite_on]") { + int value{}; + auto s = + async::just([mo = move_only(17)] { async::run_triggers<"mo">(); }) | + async::incite_on(async::trigger_scheduler<"mo">{}); + static_assert(async::singleshot_sender); + auto op = async::connect(std::move(s), receiver{[&] { value = 42; }}); + async::start(op); + CHECK(value == 42); +} + +TEST_CASE("incite_on does not complete synchronously", "[incite_on]") { + auto const s = async::just([] { async::run_triggers<"sync">(); }) | + async::incite_on(async::trigger_scheduler<"sync">{}); + static_assert(not async::synchronous); +} + +namespace { +std::vector debug_events{}; + +struct debug_handler { + template + constexpr auto signal(auto &&...) { + using namespace stdx::literals; + if constexpr (std::is_same_v, + async::incite_on_t>) { + debug_events.push_back( + fmt::format("{} {} {}", C, async::debug::name_of, S)); + } + } +}; +} // namespace + +template <> inline auto async::injected_debug_handler<> = debug_handler{}; + +TEST_CASE("incite_on can be debugged with a string", "[incite_on]") { + using namespace std::string_literals; + debug_events.clear(); + + auto const s = async::just([] { async::run_triggers<"debug">(); }) | + async::incite_on(async::trigger_scheduler<"debug">{}); + auto op = async::connect( + s, with_env{universal_receiver{}, + async::prop{async::get_debug_interface_t{}, + async::debug::named_interface<"op">{}}}); + async::start(op); + CHECK(debug_events == + std::vector{"op incite_on start"s, "op incite_on set_value"s}); +}