Skip to content

Commit

Permalink
rework and finish other parts of mux
Browse files Browse the repository at this point in the history
  • Loading branch information
radkesvat committed Jul 26, 2024
1 parent 10a7f68 commit 37c73dc
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 28 deletions.
51 changes: 34 additions & 17 deletions tunnels/client/mux/mux_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ typedef struct mux_client_child_con_state_s
uint32_t recv_nack;
uint16_t cid;
bool paused;
bool first_sent;

} mux_client_child_con_state_t;

Expand Down Expand Up @@ -293,31 +294,49 @@ static void upStream(tunnel_t *self, context_t *c)
switchLine(c, main_line);
mux_client_con_state_t *main_con = CSTATE(c);

lockLine(main_line);
lockLine(current_writing_line);
main_con->current_writing_line = current_writing_line;

while (bufLen(c->payload) > kMuxMaxFrameLength)
{
shift_buffer_t *chunk = popBuffer(getContextBufferPool(c));
sliceBufferTo(chunk, c->payload, kMuxMaxFrameLength);
makeDataFrame(chunk, child_con->cid);

if (! child_con->first_sent)
{
child_con->first_sent = true;
makeOpenFrame(chunk, child_con->cid);
}
else
{
makeDataFrame(chunk, child_con->cid);
}

context_t *data_chunk_ctx = newContextFrom(c);
data_chunk_ctx->payload = chunk;
self->up->upStream(self->up, data_chunk_ctx);

if (! isAlive(main_line))
{
unLockLine(main_line);
unLockLine(current_writing_line);

reuseContextPayload(c);
destroyContext(c);

return;
}
}

lockLine(main_line);
lockLine(current_writing_line);

makeDataFrame(c->payload, child_con->cid);
if (! child_con->first_sent)
{
child_con->first_sent = true;
makeOpenFrame(c->payload, child_con->cid);
}
else
{
makeDataFrame(c->payload, child_con->cid);
}

self->up->upStream(self->up, c);

Expand All @@ -337,16 +356,6 @@ static void upStream(tunnel_t *self, context_t *c)
child_con = createChildConnection(main_con, c->line);
CSTATE_MUT(c) = child_con;
self->dw->downStream(self->dw, newEstContext(c->line));
if (! isAlive(c->line))
{
destroyContext(c);
return;
}

context_t *data_init_ctx = newContext(main_con->line);
data_init_ctx->payload = popBuffer(getLineBufferPool(main_con->line));
makeOpenFrame(data_init_ctx->payload, child_con->cid);
self->up->upStream(self->up, data_init_ctx);

