Skip to content

Commit

Permalink
fix painful http2 handling bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
radkesvat committed Mar 31, 2024
1 parent 726c499 commit c702395
Show file tree
Hide file tree
Showing 8 changed files with 277 additions and 141 deletions.
1 change: 1 addition & 0 deletions tunnels/client/http2/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ CPMAddPackage(
OPTIONS
"BUILD_STATIC_LIBS ON"
"BUILD_TESTING OFF"
# "ENABLE_DEBUG ON"

)

Expand Down
13 changes: 7 additions & 6 deletions tunnels/client/http2/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@
#include "types.h"

#define MAX_CONCURRENT_STREAMS 0xffffffffu
#define MAX_CHILD_PER_STREAM 200

#define MAX_CHILD_PER_STREAM 3

#define STATE(x) ((http2_client_state_t *)((x)->state))
#define CSTATE(x) ((void *)((((x)->line->chains_state)[self->chain_index])))
#define CSTATE_MUT(x) ((x)->line->chains_state)[self->chain_index]
#define ISALIVE(x) (CSTATE(x) != NULL)


static nghttp2_nv make_nv(const char *name, const char *value)
{
nghttp2_nv nv;
Expand Down Expand Up @@ -120,6 +118,8 @@ create_http2_stream(http2_client_con_state_t *con, line_t *child_line, hio_t *io
}
static void delete_http2_stream(http2_client_child_con_state_t *stream)
{
if(stream->temp_buf != NULL)
reuseBuffer(buffer_pools[stream->line->tid],stream->temp_buf);
stream->line->chains_state[stream->tunnel->chain_index + 1] = NULL;
free(stream);
}
Expand Down Expand Up @@ -168,8 +168,10 @@ static void delete_http2_connection(http2_client_con_state_t *con)
{
http2_client_child_con_state_t *next = stream_i->next;
context_t *fin_ctx = newFinContext(stream_i->line);
tunnel_t *dest = stream_i->tunnel;
delete_http2_stream(stream_i);
stream_i->tunnel->downStream(stream_i->tunnel, fin_ctx);
CSTATE_MUT(fin_ctx) = NULL;
dest->downStream(dest, fin_ctx);
stream_i = next;
}
nghttp2_session_del(con->session);
Expand All @@ -192,9 +194,8 @@ static http2_client_con_state_t *take_http2_connection(tunnel_t *self, int tid,
{
if ((*k.ref)->childs_added < MAX_CHILD_PER_STREAM)
{
(*k.ref)->childs_added +=1;
(*k.ref)->childs_added += 1;
return (*k.ref);

}
}
vec_cons_pop(vector);
Expand Down
200 changes: 130 additions & 70 deletions tunnels/client/http2/http2_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@
#include "types.h"
#include "helpers.h"

#define MAX_CHUNK_SIZE 8100

static void sendGrpcFinalData(tunnel_t *self, line_t *line, size_t stream_id)
{
http2_frame_hd framehd;
shift_buffer_t *buf = popBuffer(buffer_pools[line->tid]);
setLen(buf, HTTP2_FRAME_HDLEN);

framehd.length = 0;
framehd.type = HTTP2_DATA;
framehd.flags = HTTP2_FLAG_END_STREAM;
Expand All @@ -25,7 +29,7 @@ static bool trySendRequest(tunnel_t *self, http2_client_con_state_t *con, size_t
size_t len;
len = nghttp2_session_mem_send(con->session, (const uint8_t **)&data);
// LOGD("nghttp2_session_mem_send %d\n", len);
if (len != 0)
if (len > 0)
{
shift_buffer_t *send_buf = popBuffer(buffer_pools[line->tid]);
shiftl(send_buf, lCap(send_buf) / 1.25); // use some unused space
Expand All @@ -41,22 +45,23 @@ static bool trySendRequest(tunnel_t *self, http2_client_con_state_t *con, size_t
}
self->up->upStream(self->up, req);

if (nghttp2_session_want_read(con->session) == 0 &&
nghttp2_session_want_write(con->session) == 0)
{
if (buf != NULL)
{
reuseBuffer(buffer_pools[line->tid], buf);
buf = NULL;
}
context_t *fin_ctx = newFinContext(line);
delete_http2_connection(con);
con->tunnel->up->upStream(con->tunnel->up, fin_ctx);
return false;
}
// if (nghttp2_session_want_read(con->session) == 0 &&
// nghttp2_session_want_write(con->session) == 0)
// {
// if (buf != NULL)
// {
// reuseBuffer(buffer_pools[line->tid], buf);
// buf = NULL;
// }
// context_t *fin_ctx = newFinContext(line);
// delete_http2_connection(con);
// con->tunnel->up->upStream(con->tunnel->up, fin_ctx);
// return false;
// }

return true;
}
assert(len >= 0);

if (buf == NULL || bufLen(buf) <= 0)
return false;
Expand All @@ -65,7 +70,8 @@ static bool trySendRequest(tunnel_t *self, http2_client_con_state_t *con, size_t
if (con->state == H2_SEND_HEADERS)
{

http2_flag flags = HTTP2_FLAG_END_STREAM;
// http2_flag flags = HTTP2_FLAG_END_STREAM;
http2_flag flags = HTTP2_FLAG_NONE;

// HTTP2 DATA framehd
con->state = H2_SEND_DATA;
Expand Down Expand Up @@ -112,7 +118,7 @@ static bool trySendRequest(tunnel_t *self, http2_client_con_state_t *con, size_t
static void flush_write_queue(http2_client_con_state_t *con)
{
tunnel_t *self = con->tunnel;
context_t *g = newContext(con->line);
context_t *g = newContext(con->line); // keep the line alive
while (contextQueueLen(con->queue) > 0)
{
context_t *stream_context = contextQueuePop(con->queue);
Expand All @@ -122,10 +128,13 @@ static void flush_write_queue(http2_client_con_state_t *con)

// consumes payload
while (trySendRequest(self, con, stream->stream_id, stream->io, stream_context->payload))
;

if (!ISALIVE(g))
break;
{
if (!ISALIVE(g))
{
destroyContext(g);
return;
}
}
}
destroyContext(g);
}
Expand Down Expand Up @@ -209,38 +218,67 @@ static int on_data_chunk_recv_callback(nghttp2_session *session,
return 0;
// LOGD("on_data_chunk_recv_callback\n");
// LOGD("stream_id=%d length=%d\n", stream_id, (int)len);
// LOGD("%.*s\n", (int)len, data);
// LOGD("down: %d\n", (int)len);

if (con->content_type == APPLICATION_GRPC)
{
// grpc_message_hd
if (len >= GRPC_MESSAGE_HDLEN)

if (stream->temp_buf == NULL)
{
// grpc_message_hd
if (len < GRPC_MESSAGE_HDLEN)
return 0;

grpc_message_hd msghd;
grpc_message_hd_unpack(&msghd, data);
LOGD("grpc_message_hd: flags=%d length=%d\n", msghd.flags, msghd.length);
// LOGD("grpc_message_hd: flags=%d length=%d\n", msghd.flags, msghd.length);
data += GRPC_MESSAGE_HDLEN;
len -= GRPC_MESSAGE_HDLEN;
// LOGD("%.*s\n", (int)len, data);

shift_buffer_t *buf = popBuffer(buffer_pools[con->line->tid]);
shiftl(buf, lCap(buf) / 1.25); // use some unused space
setLen(buf, msghd.length);
memcpy(rawBuf(buf), data, len);

if (msghd.length > len)
{
stream->temp_buf = buf;
stream->bytes_needed = msghd.length - len;
return 0;
}
context_t *stream_data = newContext(stream->line);
stream_data->payload = buf;
stream_data->src_io = con->io;
stream->tunnel->downStream(stream->tunnel, stream_data);
}
else
{
memcpy(rawBuf(stream->temp_buf) + (bufLen(stream->temp_buf) - stream->bytes_needed), data, len);
stream->bytes_needed -= len;
if (stream->bytes_needed == 0)
{
context_t *stream_data = newContext(stream->line);
stream_data->payload = stream->temp_buf;
stream_data->src_io = con->io;
stream->temp_buf = NULL;

stream->tunnel->downStream(stream->tunnel, stream_data);
}
}
}
else
{
shift_buffer_t *buf = popBuffer(buffer_pools[con->line->tid]);
shiftl(buf, lCap(buf) / 1.25); // use some unused space
setLen(buf, len);
memcpy(rawBuf(buf), data, len);
context_t *stream_data = newContext(stream->line);
stream_data->payload = buf;
stream_data->src_io = con->io;
stream->tunnel->downStream(stream->tunnel, stream_data);
}

shift_buffer_t *buf = popBuffer(buffer_pools[con->line->tid]);
shiftl(buf, lCap(buf) / 1.25); // use some unused space
setLen(buf, len);
memcpy(rawBuf(buf), data, len);
context_t *dw_data = newContext(stream->line);
dw_data->payload = buf;
dw_data->src_io = con->io;
stream->tunnel->downStream(stream->tunnel, dw_data);
// if (hp->parsed->http_cb)
// {
// hp->parsed->http_cb(hp->parsed, HP_BODY, (const char *)data, len);
// }
// else
// {
// hp->parsed->body.append((const char *)data, len);
// }
return 0;
}

Expand Down Expand Up @@ -276,6 +314,19 @@ static int on_frame_recv_callback(nghttp2_session *session,
default:
break;
}
if ((frame->hd.flags & HTTP2_FLAG_END_STREAM) == HTTP2_FLAG_END_STREAM)
{
http2_client_child_con_state_t *stream =
nghttp2_session_get_stream_user_data(session, frame->hd.stream_id);
if (!stream)
return 0;
nghttp2_session_set_stream_user_data(con->session, stream->stream_id, NULL);
context_t *fc = newFinContext(stream->line);
tunnel_t *dest = stream->tunnel;
remove_stream(con, stream);
delete_http2_stream(stream);
dest->downStream(dest, fc);
}

if ((frame->hd.type & NGHTTP2_HEADERS) == NGHTTP2_HEADERS)
{
Expand All @@ -289,10 +340,14 @@ static int on_frame_recv_callback(nghttp2_session *session,
else if ((frame->hd.flags & HTTP2_FLAG_END_STREAM) == HTTP2_FLAG_END_STREAM)
{
http2_client_child_con_state_t *stream = nghttp2_session_get_stream_user_data(con->session, frame->hd.stream_id);
if (stream == NULL)
return 0;
context_t *fin_ctx = newFinContext(stream->line);
tunnel_t *dest = stream->tunnel;
remove_stream(con, stream);
delete_http2_stream(stream);
stream->tunnel->downStream(stream->tunnel, fin_ctx);
CSTATE_MUT(fin_ctx) = NULL;
dest->downStream(dest, fin_ctx);
}
}

Expand Down Expand Up @@ -326,42 +381,34 @@ static inline void upStream(tunnel_t *self, context_t *c)
if (c->init)
{
http2_client_con_state_t *con = take_http2_connection(self, c->line->tid, NULL);
context_t *g = newContext(con->line);
http2_client_child_con_state_t *stream = create_http2_stream(con, c->line, c->src_io);
CSTATE_MUT(c) = stream;

if (!con->init_sent)
{
con->init_sent = true;
self->up->upStream(self->up, newInitContext(con->line));
if (!ISALIVE(g))
if (!ISALIVE(c))
{
destroyContext(g);
destroyContext(c);
return;
}
}

while (trySendRequest(self, con, 0, NULL, NULL))
;

if (!ISALIVE(g))
{
destroyContext(g);
destroyContext(c);
return;
}
http2_client_child_con_state_t *stream = create_http2_stream(con, c->line, c->src_io);
CSTATE_MUT(c) = stream;
destroyContext(g);
if (!ISALIVE(c))
{
destroyContext(c);
return;
}

if (!ISALIVE(c))
if (con->childs_added >= MAX_CHILD_PER_STREAM && con->root.next == NULL && ISALIVE(c))
{
delete_http2_stream(stream);
destroyContext(c);
return;
context_t *fin_ctx = newFinContext(con->line);
delete_http2_connection(con);
con->tunnel->up->upStream(con->tunnel->up, fin_ctx);
}

while (trySendRequest(self, con, stream->stream_id, NULL, NULL))
;
destroyContext(c);
}
else if (c->fin)
Expand All @@ -376,9 +423,8 @@ static inline void upStream(tunnel_t *self, context_t *c)
nghttp2_session_set_stream_user_data(con->session, stream->stream_id, NULL);
remove_stream(con, stream);
delete_http2_stream(stream);

if (nghttp2_session_want_read(con->session) == 0 &&
nghttp2_session_want_write(con->session) == 0)
CSTATE_MUT(c) = NULL;
if (con->childs_added >= MAX_CHILD_PER_STREAM && con->root.next == NULL && ISALIVE(c))
{
context_t *fin_ctx = newFinContext(con->line);
delete_http2_connection(con);
Expand All @@ -403,20 +449,34 @@ static inline void downStream(tunnel_t *self, context_t *c)
con->state = H2_WANT_RECV;
size_t len = bufLen(c->payload);
size_t ret = nghttp2_session_mem_recv2(con->session, (const uint8_t *)rawBuf(c->payload), len);
assert(ret == len);
DISCARD_CONTEXT(c);

while (trySendRequest(self, con, 0, NULL, NULL))
if (!ISALIVE(c))
{
destroyContext(c);
return;
}

// if (con->childs_added >= MAX_CHILD_PER_STREAM && con->root.next == NULL && ISALIVE(c))
// {
// context_t *fin_ctx = newFinContext(con->line);
// delete_http2_connection(con);
// con->tunnel->up->upStream(con->tunnel->up, fin_ctx);
// }

if (!ISALIVE(c))
{
destroyContext(c);
return;
}

if (ret != len)
{
context_t *fin_ctx = newFinContext(con->line);
delete_http2_connection(con);
con->tunnel->upStream(con->tunnel, fin_ctx);
}
// {
// context_t *fin_ctx = newFinContext(con->line);
// delete_http2_connection(con);
// con->tunnel->upStream(con->tunnel, fin_ctx);
// }
destroyContext(c);
}
else
Expand Down
Loading

0 comments on commit c702395

Please sign in to comment.