Skip to content

Commit

Permalink
interest support
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisBiryukov91 committed Nov 18, 2024
1 parent 800993b commit 07795fa
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 79 deletions.
16 changes: 16 additions & 0 deletions zenoh/src/api/key_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,22 @@ impl KeyExpr<'static> {
s,
)))
}

pub(crate) fn string_intersects(left: &str, right: &str) -> bool {
if let (Ok(l), Ok(r)) = (KeyExpr::try_from(left), KeyExpr::try_from(right)) {
l.intersects(&r)
} else {
false
}
}

pub(crate) fn string_includes(left: &str, right: &str) -> bool {
if let (Ok(l), Ok(r)) = (KeyExpr::try_from(left), KeyExpr::try_from(right)) {
l.includes(&r)
} else {
false
}
}
}
impl<'a> KeyExpr<'a> {
/// Equivalent to `<KeyExpr as TryFrom>::try_from(t)`.
Expand Down
32 changes: 7 additions & 25 deletions zenoh/src/net/routing/hat/client/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,37 +339,19 @@ impl HatPubSubTrait for HatCode {
.values()
.filter(|f| f.whatami != WhatAmI::Client)
{
if face.local_interests.values().any(|interest| {
if !face.local_interests.values().any(|interest| {
interest.finalized
&& interest.options.subscribers()
&& interest
.res
.as_ref()
.map(|res| {
KeyExpr::try_from(res.expr())
.and_then(|intres| {
KeyExpr::try_from(expr.full_expr())
.map(|putres| intres.includes(&putres))
})
.unwrap_or(false)
})
.map(|res| KeyExpr::string_includes(&res.expr(), expr.full_expr()))
.unwrap_or(true)
}) {
if face_hat!(face).remote_subs.values().any(|sub| {
KeyExpr::try_from(sub.expr())
.and_then(|subres| {
KeyExpr::try_from(expr.full_expr())
.map(|putres| subres.intersects(&putres))
})
.unwrap_or(false)
}) {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
route.insert(
face.id,
(face.clone(), key_expr.to_owned(), NodeId::default()),
);
}
} else {
}) || face_hat!(face)
.remote_subs
.values()
.any(|sub| KeyExpr::string_intersects(&sub.expr(), expr.full_expr()))
{
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
route.insert(
face.id,
Expand Down
51 changes: 37 additions & 14 deletions zenoh/src/net/routing/hat/client/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,18 @@ use zenoh_protocol::{
use zenoh_sync::get_mut_unchecked;

use super::{face_hat, face_hat_mut, get_routes_entries, HatCode, HatFace};
use crate::net::routing::{
dispatcher::{
face::FaceState,
resource::{NodeId, Resource, SessionContext},
tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr, Tables},
use crate::{
key_expr::KeyExpr,
net::routing::{
dispatcher::{
face::FaceState,
resource::{NodeId, Resource, SessionContext},
tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr, Tables},
},
hat::{HatQueriesTrait, SendDeclare, Sources},
router::{update_query_routes_from, RoutesIndexes},
RoutingContext,
},
hat::{HatQueriesTrait, SendDeclare, Sources},
router::RoutesIndexes,
RoutingContext,
};

#[inline]
Expand Down Expand Up @@ -272,6 +275,8 @@ pub(super) fn queries_new_face(
propagate_simple_queryable(tables, qabl, Some(&mut face.clone()), send_declare);
}
}
// recompute routes
update_query_routes_from(tables, &mut tables.root_res.clone());
}

lazy_static::lazy_static! {
Expand Down Expand Up @@ -349,12 +354,30 @@ impl HatQueriesTrait for HatCode {
};

if source_type == WhatAmI::Client {
if let Some(face) = tables.faces.values().find(|f| f.whatami != WhatAmI::Client) {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
route.push(QueryTargetQabl {
direction: (face.clone(), key_expr.to_owned(), NodeId::default()),
info: None,
});
for face in tables
.faces
.values()
.filter(|f| f.whatami != WhatAmI::Client)
{
if !face.local_interests.values().any(|interest| {
interest.finalized
&& interest.options.queryables()
&& interest
.res
.as_ref()
.map(|res| KeyExpr::string_includes(&res.expr(), expr.full_expr()))
.unwrap_or(true)
}) || face_hat!(face)
.remote_qabls
.values()
.any(|qbl| KeyExpr::string_intersects(&qbl.expr(), expr.full_expr()))
{
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
route.push(QueryTargetQabl {
direction: (face.clone(), key_expr.to_owned(), NodeId::default()),
info: None,
});
}
}
}

Expand Down
32 changes: 7 additions & 25 deletions zenoh/src/net/routing/hat/p2p_peer/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,37 +603,19 @@ impl HatPubSubTrait for HatCode {
.values()
.filter(|f| f.whatami == WhatAmI::Router)
{
if face.local_interests.values().any(|interest| {
if !face.local_interests.values().any(|interest| {
interest.finalized
&& interest.options.subscribers()
&& interest
.res
.as_ref()
.map(|res| {
KeyExpr::try_from(res.expr())
.and_then(|intres| {
KeyExpr::try_from(expr.full_expr())
.map(|putres| intres.includes(&putres))
})
.unwrap_or(false)
})
.map(|res| KeyExpr::string_includes(&res.expr(), expr.full_expr()))
.unwrap_or(true)
}) {
if face_hat!(face).remote_subs.values().any(|sub| {
KeyExpr::try_from(sub.expr())
.and_then(|subres| {
KeyExpr::try_from(expr.full_expr())
.map(|putres| subres.intersects(&putres))
})
.unwrap_or(false)
}) {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
route.insert(
face.id,
(face.clone(), key_expr.to_owned(), NodeId::default()),
);
}
} else {
}) || face_hat!(face)
.remote_subs
.values()
.any(|sub| KeyExpr::string_intersects(&sub.expr(), expr.full_expr()))
{
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
route.insert(
face.id,
Expand Down
53 changes: 38 additions & 15 deletions zenoh/src/net/routing/hat/p2p_peer/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,20 @@ use zenoh_protocol::{
use zenoh_sync::get_mut_unchecked;

use super::{face_hat, face_hat_mut, get_routes_entries, HatCode, HatFace};
use crate::net::routing::{
dispatcher::{
face::FaceState,
resource::{NodeId, Resource, SessionContext},
tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr, Tables},
use crate::{
key_expr::KeyExpr,
net::routing::{
dispatcher::{
face::FaceState,
resource::{NodeId, Resource, SessionContext},
tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr, Tables},
},
hat::{
p2p_peer::initial_interest, CurrentFutureTrait, HatQueriesTrait, SendDeclare, Sources,
},
router::{update_query_routes_from, RoutesIndexes},
RoutingContext,
},
hat::{p2p_peer::initial_interest, CurrentFutureTrait, HatQueriesTrait, SendDeclare, Sources},
router::{update_query_routes_from, RoutesIndexes},
RoutingContext,
};

#[inline]
Expand Down Expand Up @@ -589,13 +594,31 @@ impl HatQueriesTrait for HatCode {
};

if source_type == WhatAmI::Client {
// TODO: BNestMatching: What if there is a local compete ?
if let Some(face) = tables.faces.values().find(|f| f.whatami == WhatAmI::Router) {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
route.push(QueryTargetQabl {
direction: (face.clone(), key_expr.to_owned(), NodeId::default()),
info: None,
});
// TODO: BestMatching: What if there is a local compete ?
for face in tables
.faces
.values()
.filter(|f| f.whatami == WhatAmI::Router)
{
if !face.local_interests.values().any(|interest| {
interest.finalized
&& interest.options.queryables()
&& interest
.res
.as_ref()
.map(|res| KeyExpr::string_includes(&res.expr(), expr.full_expr()))
.unwrap_or(true)
}) || face_hat!(face)
.remote_qabls
.values()
.any(|sub| KeyExpr::string_intersects(&sub.expr(), expr.full_expr()))
{
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
route.push(QueryTargetQabl {
direction: (face.clone(), key_expr.to_owned(), NodeId::default()),
info: None,
});
}
}

for face in tables.faces.values().filter(|f| {
Expand Down

0 comments on commit 07795fa

Please sign in to comment.