Skip to content

Commit

Permalink
changes in reverse tunnel / http2 ping interval
Browse files Browse the repository at this point in the history
  • Loading branch information
radkesvat committed Apr 11, 2024
1 parent af2c050 commit bbb1dcf
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 77 deletions.
2 changes: 1 addition & 1 deletion tunnels/client/http2/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include "types.h"

#define MAX_CONCURRENT_STREAMS 0xffffffffu
#define PING_INTERVAL 2500
#define PING_INTERVAL 5000

#define STATE(x) ((http2_client_state_t *)((x)->state))
#define CSTATE(x) ((void *)((((x)->line->chains_state)[self->chain_index])))
Expand Down
9 changes: 6 additions & 3 deletions tunnels/client/reverse/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
#define CSTATE_D_MUT(x) ((x)->line->chains_state)[state->chain_index_pi]
#define CSTATE_U_MUT(x) ((x)->line->chains_state)[state->chain_index_pi]
#define ISALIVE(x) (((((x)->line->chains_state)[state->chain_index_pi])) != NULL)
#define PRECONNECT_DELAY 100
#define PRECONNECT_DELAY_SHORT 10
#define PRECONNECT_DELAY_HIGH 1500
#undef max
#undef min
static inline size_t min(size_t x, size_t y) { return (((x) < (y)) ? (x) : (y)); }
Expand Down Expand Up @@ -53,15 +54,16 @@ static void connect_timer_finished(htimer_t *timer)
static void before_connect(hevent_t *ev)
{
struct connect_arg *cg = hevent_userdata(ev);
htimer_t *connect_timer = htimer_add(loops[cg->tid], connect_timer_finished, PRECONNECT_DELAY, 1);
htimer_t *connect_timer = htimer_add(loops[cg->tid], connect_timer_finished, cg->delay, 1);
hevent_set_userdata(connect_timer, cg);
}

static void initiateConnect(tunnel_t *t, int tid)
{
if (STATE(t)->unused_cons[tid] >= STATE(t)->connection_per_thread)
return;
STATE(t)->unused_cons[tid] += 1;
bool more_delay = STATE(t)->unused_cons[tid] <= 0;
// STATE(t)->unused_cons[tid] += 1;

// int tid = 0;
// if (threads_count > 0)
Expand All @@ -83,6 +85,7 @@ static void initiateConnect(tunnel_t *t, int tid)
struct connect_arg *cg = malloc(sizeof(struct connect_arg));
cg->t = t;
cg->tid = tid;
cg->delay = more_delay?PRECONNECT_DELAY_HIGH:PRECONNECT_DELAY_SHORT;
ev.userdata = cg;
hloop_post_event(worker_loop, &ev);
}
52 changes: 40 additions & 12 deletions tunnels/client/reverse/reverse_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ static inline void upStream(tunnel_t *self, context_t *c)
if (c->payload != NULL)
{
reverse_client_con_state_t *dcstate = CSTATE_D(c);
if (!dcstate->first_sent)
if (!dcstate->first_sent_u)
{
dcstate->first_sent = true;
dcstate->first_sent_u = true;
c->first = true;
}
self->up->upStream(self->up, switchLine(c, dcstate->u));
Expand Down Expand Up @@ -54,23 +54,50 @@ static inline void downStream(tunnel_t *self, context_t *c)

if (ucstate->pair_connected)
{
self->dw->downStream(self->dw, switchLine(c, CSTATE_U(c)->d));
if (!ucstate->first_sent_d)
{
ucstate->first_sent_d = true;
context_t *turned = switchLine(c, ucstate->d);
turned->first = true;
self->dw->downStream(self->dw, turned);
}
else
self->dw->downStream(self->dw, switchLine(c, ucstate->d));
}
else
{
state->unused_cons[tid] -= 1;
atomic_fetch_add_explicit(&(state->reverse_cons), 1, memory_order_relaxed);
self->dw->downStream(self->dw, newInitContext(ucstate->d));
if (!ISALIVE(c))
if (CSTATE_U(c) == NULL)
{
DISCARD_CONTEXT(c);
reuseBuffer(buffer_pools[c->line->tid], c->payload);
c->payload = NULL;
destroyContext(c);
return;
}
ucstate->pair_connected = true;
c->first = true;
self->dw->downStream(self->dw, switchLine(c, CSTATE_U(c)->d));
initiateConnect(self, tid);

// first byte is 0xFF a signal from reverse server
uint8_t check = 0x0;
readUI8(c->payload, &check);
assert(check == (unsigned char)0xFF);
shiftr(c->payload, 1);
if (bufLen(c->payload) <= 0)
{
initiateConnect(self, tid);
reuseBuffer(buffer_pools[c->line->tid], c->payload);
c->payload = NULL;
destroyContext(c);
return;
}
else
{
ucstate->first_sent_d = true;
c->first = true;
self->dw->downStream(self->dw, switchLine(c, ucstate->d));
initiateConnect(self, tid);
}
}
}
else
Expand All @@ -84,27 +111,28 @@ static inline void downStream(tunnel_t *self, context_t *c)

