Skip to content

Commit

Permalink
rework grpc
Browse files Browse the repository at this point in the history
  • Loading branch information
radkesvat committed May 26, 2024
1 parent 2b1ce58 commit bee2c65
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 59 deletions.
55 changes: 25 additions & 30 deletions tunnels/client/protobuf/protobuf_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ enum

typedef struct protobuf_client_state_s
{
void*_;
void *_;
} protobuf_client_state_t;

typedef struct protobuf_client_con_state_s
Expand Down Expand Up @@ -73,59 +73,54 @@ static void downStream(tunnel_t *self, context_t *c)
if (c->payload != NULL)
{
buffer_stream_t *bstream = cstate->stream_buf;
bufferStreamPush(cstate->stream_buf, c->payload);
bufferStreamPush(bstream, c->payload);
c->payload = NULL;

while (true)
{
if (bufferStreamLen(bstream) < 3)
if (bufferStreamLen(bstream) < 1)
{
destroyContext(c);
return;
}
unsigned int read_len = (bufferStreamLen(bstream) >= 4 ? 4 : 2);
unsigned char uleb_encoded_buf[4];
bufferStreamViewBytesAt(bstream, 1, uleb_encoded_buf, read_len);
// if (uleb_encoded_buf[0] != '\n')
// {
// LOGE("ProtoBufClient: rejected, invalid data");
// goto disconnect;
// }

size_t data_len = 0;
size_t bytes_passed = readUleb128ToUint64(uleb_encoded_buf, uleb_encoded_buf + read_len, &data_len);
if (data_len == 0)
shift_buffer_t *full_data = bufferStreamFullRead(bstream);
const uint8_t *uleb_data = &(((uint8_t *) rawBuf(full_data))[1]); // first byte is \n (protobuf)
size_t data_len = 0;
size_t bytes_passed = readUleb128ToUint64(uleb_data, uleb_data + bufLen(full_data) - 1, &data_len);
if (data_len == 0 || (bufLen(full_data) - (bytes_passed + 1)) < data_len)
{
if (uleb_encoded_buf[0] == 0x0)
{
LOGE("ProtoBufClient: rejected, invalid data");
goto disconnect;
}

// keep waiting for more data to come
bufferStreamPush(bstream, full_data);
destroyContext(c);
return;
}

if (data_len > kMaxPacketSize)
{
LOGE("ProtoBufClient: rejected, size too large %zu (%zu passed %lu left)", data_len, bytes_passed,
bufferStreamLen(bstream));
LOGE("ProtoBufServer: rejected, size too large");
goto disconnect;
}
if (! (bufferStreamLen(bstream) >= 1 + bytes_passed + data_len))
{
destroyContext(c);
return;
}

context_t *downstream_ctx = newContextFrom(c);
downstream_ctx->payload = bufferStreamRead(cstate->stream_buf, 1 + bytes_passed + data_len);
downstream_ctx->payload = shallowSliceBuffer(full_data, data_len + (bytes_passed + 1));
shiftr(downstream_ctx->payload, 1 + bytes_passed);

if (bufLen(full_data) > 0)
{
bufferStreamPush(bstream, full_data);
}
else
{
reuseBuffer(getContextBufferPool(c), full_data);
}

if (! cstate->first_sent)
{
downstream_ctx->first = true;
cstate->first_sent = true;
}

self->dw->downStream(self->dw, downstream_ctx);

if (! isAlive(c->line))
{
destroyContext(c);
Expand Down
55 changes: 26 additions & 29 deletions tunnels/server/protobuf/protobuf_server.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "protobuf_server.h"
#include "basic_types.h"
#include "buffer_pool.h"
#include "buffer_stream.h"
#include "loggers/network_logger.h"
#include "node.h"
Expand All @@ -8,6 +9,7 @@
#include "uleb128.h"
#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
/*
we shall not use nanopb or any protobuf lib because they need atleast 1 memcopy
Expand Down Expand Up @@ -44,59 +46,54 @@ static void upStream(tunnel_t *self, context_t *c)
if (c->payload != NULL)
{
buffer_stream_t *bstream = cstate->stream_buf;
bufferStreamPush(cstate->stream_buf, c->payload);
bufferStreamPush(bstream, c->payload);
c->payload = NULL;

while (true)
{
if (bufferStreamLen(bstream) < 3)
if (bufferStreamLen(bstream) < 1)
{
destroyContext(c);
return;
}
unsigned int read_len = (bufferStreamLen(bstream) >= 4 ? 4 : 2);
unsigned char uleb_encoded_buf[4];
bufferStreamViewBytesAt(bstream, 1, uleb_encoded_buf, read_len);
// if (uleb_encoded_buf[0] != '\n')
// {
// LOGE("ProtoBufServer: rejected, invalid data");
// goto disconnect;
// }
size_t data_len = 0;
size_t bytes_passed = readUleb128ToUint64(uleb_encoded_buf, uleb_encoded_buf + read_len, &data_len);
if (data_len == 0)
shift_buffer_t *full_data = bufferStreamFullRead(bstream);
const uint8_t *uleb_data = &(((uint8_t *) rawBuf(full_data))[1]); // first byte is \n (protobuf)
size_t data_len = 0;
size_t bytes_passed = readUleb128ToUint64(uleb_data, uleb_data + bufLen(full_data) - 1, &data_len);
if (data_len == 0 || (bufLen(full_data) - (bytes_passed + 1)) < data_len)
{
if (uleb_encoded_buf[0] == 0x0)
{
LOGE("ProtoBufServer: rejected, invalid data");

goto disconnect;
}

// keep waiting for more data to come
bufferStreamPush(bstream, full_data);
destroyContext(c);
return;
}

if (data_len > kMaxPacketSize)
{
LOGE("ProtoBufServer: rejected, size too large %zu (%zu passed %lu left)", data_len, bytes_passed,
bufferStreamLen(bstream));
LOGE("ProtoBufServer: rejected, size too large");
goto disconnect;
}
if (! (bufferStreamLen(bstream) >= 1 + bytes_passed + data_len))
{
destroyContext(c);
return;
}

context_t *upstream_ctx = newContextFrom(c);
upstream_ctx->payload = bufferStreamRead(cstate->stream_buf, 1 + bytes_passed + data_len);
upstream_ctx->payload = shallowSliceBuffer(full_data, data_len + (bytes_passed + 1));
shiftr(upstream_ctx->payload, 1 + bytes_passed);

if (bufLen(full_data) > 0)
{
bufferStreamPush(bstream, full_data);
}
else
{
reuseBuffer(getContextBufferPool(c), full_data);
}

if (! cstate->first_sent)
{
upstream_ctx->first = true;
cstate->first_sent = true;
}

self->up->upStream(self->up, upstream_ctx);

if (! isAlive(c->line))
{
destroyContext(c);
Expand Down

0 comments on commit bee2c65

Please sign in to comment.