Skip to content

Commit

Permalink
Attachments validated on pub/sub
Browse files Browse the repository at this point in the history
  • Loading branch information
p-avital committed Jan 15, 2024
1 parent 6b710b0 commit ae2a366
Show file tree
Hide file tree
Showing 21 changed files with 226 additions and 60 deletions.
11 changes: 10 additions & 1 deletion examples/unix/c11/z_put.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,17 @@

#include <ctype.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <zenoh-pico.h>

#include "zenoh-pico/api/macros.h"
#include "zenoh-pico/api/types.h"
#include "zenoh-pico/collections/bytes.h"
#include "zenoh-pico/protocol/core.h"

#if Z_FEATURE_PUBLICATION == 1
int main(int argc, char **argv) {
const char *keyexpr = "demo/example/zenoh-pico-put";
Expand Down Expand Up @@ -89,16 +95,19 @@ int main(int argc, char **argv) {
printf("Putting Data ('%s': '%s')...\n", keyexpr, value);
z_put_options_t options = z_put_options_default();
options.encoding = z_encoding(Z_ENCODING_PREFIX_TEXT_PLAIN, NULL);
z_owned_bytes_map_t map = z_bytes_map_new();
z_bytes_map_insert_by_alias(&map, _z_bytes_wrap((uint8_t *)"hi", 2), _z_bytes_wrap((uint8_t *)"there", 5));
options.attachment = z_bytes_map_as_attachment(&map);
if (z_put(z_loan(s), z_keyexpr(keyexpr), (const uint8_t *)value, strlen(value), &options) < 0) {
printf("Oh no! Put has failed...\n");
}

z_bytes_map_drop(&map);
// z_undeclare_keyexpr(z_loan(s), z_move(ke));

// Stop read and lease tasks for zenoh-pico
zp_stop_read_task(z_loan(s));
zp_stop_lease_task(z_loan(s));

z_close(z_move(s));
return 0;
}
Expand Down
14 changes: 14 additions & 0 deletions examples/unix/c11/z_sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,31 @@

#include <ctype.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <zenoh-pico.h>

#include "zenoh-pico/api/types.h"
#include "zenoh-pico/protocol/core.h"

#if Z_FEATURE_SUBSCRIPTION == 1
int8_t attachment_handler(z_bytes_t key, z_bytes_t value, void *ctx) {
(void)ctx;
printf(">>> %.*s: %.*s\n", (int)key.len, key.start, (int)value.len, value.start);
return 0;
}

void data_handler(const z_sample_t *sample, void *ctx) {
(void)(ctx);
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len,
sample->payload.start);
if (z_attachment_check(&sample->attachment)) {
printf("Attachement found\n");
z_attachment_iterate(sample->attachment, attachment_handler, NULL);
}
z_drop(z_move(keystr));
}

Expand Down
6 changes: 4 additions & 2 deletions include/zenoh-pico/api/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,9 +274,11 @@ typedef struct {
*
* Members:
* z_encoding_t encoding: The encoding of the payload.
* z_attachment_t attachment: an attachment to the response.
*/
typedef struct {
z_encoding_t encoding;
z_attachment_t attachment;
} z_query_reply_options_t;

/**
Expand Down Expand Up @@ -559,8 +561,8 @@ struct _z_bytes_pair_t {

void _z_bytes_pair_clear(struct _z_bytes_pair_t *this);

_Z_ELEM_DEFINE(_z_bytes_pair, struct _z_bytes_pair_t, _z_noop_size, _z_bytes_pair_clear, _z_noop_copy);
_Z_LIST_DEFINE(_z_bytes_pair, struct _z_bytes_pair_t);
_Z_ELEM_DEFINE(_z_bytes_pair, struct _z_bytes_pair_t, _z_noop_size, _z_bytes_pair_clear, _z_noop_copy)
_Z_LIST_DEFINE(_z_bytes_pair, struct _z_bytes_pair_t)

/**
* A map of maybe-owned vector of bytes to maybe-owned vector of bytes.
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/collections/bytes.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/*-------- Bytes --------*/
/**
Expand Down
4 changes: 2 additions & 2 deletions include/zenoh-pico/net/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ int8_t _z_undeclare_publisher(_z_publisher_t *pub);
*/
int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *payload, const size_t len,
const _z_encoding_t encoding, const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl,
z_priority_t priority);
z_priority_t priority, z_attachment_t attachment);
#endif

