diff --git a/c_src/quicer_connection.c b/c_src/quicer_connection.c index 13630c93..18dd0de7 100644 --- a/c_src/quicer_connection.c +++ b/c_src/quicer_connection.c @@ -112,6 +112,8 @@ ERL_NIF_TERM parse_conn_event_mask(ErlNifEnv *env, ERL_NIF_TERM eoptions, QuicerConnCTX *c_ctx); +QUIC_STATUS selected_owner_unreachable(QuicerStreamCTX *s_ctx); + ERL_NIF_TERM peercert1(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[]) { @@ -1403,22 +1405,15 @@ handle_connection_event_peer_stream_started(QuicerConnCTX *c_ctx, ERL_NIF_TERM props_value[] = { enif_make_uint(env, Event->PEER_STREAM_STARTED.Flags), ATOM_BOOLEAN(is_orphan) }; + ERL_NIF_TERM eHandle = enif_make_copy(env, s_ctx->eHandle); - ERL_NIF_TERM report - = make_event_with_props(env, - ATOM_NEW_STREAM, - enif_make_copy(env, s_ctx->eHandle), - props_name, - props_value, - 2); + ERL_NIF_TERM report = make_event_with_props( + env, ATOM_NEW_STREAM, eHandle, props_name, props_value, 2); if (!enif_send(NULL, acc_pid, NULL, report)) { if (is_orphan) { - // Connection acceptor is dead - // we don't need to destroy acceptor, we don't have the ownwership - destroy_s_ctx(s_ctx); - return QUIC_STATUS_UNREACHABLE; + return selected_owner_unreachable(s_ctx); } else { @@ -1429,6 +1424,7 @@ handle_connection_event_peer_stream_started(QuicerConnCTX *c_ctx, AcceptorDestroy(acc); // Set is_orphan to true, connection owner takeover props_value[1] = ATOM_TRUE; + acc = AcceptorAlloc(); CxPlatCopyMemory(acc, c_ctx->owner, sizeof(ACCEPTOR)); s_ctx->owner = acc; @@ -1436,20 +1432,30 @@ handle_connection_event_peer_stream_started(QuicerConnCTX *c_ctx, acc->active = ACCEPTOR_RECV_MODE_PASSIVE; acc_pid = &(acc->Pid); - report = make_event_with_props(env, - ATOM_NEW_STREAM, - enif_make_copy(env, s_ctx->eHandle), - props_name, - props_value, - 2); + report = make_event_with_props( + env, ATOM_NEW_STREAM, eHandle, props_name, props_value, 2); if (!enif_send(NULL, acc_pid, NULL, report)) { - destroy_s_ctx(s_ctx); - return QUIC_STATUS_UNREACHABLE; + return selected_owner_unreachable(s_ctx); } } } + + int mon_res = enif_monitor_process(env, s_ctx, acc_pid, &(s_ctx->owner_mon)); + CXPLAT_FRE_ASSERTMSG(mon_res >= 0, "stream down callback must be defined!"); + if (mon_res == 0) + { + s_ctx->is_monitored = TRUE; + } + else // mon_res > 0 + { + // unlikely + // owner pid is dead, but message is sent + return selected_owner_unreachable(s_ctx); + } + s_ctx->is_closed = FALSE; + CXPLAT_FRE_ASSERTMSG(s_ctx, "s_ctx must be validate"); MsQuic->SetCallbackHandler( Event->PEER_STREAM_STARTED.Stream, stream_callback, s_ctx); return QUIC_STATUS_SUCCESS; @@ -1840,6 +1846,19 @@ put_conn_handles(ErlNifEnv *env, ERL_NIF_TERM conn_handles) } } +QUIC_STATUS +selected_owner_unreachable(QuicerStreamCTX *s_ctx) +{ + // + // s_ctx ownership transfer failed + // There is no shared ownership, we must destroy the s_ctx here + // + s_ctx->is_closed = TRUE; + enif_clear_env(s_ctx->env); + destroy_s_ctx(s_ctx); + return QUIC_STATUS_UNREACHABLE; +} + ///_* Emacs ///==================================================================== /// Local Variables: diff --git a/c_src/quicer_listener.c b/c_src/quicer_listener.c index b5e87f7c..fda9e6f3 100644 --- a/c_src/quicer_listener.c +++ b/c_src/quicer_listener.c @@ -53,16 +53,15 @@ ServerListenerCallback(__unused_parm__ HQUIC Listener, // // Note, c_ctx is newly init here, don't grab lock. // - c_ctx = init_c_ctx(); - c_ctx->r_ctx = l_ctx->r_ctx; - ErlNifEnv *env = c_ctx->env; - - if (!c_ctx) + if (!(c_ctx = init_c_ctx())) { Status = QUIC_STATUS_OUT_OF_MEMORY; goto Error; } + c_ctx->r_ctx = l_ctx->r_ctx; + ErlNifEnv *env = c_ctx->env; + c_ctx->Connection = Event->NEW_CONNECTION.Connection; CxPlatRefInitialize(&(c_ctx->ref_count)); diff --git a/c_src/quicer_stream.c b/c_src/quicer_stream.c index ae7d8746..9b675c8f 100644 --- a/c_src/quicer_stream.c +++ b/c_src/quicer_stream.c @@ -958,15 +958,13 @@ handle_stream_event_recv(HQUIC Stream, { // Owner is waiting for data // notify owner to trigger async recv // - if (!enif_send( - NULL, - &(s_ctx->owner->Pid), - NULL, - make_event(env, - ATOM_QUIC_STATUS_CONTINUE, - // @TODO eHandle is in env, no need to copy? - enif_make_copy(env, s_ctx->eHandle), - ATOM_UNDEFINED))) + if (!enif_send(NULL, + &(s_ctx->owner->Pid), + NULL, + make_event(env, + ATOM_QUIC_STATUS_CONTINUE, + enif_make_copy(env, s_ctx->eHandle), + ATOM_UNDEFINED))) { // App down, shutdown stream MsQuic->StreamShutdown(Stream, diff --git a/c_src/quicer_tp.c b/c_src/quicer_tp.c index e92eda5a..2323525e 100644 --- a/c_src/quicer_tp.c +++ b/c_src/quicer_tp.c @@ -47,6 +47,10 @@ tp_snk(ErlNifEnv *env, { ErlNifPid *pid = &GLOBAL_SNAB_KC_PID; + // avoid polluting the context env, + // use tmp env for message passing, + env = enif_alloc_env(); + ERL_NIF_TERM snk_event; ERL_NIF_TERM snk_event_key_array[7] = { ATOM_IN_ENV(SNK_KIND), ATOM_IN_ENV(CONTEXT), @@ -80,5 +84,6 @@ tp_snk(ErlNifEnv *env, = enif_make_tuple2(env, ATOM_IN_ENV(GEN_CAST), enif_make_tuple2(env, ATOM_IN_ENV(TRACE), snk_event)); - enif_send(NULL, pid, NULL, report); + enif_send(NULL, pid, env, report); + enif_free_env(env); } diff --git a/test/example_client_connection.erl b/test/example_client_connection.erl index 7cb93ff4..01bf2bc9 100644 --- a/test/example_client_connection.erl +++ b/test/example_client_connection.erl @@ -164,6 +164,7 @@ handle_info({quic, Sig, Stream, _} = Msg, #{streams := Streams} = S) when _ = quicer:async_shutdown_stream(Stream), {ok, S#{streams := lists:keydelete(Stream, 2, Streams)}}; {OwnerPid, Stream} when is_pid(OwnerPid) -> + %%% you should not hit here in testing, if so, fire one issue report {error, {fixme, bug_handoff_fail}}; false -> %% garbage signals from already dead stream (such like crashed owner) diff --git a/test/prop_stateful_client_conn.erl b/test/prop_stateful_client_conn.erl index ce28d503..27d7ccda 100644 --- a/test/prop_stateful_client_conn.erl +++ b/test/prop_stateful_client_conn.erl @@ -28,6 +28,10 @@ postcondition/3 ]). +%% Helpers +-export([ + spawn_stream_acceptor/3 +]). %%%%%%%%%%%%%%%%%% %%% PROPERTIES %%% %%%%%%%%%%%%%%%%%% @@ -72,6 +76,10 @@ command(#{handle := Handle}) -> {100, {call, quicer, getopt, [Handle, ?LET({Opt, _}, conn_opt(), Opt)]}}, {100, {call, quicer, async_accept_stream, [Handle, ?LET(Opts, quicer_acceptor_opts(), Opts)]}}, + {100, + {call, ?MODULE, spawn_stream_acceptor, [ + Handle, ?LET(Opts, quicer_acceptor_opts(), Opts), range(0, 200) + ]}}, {100, {call, quicer, peername, [Handle]}}, {50, {call, quicer, peercert, [Handle]}}, {10, {call, quicer, negotiated_protocol, [Handle]}}, @@ -205,6 +213,8 @@ postcondition(#{handle := _H, state := closed}, {call, quicer, get_connections, true; postcondition(#{state := closed}, {call, _Mod, _Fun, _Args}, {error, closed}) -> true; +postcondition(_State, {call, ?MODULE, spawn_stream_acceptor, _Args}, ok) -> + true; postcondition(_State, {call, _Mod, _Fun, _Args} = _Call, _Res) -> false. @@ -278,3 +288,21 @@ default_conn_opts() -> {certfile, "./msquic/submodules/openssl/test/certs/servercert.pem"}, {keyfile, "./msquic/submodules/openssl/test/certs/serverkey.pem"} ]. + +%% Test helpers +spawn_stream_acceptor(ConnHandle, Opts, DieAfter) -> + spawn( + fun() -> + do_accept_stream(ConnHandle, Opts, DieAfter) + end + ), + ok. + +do_accept_stream(Conn, Opts, DieAfter) -> + {ok, Conn} = quicer:async_accept_stream(Conn, Opts), + receive + {quicer, new_stream, _Stream, _Flags} -> + timer:sleep(DieAfter) + after DieAfter -> + ok + end. diff --git a/test/prop_stateful_server_conn.erl b/test/prop_stateful_server_conn.erl index 777ad853..a5952ecc 100644 --- a/test/prop_stateful_server_conn.erl +++ b/test/prop_stateful_server_conn.erl @@ -139,7 +139,9 @@ postcondition(#{state := accepted}, {call, quicer, handshake, _Args}, {ok, _}) - true; postcondition(#{state := accepted}, {call, quicer, handshake, _Args}, {error, invalid_state}) -> true; -postcondition(#{state := accepted}, {call, quicer, handshake, _Args}, {error, timeout}) -> +postcondition(#{state := closed}, {call, quicer, handshake, _}, {error, timeout}) -> + true; +postcondition(#{state := _}, {call, quicer, handshake, _Args}, {error, timeout}) -> %% @FIXME true; postcondition( @@ -209,21 +211,23 @@ postcondition(_State, {call, quicer, close_connection, _Args}, ok) -> true; postcondition(_State, {call, quicer, shutdown_connection, _Args}, ok) -> true; -postcondition(accepted, {call, quicer, close_connection, _Args}, {error, timeout}) -> +postcondition(#{state := accepted}, {call, quicer, close_connection, _Args}, {error, timeout}) -> + true; +postcondition(#{state := accepted}, {call, quicer, shutdown_connection, _Args}, {error, timeout}) -> true; -postcondition(accepted, {call, quicer, shutdown_connection, _Args}, {error, timeout}) -> +postcondition(#{state := accepted}, {call, quicer, close_connection, _Args}, {error, closed}) -> true; -postcondition(accepted, {call, quicer, close_connection, _Args}, {error, closed}) -> +postcondition(#{state := closed}, {call, quicer, close_connection, _Args}, {error, timeout}) -> true; postcondition(_, {call, quicer, shutdown_connection, [_, _, Tout]}, {error, timeout}) when Tout < 200 -> true; -postcondition(_, {call, quicer, close_connection, [_, _, Tout]}, {error, timeout}) when +postcondition(_, {call, quicer, close_connection, [_, Tout]}, {error, timeout}) when Tout < 200 -> true; -postcondition(accepted, {call, quicer, shutdown_connection, _Args}, {error, closed}) -> +postcondition(#{state := accepted}, {call, quicer, shutdown_connection, _Args}, {error, closed}) -> true; postcondition( #{me := Me, owner := Owner, state := State}, @@ -243,6 +247,10 @@ postcondition( S =:= accepted orelse S =:= closed -> true; +postcondition( + #{state := accepted}, {call, quicer, async_csend, [_]}, {error, stm_send_error, aborted} +) -> + true; postcondition(#{state := accepted}, {call, quicer, async_csend, [_]}, {error, timeout}) -> %% @FIXME https://github.com/emqx/quic/issues/265 true; diff --git a/test/quicer_connection_SUITE.erl b/test/quicer_connection_SUITE.erl index 5c70448e..22f7cf1b 100644 --- a/test/quicer_connection_SUITE.erl +++ b/test/quicer_connection_SUITE.erl @@ -897,6 +897,26 @@ tc_get_conn_owner_server(Config) -> ct:fail({client_fail, Reason}) end. +tc_shutdown_conn_before_handshake(Config) -> + Port = select_port(), + {ok, L} = quicer:listen(Port, default_listen_opts(Config)), + {ok, L} = quicer:async_accept(L, #{}), + CPid = spawn(fun() -> + _ = quicer:connect("localhost", Port, default_conn_opts(), 1000), + receive + done -> ok + end + end), + receive + {quic, new_conn, Conn, _} -> + Res = quicer:shutdown_connection(Conn), + CPid ! done, + ?assertEqual(ok, Res) + after 1000 -> + ct:fail("conn from client timeout") + end, + quicer:close_listener(L). + %%% %%% Helpers %%% diff --git a/tools/run/bin/lib.sh b/tools/run/bin/lib.sh index a0b4aaca..da77c152 100644 --- a/tools/run/bin/lib.sh +++ b/tools/run/bin/lib.sh @@ -29,7 +29,7 @@ do_run() { done ;; proper) - escript "$REBAR3" as test proper + escript "$REBAR3" as test proper --noshrink ;; *) escript "$REBAR3" $@