Skip to content

Commit

Permalink
feat: flush metrics on clean shutdown (#564)
Browse files Browse the repository at this point in the history
  • Loading branch information
sighphyre authored Oct 16, 2024
1 parent 32680a7 commit acfb38b
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 2 deletions.
31 changes: 31 additions & 0 deletions server/src/http/background_send_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,37 @@ fn decide_where_to_post(
}
}

pub async fn send_metrics_one_shot(
metrics_cache: Arc<MetricsCache>,
feature_refresher: Arc<FeatureRefresher>,
) {
let envs = metrics_cache.get_metrics_by_environment();
for (env, batch) in envs.iter() {
let (use_new_endpoint, token) =
decide_where_to_post(env, feature_refresher.tokens_to_refresh.clone());
let batches = metrics_cache.get_appropriately_sized_env_batches(batch);
trace!("Posting {} batches for {env}", batches.len());
for batch in batches {
if !batch.applications.is_empty() || !batch.metrics.is_empty() {
let result = if use_new_endpoint {
feature_refresher
.unleash_client
.send_bulk_metrics_to_client_endpoint(batch.clone(), &token)
.await
} else {
feature_refresher
.unleash_client
.send_batch_metrics(batch.clone())
.await
};
if let Err(edge_error) = result {
warn!("Shut down metrics flush failed with {edge_error:?}")
}
}
}
}
}

pub async fn send_metrics_task(
metrics_cache: Arc<MetricsCache>,
feature_refresher: Arc<FeatureRefresher>,
Expand Down
13 changes: 11 additions & 2 deletions server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ use unleash_types::client_metrics::ConnectVia;
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;

use tracing::info;
use unleash_edge::builder::build_caches_and_refreshers;
use unleash_edge::cli::{CliArgs, EdgeMode};
use unleash_edge::http::background_send_metrics::send_metrics_one_shot;
use unleash_edge::http::feature_refresher::FeatureRefresher;
use unleash_edge::metrics::client_metrics::MetricsCache;
use unleash_edge::offline::offline_hotload;
use unleash_edge::persistence::{persist_data, EdgePersistence};
Expand Down Expand Up @@ -148,7 +151,7 @@ async fn main() -> Result<(), anyhow::Error> {
tokio::select! {
_ = server.run() => {
tracing::info!("Actix is shutting down. Persisting data");
clean_shutdown(persistence.clone(), lazy_feature_cache.clone(), lazy_token_cache.clone()).await;
clean_shutdown(persistence.clone(), lazy_feature_cache.clone(), lazy_token_cache.clone(), metrics_cache_clone.clone(), feature_refresher.clone()).await;
tracing::info!("Actix was shutdown properly");
},
_ = refresher.start_refresh_features_background_task() => {
Expand Down Expand Up @@ -184,7 +187,7 @@ async fn main() -> Result<(), anyhow::Error> {
_ => tokio::select! {
_ = server.run() => {
tracing::info!("Actix is shutting down. Persisting data");
clean_shutdown(persistence, lazy_feature_cache.clone(), lazy_token_cache.clone()).await;
clean_shutdown(persistence, lazy_feature_cache.clone(), lazy_token_cache.clone(), metrics_cache_clone.clone(), feature_refresher.clone()).await;
tracing::info!("Actix was shutdown properly");

}
Expand All @@ -199,6 +202,8 @@ async fn clean_shutdown(
persistence: Option<Arc<dyn EdgePersistence>>,
feature_cache: Arc<DashMap<String, ClientFeatures>>,
token_cache: Arc<DashMap<String, EdgeToken>>,
metrics_cache: Arc<MetricsCache>,
feature_refresher: Option<Arc<FeatureRefresher>>,
) {
let tokens: Vec<EdgeToken> = token_cache
.iter()
Expand All @@ -225,4 +230,8 @@ async fn clean_shutdown(
.for_each(|failed_save| tracing::error!("Failed backing up: {failed_save:?}"));
}
}
if let Some(feature_refresher) = feature_refresher {
info!("Connected to an upstream, flushing last set of metrics");
send_metrics_one_shot(metrics_cache, feature_refresher).await;
}
}

0 comments on commit acfb38b

Please sign in to comment.