diff --git a/Cargo.lock b/Cargo.lock index d2cdc7e..31baa55 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -61,7 +61,7 @@ checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" [[package]] name = "amp-apiserver" -version = "0.6.5" +version = "0.6.6" dependencies = [ "amp-common", "amp-resources", @@ -121,7 +121,7 @@ dependencies = [ [[package]] name = "amp-controllers" -version = "0.6.5" +version = "0.6.6" dependencies = [ "amp-common", "amp-resolver", @@ -146,7 +146,7 @@ dependencies = [ [[package]] name = "amp-crdgen" -version = "0.6.5" +version = "0.6.6" dependencies = [ "amp-common", "clap", @@ -158,7 +158,7 @@ dependencies = [ [[package]] name = "amp-resolver" -version = "0.6.5" +version = "0.6.6" dependencies = [ "amp-common", "amp-resources", @@ -172,7 +172,7 @@ dependencies = [ [[package]] name = "amp-resources" -version = "0.6.5" +version = "0.6.6" dependencies = [ "amp-common", "anyhow", @@ -192,7 +192,7 @@ dependencies = [ [[package]] name = "amp-syncer" -version = "0.6.5" +version = "0.6.6" dependencies = [ "amp-common", "async-nats", diff --git a/Cargo.toml b/Cargo.toml index 613c0d1..b895900 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace.package] -version = "0.6.5" +version = "0.6.6" edition = "2021" license = "Apache-2.0" repository = "https://github.com/amphitheatre-app/amphitheatre" diff --git a/apiserver/src/handlers/actor.rs b/apiserver/src/handlers/actor.rs index b63511a..38f2084 100644 --- a/apiserver/src/handlers/actor.rs +++ b/apiserver/src/handlers/actor.rs @@ -14,7 +14,6 @@ use std::convert::Infallible; use std::sync::Arc; -use std::time::Duration; use amp_common::sync::Synchronization; use axum::extract::{Path, State}; @@ -23,13 +22,12 @@ use axum::response::sse::{Event, KeepAlive}; use axum::response::{IntoResponse, Sse}; use axum::Json; -use futures::AsyncBufReadExt; -use futures::Stream; -use tokio_stream::StreamExt; - +use futures::{AsyncBufReadExt, Stream}; use k8s_openapi::api::core::v1::Pod; -use kube::api::LogParams; -use kube::Api; +use kube::api::{ListParams, LogParams}; +use kube::{Api, ResourceExt}; +use tokio_stream::StreamExt; +use tracing::{debug, info}; use uuid::Uuid; use super::Result; @@ -93,22 +91,58 @@ pub async fn logs( State(ctx): State>, Path((pid, name)): Path<(Uuid, String)>, ) -> Sse>> { - let api: Api = Api::namespaced(ctx.k8s.clone(), &pid.to_string()); - let params = LogParams::default(); - - let stream = api - .log_stream(&name, ¶ms) - .await - .unwrap() - .lines() + info!("Get logs of actor {}/{}", pid, name); + + let api: Api = Api::namespaced(ctx.k8s.clone(), &format!("amp-{pid}")); + let param = ListParams { + label_selector: Some(format!("app.kubernetes.io/managed-by=Amphitheatre, app.kubernetes.io/name={name}")), + ..Default::default() + }; + let pods = api.list(¶m).await.unwrap(); + + // Create a stream that combines logs from all specified containers + let mut streams = Vec::new(); + + for pod in pods { + if pod.spec.is_none() { + continue; + } + + let pod_name = pod.name_any(); + let spec = pod.spec.unwrap(); + + if let Some(init_containers) = spec.init_containers { + for container in init_containers { + debug!("init container: {}", container.name); + streams.push(stream(&api, &pod_name, &container.name).await); + } + } + for container in spec.containers { + debug!("container: {}", container.name); + streams.push(stream(&api, &pod_name, &container.name).await); + } + } + + let combined_stream = futures::stream::select_all(streams); + Sse::new(combined_stream).keep_alive(KeepAlive::default()) +} + +/// Get the log stream of a container +async fn stream( + api: &Api, + name: &str, + container: &str, +) -> impl Stream> { + let params = LogParams { container: Some(container.into()), follow: true, timestamps: true, ..Default::default() }; + let stream = api.log_stream(name, ¶ms).await.unwrap().lines(); + + stream .map(|result| match result { Ok(line) => Event::default().data(line), Err(err) => Event::default().event("error").data(err.to_string()), }) .map(Ok) - .throttle(Duration::from_secs(1)); - - Sse::new(stream).keep_alive(KeepAlive::default()) + // .throttle(Duration::from_secs(1)) } /// Returns a actor's info, including environments, volumes...