Fix resource duplication bug, refactor glue to OpenAPI SDK error structs (#42)

* 1/2, greatly refactor error handling

* 2/2 fix APIError usage in tests -- returning a fake 404 with the actual status is an improvement;
remove tests for deprecated api-owned resources

* for creation case have an explicit IDUnsetError
get rid of last of owner cruft + every_reconcile_owned -> every_reconcile

* update readme, bump version

* remove dead test, use IDUnsetError in fake crd impl

* fmt

* bump chart
mach-kernel authored Jan 11, 2024
1 parent 689e33e commit 15f9655
Showing 11 changed files with 314 additions and 466 deletions.
36 changes: 19 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default.

54 changes: 37 additions & 17 deletions README.md
@@ -63,31 +63,52 @@ EOF

### Usage

See the examples directory for samples of Databricks CRDs. Resources that are created via Kubernetes are owned by the operator: your checked-in manifests are the source of truth.

```yaml
apiVersion: com.dstancu.databricks/v1
kind: DatabricksJob
metadata:
  name: my-word-count
  namespace: default
spec:
  job:
    settings:
      email_notifications:
        no_alert_for_skipped_runs: false
      format: MULTI_TASK
      job_clusters:
        - job_cluster_key: word-count-cluster
          new_cluster:
            ...
      max_concurrent_runs: 1
      name: my-word-count
      git_source:
        git_branch: misc-and-docs
        git_provider: gitHub
        git_url: https://github.com/mach-kernel/databricks-kube-operator
      tasks:
        - email_notifications: {}
          job_cluster_key: word-count-cluster
          notebook_task:
            notebook_path: examples/job.py
            source: GIT
          task_key: my-word-count
          timeout_seconds: 0
      timeout_seconds: 0
```
Changes made by users in the Databricks webapp will be overwritten by the operator if drift is detected:
```
[2024-01-11T14:20:40Z INFO databricks_kube::traits::remote_api_resource] Resource DatabricksJob my-word-count drifted!
Diff (remote, kube):
json atoms at path ".settings.tasks[0].notebook_task.notebook_path" are not equal:
    lhs:
        "examples/job_oops_is_this_right.py"
    rhs:
        "examples/job.py"
[2024-01-11T14:20:40Z INFO databricks_kube::traits::remote_api_resource] Resource DatabricksJob my-word-count reconciling drift...
```
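
The drifted/reconciling messages above come from comparing the remote object against the Kubernetes manifest as JSON. The diff format matches the output of the `assert_json_diff` crate; here is a standalone sketch of such a comparison (use of this particular crate is an assumption, not confirmed by this commit):

```rust
// Sketch: reproduce a "json atoms at path ... are not equal" report like the
// log above using assert_json_diff (assumed; the operator's actual
// implementation may differ).
use assert_json_diff::{assert_json_matches_no_panic, CompareMode, Config};
use serde_json::json;

fn main() {
    // Remote state (Databricks API) vs. desired state (Kubernetes manifest)
    let remote = json!({
        "settings": { "tasks": [{ "notebook_task": { "notebook_path": "examples/job_oops_is_this_right.py" } }] }
    });
    let kube = json!({
        "settings": { "tasks": [{ "notebook_task": { "notebook_path": "examples/job.py" } }] }
    });

    // Err(diff) carries the human-readable report logged by the operator
    if let Err(diff) = assert_json_matches_no_panic(&remote, &kube, Config::new(CompareMode::Strict)) {
        println!("Diff (remote, kube):\n{}", diff);
    }
}
```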

Look at jobs (those the operator's access token is allowed to view):
@@ -217,7 +238,6 @@ Want to add support for a new API? Provided it has an OpenAPI definition, these are the steps:
* Download API definition into `openapi/` and make a [Rust generator configuration](https://openapi-generator.tech/docs/generators/rust/) (feel free to copy the others and change name)
* Generate the SDK, add it to the Cargo workspace and dependencies for `databricks-kube/`
* Implement `RestConfig<TSDKConfig>` for your new client
* Implement `From<TSDKAPIError<E>>` for `DatabricksKubeError` (a sketch follows this list)
* Define the new CRD Spec type ([follow kube-rs tutorial](https://kube.rs/getting-started/))
* `impl RemoteAPIResource<TAPIResource> for MyNewCRD`
* `impl StatusAPIResource<TStatusType> for MyNewCRD` and [specify `TStatusType` in your CRD](https://github.com/kube-rs/kube/blob/main/examples/crd_derive.rs#L20)
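
For the `DatabricksKubeError` glue mentioned above, a minimal sketch of how it might look with `thiserror` (added to `Cargo.toml` in this commit). The `APIError` and `IDUnsetError` variants appear in this diff; the `From` impl body and the `apis::Error<E>` enum (openapi-generator's Rust error type) are assumptions:

```rust
use thiserror::Error;

// Variants observed in this diff; the operator's full enum likely has more.
#[derive(Error, Debug)]
pub enum DatabricksKubeError {
    #[error("API error: {0}")]
    APIError(String),
    #[error("resource has no remote ID set (it may not exist yet)")]
    IDUnsetError,
}

// Hypothetical glue: fold any generated-SDK error into DatabricksKubeError
// so that `?` works inside the operator's try_stream! blocks.
impl<E: std::fmt::Debug> From<databricks_rust_jobs::apis::Error<E>> for DatabricksKubeError {
    fn from(e: databricks_rust_jobs::apis::Error<E>) -> Self {
        DatabricksKubeError::APIError(format!("{:?}", e))
    }
}
```

With `job_id` bound to a `Result` up front (as in the `databricks_job.rs` changes below), `job_id?` then short-circuits the stream with `IDUnsetError` instead of a hand-rolled fake API error.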
4 changes: 2 additions & 2 deletions charts/databricks-kube-operator/Chart.yaml
@@ -1,8 +1,8 @@
apiVersion: v2
appVersion: 0.5.1
appVersion: 0.6.0
name: databricks-kube-operator
description: A kube-rs operator for managing Databricks API resources
version: 0.5.5
version: 0.6.0

home: https://github.com/mach-kernel/databricks-kube-operator
sources:
7 changes: 6 additions & 1 deletion databricks-kube/Cargo.toml
@@ -5,7 +5,7 @@ path = "src/crdgen.rs"
[package]
name = "databricks_kube"
default-run = "databricks_kube"
version = "0.5.1"
version = "0.6.0"
edition = "2021"

[dependencies]
@@ -31,6 +31,11 @@ schemars = { version = "0.8.11", features = ["derive"] }
tokio = { version = "1.24.2", features = ["macros", "rt-multi-thread"] }
tokio-graceful-shutdown = "0.11.1"
tokio-stream = "0.1.11"
thiserror = "1.0.56"

[dependencies.reqwest]
version = "^0.11"
features = ["json", "multipart"]

[dev-dependencies]
tower-test = "0.4.0"
46 changes: 22 additions & 24 deletions databricks-kube/src/crds/databricks_job.rs
@@ -8,9 +8,7 @@ use crate::{
    traits::rest_config::RestConfig,
};

use databricks_rust_jobs::models::{
    JobsRunsList200Response, RunLifeCycleState, RunState,
};
use databricks_rust_jobs::models::{JobsRunsList200Response, RunLifeCycleState, RunState};
use databricks_rust_jobs::{
    apis::default_api,
    models::{
@@ -36,8 +34,6 @@ pub struct DatabricksJobStatus {
    pub latest_run_state: Option<RunState>,
}

// TODO: We added `NO_RUNS` to `RunLifeCycleState` because
// it was the laziest way to surface this, but should probably revisit
impl Default for DatabricksJobStatus {
    fn default() -> Self {
        Self {
@@ -180,7 +176,7 @@ impl RemoteAPIResource<Job> for DatabricksJob {
    }

    #[allow(irrefutable_let_patterns)]
    fn every_reconcile_owned(
    fn every_reconcile(
        &self,
        _: Arc<Context>,
    ) -> Pin<Box<dyn Future<Output = Result<(), DatabricksKubeError>> + Send>> {
@@ -191,13 +187,13 @@
        &self,
        context: Arc<Context>,
    ) -> Pin<Box<dyn Stream<Item = Result<Job, DatabricksKubeError>> + Send>> {
        let job_id = self.spec().job.job_id;
        let job_id = self
            .spec()
            .job
            .job_id
            .ok_or(DatabricksKubeError::IDUnsetError);

        try_stream! {
            if job_id.is_none() {
                yield Err(DatabricksKubeError::APIError("Resource does not exist".to_string()))?;
            }

            let config = Job::get_rest_config(context.clone()).await.unwrap();

            let JobsGet200Response {
@@ -206,7 +202,7 @@ impl RemoteAPIResource<Job> for DatabricksJob {
                settings,
                created_time,
                ..
            } = default_api::jobs_get(&config, job_id.unwrap()).await?;
            } = default_api::jobs_get(&config, job_id?).await?;

            yield Job {
                job_id,
@@ -273,13 +269,14 @@ impl RemoteAPIResource<Job> for DatabricksJob
    {
        let job = self.spec().job.clone();
        let job_settings = job.settings.as_ref().cloned();
        let job_id = self.spec().job.job_id;

        try_stream! {
            if job_id.is_none() {
                yield Err(DatabricksKubeError::APIError("Resource does not exist".to_string()))?;
            }
        let job_id = self
            .spec()
            .job
            .job_id
            .ok_or(DatabricksKubeError::IDUnsetError);

        try_stream! {
            let config = Job::get_rest_config(context.clone()).await.unwrap();

            default_api::jobs_update(
                &config,
                /// TODO: unsupported atm
                // access_control_list: job.access_control_list
                JobsUpdateRequest {
                    job_id: job_id.unwrap(),
                    job_id: job_id?,
                    new_settings: job_settings,
                    ..JobsUpdateRequest::default()
                }
@@ -305,17 +302,18 @@ impl RemoteAPIResource<Job> for DatabricksJob {
        &self,
        context: Arc<Context>,
    ) -> Pin<Box<dyn Stream<Item = Result<(), DatabricksKubeError>> + Send + '_>> {
        let job_id = self.spec().job.job_id;
        let job_id = self
            .spec()
            .job
            .job_id
            .ok_or(DatabricksKubeError::IDUnsetError);

        try_stream! {
            if job_id.is_none() {
                yield Err(DatabricksKubeError::APIError("Resource does not exist".to_string()))?;
            }

            let config = Job::get_rest_config(context.clone()).await.unwrap();

            default_api::jobs_delete(
                &config,
                JobsDeleteRequest { job_id: job_id.unwrap(), }
                JobsDeleteRequest { job_id: job_id?, }
            ).await?;

            yield ()