From fbd4dc68722315e2562f3188b14ba5f9f085d359 Mon Sep 17 00:00:00 2001 From: Radkesvat <134321679+radkesvat@users.noreply.github.com> Date: Tue, 16 Jul 2024 15:10:47 +0000 Subject: [PATCH] rework http2/mux --- tunnels/client/http2/helpers.h | 32 ++++++++++++++++------- tunnels/client/http2/http2_client.c | 22 +++++++++++----- tunnels/client/http2/types.h | 3 ++- tunnels/client/protobuf/protobuf_client.c | 4 +-- tunnels/server/http2/http2_server.c | 3 +++ tunnels/server/protobuf/protobuf_server.c | 4 +-- 6 files changed, 47 insertions(+), 21 deletions(-) diff --git a/tunnels/client/http2/helpers.h b/tunnels/client/http2/helpers.h index 019381b1..c79c76f3 100644 --- a/tunnels/client/http2/helpers.h +++ b/tunnels/client/http2/helpers.h @@ -43,16 +43,26 @@ static void onStreamLineResumed(void *arg) static void onH2LinePaused(void *arg) { - http2_client_con_state_t *con = (http2_client_con_state_t *) arg; + http2_client_con_state_t *con = (http2_client_con_state_t *) arg; + tunnel_t *self = con->tunnel; + + line_t *stream_line = con->current_stream_write_line; + if (stream_line && isAlive(stream_line)) + { + http2_client_child_con_state_t *stream = LSTATE(stream_line); + stream->paused = true; + pauseLineDownSide(stream->line); + } + // ++(con->pause_counter); // if (con->pause_counter > 8) // { - http2_client_child_con_state_t *stream_i; - for (stream_i = con->root.next; stream_i;) - { - pauseLineDownSide(stream_i->line); - stream_i = stream_i->next; - } + // http2_client_child_con_state_t *stream_i; + // for (stream_i = con->root.next; stream_i;) + // { + // pauseLineDownSide(stream_i->line); + // stream_i = stream_i->next; + // } // } } @@ -63,7 +73,11 @@ static void onH2LineResumed(void *arg) http2_client_child_con_state_t *stream_i; for (stream_i = con->root.next; stream_i;) { - resumeLineDownSide(stream_i->line); + if (stream_i->paused) + { + stream_i->paused = false; + resumeLineDownSide(stream_i->line); + } stream_i = stream_i->next; } } @@ -214,7 +228,7 @@ static void deleteHttp2Connection(http2_client_con_state_t *con) } unLockLine(k.ref->stream_line); } - + action_queue_t_drop(&con->actions); doneLineDownSide(con->line); LSTATE_DROP(con->line); diff --git a/tunnels/client/http2/http2_client.c b/tunnels/client/http2/http2_client.c index 26c5e276..67fa430f 100644 --- a/tunnels/client/http2/http2_client.c +++ b/tunnels/client/http2/http2_client.c @@ -19,6 +19,10 @@ static int onStreamClosedCallback(nghttp2_session *session, int32_t stream_id, u http2_client_con_state_t *con = (http2_client_con_state_t *) userdata; http2_client_child_con_state_t *stream = nghttp2_session_get_stream_user_data(session, stream_id); // LOGD("callback end stream for: %d", stream_id); + + // todo (optimize) nghttp2 is calling this callback even if we close the con ourselves + // this should be omitted + if (! stream) { return 0; @@ -110,8 +114,9 @@ static int onDataChunkRecvCallback(nghttp2_session *session, uint8_t flags, int3 writeRaw(buf, data, len); lockLine(stream->line); - action_queue_t_push(&con->actions, - (http2_action_t) {.action_id = kActionStreamDataReceived, .stream_line = stream->line, .buf = buf}); + action_queue_t_push( + &con->actions, + (http2_action_t) {.action_id = kActionStreamDataReceived, .stream_line = stream->line, .buf = buf}); return 0; } @@ -185,7 +190,6 @@ static int onFrameRecvCallback(nghttp2_session *session, const nghttp2_frame *fr return 0; } - static void sendStreamData(http2_client_con_state_t *con, http2_client_child_con_state_t *stream, shift_buffer_t *buf) { http2_flag flags = kHttP2FlagNone; @@ -212,9 +216,13 @@ static void sendStreamData(http2_client_con_state_t *con, http2_client_child_con framehd.stream_id = stream->stream_id; shiftl(buf, HTTP2_FRAME_HDLEN); http2FrameHdPack(&framehd, rawBufMut(buf)); - context_t *data = newContext(con->line); - data->payload = buf; + context_t *data = newContext(con->line); + data->payload = buf; + con->current_stream_write_line = stream->line; + lockLine(stream->line); con->tunnel->up->upStream(con->tunnel->up, data); + unLockLine(stream->line); + con->current_stream_write_line = NULL; } static bool sendNgHttp2Data(tunnel_t *self, http2_client_con_state_t *con) @@ -310,7 +318,7 @@ static void doHttp2Action(const http2_action_t action, http2_client_con_state_t { shift_buffer_t *buf = action.buf; context_t *stream_data = newContext(stream->line); - stream_data->payload = buf; + stream_data->payload = buf; stream->tunnel->dw->downStream(stream->tunnel->dw, stream_data); } if (! isAlive(action.stream_line) || ! isAlive(main_line)) @@ -331,7 +339,7 @@ static void doHttp2Action(const http2_action_t action, http2_client_con_state_t } break; case kActionConData: { - sendStreamData(con,stream,action.buf); + sendStreamData(con, stream, action.buf); } break; case kActionInvalid: diff --git a/tunnels/client/http2/types.h b/tunnels/client/http2/types.h index aad52b0a..848ef727 100644 --- a/tunnels/client/http2/types.h +++ b/tunnels/client/http2/types.h @@ -7,7 +7,6 @@ #include "loggers/network_logger.h" #include "nghttp2/nghttp2.h" - enum http2_actions { kActionInvalid, @@ -38,6 +37,7 @@ typedef struct http2_client_child_con_state_s line_t *line; size_t grpc_bytes_needed; int32_t stream_id; + bool paused; } http2_client_child_con_state_t; @@ -50,6 +50,7 @@ typedef struct http2_client_con_state_s htimer_t *ping_timer; tunnel_t *tunnel; line_t *line; + line_t *current_stream_write_line; const char *path; const char *host; // authority const char *scheme; diff --git a/tunnels/client/protobuf/protobuf_client.c b/tunnels/client/protobuf/protobuf_client.c index 57b5aa38..844b61e7 100644 --- a/tunnels/client/protobuf/protobuf_client.c +++ b/tunnels/client/protobuf/protobuf_client.c @@ -18,7 +18,7 @@ enum { kMaxPacketSize = (65536 * 1), kMaxRecvBeforeAck = (1 << 15), - kMaxSendBeforeAck = (1 << 21) + kMaxSendBeforeAck = (1 << 20) }; typedef struct protobuf_client_state_s @@ -124,7 +124,7 @@ static void downStream(tunnel_t *self, context_t *c) shiftr(full_data, sizeof(uint32_t)); cstate->bytes_sent_nack -= consumed; - if (cstate->bytes_sent_nack <=kMaxSendBeforeAck / 3) + if (cstate->bytes_sent_nack <=kMaxSendBeforeAck / 2) { resumeLineDownSide(c->line); } diff --git a/tunnels/server/http2/http2_server.c b/tunnels/server/http2/http2_server.c index 33919780..0de8e159 100644 --- a/tunnels/server/http2/http2_server.c +++ b/tunnels/server/http2/http2_server.c @@ -15,6 +15,9 @@ static int onStreamClosedCallback(nghttp2_session *session, int32_t stream_id, u http2_server_child_con_state_t *stream = nghttp2_session_get_stream_user_data(session, stream_id); // LOGD("callback end stream for: %d", stream_id); + // todo (optimize) nghttp2 is calling this callback even if we close the con ourselves + // this should be omitted + if (! stream) { return 0; diff --git a/tunnels/server/protobuf/protobuf_server.c b/tunnels/server/protobuf/protobuf_server.c index a598d811..3339d1db 100644 --- a/tunnels/server/protobuf/protobuf_server.c +++ b/tunnels/server/protobuf/protobuf_server.c @@ -20,7 +20,7 @@ enum { kMaxPacketSize = (65536 * 1), kMaxRecvBeforeAck = (1 << 15), - kMaxSendBeforeAck = (1 << 21) + kMaxSendBeforeAck = (1 << 20) }; typedef struct protobuf_server_state_s @@ -94,7 +94,7 @@ static void upStream(tunnel_t *self, context_t *c) cstate->bytes_sent_nack -= consumed; - if (cstate->bytes_sent_nack <= kMaxSendBeforeAck / 3) + if (cstate->bytes_sent_nack <= kMaxSendBeforeAck / 2) { resumeLineUpSide(c->line); }