From 2ccb23e06a7e9b35463f55fd9c31f44c04eb48f3 Mon Sep 17 00:00:00 2001 From: Radkesvat <134321679+radkesvat@users.noreply.github.com> Date: Tue, 4 Jun 2024 19:40:16 +0000 Subject: [PATCH] [WIP] 80% pipe line --- ww/pipe_line.c | 95 +++++++++++++++++++++++++++++++++++++------------- ww/pipe_line.h | 4 +-- 2 files changed, 72 insertions(+), 27 deletions(-) diff --git a/ww/pipe_line.c b/ww/pipe_line.c index 95dd2d0a..aadd5b48 100644 --- a/ww/pipe_line.c +++ b/ww/pipe_line.c @@ -1,9 +1,13 @@ #include "pipe_line.h" +#include "buffer_pool.h" #include "hmutex.h" #include "hplatform.h" +#include "loggers/network_logger.h" +#include "shiftbuffer.h" #include "tunnel.h" #include #include +#include struct msg_event { @@ -14,10 +18,34 @@ struct msg_event typedef void (*MsgTargetFunction)(pipe_line_t *pl, void *arg); +static void lock(pipe_line_t *pl) +{ + int old_refc = atomic_fetch_add_explicit(&pl->refc, 1, memory_order_relaxed); + if (old_refc == 0) + { + LOGF("PipeLine: thread-safety done incorrectly lock()"); + exit(1); + } + (void) old_refc; +} +static void unlock(pipe_line_t *pl) +{ + int old_refc = atomic_fetch_add_explicit(&pl->refc, -1, memory_order_relaxed); + if (old_refc == 1) + { + if (! atomic_load_explicit(&(pl->closed), memory_order_relaxed)) + { + LOGF("PipeLine: thread-safety done incorrectly unlock()"); + exit(1); + } + free(pl); + } +} static void onMsgReceived(hevent_t *ev) { struct msg_event *msg_ev = hevent_userdata(ev); (*(MsgTargetFunction *) (&(msg_ev->function)))(msg_ev->pl, msg_ev->arg); + unlock(msg_ev->pl); free(msg_ev); } @@ -28,7 +56,7 @@ static void sendMessage(pipe_line_t *pl, MsgTargetFunction fn, void *arg, uint8_ fn(pl, arg); return; } - + lock(pl); struct msg_event *evdata = malloc(sizeof(struct msg_event)); *evdata = (struct msg_event){.pl = pl, .function = *(void **) (&fn), .arg = arg}; @@ -39,17 +67,30 @@ static void sendMessage(pipe_line_t *pl, MsgTargetFunction fn, void *arg, uint8_ hevent_set_userdata(&ev, evdata); hloop_post_event(loops[tid_to], &ev); } +void writeBufferToRightSide(pipe_line_t *pl, void *arg) +{ + shift_buffer_t *buf = arg; + if (pl->right_line == NULL) + { + reuseBuffer(buffer_pools[pl->right_tid], buf); + return; + } + context_t *ctx = newContext(pl->left_line); + ctx->payload = buf; + pl->local_up_stream(pl->self, ctx); +} void finishLeftSide(pipe_line_t *pl, void *arg) { (void) arg; + if (pl->left_line == NULL) { return; } context_t *fctx = newFinContext(pl->left_line); - // doneLineDownSide(pl->left_line); - // destroyLine(pl->left_line); + doneLineUpSide(pl->left_line); + destroyLine(pl->left_line); pl->left_line = NULL; pl->local_down_stream(pl->self, fctx); } @@ -62,8 +103,8 @@ void finishRightSide(pipe_line_t *pl, void *arg) return; } context_t *fctx = newFinContext(pl->right_line); - // doneLineUpSide(pl->right_line); - // destroyLine(pl->right_line); + doneLineDownSide(pl->right_line); + destroyLine(pl->right_line); pl->right_line = NULL; pl->local_up_stream(pl->self, fctx); } @@ -137,7 +178,6 @@ bool writePipeLineLTR(pipe_line_t *pl, context_t *c) // other flags are not supposed to come to pipe line assert(c->fin || c->payload != NULL); - atomic_compare_exchange_strong_explicit if (atomic_load_explicit(&pl->closed, memory_order_relaxed)) { return false; @@ -145,32 +185,37 @@ bool writePipeLineLTR(pipe_line_t *pl, context_t *c) if (c->fin) { - atomic_store_explicit(&pl->closed, true, memory_order_relaxed); + assert(pl->left_line); + doneLineUpSide(pl->left_line); destroyLine(pl->left_line); pl->left_line = NULL; - destroyLine(line_t *l) - } - else - { - assert(c->payload != NULL); + bool expected = false; + + if (atomic_compare_exchange_strong_explicit(&(pl->closed), &expected, true, memory_order_relaxed, + memory_order_relaxed)) + { + // we managed to close the channel + destroyContext(c); + sendMessage(pl, finishRightSide, NULL, pl->left_tid, pl->right_tid); + unlock(pl); + return true; + } + // other line managed to close first and also queued us the fin packet + return false; } - if (pl->direct_mode) - { - // c = switchLine(c, pl->right_line); - - pl->self->up->upStream(pl->self->up, c); - - return true; - } + assert(c->payload != NULL); + sendMessage(pl, writeBufferToRightSide, c->payload, pl->left_tid, pl->right_tid); + c->payload = NULL; + destroyContext(c); return true; } -bool writePipeLineRTL(pipe_line_t *p, context_t *c) -{ -} +// bool writePipeLineRTL(pipe_line_t *p, context_t *c) +// { +// } -pipe_line_t *newPipeLine(uint8_t tid_left, uint8_t tid_right); -void freePipeLine(pipe_line_t *p); +// pipe_line_t *newPipeLine(uint8_t tid_left, uint8_t tid_right); +// void freePipeLine(pipe_line_t *p); diff --git a/ww/pipe_line.h b/ww/pipe_line.h index 305ff135..d31e8f90 100644 --- a/ww/pipe_line.h +++ b/ww/pipe_line.h @@ -2,6 +2,7 @@ #include "tunnel.h" #include "ww.h" +#include /* @@ -15,8 +16,8 @@ struct pipe_line_s { - bool direct_mode; atomic_bool closed; + atomic_int refc; // thread local: tunnel_t *self; @@ -34,5 +35,4 @@ typedef struct pipe_line_s pipe_line_t; bool writePipeLineLTR(pipe_line_t *p, context_t *c); bool writePipeLineRTL(pipe_line_t *p, context_t *c); -void freePipeLine(pipe_line_t *p); pipe_line_t *newPipeLine(uint8_t tid_left, tunnel_t *self, uint8_t tid_right);