diff --git a/include/zenoh-pico/api/types.h b/include/zenoh-pico/api/types.h index d7bb29451..06117dd06 100644 --- a/include/zenoh-pico/api/types.h +++ b/include/zenoh-pico/api/types.h @@ -413,6 +413,32 @@ typedef struct { uint8_t __dummy; // Just to avoid empty structures that might cause undefined behavior } zp_send_join_options_t; +/** + * QoS settings of zenoh message. + */ +typedef _z_qos_t z_qos_t; +/** + * Returns message priority. + */ +static inline z_priority_t z_qos_get_priority(z_qos_t qos) { + z_priority_t ret = _z_n_qos_get_priority(qos); + return ret == _Z_PRIORITY_CONTROL ? Z_PRIORITY_DEFAULT : ret; +} +/** + * Returns message congestion control. + */ +static inline z_congestion_control_t z_qos_get_congestion_control(z_qos_t qos) { + return _z_n_qos_get_congestion_control(qos); +} +/** + * Returns message express flag. If set to true, the message is not batched to reduce the latency. + */ +static inline _Bool z_qos_get_express(z_qos_t qos) { return _z_n_qos_get_express(qos); } +/** + * Returns default qos settings. + */ +static inline z_qos_t z_qos_default(void) { return _Z_N_QOS_DEFAULT; } + /** * Represents a data sample. * diff --git a/include/zenoh-pico/protocol/core.h b/include/zenoh-pico/protocol/core.h index 2eaa3e495..d7d2dc3f2 100644 --- a/include/zenoh-pico/protocol/core.h +++ b/include/zenoh-pico/protocol/core.h @@ -211,6 +211,13 @@ _z_keyexpr_t _z_rname(const char *rname); */ _z_keyexpr_t _z_rid_with_suffix(uint16_t rid, const char *suffix); +/** + * QoS settings of zenoh message. + */ +typedef struct { + uint8_t _val; +} _z_qos_t; + /** * A zenoh-net data sample. * @@ -227,6 +234,7 @@ typedef struct { _z_timestamp_t timestamp; _z_encoding_t encoding; z_sample_kind_t kind; + _z_qos_t qos; #if Z_FEATURE_ATTACHMENT == 1 z_attachment_t attachment; #endif diff --git a/include/zenoh-pico/protocol/definitions/network.h b/include/zenoh-pico/protocol/definitions/network.h index 5c09247d3..512ad2c8d 100644 --- a/include/zenoh-pico/protocol/definitions/network.h +++ b/include/zenoh-pico/protocol/definitions/network.h @@ -58,12 +58,24 @@ #define _Z_FLAG_N_RESPONSE_N 0x20 // 1 << 5 #define _Z_FLAG_N_RESPONSE_M 0x40 // 1 << 6 -typedef struct { - uint8_t _val; -} _z_n_qos_t; +typedef _z_qos_t _z_n_qos_t; -#define _z_n_qos_make(express, nodrop, priority) \ - (_z_n_qos_t) { ._val = (((express) << 4) | ((nodrop) << 3) | (priority)) } +static inline _z_qos_t _z_n_qos_create(_Bool express, z_congestion_control_t congestion_control, + z_priority_t priority) { + _Bool nodrop = congestion_control == Z_CONGESTION_CONTROL_DROP ? 0 : 1; + _z_n_qos_t ret = {._val = (uint8_t)((express << 4) | (nodrop << 3) | priority)}; + return ret; +} +static inline z_priority_t _z_n_qos_get_priority(_z_n_qos_t n_qos) { + return (z_priority_t)(n_qos._val & 0x07 /* 0b111 */); +} +static inline z_congestion_control_t _z_n_qos_get_congestion_control(_z_n_qos_t n_qos) { + return (n_qos._val & 0x08 /* 0b1000 */) ? Z_CONGESTION_CONTROL_BLOCK : Z_CONGESTION_CONTROL_DROP; +} +static inline _Bool _z_n_qos_get_express(_z_n_qos_t n_qos) { return (_Bool)(n_qos._val & 0x10 /* 0b10000 */); } +#define _z_n_qos_make(express, nodrop, priority) \ + _z_n_qos_create((_Bool)express, nodrop ? Z_CONGESTION_CONTROL_BLOCK : Z_CONGESTION_CONTROL_DROP, \ + (z_priority_t)priority) #define _Z_N_QOS_DEFAULT _z_n_qos_make(0, 0, 5) // RESPONSE FINAL message flags: diff --git a/include/zenoh-pico/session/subscription.h b/include/zenoh-pico/session/subscription.h index f13ffe05f..77fb05af7 100644 --- a/include/zenoh-pico/session/subscription.h +++ b/include/zenoh-pico/session/subscription.h @@ -24,14 +24,15 @@ _z_subscription_rc_list_t *_z_get_subscriptions_by_key(_z_session_t *zn, uint8_t _z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_t *sub); void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *payload, - _z_zint_t payload_len + _z_zint_t payload_len, _z_n_qos_t qos #if Z_FEATURE_ATTACHMENT == 1 , z_attachment_t att #endif ); int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, - const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp + const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp, + const _z_n_qos_t qos #if Z_FEATURE_ATTACHMENT == 1 , z_attachment_t att diff --git a/src/api/api.c b/src/api/api.c index 4bdcc3421..73350bd23 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -648,9 +648,10 @@ int8_t z_put(z_session_t zs, z_keyexpr_t keyexpr, const uint8_t *payload, z_zint ); // Trigger local subscriptions - _z_trigger_local_subscriptions(&zs._val.in->val, keyexpr, payload, payload_len + _z_trigger_local_subscriptions(&zs._val.in->val, keyexpr, payload, payload_len, + _z_n_qos_make(0, opt.congestion_control == Z_CONGESTION_CONTROL_BLOCK, opt.priority) #if Z_FEATURE_ATTACHMENT == 1 - , + , opt.attachment #endif ); @@ -747,7 +748,7 @@ int8_t z_publisher_put(const z_publisher_t pub, const uint8_t *payload, size_t l ); // Trigger local subscriptions - _z_trigger_local_subscriptions(&pub._val->_zn.in->val, pub._val->_key, payload, len + _z_trigger_local_subscriptions(&pub._val->_zn.in->val, pub._val->_key, payload, len, _Z_N_QOS_DEFAULT #if Z_FEATURE_ATTACHMENT == 1 , opt.attachment diff --git a/src/session/push.c b/src/session/push.c index 55187bae7..0fa85cb4c 100644 --- a/src/session/push.c +++ b/src/session/push.c @@ -32,7 +32,7 @@ int8_t _z_trigger_push(_z_session_t *zn, _z_n_msg_push_t *push) { #if Z_FEATURE_ATTACHMENT == 1 z_attachment_t att = _z_encoded_as_attachment(&push->_body._body._put._attachment); #endif - ret = _z_trigger_subscriptions(zn, push->_key, payload, encoding, kind, push->_timestamp + ret = _z_trigger_subscriptions(zn, push->_key, payload, encoding, kind, push->_timestamp, push->_qos #if Z_FEATURE_ATTACHMENT == 1 , att diff --git a/src/session/rx.c b/src/session/rx.c index 9b27a9fbd..5ba2bc6ed 100644 --- a/src/session/rx.c +++ b/src/session/rx.c @@ -104,7 +104,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint z_attachment_t att = _z_encoded_as_attachment(&put._attachment); #endif ret = _z_trigger_subscriptions(zn, req._key, put._payload, put._encoding, Z_SAMPLE_KIND_PUT, - put._commons._timestamp + put._commons._timestamp, req._ext_qos #if Z_FEATURE_ATTACHMENT == 1 , att @@ -122,7 +122,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint _z_msg_del_t del = req._body._del; #if Z_FEATURE_SUBSCRIPTION == 1 ret = _z_trigger_subscriptions(zn, req._key, _z_bytes_empty(), z_encoding_default(), - Z_SAMPLE_KIND_DELETE, del._commons._timestamp + Z_SAMPLE_KIND_DELETE, del._commons._timestamp, req._ext_qos #if Z_FEATURE_ATTACHMENT == 1 , z_attachment_null() @@ -166,7 +166,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint z_attachment_t att = _z_encoded_as_attachment(&put._attachment); #endif ret = _z_trigger_subscriptions(zn, response._key, put._payload, put._encoding, Z_SAMPLE_KIND_PUT, - put._commons._timestamp + put._commons._timestamp, response._ext_qos #if Z_FEATURE_ATTACHMENT == 1 , att @@ -178,7 +178,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint _z_msg_del_t del = response._body._del; #if Z_FEATURE_SUBSCRIPTION == 1 ret = _z_trigger_subscriptions(zn, response._key, _z_bytes_empty(), z_encoding_default(), - Z_SAMPLE_KIND_DELETE, del._commons._timestamp + Z_SAMPLE_KIND_DELETE, del._commons._timestamp, response._ext_qos #if Z_FEATURE_ATTACHMENT == 1 , z_attachment_null() diff --git a/src/session/subscription.c b/src/session/subscription.c index 0cd1050d9..1b9345b0d 100644 --- a/src/session/subscription.c +++ b/src/session/subscription.c @@ -19,6 +19,7 @@ #include "zenoh-pico/config.h" #include "zenoh-pico/protocol/core.h" +#include "zenoh-pico/protocol/definitions/network.h" #include "zenoh-pico/protocol/keyexpr.h" #include "zenoh-pico/session/resource.h" #include "zenoh-pico/session/session.h" @@ -153,7 +154,7 @@ _z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_loca } void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *payload, - _z_zint_t payload_len + _z_zint_t payload_len, _z_n_qos_t qos #if Z_FEATURE_ATTACHMENT == 1 , z_attachment_t att @@ -161,9 +162,9 @@ void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr ) { _z_encoding_t encoding = {.prefix = Z_ENCODING_PREFIX_DEFAULT, .suffix = _z_bytes_wrap(NULL, 0)}; int8_t ret = _z_trigger_subscriptions(zn, keyexpr, _z_bytes_wrap(payload, payload_len), encoding, Z_SAMPLE_KIND_PUT, - _z_timestamp_null() + _z_timestamp_null(), qos #if Z_FEATURE_ATTACHMENT == 1 - , + , att #endif ); @@ -171,7 +172,8 @@ void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr } int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, - const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp + const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp, + const _z_n_qos_t qos #if Z_FEATURE_ATTACHMENT == 1 , z_attachment_t att @@ -200,6 +202,7 @@ int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, co s.encoding = encoding; s.kind = kind; s.timestamp = timestamp; + s.qos = qos; #if Z_FEATURE_ATTACHMENT == 1 s.attachment = att; #endif @@ -256,11 +259,12 @@ void _z_flush_subscriptions(_z_session_t *zn) { #else // Z_FEATURE_SUBSCRIPTION == 0 void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *payload, - _z_zint_t payload_len) { + _z_zint_t payload_len, _z_n_qos_t qos) { _ZP_UNUSED(zn); _ZP_UNUSED(keyexpr); _ZP_UNUSED(payload); _ZP_UNUSED(payload_len); + _ZP_UNUSED(qos); } #endif // Z_FEATURE_SUBSCRIPTION == 1 diff --git a/tests/fragment.py b/tests/fragment.py index d29295f0f..7b2a6f8d5 100644 --- a/tests/fragment.py +++ b/tests/fragment.py @@ -14,7 +14,7 @@ def check_output(tx_status, tx_output, rx_status, rx_output): # Expected rx output & status z_rx_expected_status = 0 z_rx_expected_output = ( - "[rx]: Received packet on test/zenoh-pico-fragment, len: 10000, validity: 1") + "[rx]: Received packet on test/zenoh-pico-fragment, len: 10000, validity: 1, qos {priority: 4, cong_ctrl: 0}") # Check the exit status of tx if tx_status == z_tx_expected_status: diff --git a/tests/z_test_fragment_rx.c b/tests/z_test_fragment_rx.c index 133497694..5f2e248ae 100644 --- a/tests/z_test_fragment_rx.c +++ b/tests/z_test_fragment_rx.c @@ -29,7 +29,9 @@ void data_handler(const z_sample_t *sample, void *ctx) { break; } } - printf("[rx]: Received packet on %s, len: %d, validity: %d\n", z_loan(keystr), (int)sample->payload.len, is_valid); + printf("[rx]: Received packet on %s, len: %d, validity: %d, qos {priority: %d, cong_ctrl: %d}\n", z_loan(keystr), + (int)sample->payload.len, is_valid, z_qos_get_priority(sample->qos), + z_qos_get_congestion_control(sample->qos)); z_drop(z_move(keystr)); } diff --git a/tests/z_test_fragment_tx.c b/tests/z_test_fragment_tx.c index b002a34d8..70b24968e 100644 --- a/tests/z_test_fragment_tx.c +++ b/tests/z_test_fragment_tx.c @@ -60,6 +60,8 @@ int main(int argc, char **argv) { // Put data z_put_options_t options = z_put_options_default(); options.encoding = z_encoding(Z_ENCODING_PREFIX_TEXT_PLAIN, NULL); + options.priority = Z_PRIORITY_DATA_HIGH; + options.congestion_control = Z_CONGESTION_CONTROL_BLOCK; for (int i = 0; i < 5; i++) { printf("[tx]: Sending packet on %s, len: %d\n", keyexpr, (int)size); if (z_put(z_loan(s), z_keyexpr(keyexpr), (const uint8_t *)value, size, &options) < 0) {