From 8bd752b716ef42f4bc672599ddbf315e6b31804a Mon Sep 17 00:00:00 2001 From: Radkesvat <134321679+radkesvat@users.noreply.github.com> Date: Wed, 15 May 2024 00:52:36 +0000 Subject: [PATCH] fixes due to now line pause/resume model --- tunnels/client/reverse/helpers.h | 14 +++++- tunnels/client/reverse/reverse_client.c | 63 +++++++++++-------------- tunnels/client/reverse/types.h | 1 + tunnels/server/reverse/reverse_server.c | 20 +++++--- 4 files changed, 55 insertions(+), 43 deletions(-) diff --git a/tunnels/client/reverse/helpers.h b/tunnels/client/reverse/helpers.h index 9235b0da..124a1f95 100644 --- a/tunnels/client/reverse/helpers.h +++ b/tunnels/client/reverse/helpers.h @@ -42,7 +42,7 @@ static reverse_client_con_state_t *createCstate(uint8_t tid) line_t *dw = newLine(tid); reserveChainStateIndex(dw); // we always take one from the down line setupLineDownSide(up, onLinePausedU, cstate, onLineResumedU); - setupLineDownSide(dw, onLinePausedD, cstate, onLinePausedD); + setupLineDownSide(dw, onLinePausedD, cstate, onLineResumedD); cstate->u = up; cstate->d = dw; @@ -65,7 +65,19 @@ static void doConnect(struct connect_arg *cg) free(cg); (cstate->u->chains_state)[self->chain_index] = cstate; (cstate->d->chains_state)[state->chain_index_d] = cstate; + context_t *hello_data_ctx = newContext(cstate->u); self->up->upStream(self->up, newInitContext(cstate->u)); + + if (! isAlive(cstate->u)) + { + destroyContext(hello_data_ctx); + return; + } + + hello_data_ctx->payload = popBuffer(getContextBufferPool(hello_data_ctx)); + setLen(hello_data_ctx->payload, 1); + writeUI8(hello_data_ctx->payload, 0xFF); + self->up->upStream(self->up, hello_data_ctx); } static void connectTimerFinished(htimer_t *timer) diff --git a/tunnels/client/reverse/reverse_client.c b/tunnels/client/reverse/reverse_client.c index b63aa79b..c6f5d7f1 100644 --- a/tunnels/client/reverse/reverse_client.c +++ b/tunnels/client/reverse/reverse_client.c @@ -2,7 +2,6 @@ #include "buffer_pool.h" #include "helpers.h" #include "loggers/network_logger.h" -#include "shiftbuffer.h" #include "tunnel.h" #include "types.h" #include "utils/jsonutils.h" @@ -60,15 +59,22 @@ static void downStream(tunnel_t *self, context_t *c) if (ucstate->pair_connected) { - if (! ucstate->first_sent_d) + self->dw->downStream(self->dw, switchLine(c, ucstate->d)); + } + else + { + if (ucstate->handshaked) { if (state->unused_cons[tid] > 0) { state->unused_cons[tid] -= 1; } atomic_fetch_add_explicit(&(state->reverse_cons), 1, memory_order_relaxed); + + ucstate->pair_connected = true; + context_t *turned = switchLine(c, ucstate->d); + self->dw->downStream(self->dw, newInitContext(ucstate->d)); initiateConnect(self, tid, false); - context_t *turned = switchLine(c, ucstate->d); if (! isAlive(ucstate->d)) { reuseContextBuffer(c); @@ -81,35 +87,29 @@ static void downStream(tunnel_t *self, context_t *c) } else { - self->dw->downStream(self->dw, switchLine(c, ucstate->d)); - } - } - else - { - - // first byte is 0xFF a signal from reverse server - uint8_t check = 0x0; - readUI8(c->payload, &check); - if (check != (unsigned char) 0xFF) - { + // first byte is 0xFF a signal from reverse server + uint8_t check = 0x0; + readUI8(c->payload, &check); + if (check != (unsigned char) 0xFF) + { + reuseContextBuffer(c); + CSTATE_U_MUT(c) = NULL; + (ucstate->d->chains_state)[state->chain_index_d] = NULL; + cleanup(ucstate); + self->up->upStream(self->up, newFinContextFrom(c)); + destroyContext(c); + return; + } + ucstate->handshaked = true; reuseContextBuffer(c); - CSTATE_U_MUT(c) = NULL; - (ucstate->d->chains_state)[state->chain_index_d] = NULL; - cleanup(ucstate); - self->up->upStream(self->up, newFinContextFrom(c)); + + state->unused_cons[tid] += 1; + LOGI("ReverseClient: connected, tid: %d unused: %u active: %d", tid, state->unused_cons[tid], + atomic_load_explicit(&(state->reverse_cons), memory_order_relaxed)); + destroyContext(c); return; } - shiftr(c->payload, 1); - state->unused_cons[tid] += 1; - LOGI("ReverseClient: connected, tid: %d unused: %u active: %d", tid, state->unused_cons[tid], - atomic_load_explicit(&(state->reverse_cons), memory_order_relaxed)); - ucstate->pair_connected = true; - self->dw->downStream(self->dw, newInitContext(ucstate->d)); - - reuseContextBuffer(c); - destroyContext(c); - return; } } else @@ -148,13 +148,6 @@ static void downStream(tunnel_t *self, context_t *c) else if (c->est) { CSTATE_U(c)->established = true; - - context_t *hello_data_ctx = newContextFrom(c); - hello_data_ctx->payload = popBuffer(getContextBufferPool(c)); - setLen(hello_data_ctx->payload, 1); - writeUI8(hello_data_ctx->payload, 0xFF); - self->up->upStream(self->up, hello_data_ctx); - destroyContext(c); } else diff --git a/tunnels/client/reverse/types.h b/tunnels/client/reverse/types.h index 5e650dbf..16275d99 100644 --- a/tunnels/client/reverse/types.h +++ b/tunnels/client/reverse/types.h @@ -14,6 +14,7 @@ typedef struct reverse_client_con_state_s { bool pair_connected; bool established; + bool handshaked; bool first_sent_u; bool first_sent_d; line_t *u; diff --git a/tunnels/server/reverse/reverse_server.c b/tunnels/server/reverse/reverse_server.c index cbebc752..7c84f68c 100644 --- a/tunnels/server/reverse/reverse_server.c +++ b/tunnels/server/reverse/reverse_server.c @@ -41,12 +41,12 @@ static void upStream(tunnel_t *self, context_t *c) thread_box_t *this_tb = &(state->threadlocal_pool[c->line->tid]); reverse_server_con_state_t *dcstate = CSTATE_D(c); - // first byte is 0xFF a signal from reverse server + // first byte is 0xFF a signal from reverse client uint8_t check = 0x0; readUI8(c->payload, &check); + reuseContextBuffer(c); if (dcstate->handshaked || check != (unsigned char) 0xFF) { - reuseContextBuffer(c); CSTATE_D_MUT(c) = NULL; (dcstate->u->chains_state)[state->chain_index_u] = NULL; cleanup(dcstate); @@ -54,21 +54,21 @@ static void upStream(tunnel_t *self, context_t *c) destroyContext(c); return; } - reuseContextBuffer(c); dcstate->handshaked = true; - self->dw->downStream(self->dw, newEstContext(c->line)); context_t *hello_data_ctx = newContextFrom(c); hello_data_ctx->payload = popBuffer(getContextBufferPool(c)); setLen(hello_data_ctx->payload, 1); writeUI8(hello_data_ctx->payload, 0xFF); self->dw->downStream(self->dw, hello_data_ctx); + if (! isAlive(c->line)) { reuseContextBuffer(c); destroyContext(c); return; } + if (this_tb->u_count > 0) { reverse_server_con_state_t *ucstate = this_tb->u_cons_root.next; @@ -77,7 +77,15 @@ static void upStream(tunnel_t *self, context_t *c) ucstate->paired = true; setupLineUpSide(ucstate->u, onLinePausedU, ucstate, onLineResumedU); setupLineUpSide(ucstate->d, onLinePausedD, ucstate, onLineResumedD); + cleanup(CSTATE_D(c)); CSTATE_D_MUT(c) = ucstate; + self->dw->downStream(self->dw, newEstContext(c->line)); + if (! isAlive(c->line)) + { + reuseContextBuffer(c); + destroyContext(c); + return; + } self->up->upStream(self->up, newEstContext(ucstate->u)); if (! isAlive(c->line)) { @@ -92,7 +100,6 @@ static void upStream(tunnel_t *self, context_t *c) addConnectionD(this_tb, dcstate); } destroyContext(c); - } } else @@ -152,14 +159,13 @@ static void downStream(tunnel_t *self, context_t *c) thread_box_t *this_tb = &(state->threadlocal_pool[tid]); if (c->init) { - if (state->chain_index_u == 0) + if (WW_UNLIKELY(state->chain_index_u == 0)) { state->chain_index_u = reserveChainStateIndex(c->line); } if (this_tb->d_count > 0) { - reverse_server_con_state_t *dcstate = this_tb->d_cons_root.next; removeConnectionD(this_tb, dcstate); dcstate->u = c->line;