diff --git a/Cargo.lock b/Cargo.lock index b9fbe6b843b..5283f1e99b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1304,11 +1304,13 @@ dependencies = [ "axum", "chromadb", "chrono", + "clap", "figment", "guacamole", "opentelemetry", "opentelemetry-otlp", "opentelemetry_sdk", + "reqwest 0.12.9", "serde", "serde_json", "tokio", diff --git a/rust/load/Cargo.toml b/rust/load/Cargo.toml index 5653ce94344..4eebef01948 100644 --- a/rust/load/Cargo.toml +++ b/rust/load/Cargo.toml @@ -8,6 +8,7 @@ async-trait = "0.1.83" axum = "0.7" chromadb = { git = "https://github.com/rescrv/chromadb-rs", rev = "e364e35c34c660d4e8e862436ea600ddc2f46a1e" } chrono = { version = "0.4.38", features = ["serde"] } +clap = { version = "4", features = ["derive"] } figment = { version = "0.10.12", features = ["env", "yaml", "test"] } guacamole = { version = "0.9", default-features = false } serde.workspace = true @@ -27,3 +28,4 @@ opentelemetry-otlp = "0.27" opentelemetry_sdk = { version = "0.27", features = ["rt-tokio"] } tracing.workspace = true tower-http = { version = "0.6.2", features = ["trace"] } +reqwest = { version = "0.12", features = ["json"] } diff --git a/rust/load/src/bin/chroma-load-inhibit.rs b/rust/load/src/bin/chroma-load-inhibit.rs new file mode 100644 index 00000000000..d5f0b43bbb5 --- /dev/null +++ b/rust/load/src/bin/chroma-load-inhibit.rs @@ -0,0 +1,18 @@ +//! Inhibit chroma-load on every host provided on the command line. + +#[tokio::main] +async fn main() { + for host in std::env::args().skip(1) { + let client = reqwest::Client::new(); + match client.post(format!("{}/inhibit", host)).send().await { + Ok(resp) => { + if resp.status().is_success() { + println!("Inhibited load on {}", host); + } else { + eprintln!("Failed to inhibit load on {}: {}", host, resp.status()); + } + } + Err(e) => eprintln!("Failed to inhibit load on {}: {}", host, e), + } + } +} diff --git a/rust/load/src/bin/chroma-load-start.rs b/rust/load/src/bin/chroma-load-start.rs new file mode 100644 index 00000000000..b06694719c1 --- /dev/null +++ b/rust/load/src/bin/chroma-load-start.rs @@ -0,0 +1,68 @@ +//! Start a workload on the chroma-load server. + +use clap::Parser; + +use chroma_load::rest::StartRequest; +use chroma_load::{humanize_expires, Workload}; + +#[derive(Parser, Debug)] +struct Args { + #[arg(long)] + host: String, + #[arg(long)] + name: String, + #[arg(long)] + expires: String, + #[arg(long)] + data_set: String, + #[arg(long)] + workload: String, + #[arg(long)] + throughput: f64, +} + +#[tokio::main] +async fn main() { + let args = Args::parse(); + let client = reqwest::Client::new(); + let req = StartRequest { + name: args.name, + expires: humanize_expires(&args.expires).unwrap_or(args.expires), + data_set: args.data_set, + workload: Workload::ByName(args.workload), + throughput: args.throughput, + }; + match client + .post(format!("{}/start", args.host)) + .header(reqwest::header::ACCEPT, "application/json") + .json(&req) + .send() + .await + { + Ok(resp) => { + if resp.status().is_success() { + let uuid = match resp.text().await { + Ok(uuid) => uuid, + Err(err) => { + eprintln!("Failed to start workload on {}: {}", args.host, err); + return; + } + }; + println!( + "Started workload on {}:\n{}", + args.host, + // SAFETY(rescrv): serde_json::to_string_pretty should always convert to JSON + // when it just parses as JSON. + uuid, + ); + } else { + eprintln!( + "Failed to start workload on {}: {}", + args.host, + resp.status() + ); + } + } + Err(e) => eprintln!("Failed to start workload on {}: {}", args.host, e), + } +} diff --git a/rust/load/src/bin/chroma-load-status.rs b/rust/load/src/bin/chroma-load-status.rs new file mode 100644 index 00000000000..fcf96ac4116 --- /dev/null +++ b/rust/load/src/bin/chroma-load-status.rs @@ -0,0 +1,55 @@ +//! Inspect chroma-load + +use clap::Parser; + +#[derive(Parser, Debug)] +struct Args { + #[arg(long)] + host: String, +} + +#[tokio::main] +async fn main() { + let args = Args::parse(); + let client = reqwest::Client::new(); + match client + .get(&args.host) + .header(reqwest::header::ACCEPT, "application/json") + .send() + .await + { + Ok(resp) => { + if resp.status().is_success() { + let status = match resp.json::().await { + Ok(status) => status, + Err(e) => { + eprintln!("Failed to fetch workload status on {}: {}", args.host, e); + return; + } + }; + if status.inhibited { + println!("inhibited"); + } else { + for running in status.running { + println!( + "{} {} {} {} {}", + running.uuid, + running.expires, + running.name, + running.data_set, + // SAFETY(rescrv): WorkloadSummary always converts to JSON. + serde_json::to_string(&running.workload).unwrap() + ); + } + } + } else { + eprintln!( + "Failed to get workload status on {}: {}", + args.host, + resp.status() + ); + } + } + Err(e) => eprintln!("Failed to get workload status on {}: {}", args.host, e), + } +} diff --git a/rust/load/src/bin/chroma-load-stop.rs b/rust/load/src/bin/chroma-load-stop.rs new file mode 100644 index 00000000000..97a227e112d --- /dev/null +++ b/rust/load/src/bin/chroma-load-stop.rs @@ -0,0 +1,44 @@ +//! Stop a single workload on the chroma-load server. +//! +//! If you are looking to stop traffic for a SEV, see chroma-load-inhibit. + +use clap::Parser; +use uuid::Uuid; + +use chroma_load::rest::StopRequest; + +#[derive(Parser, Debug)] +struct Args { + #[arg(long)] + host: String, + #[arg(long)] + uuid: String, +} + +#[tokio::main] +async fn main() { + let args = Args::parse(); + let client = reqwest::Client::new(); + let req = StopRequest { + uuid: Uuid::parse_str(&args.uuid).unwrap(), + }; + match client + .post(format!("{}/stop", args.host)) + .json(&req) + .send() + .await + { + Ok(resp) => { + if resp.status().is_success() { + println!("Stopped workload on {}", args.host); + } else { + eprintln!( + "Failed to stop workload on {}: {}", + args.host, + resp.status() + ); + } + } + Err(e) => eprintln!("Failed to stop workload on {}: {}", args.host, e), + } +} diff --git a/rust/load/src/bin/chroma-load-uninhibit.rs b/rust/load/src/bin/chroma-load-uninhibit.rs new file mode 100644 index 00000000000..546f8d85027 --- /dev/null +++ b/rust/load/src/bin/chroma-load-uninhibit.rs @@ -0,0 +1,18 @@ +//! Uninhibit chroma-load on every host provided on the command line. + +#[tokio::main] +async fn main() { + for host in std::env::args().skip(1) { + let client = reqwest::Client::new(); + match client.post(format!("{}/uninhibit", host)).send().await { + Ok(resp) => { + if resp.status().is_success() { + println!("Resumed load on {}", host); + } else { + eprintln!("Failed to uninhibit load on {}: {}", host, resp.status()); + } + } + Err(e) => eprintln!("Failed to uninhibit load on {}: {}", host, e), + } + } +} diff --git a/rust/load/src/lib.rs b/rust/load/src/lib.rs index bad7ab7e727..27adfd04bd5 100644 --- a/rust/load/src/lib.rs +++ b/rust/load/src/lib.rs @@ -736,6 +736,21 @@ pub async fn entrypoint() { runner.abort(); } +pub fn humanize_expires(expires: &str) -> Option { + if let Ok(expires) = chrono::DateTime::parse_from_rfc3339(expires) { + Some(expires.to_rfc3339()) + } else if let Some(duration) = expires.strip_suffix("s") { + let expires = chrono::Utc::now() + chrono::Duration::seconds(duration.trim().parse().ok()?); + Some(expires.to_rfc3339()) + } else if let Some(duration) = expires.strip_suffix("min") { + let expires = chrono::Utc::now() + + chrono::Duration::seconds(duration.trim().parse::().ok()? * 60i64); + Some(expires.to_rfc3339()) + } else { + Some(expires.to_string()) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/rust/load/src/rest.rs b/rust/load/src/rest.rs index 2d2db9c73ea..dfc19866691 100644 --- a/rust/load/src/rest.rs +++ b/rust/load/src/rest.rs @@ -21,9 +21,10 @@ impl From<&dyn crate::DataSet> for Description { #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] pub struct Status { + pub inhibited: bool, pub running: Vec, pub data_sets: Vec, - pub workloads: Vec, + pub workloads: Vec, } #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]