From 528d89e9afecac7fead641f8b8d114e863b8a48b Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Tue, 23 Jan 2024 17:24:39 +0100 Subject: [PATCH] feat: add fragmentation feature token --- include/zenoh-pico/config.h | 7 ++++++ include/zenoh-pico/transport/transport.h | 12 ++++++++--- src/transport/multicast/rx.c | 10 +++++++++ src/transport/multicast/tx.c | 5 ++++- src/transport/peer_entry.c | 4 ++++ src/transport/raweth/tx.c | 4 ++++ src/transport/unicast/rx.c | 9 ++++++++ src/transport/unicast/transport.c | 27 ++++++++++++++++++++---- src/transport/unicast/tx.c | 4 ++++ 9 files changed, 74 insertions(+), 8 deletions(-) diff --git a/include/zenoh-pico/config.h b/include/zenoh-pico/config.h index 084981a3e..b10fa06dd 100644 --- a/include/zenoh-pico/config.h +++ b/include/zenoh-pico/config.h @@ -225,6 +225,13 @@ #define Z_FEATURE_RAWETH_TRANSPORT 0 #endif +/** + * Enable message fragmentation. + */ +#ifndef Z_FEATURE_FRAGMENTATION +#define Z_FEATURE_FRAGMENTATION 1 +#endif + /*------------------ Compile-time configuration properties ------------------*/ /** * Default length for Zenoh ID. Maximum size is 16 bytes. diff --git a/include/zenoh-pico/transport/transport.h b/include/zenoh-pico/transport/transport.h index e708b7e82..313f77268 100644 --- a/include/zenoh-pico/transport/transport.h +++ b/include/zenoh-pico/transport/transport.h @@ -26,9 +26,11 @@ #include "zenoh-pico/protocol/definitions/transport.h" typedef struct { +#if Z_FEATURE_FRAGMENTATION == 1 // Defragmentation buffers _z_wbuf_t _dbuf_reliable; _z_wbuf_t _dbuf_best_effort; +#endif _z_id_t _remote_zid; _z_bytes_t _remote_addr; @@ -73,9 +75,13 @@ typedef struct { _z_link_t _link; - // Buffers - _z_wbuf_t _dbuf_reliable; // Defragmentation buffer - _z_wbuf_t _dbuf_best_effort; // Defragmentation buffer +#if Z_FEATURE_FRAGMENTATION == 1 + // Defragmentation buffer + _z_wbuf_t _dbuf_reliable; + _z_wbuf_t _dbuf_best_effort; +#endif + + // Regular Buffers _z_wbuf_t _wbuf; _z_zbuf_t _zbuf; diff --git a/src/transport/multicast/rx.c b/src/transport/multicast/rx.c index 1d0fc2a75..0f1d9a1a6 100644 --- a/src/transport/multicast/rx.c +++ b/src/transport/multicast/rx.c @@ -146,7 +146,9 @@ int8_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_t true) { entry->_sn_rx_sns._val._plain._reliable = t_msg->_body._frame._sn; } else { +#if Z_FEATURE_FRAGMENTATION == 1 _z_wbuf_clear(&entry->_dbuf_reliable); +#endif _Z_INFO("Reliable message dropped because it is out of order"); break; } @@ -155,7 +157,9 @@ int8_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_t t_msg->_body._frame._sn) == true) { entry->_sn_rx_sns._val._plain._best_effort = t_msg->_body._frame._sn; } else { +#if Z_FEATURE_FRAGMENTATION == 1 _z_wbuf_clear(&entry->_dbuf_best_effort); +#endif _Z_INFO("Best effort message dropped because it is out of order"); break; } @@ -175,6 +179,7 @@ int8_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_t case _Z_MID_T_FRAGMENT: { _Z_INFO("Received Z_FRAGMENT message"); +#if Z_FEATURE_FRAGMENTATION == 1 if (entry == NULL) { break; } @@ -218,6 +223,9 @@ int8_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_t // Reset the defragmentation buffer _z_wbuf_reset(dbuf); } +#else + _Z_INFO("Fragment dropped because fragmentation feature is deactivated"); +#endif break; } @@ -267,12 +275,14 @@ int8_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_t _z_conduit_sn_list_copy(&entry->_sn_rx_sns, &t_msg->_body._join._next_sn); _z_conduit_sn_list_decrement(entry->_sn_res, &entry->_sn_rx_sns); +#if Z_FEATURE_FRAGMENTATION == 1 #if Z_FEATURE_DYNAMIC_MEMORY_ALLOCATION == 1 entry->_dbuf_reliable = _z_wbuf_make(0, true); entry->_dbuf_best_effort = _z_wbuf_make(0, true); #else entry->_dbuf_reliable = _z_wbuf_make(Z_FRAG_MAX_SIZE, false); entry->_dbuf_best_effort = _z_wbuf_make(Z_FRAG_MAX_SIZE, false); +#endif #endif // Update lease time (set as ms during) diff --git a/src/transport/multicast/tx.c b/src/transport/multicast/tx.c index 637a206f7..18aefa3e4 100644 --- a/src/transport/multicast/tx.c +++ b/src/transport/multicast/tx.c @@ -114,6 +114,7 @@ int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_m ztm->_transmitted = true; // Mark the session that we have transmitted data } } else { +#if Z_FEATURE_FRAGMENTATION == 1 // The message does not fit in the current batch, let's fragment it // Create an expandable wbuf for fragmentation _z_wbuf_t fbf = _z_wbuf_make(_Z_FRAG_BUFF_BASE_SIZE, true); @@ -143,9 +144,11 @@ int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_m } } } - // Clear the buffer as it's no longer required _z_wbuf_clear(&fbf); +#else + _Z_INFO("Sending the message required fragmentation feature that is deactivated."); +#endif } } diff --git a/src/transport/peer_entry.c b/src/transport/peer_entry.c index eebb7508f..599660fcc 100644 --- a/src/transport/peer_entry.c +++ b/src/transport/peer_entry.c @@ -17,16 +17,20 @@ #include "zenoh-pico/transport/utils.h" void _z_transport_peer_entry_clear(_z_transport_peer_entry_t *src) { +#if Z_FEATURE_FRAGMENTATION == 1 _z_wbuf_clear(&src->_dbuf_reliable); _z_wbuf_clear(&src->_dbuf_best_effort); +#endif src->_remote_zid = _z_id_empty(); _z_bytes_clear(&src->_remote_addr); } void _z_transport_peer_entry_copy(_z_transport_peer_entry_t *dst, const _z_transport_peer_entry_t *src) { +#if Z_FEATURE_FRAGMENTATION == 1 _z_wbuf_copy(&dst->_dbuf_reliable, &src->_dbuf_reliable); _z_wbuf_copy(&dst->_dbuf_best_effort, &src->_dbuf_best_effort); +#endif dst->_sn_res = src->_sn_res; _z_conduit_sn_list_copy(&dst->_sn_rx_sns, &src->_sn_rx_sns); diff --git a/src/transport/raweth/tx.c b/src/transport/raweth/tx.c index 4ed64452c..ac360cc9e 100644 --- a/src/transport/raweth/tx.c +++ b/src/transport/raweth/tx.c @@ -278,6 +278,7 @@ int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg, // Mark the session that we have transmitted data ztm->_transmitted = true; } else { // The message does not fit in the current batch, let's fragment it +#if Z_FEATURE_FRAGMENTATION == 1 // Create an expandable wbuf for fragmentation _z_wbuf_t fbf = _z_wbuf_make(_Z_FRAG_BUFF_BASE_SIZE, true); // Encode the message on the expandable wbuf @@ -307,6 +308,9 @@ int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg, } // Clear the expandable buffer _z_wbuf_clear(&fbf); +#else + _Z_INFO("Sending the message required fragmentation feature that is deactivated."); +#endif } #if Z_FEATURE_MULTI_THREAD == 1 zp_mutex_unlock(&ztm->_mutex_tx); diff --git a/src/transport/unicast/rx.c b/src/transport/unicast/rx.c index 1ae4c3f74..3ed08a0a6 100644 --- a/src/transport/unicast/rx.c +++ b/src/transport/unicast/rx.c @@ -108,7 +108,9 @@ int8_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_trans if (_z_sn_precedes(ztu->_sn_res, ztu->_sn_rx_reliable, t_msg->_body._frame._sn) == true) { ztu->_sn_rx_reliable = t_msg->_body._frame._sn; } else { +#if Z_FEATURE_FRAGMENTATION == 1 _z_wbuf_clear(&ztu->_dbuf_reliable); +#endif _Z_INFO("Reliable message dropped because it is out of order"); break; } @@ -116,7 +118,9 @@ int8_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_trans if (_z_sn_precedes(ztu->_sn_res, ztu->_sn_rx_best_effort, t_msg->_body._frame._sn) == true) { ztu->_sn_rx_best_effort = t_msg->_body._frame._sn; } else { +#if Z_FEATURE_FRAGMENTATION == 1 _z_wbuf_clear(&ztu->_dbuf_best_effort); +#endif _Z_INFO("Best effort message dropped because it is out of order"); break; } @@ -134,6 +138,8 @@ int8_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_trans } case _Z_MID_T_FRAGMENT: { + _Z_INFO("Received Z_FRAGMENT message"); +#if Z_FEATURE_FRAGMENTATION == 1 _z_wbuf_t *dbuf = _Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAGMENT_R) ? &ztu->_dbuf_reliable : &ztu->_dbuf_best_effort; // Select the right defragmentation buffer @@ -172,6 +178,9 @@ int8_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_trans // Reset the defragmentation buffer _z_wbuf_reset(dbuf); } +#else + _Z_INFO("Fragment dropped because fragmentation feature is deactivated"); +#endif break; } diff --git a/src/transport/unicast/transport.c b/src/transport/unicast/transport.c index b6375a4d7..3dd7b49a4 100644 --- a/src/transport/unicast/transport.c +++ b/src/transport/unicast/transport.c @@ -74,17 +74,32 @@ int8_t _z_unicast_transport_create(_z_transport_t *zt, _z_link_t *zl, _z_transpo expandable = false; dbuf_size = Z_FRAG_MAX_SIZE; #endif + // Initialize tx rx buffers zt->_transport._unicast._wbuf = _z_wbuf_make(wbuf_size, false); zt->_transport._unicast._zbuf = _z_zbuf_make(zbuf_size); + // Clean up the buffers if one of them failed to be allocated + if ((_z_wbuf_capacity(&zt->_transport._unicast._wbuf) != wbuf_size) || + (_z_zbuf_capacity(&zt->_transport._unicast._zbuf) != zbuf_size)) { + ret = _Z_ERR_SYSTEM_OUT_OF_MEMORY; + +#if Z_FEATURE_MULTI_THREAD == 1 + zp_mutex_free(&zt->_transport._unicast._mutex_tx); + zp_mutex_free(&zt->_transport._unicast._mutex_rx); +#endif // Z_FEATURE_MULTI_THREAD == 1 + + _z_wbuf_clear(&zt->_transport._unicast._wbuf); + _z_zbuf_clear(&zt->_transport._unicast._zbuf); + } + +#if Z_FEATURE_FRAGMENTATION == 1 // Initialize the defragmentation buffers zt->_transport._unicast._dbuf_reliable = _z_wbuf_make(dbuf_size, expandable); zt->_transport._unicast._dbuf_best_effort = _z_wbuf_make(dbuf_size, expandable); // Clean up the buffers if one of them failed to be allocated - if ((_z_wbuf_capacity(&zt->_transport._unicast._wbuf) != wbuf_size) || - (_z_zbuf_capacity(&zt->_transport._unicast._zbuf) != zbuf_size) || + if ( #if Z_FEATURE_DYNAMIC_MEMORY_ALLOCATION == 0 (_z_wbuf_capacity(&zt->_transport._unicast._dbuf_reliable) != dbuf_size) || (_z_wbuf_capacity(&zt->_transport._unicast._dbuf_best_effort) != dbuf_size)) { @@ -94,6 +109,9 @@ int8_t _z_unicast_transport_create(_z_transport_t *zt, _z_link_t *zl, _z_transpo #endif ret = _Z_ERR_SYSTEM_OUT_OF_MEMORY; + _z_wbuf_clear(&zt->_transport._unicast._dbuf_reliable); + _z_wbuf_clear(&zt->_transport._unicast._dbuf_best_effort); + #if Z_FEATURE_MULTI_THREAD == 1 zp_mutex_free(&zt->_transport._unicast._mutex_tx); zp_mutex_free(&zt->_transport._unicast._mutex_rx); @@ -101,9 +119,8 @@ int8_t _z_unicast_transport_create(_z_transport_t *zt, _z_link_t *zl, _z_transpo _z_wbuf_clear(&zt->_transport._unicast._wbuf); _z_zbuf_clear(&zt->_transport._unicast._zbuf); - _z_wbuf_clear(&zt->_transport._unicast._dbuf_reliable); - _z_wbuf_clear(&zt->_transport._unicast._dbuf_best_effort); } +#endif } if (ret == _Z_RES_OK) { @@ -284,8 +301,10 @@ void _z_unicast_transport_clear(_z_transport_t *zt) { // Clean up the buffers _z_wbuf_clear(&ztu->_wbuf); _z_zbuf_clear(&ztu->_zbuf); +#if Z_FEATURE_FRAGMENTATION == 1 _z_wbuf_clear(&ztu->_dbuf_reliable); _z_wbuf_clear(&ztu->_dbuf_best_effort); +#endif // Clean up PIDs ztu->_remote_zid = _z_id_empty(); diff --git a/src/transport/unicast/tx.c b/src/transport/unicast/tx.c index 52d52b640..b89b7c0e9 100644 --- a/src/transport/unicast/tx.c +++ b/src/transport/unicast/tx.c @@ -123,6 +123,7 @@ int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg ztu->_transmitted = true; // Mark the session that we have transmitted data } } else { +#if Z_FEATURE_FRAGMENTATION == 1 // The message does not fit in the current batch, let's fragment it // Create an expandable wbuf for fragmentation _z_wbuf_t fbf = _z_wbuf_make(_Z_FRAG_BUFF_BASE_SIZE, true); @@ -155,6 +156,9 @@ int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg // Clear the buffer as it's no longer required _z_wbuf_clear(&fbf); +#else + _Z_INFO("Sending the message required fragmentation feature that is deactivated."); +#endif } }