#if Z_FEATURE_SUBSCRIPTION == 1
Expand Down Expand Up @@ -228,7 +228,7 @@ int8_t _z_send_reply(const z_query_t *query, const _z_keyexpr_t keyexpr, const _
*/
int8_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *parameters, const z_query_target_t target,
const z_consolidation_mode_t consolidation, const _z_value_t value, _z_reply_handler_t callback,
void *arg_call, _z_drop_handler_t dropper, void *arg_drop);
void *arg_call, _z_drop_handler_t dropper, void *arg_drop, z_attachment_t attachment);
#endif

#endif /* ZENOH_PICO_PRIMITIVES_NETAPI_H */
24 changes: 19 additions & 5 deletions include/zenoh-pico/protocol/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "zenoh-pico/collections/element.h"
#include "zenoh-pico/collections/string.h"
#include "zenoh-pico/config.h"
#include "zenoh-pico/protocol/iobuf.h"
#include "zenoh-pico/system/platform.h"

#define _Z_OPTIONAL
Expand Down Expand Up @@ -98,6 +99,7 @@ typedef struct z_attachment_vtable_t {
*/
z_attachment_iter_driver_t iteration_driver;
} z_attachment_vtable_t;

/**
* A v-table based map of byte slice to byte slice.
*
Expand All @@ -109,13 +111,25 @@ typedef struct z_attachment_t {
z_attachment_iter_driver_t iteration_driver;
} z_attachment_t;

inline z_attachment_t z_attachment_null() { return (z_attachment_t){.data = NULL, .iteration_driver = NULL}; }
inline _Bool z_attachment_check(const z_attachment_t *attachment) { return attachment->iteration_driver != NULL; }
inline int8_t z_attachment_iterate(z_attachment_t this, z_attachment_iter_body_t body, void *ctx) {
return this.iteration_driver(this.data, body, ctx);
}
z_attachment_t z_attachment_null(void);
_Bool z_attachment_check(const z_attachment_t *attachment);
int8_t z_attachment_iterate(z_attachment_t this, z_attachment_iter_body_t body, void *ctx);
_z_bytes_t z_attachment_get(z_attachment_t this, _z_bytes_t key);

typedef struct {
union {
z_attachment_t decoded;
_z_bytes_t encoded;
} body;
_Bool is_encoded;
} _z_owned_encoded_attachment_t;
/**
* Estimate the length of an attachment once encoded.
*/
size_t _z_attachment_estimate_length(z_attachment_t att);
z_attachment_t _z_encoded_as_attachment(const _z_owned_encoded_attachment_t *att);
void _z_encoded_attachment_drop(_z_owned_encoded_attachment_t *att);

