Skip to content

Commit

Permalink
Merge pull request #311 from qzhuyan/fix/william/please-debug-emu
Browse files Browse the repository at this point in the history
please debug emu
  • Loading branch information
qzhuyan authored Oct 21, 2024
2 parents 1e762a5 + c0815e2 commit 3b790e7
Show file tree
Hide file tree
Showing 22 changed files with 336 additions and 62 deletions.
34 changes: 34 additions & 0 deletions .github/workflows/asan.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
name: asan check
on:
workflow_dispatch:
inputs:
ref:
required: false

jobs:
asan:
runs-on: ubuntu-22.04
strategy:
fail-fast: true
steps:
- name: checkout
uses: actions/checkout@v3
with:
ref: ${{ github.event.inputs.ref }}
- name: asan build
run: |
otp_prebuilds=otp-26.2.5.3-ubuntu-22.04.tar.gz
wget https://github.com/qzhuyan/kerl/releases/download/testing/${otp_prebuilds}
tar zxvf ${otp_prebuilds} -C /
ln -s /home/runner/OTP/otp-26.2.5.3/ /home/runner/OTP/default
echo ". /home/runner/OTP/default/activate" >> ~/.bashrc
- name: run sanitizer-check
run: |
. /home/runner/OTP/default/activate
tools/run/bin/sanitizer-check all
- name: Archive asan logs
uses: actions/upload-artifact@v3
with:
name: asan_logs
path: asan_logs
retention-days: 7
27 changes: 22 additions & 5 deletions c_src/quicer_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -841,7 +841,14 @@ async_connect3(ErlNifEnv *env,

// Monitor owner before start, so we don't need to race with callbacks
// after start the connection
enif_monitor_process(NULL, c_ctx, &c_ctx->owner->Pid, &c_ctx->owner_mon);
//
if (!c_ctx->is_monitored
&& 0
== enif_monitor_process(
NULL, c_ctx, &c_ctx->owner->Pid, &c_ctx->owner_mon))
{
c_ctx->is_monitored = TRUE;
}

// c_ctx->lock should be taken to prevent parallel access from callback as
// work trigged by starting of the connection.
Expand All @@ -863,6 +870,7 @@ async_connect3(ErlNifEnv *env,
if (c_ctx->config_resource)
{
destroy_config_ctx(c_ctx->config_resource);
c_ctx->config_resource = NULL;
}

res = ERROR_TUPLE_2(ATOM_CONN_START_ERROR);
Expand Down Expand Up @@ -1168,7 +1176,11 @@ handle_connection_event_connected(QuicerConnCTX *c_ctx,
// A monitor is automatically removed when it triggers or when the
// resource is deallocated.
enif_mutex_lock(c_ctx->lock);
enif_monitor_process(NULL, c_ctx, acc_pid, &c_ctx->owner_mon);
if ((!c_ctx->is_monitored)
&& 0 == enif_monitor_process(NULL, c_ctx, acc_pid, &c_ctx->owner_mon))
{
c_ctx->is_monitored = TRUE;
}
enif_mutex_unlock(c_ctx->lock);

ERL_NIF_TERM ConnHandle = enif_make_resource(c_ctx->env, c_ctx);
Expand Down Expand Up @@ -1392,8 +1404,13 @@ handle_connection_event_peer_stream_started(QuicerConnCTX *c_ctx,
= { enif_make_uint(env, Event->PEER_STREAM_STARTED.Flags),
ATOM_BOOLEAN(is_orphan) };

ERL_NIF_TERM report = make_event_with_props(
env, ATOM_NEW_STREAM, s_ctx->eHandle, props_name, props_value, 2);
ERL_NIF_TERM report
= make_event_with_props(env,
ATOM_NEW_STREAM,
enif_make_copy(env, s_ctx->eHandle),
props_name,
props_value,
2);
if (!enif_send(NULL, acc_pid, NULL, report))
{
if (is_orphan)
Expand Down Expand Up @@ -1421,7 +1438,7 @@ handle_connection_event_peer_stream_started(QuicerConnCTX *c_ctx,

report = make_event_with_props(env,
ATOM_NEW_STREAM,
s_ctx->eHandle,
enif_make_copy(env, s_ctx->eHandle),
props_name,
props_value,
2);
Expand Down
13 changes: 11 additions & 2 deletions c_src/quicer_ctx.c
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,11 @@ destroy_l_ctx(QuicerListenerCTX *l_ctx)
l_ctx->r_ctx = NULL;
}
l_ctx->config_resource = NULL;
enif_demonitor_process(l_ctx->env, l_ctx, &l_ctx->owner_mon);
if (l_ctx->is_monitored)
{
enif_demonitor_process(l_ctx->env, l_ctx, &l_ctx->owner_mon);
l_ctx->is_monitored = FALSE;
}
enif_release_resource(l_ctx);
}

Expand Down Expand Up @@ -212,7 +216,12 @@ destroy_c_ctx(QuicerConnCTX *c_ctx)
CxPlatListEntryRemove(&c_ctx->RegistrationLink);
enif_mutex_unlock(r_ctx->lock);

enif_demonitor_process(c_ctx->env, c_ctx, &c_ctx->owner_mon);
if (c_ctx->is_monitored)
{
enif_demonitor_process(c_ctx->env, c_ctx, &c_ctx->owner_mon);
c_ctx->is_monitored = FALSE;
}

enif_release_resource(c_ctx);
}

Expand Down
3 changes: 3 additions & 0 deletions c_src/quicer_ctx.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ typedef struct QuicerListenerCTX
CXPLAT_REF_COUNT ref_count;
QUICER_ACCEPTOR_QUEUE *acceptor_queue;
ErlNifPid listenerPid;
BOOLEAN is_monitored;
ErlNifMonitor owner_mon;
ErlNifEnv *env;
ErlNifMutex *lock;
Expand Down Expand Up @@ -92,6 +93,7 @@ typedef struct QuicerConnCTX
HQUIC Connection;
QUICER_ACCEPTOR_QUEUE *acceptor_queue;
ACCEPTOR *owner;
BOOLEAN is_monitored;
ErlNifMonitor owner_mon;
ErlNifEnv *env;
ErlNifMutex *lock;
Expand Down Expand Up @@ -119,6 +121,7 @@ typedef struct QuicerStreamCTX
HQUIC Stream;
uint64_t StreamID;
ACCEPTOR *owner;
BOOLEAN is_monitored;
ErlNifMonitor owner_mon;
ErlNifEnv *env;
// Immutable env,
Expand Down
1 change: 1 addition & 0 deletions c_src/quicer_listener.c
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ listen2(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[])
goto exit;
}

l_ctx->is_monitored = TRUE;
// Now open listener
if (QUIC_FAILED(Status = MsQuic->ListenerOpen(
// Listener registration
Expand Down
29 changes: 23 additions & 6 deletions c_src/quicer_nif.c
Original file line number Diff line number Diff line change
Expand Up @@ -1140,6 +1140,9 @@ on_load(ErlNifEnv *env,
int ret_val = 0;
unsigned load_vsn = 0;

init_atoms(env);

// TP must run after init_atoms as atoms are used in TP
TP_NIF_3(start, &MsQuic, 0);
if (!enif_get_uint(env, loadinfo, &load_vsn))
{
Expand All @@ -1154,7 +1157,6 @@ on_load(ErlNifEnv *env,
return 1; // any value except 0 is error
}

init_atoms(env);
open_resources(env);

TP_NIF_3(end, &MsQuic, 0);
Expand Down Expand Up @@ -1575,18 +1577,24 @@ connection_controlling_process(ErlNifEnv *env,
return ERROR_TUPLE_2(ATOM_BADARG);
}

enif_demonitor_process(env, c_ctx, &c_ctx->owner_mon);
if (c_ctx->is_monitored)
{
enif_demonitor_process(env, c_ctx, &c_ctx->owner_mon);
c_ctx->is_monitored = FALSE;
}

if (0
!= enif_monitor_process(
env, c_ctx, &c_ctx->owner->Pid, &c_ctx->owner_mon))
{
// rollback, must success
enif_self(env, &c_ctx->owner->Pid);
enif_monitor_process(env, c_ctx, caller, &c_ctx->owner_mon);
CXPLAT_FRE_ASSERT(
0 == enif_monitor_process(env, c_ctx, caller, &c_ctx->owner_mon));
c_ctx->is_monitored = TRUE;
return ERROR_TUPLE_2(ATOM_OWNER_DEAD);
}

c_ctx->is_monitored = TRUE;
TP_NIF_3(exit, (uintptr_t)c_ctx->Connection, (uintptr_t)&c_ctx);
return ATOM_OK;
}
Expand All @@ -1610,7 +1618,11 @@ stream_controlling_process(ErlNifEnv *env,
return ERROR_TUPLE_2(ATOM_BADARG);
}

enif_demonitor_process(env, s_ctx, &s_ctx->owner_mon);
if (s_ctx->is_monitored)
{
enif_demonitor_process(env, s_ctx, &s_ctx->owner_mon);
s_ctx->is_monitored = FALSE;
}

if (0
!= enif_monitor_process(
Expand All @@ -1619,9 +1631,14 @@ stream_controlling_process(ErlNifEnv *env,
// rollback, must success
enif_self(env, &s_ctx->owner->Pid);
flush_sig_buffer(env, s_ctx);
enif_monitor_process(env, s_ctx, caller, &s_ctx->owner_mon);

CXPLAT_FRE_ASSERT(
0 == enif_monitor_process(env, s_ctx, caller, &s_ctx->owner_mon));
s_ctx->is_monitored = TRUE;

return ERROR_TUPLE_2(ATOM_OWNER_DEAD);
}
s_ctx->is_monitored = TRUE;
flush_sig_buffer(env, s_ctx);
TP_NIF_3(exit, (uintptr_t)s_ctx->Stream, (uintptr_t)&s_ctx->owner->Pid);
return ATOM_OK;
Expand Down
4 changes: 4 additions & 0 deletions c_src/quicer_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,10 @@ async_start_stream2(ErlNifEnv *env,
res = ERROR_TUPLE_3(ATOM_STREAM_START_ERROR, ATOM_STATUS(Status));
goto ErrorExit;
}

int mon_res = enif_monitor_process(
env, s_ctx, &s_ctx->owner->Pid, &s_ctx->owner_mon);
CXPLAT_FRE_ASSERT(mon_res == 0);
// NOTE: Set is_closed to FALSE (s_ctx->is_closed = FALSE;)
// must be done in the worker callback (for
// QUICER_STREAM_EVENT_MASK_START_COMPLETE) to avoid race cond.
Expand Down
17 changes: 12 additions & 5 deletions c_src/quicer_tp.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

extern uint64_t CxPlatTimeUs64(void);

// help macro to copy atom to env for debug emulator assertions
#define ATOM_IN_ENV(X) enif_make_copy(env, ATOM_##X)

void
tp_snk(ErlNifEnv *env,
const char *ctx,
Expand All @@ -17,11 +20,13 @@ tp_snk(ErlNifEnv *env,
{
ERL_NIF_TERM snk_event;
ERL_NIF_TERM snk_event_key_array[7]
= { ATOM_SNK_KIND, ATOM_CONTEXT, ATOM_FUNCTION, ATOM_TAG,
ATOM_RESOURCE_ID, ATOM_MARK, ATOM_SNK_META };
= { ATOM_IN_ENV(SNK_KIND), ATOM_IN_ENV(CONTEXT),
ATOM_IN_ENV(FUNCTION), ATOM_IN_ENV(TAG),
ATOM_IN_ENV(RESOURCE_ID), ATOM_IN_ENV(MARK),
ATOM_IN_ENV(SNK_META) };

ERL_NIF_TERM snk_evt_meta;
ERL_NIF_TERM snk_evt_meta_key_array[1] = { ATOM_TIME };
ERL_NIF_TERM snk_evt_meta_key_array[1] = { ATOM_IN_ENV(TIME) };
ERL_NIF_TERM snk_evt_meta_val_array[1]
= { enif_make_uint64(env, CxPlatTimeUs64()) };

Expand All @@ -33,7 +38,7 @@ tp_snk(ErlNifEnv *env,
&snk_evt_meta);

ERL_NIF_TERM snk_event_val_array[7] = {
ATOM_DEBUG, // snk_kind
ATOM_IN_ENV(DEBUG), // snk_kind
enif_make_string(env, ctx, ERL_NIF_LATIN1), // context
enif_make_string(env, fun, ERL_NIF_LATIN1), // fun
enif_make_string(env, tag, ERL_NIF_LATIN1), // tag
Expand All @@ -46,7 +51,9 @@ tp_snk(ErlNifEnv *env,
env, snk_event_key_array, snk_event_val_array, 7, &snk_event);

ERL_NIF_TERM report = enif_make_tuple2(
env, ATOM_GEN_CAST, enif_make_tuple2(env, ATOM_TRACE, snk_event));
env,
ATOM_IN_ENV(GEN_CAST),
enif_make_tuple2(env, ATOM_IN_ENV(TRACE), snk_event));
enif_send(NULL, &pid, NULL, report);
}
}
31 changes: 17 additions & 14 deletions test/example_client_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ new_stream(
} = CBState
) ->
%% Spawn new stream
case quicer_remote_stream:start_link(example_server_stream, Stream, Conn, SOpts, Flags) of
case quicer_remote_stream:start_link(example_client_stream, Stream, Conn, SOpts, Flags) of
{ok, StreamOwner} ->
case quicer:handoff_stream(Stream, StreamOwner) of
ok ->
Expand All @@ -116,7 +116,8 @@ new_stream(
{ok, CBState#{streams := [{E, Stream} | Streams]}}
end;
Other ->
Other
ct:pal("Start accepting remote stream error ~p", [Other]),
{ok, CBState#{streams := [{start_error, Stream} | Streams]}}
end.

dgram_state_changed(_Conn, _Flags, S) ->
Expand Down Expand Up @@ -152,17 +153,19 @@ peer_needs_streams(C, bidi_streams, S) ->
handle_info({'EXIT', _Pid, _Reason}, State) ->
{ok, State};
handle_info({quic, Sig, Stream, _} = Msg, #{streams := Streams} = S) when
%% @FIXME, not desired behavior.
%% Casued by inflight quic Msg during stream handoff
Sig == peer_send_shutdown orelse Sig == stream_closed
->
{OwnerPid, Stream} = lists:keyfind(Stream, 2, Streams),
NewS =
case OwnerPid == owner_down orelse OwnerPid == closed of
true ->
quicer:async_shutdown_stream(Stream),
S#{streams := lists:keydelete(Stream, 2, Streams)};
false ->
error(fixme)
end,
{ok, S}.
case lists:keyfind(Stream, 2, Streams) of
{Reason, Stream} when
Reason =:= owner_down orelse
Reason =:= closed orelse
Reason =:= start_error
->
_ = quicer:async_shutdown_stream(Stream),
{ok, S#{streams := lists:keydelete(Stream, 2, Streams)}};
{OwnerPid, Stream} when is_pid(OwnerPid) ->
{error, {fixme, bug_handoff_fail}};
false ->
%% garbage signals from already dead stream (such like crashed owner)
{ok, S}
end.
13 changes: 10 additions & 3 deletions test/example_client_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,16 @@
-include("quicer.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").

init_handoff(_Stream, _StreamOpts, _Conn, _Flags) ->
%% stream owner already set while starts.
{stop, not_impl, #{}}.
init_handoff(Stream, _StreamOpts, Conn, #{flags := Flags}) ->
InitState = #{
stream => Stream,
conn => Conn,
peer_stream => undefined,
is_local => false,
is_unidir => quicer:is_unidirectional(Flags)
},
ct:pal("init_handoff ~p", [{InitState, _StreamOpts}]),
{ok, InitState}.

post_handoff(Stream, _PostData, State) ->
ok = quicer:setopt(Stream, active, true),
Expand Down
7 changes: 4 additions & 3 deletions test/example_server_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ init_handoff(Stream, _StreamOpts, Conn, #{flags := Flags}) ->
is_local => false,
is_unidir => quicer:is_unidirectional(Flags)
},
% ct:pal("init_handoff ~p", [{InitState, _StreamOpts}]),
ct:pal("init_handoff ~p", [{InitState, _StreamOpts}]),
{ok, InitState}.

post_handoff(Stream, _PostData, State) ->
Expand Down Expand Up @@ -137,10 +137,11 @@ handle_stream_data(
case PeerStream of
undefined ->
case
quicer_local_stream:start_link(
quicer_local_stream:start(
?MODULE,
Conn,
[{open_flag, ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}]
[{open_flag, ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}],
[]
)
of
{ok, StreamProc} ->
Expand Down
2 changes: 2 additions & 0 deletions test/prop_stateful_server_conn.erl
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,8 @@ next_state(State, Res, Call) ->

do_next_state(#{state := accepted} = State, {error, _}, {call, quicer, handshake, _Args}) ->
State;
do_next_state(#{state := _} = State, {error, closed}, {call, quicer, _, _Args}) ->
State#{state := closed};
do_next_state(#{state := accepted} = State, _Res, {call, quicer, handshake, _Args}) ->
State#{state := connected};
do_next_state(#{state := accepted} = State, _Res, {call, quicer, close_connection, _Args}) ->
Expand Down
Loading

0 comments on commit 3b790e7

Please sign in to comment.