Skip to content

Commit

Permalink
HalfDuplexClient first impl
Browse files Browse the repository at this point in the history
  • Loading branch information
radkesvat committed May 31, 2024
1 parent 00a2651 commit b9ccfa9
Show file tree
Hide file tree
Showing 5 changed files with 299 additions and 0 deletions.
10 changes: 10 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ option(INCLUDE_SOCKS_5_SERVER "link Socks5Server staticly to the core" TRUE)
option(INCLUDE_REALITY_SERVER "link RealityServer staticly to the core" TRUE)
option(INCLUDE_REALITY_CLIENT "link RealityClient staticly to the core" TRUE)

option(INCLUDE_HALFDUPLEX_CLIENT "link HalfDuplexClient staticly to the core" TRUE)

set(OPENSSL_CONFIGURE_VERBOSE ON)

# add executable
Expand Down Expand Up @@ -320,6 +322,14 @@ target_link_directories(Waterwall PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/tunnels/cli
target_link_libraries(Waterwall RealityClient)
endif()

#halfduplex client
if (INCLUDE_HALFDUPLEX_CLIENT)
target_compile_definitions(Waterwall PUBLIC INCLUDE_HALFDUPLEX_CLIENT=1)
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/tunnels/client/halfduplex)
target_link_directories(Waterwall PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/tunnels/client/halfduplex)
target_link_libraries(Waterwall HalfDuplexClient)
endif()



target_compile_definitions(Waterwall PUBLIC WATERWALL_VERSION=${Waterwall_VERSION})
Expand Down
11 changes: 11 additions & 0 deletions core/static_tunnels.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@
#include "tunnels/client/reality/reality_client.h"
#endif

#ifdef INCLUDE_HALFDUPLEX_CLIENT
#include "tunnels/client/halfduplex/halfduplex_client.h"
#endif


void loadStaticTunnelsIntoCore(void)
{
#ifdef INCLUDE_TCP_LISTENER
Expand Down Expand Up @@ -233,4 +238,10 @@ void loadStaticTunnelsIntoCore(void)
#ifdef INCLUDE_REALITY_CLIENT
USING(RealityClient);
#endif


#ifdef INCLUDE_HALFDUPLEX_CLIENT
USING(HalfDuplexClient);
#endif

}
15 changes: 15 additions & 0 deletions tunnels/client/halfduplex/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@

add_library(HalfDuplexClient STATIC
halfduplex_client.c

)

#ww api
target_include_directories(HalfDuplexClient PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../../../ww)
target_link_libraries(HalfDuplexClient PUBLIC ww)

# add dependencies
include(${CMAKE_BINARY_DIR}/cmake/CPM.cmake)


target_compile_definitions(HalfDuplexClient PRIVATE HalfDuplexClient_VERSION=0.1)
252 changes: 252 additions & 0 deletions tunnels/client/halfduplex/halfduplex_client.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
#include "halfduplex_client.h"
#include "buffer_pool.h"
#include "context_queue.h"
#include "frand.h"
#include "shiftbuffer.h"
#include "tunnel.h"
#include <stdatomic.h>
#include <stdint.h>
#include <stdlib.h>

typedef struct halfduplex_state_s
{
void *_;
} halfduplex_state_t;

typedef struct halfduplex_con_state_s
{
line_t *main_line;
line_t *upload_line;
line_t *download_line;
//--------------
context_queue_t *write_queue;

} halfduplex_con_state_t;

enum
{
kCmdUpload = 0x1,
kCmdDownload = 0x2,
};

static void cleanup(halfduplex_con_state_t *cstate)
{
if (cstate->write_queue != NULL)
{
destroyContextQueue(cstate->write_queue);
}
if (cstate->upload_line)
{
doneLineDownSide(cstate->upload_line);
destroyLine(cstate->upload_line);
}
if (cstate->download_line)
{
doneLineDownSide(cstate->download_line);
destroyLine(cstate->download_line);
}
doneLineUpSide(cstate->main_line);
free(cstate);
}

static void flushWriteQueue(tunnel_t *self, context_t *c)
{
halfduplex_con_state_t *cstate = CSTATE(c);

while (isAlive(c->line) && contextQueueLen(cstate->write_queue) > 0)
{
self->upStream(self, contextQueuePop(cstate->write_queue));
}
}

static void onMainLinePaused(void *cstate)
{
pauseLineUpSide(((halfduplex_con_state_t *) cstate)->upload_line);
pauseLineUpSide(((halfduplex_con_state_t *) cstate)->download_line);
}

static void onMainLineResumed(void *cstate)
{
resumeLineUpSide(((halfduplex_con_state_t *) cstate)->upload_line);
resumeLineUpSide(((halfduplex_con_state_t *) cstate)->download_line);
}
static void onUDLinePaused(void *cstate)
{
pauseLineDownSide(((halfduplex_con_state_t *) cstate)->main_line);
}

static void onUDLineResumed(void *cstate)
{
resumeLineDownSide(((halfduplex_con_state_t *) cstate)->main_line);
}

