Skip to content

Commit

Permalink
[WIP] 80% add idle table timeout checks on reverse connections
Browse files Browse the repository at this point in the history
  • Loading branch information
radkesvat committed May 27, 2024
1 parent 7cf4357 commit 6b1cbe5
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 32 deletions.
6 changes: 3 additions & 3 deletions tunnels/adapters/listener/udp/udp_listener.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ static void cleanup(udp_listener_con_state_t *cstate)

if (cstate->idle_handle != NULL)
{
if (removeIdleItemByHash(cstate->table, cstate->idle_handle->hash))
if (removeIdleItemByHash(cstate->idle_handle->tid,cstate->table, cstate->idle_handle->hash))
{
free(cstate);
}
Expand Down Expand Up @@ -135,7 +135,7 @@ static void onUdpConnectonExpire(idle_item_t *idle_udp)
free(cstate);
return;
}
LOGD("UdpListener: expired idle udp FD:%x ", (int) hio_fd(cstate->io));
LOGD("UdpListener: expired idle udp FD:%x ", hio_fd(cstate->io));
cstate->idle_handle = NULL;
tunnel_t *self = (cstate)->tunnel;
line_t *line = (cstate)->line;
Expand Down Expand Up @@ -173,7 +173,7 @@ static udp_listener_con_state_t *newConnection(uint8_t tid, tunnel_t *self, hio_
char localaddrstr[SOCKADDR_STRLEN] = {0};
char peeraddrstr[SOCKADDR_STRLEN] = {0};

LOGD("UdpListener: Accepted FD:%x [%s] <= [%s]", (int) hio_fd(cstate->io),
LOGD("UdpListener: Accepted FD:%x [%s] <= [%s]", hio_fd(cstate->io),
SOCKADDR_STR(&log_localaddr, localaddrstr), SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
}

Expand Down
6 changes: 6 additions & 0 deletions tunnels/client/reverse/reverse_client.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "reverse_client.h"
#include "helpers.h"
#include "idle_table.h"
#include "loggers/network_logger.h"
#include "shiftbuffer.h"
#include "tunnel.h"
Expand Down Expand Up @@ -160,6 +161,9 @@ static void downStream(tunnel_t *self, context_t *c)
{
CSTATE_U(c)->established = true;
initiateConnect(self, tid, false);

idle_item_t* con_idle_item = newIdleItem(cstate->starved_connections,
hash_t key, void *userdata, ExpireCallBack cb, uint8_t tid, uint64_t age_ms)
destroyContext(c);
}
else
Expand Down Expand Up @@ -199,6 +203,8 @@ tunnel_t *newReverseClient(node_instance_context_t *instance_info)
state->chain_index_d = index;
destroyLine(l);

state->starved_connections = newIdleTable(loops[0]);

tunnel_t *t = newTunnel();
t->state = state;
t->upStream = &upStream;
Expand Down
16 changes: 7 additions & 9 deletions tunnels/client/reverse/types.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#pragma once
#include "api.h"
#include "buffer_stream.h"
#include "hatomic.h"
#include "idle_table.h"

struct connect_arg
{
Expand All @@ -23,12 +22,11 @@ typedef struct reverse_client_con_state_s

typedef struct reverse_client_state_s
{
atomic_uint reverse_cons;
atomic_uint round_index;

uint8_t chain_index_d;

unsigned int min_unused_cons;
unsigned int unused_cons[];
atomic_uint reverse_cons;
atomic_uint round_index;
uint8_t chain_index_d;
unsigned int min_unused_cons;
idle_table_t *starved_connections;
unsigned int unused_cons[];

} reverse_client_state_t;
26 changes: 11 additions & 15 deletions ww/idle_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,7 @@ enum
#define i_TYPE hmap_idles_t, uint64_t, struct idle_item_s *
#include "stc/hmap.h"

struct udp_listener_state_s
{
// settings
char *address;
int multiport_backend;
uint16_t port_min;
uint16_t port_max;
char **white_list_raddr;
char **black_list_raddr;
};

struct idle_table_s
{
hloop_t *loop;
Expand All @@ -47,7 +38,7 @@ struct idle_table_s

void idleCallBack(htimer_t *timer);

void destoryIdleTable(idle_table_t *self)
void destroyIdleTable(idle_table_t *self)
{
htimer_del(self->idle_handle);
heapq_idles_t_drop(&(self->hqueue));
Expand Down Expand Up @@ -78,9 +69,14 @@ idle_item_t *newIdleItem(idle_table_t *self, hash_t key, void *userdata, ExpireC
*item = (idle_item_t){
.expire_at_ms = hloop_now_ms(loops[tid]) + age_ms, .hash = key, .tid = tid, .userdata = userdata, .cb = cb};

heapq_idles_t_push(&(self->hqueue), item);

hmap_idles_t_push(&(self->hmap), (hmap_idles_t_value){item->hash, item});
if(NULL == hmap_idles_t_push(&(self->hmap), (hmap_idles_t_value){item->hash, item})){
// hash is already in the table !
free(item);
hhybridmutex_unlock(&(self->mutex));
return NULL;
}
heapq_idles_t_push(&(self->hqueue), item);
hhybridmutex_unlock(&(self->mutex));
return item;
}
Expand Down Expand Up @@ -130,12 +126,12 @@ idle_item_t *getIdleItemByHash(uint8_t tid, idle_table_t *self, hash_t key)
// // }
// // }
// }
bool removeIdleItemByHash(idle_table_t *self, hash_t key)
bool removeIdleItemByHash(uint8_t tid, idle_table_t *self, hash_t key)
{
hhybridmutex_lock(&(self->mutex));

hmap_idles_t_iter find_result = hmap_idles_t_find(&(self->hmap), key);
if (find_result.ref == hmap_idles_t_end(&(self->hmap)).ref)
if (find_result.ref == hmap_idles_t_end(&(self->hmap)).ref || find_result.ref->second->tid != tid)
{
hhybridmutex_unlock(&(self->mutex));
return false;
Expand Down
10 changes: 5 additions & 5 deletions ww/idle_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
idle item is a threadlocal item, it belongs to the thread that created it
and other threads must not change , remove or do anything to it
because of that, tid parameter is required in order to find the item
*/

Expand All @@ -31,18 +32,17 @@ struct idle_item_s
{
void *userdata;
uint64_t expire_at_ms;
ExpireCallBack cb;
hash_t hash;
uint8_t tid;
ExpireCallBack cb;
};
typedef struct idle_table_s idle_table_t;

idle_table_t *newIdleTable(hloop_t *loop);
void destoryIdleTable(idle_table_t *self);
idle_item_t *newIdleItem(idle_table_t *self, hash_t key, void *userdata, ExpireCallBack cb, uint8_t tid,
uint64_t age_ms);

// [Notice] only the owner thread of an idle item can use these functios on it
idle_item_t *newIdleItem(idle_table_t *self, hash_t key, void *userdata, ExpireCallBack cb, uint8_t tid,
uint64_t age_ms);
idle_item_t *getIdleItemByHash(uint8_t tid, idle_table_t *self, hash_t key);
void keepIdleItemForAtleast(idle_table_t *self, idle_item_t *item, uint64_t age_ms);
bool removeIdleItemByHash(idle_table_t *self, hash_t key);
bool removeIdleItemByHash(uint8_t tid, idle_table_t *self, hash_t key);

0 comments on commit 6b1cbe5

Please sign in to comment.