Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
RWDai committed May 8, 2024
1 parent 97d666f commit 3f195d0
Show file tree
Hide file tree
Showing 11 changed files with 119 additions and 14 deletions.
2 changes: 1 addition & 1 deletion binary/spacegate/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use clap::Parser;
use spacegate_shell::BoxError;
mod args;
fn main() -> Result<(), BoxError> {
// todo: more subscriber required
// TODO: more subscriber required
tracing_subscriber::fmt().with_env_filter(tracing_subscriber::EnvFilter::from_default_env()).init();
let args = args::Args::parse();
if let Some(plugins) = args.plugins {
Expand Down
6 changes: 3 additions & 3 deletions crates/config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ k8s = [
"k8s-gateway-api",
"schemars",
"spacegate-model/ext-k8s",
"rand",
]
[dependencies]
spacegate-model = { path = "../model" }
Expand All @@ -53,9 +54,7 @@ k8s-gateway-api = { workspace = true, optional = true }
schemars = { workspace = true, optional = true }
lazy_static.workspace = true

notify = { workspace = true, features = [
"macos_kqueue",
], optional = true }
notify = { workspace = true, features = ["macos_kqueue"], optional = true }

chrono = { version = "0.4" }

Expand All @@ -65,6 +64,7 @@ redis = { version = "0.24", features = [
], optional = true }
deadpool-redis = { version = "0.14", optional = true }
lru = { version = "0.12.0", optional = true }
rand = { version = "*", optional = true }

[dev-dependencies]
reqwest = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/config/src/service/config_format/yaml.rs
Original file line number Diff line number Diff line change
@@ -1 +1 @@
// todo
// TODO
4 changes: 2 additions & 2 deletions crates/config/src/service/k8s/convert/route_k8s_conv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ pub(crate) trait SgHttpRouteMatchConv {
}
impl SgHttpRouteMatchConv for SgHttpRouteMatch {
fn into_kube_httproute(self) -> (Vec<HttpRouteMatch>, Vec<HttpRouteFilter>) {
// todo: not complete
// TODO: not complete
let (match_vec, plugins) = if let Some(method_vec) = self.method {
method_vec
.into_iter()
Expand Down Expand Up @@ -257,7 +257,7 @@ impl SgHttpRouteMatchConv for SgHttpRouteMatch {
(
vec![HttpRouteMatch {
path: path,
//todo
//TODO
headers: None,
query_params: self.query.map(|q_vec| q_vec.into_iter().map(|q| q.into_kube_httproute()).collect::<Vec<_>>()),
method: None,
Expand Down
34 changes: 32 additions & 2 deletions crates/config/src/service/k8s/discovery.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use k8s_openapi::api::core::v1::Service;
use k8s_openapi::api::{
apps::v1::DaemonSet,
core::v1::{Pod, Service},
};
use kube::{api::ListParams, Api, ResourceExt};
use rand::Rng as _;
use spacegate_model::{BackendHost, BoxResult, K8sServiceData};

use crate::service::Discovery;
Expand All @@ -8,7 +12,33 @@ use super::K8s;

impl Discovery for K8s {
async fn api_url(&self) -> BoxResult<Option<String>> {
todo!()
// TODO Start from GatewayClass and look down, and read api port from GatewayClass
let pod_api: Api<Pod> = self.get_namespace_api();
let ds_api: Api<DaemonSet> = self.get_namespace_api();
let dss = ds_api.list(&ListParams::default()).await?;
let pods = pod_api.list(&ListParams::default()).await?;

let mut pods = pods.items;
pods.retain(|p| {
for owner_ref in p.owner_references() {
for ds in &dss {
if owner_ref.uid == ds.uid().unwrap_or_default() && owner_ref.name == ds.name_any() {
return true;
}
}
}
false
});

if pods.is_empty() {
return Ok(None);
}
let index = rand::thread_rng().gen_range(0..pods.len());
let rand_pod = pods.get(index).expect("pods should not be empty");
if let Some(host_ip) = rand_pod.status.clone().and_then(|s| s.host_ip) {
return Ok(Some(format!("{host_ip}:{}", spacegate_model::constants::DEFAULT_API_PORT)));
};
Ok(None)
}

async fn backends(&self) -> BoxResult<Vec<BackendHost>> {
Expand Down
2 changes: 2 additions & 0 deletions crates/config/src/service/k8s/listen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ impl CreateListener for K8s {
async fn create_listener(&self) -> BoxResult<(Config, Box<dyn Listen>)> {
let (evt_tx, evt_rx) = tokio::sync::mpsc::unbounded_channel();

self.reject_gateway_class(constants::GATEWAY_CLASS_NAME).await?;
let config = self.retrieve_config().await?;
self.accept_gateway_class(constants::GATEWAY_CLASS_NAME).await?;

let gateway_api: Api<Gateway> = self.get_namespace_api();
let http_route_api: Api<HttpRoute> = self.get_namespace_api();
Expand Down
74 changes: 72 additions & 2 deletions crates/config/src/service/k8s/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use std::sync::Arc;

use k8s_openapi::NamespaceResourceScope;
use k8s_gateway_api::{GatewayClass, GatewayClassStatus};
use k8s_openapi::{apimachinery::pkg::apis::meta::v1::Condition, NamespaceResourceScope};
use spacegate_model::BoxResult;

pub mod convert;
pub mod create;
pub mod delete;
pub mod discovery;
// todo check listen
// TODO check listen
pub mod listen;
pub mod retrieve;
pub mod update;
Expand Down Expand Up @@ -44,4 +46,72 @@ impl K8s {
{
kube::Api::namespaced(self.client.clone(), &self.namespace)
}

pub(crate) async fn accept_gateway_class(&self, name: &str) -> BoxResult<()> {
let condition = Condition {
last_transition_time: k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(chrono::Utc::now()),
message: "Accepted".to_string(),
reason: "".to_string(),
status: "True".to_string(),
type_: "Accepted".to_string(),
observed_generation: None,
};
let gateway_class_api: kube::Api<GatewayClass> = self.get_all_api();
let mut gateway_class = gateway_class_api.get_status(name).await?;
gateway_class.status = if let Some(mut status) = gateway_class.status {
status.conditions = if let Some(mut conditions) = status.conditions {
for condition in &conditions {
if condition.status == "True" && condition.type_ == "Accepted" {
return Ok(());
}
}
conditions.push(condition);
Some(conditions)
} else {
Some(vec![condition])
};

Some(status)
} else {
Some(GatewayClassStatus {
conditions: Some(vec![condition]),
})
};
gateway_class_api.replace_status(name, &kube::api::PostParams::default(), serde_json::to_vec(&gateway_class)?).await?;
Ok(())
}

pub(crate) async fn reject_gateway_class(&self, name: &str) -> BoxResult<()> {
let condition = Condition {
last_transition_time: k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(chrono::Utc::now()),
message: "Load config or refresh config,waiting for complete".to_string(),
reason: "".to_string(),
status: "False".to_string(),
type_: "Accepted".to_string(),
observed_generation: None,
};
let gateway_class_api: kube::Api<GatewayClass> = self.get_all_api();
let mut gateway_class = gateway_class_api.get_status(name).await?;
gateway_class.status = if let Some(mut status) = gateway_class.status {
status.conditions = if let Some(mut conditions) = status.conditions {
for condition in conditions {
if condition.status == "False" && condition.type_ == "Accepted" {
return Ok(());
}
}
conditions = vec![condition];
Some(conditions)
} else {
Some(vec![condition])
};

Some(status)
} else {
Some(GatewayClassStatus {
conditions: Some(vec![condition]),
})
};
gateway_class_api.replace_status(name, &kube::api::PostParams::default(), serde_json::to_vec(&gateway_class)?).await?;
Ok(())
}
}
1 change: 1 addition & 0 deletions crates/config/src/service/k8s/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ impl K8s {
Ok(())
}

// TODO remove
// pub(crate) async fn update_filter_changes(&self, old: Vec<SgSingeFilter>, update: Vec<SgSingeFilter>) -> BoxResult<()> {
// if old.is_empty() && update.is_empty() {
// return Ok(());
Expand Down
2 changes: 1 addition & 1 deletion crates/kernel/src/backend_service/static_file_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ pub async fn static_file_service(mut request: SgRequest, dir: &Path) -> SgRespon
}
let cache_this = cache_policy(&metadata);
if cache_this {
// todo: cache
// TODO: cache
}
}
let mut buffer = Vec::new();
Expand Down
2 changes: 1 addition & 1 deletion crates/model/src/ext/k8s/crd/http_spaceroute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ impl HttpSpaceroute {
}
}

// // todo replace kernel::config::config_by_k8s::get_http_spaceroute_by_api
// // TODO replace kernel::config::config_by_k8s::get_http_spaceroute_by_api
// pub async fn get_http_spaceroute_by_api(
// gateway_uniques: &[String],
// (http_spaceroute_api, http_route_api): (&Api<HttpSpaceroute>, &Api<HttpRoute>),
Expand Down
4 changes: 3 additions & 1 deletion resource/kube-manifests/spacegate-gateway.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ spec:
imagePullPolicy: IfNotPresent
env:
- name: spacegate_ns
value: spacegate
valueFrom:
fieldRef:
fieldPath: metadata.namespace
# - name: RUST_LOG
# value: trace,hyper=error,tower=off,mio=error,kube_client=error

0 comments on commit 3f195d0

Please sign in to comment.