Skip to content

Commit

Permalink
Merge pull request #328 from qzhuyan/dev/william/dyn-acceptors-2
Browse files Browse the repository at this point in the history
- Dynamic config listener acceptors
- probe conn state with zero len datagram
  • Loading branch information
qzhuyan authored Dec 12, 2024
2 parents a9687f4 + 93cc21c commit 1f933bc
Show file tree
Hide file tree
Showing 26 changed files with 590 additions and 121 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ proper:
.PHONY: proper-cover
proper-cover:
mkdir -p coverage
QUICER_TEST_COVER=1 $(REBAR) as test proper -c -n 1000
QUICER_TEST_COVER=1 $(REBAR) as test proper -c -n 1000 --noshrink
lcov -c --directory c_build/CMakeFiles/quicer_nif.dir/c_src/ \
--exclude "${PWD}/msquic/src/inc/*" \
--output-file ./coverage/proper-lcov.info
Expand Down
88 changes: 44 additions & 44 deletions c_src/quicer_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -899,18 +899,13 @@ async_accept2(ErlNifEnv *env,
const ERL_NIF_TERM argv[])
{
ERL_NIF_TERM listener = argv[0];
ERL_NIF_TERM conn_opts = argv[1];
// @NOTE: since 0.2, we ignore argv[1]
QuicerListenerCTX *l_ctx = NULL;
ERL_NIF_TERM active_val = ATOM_TRUE;
if (!enif_get_resource(env, listener, ctx_listener_t, (void **)&l_ctx))
{
return ERROR_TUPLE_2(ATOM_BADARG);
}

// Set parm active is optional
enif_get_map_value(
env, conn_opts, ATOM_QUIC_STREAM_OPTS_ACTIVE, &active_val);

ACCEPTOR *acceptor = AcceptorAlloc();
if (!acceptor)
{
Expand All @@ -923,24 +918,11 @@ async_accept2(ErlNifEnv *env,
return ERROR_TUPLE_2(ATOM_BAD_PID);
}

if (!set_owner_recv_mode(acceptor, env, active_val))
{
AcceptorDestroy(acceptor);
return ERROR_TUPLE_2(ATOM_BADARG);
}

if (!create_settings(env, &conn_opts, &acceptor->Settings))
{
AcceptorDestroy(acceptor);
return ERROR_TUPLE_2(ATOM_PARAM_ERROR);
}

AcceptorEnqueue(l_ctx->acceptor_queue, acceptor);

assert(enif_is_process_alive(env, &(acceptor->Pid)));

ERL_NIF_TERM listenHandle = enif_make_resource(env, l_ctx);
return SUCCESS(listenHandle);
return SUCCESS(listener);
}

ERL_NIF_TERM
Expand Down Expand Up @@ -1084,40 +1066,28 @@ continue_connection_handshake(QuicerConnCTX *c_ctx)
{
QUIC_STATUS Status = QUIC_STATUS_SUCCESS;

if (!c_ctx)
{
return QUIC_STATUS_INTERNAL_ERROR;
}

if (!c_ctx->Connection)
{
return QUIC_STATUS_INVALID_STATE;
}

if (QUIC_FAILED(Status = MsQuic->ConnectionSetConfiguration(
c_ctx->Connection, c_ctx->config_ctx->Configuration)))
{
return Status;
}
CXPLAT_FRE_ASSERT(c_ctx);
CXPLAT_FRE_ASSERT(c_ctx->Connection);

// Apply connection owners' option overrides
Status = MsQuic->SetParam(c_ctx->Connection,
QUIC_PARAM_CONN_SETTINGS,
sizeof(QUIC_SETTINGS),
&c_ctx->owner->Settings);
Status = MsQuic->ConnectionSetConfiguration(
c_ctx->Connection, c_ctx->config_ctx->Configuration);
return Status;
}

ERL_NIF_TERM
async_handshake_1(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
async_handshake_X(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])

{
QuicerConnCTX *c_ctx;
QUIC_STATUS Status = QUIC_STATUS_SUCCESS;
ERL_NIF_TERM res = ATOM_OK;
CXPLAT_FRE_ASSERT(argc == 1);
CXPLAT_FRE_ASSERT(argc == 1 || argc == 2);
ERL_NIF_TERM econn = argv[0];

if (!enif_get_resource(env, argv[0], ctx_connection_t, (void **)&c_ctx))
QUIC_SETTINGS Settings = { 0 };
ERL_NIF_TERM active_val = ATOM_TRUE;

if (!enif_get_resource(env, econn, ctx_connection_t, (void **)&c_ctx))
{
return ERROR_TUPLE_2(ATOM_BADARG);
}
Expand All @@ -1129,11 +1099,41 @@ async_handshake_1(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
return ERROR_TUPLE_2(ATOM_CLOSED);
}

if (argc > 1)
{
ERL_NIF_TERM econn_opts = argv[1];
// Set parm active is optional
enif_get_map_value(
env, econn_opts, ATOM_QUIC_STREAM_OPTS_ACTIVE, &active_val);

if (!create_settings(env, &econn_opts, &Settings))
{
res = ERROR_TUPLE_2(ATOM_PARAM_ERROR);
goto exit;
}

if (!set_owner_recv_mode(c_ctx->owner, env, active_val))
{
res = ERROR_TUPLE_2(ATOM_BADARG);
goto exit;
}

// Apply connection owners' option overrides
if (QUIC_FAILED(Status = MsQuic->SetParam(c_ctx->Connection,
QUIC_PARAM_CONN_SETTINGS,
sizeof(QUIC_SETTINGS),
&Settings)))
{
res = ERROR_TUPLE_2(ATOM_STATUS(Status));
goto exit;
}
}

if (QUIC_FAILED(Status = continue_connection_handshake(c_ctx)))
{
res = ERROR_TUPLE_2(ATOM_STATUS(Status));
}

exit:
put_conn_handle(c_ctx);
return res;
}
Expand Down
2 changes: 1 addition & 1 deletion c_src/quicer_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ ERL_NIF_TERM
get_conn_rid1(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);

ERL_NIF_TERM
async_handshake_1(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
async_handshake_X(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);

QUIC_STATUS continue_connection_handshake(QuicerConnCTX *c_ctx);

Expand Down
3 changes: 2 additions & 1 deletion c_src/quicer_nif.c
Original file line number Diff line number Diff line change
Expand Up @@ -1746,7 +1746,8 @@ static ErlNifFunc nif_funcs[] = {
{ "open_connection", 1, open_connectionX, 0},
{ "async_connect", 3, async_connect3, 0},
{ "async_accept", 2, async_accept2, 0},
{ "async_handshake", 1, async_handshake_1, 0},
{ "async_handshake", 1, async_handshake_X, 0},
{ "async_handshake", 2, async_handshake_X, 0},
{ "async_shutdown_connection", 3, shutdown_connection3, 0},
{ "async_accept_stream", 2, async_accept_stream2, 0},
{ "start_stream", 2, async_start_stream2, 0},
Expand Down
3 changes: 0 additions & 3 deletions c_src/quicer_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,7 @@ typedef struct ACCEPTOR
ErlNifPid Pid;
ACCEPTOR_RECV_MODE active;
uint16_t active_count; /* counter for active_n */
QUIC_SETTINGS Settings;
void *reserved1;
void *reserved2;
void *reserved3;
} ACCEPTOR;

typedef struct AcceptorsQueue
Expand Down
7 changes: 7 additions & 0 deletions include/quicer.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,11 @@
-define(QUIC_CONGESTION_CONTROL_ALGORITHM_CUBIC, 0).
-define(QUIC_CONGESTION_CONTROL_ALGORITHM_BBR, 1).

-record(probe_state, {
final :: term() | undefined,
sent_at :: integer() | undefined,
suspect_lost_at :: integer() | undefined,
final_at :: integer() | undefined
}).

-endif. %% QUICER_HRL
6 changes: 6 additions & 0 deletions include/quicer_types.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -506,5 +506,11 @@
dgram_max_len := uint64()
}.

-type probe_state() :: #probe_state{}.
-type probe_res() ::
#probe_state{}
| {error, dgram_send_error, atom()}
| {error, atom()}.

%% QUICER_TYPES_HRL
-endif.
89 changes: 68 additions & 21 deletions src/quicer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@
async_connect/3,
handshake/1,
handshake/2,
handshake/3,
async_handshake/1,
async_handshake/2,
accept/2,
accept/3,
async_accept/2,
Expand All @@ -59,6 +61,7 @@
close_connection/4,
async_close_connection/1,
async_close_connection/3,
probe/2,
accept_stream/2,
accept_stream/3,
async_accept_stream/2,
Expand All @@ -69,6 +72,7 @@
async_send/2,
async_send/3,
recv/2,
async_send_dgram/2,
send_dgram/2,
shutdown_stream/1,
shutdown_stream/2,
Expand Down Expand Up @@ -174,7 +178,10 @@
quicer_addr/0,

%% Registraion Profiles
registration_profile/0
registration_profile/0,

%% probes
probe_res/0
]).

