Skip to content

Commit

Permalink
finish h2 flowcontrol
Browse files Browse the repository at this point in the history
  • Loading branch information
radkesvat committed Jul 7, 2024
1 parent 333389c commit aa7cbcc
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 18 deletions.
27 changes: 16 additions & 11 deletions tunnels/client/http2/http2_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -188,20 +188,25 @@ static int onHeaderCallback(nghttp2_session *session, const nghttp2_frame *frame
// // req->headers["Host"] = value;
// }
// }
http2_client_child_con_state_t *stream = nghttp2_session_get_stream_user_data(session, frame->hd.stream_id);
if (stream)

if (strcmp((const char *) name, "grpc-ack") == 0)
{
if (strcmp((const char *) name, "grpc-ack") == 0)
http2_client_child_con_state_t *stream = nghttp2_session_get_stream_user_data(session, frame->hd.stream_id);
if (stream)
{
const int consumed = atoi((const char *) value);
if (stream->bytes_sent_nack > kMaxSendBeforeAck)
if (stream->bytes_sent_nack >= kMaxSendBeforeAck)
{
stream->bytes_sent_nack -= consumed;
if (stream->bytes_sent_nack < kMaxSendBeforeAck / 2)
if (stream->bytes_sent_nack < kMaxSendBeforeAck)
{
resumeLineDownSide(stream->line);
}
}
else
{
stream->bytes_sent_nack -= consumed;
}
}
}

Expand Down Expand Up @@ -253,14 +258,14 @@ static int onDataChunkRecvCallback(nghttp2_session *session, uint8_t flags, int3
stream_data->payload = gdata_buf;

stream->bytes_received_nack += bufLen(gdata_buf);
if (stream->bytes_sent_nack >= kMaxRecvBeforeAck)
if (stream->bytes_received_nack >= kMaxRecvBeforeAck)
{
nghttp2_nv nvs[1];
char stri[32];
sprintf(stri, "%d", (int) stream->bytes_sent_nack);
sprintf(stri, "%d", (int) stream->bytes_received_nack);
nvs[0] = makeNV("grpc-ack", stri);
nghttp2_submit_headers(con->session, NGHTTP2_FLAG_END_HEADERS, stream_id, NULL, &nvs[0], 1, NULL);
stream->bytes_sent_nack = 0;
stream->bytes_received_nack = 0;
}

stream->tunnel->dw->downStream(stream->tunnel->dw, stream_data);
Expand All @@ -282,14 +287,14 @@ static int onDataChunkRecvCallback(nghttp2_session *session, uint8_t flags, int3
stream_data->payload = buf;

stream->bytes_received_nack += bufLen(buf);
if (stream->bytes_sent_nack >= kMaxRecvBeforeAck)
if (stream->bytes_received_nack >= kMaxRecvBeforeAck)
{
nghttp2_nv nvs[1];
char stri[32];
sprintf(stri, "%d", (int) stream->bytes_sent_nack);
sprintf(stri, "%d", (int) stream->bytes_received_nack);
nvs[0] = makeNV("grpc-ack", stri);
nghttp2_submit_headers(con->session, NGHTTP2_FLAG_END_HEADERS, stream_id, NULL, &nvs[0], 1, NULL);
stream->bytes_sent_nack = 0;
stream->bytes_received_nack = 0;
}

stream->tunnel->dw->downStream(stream->tunnel->dw, stream_data);
Expand Down
10 changes: 5 additions & 5 deletions tunnels/server/http2/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,22 +58,22 @@ static void onH2LinePaused(void *arg)
{
http2_server_con_state_t *con = (http2_server_con_state_t *) arg;

++(con->pause_counter);
if (con->pause_counter > 4)
{
// ++(con->pause_counter);
// if (con->pause_counter > 4)
// {
http2_server_child_con_state_t *stream_i;
for (stream_i = con->root.next; stream_i;)
{
pauseLineUpSide(stream_i->line);
stream_i = stream_i->next;
}
}
// }
}

static void onH2LineResumed(void *arg)
{
http2_server_con_state_t *con = (http2_server_con_state_t *) arg;
con->pause_counter = con->pause_counter > 0 ? (con->pause_counter - 1) : con->pause_counter;
// con->pause_counter = con->pause_counter > 0 ? (con->pause_counter - 1) : con->pause_counter;
http2_server_child_con_state_t *stream_i;
for (stream_i = con->root.next; stream_i;)
{
Expand Down
52 changes: 50 additions & 2 deletions tunnels/server/http2/http2_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,26 @@ static int onHeaderCallback(nghttp2_session *session, const nghttp2_frame *frame
{
con->content_type = httpContentTypeEnum(value);
}
else if (strcmp(name, "grpc-ack") == 0)
{
http2_server_child_con_state_t *stream = nghttp2_session_get_stream_user_data(session, frame->hd.stream_id);
if (stream)
{
const int consumed = atoi(value);
if (stream->bytes_sent_nack >= kMaxSendBeforeAck)
{
stream->bytes_sent_nack -= consumed;
if (stream->bytes_sent_nack < kMaxSendBeforeAck)
{
resumeLineUpSide(stream->line);
}
}
else
{
stream->bytes_sent_nack -= consumed;
}
}
}
}

return 0;
Expand Down Expand Up @@ -109,6 +129,17 @@ static int onDataChunkRecvCallback(nghttp2_session *session, uint8_t flags, int3
stream->first_sent = true;
stream_data->first = true;
}
stream->bytes_received_nack += bufLen(buf);
if (stream->bytes_received_nack >= kMaxRecvBeforeAck)
{
nghttp2_nv nvs[1];
char stri[32];
sprintf(stri, "%d", (int) stream->bytes_received_nack);
nvs[0] = makeNV("grpc-ack", stri);
nghttp2_submit_headers(con->session, NGHTTP2_FLAG_END_HEADERS, stream_id, NULL, &nvs[0], 1, NULL);
stream->bytes_received_nack = 0;
}

stream->tunnel->up->upStream(stream->tunnel->up, stream_data);

if (nghttp2_session_get_stream_user_data(session, stream_id))
Expand All @@ -132,6 +163,17 @@ static int onDataChunkRecvCallback(nghttp2_session *session, uint8_t flags, int3
stream->first_sent = true;
stream_data->first = true;
}
stream->bytes_received_nack += bufLen(buf);
if (stream->bytes_received_nack >= kMaxRecvBeforeAck)
{
nghttp2_nv nvs[1];
char stri[32];
sprintf(stri, "%d", (int) stream->bytes_received_nack);
nvs[0] = makeNV("grpc-ack", stri);
nghttp2_submit_headers(con->session, NGHTTP2_FLAG_END_HEADERS, stream_id, NULL, &nvs[0], 1, NULL);
stream->bytes_received_nack = 0;
}

stream->tunnel->up->upStream(stream->tunnel->up, stream_data);
}

Expand Down Expand Up @@ -260,6 +302,12 @@ static bool trySendResponse(tunnel_t *self, http2_server_con_state_t *con, size_
// HTTP2 DATA framehd
con->state = kH2SendData;

http2_server_child_con_state_t *stream = nghttp2_session_get_stream_user_data(con->session, stream_id);
stream->bytes_sent_nack += bufLen(buf);
if (stream->bytes_sent_nack > kMaxSendBeforeAck)
{
pauseLineUpSide(stream->line);
}
// LOGD("HTTP2 SEND_DATA_FRAME_HD...\n");
if (con->content_type == kApplicationGrpc)
{
Expand Down Expand Up @@ -478,7 +526,7 @@ api_result_t apiHttp2Server(tunnel_t *self, const char *msg)
{
(void) (self);
(void) (msg);
return (api_result_t){0};
return (api_result_t) {0};
}

tunnel_t *destroyHttp2Server(tunnel_t *self)
Expand All @@ -490,5 +538,5 @@ tunnel_t *destroyHttp2Server(tunnel_t *self)

tunnel_metadata_t getMetadataHttp2Server(void)
{
return (tunnel_metadata_t){.version = 0001, .flags = 0x0};
return (tunnel_metadata_t) {.version = 0001, .flags = 0x0};
}
9 changes: 9 additions & 0 deletions tunnels/server/http2/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ typedef enum
kH2RecvData,
} http2_session_state;

enum
{

kMaxRecvBeforeAck = (1 << 16),
kMaxSendBeforeAck = (1 << 20)
};

typedef struct http2_server_child_con_state_s
{
struct http2_server_child_con_state_s *prev, *next;
Expand All @@ -35,6 +42,8 @@ typedef struct http2_server_child_con_state_s
line_t *parent;
line_t *line;
tunnel_t *tunnel;
size_t bytes_sent_nack;
size_t bytes_received_nack;
} http2_server_child_con_state_t;

typedef struct http2_server_con_state_s
Expand Down

0 comments on commit aa7cbcc

Please sign in to comment.