From 32553b2af2850678af662b1faf5a427975499447 Mon Sep 17 00:00:00 2001 From: Daniil Naumetc Date: Fri, 6 Jan 2023 22:54:48 +0100 Subject: [PATCH] feature: tool init. Endpoint + k8s --- tools/lifeguard/Cargo.toml | 16 +++++ tools/lifeguard/src/k8s.rs | 120 ++++++++++++++++++++++++++++++++++ tools/lifeguard/src/main.rs | 10 +++ tools/lifeguard/src/server.rs | 46 +++++++++++++ 4 files changed, 192 insertions(+) create mode 100644 tools/lifeguard/Cargo.toml create mode 100644 tools/lifeguard/src/k8s.rs create mode 100644 tools/lifeguard/src/main.rs create mode 100644 tools/lifeguard/src/server.rs diff --git a/tools/lifeguard/Cargo.toml b/tools/lifeguard/Cargo.toml new file mode 100644 index 00000000..e1016310 --- /dev/null +++ b/tools/lifeguard/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "lifeguard" +version = "0.1.0" +edition = "2021" + +[workspace] + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +kube = { version = "0.78.0", features = ["runtime", "derive"] } +k8s-openapi = { version = "0.17.0", features = ["v1_26"] } +futures = "0.3" + +tokio = { version = "1", features = ["macros", "rt-multi-thread"] } +warp = "0.3" diff --git a/tools/lifeguard/src/k8s.rs b/tools/lifeguard/src/k8s.rs new file mode 100644 index 00000000..6124e2b9 --- /dev/null +++ b/tools/lifeguard/src/k8s.rs @@ -0,0 +1,120 @@ +use std::fmt; + +use futures::{StreamExt, TryStreamExt}; +use k8s_openapi::api::core::v1::Pod; +use kube::api::DeleteParams; +use kube::core::WatchEvent; +use kube::{ + api::{Api, ListParams, ResourceExt}, + Client, +}; + +const ENVIRONMENT: &str = "dev"; + +#[derive(PartialEq, Debug)] +pub enum ResultPodRestartStatus { + Deleted, + Created, + Running, + Timeout, + NotFound, + None, +} + +impl fmt::Display for ResultPodRestartStatus { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + ResultPodRestartStatus::Deleted => write!(f, "Deleted"), + ResultPodRestartStatus::Created => write!(f, "Created"), + ResultPodRestartStatus::Running => write!(f, "Running"), + ResultPodRestartStatus::Timeout => write!(f, "Timeout"), + ResultPodRestartStatus::NotFound => write!(f, "NotFound"), + ResultPodRestartStatus::None => write!(f, "None"), + } + } +} + +pub struct K8S { + client: Client, +} + +impl K8S { + pub async fn new() -> Result> { + let client = Client::try_default().await?; + Ok(K8S { client }) + } + + pub async fn kill_pod( + &self, + network: &str, + agent_name: &str, + ) -> Result> { + let pods: Api = Api::default_namespaced(self.client.clone()); + + let name = &format!("{}-{}-{}-0", ENVIRONMENT, network, agent_name); + println!("Supposed to kill this one: {}", name); + + let pod = if let Some(pod) = pods.get_opt(name).await? { + pod + } else { + return Ok(ResultPodRestartStatus::NotFound); + }; + + println!("Found! -> {}", pod.name_any()); + pods.delete(name, &DeleteParams::default()).await?; + + let mut latest_status = ResultPodRestartStatus::None; + + let lp = ListParams::default() + .fields(&format!("metadata.name={}", name)) + .timeout(30); + let mut stream = pods.watch(&lp, "0").await?.boxed(); + + // TODO: clean this \/ + let mut pod_deleted = false; + let mut pod_recreated = false; + let mut pod_running = false; + while let Some(event) = stream.try_next().await? { + match event { + WatchEvent::Added(pod) => { + if pod_deleted { + pod_recreated = true; + latest_status = ResultPodRestartStatus::Created; + } + println!("ADDED: {}", pod.name_any()) + } + WatchEvent::Modified(pod) => { + println!( + "UPDATED: {}->{:?}", + pod.name_any(), + pod.status.as_ref().and_then(|s| s.phase.as_ref()) + ); + + if pod.status.and_then(|status| status.phase).as_deref() == Some("Running") + && (pod_recreated || pod_deleted) + // We probably don't really need `pod_recreated` + { + pod_running = true; + println!("RUNNING!"); + latest_status = ResultPodRestartStatus::Running; + break; + } + } + WatchEvent::Deleted(pod) => { + pod_deleted = true; + latest_status = ResultPodRestartStatus::Deleted; + println!("DELETED: {}", pod.name_any()) + } + WatchEvent::Error(e) => println!("ERROR: {} {} ({})", e.code, e.message, e.status), + _ => {} + }; + } + + if latest_status == ResultPodRestartStatus::None { + latest_status = ResultPodRestartStatus::Timeout; + } + println!("Done! -> {}", pod_running); + + Ok(latest_status) + } +} diff --git a/tools/lifeguard/src/main.rs b/tools/lifeguard/src/main.rs new file mode 100644 index 00000000..744dac4a --- /dev/null +++ b/tools/lifeguard/src/main.rs @@ -0,0 +1,10 @@ +mod k8s; +mod server; + +pub(crate) type Result = std::result::Result>; + +#[tokio::main] +async fn main() -> Result<()> { + server::run_server().await?; + Ok(()) +} diff --git a/tools/lifeguard/src/server.rs b/tools/lifeguard/src/server.rs new file mode 100644 index 00000000..9adf56a3 --- /dev/null +++ b/tools/lifeguard/src/server.rs @@ -0,0 +1,46 @@ +use crate::k8s::ResultPodRestartStatus; +use crate::k8s::K8S; +use std::sync::Arc; +use tokio::sync::Mutex; +use warp::Filter; +use warp::Rejection; + +use warp::http::StatusCode; +use warp::reply::with_status; + +fn with_k8s( + k8s: Arc>, +) -> impl Filter>,), Error = std::convert::Infallible> + Clone { + warp::any().map(move || k8s.clone()) +} + +async fn handler( + network: String, + agent: String, + k8s: Arc>, +) -> Result { + let k8s = k8s.lock().await; + if let Ok(status) = k8s.kill_pod(&network, &agent).await { + let status_code = match status { + ResultPodRestartStatus::Running => StatusCode::OK, + ResultPodRestartStatus::NotFound => StatusCode::NOT_FOUND, + _ => StatusCode::GATEWAY_TIMEOUT, + }; + Ok(with_status(status.to_string(), status_code)) + } else { + Err(warp::reject::not_found()) + } +} + +pub async fn run_server() -> Result<(), Box> { + let k8s = K8S::new().await?; + let k8s = Arc::new(Mutex::new(k8s)); + + // Get ready for a ~30 sec timeout + let lifeguard = warp::path!("lifeguard" / String / String) + .and(with_k8s(k8s)) + .and_then(handler); + + warp::serve(lifeguard).run(([127, 0, 0, 1], 3030)).await; + Ok(()) +}