Skip to content

Commit

Permalink
Renaming questionable (#336)
Browse files Browse the repository at this point in the history
  • Loading branch information
jean-roland authored Jan 30, 2024
1 parent d7269a9 commit 19e7757
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 70 deletions.
2 changes: 1 addition & 1 deletion include/zenoh-pico/api/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ z_owned_closure_sample_t z_closure_sample(_z_data_handler_t call, _z_dropper_han
* Returns:
* Returns a new query closure.
*/
z_owned_closure_query_t z_closure_query(_z_questionable_handler_t call, _z_dropper_handler_t drop, void *context);
z_owned_closure_query_t z_closure_query(_z_queryable_handler_t call, _z_dropper_handler_t drop, void *context);

/**
* Return a new reply closure.
Expand Down
6 changes: 3 additions & 3 deletions include/zenoh-pico/api/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -498,14 +498,14 @@ void z_closure_sample_call(const z_owned_closure_sample_t *closure, const z_samp
* A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks.
*
* Members:
* _z_questionable_handler_t call: `void (*_z_questionable_handler_t)(z_query_t *query, void *arg)` is the callback
* function.
* _z_queryable_handler_t call: `void (*_z_queryable_handler_t)(z_query_t *query, void *arg)` is the
* callback function.
* _z_dropper_handler_t drop: `void *drop(void*)` allows the callback's state to be freed.
* void *context: a pointer to an arbitrary state.
*/
typedef struct {
void *context;
_z_questionable_handler_t call;
_z_queryable_handler_t call;
_z_dropper_handler_t drop;
} z_owned_closure_query_t;

Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/net/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ int8_t _z_subscriber_pull(const _z_subscriber_t *sub);
* The created :c:type:`_z_queryable_t` or null if the declaration failed.
*/
_z_queryable_t *_z_declare_queryable(_z_session_rc_t *zn, _z_keyexpr_t keyexpr, _Bool complete,
_z_questionable_handler_t callback, _z_drop_handler_t dropper, void *arg);
_z_queryable_handler_t callback, _z_drop_handler_t dropper, void *arg);

/**
* Undeclare a :c:type:`_z_queryable_t`.
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/net/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ typedef struct _z_session_t {

// Session queryables
#if Z_FEATURE_QUERYABLE == 1
_z_questionable_rc_list_t *_local_questionable;
_z_session_queryable_rc_list_t *_local_queryable;
#endif
#if Z_FEATURE_QUERY == 1
_z_pending_query_list_t *_pending_queries;
Expand Down
10 changes: 5 additions & 5 deletions include/zenoh-pico/session/queryable.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
#define _Z_QUERYABLE_DISTANCE_DEFAULT 0

/*------------------ Queryable ------------------*/
_z_questionable_rc_t *_z_get_questionable_by_id(_z_session_t *zn, const _z_zint_t id);
_z_questionable_rc_list_t *_z_get_questionable_by_key(_z_session_t *zn, const _z_keyexpr_t key);
_z_session_queryable_rc_t *_z_get_session_queryable_by_id(_z_session_t *zn, const _z_zint_t id);
_z_session_queryable_rc_list_t *_z_get_session_queryable_by_key(_z_session_t *zn, const _z_keyexpr_t key);

_z_questionable_rc_t *_z_register_questionable(_z_session_t *zn, _z_questionable_t *q);
_z_session_queryable_rc_t *_z_register_session_queryable(_z_session_t *zn, _z_session_queryable_t *q);
int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *query, const _z_keyexpr_t q_key, uint32_t qid);
void _z_unregister_questionable(_z_session_t *zn, _z_questionable_rc_t *q);
void _z_flush_questionables(_z_session_t *zn);
void _z_unregister_session_queryable(_z_session_t *zn, _z_session_queryable_rc_t *q);
void _z_flush_session_queryable(_z_session_t *zn);
#endif

