Skip to content

Commit

Permalink
Refactor trace function and usage (event recorder)
Browse files Browse the repository at this point in the history
  • Loading branch information
wangeguo committed Oct 22, 2023
1 parent 96cde37 commit 9bf4364
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 71 deletions.
40 changes: 11 additions & 29 deletions controllers/src/actor_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,9 @@ async fn apply(actor: &Actor, ctx: &Arc<Context>, recorder: &Recorder) -> Result
}

async fn init(actor: &Actor, ctx: &Arc<Context>, recorder: &Recorder) -> Result<Action> {
trace(recorder, format!("Building the image for Actor {}", actor.name_any()))
.await
.map_err(Error::ResourceError)?;
actor::patch_status(&ctx.k8s, actor, ActorState::building()).await.map_err(Error::ResourceError)?;
trace(recorder, format!("Building the image for Actor {}", actor.name_any())).await;

Ok(Action::await_change())
}

Expand Down Expand Up @@ -133,17 +132,13 @@ async fn build(actor: &Actor, ctx: &Arc<Context>, recorder: &Recorder) -> Result
match builder::exists(&ctx.k8s, actor).await.map_err(Error::ResourceError)? {
true => {
// Build job already exists, update it if there are new changes
let message = format!("Try to refresh an existing build Job {}", actor.spec.name());
trace(recorder, message).await.map_err(Error::ResourceError)?;

trace(recorder, format!("Try to refresh an existing build Job {}", actor.spec.name())).await;
builder::update(&ctx.k8s, actor).await.map_err(Error::ResourceError)?;
}
false => {
// Create a new build job
let message = format!("Create new build Job: {}", actor.spec.name());
trace(recorder, message).await.map_err(Error::ResourceError)?;

builder::create(&ctx.k8s, actor).await.map_err(Error::ResourceError)?;
trace(recorder, format!("Created new build Job: {}", actor.spec.name())).await;
}
}

Expand All @@ -154,52 +149,40 @@ async fn build(actor: &Actor, ctx: &Arc<Context>, recorder: &Recorder) -> Result

// Once the image is built, it is deployed to the cluster with the
// appropriate resource type (e.g. Deployment or StatefulSet).
let message = "The images builded, Running";
trace(recorder, message).await.map_err(Error::ResourceError)?;

let condition = ActorState::running(true, "AutoRun", None);
actor::patch_status(&ctx.k8s, actor, condition).await.map_err(Error::ResourceError)?;
trace(recorder, "The images builded, Running").await;

Ok(Action::await_change())
}

async fn run(actor: &Actor, ctx: &Arc<Context>, recorder: &Recorder) -> Result<Action> {
trace(recorder, format!("Try to deploying the resources for Actor {}", actor.name_any()))
.await
.map_err(Error::ResourceError)?;
trace(recorder, format!("Try to deploying the resources for Actor {}", actor.name_any())).await;

match deployment::exists(&ctx.k8s, actor).await.map_err(Error::ResourceError)? {
true => {
// Deployment already exists, update it if there are new changes
let message = format!("Try to refresh an existing Deployment {}", actor.name_any());
trace(recorder, message).await.map_err(Error::ResourceError)?;

trace(recorder, format!("Try to refresh an existing Deployment {}", actor.name_any())).await;
deployment::update(&ctx.k8s, actor).await.map_err(Error::ResourceError)?;
}
false => {
// Create a new Deployment
trace(recorder, format!("Create new Deployment: {}", actor.name_any()))
.await
.map_err(Error::ResourceError)?;
deployment::create(&ctx.k8s, actor).await.map_err(Error::ResourceError)?;
trace(recorder, format!("Created new Deployment: {}", actor.name_any())).await;
}
}

if actor.spec.has_services() {
match service::exists(&ctx.k8s, actor).await.map_err(Error::ResourceError)? {
true => {
// Service already exists, update it if there are new changes
let message = format!("Try to refresh an existing Service {}", actor.name_any());
trace(recorder, message).await.map_err(Error::ResourceError)?;

trace(recorder, format!("Try to refresh an existing Service {}", actor.name_any())).await;
service::update(&ctx.k8s, actor).await.map_err(Error::ResourceError)?;
}
false => {
// Create a new Service
let message = format!("Create new Service: {}", actor.name_any());
trace(recorder, message).await.map_err(Error::ResourceError)?;

service::create(&ctx.k8s, actor).await.map_err(Error::ResourceError)?;
trace(recorder, format!("Created new Service: {}", actor.name_any())).await;
}
}
}
Expand All @@ -218,8 +201,7 @@ pub async fn cleanup(actor: &Actor, ctx: &Arc<Context>, recorder: &Recorder) ->
}
}

let message = format!("Delete Actor `{}`", actor.name_any());
trace(recorder, message).await.map_err(Error::ResourceError)?;
trace(recorder, format!("Delete Actor `{}`", actor.name_any())).await;

