Skip to content

Commit

Permalink
code cleanup, remove query.reply_del
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisBiryukov91 committed Mar 6, 2024
1 parent ea22d3d commit 2f93757
Showing 1 changed file with 53 additions and 97 deletions.
150 changes: 53 additions & 97 deletions zenoh/src/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use crate::prelude::*;
use crate::query::ReplyKeyExpr;
#[zenoh_macros::unstable]
use crate::sample::Attachment;
use crate::sample::DataInfo;
#[zenoh_macros::unstable]
use crate::sample::SourceInfo;
use crate::SessionRef;
Expand All @@ -32,7 +31,6 @@ use std::future::Ready;
use std::ops::Deref;
use std::sync::Arc;
use uhlc::Timestamp;
use zenoh_buffers::ZBuf;
use zenoh_core::{AsyncResolve, Resolvable, SyncResolve};
use zenoh_protocol::core::WireExpr;
use zenoh_protocol::network::{response, Mapping, RequestId, Response, ResponseFinal};
Expand Down Expand Up @@ -111,9 +109,26 @@ impl Query {
/// replying on a disjoint key expression will result in an error when resolving the reply.
#[inline(always)]
pub fn reply_sample(&self, sample: Sample) -> ReplyBuilder<'_> {
let Sample {
key_expr,
value,
kind: _,
timestamp,
qos: _,
#[cfg(feature = "unstable")]
source_info,
#[cfg(feature = "unstable")]
attachment,
} = sample;
ReplyBuilder {
query: self,
sample,
key_expr,
value,
timestamp,
#[cfg(feature = "unstable")]
source_info,
#[cfg(feature = "unstable")]
attachment,
}
}

Expand All @@ -132,11 +147,15 @@ impl Query {
IntoKeyExpr: Into<KeyExpr<'static>>,
IntoValue: Into<Value>,
{
let mut sample = Sample::new(key_expr, value);
sample.kind = SampleKind::Put;
ReplyBuilder {
query: self,
sample,
key_expr: key_expr.into(),
value: value.into(),
timestamp: None,
#[cfg(feature = "unstable")]
source_info: SourceInfo::empty(),
#[cfg(feature = "unstable")]
attachment: None,
}
}
/// Sends a error reply to this Query.
Expand All @@ -152,22 +171,6 @@ impl Query {
}
}

/// Sends a delete reply to this Query.
///
/// By default, queries only accept replies whose key expression intersects with the query's.
/// Unless the query has enabled disjoint replies (you can check this through [`Query::accepts_replies`]),
/// replying on a disjoint key expression will result in an error when resolving the reply.
#[inline(always)]
pub fn reply_del<IntoKeyExpr>(&self, key_expr: IntoKeyExpr) -> ReplyDelBuilder<'_>
where
IntoKeyExpr: Into<KeyExpr<'static>>,
{
ReplyDelBuilder {
query: self,
keyexpr: key_expr.into(),
}
}

/// Queries may or may not accept replies on key expressions that do not intersect with their own key expression.
/// This getter allows you to check whether or not a specific query does.
#[zenoh_macros::unstable]
Expand Down Expand Up @@ -212,7 +215,13 @@ impl fmt::Display for Query {
#[derive(Debug)]
pub struct ReplyBuilder<'a> {
query: &'a Query,
sample: Sample,
key_expr: KeyExpr<'static>,
value: Value,
timestamp: Option<Timestamp>,
#[cfg(feature = "unstable")]
source_info: SourceInfo,
#[cfg(feature = "unstable")]
attachment: Option<Attachment>,
}

/// A builder returned by [`Query::reply_err()`](Query::reply_err).
Expand All @@ -223,27 +232,19 @@ pub struct ReplyErrBuilder<'a> {
value: Value,
}

/// A builder returned by [`Query::reply_del()`](Query::reply_del).
#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"]
#[derive(Debug)]
pub struct ReplyDelBuilder<'a> {
query: &'a Query,
keyexpr: KeyExpr<'static>,
}

