Skip to content

Commit

Permalink
[WIP] 90% half duplex server
Browse files Browse the repository at this point in the history
  • Loading branch information
radkesvat committed Jun 7, 2024
1 parent 6cb0d5a commit f61d1d0
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 29 deletions.
147 changes: 120 additions & 27 deletions tunnels/server/halfduplex/halfduplex_server.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "halfduplex_server.h"
#include "basic_types.h"
#include "buffer_pool.h"
#include "hbase.h"
#include "hlog.h"
#include "hmutex.h"
Expand Down Expand Up @@ -70,8 +71,81 @@ static void localDownStream(tunnel_t *self, context_t *c, pipe_line_t *pl)
{
}

static void notifyDownloadLineIsReadyForBind(tunnel_t *self, context_t *c, pipe_line_t *pl)
/*
 * Runs on the upload line's worker thread (posted via
 * callNotifyDownloadLineIsReadyForBind) once a download connection with the
 * same `hash` has been registered. Pairs the two half-duplex sides: removes
 * both map entries and switches the stored download-line state to direct mode.
 *
 * NOTE(review): WIP — `c` and `buf` below are leftovers from the previous
 * signature (tunnel_t*, context_t*, pipe_line_t*) and are not defined in this
 * scope; the context/payload hand-off still needs to be reworked.
 */
static void notifyDownloadLineIsReadyForBind(tunnel_t *self, hash_t hash)
{
    halfduplex_server_state_t *state = STATE(self);

    hhybridmutex_lock(&(state->download_line_map_mutex));

    hmap_cons_t_iter f_iter = hmap_cons_t_find(&(state->download_line_map), hash);
    // BUGFIX: the entry exists when find() does NOT return end() (was `==`)
    bool found = f_iter.ref != hmap_cons_t_end(&(state->download_line_map)).ref;

    if (found)
    {
        // download pair is found
        // BUGFIX: read the mapped value BEFORE erasing — the iterator is
        // invalidated (use-after-free) once erase_at() has run
        uint8_t tid_download_line = (*f_iter.ref).second->download_line->tid;
        (void) tid_download_line; // TODO(review): not used yet in this WIP
        hmap_cons_t_erase_at(&(state->download_line_map), f_iter);

        hhybridmutex_lock(&(state->upload_line_map_mutex));

        f_iter = hmap_cons_t_find(&(state->upload_line_map), hash);
        // BUGFIX: same inverted end() comparison as above
        found = f_iter.ref != hmap_cons_t_end(&(state->upload_line_map)).ref;

        if (found)
        {
            // upload pair is found; capture everything we need, then erase
            halfduplex_server_con_state_t *download_line_cstate =
                (halfduplex_server_con_state_t *) ((*f_iter.ref).second);
            uint8_t tid_upload_line = download_line_cstate->upload_line->tid;
            (void) tid_upload_line; // TODO(review): not used yet in this WIP
            line_t *download_line = download_line_cstate->download_line;
            hmap_cons_t_erase_at(&(state->upload_line_map), f_iter);

            assert(download_line_cstate->state == kCsDownloadInTable);

            download_line_cstate->state       = kCsDownloadDirect;
            download_line_cstate->upload_line = c->line; // NOTE(review): `c` is undefined here — WIP

            lockLine(download_line);
            context_t *i_ctx = newInitContext(download_line);
            self->up->upStream(self->up, i_ctx);

            if (! isAlive(download_line))
            {
                unLockLine(download_line);
                reuseContextBuffer(c);
                destroyContext(c);
                // BUGFIX: must release both maps before returning, otherwise
                // every later lookup deadlocks on the held mutexes
                hhybridmutex_unlock(&(state->upload_line_map_mutex));
                hhybridmutex_unlock(&(state->download_line_map_mutex));
                return;
            }

            unLockLine(download_line);

            if (bufLen(buf) > 0) // NOTE(review): `buf` is undefined here — WIP
            {
                self->up->upStream(self->up, switchLine(c, download_line));
            }
            else
            {
                reuseContextBuffer(c);
                destroyContext(c);
            }
        }

        hhybridmutex_unlock(&(state->upload_line_map_mutex));
    }
    else
    {
        // the connection closed before the upload side could bind; nothing to do
    }
    hhybridmutex_unlock(&(state->download_line_map_mutex));
}

/*
 * Event-loop trampoline: unpacks the heap-allocated notify arguments that
 * were posted from another thread, releases the argument struct, and hands
 * the (self, hash) pair to notifyDownloadLineIsReadyForBind.
 */
static void callNotifyDownloadLineIsReadyForBind(hevent_t *ev)
{
    struct notify_argument_s *args = hevent_userdata(ev);
    tunnel_t                 *self = args->self;
    hash_t                    hash = args->hash;
    free(args);
    notifyDownloadLineIsReadyForBind(self, hash);
}

static void upStream(tunnel_t *self, context_t *c)
Expand Down Expand Up @@ -155,8 +229,9 @@ static void upStream(tunnel_t *self, context_t *c)
hhybridmutex_unlock(&(state->download_line_map_mutex));
cstate->state = kCsUploadPiped;

newPipeLine(&cstate->pipe, self, c->line->tid, c->line, tid_right, cstate,
TunnelFlowRoutine local_up_stream, TunnelFlowRoutine local_down_stream);
newPipeLine(&cstate->pipe, self, c->line->tid, c->line, tid_download_line, localUpStream,
localDownStream);

