Skip to content

Commit

Permalink
Add an internal receiver to v2::async_scope's nest op (#484)
Browse files Browse the repository at this point in the history
While writing `unifex::spawn_future()`, I discovered that waiting until
the destructor of the `v2::async_scope`'s `nest()` operation to drop the
scope reference is too late (it led to hangs).  This diff adds an
internal receiver to the nest operation so we can detect when the
operation is complete (which is likely before the operation state is
destroyed) and drop the reference as soon as we reach that state.
  • Loading branch information
ispeters authored Feb 1, 2023
1 parent 3c810a5 commit fb13829
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 29 deletions.
103 changes: 83 additions & 20 deletions include/unifex/v2/async_scope.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,55 +191,108 @@ struct _nest_op final {
template <typename Sender, typename Receiver>
using nest_op = typename _nest_op<Sender, Receiver>::type;

template <typename Sender, typename Receiver>
struct _nest_receiver final {
struct type;
};

template <typename Sender, typename Receiver>
using nest_receiver = typename _nest_receiver<Sender, Receiver>::type;

template <typename Sender, typename Receiver>
struct _nest_op<Sender, Receiver>::type final {
template <typename Sender2, typename Receiver2>
explicit type(Sender2&& s, Receiver2&& r, scope_reference&& scope) noexcept(
is_nothrow_connectable_v<Sender2, Receiver2>)
: scope_(std::move(scope)) {
std::is_nothrow_constructible_v<Receiver, Receiver2>&&
is_nothrow_connectable_v<Sender2, nest_receiver<Sender, Receiver>>)
: scope_(std::move(scope))
, receiver_(static_cast<Receiver2&&>(r)) {
UNIFEX_ASSERT(scope_);
activate_union_member_with(op_, [&]() {
return unifex::connect(
static_cast<Sender2&&>(s), static_cast<Receiver2&&>(r));
static_cast<Sender2&&>(s), nest_receiver<Sender, Receiver>{this});
});
}

explicit type(Receiver&& r) noexcept(
std::is_nothrow_move_constructible_v<Receiver>) {
activate_union_member(receiver_, std::move(r));
}
std::is_nothrow_move_constructible_v<Receiver>)
: receiver_(std::move(r)) {}

explicit type(const Receiver& r) noexcept(
std::is_nothrow_copy_constructible_v<Receiver>) {
activate_union_member(receiver_, r);
}
std::is_nothrow_copy_constructible_v<Receiver>)
: receiver_(r) {}

type(type&& op) = delete;

~type() {
if (scope_) {
deactivate_union_member(op_);
} else {
deactivate_union_member(receiver_);
op_.destruct();
}
}

friend void tag_invoke(tag_t<start>, type& op) noexcept {
if (op.scope_) {
unifex::start(op.op_.get());
} else {
unifex::set_done(std::move(op).receiver_.get());
unifex::set_done(std::move(op).receiver_);
}
}

private:
using op_t = connect_result_t<Sender, Receiver>;
using op_t = connect_result_t<Sender, nest_receiver<Sender, Receiver>>;

scope_reference scope_;
union {
UNIFEX_NO_UNIQUE_ADDRESS manual_lifetime<Receiver> receiver_;
UNIFEX_NO_UNIQUE_ADDRESS manual_lifetime<op_t> op_;
};
UNIFEX_NO_UNIQUE_ADDRESS Receiver receiver_;
UNIFEX_NO_UNIQUE_ADDRESS manual_lifetime<op_t> op_;
};

template <typename Sender, typename Receiver>
struct _nest_receiver<Sender, Receiver>::type final {
nest_op<Sender, Receiver>* op_;

template <typename... T>
void set_value(T... values) noexcept {
complete([&](auto&& receiver) noexcept {
UNIFEX_TRY {
unifex::set_value(std::move(receiver), std::move(values)...);
}
UNIFEX_CATCH(...) {
unifex::set_error(std::move(receiver), std::current_exception());
}
});
}

template <typename E>
void set_error(E e) noexcept {
complete([&](auto&& receiver) noexcept {
unifex::set_error(std::move(receiver), std::move(e));
});
}

void set_done() noexcept { complete(unifex::set_done); }

template <typename Func>
void complete(Func func) noexcept {
// save this->op_ into a local because we're about to destroy the current
// object, invalidating the this pointer
auto op = op_;

// keep a strong reference on the scope until this function returns
auto scope = std::move(op->scope_);

// we're done with the inner operation; note: this call destroys the current
// object, which is why we've saved this->op_ into a local
op->op_.destruct();

// from here the current object may be destroyed
func(std::move(op->receiver_));
}

template(typename CPO) //
(requires is_receiver_query_cpo_v<CPO>) //
friend auto tag_invoke(CPO&& cpo, const type& r) noexcept
-> decltype(std::move(cpo)(std::declval<const Receiver&>())) {
return std::move(cpo)(r.op_->receiver_);
}
};

