diff --git a/tunnels/adapters/connector/tcp/tcp_connector.c b/tunnels/adapters/connector/tcp/tcp_connector.c index c87ed97a..4fe162f6 100644 --- a/tunnels/adapters/connector/tcp/tcp_connector.c +++ b/tunnels/adapters/connector/tcp/tcp_connector.c @@ -3,6 +3,7 @@ #include "hsocket.h" #include "loggers/network_logger.h" #include "sync_dns.h" +#include "tunnel.h" #include "types.h" #include "utils/jsonutils.h" #include "utils/sockutils.h" @@ -12,18 +13,14 @@ static void cleanup(tcp_connector_con_state_t *cstate, bool write_queue) if (cstate->io) { hevent_set_userdata(cstate->io, NULL); + hio_close(cstate->io); } - hio_t *last_resumed_io = NULL; - while (contextQueueLen(cstate->data_queue) > 0) { + // all data must be written before sending fin, event loop will hold them for us context_t *cw = contextQueuePop(cstate->data_queue); - if (cw->src_io != NULL && last_resumed_io != cw->src_io) - { - last_resumed_io = cw->src_io; - hio_read(cw->src_io); - } + if (write_queue) { hio_write(cstate->io, cw->payload); @@ -36,54 +33,28 @@ static void cleanup(tcp_connector_con_state_t *cstate, bool write_queue) destroyContext(cw); } - while (contextQueueLen(cstate->finished_queue) > 0) - { - context_t *cw = contextQueuePop(cstate->finished_queue); - if (cw->src_io != NULL && last_resumed_io != cw->src_io) - { - last_resumed_io = cw->src_io; - hio_read(cw->src_io); - } - - destroyContext(cw); - } - destroyContextQueue(cstate->data_queue); - destroyContextQueue(cstate->finished_queue); + doneLineUpSide(cstate->line); free(cstate); } static bool resumeWriteQueue(tcp_connector_con_state_t *cstate) { - context_queue_t *data_queue = (cstate)->data_queue; - context_queue_t *finished_queue = (cstate)->finished_queue; - hio_t *io = cstate->io; + context_queue_t *data_queue = (cstate)->data_queue; + hio_t *io = cstate->io; while (contextQueueLen(data_queue) > 0) { context_t *cw = contextQueuePop(data_queue); unsigned int bytes = bufLen(cw->payload); int nwrite = hio_write(io, cw->payload); cw->payload = NULL; - contextQueuePush(cstate->finished_queue, cw); + destroyContext(cw); if (nwrite >= 0 && nwrite < bytes) { return false; // write pending } } - // data data_queue is empty - hio_t *last_resumed_io = NULL; - while (contextQueueLen(finished_queue) > 0) - { - context_t *cw = contextQueuePop(finished_queue); - hio_t *upstream_io = cw->src_io; - if (upstream_io != NULL && (last_resumed_io != upstream_io)) - { - last_resumed_io = upstream_io; - hio_read(upstream_io); - } - destroyContext(cw); - } return true; } static void onWriteComplete(hio_t *io) @@ -97,35 +68,17 @@ static void onWriteComplete(hio_t *io) if (hio_write_is_complete(io)) { - hio_setcb_write(cstate->io, NULL); - cstate->write_paused = false; - context_queue_t *data_queue = cstate->data_queue; - context_queue_t *finished_queue = cstate->finished_queue; - if (contextQueueLen(data_queue) > 0) - { - if (! resumeWriteQueue(cstate)) - { - hio_setcb_write(cstate->io, onWriteComplete); - cstate->write_paused = true; - return; - } - } - hio_t *last_resumed_io = NULL; - while (contextQueueLen(finished_queue) > 0) + context_queue_t *data_queue = cstate->data_queue; + if (contextQueueLen(data_queue) > 0 && ! resumeWriteQueue(cstate)) { - context_t *cw = contextQueuePop(finished_queue); - hio_t *upstream_io = cw->src_io; - if (upstream_io != NULL && (last_resumed_io != upstream_io)) - { - last_resumed_io = upstream_io; - hio_read(upstream_io); - } - destroyContext(cw); + return; } + hio_setcb_write(cstate->io, NULL); + cstate->write_paused = false; + resumeLineDownSide(cstate->line); } } - static void onRecv(hio_t *io, shift_buffer_t *buf) { tcp_connector_con_state_t *cstate = (tcp_connector_con_state_t *) (hevent_userdata(io)); @@ -139,7 +92,6 @@ static void onRecv(hio_t *io, shift_buffer_t *buf) line_t *line = (cstate)->line; context_t *context = newContext(line); - context->src_io = io; context->payload = payload; self->downStream(self, context); } @@ -163,6 +115,15 @@ static void onClose(hio_t *io) self->downStream(self, context); } } +static void onLinePaused(void *cstate) +{ + hio_read_stop(((tcp_connector_con_state_t *) cstate)->io); +} + +static void onLineResumed(void *cstate) +{ + hio_read(((tcp_connector_con_state_t *) cstate)->io); +} static void onOutBoundConnected(hio_t *upstream_io) { @@ -190,10 +151,8 @@ static void onOutBoundConnected(hio_t *upstream_io) LOGD("TcpConnector: connection succeed FD:%x [%s] => [%s]", (int) hio_fd(upstream_io), SOCKADDR_STR(hio_localaddr(upstream_io), localaddrstr), SOCKADDR_STR(hio_peeraddr(upstream_io), peeraddrstr)); - context_t *est_context = newContext(line); - est_context->est = true; - est_context->src_io = upstream_io; - self->downStream(self, est_context); + setupLineUpSide(line, onLinePaused, cstate, onLineResumed); + self->downStream(self, newEstContext(line)); } static void upStream(tunnel_t *self, context_t *c) @@ -204,10 +163,7 @@ static void upStream(tunnel_t *self, context_t *c) { if (cstate->write_paused) { - if (c->src_io) - { - hio_read_stop(c->src_io); - } + pauseLineDownSide(c->line); contextQueuePush(cstate->data_queue, c); } else @@ -215,41 +171,34 @@ static void upStream(tunnel_t *self, context_t *c) unsigned int bytes = bufLen(c->payload); int nwrite = hio_write(cstate->io, c->payload); c->payload = NULL; + destroyContext(c); + if (nwrite >= 0 && nwrite < bytes) { - if (c->src_io) - { - hio_read_stop(c->src_io); - } - contextQueuePush(cstate->finished_queue, c); + pauseLineDownSide(c->line); cstate->write_paused = true; hio_setcb_write(cstate->io, onWriteComplete); } - else - { - destroyContext(c); - } } } else { if (c->init) { - tcp_connector_state_t *state = STATE(self); - CSTATE_MUT(c) = malloc(sizeof(tcp_connector_con_state_t)); - memset(CSTATE(c), 0, sizeof(tcp_connector_con_state_t)); + tcp_connector_state_t *state = STATE(self); + CSTATE_MUT(c) = malloc(sizeof(tcp_connector_con_state_t)); tcp_connector_con_state_t *cstate = CSTATE(c); + + *cstate = (tcp_connector_con_state_t){.buffer_pool = getContextBufferPool(c), + .tunnel = self, + .line = c->line, + .data_queue = newContextQueue(cstate->buffer_pool), + .write_paused = true}; + #ifdef PROFILE gettimeofday(&(cstate->__profile_conenct), NULL); #endif - cstate->buffer_pool = getContextBufferPool(c); - cstate->tunnel = self; - cstate->line = c->line; - cstate->data_queue = newContextQueue(cstate->buffer_pool); - cstate->finished_queue = newContextQueue(cstate->buffer_pool); - cstate->write_paused = true; - socket_context_t *dest_ctx = &(c->line->dest_ctx); socket_context_t *src_ctx = &(c->line->src_ctx); switch ((enum tcp_connector_dynamic_value_status) state->dest_addr_selected.status) @@ -320,7 +269,6 @@ static void upStream(tunnel_t *self, context_t *c) hio_set_peeraddr(upstream_io, &(dest_ctx->address.sa), (int) sockaddr_len(&(dest_ctx->address))); cstate->io = upstream_io; hevent_set_userdata(upstream_io, cstate); - hio_setcb_connect(upstream_io, onOutBoundConnected); hio_setcb_close(upstream_io, onClose); hio_connect(upstream_io); @@ -330,11 +278,8 @@ static void upStream(tunnel_t *self, context_t *c) { hio_t *io = cstate->io; CSTATE_MUT(c) = NULL; - contextQueueNotifyIoRemoved(cstate->data_queue, c->src_io); - contextQueueNotifyIoRemoved(cstate->finished_queue, c->src_io); cleanup(cstate, true); destroyContext(c); - hio_close(io); } } return; @@ -372,6 +317,7 @@ static void downStream(tunnel_t *self, context_t *c) if (resumeWriteQueue(cstate)) { cstate->write_paused = false; + resumeLineDownSide(cstate->line); } else { diff --git a/tunnels/adapters/connector/tcp/types.h b/tunnels/adapters/connector/tcp/types.h index 3053be9f..b24cb25f 100644 --- a/tunnels/adapters/connector/tcp/types.h +++ b/tunnels/adapters/connector/tcp/types.h @@ -31,14 +31,11 @@ typedef struct tcp_connector_con_state_s struct timeval __profile_conenct; #endif - tunnel_t *tunnel; - line_t * line; - hio_t * io; - - buffer_pool_t * buffer_pool; + tunnel_t *tunnel; + line_t *line; + hio_t *io; + buffer_pool_t *buffer_pool; context_queue_t *data_queue; - context_queue_t *finished_queue; - - bool write_paused; - bool established; + bool write_paused; + bool established; } tcp_connector_con_state_t; diff --git a/tunnels/adapters/connector/udp/udp_connector.c b/tunnels/adapters/connector/udp/udp_connector.c index 81abd2bc..41794e6c 100644 --- a/tunnels/adapters/connector/udp/udp_connector.c +++ b/tunnels/adapters/connector/udp/udp_connector.c @@ -37,7 +37,6 @@ static void onRecvFrom(hio_t *io, shift_buffer_t *buf) cstate->established = true; context_t *est_context = newContext(line); est_context->est = true; - est_context->src_io = io; self->downStream(self, est_context); if (hevent_userdata(io) == NULL) { @@ -46,7 +45,6 @@ static void onRecvFrom(hio_t *io, shift_buffer_t *buf) } context_t *context = newContext(line); - context->src_io = io; context->payload = payload; self->downStream(self, context); diff --git a/tunnels/adapters/listener/tcp/tcp_listener.c b/tunnels/adapters/listener/tcp/tcp_listener.c index ba38e35a..480b2c27 100644 --- a/tunnels/adapters/listener/tcp/tcp_listener.c +++ b/tunnels/adapters/listener/tcp/tcp_listener.c @@ -1,8 +1,10 @@ #include "tcp_listener.h" #include "buffer_pool.h" #include "hlog.h" +#include "hloop.h" #include "loggers/network_logger.h" #include "managers/socket_manager.h" +#include "tunnel.h" #include "utils/jsonutils.h" #include "utils/sockutils.h" #include @@ -31,7 +33,6 @@ typedef struct tcp_listener_con_state_s tunnel_t *tunnel; line_t *line; hio_t *io; - context_queue_t *finished_queue; context_queue_t *data_queue; buffer_pool_t *buffer_pool; bool write_paused; @@ -44,18 +45,14 @@ static void cleanup(tcp_listener_con_state_t *cstate, bool write_queue) if (cstate->io) { hevent_set_userdata(cstate->io, NULL); + hio_close(cstate->io); } - hio_t *last_resumed_io = NULL; - while (contextQueueLen(cstate->data_queue) > 0) { + // all data must be written before sending fin, event loop will hold them for us context_t *cw = contextQueuePop(cstate->data_queue); - if (cw->src_io != NULL && last_resumed_io != cw->src_io) - { - last_resumed_io = cw->src_io; - hio_read(cw->src_io); - } + if (write_queue) { hio_write(cstate->io, cw->payload); @@ -68,53 +65,29 @@ static void cleanup(tcp_listener_con_state_t *cstate, bool write_queue) destroyContext(cw); } - while (contextQueueLen(cstate->finished_queue) > 0) - { - context_t *cw = contextQueuePop(cstate->finished_queue); - if (cw->src_io != NULL && last_resumed_io != cw->src_io) - { - last_resumed_io = cw->src_io; - hio_read(cw->src_io); - } - - destroyContext(cw); - } - destroyContextQueue(cstate->data_queue); - destroyContextQueue(cstate->finished_queue); + doneLineDownSide(cstate->line); + destroyLine(cstate->line); free(cstate); } static bool resumeWriteQueue(tcp_listener_con_state_t *cstate) { - context_queue_t *data_queue = (cstate)->data_queue; - context_queue_t *finished_queue = (cstate)->finished_queue; - hio_t *io = cstate->io; + context_queue_t *data_queue = (cstate)->data_queue; + hio_t *io = cstate->io; while (contextQueueLen(data_queue) > 0) { context_t *cw = contextQueuePop(data_queue); unsigned int bytes = bufLen(cw->payload); int nwrite = hio_write(io, cw->payload); cw->payload = NULL; - contextQueuePush(cstate->finished_queue, cw); + destroyContext(cw); if (nwrite >= 0 && nwrite < bytes) { return false; // write pending } } - // data queue is empty - hio_t *last_resumed_io = NULL; - while (contextQueueLen(finished_queue) > 0) - { - context_t *cw = contextQueuePop(finished_queue); - hio_t *upstream_io = cw->src_io; - if (upstream_io != NULL && (last_resumed_io != upstream_io)) - { - last_resumed_io = upstream_io; - hio_read(upstream_io); - } - destroyContext(cw); - } + return true; } @@ -129,36 +102,28 @@ static void onWriteComplete(hio_t *io) if (hio_write_is_complete(io)) { - hio_setcb_write(cstate->io, NULL); - cstate->write_paused = false; - - context_queue_t *data_queue = cstate->data_queue; - context_queue_t *finished_queue = cstate->finished_queue; - if (contextQueueLen(data_queue) > 0) - { - if (! resumeWriteQueue(cstate)) - { - hio_setcb_write(cstate->io, onWriteComplete); - cstate->write_paused = true; - return; - } - } - hio_t *last_resumed_io = NULL; - while (contextQueueLen(finished_queue) > 0) + context_queue_t *data_queue = cstate->data_queue; + if (contextQueueLen(data_queue) > 0 && ! resumeWriteQueue(cstate)) { - context_t *cw = contextQueuePop(finished_queue); - hio_t *upstream_io = cw->src_io; - if (upstream_io != NULL && (last_resumed_io != upstream_io)) - { - last_resumed_io = upstream_io; - hio_read(upstream_io); - } - destroyContext(cw); + return; } + hio_setcb_write(cstate->io, NULL); + cstate->write_paused = false; + resumeLineUpSide(cstate->line); } } +static void onLinePaused(void *cstate) +{ + hio_read_stop(((tcp_listener_con_state_t *) cstate)->io); +} + +static void onLineResumed(void *cstate) +{ + hio_read(((tcp_listener_con_state_t *) cstate)->io); +} + static void upStream(tunnel_t *self, context_t *c) { if (c->payload != NULL) @@ -183,10 +148,8 @@ static void upStream(tunnel_t *self, context_t *c) if (c->fin) { - tcp_listener_con_state_t *cstate = CSTATE(c); - cleanup(cstate, false); + cleanup(CSTATE(c), false); CSTATE_MUT(c) = NULL; - destroyLine(c->line); } } @@ -201,10 +164,7 @@ static void downStream(tunnel_t *self, context_t *c) { if (cstate->write_paused) { - if (c->src_io) - { - hio_read_stop(c->src_io); - } + pauseLineUpSide(c->line); contextQueuePush(cstate->data_queue, c); } else @@ -212,20 +172,14 @@ static void downStream(tunnel_t *self, context_t *c) unsigned int bytes = bufLen(c->payload); int nwrite = hio_write(cstate->io, c->payload); c->payload = NULL; + destroyContext(c); + if (nwrite >= 0 && nwrite < bytes) { - if (c->src_io) - { - hio_read_stop(c->src_io); - } - contextQueuePush(cstate->finished_queue, c); + pauseLineUpSide(c->line); cstate->write_paused = true; hio_setcb_write(cstate->io, onWriteComplete); } - else - { - destroyContext(c); - } } } else @@ -239,14 +193,10 @@ static void downStream(tunnel_t *self, context_t *c) } if (c->fin) { - hio_t *io = cstate->io; - CSTATE_MUT(c) = NULL; - contextQueueNotifyIoRemoved(cstate->data_queue, c->src_io); - contextQueueNotifyIoRemoved(cstate->finished_queue, c->src_io); + hio_t *io = cstate->io; cleanup(cstate, true); - destroyLine(c->line); + CSTATE_MUT(c) = NULL; destroyContext(c); - hio_close(io); return; } } @@ -266,7 +216,6 @@ static void onRecv(hio_t *io, shift_buffer_t *buf) bool *first_packet_sent = &((cstate)->first_packet_sent); context_t *context = newContext(line); - context->src_io = io; context->payload = payload; if (! (*first_packet_sent)) { @@ -313,20 +262,17 @@ static void onInboundConnected(hevent_t *ev) line->src_ctx.address_protocol = kSapTcp; line->src_ctx.address = *(sockaddr_u *) hio_peeraddr(io); - *cstate = (tcp_listener_con_state_t){ - .line = line, - .buffer_pool = getThreadBufferPool(tid), - .finished_queue = newContextQueue(cstate->buffer_pool), - .data_queue = newContextQueue(cstate->buffer_pool), - .io = io, - .tunnel = self, - .write_paused = false, - .established = false, - .first_packet_sent = false - }; - - // sockaddr_set_port(&(line->src_ctx.addr), data->real_localport == 0 ? sockaddr_port((sockaddr_u - // *)hio_localaddr(io)) : data->real_localport); + *cstate = (tcp_listener_con_state_t){.line = line, + .buffer_pool = getThreadBufferPool(tid), + .data_queue = newContextQueue(cstate->buffer_pool), + .io = io, + .tunnel = self, + .write_paused = false, + .established = false, + .first_packet_sent = false}; + + setupLineDownSide(line, onLinePaused, cstate, onLineResumed); + sockaddr_set_port(&(line->src_ctx.address), data->real_localport); line->src_ctx.address_type = line->src_ctx.address.sa.sa_family == AF_INET ? kSatIPV4 : kSatIPV6; hevent_set_userdata(io, cstate); @@ -345,24 +291,13 @@ static void onInboundConnected(hevent_t *ev) free(data); - // io->upstream_io = NULL; hio_setcb_read(io, onRecv); hio_setcb_close(io, onClose); - // hio_setcb_write(io, onWriteComplete); not required here - if (resumeWriteQueue(cstate)) - { - cstate->write_paused = false; - } - else - { - hio_setcb_write(cstate->io, onWriteComplete); - } // send the init packet lockLine(line); { context_t *context = newInitContext(line); - context->src_io = io; self->upStream(self, context); if (! isAlive(line)) { diff --git a/tunnels/client/http2/helpers.h b/tunnels/client/http2/helpers.h index ad12ddac..28b0a2d9 100644 --- a/tunnels/client/http2/helpers.h +++ b/tunnels/client/http2/helpers.h @@ -1,9 +1,13 @@ #pragma once +#include "tunnel.h" #include "types.h" -#define MAX_CONCURRENT_STREAMS 0xffffffffu -#define PING_INTERVAL 5000 +enum +{ + kMaxConcurrentStreams = 0xffffffffU, + kPingInterval = 5000 +}; static void onPingTimer(htimer_t *timer); @@ -18,17 +22,6 @@ static nghttp2_nv makeNV(const char *name, const char *value) return nv; } -static nghttp2_nv makeNV2(const char *name, const char *value, int namelen, int valuelen) -{ - nghttp2_nv nv; - nv.name = (uint8_t *) name; - nv.value = (uint8_t *) value; - nv.namelen = namelen; - nv.valuelen = valuelen; - nv.flags = NGHTTP2_NV_FLAG_NONE; - return nv; -} - static void printFrameHd(const nghttp2_frame_hd *hd) { (void) hd; @@ -36,6 +29,41 @@ static void printFrameHd(const nghttp2_frame_hd *hd) // hd->stream_id); } +static void onStreamLinePaused(void *arg) +{ + http2_client_child_con_state_t *stream = (http2_client_child_con_state_t *) arg; + pauseLineUpSide(stream->parent); +} +static void onStreamLineResumed(void *arg) +{ + http2_client_child_con_state_t *stream = (http2_client_child_con_state_t *) arg; + resumeLineUpSide(stream->parent); +} + +static void onH2LinePaused(void *arg) +{ + http2_client_con_state_t *con = (http2_client_con_state_t *) arg; + http2_client_child_con_state_t *stream_i; + for (stream_i = con->root.next; stream_i;) + { + http2_client_child_con_state_t *next = stream_i->next; + pauseLineDownSide(next->line); + stream_i = next; + } +} + +static void onH2LineResumed(void *arg) +{ + http2_client_con_state_t *con = (http2_client_con_state_t *) arg; + http2_client_child_con_state_t *stream_i; + for (stream_i = con->root.next; stream_i;) + { + http2_client_child_con_state_t *next = stream_i->next; + pauseLineDownSide(next->line); + stream_i = next; + } +} + static void addStraem(http2_client_con_state_t *con, http2_client_child_con_state_t *stream) { stream->next = con->root.next; @@ -56,7 +84,7 @@ static void removeStream(http2_client_con_state_t *con, http2_client_child_con_s } } -static http2_client_child_con_state_t *createHttp2Stream(http2_client_con_state_t *con, line_t *child_line, hio_t *io) +static http2_client_child_con_state_t *createHttp2Stream(http2_client_con_state_t *con, line_t *child_line) { char authority_addr[320]; nghttp2_nv nvs[15]; @@ -102,11 +130,12 @@ static http2_client_child_con_state_t *createHttp2Stream(http2_client_con_state_ stream->chunkbs = newBufferStream(getLineBufferPool(con->line)); stream->parent = con->line; stream->line = child_line; - stream->io = io; stream->tunnel = con->tunnel->dw; - stream->line->chains_state[stream->tunnel->chain_index + 1] = stream; + setupLineUpSide(stream->line, onStreamLinePaused, stream, onStreamLineResumed); + addStraem(con, stream); + // nghttp2_session_set_stream_user_data(con->session, stream->stream_id, stream); return stream; @@ -116,38 +145,40 @@ static void deleteHttp2Stream(http2_client_child_con_state_t *stream) destroyBufferStream(stream->chunkbs); stream->line->chains_state[stream->tunnel->chain_index + 1] = NULL; + doneLineUpSide(stream->line); + free(stream); } -static http2_client_con_state_t *createHttp2Connection(tunnel_t *self, int tid, hio_t *io) +static http2_client_con_state_t *createHttp2Connection(tunnel_t *self, int tid) { - http2_client_state_t * state = STATE(self); + http2_client_state_t *state = STATE(self); http2_client_con_state_t *con = malloc(sizeof(http2_client_con_state_t)); - memset(con, 0, sizeof(http2_client_con_state_t)); - - con->queue = newContextQueue(getThreadBufferPool(tid)); - con->content_type = state->content_type; - con->path = state->path; - con->host = state->host; - con->host_port = state->host_port; - con->scheme = state->scheme; - con->method = kHttpGet; - con->line = newLine(tid); - con->ping_timer = htimer_add(con->line->loop, onPingTimer, PING_INTERVAL, INFINITE); - con->tunnel = self; - con->io = io; + + *con = (http2_client_con_state_t){ + .queue = newContextQueue(getThreadBufferPool(tid)), + .content_type = state->content_type, + .path = state->path, + .host = state->host, + .host_port = state->host_port, + .scheme = state->scheme, + .state = kH2SendMagic, + .method = kHttpGet, + .line = newLine(tid), + .ping_timer = htimer_add(con->line->loop, onPingTimer, kPingInterval, INFINITE), + .tunnel = self, + }; con->line->chains_state[self->chain_index] = con; + setupLineUpSide(con->line, onH2LinePaused, con, onH2LineResumed); hevent_set_userdata(con->ping_timer, con); nghttp2_session_client_new2(&con->session, state->cbs, con, state->ngoptions); nghttp2_settings_entry settings[] = { - {NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, MAX_CONCURRENT_STREAMS}, + {NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, kMaxConcurrentStreams}, {NGHTTP2_SETTINGS_MAX_FRAME_SIZE, (1U << 18)}, - }; + }; nghttp2_submit_settings(con->session, NGHTTP2_FLAG_NONE, settings, ARRAY_SIZE(settings)); - con->state = kH2SendMagic; - if (state->content_type == kApplicationGrpc) { con->method = kHttpPost; @@ -157,10 +188,10 @@ static http2_client_con_state_t *createHttp2Connection(tunnel_t *self, int tid, } static void deleteHttp2Connection(http2_client_con_state_t *con) { - tunnel_t * self = con->tunnel; + tunnel_t *self = con->tunnel; http2_client_state_t *state = STATE(self); - vec_cons * vector = &(state->thread_cpool[con->line->tid].cons); + vec_cons *vector = &(state->thread_cpool[con->line->tid].cons); vec_cons_iter it = vec_cons_find(vector, con); if (it.ref != vec_cons_end(vector).ref) { @@ -171,8 +202,8 @@ static void deleteHttp2Connection(http2_client_con_state_t *con) for (stream_i = con->root.next; stream_i;) { http2_client_child_con_state_t *next = stream_i->next; - context_t * fin_ctx = newFinContext(stream_i->line); - tunnel_t * dest = stream_i->tunnel; + context_t *fin_ctx = newFinContext(stream_i->line); + tunnel_t *dest = stream_i->tunnel; deleteHttp2Stream(stream_i); CSTATE_MUT(fin_ctx) = NULL; dest->downStream(dest, fin_ctx); @@ -181,12 +212,13 @@ static void deleteHttp2Connection(http2_client_con_state_t *con) nghttp2_session_del(con->session); con->line->chains_state[self->chain_index] = NULL; destroyContextQueue(con->queue); + doneLineDownSide(con->line); destroyLine(con->line); htimer_del(con->ping_timer); free(con); } -static http2_client_con_state_t *takeHttp2Connection(tunnel_t *self, int tid, hio_t *io) +static http2_client_con_state_t *takeHttp2Connection(tunnel_t *self, int tid) { http2_client_state_t *state = STATE(self); @@ -215,12 +247,12 @@ static http2_client_con_state_t *takeHttp2Connection(tunnel_t *self, int tid, hi return con; } - con = createHttp2Connection(self, tid, io); + con = createHttp2Connection(self, tid); vec_cons_push(vector, con); return con; } - http2_client_con_state_t *con = createHttp2Connection(self, tid, io); + http2_client_con_state_t *con = createHttp2Connection(self, tid); vec_cons_push(vector, con); return con; } @@ -232,7 +264,7 @@ static void onPingTimer(htimer_t *timer) { LOGW("Http2Client: closing a session due to no ping reply"); context_t *con_fc = newFinContext(con->line); - tunnel_t * con_dest = con->tunnel->up; + tunnel_t *con_dest = con->tunnel->up; deleteHttp2Connection(con); con_dest->upStream(con_dest, con_fc); } @@ -240,7 +272,7 @@ static void onPingTimer(htimer_t *timer) { con->no_ping_ack = true; nghttp2_submit_ping(con->session, 0, NULL); - char * data = NULL; + char *data = NULL; size_t len; len = nghttp2_session_mem_send(con->session, (const uint8_t **) &data); if (len > 0) @@ -250,7 +282,6 @@ static void onPingTimer(htimer_t *timer) writeRaw(send_buf, data, len); context_t *req = newContext(con->line); req->payload = send_buf; - req->src_io = NULL; if (! con->first_sent) { con->first_sent = true; diff --git a/tunnels/client/http2/http2_client.c b/tunnels/client/http2/http2_client.c index cb176a51..f9998568 100644 --- a/tunnels/client/http2/http2_client.c +++ b/tunnels/client/http2/http2_client.c @@ -23,8 +23,7 @@ static void sendGrpcFinalData(tunnel_t *self, line_t *line, size_t stream_id) endstream_ctx->payload = buf; self->up->upStream(self->up, endstream_ctx); } -static bool trySendRequest(tunnel_t *self, http2_client_con_state_t *con, size_t stream_id, hio_t *stream_io, - shift_buffer_t *buf) +static bool trySendRequest(tunnel_t *self, http2_client_con_state_t *con, size_t stream_id, shift_buffer_t *buf) { line_t *line = con->line; if (con == NULL) @@ -45,7 +44,6 @@ static bool trySendRequest(tunnel_t *self, http2_client_con_state_t *con, size_t context_t *req = newContext(line); req->payload = send_buf; - req->src_io = stream_io; if (! con->first_sent) { con->first_sent = true; @@ -96,7 +94,6 @@ static bool trySendRequest(tunnel_t *self, http2_client_con_state_t *con, size_t http2FrameHdPack(&framehd, rawBufMut(buf)); context_t *req = newContext(line); req->payload = buf; - req->src_io = stream_io; self->up->upStream(self->up, req); goto send_done; @@ -118,11 +115,10 @@ static void flushWriteQueue(http2_client_con_state_t *con) { context_t *stream_context = contextQueuePop(con->queue); http2_client_child_con_state_t *stream = CSTATE(stream_context); - stream->io = stream_context->src_io; con->state = kH2SendHeaders; // consumes payload - while (trySendRequest(self, con, stream->stream_id, stream->io, stream_context->payload)) + while (trySendRequest(self, con, stream->stream_id, stream_context->payload)) { if (! isAlive(g->line)) { @@ -224,7 +220,6 @@ static int onDataChunkRecvCallback(nghttp2_session *session, uint8_t flags, int3 stream->bytes_needed = 0; context_t *stream_data = newContext(stream->line); stream_data->payload = gdata_buf; - stream_data->src_io = con->io; stream->tunnel->downStream(stream->tunnel, stream_data); if (nghttp2_session_get_stream_user_data(session, stream_id)) @@ -243,7 +238,6 @@ static int onDataChunkRecvCallback(nghttp2_session *session, uint8_t flags, int3 writeRaw(buf, data, len); context_t *stream_data = newContext(stream->line); stream_data->payload = buf; - stream_data->src_io = con->io; stream->tunnel->downStream(stream->tunnel, stream_data); } @@ -327,7 +321,6 @@ static void upStream(tunnel_t *self, context_t *c) { http2_client_child_con_state_t *stream = CSTATE(c); http2_client_con_state_t *con = stream->parent->chains_state[self->chain_index]; - stream->io = c->src_io; if (! con->handshake_completed) { @@ -337,7 +330,7 @@ static void upStream(tunnel_t *self, context_t *c) con->state = kH2SendHeaders; // consumes payload - while (trySendRequest(self, con, stream->stream_id, stream->io, c->payload)) + while (trySendRequest(self, con, stream->stream_id, c->payload)) { if (! isAlive(c->line)) { @@ -352,8 +345,8 @@ static void upStream(tunnel_t *self, context_t *c) { if (c->init) { - http2_client_con_state_t *con = takeHttp2Connection(self, c->line->tid, NULL); - http2_client_child_con_state_t *stream = createHttp2Stream(con, c->line, c->src_io); + http2_client_con_state_t *con = takeHttp2Connection(self, c->line->tid); + http2_client_child_con_state_t *stream = createHttp2Stream(con, c->line); CSTATE_MUT(c) = stream; nghttp2_session_set_stream_user_data(con->session, stream->stream_id, stream); @@ -368,7 +361,7 @@ static void upStream(tunnel_t *self, context_t *c) } } - while (trySendRequest(self, con, 0, NULL, NULL)) + while (trySendRequest(self, con, 0, NULL)) { if (! isAlive(c->line)) { @@ -409,8 +402,6 @@ static void downStream(tunnel_t *self, context_t *c) { http2_client_state_t *state = STATE(self); http2_client_con_state_t *con = CSTATE(c); - con->io = c->src_io; - if (c->payload != NULL) { @@ -434,7 +425,7 @@ static void downStream(tunnel_t *self, context_t *c) return; } - while (trySendRequest(self, con, 0, NULL, NULL)) + while (trySendRequest(self, con, 0, NULL)) { if (! isAlive(c->line)) { diff --git a/tunnels/client/http2/types.h b/tunnels/client/http2/types.h index b0212182..87e6f42f 100644 --- a/tunnels/client/http2/types.h +++ b/tunnels/client/http2/types.h @@ -34,7 +34,6 @@ typedef struct http2_client_child_con_state_s tunnel_t * tunnel; line_t * parent; line_t * line; - hio_t * io; } http2_client_child_con_state_t; @@ -60,7 +59,6 @@ typedef struct http2_client_con_state_s htimer_t * ping_timer; tunnel_t * tunnel; line_t * line; - hio_t * io; http2_client_child_con_state_t root; } http2_client_con_state_t; diff --git a/tunnels/client/openssl/openssl_client.c b/tunnels/client/openssl/openssl_client.c index 6627886d..28f96b8e 100644 --- a/tunnels/client/openssl/openssl_client.c +++ b/tunnels/client/openssl/openssl_client.c @@ -223,9 +223,7 @@ static void upStream(tunnel_t *self, context_t *c) return; failed:; - context_t *fail_context_up = newFinContextFrom(c); - fail_context_up->src_io = c->src_io; - self->up->upStream(self->up, fail_context_up); + self->up->upStream(self->up, newFinContextFrom(c)); context_t *fail_context = newFinContextFrom(c); cleanup(self, c); diff --git a/tunnels/client/reality/reality_client.c b/tunnels/client/reality/reality_client.c index 96b4b936..a5705552 100644 --- a/tunnels/client/reality/reality_client.c +++ b/tunnels/client/reality/reality_client.c @@ -232,9 +232,7 @@ static void upStream(tunnel_t *self, context_t *c) return; failed:; - context_t *fail_context_up = newFinContextFrom(c); - fail_context_up->src_io = c->src_io; - self->up->upStream(self->up, fail_context_up); + self->up->upStream(self->up, newFinContextFrom(c)); context_t *fail_context = newFinContextFrom(c); cleanup(self, c); diff --git a/tunnels/client/reverse/helpers.h b/tunnels/client/reverse/helpers.h index 5d878158..9235b0da 100644 --- a/tunnels/client/reverse/helpers.h +++ b/tunnels/client/reverse/helpers.h @@ -1,14 +1,37 @@ #pragma once #include "loggers/network_logger.h" +#include "tunnel.h" #include "types.h" #include "utils/mathutils.h" -#define CSTATE_D(x) ((reverse_client_con_state_t *) ((((x)->line->chains_state)[state->chain_index_pi]))) -#define CSTATE_U(x) ((reverse_client_con_state_t *) ((((x)->line->chains_state)[state->chain_index_pi]))) -#define CSTATE_D_MUT(x) ((x)->line->chains_state)[state->chain_index_pi] -#define CSTATE_U_MUT(x) ((x)->line->chains_state)[state->chain_index_pi] -#define PRECONNECT_DELAY_SHORT 10 -#define PRECONNECT_DELAY_HIGH 750 +#define CSTATE_D(x) ((reverse_client_con_state_t *) ((((x)->line->chains_state)[state->chain_index_d]))) +#define CSTATE_U(x) ((reverse_client_con_state_t *) ((((x)->line->chains_state)[self->chain_index]))) +#define CSTATE_D_MUT(x) ((x)->line->chains_state)[state->chain_index_d] +#define CSTATE_U_MUT(x) ((x)->line->chains_state)[self->chain_index] +enum +{ + kPreconnectDelayShort = 10, + kPreconnectDelayHigh = 750 +}; + +static void onLinePausedU(void *cstate) +{ + pauseLineUpSide(((reverse_client_con_state_t *) cstate)->d); +} + +static void onLineResumedU(void *cstate) +{ + resumeLineUpSide(((reverse_client_con_state_t *) cstate)->d); +} +static void onLinePausedD(void *cstate) +{ + pauseLineUpSide(((reverse_client_con_state_t *) cstate)->u); +} + +static void onLineResumedD(void *cstate) +{ + resumeLineUpSide(((reverse_client_con_state_t *) cstate)->u); +} static reverse_client_con_state_t *createCstate(uint8_t tid) { @@ -17,22 +40,21 @@ static reverse_client_con_state_t *createCstate(uint8_t tid) line_t *up = newLine(tid); line_t *dw = newLine(tid); - cstate->u = up; - cstate->d = dw; + reserveChainStateIndex(dw); // we always take one from the down line + setupLineDownSide(up, onLinePausedU, cstate, onLineResumedU); + setupLineDownSide(dw, onLinePausedD, cstate, onLinePausedD); + cstate->u = up; + cstate->d = dw; return cstate; } static void cleanup(reverse_client_con_state_t *cstate) { - if (cstate->u) - { - destroyLine(cstate->u); - } - if (cstate->d) - { - destroyLine(cstate->d); - } + doneLineDownSide(cstate->u); + doneLineDownSide(cstate->d); + destroyLine(cstate->u); + destroyLine(cstate->d); free(cstate); } static void doConnect(struct connect_arg *cg) @@ -41,8 +63,8 @@ static void doConnect(struct connect_arg *cg) reverse_client_state_t *state = STATE(self); reverse_client_con_state_t *cstate = createCstate(cg->tid); free(cg); - (cstate->u->chains_state)[state->chain_index_pi] = cstate; - (cstate->d->chains_state)[state->chain_index_pi] = cstate; + (cstate->u->chains_state)[self->chain_index] = cstate; + (cstate->d->chains_state)[state->chain_index_d] = cstate; self->up->upStream(self->up, newInitContext(cstate->u)); } @@ -96,7 +118,7 @@ static void initiateConnect(tunnel_t *self, uint8_t tid, bool delay) struct connect_arg *cg = malloc(sizeof(struct connect_arg)); cg->t = self; cg->tid = tid; - cg->delay = delay ? PRECONNECT_DELAY_HIGH : PRECONNECT_DELAY_SHORT; + cg->delay = delay ? kPreconnectDelayHigh : kPreconnectDelayShort; ev.userdata = cg; hloop_post_event(worker_loop, &ev); } \ No newline at end of file diff --git a/tunnels/client/reverse/reverse_client.c b/tunnels/client/reverse/reverse_client.c index 6121a1c0..65ce0bf5 100644 --- a/tunnels/client/reverse/reverse_client.c +++ b/tunnels/client/reverse/reverse_client.c @@ -26,11 +26,11 @@ static void upStream(tunnel_t *self, context_t *c) if (c->fin) { - const unsigned int tid = c->line->tid; - reverse_client_con_state_t *dcstate = CSTATE_D(c); - CSTATE_D_MUT(c) = NULL; - (dcstate->u->chains_state)[state->chain_index_pi] = NULL; - context_t *fc = switchLine(c, dcstate->u); + const unsigned int tid = c->line->tid; + reverse_client_con_state_t *dcstate = CSTATE_D(c); + CSTATE_D_MUT(c) = NULL; + (dcstate->u->chains_state)[self->chain_index] = NULL; + context_t *fc = switchLine(c, dcstate->u); cleanup(dcstate); const unsigned int old_reverse_cons = atomic_fetch_add_explicit(&(state->reverse_cons), -1, memory_order_relaxed); @@ -111,9 +111,9 @@ static void downStream(tunnel_t *self, context_t *c) if (c->fin) { - reverse_client_con_state_t *ucstate = CSTATE_U(c); - CSTATE_U_MUT(c) = NULL; - (ucstate->d->chains_state)[state->chain_index_pi] = NULL; + reverse_client_con_state_t *ucstate = CSTATE_U(c); + CSTATE_U_MUT(c) = NULL; + (ucstate->d->chains_state)[state->chain_index_d] = NULL; if (ucstate->pair_connected) { @@ -158,7 +158,7 @@ static void downStream(tunnel_t *self, context_t *c) } } -static void startReverseCelint(htimer_t *timer) +static void startReverseClient(htimer_t *timer) { tunnel_t *self = hevent_userdata(timer); reverse_client_state_t *state = STATE(self); @@ -191,16 +191,16 @@ tunnel_t *newReverseClient(node_instance_context_t *instance_info) state->connection_per_thread = 1; // we are always the first line creator so its easy to get the positon independent index here - line_t *l = newLine(0); - int index = reserveChainStateIndex(l); - state->chain_index_pi = index; + line_t *l = newLine(0); + int index = reserveChainStateIndex(l); + state->chain_index_d = index; destroyLine(l); tunnel_t *t = newTunnel(); t->state = state; t->upStream = &upStream; t->downStream = &downStream; - htimer_t *start_timer = htimer_add(loops[0], startReverseCelint, start_delay_ms, 1); + htimer_t *start_timer = htimer_add(loops[0], startReverseClient, start_delay_ms, 1); hevent_set_userdata(start_timer, t); atomic_thread_fence(memory_order_release); diff --git a/tunnels/client/reverse/types.h b/tunnels/client/reverse/types.h index f845c010..5e650dbf 100644 --- a/tunnels/client/reverse/types.h +++ b/tunnels/client/reverse/types.h @@ -7,7 +7,7 @@ struct connect_arg { uint8_t tid; unsigned int delay; - tunnel_t * t; + tunnel_t *t; }; typedef struct reverse_client_con_state_s @@ -25,8 +25,10 @@ typedef struct reverse_client_state_s { atomic_uint reverse_cons; atomic_uint round_index; - size_t chain_index_pi; - size_t connection_per_thread; + + uint8_t chain_index_d; + + int connection_per_thread; int min_unused_cons; unsigned int unused_cons[]; diff --git a/tunnels/client/wolfssl/wolfssl_client.c b/tunnels/client/wolfssl/wolfssl_client.c index e93d3a2a..7ba507a5 100644 --- a/tunnels/client/wolfssl/wolfssl_client.c +++ b/tunnels/client/wolfssl/wolfssl_client.c @@ -223,9 +223,7 @@ static void upStream(tunnel_t *self, context_t *c) return; failed:; - context_t *fail_context_up = newFinContextFrom(c); - fail_context_up->src_io = c->src_io; - self->up->upStream(self->up, fail_context_up); + self->up->upStream(self->up, newFinContextFrom(c)); context_t *fail_context = newFinContextFrom(c); cleanup(self, c); diff --git a/tunnels/server/http2/helpers.h b/tunnels/server/http2/helpers.h index d1f5b069..8070dc09 100644 --- a/tunnels/server/http2/helpers.h +++ b/tunnels/server/http2/helpers.h @@ -1,7 +1,12 @@ #pragma once +#include "nghttp2/nghttp2.h" +#include "tunnel.h" #include "types.h" -#define MAX_CONCURRENT_STREAMS 0xffffffffu +enum +{ + kMaxConcurrentStreams = 0xffffffffU +}; static nghttp2_nv makeNv(const char *name, const char *value) { @@ -14,16 +19,6 @@ static nghttp2_nv makeNv(const char *name, const char *value) return nv; } -static nghttp2_nv makeNv2(const char *name, const char *value, int namelen, int valuelen) -{ - nghttp2_nv nv; - nv.name = (uint8_t *) name; - nv.value = (uint8_t *) value; - nv.namelen = namelen; - nv.valuelen = valuelen; - nv.flags = NGHTTP2_NV_FLAG_NONE; - return nv; -} static void printFrameHd(const nghttp2_frame_hd *hd) { @@ -51,21 +46,57 @@ static void removeStream(http2_server_con_state_t *con, http2_server_child_con_s stream->next->prev = stream->prev; } } + +static void onStreamLinePaused(void *arg) +{ + http2_server_child_con_state_t *stream = (http2_server_child_con_state_t *) arg; + pauseLineDownSide(stream->parent); +} +static void onStreamLineResumed(void *arg) +{ + http2_server_child_con_state_t *stream = (http2_server_child_con_state_t *) arg; + resumeLineDownSide(stream->parent); +} + +static void onH2LinePaused(void *arg) +{ + http2_server_con_state_t *con = (http2_server_con_state_t *) arg; + http2_server_child_con_state_t *stream_i; + for (stream_i = con->root.next; stream_i;) + { + http2_server_child_con_state_t *next = stream_i->next; + pauseLineUpSide(next->line); + stream_i = next; + } +} + +static void onH2LineResumed(void *arg) +{ + http2_server_con_state_t *con = (http2_server_con_state_t *) arg; + http2_server_child_con_state_t *stream_i; + for (stream_i = con->root.next; stream_i;) + { + http2_server_child_con_state_t *next = stream_i->next; + resumeLineUpSide(next->line); + stream_i = next; + } +} + http2_server_child_con_state_t *createHttp2Stream(http2_server_con_state_t *con, line_t *this_line, tunnel_t *target_tun, int32_t stream_id) { http2_server_child_con_state_t *stream; stream = malloc(sizeof(http2_server_child_con_state_t)); memset(stream, 0, sizeof(http2_server_child_con_state_t)); - + stream->stream_id = stream_id; stream->chunkbs = newBufferStream(getLineBufferPool(this_line)); stream->parent = this_line; stream->line = newLine(this_line->tid); stream->line->chains_state[target_tun->chain_index - 1] = stream; - stream->io = NULL; stream->tunnel = target_tun; nghttp2_session_set_stream_user_data(con->session, stream_id, stream); + setupLineDownSide(stream->line, onStreamLinePaused, stream, onStreamLineResumed); return stream; } @@ -74,6 +105,7 @@ static void deleteHttp2Stream(http2_server_child_con_state_t *stream) stream->line->chains_state[stream->tunnel->chain_index - 1] = NULL; destroyBufferStream(stream->chunkbs); + doneLineDownSide(stream->line); destroyLine(stream->line); if (stream->request_path) { @@ -82,20 +114,19 @@ static void deleteHttp2Stream(http2_server_child_con_state_t *stream) free(stream); } -static http2_server_con_state_t *createHttp2Connection(tunnel_t *self, line_t *line, hio_t *io) +static http2_server_con_state_t *createHttp2Connection(tunnel_t *self, line_t *line) { - http2_server_state_t * state = STATE(self); + http2_server_state_t *state = STATE(self); http2_server_con_state_t *con = malloc(sizeof(http2_server_con_state_t)); memset(con, 0, sizeof(http2_server_con_state_t)); - nghttp2_session_server_new2(&con->session, state->cbs, con, state->ngoptions); con->state = kH2WantRecv; con->tunnel = self; con->line = line; - con->io = io; + setupLineUpSide(line, onH2LinePaused, con, onH2LineResumed); nghttp2_settings_entry settings[] = { - {NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, MAX_CONCURRENT_STREAMS}, + {NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, kMaxConcurrentStreams}, {NGHTTP2_SETTINGS_MAX_FRAME_SIZE, (1U << 18)}, }; nghttp2_submit_settings(con->session, NGHTTP2_FLAG_NONE, settings, ARRAY_SIZE(settings)); @@ -104,12 +135,12 @@ static http2_server_con_state_t *createHttp2Connection(tunnel_t *self, line_t *l } static void deleteHttp2Connection(http2_server_con_state_t *con) { - tunnel_t * self = con->tunnel; + tunnel_t *self = con->tunnel; http2_server_child_con_state_t *stream_i; for (stream_i = con->root.next; stream_i;) { - context_t * fin_ctx = newFinContext(stream_i->line); - tunnel_t * dest = stream_i->tunnel; + context_t *fin_ctx = newFinContext(stream_i->line); + tunnel_t *dest = stream_i->tunnel; http2_server_child_con_state_t *next = stream_i->next; deleteHttp2Stream(stream_i); CSTATE_MUT(fin_ctx) = NULL; @@ -118,7 +149,7 @@ static void deleteHttp2Connection(http2_server_con_state_t *con) } nghttp2_session_del(con->session); - + doneLineUpSide(con->line); con->line->chains_state[self->chain_index] = NULL; free(con); } diff --git a/tunnels/server/http2/http2_server.c b/tunnels/server/http2/http2_server.c index be8da89f..06ffa620 100644 --- a/tunnels/server/http2/http2_server.c +++ b/tunnels/server/http2/http2_server.c @@ -1,7 +1,9 @@ #include "http2_server.h" - +#include "grpc_def.h" +#include "http2_def.h" #include "helpers.h" #include "loggers/network_logger.h" +#include "nghttp2/nghttp2.h" #include "types.h" static int onHeaderCallback(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *_name, size_t namelen, @@ -103,7 +105,6 @@ static int onDataChunkRecvCallback(nghttp2_session *session, uint8_t flags, int3 stream->bytes_needed = 0; context_t *stream_data = newContext(stream->line); stream_data->payload = gdata_buf; - stream_data->src_io = con->io; if (! stream->first_sent) { stream->first_sent = true; @@ -127,7 +128,6 @@ static int onDataChunkRecvCallback(nghttp2_session *session, uint8_t flags, int3 writeRaw(buf, data, len); context_t *stream_data = newContext(stream->line); stream_data->payload = buf; - stream_data->src_io = con->io; if (! stream->first_sent) { stream->first_sent = true; @@ -221,7 +221,7 @@ static int onFrameRecvCallback(nghttp2_session *session, const nghttp2_frame *fr return 0; } -static bool trySendResponse(tunnel_t *self, http2_server_con_state_t *con, size_t stream_id, hio_t *stream_io, +static bool trySendResponse(tunnel_t *self, http2_server_con_state_t *con, size_t stream_id, shift_buffer_t *buf) { line_t *line = con->line; @@ -243,7 +243,6 @@ static bool trySendResponse(tunnel_t *self, http2_server_con_state_t *con, size_ writeRaw(send_buf, data, len); context_t *response_data = newContext(line); response_data->payload = send_buf; - response_data->src_io = stream_io; self->dw->downStream(self->dw, response_data); return true; @@ -287,7 +286,6 @@ static bool trySendResponse(tunnel_t *self, http2_server_con_state_t *con, size_ http2FrameHdPack(&framehd, rawBufMut(buf)); context_t *response_data = newContext(line); response_data->payload = buf; - response_data->src_io = stream_io; self->dw->downStream(self->dw, response_data); goto send_done; @@ -308,7 +306,6 @@ static void upStream(tunnel_t *self, context_t *c) if (c->payload != NULL) { http2_server_con_state_t *con = CSTATE(c); - con->io = c->src_io; con->state = kH2WantRecv; size_t len = bufLen(c->payload); size_t ret = nghttp2_session_mem_recv2(con->session, (const uint8_t *) rawBuf(c->payload), len); @@ -328,7 +325,7 @@ static void upStream(tunnel_t *self, context_t *c) return; } - while (trySendResponse(self, con, 0, NULL, NULL)) + while (trySendResponse(self, con, 0, NULL)) { if (! isAlive(c->line)) { @@ -342,7 +339,7 @@ static void upStream(tunnel_t *self, context_t *c) { if (c->init) { - CSTATE_MUT(c) = createHttp2Connection(self, c->line, c->src_io); + CSTATE_MUT(c) = createHttp2Connection(self, c->line); self->dw->downStream(self->dw, newEstContext(c->line)); destroyContext(c); @@ -359,12 +356,11 @@ static void downStream(tunnel_t *self, context_t *c) { http2_server_child_con_state_t *stream = CSTATE(c); http2_server_con_state_t * con = stream->parent->chains_state[self->chain_index]; - stream->io = c->src_io; if (c->payload != NULL) { con->state = kH2SendHeaders; - while (trySendResponse(self, con, stream->stream_id, stream->io, c->payload)) + while (trySendResponse(self, con, stream->stream_id, c->payload)) { if (! isAlive(c->line)) { @@ -395,7 +391,7 @@ static void downStream(tunnel_t *self, context_t *c) deleteHttp2Stream(stream); CSTATE_MUT(c) = NULL; - while (trySendResponse(self, con, 0, NULL, NULL)) + while (trySendResponse(self, con, 0, NULL)) { if (! isAlive(c->line)) { @@ -434,7 +430,7 @@ tunnel_t *newHttp2Server(node_instance_context_t *instance_info) nghttp2_session_callbacks_set_on_frame_recv_callback(state->cbs, onFrameRecvCallback); nghttp2_option_new(&(state->ngoptions)); - nghttp2_option_set_peer_max_concurrent_streams(state->ngoptions, 0xffffffffU); + nghttp2_option_set_peer_max_concurrent_streams(state->ngoptions, kMaxConcurrentStreams); nghttp2_option_set_no_closed_streams(state->ngoptions, 1); nghttp2_option_set_no_http_messaging(state->ngoptions, 1); diff --git a/tunnels/server/http2/types.h b/tunnels/server/http2/types.h index dc97e8d4..60e49877 100644 --- a/tunnels/server/http2/types.h +++ b/tunnels/server/http2/types.h @@ -1,8 +1,7 @@ #pragma once #include "api.h" #include "buffer_stream.h" -#include "grpc_def.h" -#include "http2_def.h" + #include "http_def.h" #include "nghttp2/nghttp2.h" @@ -36,7 +35,6 @@ typedef struct http2_server_child_con_state_s size_t bytes_needed; line_t * parent; line_t * line; - hio_t * io; tunnel_t * tunnel; } http2_server_child_con_state_t; @@ -51,7 +49,6 @@ typedef struct http2_server_con_state_s tunnel_t * tunnel; line_t * line; - hio_t * io; http2_server_child_con_state_t root; } http2_server_con_state_t; diff --git a/tunnels/server/openssl/openssl_server.c b/tunnels/server/openssl/openssl_server.c index 50a447e5..f878d912 100644 --- a/tunnels/server/openssl/openssl_server.c +++ b/tunnels/server/openssl/openssl_server.c @@ -140,7 +140,6 @@ static void fallbackWrite(tunnel_t *self, context_t *c) cstate->fallback_init_sent = true; context_t *init_ctx = newInitContext(c->line); - init_ctx->src_io = c->src_io; cstate->init_sent = true; state->fallback->upStream(state->fallback, init_ctx); if (! isAlive(c->line)) @@ -265,9 +264,7 @@ static void upStream(tunnel_t *self, context_t *c) LOGD("OpensslServer: Tls handshake complete"); cstate->handshake_completed = true; empytBufferStream(cstate->fallback_buf); - context_t *up_init_ctx = newInitContext(c->line); - up_init_ctx->src_io = c->src_io; - self->up->upStream(self->up, up_init_ctx); + self->up->upStream(self->up, newInitContext(c->line)); if (! isAlive(c->line)) { LOGW("OpensslServer: next node instantly closed the init with fin"); @@ -421,9 +418,7 @@ static void upStream(tunnel_t *self, context_t *c) return; failed_after_establishment:; - context_t *fail_context_up = newFinContextFrom(c); - fail_context_up->src_io = c->src_io; - self->up->upStream(self->up, fail_context_up); + self->up->upStream(self->up, newFinContextFrom(c)); disconnect:; context_t *fail_context = newFinContextFrom(c); diff --git a/tunnels/server/reverse/helpers.h b/tunnels/server/reverse/helpers.h index cbfa842a..b36d0c40 100644 --- a/tunnels/server/reverse/helpers.h +++ b/tunnels/server/reverse/helpers.h @@ -1,4 +1,5 @@ #pragma once +#include "tunnel.h" #include "types.h" #define CSTATE_D(x) ((reverse_server_con_state_t *) ((((x)->line->chains_state)[state->chain_index_d]))) @@ -47,6 +48,24 @@ static void removeConnectionD(thread_box_t *box, reverse_server_con_state_t *con } box->d_count -= 1; } +static void onLinePausedU(void *cstate) +{ + pauseLineDownSide(((reverse_server_con_state_t *) cstate)->d); +} + +static void onLineResumedU(void *cstate) +{ + resumeLineDownSide(((reverse_server_con_state_t *) cstate)->d); +} +static void onLinePausedD(void *cstate) +{ + pauseLineDownSide(((reverse_server_con_state_t *) cstate)->u); +} + +static void onLineResumedD(void *cstate) +{ + resumeLineDownSide(((reverse_server_con_state_t *) cstate)->u); +} static reverse_server_con_state_t *createCstate(bool isup, line_t *line) { diff --git a/tunnels/server/reverse/reverse_server.c b/tunnels/server/reverse/reverse_server.c index 0e7c9add..a4248a2b 100644 --- a/tunnels/server/reverse/reverse_server.c +++ b/tunnels/server/reverse/reverse_server.c @@ -2,6 +2,7 @@ #include "helpers.h" #include "loggers/network_logger.h" #include "managers/node_manager.h" +#include "tunnel.h" #include "types.h" #include "ww.h" @@ -67,6 +68,8 @@ static void upStream(tunnel_t *self, context_t *c) removeConnectionU(this_tb, ucstate); ucstate->d = c->line; ucstate->paired = true; + setupLineUpSide(ucstate->u, onLinePausedU, ucstate, onLineResumedU); + setupLineUpSide(ucstate->d, onLinePausedD, ucstate, onLineResumedD); CSTATE_D_MUT(c) = ucstate; self->up->upStream(self->up, newEstContext(ucstate->u)); @@ -145,8 +148,10 @@ static void downStream(tunnel_t *self, context_t *c) reverse_server_con_state_t *dcstate = this_tb->d_cons_root.next; removeConnectionD(this_tb, dcstate); - dcstate->u = c->line; - dcstate->paired = true; + dcstate->u = c->line; + dcstate->paired = true; + setupLineUpSide(dcstate->u, onLinePausedU, dcstate, onLineResumedU); + setupLineUpSide(dcstate->d, onLinePausedD, dcstate, onLineResumedD); CSTATE_U_MUT(c) = dcstate; (dcstate->d->chains_state)[state->chain_index_d] = dcstate; self->up->upStream(self->up, newEstContext(c->line)); diff --git a/tunnels/server/trojan/auth/trojan_auth_server.c b/tunnels/server/trojan/auth/trojan_auth_server.c index 46093628..19e83ab4 100644 --- a/tunnels/server/trojan/auth/trojan_auth_server.c +++ b/tunnels/server/trojan/auth/trojan_auth_server.c @@ -126,10 +126,8 @@ static void upStream(tunnel_t *self, context_t *c) LOGD("TrojanAuthServer: user \"%s\" accepted", tuser->user.name); cstate->authenticated = true; markAuthenticated(c->line); - context_t *init_ctx = newInitContext(c->line); - init_ctx->src_io = c->src_io; cstate->init_sent = true; - self->up->upStream(self->up, init_ctx); + self->up->upStream(self->up, newInitContext(c->line)); if (! isAlive(c->line)) { reuseContextBuffer(c); @@ -199,11 +197,8 @@ disconnect:; fallback:; if (! cstate->init_sent) { - context_t *init_ctx = newInitContext(c->line); - init_ctx->src_io = c->src_io; cstate->init_sent = true; - - state->fallback->upStream(state->fallback, init_ctx); + state->fallback->upStream(state->fallback, newInitContext(c->line)); if (! isAlive(c->line)) { reuseContextBuffer(c); diff --git a/tunnels/server/trojan/socks/trojan_socks_server.c b/tunnels/server/trojan/socks/trojan_socks_server.c index 972462be..f8cd88ca 100644 --- a/tunnels/server/trojan/socks/trojan_socks_server.c +++ b/tunnels/server/trojan/socks/trojan_socks_server.c @@ -200,7 +200,7 @@ static bool parseAddress(context_t *c) return true; } -static bool processUdp(tunnel_t *self, trojan_socks_server_con_state_t *cstate, line_t *line, hio_t *src_io) +static bool processUdp(tunnel_t *self, trojan_socks_server_con_state_t *cstate, line_t *line) { buffer_stream_t *bstream = cstate->udp_stream; if (bufferStreamLen(bstream) <= 0) @@ -216,7 +216,7 @@ static bool processUdp(tunnel_t *self, trojan_socks_server_con_state_t *cstate, { case kTrojanatypIpV4: // address_type | DST.ADDR | DST.PORT | Length | CRLF | Payload - // 1 | 4 | 2 | 2 | 2 + // 1 | 4 | 2 | 2 | 2 if (bufferStreamLen(bstream) < 1 + 4 + 2 + 2 + 2) { @@ -237,7 +237,7 @@ static bool processUdp(tunnel_t *self, trojan_socks_server_con_state_t *cstate, break; case kTrojanatypDomainName: // address_type | DST.ADDR | DST.PORT | Length | CRLF | Payload - // 1 | x(1) + x | 2 | 2 | 2 + // 1 | x(1) + x | 2 | 2 | 2 if (bufferStreamLen(bstream) < 1 + 1 + 2 + 2 + 2) { return true; @@ -262,7 +262,7 @@ static bool processUdp(tunnel_t *self, trojan_socks_server_con_state_t *cstate, break; case kTrojanatypIpV6: // address_type | DST.ADDR | DST.PORT | Length | CRLF | Payload - // 1 | 16 | 2 | 2 | 2 + // 1 | 16 | 2 | 2 | 2 if (bufferStreamLen(bstream) < 1 + 16 + 2 + 2 + 2) { @@ -294,7 +294,6 @@ static bool processUdp(tunnel_t *self, trojan_socks_server_con_state_t *cstate, context_t *c = newContext(line); socket_context_t *dest_context = &(c->line->dest_ctx); - c->src_io = src_io; c->payload = bufferStreamRead(bstream, full_len); if (cstate->init_sent) @@ -305,7 +304,7 @@ static bool processUdp(tunnel_t *self, trojan_socks_server_con_state_t *cstate, { return true; } - return processUdp(self, cstate, line, src_io); + return processUdp(self, cstate, line); } dest_context->address.sa.sa_family = AF_INET; @@ -394,7 +393,7 @@ static bool processUdp(tunnel_t *self, trojan_socks_server_con_state_t *cstate, { return true; } - return processUdp(self, cstate, line, src_io); + return processUdp(self, cstate, line); } static void upStream(tunnel_t *self, context_t *c) @@ -450,7 +449,7 @@ static void upStream(tunnel_t *self, context_t *c) bufferStreamPush(cstate->udp_stream, c->payload); c->payload = NULL; - if (! processUdp(self, cstate, c->line, c->src_io)) + if (! processUdp(self, cstate, c->line)) { LOGE("TrojanSocksServer: udp packet could not be parsed"); @@ -497,7 +496,7 @@ static void upStream(tunnel_t *self, context_t *c) bufferStreamPush(cstate->udp_stream, c->payload); c->payload = NULL; - if (! processUdp(self, cstate, c->line, c->src_io)) + if (! processUdp(self, cstate, c->line)) { LOGE("TrojanSocksServer: udp packet could not be parsed"); diff --git a/tunnels/server/wolfssl/wolfssl_server.c b/tunnels/server/wolfssl/wolfssl_server.c index 09adb32b..7f0754b3 100644 --- a/tunnels/server/wolfssl/wolfssl_server.c +++ b/tunnels/server/wolfssl/wolfssl_server.c @@ -134,10 +134,8 @@ static void fallbackWrite(tunnel_t *self, context_t *c) { cstate->fallback_init_sent = true; - context_t *init_ctx = newInitContext(c->line); - init_ctx->src_io = c->src_io; cstate->init_sent = true; - state->fallback->upStream(state->fallback, init_ctx); + state->fallback->upStream(state->fallback, newInitContext(c->line)); if (! isAlive(c->line)) { destroyContext(c); @@ -260,9 +258,7 @@ static void upStream(tunnel_t *self, context_t *c) LOGD("OpensslServer: Tls handshake complete"); cstate->handshake_completed = true; - context_t *up_init_ctx = newInitContext(c->line); - up_init_ctx->src_io = c->src_io; - self->up->upStream(self->up, up_init_ctx); + self->up->upStream(self->up, newInitContext(c->line)); if (! isAlive(c->line)) { LOGW("OpensslServer: next node instantly closed the init with fin"); @@ -416,9 +412,7 @@ static void upStream(tunnel_t *self, context_t *c) return; failed_after_establishment:; - context_t *fail_context_up = newFinContextFrom(c); - fail_context_up->src_io = c->src_io; - self->up->upStream(self->up, fail_context_up); + self->up->upStream(self->up, newFinContextFrom(c)); disconnect:; context_t *fail_context = newFinContextFrom(c); diff --git a/ww/buffer_pool.c b/ww/buffer_pool.c index 85574e8b..4ca0d1cb 100644 --- a/ww/buffer_pool.c +++ b/ww/buffer_pool.c @@ -10,7 +10,6 @@ #include // NOLINTBEGIN -// todo (cmake) #define MEMORY_PROFILE_SMALL (ram_profile >= kRamProfileM1Memory ? kRamProfileM1Memory : ram_profile) #define MEMORY_PROFILE_SELECTED ram_profile diff --git a/ww/buffer_pool.h b/ww/buffer_pool.h index 5fd2ab75..b7d8b68a 100644 --- a/ww/buffer_pool.h +++ b/ww/buffer_pool.h @@ -18,9 +18,8 @@ This is the most memory consuming part of the program, and also the preallocation length really depends on where you want to use this program, on a mobile phone or on a 16 core server? - so there are possible choices for memory profiles in the .c file which you can select the best for your needs. - todo (runtime selection) its better that the memory profile be a runtime selection than compile time. + so the pool width is affected by ww memory profile */ diff --git a/ww/context_queue.c b/ww/context_queue.c index 8621945f..77ff5f0a 100644 --- a/ww/context_queue.c +++ b/ww/context_queue.c @@ -10,7 +10,7 @@ #include "stc/deq.h" enum { - kQCap = 32 + kQCap = 16 }; struct context_queue_s @@ -44,50 +44,15 @@ void destroyContextQueue(context_queue_t *self) void contextQueuePush(context_queue_t *self, context_t *context) { - if (context->src_io) - { - context->fd = hio_fd(context->src_io); - } queue_push_back(&self->q, context); } -// i don't know if there is a better way, since a muxed src can just gets freed even if its in the queue :( -// could we proxy the close callback ...? context_t *contextQueuePop(context_queue_t *self) { context_t *context = queue_pull_front(&self->q); - - if (context->fd == 0 || ! hio_exists(context->line->loop, context->fd)) - { - context->src_io = NULL; - } - else - { - if (hio_is_closed(context->src_io) || context->src_io != hio_get(context->line->loop, context->fd)) - { - context->src_io = NULL; - } - } - return context; } size_t contextQueueLen(context_queue_t *self) { return queue_size(&self->q); } - -void contextQueueNotifyIoRemoved(context_queue_t *self, hio_t *io) -{ - if (io == NULL) - { - return; - } - c_foreach(i, queue, self->q) - { - if ((*i.ref)->src_io == io) - { - (*i.ref)->src_io = NULL; - (*i.ref)->fd = 0; - } - } -} diff --git a/ww/context_queue.h b/ww/context_queue.h index 4cef559f..1a2f1b5f 100644 --- a/ww/context_queue.h +++ b/ww/context_queue.h @@ -17,4 +17,3 @@ void destroyContextQueue(context_queue_t *self); void contextQueuePush(context_queue_t *self, context_t *context); context_t *contextQueuePop(context_queue_t *self); size_t contextQueueLen(context_queue_t *self); -void contextQueueNotifyIoRemoved(context_queue_t *self, hio_t *io); diff --git a/ww/tunnel.c b/ww/tunnel.c index 61218e96..770a5dab 100644 --- a/ww/tunnel.c +++ b/ww/tunnel.c @@ -27,6 +27,14 @@ extern context_t *switchLine(context_t *c, line_t *line); extern buffer_pool_t *getThreadBufferPool(uint8_t tid); extern buffer_pool_t *getLineBufferPool(line_t *l); extern buffer_pool_t *getContextBufferPool(context_t *c); +extern void setupLineUpSide(line_t *l, LineFlowSignal pause_cb, void *state, LineFlowSignal resume_cb); +extern void setupLineDownSide(line_t *l, LineFlowSignal pause_cb, void *state, LineFlowSignal resume_cb); +extern void doneLineUpSide(line_t *l); +extern void doneLineDownSide(line_t *l); +extern void pauseLineUpSide(line_t *l); +extern void pauseLineDownSide(line_t *l); +extern void resumeLineUpSide(line_t *l); +extern void resumeLineDownSide(line_t *l); // `from` upstreams to `to` void chainUp(tunnel_t *from, tunnel_t *to) diff --git a/ww/tunnel.h b/ww/tunnel.h index 8f890b80..cb586015 100644 --- a/ww/tunnel.h +++ b/ww/tunnel.h @@ -42,11 +42,19 @@ enum #define CSTATE(x) ((void *) ((((x)->line->chains_state)[self->chain_index]))) #define CSTATE_MUT(x) ((x)->line->chains_state)[self->chain_index] +typedef void (*LineFlowSignal)(void *state); + typedef struct line_s { hloop_t *loop; socket_context_t src_ctx; socket_context_t dest_ctx; + void *up_state; + void *dw_state; + LineFlowSignal up_pause_cb; + LineFlowSignal up_resume_cb; + LineFlowSignal dw_pause_cb; + LineFlowSignal dw_resume_cb; uint16_t refc; uint8_t tid; uint8_t lcid; @@ -60,7 +68,6 @@ typedef struct line_s typedef struct context_s { line_t *line; - hio_t *src_io; shift_buffer_t *payload; int fd; bool init; @@ -113,6 +120,64 @@ inline line_t *newLine(uint8_t tid) memset(&(result->chains_state), 0, (sizeof(void *) * kMaxChainLen)); return result; } +inline bool isAlive(line_t *line) +{ + return line->alive; +} + +inline void setupLineUpSide(line_t *l, LineFlowSignal pause_cb, void *state, LineFlowSignal resume_cb) +{ + l->up_state = state; + l->up_pause_cb = pause_cb; + l->up_resume_cb = resume_cb; +} +inline void setupLineDownSide(line_t *l, LineFlowSignal pause_cb, void *state, LineFlowSignal resume_cb) +{ + l->dw_state = state; + l->dw_pause_cb = pause_cb; + l->dw_resume_cb = resume_cb; +} +inline void doneLineUpSide(line_t *l) +{ + l->up_state = NULL; +} +inline void doneLineDownSide(line_t *l) +{ + l->dw_state = NULL; +} + +inline void pauseLineUpSide(line_t *l) +{ + if (l->up_state) + { + l->up_pause_cb(l->up_state); + } +} + +inline void pauseLineDownSide(line_t *l) +{ + if (l->dw_state) + { + l->dw_pause_cb(l->dw_state); + } +} + +inline void resumeLineUpSide(line_t *l) +{ + if (l->up_state) + { + l->up_resume_cb(l->up_state); + } +} + +inline void resumeLineDownSide(line_t *l) +{ + if (l->dw_state) + { + l->dw_resume_cb(l->dw_state); + } +} + inline uint8_t reserveChainStateIndex(line_t *l) { uint8_t result = l->lcid; @@ -178,7 +243,7 @@ inline context_t *newContextFrom(context_t *source) { lockLine(source->line); context_t *new_ctx = malloc(sizeof(context_t)); - *new_ctx = (context_t){.line = source->line, .src_io = source->src_io}; + *new_ctx = (context_t){.line = source->line}; return new_ctx; } inline context_t *newEstContext(line_t *line) @@ -215,12 +280,6 @@ inline context_t *switchLine(context_t *c, line_t *line) c->line = line; return c; } -inline bool isAlive(line_t *line) -{ - return line->alive; -} - - inline void markAuthenticated(line_t *line) { @@ -231,7 +290,6 @@ inline bool isAuthenticated(line_t *line) return line->auth_cur > 0; } - inline buffer_pool_t *getThreadBufferPool(uint8_t tid) { return buffer_pools[tid];