diff --git a/.cargo/config.toml b/.cargo/config.toml index 3ee5d541..b1f69846 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,2 +1,8 @@ [env] -TS_RS_EXPORT_DIR = { value = "./sdk/admin-client/src/model", relative = true } \ No newline at end of file +TS_RS_EXPORT_DIR = { value = "./sdk/admin-client/src/model", relative = true } +CONFIG = "file:./resource/local-example" +PLUGINS = "./target/debug" +RUST_LOG = "trace" +FORMAT = "json" +KEY = "moCZihByqvXt4dfMYjOz75fzBi0eul6Ffg2EoUzWyqA=" +SK = "password" \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 6924914e..2f04bdcf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "crates/model", "crates/config", "crates/shell", + "examples/sayhello" ] resolver = "2" @@ -37,7 +38,6 @@ rust-version = "1.64" [workspace.dependencies] spacegate-kernel = { path = "./crates/kernel" } spacegate-plugin = { path = "./crates/plugin" } -spacegate-model = { path = "./crates/model" } spacegate-config = { path = "./crates/config" } spacegate-shell = { path = "./crates/shell" } spacegate-ext-axum = { path = "./crates/extension/axum" } @@ -51,11 +51,8 @@ serde = { version = "1", features = ["derive"] } serde_json = { version = "1" } toml = { version = "0.8", features = ["preserve_order"] } lazy_static = { version = "1.4" } -async-trait = { version = "0.1" } -itertools = { version = "0" } tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing = { version = "0" } -urlencoding = { version = "2" } # Time @@ -69,9 +66,6 @@ hyper-util = { version = "0" } # ws tokio-tungstenite = { version = "0" } - -# tower -tower-service = { version = "0.3" } tower-layer = { version = "0.3" } tower-http = { version = "0.5" } tower = { version = "0.4" } @@ -86,12 +80,6 @@ schemars = { version = "0.8.6" } # Test reqwest = { version = "0.11", features = ["json", "gzip", "brotli"] } testcontainers-modules = { version = "0.3" } -async-compression = { version = "0.3.13", features = [ - "tokio", - "gzip", - "deflate", - "brotli", -] } bytes = { version = "1" } @@ -101,9 +89,6 @@ hyper-rustls = { version = "0.26" } rustls-pemfile = "2" tokio-rustls = { version = "0.25" } -# serde -duration-str = "0.7.1" - # regex regex = { version = "1" } serde_regex = { version = "1.1.0" } @@ -119,8 +104,6 @@ ipnet = { version = "2" } # notify notify = { version = "6.1.1" } -# redis -deadpool-redis = { version = "0.14" } - # web-server axum = "0.7.4" + diff --git a/binary/admin-server/Cargo.toml b/binary/admin-server/Cargo.toml index 0b4ec5c5..e3b322cf 100644 --- a/binary/admin-server/Cargo.toml +++ b/binary/admin-server/Cargo.toml @@ -18,10 +18,12 @@ default = [] [dependencies] clap = { version = "4.5", features = ["derive", "env"] } +base64 = "0.13" spacegate-config = { workspace = true, features = [ "fs", ] } axum = { workspace = true } +axum-extra = { version = "*", features = ["cookie"] } tower = { version = "0.4" } tokio = { workspace = true, features = ["full"] } tracing = { workspace = true } @@ -30,6 +32,9 @@ tower-http = { version = "*", features = ["trace"] } serde_json = { workspace = true } reqwest = { workspace = true, features = ["json"] } serde = { workspace = true, features = ["derive"] } +jsonwebtoken = "9" +digest = "0.10.7" +sha2 = "0.10.8" [dev-dependencies] [package.metadata.docs.rs] diff --git a/binary/admin-server/src/clap.rs b/binary/admin-server/src/clap.rs index 0fb9100b..6b7ce4e2 100644 --- a/binary/admin-server/src/clap.rs +++ b/binary/admin-server/src/clap.rs @@ -10,6 +10,7 @@ use std::{ use clap::Parser; use serde_json::Value; +use spacegate_config::BoxError; use tracing::{info, warn}; use crate::state::PluginCode; @@ -34,6 +35,19 @@ pub struct Args { /// the format of the config file #[arg(short, long, env, default_value_t = ConfigFormat::Toml)] pub format: ConfigFormat, + #[arg(short, long, env)] + pub key: Option, + #[arg(short, long, env)] + pub sk: Option, +} +#[derive(Debug, Clone)] +pub struct Base64Decoded(pub Vec); +impl FromStr for Base64Decoded { + type Err = BoxError; + + fn from_str(s: &str) -> Result { + base64::decode(s).map_err(Into::into).map(Base64Decoded) + } } #[derive(Debug, Clone)] diff --git a/binary/admin-server/src/main.rs b/binary/admin-server/src/main.rs index 36ce1887..1fc4b4aa 100644 --- a/binary/admin-server/src/main.rs +++ b/binary/admin-server/src/main.rs @@ -19,11 +19,17 @@ async fn main() -> Result<(), Box> { let args = ::parse(); tracing::info!("server started with args: {:?}", args); let addr = SocketAddr::new(args.host, args.port); + let sec = args.key.map(|k| k.0.into()); + let digest: Option> = args.sk.map(|sk| { + let out = ::digest(sk); + let out: [u8; 32] = out.into(); + Arc::new(out) + }); // let schemas = args.schemas.load_all()?; let app = match args.config { clap::ConfigBackend::File(path) => { let backend = spacegate_config::service::fs::Fs::new(path, config_format::Json::default()); - create_app(backend) + create_app(backend, sec, digest) } clap::ConfigBackend::K8s(_ns) => { // let backend = spacegate_config::service::backend::k8s::K8s::with_default_client(ns).await?; @@ -41,13 +47,15 @@ async fn main() -> Result<(), Box> { } /// create app for an backend -pub fn create_app(backend: B) -> Router<()> +pub fn create_app(backend: B, sec: Option>, sk_digest: Option>) -> Router<()> where B: Discovery + Create + Retrieve + Update + Delete + Send + Sync + 'static, { let state = AppState { backend: Arc::new(backend), version: mw::version_control::Version::new(), + secret: sec, + sk_digest, // plugin_schemas: Arc::new(schemas.into()), }; service::router(state) diff --git a/binary/admin-server/src/mw.rs b/binary/admin-server/src/mw.rs index 13e5c298..d51db9d0 100644 --- a/binary/admin-server/src/mw.rs +++ b/binary/admin-server/src/mw.rs @@ -1 +1,2 @@ +pub mod authentication; pub mod version_control; diff --git a/binary/admin-server/src/mw/authentication.rs b/binary/admin-server/src/mw/authentication.rs new file mode 100644 index 00000000..dfa94eda --- /dev/null +++ b/binary/admin-server/src/mw/authentication.rs @@ -0,0 +1,43 @@ +use axum::{ + extract::{self, State}, + http::StatusCode, + middleware::Next, + response::Response, +}; +use axum_extra::extract::cookie::CookieJar; +use jsonwebtoken::{decode, Algorithm, DecodingKey, Validation}; +use serde::{Deserialize, Serialize}; + +use crate::state::AppState; + +/// Our claims struct, it needs to derive `Serialize` and/or `Deserialize` +#[derive(Debug, Serialize, Deserialize)] +pub struct Claims { + pub sub: String, + pub exp: u64, + pub username: String, +} + +pub struct Authentication { + pub secret: String, +} + +pub async fn authentication(State(state): State>, cookie: CookieJar, request: extract::Request, next: Next) -> Response { + use axum::http::header::AUTHORIZATION; + if let Some(secret) = state.secret { + let Some(jwt) = request + .headers() + .get(AUTHORIZATION) + .and_then(|header| header.to_str().ok()) + .and_then(|header| header.strip_prefix("Bearer ")) + .or(cookie.get("jwt").map(|cookie| cookie.value())) + else { + return Response::builder().status(StatusCode::UNAUTHORIZED).body("expect jwt token".into()).unwrap(); + }; + let Ok(_jwt) = decode::(jwt, &DecodingKey::from_secret(secret.as_ref()), &Validation::new(Algorithm::HS256)) else { + return Response::builder().status(StatusCode::UNAUTHORIZED).body("invalid jwt token".into()).unwrap(); + }; + } + + next.run(request).await +} diff --git a/binary/admin-server/src/service.rs b/binary/admin-server/src/service.rs index 693e0ad6..afcc8ff4 100644 --- a/binary/admin-server/src/service.rs +++ b/binary/admin-server/src/service.rs @@ -3,6 +3,7 @@ use spacegate_config::service::*; use crate::{mw, state::AppState}; +pub mod auth; pub mod config; pub mod instance; pub mod plugin; @@ -14,11 +15,16 @@ where Router::new() .nest( "/config", - config::router::().layer(middleware::from_fn_with_state(state.clone(), mw::version_control::version_control)), + config::router::() + .layer(middleware::from_fn_with_state(state.clone(), mw::authentication::authentication)) + .layer(middleware::from_fn_with_state(state.clone(), mw::version_control::version_control)), ) .nest( "/plugin", - plugin::router::().layer(middleware::from_fn_with_state(state.clone(), mw::version_control::version_control)), + plugin::router::() + .layer(middleware::from_fn_with_state(state.clone(), mw::authentication::authentication)) + .layer(middleware::from_fn_with_state(state.clone(), mw::version_control::version_control)), ) + .nest("/auth", auth::router::()) .with_state(state) } diff --git a/binary/admin-server/src/service/auth.rs b/binary/admin-server/src/service/auth.rs new file mode 100644 index 00000000..36e80229 --- /dev/null +++ b/binary/admin-server/src/service/auth.rs @@ -0,0 +1,51 @@ +use std::time::SystemTime; + +use crate::{ + error::InternalError, + mw::authentication::Claims, + state::{self, AppState}, +}; +use axum::{ + extract::State, + http::{header::SET_COOKIE, HeaderValue}, + routing::post, + Json, Router, +}; +use jsonwebtoken::{encode, EncodingKey, Header}; +use serde::{Deserialize, Serialize}; +#[derive(Debug, Serialize, Deserialize)] +pub struct Login { + pub ak: String, + pub sk: String, +} +const EXPIRE: u64 = 3600; +async fn login(State(AppState { secret, sk_digest, .. }): State>, login: Json) -> Result { + let mut response = axum::response::Response::new(axum::body::Body::empty()); + if let Some(sk_digest) = sk_digest { + let out: [u8; 32] = ::digest(&login.sk).into(); + if &out != sk_digest.as_ref() { + *response.status_mut() = axum::http::StatusCode::UNAUTHORIZED; + return Ok(response); + } + } + if let Some(sec) = secret { + let jwt = encode( + &Header::default(), + &Claims { + sub: "admin".to_string(), + exp: SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() + EXPIRE, + username: login.ak.to_string(), + }, + &EncodingKey::from_secret(sec.as_ref()), + ) + .map_err(InternalError::boxed)?; + response.headers_mut().insert( + SET_COOKIE, + HeaderValue::from_str(&format!("jwt={jwt}; path=/; HttpOnly; Max-Age=3600")).expect("invalid jwt"), + ); + } + Ok(response) +} +pub fn router() -> axum::Router> { + Router::new().route("/login", post(login::)) +} diff --git a/binary/admin-server/src/service/instance.rs b/binary/admin-server/src/service/instance.rs index 84ad3447..ea8532e4 100644 --- a/binary/admin-server/src/service/instance.rs +++ b/binary/admin-server/src/service/instance.rs @@ -6,9 +6,3 @@ pub struct K8sInstance { pub name: Arc, pub namespace: Arc, } - -// impl K8sInstance { -// pub async fn fetch_all() -> Vec { -// config::k8s::fetch_all().await -// } -// } diff --git a/binary/admin-server/src/state.rs b/binary/admin-server/src/state.rs index bc833e2f..df2f2a1a 100644 --- a/binary/admin-server/src/state.rs +++ b/binary/admin-server/src/state.rs @@ -16,10 +16,11 @@ impl ToString for PluginCode { } } -#[derive(Debug)] pub struct AppState { pub backend: Arc, pub version: mw::version_control::Version, + pub secret: Option>, + pub sk_digest: Option>, // pub plugin_schemas: Arc>>, } @@ -28,6 +29,8 @@ impl Clone for AppState { Self { backend: self.backend.clone(), version: self.version.clone(), + secret: self.secret.clone(), + sk_digest: self.sk_digest.clone(), // plugin_schemas: self.plugin_schemas.clone(), } } diff --git a/binary/spacegate/Cargo.toml b/binary/spacegate/Cargo.toml index e0f9466c..21fdeb84 100644 --- a/binary/spacegate/Cargo.toml +++ b/binary/spacegate/Cargo.toml @@ -14,7 +14,7 @@ readme = "../../README.md" [features] -default = ["fs"] +default = ["fs", "dylib"] full = ["k8s", "fs", "redis", "axum"] build-k8s = ["k8s", "redis", "axum"] build-local = ["fs", "redis", "axum"] @@ -25,7 +25,7 @@ redis = ["spacegate-shell/cache"] axum = ["spacegate-shell/ext-axum"] # Used to statically link openssl at compile time static-openssl = ["openssl/vendored"] - +dylib = ["spacegate-shell/plugin-dylib"] [dependencies] # envy = { } clap = { version = "4.5", features = ["derive", "env"] } diff --git a/binary/spacegate/src/args.rs b/binary/spacegate/src/args.rs index 3885b754..5cc15d74 100644 --- a/binary/spacegate/src/args.rs +++ b/binary/spacegate/src/args.rs @@ -86,6 +86,8 @@ impl<'de> Deserialize<'de> for Config { Config::from_str(&s).map_err(serde::de::Error::custom) } } + +/// Spacegate start up arguments #[derive(Debug, Serialize, Deserialize, Clone, Parser)] #[command(version, about, long_about = None)] pub struct Args { @@ -93,11 +95,14 @@ pub struct Args { /// /// # Example /// ## File - /// file:/path/to/dir + /// `-c file:/path/to/dir` /// ## K8s - /// k8s:namespace + /// `-c k8s:namespace` /// ## Redis - /// redis:redis://some-redis-url + /// `-c redis:redis://some-redis-url` #[arg(short, long, env)] pub config: Config, + /// The dynamic lib plugins dir + #[arg(short, long, env)] + pub plugins: Option, } diff --git a/binary/spacegate/src/main.rs b/binary/spacegate/src/main.rs index 928c8507..000d485c 100644 --- a/binary/spacegate/src/main.rs +++ b/binary/spacegate/src/main.rs @@ -2,8 +2,38 @@ use clap::Parser; use spacegate_shell::BoxError; mod args; fn main() -> Result<(), BoxError> { + // todo: more subscriber required tracing_subscriber::fmt().with_env_filter(tracing_subscriber::EnvFilter::from_default_env()).init(); let args = args::Args::parse(); + if let Some(plugins) = args.plugins { + #[cfg(feature = "dylib")] + { + let dir = std::fs::read_dir(plugins)?; + for entry in dir { + let entry = entry?; + let path = entry.path(); + let ext = path.extension(); + let is_dylib = if cfg!(target_os = "windows") { + ext == Some("dll".as_ref()) + } else if cfg!(target_os = "macos") { + ext == Some("dylib".as_ref()) + } else { + ext == Some("so".as_ref()) + }; + if path.is_file() && is_dylib { + println!("loading plugin lib: {:?}", path); + let res = unsafe { spacegate_shell::plugin::PluginRepository::global().register_dylib(&path) }; + if let Err(e) = res { + eprintln!("fail to load plugin: {:?}", e); + } + } + } + } + #[cfg(not(feature = "dylib"))] + { + eprintln!("feature dylib not enabled") + } + } let rt = tokio::runtime::Builder::new_multi_thread().enable_all().thread_name(env!("CARGO_PKG_NAME")).build().expect("fail to build runtime"); rt.block_on(async move { match args.config { diff --git a/crates/config/src/constants.rs b/crates/config/src/constants.rs deleted file mode 100644 index 8a191ede..00000000 --- a/crates/config/src/constants.rs +++ /dev/null @@ -1,16 +0,0 @@ -pub const GATEWAY_CLASS_NAME: &str = "spacegate"; -pub const GATEWAY_ANNOTATION_REDIS_URL: &str = "redis_url"; -pub const GATEWAY_ANNOTATION_LOG_LEVEL: &str = "log_level"; -pub const GATEWAY_ANNOTATION_LANGUAGE: &str = "lang"; -pub const GATEWAY_ANNOTATION_IGNORE_TLS_VERIFICATION: &str = "ignore_tls_verification"; - -pub const DEFAULT_NAMESPACE: &str = "default"; -pub const ANNOTATION_RESOURCE_PRIORITY: &str = "priority"; - -pub const RAW_HTTP_ROUTE_KIND: &str = "raw.http.route.kind"; -pub const RAW_HTTP_ROUTE_KIND_DEFAULT: &str = "HTTPRoute"; -pub const RAW_HTTP_ROUTE_KIND_SPACEROUTE: &str = "HTTPSpaceroute"; - -pub const BANCKEND_KIND_SERVICE: &str = "Service"; -pub const BANCKEND_KIND_EXTERNAL_HTTP: &str = "ExternalHttp"; -pub const BANCKEND_KIND_EXTERNAL_HTTPS: &str = "ExternalHttps"; diff --git a/crates/kernel/src/backend_service.rs b/crates/kernel/src/backend_service.rs index 04f2bd5c..54409240 100644 --- a/crates/kernel/src/backend_service.rs +++ b/crates/kernel/src/backend_service.rs @@ -1,5 +1,4 @@ use std::convert::Infallible; -use std::path::Path; use std::sync::Arc; use futures_util::future::BoxFuture; @@ -19,22 +18,13 @@ pub mod echo; pub mod http_client_service; pub mod static_file_service; pub mod ws_client_service; -pub(crate) const FILE_SCHEMA: &str = "file"; -pub trait CloneHyperService: hyper::service::Service { - fn clone_box(&self) -> Box + Send + Sync>; -} +pub trait SharedHyperService: hyper::service::Service + Send + Sync + 'static {} -impl CloneHyperService for T -where - T: hyper::service::Service + Send + Sync + Clone + 'static, -{ - fn clone_box(&self) -> Box + Send + Sync> { - Box::new(self.clone()) - } -} +impl SharedHyperService for T where T: hyper::service::Service + Send + Sync + 'static {} +/// a service that can be shared between threads pub struct ArcHyperService { - pub boxed: Arc< - dyn CloneHyperService, Response = Response, Error = Infallible, Future = BoxFuture<'static, Result, Infallible>>> + Send + Sync, + pub shared: Arc< + dyn SharedHyperService, Response = Response, Error = Infallible, Future = BoxFuture<'static, Result, Infallible>>> + Send + Sync, >, } @@ -46,18 +36,18 @@ impl std::fmt::Debug for ArcHyperService { impl Clone for ArcHyperService { fn clone(&self) -> Self { - Self { boxed: self.boxed.clone() } + Self { shared: self.shared.clone() } } } impl ArcHyperService { pub fn new(service: T) -> Self where - T: Clone + CloneHyperService, Response = Response, Error = Infallible> + Send + Sync + 'static, + T: SharedHyperService, Response = Response, Error = Infallible> + Send + Sync + 'static, T::Future: Future, Infallible>> + 'static + Send, { let map_fut = MapFuture::new(service, |fut| Box::pin(fut) as _); - Self { boxed: Arc::new(map_fut) } + Self { shared: Arc::new(map_fut) } } } @@ -67,7 +57,7 @@ impl hyper::service::Service> for ArcHyperService { type Future = BoxFuture<'static, Result>; fn call(&self, req: Request) -> Self::Future { - Box::pin(self.boxed.call(req)) + Box::pin(self.shared.call(req)) } } @@ -83,11 +73,8 @@ impl hyper::service::Service> for ArcHyperService { pub async fn http_backend_service_inner(mut req: Request) -> Result { tracing::trace!(elapsed = ?req.extensions().get::().map(crate::extension::EnterTime::elapsed), "start a backend request"); x_forwarded_for(&mut req)?; - if req.uri().scheme_str() == Some(FILE_SCHEMA) { - return Ok(static_file_service::static_file_service(req, Path::new("./")).await); - } let mut client = get_client(); - let mut response = if req.headers().get(UPGRADE).is_some_and(|upgrade| upgrade.as_bytes().eq_ignore_ascii_case(b"websocket")) { + let response = if req.headers().get(UPGRADE).is_some_and(|upgrade| upgrade.as_bytes().eq_ignore_ascii_case(b"websocket")) { // dump request let (part, body) = req.into_parts(); let body = body.dump().await?; @@ -114,15 +101,11 @@ pub async fn http_backend_service_inner(mut req: Request) -> Result>::Ok(()) }); - tracing::trace!(elapsed = ?resp.extensions().get::().map(crate::extension::EnterTime::elapsed), "finish backend websocket forward"); // return response to client resp } else { - let resp = client.request(req).await; - tracing::trace!(elapsed = ?resp.extensions().get::().map(crate::extension::EnterTime::elapsed), "finish backend request"); - resp + client.request(req).await }; - response.extensions_mut().insert(unsafe { crate::extension::FromBackend::new() }); Ok(response) } diff --git a/crates/kernel/src/helper_layers.rs b/crates/kernel/src/helper_layers.rs index 7ccf58aa..fe37afa4 100644 --- a/crates/kernel/src/helper_layers.rs +++ b/crates/kernel/src/helper_layers.rs @@ -1,6 +1,3 @@ -/// Sync request filter. -pub mod filter; - /// Map service's future into another future. pub mod map_future; @@ -21,3 +18,6 @@ pub mod route; /// Timeout layer based on tokio timer pub mod timeout; + +/// Balancer layer +pub mod balancer; diff --git a/crates/kernel/src/helper_layers/async_filter.rs b/crates/kernel/src/helper_layers/async_filter.rs deleted file mode 100644 index fa00bfd3..00000000 --- a/crates/kernel/src/helper_layers/async_filter.rs +++ /dev/null @@ -1,116 +0,0 @@ -pub mod dump; -use std::{convert::Infallible, task::ready}; - -use futures_util::Future; -use hyper::{Request, Response}; -use pin_project_lite::pin_project; -use tower_layer::Layer; - -use crate::SgBody; - -pub trait AsyncFilter: Clone { - type Future: Future, Response>> + Send + 'static; - fn filter(&self, req: Request) -> Self::Future; -} - -#[derive(Debug, Clone)] -pub struct AsyncFilterRequestLayer { - filter: F, -} - -impl AsyncFilterRequestLayer { - pub fn new(filter: F) -> Self { - Self { filter } - } -} - -impl Layer for AsyncFilterRequestLayer -where - F: AsyncFilter, -{ - type Service = AsyncFilterRequest; - - fn layer(&self, inner: S) -> Self::Service { - AsyncFilterRequest { - filter: self.filter.clone(), - inner, - } - } -} - -#[derive(Debug, Clone)] -pub struct AsyncFilterRequest { - filter: F, - inner: S, -} - -pin_project! { - #[project = FilterResponseFutureStateProj] - enum FilterResponseFutureState { - Filter { - #[pin] - fut: F - }, - Inner { - #[pin] - fut: S - }, - } -} - -pin_project! { - pub struct FilterResponseFuture - where S: hyper::service::Service>, F: AsyncFilter - { - #[pin] - state: FilterResponseFutureState, - inner_service: S - } -} - -impl Future for FilterResponseFuture -where - F: AsyncFilter, - S: hyper::service::Service, Response = Response, Error = Infallible>, -{ - type Output = Result, Infallible>; - - fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll { - let mut this = self.project(); - loop { - match this.state.as_mut().project() { - FilterResponseFutureStateProj::Filter { fut } => match ready!(fut.poll(cx)) { - Ok(req) => this.state.set(FilterResponseFutureState::Inner { - fut: this.inner_service.call(req), - }), - Err(resp) => return std::task::Poll::Ready(Ok(resp)), - }, - FilterResponseFutureStateProj::Inner { fut } => { - let resp = ready!(fut.poll(cx)).expect("infallible"); - return std::task::Poll::Ready(Ok(resp)); - } - } - } - } -} - -impl hyper::service::Service> for AsyncFilterRequest -where - F: AsyncFilter, - S: Clone + hyper::service::Service, Error = Infallible, Response = Response>, -{ - type Response = Response; - type Error = Infallible; - type Future = FilterResponseFuture; - - fn call(&self, req: Request) -> Self::Future { - let inner = self.inner.clone(); - let filter = self.filter.clone(); - // filter the request - - FilterResponseFuture { - state: FilterResponseFutureState::Filter { fut: filter.filter(req) }, - inner_service: inner, - } - } -} diff --git a/crates/kernel/src/helper_layers/async_filter/dump.rs b/crates/kernel/src/helper_layers/async_filter/dump.rs deleted file mode 100644 index 996556b1..00000000 --- a/crates/kernel/src/helper_layers/async_filter/dump.rs +++ /dev/null @@ -1,22 +0,0 @@ -use std::pin::Pin; - -use futures_util::Future; -use hyper::{Request, Response}; - -use crate::{SgBody, SgResponseExt}; - -use super::AsyncFilter; - -#[derive(Debug, Clone, Copy)] -pub struct Dump; - -impl AsyncFilter for Dump { - type Future = Pin, Response>> + Send + 'static>>; - fn filter(&self, req: Request) -> Self::Future { - let (part, body) = req.into_parts(); - Box::pin(async move { - let body = body.dump().await.map_err(|e| Response::::bad_gateway(e.as_ref()))?; - Ok(Request::from_parts(part, body)) - }) - } -} diff --git a/crates/kernel/src/helper_layers/balancer.rs b/crates/kernel/src/helper_layers/balancer.rs new file mode 100644 index 00000000..e67b5212 --- /dev/null +++ b/crates/kernel/src/helper_layers/balancer.rs @@ -0,0 +1,37 @@ +pub mod ip_hash; +pub use ip_hash::IpHash; +pub mod random; +pub use random::Random; + +pub struct Balancer { + pub policy: P, + pub instances: Vec, + pub fallback: S, +} + +impl Balancer { + pub fn new(policy: P, instances: Vec, fallback: S) -> Self { + Self { policy, instances, fallback } + } +} + +pub trait BalancePolicy { + fn pick<'s>(&self, instances: &'s [S], req: &R) -> Option<&'s S>; +} + +impl hyper::service::Service for Balancer +where + P: BalancePolicy, + S: hyper::service::Service, + S::Future: std::marker::Send, +{ + type Response = S::Response; + + type Error = S::Error; + + type Future = S::Future; + + fn call(&self, req: R) -> Self::Future { + self.policy.pick(&self.instances, &req).unwrap_or(&self.fallback).call(req) + } +} diff --git a/crates/kernel/src/helper_layers/balancer/ip_hash.rs b/crates/kernel/src/helper_layers/balancer/ip_hash.rs new file mode 100644 index 00000000..d6decfd0 --- /dev/null +++ b/crates/kernel/src/helper_layers/balancer/ip_hash.rs @@ -0,0 +1,39 @@ +use crate::{extension::PeerAddr, SgRequest}; + +use super::BalancePolicy; +use std::{ + hash::{DefaultHasher, Hash, Hasher}, + marker::PhantomData, +}; + +/// A policy that selects an instance based on the hash of the IP address. +#[derive(Debug, Clone)] +pub struct IpHash { + hasher: PhantomData H>, +} + +impl Default for IpHash { + fn default() -> Self { + Self { hasher: PhantomData } + } +} + +impl BalancePolicy for IpHash +where + H: Hasher + Default, +{ + fn pick<'s>(&self, instances: &'s [S], req: &SgRequest) -> Option<&'s S> { + if instances.is_empty() { + None + } else if instances.len() == 1 { + instances.first() + } else { + let mut hasher = H::default(); + let ip = req.extensions().get::()?.0.ip(); + ip.to_canonical().hash(&mut hasher); + let hash = hasher.finish(); + let index = (hash % instances.len() as u64) as usize; + instances.get(index) + } + } +} diff --git a/crates/kernel/src/helper_layers/balancer/random.rs b/crates/kernel/src/helper_layers/balancer/random.rs new file mode 100644 index 00000000..3f8f6dfd --- /dev/null +++ b/crates/kernel/src/helper_layers/balancer/random.rs @@ -0,0 +1,38 @@ +use rand::distributions::Distribution; + +use super::BalancePolicy; + +/// A policy that selects an instance randomly. +pub struct Random +where + I: rand::distributions::uniform::SampleUniform + std::cmp::PartialOrd, +{ + picker: rand::distributions::WeightedIndex, +} + +impl Random +where + I: rand::distributions::uniform::SampleUniform + std::cmp::PartialOrd + Clone + Default + for<'a> std::ops::AddAssign<&'a I>, +{ + pub fn new(weights: impl IntoIterator) -> Self { + Self { + picker: rand::distributions::WeightedIndex::new(weights).expect("invalid weights"), + } + } +} + +impl BalancePolicy for Random +where + I: rand::distributions::uniform::SampleUniform + std::cmp::PartialOrd, +{ + fn pick<'s>(&self, instances: &'s [S], _req: &R) -> Option<&'s S> { + if instances.is_empty() { + None + } else if instances.len() == 1 { + instances.first() + } else { + let index = self.picker.sample(&mut rand::thread_rng()); + instances.get(index) + } + } +} diff --git a/crates/kernel/src/helper_layers/bidirection_filter.rs b/crates/kernel/src/helper_layers/bidirection_filter.rs deleted file mode 100644 index 31ebf5ab..00000000 --- a/crates/kernel/src/helper_layers/bidirection_filter.rs +++ /dev/null @@ -1,155 +0,0 @@ -use crate::{service::ArcHyperService, SgBody}; -use futures_util::ready; -use hyper::{Request, Response}; -use pin_project_lite::pin_project; -use std::{ - convert::Infallible, - future::Future, - pin::Pin, - sync::Arc, - task::{Context, Poll}, -}; -use tower_layer::Layer; - -/// Bi-Direction Filter -pub trait Bdf: Send + Sync { - type FutureReq: Future, Response>> + Send; - type FutureResp: Future> + Send; - - fn on_req(self: Arc, req: Request) -> Self::FutureReq; - fn on_resp(self: Arc, resp: Response) -> Self::FutureResp; -} - -/// Bi-Direction Filter Layer -#[derive(Debug, Clone)] -pub struct BdfLayer { - filter: Arc, -} - -impl BdfLayer { - pub fn new(filter: F) -> Self { - Self { filter: Arc::new(filter) } - } -} - -pin_project! { - #[derive(Debug, Clone)] - pub struct BdfService { - #[pin] - filter: Arc, - service: S, - } -} - -impl Layer for BdfLayer -where - F: Clone, -{ - type Service = BdfService; - fn layer(&self, service: ArcHyperService) -> Self::Service { - Self::Service { - filter: self.filter.clone(), - service, - } - } -} - -impl hyper::service::Service> for BdfService -where - Self: Clone, - S: hyper::service::Service, Error = Infallible, Response = Response>, - F: Bdf, -{ - type Response = Response; - type Error = Infallible; - type Future = FilterFuture; - - fn call(&self, request: Request) -> Self::Future { - let cloned = self.clone(); - FilterFuture { - request: Some(request), - state: FilterFutureState::Start, - filter: cloned, - } - } -} - -pin_project! { - pub struct FilterFuture - where - S: hyper::service::Service, Error = Infallible, Response = Response>, - F: Bdf, - { - request: Option>, - #[pin] - state: FilterFutureState, - #[pin] - filter: BdfService, - } -} - -pin_project! { - #[project = FilterFutureStateProj] - pub enum FilterFutureState { - Start, - Request { - #[pin] - fut: FReq, - }, - InnerCall { - #[pin] - fut: S, - }, - Response { - #[pin] - fut: FResp, - }, - } -} - -impl Future for FilterFuture -where - S: hyper::service::Service, Error = Infallible, Response = Response>, - F: Bdf, -{ - type Output = Result, Infallible>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.project(); - loop { - match this.state.as_mut().project() { - FilterFutureStateProj::Start => { - tracing::trace!("enter bdf {}", std::any::type_name::()); - let fut = this.filter.filter.clone().on_req(this.request.take().expect("missing request at start state")); - this.state.set(FilterFutureState::Request { fut }); - } - FilterFutureStateProj::Request { fut } => { - let request_result = ready!(fut.poll(cx)); - match request_result { - Ok(req) => { - let fut = this.filter.as_mut().project().service.call(req); - tracing::trace!("leave bdf {}", std::any::type_name::()); - this.state.set(FilterFutureState::InnerCall { fut }); - } - Err(resp) => { - return Poll::Ready(Ok(resp)); - } - } - } - FilterFutureStateProj::InnerCall { fut } => { - let request_result = ready!(fut.poll(cx))?; - let fut = this.filter.filter.clone().on_resp(request_result); - this.state.set(FilterFutureState::Response { fut }); - } - FilterFutureStateProj::Response { fut } => { - let request_result = ready!(fut.poll(cx)); - tracing::trace!("leave bdf {}", std::any::type_name::()); - return Poll::Ready(Ok(request_result)); - } - } - } - } -} - -pub type BoxReqFut = Pin, Response>> + Send>>; -pub type BoxRespFut = Pin> + Send>>; diff --git a/crates/kernel/src/helper_layers/filter.rs b/crates/kernel/src/helper_layers/filter.rs deleted file mode 100644 index b888350d..00000000 --- a/crates/kernel/src/helper_layers/filter.rs +++ /dev/null @@ -1,62 +0,0 @@ -pub mod response_anyway; - -use std::{convert::Infallible, future::Ready}; - -use crate::SgBody; -use hyper::{Request, Response}; -use tower_layer::Layer; - -pub trait Filter: Clone { - /// # Errors - /// an error will be responded immediately - fn filter(&self, req: Request) -> Result, Response>; -} - -#[derive(Debug, Clone)] - -pub struct FilterRequestLayer { - filter: F, -} - -impl FilterRequestLayer { - pub fn new(filter: F) -> Self { - Self { filter } - } -} - -impl Layer for FilterRequestLayer -where - F: Filter, -{ - type Service = FilterRequest; - - fn layer(&self, inner: S) -> Self::Service { - FilterRequest { - filter: self.filter.clone(), - inner, - } - } -} - -#[derive(Debug, Clone)] -pub struct FilterRequest { - filter: F, - inner: S, -} - -impl hyper::service::Service> for FilterRequest -where - F: Filter, - S: hyper::service::Service, Error = Infallible, Response = Response>, -{ - type Response = Response; - type Error = Infallible; - type Future = futures_util::future::Either>, S::Future>; - - fn call(&self, req: Request) -> Self::Future { - match self.filter.filter(req) { - Ok(req) => futures_util::future::Either::Right(self.inner.call(req)), - Err(resp) => futures_util::future::Either::Left(std::future::ready(Ok(resp))), - } - } -} diff --git a/crates/kernel/src/helper_layers/filter/response_anyway.rs b/crates/kernel/src/helper_layers/filter/response_anyway.rs deleted file mode 100644 index d095bcb1..00000000 --- a/crates/kernel/src/helper_layers/filter/response_anyway.rs +++ /dev/null @@ -1,17 +0,0 @@ -use hyper::{header::CONTENT_TYPE, Request, Response}; - -use crate::SgBody; - -use super::Filter; - -#[derive(Debug, Clone)] -pub struct ResponseAnyway { - pub status: hyper::StatusCode, - pub message: hyper::body::Bytes, -} - -impl Filter for ResponseAnyway { - fn filter(&self, _req: Request) -> Result, Response> { - Err(Response::builder().status(self.status).header(CONTENT_TYPE, "text/html; charset=utf-8").body(SgBody::full(self.message.clone())).expect("invalid response builder")) - } -} diff --git a/crates/kernel/src/helper_layers/response_error.rs b/crates/kernel/src/helper_layers/response_error.rs deleted file mode 100644 index 5be57243..00000000 --- a/crates/kernel/src/helper_layers/response_error.rs +++ /dev/null @@ -1,118 +0,0 @@ -use std::{convert::Infallible, future::Future, marker, sync::Arc, task::ready}; - -use hyper::{Request, Response}; -use pin_project_lite::pin_project; -use tower_layer::Layer; - -use crate::SgBody; -#[derive(Debug, Clone)] -pub struct ResponseErrorLayer { - formatter: Arc, -} - -impl ResponseErrorLayer { - pub fn new(formatter: F) -> Self { - Self { formatter: Arc::new(formatter) } - } -} - -impl Default for ResponseErrorLayer { - fn default() -> Self { - Self { - formatter: Arc::new(DefaultErrorFormatter), - } - } -} - -#[derive(Debug, Clone)] -pub struct DefaultErrorFormatter; - -impl ErrorFormatter for DefaultErrorFormatter { - fn format(&self, err: impl std::error::Error) -> String { - err.to_string() - } -} - -pub trait ErrorFormatter { - fn format(&self, err: impl std::error::Error) -> String; -} - -impl Layer for ResponseErrorLayer -where - F: ErrorFormatter, -{ - type Service = ResponseError; - - fn layer(&self, inner: S) -> Self::Service { - ResponseError { - inner, - formatter: self.formatter.clone(), - } - } -} - -#[derive(Debug, Clone)] -pub struct ResponseError { - formatter: Arc, - inner: S, -} - -pin_project! { - pub struct ResponseErrorFuture { - formatter: Arc, - error: marker::PhantomData, - #[pin] - inner: F, - } -} - -impl ResponseErrorFuture { - pub fn new(formatter: impl Into>, fut: F) -> Self { - ResponseErrorFuture { - formatter: formatter.into(), - error: marker::PhantomData, - inner: fut, - } - } -} - -impl Future for ResponseErrorFuture -where - F: Future, E>>, - E: std::error::Error, - FMT: ErrorFormatter, -{ - type Output = Result, Infallible>; - - fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll { - use crate::SgResponseExt; - let this = self.project(); - let inner_call_result = ready!(this.inner.poll(cx)); - match inner_call_result { - Ok(resp) => std::task::Poll::Ready(Ok(resp)), - Err(e) => std::task::Poll::Ready(Ok(Response::from_error(e, this.formatter.as_ref()))), - } - } -} - -impl hyper::service::Service> for ResponseError -where - S: hyper::service::Service, Response = Response> + Send + Sync + 'static, - S::Error: std::error::Error, - FMT: ErrorFormatter, -{ - type Response = Response; - - type Error = Infallible; - - type Future = ResponseErrorFuture; - - fn call(&self, req: Request) -> Self::Future { - let fut = self.inner.call(req); - ResponseErrorFuture { - error: marker::PhantomData, - inner: fut, - formatter: self.formatter.clone(), - } - } -} diff --git a/crates/kernel/src/helper_layers/route.rs b/crates/kernel/src/helper_layers/route.rs index 713bd124..ed3e655f 100644 --- a/crates/kernel/src/helper_layers/route.rs +++ b/crates/kernel/src/helper_layers/route.rs @@ -3,6 +3,7 @@ use std::{convert::Infallible, ops::Index}; use futures_util::future::BoxFuture; pub use hyper::http::request::Parts; use hyper::{Request, Response}; +use tracing::{instrument, Instrument}; use crate::{extension::Matched, SgBody}; @@ -44,26 +45,18 @@ where type Error = Infallible; type Response = Response; type Future = BoxFuture<'static, Result>; + #[instrument("router", skip_all, fields(http.uri =? req.uri(), http.method =? req.method()))] fn call(&self, mut req: Request) -> Self::Future { - tracing::trace!("enter route {req:?}"); let fut: Self::Future = if let Some(index) = self.router.route(&mut req) { req.extensions_mut().insert(Matched { index: index.clone(), router: self.router.clone(), }); let fut = self.services.index(index).call(req); - Box::pin(async move { - let result = fut.await; - tracing::trace!("leave route"); - result - }) + Box::pin(fut.in_current_span()) } else { let fut = self.fallback.call(req); - Box::pin(async move { - let result = fut.await; - tracing::trace!("leave route"); - result - }) + Box::pin(fut.in_current_span()) }; fut } diff --git a/crates/kernel/src/helper_layers/stat.rs b/crates/kernel/src/helper_layers/stat.rs deleted file mode 100644 index 413d35dc..00000000 --- a/crates/kernel/src/helper_layers/stat.rs +++ /dev/null @@ -1,90 +0,0 @@ -use futures_util::Future; -use hyper::{Request, Response}; -use std::{convert::Infallible, sync::Arc, task::ready}; - -use crate::SgBody; - -#[derive(Debug, Clone)] -pub struct StatLayer