Ok(Action::await_change())
}
46 changes: 20 additions & 26 deletions controllers/src/playbook_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,55 +93,52 @@ async fn apply(playbook: &Playbook, ctx: &Arc<Context>, recorder: &Recorder) ->
async fn init(playbook: &Playbook, ctx: &Arc<Context>, recorder: &Recorder) -> Result<()> {
// Create namespace for this playbook
namespace::create(&ctx.k8s, playbook).await.map_err(Error::ResourceError)?;
trace(recorder, "Created namespace for this playbook").await.map_err(Error::ResourceError)?;
trace(recorder, "Created namespace for this playbook").await;

add_preface(playbook, ctx, recorder).await?;

trace(recorder, "Init successfully, Let's begin resolving, now!").await.map_err(Error::ResourceError)?;
playbook::patch_status(&ctx.k8s, playbook, PlaybookState::resolving()).await.map_err(Error::ResourceError)?;
trace(recorder, "Init successfully, Let's begin resolving, now!").await;

Ok(())
}

async fn resolve(playbook: &Playbook, ctx: &Arc<Context>, recorder: &Recorder) -> Result<()> {
// Check if there are any repositories to fetch
//
let mut fetches: HashSet<(&str, Partner)> = HashSet::new();
let credentials = ctx.credentials.read().await;

if let Some(characters) = &playbook.spec.characters {
let exists: HashSet<&String> = characters.iter().map(|char| &char.meta.name).collect();
debug!("The currently existing actors are: {exists:?}");

for character in characters {
if let Some(partners) = &character.partners {
for (name, partner) in partners {
if exists.contains(name) {
continue;
if !exists.contains(name) {
fetches.insert((name, partner.clone()));
}
fetches.insert((name, partner.clone()));
}
}
}
debug!("The currently existing actors are: {exists:?}");
}

debug!("The repositories to be fetched are: {fetches:?}");
debug!("The repositories to be fetched are: {fetches:?}");
}

// Fetch the actors from the repositories
//
let credentials: tokio::sync::RwLockReadGuard<'_, amp_common::config::Credentials> = ctx.credentials.read().await;
for (name, partner) in fetches.iter() {
let character =
resolver::partner::load(&ctx.k8s, &credentials, name, partner).await.map_err(Error::ResolveError)?;

let message = "Fetch and add the actor to this playbook";
trace(recorder, message).await.map_err(Error::ResourceError)?;

playbook::add(&ctx.k8s, playbook, character).await.map_err(Error::ResourceError)?;
trace(recorder, "Fetch and add the actor to this playbook").await;
}

// If there are no repositories to fetch, then the resolution is complete.
if fetches.is_empty() {
let message = "Resolved successfully, Running";
trace(recorder, message).await.map_err(Error::ResourceError)?;

playbook::patch_status(&ctx.k8s, playbook, PlaybookState::running(true, "AutoRun", None))
.await
.map_err(Error::ResourceError)?;
let condition = PlaybookState::running(true, "AutoRun", None);
playbook::patch_status(&ctx.k8s, playbook, condition).await.map_err(Error::ResourceError)?;
trace(recorder, "Resolved successfully, Running").await;
}

Ok(())
Expand All @@ -153,9 +150,8 @@ async fn add_preface(playbook: &Playbook, ctx: &Arc<Context>, recorder: &Recorde
let credentials = ctx.credentials.read().await;
let character =
resolver::preface::load(&ctx.k8s, &credentials, &playbook.spec.preface).await.map_err(Error::ResolveError)?;

trace(recorder, "Fetch and add the character to this playbook").await.map_err(Error::ResourceError)?;
playbook::add(&ctx.k8s, playbook, character).await.map_err(Error::ResourceError)?;
trace(recorder, "Fetch and add the character to this playbook").await;

Ok(())
}
Expand All @@ -169,16 +165,14 @@ async fn run(playbook: &Playbook, ctx: &Arc<Context>, recorder: &Recorder) -> Re
match actor::exists(&ctx.k8s, playbook, name).await.map_err(Error::ResourceError)? {
true => {
// Actor already exists, update it if there are new changes
let message = format!("Try to refresh an existing Actor {}", name);
trace(recorder, message).await.map_err(Error::ResourceError)?;
trace(recorder, format!("Try to refresh an existing Actor {}", name)).await;

let spec = resolver::to_actor(character, &credentials).map_err(Error::ResolveError)?;
actor::update(&ctx.k8s, playbook, &spec).await.map_err(Error::ResourceError)?;
}
false => {
// Create a new actor
let message = format!("Create new Actor: {}", name);
trace(recorder, message).await.map_err(Error::ResourceError)?;
trace(recorder, format!("Create new Actor: {}", name)).await;

let spec = resolver::to_actor(character, &credentials).map_err(Error::ResolveError)?;
actor::create(&ctx.k8s, playbook, &spec).await.map_err(Error::ResourceError)?;
Expand Down
29 changes: 13 additions & 16 deletions resources/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,21 @@
// limitations under the License.

use kube::runtime::events::{Event, EventType, Recorder};
use tracing::info;
use tracing::{error, info};

use super::error::{Error, Result};

pub async fn trace(recorder: &Recorder, message: impl Into<String>) -> Result<()> {
pub async fn trace(recorder: &Recorder, message: impl Into<String>) {
let message: String = message.into();

info!("{}", message);
recorder
.publish(Event {
type_: EventType::Normal,
reason: "Tracing".into(),
note: Some(message),
action: "Reconciling".into(),
secondary: None,
})
.await
.map_err(Error::KubeError)?;

Ok(())
let event = Event {
type_: EventType::Normal,
reason: "Tracing".into(),
note: Some(message),
action: "Reconciling".into(),
secondary: None,
};

if let Err(err) = recorder.publish(event).await {
error!("Failed to publish event: {}", err);
}
}

0 comments on commit 9bf4364

Please sign in to comment.