Skip to content

Commit

Permalink
✨ Add incite_on
Browse files Browse the repository at this point in the history
Problem:
- When a sender incites a scheduler that comes after it, `continue_on` is the
  wrong thing to use.

Solution:
- Add `incite_on` which entails a sender that causes a scheduler to proceed.
  • Loading branch information
elbeno committed Oct 8, 2024
1 parent c33c268 commit 87eaf5c
Show file tree
Hide file tree
Showing 10 changed files with 416 additions and 0 deletions.
45 changes: 45 additions & 0 deletions docs/sender_adaptors.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,51 @@ auto transferred = async::then(t, [] (int i) { return std::to_string(i); });
// then on s2 it will convert 42 to a string, producing "42"
----

WARNING: If the upstream sender _causes_ the downstream scheduler to run, using
`continue_on` is incorrect: xref:sender_adaptors.adoc#_incite_on[`incite_on`]
should be used instead.

=== `incite_on`

Found in the header: `async/incite_on.hpp`

`incite_on` is like xref:sender_adaptors.adoc#_continue_on[`continue_on`] but is
for the use case where running a sender causes a scheduler to be triggered. It
is intended for use with xref:schedulers.adoc#_trigger_scheduler[`trigger_scheduler`].

[source,cpp]
----
auto sndr = async::just([] { send_request(); });
| async::incite_on(trigger_scheduler<"msg">{})
| async::then([] (auto msg) { /* handle response */ });
auto on_recv_message(auto msg) {
async::run_triggers<"msg">(msg);
}
// when sndr runs:
// send_message will send a request
// time passes...
// when the response is received, its handler calls run_triggers
// sndr continues on the trigger scheduler
----

The sender upstream of `incite_on` must complete by sending a function. When
that function is called it will in some way cause the downstream scheduled
sender to run. This may happen indirectly (e.g. as above, via another
asynchronous mechanism like message reception), or directly. The upstream sender
must complete successfully in one way -- with this function -- although it may
still send an error, or be cancelled.

`continue_on` would be incorrect in this circumstance, because once the `just`
completes and sends a message, `continue_on(trigger_scheduler{})` is racing with
message reception. If the message is received before the `trigger_scheduler` is
ready to fire, the trigger would be missed.

NOTE: The incited scheduler must produce a sender which completes
asynchronously. A synchronous scheduler would require no incitement, and
`continue_on` would be correct.

=== `let_error`

Found in the header: `async/let_error.hpp`
Expand Down
4 changes: 4 additions & 0 deletions docs/synopsis.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ in pipe-composition syntax.
==== https://github.com/intel/cpp-baremetal-senders-and-receivers/blob/main/include/async/get_scheduler.hpp[get_scheduler.hpp]
* `get_scheduler` - a query used to retrieve a scheduler from a sender's attributes

==== https://github.com/intel/cpp-baremetal-senders-and-receivers/blob/main/include/async/incite_on.hpp[incite_on.hpp]
* `incite_on` - a xref:sender_adaptors.adoc#_incite_on[sender adaptor] that incites execution on another scheduler

==== https://github.com/intel/cpp-baremetal-senders-and-receivers/blob/main/include/async/just.hpp[just.hpp]
* `just` - a xref:sender_factories.adoc#_just[sender factory] that sends on the value channel
* `just_error` - a xref:sender_factories.adoc#_just_error[sender factory] that sends on the error channel
Expand Down Expand Up @@ -220,6 +223,7 @@ contains traits and metaprogramming constructs used by many senders.
* xref:environments.adoc#_environments[`get_env`] - https://github.com/intel/cpp-baremetal-senders-and-receivers/blob/main/include/async/env.hpp[`#include <async/env.hpp>`]
* `get_scheduler` - https://github.com/intel/cpp-baremetal-senders-and-receivers/blob/main/include/async/read_env.hpp[`#include <async/read_env.hpp>`]
* `get_stop_token` - https://github.com/intel/cpp-baremetal-senders-and-receivers/blob/main/include/async/read_env.hpp[`#include <async/read_env.hpp>`]
* xref:sender_adaptors.adoc#_incite_on[`incite_on`] - https://github.com/intel/cpp-baremetal-senders-and-receivers/blob/main/include/async/incite_on.hpp[`#include <async/incite_on.hpp>`]
* xref:debug.adoc#_handling_a_debug_signal[`injected_debug_handler<>`] - https://github.com/intel/cpp-baremetal-senders-and-receivers/blob/main/include/async/debug.hpp[`#include <async/debug.hpp>`]
* `injected_task_manager<>` - https://github.com/intel/cpp-baremetal-senders-and-receivers/blob/main/include/async/schedulers/task_manager_interface.hpp[`#include <async/schedulers/task_manager_interface.hpp>`]
* `injected_timer_manager<>` - https://github.com/intel/cpp-baremetal-senders-and-receivers/blob/main/include/async/schedulers/timer_manager_interface.hpp[`#include <async/schedulers/timer_manager_interface.hpp>`]
Expand Down
1 change: 1 addition & 0 deletions include/async/continue_on.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <async/compose.hpp>
#include <async/concepts.hpp>
#include <async/just.hpp>
#include <async/let_value.hpp>
Expand Down
183 changes: 183 additions & 0 deletions include/async/incite_on.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
#pragma once

