From 723909e9f33d39fad6ed5085718ecf772c3c479d Mon Sep 17 00:00:00 2001 From: Christiantyemele Date: Thu, 27 Jun 2024 17:42:40 +0100 Subject: [PATCH 01/22] feat(server): gracefully shutting down mediator-server --- mediator-server/Cargo.toml | 4 ++- mediator-server/src/main.rs | 66 ++++++++++++++++++++++++++----------- 2 files changed, 50 insertions(+), 20 deletions(-) diff --git a/mediator-server/Cargo.toml b/mediator-server/Cargo.toml index b8b862a1..d4ee657e 100644 --- a/mediator-server/Cargo.toml +++ b/mediator-server/Cargo.toml @@ -34,8 +34,10 @@ strum_macros = "0.24" uuid = { version = "1", features = ["fast-rng", "v4"] } csv = "1.1.6" +tokio-util = {version = "0.7.7", features = ["rt"]} + [dev-dependencies] anyhow = "1" -httpc-test = "0.1" \ No newline at end of file +httpc-test = "0.1" diff --git a/mediator-server/src/main.rs b/mediator-server/src/main.rs index 96be69e6..1850767c 100644 --- a/mediator-server/src/main.rs +++ b/mediator-server/src/main.rs @@ -1,45 +1,74 @@ -use std::net::SocketAddr; - -use axum::{response::{IntoResponse, Response}, Router, middleware, Json, http::{Uri, Method}}; +use axum::{ + http::{Method, Uri}, + middleware, + response::{IntoResponse, Response}, + Json, Router, +}; use serde_json::json; +use std::net::SocketAddr; +use tokio_util::task::TaskTracker; use uuid::Uuid; +mod constants; mod coordinate_mediation; mod ctx; -mod constants; mod error; -mod models; mod log; +mod models; mod utils; -use crate::{models::RecipientController, log::log_request}; +use crate::{log::log_request, models::RecipientController}; -pub use self::{error::{Error, Result, ClientError}, ctx::Ctx}; +pub use self::{ + ctx::Ctx, + error::{ClientError, Error, Result}, +}; #[tokio::main] async fn main() -> Result<()> { let mc = RecipientController::new().await; let routes_mediate_request = coordinate_mediation::routes_mediate_request::routes(mc.clone()); let routes_all = Router::new() - .nest("/coordinate-mediation/2.0/mediate-request", routes_mediate_request) + .nest( + "/coordinate-mediation/2.0/mediate-request", + routes_mediate_request, + ) .layer(middleware::map_response(main_response_mapper)); + // create a messager which will send the shutdown message to the server and its processes + // any process which wishes to stop the server can send a shutdown message to the shutdown transmitter + let (mut shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::channel::(2); + + // Create a tracker wish will use to track and wait for operations to finish + // before shutdown + let tracker = TaskTracker::new(); + + // first condition satisfied + tracker.close(); + // shutdown_tx.send("value"); // region: --- Start Server let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); println!("->> Listening on {addr}\n"); - axum::Server::bind(&addr) - .serve(routes_all.into_make_service()) - .await - .unwrap(); + + // spawning task tracker on server to handle server's shutdown + tracker.spawn(async move { + axum::Server::bind(&addr) + .serve(routes_all.into_make_service()) + .await + .unwrap(); + }); // endregion: --- Start Server + // gracyfully shuting down the server on CTRL-C or on shutdown alert from shutdown transmitter + // select on the operations for which we wish to gracefully shutdown the server + tokio::select! { + _shutdown_message = shutdown_rx.recv() => { print!("{}", _shutdown_message.unwrap()); tracker.wait().await;}, + _ = tokio::signal::ctrl_c() => {eprintln!("shutting down")}, + } + Ok(()) } - -async fn main_response_mapper( - uri: Uri, - req_method: Method, - res:Response) -> Response { +async fn main_response_mapper(uri: Uri, req_method: Method, res: Response) -> Response { println!("->> {:<12} - main_response_mapper", "RESPONSE_MAPPER"); let uuid = Uuid::new_v4(); @@ -61,9 +90,8 @@ async fn main_response_mapper( // Build the new response from the client_error_body. (*status_code, Json(client_error_body)).into_response() - }); - + // -- Build and log the server log line let client_error = client_status_error.unzip().1; let _ = log_request(uuid, req_method, uri, service_error, client_error).await; From c792ebe9f5985707e99137eb282fb8ffa3aed4b4 Mon Sep 17 00:00:00 2001 From: Christiantyemele Date: Fri, 28 Jun 2024 06:07:49 +0100 Subject: [PATCH 02/22] feat(shutdown): gracefully shutting down generic server --- generic-server/Cargo.toml | 1 + generic-server/src/main.rs | 27 +++++++++++++++++++++++---- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/generic-server/Cargo.toml b/generic-server/Cargo.toml index 773b991a..89df0f80 100644 --- a/generic-server/Cargo.toml +++ b/generic-server/Cargo.toml @@ -25,6 +25,7 @@ server-plugin = { path = "../server-plugin" } chrono = { version = "0.4.26", optional = true } did-endpoint = { path = "../did-endpoint", optional = true } oob-messages = { path = "../oob-messages", optional = true } +tokio-util = {version = "0.7.11", features = ["rt"] } [dev-dependencies] tower = { version = "0.4.13", features = ["util"] } diff --git a/generic-server/src/main.rs b/generic-server/src/main.rs index 33d6a5de..30dd0c30 100644 --- a/generic-server/src/main.rs +++ b/generic-server/src/main.rs @@ -2,9 +2,14 @@ use generic_server::app; use axum::Server; use std::net::SocketAddr; +use tokio_util::sync::CancellationToken; #[tokio::main] async fn main() { + + //creating cancellation token which can be cloned and closed to tell server and process to finish + let token = CancellationToken::new(); + // Load dotenv-flow variables dotenv_flow::dotenv_flow().ok(); @@ -15,10 +20,24 @@ async fn main() { let port = std::env::var("SERVER_LOCAL_PORT").unwrap_or("3000".to_owned()); let addr: SocketAddr = format!("127.0.0.1:{port}").parse().unwrap(); tracing::info!("listening on {addr}"); - Server::bind(&addr) - .serve(app().into_make_service()) - .await - .unwrap(); + + // create a messager which will send the shutdown message to the server and its processes + // any process which wishes to stop the server can send a shutdown message to the shutdown transmitter + let (_shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::channel::(2); + + // spawn task for server + tokio::spawn(async move { + Server::bind(&addr) + .serve(app().into_make_service()) + .await + .unwrap(); + }); + + // watching on shutdown events/signals to gracefully shutdown servers + tokio::select! { + _msg = shutdown_rx.recv() => {eprintln!("\nshutting down gracefully:{}", _msg.unwrap()); token.cancel()} + _ = tokio::signal::ctrl_c() => {eprintln!("\nshutting down gracefully"); token.cancel()} + } } fn config_tracing() { From 5f9800f238644747a6db9cac27d581c3ba34c38b Mon Sep 17 00:00:00 2001 From: Christiantyemele Date: Fri, 28 Jun 2024 06:09:28 +0100 Subject: [PATCH 03/22] fix(shutdown): gracefully shutting down mediator server --- mediator-server/src/main.rs | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/mediator-server/src/main.rs b/mediator-server/src/main.rs index 1850767c..ad1f184e 100644 --- a/mediator-server/src/main.rs +++ b/mediator-server/src/main.rs @@ -6,7 +6,7 @@ use axum::{ }; use serde_json::json; use std::net::SocketAddr; -use tokio_util::task::TaskTracker; +use tokio_util::sync::CancellationToken; use uuid::Uuid; mod constants; @@ -37,21 +37,17 @@ async fn main() -> Result<()> { // create a messager which will send the shutdown message to the server and its processes // any process which wishes to stop the server can send a shutdown message to the shutdown transmitter - let (mut shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::channel::(2); + let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::channel::(2); - // Create a tracker wish will use to track and wait for operations to finish - // before shutdown - let tracker = TaskTracker::new(); + // create cancellation tokens which when closed will tell processes to shutdown + let token = CancellationToken::new(); - // first condition satisfied - tracker.close(); - // shutdown_tx.send("value"); // region: --- Start Server let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); println!("->> Listening on {addr}\n"); // spawning task tracker on server to handle server's shutdown - tracker.spawn(async move { + tokio::spawn(async move { axum::Server::bind(&addr) .serve(routes_all.into_make_service()) .await @@ -59,11 +55,11 @@ async fn main() -> Result<()> { }); // endregion: --- Start Server - // gracyfully shuting down the server on CTRL-C or on shutdown alert from shutdown transmitter + // gracyfully shutting down the server on CTRL-C or on shutdown alert from shutdown transmitter // select on the operations for which we wish to gracefully shutdown the server tokio::select! { - _shutdown_message = shutdown_rx.recv() => { print!("{}", _shutdown_message.unwrap()); tracker.wait().await;}, - _ = tokio::signal::ctrl_c() => {eprintln!("shutting down")}, + _shutdown_message = shutdown_rx.recv() => {eprintln!("shutting down"); token.cancel()}, + _ = tokio::signal::ctrl_c() => {eprintln!("shutting down"); token.cancel()}, } Ok(()) From 4161afbc1a5e038fa0e917751399e15cfd3cca85 Mon Sep 17 00:00:00 2001 From: Christiantyemele Date: Tue, 2 Jul 2024 11:06:06 +0100 Subject: [PATCH 04/22] fix(test): adding shutdown test plus reformating --- generic-server/Cargo.toml | 4 ++- generic-server/src/main.rs | 72 +++++++++++++++++++++++++++----------- 2 files changed, 55 insertions(+), 21 deletions(-) diff --git a/generic-server/Cargo.toml b/generic-server/Cargo.toml index 89df0f80..d5e7c667 100644 --- a/generic-server/Cargo.toml +++ b/generic-server/Cargo.toml @@ -26,10 +26,12 @@ chrono = { version = "0.4.26", optional = true } did-endpoint = { path = "../did-endpoint", optional = true } oob-messages = { path = "../oob-messages", optional = true } tokio-util = {version = "0.7.11", features = ["rt"] } +once_cell = "1.19.0" [dev-dependencies] tower = { version = "0.4.13", features = ["util"] } - +assert_cmd = "2.0" +predicates = "3.0.4" [features] default = ["plugin-index", "plugin-did_endpoint", "plugin-oob_messages"] diff --git a/generic-server/src/main.rs b/generic-server/src/main.rs index 30dd0c30..a743aa8d 100644 --- a/generic-server/src/main.rs +++ b/generic-server/src/main.rs @@ -1,49 +1,53 @@ -use generic_server::app; - use axum::Server; -use std::net::SocketAddr; +use generic_server::app; +use once_cell::sync::Lazy; +use std::{net::SocketAddr, sync::Arc}; +use tokio::sync::{mpsc::Sender, Mutex}; use tokio_util::sync::CancellationToken; +// create a global signal shutdown signal transmitter +static SHUTDOWN: Lazy>>>> = Lazy::new(|| Arc::new(Mutex::new(None))); #[tokio::main] async fn main() { - - //creating cancellation token which can be cloned and closed to tell server and process to finish + // Start server + let port = std::env::var("SERVER_LOCAL_PORT").unwrap_or("3000".to_owned()); + let addr: SocketAddr = format!("127.0.0.1:{port}").parse().unwrap(); + tracing::info!("listening on {addr}"); + run_and_shutdown_server(addr).await +} + +async fn run_and_shutdown_server(add: SocketAddr) { + //creating cancellation token which can be cloned and closed to tell server and process to finish let token = CancellationToken::new(); - + // Load dotenv-flow variables dotenv_flow::dotenv_flow().ok(); // Enable logging config_tracing(); - - // Start server - let port = std::env::var("SERVER_LOCAL_PORT").unwrap_or("3000".to_owned()); - let addr: SocketAddr = format!("127.0.0.1:{port}").parse().unwrap(); - tracing::info!("listening on {addr}"); - // create a messager which will send the shutdown message to the server and its processes // any process which wishes to stop the server can send a shutdown message to the shutdown transmitter - let (_shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::channel::(2); + let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::channel::(2); // spawn task for server - tokio::spawn(async move { - Server::bind(&addr) + tokio::spawn(async move { + Server::bind(&add) .serve(app().into_make_service()) .await .unwrap(); }); + // watching on shutdown events/signals to gracefully shutdown servers + let mut lock = SHUTDOWN.lock().await; + lock.replace(shutdown_tx); - // watching on shutdown events/signals to gracefully shutdown servers tokio::select! { - _msg = shutdown_rx.recv() => {eprintln!("\nshutting down gracefully:{}", _msg.unwrap()); token.cancel()} + _msg = shutdown_rx.recv() => {eprintln!("\nshutting down gracefully:{:?}", _msg); token.cancel()} _ = tokio::signal::ctrl_c() => {eprintln!("\nshutting down gracefully"); token.cancel()} - } + }; } - fn config_tracing() { use tracing::Level; use tracing_subscriber::{filter, layer::SubscriberExt, util::SubscriberInitExt}; - let tracing_layer = tracing_subscriber::fmt::layer(); let filter = filter::Targets::new() .with_target("hyper::proto", Level::INFO) @@ -55,3 +59,31 @@ fn config_tracing() { .with(filter) .init(); } + +#[cfg(test)] +mod tests { + use super::*; + use std::net::SocketAddr; + + #[tokio::test] + async fn test_server_shutdown() { + let port = std::env::var("SERVER_LOCAL_PORT").unwrap_or("3000".to_owned()); + let addr: SocketAddr = format!("127.0.0.1:{port}").parse().unwrap(); + tracing::info!("listening on {addr}"); + + // run server in background + tokio::spawn(run_and_shutdown_server(addr)); + + // send shutdown signal + let mut lock = SHUTDOWN.lock().await; + let sender = lock.as_mut(); + match sender { + Some(sender) => { + sender.send("value".to_owned()).await.unwrap(); + + } + None => {} + } + + } +} From 2b6b60ae63ad0a4daf273d723552f0c86049bca5 Mon Sep 17 00:00:00 2001 From: Christiantyemele Date: Tue, 2 Jul 2024 11:20:00 +0100 Subject: [PATCH 05/22] fix(test): adding test and code reformating --- mediator-server/Cargo.toml | 1 + mediator-server/src/main.rs | 46 ++++++++++++++++++++++++++++++++----- 2 files changed, 41 insertions(+), 6 deletions(-) diff --git a/mediator-server/Cargo.toml b/mediator-server/Cargo.toml index d4ee657e..8a6a21e4 100644 --- a/mediator-server/Cargo.toml +++ b/mediator-server/Cargo.toml @@ -35,6 +35,7 @@ uuid = { version = "1", features = ["fast-rng", "v4"] } csv = "1.1.6" tokio-util = {version = "0.7.7", features = ["rt"]} +once_cell = "1.19.0" diff --git a/mediator-server/src/main.rs b/mediator-server/src/main.rs index ad1f184e..3dca054a 100644 --- a/mediator-server/src/main.rs +++ b/mediator-server/src/main.rs @@ -4,8 +4,10 @@ use axum::{ response::{IntoResponse, Response}, Json, Router, }; +use once_cell::sync::Lazy; use serde_json::json; -use std::net::SocketAddr; +use std::{net::SocketAddr, sync::Arc}; +use tokio::sync::{mpsc::Sender, Mutex}; use tokio_util::sync::CancellationToken; use uuid::Uuid; @@ -23,9 +25,15 @@ pub use self::{ ctx::Ctx, error::{ClientError, Error, Result}, }; +// create a global signal shutdown signal transmitter +static SHUTDOWN: Lazy>>>> = Lazy::new(|| Arc::new(Mutex::new(None))); #[tokio::main] async fn main() -> Result<()> { + run_shutdown().await; + Ok(()) +} + async fn run_shutdown() { let mc = RecipientController::new().await; let routes_mediate_request = coordinate_mediation::routes_mediate_request::routes(mc.clone()); let routes_all = Router::new() @@ -39,14 +47,18 @@ async fn main() -> Result<()> { // any process which wishes to stop the server can send a shutdown message to the shutdown transmitter let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::channel::(2); - // create cancellation tokens which when closed will tell processes to shutdown - let token = CancellationToken::new(); + // create cancellation tokens which when closed will tell processes to shutdown + let token = CancellationToken::new(); // region: --- Start Server let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); println!("->> Listening on {addr}\n"); - // spawning task tracker on server to handle server's shutdown +// initialising global shutdown transmitter + let mut lock = SHUTDOWN.lock().await; + lock.replace(shutdown_tx); + + // spawning task on server to handle server's shutdown tokio::spawn(async move { axum::Server::bind(&addr) .serve(routes_all.into_make_service()) @@ -61,9 +73,8 @@ async fn main() -> Result<()> { _shutdown_message = shutdown_rx.recv() => {eprintln!("shutting down"); token.cancel()}, _ = tokio::signal::ctrl_c() => {eprintln!("shutting down"); token.cancel()}, } - - Ok(()) } + async fn main_response_mapper(uri: Uri, req_method: Method, res: Response) -> Response { println!("->> {:<12} - main_response_mapper", "RESPONSE_MAPPER"); let uuid = Uuid::new_v4(); @@ -96,3 +107,26 @@ async fn main_response_mapper(uri: Uri, req_method: Method, res: Response) -> Re error_response.unwrap_or(res) } +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_server_shutdown_with_shutdown_signal() { + + // run server in background + tokio::spawn(run_shutdown()); + + // send shutdown signal + let mut lock = SHUTDOWN.lock().await; + let sender = lock.as_mut(); + match sender { + Some(sender) => { + sender.send("Shutdown".to_owned()).await.unwrap(); + + } + None => {} + } + + } +} \ No newline at end of file From 6c42a8d998bff120b85b97577f59577b5d4c5ec9 Mon Sep 17 00:00:00 2001 From: Christiantyemele Date: Wed, 3 Jul 2024 14:48:50 +0100 Subject: [PATCH 06/22] fix(update): updated shutdown to include unmounting of plugins --- generic-server/src/lib.rs | 24 ++++++++++++++++----- generic-server/src/main.rs | 16 +++++++------- generic-server/src/plugin/container.rs | 30 +++++++++++++++++++++++++- 3 files changed, 56 insertions(+), 14 deletions(-) diff --git a/generic-server/src/lib.rs b/generic-server/src/lib.rs index 5cfbd604..bda8d429 100644 --- a/generic-server/src/lib.rs +++ b/generic-server/src/lib.rs @@ -1,18 +1,32 @@ pub mod plugin; pub mod util; - +use once_cell::unsync::Lazy; use plugin::container::PluginContainer; use axum::Router; use tower_http::catch_panic::CatchPanicLayer; use tower_http::trace::TraceLayer; +// creating plugincontainer globally for easy loading and unloading +static mut CONTAINER: Lazy> = Lazy::new(|| None); pub fn app() -> Router { let mut container = PluginContainer::default(); let _ = container.load(); + unsafe { CONTAINER.replace(container) }; + unsafe { + Router::new() // + .merge(CONTAINER.as_ref().unwrap().routes().unwrap_or_default()) + .layer(TraceLayer::new_for_http()) + .layer(CatchPanicLayer::new()) + } +} - Router::new() // - .merge(container.routes().unwrap_or_default()) - .layer(TraceLayer::new_for_http()) - .layer(CatchPanicLayer::new()) +// creating function to unmount plugins on shutdown +pub fn unload_for_shutdown() { + unsafe { + CONTAINER.as_mut().unwrap().unload().unwrap(); + } + } + + diff --git a/generic-server/src/main.rs b/generic-server/src/main.rs index a743aa8d..bae73974 100644 --- a/generic-server/src/main.rs +++ b/generic-server/src/main.rs @@ -1,12 +1,13 @@ use axum::Server; -use generic_server::app; +use generic_server::{app, unload_for_shutdown}; use once_cell::sync::Lazy; use std::{net::SocketAddr, sync::Arc}; use tokio::sync::{mpsc::Sender, Mutex}; use tokio_util::sync::CancellationToken; -// create a global signal shutdown signal transmitter -static SHUTDOWN: Lazy>>>> = Lazy::new(|| Arc::new(Mutex::new(None))); +// create a global shutdown signal transmitter +static SHUTDOWN: Lazy>>>> = + Lazy::new(|| Arc::new(Mutex::new(None))); #[tokio::main] async fn main() { // Start server @@ -41,8 +42,8 @@ async fn run_and_shutdown_server(add: SocketAddr) { lock.replace(shutdown_tx); tokio::select! { - _msg = shutdown_rx.recv() => {eprintln!("\nshutting down gracefully:{:?}", _msg); token.cancel()} - _ = tokio::signal::ctrl_c() => {eprintln!("\nshutting down gracefully"); token.cancel()} + _msg = shutdown_rx.recv() => {eprintln!("\nUmounting plugins\nshutting down gracefully:{:?}", _msg); unload_for_shutdown(); token.cancel(); } + _ = tokio::signal::ctrl_c() => {eprintln!("\nUnmounting Plugins\nshutting down gracefully"); unload_for_shutdown(); token.cancel(); } }; } fn config_tracing() { @@ -62,7 +63,7 @@ fn config_tracing() { #[cfg(test)] mod tests { - use super::*; + use super::{SHUTDOWN, run_and_shutdown_server}; use std::net::SocketAddr; #[tokio::test] @@ -80,10 +81,9 @@ mod tests { match sender { Some(sender) => { sender.send("value".to_owned()).await.unwrap(); - } None => {} } - + } } diff --git a/generic-server/src/plugin/container.rs b/generic-server/src/plugin/container.rs index caafcb0c..37916886 100644 --- a/generic-server/src/plugin/container.rs +++ b/generic-server/src/plugin/container.rs @@ -1,5 +1,4 @@ use std::collections::{HashMap, HashSet}; - use axum::Router; use server_plugin::{Plugin, PluginError}; @@ -89,6 +88,35 @@ impl<'a> PluginContainer<'a> { } } + pub fn unload(&mut self) -> Result<(), PluginContainerError> { + // Unmount plugins and clearing the vector of routes + let errors: HashMap<_, _> = self + .plugins + .iter() + .filter_map(|plugins| match plugins.unmount() { + Ok(_) => { + tracing::info!("Unmounted plugins {}", plugins.name()); + self.collected_routes.clear(); + None + } + Err(err) => { + tracing::error!("error unmounting plugins {}", plugins.name()); + Some((plugins.name().to_owned(), err)) + } + }) + .collect(); + + // Flag as unloaded + self.loaded = false; + + // Return state of completion + if errors.is_empty() { + tracing::debug!("plugin container unloaded"); + Ok(()) + } else { + Err(PluginContainerError::PluginErrorMap(errors)) + } + } /// Merge collected routes from all plugins successfully initialized. pub fn routes(&self) -> Result { if self.loaded { From a5eb619eb45c3b9c795c5b73badb23ff82bbf456 Mon Sep 17 00:00:00 2001 From: Christiantyemele Date: Thu, 4 Jul 2024 15:37:56 +0100 Subject: [PATCH 07/22] feat(): gracefully shutdown server --- generic-server/Cargo.toml | 1 + generic-server/src/main.rs | 33 ++++++++------------------------- 2 files changed, 9 insertions(+), 25 deletions(-) diff --git a/generic-server/Cargo.toml b/generic-server/Cargo.toml index d5e7c667..a8e5ca36 100644 --- a/generic-server/Cargo.toml +++ b/generic-server/Cargo.toml @@ -27,6 +27,7 @@ did-endpoint = { path = "../did-endpoint", optional = true } oob-messages = { path = "../oob-messages", optional = true } tokio-util = {version = "0.7.11", features = ["rt"] } once_cell = "1.19.0" +nix = { version = "0.29.0", features = ["feature"] } [dev-dependencies] tower = { version = "0.4.13", features = ["util"] } diff --git a/generic-server/src/main.rs b/generic-server/src/main.rs index bae73974..0229e3e7 100644 --- a/generic-server/src/main.rs +++ b/generic-server/src/main.rs @@ -1,13 +1,8 @@ use axum::Server; use generic_server::{app, unload_for_shutdown}; -use once_cell::sync::Lazy; -use std::{net::SocketAddr, sync::Arc}; -use tokio::sync::{mpsc::Sender, Mutex}; +use std::net::SocketAddr; use tokio_util::sync::CancellationToken; -// create a global shutdown signal transmitter -static SHUTDOWN: Lazy>>>> = - Lazy::new(|| Arc::new(Mutex::new(None))); #[tokio::main] async fn main() { // Start server @@ -26,10 +21,7 @@ async fn run_and_shutdown_server(add: SocketAddr) { // Enable logging config_tracing(); - // create a messager which will send the shutdown message to the server and its processes - // any process which wishes to stop the server can send a shutdown message to the shutdown transmitter - let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::channel::(2); - + // spawn task for server tokio::spawn(async move { Server::bind(&add) @@ -37,12 +29,8 @@ async fn run_and_shutdown_server(add: SocketAddr) { .await .unwrap(); }); - // watching on shutdown events/signals to gracefully shutdown servers - let mut lock = SHUTDOWN.lock().await; - lock.replace(shutdown_tx); tokio::select! { - _msg = shutdown_rx.recv() => {eprintln!("\nUmounting plugins\nshutting down gracefully:{:?}", _msg); unload_for_shutdown(); token.cancel(); } _ = tokio::signal::ctrl_c() => {eprintln!("\nUnmounting Plugins\nshutting down gracefully"); unload_for_shutdown(); token.cancel(); } }; } @@ -63,9 +51,12 @@ fn config_tracing() { #[cfg(test)] mod tests { - use super::{SHUTDOWN, run_and_shutdown_server}; + use tokio::signal; + + use super::run_and_shutdown_server; use std::net::SocketAddr; + #[tokio::test] async fn test_server_shutdown() { let port = std::env::var("SERVER_LOCAL_PORT").unwrap_or("3000".to_owned()); @@ -74,16 +65,8 @@ mod tests { // run server in background tokio::spawn(run_and_shutdown_server(addr)); + // send a shutdown signal to main thread + signal::unix::SignalKind::terminate(); - // send shutdown signal - let mut lock = SHUTDOWN.lock().await; - let sender = lock.as_mut(); - match sender { - Some(sender) => { - sender.send("value".to_owned()).await.unwrap(); } - None => {} - } - - } } From 47a5fb9062563c59f5566e9e65f621304f3a160c Mon Sep 17 00:00:00 2001 From: Christiantyemele Date: Thu, 4 Jul 2024 16:45:20 +0100 Subject: [PATCH 08/22] feat(): gracefull shutdown --- mediator-server/Cargo.toml | 1 - mediator-server/src/main.rs | 40 ++++++++----------------------------- 2 files changed, 8 insertions(+), 33 deletions(-) diff --git a/mediator-server/Cargo.toml b/mediator-server/Cargo.toml index 8a6a21e4..d4ee657e 100644 --- a/mediator-server/Cargo.toml +++ b/mediator-server/Cargo.toml @@ -35,7 +35,6 @@ uuid = { version = "1", features = ["fast-rng", "v4"] } csv = "1.1.6" tokio-util = {version = "0.7.7", features = ["rt"]} -once_cell = "1.19.0" diff --git a/mediator-server/src/main.rs b/mediator-server/src/main.rs index 3dca054a..7660c419 100644 --- a/mediator-server/src/main.rs +++ b/mediator-server/src/main.rs @@ -4,10 +4,9 @@ use axum::{ response::{IntoResponse, Response}, Json, Router, }; -use once_cell::sync::Lazy; + use serde_json::json; -use std::{net::SocketAddr, sync::Arc}; -use tokio::sync::{mpsc::Sender, Mutex}; +use std::net::SocketAddr; use tokio_util::sync::CancellationToken; use uuid::Uuid; @@ -25,15 +24,13 @@ pub use self::{ ctx::Ctx, error::{ClientError, Error, Result}, }; -// create a global signal shutdown signal transmitter -static SHUTDOWN: Lazy>>>> = Lazy::new(|| Arc::new(Mutex::new(None))); #[tokio::main] async fn main() -> Result<()> { run_shutdown().await; Ok(()) } - async fn run_shutdown() { +async fn run_shutdown() { let mc = RecipientController::new().await; let routes_mediate_request = coordinate_mediation::routes_mediate_request::routes(mc.clone()); let routes_all = Router::new() @@ -43,10 +40,6 @@ async fn main() -> Result<()> { ) .layer(middleware::map_response(main_response_mapper)); - // create a messager which will send the shutdown message to the server and its processes - // any process which wishes to stop the server can send a shutdown message to the shutdown transmitter - let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::channel::(2); - // create cancellation tokens which when closed will tell processes to shutdown let token = CancellationToken::new(); @@ -54,10 +47,6 @@ async fn main() -> Result<()> { let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); println!("->> Listening on {addr}\n"); -// initialising global shutdown transmitter - let mut lock = SHUTDOWN.lock().await; - lock.replace(shutdown_tx); - // spawning task on server to handle server's shutdown tokio::spawn(async move { axum::Server::bind(&addr) @@ -67,11 +56,9 @@ async fn main() -> Result<()> { }); // endregion: --- Start Server - // gracyfully shutting down the server on CTRL-C or on shutdown alert from shutdown transmitter // select on the operations for which we wish to gracefully shutdown the server tokio::select! { - _shutdown_message = shutdown_rx.recv() => {eprintln!("shutting down"); token.cancel()}, - _ = tokio::signal::ctrl_c() => {eprintln!("shutting down"); token.cancel()}, + _ = tokio::signal::ctrl_c() => {eprintln!("shutting down"); token.cancel()} } } @@ -110,23 +97,12 @@ async fn main_response_mapper(uri: Uri, req_method: Method, res: Response) -> Re #[cfg(test)] mod tests { use super::*; - + use tokio::signal; #[tokio::test] async fn test_server_shutdown_with_shutdown_signal() { - // run server in background tokio::spawn(run_shutdown()); - - // send shutdown signal - let mut lock = SHUTDOWN.lock().await; - let sender = lock.as_mut(); - match sender { - Some(sender) => { - sender.send("Shutdown".to_owned()).await.unwrap(); - - } - None => {} - } - + // send a shutdown signal to main thread + signal::unix::SignalKind::terminate(); } -} \ No newline at end of file +} From cb756335ed378c21fe3d19bf68da7212a68cf433 Mon Sep 17 00:00:00 2001 From: Christiantyemele Date: Thu, 4 Jul 2024 16:52:28 +0100 Subject: [PATCH 09/22] fix(): correcting typos --- mediator-server/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mediator-server/src/main.rs b/mediator-server/src/main.rs index 7660c419..881dad29 100644 --- a/mediator-server/src/main.rs +++ b/mediator-server/src/main.rs @@ -99,7 +99,7 @@ mod tests { use super::*; use tokio::signal; #[tokio::test] - async fn test_server_shutdown_with_shutdown_signal() { + async fn test_server_shutdown() { // run server in background tokio::spawn(run_shutdown()); // send a shutdown signal to main thread From 5ac7f82868c5a70c13db4b2b9d1a4aa01674d761 Mon Sep 17 00:00:00 2001 From: Christiantyemele Date: Thu, 4 Jul 2024 16:53:27 +0100 Subject: [PATCH 10/22] fix(): fixing dependencies --- generic-server/Cargo.toml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/generic-server/Cargo.toml b/generic-server/Cargo.toml index a8e5ca36..b78e63d3 100644 --- a/generic-server/Cargo.toml +++ b/generic-server/Cargo.toml @@ -31,8 +31,7 @@ nix = { version = "0.29.0", features = ["feature"] } [dev-dependencies] tower = { version = "0.4.13", features = ["util"] } -assert_cmd = "2.0" -predicates = "3.0.4" + [features] default = ["plugin-index", "plugin-did_endpoint", "plugin-oob_messages"] From 838176315d17d1036791333dc90c214d2a2fade2 Mon Sep 17 00:00:00 2001 From: Christiantyemele Date: Wed, 10 Jul 2024 08:49:53 +0100 Subject: [PATCH 11/22] fix(global): global fix of shutdown impl --- generic-server/src/lib.rs | 23 +++------ generic-server/src/main.rs | 21 ++++---- generic-server/src/plugin/container.rs | 14 +++--- mediator-server/Cargo.toml | 4 +- mediator-server/src/main.rs | 70 +++++++------------------- 5 files changed, 45 insertions(+), 87 deletions(-) diff --git a/generic-server/src/lib.rs b/generic-server/src/lib.rs index bda8d429..6b641153 100644 --- a/generic-server/src/lib.rs +++ b/generic-server/src/lib.rs @@ -1,32 +1,21 @@ pub mod plugin; pub mod util; -use once_cell::unsync::Lazy; use plugin::container::PluginContainer; use axum::Router; use tower_http::catch_panic::CatchPanicLayer; use tower_http::trace::TraceLayer; -// creating plugincontainer globally for easy loading and unloading -static mut CONTAINER: Lazy> = Lazy::new(|| None); -pub fn app() -> Router { +pub fn app() -> (PluginContainer<'static>, Router) { let mut container = PluginContainer::default(); let _ = container.load(); - unsafe { CONTAINER.replace(container) }; - unsafe { - Router::new() // - .merge(CONTAINER.as_ref().unwrap().routes().unwrap_or_default()) + + let router = Router::new() + .merge(container.routes().unwrap_or_default()) .layer(TraceLayer::new_for_http()) - .layer(CatchPanicLayer::new()) - } + .layer(CatchPanicLayer::new()); + (container, router) } -// creating function to unmount plugins on shutdown -pub fn unload_for_shutdown() { - unsafe { - CONTAINER.as_mut().unwrap().unload().unwrap(); - } - -} diff --git a/generic-server/src/main.rs b/generic-server/src/main.rs index 0229e3e7..b1822b2d 100644 --- a/generic-server/src/main.rs +++ b/generic-server/src/main.rs @@ -1,5 +1,5 @@ use axum::Server; -use generic_server::{app, unload_for_shutdown}; +use generic_server::app; use std::net::SocketAddr; use tokio_util::sync::CancellationToken; @@ -9,11 +9,11 @@ async fn main() { let port = std::env::var("SERVER_LOCAL_PORT").unwrap_or("3000".to_owned()); let addr: SocketAddr = format!("127.0.0.1:{port}").parse().unwrap(); tracing::info!("listening on {addr}"); - run_and_shutdown_server(addr).await + generic_server_with_gracefull_shutdown(addr).await } -async fn run_and_shutdown_server(add: SocketAddr) { - //creating cancellation token which can be cloned and closed to tell server and process to finish +async fn generic_server_with_gracefull_shutdown(addr: SocketAddr) { + // creating cancellation token tell server task and process to shutdown let token = CancellationToken::new(); // Load dotenv-flow variables @@ -22,16 +22,19 @@ async fn run_and_shutdown_server(add: SocketAddr) { // Enable logging config_tracing(); + // Load plugins + let (mut plugin_container, router) = app(); // spawn task for server tokio::spawn(async move { - Server::bind(&add) - .serve(app().into_make_service()) + Server::bind(&addr) + .serve(router.into_make_service()) .await .unwrap(); }); tokio::select! { - _ = tokio::signal::ctrl_c() => {eprintln!("\nUnmounting Plugins\nshutting down gracefully"); unload_for_shutdown(); token.cancel(); } + _ = tokio::signal::ctrl_c() => { + let _ = plugin_container.unload(); token.cancel();} }; } fn config_tracing() { @@ -53,7 +56,7 @@ fn config_tracing() { mod tests { use tokio::signal; - use super::run_and_shutdown_server; + use super::generic_server_with_gracefull_shutdown; use std::net::SocketAddr; @@ -64,7 +67,7 @@ mod tests { tracing::info!("listening on {addr}"); // run server in background - tokio::spawn(run_and_shutdown_server(addr)); + tokio::spawn(generic_server_with_gracefull_shutdown(addr)); // send a shutdown signal to main thread signal::unix::SignalKind::terminate(); diff --git a/generic-server/src/plugin/container.rs b/generic-server/src/plugin/container.rs index 37916886..f143718e 100644 --- a/generic-server/src/plugin/container.rs +++ b/generic-server/src/plugin/container.rs @@ -1,6 +1,6 @@ -use std::collections::{HashMap, HashSet}; use axum::Router; use server_plugin::{Plugin, PluginError}; +use std::collections::{HashMap, HashSet}; use super::PLUGINS; @@ -88,20 +88,20 @@ impl<'a> PluginContainer<'a> { } } + /// unload container plugins pub fn unload(&mut self) -> Result<(), PluginContainerError> { // Unmount plugins and clearing the vector of routes let errors: HashMap<_, _> = self .plugins .iter() - .filter_map(|plugins| match plugins.unmount() { + .filter_map(|plugin| match plugin.unmount() { Ok(_) => { - tracing::info!("Unmounted plugins {}", plugins.name()); - self.collected_routes.clear(); + tracing::info!("Unmounted plugin {}", plugin.name()); None } Err(err) => { - tracing::error!("error unmounting plugins {}", plugins.name()); - Some((plugins.name().to_owned(), err)) + tracing::error!("error unmounting plugin {}", plugin.name()); + Some((plugin.name().to_owned(), err)) } }) .collect(); @@ -111,12 +111,14 @@ impl<'a> PluginContainer<'a> { // Return state of completion if errors.is_empty() { + self.collected_routes.clear(); tracing::debug!("plugin container unloaded"); Ok(()) } else { Err(PluginContainerError::PluginErrorMap(errors)) } } + /// Merge collected routes from all plugins successfully initialized. pub fn routes(&self) -> Result { if self.loaded { diff --git a/mediator-server/Cargo.toml b/mediator-server/Cargo.toml index d4ee657e..b8b862a1 100644 --- a/mediator-server/Cargo.toml +++ b/mediator-server/Cargo.toml @@ -34,10 +34,8 @@ strum_macros = "0.24" uuid = { version = "1", features = ["fast-rng", "v4"] } csv = "1.1.6" -tokio-util = {version = "0.7.7", features = ["rt"]} - [dev-dependencies] anyhow = "1" -httpc-test = "0.1" +httpc-test = "0.1" \ No newline at end of file diff --git a/mediator-server/src/main.rs b/mediator-server/src/main.rs index 881dad29..96be69e6 100644 --- a/mediator-server/src/main.rs +++ b/mediator-server/src/main.rs @@ -1,68 +1,45 @@ -use axum::{ - http::{Method, Uri}, - middleware, - response::{IntoResponse, Response}, - Json, Router, -}; +use std::net::SocketAddr; +use axum::{response::{IntoResponse, Response}, Router, middleware, Json, http::{Uri, Method}}; use serde_json::json; -use std::net::SocketAddr; -use tokio_util::sync::CancellationToken; use uuid::Uuid; -mod constants; mod coordinate_mediation; mod ctx; +mod constants; mod error; -mod log; mod models; +mod log; mod utils; -use crate::{log::log_request, models::RecipientController}; +use crate::{models::RecipientController, log::log_request}; -pub use self::{ - ctx::Ctx, - error::{ClientError, Error, Result}, -}; +pub use self::{error::{Error, Result, ClientError}, ctx::Ctx}; #[tokio::main] async fn main() -> Result<()> { - run_shutdown().await; - Ok(()) -} -async fn run_shutdown() { let mc = RecipientController::new().await; let routes_mediate_request = coordinate_mediation::routes_mediate_request::routes(mc.clone()); let routes_all = Router::new() - .nest( - "/coordinate-mediation/2.0/mediate-request", - routes_mediate_request, - ) + .nest("/coordinate-mediation/2.0/mediate-request", routes_mediate_request) .layer(middleware::map_response(main_response_mapper)); - // create cancellation tokens which when closed will tell processes to shutdown - let token = CancellationToken::new(); - // region: --- Start Server let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); println!("->> Listening on {addr}\n"); - - // spawning task on server to handle server's shutdown - tokio::spawn(async move { - axum::Server::bind(&addr) - .serve(routes_all.into_make_service()) - .await - .unwrap(); - }); + axum::Server::bind(&addr) + .serve(routes_all.into_make_service()) + .await + .unwrap(); // endregion: --- Start Server - // select on the operations for which we wish to gracefully shutdown the server - tokio::select! { - _ = tokio::signal::ctrl_c() => {eprintln!("shutting down"); token.cancel()} - } + Ok(()) } -async fn main_response_mapper(uri: Uri, req_method: Method, res: Response) -> Response { +async fn main_response_mapper( + uri: Uri, + req_method: Method, + res:Response) -> Response { println!("->> {:<12} - main_response_mapper", "RESPONSE_MAPPER"); let uuid = Uuid::new_v4(); @@ -84,8 +61,9 @@ async fn main_response_mapper(uri: Uri, req_method: Method, res: Response) -> Re // Build the new response from the client_error_body. (*status_code, Json(client_error_body)).into_response() - }); + }); + // -- Build and log the server log line let client_error = client_status_error.unzip().1; let _ = log_request(uuid, req_method, uri, service_error, client_error).await; @@ -94,15 +72,3 @@ async fn main_response_mapper(uri: Uri, req_method: Method, res: Response) -> Re error_response.unwrap_or(res) } -#[cfg(test)] -mod tests { - use super::*; - use tokio::signal; - #[tokio::test] - async fn test_server_shutdown() { - // run server in background - tokio::spawn(run_shutdown()); - // send a shutdown signal to main thread - signal::unix::SignalKind::terminate(); - } -} From 7d19446af0a7cbedc6e6d73b4d8f62a0fd0906a7 Mon Sep 17 00:00:00 2001 From: Christiantyemele Date: Wed, 10 Jul 2024 09:09:37 +0100 Subject: [PATCH 12/22] fix(config): fixing dependencies --- generic-server/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/generic-server/Cargo.toml b/generic-server/Cargo.toml index b78e63d3..c36afa03 100644 --- a/generic-server/Cargo.toml +++ b/generic-server/Cargo.toml @@ -27,9 +27,9 @@ did-endpoint = { path = "../did-endpoint", optional = true } oob-messages = { path = "../oob-messages", optional = true } tokio-util = {version = "0.7.11", features = ["rt"] } once_cell = "1.19.0" -nix = { version = "0.29.0", features = ["feature"] } [dev-dependencies] +nix = { version = "0.29.0", features = ["feature"] } tower = { version = "0.4.13", features = ["util"] } [features] From 89730c56e45206bc06ffa3c9cbd65fb85f4af14b Mon Sep 17 00:00:00 2001 From: Christiantyemele Date: Wed, 10 Jul 2024 14:09:50 +0100 Subject: [PATCH 13/22] fix(test): added test for plugin unloading and some code formating --- generic-server/src/main.rs | 13 ++++++------- generic-server/src/plugin/container.rs | 18 +++++++++++++++++- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/generic-server/src/main.rs b/generic-server/src/main.rs index b1822b2d..d793d0b4 100644 --- a/generic-server/src/main.rs +++ b/generic-server/src/main.rs @@ -21,9 +21,10 @@ async fn generic_server_with_gracefull_shutdown(addr: SocketAddr) { // Enable logging config_tracing(); - + // Load plugins let (mut plugin_container, router) = app(); + // spawn task for server tokio::spawn(async move { Server::bind(&addr) @@ -34,12 +35,15 @@ async fn generic_server_with_gracefull_shutdown(addr: SocketAddr) { tokio::select! { _ = tokio::signal::ctrl_c() => { + tracing::info!("\nshuting down gracefully"); let _ = plugin_container.unload(); token.cancel();} }; } fn config_tracing() { use tracing::Level; + use tracing_subscriber::{filter, layer::SubscriberExt, util::SubscriberInitExt}; + let tracing_layer = tracing_subscriber::fmt::layer(); let filter = filter::Targets::new() .with_target("hyper::proto", Level::INFO) @@ -54,12 +58,9 @@ fn config_tracing() { #[cfg(test)] mod tests { - use tokio::signal; - use super::generic_server_with_gracefull_shutdown; use std::net::SocketAddr; - #[tokio::test] async fn test_server_shutdown() { let port = std::env::var("SERVER_LOCAL_PORT").unwrap_or("3000".to_owned()); @@ -69,7 +70,5 @@ mod tests { // run server in background tokio::spawn(generic_server_with_gracefull_shutdown(addr)); // send a shutdown signal to main thread - signal::unix::SignalKind::terminate(); - - } + } } diff --git a/generic-server/src/plugin/container.rs b/generic-server/src/plugin/container.rs index f143718e..2dd9cc45 100644 --- a/generic-server/src/plugin/container.rs +++ b/generic-server/src/plugin/container.rs @@ -118,7 +118,7 @@ impl<'a> PluginContainer<'a> { Err(PluginContainerError::PluginErrorMap(errors)) } } - + /// Merge collected routes from all plugins successfully initialized. pub fn routes(&self) -> Result { if self.loaded { @@ -298,4 +298,20 @@ mod tests { PluginContainerError::Unloaded ); } + #[test] + fn test_unloading() { + // loading container from test_loading + let mut container = PluginContainer { + loaded: false, + collected_routes: vec![], + plugins: &vec![Box::new(FirstPlugin {}), Box::new(SecondPlugin {})], + }; + assert!(container.load().is_ok()); + assert!(container.routes().is_ok()); + + // unloading container and clearing routes + assert!(container.unload().is_ok()); + + assert_eq!(container.collected_routes.len(), 0); + } } From 9d4c701ea6918250d53cc1d00d786232974cca4a Mon Sep 17 00:00:00 2001 From: Christiantyemele <144161981+Christiantyemele@users.noreply.github.com> Date: Wed, 10 Jul 2024 14:24:02 +0100 Subject: [PATCH 14/22] Update main.rs --- generic-server/src/main.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/generic-server/src/main.rs b/generic-server/src/main.rs index d793d0b4..f2c912a8 100644 --- a/generic-server/src/main.rs +++ b/generic-server/src/main.rs @@ -13,8 +13,6 @@ async fn main() { } async fn generic_server_with_gracefull_shutdown(addr: SocketAddr) { - // creating cancellation token tell server task and process to shutdown - let token = CancellationToken::new(); // Load dotenv-flow variables dotenv_flow::dotenv_flow().ok(); @@ -36,7 +34,7 @@ async fn generic_server_with_gracefull_shutdown(addr: SocketAddr) { tokio::select! { _ = tokio::signal::ctrl_c() => { tracing::info!("\nshuting down gracefully"); - let _ = plugin_container.unload(); token.cancel();} + let _ = plugin_container.unload();} }; } fn config_tracing() { From 681010451b4bf31ad142b55d84e3e9ab6d7f4e26 Mon Sep 17 00:00:00 2001 From: Christiantyemele <144161981+Christiantyemele@users.noreply.github.com> Date: Wed, 10 Jul 2024 14:25:16 +0100 Subject: [PATCH 15/22] Update main.rs --- generic-server/src/main.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/generic-server/src/main.rs b/generic-server/src/main.rs index f2c912a8..74543215 100644 --- a/generic-server/src/main.rs +++ b/generic-server/src/main.rs @@ -1,7 +1,6 @@ use axum::Server; use generic_server::app; use std::net::SocketAddr; -use tokio_util::sync::CancellationToken; #[tokio::main] async fn main() { From 53392441ed274de25712d5d7f1b330f49f8241d8 Mon Sep 17 00:00:00 2001 From: Christiantyemele Date: Wed, 10 Jul 2024 14:29:54 +0100 Subject: [PATCH 16/22] fix(update): removed tokio_utils --- generic-server/Cargo.toml | 1 - generic-server/src/main.rs | 2 -- 2 files changed, 3 deletions(-) diff --git a/generic-server/Cargo.toml b/generic-server/Cargo.toml index c36afa03..80564d24 100644 --- a/generic-server/Cargo.toml +++ b/generic-server/Cargo.toml @@ -25,7 +25,6 @@ server-plugin = { path = "../server-plugin" } chrono = { version = "0.4.26", optional = true } did-endpoint = { path = "../did-endpoint", optional = true } oob-messages = { path = "../oob-messages", optional = true } -tokio-util = {version = "0.7.11", features = ["rt"] } once_cell = "1.19.0" [dev-dependencies] diff --git a/generic-server/src/main.rs b/generic-server/src/main.rs index f2c912a8..9fdd73c8 100644 --- a/generic-server/src/main.rs +++ b/generic-server/src/main.rs @@ -1,7 +1,6 @@ use axum::Server; use generic_server::app; use std::net::SocketAddr; -use tokio_util::sync::CancellationToken; #[tokio::main] async fn main() { @@ -13,7 +12,6 @@ async fn main() { } async fn generic_server_with_gracefull_shutdown(addr: SocketAddr) { - // Load dotenv-flow variables dotenv_flow::dotenv_flow().ok(); From 329d3017f45810f760eb11d2e0eecd4eb48efa72 Mon Sep 17 00:00:00 2001 From: Christiantyemele Date: Thu, 11 Jul 2024 11:49:26 +0100 Subject: [PATCH 17/22] fix(global): reformatting --- generic-server/Cargo.toml | 2 +- generic-server/src/lib.rs | 14 ++++++-------- generic-server/src/main.rs | 21 +++------------------ 3 files changed, 10 insertions(+), 27 deletions(-) diff --git a/generic-server/Cargo.toml b/generic-server/Cargo.toml index 80564d24..f51a681d 100644 --- a/generic-server/Cargo.toml +++ b/generic-server/Cargo.toml @@ -25,7 +25,7 @@ server-plugin = { path = "../server-plugin" } chrono = { version = "0.4.26", optional = true } did-endpoint = { path = "../did-endpoint", optional = true } oob-messages = { path = "../oob-messages", optional = true } -once_cell = "1.19.0" + [dev-dependencies] nix = { version = "0.29.0", features = ["feature"] } diff --git a/generic-server/src/lib.rs b/generic-server/src/lib.rs index 6b641153..c7802d81 100644 --- a/generic-server/src/lib.rs +++ b/generic-server/src/lib.rs @@ -9,13 +9,11 @@ use tower_http::trace::TraceLayer; pub fn app() -> (PluginContainer<'static>, Router) { let mut container = PluginContainer::default(); let _ = container.load(); - - let router = Router::new() - .merge(container.routes().unwrap_or_default()) - .layer(TraceLayer::new_for_http()) - .layer(CatchPanicLayer::new()); - (container, router) -} - + let router = Router::new() + .merge(container.routes().unwrap_or_default()) + .layer(TraceLayer::new_for_http()) + .layer(CatchPanicLayer::new()); + (container, router) +} diff --git a/generic-server/src/main.rs b/generic-server/src/main.rs index 9fdd73c8..6b8757cf 100644 --- a/generic-server/src/main.rs +++ b/generic-server/src/main.rs @@ -32,9 +32,11 @@ async fn generic_server_with_gracefull_shutdown(addr: SocketAddr) { tokio::select! { _ = tokio::signal::ctrl_c() => { tracing::info!("\nshuting down gracefully"); - let _ = plugin_container.unload();} + let _ = plugin_container.unload(); + } }; } + fn config_tracing() { use tracing::Level; @@ -51,20 +53,3 @@ fn config_tracing() { .with(filter) .init(); } - -#[cfg(test)] -mod tests { - use super::generic_server_with_gracefull_shutdown; - use std::net::SocketAddr; - - #[tokio::test] - async fn test_server_shutdown() { - let port = std::env::var("SERVER_LOCAL_PORT").unwrap_or("3000".to_owned()); - let addr: SocketAddr = format!("127.0.0.1:{port}").parse().unwrap(); - tracing::info!("listening on {addr}"); - - // run server in background - tokio::spawn(generic_server_with_gracefull_shutdown(addr)); - // send a shutdown signal to main thread - } -} From 3b7b958a44c9402f92ff8307bf415825f2ef5748 Mon Sep 17 00:00:00 2001 From: Christiantyemele Date: Thu, 11 Jul 2024 13:11:19 +0100 Subject: [PATCH 18/22] fix(): resloved conflicts --- generic-server/src/lib.rs | 2 +- generic-server/src/plugin/container.rs | 40 +++++++++++++++----------- 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/generic-server/src/lib.rs b/generic-server/src/lib.rs index c7802d81..3f8da177 100644 --- a/generic-server/src/lib.rs +++ b/generic-server/src/lib.rs @@ -6,7 +6,7 @@ use axum::Router; use tower_http::catch_panic::CatchPanicLayer; use tower_http::trace::TraceLayer; -pub fn app() -> (PluginContainer<'static>, Router) { +pub fn app() -> (PluginContainer, Router) { let mut container = PluginContainer::default(); let _ = container.load(); diff --git a/generic-server/src/plugin/container.rs b/generic-server/src/plugin/container.rs index 99616fd6..8d7fcb5d 100644 --- a/generic-server/src/plugin/container.rs +++ b/generic-server/src/plugin/container.rs @@ -1,7 +1,7 @@ -use std::collections::{HashMap, HashSet}; -use std::sync::{Arc, Mutex}; use axum::Router; use server_plugin::{Plugin, PluginError}; +use std::collections::{HashMap, HashSet}; +use std::sync::{Arc, Mutex}; use super::PLUGINS; @@ -104,14 +104,17 @@ impl PluginContainer { let errors: HashMap<_, _> = self .plugins .iter() - .filter_map(|plugin| match plugin.unmount() { - Ok(_) => { - tracing::info!("Unmounted plugin {}", plugin.name()); - None - } - Err(err) => { - tracing::error!("error unmounting plugin {}", plugin.name()); - Some((plugin.name().to_owned(), err)) + .filter_map(|plugin| { + let plugin = plugin.lock().unwrap(); + match plugin.unmount() { + Ok(_) => { + tracing::info!("Unmounted plugin {}", plugin.name()); + None + } + Err(err) => { + tracing::error!("error unmounting plugin {}", plugin.name()); + Some((plugin.name().to_owned(), err)) + } } }) .collect(); @@ -284,26 +287,26 @@ mod tests { let plugins: Arc>>> = Arc::new(vec![ Arc::new(Mutex::new(FirstPlugin {})), Arc::new(Mutex::new(SecondPlugin {})), - Arc::new(Mutex::new(SecondAgainPlugin {})), + Arc::new(Mutex::new(SecondAgainPlugin {})), ]); - + // Initialize PluginContainer with the mock plugins let mut container = PluginContainer { loaded: false, collected_routes: vec![], plugins: Arc::clone(&plugins), }; - + // Attempt to load plugins with duplicates let result = container.load(); - + // Assert that the result is an error due to duplicate entries assert_eq!(result.unwrap_err(), PluginContainerError::DuplicateEntry); - + // Verify collected routes (should not be affected by duplicates) assert_eq!(container.collected_routes.len(), 0); // No routes should be collected on error } - + #[test] fn test_loading_with_failing_plugin() { // Mock plugins for testing @@ -361,7 +364,10 @@ mod tests { let mut container = PluginContainer { loaded: false, collected_routes: vec![], - plugins: &vec![Box::new(FirstPlugin {}), Box::new(SecondPlugin {})], + plugins: Arc::new(vec![ + Arc::new(Mutex::new(FirstPlugin {})), + Arc::new(Mutex::new(SecondPlugin {})), + ]), }; assert!(container.load().is_ok()); assert!(container.routes().is_ok()); From 705266d9073cd33c512a8f10287fc6c2ca4ab627 Mon Sep 17 00:00:00 2001 From: Christiantyemele Date: Thu, 11 Jul 2024 14:09:13 +0100 Subject: [PATCH 19/22] fix(global): fixing code formatting --- generic-server/Cargo.toml | 27 +++++++++++++------------- generic-server/src/main.rs | 7 +++---- generic-server/src/plugin/container.rs | 8 +++++--- 3 files changed, 21 insertions(+), 21 deletions(-) diff --git a/generic-server/Cargo.toml b/generic-server/Cargo.toml index f51a681d..06512b63 100644 --- a/generic-server/Cargo.toml +++ b/generic-server/Cargo.toml @@ -1,40 +1,39 @@ [package] +edition = "2021" name = "generic-server" version = "0.1.0" -edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html default-run = "generic-server" [dependencies] async-trait = "0.1.73" -axum = { version = "0.6.20" } +axum = {version = "0.6.20"} dotenv-flow = "0.15.0" -hyper = { version = "0.14.27", features = ["full"] } +hyper = {version = "0.14.27", features = ["full"]} lazy_static = "1.4.0" serde_json = "1.0.104" -tokio = { version = "1.30.0", features = ["full"] } -tower-http = { version = "0.4.3", features = ["catch-panic", "trace"] } +tokio = {version = "1.30.0", features = ["full"]} +tower-http = {version = "0.4.3", features = ["catch-panic", "trace"]} tracing = "0.1.37" -tracing-subscriber = { version = "0.3.17", features = ["json"] } +tracing-subscriber = {version = "0.3.17", features = ["json"]} # Plugins traits -server-plugin = { path = "../server-plugin" } +server-plugin = {path = "../server-plugin"} # optional -chrono = { version = "0.4.26", optional = true } -did-endpoint = { path = "../did-endpoint", optional = true } -oob-messages = { path = "../oob-messages", optional = true } - +chrono = {version = "0.4.26", optional = true} +did-endpoint = {path = "../did-endpoint", optional = true} +oob-messages = {path = "../oob-messages", optional = true} [dev-dependencies] -nix = { version = "0.29.0", features = ["feature"] } -tower = { version = "0.4.13", features = ["util"] } +nix = {version = "0.29.0", features = ["feature"]} +tower = {version = "0.4.13", features = ["util"]} [features] default = ["plugin-index", "plugin-did_endpoint", "plugin-oob_messages"] # plugins -plugin-index = ["dep:chrono"] plugin-did_endpoint = ["dep:did-endpoint"] +plugin-index = ["dep:chrono"] plugin-oob_messages = ["dep:oob-messages"] diff --git a/generic-server/src/main.rs b/generic-server/src/main.rs index 6b8757cf..62603971 100644 --- a/generic-server/src/main.rs +++ b/generic-server/src/main.rs @@ -8,10 +8,10 @@ async fn main() { let port = std::env::var("SERVER_LOCAL_PORT").unwrap_or("3000".to_owned()); let addr: SocketAddr = format!("127.0.0.1:{port}").parse().unwrap(); tracing::info!("listening on {addr}"); - generic_server_with_gracefull_shutdown(addr).await + generic_server_with_graceful_shutdown(addr).await } -async fn generic_server_with_gracefull_shutdown(addr: SocketAddr) { +async fn generic_server_with_graceful_shutdown(addr: SocketAddr) { // Load dotenv-flow variables dotenv_flow::dotenv_flow().ok(); @@ -31,12 +31,11 @@ async fn generic_server_with_gracefull_shutdown(addr: SocketAddr) { tokio::select! { _ = tokio::signal::ctrl_c() => { - tracing::info!("\nshuting down gracefully"); + tracing::info!("shuting down gracefully"); let _ = plugin_container.unload(); } }; } - fn config_tracing() { use tracing::Level; diff --git a/generic-server/src/plugin/container.rs b/generic-server/src/plugin/container.rs index 8d7fcb5d..3da136a3 100644 --- a/generic-server/src/plugin/container.rs +++ b/generic-server/src/plugin/container.rs @@ -360,7 +360,7 @@ mod tests { } #[test] fn test_unloading() { - // loading container from test_loading + // Initialize PluginContainer with the mock plugins let mut container = PluginContainer { loaded: false, collected_routes: vec![], @@ -369,12 +369,14 @@ mod tests { Arc::new(Mutex::new(SecondPlugin {})), ]), }; + // Test unloading plugins assert!(container.load().is_ok()); - assert!(container.routes().is_ok()); + + // Verify collected routes + assert_eq!(container.collected_routes.len(), 2); // unloading container and clearing routes assert!(container.unload().is_ok()); - assert_eq!(container.collected_routes.len(), 0); } } From 018b6718e2b70663e455fe48fc3e83d571e69a72 Mon Sep 17 00:00:00 2001 From: Ingrid Kamga Date: Thu, 11 Jul 2024 14:41:16 +0100 Subject: [PATCH 20/22] Fix formatting --- generic-server/src/main.rs | 6 +++--- generic-server/src/plugin/container.rs | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/generic-server/src/main.rs b/generic-server/src/main.rs index 62603971..dbb845da 100644 --- a/generic-server/src/main.rs +++ b/generic-server/src/main.rs @@ -8,7 +8,7 @@ async fn main() { let port = std::env::var("SERVER_LOCAL_PORT").unwrap_or("3000".to_owned()); let addr: SocketAddr = format!("127.0.0.1:{port}").parse().unwrap(); tracing::info!("listening on {addr}"); - generic_server_with_graceful_shutdown(addr).await + generic_server_with_graceful_shutdown(addr).await; } async fn generic_server_with_graceful_shutdown(addr: SocketAddr) { @@ -31,14 +31,14 @@ async fn generic_server_with_graceful_shutdown(addr: SocketAddr) { tokio::select! { _ = tokio::signal::ctrl_c() => { - tracing::info!("shuting down gracefully"); + tracing::info!("shutting down gracefully"); let _ = plugin_container.unload(); } }; } + fn config_tracing() { use tracing::Level; - use tracing_subscriber::{filter, layer::SubscriberExt, util::SubscriberInitExt}; let tracing_layer = tracing_subscriber::fmt::layer(); diff --git a/generic-server/src/plugin/container.rs b/generic-server/src/plugin/container.rs index 3da136a3..7ba200c6 100644 --- a/generic-server/src/plugin/container.rs +++ b/generic-server/src/plugin/container.rs @@ -108,7 +108,7 @@ impl PluginContainer { let plugin = plugin.lock().unwrap(); match plugin.unmount() { Ok(_) => { - tracing::info!("Unmounted plugin {}", plugin.name()); + tracing::info!("unmounted plugin {}", plugin.name()); None } Err(err) => { @@ -358,6 +358,7 @@ mod tests { PluginContainerError::Unloaded ); } + #[test] fn test_unloading() { // Initialize PluginContainer with the mock plugins From 6f2fd077df9b4ece2c26a1b5e5f631d4e9736ffc Mon Sep 17 00:00:00 2001 From: Christiantyemele <144161981+Christiantyemele@users.noreply.github.com> Date: Thu, 11 Jul 2024 15:27:52 +0100 Subject: [PATCH 21/22] Update main.rs --- generic-server/src/main.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/generic-server/src/main.rs b/generic-server/src/main.rs index 62603971..dbb845da 100644 --- a/generic-server/src/main.rs +++ b/generic-server/src/main.rs @@ -8,7 +8,7 @@ async fn main() { let port = std::env::var("SERVER_LOCAL_PORT").unwrap_or("3000".to_owned()); let addr: SocketAddr = format!("127.0.0.1:{port}").parse().unwrap(); tracing::info!("listening on {addr}"); - generic_server_with_graceful_shutdown(addr).await + generic_server_with_graceful_shutdown(addr).await; } async fn generic_server_with_graceful_shutdown(addr: SocketAddr) { @@ -31,14 +31,14 @@ async fn generic_server_with_graceful_shutdown(addr: SocketAddr) { tokio::select! { _ = tokio::signal::ctrl_c() => { - tracing::info!("shuting down gracefully"); + tracing::info!("shutting down gracefully"); let _ = plugin_container.unload(); } }; } + fn config_tracing() { use tracing::Level; - use tracing_subscriber::{filter, layer::SubscriberExt, util::SubscriberInitExt}; let tracing_layer = tracing_subscriber::fmt::layer(); From 711ab8ef41436b43d4b7924d0c344bdf0d45d24e Mon Sep 17 00:00:00 2001 From: Christiantyemele <144161981+Christiantyemele@users.noreply.github.com> Date: Thu, 11 Jul 2024 15:29:23 +0100 Subject: [PATCH 22/22] Update container.rs --- generic-server/src/plugin/container.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/generic-server/src/plugin/container.rs b/generic-server/src/plugin/container.rs index 3da136a3..7ba200c6 100644 --- a/generic-server/src/plugin/container.rs +++ b/generic-server/src/plugin/container.rs @@ -108,7 +108,7 @@ impl PluginContainer { let plugin = plugin.lock().unwrap(); match plugin.unmount() { Ok(_) => { - tracing::info!("Unmounted plugin {}", plugin.name()); + tracing::info!("unmounted plugin {}", plugin.name()); None } Err(err) => { @@ -358,6 +358,7 @@ mod tests { PluginContainerError::Unloaded ); } + #[test] fn test_unloading() { // Initialize PluginContainer with the mock plugins