From bbb1dcf367fcdecad1b8be1ab601ea1cb9080aaf Mon Sep 17 00:00:00 2001 From: Radkesvat <134321679+radkesvat@users.noreply.github.com> Date: Thu, 11 Apr 2024 10:32:32 +0300 Subject: [PATCH] changes in reverse tunnel / http2 ping interval --- tunnels/client/http2/helpers.h | 2 +- tunnels/client/reverse/helpers.h | 9 +- tunnels/client/reverse/reverse_client.c | 52 ++++++++--- tunnels/client/reverse/types.h | 4 +- tunnels/server/reverse/helpers.h | 48 ++++++++++ tunnels/server/reverse/reverse_server.c | 112 +++++++++++++----------- tunnels/server/reverse/types.h | 12 ++- 7 files changed, 162 insertions(+), 77 deletions(-) diff --git a/tunnels/client/http2/helpers.h b/tunnels/client/http2/helpers.h index c2b2ebda..2534971c 100644 --- a/tunnels/client/http2/helpers.h +++ b/tunnels/client/http2/helpers.h @@ -3,7 +3,7 @@ #include "types.h" #define MAX_CONCURRENT_STREAMS 0xffffffffu -#define PING_INTERVAL 2500 +#define PING_INTERVAL 5000 #define STATE(x) ((http2_client_state_t *)((x)->state)) #define CSTATE(x) ((void *)((((x)->line->chains_state)[self->chain_index]))) diff --git a/tunnels/client/reverse/helpers.h b/tunnels/client/reverse/helpers.h index 8345d837..fc381b29 100644 --- a/tunnels/client/reverse/helpers.h +++ b/tunnels/client/reverse/helpers.h @@ -8,7 +8,8 @@ #define CSTATE_D_MUT(x) ((x)->line->chains_state)[state->chain_index_pi] #define CSTATE_U_MUT(x) ((x)->line->chains_state)[state->chain_index_pi] #define ISALIVE(x) (((((x)->line->chains_state)[state->chain_index_pi])) != NULL) -#define PRECONNECT_DELAY 100 +#define PRECONNECT_DELAY_SHORT 10 +#define PRECONNECT_DELAY_HIGH 1500 #undef max #undef min static inline size_t min(size_t x, size_t y) { return (((x) < (y)) ? (x) : (y)); } @@ -53,7 +54,7 @@ static void connect_timer_finished(htimer_t *timer) static void before_connect(hevent_t *ev) { struct connect_arg *cg = hevent_userdata(ev); - htimer_t *connect_timer = htimer_add(loops[cg->tid], connect_timer_finished, PRECONNECT_DELAY, 1); + htimer_t *connect_timer = htimer_add(loops[cg->tid], connect_timer_finished, cg->delay, 1); hevent_set_userdata(connect_timer, cg); } @@ -61,7 +62,8 @@ static void initiateConnect(tunnel_t *t, int tid) { if (STATE(t)->unused_cons[tid] >= STATE(t)->connection_per_thread) return; - STATE(t)->unused_cons[tid] += 1; + bool more_delay = STATE(t)->unused_cons[tid] <= 0; + // STATE(t)->unused_cons[tid] += 1; // int tid = 0; // if (threads_count > 0) @@ -83,6 +85,7 @@ static void initiateConnect(tunnel_t *t, int tid) struct connect_arg *cg = malloc(sizeof(struct connect_arg)); cg->t = t; cg->tid = tid; + cg->delay = more_delay?PRECONNECT_DELAY_HIGH:PRECONNECT_DELAY_SHORT; ev.userdata = cg; hloop_post_event(worker_loop, &ev); } \ No newline at end of file diff --git a/tunnels/client/reverse/reverse_client.c b/tunnels/client/reverse/reverse_client.c index de45dde5..5524eea9 100644 --- a/tunnels/client/reverse/reverse_client.c +++ b/tunnels/client/reverse/reverse_client.c @@ -10,9 +10,9 @@ static inline void upStream(tunnel_t *self, context_t *c) if (c->payload != NULL) { reverse_client_con_state_t *dcstate = CSTATE_D(c); - if (!dcstate->first_sent) + if (!dcstate->first_sent_u) { - dcstate->first_sent = true; + dcstate->first_sent_u = true; c->first = true; } self->up->upStream(self->up, switchLine(c, dcstate->u)); @@ -54,23 +54,50 @@ static inline void downStream(tunnel_t *self, context_t *c) if (ucstate->pair_connected) { - self->dw->downStream(self->dw, switchLine(c, CSTATE_U(c)->d)); + if (!ucstate->first_sent_d) + { + ucstate->first_sent_d = true; + context_t *turned = switchLine(c, ucstate->d); + turned->first = true; + self->dw->downStream(self->dw, turned); + } + else + self->dw->downStream(self->dw, switchLine(c, ucstate->d)); } else { state->unused_cons[tid] -= 1; atomic_fetch_add_explicit(&(state->reverse_cons), 1, memory_order_relaxed); self->dw->downStream(self->dw, newInitContext(ucstate->d)); - if (!ISALIVE(c)) + if (CSTATE_U(c) == NULL) { - DISCARD_CONTEXT(c); + reuseBuffer(buffer_pools[c->line->tid], c->payload); + c->payload = NULL; destroyContext(c); return; } ucstate->pair_connected = true; - c->first = true; - self->dw->downStream(self->dw, switchLine(c, CSTATE_U(c)->d)); - initiateConnect(self, tid); + + // first byte is 0xFF a signal from reverse server + uint8_t check = 0x0; + readUI8(c->payload, &check); + assert(check == (unsigned char)0xFF); + shiftr(c->payload, 1); + if (bufLen(c->payload) <= 0) + { + initiateConnect(self, tid); + reuseBuffer(buffer_pools[c->line->tid], c->payload); + c->payload = NULL; + destroyContext(c); + return; + } + else + { + ucstate->first_sent_d = true; + c->first = true; + self->dw->downStream(self->dw, switchLine(c, ucstate->d)); + initiateConnect(self, tid); + } } } else @@ -84,27 +111,28 @@ static inline void downStream(tunnel_t *self, context_t *c) if (ucstate->pair_connected) { - const unsigned int old_reverse_cons = atomic_fetch_add_explicit(&(state->reverse_cons), -1, memory_order_relaxed); + const unsigned int old_reverse_cons = atomic_fetch_add_explicit(&(state->reverse_cons), -1, memory_order_relaxed); LOGD("ReverseClient: disconnected, tid: %d unused: %u active: %d", tid, state->unused_cons[tid], old_reverse_cons - 1); context_t *fc = switchLine(c, ucstate->d); destroy_cstate(ucstate); self->dw->downStream(self->dw, fc); + initiateConnect(self, tid); } else { destroy_cstate(ucstate); - state->unused_cons[tid] -= 1; + if (state->unused_cons[tid] > 0) + state->unused_cons[tid] -= 1; LOGD("ReverseClient: disconnected, tid: %d unused: %u active: %d", tid, state->unused_cons[tid], atomic_load_explicit(&(state->reverse_cons), memory_order_relaxed)); destroyContext(c); } - initiateConnect(self, tid); } else if (c->est) { CSTATE_U(c)->established = true; - // unused_cons[tid] += 1; + state->unused_cons[tid] += 1; LOGI("ReverseClient: connected, tid: %d unused: %u active: %d", tid, state->unused_cons[tid], atomic_load_explicit(&(state->reverse_cons), memory_order_relaxed)); destroyContext(c); diff --git a/tunnels/client/reverse/types.h b/tunnels/client/reverse/types.h index 40f0954d..bc23c696 100644 --- a/tunnels/client/reverse/types.h +++ b/tunnels/client/reverse/types.h @@ -6,6 +6,7 @@ struct connect_arg { unsigned int tid; + int delay; tunnel_t *t; }; @@ -13,7 +14,8 @@ typedef struct reverse_client_con_state_s { bool pair_connected; bool established; - bool first_sent; + bool first_sent_u; + bool first_sent_d; line_t *u; line_t *d; diff --git a/tunnels/server/reverse/helpers.h b/tunnels/server/reverse/helpers.h index e5dec445..6f02cce8 100644 --- a/tunnels/server/reverse/helpers.h +++ b/tunnels/server/reverse/helpers.h @@ -10,6 +10,54 @@ #define ISALIVE(x) (CSTATE(x) != NULL) + +static void add_connection_u(thread_box_t *box, + reverse_server_con_state_t *con) +{ + con->next = box->u_cons_root.next; + box->u_cons_root.next = con; + con->prev = &box->u_cons_root; + if (con->next) + { + con->next->prev = con; + } + box->u_count += 1; +} +static void remove_connection_u(thread_box_t *box, reverse_server_con_state_t *con) +{ + con->prev->next = con->next; + if (con->next) + { + con->next->prev = con->prev; + } + box->u_count -= 1; +} + + +static void add_connection_d(thread_box_t *box, + reverse_server_con_state_t *con) +{ + con->next = box->d_cons_root.next; + box->d_cons_root.next = con; + con->prev = &box->d_cons_root; + if (con->next) + { + con->next->prev = con; + } + box->d_count += 1; +} +static void remove_connection_d(thread_box_t *box, reverse_server_con_state_t *con) +{ + con->prev->next = con->next; + if (con->next) + { + con->next->prev = con->prev; + } + box->d_count -= 1; +} + + + static reverse_server_con_state_t *create_cstate(bool isup, line_t *line) { reverse_server_con_state_t *cstate = malloc(sizeof(reverse_server_con_state_t)); diff --git a/tunnels/server/reverse/reverse_server.c b/tunnels/server/reverse/reverse_server.c index 249209c4..5075d83f 100644 --- a/tunnels/server/reverse/reverse_server.c +++ b/tunnels/server/reverse/reverse_server.c @@ -7,13 +7,33 @@ static void flush_write_queue(tunnel_t *self, reverse_server_con_state_t *cstate) { - while (contextQueueLen(cstate->uqueue) > 0) - { - self->dw->downStream(self->dw, switchLine(contextQueuePop(cstate->uqueue), cstate->d)); + if (contextQueueLen(cstate->uqueue) > 0) + while (contextQueueLen(cstate->uqueue) > 0) + { + if (!cstate->signal_sent) + { + cstate->signal_sent = true; + context_t *c = switchLine(contextQueuePop(cstate->uqueue), cstate->d); + shiftl(c->payload, 1); + writeUI8(c->payload, 0xFF); + self->dw->downStream(self->dw, c); + } + else + self->dw->downStream(self->dw, switchLine(contextQueuePop(cstate->uqueue), cstate->d)); - // 1 context is stalled form the caller, so this will not be read after free - if (cstate->d->chains_state[STATE(self)->chain_index_d] == NULL) - return; + // 1 context is held by the caller, so this will not be read after free + if (cstate->d->chains_state[STATE(self)->chain_index_d] == NULL) + return; + } + else + { + cstate->signal_sent = true; + shift_buffer_t *buf = popBuffer(buffer_pools[cstate->d->tid]); + shiftl(buf, 1); + writeUI8(buf, 0xFF); + context_t *c = newContext(cstate->d); + c->payload = buf; + self->dw->downStream(self->dw, c); } } @@ -28,7 +48,8 @@ static inline void upStream(tunnel_t *self, context_t *c) else { // a real pair will not send that before it receives something - DISCARD_CONTEXT(c); + reuseBuffer(buffer_pools[c->line->tid], c->payload); + c->payload = NULL; destroyContext(c); } } @@ -43,23 +64,20 @@ static inline void upStream(tunnel_t *self, context_t *c) if (this_tb->u_count > 0) { - reverse_server_con_state_t *ucstate = qcons_pull(&this_tb->u_cons); + reverse_server_con_state_t *ucstate = this_tb->u_cons_root.next; + remove_connection_u(this_tb, ucstate); ucstate->d = c->line; ucstate->paired = true; - ucstate->samethread = true; CSTATE_D_MUT(c) = ucstate; - this_tb->u_count -= 1; self->up->upStream(self->up, newEstContext(ucstate->u)); self->dw->downStream(self->dw, newEstContext(c->line)); flush_write_queue(self, ucstate); - } else { reverse_server_con_state_t *dcstate = create_cstate(false, c->line); CSTATE_D_MUT(c) = dcstate; - qcons_push(&(this_tb->d_cons), dcstate); - this_tb->d_count += 1; + add_connection_d(this_tb, dcstate); } destroyContext(c); } @@ -77,21 +95,9 @@ static inline void upStream(tunnel_t *self, context_t *c) } else { - size_t len = qcons_size(&(this_tb->d_cons)); - while (len--) - { - reverse_server_con_state_t *q_dcs = qcons_pull(&(this_tb->d_cons)); - if (q_dcs == dcstate) - { - this_tb->d_count -= 1; - destroy_cstate(dcstate); - break; - } - else - qcons_push(&(this_tb->d_cons), q_dcs); - } + remove_connection_d(this_tb, dcstate); + destroy_cstate(dcstate); destroyContext(c); - } } } @@ -120,23 +126,40 @@ static inline void downStream(tunnel_t *self, context_t *c) if (this_tb->d_count > 0) { - reverse_server_con_state_t *dcstate = qcons_pull(&this_tb->d_cons); + + reverse_server_con_state_t *dcstate = this_tb->d_cons_root.next; + remove_connection_d(this_tb, dcstate); dcstate->u = c->line; dcstate->paired = true; - dcstate->samethread = true; CSTATE_U_MUT(c) = dcstate; (dcstate->d->chains_state)[state->chain_index_d] = dcstate; - this_tb->d_count -= 1; self->up->upStream(self->up, newEstContext(c->line)); + if (CSTATE_U(c) == NULL) + { + destroyContext(c); + return; + } self->dw->downStream(self->dw, newEstContext(dcstate->d)); + if (CSTATE_U(c) == NULL) + { + destroyContext(c); + return; + } + + dcstate->signal_sent = true; + shift_buffer_t *buf = popBuffer(buffer_pools[dcstate->d->tid]); + shiftl(buf, 1); + writeUI8(buf, 0xFF); + context_t *c = newContext(dcstate->d); + c->payload = buf; + self->dw->downStream(self->dw, c); } else { - LOGW("reverseServer: no peer left, waiting tid: %d",c->line->tid); + LOGW("reverseServer: no peer left, waiting tid: %d", c->line->tid); reverse_server_con_state_t *ucstate = create_cstate(true, c->line); CSTATE_U_MUT(c) = ucstate; - qcons_push(&(this_tb->u_cons), ucstate); - this_tb->u_count += 1; + add_connection_u(this_tb, ucstate); } destroyContext(c); } @@ -155,21 +178,10 @@ static inline void downStream(tunnel_t *self, context_t *c) } else { - size_t len = qcons_size(&(this_tb->u_cons)); - while (len--) - { - reverse_server_con_state_t *q_ucs = qcons_pull(&(this_tb->u_cons)); - if (q_ucs == ucstate) - { - this_tb->u_count -= 1; - destroy_cstate(ucstate); - break; - } - else - qcons_push(&(this_tb->u_cons), q_ucs); - } - destroyContext(c); + remove_connection_u(this_tb, ucstate); + destroy_cstate(ucstate); + destroyContext(c); } } } @@ -197,12 +209,6 @@ tunnel_t *newReverseServer(node_instance_context_t *instance_info) reverse_server_state_t *state = malloc(sizeof(reverse_server_state_t) + (threads_count * sizeof(thread_box_t))); memset(state, 0, sizeof(reverse_server_state_t) + (threads_count * sizeof(thread_box_t))); - for (size_t i = 0; i < threads_count; i++) - { - state->threads[i].d_cons = qcons_with_capacity(64); - state->threads[i].u_cons = qcons_with_capacity(64); - } - tunnel_t *t = newTunnel(); t->state = state; t->upStream = &reverseServerUpStream; diff --git a/tunnels/server/reverse/types.h b/tunnels/server/reverse/types.h index b231a04a..94a9b019 100644 --- a/tunnels/server/reverse/types.h +++ b/tunnels/server/reverse/types.h @@ -5,25 +5,23 @@ typedef struct reverse_server_con_state_s { + struct reverse_server_con_state_s *prev, *next; bool paired; - bool samethread; + bool signal_sent; context_queue_t *uqueue; - line_t *u; line_t *d; } reverse_server_con_state_t; -#define i_TYPE qcons, reverse_server_con_state_t * -#define i_use_cmp -#include "stc/queue.h" + typedef struct thread_box_s { size_t d_count; - qcons d_cons; size_t u_count; - qcons u_cons; + reverse_server_con_state_t d_cons_root; + reverse_server_con_state_t u_cons_root; } thread_box_t;