diff --git a/docs/ext.rst b/docs/ext.rst index 4ad6d9d..7b6ea08 100644 --- a/docs/ext.rst +++ b/docs/ext.rst @@ -46,4 +46,20 @@ Extra Zenoh entities. .. doxygenclass:: zenoh::ext::QueryingSubscriber :members: - :membergroups: Constructors Operators Methods Fields \ No newline at end of file + :membergroups: Constructors Operators Methods Fields + +.. doxygenclass:: zenoh::ext::AdvancedPublisher + :members: + :membergroups: Constructors Operators Methods Fields + +.. doxygenclass:: zenoh::ext::AdvancedSubscriber + :members: + :membergroups: Constructors Operators Methods Fields + +.. doxygenstruct:: zenoh::ext::Miss + :members: + :membergroups: Constructors Operators Methods Fields + +.. doxygenclass:: zenoh::ext::SampleMissListener + :members: + :membergroups: Constructors Operators Methods \ No newline at end of file diff --git a/docs/matching.rst b/docs/matching.rst index 554cae5..8c41226 100644 --- a/docs/matching.rst +++ b/docs/matching.rst @@ -1,5 +1,5 @@ Matching -================= +======== Classes related to getting information about matching Zenoh entities. .. doxygenstruct:: zenoh::MatchingStatus diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 7e7174d..2568da4 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -60,10 +60,7 @@ function(add_examples glob mode lib) if(${file} MATCHES ".*querier.*$") continue() endif() - if(${file} MATCHES ".*query_sub.*$") - continue() - endif() - if(${file} MATCHES ".*pub_cache.*$") + if((${file} MATCHES ".*advanced_sub.*$") OR (${file} MATCHES ".*advanced_pub.*$")) continue() endif() endif() diff --git a/examples/zenohc/z_pub_cache.cxx b/examples/zenohc/z_advanced_pub.cxx similarity index 54% rename from examples/zenohc/z_pub_cache.cxx rename to examples/zenohc/z_advanced_pub.cxx index 9797d3b..f3114d0 100644 --- a/examples/zenohc/z_pub_cache.cxx +++ b/examples/zenohc/z_advanced_pub.cxx @@ -11,10 +11,8 @@ // Contributors: // ZettaScale Zenoh Team, // -#include -#include - #include +#include #include #include #include @@ -26,60 +24,57 @@ using namespace zenoh; using namespace std::chrono_literals; +#ifdef ZENOHCXX_ZENOHC const char *default_value = "Pub from C++ zenoh-c!"; const char *default_keyexpr = "demo/example/zenoh-cpp-zenoh-c-pub"; -const char *default_prefix = ""; +#elif ZENOHCXX_ZENOHPICO +const char *default_value = "Pub from C++ zenoh-pico!"; +const char *default_keyexpr = "demo/example/zenoh-cpp-zenoh-pico-pub"; +#else +#error "Unknown zenoh backend" +#endif int _main(int argc, char **argv) { auto &&[config, args] = ConfigCliArgParser(argc, argv) - .named_value({"k", "key"}, "KEY_EXPRESSION", "Key expression to write to (string)", default_keyexpr) - .named_value({"v", "value"}, "VALUE", "Value to publish (string)", default_value) - .named_value({"i", "history"}, "HISTORY", "Number of publications to keep in cache (number)", "1") - .named_flag({"o", "complete"}, - "Set `complete` option to true. This means that this queryable is ultimate data source, no " - "need to scan other queryables") - .named_value({"x", "prefix"}, "PREFIX", "Queryable prefix", "") + .named_value({"k", "key"}, "KEY_EXPRESSION", "Key expression to publish to (string)", default_keyexpr) + .named_value({"p", "payload"}, "PAYLOAD", "Payload to publish (string)", default_value) + .named_value({"i", "history"}, "HISTORY_SIZE", "The number of publications to keep in cache (number)", "1") .run(); auto keyexpr = args.value("key"); - auto value = args.value("value"); + auto payload = args.value("payload"); auto history = std::atoi(args.value("history").data()); - auto complete = args.flag("complete"); - auto prefix = args.value("prefix"); - - config.insert_json5(Z_CONFIG_ADD_TIMESTAMP_KEY, "true"); std::cout << "Opening session..." << std::endl; auto session = Session::open(std::move(config)); - std::cout << "Declaring Publication cache on '" << keyexpr << "'..." << std::endl; - ext::SessionExt::PublicationCacheOptions opts; - opts.history = history; - opts.queryable_complete = complete; - if (!prefix.empty()) { - opts.queryable_prefix = KeyExpr(prefix); - } - if (!std::string(prefix).empty()) { - opts.queryable_prefix = KeyExpr(prefix); - } - auto pub_cache = session.ext().declare_publication_cache(keyexpr, std::move(opts)); + ext::SessionExt::AdvancedPublisherOptions opts; + opts.cache = ext::SessionExt::AdvancedPublisherOptions::CacheOptions{}; + opts.cache->max_samples = history; + opts.publisher_detection = true; + opts.sample_miss_detection = true; + + std::cout << "Declaring AdvancedPublisher on '" << keyexpr << "'..." << std::endl; + auto pub = session.ext().declare_advanced_publisher(KeyExpr(keyexpr), std::move(opts)); std::cout << "Press CTRL-C to quit..." << std::endl; for (int idx = 0; idx < std::numeric_limits::max(); ++idx) { std::this_thread::sleep_for(1s); std::ostringstream ss; - ss << "[" << idx << "] " << value; + ss << "[" << idx << "] " << payload; auto s = ss.str(); - std::cout << "Putting Data ('" << keyexpr << "': '" << s << "')...\n"; - session.put(keyexpr, std::move(s)); + std::cout << "Put Data ('" << keyexpr << "': '" << s << "')...\n"; + pub.put(s); } return 0; } int main(int argc, char **argv) { try { +#ifdef ZENOHCXX_ZENOHC init_log_from_env_or("error"); +#endif _main(argc, argv); } catch (ZException e) { std::cout << "Received an error :" << e.what() << "\n"; diff --git a/examples/zenohc/z_query_sub.cxx b/examples/zenohc/z_advanced_sub.cxx similarity index 62% rename from examples/zenohc/z_query_sub.cxx rename to examples/zenohc/z_advanced_sub.cxx index 4601f01..d1ec651 100644 --- a/examples/zenohc/z_query_sub.cxx +++ b/examples/zenohc/z_advanced_sub.cxx @@ -11,9 +11,6 @@ // Contributors: // ZettaScale Zenoh Team, // -#include -#include - #include #include #include @@ -41,34 +38,40 @@ int _main(int argc, char **argv) { auto &&[config, args] = ConfigCliArgParser(argc, argv) .named_value({"k", "key"}, "KEY_EXPRESSION", "Key expression to subscriber to (string)", "demo/example/**") - .named_value({"q", "query"}, "Query", - "Selector to use for queries (by default it's same as 'KEY_EXPRESSION') (string)", "") .run(); auto keyexpr = args.value("key"); - auto query = args.value("query"); std::cout << "Opening session..." << std::endl; auto session = Session::open(std::move(config)); - std::cout << "Declaring Querying Subscriber on '" << keyexpr << "' with initial query on '" << query << "'" - << std::endl; - ext::SessionExt::QueryingSubscriberOptions opts; - - if (!query.empty()) { - opts.query_keyexpr = KeyExpr(query); - opts.query_accept_replies = ReplyKeyExpr::ZC_REPLY_KEYEXPR_ANY; - } - auto querying_subscriber = - session.ext().declare_querying_subscriber(keyexpr, channels::FifoChannel(16), std::move(opts)); + ext::SessionExt::AdvancedSubscriberOptions opts; + opts.history = ext::SessionExt::AdvancedSubscriberOptions::HistoryOptions{}; + opts.history->detect_late_publishers = true; + opts.recovery = ext::SessionExt::AdvancedSubscriberOptions::RecoveryOptions{}; + opts.recovery->periodic_queries_period_ms = 1000; + opts.subscriber_detection = true; - std::cout << "Press CTRL-C to quit..." << std::endl; - for (auto res = querying_subscriber.handler().recv(); std::holds_alternative(res); - res = querying_subscriber.handler().recv()) { - const auto &sample = std::get(res); + auto data_handler = [](const Sample &sample) { std::cout << ">> [Subscriber] Received " << kind_to_str(sample.get_kind()) << " ('" - << sample.get_keyexpr().as_string_view() << "': '" << sample.get_payload().as_string() << "')" + << sample.get_keyexpr().as_string_view() << "' : '" << sample.get_payload().as_string() << "')"; + std::cout << std::endl; + }; + + auto missed_sample_handler = [](const ext::Miss &miss) { + std::cout << ">> [Subscriber] Missed " << miss.nb << " samples from '" << miss.source.id() << "' !!!" << std::endl; + }; + + std::cout << "Declaring AdvancedSubscriber on '" << keyexpr << "'" << std::endl; + auto advanced_subscriber = + session.ext().declare_advanced_subscriber(keyexpr, data_handler, closures::none, std::move(opts)); + + advanced_subscriber.declare_background_sample_miss_listener(missed_sample_handler, closures::none); + + std::cout << "Press CTRL-C to quit..." << std::endl; + while (true) { + std::this_thread::sleep_for(1s); } return 0; diff --git a/include/zenoh/api.hxx b/include/zenoh/api.hxx index d595e5a..212efe8 100644 --- a/include/zenoh/api.hxx +++ b/include/zenoh/api.hxx @@ -44,7 +44,6 @@ #endif #include "api/ext/serialization.hxx" #if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) -#include "api/ext/publication_cache.hxx" -#include "api/ext/querying_subscriber.hxx" +#include "api/ext/session_ext.hxx" #include "api/matching.hxx" #endif diff --git a/include/zenoh/api/ext/advanced_publisher.hxx b/include/zenoh/api/ext/advanced_publisher.hxx new file mode 100644 index 0000000..a53b210 --- /dev/null +++ b/include/zenoh/api/ext/advanced_publisher.hxx @@ -0,0 +1,247 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#pragma once + +#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) +#include + +#include "../../detail/closures_concrete.hxx" +#include "../base.hxx" +#include "../bytes.hxx" +#include "../encoding.hxx" +#include "../enums.hxx" +#include "../interop.hxx" +#include "../keyexpr.hxx" +#include "../publisher.hxx" +#include "../sample.hxx" +#include "../timestamp.hxx" +#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) +#include "../matching.hxx" +#include "../source_info.hxx" +#endif + +namespace zenoh::ext { + +/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release. +/// @brief A Zenoh advanced publisher. +/// +/// In addition to publishing the data, +/// it also maintains the storage, allowing matching subscribers to retrive missed samples. +/// @note Zenoh-c only +class AdvancedPublisher : public Owned<::ze_owned_advanced_publisher_t> { + AdvancedPublisher(zenoh::detail::null_object_t) : Owned(nullptr){}; + friend struct interop::detail::Converter; + + public: + /// @name Methods + + /// @brief Get the key expression of the advanced publisher. + const KeyExpr& get_keyexpr() const { + return interop::as_owned_cpp_ref(::ze_advanced_publisher_keyexpr(interop::as_loaned_c_ptr(*this))); + } + + /// @brief Undeclare advanced publisher. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + void undeclare(ZResult* err = nullptr) && { + __ZENOH_RESULT_CHECK(::ze_undeclare_advanced_publisher(interop::as_moved_c_ptr(*this)), err, + "Failed to undeclare Advanced Publisher"); + } + +#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future + /// release. + /// @brief Get the id of the advanced publisher. + /// @return id of this advancedc publisher. + EntityGlobalId get_id() const { + return interop::into_copyable_cpp_obj( + ::ze_advanced_publisher_id(interop::as_loaned_c_ptr(*this))); + } +#endif + + /// @brief Options to be passed to ``AdvancedPublisher::put`` operation. + struct PutOptions { + /// @name Fields + + /// Base put options. + zenoh::Publisher::PutOptions put_options = {}; + + /// @name Methods + + /// @brief Create default option settings. + static PutOptions create_default() { return {}; } + + private: + friend struct zenoh::interop::detail::Converter; + ::ze_advanced_publisher_put_options_t to_c_opts() { + ::ze_advanced_publisher_put_options_t opts; + ze_advanced_publisher_put_options_default(&opts); + opts.put_options = zenoh::interop::detail::Converter::to_c_opts(this->put_options); + return opts; + } + }; + + /// @brief Options to be passed to ``AdvancedPublisher::delete_resource`` operation. + struct DeleteOptions { + /// @name Fields + + /// Base delete options. + zenoh::Publisher::DeleteOptions delete_options = {}; + + /// @name Methods + + /// @brief Create default option settings. + static DeleteOptions create_default() { return {}; } + + private: + friend struct zenoh::interop::detail::Converter; + ::ze_advanced_publisher_delete_options_t to_c_opts() { + ::ze_advanced_publisher_delete_options_t opts; + ze_advanced_publisher_delete_options_default(&opts); + opts.delete_options = zenoh::interop::detail::Converter::to_c_opts(this->delete_options); + return opts; + } + }; + + /// @name Methods + + /// @brief Publish a message on advanced publisher key expression. + /// @param payload data to publish. + /// @param options optional parameters to pass to put operation. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + void put(Bytes&& payload, PutOptions&& options = PutOptions::create_default(), zenoh::ZResult* err = nullptr) { + auto payload_ptr = interop::as_moved_c_ptr(payload); + ::ze_advanced_publisher_put_options_t opts = zenoh::interop::detail::Converter::to_c_opts(options); + __ZENOH_RESULT_CHECK(::ze_advanced_publisher_put(interop::as_loaned_c_ptr(*this), payload_ptr, &opts), err, + "Failed to perform put operation"); + } + + /// @brief Undeclare the resource associated with the advanced publisher key expression. + /// @param options optional parameters to pass to delete operation. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + void delete_resource(DeleteOptions&& options = DeleteOptions::create_default(), + zenoh::ZResult* err = nullptr) const { + ::ze_advanced_publisher_delete_options_t opts = zenoh::interop::detail::Converter::to_c_opts(options); + __ZENOH_RESULT_CHECK(::ze_advanced_publisher_delete(interop::as_loaned_c_ptr(*this), &opts), err, + "Failed to perform delete_resource operation"); + } + +#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future + /// release. + /// @brief Construct matching listener, registering a callback for notifying subscribers matching with a given + /// advanced publisher. + /// + /// @param on_status_change: the callable that will be called every time the matching status of the publisher + /// changes (i.e. if last subscriber disconnects or when the first subscriber connects). + /// @param on_drop the callable that will be called once matching listener is destroyed or undeclared. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + /// @return a ``MatchingListener`` object. + /// @note Zenoh-c only. + template + [[nodiscard]] MatchingListener declare_matching_listener(C&& on_status_change, D&& on_drop, + zenoh::ZResult* err = nullptr) const { + static_assert(std::is_invocable_r::value, + "on_status_change should be callable with the following signature: void on_status_change(const " + "zenoh::MatchingStatus& status)"); + static_assert(std::is_invocable_r::value, + "on_drop should be callable with the following signature: void on_drop()"); + ::zc_owned_closure_matching_status_t c_closure; + using Cval = std::remove_reference_t; + using Dval = std::remove_reference_t; + using ClosureType = typename zenoh::detail::closures::Closure; + auto closure = ClosureType::into_context(std::forward(on_status_change), std::forward(on_drop)); + ::z_closure(&c_closure, zenoh::detail::closures::_zenoh_on_status_change_call, + zenoh::detail::closures::_zenoh_on_drop, closure); + auto m = zenoh::interop::detail::null>(); + zenoh::ZResult res = ::ze_advanced_publisher_declare_matching_listener( + zenoh::interop::as_loaned_c_ptr(*this), zenoh::interop::as_owned_c_ptr(m), ::z_move(c_closure)); + __ZENOH_RESULT_CHECK(res, err, "Failed to declare Matching Listener"); + return m; + } + + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future + /// release. + /// @brief Construct matching listener, delivering notification on publisher status change through a streaming + /// handler. + /// @tparam Channel the type of channel used to create stream of data (see ``zenoh::channels::FifoChannel`` or + /// ``zenoh::channels::RingChannel``). + /// @param channel: an instance of channel. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + /// @return a ``MatchingListener`` object. + /// @note Zenoh-c only. + template + [[nodiscard]] MatchingListener> declare_matching_listener( + Channel channel, zenoh::ZResult* err = nullptr) const { + auto cb_handler_pair = channel.template into_cb_handler_pair(); + auto m = zenoh::interop::detail::null>(); + zenoh::ZResult res = ::zc_publisher_declare_matching_listener( + interop::as_loaned_c_ptr(*this), interop::as_owned_c_ptr(m), ::z_move(cb_handler_pair.first)); + __ZENOH_RESULT_CHECK(res, err, "Failed to declare Matching Listener"); + if (res != Z_OK) ::z_drop(zenoh::interop::as_moved_c_ptr(cb_handler_pair.second)); + return MatchingListener>( + std::move(m), std::move(cb_handler_pair.second)); + } + + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future + /// release. + /// @brief Declare matching listener, registering a callback for notifying subscribers matching with a given + /// advanced publisher. The callback will be run in the background until the corresponding publisher is destroyed. + /// + /// @param on_status_change: the callable that will be called every time the matching status of the publisher + /// changes (i.e. if last subscriber disconnects or when the first subscriber connects). + /// @param on_drop the callable that will be called once publisher is destroyed or undeclared. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + /// @note Zenoh-c only. + template + void declare_background_matching_listener(C&& on_status_change, D&& on_drop, zenoh::ZResult* err = nullptr) const { + static_assert(std::is_invocable_r::value, + "on_status_change should be callable with the following signature: void on_status_change(const " + "zenoh::MatchingStatus& status)"); + static_assert(std::is_invocable_r::value, + "on_drop should be callable with the following signature: void on_drop()"); + ::zc_owned_closure_matching_status_t c_closure; + using Cval = std::remove_reference_t; + using Dval = std::remove_reference_t; + using ClosureType = typename zenoh::detail::closures::Closure; + auto closure = ClosureType::into_context(std::forward(on_status_change), std::forward(on_drop)); + ::z_closure(&c_closure, zenoh::detail::closures::_zenoh_on_status_change_call, + zenoh::detail::closures::_zenoh_on_drop, closure); + zenoh::ZResult res = ::ze_advanced_publisher_declare_background_matching_listener( + zenoh::interop::as_loaned_c_ptr(*this), ::z_move(c_closure)); + __ZENOH_RESULT_CHECK(res, err, "Failed to declare background Matching Listener"); + } + + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future + /// release. + /// @brief Gets advanced publisher matching status - i.e. if there are any subscribers matching its key expression. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + /// @note Zenoh-c only. + MatchingStatus get_matching_status(zenoh::ZResult* err = nullptr) const { + ::zc_matching_status_t m; + zenoh::ZResult res = ::ze_advanced_publisher_get_matching_status(zenoh::interop::as_loaned_c_ptr(*this), &m); + __ZENOH_RESULT_CHECK(res, err, "Failed to get matching status"); + return {m.matching}; + } +#endif +}; + +} // namespace zenoh::ext +#endif diff --git a/include/zenoh/api/ext/advanced_subscriber.hxx b/include/zenoh/api/ext/advanced_subscriber.hxx new file mode 100644 index 0000000..670fadc --- /dev/null +++ b/include/zenoh/api/ext/advanced_subscriber.hxx @@ -0,0 +1,312 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#pragma once + +#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) +#include "../../detail/closures_concrete.hxx" +#include "../base.hxx" +#include "../interop.hxx" +#include "../keyexpr.hxx" +#include "../sample.hxx" +#include "miss.hxx" + +namespace zenoh::ext { + +namespace detail { +class AdvancedSubscriberBase : public Owned<::ze_owned_advanced_subscriber_t> { + protected: + AdvancedSubscriberBase(zenoh::detail::null_object_t) : Owned(nullptr){}; + AdvancedSubscriberBase(::ze_owned_advanced_subscriber_t* s) : Owned(s){}; + + public: + /// @name Methods + + /// @brief Construct sample miss listener, registering a callback for notifying about missed samples. + /// + /// @param on_miss_detected: the callable that will be called every time when missed samples are detected. + /// @param on_drop the callable that will be called once sample miss listener is destroyed or undeclared. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + /// @return a ``SampleMissListener`` object. + /// @note Zenoh-c only. + template + [[nodiscard]] SampleMissListener declare_sample_miss_listener(C&& on_miss_detected, D&& on_drop, + zenoh::ZResult* err = nullptr) const { + static_assert(std::is_invocable_r::value, + "on_miss_detected should be callable with the following signature: void on_status_change(const " + "zenoh::ext::Miss& miss)"); + static_assert(std::is_invocable_r::value, + "on_drop should be callable with the following signature: void on_drop()"); + ::ze_owned_closure_miss_t c_closure; + using Cval = std::remove_reference_t; + using Dval = std::remove_reference_t; + using ClosureType = typename zenoh::detail::closures::Closure; + auto closure = ClosureType::into_context(std::forward(on_miss_detected), std::forward(on_drop)); + ::z_closure(&c_closure, zenoh::detail::closures::_zenoh_on_miss_detected_call, + zenoh::detail::closures::_zenoh_on_drop, closure); + auto m = zenoh::interop::detail::null>(); + ZResult res = ::ze_advanced_subscriber_declare_sample_miss_listener( + zenoh::interop::as_loaned_c_ptr(*this), zenoh::interop::as_owned_c_ptr(m), ::z_move(c_closure)); + __ZENOH_RESULT_CHECK(res, err, "Failed to declare Sample Miss Listener"); + return m; + } + + /// @brief Declare sample miss listener, registering a callback for notifying about detected missed samples. + /// The callback will be run in the background until the corresponding advanced subscriber is destroyed. + /// + /// @param on_miss_detected: the callable that will be called every time when missed samples are detected. + /// @param on_drop the callable that will be called once sample miss listener is destroyed or undeclared. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + /// @note Zenoh-c only. + template + void declare_background_sample_miss_listener(C&& on_miss_detected, D&& on_drop, + zenoh::ZResult* err = nullptr) const { + static_assert(std::is_invocable_r::value, + "on_miss_detected should be callable with the following signature: void on_status_change(const " + "zenoh::ext::Miss& miss)"); + static_assert(std::is_invocable_r::value, + "on_drop should be callable with the following signature: void on_drop()"); + ::ze_owned_closure_miss_t c_closure; + using Cval = std::remove_reference_t; + using Dval = std::remove_reference_t; + using ClosureType = typename zenoh::detail::closures::Closure; + auto closure = ClosureType::into_context(std::forward(on_miss_detected), std::forward(on_drop)); + ::z_closure(&c_closure, zenoh::detail::closures::_zenoh_on_miss_detected_call, + zenoh::detail::closures::_zenoh_on_drop, closure); + + ZResult res = ::ze_advanced_subscriber_declare_background_sample_miss_listener( + zenoh::interop::as_loaned_c_ptr(*this), ::z_move(c_closure)); + __ZENOH_RESULT_CHECK(res, err, "Failed to declare background Sample Miss Listener"); + } + + /// @brief Declares a liveliness token listener for matching publishers detection. Only advanced publishers, + /// enabling publisher detection can be detected. + /// @param on_sample the callable that will be called each time a liveliness token status is changed. + /// @param on_drop the callable that will be called once subscriber is destroyed or undeclared. + /// @param options options to pass to listener declaration. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + /// @return a ``zenoh::Subscriber`` object. + template + [[nodiscard]] zenoh::Subscriber detect_publishers( + C&& on_sample, D&& on_drop, + zenoh::Session::LivelinessSubscriberOptions&& options = + zenoh::Session::LivelinessSubscriberOptions::create_default(), + ZResult* err = nullptr) const { + static_assert( + std::is_invocable_r::value, + "on_sample should be callable with the following signature: void on_sample(zenoh::Sample& sample)"); + static_assert(std::is_invocable_r::value, + "on_drop should be callable with the following signature: void on_drop()"); + ::z_owned_closure_sample_t c_closure; + using Cval = std::remove_reference_t; + using Dval = std::remove_reference_t; + using ClosureType = typename zenoh::detail::closures::Closure; + auto closure = ClosureType::into_context(std::forward(on_sample), std::forward(on_drop)); + ::z_closure(&c_closure, zenoh::detail::closures::_zenoh_on_sample_call, zenoh::detail::closures::_zenoh_on_drop, + closure); + ::z_liveliness_subscriber_options_t opts = zenoh::interop::detail::Converter::to_c_opts(options); + zenoh::Subscriber s = zenoh::interop::detail::null>(); + zenoh::ZResult res = ::ze_advanced_subscriber_detect_publishers( + zenoh::interop::as_loaned_c_ptr(*this), zenoh::interop::as_owned_c_ptr(s), ::z_move(c_closure), &opts); + __ZENOH_RESULT_CHECK(res, err, "Failed to declare Liveliness Token Subscriber"); + return s; + } + + /// @brief Declares a background liveliness token listener for matching publishers detection. Only advanced + /// publishers, enabling publisher detection can be detected. The subscriber callback will be run in the background + /// until the corresponding session is closed or destroyed. + /// @param on_sample the callable that will be called each time a liveliness token status is changed. + /// @param on_drop the callable that will be called once subscriber is destroyed or undeclared. + /// @param options options to pass to subscriber declaration. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + /// @note Zenoh-c only. + template + void detect_publishers_background(C&& on_sample, D&& on_drop, + zenoh::Session::LivelinessSubscriberOptions&& options = + zenoh::Session::LivelinessSubscriberOptions::create_default(), + zenoh::ZResult* err = nullptr) const { + static_assert( + std::is_invocable_r::value, + "on_sample should be callable with the following signature: void on_sample(zenoh::Sample& sample)"); + static_assert(std::is_invocable_r::value, + "on_drop should be callable with the following signature: void on_drop()"); + ::z_owned_closure_sample_t c_closure; + using Cval = std::remove_reference_t; + using Dval = std::remove_reference_t; + using ClosureType = typename zenoh::detail::closures::Closure; + auto closure = ClosureType::into_context(std::forward(on_sample), std::forward(on_drop)); + ::z_closure(&c_closure, zenoh::detail::closures::_zenoh_on_sample_call, zenoh::detail::closures::_zenoh_on_drop, + closure); + ::z_liveliness_subscriber_options_t opts = zenoh::interop::detail::Converter::to_c_opts(options); + zenoh::ZResult res = ::ze_advanced_subscriber_detect_publishers_background( + zenoh::interop::as_loaned_c_ptr(*this), ::z_move(c_closure), &opts); + __ZENOH_RESULT_CHECK(res, err, "Failed to declare Background Liveliness Token Subscriber"); + } + + /// @brief Declares a liveliness token listener for matching publishers detection. Only advanced publishers, + /// enabling publisher detection can be detected. + /// @tparam Channel the type of channel used to create stream of data (see ``zenoh::channels::FifoChannel`` or + /// ``zenoh::channels::RingChannel``). + /// @param channel an instance of channel. + /// @param options options to pass to subscriber declaration. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + /// @return a ``Subscriber`` object. + template + [[nodiscard]] zenoh::Subscriber> detect_publishers( + Channel channel, + zenoh::Session::LivelinessSubscriberOptions&& options = + zenoh::Session::LivelinessSubscriberOptions::create_default(), + zenoh::ZResult* err = nullptr) const { + auto cb_handler_pair = channel.template into_cb_handler_pair(); + ::z_liveliness_subscriber_options_t opts = zenoh::interop::detail::Converter::to_c_opts(options); + zenoh::Subscriber s = zenoh::interop::detail::null>(); + zenoh::ZResult res = + ::z_liveliness_declare_subscriber(zenoh::interop::as_loaned_c_ptr(*this), zenoh::interop::as_owned_c_ptr(s), + ::z_move(cb_handler_pair.first), &opts); + __ZENOH_RESULT_CHECK(res, err, "Failed to declare Liveliness Token Subscriber"); + if (res != Z_OK) ::z_drop(::z_move(*zenoh::interop::as_moved_c_ptr(cb_handler_pair.second))); + return zenoh::Subscriber>( + std::move(s), std::move(cb_handler_pair.second)); + } +}; +} // namespace detail + +template +class AdvancedSubscriber; + +/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release. +/// @brief A Zenoh advanced subscriber. +/// +/// In addition to receiving the data it is subscribed to, +/// it also will fetch data from a Queryable at startup and peridodically (using `AdvancedSubscriber::get`). +/// @note Zenoh-c only. +template <> +class AdvancedSubscriber : public detail::AdvancedSubscriberBase { + protected: + using AdvancedSubscriberBase::AdvancedSubscriberBase; + friend struct zenoh::interop::detail::Converter; + + public: + /// @name Methods + + /// @brief Undeclare advanced subscriber. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + void undeclare(ZResult* err = nullptr) && { + __ZENOH_RESULT_CHECK(::ze_undeclare_advanced_subscriber(interop::as_moved_c_ptr(*this)), err, + "Failed to undeclare Advanced Subscriber"); + } +}; + +/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release. +/// @brief An owned Zenoh advanced subscriber. +/// +/// In addition to receiving the data it is subscribed to, +/// it is also able to receive notifications regarding missed samples and/or automatically recover them. +/// @note Zenoh-c only. +/// @tparam Handler streaming handler exposing data. If `void`, no handler access is provided and instead data is being +/// processed inside the callback. +template +class AdvancedSubscriber : public detail::AdvancedSubscriberBase { + Handler _handler; + + public: + /// @name Constructors + + /// @brief Construct stream advanced subscriber from callback advanced subscriber and handler. + /// + /// @param s callback advanced subscriber, that should expose data to the handler in its callback. + /// @param handler handler to access data exposed by `s`. Zenoh handlers implement + /// recv and try_recv methods, for blocking and non-blocking message reception. But user is free to define his own + /// interface. + AdvancedSubscriber(AdvancedSubscriber&& s, Handler handler) + : AdvancedSubscriberBase(interop::as_owned_c_ptr(s)), _handler(std::move(handler)) {} + + /// @name Methods + + /// @brief Undeclare advanced subscriber, and return its handler, which can still be used to process any messages + /// received prior to undeclaration. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + Handler undeclare(ZResult* err = nullptr) && { + __ZENOH_RESULT_CHECK(::ze_undeclare_advanced_subscriber(interop::as_moved_c_ptr(*this)), err, + "Failed to undeclare Querying Subscriber"); + return std::move(this->_handler); + } + + /// @brief Return the handler to advanced subscriber data stream. + const Handler& handler() const { return _handler; }; +}; + +} // namespace zenoh::ext + +namespace zenoh::interop { +/// @brief Return a pair of pointers to owned zenoh-c representations of advanced subscriber and its callback. +template >> +auto as_owned_c_ptr(zenoh::ext::AdvancedSubscriber& s) { + return std::make_pair(as_owned_c_ptr(static_cast(s)), + as_owned_c_ptr(const_cast(s.handler()))); +} + +/// @brief Return a pair of pointers to owned zenoh-c representations of advanced subscriber and its callback. +template >> +auto as_owned_c_ptr(const zenoh::ext::AdvancedSubscriber& s) { + return std::make_pair(as_owned_c_ptr(static_cast(s)), + as_owned_c_ptr(s.handler())); +} + +/// @brief Return a pair of pointers to loaned zenoh-c representations of advanced subscriber and its callback. +template >> +auto as_loaned_c_ptr(zenoh::ext::AdvancedSubscriber& s) { + return std::make_pair(as_loaned_c_ptr(static_cast(s)), + as_loaned_c_ptr(const_cast(s.handler()))); +} + +/// @brief Return a pair of pointers to loaned zenoh-c representations of advanced subscriber and its callback. +template >> +auto as_loaned_c_ptr(const zenoh::ext::AdvancedSubscriber& s) { + return std::make_pair(as_loaned_c_ptr(static_cast(s)), + as_loaned_c_ptr(s.handler())); +} + +/// @brief Return a pair of pointers to moved zenoh-c representations of advanced subscriber and its callback. +template >> +auto as_moved_c_ptr(zenoh::ext::AdvancedSubscriber& s) { + return std::make_pair(as_moved_c_ptr(static_cast(s)), + as_moved_c_ptr(const_cast(s.handler()))); +} + +/// @brief Return a pair of pointers to moved zenoh-c representations of advanced subscriber and its callback. +/// Will return a pair of null pointers if option is empty. +template >> +auto as_moved_c_ptr(std::optional>& s) -> decltype(as_moved_c_ptr(s.value())) { + if (!s.has_value()) { + return as_moved_c_ptr(s.value()); + } else { + return {}; + } +} + +/// @brief Move advanced subscriber and its handler to a pair containing corresponding zenoh-c structs. +template >> +auto move_to_c_obj(zenoh::ext::AdvancedSubscriber&& s) { + return std::make_pair(move_to_c_obj(std::move(static_cast(s))), + move_to_c_obj(std::move(const_cast(s)))); +} +} // namespace zenoh::interop +#endif diff --git a/include/zenoh/api/ext/miss.hxx b/include/zenoh/api/ext/miss.hxx new file mode 100644 index 0000000..15ca70f --- /dev/null +++ b/include/zenoh/api/ext/miss.hxx @@ -0,0 +1,162 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#pragma once + +#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) + +#include "../../detail/closures.hxx" +#include "../base.hxx" +#include "../interop.hxx" +#include "../source_info.hxx" + +namespace zenoh::ext { + +/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future +/// release. +/// @brief A struct that represents missed samples. +/// @note Zenoh-c only. +struct Miss { + /// @name Fields + + /// The source of missed samples. + zenoh::EntityGlobalId source; + /// The number of missed samples. + uint32_t nb; +}; +} // namespace zenoh::ext + +namespace zenoh::detail::closures { +extern "C" { +inline void _zenoh_on_miss_detected_call(const ::ze_miss_t* miss, void* context) { + IClosure::call_from_context( + context, + zenoh::ext::Miss{zenoh::interop::into_copyable_cpp_obj(miss->source), miss->nb}); +} +} +} // namespace zenoh::detail::closures + +namespace zenoh::ext { +namespace detail { +class SampleMissListenerBase : public Owned<::ze_owned_sample_miss_listener_t> { + protected: + SampleMissListenerBase(zenoh::detail::null_object_t) : Owned(nullptr){}; + SampleMissListenerBase(::ze_owned_sample_miss_listener_t* m) : Owned(m){}; + friend struct zenoh::interop::detail::Converter; +}; +} // namespace detail + +template +class SampleMissListener; + +template <> +class SampleMissListener : public detail::SampleMissListenerBase { + protected: + using SampleMissListenerBase::SampleMissListenerBase; + friend struct zenoh::interop::detail::Converter; + + public: + /// @name Methods + + /// @brief Undeclare sample miss listener. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + void undeclare(zenoh::ZResult* err = nullptr) && { + __ZENOH_RESULT_CHECK(::ze_undeclare_sample_miss_listener(zenoh::interop::as_moved_c_ptr(*this)), err, + "Failed to undeclare Sample Miss Listener"); + } +}; + +/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future +/// release. +/// @brief A Zenoh sample miss listener listener. Missed samples can only be detected from advanced publishers, enabling +/// sample miss detection. +/// +/// A listener that sends notification when the advanced subscriber misses a sample. +/// Dropping the corresponding subscriber, also drops the listener. +/// @note Zenoh-c only. +template +class SampleMissListener : public detail::SampleMissListenerBase { + Handler _handler; + + public: + /// @name Constructors + + /// @brief Construct stream sample miss listener from callback sample miss listener and handler. + /// + /// @param m callback sample miss listener, that should expose data to the handler in its callback. + /// @param handler handler to access data exposed by `m`. Zenoh handlers implement + /// recv and try_recv methods, for blocking and non-blocking message reception. But user is free to define his own + /// interface. + SampleMissListener(SampleMissListener&& m, Handler handler) + : SampleMissListenerBase(zenoh::interop::as_owned_c_ptr(m)), _handler(std::move(handler)) {} + + /// @name Methods + + /// @brief Undeclare sample miss listener, and return its handler, which can still be used to process any messages + /// received prior to undeclaration. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + Handler undeclare(ZResult* err = nullptr) && { + __ZENOH_RESULT_CHECK(::ze_undeclare_sample_miss_listener(interop::as_moved_c_ptr(*this)), err, + "Failed to undeclare Sample Miss Listener"); + return std::move(this->_handler); + } + + /// @brief Return the handler to sample miss listener data stream. + const Handler& handler() const { return _handler; }; + friend class Session; +}; +} // namespace zenoh::ext + +namespace zenoh::interop { +/// @brief Return a pair of pointers to owned zenoh-c representations of sample miss listener and its callback. +template >> +auto as_owned_c_ptr(zenoh::ext::SampleMissListener& m) { + return std::make_pair(as_owned_c_ptr(static_cast(m)), + as_owned_c_ptr(const_cast(m.handler()))); +} + +/// @brief Return a pair of pointers to owned zenoh-c representations of sample miss listener and its callback. +template >> +auto as_owned_c_ptr(const zenoh::ext::SampleMissListener& m) { + return std::make_pair(as_owned_c_ptr(static_cast(m)), + as_owned_c_ptr(m.handler())); +} + +/// @brief Return a pair of pointers to moved zenoh-c representations of sample miss listener and its callback. +template >> +auto as_moved_c_ptr(zenoh::ext::SampleMissListener& m) { + return std::make_pair(as_moved_c_ptr(static_cast(m)), + as_moved_c_ptr(const_cast(m.handler()))); +} + +/// @brief Return a pair of pointers to moved zenoh-c representations of sample miss listener and its callback. +/// Will return a pair of null pointers if option is empty. +template >> +auto as_moved_c_ptr(std::optional>& m) -> decltype(as_moved_c_ptr(m.value())) { + if (!m.has_value()) { + return as_moved_c_ptr(m.value()); + } else { + return {}; + } +} + +/// @brief Move sample miss listener and its handler to a pair containing corresponding zenoh-c structs. +template >> +auto move_to_c_obj(zenoh::ext::SampleMissListener&& m) { + return std::make_pair(move_to_c_obj(std::move(static_cast(m))), + move_to_c_obj(std::move(const_cast(m)))); +} +} // namespace zenoh::interop +#endif \ No newline at end of file diff --git a/include/zenoh/api/ext/publication_cache.hxx b/include/zenoh/api/ext/publication_cache.hxx index c17ec66..5064ec4 100644 --- a/include/zenoh/api/ext/publication_cache.hxx +++ b/include/zenoh/api/ext/publication_cache.hxx @@ -19,11 +19,10 @@ #include "../interop.hxx" #include "../keyexpr.hxx" #include "../sample.hxx" -#include "session_ext.hxx" namespace zenoh::ext { -/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release. +/// @warning This API is deprecated. Please use zenoh::ext::AdvancedPublisher. /// @brief A Zenoh publication cache. /// /// Used to store publications on intersecting key expressions. Can be queried later via `zenoh::Session::get` to @@ -50,40 +49,5 @@ class PublicationCache : public Owned<::ze_owned_publication_cache_t> { } }; -[[nodiscard]] inline PublicationCache SessionExt::declare_publication_cache(const KeyExpr& key_expr, - PublicationCacheOptions&& options, - ZResult* err) const { - ::ze_publication_cache_options_t opts; - ze_publication_cache_options_default(&opts); - opts.queryable_prefix = interop::as_loaned_c_ptr(options.queryable_prefix); -#if defined(Z_FEATURE_UNSTABLE_API) - opts.queryable_origin = options.queryable_origin; -#endif - opts.queryable_complete = options.queryable_complete; - opts.history = options.history; - opts.resources_limit = options.resources_limit; - ext::PublicationCache p = interop::detail::null(); - ZResult res = ::ze_declare_publication_cache(interop::as_loaned_c_ptr(this->_session), interop::as_owned_c_ptr(p), - interop::as_loaned_c_ptr(key_expr), &opts); - __ZENOH_RESULT_CHECK(res, err, "Failed to declare Publication Cache"); - return p; -} - -inline void SessionExt::declare_background_publication_cache(const KeyExpr& key_expr, PublicationCacheOptions&& options, - ZResult* err) const { - ::ze_publication_cache_options_t opts; - ze_publication_cache_options_default(&opts); - opts.queryable_prefix = interop::as_loaned_c_ptr(options.queryable_prefix); -#if defined(Z_FEATURE_UNSTABLE_API) - opts.queryable_origin = options.queryable_origin; -#endif - opts.queryable_complete = options.queryable_complete; - opts.history = options.history; - opts.resources_limit = options.resources_limit; - ZResult res = ::ze_declare_background_publication_cache(interop::as_loaned_c_ptr(this->_session), - interop::as_loaned_c_ptr(key_expr), &opts); - __ZENOH_RESULT_CHECK(res, err, "Failed to declare Background Publication Cache"); -} - } // namespace zenoh::ext #endif diff --git a/include/zenoh/api/ext/querying_subscriber.hxx b/include/zenoh/api/ext/querying_subscriber.hxx index 387a9e3..fc5bcf6 100644 --- a/include/zenoh/api/ext/querying_subscriber.hxx +++ b/include/zenoh/api/ext/querying_subscriber.hxx @@ -19,7 +19,6 @@ #include "../interop.hxx" #include "../keyexpr.hxx" #include "../sample.hxx" -#include "session_ext.hxx" namespace zenoh::ext { @@ -45,18 +44,18 @@ class QueryingSubscriberBase : public Owned<::ze_owned_querying_subscriber_t> { z_get_options_default(&opts); opts.target = options.target; opts.consolidation = *interop::as_copyable_c_ptr(options.consolidation); - opts.payload = ::zenoh::interop::as_moved_c_ptr(options.payload); - opts.encoding = ::zenoh::interop::as_moved_c_ptr(options.encoding); + opts.payload = zenoh::interop::as_moved_c_ptr(options.payload); + opts.encoding = zenoh::interop::as_moved_c_ptr(options.encoding); #if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) - opts.source_info = ::zenoh::interop::as_moved_c_ptr(options.source_info); + opts.source_info = zenoh::interop::as_moved_c_ptr(options.source_info); opts.accept_replies = options.accept_replies; opts.allowed_destination = options.allowed_destination; #endif - opts.attachment = ::zenoh::interop::as_moved_c_ptr(options.attachment); + opts.attachment = zenoh::interop::as_moved_c_ptr(options.attachment); opts.timeout_ms = options.timeout_ms; ZResult res = ::ze_querying_subscriber_get(interop::as_loaned_c_ptr(*this), - ::zenoh::interop::as_loaned_c_ptr(key_expr), &opts); + zenoh::interop::as_loaned_c_ptr(key_expr), &opts); __ZENOH_RESULT_CHECK(res, err, "Failed to perform get operation"); } @@ -68,7 +67,7 @@ class QueryingSubscriberBase : public Owned<::ze_owned_querying_subscriber_t> { template class QueryingSubscriber; -/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release. +/// @warning This API is deprecated. Please use zenoh::ext::AdvancedSubscriber. /// @brief A Zenoh querying subscriber. /// /// In addition to receiving the data it is subscribed to, @@ -79,12 +78,12 @@ class QueryingSubscriber : public detail::QueryingSubscriberBase { protected: using QueryingSubscriberBase::QueryingSubscriberBase; friend class Session; - friend struct ::zenoh::interop::detail::Converter; + friend struct zenoh::interop::detail::Converter; public: /// @name Methods - /// @brief Undeclare publication cache. + /// @brief Undeclare querying subscriber. /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be /// thrown in case of error. void undeclare(ZResult* err = nullptr) && { @@ -93,7 +92,7 @@ class QueryingSubscriber : public detail::QueryingSubscriberBase { } }; -/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release. +/// @warning This API is deprecated. Please use zenoh::ext::AdvancedSubscriber. /// @brief A Zenoh querying subscriber declared by ``SessionExt::declare_querying_subscriber``. /// /// In addition to receiving the data it is subscribed to, @@ -134,96 +133,6 @@ class QueryingSubscriber : public detail::QueryingSubscriberBase { friend class Session; }; -template -[[nodiscard]] QueryingSubscriber SessionExt::declare_querying_subscriber(const KeyExpr& key_expr, C&& on_sample, - D&& on_drop, - QueryingSubscriberOptions&& options, - ZResult* err) const { - static_assert(std::is_invocable_r::value, - "on_sample should be callable with the following signature: void on_sample(zenoh::Sample& sample)"); - static_assert(std::is_invocable_r::value, - "on_drop should be callable with the following signature: void on_drop()"); - ::z_owned_closure_sample_t c_closure; - using Cval = std::remove_reference_t; - using Dval = std::remove_reference_t; - using ClosureType = typename ::zenoh::detail::closures::Closure; - auto closure = ClosureType::into_context(std::forward(on_sample), std::forward(on_drop)); - ::z_closure(&c_closure, ::zenoh::detail::closures::_zenoh_on_sample_call, ::zenoh::detail::closures::_zenoh_on_drop, - closure); - ::ze_querying_subscriber_options_t opts; - ze_querying_subscriber_options_default(&opts); - opts.query_selector = ::zenoh::interop::as_loaned_c_ptr(options.query_keyexpr); -#if defined(Z_FEATURE_UNSTABLE_API) - opts.allowed_origin = options.allowed_origin; - opts.query_accept_replies = options.query_accept_replies; -#endif - opts.query_target = options.query_target; - opts.query_consolidation = *interop::as_copyable_c_ptr(options.query_consolidation); - opts.query_timeout_ms = options.query_timeout_ms; - ext::QueryingSubscriber qs = ::zenoh::interop::detail::null>(); - ZResult res = - ::ze_declare_querying_subscriber(interop::as_loaned_c_ptr(this->_session), ::zenoh::interop::as_owned_c_ptr(qs), - ::zenoh::interop::as_loaned_c_ptr(key_expr), ::z_move(c_closure), &opts); - __ZENOH_RESULT_CHECK(res, err, "Failed to declare Background Querying Subscriber"); - return qs; -} - -template -void SessionExt::declare_background_querying_subscriber(const KeyExpr& key_expr, C&& on_sample, D&& on_drop, - QueryingSubscriberOptions&& options, ZResult* err) const { - static_assert(std::is_invocable_r::value, - "on_sample should be callable with the following signature: void on_sample(zenoh::Sample& sample)"); - static_assert(std::is_invocable_r::value, - "on_drop should be callable with the following signature: void on_drop()"); - ::z_owned_closure_sample_t c_closure; - using Cval = std::remove_reference_t; - using Dval = std::remove_reference_t; - using ClosureType = typename ::zenoh::detail::closures::Closure; - auto closure = ClosureType::into_context(std::forward(on_sample), std::forward(on_drop)); - ::z_closure(&c_closure, ::zenoh::detail::closures::_zenoh_on_sample_call, ::zenoh::detail::closures::_zenoh_on_drop, - closure); - ::ze_querying_subscriber_options_t opts; - ze_querying_subscriber_options_default(&opts); - opts.query_selector = ::zenoh::interop::as_loaned_c_ptr(options.query_keyexpr); -#if defined(Z_FEATURE_UNSTABLE_API) - opts.allowed_origin = options.allowed_origin; - opts.query_accept_replies = options.query_accept_replies; -#endif - opts.query_target = options.query_target; - opts.query_consolidation = *interop::as_copyable_c_ptr(options.query_consolidation); - ; - opts.query_timeout_ms = options.query_timeout_ms; - ZResult res = ::ze_declare_background_querying_subscriber(::zenoh::interop::as_loaned_c_ptr(this->_session), - ::zenoh::interop::as_loaned_c_ptr(key_expr), - ::z_move(c_closure), &opts); - __ZENOH_RESULT_CHECK(res, err, "Failed to declare Background Querying Subscriber"); -} - -template -[[nodiscard]] QueryingSubscriber> -SessionExt::declare_querying_subscriber(const KeyExpr& key_expr, Channel channel, QueryingSubscriberOptions&& options, - ZResult* err) const { - auto cb_handler_pair = channel.template into_cb_handler_pair(); - ::ze_querying_subscriber_options_t opts; - ze_querying_subscriber_options_default(&opts); - opts.query_selector = ::zenoh::interop::as_loaned_c_ptr(options.query_keyexpr); -#if defined(Z_FEATURE_UNSTABLE_API) - opts.allowed_origin = options.allowed_origin; - opts.query_accept_replies = options.query_accept_replies; -#endif - opts.query_target = options.query_target; - opts.query_consolidation = *interop::as_copyable_c_ptr(options.query_consolidation); - opts.query_timeout_ms = options.query_timeout_ms; - ext::QueryingSubscriber qs = ::zenoh::interop::detail::null>(); - ZResult res = ::ze_declare_querying_subscriber( - interop::as_loaned_c_ptr(this->_session), ::zenoh::interop::as_owned_c_ptr(qs), - ::zenoh::interop::as_loaned_c_ptr(key_expr), ::z_move(cb_handler_pair.first), &opts); - __ZENOH_RESULT_CHECK(res, err, "Failed to declare Querying Subscriber"); - if (res != Z_OK) ::z_drop(interop::as_moved_c_ptr(cb_handler_pair.second)); - return ext::QueryingSubscriber>(std::move(qs), - std::move(cb_handler_pair.second)); -} - } // namespace zenoh::ext namespace zenoh::interop { @@ -243,6 +152,7 @@ auto as_owned_c_ptr(const zenoh::ext::QueryingSubscriber& s) { /// @brief Return a pair of pointers to loaned zenoh-c representations of querying subscriber and its callback. template >> +[[deprecated]] auto as_loaned_c_ptr(zenoh::ext::QueryingSubscriber& s) { return std::make_pair(as_loaned_c_ptr(static_cast(s)), as_loaned_c_ptr(const_cast(s.handler()))); diff --git a/include/zenoh/api/ext/session_ext.hxx b/include/zenoh/api/ext/session_ext.hxx index 7480e16..f55592c 100644 --- a/include/zenoh/api/ext/session_ext.hxx +++ b/include/zenoh/api/ext/session_ext.hxx @@ -19,32 +19,33 @@ #include "../base.hxx" #include "../enums.hxx" #include "../keyexpr.hxx" +#include "../matching.hxx" #include "../query_consolidation.hxx" #include "../session.hxx" +#include "advanced_publisher.hxx" +#include "advanced_subscriber.hxx" +#include "publication_cache.hxx" +#include "querying_subscriber.hxx" namespace zenoh::ext { -template -class QueryingSubscriber; -class PublicationCache; /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future /// release. /// @brief Zenoh Session interface extension. /// @note Zenoh-c only. class SessionExt { - const ::zenoh::Session& _session; + const zenoh::Session& _session; public: /// @name Constructors - /// @brief Get extension interface for `::zenoh::Session`. Its lifetime is bound that of the session. + /// @brief Get extension interface for `zenoh::Session`. Its lifetime is bound that of the session. /// @param session SessionExt(const zenoh::Session& session) : _session(session){}; /// @name Methods - /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future - /// release. + /// @warning This API is deprecated. Please use zenoh::ext::AdvancedPublisher. /// @brief Options passed to the ``SessionExt::declare_publication_cache``. struct PublicationCacheOptions { /// The prefix used for queryable. @@ -64,35 +65,62 @@ class SessionExt { /// @brief Create default option settings. static PublicationCacheOptions create_default() { return {}; } + + private: + friend struct zenoh::interop::detail::Converter; + ::ze_publication_cache_options_t to_c_opts() { + ::ze_publication_cache_options_t opts; + ze_publication_cache_options_default(&opts); + opts.queryable_prefix = zenoh::interop::as_loaned_c_ptr(this->queryable_prefix); +#if defined(Z_FEATURE_UNSTABLE_API) + opts.queryable_origin = this->queryable_origin; +#endif + opts.queryable_complete = this->queryable_complete; + opts.history = this->history; + opts.resources_limit = this->resources_limit; + return opts; + } }; - /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future - /// release. + /// @warning This API is deprecated. Please use zenoh::ext::SessionExt::declare_advanced_publisher. /// @brief Construct and declare a publication cache. /// @param key_expr: The key expression to publish to. /// @param options: Additional options for the publication cache. /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be /// thrown in case of error. /// @return declared ``PublicationCache`` instance. - [[nodiscard]] PublicationCache declare_publication_cache( + [[deprecated("Use declare_advanced_publisher instead.")]] [[nodiscard]] PublicationCache declare_publication_cache( const KeyExpr& key_expr, PublicationCacheOptions&& options = PublicationCacheOptions::create_default(), - ZResult* err = nullptr) const; + zenoh::ZResult* err = nullptr) const { + ::ze_publication_cache_options_t opts = zenoh::interop::detail::Converter::to_c_opts(options); - /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future - /// release. + PublicationCache p = zenoh::interop::detail::null(); + zenoh::ZResult res = ::ze_declare_publication_cache(zenoh::interop::as_loaned_c_ptr(this->_session), + zenoh::interop::as_owned_c_ptr(p), + zenoh::interop::as_loaned_c_ptr(key_expr), &opts); + __ZENOH_RESULT_CHECK(res, err, "Failed to declare Publication Cache"); + return p; + } + + /// @warning This API is deprecated. Please use zenoh::ext::SessionExt::declare_advanced_publisher. /// @brief Declare a background publication cache. It will function in background until the corresponding session /// is closed or destoryed. /// @param key_expr the key expression to publish to. /// @param options additional options for the publication cache. /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be /// thrown in case of error. + [[deprecated]] void declare_background_publication_cache( const KeyExpr& key_expr, PublicationCacheOptions&& options = PublicationCacheOptions::create_default(), - ZResult* err = nullptr) const; + zenoh::ZResult* err = nullptr) const { + ::ze_publication_cache_options_t opts = zenoh::interop::detail::Converter::to_c_opts(options); + zenoh::ZResult res = ::ze_declare_background_publication_cache( + zenoh::interop::as_loaned_c_ptr(this->_session), zenoh::interop::as_loaned_c_ptr(key_expr), &opts); + __ZENOH_RESULT_CHECK(res, err, "Failed to declare Background Publication Cache"); + } - /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future - /// release. - /// @brief Options passed to the ``Session::declare_querying_subscriber``. + /// @warning This API is deprecated. Please use zenoh::ext::AdvancedSubscriber. + /// @brief Options passed to the ``SessionExt::declare_querying_subscriber``. struct QueryingSubscriberOptions { /// @name Fields @@ -100,14 +128,15 @@ class SessionExt { std::optional query_keyexpr = {}; #if defined(Z_FEATURE_UNSTABLE_API) /// The restriction for the matching publications that will be received by this publication cache. - Locality allowed_origin = ::zc_locality_default(); + zenoh::Locality allowed_origin = ::zc_locality_default(); /// The accepted replies for queries. - ReplyKeyExpr query_accept_replies = ::zc_reply_keyexpr_default(); + zenoh::ReplyKeyExpr query_accept_replies = ::zc_reply_keyexpr_default(); #endif /// @brief The target to be used for queries. - QueryTarget query_target = QueryTarget::Z_QUERY_TARGET_BEST_MATCHING; + zenoh::QueryTarget query_target = QueryTarget::Z_QUERY_TARGET_BEST_MATCHING; /// @brief The consolidation mode to be used for queries. - QueryConsolidation query_consolidation = QueryConsolidation(ConsolidationMode::Z_CONSOLIDATION_MODE_NONE); + zenoh::QueryConsolidation query_consolidation = + QueryConsolidation(ConsolidationMode::Z_CONSOLIDATION_MODE_NONE); /// @brief The timeout to be used for queries. uint64_t query_timeout_ms = 0; @@ -115,10 +144,25 @@ class SessionExt { /// @brief Create default option settings. static QueryingSubscriberOptions create_default() { return {}; } + + private: + friend struct zenoh::interop::detail::Converter; + ::ze_querying_subscriber_options_t to_c_opts() { + ::ze_querying_subscriber_options_t opts; + ze_querying_subscriber_options_default(&opts); + opts.query_selector = zenoh::interop::as_loaned_c_ptr(this->query_keyexpr); +#if defined(Z_FEATURE_UNSTABLE_API) + opts.allowed_origin = this->allowed_origin; + opts.query_accept_replies = this->query_accept_replies; +#endif + opts.query_target = this->query_target; + opts.query_consolidation = *zenoh::interop::as_copyable_c_ptr(this->query_consolidation); + opts.query_timeout_ms = this->query_timeout_ms; + return opts; + }; }; - /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future - /// release. + /// @warning This API is deprecated. Please use zenoh::ext::SessionExt::declare_advanced_subscriber. /// @brief Construct and declare a querying subscriber. /// @param key_expr the key expression to subscribe to. /// @param on_sample the callback that will be called for each received sample. @@ -128,13 +172,32 @@ class SessionExt { /// thrown in case of error. /// @return declared ``QueryingSubscriber`` instance. template - [[nodiscard]] QueryingSubscriber declare_querying_subscriber( - const KeyExpr& key_expr, C&& on_sample, D&& on_drop, - QueryingSubscriberOptions&& options = QueryingSubscriberOptions::create_default(), - ZResult* err = nullptr) const; + [[nodiscard]] [[deprecated("Use declare_advanced_subscriber instead.")]] QueryingSubscriber + declare_querying_subscriber(const KeyExpr& key_expr, C&& on_sample, D&& on_drop, + QueryingSubscriberOptions&& options = QueryingSubscriberOptions::create_default(), + zenoh::ZResult* err = nullptr) const { + static_assert( + std::is_invocable_r::value, + "on_sample should be callable with the following signature: void on_sample(zenoh::Sample& sample)"); + static_assert(std::is_invocable_r::value, + "on_drop should be callable with the following signature: void on_drop()"); + ::ze_querying_subscriber_options_t opts = zenoh::interop::detail::Converter::to_c_opts(options); + ::z_owned_closure_sample_t c_closure; + using Cval = std::remove_reference_t; + using Dval = std::remove_reference_t; + using ClosureType = typename zenoh::detail::closures::Closure; + auto closure = ClosureType::into_context(std::forward(on_sample), std::forward(on_drop)); + ::z_closure(&c_closure, zenoh::detail::closures::_zenoh_on_sample_call, zenoh::detail::closures::_zenoh_on_drop, + closure); + QueryingSubscriber qs = zenoh::interop::detail::null>(); + zenoh::ZResult res = ::ze_declare_querying_subscriber( + zenoh::interop::as_loaned_c_ptr(this->_session), zenoh::interop::as_owned_c_ptr(qs), + zenoh::interop::as_loaned_c_ptr(key_expr), ::z_move(c_closure), &opts); + __ZENOH_RESULT_CHECK(res, err, "Failed to declare Background Querying Subscriber"); + return qs; + } - /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future - /// release. + /// @warning This API is deprecated. Please use zenoh::ext::SessionExt::declare_background_advanced_subscriber. /// @brief Declare a background querying subscriber for a given key expression. Subscriber callback will be called /// to process the messages, until the corresponding session is closed or dropped. /// @param key_expr the key expression to subscribe to. @@ -144,13 +207,31 @@ class SessionExt { /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be /// thrown in case of error. template + [[deprecated("Use declare_background_advanced_subscriber instead.")]] void declare_background_querying_subscriber( const KeyExpr& key_expr, C&& on_sample, D&& on_drop, QueryingSubscriberOptions&& options = QueryingSubscriberOptions::create_default(), - ZResult* err = nullptr) const; + zenoh::ZResult* err = nullptr) const { + static_assert( + std::is_invocable_r::value, + "on_sample should be callable with the following signature: void on_sample(zenoh::Sample& sample)"); + static_assert(std::is_invocable_r::value, + "on_drop should be callable with the following signature: void on_drop()"); + ::z_owned_closure_sample_t c_closure; + using Cval = std::remove_reference_t; + using Dval = std::remove_reference_t; + using ClosureType = typename zenoh::detail::closures::Closure; + auto closure = ClosureType::into_context(std::forward(on_sample), std::forward(on_drop)); + ::z_closure(&c_closure, zenoh::detail::closures::_zenoh_on_sample_call, zenoh::detail::closures::_zenoh_on_drop, + closure); + ::ze_querying_subscriber_options_t opts = zenoh::interop::detail::Converter::to_c_opts(options); + zenoh::ZResult res = ::ze_declare_background_querying_subscriber( + zenoh::interop::as_loaned_c_ptr(this->_session), zenoh::interop::as_loaned_c_ptr(key_expr), + ::z_move(c_closure), &opts); + __ZENOH_RESULT_CHECK(res, err, "Failed to declare Background Querying Subscriber"); + } - /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future - /// release. + /// @warning This API is deprecated. Please use zenoh::ext::SessionExt::declare_advanced_subscriber. /// @brief Construct and declare a querying subscriber. /// @tparam Channel the type of channel used to create stream of data (see ``zenoh::channels::FifoChannel`` or /// ``zenoh::channels::RingChannel``). @@ -161,12 +242,293 @@ class SessionExt { /// thrown in case of error. /// @return a ``QueryingSubscriber`` object. template - [[nodiscard]] QueryingSubscriber> declare_querying_subscriber( + [[deprecated("Use declare_advanced_subscriber instead.")]] [[nodiscard]] QueryingSubscriber< + typename Channel::template HandlerType> + declare_querying_subscriber(const KeyExpr& key_expr, Channel channel, + QueryingSubscriberOptions&& options = QueryingSubscriberOptions::create_default(), + zenoh::ZResult* err = nullptr) const { + auto cb_handler_pair = channel.template into_cb_handler_pair(); + ::ze_querying_subscriber_options_t opts = zenoh::interop::detail::Converter::to_c_opts(options); + QueryingSubscriber qs = zenoh::interop::detail::null>(); + zenoh::ZResult res = ::ze_declare_querying_subscriber( + zenoh::interop::as_loaned_c_ptr(this->_session), zenoh::interop::as_owned_c_ptr(qs), + zenoh::interop::as_loaned_c_ptr(key_expr), ::z_move(cb_handler_pair.first), &opts); + __ZENOH_RESULT_CHECK(res, err, "Failed to declare Querying Subscriber"); + if (res != Z_OK) ::z_drop(zenoh::interop::as_moved_c_ptr(cb_handler_pair.second)); + return QueryingSubscriber>( + std::move(qs), std::move(cb_handler_pair.second)); + } + + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future + /// release. + /// Options passed to the `SessionExt::declare_advanced_publisher()` function. + struct AdvancedPublisherOptions { + /// @name Fields + + /// @brief Setting for advanced publisher's cache. The cache allows advanced subscribers to recover history + /// and/or lost samples. + struct CacheOptions { + /// @name Fields + + /// Number of samples to keep for each resource. + size_t max_samples = 1; + /// The congestion control to apply to replies. + zenoh::CongestionControl congestion_control = Z_CONGESTION_CONTROL_DEFAULT; + /// The priority of replies. + zenoh::Priority priority = ::z_priority_default(); + /// If set to ``true``, this cache replies will not be batched. This usually has a positive impact on + /// latency but negative impact on throughput. + bool is_express = false; + + /// @name Methods + + /// @brief Create default option settings. + static CacheOptions create_default() { return {}; } + }; + + /// Base publisher options. + zenoh::Session::PublisherOptions publisher_options = {}; + /// Optional settings for publisher cache. + std::optional cache; + /// Allow matching Subscribers to detect lost samples and optionally ask for retransimission. + /// + /// Retransmission can only be done if history is enabled on subscriber side. + bool sample_miss_detection = false; + /// Allow this publisher to be detected through liveliness. + bool publisher_detection = false; + /// An optional key expression to be added to the liveliness token key expression. + /// It can be used to convey meta data. + std::optional publisher_detection_metadata = {}; + + /// @name Methods + + /// @brief Create default option settings. + static AdvancedPublisherOptions create_default() { return {}; } + + private: + friend struct zenoh::interop::detail::Converter; + ::ze_advanced_publisher_options_t to_c_opts() { + ::ze_advanced_publisher_options_t opts; + ::ze_advanced_publisher_options_default(&opts); + opts.publisher_options = zenoh::interop::detail::Converter::to_c_opts(this->publisher_options); + if (this->cache.has_value()) { + opts.cache.is_enabled = true; + opts.cache.max_samples = this->cache->max_samples; + opts.cache.congestion_control = this->cache->congestion_control; + opts.cache.priority = this->cache->priority; + opts.cache.is_express = this->cache->is_express; + } + opts.publisher_detection = this->publisher_detection; + opts.sample_miss_detection = this->sample_miss_detection; + opts.publisher_detection_metadata = zenoh::interop::as_loaned_c_ptr(this->publisher_detection_metadata); + return opts; + } + }; + + /// @brief Create an ``AdvancedPublisher`` object to publish data to matching ``zenoh::Subscriber``or + /// ``AdvancedSubscriber`` objects. + /// @param key_expr the key expression to match the subscribers. + /// @param options options passed to advanced publisher declaration. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + /// @return an ``AdvancedPublisher`` object. + AdvancedPublisher declare_advanced_publisher( + const KeyExpr& key_expr, AdvancedPublisherOptions&& options = AdvancedPublisherOptions::create_default(), + zenoh::ZResult* err = nullptr) const { + ::ze_advanced_publisher_options_t opts = zenoh::interop::detail::Converter::to_c_opts(options); + AdvancedPublisher p = zenoh::interop::detail::null(); + zenoh::ZResult res = ::ze_declare_advanced_publisher(zenoh::interop::as_loaned_c_ptr(this->_session), + zenoh::interop::as_owned_c_ptr(p), + zenoh::interop::as_loaned_c_ptr(key_expr), &opts); + __ZENOH_RESULT_CHECK(res, err, "Failed to declare Advanced Publisher"); + return p; + } + + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future + /// release. + /// @brief Options passed to the ``SessionExt::declare_advanced_subscriber``. + struct AdvancedSubscriberOptions { + /// @name Fields + + /// @brief Settings for retrievieng historical data. History can only be retransmitted by Publishers that enable + /// caching. + struct HistoryOptions { + /// @name Fields + + /// Enable detection of late joiner publishers and query for their historical data. + /// Late joiner detection can only be achieved for Publishers that enable publisher_detection. + bool detect_late_publishers = false; + /// Number of samples to query for each resource. ``0`` corresponds to no limit on number of samples. + size_t max_samples = 0; + /// Maximum age of samples to query. ``0`` corresponds to no limit on samples' age. + uint64_t max_age_ms = 0; + + /// @name Methods + + /// @brief Create default option settings. + static HistoryOptions create_default() { return {}; }; + }; + + /// @brief Settings for recovering lost messages for Advanced Subscriber. + struct RecoveryOptions { + /// @name Fields + + /// @brief Period for queries for not yet received Samples. + /// + /// These queries allow to retrieve the last Sample(s) if the last Sample(s) is/are lost. + /// So it is useful for sporadic publications but useless for periodic publications + /// with a period smaller or equal to this period. + /// Retransmission can only be achieved by Publishers that also activate retransmission. + /// ``0`` corresponds to default query period value. + uint64_t periodic_queries_period_ms = 0; + + /// @name Methods + + /// @brief Create default option settings. + static RecoveryOptions create_default() { return {}; }; + }; + + /// Base subscriber options. + zenoh::Session::SubscriberOptions subscriber_options; + /// Optional settings for querying historical data. History can only be retransmitted by Publishers that enable + /// caching. + std::optional history = {}; + /// Optional settings for retransmission of detected lost Samples. Retransmission of lost samples can only be + /// done by Publishers that enable caching and sample_miss_detection. + std::optional recovery = {}; + /// Timeout to be used for history and recovery queries. + /// Default value will be used if set to ``0``. + uint64_t query_timeout_ms = 0; + /// Allow this subscriber to be detected through liveliness. + bool subscriber_detection = false; + /// An optional key expression to be added to the liveliness token key expression. + /// It can be used to convey meta data. + std::optional subscriber_detection_metadata = {}; + + /// @name Methods + + /// @brief Create default option settings. + static AdvancedSubscriberOptions create_default() { return {}; } + + private: + friend struct zenoh::interop::detail::Converter; + ::ze_advanced_subscriber_options_t to_c_opts() { + ::ze_advanced_subscriber_options_t opts; + ::ze_advanced_subscriber_options_default(&opts); + opts.subscriber_options = zenoh::interop::detail::Converter::to_c_opts(this->subscriber_options); + if (this->history.has_value()) { + opts.history.is_enabled = true; + opts.history.detect_late_publishers = this->history->detect_late_publishers; + opts.history.max_samples = this->history->max_samples; + opts.history.max_age_ms = this->history->max_age_ms; + } + if (this->recovery.has_value()) { + opts.recovery.is_enabled = true; + opts.recovery.periodic_queries_period_ms = this->recovery->periodic_queries_period_ms; + } + opts.query_timeout_ms = this->query_timeout_ms; + opts.subscriber_detection = this->subscriber_detection; + opts.subscriber_detection_metadata = zenoh::interop::as_loaned_c_ptr(this->subscriber_detection_metadata); + return opts; + } + }; + + /// @brief Create an ``AdvancedSubscriber`` object to receive data from matching ``Publisher`` objects or from + /// ``Session::put`` and ``Session::delete_resource`` requests. + /// @param key_expr the key expression to match the publishers. + /// @param on_sample the callback that will be called for each received sample. + /// @param on_drop the callback that will be called once subscriber is destroyed or undeclared. + /// @param options options to pass to subscriber declaration. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + /// @return a ``Subscriber`` object. + template + [[nodiscard]] AdvancedSubscriber declare_advanced_subscriber( + const KeyExpr& key_expr, C&& on_sample, D&& on_drop, + AdvancedSubscriberOptions&& options = AdvancedSubscriberOptions::create_default(), + zenoh::ZResult* err = nullptr) const { + static_assert( + std::is_invocable_r::value, + "on_sample should be callable with the following signature: void on_sample(zenoh::Sample& sample)"); + static_assert(std::is_invocable_r::value, + "on_drop should be callable with the following signature: void on_drop()"); + ::z_owned_closure_sample_t c_closure; + using Cval = std::remove_reference_t; + using Dval = std::remove_reference_t; + using ClosureType = typename zenoh::detail::closures::Closure; + auto closure = ClosureType::into_context(std::forward(on_sample), std::forward(on_drop)); + ::z_closure(&c_closure, zenoh::detail::closures::_zenoh_on_sample_call, zenoh::detail::closures::_zenoh_on_drop, + closure); + ::ze_advanced_subscriber_options_t opts = zenoh::interop::detail::Converter::to_c_opts(options); + AdvancedSubscriber s = zenoh::interop::detail::null>(); + zenoh::ZResult res = ::ze_declare_advanced_subscriber( + zenoh::interop::as_loaned_c_ptr(this->_session), zenoh::interop::as_owned_c_ptr(s), + zenoh::interop::as_loaned_c_ptr(key_expr), ::z_move(c_closure), &opts); + __ZENOH_RESULT_CHECK(res, err, "Failed to declare Advanced Subscriber"); + return s; + } + + /// @brief Declare a background advanced subscriber. It will receive data from matching ``Publisher`` objects or + /// from + /// ``Session::put`` and ``Session::delete_resource`` requests, until the corresponding session is closed or + /// destroyed. + /// @param key_expr the key expression to match the publishers. + /// @param on_sample the callback that will be called for each received sample. + /// @param on_drop the callback that will be called once subscriber is destroyed or undeclared. + /// @param options options to pass to subscriber declaration. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + template + void declare_background_advanced_subscriber( + const KeyExpr& key_expr, C&& on_sample, D&& on_drop, + AdvancedSubscriberOptions&& options = AdvancedSubscriberOptions::create_default(), + zenoh::ZResult* err = nullptr) const { + static_assert( + std::is_invocable_r::value, + "on_sample should be callable with the following signature: void on_sample(zenoh::Sample& sample)"); + static_assert(std::is_invocable_r::value, + "on_drop should be callable with the following signature: void on_drop()"); + ::z_owned_closure_sample_t c_closure; + using Cval = std::remove_reference_t; + using Dval = std::remove_reference_t; + using ClosureType = typename zenoh::detail::closures::Closure; + auto closure = ClosureType::into_context(std::forward(on_sample), std::forward(on_drop)); + ::z_closure(&c_closure, zenoh::detail::closures::_zenoh_on_sample_call, zenoh::detail::closures::_zenoh_on_drop, + closure); + ::ze_advanced_subscriber_options_t opts = zenoh::interop::detail::Converter::to_c_opts(options); + ZResult res = ::ze_declare_background_advanced_subscriber(zenoh::interop::as_loaned_c_ptr(this->_session), + zenoh::interop::as_loaned_c_ptr(key_expr), + ::z_move(c_closure), &opts); + __ZENOH_RESULT_CHECK(res, err, "Failed to declare Background Advanced Subscriber"); + } + + /// @brief Create an ``AdvancedSubscriber`` object to receive data from matching ``Publisher`` objects or from. + /// ``Session::put`` and ``Session::delete_resource`` requests. + /// @tparam Channel the type of channel used to create stream of data (see ``zenoh::channels::FifoChannel`` or + /// ``zenoh::channels::RingChannel``). + /// @param key_expr the key expression to match the publishers. + /// @param channel an instance of channel. + /// @param options options to pass to subscriber declaration. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + /// @return a ``Subscriber`` object. + template + [[nodiscard]] AdvancedSubscriber> declare_advanced_subscriber( const KeyExpr& key_expr, Channel channel, - QueryingSubscriberOptions&& options = QueryingSubscriberOptions::create_default(), - ZResult* err = nullptr) const; + AdvancedSubscriberOptions&& options = AdvancedSubscriberOptions::create_default(), + zenoh::ZResult* err = nullptr) const { + auto cb_handler_pair = channel.template into_cb_handler_pair(); + ::ze_advanced_subscriber_options_t opts = zenoh::interop::detail::Converter::to_c_opts(options); + AdvancedSubscriber s = zenoh::interop::detail::null>(); + zenoh::ZResult res = ::ze_declare_advanced_subscriber( + zenoh::interop::as_loaned_c_ptr(this->_session), zenoh::interop::as_owned_c_ptr(s), + zenoh::interop::as_loaned_c_ptr(key_expr), ::z_move(cb_handler_pair.first), &opts); + __ZENOH_RESULT_CHECK(res, err, "Failed to declare Advanced Subscriber"); + if (res != Z_OK) ::z_drop(zenoh::interop::as_moved_c_ptr(cb_handler_pair.second)); + return Subscriber>(std::move(s), + std::move(cb_handler_pair.second)); + } }; - } // namespace zenoh::ext #endif \ No newline at end of file diff --git a/include/zenoh/api/interop.hxx b/include/zenoh/api/interop.hxx index 43b621d..811fb43 100644 --- a/include/zenoh/api/interop.hxx +++ b/include/zenoh/api/interop.hxx @@ -214,6 +214,11 @@ struct Converter { static T copyable_to_cpp(const Inner& i) { return T(i); } + + template + static auto to_c_opts(OPTIONS& options) { + return options.to_c_opts(); + } }; template diff --git a/include/zenoh/api/publisher.hxx b/include/zenoh/api/publisher.hxx index a7377f6..b96ff4e 100644 --- a/include/zenoh/api/publisher.hxx +++ b/include/zenoh/api/publisher.hxx @@ -60,6 +60,20 @@ class Publisher : public Owned<::z_owned_publisher_t> { /// @brief Create default option settings. static PutOptions create_default() { return {}; } + + private: + friend struct interop::detail::Converter; + ::z_publisher_put_options_t to_c_opts() { + ::z_publisher_put_options_t opts; + z_publisher_put_options_default(&opts); + opts.encoding = interop::as_moved_c_ptr(this->encoding); +#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) + opts.source_info = interop::as_moved_c_ptr(this->source_info); +#endif + opts.attachment = interop::as_moved_c_ptr(this->attachment); + opts.timestamp = interop::as_copyable_c_ptr(this->timestamp); + return opts; + } }; /// @brief Options to be passed to ``Publisher::delete_resource`` operation. @@ -73,6 +87,15 @@ class Publisher : public Owned<::z_owned_publisher_t> { /// @brief Create default option settings. static DeleteOptions create_default() { return {}; } + + private: + friend struct interop::detail::Converter; + ::z_publisher_delete_options_t to_c_opts() { + ::z_publisher_delete_options_t opts; + z_publisher_delete_options_default(&opts); + opts.timestamp = interop::as_copyable_c_ptr(this->timestamp); + return opts; + } }; /// @name Methods @@ -84,15 +107,7 @@ class Publisher : public Owned<::z_owned_publisher_t> { /// thrown in case of error. void put(Bytes&& payload, PutOptions&& options = PutOptions::create_default(), ZResult* err = nullptr) const { auto payload_ptr = interop::as_moved_c_ptr(payload); - ::z_publisher_put_options_t opts; - z_publisher_put_options_default(&opts); - opts.encoding = interop::as_moved_c_ptr(options.encoding); -#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) - opts.source_info = interop::as_moved_c_ptr(options.source_info); -#endif - opts.attachment = interop::as_moved_c_ptr(options.attachment); - opts.timestamp = interop::as_copyable_c_ptr(options.timestamp); - + ::z_publisher_put_options_t opts = interop::detail::Converter::to_c_opts(options); __ZENOH_RESULT_CHECK(::z_publisher_put(interop::as_loaned_c_ptr(*this), payload_ptr, &opts), err, "Failed to perform put operation"); } @@ -102,9 +117,7 @@ class Publisher : public Owned<::z_owned_publisher_t> { /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be /// thrown in case of error. void delete_resource(DeleteOptions&& options = DeleteOptions::create_default(), ZResult* err = nullptr) const { - ::z_publisher_delete_options_t opts; - z_publisher_delete_options_default(&opts); - opts.timestamp = interop::as_copyable_c_ptr(options.timestamp); + ::z_publisher_delete_options_t opts = interop::detail::Converter::to_c_opts(options); __ZENOH_RESULT_CHECK(::z_publisher_delete(interop::as_loaned_c_ptr(*this), &opts), err, "Failed to perform delete_resource operation"); } diff --git a/include/zenoh/api/query.hxx b/include/zenoh/api/query.hxx index 3b29370..dd4fd34 100644 --- a/include/zenoh/api/query.hxx +++ b/include/zenoh/api/query.hxx @@ -233,7 +233,7 @@ class Query : public Owned<::z_owned_query_t> { "Failed to send reply del"); } - /// @brief Construct a a shallow copy of this Query. + /// @brief Construct a shallow copy of this Query. /// This can be used, to send query replies outside of ``Queryable`` callback. /// /// The query responses will be sent only when the last clone is destroyed. diff --git a/include/zenoh/api/session.hxx b/include/zenoh/api/session.hxx index fe7b6f8..f22c26b 100644 --- a/include/zenoh/api/session.hxx +++ b/include/zenoh/api/session.hxx @@ -405,12 +405,28 @@ class Session : public Owned<::z_owned_session_t> { #if defined(ZENOHCXX_ZENOHC) || Z_FEATURE_SUBSCRIPTION == 1 /// @brief Options to be passed when declaring a ``Subscriber``. struct SubscriberOptions { - /// @name Fields +/// @name Fields +/// Restrict the matching publications that will be received by this Subscribers to the ones +/// that have the compatible allowed_destination. +#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) + Locality allowed_origin = ::zc_locality_default(); +#endif /// @name Methods /// @brief Create default option settings. static SubscriberOptions create_default() { return {}; } + + private: + friend struct interop::detail::Converter; + ::z_subscriber_options_t to_c_opts() { + ::z_subscriber_options_t opts; + z_subscriber_options_default(&opts); +#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) + opts.allowed_origin = this->allowed_origin; +#endif + return opts; + } }; /// @brief Create a ``Subscriber`` object to receive data from matching ``Publisher`` objects or from @@ -437,10 +453,8 @@ class Session : public Owned<::z_owned_session_t> { using ClosureType = typename detail::closures::Closure; auto closure = ClosureType::into_context(std::forward(on_sample), std::forward(on_drop)); ::z_closure(&c_closure, detail::closures::_zenoh_on_sample_call, detail::closures::_zenoh_on_drop, closure); - ::z_subscriber_options_t opts; - z_subscriber_options_default(&opts); - (void)options; - Subscriber s(zenoh::detail::null_object); + ::z_subscriber_options_t opts = interop::detail::Converter::to_c_opts(options); + Subscriber s = interop::detail::null>(); ZResult res = ::z_declare_subscriber(interop::as_loaned_c_ptr(*this), interop::as_owned_c_ptr(s), interop::as_loaned_c_ptr(key_expr), ::z_move(c_closure), &opts); __ZENOH_RESULT_CHECK(res, err, "Failed to declare Subscriber"); @@ -471,9 +485,7 @@ class Session : public Owned<::z_owned_session_t> { using ClosureType = typename detail::closures::Closure; auto closure = ClosureType::into_context(std::forward(on_sample), std::forward(on_drop)); ::z_closure(&c_closure, detail::closures::_zenoh_on_sample_call, detail::closures::_zenoh_on_drop, closure); - ::z_subscriber_options_t opts; - z_subscriber_options_default(&opts); - (void)options; + ::z_subscriber_options_t opts = interop::detail::Converter::to_c_opts(options); ZResult res = ::z_declare_background_subscriber(interop::as_loaned_c_ptr(*this), interop::as_loaned_c_ptr(key_expr), ::z_move(c_closure), &opts); __ZENOH_RESULT_CHECK(res, err, "Failed to declare Background Subscriber"); @@ -494,10 +506,8 @@ class Session : public Owned<::z_owned_session_t> { const KeyExpr& key_expr, Channel channel, SubscriberOptions&& options = SubscriberOptions::create_default(), ZResult* err = nullptr) const { auto cb_handler_pair = channel.template into_cb_handler_pair(); - ::z_subscriber_options_t opts; - z_subscriber_options_default(&opts); - (void)options; - Subscriber s(zenoh::detail::null_object); + ::z_subscriber_options_t opts = interop::detail::Converter::to_c_opts(options); + Subscriber s = interop::detail::null>(); ZResult res = ::z_declare_subscriber(interop::as_loaned_c_ptr(*this), interop::as_owned_c_ptr(s), interop::as_loaned_c_ptr(key_expr), ::z_move(cb_handler_pair.first), &opts); @@ -654,6 +664,24 @@ class Session : public Owned<::z_owned_session_t> { /// @brief Create default option settings. static PublisherOptions create_default() { return {}; } + + private: + friend struct interop::detail::Converter; + ::z_publisher_options_t to_c_opts() { + ::z_publisher_options_t opts; + z_publisher_options_default(&opts); + opts.congestion_control = this->congestion_control; + opts.priority = this->priority; + opts.is_express = this->is_express; +#if defined(Z_FEATURE_UNSTABLE_API) + opts.reliability = this->reliability; +#endif +#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) + opts.allowed_destination = this->allowed_destination; +#endif + opts.encoding = interop::as_moved_c_ptr(this->encoding); + return opts; + } }; /// @brief Create a ``Publisher`` object to publish data to matching ``Subscriber`` objects. @@ -665,20 +693,8 @@ class Session : public Owned<::z_owned_session_t> { Publisher declare_publisher(const KeyExpr& key_expr, PublisherOptions&& options = PublisherOptions::create_default(), ZResult* err = nullptr) const { - ::z_publisher_options_t opts; - z_publisher_options_default(&opts); - opts.congestion_control = options.congestion_control; - opts.priority = options.priority; - opts.is_express = options.is_express; -#if defined(Z_FEATURE_UNSTABLE_API) - opts.reliability = options.reliability; -#endif -#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) - opts.allowed_destination = options.allowed_destination; -#endif - opts.encoding = interop::as_moved_c_ptr(options.encoding); - Publisher p = interop::detail::null(); + ::z_publisher_options_t opts = interop::detail::Converter::to_c_opts(options); ZResult res = ::z_declare_publisher(interop::as_loaned_c_ptr(*this), interop::as_owned_c_ptr(p), interop::as_loaned_c_ptr(key_expr), &opts); __ZENOH_RESULT_CHECK(res, err, "Failed to declare Publisher"); @@ -864,6 +880,9 @@ class Session : public Owned<::z_owned_session_t> { uint8_t _dummy = 0; public: + /// @name Methods + + /// @brief Create default option settings. static LivelinessDeclarationOptions create_default() { return {}; } }; @@ -894,9 +913,25 @@ class Session : public Owned<::z_owned_session_t> { /// @brief Options to pass to ``Session::liveliness_declare_subscriber``. struct LivelinessSubscriberOptions { public: + /// @name Fields + + /// If true, subscriber will receive the state change notifications for liveliness tokens that were declared + /// before its declaration. bool history = false; + /// @name Methods + + /// @brief Create default option settings. static LivelinessSubscriberOptions create_default() { return {}; } + + private: + friend struct interop::detail::Converter; + ::z_liveliness_subscriber_options_t to_c_opts() { + ::z_liveliness_subscriber_options_t opts; + ::z_liveliness_subscriber_options_default(&opts); + opts.history = this->history; + return opts; + } }; /// @brief Declares a subscriber on liveliness tokens that intersect `key_expr`. @@ -923,10 +958,8 @@ class Session : public Owned<::z_owned_session_t> { using ClosureType = typename detail::closures::Closure; auto closure = ClosureType::into_context(std::forward(on_sample), std::forward(on_drop)); ::z_closure(&c_closure, detail::closures::_zenoh_on_sample_call, detail::closures::_zenoh_on_drop, closure); - ::z_liveliness_subscriber_options_t opts; - z_liveliness_subscriber_options_default(&opts); - opts.history = options.history; - Subscriber s(zenoh::detail::null_object); + ::z_liveliness_subscriber_options_t opts = interop::detail::Converter::to_c_opts(options); + Subscriber s = interop::detail::null>(); ZResult res = ::z_liveliness_declare_subscriber(interop::as_loaned_c_ptr(*this), interop::as_owned_c_ptr(s), interop::as_loaned_c_ptr(key_expr), ::z_move(c_closure), &opts); __ZENOH_RESULT_CHECK(res, err, "Failed to declare Liveliness Token Subscriber"); @@ -962,9 +995,7 @@ class Session : public Owned<::z_owned_session_t> { using ClosureType = typename detail::closures::Closure; auto closure = ClosureType::into_context(std::forward(on_sample), std::forward(on_drop)); ::z_closure(&c_closure, detail::closures::_zenoh_on_sample_call, detail::closures::_zenoh_on_drop, closure); - ::z_liveliness_subscriber_options_t opts; - z_liveliness_subscriber_options_default(&opts); - opts.history = options.history; + ::z_liveliness_subscriber_options_t opts = interop::detail::Converter::to_c_opts(options); ZResult res = ::zc_liveliness_declare_background_subscriber( interop::as_loaned_c_ptr(*this), interop::as_loaned_c_ptr(key_expr), ::z_move(c_closure), &opts); __ZENOH_RESULT_CHECK(res, err, "Failed to declare Background Liveliness Token Subscriber"); @@ -987,10 +1018,8 @@ class Session : public Owned<::z_owned_session_t> { LivelinessSubscriberOptions&& options = LivelinessSubscriberOptions::create_default(), ZResult* err = nullptr) const { auto cb_handler_pair = channel.template into_cb_handler_pair(); - ::z_liveliness_subscriber_options_t opts; - z_liveliness_subscriber_options_default(&opts); - opts.history = options.history; - Subscriber s(zenoh::detail::null_object); + ::z_liveliness_subscriber_options_t opts = interop::detail::Converter::to_c_opts(options); + Subscriber s = interop::detail::null>(); ZResult res = ::z_liveliness_declare_subscriber(interop::as_loaned_c_ptr(*this), interop::as_owned_c_ptr(s), interop::as_loaned_c_ptr(key_expr), ::z_move(cb_handler_pair.first), &opts); @@ -1101,10 +1130,11 @@ class Session : public Owned<::z_owned_session_t> { /// @brief Get access to extension functionality. /// @tparam Ext Session interface extension. /// @return Session interface extension providing access to non-core Zenoh functionality. - template + template Ext ext() const { return Ext(*this); } #endif }; + } // namespace zenoh diff --git a/include/zenoh/api/subscriber.hxx b/include/zenoh/api/subscriber.hxx index b7ce438..baf7eca 100644 --- a/include/zenoh/api/subscriber.hxx +++ b/include/zenoh/api/subscriber.hxx @@ -35,7 +35,6 @@ class SubscriberBase : public Owned<::z_owned_subscriber_t> { const KeyExpr& get_keyexpr() const { return interop::as_owned_cpp_ref(::z_subscriber_keyexpr(interop::as_loaned_c_ptr(*this))); } - friend class zenoh::Session; }; } // namespace detail @@ -46,7 +45,7 @@ template <> class Subscriber : public detail::SubscriberBase { protected: using SubscriberBase::SubscriberBase; - friend class Session; + friend struct interop::detail::Converter; public: /// @name Methods diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 80c52c3..f048084 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -88,13 +88,18 @@ function(set_strict_warnings file mode) # Enable all warnings and treat warnings as errors # to check that library can be built in maximally strict mode if(MSVC) - target_compile_options(${target} PRIVATE /W4 /WX) + target_compile_options(${target} PRIVATE + /W4 # show all warning + /WX # consider all warning as errors + #/wd4996 # allow deprecated + ) else() target_compile_options(${target} PRIVATE -Wall -Wextra -Wpedantic - -Wold-style-cast + -Wold-style-cast + #-Wno-deprecated-declarations -Werror) endif() endfunction()