Skip to content

Commit

Permalink
Log output is sequenced by workflow and optimized for closed phases #90
Browse files Browse the repository at this point in the history
  • Loading branch information
wangeguo committed Oct 30, 2023
1 parent 54280ae commit b592da2
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 59 deletions.
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[workspace.package]
version = "0.6.7"
version = "0.6.8"
edition = "2021"
license = "Apache-2.0"
repository = "https://github.com/amphitheatre-app/amphitheatre"
Expand Down
129 changes: 82 additions & 47 deletions apiserver/src/handlers/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::convert::Infallible;
use std::sync::Arc;

Expand All @@ -22,14 +23,18 @@ use axum::response::sse::{Event, KeepAlive};
use axum::response::{IntoResponse, Sse};
use axum::Json;

use futures::{AsyncBufReadExt, Stream};
use k8s_openapi::api::core::v1::Pod;
use kube::api::{ListParams, LogParams};
use kube::{Api, ResourceExt};
use tokio_stream::StreamExt;
use futures::{AsyncBufReadExt, Stream, StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::{ContainerStatus, Pod};
use kube::api::LogParams;
use kube::Api;
use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, info};
use uuid::Uuid;

use kube::runtime::{watcher, WatchStreamExt};

use super::Result;
use crate::context::Context;
use crate::errors::ApiError;
Expand Down Expand Up @@ -91,58 +96,88 @@ pub async fn logs(
State(ctx): State<Arc<Context>>,
Path((pid, name)): Path<(Uuid, String)>,
) -> Sse<impl Stream<Item = axum::response::Result<Event, Infallible>>> {
info!("Get logs of actor {}/{}", pid, name);
info!("Start to tail the log stream of actor {} in {}...", name, pid);
let (sender, receiver) = tokio::sync::mpsc::channel(100);

// Watch the status of the pod, if the pod is running, then create a stream for it.
tokio::spawn(async move {
let api: Api<Pod> = Api::namespaced(ctx.k8s.clone(), &format!("amp-{pid}"));
let config = watcher::Config::default().labels(&format!("app.kubernetes.io/name={name}"));
let mut watcher = watcher(api.clone(), config).applied_objects().boxed();
let subs = Arc::new(RwLock::new(HashSet::new()));

while let Some(pod) = watcher.try_next().await.unwrap() {
if pod.status.is_none() {
continue;
}

let api: Api<Pod> = Api::namespaced(ctx.k8s.clone(), &format!("amp-{pid}"));
let param = ListParams {
label_selector: Some(format!("app.kubernetes.io/managed-by=Amphitheatre, app.kubernetes.io/name={name}")),
..Default::default()
};
let pods = api.list(&param).await.unwrap();
let status = pod.status.unwrap();
let pod_name = pod.metadata.name.unwrap();

// Create a stream that combines logs from all specified containers
let mut streams = Vec::new();
// check the init container status, if it's not running, then skip it.
if let Some(init_containers) = status.init_container_statuses {
for status in init_containers {
log(&api, &pod_name, &status, &sender, subs.clone()).await;
}
}

for pod in pods {
if pod.spec.is_none() {
continue;
// check the container status, if it's not running, then skip it.
if let Some(containers) = status.container_statuses {
for status in containers {
log(&api, &pod_name, &status, &sender, subs.clone()).await;
}
}
}
});

let pod_name = pod.name_any();
let spec = pod.spec.unwrap();
let stream = ReceiverStream::new(receiver);
let stream = stream.map(|line| Event::default().data(line)).map(Ok);

if let Some(init_containers) = spec.init_containers {
for container in init_containers {
debug!("init container: {}", container.name);
streams.push(stream(&api, &pod_name, &container.name).await);
}
}
for container in spec.containers {
debug!("container: {}", container.name);
streams.push(stream(&api, &pod_name, &container.name).await);
Sse::new(stream).keep_alive(KeepAlive::default())
}

async fn log(
api: &Api<Pod>,
pod: &str,
status: &ContainerStatus,
sender: &Sender<String>,
subs: Arc<RwLock<HashSet<String>>>,
) {
let pod = pod.to_string();
let name = status.name.clone();
let subscription_id: String = format!("{pod}-{name}", pod = pod, name = name);

debug!("container status: {:?}", status);

// If the container is not running, skip it.
if let Some(state) = &status.state {
if state.running.is_none() {
debug!("Skip log stream of container {} because it's not running.", name);
return;
}
}
// If job handle already exists in subscribe list, skip it.
if subs.read().await.contains(&subscription_id) {
debug!("Skip log stream of container {} because it's already subscribed.", name);
return;
}

let combined_stream = futures::stream::select_all(streams);
Sse::new(combined_stream).keep_alive(KeepAlive::default())
}
let api = api.clone();
let sender = sender.clone();

/// Get the log stream of a container
async fn stream(
api: &Api<Pod>,
name: &str,
container: &str,
) -> impl Stream<Item = axum::response::Result<Event, Infallible>> {
let params = LogParams { container: Some(container.into()), follow: true, timestamps: true, ..Default::default() };
let stream = api.log_stream(name, &params).await.unwrap().lines();

stream
.map(|result| match result {
Ok(line) => Event::default().data(line),
Err(err) => Event::default().event("error").data(err.to_string()),
})
.map(Ok)
// .throttle(Duration::from_secs(1))
tokio::spawn(async move {
let params = LogParams { container: Some(name.clone()), follow: true, timestamps: true, ..Default::default() };
let mut stream =
api.log_stream(&pod, &params).await.map_err(|e| ApiError::KubernetesError(e.to_string())).unwrap().lines();

info!("Start to receive the log stream of container {} in {}...", name, pod);
while let Some(line) = stream.try_next().await.unwrap() {
let _ = sender.send(line).await;
}
});

// save the job handle to subscribe list.
subs.write().await.insert(subscription_id);
}

/// Returns a actor's info, including environments, volumes...
Expand Down
5 changes: 0 additions & 5 deletions apiserver/src/services/playbook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use amp_common::resource::{Playbook as PlaybookResource, PlaybookSpec};
use amp_resources::playbook;
use chrono::Utc;
use kube::ResourceExt;
use tracing::debug;
use uuid::Uuid;

use crate::context::Context;
Expand Down Expand Up @@ -70,12 +69,8 @@ impl PlaybookService {
},
);

debug!("CreatePlaybookRequest: {:#?}", req);
debug!("PlaybookResource: {:#?}", resource);

let playbook =
playbook::create(&ctx.k8s, &resource).await.map_err(|err| ApiError::KubernetesError(err.to_string()))?;
debug!("Created playbook custom response: {:#?}", playbook);

Ok(PlaybookResponse {
id: playbook.name_any(),
Expand Down

0 comments on commit b592da2

Please sign in to comment.