From c5fc8cff69fa00410aa94db3a53be35e887dceb6 Mon Sep 17 00:00:00 2001 From: William Yang Date: Fri, 10 May 2024 18:48:40 +0200 Subject: [PATCH 1/3] test: stream proper test --- include/quicer_types.hrl | 4 +- test/example_client_connection.erl | 10 +- test/example_server_connection.erl | 11 +- test/example_server_stream.erl | 35 ++- test/prop_quic_types.hrl | 2 + test/prop_stateful_stream.erl | 337 +++++++++++++++++++++++++++++ test/quicer_snb_SUITE.erl | 6 +- 7 files changed, 391 insertions(+), 14 deletions(-) create mode 100644 test/prop_stateful_stream.erl diff --git a/include/quicer_types.hrl b/include/quicer_types.hrl index b1a6fdf3..a282aeea 100644 --- a/include/quicer_types.hrl +++ b/include/quicer_types.hrl @@ -166,8 +166,8 @@ -type conn_shutdown_flag() :: ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE | ?QUIC_CONNECTION_SHUTDOWN_FLAG_SILENT. -%% @TODO expand --type acceptor_opts() :: map(). + +-type acceptor_opts() :: quic_settings() | #{active => boolean()}. -type active_n() :: boolean() | once | integer(). diff --git a/test/example_client_connection.erl b/test/example_client_connection.erl index ba1ab728..8791bc2b 100644 --- a/test/example_client_connection.erl +++ b/test/example_client_connection.erl @@ -145,4 +145,12 @@ peer_needs_streams(C, bidi_streams, S) -> {ok, S}. handle_info({'EXIT', _Pid, _Reason}, State) -> - {ok, State}. + {ok, State}; +handle_info({quic, Sig, Stream, _} = Msg, #{streams := Streams} = S) when + %% @FIXME, not desired behavior. + %% Casued by inflight quic Msg during stream handoff + Sig == peer_send_shutdown orelse Sig == stream_closed +-> + {OwnerPid, Stream} = lists:keyfind(Stream, 2, Streams), + OwnerPid ! Msg, + {ok, S}. diff --git a/test/example_server_connection.erl b/test/example_server_connection.erl index 485eda4d..429ca247 100644 --- a/test/example_server_connection.erl +++ b/test/example_server_connection.erl @@ -105,8 +105,12 @@ new_stream( case quicer:handoff_stream(Stream, StreamOwner) of ok -> {ok, CBState#{streams := [{StreamOwner, Stream} | Streams]}}; - {error, closed} -> + {error, closed} when Streams == [] -> + %% Shutdown the connection when fails to handle the first stream {stop, {shutdown, closed}, CBState}; + {error, closed} -> + %% For proper test, handle random failures from the streams + {ok, CBState}; false -> {error, handoff_fail} end; @@ -149,5 +153,10 @@ dgram_recv(C, Bin, _Flag, S) -> end, {ok, S}. +handle_info({quic, stream_closed, _}, State) -> + %% Stream close in the before/in the middle of handoff + {ok, State}; handle_info({'EXIT', _Pid, _Reason}, State) -> + {ok, State}; +handle_info(_, State) -> {ok, State}. diff --git a/test/example_server_stream.erl b/test/example_server_stream.erl index 6035b13b..f4d116e8 100644 --- a/test/example_server_stream.erl +++ b/test/example_server_stream.erl @@ -35,6 +35,7 @@ -export([handle_stream_data/4]). -export([handle_call/3]). +-export([handle_info/2]). -include("quicer.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -51,8 +52,12 @@ init_handoff(Stream, _StreamOpts, Conn, #{flags := Flags}) -> {ok, InitState}. post_handoff(Stream, _PostData, State) -> - ok = quicer:setopt(Stream, active, true), - {ok, State}. + case quicer:setopt(Stream, active, true) of + ok -> + {ok, State}; + {error, closed} -> + {stop, closed, State} + end. new_stream(Stream, #{flags := Flags}, Conn) -> InitState = #{ @@ -131,13 +136,21 @@ handle_stream_data( case PeerStream of undefined -> - {ok, StreamProc} = quicer_local_stream:start_link( - ?MODULE, - Conn, - [{open_flag, ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}] - ), - {ok, _} = quicer_stream:send(StreamProc, Bin), - {ok, State#{peer_stream := StreamProc}}; + case + quicer_local_stream:start_link( + ?MODULE, + Conn, + [{open_flag, ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}] + ) + of + {ok, StreamProc} -> + catch quicer_stream:send(StreamProc, Bin), + {ok, State#{peer_stream := StreamProc}}; + {error, {_, _}} -> + {ok, State}; + {error, closed} -> + {ok, State} + end; StreamProc when is_pid(StreamProc) -> {ok, _} = quicer_stream:send(StreamProc, Bin), {ok, State} @@ -170,3 +183,7 @@ stream_closed( handle_call(_Request, _From, S) -> {reply, {error, not_impl}, S}. + +handle_info(Info, S) -> + % ct:pal("~p:~p:~p ", [?MODULE, ?FUNCTION_NAME, Info]), + {ok, S}. diff --git a/test/prop_quic_types.hrl b/test/prop_quic_types.hrl index abbb6560..b11b4940 100644 --- a/test/prop_quic_types.hrl +++ b/test/prop_quic_types.hrl @@ -153,3 +153,5 @@ | {start_flag, stream_start_flags()} | {event_mask, uint32()} | {disable_fpbuffer, boolean()}. + +-type stream_accept_opts() :: [{active, boolean()}]. diff --git a/test/prop_stateful_stream.erl b/test/prop_stateful_stream.erl new file mode 100644 index 00000000..d807011f --- /dev/null +++ b/test/prop_stateful_stream.erl @@ -0,0 +1,337 @@ +-module(prop_stateful_stream). +-compile([export_all]). +-include_lib("proper/include/proper.hrl"). +-include_lib("quicer/include/quicer_types.hrl"). +-include("prop_quic_types.hrl"). + +%% Test APIs: +%% - async accept stream +%% - start stream +%% - send +%% - recv +%% - shutdown_stream +%% - close_stream +%% - setopt +%% - getopt + +%% Model Callbacks +-export([ + command/1, + initial_state/0, + next_state/3, + precondition/2, + postcondition/3 +]). + +-define(MAX_STREAMS, 64 * 1024 - 1). + +%%%%%%%%%%%%%%%%%% +%%% PROPERTIES %%% +%%%%%%%%%%%%%%%%%% +prop_stateful_client_stream_test(opts) -> + [{numtests, 2000}]. +prop_stateful_client_stream_test() -> + process_flag(trap_exit, true), + ?SETUP( + fun() -> + {ok, _} = listener_start_link(?MODULE), + fun() -> listener_stop(?MODULE) end + end, + ?FORALL( + Cmds, + commands(?MODULE), + begin + flush_quic_msgs(), + {ok, H} = quicer:connect("localhost", 14569, default_conn_opts(), 10000), + {History, State, Result} = run_commands(?MODULE, Cmds, [{conn_handle, H}]), + quicer:async_shutdown_connection(H, ?QUIC_CONNECTION_SHUTDOWN_FLAG_SILENT, 0), + ?WHENFAIL( + io:format( + "History: ~p\nState: ~p\nResult: ~p\n", + [History, State, Result] + ), + aggregate(command_names(Cmds), Result =:= ok) + ) + end + ) + ). + +prop_stateful_server_stream_test(opts) -> + [{numtests, 2000}]. +prop_stateful_server_stream_test() -> + Port = 14570, + process_flag(trap_exit, true), + ?SETUP( + fun() -> + {ok, L} = quicer:listen(Port, default_listen_opts()), + put(?FUNCTION_NAME, L), + fun() -> quicer:stop_listener(L) end + end, + ?FORALL( + Cmds, + commands(?MODULE), + begin + flush_quic_msgs(), + L = get(?FUNCTION_NAME), + {ok, L} = quicer:async_accept(L, maps:from_list([{active, false}])), + {ok, Client} = example_client_connection:start_link( + "localhost", + Port, + {default_conn_opts(), quicer_test_lib:default_stream_opts()} + ), + Conn = + receive + {quic, new_conn, C, _} -> + case quicer:handshake(C) of + {ok, C} -> C; + Err -> error({quicer:get_conn_rid(C), Err}) + end + after 3000 -> + %% hard to reproduce here + error(new_conn_timeout) + end, + {History, State, Result} = run_commands(?MODULE, Cmds, [ + {conn_handle, Conn} + ]), + quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_SILENT, 0), + catch gen_server:stop(Client), + ?WHENFAIL( + io:format( + "History: ~p\nState: ~p\nResult: ~p\n", + [History, State, Result] + ), + aggregate(command_names(Cmds), Result =:= ok) + ) + end + ) + ). + +%%%%%%%%%%%%% +%%% MODEL %%% +%%%%%%%%%%%%% +%% @doc Initial model value at system start. Should be deterministic. +initial_state() -> + #{ + conn_state => connected, + stream_set => [], + owner => self(), + me => self(), + % cnt calls + calls => 1 + }. + +%% @doc List of possible commands to run against the system +command(#{stream_set := SS}) -> + C = {var, conn_handle}, + oneof([ + {call, quicer, start_stream, [ + C, + ?LET( + Opts, + quicer_stream_opts(), + Opts ++ [{open_flag, ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}] + ) + ]}, + {call, quicer, start_stream, [C, ?LET(Opts, quicer_stream_opts(), Opts)]}, + {call, quicer, async_accept_stream, [C, ?LET(Opt, stream_accept_opts(), Opt)]}, + {call, quicer, async_send, [random_stream(SS), binary()]}, + {call, quicer, async_send, [remote_stream(SS), binary()]}, + {call, ?MODULE, send_recv, [random_stream(SS), binary()]}, + {call, quicer, async_shutdown_stream, [random_stream(SS)]}, + {call, quicer, async_close_stream, [random_stream(SS)]}, + {call, ?MODULE, getopt, [random_stream(SS)]}, + {call, ?MODULE, setopt, [random_stream(SS)]}, + {call, ?MODULE, close_remote_stream, []}, + {call, ?MODULE, stop_client_user, [SS]}, + %% @TODO mix with close_connection + %{call, quicer, close_connection, [C]}, + {call, quicer, send, [random_stream(SS), binary()]} + ]). + +%% @doc Determines whether a command should be valid under the +%% current state. +precondition(#{conn_state := closed}, {call, _, _, _}) -> + false; +precondition(#{stream_set := []}, {call, ?MODULE, getopt, _}) -> + false; +precondition(#{stream_set := []}, {call, ?MODULE, setopt, _}) -> + false; +precondition(#{stream_set := []}, {call, ?MODULE, send_recv, _}) -> + false; +precondition(#{stream_set := []}, {call, quicer, Act, _}) when + Act =/= start_stream andalso + Act =/= async_accept_stream +-> + false; +precondition(SS, {call, ?MODULE, stop_client_user, _}) -> + maps:is_key(client_user, SS); +precondition(_State, {call, _Mod, _Fun, _Args}) -> + true. + +%% @doc Given the state `State' *prior* to the call +%% `{call, Mod, Fun, Args}', determine whether the result +%% `Res' (coming from the actual system) makes sense. +postcondition(#{conn_state := closed}, {call, quicer, _, _}, {error, _}) -> + true; +%% postcondition(_State, {call, quicer, start_stream, _Args}, {error, _} = E) -> +%% false; +postcondition(_State, {call, quicer, send, [_, <<>>]}, {error, _}) -> + %% send empty binary results badarg + true; +postcondition(_State, {call, quicer, send, [_, _]}, {error, E}) when + E == closed orelse E == cancelled +-> + %% async shutdowned stream + true; +postcondition(_State, {call, quicer, send, [closed, _]}, {error, badarg}) -> + true; +postcondition(_State, {call, quicer, send, [stm_open_error, _]}, {error, badarg}) -> + true; +postcondition(_State, {call, quicer, send, _Args}, {error, _}) -> + false; +postcondition(_State, {call, _Mod, _Fun, _Args}, _Res) -> + true. + +%% @doc Assuming the postcondition for a call was true, update the model +%% accordingly for the test to proceed. +next_state(#{calls := C} = State, {error, _, _}, {call, quicer, start_stream, _Args}) -> + State#{calls := C + 1}; +next_state(#{stream_set := SS, calls := C} = State, V, {call, quicer, start_stream, _Args}) -> + State#{stream_set := [{call, erlang, element, [2, V]} | SS], calls := C + 1}; +next_state( + #{stream_set := SS, calls := C} = State, _V, {call, quicer, async_shutdown_stream, [Stream]} +) -> + State#{stream_set := lists:delete(Stream, SS), calls := C + 1}; +next_state(#{calls := C} = State, _V, {call, quicer, close_connection, _Args}) -> + State#{calls := C + 1, conn_state := closed}; +next_state(#{calls := C} = State, _Res, {call, _Mod, _Fun, _Args}) -> + NewState = State, + NewState#{calls := C + 1}. + +%%% helpers +send_recv(Stream, Binary) -> + case quicer:send(Stream, Binary) of + ok -> + quicer:recv(Stream, byte_size(Binary)); + E -> + E + end. + +unblock_streams(Conn) -> + receive + {quic, peer_needs_streams, Conn, unidi_streams} -> + {ok, Current} = quicer:getopt(Conn, local_unidi_stream_count), + ok = quicer:setopt(Conn, settings, #{peer_unidi_stream_count => Current + 10}); + {quic, peer_needs_streams, Conn, bidi_streams} -> + {ok, Current} = quicer:getopt(Conn, local_bidi_stream_count), + ok = quicer:setopt(Conn, settings, #{peer_bidi_stream_count => Current + 10}) + after 0 -> + ok + end. + +setopt(SS) -> + {K, V} = ?LET(Opt, stream_opt(), Opt), + quicer:setopt(random_stream(SS), K, V, stream). + +getopt(SS) -> + quicer:setopt(random_stream(SS), ?LET({K, _V}, stream_opt(), K), stream). + +close_remote_stream() -> + receive + {new_stream, Stream, _, _} -> + quicer:close_stream( + Stream, + ?LET(FLAG, stream_shutdown_flags(), FLAG), + ?LET(Err, app_errno(), Err) + ) + after 0 -> + ok + end. + +remote_stream(_) -> + receive + {new_stream, Stream, _, _} -> + quicer:async_send(Stream, binary()) + after 0 -> + ok + end. + +stop_client_user(#{client_user := Pid}) -> + Pid ! stop. + +flush_quic_msgs() -> + receive + {quic, _, _, _} -> + flush_quic_msgs() + after 0 -> + ok + end. + +%%%%%%%%%%%%%%%%%%%%%%% +%%% Listener helper %%% +%%%%%%%%%%%%%%%%%%%%%%% +listener_start_link(ListenerName) -> + application:ensure_all_started(quicer), + LPort = 14569, + ListenerOpts = default_listen_opts(), + ConnectionOpts = [ + {conn_callback, example_server_connection}, + {stream_acceptors, 32} + | default_conn_opts() + ], + StreamOpts = [ + {stream_callback, example_server_stream}, + {auto_unblock_stream, true} + ], + Options = {ListenerOpts, ConnectionOpts, StreamOpts}, + quicer:spawn_listener(ListenerName, LPort, Options). + +listener_stop(ListenerName) -> + quicer:terminate_listener(ListenerName). + +random_stream([H | _]) when is_tuple(H) -> + %% For Exec + H; +random_stream(SS) -> + %% For symbolic + {call, erlang, hd, [SS]}. + +%% OS picks the available port +select_port() -> + {ok, S} = gen_udp:open(0, [{reuseaddr, true}]), + {ok, {_, Port}} = inet:sockname(S), + gen_udp:close(S), + Port. + +default_listen_opts() -> + [ + {conn_acceptors, 32}, + {cacertfile, "./msquic/submodules/openssl/test/certs/rootCA.pem"}, + {certfile, "./msquic/submodules/openssl/test/certs/servercert.pem"}, + {keyfile, "./msquic/submodules/openssl/test/certs/serverkey.pem"}, + {alpn, ["prop"]}, + {verify, none}, + {idle_timeout_ms, 0}, + %% some CI runner is slow on this + {handshake_idle_timeout_ms, 10000}, + % QUIC_SERVER_RESUME_AND_ZERORTT + {server_resumption_level, 2}, + {peer_bidi_stream_count, ?MAX_STREAMS}, + {peer_unidi_stream_count, ?MAX_STREAMS} + ]. + +default_conn_opts() -> + [ + {alpn, ["prop"]}, + %% , {sslkeylogfile, "/tmp/SSLKEYLOGFILE"} + {verify, none}, + {idle_timeout_ms, 0}, + {handshake_idle_timeout_ms, 10000}, + {local_bidi_stream_count, ?MAX_STREAMS}, + {local_unidi_stream_count, ?MAX_STREAMS}, + {peer_bidi_stream_count, ?MAX_STREAMS}, + {peer_unidi_stream_count, ?MAX_STREAMS}, + {cacertfile, "./msquic/submodules/openssl/test/certs/rootCA.pem"}, + {certfile, "./msquic/submodules/openssl/test/certs/servercert.pem"}, + {keyfile, "./msquic/submodules/openssl/test/certs/serverkey.pem"} + ]. diff --git a/test/quicer_snb_SUITE.erl b/test/quicer_snb_SUITE.erl index 214230e7..12b092bf 100644 --- a/test/quicer_snb_SUITE.erl +++ b/test/quicer_snb_SUITE.erl @@ -2836,7 +2836,11 @@ tc_multi_streams_example_server_1(Config) -> receive {quic, Data, Stm3In, DFlag} -> ct:pal("~p is received from ~p with flag: ~p", [Data, Stm3In, DFlag]), - ?assertEqual(Data, <<"ping3">>) + ?assertEqual(Data, <<"ping3">>), + %% Assert that send over a remote unidirectional stream get `invalid_state` + ?assertEqual( + {error, stm_send_error, invalid_state}, quicer:send(Stm3In, <<"foo">>) + ) after 1000 -> ct:fail("no incoming data") end, From 78a6b5919965ae65ed7b14fc7bddfe5f7966673c Mon Sep 17 00:00:00 2001 From: William Yang Date: Wed, 15 May 2024 20:49:20 +0200 Subject: [PATCH 2/3] fix(memleak): sending new_stream to dead acceptor --- c_src/quicer_connection.c | 18 ++++++++---------- test/prop_stateful_stream.erl | 2 +- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/c_src/quicer_connection.c b/c_src/quicer_connection.c index f1bd8971..2063e1fd 100644 --- a/c_src/quicer_connection.c +++ b/c_src/quicer_connection.c @@ -1360,7 +1360,7 @@ handle_connection_event_peer_stream_started(QuicerConnCTX *c_ctx, if (!acc) { - // If we don't have available process + // If we don't have available acceptor waiting, // fallback to the connection owner TP_CB_3(no_acceptor, (uintptr_t)c_ctx->Connection, 0); is_orphan = TRUE; @@ -1386,7 +1386,7 @@ handle_connection_event_peer_stream_started(QuicerConnCTX *c_ctx, acc_pid = &(acc->Pid); s_ctx->owner = acc; - s_ctx->is_closed = FALSE; + cache_stream_id(s_ctx); ERL_NIF_TERM props_name[] = { ATOM_FLAGS, ATOM_IS_ORPHAN }; @@ -1394,18 +1394,15 @@ handle_connection_event_peer_stream_started(QuicerConnCTX *c_ctx, = { enif_make_uint(env, Event->PEER_STREAM_STARTED.Flags), ATOM_BOOLEAN(is_orphan) }; - ERL_NIF_TERM report = make_event_with_props(env, - ATOM_NEW_STREAM, - enif_make_resource(env, s_ctx), - props_name, - props_value, - 2); + ERL_NIF_TERM report = make_event_with_props( + env, ATOM_NEW_STREAM, s_ctx->eHandle, props_name, props_value, 2); if (!enif_send(NULL, acc_pid, NULL, report)) { if (is_orphan) { // Connection acceptor is dead // we don't need to destroy acceptor, we don't have the ownwership + destroy_s_ctx(s_ctx); return QUIC_STATUS_UNREACHABLE; } else @@ -1426,17 +1423,18 @@ handle_connection_event_peer_stream_started(QuicerConnCTX *c_ctx, report = make_event_with_props(env, ATOM_NEW_STREAM, - enif_make_resource(env, s_ctx), + s_ctx->eHandle, props_name, props_value, 2); if (!enif_send(NULL, acc_pid, NULL, report)) { - // Sad... + destroy_s_ctx(s_ctx); return QUIC_STATUS_UNREACHABLE; } } } + s_ctx->is_closed = FALSE; MsQuic->SetCallbackHandler( Event->PEER_STREAM_STARTED.Stream, stream_callback, s_ctx); return QUIC_STATUS_SUCCESS; diff --git a/test/prop_stateful_stream.erl b/test/prop_stateful_stream.erl index d807011f..7aa2f583 100644 --- a/test/prop_stateful_stream.erl +++ b/test/prop_stateful_stream.erl @@ -57,7 +57,7 @@ prop_stateful_client_stream_test() -> ). prop_stateful_server_stream_test(opts) -> - [{numtests, 2000}]. + [{numtests, 10000}]. prop_stateful_server_stream_test() -> Port = 14570, process_flag(trap_exit, true), From 7658ac704099eaeabad61c44484a4c3022b371a8 Mon Sep 17 00:00:00 2001 From: William Yang Date: Fri, 17 May 2024 13:39:16 +0200 Subject: [PATCH 3/3] chore: fix comment --- test/example_server_connection.erl | 2 +- test/example_server_stream.erl | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/test/example_server_connection.erl b/test/example_server_connection.erl index 429ca247..02e00e1f 100644 --- a/test/example_server_connection.erl +++ b/test/example_server_connection.erl @@ -154,7 +154,7 @@ dgram_recv(C, Bin, _Flag, S) -> {ok, S}. handle_info({quic, stream_closed, _}, State) -> - %% Stream close in the before/in the middle of handoff + %% Stream close before/in the middle of handoff {ok, State}; handle_info({'EXIT', _Pid, _Reason}, State) -> {ok, State}; diff --git a/test/example_server_stream.erl b/test/example_server_stream.erl index f4d116e8..c3d4589a 100644 --- a/test/example_server_stream.erl +++ b/test/example_server_stream.erl @@ -185,5 +185,4 @@ handle_call(_Request, _From, S) -> {reply, {error, not_impl}, S}. handle_info(Info, S) -> - % ct:pal("~p:~p:~p ", [?MODULE, ?FUNCTION_NAME, Info]), {ok, S}.