Skip to content

Commit

Permalink
Merge pull request #348 from DenisBiryukov91/feature/priority-in-sample
Browse files Browse the repository at this point in the history
Add support for qos settings in sample
  • Loading branch information
milyin authored Feb 23, 2024
2 parents 3bab290 + 790bd62 commit 2ac04a2
Show file tree
Hide file tree
Showing 11 changed files with 78 additions and 22 deletions.
26 changes: 26 additions & 0 deletions include/zenoh-pico/api/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,32 @@ typedef struct {
uint8_t __dummy; // Just to avoid empty structures that might cause undefined behavior
} zp_send_join_options_t;

/**
* QoS settings of zenoh message.
*/
typedef _z_qos_t z_qos_t;
/**
* Returns message priority.
*/
static inline z_priority_t z_qos_get_priority(z_qos_t qos) {
z_priority_t ret = _z_n_qos_get_priority(qos);
return ret == _Z_PRIORITY_CONTROL ? Z_PRIORITY_DEFAULT : ret;
}
/**
* Returns message congestion control.
*/
static inline z_congestion_control_t z_qos_get_congestion_control(z_qos_t qos) {
return _z_n_qos_get_congestion_control(qos);
}
/**
* Returns message express flag. If set to true, the message is not batched to reduce the latency.
*/
static inline _Bool z_qos_get_express(z_qos_t qos) { return _z_n_qos_get_express(qos); }
/**
* Returns default qos settings.
*/
static inline z_qos_t z_qos_default(void) { return _Z_N_QOS_DEFAULT; }

/**
* Represents a data sample.
*
Expand Down
8 changes: 8 additions & 0 deletions include/zenoh-pico/protocol/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,13 @@ _z_keyexpr_t _z_rname(const char *rname);
*/
_z_keyexpr_t _z_rid_with_suffix(uint16_t rid, const char *suffix);

/**
* QoS settings of zenoh message.
*/
typedef struct {
uint8_t _val;
} _z_qos_t;

/**
* A zenoh-net data sample.
*
Expand All @@ -227,6 +234,7 @@ typedef struct {
_z_timestamp_t timestamp;
_z_encoding_t encoding;
z_sample_kind_t kind;
_z_qos_t qos;
#if Z_FEATURE_ATTACHMENT == 1
z_attachment_t attachment;
#endif
Expand Down
22 changes: 17 additions & 5 deletions include/zenoh-pico/protocol/definitions/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,24 @@
#define _Z_FLAG_N_RESPONSE_N 0x20 // 1 << 5
#define _Z_FLAG_N_RESPONSE_M 0x40 // 1 << 6

typedef struct {
uint8_t _val;
} _z_n_qos_t;
typedef _z_qos_t _z_n_qos_t;

#define _z_n_qos_make(express, nodrop, priority) \
(_z_n_qos_t) { ._val = (((express) << 4) | ((nodrop) << 3) | (priority)) }
static inline _z_qos_t _z_n_qos_create(_Bool express, z_congestion_control_t congestion_control,
z_priority_t priority) {
_Bool nodrop = congestion_control == Z_CONGESTION_CONTROL_DROP ? 0 : 1;
_z_n_qos_t ret = {._val = (uint8_t)((express << 4) | (nodrop << 3) | priority)};
return ret;
}
static inline z_priority_t _z_n_qos_get_priority(_z_n_qos_t n_qos) {
return (z_priority_t)(n_qos._val & 0x07 /* 0b111 */);
}
static inline z_congestion_control_t _z_n_qos_get_congestion_control(_z_n_qos_t n_qos) {
return (n_qos._val & 0x08 /* 0b1000 */) ? Z_CONGESTION_CONTROL_BLOCK : Z_CONGESTION_CONTROL_DROP;
}
static inline _Bool _z_n_qos_get_express(_z_n_qos_t n_qos) { return (_Bool)(n_qos._val & 0x10 /* 0b10000 */); }
#define _z_n_qos_make(express, nodrop, priority) \
_z_n_qos_create((_Bool)express, nodrop ? Z_CONGESTION_CONTROL_BLOCK : Z_CONGESTION_CONTROL_DROP, \
(z_priority_t)priority)
#define _Z_N_QOS_DEFAULT _z_n_qos_make(0, 0, 5)

