diff --git a/tunnels/client/http2/http2_client.c b/tunnels/client/http2/http2_client.c index 3abe4632..6fd0941d 100644 --- a/tunnels/client/http2/http2_client.c +++ b/tunnels/client/http2/http2_client.c @@ -188,20 +188,25 @@ static int onHeaderCallback(nghttp2_session *session, const nghttp2_frame *frame // // req->headers["Host"] = value; // } // } - http2_client_child_con_state_t *stream = nghttp2_session_get_stream_user_data(session, frame->hd.stream_id); - if (stream) + + if (strcmp((const char *) name, "grpc-ack") == 0) { - if (strcmp((const char *) name, "grpc-ack") == 0) + http2_client_child_con_state_t *stream = nghttp2_session_get_stream_user_data(session, frame->hd.stream_id); + if (stream) { const int consumed = atoi((const char *) value); - if (stream->bytes_sent_nack > kMaxSendBeforeAck) + if (stream->bytes_sent_nack >= kMaxSendBeforeAck) { stream->bytes_sent_nack -= consumed; - if (stream->bytes_sent_nack < kMaxSendBeforeAck / 2) + if (stream->bytes_sent_nack < kMaxSendBeforeAck) { resumeLineDownSide(stream->line); } } + else + { + stream->bytes_sent_nack -= consumed; + } } } @@ -253,14 +258,14 @@ static int onDataChunkRecvCallback(nghttp2_session *session, uint8_t flags, int3 stream_data->payload = gdata_buf; stream->bytes_received_nack += bufLen(gdata_buf); - if (stream->bytes_sent_nack >= kMaxRecvBeforeAck) + if (stream->bytes_received_nack >= kMaxRecvBeforeAck) { nghttp2_nv nvs[1]; char stri[32]; - sprintf(stri, "%d", (int) stream->bytes_sent_nack); + sprintf(stri, "%d", (int) stream->bytes_received_nack); nvs[0] = makeNV("grpc-ack", stri); nghttp2_submit_headers(con->session, NGHTTP2_FLAG_END_HEADERS, stream_id, NULL, &nvs[0], 1, NULL); - stream->bytes_sent_nack = 0; + stream->bytes_received_nack = 0; } stream->tunnel->dw->downStream(stream->tunnel->dw, stream_data); @@ -282,14 +287,14 @@ static int onDataChunkRecvCallback(nghttp2_session *session, uint8_t flags, int3 stream_data->payload = buf; stream->bytes_received_nack += bufLen(buf); - if (stream->bytes_sent_nack >= kMaxRecvBeforeAck) + if (stream->bytes_received_nack >= kMaxRecvBeforeAck) { nghttp2_nv nvs[1]; char stri[32]; - sprintf(stri, "%d", (int) stream->bytes_sent_nack); + sprintf(stri, "%d", (int) stream->bytes_received_nack); nvs[0] = makeNV("grpc-ack", stri); nghttp2_submit_headers(con->session, NGHTTP2_FLAG_END_HEADERS, stream_id, NULL, &nvs[0], 1, NULL); - stream->bytes_sent_nack = 0; + stream->bytes_received_nack = 0; } stream->tunnel->dw->downStream(stream->tunnel->dw, stream_data); diff --git a/tunnels/server/http2/helpers.h b/tunnels/server/http2/helpers.h index 3cb61d7b..6ff04cce 100644 --- a/tunnels/server/http2/helpers.h +++ b/tunnels/server/http2/helpers.h @@ -58,22 +58,22 @@ static void onH2LinePaused(void *arg) { http2_server_con_state_t *con = (http2_server_con_state_t *) arg; - ++(con->pause_counter); - if (con->pause_counter > 4) - { + // ++(con->pause_counter); + // if (con->pause_counter > 4) + // { http2_server_child_con_state_t *stream_i; for (stream_i = con->root.next; stream_i;) { pauseLineUpSide(stream_i->line); stream_i = stream_i->next; } - } + // } } static void onH2LineResumed(void *arg) { http2_server_con_state_t *con = (http2_server_con_state_t *) arg; - con->pause_counter = con->pause_counter > 0 ? (con->pause_counter - 1) : con->pause_counter; + // con->pause_counter = con->pause_counter > 0 ? (con->pause_counter - 1) : con->pause_counter; http2_server_child_con_state_t *stream_i; for (stream_i = con->root.next; stream_i;) { diff --git a/tunnels/server/http2/http2_server.c b/tunnels/server/http2/http2_server.c index 0f0011f7..8c188ac3 100644 --- a/tunnels/server/http2/http2_server.c +++ b/tunnels/server/http2/http2_server.c @@ -54,6 +54,26 @@ static int onHeaderCallback(nghttp2_session *session, const nghttp2_frame *frame { con->content_type = httpContentTypeEnum(value); } + else if (strcmp(name, "grpc-ack") == 0) + { + http2_server_child_con_state_t *stream = nghttp2_session_get_stream_user_data(session, frame->hd.stream_id); + if (stream) + { + const int consumed = atoi(value); + if (stream->bytes_sent_nack >= kMaxSendBeforeAck) + { + stream->bytes_sent_nack -= consumed; + if (stream->bytes_sent_nack < kMaxSendBeforeAck) + { + resumeLineUpSide(stream->line); + } + } + else + { + stream->bytes_sent_nack -= consumed; + } + } + } } return 0; @@ -109,6 +129,17 @@ static int onDataChunkRecvCallback(nghttp2_session *session, uint8_t flags, int3 stream->first_sent = true; stream_data->first = true; } + stream->bytes_received_nack += bufLen(buf); + if (stream->bytes_received_nack >= kMaxRecvBeforeAck) + { + nghttp2_nv nvs[1]; + char stri[32]; + sprintf(stri, "%d", (int) stream->bytes_received_nack); + nvs[0] = makeNV("grpc-ack", stri); + nghttp2_submit_headers(con->session, NGHTTP2_FLAG_END_HEADERS, stream_id, NULL, &nvs[0], 1, NULL); + stream->bytes_received_nack = 0; + } + stream->tunnel->up->upStream(stream->tunnel->up, stream_data); if (nghttp2_session_get_stream_user_data(session, stream_id)) @@ -132,6 +163,17 @@ static int onDataChunkRecvCallback(nghttp2_session *session, uint8_t flags, int3 stream->first_sent = true; stream_data->first = true; } + stream->bytes_received_nack += bufLen(buf); + if (stream->bytes_received_nack >= kMaxRecvBeforeAck) + { + nghttp2_nv nvs[1]; + char stri[32]; + sprintf(stri, "%d", (int) stream->bytes_received_nack); + nvs[0] = makeNV("grpc-ack", stri); + nghttp2_submit_headers(con->session, NGHTTP2_FLAG_END_HEADERS, stream_id, NULL, &nvs[0], 1, NULL); + stream->bytes_received_nack = 0; + } + stream->tunnel->up->upStream(stream->tunnel->up, stream_data); } @@ -260,6 +302,12 @@ static bool trySendResponse(tunnel_t *self, http2_server_con_state_t *con, size_ // HTTP2 DATA framehd con->state = kH2SendData; + http2_server_child_con_state_t *stream = nghttp2_session_get_stream_user_data(con->session, stream_id); + stream->bytes_sent_nack += bufLen(buf); + if (stream->bytes_sent_nack > kMaxSendBeforeAck) + { + pauseLineUpSide(stream->line); + } // LOGD("HTTP2 SEND_DATA_FRAME_HD...\n"); if (con->content_type == kApplicationGrpc) { @@ -478,7 +526,7 @@ api_result_t apiHttp2Server(tunnel_t *self, const char *msg) { (void) (self); (void) (msg); - return (api_result_t){0}; + return (api_result_t) {0}; } tunnel_t *destroyHttp2Server(tunnel_t *self) @@ -490,5 +538,5 @@ tunnel_t *destroyHttp2Server(tunnel_t *self) tunnel_metadata_t getMetadataHttp2Server(void) { - return (tunnel_metadata_t){.version = 0001, .flags = 0x0}; + return (tunnel_metadata_t) {.version = 0001, .flags = 0x0}; } diff --git a/tunnels/server/http2/types.h b/tunnels/server/http2/types.h index bf134039..2144037c 100644 --- a/tunnels/server/http2/types.h +++ b/tunnels/server/http2/types.h @@ -24,6 +24,13 @@ typedef enum kH2RecvData, } http2_session_state; +enum +{ + + kMaxRecvBeforeAck = (1 << 16), + kMaxSendBeforeAck = (1 << 20) +}; + typedef struct http2_server_child_con_state_s { struct http2_server_child_con_state_s *prev, *next; @@ -35,6 +42,8 @@ typedef struct http2_server_child_con_state_s line_t *parent; line_t *line; tunnel_t *tunnel; + size_t bytes_sent_nack; + size_t bytes_received_nack; } http2_server_child_con_state_t; typedef struct http2_server_con_state_s