Skip to content

Commit

Permalink
Add unstable tag everywhere
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Dec 6, 2024
1 parent c9cc963 commit 2d6550d
Show file tree
Hide file tree
Showing 8 changed files with 197 additions and 2 deletions.
24 changes: 23 additions & 1 deletion zenoh-ext/src/advanced_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub struct QoS {
is_express: bool,
}

#[zenoh_macros::unstable]
impl Default for QoS {
fn default() -> Self {
Self {
Expand All @@ -56,20 +57,24 @@ impl Default for QoS {
}

#[zenoh_macros::internal_trait]
#[zenoh_macros::unstable]
impl QoSBuilderTrait for QoS {
#[allow(unused_mut)]
#[zenoh_macros::unstable]
fn congestion_control(mut self, congestion_control: CongestionControl) -> Self {
self.congestion_control = congestion_control;
self
}

#[allow(unused_mut)]
#[zenoh_macros::unstable]
fn priority(mut self, priority: Priority) -> Self {
self.priority = priority;
self
}

#[allow(unused_mut)]
#[zenoh_macros::unstable]
fn express(mut self, is_express: bool) -> Self {
self.is_express = is_express;
self
Expand All @@ -84,6 +89,7 @@ pub struct CacheConfig {
replies_qos: QoS,
}

#[zenoh_macros::unstable]
impl Default for CacheConfig {
fn default() -> Self {
Self {
Expand All @@ -93,14 +99,17 @@ impl Default for CacheConfig {
}
}

#[zenoh_macros::unstable]
impl CacheConfig {
/// Specify how many samples to keep for each resource.
#[zenoh_macros::unstable]
pub fn max_samples(mut self, depth: usize) -> Self {
self.max_samples = depth;
self
}

/// The QoS to apply to replies.
#[zenoh_macros::unstable]
pub fn replies_qos(mut self, qos: QoS) -> Self {
self.replies_qos = qos;
self
Expand All @@ -118,7 +127,9 @@ pub struct AdvancedCacheBuilder<'a, 'b, 'c> {
liveliness: bool,
}

#[zenoh_macros::unstable]
impl<'a, 'b, 'c> AdvancedCacheBuilder<'a, 'b, 'c> {
#[zenoh_macros::unstable]
pub(crate) fn new(
session: &'a Session,
pub_key_expr: ZResult<KeyExpr<'b>>,
Expand All @@ -134,6 +145,7 @@ impl<'a, 'b, 'c> AdvancedCacheBuilder<'a, 'b, 'c> {
}

/// Change the prefix used for queryable.
#[zenoh_macros::unstable]
pub fn queryable_prefix<TryIntoKeyExpr>(mut self, queryable_prefix: TryIntoKeyExpr) -> Self
where
TryIntoKeyExpr: TryInto<KeyExpr<'c>>,
Expand All @@ -144,38 +156,45 @@ impl<'a, 'b, 'c> AdvancedCacheBuilder<'a, 'b, 'c> {
}

/// Change the history size for each resource.
#[zenoh_macros::unstable]
pub fn history(mut self, history: CacheConfig) -> Self {
self.history = history;
self
}
}

#[zenoh_macros::unstable]
impl Resolvable for AdvancedCacheBuilder<'_, '_, '_> {
type To = ZResult<AdvancedCache>;
}

#[zenoh_macros::unstable]
impl Wait for AdvancedCacheBuilder<'_, '_, '_> {
fn wait(self) -> <Self as Resolvable>::To {
AdvancedCache::new(self)
}
}

#[zenoh_macros::unstable]
impl IntoFuture for AdvancedCacheBuilder<'_, '_, '_> {
type Output = <Self as Resolvable>::To;
type IntoFuture = Ready<<Self as Resolvable>::To>;

#[zenoh_macros::unstable]
fn into_future(self) -> Self::IntoFuture {
std::future::ready(self.wait())
}
}

#[zenoh_macros::unstable]
fn decode_range(range: &str) -> (Option<u32>, Option<u32>) {
let mut split = range.split("..");
let start = split.next().and_then(|s| s.parse::<u32>().ok());
let end = split.next().map(|s| s.parse::<u32>().ok()).unwrap_or(start);
(start, end)
}

#[zenoh_macros::unstable]
fn sample_in_range(sample: &Sample, start: Option<u32>, end: Option<u32>) -> bool {
if start.is_none() && end.is_none() {
true
Expand All @@ -200,7 +219,9 @@ pub struct AdvancedCache {
_token: Option<LivelinessToken>,
}

#[zenoh_macros::unstable]
impl AdvancedCache {
#[zenoh_macros::unstable]
fn new(conf: AdvancedCacheBuilder<'_, '_, '_>) -> ZResult<AdvancedCache> {
let key_expr = conf.pub_key_expr?.into_owned();
// the queryable_prefix (optional), and the key_expr for AdvancedCache's queryable ("[<queryable_prefix>]/<pub_key_expr>")
Expand Down Expand Up @@ -322,7 +343,8 @@ impl AdvancedCache {
})
}

pub fn cache_sample(&self, sample: Sample) {
#[zenoh_macros::unstable]
pub(crate) fn cache_sample(&self, sample: Sample) {
if let Ok(mut queue) = self.cache.write() {
if queue.len() >= self.max_samples {
queue.pop_front();
Expand Down
42 changes: 41 additions & 1 deletion zenoh-ext/src/advanced_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use zenoh::{
use crate::advanced_cache::{AdvancedCache, AdvancedCacheBuilder, CacheConfig, KE_UHLC};

#[derive(PartialEq)]
#[zenoh_macros::unstable]
pub(crate) enum Sequencing {
None,
Timestamp,
Expand All @@ -53,7 +54,9 @@ pub struct AdvancedPublisherBuilder<'a, 'b, 'c> {
history: CacheConfig,
}

#[zenoh_macros::unstable]
impl<'a, 'b, 'c> AdvancedPublisherBuilder<'a, 'b, 'c> {
#[zenoh_macros::unstable]
pub(crate) fn new(
session: &'a Session,
pub_key_expr: ZResult<KeyExpr<'b>>,
Expand All @@ -72,12 +75,14 @@ impl<'a, 'b, 'c> AdvancedPublisherBuilder<'a, 'b, 'c> {
/// Allow matching Subscribers to detect lost samples and optionally ask for retransimission.
///
/// Retransmission can only be achieved if history is enabled.
#[zenoh_macros::unstable]
pub fn sample_miss_detection(mut self) -> Self {
self.sequencing = Sequencing::SequenceNumber;
self
}

/// Change the history size for each resource.
#[zenoh_macros::unstable]
pub fn cache(mut self, config: CacheConfig) -> Self {
self.cache = true;
if self.sequencing == Sequencing::None {
Expand All @@ -90,6 +95,7 @@ impl<'a, 'b, 'c> AdvancedPublisherBuilder<'a, 'b, 'c> {
/// Allow this publisher to be detected by subscribers.
///
/// This allows Subscribers to retrieve the local history.
#[zenoh_macros::unstable]
pub fn publisher_detection(mut self) -> Self {
self.liveliness = true;
self
Expand All @@ -98,6 +104,7 @@ impl<'a, 'b, 'c> AdvancedPublisherBuilder<'a, 'b, 'c> {
/// A key expression added to the liveliness token key expression
/// and to the cache queryable key expression.
/// It can be used to convey meta data.
#[zenoh_macros::unstable]
pub fn publisher_detection_metadata<TryIntoKeyExpr>(mut self, meta: TryIntoKeyExpr) -> Self
where
TryIntoKeyExpr: TryInto<KeyExpr<'c>>,
Expand All @@ -108,20 +115,25 @@ impl<'a, 'b, 'c> AdvancedPublisherBuilder<'a, 'b, 'c> {
}
}

#[zenoh_macros::unstable]
impl<'a> Resolvable for AdvancedPublisherBuilder<'a, '_, '_> {
type To = ZResult<AdvancedPublisher<'a>>;
}

#[zenoh_macros::unstable]
impl Wait for AdvancedPublisherBuilder<'_, '_, '_> {
#[zenoh_macros::unstable]
fn wait(self) -> <Self as Resolvable>::To {
AdvancedPublisher::new(self)
}
}

#[zenoh_macros::unstable]
impl IntoFuture for AdvancedPublisherBuilder<'_, '_, '_> {
type Output = <Self as Resolvable>::To;
type IntoFuture = Ready<<Self as Resolvable>::To>;

#[zenoh_macros::unstable]
fn into_future(self) -> Self::IntoFuture {
std::future::ready(self.wait())
}
Expand All @@ -136,7 +148,9 @@ pub struct AdvancedPublisher<'a> {
_token: Option<LivelinessToken>,
}

#[zenoh_macros::unstable]
impl<'a> AdvancedPublisher<'a> {
#[zenoh_macros::unstable]
fn new(conf: AdvancedPublisherBuilder<'a, '_, '_>) -> ZResult<Self> {
let key_expr = conf.pub_key_expr?;
let meta = match conf.meta_key_expr {
Expand Down Expand Up @@ -221,29 +235,34 @@ impl<'a> AdvancedPublisher<'a> {
/// let publisher_id = publisher.id();
/// # }
/// ```
#[zenoh_macros::unstable]
pub fn id(&self) -> EntityGlobalId {
self.publisher.id()
}

#[inline]
#[zenoh_macros::unstable]
pub fn key_expr(&self) -> &KeyExpr<'a> {
self.publisher.key_expr()
}

/// Get the [`Encoding`] used when publishing data.
#[inline]
#[zenoh_macros::unstable]
pub fn encoding(&self) -> &Encoding {
self.publisher.encoding()
}

/// Get the `congestion_control` applied when routing the data.
#[inline]
#[zenoh_macros::unstable]
pub fn congestion_control(&self) -> CongestionControl {
self.publisher.congestion_control()
}

/// Get the priority of the written data.
#[inline]
#[zenoh_macros::unstable]
pub fn priority(&self) -> Priority {
self.publisher.priority()
}
Expand All @@ -261,6 +280,7 @@ impl<'a> AdvancedPublisher<'a> {
/// # }
/// ```
#[inline]
#[zenoh_macros::unstable]
pub fn put<IntoZBytes>(&self, payload: IntoZBytes) -> AdvancedPublisherPutBuilder<'_>
where
IntoZBytes: Into<ZBytes>,
Expand Down Expand Up @@ -293,6 +313,7 @@ impl<'a> AdvancedPublisher<'a> {
/// publisher.delete().await.unwrap();
/// # }
/// ```
#[zenoh_macros::unstable]
pub fn delete(&self) -> AdvancedPublisherDeleteBuilder<'_> {
let mut builder = self.publisher.delete();
if let Some(seqnum) = &self.seqnum {
Expand Down Expand Up @@ -375,24 +396,30 @@ impl<'a> AdvancedPublisher<'a> {
/// publisher.undeclare().await.unwrap();
/// # }
/// ```
#[zenoh_macros::unstable]
pub fn undeclare(self) -> impl Resolve<ZResult<()>> + 'a {
self.publisher.undeclare()
}
}

#[zenoh_macros::unstable]
pub type AdvancedPublisherPutBuilder<'a> = AdvancedPublicationBuilder<'a, PublicationBuilderPut>;
#[zenoh_macros::unstable]
pub type AdvancedPublisherDeleteBuilder<'a> =
AdvancedPublicationBuilder<'a, PublicationBuilderDelete>;

#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
#[derive(Clone)]
#[zenoh_macros::unstable]
pub struct AdvancedPublicationBuilder<'a, P> {
pub(crate) builder: PublicationBuilder<&'a Publisher<'a>, P>,
pub(crate) cache: Option<&'a AdvancedCache>,
}

#[zenoh_macros::internal_trait]
#[zenoh_macros::unstable]
impl EncodingBuilderTrait for AdvancedPublicationBuilder<'_, PublicationBuilderPut> {
#[zenoh_macros::unstable]
fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
Self {
builder: self.builder.encoding(encoding),
Expand All @@ -402,14 +429,16 @@ impl EncodingBuilderTrait for AdvancedPublicationBuilder<'_, PublicationBuilderP
}

#[zenoh_macros::internal_trait]
#[zenoh_macros::unstable]
impl<P> SampleBuilderTrait for AdvancedPublicationBuilder<'_, P> {
#[cfg(feature = "unstable")]
#[zenoh_macros::unstable]
fn source_info(self, source_info: SourceInfo) -> Self {
Self {
builder: self.builder.source_info(source_info),
..self
}
}
#[zenoh_macros::unstable]
fn attachment<TA: Into<OptionZBytes>>(self, attachment: TA) -> Self {
let attachment: OptionZBytes = attachment.into();
Self {
Expand All @@ -420,7 +449,9 @@ impl<P> SampleBuilderTrait for AdvancedPublicationBuilder<'_, P> {
}

#[zenoh_macros::internal_trait]
#[zenoh_macros::unstable]
impl<P> TimestampBuilderTrait for AdvancedPublicationBuilder<'_, P> {
#[zenoh_macros::unstable]
fn timestamp<TS: Into<Option<uhlc::Timestamp>>>(self, timestamp: TS) -> Self {
Self {
builder: self.builder.timestamp(timestamp),
Expand All @@ -429,12 +460,15 @@ impl<P> TimestampBuilderTrait for AdvancedPublicationBuilder<'_, P> {
}
}

#[zenoh_macros::unstable]
impl<P> Resolvable for AdvancedPublicationBuilder<'_, P> {
type To = ZResult<()>;
}

#[zenoh_macros::unstable]
impl Wait for AdvancedPublisherPutBuilder<'_> {
#[inline]
#[zenoh_macros::unstable]
fn wait(self) -> <Self as Resolvable>::To {
if let Some(cache) = self.cache {
cache.cache_sample(zenoh::sample::Sample::from(&self.builder));
Expand All @@ -443,8 +477,10 @@ impl Wait for AdvancedPublisherPutBuilder<'_> {
}
}

#[zenoh_macros::unstable]
impl Wait for AdvancedPublisherDeleteBuilder<'_> {
#[inline]
#[zenoh_macros::unstable]
fn wait(self) -> <Self as Resolvable>::To {
if let Some(cache) = self.cache {
cache.cache_sample(zenoh::sample::Sample::from(&self.builder));
Expand All @@ -453,19 +489,23 @@ impl Wait for AdvancedPublisherDeleteBuilder<'_> {
}
}

#[zenoh_macros::unstable]
impl IntoFuture for AdvancedPublisherPutBuilder<'_> {
type Output = <Self as Resolvable>::To;
type IntoFuture = Ready<<Self as Resolvable>::To>;

#[zenoh_macros::unstable]
fn into_future(self) -> Self::IntoFuture {
std::future::ready(self.wait())
}
}

#[zenoh_macros::unstable]
impl IntoFuture for AdvancedPublisherDeleteBuilder<'_> {
type Output = <Self as Resolvable>::To;
type IntoFuture = Ready<<Self as Resolvable>::To>;

#[zenoh_macros::unstable]
fn into_future(self) -> Self::IntoFuture {
std::future::ready(self.wait())
}
Expand Down
Loading

0 comments on commit 2d6550d

Please sign in to comment.