#endif /* ZENOH_PICO_SESSION_QUERYABLE_H */
19 changes: 10 additions & 9 deletions include/zenoh-pico/session/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,24 +114,25 @@ typedef struct z_query_t z_query_t; // Forward type declaration to avoid cyclic
/**
* The callback signature of the functions handling query messages.
*/
typedef void (*_z_questionable_handler_t)(const z_query_t *query, void *arg);
typedef void (*_z_queryable_handler_t)(const z_query_t *query, void *arg);

typedef struct {
_z_keyexpr_t _key;
uint32_t _id;
_z_questionable_handler_t _callback;
_z_queryable_handler_t _callback;
_z_drop_handler_t _dropper;
void *_arg;
_Bool _complete;
} _z_questionable_t;
} _z_session_queryable_t;

_Bool _z_questionable_eq(const _z_questionable_t *one, const _z_questionable_t *two);
void _z_questionable_clear(_z_questionable_t *res);
_Bool _z_session_queryable_eq(const _z_session_queryable_t *one, const _z_session_queryable_t *two);
void _z_session_queryable_clear(_z_session_queryable_t *res);

_Z_REFCOUNT_DEFINE(_z_questionable, _z_questionable)
_Z_ELEM_DEFINE(_z_questionable, _z_questionable_t, _z_noop_size, _z_questionable_clear, _z_noop_copy)
_Z_ELEM_DEFINE(_z_questionable_rc, _z_questionable_rc_t, _z_noop_size, _z_questionable_rc_drop, _z_noop_copy)
_Z_LIST_DEFINE(_z_questionable_rc, _z_questionable_rc_t)
_Z_REFCOUNT_DEFINE(_z_session_queryable, _z_session_queryable)
_Z_ELEM_DEFINE(_z_session_queryable, _z_session_queryable_t, _z_noop_size, _z_session_queryable_clear, _z_noop_copy)
_Z_ELEM_DEFINE(_z_session_queryable_rc, _z_session_queryable_rc_t, _z_noop_size, _z_session_queryable_rc_drop,
_z_noop_copy)
_Z_LIST_DEFINE(_z_session_queryable_rc, _z_session_queryable_rc_t)

