diff --git a/.github/workflows/hex_pub.yml b/.github/workflows/hex_pub.yml index d78b03a1..6865ca6f 100644 --- a/.github/workflows/hex_pub.yml +++ b/.github/workflows/hex_pub.yml @@ -4,8 +4,8 @@ on: - '*' jobs: - if: false publish: + if: false runs-on: ubuntu-latest steps: - name: Check out diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index e7931894..4a76ff10 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -21,9 +21,41 @@ jobs: run: | rebar3 fmt -c - mac: - timeout-minutes: 60 + pre-check: + name: Pre check needs: formatting-check + runs-on: ubuntu-latest + timeout-minutes: 25 + strategy: + fail-fast: false + matrix: + # https://builds.hex.pm/builds/otp/ubuntu-22.04/builds.txt + otp: + - 26.2.5.3 + rebar3: + - 3.23.0 + steps: + - name: Checkout + uses: actions/checkout@a5ac7e51b41094c92402da3b24376905380afc29 # v4.1.6 + with: + submodules: recursive + - uses: erlef/setup-beam@2f0cc07b4b9bea248ae098aba9e1a8a1de5ec24c # v1.17.5 + with: + otp-version: ${{ matrix.otp }} + rebar3-version: ${{ matrix.rebar3 }} + - name: release build with debug log off + run: | + echo "github ref: ${{ github.event.ref }}" + echo "github ref: ${{ github.ref }}" + sudo sysctl -w kernel.core_pattern=core + ulimit -c unlimited + export CMAKE_BUILD_TYPE=Debug + export QUICER_TLS_VER=sys + make ci + + mac-mesh: + timeout-minutes: 60 + needs: pre-check strategy: fail-fast: false matrix: @@ -78,8 +110,8 @@ jobs: retention-days: 1 - linux: - needs: formatting-check + linux-mesh: + needs: pre-check runs-on: ubuntu-latest timeout-minutes: 25 strategy: diff --git a/c_src/quicer_config.c b/c_src/quicer_config.c index 6f870020..0f79db57 100644 --- a/c_src/quicer_config.c +++ b/c_src/quicer_config.c @@ -20,7 +20,7 @@ limitations under the License. #include "quicer_tls.h" #include -extern QuicerRegistrationCTX *G_r_ctx; +extern QuicerRegistrationCTX G_r_ctx; extern pthread_mutex_t MsQuicLock; static ERL_NIF_TERM get_stream_opt(ErlNifEnv *env, @@ -218,11 +218,6 @@ ServerLoadConfiguration(ErlNifEnv *env, { QUIC_SETTINGS Settings = { 0 }; - if (!G_r_ctx) - { - return ATOM_REG_FAILED; - } - if (!create_settings(env, option, &Settings)) { return ATOM_BADARG; @@ -275,11 +270,6 @@ ClientLoadConfiguration(ErlNifEnv *env, QUIC_SETTINGS Settings = { 0 }; ERL_NIF_TERM ret = ATOM_OK; - if (!G_r_ctx) - { - return ATOM_REG_FAILED; - } - // // Configures the client's idle timeout. // @@ -859,10 +849,9 @@ encode_parm_to_eterm(ErlNifEnv *env, } ERL_NIF_TERM -getopt3(ErlNifEnv *env, - __unused_parm__ int argc, - __unused_parm__ const ERL_NIF_TERM argv[]) +getopt3(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + CXPLAT_FRE_ASSERT(3 == argc); ERL_NIF_TERM ctx = argv[0]; ERL_NIF_TERM eopt = argv[1]; ERL_NIF_TERM elevel = argv[2]; @@ -894,12 +883,12 @@ getopt3(ErlNifEnv *env, { return SUCCESS(ETERM_UINT_64(((QuicerStreamCTX *)q_ctx)->StreamID)); } - if (!get_stream_handle(q_ctx)) + if (!LOCAL_REFCNT(get_stream_handle(q_ctx))) { goto Exit; } res = get_stream_opt(env, (QuicerStreamCTX *)q_ctx, eopt, elevel); - put_stream_handle(q_ctx); + LOCAL_REFCNT(put_stream_handle(q_ctx)); } else if (enif_get_resource(env, ctx, ctx_connection_t, &q_ctx)) { @@ -971,8 +960,9 @@ set_level_param(ErlNifEnv *env, } ERL_NIF_TERM -setopt4(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[]) +setopt4(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + CXPLAT_FRE_ASSERT(4 == argc); ERL_NIF_TERM ctx = argv[0]; ERL_NIF_TERM eopt = argv[1]; ERL_NIF_TERM evalue = argv[2]; @@ -992,13 +982,13 @@ setopt4(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[]) } else if (enif_get_resource(env, ctx, ctx_stream_t, &q_ctx)) { - if (!get_stream_handle(q_ctx)) + if (!LOCAL_REFCNT(get_stream_handle(q_ctx))) { goto Exit; } res = set_stream_opt( env, (QuicerStreamCTX *)q_ctx, eopt, evalue, elevel); - put_stream_handle(q_ctx); + LOCAL_REFCNT(put_stream_handle(q_ctx)); } else if (enif_get_resource(env, ctx, ctx_connection_t, &q_ctx)) { @@ -1344,7 +1334,7 @@ get_stream_opt(ErlNifEnv *env, { res = get_level_param(env, s_ctx->Stream, - s_ctx->c_ctx->config_resource->Configuration, + s_ctx->c_ctx->config_ctx->Configuration, optname, elevel); goto Exit; @@ -1458,7 +1448,7 @@ set_stream_opt(ErlNifEnv *env, { res = set_level_param(env, s_ctx->Stream, - s_ctx->c_ctx->config_resource->Configuration, + s_ctx->c_ctx->config_ctx->Configuration, optname, optval, elevel); @@ -1531,13 +1521,13 @@ get_connection_opt(ErlNifEnv *env, if (!IS_SAME_TERM(ATOM_FALSE, elevel)) { - if (!c_ctx->config_resource) + if (!c_ctx->config_ctx) { goto Exit; } res = get_level_param(env, c_ctx->Connection, - c_ctx->config_resource->Configuration, + c_ctx->config_ctx->Configuration, optname, elevel); goto Exit; @@ -1724,13 +1714,13 @@ set_connection_opt(ErlNifEnv *env, if (!IS_SAME_TERM(ATOM_FALSE, elevel)) { - if (!c_ctx->config_resource) + if (!c_ctx->config_ctx) { goto Exit; } res = set_level_param(env, c_ctx->Connection, - c_ctx->config_resource->Configuration, + c_ctx->config_ctx->Configuration, optname, optval, elevel); @@ -2062,13 +2052,13 @@ get_listener_opt(ErlNifEnv *env, { return ERROR_TUPLE_2(ATOM_CLOSED); } - enif_keep_resource(l_ctx); + get_listener_handle(l_ctx); if (!IS_SAME_TERM(ATOM_FALSE, elevel)) { res = get_level_param(env, l_ctx->Listener, - l_ctx->config_resource->Configuration, + l_ctx->config_ctx->Configuration, optname, elevel); goto Exit; @@ -2123,7 +2113,7 @@ get_listener_opt(ErlNifEnv *env, res = ERROR_TUPLE_2(ATOM_STATUS(status)); } Exit: - enif_release_resource(l_ctx); + put_listener_handle(l_ctx); return res; } @@ -2157,7 +2147,7 @@ set_listener_opt(ErlNifEnv *env, { res = set_level_param(env, l_ctx->Listener, - l_ctx->config_resource->Configuration, + l_ctx->config_ctx->Configuration, optname, optval, elevel); @@ -2644,7 +2634,6 @@ parse_registration(ErlNifEnv *env, return FALSE; } } - return TRUE; } diff --git a/c_src/quicer_connection.c b/c_src/quicer_connection.c index 18dd0de7..e0ccd46f 100644 --- a/c_src/quicer_connection.c +++ b/c_src/quicer_connection.c @@ -23,7 +23,7 @@ limitations under the License. #include #include -extern QuicerRegistrationCTX *G_r_ctx; +extern QuicerRegistrationCTX G_r_ctx; extern pthread_mutex_t GRegLock; #if defined(DEBUG) && !defined(QUICER_LOGGING_STDOUT) @@ -115,8 +115,9 @@ ERL_NIF_TERM parse_conn_event_mask(ErlNifEnv *env, QUIC_STATUS selected_owner_unreachable(QuicerStreamCTX *s_ctx); ERL_NIF_TERM -peercert1(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[]) +peercert1(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + CXPLAT_FRE_ASSERT(1 == argc); ERL_NIF_TERM ctx = argv[0]; ERL_NIF_TERM DerCert; ERL_NIF_TERM res = ATOM_UNDEFINED; @@ -124,8 +125,10 @@ peercert1(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[]) QuicerConnCTX *c_ctx; int len = 0; unsigned char *tmp; + if (enif_get_resource(env, ctx, ctx_stream_t, &q_ctx)) { + // @FIXME: get s_ctx handle first. c_ctx = ((QuicerStreamCTX *)q_ctx)->c_ctx; } else if (enif_get_resource(env, ctx, ctx_connection_t, &q_ctx)) @@ -137,6 +140,11 @@ peercert1(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[]) return ERROR_TUPLE_2(ATOM_BADARG); } + if (!c_ctx || !LOCAL_REFCNT(get_conn_handle(c_ctx))) + { + return ERROR_TUPLE_2(ATOM_CLOSED); + } + assert(c_ctx); enif_mutex_lock(c_ctx->lock); if (!c_ctx->peer_cert) @@ -168,6 +176,7 @@ peercert1(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[]) exit: enif_mutex_unlock(c_ctx->lock); + LOCAL_REFCNT(put_conn_handle(c_ctx)); return res; } @@ -266,6 +275,33 @@ dump_sslkeylogfile(_In_z_ const char *FileName, fclose(File); } +// Assign registration for c_ctx +// 1. use `quic_registration` option if set, otherwise use global registration +// 2. take the registration handle for resource management +// 3. link c_ctx to r_ctx for resource management +// return FALSE for invalid quic_registration or the registration handle is +// closed +static BOOLEAN +assign_registration(ErlNifEnv *env, + ERL_NIF_TERM eoptions, + QuicerConnCTX *c_ctx) +{ + + QuicerRegistrationCTX *r_ctx = NULL; + if (!parse_registration(env, eoptions, &r_ctx)) + { + return FALSE; + } + + r_ctx = r_ctx ? r_ctx : &G_r_ctx; + + if (!get_reg_handle(r_ctx)) + { + return FALSE; + } + CONN_LINK_REGISTRATION(c_ctx, r_ctx); + return TRUE; +} // // The clients's callback for connection events from MsQuic. // @@ -382,20 +418,18 @@ _IRQL_requires_max_(DISPATCH_LEVEL) } enif_clear_env(env); - QuicerConfigCTX *conf_ctx = c_ctx->config_resource; if (is_destroy) { enif_mutex_lock(c_ctx->lock); c_ctx->is_closed = TRUE; // client shutdown completed - c_ctx->config_resource = NULL; enif_mutex_unlock(c_ctx->lock); - put_conn_handle(c_ctx); - if (conf_ctx) + CXPLAT_DBG_ASSERT(c_ctx->Connection); + // just for safty + if (c_ctx->Connection) { - enif_release_resource(conf_ctx); + put_conn_handle(c_ctx); } - destroy_c_ctx(c_ctx); } return status; } @@ -499,21 +533,14 @@ ServerConnectionCallback(HQUIC Connection, } enif_clear_env(env); - QuicerConfigCTX *conf_ctx = c_ctx->config_resource; - if (is_destroy) { enif_mutex_lock(c_ctx->lock); c_ctx->is_closed = TRUE; // server shutdown_complete - c_ctx->config_resource = NULL; + // c_ctx->config_ctx = NULL; enif_mutex_unlock(c_ctx->lock); put_conn_handle(c_ctx); - if (conf_ctx) - { - enif_release_resource(conf_ctx); - } - destroy_c_ctx(c_ctx); } return status; } @@ -541,13 +568,9 @@ open_connectionX(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) } // If r_ctx is unset, default to use global registration - if (!r_ctx && !G_r_ctx) - { - return ERROR_TUPLE_2(ATOM_QUIC_REGISTRATION); - } - else + if (!r_ctx) { - r_ctx = G_r_ctx; + r_ctx = &G_r_ctx; } if (!get_reg_handle(r_ctx)) @@ -576,13 +599,10 @@ open_connectionX(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) } // It is safe to use r_ctx here since - // a) it is passed as argument which beam still has reference to - // b) G_r_ctx is only destroyed when code is unloaded. + // it is passed as argument which beam still has reference to + CONN_LINK_REGISTRATION(c_ctx, r_ctx); - enif_keep_resource(r_ctx); - c_ctx->r_ctx = r_ctx; enif_mutex_lock(r_ctx->lock); - if (!r_ctx->Registration) { res = ERROR_TUPLE_2(ATOM_QUIC_REGISTRATION); @@ -608,36 +628,23 @@ open_connectionX(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) goto exit; } - CxPlatRefInitialize(&(c_ctx->ref_count)); - - // link to registration - if (r_ctx) - { - enif_mutex_lock(r_ctx->lock); - CxPlatListInsertTail(&r_ctx->Connections, &c_ctx->RegistrationLink); - enif_mutex_unlock(r_ctx->lock); - } - eHandle = enif_make_resource(env, c_ctx); return SUCCESS(eHandle); exit: - enif_release_resource(r_ctx); - enif_release_resource(c_ctx); + put_conn_handle(c_ctx); return res; } ERL_NIF_TERM -async_connect3(ErlNifEnv *env, - __unused_parm__ int argc, - const ERL_NIF_TERM argv[]) +async_connect3(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { QUIC_STATUS Status; + CXPLAT_FRE_ASSERT(argc == 3); ERL_NIF_TERM ehost = argv[0]; ERL_NIF_TERM eport = argv[1]; ERL_NIF_TERM eoptions = argv[2]; ERL_NIF_TERM eHandle = ATOM_UNDEFINED; - // Usually we should not get this error // If we get it is internal logic error ERL_NIF_TERM res = ERROR_TUPLE_2(ATOM_ERROR_INTERNAL_ERROR); @@ -665,18 +672,20 @@ async_connect3(ErlNifEnv *env, // Check option 'handle' for opened connection if (enif_get_map_value(env, eoptions, ATOM_HANDLE, &eHandle)) { - /* Reuse c_ctx from existing connecion handle */ + /* Reuse c_ctx from existing connection handle */ if (enif_get_resource(env, eHandle, ctx_connection_t, (void **)&c_ctx)) { assert(c_ctx->is_closed); assert(c_ctx->owner); - // r_ctx is already kept in open_connectionX + // r_ctx is already kept when open the connection r_ctx = c_ctx->r_ctx; - is_reuse_handle = TRUE; + // @NOTE: get reg handle for this fun call, put it before return if (!get_reg_handle(r_ctx)) { + c_ctx->r_ctx = NULL; return ERROR_TUPLE_2(ATOM_QUIC_REGISTRATION); } + is_reuse_handle = TRUE; } else { @@ -686,44 +695,39 @@ async_connect3(ErlNifEnv *env, else { /* Alloc new c_ctx */ - assert(!is_reuse_handle); - assert(!c_ctx); - assert(!r_ctx); + CXPLAT_FRE_ASSERT(!is_reuse_handle); + CXPLAT_FRE_ASSERT(!c_ctx); + CXPLAT_FRE_ASSERT(!r_ctx); c_ctx = init_c_ctx(); // Get Reg for c_ctx, quic_registration is optional - if (!parse_registration(env, eoptions, &c_ctx->r_ctx)) + + if (!assign_registration(env, eoptions, c_ctx)) { - enif_release_resource(c_ctx); + c_ctx->r_ctx = NULL; + put_conn_handle(c_ctx); return ERROR_TUPLE_2(ATOM_QUIC_REGISTRATION); } - - r_ctx = c_ctx->r_ctx ? c_ctx->r_ctx : G_r_ctx; + CXPLAT_DBG_ASSERT(c_ctx->r_ctx); + r_ctx = c_ctx->r_ctx; if ((c_ctx->owner = AcceptorAlloc()) == NULL) { - enif_release_resource(c_ctx); + put_conn_handle(c_ctx); return ERROR_TUPLE_2(ATOM_ERROR_NOT_ENOUGH_MEMORY); } // set owner if (!enif_self(env, &(c_ctx->owner->Pid))) { - enif_release_resource(c_ctx); + put_conn_handle(c_ctx); return ERROR_TUPLE_2(ATOM_BAD_PID); } - - if (!get_reg_handle(r_ctx)) - { - enif_release_resource(c_ctx); - return ERROR_TUPLE_2(ATOM_QUIC_REGISTRATION); - } - enif_keep_resource(r_ctx); } - assert(r_ctx); - assert(c_ctx); + CXPLAT_FRE_ASSERT(r_ctx); + CXPLAT_FRE_ASSERT(c_ctx); // Now we have c_ctx either // a) passed in as handle @@ -734,14 +738,15 @@ async_connect3(ErlNifEnv *env, } Registration = r_ctx->Registration; - assert(c_ctx->owner); + CXPLAT_DBG_ASSERT(Registration); + CXPLAT_DBG_ASSERT(c_ctx->owner); - // Allocate config_resource for client connection + // Allocate config_ctx for client connection // @TODO client config handle should be reused if needed. - if (NULL == (c_ctx->config_resource = init_config_ctx())) + if (NULL == (c_ctx->config_ctx = init_config_ctx())) { res = ERROR_TUPLE_2(ATOM_ERROR_NOT_ENOUGH_MEMORY); - goto Error; + goto ErrorNoLock; } #ifdef QUICER_USE_TRUSTED_STORE @@ -751,7 +756,7 @@ async_connect3(ErlNifEnv *env, { // TLS opt error not file content error res = ERROR_TUPLE_2(ATOM_CACERTFILE); - goto Error; + goto ErrorNoLock; } if (cacertfile) @@ -760,27 +765,24 @@ async_connect3(ErlNifEnv *env, { free(cacertfile); res = ERROR_TUPLE_2(ATOM_CERT_ERROR); - goto Error; + goto ErrorNoLock; } free(cacertfile); cacertfile = NULL; } #endif // QUICER_USE_TRUSTED_STORE - // convert eoptions to Configuration + // Convert eoptions to Configuration ERL_NIF_TERM estatus = ClientLoadConfiguration( - env, &eoptions, Registration, &(c_ctx->config_resource->Configuration)); + env, &eoptions, Registration, &(c_ctx->config_ctx->Configuration)); if (!IS_SAME_TERM(ATOM_OK, estatus)) { res = ERROR_TUPLE_2(estatus); - if (!is_reuse_handle) - { - enif_release_resource(c_ctx); - } - return ERROR_TUPLE_2(ATOM_QUIC_REGISTRATION); + goto ErrorNoLock; } + // Open Connection if not reused if (!is_reuse_handle) { if (QUIC_FAILED(Status = MsQuic->ConnectionOpen(Registration, @@ -790,28 +792,21 @@ async_connect3(ErlNifEnv *env, { assert(c_ctx->Connection == NULL); res = ERROR_TUPLE_2(ATOM_CONN_OPEN_ERROR); + goto Error; } else { assert(c_ctx->is_closed); - CxPlatRefInitialize(&c_ctx->ref_count); res = parse_conn_resume_ticket(env, eoptions, c_ctx); // we could only lock it after resume ticket is set enif_mutex_lock(c_ctx->lock); - } - if (!IS_SAME_TERM(ATOM_OK, res)) - { - goto Error; + if (!IS_SAME_TERM(ATOM_OK, res)) + { + goto Error; + } } } c_ctx->is_closed = FALSE; // connection opened. - // - if (!is_reuse_handle && r_ctx) - { - enif_mutex_lock(r_ctx->lock); - CxPlatListInsertTail(&r_ctx->Connections, &c_ctx->RegistrationLink); - enif_mutex_unlock(r_ctx->lock); - } // optional set sslkeylogfile parse_sslkeylogfile_option(env, eoptions, c_ctx); @@ -835,7 +830,7 @@ async_connect3(ErlNifEnv *env, goto Error; } - // @TODO client async_connect_3 should able to take a config_resource as + // @TODO client async_connect_3 should able to take a config_ctx as // input ERL TERM so that we don't need to call ClientLoadConfiguration assert(!c_ctx->is_closed && c_ctx->Connection); @@ -854,12 +849,12 @@ async_connect3(ErlNifEnv *env, // c_ctx->lock should be taken to prevent parallel access from callback as // work trigged by starting of the connection. - if (QUIC_FAILED(Status = MsQuic->ConnectionStart( - c_ctx->Connection, - c_ctx->config_resource->Configuration, - QUIC_ADDRESS_FAMILY_UNSPEC, - host, - port))) + if (QUIC_FAILED(Status + = MsQuic->ConnectionStart(c_ctx->Connection, + c_ctx->config_ctx->Configuration, + QUIC_ADDRESS_FAMILY_UNSPEC, + host, + port))) { AcceptorDestroy(c_ctx->owner); c_ctx->owner = NULL; @@ -869,12 +864,6 @@ async_connect3(ErlNifEnv *env, c_ctx->Connection = NULL; } - if (c_ctx->config_resource) - { - destroy_config_ctx(c_ctx->config_resource); - c_ctx->config_resource = NULL; - } - res = ERROR_TUPLE_2(ATOM_CONN_START_ERROR); TP_NIF_3(start_fail, (uintptr_t)(c_ctx->Connection), Status); goto Error; @@ -883,40 +872,24 @@ async_connect3(ErlNifEnv *env, eHandle = enif_make_resource(env, c_ctx); enif_mutex_unlock(c_ctx->lock); - return SUCCESS(eHandle); -Error: - put_reg_handle(r_ctx); - HQUIC Connection = c_ctx->Connection; - if (Connection) - { - /* - Prevent double ConnectionClose - @NOTE: - We could not call MsQuic->SetCallbackHandler to set callback to NULL - becasue this function is async, not thread safe. - */ - c_ctx->Connection = NULL; - c_ctx->is_closed = TRUE; - } - - // Error exit, it must be closed or Handle is NULL - assert(c_ctx->is_closed || NULL == c_ctx->Connection); - enif_mutex_unlock(c_ctx->lock); - - if (Connection) + if (is_reuse_handle) { - // @NOTE: - // It will trigger 'connection shutdown completed' callback - // thus it is important to not release the resource c_ctx for callback - // to access the c_ctx - MsQuic->ConnectionClose(Connection); + put_reg_handle(r_ctx); } - if (!is_reuse_handle) + return SUCCESS(eHandle); + +Error: + enif_mutex_unlock(c_ctx->lock); +ErrorNoLock: + if (is_reuse_handle) { - enif_release_resource(r_ctx); + // we get the handle at the begining of this function + put_reg_handle(r_ctx); } + c_ctx->is_closed = TRUE; + put_conn_handle(c_ctx); return res; } @@ -971,12 +944,11 @@ async_accept2(ErlNifEnv *env, } ERL_NIF_TERM -shutdown_connection3(ErlNifEnv *env, - __unused_parm__ int argc, - const ERL_NIF_TERM argv[]) +shutdown_connection3(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { QuicerConnCTX *c_ctx; uint32_t app_errcode = 0, flags = 0; + CXPLAT_FRE_ASSERT(3 == argc); if (!enif_get_resource(env, argv[0], ctx_connection_t, (void **)&c_ctx)) { return ERROR_TUPLE_2(ATOM_BADARG); @@ -1011,7 +983,7 @@ shutdown_connection3(ErlNifEnv *env, } ERL_NIF_TERM -sockname1(ErlNifEnv *env, __unused_parm__ int args, const ERL_NIF_TERM argv[]) +sockname1(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { void *q_ctx; HQUIC Handle = NULL; @@ -1020,24 +992,31 @@ sockname1(ErlNifEnv *env, __unused_parm__ int args, const ERL_NIF_TERM argv[]) QUIC_ADDR addr; uint32_t addrSize = sizeof(addr); + CXPLAT_FRE_ASSERT(1 == argc); + if (enif_get_resource(env, argv[0], ctx_connection_t, &q_ctx)) { - if (!get_conn_handle((QuicerConnCTX *)q_ctx)) + QuicerConnCTX *c_ctx = (QuicerConnCTX *)q_ctx; + if (!get_conn_handle(c_ctx)) { return ERROR_TUPLE_2(ATOM_CLOSED); } - Handle = (((QuicerConnCTX *)q_ctx))->Connection; + Handle = c_ctx->Connection; Param = QUIC_PARAM_CONN_LOCAL_ADDRESS; Status = MsQuic->GetParam(Handle, Param, &addrSize, &addr); - put_conn_handle((QuicerConnCTX *)q_ctx); + put_conn_handle(c_ctx); } else if (enif_get_resource(env, argv[0], ctx_listener_t, &q_ctx)) { - enif_mutex_lock(((QuicerListenerCTX *)q_ctx)->lock); - Handle = ((QuicerListenerCTX *)q_ctx)->Listener; + QuicerListenerCTX *l_ctx = (QuicerListenerCTX *)q_ctx; + if (!get_listener_handle(l_ctx)) + { + return ERROR_TUPLE_2(ATOM_CLOSED); + } + Handle = l_ctx->Listener; Param = QUIC_PARAM_LISTENER_LOCAL_ADDRESS; Status = MsQuic->GetParam(Handle, Param, &addrSize, &addr); - enif_mutex_unlock(((QuicerListenerCTX *)q_ctx)->lock); + put_listener_handle(l_ctx); } else { @@ -1115,9 +1094,8 @@ continue_connection_handshake(QuicerConnCTX *c_ctx) return QUIC_STATUS_INVALID_STATE; } - if (QUIC_FAILED( - Status = MsQuic->ConnectionSetConfiguration( - c_ctx->Connection, c_ctx->config_resource->Configuration))) + if (QUIC_FAILED(Status = MsQuic->ConnectionSetConfiguration( + c_ctx->Connection, c_ctx->config_ctx->Configuration))) { return Status; } @@ -1357,16 +1335,19 @@ handle_connection_event_peer_stream_started(QuicerConnCTX *c_ctx, QuicerStreamCTX *s_ctx = init_s_ctx(); BOOLEAN is_orphan = FALSE; - enif_keep_resource(c_ctx); + + if (!get_conn_handle(c_ctx)) + { + return QUIC_STATUS_UNREACHABLE; + } + s_ctx->c_ctx = c_ctx; s_ctx->eHandle = enif_make_resource(s_ctx->imm_env, s_ctx); // @TODO Generally, we rely on outer caller to clean the env, // or we should clean the env in this function. env = s_ctx->env; - get_conn_handle(c_ctx); s_ctx->Stream = Event->PEER_STREAM_STARTED.Stream; - CxPlatRefInitialize(&(s_ctx->ref_count)); ACCEPTOR *acc = AcceptorDequeue(c_ctx->acceptor_queue); @@ -1379,10 +1360,11 @@ handle_connection_event_peer_stream_started(QuicerConnCTX *c_ctx, acc = AcceptorAlloc(); if (!acc) { + s_ctx->Stream = NULL; return QUIC_STATUS_UNREACHABLE; } // We must copy here, otherwise it will become double free - // in resource dealloc callbacks (for Stream and Connection) + // for Stream and Connection CxPlatCopyMemory(acc, c_ctx->owner, sizeof(ACCEPTOR)); // We set it to passive and let new owner set it to active after handoff @@ -1762,13 +1744,7 @@ get_connectionsX(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) ERL_NIF_TERM res = enif_make_list(env, 0); if (argc == 0) // use global registration { - pthread_mutex_lock(&GRegLock); - if (!G_r_ctx) - { - pthread_mutex_unlock(&GRegLock); - return res; - } - r_ctx = G_r_ctx; + r_ctx = &G_r_ctx; } else { @@ -1777,6 +1753,11 @@ get_connectionsX(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) return ERROR_TUPLE_2(ATOM_BADARG); } } + + if (!get_reg_handle(r_ctx)) + { + return ERROR_TUPLE_2(ATOM_QUIC_REGISTRATION); + } enif_mutex_lock(r_ctx->lock); CXPLAT_LIST_ENTRY *Entry = r_ctx->Connections.Flink; while (Entry != &r_ctx->Connections) @@ -1795,6 +1776,7 @@ get_connectionsX(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) // becasue deref c_ctx may cause connection close and then trigger callback // that destroy c_ctx which locks r_ctx in another thread, causing dead lock put_conn_handles(env, res); + put_reg_handle(r_ctx); // get_connectionsX if (argc == 0) // use global registration { @@ -1803,6 +1785,48 @@ get_connectionsX(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) return res; } +ERL_NIF_TERM +count_reg_connsX(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) +{ + QuicerRegistrationCTX *r_ctx = NULL; + ERL_NIF_TERM res = ATOM_UNDEFINED; + uint32_t count = 0; + if (argc == 0) // use global registration + { + r_ctx = &G_r_ctx; + } + else + { + if (!enif_get_resource(env, argv[0], ctx_reg_t, (void **)&r_ctx)) + { + return ERROR_TUPLE_2(ATOM_BADARG); + } + } + if (!get_reg_handle(r_ctx)) + { + res = ERROR_TUPLE_2(ATOM_QUIC_REGISTRATION); + goto exit; + } + enif_mutex_lock(r_ctx->lock); + CXPLAT_LIST_ENTRY *Entry = r_ctx->Connections.Flink; + while (Entry != &r_ctx->Connections) + { + Entry = Entry->Flink; + count++; + } + enif_mutex_unlock(r_ctx->lock); + + res = enif_make_uint(env, count); + put_reg_handle(r_ctx); // conn count + +exit: + if (argc == 0) // use global registration + { + pthread_mutex_unlock(&GRegLock); + } + return res; +} + ERL_NIF_TERM get_conn_owner1(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { @@ -1849,13 +1873,11 @@ put_conn_handles(ErlNifEnv *env, ERL_NIF_TERM conn_handles) QUIC_STATUS selected_owner_unreachable(QuicerStreamCTX *s_ctx) { - // - // s_ctx ownership transfer failed - // There is no shared ownership, we must destroy the s_ctx here - // s_ctx->is_closed = TRUE; - enif_clear_env(s_ctx->env); - destroy_s_ctx(s_ctx); + // @NOTE: unset Stream handle to avoid double closing + // becasue we are rejecting it and MsQuic internally will + // close it. + s_ctx->Stream = NULL; return QUIC_STATUS_UNREACHABLE; } diff --git a/c_src/quicer_connection.h b/c_src/quicer_connection.h index c8d3517a..09323417 100644 --- a/c_src/quicer_connection.h +++ b/c_src/quicer_connection.h @@ -58,6 +58,9 @@ peercert1(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); ERL_NIF_TERM get_connectionsX(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); +ERL_NIF_TERM +count_reg_connsX(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); + ERL_NIF_TERM get_conn_owner1(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); diff --git a/c_src/quicer_ctx.c b/c_src/quicer_ctx.c index 45e7adf6..63abb1bb 100644 --- a/c_src/quicer_ctx.c +++ b/c_src/quicer_ctx.c @@ -17,21 +17,31 @@ limitations under the License. #include "quicer_ctx.h" // alloc/dealloc ctx should be done in the callbacks. -extern QuicerRegistrationCTX *G_r_ctx; +extern QuicerRegistrationCTX G_r_ctx; QuicerRegistrationCTX * -init_r_ctx() +init_r_ctx(QuicerRegistrationCTX *r_ctx) { - QuicerRegistrationCTX *r_ctx - = enif_alloc_resource(ctx_reg_t, sizeof(QuicerRegistrationCTX)); + if (!r_ctx) + { + r_ctx = enif_alloc_resource(ctx_reg_t, sizeof(QuicerRegistrationCTX)); + CxPlatZeroMemory(r_ctx, sizeof(QuicerRegistrationCTX)); + // Only for none global registration + CxPlatRefInitialize(&r_ctx->ref_count); + } + else + { + // G_r_ctx + CXPLAT_FRE_ASSERT(r_ctx == &G_r_ctx); + CXPLAT_FRE_ASSERT(r_ctx->ref_count == 0); + } if (!r_ctx) { return NULL; } - CxPlatZeroMemory(r_ctx, sizeof(QuicerRegistrationCTX)); r_ctx->env = enif_alloc_env(); r_ctx->Registration = NULL; - r_ctx->is_released = FALSE; + r_ctx->is_closed = TRUE; r_ctx->lock = enif_mutex_create("quicer:r_ctx"); CxPlatListInitializeHead(&r_ctx->Listeners); CxPlatListInitializeHead(&r_ctx->Connections); @@ -41,17 +51,11 @@ init_r_ctx() void deinit_r_ctx(QuicerRegistrationCTX *r_ctx) { + r_ctx->is_closed = TRUE; enif_free_env(r_ctx->env); enif_mutex_destroy(r_ctx->lock); } -void -destroy_r_ctx(QuicerRegistrationCTX *r_ctx) -{ - r_ctx->is_released = TRUE; - enif_release_resource(r_ctx); -} - QuicerListenerCTX * init_l_ctx() { @@ -63,7 +67,7 @@ init_l_ctx() } CxPlatZeroMemory(l_ctx, sizeof(QuicerListenerCTX)); l_ctx->env = enif_alloc_env(); - l_ctx->config_resource = init_config_ctx(); + l_ctx->config_ctx = NULL; l_ctx->acceptor_queue = AcceptorQueueNew(); l_ctx->lock = enif_mutex_create("quicer:l_ctx"); #if defined(QUICER_USE_TRUSTED_STORE) @@ -71,8 +75,8 @@ init_l_ctx() #endif l_ctx->is_closed = TRUE; l_ctx->allow_insecure = FALSE; - l_ctx->r_ctx = NULL; CxPlatListInitializeHead(&l_ctx->RegistrationLink); + CxPlatRefInitialize(&l_ctx->ref_count); return l_ctx; } @@ -85,15 +89,8 @@ deinit_l_ctx(QuicerListenerCTX *l_ctx) X509_STORE_free(l_ctx->trusted_store); } #endif // QUICER_USE_TRUSTED_STORE + AcceptorQueueDestroy(l_ctx->acceptor_queue); - if (l_ctx->config_resource) - { - destroy_config_ctx(l_ctx->config_resource); - } - if (l_ctx->r_ctx && l_ctx->r_ctx != G_r_ctx) - { - enif_release_resource(l_ctx->r_ctx); - } enif_mutex_destroy(l_ctx->lock); enif_free_env(l_ctx->env); } @@ -101,34 +98,7 @@ deinit_l_ctx(QuicerListenerCTX *l_ctx) void destroy_l_ctx(QuicerListenerCTX *l_ctx) { - QuicerRegistrationCTX *r_ctx; - if (l_ctx->r_ctx) - { - r_ctx = l_ctx->r_ctx; - } - else - { - r_ctx = G_r_ctx; - } - - if (r_ctx) - { - put_reg_handle(r_ctx); - enif_mutex_lock(r_ctx->lock); - CxPlatListEntryRemove(&l_ctx->RegistrationLink); - enif_mutex_unlock(r_ctx->lock); - } - - // @note, Destroy config asap as it holds rundown - // ref count in registration - destroy_config_ctx(l_ctx->config_resource); - if (l_ctx->r_ctx) - { - enif_release_resource(l_ctx->r_ctx); - l_ctx->r_ctx = NULL; - } - l_ctx->config_resource = NULL; if (l_ctx->is_monitored) { enif_demonitor_process(l_ctx->env, l_ctx, &l_ctx->owner_mon); @@ -160,9 +130,10 @@ init_c_ctx() c_ctx->event_mask = 0; c_ctx->ssl_keylogfile = NULL; c_ctx->is_closed = TRUE; // init - c_ctx->config_resource = NULL; + c_ctx->config_ctx = NULL; c_ctx->peer_cert = NULL; CxPlatListInitializeHead(&c_ctx->RegistrationLink); + CxPlatRefInitialize(&c_ctx->ref_count); return c_ctx; } @@ -177,12 +148,18 @@ deinit_c_ctx(QuicerConnCTX *c_ctx) c_ctx->trusted = NULL; } #endif // QUICER_USE_TRUSTED_STORE - if (c_ctx->config_resource) + // @note, for the conn failed to get opened/started + if (c_ctx->config_ctx) { - enif_release_resource(c_ctx->config_resource); + put_config_handle(c_ctx->config_ctx); } AcceptorQueueDestroy(c_ctx->acceptor_queue); + if (c_ctx->r_ctx) + { + PUT_UNLINK_REGISTRATION(c_ctx, c_ctx->r_ctx); + } + if (c_ctx->peer_cert) { X509_free(c_ctx->peer_cert); @@ -202,19 +179,6 @@ destroy_c_ctx(QuicerConnCTX *c_ctx) c_ctx->trusted = NULL; } #endif // QUICER_USE_TRUSTED_STORE - QuicerRegistrationCTX *r_ctx; - if (c_ctx->r_ctx) - { - r_ctx = c_ctx->r_ctx; - } - else - { - r_ctx = G_r_ctx; - } - - enif_mutex_lock(r_ctx->lock); - CxPlatListEntryRemove(&c_ctx->RegistrationLink); - enif_mutex_unlock(r_ctx->lock); if (c_ctx->is_monitored) { @@ -237,6 +201,7 @@ init_config_ctx() CxPlatZeroMemory(config_ctx, sizeof(QuicerConfigCTX)); config_ctx->env = enif_alloc_env(); config_ctx->Configuration = NULL; + CxPlatRefInitialize(&config_ctx->ref_count); return config_ctx; } @@ -246,15 +211,6 @@ deinit_config_ctx(QuicerConfigCTX *config_ctx) enif_free_env(config_ctx->env); } -void -destroy_config_ctx(QuicerConfigCTX *config_ctx) -{ - if (config_ctx) - { - enif_release_resource(config_ctx); - } -} - QuicerStreamCTX * init_s_ctx() { @@ -280,6 +236,7 @@ init_s_ctx() s_ctx->is_closed = TRUE; // init s_ctx->event_mask = 0; s_ctx->sig_queue = NULL; + CxPlatRefInitialize(&s_ctx->ref_count); return s_ctx; } @@ -342,16 +299,22 @@ destroy_dgram_send_ctx(QuicerDgramSendCTX *dgram_send_ctx) inline void put_stream_handle(QuicerStreamCTX *s_ctx) { - if (CxPlatRefDecrement(&s_ctx->ref_count) && s_ctx->Stream) + if (CxPlatRefDecrement(&s_ctx->ref_count)) { + CXPLAT_DBG_ASSERT(s_ctx->is_closed); HQUIC Stream = s_ctx->Stream; - Stream = s_ctx->Stream; + QuicerConnCTX *c_ctx = s_ctx->c_ctx; s_ctx->Stream = NULL; s_ctx->is_closed = TRUE; MsQuic->SetCallbackHandler(Stream, NULL, NULL); MsQuic->StreamClose(Stream); - assert(s_ctx->c_ctx != NULL); - put_conn_handle(s_ctx->c_ctx); + CXPLAT_DBG_ASSERT(s_ctx->c_ctx != NULL); + if (c_ctx) + { + put_conn_handle(s_ctx->c_ctx); + s_ctx->c_ctx = NULL; + } + destroy_s_ctx(s_ctx); } } @@ -364,13 +327,37 @@ get_stream_handle(QuicerStreamCTX *s_ctx) inline void put_conn_handle(QuicerConnCTX *c_ctx) { - if (CxPlatRefDecrement(&c_ctx->ref_count) && c_ctx->Connection) + if (CxPlatRefDecrement(&c_ctx->ref_count)) { HQUIC Connection = c_ctx->Connection; + QuicerRegistrationCTX *r_ctx = c_ctx->r_ctx; + QuicerConfigCTX *config_ctx = c_ctx->config_ctx; + CXPLAT_DBG_ASSERT(c_ctx->is_closed); c_ctx->Connection = NULL; + c_ctx->config_ctx = NULL; c_ctx->is_closed = TRUE; - MsQuic->SetCallbackHandler(Connection, NULL, NULL); - MsQuic->ConnectionClose(Connection); + c_ctx->r_ctx = NULL; + + // Close connection handle + if (Connection) + { + MsQuic->SetCallbackHandler(Connection, NULL, NULL); + MsQuic->ConnectionClose(Connection); + } + + // Deref config_ctx + if (config_ctx) + { + put_config_handle(config_ctx); + } + + // Unlink from registration + // @NOTE: maybe not yet linked + if (r_ctx) + { + PUT_UNLINK_REGISTRATION(c_ctx, r_ctx); + } + destroy_c_ctx(c_ctx); } } @@ -383,11 +370,34 @@ get_conn_handle(QuicerConnCTX *c_ctx) inline void put_listener_handle(QuicerListenerCTX *l_ctx) { - if (CxPlatRefDecrement(&l_ctx->ref_count) && l_ctx->Listener) + if (CxPlatRefDecrement(&l_ctx->ref_count)) { + CXPLAT_DBG_ASSERT(l_ctx->is_closed); + QuicerRegistrationCTX *r_ctx = l_ctx->r_ctx; HQUIC Listener = l_ctx->Listener; l_ctx->Listener = NULL; + CXPLAT_DBG_ASSERT(l_ctx->is_closed); + l_ctx->is_closed = TRUE; + l_ctx->r_ctx = NULL; + + // Close listener handle MsQuic->ListenerClose(Listener); + + // Deref config_ctx as it has shared ownership. + if (l_ctx->config_ctx) + { + put_config_handle(l_ctx->config_ctx); + } + l_ctx->config_ctx = NULL; + + // Deref Registration Handle + if (r_ctx) + { + PUT_UNLINK_REGISTRATION(l_ctx, r_ctx); + } + + // destroy l_ctx + destroy_l_ctx(l_ctx); } } @@ -400,13 +410,48 @@ get_listener_handle(QuicerListenerCTX *l_ctx) inline void put_reg_handle(QuicerRegistrationCTX *r_ctx) { - CxPlatRefDecrement(&r_ctx->ref_count); + if (CxPlatRefDecrement(&r_ctx->ref_count)) + { + HQUIC Registration = r_ctx->Registration; + CXPLAT_DBG_ASSERT(r_ctx->is_closed); + r_ctx->is_closed = TRUE; + r_ctx->Registration = NULL; + MsQuic->RegistrationShutdown( + Registration, QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0); + MsQuic->RegistrationClose(Registration); + if (r_ctx == &G_r_ctx) + { + deinit_r_ctx(r_ctx); + } + else + { + enif_release_resource(r_ctx); + } + } } inline BOOLEAN get_reg_handle(QuicerRegistrationCTX *r_ctx) { - return CxPlatRefIncrementNonZero(&r_ctx->ref_count, 1); + BOOLEAN res = CxPlatRefIncrementNonZero(&r_ctx->ref_count, 1); + return res; +} + +inline void +put_config_handle(QuicerConfigCTX *config_ctx) +{ + if (CxPlatRefDecrement(&config_ctx->ref_count)) + { + MsQuic->ConfigurationClose(config_ctx->Configuration); + config_ctx->Configuration = NULL; + enif_release_resource(config_ctx); + } +} + +inline BOOLEAN +get_config_handle(QuicerConfigCTX *config_ctx) +{ + return CxPlatRefIncrementNonZero(&config_ctx->ref_count, 1); } void diff --git a/c_src/quicer_ctx.h b/c_src/quicer_ctx.h index 9068566d..cefb8c11 100644 --- a/c_src/quicer_ctx.h +++ b/c_src/quicer_ctx.h @@ -28,6 +28,41 @@ limitations under the License. #define _CTX_NIF_WRITE_ #define _CTX_NIF_READ_ +#define CONN_LINK_REGISTRATION(CTX, RCTX) \ + LINK_REGISTRATION(CTX, RCTX, Connections) + +#define LISTENER_LINK_REGISTRATION(CTX, RCTX) \ + LINK_REGISTRATION(CTX, RCTX, Listeners) +#define PUT_UNLINK_REGISTRATION(CTX, RCTX) \ + do \ + { \ + UNLINK_REGISTRATION(CTX, RCTX); \ + put_reg_handle(RCTX); \ + } \ + while (0) + +#define UNLINK_REGISTRATION(CTX, RCTX) \ + do \ + { \ + enif_mutex_lock(RCTX->lock); \ + CxPlatListEntryRemove(&CTX->RegistrationLink); \ + enif_mutex_unlock(RCTX->lock); \ + } \ + while (0) + +#define LINK_REGISTRATION(CTX, RCTX, LISTNAME) \ + do \ + { \ + enif_mutex_lock(RCTX->lock); \ + CxPlatListInsertTail(&RCTX->LISTNAME, &CTX->RegistrationLink); \ + enif_mutex_unlock(RCTX->lock); \ + CTX->r_ctx = RCTX; \ + } \ + while (0) + +#define LOCAL_REFCNT(XX) XX +#define DESTRUCT_REFCNT(XX) XX +#define CALLBACK_DESTRUCT_REFCNT(XX) DESTRUCT_REFCNT(XX) /* * Registration */ @@ -37,7 +72,7 @@ typedef struct QuicerRegistrationCTX HQUIC Registration; // Tracking lifetime of Registration handle CXPLAT_REF_COUNT ref_count; - BOOLEAN is_released; + BOOLEAN is_closed; char name[UINT8_MAX + 1]; ErlNifMutex *lock; CXPLAT_LIST_ENTRY Listeners; @@ -51,12 +86,13 @@ typedef struct QuicerConfigCTX { ErlNifEnv *env; HQUIC Configuration; + CXPLAT_REF_COUNT ref_count; } QuicerConfigCTX; typedef struct QuicerListenerCTX { - // config_resource is allocated in 'init_l_ctx' - QuicerConfigCTX *config_resource; + // config_ctx is allocated in 'init_l_ctx' + QuicerConfigCTX *config_ctx; QuicerRegistrationCTX *r_ctx; HQUIC Listener; // track lifetime of Connection handle @@ -84,10 +120,10 @@ typedef struct QuicerListenerCTX typedef struct QuicerConnCTX { uint32_t magic; - // config_resource - // for server, inherit from l_ctx + // config_ctx + // for server, inherited and shared with l_ctx // for client, alloc on its own - QuicerConfigCTX *config_resource; + QuicerConfigCTX *config_ctx; QuicerRegistrationCTX *r_ctx; CXPLAT_LIST_ENTRY RegistrationLink; HQUIC Connection; @@ -160,9 +196,8 @@ typedef struct QuicerStreamSendCTX typedef struct QuicerStreamSendCTX QuicerDgramSendCTX; -QuicerRegistrationCTX *init_r_ctx(); +QuicerRegistrationCTX *init_r_ctx(QuicerRegistrationCTX *r_ctx); void deinit_r_ctx(QuicerRegistrationCTX *r_ctx); -void destroy_r_ctx(QuicerRegistrationCTX *r_ctx); QuicerListenerCTX *init_l_ctx(); void deinit_l_ctx(QuicerListenerCTX *l_ctx); @@ -198,6 +233,9 @@ BOOLEAN get_listener_handle(QuicerListenerCTX *l_ctx); void put_reg_handle(QuicerRegistrationCTX *r_ctx); BOOLEAN get_reg_handle(QuicerRegistrationCTX *r_ctx); +void put_config_handle(QuicerConfigCTX *config_ctx); +BOOLEAN get_config_handle(QuicerConfigCTX *config_ctx); + void cache_stream_id(QuicerStreamCTX *s_ctx); void cleanup_owner_signals(QuicerStreamCTX *s_ctx); diff --git a/c_src/quicer_dgram.c b/c_src/quicer_dgram.c index 7c87aba6..473b8e30 100644 --- a/c_src/quicer_dgram.c +++ b/c_src/quicer_dgram.c @@ -27,10 +27,7 @@ send_dgram(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) ERL_NIF_TERM eFlags = argv[2]; uint32_t sendflags; - if (3 != argc) - { - return ERROR_TUPLE_2(ATOM_BADARG); - } + CXPLAT_FRE_ASSERT(argc == 3); if (!enif_get_resource(env, econn, ctx_connection_t, (void **)&c_ctx)) { @@ -73,6 +70,10 @@ send_dgram(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) return ERROR_TUPLE_2(ATOM_BADARG); } + if (!get_conn_handle(c_ctx)) + { + return ERROR_TUPLE_2(ATOM_CLOSED); + } enif_mutex_lock(c_ctx->lock); HQUIC Connection = c_ctx->Connection; @@ -87,6 +88,7 @@ send_dgram(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) dgram_send_ctx->Buffer.Length = (uint32_t)bin->size; QUIC_STATUS Status; + ERL_NIF_TERM ret = SUCCESS(ETERM_UINT_64(bin_size)); if (QUIC_FAILED(Status = MsQuic->DatagramSend(Connection, &dgram_send_ctx->Buffer, 1, @@ -94,14 +96,12 @@ send_dgram(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) dgram_send_ctx))) { destroy_dgram_send_ctx(dgram_send_ctx); - enif_mutex_unlock(c_ctx->lock); - return ERROR_TUPLE_3(ATOM_DGRAM_SEND_ERROR, ATOM_STATUS(Status)); - } - else - { - enif_mutex_unlock(c_ctx->lock); - return SUCCESS(ETERM_UINT_64(bin_size)); + ret = ERROR_TUPLE_3(ATOM_DGRAM_SEND_ERROR, ATOM_STATUS(Status)); } + + enif_mutex_unlock(c_ctx->lock); + put_conn_handle(c_ctx); + return ret; } void diff --git a/c_src/quicer_eterms.h b/c_src/quicer_eterms.h index e126d48d..c0b0713d 100644 --- a/c_src/quicer_eterms.h +++ b/c_src/quicer_eterms.h @@ -24,6 +24,7 @@ extern ERL_NIF_TERM ATOM_FALSE; // quicer internal 'errors' extern ERL_NIF_TERM ATOM_OK; extern ERL_NIF_TERM ATOM_ERROR; +extern ERL_NIF_TERM ATOM_GLOBAL; extern ERL_NIF_TERM ATOM_REG_FAILED; extern ERL_NIF_TERM ATOM_OPEN_FAILED; extern ERL_NIF_TERM ATOM_CTX_INIT_FAILED; diff --git a/c_src/quicer_listener.c b/c_src/quicer_listener.c index fda9e6f3..96966347 100644 --- a/c_src/quicer_listener.c +++ b/c_src/quicer_listener.c @@ -22,7 +22,7 @@ limitations under the License. #include #include -extern QuicerRegistrationCTX *G_r_ctx; +extern QuicerRegistrationCTX G_r_ctx; extern pthread_mutex_t GRegLock; BOOLEAN parse_registration(ErlNifEnv *env, @@ -37,7 +37,6 @@ ServerListenerCallback(__unused_parm__ HQUIC Listener, QUIC_STATUS Status = QUIC_STATUS_SUCCESS; QuicerListenerCTX *l_ctx = (QuicerListenerCTX *)Context; QuicerConnCTX *c_ctx = NULL; - BOOLEAN is_destroy = FALSE; BOOLEAN is_worker = (enif_thread_type() == ERL_NIF_THR_UNDEFINED); @@ -50,20 +49,31 @@ ServerListenerCallback(__unused_parm__ HQUIC Listener, { case QUIC_LISTENER_EVENT_NEW_CONNECTION: + CXPLAT_DBG_ASSERT(l_ctx->r_ctx); + + if (!get_reg_handle(l_ctx->r_ctx)) + { + Status = QUIC_STATUS_UNREACHABLE; + goto Error; + } + QuicerRegistrationCTX *r_ctx = l_ctx->r_ctx; + // // Note, c_ctx is newly init here, don't grab lock. // if (!(c_ctx = init_c_ctx())) { Status = QUIC_STATUS_OUT_OF_MEMORY; + put_reg_handle(r_ctx); goto Error; } - c_ctx->r_ctx = l_ctx->r_ctx; + // assign r_ctx of c_ctx + CONN_LINK_REGISTRATION(c_ctx, l_ctx->r_ctx); + ErlNifEnv *env = c_ctx->env; c_ctx->Connection = Event->NEW_CONNECTION.Connection; - CxPlatRefInitialize(&(c_ctx->ref_count)); #if defined(QUICER_USE_TRUSTED_STORE) if (l_ctx->trusted_store) @@ -72,10 +82,12 @@ ServerListenerCallback(__unused_parm__ HQUIC Listener, c_ctx->trusted = l_ctx->trusted_store; } #endif // QUICER_USE_TRUSTED_STORE - assert(l_ctx->config_resource); + CXPLAT_DBG_ASSERT(l_ctx->config_ctx); + // Keep resource for c_ctx - enif_keep_resource(l_ctx->config_resource); - c_ctx->config_resource = l_ctx->config_resource; + CXPLAT_FRE_ASSERT(get_config_handle(l_ctx->config_ctx)); + + c_ctx->config_ctx = l_ctx->config_ctx; ACCEPTOR *conn_owner = AcceptorDequeue(l_ctx->acceptor_queue); @@ -83,16 +95,15 @@ ServerListenerCallback(__unused_parm__ HQUIC Listener, { TP_CB_3(no_acceptor, (uintptr_t)c_ctx->Connection, 0); Status = QUIC_STATUS_UNREACHABLE; - // We are going to reject the connection, + // @NOTE: We are going to reject the connection, // we will not be the owner of this connection // msquic will close the Connection Handle internally. - // Set it to NULL to avoid close it in resource_conn_dealloc_callback - // or in the put_conn_handle. c_ctx->Connection = NULL; - put_conn_handle(c_ctx); - // However, we still need to free the c_ctx + // @NOTE: we don't hold the lock of c_ctx since it is new conn. - enif_release_resource(c_ctx); + put_conn_handle(c_ctx); + CXPLAT_FRE_ASSERTMSG(r_ctx->ref_count > 0, + "Listener should still own the r_ctx"); goto Error; } TP_CB_3(acceptor_hit, (uintptr_t)c_ctx->Connection, 0); @@ -190,10 +201,13 @@ ServerListenerCallback(__unused_parm__ HQUIC Listener, // However, we still need to free the c_ctx // note, we don't hold the lock of c_ctx since it is new conn. - enif_release_resource(c_ctx); + put_conn_handle(c_ctx); goto Error; } + CXPLAT_DBG_ASSERT(r_ctx); + + c_ctx->is_closed = FALSE; // new connection // // A new connection is being attempted by a client. For the handshake to // proceed, the server must provide a configuration for QUIC to use. The @@ -203,50 +217,22 @@ ServerListenerCallback(__unused_parm__ HQUIC Listener, (void *)ServerConnectionCallback, c_ctx); - QuicerRegistrationCTX *r_ctx; - if (l_ctx->r_ctx) - { - r_ctx = l_ctx->r_ctx; - } - else - { - r_ctx = G_r_ctx; - } - - if (r_ctx) - { - enif_mutex_lock(r_ctx->lock); - CxPlatListInsertTail(&r_ctx->Connections, &c_ctx->RegistrationLink); - enif_mutex_unlock(r_ctx->lock); - } - - c_ctx->is_closed = FALSE; // new connection enif_clear_env(env); break; case QUIC_LISTENER_EVENT_STOP_COMPLETE: - // **Note**, this callback event in msquic can be triggered by either - // MsQuicListenerClose or MsQuicListenerStop. + // **Note**, this callback event from msquic can be triggered by either + // `MsQuicListenerClose` or `MsQuicListenerStop`. env = l_ctx->env; + enif_send(NULL, &(l_ctx->listenerPid), - NULL, + env, enif_make_tuple3(env, ATOM_QUIC, ATOM_LISTENER_STOPPED, enif_make_resource(env, l_ctx))); - if (!l_ctx->Listener) - { - // @NOTE This callback is part of the listener *close* process - // Listener is already closing, we can destroy the l_ctx now - // as the handle is NULL no subsequent msquic API is allowed/possible - assert(!l_ctx->is_stopped); - is_destroy = TRUE; - } - else - { - l_ctx->is_stopped = TRUE; - } + l_ctx->is_stopped = TRUE; enif_clear_env(env); break; default: @@ -258,15 +244,11 @@ ServerListenerCallback(__unused_parm__ HQUIC Listener, { enif_mutex_unlock(l_ctx->lock); } - if (is_destroy) - { - destroy_l_ctx(l_ctx); - } return Status; } ERL_NIF_TERM -listen2(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[]) +listen2(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { QUIC_STATUS Status = QUIC_STATUS_SUCCESS; ERL_NIF_TERM ret = ATOM_OK; @@ -277,7 +259,7 @@ listen2(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[]) QUIC_ADDR Address = {}; HQUIC Registration = NULL; - QuicerRegistrationCTX *target_r_ctx = NULL; + CXPLAT_FRE_ASSERT(argc == 2); if (!enif_is_map(env, options)) { @@ -305,7 +287,7 @@ listen2(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[]) return ERROR_TUPLE_2(ret); } - // Now build l_ctx + // New l_ctx QuicerListenerCTX *l_ctx = init_l_ctx(); if (!l_ctx) @@ -319,16 +301,8 @@ listen2(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[]) #if defined(QUICER_USE_TRUSTED_STORE) l_ctx->trusted_store = trusted_store; #endif // QUICER_USE_TRUSTED_STORE - CxPlatRefInitialize(&l_ctx->ref_count); - // ********* ANY ERROR below this line should goto `exit` ************** - - // Set owner for l_ctx - if (!enif_self(env, &(l_ctx->listenerPid))) - { - ret = ERROR_TUPLE_2(ATOM_BAD_PID); - goto exit; - } + // ********* ANY ERROR below this line should goto `exit-*` ************** // Get Reg for l_ctx, quic_registration is optional if (!parse_registration(env, options, &l_ctx->r_ctx)) @@ -339,44 +313,45 @@ listen2(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[]) if (l_ctx->r_ctx) { - // quic_registration is set - enif_keep_resource(l_ctx->r_ctx); + // quic_registration is set, + // none-global registration. + CXPLAT_DBG_ASSERT(l_ctx->r_ctx != &G_r_ctx); if (!get_reg_handle(l_ctx->r_ctx)) { + l_ctx->r_ctx = NULL; ret = ERROR_TUPLE_2(ATOM_QUIC_REGISTRATION); goto exit; } - Registration = l_ctx->r_ctx->Registration; - target_r_ctx = l_ctx->r_ctx; } else { // quic_registration is not set, use global registration - target_r_ctx = G_r_ctx; - pthread_mutex_lock(&GRegLock); - - if (!G_r_ctx) + if (!get_reg_handle(&G_r_ctx)) { - pthread_mutex_unlock(&GRegLock); ret = ERROR_TUPLE_2(ATOM_QUIC_REGISTRATION); goto exit; } + l_ctx->r_ctx = &G_r_ctx; + } - enif_keep_resource(G_r_ctx); - if (!get_reg_handle(G_r_ctx)) - { - ret = ERROR_TUPLE_2(ATOM_QUIC_REGISTRATION); - goto exit; - } - Registration = G_r_ctx->Registration; - pthread_mutex_unlock(&GRegLock); + LISTENER_LINK_REGISTRATION(l_ctx, l_ctx->r_ctx); + + CXPLAT_DBG_ASSERT(l_ctx->r_ctx); + // Set owner for l_ctx + if (!enif_self(env, &(l_ctx->listenerPid))) + { + ret = ERROR_TUPLE_2(ATOM_BAD_PID); + goto exit; } + Registration = l_ctx->r_ctx->Registration; + l_ctx->config_ctx = init_config_ctx(); + // Now load server config ret = ServerLoadConfiguration(env, &options, Registration, - &l_ctx->config_resource->Configuration, + &l_ctx->config_ctx->Configuration, &CredConfig); if (!IS_SAME_TERM(ATOM_OK, ret)) { @@ -403,23 +378,11 @@ listen2(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[]) l_ctx, &l_ctx->Listener))) { - // Server Configuration should be destroyed - // @FIXME here leaks config? - // enif_release_resource(l_ctx->config_resource); - l_ctx->config_resource->Configuration = NULL; ret = ERROR_TUPLE_3(ATOM_LISTENER_OPEN_ERROR, ATOM_STATUS(Status)); goto exit; } l_ctx->is_closed = FALSE; - // Link to registration only when ListenerOpen success - if (target_r_ctx) - { - enif_mutex_lock(target_r_ctx->lock); - CxPlatListInsertTail(&target_r_ctx->Listeners, &l_ctx->RegistrationLink); - enif_mutex_unlock(target_r_ctx->lock); - } - // Now try to start listener unsigned alpn_buffer_length = 0; QUIC_BUFFER *alpn_buffers = NULL; @@ -446,9 +409,7 @@ listen2(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[]) if (QUIC_FAILED(Status)) { TP_NIF_3(start_fail, (uintptr_t)(l_ctx->Listener), Status); - HQUIC Listener = l_ctx->Listener; - l_ctx->Listener = NULL; - MsQuic->ListenerClose(Listener); + l_ctx->is_closed = TRUE; ret = ERROR_TUPLE_3(ATOM_LISTENER_START_ERROR, ATOM_STATUS(Status)); goto exit; } @@ -463,7 +424,8 @@ listen2(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[]) X509_STORE_free(trusted_store); #endif // QUICER_USE_TRUSTED_STORE free_certificate(&CredConfig); - destroy_l_ctx(l_ctx); + l_ctx->is_closed = TRUE; + put_listener_handle(l_ctx); return ret; } @@ -473,36 +435,26 @@ close_listener1(ErlNifEnv *env, const ERL_NIF_TERM argv[]) { QuicerListenerCTX *l_ctx; - BOOLEAN is_destroy = FALSE; ERL_NIF_TERM ret = ATOM_OK; + if (!enif_get_resource(env, argv[0], ctx_listener_t, (void **)&l_ctx)) { return ERROR_TUPLE_2(ATOM_BADARG); } enif_mutex_lock(l_ctx->lock); + if (l_ctx->is_closed) { enif_mutex_unlock(l_ctx->lock); return ERROR_TUPLE_2(ATOM_CLOSED); } - HQUIC l = l_ctx->Listener; - // set before destroy_l_ctx - l_ctx->Listener = NULL; - l_ctx->is_closed = TRUE; - - // If is_stopped, it means the listener is already stopped. - // there will be no callback for QUIC_LISTENER_EVENT_STOP_COMPLETE - // so we need to destroy the l_ctx otherwise it will leak. - is_destroy = l_ctx->is_stopped; enif_mutex_unlock(l_ctx->lock); - MsQuic->ListenerClose(l); - if (is_destroy) - { - destroy_l_ctx(l_ctx); - } + l_ctx->is_closed = TRUE; + put_listener_handle(l_ctx); + ret = ATOM_CLOSED; return ret; } @@ -535,6 +487,7 @@ stop_listener1(ErlNifEnv *env, exit: enif_mutex_unlock(l_ctx->lock); put_listener_handle(l_ctx); + CXPLAT_FRE_ASSERT(l_ctx->ref_count > 0); return ret; } @@ -606,7 +559,7 @@ start_listener3(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) } // =================================================== - // Safe to access l_ctx now + // Safe to access l_ctx now // =================================================== enif_mutex_lock(l_ctx->lock); @@ -617,14 +570,9 @@ start_listener3(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) goto exit; } - QuicerRegistrationCTX *target_r_ctx = NULL; - - // This is a read, do not need to bump the ref count - target_r_ctx = l_ctx->r_ctx ? l_ctx->r_ctx : G_r_ctx; - ret = ServerLoadConfiguration(env, &options, - target_r_ctx->Registration, + l_ctx->r_ctx->Registration, &new_config_ctx->Configuration, &CredConfig); free_certificate(&CredConfig); @@ -636,7 +584,7 @@ start_listener3(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) goto exit; } - QuicerConfigCTX *old_config_ctx = l_ctx->config_resource; + QuicerConfigCTX *old_config_ctx = l_ctx->config_ctx; #if defined(QUICER_USE_TRUSTED_STORE) X509_STORE_free(l_ctx->trusted_store); @@ -664,29 +612,30 @@ start_listener3(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) } l_ctx->is_stopped = FALSE; - l_ctx->config_resource = new_config_ctx; + l_ctx->config_ctx = new_config_ctx; // the ongoing handshake will be completed with the old config - enif_release_resource(old_config_ctx); + // @TODO We should close config ASAP to make acceptor fail + put_config_handle(old_config_ctx); exit: enif_mutex_unlock(l_ctx->lock); return ret; } +// Get listeners from registration ERL_NIF_TERM get_listenersX(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { QuicerRegistrationCTX *r_ctx = NULL; ERL_NIF_TERM res = enif_make_list(env, 0); - if (argc == 0) // use global registration + BOOLEAN isGlobal = argc == 0 ? TRUE : FALSE; + if (isGlobal) { - pthread_mutex_lock(&GRegLock); - if (!G_r_ctx) + if (!get_reg_handle(&G_r_ctx)) { - pthread_mutex_unlock(&GRegLock); return res; } - r_ctx = G_r_ctx; + r_ctx = &G_r_ctx; } else { @@ -705,10 +654,9 @@ get_listenersX(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) Entry = Entry->Flink; } enif_mutex_unlock(r_ctx->lock); - - if (argc == 0) // use global registration + if (isGlobal) { - pthread_mutex_unlock(&GRegLock); + put_reg_handle(r_ctx); } return res; } diff --git a/c_src/quicer_nif.c b/c_src/quicer_nif.c index 45309b21..c130f286 100644 --- a/c_src/quicer_nif.c +++ b/c_src/quicer_nif.c @@ -47,6 +47,7 @@ closeLib(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); ERL_NIF_TERM ATOM_TRUE; ERL_NIF_TERM ATOM_FALSE; +ERL_NIF_TERM ATOM_GLOBAL; // quicer internal 'errors' ERL_NIF_TERM ATOM_OK; @@ -475,6 +476,7 @@ ERL_NIF_TERM ATOM_QUIC_SEND_ECN_CONGESTION_COUNT; #define INIT_ATOMS \ ATOM(ATOM_TRUE, true); \ ATOM(ATOM_FALSE, false); \ + ATOM(ATOM_GLOBAL, global); \ \ ATOM(ATOM_OK, ok); \ ATOM(ATOM_ERROR, error); \ @@ -867,7 +869,7 @@ ERL_NIF_TERM ATOM_QUIC_SEND_ECN_CONGESTION_COUNT; ATOM(ATOM_QUIC_SEND_ECN_CONGESTION_COUNT, send_ecn_congestion_count); \ ATOM(ATOM_UNDEFINED, undefined); -extern QuicerRegistrationCTX *G_r_ctx; +extern QuicerRegistrationCTX G_r_ctx; extern pthread_mutex_t GRegLock; const QUIC_API_TABLE *MsQuic = NULL; @@ -952,7 +954,7 @@ resource_conn_dealloc_callback(__unused_parm__ ErlNifEnv *env, void *obj) TP_CB_3(start, (uintptr_t)c_ctx->Connection, c_ctx->is_closed); // must be closed otherwise will trigger callback and casue race cond. // This ensures no callbacks during cleanup here. - assert(c_ctx->is_closed == TRUE); // in dealloc + CXPLAT_DBG_ASSERT(c_ctx->is_closed == TRUE); // in dealloc if (c_ctx->Connection) { TP_CB_3(close, (uintptr_t)c_ctx->Connection, c_ctx->is_closed); @@ -992,14 +994,12 @@ resource_stream_dealloc_callback(__unused_parm__ ErlNifEnv *env, void *obj) { QuicerStreamCTX *s_ctx = (QuicerStreamCTX *)obj; TP_CB_3(start, (uintptr_t)s_ctx->Stream, s_ctx->is_closed); - assert(s_ctx->is_closed == TRUE); + CXPLAT_DBG_ASSERT(s_ctx->is_closed == TRUE); if (s_ctx->Stream && !s_ctx->is_closed) { MsQuic->StreamClose(s_ctx->Stream); } - // ensure it is called *After* StreamClose - enif_release_resource(s_ctx->c_ctx); AcceptorDestroy(s_ctx->owner); deinit_s_ctx(s_ctx); TP_CB_3(end, (uintptr_t)s_ctx->Stream, s_ctx->is_closed); @@ -1017,7 +1017,7 @@ resource_stream_down_callback(__unused_parm__ ErlNifEnv *env, enif_mutex_lock(s_ctx->lock); if (s_ctx && s_ctx->owner && DeadPid && !enif_compare_pids(&s_ctx->owner->Pid, DeadPid) - && get_stream_handle(s_ctx)) + && LOCAL_REFCNT(get_stream_handle(s_ctx))) { TP_CB_3(start, (uintptr_t)s_ctx->Stream, 0); if (QUIC_FAILED(status = MsQuic->StreamShutdown( @@ -1033,7 +1033,7 @@ resource_stream_down_callback(__unused_parm__ ErlNifEnv *env, { TP_CB_3(shutdown_success, (uintptr_t)s_ctx->Stream, status); } - put_stream_handle(s_ctx); + LOCAL_REFCNT(put_stream_handle(s_ctx)); } enif_mutex_unlock(s_ctx->lock); } @@ -1044,8 +1044,7 @@ resource_config_dealloc_callback(__unused_parm__ ErlNifEnv *env, { TP_CB_3(start, (uintptr_t)obj, 0); QuicerConfigCTX *config_ctx = (QuicerConfigCTX *)obj; - // Check if Registration is closed or not - if (G_r_ctx && config_ctx->Configuration) + if (config_ctx->Configuration) { MsQuic->ConfigurationClose(config_ctx->Configuration); } @@ -1057,12 +1056,10 @@ void resource_reg_dealloc_callback(__unused_parm__ ErlNifEnv *env, void *obj) { TP_CB_3(start, (uintptr_t)obj, 0); - QuicerRegistrationCTX *reg_ctx = (QuicerRegistrationCTX *)obj; - deinit_r_ctx(reg_ctx); - if (MsQuic && reg_ctx->Registration) - { - MsQuic->RegistrationClose(reg_ctx->Registration); - } + QuicerRegistrationCTX *r_ctx = (QuicerRegistrationCTX *)obj; + CXPLAT_FRE_ASSERT(!get_reg_handle(r_ctx)); + CXPLAT_FRE_ASSERT(!r_ctx->Registration); + deinit_r_ctx(r_ctx); TP_CB_3(end, (uintptr_t)obj, 0); } @@ -1346,15 +1343,25 @@ closeLib(__unused_parm__ ErlNifEnv *env, pthread_mutex_lock(&GRegLock); // end of the world - if (G_r_ctx && !G_r_ctx->is_released) + // @TODO: This is temp solution to ensure closing the global registration + // However other none global registration could be still open. + if (!G_r_ctx.is_closed) { - // Make MsQuic debug check pass: - // Zero Registration when closing MsQuic - MsQuic->RegistrationClose(G_r_ctx->Registration); - G_r_ctx->Registration = NULL; - G_r_ctx->is_released = TRUE; - destroy_r_ctx(G_r_ctx); - G_r_ctx = NULL; + if (!get_reg_handle(&G_r_ctx)) + { + CXPLAT_DBG_ASSERTMSG(FALSE, + "Global Registration closed before MsQuic"); + } + // @FIXME: This is unsafe, but we have no choice for now + while (G_r_ctx.ref_count != 2) + { + printf("closelib wait for global reg cnt to be 2 but now: %ld\n", + (long)G_r_ctx.ref_count); + sleep(1); + } + G_r_ctx.is_closed = TRUE; + put_reg_handle(&G_r_ctx); + put_reg_handle(&G_r_ctx); } pthread_mutex_unlock(&GRegLock); @@ -1514,18 +1521,15 @@ atom_status(ErlNifEnv *env, QUIC_STATUS status) ERL_NIF_TERM controlling_process(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + CXPLAT_FRE_ASSERT(2 == argc); QuicerStreamCTX *s_ctx = NULL; QuicerConnCTX *c_ctx = NULL; ErlNifPid target, caller; ERL_NIF_TERM new_owner = argv[1]; ERL_NIF_TERM res = ATOM_OK; - if (argc != 2) - { - return ATOM_BADARG; - } // precheck - if (!enif_get_local_pid(env, argv[1], &target)) + if (!enif_get_local_pid(env, new_owner, &target)) { return ERROR_TUPLE_2(ATOM_BADARG); } @@ -1538,7 +1542,7 @@ controlling_process(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) if (enif_get_resource(env, argv[0], ctx_stream_t, (void **)&s_ctx)) { - if (!get_stream_handle(s_ctx)) + if (!LOCAL_REFCNT(get_stream_handle(s_ctx))) { return ERROR_TUPLE_2(ATOM_CLOSED); } @@ -1546,7 +1550,7 @@ controlling_process(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) enif_mutex_lock(s_ctx->lock); res = stream_controlling_process(env, s_ctx, &caller, &new_owner); enif_mutex_unlock(s_ctx->lock); - put_stream_handle(s_ctx); + LOCAL_REFCNT(put_stream_handle(s_ctx)); } else if (enif_get_resource(env, argv[0], ctx_connection_t, (void **)&c_ctx)) { @@ -1559,6 +1563,7 @@ controlling_process(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) enif_mutex_unlock(c_ctx->lock); put_conn_handle(c_ctx); } + // @TODO: add listener controlling process else { return ERROR_TUPLE_2(ATOM_BADARG); @@ -1757,7 +1762,10 @@ static ErlNifFunc nif_funcs[] = { { "peercert", 1, peercert1, 0}, { "enable_sig_buffer", 1, enable_sig_buffer, 0}, { "flush_stream_buffered_sigs", 1, flush_stream_buffered_sigs, 0}, + { "count_reg_conns", 0, count_reg_connsX, 0}, + { "count_reg_conns", 1, count_reg_connsX, 0}, /* for DEBUG */ + { "get_registration_refcnt", 1, get_registration_refcnt, 0}, { "get_conn_rid", 1, get_conn_rid1, 1}, { "get_stream_rid", 1, get_stream_rid1, 1}, { "get_listeners", 0, get_listenersX, 0}, diff --git a/c_src/quicer_reg.c b/c_src/quicer_reg.c index c33ee2a4..654d9234 100644 --- a/c_src/quicer_reg.c +++ b/c_src/quicer_reg.c @@ -19,8 +19,9 @@ limitations under the License. static BOOLEAN parse_reg_conf(ERL_NIF_TERM eprofile, QUIC_REGISTRATION_CONFIG *RegConfig); -QuicerRegistrationCTX *G_r_ctx = NULL; +QuicerRegistrationCTX G_r_ctx = { .name = "global", .is_closed = TRUE }; pthread_mutex_t GRegLock = PTHREAD_MUTEX_INITIALIZER; +extern pthread_mutex_t MsQuicLock; /* ** Open global registration. @@ -34,9 +35,9 @@ registration(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) QUIC_STATUS status; ERL_NIF_TERM res = ATOM_OK; - if (!MsQuic || G_r_ctx) + if (!MsQuic) { - return ERROR_TUPLE_2(ATOM_BADARG); + return ERROR_TUPLE_2(ATOM_ERROR_INVALID_STATE); } pthread_mutex_lock(&GRegLock); @@ -51,30 +52,32 @@ registration(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) } } - QuicerRegistrationCTX *r_ctx = init_r_ctx(); - if (!r_ctx) + // This verifies the context is indeed released. + if (!get_reg_handle(&G_r_ctx)) { - pthread_mutex_unlock(&GRegLock); - return ERROR_TUPLE_2(ATOM_ERROR_NOT_ENOUGH_MEMORY); + // reg is closed + CXPLAT_DBG_ASSERT(G_r_ctx.is_closed); + CXPLAT_DBG_ASSERT(G_r_ctx.ref_count == 0); + init_r_ctx(&G_r_ctx); + QuicerRegistrationCTX *r_ctx = &G_r_ctx; + if (QUIC_FAILED(status = MsQuic->RegistrationOpen(&RegConfig, + &r_ctx->Registration))) + { + res = ERROR_TUPLE_2(ATOM_STATUS(status)); + goto exit; + } + r_ctx->is_closed = FALSE; + // Now it is safe for others to use + CxPlatRefInitialize(&r_ctx->ref_count); } - - if (QUIC_FAILED( - status = MsQuic->RegistrationOpen(&RegConfig, &r_ctx->Registration))) + else { - enif_release_resource(r_ctx); - res = ERROR_TUPLE_2(ATOM_STATUS(status)); - goto exit; + // already opened, deref now + put_reg_handle(&G_r_ctx); } - CxPlatRefInitialize(&r_ctx->ref_count); - G_r_ctx = r_ctx; pthread_mutex_unlock(&GRegLock); - - // nif owns the global registration - // thus not return to the erlang side return ATOM_OK; - exit: - destroy_r_ctx(r_ctx); pthread_mutex_unlock(&GRegLock); return res; } @@ -83,31 +86,43 @@ registration(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) ** For global registration only */ ERL_NIF_TERM -deregistration(__unused_parm__ ErlNifEnv *env, +deregistration(ErlNifEnv *env, __unused_parm__ int argc, __unused_parm__ const ERL_NIF_TERM argv[]) { - // @TODO error_code should be configurable - int error_code = 0; + ERL_NIF_TERM res = ATOM_OK; + pthread_mutex_lock(&MsQuicLock); if (!MsQuic) { - return ERROR_TUPLE_2(ATOM_BADARG); + res = ERROR_TUPLE_2(ATOM_BADARG); + goto exit; } - pthread_mutex_lock(&GRegLock); - if (G_r_ctx && !G_r_ctx->is_released) + CXPLAT_REF_COUNT expected = 1; + if (!__atomic_compare_exchange_n(&G_r_ctx.ref_count, + &expected, + 0, + FALSE, + __ATOMIC_SEQ_CST, + __ATOMIC_SEQ_CST)) { - MsQuic->RegistrationShutdown(G_r_ctx->Registration, FALSE, error_code); - // Do not defer the closing the global registration - // to resource dealloc callback because a common scenario is to - // close the lib after close the global registration. - MsQuic->RegistrationClose(G_r_ctx->Registration); - G_r_ctx->Registration = NULL; - destroy_r_ctx(G_r_ctx); - G_r_ctx = NULL; + // @NOTE, if already closed, should return ATOM_OK + if (expected != 0) + { + res = enif_make_int64(env, expected); + } } - pthread_mutex_unlock(&GRegLock); - return ATOM_OK; + else + { + HQUIC Registration = G_r_ctx.Registration; + G_r_ctx.is_closed = TRUE; + G_r_ctx.Registration = NULL; + MsQuic->RegistrationClose(Registration); + deinit_r_ctx(&G_r_ctx); + } +exit: + pthread_mutex_unlock(&MsQuicLock); + return res; } ERL_NIF_TERM @@ -127,7 +142,7 @@ new_registration2(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) return ERROR_TUPLE_2(ATOM_BADARG); } - QuicerRegistrationCTX *r_ctx = init_r_ctx(); + QuicerRegistrationCTX *r_ctx = init_r_ctx(NULL); if (!r_ctx) { return ERROR_TUPLE_2(ATOM_ERROR_NOT_ENOUGH_MEMORY); @@ -139,7 +154,7 @@ new_registration2(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) || strlen(r_ctx->name) == 0)) { res = ERROR_TUPLE_2(ATOM_BADARG); - goto exit; + goto err_exit; } RegConfig.AppName = r_ctx->name; @@ -147,13 +162,12 @@ new_registration2(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) status = MsQuic->RegistrationOpen(&RegConfig, &r_ctx->Registration))) { res = ERROR_TUPLE_2(ATOM_STATUS(status)); - goto exit; + goto err_exit; } - CxPlatRefInitialize(&r_ctx->ref_count); return SUCCESS(enif_make_resource(env, r_ctx)); -exit: - destroy_r_ctx(r_ctx); +err_exit: + put_reg_handle(r_ctx); return res; } @@ -164,7 +178,11 @@ shutdown_registration_x(ErlNifEnv *env, int argc, const ERL_NIF_TERM *argv) ErlNifUInt64 error_code = 0; BOOLEAN silent = FALSE; ERL_NIF_TERM ectx = argv[0]; - if (!enif_get_resource(env, ectx, ctx_reg_t, (void **)&r_ctx)) + if (IS_SAME_TERM(ectx, ATOM_GLOBAL)) + { + r_ctx = &G_r_ctx; + } + else if (!enif_get_resource(env, ectx, ctx_reg_t, (void **)&r_ctx)) { return ERROR_TUPLE_2(ATOM_BADARG); } @@ -191,13 +209,16 @@ shutdown_registration_x(ErlNifEnv *env, int argc, const ERL_NIF_TERM *argv) } } - if (r_ctx->Registration && !r_ctx->is_released) + if (get_reg_handle(r_ctx)) { // void return, trigger callback, no blocking MsQuic->RegistrationShutdown(r_ctx->Registration, silent, error_code); - destroy_r_ctx(r_ctx); + put_reg_handle(r_ctx); + } + else + { + return ERROR_TUPLE_2(ATOM_ERROR_INVALID_STATE); } - return ATOM_OK; } @@ -207,19 +228,38 @@ close_registration(ErlNifEnv *env, const ERL_NIF_TERM argv[]) { QuicerRegistrationCTX *r_ctx = NULL; - HQUIC Registration = NULL; ERL_NIF_TERM ectx = argv[0]; + ERL_NIF_TERM res = ATOM_OK; if (!enif_get_resource(env, ectx, ctx_reg_t, (void **)&r_ctx)) { return ERROR_TUPLE_2(ATOM_BADARG); } - enif_mutex_lock(r_ctx->lock); - Registration = r_ctx->Registration; - r_ctx->Registration = NULL; - enif_mutex_unlock(r_ctx->lock); - MsQuic->RegistrationClose(Registration); - destroy_r_ctx(r_ctx); - return ATOM_OK; + + CXPLAT_REF_COUNT expected = 1; + + if (!__atomic_compare_exchange_n(&r_ctx->ref_count, + &expected, + 0, + FALSE, + __ATOMIC_SEQ_CST, + __ATOMIC_SEQ_CST)) + { + // @NOTE, if already closed, should return default ATOM_OK + if (expected != 0) + { + res = enif_make_int64(env, expected); + } + } + else + { + HQUIC Registration = r_ctx->Registration; + r_ctx->Registration = NULL; + MsQuic->RegistrationClose(Registration); + // @NOTE, we don't use put_reg_handle + // because we are pretty sure that the ref_count is 0 now + enif_release_resource(r_ctx); + } + return res; } ERL_NIF_TERM @@ -240,6 +280,33 @@ get_registration_name1(ErlNifEnv *env, return SUCCESS(name); } +ERL_NIF_TERM +get_registration_refcnt(ErlNifEnv *env, + __unused_parm__ int argc, + const ERL_NIF_TERM *argv) +{ + QuicerRegistrationCTX *r_ctx = NULL; + ERL_NIF_TERM ectx = argv[0]; + CXPLAT_DBG_ASSERT(argc == 1); + + if (IS_SAME_TERM(ectx, ATOM_GLOBAL)) + { + r_ctx = &G_r_ctx; + } + else if (!enif_get_resource(env, ectx, ctx_reg_t, (void **)&r_ctx)) + { + return ERROR_TUPLE_2(ATOM_BADARG); + } + + if (!get_reg_handle(r_ctx)) + { + return ERROR_TUPLE_2(ATOM_CLOSED); + } + CXPLAT_REF_COUNT cnt = r_ctx->ref_count; + put_reg_handle(r_ctx); + return enif_make_int64(env, cnt - 1); +} + BOOLEAN parse_reg_conf(ERL_NIF_TERM eprofile, QUIC_REGISTRATION_CONFIG *RegConfig) { diff --git a/c_src/quicer_reg.h b/c_src/quicer_reg.h index dc6e0b56..677f8433 100644 --- a/c_src/quicer_reg.h +++ b/c_src/quicer_reg.h @@ -35,4 +35,6 @@ close_registration(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); ERL_NIF_TERM get_registration_name1(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); +ERL_NIF_TERM +get_registration_refcnt(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); #endif // QUICER_REG_H_ diff --git a/c_src/quicer_stream.c b/c_src/quicer_stream.c index 9b675c8f..619b9a4b 100644 --- a/c_src/quicer_stream.c +++ b/c_src/quicer_stream.c @@ -140,9 +140,8 @@ ServerStreamCallback(HQUIC Stream, void *Context, QUIC_STREAM_EVENT *Event) if (is_destroy) { - put_stream_handle(s_ctx); // must be called after mutex unlock - destroy_s_ctx(s_ctx); + CALLBACK_DESTRUCT_REFCNT(put_stream_handle(s_ctx)); } return status; } @@ -242,16 +241,13 @@ _IRQL_requires_max_(DISPATCH_LEVEL) if (is_destroy) { // must be called after mutex unlock, - put_stream_handle(s_ctx); - destroy_s_ctx(s_ctx); + CALLBACK_DESTRUCT_REFCNT(put_stream_handle(s_ctx)); } return status; } ERL_NIF_TERM -async_start_stream2(ErlNifEnv *env, - __unused_parm__ int argc, - const ERL_NIF_TERM argv[]) +async_start_stream2(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { QUIC_STATUS Status = QUIC_STATUS_SUCCESS; QuicerConnCTX *c_ctx = NULL; @@ -262,6 +258,8 @@ async_start_stream2(ErlNifEnv *env, ERL_NIF_TERM eopen_flag; unsigned int open_flag = QUIC_STREAM_OPEN_FLAG_NONE; // default + CXPLAT_FRE_ASSERT(2 == argc); + ERL_NIF_TERM eoptions = argv[1]; if (!enif_get_resource(env, argv[0], ctx_connection_t, (void **)&c_ctx)) @@ -300,7 +298,6 @@ async_start_stream2(ErlNifEnv *env, if (!get_conn_handle(c_ctx)) { - //@TODO maybe other error like conn_closed? return ERROR_TUPLE_2(ATOM_CLOSED); } @@ -314,7 +311,6 @@ async_start_stream2(ErlNifEnv *env, // This is optional get_uint32_from_map(env, eoptions, ATOM_QUIC_EVENT_MASK, &s_ctx->event_mask); - enif_keep_resource(c_ctx); s_ctx->c_ctx = c_ctx; // Caller should be the owner of this stream. s_ctx->owner = AcceptorAlloc(); @@ -342,7 +338,6 @@ async_start_stream2(ErlNifEnv *env, ClientStreamCallback, s_ctx, &(s_ctx->Stream)); - CxPlatRefInitialize(&s_ctx->ref_count); if (QUIC_FAILED(Status)) { @@ -360,7 +355,7 @@ async_start_stream2(ErlNifEnv *env, // // We need to take a refcnt to avoid handle get closed as the StreamStart // may trigger callback in another thread. - if (!get_stream_handle(s_ctx)) + if (!LOCAL_REFCNT(get_stream_handle(s_ctx))) { res = ERROR_TUPLE_2(ATOM_CLOSED); goto ErrorExit; @@ -368,13 +363,12 @@ async_start_stream2(ErlNifEnv *env, HQUIC Stream = s_ctx->Stream; Status = MsQuic->StreamStart(Stream, start_flag); cache_stream_id(s_ctx); - put_stream_handle(s_ctx); + LOCAL_REFCNT(put_stream_handle(s_ctx)); if (QUIC_FAILED(Status)) { - enif_mutex_lock(s_ctx->lock); - s_ctx->is_closed = TRUE; - enif_mutex_unlock(s_ctx->lock); + // revert the enif_make_resource... + enif_release_resource(s_ctx); res = ERROR_TUPLE_3(ATOM_STREAM_START_ERROR, ATOM_STATUS(Status)); goto ErrorExit; } @@ -388,19 +382,19 @@ async_start_stream2(ErlNifEnv *env, return SUCCESS(res); ErrorExit: - destroy_s_ctx(s_ctx); - put_conn_handle(c_ctx); + s_ctx->is_closed = TRUE; + // destruct as no return to the NIF caller + DESTRUCT_REFCNT(put_stream_handle(s_ctx)); return res; } // accept streams on top of connection. ERL_NIF_TERM -async_accept_stream2(ErlNifEnv *env, - __unused_parm__ int argc, - const ERL_NIF_TERM argv[]) +async_accept_stream2(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { QuicerConnCTX *c_ctx; ERL_NIF_TERM active_val; + CXPLAT_FRE_ASSERT(2 == argc); if (!enif_get_resource(env, argv[0], ctx_connection_t, (void **)&c_ctx)) { @@ -455,11 +449,7 @@ csend4(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) unsigned int open_flag = QUIC_STREAM_OPEN_FLAG_NONE; // default uint32_t sendflags = 0; - if (4 != argc) - { - return ERROR_TUPLE_2(ATOM_BADARG); - } - + CXPLAT_FRE_ASSERT(4 == argc); ERL_NIF_TERM eHandle = argv[0]; ERL_NIF_TERM ebin = argv[1]; ERL_NIF_TERM eoptions = argv[2]; @@ -502,15 +492,13 @@ csend4(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) return ERROR_TUPLE_2(ATOM_CLOSED); } - // Allocate ctxs + // Allocate s_ctx QuicerStreamCTX *s_ctx = init_s_ctx(); if (!s_ctx) { return ERROR_TUPLE_2(ATOM_ERROR_NOT_ENOUGH_MEMORY); } - // release in resource_stream_dealloc_callback - enif_keep_resource(c_ctx); s_ctx->c_ctx = c_ctx; QuicerStreamSendCTX *send_ctx = init_send_ctx(); @@ -583,7 +571,6 @@ csend4(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) s_ctx->Stream = NULL; goto ErrorExit; } - CxPlatRefInitialize(&(s_ctx->ref_count)); // Now we have Stream handle s_ctx->eHandle = enif_make_resource(s_ctx->imm_env, s_ctx); @@ -616,7 +603,6 @@ csend4(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) send_ctx))) { enif_mutex_unlock(s_ctx->lock); - put_stream_handle(s_ctx); res = ERROR_TUPLE_3(ATOM_STREAM_SEND_ERROR, ATOM_STATUS(Status)); goto ErrorExit; } @@ -633,11 +619,8 @@ csend4(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) ErrorExit: destroy_send_ctx(send_ctx); - destroy_s_ctx(s_ctx); - put_conn_handle(c_ctx); - // Do not close the stream here, it will be done - // in resource_stream_dealloc_callback triggered by - // destroy_s_ctx + s_ctx->is_closed = TRUE; + DESTRUCT_REFCNT(put_stream_handle(s_ctx)); return res; } @@ -651,17 +634,14 @@ send3(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) ERL_NIF_TERM res = ATOM_OK; uint32_t sendflags = 0; - if (3 != argc) - { - return ERROR_TUPLE_2(ATOM_BADARG); - } + CXPLAT_FRE_ASSERT(3 == argc); if (!enif_get_resource(env, estream, ctx_stream_t, (void **)&s_ctx)) { return ERROR_TUPLE_2(ATOM_BADARG); } - if (!get_stream_handle(s_ctx)) + if (!LOCAL_REFCNT(get_stream_handle(s_ctx))) { return ERROR_TUPLE_2(ATOM_CLOSED); } @@ -734,14 +714,14 @@ send3(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) ErrorExit: destroy_send_ctx(send_ctx); Exit: - put_stream_handle(s_ctx); + LOCAL_REFCNT(put_stream_handle(s_ctx)); return res; } ERL_NIF_TERM -recv2(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[]) +recv2(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { - + CXPLAT_FRE_ASSERT(argc == 2); QuicerStreamCTX *s_ctx; ErlNifBinary bin; ERL_NIF_TERM estream = argv[0]; @@ -834,14 +814,15 @@ recv2(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[]) } ERL_NIF_TERM -shutdown_stream3(ErlNifEnv *env, - __unused_parm__ int argc, - const ERL_NIF_TERM argv[]) +shutdown_stream3(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { QUIC_STATUS Status; ERL_NIF_TERM ret = ATOM_OK; QuicerStreamCTX *s_ctx; uint32_t app_errcode = 0, flags = 0; + + CXPLAT_FRE_ASSERT(3 == argc); + if (!enif_get_resource(env, argv[0], ctx_stream_t, (void **)&s_ctx)) { return ERROR_TUPLE_2(ATOM_BADARG); @@ -858,7 +839,7 @@ shutdown_stream3(ErlNifEnv *env, ret = ERROR_TUPLE_2(ATOM_BADARG); } - if (!get_stream_handle(s_ctx)) + if (!LOCAL_REFCNT(get_stream_handle(s_ctx))) { return ERROR_TUPLE_2(ATOM_CLOSED); } @@ -867,7 +848,7 @@ shutdown_stream3(ErlNifEnv *env, { ret = ERROR_TUPLE_2(ATOM_STATUS(Status)); } - put_stream_handle(s_ctx); + LOCAL_REFCNT(put_stream_handle(s_ctx)); return ret; } diff --git a/docs/Terminology.md b/docs/Terminology.md index 437f5840..1d311378 100644 --- a/docs/Terminology.md +++ b/docs/Terminology.md @@ -10,9 +10,10 @@ | | 'connection owner' receive events of a connection | | | 'stream owner' receive application data and events from a stream | | | 'listener owner' receive events from listener | -| | When owner is dead, related resources would be released | +| | When owner is dead, related resources will be released | | l_ctx | listener nif context | | c_ctx | connection nif context | | s_ctx | stream nif context | - +| r_ctx | registration nif context | +| config_ctx | configuration nif context | diff --git a/docs/internals.org b/docs/internals.org index 134604a0..f6821770 100644 --- a/docs/internals.org +++ b/docs/internals.org @@ -20,9 +20,10 @@ In erlang code, Handle (a handle term) is a REF to the contexts and they are `*_ctx` is the NIF resource object in C code, and it has 1. A MsQuic handle (HQUIC) for interworking with MsQuic. -2. A 'is_closed' boolean flag mark if the HQUIC is valid or not. +2. A 'is_closed' boolean flag mark if Erlang still owns the handle or not. If it is false, the HQUIC may become (or is becoming) invalid and it is unsafe to be used for calling MsQuic APIs. -3. ... Other members(fields) are irrelevant here. +3. handle has shared ownership and protected by `ref_count` +4. when handle has no more owner, it will be released. ** Creation @@ -46,119 +47,56 @@ In init_s_ctx 1. server: ctx is created in conn callback 2. client: ctx is created in quicer_nif:async_start_stream2/2 -** Destroy - -In erlang vm, if there is a var holding the handle, it will not be garbage collected. - -Same as in C NIF code, if the handle is one of the input arguments, it is safe to access the *_ctx because the var in erlang is still holding the handle but we cannot assume the API caller will hold the handle for the entire lifecycle. ** Thread Safety with ref counting. In C code, the resource object is not deallocated (become *invalid**) until the last handle term is garbage collected by the VM and the resource is released with enif_release_resource (not necessarily in that order). -note, The refcounting is very expensive call. -Access a *invalid* resource object via the pointer could cause SEGFAULT. - -In term of thread safety, There are three players in quicer that could access the resource object simultaneously. +In terms of thread safety, There are three players in quicer that could access the handle resource object simultaneously. a. The MsQuic worker thread is accessing *_ctx in the callback function -b. VM scheduler is executing a NIF function such as quicer:send/2 -c. VM scheduler thread is doing GC of the handle object - -After the context object is created, the refcnt is 1 - -1. VM ensures the c) happens last when refcnt is 0. - -2. In the nif function that needs to access the resource object: - - +call enif_keep_resource before the access+ - - +call enif_release_resource after the access+ - - At the time of accessing resource object, the Var is still refereced and it will not be garbage collected. - - It is unnecessary to keep/release the resources. - - Note @TODO, unsure this is still true for the NIF that runs on dirty scheduler. - -3. In the nif function that calls the MsQuic API which can cause later accessing resource object in the callback context in the msquic worker thread. - - It is unnecessary to keep/release the resources. - -4. In MsQuic callback function when handling the event of 'SHUTDOWN COMPLETE' - - 'SHUTDOWN COMPLETE' means resource is no longer active and there will be no more callbacks - (Connection->ClientCallbackHandle = NULL) - - Callback should decrease the refcnt. - - In callback, - Call enif_release_resource - Set msquic Handle to NULL within locking context. - Msquic API could deal with NULL handle. - -5. In MsQuic callback function and NIF function - - When the high-level resource object is created on top of a lower level. - - For example: when a stream is created in a connection, we need to call enif_keep_resource for connection resource object bump the refcnt. - -6. Deinit the resource object in resource destruct function ~resource_*_dealloc_callback~ - - Call enif_release_resource for corresponding lower-level resource object. - - For instance, when deinit the s_ctx, it should deref the c_ctx. - - Everything contained in the resource object becomes invalid and it will be impossible to access the destructed - resource obj from a) and b). But for accessing from c) it is out of our control. - - Close the msquic handle. - -7. When close the connection via quicer_nif:close_connection3 - - Set Connection Handle to NULL within locked context - -8. MsQuic callback signaling - - NO-OP - -9. @TODO: Catch runtime error signals from msquic - To not 'crash' the erlang VM. we need some signal handling to handle the runtime error signals from msquic such as runtime assertions and raise an alarm for 'need maintaince'. - - If the resource that links to the error is identical, we could try to mark it blacklisted and deny further access. - -*** Connection Client +b. VM scheduler is executing a NIF function such as `quicer:send/2` +c. VM scheduler thread is doing GC, releasing the resource object. -The connection ctx resource is *created* in NIF quicer:async_connect. +Handle resource object is allocated with two parts of resources. +1. beam VM resources + - enif_env + - enif_mutex + - cached eterms -enif_release_resource is called in 'destroy_c_ctx'. +2. MsQuic resources + - registration handle + - listener handle + - connection handle + - stream handle -*** Connection Server +*** Ref counting MsQuic resources -The connection ctx resource is *created* in 'ServerListenerCallback'. +The `ref_count` is for MsQuic resource handle for shared ownership. -enif_release_resource is called in 'destroy_c_ctx'. +One example of shared ownership is that in multistreams scenario, each `s_ctx` (for Stream) shares +the ownership of `c_ctx` (Connection) with other `s_ctx` and `c_ctx` itself. -*** Stream Client +when ref_count is 0, the c_ctx will be released. -For the stream init from client -The stream ctx resource is *created* in 'async_start_stream2'. -Client call enif_keep_resource on the connection ctx which it belongs to. +functions: (`xx` is the handle name, e.g. put_stream_handle) +- put_xx_handle +- get_xx_handle -For the stream init from server -The stream ctx resource is *created* in 'ClientConnectionCallback'. -Client call enif_keep_resource on the connection ctx which it belongs to. +*** Ref counting VM resources -enif_release_resource is called in 'destroy_s_ctx'. -enif_release_resource of connection ctx is called in 'destroy_c_ctx'. +VM resources are wrapped in handle context (e.g. `s_ctx`). -*** Stream Server -The stream ctx resource is *created* in 'ServerConnectionCallback'. -Then call enif_keep_resource on the connection ctx which it belongs to. +owners: +- Erlang part +- NIF Part -enif_release_resource is called in 'destroy_s_ctx'. -enif_release_resource of connection ctx is called in 'destroy_c_ctx'. +functions: +- enif_make_resource +- enif_keep_resource +- enif_release_resource +@NOTE: Logically, handle context must not be released before `ref_count` become 0. * MsQuic API Usages @@ -296,7 +234,7 @@ SYNC call in non-callback-context | QUIC_LISTENER_EVENT_NEW_CONNECTION | {quic, new_conn , connection_handle()} | | QUIC_LISTENER_EVENT_STOP_COMPLETE | {quic, listener_stopped , listener_handle()} | -** Connecion Events +** Connection Events *QUIC_CONNECTION_EVENT* in msquic.h diff --git a/docs/messages_to_owner.md b/docs/messages_to_owner.md index 87be3987..d351732f 100644 --- a/docs/messages_to_owner.md +++ b/docs/messages_to_owner.md @@ -365,7 +365,7 @@ The stream acceptor will no longer get new incoming stream. ### New incoming connection ``` erlang -{quic, new_conn, connection_handle(), ConnecionInfo :: #{ version := integer() +{quic, new_conn, connection_handle(), ConnectionInfo :: #{ version := integer() , local_addr := string() , remote_addr := string() , server_name := binary() diff --git a/src/quicer.erl b/src/quicer.erl index 18bc1381..95e35fa3 100644 --- a/src/quicer.erl +++ b/src/quicer.erl @@ -28,6 +28,7 @@ shutdown_registration/1, shutdown_registration/3, get_registration_name/1, + get_registration_refcnt/1, reg_open/0, reg_open/1, reg_close/0 @@ -94,7 +95,8 @@ wait_for_handoff/2, handoff_stream/2, handoff_stream/3, - perf_counters/0 + perf_counters/0, + count_reg_conns/1 ]). %% helpers @@ -227,10 +229,16 @@ shutdown_registration(Handle, IsSilent, ErrCode) -> quicer_nif:shutdown_registration(Handle, IsSilent, ErrCode). %% @doc close a registration. --spec close_registration(reg_handle()) -> - quicer_nif:close_registration(). +-spec close_registration(reg_handle()) -> ok. close_registration(Handle) -> - quicer_nif:close_registration(Handle). + case quicer_nif:close_registration(Handle) of + ok -> + ok; + N -> + logger:info("pending close_registration refcnt: ~p~n", [N]), + timer:sleep(100), + close_registration(Handle) + end. %% @doc get registration name -spec get_registration_name(reg_handle()) -> @@ -238,6 +246,12 @@ close_registration(Handle) -> get_registration_name(Handle) -> quicer_nif:get_registration_name(Handle). +%% @doc get registration reference count +-spec get_registration_refcnt(global | reg_handle()) -> + quicer_nif:get_registration_refcnt(). +get_registration_refcnt(Handle) -> + quicer_nif:get_registration_refcnt(Handle). + %% @doc GRegistraion should be opened before calling traffic APIs. %% %% This is called automatically when quicer application starts with @@ -342,6 +356,8 @@ close_listener(Listener) -> ok | {error, badarg | closed | timeout}. close_listener(Listener, Timeout) -> case quicer_nif:close_listener(Listener) of + closed -> + ok; ok when Timeout == 0 -> ok; ok -> @@ -1079,6 +1095,12 @@ get_connections(global) -> get_connections(Reg) -> quicer_nif:get_connections(Reg). +-spec count_reg_conns(reg_handle() | global) -> non_neg_integer(). +count_reg_conns(global) -> + quicer_nif:count_reg_conns(); +count_reg_conns(Reg) -> + quicer_nif:count_reg_conns(Reg). + -spec get_conn_owner(connection_handle()) -> quicer_nif:get_owner(). get_conn_owner(Conn) -> quicer_nif:get_conn_owner(Conn). diff --git a/src/quicer_connection.erl b/src/quicer_connection.erl index bb354717..ff144f02 100644 --- a/src/quicer_connection.erl +++ b/src/quicer_connection.erl @@ -155,6 +155,7 @@ %% for server start_link/4, get_cb_state/1, + merge_cb_state/2, stream_send/6, get_handle/1 ]). @@ -219,6 +220,12 @@ start_link(CallbackModule, Listener, Opts, Sup) -> get_cb_state(ConnPid) -> gen_server:call(ConnPid, get_cb_state, infinity). +-spec merge_cb_state(ConnPid :: pid(), New :: map()) -> cb_state() | {error, any()}. +merge_cb_state(ConnPid, New) when is_map(New) -> + gen_server:call(ConnPid, {merge_cb_state, New}, infinity); +merge_cb_state(_ConnPid, _New) -> + {error, unsupported_type}. + -spec stream_send( ConnPid :: pid(), Callback :: atom(), @@ -325,6 +332,12 @@ init([CallbackModule, Listener, {_LOpts, COpts, SOpts}, Sup]) when CallbackModul | {stop, Reason :: term(), NewState :: term()}. handle_call(get_cb_state, _From, #{callback_state := CbState} = State) -> {reply, CbState, State}; +handle_call({merge_cb_state, New}, _From, #{callback_state := CbState} = State) when + is_map(New) +-> + NewCBState = maps:merge(CbState, New), + NewState = State#{callback_state := NewCBState}, + {reply, NewCBState, NewState}; handle_call(get_handle, _From, #{conn := Connection} = State) -> {reply, Connection, State}; handle_call( diff --git a/src/quicer_listener.erl b/src/quicer_listener.erl index 054e605a..55dcc828 100644 --- a/src/quicer_listener.erl +++ b/src/quicer_listener.erl @@ -220,6 +220,8 @@ handle_cast(_Request, State) -> | {noreply, NewState :: term(), hibernate} | {stop, Reason :: normal | term(), NewState :: term()}. handle_info({quic, listener_stopped, L}, #state{listener = L} = State) -> + %% uncontroled stop: + _ = quicer:close_listener(L), {stop, normal, State}; handle_info(_Info, State) -> {noreply, State}. diff --git a/src/quicer_listener_sup.erl b/src/quicer_listener_sup.erl index 98440099..ea986674 100644 --- a/src/quicer_listener_sup.erl +++ b/src/quicer_listener_sup.erl @@ -50,7 +50,7 @@ start_link() -> supervisor:start_link({local, ?SERVER}, ?MODULE, []). start_listener(AppName, Port, Options) -> - supervisor:start_child(?MODULE, chid_spec(AppName, Port, Options)). + supervisor:start_child(?MODULE, child_spec(AppName, Port, Options)). stop_listener(AppName) -> _ = supervisor:terminate_child(?MODULE, ?CHILD_ID(AppName)), @@ -119,7 +119,7 @@ init([]) -> %%%=================================================================== %%% Internal functions %%%=================================================================== -chid_spec(AppName, ListenOn, Options) -> +child_spec(AppName, ListenOn, Options) -> #{ id => ?CHILD_ID(AppName), start => {quicer_listener, start_link, [AppName, ListenOn, Options]}, diff --git a/src/quicer_nif.erl b/src/quicer_nif.erl index 3d260a0a..37bf8ad5 100644 --- a/src/quicer_nif.erl +++ b/src/quicer_nif.erl @@ -25,6 +25,7 @@ shutdown_registration/3, close_registration/1, get_registration_name/1, + get_registration_refcnt/1, listen/2, start_listener/3, stop_listener/1, @@ -46,7 +47,9 @@ controlling_process/2, peercert/1, enable_sig_buffer/1, - flush_stream_buffered_sigs/1 + flush_stream_buffered_sigs/1, + count_reg_conns/0, + count_reg_conns/1 ]). -export([ @@ -88,6 +91,7 @@ shutdown_registration/0, close_registration/0, get_registration_name/0, + get_registration_refcnt/0, get_listeners/0, get_connections/0, get_owner/0, @@ -98,9 +102,10 @@ %% NIF fuction return types -type abi_version() :: integer(). -type new_registration() :: {ok, reg_handle()} | {error, atom_reason()}. --type shutdown_registration() :: ok | {error, badarg}. +-type shutdown_registration() :: ok | {error, badarg | invalid_state}. -type close_registration() :: ok | {error, badarg}. -type get_registration_name() :: {ok, string()} | {error, badarg}. +-type get_registration_refcnt() :: {error, closed} | integer(). -type get_listeners() :: [listener_handle()]. -type get_connections() :: [connection_handle()]. -type get_owner() :: {ok, pid()} | {error, undefined | badarg}. @@ -184,11 +189,11 @@ open_lib(_LttngLib) -> close_lib() -> erlang:nif_error(nif_library_not_loaded). --spec reg_open() -> ok | {error, badarg}. +-spec reg_open() -> ok | {error, badarg | invalid_state}. reg_open() -> erlang:nif_error(nif_library_not_loaded). --spec reg_open(execution_profile()) -> ok | {error, badarg}. +-spec reg_open(execution_profile()) -> ok | {error, badarg | invalid_state}. reg_open(_) -> erlang:nif_error(nif_library_not_loaded). @@ -200,11 +205,11 @@ reg_close() -> new_registration(_Name, _Profile) -> erlang:nif_error(nif_library_not_loaded). --spec shutdown_registration(reg_handle()) -> shutdown_registration(). +-spec shutdown_registration(global | reg_handle()) -> shutdown_registration(). shutdown_registration(_Handle) -> erlang:nif_error(nif_library_not_loaded). --spec shutdown_registration(reg_handle(), IsSilent :: boolean(), ErrorCode :: uint64()) -> +-spec shutdown_registration(global | reg_handle(), IsSilent :: boolean(), ErrorCode :: uint64()) -> shutdown_registration(). shutdown_registration(_Handle, _IsSilent, _ErrorCode) -> erlang:nif_error(nif_library_not_loaded). @@ -217,6 +222,10 @@ close_registration(_Handle) -> get_registration_name(_Handle) -> erlang:nif_error(nif_library_not_loaded). +-spec get_registration_refcnt(reg_handle()) -> get_registration_refcnt(). +get_registration_refcnt(_Handle) -> + erlang:nif_error(nif_library_not_loaded). + -spec listen(listen_on(), listen_opts()) -> {ok, listener_handle()} | {error, listener_open_error, atom_reason()} @@ -228,8 +237,12 @@ listen(_ListenOn, _Options) -> ok | {error, closed | badarg}. start_listener(_Listener, _ListenOn, _Opts) -> erlang:nif_error(nif_library_not_loaded). - --spec close_listener(listener_handle()) -> ok | {error, closed | badarg}. +%% @doc close the listener +%% return closed if the listener is closed. +%% return ok if the listener is stopped then closed, and caller should expect listener_stopped signal. +%% +%% @end +-spec close_listener(listener_handle()) -> ok | closed | {error, closed | badarg}. close_listener(_Listener) -> erlang:nif_error(nif_library_not_loaded). @@ -400,6 +413,13 @@ get_connections() -> get_connections(_RegHandle) -> erlang:nif_error(nif_library_not_loaded). +-spec count_reg_conns() -> non_neg_integer(). +count_reg_conns() -> + erlang:nif_error(nif_library_not_loaded). +-spec count_reg_conns(reg_handle()) -> non_neg_integer() | {error, badarg}. +count_reg_conns(_RegHandle) -> + erlang:nif_error(nif_library_not_loaded). + -spec copy_stream_handle(stream_handle()) -> {ok, stream_handle()} | {error, badarg}. copy_stream_handle(_H) -> erlang:nif_error(nif_library_not_loaded). diff --git a/src/quicer_server_conn_callback.erl b/src/quicer_server_conn_callback.erl index 8e81a66d..7b8f6743 100644 --- a/src/quicer_server_conn_callback.erl +++ b/src/quicer_server_conn_callback.erl @@ -135,7 +135,7 @@ connected( %% @TODO configurable behavior of spawing stream acceptor _ = quicer_stream:start_link(Callback, Conn, SOpts), {ok, S#{conn => Conn}}; -connected(_Connecion, _Flags, S) -> +connected(_Connection, _Flags, S) -> {ok, S}. handle_info({'EXIT', _Pid, _Reason}, State) -> diff --git a/test/quicer_SUITE.erl b/test/quicer_SUITE.erl index 0c6a4928..d7701a88 100644 --- a/test/quicer_SUITE.erl +++ b/test/quicer_SUITE.erl @@ -74,6 +74,7 @@ tc_stream_get_owner_remote/1, tc_dgram_client_send/1, + tc_dgram_client_send_iolist/1, % , tc_getopt_raw/1 tc_getopt/1, @@ -256,6 +257,7 @@ end_per_testcase(tc_open_listener_neg_1, _Config) -> quicer:open_lib(), quicer:reg_open(); end_per_testcase(tc_lib_registration_neg, _Config) -> + quicer:open_lib(), quicer:reg_open(); end_per_testcase(_TestCase, _Config) -> quicer:terminate_listener(mqtt), @@ -306,7 +308,7 @@ tc_close_lib_test(_Config) -> tc_lib_registration_neg(_Config) -> ok = quicer:close_lib(), - {error, badarg} = quicer:reg_open(), + {error, invalid_state} = quicer:reg_open(), {error, badarg} = quicer:reg_close(). tc_lib_registration(_Config) -> @@ -339,7 +341,7 @@ tc_lib_re_registration(_Config) -> ok = quicer:reg_close(), ok = quicer:reg_open() end, - {error, badarg} = quicer:reg_open(), + ok = quicer:reg_open(), ok = quicer:reg_close(), ok = quicer:reg_close(). @@ -872,6 +874,8 @@ tc_stream_controlling_process_demon(Config) -> {'DOWN', MonRef2, process, NewOwner2, normal} -> ok end, + %% Give time for resource down callback to happen + timer:sleep(100), ?assertNotMatch({ok, _}, quicer:send(Stm, <<"owner_changed">>)), SPid ! done, ensure_server_exit_normal(Ref) @@ -884,20 +888,39 @@ tc_dgram_client_send(Config) -> Owner = self(), {SPid, Ref} = spawn_monitor(fun() -> ping_pong_server_dgram(Owner, Config, Port) end), receive - listener_ready -> - Opts = default_conn_opts() ++ [{datagram_receive_enabled, 1}], - {ok, Conn} = quicer:connect("localhost", Port, Opts, 5000), - {ok, Stm} = quicer:start_stream(Conn, []), - {ok, 4} = quicer:send(Stm, <<"ping">>), - {ok, 4} = quicer:send_dgram(Conn, <<"ping">>), - flush_streams_available(Conn), - flush_datagram_state_changed(Conn), - dgram_client_recv_loop(Conn, false, false), - SPid ! done, - ok = ensure_server_exit_normal(Ref) - after 1000 -> - ct:fail("timeout here") - end. + listener_ready -> ok + after 1000 -> ct:fail("listener_ready timeout") + end, + Opts = default_conn_opts() ++ [{datagram_receive_enabled, 1}], + {ok, Conn} = quicer:connect("localhost", Port, Opts, 5000), + {ok, Stm} = quicer:start_stream(Conn, []), + {ok, 4} = quicer:send(Stm, <<"ping">>), + {ok, 4} = quicer:send_dgram(Conn, <<"ping">>), + + flush_streams_available(Conn), + flush_datagram_state_changed(Conn), + dgram_client_recv_loop(Conn, false, false), + SPid ! done, + ok = ensure_server_exit_normal(Ref). + +tc_dgram_client_send_iolist(Config) -> + Port = select_port(), + Owner = self(), + {SPid, Ref} = spawn_monitor(fun() -> ping_pong_server_dgram(Owner, Config, Port) end), + receive + listener_ready -> ok + after 1000 -> ct:fail("listener_ready timeout") + end, + Opts = default_conn_opts() ++ [{datagram_receive_enabled, 1}], + {ok, Conn} = quicer:connect("localhost", Port, Opts, 5000), + {ok, Stm} = quicer:start_stream(Conn, []), + {ok, 4} = quicer:send(Stm, <<"ping">>), + {ok, 4} = quicer:send_dgram(Conn, [<<"pi">>, "ng"]), + flush_streams_available(Conn), + flush_datagram_state_changed(Conn), + dgram_client_recv_loop(Conn, false, false), + SPid ! done, + ok = ensure_server_exit_normal(Ref). dgram_client_recv_loop(Conn, true, true) -> ok = quicer:close_connection(Conn); @@ -1252,7 +1275,7 @@ tc_getstat(Config) -> tc_getstat_closed(Config) -> Port = select_port(), Owner = self(), - {SPid, _Ref} = spawn_monitor(fun() -> echo_server(Owner, Config, Port) end), + {SPid, Ref} = spawn_monitor(fun() -> echo_server(Owner, Config, Port) end), receive listener_ready -> {ok, Conn} = quicer:connect("localhost", Port, default_conn_opts(), 5000), @@ -1275,7 +1298,8 @@ tc_getstat_closed(Config) -> ok end, %ok = quicer:close_connection(Conn), - SPid ! done + SPid ! done, + ensure_server_exit_normal(Ref) after 5000 -> ct:fail("listener_timeout") end. @@ -1283,7 +1307,7 @@ tc_getstat_closed(Config) -> tc_peername_v6(Config) -> Port = select_port(), Owner = self(), - {SPid, _Ref} = spawn_monitor(fun() -> echo_server(Owner, Config, Port) end), + {SPid, Ref} = spawn_monitor(fun() -> echo_server(Owner, Config, Port) end), receive listener_ready -> {ok, Conn} = quicer:connect("::1", Port, default_conn_opts(), 5000), @@ -1296,7 +1320,8 @@ tc_peername_v6(Config) -> ct:pal("addr is ~p", [Addr]), "::1" = inet:ntoa(Addr), ok = quicer:close_connection(Conn), - SPid ! done + SPid ! done, + ensure_server_exit_normal(Ref) after 5000 -> ct:fail("listener_timeout") end. @@ -1605,9 +1630,11 @@ tc_setopt_conn_remote_addr(_Config) -> {ok, _} -> ok; {error, transport_down, #{error := 298, status := bad_certificate}} -> - %% Mac @TODO don't know why it failed + %% Mac + {unix, darwin} = os:type(), ok - end. + end, + quicer:shutdown_connection(Conn). tc_setopt_global_retry_mem_percent(_Config) -> ?assertEqual(ok, quicer:setopt(quic_global, retry_memory_percent, 30, false)). @@ -2032,7 +2059,7 @@ tc_stream_open_flag_unidirectional(Config) -> {quic, <<"ping1">>, Stm, _} -> ct:fail("unidirectional stream should not receive any"); {quic, stream_closed, Stm, #{is_conn_shutdown := _, is_app_closing := false}} -> - ct:pal("stream is closed due to connecion idle") + ct:pal("stream is closed due to connection idle") end, ?assert(is_integer(Rid)), ?assert(Rid =/= 0), @@ -2108,7 +2135,7 @@ tc_stream_start_flag_fail_blocked(Config) -> end, receive {quic, closed, Conn, _Flags} -> - ct:pal("Connecion is closed ~p", [Conn]) + ct:pal("Connection is closed ~p", [Conn]) end, quicer:terminate_listener(mqtt), ?assert(is_integer(Rid)). @@ -3520,13 +3547,14 @@ simple_stream_server(Owner, Config, Port) -> done -> ok end, + ct:pal("Close Listener: ~p", [L]), simple_stream_server_exit(L). simple_stream_server_exit(L) -> - quicer:close_listener(L). + ok = quicer:close_listener(L, 5000). ensure_server_exit_normal(MonRef) -> - ensure_server_exit_normal(MonRef, 5000). + ensure_server_exit_normal(MonRef, 10000). ensure_server_exit_normal(MonRef, Timeout) -> receive {'DOWN', MonRef, process, _, normal} -> diff --git a/test/quicer_connection_SUITE.erl b/test/quicer_connection_SUITE.erl index 22f7cf1b..edb74f39 100644 --- a/test/quicer_connection_SUITE.erl +++ b/test/quicer_connection_SUITE.erl @@ -88,7 +88,7 @@ init_per_group(suite_reg, Config) -> %% @end %%-------------------------------------------------------------------- end_per_group(suite_reg, Config) -> - Reg = proplists:get_value(quic_registration, Config), + Reg = ?config(quic_registration, Config), quicer:shutdown_registration(Reg), ok = quicer:close_registration(Reg); end_per_group(_GroupName, _Config) -> @@ -888,6 +888,7 @@ tc_get_conn_owner_server(Config) -> end), receive {quic, new_conn, SConn, _} -> + {error, badarg} = quicer:get_conn_owner(undefined), {ok, Pid} = quicer:get_conn_owner(SConn), ?assertEqual(self(), Pid), quicer:close_connection(SConn), @@ -917,6 +918,79 @@ tc_shutdown_conn_before_handshake(Config) -> end, quicer:close_listener(L). +tc_conn_count_default(_Config) -> + ?assertEqual(0, quicer:count_reg_conns(global)). + +tc_conn_count_for_registered_listeners(Config) -> + Port = select_port(), + %% GIVEN: registrations are created for server listener and clients + {ok, RegL} = quicer:new_registration( + "listener", quic_execution_profile_max_throughput + ), + {ok, RegC} = quicer:new_registration( + "clients", quic_execution_profile_max_throughput + ), + %% WHEN: there is no connections from clients. + {ok, L} = quicer:listen(Port, default_listen_opts(Config ++ [{quic_registration, RegL}])), + {ok, {_, _Port}} = quicer:sockname(L), + %% THEN: The Listener registration has 0 connection + ?assertEqual(0, quicer:count_reg_conns(RegL)), + %% WHEN: N clients are connected + NClients = 10, + CPids = lists:map( + fun(_) -> + {ok, L} = quicer:async_accept(L, #{}), + spawn(fun() -> + {ok, CConn} = quicer:connect( + "localhost", + Port, + [{quic_registration, RegC} | default_conn_opts()], + 1000 + ), + receive + done -> + quicer:close_connection(CConn) + end + end) + end, + lists:seq(1, NClients) + ), + lists:foreach( + fun(_) -> + receive + {quic, new_conn, Conn, _} -> + quicer:handshake(Conn) + after 1000 -> + ct:fail("conn from client timeout") + end + end, + lists:seq(1, NClients) + ), + %% THEN: The Listener registration has N connections + ?assertEqual(NClients, quicer:count_reg_conns(RegL)), + %% THEN: The Client registration has N connections + ?assertEqual(NClients, quicer:count_reg_conns(RegC)), + %% THEN: The default global registration has 0 connections + ?assertEqual(0, quicer:count_reg_conns(global)), + + lists:foreach(fun(Pid) -> Pid ! done end, CPids), + ok = quicer:close_listener(L), + ok = quicer:close_registration(RegL), + ok = quicer:close_registration(RegC), + ok. + +tc_invalid_conn_reg(_Config) -> + Opts = default_conn_opts() ++ [{quic_registration, erlang:make_ref()}], + ?assertEqual({error, quic_registration}, quicer:connect("localhost", 443, Opts, 5000)). + +tc_closed_conn_reg(_Config) -> + {ok, ThisReg} = quicer:new_registration( + atom_to_list(?FUNCTION_NAME), quic_execution_profile_max_throughput + ), + ok = quicer:close_registration(ThisReg), + Opts = default_conn_opts() ++ [{quic_registration, ThisReg}], + ?assertEqual({error, quic_registration}, quicer:connect("localhost", 443, Opts, 5000)). + %%% %%% Helpers %%% diff --git a/test/quicer_listener_SUITE.erl b/test/quicer_listener_SUITE.erl index b1a86f79..e19eee88 100644 --- a/test/quicer_listener_SUITE.erl +++ b/test/quicer_listener_SUITE.erl @@ -102,12 +102,14 @@ end_per_group(_GroupName, _Config) -> init_per_testcase(tc_listener_conf_reload_listen_on_neg, Config) -> case os:type() of {unix, darwin} -> {skip, "Not runnable on MacOS"}; - _ -> Config + _ -> init_per_testcase(default, Config) end; -init_per_testcase(_TestCase, Config) -> +init_per_testcase(default, Config) -> application:ensure_all_started(quicer), quicer_test_lib:cleanup_msquic(), - Config. + Config; +init_per_testcase(_TestCase, Config) -> + init_per_testcase(default, Config). %%-------------------------------------------------------------------- %% @spec end_per_testcase(TestCase, Config0) -> @@ -119,7 +121,13 @@ init_per_testcase(_TestCase, Config) -> %%-------------------------------------------------------------------- end_per_testcase(_TestCase, Config) -> RegH = proplists:get_value(quic_registration, Config, global), - [quicer:close_listener(L, 1000) || L <- quicer:get_listeners(RegH)], + lists:foreach( + fun(L) -> + ct:pal("end_per_testcase: closing listener ~p", [L]), + quicer:close_listener(L, 1000) + end, + quicer:get_listeners(RegH) + ), quicer_test_lib:report_active_connections(), ok. @@ -242,7 +250,7 @@ tc_open_listener(Config) -> tc_open_listener_with_inval_reg(Config) -> Port = select_port(), Config1 = proplists:delete(quic_registration, Config), - %% Given invalid registration + %% Given invalid registration Reg = erlang:make_ref(), %% When try to listen with the invalid registration Res = quicer:listen(Port, default_listen_opts([{quic_registration, Reg} | Config1])), @@ -265,6 +273,7 @@ tc_open_listener_with_new_reg(Config) -> {ok, P} = snabbkaffe:retry(100, 10, fun() -> {ok, _} = gen_udp:open(Port) end), ok = gen_udp:close(P), ok = quicer:shutdown_registration(Reg), + ok = quicer:close_registration(Reg), ok. tc_open_listener_with_cert_password(Config) -> @@ -497,7 +506,7 @@ tc_listener_conf_reload(Config) -> ], Options = {ListenerOpts, ConnectionOpts, StreamOpts}, - %% Given a QUIC connection between example client and example server + %% GIVEN: a QUIC connection between example client and example server {ok, QuicApp} = quicer:spawn_listener(sample, Port, Options), ClientConnOpts = default_conn_opts_verify(Config, ca), {ok, ClientConnPid} = example_client_connection:start_link( @@ -591,7 +600,7 @@ tc_listener_conf_reload_listen_on(Config) -> ], Options = {ListenerOpts, ConnectionOpts, StreamOpts}, - %% Given a QUIC connection between example client and example server + %% GIVEN: a QUIC connection between example client and example server {ok, QuicApp} = quicer:spawn_listener(sample, Port, Options), ClientConnOpts = default_conn_opts_verify(Config, ca), {ok, ClientConnPid} = example_client_connection:start_link( @@ -602,11 +611,11 @@ tc_listener_conf_reload_listen_on(Config) -> ct:pal("C1 status : ~p", [sys:get_status(ClientConnPid)]), {ok, LHandle} = quicer_listener:get_handle(QuicApp, 5000), - %% WHEN: the listener is reloaded with ListenOn (new bind address) NewPort = select_port(), ok = quicer_listener:reload(QuicApp, NewPort, ListenerOpts), %% THEN: the listener handle is unchanged + %% ?assertEqual({ok, LHandle}, quicer_listener:get_handle(QuicApp, 5000)), %% THEN: start new connection to old port @@ -729,8 +738,7 @@ tc_listener_conf_reload_listen_on_neg(Config) -> quicer_test_lib:report_unhandled_messages(), ct:fail("nothing from conn 2") end, - - quicer_listener:stop_listener(QuicApp). + quicer:stop_listener(?FUNCTION_NAME). tc_stop_close_listener(Config) -> Port = select_port(), @@ -912,9 +920,8 @@ tc_listener_stopped_when_owner_die(Config) -> {ok, _L0} = quicer:listen(Port, default_listen_opts(Config)) end ), - %% Then the old listener can be closed but timeout since it is already stopped - %% and no stop event is triggered - {error, timeout} = quicer:close_listener(L0, _timeout = 10), + %% Then the old listener can be closed normally + ok = quicer:close_listener(L0, _timeout = 10), %% Then the new listener can be closed ok = quicer:close_listener(L1). diff --git a/test/quicer_reg_SUITE.erl b/test/quicer_reg_SUITE.erl index cc037fd7..89f38a46 100644 --- a/test/quicer_reg_SUITE.erl +++ b/test/quicer_reg_SUITE.erl @@ -148,6 +148,7 @@ do_tc_new_reg(_Config) -> Profile = quic_execution_profile_low_latency, {ok, Reg} = quicer:new_registration(Name, Profile), quicer:shutdown_registration(Reg), + quicer:close_registration(Reg), ok. tc_shutdown_reg_1(_Config) -> @@ -155,6 +156,7 @@ tc_shutdown_reg_1(_Config) -> Profile = quic_execution_profile_low_latency, {ok, Reg} = quicer:new_registration(Name, Profile), ok = quicer:shutdown_registration(Reg), + ok = quicer:close_registration(Reg), ok. tc_shutdown_1_abnormal(_Config) -> @@ -168,26 +170,30 @@ tc_shutdown_3_abnormal(_Config) -> ?assertEqual({error, badarg}, quicer:shutdown_registration(Reg, 1, 2)), ?assertEqual({error, badarg}, quicer:shutdown_registration(Reg, 1, foo)), ?assertEqual({error, badarg}, quicer:shutdown_registration(Reg, true, -1)), - ok = quicer:shutdown_registration(Reg). + ok = quicer:shutdown_registration(Reg), + ok = quicer:close_registration(Reg). tc_shutdown_ok(_Config) -> Name = atom_to_list(?FUNCTION_NAME), Profile = quic_execution_profile_low_latency, {ok, Reg} = quicer:new_registration(Name, Profile), - ok = quicer:shutdown_registration(Reg). + ok = quicer:shutdown_registration(Reg), + ok = quicer:close_registration(Reg). tc_shutdown_twice(_Config) -> Name = atom_to_list(?FUNCTION_NAME), Profile = quic_execution_profile_low_latency, {ok, Reg} = quicer:new_registration(Name, Profile), ok = quicer:shutdown_registration(Reg), - ok = quicer:shutdown_registration(Reg). + ok = quicer:shutdown_registration(Reg), + ok = quicer:close_registration(Reg). tc_shutdown_with_reason(_Config) -> Name = atom_to_list(?FUNCTION_NAME), Profile = quic_execution_profile_low_latency, {ok, Reg} = quicer:new_registration(Name, Profile), - ok = quicer:shutdown_registration(Reg, false, 123). + ok = quicer:shutdown_registration(Reg, false, 123), + ok = quicer:close_registration(Reg). tc_get_reg_name(_Config) -> Name = atom_to_list(?FUNCTION_NAME), @@ -195,19 +201,38 @@ tc_get_reg_name(_Config) -> {ok, Reg} = quicer:new_registration(Name, Profile), ?assertEqual({ok, Name}, quicer:get_registration_name(Reg)), ok = quicer:shutdown_registration(Reg), - ?assertEqual({ok, Name}, quicer:get_registration_name(Reg)). + ?assertEqual({ok, Name}, quicer:get_registration_name(Reg)), + ok = quicer:close_registration(Reg). tc_close_with_opened_conn(_Config) -> Name = atom_to_list(?FUNCTION_NAME), Profile = quic_execution_profile_low_latency, {ok, Reg} = quicer:new_registration(Name, Profile), - {ok, Conn} = quicer_nif:open_connection(#{quic_registration => Reg}), %% @NOTE This is a hack to make sure the connection is abled to be closed %% which is triggered by the registration close - _ = timer:apply_after( - 1000, - quicer, - connect, - ["localhost", 5060, [{alpn, ["sample"]}, {handle, Conn}], 1000] - ), + spawn(fun() -> + {ok, Conn} = quicer:open_connection(#{quic_registration => Reg}), + quicer:connect("localhost", 5060, [{alpn, ["sample"]}, {handle, Conn}], 1000) + end), + _ = quicer:shutdown_registration(Reg), ?assertEqual(ok, quicer:close_registration(Reg)). + +tc_close_global_reg(_Config) -> + ?assertEqual(ok, quicer:reg_close()), + %% close more + ?assertEqual(ok, quicer:reg_close()), + %% close one more + ?assertEqual(ok, quicer:reg_close()), + quicer:reg_open(). + +tc_open_global_reg(_Config) -> + ?assertEqual(ok, quicer:reg_close()), + ?assertEqual(ok, quicer:reg_open()). + +tc_shutdown_global_reg(_Config) -> + ?assertEqual(ok, quicer:shutdown_registration(global)), + ?assertEqual(ok, quicer:reg_close()). + +tc_get_links_link_closed(_Config) -> + ok = quicer:reg_close(), + ?assertEqual({error, quic_registration}, quicer:get_connections()). diff --git a/test/quicer_snb_SUITE.erl b/test/quicer_snb_SUITE.erl index 81e9812e..4eebf781 100644 --- a/test/quicer_snb_SUITE.erl +++ b/test/quicer_snb_SUITE.erl @@ -71,6 +71,7 @@ BUCKET, begin quicer_nif:set_snab_kc_pid(whereis(snabbkaffe_collector)), + ?assert(whereis(snabbkaffe_collector) == quicer_nif:get_snab_kc_pid()), RUN end, CHECK @@ -570,7 +571,7 @@ tc_stream_acceptor_down(Config) -> quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0), receive {quic, closed, Conn, _} -> - ct:pal("Connecion is closed") + ct:pal("Connection is closed") end, quicer:shutdown_connection(Conn) end, @@ -2858,7 +2859,7 @@ tc_multi_streams_example_server_1(Config) -> quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0), receive {quic, closed, Conn, _} -> - ct:pal("Connecion is closed") + ct:pal("Connection is closed") end, quicer:shutdown_connection(Conn) end, diff --git a/test/quicer_test_lib.erl b/test/quicer_test_lib.erl index 5d8a280b..2f63e2b0 100644 --- a/test/quicer_test_lib.erl +++ b/test/quicer_test_lib.erl @@ -421,8 +421,53 @@ cleanup_msquic() -> ok. reset_global_reg() -> - quicer:reg_close(), - quicer:reg_open(). + case quicer:get_listeners() of + [] -> + ok; + Other -> + ct:pal("Warn: Listeners not cleaned up: ~p", [Other]), + lists:foreach( + fun(L) -> quicer:close_listener(L) end, Other + ) + end, + case quicer:get_registration_refcnt(global) of + 1 -> + ok; + {error, closed} -> + ct:pal("global registration closed"), + quicer:reg_open(); + N -> + ct:pal("Warn: Global registration refcnt not 1: ~p", [N]) + end, + quicer:shutdown_registration(global), + retry_reg_close(), + retry_reg_open(). + +retry_reg_close() -> + case quicer:reg_close() of + ok -> + ok; + N when is_integer(N) -> + ct:pal( + "Failed to close global registration refcnt: ~p, Conns: ~p, Listeners: ~p~nretry....", + [ + N, + quicer:get_connections(), + quicer:get_listeners() + ] + ), + timer:sleep(50), + retry_reg_close() + end. + +retry_reg_open() -> + case quicer:reg_open() of + ok -> + ok; + {error, invalid_state} = E -> + %% Lib is closed. + E + end. shutdown_all_listeners() -> lists:foreach(