From 07795fa1e52d3c0a19232c42ff9d3c6d4e9ca5fa Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Mon, 18 Nov 2024 19:48:18 +0100 Subject: [PATCH] interest support --- zenoh/src/api/key_expr.rs | 16 ++++++ zenoh/src/net/routing/hat/client/pubsub.rs | 32 +++-------- zenoh/src/net/routing/hat/client/queries.rs | 51 +++++++++++++----- zenoh/src/net/routing/hat/p2p_peer/pubsub.rs | 32 +++-------- zenoh/src/net/routing/hat/p2p_peer/queries.rs | 53 +++++++++++++------ 5 files changed, 105 insertions(+), 79 deletions(-) diff --git a/zenoh/src/api/key_expr.rs b/zenoh/src/api/key_expr.rs index 2a3c775bfc..6f4cd1ef73 100644 --- a/zenoh/src/api/key_expr.rs +++ b/zenoh/src/api/key_expr.rs @@ -88,6 +88,22 @@ impl KeyExpr<'static> { s, ))) } + + pub(crate) fn string_intersects(left: &str, right: &str) -> bool { + if let (Ok(l), Ok(r)) = (KeyExpr::try_from(left), KeyExpr::try_from(right)) { + l.intersects(&r) + } else { + false + } + } + + pub(crate) fn string_includes(left: &str, right: &str) -> bool { + if let (Ok(l), Ok(r)) = (KeyExpr::try_from(left), KeyExpr::try_from(right)) { + l.includes(&r) + } else { + false + } + } } impl<'a> KeyExpr<'a> { /// Equivalent to `::try_from(t)`. diff --git a/zenoh/src/net/routing/hat/client/pubsub.rs b/zenoh/src/net/routing/hat/client/pubsub.rs index 96a128b75d..9d8fcf77c0 100644 --- a/zenoh/src/net/routing/hat/client/pubsub.rs +++ b/zenoh/src/net/routing/hat/client/pubsub.rs @@ -339,37 +339,19 @@ impl HatPubSubTrait for HatCode { .values() .filter(|f| f.whatami != WhatAmI::Client) { - if face.local_interests.values().any(|interest| { + if !face.local_interests.values().any(|interest| { interest.finalized && interest.options.subscribers() && interest .res .as_ref() - .map(|res| { - KeyExpr::try_from(res.expr()) - .and_then(|intres| { - KeyExpr::try_from(expr.full_expr()) - .map(|putres| intres.includes(&putres)) - }) - .unwrap_or(false) - }) + .map(|res| KeyExpr::string_includes(&res.expr(), expr.full_expr())) .unwrap_or(true) - }) { - if face_hat!(face).remote_subs.values().any(|sub| { - KeyExpr::try_from(sub.expr()) - .and_then(|subres| { - KeyExpr::try_from(expr.full_expr()) - .map(|putres| subres.intersects(&putres)) - }) - .unwrap_or(false) - }) { - let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); - route.insert( - face.id, - (face.clone(), key_expr.to_owned(), NodeId::default()), - ); - } - } else { + }) || face_hat!(face) + .remote_subs + .values() + .any(|sub| KeyExpr::string_intersects(&sub.expr(), expr.full_expr())) + { let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); route.insert( face.id, diff --git a/zenoh/src/net/routing/hat/client/queries.rs b/zenoh/src/net/routing/hat/client/queries.rs index cd417ef84b..c37ec6869f 100644 --- a/zenoh/src/net/routing/hat/client/queries.rs +++ b/zenoh/src/net/routing/hat/client/queries.rs @@ -33,15 +33,18 @@ use zenoh_protocol::{ use zenoh_sync::get_mut_unchecked; use super::{face_hat, face_hat_mut, get_routes_entries, HatCode, HatFace}; -use crate::net::routing::{ - dispatcher::{ - face::FaceState, - resource::{NodeId, Resource, SessionContext}, - tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr, Tables}, +use crate::{ + key_expr::KeyExpr, + net::routing::{ + dispatcher::{ + face::FaceState, + resource::{NodeId, Resource, SessionContext}, + tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr, Tables}, + }, + hat::{HatQueriesTrait, SendDeclare, Sources}, + router::{update_query_routes_from, RoutesIndexes}, + RoutingContext, }, - hat::{HatQueriesTrait, SendDeclare, Sources}, - router::RoutesIndexes, - RoutingContext, }; #[inline] @@ -272,6 +275,8 @@ pub(super) fn queries_new_face( propagate_simple_queryable(tables, qabl, Some(&mut face.clone()), send_declare); } } + // recompute routes + update_query_routes_from(tables, &mut tables.root_res.clone()); } lazy_static::lazy_static! { @@ -349,12 +354,30 @@ impl HatQueriesTrait for HatCode { }; if source_type == WhatAmI::Client { - if let Some(face) = tables.faces.values().find(|f| f.whatami != WhatAmI::Client) { - let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); - route.push(QueryTargetQabl { - direction: (face.clone(), key_expr.to_owned(), NodeId::default()), - info: None, - }); + for face in tables + .faces + .values() + .filter(|f| f.whatami != WhatAmI::Client) + { + if !face.local_interests.values().any(|interest| { + interest.finalized + && interest.options.queryables() + && interest + .res + .as_ref() + .map(|res| KeyExpr::string_includes(&res.expr(), expr.full_expr())) + .unwrap_or(true) + }) || face_hat!(face) + .remote_qabls + .values() + .any(|qbl| KeyExpr::string_intersects(&qbl.expr(), expr.full_expr())) + { + let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); + route.push(QueryTargetQabl { + direction: (face.clone(), key_expr.to_owned(), NodeId::default()), + info: None, + }); + } } } diff --git a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs index 776b472c87..7d33bcef43 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs @@ -603,37 +603,19 @@ impl HatPubSubTrait for HatCode { .values() .filter(|f| f.whatami == WhatAmI::Router) { - if face.local_interests.values().any(|interest| { + if !face.local_interests.values().any(|interest| { interest.finalized && interest.options.subscribers() && interest .res .as_ref() - .map(|res| { - KeyExpr::try_from(res.expr()) - .and_then(|intres| { - KeyExpr::try_from(expr.full_expr()) - .map(|putres| intres.includes(&putres)) - }) - .unwrap_or(false) - }) + .map(|res| KeyExpr::string_includes(&res.expr(), expr.full_expr())) .unwrap_or(true) - }) { - if face_hat!(face).remote_subs.values().any(|sub| { - KeyExpr::try_from(sub.expr()) - .and_then(|subres| { - KeyExpr::try_from(expr.full_expr()) - .map(|putres| subres.intersects(&putres)) - }) - .unwrap_or(false) - }) { - let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); - route.insert( - face.id, - (face.clone(), key_expr.to_owned(), NodeId::default()), - ); - } - } else { + }) || face_hat!(face) + .remote_subs + .values() + .any(|sub| KeyExpr::string_intersects(&sub.expr(), expr.full_expr())) + { let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); route.insert( face.id, diff --git a/zenoh/src/net/routing/hat/p2p_peer/queries.rs b/zenoh/src/net/routing/hat/p2p_peer/queries.rs index 052d401690..8bdc1ee62b 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/queries.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/queries.rs @@ -36,15 +36,20 @@ use zenoh_protocol::{ use zenoh_sync::get_mut_unchecked; use super::{face_hat, face_hat_mut, get_routes_entries, HatCode, HatFace}; -use crate::net::routing::{ - dispatcher::{ - face::FaceState, - resource::{NodeId, Resource, SessionContext}, - tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr, Tables}, +use crate::{ + key_expr::KeyExpr, + net::routing::{ + dispatcher::{ + face::FaceState, + resource::{NodeId, Resource, SessionContext}, + tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr, Tables}, + }, + hat::{ + p2p_peer::initial_interest, CurrentFutureTrait, HatQueriesTrait, SendDeclare, Sources, + }, + router::{update_query_routes_from, RoutesIndexes}, + RoutingContext, }, - hat::{p2p_peer::initial_interest, CurrentFutureTrait, HatQueriesTrait, SendDeclare, Sources}, - router::{update_query_routes_from, RoutesIndexes}, - RoutingContext, }; #[inline] @@ -589,13 +594,31 @@ impl HatQueriesTrait for HatCode { }; if source_type == WhatAmI::Client { - // TODO: BNestMatching: What if there is a local compete ? - if let Some(face) = tables.faces.values().find(|f| f.whatami == WhatAmI::Router) { - let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); - route.push(QueryTargetQabl { - direction: (face.clone(), key_expr.to_owned(), NodeId::default()), - info: None, - }); + // TODO: BestMatching: What if there is a local compete ? + for face in tables + .faces + .values() + .filter(|f| f.whatami == WhatAmI::Router) + { + if !face.local_interests.values().any(|interest| { + interest.finalized + && interest.options.queryables() + && interest + .res + .as_ref() + .map(|res| KeyExpr::string_includes(&res.expr(), expr.full_expr())) + .unwrap_or(true) + }) || face_hat!(face) + .remote_qabls + .values() + .any(|sub| KeyExpr::string_intersects(&sub.expr(), expr.full_expr())) + { + let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); + route.push(QueryTargetQabl { + direction: (face.clone(), key_expr.to_owned(), NodeId::default()), + info: None, + }); + } } for face in tables.faces.values().filter(|f| {