From 835c36d0618cc118059a7bed7dce9ca88081fae2 Mon Sep 17 00:00:00 2001 From: RWDai <27391645+RWDai@users.noreply.github.com> Date: Mon, 20 Nov 2023 18:31:05 +0800 Subject: [PATCH] remove admin k8s feature(in progress) --- admin/Cargo.toml | 19 +- admin/src/api.rs | 1 + admin/src/api/spacegate_manage_api.rs | 38 +++ admin/src/client.rs | 28 ++ admin/src/config.rs | 242 +++++++++++++++--- admin/src/initializer.rs | 1 + admin/src/main.rs | 2 +- admin/src/model/vo.rs | 2 +- admin/src/model/vo/k8s_cluster_vo.rs | 1 - admin/src/model/vo/spacegate_inst_vo.rs | 75 ++++++ admin/src/service.rs | 1 + admin/src/service/base_service.rs | 133 +--------- admin/src/service/gateway_service.rs | 110 +------- admin/src/service/plugin_service.rs | 38 ++- admin/src/service/route_service.rs | 28 +- admin/src/service/secret_service.rs | 15 +- admin/src/service/spacegate_manage_service.rs | 9 + kernel-common/Cargo.toml | 5 +- kernel-common/src/client.rs | 80 ++++++ .../src/converter/gateway_k8s_conv.rs | 1 + kernel-common/src/helper/k8s_helper.rs | 27 +- kernel-common/src/lib.rs | 1 + kernel/Cargo.toml | 8 +- 23 files changed, 536 insertions(+), 329 deletions(-) create mode 100644 admin/src/api/spacegate_manage_api.rs create mode 100644 admin/src/client.rs delete mode 100644 admin/src/model/vo/k8s_cluster_vo.rs create mode 100644 admin/src/model/vo/spacegate_inst_vo.rs create mode 100644 admin/src/service/spacegate_manage_service.rs create mode 100644 kernel-common/src/client.rs diff --git a/admin/Cargo.toml b/admin/Cargo.toml index b03e431f..eab2a1b6 100644 --- a/admin/Cargo.toml +++ b/admin/Cargo.toml @@ -13,16 +13,19 @@ edition.workspace = true readme = "./README.md" [features] -k8s = ["kube", "k8s-openapi", "k8s-gateway-api","kernel-common/k8s"] -doc=["tardis/openapi-rapidoc"] +doc = ["tardis/openapi-rapidoc"] [dependencies] -tardis = { workspace = true ,features = ["web-server","crypto"]} +tardis = { workspace = true, features = ["web-server", "crypto", "cache"] } serde.workspace = true serde_json.workspace = true -kernel-common={path = "../kernel-common",features = ["admin-support"]} -kube = { workspace = true, optional = true } -k8s-openapi = { workspace = true, optional = true } -k8s-gateway-api = { workspace = true, optional = true } -lazy_static = "1.4.0" \ No newline at end of file +secrecy = { version = "0.8.0", features = [ + "alloc", + "serde", +] } +kernel-common = { path = "../kernel-common", features = ["admin-support", "k8s","cache"] } +kube = { workspace = true } +k8s-openapi = { workspace = true } +k8s-gateway-api = { workspace = true } +lazy_static = { workspace = true } \ No newline at end of file diff --git a/admin/src/api.rs b/admin/src/api.rs index e906965f..6344fb94 100644 --- a/admin/src/api.rs +++ b/admin/src/api.rs @@ -3,3 +3,4 @@ pub(crate) mod gateway_api; pub(crate) mod plugin_api; pub(crate) mod route_api; pub(crate) mod tls_api; +pub(crate) mod spacegate_manage_api; diff --git a/admin/src/api/spacegate_manage_api.rs b/admin/src/api/spacegate_manage_api.rs new file mode 100644 index 00000000..6e49c7f6 --- /dev/null +++ b/admin/src/api/spacegate_manage_api.rs @@ -0,0 +1,38 @@ +use crate::model::vo::spacegate_inst_vo::InstConfigVo; +use tardis::web::poem_openapi; +use tardis::web::poem_openapi::param::Query; +use tardis::web::poem_openapi::payload::Json; +use tardis::web::web_resp::{TardisApiResult, TardisResp, Void}; + +#[derive(Clone, Default)] +pub struct SpacegateManageApi; + +#[poem_openapi::OpenApi(prefix_path = "/spacegate/manage")] +impl SpacegateManageApi { + /// List Spacegate Inst + #[oai(path = "/", method = "get")] + async fn list(&self) -> TardisApiResult> { + SpacegateManageService::list().await?; + } + + /// Add Spacegate Inst + #[oai(path = "/", method = "post")] + async fn add(&self, add: Json) -> TardisApiResult { + // HttpRouteVoService::add(add.0).await?; + TardisResp::ok(Void {}) + } + + /// Update Spacegate Inst + #[oai(path = "/", method = "put")] + async fn update(&self, backend: Json) -> TardisApiResult { + // HttpRouteVoService::update(backend.0).await?; + TardisResp::ok(Void {}) + } + + /// Delete Spacegate Inst + #[oai(path = "/", method = "delete")] + async fn delete(&self, name: Query) -> TardisApiResult { + // HttpRouteVoService::delete(&name.0).await?; + TardisResp::ok(Void {}) + } +} diff --git a/admin/src/client.rs b/admin/src/client.rs new file mode 100644 index 00000000..1d374c8a --- /dev/null +++ b/admin/src/client.rs @@ -0,0 +1,28 @@ +use crate::config::k8s_config::ToKubeconfig; +use crate::model::vo::spacegate_inst_vo::K8sClusterConfig; +use kube::config::Kubeconfig; +use lazy_static::lazy_static; +use std::mem; +use std::sync::RwLock; +use tardis::basic::error::TardisError; +use tardis::basic::result::TardisResult; + +lazy_static! { + ///Total kube config + pub static ref KUBECONFIG: RwLock = RwLock::default(); +} + +pub fn add_k8s_config(config: K8sClusterConfig) -> TardisResult<()> { + for _ in 0..100 { + if let Ok(mut kube_config) = KUBECONFIG.try_write() { + let mut swap_config = Kubeconfig::default(); + mem::swap(&mut swap_config, &mut kube_config); + swap_config = swap_config.merge(config.to_kubeconfig()).map_err(|e| TardisError::wrap(&format!("kubeconfig parse error:{e}"), ""))?; + + mem::swap(&mut swap_config, &mut kube_config); + return Ok(()); + }; + } + + Err(TardisError::conflict("[SG.admin] add config: get lock time out", "")) +} diff --git a/admin/src/config.rs b/admin/src/config.rs index d71dc0b5..4290e3e6 100644 --- a/admin/src/config.rs +++ b/admin/src/config.rs @@ -1,9 +1,24 @@ +use crate::config::k8s_config::K8sConfig; +use serde::Deserializer; +use tardis::config::config_dto::CacheConfig; use tardis::serde::{Deserialize, Serialize}; #[derive(Default, Debug, Serialize, Deserialize)] #[serde(default)] pub struct SpacegateAdminConfig { - #[cfg(feature = "k8s")] + pub cache_config: Option, + + #[serde(flatten)] + pub kube_config: AdminK8sConfig, +} + +#[derive(Default, Debug, Serialize, Deserialize)] +#[serde(default)] +pub struct AdminK8sConfig { + /// If enable is true , then must use k8s configmap. + /// Otherwise , just try k8s. + pub enable: bool, + pub k8s_config: Option, /// # `KUBECONFIG` /// @@ -18,41 +33,198 @@ pub struct SpacegateAdminConfig { /// ```cmd /// set KUBECONFIG=:\path\to\your\config /// ``` - #[cfg(feature = "k8s")] pub kube_config: Option, } -#[cfg(feature = "k8s")] -#[derive(Default, Debug, Serialize, Deserialize)] -#[serde(default)] -pub struct K8sConfig { - /// The configured cluster url - pub cluster_url: String, - /// The configured default namespace - pub default_namespace: String, - /// The configured root certificate - pub root_cert: Option>>, - /// Set the timeout for connecting to the Kubernetes API. - /// - /// A value of `None` means no timeout - pub connect_timeout: Option, - /// Set the timeout for the Kubernetes API response. - /// - /// A value of `None` means no timeout - pub read_timeout: Option, - /// Set the timeout for the Kubernetes API request. - /// - /// A value of `None` means no timeout - pub write_timeout: Option, - /// Whether to accept invalid certificates - pub accept_invalid_certs: bool, - /// Stores information to tell the cluster who you are. - pub auth_info: kube::config::AuthInfo, - // TODO Actually support proxy or create an example with custom client - /// Optional proxy URL. - pub proxy_url: Option, - /// If set, apiserver certificate will be validated to contain this string - /// - /// If not set, the `cluster_url` is used instead - pub tls_server_name: Option, +pub mod k8s_config { + use kube::config::NamedContext; + use secrecy::SecretString; + use serde::{Deserialize, Deserializer, Serialize}; + use tardis::web::poem_openapi; + + pub trait ToKubeconfig { + fn to_kubeconfig(self) -> T; + } + + #[derive(Default, Debug, Serialize, Deserialize, poem_openapi::Object)] + #[serde(default)] + pub struct K8sConfig { + /// Referencable names to cluster configs + #[serde(default, deserialize_with = "deserialize_null_as_default")] + pub clusters: NamedCluster, + /// Referencable names to user configs + #[serde(default, deserialize_with = "deserialize_null_as_default")] + pub users: NamedAuthInfo, + } + impl ToKubeconfig for K8sConfig { + fn to_kubeconfig(self) -> kube::config::Kubeconfig { + let cluster = self.clusters.to_kubeconfig(); + let user = self.users.to_kubeconfig(); + let context = NamedContext { + name: "default".to_string(), + context: Some(kube::config::Context { + cluster: cluster.name.clone(), + user: user.name.clone(), + namespace: None, + extensions: None, + }), + }; + kube::config::Kubeconfig { + preferences: None, + clusters: vec![cluster], + auth_infos: vec![user], + contexts: vec![context], + current_context: Some("default".to_string()), + extensions: None, + kind: Some("Config".to_string()), + api_version: Some("v1".to_string()), + } + } + } + + #[derive(Clone, Debug, Serialize, Deserialize, Default, poem_openapi::Object)] + pub struct NamedCluster { + /// Name of cluster + pub name: String, + /// Information about how to communicate with a kubernetes cluster + #[serde(skip_serializing_if = "Option::is_none")] + pub cluster: Option, + } + + impl ToKubeconfig for NamedCluster { + fn to_kubeconfig(self) -> kube::config::NamedCluster { + kube::config::NamedCluster { + name: self.name, + cluster: self.cluster.map(|c| c.to_kubeconfig()), + } + } + } + + /// Cluster stores information to connect Kubernetes cluster. + #[derive(Clone, Debug, Serialize, Deserialize, Default, poem_openapi::Object)] + pub struct Cluster { + /// The address of the kubernetes cluster (https://hostname:port). + #[serde(skip_serializing_if = "Option::is_none")] + pub server: Option, + /// Skips the validity check for the server's certificate. This will make your HTTPS connections insecure. + #[serde(rename = "insecure-skip-tls-verify")] + #[serde(skip_serializing_if = "Option::is_none")] + pub insecure_skip_tls_verify: Option, + /// The path to a cert file for the certificate authority. + #[serde(rename = "certificate-authority")] + #[serde(skip_serializing_if = "Option::is_none")] + pub certificate_authority: Option, + /// PEM-encoded certificate authority certificates. Overrides `certificate_authority` + #[serde(rename = "certificate-authority-data")] + #[serde(skip_serializing_if = "Option::is_none")] + pub certificate_authority_data: Option, + /// URL to the proxy to be used for all requests. + #[serde(rename = "proxy-url")] + #[serde(skip_serializing_if = "Option::is_none")] + pub proxy_url: Option, + /// Name used to check server certificate. + /// + /// If `tls_server_name` is `None`, the hostname used to contact the server is used. + #[serde(rename = "tls-server-name")] + #[serde(skip_serializing_if = "Option::is_none")] + pub tls_server_name: Option, + } + + impl ToKubeconfig for Cluster { + fn to_kubeconfig(self) -> kube::config::Cluster { + kube::config::Cluster { + server: self.server, + insecure_skip_tls_verify: self.insecure_skip_tls_verify, + certificate_authority: self.certificate_authority, + certificate_authority_data: self.certificate_authority_data, + proxy_url: self.proxy_url, + tls_server_name: self.tls_server_name, + extensions: None, + } + } + } + + /// NamedAuthInfo associates name with authentication. + #[derive(Clone, Debug, Serialize, Deserialize, Default, poem_openapi::Object)] + pub struct NamedAuthInfo { + /// Name of the user + pub name: String, + /// Information that describes identity of the user + #[serde(rename = "user")] + #[serde(skip_serializing_if = "Option::is_none")] + pub auth_info: Option, + } + + impl ToKubeconfig for NamedAuthInfo { + fn to_kubeconfig(self) -> kube::config::NamedAuthInfo { + kube::config::NamedAuthInfo { + name: self.name, + auth_info: self.auth_info.map(|auth| auth.to_kubeconfig()), + } + } + } + + #[derive(Clone, Debug, Serialize, Deserialize, Default, poem_openapi::Object)] + pub struct AuthInfo { + /// The username for basic authentication to the kubernetes cluster. + #[serde(skip_serializing_if = "Option::is_none")] + pub username: Option, + /// The password for basic authentication to the kubernetes cluster. + #[serde(skip_serializing_if = "Option::is_none", default)] + pub password: Option, + + /// The bearer token for authentication to the kubernetes cluster. + #[serde(skip_serializing_if = "Option::is_none", default)] + pub token: Option, + /// Pointer to a file that contains a bearer token (as described above). If both `token` and token_file` are present, `token` takes precedence. + #[serde(rename = "tokenFile")] + #[serde(skip_serializing_if = "Option::is_none")] + pub token_file: Option, + + /// Path to a client cert file for TLS. + #[serde(rename = "client-certificate")] + #[serde(skip_serializing_if = "Option::is_none")] + pub client_certificate: Option, + /// PEM-encoded data from a client cert file for TLS. Overrides `client_certificate` + #[serde(rename = "client-certificate-data")] + #[serde(skip_serializing_if = "Option::is_none")] + pub client_certificate_data: Option, + + /// Path to a client key file for TLS. + #[serde(rename = "client-key")] + #[serde(skip_serializing_if = "Option::is_none")] + pub client_key: Option, + /// PEM-encoded data from a client key file for TLS. Overrides `client_key` + #[serde(rename = "client-key-data")] + #[serde(skip_serializing_if = "Option::is_none", default)] + pub client_key_data: Option, + } + + impl ToKubeconfig for AuthInfo { + fn to_kubeconfig(self) -> kube::config::AuthInfo { + kube::config::AuthInfo { + username: self.username, + password: self.password.map(|s| SecretString::new(s)), + token: self.token.map(|token| SecretString::new(token)), + token_file: self.token_file, + client_certificate: self.client_certificate, + client_certificate_data: self.client_certificate_data, + client_key: self.client_key, + client_key_data: self.client_key_data.map(|data| SecretString::new(data)), + impersonate: None, + impersonate_groups: None, + auth_provider: None, + exec: None, + } + } + } + + fn deserialize_null_as_default<'de, D, T>(deserializer: D) -> Result + where + T: Default + Deserialize<'de>, + D: Deserializer<'de>, + { + let opt = Option::deserialize(deserializer)?; + Ok(opt.unwrap_or_default()) + } } diff --git a/admin/src/initializer.rs b/admin/src/initializer.rs index dd892155..64d1fa26 100644 --- a/admin/src/initializer.rs +++ b/admin/src/initializer.rs @@ -4,6 +4,7 @@ use tardis::basic::result::TardisResult; use tardis::web::web_server::{TardisWebServer, WebServerModule}; pub(crate) async fn init(web_server: &TardisWebServer) -> TardisResult<()> { + // todo 根据现有的k8s资源初始化成VO init_api(web_server).await } diff --git a/admin/src/main.rs b/admin/src/main.rs index 36688412..ca97be09 100644 --- a/admin/src/main.rs +++ b/admin/src/main.rs @@ -1,6 +1,7 @@ use tardis::{basic::result::TardisResult, tokio, TardisFuns}; mod api; +mod client; mod config; mod constants; mod helper; @@ -10,7 +11,6 @@ mod service; #[tokio::main] async fn main() -> TardisResult<()> { - // todo 根据现有的k8s资源初始化成VO TardisFuns::init(Some("config")).await?; let web_server = TardisFuns::web_server(); initializer::init(&web_server).await?; diff --git a/admin/src/model/vo.rs b/admin/src/model/vo.rs index 19a9a09b..6908c836 100644 --- a/admin/src/model/vo.rs +++ b/admin/src/model/vo.rs @@ -1,8 +1,8 @@ pub mod backend_vo; pub mod gateway_vo; pub mod http_route_vo; -pub mod k8s_cluster_vo; pub mod plugin_vo; +pub mod spacegate_inst_vo; pub mod tls_vo; /// Vo is a base until for admin. diff --git a/admin/src/model/vo/k8s_cluster_vo.rs b/admin/src/model/vo/k8s_cluster_vo.rs deleted file mode 100644 index 593252c8..00000000 --- a/admin/src/model/vo/k8s_cluster_vo.rs +++ /dev/null @@ -1 +0,0 @@ -//todo diff --git a/admin/src/model/vo/spacegate_inst_vo.rs b/admin/src/model/vo/spacegate_inst_vo.rs new file mode 100644 index 00000000..ddb94d52 --- /dev/null +++ b/admin/src/model/vo/spacegate_inst_vo.rs @@ -0,0 +1,75 @@ +use crate::config::k8s_config::{K8sConfig, ToKubeconfig}; +use crate::constants; +use crate::model::vo::Vo; +use kube::config::NamedContext; +use serde::{Deserialize, Serialize}; +use tardis::web::poem_openapi; + +#[derive(Debug, Serialize, Deserialize, poem_openapi::Object)] +pub struct InstConfigVo { + pub type_: InstConfigType, + pub k8s_cluster_config: K8sClusterConfig, + pub redis_config: RedisConfig, +} + +impl Vo for InstConfigVo { + fn get_vo_type() -> String { + constants::CLUSTER_TYPE.to_string() + } + + fn get_unique_name(&self) -> String { + match &self.type_ { + InstConfigType::K8sClusterConfig => self.k8s_cluster_config.name.clone(), + InstConfigType::RedisConfig => self.redis_config.name.clone(), + } + } +} + +#[derive(Debug, Serialize, Deserialize, Clone, poem_openapi::Enum)] +pub enum InstConfigType { + K8sClusterConfig, + RedisConfig, +} + +#[derive(Debug, Default, Serialize, Deserialize, poem_openapi::Object)] +#[serde(default)] +pub struct K8sClusterConfig { + /// uid + pub name: String, + #[serde(flatten)] + pub config: K8sConfig, +} + +impl ToKubeconfig for K8sClusterConfig { + fn to_kubeconfig(self) -> kube::config::Kubeconfig { + let cluster = self.config.clusters.to_kubeconfig(); + let user = self.config.users.to_kubeconfig(); + let context = NamedContext { + name: self.name.clone(), + context: Some(kube::config::Context { + cluster: cluster.name.clone(), + user: user.name.clone(), + namespace: None, + extensions: None, + }), + }; + kube::config::Kubeconfig { + preferences: None, + clusters: vec![cluster], + auth_infos: vec![user], + contexts: vec![context], + current_context: Some(self.name), + extensions: None, + kind: Some("Config".to_string()), + api_version: Some("v1".to_string()), + } + } +} + +#[derive(Debug, Default, Serialize, Deserialize, Clone, poem_openapi::Object)] +pub struct RedisConfig { + /// uid + pub name: String, + pub auth: String, + pub url: String, +} diff --git a/admin/src/service.rs b/admin/src/service.rs index 2bfff701..dacf9189 100644 --- a/admin/src/service.rs +++ b/admin/src/service.rs @@ -4,3 +4,4 @@ pub(crate) mod gateway_service; pub(crate) mod plugin_service; pub(crate) mod route_service; pub(crate) mod secret_service; +pub(crate) mod spacegate_manage_service; diff --git a/admin/src/service/base_service.rs b/admin/src/service/base_service.rs index cdbaa2c0..86b5fb22 100644 --- a/admin/src/service/base_service.rs +++ b/admin/src/service/base_service.rs @@ -1,10 +1,7 @@ use crate::constants::TYPE_CONFIG_NAME_MAP; -#[cfg(feature = "k8s")] use crate::helper::get_k8s_client; use crate::model::vo::Vo; -#[cfg(feature = "k8s")] use k8s_openapi::{api::core::v1::ConfigMap, apimachinery::pkg::apis::meta::v1::ObjectMeta}; -#[cfg(feature = "k8s")] use kube::{api::PostParams, Api}; use serde::de::DeserializeOwned; use serde::Serialize; @@ -54,7 +51,6 @@ where } } - #[cfg(feature = "k8s")] async fn add_vo(config: T) -> TardisResult where T: 'async_trait, @@ -62,7 +58,6 @@ where Self::add_or_update_vo(config, true).await } - #[cfg(feature = "k8s")] async fn update_vo(config: T) -> TardisResult where T: 'async_trait, @@ -135,125 +130,6 @@ where } } -//todo remove -// impl BaseService { -// #[cfg(feature = "k8s")] -// pub async fn get_type_map<'a, T>() -> TardisResult> -// where -// T: Vo + Deserialize<'a>, -// { -// if let Some(t_config) = -// get_config_map_api().await?.get_opt(&get_config_name::()).await.map_err(|e| TardisError::io_error(&format!("[SG.admin] Kubernetes client error: {e}"), ""))? -// { -// if let Some(b_map) = t_config.data { -// Ok(b_map.into_iter().collect()) -// } else { -// Ok(HashMap::new()) -// } -// } else { -// init_config_map_by_t::().await?; -// Ok(HashMap::new()) -// } -// } - -// pub async fn get_by_id(id: &str) -> TardisResult -// where -// T: Vo + Deserialize<'a>, -// { -// if let Some(t_str) = BaseService::get_type_map::().await?.remove(id) { -// Ok(t_str) -// } else { -// Err(TardisError::not_found("", "")) -// } -// } - -// #[cfg(feature = "k8s")] -// pub async fn add<'a, T>(config: T) -> TardisResult -// where -// T: Vo + Serialize + Deserialize<'a>, -// { -// Self::add_or_update::(config, true).await -// } - -// #[cfg(feature = "k8s")] -// pub async fn update<'a, T>(config: T) -> TardisResult -// where -// T: Vo + Serialize + Deserialize<'a>, -// { -// Self::add_or_update::(config, false).await -// } - -// #[cfg(feature = "k8s")] -// pub async fn add_or_update<'a, T>(config: T, add_only: bool) -> TardisResult -// where -// T: Vo + Serialize + Deserialize<'a>, -// { -// let id = config.get_unique_name(); -// let mut datas = Self::get_type_map::().await?; -// if let Some(_) = datas.get(&id) { -// if add_only { -// return Err(TardisError::bad_request(&format!("[SG.admin] {}:{} already exists", T::get_vo_type(), id), "")); -// } else { -// log::debug!("[SG.admin] add_or_update {}:{} exists , will update", T::get_vo_type(), id); -// } -// } else { -// log::debug!("[SG.admin] add_or_update {}:{} not exists , will add", T::get_vo_type(), id); -// } - -// datas.insert( -// id.clone(), -// serde_json::to_string(&config).map_err(|e| TardisError::bad_request(&format!("Serialization to json failed:{e}"), ""))?, -// ); -// get_config_map_api() -// .await? -// .replace( -// &get_config_name::(), -// &PostParams::default(), -// &ConfigMap { -// data: Some(datas.into_iter().collect()), -// metadata: ObjectMeta { -// name: Some(get_config_name::()), -// ..Default::default() -// }, -// ..Default::default() -// }, -// ) -// .await -// .map_err(|e| TardisError::io_error(&format!("[SG.admin] Kubernetes client error:{e}"), ""))?; -// Ok(config) -// } - -// #[cfg(feature = "k8s")] -// pub async fn delete<'a, T>(config_id: &str) -> TardisResult<()> -// where -// T: Vo + Serialize + Deserialize<'a>, -// { -// let mut datas = Self::get_type_map::().await?; -// if let Some(_) = datas.remove(config_id) { -// get_config_map_api() -// .await? -// .replace( -// &get_config_name::(), -// &PostParams::default(), -// &ConfigMap { -// data: Some(datas.into_iter().collect()), -// metadata: ObjectMeta { -// name: Some(get_config_name::()), -// ..Default::default() -// }, -// ..Default::default() -// }, -// ) -// .await -// .map_err(|e| TardisError::io_error(&format!("[SG.admin] Kubernetes client error:{e}"), ""))?; -// } else { -// log::debug!("{}:{} already not exists", T::get_vo_type(), config_id); -// } -// Ok(()) -// } -// } - -#[cfg(feature = "k8s")] pub async fn init_config_map_by_t() -> TardisResult<()> where T: Vo, @@ -276,7 +152,6 @@ where Ok(()) } -#[cfg(feature = "k8s")] pub fn get_config_name() -> String where T: Vo, @@ -284,11 +159,16 @@ where TYPE_CONFIG_NAME_MAP.get(T::get_vo_type().as_str()).expect("").to_string() } -#[cfg(feature = "k8s")] +//todo get_k8s_client pub async fn get_config_map_api() -> TardisResult> { Ok(Api::namespaced(get_k8s_client().await?, "spacegate")) } +#[cfg(all(feature = "cache", not(feature = "k8s")))] +pub async fn get_redis_client() -> TardisResult { + //todo +} + #[cfg(test)] mod test { use crate::model::vo::backend_vo::SgBackendRefVo; @@ -298,7 +178,6 @@ mod test { use tardis::tokio; #[tokio::test] - #[cfg(feature = "k8s")] #[ignore] async fn k8s_test() { let mut add_o_1 = SgBackendRefVo { diff --git a/admin/src/service/gateway_service.rs b/admin/src/service/gateway_service.rs index 43f76d30..5c3c5b83 100644 --- a/admin/src/service/gateway_service.rs +++ b/admin/src/service/gateway_service.rs @@ -9,7 +9,6 @@ use k8s_gateway_api::Gateway; use kernel_common::constants::k8s_constants::DEFAULT_NAMESPACE; #[cfg(feature = "k8s")] use kube::api::{DeleteParams, PostParams}; -use std::collections::HashSet; #[cfg(feature = "k8s")] use kube::Api; @@ -18,8 +17,6 @@ use super::base_service::VoBaseService; use crate::model::vo_converter::VoConv; use crate::service::plugin_service::PluginK8sService; #[cfg(feature = "k8s")] -use kernel_common::converter::plugin_k8s_conv::SgSingeFilter; -#[cfg(feature = "k8s")] use kernel_common::helper::k8s_helper::{format_k8s_obj_unique, parse_k8s_obj_unique, parse_k8s_unique_or_default, WarpKubeResult}; use tardis::basic::result::TardisResult; @@ -49,7 +46,7 @@ impl GatewayVoService { let (namespace, _) = parse_k8s_unique_or_default(&add.get_unique_name()); let (gateway, sgfilters) = add_model.to_kube_gateway(); - let gateway_api: Api = Api::namespaced(get_k8s_client().await?, &namespace); + let gateway_api: Api = Self::get_gateway_api(&Some(namespace)).await?; let _ = gateway_api.create(&PostParams::default(), &gateway).await.warp_result_by_method("Add Gateway")?; @@ -71,11 +68,11 @@ impl GatewayVoService { #[cfg(feature = "k8s")] { let (namespace, name) = parse_k8s_obj_unique(update_un); - let gateway_api: Api = Api::namespaced(get_k8s_client().await?, &namespace); + let gateway_api: Api = Self::get_gateway_api(&Some(namespace)).await?; let (update_gateway, update_filter) = update_sg_gateway.to_kube_gateway(); gateway_api.replace(&name, &PostParams::default(), &update_gateway).await.warp_result_by_method("Replace Gateway")?; - Self::update_gateway_filter(old_sg_gateway.to_kube_gateway().1, update_filter).await?; + PluginK8sService::update_filter_changes(old_sg_gateway.to_kube_gateway().1, update_filter).await?; } Self::update_vo(update).await } @@ -97,109 +94,8 @@ impl GatewayVoService { Ok(()) } - #[cfg(feature = "k8s")] - async fn update_gateway_filter(old: Vec, update: Vec) -> TardisResult<()> { - if old.is_empty() && update.is_empty() { - return Ok(()); - } - - let old_set: HashSet<_> = old.into_iter().collect(); - let update_set: HashSet<_> = update.into_iter().collect(); - - let update_vec: Vec<_> = old_set.intersection(&update_set).collect(); - PluginK8sService::update_sgfilter_vec(&update_vec).await?; - let add_vec: Vec<_> = update_set.difference(&old_set).collect(); - PluginK8sService::add_sgfilter_vec(&add_vec).await?; - let delete_vec: Vec<_> = old_set.difference(&update_set).collect(); - PluginK8sService::delete_sgfilter_vec(&delete_vec).await?; - - Ok(()) - } - #[cfg(feature = "k8s")] async fn get_gateway_api(namespace: &Option) -> TardisResult> { Ok(Api::namespaced(get_k8s_client().await?, namespace.as_ref().unwrap_or(&DEFAULT_NAMESPACE.to_string()))) } } -//old version -// //todo try to compress with kernel::config::config_by_k8s -// #[cfg(feature = "k8s")] -// pub async fn kube_to(gateway_list: Vec) -> TardisResult> { -// let mut result = vec![]; -// for g in gateway_list { -// result.push(SgGateway { -// name: g.name_any(), -// parameters: SgParameters::from_kube_gateway(&g), -// listeners: join_all( -// g.spec -// .listeners -// .into_iter() -// .map(|listener| async move { -// let tls = match listener.tls { -// Some(tls) => { -// let certificate_ref = tls -// .certificate_refs -// .as_ref() -// .ok_or_else(|| TardisError::format_error("[SG.Config] Gateway [spec.listener.tls.certificateRefs] is required", ""))? -// .get(0) -// .ok_or_else(|| TardisError::format_error("[SG.Config] Gateway [spec.listener.tls.certificateRefs] is empty", ""))?; -// let secret_api: Api = -// Api::namespaced(get_k8s_client().await?, certificate_ref.namespace.as_ref().unwrap_or(&DEFAULT_NAMESPACE.to_string())); -// if let Some(secret_obj) = secret_api -// .get_opt(&certificate_ref.name) -// .await -// .map_err(|error| TardisError::wrap(&format!("[SG.Config] Kubernetes error: {error:?}"), ""))? -// { -// let secret_data = secret_obj -// .data -// .ok_or_else(|| TardisError::format_error(&format!("[SG.Config] Gateway tls secret [{}] data is required", certificate_ref.name), ""))?; -// let tls_crt = secret_data.get("tls.crt").ok_or_else(|| { -// TardisError::format_error(&format!("[SG.Config] Gateway tls secret [{}] data [tls.crt] is required", certificate_ref.name), "") -// })?; -// let tls_key = secret_data.get("tls.key").ok_or_else(|| { -// TardisError::format_error(&format!("[SG.Config] Gateway tls secret [{}] data [tls.key] is required", certificate_ref.name), "") -// })?; -// Some(SgTlsConfig { -// mode: SgTlsMode::from(tls.mode).unwrap_or_default(), -// key: String::from_utf8(tls_key.0.clone()).expect("[SG.Config] Gateway tls secret [tls.key] is not valid utf8"), -// cert: String::from_utf8(tls_crt.0.clone()).expect("[SG.Config] Gateway tls secret [tls.cert] is not valid utf8"), -// }) -// } else { -// TardisError::not_found(&format!("[SG.admin] Gateway have tls secret [{}], but not found!", certificate_ref.name), ""); -// None -// } -// } -// None => None, -// }; -// let sg_listener = SgListener { -// name: listener.name, -// ip: None, -// port: listener.port, -// protocol: match listener.protocol.to_lowercase().as_str() { -// "http" => SgProtocol::Http, -// "https" => SgProtocol::Https, -// "ws" => SgProtocol::Ws, -// _ => { -// return Err(TardisError::not_implemented( -// &format!("[SG.Config] Gateway [spec.listener.protocol={}] not supported yet", listener.protocol), -// "", -// )) -// } -// }, -// tls, -// hostname: listener.hostname, -// }; -// Ok(sg_listener) -// }) -// .collect::>(), -// ) -// .await -// .into_iter() -// .map(|listener| listener.expect("[SG.Config] Unexpected none: listener")) -// .collect(), -// filters: None, -// }) -// } -// Ok(result) -// } -// } diff --git a/admin/src/service/plugin_service.rs b/admin/src/service/plugin_service.rs index c57c7a70..75d7c8fc 100644 --- a/admin/src/service/plugin_service.rs +++ b/admin/src/service/plugin_service.rs @@ -13,7 +13,7 @@ use kernel_common::{ use kube::api::{ListParams, PostParams}; #[cfg(feature = "k8s")] use kube::{Api, ResourceExt}; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use tardis::basic::result::TardisResult; pub struct PluginVoService; @@ -42,6 +42,7 @@ impl PluginVoService { pub(crate) async fn add(add: SgFilterVo) -> TardisResult { Self::add_vo(add).await } + pub(crate) async fn update(update: SgFilterVo) -> TardisResult { Self::update_vo(update).await } @@ -50,20 +51,32 @@ impl PluginVoService { Self::delete_vo(id).await?; Ok(()) } - - #[cfg(feature = "k8s")] - #[inline] - pub async fn get_filter_api(namespace: &Option) -> TardisResult> { - Ok(Api::namespaced(get_k8s_client().await?, namespace.as_ref().unwrap_or(&DEFAULT_NAMESPACE.to_string()))) - } } #[cfg(feature = "k8s")] impl PluginK8sService { + pub(crate) async fn update_filter_changes(old: Vec, update: Vec) -> TardisResult<()> { + if old.is_empty() && update.is_empty() { + return Ok(()); + } + + let old_set: HashSet<_> = old.into_iter().collect(); + let update_set: HashSet<_> = update.into_iter().collect(); + + let update_vec: Vec<_> = old_set.intersection(&update_set).collect(); + PluginK8sService::update_sgfilter_vec(&update_vec).await?; + let add_vec: Vec<_> = update_set.difference(&old_set).collect(); + PluginK8sService::add_sgfilter_vec(&add_vec).await?; + let delete_vec: Vec<_> = old_set.difference(&update_set).collect(); + PluginK8sService::delete_sgfilter_vec(&delete_vec).await?; + + Ok(()) + } + pub async fn add_sgfilter_vec(sgfilters: &[&SgSingeFilter]) -> TardisResult<()> { let mut filter_map = HashMap::new(); for sf in sgfilters { - let filter_api: Api = PluginVoService::get_filter_api(&Some(sf.namespace.clone())).await?; + let filter_api: Api = Self::get_filter_api(&Some(sf.namespace.clone())).await?; let namespace_filter = if let Some(filter_list) = filter_map.get(&sf.namespace) { filter_list @@ -89,7 +102,7 @@ impl PluginK8sService { pub async fn update_sgfilter_vec(sgfilters: &[&SgSingeFilter]) -> TardisResult<()> { let mut filter_map = HashMap::new(); for sf in sgfilters { - let filter_api: Api = PluginVoService::get_filter_api(&Some(sf.namespace.clone())).await?; + let filter_api: Api = Self::get_filter_api(&Some(sf.namespace.clone())).await?; let namespace_filter = if let Some(filter_list) = filter_map.get(&sf.namespace) { filter_list @@ -123,7 +136,7 @@ impl PluginK8sService { pub async fn delete_sgfilter_vec(sgfilters: &[&SgSingeFilter]) -> TardisResult<()> { let mut filter_map = HashMap::new(); for sf in sgfilters { - let filter_api: Api = PluginVoService::get_filter_api(&Some(sf.namespace.clone())).await?; + let filter_api: Api = Self::get_filter_api(&Some(sf.namespace.clone())).await?; let namespace_filter = if let Some(filter_list) = filter_map.get(&sf.namespace) { filter_list @@ -142,4 +155,9 @@ impl PluginK8sService { } Ok(()) } + + #[inline] + pub async fn get_filter_api(namespace: &Option) -> TardisResult> { + Ok(Api::namespaced(get_k8s_client().await?, namespace.as_ref().unwrap_or(&DEFAULT_NAMESPACE.to_string()))) + } } diff --git a/admin/src/service/route_service.rs b/admin/src/service/route_service.rs index d32663b6..ebeabebc 100644 --- a/admin/src/service/route_service.rs +++ b/admin/src/service/route_service.rs @@ -6,9 +6,10 @@ use crate::model::vo_converter::VoConv; use crate::service::base_service::VoBaseService; use crate::service::plugin_service::PluginK8sService; +use k8s_openapi::api::core::v1::Secret; +use kernel_common::constants::k8s_constants::DEFAULT_NAMESPACE; #[cfg(feature = "k8s")] use kernel_common::{ - converter::plugin_k8s_conv::SgSingeFilter, helper::k8s_helper::{format_k8s_obj_unique, parse_k8s_obj_unique, parse_k8s_unique_or_default, WarpKubeResult}, k8s_crd::http_spaceroute::HttpSpaceroute, }; @@ -16,7 +17,6 @@ use kernel_common::{ use kube::api::{DeleteParams, PostParams}; #[cfg(feature = "k8s")] use kube::Api; -use std::collections::HashSet; use tardis::basic::result::TardisResult; pub struct HttpRouteVoService; @@ -75,7 +75,7 @@ impl HttpRouteVoService { let (update_httproute, update_filter) = update_sg_httproute.to_kube_httproute(); http_route_api.replace(&name, &PostParams::default(), &update_httproute).await.warp_result_by_method("Replace HttpSpaceroute")?; - Self::update_httproute_filter(old_sg_httproute.to_kube_httproute().1, update_filter).await?; + PluginK8sService::update_filter_changes(old_sg_httproute.to_kube_httproute().1, update_filter).await?; } Self::update_vo(update).await } @@ -84,7 +84,7 @@ impl HttpRouteVoService { let (namespace, name) = parse_k8s_obj_unique(id); #[cfg(feature = "k8s")] { - let http_route_api: Api = Api::namespaced(get_k8s_client().await?, &namespace); + let http_route_api: Api = Self::get_spaceroute_api(&Some(namespace)).await?; http_route_api.delete(&name, &DeleteParams::default()).await.warp_result_by_method("Delete HttpSpaceroute")?; @@ -96,23 +96,9 @@ impl HttpRouteVoService { Ok(()) } - //todo 和gateway_service 里的那个合并 #[cfg(feature = "k8s")] - async fn update_httproute_filter(old: Vec, update: Vec) -> TardisResult<()> { - if old.is_empty() && update.is_empty() { - return Ok(()); - } - - let old_set: HashSet<_> = old.into_iter().collect(); - let update_set: HashSet<_> = update.into_iter().collect(); - - let update_vec: Vec<_> = old_set.intersection(&update_set).collect(); - PluginK8sService::update_sgfilter_vec(&update_vec).await?; - let add_vec: Vec<_> = update_set.difference(&old_set).collect(); - PluginK8sService::add_sgfilter_vec(&add_vec).await?; - let delete_vec: Vec<_> = old_set.difference(&update_set).collect(); - PluginK8sService::delete_sgfilter_vec(&delete_vec).await?; - - Ok(()) + #[inline] + async fn get_spaceroute_api(namespace: &Option) -> TardisResult> { + Ok(Api::namespaced(get_k8s_client().await?, namespace.as_ref().unwrap_or(&DEFAULT_NAMESPACE.to_string()))) } } diff --git a/admin/src/service/secret_service.rs b/admin/src/service/secret_service.rs index d3619478..0dbb0fc8 100644 --- a/admin/src/service/secret_service.rs +++ b/admin/src/service/secret_service.rs @@ -1,11 +1,13 @@ +use crate::helper::get_k8s_client; use crate::model::query_dto::{GatewayQueryDto, SgTlsQueryInst, ToInstance}; use crate::model::vo::Vo; use crate::service::base_service::VoBaseService; use crate::service::gateway_service::GatewayVoService; use k8s_openapi::api::core::v1::Secret; +use kernel_common::constants::k8s_constants::DEFAULT_NAMESPACE; #[cfg(feature = "k8s")] use kernel_common::{ - helper::k8s_helper::{format_k8s_obj_unique, get_base_k8s_client, parse_k8s_obj_unique, parse_k8s_unique_or_default, WarpKubeResult}, + helper::k8s_helper::{format_k8s_obj_unique, parse_k8s_obj_unique, parse_k8s_unique_or_default, WarpKubeResult}, inner_model::gateway::SgTls, }; use kube::api::{DeleteParams, PostParams}; @@ -37,7 +39,7 @@ impl TlsVoService { #[cfg(feature = "k8s")] { let (namespace, _) = parse_k8s_unique_or_default(&add.get_unique_name()); - let secret_api: Api = Api::namespaced(get_base_k8s_client().await?, &namespace); + let secret_api: Api = Self::get_secret_api(&Some(namespace)).await?; let s = add_model.to_kube_tls(); secret_api.create(&PostParams::default(), &s).await.warp_result_by_method("Add Secret")?; } @@ -51,7 +53,7 @@ impl TlsVoService { #[cfg(feature = "k8s")] { let (namespace, name) = parse_k8s_obj_unique(&unique_name); - let secret_api: Api = Api::namespaced(get_base_k8s_client().await?, &namespace); + let secret_api: Api = Self::get_secret_api(&Some(namespace)).await?; let s = update.clone().to_kube_tls(); secret_api.replace(&name, &PostParams::default(), &s).await.warp_result_by_method("Update Secret")?; } @@ -66,7 +68,7 @@ impl TlsVoService { #[cfg(feature = "k8s")] { let (namespace, name) = parse_k8s_obj_unique(id); - let secret_api: Api = Api::namespaced(get_base_k8s_client().await?, &namespace); + let secret_api: Api = Self::get_secret_api(&Some(namespace)).await?; secret_api.delete(&name, &DeleteParams::default()).await.warp_result_by_method("Delete Secret")?; } let gateways = GatewayVoService::list(GatewayQueryDto { ..Default::default() }.to_instance()?).await?; @@ -83,4 +85,9 @@ impl TlsVoService { )) } } + + #[cfg(feature = "k8s")] + async fn get_secret_api(namespace: &Option) -> TardisResult> { + Ok(Api::namespaced(get_k8s_client().await?, namespace.as_ref().unwrap_or(&DEFAULT_NAMESPACE.to_string()))) + } } diff --git a/admin/src/service/spacegate_manage_service.rs b/admin/src/service/spacegate_manage_service.rs new file mode 100644 index 00000000..51a880b5 --- /dev/null +++ b/admin/src/service/spacegate_manage_service.rs @@ -0,0 +1,9 @@ +use crate::model::vo::spacegate_inst_vo::InstConfigVo; +use crate::service::base_service::VoBaseService; +use kernel_common::inner_model::gateway::SgTls; + +pub struct SpacegateManageService; + +impl VoBaseService for SpacegateManageService {} + +impl SpacegateManageService {} diff --git a/kernel-common/Cargo.toml b/kernel-common/Cargo.toml index b8822ee3..16f86fd2 100644 --- a/kernel-common/Cargo.toml +++ b/kernel-common/Cargo.toml @@ -14,8 +14,9 @@ rust-version.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -k8s = ["kube", "k8s-openapi","schemars","k8s-openapi/schemars","k8s-gateway-api","lazy_static"] -admin-support=["tardis/web-server"] +k8s = ["kube", "k8s-openapi", "schemars", "k8s-openapi/schemars", "k8s-gateway-api", "lazy_static"] +admin-support = ["tardis/web-server"] +cache = ["tardis/cache"] [dependencies] serde.workspace = true diff --git a/kernel-common/src/client.rs b/kernel-common/src/client.rs new file mode 100644 index 00000000..9cc73ad3 --- /dev/null +++ b/kernel-common/src/client.rs @@ -0,0 +1,80 @@ +#[cfg(feature = "cache")] +mod cache_client { + use std::collections::HashMap; + use std::sync::{Arc, OnceLock}; + use tardis::basic::error::TardisError; + use tardis::basic::result::TardisResult; + use tardis::cache::cache_client::TardisCacheClient; + use tardis::config::config_dto::CacheModuleConfig; + use tardis::tokio::sync::RwLock; + + pub fn cache_clients() -> &'static RwLock>> { + static CACHE_CLIENTS: OnceLock>>> = OnceLock::new(); + CACHE_CLIENTS.get_or_init(Default::default) + } + + pub async fn init(name: impl Into, url: &str) -> TardisResult<()> { + let cache = TardisCacheClient::init(&CacheModuleConfig::builder().url(url.parse().expect("invalid url")).build()).await?; + { + let mut write = cache_clients().write().await; + write.insert(name.into(), Arc::new(cache)); + } + Ok(()) + } + + pub async fn remove(name: &str) -> TardisResult<()> { + { + let mut write = cache_clients().write().await; + write.remove(name); + } + Ok(()) + } + + pub async fn get(name: &str) -> TardisResult> { + { + let read = cache_clients().read().await; + read.get(name).cloned().ok_or_else(|| TardisError::bad_request("[SG.server] Get client failed", "")) + } + } +} +#[cfg(feature = "k8s")] +mod k8s_client { + use std::collections::HashMap; + use std::sync::{Arc, OnceLock}; + use tardis::basic::error::TardisError; + use tardis::basic::result::TardisResult; + use tardis::cache::cache_client::TardisCacheClient; + // use tardis::cache::cache_client::TardisCacheClient; + use tardis::config::config_dto::CacheModuleConfig; + use tardis::tokio::sync::RwLock; + + pub fn k8s_clients() -> &'static RwLock>> { + static K8S_CLIENTS: OnceLock>>> = OnceLock::new(); + K8S_CLIENTS.get_or_init(Default::default) + } + + pub async fn init(name: impl Into, config: kube::Config) -> TardisResult<()> { + kube::Client::try_from(config) + // let cache = TardisCacheClient::init(&CacheModuleConfig::builder().url(url.parse().expect("invalid url")).build()).await?; + // { + // let mut write = k8s_clients().write().await; + // write.insert(name.into(), Arc::new(cache)); + // } + Ok(()) + } + + pub async fn remove(name: &str) -> TardisResult<()> { + { + let mut write = k8s_clients().write().await; + write.remove(name); + } + Ok(()) + } + + pub async fn get(name: &str) -> TardisResult> { + { + let read = k8s_clients().read().await; + read.get(name).cloned().ok_or_else(|| TardisError::bad_request("[SG.server] Get client failed", "")) + } + } +} diff --git a/kernel-common/src/converter/gateway_k8s_conv.rs b/kernel-common/src/converter/gateway_k8s_conv.rs index 929a2483..8674b161 100644 --- a/kernel-common/src/converter/gateway_k8s_conv.rs +++ b/kernel-common/src/converter/gateway_k8s_conv.rs @@ -209,6 +209,7 @@ impl SgTls { .ok_or_else(|| TardisError::format_error("[SG.Config] Gateway [spec.listener.tls.certificateRefs] is required", ""))? .get(0) .ok_or_else(|| TardisError::format_error("[SG.Config] Gateway [spec.listener.tls.certificateRefs] is empty", ""))?; + //todo client 选择有问题 let secret_api: Api = Api::namespaced(get_base_k8s_client().await?, certificate_ref.namespace.as_ref().unwrap_or(&DEFAULT_NAMESPACE.to_string())); let result = if let Some(secret_obj) = secret_api.get_opt(&certificate_ref.name).await.map_err(|error| TardisError::wrap(&format!("[SG.Config] Kubernetes error: {error:?}"), ""))? diff --git a/kernel-common/src/helper/k8s_helper.rs b/kernel-common/src/helper/k8s_helper.rs index 7f2fbe6b..40189cf2 100644 --- a/kernel-common/src/helper/k8s_helper.rs +++ b/kernel-common/src/helper/k8s_helper.rs @@ -67,21 +67,32 @@ pub async fn get_base_k8s_client() -> TardisResult { } } -pub async fn set_k8s_client_by_file(path: &str) -> TardisResult<()> { +/// # Set Base kube client by file path +/// It only needs to be set once during initialization, +/// which conflicts with `set_base_k8s_client_by_config` +pub async fn set_base_k8s_client_by_file(path: &str) -> TardisResult<()> { let kube_config = Kubeconfig::read_from(path).map_err(|e| TardisError::conflict(&format!("[SG.admin] Read kubernetes config error:{e}"), ""))?; - set_k8s_client_by_config(kube_config).await?; + set_base_k8s_client_by_config(kube_config).await?; Ok(()) } -pub async fn set_k8s_client_by_config(kube_config: Kubeconfig) -> TardisResult<()> { - let config = Config::from_custom_kubeconfig(kube_config, &KubeConfigOptions::default()) - .await - .map_err(|e| TardisError::conflict(&format!("[SG.admin] Parse kubernetes config error:{e}"), ""))?; - - let client = Client::try_from(config).map_err(|e| TardisError::conflict(&format!("[SG.admin] Create kubernetes client error:{e}"), ""))?; +/// # Set Base kube client by `Kubeconfig` +/// It only needs to be set once during initialization, +/// which conflicts with `set_base_k8s_client_by_file` +pub async fn set_base_k8s_client_by_config(kube_config: Kubeconfig) -> TardisResult<()> { + let client = get_k8s_client_by_config(kube_config).await?; let mut golabl = GLOBAL_CLIENT.write().await; *golabl = Some(client); Ok(()) } + +/// # Get kube client by `Kubeconfig` +/// Instantiate `Kubeconfig` as client +pub async fn get_k8s_client_by_config(kube_config: Kubeconfig) -> TardisResult { + let config = Config::from_custom_kubeconfig(kube_config, &KubeConfigOptions::default()) + .await + .map_err(|e| TardisError::conflict(&format!("[SG.admin] Parse kubernetes config error:{e}"), ""))?; + Ok(Client::try_from(config).map_err(|e| TardisError::conflict(&format!("[SG.admin] Create kubernetes client error:{e}"), ""))?) +} diff --git a/kernel-common/src/lib.rs b/kernel-common/src/lib.rs index fd263ccc..07bd11e5 100644 --- a/kernel-common/src/lib.rs +++ b/kernel-common/src/lib.rs @@ -1,3 +1,4 @@ +pub mod client; pub mod constants; #[cfg(feature = "k8s")] pub mod converter; diff --git a/kernel/Cargo.toml b/kernel/Cargo.toml index 8d86323a..cd5b44ee 100644 --- a/kernel/Cargo.toml +++ b/kernel/Cargo.toml @@ -18,9 +18,9 @@ path = "src/lib.rs" [features] local = ["tardis/fs"] -cache = ["tardis/cache"] +cache = ["tardis/cache", "kernel-common/cache"] ws = ["tardis/ws-client"] -k8s = ["kube", "k8s-openapi", "k8s-gateway-api", "schemars", "cache","kernel-common/k8s"] +k8s = ["kube", "k8s-openapi", "k8s-gateway-api", "schemars", "cache", "kernel-common/k8s"] [dependencies] serde.workspace = true @@ -31,7 +31,7 @@ itertools.workspace = true urlencoding.workspace = true async-compression.workspace = true -kernel-common={path = "../kernel-common" } +kernel-common = { path = "../kernel-common" } tardis = { workspace = true, features = ["future", "crypto", "tls"] } http.workspace = true rustls = { workspace = true, features = ["dangerous_configuration"] } @@ -54,7 +54,7 @@ tardis = { workspace = true, features = ["test", "web-client", "web-server"] } reqwest = { workspace = true } tracing-subscriber = { workspace = true } criterion = { version = "0.5", features = ["async_tokio"] } -testcontainers-modules ={ workspace = true } +testcontainers-modules = { workspace = true } [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs"]