Skip to content

Commit

Permalink
Refactor actor reconciler for support live and sync once
Browse files Browse the repository at this point in the history
  • Loading branch information
wangeguo committed Oct 23, 2023
1 parent d6cf915 commit 7de39dc
Show file tree
Hide file tree
Showing 17 changed files with 475 additions and 361 deletions.
17 changes: 9 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[workspace.package]
version = "0.6.0"
version = "0.6.3"
edition = "2021"
license = "Apache-2.0"
repository = "https://github.com/amphitheatre-app/amphitheatre"
Expand All @@ -21,7 +21,7 @@ members = [
# https://doc.rust-lang.org/cargo/reference/workspaces.html#the-workspacedependencies-table
[workspace.dependencies]
anyhow = "1.0"
amp-common = { git = "https://github.com/amphitheatre-app/common", tag = "v0.5.2" }
amp-common = { git = "https://github.com/amphitheatre-app/common", tag = "v0.5.3" }
amp-resolver = { path = "resolver" }
amp-resources = { path = "resources" }
async-nats = "0.32.1"
Expand Down
117 changes: 5 additions & 112 deletions controllers/src/actor_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@
use std::sync::Arc;
use std::time::Duration;

use amp_common::docker::{self, registry, DockerConfig};
use amp_common::resource::{Actor, ActorState};
use amp_common::resource::Actor;
use amp_resources::deployer::Deployer;
use amp_resources::event::trace;
use amp_resources::{actor, builder, deployment, service};
use futures::{future, StreamExt};
use k8s_openapi::api::core::v1::Namespace;
use kube::api::ListParams;
Expand Down Expand Up @@ -75,117 +74,11 @@ pub fn error_policy(_actor: Arc<Actor>, error: &Error, _ctx: Arc<Context>) -> Ac
}

async fn apply(actor: &Actor, ctx: &Arc<Context>, recorder: &Recorder) -> Result<Action> {
let mut action = Action::await_change();

if let Some(ref status) = actor.status {
if status.pending() {
action = init(actor, ctx, recorder).await?
} else if status.building() {
action = build(actor, ctx, recorder).await?
} else if status.running() {
action = run(actor, ctx, recorder).await?
}
}

Ok(action)
}

async fn init(actor: &Actor, ctx: &Arc<Context>, recorder: &Recorder) -> Result<Action> {
actor::patch_status(&ctx.k8s, actor, ActorState::building()).await.map_err(Error::ResourceError)?;
trace(recorder, format!("Building the image for Actor {}", actor.name_any())).await;

Ok(Action::await_change())
}

async fn build(actor: &Actor, ctx: &Arc<Context>, recorder: &Recorder) -> Result<Action> {
// Return if the actor is live
if actor.spec.live {
info!("The actor is live mode, Running");
let condition = ActorState::running(true, "AutoRun", None);
actor::patch_status(&ctx.k8s, actor, condition).await.map_err(Error::ResourceError)?;

return Ok(Action::await_change());
}

// Return if the image already exists
let credentials = ctx.credentials.read().await;
let config = DockerConfig::from(&credentials.registries);

let credential = docker::get_credential(&config, &actor.spec.image);
let credential = match credential {
Ok(credential) => Some(credential),
Err(err) => {
error!("Error handling docker configuration: {}", err);
None
}
};

if registry::exists(&actor.spec.image, credential).await.map_err(Error::DockerRegistryExistsFailed)? {
info!("The images already exists, Running");
let condition = ActorState::running(true, "AutoRun", None);
actor::patch_status(&ctx.k8s, actor, condition).await.map_err(Error::ResourceError)?;

return Ok(Action::await_change());
}

// Build the image
match builder::exists(&ctx.k8s, actor).await.map_err(Error::ResourceError)? {
true => {
// Build job already exists, update it if there are new changes
trace(recorder, format!("Try to refresh an existing build Job {}", actor.spec.name())).await;
builder::update(&ctx.k8s, actor).await.map_err(Error::ResourceError)?;
}
false => {
// Create a new build job
builder::create(&ctx.k8s, actor).await.map_err(Error::ResourceError)?;
trace(recorder, format!("Created new build Job: {}", actor.spec.name())).await;
}
}

// Check If the build Job has not completed, requeue the reconciler.
if !builder::completed(&ctx.k8s, actor).await.map_err(Error::ResourceError)? {
return Ok(Action::requeue(Duration::from_secs(60)));
}

// Once the image is built, it is deployed to the cluster with the
// appropriate resource type (e.g. Deployment or StatefulSet).
let condition = ActorState::running(true, "AutoRun", None);
actor::patch_status(&ctx.k8s, actor, condition).await.map_err(Error::ResourceError)?;
trace(recorder, "The images builded, Running").await;

Ok(Action::await_change())
}

async fn run(actor: &Actor, ctx: &Arc<Context>, recorder: &Recorder) -> Result<Action> {
trace(recorder, format!("Try to deploying the resources for Actor {}", actor.name_any())).await;

match deployment::exists(&ctx.k8s, actor).await.map_err(Error::ResourceError)? {
true => {
// Deployment already exists, update it if there are new changes
trace(recorder, format!("Try to refresh an existing Deployment {}", actor.name_any())).await;
deployment::update(&ctx.k8s, actor).await.map_err(Error::ResourceError)?;
}
false => {
// Create a new Deployment
deployment::create(&ctx.k8s, actor).await.map_err(Error::ResourceError)?;
trace(recorder, format!("Created new Deployment: {}", actor.name_any())).await;
}
}

if actor.spec.has_services() {
match service::exists(&ctx.k8s, actor).await.map_err(Error::ResourceError)? {
true => {
// Service already exists, update it if there are new changes
trace(recorder, format!("Try to refresh an existing Service {}", actor.name_any())).await;
service::update(&ctx.k8s, actor).await.map_err(Error::ResourceError)?;
}
false => {
// Create a new Service
service::create(&ctx.k8s, actor).await.map_err(Error::ResourceError)?;
trace(recorder, format!("Created new Service: {}", actor.name_any())).await;
}
}
}
let credentials = ctx.credentials.read().await;
let mut deployer = Deployer::new(ctx.k8s.clone(), &credentials, actor);
deployer.run().await.map_err(Error::DeployError)?;

Ok(Action::await_change())
}
Expand Down
6 changes: 3 additions & 3 deletions controllers/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ pub enum Error {
#[error("ResolveError: {0}")]
ResolveError(#[source] amp_resolver::errors::ResolveError),

#[error("DockerRegistryExistsFailed: {0}")]
DockerRegistryExistsFailed(#[source] anyhow::Error),

#[error("NatsError: {0}")]
NatsError(#[from] async_nats::Error),

#[error("Deploy Error: {0}")]
DeployError(#[source] amp_resources::error::Error),
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
1 change: 1 addition & 0 deletions resources/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ description = "K8s resources manager for the Amphitheatre platform"

[dependencies]
amp-common = { workspace = true, optional = false }
anyhow = { workspace = true, optional = false }
kube = { workspace = true, optional = false }
k8s-metrics = "0.14.0"
k8s-openapi = { workspace = true, optional = false }
Expand Down
138 changes: 0 additions & 138 deletions resources/src/builder.rs

This file was deleted.

Loading

0 comments on commit 7de39dc

Please sign in to comment.