_z_timestamp_t _z_timestamp_duplicate(const _z_timestamp_t *tstamp);
_z_timestamp_t _z_timestamp_null(void);
void _z_timestamp_clear(_z_timestamp_t *tstamp);
Expand Down
4 changes: 4 additions & 0 deletions include/zenoh-pico/protocol/definitions/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ typedef struct {
_z_value_t _value;
_z_source_info_t _ext_source_info;
z_consolidation_mode_t _ext_consolidation;
_z_owned_encoded_attachment_t _ext_attachment;
} _z_msg_reply_t;
void _z_msg_reply_clear(_z_msg_reply_t *msg);
#define _Z_FLAG_Z_R_T 0x20
Expand Down Expand Up @@ -149,6 +150,7 @@ typedef struct {
_z_m_push_commons_t _commons;
_z_bytes_t _payload;
_z_encoding_t _encoding;
_z_owned_encoded_attachment_t _attachment;
} _z_msg_put_t;
void _z_msg_put_clear(_z_msg_put_t *);
#define _Z_M_PUT_ID 0x01
Expand All @@ -172,11 +174,13 @@ typedef struct {
_z_source_info_t _ext_info;
_z_value_t _ext_value;
z_consolidation_mode_t _ext_consolidation;
_z_owned_encoded_attachment_t _ext_attachment;
} _z_msg_query_t;
typedef struct {
_Bool info;
_Bool body;
_Bool consolidation;
_Bool attachment;
} _z_msg_query_reqexts_t;
_z_msg_query_reqexts_t _z_msg_query_required_extensions(const _z_msg_query_t *msg);
void _z_msg_query_clear(_z_msg_query_t *msg);
Expand Down
3 changes: 2 additions & 1 deletion include/zenoh-pico/protocol/definitions/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ _Z_VEC_DEFINE(_z_network_message, _z_network_message_t)
void _z_msg_fix_mapping(_z_zenoh_message_t *msg, uint16_t mapping);
_z_network_message_t _z_msg_make_pull(_z_keyexpr_t key, _z_zint_t pull_id);
_z_network_message_t _z_msg_make_query(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_bytes_t) parameters, _z_zint_t qid,
z_consolidation_mode_t consolidation, _Z_MOVE(_z_value_t) value);
z_consolidation_mode_t consolidation, _Z_MOVE(_z_value_t) value,
z_attachment_t attachment);
_z_network_message_t _z_n_msg_make_reply(_z_zint_t rid, _Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_value_t) value);
_z_network_message_t _z_n_msg_make_ack(_z_zint_t rid, _Z_MOVE(_z_keyexpr_t) key);
_z_network_message_t _z_n_msg_make_response_final(_z_zint_t rid);
Expand Down
3 changes: 2 additions & 1 deletion include/zenoh-pico/session/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ _z_subscription_sptr_list_t *_z_get_subscriptions_by_key(_z_session_t *zn, uint8

_z_subscription_sptr_t *_z_register_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_t *sub);
int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload,
const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp);
const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp,
z_attachment_t att);
void _z_unregister_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_sptr_t *sub);
void _z_flush_subscriptions(_z_session_t *zn);

Expand Down
17 changes: 9 additions & 8 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ int8_t z_put(z_session_t zs, z_keyexpr_t keyexpr, const uint8_t *payload, z_zint
opt.priority = options->priority;
}
ret = _z_write(zs._val, keyexpr, (const uint8_t *)payload, payload_len, opt.encoding, Z_SAMPLE_KIND_PUT,
opt.congestion_control, opt.priority);
opt.congestion_control, opt.priority, options->attachment);

return ret;
}
Expand All @@ -624,7 +624,7 @@ int8_t z_delete(z_session_t zs, z_keyexpr_t keyexpr, const z_delete_options_t *o
opt.priority = options->priority;
}
ret = _z_write(zs._val, keyexpr, NULL, 0, z_encoding_default(), Z_SAMPLE_KIND_DELETE, opt.congestion_control,
opt.priority);
opt.priority, z_attachment_null());

return ret;
}
Expand Down Expand Up @@ -683,15 +683,15 @@ int8_t z_publisher_put(const z_publisher_t pub, const uint8_t *payload, size_t l
}

ret = _z_write(pub._val->_zn, pub._val->_key, payload, len, opt.encoding, Z_SAMPLE_KIND_PUT,
pub._val->_congestion_control, pub._val->_priority);
pub._val->_congestion_control, pub._val->_priority, options->attachment);

return ret;
}

int8_t z_publisher_delete(const z_publisher_t pub, const z_publisher_delete_options_t *options) {
(void)(options);
return _z_write(pub._val->_zn, pub._val->_key, NULL, 0, z_encoding_default(), Z_SAMPLE_KIND_DELETE,
pub._val->_congestion_control, pub._val->_priority);
pub._val->_congestion_control, pub._val->_priority, z_attachment_null());
}

z_owned_keyexpr_t z_publisher_keyexpr(z_publisher_t publisher) {
Expand All @@ -709,7 +709,8 @@ OWNED_FUNCTIONS_PTR_INTERNAL(z_reply_t, z_owned_reply_t, reply, _z_reply_free, _
z_get_options_t z_get_options_default(void) {
return (z_get_options_t){.target = z_query_target_default(),
.consolidation = z_query_consolidation_default(),
.value = {.encoding = z_encoding_default(), .payload = _z_bytes_empty()}};
.value = {.encoding = z_encoding_default(), .payload = _z_bytes_empty()},
.attachment = z_attachment_null()};
}

typedef struct __z_reply_handler_wrapper_t {
Expand Down Expand Up @@ -757,7 +758,7 @@ int8_t z_get(z_session_t zs, z_keyexpr_t keyexpr, const char *parameters, z_owne
}

ret = _z_query(zs._val, keyexpr, parameters, opt.target, opt.consolidation.mode, opt.value, __z_reply_handler,
wrapped_ctx, callback->drop, ctx);
wrapped_ctx, callback->drop, ctx, opt.attachment);
return ret;
}

