Skip to content

Commit

Permalink
rework queue
Browse files Browse the repository at this point in the history
  • Loading branch information
radkesvat committed Apr 6, 2024
1 parent 4cc5083 commit 6285460
Show file tree
Hide file tree
Showing 9 changed files with 217 additions and 240 deletions.
140 changes: 66 additions & 74 deletions tunnels/adapters/connector/tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,80 +4,75 @@

static void cleanup(connector_con_state_t *cstate)
{
if (cstate->io)
{
hevent_set_userdata(cstate->io, NULL);
}
hio_t *last_resumed_io = NULL;

if (cstate->current_w)
while (contextQueueLen(cstate->data_queue) > 0)
{
if (cstate->current_w->src_io != NULL &&
hio_exists(cstate->current_w->line->loop, cstate->current_w->fd) &&
!hio_is_closed(cstate->current_w->src_io))
context_t *cw = contextQueuePop(cstate->data_queue);
if (cw->src_io != NULL && last_resumed_io != cw->src_io)
{
last_resumed_io = cstate->current_w->src_io;
hio_read(cstate->current_w->src_io);
last_resumed_io = cw->src_io;
hio_read(cw->src_io);
}
if (cstate->current_w->payload)
if (cw->payload)
{
DISCARD_CONTEXT(cstate->current_w);
DISCARD_CONTEXT(cw);
}

destroyContext(cstate->current_w);
destroyContext(cw);
}

while (contextQueueLen(cstate->queue) > 0)
while (contextQueueLen(cstate->finished_queue) > 0)
{
context_t *cw = contextQueuePop(cstate->queue);
if (cw->src_io != NULL && !hio_is_closed(cw->src_io))
context_t *cw = contextQueuePop(cstate->finished_queue);
if (cw->src_io != NULL && last_resumed_io != cw->src_io)
{
if (last_resumed_io != cw->src_io)
{
last_resumed_io = cw->src_io;
hio_read(cw->src_io);
}
last_resumed_io = cw->src_io;
hio_read(cw->src_io);
}
if (cw->payload)
{
DISCARD_CONTEXT(cw);
}
destroyContext(cw);
}
destroyContextQueue(cstate->queue);

destroyContextQueue(cstate->data_queue);
destroyContextQueue(cstate->finished_queue);
free(cstate);
}
static bool resume_write_queue(connector_con_state_t *cstate)
{
context_queue_t *queue = (cstate)->queue;
context_queue_t *data_queue = (cstate)->data_queue;
context_queue_t *finished_queue = (cstate)->finished_queue;
hio_t *io = cstate->io;
hio_t *last_resumed_io = NULL;
while (contextQueueLen(queue) > 0)
while (contextQueueLen(data_queue) > 0)
{
context_t *cw = contextQueuePop(queue);
hio_t *upstream_io = cw->src_io;

if (cw->payload == NULL)
{
destroyContext(cw);

if (upstream_io != NULL && (last_resumed_io != upstream_io))
{
last_resumed_io = upstream_io;
hio_read(upstream_io);
}
continue;
}
context_t *cw = contextQueuePop(data_queue);

int bytes = bufLen(cw->payload);
int nwrite = hio_write(io, rawBuf(cw->payload), bytes);
reuseBuffer(cstate->buffer_pool, cw->payload);
cw->payload = NULL;
contextQueuePush(cstate->finished_queue, cw);
if (nwrite >= 0 && nwrite < bytes)
{
cstate->current_w = cw;
return false; // write pending
}
else
}
// data data_queue is empty
hio_t *last_resumed_io = NULL;
while (contextQueueLen(finished_queue) > 0)
{
context_t *cw = contextQueuePop(finished_queue);
hio_t *upstream_io = cw->src_io;
if (upstream_io != NULL && (last_resumed_io != upstream_io))
{
reuseBuffer(cstate->buffer_pool, cw->payload);
cw->payload = NULL;
contextQueuePush(queue, cw);
last_resumed_io = upstream_io;
hio_read(upstream_io);
}
destroyContext(cw);
}
return true;
}
Expand All @@ -86,33 +81,34 @@ static void on_write_complete(hio_t *io, const void *buf, int writebytes)
{
// resume the read on other end of the connection
connector_con_state_t *cstate = (connector_con_state_t *)(hevent_userdata(io));
if (cstate == NULL || cstate->current_w == NULL)
if (cstate == NULL)
return;

if (hio_write_is_complete(io))
{
hio_setcb_write(cstate->io, NULL);
cstate->write_paused = false;
context_t *cw = cstate->current_w;
cstate->current_w = NULL;
context_queue_t *queue = cstate->queue;
hio_t *upstream_io = cw->src_io;
reuseBuffer(cstate->buffer_pool, cw->payload);
cw->payload = NULL;

if (contextQueueLen(queue) > 0)
{
contextQueuePush(cstate->queue, cw);
context_queue_t *data_queue = cstate->data_queue;
context_queue_t *finished_queue = cstate->finished_queue;
if (contextQueueLen(data_queue) > 0)
if (!resume_write_queue(cstate))
{
cstate->write_paused = true;
hio_setcb_write(cstate->io, on_write_complete);
cstate->write_paused = true;
return;
}
}
else

hio_t *last_resumed_io = NULL;
while (contextQueueLen(finished_queue) > 0)
{
if (upstream_io != NULL && hio_exists(cw->line->loop, cw->fd) && !hio_is_closed(upstream_io))
context_t *cw = contextQueuePop(finished_queue);
hio_t *upstream_io = cw->src_io;
if (upstream_io != NULL && (last_resumed_io != upstream_io))
{
last_resumed_io = upstream_io;
hio_read(upstream_io);
}
destroyContext(cw);
}
}
Expand Down Expand Up @@ -153,7 +149,6 @@ static void on_close(hio_t *io)
tunnel_t *self = (cstate)->tunnel;
line_t *line = (cstate)->line;
context_t *context = newFinContext(line);
context->src_io = io;
self->downStream(self, context);
}
}
Expand Down Expand Up @@ -198,23 +193,22 @@ void connectorUpStream(tunnel_t *self, context_t *c)
{
if (c->src_io)
hio_read_stop(c->src_io);
contextQueuePush(cstate->queue, c);
contextQueuePush(cstate->data_queue, c);
}
else
{

int bytes = bufLen(c->payload);
int nwrite = hio_write(cstate->io, rawBuf(c->payload), bytes);
if (nwrite >= 0 && nwrite < bytes)
{
cstate->current_w = c;
cstate->write_paused = true;
hio_setcb_write(cstate->io, on_write_complete);
if (c->src_io)
{
c->fd = hio_fd(c->src_io);
hio_read_stop(c->src_io);
}
reuseBuffer(cstate->buffer_pool, c->payload);
c->payload = NULL;

contextQueuePush(cstate->finished_queue, c);
cstate->write_paused = true;
hio_setcb_write(cstate->io, on_write_complete);
}
else
{
Expand All @@ -239,7 +233,9 @@ void connectorUpStream(tunnel_t *self, context_t *c)
cstate->buffer_pool = buffer_pools[c->line->tid];
cstate->tunnel = self;
cstate->line = c->line;
cstate->queue = newContextQueue(cstate->buffer_pool);
cstate->data_queue = newContextQueue(cstate->buffer_pool);
cstate->finished_queue = newContextQueue(cstate->buffer_pool);

cstate->write_paused = true;

socket_context_t final_ctx = {0};
Expand Down Expand Up @@ -285,8 +281,7 @@ void connectorUpStream(tunnel_t *self, context_t *c)
if (!connectorResolvedomain(&final_ctx))
{
free(final_ctx.domain);
destroyContextQueue(cstate->queue);
free(CSTATE(c));
cleanup(cstate);
CSTATE_MUT(c) = NULL;
goto fail;
}
Expand All @@ -299,8 +294,7 @@ void connectorUpStream(tunnel_t *self, context_t *c)
if (sockfd < 0)
{
LOGE("Connector: socket fd < 0");
destroyContextQueue(cstate->queue);
free(CSTATE(c));
cleanup(cstate);
CSTATE_MUT(c) = NULL;
goto fail;
}
Expand Down Expand Up @@ -339,7 +333,6 @@ void connectorUpStream(tunnel_t *self, context_t *c)
else if (c->fin)
{
hio_t *io = cstate->io;
hevent_set_userdata(io, NULL);
cleanup(cstate);
CSTATE_MUT(c) = NULL;
destroyContext(c);
Expand Down Expand Up @@ -387,8 +380,7 @@ void connectorDownStream(tunnel_t *self, context_t *c)
}
else if (c->fin)
{
hio_t *io = CSTATE(c)->io;
hevent_set_userdata(io, NULL);

cleanup(cstate);
CSTATE_MUT(c) = NULL;
self->dw->downStream(self->dw, c);
Expand Down
4 changes: 2 additions & 2 deletions tunnels/adapters/connector/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ typedef struct connector_con_state_s
hio_t *io_back;

buffer_pool_t *buffer_pool;
context_queue_t *queue;
context_t *current_w;
context_queue_t *data_queue;
context_queue_t *finished_queue;

bool write_paused;
bool established;
Expand Down
3 changes: 2 additions & 1 deletion tunnels/adapters/connector/udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ void connectorPacketUpStream(tunnel_t *self, context_t *c)
cstate->tunnel = self;
cstate->line = c->line;
cstate->write_paused = false;
cstate->queue = NULL;
cstate->data_queue = NULL;
cstate->finished_queue = NULL;

// sockaddr_set_ipport(&(dest->addr),"www.gstatic.com",80);

Expand Down
Loading

0 comments on commit 6285460

Please sign in to comment.