typedef struct {
_z_reply_t _reply;
Expand Down
2 changes: 1 addition & 1 deletion src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ z_owned_closure_sample_t z_closure_sample(_z_data_handler_t call, _z_dropper_han
return (z_owned_closure_sample_t){.call = call, .drop = drop, .context = context};
}

z_owned_closure_query_t z_closure_query(_z_questionable_handler_t call, _z_dropper_handler_t drop, void *context) {
z_owned_closure_query_t z_closure_query(_z_queryable_handler_t call, _z_dropper_handler_t drop, void *context) {
return (z_owned_closure_query_t){.call = call, .drop = drop, .context = context};
}

Expand Down
18 changes: 9 additions & 9 deletions src/net/primitives.c
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,8 @@ int8_t _z_subscriber_pull(const _z_subscriber_t *sub) {
#if Z_FEATURE_QUERYABLE == 1
/*------------------ Queryable Declaration ------------------*/
_z_queryable_t *_z_declare_queryable(_z_session_rc_t *zn, _z_keyexpr_t keyexpr, _Bool complete,
_z_questionable_handler_t callback, _z_drop_handler_t dropper, void *arg) {
_z_questionable_t q;
_z_queryable_handler_t callback, _z_drop_handler_t dropper, void *arg) {
_z_session_queryable_t q;
q._id = _z_get_entity_id(&zn->in->val);
q._key = _z_get_expanded_key_from_key(&zn->in->val, &keyexpr);
q._complete = complete;
Expand All @@ -281,11 +281,11 @@ _z_queryable_t *_z_declare_queryable(_z_session_rc_t *zn, _z_keyexpr_t keyexpr,
// Allocate queryable
_z_queryable_t *ret = (_z_queryable_t *)zp_malloc(sizeof(_z_queryable_t));
if (ret == NULL) {
_z_questionable_clear(&q);
_z_session_queryable_clear(&q);
return NULL;
}
// Create questionable entry, stored at session-level, do not drop it by the end of this function.
_z_questionable_rc_t *sp_q = _z_register_questionable(&zn->in->val, &q);
// Create session_queryable entry, stored at session-level, do not drop it by the end of this function.
_z_session_queryable_rc_t *sp_q = _z_register_session_queryable(&zn->in->val, &q);
if (sp_q == NULL) {
_z_queryable_free(&ret);
return NULL;
Expand All @@ -294,7 +294,7 @@ _z_queryable_t *_z_declare_queryable(_z_session_rc_t *zn, _z_keyexpr_t keyexpr,
_z_declaration_t declaration = _z_make_decl_queryable(&keyexpr, q._id, q._complete, _Z_QUERYABLE_DISTANCE_DEFAULT);
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration);
if (_z_send_n_msg(&zn->in->val, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
_z_unregister_questionable(&zn->in->val, sp_q);
_z_unregister_session_queryable(&zn->in->val, sp_q);
_z_queryable_free(&ret);
return NULL;
}
Expand All @@ -309,8 +309,8 @@ int8_t _z_undeclare_queryable(_z_queryable_t *qle) {
if (qle == NULL) {
return _Z_ERR_ENTITY_UNKNOWN;
}
// Find questionable entry
_z_questionable_rc_t *q = _z_get_questionable_by_id(&qle->_zn.in->val, qle->_entity_id);
// Find session_queryable entry
_z_session_queryable_rc_t *q = _z_get_session_queryable_by_id(&qle->_zn.in->val, qle->_entity_id);
if (q == NULL) {
return _Z_ERR_ENTITY_UNKNOWN;
}
Expand All @@ -322,7 +322,7 @@ int8_t _z_undeclare_queryable(_z_queryable_t *qle) {
}
_z_n_msg_clear(&n_msg);
// Only if message is successfully send, local queryable state can be removed
_z_unregister_questionable(&qle->_zn.in->val, q);
_z_unregister_session_queryable(&qle->_zn.in->val, q);
_z_session_rc_drop(&qle->_zn);
return _Z_RES_OK;
}
Expand Down
80 changes: 42 additions & 38 deletions src/session/queryable.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,45 +27,48 @@
#include "zenoh-pico/utils/logging.h"

#if Z_FEATURE_QUERYABLE == 1
_Bool _z_questionable_eq(const _z_questionable_t *one, const _z_questionable_t *two) { return one->_id == two->_id; }
_Bool _z_session_queryable_eq(const _z_session_queryable_t *one, const _z_session_queryable_t *two) {
return one->_id == two->_id;
}

void _z_questionable_clear(_z_questionable_t *qle) {
void _z_session_queryable_clear(_z_session_queryable_t *qle) {
if (qle->_dropper != NULL) {
qle->_dropper(qle->_arg);
}
_z_keyexpr_clear(&qle->_key);
}

/*------------------ Queryable ------------------*/
_z_questionable_rc_t *__z_get_questionable_by_id(_z_questionable_rc_list_t *qles, const _z_zint_t id) {
_z_questionable_rc_t *ret = NULL;
_z_session_queryable_rc_t *__z_get_session_queryable_by_id(_z_session_queryable_rc_list_t *qles, const _z_zint_t id) {
_z_session_queryable_rc_t *ret = NULL;

_z_questionable_rc_list_t *xs = qles;
_z_session_queryable_rc_list_t *xs = qles;
while (xs != NULL) {
_z_questionable_rc_t *qle = _z_questionable_rc_list_head(xs);
_z_session_queryable_rc_t *qle = _z_session_queryable_rc_list_head(xs);
if (id == qle->in->val._id) {
ret = qle;
break;
}

xs = _z_questionable_rc_list_tail(xs);
xs = _z_session_queryable_rc_list_tail(xs);
}

return ret;
}

_z_questionable_rc_list_t *__z_get_questionable_by_key(_z_questionable_rc_list_t *qles, const _z_keyexpr_t key) {
_z_questionable_rc_list_t *ret = NULL;
_z_session_queryable_rc_list_t *__z_get_session_queryable_by_key(_z_session_queryable_rc_list_t *qles,
const _z_keyexpr_t key) {
_z_session_queryable_rc_list_t *ret = NULL;

_z_questionable_rc_list_t *xs = qles;
_z_session_queryable_rc_list_t *xs = qles;
while (xs != NULL) {
_z_questionable_rc_t *qle = _z_questionable_rc_list_head(xs);
_z_session_queryable_rc_t *qle = _z_session_queryable_rc_list_head(xs);
if (_z_keyexpr_intersects(qle->in->val._key._suffix, strlen(qle->in->val._key._suffix), key._suffix,
strlen(key._suffix)) == true) {
ret = _z_questionable_rc_list_push(ret, _z_questionable_rc_clone_as_ptr(qle));
ret = _z_session_queryable_rc_list_push(ret, _z_session_queryable_rc_clone_as_ptr(qle));
}

xs = _z_questionable_rc_list_tail(xs);
xs = _z_session_queryable_rc_list_tail(xs);
}

return ret;
Expand All @@ -76,27 +79,27 @@ _z_questionable_rc_list_t *__z_get_questionable_by_key(_z_questionable_rc_list_t
* Make sure that the following mutexes are locked before calling this function:
* - zn->_mutex_inner
*/
_z_questionable_rc_t *__unsafe_z_get_questionable_by_id(_z_session_t *zn, const _z_zint_t id) {
_z_questionable_rc_list_t *qles = zn->_local_questionable;
return __z_get_questionable_by_id(qles, id);
_z_session_queryable_rc_t *__unsafe_z_get_session_queryable_by_id(_z_session_t *zn, const _z_zint_t id) {
_z_session_queryable_rc_list_t *qles = zn->_local_queryable;
return __z_get_session_queryable_by_id(qles, id);
}

/**
* This function is unsafe because it operates in potentially concurrent data.
* Make sure that the following mutexes are locked before calling this function:
* - zn->_mutex_inner
*/
_z_questionable_rc_list_t *__unsafe_z_get_questionable_by_key(_z_session_t *zn, const _z_keyexpr_t key) {
_z_questionable_rc_list_t *qles = zn->_local_questionable;
return __z_get_questionable_by_key(qles, key);
_z_session_queryable_rc_list_t *__unsafe_z_get_session_queryable_by_key(_z_session_t *zn, const _z_keyexpr_t key) {
_z_session_queryable_rc_list_t *qles = zn->_local_queryable;
return __z_get_session_queryable_by_key(qles, key);
}

_z_questionable_rc_t *_z_get_questionable_by_id(_z_session_t *zn, const _z_zint_t id) {
_z_session_queryable_rc_t *_z_get_session_queryable_by_id(_z_session_t *zn, const _z_zint_t id) {
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1

_z_questionable_rc_t *qle = __unsafe_z_get_questionable_by_id(zn, id);
_z_session_queryable_rc_t *qle = __unsafe_z_get_session_queryable_by_id(zn, id);

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
Expand All @@ -105,13 +108,13 @@ _z_questionable_rc_t *_z_get_questionable_by_id(_z_session_t *zn, const _z_zint_
return qle;
}

_z_questionable_rc_list_t *_z_get_questionable_by_key(_z_session_t *zn, const _z_keyexpr_t *keyexpr) {
_z_session_queryable_rc_list_t *_z_get_session_queryable_by_key(_z_session_t *zn, const _z_keyexpr_t *keyexpr) {
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1

_z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, keyexpr);
_z_questionable_rc_list_t *qles = __unsafe_z_get_questionable_by_key(zn, key);
_z_session_queryable_rc_list_t *qles = __unsafe_z_get_session_queryable_by_key(zn, key);

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
Expand All @@ -120,18 +123,18 @@ _z_questionable_rc_list_t *_z_get_questionable_by_key(_z_session_t *zn, const _z
return qles;
}

_z_questionable_rc_t *_z_register_questionable(_z_session_t *zn, _z_questionable_t *q) {
_z_session_queryable_rc_t *_z_register_session_queryable(_z_session_t *zn, _z_session_queryable_t *q) {
_Z_DEBUG(">>> Allocating queryable for (%ju:%s)", (uintmax_t)q->_key._id, q->_key._suffix);
_z_questionable_rc_t *ret = NULL;
_z_session_queryable_rc_t *ret = NULL;

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1

ret = (_z_questionable_rc_t *)zp_malloc(sizeof(_z_questionable_rc_t));
ret = (_z_session_queryable_rc_t *)zp_malloc(sizeof(_z_session_queryable_rc_t));
if (ret != NULL) {
*ret = _z_questionable_rc_new_from_val(*q);
zn->_local_questionable = _z_questionable_rc_list_push(zn->_local_questionable, ret);
*ret = _z_session_queryable_rc_new_from_val(*q);
zn->_local_queryable = _z_session_queryable_rc_list_push(zn->_local_queryable, ret);
}

#if Z_FEATURE_MULTI_THREAD == 1
Expand All @@ -150,7 +153,7 @@ int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *msgq, const

_z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, &q_key);
if (key._suffix != NULL) {
_z_questionable_rc_list_t *qles = __unsafe_z_get_questionable_by_key(zn, key);
_z_session_queryable_rc_list_t *qles = __unsafe_z_get_session_queryable_by_key(zn, key);

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
Expand All @@ -159,17 +162,17 @@ int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *msgq, const
// Build the z_query
z_query_t query = {._val = {._rc = _z_query_rc_new()}};
query._val._rc.in->val = _z_query_create(&msgq->_ext_value, &key, &msgq->_parameters, zn, qid);
// Parse questionable list
_z_questionable_rc_list_t *xs = qles;
// Parse session_queryable list
_z_session_queryable_rc_list_t *xs = qles;
while (xs != NULL) {
_z_questionable_rc_t *qle = _z_questionable_rc_list_head(xs);
_z_session_queryable_rc_t *qle = _z_session_queryable_rc_list_head(xs);
qle->in->val._callback(&query, qle->in->val._arg);
xs = _z_questionable_rc_list_tail(xs);
xs = _z_session_queryable_rc_list_tail(xs);
}
// Clean up
_z_query_rc_drop(&query._val._rc);
_z_keyexpr_clear(&key);
_z_questionable_rc_list_free(&qles);
_z_session_queryable_rc_list_free(&qles);
} else {
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
Expand All @@ -181,24 +184,25 @@ int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *msgq, const
return ret;
}

void _z_unregister_questionable(_z_session_t *zn, _z_questionable_rc_t *qle) {
void _z_unregister_session_queryable(_z_session_t *zn, _z_session_queryable_rc_t *qle) {
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1

zn->_local_questionable = _z_questionable_rc_list_drop_filter(zn->_local_questionable, _z_questionable_rc_eq, qle);
zn->_local_queryable =
_z_session_queryable_rc_list_drop_filter(zn->_local_queryable, _z_session_queryable_rc_eq, qle);

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
}

void _z_flush_questionables(_z_session_t *zn) {
void _z_flush_session_queryable(_z_session_t *zn) {
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1

_z_questionable_rc_list_free(&zn->_local_questionable);
_z_session_queryable_rc_list_free(&zn->_local_queryable);

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
Expand Down
4 changes: 2 additions & 2 deletions src/session/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ int8_t _z_session_init(_z_session_t *zn, _z_id_t *zid) {
zn->_remote_subscriptions = NULL;
#endif
#if Z_FEATURE_QUERYABLE == 1
zn->_local_questionable = NULL;
zn->_local_queryable = NULL;
#endif
#if Z_FEATURE_QUERY == 1
zn->_pending_queries = NULL;
Expand Down Expand Up @@ -110,7 +110,7 @@ void _z_session_clear(_z_session_t *zn) {
_z_flush_subscriptions(zn);
#endif
#if Z_FEATURE_QUERYABLE == 1
_z_flush_questionables(zn);
_z_flush_session_queryable(zn);
#endif
#if Z_FEATURE_QUERY == 1
_z_flush_pending_queries(zn);
Expand Down

0 comments on commit 19e7757

Please sign in to comment.