From cf24b79775524ea9b7874443c4009db92acc1f2f Mon Sep 17 00:00:00 2001 From: Eguo Wang Date: Wed, 24 Jan 2024 21:25:42 +0800 Subject: [PATCH] refactor: Replacing actor reconcile logic with new actor workflows --- Cargo.lock | 14 ++--- Cargo.toml | 2 +- controllers/src/actor_controller.rs | 69 +++++++++++----------- controllers/src/errors.rs | 9 --- controllers/src/playbook_controller.rs | 22 +++---- workflow/src/actor/build.rs | 60 +++++++++++++++++++ workflow/src/actor/cleanup.rs | 79 ++++++++++++++++++++++++++ workflow/src/actor/deploy.rs | 70 +++++++++++++++++++++++ workflow/src/actor/expose.rs | 57 +++++++++++++++++++ workflow/src/actor/init.rs | 60 +++++++++++++++++++ workflow/src/actor/mod.rs | 33 +++++++++++ workflow/src/lib.rs | 1 + 12 files changed, 411 insertions(+), 65 deletions(-) create mode 100644 workflow/src/actor/build.rs create mode 100644 workflow/src/actor/cleanup.rs create mode 100644 workflow/src/actor/deploy.rs create mode 100644 workflow/src/actor/expose.rs create mode 100644 workflow/src/actor/init.rs create mode 100644 workflow/src/actor/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 47f3d27..2cdfcc9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -62,7 +62,7 @@ checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" [[package]] name = "amp-apiserver" -version = "0.8.4" +version = "0.8.5" dependencies = [ "amp-common", "amp-resources", @@ -122,7 +122,7 @@ dependencies = [ [[package]] name = "amp-controllers" -version = "0.8.4" +version = "0.8.5" dependencies = [ "amp-common", "amp-resolver", @@ -148,7 +148,7 @@ dependencies = [ [[package]] name = "amp-crdgen" -version = "0.8.4" +version = "0.8.5" dependencies = [ "amp-common", "clap", @@ -160,7 +160,7 @@ dependencies = [ [[package]] name = "amp-resolver" -version = "0.8.4" +version = "0.8.5" dependencies = [ "amp-common", "amp-resources", @@ -174,7 +174,7 @@ dependencies = [ [[package]] name = "amp-resources" -version = "0.8.4" +version = "0.8.5" dependencies = [ "amp-common", "anyhow", @@ -194,7 +194,7 @@ dependencies = [ [[package]] name = "amp-syncer" -version = "0.8.4" +version = "0.8.5" dependencies = [ "amp-common", "async-nats", @@ -211,7 +211,7 @@ dependencies = [ [[package]] name = "amp-workflow" -version = "0.8.4" +version = "0.8.5" dependencies = [ "amp-common", "amp-resolver", diff --git a/Cargo.toml b/Cargo.toml index c916536..ec88a0f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace.package] -version = "0.8.4" +version = "0.8.5" edition = "2021" license = "Apache-2.0" repository = "https://github.com/amphitheatre-app/amphitheatre" diff --git a/controllers/src/actor_controller.rs b/controllers/src/actor_controller.rs index dcd0501..62b1b33 100644 --- a/controllers/src/actor_controller.rs +++ b/controllers/src/actor_controller.rs @@ -16,12 +16,11 @@ use std::sync::Arc; use std::time::Duration; use amp_common::resource::Actor; -use amp_resources::deployer::Deployer; +use amp_workflow::Workflow; use futures::{future, StreamExt}; -use k8s_openapi::api::core::v1::Namespace; use kube::api::ListParams; use kube::runtime::controller::Action; -use kube::runtime::finalizer::{finalizer, Event as FinalizerEvent}; +use kube::runtime::finalizer::{finalizer, Event}; use kube::runtime::{watcher, Controller}; use kube::{Api, ResourceExt}; use tracing::{error, info}; @@ -29,6 +28,8 @@ use tracing::{error, info}; use crate::context::Context; use crate::errors::{Error, Result}; +const FINALIZER_NAME: &str = "actors.amphitheatre.app/finalizer"; + pub async fn new(ctx: &Arc) { let api = Api::::all(ctx.k8s.clone()); @@ -47,51 +48,45 @@ pub async fn new(ctx: &Arc) { /// The reconciler that will be called when either object change pub async fn reconcile(actor: Arc, ctx: Arc) -> Result { - info!("Reconciling Actor \"{}\"", actor.name_any()); - let ns = actor.namespace().unwrap(); // actor is namespace scoped let api: Api = Api::namespaced(ctx.k8s.clone(), &ns); + let mut workflow = Workflow::new( + amp_workflow::Context { + k8s: Arc::new(ctx.k8s.clone()), + jetstream: ctx.jetstream.clone(), + credentials: ctx.credentials.clone(), + object: actor.clone(), + }, + Box::new(amp_workflow::actor::InitialState), + ); + // Reconcile the actor custom resource. - let finalizer_name = "actors.amphitheatre.app/finalizer"; - finalizer(&api, finalizer_name, actor, |event| async { + finalizer(&api, FINALIZER_NAME, actor, |event| async { match event { - FinalizerEvent::Apply(actor) => apply(&actor, &ctx).await, - FinalizerEvent::Cleanup(actor) => cleanup(&actor, &ctx).await, - } + Event::Apply(actor) => { + info!("Apply actor {}", actor.name_any()); + workflow.set_context(actor.clone()); + } + Event::Cleanup(actor) => { + info!("Cleanup actor {}", actor.name_any()); + workflow.set_context(actor.clone()); + workflow.transition(Box::new(amp_workflow::actor::CleanupState)); + } + }; + + // Runs the workflow until there is no next state + workflow.run().await; + + Ok(Action::await_change()) }) .await .map_err(|e| Error::FinalizerError(Box::new(e))) } + /// an error handler that will be called when the reconciler fails with access to both the /// object that caused the failure and the actual error -pub fn error_policy(_actor: Arc, error: &Error, _ctx: Arc) -> Action { +pub fn error_policy(_: Arc, error: &Error, _ctx: Arc) -> Action { error!("reconcile failed: {:?}", error); Action::requeue(Duration::from_secs(60)) } - -async fn apply(actor: &Actor, ctx: &Arc) -> Result { - info!("Try to deploying the resources for Actor {}", actor.name_any()); - - let credentials = ctx.credentials.read().await; - let mut deployer = Deployer::new(ctx.k8s.clone(), &credentials, actor); - deployer.run().await.map_err(Error::DeployError)?; - - Ok(Action::await_change()) -} - -pub async fn cleanup(actor: &Actor, ctx: &Arc) -> Result { - let namespace = actor.namespace().unwrap(); - let api: Api = Api::all(ctx.k8s.clone()); - - let ns = api.get(namespace.as_str()).await.map_err(Error::KubeError)?; - if let Some(status) = ns.status { - if status.phase == Some("Terminating".into()) { - return Ok(Action::await_change()); - } - } - - info!("Delete Actor `{}`", actor.name_any()); - - Ok(Action::await_change()) -} diff --git a/controllers/src/errors.rs b/controllers/src/errors.rs index 10b3617..32c746f 100644 --- a/controllers/src/errors.rs +++ b/controllers/src/errors.rs @@ -16,17 +16,8 @@ use thiserror::Error; #[derive(Error, Debug)] pub enum Error { - #[error("Kube Error: {0}")] - KubeError(#[source] kube::Error), - #[error("Finalizer Error: {0}")] FinalizerError(#[source] Box>), - - #[error("NatsError: {0}")] - NatsError(#[from] async_nats::Error), - - #[error("Deploy Error: {0}")] - DeployError(#[source] amp_resources::error::Error), } pub type Result = std::result::Result; diff --git a/controllers/src/playbook_controller.rs b/controllers/src/playbook_controller.rs index 1c12735..3dace65 100644 --- a/controllers/src/playbook_controller.rs +++ b/controllers/src/playbook_controller.rs @@ -51,18 +51,18 @@ pub async fn new(ctx: &Arc) { pub async fn reconcile(playbook: Arc, ctx: Arc) -> Result { let api: Api = Api::all(ctx.k8s.clone()); - // Reconcile the playbook custom resource. - finalizer(&api, FINALIZER_NAME, playbook.clone(), |event| async { - let mut workflow = Workflow::new( - amp_workflow::Context { - k8s: Arc::new(ctx.k8s.clone()), - jetstream: ctx.jetstream.clone(), - credentials: ctx.credentials.clone(), - object: playbook.clone(), - }, - Box::new(amp_workflow::playbook::InitialState), - ); + let mut workflow = Workflow::new( + amp_workflow::Context { + k8s: Arc::new(ctx.k8s.clone()), + jetstream: ctx.jetstream.clone(), + credentials: ctx.credentials.clone(), + object: playbook.clone(), + }, + Box::new(amp_workflow::playbook::InitialState), + ); + // Reconcile the playbook custom resource. + finalizer(&api, FINALIZER_NAME, playbook, |event| async { match event { Event::Apply(playbook) => { info!("Apply playbook {}", playbook.name_any()); diff --git a/workflow/src/actor/build.rs b/workflow/src/actor/build.rs new file mode 100644 index 0000000..4d73c9e --- /dev/null +++ b/workflow/src/actor/build.rs @@ -0,0 +1,60 @@ +// Copyright (c) The Amphitheatre Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::errors::Result; +use crate::{Context, State, Task}; + +use amp_common::resource::Actor; + +use async_trait::async_trait; +use tracing::error; + +use super::DeployingState; + +pub struct BuildingState; + +#[async_trait] +impl State for BuildingState { + /// Execute the logic for the building state + async fn handle(&self, ctx: &Context) -> Option>> { + // Check if BuildTask should be executed + let task = BuildTask::new(); + if task.matches(ctx) { + if let Err(err) = task.execute(ctx).await { + error!("Error during BuildTask execution: {}", err); + } + } + + // Transition to the next state if needed + Some(Box::new(DeployingState)) + } +} + +pub struct BuildTask; + +#[async_trait] +impl Task for BuildTask { + fn new() -> Self { + BuildTask + } + + fn matches(&self, _: &Context) -> bool { + true + } + + /// Execute the task logic for BuildTask using shared data + async fn execute(&self, _: &Context) -> Result<()> { + Ok(()) + } +} diff --git a/workflow/src/actor/cleanup.rs b/workflow/src/actor/cleanup.rs new file mode 100644 index 0000000..e0ecc9d --- /dev/null +++ b/workflow/src/actor/cleanup.rs @@ -0,0 +1,79 @@ +// Copyright (c) The Amphitheatre Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::errors::{Error, Result}; +use crate::{Context, State, Task}; + +use amp_common::resource::Actor; + +use async_trait::async_trait; +use k8s_openapi::api::core::v1::Namespace; +use kube::{Api, ResourceExt}; +use tracing::info; + +pub struct CleanupState; + +#[async_trait] +impl State for CleanupState { + /// Execute the logic for the cleanup state + async fn handle(&self, ctx: &Context) -> Option>> { + // Check if CleanupTask should be executed + let task = CleanupTask::new(); + if task.matches(ctx) { + if let Err(err) = task.execute(ctx).await { + // Handle error, maybe log it + println!("Error during CleanupTask execution: {}", err); + } + } + + None // No transition, end of workflow + } +} + +pub struct CleanupTask; + +#[async_trait] +impl Task for CleanupTask { + fn new() -> Self { + CleanupTask + } + + fn matches(&self, _: &Context) -> bool { + // Always true, this task is called directly from the controller + true + } + + // Execute the task logic for CleanupTask using shared data + async fn execute(&self, ctx: &Context) -> Result<()> { + self.cleanup(ctx, &ctx.object).await + } +} + +impl CleanupTask { + async fn cleanup(&self, ctx: &Context, actor: &Actor) -> Result<()> { + let namespace = actor.namespace().unwrap(); + let api: Api = Api::all((*ctx.k8s).clone()); + + let ns = api.get(namespace.as_str()).await.map_err(Error::KubeError)?; + if let Some(status) = ns.status { + if status.phase == Some("Terminating".into()) { + return Ok(()); + } + } + + info!("Delete Actor `{}`", actor.name_any()); + + Ok(()) + } +} diff --git a/workflow/src/actor/deploy.rs b/workflow/src/actor/deploy.rs new file mode 100644 index 0000000..76e5fc5 --- /dev/null +++ b/workflow/src/actor/deploy.rs @@ -0,0 +1,70 @@ +// Copyright (c) The Amphitheatre Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::errors::Error; +use crate::errors::Result; +use crate::{Context, State, Task}; + +use amp_common::resource::Actor; + +use amp_resources::deployer::Deployer; +use async_trait::async_trait; +use kube::ResourceExt; +use tracing::error; +use tracing::info; + +use super::ExposingState; + +pub struct DeployingState; + +#[async_trait] +impl State for DeployingState { + /// Execute the logic for the deploying state + async fn handle(&self, ctx: &Context) -> Option>> { + // Check if DeployTask should be executed + let task = DeployTask::new(); + if task.matches(ctx) { + if let Err(err) = task.execute(ctx).await { + error!("Error during DeployTask execution: {}", err); + } + } + + // Transition to the next state if needed + Some(Box::new(ExposingState)) + } +} + +pub struct DeployTask; + +#[async_trait] +impl Task for DeployTask { + fn new() -> Self { + DeployTask + } + + fn matches(&self, _: &Context) -> bool { + true + } + + /// Execute the task logic for DeployTask using shared data + async fn execute(&self, ctx: &Context) -> Result<()> { + info!("Try to deploying the resources for Actor {}", &ctx.object.name_any()); + + let credentials = ctx.credentials.read().await; + let mut deployer = Deployer::new((*ctx.k8s).clone(), &credentials, &ctx.object); + deployer.run().await.map_err(Error::DeployError)?; + + Ok(()) + } +} diff --git a/workflow/src/actor/expose.rs b/workflow/src/actor/expose.rs new file mode 100644 index 0000000..d302140 --- /dev/null +++ b/workflow/src/actor/expose.rs @@ -0,0 +1,57 @@ +// Copyright (c) The Amphitheatre Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::errors::Result; +use crate::{Context, State, Task}; + +use amp_common::resource::Actor; + +use async_trait::async_trait; +use tracing::error; + +pub struct ExposingState; + +#[async_trait] +impl State for ExposingState { + /// Execute the logic for the exposing state + async fn handle(&self, ctx: &Context) -> Option>> { + // Check if ExposeTask should be executed + let task = ExposeTask::new(); + if task.matches(ctx) { + if let Err(err) = task.execute(ctx).await { + error!("Error during ExposeTask execution: {}", err); + } + } + + None // No transition, wait for next state + } +} + +pub struct ExposeTask; + +#[async_trait] +impl Task for ExposeTask { + fn new() -> Self { + ExposeTask + } + + fn matches(&self, _: &Context) -> bool { + true + } + + /// Execute the task logic for ExposeTask using shared data + async fn execute(&self, _: &Context) -> Result<()> { + Ok(()) + } +} diff --git a/workflow/src/actor/init.rs b/workflow/src/actor/init.rs new file mode 100644 index 0000000..a7d57be --- /dev/null +++ b/workflow/src/actor/init.rs @@ -0,0 +1,60 @@ +// Copyright (c) The Amphitheatre Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::errors::Result; +use crate::{Context, State, Task}; + +use amp_common::resource::Actor; + +use async_trait::async_trait; +use tracing::error; + +use super::BuildingState; + +pub struct InitialState; + +#[async_trait] +impl State for InitialState { + /// Execute the logic for the initial state + async fn handle(&self, ctx: &Context) -> Option>> { + // Check if InitTask should be executed + let task = InitTask::new(); + if task.matches(ctx) { + if let Err(err) = task.execute(ctx).await { + error!("Error during InitTask execution: {}", err); + } + } + + // Transition to the next state if needed + Some(Box::new(BuildingState)) + } +} + +pub struct InitTask; + +#[async_trait] +impl Task for InitTask { + fn new() -> Self { + InitTask + } + + fn matches(&self, _: &Context) -> bool { + true + } + + /// Execute the task logic for InitTask using shared data + async fn execute(&self, _: &Context) -> Result<()> { + Ok(()) + } +} diff --git a/workflow/src/actor/mod.rs b/workflow/src/actor/mod.rs new file mode 100644 index 0000000..42a6d0f --- /dev/null +++ b/workflow/src/actor/mod.rs @@ -0,0 +1,33 @@ +// Copyright (c) The Amphitheatre Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod init; +pub use init::InitTask; +pub use init::InitialState; + +mod build; +pub use build::BuildTask; +pub use build::BuildingState; + +mod deploy; +pub use deploy::DeployTask; +pub use deploy::DeployingState; + +mod expose; +pub use expose::ExposeTask; +pub use expose::ExposingState; + +mod cleanup; +pub use cleanup::CleanupState; +pub use cleanup::CleanupTask; diff --git a/workflow/src/lib.rs b/workflow/src/lib.rs index d6d2709..0a743de 100644 --- a/workflow/src/lib.rs +++ b/workflow/src/lib.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod actor; pub mod errors; pub mod playbook;