diff --git a/Cargo.lock b/Cargo.lock index 2c2e9a25c8e9..96be9ad1c18a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1168,7 +1168,7 @@ dependencies = [ "futures", "indicatif", "rand", - "reqwest 0.12.7", + "reqwest 0.12.9", "serde", "serde_json", "tantivy", @@ -1303,11 +1303,13 @@ dependencies = [ "axum", "chromadb", "chrono", + "clap", "figment", "guacamole", "opentelemetry", "opentelemetry-otlp", "opentelemetry_sdk", + "reqwest 0.12.9", "serde", "serde_json", "tokio", @@ -1415,9 +1417,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.17" +version = "4.5.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e5a21b8495e732f1b3c364c9949b201ca7bae518c502c80256c96ad79eaf6ac" +checksum = "fb3b4b9e5a7c7514dfa52869339ee98b3156b0bfb4e8a77c4ff4babb64b1604f" dependencies = [ "clap_builder", "clap_derive", @@ -1425,9 +1427,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.17" +version = "4.5.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cf2dd12af7a047ad9d6da2b6b249759a22a7abc0f474c1dae1777afa4b21a73" +checksum = "b17a95aa67cc7b5ebd32aa5370189aa0d79069ef1c64ce893bd30fb24bff20ec" dependencies = [ "anstream", "anstyle", @@ -1437,9 +1439,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.13" +version = "4.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "501d359d5f3dcaf6ecdeee48833ae73ec6e42723a1e52419c79abf9507eec0a0" +checksum = "4ac6a0c7b1a9e9a5186361f67dfa1b88213572f427fb9ab038efb2bd8c582dab" dependencies = [ "heck 0.5.0", "proc-macro2", @@ -3651,7 +3653,7 @@ dependencies = [ "percent-encoding", "quick-xml", "rand", - "reqwest 0.12.7", + "reqwest 0.12.9", "ring", "serde", "serde_json", @@ -4525,9 +4527,9 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.12.7" +version = "0.12.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8f4955649ef5c38cc7f9e8aa41761d48fb9677197daea9984dc54f56aad5e63" +checksum = "a77c62af46e79de0a562e1a9849205ffcb7fc1238876e9bd743357570e04046f" dependencies = [ "base64 0.22.1", "bytes", @@ -4552,7 +4554,7 @@ dependencies = [ "pin-project-lite", "quinn", "rustls 0.23.13", - "rustls-native-certs 0.7.3", + "rustls-native-certs 0.8.0", "rustls-pemfile 2.1.3", "rustls-pki-types", "serde", @@ -4703,19 +4705,6 @@ dependencies = [ "security-framework", ] -[[package]] -name = "rustls-native-certs" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5" -dependencies = [ - "openssl-probe", - "rustls-pemfile 2.1.3", - "rustls-pki-types", - "schannel", - "security-framework", -] - [[package]] name = "rustls-native-certs" version = "0.8.0" diff --git a/rust/load/Cargo.toml b/rust/load/Cargo.toml index 5653ce943447..4eebef01948b 100644 --- a/rust/load/Cargo.toml +++ b/rust/load/Cargo.toml @@ -8,6 +8,7 @@ async-trait = "0.1.83" axum = "0.7" chromadb = { git = "https://github.com/rescrv/chromadb-rs", rev = "e364e35c34c660d4e8e862436ea600ddc2f46a1e" } chrono = { version = "0.4.38", features = ["serde"] } +clap = { version = "4", features = ["derive"] } figment = { version = "0.10.12", features = ["env", "yaml", "test"] } guacamole = { version = "0.9", default-features = false } serde.workspace = true @@ -27,3 +28,4 @@ opentelemetry-otlp = "0.27" opentelemetry_sdk = { version = "0.27", features = ["rt-tokio"] } tracing.workspace = true tower-http = { version = "0.6.2", features = ["trace"] } +reqwest = { version = "0.12", features = ["json"] } diff --git a/rust/load/src/bin/chroma-load-inhibit.rs b/rust/load/src/bin/chroma-load-inhibit.rs new file mode 100644 index 000000000000..d5f0b43bbb55 --- /dev/null +++ b/rust/load/src/bin/chroma-load-inhibit.rs @@ -0,0 +1,18 @@ +//! Inhibit chroma-load on every host provided on the command line. + +#[tokio::main] +async fn main() { + for host in std::env::args().skip(1) { + let client = reqwest::Client::new(); + match client.post(format!("{}/inhibit", host)).send().await { + Ok(resp) => { + if resp.status().is_success() { + println!("Inhibited load on {}", host); + } else { + eprintln!("Failed to inhibit load on {}: {}", host, resp.status()); + } + } + Err(e) => eprintln!("Failed to inhibit load on {}: {}", host, e), + } + } +} diff --git a/rust/load/src/bin/chroma-load-start.rs b/rust/load/src/bin/chroma-load-start.rs new file mode 100644 index 000000000000..cf787552a746 --- /dev/null +++ b/rust/load/src/bin/chroma-load-start.rs @@ -0,0 +1,54 @@ +//! Start a workload on the chroma-load server. + +use clap::Parser; + +use chroma_load::rest::StartRequest; +use chroma_load::{humanize_expires, Workload}; + +#[derive(Parser, Debug)] +struct Args { + #[arg(long)] + host: String, + #[arg(long)] + name: String, + #[arg(long)] + expires: String, + #[arg(long)] + data_set: String, + #[arg(long)] + workload: String, + #[arg(long)] + throughput: f64, +} + +#[tokio::main] +async fn main() { + let args = Args::parse(); + let client = reqwest::Client::new(); + let req = StartRequest { + name: args.name, + expires: humanize_expires(&args.expires).unwrap_or(args.expires), + data_set: args.data_set, + workload: Workload::ByName(args.workload), + throughput: args.throughput, + }; + match client + .post(format!("{}/start", args.host)) + .json(&req) + .send() + .await + { + Ok(resp) => { + if resp.status().is_success() { + println!("Started workload on {}", args.host); + } else { + eprintln!( + "Failed to start workload on {}: {}", + args.host, + resp.status() + ); + } + } + Err(e) => eprintln!("Failed to start workload on {}: {}", args.host, e), + } +} diff --git a/rust/load/src/bin/chroma-load-stop.rs b/rust/load/src/bin/chroma-load-stop.rs new file mode 100644 index 000000000000..97a227e112dc --- /dev/null +++ b/rust/load/src/bin/chroma-load-stop.rs @@ -0,0 +1,44 @@ +//! Stop a single workload on the chroma-load server. +//! +//! If you are looking to stop traffic for a SEV, see chroma-load-inhibit. + +use clap::Parser; +use uuid::Uuid; + +use chroma_load::rest::StopRequest; + +#[derive(Parser, Debug)] +struct Args { + #[arg(long)] + host: String, + #[arg(long)] + uuid: String, +} + +#[tokio::main] +async fn main() { + let args = Args::parse(); + let client = reqwest::Client::new(); + let req = StopRequest { + uuid: Uuid::parse_str(&args.uuid).unwrap(), + }; + match client + .post(format!("{}/stop", args.host)) + .json(&req) + .send() + .await + { + Ok(resp) => { + if resp.status().is_success() { + println!("Stopped workload on {}", args.host); + } else { + eprintln!( + "Failed to stop workload on {}: {}", + args.host, + resp.status() + ); + } + } + Err(e) => eprintln!("Failed to stop workload on {}: {}", args.host, e), + } +} diff --git a/rust/load/src/bin/chroma-load-uninhibit.rs b/rust/load/src/bin/chroma-load-uninhibit.rs new file mode 100644 index 000000000000..546f8d850279 --- /dev/null +++ b/rust/load/src/bin/chroma-load-uninhibit.rs @@ -0,0 +1,18 @@ +//! Uninhibit chroma-load on every host provided on the command line. + +#[tokio::main] +async fn main() { + for host in std::env::args().skip(1) { + let client = reqwest::Client::new(); + match client.post(format!("{}/uninhibit", host)).send().await { + Ok(resp) => { + if resp.status().is_success() { + println!("Resumed load on {}", host); + } else { + eprintln!("Failed to uninhibit load on {}: {}", host, resp.status()); + } + } + Err(e) => eprintln!("Failed to uninhibit load on {}: {}", host, e), + } + } +} diff --git a/rust/load/src/lib.rs b/rust/load/src/lib.rs index c4434906bc63..b3924fba5906 100644 --- a/rust/load/src/lib.rs +++ b/rust/load/src/lib.rs @@ -732,6 +732,23 @@ pub async fn entrypoint() { runner.abort(); } +///////////////////////////////////////// humanize_expires ///////////////////////////////////////// + +pub fn humanize_expires(expires: &str) -> Option { + if let Ok(expires) = chrono::DateTime::parse_from_rfc3339(expires) { + Some(expires.to_rfc3339()) + } else if let Some(duration) = expires.strip_suffix("s") { + let expires = chrono::Utc::now() + chrono::Duration::seconds(duration.trim().parse().ok()?); + Some(expires.to_rfc3339()) + } else if let Some(duration) = expires.strip_suffix("min") { + let expires = chrono::Utc::now() + + chrono::Duration::seconds(duration.trim().parse::().ok()? * 60i64); + Some(expires.to_rfc3339()) + } else { + Some(expires.to_string()) + } +} + /////////////////////////////////////////////// tests ////////////////////////////////////////////// #[cfg(test)]