Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: monitor remote stream #314

Merged
merged 6 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 38 additions & 19 deletions c_src/quicer_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ ERL_NIF_TERM parse_conn_event_mask(ErlNifEnv *env,
ERL_NIF_TERM eoptions,
QuicerConnCTX *c_ctx);

QUIC_STATUS selected_owner_unreachable(QuicerStreamCTX *s_ctx);

ERL_NIF_TERM
peercert1(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[])
{
Expand Down Expand Up @@ -1403,22 +1405,15 @@ handle_connection_event_peer_stream_started(QuicerConnCTX *c_ctx,
ERL_NIF_TERM props_value[]
= { enif_make_uint(env, Event->PEER_STREAM_STARTED.Flags),
ATOM_BOOLEAN(is_orphan) };
ERL_NIF_TERM eHandle = enif_make_copy(env, s_ctx->eHandle);

ERL_NIF_TERM report
= make_event_with_props(env,
ATOM_NEW_STREAM,
enif_make_copy(env, s_ctx->eHandle),
props_name,
props_value,
2);
ERL_NIF_TERM report = make_event_with_props(
env, ATOM_NEW_STREAM, eHandle, props_name, props_value, 2);
if (!enif_send(NULL, acc_pid, NULL, report))
{
if (is_orphan)
{
// Connection acceptor is dead
// we don't need to destroy acceptor, we don't have the ownwership
destroy_s_ctx(s_ctx);
return QUIC_STATUS_UNREACHABLE;
return selected_owner_unreachable(s_ctx);
}
else
{
Expand All @@ -1429,27 +1424,38 @@ handle_connection_event_peer_stream_started(QuicerConnCTX *c_ctx,
AcceptorDestroy(acc);
// Set is_orphan to true, connection owner takeover
props_value[1] = ATOM_TRUE;

acc = AcceptorAlloc();
CxPlatCopyMemory(acc, c_ctx->owner, sizeof(ACCEPTOR));
s_ctx->owner = acc;
// this is our protocol
acc->active = ACCEPTOR_RECV_MODE_PASSIVE;
acc_pid = &(acc->Pid);

report = make_event_with_props(env,
ATOM_NEW_STREAM,
enif_make_copy(env, s_ctx->eHandle),
props_name,
props_value,
2);
report = make_event_with_props(
env, ATOM_NEW_STREAM, eHandle, props_name, props_value, 2);
if (!enif_send(NULL, acc_pid, NULL, report))
{
destroy_s_ctx(s_ctx);
return QUIC_STATUS_UNREACHABLE;
return selected_owner_unreachable(s_ctx);
}
}
}

int mon_res = enif_monitor_process(env, s_ctx, acc_pid, &(s_ctx->owner_mon));
CXPLAT_FRE_ASSERTMSG(mon_res >= 0, "stream down callback must be defined!");
if (mon_res == 0)
{
s_ctx->is_monitored = TRUE;
}
else // mon_res > 0
{
// unlikely
// owner pid is dead, but message is sent
return selected_owner_unreachable(s_ctx);
}

s_ctx->is_closed = FALSE;
CXPLAT_FRE_ASSERTMSG(s_ctx, "s_ctx must be validate");
MsQuic->SetCallbackHandler(
Event->PEER_STREAM_STARTED.Stream, stream_callback, s_ctx);
return QUIC_STATUS_SUCCESS;
Expand Down Expand Up @@ -1840,6 +1846,19 @@ put_conn_handles(ErlNifEnv *env, ERL_NIF_TERM conn_handles)
}
}

QUIC_STATUS
selected_owner_unreachable(QuicerStreamCTX *s_ctx)
{
//
// s_ctx ownership transfer failed
// There is no shared ownership, we must destroy the s_ctx here
//
s_ctx->is_closed = TRUE;
enif_clear_env(s_ctx->env);
destroy_s_ctx(s_ctx);
return QUIC_STATUS_UNREACHABLE;
}

///_* Emacs
///====================================================================
/// Local Variables:
Expand Down
9 changes: 4 additions & 5 deletions c_src/quicer_listener.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,15 @@ ServerListenerCallback(__unused_parm__ HQUIC Listener,
//
// Note, c_ctx is newly init here, don't grab lock.
//
c_ctx = init_c_ctx();
c_ctx->r_ctx = l_ctx->r_ctx;
ErlNifEnv *env = c_ctx->env;

if (!c_ctx)
if (!(c_ctx = init_c_ctx()))
{
Status = QUIC_STATUS_OUT_OF_MEMORY;
goto Error;
}

c_ctx->r_ctx = l_ctx->r_ctx;
ErlNifEnv *env = c_ctx->env;

c_ctx->Connection = Event->NEW_CONNECTION.Connection;
CxPlatRefInitialize(&(c_ctx->ref_count));

Expand Down
16 changes: 7 additions & 9 deletions c_src/quicer_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -958,15 +958,13 @@ handle_stream_event_recv(HQUIC Stream,
{ // Owner is waiting for data
// notify owner to trigger async recv
//
if (!enif_send(
NULL,
&(s_ctx->owner->Pid),
NULL,
make_event(env,
ATOM_QUIC_STATUS_CONTINUE,
// @TODO eHandle is in env, no need to copy?
enif_make_copy(env, s_ctx->eHandle),
ATOM_UNDEFINED)))
if (!enif_send(NULL,
&(s_ctx->owner->Pid),
NULL,
make_event(env,
ATOM_QUIC_STATUS_CONTINUE,
enif_make_copy(env, s_ctx->eHandle),
ATOM_UNDEFINED)))
{
// App down, shutdown stream
MsQuic->StreamShutdown(Stream,
Expand Down
7 changes: 6 additions & 1 deletion c_src/quicer_tp.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ tp_snk(ErlNifEnv *env,
{
ErlNifPid *pid = &GLOBAL_SNAB_KC_PID;

// avoid polluting the context env,
// use tmp env for message passing,
env = enif_alloc_env();

ERL_NIF_TERM snk_event;
ERL_NIF_TERM snk_event_key_array[7]
= { ATOM_IN_ENV(SNK_KIND), ATOM_IN_ENV(CONTEXT),
Expand Down Expand Up @@ -80,5 +84,6 @@ tp_snk(ErlNifEnv *env,
= enif_make_tuple2(env,
ATOM_IN_ENV(GEN_CAST),
enif_make_tuple2(env, ATOM_IN_ENV(TRACE), snk_event));
enif_send(NULL, pid, NULL, report);
enif_send(NULL, pid, env, report);
enif_free_env(env);
}
1 change: 1 addition & 0 deletions test/example_client_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ handle_info({quic, Sig, Stream, _} = Msg, #{streams := Streams} = S) when
_ = quicer:async_shutdown_stream(Stream),
{ok, S#{streams := lists:keydelete(Stream, 2, Streams)}};
{OwnerPid, Stream} when is_pid(OwnerPid) ->
%%% you should not hit here in testing, if so, fire one issue report
{error, {fixme, bug_handoff_fail}};
false ->
%% garbage signals from already dead stream (such like crashed owner)
Expand Down
28 changes: 28 additions & 0 deletions test/prop_stateful_client_conn.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
postcondition/3
]).

%% Helpers
-export([
spawn_stream_acceptor/3
]).
%%%%%%%%%%%%%%%%%%
%%% PROPERTIES %%%
%%%%%%%%%%%%%%%%%%
Expand Down Expand Up @@ -72,6 +76,10 @@ command(#{handle := Handle}) ->
{100, {call, quicer, getopt, [Handle, ?LET({Opt, _}, conn_opt(), Opt)]}},
{100,
{call, quicer, async_accept_stream, [Handle, ?LET(Opts, quicer_acceptor_opts(), Opts)]}},
{100,
{call, ?MODULE, spawn_stream_acceptor, [
Handle, ?LET(Opts, quicer_acceptor_opts(), Opts), range(0, 200)
]}},
{100, {call, quicer, peername, [Handle]}},
{50, {call, quicer, peercert, [Handle]}},
{10, {call, quicer, negotiated_protocol, [Handle]}},
Expand Down Expand Up @@ -205,6 +213,8 @@ postcondition(#{handle := _H, state := closed}, {call, quicer, get_connections,
true;
postcondition(#{state := closed}, {call, _Mod, _Fun, _Args}, {error, closed}) ->
true;
postcondition(_State, {call, ?MODULE, spawn_stream_acceptor, _Args}, ok) ->
true;
postcondition(_State, {call, _Mod, _Fun, _Args} = _Call, _Res) ->
false.

Expand Down Expand Up @@ -278,3 +288,21 @@ default_conn_opts() ->
{certfile, "./msquic/submodules/openssl/test/certs/servercert.pem"},
{keyfile, "./msquic/submodules/openssl/test/certs/serverkey.pem"}
].

%% Test helpers
spawn_stream_acceptor(ConnHandle, Opts, DieAfter) ->
spawn(
fun() ->
do_accept_stream(ConnHandle, Opts, DieAfter)
end
),
ok.

do_accept_stream(Conn, Opts, DieAfter) ->
{ok, Conn} = quicer:async_accept_stream(Conn, Opts),
receive
{quicer, new_stream, _Stream, _Flags} ->
timer:sleep(DieAfter)
after DieAfter ->
ok
end.
20 changes: 14 additions & 6 deletions test/prop_stateful_server_conn.erl
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ postcondition(#{state := accepted}, {call, quicer, handshake, _Args}, {ok, _}) -
true;
postcondition(#{state := accepted}, {call, quicer, handshake, _Args}, {error, invalid_state}) ->
true;
postcondition(#{state := accepted}, {call, quicer, handshake, _Args}, {error, timeout}) ->
postcondition(#{state := closed}, {call, quicer, handshake, _}, {error, timeout}) ->
true;
postcondition(#{state := _}, {call, quicer, handshake, _Args}, {error, timeout}) ->
%% @FIXME
true;
postcondition(
Expand Down Expand Up @@ -209,21 +211,23 @@ postcondition(_State, {call, quicer, close_connection, _Args}, ok) ->
true;
postcondition(_State, {call, quicer, shutdown_connection, _Args}, ok) ->
true;
postcondition(accepted, {call, quicer, close_connection, _Args}, {error, timeout}) ->
postcondition(#{state := accepted}, {call, quicer, close_connection, _Args}, {error, timeout}) ->
true;
postcondition(#{state := accepted}, {call, quicer, shutdown_connection, _Args}, {error, timeout}) ->
true;
postcondition(accepted, {call, quicer, shutdown_connection, _Args}, {error, timeout}) ->
postcondition(#{state := accepted}, {call, quicer, close_connection, _Args}, {error, closed}) ->
true;
postcondition(accepted, {call, quicer, close_connection, _Args}, {error, closed}) ->
postcondition(#{state := closed}, {call, quicer, close_connection, _Args}, {error, timeout}) ->
true;
postcondition(_, {call, quicer, shutdown_connection, [_, _, Tout]}, {error, timeout}) when
Tout < 200
->
true;
postcondition(_, {call, quicer, close_connection, [_, _, Tout]}, {error, timeout}) when
postcondition(_, {call, quicer, close_connection, [_, Tout]}, {error, timeout}) when
Tout < 200
->
true;
postcondition(accepted, {call, quicer, shutdown_connection, _Args}, {error, closed}) ->
postcondition(#{state := accepted}, {call, quicer, shutdown_connection, _Args}, {error, closed}) ->
true;
postcondition(
#{me := Me, owner := Owner, state := State},
Expand All @@ -243,6 +247,10 @@ postcondition(
S =:= accepted orelse S =:= closed
->
true;
postcondition(
#{state := accepted}, {call, quicer, async_csend, [_]}, {error, stm_send_error, aborted}
) ->
true;
postcondition(#{state := accepted}, {call, quicer, async_csend, [_]}, {error, timeout}) ->
%% @FIXME https://github.com/emqx/quic/issues/265
true;
Expand Down
20 changes: 20 additions & 0 deletions test/quicer_connection_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -897,6 +897,26 @@ tc_get_conn_owner_server(Config) ->
ct:fail({client_fail, Reason})
end.

tc_shutdown_conn_before_handshake(Config) ->
Port = select_port(),
{ok, L} = quicer:listen(Port, default_listen_opts(Config)),
{ok, L} = quicer:async_accept(L, #{}),
CPid = spawn(fun() ->
_ = quicer:connect("localhost", Port, default_conn_opts(), 1000),
receive
done -> ok
end
end),
receive
{quic, new_conn, Conn, _} ->
Res = quicer:shutdown_connection(Conn),
CPid ! done,
?assertEqual(ok, Res)
after 1000 ->
ct:fail("conn from client timeout")
end,
quicer:close_listener(L).

%%%
%%% Helpers
%%%
Expand Down
2 changes: 1 addition & 1 deletion tools/run/bin/lib.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ do_run() {
done
;;
proper)
escript "$REBAR3" as test proper
escript "$REBAR3" as test proper --noshrink
;;
*)
escript "$REBAR3" $@
Expand Down
Loading