diff --git a/CMakeLists.txt b/CMakeLists.txt index 0ec4f47c..43789003 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -33,6 +33,8 @@ option(INCLUDE_REVERSE_SERVER "link ReverseServer staticly to the core" TRUE) option(INCLUDE_REVERSE_CLIENT "link ReverseClient staticly to the core" TRUE) option(INCLUDE_HEADER_SERVER "link HeaderServer staticly to the core" TRUE) option(INCLUDE_HEADER_CLIENT "link HeaderClient staticly to the core" TRUE) +option(INCLUDE_PRECONNECT_CLIENT "link PreConnectClient staticly to the core" TRUE) + # create project project(Waterwall VERSION 0.1) @@ -227,7 +229,13 @@ target_link_directories(Waterwall PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/tunnels/cli target_link_libraries(Waterwall HeaderClient) endif() - +#preconnect client +if (INCLUDE_PRECONNECT_CLIENT) +target_compile_definitions(Waterwall PUBLIC INCLUDE_PRECONNECT_CLIENT=1) +add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/tunnels/client/preconnect) +target_link_directories(Waterwall PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/tunnels/client/preconnect) +target_link_libraries(Waterwall PreConnectClient) +endif() diff --git a/core/static_tunnels.c b/core/static_tunnels.c index c3fa6b6a..458e8e4b 100644 --- a/core/static_tunnels.c +++ b/core/static_tunnels.c @@ -83,6 +83,10 @@ #include "tunnels/client/header/header_client.h" #endif +#ifdef INCLUDE_PRECONNECT_CLIENT +#include "tunnels/client/preconnect/preconnect_client.h" +#endif + void loadStaticTunnelsIntoCore() @@ -159,7 +163,9 @@ void loadStaticTunnelsIntoCore() USING(HeaderClient); #endif - +#ifdef INCLUDE_PRECONNECT_CLIENT + USING(PreConnectClient); +#endif } \ No newline at end of file diff --git a/tunnels/client/preconnect/CMakeLists.txt b/tunnels/client/preconnect/CMakeLists.txt new file mode 100644 index 00000000..bf445f6c --- /dev/null +++ b/tunnels/client/preconnect/CMakeLists.txt @@ -0,0 +1,37 @@ +add_library(PreConnectClient STATIC + preconnect_client.c + +) + + + +#ww api +target_include_directories(PreConnectClient PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../../../ww) +target_link_libraries(PreConnectClient PUBLIC ww) +# target_compile_options(ww PUBLIC -fPIC) + + +# add dependencies +include(${CMAKE_BINARY_DIR}/cmake/CPM.cmake) + + +CPMAddPackage( + NAME stc + GIT_REPOSITORY https://github.com/stclib/STC + GIT_TAG 09790f024ad29fca6fe60528461eeb589d4a917b + DOWNLOAD_ONLY True +) + + +if(stc_ADDED) + target_include_directories(PreConnectClient PUBLIC ${stc_SOURCE_DIR}/include) +endif() + + + +target_compile_definitions(PreConnectClient PRIVATE STC_STATIC=1 PreConnectClient_VERSION=0.1) + + +if(CMAKE_BUILD_TYPE STREQUAL "Debug") +target_compile_definitions(PreConnectClient PRIVATE DEBUG=1) +endif() diff --git a/tunnels/client/preconnect/helpers.h b/tunnels/client/preconnect/helpers.h new file mode 100644 index 00000000..9ec9d48c --- /dev/null +++ b/tunnels/client/preconnect/helpers.h @@ -0,0 +1,102 @@ +#pragma once +#include "types.h" +#include "loggers/network_logger.h" + +#define STATE(x) ((preconnect_client_state_t *)((x)->state)) +#define CSTATE(x) ((preconnect_client_con_state_t *)((((x)->line->chains_state)[self->chain_index]))) +#define CSTATE_MUT(x) ((x)->line->chains_state)[self->chain_index] +#define ISALIVE(x) (((((x)->line->chains_state)[self->chain_index])) != NULL) +#define PRECONNECT_DELAY 100 +#undef max +#undef min +static inline size_t min(size_t x, size_t y) { return (((x) < (y)) ? (x) : (y)); } +static inline size_t max(size_t x, size_t y) { return (((x) < (y)) ? (y) : (x)); } + +static void add_connection(thread_box_t *box, + preconnect_client_con_state_t *con) +{ + con->next = box->root.next; + box->root.next = con; + con->prev = &box->root; + if (con->next) + { + con->next->prev = con; + } + box->length += 1; +} +static void remove_connection(thread_box_t *box, preconnect_client_con_state_t *con) +{ + + con->prev->next = con->next; + if (con->next) + { + con->next->prev = con->prev; + } + box->length -= 1; +} + +static preconnect_client_con_state_t *create_cstate(int tid) +{ + preconnect_client_con_state_t *cstate = malloc(sizeof(preconnect_client_con_state_t)); + memset(cstate, 0, sizeof(preconnect_client_con_state_t)); + cstate->u = newLine(tid); + + return cstate; +} + +static void destroy_cstate(preconnect_client_con_state_t *cstate) +{ + destroyLine(cstate->u); + + free(cstate); +} +static void do_connect(struct connect_arg *cg) +{ + tunnel_t *self = cg->t; + preconnect_client_state_t *state = STATE(self); + preconnect_client_con_state_t *cstate = create_cstate(cg->tid); + free(cg); + (cstate->u->chains_state)[self->chain_index] = cstate; + self->up->upStream(self->up, newInitContext(cstate->u)); +} + +static void connect_timer_finished(htimer_t *timer) +{ + do_connect(hevent_userdata(timer)); + htimer_del(timer); +} +static void before_connect(hevent_t *ev) +{ + struct connect_arg *cg = hevent_userdata(ev); + htimer_t *connect_timer = htimer_add(loops[cg->tid], connect_timer_finished, PRECONNECT_DELAY, 1); + hevent_set_userdata(connect_timer, cg); +} + +static void initiateConnect(tunnel_t *t) +{ + if (STATE(t)->unused_cons >= STATE(t)->min_unused_cons) + return; + + int tid = 0; + if (threads_count > 0) + { + tid = atomic_fetch_add_explicit(&(STATE(t)->round_index), 1, memory_order_relaxed); + + if (tid >= threads_count) + { + atomic_store_explicit(&(STATE(t)->round_index), 0, memory_order_relaxed); + tid = 0; + } + } + + hloop_t *worker_loop = loops[tid]; + hevent_t ev; + memset(&ev, 0, sizeof(ev)); + ev.loop = worker_loop; + ev.cb = before_connect; + struct connect_arg *cg = malloc(sizeof(struct connect_arg)); + cg->t = t; + cg->tid = tid; + ev.userdata = cg; + hloop_post_event(worker_loop, &ev); +} \ No newline at end of file diff --git a/tunnels/client/preconnect/preconnect_client.c b/tunnels/client/preconnect/preconnect_client.c new file mode 100644 index 00000000..0dc232ec --- /dev/null +++ b/tunnels/client/preconnect/preconnect_client.c @@ -0,0 +1,236 @@ +#include "preconnect_client.h" +#include "managers/node_manager.h" +#include "loggers/network_logger.h" +#include "types.h" +#include "helpers.h" + +static inline void upStream(tunnel_t *self, context_t *c) +{ + + preconnect_client_state_t *state = STATE(self); + if (c->payload != NULL) + { + preconnect_client_con_state_t *cstate = CSTATE(c); + switch (cstate->mode) + { + LOGF("PreConnectClient: invalid behaviour from behind node (no init)"); + DISCARD_CONTEXT(c); + destroyContext(c); + break; + + case connected_direct: + self->up->upStream(self->up, c); + break; + + case connected_pair: + self->up->upStream(self->up, switchLine(c, cstate->u)); + break; + case notconnected: + default: + LOGF("PreConnectClient: invalid value of connection state (memory error?)"); + exit(1); + + break; + } + } + else + { + const unsigned int tid = c->line->tid; + thread_box_t *this_tb = &(state->threads[tid]); + if (c->init) + { + // if (state->chain_index_d == 0) + // state->chain_index_d = reserveChainStateIndex(c->line); + + if (this_tb->length > 0) + { + preconnect_client_con_state_t *ucon = this_tb->root.next; + remove_connection(this_tb, ucon); + ucon->d = c->line; + ucon->mode = connected_pair; + CSTATE_MUT(c) = ucon; + self->dw->downStream(self->dw, newEstContext(c->line)); + } + else + { + preconnect_client_con_state_t *dcon = create_cstate(c->line->tid); + CSTATE_MUT(c) = dcon; + dcon->mode = connected_direct; + self->up->upStream(self->up, c); + return; + } + destroyContext(c); + } + else if (c->fin) + { + preconnect_client_con_state_t *dcon = CSTATE(c); + CSTATE_MUT(c) = NULL; + + switch (dcon->mode) + { + case connected_direct: + destroy_cstate(dcon); + self->up->upStream(self->up, c); + break; + + case connected_pair: + line_t *u_line = dcon->u; + (dcon->u->chains_state)[self->chain_index] = NULL; + destroy_cstate(dcon); + self->up->upStream(self->up, switchLine(c, u_line)); + break; + case notconnected: + default: + LOGF("PreConnectClient: invalid value of connection state (memory error?)"); + exit(1); + + break; + } + } + } +} + +static inline void downStream(tunnel_t *self, context_t *c) +{ + preconnect_client_state_t *state = STATE(self); + if (c->payload != NULL) + { + + preconnect_client_con_state_t *cstate = CSTATE(c); + + switch (cstate->mode) + { + case connected_direct: + self->dw->downStream(self->dw, c); + break; + + case connected_pair: + self->dw->downStream(self->dw, switchLine(c, cstate->d)); + break; + + case notconnected: + LOGE("PreConnectClient: this node is not purposed to handle downstream data before pairing"); + default: + LOGF("PreConnectClient: invalid value of connection state (memory error?)"); + exit(1); + + break; + } + } + else + { + const unsigned int tid = c->line->tid; + thread_box_t *this_tb = &(state->threads[tid]); + preconnect_client_con_state_t *ucon = CSTATE(c); + CSTATE_MUT(c) = NULL; + + if (c->fin) + { + switch (ucon->mode) + { + case connected_direct: + destroy_cstate(ucon); + self->dw->downStream(self->dw, c); + break; + + case connected_pair: + line_t *d_line = ucon->d; + (ucon->d->chains_state)[self->chain_index] = NULL; + destroy_cstate(ucon); + self->dw->downStream(self->dw, switchLine(c, d_line)); + break; + + case notconnected: + remove_connection(this_tb, ucon); + destroy_cstate(ucon); + destroyContext(c); + break; + + default: + LOGF("PreConnectClient: invalid value of connection state (memory error?)"); + exit(1); + + break; + } + } + else if (c->est) + { + add_connection(this_tb, ucon); + destroyContext(c); + } + } +} +static void preConnectClientUpStream(tunnel_t *self, context_t *c) +{ + upStream(self, c); +} +static void preConnectClientPacketUpStream(tunnel_t *self, context_t *c) +{ + upStream(self, c); +} +static void preConnectClientDownStream(tunnel_t *self, context_t *c) +{ + downStream(self, c); +} +static void preConnectClientPacketDownStream(tunnel_t *self, context_t *c) +{ + downStream(self, c); +} +static void start_preconnect(htimer_t *timer) +{ + tunnel_t *t = hevent_userdata(timer); + for (int i = 0; i < threads_count; i++) + { + const int cpt = STATE(t)->connection_per_thread; + + for (size_t ci = 0; ci < cpt; ci++) + { + initiateConnect(t); + } + } + + htimer_del(timer); +} +tunnel_t *newPreConnectClient(node_instance_context_t *instance_info) +{ + const size_t start_delay_ms = 150; + + preconnect_client_state_t *state = malloc(sizeof(preconnect_client_state_t) + (threads_count * sizeof(thread_box_t))); + memset(state, 0, sizeof(preconnect_client_state_t) + (threads_count * sizeof(thread_box_t))); + const cJSON *settings = instance_info->node_settings_json; + + getIntFromJsonObject(&(state->min_unused_cons), settings, "minimum-unused"); + + state->min_unused_cons = min(max(threads_count * 2, state->min_unused_cons), 128); + state->connection_per_thread = state->min_unused_cons / threads_count; + + tunnel_t *t = newTunnel(); + t->state = state; + t->upStream = &preConnectClientUpStream; + t->packetUpStream = &preConnectClientPacketUpStream; + t->downStream = &preConnectClientDownStream; + t->packetDownStream = &preConnectClientPacketDownStream; + + htimer_t *start_timer = htimer_add(loops[0], start_preconnect, start_delay_ms, 1); + hevent_set_userdata(start_timer, t); + + atomic_thread_fence(memory_order_release); + + return t; +} + +api_result_t apiPreConnectClient(tunnel_t *self, char *msg) +{ + LOGE("preConnectClient API NOT IMPLEMENTED"); + return (api_result_t){0}; // TODO +} + +tunnel_t *destroyPreConnectClient(tunnel_t *self) +{ + LOGE("preConnectClient DESTROY NOT IMPLEMENTED"); // TODO + return NULL; +} +tunnel_metadata_t getMetadataPreConnectClient() +{ + return (tunnel_metadata_t){.version = 0001, .flags = TFLAG_ROUTE_STARTER}; +} \ No newline at end of file diff --git a/tunnels/client/preconnect/preconnect_client.h b/tunnels/client/preconnect/preconnect_client.h new file mode 100644 index 00000000..2babb67c --- /dev/null +++ b/tunnels/client/preconnect/preconnect_client.h @@ -0,0 +1,15 @@ +#pragma once +#include "api.h" + +// +// con <------> PreConnectClient <-------> con (established ahead of time) +// +// + +tunnel_t *newPreConnectClient(node_instance_context_t *instance_info); +api_result_t apiPreConnectClient(tunnel_t *self, char *msg); +tunnel_t *destroyPreConnectClient(tunnel_t *self); +tunnel_metadata_t getMetadataPreConnectClient(); + + + diff --git a/tunnels/client/preconnect/types.h b/tunnels/client/preconnect/types.h new file mode 100644 index 00000000..a4f78d48 --- /dev/null +++ b/tunnels/client/preconnect/types.h @@ -0,0 +1,46 @@ +#pragma once +#include "api.h" +#include "buffer_stream.h" +#include "hv/hatomic.h" + +struct connect_arg +{ + unsigned int tid; + tunnel_t *t; +}; +typedef enum +{ + notconnected, + connected_direct, + connected_pair +} connection_state; + +typedef struct preconnect_client_con_state_s +{ + struct preconnect_client_con_state_s *prev, *next; + line_t *u; + line_t *d; + connection_state mode; + +} preconnect_client_con_state_t; + +typedef struct thread_box_s +{ + size_t length; + preconnect_client_con_state_t root; + +} thread_box_t; + +typedef struct preconnect_client_state_s +{ + atomic_uint preconnect_cons; + atomic_uint unused_cons; + + atomic_uint round_index; + size_t connection_per_thread; + + // settings + int min_unused_cons; + thread_box_t threads[]; + +} preconnect_client_state_t;