Skip to content

Commit

Permalink
rework pipes/basic work is done + format
Browse files Browse the repository at this point in the history
  • Loading branch information
radkesvat committed May 13, 2024
1 parent 4070700 commit 3421a89
Show file tree
Hide file tree
Showing 28 changed files with 416 additions and 443 deletions.
132 changes: 39 additions & 93 deletions tunnels/adapters/connector/tcp/tcp_connector.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "hsocket.h"
#include "loggers/network_logger.h"
#include "sync_dns.h"
#include "tunnel.h"
#include "types.h"
#include "utils/jsonutils.h"
#include "utils/sockutils.h"
Expand All @@ -12,18 +13,14 @@ static void cleanup(tcp_connector_con_state_t *cstate, bool write_queue)
if (cstate->io)
{
hevent_set_userdata(cstate->io, NULL);
hio_close(cstate->io);
}

hio_t *last_resumed_io = NULL;

while (contextQueueLen(cstate->data_queue) > 0)
{
// all data must be written before sending fin, event loop will hold them for us
context_t *cw = contextQueuePop(cstate->data_queue);
if (cw->src_io != NULL && last_resumed_io != cw->src_io)
{
last_resumed_io = cw->src_io;
hio_read(cw->src_io);
}

if (write_queue)
{
hio_write(cstate->io, cw->payload);
Expand All @@ -36,54 +33,28 @@ static void cleanup(tcp_connector_con_state_t *cstate, bool write_queue)
destroyContext(cw);
}

while (contextQueueLen(cstate->finished_queue) > 0)
{
context_t *cw = contextQueuePop(cstate->finished_queue);
if (cw->src_io != NULL && last_resumed_io != cw->src_io)
{
last_resumed_io = cw->src_io;
hio_read(cw->src_io);
}

destroyContext(cw);
}

destroyContextQueue(cstate->data_queue);
destroyContextQueue(cstate->finished_queue);
doneLineUpSide(cstate->line);
free(cstate);
}

static bool resumeWriteQueue(tcp_connector_con_state_t *cstate)
{
context_queue_t *data_queue = (cstate)->data_queue;
context_queue_t *finished_queue = (cstate)->finished_queue;
hio_t *io = cstate->io;
context_queue_t *data_queue = (cstate)->data_queue;
hio_t *io = cstate->io;
while (contextQueueLen(data_queue) > 0)
{
context_t *cw = contextQueuePop(data_queue);
unsigned int bytes = bufLen(cw->payload);
int nwrite = hio_write(io, cw->payload);
cw->payload = NULL;
contextQueuePush(cstate->finished_queue, cw);
destroyContext(cw);
if (nwrite >= 0 && nwrite < bytes)
{
return false; // write pending
}
}

// data data_queue is empty
hio_t *last_resumed_io = NULL;
while (contextQueueLen(finished_queue) > 0)
{
context_t *cw = contextQueuePop(finished_queue);
hio_t *upstream_io = cw->src_io;
if (upstream_io != NULL && (last_resumed_io != upstream_io))
{
last_resumed_io = upstream_io;
hio_read(upstream_io);
}
destroyContext(cw);
}
return true;
}
static void onWriteComplete(hio_t *io)
Expand All @@ -97,35 +68,17 @@ static void onWriteComplete(hio_t *io)

if (hio_write_is_complete(io))
{
hio_setcb_write(cstate->io, NULL);
cstate->write_paused = false;

context_queue_t *data_queue = cstate->data_queue;
context_queue_t *finished_queue = cstate->finished_queue;
if (contextQueueLen(data_queue) > 0)
{
if (! resumeWriteQueue(cstate))
{
hio_setcb_write(cstate->io, onWriteComplete);
cstate->write_paused = true;
return;
}
}
hio_t *last_resumed_io = NULL;
while (contextQueueLen(finished_queue) > 0)
context_queue_t *data_queue = cstate->data_queue;
if (contextQueueLen(data_queue) > 0 && ! resumeWriteQueue(cstate))
{
context_t *cw = contextQueuePop(finished_queue);
hio_t *upstream_io = cw->src_io;
if (upstream_io != NULL && (last_resumed_io != upstream_io))
{
last_resumed_io = upstream_io;
hio_read(upstream_io);
}
destroyContext(cw);
return;
}
hio_setcb_write(cstate->io, NULL);
cstate->write_paused = false;
resumeLineDownSide(cstate->line);
}
}

static void onRecv(hio_t *io, shift_buffer_t *buf)
{
tcp_connector_con_state_t *cstate = (tcp_connector_con_state_t *) (hevent_userdata(io));
Expand All @@ -139,7 +92,6 @@ static void onRecv(hio_t *io, shift_buffer_t *buf)
line_t *line = (cstate)->line;

context_t *context = newContext(line);
context->src_io = io;
context->payload = payload;
self->downStream(self, context);
}
Expand All @@ -163,6 +115,15 @@ static void onClose(hio_t *io)
self->downStream(self, context);
}
}
static void onLinePaused(void *cstate)
{
hio_read_stop(((tcp_connector_con_state_t *) cstate)->io);
}

static void onLineResumed(void *cstate)
{
hio_read(((tcp_connector_con_state_t *) cstate)->io);
}