destroyContext(c);
}
Expand Down Expand Up @@ -384,6 +393,11 @@ static void downStream(tunnel_t *self, context_t *c)
{
mux_client_con_state_t *main_con = CSTATE(c);

if (WW_UNLIKELY(c->est))
{
destroyContext(c);
return;
}
if (c->fin)
{
destroyMainConnecton(main_con);
Expand All @@ -400,6 +414,7 @@ static void downStream(tunnel_t *self, context_t *c)
{
LOGE("MuxClient: payload length < kMuxMinFrameLength");
destroyMainConnecton(main_con);
self->up->upStream(self->up, newFinContext(c->line));
destroyContext(c);
return;
}
Expand Down Expand Up @@ -434,9 +449,10 @@ static void downStream(tunnel_t *self, context_t *c)

if (WW_UNLIKELY(bufLen(frame_payload) <= 0))
{
LOGE("MuxClient: payload length < 0");
LOGE("MuxClient: payload length <= 0");
reuseBuffer(getLineBufferPool(main_con->line), frame_payload);
destroyMainConnecton(main_con);
self->up->upStream(self->up, newFinContext(c->line));
destroyContext(c);
return;
}
Expand All @@ -455,6 +471,7 @@ static void downStream(tunnel_t *self, context_t *c)
LOGE("MuxClient: incorrect frame flag");
reuseBuffer(getLineBufferPool(main_con->line), frame_payload);
destroyMainConnecton(main_con);
self->up->upStream(self->up, newFinContext(c->line));
destroyContext(c);
return;
break;
Expand Down
76 changes: 68 additions & 8 deletions tunnels/server/mux/mux_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ static void upStream(tunnel_t *self, context_t *c)
{
LOGE("MuxServer: payload length < kMuxMinFrameLength");
destroyMainConnecton(main_con);
self->dw->downStream(self->dw, newFinContext(c->line));

destroyContext(c);
return;
}
Expand All @@ -190,9 +192,10 @@ static void upStream(tunnel_t *self, context_t *c)
{
if (WW_UNLIKELY(bufLen(frame_payload) <= 0))
{
LOGE("MuxServer: payload length < 0");
LOGE("MuxServer: payload length <= 0");
reuseBuffer(getLineBufferPool(main_con->line), frame_payload);
destroyMainConnecton(main_con);
self->dw->downStream(self->dw, newFinContext(c->line));
destroyContext(c);
continue;
}
Expand Down Expand Up @@ -244,9 +247,10 @@ static void upStream(tunnel_t *self, context_t *c)
case kMuxFlagData: {
if (WW_UNLIKELY(bufLen(frame_payload) <= 0))
{
LOGE("MuxServer: payload length < 0");
LOGE("MuxServer: payload length <= 0");
reuseBuffer(getLineBufferPool(main_con->line), frame_payload);
destroyMainConnecton(main_con);
self->dw->downStream(self->dw, newFinContext(c->line));
destroyContext(c);
return;
}
Expand All @@ -265,6 +269,7 @@ static void upStream(tunnel_t *self, context_t *c)
LOGE("MuxServer: incorrect frame flag");
reuseBuffer(getLineBufferPool(main_con->line), frame_payload);
destroyMainConnecton(main_con);
self->dw->downStream(self->dw, newFinContext(c->line));
destroyContext(c);
return;
break;
Expand Down Expand Up @@ -310,13 +315,69 @@ static void upStream(tunnel_t *self, context_t *c)

static void downStream(tunnel_t *self, context_t *c)
{
mux_server_con_state_t *main_con = CSTATE(c);
mux_server_child_con_state_t *child_con = CSTATE(c);

if (c->payload != NULL)
{
line_t *current_writing_line = c->line;
line_t *main_line = child_con->parent;
mux_server_con_state_t *main_con = LSTATE(main_line);

switchLine(c, main_line);

lockLine(main_line);
lockLine(current_writing_line);
main_con->current_writing_line = current_writing_line;

while (bufLen(c->payload) > kMuxMaxFrameLength)
{
shift_buffer_t *chunk = popBuffer(getContextBufferPool(c));
sliceBufferTo(chunk, c->payload, kMuxMaxFrameLength);
makeDataFrame(chunk, child_con->cid);

context_t *data_chunk_ctx = newContextFrom(c);
data_chunk_ctx->payload = chunk;
self->dw->downStream(self->dw, data_chunk_ctx);

if (! isAlive(main_line))
{
unLockLine(main_line);
unLockLine(current_writing_line);

reuseContextPayload(c);
destroyContext(c);
return;
}
}

makeDataFrame(c->payload, child_con->cid);

self->up->upStream(self->up, c);

if (c->fin)
if (isAlive(main_line))
{
main_con->current_writing_line = NULL;
}

unLockLine(main_line);
unLockLine(current_writing_line);
}
else
{
destroyMainConnecton(main_con);
destroyContext(c);
return;
if (c->fin)
{
context_t *data_fin_ctx = newContext(child_con->parent);
data_fin_ctx->payload = popBuffer(getLineBufferPool(child_con->parent));
makeCloseFrame(data_fin_ctx->payload, child_con->cid);
destroyChildConnecton(child_con);
self->dw->downStream(self->dw, data_fin_ctx);
return;
}
if (WW_UNLIKELY(c->est))
{
destroyContext(c);
return;
}
}
}

Expand All @@ -326,7 +387,6 @@ tunnel_t *newMuxServer(node_instance_context_t *instance_info)
mux_server_state_t *state = globalMalloc(sizeof(mux_server_state_t));
memset(state, 0, sizeof(mux_server_state_t));


tunnel_t *t = newTunnel();
t->state = state;
t->upStream = &upStream;
Expand Down
6 changes: 3 additions & 3 deletions tunnels/shared/mux/mux_frame.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ enum
kMuxFlagFlow = 2,
kMuxFlagData = 3,
kMuxMinFrameLength = (sizeof(mux_frame_t) - sizeof(mux_length_t)),
kMuxMaxFrameLength = (1U << (8*sizeof(mux_length_t))) - 4
kMuxMaxFrameLength = (1U << (8*sizeof(mux_length_t))) - (1+kMuxMinFrameLength)
};


Expand All @@ -32,7 +32,7 @@ enum
static void makeOpenFrame(shift_buffer_t *buf, cid_t cid)
{
shiftl(buf, sizeof(mux_frame_t));
mux_frame_t frame = {.length = sizeof(mux_frame_t) - sizeof(frame.length), .cid = cid, .flags = kMuxFlagOpen};
mux_frame_t frame = {.length = bufLen(buf) - sizeof(frame.length), .cid = cid, .flags = kMuxFlagOpen};
writeRaw(buf, &frame, sizeof(mux_frame_t));
}

Expand All @@ -46,6 +46,6 @@ static void makeCloseFrame(shift_buffer_t *buf, cid_t cid)
static void makeDataFrame(shift_buffer_t *buf, cid_t cid)
{
shiftl(buf, sizeof(mux_frame_t));
mux_frame_t frame = {.length = sizeof(mux_frame_t) - sizeof(frame.length), .cid = cid, .flags = kMuxFlagData};
mux_frame_t frame = {.length = bufLen(buf) - sizeof(frame.length), .cid = cid, .flags = kMuxFlagData};
writeRaw(buf, &frame, sizeof(mux_frame_t));
}

0 comments on commit 37c73dc

Please sign in to comment.