diff --git a/tunnels/client/mux/mux_client.c b/tunnels/client/mux/mux_client.c index 9ac7151..24a3928 100644 --- a/tunnels/client/mux/mux_client.c +++ b/tunnels/client/mux/mux_client.c @@ -43,6 +43,7 @@ typedef struct mux_client_child_con_state_s uint32_t recv_nack; uint16_t cid; bool paused; + bool first_sent; } mux_client_child_con_state_t; @@ -293,13 +294,24 @@ static void upStream(tunnel_t *self, context_t *c) switchLine(c, main_line); mux_client_con_state_t *main_con = CSTATE(c); + lockLine(main_line); + lockLine(current_writing_line); main_con->current_writing_line = current_writing_line; while (bufLen(c->payload) > kMuxMaxFrameLength) { shift_buffer_t *chunk = popBuffer(getContextBufferPool(c)); sliceBufferTo(chunk, c->payload, kMuxMaxFrameLength); - makeDataFrame(chunk, child_con->cid); + + if (! child_con->first_sent) + { + child_con->first_sent = true; + makeOpenFrame(chunk, child_con->cid); + } + else + { + makeDataFrame(chunk, child_con->cid); + } context_t *data_chunk_ctx = newContextFrom(c); data_chunk_ctx->payload = chunk; @@ -307,17 +319,24 @@ static void upStream(tunnel_t *self, context_t *c) if (! isAlive(main_line)) { + unLockLine(main_line); + unLockLine(current_writing_line); + reuseContextPayload(c); destroyContext(c); - return; } } - lockLine(main_line); - lockLine(current_writing_line); - - makeDataFrame(c->payload, child_con->cid); + if (! child_con->first_sent) + { + child_con->first_sent = true; + makeOpenFrame(c->payload, child_con->cid); + } + else + { + makeDataFrame(c->payload, child_con->cid); + } self->up->upStream(self->up, c); @@ -337,16 +356,6 @@ static void upStream(tunnel_t *self, context_t *c) child_con = createChildConnection(main_con, c->line); CSTATE_MUT(c) = child_con; self->dw->downStream(self->dw, newEstContext(c->line)); - if (! isAlive(c->line)) - { - destroyContext(c); - return; - } - - context_t *data_init_ctx = newContext(main_con->line); - data_init_ctx->payload = popBuffer(getLineBufferPool(main_con->line)); - makeOpenFrame(data_init_ctx->payload, child_con->cid); - self->up->upStream(self->up, data_init_ctx); destroyContext(c); } @@ -384,6 +393,11 @@ static void downStream(tunnel_t *self, context_t *c) { mux_client_con_state_t *main_con = CSTATE(c); + if (WW_UNLIKELY(c->est)) + { + destroyContext(c); + return; + } if (c->fin) { destroyMainConnecton(main_con); @@ -400,6 +414,7 @@ static void downStream(tunnel_t *self, context_t *c) { LOGE("MuxClient: payload length < kMuxMinFrameLength"); destroyMainConnecton(main_con); + self->up->upStream(self->up, newFinContext(c->line)); destroyContext(c); return; } @@ -434,9 +449,10 @@ static void downStream(tunnel_t *self, context_t *c) if (WW_UNLIKELY(bufLen(frame_payload) <= 0)) { - LOGE("MuxClient: payload length < 0"); + LOGE("MuxClient: payload length <= 0"); reuseBuffer(getLineBufferPool(main_con->line), frame_payload); destroyMainConnecton(main_con); + self->up->upStream(self->up, newFinContext(c->line)); destroyContext(c); return; } @@ -455,6 +471,7 @@ static void downStream(tunnel_t *self, context_t *c) LOGE("MuxClient: incorrect frame flag"); reuseBuffer(getLineBufferPool(main_con->line), frame_payload); destroyMainConnecton(main_con); + self->up->upStream(self->up, newFinContext(c->line)); destroyContext(c); return; break; diff --git a/tunnels/server/mux/mux_server.c b/tunnels/server/mux/mux_server.c index c84aefc..4257bee 100644 --- a/tunnels/server/mux/mux_server.c +++ b/tunnels/server/mux/mux_server.c @@ -174,6 +174,8 @@ static void upStream(tunnel_t *self, context_t *c) { LOGE("MuxServer: payload length < kMuxMinFrameLength"); destroyMainConnecton(main_con); + self->dw->downStream(self->dw, newFinContext(c->line)); + destroyContext(c); return; } @@ -190,9 +192,10 @@ static void upStream(tunnel_t *self, context_t *c) { if (WW_UNLIKELY(bufLen(frame_payload) <= 0)) { - LOGE("MuxServer: payload length < 0"); + LOGE("MuxServer: payload length <= 0"); reuseBuffer(getLineBufferPool(main_con->line), frame_payload); destroyMainConnecton(main_con); + self->dw->downStream(self->dw, newFinContext(c->line)); destroyContext(c); continue; } @@ -244,9 +247,10 @@ static void upStream(tunnel_t *self, context_t *c) case kMuxFlagData: { if (WW_UNLIKELY(bufLen(frame_payload) <= 0)) { - LOGE("MuxServer: payload length < 0"); + LOGE("MuxServer: payload length <= 0"); reuseBuffer(getLineBufferPool(main_con->line), frame_payload); destroyMainConnecton(main_con); + self->dw->downStream(self->dw, newFinContext(c->line)); destroyContext(c); return; } @@ -265,6 +269,7 @@ static void upStream(tunnel_t *self, context_t *c) LOGE("MuxServer: incorrect frame flag"); reuseBuffer(getLineBufferPool(main_con->line), frame_payload); destroyMainConnecton(main_con); + self->dw->downStream(self->dw, newFinContext(c->line)); destroyContext(c); return; break; @@ -310,13 +315,69 @@ static void upStream(tunnel_t *self, context_t *c) static void downStream(tunnel_t *self, context_t *c) { - mux_server_con_state_t *main_con = CSTATE(c); + mux_server_child_con_state_t *child_con = CSTATE(c); + + if (c->payload != NULL) + { + line_t *current_writing_line = c->line; + line_t *main_line = child_con->parent; + mux_server_con_state_t *main_con = LSTATE(main_line); + + switchLine(c, main_line); + + lockLine(main_line); + lockLine(current_writing_line); + main_con->current_writing_line = current_writing_line; + + while (bufLen(c->payload) > kMuxMaxFrameLength) + { + shift_buffer_t *chunk = popBuffer(getContextBufferPool(c)); + sliceBufferTo(chunk, c->payload, kMuxMaxFrameLength); + makeDataFrame(chunk, child_con->cid); + + context_t *data_chunk_ctx = newContextFrom(c); + data_chunk_ctx->payload = chunk; + self->dw->downStream(self->dw, data_chunk_ctx); + + if (! isAlive(main_line)) + { + unLockLine(main_line); + unLockLine(current_writing_line); + + reuseContextPayload(c); + destroyContext(c); + return; + } + } + + makeDataFrame(c->payload, child_con->cid); + + self->up->upStream(self->up, c); - if (c->fin) + if (isAlive(main_line)) + { + main_con->current_writing_line = NULL; + } + + unLockLine(main_line); + unLockLine(current_writing_line); + } + else { - destroyMainConnecton(main_con); - destroyContext(c); - return; + if (c->fin) + { + context_t *data_fin_ctx = newContext(child_con->parent); + data_fin_ctx->payload = popBuffer(getLineBufferPool(child_con->parent)); + makeCloseFrame(data_fin_ctx->payload, child_con->cid); + destroyChildConnecton(child_con); + self->dw->downStream(self->dw, data_fin_ctx); + return; + } + if (WW_UNLIKELY(c->est)) + { + destroyContext(c); + return; + } } } @@ -326,7 +387,6 @@ tunnel_t *newMuxServer(node_instance_context_t *instance_info) mux_server_state_t *state = globalMalloc(sizeof(mux_server_state_t)); memset(state, 0, sizeof(mux_server_state_t)); - tunnel_t *t = newTunnel(); t->state = state; t->upStream = &upStream; diff --git a/tunnels/shared/mux/mux_frame.h b/tunnels/shared/mux/mux_frame.h index b714207..3571edc 100644 --- a/tunnels/shared/mux/mux_frame.h +++ b/tunnels/shared/mux/mux_frame.h @@ -22,7 +22,7 @@ enum kMuxFlagFlow = 2, kMuxFlagData = 3, kMuxMinFrameLength = (sizeof(mux_frame_t) - sizeof(mux_length_t)), - kMuxMaxFrameLength = (1U << (8*sizeof(mux_length_t))) - 4 + kMuxMaxFrameLength = (1U << (8*sizeof(mux_length_t))) - (1+kMuxMinFrameLength) }; @@ -32,7 +32,7 @@ enum static void makeOpenFrame(shift_buffer_t *buf, cid_t cid) { shiftl(buf, sizeof(mux_frame_t)); - mux_frame_t frame = {.length = sizeof(mux_frame_t) - sizeof(frame.length), .cid = cid, .flags = kMuxFlagOpen}; + mux_frame_t frame = {.length = bufLen(buf) - sizeof(frame.length), .cid = cid, .flags = kMuxFlagOpen}; writeRaw(buf, &frame, sizeof(mux_frame_t)); } @@ -46,6 +46,6 @@ static void makeCloseFrame(shift_buffer_t *buf, cid_t cid) static void makeDataFrame(shift_buffer_t *buf, cid_t cid) { shiftl(buf, sizeof(mux_frame_t)); - mux_frame_t frame = {.length = sizeof(mux_frame_t) - sizeof(frame.length), .cid = cid, .flags = kMuxFlagData}; + mux_frame_t frame = {.length = bufLen(buf) - sizeof(frame.length), .cid = cid, .flags = kMuxFlagData}; writeRaw(buf, &frame, sizeof(mux_frame_t)); }