Skip to content

Commit

Permalink
feat: mutualize code with multicast
Browse files Browse the repository at this point in the history
  • Loading branch information
jean-roland committed Nov 28, 2023
1 parent 788eb07 commit 79ecd1d
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 369 deletions.
3 changes: 0 additions & 3 deletions include/zenoh-pico/transport/raweth/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@

#include "zenoh-pico/api/types.h"

void _zp_raweth_fetch_zid(const _z_transport_t *zt, z_owned_closure_zid_t *callback);
void _zp_raweth_info_session(const _z_transport_t *zt, _z_config_t *ps);

int8_t _z_raweth_transport_create(_z_transport_t *zt, _z_link_t *zl, _z_transport_multicast_establish_param_t *param);
int8_t _z_raweth_open_peer(_z_transport_multicast_establish_param_t *param, const _z_link_t *zl,
const _z_id_t *local_zid);
Expand Down
87 changes: 19 additions & 68 deletions src/transport/raweth/read.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "zenoh-pico/config.h"
#include "zenoh-pico/protocol/codec/transport.h"
#include "zenoh-pico/protocol/iobuf.h"
#include "zenoh-pico/transport/multicast/rx.h"
#include "zenoh-pico/transport/raweth/rx.h"
#include "zenoh-pico/transport/unicast/rx.h"
#include "zenoh-pico/utils/logging.h"
Expand All @@ -32,10 +33,9 @@ int8_t _zp_raweth_read(_z_transport_multicast_t *ztm) {
_z_transport_message_t t_msg;
ret = _z_raweth_recv_t_msg(ztm, &t_msg, &addr);
if (ret == _Z_RES_OK) {
ret = _z_raweth_handle_transport_message(ztm, &t_msg, &addr);
ret = _z_multicast_handle_transport_message(ztm, &t_msg, &addr);
_z_t_msg_clear(&t_msg);
}

return ret;
}

Expand All @@ -61,80 +61,31 @@ int8_t _zp_raweth_stop_read_task(_z_transport_t *zt) {
void *_zp_raweth_read_task(void *ztm_arg) {
#if Z_FEATURE_MULTI_THREAD == 1
_z_transport_multicast_t *ztm = (_z_transport_multicast_t *)ztm_arg;
_z_transport_message_t t_msg;
_z_bytes_t addr = _z_bytes_wrap(NULL, 0);

// Acquire and keep the lock
_z_mutex_lock(&ztm->_mutex_rx);

// Prepare the buffer
_z_zbuf_reset(&ztm->_zbuf);

_z_bytes_t addr = _z_bytes_wrap(NULL, 0);
// Task loop
while (ztm->_read_task_running == true) {
// Read bytes from socket to the main buffer
size_t to_read = 0;

switch (ztm->_link._cap._flow) {
case Z_LINK_CAP_FLOW_STREAM:
if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) {
_z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, &addr);
if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) {
_z_bytes_clear(&addr);
_z_zbuf_compact(&ztm->_zbuf);
continue;
}
}

for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) {
to_read |= _z_zbuf_read(&ztm->_zbuf) << (i * (uint8_t)8);
}

if (_z_zbuf_len(&ztm->_zbuf) < to_read) {
_z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, NULL);
if (_z_zbuf_len(&ztm->_zbuf) < to_read) {
_z_zbuf_set_rpos(&ztm->_zbuf, _z_zbuf_get_rpos(&ztm->_zbuf) - _Z_MSG_LEN_ENC_SIZE);
_z_zbuf_compact(&ztm->_zbuf);
continue;
}
}
break;
case Z_LINK_CAP_FLOW_DATAGRAM:
_z_zbuf_compact(&ztm->_zbuf);
to_read = _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, &addr);
if (to_read == SIZE_MAX) {
continue;
}
break;
default:
break;
// Read message from link
int8_t ret = _z_raweth_recv_t_msg(ztm, &t_msg, &addr);
if (ret == _Z_ERR_TRANSPORT_RX_FAILED) {
continue;
} else {
_Z_ERROR("Connection closed due to malformed message\n");
ztm->_read_task_running = false;
continue;
}
// Wrap the main buffer for to_read bytes
_z_zbuf_t zbuf = _z_zbuf_view(&ztm->_zbuf, to_read);

while (_z_zbuf_len(&zbuf) > 0) {
int8_t ret = _Z_RES_OK;

// Decode one session message
_z_transport_message_t t_msg;
ret = _z_transport_message_decode(&t_msg, &zbuf);
if (ret == _Z_RES_OK) {
ret = _z_raweth_handle_transport_message(ztm, &t_msg, &addr);

if (ret == _Z_RES_OK) {
_z_t_msg_clear(&t_msg);
_z_bytes_clear(&addr);
} else {
ztm->_read_task_running = false;
continue;
}
} else {
_Z_ERROR("Connection closed due to malformed message\n");
ztm->_read_task_running = false;
continue;
}
// Process message
if (_z_multicast_handle_transport_message(ztm, &t_msg, &addr) != _Z_RES_OK) {
ztm->_read_task_running = false;
continue;
}

// Move the read position of the read buffer
_z_zbuf_set_rpos(&ztm->_zbuf, _z_zbuf_get_rpos(&ztm->_zbuf) + to_read);
_z_t_msg_clear(&t_msg);
_z_bytes_clear(&addr);
}

_z_mutex_unlock(&ztm->_mutex_rx);
Expand Down
Loading

0 comments on commit 79ecd1d

Please sign in to comment.