Skip to content

Commit

Permalink
feature: tool init. Endpoint + k8s
Browse files Browse the repository at this point in the history
  • Loading branch information
kekonen committed Jan 6, 2023
1 parent 58558d7 commit 32553b2
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 0 deletions.
16 changes: 16 additions & 0 deletions tools/lifeguard/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "lifeguard"
version = "0.1.0"
edition = "2021"

[workspace]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
kube = { version = "0.78.0", features = ["runtime", "derive"] }
k8s-openapi = { version = "0.17.0", features = ["v1_26"] }
futures = "0.3"

tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
warp = "0.3"
120 changes: 120 additions & 0 deletions tools/lifeguard/src/k8s.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
use std::fmt;

use futures::{StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::Pod;
use kube::api::DeleteParams;
use kube::core::WatchEvent;
use kube::{
api::{Api, ListParams, ResourceExt},
Client,
};

const ENVIRONMENT: &str = "dev";

#[derive(PartialEq, Debug)]
pub enum ResultPodRestartStatus {
Deleted,
Created,
Running,
Timeout,
NotFound,
None,
}

impl fmt::Display for ResultPodRestartStatus {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
ResultPodRestartStatus::Deleted => write!(f, "Deleted"),
ResultPodRestartStatus::Created => write!(f, "Created"),
ResultPodRestartStatus::Running => write!(f, "Running"),
ResultPodRestartStatus::Timeout => write!(f, "Timeout"),
ResultPodRestartStatus::NotFound => write!(f, "NotFound"),
ResultPodRestartStatus::None => write!(f, "None"),
}
}
}

pub struct K8S {
client: Client,
}

impl K8S {
pub async fn new() -> Result<Self, Box<dyn std::error::Error>> {
let client = Client::try_default().await?;
Ok(K8S { client })
}

pub async fn kill_pod(
&self,
network: &str,
agent_name: &str,
) -> Result<ResultPodRestartStatus, Box<dyn std::error::Error>> {
let pods: Api<Pod> = Api::default_namespaced(self.client.clone());

let name = &format!("{}-{}-{}-0", ENVIRONMENT, network, agent_name);
println!("Supposed to kill this one: {}", name);

let pod = if let Some(pod) = pods.get_opt(name).await? {
pod
} else {
return Ok(ResultPodRestartStatus::NotFound);
};

println!("Found! -> {}", pod.name_any());
pods.delete(name, &DeleteParams::default()).await?;

let mut latest_status = ResultPodRestartStatus::None;

let lp = ListParams::default()
.fields(&format!("metadata.name={}", name))
.timeout(30);
let mut stream = pods.watch(&lp, "0").await?.boxed();

// TODO: clean this \/
let mut pod_deleted = false;
let mut pod_recreated = false;
let mut pod_running = false;
while let Some(event) = stream.try_next().await? {
match event {
WatchEvent::Added(pod) => {
if pod_deleted {
pod_recreated = true;
latest_status = ResultPodRestartStatus::Created;
}
println!("ADDED: {}", pod.name_any())
}
WatchEvent::Modified(pod) => {
println!(
"UPDATED: {}->{:?}",
pod.name_any(),
pod.status.as_ref().and_then(|s| s.phase.as_ref())
);

if pod.status.and_then(|status| status.phase).as_deref() == Some("Running")
&& (pod_recreated || pod_deleted)
// We probably don't really need `pod_recreated`
{
pod_running = true;
println!("RUNNING!");
latest_status = ResultPodRestartStatus::Running;
break;
}
}
WatchEvent::Deleted(pod) => {
pod_deleted = true;
latest_status = ResultPodRestartStatus::Deleted;
println!("DELETED: {}", pod.name_any())
}
WatchEvent::Error(e) => println!("ERROR: {} {} ({})", e.code, e.message, e.status),
_ => {}
};
}

if latest_status == ResultPodRestartStatus::None {
latest_status = ResultPodRestartStatus::Timeout;
}
println!("Done! -> {}", pod_running);

Ok(latest_status)
}
}
10 changes: 10 additions & 0 deletions tools/lifeguard/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
mod k8s;
mod server;

pub(crate) type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

#[tokio::main]
async fn main() -> Result<()> {
server::run_server().await?;
Ok(())
}
46 changes: 46 additions & 0 deletions tools/lifeguard/src/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use crate::k8s::ResultPodRestartStatus;
use crate::k8s::K8S;
use std::sync::Arc;
use tokio::sync::Mutex;
use warp::Filter;
use warp::Rejection;

use warp::http::StatusCode;
use warp::reply::with_status;

fn with_k8s(
k8s: Arc<Mutex<K8S>>,
) -> impl Filter<Extract = (Arc<Mutex<K8S>>,), Error = std::convert::Infallible> + Clone {
warp::any().map(move || k8s.clone())
}

async fn handler(
network: String,
agent: String,
k8s: Arc<Mutex<K8S>>,
) -> Result<impl warp::Reply, Rejection> {
let k8s = k8s.lock().await;
if let Ok(status) = k8s.kill_pod(&network, &agent).await {
let status_code = match status {
ResultPodRestartStatus::Running => StatusCode::OK,
ResultPodRestartStatus::NotFound => StatusCode::NOT_FOUND,
_ => StatusCode::GATEWAY_TIMEOUT,
};
Ok(with_status(status.to_string(), status_code))
} else {
Err(warp::reject::not_found())
}
}

pub async fn run_server() -> Result<(), Box<dyn std::error::Error>> {
let k8s = K8S::new().await?;
let k8s = Arc::new(Mutex::new(k8s));

// Get ready for a ~30 sec timeout
let lifeguard = warp::path!("lifeguard" / String / String)
.and(with_k8s(k8s))
.and_then(handler);

warp::serve(lifeguard).run(([127, 0, 0, 1], 3030)).await;
Ok(())
}

0 comments on commit 32553b2

Please sign in to comment.