diff --git a/Cargo.lock b/Cargo.lock index 67ac844..081e08f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -62,7 +62,7 @@ checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" [[package]] name = "amp-apiserver" -version = "0.8.11" +version = "0.8.12" dependencies = [ "amp-common", "amp-resources", @@ -94,7 +94,7 @@ dependencies = [ [[package]] name = "amp-builder" -version = "0.8.11" +version = "0.8.12" dependencies = [ "amp-common", "amp-resources", @@ -136,7 +136,7 @@ dependencies = [ [[package]] name = "amp-controllers" -version = "0.8.11" +version = "0.8.12" dependencies = [ "amp-common", "amp-resolver", @@ -162,7 +162,7 @@ dependencies = [ [[package]] name = "amp-crdgen" -version = "0.8.11" +version = "0.8.12" dependencies = [ "amp-common", "clap", @@ -174,7 +174,7 @@ dependencies = [ [[package]] name = "amp-resolver" -version = "0.8.11" +version = "0.8.12" dependencies = [ "amp-common", "amp-resources", @@ -188,13 +188,14 @@ dependencies = [ [[package]] name = "amp-resources" -version = "0.8.11" +version = "0.8.12" dependencies = [ "amp-common", "anyhow", "k8s-metrics", "k8s-openapi", "kube", + "lazy_static", "serde", "serde_json", "serde_yaml", @@ -208,7 +209,7 @@ dependencies = [ [[package]] name = "amp-syncer" -version = "0.8.11" +version = "0.8.12" dependencies = [ "amp-common", "async-nats", @@ -225,7 +226,7 @@ dependencies = [ [[package]] name = "amp-workflow" -version = "0.8.11" +version = "0.8.12" dependencies = [ "amp-builder", "amp-common", diff --git a/Cargo.toml b/Cargo.toml index 89a29e8..c094c76 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace.package] -version = "0.8.11" +version = "0.8.12" edition = "2021" license = "Apache-2.0" repository = "https://github.com/amphitheatre-app/amphitheatre" @@ -36,6 +36,7 @@ dotenv = "0.15.0" futures = "0.3" kube = { version = "0.88.0", default-features = false, features = ["runtime", "derive", "rustls-tls"] } k8s-openapi = { version = "0.21.0", default-features = false, features = ["schemars", "v1_28"] } +lazy_static = "1.4.0" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_yaml = "0.9.31" diff --git a/apiserver/src/handlers/actor.rs b/apiserver/src/handlers/actor.rs index 5da055f..1b4c7c5 100644 --- a/apiserver/src/handlers/actor.rs +++ b/apiserver/src/handlers/actor.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; use std::convert::Infallible; use std::sync::Arc; @@ -23,22 +22,17 @@ use axum::response::sse::{Event, KeepAlive}; use axum::response::{IntoResponse, Sse}; use axum::Json; -use futures::{AsyncBufReadExt, Stream, StreamExt, TryStreamExt}; -use k8s_openapi::api::core::v1::{ContainerStatus, Pod}; -use kube::api::LogParams; -use kube::Api; -use tokio::sync::mpsc::Sender; -use tokio::sync::RwLock; +use futures::{Stream, StreamExt}; use tokio_stream::wrappers::ReceiverStream; -use tracing::{debug, info}; -use uuid::Uuid; -use kube::runtime::{watcher, WatchStreamExt}; +use tracing::info; +use uuid::Uuid; use super::Result; use crate::context::Context; use crate::errors::ApiError; use crate::services::actor::ActorService; +use crate::services::logger::Logger; // The Actors Service Handlers. // See [API Documentation: actor](https://docs.amphitheatre.app/api/actor) @@ -99,92 +93,17 @@ pub async fn logs( info!("Start to tail the log stream of actor {} in {}...", name, pid); let (sender, receiver) = tokio::sync::mpsc::channel(100); - // Watch the status of the pod, if the pod is running, then create a stream for it. + // Start to watch the status of the pod. tokio::spawn(async move { - let api: Api = Api::namespaced(ctx.k8s.clone(), &format!("amp-{pid}")); - let config = watcher::Config::default().labels(&format!("amphitheatre.app/character={name}")); - let mut watcher = watcher(api.clone(), config).applied_objects().boxed(); - let subs = Arc::new(RwLock::new(HashSet::new())); - - while let Some(pod) = watcher.try_next().await.unwrap() { - if pod.status.is_none() { - continue; - } - - let status = pod.status.unwrap(); - let pod_name = pod.metadata.name.unwrap(); - - // check the init container status, if it's not running, then skip it. - if let Some(init_containers) = status.init_container_statuses { - for status in init_containers { - log(&api, &pod_name, &status, &sender, subs.clone()).await; - } - } - - // check the container status, if it's not running, then skip it. - if let Some(containers) = status.container_statuses { - for status in containers { - log(&api, &pod_name, &status, &sender, subs.clone()).await; - } - } - } + Logger::new(ctx.k8s.clone(), sender.clone(), pid, name).start().await; }); let stream = ReceiverStream::new(receiver); - let stream = stream.map(|line| Event::default().data(line)).map(Ok); + let stream = stream.map(Ok); Sse::new(stream).keep_alive(KeepAlive::default()) } -async fn log( - api: &Api, - pod: &str, - status: &ContainerStatus, - sender: &Sender, - subs: Arc>>, -) { - let pod = pod.to_string(); - let name = status.name.clone(); - let subscription_id: String = format!("{pod}-{name}", pod = pod, name = name); - - debug!("container status: {:?}", status); - - // If the container is not running, skip it. - if let Some(state) = &status.state { - if state.running.is_none() { - debug!("Skip log stream of container {} because it's not running.", name); - return; - } - } - // If job handle already exists in subscribe list, skip it. - if subs.read().await.contains(&subscription_id) { - debug!("Skip log stream of container {} because it's already subscribed.", name); - return; - } - - let api = api.clone(); - let sender = sender.clone(); - - tokio::spawn(async move { - let params = LogParams { - container: Some(name.clone()), - follow: true, - tail_lines: Some(100), - timestamps: true, - ..Default::default() - }; - let mut stream = api.log_stream(&pod, ¶ms).await.map_err(ApiError::KubernetesError).unwrap().lines(); - - info!("Start to receive the log stream of container {} in {}...", name, pod); - while let Some(line) = stream.try_next().await.unwrap() { - let _ = sender.send(line).await; - } - }); - - // save the job handle to subscribe list. - subs.write().await.insert(subscription_id); -} - /// Returns a actor's info, including environments, volumes... #[utoipa::path( get, path = "/v1/actors/{pid}/{name}/info", diff --git a/apiserver/src/services/logger.rs b/apiserver/src/services/logger.rs new file mode 100644 index 0000000..31ef6f5 --- /dev/null +++ b/apiserver/src/services/logger.rs @@ -0,0 +1,165 @@ +// Copyright (c) The Amphitheatre Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use axum::response::sse::Event; +use futures::AsyncBufReadExt; +use futures::StreamExt; +use futures::TryStreamExt; +use k8s_openapi::api::core::v1::ContainerStatus; +use k8s_openapi::api::core::v1::Pod; +use kube::api::LogParams; +use kube::runtime::watcher::Config; +use kube::runtime::{watcher, WatchStreamExt}; +use kube::Api; +use kube::ResourceExt; +use tokio::sync::mpsc::Sender; +use tokio::task::JoinHandle; +use tracing::{debug, error, info, warn}; +use uuid::Uuid; + +pub struct Logger { + api: Api, // The Kubernetes API client. + sender: Sender, // The sender of the log stream. + config: Config, // The configuration of watcher. + watches: HashMap>, // The map of watching containers. +} + +impl Logger { + /// Creates a new logger. + pub fn new(client: kube::Client, sender: Sender, playbook: Uuid, actor: String) -> Self { + let api: Api = Api::namespaced(client, &format!("amp-{playbook}")); + let label_selector = format!("amphitheatre.app/character={actor}"); + let config = Config::default().labels(&label_selector); + + Self { api, sender, config, watches: HashMap::new() } + } + + /// Starts the logger. + pub async fn start(&mut self) { + let watcher = watcher(self.api.clone(), self.config.clone()); + let mut watcher = watcher.touched_objects().boxed(); + + while let Some(pod) = watcher.try_next().await.unwrap() { + let pod_name = pod.name_any(); + if let Some(status) = pod.status { + // Unsubscribe all the watches of the pod if pod is terminating. + // and then break the loop to exit the function. + if status.phase == Some("Terminating".into()) { + self.unsubscribe_all(&pod_name); + return; + } + + self.watches(&pod_name, status.init_container_statuses).await; + self.watches(&pod_name, status.container_statuses).await; + } + } + } + + /// Watches the containers of the pod. + async fn watches(&mut self, pod: &str, containers: Option>) { + if containers.is_none() { + warn!("No container statuses found in pod {}.", pod); + return; + } + + // Iterate the containers of the pod. + for container in containers.unwrap() { + if container.state.is_none() { + warn!("No state found in container {} of {}.", container.name, pod); + continue; + } + let state = container.state.unwrap(); + + // If the container is running, then subscribe the log stream. + if state.running.is_some_and(|s| s.started_at.is_some()) { + let key = &format!("{}-{}", pod, container.name); + if self.watches.contains_key(key) { + debug!("Skip container {} of {} because it's watching.", &container.name, pod); + continue; + } + self.subscribe(pod, &container.name).await; + } + + // If the container is terminated, then unsubscribe the log stream. + if state.terminated.is_some_and(|s| s.finished_at.is_some()) { + debug!("Container {} in {} has been terminated.", container.name, pod); + self.unsubscribe(pod, &container.name); + } + } + } + + /// Subscribes the log stream of the container. + async fn subscribe(&mut self, pod: &str, container: &str) { + let key = format!("{}-{}", pod, container); + + let api = self.api.clone(); + let sender = self.sender.clone(); + let container = container.to_string(); + let pod = pod.to_string(); + + let task = tokio::spawn(async move { + Self::tail(api, sender, pod, container).await; + }); + + self.watches.insert(key, task); + } + + /// Tails the log stream of the container. + async fn tail(api: Api, sender: Sender, pod: String, container: String) { + let params = LogParams { + container: Some(container.to_string()), + follow: true, + tail_lines: Some(100), + timestamps: true, + ..Default::default() + }; + + match api.log_stream(&pod, ¶ms).await { + Ok(stream) => { + info!("Start to receive the log stream of container {} in {}...", container, pod); + let mut lines = stream.lines(); + while let Ok(Some(line)) = lines.try_next().await { + _ = sender.send(Event::default().data(line)).await; + } + } + Err(err) => { + let message = + format!("Some error occurred while log stream for container {} in {}: {}.", container, pod, err); + error!("{}", message); + _ = sender.send(Event::default().data(message)).await; + } + } + } + + /// Unsubscribes all the log streams. + fn unsubscribe_all(&mut self, pod: &str) { + for (key, task) in self.watches.drain() { + if key.starts_with(pod) { + info!("Unsubscribe the log stream of container {} in {}.", key, pod); + task.abort(); + } + } + } + + /// Unsubscribes the log stream of the container. + fn unsubscribe(&mut self, pod: &str, container: &str) { + let key = &format!("{}-{}", pod, container); + if let Some(task) = self.watches.remove(key) { + info!("Unsubscribe the log stream of container {} in {}.", container, pod); + task.abort(); + } + } +} diff --git a/apiserver/src/services/mod.rs b/apiserver/src/services/mod.rs index c2d6e5c..bd732dc 100644 --- a/apiserver/src/services/mod.rs +++ b/apiserver/src/services/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. pub mod actor; +pub mod logger; pub mod playbook; pub type Result = std::result::Result;