Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add dgram recv callback #247

Merged
merged 2 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions c_src/quicer_eterms.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ extern ERL_NIF_TERM ATOM_CACERTFILE;
/* msquic execution profile for registration */
/*-------------------------------------------------------*/
extern ERL_NIF_TERM ATOM_QUIC_EXECUTION_PROFILE_LOW_LATENCY; // Default
extern ERL_NIF_TERM ATOM_QUIC_EXECUTION_PROFILE_TYPE_MAX_THROUGHPUT;
extern ERL_NIF_TERM ATOM_QUIC_EXECUTION_PROFILE_TYPE_SCAVENGER;
extern ERL_NIF_TERM ATOM_QUIC_EXECUTION_PROFILE_TYPE_REAL_TIME;
extern ERL_NIF_TERM ATOM_QUIC_EXECUTION_PROFILE_MAX_THROUGHPUT;
extern ERL_NIF_TERM ATOM_QUIC_EXECUTION_PROFILE_SCAVENGER;
extern ERL_NIF_TERM ATOM_QUIC_EXECUTION_PROFILE_REAL_TIME;

/*-----------------------------------------*/
/* msquic params starts */
Expand Down
12 changes: 6 additions & 6 deletions c_src/quicer_nif.c
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,9 @@ ERL_NIF_TERM ATOM_CACERTFILE;
/* msquic execution profile for registration */
/*-------------------------------------------------------*/
ERL_NIF_TERM ATOM_QUIC_EXECUTION_PROFILE_LOW_LATENCY; // Default
ERL_NIF_TERM ATOM_QUIC_EXECUTION_PROFILE_TYPE_MAX_THROUGHPUT;
ERL_NIF_TERM ATOM_QUIC_EXECUTION_PROFILE_TYPE_SCAVENGER;
ERL_NIF_TERM ATOM_QUIC_EXECUTION_PROFILE_TYPE_REAL_TIME;
ERL_NIF_TERM ATOM_QUIC_EXECUTION_PROFILE_MAX_THROUGHPUT;
ERL_NIF_TERM ATOM_QUIC_EXECUTION_PROFILE_SCAVENGER;
ERL_NIF_TERM ATOM_QUIC_EXECUTION_PROFILE_REAL_TIME;

