Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
radkesvat committed Jul 11, 2024
1 parent d1f5efe commit c31a9d8
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 60 deletions.
2 changes: 0 additions & 2 deletions tunnels/client/http2/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,6 @@ static http2_client_child_con_state_t *createHttp2Stream(http2_client_con_state_

addStraem(con, stream);

// nghttp2_session_set_stream_user_data(con->session, stream->stream_id, stream);

return stream;
}
static void deleteHttp2Stream(http2_client_child_con_state_t *stream)
Expand Down
63 changes: 46 additions & 17 deletions tunnels/client/http2/http2_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,12 @@ enum

static int onStreamClosedCallback(nghttp2_session *session, int32_t stream_id, uint32_t error_code, void *userdata)
{
(void) session;
(void) stream_id;
(void) error_code;

http2_client_con_state_t *con = (http2_client_con_state_t *) userdata;
http2_client_child_con_state_t *stream = nghttp2_session_get_stream_user_data(session, stream_id);
if (WW_UNLIKELY(! stream))
LOGD("callback end stream for: %d", stream_id);
if (! stream)
{
return 0;
}
Expand Down Expand Up @@ -185,6 +184,21 @@ static int onFrameRecvCallback(nghttp2_session *session, const nghttp2_frame *fr

if ((frame->hd.type & NGHTTP2_HEADERS) == NGHTTP2_HEADERS)
{
if ((frame->hd.flags & NGHTTP2_FLAG_END_STREAM) == NGHTTP2_FLAG_END_STREAM)
{
LOGD("end stream for: %d", frame->hd.stream_id);

http2_client_child_con_state_t *stream = nghttp2_session_get_stream_user_data(session, frame->hd.stream_id);
if (WW_UNLIKELY(! stream))
{
return 0;
}
lockLine(stream->line);
action_queue_t_push(
&con->actions,
(http2_action_t) {.action_id = kActionStreamFinish, .stream_line = stream->line, .buf = NULL});
}

if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE)
{
con->handshake_completed = true;
Expand All @@ -207,6 +221,7 @@ static int onFrameRecvCallback(nghttp2_session *session, const nghttp2_frame *fr

static void sendFlowControlData(http2_client_con_state_t *con, int32_t stream_id, int consumed)
{
(void) con;

uint8_t buf[8] = {0};

Expand All @@ -216,13 +231,12 @@ static void sendFlowControlData(http2_client_con_state_t *con, int32_t stream_id
memcpy(buf, &stream_id, sizeof(stream_id));
memcpy(buf + 4, &consumed, sizeof(consumed));

nghttp2_submit_ping(con->session, NGHTTP2_FLAG_ACK, buf);
// nghttp2_submit_ping(con->session, NGHTTP2_FLAG_ACK, buf);
}

static void sendStreamData(http2_client_con_state_t *con, int32_t stream_id, shift_buffer_t *buf)
static void sendStreamData(http2_client_con_state_t *con, http2_client_child_con_state_t *stream, shift_buffer_t *buf)
{
http2_flag flags = kHttP2FlagNone;
http2_client_child_con_state_t *stream = nghttp2_session_get_stream_user_data(con->session, stream_id);
http2_flag flags = kHttP2FlagNone;
if (WW_UNLIKELY(! stream))
{
reuseBuffer(getLineBufferPool(con->line), buf);
Expand All @@ -232,7 +246,7 @@ static void sendStreamData(http2_client_con_state_t *con, int32_t stream_id, shi

if (stream->bytes_sent_nack > kMaxSendBeforeAck)
{
pauseLineDownSide(stream->line);
// pauseLineDownSide(stream->line);
}

if (con->content_type == kApplicationGrpc)
Expand All @@ -249,7 +263,7 @@ static void sendStreamData(http2_client_con_state_t *con, int32_t stream_id, shi
framehd.length = bufLen(buf);
framehd.type = kHttP2Data;
framehd.flags = flags;
framehd.stream_id = stream_id;
framehd.stream_id = stream->stream_id;
shiftl(buf, HTTP2_FRAME_HDLEN);
http2FrameHdPack(&framehd, rawBufMut(buf));
context_t *data = newContext(con->line);
Expand Down Expand Up @@ -282,13 +296,21 @@ static void doHttp2Action(const http2_action_t action, http2_client_con_state_t
{
line_t *main_line = con->line;
tunnel_t *self = con->tunnel;

if (! isAlive(action.stream_line))
{
if (action.buf)
{
reuseBuffer(getLineBufferPool(action.stream_line), action.buf);
}
unLockLine(action.stream_line);
return;
}

http2_client_child_con_state_t *stream = LSTATE(action.stream_line);

assert(stream); // when the line is alive, there is no way that we lose the state

switch (action.action_id)
{
default:
Expand Down Expand Up @@ -394,16 +416,19 @@ static void upStream(tunnel_t *self, context_t *c)
return;
}

lockLine(con->line);
while (sendNgHttp2Data(self, con))
{
if (! isAlive(c->line))
if (! isAlive(con->line))
{
unLockLine(con->line);
destroyContext(c);
return;
}
}
unLockLine(con->line);

sendStreamData(con, stream->stream_id, c->payload);
sendStreamData(con, stream, c->payload);

dropContexPayload(c);
destroyContext(c);
Expand All @@ -416,26 +441,31 @@ static void upStream(tunnel_t *self, context_t *c)
http2_client_child_con_state_t *stream = createHttp2Stream(con, c->line);
CSTATE_MUT(c) = stream;
nghttp2_session_set_stream_user_data(con->session, stream->stream_id, stream);
lockLine(con->line);

if (! con->init_sent)
{
con->init_sent = true;
self->up->upStream(self->up, newInitContext(con->line));
if (! isAlive(c->line))
if (! isAlive(con->line))
{
unLockLine(con->line);
destroyContext(c);
return;
}
}

while (sendNgHttp2Data(self, con))
{
if (! isAlive(c->line))
if (! isAlive(con->line))
{
unLockLine(con->line);
destroyContext(c);
return;
}
}
unLockLine(con->line);

destroyContext(c);
}
else if (c->fin)
Expand All @@ -455,25 +485,24 @@ static void upStream(tunnel_t *self, context_t *c)
{
nghttp2_submit_headers(con->session, flags, stream->stream_id, NULL, NULL, 0, NULL);
}

LOGD("destroy %d", (int) stream->stream_id);
nghttp2_session_set_stream_user_data(con->session, stream->stream_id, NULL);
removeStream(con, stream);
deleteHttp2Stream(stream);

lockLine(con->line);
while (sendNgHttp2Data(self, con))
{
if (! isAlive(c->line))
if (! isAlive(con->line))
{
unLockLine(con->line);
destroyContext(c);
return;
}
}

unLockLine(con->line);

if (con->root.next == NULL && con->childs_added >= state->concurrency && isAlive(con->line))
if (con->root.next == NULL && con->childs_added >= state->concurrency)
{
context_t *con_fc = newFinContext(con->line);
tunnel_t *con_dest = con->tunnel->up;
Expand Down
Loading

0 comments on commit c31a9d8

Please sign in to comment.