Skip to content

Commit

Permalink
Add support for cargo fmt (#242)
Browse files Browse the repository at this point in the history
* Add support for cargo fmt

* Fix ci.yaml on. Fix README.md
  • Loading branch information
koropets authored Oct 2, 2024
1 parent c41d57f commit 3435a80
Show file tree
Hide file tree
Showing 23 changed files with 416 additions and 335 deletions.
27 changes: 27 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,14 @@ on:
push:
branches:
- master
paths-ignore:
- '**.md'
pull_request:
types:
- opened
- synchronize
paths-ignore:
- '**.md'

jobs:
build-docker:
Expand Down Expand Up @@ -57,6 +64,26 @@ jobs:
rm -rf /tmp/.buildx-cache
mv /tmp/.buildx-cache-new /tmp/.buildx-cache
fmt:
name: Check format ${{ matrix.rust }}
runs-on: ubuntu-latest
strategy:
matrix:
rust: [stable]

steps:
- name: Checkout code
uses: actions/checkout@v3

- name: Setup Rust toolchain
uses: dtolnay/rust-toolchain@stable
with:
toolchain: ${{ matrix.rust }}

- name: cargo fmt
run: |
cargo fmt --check
test:
name: Test ${{ matrix.rust }} on ${{ matrix.os }}
needs: build-docker
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "gordo-controller"
version = "2.1.2"
version = "2.1.3"
authors = ["Miles Granger <[email protected]>", "Serhii Koropets <[email protected]>"]
edition = "2018"

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# gordo-controller
Gordo controller
[Gordo](https://github.com/equinor/gordo) controller

[![CI](https://github.com/equinor/gordo-controller/workflows/CI/badge.svg)](https://github.com/equinor/gordo-controller/actions)

Expand Down
5 changes: 1 addition & 4 deletions examples/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.await?;
assert!(resp.is_empty());

let body = reqwest::get("http://0.0.0.0:8888/metrics")
.await?
.text()
.await?;
let body = reqwest::get("http://0.0.0.0:8888/metrics").await?.text().await?;

assert!(body.contains("gordo_controller_http_requests_total"));

Expand Down
5 changes: 2 additions & 3 deletions src/crd/argo/argo.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use kube::CustomResource;
use serde::{Deserialize, Serialize};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

// Origin here https://github.com/argoproj/argo/blob/master/pkg/apis/workflow/v1alpha1/workflow_types.go#L34
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, JsonSchema)]
Expand Down Expand Up @@ -30,8 +30,7 @@ impl Default for ArgoWorkflowPhase {
#[kube(group = "argoproj.io", version = "v1alpha1", kind = "Workflow", namespaced)]
#[kube(shortname = "wf")]
#[kube(status = "ArgoWorkflowStatus")]
pub struct ArgoWorkflowSpec {
}
pub struct ArgoWorkflowSpec {}

#[derive(Serialize, Deserialize, Clone, Debug, Default, JsonSchema)]
pub struct ArgoWorkflowStatus {
Expand Down
148 changes: 89 additions & 59 deletions src/crd/argo/mod.rs
Original file line number Diff line number Diff line change
@@ -1,48 +1,46 @@
pub mod argo;
pub use argo::*;

use log::{error, info, warn};
use crate::crd::model::{Model, ModelPhase, ModelPodTerminatedStatus, patch_model_status, patch_model_with_default_status};
use crate::crd::pod::{POD_MATCH_LABELS, FAILED};
use crate::crd::metrics::warning_happened;
use k8s_openapi::api::core::v1::ContainerStateTerminated;
use chrono::{DateTime, Utc};
use k8s_openapi::{
api::core::v1::Pod,
use crate::crd::model::{
patch_model_status, patch_model_with_default_status, Model, ModelPhase, ModelPodTerminatedStatus,
};
use crate::crd::pod::{FAILED, POD_MATCH_LABELS};
use chrono::{DateTime, Utc};
use k8s_openapi::api::core::v1::ContainerStateTerminated;
use k8s_openapi::api::core::v1::Pod;
use kube::api::Api;
use log::{error, info, warn};

pub const WF_MATCH_LABELS: &'static [&'static str] = &[
"applications.gordo.equinor.com/project-name",
"applications.gordo.equinor.com/project-revision",
"applications.gordo.equinor.com/project-name",
"applications.gordo.equinor.com/project-revision",
];

pub const WF_NUMBER_LABEL: &str = "applications.gordo.equinor.com/project-workflow";

fn some_of_workflows_in_phases(workflows: &Vec<&Workflow>, phases: Vec<ArgoWorkflowPhase>) -> bool {
workflows.iter()
.any(|wf| match &wf.status {
Some(status) => match &status.phase {
Some(status_phase) => (&phases).into_iter().find(|phase| &status_phase == phase).is_some(),
None => false,
},
_ => false,
})
workflows.iter().any(|wf| match &wf.status {
Some(status) => match &status.phase {
Some(status_phase) => (&phases).into_iter().find(|phase| &status_phase == phase).is_some(),
None => false,
},
_ => false,
})
}

fn all_of_workflows_in_phases(workflows: &Vec<&Workflow>, phases: Vec<ArgoWorkflowPhase>) -> bool {
workflows.iter()
.all(|wf| match &wf.status {
Some(status) => match &status.phase {
Some(status_phase) => (&phases).into_iter().find(|phase| &status_phase == phase).is_some(),
None => false,
},
_ => false,
})
workflows.iter().all(|wf| match &wf.status {
Some(status) => match &status.phase {
Some(status_phase) => (&phases).into_iter().find(|phase| &status_phase == phase).is_some(),
None => false,
},
_ => false,
})
}

fn find_model_workflows<'a>(model: &'a Model, workflows: &'a [Workflow]) -> Vec<&'a Workflow> {
//TODO for performance reason we supposed to reimplement this algorithm with BTreeMap
//TODO for performance reason we supposed to reimplement this algorithm with BTreeMap
workflows
.iter()
.filter(|workflow| {
Expand Down Expand Up @@ -75,25 +73,23 @@ fn find_model_workflows<'a>(model: &'a Model, workflows: &'a [Workflow]) -> Vec<

fn failed_pods_terminated_statuses<'a>(model: &'a Model, pods: &'a Vec<Pod>) -> Vec<&'a ContainerStateTerminated> {
pods.iter()
.filter(|pod| {
match &pod.status {
Some(status) => match &status.phase {
Some(phase) => phase == FAILED,
None => false,
},
.filter(|pod| match &pod.status {
Some(status) => match &status.phase {
Some(phase) => phase == FAILED,
None => false,
}
},
None => false,
})
.filter(|pod| {
let pod_labels = &pod.metadata.labels;
let model_labels = &model.metadata.labels;
POD_MATCH_LABELS
.iter()
.all(|&label_name| {
match (model_labels, pod_labels) {
(Some(model_labels), Some(pod_labels)) => model_labels.get(label_name) == pod_labels.get(label_name),
_ => false,
.all(|&label_name| match (model_labels, pod_labels) {
(Some(model_labels), Some(pod_labels)) => {
model_labels.get(label_name) == pod_labels.get(label_name)
}
_ => false,
})
})
.flat_map(|pod| pod.status.as_ref())
Expand All @@ -104,10 +100,13 @@ fn failed_pods_terminated_statuses<'a>(model: &'a Model, pods: &'a Vec<Pod>) ->
.collect()
}

fn last_container_terminated_status(terminated_statuses: Vec<&ContainerStateTerminated>) -> Option<&ContainerStateTerminated> {
fn last_container_terminated_status(
terminated_statuses: Vec<&ContainerStateTerminated>,
) -> Option<&ContainerStateTerminated> {
if terminated_statuses.len() > 0 {
let min_date_time = DateTime::<Utc>::MIN_UTC.clone();
let last_terminated_state_ind = terminated_statuses.iter()
let last_terminated_state_ind = terminated_statuses
.iter()
.enumerate()
.max_by_key(|(_, terminated_state)| match &terminated_state.finished_at {
Some(finished_at) => finished_at.0,
Expand All @@ -129,7 +128,7 @@ pub async fn monitor_wf(model_api: &Api<Model>, workflows: &Vec<Workflow>, model
None => {
warn!("Model labels field is empty");
continue;
},
}
};
let model_name = match &model.metadata.name {
Some(model_name) => model_name,
Expand All @@ -139,17 +138,27 @@ pub async fn monitor_wf(model_api: &Api<Model>, workflows: &Vec<Workflow>, model
}
};
match &model.status {
Some(model_status) => {
let is_reapplied_model = match (&model_status.revision, labels.get("applications.gordo.equinor.com/project-revision")) {
Some(model_status) => {
let is_reapplied_model = match (
&model_status.revision,
labels.get("applications.gordo.equinor.com/project-revision"),
) {
(Some(status_revision), Some(metadata_revision)) => status_revision != metadata_revision,
_ => false,
};
if !is_reapplied_model {
if !is_reapplied_model {
match &model_status.phase {
ModelPhase::InProgress | ModelPhase::Unknown => {
let found_workflows = find_model_workflows(&model, &workflows);
let mut new_model_phase: Option<ModelPhase> = None;
if some_of_workflows_in_phases(&found_workflows, vec![ArgoWorkflowPhase::Error, ArgoWorkflowPhase::Failed, ArgoWorkflowPhase::Skipped]) {
if some_of_workflows_in_phases(
&found_workflows,
vec![
ArgoWorkflowPhase::Error,
ArgoWorkflowPhase::Failed,
ArgoWorkflowPhase::Skipped,
],
) {
new_model_phase = Some(ModelPhase::Failed);
} else if all_of_workflows_in_phases(&found_workflows, vec![ArgoWorkflowPhase::Succeeded]) {
new_model_phase = Some(ModelPhase::Succeeded);
Expand All @@ -162,22 +171,31 @@ pub async fn monitor_wf(model_api: &Api<Model>, workflows: &Vec<Workflow>, model
if let Some(model_name) = labels.get("applications.gordo.equinor.com/model-name") {
let terminated_statuses = failed_pods_terminated_statuses(&model, &pods);
info!("Found {} failed pods in terminated status which is relates to the model '{}'", terminated_statuses.len(), model_name);
if let Some(terminated_status) = last_container_terminated_status(terminated_statuses) {
if let Some(terminated_status) =
last_container_terminated_status(terminated_statuses)
{
new_model_status.code = Some(terminated_status.exit_code);
if let Some(message) = &terminated_status.message {
let trimmed_message = message.trim_end();
if !trimmed_message.is_empty() {
let result: serde_json::Result<ModelPodTerminatedStatus> = serde_json::from_str(&trimmed_message);
let result: serde_json::Result<ModelPodTerminatedStatus> =
serde_json::from_str(&trimmed_message);
match result {
Ok(terminated_status_message) => {
info!("Last terminated status message {:?} for model '{}'", terminated_status_message, model_name);
new_model_status.error_type = terminated_status_message.error_type.clone();
new_model_status.message = terminated_status_message.message.clone();
new_model_status.traceback = terminated_status_message.traceback.clone();
},
info!(
"Last terminated status message {:?} for model '{}'",
terminated_status_message, model_name
);
new_model_status.error_type =
terminated_status_message.error_type.clone();
new_model_status.message =
terminated_status_message.message.clone();
new_model_status.traceback =
terminated_status_message.traceback.clone();
}
Err(err) => {
warn!("Got JSON error where parsing pod's terminated message for the model '{}': {:?}", model_name, err);
warning_happened("parse_terminated_message")
warn!("Got JSON error where parsing pod's terminated message for the model '{}': {:?}", model_name, err);
warning_happened("parse_terminated_message")
}
}
}
Expand All @@ -188,29 +206,41 @@ pub async fn monitor_wf(model_api: &Api<Model>, workflows: &Vec<Workflow>, model
if model_phase != model_status.phase {
match patch_model_status(&model_api, &model_name, &new_model_status).await {
Ok(new_model) => {
info!("Patching Model '{}' from status {:?} to {:?}", model_name, model.status, new_model.status);
info!(
"Patching Model '{}' from status {:?} to {:?}",
model_name, model.status, new_model.status
);
}
Err(err) => {
error!( "Failed to patch status of Model '{}' - error: {:?}", model_name, err);
error!(
"Failed to patch status of Model '{}' - error: {:?}",
model_name, err
);
}
}
}
}
},
}
_ => (),
}
} else {
match patch_model_with_default_status(&model_api, &model).await {
Ok(new_model) => {
info!("Patching Model '{}' from status {:?} to default status {:?}", model_name, model.status, new_model.status);
info!(
"Patching Model '{}' from status {:?} to default status {:?}",
model_name, model.status, new_model.status
);
}
Err(err) => {
error!( "Failed to patch status of Model '{}' with default status - error: {:?}", model_name, err);
error!(
"Failed to patch status of Model '{}' with default status - error: {:?}",
model_name, err
);
}
}
}
}
_ => (),
};
}
}
}
Loading

0 comments on commit 3435a80

Please sign in to comment.