Skip to content

Commit

Permalink
flow control h2 client
Browse files Browse the repository at this point in the history
  • Loading branch information
radkesvat committed Jul 7, 2024
1 parent 9188002 commit 08f8f56
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 12 deletions.
60 changes: 54 additions & 6 deletions tunnels/client/http2/http2_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ static void sendGrpcFinalData(tunnel_t *self, line_t *line, size_t stream_id)
self->up->upStream(self->up, endstream_ctx);
}

static bool trySendRequest(tunnel_t *self, http2_client_con_state_t *con, size_t stream_id, shift_buffer_t *buf)
static bool trySendRequest(tunnel_t *self, http2_client_con_state_t *con, int32_t stream_id, shift_buffer_t *buf)
{
line_t *line = con->line;

Expand Down Expand Up @@ -71,6 +71,13 @@ static bool trySendRequest(tunnel_t *self, http2_client_con_state_t *con, size_t
// HTTP2 DATA framehd
con->state = kH2SendData;

http2_client_child_con_state_t *stream = nghttp2_session_get_stream_user_data(con->session, stream_id);
stream->bytes_sent_nack += bufLen(buf);
if (stream->bytes_sent_nack > kMaxSendBeforeAck)
{
pauseLineDownSide(stream->line);
}

// LOGD("HTTP2 SEND_DATA_FRAME_HD...\n");
if (con->content_type == kApplicationGrpc)
{
Expand All @@ -94,6 +101,7 @@ static bool trySendRequest(tunnel_t *self, http2_client_con_state_t *con, size_t
framehd.stream_id = stream_id;
shiftl(buf, HTTP2_FRAME_HDLEN);
http2FrameHdPack(&framehd, rawBufMut(buf));

context_t *req = newContext(line);
req->payload = buf;
self->up->upStream(self->up, req);
Expand Down Expand Up @@ -140,7 +148,7 @@ static void flushWriteQueue(http2_client_con_state_t *con)
static int onHeaderCallback(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name, size_t namelen,
const uint8_t *value, size_t valuelen, uint8_t flags, void *userdata)
{
(void) name;
// (void) name;
(void) session;
(void) namelen;
(void) value;
Expand All @@ -152,7 +160,7 @@ static int onHeaderCallback(nghttp2_session *session, const nghttp2_frame *frame
}

// LOGD("onHeaderCallback\n");
printFrameHd(&frame->hd);
// printFrameHd(&frame->hd);
// const char *name = (const char *) _name;
// const char *value = (const char *) _value;
// LOGD("%s: %s\n", name, value);
Expand Down Expand Up @@ -180,6 +188,22 @@ static int onHeaderCallback(nghttp2_session *session, const nghttp2_frame *frame
// // req->headers["Host"] = value;
// }
// }
http2_client_child_con_state_t *stream = nghttp2_session_get_stream_user_data(session, frame->hd.stream_id);
if (stream)
{
if (strcmp((const char *) name, "grpc-ack") == 0)
{
const int consumed = atoi((const char *) value);
if (stream->bytes_sent_nack > kMaxSendBeforeAck)
{
stream->bytes_sent_nack -= consumed;
if (stream->bytes_sent_nack < kMaxSendBeforeAck / 2)
{
resumeLineDownSide(stream->line);
}
}
}
}

return 0;
}
Expand Down Expand Up @@ -227,6 +251,18 @@ static int onDataChunkRecvCallback(nghttp2_session *session, uint8_t flags, int3
stream->bytes_needed = 0;
context_t *stream_data = newContext(stream->line);
stream_data->payload = gdata_buf;

stream->bytes_received_nack += bufLen(gdata_buf);
if (stream->bytes_sent_nack >= kMaxRecvBeforeAck)
{
nghttp2_nv nvs[1];
char stri[32];
sprintf(stri, "%d", (int) stream->bytes_sent_nack);
nvs[0] = makeNV("grpc-ack", stri);
nghttp2_submit_headers(con->session, NGHTTP2_FLAG_END_HEADERS, stream_id, NULL, &nvs[0], 1, NULL);
stream->bytes_sent_nack = 0;
}

stream->tunnel->dw->downStream(stream->tunnel->dw, stream_data);

if (nghttp2_session_get_stream_user_data(session, stream_id))
Expand All @@ -244,6 +280,18 @@ static int onDataChunkRecvCallback(nghttp2_session *session, uint8_t flags, int3
writeRaw(buf, data, len);
context_t *stream_data = newContext(stream->line);
stream_data->payload = buf;

stream->bytes_received_nack += bufLen(buf);
if (stream->bytes_sent_nack >= kMaxRecvBeforeAck)
{
nghttp2_nv nvs[1];
char stri[32];
sprintf(stri, "%d", (int) stream->bytes_sent_nack);
nvs[0] = makeNV("grpc-ack", stri);
nghttp2_submit_headers(con->session, NGHTTP2_FLAG_END_HEADERS, stream_id, NULL, &nvs[0], 1, NULL);
stream->bytes_sent_nack = 0;
}

stream->tunnel->dw->downStream(stream->tunnel->dw, stream_data);
}

Expand Down Expand Up @@ -510,7 +558,7 @@ tunnel_t *newHttp2Client(node_instance_context_t *instance_info)

for (size_t i = 0; i < workers_count; i++)
{
state->thread_cpool[i] = (thread_connection_pool_t){.round_index = 0, .cons = vec_cons_with_capacity(8)};
state->thread_cpool[i] = (thread_connection_pool_t) {.round_index = 0, .cons = vec_cons_with_capacity(8)};
}

if (! getStringFromJsonObject(&(state->host), settings, "host"))
Expand Down Expand Up @@ -557,7 +605,7 @@ api_result_t apiHttp2Client(tunnel_t *self, const char *msg)
{
(void) (self);
(void) (msg);
return (api_result_t){0};
return (api_result_t) {0};
}

tunnel_t *destroyHttp2Client(tunnel_t *self)
Expand All @@ -568,5 +616,5 @@ tunnel_t *destroyHttp2Client(tunnel_t *self)

tunnel_metadata_t getMetadataHttp2Client(void)
{
return (tunnel_metadata_t){.version = 0001, .flags = 0x0};
return (tunnel_metadata_t) {.version = 0001, .flags = 0x0};
}
12 changes: 11 additions & 1 deletion tunnels/client/http2/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,26 @@ typedef enum
kH2RecvData,
} http2_session_state;

enum{

kMaxRecvBeforeAck = (1 << 16),
kMaxSendBeforeAck = (1 << 20)
};

typedef struct http2_client_child_con_state_s
{
struct http2_client_child_con_state_s *prev, *next;
int32_t stream_id;
nghttp2_stream *ng_stream;
buffer_stream_t *chunkbs; // used for grpc
size_t bytes_needed;
tunnel_t *tunnel;
line_t *parent;
line_t *line;
size_t bytes_needed;
size_t bytes_sent_nack;
size_t bytes_received_nack;



} http2_client_child_con_state_t;

Expand Down
2 changes: 1 addition & 1 deletion tunnels/server/http2/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

#define kMaxConcurrentStreams 0xffffffffU // NOLINT

static nghttp2_nv makeNv(const char *name, const char *value)
static nghttp2_nv makeNV(const char *name, const char *value)
{
nghttp2_nv nv;
nv.name = (uint8_t *) name;
Expand Down
8 changes: 4 additions & 4 deletions tunnels/server/http2/http2_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,12 @@ static int onFrameRecvCallback(nghttp2_session *session, const nghttp2_frame *fr

nghttp2_nv nvs[10];
int nvlen = 0;
nvs[nvlen++] = makeNv(":status", "200");
nvs[nvlen++] = makeNV(":status", "200");
if (con->content_type == kApplicationGrpc)
{
// correct content_type: application/grpc
nvs[nvlen++] = makeNv("content-type", httpContentTypeStr(kApplicationGrpc));
nvs[nvlen++] = makeNv("accept-encoding", "identity");
nvs[nvlen++] = makeNV("content-type", httpContentTypeStr(kApplicationGrpc));
nvs[nvlen++] = makeNV("accept-encoding", "identity");
}

int flags = NGHTTP2_FLAG_END_HEADERS;
Expand Down Expand Up @@ -403,7 +403,7 @@ static void downStream(tunnel_t *self, context_t *c)
int flags = NGHTTP2_FLAG_END_STREAM | NGHTTP2_FLAG_END_HEADERS;
if (con->content_type == kApplicationGrpc)
{
nghttp2_nv nv = makeNv("grpc-status", "0");
nghttp2_nv nv = makeNV("grpc-status", "0");
nghttp2_submit_headers(con->session, flags, stream->stream_id, NULL, &nv, 1, NULL);
}
else
Expand Down

0 comments on commit 08f8f56

Please sign in to comment.