Skip to content

Commit

Permalink
rework http2/mux
Browse files Browse the repository at this point in the history
  • Loading branch information
radkesvat committed Jul 16, 2024
1 parent 8a1ddd5 commit fbd4dc6
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 21 deletions.
32 changes: 23 additions & 9 deletions tunnels/client/http2/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,26 @@ static void onStreamLineResumed(void *arg)

static void onH2LinePaused(void *arg)
{
http2_client_con_state_t *con = (http2_client_con_state_t *) arg;
http2_client_con_state_t *con = (http2_client_con_state_t *) arg;
tunnel_t *self = con->tunnel;

line_t *stream_line = con->current_stream_write_line;
if (stream_line && isAlive(stream_line))
{
http2_client_child_con_state_t *stream = LSTATE(stream_line);
stream->paused = true;
pauseLineDownSide(stream->line);
}

// ++(con->pause_counter);
// if (con->pause_counter > 8)
// {
http2_client_child_con_state_t *stream_i;
for (stream_i = con->root.next; stream_i;)
{
pauseLineDownSide(stream_i->line);
stream_i = stream_i->next;
}
// http2_client_child_con_state_t *stream_i;
// for (stream_i = con->root.next; stream_i;)
// {
// pauseLineDownSide(stream_i->line);
// stream_i = stream_i->next;
// }
// }
}

Expand All @@ -63,7 +73,11 @@ static void onH2LineResumed(void *arg)
http2_client_child_con_state_t *stream_i;
for (stream_i = con->root.next; stream_i;)
{
resumeLineDownSide(stream_i->line);
if (stream_i->paused)
{
stream_i->paused = false;
resumeLineDownSide(stream_i->line);
}
stream_i = stream_i->next;
}
}
Expand Down Expand Up @@ -214,7 +228,7 @@ static void deleteHttp2Connection(http2_client_con_state_t *con)
}
unLockLine(k.ref->stream_line);
}

