Skip to content

Commit

Permalink
Add owned_query entity (#327)
Browse files Browse the repository at this point in the history
* feat: add z_owned_query_t

Allows z_query to be extracted from queryable callbacks if needed.

* feat: add _z_value_copy function

* feat: add _z_query_create function

* feat: use _z_query_create in trigger questionable
  • Loading branch information
jean-roland authored Jan 25, 2024
1 parent 702dfeb commit a87cea8
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 38 deletions.
8 changes: 8 additions & 0 deletions include/zenoh-pico/api/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,14 @@ typedef struct {
} z_queryable_t;
_OWNED_TYPE_PTR(_z_queryable_t, queryable)

/**
* Represents a Zenoh query entity, received by Zenoh Queryable entities.
*
*/
typedef struct z_query_t {
z_owned_query_t _val;
} z_query_t;

/**
* Represents the encoding of a payload, in a MIME-like format.
*
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/net/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ int8_t _z_undeclare_queryable(_z_queryable_t *qle);
* key: The resource key of this reply. The caller keeps the ownership.
* payload: The value of this reply, the caller keeps ownership.
*/
int8_t _z_send_reply(const z_query_t *query, const _z_keyexpr_t keyexpr, const _z_value_t payload);
int8_t _z_send_reply(const _z_query_t *query, const _z_keyexpr_t keyexpr, const _z_value_t payload);
#endif

#if Z_FEATURE_QUERY == 1
Expand Down
16 changes: 14 additions & 2 deletions include/zenoh-pico/net/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,24 @@
/**
* The query to be answered by a queryable.
*/
typedef struct z_query_t {
typedef struct _z_query_t {
_z_value_t _value;
_z_keyexpr_t _key;
uint32_t _request_id;
_z_session_t *_zn;
char *_parameters;
_Bool _anyke;
} z_query_t;
} _z_query_t;

void _z_query_clear(_z_query_t *q);
_Z_REFCOUNT_DEFINE(_z_query, _z_query)

/**
* Container for an owned query rc
*/
typedef struct {
_z_query_rc_t _rc;
} z_owned_query_t;

/**
* Return type when declaring a queryable.
Expand All @@ -41,6 +51,8 @@ typedef struct {
} _z_queryable_t;

#if Z_FEATURE_QUERYABLE == 1
_z_query_t _z_query_create(const _z_value_t *value, const _z_keyexpr_t *key, const _z_bytes_t *parameters,
_z_session_t *zn, uint32_t request_id);
void _z_queryable_clear(_z_queryable_t *qbl);
void _z_queryable_free(_z_queryable_t **qbl);
#endif
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/protocol/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ typedef struct {
} _z_value_t;
_z_value_t _z_value_null(void);
_z_value_t _z_value_steal(_z_value_t *value);
void _z_value_copy(_z_value_t *dst, const _z_value_t *src);
void _z_value_clear(_z_value_t *src);
void _z_value_free(_z_value_t **hello);

Expand Down
9 changes: 5 additions & 4 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -291,13 +291,14 @@ z_query_consolidation_t z_query_consolidation_none(void) {
z_query_consolidation_t z_query_consolidation_default(void) { return z_query_consolidation_auto(); }

z_bytes_t z_query_parameters(const z_query_t *query) {
z_bytes_t parameters = _z_bytes_wrap((uint8_t *)query->_parameters, strlen(query->_parameters));
z_bytes_t parameters =
_z_bytes_wrap((uint8_t *)query->_val._rc.in->val._parameters, strlen(query->_val._rc.in->val._parameters));
return parameters;
}

z_value_t z_query_value(const z_query_t *query) { return query->_value; }
z_value_t z_query_value(const z_query_t *query) { return query->_val._rc.in->val._value; }

z_keyexpr_t z_query_keyexpr(const z_query_t *query) { return query->_key; }
z_keyexpr_t z_query_keyexpr(const z_query_t *query) { return query->_val._rc.in->val._key; }

_Bool z_value_is_initialized(z_value_t *value) {
_Bool ret = false;
Expand Down Expand Up @@ -917,7 +918,7 @@ int8_t z_query_reply(const z_query_t *query, const z_keyexpr_t keyexpr, const ui
.len = payload_len,
},
.encoding = {.prefix = opts.encoding.prefix, .suffix = opts.encoding.suffix}};
return _z_send_reply(query, keyexpr, value);
return _z_send_reply(&query->_val._rc.in->val, keyexpr, value);
return _Z_ERR_GENERIC;
}
#endif
Expand Down
2 changes: 1 addition & 1 deletion src/net/primitives.c
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ int8_t _z_undeclare_queryable(_z_queryable_t *qle) {
return _Z_RES_OK;
}

int8_t _z_send_reply(const z_query_t *query, _z_keyexpr_t keyexpr, const _z_value_t payload) {
int8_t _z_send_reply(const _z_query_t *query, _z_keyexpr_t keyexpr, const _z_value_t payload) {
int8_t ret = _Z_RES_OK;

_z_keyexpr_t q_ke;
Expand Down
31 changes: 31 additions & 0 deletions src/net/query.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,38 @@

#include "zenoh-pico/net/query.h"

#include "zenoh-pico/session/utils.h"
#include "zenoh-pico/utils/logging.h"

void _z_query_clear(_z_query_t *q) {
// Send REPLY_FINAL message
_z_zenoh_message_t z_msg = _z_n_msg_make_response_final(q->_request_id);
if (_z_send_n_msg(q->_zn, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
_Z_ERROR("Query send REPLY_FINAL transport failure !");
}
// Clean up memory
_z_msg_clear(&z_msg);
zp_free(q->_parameters);
_z_keyexpr_clear(&q->_key);
_z_value_clear(&q->_value);
// Ideally free session rc if you have one
}

#if Z_FEATURE_QUERYABLE == 1
_z_query_t _z_query_create(const _z_value_t *value, const _z_keyexpr_t *key, const _z_bytes_t *parameters,
_z_session_t *zn, uint32_t request_id) {
_z_query_t q;
q._request_id = request_id;
q._zn = zn; // Ideally would have been an rc
q._parameters = (char *)zp_malloc(parameters->len + 1);
memcpy(q._parameters, parameters->start, parameters->len);
q._parameters[parameters->len] = 0;
q._anyke = (strstr(q._parameters, Z_SELECTOR_QUERY_MATCH) == NULL) ? false : true;
_z_keyexpr_copy(&q._key, key);
_z_value_copy(&q._value, value);
return q;
}

void _z_queryable_clear(_z_queryable_t *qbl) {
// Nothing to clear
(void)(qbl);
Expand Down
5 changes: 5 additions & 0 deletions src/protocol/core.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ _z_value_t _z_value_steal(_z_value_t *value) {
*value = _z_value_null();
return ret;
}
void _z_value_copy(_z_value_t *dst, const _z_value_t *src) {
dst->encoding.prefix = src->encoding.prefix;
_z_bytes_copy(&dst->encoding.suffix, &src->encoding.suffix);
_z_bytes_copy(&dst->payload, &src->payload);
}

#if Z_FEATURE_ATTACHMENT == 1
struct _z_seeker_t {
Expand Down
39 changes: 9 additions & 30 deletions src/session/queryable.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <stdint.h>
#include <string.h>

#include "zenoh-pico/api/types.h"
#include "zenoh-pico/config.h"
#include "zenoh-pico/net/query.h"
#include "zenoh-pico/protocol/core.h"
Expand Down Expand Up @@ -140,7 +141,7 @@ _z_questionable_rc_t *_z_register_questionable(_z_session_t *zn, _z_questionable
return ret;
}

int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *query, const _z_keyexpr_t q_key, uint32_t qid) {
int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *msgq, const _z_keyexpr_t q_key, uint32_t qid) {
int8_t ret = _Z_RES_OK;

#if Z_FEATURE_MULTI_THREAD == 1
Expand All @@ -155,42 +156,20 @@ int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *query, cons
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1

// Build the query
z_query_t q;
q._zn = zn;
q._request_id = qid;
q._key = key;
#if defined(__STDC_NO_VLA__) || ((__STDC_VERSION__ < 201000L) && (defined(_WIN32) || defined(WIN32)))
char *params = zp_malloc(query->_parameters.len + 1);
#else
char params[query->_parameters.len + 1];
#endif
memcpy(params, query->_parameters.start, query->_parameters.len);
params[query->_parameters.len] = 0;
q._parameters = params;
q._value.encoding = query->_ext_value.encoding;
q._value.payload = query->_ext_value.payload;
q._anyke = (strstr(q._parameters, Z_SELECTOR_QUERY_MATCH) == NULL) ? false : true;
// Build the z_query
z_query_t query = {._val = {._rc = _z_query_rc_new()}};
query._val._rc.in->val = _z_query_create(&msgq->_ext_value, &key, &msgq->_parameters, zn, qid);
// Parse questionable list
_z_questionable_rc_list_t *xs = qles;
while (xs != NULL) {
_z_questionable_rc_t *qle = _z_questionable_rc_list_head(xs);
qle->in->val._callback(&q, qle->in->val._arg);
qle->in->val._callback(&query, qle->in->val._arg);
xs = _z_questionable_rc_list_tail(xs);
}

// Clean up
_z_query_rc_drop(&query._val._rc);
_z_keyexpr_clear(&key);
_z_questionable_rc_list_free(&qles);
#if defined(__STDC_NO_VLA__) || ((__STDC_VERSION__ < 201000L) && (defined(_WIN32) || defined(WIN32)))
zp_free(params);
#endif

// Send the final reply
// Create the final reply
_z_zenoh_message_t z_msg = _z_n_msg_make_response_final(q._request_id);
if (_z_send_n_msg(zn, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
ret = _Z_ERR_TRANSPORT_TX_FAILED;
}
_z_msg_clear(&z_msg);
} else {
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
Expand Down

0 comments on commit a87cea8

Please sign in to comment.