diff --git a/tunnels/client/protobuf/protobuf_client.c b/tunnels/client/protobuf/protobuf_client.c index a0ed5032..3aab8696 100644 --- a/tunnels/client/protobuf/protobuf_client.c +++ b/tunnels/client/protobuf/protobuf_client.c @@ -21,7 +21,7 @@ enum typedef struct protobuf_client_state_s { - void*_; + void *_; } protobuf_client_state_t; typedef struct protobuf_client_con_state_s @@ -73,59 +73,54 @@ static void downStream(tunnel_t *self, context_t *c) if (c->payload != NULL) { buffer_stream_t *bstream = cstate->stream_buf; - bufferStreamPush(cstate->stream_buf, c->payload); + bufferStreamPush(bstream, c->payload); c->payload = NULL; while (true) { - if (bufferStreamLen(bstream) < 3) + if (bufferStreamLen(bstream) < 1) { destroyContext(c); return; } - unsigned int read_len = (bufferStreamLen(bstream) >= 4 ? 4 : 2); - unsigned char uleb_encoded_buf[4]; - bufferStreamViewBytesAt(bstream, 1, uleb_encoded_buf, read_len); - // if (uleb_encoded_buf[0] != '\n') - // { - // LOGE("ProtoBufClient: rejected, invalid data"); - // goto disconnect; - // } - - size_t data_len = 0; - size_t bytes_passed = readUleb128ToUint64(uleb_encoded_buf, uleb_encoded_buf + read_len, &data_len); - if (data_len == 0) + shift_buffer_t *full_data = bufferStreamFullRead(bstream); + const uint8_t *uleb_data = &(((uint8_t *) rawBuf(full_data))[1]); // first byte is \n (protobuf) + size_t data_len = 0; + size_t bytes_passed = readUleb128ToUint64(uleb_data, uleb_data + bufLen(full_data) - 1, &data_len); + if (data_len == 0 || (bufLen(full_data) - (bytes_passed + 1)) < data_len) { - if (uleb_encoded_buf[0] == 0x0) - { - LOGE("ProtoBufClient: rejected, invalid data"); - goto disconnect; - } - - // keep waiting for more data to come + bufferStreamPush(bstream, full_data); destroyContext(c); return; } + if (data_len > kMaxPacketSize) { - LOGE("ProtoBufClient: rejected, size too large %zu (%zu passed %lu left)", data_len, bytes_passed, - bufferStreamLen(bstream)); + LOGE("ProtoBufServer: rejected, size too large"); goto disconnect; } - if (! (bufferStreamLen(bstream) >= 1 + bytes_passed + data_len)) - { - destroyContext(c); - return; - } + context_t *downstream_ctx = newContextFrom(c); - downstream_ctx->payload = bufferStreamRead(cstate->stream_buf, 1 + bytes_passed + data_len); + downstream_ctx->payload = shallowSliceBuffer(full_data, data_len + (bytes_passed + 1)); shiftr(downstream_ctx->payload, 1 + bytes_passed); + + if (bufLen(full_data) > 0) + { + bufferStreamPush(bstream, full_data); + } + else + { + reuseBuffer(getContextBufferPool(c), full_data); + } + if (! cstate->first_sent) { downstream_ctx->first = true; cstate->first_sent = true; } + self->dw->downStream(self->dw, downstream_ctx); + if (! isAlive(c->line)) { destroyContext(c); diff --git a/tunnels/server/protobuf/protobuf_server.c b/tunnels/server/protobuf/protobuf_server.c index a423752e..adeb281d 100644 --- a/tunnels/server/protobuf/protobuf_server.c +++ b/tunnels/server/protobuf/protobuf_server.c @@ -1,5 +1,6 @@ #include "protobuf_server.h" #include "basic_types.h" +#include "buffer_pool.h" #include "buffer_stream.h" #include "loggers/network_logger.h" #include "node.h" @@ -8,6 +9,7 @@ #include "uleb128.h" #include #include +#include #include /* we shall not use nanopb or any protobuf lib because they need atleast 1 memcopy @@ -44,59 +46,54 @@ static void upStream(tunnel_t *self, context_t *c) if (c->payload != NULL) { buffer_stream_t *bstream = cstate->stream_buf; - bufferStreamPush(cstate->stream_buf, c->payload); + bufferStreamPush(bstream, c->payload); c->payload = NULL; while (true) { - if (bufferStreamLen(bstream) < 3) + if (bufferStreamLen(bstream) < 1) { destroyContext(c); return; } - unsigned int read_len = (bufferStreamLen(bstream) >= 4 ? 4 : 2); - unsigned char uleb_encoded_buf[4]; - bufferStreamViewBytesAt(bstream, 1, uleb_encoded_buf, read_len); - // if (uleb_encoded_buf[0] != '\n') - // { - // LOGE("ProtoBufServer: rejected, invalid data"); - // goto disconnect; - // } - size_t data_len = 0; - size_t bytes_passed = readUleb128ToUint64(uleb_encoded_buf, uleb_encoded_buf + read_len, &data_len); - if (data_len == 0) + shift_buffer_t *full_data = bufferStreamFullRead(bstream); + const uint8_t *uleb_data = &(((uint8_t *) rawBuf(full_data))[1]); // first byte is \n (protobuf) + size_t data_len = 0; + size_t bytes_passed = readUleb128ToUint64(uleb_data, uleb_data + bufLen(full_data) - 1, &data_len); + if (data_len == 0 || (bufLen(full_data) - (bytes_passed + 1)) < data_len) { - if (uleb_encoded_buf[0] == 0x0) - { - LOGE("ProtoBufServer: rejected, invalid data"); - - goto disconnect; - } - - // keep waiting for more data to come + bufferStreamPush(bstream, full_data); destroyContext(c); return; } + if (data_len > kMaxPacketSize) { - LOGE("ProtoBufServer: rejected, size too large %zu (%zu passed %lu left)", data_len, bytes_passed, - bufferStreamLen(bstream)); + LOGE("ProtoBufServer: rejected, size too large"); goto disconnect; } - if (! (bufferStreamLen(bstream) >= 1 + bytes_passed + data_len)) - { - destroyContext(c); - return; - } + context_t *upstream_ctx = newContextFrom(c); - upstream_ctx->payload = bufferStreamRead(cstate->stream_buf, 1 + bytes_passed + data_len); + upstream_ctx->payload = shallowSliceBuffer(full_data, data_len + (bytes_passed + 1)); shiftr(upstream_ctx->payload, 1 + bytes_passed); + + if (bufLen(full_data) > 0) + { + bufferStreamPush(bstream, full_data); + } + else + { + reuseBuffer(getContextBufferPool(c), full_data); + } + if (! cstate->first_sent) { upstream_ctx->first = true; cstate->first_sent = true; } + self->up->upStream(self->up, upstream_ctx); + if (! isAlive(c->line)) { destroyContext(c);