Skip to content

Commit

Permalink
fix: thread safe msquic API table and global registration
Browse files Browse the repository at this point in the history
  • Loading branch information
qzhuyan committed Sep 26, 2023
1 parent ee588f6 commit 0985041
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 37 deletions.
2 changes: 1 addition & 1 deletion c_src/quicer_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ open_connectionX(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
}
else
{
return ERROR_TUPLE_2(ATOM_REG_FAILED);
return ERROR_TUPLE_2(ATOM_QUIC_REGISTRATION);
}
r_ctx = NULL;
}
Expand Down
3 changes: 1 addition & 2 deletions c_src/quicer_ctx.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ limitations under the License.

// alloc/dealloc ctx should be done in the callbacks.
extern QuicerRegistrationCTX *G_r_ctx;

QuicerRegistrationCTX *
init_r_ctx()
{
Expand All @@ -31,8 +32,6 @@ init_r_ctx()
r_ctx->env = enif_alloc_env();
r_ctx->Registration = NULL;
r_ctx->is_released = FALSE;
CxPlatListInitializeHead(&r_ctx->Listeners);
CxPlatListInitializeHead(&r_ctx->Connections);
return r_ctx;
}

Expand Down
2 changes: 0 additions & 2 deletions c_src/quicer_ctx.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ typedef struct QuicerRegistrationCTX
HQUIC Registration;
BOOLEAN is_released;
char name[UINT8_MAX + 1];
CXPLAT_LIST_ENTRY Listeners;
CXPLAT_LIST_ENTRY Connections;
} QuicerRegistrationCTX;

/*
Expand Down
1 change: 1 addition & 0 deletions c_src/quicer_eterms.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ extern ERL_NIF_TERM ATOM_BAD_MON;
extern ERL_NIF_TERM ATOM_LISTENER_OPEN_ERROR;
extern ERL_NIF_TERM ATOM_LISTENER_START_ERROR;
extern ERL_NIF_TERM ATOM_BADARG;
extern ERL_NIF_TERM ATOM_LIB_UNINITIALIZED;
extern ERL_NIF_TERM ATOM_CONN_OPEN_ERROR;
extern ERL_NIF_TERM ATOM_CONN_START_ERROR;
extern ERL_NIF_TERM ATOM_STREAM_OPEN_ERROR;
Expand Down
61 changes: 38 additions & 23 deletions c_src/quicer_nif.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ static ERL_NIF_TERM stream_controlling_process(ErlNifEnv *env,
const ErlNifPid *caller,
const ERL_NIF_TERM *pid);

static ERL_NIF_TERM
closeLib(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);

/*
** atoms in use, initialized while load nif
*/
Expand All @@ -54,6 +57,7 @@ ERL_NIF_TERM ATOM_BAD_MON;
ERL_NIF_TERM ATOM_LISTENER_OPEN_ERROR;
ERL_NIF_TERM ATOM_LISTENER_START_ERROR;
ERL_NIF_TERM ATOM_BADARG;
ERL_NIF_TERM ATOM_LIB_UNINITIALIZED;
ERL_NIF_TERM ATOM_CONN_OPEN_ERROR;
ERL_NIF_TERM ATOM_CONN_START_ERROR;
ERL_NIF_TERM ATOM_STREAM_OPEN_ERROR;
Expand Down Expand Up @@ -430,6 +434,7 @@ ERL_NIF_TERM ATOM_QUIC_DATAGRAM_SEND_CANCELED;
ATOM(ATOM_LISTENER_OPEN_ERROR, listener_open_error); \
ATOM(ATOM_LISTENER_START_ERROR, listener_start_error); \
ATOM(ATOM_BADARG, badarg); \
ATOM(ATOM_LIB_UNINITIALIZED, lib_uninitialized); \
ATOM(ATOM_CONN_OPEN_ERROR, conn_open_error); \
ATOM(ATOM_CONN_START_ERROR, conn_start_error); \
ATOM(ATOM_STREAM_OPEN_ERROR, stm_open_error); \
Expand Down Expand Up @@ -769,6 +774,7 @@ ERL_NIF_TERM ATOM_QUIC_DATAGRAM_SEND_CANCELED;
ATOM(ATOM_UNDEFINED, undefined);