action_queue_t_drop(&con->actions);
doneLineDownSide(con->line);
LSTATE_DROP(con->line);
Expand Down
22 changes: 15 additions & 7 deletions tunnels/client/http2/http2_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ static int onStreamClosedCallback(nghttp2_session *session, int32_t stream_id, u
http2_client_con_state_t *con = (http2_client_con_state_t *) userdata;
http2_client_child_con_state_t *stream = nghttp2_session_get_stream_user_data(session, stream_id);
// LOGD("callback end stream for: %d", stream_id);

// todo (optimize) nghttp2 is calling this callback even if we close the con ourselves
// this should be omitted

if (! stream)
{
return 0;
Expand Down Expand Up @@ -110,8 +114,9 @@ static int onDataChunkRecvCallback(nghttp2_session *session, uint8_t flags, int3
writeRaw(buf, data, len);
lockLine(stream->line);

action_queue_t_push(&con->actions,
(http2_action_t) {.action_id = kActionStreamDataReceived, .stream_line = stream->line, .buf = buf});
action_queue_t_push(
&con->actions,
(http2_action_t) {.action_id = kActionStreamDataReceived, .stream_line = stream->line, .buf = buf});

return 0;
}
Expand Down Expand Up @@ -185,7 +190,6 @@ static int onFrameRecvCallback(nghttp2_session *session, const nghttp2_frame *fr
return 0;
}


static void sendStreamData(http2_client_con_state_t *con, http2_client_child_con_state_t *stream, shift_buffer_t *buf)
{
http2_flag flags = kHttP2FlagNone;
Expand All @@ -212,9 +216,13 @@ static void sendStreamData(http2_client_con_state_t *con, http2_client_child_con
framehd.stream_id = stream->stream_id;
shiftl(buf, HTTP2_FRAME_HDLEN);
http2FrameHdPack(&framehd, rawBufMut(buf));
context_t *data = newContext(con->line);
data->payload = buf;
context_t *data = newContext(con->line);
data->payload = buf;
con->current_stream_write_line = stream->line;
lockLine(stream->line);
con->tunnel->up->upStream(con->tunnel->up, data);
unLockLine(stream->line);
con->current_stream_write_line = NULL;
}

static bool sendNgHttp2Data(tunnel_t *self, http2_client_con_state_t *con)
Expand Down Expand Up @@ -310,7 +318,7 @@ static void doHttp2Action(const http2_action_t action, http2_client_con_state_t
{
shift_buffer_t *buf = action.buf;
context_t *stream_data = newContext(stream->line);
stream_data->payload = buf;
stream_data->payload = buf;
stream->tunnel->dw->downStream(stream->tunnel->dw, stream_data);
}
if (! isAlive(action.stream_line) || ! isAlive(main_line))
Expand All @@ -331,7 +339,7 @@ static void doHttp2Action(const http2_action_t action, http2_client_con_state_t
}
break;
case kActionConData: {
sendStreamData(con,stream,action.buf);
sendStreamData(con, stream, action.buf);
}
break;
case kActionInvalid:
Expand Down
3 changes: 2 additions & 1 deletion tunnels/client/http2/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include "loggers/network_logger.h"
#include "nghttp2/nghttp2.h"


enum http2_actions
{
kActionInvalid,
Expand Down Expand Up @@ -38,6 +37,7 @@ typedef struct http2_client_child_con_state_s
line_t *line;
size_t grpc_bytes_needed;
int32_t stream_id;
bool paused;

} http2_client_child_con_state_t;

Expand All @@ -50,6 +50,7 @@ typedef struct http2_client_con_state_s
htimer_t *ping_timer;
tunnel_t *tunnel;
line_t *line;
line_t *current_stream_write_line;
const char *path;
const char *host; // authority
const char *scheme;
Expand Down
4 changes: 2 additions & 2 deletions tunnels/client/protobuf/protobuf_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ enum
{
kMaxPacketSize = (65536 * 1),
kMaxRecvBeforeAck = (1 << 15),
kMaxSendBeforeAck = (1 << 21)
kMaxSendBeforeAck = (1 << 20)
};

typedef struct protobuf_client_state_s
Expand Down Expand Up @@ -124,7 +124,7 @@ static void downStream(tunnel_t *self, context_t *c)
shiftr(full_data, sizeof(uint32_t));
cstate->bytes_sent_nack -= consumed;

if (cstate->bytes_sent_nack <=kMaxSendBeforeAck / 3)
if (cstate->bytes_sent_nack <=kMaxSendBeforeAck / 2)
{
resumeLineDownSide(c->line);
}
Expand Down
3 changes: 3 additions & 0 deletions tunnels/server/http2/http2_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ static int onStreamClosedCallback(nghttp2_session *session, int32_t stream_id, u
http2_server_child_con_state_t *stream = nghttp2_session_get_stream_user_data(session, stream_id);
// LOGD("callback end stream for: %d", stream_id);

// todo (optimize) nghttp2 is calling this callback even if we close the con ourselves
// this should be omitted

if (! stream)
{
return 0;
Expand Down
4 changes: 2 additions & 2 deletions tunnels/server/protobuf/protobuf_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ enum
{
kMaxPacketSize = (65536 * 1),
kMaxRecvBeforeAck = (1 << 15),
kMaxSendBeforeAck = (1 << 21)
kMaxSendBeforeAck = (1 << 20)
};

typedef struct protobuf_server_state_s
Expand Down Expand Up @@ -94,7 +94,7 @@ static void upStream(tunnel_t *self, context_t *c)

cstate->bytes_sent_nack -= consumed;

if (cstate->bytes_sent_nack <= kMaxSendBeforeAck / 3)
if (cstate->bytes_sent_nack <= kMaxSendBeforeAck / 2)
{
resumeLineUpSide(c->line);
}
Expand Down

0 comments on commit fbd4dc6

Please sign in to comment.