From d2117c5e85767523d88cd8883c94d23904afe602 Mon Sep 17 00:00:00 2001
From: David Stancu
Date: Wed, 7 Dec 2022 17:19:39 -0500
Subject: [PATCH] include job settings in idempotency token

---
 Cargo.lock                                 |  2 +-
 charts/databricks-kube-operator/Chart.yaml |  2 +-
 .../templates/sts.yaml                     |  2 +-
 databricks-kube/Cargo.toml                 |  2 +-
 databricks-kube/src/crds/databricks_job.rs | 38 +++++--------------
 5 files changed, 14 insertions(+), 32 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index f71c3ea..ecb0f4e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -345,7 +345,7 @@ dependencies = [
 
 [[package]]
 name = "databricks_kube"
-version = "0.3.1"
+version = "0.3.2"
 dependencies = [
  "assert-json-diff",
  "async-stream",
diff --git a/charts/databricks-kube-operator/Chart.yaml b/charts/databricks-kube-operator/Chart.yaml
index f6d4026..4fe7490 100644
--- a/charts/databricks-kube-operator/Chart.yaml
+++ b/charts/databricks-kube-operator/Chart.yaml
@@ -15,7 +15,7 @@ type: application
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
 # Versions are expected to follow Semantic Versioning (https://semver.org/)
-version: 0.3.1
+version: 0.3.2
 
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application. Versions are not expected to
diff --git a/charts/databricks-kube-operator/templates/sts.yaml b/charts/databricks-kube-operator/templates/sts.yaml
index b3c0f9c..fd9aab3 100644
--- a/charts/databricks-kube-operator/templates/sts.yaml
+++ b/charts/databricks-kube-operator/templates/sts.yaml
@@ -18,7 +18,7 @@ spec:
       terminationGracePeriodSeconds: 10
       containers:
         - name: dko
-          image: ghcr.io/mach-kernel/databricks-kube-operator:0.3.1
+          image: ghcr.io/mach-kernel/databricks-kube-operator:0.3.2
           imagePullPolicy: Always
           env:
             - name: DATABRICKS_KUBE_CONFIGMAP
diff --git a/databricks-kube/Cargo.toml b/databricks-kube/Cargo.toml
index 0043a1e..9d655ad 100644
--- a/databricks-kube/Cargo.toml
+++ b/databricks-kube/Cargo.toml
@@ -5,7 +5,7 @@ path = "src/crdgen.rs"
 [package]
 name = "databricks_kube"
 default-run = "databricks_kube"
-version = "0.3.1"
+version = "0.3.2"
 edition = "2021"
 
 [dependencies]
diff --git a/databricks-kube/src/crds/databricks_job.rs b/databricks-kube/src/crds/databricks_job.rs
index 3e638e1..9c21a29 100644
--- a/databricks-kube/src/crds/databricks_job.rs
+++ b/databricks-kube/src/crds/databricks_job.rs
@@ -10,13 +10,13 @@ use crate::{
     traits::rest_config::RestConfig,
 };
 
-use databricks_rust_jobs::models::{JobsRunsList200Response, RunLifeCycleState, RunState};
+use databricks_rust_jobs::models::{JobsRunsList200Response, RunLifeCycleState, RunState, JobSettings};
 use databricks_rust_jobs::{
     apis::default_api,
     models::{
         job::Job, job_settings, jobs_create_request, JobsCreate200Response, JobsCreateRequest,
         JobsDeleteRequest, JobsGet200Response, JobsList200Response, JobsRunNowRequest,
-        JobsUpdateRequest, Run,
+        JobsUpdateRequest,
     },
 };
 
@@ -152,36 +152,17 @@ impl From<DatabricksJob> for Job {
 }
 
 impl DatabricksJob {
-    fn hash_run_request(request: &JobsRunNowRequest) -> u64 {
+    fn hash_run_request(request: &JobsRunNowRequest, settings: Option<Box<JobSettings>>) -> u64 {
         let mut hasher = DefaultHasher::new();
-        request.job_id.hash(&mut hasher);
-        request.jar_params.hash(&mut hasher);
-        request.python_params.hash(&mut hasher);
-        request.spark_submit_params.hash(&mut hasher);
+        let request_as_json = serde_json::to_string(&request).unwrap();
 
-        for val in request.python_named_params.iter().flat_map(|z| z.values()) {
-            val.to_string().hash(&mut hasher);
+        if let Some(settings) = settings {
+            let settings_as_json = serde_json::to_string(&settings).unwrap();
+            settings_as_json.hash(&mut hasher);
         }
 
-        for val in request.notebook_params.iter().flat_map(|z| z.values()) {
-            val.to_string().hash(&mut hasher);
-        }
-
-        for val in request.sql_params.iter().flat_map(|z| z.values()) {
-            val.to_string().hash(&mut hasher);
-        }
-
-        if request.pipeline_params.is_some() {
-            request
-                .pipeline_params
-                .clone()
-                .unwrap()
-                .full_refresh
-                .hash(&mut hasher);
-        }
-
-        request.dbt_commands.hash(&mut hasher);
+        request_as_json.hash(&mut hasher);
 
         // Databricks docs state a 64 char limit for the idempotency token,
         // so we can get away with coercing i64 to a string
@@ -223,6 +204,7 @@ impl RemoteAPIResource<Job> for DatabricksJob {
         context: Arc<Context>,
     ) -> Pin<Box<dyn Stream<Item = Result<(), DatabricksKubeError>> + Send>> {
         let job_id = self.spec().job.job_id.clone();
+        let job_settings = self.spec().job.settings.clone();
         let run_request = self.spec().run.clone();
 
         let self_name = self.name_unchecked();
@@ -239,7 +221,7 @@ impl RemoteAPIResource<Job> for DatabricksJob {
             job_id,
             ..run_request.unwrap()
         };
-        run_request.idempotency_token = Some(Self::hash_run_request(&run_request).to_string());
+        run_request.idempotency_token = Some(Self::hash_run_request(&run_request, job_settings).to_string());
 
         let triggered = default_api::jobs_run_now(&config, Some(run_request)).await?;
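
A note for reviewers on the new token derivation: instead of hashing a
hand-picked subset of JobsRunNowRequest fields one by one, hash_run_request
now hashes the JSON serialization of the whole run request and mixes in the
job settings when they are present, so editing a job's settings yields a
fresh idempotency token. The following is a minimal standalone sketch of
that scheme, not the operator's actual code: it substitutes serde_json::Value
for the generated JobsRunNowRequest and JobSettings models, and the payload
fields shown are illustrative only.

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

use serde_json::{json, Value};

// Mirror of the patched hash_run_request: hash the JSON form of the run
// request, mixing in the job settings when present.
fn hash_run_request(request: &Value, settings: Option<&Value>) -> u64 {
    let mut hasher = DefaultHasher::new();
    let request_as_json = serde_json::to_string(request).unwrap();

    if let Some(settings) = settings {
        let settings_as_json = serde_json::to_string(settings).unwrap();
        settings_as_json.hash(&mut hasher);
    }

    request_as_json.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    // Illustrative payloads; real values come from the DatabricksJob CRD spec.
    let run_request = json!({ "job_id": 42, "jar_params": ["run"] });
    let settings = json!({ "name": "my-job", "max_concurrent_runs": 1 });

    let token = hash_run_request(&run_request, Some(&settings)).to_string();
    println!("idempotency_token: {token}");

    // Editing the job settings produces a different token, so a changed job
    // spec no longer reuses a stale idempotency token.
    let edited = json!({ "name": "my-job", "max_concurrent_runs": 2 });
    assert_ne!(
        token,
        hash_run_request(&run_request, Some(&edited)).to_string()
    );
}

Two properties worth noting: a u64 renders to at most 20 decimal digits, well
inside the 64 character limit Databricks documents for idempotency_token; and
std's DefaultHasher is deterministic within a build but not guaranteed stable
across Rust releases, which seems acceptable here because the token only needs
to deduplicate retried submissions from the same running operator binary.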