From e919891cc896154579729fe2541b95b34c1e6dfe Mon Sep 17 00:00:00 2001 From: Radkesvat <134321679+radkesvat@users.noreply.github.com> Date: Tue, 30 Apr 2024 10:20:53 +0000 Subject: [PATCH] progerss on udp/ general things --- core/main.c | 2 +- tunnels/adapters/bridge/bridge.c | 14 +- .../adapters/connector/udp/udp_connector.c | 4 +- tunnels/client/header/header_client.c | 2 + tunnels/client/preconnect/preconnect_client.c | 19 +- tunnels/client/reverse/reverse_client.c | 3 +- tunnels/server/header/header_server.c | 1 + .../server/trojan/auth/trojan_auth_server.c | 7 +- ww/buffer_pool.h | 2 +- ww/context_queue.c | 1 - ww/eventloop/base/hlsem.c | 273 +++++++++--------- ww/eventloop/base/hmutex.h | 22 +- ww/frand.c | 3 +- ww/idle_table.c | 60 ++-- ww/idle_table.h | 15 +- ww/loggers/core_logger.h | 1 + ww/loggers/dns_logger.h | 1 + ww/loggers/network_logger.h | 1 + ww/managers/socket_manager.c | 208 +++++++++---- ww/managers/socket_manager.h | 20 +- ww/tunnel.c | 3 +- ww/utils/utils.c | 16 +- 22 files changed, 378 insertions(+), 300 deletions(-) diff --git a/core/main.c b/core/main.c index 841ac1d0..e37a0466 100644 --- a/core/main.c +++ b/core/main.c @@ -58,7 +58,7 @@ int main(void) // libhv has a separate logger, we attach it to the core logger logger_set_level_by_str(hv_default_logger(), getCoreSettings()->core_log_level); - logger_set_handler(hv_default_logger(), getCoreLoggerHandle(getCoreSettings()->core_log_console)); + logger_set_handler(hv_default_logger(), getCoreLoggerHandle()); LOGI("Starting Waterwall version %s", TOSTRING(WATERWALL_VERSION)); LOGI("Parsing core file complete"); diff --git a/tunnels/adapters/bridge/bridge.c b/tunnels/adapters/bridge/bridge.c index 22ec467e..69d3daae 100644 --- a/tunnels/adapters/bridge/bridge.c +++ b/tunnels/adapters/bridge/bridge.c @@ -1,12 +1,12 @@ #include "bridge.h" #include "loggers/network_logger.h" #include "managers/node_manager.h" - +#include "utils/jsonutils.h" typedef struct bridge_state_s { bool mode_upside; // if this node is last node of upstream - node_t * pair_node; + node_t *pair_node; tunnel_t *pair; } bridge_state_t; @@ -16,7 +16,6 @@ typedef struct bridge_con_state_s } bridge_con_state_t; - static void upStream(tunnel_t *self, context_t *c) { bridge_state_t *state = STATE(self); @@ -39,9 +38,7 @@ static void upStream(tunnel_t *self, context_t *c) // } // self->upStream(self, c); - - state->pair->dw->downStream(state->pair->dw,c); - + state->pair->dw->downStream(state->pair->dw, c); } static inline void downStream(tunnel_t *self, context_t *c) @@ -67,14 +64,13 @@ static inline void downStream(tunnel_t *self, context_t *c) // } // self->downStream(self, c); - - state->pair->up->upStream(state->pair->up,c); + state->pair->up->upStream(state->pair->up, c); } tunnel_t *newBridge(node_instance_context_t *instance_info) { const cJSON *settings = instance_info->node_settings_json; - char * pair_node_name = NULL; + char *pair_node_name = NULL; if (! getStringFromJsonObject(&pair_node_name, settings, "pair")) { LOGF("Bridge: \"pair\" is not provided in json"); diff --git a/tunnels/adapters/connector/udp/udp_connector.c b/tunnels/adapters/connector/udp/udp_connector.c index 879038d7..e662c46f 100644 --- a/tunnels/adapters/connector/udp/udp_connector.c +++ b/tunnels/adapters/connector/udp/udp_connector.c @@ -9,7 +9,7 @@ static void cleanup(udp_connector_con_state_t *cstate) udp_connector_state_t *state = STATE(cstate->tunnel); free(cstate); } -static void onRecv(hio_t *io, shift_buffer_t *buf) +static void onRecvFrom(hio_t *io, shift_buffer_t *buf) { udp_connector_con_state_t *cstate = (udp_connector_con_state_t *) (hevent_userdata(io)); if (cstate == NULL) @@ -120,7 +120,7 @@ static void upStream(tunnel_t *self, context_t *c) cstate->io = upstream_io; hevent_set_userdata(upstream_io, cstate); - hio_setcb_read(upstream_io, onRecv); + hio_setcb_read(upstream_io, onRecvFrom); hio_read(upstream_io); socket_context_t *dest_ctx = &(c->line->dest_ctx); diff --git a/tunnels/client/header/header_client.c b/tunnels/client/header/header_client.c index 2d723afa..23d0c6d2 100644 --- a/tunnels/client/header/header_client.c +++ b/tunnels/client/header/header_client.c @@ -2,6 +2,8 @@ #include "buffer_stream.h" #include "hsocket.h" #include "loggers/network_logger.h" +#include "utils/jsonutils.h" + enum header_dynamic_value_status { diff --git a/tunnels/client/preconnect/preconnect_client.c b/tunnels/client/preconnect/preconnect_client.c index 230ebceb..d6feb6d7 100644 --- a/tunnels/client/preconnect/preconnect_client.c +++ b/tunnels/client/preconnect/preconnect_client.c @@ -3,6 +3,7 @@ #include "loggers/network_logger.h" #include "managers/node_manager.h" #include "types.h" +#include "utils/jsonutils.h" static inline void upStream(tunnel_t *self, context_t *c) { @@ -47,7 +48,7 @@ static inline void upStream(tunnel_t *self, context_t *c) ucon->mode = kConnectedPair; CSTATE_MUT(c) = ucon; self->dw->downStream(self->dw, newEstContext(c->line)); - initiateConnect(self,false); + initiateConnect(self, false); } else { @@ -121,7 +122,7 @@ static inline void downStream(tunnel_t *self, context_t *c) else { const unsigned int tid = c->line->tid; - thread_box_t * this_tb = &(state->workers[tid]); + thread_box_t *this_tb = &(state->workers[tid]); preconnect_client_con_state_t *ucon = CSTATE(c); if (c->fin) @@ -134,7 +135,7 @@ static inline void downStream(tunnel_t *self, context_t *c) atomic_fetch_add_explicit(&(state->active_cons), -1, memory_order_relaxed); destroyCstate(ucon); self->dw->downStream(self->dw, c); - initiateConnect(self,true); + initiateConnect(self, true); break; @@ -144,7 +145,7 @@ static inline void downStream(tunnel_t *self, context_t *c) (ucon->d->chains_state)[self->chain_index] = NULL; destroyCstate(ucon); self->dw->downStream(self->dw, switchLine(c, d_line)); - initiateConnect(self,false); + initiateConnect(self, false); break; @@ -157,7 +158,7 @@ static inline void downStream(tunnel_t *self, context_t *c) } destroyCstate(ucon); destroyContext(c); - initiateConnect(self,true); + initiateConnect(self, true); break; @@ -177,7 +178,7 @@ static inline void downStream(tunnel_t *self, context_t *c) destroyContext(c); unsigned int unused = atomic_fetch_add_explicit(&(state->unused_cons), 1, memory_order_relaxed); LOGI("PreConnectClient: connected, unused: %d active: %d", unused + 1, state->active_cons); - initiateConnect(self,false); + initiateConnect(self, false); } else { @@ -189,15 +190,15 @@ static inline void downStream(tunnel_t *self, context_t *c) static void startPreconnect(htimer_t *timer) { - tunnel_t * self = hevent_userdata(timer); + tunnel_t *self = hevent_userdata(timer); preconnect_client_state_t *state = STATE(self); for (int i = 0; i < workers_count; i++) { - const int cpt = state->connection_per_thread; + const size_t cpt = state->connection_per_thread; for (size_t ci = 0; ci < cpt; ci++) { - initiateConnect(self,true); + initiateConnect(self, true); } } diff --git a/tunnels/client/reverse/reverse_client.c b/tunnels/client/reverse/reverse_client.c index f05634bf..05e117c2 100644 --- a/tunnels/client/reverse/reverse_client.c +++ b/tunnels/client/reverse/reverse_client.c @@ -2,6 +2,7 @@ #include "helpers.h" #include "loggers/network_logger.h" #include "types.h" +#include "utils/jsonutils.h" static inline void upStream(tunnel_t *self, context_t *c) { @@ -157,7 +158,7 @@ static inline void downStream(tunnel_t *self, context_t *c) static void startReverseCelint(htimer_t *timer) { - tunnel_t * self = hevent_userdata(timer); + tunnel_t *self = hevent_userdata(timer); reverse_client_state_t *state = STATE(self); for (int i = 0; i < workers_count; i++) { diff --git a/tunnels/server/header/header_server.c b/tunnels/server/header/header_server.c index 005a4abf..319d61e4 100644 --- a/tunnels/server/header/header_server.c +++ b/tunnels/server/header/header_server.c @@ -2,6 +2,7 @@ #include "buffer_stream.h" #include "hsocket.h" #include "loggers/network_logger.h" +#include "utils/jsonutils.h" enum header_dynamic_value_status { diff --git a/tunnels/server/trojan/auth/trojan_auth_server.c b/tunnels/server/trojan/auth/trojan_auth_server.c index 78c355d0..31f90fc7 100644 --- a/tunnels/server/trojan/auth/trojan_auth_server.c +++ b/tunnels/server/trojan/auth/trojan_auth_server.c @@ -4,13 +4,16 @@ #include "managers/node_manager.h" #include "utils/stringutils.h" #include "utils/userutils.h" +#include "utils/jsonutils.h" #define i_type hmap_users_t // NOLINT #define i_key hash_t // NOLINT #define i_val trojan_user_t * // NOLINT -#define VEC_CAP 100 -#define CRLF_LEN 2 +enum { +VEC_CAP = 100, +CRLF_LEN = 2 +}; #include "stc/hmap.h" diff --git a/ww/buffer_pool.h b/ww/buffer_pool.h index e720365a..662b6751 100644 --- a/ww/buffer_pool.h +++ b/ww/buffer_pool.h @@ -9,7 +9,7 @@ This is the most memory consuming part of the program, and also the preallocation length really depends on where you want to use this program, on a phone or on a 16 core server? - so there are possible choose able memory profiles in the .c file which you can select the best for your needs + so there are possible choices for memory profiles in the .c file which you can select the best for your needs todo (runtime selection) its better that the memory profile be a runtime selection than compile time diff --git a/ww/context_queue.c b/ww/context_queue.c index 5c678ee3..d4989506 100644 --- a/ww/context_queue.c +++ b/ww/context_queue.c @@ -87,6 +87,5 @@ void contextQueueNotifyIoRemoved(context_queue_t *self, hio_t *io) { (*i.ref)->src_io = NULL; } - destroyContext((*i.ref)); } } diff --git a/ww/eventloop/base/hlsem.c b/ww/eventloop/base/hlsem.c index 7ec3969d..18af58f6 100644 --- a/ww/eventloop/base/hlsem.c +++ b/ww/eventloop/base/hlsem.c @@ -22,60 +22,57 @@ // misrepresented as being the original software. // 3. This notice may not be removed or altered from any source distribution. -//#define USE_UNIX_SEMA +// #define USE_UNIX_SEMA #if defined(_WIN32) && !defined(USE_UNIX_SEMA) - #include - #undef min - #undef max +#include +#undef min +#undef max #elif defined(__MACH__) && !defined(USE_UNIX_SEMA) - #undef panic // mach/mach.h defines a function called panic() - #include - // redefine panic - #define panic(fmt, ...) _panic(__FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__) +#undef panic // mach/mach.h defines a function called panic() +#include +// redefine panic +#define panic(fmt, ...) _panic(__FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__) #elif defined(__unix__) || defined(USE_UNIX_SEMA) - #include +#include #else - #error Unsupported platform +#error Unsupported platform #endif -ASSUME_NONNULL_BEGIN - #define USECS_IN_1_SEC 1000000 #define NSECS_IN_1_SEC 1000000000 - //--------------------------------------------------------------------------------------------- #if defined(_WIN32) && !defined(USE_UNIX_SEMA) -static bool SemaInit(Sema* sp, u32 initcount) { - assert(initcount <= 0x7fffffff); - *sp = (Sema)CreateSemaphoreW(NULL, (int)initcount, 0x7fffffff, NULL); - return *sp != NULL; +static bool SemaInit(hsem_t* sp, uint32_t initcount) { + assert(initcount <= 0x7fffffff); + *sp = (hsem_t)CreateSemaphoreW(NULL, (int)initcount, 0x7fffffff, NULL); + return *sp != NULL; } -static void SemaDispose(Sema* sp) { - CloseHandle(*sp); +static void SemaDispose(hsem_t* sp) { + CloseHandle(*sp); } -static bool SemaWait(Sema* sp) { - const unsigned long infinite = 0xffffffff; - return WaitForSingleObject(*sp, infinite) == 0; +static bool SemaWait(hsem_t* sp) { + const unsigned long infinite = 0xffffffff; + return WaitForSingleObject(*sp, infinite) == 0; } -static bool SemaTryWait(Sema* sp) { - return WaitForSingleObject(*sp, 0) == 0; +static bool SemaTryWait(hsem_t* sp) { + return WaitForSingleObject(*sp, 0) == 0; } -static bool SemaTimedWait(Sema* sp, u64 timeout_usecs) { - return WaitForSingleObject(*sp, (unsigned long)(timeout_usecs / 1000)) == 0; +static bool SemaTimedWait(hsem_t* sp, uint64_t timeout_usecs) { + return WaitForSingleObject(*sp, (unsigned long)(timeout_usecs / 1000)) == 0; } -static bool SemaSignal(Sema* sp, u32 count) { - assert(count > 0); - // while (!ReleaseSemaphore(*sp, count, NULL)) { - // } - return ReleaseSemaphore(*sp, count, NULL); +static bool SemaSignal(hsem_t* sp, uint32_t count) { + assert(count > 0); + // while (!ReleaseSemaphore(*sp, count, NULL)) { + // } + return ReleaseSemaphore(*sp, count, NULL); } //--------------------------------------------------------------------------------------------- @@ -84,62 +81,59 @@ static bool SemaSignal(Sema* sp, u32 count) { // https://web.archive.org/web/20140109214515/ // http://lists.apple.com/archives/darwin-kernel/2009/Apr/msg00010.html -static bool SemaInit(Sema* sp, u32 initcount) { - assert(initcount <= 0x7fffffff); - kern_return_t rc = - semaphore_create(mach_task_self(), (semaphore_t*)sp, SYNC_POLICY_FIFO, (int)initcount); - return rc == KERN_SUCCESS; +static bool SemaInit(hsem_t* sp, uint32_t initcount) { + assert(initcount <= 0x7fffffff); + kern_return_t rc = semaphore_create(mach_task_self(), (semaphore_t*)sp, SYNC_POLICY_FIFO, (int)initcount); + return rc == KERN_SUCCESS; } -static void SemaDispose(Sema* sp) { - semaphore_destroy(mach_task_self(), *(semaphore_t*)sp); +static void SemaDispose(hsem_t* sp) { + semaphore_destroy(mach_task_self(), *(semaphore_t*)sp); } -static bool SemaWait(Sema* sp) { - semaphore_t s = *(semaphore_t*)sp; - while (1) { - kern_return_t rc = semaphore_wait(s); - if (rc != KERN_ABORTED) - return rc == KERN_SUCCESS; - } +static bool SemaWait(hsem_t* sp) { + semaphore_t s = *(semaphore_t*)sp; + while (1) { + kern_return_t rc = semaphore_wait(s); + if (rc != KERN_ABORTED) return rc == KERN_SUCCESS; + } } -static bool SemaTryWait(Sema* sp) { - return SemaTimedWait(sp, 0); +static bool SemaTryWait(hsem_t* sp) { + return SemaTimedWait(sp, 0); } -static bool SemaTimedWait(Sema* sp, u64 timeout_usecs) { - mach_timespec_t ts; - ts.tv_sec = (u32)(timeout_usecs / USECS_IN_1_SEC); - ts.tv_nsec = (int)((timeout_usecs % USECS_IN_1_SEC) * 1000); - // Note: - // semaphore_wait_deadline was introduced in macOS 10.6 - // semaphore_timedwait was introduced in macOS 10.10 - // https://developer.apple.com/library/prerelease/mac/documentation/General/Reference/ - // APIDiffsMacOSX10_10SeedDiff/modules/Darwin.html - semaphore_t s = *(semaphore_t*)sp; - while (1) { - kern_return_t rc = semaphore_timedwait(s, ts); - if (rc != KERN_ABORTED) - return rc == KERN_SUCCESS; - // TODO: update ts; subtract time already waited and retry (loop). - // For now, let's just return with an error: - return false; - } +static bool SemaTimedWait(hsem_t* sp, uint64_t timeout_usecs) { + mach_timespec_t ts; + ts.tv_sec = (uint32_t)(timeout_usecs / USECS_IN_1_SEC); + ts.tv_nsec = (int)((timeout_usecs % USECS_IN_1_SEC) * 1000); + // Note: + // semaphore_wait_deadline was introduced in macOS 10.6 + // semaphore_timedwait was introduced in macOS 10.10 + // https://developer.apple.com/library/prerelease/mac/documentation/General/Reference/ + // APIDiffsMacOSX10_10SeedDiff/modules/Darwin.html + semaphore_t s = *(semaphore_t*)sp; + while (1) { + kern_return_t rc = semaphore_timedwait(s, ts); + if (rc != KERN_ABORTED) return rc == KERN_SUCCESS; + // TODO: update ts; subtract time already waited and retry (loop). + // For now, let's just return with an error: + return false; + } } -static bool SemaSignal(Sema* sp, u32 count) { - assert(count > 0); - semaphore_t s = *(semaphore_t*)sp; - kern_return_t rc = 0; // KERN_SUCCESS - while (count-- > 0) { - rc += semaphore_signal(s); // == ... - // auto rc1 = semaphore_signal(s); - // if (rc1 != KERN_SUCCESS) { - // rc = rc1; - // } - } - return rc == KERN_SUCCESS; +static bool SemaSignal(hsem_t* sp, uint32_t count) { + assert(count > 0); + semaphore_t s = *(semaphore_t*)sp; + kern_return_t rc = 0; // KERN_SUCCESS + while (count-- > 0) { + rc += semaphore_signal(s); // == ... + // auto rc1 = semaphore_signal(s); + // if (rc1 != KERN_SUCCESS) { + // rc = rc1; + // } + } + return rc == KERN_SUCCESS; } //--------------------------------------------------------------------------------------------- @@ -147,69 +141,69 @@ static bool SemaSignal(Sema* sp, u32 count) { // TODO: implementation based on futex (for Linux and OpenBSD). See "__TBB_USE_FUTEX" of oneTBB -static bool SemaInit(Sema* sp, u32 initcount) { - return sem_init((sem_t*)sp, 0, initcount) == 0; +static bool SemaInit(hsem_t* sp, uint32_t initcount) { + return sem_init((sem_t*)sp, 0, initcount) == 0; } -static void SemaDispose(Sema* sp) { - sem_destroy((sem_t*)sp); +static void SemaDispose(hsem_t* sp) { + sem_destroy((sem_t*)sp); } -static bool SemaWait(Sema* sp) { - // http://stackoverflow.com/questions/2013181/gdb-causes-sem-wait-to-fail-with-eintr-error - int rc; - do { - rc = sem_wait((sem_t*)sp); - } while (rc == -1 && errno == EINTR); - return rc == 0; +static bool SemaWait(hsem_t* sp) { + // http://stackoverflow.com/questions/2013181/gdb-causes-sem-wait-to-fail-with-eintr-error + int rc; + do { + rc = sem_wait((sem_t*)sp); + } while (rc == -1 && errno == EINTR); + return rc == 0; } -static bool SemaTryWait(Sema* sp) { - int rc; - do { - rc = sem_trywait((sem_t*)sp); - } while (rc == -1 && errno == EINTR); - return rc == 0; +static bool SemaTryWait(hsem_t* sp) { + int rc; + do { + rc = sem_trywait((sem_t*)sp); + } while (rc == -1 && errno == EINTR); + return rc == 0; } -static bool SemaTimedWait(Sema* sp, u64 timeout_usecs) { - struct timespec ts; - clock_gettime(CLOCK_REALTIME, &ts); - ts.tv_sec += (time_t)(timeout_usecs / USECS_IN_1_SEC); - ts.tv_nsec += (long)(timeout_usecs % USECS_IN_1_SEC) * 1000; - // sem_timedwait bombs if you have more than 1e9 in tv_nsec - // so we have to clean things up before passing it in - if (ts.tv_nsec >= NSECS_IN_1_SEC) { - ts.tv_nsec -= NSECS_IN_1_SEC; - ++ts.tv_sec; - } - int rc; - do { - rc = sem_timedwait((sem_t*)sp, &ts); - } while (rc == -1 && errno == EINTR); - return rc == 0; +static bool SemaTimedWait(hsem_t* sp, uint64_t timeout_usecs) { + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += (time_t)(timeout_usecs / USECS_IN_1_SEC); + ts.tv_nsec += (long)(timeout_usecs % USECS_IN_1_SEC) * 1000; + // sem_timedwait bombs if you have more than 1e9 in tv_nsec + // so we have to clean things up before passing it in + if (ts.tv_nsec >= NSECS_IN_1_SEC) { + ts.tv_nsec -= NSECS_IN_1_SEC; + ++ts.tv_sec; + } + int rc; + do { + rc = sem_timedwait((sem_t*)sp, &ts); + } while (rc == -1 && errno == EINTR); + return rc == 0; } -static bool SemaSignal(Sema* sp, u32 count) { - assert(count > 0); - while (count-- > 0) { - while (sem_post((sem_t*)sp) == -1) { - return false; +static bool SemaSignal(hsem_t* sp, uint32_t count) { + assert(count > 0); + while (count-- > 0) { + while (sem_post((sem_t*)sp) == -1) { + return false; + } } - } - return true; + return true; } //--------------------------------------------------------------------------------------------- #endif /* system */ -// end of Sema implementations +// end of hsem_t implementations //--------------------------------------------------------------------------------------------- // hlsem_t // LSEMA_MAX_SPINS is the upper limit of how many times to retry a CAS while spinning. // After LSEMA_MAX_SPINS CAS attempts has failed (not gotten a signal), the implementation -// falls back on calling hlsem_Wait. +// falls back on calling SemaWait. // // The number 10000 has been choosen by looking at contention between a few threads competing // for signal & wait on macOS 10.15 x86_64. In most observed cases two threads with zero overhead @@ -217,12 +211,12 @@ static bool SemaSignal(Sema* sp, u32 count) { // #define LSEMA_MAX_SPINS 10000 -static bool _hlsem_waitpartialspin(hlsem_t* s, uint64_t timeout_usecs) { +static bool _LSemaWaitPartialSpin(hlsem_t* s, uint64_t timeout_usecs) { ssize_t oldCount; int spin = LSEMA_MAX_SPINS; while (--spin >= 0) { oldCount = atomic_load_explicit(&s->count, memory_order_relaxed); - if (oldCount > 0 && atomic_compare_exchange_strong_explicit(&s->count, &oldCount, oldCount - 1, memory_order_acq_rel, memory_order_release)) + if (oldCount > 0 && atomic_compare_exchange_strong_explicit(&s->count, &oldCount, oldCount - 1, memory_order_acq_rel, memory_order_relaxed)) return true; // Prevent the compiler from collapsing the loop // [rsms]: Is this really needed? Find out. I think both clang and gcc will avoid @@ -233,9 +227,9 @@ static bool _hlsem_waitpartialspin(hlsem_t* s, uint64_t timeout_usecs) { oldCount = atomic_fetch_sub_explicit(&s->count, 1, memory_order_acquire); if (oldCount > 0) return true; if (timeout_usecs == 0) { - if (hlsem_Wait(&s->sema)) return true; + if (SemaWait(&s->sema)) return true; } - if (timeout_usecs > 0 && hlsem_timedwait(&s->sema, timeout_usecs)) return true; + if (timeout_usecs > 0 && SemaTimedWait(&s->sema, timeout_usecs)) return true; // At this point, we've timed out waiting for the semaphore, but the // count is still decremented indicating we may still be waiting on // it. So we have to re-adjust the count, but only if the semaphore @@ -243,27 +237,26 @@ static bool _hlsem_waitpartialspin(hlsem_t* s, uint64_t timeout_usecs) { // need to release the semaphore too. while (1) { oldCount = atomic_load_explicit(&s->count, memory_order_acquire); - if (oldCount >= 0 && hlsem_trywait(&s->sema)) return true; - if (oldCount < 0 && atomic_compare_exchange_strong_explicit(&s->count, &oldCount, oldCount + 1,memory_order_relaxed,memory_order_relaxed)) return false; - - + if (oldCount >= 0 && SemaTryWait(&s->sema)) return true; + if (oldCount < 0 && atomic_compare_exchange_strong_explicit(&s->count, &oldCount, oldCount + 1, memory_order_relaxed, memory_order_relaxed)) + return false; } } -bool hlsem_init(hlsem_t* s, uint32_t initcount) { +bool LSemaInit(hlsem_t* s, uint32_t initcount) { s->count = ATOMIC_VAR_INIT(initcount); - return hlsem_init(&s->sema, initcount); + return SemaInit(&s->sema, initcount); } -void hlsem_destroy(hlsem_t* s) { - hlsem_destroy(&s->sema); +void LSemaDispose(hlsem_t* s) { + SemaDispose(&s->sema); } -bool hlsem_wait(hlsem_t* s) { - return hlsem_trywait(s) || _hlsem_waitpartialspin(s, 0); +bool LSemaWait(hlsem_t* s) { + return LSemaTryWait(s) || _LSemaWaitPartialSpin(s, 0); } -bool hlsem_trywait(hlsem_t* s) { +bool LSemaTryWait(hlsem_t* s) { ssize_t oldCount = atomic_load_explicit(&s->count, memory_order_relaxed); while (oldCount > 0) { if (atomic_compare_exchange_weak_explicit(&s->count, &oldCount, oldCount - 1, memory_order_acquire, memory_order_relaxed)) { @@ -273,20 +266,18 @@ bool hlsem_trywait(hlsem_t* s) { return false; } -bool hlsem_timedwait(hlsem_t* s, uint64_t timeout_usecs) { - return hlsem_TryWait(s) || _hlsem_WaitPartialSpin(s, timeout_usecs); +bool LSemaTimedWait(hlsem_t* s, uint64_t timeout_usecs) { + return LSemaTryWait(s) || _LSemaWaitPartialSpin(s, timeout_usecs); } -void hlsem_signal(hlsem_t* s, uint32_t count) { +void LSemaSignal(hlsem_t* s, uint32_t count) { assert(count > 0); ssize_t oldCount = atomic_fetch_add_explicit(&s->count, (ssize_t)count, memory_order_release); ssize_t toRelease = -oldCount < count ? -oldCount : (ssize_t)count; - if (toRelease > 0) hlsem_signal(&s->sema, (uint32_t)toRelease); + if (toRelease > 0) SemaSignal(&s->sema, (uint32_t)toRelease); } -size_t hlsem_ApproxAvail(hlsem_t* s) { +size_t LSemaApproxAvail(hlsem_t* s) { ssize_t count = atomic_load_explicit(&s->count, memory_order_relaxed); return count > 0 ? (size_t)(count) : 0; } - - diff --git a/ww/eventloop/base/hmutex.h b/ww/eventloop/base/hmutex.h index ba917b05..e6734470 100644 --- a/ww/eventloop/base/hmutex.h +++ b/ww/eventloop/base/hmutex.h @@ -238,18 +238,18 @@ static inline int hsem_wait_for(hsem_t* sem, unsigned int ms) { // no syscalls. If there's no signal the implementation will retry by spinning for // a short while before eventually falling back to Sema. typedef struct hlsem_s { - atomic_llong count; + atomic_long count; hsem_t sema; } hlsem_t; -// returns false if system impl failed (rare) -bool hlsem_init(hlsem_t*, uint32_t initcount); -void hlsem_destroy(hlsem_t*); -bool hlsem_wait(hlsem_t*); -bool hlsem_trywait(hlsem_t*); -bool hlsem_timedwait(hlsem_t*, uint64_t timeout_usecs); -void hlsem_signal(hlsem_t*, uint32_t count /*must be >0*/); -size_t hlsem_approxavail(hlsem_t*); +bool LSemaInit(hlsem_t*, uint32_t initcount); // returns false if system impl failed (rare) +void LSemaDispose(hlsem_t*); +bool LSemaWait(hlsem_t*); +bool LSemaTryWait(hlsem_t*); +bool LSemaTimedWait(hlsem_t*, uint64_t timeout_usecs); +void LSemaSignal(hlsem_t*, uint32_t count /*must be >0*/); +size_t LSemaApproxAvail(hlsem_t*); + @@ -307,6 +307,9 @@ static inline void HybridMutexLock(hybrid_mutex_t* m) { } } +static inline bool HybridMutexTryLock(hybrid_mutex_t* m) { + return 0 == atomic_exchange_explicit(&m->flag, true, memory_order_acquire); +} static inline void HybridMutexUnlock(hybrid_mutex_t* m) { atomic_exchange(&m->flag, false); if (atomic_load(&m->nwait) != 0) { @@ -321,6 +324,7 @@ static inline void HybridMutexUnlock(hybrid_mutex_t* m) { #define hhybridmutex_init HybridMutexInit #define hhybridmutex_destroy HybridMutexDestroy #define hhybridmutex_lock HybridMutexLock +#define hhybridmutex_trylock HybridMutexTryLock #define hhybridmutex_unlock HybridMutexUnlock diff --git a/ww/frand.c b/ww/frand.c index cfa3a1da..8800ebd6 100644 --- a/ww/frand.c +++ b/ww/frand.c @@ -1,2 +1,3 @@ +#include "frand.h" -extern unsigned fastRand(void); \ No newline at end of file +extern unsigned int fastRand(void); \ No newline at end of file diff --git a/ww/idle_table.c b/ww/idle_table.c index bd1dfb9a..4e18c665 100644 --- a/ww/idle_table.c +++ b/ww/idle_table.c @@ -23,18 +23,17 @@ void destoryIdleTable(idle_table_t *self) hmap_idles_t_drop(&(self->hmap)); } -idle_table_t *newIdleTable(uint8_t tid, OnIdleExpireCallBack cb) +idle_table_t *newIdleTable(hloop_t *loop, OnIdleExpireCallBack cb) { idle_table_t *newtable = malloc(sizeof(idle_table_t)); - *newtable = (idle_table_t){.tid = tid, - .loop = loops[tid], - .idle_handle = hidle_add(loops[tid], idleCallBack, INFINITE), + *newtable = (idle_table_t){.loop = loop, + .idle_handle = hidle_add(loop, idleCallBack, INFINITE), .hqueue = heapq_idles_t_with_capacity(kVecCap), .hmap = hmap_idles_t_with_capacity(kVecCap), .expire_cb = cb, - .last_update_ms = hloop_now_ms(loops[tid])}; + .last_update_ms = hloop_now_ms(loop)}; - hspinlock_init(&(newtable->slock)); + hhybridmutex_init(&(newtable->mutex)); hevent_set_userdata(newtable->idle_handle, newtable); return newtable; } @@ -43,7 +42,7 @@ idle_item_t *newIdleItem(idle_table_t *self, hash_t key, void *userdata, uint8_t { assert(self && self->expire_cb); idle_item_t *item = malloc(sizeof(idle_item_t)); - hspinlock_lock(&(self->slock)); + hhybridmutex_lock(&(self->mutex)); *item = (idle_item_t){.expire_at_ms = hloop_now_ms(self->loop) + age_ms, .hash = key, @@ -54,42 +53,42 @@ idle_item_t *newIdleItem(idle_table_t *self, hash_t key, void *userdata, uint8_t heapq_idles_t_push(&(self->hqueue), item); hmap_idles_t_push(&(self->hmap), (hmap_idles_t_value){item->hash, item}); - hspinlock_unlock(&(self->slock)); + hhybridmutex_unlock(&(self->mutex)); return item; } void keepIdleItemForAtleast(idle_table_t *self, idle_item_t *item, uint64_t age_ms) { - hspinlock_lock(&(self->slock)); + hhybridmutex_lock(&(self->mutex)); item->expire_at_ms += self->last_update_ms + age_ms; heapq_idles_t_make_heap(&self->hqueue); - hspinlock_unlock(&(self->slock)); + hhybridmutex_unlock(&(self->mutex)); } idle_item_t *getIdleItemByHash(idle_table_t *self, hash_t key) { - hspinlock_lock(&(self->slock)); + hhybridmutex_lock(&(self->mutex)); hmap_idles_t_iter find_result = hmap_idles_t_find(&(self->hmap), key); if (find_result.ref == hmap_idles_t_end(&(self->hmap)).ref) { - hspinlock_unlock(&(self->slock)); + hhybridmutex_unlock(&(self->mutex)); return NULL; } - hspinlock_unlock(&(self->slock)); + hhybridmutex_unlock(&(self->mutex)); return (find_result.ref->second); } void removeIdleItemByHandle(idle_table_t *self, idle_item_t *item) { - if (item == NULL) - { - return; - } + assert(item != NULL && item->hash != 0x0); + hash_t item_hash = item->hash; + // enough to say its no longer in heap queue - item->userdata == NULL; + *item = (idle_item_t){}; - hspinlock_lock(&(self->slock)); - hmap_idles_t_erase(&(self->hmap), item->hash); - hspinlock_unlock(&(self->slock)); + hhybridmutex_lock(&(self->mutex)); + hmap_idles_t_erase(&(self->hmap), item_hash); + heapq_idles_t_make_heap(&self->hqueue); + hhybridmutex_unlock(&(self->mutex)); // alternative: // const uint64_t et = item->expire_at_ms; @@ -109,7 +108,7 @@ void removeIdleItemByHash(idle_table_t *self, hash_t key) removeIdleItemByHandle(self, getIdleItemByHash(self, key)); } -void beforeCloseCallBack(hevent_t *ev) +static void beforeCloseCallBack(hevent_t *ev) { idle_item_t *item = hevent_userdata(ev); const uint64_t oldex = item->expire_at_ms; @@ -124,7 +123,7 @@ void idleCallBack(hidle_t *idle) idle_table_t *self = hevent_userdata(idle); const uint64_t now = hloop_now_ms(self->loop); self->last_update_ms = now; - hspinlock_lock(&(self->slock)); + hhybridmutex_lock(&(self->mutex)); while (heapq_idles_t_size(&(self->hqueue)) > 0) { @@ -134,7 +133,8 @@ void idleCallBack(hidle_t *idle) { heapq_idles_t_pop(&(self->hqueue)); - if (item->userdata) + if (! item->cb) + { hmap_idles_t_erase(&(self->hmap), item->hash); hevent_t ev; @@ -142,14 +142,8 @@ void idleCallBack(hidle_t *idle) ev.loop = loops[item->tid]; ev.cb = beforeCloseCallBack; hevent_set_userdata(&ev, item); - if (item->tid == self->tid) - { - beforeCloseCallBack(&ev); - } - else - { - hloop_post_event(loops[item->tid], &ev); - } + + hloop_post_event(loops[item->tid], &ev); } } else @@ -157,5 +151,5 @@ void idleCallBack(hidle_t *idle) break; } } - hspinlock_unlock(&(self->slock)); + hhybridmutex_unlock(&(self->mutex)); } \ No newline at end of file diff --git a/ww/idle_table.h b/ww/idle_table.h index d2df0498..fc4f1d19 100644 --- a/ww/idle_table.h +++ b/ww/idle_table.h @@ -9,12 +9,12 @@ Thread safe idle table What dose it mean "idle table?" - in simple words, you put a object (idle_item) inside the table + in simple words, you put a object (idle_item) inside the table the idle_item has a timeout (or deadline), if the timeout expires the idle_item is removed from the table and the callback you provided is called. you also can keep updating the item timeout - The time checking has no cost and won't syscall at all, and the checking is synced by the + The time checking has no cost and won't syscall at all, and the checking is synced by the eventloop which by default wakes up every 100 ms. */ @@ -28,10 +28,9 @@ typedef struct idle_item_s hash_t hash; void *userdata; uint8_t tid; - OnIdleExpireCallBack cb; // shortcut table cb + OnIdleExpireCallBack cb; // shortcut of table cb } idle_item_t; - #define i_TYPE heapq_idles_t, struct idle_item_s * #define i_cmp -c_default_cmp // NOLINT #define idletable_less_func(x, y) ((*(x))->expire_at_ms < (*(y))->expire_at_ms) // NOLINT @@ -42,23 +41,21 @@ typedef struct idle_item_s #define i_TYPE hmap_idles_t, uint64_t, struct idle_item_s * #include "stc/hmap.h" - typedef struct idle_table_s { - uint8_t tid; hloop_t *loop; hidle_t *idle_handle; heapq_idles_t hqueue; hmap_idles_t hmap; - hspinlock_t slock; + hhybridmutex_t mutex; uint64_t last_update_ms; OnIdleExpireCallBack expire_cb; } idle_table_t; -idle_table_t *newIdleTable(uint8_t tid, OnIdleExpireCallBack cb); +idle_table_t *newIdleTable(hloop_t*loop, OnIdleExpireCallBack cb); idle_item_t *newIdleItem(idle_table_t *self, hash_t key, void *userdata, uint8_t tid, uint64_t age_ms); idle_item_t *getIdleItemByHash(idle_table_t *self, hash_t key); void destoryIdleTable(idle_table_t *self); void keepIdleItemForAtleast(idle_table_t *self, idle_item_t *item, uint64_t age_ms); -void removeIdleItemByHandle(idle_table_t *self, idle_item_t *item); +void removeIdleItemByHandle(idle_table_t *self, idle_item_t *item); // removing will not call expire cd void removeIdleItemByHash(idle_table_t *self, hash_t key); diff --git a/ww/loggers/core_logger.h b/ww/loggers/core_logger.h index a6af5c1b..8770ae8c 100644 --- a/ww/loggers/core_logger.h +++ b/ww/loggers/core_logger.h @@ -3,6 +3,7 @@ #include #undef hlog +#undef HLOG #define HLOG getCoreLogger() #undef LOGD diff --git a/ww/loggers/dns_logger.h b/ww/loggers/dns_logger.h index 8106b226..d228602f 100644 --- a/ww/loggers/dns_logger.h +++ b/ww/loggers/dns_logger.h @@ -3,6 +3,7 @@ #include #undef hlog +#undef HLOG #define HLOG getDnsLogger() #undef LOGD diff --git a/ww/loggers/network_logger.h b/ww/loggers/network_logger.h index c7d7fd9c..c1ffb4fa 100644 --- a/ww/loggers/network_logger.h +++ b/ww/loggers/network_logger.h @@ -3,6 +3,7 @@ #include #undef hlog +#undef HLOG #define HLOG getNetworkLogger() #undef LOGD diff --git a/ww/managers/socket_manager.c b/ww/managers/socket_manager.c index 0fa0f135..e4668ca1 100644 --- a/ww/managers/socket_manager.c +++ b/ww/managers/socket_manager.c @@ -3,29 +3,20 @@ #include "buffer_pool.h" #include "hloop.h" #include "hmutex.h" -#include "hsocket.h" -#include "hthread.h" #include "idle_table.h" #include "loggers/network_logger.h" #include "stc/common.h" #include "tunnel.h" #include "utils/procutils.h" +#include "utils/sockutils.h" #include "ww.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include typedef struct socket_filter_s { - hio_t *listen_io; + union { + hio_t *listen_io; + hio_t **listen_ios; + }; socket_filter_option_t option; tunnel_t *tunnel; onAccept cb; @@ -48,9 +39,9 @@ typedef struct socket_manager_s hthread_t accept_thread; filters_t filters[kFilterLevels]; hhybridmutex_t mutex; - idle_table_t udp_table; + idle_table_t *udp_table; - uint16_t last_round_tindex; + uint16_t last_round_tid; bool iptables_installed; bool ip6tables_installed; bool lsof_installed; @@ -247,11 +238,9 @@ void registerSocketAcceptor(tunnel_t *tunnel, socket_filter_option_t option, onA { socket_filter_t *filter = malloc(sizeof(socket_filter_t)); - filter->tunnel = tunnel; - filter->option = option; - filter->cb = cb; - filter->listen_io = NULL; - unsigned int pirority = 0; + *filter = (socket_filter_t){.tunnel = tunnel, .option = option, .cb = cb, .listen_io = NULL}; + + unsigned int pirority = 0; if (option.multiport_backend == kMultiportBackendNothing) { pirority++; @@ -270,42 +259,52 @@ void registerSocketAcceptor(tunnel_t *tunnel, socket_filter_option_t option, onA hhybridmutex_unlock(&(state->mutex)); } -static void distributeSocket(hio_t *io, socket_filter_t *filter, uint16_t local_port, bool no_delay) +static inline uint16_t getCurrentDistrbuteTid() +{ + return state->last_round_tid; +} +static inline void incrementDistrbuteTid() +{ + state->last_round_tid++; + if (state->last_round_tid >= workers_count) + { + state->last_round_tid = 0; + } +} +static void distributeSocket(void *io, socket_filter_t *filter, uint16_t local_port) { socket_accept_result_t *result = malloc(sizeof(socket_accept_result_t)); result->real_localport = local_port; - if (no_delay) - { - tcp_nodelay(hio_fd(io), 1); - } + uint8_t tid = (uint8_t) getCurrentDistrbuteTid(); + hloop_t *worker_loop = loops[tid]; + hevent_t ev = (hevent_t){.loop = worker_loop, .cb = filter->cb}; + result->tid = tid; + result->io = io; + result->tunnel = filter->tunnel; + ev.userdata = result; + incrementDistrbuteTid(); - hio_detach(io); - hloop_t *worker_loop = loops[state->last_round_tindex]; - hevent_t ev; - memset(&ev, 0, sizeof(ev)); - ev.loop = worker_loop; - ev.cb = filter->cb; - result->tid = state->last_round_tindex; - result->io = io; - result->tunnel = filter->tunnel; - ev.userdata = result; - state->last_round_tindex++; - if (state->last_round_tindex >= workers_count) - { - state->last_round_tindex = 0; - } hloop_post_event(worker_loop, &ev); } -static void noSocketConsumerFound(hio_t *io) +static void noTcpSocketConsumerFound(hio_t *io) { char localaddrstr[SOCKADDR_STRLEN] = {0}; char peeraddrstr[SOCKADDR_STRLEN] = {0}; - LOGE("SocketManager: could not find consumer for socket FD:%x [%s] <= [%s]", (int) hio_fd(io), + LOGE("SocketManager: could not find consumer for Tcp socket FD:%x [%s] <= [%s]", (int) hio_fd(io), SOCKADDR_STR(hio_localaddr(io), localaddrstr), SOCKADDR_STR(hio_peeraddr(io), peeraddrstr)); hio_close(io); } +static void noUdpSocketConsumerFound(udpsock_t *udpsock) +{ + char localaddrstr[SOCKADDR_STRLEN] = {0}; + char peeraddrstr[SOCKADDR_STRLEN] = {0}; + + LOGE("SocketManager: could not find consumer for Udp socket [%s] <= [%s]", + SOCKADDR_STR(&(udpsock->address_local), localaddrstr), SOCKADDR_STR(&(udpsock->address_peer), peeraddrstr)); + free(udpsock); +} static bool checkIpIsWhiteList(sockaddr_u *addr, char **white_list_raddr) { bool matches = false; @@ -351,8 +350,7 @@ static void distributeTcpSocket(hio_t *io, uint16_t local_port) uint16_t port_min = option.port_min; uint16_t port_max = option.port_max; - // single port or multi port per socket - if (port_min > local_port || port_max < local_port) + if (option.proto != kSapTcp || port_min > local_port || port_max < local_port) { continue; } @@ -364,14 +362,19 @@ static void distributeTcpSocket(hio_t *io, uint16_t local_port) } } - distributeSocket(io, filter, local_port, option.no_delay); + if (option.no_delay) + { + tcp_nodelay(hio_fd(io), 1); + } + hio_detach(io); + distributeSocket(io, filter, local_port); hhybridmutex_unlock(&(state->mutex)); return; } } hhybridmutex_unlock(&(state->mutex)); - noSocketConsumerFound(io); + noTcpSocketConsumerFound(io); } static void onAcceptTcpSinplePort(hio_t *io) @@ -454,6 +457,10 @@ static void listenTcpMultiPortIptables(hloop_t *loop, socket_filter_t *filter, c static void listenTcpMultiPortSockets(hloop_t *loop, socket_filter_t *filter, char *host, uint16_t port_min, uint8_t *ports_overlapped, uint16_t port_max) { + const int length = (port_max - port_min); + filter->listen_ios = (hio_t **) malloc(sizeof(hio_t *) * (length + 1)); + filter->listen_ios[length] = 0x0; + int i = 0; for (uint16_t p = port_min; p < port_max; p++) { if (ports_overlapped[p] == 1) @@ -463,14 +470,14 @@ static void listenTcpMultiPortSockets(hloop_t *loop, socket_filter_t *filter, ch } ports_overlapped[p] = 1; - filter->listen_io = hloop_create_tcp_server(loop, host, p, onAcceptTcpSinplePort); + filter->listen_ios[i] = hloop_create_tcp_server(loop, host, p, onAcceptTcpSinplePort); - if (filter->listen_io == NULL) + if (filter->listen_ios[i] == NULL) { LOGW("SocketManager: could not listen on %s:[%u] , skipped...", host, p, "TCP"); continue; } - + i++; LOGI("SocketManager: listening on %s:[%u] (%s)", host, p, "TCP"); } } @@ -534,22 +541,22 @@ static void listenTcp(hloop_t *loop, uint8_t *ports_overlapped) } } -static void distributeUdpSocket(hio_t *io,uint16_t local_port) +static idle_item_t* distributeUdpSocket(udpsock_t *con, uint16_t local_port) { hhybridmutex_lock(&(state->mutex)); - sockaddr_u *paddr = (sockaddr_u *) hio_peeraddr(io); + sockaddr_u *paddr = &con->address_peer; for (int ri = (kFilterLevels - 1); ri >= 0; ri--) { c_foreach(k, filters_t, state->filters[ri]) { + socket_filter_t *filter = *(k.ref); socket_filter_option_t option = filter->option; uint16_t port_min = option.port_min; uint16_t port_max = option.port_max; - // single port or multi port per socket - if (port_min > local_port || port_max < local_port) + if (option.proto != kSapUdp || port_min > local_port || port_max < local_port) { continue; } @@ -560,19 +567,93 @@ static void distributeUdpSocket(hio_t *io,uint16_t local_port) continue; } } - - distributeSocket(io, filter, local_port, option.no_delay); + con->tid = getCurrentDistrbuteTid(); + distributeSocket(con, filter, local_port); hhybridmutex_unlock(&(state->mutex)); - return; + return true; } } hhybridmutex_unlock(&(state->mutex)); - noSocketConsumerFound(io); + noUdpSocketConsumerFound(con); + return NULL; +} + +void onUdpSocketExpire(struct idle_item_s *table_item) +{ + assert(table_item->userdata != NULL); + udpsock_t *udpsock = table_item->userdata; + + // call close callback + if (udpsock->closecb) + { + hevent_t ev; + memset(&ev, 0, sizeof(ev)); + ev.loop = loops[table_item->tid]; + ev.cb = udpsock->closecb; + hevent_set_userdata(&ev, udpsock); + hloop_post_event(loops[table_item->tid], &ev); + } +} +static void onRecvFrom(hio_t *io, shift_buffer_t *buf) +{ + + printf("on_recvfrom fd=%d readbytes=%d\n", hio_fd(io), (int) bufLen(buf)); + char localaddrstr[SOCKADDR_STRLEN] = {0}; + char peeraddrstr[SOCKADDR_STRLEN] = {0}; + printf("[%s] <=> [%s]\n", SOCKADDR_STR(hio_localaddr(io), localaddrstr), + SOCKADDR_STR(hio_peeraddr(io), peeraddrstr)); + + hash_t peeraddr_hash = sockAddrCalcHash((sockaddr_u *) hio_peeraddr(io)); + idle_item_t *udp_item = getIdleItemByHash(state->udp_table, peeraddr_hash); + + if (udp_item) + { + } + else + { + udpsock_t *con = malloc(sizeof(udpsock_t)); + *con = + (udpsock_t){.address_local = *hio_localaddr(io), .address_peer = *hio_peeraddr(io), .ident = peeraddr_hash}; + + if (distributeUdpSocket(con, sockaddr_port(((sockaddr_u *) hio_peeraddr(io))))) + { + + } + + // con is freed when no consumer is found + } + + // char *str = (char *) buf; + // printf("< %.*s", readbytes, str); + // // echo + // printf("> %.*s", readbytes, str); + // hio_write(io, buf, readbytes); +} + +static void listenUdpSinglePort(hloop_t *loop, socket_filter_t *filter, char *host, uint16_t port, + uint8_t *ports_overlapped) +{ + if (ports_overlapped[port] == 1) + { + return; + } + ports_overlapped[port] = 1; + LOGI("SocketManager: listening on %s:[%u] (%s)", host, port, "UDP"); + filter->listen_io = hloop_create_udp_server(loop, host, port); + + if (filter->listen_io == NULL) + { + LOGF("SocketManager: stopping due to null socket handle"); + exit(1); + } + + hio_setcb_read(filter->listen_io, onRecvFrom); + hio_read(filter->listen_io); } // todo (udp smanager) -static void listenUdp() +static void listenUdp(hloop_t *loop, uint8_t *ports_overlapped) { for (int ri = (kFilterLevels - 1); ri >= 0; ri--) { @@ -608,7 +689,7 @@ static void listenUdp() } else { - // listenUdpSinglePort(loop, filter, option.host, port_min, ports_overlapped); + listenUdpSinglePort(loop, filter, option.host, port_min, ports_overlapped); } } } @@ -620,11 +701,16 @@ static HTHREAD_ROUTINE(accept_thread) // NOLINT hloop_t *loop = (hloop_t *) userdata; hhybridmutex_lock(&(state->mutex)); + state->udp_table = newIdleTable(loop, onUdpSocketExpire); + { uint8_t ports_overlapped[65536] = {0}; listenTcp(loop, ports_overlapped); } - + { + uint8_t ports_overlapped[65536] = {0}; + listenUdp(loop, ports_overlapped); + } hhybridmutex_unlock(&(state->mutex)); hloop_run(loop); diff --git a/ww/managers/socket_manager.h b/ww/managers/socket_manager.h index 1c41ff15..058bfc5c 100644 --- a/ww/managers/socket_manager.h +++ b/ww/managers/socket_manager.h @@ -44,21 +44,23 @@ typedef struct socket_accept_result_s typedef void (*onAccept)(hevent_t *ev); -typedef struct udpcon_s +typedef struct udpsock_s { sockaddr_u address_local; sockaddr_u address_peer; - void (*closecb)(struct udpcon_s udp_con); - void (*readcb)(struct shift_buffer_s *buf); - void *userdata; + void (*closecb)(hevent_t *ev); + void (*readcb)(hevent_t *ev); + void *userdata; + hash_t ident; + uint8_t tid; -} udpcon_t; +} udpsock_t; + +void closeUdpSocket(udpsock_t *udpsock); void registerSocketAcceptor(tunnel_t *tunnel, socket_filter_option_t option, onAccept cb); struct socket_manager_s *getSocketManager(); - -void setSocketManager(struct socket_manager_s *state); struct socket_manager_s *createSocketManager(); - -void startSocketManager(); +void setSocketManager(struct socket_manager_s *state); +void startSocketManager(); diff --git a/ww/tunnel.c b/ww/tunnel.c index 5a1d113a..52454f29 100644 --- a/ww/tunnel.c +++ b/ww/tunnel.c @@ -1,8 +1,8 @@ #include "tunnel.h" #include "buffer_pool.h" #include "string.h" // memset -#include #include +#include #include extern line_t *newLine(uint8_t tid); @@ -20,6 +20,7 @@ extern context_t *newContext(line_t *line); extern context_t *newContextFrom(context_t *source); extern context_t *newEstContext(line_t *line); extern context_t *newFinContext(line_t *line); +extern context_t *newFinContextFrom(context_t *source); extern context_t *newInitContext(line_t *line); extern context_t *switchLine(context_t *c, line_t *line); extern buffer_pool_t *getThreadBufferPool(uint8_t tid); diff --git a/ww/utils/utils.c b/ww/utils/utils.c index 98e4cad6..9805d3a9 100644 --- a/ww/utils/utils.c +++ b/ww/utils/utils.c @@ -311,18 +311,18 @@ void socketContextDomainSetConstMem(socket_context_t *restrict scontext, const c assert(scontext->domain[len] == 0x0); } -hash_t sockAddrCalcHash(const sockaddr_u *scontext) +hash_t sockAddrCalcHash(const sockaddr_u *saddr) { // paddings are 0 - if (scontext->sa.sa_family == AF_INET) + if (saddr->sa.sa_family == AF_INET) { - return CALC_HASH_BYTES(&(scontext->sin), sizeof(struct sockaddr_in)); + return CALC_HASH_BYTES(&(saddr->sin.sin_port), sizeof(struct sockaddr_in) + sizeof(in_port_t)); } - if (scontext->sa.sa_family == AF_INET6) + if (saddr->sa.sa_family == AF_INET6) { - return CALC_HASH_BYTES(&(scontext->sin6), sizeof(struct sockaddr_in6)); + return CALC_HASH_BYTES(&(saddr->sin6.sin6_port), sizeof(struct sockaddr_in6) + sizeof(in_port_t)); } - return CALC_HASH_BYTES(&(scontext->sa), (sockaddr_len((sockaddr_u *) scontext))); + return CALC_HASH_BYTES(&(saddr->sa), (sockaddr_len((sockaddr_u *) saddr))); } enum socket_address_type getHostAddrType(char *host) @@ -369,10 +369,6 @@ struct user_s *parseUserFromJsonObject(const cJSON *user_json) return user; } -bool getPortFromJsonObject(uint16_t *dest_pmin, const cJSON *json_obj, uint16_t *dest_pmax, const char *key) -{ -} - bool verifyIpCdir(const char *ipc, struct logger_s *logger) { unsigned int ipc_length = strlen(ipc);