Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
RWDai committed Oct 30, 2023
1 parent 4621de5 commit d4ad569
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 54 deletions.
19 changes: 16 additions & 3 deletions admin/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,23 @@ pub fn fuzzy_regex(query: impl AsRef<str>) -> TardisResult<Regex> {
Ok(Regex::new(&format!("^{}$", query))?.into())
}

pub fn find_add_delete(new: Vec<String>, old: Vec<String>) -> (Vec<String>, Vec<String>) {
let add: Vec<String> = new.iter().filter(|item| !old.contains(item)).cloned().collect();
pub fn find_add_delete<T>(new: Vec<T>, old: Vec<T>) -> (Vec<T>, Vec<T>)
where
T: std::cmp::PartialEq + std::clone::Clone,
{
if new.is_empty() && old.is_empty() {
return (vec![], vec![]);
}
if new.is_empty() {
return (vec![], old);
}
if old.is_empty() {
return (new, vec![]);
}

let add: Vec<T> = new.iter().filter(|item| !old.contains(item)).cloned().collect();

let delete: Vec<String> = old.into_iter().filter(|item| !new.contains(item)).collect();
let delete: Vec<T> = old.into_iter().filter(|item| !new.contains(item)).collect();

(add, delete)
}
Expand Down
4 changes: 2 additions & 2 deletions admin/src/model/vo/gateway_vo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub struct SgListenerVO {
/// unique by id
#[derive(Debug, Serialize, Deserialize, Clone, poem_openapi::Object)]
pub struct SgTlsConfigVO {
pub id: String,
pub name: String,
pub mode: SgTlsMode,
pub key: String,
pub cert: String,
Expand All @@ -68,6 +68,6 @@ impl Vo for SgTlsConfigVO {
}

fn get_unique_name(&self) -> String {
self.id.clone()
self.name.clone()
}
}
3 changes: 2 additions & 1 deletion admin/src/model/vo_converter/gateway_vo_conv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ impl VoConv<SgListener, SgListenerVO> for SgListenerVO {
impl VoConv<SgTlsConfig, SgTlsConfigVO> for SgTlsConfigVO {
async fn to_model(self) -> TardisResult<SgTlsConfig> {
Ok(SgTlsConfig {
name: self.name,
mode: self.mode,
key: self.key,
cert: self.cert,
Expand All @@ -82,7 +83,7 @@ impl VoConv<SgTlsConfig, SgTlsConfigVO> for SgTlsConfigVO {

async fn from_model(model: SgTlsConfig) -> TardisResult<SgTlsConfigVO> {
Ok(SgTlsConfigVO {
id: TardisFuns::crypto.digest.md5(&format!("{}{}{}", model.mode.clone().to_kube_tls_mode_type().to_string(), model.key, model.cert))?,
name: model.name,
mode: model.mode,
key: model.key,
cert: model.cert,
Expand Down
46 changes: 40 additions & 6 deletions admin/src/service/gateway_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::model::ToFields;
use std::clone;
use std::process::id;

use crate::model::vo::gateway_vo::SgGatewayVO;
use crate::model::vo::gateway_vo::{SgGatewayVO, SgTlsConfigVO};
use crate::model::vo::Vo;
use crate::service::plugin_service::PluginVoService;
#[cfg(feature = "k8s")]
Expand All @@ -24,13 +24,14 @@ use kube::api::{DeleteParams, PostParams};
#[cfg(feature = "k8s")]
use kube::{api::ListParams, Api, ResourceExt};

use crate::helper::find_add_delete;
use crate::model::vo_converter::VoConv;
use kernel_common::helper::k8s_helper::{parse_k8s_obj_unique, WarpKubeResult};
use tardis::basic::error::TardisError;
use tardis::basic::result::TardisResult;
use crate::helper::find_add_delete;

use super::base_service::VoBaseService;
use super::tls_config_service::TlsConfigVoService;

pub struct GatewayVoService;

Expand All @@ -48,10 +49,11 @@ impl GatewayVoService {
.collect())
}
pub async fn add(add: SgGatewayVO) -> TardisResult<SgGatewayVO> {
let add_model = add.clone().to_model().await?;
#[cfg(feature = "k8s")]
{
let (namespace, _) = parse_k8s_obj_unique(&add.get_unique_name());
let (gateway, secrets, sgfilters) = add.clone().to_model().await?.to_kube_gateway();
let (gateway, secrets, sgfilters) = add_model.to_kube_gateway();

let (gateway_api, secret_api): (Api<Gateway>, Api<Secret>) =
(Api::namespaced(get_k8s_client().await?, &namespace), Api::namespaced(get_k8s_client().await?, &namespace));
Expand Down Expand Up @@ -80,9 +82,41 @@ impl GatewayVoService {
}

pub async fn update(update: SgGatewayVO) -> TardisResult<()> {
let old_gateway = Self::get_by_id(&update.get_unique_name()).await?;
let (add,delete)=find_add_delete(update.);
//todo
let update_un = &update.get_unique_name();
let old_gateway = Self::get_by_id(update_un).await?;
let (add_tls, delete_tls) = find_add_delete(
update.listeners.iter().map(|l| l.tls.clone().unwrap_or_default()).filter(|tls| !tls.is_empty()).collect(),
old_gateway.listeners.iter().map(|l| l.tls.clone().unwrap_or_default()).filter(|tls| !tls.is_empty()).collect(),
);
Self::add_tls_config(update_un, add_tls).await?;
Self::delete_tls_config(update_un, delete_tls).await?;

let (add, delete) = find_add_delete(update.filters.clone().unwrap_or_default(), old_gateway.filters.unwrap_or_default());

let update_sg_gateway = update.clone().to_model().await?;
#[cfg(feature = "k8s")]
{
let (namespace, name) = parse_k8s_obj_unique(update_un);
let (gateway_api, secret_api): (Api<Gateway>, Api<Secret>) =
(Api::namespaced(get_k8s_client().await?, &namespace), Api::namespaced(get_k8s_client().await?, &namespace));
let (update_gateway, update_secret, update_filter) = update_sg_gateway.to_kube_gateway();
gateway_api.replace(&name, &PostParams::default(), &update_gateway).await.warp_result_by_method("Replace Gateway")?;
}
Self::update_vo(update).await?;
Ok(())
}

pub async fn add_tls_config(id: &str, adds: Vec<String>) -> TardisResult<()> {
for add in adds {
TlsConfigVoService::modify_ref_ids(&add, id, false);
}
Ok(())
}

pub async fn delete_tls_config(id: &str, deletes: Vec<String>) -> TardisResult<()> {
for delete in deletes {
TlsConfigVoService::modify_ref_ids(&delete, id, true);
}
Ok(())
}

Expand Down
23 changes: 17 additions & 6 deletions admin/src/service/tls_config_service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::model::query_dto::SgTlsConfigQueryVO;
use crate::model::vo::gateway_vo::SgTlsConfigVO;
use crate::model::vo::Vo;
use crate::model::vo_converter::VoConv;
use crate::service::base_service::VoBaseService;
use tardis::basic::error::TardisError;
use tardis::basic::result::TardisResult;
Expand All @@ -23,6 +24,9 @@ impl TlsConfigVoService {
}

pub(crate) async fn add(add: SgTlsConfigVO) -> TardisResult<()> {
let add_model = add.clone().to_model().await?;
#[cfg(feature = "k8s")]
{}
Self::add_vo(add).await?;
Ok(())
}
Expand All @@ -44,20 +48,27 @@ impl TlsConfigVoService {
Ok(())
}

pub(crate) async fn add_ref_ids(id: &str, ref_ids: &[String]) -> TardisResult<()> {
let mut ref_ids = ref_ids.to_vec();
/// delete:true means delete, false means add
pub(crate) async fn modify_ref_ids(id: &str, ref_id: &str, delete: bool) -> TardisResult<()> {
if let Some(o_str) = Self::get_str_type_map().await?.remove(id) {
let mut o: SgTlsConfigVO =
serde_json::from_str(&o_str).map_err(|e| TardisError::bad_request(&format!("[SG.admin] Deserialization {}:{id} failed:{e}", SgTlsConfigVO::get_vo_type()), ""))?;
if let Some(ids) = &mut o.ref_ids {
ref_ids.append(ids);
o.ref_ids = Some(ref_ids);
if delete {
ids.retain(|id| id != ref_id);
} else {
ids.push(ref_id.to_string());
}
} else {
o.ref_ids = Some(ref_ids);
if delete {
return Err(TardisError::not_found("delete failed", ""));
} else {
o.ref_ids = Some(vec![ref_id.to_string()]);
}
}
Self::update_vo(o).await?;
} else {
return Err(TardisError::not_found("", ""));
return Err(TardisError::not_found(&format!("can not find tls:{id}"), ""));
};
Ok(())
}
Expand Down
2 changes: 2 additions & 0 deletions kernel-common/src/converter/gateway_k8s_conv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ impl SgTlsConfig {
let result = if let Some(secret_obj) =
secret_api.get_opt(&certificate_ref.name).await.map_err(|error| TardisError::wrap(&format!("[SG.Config] Kubernetes error: {error:?}"), ""))?
{
let secret_un = get_k8s_obj_unique(&secret_obj);
let secret_data =
secret_obj.data.ok_or_else(|| TardisError::format_error(&format!("[SG.Config] Gateway tls secret [{}] data is required", certificate_ref.name), ""))?;
let tls_crt = secret_data
Expand All @@ -225,6 +226,7 @@ impl SgTlsConfig {
.get("tls.key")
.ok_or_else(|| TardisError::format_error(&format!("[SG.Config] Gateway tls secret [{}] data [tls.key] is required", certificate_ref.name), ""))?;
Some(SgTlsConfig {
name: secret_un,
mode: SgTlsMode::from(tls.mode).unwrap_or_default(),
key: String::from_utf8(tls_key.0.clone()).expect("[SG.Config] Gateway tls secret [tls.key] is not valid utf8"),
cert: String::from_utf8(tls_crt.0.clone()).expect("[SG.Config] Gateway tls secret [tls.cert] is not valid utf8"),
Expand Down
5 changes: 5 additions & 0 deletions kernel-common/src/inner_model/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ impl Display for SgProtocol {
#[derive(Debug, Serialize, Deserialize, Clone)]
#[cfg_attr(feature = "admin-support", derive(poem_openapi::Object))]
pub struct SgTlsConfig {
/// Name of the Secret. Global Unique.
///
/// In k8s mode, this name MUST be unique within a namespace.
/// format see [k8s_helper::format_k8s_obj_unique]
pub name: String,
pub mode: SgTlsMode,
pub key: String,
pub cert: String,
Expand Down
54 changes: 18 additions & 36 deletions kernel/src/config/config_by_k8s.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,10 @@ pub async fn init(namespaces: Option<String>) -> TardisResult<Vec<(SgGateway, Ve
});

async fn watch_http_spaceroute(http_route_obj: HttpSpaceroute, http_route_objs_versions: &HashMap<String, String>, http_route_apis: (&Api<HttpSpaceroute>, &Api<HttpRoute>)) {
log::trace!("[SG.Config] http_route config watch tiger. name:{}", k8s_helper::get_k8s_obj_unique(&http_route_obj));
log::trace!(
"[SG.Config] http_route config watch tiger. name:{}",
kernel_common::helper::k8s_helper::get_k8s_obj_unique(&http_route_obj)
);
if http_route_objs_versions.get(http_route_obj.metadata.uid.as_ref().unwrap_or(&"".to_string())).unwrap_or(&"".to_string())
== http_route_obj.metadata.resource_version.as_ref().unwrap_or(&"".to_string())
{
Expand Down Expand Up @@ -166,7 +169,10 @@ pub async fn init(namespaces: Option<String>) -> TardisResult<Vec<(SgGateway, Ve
return;
};

log::debug!("[SG.Config] Http route:{} config change found", k8s_helper::get_k8s_obj_unique(&http_route_obj));
log::debug!(
"[SG.Config] Http route:{} config change found",
kernel_common::helper::k8s_helper::get_k8s_obj_unique(&http_route_obj)
);

overload_http_route(gateway_obj, (&http_route_apis.0, &http_route_apis.1)).await;
}
Expand Down Expand Up @@ -212,7 +218,10 @@ pub async fn init(namespaces: Option<String>) -> TardisResult<Vec<(SgGateway, Ve
let ew = watcher::watcher(filter_api.clone(), watcher::Config::default()).touched_objects();
pin_mut!(ew);
while let Some(filter_obj) = ew.try_next().await.unwrap_or_default() {
log::trace!("[SG.Config] filter_api config watch tiger. name:{}", k8s_helper::get_k8s_obj_unique(&filter_obj));
log::trace!(
"[SG.Config] filter_api config watch tiger. name:{}",
kernel_common::helper::k8s_helper::get_k8s_obj_unique(&filter_obj)
);
if sg_filter_objs_versions.get(filter_obj.metadata.uid.as_ref().unwrap_or(&"".to_string())).unwrap_or(&"".to_string())
== filter_obj.metadata.resource_version.as_ref().unwrap_or(&"".to_string())
{
Expand Down Expand Up @@ -318,7 +327,10 @@ pub async fn init(namespaces: Option<String>) -> TardisResult<Vec<(SgGateway, Ve
continue;
}

log::trace!("[SG.Config] SgFilter config:{} change found", k8s_helper::get_k8s_obj_unique(&filter_obj));
log::trace!(
"[SG.Config] SgFilter config:{} change found",
kernel_common::helper::k8s_helper::get_k8s_obj_unique(&filter_obj)
);

let http_route_api = (
&Api::all(get_client().await.expect("[SG.Config] Failed to get client")),
Expand Down Expand Up @@ -546,38 +558,7 @@ async fn process_gateway_config(gateway_objs: Vec<Gateway>) -> TardisResult<Vec<
.listeners
.into_iter()
.map(|listener| async move {
let tls = match listener.tls {
Some(tls) => {
let certificate_ref = tls
.certificate_refs
.as_ref()
.ok_or_else(|| TardisError::format_error("[SG.Config] Gateway [spec.listener.tls.certificateRefs] is required", ""))?
.get(0)
.ok_or_else(|| TardisError::format_error("[SG.Config] Gateway [spec.listener.tls.certificateRefs] is empty", ""))?;
let secret_api: Api<Secret> = if let Some(namespace) = &certificate_ref.namespace {
Api::namespaced(get_client().await?, namespace)
} else {
Api::all(get_client().await?)
};
let secret_obj =
secret_api.get(&certificate_ref.name).await.map_err(|error| TardisError::wrap(&format!("[SG.Config] Kubernetes error: {error:?}"), ""))?;
let secret_data = secret_obj
.data
.ok_or_else(|| TardisError::format_error(&format!("[SG.Config] Gateway tls secret [{}] data is required", certificate_ref.name), ""))?;
let tls_crt = secret_data.get("tls.crt").ok_or_else(|| {
TardisError::format_error(&format!("[SG.Config] Gateway tls secret [{}] data [tls.crt] is required", certificate_ref.name), "")
})?;
let tls_key = secret_data.get("tls.key").ok_or_else(|| {
TardisError::format_error(&format!("[SG.Config] Gateway tls secret [{}] data [tls.key] is required", certificate_ref.name), "")
})?;
Some(SgTlsConfig {
mode: SgTlsMode::from(tls.mode).unwrap_or_default(),
key: String::from_utf8(tls_key.0.clone()).expect("[SG.Config] Gateway tls secret [tls.key] is not valid utf8"),
cert: String::from_utf8(tls_crt.0.clone()).expect("[SG.Config] Gateway tls secret [tls.cert] is not valid utf8"),
})
}
None => None,
};
let tls = if let Some(tls) = listener.tls { SgTlsConfig::from_kube_tls(tls).await? } else { None };
let sg_listener = SgListener {
name: listener.name,
ip: None,
Expand Down Expand Up @@ -958,6 +939,7 @@ fn convert_filters(filters: Option<Vec<HttpRouteFilter>>) -> Option<Vec<SgRouteF
.map(|filters| filters.into_iter().map(|filter| filter.expect("Unreachable code")).collect_vec())
}

//todo move to common
fn convert_to_kube_filters(filters: Option<Vec<SgRouteFilter>>) -> TardisResult<Option<Vec<HttpRouteFilter>>> {
filters
.map(|filters| {
Expand Down

0 comments on commit d4ad569

Please sign in to comment.