diff --git a/Cargo.lock b/Cargo.lock index bf5967a..2102785 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -61,7 +61,7 @@ checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" [[package]] name = "amp-apiserver" -version = "0.6.7" +version = "0.6.8" dependencies = [ "amp-common", "amp-resources", @@ -121,7 +121,7 @@ dependencies = [ [[package]] name = "amp-controllers" -version = "0.6.7" +version = "0.6.8" dependencies = [ "amp-common", "amp-resolver", @@ -146,7 +146,7 @@ dependencies = [ [[package]] name = "amp-crdgen" -version = "0.6.7" +version = "0.6.8" dependencies = [ "amp-common", "clap", @@ -158,7 +158,7 @@ dependencies = [ [[package]] name = "amp-resolver" -version = "0.6.7" +version = "0.6.8" dependencies = [ "amp-common", "amp-resources", @@ -172,7 +172,7 @@ dependencies = [ [[package]] name = "amp-resources" -version = "0.6.7" +version = "0.6.8" dependencies = [ "amp-common", "anyhow", @@ -192,7 +192,7 @@ dependencies = [ [[package]] name = "amp-syncer" -version = "0.6.7" +version = "0.6.8" dependencies = [ "amp-common", "async-nats", diff --git a/Cargo.toml b/Cargo.toml index 6a30e96..c35aa4b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace.package] -version = "0.6.7" +version = "0.6.8" edition = "2021" license = "Apache-2.0" repository = "https://github.com/amphitheatre-app/amphitheatre" diff --git a/apiserver/src/handlers/actor.rs b/apiserver/src/handlers/actor.rs index 38f2084..1c6325a 100644 --- a/apiserver/src/handlers/actor.rs +++ b/apiserver/src/handlers/actor.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; use std::convert::Infallible; use std::sync::Arc; @@ -22,14 +23,18 @@ use axum::response::sse::{Event, KeepAlive}; use axum::response::{IntoResponse, Sse}; use axum::Json; -use futures::{AsyncBufReadExt, Stream}; -use k8s_openapi::api::core::v1::Pod; -use kube::api::{ListParams, LogParams}; -use kube::{Api, ResourceExt}; -use tokio_stream::StreamExt; +use futures::{AsyncBufReadExt, Stream, StreamExt, TryStreamExt}; +use k8s_openapi::api::core::v1::{ContainerStatus, Pod}; +use kube::api::LogParams; +use kube::Api; +use tokio::sync::mpsc::Sender; +use tokio::sync::RwLock; +use tokio_stream::wrappers::ReceiverStream; use tracing::{debug, info}; use uuid::Uuid; +use kube::runtime::{watcher, WatchStreamExt}; + use super::Result; use crate::context::Context; use crate::errors::ApiError; @@ -91,58 +96,88 @@ pub async fn logs( State(ctx): State>, Path((pid, name)): Path<(Uuid, String)>, ) -> Sse>> { - info!("Get logs of actor {}/{}", pid, name); + info!("Start to tail the log stream of actor {} in {}...", name, pid); + let (sender, receiver) = tokio::sync::mpsc::channel(100); + + // Watch the status of the pod, if the pod is running, then create a stream for it. + tokio::spawn(async move { + let api: Api = Api::namespaced(ctx.k8s.clone(), &format!("amp-{pid}")); + let config = watcher::Config::default().labels(&format!("app.kubernetes.io/name={name}")); + let mut watcher = watcher(api.clone(), config).applied_objects().boxed(); + let subs = Arc::new(RwLock::new(HashSet::new())); + + while let Some(pod) = watcher.try_next().await.unwrap() { + if pod.status.is_none() { + continue; + } - let api: Api = Api::namespaced(ctx.k8s.clone(), &format!("amp-{pid}")); - let param = ListParams { - label_selector: Some(format!("app.kubernetes.io/managed-by=Amphitheatre, app.kubernetes.io/name={name}")), - ..Default::default() - }; - let pods = api.list(¶m).await.unwrap(); + let status = pod.status.unwrap(); + let pod_name = pod.metadata.name.unwrap(); - // Create a stream that combines logs from all specified containers - let mut streams = Vec::new(); + // check the init container status, if it's not running, then skip it. + if let Some(init_containers) = status.init_container_statuses { + for status in init_containers { + log(&api, &pod_name, &status, &sender, subs.clone()).await; + } + } - for pod in pods { - if pod.spec.is_none() { - continue; + // check the container status, if it's not running, then skip it. + if let Some(containers) = status.container_statuses { + for status in containers { + log(&api, &pod_name, &status, &sender, subs.clone()).await; + } + } } + }); - let pod_name = pod.name_any(); - let spec = pod.spec.unwrap(); + let stream = ReceiverStream::new(receiver); + let stream = stream.map(|line| Event::default().data(line)).map(Ok); - if let Some(init_containers) = spec.init_containers { - for container in init_containers { - debug!("init container: {}", container.name); - streams.push(stream(&api, &pod_name, &container.name).await); - } - } - for container in spec.containers { - debug!("container: {}", container.name); - streams.push(stream(&api, &pod_name, &container.name).await); + Sse::new(stream).keep_alive(KeepAlive::default()) +} + +async fn log( + api: &Api, + pod: &str, + status: &ContainerStatus, + sender: &Sender, + subs: Arc>>, +) { + let pod = pod.to_string(); + let name = status.name.clone(); + let subscription_id: String = format!("{pod}-{name}", pod = pod, name = name); + + debug!("container status: {:?}", status); + + // If the container is not running, skip it. + if let Some(state) = &status.state { + if state.running.is_none() { + debug!("Skip log stream of container {} because it's not running.", name); + return; } } + // If job handle already exists in subscribe list, skip it. + if subs.read().await.contains(&subscription_id) { + debug!("Skip log stream of container {} because it's already subscribed.", name); + return; + } - let combined_stream = futures::stream::select_all(streams); - Sse::new(combined_stream).keep_alive(KeepAlive::default()) -} + let api = api.clone(); + let sender = sender.clone(); -/// Get the log stream of a container -async fn stream( - api: &Api, - name: &str, - container: &str, -) -> impl Stream> { - let params = LogParams { container: Some(container.into()), follow: true, timestamps: true, ..Default::default() }; - let stream = api.log_stream(name, ¶ms).await.unwrap().lines(); - - stream - .map(|result| match result { - Ok(line) => Event::default().data(line), - Err(err) => Event::default().event("error").data(err.to_string()), - }) - .map(Ok) - // .throttle(Duration::from_secs(1)) + tokio::spawn(async move { + let params = LogParams { container: Some(name.clone()), follow: true, timestamps: true, ..Default::default() }; + let mut stream = + api.log_stream(&pod, ¶ms).await.map_err(|e| ApiError::KubernetesError(e.to_string())).unwrap().lines(); + + info!("Start to receive the log stream of container {} in {}...", name, pod); + while let Some(line) = stream.try_next().await.unwrap() { + let _ = sender.send(line).await; + } + }); + + // save the job handle to subscribe list. + subs.write().await.insert(subscription_id); } /// Returns a actor's info, including environments, volumes... diff --git a/apiserver/src/services/playbook.rs b/apiserver/src/services/playbook.rs index 54edc6a..176c8fb 100644 --- a/apiserver/src/services/playbook.rs +++ b/apiserver/src/services/playbook.rs @@ -18,7 +18,6 @@ use amp_common::resource::{Playbook as PlaybookResource, PlaybookSpec}; use amp_resources::playbook; use chrono::Utc; use kube::ResourceExt; -use tracing::debug; use uuid::Uuid; use crate::context::Context; @@ -70,12 +69,8 @@ impl PlaybookService { }, ); - debug!("CreatePlaybookRequest: {:#?}", req); - debug!("PlaybookResource: {:#?}", resource); - let playbook = playbook::create(&ctx.k8s, &resource).await.map_err(|err| ApiError::KubernetesError(err.to_string()))?; - debug!("Created playbook custom response: {:#?}", playbook); Ok(PlaybookResponse { id: playbook.name_any(),