Skip to content

Commit

Permalink
Merge pull request #217 from qzhuyan/dev/william/msquic2.2-evt-peer-n…
Browse files Browse the repository at this point in the history
…eeds-stream

msquic2.2 evt peer needs streams
  • Loading branch information
qzhuyan authored Sep 18, 2023
2 parents 8ebd86d + 17549c9 commit c7c75fe
Show file tree
Hide file tree
Showing 9 changed files with 32 additions and 28 deletions.
14 changes: 3 additions & 11 deletions c_src/quicer_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -1355,20 +1355,12 @@ handle_connection_event_peer_needs_streams(
assert(QUIC_CONNECTION_EVENT_PEER_NEEDS_STREAMS == Event->Type);
assert(c_ctx->Connection);
ErlNifEnv *env = c_ctx->env;
/* reserved for the future upgrade
ERL_NIF_TERM props_name[] = { Event->PEER_NEEDS_STREAMS.Bidirectional ?
ATOM_BIDI_STREAMS : ATOM_UNIDI_STREAMS }; ERL_NIF_TERM props_value[] = {
enif_make_uint64(env, Event->PEER_NEEDS_STREAMS.StreamLimit) }; ERL_NIF_TERM
report = make_event_with_props(env, ATOM_PEER_NEEDS_STREAMS,
enif_make_resource(env, c_ctx),
props_name,
props_value,
1);
*/
ERL_NIF_TERM report = make_event(env,
ATOM_PEER_NEEDS_STREAMS,
enif_make_resource(env, c_ctx),
ATOM_UNDEFINED);
Event->PEER_NEEDS_STREAMS.Bidirectional
? ATOM_BIDI_STREAMS
: ATOM_UNIDI_STREAMS);

enif_send(NULL, &(c_ctx->owner->Pid), NULL, report);
return QUIC_STATUS_SUCCESS;
Expand Down
1 change: 1 addition & 0 deletions c_src/quicer_eterms.h
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ extern ERL_NIF_TERM ATOM_NEW_STREAM;
/*----------------------------------------------------------*/
extern ERL_NIF_TERM ATOM_SNABBKAFFE_COLLECTOR;
extern ERL_NIF_TERM ATOM_TRACE;
extern ERL_NIF_TERM ATOM_TIME;
// Trace point Context, nif for callback
extern ERL_NIF_TERM ATOM_CONTEXT;
extern ERL_NIF_TERM ATOM_NIF;
Expand Down
2 changes: 2 additions & 0 deletions c_src/quicer_nif.c
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ ERL_NIF_TERM ATOM_NEW_STREAM;
/*----------------------------------------------------------*/
ERL_NIF_TERM ATOM_SNABBKAFFE_COLLECTOR;
ERL_NIF_TERM ATOM_TRACE;
ERL_NIF_TERM ATOM_TIME;
// Trace point Context, nif for callback
ERL_NIF_TERM ATOM_CONTEXT;
ERL_NIF_TERM ATOM_NIF;
Expand Down Expand Up @@ -715,6 +716,7 @@ ERL_NIF_TERM ATOM_QUIC_DATAGRAM_SEND_CANCELED;
ATOM(ATOM_NEW_STREAM, new_stream); \
ATOM(ATOM_SNABBKAFFE_COLLECTOR, snabbkaffe_collector); \
ATOM(ATOM_TRACE, trace); \
ATOM(ATOM_TIME, time); \
ATOM(ATOM_CONTEXT, context); \
ATOM(ATOM_NIF, nif); \
ATOM(ATOM_CALLBACK, callback); \
Expand Down
16 changes: 15 additions & 1 deletion c_src/quicer_tp.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#define TRACEPOINT_DEFINE
#include "quicer_tp.h"

extern uint64_t CxPlatTimeUs64(void);

