From 7de39dc0cf173a1f15c8ea3ba2d125f09d4d23d7 Mon Sep 17 00:00:00 2001 From: Eguo Wang Date: Mon, 23 Oct 2023 23:26:39 +0800 Subject: [PATCH] Refactor actor reconciler for support live and sync once --- Cargo.lock | 17 +- Cargo.toml | 4 +- controllers/src/actor_controller.rs | 117 +------------ controllers/src/error.rs | 6 +- resources/Cargo.toml | 1 + resources/src/builder.rs | 138 --------------- resources/src/containers/application.rs | 27 ++- resources/src/containers/buildpacks.rs | 61 +++++-- resources/src/containers/devcontainer.rs | 32 ++-- resources/src/containers/git_sync.rs | 30 +++- resources/src/containers/kaniko.rs | 41 +++-- resources/src/containers/mod.rs | 36 ++++ resources/src/containers/syncer.rs | 46 +++-- resources/src/deployer.rs | 211 +++++++++++++++++++++++ resources/src/deployment.rs | 52 +++--- resources/src/error.rs | 15 ++ resources/src/lib.rs | 2 +- 17 files changed, 475 insertions(+), 361 deletions(-) delete mode 100644 resources/src/builder.rs create mode 100644 resources/src/deployer.rs diff --git a/Cargo.lock b/Cargo.lock index 712c617..7311fe2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -61,7 +61,7 @@ checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" [[package]] name = "amp-apiserver" -version = "0.6.0" +version = "0.6.3" dependencies = [ "amp-common", "amp-resources", @@ -93,8 +93,8 @@ dependencies = [ [[package]] name = "amp-common" -version = "0.5.2" -source = "git+https://github.com/amphitheatre-app/common?tag=v0.5.2#212d04959cd64e37df9dc8243374d9a9d6289621" +version = "0.5.3" +source = "git+https://github.com/amphitheatre-app/common?tag=v0.5.3#8ca2e860dbc6ffc13cd3a2f82a453d555a60b606" dependencies = [ "anyhow", "chrono", @@ -121,7 +121,7 @@ dependencies = [ [[package]] name = "amp-controllers" -version = "0.6.0" +version = "0.6.3" dependencies = [ "amp-common", "amp-resolver", @@ -146,7 +146,7 @@ dependencies = [ [[package]] name = "amp-crdgen" -version = "0.6.0" +version = "0.6.3" dependencies = [ "amp-common", "clap", @@ -158,7 +158,7 @@ dependencies = [ [[package]] name = "amp-resolver" -version = "0.6.0" +version = "0.6.3" dependencies = [ "amp-common", "amp-resources", @@ -172,9 +172,10 @@ dependencies = [ [[package]] name = "amp-resources" -version = "0.6.0" +version = "0.6.3" dependencies = [ "amp-common", + "anyhow", "k8s-metrics", "k8s-openapi", "kube", @@ -191,7 +192,7 @@ dependencies = [ [[package]] name = "amp-syncer" -version = "0.6.0" +version = "0.6.3" dependencies = [ "amp-common", "async-nats", diff --git a/Cargo.toml b/Cargo.toml index 6431e76..58946cb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace.package] -version = "0.6.0" +version = "0.6.3" edition = "2021" license = "Apache-2.0" repository = "https://github.com/amphitheatre-app/amphitheatre" @@ -21,7 +21,7 @@ members = [ # https://doc.rust-lang.org/cargo/reference/workspaces.html#the-workspacedependencies-table [workspace.dependencies] anyhow = "1.0" -amp-common = { git = "https://github.com/amphitheatre-app/common", tag = "v0.5.2" } +amp-common = { git = "https://github.com/amphitheatre-app/common", tag = "v0.5.3" } amp-resolver = { path = "resolver" } amp-resources = { path = "resources" } async-nats = "0.32.1" diff --git a/controllers/src/actor_controller.rs b/controllers/src/actor_controller.rs index f06f30b..c4ea0a8 100644 --- a/controllers/src/actor_controller.rs +++ b/controllers/src/actor_controller.rs @@ -15,10 +15,9 @@ use std::sync::Arc; use std::time::Duration; -use amp_common::docker::{self, registry, DockerConfig}; -use amp_common::resource::{Actor, ActorState}; +use amp_common::resource::Actor; +use amp_resources::deployer::Deployer; use amp_resources::event::trace; -use amp_resources::{actor, builder, deployment, service}; use futures::{future, StreamExt}; use k8s_openapi::api::core::v1::Namespace; use kube::api::ListParams; @@ -75,117 +74,11 @@ pub fn error_policy(_actor: Arc, error: &Error, _ctx: Arc) -> Ac } async fn apply(actor: &Actor, ctx: &Arc, recorder: &Recorder) -> Result { - let mut action = Action::await_change(); - - if let Some(ref status) = actor.status { - if status.pending() { - action = init(actor, ctx, recorder).await? - } else if status.building() { - action = build(actor, ctx, recorder).await? - } else if status.running() { - action = run(actor, ctx, recorder).await? - } - } - - Ok(action) -} - -async fn init(actor: &Actor, ctx: &Arc, recorder: &Recorder) -> Result { - actor::patch_status(&ctx.k8s, actor, ActorState::building()).await.map_err(Error::ResourceError)?; - trace(recorder, format!("Building the image for Actor {}", actor.name_any())).await; - - Ok(Action::await_change()) -} - -async fn build(actor: &Actor, ctx: &Arc, recorder: &Recorder) -> Result { - // Return if the actor is live - if actor.spec.live { - info!("The actor is live mode, Running"); - let condition = ActorState::running(true, "AutoRun", None); - actor::patch_status(&ctx.k8s, actor, condition).await.map_err(Error::ResourceError)?; - - return Ok(Action::await_change()); - } - - // Return if the image already exists - let credentials = ctx.credentials.read().await; - let config = DockerConfig::from(&credentials.registries); - - let credential = docker::get_credential(&config, &actor.spec.image); - let credential = match credential { - Ok(credential) => Some(credential), - Err(err) => { - error!("Error handling docker configuration: {}", err); - None - } - }; - - if registry::exists(&actor.spec.image, credential).await.map_err(Error::DockerRegistryExistsFailed)? { - info!("The images already exists, Running"); - let condition = ActorState::running(true, "AutoRun", None); - actor::patch_status(&ctx.k8s, actor, condition).await.map_err(Error::ResourceError)?; - - return Ok(Action::await_change()); - } - - // Build the image - match builder::exists(&ctx.k8s, actor).await.map_err(Error::ResourceError)? { - true => { - // Build job already exists, update it if there are new changes - trace(recorder, format!("Try to refresh an existing build Job {}", actor.spec.name())).await; - builder::update(&ctx.k8s, actor).await.map_err(Error::ResourceError)?; - } - false => { - // Create a new build job - builder::create(&ctx.k8s, actor).await.map_err(Error::ResourceError)?; - trace(recorder, format!("Created new build Job: {}", actor.spec.name())).await; - } - } - - // Check If the build Job has not completed, requeue the reconciler. - if !builder::completed(&ctx.k8s, actor).await.map_err(Error::ResourceError)? { - return Ok(Action::requeue(Duration::from_secs(60))); - } - - // Once the image is built, it is deployed to the cluster with the - // appropriate resource type (e.g. Deployment or StatefulSet). - let condition = ActorState::running(true, "AutoRun", None); - actor::patch_status(&ctx.k8s, actor, condition).await.map_err(Error::ResourceError)?; - trace(recorder, "The images builded, Running").await; - - Ok(Action::await_change()) -} - -async fn run(actor: &Actor, ctx: &Arc, recorder: &Recorder) -> Result { trace(recorder, format!("Try to deploying the resources for Actor {}", actor.name_any())).await; - match deployment::exists(&ctx.k8s, actor).await.map_err(Error::ResourceError)? { - true => { - // Deployment already exists, update it if there are new changes - trace(recorder, format!("Try to refresh an existing Deployment {}", actor.name_any())).await; - deployment::update(&ctx.k8s, actor).await.map_err(Error::ResourceError)?; - } - false => { - // Create a new Deployment - deployment::create(&ctx.k8s, actor).await.map_err(Error::ResourceError)?; - trace(recorder, format!("Created new Deployment: {}", actor.name_any())).await; - } - } - - if actor.spec.has_services() { - match service::exists(&ctx.k8s, actor).await.map_err(Error::ResourceError)? { - true => { - // Service already exists, update it if there are new changes - trace(recorder, format!("Try to refresh an existing Service {}", actor.name_any())).await; - service::update(&ctx.k8s, actor).await.map_err(Error::ResourceError)?; - } - false => { - // Create a new Service - service::create(&ctx.k8s, actor).await.map_err(Error::ResourceError)?; - trace(recorder, format!("Created new Service: {}", actor.name_any())).await; - } - } - } + let credentials = ctx.credentials.read().await; + let mut deployer = Deployer::new(ctx.k8s.clone(), &credentials, actor); + deployer.run().await.map_err(Error::DeployError)?; Ok(Action::await_change()) } diff --git a/controllers/src/error.rs b/controllers/src/error.rs index bf53fa5..62357ee 100644 --- a/controllers/src/error.rs +++ b/controllers/src/error.rs @@ -30,11 +30,11 @@ pub enum Error { #[error("ResolveError: {0}")] ResolveError(#[source] amp_resolver::errors::ResolveError), - #[error("DockerRegistryExistsFailed: {0}")] - DockerRegistryExistsFailed(#[source] anyhow::Error), - #[error("NatsError: {0}")] NatsError(#[from] async_nats::Error), + + #[error("Deploy Error: {0}")] + DeployError(#[source] amp_resources::error::Error), } pub type Result = std::result::Result; diff --git a/resources/Cargo.toml b/resources/Cargo.toml index bc4e2a4..ed9bf28 100644 --- a/resources/Cargo.toml +++ b/resources/Cargo.toml @@ -10,6 +10,7 @@ description = "K8s resources manager for the Amphitheatre platform" [dependencies] amp-common = { workspace = true, optional = false } +anyhow = { workspace = true, optional = false } kube = { workspace = true, optional = false } k8s-metrics = "0.14.0" k8s-openapi = { workspace = true, optional = false } diff --git a/resources/src/builder.rs b/resources/src/builder.rs deleted file mode 100644 index 6c2147e..0000000 --- a/resources/src/builder.rs +++ /dev/null @@ -1,138 +0,0 @@ -// Copyright 2023 The Amphitheatre Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::BTreeMap; - -use amp_common::resource::Actor; -use amp_common::schema::BuildMethod; -use k8s_openapi::api::batch::v1::{Job, JobSpec}; -use k8s_openapi::api::core::v1::PodTemplateSpec; -use kube::api::{Patch, PatchParams, PostParams}; -use kube::core::ObjectMeta; -use kube::{Api, Client, Resource, ResourceExt}; -use tracing::{debug, info}; - -use crate::containers::{buildpacks, kaniko}; -use crate::error::{Error, Result}; -use crate::{hash, LAST_APPLIED_HASH_KEY}; - -pub async fn exists(client: &Client, actor: &Actor) -> Result { - let namespace = actor.namespace().ok_or_else(|| Error::MissingObjectKey(".metadata.namespace"))?; - let api: Api = Api::namespaced(client.clone(), namespace.as_str()); - let name = actor.spec.name(); - - Ok(api.get_opt(&name).await.map_err(Error::KubeError)?.is_some()) -} - -pub async fn create(client: &Client, actor: &Actor) -> Result { - let namespace = actor.namespace().ok_or_else(|| Error::MissingObjectKey(".metadata.namespace"))?; - let api: Api = Api::namespaced(client.clone(), namespace.as_str()); - - let resource = new(actor)?; - debug!("The Job resource:\n {:?}\n", resource); - - let job = api.create(&PostParams::default(), &resource).await.map_err(Error::KubeError)?; - - info!("Created Job: {}", job.name_any()); - Ok(job) -} - -pub async fn update(client: &Client, actor: &Actor) -> Result { - let namespace = actor.namespace().ok_or_else(|| Error::MissingObjectKey(".metadata.namespace"))?; - let api: Api = Api::namespaced(client.clone(), namespace.as_str()); - let name = actor.spec.name(); - - let mut job = api.get(&name).await.map_err(Error::KubeError)?; - debug!("The Job {} already exists: {:?}", &name, job); - - let expected_hash = hash(&actor.spec)?; - let found_hash: String = job.annotations().get(LAST_APPLIED_HASH_KEY).map_or("".into(), |v| v.into()); - - if found_hash == expected_hash { - debug!("The Job {} is already up-to-date", &name); - return Ok(job); - } - - let resource = new(actor)?; - debug!("The updating Job resource:\n {:?}\n", resource); - - let params = &PatchParams::apply("amp-controllers").force(); - job = api.patch(&name, params, &Patch::Apply(&resource)).await.map_err(Error::KubeError)?; - - info!("Updated Job: {}", job.name_any()); - Ok(job) -} - -/// Create a Job for build images -fn new(actor: &Actor) -> Result { - let name = actor.spec.name(); - - // Build the metadata for the job - let owner_reference = actor.controller_owner_ref(&()).unwrap(); - let annotations = BTreeMap::from([(LAST_APPLIED_HASH_KEY.into(), hash(&actor.spec)?)]); - let labels = BTreeMap::from([ - ("app.kubernetes.io/name".into(), name.clone()), - ("app.kubernetes.io/managed-by".into(), "Amphitheatre".into()), - ]); - let metadata = ObjectMeta { - name: Some(name), - owner_references: Some(vec![owner_reference]), - labels: Some(labels.clone()), - annotations: Some(annotations), - ..Default::default() - }; - - // Prefer to use Kaniko to build images with Dockerfile, - // else, build the image with Cloud Native Buildpacks - let build = actor.spec.character.build.clone().unwrap_or_default(); - let pod = match build.method() { - BuildMethod::Dockerfile => { - debug!("Found dockerfile, build it with kaniko"); - kaniko::pod(&actor.spec) - } - BuildMethod::Buildpacks => { - debug!("Build the image with Cloud Native Buildpacks"); - buildpacks::pod(&actor.spec) - } - }; - - // Build the spec for the job - let spec = JobSpec { - backoff_limit: Some(0), - template: PodTemplateSpec { - metadata: Some(ObjectMeta { labels: Some(labels), ..Default::default() }), - spec: Some(pod), - }, - ..Default::default() - }; - - // Build and return the job resource - Ok(Job { metadata, spec: Some(spec), ..Default::default() }) -} - -pub async fn completed(client: &Client, actor: &Actor) -> Result { - debug!("Check If the build Job has not completed"); - - let namespace = actor.namespace().ok_or_else(|| Error::MissingObjectKey(".metadata.namespace"))?; - let api: Api = Api::namespaced(client.clone(), namespace.as_str()); - let name = actor.spec.name(); - - if let Ok(Some(job)) = api.get_opt(&name).await { - debug!("Found Job {}", &name); - Ok(job.status.map_or(false, |s| s.succeeded >= Some(1))) - } else { - debug!("Not found Job {}", &name); - Ok(false) - } -} diff --git a/resources/src/containers/application.rs b/resources/src/containers/application.rs index 6e4245b..80e340e 100644 --- a/resources/src/containers/application.rs +++ b/resources/src/containers/application.rs @@ -12,16 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use amp_common::resource::{Actor, ActorSpec}; -use k8s_openapi::api::core::v1::{Container, PodSpec}; - -/// Build and return the pod spec for the actor -pub fn pod(actor: &Actor) -> PodSpec { - PodSpec { containers: vec![container(&actor.spec)], ..Default::default() } -} +use amp_common::resource::ActorSpec; +use k8s_openapi::api::core::v1::Container; /// Build and return the container spec for the actor -fn container(spec: &ActorSpec) -> Container { +pub fn container(spec: &ActorSpec) -> Container { let mut environments = Some(vec![]); let mut container_ports = Some(vec![]); @@ -40,3 +35,19 @@ fn container(spec: &ActorSpec) -> Container { ..Default::default() } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_container() { + let spec = ActorSpec { name: "test".into(), image: "test".into(), ..Default::default() }; + + let container = container(&spec); + + assert_eq!(container.name, "test"); + assert_eq!(container.image, Some("test".into())); + assert_eq!(container.image_pull_policy, Some("Always".into())); + } +} diff --git a/resources/src/containers/buildpacks.rs b/resources/src/containers/buildpacks.rs index f2014ea..7f484c5 100644 --- a/resources/src/containers/buildpacks.rs +++ b/resources/src/containers/buildpacks.rs @@ -13,24 +13,13 @@ // limitations under the License. use amp_common::resource::ActorSpec; -use k8s_openapi::api::core::v1::{Container, EnvVar, PodSpec, VolumeMount}; +use k8s_openapi::api::core::v1::{Container, EnvVar, VolumeMount}; -use super::{docker_config_volume, git_sync, workspace_mount, workspace_volume, WORKSPACE_DIR}; +use super::{workspace_mount, WORKSPACE_DIR}; use crate::args; -/// Build and return the pod spec for the buildpacks builder job -pub fn pod(spec: &ActorSpec) -> PodSpec { - PodSpec { - restart_policy: Some("Never".into()), - init_containers: Some(vec![git_sync::container(spec.source.as_ref().unwrap())]), - containers: vec![container(spec)], - volumes: Some(vec![workspace_volume(), docker_config_volume()]), - ..Default::default() - } -} - /// Build and return the container spec for the buildpacks container -fn container(spec: &ActorSpec) -> Container { +pub fn container(spec: &ActorSpec) -> Container { let build = spec.character.build.clone().unwrap_or_default(); // Parse the arguments for the container @@ -67,3 +56,47 @@ fn container(spec: &ActorSpec) -> Container { pub fn docker_config_mount() -> VolumeMount { VolumeMount { name: "docker-config".into(), mount_path: "/workspace/.docker".into(), ..Default::default() } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_container() { + let spec = ActorSpec { name: "test".into(), image: "test".into(), ..Default::default() }; + + let container = container(&spec); + + assert_eq!(container.name, "builder"); + assert_eq!(container.image, Some("gcr.io/buildpacks/builder:v1".into())); + assert_eq!(container.image_pull_policy, Some("IfNotPresent".into())); + assert_eq!(container.command, Some(vec!["/cnb/lifecycle/creator".into()])); + assert_eq!(container.args, Some(vec!["-app=/workspace/app".into(), "test".into()])); + assert_eq!( + container.env, + Some(vec![ + EnvVar { name: "CNB_PLATFORM_API".into(), value: Some("0.11".into()), ..Default::default() }, + EnvVar { name: "DOCKER_CONFIG".into(), value: Some("/workspace/.docker".into()), ..Default::default() }, + ]) + ); + assert_eq!( + container.volume_mounts, + Some(vec![ + VolumeMount { name: "workspace".into(), mount_path: "/workspace".into(), ..Default::default() }, + VolumeMount { + name: "docker-config".into(), + mount_path: "/workspace/.docker".into(), + ..Default::default() + }, + ]) + ); + } + + #[test] + fn test_docker_config_mount() { + let mount = docker_config_mount(); + + assert_eq!(mount.name, "docker-config"); + assert_eq!(mount.mount_path, "/workspace/.docker"); + } +} diff --git a/resources/src/containers/devcontainer.rs b/resources/src/containers/devcontainer.rs index ab06d2b..daf05cf 100644 --- a/resources/src/containers/devcontainer.rs +++ b/resources/src/containers/devcontainer.rs @@ -12,22 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use amp_common::resource::Actor; -use k8s_openapi::api::core::v1::{Container, PodSpec}; +use amp_common::resource::ActorSpec; +use k8s_openapi::api::core::v1::Container; -use super::{syncer, workspace_mount, workspace_volume, DEFAULT_DEVCONTAINER_IMAGE}; -use crate::error::Result; - -/// Build and return the pod spec for the devcontainer build deployment -pub fn pod(actor: &Actor) -> Result { - let syncer = syncer::container(actor)?; - let builder = container(actor); - - Ok(PodSpec { containers: vec![syncer, builder], volumes: Some(vec![workspace_volume()]), ..Default::default() }) -} +use super::{workspace_mount, DEFAULT_DEVCONTAINER_IMAGE}; /// Build and return the container spec for the devcontainer. -fn container(_actor: &Actor) -> Container { +pub fn container(_spec: &ActorSpec) -> Container { Container { name: "builder".to_string(), @@ -44,3 +35,18 @@ fn container(_actor: &Actor) -> Container { ..Default::default() } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_container() { + let actor = ActorSpec { name: "test".into(), image: "test".into(), ..Default::default() }; + let container = container(&actor); + + assert_eq!(container.name, "builder"); + assert_eq!(container.image, Some(DEFAULT_DEVCONTAINER_IMAGE.to_string())); + assert_eq!(container.image_pull_policy, Some("IfNotPresent".to_string())); + } +} diff --git a/resources/src/containers/git_sync.rs b/resources/src/containers/git_sync.rs index e8d7e9b..d85e8d5 100644 --- a/resources/src/containers/git_sync.rs +++ b/resources/src/containers/git_sync.rs @@ -12,14 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -use amp_common::schema::GitReference; +use amp_common::resource::ActorSpec; use k8s_openapi::api::core::v1::Container; use super::{workspace_mount, DEFAULT_GIT_SYNC_IMAGE, WORKSPACE_DIR}; use crate::args; /// Build and return the container spec for the git-sync. -pub fn container(source: &GitReference) -> Container { +pub fn container(spec: &ActorSpec) -> Container { + let source = spec.source.as_ref().unwrap(); // Parse the arguments for the container let revision = source.rev(); let arguments = vec![ @@ -41,3 +42,28 @@ pub fn container(source: &GitReference) -> Container { ..Default::default() } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_container() { + let spec = ActorSpec { + name: "test".into(), + image: "test".into(), + source: Some(amp_common::schema::GitReference { + repo: "test".into(), + rev: Some("test".into()), + ..Default::default() + }), + ..Default::default() + }; + + let container = container(&spec); + + assert_eq!(container.name, "syncer"); + assert_eq!(container.image, Some(DEFAULT_GIT_SYNC_IMAGE.to_string())); + assert_eq!(container.image_pull_policy, Some("IfNotPresent".to_string())); + } +} diff --git a/resources/src/containers/kaniko.rs b/resources/src/containers/kaniko.rs index 589360c..157a0e5 100644 --- a/resources/src/containers/kaniko.rs +++ b/resources/src/containers/kaniko.rs @@ -15,24 +15,13 @@ use std::path::PathBuf; use amp_common::resource::ActorSpec; -use k8s_openapi::api::core::v1::{Container, PodSpec, VolumeMount}; +use k8s_openapi::api::core::v1::{Container, VolumeMount}; -use super::{docker_config_volume, git_sync, workspace_mount, workspace_volume, DEFAULT_KANIKO_IMAGE, WORKSPACE_DIR}; +use super::{workspace_mount, DEFAULT_KANIKO_IMAGE, WORKSPACE_DIR}; use crate::args; -/// Build and return the pod spec for the kaniko builder job -pub fn pod(spec: &ActorSpec) -> PodSpec { - PodSpec { - restart_policy: Some("Never".into()), - init_containers: Some(vec![git_sync::container(spec.source.as_ref().unwrap())]), - containers: vec![container(spec)], - volumes: Some(vec![workspace_volume(), docker_config_volume()]), - ..Default::default() - } -} - /// Build and return the container spec for the kaniko container -fn container(spec: &ActorSpec) -> Container { +pub fn container(spec: &ActorSpec) -> Container { let build = spec.character.build.clone().unwrap_or_default(); // Set the working directory to context. @@ -75,3 +64,27 @@ fn container(spec: &ActorSpec) -> Container { fn docker_config_mount() -> VolumeMount { VolumeMount { name: "docker-config".into(), mount_path: "/kaniko/.docker".into(), ..Default::default() } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_container() { + let spec = ActorSpec { name: "test".into(), image: "test".into(), ..Default::default() }; + + let container = container(&spec); + + assert_eq!(container.name, "builder"); + assert_eq!(container.image, Some(DEFAULT_KANIKO_IMAGE.into())); + assert_eq!(container.image_pull_policy, Some("IfNotPresent".into())); + } + + #[test] + fn test_docker_config_mount() { + let mount = docker_config_mount(); + + assert_eq!(mount.name, "docker-config"); + assert_eq!(mount.mount_path, "/kaniko/.docker"); + } +} diff --git a/resources/src/containers/mod.rs b/resources/src/containers/mod.rs index 1dd3b62..c300655 100644 --- a/resources/src/containers/mod.rs +++ b/resources/src/containers/mod.rs @@ -67,3 +67,39 @@ pub fn docker_config_volume() -> Volume { ..Default::default() } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_workspace_volume() { + let volume = workspace_volume(); + + assert_eq!(volume.name, "workspace"); + assert_eq!(volume.empty_dir, Some(Default::default())); + } + + #[test] + fn test_workspace_mount() { + let mount = workspace_mount(); + + assert_eq!(mount.name, "workspace"); + assert_eq!(mount.mount_path, "/workspace"); + } + + #[test] + fn test_docker_config_volume() { + let volume = docker_config_volume(); + + assert_eq!(volume.name, "docker-config"); + + let secret = volume.secret.unwrap(); + assert_eq!(secret.secret_name, Some("amp-registry-credentials".into())); + + let items = secret.items.unwrap(); + assert_eq!(items.len(), 1); + assert_eq!(items[0].key, ".dockerconfigjson"); + assert_eq!(items[0].path, "config.json"); + } +} diff --git a/resources/src/containers/syncer.rs b/resources/src/containers/syncer.rs index 6eb5b04..2792fa3 100644 --- a/resources/src/containers/syncer.rs +++ b/resources/src/containers/syncer.rs @@ -14,36 +14,30 @@ use std::path::PathBuf; -use amp_common::resource::Actor; +use amp_common::resource::ActorSpec; use k8s_openapi::api::core::v1::Container; -use kube::ResourceExt; use super::{workspace_mount, DEFAULT_SYNCER_IMAGE, WORKSPACE_DIR}; use crate::args; -use crate::error::{Error, Result}; +use crate::error::Result; /// Build and return the container spec for the syncer. -pub fn container(actor: &Actor) -> Result { - // Get the playbook name from the owner reference. - let playbook = actor - .owner_references() - .iter() - .find_map(|owner| (owner.kind == "Playbook").then(|| owner.name.clone())) - .ok_or_else(|| Error::MissingObjectKey(".metadata.ownerReferences"))?; - +pub fn container(playbook: &str, spec: &ActorSpec) -> Result { // Set the working directory to `workspace` argument. - let build = actor.spec.character.build.clone().unwrap_or_default(); + let build = spec.character.build.clone().unwrap_or_default(); let mut workdir = PathBuf::from(WORKSPACE_DIR); if let Some(context) = &build.context { workdir.push(context); } // FIXME: get the nats url from the config of context. + let once = spec.once.to_string(); let arguments = vec![ ("nats-url", "nats://amp-nats.amp-system.svc:4222"), ("workspace", workdir.to_str().unwrap()), - ("playbook", &playbook), - ("actor", actor.spec.name.as_str()), + ("playbook", playbook), + ("actor", spec.name.as_str()), + ("once", once.as_str()), ]; // FIXME: get the syncer image from the config of context. @@ -57,3 +51,27 @@ pub fn container(actor: &Actor) -> Result { ..Default::default() }) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_container() { + let spec = ActorSpec { name: "test".into(), image: "test".into(), ..Default::default() }; + let container = container("test", &spec).unwrap(); + + assert_eq!(container.name, "syncer"); + assert_eq!(container.image, Some(DEFAULT_SYNCER_IMAGE.into())); + assert_eq!( + container.args, + Some(vec![ + "--nats-url=nats://amp-nats.amp-system.svc:4222".into(), + "--workspace=/workspace/app".into(), + "--playbook=test".into(), + "--actor=test".into(), + "--once=false".into() + ]) + ); + } +} diff --git a/resources/src/deployer.rs b/resources/src/deployer.rs new file mode 100644 index 0000000..0e55f50 --- /dev/null +++ b/resources/src/deployer.rs @@ -0,0 +1,211 @@ +// Copyright 2023 The Amphitheatre Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use amp_common::config::Credentials; +use amp_common::docker::{self, registry, DockerConfig}; +use amp_common::resource::Actor; +use amp_common::schema::BuildMethod; +use k8s_openapi::api::core::v1::{Container, PodSpec}; +use kube::ResourceExt; +use tracing::{debug, error, info}; + +use crate::containers::{application, buildpacks, docker_config_volume, git_sync, kaniko, syncer, workspace_volume}; +use crate::error::{self, Error, Result}; +use crate::{deployment, hash, service}; + +pub struct Deployer { + k8s: kube::Client, + credentials: Credentials, + actor: Actor, + live: bool, + once: bool, + + syncer: Option, + builder: Option, +} + +impl Deployer { + pub fn new(k8s: kube::Client, credentials: &Credentials, actor: &Actor) -> Self { + Self { + k8s, + credentials: credentials.clone(), + actor: actor.clone(), + live: actor.spec.live, + once: actor.spec.once, + syncer: None, + builder: None, + } + } + + pub async fn run(&mut self) -> Result<()> { + if !self.live && self.built(&self.actor.spec.image).await? { + info!("The image is already built, skip the build process"); + } else { + info!("Prepare the build and deploy requirements for building"); + self.prepare().await?; + } + + // Deploy the actor + self.deploy().await?; + + // Exposure the actor service + if self.actor.spec.has_services() { + self.serve().await?; + } + + Ok(()) + } + + // Check if the image is already built + async fn built(&self, image: &str) -> Result { + let config = DockerConfig::from(&self.credentials.registries); + + let credential = docker::get_credential(&config, image); + let credential = match credential { + Ok(credential) => Some(credential), + Err(err) => { + error!("Error handling docker configuration: {}", err); + None + } + }; + + if registry::exists(image, credential).await.map_err(Error::DockerRegistryExistsFailed)? { + info!("The images already exists, Running"); + return Ok(true); + } + + Ok(false) + } + + async fn prepare(&mut self) -> Result<()> { + // Set the syncer + if self.actor.spec.source.is_some() { + self.syncer = Some(git_sync::container(&self.actor.spec)); + } else { + let playbook = owner_reference(&self.actor)?; + self.syncer = Some(syncer::container(&playbook, &self.actor.spec)?); + } + + // Prefer to use Kaniko to build images with Dockerfile, + // else, build the image with Cloud Native Buildpacks + let build = self.actor.spec.character.build.clone().unwrap_or_default(); + match build.method() { + BuildMethod::Dockerfile => { + debug!("Found dockerfile, build it with kaniko"); + self.builder = Some(kaniko::container(&self.actor.spec)); + } + BuildMethod::Buildpacks => { + debug!("Build the image with Cloud Native Buildpacks"); + self.builder = Some(buildpacks::container(&self.actor.spec)); + } + }; + + Ok(()) + } + + async fn deploy(&self) -> Result<()> { + let name = self.actor.name_any(); + let namespace = self.actor.namespace().ok_or_else(|| Error::MissingObjectKey(".metadata.namespace"))?; + + let resource = deployment::new(&self.actor, self.pod()?)?; + debug!("The Deployment resource:\n {:?}\n", resource); + + match deployment::exists(&self.k8s, &namespace, &name).await? { + true => { + // Deployment already exists, update it if there are new changes + info!("Try to refresh an existing Deployment {name}"); + let expected_hash = hash(&self.actor.spec)?; + deployment::update(&self.k8s, &namespace, &name, resource, expected_hash).await?; + } + false => { + // Create a new Deployment + deployment::create(&self.k8s, &namespace, resource).await?; + info!("Created new Deployment: {name}"); + } + } + + Ok(()) + } + + async fn serve(&self) -> Result<()> { + let name = self.actor.name_any(); + match service::exists(&self.k8s, &self.actor).await? { + true => { + info!("Try to refresh an existing Service {name}"); + service::update(&self.k8s, &self.actor).await?; + } + false => { + service::create(&self.k8s, &self.actor).await?; + info!("Created new Service: {name}"); + } + } + + Ok(()) + } + + fn pod(&self) -> Result { + let mut pod = PodSpec::default(); + + let mut init_containers = vec![]; + let mut containers = vec![]; + + // Arrange containers according to synchronization method and frequency + if self.live { + // Sync the local source to the server (live), the syncer and builder is required. + let syncer = self.syncer.as_ref().ok_or_else(|| Error::MissingSyncer)?; + let builder = self.builder.as_ref().ok_or_else(|| Error::MissingBuilder)?; + + // the syncer in init container if sync once (exit after sync once), + // else, the syncer is sidecar container, it will keep watching the changes. + match self.once { + true => init_containers.push(syncer.clone()), + false => containers.push(syncer.clone()), + } + + // whatever sync once or not, the builder is the main container. + // the executor will run the application in the builder container, directly. + containers.push(builder.clone()); + } else { + // Pull the source from git repo (not live), and exit after sync once (once). + // the syncer and builder in init containers, app as the main container. + if let Some(syncer) = &self.syncer { + init_containers.push(syncer.clone()); + } + + if let Some(builder) = &self.builder { + init_containers.push(builder.clone()); + } + + // If the docker image is already built, the syncer and builder will be skipped. + // The application will be the main container. + containers.push(application::container(&self.actor.spec)); + } + + pod.init_containers = Some(init_containers); + pod.containers = containers.clone(); + pod.volumes = Some(vec![workspace_volume(), docker_config_volume()]); + + Ok(pod) + } +} + +/// Get the playbook name from the owner reference. +#[inline] +fn owner_reference(actor: &Actor) -> Result { + actor + .owner_references() + .iter() + .find_map(|owner| (owner.kind == "Playbook").then(|| owner.name.clone())) + .ok_or_else(|| Error::MissingObjectKey(".metadata.ownerReferences")) +} diff --git a/resources/src/deployment.rs b/resources/src/deployment.rs index ebc1446..1e6e54e 100644 --- a/resources/src/deployment.rs +++ b/resources/src/deployment.rs @@ -16,66 +16,57 @@ use std::collections::BTreeMap; use amp_common::resource::Actor; use k8s_openapi::api::apps::v1::{Deployment, DeploymentSpec}; -use k8s_openapi::api::core::v1::PodTemplateSpec; +use k8s_openapi::api::core::v1::{PodSpec, PodTemplateSpec}; use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector; use kube::api::{Patch, PatchParams, PostParams}; use kube::core::ObjectMeta; use kube::{Api, Client, Resource, ResourceExt}; use tracing::{debug, info}; -use crate::containers::{application, devcontainer}; - use super::error::{Error, Result}; use super::{hash, LAST_APPLIED_HASH_KEY}; -pub async fn exists(client: &Client, actor: &Actor) -> Result { - let namespace = actor.namespace().ok_or_else(|| Error::MissingObjectKey(".metadata.namespace"))?; - let api: Api = Api::namespaced(client.clone(), namespace.as_str()); - let name = actor.name_any(); - - Ok(api.get_opt(&name).await.map_err(Error::KubeError)?.is_some()) +pub async fn exists(client: &Client, namespace: &str, name: &str) -> Result { + let api: Api = Api::namespaced(client.clone(), namespace); + Ok(api.get_opt(name).await.map_err(Error::KubeError)?.is_some()) } -pub async fn create(client: &Client, actor: &Actor) -> Result { - let namespace = actor.namespace().ok_or_else(|| Error::MissingObjectKey(".metadata.namespace"))?; - let api: Api = Api::namespaced(client.clone(), namespace.as_str()); - - let resource = new(actor)?; - debug!("The Deployment resource:\n {:?}\n", resource); - +pub async fn create(client: &Client, namespace: &str, resource: Deployment) -> Result { + let api: Api = Api::namespaced(client.clone(), namespace); let deployment = api.create(&PostParams::default(), &resource).await.map_err(Error::KubeError)?; - info!("Created Deployment: {}", deployment.name_any()); + Ok(deployment) } -pub async fn update(client: &Client, actor: &Actor) -> Result { - let namespace = actor.namespace().ok_or_else(|| Error::MissingObjectKey(".metadata.namespace"))?; - let api: Api = Api::namespaced(client.clone(), namespace.as_str()); - let name = actor.name_any(); +pub async fn update( + client: &Client, + namespace: &str, + name: &str, + resource: Deployment, + expected_hash: String, +) -> Result { + let api: Api = Api::namespaced(client.clone(), namespace); + let mut deployment = api.get(name).await.map_err(Error::KubeError)?; + debug!("The Deployment {} already exists: {:?}", name, deployment); - let mut deployment = api.get(&name).await.map_err(Error::KubeError)?; - debug!("The Deployment {} already exists: {:?}", &name, deployment); - - let expected_hash = hash(&actor.spec)?; let found_hash: String = deployment.annotations().get(LAST_APPLIED_HASH_KEY).map_or("".into(), |v| v.into()); if found_hash == expected_hash { - debug!("The Deployment {} is already up-to-date", &name); + debug!("The Deployment {} is already up-to-date", name); return Ok(deployment); } - let resource = new(actor)?; debug!("The updating Deployment resource:\n {:?}\n", resource); let params = &PatchParams::apply("amp-controllers").force(); - deployment = api.patch(&name, params, &Patch::Apply(&resource)).await.map_err(Error::KubeError)?; + deployment = api.patch(name, params, &Patch::Apply(&resource)).await.map_err(Error::KubeError)?; info!("Updated Deployment: {}", deployment.name_any()); Ok(deployment) } -fn new(actor: &Actor) -> Result { +pub fn new(actor: &Actor, pod: PodSpec) -> Result { let name = actor.name_any(); // Build the metadata for the deployment @@ -93,9 +84,6 @@ fn new(actor: &Actor) -> Result { ..Default::default() }; - // Build the spec for the pod, depend on whether the actor is live or not. - let pod = if actor.spec.live { devcontainer::pod(actor)? } else { application::pod(actor) }; - // Build the spec for the deployment let spec = DeploymentSpec { selector: LabelSelector { match_labels: Some(labels.clone()), ..Default::default() }, diff --git a/resources/src/error.rs b/resources/src/error.rs index 3e03b0b..1aaac93 100644 --- a/resources/src/error.rs +++ b/resources/src/error.rs @@ -33,6 +33,21 @@ pub enum Error { // ApiError(#[source] ApiError), #[error("Metrics not available at the moment")] MetricsNotAvailable, + + #[error("Unknown Syncer: {0}")] + UnknownSyncer(String), + + #[error("Unknown Builder: {0}")] + UnknownBuilder(String), + + #[error("DockerRegistryExistsFailed: {0}")] + DockerRegistryExistsFailed(#[source] anyhow::Error), + + #[error("MissingSyncer")] + MissingSyncer, + + #[error("MissingBuilder")] + MissingBuilder, } pub type Result = std::result::Result; diff --git a/resources/src/lib.rs b/resources/src/lib.rs index 979c59b..764bf30 100644 --- a/resources/src/lib.rs +++ b/resources/src/lib.rs @@ -19,10 +19,10 @@ use sha2::{Digest, Sha256}; use self::error::{Error, Result}; pub mod actor; -pub mod builder; pub mod character; pub mod containers; pub mod credential; +pub mod deployer; pub mod deployment; pub mod error; pub mod event;