From 6e7a3c973a71a3b8cb9ef8a25de7e0dc148d498a Mon Sep 17 00:00:00 2001 From: Jean-Roland Gosse Date: Fri, 17 Nov 2023 15:09:09 +0100 Subject: [PATCH] feat: update transport files --- include/zenoh-pico/transport/common/tx.h | 4 +-- src/transport/common/rx.c | 10 ++---- src/transport/common/tx.c | 40 +++++++++--------------- src/transport/manager.c | 18 ++++------- src/transport/multicast/read.c | 10 ++---- src/transport/multicast/rx.c | 9 ++---- src/transport/multicast/tx.c | 12 +++---- src/transport/unicast/read.c | 10 ++---- src/transport/unicast/rx.c | 8 ++--- src/transport/unicast/transport.c | 8 ++--- src/transport/unicast/tx.c | 12 +++---- 11 files changed, 52 insertions(+), 89 deletions(-) diff --git a/include/zenoh-pico/transport/common/tx.h b/include/zenoh-pico/transport/common/tx.h index 634b38e2e..d11c1c80e 100644 --- a/include/zenoh-pico/transport/common/tx.h +++ b/include/zenoh-pico/transport/common/tx.h @@ -19,8 +19,8 @@ #include "zenoh-pico/net/session.h" #include "zenoh-pico/transport/transport.h" -void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, uint8_t link_capabilities); -void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_capabilities); +void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, _z_link_cap_flow_t flow); +void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, _z_link_cap_flow_t flow); /*This function is unsafe because it operates in potentially concurrent data.*Make sure that the following mutexes are locked before calling this function : *-ztu->mutex_tx */ int8_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn); diff --git a/src/transport/common/rx.c b/src/transport/common/rx.c index 2476a9b9a..8f25fff0f 100644 --- a/src/transport/common/rx.c +++ b/src/transport/common/rx.c @@ -28,10 +28,8 @@ int8_t _z_link_recv_t_msg(_z_transport_message_t *t_msg, const _z_link_t *zl) { _z_zbuf_t zbf = _z_zbuf_make(Z_BATCH_UNICAST_SIZE); _z_zbuf_reset(&zbf); - switch (zl->_capabilities) { - // Stream capable links - case Z_LINK_CAP_UNICAST_STREAM: - case Z_LINK_CAP_MULTICAST_STREAM: + switch (zl->_cap._flow) { + case Z_LINK_CAP_FLOW_STREAM: // Read the message length if (_z_link_recv_exact_zbuf(zl, &zbf, _Z_MSG_LEN_ENC_SIZE, NULL) == _Z_MSG_LEN_ENC_SIZE) { size_t len = 0; @@ -52,9 +50,7 @@ int8_t _z_link_recv_t_msg(_z_transport_message_t *t_msg, const _z_link_t *zl) { ret = _Z_ERR_TRANSPORT_RX_FAILED; } break; - // Datagram capable links - case Z_LINK_CAP_UNICAST_DATAGRAM: - case Z_LINK_CAP_MULTICAST_DATAGRAM: + case Z_LINK_CAP_FLOW_DATAGRAM: if (_z_link_recv_zbuf(zl, &zbf, NULL) == SIZE_MAX) { ret = _Z_ERR_TRANSPORT_RX_FAILED; } diff --git a/src/transport/common/tx.c b/src/transport/common/tx.c index ca17089c3..12fc69e24 100644 --- a/src/transport/common/tx.c +++ b/src/transport/common/tx.c @@ -27,21 +27,19 @@ * Make sure that the following mutexes are locked before calling this function: * - ztu->mutex_tx */ -void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, uint8_t link_capabilities) { +void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, _z_link_cap_flow_t flow) { _z_wbuf_reset(buf); - switch (link_capabilities) { + switch (flow) { // Stream capable links - case Z_LINK_CAP_UNICAST_STREAM: - case Z_LINK_CAP_MULTICAST_STREAM: + case Z_LINK_CAP_FLOW_STREAM: for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { _z_wbuf_put(buf, 0, i); } _z_wbuf_set_wpos(buf, _Z_MSG_LEN_ENC_SIZE); break; // Datagram capable links - case Z_LINK_CAP_UNICAST_DATAGRAM: - case Z_LINK_CAP_MULTICAST_DATAGRAM: + case Z_LINK_CAP_FLOW_DATAGRAM: default: break; } @@ -52,11 +50,10 @@ void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, uint8_t link_capabilities) { * Make sure that the following mutexes are locked before calling this function: * - ztu->mutex_tx */ -void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_capabilities) { - switch (link_capabilities) { +void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, _z_link_cap_flow_t flow) { + switch (flow) { // Stream capable links - case Z_LINK_CAP_UNICAST_STREAM: - case Z_LINK_CAP_MULTICAST_STREAM: { + case Z_LINK_CAP_FLOW_STREAM: { size_t len = _z_wbuf_len(buf) - _Z_MSG_LEN_ENC_SIZE; for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { _z_wbuf_put(buf, (uint8_t)((len >> (uint8_t)8 * i) & (uint8_t)0xFF), i); @@ -64,8 +61,7 @@ void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_capabilities) { break; } // Datagram capable links - case Z_LINK_CAP_UNICAST_DATAGRAM: - case Z_LINK_CAP_MULTICAST_DATAGRAM: + case Z_LINK_CAP_FLOW_DATAGRAM: default: break; } @@ -94,18 +90,14 @@ int8_t _z_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t *t_m uint16_t mtu = (zl->_mtu < Z_BATCH_UNICAST_SIZE) ? zl->_mtu : Z_BATCH_UNICAST_SIZE; _z_wbuf_t wbf = _z_wbuf_make(mtu, false); - switch (zl->_capabilities) { - // Stream capable links - case Z_LINK_CAP_UNICAST_STREAM: - case Z_LINK_CAP_MULTICAST_STREAM: + switch (zl->_cap._flow) { + case Z_LINK_CAP_FLOW_STREAM: for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { _z_wbuf_put(&wbf, 0, i); } _z_wbuf_set_wpos(&wbf, _Z_MSG_LEN_ENC_SIZE); break; - // Datagram capable links - case Z_LINK_CAP_UNICAST_DATAGRAM: - case Z_LINK_CAP_MULTICAST_DATAGRAM: + case Z_LINK_CAP_FLOW_DATAGRAM: break; default: ret = _Z_ERR_GENERIC; @@ -114,10 +106,8 @@ int8_t _z_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t *t_m // Encode the session message ret = _z_transport_message_encode(&wbf, t_msg); if (ret == _Z_RES_OK) { - switch (zl->_capabilities) { - // Stream capable links - case Z_LINK_CAP_UNICAST_STREAM: - case Z_LINK_CAP_MULTICAST_STREAM: { + switch (zl->_cap._flow) { + case Z_LINK_CAP_FLOW_STREAM: { // Write the message length in the reserved space if needed size_t len = _z_wbuf_len(&wbf) - _Z_MSG_LEN_ENC_SIZE; for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { @@ -125,9 +115,7 @@ int8_t _z_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t *t_m } break; } - // Datagram capable links - case Z_LINK_CAP_UNICAST_DATAGRAM: - case Z_LINK_CAP_MULTICAST_DATAGRAM: + case Z_LINK_CAP_FLOW_DATAGRAM: break; default: ret = _Z_ERR_GENERIC; diff --git a/src/transport/manager.c b/src/transport/manager.c index bc85a8dbc..7d8f79811 100644 --- a/src/transport/manager.c +++ b/src/transport/manager.c @@ -31,10 +31,9 @@ int8_t _z_new_transport_client(_z_transport_t *zt, char *locator, _z_id_t *local return ret; } // Open transport - switch (zl._capabilities) { + switch (zl._cap._transport) { // Unicast transport - case Z_LINK_CAP_UNICAST_STREAM: - case Z_LINK_CAP_UNICAST_DATAGRAM: { + case Z_LINK_CAP_TRANSPORT_UNICAST: { _z_transport_unicast_establish_param_t tp_param; ret = _z_unicast_open_client(&tp_param, &zl, local_zid); if (ret != _Z_RES_OK) { @@ -45,8 +44,7 @@ int8_t _z_new_transport_client(_z_transport_t *zt, char *locator, _z_id_t *local break; } // Multicast transport - case Z_LINK_CAP_MULTICAST_STREAM: - case Z_LINK_CAP_MULTICAST_DATAGRAM: { + case Z_LINK_CAP_TRANSPORT_MULTICAST: { _z_transport_multicast_establish_param_t tp_param; ret = _z_multicast_open_client(&tp_param, &zl, local_zid); if (ret != _Z_RES_OK) { @@ -73,10 +71,8 @@ int8_t _z_new_transport_peer(_z_transport_t *zt, char *locator, _z_id_t *local_z if (ret != _Z_RES_OK) { return ret; } - switch (zl._capabilities) { - // Unicast capable links - case Z_LINK_CAP_UNICAST_STREAM: - case Z_LINK_CAP_UNICAST_DATAGRAM: { + switch (zl._cap._transport) { + case Z_LINK_CAP_TRANSPORT_UNICAST: { _z_transport_unicast_establish_param_t tp_param; ret = _z_unicast_open_peer(&tp_param, &zl, local_zid); if (ret != _Z_RES_OK) { @@ -86,9 +82,7 @@ int8_t _z_new_transport_peer(_z_transport_t *zt, char *locator, _z_id_t *local_z ret = _z_unicast_transport_create(zt, &zl, &tp_param); break; } - // Multicast capable links - case Z_LINK_CAP_MULTICAST_STREAM: - case Z_LINK_CAP_MULTICAST_DATAGRAM: { + case Z_LINK_CAP_TRANSPORT_MULTICAST: { _z_transport_multicast_establish_param_t tp_param; ret = _z_multicast_open_peer(&tp_param, &zl, local_zid); if (ret != _Z_RES_OK) { diff --git a/src/transport/multicast/read.c b/src/transport/multicast/read.c index ce221fe64..12073d0ce 100644 --- a/src/transport/multicast/read.c +++ b/src/transport/multicast/read.c @@ -73,10 +73,8 @@ void *_zp_multicast_read_task(void *ztm_arg) { // Read bytes from socket to the main buffer size_t to_read = 0; - switch (ztm->_link._capabilities) { - // Stream capable links - case Z_LINK_CAP_UNICAST_STREAM: - case Z_LINK_CAP_MULTICAST_STREAM: + switch (ztm->_link._cap._flow) { + case Z_LINK_CAP_FLOW_STREAM: if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, &addr); if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { @@ -99,9 +97,7 @@ void *_zp_multicast_read_task(void *ztm_arg) { } } break; - // Datagram capable links - case Z_LINK_CAP_UNICAST_DATAGRAM: - case Z_LINK_CAP_MULTICAST_DATAGRAM: + case Z_LINK_CAP_FLOW_DATAGRAM: _z_zbuf_compact(&ztm->_zbuf); to_read = _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, &addr); if (to_read == SIZE_MAX) { diff --git a/src/transport/multicast/rx.c b/src/transport/multicast/rx.c index 0893b4307..5d43eb60c 100644 --- a/src/transport/multicast/rx.c +++ b/src/transport/multicast/rx.c @@ -59,10 +59,8 @@ int8_t _z_multicast_recv_t_msg_na(_z_transport_multicast_t *ztm, _z_transport_me size_t to_read = 0; do { - switch (ztm->_link._capabilities) { - // Stream capable links - case Z_LINK_CAP_UNICAST_STREAM: - case Z_LINK_CAP_MULTICAST_STREAM: + switch (ztm->_link._cap._flow) { + case Z_LINK_CAP_FLOW_STREAM: if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, addr); if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { @@ -85,8 +83,7 @@ int8_t _z_multicast_recv_t_msg_na(_z_transport_multicast_t *ztm, _z_transport_me } break; // Datagram capable links - case Z_LINK_CAP_UNICAST_DATAGRAM: - case Z_LINK_CAP_MULTICAST_DATAGRAM: + case Z_LINK_CAP_FLOW_DATAGRAM: _z_zbuf_compact(&ztm->_zbuf); to_read = _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, addr); if (to_read == SIZE_MAX) { diff --git a/src/transport/multicast/tx.c b/src/transport/multicast/tx.c index 94ff3c333..230c8c85a 100644 --- a/src/transport/multicast/tx.c +++ b/src/transport/multicast/tx.c @@ -50,13 +50,13 @@ int8_t _z_multicast_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport #endif // Z_FEATURE_MULTI_THREAD == 1 // Prepare the buffer eventually reserving space for the message length - __unsafe_z_prepare_wbuf(&ztm->_wbuf, ztm->_link._capabilities); + __unsafe_z_prepare_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); // Encode the session message ret = _z_transport_message_encode(&ztm->_wbuf, t_msg); if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed - __unsafe_z_finalize_wbuf(&ztm->_wbuf, ztm->_link._capabilities); + __unsafe_z_finalize_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); // Send the wbuf on the socket ret = _z_link_send_wbuf(&ztm->_link, &ztm->_wbuf); if (ret == _Z_RES_OK) { @@ -97,7 +97,7 @@ int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_m if (drop == false) { // Prepare the buffer eventually reserving space for the message length - __unsafe_z_prepare_wbuf(&ztm->_wbuf, ztm->_link._capabilities); + __unsafe_z_prepare_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); _z_zint_t sn = __unsafe_z_multicast_get_sn(ztm, reliability); // Get the next sequence number @@ -107,7 +107,7 @@ int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_m ret = _z_network_message_encode(&ztm->_wbuf, n_msg); // Encode the network message if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed - __unsafe_z_finalize_wbuf(&ztm->_wbuf, ztm->_link._capabilities); + __unsafe_z_finalize_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); ret = _z_link_send_wbuf(&ztm->_link, &ztm->_wbuf); // Send the wbuf on the socket if (ret == _Z_RES_OK) { @@ -128,13 +128,13 @@ int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_m is_first = false; // Clear the buffer for serialization - __unsafe_z_prepare_wbuf(&ztm->_wbuf, ztm->_link._capabilities); + __unsafe_z_prepare_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); // Serialize one fragment ret = __unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn); if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed - __unsafe_z_finalize_wbuf(&ztm->_wbuf, ztm->_link._capabilities); + __unsafe_z_finalize_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); ret = _z_link_send_wbuf(&ztm->_link, &ztm->_wbuf); // Send the wbuf on the socket if (ret == _Z_RES_OK) { diff --git a/src/transport/unicast/read.c b/src/transport/unicast/read.c index d3fcbbdad..02b95ddef 100644 --- a/src/transport/unicast/read.c +++ b/src/transport/unicast/read.c @@ -68,10 +68,8 @@ void *_zp_unicast_read_task(void *ztu_arg) { while (ztu->_read_task_running == true) { // Read bytes from socket to the main buffer size_t to_read = 0; - switch (ztu->_link._capabilities) { - // Stream capable links - case Z_LINK_CAP_UNICAST_STREAM: - case Z_LINK_CAP_MULTICAST_STREAM: + switch (ztu->_link._cap._flow) { + case Z_LINK_CAP_FLOW_STREAM: if (_z_zbuf_len(&ztu->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); if (_z_zbuf_len(&ztu->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { @@ -93,9 +91,7 @@ void *_zp_unicast_read_task(void *ztu_arg) { } } break; - // Datagram capable links - case Z_LINK_CAP_UNICAST_DATAGRAM: - case Z_LINK_CAP_MULTICAST_DATAGRAM: + case Z_LINK_CAP_FLOW_DATAGRAM: _z_zbuf_compact(&ztu->_zbuf); to_read = _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); if (to_read == SIZE_MAX) { diff --git a/src/transport/unicast/rx.c b/src/transport/unicast/rx.c index c5ee7748e..c1e3faa2d 100644 --- a/src/transport/unicast/rx.c +++ b/src/transport/unicast/rx.c @@ -37,10 +37,9 @@ int8_t _z_unicast_recv_t_msg_na(_z_transport_unicast_t *ztu, _z_transport_messag size_t to_read = 0; do { - switch (ztu->_link._capabilities) { + switch (ztu->_link._cap._flow) { // Stream capable links - case Z_LINK_CAP_UNICAST_STREAM: - case Z_LINK_CAP_MULTICAST_STREAM: + case Z_LINK_CAP_FLOW_STREAM: if (_z_zbuf_len(&ztu->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); if (_z_zbuf_len(&ztu->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { @@ -63,8 +62,7 @@ int8_t _z_unicast_recv_t_msg_na(_z_transport_unicast_t *ztu, _z_transport_messag } break; // Datagram capable links - case Z_LINK_CAP_UNICAST_DATAGRAM: - case Z_LINK_CAP_MULTICAST_DATAGRAM: + case Z_LINK_CAP_FLOW_DATAGRAM: _z_zbuf_compact(&ztu->_zbuf); to_read = _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); if (to_read == SIZE_MAX) { diff --git a/src/transport/unicast/transport.c b/src/transport/unicast/transport.c index 046b41630..15f660bd8 100644 --- a/src/transport/unicast/transport.c +++ b/src/transport/unicast/transport.c @@ -53,13 +53,11 @@ int8_t _z_unicast_transport_create(_z_transport_t *zt, _z_link_t *zl, _z_transpo size_t dbuf_size = 0; _Bool expandable = false; - switch (zl->_capabilities) { - case Z_LINK_CAP_UNICAST_STREAM: - case Z_LINK_CAP_MULTICAST_STREAM: + switch (zl->_cap._flow) { + case Z_LINK_CAP_FLOW_STREAM: expandable = true; break; - case Z_LINK_CAP_UNICAST_DATAGRAM: - case Z_LINK_CAP_MULTICAST_DATAGRAM: + case Z_LINK_CAP_FLOW_DATAGRAM: default: expandable = false; break; diff --git a/src/transport/unicast/tx.c b/src/transport/unicast/tx.c index 93997048a..21694e1e9 100644 --- a/src/transport/unicast/tx.c +++ b/src/transport/unicast/tx.c @@ -53,13 +53,13 @@ int8_t _z_unicast_send_t_msg(_z_transport_unicast_t *ztu, const _z_transport_mes #endif // Z_FEATURE_MULTI_THREAD == 1 // Prepare the buffer eventually reserving space for the message length - __unsafe_z_prepare_wbuf(&ztu->_wbuf, ztu->_link._capabilities); + __unsafe_z_prepare_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); // Encode the session message ret = _z_transport_message_encode(&ztu->_wbuf, t_msg); if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed - __unsafe_z_finalize_wbuf(&ztu->_wbuf, ztu->_link._capabilities); + __unsafe_z_finalize_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); // Send the wbuf on the socket ret = _z_link_send_wbuf(&ztu->_link, &ztu->_wbuf); if (ret == _Z_RES_OK) { @@ -100,7 +100,7 @@ int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg if (drop == false) { // Prepare the buffer eventually reserving space for the message length - __unsafe_z_prepare_wbuf(&ztu->_wbuf, ztu->_link._capabilities); + __unsafe_z_prepare_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); _z_zint_t sn = __unsafe_z_unicast_get_sn(ztu, reliability); // Get the next sequence number @@ -110,7 +110,7 @@ int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg ret = _z_network_message_encode(&ztu->_wbuf, n_msg); // Encode the network message if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed - __unsafe_z_finalize_wbuf(&ztu->_wbuf, ztu->_link._capabilities); + __unsafe_z_finalize_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); if (ztu->_wbuf._ioss._len == 1) { ret = _z_link_send_wbuf(&ztu->_link, &ztu->_wbuf); // Send the wbuf on the socket @@ -137,13 +137,13 @@ int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg is_first = false; // Clear the buffer for serialization - __unsafe_z_prepare_wbuf(&ztu->_wbuf, ztu->_link._capabilities); + __unsafe_z_prepare_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); // Serialize one fragment ret = __unsafe_z_serialize_zenoh_fragment(&ztu->_wbuf, &fbf, reliability, sn); if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed - __unsafe_z_finalize_wbuf(&ztu->_wbuf, ztu->_link._capabilities); + __unsafe_z_finalize_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); ret = _z_link_send_wbuf(&ztu->_link, &ztu->_wbuf); // Send the wbuf on the socket if (ret == _Z_RES_OK) {