diff --git a/admin/Cargo.toml b/admin/Cargo.toml index 2a09b028..ab824501 100644 --- a/admin/Cargo.toml +++ b/admin/Cargo.toml @@ -16,7 +16,7 @@ readme = "./README.md" k8s = ["kube", "k8s-openapi", "k8s-gateway-api","kernel-common/k8s"] [dependencies] -tardis = { workspace = true ,features = ["web-server"]} +tardis = { workspace = true ,features = ["web-server","crypto"]} serde.workspace = true serde_json.workspace = true diff --git a/admin/src/api/backend_api.rs b/admin/src/api/backend_api.rs index 700e9aef..f4c0cb56 100644 --- a/admin/src/api/backend_api.rs +++ b/admin/src/api/backend_api.rs @@ -1,6 +1,6 @@ use crate::model::query_dto::BackendRefQueryDto; -use crate::model::vo::backend_vo::BackendRefVO; -use crate::service::backend_ref_service::BackendRefService; +use crate::model::vo::backend_vo::SgBackendRefVO; +use crate::service::backend_ref_service::BackendRefServiceVo; use tardis::web::poem::web::Query; use tardis::web::poem_openapi; use tardis::web::poem_openapi::param::Path; @@ -15,8 +15,8 @@ pub struct BackendApi; impl BackendApi { /// Get Backend List #[oai(path = "/", method = "get")] - async fn list(&self, name: Query>, namespace: Query>) -> TardisApiResult> { - let result = BackendRefService::list( + async fn list(&self, name: Query>, namespace: Query>) -> TardisApiResult> { + let result = BackendRefServiceVo::list( namespace.0.clone(), BackendRefQueryDto { name: name.0, @@ -29,22 +29,22 @@ impl BackendApi { /// Add Backend #[oai(path = "/", method = "post")] - async fn add(&self, backend: Json) -> TardisApiResult { - BackendRefService::add(backend.0).await?; + async fn add(&self, backend: Json) -> TardisApiResult { + BackendRefServiceVo::add(backend.0).await?; TardisResp::ok(Void {}) } /// update Backend #[oai(path = "/", method = "put")] - async fn update(&self, backend: Json) -> TardisApiResult { - BackendRefService::update(backend.0).await?; + async fn update(&self, backend: Json) -> TardisApiResult { + BackendRefServiceVo::update(backend.0).await?; TardisResp::ok(Void {}) } /// delete Backend #[oai(path = "/:backend_id", method = "put")] async fn delete(&self, backend_id: Path) -> TardisApiResult { - BackendRefService::delete(&backend_id.0).await?; + BackendRefServiceVo::delete(&backend_id.0).await?; TardisResp::ok(Void {}) } } diff --git a/admin/src/api/gateway_api.rs b/admin/src/api/gateway_api.rs index f13ed176..ad21d4a6 100644 --- a/admin/src/api/gateway_api.rs +++ b/admin/src/api/gateway_api.rs @@ -1,9 +1,5 @@ -use crate::model::query_dto::GatewayQueryDto; -use crate::model::vo::backend_vo::BackendRefVO; use crate::model::vo::gateway_vo::SgGatewayVO; -use crate::service::gateway_service::GatewayService; -use crate::service::plugin_service::PluginService; -use kernel_common::inner_model::gateway::SgGateway; +use crate::service::gateway_service::GatewayServiceVo; use tardis::web::poem_openapi; use tardis::web::poem_openapi::param::Query; use tardis::web::poem_openapi::payload::Json; @@ -17,22 +13,22 @@ pub struct GatewayApi; impl GatewayApi { /// Add Gateway #[oai(path = "/", method = "post")] - async fn add(&self, backend: Json) -> TardisApiResult { - GatewayService::add(backend.0).await?; + async fn add(&self, add: Json) -> TardisApiResult { + GatewayServiceVo::add(add.0).await?; TardisResp::ok(Void {}) } /// Update Gateway #[oai(path = "/", method = "put")] async fn update(&self, backend: Json) -> TardisApiResult { - GatewayService::update(backend.0).await?; + GatewayServiceVo::update(backend.0).await?; TardisResp::ok(Void {}) } /// Delete Gateway #[oai(path = "/", method = "delete")] - async fn delete(&self, namespace: Query>, name: Query) -> TardisApiResult { - GatewayService::delete(namespace.0, &name.0).await?; + async fn delete(&self, name: Query) -> TardisApiResult { + GatewayServiceVo::delete(&name.0).await?; TardisResp::ok(Void {}) } } diff --git a/admin/src/api/plugin_api.rs b/admin/src/api/plugin_api.rs index fdbe7a26..eeb0299d 100644 --- a/admin/src/api/plugin_api.rs +++ b/admin/src/api/plugin_api.rs @@ -1,7 +1,8 @@ use crate::model::query_dto::PluginQueryDto; -use crate::model::vo::backend_vo::BackendRefVO; -use crate::service::backend_ref_service::BackendRefService; -use crate::service::plugin_service::PluginService; +use crate::model::vo::backend_vo::SgBackendRefVO; +use crate::model::vo::plugin_vo::SgFilterVO; +use crate::service::backend_ref_service::BackendRefServiceVo; +use crate::service::plugin_service::PluginServiceVo; use tardis::web::poem_openapi; use tardis::web::poem_openapi::param::{Path, Query}; use tardis::web::poem_openapi::payload::Json; @@ -15,7 +16,7 @@ impl PluginApi { /// Get Plugin List #[oai(path = "/", method = "get")] async fn list(&self, ids: Query>, name: Query>, namespace: Query>, code: Query>) -> TardisApiResult { - let _ = PluginService::list(PluginQueryDto { + let _ = PluginServiceVo::list(PluginQueryDto { ids: ids.0.map(|s| s.split(',').map(|s| s.to_string()).collect::>()), name: name.0, namespace: namespace.0, @@ -28,22 +29,22 @@ impl PluginApi { /// Add Plugin #[oai(path = "/", method = "post")] - async fn add(&self, backend: Json) -> TardisApiResult { - PluginService::add(backend.0).await?; + async fn add(&self, add: Json) -> TardisApiResult { + PluginServiceVo::add(add.0).await?; TardisResp::ok(Void {}) } /// Update Plugin #[oai(path = "/", method = "put")] - async fn update(&self, backend: Json) -> TardisApiResult { - PluginService::update(backend.0).await?; + async fn update(&self, update: Json) -> TardisApiResult { + PluginServiceVo::update(update.0).await?; TardisResp::ok(Void {}) } /// Delete Plugin #[oai(path = "/:backend_id", method = "put")] async fn delete(&self, backend_id: Path) -> TardisApiResult { - PluginService::delete(&backend_id.0).await?; + PluginServiceVo::delete(&backend_id.0).await?; TardisResp::ok(Void {}) } } diff --git a/admin/src/api/tls_config_api.rs b/admin/src/api/tls_config_api.rs index f352fa8f..4a575705 100644 --- a/admin/src/api/tls_config_api.rs +++ b/admin/src/api/tls_config_api.rs @@ -1,8 +1,8 @@ -use crate::model::query_dto::PluginQueryDto; -use crate::model::vo::backend_vo::BackendRefVO; -use crate::service::plugin_service::PluginService; +use crate::model::vo::gateway_vo::SgTlsConfigVO; +use crate::service::tls_config_service::TlsConfigServiceVo; use tardis::web::poem_openapi; -use tardis::web::poem_openapi::param::Path; +use tardis::web::poem_openapi::param::{Path, Query}; +use tardis::web::poem_openapi::payload::Json; use tardis::web::web_resp::{TardisApiResult, TardisResp, Void}; #[derive(Clone, Default)] @@ -14,35 +14,35 @@ impl TlsConfigApi { /// Get TlsConfig List #[oai(path = "/", method = "get")] async fn list(&self, ids: Query>, name: Query>, namespace: Query>, code: Query>) -> TardisApiResult { - let _ = PluginService::list(PluginQueryDto { - ids: ids.0.map(|s| s.split(',').map(|s| s.to_string()).collect::>()), - name: name.0, - namespace: namespace.0, - code: code.0, - target: None, - }) - .await; + // let _ = PluginService::list(PluginQueryDto { + // ids: ids.0.map(|s| s.split(',').map(|s| s.to_string()).collect::>()), + // name: name.0, + // namespace: namespace.0, + // code: code.0, + // target: None, + // }) + // .await; TardisResp::ok(Void {}) } /// Add TlsConfig #[oai(path = "/", method = "post")] - async fn add(&self, backend: Json) -> TardisApiResult { - PluginService::add(backend.0).await?; + async fn add(&self, tls_config: Json) -> TardisApiResult { + TlsConfigServiceVo::add(tls_config.0).await?; TardisResp::ok(Void {}) } /// Update TlsConfig #[oai(path = "/", method = "put")] - async fn update(&self, backend: Json) -> TardisApiResult { - PluginService::update(backend.0).await?; + async fn update(&self, tls_config: Json) -> TardisApiResult { + TlsConfigServiceVo::update(tls_config.0).await?; TardisResp::ok(Void {}) } /// Delete TlsConfig #[oai(path = "/:backend_id", method = "put")] - async fn delete(&self, backend_id: Path) -> TardisApiResult { - PluginService::delete(&backend_id.0).await?; + async fn delete(&self, tls_config_id: Path) -> TardisApiResult { + TlsConfigServiceVo::delete(&tls_config_id.0).await?; TardisResp::ok(Void {}) } } diff --git a/admin/src/main.rs b/admin/src/main.rs index 9bb9a2ed..36688412 100644 --- a/admin/src/main.rs +++ b/admin/src/main.rs @@ -10,6 +10,7 @@ mod service; #[tokio::main] async fn main() -> TardisResult<()> { + // todo 根据现有的k8s资源初始化成VO TardisFuns::init(Some("config")).await?; let web_server = TardisFuns::web_server(); initializer::init(&web_server).await?; diff --git a/admin/src/model/query_dto.rs b/admin/src/model/query_dto.rs index 7a525e02..3eb2d99e 100644 --- a/admin/src/model/query_dto.rs +++ b/admin/src/model/query_dto.rs @@ -29,6 +29,7 @@ impl ToFields for GatewayQueryDto { } } +#[derive(Default)] pub struct PluginQueryDto { pub ids: Option>, pub name: Option, diff --git a/admin/src/model/vo/backend_vo.rs b/admin/src/model/vo/backend_vo.rs index cabc6820..207658b5 100644 --- a/admin/src/model/vo/backend_vo.rs +++ b/admin/src/model/vo/backend_vo.rs @@ -6,7 +6,7 @@ use tardis::web::poem_openapi; /// BackendRef defines how a HTTPRoute should forward an HTTP request. #[derive(Default, Debug, Serialize, Deserialize, Clone, poem_openapi::Object)] -pub struct BackendRefVO { +pub struct SgBackendRefVO { /// unique by id pub id: String, /// Name is the kubernetes service name OR url host. @@ -28,7 +28,7 @@ pub struct BackendRefVO { pub filters: Option>, } -impl Vo for BackendRefVO { +impl Vo for SgBackendRefVO { fn get_vo_type() -> String { constants::BACKEND_REF_TYPE.to_string() } diff --git a/admin/src/model/vo/http_route_vo.rs b/admin/src/model/vo/http_route_vo.rs index 7ba0acfb..ceb1f726 100644 --- a/admin/src/model/vo/http_route_vo.rs +++ b/admin/src/model/vo/http_route_vo.rs @@ -6,7 +6,7 @@ use tardis::web::poem_openapi; /// /// Reference: [Kubernetes Gateway](https://gateway-api.sigs.k8s.io/references/spec/#gateway.networking.k8s.io%2fv1beta1.HTTPRoute) #[derive(Default, Debug, Serialize, Deserialize, Clone, poem_openapi::Object)] -pub struct SgHttpRoute { +pub struct SgHttpRouteVo { /// Name of the HttpRoute. Global Unique. /// /// In k8s mode, this name MUST be unique within a namespace. @@ -29,7 +29,7 @@ pub struct SgHttpRouteRuleVO { pub matches: Option>, /// [crate::model::vo::plugin_vo::SgFilterVO]'s id pub filters: Option>, - /// [crate::model::vo::backend_vo::BackendRefVO]'s id + /// [crate::model::vo::backend_vo::SgBackendRefVO]'s id pub backends: Option>, /// Timeout define the timeout for requests that match this rule. pub timeout_ms: Option, diff --git a/admin/src/model/vo_converter.rs b/admin/src/model/vo_converter.rs index 8b137891..e9572620 100644 --- a/admin/src/model/vo_converter.rs +++ b/admin/src/model/vo_converter.rs @@ -1 +1,14 @@ +use tardis::async_trait::async_trait; +use tardis::basic::result::TardisResult; +pub mod gateway_vo_conv; +pub mod plugin_vo_conv; + +#[async_trait] +pub trait VoConv +where + S: VoConv, +{ + async fn to_model(self) -> TardisResult; + async fn from_model(model: M) -> TardisResult; +} diff --git a/admin/src/model/vo_converter/gateway_vo_conv.rs b/admin/src/model/vo_converter/gateway_vo_conv.rs new file mode 100644 index 00000000..c09ca1db --- /dev/null +++ b/admin/src/model/vo_converter/gateway_vo_conv.rs @@ -0,0 +1,92 @@ +use crate::model::query_dto::PluginQueryDto; +use crate::model::vo::gateway_vo::{SgGatewayVO, SgListenerVO, SgTlsConfigVO}; +use crate::model::vo_converter::VoConv; +use crate::service::base_service::VoBaseService; +use crate::service::plugin_service::PluginServiceVo; +use crate::service::tls_config_service::TlsConfigServiceVo; +use kernel_common::inner_model::gateway::{SgGateway, SgListener, SgTlsConfig}; +use kernel_common::inner_model::plugin_filter::SgRouteFilter; +use tardis::async_trait::async_trait; +use tardis::basic::result::TardisResult; +use tardis::futures_util::future::join_all; +use tardis::TardisFuns; + +#[async_trait] +impl VoConv for SgGatewayVO { + async fn to_model(self) -> TardisResult { + let filters = if let Some(filter_strs) = self.filters { + Some( + join_all( + PluginServiceVo::list(PluginQueryDto { + ids: Some(filter_strs), + ..Default::default() + }) + .await? + .into_iter() + .map(|f| f.to_model()) + .collect::>(), + ) + .await + .into_iter() + .collect::>>()?, + ) + } else { + None + }; + Ok(SgGateway { + name: self.name, + parameters: self.parameters, + listeners: join_all(self.listeners.into_iter().map(|l| l.to_model()).collect::>()).await.into_iter().collect::>>()?, + filters, + }) + } + + async fn from_model(model: SgGateway) -> TardisResult { + todo!() + } +} + +#[async_trait] +impl VoConv for SgListenerVO { + async fn to_model(self) -> TardisResult { + let tls = if let Some(tls_id) = self.tls { + Some(TlsConfigServiceVo::get_by_id(&tls_id).await?.to_model().await?) + } else { + None + }; + + Ok(SgListener { + name: self.name, + ip: self.ip, + port: self.port, + protocol: self.protocol, + tls, + hostname: self.hostname, + }) + } + + async fn from_model(model: SgListener) -> TardisResult { + todo!() + } +} + +#[async_trait] +impl VoConv for SgTlsConfigVO { + async fn to_model(self) -> TardisResult { + Ok(SgTlsConfig { + mode: self.mode, + key: self.key, + cert: self.cert, + }) + } + + async fn from_model(model: SgTlsConfig) -> TardisResult { + Ok(SgTlsConfigVO { + id: TardisFuns::crypto.digest.md5(&format!("{}{}{}", model.mode.clone().to_kube_tls_mode_type().to_string(), model.key, model.cert))?, + mode: model.mode, + key: model.key, + cert: model.cert, + ref_ids: None, + }) + } +} diff --git a/admin/src/model/vo_converter/plugin_vo_conv.rs b/admin/src/model/vo_converter/plugin_vo_conv.rs new file mode 100644 index 00000000..895bb0ee --- /dev/null +++ b/admin/src/model/vo_converter/plugin_vo_conv.rs @@ -0,0 +1,20 @@ +use crate::model::vo::plugin_vo::SgFilterVO; +use crate::model::vo_converter::VoConv; +use kernel_common::inner_model::plugin_filter::SgRouteFilter; +use tardis::async_trait::async_trait; +use tardis::basic::result::TardisResult; + +#[async_trait] +impl VoConv for SgFilterVO { + async fn to_model(self) -> TardisResult { + Ok(SgRouteFilter { + code: self.code, + name: self.name, + spec: self.spec, + }) + } + + async fn from_model(model: SgRouteFilter) -> TardisResult { + todo!() + } +} diff --git a/admin/src/service/backend_ref_service.rs b/admin/src/service/backend_ref_service.rs index a5965922..aee6aad8 100644 --- a/admin/src/service/backend_ref_service.rs +++ b/admin/src/service/backend_ref_service.rs @@ -1,31 +1,31 @@ use crate::model::query_dto::BackendRefQueryDto; -use crate::model::vo::backend_vo::BackendRefVO; -use crate::service::base_service::BaseService; +use crate::model::vo::backend_vo::SgBackendRefVO; +use crate::service::base_service::VoBaseService; use std::process::id; use tardis::basic::error::TardisError; use tardis::basic::result::TardisResult; use tardis::web::poem::delete; -pub struct BackendRefService; +pub struct BackendRefServiceVo; -impl BaseService<'_, BackendRefVO> for BackendRefService {} +impl VoBaseService for BackendRefServiceVo {} -impl BackendRefService { - pub(crate) async fn list(id: Option, query: BackendRefQueryDto) -> TardisResult> { +impl BackendRefServiceVo { + pub(crate) async fn list(id: Option, query: BackendRefQueryDto) -> TardisResult> { //todo query - Self::get_type_map() + Self::get_str_type_map() .await? .values() .into_iter() .map(|v| serde_json::from_str(v).map_err(|e| TardisError::bad_request(&format!(""), ""))) - .collect::>>() + .collect::>>() } - pub(crate) async fn add(add: BackendRefVO) -> TardisResult<()> { + pub(crate) async fn add(add: SgBackendRefVO) -> TardisResult<()> { Self::add_vo(add).await?; Ok(()) } - pub(crate) async fn update(update: BackendRefVO) -> TardisResult<()> { + pub(crate) async fn update(update: SgBackendRefVO) -> TardisResult<()> { Self::update_vo(update).await?; Ok(()) } diff --git a/admin/src/service/base_service.rs b/admin/src/service/base_service.rs index 449e1076..35e10260 100644 --- a/admin/src/service/base_service.rs +++ b/admin/src/service/base_service.rs @@ -5,19 +5,20 @@ use k8s_openapi::api::core::v1::ConfigMap; use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta; use kube::api::PostParams; use kube::Api; +use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use tardis::async_trait::async_trait; use tardis::basic::error::TardisError; use tardis::basic::result::TardisResult; -use tardis::log; +use tardis::{log, TardisFuns}; #[async_trait] -pub trait BaseService<'a, T> +pub trait VoBaseService where - T: Vo + Serialize + Deserialize<'a> + Sync + Send, + T: Vo + Serialize + Sync + Send + DeserializeOwned, { - async fn get_type_map() -> TardisResult> { + async fn get_str_type_map() -> TardisResult> { if let Some(t_config) = get_config_map_api().await?.get_opt(&get_config_name::()).await.map_err(|e| TardisError::io_error(&format!("[SG.admin] Kubernetes client error: {e}"), ""))? { @@ -32,27 +33,48 @@ where } } - async fn get_by_id(id: &str) -> TardisResult { - if let Some(t_str) = Self::get_type_map().await?.remove(id) { - Ok(t_str) + async fn get_type_map() -> TardisResult> { + Ok(Self::get_str_type_map().await?.into_iter().map(|(k, v)| Ok((k, TardisFuns::json.str_to_obj::(&v)?))).collect::>>()?) + } + + async fn get_by_id_opt(id: &str) -> TardisResult> { + if let Some(t_str) = Self::get_str_type_map().await?.remove(id) { + Ok(TardisFuns::json.str_to_obj(&t_str)?) + } else { + Ok(None) + } + } + + async fn get_by_id(id: &str) -> TardisResult { + if let Some(t) = Self::get_by_id_opt(id).await? { + Ok(t) } else { - Err(TardisError::not_found("", "")) + Err(TardisError::not_found(&format!("[SG.admin] Get Error: {}:{} not exists", T::get_vo_type(), id), "")) } } #[cfg(feature = "k8s")] - async fn add_vo(config: T) -> TardisResult { + async fn add_vo(config: T) -> TardisResult + where + T: 'async_trait, + { Self::add_or_update_vo(config, true).await } #[cfg(feature = "k8s")] - async fn update_vo(config: T) -> TardisResult { + async fn update_vo(config: T) -> TardisResult + where + T: 'async_trait, + { Self::add_or_update_vo(config, false).await } - async fn add_or_update_vo(config: T, add_only: bool) -> TardisResult { + async fn add_or_update_vo(config: T, add_only: bool) -> TardisResult + where + T: 'async_trait, + { let id = config.get_unique_name(); - let mut datas = Self::get_type_map().await?; + let mut datas = Self::get_str_type_map().await?; if let Some(_) = datas.get(&id) { if add_only { return Err(TardisError::bad_request(&format!("[SG.admin] {}:{} already exists", T::get_vo_type(), id), "")); @@ -87,7 +109,7 @@ where } async fn delete_vo(config_id: &str) -> TardisResult<()> { - let mut datas = Self::get_type_map().await?; + let mut datas = Self::get_str_type_map().await?; if let Some(_) = datas.remove(config_id) { get_config_map_api() .await? @@ -267,17 +289,17 @@ pub async fn get_config_map_api() -> TardisResult> { #[cfg(test)] mod test { - use crate::model::vo::backend_vo::BackendRefVO; + use crate::model::vo::backend_vo::SgBackendRefVO; use crate::model::vo::Vo; - use crate::service::backend_ref_service::BackendRefService; - use crate::service::base_service::BaseService; + use crate::service::backend_ref_service::BackendRefServiceVo; + use crate::service::base_service::VoBaseService; use tardis::tokio; #[tokio::test] #[cfg(feature = "k8s")] #[ignore] async fn k8s_test() { - let mut add_o_1 = BackendRefVO { + let mut add_o_1 = SgBackendRefVO { id: "id34325".to_string(), name_or_host: "backend_name".to_string(), namespace: None, @@ -287,18 +309,18 @@ mod test { weight: None, filters: None, }; - BackendRefService::add_vo(add_o_1.clone()).await.unwrap(); - assert!(BackendRefService::add_vo(add_o_1.clone()).await.is_err()); + BackendRefServiceVo::add_vo(add_o_1.clone()).await.unwrap(); + assert!(BackendRefServiceVo::add_vo(add_o_1.clone()).await.is_err()); - let get_o_1 = serde_json::from_str::(&BackendRefService::get_type_map().await.unwrap().get(&add_o_1.get_unique_name()).unwrap()).unwrap(); + let get_o_1 = serde_json::from_str::(&BackendRefServiceVo::get_str_type_map().await.unwrap().get(&add_o_1.get_unique_name()).unwrap()).unwrap(); assert_eq!(get_o_1.port, add_o_1.port); add_o_1.port = 1832; - BaseService::update_vo(add_o_1.clone()).await.unwrap(); + BackendRefServiceVo::update_vo(add_o_1.clone()).await.unwrap(); - let get_o_1 = serde_json::from_str::(&BackendRefService::get_type_map().await.unwrap().get(&add_o_1.get_unique_name()).unwrap()).unwrap(); + let get_o_1 = serde_json::from_str::(&BackendRefServiceVo::get_str_type_map().await.unwrap().get(&add_o_1.get_unique_name()).unwrap()).unwrap(); assert_eq!(get_o_1.port, add_o_1.port); - BackendRefService::delete_vo(&add_o_1.get_unique_name()).await.unwrap(); + BackendRefServiceVo::delete_vo(&add_o_1.get_unique_name()).await.unwrap(); } } diff --git a/admin/src/service/gateway_service.rs b/admin/src/service/gateway_service.rs index 52ca229f..11eb4517 100644 --- a/admin/src/service/gateway_service.rs +++ b/admin/src/service/gateway_service.rs @@ -3,10 +3,12 @@ use crate::helper::get_k8s_client; use crate::model::query_dto::GatewayQueryDto; #[cfg(feature = "k8s")] use crate::model::ToFields; +use std::clone; +use std::process::id; use crate::model::vo::gateway_vo::SgGatewayVO; use crate::model::vo::Vo; -use crate::service::plugin_service::PluginService; +use crate::service::plugin_service::PluginServiceVo; #[cfg(feature = "k8s")] use k8s_gateway_api::Gateway; #[cfg(feature = "k8s")] @@ -22,59 +24,71 @@ use kube::api::{DeleteParams, PostParams}; #[cfg(feature = "k8s")] use kube::{api::ListParams, Api, ResourceExt}; +use crate::model::vo_converter::VoConv; +use kernel_common::helper::k8s_helper::{parse_k8s_obj_unique, WarpKubeResult}; use tardis::basic::error::TardisError; use tardis::basic::result::TardisResult; -use tardis::futures_util::future::join_all; -use super::base_service::BaseService; +use super::base_service::VoBaseService; -pub struct GatewayService; +pub struct GatewayServiceVo; -impl BaseService<'_, SgGatewayVO> for GatewayService {} +impl VoBaseService for GatewayServiceVo {} + +impl GatewayServiceVo { + pub async fn add(add: SgGatewayVO) -> TardisResult { + #[cfg(feature = "k8s")] + { + let (namespace, name) = parse_k8s_obj_unique(&add.get_unique_name()); + let (gateway, secrets, sgfilters) = add.clone().to_model().await?.to_kube_gateway(&namespace); + + let (gateway_api, secret_api): (Api, Api) = + (Api::namespaced(get_k8s_client().await?, &namespace), Api::namespaced(get_k8s_client().await?, &namespace)); + + let result_gateway = gateway_api.create(&PostParams::default(), &gateway).await.map_err(|e| TardisError::io_error(&format!("[SG.admin] error:{e}"), ""))?; + + for mut s in secrets { + s.metadata.owner_references = Some(vec![OwnerReference { + api_version: "gateway.networking.k8s.io/v1beta1".to_string(), + kind: "Gateway".to_string(), + name: result_gateway.name_any(), + uid: result_gateway.uid().expect("Can not get create gateway uid"), + ..Default::default() + }]); + secret_api.create(&PostParams::default(), &s).await.map_err(|e| TardisError::io_error(&format!("[SG.admin] error:{e}"), ""))?; + } + + PluginServiceVo::add_sgfilter_vec(sgfilters).await?; + } + Self::add_vo(add).await + } -impl GatewayService { pub async fn update_by_id(id: &str) -> TardisResult<()> { - if let Some(gateway_str) = Self::get_type_map().await?.remove(id) { - let gateway_o = serde_json::from_str(&gateway_str) - .map_err(|e| TardisError::bad_request(&format!("[SG.admin] Deserialization {}:{} failed:{e}", SgGatewayVO::get_vo_type(), id), ""))?; - GatewayService::update(gateway_o).await?; - } else { - return Err(TardisError::not_found("", "")); - }; - Ok(()) + let gateway_o = Self::get_by_id(&id).await?; + GatewayServiceVo::update(gateway_o).await } pub async fn update(update: SgGatewayVO) -> TardisResult<()> { - if let Some(old_gateway_str) = Self::get_type_map().await?.remove(&update.get_unique_name()) { - } else { - return Err(TardisError::not_found("", "")); - }; - + let gateway_o = Self::get_by_id(&update.get_unique_name()).await?; + //todo Ok(()) } - pub async fn add(add: SgGatewayVO) -> TardisResult<()> { - Self::add_vo(add).await?; + #[cfg(feature = "k8s")] + pub async fn delete(id: &str) -> TardisResult<()> { + let (namespace, name) = parse_k8s_obj_unique(id); + let gateway_api: Api = Self::get_gateway_api(&Some(namespace)).await?; - Ok(()) - } + gateway_api.delete(&name, &DeleteParams::default()).await.warp_result_by_method("Delete Gateway")?; - pub async fn delete(id: &str) -> TardisResult<()> { - // Self::delete_vo(id).await?; Ok(()) } - pub async fn delete(namespace: Option, name: &str) -> TardisResult<()> { - #[cfg(feature = "k8s")] - { - let gateway_api: Api = Self::get_gateway_api(&namespace).await?; - gateway_api.delete(name, &DeleteParams::default()).await.map_err(|e| TardisError::io_error(&format!("[SG.admin] delete error:{e}"), ""))?; - } - #[cfg(not(feature = "k8s"))] - {} - Ok(()) + #[cfg(feature = "k8s")] + async fn get_gateway_api(namespace: &Option) -> TardisResult> { + Ok(Api::namespaced(get_k8s_client().await?, namespace.as_ref().unwrap_or(&DEFAULT_NAMESPACE.to_string()))) } } //old version @@ -115,27 +129,7 @@ impl GatewayService { // { // let namespace = namespace.unwrap_or(DEFAULT_NAMESPACE.to_string()); -// let (gateway_api, secret_api): (Api, Api) = -// (Api::namespaced(get_k8s_client().await?, &namespace), Api::namespaced(get_k8s_client().await?, &namespace)); - -// let (gateway, secrets, sgfilters) = add.to_kube_gateway(&namespace); - -// let result_gateway = gateway_api.create(&PostParams::default(), &gateway).await.map_err(|e| TardisError::io_error(&format!("[SG.admin] error:{e}"), ""))?; - -// for mut s in secrets { -// s.metadata.owner_references = Some(vec![OwnerReference { -// api_version: "gateway.networking.k8s.io/v1beta1".to_string(), -// kind: "Gateway".to_string(), -// name: result_gateway.name_any(), -// uid: result_gateway.uid().expect("Can not get create gateway uid"), -// ..Default::default() -// }]); -// secret_api.create(&PostParams::default(), &s).await.map_err(|e| TardisError::io_error(&format!("[SG.admin] error:{e}"), ""))?; -// } - -// PluginService::add_sgfilter_vec(sgfilters).await?; - -// result = Self::kube_to(vec![result_gateway]).await?.remove(0); +// // } // #[cfg(not(feature = "k8s"))] // { @@ -238,8 +232,3 @@ impl GatewayService { // Ok(result) // } // } - -#[cfg(feature = "k8s")] -async fn get_gateway_api(namespace: &Option) -> TardisResult> { - Ok(Api::namespaced(get_k8s_client().await?, namespace.as_ref().unwrap_or(&DEFAULT_NAMESPACE.to_string()))) -} diff --git a/admin/src/service/plugin_service.rs b/admin/src/service/plugin_service.rs index 29282ab9..c39f213e 100644 --- a/admin/src/service/plugin_service.rs +++ b/admin/src/service/plugin_service.rs @@ -2,29 +2,38 @@ use crate::helper::get_k8s_client; use crate::model::query_dto::PluginQueryDto; use crate::model::vo::plugin_vo::SgFilterVO; -use crate::service::base_service::BaseService; +use crate::service::base_service::VoBaseService; #[cfg(feature = "k8s")] use kernel_common::constants::DEFAULT_NAMESPACE; +use kernel_common::converter::plugin_k8s_conv::SgSingeFilter; +use kernel_common::helper::k8s_helper::WarpKubeResult; #[cfg(feature = "k8s")] use kernel_common::k8s_crd::sg_filter::SgFilter; +use kube::api::{ListParams, PostParams}; #[cfg(feature = "k8s")] use kube::Api; -use tardis::basic::error::TardisError; +use kube::ResourceExt; +use std::collections::HashMap; use tardis::basic::result::TardisResult; -pub struct PluginService; +pub struct PluginServiceVo; -impl BaseService<'_, SgFilterVO> for PluginService {} +impl VoBaseService for PluginServiceVo {} -impl PluginService { +impl PluginServiceVo { pub(crate) async fn list(query: PluginQueryDto) -> TardisResult> { //todo query - Self::get_type_map() + Ok(Self::get_type_map() .await? - .values() + .into_values() .into_iter() - .map(|v| serde_json::from_str(v).map_err(|e| TardisError::bad_request(&format!(""), ""))) - .collect::>>() + .filter(|f| if let Some(ids) = &query.ids { ids.contains(&f.id) } else { true } + && query.name.eq(&f.name) + && if let Some(code) = &query.code { + code.eq(&f.code) + }else { true } + ) + .collect::>()) } pub(crate) async fn add(add: SgFilterVO) -> TardisResult<()> { @@ -41,34 +50,34 @@ impl PluginService { Ok(()) } - // #[cfg(feature = "k8s")] - // pub async fn add_sgfilter_vec(sgfilters: Vec) -> TardisResult<()> { - // let mut filter_map = HashMap::new(); - // for sf in sgfilters { - // let filter_api: Api = Self::get_filter_api(&Some(sf.namespace.clone())).await?; + #[cfg(feature = "k8s")] + pub async fn add_sgfilter_vec(sgfilters: Vec) -> TardisResult<()> { + let mut filter_map = HashMap::new(); + for sf in sgfilters { + let filter_api: Api = Self::get_filter_api(&Some(sf.namespace.clone())).await?; - // let namespace_filter = if let Some(filter_list) = filter_map.get(&sf.namespace) { - // filter_list - // } else { - // let filter_list = filter_api.list(&ListParams::default()).await.warp_result_by_method("list")?; - // filter_map.insert(sf.namespace.clone(), filter_list); - // filter_map.get(&sf.namespace).expect("") - // }; + let namespace_filter = if let Some(filter_list) = filter_map.get(&sf.namespace) { + filter_list + } else { + let filter_list = filter_api.list(&ListParams::default()).await.warp_result_by_method("list")?; + filter_map.insert(sf.namespace.clone(), filter_list); + filter_map.get(&sf.namespace).expect("") + }; - // if let Some(mut query_sf) = namespace_filter.items.clone().into_iter().find(|f| f.spec.filters.iter().any(|qsf| qsf.code == sf.filter.code)) { - // if query_sf.spec.target_refs.iter().any(|t_r| t_r == &sf.target_ref) { - // //存在 - // } else { - // query_sf.spec.target_refs.push(sf.target_ref); - // filter_api.replace(&query_sf.name_any(), &PostParams::default(), &query_sf).await.warp_result_by_method("replace")?; - // } - // } else { - // filter_api.create(&PostParams::default(), &sf.to_sg_filter()).await.warp_result_by_method("create")?; - // } - // } + if let Some(mut query_sf) = namespace_filter.items.clone().into_iter().find(|f| f.spec.filters.iter().any(|qsf| qsf.code == sf.filter.code)) { + if query_sf.spec.target_refs.iter().any(|t_r| t_r == &sf.target_ref) { + //存在 + } else { + query_sf.spec.target_refs.push(sf.target_ref); + filter_api.replace(&query_sf.name_any(), &PostParams::default(), &query_sf).await.warp_result_by_method("replace")?; + } + } else { + filter_api.create(&PostParams::default(), &sf.to_sg_filter()).await.warp_result_by_method("create")?; + } + } - // Ok(()) - // } + Ok(()) + } #[cfg(feature = "k8s")] #[inline] diff --git a/admin/src/service/tls_config_service.rs b/admin/src/service/tls_config_service.rs index b9ab791e..dc779999 100644 --- a/admin/src/service/tls_config_service.rs +++ b/admin/src/service/tls_config_service.rs @@ -1,20 +1,20 @@ use crate::model::query_dto::SgTlsConfigQueryVO; use crate::model::vo::gateway_vo::SgTlsConfigVO; use crate::model::vo::Vo; -use crate::service::base_service::BaseService; +use crate::service::base_service::VoBaseService; use tardis::basic::error::TardisError; use tardis::basic::result::TardisResult; -use super::gateway_service::GatewayService; +use super::gateway_service::GatewayServiceVo; -pub struct TlsConfigService; +pub struct TlsConfigServiceVo; -impl BaseService<'_, SgTlsConfigVO> for TlsConfigService {} +impl VoBaseService for TlsConfigServiceVo {} -impl TlsConfigService { +impl TlsConfigServiceVo { pub(crate) async fn list(query: SgTlsConfigQueryVO) -> TardisResult> { //todo query - Self::get_type_map() + Self::get_str_type_map() .await? .values() .into_iter() @@ -29,12 +29,12 @@ impl TlsConfigService { pub(crate) async fn update(update: SgTlsConfigVO) -> TardisResult<()> { let unique_name = update.get_unique_name(); - if let Some(o_str) = Self::get_type_map().await?.remove(&unique_name) { + if let Some(o_str) = Self::get_str_type_map().await?.remove(&unique_name) { let mut o: SgTlsConfigVO = serde_json::from_str(&o_str) .map_err(|e| TardisError::bad_request(&format!("[SG.admin] Deserialization {}:{} failed:{e}", SgTlsConfigVO::get_vo_type(), unique_name), ""))?; if let Some(ids) = o.ref_ids { for ref_id in ids { - GatewayService::update_by_id(&ref_id).await?; + GatewayServiceVo::update_by_id(&ref_id).await?; } } Self::update_vo(update).await?; @@ -46,7 +46,7 @@ impl TlsConfigService { pub(crate) async fn add_ref_ids(id: &str, ref_ids: &[String]) -> TardisResult<()> { let mut ref_ids = ref_ids.to_vec(); - if let Some(o_str) = Self::get_type_map().await?.remove(id) { + if let Some(o_str) = Self::get_str_type_map().await?.remove(id) { let mut o: SgTlsConfigVO = serde_json::from_str(&o_str).map_err(|e| TardisError::bad_request(&format!("[SG.admin] Deserialization {}:{id} failed:{e}", SgTlsConfigVO::get_vo_type()), ""))?; if let Some(ids) = &mut o.ref_ids { diff --git a/kernel-common/src/converter/gateway_k8s_conv.rs b/kernel-common/src/converter/gateway_k8s_conv.rs index 783924d4..bfa2fa81 100644 --- a/kernel-common/src/converter/gateway_k8s_conv.rs +++ b/kernel-common/src/converter/gateway_k8s_conv.rs @@ -1,12 +1,18 @@ -use crate::constants::GATEWAY_CLASS_NAME; +use crate::constants::{DEFAULT_NAMESPACE, GATEWAY_CLASS_NAME}; use crate::converter::plugin_k8s_conv::SgSingeFilter; -use crate::inner_model::gateway::{SgGateway, SgParameters, SgTlsConfig, SgTlsMode}; +use crate::helper::k8s_helper::get_k8s_client; +use crate::inner_model::gateway::{SgGateway, SgListener, SgParameters, SgProtocol, SgTlsConfig, SgTlsMode}; use crate::k8s_crd::sg_filter::{K8sSgFilterSpecFilter, K8sSgFilterSpecTargetRef}; use k8s_gateway_api::{Gateway, GatewaySpec, GatewayTlsConfig, Listener, SecretObjectReference, TlsModeType}; use k8s_openapi::api::core::v1::Secret; use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta; use k8s_openapi::ByteString; +use kube::{Api, ResourceExt}; use std::collections::BTreeMap; +use std::str::FromStr; +use tardis::basic::error::TardisError; +use tardis::basic::result::TardisResult; +use tardis::futures_util::future::join_all; use tardis::TardisFuns; impl SgGateway { @@ -32,8 +38,8 @@ impl SgGateway { hostname: l.hostname, port: l.port, protocol: l.protocol.to_string(), - tls: l.tls.map(|l| { - let (tls_config, secret) = l.to_kube_tls(namespace); + tls: l.tls.map(|tls| { + let (tls_config, secret) = tls.to_kube_tls(namespace); secrets.push(secret); tls_config }), @@ -70,28 +76,56 @@ impl SgGateway { (gateway, secrets, sgfilters) } -} -impl SgParameters { - pub fn from_kube_gateway(gateway: &Gateway) -> Self { - let gateway_annotations = gateway.metadata.annotations.clone(); - if let Some(gateway_annotations) = gateway_annotations { - SgParameters { - redis_url: gateway_annotations.get(crate::constants::GATEWAY_ANNOTATION_REDIS_URL).map(|v| v.to_string()), - log_level: gateway_annotations.get(crate::constants::GATEWAY_ANNOTATION_LOG_LEVEL).map(|v| v.to_string()), - lang: gateway_annotations.get(crate::constants::GATEWAY_ANNOTATION_LANGUAGE).map(|v| v.to_string()), - ignore_tls_verification: gateway_annotations.get(crate::constants::GATEWAY_ANNOTATION_IGNORE_TLS_VERIFICATION).and_then(|v| v.parse::().ok()), - } - } else { - SgParameters { - redis_url: None, - log_level: None, - lang: None, - ignore_tls_verification: None, - } - } + pub async fn from_kube_gateway(gateway: Gateway) -> TardisResult { + //todo filters + let filters = None; + let result = SgGateway { + name: gateway.name_any(), + parameters: SgParameters::from_kube_gateway(&gateway), + listeners: join_all( + gateway + .spec + .listeners + .into_iter() + .map(|listener| async move { + let tls = match listener.tls { + Some(tls) => SgTlsConfig::from_kube_tls(tls).await?, + None => None, + }; + let sg_listener = SgListener { + name: listener.name, + ip: None, + port: listener.port, + protocol: match listener.protocol.to_lowercase().as_str() { + "http" => SgProtocol::Http, + "https" => SgProtocol::Https, + "ws" => SgProtocol::Ws, + _ => { + return Err(TardisError::not_implemented( + &format!("[SG.Config] Gateway [spec.listener.protocol={}] not supported yet", listener.protocol), + "", + )) + } + }, + tls, + hostname: listener.hostname, + }; + Ok(sg_listener) + }) + .collect::>(), + ) + .await + .into_iter() + .map(|listener| listener.expect("[SG.Config] Unexpected none: listener")) + .collect(), + filters, + }; + Ok(result) } +} +impl SgParameters { pub(crate) fn to_kube_gateway(self) -> BTreeMap { let mut ann = BTreeMap::new(); if let Some(redis_url) = self.redis_url { @@ -111,6 +145,25 @@ impl SgParameters { } ann } + + pub fn from_kube_gateway(gateway: &Gateway) -> Self { + let gateway_annotations = gateway.metadata.annotations.clone(); + if let Some(gateway_annotations) = gateway_annotations { + SgParameters { + redis_url: gateway_annotations.get(crate::constants::GATEWAY_ANNOTATION_REDIS_URL).map(|v| v.to_string()), + log_level: gateway_annotations.get(crate::constants::GATEWAY_ANNOTATION_LOG_LEVEL).map(|v| v.to_string()), + lang: gateway_annotations.get(crate::constants::GATEWAY_ANNOTATION_LANGUAGE).map(|v| v.to_string()), + ignore_tls_verification: gateway_annotations.get(crate::constants::GATEWAY_ANNOTATION_IGNORE_TLS_VERIFICATION).and_then(|v| v.parse::().ok()), + } + } else { + SgParameters { + redis_url: None, + log_level: None, + lang: None, + ignore_tls_verification: None, + } + } + } } impl SgTlsConfig { @@ -149,13 +202,53 @@ impl SgTlsConfig { }, ) } + + pub async fn from_kube_tls(tls: GatewayTlsConfig) -> TardisResult> { + let certificate_ref = tls + .certificate_refs + .as_ref() + .ok_or_else(|| TardisError::format_error("[SG.Config] Gateway [spec.listener.tls.certificateRefs] is required", ""))? + .get(0) + .ok_or_else(|| TardisError::format_error("[SG.Config] Gateway [spec.listener.tls.certificateRefs] is empty", ""))?; + let secret_api: Api = Api::namespaced(get_k8s_client().await?, certificate_ref.namespace.as_ref().unwrap_or(&DEFAULT_NAMESPACE.to_string())); + let result = if let Some(secret_obj) = + secret_api.get_opt(&certificate_ref.name).await.map_err(|error| TardisError::wrap(&format!("[SG.Config] Kubernetes error: {error:?}"), ""))? + { + let secret_data = + secret_obj.data.ok_or_else(|| TardisError::format_error(&format!("[SG.Config] Gateway tls secret [{}] data is required", certificate_ref.name), ""))?; + let tls_crt = secret_data + .get("tls.crt") + .ok_or_else(|| TardisError::format_error(&format!("[SG.Config] Gateway tls secret [{}] data [tls.crt] is required", certificate_ref.name), ""))?; + let tls_key = secret_data + .get("tls.key") + .ok_or_else(|| TardisError::format_error(&format!("[SG.Config] Gateway tls secret [{}] data [tls.key] is required", certificate_ref.name), ""))?; + Some(SgTlsConfig { + mode: SgTlsMode::from(tls.mode).unwrap_or_default(), + key: String::from_utf8(tls_key.0.clone()).expect("[SG.Config] Gateway tls secret [tls.key] is not valid utf8"), + cert: String::from_utf8(tls_crt.0.clone()).expect("[SG.Config] Gateway tls secret [tls.cert] is not valid utf8"), + }) + } else { + TardisError::not_found(&format!("[SG.admin] Gateway have tls secret [{}], but not found!", certificate_ref.name), ""); + None + }; + Ok(result) + } } impl SgTlsMode { - pub(crate) fn to_kube_tls_mode_type(self) -> TlsModeType { + pub fn to_kube_tls_mode_type(self) -> TlsModeType { match self { SgTlsMode::Terminate => "Terminate".to_string(), SgTlsMode::Passthrough => "Passthrough".to_string(), } } + + pub fn from(mode: Option) -> Option { + if let Some(mode) = mode { + if let Ok(mode) = SgTlsMode::from_str(&mode) { + return Some(mode); + } + } + None + } } diff --git a/kernel-common/src/helper/k8s_helper.rs b/kernel-common/src/helper/k8s_helper.rs index 7692e6e2..8491fc39 100644 --- a/kernel-common/src/helper/k8s_helper.rs +++ b/kernel-common/src/helper/k8s_helper.rs @@ -1,3 +1,4 @@ +use kube::Client; use tardis::basic::error::TardisError; use tardis::basic::result::TardisResult; @@ -31,3 +32,8 @@ impl WarpKubeResult for kube::Result { self.map_err(|e| TardisError::wrap(&format!("[SG.kube] kubernetes api [{method}] error:{e}"), "")) } } + +//todo 可配置化 request merge with admin::helper::get_k8s_client() +pub async fn get_k8s_client() -> TardisResult { + Client::try_default().await.map_err(|error| TardisError::wrap(&format!("[SG.admin] Get kubernetes client error: {error:?}"), "")) +} diff --git a/kernel-common/src/inner_model/gateway.rs b/kernel-common/src/inner_model/gateway.rs index 3f6aaa72..16c50ca7 100644 --- a/kernel-common/src/inner_model/gateway.rs +++ b/kernel-common/src/inner_model/gateway.rs @@ -116,14 +116,3 @@ impl FromStr for SgTlsMode { } } } - -impl SgTlsMode { - pub fn from(mode: Option) -> Option { - if let Some(mode) = mode { - if let Ok(mode) = SgTlsMode::from_str(&mode) { - return Some(mode); - } - } - None - } -}