From 58effe78b5cc69355dad406f44cfe773cb4ed40d Mon Sep 17 00:00:00 2001 From: "Wang, Yi" Date: Mon, 8 Jul 2024 22:03:59 +0800 Subject: [PATCH] =?UTF-8?q?update=20to=20metrics=200.23.0=20or=20could=20w?= =?UTF-8?q?ork=20with=20metrics-exporter-promethe=E2=80=A6=20(#2190)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit update to metrics 0.23.0 or could work with metrics-exporter-prometheus 0.15.1 Signed-off-by: Wang, Yi A --- Cargo.lock | 28 ++----------- router/Cargo.toml | 2 +- router/src/infer/mod.rs | 8 ++-- router/src/infer/v2/queue.rs | 8 ++-- router/src/infer/v2/scheduler.rs | 61 ++++++++++++++++------------ router/src/infer/v3/queue.rs | 8 ++-- router/src/infer/v3/scheduler.rs | 61 ++++++++++++++++------------ router/src/server.rs | 68 ++++++++++++++------------------ router/src/validation.rs | 4 +- 9 files changed, 119 insertions(+), 129 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 090e2e80fe0..a8a04c71388 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1935,17 +1935,6 @@ version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" -[[package]] -name = "metrics" -version = "0.21.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fde3af1a009ed76a778cb84fdef9e7dbbdf5775ae3e4cc1f434a6a307f6f76c5" -dependencies = [ - "ahash", - "metrics-macros", - "portable-atomic", -] - [[package]] name = "metrics" version = "0.23.0" @@ -1969,7 +1958,7 @@ dependencies = [ "hyper-util", "indexmap 2.2.6", "ipnet", - "metrics 0.23.0", + "metrics", "metrics-util", "quanta", "thiserror", @@ -1977,17 +1966,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "metrics-macros" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38b4faf00617defe497754acde3024865bc143d44a86799b24e191ecff91354f" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.68", -] - [[package]] name = "metrics-util" version = "0.17.0" @@ -1997,7 +1975,7 @@ dependencies = [ "crossbeam-epoch", "crossbeam-utils", "hashbrown 0.14.5", - "metrics 0.23.0", + "metrics", "num_cpus", "quanta", "sketches-ddsketch", @@ -3834,7 +3812,7 @@ dependencies = [ "init-tracing-opentelemetry", "itertools 0.10.5", "jsonschema", - "metrics 0.21.1", + "metrics", "metrics-exporter-prometheus", "minijinja", "minijinja-contrib", diff --git a/router/Cargo.toml b/router/Cargo.toml index 5855ac86a4a..60fb5c9d109 100644 --- a/router/Cargo.toml +++ b/router/Cargo.toml @@ -24,7 +24,7 @@ futures = "0.3.28" hf-hub = { workspace = true } itertools = "0.10" jsonschema = { version = "0.17.1", features = ["draft202012"] } -metrics = "0.21.1" +metrics = "0.23.0" metrics-exporter-prometheus = { version = "0.15.1", features = [] } nohash-hasher = "0.2.0" opentelemetry = { version = "0.20.0", features = ["rt-tokio"] } diff --git a/router/src/infer/mod.rs b/router/src/infer/mod.rs index 49282eb9eca..f3b10450ae0 100644 --- a/router/src/infer/mod.rs +++ b/router/src/infer/mod.rs @@ -91,14 +91,14 @@ impl Infer { .limit_concurrent_requests .try_acquire_owned() .map_err(|err| { - metrics::increment_counter!("tgi_request_failure", "err" => "overloaded"); + metrics::counter!("tgi_request_failure", "err" => "overloaded").increment(1); tracing::error!("{err}"); err })?; // Validate request let valid_request = self.validation.validate(request).await.map_err(|err| { - metrics::increment_counter!("tgi_request_failure", "err" => "validation"); + metrics::counter!("tgi_request_failure", "err" => "validation").increment(1); tracing::error!("{err}"); err })?; @@ -140,7 +140,7 @@ impl Infer { .ok_or_else(|| InferError::TemplateError(ErrorKind::TemplateNotFound.into()))? .apply(messages, grammar_with_prompt) .map_err(|e| { - metrics::increment_counter!("tgi_request_failure", "err" => "template"); + metrics::counter!("tgi_request_failure", "err" => "template").increment(1); tracing::error!("{e}"); e }) @@ -214,7 +214,7 @@ impl Infer { }) } else { let err = InferError::IncompleteGeneration; - metrics::increment_counter!("tgi_request_failure", "err" => "incomplete"); + metrics::counter!("tgi_request_failure", "err" => "incomplete").increment(1); tracing::error!("{err}"); Err(err) } diff --git a/router/src/infer/v2/queue.rs b/router/src/infer/v2/queue.rs index 93cf94699d8..0b51645a84b 100644 --- a/router/src/infer/v2/queue.rs +++ b/router/src/infer/v2/queue.rs @@ -111,7 +111,7 @@ async fn queue_task( match cmd { QueueCommand::Append(entry, span) => { span.in_scope(|| state.append(*entry)); - metrics::increment_gauge!("tgi_queue_size", 1.0); + metrics::gauge!("tgi_queue_size").increment(1.0); } QueueCommand::NextBatch { min_size, @@ -124,7 +124,7 @@ async fn queue_task( let next_batch = state.next_batch(min_size, max_size, prefill_token_budget, token_budget); response_sender.send(next_batch).unwrap(); - metrics::gauge!("tgi_queue_size", state.entries.len() as f64); + metrics::gauge!("tgi_queue_size").set(state.entries.len() as f64); }), } } @@ -226,7 +226,7 @@ impl State { // Filter entries where the response receiver was dropped (== entries where the request // was dropped by the client) if entry.response_tx.is_closed() { - metrics::increment_counter!("tgi_request_failure", "err" => "dropped"); + metrics::counter!("tgi_request_failure", "err" => "dropped").increment(1); tracing::debug!("Dropping entry"); continue; } @@ -336,7 +336,7 @@ impl State { // Increment batch id self.next_batch_id += 1; - metrics::histogram!("tgi_batch_next_size", batch.size as f64); + metrics::histogram!("tgi_batch_next_size").record(batch.size as f64); Some((batch_entries, batch, next_batch_span)) } diff --git a/router/src/infer/v2/scheduler.rs b/router/src/infer/v2/scheduler.rs index e4c3de26792..97379bc53c7 100644 --- a/router/src/infer/v2/scheduler.rs +++ b/router/src/infer/v2/scheduler.rs @@ -148,8 +148,8 @@ pub(crate) async fn batching_task( let batch_size = batch.size; let batch_max_tokens = batch.max_tokens; let mut batches = vec![batch]; - metrics::gauge!("tgi_batch_current_size", batch_size as f64); - metrics::gauge!("tgi_batch_current_max_tokens", batch_max_tokens as f64); + metrics::gauge!("tgi_batch_current_size").set(batch_size as f64); + metrics::gauge!("tgi_batch_current_max_tokens").set(batch_max_tokens as f64); let min_size = if waiting_tokens >= max_waiting_tokens { // If we didn't onboard any new requests since >= max_waiting_tokens, we try @@ -170,9 +170,11 @@ pub(crate) async fn batching_task( { // Tracking metrics if min_size.is_some() { - metrics::increment_counter!("tgi_batch_concat", "reason" => "backpressure"); + metrics::counter!("tgi_batch_concat", "reason" => "backpressure") + .increment(1); } else { - metrics::increment_counter!("tgi_batch_concat", "reason" => "wait_exceeded"); + metrics::counter!("tgi_batch_concat", "reason" => "wait_exceeded") + .increment(1); } entries.iter_mut().for_each(|(_, entry)| { @@ -219,8 +221,8 @@ pub(crate) async fn batching_task( .await; waiting_tokens += 1; } - metrics::gauge!("tgi_batch_current_size", 0.0); - metrics::gauge!("tgi_batch_current_max_tokens", 0.0); + metrics::gauge!("tgi_batch_current_size").set(0.0); + metrics::gauge!("tgi_batch_current_max_tokens").set(0.0); } } } @@ -234,7 +236,7 @@ async fn prefill( ) -> Option { let start_time = Instant::now(); let batch_id = batch.id; - metrics::increment_counter!("tgi_batch_inference_count", "method" => "prefill"); + metrics::counter!("tgi_batch_inference_count", "method" => "prefill").increment(1); match client.prefill(batch).await { Ok((generations, next_batch, timings)) => { @@ -248,11 +250,15 @@ async fn prefill( // Filter next batch and remove requests that were stopped let next_batch = filter_batch(client, next_batch, entries).await; - metrics::histogram!("tgi_batch_forward_duration", timings.forward.as_secs_f64(), "method" => "prefill"); - metrics::histogram!("tgi_batch_decode_duration", timings.decode.as_secs_f64(), "method" => "prefill"); - metrics::histogram!("tgi_batch_filter_duration", start_filtering_time.elapsed().as_secs_f64(), "method" => "prefill"); - metrics::histogram!("tgi_batch_inference_duration", start_time.elapsed().as_secs_f64(), "method" => "prefill"); - metrics::increment_counter!("tgi_batch_inference_success", "method" => "prefill"); + metrics::histogram!("tgi_batch_forward_duration","method" => "prefill") + .record(timings.forward.as_secs_f64()); + metrics::histogram!("tgi_batch_decode_duration", "method" => "prefill") + .record(timings.decode.as_secs_f64()); + metrics::histogram!("tgi_batch_filter_duration", "method" => "prefill") + .record(start_filtering_time.elapsed().as_secs_f64()); + metrics::histogram!("tgi_batch_inference_duration","method" => "prefill") + .record(start_time.elapsed().as_secs_f64()); + metrics::counter!("tgi_batch_inference_success", "method" => "prefill").increment(1); next_batch } // If we have an error, we discard the whole batch @@ -261,7 +267,7 @@ async fn prefill( generation_health.store(false, Ordering::SeqCst); let _ = client.clear_cache(Some(batch_id)).await; send_errors(err, entries); - metrics::increment_counter!("tgi_batch_inference_failure", "method" => "prefill"); + metrics::counter!("tgi_batch_inference_failure", "method" => "prefill").increment(1); None } } @@ -276,7 +282,7 @@ async fn decode( ) -> Option { let start_time = Instant::now(); let batch_ids: Vec = batches.iter().map(|b| b.id).collect(); - metrics::increment_counter!("tgi_batch_inference_count", "method" => "decode"); + metrics::counter!("tgi_batch_inference_count", "method" => "decode").increment(1); match client.decode(batches).await { Ok((generations, next_batch, timings)) => { @@ -291,13 +297,18 @@ async fn decode( let next_batch = filter_batch(client, next_batch, entries).await; if let Some(concat_duration) = timings.concat { - metrics::histogram!("tgi_batch_concat_duration", concat_duration.as_secs_f64(), "method" => "decode"); + metrics::histogram!("tgi_batch_concat_duration", "method" => "decode") + .record(concat_duration.as_secs_f64()); } - metrics::histogram!("tgi_batch_forward_duration", timings.forward.as_secs_f64(), "method" => "decode"); - metrics::histogram!("tgi_batch_decode_duration", timings.decode.as_secs_f64(), "method" => "decode"); - metrics::histogram!("tgi_batch_filter_duration", start_filtering_time.elapsed().as_secs_f64(), "method" => "decode"); - metrics::histogram!("tgi_batch_inference_duration", start_time.elapsed().as_secs_f64(), "method" => "decode"); - metrics::increment_counter!("tgi_batch_inference_success", "method" => "decode"); + metrics::histogram!("tgi_batch_forward_duration", "method" => "decode") + .record(timings.forward.as_secs_f64()); + metrics::histogram!("tgi_batch_decode_duration", "method" => "decode") + .record(timings.decode.as_secs_f64()); + metrics::histogram!("tgi_batch_filter_duration", "method" => "decode") + .record(start_filtering_time.elapsed().as_secs_f64()); + metrics::histogram!("tgi_batch_inference_duration", "method" => "decode") + .record(start_time.elapsed().as_secs_f64()); + metrics::counter!("tgi_batch_inference_success", "method" => "decode").increment(1); next_batch } // If we have an error, we discard the whole batch @@ -307,7 +318,7 @@ async fn decode( let _ = client.clear_cache(Some(id)).await; } send_errors(err, entries); - metrics::increment_counter!("tgi_batch_inference_failure", "method" => "decode"); + metrics::counter!("tgi_batch_inference_failure", "method" => "decode").increment(1); None } } @@ -365,7 +376,7 @@ fn filter_send_generations(generations: Vec, entries: &mut IntMap "dropped"); + metrics::counter!("tgi_request_failure", "err" => "dropped").increment(1); err }).unwrap_or(true); if stopped { @@ -381,7 +392,7 @@ fn send_responses( ) -> Result>>> { // Return directly if the channel is disconnected if entry.response_tx.is_closed() { - metrics::increment_counter!("tgi_request_failure", "err" => "dropped"); + metrics::counter!("tgi_request_failure", "err" => "dropped").increment(1); return Ok(true); } @@ -407,7 +418,7 @@ fn send_responses( // Create last Token let tokens_ = generation.tokens.expect("Non empty tokens in generation"); let n = tokens_.ids.len(); - metrics::histogram!("tgi_request_skipped_tokens", (n - 1) as f64); + metrics::histogram!("tgi_request_skipped_tokens").record((n - 1) as f64); let mut iterator = tokens_ .ids .into_iter() @@ -472,7 +483,7 @@ fn send_errors(error: ClientError, entries: &mut IntMap) { // Create and enter a span to link this function back to the entry let _send_error_span = info_span!(parent: entry.temp_span.as_ref().expect("batch_span is None. This is a bug."), "send_error").entered(); let err = InferError::GenerationError(error.to_string()); - metrics::increment_counter!("tgi_request_failure", "err" => "generation"); + metrics::counter!("tgi_request_failure", "err" => "generation").increment(1); tracing::error!("{err}"); // unwrap_or is valid here as we don't care if the receiver is gone. diff --git a/router/src/infer/v3/queue.rs b/router/src/infer/v3/queue.rs index ba65b9b6a8c..894d9cab4ee 100644 --- a/router/src/infer/v3/queue.rs +++ b/router/src/infer/v3/queue.rs @@ -126,7 +126,7 @@ async fn queue_task( match cmd { QueueCommand::Append(entry, span) => { span.in_scope(|| state.append(*entry)); - metrics::increment_gauge!("tgi_queue_size", 1.0); + metrics::gauge!("tgi_queue_size").increment(1.0); } QueueCommand::NextBatch { min_size, @@ -141,7 +141,7 @@ async fn queue_task( .instrument(span) .await; response_sender.send(next_batch).unwrap(); - metrics::gauge!("tgi_queue_size", state.entries.len() as f64); + metrics::gauge!("tgi_queue_size").set(state.entries.len() as f64); } } } @@ -248,7 +248,7 @@ impl State { // Filter entries where the response receiver was dropped (== entries where the request // was dropped by the client) if entry.response_tx.is_closed() { - metrics::increment_counter!("tgi_request_failure", "err" => "dropped"); + metrics::counter!("tgi_request_failure", "err" => "dropped").increment(1); tracing::debug!("Dropping entry"); continue; } @@ -399,7 +399,7 @@ impl State { // Increment batch id self.next_batch_id += 1; - metrics::histogram!("tgi_batch_next_size", batch.size as f64); + metrics::histogram!("tgi_batch_next_size").record(batch.size as f64); Some((batch_entries, batch, next_batch_span)) } diff --git a/router/src/infer/v3/scheduler.rs b/router/src/infer/v3/scheduler.rs index 543ce89f823..26cd95847de 100644 --- a/router/src/infer/v3/scheduler.rs +++ b/router/src/infer/v3/scheduler.rs @@ -154,8 +154,8 @@ pub(crate) async fn batching_task( let batch_size = batch.size; let batch_max_tokens = batch.max_tokens; let mut batches = vec![batch]; - metrics::gauge!("tgi_batch_current_size", batch_size as f64); - metrics::gauge!("tgi_batch_current_max_tokens", batch_max_tokens as f64); + metrics::gauge!("tgi_batch_current_size").set(batch_size as f64); + metrics::gauge!("tgi_batch_current_max_tokens").set(batch_max_tokens as f64); let min_size = if waiting_tokens >= max_waiting_tokens { // If we didn't onboard any new requests since >= max_waiting_tokens, we try @@ -176,9 +176,11 @@ pub(crate) async fn batching_task( { // Tracking metrics if min_size.is_some() { - metrics::increment_counter!("tgi_batch_concat", "reason" => "backpressure"); + metrics::counter!("tgi_batch_concat", "reason" => "backpressure") + .increment(1); } else { - metrics::increment_counter!("tgi_batch_concat", "reason" => "wait_exceeded"); + metrics::counter!("tgi_batch_concat", "reason" => "wait_exceeded") + .increment(1); } entries.iter_mut().for_each(|(_, entry)| { @@ -225,8 +227,8 @@ pub(crate) async fn batching_task( .await; waiting_tokens += 1; } - metrics::gauge!("tgi_batch_current_size", 0.0); - metrics::gauge!("tgi_batch_current_max_tokens", 0.0); + metrics::gauge!("tgi_batch_current_size").set(0.0); + metrics::gauge!("tgi_batch_current_max_tokens").set(0.0); } } } @@ -240,7 +242,7 @@ async fn prefill( ) -> Option { let start_time = Instant::now(); let batch_id = batch.id; - metrics::increment_counter!("tgi_batch_inference_count", "method" => "prefill"); + metrics::counter!("tgi_batch_inference_count", "method" => "prefill").increment(1); match client.prefill(batch).await { Ok((generations, next_batch, timings)) => { @@ -254,11 +256,15 @@ async fn prefill( // Filter next batch and remove requests that were stopped let next_batch = filter_batch(client, next_batch, entries).await; - metrics::histogram!("tgi_batch_forward_duration", timings.forward.as_secs_f64(), "method" => "prefill"); - metrics::histogram!("tgi_batch_decode_duration", timings.decode.as_secs_f64(), "method" => "prefill"); - metrics::histogram!("tgi_batch_filter_duration", start_filtering_time.elapsed().as_secs_f64(), "method" => "prefill"); - metrics::histogram!("tgi_batch_inference_duration", start_time.elapsed().as_secs_f64(), "method" => "prefill"); - metrics::increment_counter!("tgi_batch_inference_success", "method" => "prefill"); + metrics::histogram!("tgi_batch_forward_duration","method" => "prefill") + .record(timings.forward.as_secs_f64()); + metrics::histogram!("tgi_batch_decode_duration", "method" => "prefill") + .record(timings.decode.as_secs_f64()); + metrics::histogram!("tgi_batch_filter_duration", "method" => "prefill") + .record(start_filtering_time.elapsed().as_secs_f64()); + metrics::histogram!("tgi_batch_inference_duration", "method" => "prefill") + .record(start_time.elapsed().as_secs_f64()); + metrics::counter!("tgi_batch_inference_success", "method" => "prefill").increment(1); next_batch } // If we have an error, we discard the whole batch @@ -267,7 +273,7 @@ async fn prefill( generation_health.store(false, Ordering::SeqCst); let _ = client.clear_cache(Some(batch_id)).await; send_errors(err, entries); - metrics::increment_counter!("tgi_batch_inference_failure", "method" => "prefill"); + metrics::counter!("tgi_batch_inference_failure", "method" => "prefill").increment(1); None } } @@ -282,7 +288,7 @@ async fn decode( ) -> Option { let start_time = Instant::now(); let batch_ids: Vec = batches.iter().map(|b| b.id).collect(); - metrics::increment_counter!("tgi_batch_inference_count", "method" => "decode"); + metrics::counter!("tgi_batch_inference_count", "method" => "decode").increment(1); match client.decode(batches).await { Ok((generations, next_batch, timings)) => { @@ -297,13 +303,18 @@ async fn decode( let next_batch = filter_batch(client, next_batch, entries).await; if let Some(concat_duration) = timings.concat { - metrics::histogram!("tgi_batch_concat_duration", concat_duration.as_secs_f64(), "method" => "decode"); + metrics::histogram!("tgi_batch_concat_duration", "method" => "decode") + .record(concat_duration.as_secs_f64()); } - metrics::histogram!("tgi_batch_forward_duration", timings.forward.as_secs_f64(), "method" => "decode"); - metrics::histogram!("tgi_batch_decode_duration", timings.decode.as_secs_f64(), "method" => "decode"); - metrics::histogram!("tgi_batch_filter_duration", start_filtering_time.elapsed().as_secs_f64(), "method" => "decode"); - metrics::histogram!("tgi_batch_inference_duration", start_time.elapsed().as_secs_f64(), "method" => "decode"); - metrics::increment_counter!("tgi_batch_inference_success", "method" => "decode"); + metrics::histogram!("tgi_batch_forward_duration", "method" => "decode") + .record(timings.forward.as_secs_f64()); + metrics::histogram!("tgi_batch_decode_duration", "method" => "decode") + .record(timings.decode.as_secs_f64()); + metrics::histogram!("tgi_batch_filter_duration", "method" => "decode") + .record(start_filtering_time.elapsed().as_secs_f64()); + metrics::histogram!("tgi_batch_inference_duration", "method" => "decode") + .record(start_time.elapsed().as_secs_f64()); + metrics::counter!("tgi_batch_inference_success", "method" => "decode").increment(1); next_batch } // If we have an error, we discard the whole batch @@ -313,7 +324,7 @@ async fn decode( let _ = client.clear_cache(Some(id)).await; } send_errors(err, entries); - metrics::increment_counter!("tgi_batch_inference_failure", "method" => "decode"); + metrics::counter!("tgi_batch_inference_failure", "method" => "decode").increment(1); None } } @@ -371,7 +382,7 @@ fn filter_send_generations(generations: Vec, entries: &mut IntMap "dropped"); + metrics::counter!("tgi_request_failure", "err" => "dropped").increment(1); err }).unwrap_or(true); if stopped { @@ -387,7 +398,7 @@ fn send_responses( ) -> Result>>> { // Return directly if the channel is disconnected if entry.response_tx.is_closed() { - metrics::increment_counter!("tgi_request_failure", "err" => "dropped"); + metrics::counter!("tgi_request_failure", "err" => "dropped").increment(1); return Ok(true); } @@ -413,7 +424,7 @@ fn send_responses( // Create last Token let tokens_ = generation.tokens.expect("Non empty tokens in generation"); let n = tokens_.ids.len(); - metrics::histogram!("tgi_request_skipped_tokens", (n - 1) as f64); + metrics::histogram!("tgi_request_skipped_tokens").record((n - 1) as f64); let mut iterator = tokens_ .ids .into_iter() @@ -478,7 +489,7 @@ fn send_errors(error: ClientError, entries: &mut IntMap) { // Create and enter a span to link this function back to the entry let _send_error_span = info_span!(parent: entry.temp_span.as_ref().expect("batch_span is None. This is a bug."), "send_error").entered(); let err = InferError::GenerationError(error.to_string()); - metrics::increment_counter!("tgi_request_failure", "err" => "generation"); + metrics::counter!("tgi_request_failure", "err" => "generation").increment(1); tracing::error!("{err}"); // unwrap_or is valid here as we don't care if the receiver is gone. diff --git a/router/src/server.rs b/router/src/server.rs index db8b16ad0b1..4af8962e89e 100644 --- a/router/src/server.rs +++ b/router/src/server.rs @@ -185,7 +185,7 @@ pub(crate) async fn generate_internal( span: tracing::Span, ) -> Result<(HeaderMap, Json), (StatusCode, Json)> { let start_time = Instant::now(); - metrics::increment_counter!("tgi_request_count"); + metrics::counter!("tgi_request_count").increment(1); // Do not long ultra long inputs, like image payloads. tracing::debug!("Input: {}", &req.inputs[..1000.min(req.inputs.len())]); @@ -301,25 +301,15 @@ pub(crate) async fn generate_internal( ); // Metrics - metrics::increment_counter!("tgi_request_success"); - metrics::histogram!("tgi_request_duration", total_time.as_secs_f64()); - metrics::histogram!( - "tgi_request_validation_duration", - validation_time.as_secs_f64() - ); - metrics::histogram!("tgi_request_queue_duration", queue_time.as_secs_f64()); - metrics::histogram!( - "tgi_request_inference_duration", - inference_time.as_secs_f64() - ); - metrics::histogram!( - "tgi_request_mean_time_per_token_duration", - time_per_token.as_secs_f64() - ); - metrics::histogram!( - "tgi_request_generated_tokens", - response.generated_text.generated_tokens as f64 - ); + metrics::counter!("tgi_request_success").increment(1); + metrics::histogram!("tgi_request_duration").record(total_time.as_secs_f64()); + metrics::histogram!("tgi_request_validation_duration").record(validation_time.as_secs_f64()); + metrics::histogram!("tgi_request_queue_duration").record(queue_time.as_secs_f64()); + metrics::histogram!("tgi_request_inference_duration").record(inference_time.as_secs_f64()); + metrics::histogram!("tgi_request_mean_time_per_token_duration") + .record(time_per_token.as_secs_f64()); + metrics::histogram!("tgi_request_generated_tokens") + .record(response.generated_text.generated_tokens as f64); // Send response let mut output_text = response.generated_text.text; @@ -399,7 +389,7 @@ async fn generate_stream_internal( span: tracing::Span, ) -> (HeaderMap, impl Stream>) { let start_time = Instant::now(); - metrics::increment_counter!("tgi_request_count"); + metrics::counter!("tgi_request_count").increment(1); tracing::debug!("Input: {}", req.inputs); @@ -427,12 +417,12 @@ async fn generate_stream_internal( let best_of = req.parameters.best_of.unwrap_or(1); if best_of != 1 { let err = InferError::from(ValidationError::BestOfStream); - metrics::increment_counter!("tgi_request_failure", "err" => "validation"); + metrics::counter!("tgi_request_failure", "err" => "validation").increment(1); tracing::error!("{err}"); yield Ok(Event::from(err)); } else if req.parameters.decoder_input_details { let err = InferError::from(ValidationError::PrefillDetailsStream); - metrics::increment_counter!("tgi_request_failure", "err" => "validation"); + metrics::counter!("tgi_request_failure", "err" => "validation").increment(1); tracing::error!("{err}"); yield Ok(Event::from(err)); } else { @@ -500,13 +490,13 @@ async fn generate_stream_internal( span.record("seed", format!("{:?}", generated_text.seed)); // Metrics - metrics::increment_counter!("tgi_request_success"); - metrics::histogram!("tgi_request_duration", total_time.as_secs_f64()); - metrics::histogram!("tgi_request_validation_duration", validation_time.as_secs_f64()); - metrics::histogram!("tgi_request_queue_duration", queue_time.as_secs_f64()); - metrics::histogram!("tgi_request_inference_duration", inference_time.as_secs_f64()); - metrics::histogram!("tgi_request_mean_time_per_token_duration", time_per_token.as_secs_f64()); - metrics::histogram!("tgi_request_generated_tokens", generated_text.generated_tokens as f64); + metrics::counter!("tgi_request_success").increment(1); + metrics::histogram!("tgi_request_duration").record(total_time.as_secs_f64()); + metrics::histogram!("tgi_request_validation_duration").record(validation_time.as_secs_f64()); + metrics::histogram!("tgi_request_queue_duration").record(queue_time.as_secs_f64()); + metrics::histogram!("tgi_request_inference_duration").record(inference_time.as_secs_f64()); + metrics::histogram!("tgi_request_mean_time_per_token_duration").record(time_per_token.as_secs_f64()); + metrics::histogram!("tgi_request_generated_tokens").record(generated_text.generated_tokens as f64); // StreamResponse end_reached = true; @@ -553,7 +543,7 @@ async fn generate_stream_internal( // Skip if we already sent an error if !end_reached && !error { let err = InferError::IncompleteGeneration; - metrics::increment_counter!("tgi_request_failure", "err" => "incomplete"); + metrics::counter!("tgi_request_failure", "err" => "incomplete").increment(1); tracing::error!("{err}"); yield Ok(Event::from(err)); } @@ -604,7 +594,7 @@ async fn completions( Json(req): Json, ) -> Result)> { let span = tracing::Span::current(); - metrics::increment_counter!("tgi_request_count"); + metrics::counter!("tgi_request_count").increment(1); let CompletionRequest { max_tokens, @@ -625,7 +615,7 @@ async fn completions( // if suffix is present throw an error if req.suffix.is_some() { - metrics::increment_counter!("tgi_request_failure", "err" => "validation"); + metrics::counter!("tgi_request_failure", "err" => "validation").increment(1); return Err(( StatusCode::UNPROCESSABLE_ENTITY, Json(ErrorResponse { @@ -637,7 +627,7 @@ async fn completions( } if req.prompt.0.len() > info.max_client_batch_size { - metrics::increment_counter!("tgi_request_failure", "err" => "validation"); + metrics::counter!("tgi_request_failure", "err" => "validation").increment(1); return Err(( StatusCode::UNPROCESSABLE_ENTITY, Json(ErrorResponse { @@ -1009,7 +999,7 @@ async fn chat_completions( Json(req): Json, ) -> Result)> { let span = tracing::Span::current(); - metrics::increment_counter!("tgi_request_count"); + metrics::counter!("tgi_request_count").increment(1); let ChatRequest { logprobs, max_tokens, @@ -1039,7 +1029,7 @@ async fn chat_completions( // response_format and tools are mutually exclusive if response_format.is_some() && tools.as_ref().is_some() { - metrics::increment_counter!("tgi_request_failure", "err" => "validation"); + metrics::counter!("tgi_request_failure", "err" => "validation").increment(1); return Err(( StatusCode::UNPROCESSABLE_ENTITY, Json(ErrorResponse { @@ -1053,7 +1043,7 @@ async fn chat_completions( let tool_grammar = match ToolGrammar::apply(tools, tool_choice) { Ok(grammar) => grammar, Err(err) => { - metrics::increment_counter!("tgi_request_failure", "err" => "validation"); + metrics::counter!("tgi_request_failure", "err" => "validation").increment(1); tracing::error!("{err}"); return Err(( StatusCode::UNPROCESSABLE_ENTITY, @@ -1082,7 +1072,7 @@ async fn chat_completions( let inputs = match infer.apply_chat_template(messages, tools_grammar_prompt) { Ok(inputs) => inputs, Err(err) => { - metrics::increment_counter!("tgi_request_failure", "err" => "validation"); + metrics::counter!("tgi_request_failure", "err" => "validation").increment(1); tracing::error!("{err}"); return Err(( StatusCode::UNPROCESSABLE_ENTITY, @@ -1280,7 +1270,7 @@ async fn vertex_compatibility( Json(req): Json, ) -> Result)> { let span = tracing::Span::current(); - metrics::increment_counter!("tgi_request_count"); + metrics::counter!("tgi_request_count").increment(1); // check that theres at least one instance if req.instances.is_empty() { diff --git a/router/src/validation.rs b/router/src/validation.rs index 12cf2ab3678..07ad14c9cdb 100644 --- a/router/src/validation.rs +++ b/router/src/validation.rs @@ -157,7 +157,7 @@ impl Validation { )); } - metrics::histogram!("tgi_request_input_length", input_length as f64); + metrics::histogram!("tgi_request_input_length").record(input_length as f64); Ok((inputs, input_length, max_new_tokens)) } // Return inputs without validation @@ -384,7 +384,7 @@ impl Validation { ignore_eos_token: false, }; - metrics::histogram!("tgi_request_max_new_tokens", max_new_tokens as f64); + metrics::histogram!("tgi_request_max_new_tokens").record(max_new_tokens as f64); Ok(ValidGenerateRequest { inputs,