static void upStream(tunnel_t *self, context_t *c)
{
halfduplex_con_state_t *cstate = CSTATE(c);
if (c->payload != NULL)
{
if (c->first)
{
uint16_t cid = (uint16_t) fastRand();
context_t *intro_context = newContext(cstate->download_line);
intro_context->first = true;
intro_context->payload = popBuffer(getContextBufferPool(c));
shiftl(intro_context->payload, 2);
writeUI16(intro_context->payload, cid);
shiftl(intro_context->payload, 1);
writeUI8(intro_context->payload, kCmdDownload);

self->up->upStream(self->up, intro_context);

shiftl(c->payload, 2);
writeUI16(c->payload, cid);
shiftl(intro_context->payload, 1);
writeUI8(intro_context->payload, kCmdUpload);
}
self->up->upStream(self->up, switchLine(c, cstate->upload_line));
}
else
{

if (c->init)
{
halfduplex_con_state_t *cstate = malloc(sizeof(halfduplex_con_state_t));

*cstate = (halfduplex_con_state_t){.download_line = newLine(c->line->tid),
.upload_line = newLine(c->line->tid),
.main_line = c->line,
.write_queue = newContextQueue(getContextBufferPool(c))};

CSTATE_MUT(c) = cstate;
LSTATE_MUT(cstate->upload_line) = cstate;
LSTATE_MUT(cstate->download_line) = cstate;

lockLine(cstate->upload_line);
self->up->upStream(self->up, newInitContext(cstate->upload_line));

if (! isAlive(cstate->upload_line))
{
unLockLine(cstate->upload_line);

CSTATE_MUT(c) = NULL;
LSTATE_MUT(cstate->upload_line) = NULL;
LSTATE_MUT(cstate->download_line) = NULL;
cleanup(cstate);

self->dw->downStream(self->dw, newFinContextFrom(c));
destroyContext(c);
return;
}
unLockLine(cstate->upload_line);

lockLine(cstate->download_line);
self->up->upStream(self->up, newInitContext(cstate->download_line));

if (! isAlive(cstate->download_line))
{
unLockLine(cstate->download_line);
self->up->upStream(self->up, newFinContext(cstate->upload_line));

CSTATE_MUT(c) = NULL;
LSTATE_MUT(cstate->upload_line) = NULL;
LSTATE_MUT(cstate->download_line) = NULL;
cleanup(cstate);

self->dw->downStream(self->dw, newFinContextFrom(c));
destroyContext(c);
return;
}
unLockLine(cstate->download_line);
setupLineUpSide(cstate->main_line, onMainLinePaused, cstate, onMainLineResumed);
setupLineDownSide(cstate->upload_line, onUDLinePaused, cstate, onUDLineResumed);
setupLineDownSide(cstate->download_line, onUDLinePaused, cstate, onUDLineResumed);
destroyContext(c);
}
else if (c->fin)
{
self->up->upStream(self->up, newFinContext(cstate->upload_line));
self->up->upStream(self->up, newFinContext(cstate->download_line));

CSTATE_MUT(c) = NULL;
LSTATE_MUT(cstate->upload_line) = NULL;
LSTATE_MUT(cstate->download_line) = NULL;
cleanup(cstate);

destroyContext(c);
}
}
}
static void downStream(tunnel_t *self, context_t *c)
{
halfduplex_con_state_t *cstate = CSTATE(c);
if (c->payload != NULL)
{
self->dw->downStream(self->dw, switchLine(c, cstate->main_line));
}
else
{

if (c->fin)
{
if (c->line == cstate->download_line)
{
self->up->upStream(self->up, newFinContext(cstate->upload_line));
self->dw->downStream(self->dw, newFinContext(cstate->main_line));
}
else
{
assert(c->line == cstate->upload_line);
self->up->upStream(self->up, newFinContext(cstate->download_line));
self->dw->downStream(self->dw, newFinContext(cstate->main_line));
}

CSTATE_MUT(c) = NULL;
LSTATE_MUT(cstate->upload_line) = NULL;
LSTATE_MUT(cstate->download_line) = NULL;
cleanup(cstate);
destroyContext(c);
}
else
{
if (c->line == cstate->download_line)
{
self->dw->downStream(self->dw, switchLine(c, cstate->main_line));
}
else
{
destroyContext(c);
}
}
}
}

tunnel_t *newHalfDuplexClient(node_instance_context_t *instance_info)
{
(void) instance_info;
halfduplex_state_t *state = malloc(sizeof(halfduplex_state_t));
memset(state, 0, sizeof(halfduplex_state_t));

tunnel_t *t = newTunnel();
t->state = state;
t->upStream = &upStream;
t->downStream = &downStream;

return t;
}

api_result_t apiHalfDuplexClient(tunnel_t *self, const char *msg)
{
(void) (self);
(void) (msg);
return (api_result_t){0};
}

tunnel_t *destroyHalfDuplexClient(tunnel_t *self)
{
(void) (self);
return NULL;
}
tunnel_metadata_t getMetadataHalfDuplexClient(void)
{
return (tunnel_metadata_t){.version = 0001, .flags = 0x0};
}
11 changes: 11 additions & 0 deletions tunnels/client/halfduplex/halfduplex_client.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#pragma once
#include "api.h"

// -------> upload con
// con <------> HalfDuplexClient
// <------- download con

tunnel_t *newHalfDuplexClient(node_instance_context_t *instance_info);
api_result_t apiHalfDuplexClient(tunnel_t *self, const char *msg);
tunnel_t *destroyHalfDuplexClient(tunnel_t *self);
tunnel_metadata_t getMetadataHalfDuplexClient(void);

0 comments on commit b9ccfa9

Please sign in to comment.