diff --git a/examples/examples/z_querier_liveliness.rs b/examples/examples/z_querier_liveliness.rs new file mode 100644 index 000000000..c892b052b --- /dev/null +++ b/examples/examples/z_querier_liveliness.rs @@ -0,0 +1,75 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use std::time::Duration; + +use clap::Parser; +use tokio::time::sleep; +use zenoh::{key_expr::KeyExpr, Config}; +use zenoh_examples::CommonArgs; + +#[tokio::main] +async fn main() { + // initiate logging + zenoh::init_log_from_env_or("error"); + + let (config, key_expr, timeout) = parse_args(); + + println!("Opening session..."); + let session = zenoh::open(config).await.unwrap(); + + println!("Declaring Liveliness Querier on '{}'...", &key_expr); + let querier = session + .liveliness() + .declare_querier(key_expr) + .timeout(timeout) + .await + .unwrap(); + + println!("Press CTRL-C to quit..."); + loop { + println!("Sending Liveliness Query '{}'...", querier.key_expr()); + let replies = querier.get().await.unwrap(); + while let Ok(reply) = replies.recv_async().await { + match reply.result() { + Ok(sample) => println!(">> Alive token ('{}')", sample.key_expr().as_str(),), + Err(err) => { + let payload = err + .payload() + .try_to_string() + .unwrap_or_else(|e| e.to_string().into()); + println!(">> Received (ERROR: '{}')", payload); + } + } + } + sleep(Duration::from_secs(1)).await; + } +} + +#[derive(Parser, Clone, Debug)] +struct Args { + #[arg(short, long, default_value = "group1/**")] + /// The key expression matching liveliness tokens to query. + key_expr: KeyExpr<'static>, + #[arg(short = 'o', long, default_value = "10000")] + /// The query timeout in milliseconds. + timeout: u64, + #[command(flatten)] + common: CommonArgs, +} + +fn parse_args() -> (Config, KeyExpr<'static>, Duration) { + let args = Args::parse(); + let timeout = Duration::from_millis(args.timeout); + (args.common.into(), args.key_expr, timeout) +} diff --git a/zenoh/src/api/liveliness.rs b/zenoh/src/api/liveliness.rs index 20dd30b4b..65dc337c8 100644 --- a/zenoh/src/api/liveliness.rs +++ b/zenoh/src/api/liveliness.rs @@ -21,6 +21,8 @@ use std::{ use tracing::error; use zenoh_config::unwrap_or_default; +#[cfg(feature = "unstable")] +use zenoh_config::wrappers::EntityGlobalId; use zenoh_core::{Resolvable, Resolve, Result as ZResult, Wait}; use crate::{ @@ -174,6 +176,47 @@ impl<'a> Liveliness<'a> { } } + /// Create a [`Querier`](LivelinessQuerier) for to perform queries on liveliness tokens matching the given key expression. + /// + /// # Arguments + /// + /// * `key_expr` - The key expression to perform queries on + /// + /// # Examples + /// ```no_run + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let querier = session.liveliness().declare_querier("key/expression").await.unwrap(); + /// let tokens = querier.get().await.unwrap(); + /// while let Ok(reply) = tokens.recv_async().await { + /// if let Ok(sample) = reply.result() { + /// println!(">> Liveliness token {}", sample.key_expr()); + /// } + /// } + /// # } + /// ``` + #[zenoh_macros::unstable] + pub fn declare_querier<'b, TryIntoKeyExpr>( + &self, + key_expr: TryIntoKeyExpr, + ) -> LivelinessQuerierBuilder<'a, 'b> + where + TryIntoKeyExpr: TryInto>, + >>::Error: Into, + { + let timeout = { + let conf = &self.session.0.runtime.config().lock().0; + Duration::from_millis(unwrap_or_default!(conf.queries_default_timeout())) + }; + LivelinessQuerierBuilder { + session: self.session, + key_expr: TryIntoKeyExpr::try_into(key_expr).map_err(Into::into), + timeout, + } + } + /// Query liveliness tokens with matching key expressions. /// /// # Arguments @@ -658,6 +701,371 @@ impl IntoFuture for LivelinessSubscriberBuilder<'_, '_, Callback, true> } } +/// A querier that allows to perform livleiness queries. +/// +/// Queriers are automatically undeclared when dropped. +/// +/// # Examples +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); +/// let querier = session.liveliness().declare_querier("key/expression").await.unwrap(); +/// let tokens = querier.get().await.unwrap(); +/// # } +/// ``` +#[derive(Debug)] +pub struct LivelinessQuerier<'a> { + pub(crate) session: WeakSession, + pub(crate) id: Id, + pub(crate) key_expr: KeyExpr<'a>, + pub(crate) timeout: Duration, + pub(crate) undeclare_on_drop: bool, +} + +/// A builder for initializing a liveliness querier. +/// +/// # Examples +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); +/// let querier = session +/// .liveliness() +/// .declare_querier("key/expression") +/// .await +/// .unwrap(); +/// # } +/// ``` +#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] +#[zenoh_macros::unstable] +#[derive(Debug)] +pub struct LivelinessQuerierBuilder<'a, 'b> { + session: &'a Session, + key_expr: ZResult>, + timeout: Duration, +} + +impl<'a, 'b> LivelinessQuerierBuilder<'a, 'b> { + /// Set querier's queries timeout. + #[inline] + pub fn timeout(mut self, timeout: Duration) -> Self { + self.timeout = timeout; + self + } +} + +#[zenoh_macros::unstable] +impl<'a, 'b> Resolvable for LivelinessQuerierBuilder<'a, 'b> { + type To = ZResult>; +} + +#[zenoh_macros::unstable] +impl<'a> Wait for LivelinessQuerierBuilder<'a, '_> { + #[zenoh_macros::unstable] + fn wait(self) -> ::To { + let mut key_expr = self.key_expr?; + if !key_expr.is_fully_optimized(&self.session.0) { + key_expr = self.session.declare_keyexpr(key_expr).wait()?; + } + let id = self.session.0.declare_liveliness_querier_inner(&key_expr)?; + Ok(LivelinessQuerier { + session: self.session.downgrade(), + id, + key_expr, + timeout: self.timeout, + undeclare_on_drop: true, + }) + } +} + +#[zenoh_macros::unstable] +impl IntoFuture for LivelinessQuerierBuilder<'_, '_> { + type Output = ::To; + type IntoFuture = Ready<::To>; + + #[zenoh_macros::unstable] + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} + +impl<'a> LivelinessQuerier<'a> { + /// Returns the [`EntityGlobalId`] of this Querier. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let querier = session.declare_querier("key/expression") + /// .await + /// .unwrap(); + /// let querier_id = querier.id(); + /// # } + /// ``` + #[zenoh_macros::unstable] + pub fn id(&self) -> EntityGlobalId { + use zenoh_protocol::core::EntityGlobalIdProto; + EntityGlobalIdProto { + zid: self.session.zid().into(), + eid: self.id, + } + .into() + } + + #[inline] + pub fn key_expr(&self) -> &KeyExpr<'a> { + &self.key_expr + } + + /// Perform a liveliness a query. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let querier = session.liveliness().declare_querier("key/expression").await.unwrap(); + /// let tokens = querier.get(); + /// # } + /// ``` + #[inline] + pub fn get(&self) -> LivelinessQuerierGetBuilder<'_, DefaultHandler> { + LivelinessQuerierGetBuilder { + querier: self, + handler: DefaultHandler::default(), + } + } + + /// Undeclare the [`LivelinessQuerier`], informing the network that it needn't optimize queries for its key expression anymore. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let querier = session.liveliness().declare_querier("key/expression").await.unwrap(); + /// querier.undeclare().await.unwrap(); + /// # } + /// ``` + pub fn undeclare(self) -> impl Resolve> + 'a { + UndeclarableSealed::undeclare_inner(self, ()) + } + + fn undeclare_impl(&mut self) -> ZResult<()> { + // set the flag first to avoid double panic if this function panic + self.undeclare_on_drop = false; + self.session.undeclare_liveliness_querier_inner(self.id) + } +} + +impl<'a> UndeclarableSealed<()> for LivelinessQuerier<'a> { + type Undeclaration = LivelinessQuerierUndeclaration<'a>; + + fn undeclare_inner(self, _: ()) -> Self::Undeclaration { + LivelinessQuerierUndeclaration(self) + } +} + +/// A [`Resolvable`] returned when undeclaring a publisher. +/// +/// # Examples +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); +/// let querier = session.declare_querier("key/expression").await.unwrap(); +/// querier.undeclare().await.unwrap(); +/// # } +/// ``` +#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] +pub struct LivelinessQuerierUndeclaration<'a>(LivelinessQuerier<'a>); + +impl Resolvable for LivelinessQuerierUndeclaration<'_> { + type To = ZResult<()>; +} + +impl Wait for LivelinessQuerierUndeclaration<'_> { + fn wait(mut self) -> ::To { + self.0.undeclare_impl() + } +} + +impl IntoFuture for LivelinessQuerierUndeclaration<'_> { + type Output = ::To; + type IntoFuture = Ready<::To>; + + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} + +impl Drop for LivelinessQuerier<'_> { + fn drop(&mut self) { + if self.undeclare_on_drop { + if let Err(error) = self.undeclare_impl() { + error!(error); + } + } + } +} + +/// A builder for initializing a liveliness querier's `query`. +/// +/// # Examples +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// # use std::convert::TryFrom; +/// +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); +/// let querier = session.liveliness().declare_querier("key/expression").await.unwrap(); +/// let tokens = querier +/// .get() +/// .await +/// .unwrap(); +/// while let Ok(token) = tokens.recv_async().await { +/// match token.result() { +/// Ok(sample) => println!("Alive token ('{}')", sample.key_expr().as_str()), +/// Err(err) => println!("Received (ERROR: '{:?}')", err.payload()), +/// } +/// } +/// # } +/// ``` +#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] +#[derive(Debug)] +pub struct LivelinessQuerierGetBuilder<'a, Handler> { + pub(crate) querier: &'a LivelinessQuerier<'a>, + pub(crate) handler: Handler, +} + +impl<'a> LivelinessQuerierGetBuilder<'a, DefaultHandler> { + /// Receive the replies for this livliness querier's query with a callback. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let querier = session.liveliness().declare_querier("key/expression").await.unwrap(); + /// let _ = querier + /// .get() + /// .callback(|reply| { println!("Received {:?}", reply.result()); }) + /// .await + /// .unwrap(); + /// # } + /// ``` + #[inline] + pub fn callback(self, callback: F) -> LivelinessQuerierGetBuilder<'a, Callback> + where + F: Fn(Reply) + Send + Sync + 'static, + { + self.with(Callback::new(Arc::new(callback))) + } + + /// Receive the replies for this liveliness querier's query with a mutable callback. + /// + /// Using this guarantees that your callback will never be called concurrently. + /// If your callback is also accepted by the [`callback`](LivelinessQuerierGetBuilder::callback) method, we suggest you use it instead of `callback_mut`. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let mut n = 0; + /// let querier = session.liveliness().declare_querier("key/expression").await.unwrap(); + /// let _ = querier + /// .get() + /// .callback_mut(move |reply| {n += 1;}) + /// .await + /// .unwrap(); + /// # } + /// ``` + #[inline] + pub fn callback_mut(self, callback: F) -> LivelinessQuerierGetBuilder<'a, Callback> + where + F: FnMut(Reply) + Send + Sync + 'static, + { + self.callback(locked(callback)) + } + + /// Receive the replies for this liveliness querier's query with a [`Handler`](crate::handlers::IntoHandler). + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let querier = session.liveliness().declare_querier("key/expression").await.unwrap(); + /// let replies = querier + /// .get() + /// .with(flume::bounded(32)) + /// .await + /// .unwrap(); + /// while let Ok(reply) = replies.recv_async().await { + /// println!("Received {:?}", reply.result()); + /// } + /// # } + /// ``` + #[inline] + pub fn with(self, handler: Handler) -> LivelinessQuerierGetBuilder<'a, Handler> + where + Handler: IntoHandler, + { + let LivelinessQuerierGetBuilder { + querier, + handler: _, + } = self; + LivelinessQuerierGetBuilder { querier, handler } + } +} + +impl Resolvable for LivelinessQuerierGetBuilder<'_, Handler> +where + Handler: IntoHandler + Send, + Handler::Handler: Send, +{ + type To = ZResult; +} + +impl Wait for LivelinessQuerierGetBuilder<'_, Handler> +where + Handler: IntoHandler + Send, + Handler::Handler: Send, +{ + fn wait(self) -> ::To { + let (callback, receiver) = self.handler.into_handler(); + self.querier + .session + .liveliness_query(&self.querier.key_expr, self.querier.timeout, callback) + .map(|_| receiver) + } +} + +impl IntoFuture for LivelinessQuerierGetBuilder<'_, Handler> +where + Handler: IntoHandler + Send, + Handler::Handler: Send, +{ + type Output = ::To; + type IntoFuture = Ready<::To>; + + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} + /// A builder for initializing a liveliness `query`. /// /// # Examples diff --git a/zenoh/src/api/querier.rs b/zenoh/src/api/querier.rs index b9a25a5d6..6f4512466 100644 --- a/zenoh/src/api/querier.rs +++ b/zenoh/src/api/querier.rs @@ -145,7 +145,7 @@ impl<'a> Querier<'a> { } } - /// Undeclare the [`Querier`], informing the network that it needn't optimize publications for its key expression anymore. + /// Undeclare the [`Querier`], informing the network that it needn't optimize queries for its key expression anymore. /// /// # Examples /// ``` diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 8f2513a7d..7819eb8e9 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -134,6 +134,8 @@ pub(crate) struct SessionState { pub(crate) publishers: HashMap, pub(crate) queriers: HashMap, #[cfg(feature = "unstable")] + pub(crate) liveliness_queriers: HashMap, + #[cfg(feature = "unstable")] pub(crate) remote_tokens: HashMap>, //pub(crate) publications: Vec, pub(crate) subscribers: HashMap>, @@ -170,6 +172,8 @@ impl SessionState { publishers: HashMap::new(), queriers: HashMap::new(), #[cfg(feature = "unstable")] + liveliness_queriers: HashMap::new(), + #[cfg(feature = "unstable")] remote_tokens: HashMap::new(), //publications: Vec::new(), subscribers: HashMap::new(), @@ -1826,6 +1830,77 @@ impl SessionInner { Ok(sub_state) } + #[cfg(feature = "unstable")] + pub(crate) fn declare_liveliness_querier_inner(&self, key_expr: &KeyExpr) -> ZResult { + let mut state = zwrite!(self.state); + trace!("declare_liveliness_querier({:?})", key_expr); + let id = self.runtime.next_id(); + + let mut querier_state = QuerierState { + id, + remote_id: id, + key_expr: key_expr.clone().into_owned(), + destination: Locality::default(), + }; + + let primitives = state.primitives()?; + let declared_querier = + if let Some(twin_querier) = state.queriers.values().find(|p| &p.key_expr == key_expr) { + querier_state.remote_id = twin_querier.remote_id; + None + } else { + Some(key_expr.clone()) + }; + state.liveliness_queriers.insert(id, querier_state); + drop(state); + + if let Some(res) = declared_querier { + primitives.send_interest(Interest { + id, + mode: InterestMode::CurrentFuture, + options: InterestOptions::KEYEXPRS + InterestOptions::TOKENS, + wire_expr: Some(res.to_wire(self).to_owned()), + ext_qos: declare::ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: declare::ext::NodeIdType::DEFAULT, + }); + } + + Ok(id) + } + + #[cfg(feature = "unstable")] + pub(crate) fn undeclare_liveliness_querier_inner(&self, pid: Id) -> ZResult<()> { + let mut state = zwrite!(self.state); + let Ok(primitives) = state.primitives() else { + return Ok(()); + }; + if let Some(querier_state) = state.liveliness_queriers.remove(&pid) { + trace!("undeclare_liveliness_querier({:?})", querier_state); + // Note: there might be several queriers on the same KeyExpr. + // Before calling forget_queriers(key_expr), check if this was the last one. + if !state + .liveliness_queriers + .values() + .any(|p| p.remote_id == querier_state.remote_id) + { + drop(state); + primitives.send_interest(Interest { + id: querier_state.remote_id, + mode: InterestMode::Final, + options: InterestOptions::empty(), + wire_expr: None, + ext_qos: declare::ext::QoSType::DEFAULT, + ext_tstamp: None, + ext_nodeid: declare::ext::NodeIdType::DEFAULT, + }); + } + Ok(()) + } else { + Err(zerror!("Unable to find liveliness querier").into()) + } + } + #[zenoh_macros::unstable] pub(crate) fn undeclare_liveliness(&self, tid: Id) -> ZResult<()> { let mut state = zwrite!(self.state); diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index e43f46770..f940e9cbe 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -417,7 +417,8 @@ pub mod scouting { #[zenoh_macros::unstable] pub mod liveliness { pub use crate::api::liveliness::{ - Liveliness, LivelinessGetBuilder, LivelinessSubscriberBuilder, LivelinessToken, + Liveliness, LivelinessGetBuilder, LivelinessQuerier, LivelinessQuerierBuilder, + LivelinessQuerierGetBuilder, LivelinessSubscriberBuilder, LivelinessToken, LivelinessTokenBuilder, LivelinessTokenUndeclaration, }; }