shiftl(buf, sizeof(uint64_t));
writePipeLineLTR(cstate->pipe, c);
}
Expand Down Expand Up @@ -239,6 +314,15 @@ static void upStream(tunnel_t *self, context_t *c)
else
{
// tell upload line to re-check
struct notify_argument_s *evdata = malloc(sizeof(struct notify_argument_s));
*evdata = (struct notify_argument_s){.self = self, .hash = hash};

hevent_t ev;
memset(&ev, 0, sizeof(ev));
ev.loop = loops[tid_upload_line];
ev.cb = callNotifyDownloadLineIsReadyForBind;
hevent_set_userdata(&ev, evdata);
hloop_post_event(loops[tid_upload_line], &ev);
}
}
else
Expand Down Expand Up @@ -266,6 +350,36 @@ static void upStream(tunnel_t *self, context_t *c)

break;
}
break;

case kCsUploadInTable:
if (cstate->buffering)
{
cstate->buffering = appendBufferMerge(getContextBufferPool(c), cstate->buffering, c->payload);
}
else
{
cstate->buffering = c->payload;
}
c->payload = NULL;
destroyContext(c);
break;

case kCsUploadPiped:
if (! writePipeLineLTR(cstate->pipe, c))
{
destroyContext(c);
}
break;

case kCsUploadDirect:
self->up->upStream(self->up, c);
break;

case kCsDownloadInTable:
reuseContextBuffer(c);
destroyContext(c);
break;
}
}
else
Expand All @@ -282,7 +396,7 @@ static void upStream(tunnel_t *self, context_t *c)
}
else if (c->fin)
{
switch (cstate->cs)
switch (cstate->state)
{

case kCsUnkown:
Expand Down Expand Up @@ -334,27 +448,6 @@ static void upStream(tunnel_t *self, context_t *c)
}
break;

case kCsDownloadInTable: {
hhybridmutex_lock(&(state->download_line_map_mutex));

hmap_cons_t_iter f_iter = hmap_cons_t_find(&(state->download_line_map), cstate->hash);
bool found = f_iter.ref == hmap_cons_t_end(&(state->download_line_map)).ref;
if (! found)
{
LOGF("HalfDuplexServer: Thread safety is done incorrectly [%s:%d:%s]", __FILENAME__, __LINE__,
__FUNCTION__);
exit(1);
}
hmap_cons_t_erase_at(&(state->upload_line_map), f_iter);

hhybridmutex_unlock(&(state->download_line_map_mutex));

free(cstate);
CSTATE_MUT(c) = NULL;
destroyContext(c);
}
break;

case kCsDownloadDirect: {
halfduplex_server_con_state_t *cstate_upload = LSTATE(cstate->upload_line);
halfduplex_server_con_state_t *cstate_download = LSTATE(cstate->download_line);
Expand All @@ -363,7 +456,7 @@ static void upStream(tunnel_t *self, context_t *c)
if (cstate_upload->pipe)
{
context_t *fctx = newFinContext(cstate_upload->upload_line);

if (! writePipeLineRTL(cstate_upload->pipe, fctx))
{
destroyContext(fctx);
Expand Down Expand Up @@ -398,12 +491,12 @@ static void upStream(tunnel_t *self, context_t *c)
case kCsUploadPiped: {
halfduplex_server_con_state_t *cstate_upload = LSTATE(cstate->upload_line);
CSTATE_MUT(c) = NULL;
free(cstate_upload);

if (! writePipeLineRTL(cstate_upload->pipe, c))
{
destroyContext(c);
}
free(cstate_upload);
}
break;
}
Expand Down
3 changes: 1 addition & 2 deletions ww/pipe_line.c
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ void initLeft(pipe_line_t *pl, void *arg)
}

void newPipeLine(pipe_line_t **result, tunnel_t *self, uint8_t tid_left, line_t *left_line, uint8_t tid_right,
PipeLineFlowRoutine local_up_stream, PipeLineFlowRoutine local_down_stream);
PipeLineFlowRoutine local_up_stream, PipeLineFlowRoutine local_down_stream)

{
assert(*result == NULL);
Expand All @@ -304,5 +304,4 @@ void newPipeLine(pipe_line_t **result, tunnel_t *self, uint8_t tid_left, line_t

initLeft(pl, NULL);
sendMessage(pl, initRight, NULL, pl->left_tid, pl->right_tid);
return pl;
}
1 change: 1 addition & 0 deletions ww/pipe_line.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
the pipe does not require a free call; it is automatically freed after both lines are finished
sending a fin context to the pipe silences it: it will not send anything back, and even the queued items
will be destroyed on the target thread
avoid using this if possible; currently it is only used by the half-duplex server, since there was no other way...
Expand Down

3 comments on commit f61d1d0

@Saleh-Mumtaz
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

درود مهندس،
در ایشیو ها خوندم که فکر و ذهنتون درگیر اپدیت نهایی پروژه هست و وقت ندارید، یک موردی رو میخواستم مطرح کنم،اینکه من به گیتهاب یک برنامه ای مثل v2rayn نگاه میکنم ایشیو ها همه بسته شدن، و ایشیو های باز بسیار اندک هستند.
برای این پروژه هم خودتون اگر بشه ایشیو های قدیمی و نامربوط رو ببندین که برای نسخه بعدی استفاده کنندگان ایشیو بزنن بسیار خوب میشه.

@radkesvat
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

درسته ؛ نسخه بعد که بیاد کلا cleanup میکنم هرچی تا الان هست ؛ بهتون دسترسی میدم اگه وقت داشتید ایشیو هایی که الان هست رو مدیریت کنید و اگه تموم شدن ببندینشون 🙏

@Saleh-Mumtaz
Copy link
Collaborator

@Saleh-Mumtaz Saleh-Mumtaz commented on f61d1d0 Jun 8, 2024 via email

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.