Skip to content

Commit

Permalink
fixes due to now line pause/resume model
Browse files Browse the repository at this point in the history
  • Loading branch information
radkesvat committed May 15, 2024
1 parent 532a8c5 commit 8bd752b
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 43 deletions.
14 changes: 13 additions & 1 deletion tunnels/client/reverse/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ static reverse_client_con_state_t *createCstate(uint8_t tid)
line_t *dw = newLine(tid);
reserveChainStateIndex(dw); // we always take one from the down line
setupLineDownSide(up, onLinePausedU, cstate, onLineResumedU);
setupLineDownSide(dw, onLinePausedD, cstate, onLinePausedD);
setupLineDownSide(dw, onLinePausedD, cstate, onLineResumedD);
cstate->u = up;
cstate->d = dw;

Expand All @@ -65,7 +65,19 @@ static void doConnect(struct connect_arg *cg)
free(cg);
(cstate->u->chains_state)[self->chain_index] = cstate;
(cstate->d->chains_state)[state->chain_index_d] = cstate;
context_t *hello_data_ctx = newContext(cstate->u);
self->up->upStream(self->up, newInitContext(cstate->u));

if (! isAlive(cstate->u))
{
destroyContext(hello_data_ctx);
return;
}

hello_data_ctx->payload = popBuffer(getContextBufferPool(hello_data_ctx));
setLen(hello_data_ctx->payload, 1);
writeUI8(hello_data_ctx->payload, 0xFF);
self->up->upStream(self->up, hello_data_ctx);
}

static void connectTimerFinished(htimer_t *timer)
Expand Down
63 changes: 28 additions & 35 deletions tunnels/client/reverse/reverse_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
#include "buffer_pool.h"
#include "helpers.h"
#include "loggers/network_logger.h"
#include "shiftbuffer.h"
#include "tunnel.h"
#include "types.h"
#include "utils/jsonutils.h"
Expand Down Expand Up @@ -60,15 +59,22 @@ static void downStream(tunnel_t *self, context_t *c)

