Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
RWDai committed May 7, 2024
1 parent 0eaff0d commit 1c2b345
Show file tree
Hide file tree
Showing 16 changed files with 393 additions and 209 deletions.
2 changes: 2 additions & 0 deletions binary/admin-server/src/service/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use axum::{
use serde_json::Value;
use spacegate_config::{service::Discovery, BoxError, PluginAttributes};
use tokio::{sync::RwLock, time::Instant};
use tracing::info;

static ATTR_CACHE: OnceLock<RwLock<HashMap<String, PluginAttributes>>> = OnceLock::new();

Expand All @@ -24,6 +25,7 @@ async fn sync_attr_cache<B: Discovery>(backend: &B, refresh: bool) -> Result<(),
drop(next_sync);
let mut cache = ATTR_CACHE.get_or_init(Default::default).write().await;
if let Some(remote) = backend.api_url().await? {
info!("refresh plugin attr from: {}", remote);
let attrs = <dyn Instance>::plugin_list(&remote).await?;
cache.clear();
cache.extend(attrs.into_iter().map(|attr| (attr.code.to_string(), attr)));
Expand Down
23 changes: 21 additions & 2 deletions crates/config/src/service/k8s/convert/filter_k8s_conv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::HashMap;

use k8s_gateway_api::{HttpHeader, HttpPathModifier, HttpRequestHeaderFilter, HttpRequestRedirectFilter, HttpRouteFilter, HttpUrlRewriteFilter, LocalObjectReference};
use kube::{
api::{PatchParams, PostParams},
api::{DeleteParams, PatchParams, PostParams},
Api, ResourceExt,
};
use spacegate_model::{constants::SG_FILTER_KIND, ext::k8s::crd::sg_filter::SgFilter};
Expand Down Expand Up @@ -41,6 +41,9 @@ pub(crate) trait PluginIdConv {
/// can be ues in gateway and route level
async fn add_filter_target(&self, target: K8sSgFilterSpecTargetRef, client: &K8s) -> BoxResult<()>;

/// can be ues in gateway and route level
async fn remove_filter_target(&self, target: K8sSgFilterSpecTargetRef, client: &K8s) -> BoxResult<()>;

// mix of [SgRouteFilter::to_singe_filter] and [SgRouteFilter::to_http_route_filter]
// PluginInstanceId can be converted into `SgRouteFilter` or `HttpRouteFilter`
// async fn to_route_filter_or_add_filter_target(&self, target: K8sSgFilterSpecTargetRef, client: &K8s) -> Option<HttpRouteFilter>;
Expand All @@ -51,7 +54,7 @@ impl PluginIdConv for PluginInstanceId {
match self.name.clone() {
PluginInstanceName::Anon { uid: _ } => None,
PluginInstanceName::Named { name } => Some(SgSingeFilter {
name: Some(name.clone()),
name: name.clone(),
namespace: namespace.to_owned(),
filter: K8sSgFilterSpecFilter {
code: self.code.to_string(),
Expand Down Expand Up @@ -156,6 +159,22 @@ impl PluginIdConv for PluginInstanceId {
Ok(())
}

async fn remove_filter_target(&self, target: K8sSgFilterSpecTargetRef, client: &K8s) -> BoxResult<()> {
let filter_api: Api<SgFilter> = client.get_namespace_api();
if let Ok(mut filter) = filter_api.get(&self.name.to_string()).await {
if filter.spec.target_refs.iter().find(|t| t.eq(&&target)).is_some() {
filter.spec.target_refs.retain(|t| !t.eq(&target));

if filter.spec.target_refs.is_empty() {
filter_api.delete(&filter.name_any(), &DeleteParams::default()).await?;
} else {
filter_api.replace(&filter.name_any(), &PostParams::default(), &filter).await?;
}
};
}
Ok(())
}

fn to_http_route_filter(self) -> Option<HttpRouteFilter> {
match self.name {
PluginInstanceName::Anon { uid: _ } => None,
Expand Down
12 changes: 9 additions & 3 deletions crates/config/src/service/k8s/convert/route_k8s_conv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ impl SgHttpRouteRuleConv for SgHttpRouteRule {

pub(crate) trait SgHttpRouteMatchConv {
fn from_kube_httproute(route_match: HttpRouteMatch) -> SgHttpRouteMatch;

/// Returns (matches, filters)
/// path 和 header 对应插件的关系是:
///
Expand All @@ -178,16 +179,21 @@ pub(crate) trait SgHttpRouteMatchConv {
/// | header1 | Some(request_header_modifier1) |
/// | header2 | Some(request_header_modifier2) |
/// | header3 | None |
/// | ~~header4,header5~~(not support) | ~~Some(request_header_modifier3),None~~ |
/// | path_match4,header6 | Some(url_rewrite3),None |
/// | path_match5,header7 | None,Some(request_header_modifier4) |
///
/// ==>
/// result ==>
///
/// matches_vec:[path_match1, path_match2, path_match3, header1, header2, header3, (path_match4,header6), (path_match5,header7)]
/// filter_vec:[url_rewrite1,url_rewrite2,request_header_modifier1,request_header_modifier2,url_rewrite3,request_header_modifier4]
///
/// | matches | plugins |
/// | --- | --- |
///
fn into_kube_httproute(self) -> (Vec<HttpRouteMatch>, Vec<HttpRouteFilter>);
}
impl SgHttpRouteMatchConv for SgHttpRouteMatch {
fn into_kube_httproute(self) -> (Vec<HttpRouteMatch>, Vec<HttpRouteFilter>) {
// todo: not complete
let (match_vec, plugins) = if let Some(method_vec) = self.method {
method_vec
.into_iter()
Expand Down
19 changes: 12 additions & 7 deletions crates/config/src/service/k8s/create.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use k8s_gateway_api::Gateway;
use k8s_openapi::api::core::v1::Secret;
use kube::{api::PostParams, Api, ResourceExt};
use kube::{api::PostParams, Api};
use spacegate_model::{
ext::k8s::crd::{
http_spaceroute,
sg_filter::{K8sSgFilterSpecTargetRef, SgFilter},
},
ext::k8s::crd::{http_spaceroute, sg_filter::SgFilter},
BoxError, PluginInstanceId,
};

use crate::{service::Create, BoxResult};
use crate::{
service::{Create, Update},
BoxResult,
};

use super::{
convert::{filter_k8s_conv::PluginIdConv as _, gateway_k8s_conv::SgGatewayConv as _, route_k8s_conv::SgHttpRouteConv as _, ToTarget as _},
Expand Down Expand Up @@ -53,7 +53,12 @@ impl Create for K8s {

if let Some(filter) = filter {
let filter_api: Api<SgFilter> = self.get_namespace_api();
filter_api.create(&PostParams::default(), &filter.into()).await?;
if filter_api.get_opt(&filter.name).await?.is_none() {
filter_api.create(&PostParams::default(), &filter.into()).await?;
} else {
// do update
self.update_plugin(id, filter.filter.config).await?;
}
}
Ok(())
}
Expand Down
27 changes: 10 additions & 17 deletions crates/config/src/service/k8s/delete.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,32 @@
use k8s_gateway_api::{Gateway, HttpRoute};
use k8s_openapi::api::core::v1::Secret;
use kube::{api::DeleteParams, Api, ResourceExt as _};
use spacegate_model::ext::k8s::crd::{http_spaceroute::HttpSpaceroute, sg_filter::SgFilter};
use spacegate_model::ext::k8s::crd::http_spaceroute::HttpSpaceroute;

use crate::{
service::{Delete, Retrieve as _},
BoxResult,
};

use super::{
convert::{gateway_k8s_conv::SgGatewayConv as _, route_k8s_conv::SgHttpRouteConv as _},
convert::{filter_k8s_conv::PluginIdConv, gateway_k8s_conv::SgGatewayConv as _, route_k8s_conv::SgHttpRouteConv as _, ToTarget as _},
K8s,
};

impl Delete for K8s {
async fn delete_config_item_gateway(&self, gateway_name: &str) -> BoxResult<()> {
let gateway_api: Api<Gateway> = self.get_namespace_api();

if let Some(gateway) = self.retrieve_config_item_gateway(gateway_name).await? {
let (_, secret, delete_plugin_ids) = gateway.to_kube_gateway(&self.namespace);
if let Some(sg_gateway) = self.retrieve_config_item_gateway(gateway_name).await? {
let (gateway, secret, delete_plugin_ids) = sg_gateway.to_kube_gateway(&self.namespace);

if let Some(secret) = secret {
let secret_api: Api<Secret> = self.get_namespace_api();
secret_api.delete(&secret.name_any(), &DeleteParams::default()).await?;
}

for delete_plugin_id in delete_plugin_ids {
self.delete_plugin(&delete_plugin_id).await?;
delete_plugin_id.remove_filter_target(gateway.to_target_ref(), self).await?;
}

gateway_api.delete(gateway_name, &DeleteParams::default()).await?;
Expand All @@ -38,10 +38,10 @@ impl Delete for K8s {
let http_spaceroute_api: Api<HttpSpaceroute> = self.get_namespace_api();
let httproute_api: Api<HttpRoute> = self.get_namespace_api();

if let Some(http_route) = self.retrieve_config_item_route(gateway_name, route_name).await? {
let (_, delete_plugin_ids) = http_route.to_kube_httproute(gateway_name, route_name, &self.namespace);
if let Some(sg_http_route) = self.retrieve_config_item_route(gateway_name, route_name).await? {
let (route, delete_plugin_ids) = sg_http_route.to_kube_httproute(gateway_name, route_name, &self.namespace);
for delete_plugin_id in delete_plugin_ids {
self.delete_plugin(&delete_plugin_id).await?;
delete_plugin_id.remove_filter_target(route.to_target_ref(), self).await?;
}
match http_spaceroute_api.delete(route_name, &DeleteParams::default()).await {
Ok(_) => Ok(()),
Expand All @@ -55,15 +55,8 @@ impl Delete for K8s {
}
}

async fn delete_plugin(&self, id: &spacegate_model::PluginInstanceId) -> BoxResult<()> {
let filter_api: Api<SgFilter> = self.get_namespace_api();
match id.name.clone() {
spacegate_model::PluginInstanceName::Anon { uid: _ } => {}
spacegate_model::PluginInstanceName::Named { name } => {
filter_api.delete(&name, &DeleteParams::default()).await?;
}
spacegate_model::PluginInstanceName::Mono => {}
}
async fn delete_plugin(&self, _id: &spacegate_model::PluginInstanceId) -> BoxResult<()> {
// do nothing
Ok(())
}
}
19 changes: 16 additions & 3 deletions crates/config/src/service/k8s/discovery.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use spacegate_model::{BackendHost, BoxResult};
use k8s_openapi::api::core::v1::Service;
use kube::{api::ListParams, Api, ResourceExt};
use spacegate_model::{BackendHost, BoxResult, K8sServiceData};

use crate::service::Discovery;

Expand All @@ -10,7 +12,18 @@ impl Discovery for K8s {
}

async fn backends(&self) -> BoxResult<Vec<BackendHost>> {
todo!();
Ok(vec![])
let service_api: Api<Service> = self.get_all_api();
let result = service_api
.list(&ListParams::default())
.await?
.into_iter()
.map(|s| {
BackendHost::K8sService(K8sServiceData {
name: s.name_any(),
namespace: s.namespace(),
})
})
.collect();
Ok(result)
}
}
29 changes: 15 additions & 14 deletions crates/config/src/service/k8s/listen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ use kube::{
runtime::{watcher, WatchStreamExt},
Api, Resource, ResourceExt,
};

use crate::{
use spacegate_model::{
constants,
k8s_crd::{http_spaceroute::HttpSpaceroute, sg_filter::SgFilter},
service::{backend::k8s::K8s, Retrieve},
ext::k8s::crd::{http_spaceroute::HttpSpaceroute, sg_filter::SgFilter},
BoxResult, Config,
};

use super::{ConfigEventType, ConfigType, CreateListener, Listen};
use crate::service::{ConfigEventType, ConfigType, CreateListener, Listen, ListenEvent, Retrieve as _};

use super::K8s;

pub struct K8sListener {
rx: tokio::sync::mpsc::UnboundedReceiver<(ConfigType, ConfigEventType)>,
Expand Down Expand Up @@ -168,7 +168,7 @@ impl CreateListener for K8s {
let move_evt_tx = evt_tx.clone();
let move_namespace = self.namespace.to_string();
let move_http_spaceroute_api = http_spaceroute_api.clone();
//watche http spaceroute
//watch http spaceroute
tokio::task::spawn(async move {
let mut uid_version_map = HashMap::new();
let ew = watcher::watcher(move_http_spaceroute_api, watcher::Config::default());
Expand All @@ -182,7 +182,7 @@ impl CreateListener for K8s {
let move_evt_tx = evt_tx.clone();
let move_namespace = self.namespace.to_string();
let move_http_route_api = http_route_api.clone();
//watche http route
//watch http route
tokio::task::spawn(async move {
let mut uid_version_map = HashMap::new();
let ew = watcher::watcher(move_http_route_api, watcher::Config::default());
Expand All @@ -204,22 +204,23 @@ impl CreateListener for K8s {
.clone()
.into_values()
.flat_map(|item| {
let mut filters = item.gateway.filters.clone();
let route_filters = item.routes.values().flat_map(|route| route.filters.clone()).collect::<Vec<_>>();
filters.extend(route_filters);
filters
let mut plugin_ids = item.gateway.plugins.clone();
let route_plugin_ids = item.routes.values().flat_map(|route| route.plugins.clone()).collect::<Vec<_>>();
plugin_ids.extend(route_plugin_ids);
plugin_ids
})
.map(|f| (f.code, f.name))
.collect::<Vec<_>>();
let move_evt_tx = evt_tx.clone();
let move_namespace = self.namespace.to_string();
//watche sgfilter

//watch sgfilter
tokio::task::spawn(async move {
let mut uid_version_map = HashMap::new();
let ew = watcher::watcher(sg_filter_api, watcher::Config::default()).touched_objects();
pin_mut!(ew);
while let Some(filter) = ew.try_next().await.unwrap_or_default() {
if filter.spec.filters.iter().any(|inner_filter| move_filter_codes_names.contains(&(inner_filter.code.clone(), inner_filter.name.clone())))
if filter.spec.filters.iter().any(|inner_filter| move_filter_codes_names.contains(&(inner_filter.code.clone().into(), inner_filter.name.clone().into())))
&& uid_version_map.get(&filter.name_any()).is_none()
{
uid_version_map.insert(filter.name_any(), filter.resource_version());
Expand Down Expand Up @@ -270,7 +271,7 @@ impl CreateListener for K8s {
}

impl Listen for K8sListener {
fn poll_next(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<BoxResult<super::ListenEvent>> {
fn poll_next(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<BoxResult<ListenEvent>> {
if let Some(next) = ready!(self.rx.poll_recv(cx)) {
std::task::Poll::Ready(Ok(next))
} else {
Expand Down
7 changes: 4 additions & 3 deletions crates/config/src/service/k8s/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ use k8s_openapi::NamespaceResourceScope;
pub mod convert;
pub mod create;
pub mod delete;
// pub mod listen;
pub mod retrieve;
// pub mod update;
pub mod discovery;
// todo check listen
pub mod listen;
pub mod retrieve;
pub mod update;

pub struct K8s {
pub namespace: Arc<str>,
Expand Down
Loading

0 comments on commit 1c2b345

Please sign in to comment.