Skip to content

Commit

Permalink
rework queue
Browse files Browse the repository at this point in the history
  • Loading branch information
radkesvat committed Apr 3, 2024
1 parent c5eab11 commit e2599f5
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 103 deletions.
65 changes: 30 additions & 35 deletions tunnels/adapters/connector/tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,19 @@ static void cleanup(connector_con_state_t *cstate)
destroyContextQueue(cstate->queue);
free(cstate);
}

static bool resume_write_queue(connector_con_state_t *cstate)
{
context_queue_t *queue = (cstate)->queue;
context_t **cw = &((cstate)->current_w);
hio_t *io = cstate->io;
hio_t *last_resumed_io = NULL;
while (contextQueueLen(queue) > 0)
{
*cw = contextQueuePop(queue);
hio_t *upstream_io = (*cw)->src_io;
context_t *cw = contextQueuePop(queue);
hio_t *upstream_io = cw->src_io;

if ((*cw)->payload == NULL)
if (cw->payload == NULL)
{
destroyContext((*cw));
*cw = NULL;
destroyContext(cw);

if (upstream_io && last_resumed_io != upstream_io)
{
Expand All @@ -40,18 +37,18 @@ static bool resume_write_queue(connector_con_state_t *cstate)
continue;
}

int bytes = bufLen((*cw)->payload);
int nwrite = hio_write(io, rawBuf((*cw)->payload), bytes);
int bytes = bufLen(cw->payload);
int nwrite = hio_write(io, rawBuf(cw->payload), bytes);
if (nwrite >= 0 && nwrite < bytes)
{
cstate->current_w = cw;
return false; // write pending
}
else
{
reuseBuffer(cstate->buffer_pool, (*cw)->payload);
(*cw)->payload = NULL;
contextQueuePush(queue, (*cw));
*cw = NULL;
reuseBuffer(cstate->buffer_pool, cw->payload);
cw->payload = NULL;
contextQueuePush(queue, cw);
}
}
return true;
Expand All @@ -61,33 +58,32 @@ static void on_write_complete(hio_t *io, const void *buf, int writebytes)
{
// resume the read on other end of the connection
connector_con_state_t *cstate = (connector_con_state_t *)(hevent_userdata(io));
if (cstate == NULL)
if (cstate == NULL || cstate->current_w == NULL)
return;

context_t **cw = &((cstate)->current_w);
context_queue_t *queue = (cstate)->queue;

hio_t *upstream_io = (*cw)->src_io;
if (hio_write_is_complete(io))
{

reuseBuffer(cstate->buffer_pool, (*cw)->payload);
(*cw)->payload = NULL;
context_t *cpy_ctx = (*cw);
*cw = NULL;
hio_setcb_write(io, NULL);
cstate->write_paused = false;
context_t *cw = cstate->current_w;
cstate->current_w = NULL;
context_queue_t *queue = cstate->queue;
hio_t *upstream_io = cw->src_io;
reuseBuffer(cstate->buffer_pool, cw->payload);
cw->payload = NULL;

if (contextQueueLen(queue) > 0)
{
contextQueuePush(cstate->queue, cpy_ctx);
if (resume_write_queue(cstate))
hio_setcb_write(io, NULL);
contextQueuePush(cstate->queue, cw);
if (!resume_write_queue(cstate))
{
cstate->write_paused = true;
hio_setcb_write(cstate->io, on_write_complete);
}
}
else
{
destroyContext(cpy_ctx);
cstate->write_paused = false;
hio_setcb_write(io, NULL);

destroyContext(cw);
if (upstream_io)
hio_read(upstream_io);
}
Expand Down Expand Up @@ -178,22 +174,21 @@ void connectorUpStream(tunnel_t *self, context_t *c)
}
else
{
cstate->current_w = c;

int bytes = bufLen(c->payload);
int nwrite = hio_write(cstate->io, rawBuf(c->payload), bytes);
if (nwrite >= 0 && nwrite < bytes)
{
cstate->current_w = c;
cstate->write_paused = true;
hio_setcb_write(cstate->io, on_write_complete);
if (c->src_io)
hio_read_stop(c->src_io);
hio_setcb_write(cstate->io, on_write_complete);
}
else
{
reuseBuffer(cstate->buffer_pool, cstate->current_w->payload);
cstate->current_w->payload = NULL;
cstate->current_w = NULL;
reuseBuffer(cstate->buffer_pool, c->payload);
c->payload = NULL;
destroyContext(c);
}
}
Expand Down
64 changes: 30 additions & 34 deletions tunnels/adapters/tcp_connector/tcp_connector.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,16 @@ static void cleanup(tcp_connector_con_state_t *cstate)
static bool resume_write_queue(tcp_connector_con_state_t *cstate)
{
context_queue_t *queue = (cstate)->queue;
context_t **cw = &((cstate)->current_w);
hio_t *io = cstate->io;
hio_t *last_resumed_io = NULL;
while (contextQueueLen(queue) > 0)
{
*cw = contextQueuePop(queue);
hio_t *upstream_io = (*cw)->src_io;
context_t *cw = contextQueuePop(queue);
hio_t *upstream_io = cw->src_io;

if ((*cw)->payload == NULL)
if (cw->payload == NULL)
{
destroyContext((*cw));
*cw = NULL;
destroyContext(cw);

if (upstream_io && last_resumed_io != upstream_io)
{
Expand All @@ -41,18 +39,18 @@ static bool resume_write_queue(tcp_connector_con_state_t *cstate)
continue;
}

int bytes = bufLen((*cw)->payload);
int nwrite = hio_write(io, rawBuf((*cw)->payload), bytes);
int bytes = bufLen(cw->payload);
int nwrite = hio_write(io, rawBuf(cw->payload), bytes);
if (nwrite >= 0 && nwrite < bytes)
{
cstate->current_w = cw;
return false; // write pending
}
else
{
reuseBuffer(cstate->buffer_pool, (*cw)->payload);
(*cw)->payload = NULL;
contextQueuePush(queue, (*cw));
*cw = NULL;
reuseBuffer(cstate->buffer_pool, cw->payload);
cw->payload = NULL;
contextQueuePush(queue, cw);
}
}
return true;
Expand All @@ -62,33 +60,32 @@ static void on_write_complete(hio_t *io, const void *buf, int writebytes)
{
// resume the read on other end of the connection
tcp_connector_con_state_t *cstate = (tcp_connector_con_state_t *)(hevent_userdata(io));
if (cstate == NULL)
if (cstate == NULL || cstate->current_w == NULL)
return;

context_t **cw = &((cstate)->current_w);
context_queue_t *queue = (cstate)->queue;

hio_t *upstream_io = (*cw)->src_io;
if (hio_write_is_complete(io))
{

reuseBuffer(cstate->buffer_pool, (*cw)->payload);
(*cw)->payload = NULL;
context_t *cpy_ctx = (*cw);
*cw = NULL;
hio_setcb_write(io, NULL);
cstate->write_paused = false;
context_t *cw = cstate->current_w;
cstate->current_w = NULL;
context_queue_t *queue = cstate->queue;
hio_t *upstream_io = cw->src_io;
reuseBuffer(cstate->buffer_pool, cw->payload);
cw->payload = NULL;

if (contextQueueLen(queue) > 0)
{
contextQueuePush(cstate->queue, cpy_ctx);
if (resume_write_queue(cstate))
hio_setcb_write(io, NULL);
contextQueuePush(cstate->queue, cw);
if (!resume_write_queue(cstate))
{
cstate->write_paused = true;
hio_setcb_write(cstate->io, on_write_complete);
}
}
else
{
destroyContext(cpy_ctx);
cstate->write_paused = false;
hio_setcb_write(io, NULL);

destroyContext(cw);
if (upstream_io)
hio_read(upstream_io);
}
Expand Down Expand Up @@ -177,22 +174,21 @@ void tcpConnectorUpStream(tunnel_t *self, context_t *c)
}
else
{
cstate->current_w = c;

int bytes = bufLen(c->payload);
int nwrite = hio_write(cstate->io, rawBuf(c->payload), bytes);
if (nwrite >= 0 && nwrite < bytes)
{
cstate->current_w = c;
cstate->write_paused = true;
hio_setcb_write(cstate->io, on_write_complete);
if (c->src_io)
hio_read_stop(c->src_io);
hio_setcb_write(cstate->io, on_write_complete);
}
else
{
reuseBuffer(cstate->buffer_pool, cstate->current_w->payload);
cstate->current_w->payload = NULL;
cstate->current_w = NULL;
reuseBuffer(cstate->buffer_pool, c->payload);
c->payload = NULL;
destroyContext(c);
}
}
Expand Down
65 changes: 31 additions & 34 deletions tunnels/adapters/tcp_listener/tcp_listener.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,16 @@ static void cleanup(tcp_listener_con_state_t *cstate)
static bool resume_write_queue(tcp_listener_con_state_t *cstate)
{
context_queue_t *queue = (cstate)->queue;
context_t **cw = &((cstate)->current_w);
hio_t *io = cstate->io;
hio_t *last_resumed_io = NULL;
while (contextQueueLen(queue) > 0)
{
*cw = contextQueuePop(queue);
hio_t *upstream_io = (*cw)->src_io;
context_t *cw = contextQueuePop(queue);
hio_t *upstream_io = cw->src_io;

if ((*cw)->payload == NULL)
if (cw->payload == NULL)
{
destroyContext((*cw));
*cw = NULL;
destroyContext(cw);

if (upstream_io && last_resumed_io != upstream_io)
{
Expand All @@ -80,18 +78,18 @@ static bool resume_write_queue(tcp_listener_con_state_t *cstate)
continue;
}

int bytes = bufLen((*cw)->payload);
int nwrite = hio_write(io, rawBuf((*cw)->payload), bytes);
int bytes = bufLen(cw->payload);
int nwrite = hio_write(io, rawBuf(cw->payload), bytes);
if (nwrite >= 0 && nwrite < bytes)
{
cstate->current_w = cw;
return false; // write pending
}
else
{
reuseBuffer(cstate->buffer_pool, (*cw)->payload);
(*cw)->payload = NULL;
contextQueuePush(queue, (*cw));
*cw = NULL;
reuseBuffer(cstate->buffer_pool, cw->payload);
cw->payload = NULL;
contextQueuePush(queue, cw);
}
}
return true;
Expand All @@ -101,33 +99,32 @@ static void on_write_complete(hio_t *io, const void *buf, int writebytes)
{
// resume the read on other end of the connection
tcp_listener_con_state_t *cstate = (tcp_listener_con_state_t *)(hevent_userdata(io));
if (cstate == NULL)
if (cstate == NULL || cstate->current_w == NULL)
return;

context_t **cw = &((cstate)->current_w);
context_queue_t *queue = (cstate)->queue;

hio_t *upstream_io = (*cw)->src_io;
if (hio_write_is_complete(io))
{

reuseBuffer(cstate->buffer_pool, (*cw)->payload);
(*cw)->payload = NULL;
context_t *cpy_ctx = (*cw);
*cw = NULL;
hio_setcb_write(io, NULL);
cstate->write_paused = false;
context_t *cw = cstate->current_w;
cstate->current_w = NULL;
context_queue_t *queue = cstate->queue;
hio_t *upstream_io = cw->src_io;
reuseBuffer(cstate->buffer_pool, cw->payload);
cw->payload = NULL;

if (contextQueueLen(queue) > 0)
{
contextQueuePush(cstate->queue, cpy_ctx);
if (resume_write_queue(cstate))
hio_setcb_write(io, NULL);
contextQueuePush(cstate->queue, cw);
if (!resume_write_queue(cstate))
{
cstate->write_paused = true;
hio_setcb_write(cstate->io, on_write_complete);
}
}
else
{
destroyContext(cpy_ctx);
cstate->write_paused = false;
hio_setcb_write(io, NULL);

destroyContext(cw);
if (upstream_io)
hio_read(upstream_io);
}
Expand Down Expand Up @@ -187,21 +184,21 @@ static inline void downStream(tunnel_t *self, context_t *c)
}
else
{
cstate->current_w = c;
int bytes = bufLen(c->payload);
int nwrite = hio_write(cstate->io, rawBuf(c->payload), bytes);
if (nwrite >= 0 && nwrite < bytes)
{
cstate->current_w = c;
cstate->write_paused = true;
hio_setcb_write(cstate->io, on_write_complete);

if (c->src_io)
hio_read_stop(c->src_io);
hio_setcb_write(cstate->io, on_write_complete);
}
else
{
reuseBuffer(cstate->buffer_pool, cstate->current_w->payload);
cstate->current_w->payload = NULL;
cstate->current_w = NULL;
reuseBuffer(cstate->buffer_pool, c->payload);
c->payload = NULL;
destroyContext(c);
}
}
Expand Down

0 comments on commit e2599f5

Please sign in to comment.