// RESPONSE FINAL message flags:
Expand Down
5 changes: 3 additions & 2 deletions include/zenoh-pico/session/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ _z_subscription_rc_list_t *_z_get_subscriptions_by_key(_z_session_t *zn, uint8_t

_z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_t *sub);
void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *payload,
_z_zint_t payload_len
_z_zint_t payload_len, _z_n_qos_t qos
#if Z_FEATURE_ATTACHMENT == 1
,
z_attachment_t att
#endif
);
int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload,
const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp
const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp,
const _z_n_qos_t qos
#if Z_FEATURE_ATTACHMENT == 1
,
z_attachment_t att
Expand Down
7 changes: 4 additions & 3 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -648,9 +648,10 @@ int8_t z_put(z_session_t zs, z_keyexpr_t keyexpr, const uint8_t *payload, z_zint
);

// Trigger local subscriptions
_z_trigger_local_subscriptions(&zs._val.in->val, keyexpr, payload, payload_len
_z_trigger_local_subscriptions(&zs._val.in->val, keyexpr, payload, payload_len,
_z_n_qos_make(0, opt.congestion_control == Z_CONGESTION_CONTROL_BLOCK, opt.priority)
#if Z_FEATURE_ATTACHMENT == 1
,
,
opt.attachment
#endif
);
Expand Down Expand Up @@ -747,7 +748,7 @@ int8_t z_publisher_put(const z_publisher_t pub, const uint8_t *payload, size_t l
);

// Trigger local subscriptions
_z_trigger_local_subscriptions(&pub._val->_zn.in->val, pub._val->_key, payload, len
_z_trigger_local_subscriptions(&pub._val->_zn.in->val, pub._val->_key, payload, len, _Z_N_QOS_DEFAULT
#if Z_FEATURE_ATTACHMENT == 1
,
opt.attachment
Expand Down
2 changes: 1 addition & 1 deletion src/session/push.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ int8_t _z_trigger_push(_z_session_t *zn, _z_n_msg_push_t *push) {
#if Z_FEATURE_ATTACHMENT == 1
z_attachment_t att = _z_encoded_as_attachment(&push->_body._body._put._attachment);
#endif
ret = _z_trigger_subscriptions(zn, push->_key, payload, encoding, kind, push->_timestamp
ret = _z_trigger_subscriptions(zn, push->_key, payload, encoding, kind, push->_timestamp, push->_qos
#if Z_FEATURE_ATTACHMENT == 1
,
att
Expand Down
8 changes: 4 additions & 4 deletions src/session/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
z_attachment_t att = _z_encoded_as_attachment(&put._attachment);
#endif
ret = _z_trigger_subscriptions(zn, req._key, put._payload, put._encoding, Z_SAMPLE_KIND_PUT,
put._commons._timestamp
put._commons._timestamp, req._ext_qos
#if Z_FEATURE_ATTACHMENT == 1
,
att
Expand All @@ -122,7 +122,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
_z_msg_del_t del = req._body._del;
#if Z_FEATURE_SUBSCRIPTION == 1
ret = _z_trigger_subscriptions(zn, req._key, _z_bytes_empty(), z_encoding_default(),
Z_SAMPLE_KIND_DELETE, del._commons._timestamp
Z_SAMPLE_KIND_DELETE, del._commons._timestamp, req._ext_qos
#if Z_FEATURE_ATTACHMENT == 1
,
z_attachment_null()
Expand Down Expand Up @@ -166,7 +166,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
z_attachment_t att = _z_encoded_as_attachment(&put._attachment);
#endif
ret = _z_trigger_subscriptions(zn, response._key, put._payload, put._encoding, Z_SAMPLE_KIND_PUT,
put._commons._timestamp
put._commons._timestamp, response._ext_qos
#if Z_FEATURE_ATTACHMENT == 1
,
att
Expand All @@ -178,7 +178,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
_z_msg_del_t del = response._body._del;
#if Z_FEATURE_SUBSCRIPTION == 1
ret = _z_trigger_subscriptions(zn, response._key, _z_bytes_empty(), z_encoding_default(),
Z_SAMPLE_KIND_DELETE, del._commons._timestamp
Z_SAMPLE_KIND_DELETE, del._commons._timestamp, response._ext_qos
#if Z_FEATURE_ATTACHMENT == 1
,
z_attachment_null()
Expand Down
14 changes: 9 additions & 5 deletions src/session/subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "zenoh-pico/config.h"
#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/protocol/definitions/network.h"
#include "zenoh-pico/protocol/keyexpr.h"
#include "zenoh-pico/session/resource.h"
#include "zenoh-pico/session/session.h"
Expand Down Expand Up @@ -153,25 +154,26 @@ _z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_loca
}

void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *payload,
_z_zint_t payload_len
_z_zint_t payload_len, _z_n_qos_t qos
#if Z_FEATURE_ATTACHMENT == 1
,
z_attachment_t att
#endif
) {
_z_encoding_t encoding = {.prefix = Z_ENCODING_PREFIX_DEFAULT, .suffix = _z_bytes_wrap(NULL, 0)};
int8_t ret = _z_trigger_subscriptions(zn, keyexpr, _z_bytes_wrap(payload, payload_len), encoding, Z_SAMPLE_KIND_PUT,
_z_timestamp_null()
_z_timestamp_null(), qos
#if Z_FEATURE_ATTACHMENT == 1
,
,
att
#endif
);
(void)ret;
}

int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload,
const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp
const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp,
const _z_n_qos_t qos
#if Z_FEATURE_ATTACHMENT == 1
,
z_attachment_t att
Expand Down Expand Up @@ -200,6 +202,7 @@ int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, co
s.encoding = encoding;
s.kind = kind;
s.timestamp = timestamp;
s.qos = qos;
#if Z_FEATURE_ATTACHMENT == 1
s.attachment = att;
#endif
Expand Down Expand Up @@ -256,11 +259,12 @@ void _z_flush_subscriptions(_z_session_t *zn) {
#else // Z_FEATURE_SUBSCRIPTION == 0

void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *payload,
_z_zint_t payload_len) {
_z_zint_t payload_len, _z_n_qos_t qos) {
_ZP_UNUSED(zn);
_ZP_UNUSED(keyexpr);
_ZP_UNUSED(payload);
_ZP_UNUSED(payload_len);
_ZP_UNUSED(qos);
}

#endif // Z_FEATURE_SUBSCRIPTION == 1
2 changes: 1 addition & 1 deletion tests/fragment.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def check_output(tx_status, tx_output, rx_status, rx_output):
# Expected rx output & status
z_rx_expected_status = 0
z_rx_expected_output = (
"[rx]: Received packet on test/zenoh-pico-fragment, len: 10000, validity: 1")
"[rx]: Received packet on test/zenoh-pico-fragment, len: 10000, validity: 1, qos {priority: 4, cong_ctrl: 0}")

# Check the exit status of tx
if tx_status == z_tx_expected_status:
Expand Down
4 changes: 3 additions & 1 deletion tests/z_test_fragment_rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ void data_handler(const z_sample_t *sample, void *ctx) {
break;
}
}
printf("[rx]: Received packet on %s, len: %d, validity: %d\n", z_loan(keystr), (int)sample->payload.len, is_valid);
printf("[rx]: Received packet on %s, len: %d, validity: %d, qos {priority: %d, cong_ctrl: %d}\n", z_loan(keystr),
(int)sample->payload.len, is_valid, z_qos_get_priority(sample->qos),
z_qos_get_congestion_control(sample->qos));
z_drop(z_move(keystr));
}

Expand Down
2 changes: 2 additions & 0 deletions tests/z_test_fragment_tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ int main(int argc, char **argv) {
// Put data
z_put_options_t options = z_put_options_default();
options.encoding = z_encoding(Z_ENCODING_PREFIX_TEXT_PLAIN, NULL);
options.priority = Z_PRIORITY_DATA_HIGH;
options.congestion_control = Z_CONGESTION_CONTROL_BLOCK;
for (int i = 0; i < 5; i++) {
printf("[tx]: Sending packet on %s, len: %d\n", keyexpr, (int)size);
if (z_put(z_loan(s), z_keyexpr(keyexpr), (const uint8_t *)value, size, &options) < 0) {
Expand Down

0 comments on commit 2ac04a2

Please sign in to comment.