-type connection_opts() :: proplists:proplist() | conn_opts().
Expand Down Expand Up @@ -447,14 +454,29 @@ async_connect(Host, Port, Opts) when is_map(Opts) ->
handshake(Conn) ->
handshake(Conn, 5000).

-spec handshake(connection_handle(), timeout()) ->
{ok, connection_handle()} | {error, any()}.
handshake(Conn, Timeout) ->
case async_handshake(Conn) of
{error, _} = E ->
E;
ok ->
receive
{quic, connected, Conn, _} -> {ok, Conn};
{quic, closed, Conn, _Flags} -> {error, closed}
after Timeout ->
{error, timeout}
end
end.

%% @doc Complete TLS handshake after accepted a Connection
%% @see handshake/2
%% @see async_handshake/1
-spec handshake(connection_handle(), timeout()) ->
-spec handshake(connection_handle(), conn_opts(), timeout()) ->
{ok, connection_handle()}
| {error, any()}.
handshake(Conn, Timeout) ->
case async_handshake(Conn) of
handshake(Conn, ConnOpts, Timeout) ->
case async_handshake(Conn, ConnOpts) of
{error, _} = E ->
E;
ok ->
Expand All @@ -467,13 +489,24 @@ handshake(Conn, Timeout) ->
end.

%% @doc Complete TLS handshake after accepted a Connection.
%% Caller should expect to receive ```{quic, connected, connection_handle()}'''
%%
%% @see handshake/2
%% @see async_handshake/2
-spec async_handshake(connection_handle()) -> ok | {error, any()}.
async_handshake(Conn) ->
quicer_nif:async_handshake(Conn).

%% @doc Complete TLS handshake after accepted a Connection.
%% also set connection options which override the default listener options.
%%
%% @see handshake/2
%% @see async_handshake/1
-spec async_handshake(connection_handle(), conn_opts()) -> ok | {error, any()}.
async_handshake(Conn, ConnOpts) when is_list(ConnOpts) ->
async_handshake(Conn, maps:from_list(ConnOpts));
async_handshake(Conn, ConnOpts) ->
quicer_nif:async_handshake(Conn, ConnOpts).

%% @doc Accept new Connection (Server)
%%
%% Accept new connection from listener_handle().
Expand Down Expand Up @@ -816,35 +849,49 @@ do_recv(Stream, Count, Buff) ->
E
end.

%% @doc Sending Unreliable Datagram.
%% Caller should handle the async signals for the send results
%%
%% ref: [https://datatracker.ietf.org/doc/html/rfc9221]
%% @see send/2 send_dgram/2
-spec async_send_dgram(connection_handle(), binary()) ->
{ok, non_neg_integer()}
| {error, badarg | not_enough_mem | invalid_parameter | closed}
| {error, dgram_send_error, atom_reason()}.
async_send_dgram(Conn, Data) ->
quicer_nif:send_dgram(Conn, Data, _IsSyncRel = 1).

%% @doc Sending Unreliable Datagram
%% return error only if sending could not be scheduled such as
%% not_enough_mem, connection is already closed or wrong args.
%% otherwise, it is fire and forget.
%%
%% ref: [https://datatracker.ietf.org/doc/html/draft-ietf-quic-datagram]
%% @see send/2
%% %% ref: [https://datatracker.ietf.org/doc/html/rfc9221]
%% @see send/2, async_send_dgram/2
-spec send_dgram(connection_handle(), binary()) ->
{ok, BytesSent :: pos_integer()}
| {error, badarg | not_enough_mem | closed}
{ok, BytesSent :: non_neg_integer()}
| {error, badarg | not_enough_mem | invalid_parameter | closed}
| {error, dgram_send_error, atom_reason()}.
send_dgram(Conn, Data) ->
case quicer_nif:send_dgram(Conn, Data, _IsSync = 1) of
%% @todo we need find tuned event mask
{ok, _Len} = OK ->
receive
{quic, dgram_send_state, Conn, #{state := ?QUIC_DATAGRAM_SEND_SENT}} ->
receive
{quic, dgram_send_state, Conn, #{state := ?QUIC_DATAGRAM_SEND_ACKNOWLEDGED}} ->
OK;
{quic, dgram_send_state, Conn, #{state := Other}} ->
{error, dgram_send_error, Other}
end;
{quic, dgram_send_state, Conn, #{state := ?QUIC_DATAGRAM_SEND_ACKNOWLEDGED}} ->
case quicer_lib:handle_dgram_send_states(Conn) of
ok ->
OK;
{quic, dgram_send_state, Conn, #{state := Other}} ->
{error, dgram_send_error, Other}
{error, E} ->
{error, dgram_send_error, E}
end;
{error, E} ->
{error, E};
E ->
E
end.

%% @doc Probe conn state with 0 len dgram.
-spec probe(connection_handle(), timeout()) -> probe_res().
probe(Conn, Timeout) ->
quicer_lib:probe(Conn, Timeout).

%% @doc Shutdown stream gracefully, with infinity timeout
%%
%% @see shutdown_stream/1
Expand Down
6 changes: 2 additions & 4 deletions src/quicer_conn_acceptor_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -62,21 +62,19 @@ start_link(ListenerH, ConnOpts) ->
-spec init(Args :: term()) ->
{ok, {SupFlags :: supervisor:sup_flags(), [ChildSpec :: supervisor:child_spec()]}}
| ignore.
init([ListenerH, Opts]) ->
init([ListenerH, OptsTab]) ->
SupFlags = #{
strategy => simple_one_for_one,
intensity => 1,
period => 5
},

OneChild = #{
id => ignored,
start => {quicer_connection, start_link, [undefined, ListenerH, Opts]},
start => {quicer_connection, start_acceptor, [ListenerH, OptsTab]},
restart => temporary,
shutdown => 5000,
type => worker
},

{ok, {SupFlags, [OneChild]}}.

%%%===================================================================
Expand Down
Loading

0 comments on commit 1f933bc

Please sign in to comment.