From c7023958674bc0ec1a7b0ba1f0c708e29ad164b5 Mon Sep 17 00:00:00 2001 From: Radkesvat <134321679+radkesvat@users.noreply.github.com> Date: Sun, 31 Mar 2024 09:06:17 +0300 Subject: [PATCH] fix painful http2 handling bugs --- tunnels/client/http2/CMakeLists.txt | 1 + tunnels/client/http2/helpers.h | 13 +- tunnels/client/http2/http2_client.c | 200 ++++++++++++++++++---------- tunnels/client/http2/types.h | 11 +- tunnels/server/http2/CMakeLists.txt | 2 + tunnels/server/http2/helpers.h | 17 ++- tunnels/server/http2/http2_server.c | 170 +++++++++++++++-------- tunnels/server/http2/types.h | 4 + 8 files changed, 277 insertions(+), 141 deletions(-) diff --git a/tunnels/client/http2/CMakeLists.txt b/tunnels/client/http2/CMakeLists.txt index 442fa73e..306ebc87 100644 --- a/tunnels/client/http2/CMakeLists.txt +++ b/tunnels/client/http2/CMakeLists.txt @@ -33,6 +33,7 @@ CPMAddPackage( OPTIONS "BUILD_STATIC_LIBS ON" "BUILD_TESTING OFF" + # "ENABLE_DEBUG ON" ) diff --git a/tunnels/client/http2/helpers.h b/tunnels/client/http2/helpers.h index fd299634..06a26abd 100644 --- a/tunnels/client/http2/helpers.h +++ b/tunnels/client/http2/helpers.h @@ -3,15 +3,13 @@ #include "types.h" #define MAX_CONCURRENT_STREAMS 0xffffffffu -#define MAX_CHILD_PER_STREAM 200 - +#define MAX_CHILD_PER_STREAM 3 #define STATE(x) ((http2_client_state_t *)((x)->state)) #define CSTATE(x) ((void *)((((x)->line->chains_state)[self->chain_index]))) #define CSTATE_MUT(x) ((x)->line->chains_state)[self->chain_index] #define ISALIVE(x) (CSTATE(x) != NULL) - static nghttp2_nv make_nv(const char *name, const char *value) { nghttp2_nv nv; @@ -120,6 +118,8 @@ create_http2_stream(http2_client_con_state_t *con, line_t *child_line, hio_t *io } static void delete_http2_stream(http2_client_child_con_state_t *stream) { + if(stream->temp_buf != NULL) + reuseBuffer(buffer_pools[stream->line->tid],stream->temp_buf); stream->line->chains_state[stream->tunnel->chain_index + 1] = NULL; free(stream); } @@ -168,8 +168,10 @@ static void delete_http2_connection(http2_client_con_state_t *con) { http2_client_child_con_state_t *next = stream_i->next; context_t *fin_ctx = newFinContext(stream_i->line); + tunnel_t *dest = stream_i->tunnel; delete_http2_stream(stream_i); - stream_i->tunnel->downStream(stream_i->tunnel, fin_ctx); + CSTATE_MUT(fin_ctx) = NULL; + dest->downStream(dest, fin_ctx); stream_i = next; } nghttp2_session_del(con->session); @@ -192,9 +194,8 @@ static http2_client_con_state_t *take_http2_connection(tunnel_t *self, int tid, { if ((*k.ref)->childs_added < MAX_CHILD_PER_STREAM) { - (*k.ref)->childs_added +=1; + (*k.ref)->childs_added += 1; return (*k.ref); - } } vec_cons_pop(vector); diff --git a/tunnels/client/http2/http2_client.c b/tunnels/client/http2/http2_client.c index 3a9fdf27..e4a83d2b 100644 --- a/tunnels/client/http2/http2_client.c +++ b/tunnels/client/http2/http2_client.c @@ -2,10 +2,14 @@ #include "types.h" #include "helpers.h" +#define MAX_CHUNK_SIZE 8100 + static void sendGrpcFinalData(tunnel_t *self, line_t *line, size_t stream_id) { http2_frame_hd framehd; shift_buffer_t *buf = popBuffer(buffer_pools[line->tid]); + setLen(buf, HTTP2_FRAME_HDLEN); + framehd.length = 0; framehd.type = HTTP2_DATA; framehd.flags = HTTP2_FLAG_END_STREAM; @@ -25,7 +29,7 @@ static bool trySendRequest(tunnel_t *self, http2_client_con_state_t *con, size_t size_t len; len = nghttp2_session_mem_send(con->session, (const uint8_t **)&data); // LOGD("nghttp2_session_mem_send %d\n", len); - if (len != 0) + if (len > 0) { shift_buffer_t *send_buf = popBuffer(buffer_pools[line->tid]); shiftl(send_buf, lCap(send_buf) / 1.25); // use some unused space @@ -41,22 +45,23 @@ static bool trySendRequest(tunnel_t *self, http2_client_con_state_t *con, size_t } self->up->upStream(self->up, req); - if (nghttp2_session_want_read(con->session) == 0 && - nghttp2_session_want_write(con->session) == 0) - { - if (buf != NULL) - { - reuseBuffer(buffer_pools[line->tid], buf); - buf = NULL; - } - context_t *fin_ctx = newFinContext(line); - delete_http2_connection(con); - con->tunnel->up->upStream(con->tunnel->up, fin_ctx); - return false; - } + // if (nghttp2_session_want_read(con->session) == 0 && + // nghttp2_session_want_write(con->session) == 0) + // { + // if (buf != NULL) + // { + // reuseBuffer(buffer_pools[line->tid], buf); + // buf = NULL; + // } + // context_t *fin_ctx = newFinContext(line); + // delete_http2_connection(con); + // con->tunnel->up->upStream(con->tunnel->up, fin_ctx); + // return false; + // } return true; } + assert(len >= 0); if (buf == NULL || bufLen(buf) <= 0) return false; @@ -65,7 +70,8 @@ static bool trySendRequest(tunnel_t *self, http2_client_con_state_t *con, size_t if (con->state == H2_SEND_HEADERS) { - http2_flag flags = HTTP2_FLAG_END_STREAM; + // http2_flag flags = HTTP2_FLAG_END_STREAM; + http2_flag flags = HTTP2_FLAG_NONE; // HTTP2 DATA framehd con->state = H2_SEND_DATA; @@ -112,7 +118,7 @@ static bool trySendRequest(tunnel_t *self, http2_client_con_state_t *con, size_t static void flush_write_queue(http2_client_con_state_t *con) { tunnel_t *self = con->tunnel; - context_t *g = newContext(con->line); + context_t *g = newContext(con->line); // keep the line alive while (contextQueueLen(con->queue) > 0) { context_t *stream_context = contextQueuePop(con->queue); @@ -122,10 +128,13 @@ static void flush_write_queue(http2_client_con_state_t *con) // consumes payload while (trySendRequest(self, con, stream->stream_id, stream->io, stream_context->payload)) - ; - - if (!ISALIVE(g)) - break; + { + if (!ISALIVE(g)) + { + destroyContext(g); + return; + } + } } destroyContext(g); } @@ -209,38 +218,67 @@ static int on_data_chunk_recv_callback(nghttp2_session *session, return 0; // LOGD("on_data_chunk_recv_callback\n"); // LOGD("stream_id=%d length=%d\n", stream_id, (int)len); - // LOGD("%.*s\n", (int)len, data); + // LOGD("down: %d\n", (int)len); if (con->content_type == APPLICATION_GRPC) { - // grpc_message_hd - if (len >= GRPC_MESSAGE_HDLEN) + + if (stream->temp_buf == NULL) { + // grpc_message_hd + if (len < GRPC_MESSAGE_HDLEN) + return 0; + grpc_message_hd msghd; grpc_message_hd_unpack(&msghd, data); - LOGD("grpc_message_hd: flags=%d length=%d\n", msghd.flags, msghd.length); + // LOGD("grpc_message_hd: flags=%d length=%d\n", msghd.flags, msghd.length); data += GRPC_MESSAGE_HDLEN; len -= GRPC_MESSAGE_HDLEN; // LOGD("%.*s\n", (int)len, data); + + shift_buffer_t *buf = popBuffer(buffer_pools[con->line->tid]); + shiftl(buf, lCap(buf) / 1.25); // use some unused space + setLen(buf, msghd.length); + memcpy(rawBuf(buf), data, len); + + if (msghd.length > len) + { + stream->temp_buf = buf; + stream->bytes_needed = msghd.length - len; + return 0; + } + context_t *stream_data = newContext(stream->line); + stream_data->payload = buf; + stream_data->src_io = con->io; + stream->tunnel->downStream(stream->tunnel, stream_data); + } + else + { + memcpy(rawBuf(stream->temp_buf) + (bufLen(stream->temp_buf) - stream->bytes_needed), data, len); + stream->bytes_needed -= len; + if (stream->bytes_needed == 0) + { + context_t *stream_data = newContext(stream->line); + stream_data->payload = stream->temp_buf; + stream_data->src_io = con->io; + stream->temp_buf = NULL; + + stream->tunnel->downStream(stream->tunnel, stream_data); + } } } + else + { + shift_buffer_t *buf = popBuffer(buffer_pools[con->line->tid]); + shiftl(buf, lCap(buf) / 1.25); // use some unused space + setLen(buf, len); + memcpy(rawBuf(buf), data, len); + context_t *stream_data = newContext(stream->line); + stream_data->payload = buf; + stream_data->src_io = con->io; + stream->tunnel->downStream(stream->tunnel, stream_data); + } - shift_buffer_t *buf = popBuffer(buffer_pools[con->line->tid]); - shiftl(buf, lCap(buf) / 1.25); // use some unused space - setLen(buf, len); - memcpy(rawBuf(buf), data, len); - context_t *dw_data = newContext(stream->line); - dw_data->payload = buf; - dw_data->src_io = con->io; - stream->tunnel->downStream(stream->tunnel, dw_data); - // if (hp->parsed->http_cb) - // { - // hp->parsed->http_cb(hp->parsed, HP_BODY, (const char *)data, len); - // } - // else - // { - // hp->parsed->body.append((const char *)data, len); - // } return 0; } @@ -276,6 +314,19 @@ static int on_frame_recv_callback(nghttp2_session *session, default: break; } + if ((frame->hd.flags & HTTP2_FLAG_END_STREAM) == HTTP2_FLAG_END_STREAM) + { + http2_client_child_con_state_t *stream = + nghttp2_session_get_stream_user_data(session, frame->hd.stream_id); + if (!stream) + return 0; + nghttp2_session_set_stream_user_data(con->session, stream->stream_id, NULL); + context_t *fc = newFinContext(stream->line); + tunnel_t *dest = stream->tunnel; + remove_stream(con, stream); + delete_http2_stream(stream); + dest->downStream(dest, fc); + } if ((frame->hd.type & NGHTTP2_HEADERS) == NGHTTP2_HEADERS) { @@ -289,10 +340,14 @@ static int on_frame_recv_callback(nghttp2_session *session, else if ((frame->hd.flags & HTTP2_FLAG_END_STREAM) == HTTP2_FLAG_END_STREAM) { http2_client_child_con_state_t *stream = nghttp2_session_get_stream_user_data(con->session, frame->hd.stream_id); + if (stream == NULL) + return 0; context_t *fin_ctx = newFinContext(stream->line); + tunnel_t *dest = stream->tunnel; remove_stream(con, stream); delete_http2_stream(stream); - stream->tunnel->downStream(stream->tunnel, fin_ctx); + CSTATE_MUT(fin_ctx) = NULL; + dest->downStream(dest, fin_ctx); } } @@ -326,42 +381,34 @@ static inline void upStream(tunnel_t *self, context_t *c) if (c->init) { http2_client_con_state_t *con = take_http2_connection(self, c->line->tid, NULL); - context_t *g = newContext(con->line); + http2_client_child_con_state_t *stream = create_http2_stream(con, c->line, c->src_io); + CSTATE_MUT(c) = stream; if (!con->init_sent) { con->init_sent = true; self->up->upStream(self->up, newInitContext(con->line)); - if (!ISALIVE(g)) + if (!ISALIVE(c)) { - destroyContext(g); destroyContext(c); return; } } while (trySendRequest(self, con, 0, NULL, NULL)) - ; - - if (!ISALIVE(g)) - { - destroyContext(g); - destroyContext(c); - return; - } - http2_client_child_con_state_t *stream = create_http2_stream(con, c->line, c->src_io); - CSTATE_MUT(c) = stream; - destroyContext(g); + if (!ISALIVE(c)) + { + destroyContext(c); + return; + } - if (!ISALIVE(c)) + if (con->childs_added >= MAX_CHILD_PER_STREAM && con->root.next == NULL && ISALIVE(c)) { - delete_http2_stream(stream); - destroyContext(c); - return; + context_t *fin_ctx = newFinContext(con->line); + delete_http2_connection(con); + con->tunnel->up->upStream(con->tunnel->up, fin_ctx); } - while (trySendRequest(self, con, stream->stream_id, NULL, NULL)) - ; destroyContext(c); } else if (c->fin) @@ -376,9 +423,8 @@ static inline void upStream(tunnel_t *self, context_t *c) nghttp2_session_set_stream_user_data(con->session, stream->stream_id, NULL); remove_stream(con, stream); delete_http2_stream(stream); - - if (nghttp2_session_want_read(con->session) == 0 && - nghttp2_session_want_write(con->session) == 0) + CSTATE_MUT(c) = NULL; + if (con->childs_added >= MAX_CHILD_PER_STREAM && con->root.next == NULL && ISALIVE(c)) { context_t *fin_ctx = newFinContext(con->line); delete_http2_connection(con); @@ -403,20 +449,34 @@ static inline void downStream(tunnel_t *self, context_t *c) con->state = H2_WANT_RECV; size_t len = bufLen(c->payload); size_t ret = nghttp2_session_mem_recv2(con->session, (const uint8_t *)rawBuf(c->payload), len); + assert(ret == len); DISCARD_CONTEXT(c); + while (trySendRequest(self, con, 0, NULL, NULL)) + if (!ISALIVE(c)) + { + destroyContext(c); + return; + } + + // if (con->childs_added >= MAX_CHILD_PER_STREAM && con->root.next == NULL && ISALIVE(c)) + // { + // context_t *fin_ctx = newFinContext(con->line); + // delete_http2_connection(con); + // con->tunnel->up->upStream(con->tunnel->up, fin_ctx); + // } + if (!ISALIVE(c)) { destroyContext(c); return; } - if (ret != len) - { - context_t *fin_ctx = newFinContext(con->line); - delete_http2_connection(con); - con->tunnel->upStream(con->tunnel, fin_ctx); - } + // { + // context_t *fin_ctx = newFinContext(con->line); + // delete_http2_connection(con); + // con->tunnel->upStream(con->tunnel, fin_ctx); + // } destroyContext(c); } else diff --git a/tunnels/client/http2/types.h b/tunnels/client/http2/types.h index 71f92680..bac2132f 100644 --- a/tunnels/client/http2/types.h +++ b/tunnels/client/http2/types.h @@ -31,10 +31,13 @@ typedef struct http2_client_child_con_state_s struct http2_client_child_con_state_s *prev, *next; int32_t stream_id; nghttp2_stream *ng_stream; + + shift_buffer_t *temp_buf; + size_t bytes_needed; tunnel_t *tunnel; line_t *parent; line_t *line; - hio_t* io; + hio_t *io; } http2_client_child_con_state_t; @@ -44,7 +47,7 @@ typedef struct http2_client_con_state_s nghttp2_session *session; http2_session_state state; context_queue_t *queue; - size_t childs_added; + size_t childs_added; int error; int frame_type_when_stream_closed; bool handshake_completed; @@ -60,7 +63,7 @@ typedef struct http2_client_con_state_s tunnel_t *tunnel; line_t *line; - hio_t* io; + hio_t *io; http2_client_child_con_state_t root; } http2_client_con_state_t; @@ -86,6 +89,6 @@ typedef struct http2_client_state_s int host_port; char *scheme; int last_iid; - nghttp2_option * ngoptions; + nghttp2_option *ngoptions; thread_connection_pool_t thread_cpool[]; } http2_client_state_t; diff --git a/tunnels/server/http2/CMakeLists.txt b/tunnels/server/http2/CMakeLists.txt index 1598b531..d156658a 100644 --- a/tunnels/server/http2/CMakeLists.txt +++ b/tunnels/server/http2/CMakeLists.txt @@ -33,6 +33,8 @@ CPMAddPackage( OPTIONS "BUILD_STATIC_LIBS ON" "BUILD_TESTING OFF" + # "ENABLE_DEBUG ON " + ) diff --git a/tunnels/server/http2/helpers.h b/tunnels/server/http2/helpers.h index 44aeeaf3..2c2335ac 100644 --- a/tunnels/server/http2/helpers.h +++ b/tunnels/server/http2/helpers.h @@ -72,6 +72,8 @@ create_http2_stream(http2_server_con_state_t *con, line_t *this_line, tunnel_t * static void delete_http2_stream(http2_server_child_con_state_t *stream) { + if(stream->temp_buf != NULL) + reuseBuffer(buffer_pools[stream->line->tid],stream->temp_buf); stream->line->chains_state[stream->tunnel->chain_index - 1] = NULL; destroyLine(stream->line); if (stream->request_path) @@ -79,16 +81,16 @@ static void delete_http2_stream(http2_server_child_con_state_t *stream) free(stream); } -static http2_server_con_state_t *create_http2_connection(tunnel_t *self, line_t*line,hio_t* io) +static http2_server_con_state_t *create_http2_connection(tunnel_t *self, line_t *line, hio_t *io) { http2_server_state_t *state = STATE(self); - http2_server_con_state_t *con = malloc(sizeof(http2_server_con_state_t)); + http2_server_con_state_t *con = malloc(sizeof(http2_server_con_state_t)); memset(con, 0, sizeof(http2_server_con_state_t)); - nghttp2_session_server_new2(&con->session, state->cbs, con,state->ngoptions); + nghttp2_session_server_new2(&con->session, state->cbs, con, state->ngoptions); con->state = H2_WANT_RECV; con->tunnel = self; - con->line = line; + con->line = line; con->io = io; nghttp2_settings_entry settings[] = { @@ -104,14 +106,17 @@ static void delete_http2_connection(http2_server_con_state_t *con) for (stream_i = con->root.next; stream_i;) { context_t *fin_ctx = newFinContext(stream_i->line); - http2_server_child_con_state_t *next = stream_i->next; + tunnel_t *dest = stream_i->tunnel; delete_http2_stream(stream_i); - stream_i->tunnel->upStream(stream_i->tunnel, fin_ctx); + CSTATE_MUT(fin_ctx) = NULL; + dest->upStream(dest, fin_ctx); stream_i = next; } nghttp2_session_del(con->session); + con->line->chains_state[self->chain_index] = NULL; free(con); } +static inline size_t min(size_t x, size_t y) { return (((x) < (y)) ? (x) : (y)); } diff --git a/tunnels/server/http2/http2_server.c b/tunnels/server/http2/http2_server.c index fc2f5af5..4618e938 100644 --- a/tunnels/server/http2/http2_server.c +++ b/tunnels/server/http2/http2_server.c @@ -93,43 +93,79 @@ static int on_data_chunk_recv_callback(nghttp2_session *session, // LOGD("on_data_chunk_recv_callback\n"); // LOGD("stream_id=%d length=%d\n", stream_id, (int)len); - // //LOGD("%.*s\n", (int)len, data); + // LOGD("up: %d\n", (int)len); if (con->content_type == APPLICATION_GRPC) { - // grpc_message_hd - if (len >= GRPC_MESSAGE_HDLEN) + + if (stream->temp_buf == NULL) { + // grpc_message_hd + if (len < GRPC_MESSAGE_HDLEN) + return 0; + grpc_message_hd msghd; grpc_message_hd_unpack(&msghd, data); // LOGD("grpc_message_hd: flags=%d length=%d\n", msghd.flags, msghd.length); data += GRPC_MESSAGE_HDLEN; len -= GRPC_MESSAGE_HDLEN; - // //LOGD("%.*s\n", (int)len, data); + // LOGD("%.*s\n", (int)len, data); + + shift_buffer_t *buf = popBuffer(buffer_pools[con->line->tid]); + shiftl(buf, lCap(buf) / 1.25); // use some unused space + setLen(buf, msghd.length); + memcpy(rawBuf(buf), data, len); + + if (msghd.length > len) + { + stream->temp_buf = buf; + stream->bytes_needed = msghd.length - len; + return 0; + } + context_t *stream_data = newContext(stream->line); + stream_data->payload = buf; + stream_data->src_io = con->io; + if (!stream->first_sent) + { + stream->first_sent = true; + stream_data->first = true; + } + stream->tunnel->upStream(stream->tunnel, stream_data); + } + else + { + memcpy(rawBuf(stream->temp_buf) + (bufLen(stream->temp_buf) - stream->bytes_needed), data, len); + stream->bytes_needed -= len; + assert(stream->bytes_needed == 0); + context_t *stream_data = newContext(stream->line); + stream_data->payload = stream->temp_buf; + stream_data->src_io = con->io; + if (!stream->first_sent) + { + stream->first_sent = true; + stream_data->first = true; + } + stream->temp_buf = NULL; + stream->tunnel->upStream(stream->tunnel, stream_data); } } - - shift_buffer_t *buf = popBuffer(buffer_pools[con->line->tid]); - shiftl(buf, lCap(buf) / 1.25); // use some unused space - setLen(buf, len); - memcpy(rawBuf(buf), data, len); - context_t *stream_data = newContext(stream->line); - stream_data->payload = buf; - stream_data->src_io = con->io; - if (!stream->first_sent) + else { - stream->first_sent = true; - stream_data->first = true; + shift_buffer_t *buf = popBuffer(buffer_pools[con->line->tid]); + shiftl(buf, lCap(buf) / 1.25); // use some unused space + setLen(buf, len); + memcpy(rawBuf(buf), data, len); + context_t *stream_data = newContext(stream->line); + stream_data->payload = buf; + stream_data->src_io = con->io; + if (!stream->first_sent) + { + stream->first_sent = true; + stream_data->first = true; + } + stream->tunnel->upStream(stream->tunnel, stream_data); } - stream->tunnel->upStream(stream->tunnel, stream_data); - // if (hp->parsed->http_cb) - // { - // hp->parsed->http_cb(hp->parsed, HP_BODY, (const char *)data, len); - // } - // else - // { - // hp->parsed->body.append((const char *)data, len); - // } + return 0; } @@ -170,6 +206,21 @@ static int on_frame_recv_callback(nghttp2_session *session, // con->parsed->http_cb(con->parsed, HP_HEADERS_COMPLETE, NULL, 0); // } + if ((frame->hd.flags & HTTP2_FLAG_END_STREAM) == HTTP2_FLAG_END_STREAM) + { + http2_server_child_con_state_t *stream = + nghttp2_session_get_stream_user_data(session, frame->hd.stream_id); + if (!stream) + return 0; + nghttp2_session_set_stream_user_data(con->session, stream->stream_id, NULL); + context_t *fc = newFinContext(stream->line); + tunnel_t *dest = stream->tunnel; + remove_stream(con, stream); + delete_http2_stream(stream); + dest->upStream(dest, fc); + + } + if (frame->hd.type != NGHTTP2_HEADERS || frame->headers.cat != NGHTTP2_HCAT_REQUEST) { @@ -193,7 +244,6 @@ static int on_frame_recv_callback(nghttp2_session *session, http2_server_child_con_state_t *stream = create_http2_stream(con, con->line, self->up, frame->hd.stream_id); add_stream(con, stream); - stream->tunnel->upStream(stream->tunnel, newInitContext(stream->line)); return 0; @@ -210,7 +260,7 @@ static bool trySendResponse(tunnel_t *self, http2_server_con_state_t *con, size_ size_t len; len = nghttp2_session_mem_send(con->session, (const uint8_t **)&data); // LOGD("nghttp2_session_mem_send %d\n", len); - if (len != 0) + if (len > 0) { shift_buffer_t *send_buf = popBuffer(buffer_pools[line->tid]); shiftl(send_buf, lCap(send_buf) / 1.25); // use some unused space @@ -221,22 +271,21 @@ static bool trySendResponse(tunnel_t *self, http2_server_con_state_t *con, size_ response_data->src_io = stream_io; self->dw->downStream(self->dw, response_data); - if (nghttp2_session_want_read(con->session) == 0 && - nghttp2_session_want_write(con->session) == 0) - { - if (buf != NULL) - { - reuseBuffer(buffer_pools[line->tid], buf); - } - context_t *fin_ctx = newFinContext(line); - delete_http2_connection(con); - self->dw->downStream(self->dw, fin_ctx); - return false; - } + // if (nghttp2_session_want_read(con->session) == 0 && + // nghttp2_session_want_write(con->session) == 0) + // { + // if (buf != NULL) + // { + // reuseBuffer(buffer_pools[line->tid], buf); + // } + // context_t *fin_ctx = newFinContext(line); + // delete_http2_connection(con); + // self->dw->downStream(self->dw, fin_ctx); + // return false; + // } return true; } - if (buf == NULL || bufLen(buf) <= 0) return false; @@ -244,7 +293,8 @@ static bool trySendResponse(tunnel_t *self, http2_server_con_state_t *con, size_ if (con->state == H2_SEND_HEADERS) { - http2_flag flags = HTTP2_FLAG_END_STREAM; + // http2_flag flags = HTTP2_FLAG_END_STREAM; + http2_flag flags = HTTP2_FLAG_NONE; // HTTP2 DATA framehd con->state = H2_SEND_DATA; @@ -264,8 +314,8 @@ static bool trySendResponse(tunnel_t *self, http2_server_con_state_t *con, size_ grpc_message_hd_pack(&msghd, rawBuf(buf)); } - http2_frame_hd framehd; + http2_frame_hd framehd; framehd.length = bufLen(buf); framehd.type = HTTP2_DATA; framehd.flags = flags; @@ -298,7 +348,7 @@ static inline void upStream(tunnel_t *self, context_t *c) con->io = c->src_io; con->state = H2_WANT_RECV; size_t len = bufLen(c->payload); - size_t ret = nghttp2_session_mem_recv(con->session, (const uint8_t *)rawBuf(c->payload), len); + size_t ret = nghttp2_session_mem_recv2(con->session, (const uint8_t *)rawBuf(c->payload), len); DISCARD_CONTEXT(c); if (!ISALIVE(c)) @@ -307,19 +357,23 @@ static inline void upStream(tunnel_t *self, context_t *c) return; } - if (ret != len) - { - // TODO not http2 -> fallback - context_t *fin_ctx = newFinContext(con->line); - delete_http2_connection(CSTATE(c)); - destroyContext(c); - self->dw->downStream(self->dw, fin_ctx); + assert(ret == len); + // { + // // TODO not http2 -> fallback + // context_t *fin_ctx = newFinContext(con->line); + // delete_http2_connection(CSTATE(c)); + // destroyContext(c); + // self->dw->downStream(self->dw, fin_ctx); - return; - } + // return; + // } while (trySendResponse(self, con, 0, NULL, NULL)) - ; + if (!ISALIVE(c)) + { + destroyContext(c); + return; + } destroyContext(c); } else @@ -327,6 +381,8 @@ static inline void upStream(tunnel_t *self, context_t *c) if (c->init) { CSTATE_MUT(c) = create_http2_connection(self, c->line, c->src_io); + self->dw->downStream(self->dw, newEstContext(c->line)); + destroyContext(c); } else if (c->fin) @@ -347,7 +403,11 @@ static inline void downStream(tunnel_t *self, context_t *c) { con->state = H2_SEND_HEADERS; while (trySendResponse(self, con, stream->stream_id, stream->io, c->payload)) - ; + if (!ISALIVE(c)) + { + destroyContext(c); + return; + } c->payload = NULL; destroyContext(c); } @@ -380,8 +440,8 @@ static inline void downStream(tunnel_t *self, context_t *c) destroyContext(c); return; } - - self->dw->downStream(self->dw, switchLine(c, stream->parent)); + else + destroyContext(c); } } diff --git a/tunnels/server/http2/types.h b/tunnels/server/http2/types.h index a77c7fd7..9186c01c 100644 --- a/tunnels/server/http2/types.h +++ b/tunnels/server/http2/types.h @@ -38,6 +38,9 @@ typedef struct http2_server_child_con_state_s char *request_path; int32_t stream_id; bool first_sent; + + shift_buffer_t* temp_buf; + size_t bytes_needed; line_t *parent; line_t *line; hio_t* io; @@ -53,6 +56,7 @@ typedef struct http2_server_con_state_s int frame_type_when_stream_closed; enum http_content_type content_type; + tunnel_t *tunnel; line_t *line; hio_t* io;