template <typename Sender>
Expand All @@ -251,8 +304,18 @@ struct _nest_sender<Sender>::type final {
class Tuple>
using value_types = sender_value_types_t<Sender, Variant, Tuple>;

// we add exception_ptr to our predecessor's error_types because our
// set_value() catches exceptions thrown by our receiver's set_value() and
// passes them to our receiver's set_error.
//
// TODO: not all predecessors invoke our set_value() (e.g. just_error() and
// just_done()) so we actually only need this when our predecessor's
// value_types is non-empty but I don't know how to do that computation
// right now.
template <template <typename...> class Variant>
using error_types = sender_error_types_t<Sender, Variant>;
using error_types = typename concat_type_lists_unique_t<
sender_error_types_t<Sender, type_list>,
type_list<std::exception_ptr>>::template apply<Variant>;

static constexpr bool sends_done = true;

Expand Down
26 changes: 17 additions & 9 deletions test/async_scope_v2_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
#include <unifex/v2/async_scope.hpp>

#include <unifex/allocate.hpp>
#include <unifex/just.hpp>
#include <unifex/just_done.hpp>
#include <unifex/just_error.hpp>
Expand Down Expand Up @@ -184,8 +185,11 @@ TEST_F(

// values_types is just the nested sender's value_types
static_assert(std::is_same_v<value_types, variant<>>);
// error_types is the nested sender's error_types
static_assert(std::is_same_v<error_types, variant<int>>);
// error_types is the nested sender's error_types + std::exception_ptr
//
// TODO: we don't actually need std::exception_ptr here (see TODO in
// v2/async_scope.hpp)
static_assert(std::is_same_v<error_types, variant<int, std::exception_ptr>>);
// sends_done is always true because the sender completes with done if nesting
// fails
static_assert(sender_t::sends_done);
Expand All @@ -208,8 +212,11 @@ TEST_F(async_scope_v2_test, nest_of_just_done_has_expected_static_properties) {

// values_types is just the nested sender's value_types
static_assert(std::is_same_v<value_types, variant<>>);
// error_types is the nested sender's error_types
static_assert(std::is_same_v<error_types, variant<>>);
// error_types is the nested sender's error_types + std::exception_ptr
//
// TODO: we don't actually need std::exception_ptr here (see TODO in
// v2/async_scope.hpp)
static_assert(std::is_same_v<error_types, variant<std::exception_ptr>>);
// sends_done is always true because the sender completes with done if nesting
// fails
static_assert(sender_t::sends_done);
Expand All @@ -233,13 +240,13 @@ TEST_F(
using just_sender_t = decltype(unifex::just(newtype{}));

// MSVC incorrectly fails these assertions
#ifndef _MSC_VER
# ifndef _MSC_VER
// just_sender_t has a throwing move constructor...
static_assert(!noexcept(scope.nest(std::declval<just_sender_t>())));

// ...and a throwing copy constructor
static_assert(!noexcept(scope.nest(std::declval<just_sender_t&>())));
#endif
# endif
auto sender = scope.nest(unifex::just(newtype{}));

using sender_t = decltype(sender);
Expand All @@ -254,10 +261,10 @@ TEST_F(
// fails
static_assert(sender_t::sends_done);
// MSVC incorrectly fails these assertions
#ifndef _MSC_VER
# ifndef _MSC_VER
static_assert(!std::is_nothrow_move_constructible_v<sender_t>);
static_assert(!std::is_nothrow_copy_constructible_v<sender_t>);
#endif
# endif
}
#endif

Expand Down Expand Up @@ -434,7 +441,8 @@ TEST_F(
// TODO: factor this in terms of let_error so we can check this logic even
// when exceptions are disabled
try {
unifex::sync_wait(scope.nest(unifex::just_error(42)));
// allocate the nested sender to help catch lifetime bugs with ASAN
unifex::sync_wait(scope.nest(unifex::allocate(unifex::just_error(42))));
} catch (int i) {
EXPECT_EQ(42, i);
} catch (...) {
Expand Down
23 changes: 23 additions & 0 deletions test/nest_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,4 +193,27 @@ TEST(nest_test, nest_has_the_expected_noexcept_clause) {
}
#endif // !defined(__GNUC__) || __GNUC__ > 9

TEST(nest_test, nest_operation_drops_scope_reference_on_completion) {
struct receiver {
void set_value() noexcept {}
void set_error(std::exception_ptr) noexcept {}
void set_done() noexcept {}
};

unifex::v2::async_scope scope;

{
auto op = unifex::connect(unifex::nest(unifex::just(), scope), receiver{});

EXPECT_EQ(scope.use_count(), 1);

unifex::start(op);

// the operation is fully synchronous so it's done by now
EXPECT_EQ(scope.use_count(), 0);
}

unifex::sync_wait(scope.join());
}

} // namespace

0 comments on commit fb13829

Please sign in to comment.