{ - policy: Arc

, -} - -impl

StatLayer

{ - pub fn new(policy: impl Into>) -> Self { - Self { policy: policy.into() } - } -} - -pub trait Policy { - fn on_request(&self, req: &Request); - fn on_response(&self, resp: &Response); -} - -#[derive(Debug, Clone)] -pub struct Stat { - policy: Arc

, - inner: S, -} - -impl Stat { - pub fn new(policy: impl Into>, inner: S) -> Self { - Self { policy: policy.into(), inner } - } -} - -impl tower_layer::Layer for StatLayer

-where - P: Policy + Clone, -{ - type Service = Stat; - - fn layer(&self, inner: S) -> Self::Service { - Stat::new(self.policy.clone(), inner) - } -} - -impl hyper::service::Service> for Stat -where - P: Policy, - S: hyper::service::Service, Response = Response, Error = Infallible>, -{ - type Response = Response; - type Error = Infallible; - type Future = ResponseFuture; - - fn call(&self, req: Request) -> Self::Future { - self.policy.on_request(&req); - let fut = self.inner.call(req); - ResponseFuture::new(fut, self.policy.clone()) - } -} - -pin_project_lite::pin_project! { - pub struct ResponseFuture { - #[pin] - inner: F, - policy: Arc

, - } - -} - -impl ResponseFuture { - pub fn new(inner: F, policy: Arc

) -> Self { - Self { inner, policy } - } -} - -impl Future for ResponseFuture -where - F: Future, Infallible>>, - P: Policy, -{ - type Output = Result, Infallible>; - - fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll { - let this = self.project(); - let response = ready!(this.inner.poll(cx)).expect("infallible"); - this.policy.on_response(&response); - std::task::Poll::Ready(Ok(response)) - } -} diff --git a/crates/kernel/src/lib.rs b/crates/kernel/src/lib.rs index fef952fd..53593974 100644 --- a/crates/kernel/src/lib.rs +++ b/crates/kernel/src/lib.rs @@ -1,13 +1,24 @@ +//! # Spacegate kernel crate. +//! +//! This crate provides the core functionality of spacegate. + #![deny(clippy::unwrap_used, clippy::dbg_macro, clippy::unimplemented, clippy::todo)] #![warn(clippy::missing_errors_doc, clippy::indexing_slicing)] -// pub mod config; +/// https services, ws services, and static file services. pub mod backend_service; +/// a boxed body pub mod body; +/// extensions for request and response pub mod extension; +/// extractors for request pub mod extractor; +/// helper layers pub mod helper_layers; +/// tcp listener pub mod listener; +/// gateway service pub mod service; +/// util functions and structs pub mod utils; pub use backend_service::ArcHyperService; @@ -22,11 +33,15 @@ use hyper::{body::Bytes, Request, Response, StatusCode}; use tower_layer::layer_fn; pub type BoxResult = Result; +/// A boxed error. pub type BoxError = Box; +/// Alias for a request with a boxed body. pub type SgRequest = Request; +/// Alias for a response with a boxed body. pub type SgResponse = Response; +/// Provides extension methods for [`Request`](hyper::Request). pub trait SgRequestExt { fn with_reflect(&mut self); fn reflect_mut(&mut self) -> &mut Reflect; @@ -72,6 +87,7 @@ impl SgRequestExt for SgRequest { } } +/// Provides extension methods for [`Response`](hyper::Response). pub trait SgResponseExt { fn with_code_message(code: StatusCode, message: impl Into) -> Self; fn bad_gateway(e: E) -> Self @@ -94,14 +110,13 @@ impl SgResponseExt for Response { } } -pub type ReqOrResp = Result, Response>; - +/// A boxed [`Layer`] that can be used as a plugin layer in gateway. pub struct BoxLayer { boxed: Box + Send + Sync + 'static>, } impl BoxLayer { - /// Create a new [`SgBoxLayer`]. + /// Create a new [`BoxLayer`]. pub fn new(inner_layer: L) -> Self where L: Layer + Send + Sync + 'static, @@ -115,15 +130,17 @@ impl BoxLayer { Self { boxed: Box::new(layer) } } + + /// Create a new [`BoxLayer`] with an arc wrapped layer. #[must_use] - pub fn layer_boxed(&self, inner: ArcHyperService) -> ArcHyperService { + pub fn layer_shared(&self, inner: ArcHyperService) -> ArcHyperService { self.boxed.layer(inner) } } impl Layer for BoxLayer where - S: Clone + hyper::service::Service, Response = Response, Error = Infallible> + Send + Sync + 'static, + S: hyper::service::Service, Response = Response, Error = Infallible> + Send + Sync + 'static, >>::Future: std::marker::Send, { type Service = ArcHyperService; diff --git a/crates/kernel/src/listener.rs b/crates/kernel/src/listener.rs index cc891f99..fc83e4da 100644 --- a/crates/kernel/src/listener.rs +++ b/crates/kernel/src/listener.rs @@ -58,7 +58,7 @@ impl SgListen { } #[derive(Clone)] -pub struct HyperServiceAdapter +struct HyperServiceAdapter where S: hyper::service::Service, Error = Infallible, Response = Response> + Clone + Send + 'static, S::Future: Send + 'static, diff --git a/crates/kernel/src/service/gateway.rs b/crates/kernel/src/service/gateway.rs index 8a189ae8..94d09939 100644 --- a/crates/kernel/src/service/gateway.rs +++ b/crates/kernel/src/service/gateway.rs @@ -1,3 +1,8 @@ +//! +//! +//! +//! + pub mod builder; use std::{collections::HashMap, ops::Index, sync::Arc}; @@ -83,7 +88,7 @@ impl Index<(usize, usize)> for HttpRoutedService { } impl Router for GatewayRouter { type Index = (usize, usize); - #[instrument(skip_all, fields(uri = req.uri().to_string(), method = req.method().as_str(), host = ?req.headers().get(HOST) ))] + #[instrument(skip_all, fields(http.host =? req.headers().get(HOST) ))] fn route(&self, req: &mut Request) -> Option { let host = req.uri().host().or(req.headers().get(HOST).and_then(|x| x.to_str().ok()))?; let indices = self.hostname_tree.get(host)?; diff --git a/crates/kernel/src/service/http_route.rs b/crates/kernel/src/service/http_route.rs index 1e790e8c..c7d3bfae 100644 --- a/crates/kernel/src/service/http_route.rs +++ b/crates/kernel/src/service/http_route.rs @@ -4,9 +4,9 @@ pub mod match_request; use std::{convert::Infallible, path::PathBuf, sync::Arc, time::Duration}; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); use crate::{ - backend_service::{http_backend_service, static_file_service::static_file_service, ArcHyperService}, + backend_service::{get_http_backend_service, http_backend_service, static_file_service::static_file_service, ArcHyperService}, extension::{BackendHost, Reflect}, - helper_layers::random_pick, + helper_layers::balancer::{self, Balancer}, utils::{fold_box_layers::fold_layers, schema_port::port_to_schema}, BoxLayer, SgBody, }; @@ -15,6 +15,7 @@ use futures_util::future::BoxFuture; use hyper::{Request, Response}; use tower_layer::Layer; +use tracing::{instrument, Instrument}; use self::{ builder::{HttpBackendBuilder, HttpRouteBuilder, HttpRouteRuleBuilder}, @@ -61,55 +62,52 @@ pub struct HttpRouteRule { pub plugins: Vec, timeouts: Option, backends: Vec, + balance_policy: BalancePolicyEnum, pub ext: hyper::http::Extensions, } +#[derive(Debug, Default)] +pub enum BalancePolicyEnum { + Random, + #[default] + IpHash, +} + impl HttpRouteRule { pub fn builder() -> HttpRouteRuleBuilder { HttpRouteRuleBuilder::new() } pub fn as_service(&self) -> HttpRouteRuleService { use crate::helper_layers::timeout::TimeoutLayer; - let empty = self.backends.is_empty(); let filter_layer = self.plugins.iter(); let time_out = self.timeouts.unwrap_or(DEFAULT_TIMEOUT); - let service = if empty { - fold_layers(filter_layer, ArcHyperService::new(TimeoutLayer::new(time_out).layer(HttpBackendService::http_default()))) - } else { - let service_iter = self.backends.iter().map(|l| (l.weight, l.as_service())); - let random_picker = random_pick::RandomPick::new(service_iter); - fold_layers(filter_layer, ArcHyperService::new(TimeoutLayer::new(time_out).layer(random_picker))) + let fallback = get_http_backend_service(); + let service_iter = self.backends.iter().map(HttpBackend::as_service).collect::>(); + let balanced = match self.balance_policy { + BalancePolicyEnum::Random => { + let weights = self.backends.iter().map(|x| x.weight); + ArcHyperService::new(Balancer::new(balancer::Random::new(weights), service_iter, fallback)) + } + BalancePolicyEnum::IpHash => ArcHyperService::new(Balancer::new(balancer::IpHash::default(), service_iter, fallback)), }; - - let r#match = self.r#match.clone().map(|v| v.into_iter().map(Arc::new).collect::>()); - HttpRouteRuleService { - r#match, - service, - ext: self.ext.clone(), - } + let service = fold_layers(filter_layer, ArcHyperService::new(TimeoutLayer::new(time_out).layer(balanced))); + HttpRouteRuleService { service } } } #[derive(Clone)] pub struct HttpRouteRuleService { - pub r#match: Option]>>, pub service: ArcHyperService, - pub ext: hyper::http::Extensions, } impl hyper::service::Service> for HttpRouteRuleService { type Response = Response; type Error = Infallible; type Future = >>::Future; - + #[instrument("route_rule", skip_all)] fn call(&self, req: Request) -> Self::Future { - tracing::trace!(elapsed = ?req.extensions().get::().map(crate::extension::EnterTime::elapsed), "enter route rule"); let fut = self.service.call(req); - Box::pin(async move { - let result = fut.await; - tracing::trace!("finished route rule"); - result - }) + Box::pin(fut.in_current_span()) } } @@ -118,7 +116,6 @@ impl hyper::service::Service> for HttpRouteRuleService { Backend *****************************************************************************************/ - #[derive(Debug)] pub struct HttpBackend { pub plugins: Vec, @@ -134,10 +131,7 @@ impl HttpBackend { } pub fn as_service(&self) -> ArcHyperService { let inner_service = HttpBackendService { - weight: self.weight, backend: self.backend.clone().into(), - timeout: self.timeout, - ext: self.ext.clone(), }; let timeout_layer = crate::helper_layers::timeout::TimeoutLayer::new(self.timeout.unwrap_or(DEFAULT_TIMEOUT)); let filtered = fold_layers(self.plugins.iter(), ArcHyperService::new(timeout_layer.layer(inner_service))); @@ -154,9 +148,6 @@ pub enum Backend { #[derive(Clone)] pub struct HttpBackendService { pub backend: Arc, - pub weight: u16, - pub timeout: Option, - pub ext: hyper::http::Extensions, } impl HttpBackendService { @@ -167,9 +158,6 @@ impl HttpBackendService { port: None, schema: None, }), - weight: 1, - timeout: None, - ext: hyper::http::Extensions::new(), } } } @@ -179,14 +167,16 @@ impl hyper::service::Service> for HttpBackendService { type Error = Infallible; type Future = BoxFuture<'static, Result, Infallible>>; - fn call(&self, req: Request) -> Self::Future { - let map_request = match self.backend.as_ref() { + #[instrument("backend", skip_all)] + fn call(&self, mut req: Request) -> Self::Future { + let req = match self.backend.as_ref() { Backend::Http { host: None, port: None, schema: None, - } => None, - Backend::Http { host, port, schema } => Some(move |mut req: Request| { + } + | Backend::File { .. } => req, + Backend::Http { host, port, schema } => { if let Some(ref host) = host { if let Some(reflect) = req.extensions_mut().get_mut::() { reflect.insert(BackendHost::new(host.clone())); @@ -217,17 +207,23 @@ impl hyper::service::Service> for HttpBackendService { } } req - }), - Backend::File { .. } => None, + } }; - let req = if let Some(map_request) = map_request { map_request(req) } else { req }; let backend = self.backend.clone(); tracing::trace!(elapsed = ?req.extensions().get::().map(crate::extension::EnterTime::elapsed), "enter backend {backend:?}"); - Box::pin(async move { - match backend.as_ref() { - Backend::Http { .. } => http_backend_service(req).await, - Backend::File { path } => Ok(static_file_service(req, path).await), + Box::pin( + async move { + unsafe { + let mut response = match backend.as_ref() { + Backend::Http { .. } => http_backend_service(req).await.unwrap_unchecked(), + Backend::File { path } => static_file_service(req, path).await, + }; + response.extensions_mut().insert(crate::extension::FromBackend::new()); + tracing::trace!(elapsed = ?response.extensions().get::().map(crate::extension::EnterTime::elapsed), "finish backend request"); + Ok(response) + } } - }) + .in_current_span(), + ) } } diff --git a/crates/kernel/src/service/http_route/builder.rs b/crates/kernel/src/service/http_route/builder.rs index 51879211..78753098 100644 --- a/crates/kernel/src/service/http_route/builder.rs +++ b/crates/kernel/src/service/http_route/builder.rs @@ -2,7 +2,7 @@ use std::{fmt::Debug, path::PathBuf, time::Duration}; use crate::BoxLayer; -use super::{match_request::HttpRouteMatch, Backend, HttpBackend, HttpRoute, HttpRouteRule}; +use super::{match_request::HttpRouteMatch, Backend, BalancePolicyEnum, HttpBackend, HttpRoute, HttpRouteRule}; #[derive(Debug)] pub struct HttpRouteBuilder { @@ -85,6 +85,7 @@ pub struct HttpRouteRuleBuilder { timeouts: Option, backends: Vec, pub extensions: hyper::http::Extensions, + pub balance_policy: BalancePolicyEnum, } impl Default for HttpRouteRuleBuilder { fn default() -> Self { @@ -100,6 +101,7 @@ impl HttpRouteRuleBuilder { timeouts: None, backends: Vec::new(), extensions: Default::default(), + balance_policy: BalancePolicyEnum::default(), } } pub fn match_item(mut self, item: impl Into) -> Self { @@ -137,6 +139,10 @@ impl HttpRouteRuleBuilder { self.backends.extend(backend); self } + pub fn balance_policy(mut self, policy: BalancePolicyEnum) -> Self { + self.balance_policy = policy; + self + } pub fn build(self) -> HttpRouteRule { HttpRouteRule { r#match: self.r#match, @@ -144,6 +150,7 @@ impl HttpRouteRuleBuilder { timeouts: self.timeouts, backends: self.backends, ext: self.extensions, + balance_policy: self.balance_policy, } } pub fn ext(mut self, extension: hyper::http::Extensions) -> Self { @@ -214,21 +221,18 @@ impl HttpBackendBuilder { pub fn host(mut self, host: impl Into) -> Self { self.backend = HttpBackendKindBuilder { host: Some(host.into()), - ..Default::default() + ..self.backend }; self } pub fn port(mut self, port: u16) -> Self { - self.backend = HttpBackendKindBuilder { - port: Some(port), - ..Default::default() - }; + self.backend = HttpBackendKindBuilder { port: Some(port), ..self.backend }; self } pub fn schema(mut self, schema: impl Into) -> Self { self.backend = HttpBackendKindBuilder { schema: Some(schema.into()), - ..Default::default() + ..self.backend }; self } diff --git a/crates/kernel/src/utils.rs b/crates/kernel/src/utils.rs index 2ede4524..ee9c6b8a 100644 --- a/crates/kernel/src/utils.rs +++ b/crates/kernel/src/utils.rs @@ -2,6 +2,7 @@ pub mod fold_box_layers; mod never; pub mod query_kv; pub use never::never; +pub use query_kv::QueryKvIter; pub mod schema_port; mod x_forwarded_for; pub use x_forwarded_for::x_forwarded_for; diff --git a/crates/kernel/src/utils/fold_box_layers.rs b/crates/kernel/src/utils/fold_box_layers.rs index 4e244be7..8e32d1fd 100644 --- a/crates/kernel/src/utils/fold_box_layers.rs +++ b/crates/kernel/src/utils/fold_box_layers.rs @@ -1,8 +1,10 @@ use crate::{ArcHyperService, BoxLayer}; +/// Fold layers into a single service, +/// the order of the layers is reversed. pub fn fold_layers<'a>(layers: impl Iterator + std::iter::DoubleEndedIterator, mut inner: ArcHyperService) -> ArcHyperService { for l in layers.rev() { - inner = l.layer_boxed(inner); + inner = l.layer_shared(inner); } inner } diff --git a/crates/kernel/src/utils/query_kv.rs b/crates/kernel/src/utils/query_kv.rs index 2a53b1f4..bcd1b26f 100644 --- a/crates/kernel/src/utils/query_kv.rs +++ b/crates/kernel/src/utils/query_kv.rs @@ -1,3 +1,16 @@ +/// A zero-copy query key-value iterator. +/// +/// # Example +/// ```rust +/// # use spacegate_kernel::utils::QueryKvIter; +/// # fn main() { +/// let query = "a=1&b=2&c"; +/// let mut iter = QueryKvIter::new(query); +/// assert_eq!(iter.next(), Some(("a", Some("1")))); +/// assert_eq!(iter.next(), Some(("b", Some("2")))); +/// assert_eq!(iter.next(), Some(("c", None))); +/// # } +/// ``` pub struct QueryKvIter<'a> { inner: &'a str, } diff --git a/crates/kernel/src/utils/with_length_or_chunked.rs b/crates/kernel/src/utils/with_length_or_chunked.rs index 319cdff5..439b80ca 100644 --- a/crates/kernel/src/utils/with_length_or_chunked.rs +++ b/crates/kernel/src/utils/with_length_or_chunked.rs @@ -1,5 +1,7 @@ use crate::SgBody; use hyper::{header::HeaderValue, Response}; + +/// Set the content length header or transfer encoding to chunked. pub fn with_length_or_chunked(resp: &mut Response) { const CHUNKED: &str = "chunked"; resp.headers_mut().remove(hyper::header::CONTENT_LENGTH); diff --git a/crates/kernel/src/utils/x_request_id.rs b/crates/kernel/src/utils/x_request_id.rs index e8ebf638..ba8d142e 100644 --- a/crates/kernel/src/utils/x_request_id.rs +++ b/crates/kernel/src/utils/x_request_id.rs @@ -7,9 +7,11 @@ use hyper::{http::HeaderValue, Request, Response}; use crate::{helper_layers::function::Inner, SgBody}; +/// Generate a `x-request-id` header for the request and response. pub trait XRequestIdAlgo { fn generate() -> HeaderValue; } +/// The header name for `x-request-id`. pub const X_REQUEST_ID_HEADER_NAME: &str = "x-request-id"; /// Add a `x-request-id` header to the request and then response. /// diff --git a/crates/kernel/tests/test_h2.rs b/crates/kernel/tests/test_h2.rs index 2f0247e7..fb0deeca 100644 --- a/crates/kernel/tests/test_h2.rs +++ b/crates/kernel/tests/test_h2.rs @@ -20,7 +20,28 @@ use tokio_rustls::rustls::ServerConfig; use tokio_util::sync::CancellationToken; use tower_layer::Layer; #[tokio::test] -async fn test_h2() { +async fn test_h2_over_tls() { + std::env::set_var("RUST_LOG", "TRACE,h2=off,tokio_util=off,spacegate_kernel=TRACE"); + tracing_subscriber::fmt().with_env_filter(tracing_subscriber::EnvFilter::from_default_env()).init(); + tokio::spawn(gateway()); + tokio::spawn(axum_server()); + // wait for startup + tokio::time::sleep(Duration::from_millis(200)).await; + let client = reqwest::Client::builder().danger_accept_invalid_certs(true).http2_prior_knowledge().build().unwrap(); + let mut task_set = tokio::task::JoinSet::new(); + for idx in 0..1 { + let client = client.clone(); + task_set.spawn(async move { + let echo = client.post("https://[::]:9443/echo").body(idx.to_string()).send().await.expect("fail to send").text().await.expect("fail to get text"); + println!("echo: {echo}"); + assert_eq!(idx.to_string(), echo); + }); + } + while let Some(Ok(r)) = task_set.join_next().await {} +} + +#[tokio::test] +async fn test_h2c() { std::env::set_var("RUST_LOG", "TRACE,h2=off,tokio_util=off,spacegate_kernel=TRACE"); tracing_subscriber::fmt().with_env_filter(tracing_subscriber::EnvFilter::from_default_env()).init(); tokio::spawn(gateway()); @@ -32,7 +53,7 @@ async fn test_h2() { for idx in 0..10 { let client = client.clone(); task_set.spawn(async move { - let echo = client.post("https://[::]:9002/echo").body(idx.to_string()).send().await.expect("fail to send").text().await.expect("fail to get text"); + let echo = client.post("http://[::]:9080/echo").body(idx.to_string()).send().await.expect("fail to send").text().await.expect("fail to get text"); println!("echo: {echo}"); assert_eq!(idx.to_string(), echo); }); @@ -45,25 +66,34 @@ async fn gateway() { let gateway = gateway::Gateway::builder("test_h2") .http_routers([( "test_h2".to_string(), - HttpRoute::builder().rule(HttpRouteRule::builder().match_all().backend(HttpBackend::builder().host("[::]").port(9003).build()).build()).build(), + HttpRoute::builder().rule(HttpRouteRule::builder().match_all().backend(HttpBackend::builder().host("[::]").port(9003).schema("https").build()).build()).build(), )]) .build(); - let addr = SocketAddr::from_str("[::]:9002").expect("invalid host"); + let addr = SocketAddr::from_str("[::]:9080").expect("invalid host"); + let addr_tls = SocketAddr::from_str("[::]:9443").expect("invalid host"); + + let listener_tls = SgListen::new(addr_tls, gateway.as_service(), cancel.clone(), "listener").with_tls_config(tls_config()); + let listener = SgListen::new(addr, gateway.as_service(), cancel, "listener"); - let listener = SgListen::new(addr, gateway.as_service(), cancel, "listener").with_tls_config(tls_config()); - listener.listen().await.expect("fail to listen"); + let f_tls = listener_tls.listen(); + let f = listener.listen(); + let (res_tls, res) = tokio::join!(f_tls, f); + res_tls.expect("fail to listen tls"); + res.expect("fail to listen"); } const CERT: &[u8] = include_bytes!("test_https/.cert"); const KEY: &[u8] = include_bytes!("test_https/.key"); fn tls_config() -> ServerConfig { - ServerConfig::builder() + let mut config = ServerConfig::builder() .with_no_client_auth() .with_single_cert( rustls_pemfile::certs(&mut CERT).filter_map(Result::ok).collect(), rustls_pemfile::private_key(&mut KEY).ok().flatten().expect("fail to get key"), ) - .expect("fail to build tls config") + .expect("fail to build tls config"); + config.alpn_protocols = vec![b"h2".to_vec()]; + config } async fn axum_server() { diff --git a/crates/plugin/Cargo.toml b/crates/plugin/Cargo.toml index 89334492..696ee458 100644 --- a/crates/plugin/Cargo.toml +++ b/crates/plugin/Cargo.toml @@ -20,6 +20,7 @@ path = "src/lib.rs" cache = ["redis"] redis = ["spacegate-kernel/ext-redis", "spacegate-model/ext-redis", "spacegate-ext-redis"] axum = ["spacegate-ext-axum", "spacegate-model/ext-axum"] +dylib = ["libloading"] limit = ["cache"] header-modifier = [] inject = [] @@ -44,6 +45,7 @@ full = [ schema = ["schemars", "schemars/chrono"] [dependencies] +libloading = { version = "0.8", optional = true } spacegate-model = { path = "../model" } spacegate-kernel = { path = "../kernel" } tracing = { workspace = true } @@ -74,7 +76,7 @@ spacegate-ext-redis = { path = "../extension/redis", optional = true } spacegate-ext-axum = { path = "../extension/axum", optional = true } # rt -tokio = { workspace = true } +tokio = { workspace = true, features = ["rt", "fs", "time", "net"]} arc-swap = "1" [dev-dependencies] diff --git a/crates/plugin/schema/header_modifier.json b/crates/plugin/schema/header_modifier.json deleted file mode 100644 index 029486a7..00000000 --- a/crates/plugin/schema/header_modifier.json +++ /dev/null @@ -1,29 +0,0 @@ -{ - "$schema": "http://json-schema.org/draft-07/schema#", - "title": "SgFilterHeaderModifier", - "examples": [ - { - "kind": "Request", - "remove": [], - "sets": {} - } - ], - "type": "object", - "properties": { - "kind": true, - "remove": { - "type": [ - "array", - "null" - ], - "items": true - }, - "sets": { - "type": [ - "object", - "null" - ], - "additionalProperties": true - } - } -} \ No newline at end of file diff --git a/crates/plugin/schema/inject.json b/crates/plugin/schema/inject.json deleted file mode 100644 index 987e1d83..00000000 --- a/crates/plugin/schema/inject.json +++ /dev/null @@ -1,55 +0,0 @@ -{ - "$schema": "http://json-schema.org/draft-07/schema#", - "title": "SgFilterInject", - "examples": [ - { - "req_inject_url": "http://localhost:8080/inject", - "req_timeout": { - "nanos": 0, - "secs": 5 - }, - "resp_inject_url": "http://localhost:8080/inject", - "resp_timeout": { - "nanos": 0, - "secs": 5 - } - } - ], - "type": "object", - "properties": { - "req_inject_url": { - "type": [ - "string", - "null" - ] - }, - "req_timeout": { - "type": "object", - "properties": { - "nanos": { - "type": "integer" - }, - "secs": { - "type": "integer" - } - } - }, - "resp_inject_url": { - "type": [ - "string", - "null" - ] - }, - "resp_timeout": { - "type": "object", - "properties": { - "nanos": { - "type": "integer" - }, - "secs": { - "type": "integer" - } - } - } - } -} \ No newline at end of file diff --git a/crates/plugin/schema/limit.json b/crates/plugin/schema/limit.json deleted file mode 100644 index 17880ddc..00000000 --- a/crates/plugin/schema/limit.json +++ /dev/null @@ -1,29 +0,0 @@ -{ - "$schema": "http://json-schema.org/draft-07/schema#", - "title": "RateLimitConfig", - "type": "object", - "required": [ - "id" - ], - "properties": { - "id": { - "type": "string" - }, - "max_request_number": { - "type": [ - "integer", - "null" - ], - "format": "uint64", - "minimum": 0.0 - }, - "time_window_ms": { - "type": [ - "integer", - "null" - ], - "format": "uint64", - "minimum": 0.0 - } - } -} \ No newline at end of file diff --git a/crates/plugin/schema/maintenance.json b/crates/plugin/schema/maintenance.json deleted file mode 100644 index 9d829a02..00000000 --- a/crates/plugin/schema/maintenance.json +++ /dev/null @@ -1,23 +0,0 @@ -{ - "$schema": "http://json-schema.org/draft-07/schema#", - "title": "SgFilterMaintenanceConfig", - "examples": [ - { - "enabled_time_range": null, - "exclude_ip_range": null, - "msg": "under maintenance", - "title": "maintenance title" - } - ], - "type": "object", - "properties": { - "enabled_time_range": true, - "exclude_ip_range": true, - "msg": { - "type": "string" - }, - "title": { - "type": "string" - } - } -} \ No newline at end of file diff --git a/crates/plugin/schema/redirect.json b/crates/plugin/schema/redirect.json deleted file mode 100644 index e2c89c02..00000000 --- a/crates/plugin/schema/redirect.json +++ /dev/null @@ -1,92 +0,0 @@ -{ - "$schema": "http://json-schema.org/draft-07/schema#", - "title": "RedirectFilter", - "description": "RedirectFilter defines a filter that redirects a request.\n\nhttps://gateway-api.sigs.k8s.io/geps/gep-726/", - "type": "object", - "properties": { - "hostname": { - "description": "Hostname is the hostname to be used in the value of the Location header in the response. When empty, the hostname in the Host header of the request is used.", - "type": [ - "string", - "null" - ] - }, - "path": { - "description": "Path defines parameters used to modify the path of the incoming request. The modified path is then used to construct the Location header. When empty, the request path is used as-is.", - "anyOf": [ - { - "$ref": "#/definitions/SgHttpPathModifier" - }, - { - "type": "null" - } - ] - }, - "port": { - "description": "Port is the port to be used in the value of the Location header in the response.", - "type": [ - "integer", - "null" - ], - "format": "uint16", - "minimum": 0.0 - }, - "scheme": { - "description": "Scheme is the scheme to be used in the value of the Location header in the response. When empty, the scheme of the request is used.", - "type": [ - "string", - "null" - ] - }, - "status_code": { - "description": "StatusCode is the HTTP status code to be used in response.", - "type": [ - "integer", - "null" - ], - "format": "uint16", - "minimum": 0.0 - } - }, - "definitions": { - "SgHttpPathModifier": { - "type": "object", - "required": [ - "kind", - "value" - ], - "properties": { - "kind": { - "description": "Type defines the type of path modifier.", - "allOf": [ - { - "$ref": "#/definitions/SgHttpPathModifierType" - } - ] - }, - "value": { - "description": "Value is the value to be used to replace the path during forwarding.", - "type": "string" - } - } - }, - "SgHttpPathModifierType": { - "oneOf": [ - { - "description": "This type of modifier indicates that the full path will be replaced by the specified value.", - "type": "string", - "enum": [ - "ReplaceFullPath" - ] - }, - { - "description": "This type of modifier indicates that any prefix path matches will be replaced by the substitution value. For example, a path with a prefix match of “/foo” and a ReplacePrefixMatch substitution of “/bar” will have the “/foo” prefix replaced with “/bar” in matching requests.", - "type": "string", - "enum": [ - "ReplacePrefixMatch" - ] - } - ] - } - } -} \ No newline at end of file diff --git a/crates/plugin/schema/redis-count.json b/crates/plugin/schema/redis-count.json deleted file mode 100644 index e377a91d..00000000 --- a/crates/plugin/schema/redis-count.json +++ /dev/null @@ -1,19 +0,0 @@ -{ - "$schema": "http://json-schema.org/draft-07/schema#", - "title": "RedisCountConfig", - "type": "object", - "required": [ - "header" - ], - "properties": { - "header": { - "type": "string" - }, - "id": { - "type": [ - "string", - "null" - ] - } - } -} \ No newline at end of file diff --git a/crates/plugin/schema/redis-dynamic-route.json b/crates/plugin/schema/redis-dynamic-route.json deleted file mode 100644 index e7f73d99..00000000 --- a/crates/plugin/schema/redis-dynamic-route.json +++ /dev/null @@ -1,19 +0,0 @@ -{ - "$schema": "http://json-schema.org/draft-07/schema#", - "title": "RedisDynamicRouteConfig", - "type": "object", - "required": [ - "header" - ], - "properties": { - "header": { - "type": "string" - }, - "id": { - "type": [ - "string", - "null" - ] - } - } -} \ No newline at end of file diff --git a/crates/plugin/schema/redis-limit.json b/crates/plugin/schema/redis-limit.json deleted file mode 100644 index b8361726..00000000 --- a/crates/plugin/schema/redis-limit.json +++ /dev/null @@ -1,19 +0,0 @@ -{ - "$schema": "http://json-schema.org/draft-07/schema#", - "title": "RedisLimitConfig", - "type": "object", - "required": [ - "header" - ], - "properties": { - "header": { - "type": "string" - }, - "id": { - "type": [ - "string", - "null" - ] - } - } -} \ No newline at end of file diff --git a/crates/plugin/schema/redis-time-range.json b/crates/plugin/schema/redis-time-range.json deleted file mode 100644 index 49908e3f..00000000 --- a/crates/plugin/schema/redis-time-range.json +++ /dev/null @@ -1,19 +0,0 @@ -{ - "$schema": "http://json-schema.org/draft-07/schema#", - "title": "RedisTimeRangeConfig", - "type": "object", - "required": [ - "header" - ], - "properties": { - "header": { - "type": "string" - }, - "id": { - "type": [ - "string", - "null" - ] - } - } -} \ No newline at end of file diff --git a/crates/plugin/schema/retry.json b/crates/plugin/schema/retry.json deleted file mode 100644 index c3a85a4b..00000000 --- a/crates/plugin/schema/retry.json +++ /dev/null @@ -1,71 +0,0 @@ -{ - "$schema": "http://json-schema.org/draft-07/schema#", - "title": "SgPluginRetryConfig", - "type": "object", - "properties": { - "backoff": { - "description": "Backoff strategies can vary depending on the specific implementation and requirements. see [BackOff]", - "default": "Exponential", - "allOf": [ - { - "$ref": "#/definitions/BackOff" - } - ] - }, - "base_interval": { - "description": "milliseconds", - "default": 100, - "type": "integer", - "format": "uint64", - "minimum": 0.0 - }, - "max_interval": { - "description": "milliseconds", - "default": 10000, - "type": "integer", - "format": "uint64", - "minimum": 0.0 - }, - "retirable_methods": { - "default": [ - "*" - ], - "type": "array", - "items": { - "type": "string" - } - }, - "retries": { - "default": 3, - "type": "integer", - "format": "uint16", - "minimum": 0.0 - } - }, - "definitions": { - "BackOff": { - "oneOf": [ - { - "type": "string", - "enum": [ - "Random" - ] - }, - { - "description": "Fixed interval", - "type": "string", - "enum": [ - "Fixed" - ] - }, - { - "description": "In the exponential backoff strategy, the initial delay is relatively short, but it gradually increases as the number of retries increases. Typically, the delay time is calculated by multiplying a base value with an exponential factor. For example, the delay time might be calculated as `base_value * (2 ^ retry_count)`.", - "type": "string", - "enum": [ - "Exponential" - ] - } - ] - } - } -} \ No newline at end of file diff --git a/crates/plugin/schema/rewrite.json b/crates/plugin/schema/rewrite.json deleted file mode 100644 index 255b4ecd..00000000 --- a/crates/plugin/schema/rewrite.json +++ /dev/null @@ -1,67 +0,0 @@ -{ - "$schema": "http://json-schema.org/draft-07/schema#", - "title": "SgFilterRewriteConfig", - "description": "RewriteFilter defines a filter that modifies a request during forwarding.\n\nhttps://gateway-api.sigs.k8s.io/geps/gep-726/", - "type": "object", - "properties": { - "hostname": { - "description": "Hostname is the value to be used to replace the Host header value during forwarding.", - "type": [ - "string", - "null" - ] - }, - "path": { - "description": "Path defines parameters used to modify the path of the incoming request. The modified path is then used to construct the Location header. When empty, the request path is used as-is.", - "anyOf": [ - { - "$ref": "#/definitions/SgHttpPathModifier" - }, - { - "type": "null" - } - ] - } - }, - "definitions": { - "SgHttpPathModifier": { - "type": "object", - "required": [ - "kind", - "value" - ], - "properties": { - "kind": { - "description": "Type defines the type of path modifier.", - "allOf": [ - { - "$ref": "#/definitions/SgHttpPathModifierType" - } - ] - }, - "value": { - "description": "Value is the value to be used to replace the path during forwarding.", - "type": "string" - } - } - }, - "SgHttpPathModifierType": { - "oneOf": [ - { - "description": "This type of modifier indicates that the full path will be replaced by the specified value.", - "type": "string", - "enum": [ - "ReplaceFullPath" - ] - }, - { - "description": "This type of modifier indicates that any prefix path matches will be replaced by the substitution value. For example, a path with a prefix match of “/foo” and a ReplacePrefixMatch substitution of “/bar” will have the “/foo” prefix replaced with “/bar” in matching requests.", - "type": "string", - "enum": [ - "ReplacePrefixMatch" - ] - } - ] - } - } -} \ No newline at end of file diff --git a/crates/plugin/schema/static-resource.json b/crates/plugin/schema/static-resource.json deleted file mode 100644 index 0c992484..00000000 --- a/crates/plugin/schema/static-resource.json +++ /dev/null @@ -1,92 +0,0 @@ -{ - "$schema": "http://json-schema.org/draft-07/schema#", - "title": "StaticResourceConfig", - "description": "StaticResourceConfig", - "type": "object", - "required": [ - "body", - "code", - "content_type" - ], - "properties": { - "body": { - "description": "response body", - "allOf": [ - { - "$ref": "#/definitions/BodyEnum" - } - ] - }, - "code": { - "description": "response status code", - "type": "integer", - "format": "uint16", - "minimum": 0.0 - }, - "content_type": { - "description": "response content type", - "type": "string" - } - }, - "definitions": { - "BodyEnum": { - "oneOf": [ - { - "description": "json value", - "type": "object", - "required": [ - "kind", - "value" - ], - "properties": { - "kind": { - "type": "string", - "enum": [ - "Json" - ] - }, - "value": true - } - }, - { - "description": "plain text", - "type": "object", - "required": [ - "kind", - "value" - ], - "properties": { - "kind": { - "type": "string", - "enum": [ - "Text" - ] - }, - "value": { - "type": "string" - } - } - }, - { - "description": "read a static file from file system", - "type": "object", - "required": [ - "kind", - "value" - ], - "properties": { - "kind": { - "type": "string", - "enum": [ - "File" - ] - }, - "value": { - "type": "string" - } - } - } - ] - } - } -} \ No newline at end of file diff --git a/crates/plugin/schema/status.json b/crates/plugin/schema/status.json deleted file mode 100644 index ccf69c49..00000000 --- a/crates/plugin/schema/status.json +++ /dev/null @@ -1,43 +0,0 @@ -{ - "$schema": "http://json-schema.org/draft-07/schema#", - "title": "SgFilterStatusConfig", - "type": "object", - "properties": { - "host": { - "default": "0.0.0.0", - "type": "string" - }, - "interval": { - "description": "second", - "default": 5, - "type": "integer", - "format": "uint64", - "minimum": 0.0 - }, - "port": { - "default": 8110, - "type": "integer", - "format": "uint16", - "minimum": 0.0 - }, - "status_cache_key": { - "default": "spacegate:cache:plugin:status", - "type": "string" - }, - "title": { - "default": "System Status", - "type": "string" - }, - "unhealthy_threshold": { - "description": "Unhealthy threshold , if server error more than this, server will be tag as unhealthy", - "default": 3, - "type": "integer", - "format": "uint16", - "minimum": 0.0 - }, - "window_cache_key": { - "default": "sg:plugin:filter:window:key", - "type": "string" - } - } -} \ No newline at end of file diff --git a/crates/plugin/src/dynamic.rs b/crates/plugin/src/dynamic.rs new file mode 100644 index 00000000..3af246c0 --- /dev/null +++ b/crates/plugin/src/dynamic.rs @@ -0,0 +1,68 @@ +//! # Using a dynamic library as plugins +//! +//! +//! +//! + +use std::ffi::OsStr; + +use spacegate_kernel::BoxResult; + +/// Macro to register plugins from a dynamic library. +/// +/// # Usage +/// ```rust no_run +/// use spacegate_plugin::dynamic_lib; +/// dynamic_lib! { +/// #[cfg(feature = "my_plugin1")] +/// MyPlugin1, +/// MyPlugin2, +/// MyPlugin3, +/// } +/// +/// ``` +#[macro_export] +macro_rules! dynamic_lib { + ($( + $(#[$m:meta])* + $Type:ty + ),*) => { + #[no_mangle] + pub extern "Rust" fn register(repo: &$crate::PluginRepository) { + $( + $(#[$m])* + repo.register::<$Type>(); + )* + } + }; +} +impl crate::PluginRepository { + /// + /// # Usage + /// The library must implement a function named `register` with the following signature: + /// ```rust no_run + /// # use spacegate_plugin::PluginRepository; + /// #[no_mangle] + /// pub extern "Rust" fn register(repo: &PluginRepository) { + /// + /// } + /// ``` + /// A way to define this function is using the [`crate::dynamic_lib!`] macro. + /// + /// # Safety + /// Loading a dynamic library could lead to undefined behavior if the library is not implemented correctly. + /// + /// Loaded libraries will be leaked and never unloaded, so you should be careful with this function. + /// + /// # Errors + /// Target is not a valid dynamic library or the library does not implement the `register` function. + pub unsafe fn register_dylib>(&self, path: P) -> BoxResult<()> { + let lib = libloading::Library::new(path)?; + let register: libloading::Symbol = lib.get(b"register")?; + register(self); + let lib = Box::new(lib); + // keep the allocated memory + Box::leak(lib); + Ok(()) + } +} diff --git a/crates/plugin/src/error.rs b/crates/plugin/src/error.rs index b82dac07..beabfcc1 100644 --- a/crates/plugin/src/error.rs +++ b/crates/plugin/src/error.rs @@ -5,6 +5,17 @@ use spacegate_kernel::{SgBody, SgResponseExt}; use crate::Plugin; +/// # Usage +/// 1. Create a plugin error with a status code +/// ```rust ignore +/// let x = result.map_err(PluginError::status::); +/// ``` +/// 2. Create a plugin error with a status 500 +/// ```rust ignore +/// let x = result.map_err(PluginError::internal_error::); +/// ``` +/// 3. Convert it into a response +/// Just use [`Response::from`]. #[derive(Debug)] pub struct PluginError { plugin_code: &'static str, diff --git a/crates/plugin/src/ext/axum.rs b/crates/plugin/src/ext/axum.rs index 649b5597..999abd7e 100644 --- a/crates/plugin/src/ext/axum.rs +++ b/crates/plugin/src/ext/axum.rs @@ -1,4 +1,4 @@ -use std::{borrow::Cow, collections::HashMap}; +use std::collections::HashMap; use serde::{Deserialize, Serialize}; use spacegate_ext_axum::{ @@ -28,16 +28,16 @@ pub async fn register_plugin_routes() { .await } -pub async fn repo_snapshot() -> axum::Json, PluginRepoSnapshot>> { - axum::Json(crate::SgPluginRepository::global().repo_snapshot()) +pub async fn repo_snapshot() -> axum::Json> { + axum::Json(crate::PluginRepository::global().repo_snapshot()) } pub async fn instance_snapshot(Query(instance_id): Query) -> axum::Json> { - axum::Json(crate::SgPluginRepository::global().instance_snapshot(instance_id)) + axum::Json(crate::PluginRepository::global().instance_snapshot(instance_id)) } pub async fn plugin_list() -> axum::Json> { - axum::Json(crate::SgPluginRepository::global().plugin_list()) + axum::Json(crate::PluginRepository::global().plugin_list()) } #[cfg(feature = "schema")] @@ -48,7 +48,6 @@ pub struct PluginCode { #[cfg(feature = "schema")] pub async fn plugin_schema(Query(PluginCode { code }): Query) -> axum::Json> { - let code: Cow<'static, str> = code.into(); - let schema = crate::SgPluginRepository::global().plugins.read().expect("poisoned").get(&code).and_then(|p| p.schema.clone()); + let schema = crate::PluginRepository::global().plugins.read().expect("poisoned").get(&code).and_then(|p| p.schema.clone()); axum::Json(schema) } diff --git a/crates/plugin/src/instance.rs b/crates/plugin/src/instance.rs index b73baccb..a07c2061 100644 --- a/crates/plugin/src/instance.rs +++ b/crates/plugin/src/instance.rs @@ -14,22 +14,20 @@ use spacegate_model::PluginConfig; use crate::mount::{MountPoint, MountPointIndex}; pub struct PluginInstance { - // data pub config: PluginConfig, pub mount_points: HashMap, pub hooks: PluginInstanceHooks, - pub resource: PluginInstanceResource, pub plugin_function: crate::layer::PluginFunction, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct DropMarker { +pub(crate) struct DropMarker { drop_signal: Arc, } #[derive(Debug, Clone, Default)] -pub struct DropMarkerSet { +pub(crate) struct DropMarkerSet { pub(crate) inner: HashSet, } @@ -39,9 +37,6 @@ pub struct DropTracer { } impl DropTracer { - pub fn count(&self) -> usize { - self.drop_signal.strong_count() - } pub fn all_dropped(&self) -> bool { self.drop_signal.strong_count() == 0 } @@ -58,7 +53,6 @@ pub(crate) fn drop_trace() -> (DropTracer, DropMarker) { ) } -pub type BoxMakeFn = Box Result + Sync + Send + 'static>; type PluginInstanceHook = Box Result<(), BoxError> + Send + Sync + 'static>; #[derive(Default)] @@ -104,7 +98,7 @@ macro_rules! expose_hooks { pub(crate) fn $hook(&self) -> BoxResult<()> { self.call_hook(&self.hooks.$hook) } - pub fn $setter(&mut self, hook: M) + pub(crate) fn $setter(&mut self, hook: M) where M: Fn(&PluginInstance) -> Result<(), BoxError> + Send + Sync + 'static { self.hooks.$hook = Some(Box::new(hook)) @@ -112,7 +106,7 @@ macro_rules! expose_hooks { )* }; } - +#[allow(dead_code)] impl PluginInstance { pub(crate) fn mount_at(&mut self, mount_point: &mut M, index: MountPointIndex) -> Result<(), BoxError> { let tracer = mount_point.mount(self)?; diff --git a/crates/plugin/src/layer.rs b/crates/plugin/src/layer.rs index 1b43e047..0086a354 100644 --- a/crates/plugin/src/layer.rs +++ b/crates/plugin/src/layer.rs @@ -8,15 +8,8 @@ use spacegate_kernel::{ SgBody, }; -// use crate::instance::PluginInstanceId; -// pub struct InstanceRef { -// id: PluginInstanceId, -// } - #[derive(Clone)] pub struct PluginFunction { - // refer: Arc, - // f: ArcSwap, Inner) -> BoxFuture<'static, Response> + Send + Sync + 'static>, f: Arc>, } @@ -29,15 +22,6 @@ impl PluginFunction { } } -impl Drop for PluginFunction { - fn drop(&mut self) { - // if Arc::strong_count(&self.refer) == 1 { - // try to drop the plugin instance - // SgPluginRepository::global().instances. - // } - } -} - impl PluginFunction { pub fn swap(&self, f: Box, Inner) -> BoxFuture<'static, Response> + Send + Sync + 'static>) { self.f.store(f.into()); diff --git a/crates/plugin/src/lib.rs b/crates/plugin/src/lib.rs index 95356c56..4a1c9bdb 100644 --- a/crates/plugin/src/lib.rs +++ b/crates/plugin/src/lib.rs @@ -15,23 +15,55 @@ use mount::{MountPoint, MountPointIndex}; use serde::{Deserialize, Serialize}; pub use serde_json; pub use serde_json::{Error as SerdeJsonError, Value as JsonValue}; -pub use spacegate_kernel::helper_layers::filter::{Filter, FilterRequest, FilterRequestLayer}; pub use spacegate_kernel::helper_layers::function::Inner; pub use spacegate_kernel::BoxError; pub use spacegate_kernel::BoxLayer; -use spacegate_kernel::SgBody; +pub use spacegate_kernel::{SgBody, SgRequest, SgRequestExt, SgResponse, SgResponseExt}; pub mod error; pub mod model; pub mod mount; // pub mod plugins; pub mod instance; pub use error::PluginError; +#[cfg(feature = "dylib")] +pub mod dynamic; pub mod ext; pub mod layer; pub mod plugins; #[cfg(feature = "schema")] pub use schemars; pub use spacegate_model::{plugin_meta, PluginAttributes, PluginConfig, PluginInstanceId, PluginInstanceMap, PluginInstanceName, PluginMetaData}; + +/// # Plugin Trait +/// It's a easy way to define a plugin through this trait. +/// You should give a unique [`code`](Plugin::CODE) for the plugin, +/// and implement the [`call`](Plugin::call) function and the [`create`](Plugin::create) function. +/// +/// # Example +/// In the follow example, we add a server header for each response. +/// ```rust +/// # use spacegate_plugin::{Plugin, SgRequest, SgResponse, Inner, BoxError, PluginConfig}; +/// pub struct ServerHeaderPlugin { +/// header_value: String, +/// } +/// +/// impl Plugin for ServerHeaderPlugin { +/// const CODE: &'static str = "server-header"; +/// async fn call(&self, req: SgRequest, inner: Inner) -> Result { +/// let mut resp = inner.call(req).await; +/// resp.headers_mut().insert("server", self.header_value.parse()?); +/// Ok(resp) +/// } +/// fn create(plugin_config: PluginConfig) -> Result { +/// let Some(header_value) = plugin_config.spec.get("header_value") else { +/// return Err("missing header_value".into()) +/// }; +/// Ok(Self { +/// header_value: header_value.as_str().unwrap_or("spacegate").to_string(), +/// }) +/// } +/// } +/// ``` pub trait Plugin: Any + Sized + Send + Sync { /// plugin code, it should be unique repository-wise. const CODE: &'static str; @@ -48,7 +80,7 @@ pub trait Plugin: Any + Sized + Send + Sync { /// /// If you want to return a error response with other status code, use `PluginError::new` to create a error response, and wrap /// it with `Ok`. - fn call(&self, req: Request, inner: Inner) -> impl Future, BoxError>> + Send; + fn call(&self, req: SgRequest, inner: Inner) -> impl Future> + Send; fn create(plugin_config: PluginConfig) -> Result; fn create_by_spec(spec: JsonValue, name: PluginInstanceName) -> Result { Self::create(PluginConfig { @@ -59,7 +91,7 @@ pub trait Plugin: Any + Sized + Send + Sync { /// Register the plugin to the repository. /// /// You can also register axum server route here. - fn register(repo: &SgPluginRepository) { + fn register(repo: &PluginRepository) { repo.plugins.write().expect("SgPluginRepository register error").insert(Self::CODE.into(), PluginDefinitionObject::from_trait::()); } @@ -70,14 +102,14 @@ pub trait Plugin: Any + Sized + Send + Sync { } } -/// Plugin Attributes +/// Plugin Trait Object pub struct PluginDefinitionObject { pub mono: bool, pub code: Cow<'static, str>, pub meta: PluginMetaData, #[cfg(feature = "schema")] pub schema: Option, - pub make_pf: BoxMakePfMethod, + pub make_pf: Box, } impl PluginDefinitionObject { @@ -117,7 +149,9 @@ impl PluginDefinitionObject { let plugin = Arc::new(P::create(config)?); let function = move |req: Request, inner: Inner| { let plugin = plugin.clone(); + let plugin_span = tracing::span!(tracing::Level::INFO, "plugin", code = P::CODE); let task = async move { + let _entered = plugin_span.enter(); match plugin.call(req, inner).await { Ok(resp) => resp, Err(e) => { @@ -130,13 +164,14 @@ impl PluginDefinitionObject { }; Ok(Box::new(function) as InnerBoxPf) }; + let make_pf = Box::new(constructor); Self { code: P::CODE.into(), #[cfg(feature = "schema")] schema: P::schema_opt(), mono: P::MONO, meta: P::meta(), - make_pf: Box::new(constructor), + make_pf, } } #[inline] @@ -150,28 +185,32 @@ pub trait PluginSchemaExt { fn schema() -> schemars::schema::RootSchema; } -type BoxMakePfMethod = Box Result + Send + Sync + 'static>; -#[derive(Default, Clone)] -pub struct SgPluginRepository { - pub plugins: Arc, PluginDefinitionObject>>>, - pub instances: Arc>>, -} +pub type MakePfMethod = dyn Fn(PluginConfig) -> Result + Send + Sync + 'static; -pub struct PluginInstanceRef { - pub id: PluginInstanceId, - pub digest: u64, +/// # Plugin Repository +/// A repository to manage plugins, it stores plugin definitions and instances. +/// +/// You can get a global instance through [`PluginRepository::global`]. +#[derive(Default, Clone)] +pub struct PluginRepository { + plugins: Arc>>, + instances: Arc>>, } -impl SgPluginRepository { +impl PluginRepository { + /// Get a global instance of this repository. + /// + /// Once the repository is initialized, it will register all plugins in this crate. pub fn global() -> &'static Self { - static INIT: OnceLock = OnceLock::new(); + static INIT: OnceLock = OnceLock::new(); INIT.get_or_init(|| { - let repo = SgPluginRepository::new(); + let repo = PluginRepository::new(); repo.register_prelude(); repo }) } + /// register all plugins in this crates pub fn register_prelude(&self) { self.register::(); #[cfg(feature = "limit")] @@ -201,20 +240,24 @@ impl SgPluginRepository { } } + /// create a new empty repository pub fn new() -> Self { Self::default() } + /// register by [`Plugin`] trait pub fn register(&self) { self.register_custom(PluginDefinitionObject::from_trait::