if (ucstate->pair_connected)
{
if (! ucstate->first_sent_d)
self->dw->downStream(self->dw, switchLine(c, ucstate->d));
}
else
{
if (ucstate->handshaked)
{
if (state->unused_cons[tid] > 0)
{
state->unused_cons[tid] -= 1;
}
atomic_fetch_add_explicit(&(state->reverse_cons), 1, memory_order_relaxed);

ucstate->pair_connected = true;
context_t *turned = switchLine(c, ucstate->d);
self->dw->downStream(self->dw, newInitContext(ucstate->d));
initiateConnect(self, tid, false);
context_t *turned = switchLine(c, ucstate->d);
if (! isAlive(ucstate->d))
{
reuseContextBuffer(c);
Expand All @@ -81,35 +87,29 @@ static void downStream(tunnel_t *self, context_t *c)
}
else
{
self->dw->downStream(self->dw, switchLine(c, ucstate->d));
}
}
else
{

// first byte is 0xFF a signal from reverse server
uint8_t check = 0x0;
readUI8(c->payload, &check);
if (check != (unsigned char) 0xFF)
{
// first byte is 0xFF a signal from reverse server
uint8_t check = 0x0;
readUI8(c->payload, &check);
if (check != (unsigned char) 0xFF)
{
reuseContextBuffer(c);
CSTATE_U_MUT(c) = NULL;
(ucstate->d->chains_state)[state->chain_index_d] = NULL;
cleanup(ucstate);
self->up->upStream(self->up, newFinContextFrom(c));
destroyContext(c);
return;
}
ucstate->handshaked = true;
reuseContextBuffer(c);
CSTATE_U_MUT(c) = NULL;
(ucstate->d->chains_state)[state->chain_index_d] = NULL;
cleanup(ucstate);
self->up->upStream(self->up, newFinContextFrom(c));

state->unused_cons[tid] += 1;
LOGI("ReverseClient: connected, tid: %d unused: %u active: %d", tid, state->unused_cons[tid],
atomic_load_explicit(&(state->reverse_cons), memory_order_relaxed));

destroyContext(c);
return;
}
shiftr(c->payload, 1);
state->unused_cons[tid] += 1;
LOGI("ReverseClient: connected, tid: %d unused: %u active: %d", tid, state->unused_cons[tid],
atomic_load_explicit(&(state->reverse_cons), memory_order_relaxed));
ucstate->pair_connected = true;
self->dw->downStream(self->dw, newInitContext(ucstate->d));

reuseContextBuffer(c);
destroyContext(c);
return;
}
}
else
Expand Down Expand Up @@ -148,13 +148,6 @@ static void downStream(tunnel_t *self, context_t *c)
else if (c->est)
{
CSTATE_U(c)->established = true;

context_t *hello_data_ctx = newContextFrom(c);
hello_data_ctx->payload = popBuffer(getContextBufferPool(c));
setLen(hello_data_ctx->payload, 1);
writeUI8(hello_data_ctx->payload, 0xFF);
self->up->upStream(self->up, hello_data_ctx);

destroyContext(c);
}
else
Expand Down
1 change: 1 addition & 0 deletions tunnels/client/reverse/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ typedef struct reverse_client_con_state_s
{
bool pair_connected;
bool established;
bool handshaked;
bool first_sent_u;
bool first_sent_d;
line_t *u;
Expand Down
20 changes: 13 additions & 7 deletions tunnels/server/reverse/reverse_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,34 +41,34 @@ static void upStream(tunnel_t *self, context_t *c)
thread_box_t *this_tb = &(state->threadlocal_pool[c->line->tid]);
reverse_server_con_state_t *dcstate = CSTATE_D(c);

// first byte is 0xFF a signal from reverse server
// first byte is 0xFF a signal from reverse client
uint8_t check = 0x0;
readUI8(c->payload, &check);
reuseContextBuffer(c);
if (dcstate->handshaked || check != (unsigned char) 0xFF)
{
reuseContextBuffer(c);
CSTATE_D_MUT(c) = NULL;
(dcstate->u->chains_state)[state->chain_index_u] = NULL;
cleanup(dcstate);
self->dw->downStream(self->dw, newFinContextFrom(c));
destroyContext(c);
return;
}
reuseContextBuffer(c);
dcstate->handshaked = true;
self->dw->downStream(self->dw, newEstContext(c->line));

context_t *hello_data_ctx = newContextFrom(c);
hello_data_ctx->payload = popBuffer(getContextBufferPool(c));
setLen(hello_data_ctx->payload, 1);
writeUI8(hello_data_ctx->payload, 0xFF);
self->dw->downStream(self->dw, hello_data_ctx);

if (! isAlive(c->line))
{
reuseContextBuffer(c);
destroyContext(c);
return;
}

if (this_tb->u_count > 0)
{
reverse_server_con_state_t *ucstate = this_tb->u_cons_root.next;
Expand All @@ -77,7 +77,15 @@ static void upStream(tunnel_t *self, context_t *c)
ucstate->paired = true;
setupLineUpSide(ucstate->u, onLinePausedU, ucstate, onLineResumedU);
setupLineUpSide(ucstate->d, onLinePausedD, ucstate, onLineResumedD);
cleanup(CSTATE_D(c));
CSTATE_D_MUT(c) = ucstate;
self->dw->downStream(self->dw, newEstContext(c->line));
if (! isAlive(c->line))
{
reuseContextBuffer(c);
destroyContext(c);
return;
}
self->up->upStream(self->up, newEstContext(ucstate->u));
if (! isAlive(c->line))
{
Expand All @@ -92,7 +100,6 @@ static void upStream(tunnel_t *self, context_t *c)
addConnectionD(this_tb, dcstate);
}
destroyContext(c);

}
}
else
Expand Down Expand Up @@ -152,14 +159,13 @@ static void downStream(tunnel_t *self, context_t *c)
thread_box_t *this_tb = &(state->threadlocal_pool[tid]);
if (c->init)
{
if (state->chain_index_u == 0)
if (WW_UNLIKELY(state->chain_index_u == 0))
{
state->chain_index_u = reserveChainStateIndex(c->line);
}

if (this_tb->d_count > 0)
{

reverse_server_con_state_t *dcstate = this_tb->d_cons_root.next;
removeConnectionD(this_tb, dcstate);
dcstate->u = c->line;
Expand Down

0 comments on commit 8bd752b

Please sign in to comment.