Skip to content

Commit

Permalink
[ENH][chroma-load] Support delay on workloads. (#3210)
Browse files Browse the repository at this point in the history
This supports delaying a workload.  The way it's implemented, workloads
advertise being "active" and just silently NOP when they are "inactive".
  • Loading branch information
rescrv authored Dec 3, 2024
1 parent 8b6f614 commit 9ec6b33
Show file tree
Hide file tree
Showing 10 changed files with 383 additions and 21 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion rust/load/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ edition = "2021"
async-trait = "0.1.83"
axum = "0.7"
chromadb = { git = "https://github.com/rescrv/chromadb-rs", rev = "e364e35c34c660d4e8e862436ea600ddc2f46a1e" }
chrono = "0.4.38"
chrono = { version = "0.4.38", features = ["serde"] }
clap = { version = "4", features = ["derive"] }
figment = { version = "0.10.12", features = ["env", "yaml", "test"] }
guacamole = { version = "0.9", default-features = false }
serde.workspace = true
Expand All @@ -27,3 +28,4 @@ opentelemetry-otlp = "0.27"
opentelemetry_sdk = { version = "0.27", features = ["rt-tokio"] }
tracing.workspace = true
tower-http = { version = "0.6.2", features = ["trace"] }
reqwest = { version = "0.12", features = ["json"] }
32 changes: 32 additions & 0 deletions rust/load/examples/workload-json.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use chroma_load::{Distribution, GetQuery, QueryQuery, Workload};

fn main() {
let w = Workload::Hybrid(vec![
(1.0, Workload::Nop),
(1.0, Workload::ByName("foo".to_string())),
(
1.0,
Workload::Get(GetQuery {
limit: Distribution::Constant(10),
document: None,
metadata: None,
}),
),
(
1.0,
Workload::Query(QueryQuery {
limit: Distribution::Constant(10),
document: None,
metadata: None,
}),
),
(
1.0,
Workload::Delay {
after: chrono::DateTime::parse_from_rfc3339("2021-01-01T00:00:00+00:00").unwrap(),
wrap: Box::new(Workload::Nop),
},
),
]);
println!("{}", serde_json::to_string_pretty(&w).unwrap());
}
18 changes: 18 additions & 0 deletions rust/load/src/bin/chroma-load-inhibit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
//! Inhibit chroma-load on every host provided on the command line.
#[tokio::main]
async fn main() {
for host in std::env::args().skip(1) {
let client = reqwest::Client::new();
match client.post(format!("{}/inhibit", host)).send().await {
Ok(resp) => {
if resp.status().is_success() {
println!("Inhibited load on {}", host);
} else {
eprintln!("Failed to inhibit load on {}: {}", host, resp.status());
}
}
Err(e) => eprintln!("Failed to inhibit load on {}: {}", host, e),
}
}
}
68 changes: 68 additions & 0 deletions rust/load/src/bin/chroma-load-start.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
//! Start a workload on the chroma-load server.
use clap::Parser;

use chroma_load::rest::StartRequest;
use chroma_load::{humanize_expires, Workload};

#[derive(Parser, Debug)]
struct Args {
#[arg(long)]
host: String,
#[arg(long)]
name: String,
#[arg(long)]
expires: String,
#[arg(long)]
data_set: String,
#[arg(long)]
workload: String,
#[arg(long)]
throughput: f64,
}

#[tokio::main]
async fn main() {
let args = Args::parse();
let client = reqwest::Client::new();
let req = StartRequest {
name: args.name,
expires: humanize_expires(&args.expires).unwrap_or(args.expires),
data_set: args.data_set,
workload: Workload::ByName(args.workload),
throughput: args.throughput,
};
match client
.post(format!("{}/start", args.host))
.header(reqwest::header::ACCEPT, "application/json")
.json(&req)
.send()
.await
{
Ok(resp) => {
if resp.status().is_success() {
let uuid = match resp.text().await {
Ok(uuid) => uuid,
Err(err) => {
eprintln!("Failed to start workload on {}: {}", args.host, err);
return;
}
};
println!(
"Started workload on {}:\n{}",
args.host,
// SAFETY(rescrv): serde_json::to_string_pretty should always convert to JSON
// when it just parses as JSON.
uuid,
);
} else {
eprintln!(
"Failed to start workload on {}: {}",
args.host,
resp.status()
);
}
}
Err(e) => eprintln!("Failed to start workload on {}: {}", args.host, e),
}
}
55 changes: 55 additions & 0 deletions rust/load/src/bin/chroma-load-status.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
//! Inspect chroma-load
use clap::Parser;

#[derive(Parser, Debug)]
struct Args {
#[arg(long)]
host: String,
}

#[tokio::main]
async fn main() {
let args = Args::parse();
let client = reqwest::Client::new();
match client
.get(&args.host)
.header(reqwest::header::ACCEPT, "application/json")
.send()
.await
{
Ok(resp) => {
if resp.status().is_success() {
let status = match resp.json::<chroma_load::rest::Status>().await {
Ok(status) => status,
Err(e) => {
eprintln!("Failed to fetch workload status on {}: {}", args.host, e);
return;
}
};
if status.inhibited {
println!("inhibited");
} else {
for running in status.running {
println!(
"{} {} {} {} {}",
running.uuid,
running.expires,
running.name,
running.data_set,
// SAFETY(rescrv): WorkloadSummary always converts to JSON.
serde_json::to_string(&running.workload).unwrap()
);
}
}
} else {
eprintln!(
"Failed to get workload status on {}: {}",
args.host,
resp.status()
);
}
}
Err(e) => eprintln!("Failed to get workload status on {}: {}", args.host, e),
}
}
44 changes: 44 additions & 0 deletions rust/load/src/bin/chroma-load-stop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
//! Stop a single workload on the chroma-load server.
//!
//! If you are looking to stop traffic for a SEV, see chroma-load-inhibit.
use clap::Parser;
use uuid::Uuid;

use chroma_load::rest::StopRequest;

#[derive(Parser, Debug)]
struct Args {
#[arg(long)]
host: String,
#[arg(long)]
uuid: String,
}

#[tokio::main]
async fn main() {
let args = Args::parse();
let client = reqwest::Client::new();
let req = StopRequest {
uuid: Uuid::parse_str(&args.uuid).unwrap(),
};
match client
.post(format!("{}/stop", args.host))
.json(&req)
.send()
.await
{
Ok(resp) => {
if resp.status().is_success() {
println!("Stopped workload on {}", args.host);
} else {
eprintln!(
"Failed to stop workload on {}: {}",
args.host,
resp.status()
);
}
}
Err(e) => eprintln!("Failed to stop workload on {}: {}", args.host, e),
}
}
18 changes: 18 additions & 0 deletions rust/load/src/bin/chroma-load-uninhibit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
//! Uninhibit chroma-load on every host provided on the command line.
#[tokio::main]
async fn main() {
for host in std::env::args().skip(1) {
let client = reqwest::Client::new();
match client.post(format!("{}/uninhibit", host)).send().await {
Ok(resp) => {
if resp.status().is_success() {
println!("Resumed load on {}", host);
} else {
eprintln!("Failed to uninhibit load on {}: {}", host, resp.status());
}
}
Err(e) => eprintln!("Failed to uninhibit load on {}: {}", host, e),
}
}
}
Loading

0 comments on commit 9ec6b33

Please sign in to comment.