diff --git a/controllers/src/actor_controller.rs b/controllers/src/actor_controller.rs index ec384bc..f06f30b 100644 --- a/controllers/src/actor_controller.rs +++ b/controllers/src/actor_controller.rs @@ -91,10 +91,9 @@ async fn apply(actor: &Actor, ctx: &Arc, recorder: &Recorder) -> Result } async fn init(actor: &Actor, ctx: &Arc, recorder: &Recorder) -> Result { - trace(recorder, format!("Building the image for Actor {}", actor.name_any())) - .await - .map_err(Error::ResourceError)?; actor::patch_status(&ctx.k8s, actor, ActorState::building()).await.map_err(Error::ResourceError)?; + trace(recorder, format!("Building the image for Actor {}", actor.name_any())).await; + Ok(Action::await_change()) } @@ -133,17 +132,13 @@ async fn build(actor: &Actor, ctx: &Arc, recorder: &Recorder) -> Result match builder::exists(&ctx.k8s, actor).await.map_err(Error::ResourceError)? { true => { // Build job already exists, update it if there are new changes - let message = format!("Try to refresh an existing build Job {}", actor.spec.name()); - trace(recorder, message).await.map_err(Error::ResourceError)?; - + trace(recorder, format!("Try to refresh an existing build Job {}", actor.spec.name())).await; builder::update(&ctx.k8s, actor).await.map_err(Error::ResourceError)?; } false => { // Create a new build job - let message = format!("Create new build Job: {}", actor.spec.name()); - trace(recorder, message).await.map_err(Error::ResourceError)?; - builder::create(&ctx.k8s, actor).await.map_err(Error::ResourceError)?; + trace(recorder, format!("Created new build Job: {}", actor.spec.name())).await; } } @@ -154,34 +149,26 @@ async fn build(actor: &Actor, ctx: &Arc, recorder: &Recorder) -> Result // Once the image is built, it is deployed to the cluster with the // appropriate resource type (e.g. Deployment or StatefulSet). - let message = "The images builded, Running"; - trace(recorder, message).await.map_err(Error::ResourceError)?; - let condition = ActorState::running(true, "AutoRun", None); actor::patch_status(&ctx.k8s, actor, condition).await.map_err(Error::ResourceError)?; + trace(recorder, "The images builded, Running").await; Ok(Action::await_change()) } async fn run(actor: &Actor, ctx: &Arc, recorder: &Recorder) -> Result { - trace(recorder, format!("Try to deploying the resources for Actor {}", actor.name_any())) - .await - .map_err(Error::ResourceError)?; + trace(recorder, format!("Try to deploying the resources for Actor {}", actor.name_any())).await; match deployment::exists(&ctx.k8s, actor).await.map_err(Error::ResourceError)? { true => { // Deployment already exists, update it if there are new changes - let message = format!("Try to refresh an existing Deployment {}", actor.name_any()); - trace(recorder, message).await.map_err(Error::ResourceError)?; - + trace(recorder, format!("Try to refresh an existing Deployment {}", actor.name_any())).await; deployment::update(&ctx.k8s, actor).await.map_err(Error::ResourceError)?; } false => { // Create a new Deployment - trace(recorder, format!("Create new Deployment: {}", actor.name_any())) - .await - .map_err(Error::ResourceError)?; deployment::create(&ctx.k8s, actor).await.map_err(Error::ResourceError)?; + trace(recorder, format!("Created new Deployment: {}", actor.name_any())).await; } } @@ -189,17 +176,13 @@ async fn run(actor: &Actor, ctx: &Arc, recorder: &Recorder) -> Result { // Service already exists, update it if there are new changes - let message = format!("Try to refresh an existing Service {}", actor.name_any()); - trace(recorder, message).await.map_err(Error::ResourceError)?; - + trace(recorder, format!("Try to refresh an existing Service {}", actor.name_any())).await; service::update(&ctx.k8s, actor).await.map_err(Error::ResourceError)?; } false => { // Create a new Service - let message = format!("Create new Service: {}", actor.name_any()); - trace(recorder, message).await.map_err(Error::ResourceError)?; - service::create(&ctx.k8s, actor).await.map_err(Error::ResourceError)?; + trace(recorder, format!("Created new Service: {}", actor.name_any())).await; } } } @@ -218,8 +201,7 @@ pub async fn cleanup(actor: &Actor, ctx: &Arc, recorder: &Recorder) -> } } - let message = format!("Delete Actor `{}`", actor.name_any()); - trace(recorder, message).await.map_err(Error::ResourceError)?; + trace(recorder, format!("Delete Actor `{}`", actor.name_any())).await; Ok(Action::await_change()) } diff --git a/controllers/src/playbook_controller.rs b/controllers/src/playbook_controller.rs index 3210621..05e7137 100644 --- a/controllers/src/playbook_controller.rs +++ b/controllers/src/playbook_controller.rs @@ -93,55 +93,52 @@ async fn apply(playbook: &Playbook, ctx: &Arc, recorder: &Recorder) -> async fn init(playbook: &Playbook, ctx: &Arc, recorder: &Recorder) -> Result<()> { // Create namespace for this playbook namespace::create(&ctx.k8s, playbook).await.map_err(Error::ResourceError)?; - trace(recorder, "Created namespace for this playbook").await.map_err(Error::ResourceError)?; + trace(recorder, "Created namespace for this playbook").await; add_preface(playbook, ctx, recorder).await?; - - trace(recorder, "Init successfully, Let's begin resolving, now!").await.map_err(Error::ResourceError)?; playbook::patch_status(&ctx.k8s, playbook, PlaybookState::resolving()).await.map_err(Error::ResourceError)?; + trace(recorder, "Init successfully, Let's begin resolving, now!").await; Ok(()) } async fn resolve(playbook: &Playbook, ctx: &Arc, recorder: &Recorder) -> Result<()> { + // Check if there are any repositories to fetch + // let mut fetches: HashSet<(&str, Partner)> = HashSet::new(); - let credentials = ctx.credentials.read().await; if let Some(characters) = &playbook.spec.characters { let exists: HashSet<&String> = characters.iter().map(|char| &char.meta.name).collect(); + debug!("The currently existing actors are: {exists:?}"); for character in characters { if let Some(partners) = &character.partners { for (name, partner) in partners { - if exists.contains(name) { - continue; + if !exists.contains(name) { + fetches.insert((name, partner.clone())); } - fetches.insert((name, partner.clone())); } } } - debug!("The currently existing actors are: {exists:?}"); - } - debug!("The repositories to be fetched are: {fetches:?}"); + debug!("The repositories to be fetched are: {fetches:?}"); + } + // Fetch the actors from the repositories + // + let credentials: tokio::sync::RwLockReadGuard<'_, amp_common::config::Credentials> = ctx.credentials.read().await; for (name, partner) in fetches.iter() { let character = resolver::partner::load(&ctx.k8s, &credentials, name, partner).await.map_err(Error::ResolveError)?; - - let message = "Fetch and add the actor to this playbook"; - trace(recorder, message).await.map_err(Error::ResourceError)?; - playbook::add(&ctx.k8s, playbook, character).await.map_err(Error::ResourceError)?; + trace(recorder, "Fetch and add the actor to this playbook").await; } + // If there are no repositories to fetch, then the resolution is complete. if fetches.is_empty() { - let message = "Resolved successfully, Running"; - trace(recorder, message).await.map_err(Error::ResourceError)?; - - playbook::patch_status(&ctx.k8s, playbook, PlaybookState::running(true, "AutoRun", None)) - .await - .map_err(Error::ResourceError)?; + let condition = PlaybookState::running(true, "AutoRun", None); + playbook::patch_status(&ctx.k8s, playbook, condition).await.map_err(Error::ResourceError)?; + trace(recorder, "Resolved successfully, Running").await; } Ok(()) @@ -153,9 +150,8 @@ async fn add_preface(playbook: &Playbook, ctx: &Arc, recorder: &Recorde let credentials = ctx.credentials.read().await; let character = resolver::preface::load(&ctx.k8s, &credentials, &playbook.spec.preface).await.map_err(Error::ResolveError)?; - - trace(recorder, "Fetch and add the character to this playbook").await.map_err(Error::ResourceError)?; playbook::add(&ctx.k8s, playbook, character).await.map_err(Error::ResourceError)?; + trace(recorder, "Fetch and add the character to this playbook").await; Ok(()) } @@ -169,16 +165,14 @@ async fn run(playbook: &Playbook, ctx: &Arc, recorder: &Recorder) -> Re match actor::exists(&ctx.k8s, playbook, name).await.map_err(Error::ResourceError)? { true => { // Actor already exists, update it if there are new changes - let message = format!("Try to refresh an existing Actor {}", name); - trace(recorder, message).await.map_err(Error::ResourceError)?; + trace(recorder, format!("Try to refresh an existing Actor {}", name)).await; let spec = resolver::to_actor(character, &credentials).map_err(Error::ResolveError)?; actor::update(&ctx.k8s, playbook, &spec).await.map_err(Error::ResourceError)?; } false => { // Create a new actor - let message = format!("Create new Actor: {}", name); - trace(recorder, message).await.map_err(Error::ResourceError)?; + trace(recorder, format!("Create new Actor: {}", name)).await; let spec = resolver::to_actor(character, &credentials).map_err(Error::ResolveError)?; actor::create(&ctx.k8s, playbook, &spec).await.map_err(Error::ResourceError)?; diff --git a/resources/src/event.rs b/resources/src/event.rs index 57ae1b7..609d956 100644 --- a/resources/src/event.rs +++ b/resources/src/event.rs @@ -13,24 +13,21 @@ // limitations under the License. use kube::runtime::events::{Event, EventType, Recorder}; -use tracing::info; +use tracing::{error, info}; -use super::error::{Error, Result}; - -pub async fn trace(recorder: &Recorder, message: impl Into) -> Result<()> { +pub async fn trace(recorder: &Recorder, message: impl Into) { let message: String = message.into(); - info!("{}", message); - recorder - .publish(Event { - type_: EventType::Normal, - reason: "Tracing".into(), - note: Some(message), - action: "Reconciling".into(), - secondary: None, - }) - .await - .map_err(Error::KubeError)?; - Ok(()) + let event = Event { + type_: EventType::Normal, + reason: "Tracing".into(), + note: Some(message), + action: "Reconciling".into(), + secondary: None, + }; + + if let Err(err) = recorder.publish(event).await { + error!("Failed to publish event: {}", err); + } }