Skip to content

Commit

Permalink
[WIP] 80% pipe line
Browse files Browse the repository at this point in the history
  • Loading branch information
radkesvat committed Jun 4, 2024
1 parent 34879e7 commit 2ccb23e
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 27 deletions.
95 changes: 70 additions & 25 deletions ww/pipe_line.c
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
#include "pipe_line.h"
#include "buffer_pool.h"
#include "hmutex.h"
#include "hplatform.h"
#include "loggers/network_logger.h"
#include "shiftbuffer.h"
#include "tunnel.h"
#include <stdatomic.h>
#include <stdint.h>
#include <string.h>

struct msg_event
{
Expand All @@ -14,10 +18,34 @@ struct msg_event

typedef void (*MsgTargetFunction)(pipe_line_t *pl, void *arg);

static void lock(pipe_line_t *pl)
{
int old_refc = atomic_fetch_add_explicit(&pl->refc, 1, memory_order_relaxed);
if (old_refc == 0)
{
LOGF("PipeLine: thread-safety done incorrectly lock()");
exit(1);
}
(void) old_refc;
}
static void unlock(pipe_line_t *pl)
{
int old_refc = atomic_fetch_add_explicit(&pl->refc, -1, memory_order_relaxed);
if (old_refc == 1)
{
if (! atomic_load_explicit(&(pl->closed), memory_order_relaxed))
{
LOGF("PipeLine: thread-safety done incorrectly unlock()");
exit(1);
}
free(pl);
}
}
static void onMsgReceived(hevent_t *ev)
{
struct msg_event *msg_ev = hevent_userdata(ev);
(*(MsgTargetFunction *) (&(msg_ev->function)))(msg_ev->pl, msg_ev->arg);
unlock(msg_ev->pl);
free(msg_ev);
}

Expand All @@ -28,7 +56,7 @@ static void sendMessage(pipe_line_t *pl, MsgTargetFunction fn, void *arg, uint8_
fn(pl, arg);
return;
}

lock(pl);
struct msg_event *evdata = malloc(sizeof(struct msg_event));
*evdata = (struct msg_event){.pl = pl, .function = *(void **) (&fn), .arg = arg};

Expand All @@ -39,17 +67,30 @@ static void sendMessage(pipe_line_t *pl, MsgTargetFunction fn, void *arg, uint8_
hevent_set_userdata(&ev, evdata);
hloop_post_event(loops[tid_to], &ev);
}
void writeBufferToRightSide(pipe_line_t *pl, void *arg)
{
shift_buffer_t *buf = arg;
if (pl->right_line == NULL)
{
reuseBuffer(buffer_pools[pl->right_tid], buf);
return;
}
context_t *ctx = newContext(pl->left_line);
ctx->payload = buf;
pl->local_up_stream(pl->self, ctx);
}

void finishLeftSide(pipe_line_t *pl, void *arg)
{
(void) arg;

if (pl->left_line == NULL)
{
return;
}
context_t *fctx = newFinContext(pl->left_line);
// doneLineDownSide(pl->left_line);
// destroyLine(pl->left_line);
doneLineUpSide(pl->left_line);
destroyLine(pl->left_line);
pl->left_line = NULL;
pl->local_down_stream(pl->self, fctx);
}
Expand All @@ -62,8 +103,8 @@ void finishRightSide(pipe_line_t *pl, void *arg)
return;
}
context_t *fctx = newFinContext(pl->right_line);
// doneLineUpSide(pl->right_line);
// destroyLine(pl->right_line);
doneLineDownSide(pl->right_line);
destroyLine(pl->right_line);
pl->right_line = NULL;
pl->local_up_stream(pl->self, fctx);
}
Expand Down Expand Up @@ -137,40 +178,44 @@ bool writePipeLineLTR(pipe_line_t *pl, context_t *c)
// other flags are not supposed to come to pipe line
assert(c->fin || c->payload != NULL);

atomic_compare_exchange_strong_explicit
if (atomic_load_explicit(&pl->closed, memory_order_relaxed))
{
return false;
}

if (c->fin)
{
atomic_store_explicit(&pl->closed, true, memory_order_relaxed);
assert(pl->left_line);
doneLineUpSide(pl->left_line);
destroyLine(pl->left_line);
pl->left_line = NULL;
destroyLine(line_t *l)
}
else
{
assert(c->payload != NULL);

bool expected = false;

if (atomic_compare_exchange_strong_explicit(&(pl->closed), &expected, true, memory_order_relaxed,
memory_order_relaxed))
{
// we managed to close the channel
destroyContext(c);
sendMessage(pl, finishRightSide, NULL, pl->left_tid, pl->right_tid);
unlock(pl);
return true;
}
// other line managed to close first and also queued us the fin packet
return false;
}

if (pl->direct_mode)
{
// c = switchLine(c, pl->right_line);

pl->self->up->upStream(pl->self->up, c);

return true;
}
assert(c->payload != NULL);
sendMessage(pl, writeBufferToRightSide, c->payload, pl->left_tid, pl->right_tid);
c->payload = NULL;
destroyContext(c);

return true;
}

bool writePipeLineRTL(pipe_line_t *p, context_t *c)
{
}
// bool writePipeLineRTL(pipe_line_t *p, context_t *c)
// {
// }

pipe_line_t *newPipeLine(uint8_t tid_left, uint8_t tid_right);
void freePipeLine(pipe_line_t *p);
// pipe_line_t *newPipeLine(uint8_t tid_left, uint8_t tid_right);
// void freePipeLine(pipe_line_t *p);
4 changes: 2 additions & 2 deletions ww/pipe_line.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "tunnel.h"
#include "ww.h"
#include <stdatomic.h>

/*
Expand All @@ -15,8 +16,8 @@

struct pipe_line_s
{
bool direct_mode;
atomic_bool closed;
atomic_int refc;

// thread local:
tunnel_t *self;
Expand All @@ -34,5 +35,4 @@ typedef struct pipe_line_s pipe_line_t;

bool writePipeLineLTR(pipe_line_t *p, context_t *c);
bool writePipeLineRTL(pipe_line_t *p, context_t *c);
void freePipeLine(pipe_line_t *p);
pipe_line_t *newPipeLine(uint8_t tid_left, tunnel_t *self, uint8_t tid_right);

0 comments on commit 2ccb23e

Please sign in to comment.