Skip to content

Commit

Permalink
Merge branch '120722/idempotency-token-job-settings'
Browse files Browse the repository at this point in the history
  • Loading branch information
mach-kernel committed Dec 7, 2022
2 parents 4423c72 + d2117c5 commit 23401a2
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 32 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion charts/databricks-kube-operator/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.3.1
version: 0.3.2

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
Expand Down
2 changes: 1 addition & 1 deletion charts/databricks-kube-operator/templates/sts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ spec:
terminationGracePeriodSeconds: 10
containers:
- name: dko
image: ghcr.io/mach-kernel/databricks-kube-operator:0.3.1
image: ghcr.io/mach-kernel/databricks-kube-operator:0.3.2
imagePullPolicy: Always
env:
- name: DATABRICKS_KUBE_CONFIGMAP
Expand Down
2 changes: 1 addition & 1 deletion databricks-kube/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ path = "src/crdgen.rs"
[package]
name = "databricks_kube"
default-run = "databricks_kube"
version = "0.3.1"
version = "0.3.2"
edition = "2021"

[dependencies]
Expand Down
38 changes: 10 additions & 28 deletions databricks-kube/src/crds/databricks_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ use crate::{
traits::rest_config::RestConfig,
};

use databricks_rust_jobs::models::{JobsRunsList200Response, RunLifeCycleState, RunState};
use databricks_rust_jobs::models::{JobsRunsList200Response, RunLifeCycleState, RunState, JobSettings};
use databricks_rust_jobs::{
apis::default_api,
models::{
job::Job, job_settings, jobs_create_request, JobsCreate200Response, JobsCreateRequest,
JobsDeleteRequest, JobsGet200Response, JobsList200Response, JobsRunNowRequest,
JobsUpdateRequest, Run,
JobsUpdateRequest,
},
};

Expand Down Expand Up @@ -152,36 +152,17 @@ impl From<DatabricksJob> for Job {
}

impl DatabricksJob {
fn hash_run_request(request: &JobsRunNowRequest) -> u64 {
fn hash_run_request(request: &JobsRunNowRequest, settings: Option<Box<JobSettings>>) -> u64 {
let mut hasher = DefaultHasher::new();

request.job_id.hash(&mut hasher);
request.jar_params.hash(&mut hasher);
request.python_params.hash(&mut hasher);
request.spark_submit_params.hash(&mut hasher);
let request_as_json = serde_json::to_string(&request).unwrap();

for val in request.python_named_params.iter().flat_map(|z| z.values()) {
val.to_string().hash(&mut hasher);
if let Some(settings) = settings {
let settings_as_json = serde_json::to_string(&settings).unwrap();
settings_as_json.hash(&mut hasher);
}

for val in request.notebook_params.iter().flat_map(|z| z.values()) {
val.to_string().hash(&mut hasher);
}

for val in request.sql_params.iter().flat_map(|z| z.values()) {
val.to_string().hash(&mut hasher);
}

if request.pipeline_params.is_some() {
request
.pipeline_params
.clone()
.unwrap()
.full_refresh
.hash(&mut hasher);
}

request.dbt_commands.hash(&mut hasher);
request_as_json.hash(&mut hasher);

// Databricks docs state a 64 char limit for the idempotency token,
// so we can get away with coercing i64 to a string
Expand Down Expand Up @@ -223,6 +204,7 @@ impl RemoteAPIResource<Job> for DatabricksJob {
context: Arc<Context>,
) -> Pin<Box<dyn Future<Output = Result<(), DatabricksKubeError>> + Send>> {
let job_id = self.spec().job.job_id.clone();
let job_settings = self.spec().job.settings.clone();
let run_request = self.spec().run.clone();
let self_name = self.name_unchecked();

Expand All @@ -239,7 +221,7 @@ impl RemoteAPIResource<Job> for DatabricksJob {
job_id,
..run_request.unwrap()
};
run_request.idempotency_token = Some(Self::hash_run_request(&run_request).to_string());
run_request.idempotency_token = Some(Self::hash_run_request(&run_request, job_settings).to_string());

let triggered = default_api::jobs_run_now(&config, Some(run_request)).await?;

Expand Down

0 comments on commit 23401a2

Please sign in to comment.