Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: new acceptor reads new listener config #327

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ proper:
.PHONY: proper-cover
proper-cover:
mkdir -p coverage
QUICER_TEST_COVER=1 $(REBAR) as test proper -c -n 1000
QUICER_TEST_COVER=1 $(REBAR) as test proper -c -n 1000 --noshrink
lcov -c --directory c_build/CMakeFiles/quicer_nif.dir/c_src/ \
--exclude "${PWD}/msquic/src/inc/*" \
--output-file ./coverage/proper-lcov.info
Expand Down
88 changes: 44 additions & 44 deletions c_src/quicer_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -899,18 +899,13 @@ async_accept2(ErlNifEnv *env,
const ERL_NIF_TERM argv[])
{
ERL_NIF_TERM listener = argv[0];
ERL_NIF_TERM conn_opts = argv[1];
// @NOTE: since 0.2, we ignore argv[1]
QuicerListenerCTX *l_ctx = NULL;
ERL_NIF_TERM active_val = ATOM_TRUE;
if (!enif_get_resource(env, listener, ctx_listener_t, (void **)&l_ctx))
{
return ERROR_TUPLE_2(ATOM_BADARG);
}

// Set parm active is optional
enif_get_map_value(
env, conn_opts, ATOM_QUIC_STREAM_OPTS_ACTIVE, &active_val);

ACCEPTOR *acceptor = AcceptorAlloc();
if (!acceptor)
{
Expand All @@ -923,24 +918,11 @@ async_accept2(ErlNifEnv *env,
return ERROR_TUPLE_2(ATOM_BAD_PID);
}

if (!set_owner_recv_mode(acceptor, env, active_val))
{
AcceptorDestroy(acceptor);
return ERROR_TUPLE_2(ATOM_BADARG);
}

if (!create_settings(env, &conn_opts, &acceptor->Settings))
{
AcceptorDestroy(acceptor);
return ERROR_TUPLE_2(ATOM_PARAM_ERROR);
}

AcceptorEnqueue(l_ctx->acceptor_queue, acceptor);

assert(enif_is_process_alive(env, &(acceptor->Pid)));

ERL_NIF_TERM listenHandle = enif_make_resource(env, l_ctx);
return SUCCESS(listenHandle);
return SUCCESS(listener);
}

ERL_NIF_TERM
Expand Down Expand Up @@ -1084,40 +1066,28 @@ continue_connection_handshake(QuicerConnCTX *c_ctx)
{
QUIC_STATUS Status = QUIC_STATUS_SUCCESS;

if (!c_ctx)
{
return QUIC_STATUS_INTERNAL_ERROR;
}

if (!c_ctx->Connection)
{
return QUIC_STATUS_INVALID_STATE;
}

if (QUIC_FAILED(Status = MsQuic->ConnectionSetConfiguration(
c_ctx->Connection, c_ctx->config_ctx->Configuration)))
{
return Status;
}
CXPLAT_FRE_ASSERT(c_ctx);
CXPLAT_FRE_ASSERT(c_ctx->Connection);

// Apply connection owners' option overrides
Status = MsQuic->SetParam(c_ctx->Connection,
QUIC_PARAM_CONN_SETTINGS,
sizeof(QUIC_SETTINGS),
&c_ctx->owner->Settings);
Status = MsQuic->ConnectionSetConfiguration(
c_ctx->Connection, c_ctx->config_ctx->Configuration);
return Status;
}

ERL_NIF_TERM
async_handshake_1(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
async_handshake_X(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])

{
QuicerConnCTX *c_ctx;
QUIC_STATUS Status = QUIC_STATUS_SUCCESS;
ERL_NIF_TERM res = ATOM_OK;
CXPLAT_FRE_ASSERT(argc == 1);
CXPLAT_FRE_ASSERT(argc == 1 || argc == 2);
ERL_NIF_TERM econn = argv[0];

if (!enif_get_resource(env, argv[0], ctx_connection_t, (void **)&c_ctx))
QUIC_SETTINGS Settings = { 0 };
ERL_NIF_TERM active_val = ATOM_TRUE;

if (!enif_get_resource(env, econn, ctx_connection_t, (void **)&c_ctx))
{
return ERROR_TUPLE_2(ATOM_BADARG);
}
Expand All @@ -1129,11 +1099,41 @@ async_handshake_1(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
return ERROR_TUPLE_2(ATOM_CLOSED);
}

if (argc > 1)
{
ERL_NIF_TERM econn_opts = argv[1];
// Set parm active is optional
enif_get_map_value(
env, econn_opts, ATOM_QUIC_STREAM_OPTS_ACTIVE, &active_val);

if (!create_settings(env, &econn_opts, &Settings))
{
res = ERROR_TUPLE_2(ATOM_PARAM_ERROR);
goto exit;
}

if (!set_owner_recv_mode(c_ctx->owner, env, active_val))
{
res = ERROR_TUPLE_2(ATOM_BADARG);
goto exit;
}

// Apply connection owners' option overrides
if (QUIC_FAILED(Status = MsQuic->SetParam(c_ctx->Connection,
QUIC_PARAM_CONN_SETTINGS,
sizeof(QUIC_SETTINGS),
&Settings)))
{
res = ERROR_TUPLE_2(ATOM_STATUS(Status));
goto exit;
}
}

if (QUIC_FAILED(Status = continue_connection_handshake(c_ctx)))
{
res = ERROR_TUPLE_2(ATOM_STATUS(Status));
}

exit:
put_conn_handle(c_ctx);
return res;
}
Expand Down
2 changes: 1 addition & 1 deletion c_src/quicer_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ ERL_NIF_TERM
get_conn_rid1(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);

ERL_NIF_TERM
async_handshake_1(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
async_handshake_X(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);

QUIC_STATUS continue_connection_handshake(QuicerConnCTX *c_ctx);

Expand Down
3 changes: 2 additions & 1 deletion c_src/quicer_nif.c
Original file line number Diff line number Diff line change
Expand Up @@ -1746,7 +1746,8 @@ static ErlNifFunc nif_funcs[] = {
{ "open_connection", 1, open_connectionX, 0},
{ "async_connect", 3, async_connect3, 0},
{ "async_accept", 2, async_accept2, 0},
{ "async_handshake", 1, async_handshake_1, 0},
{ "async_handshake", 1, async_handshake_X, 0},
{ "async_handshake", 2, async_handshake_X, 0},
{ "async_shutdown_connection", 3, shutdown_connection3, 0},
{ "async_accept_stream", 2, async_accept_stream2, 0},
{ "start_stream", 2, async_start_stream2, 0},
Expand Down
3 changes: 0 additions & 3 deletions c_src/quicer_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,7 @@ typedef struct ACCEPTOR
ErlNifPid Pid;
ACCEPTOR_RECV_MODE active;
uint16_t active_count; /* counter for active_n */
QUIC_SETTINGS Settings;
void *reserved1;
void *reserved2;
void *reserved3;
} ACCEPTOR;

typedef struct AcceptorsQueue
Expand Down
36 changes: 32 additions & 4 deletions src/quicer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@
async_connect/3,
handshake/1,
handshake/2,
handshake/3,
async_handshake/1,
async_handshake/2,
accept/2,
accept/3,
async_accept/2,
Expand Down Expand Up @@ -447,14 +449,29 @@ async_connect(Host, Port, Opts) when is_map(Opts) ->
handshake(Conn) ->
handshake(Conn, 5000).

-spec handshake(connection_handle(), timeout()) ->
{ok, connection_handle()} | {error, any()}.
handshake(Conn, Timeout) ->
case async_handshake(Conn) of
{error, _} = E ->
E;
ok ->
receive
{quic, connected, Conn, _} -> {ok, Conn};
{quic, closed, Conn, _Flags} -> {error, closed}
after Timeout ->
{error, timeout}
end
end.

%% @doc Complete TLS handshake after accepted a Connection
%% @see handshake/2
%% @see async_handshake/1
-spec handshake(connection_handle(), timeout()) ->
-spec handshake(connection_handle(), conn_opts(), timeout()) ->
{ok, connection_handle()}
| {error, any()}.
handshake(Conn, Timeout) ->
case async_handshake(Conn) of
handshake(Conn, ConnOpts, Timeout) ->
case async_handshake(Conn, ConnOpts) of
{error, _} = E ->
E;
ok ->
Expand All @@ -467,13 +484,24 @@ handshake(Conn, Timeout) ->
end.

%% @doc Complete TLS handshake after accepted a Connection.
%% Caller should expect to receive ```{quic, connected, connection_handle()}'''
%%
%% @see handshake/2
%% @see async_handshake/2
-spec async_handshake(connection_handle()) -> ok | {error, any()}.
async_handshake(Conn) ->
quicer_nif:async_handshake(Conn).

%% @doc Complete TLS handshake after accepted a Connection.
%% also set connection options which override the default listener options.
%%
%% @see handshake/2
%% @see async_handshake/1
-spec async_handshake(connection_handle(), conn_opts()) -> ok | {error, any()}.
async_handshake(Conn, ConnOpts) when is_list(ConnOpts) ->
async_handshake(Conn, maps:from_list(ConnOpts));
async_handshake(Conn, ConnOpts) ->
quicer_nif:async_handshake(Conn, ConnOpts).

%% @doc Accept new Connection (Server)
%%
%% Accept new connection from listener_handle().
Expand Down
6 changes: 2 additions & 4 deletions src/quicer_conn_acceptor_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -62,21 +62,19 @@ start_link(ListenerH, ConnOpts) ->
-spec init(Args :: term()) ->
{ok, {SupFlags :: supervisor:sup_flags(), [ChildSpec :: supervisor:child_spec()]}}
| ignore.
init([ListenerH, Opts]) ->
init([ListenerH, OptsTab]) ->
SupFlags = #{
strategy => simple_one_for_one,
intensity => 1,
period => 5
},

OneChild = #{
id => ignored,
start => {quicer_connection, start_link, [undefined, ListenerH, Opts]},
start => {quicer_connection, start_acceptor, [ListenerH, OptsTab]},
restart => temporary,
shutdown => 5000,
type => worker
},

{ok, {SupFlags, [OneChild]}}.

%%%===================================================================
Expand Down
56 changes: 51 additions & 5 deletions src/quicer_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@
-export([
start_link/3,
%% for server
start_acceptor/3,
start_link/4,
get_cb_state/1,
merge_cb_state/2,
Expand Down Expand Up @@ -216,6 +217,14 @@ start_link(undefined, Listener, {_LOpts, COpts, _SOpts} = Opts, Sup) when is_map
start_link(CallbackModule, Listener, Opts, Sup) ->
gen_server:start_link(?MODULE, [CallbackModule, Listener, Opts, Sup], []).

-define(DEFAULT_ACCEPTOR_START_OPTS, [{spawn_opt, [link]}]).
%% @doc start acceptor with shared Listener confs
-spec start_acceptor(listener_handle(), ets:tid(), pid()) -> {ok, pid()} | {error, any()}.
start_acceptor(ListenHandle, Tab, Sup) when is_pid(Sup) ->
[{c_opts, #{conn_callback := CallbackModule} = Conf}] = ets:lookup(Tab, c_opts),
StartOpts = maps:get(proc_start_opts, Conf, ?DEFAULT_ACCEPTOR_START_OPTS),
gen_server:start_link(?MODULE, [CallbackModule, ListenHandle, Tab, Sup], StartOpts).

-spec get_cb_state(ConnPid :: pid()) -> cb_state() | {error, any()}.
get_cb_state(ConnPid) ->
gen_server:call(ConnPid, get_cb_state, infinity).
Expand Down Expand Up @@ -277,7 +286,8 @@ init([CallbackModule, {Host, Port}, {COpts, SOpts}]) when
callback => CallbackModule,
conn_opts => COpts,
stream_opts => SOpts,
sup => undefined
sup => undefined,
is_server => false
},
{ok, Conn} = quicer:async_connect(Host, Port, COpts),
State1 = State0#{conn := Conn},
Expand All @@ -290,7 +300,7 @@ init([CallbackModule, {Host, Port}, {COpts, SOpts}]) when
Other ->
Other
end;
%% For Server
%% For Server, deprecating since v0.2
init([CallbackModule, Listener, {LOpts, COpts, SOpts}, Sup]) when is_list(COpts) ->
init([CallbackModule, Listener, {LOpts, maps:from_list(COpts), SOpts}, Sup]);
init([CallbackModule, Listener, {_LOpts, COpts, SOpts}, Sup]) when CallbackModule =/= undefined ->
Expand All @@ -301,7 +311,8 @@ init([CallbackModule, Listener, {_LOpts, COpts, SOpts}, Sup]) when CallbackModul
callback => CallbackModule,
conn_opts => COpts,
stream_opts => SOpts,
sup => Sup
sup => Sup,
is_server => true
},
%% Async Acceptor
{ok, Listener} = quicer_nif:async_accept(Listener, COpts),
Expand All @@ -313,7 +324,27 @@ init([CallbackModule, Listener, {_LOpts, COpts, SOpts}, Sup]) when CallbackModul
%% ignore, {stop, Reason} ...
Other ->
Other
end.
end;
%% For Acceptor since v0.2
init([_, _, _ConfTab, _] = Args) ->
acceptor_init(Args).
acceptor_init([CallbackModule, Listener, ConfTab, Sup]) ->
process_flag(trap_exit, true),
State0 = #{
listener => Listener,
callback => CallbackModule,
conf_tab => ConfTab,
sup => Sup,
is_server => true,
conn => undefined,
conn_opts => undefined,
stream_opts => undefined,
callback_state => undefined
},
%% Async Acceptor
NOOP = #{},
{ok, Listener} = quicer_nif:async_accept(Listener, NOOP),
{ok, State0}.

%%--------------------------------------------------------------------
%% @private
Expand Down Expand Up @@ -387,16 +418,31 @@ handle_cast(_Request, State) ->
| {noreply, NewState :: term(), Timeout :: timeout()}
| {noreply, NewState :: term(), hibernate}
| {stop, Reason :: normal | term(), NewState :: term()}.
%% deprecating this clause
handle_info(
{quic, new_conn, C, Props},
#{callback := M, sup := Sup, callback_state := CBState} = State
) ->
) when is_map(CBState) ->
?tp_ignore_side_effects_in_prod(debug, #{
module => ?MODULE, conn => C, props => Props, event => new_conn
}),
%% I become the connection owner, I should start an new acceptor.
Sup =/= undefined andalso (catch supervisor:start_child(Sup, [Sup])),
default_cb_ret(M:new_conn(C, Props, CBState), State#{conn := C});
handle_info(
{quic, new_conn, C, Props},
#{callback := M, sup := Sup, callback_state := undefined, conf_tab := ConfTab} = State
) ->
%% deferred init
COpts = ets:lookup_element(ConfTab, c_opts, 2),
SOpts = ets:lookup_element(ConfTab, s_opts, 2),
Sup =/= undefined andalso (catch supervisor:start_child(Sup, [Sup])),
case M:init(COpts#{stream_opts => SOpts}) of
{ok, CBState0} ->
default_cb_ret(M:new_conn(C, Props, CBState0), State#{conn := C});
Other ->
Other
end;
handle_info(
{quic, connected, C, #{is_resumed := IsResumed} = Props},
#{
Expand Down
Loading
Loading