impl<'a> ReplyBuilder<'a> {
#[zenoh_macros::unstable]
pub fn with_attachment(mut self, attachment: Attachment) -> Self {
self.sample = self.sample.with_attachment(attachment);
self.attachment = Some(attachment);
self
}
#[zenoh_macros::unstable]
pub fn with_source_id(mut self, source_info: SourceInfo) -> Self {
self.sample = self.sample.with_source_info(source_info);
pub fn with_source_info(mut self, source_info: SourceInfo) -> Self {
self.source_info = source_info;
self
}
pub fn with_timestamp(mut self, timestamp: Timestamp) -> Self {
self.sample = self.sample.with_timestamp(timestamp);
self.timestamp = Some(timestamp);
self
}
}
Expand All @@ -255,65 +256,44 @@ impl<'a> Resolvable for ReplyBuilder<'a> {
impl SyncResolve for ReplyBuilder<'_> {
fn res_sync(self) -> <Self as Resolvable>::To {
if !self.query._accepts_any_replies().unwrap_or(false)
&& !self.query.key_expr().intersects(&self.sample.key_expr)
&& !self.query.key_expr().intersects(&self.key_expr)
{
bail!("Attempted to reply on `{}`, which does not intersect with query `{}`, despite query only allowing replies on matching key expressions", self.sample.key_expr, self.query.key_expr())
bail!("Attempted to reply on `{}`, which does not intersect with query `{}`, despite query only allowing replies on matching key expressions", self.key_expr, self.query.key_expr())
}
let Sample {
key_expr,
value: Value { payload, encoding },
kind,
timestamp,
qos,
#[cfg(feature = "unstable")]
source_info,
#[cfg(feature = "unstable")]
attachment,
} = self.sample;
#[allow(unused_mut)]
let mut data_info = DataInfo {
kind,
encoding: Some(encoding),
timestamp,
qos,
source_id: None,
source_sn: None,
};
#[allow(unused_mut)]
let mut ext_attachment = None;
#[allow(unused_mut)]
let mut ext_sinfo = None;
#[cfg(feature = "unstable")]
{
data_info.source_id = source_info.source_id;
data_info.source_sn = source_info.source_sn;
if let Some(attachment) = attachment {
if self.source_info.source_id.is_some() || self.source_info.source_sn.is_some() {
ext_sinfo = Some(zenoh::reply::ext::SourceInfoType {
zid: self.source_info.source_id.unwrap_or_default(),
eid: 0, // @TODO use proper EntityId (#703)
sn: self.source_info.source_sn.unwrap_or_default() as u32,
})
}
if let Some(attachment) = self.attachment {
ext_attachment = Some(attachment.into());
}
}
self.query.inner.primitives.send_response(Response {
rid: self.query.inner.qid,
wire_expr: WireExpr {
scope: 0,
suffix: std::borrow::Cow::Owned(key_expr.into()),
suffix: std::borrow::Cow::Owned(self.key_expr.into()),
mapping: Mapping::Sender,
},
payload: ResponseBody::Reply(zenoh::Reply {
timestamp: data_info.timestamp,
encoding: data_info.encoding.unwrap_or_default(),
ext_sinfo: if data_info.source_id.is_some() || data_info.source_sn.is_some() {
Some(zenoh::reply::ext::SourceInfoType {
zid: data_info.source_id.unwrap_or_default(),
eid: 0, // @TODO use proper EntityId (#703)
sn: data_info.source_sn.unwrap_or_default() as u32,
})
} else {
None
},
timestamp: self.timestamp,
encoding: self.value.encoding,
ext_sinfo,
ext_consolidation: ConsolidationType::default(),
#[cfg(feature = "shared-memory")]
ext_shm: None,
ext_attachment,
ext_unknown: vec![],
payload,
payload: self.value.payload,
}),
ext_qos: response::ext::QoSType::response_default(),
ext_tstamp: None,
Expand Down Expand Up @@ -378,30 +358,6 @@ impl<'a> AsyncResolve for ReplyErrBuilder<'a> {
}
}

impl<'a> Resolvable for ReplyDelBuilder<'a> {
type To = ZResult<()>;
}

impl SyncResolve for ReplyDelBuilder<'_> {
fn res_sync(self) -> <Self as Resolvable>::To {
let mut sample = Sample::new(self.keyexpr, ZBuf::empty());
sample.kind = SampleKind::Delete;
ReplyBuilder {
query: self.query,
sample,
}
.res_sync()
}
}

impl<'a> AsyncResolve for ReplyDelBuilder<'a> {
type Future = Ready<Self::To>;

fn res_async(self) -> Self::Future {
std::future::ready(self.res_sync())
}
}

pub(crate) struct QueryableState {
pub(crate) id: Id,
pub(crate) key_expr: WireExpr<'static>,
Expand Down

0 comments on commit 2f93757

Please sign in to comment.