Skip to content

Commit

Permalink
add querier
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisBiryukov91 committed Nov 13, 2024
1 parent da11732 commit 59f74fd
Show file tree
Hide file tree
Showing 10 changed files with 966 additions and 30 deletions.
2 changes: 2 additions & 0 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,8 @@ validated_struct::validator! {
subscribers: Vec<OwnedKeyExpr>,
/// A list of key-expressions for which all included publishers will be aggregated into.
publishers: Vec<OwnedKeyExpr>,
/// A list of key-expressions for which all included queriers will be aggregated into.
queriers: Vec<OwnedKeyExpr>,
},
pub transport: #[derive(Default)]
TransportConf {
Expand Down
127 changes: 127 additions & 0 deletions examples/examples/z_querier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use std::time::Duration;

use clap::Parser;
use zenoh::{key_expr::KeyExpr, query::QueryTarget, Config};
use zenoh_examples::CommonArgs;

#[tokio::main]
async fn main() {
// initiate logging
zenoh::init_log_from_env_or("error");

let (config, keyexpr, payload, target, timeout) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).await.unwrap();

println!("Declaring Querier on '{keyexpr}'...");
let querier = session
.declare_querier(keyexpr)
.target(target)
.timeout(timeout)
.await
.unwrap();
println!("Press CTRL-C to quit...");
for idx in 0..u32::MAX {
tokio::time::sleep(Duration::from_secs(1)).await;
let buf = format!("[{idx:4}] {}", payload.clone().unwrap_or_default());
println!(
"Querying '{}' with payload: '{}')...",
&querier.key_expr(),
buf
);
let replies = querier
.get()
// // By default get receives replies from a FIFO.
// // Uncomment this line to use a ring channel instead.
// // More information on the ring channel are available in the z_pull example.
// .with(zenoh::handlers::RingChannel::default())
// Refer to z_bytes.rs to see how to serialize different types of message
.payload(buf)
.await
.unwrap();
while let Ok(reply) = replies.recv_async().await {
match reply.result() {
Ok(sample) => {
// Refer to z_bytes.rs to see how to deserialize different types of message
let payload = sample
.payload()
.try_to_string()
.unwrap_or_else(|e| e.to_string().into());
println!(
">> Received ('{}': '{}')",
sample.key_expr().as_str(),
payload,
);
}
Err(err) => {
let payload = err
.payload()
.try_to_string()
.unwrap_or_else(|e| e.to_string().into());
println!(">> Received (ERROR: '{}')", payload);
}
}
}
}
}

#[derive(clap::ValueEnum, Clone, Copy, Debug)]
#[value(rename_all = "SCREAMING_SNAKE_CASE")]
enum Qt {
BestMatching,
All,
AllComplete,
}

#[derive(Parser, Clone, Debug)]
struct Args {
#[arg(short, long, default_value = "demo/example/**")]
/// The selection of resources to query
key_expr: KeyExpr<'static>,
#[arg(short, long)]
/// An optional payload to put in the query.
payload: Option<String>,
#[arg(short, long, default_value = "BEST_MATCHING")]
/// The target queryables of the query.
target: Qt,
#[arg(short = 'o', long, default_value = "10000")]
/// The query timeout in milliseconds.
timeout: u64,
#[command(flatten)]
common: CommonArgs,
}

fn parse_args() -> (
Config,
KeyExpr<'static>,
Option<String>,
QueryTarget,
Duration,
) {
let args = Args::parse();
(
args.common.into(),
args.key_expr,
args.payload,
match args.target {
Qt::BestMatching => QueryTarget::BestMatching,
Qt::All => QueryTarget::All,
Qt::AllComplete => QueryTarget::AllComplete,
},
Duration::from_millis(args.timeout),
)
}
1 change: 1 addition & 0 deletions zenoh/src/api/builders/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
pub(crate) mod info;
pub(crate) mod matching_listener;
pub(crate) mod publisher;
pub(crate) mod querier;
pub(crate) mod query;
pub(crate) mod queryable;
pub(crate) mod reply;
Expand Down
31 changes: 2 additions & 29 deletions zenoh/src/api/builders/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
use std::future::{IntoFuture, Ready};

use zenoh_core::{Resolvable, Result as ZResult, Wait};
use zenoh_protocol::core::CongestionControl;
#[cfg(feature = "unstable")]
use zenoh_protocol::core::Reliability;
use zenoh_protocol::{core::CongestionControl, network::Mapping};

#[cfg(feature = "unstable")]
use crate::api::sample::SourceInfo;
Expand Down Expand Up @@ -375,34 +375,7 @@ impl<'a, 'b> Wait for PublisherBuilder<'a, 'b> {
fn wait(self) -> <Self as Resolvable>::To {
let mut key_expr = self.key_expr?;
if !key_expr.is_fully_optimized(&self.session.0) {
let session_id = self.session.0.id;
let expr_id = self.session.0.declare_prefix(key_expr.as_str()).wait()?;
let prefix_len = key_expr
.len()
.try_into()
.expect("How did you get a key expression with a length over 2^32!?");
key_expr = match key_expr.0 {
crate::api::key_expr::KeyExprInner::Borrowed(key_expr)
| crate::api::key_expr::KeyExprInner::BorrowedWire { key_expr, .. } => {
KeyExpr(crate::api::key_expr::KeyExprInner::BorrowedWire {
key_expr,
expr_id,
mapping: Mapping::Sender,
prefix_len,
session_id,
})
}
crate::api::key_expr::KeyExprInner::Owned(key_expr)
| crate::api::key_expr::KeyExprInner::Wire { key_expr, .. } => {
KeyExpr(crate::api::key_expr::KeyExprInner::Wire {
key_expr,
expr_id,
mapping: Mapping::Sender,
prefix_len,
session_id,
})
}
}
key_expr = self.session.declare_keyexpr(key_expr).wait()?;
}
let id = self
.session
Expand Down
Loading

0 comments on commit 59f74fd

Please sign in to comment.