diff --git a/include/zenoh-pico/transport/raweth/transport.h b/include/zenoh-pico/transport/raweth/transport.h index 721a2a68d..a91eba18a 100644 --- a/include/zenoh-pico/transport/raweth/transport.h +++ b/include/zenoh-pico/transport/raweth/transport.h @@ -17,9 +17,6 @@ #include "zenoh-pico/api/types.h" -void _zp_raweth_fetch_zid(const _z_transport_t *zt, z_owned_closure_zid_t *callback); -void _zp_raweth_info_session(const _z_transport_t *zt, _z_config_t *ps); - int8_t _z_raweth_transport_create(_z_transport_t *zt, _z_link_t *zl, _z_transport_multicast_establish_param_t *param); int8_t _z_raweth_open_peer(_z_transport_multicast_establish_param_t *param, const _z_link_t *zl, const _z_id_t *local_zid); diff --git a/src/transport/raweth/read.c b/src/transport/raweth/read.c index 5bc727a03..cf4761f56 100644 --- a/src/transport/raweth/read.c +++ b/src/transport/raweth/read.c @@ -19,6 +19,7 @@ #include "zenoh-pico/config.h" #include "zenoh-pico/protocol/codec/transport.h" #include "zenoh-pico/protocol/iobuf.h" +#include "zenoh-pico/transport/multicast/rx.h" #include "zenoh-pico/transport/raweth/rx.h" #include "zenoh-pico/transport/unicast/rx.h" #include "zenoh-pico/utils/logging.h" @@ -32,10 +33,9 @@ int8_t _zp_raweth_read(_z_transport_multicast_t *ztm) { _z_transport_message_t t_msg; ret = _z_raweth_recv_t_msg(ztm, &t_msg, &addr); if (ret == _Z_RES_OK) { - ret = _z_raweth_handle_transport_message(ztm, &t_msg, &addr); + ret = _z_multicast_handle_transport_message(ztm, &t_msg, &addr); _z_t_msg_clear(&t_msg); } - return ret; } @@ -61,80 +61,31 @@ int8_t _zp_raweth_stop_read_task(_z_transport_t *zt) { void *_zp_raweth_read_task(void *ztm_arg) { #if Z_FEATURE_MULTI_THREAD == 1 _z_transport_multicast_t *ztm = (_z_transport_multicast_t *)ztm_arg; + _z_transport_message_t t_msg; + _z_bytes_t addr = _z_bytes_wrap(NULL, 0); // Acquire and keep the lock _z_mutex_lock(&ztm->_mutex_rx); - // Prepare the buffer _z_zbuf_reset(&ztm->_zbuf); - - _z_bytes_t addr = _z_bytes_wrap(NULL, 0); + // Task loop while (ztm->_read_task_running == true) { - // Read bytes from socket to the main buffer - size_t to_read = 0; - - switch (ztm->_link._cap._flow) { - case Z_LINK_CAP_FLOW_STREAM: - if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, &addr); - if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_bytes_clear(&addr); - _z_zbuf_compact(&ztm->_zbuf); - continue; - } - } - - for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { - to_read |= _z_zbuf_read(&ztm->_zbuf) << (i * (uint8_t)8); - } - - if (_z_zbuf_len(&ztm->_zbuf) < to_read) { - _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, NULL); - if (_z_zbuf_len(&ztm->_zbuf) < to_read) { - _z_zbuf_set_rpos(&ztm->_zbuf, _z_zbuf_get_rpos(&ztm->_zbuf) - _Z_MSG_LEN_ENC_SIZE); - _z_zbuf_compact(&ztm->_zbuf); - continue; - } - } - break; - case Z_LINK_CAP_FLOW_DATAGRAM: - _z_zbuf_compact(&ztm->_zbuf); - to_read = _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, &addr); - if (to_read == SIZE_MAX) { - continue; - } - break; - default: - break; + // Read message from link + int8_t ret = _z_raweth_recv_t_msg(ztm, &t_msg, &addr); + if (ret == _Z_ERR_TRANSPORT_RX_FAILED) { + continue; + } else { + _Z_ERROR("Connection closed due to malformed message\n"); + ztm->_read_task_running = false; + continue; } - // Wrap the main buffer for to_read bytes - _z_zbuf_t zbuf = _z_zbuf_view(&ztm->_zbuf, to_read); - - while (_z_zbuf_len(&zbuf) > 0) { - int8_t ret = _Z_RES_OK; - - // Decode one session message - _z_transport_message_t t_msg; - ret = _z_transport_message_decode(&t_msg, &zbuf); - if (ret == _Z_RES_OK) { - ret = _z_raweth_handle_transport_message(ztm, &t_msg, &addr); - - if (ret == _Z_RES_OK) { - _z_t_msg_clear(&t_msg); - _z_bytes_clear(&addr); - } else { - ztm->_read_task_running = false; - continue; - } - } else { - _Z_ERROR("Connection closed due to malformed message\n"); - ztm->_read_task_running = false; - continue; - } + // Process message + if (_z_multicast_handle_transport_message(ztm, &t_msg, &addr) != _Z_RES_OK) { + ztm->_read_task_running = false; + continue; } - - // Move the read position of the read buffer - _z_zbuf_set_rpos(&ztm->_zbuf, _z_zbuf_get_rpos(&ztm->_zbuf) + to_read); + _z_t_msg_clear(&t_msg); + _z_bytes_clear(&addr); } _z_mutex_unlock(&ztm->_mutex_rx); diff --git a/src/transport/raweth/rx.c b/src/transport/raweth/rx.c index 64ddf2ff9..ef189b4f4 100644 --- a/src/transport/raweth/rx.c +++ b/src/transport/raweth/rx.c @@ -32,7 +32,7 @@ static size_t _z_raweth_link_recv_zbuf(const _z_link_t *link, _z_zbuf_t *zbf, _z size_t rb = _z_receive_raweth(&link->_socket._raweth._sock, buff, _z_zbuf_space_left(zbf), addr); // Check validity if ((rb == SIZE_MAX) || (rb < sizeof(_zp_eth_header_t))) { - return rb; + return SIZE_MAX; } // Check if header has vlan _Bool has_vlan = false; @@ -42,7 +42,7 @@ static size_t _z_raweth_link_recv_zbuf(const _z_link_t *link, _z_zbuf_t *zbf, _z } // Check validity if (has_vlan && (rb < sizeof(_zp_eth_vlan_header_t))) { - return rb; + return SIZE_MAX; } // Update buffer but skip eth header _z_zbuf_set_wpos(zbf, _z_zbuf_get_wpos(zbf) + rb); @@ -54,23 +54,6 @@ static size_t _z_raweth_link_recv_zbuf(const _z_link_t *link, _z_zbuf_t *zbf, _z return rb; } -static _z_transport_peer_entry_t *_z_find_peer_entry(_z_transport_peer_entry_list_t *l, _z_bytes_t *remote_addr) { - _z_transport_peer_entry_t *ret = NULL; - - _z_transport_peer_entry_list_t *xs = l; - for (; xs != NULL; xs = _z_transport_peer_entry_list_tail(xs)) { - _z_transport_peer_entry_t *val = _z_transport_peer_entry_list_head(xs); - if (val->_remote_addr.len != remote_addr->len) { - continue; - } - - if (memcmp(val->_remote_addr.start, remote_addr->start, remote_addr->len) == 0) { - ret = val; - } - } - return ret; -} - /*------------------ Reception helper ------------------*/ int8_t _z_raweth_recv_t_msg_na(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_bytes_t *addr) { _Z_DEBUG(">> recv session msg\n"); @@ -81,44 +64,22 @@ int8_t _z_raweth_recv_t_msg_na(_z_transport_multicast_t *ztm, _z_transport_messa _z_mutex_lock(&ztm->_mutex_rx); #endif // Z_FEATURE_MULTI_THREAD == 1 - size_t to_read = 0; - do { - switch (ztm->_link._cap._flow) { - case Z_LINK_CAP_FLOW_STREAM: - if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_raweth_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, addr); - if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_zbuf_compact(&ztm->_zbuf); - ret = _Z_ERR_TRANSPORT_NOT_ENOUGH_BYTES; - break; - } - } - for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { - to_read |= _z_zbuf_read(&ztm->_zbuf) << (i * (uint8_t)8); - } - if (_z_zbuf_len(&ztm->_zbuf) < to_read) { - _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, addr); - if (_z_zbuf_len(&ztm->_zbuf) < to_read) { - _z_zbuf_set_rpos(&ztm->_zbuf, _z_zbuf_get_rpos(&ztm->_zbuf) - _Z_MSG_LEN_ENC_SIZE); - _z_zbuf_compact(&ztm->_zbuf); - ret = _Z_ERR_TRANSPORT_NOT_ENOUGH_BYTES; - break; - } - } - break; - // Datagram capable links - case Z_LINK_CAP_FLOW_DATAGRAM: - _z_zbuf_compact(&ztm->_zbuf); - to_read = _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, addr); - if (to_read == SIZE_MAX) { - ret = _Z_ERR_TRANSPORT_RX_FAILED; - } - break; - default: - break; + switch (ztm->_link._cap._flow) { + // Datagram capable links + case Z_LINK_CAP_FLOW_DATAGRAM: { + _z_zbuf_compact(&ztm->_zbuf); + // Read from link + size_t to_read = _z_raweth_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, addr); + if (to_read == SIZE_MAX) { + ret = _Z_ERR_TRANSPORT_RX_FAILED; + } + break; } - } while (false); // The 1-iteration loop to use continue to break the entire loop on error - + default: + ret = _Z_ERR_GENERIC; + break; + } + // Decode message if (ret == _Z_RES_OK) { _Z_DEBUG(">> \t transport_message_decode: %ju\n", (uintmax_t)_z_zbuf_len(&ztm->_zbuf)); ret = _z_transport_message_decode(t_msg, &ztm->_zbuf); @@ -135,218 +96,6 @@ int8_t _z_raweth_recv_t_msg(_z_transport_multicast_t *ztm, _z_transport_message_ return _z_raweth_recv_t_msg_na(ztm, t_msg, addr); } -int8_t _z_raweth_handle_transport_message(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, - _z_bytes_t *addr) { - int8_t ret = _Z_RES_OK; -#if Z_FEATURE_MULTI_THREAD == 1 - // Acquire and keep the lock - _z_mutex_lock(&ztm->_mutex_peer); -#endif // Z_FEATURE_MULTI_THREAD == 1 - - // Mark the session that we have received data from this peer - _z_transport_peer_entry_t *entry = _z_find_peer_entry(ztm->_peers, addr); - switch (_Z_MID(t_msg->_header)) { - case _Z_MID_T_FRAME: { - _Z_INFO("Received _Z_FRAME message\n"); - if (entry == NULL) { - break; - } - entry->_received = true; - - // Check if the SN is correct - if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAME_R) == true) { - // @TODO: amend once reliability is in place. For the time being only - // monotonic SNs are ensured - if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._reliable, t_msg->_body._frame._sn) == - true) { - entry->_sn_rx_sns._val._plain._reliable = t_msg->_body._frame._sn; - } else { - _z_wbuf_clear(&entry->_dbuf_reliable); - _Z_INFO("Reliable message dropped because it is out of order\n"); - break; - } - } else { - if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._best_effort, - t_msg->_body._frame._sn) == true) { - entry->_sn_rx_sns._val._plain._best_effort = t_msg->_body._frame._sn; - } else { - _z_wbuf_clear(&entry->_dbuf_best_effort); - _Z_INFO("Best effort message dropped because it is out of order\n"); - break; - } - } - - // Handle all the zenoh message, one by one - uint16_t mapping = entry->_peer_id; - size_t len = _z_vec_len(&t_msg->_body._frame._messages); - for (size_t i = 0; i < len; i++) { - _z_network_message_t *zm = _z_network_message_vec_get(&t_msg->_body._frame._messages, i); - _z_msg_fix_mapping(zm, mapping); - _z_handle_network_message(ztm->_session, zm, mapping); - } - - break; - } - - case _Z_MID_T_FRAGMENT: { - _Z_INFO("Received Z_FRAGMENT message\n"); - if (entry == NULL) { - break; - } - entry->_received = true; - - _z_wbuf_t *dbuf = _Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAGMENT_R) - ? &entry->_dbuf_reliable - : &entry->_dbuf_best_effort; // Select the right defragmentation buffer - - _Bool drop = false; - if ((_z_wbuf_len(dbuf) + t_msg->_body._fragment._payload.len) > Z_FRAG_MAX_SIZE) { - // Filling the wbuf capacity as a way to signaling the last fragment to reset the dbuf - // Otherwise, last (smaller) fragments can be understood as a complete message - _z_wbuf_write_bytes(dbuf, t_msg->_body._fragment._payload.start, 0, _z_wbuf_space_left(dbuf)); - drop = true; - } else { - _z_wbuf_write_bytes(dbuf, t_msg->_body._fragment._payload.start, 0, - t_msg->_body._fragment._payload.len); - } - - if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAGMENT_M) == false) { - if (drop == true) { // Drop message if it exceeds the fragmentation size - _z_wbuf_reset(dbuf); - break; - } - - _z_zbuf_t zbf = _z_wbuf_to_zbuf(dbuf); // Convert the defragmentation buffer into a decoding buffer - - _z_zenoh_message_t zm; - ret = _z_network_message_decode(&zm, &zbf); - if (ret == _Z_RES_OK) { - uint16_t mapping = entry->_peer_id; - _z_msg_fix_mapping(&zm, mapping); - _z_handle_network_message(ztm->_session, &zm, mapping); - _z_msg_clear(&zm); // Clear must be explicitly called for fragmented zenoh messages. Non-fragmented - // zenoh messages are released when their transport message is released. - } - - // Free the decoding buffer - _z_zbuf_clear(&zbf); - // Reset the defragmentation buffer - _z_wbuf_reset(dbuf); - } - break; - } - - case _Z_MID_T_KEEP_ALIVE: { - _Z_INFO("Received _Z_KEEP_ALIVE message\n"); - if (entry == NULL) { - break; - } - entry->_received = true; - - break; - } - - case _Z_MID_T_INIT: { - // Do nothing, multicast transports are not expected to handle INIT messages - break; - } - - case _Z_MID_T_OPEN: { - // Do nothing, multicast transports are not expected to handle OPEN messages - break; - } - - case _Z_MID_T_JOIN: { - _Z_INFO("Received _Z_JOIN message\n"); - if (t_msg->_body._join._version != Z_PROTO_VERSION) { - break; - } - - if (entry == NULL) // New peer - { - entry = (_z_transport_peer_entry_t *)z_malloc(sizeof(_z_transport_peer_entry_t)); - if (entry != NULL) { - entry->_sn_res = _z_sn_max(t_msg->_body._join._seq_num_res); - - // If the new node has less representing capabilities then it is incompatible to communication - if ((t_msg->_body._join._seq_num_res != Z_SN_RESOLUTION) || - (t_msg->_body._join._req_id_res != Z_REQ_RESOLUTION) || - (t_msg->_body._join._batch_size != Z_BATCH_MULTICAST_SIZE)) { - ret = _Z_ERR_TRANSPORT_OPEN_SN_RESOLUTION; - } - - if (ret == _Z_RES_OK) { - entry->_remote_addr = _z_bytes_duplicate(addr); - entry->_remote_zid = t_msg->_body._join._zid; - - _z_conduit_sn_list_copy(&entry->_sn_rx_sns, &t_msg->_body._join._next_sn); - _z_conduit_sn_list_decrement(entry->_sn_res, &entry->_sn_rx_sns); - -#if Z_FEATURE_DYNAMIC_MEMORY_ALLOCATION == 1 - entry->_dbuf_reliable = _z_wbuf_make(0, true); - entry->_dbuf_best_effort = _z_wbuf_make(0, true); -#else - entry->_dbuf_reliable = _z_wbuf_make(Z_FRAG_MAX_SIZE, false); - entry->_dbuf_best_effort = _z_wbuf_make(Z_FRAG_MAX_SIZE, false); -#endif - - // Update lease time (set as ms during) - entry->_lease = t_msg->_body._join._lease; - entry->_next_lease = entry->_lease; - entry->_received = true; - - ztm->_peers = _z_transport_peer_entry_list_insert(ztm->_peers, entry); - } else { - z_free(entry); - } - } else { - ret = _Z_ERR_SYSTEM_OUT_OF_MEMORY; - } - } else { // Existing peer - entry->_received = true; - - // Check if the representing capabilities are still the same - if ((t_msg->_body._join._seq_num_res != Z_SN_RESOLUTION) || - (t_msg->_body._join._req_id_res != Z_REQ_RESOLUTION) || - (t_msg->_body._join._batch_size != Z_BATCH_MULTICAST_SIZE)) { - _z_transport_peer_entry_list_drop_filter(ztm->_peers, _z_transport_peer_entry_eq, entry); - // TODO: cleanup here should also be done on mappings/subs/etc... - break; - } - - // Update SNs - _z_conduit_sn_list_copy(&entry->_sn_rx_sns, &t_msg->_body._join._next_sn); - _z_conduit_sn_list_decrement(entry->_sn_res, &entry->_sn_rx_sns); - - // Update lease time (set as ms during) - entry->_lease = t_msg->_body._join._lease; - } - break; - } - - case _Z_MID_T_CLOSE: { - _Z_INFO("Closing session as requested by the remote peer\n"); - - if (entry == NULL) { - break; - } - ztm->_peers = _z_transport_peer_entry_list_drop_filter(ztm->_peers, _z_transport_peer_entry_eq, entry); - - break; - } - - default: { - _Z_ERROR("Unknown session message ID\n"); - break; - } - } - -#if Z_FEATURE_MULTI_THREAD == 1 - _z_mutex_unlock(&ztm->_mutex_peer); -#endif // Z_FEATURE_MULTI_THREAD == 1 - - return ret; -} #else int8_t _z_raweth_recv_t_msg(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_bytes_t *addr) { _ZP_UNUSED(ztm); @@ -354,12 +103,4 @@ int8_t _z_raweth_recv_t_msg(_z_transport_multicast_t *ztm, _z_transport_message_ _ZP_UNUSED(addr); return _Z_ERR_TRANSPORT_NOT_AVAILABLE; } - -int8_t _z_raweth_handle_transport_message(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, - _z_bytes_t *addr) { - _ZP_UNUSED(ztm); - _ZP_UNUSED(t_msg); - _ZP_UNUSED(addr); - return _Z_ERR_TRANSPORT_NOT_AVAILABLE; -} #endif // Z_FEATURE_RAWETH_TRANSPORT == 1 diff --git a/src/transport/raweth/transport.c b/src/transport/raweth/transport.c index 4774514ea..21735dc7d 100644 --- a/src/transport/raweth/transport.c +++ b/src/transport/raweth/transport.c @@ -30,28 +30,6 @@ #if Z_FEATURE_RAWETH_TRANSPORT == 1 -void _zp_raweth_fetch_zid(const _z_transport_t *zt, z_owned_closure_zid_t *callback) { - void *ctx = callback->context; - _z_transport_peer_entry_list_t *l = zt->_transport._multicast._peers; - for (; l != NULL; l = _z_transport_peer_entry_list_tail(l)) { - _z_transport_peer_entry_t *val = _z_transport_peer_entry_list_head(l); - z_id_t id = val->_remote_zid; - - callback->call(&id, ctx); - } -} - -void _zp_raweth_info_session(const _z_transport_t *zt, _z_config_t *ps) { - _z_transport_peer_entry_list_t *xs = zt->_transport._multicast._peers; - while (xs != NULL) { - _z_transport_peer_entry_t *peer = _z_transport_peer_entry_list_head(xs); - _z_bytes_t remote_zid = _z_bytes_wrap(peer->_remote_zid.id, _z_id_len(peer->_remote_zid)); - _zp_config_insert(ps, Z_INFO_PEER_PID_KEY, _z_string_from_bytes(&remote_zid)); - - xs = _z_transport_peer_entry_list_tail(xs); - } -} - int8_t _z_raweth_transport_create(_z_transport_t *zt, _z_link_t *zl, _z_transport_multicast_establish_param_t *param) { int8_t ret = _Z_RES_OK;