Skip to content

Commit

Permalink
Add ingress downsampling tests (eclipse-zenoh#741)
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc authored Feb 16, 2024
1 parent f09fcc2 commit 7ebdb3c
Showing 1 changed file with 65 additions and 47 deletions.
112 changes: 65 additions & 47 deletions zenoh/tests/interceptors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,31 @@ impl IntervalCounter {
}
}

#[test]
fn downsampling_by_keyexpr() {
fn downsampling_by_keyexpr_impl(egress: bool) {
let _ = env_logger::builder().is_test(true).try_init();

use zenoh::prelude::sync::*;

let ds_cfg = format!(
r#"
[
{{
flow: "{}",
rules: [
{{ key_expr: "test/downsamples_by_keyexp/r100", rate: 10, }},
{{ key_expr: "test/downsamples_by_keyexp/r50", rate: 20, }}
],
}},
] "#,
(if egress { "egress" } else { "ingress" })
);

// declare subscriber
let zenoh_sub = zenoh::open(Config::default()).res().unwrap();
let mut config_sub = Config::default();
if !egress {
config_sub.insert_json5("downsampling", &ds_cfg).unwrap();
}
let zenoh_sub = zenoh::open(config_sub).res().unwrap();

let counter_r100 = Arc::new(Mutex::new(IntervalCounter::new()));
let counter_r100_clone = counter_r100.clone();
Expand All @@ -72,24 +89,11 @@ fn downsampling_by_keyexpr() {
.unwrap();

// declare publisher
let mut config = Config::default();
config
.insert_json5(
"downsampling",
r#"
[
{
flow: "egress",
rules: [
{ key_expr: "test/downsamples_by_keyexp/r100", rate: 10, },
{ key_expr: "test/downsamples_by_keyexp/r50", rate: 20, }
],
},
]
"#,
)
.unwrap();
let zenoh_pub = zenoh::open(config).res().unwrap();
let mut config_pub = Config::default();
if egress {
config_pub.insert_json5("downsampling", &ds_cfg).unwrap();
}
let zenoh_pub = zenoh::open(config_pub).res().unwrap();
let publisher_r100 = zenoh_pub
.declare_publisher("test/downsamples_by_keyexp/r100")
.res()
Expand Down Expand Up @@ -127,18 +131,46 @@ fn downsampling_by_keyexpr() {
zlock!(counter_r100_clone).check_middle(100);
}

#[cfg(unix)]
#[test]
fn downsampling_by_interface() {
fn downsampling_by_keyexpr() {
downsampling_by_keyexpr_impl(true);
downsampling_by_keyexpr_impl(false);
}

#[cfg(unix)]
fn downsampling_by_interface_impl(egress: bool) {
let _ = env_logger::builder().is_test(true).try_init();

use zenoh::prelude::sync::*;

let ds_cfg = format!(
r#"
[
{{
interfaces: ["lo", "lo0"],
flow: "{0}",
rules: [
{{ key_expr: "test/downsamples_by_interface/r100", rate: 10, }},
],
}},
{{
interfaces: ["some_unknown_interface"],
flow: "{0}",
rules: [
{{ key_expr: "test/downsamples_by_interface/all", rate: 10, }},
],
}},
] "#,
(if egress { "egress" } else { "ingress" })
);
// declare subscriber
let mut config_sub = Config::default();
config_sub
.insert_json5("listen/endpoints", r#"["tcp/127.0.0.1:7447"]"#)
.unwrap();
if !egress {
config_sub.insert_json5("downsampling", &ds_cfg).unwrap();
};
let zenoh_sub = zenoh::open(config_sub).res().unwrap();

let counter_r100 = Arc::new(Mutex::new(IntervalCounter::new()));
Expand All @@ -164,30 +196,9 @@ fn downsampling_by_interface() {
config_pub
.insert_json5("connect/endpoints", r#"["tcp/127.0.0.1:7447"]"#)
.unwrap();
config_pub
.insert_json5(
"downsampling",
r#"
[
{
interfaces: ["lo", "lo0"],
flow: "egress",
rules: [
{ key_expr: "test/downsamples_by_interface/r100", rate: 10, },
],
},
{
interfaces: ["some_unknown_interface"],
flow: "egress",
rules: [
{ key_expr: "test/downsamples_by_interface/all", rate: 10, },
],
},
]
"#,
)
.unwrap();

if egress {
config_pub.insert_json5("downsampling", &ds_cfg).unwrap();
}
let zenoh_pub = zenoh::open(config_pub).res().unwrap();
let publisher_r100 = zenoh_pub
.declare_publisher("test/downsamples_by_interface/r100")
Expand Down Expand Up @@ -219,6 +230,13 @@ fn downsampling_by_interface() {
zlock!(counter_r100_clone).check_middle(100);
}

#[cfg(unix)]
#[test]
fn downsampling_by_interface() {
downsampling_by_interface_impl(true);
downsampling_by_interface_impl(false);
}

#[test]
#[should_panic(expected = "unknown variant `down`")]
fn downsampling_config_error_wrong_strategy() {
Expand Down

0 comments on commit 7ebdb3c

Please sign in to comment.