Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement connection restoring #799

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions include/zenoh-pico/link/endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ bool _z_locator_eq(const _z_locator_t *left, const _z_locator_t *right);

void _z_locator_init(_z_locator_t *locator);
_z_string_t _z_locator_to_string(const _z_locator_t *loc);
z_result_t _z_locator_from_string(_z_locator_t *lc, _z_string_t *s);
z_result_t _z_locator_from_string(_z_locator_t *lc, const _z_string_t *s);

size_t _z_locator_size(_z_locator_t *lc);
void _z_locator_clear(_z_locator_t *lc);
Expand All @@ -72,7 +72,7 @@ typedef struct {
} _z_endpoint_t;

_z_string_t _z_endpoint_to_string(const _z_endpoint_t *e);
z_result_t _z_endpoint_from_string(_z_endpoint_t *ep, _z_string_t *s);
z_result_t _z_endpoint_from_string(_z_endpoint_t *ep, const _z_string_t *s);
void _z_endpoint_clear(_z_endpoint_t *ep);
void _z_endpoint_free(_z_endpoint_t **ep);

Expand Down
4 changes: 2 additions & 2 deletions include/zenoh-pico/link/link.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ typedef struct _z_link_t {

void _z_link_clear(_z_link_t *zl);
void _z_link_free(_z_link_t **zl);
z_result_t _z_open_link(_z_link_t *zl, _z_string_t *locator);
z_result_t _z_listen_link(_z_link_t *zl, _z_string_t *locator);
z_result_t _z_open_link(_z_link_t *zl, const _z_string_t *locator);
z_result_t _z_listen_link(_z_link_t *zl, const _z_string_t *locator);

z_result_t _z_link_send_wbuf(const _z_link_t *zl, const _z_wbuf_t *wbf);
size_t _z_link_recv_zbuf(const _z_link_t *zl, _z_zbuf_t *zbf, _z_slice_t *addr);
Expand Down
39 changes: 37 additions & 2 deletions include/zenoh-pico/net/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "zenoh-pico/collections/list.h"
#include "zenoh-pico/config.h"
#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/protocol/definitions/network.h"
#include "zenoh-pico/session/liveliness.h"
#include "zenoh-pico/session/queryable.h"
#include "zenoh-pico/session/session.h"
Expand Down Expand Up @@ -55,6 +56,11 @@ typedef struct _z_session_t {
_z_resource_list_t *_local_resources;
_z_resource_list_t *_remote_resources;

// Information for session restoring
// Empty _config means session is not restorable
_z_config_t _config;
_z_network_message_list_t *_decalaration_cache;

// Session subscriptions
#if Z_FEATURE_SUBSCRIPTION == 1
_z_subscription_rc_list_t *_subscriptions;
Expand Down Expand Up @@ -99,14 +105,43 @@ _Z_REFCOUNT_DEFINE(_z_session, _z_session)
* Open a zenoh-net session
*
* Parameters:
* zn: A pointer of A :c:type:`_z_session_rc_t` used as a return value.
* config: A set of properties. The caller keeps its ownership.
* zn: A pointer of A :c:type:`_z_session_t` used as a return value.
* zid: A pointer to Zenoh ID.
*
* Returns:
* ``0`` in case of success, or a ``negative value`` in case of failure.
*/
z_result_t _z_open(_z_session_rc_t *zn, _z_config_t *config, const _z_id_t *zid);

/**
* Reopen a disconnected zenoh-net session
*
* Parameters:
* zn: Existing zenoh-net session.
*
* Returns:
* ``0`` in case of success, or a ``negative value`` in case of failure.
*/
z_result_t _z_reopen(_z_session_rc_t *zn);

/**
* Store declaration network message to cache for resend it after session restore
*
* Parameters:
* zs: A zenoh-net session.
* z_msg: Network message with declaration
*/
void _z_cache_declaration(_z_session_t *zs, const _z_network_message_t *n_msg);

/**
* Remove corresponding declaration from the cache
*
* Parameters:
* zs: A zenoh-net session.
* z_msg: Network message with undeclaration
*/
z_result_t _z_open(_z_session_rc_t *zn, _z_config_t *config);
void _z_prune_declaration(_z_session_t *zs, const _z_network_message_t *n_msg);

/**
* Close a zenoh-net session.
Expand Down
2 changes: 2 additions & 0 deletions include/zenoh-pico/protocol/definitions/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ inline static void _z_msg_clear(_z_zenoh_message_t *msg) { _z_n_msg_clear(msg);
inline static void _z_msg_free(_z_zenoh_message_t **msg) { _z_n_msg_free(msg); }
_Z_ELEM_DEFINE(_z_network_message, _z_network_message_t, _z_noop_size, _z_n_msg_clear, _z_noop_copy, _z_noop_move)
_Z_SVEC_DEFINE(_z_network_message, _z_network_message_t)
_Z_LIST_DEFINE(_z_network_message, _z_network_message_t)

void _z_msg_fix_mapping(_z_zenoh_message_t *msg, uint16_t mapping);
_z_network_message_t _z_msg_make_query(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_slice_t) parameters, _z_zint_t qid,
Expand All @@ -315,6 +316,7 @@ _z_network_message_t _z_n_msg_make_declare(_z_declaration_t declaration, bool ha
_z_network_message_t _z_n_msg_make_push(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_push_body_t) body);
_z_network_message_t _z_n_msg_make_interest(_z_interest_t interest);
z_result_t _z_n_msg_copy(_z_network_message_t *dst, const _z_network_message_t *src);
_z_network_message_t *_z_n_msg_clone(const _z_network_message_t *src);

#ifdef __cplusplus
}
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/session/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ extern "C" {
_z_hello_list_t *_z_scout_inner(const z_what_t what, _z_id_t id, _z_string_t *locator, const uint32_t timeout,
const bool exit_on_first);

z_result_t _z_session_init(_z_session_rc_t *zsrc, _z_id_t *zid);
z_result_t _z_session_init(_z_session_t *zn, const _z_id_t *zid);
void _z_session_clear(_z_session_t *zn);
z_result_t _z_session_close(_z_session_t *zn, uint8_t reason);

Expand Down
30 changes: 30 additions & 0 deletions include/zenoh-pico/transport/common/transport.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

#ifndef ZENOH_PICO_COMMON_TRANSPORT_H
#define ZENOH_PICO_COMMON_TRANSPORT_H

#include "zenoh-pico/transport/transport.h"

#ifdef __cplusplus
extern "C" {
#endif

void _z_common_transport_clear(_z_transport_common_t *ztc, bool detach_tasks);

#ifdef __cplusplus
}
#endif

#endif /* ZENOH_PICO_COMMON_TRANSPORT_H*/
3 changes: 2 additions & 1 deletion include/zenoh-pico/transport/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ enum _z_peer_op_e {
_Z_PEER_OP_LISTEN = 1,
};

z_result_t _z_new_transport(_z_transport_t *zt, _z_id_t *bs, _z_string_t *locator, z_whatami_t mode, int peer_op);
z_result_t _z_new_transport(_z_transport_t *zt, const _z_id_t *bs, const _z_string_t *locator, z_whatami_t mode,
int peer_op);
void _z_free_transport(_z_transport_t **zt);

#ifdef __cplusplus
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/transport/multicast/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ z_result_t _z_multicast_open_client(_z_transport_multicast_establish_param_t *pa
const _z_id_t *local_zid);
z_result_t _z_multicast_send_close(_z_transport_multicast_t *ztm, uint8_t reason, bool link_only);
z_result_t _z_multicast_transport_close(_z_transport_multicast_t *ztm, uint8_t reason);
void _z_multicast_transport_clear(_z_transport_t *zt);
void _z_multicast_transport_clear(_z_transport_multicast_t *ztm, bool detach_tasks);

#if (Z_FEATURE_MULTICAST_TRANSPORT == 1 || Z_FEATURE_RAWETH_TRANSPORT == 1) && Z_FEATURE_MULTI_THREAD == 1
static inline void _z_multicast_peer_mutex_lock(_z_transport_multicast_t *ztm) { _z_mutex_lock(&ztm->_mutex_peer); }
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/transport/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ typedef struct {
uint8_t _seq_num_res;
} _z_transport_multicast_establish_param_t;

_z_transport_common_t *_z_transport_get_common(_z_transport_t *zt);
z_result_t _z_transport_close(_z_transport_t *zt, uint8_t reason);
void _z_transport_clear(_z_transport_t *zt);
void _z_transport_free(_z_transport_t **zt);
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/transport/unicast/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ z_result_t _z_unicast_open_peer(_z_transport_unicast_establish_param_t *param, c
const _z_id_t *local_zid, int peer_op);
z_result_t _z_unicast_send_close(_z_transport_unicast_t *ztu, uint8_t reason, bool link_only);
z_result_t _z_unicast_transport_close(_z_transport_unicast_t *ztu, uint8_t reason);
void _z_unicast_transport_clear(_z_transport_t *zt);
void _z_unicast_transport_clear(_z_transport_unicast_t *ztu, bool detach_tasks);

#ifdef __cplusplus
}
Expand Down
64 changes: 51 additions & 13 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "zenoh-pico/transport/common/tx.h"
#include "zenoh-pico/transport/multicast.h"
#include "zenoh-pico/transport/unicast.h"
#include "zenoh-pico/utils/config.h"
#include "zenoh-pico/utils/endianness.h"
#include "zenoh-pico/utils/logging.h"
#include "zenoh-pico/utils/pointers.h"
Expand Down Expand Up @@ -626,35 +627,72 @@ z_result_t z_scout(z_moved_config_t *config, z_moved_closure_hello_t *callback,

void z_open_options_default(z_open_options_t *options) { options->__dummy = 0; }

z_result_t z_open(z_owned_session_t *zs, z_moved_config_t *config, const z_open_options_t *options) {
_ZP_UNUSED(options);
static _z_id_t _z_session_get_zid(const _z_config_t *config) {
_z_id_t zid = _z_id_empty();
char *opt_as_str = _z_config_get(config, Z_CONFIG_SESSION_ZID_KEY);
if (opt_as_str != NULL) {
_z_uuid_to_bytes(zid.id, opt_as_str);
} else {
_z_session_generate_zid(&zid, Z_ZID_LENGTH);
}
return zid;
}

static z_result_t _z_session_rc_init(z_owned_session_t *zs, _z_id_t *zid) {
z_internal_session_null(zs);
_z_session_t *s = z_malloc(sizeof(_z_session_t));
if (s == NULL) {
z_config_drop(config);
return _Z_ERR_SYSTEM_OUT_OF_MEMORY;
}
memset(s, 0, sizeof(_z_session_t));
// Create rc

z_result_t ret = _z_session_init(s, zid);
if (ret != _Z_RES_OK) {
_Z_ERROR("_z_open failed: %i", ret);
z_free(s);
return ret;
}

_z_session_rc_t zsrc = _z_session_rc_new(s);
if (zsrc._cnt == NULL) {
_z_session_clear(s);
z_free(s);
z_config_drop(config);
return _Z_ERR_SYSTEM_OUT_OF_MEMORY;
}
zs->_rc = zsrc;
// Open session
z_result_t ret = _z_open(&zs->_rc, &config->_this._val);

return _Z_RES_OK;
}

z_result_t z_open(z_owned_session_t *zs, z_moved_config_t *config, const z_open_options_t *options) {
_ZP_UNUSED(options);

_z_config_t *cfg = &config->_this._val;
if (config == NULL) {
_Z_ERROR("A valid config is missing.");
return _Z_ERR_GENERIC;
}

_z_id_t zid = _z_session_get_zid(cfg);

z_result_t ret = _z_session_rc_init(zs, &zid);
if (ret != _Z_RES_OK) {
_Z_ERROR("_z_open failed: %i", ret);
_z_session_rc_decr(&zs->_rc);
z_internal_session_null(zs);
z_config_drop(config);
z_free(s);
return ret;
}

ret = _z_open(&zs->_rc, cfg, &zid);
if (ret != _Z_RES_OK) {
z_session_drop(z_session_move(zs));
z_config_drop(config);
return ret;
}
// Clean up
z_config_drop(config);
if (/* session is restorable*/ true) {
_Z_OWNED_RC_IN_VAL(zs)->_config = config->_this._val;
z_internal_config_null(&config->_this);
} else {
z_config_drop(config);
}
return _Z_RES_OK;
}

Expand Down
12 changes: 6 additions & 6 deletions src/link/endpoint.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ bool _z_locator_eq(const _z_locator_t *left, const _z_locator_t *right) {
return res;
}

static z_result_t _z_locator_protocol_from_string(_z_string_t *protocol, _z_string_t *str) {
static z_result_t _z_locator_protocol_from_string(_z_string_t *protocol, const _z_string_t *str) {
*protocol = _z_string_null();

const char *p_start = _z_string_data(str);
Expand All @@ -97,7 +97,7 @@ static z_result_t _z_locator_protocol_from_string(_z_string_t *protocol, _z_stri
return _z_string_copy_substring(protocol, str, 0, p_len);
}

static z_result_t _z_locator_address_from_string(_z_string_t *address, _z_string_t *str) {
static z_result_t _z_locator_address_from_string(_z_string_t *address, const _z_string_t *str) {
*address = _z_string_null();

// Find protocol separator
Expand Down Expand Up @@ -130,7 +130,7 @@ static z_result_t _z_locator_address_from_string(_z_string_t *address, _z_string
return _z_string_copy_substring(address, str, start_offset, addr_len);
}

z_result_t _z_locator_metadata_from_string(_z_str_intmap_t *strint, _z_string_t *str) {
z_result_t _z_locator_metadata_from_string(_z_str_intmap_t *strint, const _z_string_t *str) {
*strint = _z_str_intmap_make();

// Find metadata separator
Expand Down Expand Up @@ -169,7 +169,7 @@ void _z_locator_metadata_onto_str(char *dst, size_t dst_len, const _z_str_intmap
_z_str_intmap_onto_str(dst, dst_len, s, 0, NULL);
}

z_result_t _z_locator_from_string(_z_locator_t *lc, _z_string_t *str) {
z_result_t _z_locator_from_string(_z_locator_t *lc, const _z_string_t *str) {
if (str == NULL || !_z_string_check(str)) {
return _Z_ERR_CONFIG_LOCATOR_INVALID;
}
Expand Down Expand Up @@ -284,7 +284,7 @@ void _z_endpoint_free(_z_endpoint_t **ep) {
}
}

z_result_t _z_endpoint_config_from_string(_z_str_intmap_t *strint, _z_string_t *str, _z_string_t *proto) {
z_result_t _z_endpoint_config_from_string(_z_str_intmap_t *strint, const _z_string_t *str, _z_string_t *proto) {
char *p_start = (char *)memchr(_z_string_data(str), ENDPOINT_CONFIG_SEPARATOR, _z_string_len(str));
if (p_start != NULL) {
p_start = _z_ptr_char_offset(p_start, 1);
Expand Down Expand Up @@ -411,7 +411,7 @@ char *_z_endpoint_config_to_string(const _z_str_intmap_t *s, const _z_string_t *
return NULL;
}

z_result_t _z_endpoint_from_string(_z_endpoint_t *ep, _z_string_t *str) {
z_result_t _z_endpoint_from_string(_z_endpoint_t *ep, const _z_string_t *str) {
_z_endpoint_init(ep);
_Z_CLEAN_RETURN_IF_ERR(_z_locator_from_string(&ep->_locator, str), _z_endpoint_clear(ep));
_Z_CLEAN_RETURN_IF_ERR(_z_endpoint_config_from_string(&ep->_config, str, &ep->_locator._protocol),
Expand Down
4 changes: 2 additions & 2 deletions src/link/link.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include "zenoh-pico/link/manager.h"
#include "zenoh-pico/utils/logging.h"

z_result_t _z_open_link(_z_link_t *zl, _z_string_t *locator) {
z_result_t _z_open_link(_z_link_t *zl, const _z_string_t *locator) {
z_result_t ret = _Z_RES_OK;

_z_endpoint_t ep;
Expand Down Expand Up @@ -71,7 +71,7 @@ z_result_t _z_open_link(_z_link_t *zl, _z_string_t *locator) {
return ret;
}

z_result_t _z_listen_link(_z_link_t *zl, _z_string_t *locator) {
z_result_t _z_listen_link(_z_link_t *zl, const _z_string_t *locator) {
z_result_t ret = _Z_RES_OK;

_z_endpoint_t ep;
Expand Down
Loading
Loading