()) } + /// register a custom plugin pub fn register_custom>(&self, attr: A) { let attr: PluginDefinitionObject = attr.into(); let mut map = self.plugins.write().expect("SgPluginRepository register error"); - let _old_attr = map.insert(attr.code.clone(), attr); + let _old_attr = map.insert(attr.code.to_string(), attr); } + /// clear all instances pub fn clear_instances(&self) { let mut instances = self.instances.write().expect("SgPluginRepository register error"); for (_, inst) in instances.drain() { @@ -224,6 +267,7 @@ impl SgPluginRepository { } } + /// create or update a plugin instance by config pub fn create_or_update_instance(&self, config: PluginConfig) -> Result<(), BoxError> { let attr_rg = self.plugins.read().expect("SgPluginRepository register error"); let code = config.code(); @@ -240,7 +284,6 @@ impl SgPluginRepository { let instance = PluginInstance { config, plugin_function: pf, - resource: Default::default(), mount_points: Default::default(), hooks: Default::default(), }; @@ -250,6 +293,7 @@ impl SgPluginRepository { Ok(()) } + /// remove a plugin instance by id pub fn remove_instance(&self, id: &PluginInstanceId) -> Result, BoxError> { let mut instances = self.instances.write().expect("SgPluginRepository register error"); if let Some(instance) = instances.remove(id) { @@ -260,6 +304,7 @@ impl SgPluginRepository { } } + /// mount a plugin instance to a mount point pub fn mount(&self, mount_point: &mut M, mount_index: MountPointIndex, id: PluginInstanceId) -> Result<(), BoxError> { let attr_rg = self.plugins.read().expect("SgPluginRepository register error"); let code = id.code.as_ref(); @@ -290,7 +335,7 @@ impl SgPluginRepository { map.values().map(PluginDefinitionObject::attr).collect() } - pub fn repo_snapshot(&self) -> HashMap, PluginRepoSnapshot> { + pub fn repo_snapshot(&self) -> HashMap { let plugins = self.plugins.read().expect("SgPluginRepository register error"); plugins .iter() @@ -300,7 +345,7 @@ impl SgPluginRepository { ( code.clone(), PluginRepoSnapshot { - code: code.clone(), + code: code.clone().into(), mono: attr.mono, meta: attr.meta.clone(), instances, diff --git a/crates/plugin/src/plugins.rs b/crates/plugin/src/plugins.rs index 7f454e89..6d60c56f 100644 --- a/crates/plugin/src/plugins.rs +++ b/crates/plugin/src/plugins.rs @@ -18,5 +18,3 @@ pub mod rewrite; // pub mod status; pub mod static_resource; - -// pub mod ffi; diff --git a/crates/plugin/src/plugins/maintenance.rs b/crates/plugin/src/plugins/maintenance.rs index dd293c13..4b36db01 100644 --- a/crates/plugin/src/plugins/maintenance.rs +++ b/crates/plugin/src/plugins/maintenance.rs @@ -200,22 +200,10 @@ crate::schema!(MaintenancePlugin, MaintenancePluginConfig); #[cfg(test)] mod test { - use hyper::StatusCode; - use hyper::{Method, Request, Version}; - use serde_json::json; - use spacegate_kernel::backend_service::get_echo_service; - use spacegate_kernel::extension::PeerAddr; - use spacegate_kernel::helper_layers::function::Inner; - use spacegate_kernel::BoxError; - use spacegate_kernel::SgBody; - use spacegate_model::{PluginInstanceId, PluginInstanceName}; // use tardis::chrono::{Duration, Local}; // use tardis::serde_json; // use tardis::tokio; - use crate::plugins::maintenance::MaintenancePlugin; - use crate::{Plugin, PluginConfig}; - // #[tokio::test] // async fn test_config() -> Result<(), BoxError> { // let now = Local::now(); diff --git a/crates/plugin/src/plugins/status/status_plugin.rs b/crates/plugin/src/plugins/status/status_plugin.rs index 63b034eb..de9831fb 100644 --- a/crates/plugin/src/plugins/status/status_plugin.rs +++ b/crates/plugin/src/plugins/status/status_plugin.rs @@ -13,7 +13,7 @@ use tardis::tardis_static; #[cfg(not(feature = "cache"))] use tardis::tokio::sync::RwLock; #[cfg(feature = "cache")] -use spacegate_ext_redis::{redis::AsyncCommands}; +use spacegate_ext_redis::redis::AsyncCommands; #[cfg(feature = "cache")] use tardis::TardisFuns; #[cfg(not(feature = "cache"))] diff --git a/crates/shell/Cargo.toml b/crates/shell/Cargo.toml index c1d9c3dd..5415df01 100644 --- a/crates/shell/Cargo.toml +++ b/crates/shell/Cargo.toml @@ -46,7 +46,7 @@ plugin-rewrite = ["spacegate-plugin/rewrite"] plugin-maintenance = ["spacegate-plugin/maintenance"] # plugin-decompression = ["spacegate-plugin/decompression"] plugin-status = ["spacegate-plugin/status"] - +plugin-dylib = ["spacegate-plugin/dylib"] [dependencies] spacegate-kernel = { workspace = true, features = ["reload"] } diff --git a/crates/shell/src/config.rs b/crates/shell/src/config.rs index 35674309..40a908b9 100644 --- a/crates/shell/src/config.rs +++ b/crates/shell/src/config.rs @@ -3,7 +3,7 @@ use std::{collections::VecDeque, net::SocketAddr}; use crate::server::RunningSgGateway; use futures_util::{Stream, StreamExt}; -use spacegate_plugin::SgPluginRepository; +use spacegate_plugin::PluginRepository; use tokio_util::sync::CancellationToken; pub use spacegate_config::model::*; @@ -122,14 +122,14 @@ async fn handler(event: (ConfigType, ConfigEventType), config: &C, (ConfigType::Plugin { id }, ConfigEventType::Create | ConfigEventType::Update) => { let config = config.retrieve_plugin(&id).await?; if let Some(config) = config { - if let Err(e) = SgPluginRepository::global().create_or_update_instance(config) { + if let Err(e) = PluginRepository::global().create_or_update_instance(config) { tracing::error!("[SG.Config] plugin {id:?} create failed: {e}", id = id, e = e); } } else { tracing::error!("[SG.Config] plugin {id:?} not found"); } } - (ConfigType::Plugin { id }, ConfigEventType::Delete) => match SgPluginRepository::global().remove_instance(&id) { + (ConfigType::Plugin { id }, ConfigEventType::Delete) => match PluginRepository::global().remove_instance(&id) { Ok(_mount_points) => {} Err(e) => { tracing::error!("[SG.Config] file to remove plugin {id:?} : {e}", id = id, e = e); diff --git a/crates/shell/src/config/plugin_filter_dto.rs b/crates/shell/src/config/plugin_filter_dto.rs index 90b4b2a0..423c6c68 100644 --- a/crates/shell/src/config/plugin_filter_dto.rs +++ b/crates/shell/src/config/plugin_filter_dto.rs @@ -1,12 +1,12 @@ use spacegate_config::{model::PluginConfig, PluginInstanceId}; use spacegate_plugin::{ mount::{MountPoint, MountPointIndex}, - SgPluginRepository, + PluginRepository, }; pub fn global_batch_update_plugin(plugins: Vec) { for plugin in plugins { - match SgPluginRepository::global().create_or_update_instance(plugin) { + match PluginRepository::global().create_or_update_instance(plugin) { Ok(_) => {} Err(e) => { tracing::error!("fail to create or update plugin {e}") @@ -16,10 +16,10 @@ pub fn global_batch_update_plugin(plugins: Vec) { } pub fn global_batch_mount_plugin(plugins: Vec, mount_point: &mut MP, mount_index: MountPointIndex) { - batch_mount_plugin(SgPluginRepository::global(), plugins, mount_point, mount_index); + batch_mount_plugin(PluginRepository::global(), plugins, mount_point, mount_index); } -pub fn batch_mount_plugin(repo: &SgPluginRepository, plugins: Vec, mount_point: &mut MP, mount_index: MountPointIndex) { +pub fn batch_mount_plugin(repo: &PluginRepository, plugins: Vec, mount_point: &mut MP, mount_index: MountPointIndex) { for plugin in plugins { if let Err(e) = repo.mount(mount_point, mount_index.clone(), plugin) { tracing::error!("fail to mount plugin {e}") diff --git a/crates/shell/src/server.rs b/crates/shell/src/server.rs index 01592b47..1233d825 100644 --- a/crates/shell/src/server.rs +++ b/crates/shell/src/server.rs @@ -13,7 +13,7 @@ use spacegate_kernel::{ service::gateway::{builder::default_gateway_route_fallback, create_http_router, HttpRouterService}, ArcHyperService, BoxError, }; -use spacegate_plugin::{mount::MountPointIndex, SgPluginRepository}; +use spacegate_plugin::{mount::MountPointIndex, PluginRepository}; use std::sync::Arc; use std::time::Duration; use std::vec::Vec; @@ -157,7 +157,7 @@ pub static GLOBAL_STORE: OnceLock>>> impl RunningSgGateway { pub async fn global_init(config: Config, signal: CancellationToken) { for (id, spec) in config.plugins.into_inner() { - if let Err(err) = SgPluginRepository::global().create_or_update_instance(PluginConfig { id: id.clone(), spec }) { + if let Err(err) = PluginRepository::global().create_or_update_instance(PluginConfig { id: id.clone(), spec }) { tracing::error!("[SG.Config] fail to init plugin [{id}]: {err}", id = id.to_string()); } } @@ -182,7 +182,7 @@ impl RunningSgGateway { while let Some(res) = task.join_next().await { res.expect("tokio join error") } - SgPluginRepository::global().clear_instances() + PluginRepository::global().clear_instances() } pub fn global_store() -> Arc>> { diff --git a/debug.md b/debug.md deleted file mode 100644 index e69de29b..00000000 diff --git a/examples/sayhello/Cargo.toml b/examples/sayhello/Cargo.toml new file mode 100644 index 00000000..f2acf1c2 --- /dev/null +++ b/examples/sayhello/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "sayhello" +version.workspace = true +authors.workspace = true +description.workspace = true +keywords.workspace = true +categories.workspace = true +homepage.workspace = true +documentation.workspace = true +repository.workspace = true +edition.workspace = true +license.workspace = true +rust-version.workspace = true + +[lib] +name = "sayhello" +path = "src/lib.rs" +crate-type = ["dylib"] +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +spacegate-plugin = { workspace = true, features = ["dylib"] } +hyper = { workspace = true } diff --git a/examples/sayhello/src/lib.rs b/examples/sayhello/src/lib.rs new file mode 100644 index 00000000..561c49db --- /dev/null +++ b/examples/sayhello/src/lib.rs @@ -0,0 +1,21 @@ +use hyper::header::HeaderValue; +use spacegate_plugin::{dynamic_lib, BoxError, Inner, Plugin, SgRequest, SgResponse}; +pub struct SayHelloPlugin; + +impl Plugin for SayHelloPlugin { + const CODE: &'static str = "sayhello"; + + async fn call(&self, req: SgRequest, inner: Inner) -> Result { + let mut resp = inner.call(req).await; + resp.headers_mut().insert("hello", HeaderValue::from_static("world")); + Ok(resp) + } + + fn create(_plugin_config: spacegate_plugin::PluginConfig) -> Result { + Ok(Self) + } +} + +dynamic_lib! { + SayHelloPlugin +} diff --git a/resource/local-example/config.json b/resource/local-example/config.json index 955f0235..57bda24f 100644 --- a/resource/local-example/config.json +++ b/resource/local-example/config.json @@ -23,6 +23,10 @@ { "name": "add-server-header", "code": "header-modifier" + }, + { + "name": "say-hello", + "code": "sayhello" } ], "routes": [ @@ -36,8 +40,8 @@ { "path": { "kind": "Prefix", - "value": "/", - "replace": null + "value": "/doc", + "replace": "/" }, "header": null, "query": null, @@ -49,7 +53,7 @@ { "host": { "kind": "File", - "path": "./" + "path": "./target/doc" }, "port": 80, "timeout_ms": null, @@ -110,16 +114,6 @@ ], "plugins": { "named": [ - { - "name": "new-instance-2", - "code": "limit", - "spec": {} - }, - { - "name": "new-instance", - "code": "limit", - "spec": {} - }, { "name": "rewrite-test", "code": "rewrite", @@ -146,6 +140,11 @@ }, "content_type": "text/html" } + }, + { + "name": "say-hello", + "code": "sayhello", + "spec": null } ], "mono": [] diff --git a/resource/local-example/plugins/.gitignore b/resource/local-example/plugins/.gitignore new file mode 100644 index 00000000..a577d687 --- /dev/null +++ b/resource/local-example/plugins/.gitignore @@ -0,0 +1,3 @@ +*.so +*.dll +*.dylib \ No newline at end of file diff --git a/sdk/admin-client/package.json b/sdk/admin-client/package.json index 4af1116b..6e2856d7 100644 --- a/sdk/admin-client/package.json +++ b/sdk/admin-client/package.json @@ -1,6 +1,6 @@ { "name": "spacegate-admin-client", - "version": "0.2.0-alpha.14", + "version": "0.2.0-alpha.16", "description": "Type Script Client Lib for Spacegate Admin Server", "homepage": "https://github.com/ideal-world/spacegate", "main": "dist/index.js", @@ -21,4 +21,4 @@ "files": [ "dist" ] -} \ No newline at end of file +} diff --git a/sdk/admin-client/src/api/index.ts b/sdk/admin-client/src/api/index.ts index ff13fad8..877955e5 100644 --- a/sdk/admin-client/src/api/index.ts +++ b/sdk/admin-client/src/api/index.ts @@ -1,4 +1,4 @@ -import axios, { AxiosResponse, AxiosInstance } from 'axios' +import axios, { AxiosResponse, AxiosInstance, AxiosError } from 'axios' import { Config, ConfigItem, PluginAttributes, SgGateway, SgHttpRoute } from '../model' import { PluginConfig } from '../model/PluginConfig' import { PluginInstanceName } from '../model/PluginInstanceName' @@ -28,25 +28,45 @@ export class ExceptionVersionConflict extends Error { } } +export class ExceptionUnauthorized extends Error { + constructor() { + super('spacegate-admin-client: Unauthorized') + } +} + + export function setClient(...args: Parameters) { let instance = axios.create(...args) instance.interceptors.request.use((cfg) => { cfg.headers['X-Client-Version'] = Client.clientVersion ?? '0' return cfg }); - instance.interceptors.response.use((resp) => { - // this shall be lower case - let value = resp.headers['x-server-version']; - let is_conflict = (resp.status == 409) - if (value !== undefined && value !== Client.clientVersion) { - if (is_conflict) { - throw new ExceptionVersionConflict() - } else { - Client.clientVersion = value + instance.interceptors.response.use( + (resp) => { + // this shall be lower case + let value = resp.headers['x-server-version']; + let is_conflict = (resp.status == 409) + if (value !== undefined && value !== Client.clientVersion) { + if (is_conflict) { + throw new ExceptionVersionConflict() + } else { + Client.clientVersion = value + } + } + return resp + }, + (err) => { + if (err instanceof AxiosError) { + if (err.response?.status == 409) { + throw new ExceptionVersionConflict() + } + if (err.response?.status == 401) { + throw new ExceptionUnauthorized() + } } + throw err } - return resp - }) + ) Client.axiosInstance = instance } @@ -209,4 +229,14 @@ export async function pluginAttr(code: string): Promise> { return Client.axiosInstance.get(`/plugin/schema/${code}`) +} + +/********************************************** + auth +**********************************************/ + +export async function authLogin(ak: string, sk: string): Promise { + return Client.axiosInstance.post(`/auth/login`, { + ak, sk + }) } \ No newline at end of file diff --git a/sdk/admin-client/src/model/BackendHost.ts b/sdk/admin-client/src/model/BackendHost.ts index 054dda83..1a3567ca 100644 --- a/sdk/admin-client/src/model/BackendHost.ts +++ b/sdk/admin-client/src/model/BackendHost.ts @@ -1,4 +1,4 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { K8sServiceData } from "./K8sServiceData"; -export type BackendHost = { "kind": "Host", host: string, } | { "kind": "K8sService" } & K8sServiceData; \ No newline at end of file +export type BackendHost = { "kind": "Host", host: string, } | { "kind": "K8sService" } & K8sServiceData | { "kind": "File", path: string, }; \ No newline at end of file diff --git a/todo.md b/todo.md deleted file mode 100644 index 30f8424d..00000000 --- a/todo.md +++ /dev/null @@ -1,5 +0,0 @@ -1. [x] 分离所有的包的redis客户端 -2. [ ] 修改插件的目录结构 -3. [x] 权重支持负数 -4. [ ] 测试用例 -5. [ ] 插件可独立配置 \ No newline at end of file