diff --git a/tunnels/server/halfduplex/halfduplex_server.c b/tunnels/server/halfduplex/halfduplex_server.c index 47d07792..170fc56e 100644 --- a/tunnels/server/halfduplex/halfduplex_server.c +++ b/tunnels/server/halfduplex/halfduplex_server.c @@ -1,5 +1,6 @@ #include "halfduplex_server.h" #include "basic_types.h" +#include "buffer_pool.h" #include "hbase.h" #include "hlog.h" #include "hmutex.h" @@ -70,8 +71,81 @@ static void localDownStream(tunnel_t *self, context_t *c, pipe_line_t *pl) { } -static void notifyDownloadLineIsReadyForBind(tunnel_t *self, context_t *c, pipe_line_t *pl) +static void notifyDownloadLineIsReadyForBind(tunnel_t *self, hash_t hash) { + halfduplex_server_state_t *state = STATE(self); + + hhybridmutex_lock(&(state->download_line_map_mutex)); + + hmap_cons_t_iter f_iter = hmap_cons_t_find(&(state->download_line_map), hash); + bool found = f_iter.ref == hmap_cons_t_end(&(state->download_line_map)).ref; + + if (found) + { + // download pair is found + hmap_cons_t_erase_at(&(state->download_line_map), f_iter); + uint8_t tid_download_line = (*f_iter.ref).second->download_line->tid; + + hhybridmutex_lock(&(state->upload_line_map_mutex)); + + f_iter = hmap_cons_t_find(&(state->upload_line_map), hash); + found = f_iter.ref == hmap_cons_t_end(&(state->upload_line_map)).ref; + + if (found) + { + // upload pair is found + hmap_cons_t_erase_at(&(state->upload_line_map), f_iter); + uint8_t tid_upload_line = (*f_iter.ref).second->upload_line->tid; + + line_t *download_line = ((halfduplex_server_con_state_t *) ((*f_iter.ref).second))->download_line; + + halfduplex_server_con_state_t *download_line_cstate = + ((halfduplex_server_con_state_t *) ((*f_iter.ref).second)); + + assert(download_line_cstate->state == kCsDownloadInTable); + + download_line_cstate->state = kCsDownloadDirect; + download_line_cstate->upload_line = c->line; + + lockLine(download_line); + context_t *i_ctx = newInitContext(download_line); + self->up->upStream(self->up, i_ctx); + + if (! isAlive(download_line)) + { + unLockLine(download_line); + reuseContextBuffer(c); + destroyContext(c); + return; + } + + unLockLine(download_line); + + if (bufLen(buf) > 0) + { + self->up->upStream(self->up, switchLine(c, download_line)); + } + else + { + reuseContextBuffer(c); + destroyContext(c); + } + } + + hhybridmutex_unlock(&(state->upload_line_map_mutex)); + } + else + { + // the connection just closed + } + hhybridmutex_unlock(&(state->download_line_map_mutex)); +} + +static void callNotifyDownloadLineIsReadyForBind(hevent_t *ev) +{ + struct notify_argument_s *nag = hevent_userdata(ev); + notifyDownloadLineIsReadyForBind(nag->self, nag->hash); + free(nag); } static void upStream(tunnel_t *self, context_t *c) @@ -155,8 +229,9 @@ static void upStream(tunnel_t *self, context_t *c) hhybridmutex_unlock(&(state->download_line_map_mutex)); cstate->state = kCsUploadPiped; - newPipeLine(&cstate->pipe, self, c->line->tid, c->line, tid_right, cstate, - TunnelFlowRoutine local_up_stream, TunnelFlowRoutine local_down_stream); + newPipeLine(&cstate->pipe, self, c->line->tid, c->line, tid_download_line, localUpStream, + localDownStream); + shiftl(buf, sizeof(uint64_t)); writePipeLineLTR(cstate->pipe, c); } @@ -239,6 +314,15 @@ static void upStream(tunnel_t *self, context_t *c) else { // tell upload line to re-check + struct notify_argument_s *evdata = malloc(sizeof(struct notify_argument_s)); + *evdata = (struct notify_argument_s){.self = self, .hash = hash}; + + hevent_t ev; + memset(&ev, 0, sizeof(ev)); + ev.loop = loops[tid_upload_line]; + ev.cb = callNotifyDownloadLineIsReadyForBind; + hevent_set_userdata(&ev, evdata); + hloop_post_event(loops[tid_upload_line], &ev); } } else @@ -266,6 +350,36 @@ static void upStream(tunnel_t *self, context_t *c) break; } + break; + + case kCsUploadInTable: + if (cstate->buffering) + { + cstate->buffering = appendBufferMerge(getContextBufferPool(c), cstate->buffering, c->payload); + } + else + { + cstate->buffering = c->payload; + } + c->payload = NULL; + destroyContext(c); + break; + + case kCsUploadPiped: + if (! writePipeLineLTR(cstate->pipe, c)) + { + destroyContext(c); + } + break; + + case kCsUploadDirect: + self->up->upStream(self->up, c); + break; + + case kCsDownloadInTable: + reuseContextBuffer(c); + destroyContext(c); + break; } } else @@ -282,7 +396,7 @@ static void upStream(tunnel_t *self, context_t *c) } else if (c->fin) { - switch (cstate->cs) + switch (cstate->state) { case kCsUnkown: @@ -334,27 +448,6 @@ static void upStream(tunnel_t *self, context_t *c) } break; - case kCsDownloadInTable: { - hhybridmutex_lock(&(state->download_line_map_mutex)); - - hmap_cons_t_iter f_iter = hmap_cons_t_find(&(state->download_line_map), cstate->hash); - bool found = f_iter.ref == hmap_cons_t_end(&(state->download_line_map)).ref; - if (! found) - { - LOGF("HalfDuplexServer: Thread safety is done incorrectly [%s:%d:%s]", __FILENAME__, __LINE__, - __FUNCTION__); - exit(1); - } - hmap_cons_t_erase_at(&(state->upload_line_map), f_iter); - - hhybridmutex_unlock(&(state->download_line_map_mutex)); - - free(cstate); - CSTATE_MUT(c) = NULL; - destroyContext(c); - } - break; - case kCsDownloadDirect: { halfduplex_server_con_state_t *cstate_upload = LSTATE(cstate->upload_line); halfduplex_server_con_state_t *cstate_download = LSTATE(cstate->download_line); @@ -363,7 +456,7 @@ static void upStream(tunnel_t *self, context_t *c) if (cstate_upload->pipe) { context_t *fctx = newFinContext(cstate_upload->upload_line); - + if (! writePipeLineRTL(cstate_upload->pipe, fctx)) { destroyContext(fctx); @@ -398,12 +491,12 @@ static void upStream(tunnel_t *self, context_t *c) case kCsUploadPiped: { halfduplex_server_con_state_t *cstate_upload = LSTATE(cstate->upload_line); CSTATE_MUT(c) = NULL; - free(cstate_upload); if (! writePipeLineRTL(cstate_upload->pipe, c)) { destroyContext(c); } + free(cstate_upload); } break; } diff --git a/ww/pipe_line.c b/ww/pipe_line.c index 9a601a15..27bc9385 100644 --- a/ww/pipe_line.c +++ b/ww/pipe_line.c @@ -285,7 +285,7 @@ void initLeft(pipe_line_t *pl, void *arg) } void newPipeLine(pipe_line_t **result, tunnel_t *self, uint8_t tid_left, line_t *left_line, uint8_t tid_right, - PipeLineFlowRoutine local_up_stream, PipeLineFlowRoutine local_down_stream); + PipeLineFlowRoutine local_up_stream, PipeLineFlowRoutine local_down_stream) { assert(*result == NULL); @@ -304,5 +304,4 @@ void newPipeLine(pipe_line_t **result, tunnel_t *self, uint8_t tid_left, line_t initLeft(pl, NULL); sendMessage(pl, initRight, NULL, pl->left_tid, pl->right_tid); - return pl; } diff --git a/ww/pipe_line.h b/ww/pipe_line.h index 071111e1..76a1f611 100644 --- a/ww/pipe_line.h +++ b/ww/pipe_line.h @@ -12,6 +12,7 @@ pipe dose not require free call, it automatically gets freed after both lines are finished sending a fin context to pipe will make it quiet, it will not send anything back even the queued items + will be destroyed on the target thread i hope you don't use it, currently only used for halfduplex server since there were no other way...