void
tp_snk(ErlNifEnv *env,
const char *ctx,
Expand All @@ -18,14 +20,26 @@ tp_snk(ErlNifEnv *env,
= { ATOM_SNK_KIND, ATOM_CONTEXT, ATOM_FUNCTION, ATOM_TAG,
ATOM_RESOURCE_ID, ATOM_MARK, ATOM_SNK_META };

ERL_NIF_TERM snk_evt_meta;
ERL_NIF_TERM snk_evt_meta_key_array[1] = { ATOM_TIME };
ERL_NIF_TERM snk_evt_meta_val_array[1]
= { enif_make_uint64(env, CxPlatTimeUs64()) };

// shall never fail
enif_make_map_from_arrays(env,
snk_evt_meta_key_array,
snk_evt_meta_val_array,
1,
&snk_evt_meta);

ERL_NIF_TERM snk_event_val_array[7] = {
ATOM_DEBUG, // snk_kind
enif_make_string(env, ctx, ERL_NIF_LATIN1), // context
enif_make_string(env, fun, ERL_NIF_LATIN1), // fun
enif_make_string(env, tag, ERL_NIF_LATIN1), // tag
enif_make_uint64(env, rid), // rid
enif_make_uint64(env, mark), // mark
enif_make_new_map(env) // snk_meta
snk_evt_meta // snk_meta
};

enif_make_map_from_arrays(
Expand Down
2 changes: 1 addition & 1 deletion docs/messages_to_owner.md
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ More streams are available due to flow control from the peer.

Peer wants to open more streams but cannot due to flow control
```erlang
{quic, peer_needs_streams, connection_handle(), undefined}
{quic, peer_needs_streams, connection_handle(), unidi_streams | bidi_streams}
```

### Ideal processor changed
Expand Down
2 changes: 1 addition & 1 deletion src/quicer_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ handle_info({quic, peer_needs_streams, C, Needs},
#{ conn := C
, callback := M
, callback_state := CbState} = State) ->
?tp_ignore_side_effects_in_prod(debug, #{module => ?MODULE, conn => C, event => peer_needs_streams}),
?tp_ignore_side_effects_in_prod(debug, #{module => ?MODULE, conn => C, event => peer_needs_streams, needs => Needs}),
default_cb_ret(M:peer_needs_streams(C, Needs, CbState), State);

handle_info({quic, connection_resumed, C, ResumeData},
Expand Down
2 changes: 1 addition & 1 deletion src/quicer_server_conn_callback.erl
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ streams_available(_C, {BidirCnt, UnidirCnt}, S) ->
, peer_bidi_stream_count => BidirCnt}}.

%% @doc May integrate with App flow control
peer_needs_streams(_C, undefined, S) ->
peer_needs_streams(_C, _UnidiOrBidi, S) ->
{ok, S}.

connected(Conn, _Flags, #{ slow_start := false, stream_opts := SOpts
Expand Down
7 changes: 4 additions & 3 deletions test/example_server_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,12 @@ local_address_changed(_C, _NewAddr, S) ->
streams_available(_C, {_BidirCnt, _UnidirCnt}, S) ->
{ok, S}.

peer_needs_streams(C, #{unidi_streams := Current}, S) ->
peer_needs_streams(C, unidi_streams, S) ->
{ok, Current} = quicer:getopt(C, param_conn_local_unidi_stream_count),
ok = quicer:setopt(C, param_conn_settings, #{peer_unidi_stream_count => Current + 1}),
{ok, S};
peer_needs_streams(C, #{bidi_streams := Current}, S) ->
ok = quicer:setopt(C, param_conn_settings, #{peer_bidi_stream_count => Current + 1}),
peer_needs_streams(_C, bidi_streams, S) ->
%% leave it for test case to unblock it, see tc_multi_streams_example_server_3
{ok, S};
%% for https://github.com/microsoft/msquic/issues/3120
peer_needs_streams(_C, undefined, S) ->
Expand Down
14 changes: 4 additions & 10 deletions test/quicer_snb_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1996,20 +1996,14 @@ tc_multi_streams_example_server_1(Config) ->
true = quicer:is_unidirectional(Flag),
Incoming
after 1000 ->
%%ct:fail("no incoming stream")
%% reenable the check when it is fixed.
%% https://github.com/microsoft/msquic/issues/3120
ok
ct:fail("no incoming stream")
end,
receive
{quic, Data, Stm3In, DFlag} ->
ct:pal("~p is received from ~p with flag: ~p", [Data, Stm3In, DFlag]),
?assertEqual(Data, <<"ping3">>)
after 1000 ->
%% ct:fail("no incoming data")
%% reenable the check when it is fixed.
%% https://github.com/microsoft/msquic/issues/3120
ok
ct:fail("no incoming data")
end,
quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0),
receive
Expand All @@ -2020,7 +2014,7 @@ tc_multi_streams_example_server_1(Config) ->
end,
fun(_Result, Trace) ->
ct:pal("Trace is ~p", [Trace]),
?assertMatch([{pair, _, _}],
?assertMatch([{pair, _, _}, {pair, _, _}],
?find_pairs(
#{ ?snk_kind := debug
, event := handoff_stream
Expand All @@ -2033,7 +2027,7 @@ tc_multi_streams_example_server_1(Config) ->
, stream := _STREAM0
},
Trace)),
?assertMatch([{pair, _, _}],
?assertMatch([{pair, _, _}, {pair, _, _}],
?find_pairs( #{ ?snk_kind := debug
, event := handoff_stream
, module := quicer
Expand Down

0 comments on commit c7c75fe

Please sign in to comment.