diff --git a/Cargo.lock b/Cargo.lock index 6c8afdf..7ef0e4b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -100,7 +100,7 @@ checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842" dependencies = [ "proc-macro2", "quote", - "syn 2.0.15", + "syn 2.0.48", ] [[package]] @@ -365,7 +365,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.15", + "syn 2.0.48", ] [[package]] @@ -387,12 +387,12 @@ checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" dependencies = [ "darling_core 0.20.3", "quote", - "syn 2.0.15", + "syn 2.0.48", ] [[package]] name = "databricks_kube" -version = "0.5.1" +version = "0.6.0" dependencies = [ "assert-json-diff", "async-stream", @@ -409,10 +409,12 @@ dependencies = [ "kube", "lazy_static", "log", + "reqwest", "schemars", "serde", "serde_json", "serde_yaml", + "thiserror", "tokio", "tokio-graceful-shutdown", "tokio-stream", @@ -1566,18 +1568,18 @@ checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" [[package]] name = "proc-macro2" -version = "1.0.56" +version = "1.0.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b63bdb0cd06f1f4dedf69b254734f9b45af66e4a031e42a7480257d9898b435" +checksum = "95fc56cda0b5c3325f5fbbd7ff9fda9e02bb00bb3dac51252d2f1bfa1cb8cc8c" dependencies = [ "unicode-ident", ] [[package]] name = "quote" -version = "1.0.26" +version = "1.0.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4424af4bf778aae2051a77b60283332f386554255d722233d09fbfc7e30da2fc" +checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef" dependencies = [ "proc-macro2", ] @@ -1818,7 +1820,7 @@ checksum = "291a097c63d8497e00160b166a967a4a79c64f3facdd01cbd7502231688d77df" dependencies = [ "proc-macro2", "quote", - "syn 2.0.15", + "syn 2.0.48", ] [[package]] @@ -1881,7 +1883,7 @@ dependencies = [ "darling 0.20.3", "proc-macro2", "quote", - "syn 2.0.15", + "syn 2.0.48", ] [[package]] @@ -1950,9 +1952,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.15" +version = "2.0.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a34fcf3e8b60f57e6a14301a2e916d323af98b0ea63c599441eec8558660c822" +checksum = "0f3531638e407dfc0814761abb7c00a5b54992b849452a0646b7f65c9f770f3f" dependencies = [ "proc-macro2", "quote", @@ -1984,22 +1986,22 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.40" +version = "1.0.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "978c9a314bd8dc99be594bc3c175faaa9794be04a5a5e153caba6915336cebac" +checksum = "d54378c645627613241d077a3a79db965db602882668f9136ac42af9ecb730ad" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.40" +version = "1.0.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f" +checksum = "fa0faa943b50f3db30a20aa7e265dbc66076993efed8463e8de414e5d06d3471" dependencies = [ "proc-macro2", "quote", - "syn 2.0.15", + "syn 2.0.48", ] [[package]] diff --git a/README.md b/README.md index 0da2792..b02593d 100644 --- a/README.md +++ b/README.md @@ -63,31 +63,52 @@ EOF ### Usage -See the examples directory for samples of Databricks CRDs. Resources that are created via Kubernetes are owned by the operator: your checked-in manifests are the source of truth. It will not sync anything other than status back from the API, and overwrite changes made by users from the Databricks webapp. 
- -You may provide the `databricks-operator/owner` annotation as shown below (to be explicit). However, all resources created in Kube first (i.e. no associated API object found) are assumed to be owned by the operator. +See the examples directory for samples of Databricks CRDs. Resources that are created via Kubernetes are owned by the operator: your checked-in manifests are the source of truth. ```yaml apiVersion: com.dstancu.databricks/v1 kind: DatabricksJob metadata: - name: my-super-cool-job + name: my-word-count namespace: default - annotations: - databricks-operator/owner: operator +spec: + job: + settings: + email_notifications: + no_alert_for_skipped_runs: false + format: MULTI_TASK + job_clusters: + - job_cluster_key: word-count-cluster + new_cluster: + ... + max_concurrent_runs: 1 + name: my-word-count + git_source: + git_branch: misc-and-docs + git_provider: gitHub + git_url: https://github.com/mach-kernel/databricks-kube-operator + tasks: + - email_notifications: {} + job_cluster_key: word-count-cluster + notebook_task: + notebook_path: examples/job.py + source: GIT + task_key: my-word-count + timeout_seconds: 0 + timeout_seconds: 0 ``` -It is also possible to set a resource's owner to `api`, which will update the Kubernetes resource as it changes on Databricks. +Changes made by users in the Databricks webapp will be overwritten by the operator if drift is detected: -```yaml -apiVersion: com.dstancu.databricks/v1 -kind: DatabricksJob -metadata: - annotations: - databricks-operator/owner: api - generation: 1 - name: hello-world - ... +``` +[2024-01-11T14:20:40Z INFO databricks_kube::traits::remote_api_resource] Resource DatabricksJob my-word-count drifted! + Diff (remote, kube): + json atoms at path ".settings.tasks[0].notebook_task.notebook_path" are not equal: + lhs: + "examples/job_oops_is_this_right.py" + rhs: + "examples/job.py" +[2024-01-11T14:20:40Z INFO databricks_kube::traits::remote_api_resource] Resource DatabricksJob my-word-count reconciling drift... ``` Look at jobs (allowed to be viewed by the operator's access token): @@ -217,7 +238,6 @@ Want to add support for a new API? 
Provided it has an OpenAPI definition, these * Download API definition into `openapi/` and make a [Rust generator configuration](https://openapi-generator.tech/docs/generators/rust/) (feel free to copy the others and change name) * Generate the SDK, add it to the Cargo workspace and dependencies for `databricks-kube/` * Implement `RestConfig` for your new client -* Implement `From>` for `DatabricksKubeError` * Define the new CRD Spec type ([follow kube-rs tutorial](https://kube.rs/getting-started/)) * `impl RemoteAPIResource for MyNewCRD` * `impl StatusAPIResource for MyNewCRD` and [specify `TStatusType` in your CRD](https://github.com/kube-rs/kube/blob/main/examples/crd_derive.rs#L20) diff --git a/charts/databricks-kube-operator/Chart.yaml b/charts/databricks-kube-operator/Chart.yaml index 84abfea..5fb83d1 100644 --- a/charts/databricks-kube-operator/Chart.yaml +++ b/charts/databricks-kube-operator/Chart.yaml @@ -1,8 +1,8 @@ apiVersion: v2 -appVersion: 0.5.1 +appVersion: 0.6.0 name: databricks-kube-operator description: A kube-rs operator for managing Databricks API resources -version: 0.5.5 +version: 0.6.0 home: https://github.com/mach-kernel/databricks-kube-operator sources: diff --git a/databricks-kube/Cargo.toml b/databricks-kube/Cargo.toml index fb3d9b7..b18f8f2 100644 --- a/databricks-kube/Cargo.toml +++ b/databricks-kube/Cargo.toml @@ -5,7 +5,7 @@ path = "src/crdgen.rs" [package] name = "databricks_kube" default-run = "databricks_kube" -version = "0.5.1" +version = "0.6.0" edition = "2021" [dependencies] @@ -31,6 +31,11 @@ schemars = { version = "0.8.11", features = ["derive"] } tokio = { version = "1.24.2", features = ["macros", "rt-multi-thread"] } tokio-graceful-shutdown = "0.11.1" tokio-stream = "0.1.11" +thiserror = "1.0.56" + +[dependencies.reqwest] +version = "^0.11" +features = ["json", "multipart"] [dev-dependencies] tower-test = "0.4.0" diff --git a/databricks-kube/src/crds/databricks_job.rs b/databricks-kube/src/crds/databricks_job.rs index c295ad8..3b942e7 100644 --- a/databricks-kube/src/crds/databricks_job.rs +++ b/databricks-kube/src/crds/databricks_job.rs @@ -8,9 +8,7 @@ use crate::{ traits::rest_config::RestConfig, }; -use databricks_rust_jobs::models::{ - JobsRunsList200Response, RunLifeCycleState, RunState, -}; +use databricks_rust_jobs::models::{JobsRunsList200Response, RunLifeCycleState, RunState}; use databricks_rust_jobs::{ apis::default_api, models::{ @@ -36,8 +34,6 @@ pub struct DatabricksJobStatus { pub latest_run_state: Option, } -// TODO: We added `NO_RUNS` to `RunLifeCycleState` because -// it was the laziest way to surface this, but should probably revisit impl Default for DatabricksJobStatus { fn default() -> Self { Self { @@ -180,7 +176,7 @@ impl RemoteAPIResource for DatabricksJob { } #[allow(irrefutable_let_patterns)] - fn every_reconcile_owned( + fn every_reconcile( &self, _: Arc, ) -> Pin> + Send>> { @@ -191,13 +187,13 @@ impl RemoteAPIResource for DatabricksJob { &self, context: Arc, ) -> Pin> + Send>> { - let job_id = self.spec().job.job_id; + let job_id = self + .spec() + .job + .job_id + .ok_or(DatabricksKubeError::IDUnsetError); try_stream! { - if job_id.is_none() { - yield Err(DatabricksKubeError::APIError("Resource does not exist".to_string()))?; - } - let config = Job::get_rest_config(context.clone()).await.unwrap(); let JobsGet200Response { @@ -206,7 +202,7 @@ impl RemoteAPIResource for DatabricksJob { settings, created_time, .. 
- } = default_api::jobs_get(&config, job_id.unwrap()).await?; + } = default_api::jobs_get(&config, job_id?).await?; yield Job { job_id, @@ -273,13 +269,14 @@ impl RemoteAPIResource for DatabricksJob { { let job = self.spec().job.clone(); let job_settings = job.settings.as_ref().cloned(); - let job_id = self.spec().job.job_id; - try_stream! { - if job_id.is_none() { - yield Err(DatabricksKubeError::APIError("Resource does not exist".to_string()))?; - } + let job_id = self + .spec() + .job + .job_id + .ok_or(DatabricksKubeError::IDUnsetError); + try_stream! { let config = Job::get_rest_config(context.clone()).await.unwrap(); default_api::jobs_update( @@ -288,7 +285,7 @@ impl RemoteAPIResource for DatabricksJob { /// TODO: unsupported atm // access_control_list: job.access_control_list JobsUpdateRequest { - job_id: job_id.unwrap(), + job_id: job_id?, new_settings: job_settings, ..JobsUpdateRequest::default() } @@ -305,17 +302,18 @@ impl RemoteAPIResource for DatabricksJob { &self, context: Arc, ) -> Pin> + Send + '_>> { - let job_id = self.spec().job.job_id; + let job_id = self + .spec() + .job + .job_id + .ok_or(DatabricksKubeError::IDUnsetError); try_stream! { - if job_id.is_none() { - yield Err(DatabricksKubeError::APIError("Resource does not exist".to_string()))?; - } - let config = Job::get_rest_config(context.clone()).await.unwrap(); + default_api::jobs_delete( &config, - JobsDeleteRequest { job_id: job_id.unwrap(), } + JobsDeleteRequest { job_id: job_id?, } ).await?; yield () diff --git a/databricks-kube/src/crds/git_credential.rs b/databricks-kube/src/crds/git_credential.rs index c62498e..468c11a 100644 --- a/databricks-kube/src/crds/git_credential.rs +++ b/databricks-kube/src/crds/git_credential.rs @@ -99,13 +99,11 @@ impl RemoteAPIResource for GitCredential { &self, context: Arc, ) -> Pin> + Send>> { - let credential_id = - self.spec() - .credential - .credential_id - .ok_or(DatabricksKubeError::APIError( - "Remote resource cannot exist".to_string(), - )); + let credential_id = self + .spec() + .credential + .credential_id + .ok_or(DatabricksKubeError::IDUnsetError); try_stream! { let config = APICredential::get_rest_config(context.clone()).await.unwrap(); @@ -126,7 +124,7 @@ impl RemoteAPIResource for GitCredential { try_stream! { let config = APICredential::get_rest_config(context.clone()).await.unwrap(); - let secret_name = self.spec().secret_name.clone().ok_or(DatabricksKubeError::SecretMissingError)?; + let secret_name = self.spec().secret_name.clone().ok_or(DatabricksKubeError::SecretMissingError("".to_string()))?; log::info!("Reading secret {}", secret_name); let secrets_api = Api::::default_namespaced(context.client.clone()); @@ -138,7 +136,7 @@ impl RemoteAPIResource for GitCredential { .flat_map(|m| m.get("personal_access_token").map(Clone::clone)) .flat_map(|buf| std::str::from_utf8(&buf.0).ok().map(ToString::to_string)) .next() - .ok_or(DatabricksKubeError::SecretMissingError)?; + .ok_or(DatabricksKubeError::SecretMissingError(secret_name))?; let new_credential = default_api::create_git_credential( &config, @@ -168,7 +166,7 @@ impl RemoteAPIResource for GitCredential { try_stream! 
{
             let config = APICredential::get_rest_config(context.clone()).await.unwrap();

-            let secret_name = self.spec().secret_name.clone().ok_or(DatabricksKubeError::SecretMissingError)?;
+            let secret_name = self.spec().secret_name.clone().ok_or(DatabricksKubeError::SecretMissingError("".to_string()))?;

             log::info!("Reading secret {}", secret_name);
             let secrets_api = Api::<Secret>::default_namespaced(context.client.clone());
@@ -180,7 +178,7 @@ impl RemoteAPIResource for GitCredential {
                 .flat_map(|m| m.get("personal_access_token").map(Clone::clone))
                 .flat_map(|buf| std::str::from_utf8(&buf.0).ok().map(ToString::to_string))
                 .next()
-                .ok_or(DatabricksKubeError::SecretMissingError)?;
+                .ok_or(DatabricksKubeError::SecretMissingError(secret_name))?;

             default_api::update_git_credential(
                 &config,
diff --git a/databricks-kube/src/crds/repo.rs b/databricks-kube/src/crds/repo.rs
index e2c300a..7e8284a 100644
--- a/databricks-kube/src/crds/repo.rs
+++ b/databricks-kube/src/crds/repo.rs
@@ -91,9 +91,7 @@ impl RemoteAPIResource for Repo {
             .spec()
             .repository
             .id
-            .ok_or(DatabricksKubeError::APIError(
-                "Remote resource cannot exist".to_string(),
-            ));
+            .ok_or(DatabricksKubeError::IDUnsetError);

         try_stream! {
             let config = APIRepo::get_rest_config(context.clone()).await.unwrap();
diff --git a/databricks-kube/src/error.rs b/databricks-kube/src/error.rs
index 6b79a1d..9e9b63d 100644
--- a/databricks-kube/src/error.rs
+++ b/databricks-kube/src/error.rs
@@ -1,44 +1,87 @@
-use std::error::Error;
+use serde::Serialize;
+use serde_json::to_value;
 use std::fmt::{Debug, Display};
+use thiserror::Error;

-use crate::context::CONFIGMAP_NAME;
-
-use databricks_rust_git_credentials::apis::Error as GitCredentialAPIError;
-use databricks_rust_jobs::apis::Error as JobsAPIError;
-use databricks_rust_repos::apis::Error as ReposAPIError;
+use databricks_rust_git_credentials::apis::{
+    Error as GitCredentialAPIError, ResponseContent as GitCredentialsResponseContent,
+};
+use databricks_rust_jobs::apis::{Error as JobsAPIError, ResponseContent as JobsResponseContent};
+use databricks_rust_repos::apis::{
+    Error as ReposAPIError, ResponseContent as ReposResponseContent,
+};

 use kube::runtime::finalizer::Error as KubeFinalizerError;

 impl<T> From<JobsAPIError<T>> for DatabricksKubeError
 where
-    T: Debug,
+    T: Debug + Serialize + 'static,
 {
     fn from(e: JobsAPIError<T>) -> Self {
-        Self::APIError(format!("{:?}", e))
+        match e {
+            JobsAPIError::ResponseError(JobsResponseContent {
+                status,
+                content,
+                entity,
+            }) => Self::APIError(OpenAPIError::ResponseError(SerializableResponseContent {
+                status,
+                content,
+                entity: entity.and_then(|e| to_value(e).ok()),
+            })),
+            JobsAPIError::Io(e) => Self::APIError(OpenAPIError::Io(e)),
+            JobsAPIError::Serde(e) => Self::APIError(OpenAPIError::Serde(e)),
+            JobsAPIError::Reqwest(e) => Self::APIError(OpenAPIError::Reqwest(e)),
+        }
     }
 }

 impl<T> From<GitCredentialAPIError<T>> for DatabricksKubeError
 where
-    T: Debug,
+    T: Debug + Serialize + 'static,
 {
     fn from(e: GitCredentialAPIError<T>) -> Self {
-        Self::APIError(format!("{:?}", e))
+        match e {
+            GitCredentialAPIError::ResponseError(GitCredentialsResponseContent {
+                status,
+                content,
+                entity,
+            }) => Self::APIError(OpenAPIError::ResponseError(SerializableResponseContent {
+                status,
+                content,
+                entity: entity.and_then(|e| to_value(e).ok()),
+            })),
+            GitCredentialAPIError::Io(e) => Self::APIError(OpenAPIError::Io(e)),
+            GitCredentialAPIError::Serde(e) => Self::APIError(OpenAPIError::Serde(e)),
+            GitCredentialAPIError::Reqwest(e) => Self::APIError(OpenAPIError::Reqwest(e)),
+        }
     }
 }

 impl<T> From<ReposAPIError<T>> for DatabricksKubeError
 where
-    T: Debug,
+    T: Debug + Serialize + 'static,
 {
     fn from(e: ReposAPIError<T>) -> Self {
-        Self::APIError(format!("{:?}", e))
+        match e {
+            ReposAPIError::ResponseError(ReposResponseContent {
+                status,
+                content,
+                entity,
+            }) => Self::APIError(OpenAPIError::ResponseError(SerializableResponseContent {
+                status,
+                content,
+                entity: entity.and_then(|e| to_value(e).ok()),
+            })),
+            ReposAPIError::Io(e) => Self::APIError(OpenAPIError::Io(e)),
+            ReposAPIError::Serde(e) => Self::APIError(OpenAPIError::Serde(e)),
+            ReposAPIError::Reqwest(e) => Self::APIError(OpenAPIError::Reqwest(e)),
+        }
     }
 }

 impl<T> From<KubeFinalizerError<T>> for DatabricksKubeError
 where
     T: Debug,
-    T: Error,
+    T: std::error::Error,
 {
     fn from(e: KubeFinalizerError<T>) -> Self {
         Self::FinalizerError(format!("{:?}", e))
@@ -46,51 +89,70 @@
     }
 }

 #[derive(Debug)]
+pub struct SerializableResponseContent {
+    pub status: reqwest::StatusCode,
+    pub content: String,
+    pub entity: Option<serde_json::Value>,
+}
+
+#[derive(Error, Debug)]
+pub enum OpenAPIError {
+    Reqwest(#[from] reqwest::Error),
+    Serde(#[from] serde_json::Error),
+    Io(#[from] std::io::Error),
+    ResponseError(SerializableResponseContent),
+}
+
+impl Display for OpenAPIError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            OpenAPIError::Reqwest(err) => Display::fmt(err, f),
+            OpenAPIError::Serde(err) => Display::fmt(err, f),
+            OpenAPIError::Io(err) => Display::fmt(err, f),
+            OpenAPIError::ResponseError(SerializableResponseContent {
+                status,
+                content,
+                entity,
+            }) => write!(f, "API response error: {} {} {:?}", status, content, entity),
+        }
+    }
+}
+
+#[derive(Error, Debug)]
 #[allow(dead_code)]
 pub enum DatabricksKubeError {
-    APIError(String),
-    ConfigMapMissingError,
+    #[error("Error calling Databricks API: {0}")]
+    APIError(#[from] OpenAPIError),
+
+    #[error("Timed out waiting for config map {0}")]
+    ConfigMapMissingError(String),
+
+    #[error("Controller reconciliation failed: {0}")]
     ControllerError(String),
+
+    #[error("Timed out waiting for credentials")]
     CredentialsError,
+
+    #[error(
+        "Timed out while waiting for CRD: {0}\n\nGet all CRDs by running:\ncargo run --bin crd_gen"
+    )]
     CRDMissingError(String),
-    SecretMissingError,
-    Shutdown(String),
+
+    #[error("Finalizer error: {0}")]
+    FinalizerError(String),
+
+    #[error("The resource ID is unset")]
+    IDUnsetError,
+
+    #[error("Unable to update resource for: {0}")]
     ResourceUpdateError(String),
+
+    #[error("Unable to get resource status for: {0}")]
     ResourceStatusError(String),
-    FinalizerError(String),
-}

-impl Display for DatabricksKubeError {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        let msg = match self {
-            DatabricksKubeError::APIError(err )=> format!(
-                "Error calling Databricks API:\n{}",
-                err
-            ),
-            DatabricksKubeError::ControllerError(s) => format!(
-                "Controller reconciliation failed:\n{}",
-                s
-            ),
-            DatabricksKubeError::ConfigMapMissingError => format!(
-                "Timed out while waiting for config map: {}",
-                *CONFIGMAP_NAME
-            ),
-            DatabricksKubeError::CredentialsError => "Unable to get credentials".to_owned(),
-            DatabricksKubeError::CRDMissingError(crd) => format!(
-                "Timed out while waiting for CRD: {}\n\nGet all CRDs by running:\ncargo run --bin crd_gen",
-                crd
-            ),
-            DatabricksKubeError::Shutdown(s) => format!(
-                "Shutdown requested, exit: {}",
-                s
-            ),
-            DatabricksKubeError::SecretMissingError => "The secret referenced by this resource is missing".to_owned(),
-            DatabricksKubeError::ResourceUpdateError(s) => format!("Unable to update K8S Resource {}", s),
-
DatabricksKubeError::ResourceStatusError(s) => format!("Unable to get status {}", s), - DatabricksKubeError::FinalizerError(s) => format!("Finalizer failed {}", s), - }; - write!(f, "{}", msg) - } -} + #[error("Secret {0} is missing")] + SecretMissingError(String), -impl Error for DatabricksKubeError {} + #[error("Shutdown requested: {0}")] + Shutdown(String), +} diff --git a/databricks-kube/src/traits/remote_api_resource.rs b/databricks-kube/src/traits/remote_api_resource.rs index 0ac9481..a37632d 100644 --- a/databricks-kube/src/traits/remote_api_resource.rs +++ b/databricks-kube/src/traits/remote_api_resource.rs @@ -5,7 +5,7 @@ use crate::{context::Context, error::DatabricksKubeError}; use assert_json_diff::assert_json_matches_no_panic; use futures::{Future, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; -use k8s_openapi::{DeepMerge, NamespaceResourceScope}; +use k8s_openapi::NamespaceResourceScope; use kube::{ api::PostParams, @@ -44,134 +44,91 @@ where { let mut resource = resource; let kube_api = Api::::default_namespaced(context.client.clone()); - let latest_remote = resource.remote_get(context.clone()).next().await.unwrap(); - - let op_config = context.clone().get_operator_config(); - let mut requeue_interval_sec = 300; - - if op_config.is_some() { - requeue_interval_sec = op_config.unwrap().default_requeue_interval.unwrap(); - } - - // todo: enum - let owner = resource - .annotations() - .get("databricks-operator/owner") - .map(Clone::clone) - .unwrap_or("operator".to_string()); - - // Create if owned - if (owner == "operator") && latest_remote.is_err() { - log::info!( - "Resource {} {} is missing in Databricks, creating", - TCRDType::api_resource().kind, - resource.name_unchecked() - ); - - let created = resource - .remote_create(context.clone()) - .next() - .await - .unwrap()?; - - log::info!( - "Created {} {} in Databricks", - TCRDType::api_resource().kind, - resource.name_unchecked() - ); - - kube_api - .replace(&resource.name_unchecked(), &PostParams::default(), &created) - .await - .map_err(|e| DatabricksKubeError::ResourceUpdateError(e.to_string()))?; - - log::info!( - "Updated {} {} in K8S", - TCRDType::api_resource().kind, - resource.name_unchecked() - ); - - return Ok(Action::requeue(Duration::from_secs(requeue_interval_sec))); - } - - let latest_remote = latest_remote?; let kube_as_api: TAPIType = resource.as_ref().clone().into(); - if latest_remote != kube_as_api { - log::info!( - "Resource {} {} drifted!\nDiff (remote, kube):\n{}", - TCRDType::api_resource().kind, - resource.name_unchecked(), - assert_json_matches_no_panic( - &latest_remote, - &kube_as_api, - assert_json_diff::Config::new(assert_json_diff::CompareMode::Strict) - ) - .unwrap_err() - ); - } - - // Push to API if operator owned, or let user know - if (latest_remote != kube_as_api) && (owner == "operator") { - log::info!( - "Resource {} {} is owned by databricks-kube-operator, reconciling drift...", - TCRDType::api_resource().kind, - resource.name_unchecked() - ); - - let updated = resource - .remote_update(context.clone()) - .next() - .await - .unwrap()?; - - let replaced = kube_api - .replace(&resource.name_unchecked(), &PostParams::default(), &updated) - .await - .map_err(|e| DatabricksKubeError::ResourceUpdateError(e.to_string()))?; - - resource = replaced.into(); - - log::info!( - "Updated {} {} in K8S", - TCRDType::api_resource().kind, - resource.name_unchecked() - ); - } else if latest_remote != kube_as_api { - log::info!( - "Resource {} {} is not owned by databricks-kube-operator, 
updating Kubernetes object.\nTo push updates to Databricks, ensure databricks-operator/owner: operator", - TCRDType::api_resource().kind, - resource.name_unchecked() - ); - - let mut latest_as_kube: TCRDType = latest_remote.into(); - latest_as_kube - .annotations_mut() - .extend(resource.annotations().clone()); - latest_as_kube - .labels_mut() - .extend(resource.labels().clone()); - latest_as_kube - .meta_mut() - .merge_from(resource.meta().clone()); - - let replaced = kube_api - .replace( - &resource.name_unchecked(), - &PostParams::default(), - &latest_as_kube, - ) - .await - .map_err(|e| DatabricksKubeError::ResourceUpdateError(e.to_string()))?; - - resource = replaced.into(); - } - - if owner == "operator" { - resource.every_reconcile_owned(context.clone()).await?; + let latest_remote = resource.remote_get(context.clone()).next().await.unwrap(); + let requeue_secs = context + .as_ref() + .get_operator_config() + .and_then(|c| c.default_requeue_interval) + .unwrap_or(300); + + match latest_remote { + Err(DatabricksKubeError::IDUnsetError) => { + log::info!( + "Resource {} {} is missing in Databricks, creating", + TCRDType::api_resource().kind, + resource.name_unchecked() + ); + + let created = resource + .remote_create(context.clone()) + .next() + .await + .unwrap()?; + + log::info!( + "Created {} {} in Databricks", + TCRDType::api_resource().kind, + resource.name_unchecked() + ); + + kube_api + .replace(&resource.name_unchecked(), &PostParams::default(), &created) + .await + .map_err(|e| DatabricksKubeError::ResourceUpdateError(e.to_string()))?; + + log::info!( + "Updated {} {} in K8S", + TCRDType::api_resource().kind, + resource.name_unchecked() + ); + } + Err(other) => return Err(other), + Ok(remote) => { + if remote != kube_as_api { + log::info!( + "Resource {} {} drifted!\nDiff (remote, kube):\n{}", + TCRDType::api_resource().kind, + resource.name_unchecked(), + assert_json_matches_no_panic( + &remote, + &kube_as_api, + assert_json_diff::Config::new(assert_json_diff::CompareMode::Strict) + ) + .unwrap_err() + ); + + log::info!( + "Resource {} {} reconciling drift...", + TCRDType::api_resource().kind, + resource.name_unchecked() + ); + + let updated = resource + .remote_update(context.clone()) + .next() + .await + .unwrap()?; + + let replaced = kube_api + .replace(&resource.name_unchecked(), &PostParams::default(), &updated) + .await + .map_err(|e| DatabricksKubeError::ResourceUpdateError(e.to_string()))?; + + resource = replaced.into(); + + log::info!( + "Updated {} {} in K8S", + TCRDType::api_resource().kind, + resource.name_unchecked() + ); + } + } } - Ok(Action::requeue(Duration::from_secs(requeue_interval_sec))) + resource.every_reconcile(context.clone()).await?; + Ok(Action::requeue(Duration::from_secs(requeue_secs))) } #[allow(dead_code)] @@ -199,27 +156,19 @@ where TAPIType: Serialize, TAPIType: 'static, { - let owner = resource - .annotations() - .get("databricks-operator/owner") - .map(Clone::clone) - .unwrap_or("operator".to_string()); - - if owner == "operator" { - log::info!( - "Removing {} {} from Databricks", - TCRDType::api_resource().kind, - resource.name_unchecked() - ); - - resource.remote_delete(context.clone()).next().await; - - log::info!( - "Removed {} {} from Databricks", - TCRDType::api_resource().kind, - resource.name_unchecked() - ); - } + log::info!( + "Removing {} {} from Databricks", + TCRDType::api_resource().kind, + resource.name_unchecked() + ); + + resource.remote_delete(context.clone()).next().await; + + log::info!( + "Removed {} {} from 
Databricks", + TCRDType::api_resource().kind, + resource.name_unchecked() + ); Ok(Action::await_change()) } @@ -325,7 +274,7 @@ pub trait RemoteAPIResource { ) } - fn every_reconcile_owned( + fn every_reconcile( &self, _context: Arc, ) -> Pin> + Send>> { diff --git a/databricks-kube/src/util.rs b/databricks-kube/src/util.rs index 4b52b0c..2197481 100644 --- a/databricks-kube/src/util.rs +++ b/databricks-kube/src/util.rs @@ -145,7 +145,9 @@ pub async fn ensure_api_secret( .flatten() .last() .flatten() - .ok_or(DatabricksKubeError::ConfigMapMissingError)?; + .ok_or(DatabricksKubeError::ConfigMapMissingError( + CONFIGMAP_NAME.clone(), + ))?; log::info!("Found API secret"); @@ -182,7 +184,9 @@ pub async fn ensure_configmap(cm_api: Api) -> Result for FakeResource { .boxed() } - fn every_reconcile_owned( + fn every_reconcile( &self, _context: Arc, ) -> Pin> + Send>> { @@ -82,7 +82,7 @@ impl RemoteAPIResource for FakeResource { .map(Clone::clone); try_stream! { - yield found.ok_or_else(|| DatabricksKubeError::APIError("Not found".to_string()))?; + yield found.ok_or_else(|| DatabricksKubeError::IDUnsetError)?; } .boxed() } @@ -303,81 +303,6 @@ async fn test_resource_kube_update_operator_owned() { .await; } -/// When an API owned resource is updated in Kubernetes -#[tokio::test] -async fn test_resource_kube_update_api_owned() { - let api_resource = FakeAPIResource { - id: 1, - description: None, - }; - TEST_STORE.pin().insert(1, api_resource.clone()); - - let mut resource: FakeResource = FakeResource::new( - "test", - FakeResourceSpec { - api_resource: api_resource.clone(), - }, - ); - resource.meta_mut().resource_version = Some("1".to_string()); - resource.meta_mut().annotations = Some({ - let mut annots = BTreeMap::new(); - annots.insert("databricks-operator/owner".to_string(), "api".to_string()); - annots - }); - // Bind the finalizer to avoid having to mock the PATCH request from the API - resource.meta_mut().finalizers = - Some(vec!["databricks-operator/remote_api_resource".to_owned()]); - - let updated_api_resource = FakeAPIResource { - id: 1, - description: Some("foobar".to_string()), - }; - - let mut updated_resource = resource.clone(); - updated_resource.spec.api_resource = updated_api_resource.clone(); - updated_resource.meta_mut().resource_version = Some("2".to_string()); - updated_resource.meta_mut().finalizers = resource.meta().finalizers.clone(); - - with_mocked_kube_server_and_controller( - move |mut handle| { - let r = resource.clone(); - let ur = updated_resource.clone(); - let ar = api_resource.clone(); - - async move { - loop { - mock_fake_resource_updated_kube( - &mut handle, - r.clone(), - ur.clone(), - ar.clone(), - Some("MODIFIED".to_string()), - ) - .await; - } - } - }, - |mut controller| async move { - // It reconciled successfully - let reconciled = controller.next().await; - assert!(reconciled.unwrap().is_ok()); - - // every_reconcile() was NOT triggered as the resource is not owned - assert!(!TEST_STORE.pin().contains_key(&-8675309)); - - // The object is the original API object - assert_eq!( - TEST_STORE.pin().get(&1).unwrap().clone(), - FakeAPIResource { - id: 1, - description: None - }, - ); - }, - ) - .await; -} - /// When the API resource is updated for an owned resource #[tokio::test] async fn test_resource_api_update_operator_owned() { @@ -441,66 +366,6 @@ async fn test_resource_api_update_operator_owned() { .await; } -/// When the API resource is updated for an API owned resource -#[tokio::test] -async fn test_resource_api_update_api_owned() { - // Begin with a 
CRD owned by the operator - let mut resource: FakeResource = FakeResource::new( - "foo", - FakeResourceSpec { - api_resource: FakeAPIResource { - id: 42, - description: None, - }, - }, - ); - - resource.meta_mut().annotations = Some({ - let mut annots = BTreeMap::new(); - annots.insert("databricks-operator/owner".to_string(), "api".to_string()); - annots - }); - // Bind the finalizer to avoid having to mock the PATCH request from the API - resource.meta_mut().finalizers = - Some(vec!["databricks-operator/remote_api_resource".to_owned()]); - - // Remote has a different value for "description" - let updated_resource = FakeAPIResource { - description: Some("hello".to_string()), - ..resource.spec().api_resource - }; - TEST_STORE.pin().insert(42, updated_resource.clone()); - - with_mocked_kube_server_and_controller( - move |mut handle| { - let r = resource.clone(); - let ur = updated_resource.clone(); - - async move { - loop { - mock_list_fake_resource( - &mut handle, - r.clone(), - // assertion made during PUT call - Some(ur.clone()), - None, - ) - .await; - } - } - }, - |mut controller| async move { - // It reconciled successfully - let reconciled = controller.next().await; - assert!(reconciled.unwrap().is_ok()); - - // every_reconcile() was NOT triggered as the resource is not owned - assert!(!TEST_STORE.pin().contains_key(&-8675309)); - }, - ) - .await; -} - /// When an owned Kubernetes resource matches the remote API #[tokio::test] async fn test_resource_in_sync() { @@ -624,56 +489,3 @@ async fn test_kube_delete_operator_owned() { }; timeout(Duration::from_secs(10), poll_store).await.unwrap(); } - -// When Kubernetes resource is deleted, but owned by remote API -#[tokio::test] -async fn test_kube_delete_api_owned() { - let mut resource: FakeResource = FakeResource::new( - "foo", - FakeResourceSpec { - api_resource: FakeAPIResource { - id: 1, - description: None, - }, - }, - ); - - resource.meta_mut().namespace = Some("default".to_string()); - resource.meta_mut().resource_version = Some("1".to_string()); - resource.meta_mut().annotations = Some({ - let mut annots = BTreeMap::new(); - annots.insert("databricks-operator/owner".to_string(), "api".to_string()); - annots - }); - // Bind the finalizer to avoid having to mock the PATCH request from the API - resource.meta_mut().finalizers = - Some(vec!["databricks-operator/remote_api_resource".to_owned()]); - - // Mark the resource as deleted - resource.meta_mut().deletion_timestamp = Some(Time(Utc::now())); - - TEST_STORE - .pin() - .insert(1, resource.spec().api_resource.clone()); - - with_mocked_kube_server_and_controller( - move |mut handle| { - let serve_me = resource.clone(); - - async move { - loop { - mock_fake_resource_deleted(&mut handle, serve_me.clone()).await; - } - } - }, - |mut controller| async move { - // It reconciled successfully and the resources are in sync - let reconciled = controller.next().await; - assert!(reconciled.unwrap().is_ok()); - - // The resource was NOT removed from the remote API - assert!(TEST_STORE.pin().get(&1).is_some()); - }, - ) - .await; -}