Skip to content

Commit

Permalink
new changes of reverse connection
Browse files Browse the repository at this point in the history
  • Loading branch information
radkesvat committed May 14, 2024
1 parent 2c4e2fc commit d8a135c
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 9 deletions.
14 changes: 10 additions & 4 deletions tunnels/client/reverse/reverse_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,20 @@ static void downStream(tunnel_t *self, context_t *c)
{
if (! ucstate->first_sent_d)
{
ucstate->pair_connected = true;
if (state->unused_cons[tid] > 0)
{
state->unused_cons[tid] -= 1;
}
atomic_fetch_add_explicit(&(state->reverse_cons), 1, memory_order_relaxed);
initiateConnect(self, tid, false);

context_t *turned = switchLine(c, ucstate->d);
if (! isAlive(ucstate->d))
{
reuseContextBuffer(c);
destroyContext(c);
return;
}
ucstate->first_sent_d = true;
context_t *turned = switchLine(c, ucstate->d);
turned->first = true;
self->dw->downStream(self->dw, turned);
}
Expand All @@ -89,6 +93,8 @@ static void downStream(tunnel_t *self, context_t *c)
if (check != (unsigned char) 0xFF)
{
reuseContextBuffer(c);
CSTATE_U_MUT(c) = NULL;
(ucstate->d->chains_state)[state->chain_index_d] = NULL;
cleanup(ucstate);
self->up->upStream(self->up, newFinContextFrom(c));
destroyContext(c);
Expand All @@ -98,7 +104,7 @@ static void downStream(tunnel_t *self, context_t *c)
state->unused_cons[tid] += 1;
LOGI("ReverseClient: connected, tid: %d unused: %u active: %d", tid, state->unused_cons[tid],
atomic_load_explicit(&(state->reverse_cons), memory_order_relaxed));

ucstate->pair_connected = true;
self->dw->downStream(self->dw, newInitContext(ucstate->d));

reuseContextBuffer(c);
Expand Down
24 changes: 19 additions & 5 deletions tunnels/server/reverse/reverse_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@ static void upStream(tunnel_t *self, context_t *c)
if (dcstate->handshaked || check != (unsigned char) 0xFF)
{
reuseContextBuffer(c);
CSTATE_D_MUT(c) = NULL;
(dcstate->u->chains_state)[state->chain_index_u] = NULL;
cleanup(dcstate);
self->dw->downStream(self->dw, newFinContextFrom(c));
destroyContext(c);
return;
}
shiftr(c->payload, 1);
reuseContextBuffer(c);
dcstate->handshaked = true;
self->dw->downStream(self->dw, newEstContext(c->line));

Expand All @@ -61,7 +63,12 @@ static void upStream(tunnel_t *self, context_t *c)
setLen(hello_data_ctx->payload, 1);
writeUI8(hello_data_ctx->payload, 0xFF);
self->dw->downStream(self->dw, hello_data_ctx);

if (! isAlive(c->line))
{
reuseContextBuffer(c);
destroyContext(c);
return;
}
if (this_tb->u_count > 0)
{
reverse_server_con_state_t *ucstate = this_tb->u_cons_root.next;
Expand All @@ -72,13 +79,20 @@ static void upStream(tunnel_t *self, context_t *c)
setupLineUpSide(ucstate->d, onLinePausedD, ucstate, onLineResumedD);
CSTATE_D_MUT(c) = ucstate;
self->up->upStream(self->up, newEstContext(ucstate->u));

if (! isAlive(c->line))
{
reuseContextBuffer(c);
destroyContext(c);
return;
}
flushWriteQueue(self, ucstate);
}
else
{
addConnectionD(this_tb, dcstate);
}
destroyContext(c);

}
}
else
Expand Down Expand Up @@ -155,13 +169,13 @@ static void downStream(tunnel_t *self, context_t *c)
CSTATE_U_MUT(c) = dcstate;
(dcstate->d->chains_state)[state->chain_index_d] = dcstate;
self->up->upStream(self->up, newEstContext(c->line));
if (CSTATE_U(c) == NULL)
if (! isAlive(c->line))
{
destroyContext(c);
return;
}
self->dw->downStream(self->dw, newEstContext(dcstate->d));
if (CSTATE_U(c) == NULL)
if (! isAlive(c->line))
{
destroyContext(c);
return;
Expand Down

0 comments on commit d8a135c

Please sign in to comment.