extern QuicerRegistrationCTX *G_r_ctx;
extern ErlNifMutex *GRegLock;

const QUIC_API_TABLE *MsQuic = NULL;
// Mutex for MsQuic
Expand Down Expand Up @@ -948,7 +954,7 @@ resource_reg_dealloc_callback(__unused_parm__ ErlNifEnv *env, void *obj)
TP_CB_3(start, (uintptr_t)obj, 0);
QuicerRegistrationCTX *reg_ctx = (QuicerRegistrationCTX *)obj;
deinit_r_ctx(reg_ctx);
if (reg_ctx->Registration)
if (MsQuic && reg_ctx->Registration)
{
MsQuic->RegistrationClose(reg_ctx->Registration);
}
Expand All @@ -966,10 +972,16 @@ on_load(ErlNifEnv *env,
{
int ret_val = 0;

// Library initialization, library scope
if (!MsQuicLock)
{
MsQuicLock = enif_mutex_create("msquic_lock");
}
{
MsQuicLock = enif_mutex_create("msquic_lock");
}

if (!GRegLock)
{
GRegLock = enif_mutex_create("global_reg_lock");
}

// init atoms in use.
#define ATOM(name, val) \
Expand Down Expand Up @@ -1052,19 +1064,10 @@ on_upgrade(ErlNifEnv *env,
static void
on_unload(__unused_parm__ ErlNifEnv *env, __unused_parm__ void *priv_data)
{
if (G_r_ctx)
{
// @TODO memleak here
MsQuic->RegistrationClose(G_r_ctx->Registration);
G_r_ctx = NULL;
}
if (MsQuic)
{
MsQuicClose(MsQuic);
MsQuic = NULL;
}
// @TODO memleak here
//enif_mutex_destroy(MsQuicLock);
closeLib(env, 0, NULL);
// @TODO reserved for upgrade
// enif_mutex_destroy(GRegLock);
// enif_mutex_destroy(MsQuicLock);
}

static ERL_NIF_TERM
Expand All @@ -1078,18 +1081,18 @@ openLib(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[])
char lttngPath[PATH_MAX] = { 0 };
if (MsQuicLock == NULL)
{
return ATOM_OK;
return ERROR_TUPLE_2(ATOM_LIB_UNINITIALIZED);
}
enif_mutex_lock(MsQuicLock);
if (MsQuic)
{
TP_NIF_3(skip, 0, 2);
res = SUCCESS(res);
res = SUCCESS(res);
goto exit;
}

// @todo external call for static link
CxPlatSystemLoad();
// CxPlatSystemLoad();
MsQuicLibraryLoad();

//
Expand Down Expand Up @@ -1127,19 +1130,31 @@ closeLib(__unused_parm__ ErlNifEnv *env,
{
if (MsQuicLock == NULL)
{
return ATOM_OK;
return ERROR_TUPLE_2(ATOM_LIB_UNINITIALIZED);
}
enif_mutex_lock(MsQuicLock);
if (MsQuic)
{
TP_NIF_3(do_close, MsQuic, 0);
if (G_r_ctx)

enif_mutex_lock(GRegLock);
// end of the world
if (G_r_ctx && !G_r_ctx->is_released)
{
deregistration(env, argc, argv);
// Make MsQuic debug check pass:
// Zero Registration when closing MsQuic
MsQuic->RegistrationClose(G_r_ctx->Registration);
G_r_ctx->Registration = NULL;
G_r_ctx->is_released = TRUE;
destroy_r_ctx(G_r_ctx);
G_r_ctx = NULL;
}
enif_mutex_unlock(GRegLock);

MsQuicClose(MsQuic);
MsQuic = NULL;
}

enif_mutex_unlock(MsQuicLock);
return ATOM_OK;
}
Expand Down
20 changes: 14 additions & 6 deletions c_src/quicer_reg.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ static BOOLEAN parse_reg_conf(ERL_NIF_TERM eprofile,
QUIC_REGISTRATION_CONFIG *RegConfig);

QuicerRegistrationCTX *G_r_ctx = NULL;
ErlNifMutex *GRegLock = NULL;

/*
** For global registration only
Expand All @@ -33,23 +34,26 @@ registration(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
QUIC_STATUS status;
ERL_NIF_TERM res = ATOM_OK;

if (!MsQuic || G_r_ctx)
if (!MsQuic || !GRegLock || G_r_ctx)
{
return ERROR_TUPLE_2(ATOM_BADARG);
}

enif_mutex_lock(GRegLock);
if (argc == 1)
{
eprofile = argv[0];
if (!parse_reg_conf(eprofile, &RegConfig))
{
enif_mutex_unlock(GRegLock);
return ERROR_TUPLE_2(ATOM_BADARG);
}
}

QuicerRegistrationCTX *r_ctx = init_r_ctx();
if (!r_ctx)
{
enif_mutex_unlock(GRegLock);
return ERROR_TUPLE_2(ATOM_ERROR_NOT_ENOUGH_MEMORY);
}

Expand All @@ -60,14 +64,17 @@ registration(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
goto exit;
}

// Keep global registration context
// enif_keep_resource(r_ctx);
G_r_ctx = r_ctx;
enif_mutex_unlock(GRegLock);

// nif owns the global registration
// thus not return to the erlang side
return ATOM_OK;

exit:
deinit_r_ctx(r_ctx);
destroy_r_ctx(r_ctx);
enif_mutex_unlock(GRegLock);
return res;
}

Expand All @@ -80,18 +87,19 @@ deregistration(__unused_parm__ ErlNifEnv *env,
__unused_parm__ const ERL_NIF_TERM argv[])
{
int error_code = 0;
if (!MsQuic)
if (!MsQuic || !GRegLock)
{
return ERROR_TUPLE_2(ATOM_BADARG);
}

if (G_r_ctx)
enif_mutex_lock(GRegLock);
if (G_r_ctx && !G_r_ctx->is_released)
{
MsQuic->RegistrationShutdown(G_r_ctx->Registration, FALSE, error_code);
destroy_r_ctx(G_r_ctx);
G_r_ctx = NULL;
}

enif_mutex_unlock(GRegLock);
return ATOM_OK;
}

Expand Down
10 changes: 7 additions & 3 deletions test/quicer_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
end_per_testcase/2]).

%% test cases
-export([tc_nif_module_load/1
-export([ tc_nif_module_load/1
, tc_nif_module_unload/1
, tc_nif_module_reload/1
, tc_open_lib_test/1
Expand Down Expand Up @@ -219,8 +219,12 @@ end_per_testcase(tc_close_lib_test, _Config) ->
quicer:open_lib();
end_per_testcase(tc_lib_registration, _Config) ->
quicer:reg_open();
end_per_testcase(tc_lib_registration_1, _Config) ->
quicer:reg_open();
end_per_testcase(tc_lib_re_registration, _Config) ->
quicer:reg_open();
end_per_testcase(tc_lib_re_registration_neg, _Config) ->
quicer:reg_open();
end_per_testcase(tc_open_listener_neg_1, _Config) ->
quicer:open_lib(),
quicer:reg_open();
Expand Down Expand Up @@ -284,7 +288,7 @@ tc_lib_registration(_Config) ->
ok = quicer:reg_close().

tc_lib_registration_1(_Config) ->
ok =quicer:reg_close(),
ok = quicer:reg_close(),
{error, badarg} = quicer:reg_open(foo),
ok = quicer:reg_open(quic_execution_profile_low_latency),
ok = quicer:reg_close(),
Expand Down Expand Up @@ -819,7 +823,7 @@ dgram_client_recv_loop(Conn, ReceivedOnStream, ReceivedViaDgram) ->
receive
{quic, <<"pong">>, Conn, Flag} when is_integer(Flag) ->
dgram_client_recv_loop(Conn, ReceivedOnStream, true);
{quic, <<"pong">>, _Stream, Flag} ->
{quic, <<"pong">>, _Stream, _Flag} ->
dgram_client_recv_loop(Conn, true, ReceivedViaDgram);
{quic, dgram_state_changed, Conn, #{dgram_send_enabled := true, dgram_max_len := _Size}} ->
dgram_client_recv_loop(Conn, ReceivedOnStream, ReceivedViaDgram);
Expand Down

0 comments on commit 0985041

Please sign in to comment.