From 3568a4ece84c3042ea0c7fa663dd8da130786b7f Mon Sep 17 00:00:00 2001 From: Robert Escriva Date: Mon, 2 Dec 2024 09:13:27 -0800 Subject: [PATCH 1/4] [ENH] Command-line tools for every chroma-load endpoint. chroma-load-start start a workload chroma-load-stop stop a workload chroma-load-inhibit stop all workloads chroma-load-uninhibit reverse the effects of chroma-load-inhibit --- Cargo.lock | 2 + rust/load/Cargo.toml | 2 + rust/load/src/bin/chroma-load-inhibit.rs | 18 ++++++++ rust/load/src/bin/chroma-load-start.rs | 54 ++++++++++++++++++++++ rust/load/src/bin/chroma-load-stop.rs | 44 ++++++++++++++++++ rust/load/src/bin/chroma-load-uninhibit.rs | 18 ++++++++ rust/load/src/lib.rs | 15 ++++++ 7 files changed, 153 insertions(+) create mode 100644 rust/load/src/bin/chroma-load-inhibit.rs create mode 100644 rust/load/src/bin/chroma-load-start.rs create mode 100644 rust/load/src/bin/chroma-load-stop.rs create mode 100644 rust/load/src/bin/chroma-load-uninhibit.rs diff --git a/Cargo.lock b/Cargo.lock index b9fbe6b843b..5283f1e99b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1304,11 +1304,13 @@ dependencies = [ "axum", "chromadb", "chrono", + "clap", "figment", "guacamole", "opentelemetry", "opentelemetry-otlp", "opentelemetry_sdk", + "reqwest 0.12.9", "serde", "serde_json", "tokio", diff --git a/rust/load/Cargo.toml b/rust/load/Cargo.toml index 5653ce94344..4eebef01948 100644 --- a/rust/load/Cargo.toml +++ b/rust/load/Cargo.toml @@ -8,6 +8,7 @@ async-trait = "0.1.83" axum = "0.7" chromadb = { git = "https://github.com/rescrv/chromadb-rs", rev = "e364e35c34c660d4e8e862436ea600ddc2f46a1e" } chrono = { version = "0.4.38", features = ["serde"] } +clap = { version = "4", features = ["derive"] } figment = { version = "0.10.12", features = ["env", "yaml", "test"] } guacamole = { version = "0.9", default-features = false } serde.workspace = true @@ -27,3 +28,4 @@ opentelemetry-otlp = "0.27" opentelemetry_sdk = { version = "0.27", features = ["rt-tokio"] } tracing.workspace = true tower-http = { version = "0.6.2", features = ["trace"] } +reqwest = { version = "0.12", features = ["json"] } diff --git a/rust/load/src/bin/chroma-load-inhibit.rs b/rust/load/src/bin/chroma-load-inhibit.rs new file mode 100644 index 00000000000..d5f0b43bbb5 --- /dev/null +++ b/rust/load/src/bin/chroma-load-inhibit.rs @@ -0,0 +1,18 @@ +//! Inhibit chroma-load on every host provided on the command line. + +#[tokio::main] +async fn main() { + for host in std::env::args().skip(1) { + let client = reqwest::Client::new(); + match client.post(format!("{}/inhibit", host)).send().await { + Ok(resp) => { + if resp.status().is_success() { + println!("Inhibited load on {}", host); + } else { + eprintln!("Failed to inhibit load on {}: {}", host, resp.status()); + } + } + Err(e) => eprintln!("Failed to inhibit load on {}: {}", host, e), + } + } +} diff --git a/rust/load/src/bin/chroma-load-start.rs b/rust/load/src/bin/chroma-load-start.rs new file mode 100644 index 00000000000..cf787552a74 --- /dev/null +++ b/rust/load/src/bin/chroma-load-start.rs @@ -0,0 +1,54 @@ +//! Start a workload on the chroma-load server. + +use clap::Parser; + +use chroma_load::rest::StartRequest; +use chroma_load::{humanize_expires, Workload}; + +#[derive(Parser, Debug)] +struct Args { + #[arg(long)] + host: String, + #[arg(long)] + name: String, + #[arg(long)] + expires: String, + #[arg(long)] + data_set: String, + #[arg(long)] + workload: String, + #[arg(long)] + throughput: f64, +} + +#[tokio::main] +async fn main() { + let args = Args::parse(); + let client = reqwest::Client::new(); + let req = StartRequest { + name: args.name, + expires: humanize_expires(&args.expires).unwrap_or(args.expires), + data_set: args.data_set, + workload: Workload::ByName(args.workload), + throughput: args.throughput, + }; + match client + .post(format!("{}/start", args.host)) + .json(&req) + .send() + .await + { + Ok(resp) => { + if resp.status().is_success() { + println!("Started workload on {}", args.host); + } else { + eprintln!( + "Failed to start workload on {}: {}", + args.host, + resp.status() + ); + } + } + Err(e) => eprintln!("Failed to start workload on {}: {}", args.host, e), + } +} diff --git a/rust/load/src/bin/chroma-load-stop.rs b/rust/load/src/bin/chroma-load-stop.rs new file mode 100644 index 00000000000..97a227e112d --- /dev/null +++ b/rust/load/src/bin/chroma-load-stop.rs @@ -0,0 +1,44 @@ +//! Stop a single workload on the chroma-load server. +//! +//! If you are looking to stop traffic for a SEV, see chroma-load-inhibit. + +use clap::Parser; +use uuid::Uuid; + +use chroma_load::rest::StopRequest; + +#[derive(Parser, Debug)] +struct Args { + #[arg(long)] + host: String, + #[arg(long)] + uuid: String, +} + +#[tokio::main] +async fn main() { + let args = Args::parse(); + let client = reqwest::Client::new(); + let req = StopRequest { + uuid: Uuid::parse_str(&args.uuid).unwrap(), + }; + match client + .post(format!("{}/stop", args.host)) + .json(&req) + .send() + .await + { + Ok(resp) => { + if resp.status().is_success() { + println!("Stopped workload on {}", args.host); + } else { + eprintln!( + "Failed to stop workload on {}: {}", + args.host, + resp.status() + ); + } + } + Err(e) => eprintln!("Failed to stop workload on {}: {}", args.host, e), + } +} diff --git a/rust/load/src/bin/chroma-load-uninhibit.rs b/rust/load/src/bin/chroma-load-uninhibit.rs new file mode 100644 index 00000000000..546f8d85027 --- /dev/null +++ b/rust/load/src/bin/chroma-load-uninhibit.rs @@ -0,0 +1,18 @@ +//! Uninhibit chroma-load on every host provided on the command line. + +#[tokio::main] +async fn main() { + for host in std::env::args().skip(1) { + let client = reqwest::Client::new(); + match client.post(format!("{}/uninhibit", host)).send().await { + Ok(resp) => { + if resp.status().is_success() { + println!("Resumed load on {}", host); + } else { + eprintln!("Failed to uninhibit load on {}: {}", host, resp.status()); + } + } + Err(e) => eprintln!("Failed to uninhibit load on {}: {}", host, e), + } + } +} diff --git a/rust/load/src/lib.rs b/rust/load/src/lib.rs index bad7ab7e727..27adfd04bd5 100644 --- a/rust/load/src/lib.rs +++ b/rust/load/src/lib.rs @@ -736,6 +736,21 @@ pub async fn entrypoint() { runner.abort(); } +pub fn humanize_expires(expires: &str) -> Option { + if let Ok(expires) = chrono::DateTime::parse_from_rfc3339(expires) { + Some(expires.to_rfc3339()) + } else if let Some(duration) = expires.strip_suffix("s") { + let expires = chrono::Utc::now() + chrono::Duration::seconds(duration.trim().parse().ok()?); + Some(expires.to_rfc3339()) + } else if let Some(duration) = expires.strip_suffix("min") { + let expires = chrono::Utc::now() + + chrono::Duration::seconds(duration.trim().parse::().ok()? * 60i64); + Some(expires.to_rfc3339()) + } else { + Some(expires.to_string()) + } +} + #[cfg(test)] mod tests { use super::*; From d8070654a9ff63445bb8208f4eddf7bd4967f508 Mon Sep 17 00:00:00 2001 From: Robert Escriva Date: Mon, 2 Dec 2024 10:12:27 -0800 Subject: [PATCH 2/4] status tool --- rust/load/src/bin/chroma-load-status.rs | 35 +++++++++++++++++++++++++ rust/load/src/rest.rs | 2 +- 2 files changed, 36 insertions(+), 1 deletion(-) create mode 100644 rust/load/src/bin/chroma-load-status.rs diff --git a/rust/load/src/bin/chroma-load-status.rs b/rust/load/src/bin/chroma-load-status.rs new file mode 100644 index 00000000000..5e874a1e43d --- /dev/null +++ b/rust/load/src/bin/chroma-load-status.rs @@ -0,0 +1,35 @@ +//! Inspect chroma-load + +use clap::Parser; + +#[derive(Parser, Debug)] +struct Args { + #[arg(long)] + host: String, +} + +#[tokio::main] +async fn main() { + let args = Args::parse(); + let client = reqwest::Client::new(); + match client + .get(&args.host) + .header(reqwest::header::ACCEPT, "application/json") + .send() + .await + { + Ok(resp) => { + if resp.status().is_success() { + let status = resp.json::().await; + println!("{:#?}", status); + } else { + eprintln!( + "Failed to get workload status on {}: {}", + args.host, + resp.status() + ); + } + } + Err(e) => eprintln!("Failed to get workload status on {}: {}", args.host, e), + } +} diff --git a/rust/load/src/rest.rs b/rust/load/src/rest.rs index 2d2db9c73ea..966442c98d7 100644 --- a/rust/load/src/rest.rs +++ b/rust/load/src/rest.rs @@ -23,7 +23,7 @@ impl From<&dyn crate::DataSet> for Description { pub struct Status { pub running: Vec, pub data_sets: Vec, - pub workloads: Vec, + pub workloads: Vec, } #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] From cdbf0a98f876576270a4935a42642290392136d4 Mon Sep 17 00:00:00 2001 From: Robert Escriva Date: Mon, 2 Dec 2024 10:27:56 -0800 Subject: [PATCH 3/4] status tool prints useful status --- rust/load/src/bin/chroma-load-status.rs | 24 ++++++++++++++++++++++-- rust/load/src/rest.rs | 1 + 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/rust/load/src/bin/chroma-load-status.rs b/rust/load/src/bin/chroma-load-status.rs index 5e874a1e43d..fcf96ac4116 100644 --- a/rust/load/src/bin/chroma-load-status.rs +++ b/rust/load/src/bin/chroma-load-status.rs @@ -20,8 +20,28 @@ async fn main() { { Ok(resp) => { if resp.status().is_success() { - let status = resp.json::().await; - println!("{:#?}", status); + let status = match resp.json::().await { + Ok(status) => status, + Err(e) => { + eprintln!("Failed to fetch workload status on {}: {}", args.host, e); + return; + } + }; + if status.inhibited { + println!("inhibited"); + } else { + for running in status.running { + println!( + "{} {} {} {} {}", + running.uuid, + running.expires, + running.name, + running.data_set, + // SAFETY(rescrv): WorkloadSummary always converts to JSON. + serde_json::to_string(&running.workload).unwrap() + ); + } + } } else { eprintln!( "Failed to get workload status on {}: {}", diff --git a/rust/load/src/rest.rs b/rust/load/src/rest.rs index 966442c98d7..dfc19866691 100644 --- a/rust/load/src/rest.rs +++ b/rust/load/src/rest.rs @@ -21,6 +21,7 @@ impl From<&dyn crate::DataSet> for Description { #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] pub struct Status { + pub inhibited: bool, pub running: Vec, pub data_sets: Vec, pub workloads: Vec, From 45276465c6bed6ab9524283697dc387995b16132 Mon Sep 17 00:00:00 2001 From: Robert Escriva Date: Mon, 2 Dec 2024 10:41:01 -0800 Subject: [PATCH 4/4] Make start print the uuid of started task. --- rust/load/src/bin/chroma-load-start.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/rust/load/src/bin/chroma-load-start.rs b/rust/load/src/bin/chroma-load-start.rs index cf787552a74..b06694719c1 100644 --- a/rust/load/src/bin/chroma-load-start.rs +++ b/rust/load/src/bin/chroma-load-start.rs @@ -34,13 +34,27 @@ async fn main() { }; match client .post(format!("{}/start", args.host)) + .header(reqwest::header::ACCEPT, "application/json") .json(&req) .send() .await { Ok(resp) => { if resp.status().is_success() { - println!("Started workload on {}", args.host); + let uuid = match resp.text().await { + Ok(uuid) => uuid, + Err(err) => { + eprintln!("Failed to start workload on {}: {}", args.host, err); + return; + } + }; + println!( + "Started workload on {}:\n{}", + args.host, + // SAFETY(rescrv): serde_json::to_string_pretty should always convert to JSON + // when it just parses as JSON. + uuid, + ); } else { eprintln!( "Failed to start workload on {}: {}",