static void onOutBoundConnected(hio_t *upstream_io)
{
Expand Down Expand Up @@ -190,10 +151,8 @@ static void onOutBoundConnected(hio_t *upstream_io)
LOGD("TcpConnector: connection succeed FD:%x [%s] => [%s]", (int) hio_fd(upstream_io),
SOCKADDR_STR(hio_localaddr(upstream_io), localaddrstr), SOCKADDR_STR(hio_peeraddr(upstream_io), peeraddrstr));

context_t *est_context = newContext(line);
est_context->est = true;
est_context->src_io = upstream_io;
self->downStream(self, est_context);
setupLineUpSide(line, onLinePaused, cstate, onLineResumed);
self->downStream(self, newEstContext(line));
}

static void upStream(tunnel_t *self, context_t *c)
Expand All @@ -204,52 +163,42 @@ static void upStream(tunnel_t *self, context_t *c)
{
if (cstate->write_paused)
{
if (c->src_io)
{
hio_read_stop(c->src_io);
}
pauseLineDownSide(c->line);
contextQueuePush(cstate->data_queue, c);
}
else
{
unsigned int bytes = bufLen(c->payload);
int nwrite = hio_write(cstate->io, c->payload);
c->payload = NULL;
destroyContext(c);

if (nwrite >= 0 && nwrite < bytes)
{
if (c->src_io)
{
hio_read_stop(c->src_io);
}
contextQueuePush(cstate->finished_queue, c);
pauseLineDownSide(c->line);
cstate->write_paused = true;
hio_setcb_write(cstate->io, onWriteComplete);
}
else
{
destroyContext(c);
}
}
}
else
{
if (c->init)
{
tcp_connector_state_t *state = STATE(self);
CSTATE_MUT(c) = malloc(sizeof(tcp_connector_con_state_t));
memset(CSTATE(c), 0, sizeof(tcp_connector_con_state_t));
tcp_connector_state_t *state = STATE(self);
CSTATE_MUT(c) = malloc(sizeof(tcp_connector_con_state_t));
tcp_connector_con_state_t *cstate = CSTATE(c);

*cstate = (tcp_connector_con_state_t){.buffer_pool = getContextBufferPool(c),
.tunnel = self,
.line = c->line,
.data_queue = newContextQueue(cstate->buffer_pool),
.write_paused = true};

#ifdef PROFILE
gettimeofday(&(cstate->__profile_conenct), NULL);
#endif

cstate->buffer_pool = getContextBufferPool(c);
cstate->tunnel = self;
cstate->line = c->line;
cstate->data_queue = newContextQueue(cstate->buffer_pool);
cstate->finished_queue = newContextQueue(cstate->buffer_pool);
cstate->write_paused = true;

socket_context_t *dest_ctx = &(c->line->dest_ctx);
socket_context_t *src_ctx = &(c->line->src_ctx);
switch ((enum tcp_connector_dynamic_value_status) state->dest_addr_selected.status)
Expand Down Expand Up @@ -320,7 +269,6 @@ static void upStream(tunnel_t *self, context_t *c)
hio_set_peeraddr(upstream_io, &(dest_ctx->address.sa), (int) sockaddr_len(&(dest_ctx->address)));
cstate->io = upstream_io;
hevent_set_userdata(upstream_io, cstate);

hio_setcb_connect(upstream_io, onOutBoundConnected);
hio_setcb_close(upstream_io, onClose);
hio_connect(upstream_io);
Expand All @@ -330,11 +278,8 @@ static void upStream(tunnel_t *self, context_t *c)
{
hio_t *io = cstate->io;
CSTATE_MUT(c) = NULL;
contextQueueNotifyIoRemoved(cstate->data_queue, c->src_io);
contextQueueNotifyIoRemoved(cstate->finished_queue, c->src_io);
cleanup(cstate, true);
destroyContext(c);
hio_close(io);
}
}
return;
Expand Down Expand Up @@ -372,6 +317,7 @@ static void downStream(tunnel_t *self, context_t *c)
if (resumeWriteQueue(cstate))
{
cstate->write_paused = false;
resumeLineDownSide(cstate->line);
}
else
{
Expand Down
15 changes: 6 additions & 9 deletions tunnels/adapters/connector/tcp/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,11 @@ typedef struct tcp_connector_con_state_s
struct timeval __profile_conenct;
#endif

tunnel_t *tunnel;
line_t * line;
hio_t * io;

buffer_pool_t * buffer_pool;
tunnel_t *tunnel;
line_t *line;
hio_t *io;
buffer_pool_t *buffer_pool;
context_queue_t *data_queue;
context_queue_t *finished_queue;

bool write_paused;
bool established;
bool write_paused;
bool established;
} tcp_connector_con_state_t;
2 changes: 0 additions & 2 deletions tunnels/adapters/connector/udp/udp_connector.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ static void onRecvFrom(hio_t *io, shift_buffer_t *buf)
cstate->established = true;
context_t *est_context = newContext(line);
est_context->est = true;
est_context->src_io = io;
self->downStream(self, est_context);
if (hevent_userdata(io) == NULL)
{
Expand All @@ -46,7 +45,6 @@ static void onRecvFrom(hio_t *io, shift_buffer_t *buf)
}

context_t *context = newContext(line);
context->src_io = io;
context->payload = payload;

self->downStream(self, context);
Expand Down
Loading

0 comments on commit 3421a89

Please sign in to comment.