Expand Down Expand Up @@ -1158,7 +1159,7 @@ void z_bytes_map_insert_by_alias(const z_owned_bytes_map_t *this_, z_bytes_t key
memset(insert, 0, sizeof(struct _z_bytes_pair_t));
insert->key = _z_bytes_wrap(key.start, key.len);
insert->value = _z_bytes_wrap(value.start, value.len);
_z_bytes_pair_list_push(this_->_inner, insert);
((z_owned_bytes_map_t *)this_)->_inner = _z_bytes_pair_list_push(this_->_inner, insert);
}
}
void z_bytes_map_insert_by_copy(const z_owned_bytes_map_t *this_, z_bytes_t key, z_bytes_t value) {
Expand All @@ -1182,7 +1183,7 @@ void z_bytes_map_insert_by_copy(const z_owned_bytes_map_t *this_, z_bytes_t key,
memset(insert, 0, sizeof(struct _z_bytes_pair_t));
_z_bytes_copy(&insert->key, &key);
_z_bytes_copy(&insert->value, &value);
_z_bytes_pair_list_push(this_->_inner, insert);
((z_owned_bytes_map_t *)this_)->_inner = _z_bytes_pair_list_push(this_->_inner, insert);
}
}
int8_t z_bytes_map_iter(const z_owned_bytes_map_t *this_, z_attachment_iter_body_t body, void *ctx) {
Expand Down
3 changes: 3 additions & 0 deletions src/collections/bytes.c
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,6 @@ _z_bytes_t _z_bytes_steal(_z_bytes_t *b) {
*b = _z_bytes_empty();
return ret;
}
_Bool _z_bytes_eq(const _z_bytes_t *left, const _z_bytes_t *right) {
return left->len == right->len && memcmp(left->start, right->start, left->len) == 0;
}
8 changes: 5 additions & 3 deletions src/net/primitives.c
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ int8_t _z_undeclare_publisher(_z_publisher_t *pub) {
/*------------------ Write ------------------*/
int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *payload, const size_t len,
const _z_encoding_t encoding, const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl,
z_priority_t priority) {
z_priority_t priority, z_attachment_t attachment) {
int8_t ret = _Z_RES_OK;
_z_network_message_t msg;
switch (kind) {
Expand All @@ -146,6 +146,7 @@ int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *pay
._commons = {._timestamp = _z_timestamp_null(), ._source_info = _z_source_info_null()},
._payload = _z_bytes_wrap(payload, len),
._encoding = encoding,
._attachment = {.is_encoded = false, .body.decoded = attachment},
},
},
};
Expand Down Expand Up @@ -383,7 +384,7 @@ int8_t _z_send_reply(const z_query_t *query, _z_keyexpr_t keyexpr, const _z_valu
/*------------------ Query ------------------*/
int8_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *parameters, const z_query_target_t target,
const z_consolidation_mode_t consolidation, _z_value_t value, _z_reply_handler_t callback,
void *arg_call, _z_drop_handler_t dropper, void *arg_drop) {
void *arg_call, _z_drop_handler_t dropper, void *arg_drop, z_attachment_t attachment) {
int8_t ret = _Z_RES_OK;

// Create the pending query object
Expand All @@ -404,7 +405,8 @@ int8_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *parameters,
ret = _z_register_pending_query(zn, pq); // Add the pending query to the current session
if (ret == _Z_RES_OK) {
_z_bytes_t params = _z_bytes_wrap((uint8_t *)pq->_parameters, strlen(pq->_parameters));
_z_zenoh_message_t z_msg = _z_msg_make_query(&keyexpr, &params, pq->_id, pq->_consolidation, &value);
_z_zenoh_message_t z_msg =
_z_msg_make_query(&keyexpr, &params, pq->_id, pq->_consolidation, &value, attachment);

if (_z_send_n_msg(zn, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
_z_unregister_pending_query(zn, pq);
Expand Down
Loading

0 comments on commit ae2a366

Please sign in to comment.