Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

memleak dead acceptor #280

Merged
merged 3 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions c_src/quicer_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -1360,7 +1360,7 @@ handle_connection_event_peer_stream_started(QuicerConnCTX *c_ctx,

if (!acc)
{
// If we don't have available process
// If we don't have available acceptor waiting,
// fallback to the connection owner
TP_CB_3(no_acceptor, (uintptr_t)c_ctx->Connection, 0);
is_orphan = TRUE;
Expand All @@ -1386,26 +1386,23 @@ handle_connection_event_peer_stream_started(QuicerConnCTX *c_ctx,
acc_pid = &(acc->Pid);

s_ctx->owner = acc;
s_ctx->is_closed = FALSE;

cache_stream_id(s_ctx);

ERL_NIF_TERM props_name[] = { ATOM_FLAGS, ATOM_IS_ORPHAN };
ERL_NIF_TERM props_value[]
= { enif_make_uint(env, Event->PEER_STREAM_STARTED.Flags),
ATOM_BOOLEAN(is_orphan) };

ERL_NIF_TERM report = make_event_with_props(env,
ATOM_NEW_STREAM,
enif_make_resource(env, s_ctx),
props_name,
props_value,
2);
ERL_NIF_TERM report = make_event_with_props(
env, ATOM_NEW_STREAM, s_ctx->eHandle, props_name, props_value, 2);
if (!enif_send(NULL, acc_pid, NULL, report))
{
if (is_orphan)
{
// Connection acceptor is dead
// we don't need to destroy acceptor, we don't have the ownwership
destroy_s_ctx(s_ctx);
return QUIC_STATUS_UNREACHABLE;
}
else
Expand All @@ -1426,17 +1423,18 @@ handle_connection_event_peer_stream_started(QuicerConnCTX *c_ctx,

report = make_event_with_props(env,
ATOM_NEW_STREAM,
enif_make_resource(env, s_ctx),
s_ctx->eHandle,
props_name,
props_value,
2);
if (!enif_send(NULL, acc_pid, NULL, report))
{
// Sad...
destroy_s_ctx(s_ctx);
return QUIC_STATUS_UNREACHABLE;
}
}
}
s_ctx->is_closed = FALSE;
MsQuic->SetCallbackHandler(
Event->PEER_STREAM_STARTED.Stream, stream_callback, s_ctx);
return QUIC_STATUS_SUCCESS;
Expand Down
4 changes: 2 additions & 2 deletions include/quicer_types.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@
-type conn_shutdown_flag() ::
?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE
| ?QUIC_CONNECTION_SHUTDOWN_FLAG_SILENT.
%% @TODO expand
-type acceptor_opts() :: map().

-type acceptor_opts() :: quic_settings() | #{active => boolean()}.

-type active_n() :: boolean() | once | integer().

Expand Down
10 changes: 9 additions & 1 deletion test/example_client_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,12 @@ peer_needs_streams(C, bidi_streams, S) ->
{ok, S}.

handle_info({'EXIT', _Pid, _Reason}, State) ->
{ok, State}.
{ok, State};
handle_info({quic, Sig, Stream, _} = Msg, #{streams := Streams} = S) when
%% @FIXME, not desired behavior.
%% Casued by inflight quic Msg during stream handoff
Sig == peer_send_shutdown orelse Sig == stream_closed
->
{OwnerPid, Stream} = lists:keyfind(Stream, 2, Streams),
OwnerPid ! Msg,
{ok, S}.
11 changes: 10 additions & 1 deletion test/example_server_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,12 @@ new_stream(
case quicer:handoff_stream(Stream, StreamOwner) of
ok ->
{ok, CBState#{streams := [{StreamOwner, Stream} | Streams]}};
{error, closed} ->
{error, closed} when Streams == [] ->
%% Shutdown the connection when fails to handle the first stream
{stop, {shutdown, closed}, CBState};
{error, closed} ->
%% For proper test, handle random failures from the streams
{ok, CBState};
false ->
{error, handoff_fail}
end;
Expand Down Expand Up @@ -149,5 +153,10 @@ dgram_recv(C, Bin, _Flag, S) ->
end,
{ok, S}.

handle_info({quic, stream_closed, _}, State) ->
%% Stream close before/in the middle of handoff
{ok, State};
handle_info({'EXIT', _Pid, _Reason}, State) ->
{ok, State};
handle_info(_, State) ->
{ok, State}.
34 changes: 25 additions & 9 deletions test/example_server_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
-export([handle_stream_data/4]).

-export([handle_call/3]).
-export([handle_info/2]).

-include("quicer.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
Expand All @@ -51,8 +52,12 @@ init_handoff(Stream, _StreamOpts, Conn, #{flags := Flags}) ->
{ok, InitState}.

post_handoff(Stream, _PostData, State) ->
ok = quicer:setopt(Stream, active, true),
{ok, State}.
case quicer:setopt(Stream, active, true) of
ok ->
{ok, State};
{error, closed} ->
{stop, closed, State}
end.

new_stream(Stream, #{flags := Flags}, Conn) ->
InitState = #{
Expand Down Expand Up @@ -131,13 +136,21 @@ handle_stream_data(

case PeerStream of
undefined ->
{ok, StreamProc} = quicer_local_stream:start_link(
?MODULE,
Conn,
[{open_flag, ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}]
),
{ok, _} = quicer_stream:send(StreamProc, Bin),
{ok, State#{peer_stream := StreamProc}};
case
quicer_local_stream:start_link(
?MODULE,
Conn,
[{open_flag, ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}]
)
of
{ok, StreamProc} ->
catch quicer_stream:send(StreamProc, Bin),
{ok, State#{peer_stream := StreamProc}};
{error, {_, _}} ->
{ok, State};
{error, closed} ->
{ok, State}
end;
StreamProc when is_pid(StreamProc) ->
{ok, _} = quicer_stream:send(StreamProc, Bin),
{ok, State}
Expand Down Expand Up @@ -170,3 +183,6 @@ stream_closed(

handle_call(_Request, _From, S) ->
{reply, {error, not_impl}, S}.

handle_info(Info, S) ->
{ok, S}.
2 changes: 2 additions & 0 deletions test/prop_quic_types.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,5 @@
| {start_flag, stream_start_flags()}
| {event_mask, uint32()}
| {disable_fpbuffer, boolean()}.

-type stream_accept_opts() :: [{active, boolean()}].
Loading
Loading