diff --git a/tunnels/client/http2/http2_client.c b/tunnels/client/http2/http2_client.c index 14ab1b8f..3abe4632 100644 --- a/tunnels/client/http2/http2_client.c +++ b/tunnels/client/http2/http2_client.c @@ -27,7 +27,7 @@ static void sendGrpcFinalData(tunnel_t *self, line_t *line, size_t stream_id) self->up->upStream(self->up, endstream_ctx); } -static bool trySendRequest(tunnel_t *self, http2_client_con_state_t *con, size_t stream_id, shift_buffer_t *buf) +static bool trySendRequest(tunnel_t *self, http2_client_con_state_t *con, int32_t stream_id, shift_buffer_t *buf) { line_t *line = con->line; @@ -71,6 +71,13 @@ static bool trySendRequest(tunnel_t *self, http2_client_con_state_t *con, size_t // HTTP2 DATA framehd con->state = kH2SendData; + http2_client_child_con_state_t *stream = nghttp2_session_get_stream_user_data(con->session, stream_id); + stream->bytes_sent_nack += bufLen(buf); + if (stream->bytes_sent_nack > kMaxSendBeforeAck) + { + pauseLineDownSide(stream->line); + } + // LOGD("HTTP2 SEND_DATA_FRAME_HD...\n"); if (con->content_type == kApplicationGrpc) { @@ -94,6 +101,7 @@ static bool trySendRequest(tunnel_t *self, http2_client_con_state_t *con, size_t framehd.stream_id = stream_id; shiftl(buf, HTTP2_FRAME_HDLEN); http2FrameHdPack(&framehd, rawBufMut(buf)); + context_t *req = newContext(line); req->payload = buf; self->up->upStream(self->up, req); @@ -140,7 +148,7 @@ static void flushWriteQueue(http2_client_con_state_t *con) static int onHeaderCallback(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name, size_t namelen, const uint8_t *value, size_t valuelen, uint8_t flags, void *userdata) { - (void) name; + // (void) name; (void) session; (void) namelen; (void) value; @@ -152,7 +160,7 @@ static int onHeaderCallback(nghttp2_session *session, const nghttp2_frame *frame } // LOGD("onHeaderCallback\n"); - printFrameHd(&frame->hd); + // printFrameHd(&frame->hd); // const char *name = (const char *) _name; // const char *value = (const char *) _value; // LOGD("%s: %s\n", name, value); @@ -180,6 +188,22 @@ static int onHeaderCallback(nghttp2_session *session, const nghttp2_frame *frame // // req->headers["Host"] = value; // } // } + http2_client_child_con_state_t *stream = nghttp2_session_get_stream_user_data(session, frame->hd.stream_id); + if (stream) + { + if (strcmp((const char *) name, "grpc-ack") == 0) + { + const int consumed = atoi((const char *) value); + if (stream->bytes_sent_nack > kMaxSendBeforeAck) + { + stream->bytes_sent_nack -= consumed; + if (stream->bytes_sent_nack < kMaxSendBeforeAck / 2) + { + resumeLineDownSide(stream->line); + } + } + } + } return 0; } @@ -227,6 +251,18 @@ static int onDataChunkRecvCallback(nghttp2_session *session, uint8_t flags, int3 stream->bytes_needed = 0; context_t *stream_data = newContext(stream->line); stream_data->payload = gdata_buf; + + stream->bytes_received_nack += bufLen(gdata_buf); + if (stream->bytes_sent_nack >= kMaxRecvBeforeAck) + { + nghttp2_nv nvs[1]; + char stri[32]; + sprintf(stri, "%d", (int) stream->bytes_sent_nack); + nvs[0] = makeNV("grpc-ack", stri); + nghttp2_submit_headers(con->session, NGHTTP2_FLAG_END_HEADERS, stream_id, NULL, &nvs[0], 1, NULL); + stream->bytes_sent_nack = 0; + } + stream->tunnel->dw->downStream(stream->tunnel->dw, stream_data); if (nghttp2_session_get_stream_user_data(session, stream_id)) @@ -244,6 +280,18 @@ static int onDataChunkRecvCallback(nghttp2_session *session, uint8_t flags, int3 writeRaw(buf, data, len); context_t *stream_data = newContext(stream->line); stream_data->payload = buf; + + stream->bytes_received_nack += bufLen(buf); + if (stream->bytes_sent_nack >= kMaxRecvBeforeAck) + { + nghttp2_nv nvs[1]; + char stri[32]; + sprintf(stri, "%d", (int) stream->bytes_sent_nack); + nvs[0] = makeNV("grpc-ack", stri); + nghttp2_submit_headers(con->session, NGHTTP2_FLAG_END_HEADERS, stream_id, NULL, &nvs[0], 1, NULL); + stream->bytes_sent_nack = 0; + } + stream->tunnel->dw->downStream(stream->tunnel->dw, stream_data); } @@ -510,7 +558,7 @@ tunnel_t *newHttp2Client(node_instance_context_t *instance_info) for (size_t i = 0; i < workers_count; i++) { - state->thread_cpool[i] = (thread_connection_pool_t){.round_index = 0, .cons = vec_cons_with_capacity(8)}; + state->thread_cpool[i] = (thread_connection_pool_t) {.round_index = 0, .cons = vec_cons_with_capacity(8)}; } if (! getStringFromJsonObject(&(state->host), settings, "host")) @@ -557,7 +605,7 @@ api_result_t apiHttp2Client(tunnel_t *self, const char *msg) { (void) (self); (void) (msg); - return (api_result_t){0}; + return (api_result_t) {0}; } tunnel_t *destroyHttp2Client(tunnel_t *self) @@ -568,5 +616,5 @@ tunnel_t *destroyHttp2Client(tunnel_t *self) tunnel_metadata_t getMetadataHttp2Client(void) { - return (tunnel_metadata_t){.version = 0001, .flags = 0x0}; + return (tunnel_metadata_t) {.version = 0001, .flags = 0x0}; } diff --git a/tunnels/client/http2/types.h b/tunnels/client/http2/types.h index 5a3f0e6d..fc07ac82 100644 --- a/tunnels/client/http2/types.h +++ b/tunnels/client/http2/types.h @@ -24,16 +24,26 @@ typedef enum kH2RecvData, } http2_session_state; +enum{ + + kMaxRecvBeforeAck = (1 << 16), + kMaxSendBeforeAck = (1 << 20) +}; + typedef struct http2_client_child_con_state_s { struct http2_client_child_con_state_s *prev, *next; int32_t stream_id; nghttp2_stream *ng_stream; buffer_stream_t *chunkbs; // used for grpc - size_t bytes_needed; tunnel_t *tunnel; line_t *parent; line_t *line; + size_t bytes_needed; + size_t bytes_sent_nack; + size_t bytes_received_nack; + + } http2_client_child_con_state_t; diff --git a/tunnels/server/http2/helpers.h b/tunnels/server/http2/helpers.h index 7e0814f6..3cb61d7b 100644 --- a/tunnels/server/http2/helpers.h +++ b/tunnels/server/http2/helpers.h @@ -5,7 +5,7 @@ #define kMaxConcurrentStreams 0xffffffffU // NOLINT -static nghttp2_nv makeNv(const char *name, const char *value) +static nghttp2_nv makeNV(const char *name, const char *value) { nghttp2_nv nv; nv.name = (uint8_t *) name; diff --git a/tunnels/server/http2/http2_server.c b/tunnels/server/http2/http2_server.c index a276a27e..0f0011f7 100644 --- a/tunnels/server/http2/http2_server.c +++ b/tunnels/server/http2/http2_server.c @@ -201,12 +201,12 @@ static int onFrameRecvCallback(nghttp2_session *session, const nghttp2_frame *fr nghttp2_nv nvs[10]; int nvlen = 0; - nvs[nvlen++] = makeNv(":status", "200"); + nvs[nvlen++] = makeNV(":status", "200"); if (con->content_type == kApplicationGrpc) { // correct content_type: application/grpc - nvs[nvlen++] = makeNv("content-type", httpContentTypeStr(kApplicationGrpc)); - nvs[nvlen++] = makeNv("accept-encoding", "identity"); + nvs[nvlen++] = makeNV("content-type", httpContentTypeStr(kApplicationGrpc)); + nvs[nvlen++] = makeNV("accept-encoding", "identity"); } int flags = NGHTTP2_FLAG_END_HEADERS; @@ -403,7 +403,7 @@ static void downStream(tunnel_t *self, context_t *c) int flags = NGHTTP2_FLAG_END_STREAM | NGHTTP2_FLAG_END_HEADERS; if (con->content_type == kApplicationGrpc) { - nghttp2_nv nv = makeNv("grpc-status", "0"); + nghttp2_nv nv = makeNV("grpc-status", "0"); nghttp2_submit_headers(con->session, flags, stream->stream_id, NULL, &nv, 1, NULL); } else