/*-----------------------------------------*/
/* msquic params starts */
Expand Down Expand Up @@ -512,11 +512,11 @@ ERL_NIF_TERM ATOM_QUIC_DATAGRAM_SEND_CANCELED;
/*-------------------------------------------------------*/ \
ATOM(ATOM_QUIC_EXECUTION_PROFILE_LOW_LATENCY, \
quic_execution_profile_low_latency); \
ATOM(ATOM_QUIC_EXECUTION_PROFILE_TYPE_MAX_THROUGHPUT, \
ATOM(ATOM_QUIC_EXECUTION_PROFILE_MAX_THROUGHPUT, \
quic_execution_profile_max_throughput); \
ATOM(ATOM_QUIC_EXECUTION_PROFILE_TYPE_SCAVENGER, \
ATOM(ATOM_QUIC_EXECUTION_PROFILE_SCAVENGER, \
quic_execution_profile_scavenger); \
ATOM(ATOM_QUIC_EXECUTION_PROFILE_TYPE_REAL_TIME, \
ATOM(ATOM_QUIC_EXECUTION_PROFILE_REAL_TIME, \
quic_execution_profile_real_time); \
/*-----------------------------------------*/ \
/* msquic params starts */ \
Expand Down
7 changes: 3 additions & 4 deletions c_src/quicer_reg.c
Original file line number Diff line number Diff line change
Expand Up @@ -246,16 +246,15 @@ parse_reg_conf(ERL_NIF_TERM eprofile, QUIC_REGISTRATION_CONFIG *RegConfig)
{
RegConfig->ExecutionProfile = QUIC_EXECUTION_PROFILE_LOW_LATENCY;
}
else if (IS_SAME_TERM(eprofile,
ATOM_QUIC_EXECUTION_PROFILE_TYPE_MAX_THROUGHPUT))
else if (IS_SAME_TERM(eprofile, ATOM_QUIC_EXECUTION_PROFILE_MAX_THROUGHPUT))
{
RegConfig->ExecutionProfile = QUIC_EXECUTION_PROFILE_TYPE_MAX_THROUGHPUT;
}
else if (IS_SAME_TERM(eprofile, ATOM_QUIC_EXECUTION_PROFILE_TYPE_SCAVENGER))
else if (IS_SAME_TERM(eprofile, ATOM_QUIC_EXECUTION_PROFILE_SCAVENGER))
{
RegConfig->ExecutionProfile = QUIC_EXECUTION_PROFILE_TYPE_SCAVENGER;
}
else if (IS_SAME_TERM(eprofile, ATOM_QUIC_EXECUTION_PROFILE_TYPE_REAL_TIME))
else if (IS_SAME_TERM(eprofile, ATOM_QUIC_EXECUTION_PROFILE_REAL_TIME))
{
RegConfig->ExecutionProfile = QUIC_EXECUTION_PROFILE_TYPE_REAL_TIME;
}
Expand Down
4 changes: 3 additions & 1 deletion docs/messages_to_owner.md
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@ and the stream active mode is set to false (passive mode).

More streams are available due to flow control from the peer.

If you don't want this event, set 'QUICER_CONNECTION_EVENT_MASK_NO_STREAMS_AVAILABLE'

`Available = Max - Used`

```erlang
Expand Down Expand Up @@ -315,7 +317,7 @@ Peer wants to open more streams but cannot due to flow control
with connection handle and integer flag

```erlang
{quic, binary(), connection_handle(), flag :: non_neg_integer()}
{quic, binary(), connection_handle(), Flags :: non_neg_integer()}
```

### DATAGRAM send completed, success or fail.
Expand Down
16 changes: 8 additions & 8 deletions include/quicer_types.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,7 @@
conf_handle() |
reg_handle().

-type registration_profile() :: quic_execution_profile_low_latency |
quic_execution_profile_max_throughput |
quic_execution_profile_scavenger |
quic_execution_profile_realtime.

-type registration_profile() :: execution_profile().
-type quic_handle_level() :: quic_tls | quic_configuration | false.

-type listen_on() :: inet:port_number() | string().
Expand Down Expand Up @@ -304,9 +300,9 @@

-type execution_profile() ::
quic_execution_profile_low_latency |
quic_execution_profile_type_max_throughput |
quic_execution_profile_type_scavenger |
quic_execution_profile_type_realtime.
quic_execution_profile_max_throughput |
quic_execution_profile_scavenger |
quic_execution_profile_real_time.

%% Connection Event Props
-type new_conn_props() :: #{ version := integer()
Expand Down Expand Up @@ -388,4 +384,8 @@
| ?QUIC_DATAGRAM_SEND_ACKNOWLEDGED_SPURIOUS %% Acknowledged after being suspected lost
| ?QUIC_DATAGRAM_SEND_CANCELED. %% Send cancelled

-type dgram_state() :: #{ dgram_send_enabled := boolean()
, dgram_max_len := uint64()
}.

-endif. %% QUICER_TYPES_HRL
15 changes: 14 additions & 1 deletion src/quicer_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@
-callback nst_received(connection_handle(), TicketBin :: binary(), cb_state()) -> cb_ret().
%% Client only, New session ticket received,

-callback dgram_state_changed(connection_handle(), dgram_state(), cb_state()) -> cb_ret().
%% Handle Datagram State Changed event.

-callback dgram_recv(connection_handle(), DataBin :: binary(), Flags :: non_neg_integer(), cb_state()) -> cb_ret().
%% Handle Unreliable Datagram of RFC 9221.

-callback handle_call(Req::term(), From::gen_server:from(), cb_state()) -> cb_ret().

-callback handle_info(Info::term(), cb_state()) -> cb_ret().
Expand All @@ -120,6 +126,8 @@
, handle_continue/2
, peer_needs_streams/3 %% require newer MsQuic
, nst_received/3 %% client only
, dgram_state_changed/3 %% because dgram could be off
, dgram_recv/4 %% because dgram could be off
]).
%% Handle API call with callback state.

Expand Down Expand Up @@ -412,7 +420,12 @@ handle_info({quic, nst_received, C, TicketBin},
handle_info({quic, dgram_state_changed, C, Flags},
#{callback := M, callback_state := CBState} = State) ->
?tp_ignore_side_effects_in_prod(debug, #{module => ?MODULE, conn => C, event => dgram_state_changed, flags => Flags}),
default_cb_ret(M:datagram_state_changed(C, Flags, CBState), State);
default_cb_ret(M:dgram_state_changed(C, Flags, CBState), State);

handle_info({quic, Bin, C, Flags},
#{conn := C, callback := M, callback_state := CBState} = State) when is_binary(Bin) ->
?tp_ignore_side_effects_in_prod(debug, #{module => ?MODULE, conn => C, event => dgram_recv, flags => Flags}),
default_cb_ret(M:dgram_recv(C, Bin, Flags, CBState), State);

handle_info(OtherInfo, #{callback := M,
callback_state := CBState} = State) ->
Expand Down
4 changes: 2 additions & 2 deletions test/example_client_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
, resumed/3
, nst_received/3
, new_stream/3
, datagram_state_changed/3
, dgram_state_changed/3
]).

-export([handle_info/2]).
Expand Down Expand Up @@ -96,7 +96,7 @@ new_stream(Stream, Flags, #{ conn := Conn, streams := Streams
Other
end.

datagram_state_changed(_Conn, _Flags, S) ->
dgram_state_changed(_Conn, _Flags, S) ->
?tp(debug, #{module => ?MODULE, conn => _Conn, flags => state, event => dgram_state_changed}),
{ok, S}.

Expand Down
14 changes: 12 additions & 2 deletions test/example_server_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@
, resumed/3
, nst_received/3
, new_stream/3
, datagram_state_changed/3
, dgram_state_changed/3
, dgram_recv/4
]).

-export([handle_info/2]).
Expand Down Expand Up @@ -123,7 +124,16 @@ peer_needs_streams(_C, bidi_streams, S) ->
%% leave it for test case to unblock it, see tc_multi_streams_example_server_3
{ok, S}.

datagram_state_changed(_C, _Flags, S) ->
dgram_state_changed(_C, _Flags, S) ->
{ok, S}.

dgram_recv(C, Bin, _Flag, S) ->
%% maybe peer didn't enable,
case quicer:send_dgram(C, Bin) of
{ok, _} -> ok;
Error -> %% for testing when peer disable the receiving
ct:pal("send dgram error: ~p~n", [Error])
end,
{ok, S}.

handle_info({'EXIT', _Pid, _Reason}, State) ->
Expand Down
80 changes: 80 additions & 0 deletions test/quicer_connection_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,86 @@ tc_conn_client_bad_cert(Config) ->
ct:fail({run_error, Error})
end.

tc_datagram_disallowed(Config) ->
Port = select_port(),
ServerConnCallback = example_server_connection,
ServerStreamCallback = example_server_stream,
ListenerOpts = [{conn_acceptors, 4} | default_listen_opts(Config)],
ConnectionOpts = [ {conn_callback, ServerConnCallback}
, {stream_acceptors, 2}
| default_conn_opts()],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: indentation

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will format the code with erlfmt in the next PR.

StreamOpts = [ {stream_callback, ServerStreamCallback}
, {disable_fpbuffer, true}
| default_stream_opts() ],
%% GIVEN: A listener with datagram_receive_enabled = false
Comment on lines +474 to +476
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: should disable_fpbuffer = true above be datagram_receive_enabled = false?
Or maybe explicitly set it in ListenerOpts?

Options = {ListenerOpts, ConnectionOpts, StreamOpts},

{ok, _} = quicer:spawn_listener(mqtt, Port, Options),
%% WHEN: Client send dgram data
{ok, Conn} = quicer:connect("localhost", Port, default_conn_opts(), 5000),
%% THEN: It get an error
?assertEqual({error, dgram_send_error, invalid_state}, quicer:send_dgram(Conn, <<"dg_ping">>)),
quicer:shutdown_connection(Conn),
ok.

tc_datagram_peer_allowed(Config) ->
Port = select_port(),
ServerConnCallback = example_server_connection,
ServerStreamCallback = example_server_stream,
%% GIVEN: A listener with datagram_receive_enabled = 1 (true)
ListenerOpts = [{conn_acceptors, 4}, {datagram_receive_enabled, 1} | default_listen_opts(Config)],
ConnectionOpts = [ {conn_callback, ServerConnCallback}
, {stream_acceptors, 2}
| default_conn_opts()],
StreamOpts = [ {stream_callback, ServerStreamCallback}
, {disable_fpbuffer, true}
| default_stream_opts() ],
Options = {ListenerOpts, ConnectionOpts, StreamOpts},

{ok, _} = quicer:spawn_listener(mqtt, Port, Options),
%% WHEN: A client send_dgram
{ok, Conn} = quicer:connect("localhost", Port, default_conn_opts(), 5000),
%% THEN: It should success
?assertEqual({ok, 7}, quicer:send_dgram(Conn, <<"dg_ping">>)),

receive
%% THEN: the client should not recv dgram from peer as the receiving is disabled
{quic, Data, _Conn, _Flag} when is_binary(Data) ->
ct:fail("client side dgram recv timeout")
after 500 ->
ok
end,
quicer:shutdown_connection(Conn),
ok.

tc_datagram_local_peer_allowed(Config) ->
Port = select_port(),
ServerConnCallback = example_server_connection,
ServerStreamCallback = example_server_stream,
%% GIVEN: A listener with datagram_receive_enabled = 1 (true)
ListenerOpts = [{conn_acceptors, 4}, {datagram_receive_enabled, 1} | default_listen_opts(Config)],
ConnectionOpts = [ {conn_callback, ServerConnCallback}
, {stream_acceptors, 2}
| default_conn_opts()],
StreamOpts = [ {stream_callback, ServerStreamCallback}
, {disable_fpbuffer, true}
| default_stream_opts() ],
Options = {ListenerOpts, ConnectionOpts, StreamOpts},

{ok, _} = quicer:spawn_listener(mqtt, Port, Options),
%% WHEN: Client connect with datagram_receive_enabled = 1 (true)
{ok, Conn} = quicer:connect("localhost", Port, [{datagram_receive_enabled, 1} | default_conn_opts()], 5000),
?assertEqual({ok, 7}, quicer:send_dgram(Conn, <<"dg_ping">>)),
receive
%% THEN: the client is able to receive the dgram from server
{quic, <<"dg_ping">>, Conn, Flag} ->
?assertEqual(0, Flag)
after 1000 ->
ct:fail("client side dgram recv timeout")
end,
quicer:shutdown_connection(Conn),
ok.

run_tc_conn_client_bad_cert(Config)->
Port = select_port(),
Owner = self(),
Expand Down
3 changes: 2 additions & 1 deletion test/quicer_test_lib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,8 @@ reset_global_reg()->
quicer:reg_open().

shutdown_all_listeners() ->
lists:foreach(fun quicer:shutdown_listener/1,
lists:foreach(fun({{Id, _ListenOn}, _Pid}) ->
quicer:terminate_listener(Id) end,
quicer:listeners()).

%%%_* Emacs ====================================================================
Expand Down