#include <async/completes_synchronously.hpp>
#include <async/completion_tags.hpp>
#include <async/compose.hpp>
#include <async/concepts.hpp>
#include <async/connect.hpp>
#include <async/debug.hpp>
#include <async/env.hpp>
#include <async/type_traits.hpp>

#include <stdx/concepts.hpp>
#include <stdx/ct_string.hpp>
#include <stdx/functional.hpp>
#include <stdx/tuple.hpp>

#include <concepts>
#include <type_traits>
#include <utility>
#include <variant>

namespace async {
namespace _incite_on {
template <typename Ops, typename Rcvr> struct receiver {
using is_receiver = void;

Ops *ops;

[[nodiscard]] constexpr auto
query(async::get_env_t) const -> forwarding_env<env_of_t<Rcvr>> {
return forward_env_of(ops->rcvr);
}

template <typename F> auto set_value(F &&f) const && -> void {
static_assert(stdx::invocable<std::remove_cvref_t<F>>,
"Sender passed to incite_on must send a function "
"that incites the scheduler!");
ops->complete_first(std::forward<F>(f));
}
template <typename... Args>
auto set_error(Args &&...args) const && -> void {
ops->template passthrough<set_error_t>(std::forward<Args>(args)...);
}
auto set_stopped() const && -> void {
ops->template passthrough<set_stopped_t>();
}
};

template <typename S, typename Tag, typename E>
concept single_sender =
stdx::tuple_size_v<typename async::detail::gather_signatures<
Tag, completion_signatures_of_t<S, E>, stdx::tuple, stdx::tuple>> <= 1;

template <typename Sched, typename Rcvr, typename Sndr>
// NOLINTNEXTLINE(cppcoreguidelines-special-member-functions)
struct op_state {
using first_rcvr = receiver<op_state, Rcvr>;
static_assert(single_sender<Sndr, set_value_t, env_of_t<Rcvr>>,
"Sender passed to incite_on must send a single function "
"that incites the scheduler!");

template <stdx::same_as_unqualified<Sched> Sch,
stdx::same_as_unqualified<Sndr> S, typename R>
constexpr op_state(Sch &&sch, S &&s, R &&r)
: sched{std::forward<Sch>(sch)}, rcvr{std::forward<R>(r)},
state{std::in_place_index<0>, stdx::with_result_of{[&] {
return connect(std::forward<S>(s), first_rcvr{this});
}}} {}
constexpr op_state(op_state &&) = delete;

template <typename F> auto complete_first(F &&f) -> void {
debug_signal<set_value_t::name, debug::erased_context_for<op_state>>(
get_env(rcvr));
auto &op = state.template emplace<1>(stdx::with_result_of{
[&] { return connect(sched.schedule(), std::move(rcvr)); }});
async::start(op);
std::forward<F>(f)();
}

template <channel_tag Tag, typename... Args>
auto passthrough(Args &&...args) -> void {
debug_signal<Tag::name, debug::erased_context_for<op_state>>(
get_env(rcvr));
Tag{}(std::move(rcvr), std::forward<Args>(args)...);
}

constexpr auto start() & -> void {
debug_signal<"start", debug::erased_context_for<op_state>>(
get_env(rcvr));
async::start(std::get<0>(state));
}

[[no_unique_address]] Sched sched;
[[no_unique_address]] Rcvr rcvr;

using dependent_sender = decltype(sched.schedule());
using first_ops = connect_result_t<Sndr, first_rcvr>;
using second_ops = connect_result_t<dependent_sender, Rcvr>;
std::variant<first_ops, second_ops> state;
};

template <typename Sched, typename S> struct sender {
using is_sender = void;

[[no_unique_address]] Sched sched;
[[no_unique_address]] S s;

private:
using dependent_sender = decltype(sched.schedule());
static_assert(
not synchronous<dependent_sender>,
"The scheduler passed to incite_on cannot have a synchronous sender!");

template <typename Env>
using dependent_completions = stdx::conditional_t<
boost::mp11::mp_empty<value_signatures_of_t<S, Env>>::value,
completion_signatures<>,
completion_signatures_of_t<dependent_sender, Env>>;
template <typename Env>
using unchanged_completions =
boost::mp11::mp_append<error_signatures_of_t<S, Env>,
stopped_signatures_of_t<S, Env>>;

public:
template <async::receiver R>
[[nodiscard]] constexpr auto
connect(R &&r) && -> op_state<Sched, std::remove_cvref_t<R>, S> {
check_connect<sender &&, R>();
return {std::move(sched), std::move(s), std::forward<R>(r)};
}

template <async::receiver R>
requires multishot_sender<S> and std::copy_constructible<S> and
std::copy_constructible<Sched>
[[nodiscard]] constexpr auto
connect(R &&r) const & -> op_state<Sched, std::remove_cvref_t<R>, S> {
check_connect<sender, R>();
return {sched, s, std::forward<R>(r)};
}

template <typename Env>
[[nodiscard]] constexpr static auto get_completion_signatures(Env const &)
-> boost::mp11::mp_unique<boost::mp11::mp_append<
unchanged_completions<Env>, dependent_completions<Env>>> {
return {};
}
};

template <typename Sched> struct pipeable {
Sched sched;

private:
template <async::sender S, stdx::same_as_unqualified<pipeable> Self>
friend constexpr auto operator|(S &&s, Self &&self) -> async::sender auto {
return sender<Sched, std::remove_cvref_t<S>>{
std::forward<Self>(self).sched, std::forward<S>(s)};
}
};
} // namespace _incite_on

template <typename Sched>
[[nodiscard]] constexpr auto incite_on(Sched &&sched) {
return _compose::adaptor{_incite_on::pipeable<std::remove_cvref_t<Sched>>{
std::forward<Sched>(sched)}};
}

template <sender S, typename Sched>
[[nodiscard]] constexpr auto incite_on(S &&s, Sched &&sched) -> sender auto {
return std::forward<S>(s) | incite_on(std::forward<Sched>(sched));
}

struct incite_on_t;

template <typename... Ts>
struct debug::context_for<_incite_on::op_state<Ts...>> {
using tag = incite_on_t;
constexpr static auto name = stdx::ct_string{"incite_on"};
using type = _incite_on::op_state<Ts...>;
using children =
stdx::type_list<debug::erased_context_for<typename type::first_ops>,
debug::erased_context_for<typename type::second_ops>>;
};
} // namespace async
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ add_tests(
forwarding_query
freestanding_sync_wait
hosted_sync_wait
incite_on
just
just_error
just_error_result_of
Expand Down
5 changes: 5 additions & 0 deletions test/fail/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
add_compile_fail_test("incite_on_multiple_completions.cpp" LIBRARIES async
pthread)
add_compile_fail_test("incite_on_synchronous_scheduler.cpp" LIBRARIES async
pthread)
add_compile_fail_test("incite_on_without_function.cpp" LIBRARIES async pthread)
add_compile_fail_test("just_connect.cpp" LIBRARIES async pthread)
add_compile_fail_test("just_error_connect.cpp" LIBRARIES async pthread)
add_compile_fail_test("just_error_result_of_connect.cpp" LIBRARIES async
Expand Down
16 changes: 16 additions & 0 deletions test/fail/incite_on_multiple_completions.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#include <async/concepts.hpp>
#include <async/connect.hpp>
#include <async/incite_on.hpp>
#include <async/just.hpp>
#include <async/schedulers/trigger_scheduler.hpp>
#include <async/variant_sender.hpp>

// EXPECT: Sender passed to incite_on must send a single function

auto main() -> int {
auto const v = async::make_variant_sender(
true, [] { return async::just(42); }, [] { return async::just(1.0f); });
auto const s = async::incite_on(v, async::trigger_scheduler<"basic">{});
[[maybe_unused]] auto op =
async::connect(s, async::detail::universal_receiver{});
}
10 changes: 10 additions & 0 deletions test/fail/incite_on_synchronous_scheduler.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#include <async/incite_on.hpp>
#include <async/just.hpp>
#include <async/schedulers/inline_scheduler.hpp>

// EXPECT: The scheduler passed to incite_on cannot have a synchronous sender

auto main() -> int {
[[maybe_unused]] auto const s =
async::incite_on(async::just([] {}), async::inline_scheduler{});
}
12 changes: 12 additions & 0 deletions test/fail/incite_on_without_function.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#include <async/incite_on.hpp>
#include <async/just.hpp>
#include <async/schedulers/trigger_scheduler.hpp>
#include <async/sync_wait.hpp>

// EXPECT: Sender passed to incite_on must send a function

auto main() -> int {
auto const s =
async::incite_on(async::just(42), async::trigger_scheduler<"basic">{});
auto x = async::sync_wait(s);
}
Loading

0 comments on commit 87eaf5c

Please sign in to comment.