diff --git a/include/zenoh-pico/api/types.h b/include/zenoh-pico/api/types.h index 7d776251d..7af62184a 100644 --- a/include/zenoh-pico/api/types.h +++ b/include/zenoh-pico/api/types.h @@ -191,6 +191,14 @@ typedef struct { } z_queryable_t; _OWNED_TYPE_PTR(_z_queryable_t, queryable) +/** + * Represents a Zenoh query entity, received by Zenoh Queryable entities. + * + */ +typedef struct z_query_t { + z_owned_query_t _val; +} z_query_t; + /** * Represents the encoding of a payload, in a MIME-like format. * diff --git a/include/zenoh-pico/net/primitives.h b/include/zenoh-pico/net/primitives.h index a4b76c5b1..1a91d0d9b 100644 --- a/include/zenoh-pico/net/primitives.h +++ b/include/zenoh-pico/net/primitives.h @@ -211,7 +211,7 @@ int8_t _z_undeclare_queryable(_z_queryable_t *qle); * key: The resource key of this reply. The caller keeps the ownership. * payload: The value of this reply, the caller keeps ownership. */ -int8_t _z_send_reply(const z_query_t *query, const _z_keyexpr_t keyexpr, const _z_value_t payload); +int8_t _z_send_reply(const _z_query_t *query, const _z_keyexpr_t keyexpr, const _z_value_t payload); #endif #if Z_FEATURE_QUERY == 1 diff --git a/include/zenoh-pico/net/query.h b/include/zenoh-pico/net/query.h index 8c6a2d701..0dafb9f4d 100644 --- a/include/zenoh-pico/net/query.h +++ b/include/zenoh-pico/net/query.h @@ -23,14 +23,24 @@ /** * The query to be answered by a queryable. */ -typedef struct z_query_t { +typedef struct _z_query_t { _z_value_t _value; _z_keyexpr_t _key; uint32_t _request_id; _z_session_t *_zn; char *_parameters; _Bool _anyke; -} z_query_t; +} _z_query_t; + +void _z_query_clear(_z_query_t *q); +_Z_REFCOUNT_DEFINE(_z_query, _z_query) + +/** + * Container for an owned query rc + */ +typedef struct { + _z_query_rc_t _rc; +} z_owned_query_t; /** * Return type when declaring a queryable. @@ -41,6 +51,8 @@ typedef struct { } _z_queryable_t; #if Z_FEATURE_QUERYABLE == 1 +_z_query_t _z_query_create(const _z_value_t *value, const _z_keyexpr_t *key, const _z_bytes_t *parameters, + _z_session_t *zn, uint32_t request_id); void _z_queryable_clear(_z_queryable_t *qbl); void _z_queryable_free(_z_queryable_t **qbl); #endif diff --git a/include/zenoh-pico/protocol/core.h b/include/zenoh-pico/protocol/core.h index bb7285854..2eaa3e495 100644 --- a/include/zenoh-pico/protocol/core.h +++ b/include/zenoh-pico/protocol/core.h @@ -245,6 +245,7 @@ typedef struct { } _z_value_t; _z_value_t _z_value_null(void); _z_value_t _z_value_steal(_z_value_t *value); +void _z_value_copy(_z_value_t *dst, const _z_value_t *src); void _z_value_clear(_z_value_t *src); void _z_value_free(_z_value_t **hello); diff --git a/src/api/api.c b/src/api/api.c index 9c3187894..2598bc1f4 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -291,13 +291,14 @@ z_query_consolidation_t z_query_consolidation_none(void) { z_query_consolidation_t z_query_consolidation_default(void) { return z_query_consolidation_auto(); } z_bytes_t z_query_parameters(const z_query_t *query) { - z_bytes_t parameters = _z_bytes_wrap((uint8_t *)query->_parameters, strlen(query->_parameters)); + z_bytes_t parameters = + _z_bytes_wrap((uint8_t *)query->_val._rc.in->val._parameters, strlen(query->_val._rc.in->val._parameters)); return parameters; } -z_value_t z_query_value(const z_query_t *query) { return query->_value; } +z_value_t z_query_value(const z_query_t *query) { return query->_val._rc.in->val._value; } -z_keyexpr_t z_query_keyexpr(const z_query_t *query) { return query->_key; } +z_keyexpr_t z_query_keyexpr(const z_query_t *query) { return query->_val._rc.in->val._key; } _Bool z_value_is_initialized(z_value_t *value) { _Bool ret = false; @@ -917,7 +918,7 @@ int8_t z_query_reply(const z_query_t *query, const z_keyexpr_t keyexpr, const ui .len = payload_len, }, .encoding = {.prefix = opts.encoding.prefix, .suffix = opts.encoding.suffix}}; - return _z_send_reply(query, keyexpr, value); + return _z_send_reply(&query->_val._rc.in->val, keyexpr, value); return _Z_ERR_GENERIC; } #endif diff --git a/src/net/primitives.c b/src/net/primitives.c index c8ecc6560..84385c6fb 100644 --- a/src/net/primitives.c +++ b/src/net/primitives.c @@ -327,7 +327,7 @@ int8_t _z_undeclare_queryable(_z_queryable_t *qle) { return _Z_RES_OK; } -int8_t _z_send_reply(const z_query_t *query, _z_keyexpr_t keyexpr, const _z_value_t payload) { +int8_t _z_send_reply(const _z_query_t *query, _z_keyexpr_t keyexpr, const _z_value_t payload) { int8_t ret = _Z_RES_OK; _z_keyexpr_t q_ke; diff --git a/src/net/query.c b/src/net/query.c index 1f56e612d..41b290f52 100644 --- a/src/net/query.c +++ b/src/net/query.c @@ -13,7 +13,38 @@ #include "zenoh-pico/net/query.h" +#include "zenoh-pico/session/utils.h" +#include "zenoh-pico/utils/logging.h" + +void _z_query_clear(_z_query_t *q) { + // Send REPLY_FINAL message + _z_zenoh_message_t z_msg = _z_n_msg_make_response_final(q->_request_id); + if (_z_send_n_msg(q->_zn, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { + _Z_ERROR("Query send REPLY_FINAL transport failure !"); + } + // Clean up memory + _z_msg_clear(&z_msg); + zp_free(q->_parameters); + _z_keyexpr_clear(&q->_key); + _z_value_clear(&q->_value); + // Ideally free session rc if you have one +} + #if Z_FEATURE_QUERYABLE == 1 +_z_query_t _z_query_create(const _z_value_t *value, const _z_keyexpr_t *key, const _z_bytes_t *parameters, + _z_session_t *zn, uint32_t request_id) { + _z_query_t q; + q._request_id = request_id; + q._zn = zn; // Ideally would have been an rc + q._parameters = (char *)zp_malloc(parameters->len + 1); + memcpy(q._parameters, parameters->start, parameters->len); + q._parameters[parameters->len] = 0; + q._anyke = (strstr(q._parameters, Z_SELECTOR_QUERY_MATCH) == NULL) ? false : true; + _z_keyexpr_copy(&q._key, key); + _z_value_copy(&q._value, value); + return q; +} + void _z_queryable_clear(_z_queryable_t *qbl) { // Nothing to clear (void)(qbl); diff --git a/src/protocol/core.c b/src/protocol/core.c index 29608735e..2e4619443 100644 --- a/src/protocol/core.c +++ b/src/protocol/core.c @@ -72,6 +72,11 @@ _z_value_t _z_value_steal(_z_value_t *value) { *value = _z_value_null(); return ret; } +void _z_value_copy(_z_value_t *dst, const _z_value_t *src) { + dst->encoding.prefix = src->encoding.prefix; + _z_bytes_copy(&dst->encoding.suffix, &src->encoding.suffix); + _z_bytes_copy(&dst->payload, &src->payload); +} #if Z_FEATURE_ATTACHMENT == 1 struct _z_seeker_t { diff --git a/src/session/queryable.c b/src/session/queryable.c index a73615c61..341b8b447 100644 --- a/src/session/queryable.c +++ b/src/session/queryable.c @@ -16,6 +16,7 @@ #include #include +#include "zenoh-pico/api/types.h" #include "zenoh-pico/config.h" #include "zenoh-pico/net/query.h" #include "zenoh-pico/protocol/core.h" @@ -140,7 +141,7 @@ _z_questionable_rc_t *_z_register_questionable(_z_session_t *zn, _z_questionable return ret; } -int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *query, const _z_keyexpr_t q_key, uint32_t qid) { +int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *msgq, const _z_keyexpr_t q_key, uint32_t qid) { int8_t ret = _Z_RES_OK; #if Z_FEATURE_MULTI_THREAD == 1 @@ -155,42 +156,20 @@ int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *query, cons zp_mutex_unlock(&zn->_mutex_inner); #endif // Z_FEATURE_MULTI_THREAD == 1 - // Build the query - z_query_t q; - q._zn = zn; - q._request_id = qid; - q._key = key; -#if defined(__STDC_NO_VLA__) || ((__STDC_VERSION__ < 201000L) && (defined(_WIN32) || defined(WIN32))) - char *params = zp_malloc(query->_parameters.len + 1); -#else - char params[query->_parameters.len + 1]; -#endif - memcpy(params, query->_parameters.start, query->_parameters.len); - params[query->_parameters.len] = 0; - q._parameters = params; - q._value.encoding = query->_ext_value.encoding; - q._value.payload = query->_ext_value.payload; - q._anyke = (strstr(q._parameters, Z_SELECTOR_QUERY_MATCH) == NULL) ? false : true; + // Build the z_query + z_query_t query = {._val = {._rc = _z_query_rc_new()}}; + query._val._rc.in->val = _z_query_create(&msgq->_ext_value, &key, &msgq->_parameters, zn, qid); + // Parse questionable list _z_questionable_rc_list_t *xs = qles; while (xs != NULL) { _z_questionable_rc_t *qle = _z_questionable_rc_list_head(xs); - qle->in->val._callback(&q, qle->in->val._arg); + qle->in->val._callback(&query, qle->in->val._arg); xs = _z_questionable_rc_list_tail(xs); } - + // Clean up + _z_query_rc_drop(&query._val._rc); _z_keyexpr_clear(&key); _z_questionable_rc_list_free(&qles); -#if defined(__STDC_NO_VLA__) || ((__STDC_VERSION__ < 201000L) && (defined(_WIN32) || defined(WIN32))) - zp_free(params); -#endif - - // Send the final reply - // Create the final reply - _z_zenoh_message_t z_msg = _z_n_msg_make_response_final(q._request_id); - if (_z_send_n_msg(zn, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { - ret = _Z_ERR_TRANSPORT_TX_FAILED; - } - _z_msg_clear(&z_msg); } else { #if Z_FEATURE_MULTI_THREAD == 1 zp_mutex_unlock(&zn->_mutex_inner);