diff --git a/src/quicer_local_stream.erl b/src/quicer_local_stream.erl index 25faba40..14c7345c 100644 --- a/src/quicer_local_stream.erl +++ b/src/quicer_local_stream.erl @@ -17,9 +17,10 @@ %% @doc Stream initiated from local -module(quicer_local_stream). --export([start/3, - start_link/3] - ). +-export([start/4, + start_link/3, + start_link/4 + ]). -include("quicer_types.hrl"). @@ -76,13 +77,21 @@ , handle_continue/2 ]). +-type local_stream_opts() :: stream_opts() | proplists:proplist(). -type cb_ret() :: quicer_stream:cb_ret(). -type cb_state() :: quicer_stream:cb_state(). --spec start_link(module(), connection_handle(), map()) -> gen_server:start_ret(). +-spec start_link(module(), connection_handle(), local_stream_opts()) -> gen_server:start_ret(). start_link(CallbackModule, Connection, Opts) -> - quicer_stream:start_link(CallbackModule, Connection, Opts#{local => true}). - --spec start(module(), connection_handle(), map()) -> gen_server:start_ret(). -start(CallbackModule, Connection, Opts) -> - quicer_stream:start(CallbackModule, Connection, Opts#{local => true}). + start_link(CallbackModule, Connection, Opts, []). +-spec start_link(module(), connection_handle(), local_stream_opts(), [gen_server:start_opt()]) -> gen_server:start_ret(). +start_link(CallbackModule, Connection, Opts, StartOpts) when is_list(Opts)-> + start_link(CallbackModule, Connection, maps:from_list(Opts), StartOpts); +start_link(CallbackModule, Connection, Opts, StartOpts) -> + quicer_stream:start_link(CallbackModule, Connection, Opts#{is_local => true}, StartOpts). + +-spec start(module(), connection_handle(), local_stream_opts(), [gen_server:start_opt()]) -> gen_server:start_ret(). +start(CallbackModule, Connection, Opts, StartOpts) when is_list(Opts) -> + start(CallbackModule, Connection, maps:from_list(Opts), StartOpts); +start(CallbackModule, Connection, Opts, StartOpts) -> + quicer_stream:start(CallbackModule, Connection, Opts#{is_local => true}, StartOpts). diff --git a/src/quicer_remote_stream.erl b/src/quicer_remote_stream.erl index 3d42b5ff..0ee6f124 100644 --- a/src/quicer_remote_stream.erl +++ b/src/quicer_remote_stream.erl @@ -18,10 +18,12 @@ -include("quicer_types.hrl"). --export([start/3, +-export([start/4, start_link/3, - start/5, - start_link/5 + start_link/4, + start/6, + start_link/5, + start_link/6 ]). -callback init_handoff(stream_handle(), stream_opts(), connection_handle(), new_stream_props()) -> cb_ret(). @@ -77,21 +79,30 @@ , handle_continue/2 ]). +-type remote_stream_opts() :: stream_opts() | proplists:proplist(). -type cb_ret() :: quicer_stream:cb_ret(). -type cb_state() :: quicer_stream:cb_state(). --spec start_link(module(), connection_handle(), map()) -> gen_server:start_ret(). +-spec start_link(module(), connection_handle(), remote_stream_opts()) -> gen_server:start_ret(). start_link(CallbackModule, Connection, Opts) -> - quicer_stream:start_link(CallbackModule, Connection, Opts#{local => false}). + start_link(CallbackModule, Connection, Opts#{is_local => false}, []). --spec start(module(), connection_handle(), map()) -> gen_server:start_ret(). -start(CallbackModule, Connection, Opts) -> - quicer_stream:start(CallbackModule, Connection, Opts#{local => false}). +-spec start_link(module(), connection_handle(), remote_stream_opts(), [gen_server:start_opt()]) -> gen_server:start_ret(). +start_link(CallbackModule, Connection, Opts, StartOpts) -> + quicer_stream:start_link(CallbackModule, Connection, Opts#{is_local => false}, StartOpts). --spec start_link(module(), connection_handle(), stream_handle(), map(), quicer:new_stream_props()) -> gen_server:start_ret(). +-spec start(module(), connection_handle(), remote_stream_opts(), [gen_server:start_opt()]) -> gen_server:start_ret(). +start(CallbackModule, Connection, Opts, StartOpts) -> + quicer_stream:start(CallbackModule, Connection, Opts#{is_local => false}, StartOpts). + +-spec start_link(module(), connection_handle(), stream_handle(), remote_stream_opts(), quicer:new_stream_props()) -> gen_server:start_ret(). start_link(CallbackModule, Connection, Stream, Opts, Props) -> - quicer_stream:start_link(CallbackModule, Connection, Stream, Opts, Props). + start_link(CallbackModule, Connection, Stream, Opts, Props, []). + +-spec start_link(module(), connection_handle(), stream_handle(), remote_stream_opts(), quicer:new_stream_props(), [gen_server:start_opt()]) -> gen_server:start_ret(). +start_link(CallbackModule, Connection, Stream, Opts, Props, StartOpts) -> + quicer_stream:start_link(CallbackModule, Connection, Stream, Opts, Props#{is_local => false}, StartOpts). --spec start(module(), connection_handle(), stream_handle(), map(), quicer:new_stream_props()) -> gen_server:start_ret(). -start(CallbackModule, Connection, Stream, Opts, Props) -> - quicer_stream:start(CallbackModule, Connection, Stream, Opts, Props). +-spec start(module(), connection_handle(), stream_handle(), remote_stream_opts(), quicer:new_stream_props(), [gen_server:start_opt()]) -> gen_server:start_ret(). +start(CallbackModule, Connection, Stream, Opts, Props, StartOpts) -> + quicer_stream:start(CallbackModule, Connection, Stream, Opts, Props, StartOpts). diff --git a/src/quicer_server_conn_callback.erl b/src/quicer_server_conn_callback.erl index 33cfb0e6..ac7c0aab 100644 --- a/src/quicer_server_conn_callback.erl +++ b/src/quicer_server_conn_callback.erl @@ -40,8 +40,6 @@ -export([handle_info/2]). -init(ConnOpts) when is_list(ConnOpts) -> - init(maps:from_list(ConnOpts)); init(#{stream_opts := SOpts} = S) when is_list(SOpts) -> init(S#{stream_opts := maps:from_list(SOpts)}); init(ConnOpts) when is_map(ConnOpts) -> @@ -52,7 +50,7 @@ closed(_Conn, #{} = _Flags, S)-> new_conn(Conn, #{version := _Vsn}, #{stream_opts := SOpts} = S) -> %% @TODO configurable behavior of spawning stream acceptor - case quicer_stream:start_link(maps:get(stream_callback, SOpts), Conn, SOpts) of + case quicer_remote_stream:start_link(maps:get(stream_callback, SOpts), Conn, SOpts) of {ok, Pid} -> ok = quicer:async_handshake(Conn), {ok, S#{ conn => Conn @@ -75,8 +73,8 @@ nst_received(_Conn, _Data, S) -> new_stream(Stream, #{is_orphan := true} = StreamProps, #{conn := Conn, streams := Streams, stream_opts := SOpts} = CBState) -> %% Spawn new stream - case quicer_stream:start_link(maps:get(stream_callback, SOpts), Stream, Conn, - SOpts, StreamProps) + case quicer_remote_stream:start_link(maps:get(stream_callback, SOpts), Stream, Conn, + SOpts, StreamProps) of {ok, StreamOwner} -> case quicer:handoff_stream(Stream, StreamOwner) of diff --git a/src/quicer_stream.erl b/src/quicer_stream.erl index cce5a309..40e335e5 100644 --- a/src/quicer_stream.erl +++ b/src/quicer_stream.erl @@ -89,12 +89,10 @@ -export([ %% Start before conn handshake, with only Conn handle start_link/3 , start_link/4 - , start/3 , start/4 %% Start after conn handshake with new Stream Handle , start_link/5 , start_link/6 - , start/5 , start/6 , send/2 , send/3 @@ -138,8 +136,6 @@ start_link(Callback, Conn, StreamOpts) -> start_link(Callback, Conn, StreamOpts, []). start_link(Callback, Conn, StreamOpts, GenStartOpts) when is_atom(Callback) -> gen_server:start_link(?MODULE, [Callback, Conn, StreamOpts], GenStartOpts). -start(Callback, Conn, StreamOpts) -> - start(Callback, Conn, StreamOpts, []). start(Callback, Conn, StreamOpts, GenStartOpts) when is_atom(Callback) -> gen_server:start(?MODULE, [Callback, Conn, StreamOpts], GenStartOpts). @@ -164,8 +160,6 @@ start_link(Callback, Stream, Conn, StreamOpts, Props, GenStartOpts) andalso is_map(Props) -> gen_server:start_link(?MODULE, [Callback, Stream, Conn, StreamOpts, Props, self()], GenStartOpts). -start(Callback, Stream, Conn, StreamOpts, Props) -> - start(Callback, Stream, Conn, StreamOpts, Props, []). start(Callback, Stream, Conn, StreamOpts, Props, GenStartOpts) when Callback =/= undefined andalso is_atom(Callback) diff --git a/test/example_client_connection.erl b/test/example_client_connection.erl index 29e3b3d9..030b850f 100644 --- a/test/example_client_connection.erl +++ b/test/example_client_connection.erl @@ -54,13 +54,12 @@ init(#{stream_opts := SOpts} = S) when is_list(SOpts) -> init(S#{stream_opts := maps:from_list(SOpts)}); init(#{conn := Conn, stream_opts := SOpts} = ConnOpts) when is_map(ConnOpts) -> %% for accepting - {ok, Stream2} = quicer_stream:start_link(example_client_stream, Conn, SOpts#{is_local => false}), + {ok, Stream2} = quicer_remote_stream:start(example_client_stream, Conn, SOpts, [{spawn_opt, [link]}]), %% for sending unidi_streams - {ok, Stream1} = quicer_stream:start_link(example_client_stream, Conn, - SOpts#{ is_local => true - , open_flag => ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL - }), - {ok,_} = quicer_stream:send(Stream1, <<"ping_from_example">>, ?QUICER_SEND_FLAG_SYNC bor ?QUIC_SEND_FLAG_FIN), + {ok, Stream1} = quicer_local_stream:start(example_client_stream, Conn, + SOpts#{open_flag => ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}, [{spawn_opt, [link]}]), + + {ok, _} = quicer_stream:send(Stream1, <<"ping_from_example">>, ?QUICER_SEND_FLAG_SYNC bor ?QUIC_SEND_FLAG_FIN), {ok, ConnOpts#{master_stream_pair => {Stream1, Stream2}}}. closed(_Conn, #{is_peer_acked := true}, S)-> @@ -89,7 +88,7 @@ nst_received(_Conn, Data, S) -> new_stream(Stream, Flags, #{ conn := Conn, streams := Streams , stream_opts := SOpts} = CBState) -> %% Spawn new stream - case quicer_stream:start_link(example_server_stream, Stream, Conn, SOpts, Flags) of + case quicer_remote_stream:start_link(example_server_stream, Stream, Conn, SOpts, Flags) of {ok, StreamOwner} -> quicer_connection:handoff_stream(Stream, StreamOwner), {ok, CBState#{ streams := [ {StreamOwner, Stream} | Streams] }}; @@ -122,9 +121,6 @@ peer_needs_streams(C, #{unidi_streams := Current}, S) -> {ok, S}; peer_needs_streams(C, #{bidi_streams := Current}, S) -> ok = quicer:setopt(C, param_conn_settings, #{peer_bidi_stream_count => Current + 1}), - {ok, S}; -%% for https://github.com/microsoft/msquic/issues/3120 -peer_needs_streams(_C, undefined, S) -> {ok, S}. handle_info({'EXIT', _Pid, _Reason}, State) -> diff --git a/test/example_server_connection.erl b/test/example_server_connection.erl index 439ab842..e140eb86 100644 --- a/test/example_server_connection.erl +++ b/test/example_server_connection.erl @@ -63,7 +63,7 @@ closed(_Conn, _CloseProp, S) -> {stop, normal, S}. new_conn(Conn, #{version := _Vsn}, #{stream_opts := SOpts} = S) -> - case quicer_stream:start_link(example_server_stream, Conn, SOpts) of + case quicer_remote_stream:start_link(example_server_stream, Conn, SOpts) of {ok, Pid} -> ok = quicer:async_handshake(Conn), {ok, S#{ conn => Conn @@ -88,7 +88,7 @@ nst_received(_Conn, _Data, S) -> new_stream(Stream, Flags, #{ conn := Conn, streams := Streams , stream_opts := SOpts} = CBState) -> %% Spawn new stream - case quicer_stream:start_link(example_server_stream, Stream, Conn, SOpts, Flags) of + case quicer_remote_stream:start(example_server_stream, Stream, Conn, SOpts, Flags, [{spawn_opt, [link]}]) of {ok, StreamOwner} -> case quicer:handoff_stream(Stream, StreamOwner) of ok -> @@ -121,9 +121,6 @@ peer_needs_streams(C, unidi_streams, S) -> {ok, S}; peer_needs_streams(_C, bidi_streams, S) -> %% leave it for test case to unblock it, see tc_multi_streams_example_server_3 - {ok, S}; -%% for https://github.com/microsoft/msquic/issues/3120 -peer_needs_streams(_C, undefined, S) -> {ok, S}. datagram_state_changed(_C, _Flags, S) -> diff --git a/test/example_server_stream.erl b/test/example_server_stream.erl index ef168ffc..634e6d72 100644 --- a/test/example_server_stream.erl +++ b/test/example_server_stream.erl @@ -121,10 +121,8 @@ handle_stream_data(Stream, Bin, _Flags, #{is_unidir := true, peer_stream := Peer case PeerStream of undefined -> - {ok, StreamProc} = quicer_stream:start_link(?MODULE, Conn, - [ {open_flag, ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL} - , {is_local, true} - ]), + {ok, StreamProc} = quicer_local_stream:start_link(?MODULE, Conn, + [ {open_flag, ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL} ]), {ok, _} = quicer_stream:send(StreamProc, Bin), {ok, State#{peer_stream := StreamProc}}; StreamProc when is_pid(StreamProc) ->