if (ucstate->pair_connected)
{
const unsigned int old_reverse_cons = atomic_fetch_add_explicit(&(state->reverse_cons), -1, memory_order_relaxed);
const unsigned int old_reverse_cons = atomic_fetch_add_explicit(&(state->reverse_cons), -1, memory_order_relaxed);
LOGD("ReverseClient: disconnected, tid: %d unused: %u active: %d", tid, state->unused_cons[tid], old_reverse_cons - 1);
context_t *fc = switchLine(c, ucstate->d);
destroy_cstate(ucstate);
self->dw->downStream(self->dw, fc);
initiateConnect(self, tid);
}
else
{
destroy_cstate(ucstate);
state->unused_cons[tid] -= 1;
if (state->unused_cons[tid] > 0)
state->unused_cons[tid] -= 1;
LOGD("ReverseClient: disconnected, tid: %d unused: %u active: %d", tid, state->unused_cons[tid],
atomic_load_explicit(&(state->reverse_cons), memory_order_relaxed));

destroyContext(c);
}
initiateConnect(self, tid);
}
else if (c->est)
{
CSTATE_U(c)->established = true;
// unused_cons[tid] += 1;
state->unused_cons[tid] += 1;
LOGI("ReverseClient: connected, tid: %d unused: %u active: %d", tid, state->unused_cons[tid],
atomic_load_explicit(&(state->reverse_cons), memory_order_relaxed));
destroyContext(c);
Expand Down
4 changes: 3 additions & 1 deletion tunnels/client/reverse/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@
struct connect_arg
{
unsigned int tid;
int delay;
tunnel_t *t;
};

typedef struct reverse_client_con_state_s
{
bool pair_connected;
bool established;
bool first_sent;
bool first_sent_u;
bool first_sent_d;
line_t *u;
line_t *d;

Expand Down
48 changes: 48 additions & 0 deletions tunnels/server/reverse/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,54 @@
#define ISALIVE(x) (CSTATE(x) != NULL)



static void add_connection_u(thread_box_t *box,
reverse_server_con_state_t *con)
{
con->next = box->u_cons_root.next;
box->u_cons_root.next = con;
con->prev = &box->u_cons_root;
if (con->next)
{
con->next->prev = con;
}
box->u_count += 1;
}
static void remove_connection_u(thread_box_t *box, reverse_server_con_state_t *con)
{
con->prev->next = con->next;
if (con->next)
{
con->next->prev = con->prev;
}
box->u_count -= 1;
}


static void add_connection_d(thread_box_t *box,
reverse_server_con_state_t *con)
{
con->next = box->d_cons_root.next;
box->d_cons_root.next = con;
con->prev = &box->d_cons_root;
if (con->next)
{
con->next->prev = con;
}
box->d_count += 1;
}
static void remove_connection_d(thread_box_t *box, reverse_server_con_state_t *con)
{
con->prev->next = con->next;
if (con->next)
{
con->next->prev = con->prev;
}
box->d_count -= 1;
}



static reverse_server_con_state_t *create_cstate(bool isup, line_t *line)
{
reverse_server_con_state_t *cstate = malloc(sizeof(reverse_server_con_state_t));
Expand Down
Loading

0 comments on commit bbb1dcf

Please sign in to comment.