From b214fdccd372ce6361b7462a11a1c9d34b90eb91 Mon Sep 17 00:00:00 2001 From: Radkesvat <134321679+radkesvat@users.noreply.github.com> Date: Wed, 10 Jul 2024 14:24:39 +0000 Subject: [PATCH] [WIP] 90% redesign h2/mut from scratch --- tunnels/client/http2/helpers.h | 58 ++++++------- tunnels/client/http2/http2_client.c | 117 +++++++++++++------------ tunnels/client/http2/types.h | 46 +++++----- tunnels/server/http2/helpers.h | 56 ++++++------ tunnels/server/http2/http2_server.c | 128 ++++++++++++++++------------ 5 files changed, 217 insertions(+), 188 deletions(-) diff --git a/tunnels/client/http2/helpers.h b/tunnels/client/http2/helpers.h index a23d0125..03dcba97 100644 --- a/tunnels/client/http2/helpers.h +++ b/tunnels/client/http2/helpers.h @@ -47,12 +47,12 @@ static void onH2LinePaused(void *arg) // ++(con->pause_counter); // if (con->pause_counter > 8) // { - http2_client_child_con_state_t *stream_i; - for (stream_i = con->root.next; stream_i;) - { - pauseLineDownSide(stream_i->line); - stream_i = stream_i->next; - } + http2_client_child_con_state_t *stream_i; + for (stream_i = con->root.next; stream_i;) + { + pauseLineDownSide(stream_i->line); + stream_i = stream_i->next; + } // } } @@ -117,8 +117,9 @@ static http2_client_child_con_state_t *createHttp2Stream(http2_client_con_state_ nvs[nvlen++] = (makeNV("content-type", "application/grpc+proto")); } // todo (match chrome) this one is same as curl, but not same as chrome - nvs[nvlen++] = makeNV("Accept", "*/*"); - //chrome: "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7"); + nvs[nvlen++] = makeNV("Accept", "*/*"); + // chrome: + // "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7"); nvs[nvlen++] = makeNV("Accept-Language", "en,fa;q=0.9,zh-CN;q=0.8,zh;q=0.7"); nvs[nvlen++] = makeNV("Cache-Control", "no-cache"); @@ -132,11 +133,11 @@ static http2_client_child_con_state_t *createHttp2Stream(http2_client_con_state_ http2_client_child_con_state_t *stream = wwmGlobalMalloc(sizeof(http2_client_child_con_state_t)); memset(stream, 0, sizeof(http2_client_child_con_state_t)); // stream->stream_id = nghttp2_submit_request2(con->session, NULL, &nvs[0], nvlen, NULL,stream); - stream->stream_id = nghttp2_submit_headers(con->session, flags, -1, NULL, &nvs[0], nvlen, stream); - stream->chunkbs = newBufferStream(getLineBufferPool(con->line)); - stream->parent = con->line; - stream->line = child_line; - stream->tunnel = con->tunnel; + stream->stream_id = nghttp2_submit_headers(con->session, flags, -1, NULL, &nvs[0], nvlen, stream); + stream->grpc_buffer_stream = newBufferStream(getLineBufferPool(con->line)); + stream->parent = con->line; + stream->line = child_line; + stream->tunnel = con->tunnel; LSTATE_I_MUT(stream->line, stream->tunnel->chain_index) = stream; setupLineUpSide(stream->line, onStreamLinePaused, stream, onStreamLineResumed); @@ -150,7 +151,7 @@ static void deleteHttp2Stream(http2_client_child_con_state_t *stream) { LSTATE_I_DROP(stream->line, stream->tunnel->chain_index); - destroyBufferStream(stream->chunkbs); + destroyBufferStream(stream->grpc_buffer_stream); doneLineUpSide(stream->line); wwmGlobalFree(stream); } @@ -159,21 +160,18 @@ static http2_client_con_state_t *createHttp2Connection(tunnel_t *self, int tid) { http2_client_state_t *state = TSTATE(self); http2_client_con_state_t *con = wwmGlobalMalloc(sizeof(http2_client_con_state_t)); - - *con = (http2_client_con_state_t){ - .queue = newContextQueue(), - .content_type = state->content_type, - .path = state->path, - .host = state->host, - .host_port = state->host_port, - .scheme = state->scheme, - .state = kH2SendMagic, - .method = state->content_type == kApplicationGrpc? kHttpPost: kHttpGet, - .line = newLine(tid), - .ping_timer = htimer_add(loops[tid], onPingTimer, kPingInterval, INFINITE), - .tunnel = self, - }; - LSTATE_MUT(con->line) = con; + *con = (http2_client_con_state_t) {.queue = newContextQueue(), + .content_type = state->content_type, + .path = state->path, + .host = state->host, + .host_port = state->host_port, + .scheme = state->scheme, + .state = kH2SendMagic, + .method = state->content_type == kApplicationGrpc ? kHttpPost : kHttpGet, + .line = newLine(tid), + .ping_timer = htimer_add(loops[tid], onPingTimer, kPingInterval, INFINITE), + .tunnel = self}; + LSTATE_MUT(con->line) = con; setupLineDownSide(con->line, onH2LinePaused, con, onH2LineResumed); hevent_set_userdata(con->ping_timer, con); @@ -185,8 +183,6 @@ static http2_client_con_state_t *createHttp2Connection(tunnel_t *self, int tid) }; nghttp2_submit_settings(con->session, NGHTTP2_FLAG_NONE, settings, ARRAY_SIZE(settings)); - - return con; } static void deleteHttp2Connection(http2_client_con_state_t *con) diff --git a/tunnels/client/http2/http2_client.c b/tunnels/client/http2/http2_client.c index 6fd0941d..2624baca 100644 --- a/tunnels/client/http2/http2_client.c +++ b/tunnels/client/http2/http2_client.c @@ -11,20 +11,26 @@ enum kDefaultConcurrency = 64 // cons will be muxed into 1 }; -static void sendGrpcFinalData(tunnel_t *self, line_t *line, size_t stream_id) +static int onStreamClosedCallback(nghttp2_session *session, int32_t stream_id, uint32_t error_code, void *userdata) { - http2_frame_hd framehd; - shift_buffer_t *buf = popBuffer(getLineBufferPool(line)); - setLen(buf, HTTP2_FRAME_HDLEN); - - framehd.length = 0; - framehd.type = kHttP2Data; - framehd.flags = kHttP2FlagEndStream; - framehd.stream_id = stream_id; - http2FrameHdPack(&framehd, rawBufMut(buf)); - context_t *endstream_ctx = newContext(line); - endstream_ctx->payload = buf; - self->up->upStream(self->up, endstream_ctx); + (void) stream_id; + (void) error_code; + + http2_client_con_state_t *con = (http2_client_con_state_t *) userdata; + tunnel_t *self = con->tunnel; + http2_client_child_con_state_t *stream = nghttp2_session_get_stream_user_data(session, stream_id); + if (! stream) + { + return 0; + } + context_t *fc = newFinContext(stream->line); + CSTATE_DROP(fc); + tunnel_t *dest = stream->tunnel->dw; + removeStream(con, stream); + deleteHttp2Stream(stream); + dest->downStream(dest, fc); + + return 0; } static bool trySendRequest(tunnel_t *self, http2_client_con_state_t *con, int32_t stream_id, shift_buffer_t *buf) @@ -189,15 +195,18 @@ static int onHeaderCallback(nghttp2_session *session, const nghttp2_frame *frame // } // } - if (strcmp((const char *) name, "grpc-ack") == 0) + if (strcmp((const char *) name, "custom-ack") == 0) { http2_client_child_con_state_t *stream = nghttp2_session_get_stream_user_data(session, frame->hd.stream_id); if (stream) { const int consumed = atoi((const char *) value); + if (stream->bytes_sent_nack >= kMaxSendBeforeAck) { stream->bytes_sent_nack -= consumed; + LOGD("consumed: %d left: %d",consumed,(int)stream->bytes_sent_nack); + if (stream->bytes_sent_nack < kMaxSendBeforeAck) { resumeLineDownSide(stream->line); @@ -206,6 +215,8 @@ static int onHeaderCallback(nghttp2_session *session, const nghttp2_frame *frame else { stream->bytes_sent_nack -= consumed; + LOGD("consumed: %d left: %d",consumed,(int)stream->bytes_sent_nack); + } } } @@ -238,22 +249,22 @@ static int onDataChunkRecvCallback(nghttp2_session *session, uint8_t flags, int3 shift_buffer_t *buf = popBuffer(getLineBufferPool(con->line)); setLen(buf, len); writeRaw(buf, data, len); - bufferStreamPush(stream->chunkbs, buf); + bufferStreamPush(stream->grpc_buffer_stream, buf); while (true) { - if (stream->bytes_needed == 0 && bufferStreamLen(stream->chunkbs) >= GRPC_MESSAGE_HDLEN) + if (stream->grpc_bytes_needed == 0 && bufferStreamLen(stream->grpc_buffer_stream) >= GRPC_MESSAGE_HDLEN) { - shift_buffer_t *gheader_buf = bufferStreamRead(stream->chunkbs, GRPC_MESSAGE_HDLEN); + shift_buffer_t *gheader_buf = bufferStreamRead(stream->grpc_buffer_stream, GRPC_MESSAGE_HDLEN); grpc_message_hd msghd; grpcMessageHdUnpack(&msghd, rawBuf(gheader_buf)); - stream->bytes_needed = msghd.length; + stream->grpc_bytes_needed = msghd.length; reuseBuffer(getLineBufferPool(con->line), gheader_buf); } - if (stream->bytes_needed > 0 && bufferStreamLen(stream->chunkbs) >= stream->bytes_needed) + if (stream->grpc_bytes_needed > 0 && bufferStreamLen(stream->grpc_buffer_stream) >= stream->grpc_bytes_needed) { - shift_buffer_t *gdata_buf = bufferStreamRead(stream->chunkbs, stream->bytes_needed); - stream->bytes_needed = 0; + shift_buffer_t *gdata_buf = bufferStreamRead(stream->grpc_buffer_stream, stream->grpc_bytes_needed); + stream->grpc_bytes_needed = 0; context_t *stream_data = newContext(stream->line); stream_data->payload = gdata_buf; @@ -263,7 +274,7 @@ static int onDataChunkRecvCallback(nghttp2_session *session, uint8_t flags, int3 nghttp2_nv nvs[1]; char stri[32]; sprintf(stri, "%d", (int) stream->bytes_received_nack); - nvs[0] = makeNV("grpc-ack", stri); + nvs[0] = makeNV("custom-ack", stri); nghttp2_submit_headers(con->session, NGHTTP2_FLAG_END_HEADERS, stream_id, NULL, &nvs[0], 1, NULL); stream->bytes_received_nack = 0; } @@ -292,7 +303,7 @@ static int onDataChunkRecvCallback(nghttp2_session *session, uint8_t flags, int3 nghttp2_nv nvs[1]; char stri[32]; sprintf(stri, "%d", (int) stream->bytes_received_nack); - nvs[0] = makeNV("grpc-ack", stri); + nvs[0] = makeNV("custom-ack", stri); nghttp2_submit_headers(con->session, NGHTTP2_FLAG_END_HEADERS, stream_id, NULL, &nvs[0], 1, NULL); stream->bytes_received_nack = 0; } @@ -305,6 +316,7 @@ static int onDataChunkRecvCallback(nghttp2_session *session, uint8_t flags, int3 static int onFrameRecvCallback(nghttp2_session *session, const nghttp2_frame *frame, void *userdata) { + (void) session; if (WW_UNLIKELY(userdata == NULL)) { return 0; @@ -312,8 +324,8 @@ static int onFrameRecvCallback(nghttp2_session *session, const nghttp2_frame *fr // LOGD("onFrameRecvCallback\n"); printFrameHd(&frame->hd); - http2_client_con_state_t *con = (http2_client_con_state_t *) userdata; - tunnel_t *self = con->tunnel; + http2_client_con_state_t *con = (http2_client_con_state_t *) userdata; + // tunnel_t *self = con->tunnel; switch (frame->hd.type) { @@ -338,25 +350,6 @@ static int onFrameRecvCallback(nghttp2_session *session, const nghttp2_frame *fr default: break; } - if (frame->hd.flags & kHttP2FlagEndStream) - { - http2_client_child_con_state_t *stream = nghttp2_session_get_stream_user_data(session, frame->hd.stream_id); - if (! stream) - { - return 0; - } - // http2_client_state_t *state = TSTATE(self); - resumeLineUpSide(stream->parent); - nghttp2_session_set_stream_user_data(con->session, stream->stream_id, NULL); - context_t *fc = newFinContext(stream->line); - CSTATE_DROP(fc); - tunnel_t *dest = stream->tunnel->dw; - removeStream(con, stream); - deleteHttp2Stream(stream); - dest->downStream(dest, fc); - - return 0; - } if ((frame->hd.type & NGHTTP2_HEADERS) == NGHTTP2_HEADERS) { @@ -440,23 +433,40 @@ static void upStream(tunnel_t *self, context_t *c) http2_client_con_state_t *con = LSTATE(stream->parent); CSTATE_DROP(c); - if (con->content_type == kApplicationGrpc && con->handshake_completed) + // LOGE("closed %d",stream->stream_id); + int flags = NGHTTP2_FLAG_END_STREAM | NGHTTP2_FLAG_END_HEADERS; + if (con->content_type == kApplicationGrpc) { - lockLine(con->line); - sendGrpcFinalData(self, con->line, stream->stream_id); - if (! isAlive(con->line)) + nghttp2_nv nv = makeNV("grpc-status", "0"); + nghttp2_submit_headers(con->session, flags, stream->stream_id, NULL, &nv, 1, NULL); + } + else + { + nghttp2_submit_headers(con->session, flags, stream->stream_id, NULL, NULL, 0, NULL); + } + + nghttp2_session_set_stream_user_data(con->session, stream->stream_id, NULL); + removeStream(con, stream); + deleteHttp2Stream(stream); + + + lockLine(con->line); + while (trySendRequest(self, con, 0, NULL)) + { + if (! isAlive(c->line)) { unLockLine(con->line); destroyContext(c); return; } + } + if (! isAlive(con->line)) + { unLockLine(con->line); + destroyContext(c); + return; } - - resumeLineUpSide(con->line); - nghttp2_session_set_stream_user_data(con->session, stream->stream_id, NULL); - removeStream(con, stream); - deleteHttp2Stream(stream); + unLockLine(con->line); if (con->root.next == NULL && con->childs_added >= state->concurrency && isAlive(con->line)) { @@ -560,6 +570,7 @@ tunnel_t *newHttp2Client(node_instance_context_t *instance_info) nghttp2_session_callbacks_set_on_header_callback(state->cbs, onHeaderCallback); nghttp2_session_callbacks_set_on_data_chunk_recv_callback(state->cbs, onDataChunkRecvCallback); nghttp2_session_callbacks_set_on_frame_recv_callback(state->cbs, onFrameRecvCallback); + nghttp2_session_callbacks_set_on_stream_close_callback(state->cbs, onStreamClosedCallback); for (size_t i = 0; i < workers_count; i++) { diff --git a/tunnels/client/http2/types.h b/tunnels/client/http2/types.h index fc07ac82..e7ce1e19 100644 --- a/tunnels/client/http2/types.h +++ b/tunnels/client/http2/types.h @@ -24,53 +24,51 @@ typedef enum kH2RecvData, } http2_session_state; -enum{ +enum +{ kMaxRecvBeforeAck = (1 << 16), - kMaxSendBeforeAck = (1 << 20) + kMaxSendBeforeAck = (1 << 19) }; typedef struct http2_client_child_con_state_s { struct http2_client_child_con_state_s *prev, *next; - int32_t stream_id; nghttp2_stream *ng_stream; - buffer_stream_t *chunkbs; // used for grpc + buffer_stream_t *grpc_buffer_stream; tunnel_t *tunnel; line_t *parent; line_t *line; - size_t bytes_needed; + size_t grpc_bytes_needed; size_t bytes_sent_nack; size_t bytes_received_nack; - - + int32_t stream_id; } http2_client_child_con_state_t; typedef struct http2_client_con_state_s { - - nghttp2_session *session; http2_session_state state; + http2_client_child_con_state_t root; + nghttp2_session *session; context_queue_t *queue; + htimer_t *ping_timer; + tunnel_t *tunnel; + line_t *line; + const char *path; + const char *host; // authority + const char *scheme; + enum http_method method; + enum http_content_type content_type; size_t childs_added; uint32_t pause_counter; int error; int frame_type_when_stream_closed; - bool handshake_completed; - enum http_method method; - enum http_content_type content_type; - const char *path; - const char *host; // authority int host_port; - const char *scheme; + bool handshake_completed; bool init_sent; bool first_sent; bool no_ping_ack; - htimer_t *ping_timer; - tunnel_t *tunnel; - line_t *line; - http2_client_child_con_state_t root; } http2_client_con_state_t; @@ -81,20 +79,20 @@ typedef struct http2_client_con_state_s typedef struct thread_connection_pool_s { - size_t round_index; vec_cons cons; + size_t round_index; } thread_connection_pool_t; typedef struct http2_client_state_s { nghttp2_session_callbacks *cbs; - enum http_content_type content_type; - size_t concurrency; + nghttp2_option *ngoptions; + char *scheme; char *path; char *host; // authority + enum http_content_type content_type; + size_t concurrency; int host_port; - char *scheme; int last_iid; - nghttp2_option *ngoptions; thread_connection_pool_t thread_cpool[]; } http2_client_state_t; diff --git a/tunnels/server/http2/helpers.h b/tunnels/server/http2/helpers.h index 6ff04cce..c8e1bd2f 100644 --- a/tunnels/server/http2/helpers.h +++ b/tunnels/server/http2/helpers.h @@ -56,24 +56,18 @@ static void onStreamLineResumed(void *arg) static void onH2LinePaused(void *arg) { - http2_server_con_state_t *con = (http2_server_con_state_t *) arg; - - // ++(con->pause_counter); - // if (con->pause_counter > 4) - // { - http2_server_child_con_state_t *stream_i; - for (stream_i = con->root.next; stream_i;) - { - pauseLineUpSide(stream_i->line); - stream_i = stream_i->next; - } - // } + http2_server_con_state_t *con = (http2_server_con_state_t *) arg; + http2_server_child_con_state_t *stream_i; + for (stream_i = con->root.next; stream_i;) + { + pauseLineUpSide(stream_i->line); + stream_i = stream_i->next; + } } static void onH2LineResumed(void *arg) { - http2_server_con_state_t *con = (http2_server_con_state_t *) arg; - // con->pause_counter = con->pause_counter > 0 ? (con->pause_counter - 1) : con->pause_counter; + http2_server_con_state_t *con = (http2_server_con_state_t *) arg; http2_server_child_con_state_t *stream_i; for (stream_i = con->root.next; stream_i;) { @@ -82,19 +76,23 @@ static void onH2LineResumed(void *arg) } } -static http2_server_child_con_state_t *createHttp2Stream(http2_server_con_state_t *con, line_t *this_line, tunnel_t *self, - int32_t stream_id) +static http2_server_child_con_state_t *createHttp2Stream(http2_server_con_state_t *con, line_t *this_line, + tunnel_t *self, int32_t stream_id) { - http2_server_child_con_state_t *stream; - stream = wwmGlobalMalloc(sizeof(http2_server_child_con_state_t)); - memset(stream, 0, sizeof(http2_server_child_con_state_t)); - - stream->stream_id = stream_id; - stream->chunkbs = newBufferStream(getLineBufferPool(this_line)); - stream->parent = this_line; - stream->line = newLine(this_line->tid); + http2_server_child_con_state_t *stream = wwmGlobalMalloc(sizeof(http2_server_child_con_state_t)); + + *stream = (http2_server_child_con_state_t) {.stream_id = stream_id, + .grpc_buffer_stream = NULL, + .parent = this_line, + .line = newLine(this_line->tid), + .tunnel = self}; + + if (con->content_type == kApplicationGrpc) + { + stream->grpc_buffer_stream = newBufferStream(getLineBufferPool(this_line)); + } + LSTATE_MUT(stream->line) = stream; - stream->tunnel = self; nghttp2_session_set_stream_user_data(con->session, stream_id, stream); setupLineDownSide(stream->line, onStreamLinePaused, stream, onStreamLineResumed); @@ -103,7 +101,13 @@ static http2_server_child_con_state_t *createHttp2Stream(http2_server_con_state_ static void deleteHttp2Stream(http2_server_child_con_state_t *stream) { LSTATE_I_DROP(stream->line, stream->tunnel->chain_index); - destroyBufferStream(stream->chunkbs); + + if (stream->grpc_buffer_stream) + { + destroyBufferStream(stream->grpc_buffer_stream); + } + + doneLineDownSide(stream->line); destroyLine(stream->line); if (stream->request_path) diff --git a/tunnels/server/http2/http2_server.c b/tunnels/server/http2/http2_server.c index 8c188ac3..bab59651 100644 --- a/tunnels/server/http2/http2_server.c +++ b/tunnels/server/http2/http2_server.c @@ -7,6 +7,28 @@ #include "types.h" #include "utils/mathutils.h" +static int onStreamClosedCallback(nghttp2_session *session, int32_t stream_id, uint32_t error_code, void *userdata) +{ + (void) stream_id; + (void) error_code; + + http2_server_con_state_t *con = (http2_server_con_state_t *) userdata; + tunnel_t *self = con->tunnel; + http2_server_child_con_state_t *stream = nghttp2_session_get_stream_user_data(session, stream_id); + if (! stream) + { + return 0; + } + context_t *fc = newFinContext(stream->line); + CSTATE_DROP(fc); + tunnel_t *dest = stream->tunnel->up; + removeStream(con, stream); + deleteHttp2Stream(stream); + dest->upStream(dest, fc); + + return 0; +} + static int onHeaderCallback(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *_name, size_t namelen, const uint8_t *_value, size_t valuelen, uint8_t flags, void *userdata) { @@ -54,7 +76,7 @@ static int onHeaderCallback(nghttp2_session *session, const nghttp2_frame *frame { con->content_type = httpContentTypeEnum(value); } - else if (strcmp(name, "grpc-ack") == 0) + else if (strcmp(name, "custom-ack") == 0) { http2_server_child_con_state_t *stream = nghttp2_session_get_stream_user_data(session, frame->hd.stream_id); if (stream) @@ -106,22 +128,23 @@ static int onDataChunkRecvCallback(nghttp2_session *session, uint8_t flags, int3 shiftl(buf, lCap(buf) / 2); // use some unused space setLen(buf, len); writeRaw(buf, data, len); - bufferStreamPush(stream->chunkbs, buf); + bufferStreamPush(stream->grpc_buffer_stream, buf); while (true) { - if (stream->bytes_needed == 0 && bufferStreamLen(stream->chunkbs) >= GRPC_MESSAGE_HDLEN) + if (stream->grpc_bytes_needed == 0 && bufferStreamLen(stream->grpc_buffer_stream) >= GRPC_MESSAGE_HDLEN) { - shift_buffer_t *gheader_buf = bufferStreamRead(stream->chunkbs, GRPC_MESSAGE_HDLEN); + shift_buffer_t *gheader_buf = bufferStreamRead(stream->grpc_buffer_stream, GRPC_MESSAGE_HDLEN); grpc_message_hd msghd; grpcMessageHdUnpack(&msghd, rawBuf(gheader_buf)); - stream->bytes_needed = msghd.length; + stream->grpc_bytes_needed = msghd.length; reuseBuffer(getLineBufferPool(con->line), gheader_buf); } - if (stream->bytes_needed > 0 && bufferStreamLen(stream->chunkbs) >= stream->bytes_needed) + if (stream->grpc_bytes_needed > 0 && + bufferStreamLen(stream->grpc_buffer_stream) >= stream->grpc_bytes_needed) { - shift_buffer_t *gdata_buf = bufferStreamRead(stream->chunkbs, stream->bytes_needed); - stream->bytes_needed = 0; + shift_buffer_t *gdata_buf = bufferStreamRead(stream->grpc_buffer_stream, stream->grpc_bytes_needed); + stream->grpc_bytes_needed = 0; context_t *stream_data = newContext(stream->line); stream_data->payload = gdata_buf; if (! stream->first_sent) @@ -129,23 +152,40 @@ static int onDataChunkRecvCallback(nghttp2_session *session, uint8_t flags, int3 stream->first_sent = true; stream_data->first = true; } - stream->bytes_received_nack += bufLen(buf); - if (stream->bytes_received_nack >= kMaxRecvBeforeAck) - { - nghttp2_nv nvs[1]; - char stri[32]; - sprintf(stri, "%d", (int) stream->bytes_received_nack); - nvs[0] = makeNV("grpc-ack", stri); - nghttp2_submit_headers(con->session, NGHTTP2_FLAG_END_HEADERS, stream_id, NULL, &nvs[0], 1, NULL); - stream->bytes_received_nack = 0; - } - stream->tunnel->up->upStream(stream->tunnel->up, stream_data); if (nghttp2_session_get_stream_user_data(session, stream_id)) { continue; } + + stream->bytes_received_nack += bufLen(gdata_buf); + if (stream->bytes_received_nack >= kMaxRecvBeforeAck) + { + line_t* line = con->line; + shift_buffer_t *flow_ctl_msg_buf = popBuffer(getThreadBufferPool(line->tid)); + setLen(flow_ctl_msg_buf, 32); + int string_length = + sprintf((char *) rawBufMut(flow_ctl_msg_buf), "%d", (int) stream->bytes_received_nack); + ; + setLen(flow_ctl_msg_buf, string_length); + stream->bytes_received_nack = 0; + + http2_frame_hd framehd; + framehd.length = bufLen(flow_ctl_msg_buf); + framehd.type = kHttP2Data; + framehd.flags = flags; + framehd.stream_id = stream_id; + shiftl(flow_ctl_msg_buf, HTTP2_FRAME_HDLEN); + http2FrameHdPack(&framehd, rawBufMut(flow_ctl_msg_buf)); + context_t *flow_ctl_response_data = newContext(line); + flow_ctl_response_data->payload = buf; + con->tunnel->dw->downStream(con->tunnel->dw, flow_ctl_response_data); + if (! isAlive(line)) + { + return 0; + } + } } break; } @@ -166,11 +206,12 @@ static int onDataChunkRecvCallback(nghttp2_session *session, uint8_t flags, int3 stream->bytes_received_nack += bufLen(buf); if (stream->bytes_received_nack >= kMaxRecvBeforeAck) { + nghttp2_nv nvs[1]; char stri[32]; sprintf(stri, "%d", (int) stream->bytes_received_nack); - nvs[0] = makeNV("grpc-ack", stri); - nghttp2_submit_headers(con->session, NGHTTP2_FLAG_END_HEADERS, stream_id, NULL, &nvs[0], 1, NULL); + nvs[0] = makeNV("custom-ack", stri); + nghttp2_submit_trailer(con->session, stream_id, &nvs[0], 1); stream->bytes_received_nack = 0; } @@ -182,14 +223,14 @@ static int onDataChunkRecvCallback(nghttp2_session *session, uint8_t flags, int3 static int onFrameRecvCallback(nghttp2_session *session, const nghttp2_frame *frame, void *userdata) { + (void) session; if (WW_UNLIKELY(userdata == NULL)) { return 0; } // LOGD("onFrameRecvCallback\n"); printFrameHd(&frame->hd); - http2_server_con_state_t *con = (http2_server_con_state_t *) userdata; - tunnel_t *self = con->tunnel; + http2_server_con_state_t *con = (http2_server_con_state_t *) userdata; switch (frame->hd.type) { @@ -213,34 +254,14 @@ static int onFrameRecvCallback(nghttp2_session *session, const nghttp2_frame *fr default: break; } - // if (con->state == H2_RECV_HEADERS && con->parsed->http_cb) - // { - // con->parsed->http_cb(con->parsed, HP_HEADERS_COMPLETE, NULL, 0); - // } - - if ((frame->hd.flags & kHttP2FlagEndStream) == kHttP2FlagEndStream) - { - http2_server_child_con_state_t *stream = nghttp2_session_get_stream_user_data(session, frame->hd.stream_id); - if (! stream) - { - return 0; - } - resumeLineDownSide(stream->parent); - nghttp2_session_set_stream_user_data(con->session, stream->stream_id, NULL); - context_t *fc = newFinContext(stream->line); - tunnel_t *dest = stream->tunnel->up; - CSTATE_DROP(fc); - removeStream(con, stream); - deleteHttp2Stream(stream); - dest->upStream(dest, fc); - return 0; - } if (frame->hd.type != NGHTTP2_HEADERS || frame->headers.cat != NGHTTP2_HCAT_REQUEST) { return 0; } + tunnel_t *self = con->tunnel; + nghttp2_nv nvs[10]; int nvlen = 0; nvs[nvlen++] = makeNV(":status", "200"); @@ -392,7 +413,6 @@ static void upStream(tunnel_t *self, context_t *c) } if (nghttp2_session_want_read(con->session) == 0 && nghttp2_session_want_write(con->session) == 0) { - assert(false); context_t *fin_ctx = newFinContext(con->line); deleteHttp2Connection(con); self->dw->downStream(self->dw, fin_ctx); @@ -459,7 +479,6 @@ static void downStream(tunnel_t *self, context_t *c) nghttp2_submit_headers(con->session, flags, stream->stream_id, NULL, NULL, 0, NULL); } - resumeLineDownSide(con->line); nghttp2_session_set_stream_user_data(con->session, stream->stream_id, NULL); removeStream(con, stream); deleteHttp2Stream(stream); @@ -482,16 +501,16 @@ static void downStream(tunnel_t *self, context_t *c) } unLockLine(con->line); - if (nghttp2_session_want_read(con->session) == 0 && nghttp2_session_want_write(con->session) == 0) - { - context_t *fin_ctx = newFinContext(con->line); - deleteHttp2Connection(con); - self->dw->downStream(self->dw, fin_ctx); - } + // if (nghttp2_session_want_read(con->session) == 0 && nghttp2_session_want_write(con->session) == 0) + // { + // context_t *fin_ctx = newFinContext(con->line); + // deleteHttp2Connection(con); + // self->dw->downStream(self->dw, fin_ctx); + // } destroyContext(c); - return; } + else { destroyContext(c); } @@ -508,6 +527,7 @@ tunnel_t *newHttp2Server(node_instance_context_t *instance_info) nghttp2_session_callbacks_set_on_header_callback(state->cbs, onHeaderCallback); nghttp2_session_callbacks_set_on_data_chunk_recv_callback(state->cbs, onDataChunkRecvCallback); nghttp2_session_callbacks_set_on_frame_recv_callback(state->cbs, onFrameRecvCallback); + nghttp2_session_callbacks_set_on_stream_close_callback(state->cbs, onStreamClosedCallback); nghttp2_option_new(&(state->ngoptions)); nghttp2_option_set_peer_max_concurrent_streams(state->ngoptions, kMaxConcurrentStreams);