From 955f8e2daa0c393e1c0b84a1f211d676f661d801 Mon Sep 17 00:00:00 2001 From: Reboot-Codes Date: Fri, 27 Sep 2024 15:49:55 -0700 Subject: [PATCH] Broken: Attempting SIGINT Handling, ModMan formation. --- clover-hub/Cargo.lock | 20 +- clover-hub/Cargo.toml | 2 + clover-hub/dev.sh | 2 +- clover-hub/src/main.rs | 66 +++++- clover-hub/src/server/evtbuzz/listener.rs | 224 ++++++++++-------- clover-hub/src/server/evtbuzz/models.rs | 3 +- clover-hub/src/server/evtbuzz/websockets.rs | 2 +- clover-hub/src/server/mod.rs | 34 +-- clover-hub/src/server/modman/displays/mod.rs | 6 + clover-hub/src/server/modman/mod.rs | 88 ++++++- clover-hub/src/server/modman/models.rs | 12 +- clover-hub/src/tui/mod.rs | 3 +- clover-hub/src/utils.rs | 4 +- docs/docs/clover-hub/server/evtbuzz/events.md | 2 +- docs/docs/clover-hub/server/evtbuzz/store.md | 7 + 15 files changed, 331 insertions(+), 144 deletions(-) mode change 100644 => 100755 clover-hub/dev.sh create mode 100644 clover-hub/src/server/modman/displays/mod.rs create mode 100644 docs/docs/clover-hub/server/evtbuzz/store.md diff --git a/clover-hub/Cargo.lock b/clover-hub/Cargo.lock index a310385..a4872ff 100644 --- a/clover-hub/Cargo.lock +++ b/clover-hub/Cargo.lock @@ -345,10 +345,12 @@ dependencies = [ "regex", "serde", "serde_json", + "signal-hook 0.3.17", "taurus", "thiserror", "tokio", "tokio-stream", + "tokio-util", "tui", "url", "uuid", @@ -419,7 +421,7 @@ dependencies = [ "libc 0.2.155", "mio 0.7.14", "parking_lot 0.11.2", - "signal-hook", + "signal-hook 0.1.17", "winapi", ] @@ -436,7 +438,7 @@ dependencies = [ "mio 0.7.14", "parking_lot 0.11.2", "serde", - "signal-hook", + "signal-hook 0.1.17", "winapi", ] @@ -1626,6 +1628,16 @@ dependencies = [ "signal-hook-registry", ] +[[package]] +name = "signal-hook" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8621587d4798caf8eb44879d42e56b9a93ea5dcd315a6487c357130095b62801" +dependencies = [ + "libc 0.2.155", + "signal-hook-registry", +] + [[package]] name = "signal-hook-registry" version = "1.4.2" @@ -1837,9 +1849,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.11" +version = "0.7.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" +checksum = "61e7c3654c13bcd040d4a03abee2c75b1d14a37b423cf5a813ceae1cc903ec6a" dependencies = [ "bytes", "futures-core", diff --git a/clover-hub/Cargo.toml b/clover-hub/Cargo.toml index c5a7790..f9cd69f 100644 --- a/clover-hub/Cargo.toml +++ b/clover-hub/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" crossterm = { version = "0.19", features = [ "serde" ] } tui = { version = "0.14", default-features = false, features = ['crossterm', 'serde'] } clap = { version = "4.5.4", features = ["derive"] } +signal-hook = "0.3.17" # Debugging Tools log = "0.4.8" @@ -18,6 +19,7 @@ chrono = { version = "0.4", features = ["serde"] } regex = "1.10.4" uuid = { version = "1.8.0", features = ["serde", "v4"] } thiserror = "1.0" +tokio-util = "0.7.12" # HTTP/WS api_key = "0.1.0" diff --git a/clover-hub/dev.sh b/clover-hub/dev.sh old mode 100644 new mode 100755 index 6bc7524..8a4b6fc --- a/clover-hub/dev.sh +++ b/clover-hub/dev.sh @@ -1,2 +1,2 @@ -CLOVER_LOG="clover::server=debug" CLOVER_MASTER_PRINT="true" cargo run -- run server +CLOVER_LOG="clover=debug" CLOVER_MASTER_PRINT="true" cargo run -- run server diff --git a/clover-hub/src/main.rs b/clover-hub/src/main.rs index b07a950..ae8a053 100644 --- a/clover-hub/src/main.rs +++ b/clover-hub/src/main.rs @@ -5,9 +5,13 @@ mod server; mod tui; mod utils; -use log::{info, warn}; +use log::{debug, info, warn}; use env_logger; +use signal_hook::consts::SIGINT; +use signal_hook::iterator::Signals; +use tokio::sync::mpsc; use std::env; +use std::error::Error; use std::ffi::OsString; use std::num::ParseIntError; use std::sync::Arc; @@ -64,46 +68,88 @@ fn unwrap_port_arg(arg: Result) -> u16 { } #[tokio::main] -async fn main() { +async fn main() -> Result<(), Box> { env_logger::Builder::new() .parse_filters(&env::var("CLOVER_LOG").unwrap_or("info".to_string())) .init(); info!("Starting Clover Hub."); - let matches = cli().get_matches(); + // BLEHHHHHHHHHHH Seriously???? Anyways... + let matches = Box::leak(Box::new(cli().get_matches())); + let subcommand = matches.subcommand(); - match matches.subcommand() { + match subcommand { Some(("run", sub_matches)) => { let run_command = sub_matches.subcommand().unwrap_or(("aio", sub_matches)); match run_command { ("aio", sub_matches) => { + let mut signals = Signals::new([SIGINT])?; + let (server_signal_tx, server_signal_rx) = mpsc::unbounded_channel::(); + let (tui_signal_tx, tui_signal_rx) = mpsc::unbounded_channel::(); + let port = unwrap_port_arg(sub_matches.get_one::("port").expect("Default set in Clap.").parse::()); info!("Running Backend Server and Terminal UI (All-In-One)..."); let server_port = Arc::new(port); - let server_handle = tokio::task::spawn(async move { server_main(*server_port.to_owned()).await; }); + let server_handle = tokio::task::spawn(async move { + server_main(*server_port.to_owned(), server_signal_rx).await; + }); + let tui_port = Arc::new(port); let tui_handle = tokio::task::spawn(async move { - let _ = tui_main(*tui_port.to_owned(), Ok::("localhost".to_string()).ok()).await; + let _ = tui_main(*tui_port.to_owned(), Ok::("localhost".to_string()).ok(), tui_signal_rx).await; }); - futures::future::join_all(vec![tui_handle, server_handle]).await; + let signal_handle = tokio::task::spawn(async move { + for signal in signals.forever() { + info!("{}", signal); + let _ = server_signal_tx.send(signal.clone()); + let _ = tui_signal_tx.send(signal.clone()); + } + }); + + futures::future::join_all(vec![signal_handle, tui_handle, server_handle]).await; } ("server", sub_matches) => { + let mut signals = Signals::new([SIGINT])?; + let (server_signal_tx, server_signal_rx) = mpsc::unbounded_channel::(); let port = unwrap_port_arg(sub_matches.get_one::("port").expect("Default provided in Clap.").parse::()); info!("Running Backend Server..."); - server_main(port).await; + let server_handle = tokio::task::spawn(async move { + server_main(port, server_signal_rx).await; + }); + + let signal_handle = tokio::task::spawn(async move { + for signal in signals.forever() { + info!("{}", signal); + let _ = server_signal_tx.send(signal.clone()); + } + }); + + futures::future::join_all(vec![server_handle, signal_handle]).await; } ("tui", sub_matches) => { + let mut signals = Signals::new([SIGINT])?; + let (tui_signal_tx, tui_signal_rx) = mpsc::unbounded_channel::(); let host = sub_matches.get_one::("host").expect("Default set in Clap."); let port = unwrap_port_arg(sub_matches.get_one::("port").expect("Default set in Clap.").parse::()); + let signal_handle = tokio::task::spawn(async move { + for signal in signals.forever() { + let _ = tui_signal_tx.send(signal); + } + }); + info!("Running Terminal UI..."); let tui_host = Arc::new(host); - tui_main(port, Ok::((*tui_host.to_owned()).to_string()).ok()).await.err(); + let tui_handle = tokio::task::spawn(async move { + tui_main(port, Ok::((*tui_host.to_owned()).to_string()).ok(), tui_signal_rx).await.err(); + }); + + futures::future::join_all(vec![tui_handle, signal_handle]).await; } (name, _) => { unreachable!("Unsupported subcommand `{name}`") @@ -121,5 +167,5 @@ async fn main() { _ => unreachable!(), // If all subcommands are defined above, anything else is unreachable!() } - // Continued program logic goes here... + Ok(()) } \ No newline at end of file diff --git a/clover-hub/src/server/evtbuzz/listener.rs b/clover-hub/src/server/evtbuzz/listener.rs index 7db7020..915aea1 100644 --- a/clover-hub/src/server/evtbuzz/listener.rs +++ b/clover-hub/src/server/evtbuzz/listener.rs @@ -1,6 +1,7 @@ use log::{debug, error, info, warn}; use regex::Regex; use tokio::sync::Mutex; +use tokio_util::sync::CancellationToken; use url::Url; use std::collections::HashMap; use std::convert::Infallible; @@ -182,10 +183,12 @@ pub async fn evtbuzz_listener( mut modman_ipc: (&CoreUserConfig, UnboundedReceiver), mut inference_engine_ipc: (&CoreUserConfig, UnboundedReceiver), mut appd_ipc: (&CoreUserConfig, UnboundedReceiver), + mut signal_rx: UnboundedReceiver, evtbuzz_user_config: Arc ) { info!("Starting EvtBuzz on port: {}...", port); + let cancellation_token = CancellationToken::new(); let clients_tx: Arc>>> = Arc::new(Mutex::new(HashMap::new())); let arbiter_cfg = arbiter_ipc.0; @@ -250,169 +253,180 @@ pub async fn evtbuzz_listener( // TODO: Start creating GQL API endpoint. let server_port = Arc::new(port.clone()); + let http_token = cancellation_token.clone(); let http_handle = tokio::task::spawn(async move { - warp::serve(routes) + let (_, server) = warp::serve(routes) // TODO: Add option for listening address. - .try_bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), *server_port)).await; + .try_bind_with_graceful_shutdown(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), *server_port), async move { + tokio::select! { + _ = http_token.cancelled() => {} + } + }).expect(""); + server.await; + info!("Server stopped."); }); let ipc_dispatch_store = Arc::new(store.clone()); let ipc_dispatch_clients_tx = Arc::new(clients_tx.clone()); let ipc_dispatch_user_config = Arc::new(evtbuzz_user_config.clone()); - let ipc_dispatch_handle = tokio::task::spawn(async move { - while let Some(message) = ipc_rx.recv().await { - debug!("Got message type: {}, with data:\n {}", message.kind.clone(), message.message.clone()); - for client in ipc_dispatch_store.clients.lock().await.clone().into_iter() { - let client_id = Arc::new(client.0); - let mutex = &ipc_dispatch_clients_tx.to_owned(); - let client_senders = mutex.lock(); - let hash_map = &client_senders.await; - let mut message_sent = false; - - match hash_map.get(&client_id.to_string()) { - Some(client_sender) => { - if client.1.active { - match ipc_dispatch_store.clone().api_keys.lock().await.get(&client.1.api_key) { - Some(api_key) => { - for allowed_event_regex in &api_key.allowed_events_to { - match Regex::new(&allowed_event_regex) { - Ok(regex) => { - if regex.is_match(&allowed_event_regex) && !(message.author.clone().split("?client=").collect::>()[1] == *client_id.clone()) { - debug!("Sending event: \"{}\", to client: {}...", message.kind.clone(), client_id.clone()); - match client_sender.send(message.clone()) { - Ok(_) => { - message_sent = true; - }, - Err(e) => { - error!("Failed to send message to client: {}, due to:\n{}", client_id.clone(), e); - } - }; - - break; + let ipc_dispatch_token = cancellation_token.clone(); + tokio::spawn(ipc_dispatch_token.run_until_cancelled(async move { + match ipc_rx.recv().await { + Some(message) => { + debug!("Got message type: {}, with data:\n {}", message.kind.clone(), message.message.clone()); + for client in ipc_dispatch_store.clients.lock().await.clone().into_iter() { + let client_id = Arc::new(client.0); + let mutex = &ipc_dispatch_clients_tx.to_owned(); + let client_senders = mutex.lock(); + let hash_map = &client_senders.await; + let mut message_sent = false; + + match hash_map.get(&client_id.to_string()) { + Some(client_sender) => { + if client.1.active { + match ipc_dispatch_store.clone().api_keys.lock().await.get(&client.1.api_key) { + Some(api_key) => { + for allowed_event_regex in &api_key.allowed_events_to { + match Regex::new(&allowed_event_regex) { + Ok(regex) => { + if regex.is_match(&allowed_event_regex) && !(message.author.clone().split("?client=").collect::>()[1] == *client_id.clone()) { + debug!("Sending event: \"{}\", to client: {}...", message.kind.clone(), client_id.clone()); + match client_sender.send(message.clone()) { + Ok(_) => { + message_sent = true; + }, + Err(e) => { + error!("Failed to send message to client: {}, due to:\n{}", client_id.clone(), e); + } + }; + + break; + } + }, + Err(e) => { + error!("Message: \"{}\", failed, allowed event regular expression for client: {}, errored with: {}", message.kind, client_id.clone(), e); } - }, - Err(e) => { - error!("Message: \"{}\", failed, allowed event regular expression for client: {}, errored with: {}", message.kind, client_id.clone(), e); } } - } - if (!message_sent) && api_key.echo && (message.author.clone().split("?client=").collect::>()[1] == *client_id.clone()) { - debug!("Echoing event: \"{}\", to client: {}...", message.kind.clone(), client_id.clone()); - match client_sender.send(message.clone()) { - Ok(_) => { - message_sent = true; - }, - Err(e) => { - error!("Failed to send message to client: {}, due to:\n{}", client_id.clone(), e); - } + if (!message_sent) && api_key.echo && (message.author.clone().split("?client=").collect::>()[1] == *client_id.clone()) { + debug!("Echoing event: \"{}\", to client: {}...", message.kind.clone(), client_id.clone()); + match client_sender.send(message.clone()) { + Ok(_) => { + message_sent = true; + }, + Err(e) => { + error!("Failed to send message to client: {}, due to:\n{}", client_id.clone(), e); + } - }; + }; + } + }, + None => { + error!("DANGER! Client: {}, had API key removed from store without closing connection on removal, THIS IS BAD; please report this! Closing connection...", client_id.clone()); + + let kind = Url::parse("clover://evtbuzz.clover.reboot-codes.com/clients/unauthorize") + .unwrap() + .query_pairs_mut() + .append_pair("id", &client_id.clone()) + .finish() + .to_string(); + + let generated_message = gen_ipc_message( + &ipc_dispatch_store.clone(), + &ipc_dispatch_user_config.clone(), + kind, + "api key removed from store".to_string() + ).await; + ipc_dispatch_store.messages.lock().await.insert(generated_message.id.clone(), generated_message.clone().into()); + + let _ = client_sender.send(generated_message.clone()); } - }, - None => { - error!("DANGER! Client: {}, had API key removed from store without closing connection on removal, THIS IS BAD; please report this! Closing connection...", client_id.clone()); - - let kind = Url::parse("clover://hub/server/listener/clients/unauthorize") - .unwrap() - .query_pairs_mut() - .append_pair("id", &client_id.clone()) - .finish() - .to_string(); - - let generated_message = gen_ipc_message( - &ipc_dispatch_store.clone(), - &ipc_dispatch_user_config.clone(), - kind, - "api key removed from store".to_string() - ).await; - ipc_dispatch_store.messages.lock().await.insert(generated_message.id.clone(), generated_message.clone().into()); - - let _ = client_sender.send(generated_message.clone()); } } + }, + None => { + error!("Client: {}, does not exist in the client map!", client_id.clone()); } - }, - None => { - error!("Client: {}, does not exist in the client map!", client_id.clone()); } - } - if !message_sent { debug!("Message: \"{}\", not sent to client: {}", message.kind.clone(), client_id.clone()); } - } + if !message_sent { debug!("Message: \"{}\", not sent to client: {}", message.kind.clone(), client_id.clone()); } + } + }, + None => {} } - }); + })); // IPC Handle for data from WS clients. - let ipc_receive_handle = tokio::task::spawn(async move { - while let Some(msg) = from_client_rx.recv().await { - debug!("Got message: {{ \"author\": \"{}\", \"kind\": \"{}\", \"message\": \"{}\" }}", msg.author.clone(), msg.kind.clone(), msg.message.clone()); - match ipc_tx.send(msg.clone()) { - Ok(_) => {}, - Err(e) => { - error!("Failed to send message: {{ \"author\": \"{}\", \"kind\": \"{}\", \"message\": \"{}\" }}, due to:\n{}", msg.author.clone(), msg.kind.clone(), msg.message.clone(), e); - } - }; + let ipc_recv_token = cancellation_token.clone(); + let ipc_receive_handle = tokio::task::spawn(async move {ipc_recv_token.run_until_cancelled(async move { + match from_client_rx.recv().await { + Some(msg) => { + debug!("Got message: {{ \"author\": \"{}\", \"kind\": \"{}\", \"message\": \"{}\" }}", msg.author.clone(), msg.kind.clone(), msg.message.clone()); + match ipc_tx.send(msg.clone()) { + Ok(_) => {}, + Err(e) => { + error!("Failed to send message: {{ \"author\": \"{}\", \"kind\": \"{}\", \"message\": \"{}\" }}, due to:\n{}", msg.author.clone(), msg.kind.clone(), msg.message.clone(), e); + } + }; + }, + None => {} } - }); + });}); // Internal IPC Handles let from_arbiter_cfg = Arc::new(arbiter_cfg.clone()); let from_arbiter_store = Arc::new(store.clone()); let from_arbiter_tx = Arc::new(from_client_tx.clone()); - let from_arbiter_handle = tokio::task::spawn(async move { + let from_arbiter_token = cancellation_token.clone(); + let from_arbiter_handle = tokio::task::spawn(async move {from_arbiter_token.run_until_cancelled(async move { while let Some(msg) = arbiter_ipc.1.recv().await { handle_ipc_send(&from_arbiter_tx, msg, &from_arbiter_cfg.clone(), &from_arbiter_store.clone()).await; } - }); + });}); let from_renderer_cfg = Arc::new(renderer_cfg.clone()); let from_renderer_store = Arc::new(store.clone()); let from_renderer_tx = Arc::new(from_client_tx.clone()); - let from_renderer_handle = tokio::task::spawn(async move { + let from_renderer_token = cancellation_token.clone(); + let from_renderer_handle = tokio::task::spawn(async move {from_renderer_token.run_until_cancelled(async move { while let Some(msg) = renderer_ipc.1.recv().await { handle_ipc_send(&from_renderer_tx, msg, &from_renderer_cfg.clone(), &from_renderer_store.clone()).await; } - }); + });}); let from_modman_cfg = Arc::new(modman_cfg.clone()); let from_modman_store = Arc::new(store.clone()); let from_modman_tx = Arc::new(from_client_tx.clone()); - let from_modman_handle = tokio::task::spawn(async move { + let from_modman_token = cancellation_token.clone(); + let from_modman_handle = tokio::task::spawn(async move {from_modman_token.run_until_cancelled(async move { while let Some(msg) = modman_ipc.1.recv().await { handle_ipc_send(&from_modman_tx, msg, &from_modman_cfg.clone(), &from_modman_store.clone()).await; } - }); + });}); let from_inference_engine_cfg = Arc::new(inference_engine_cfg.clone()); let from_inference_engine_store = Arc::new(store.clone()); let from_inference_engine_tx = Arc::new(from_client_tx.clone()); - let from_inference_engine_handle = tokio::task::spawn(async move { + let from_inference_engine_token = cancellation_token.clone(); + let from_inference_engine_handle = tokio::task::spawn(async move {from_inference_engine_token.run_until_cancelled(async move { while let Some(msg) = inference_engine_ipc.1.recv().await { handle_ipc_send(&from_inference_engine_tx, msg, &from_inference_engine_cfg.clone(), &from_inference_engine_store.clone()).await; } - }); + });}); let from_appd_cfg = Arc::new(appd_cfg.clone()); let from_appd_store = Arc::new(store.clone()); let from_appd_tx = Arc::new(from_client_tx.clone()); - let from_appd_handle = tokio::task::spawn(async move { + let from_appd_token = cancellation_token.clone(); + let from_appd_handle = tokio::task::spawn(async move {from_appd_token.run_until_cancelled(async move { while let Some(msg) = appd_ipc.1.recv().await { handle_ipc_send(&from_appd_tx, msg, &from_appd_cfg.clone(), &from_appd_store.clone()).await; } - }); + });}); + + http_handle.await; - futures::future::join_all(vec![ - http_handle, - ipc_dispatch_handle, - ipc_receive_handle, - from_arbiter_handle, - from_renderer_handle, - from_modman_handle, - from_inference_engine_handle, - from_appd_handle - ]).await; - - info!("Shutting down EvtBuzz..."); + info!("Cleaning and saving store..."); // TODO: Clean up registered sessions when server is shutting down. } diff --git a/clover-hub/src/server/evtbuzz/models.rs b/clover-hub/src/server/evtbuzz/models.rs index 574c024..d69fe47 100644 --- a/clover-hub/src/server/evtbuzz/models.rs +++ b/clover-hub/src/server/evtbuzz/models.rs @@ -77,7 +77,8 @@ pub struct UserConfig { pub api_keys: Vec } -// TODO: Add serialization/deserialization functions.... ough. +// TODO: Add serialization/deserialization functions... +// TODO: Add options for making certain models ephemeral or persistent. #[derive(Debug, Clone)] pub struct Store { pub users: Arc>>, diff --git a/clover-hub/src/server/evtbuzz/websockets.rs b/clover-hub/src/server/evtbuzz/websockets.rs index 395452c..35b558f 100644 --- a/clover-hub/src/server/evtbuzz/websockets.rs +++ b/clover-hub/src/server/evtbuzz/websockets.rs @@ -130,7 +130,7 @@ pub async fn handle_ws_client(auth: (UserWithId, ApiKeyWithKey, ClientWithId, Se let send_client = Arc::new(client.clone()); let send_handle = tokio::task::spawn(async move { while let Some(msg) = to_client_rx.recv().await { - if msg.kind == Url::parse("clover://hub/server/listener/clients/unauthorize") + if msg.kind == Url::parse("clover://evtbuzz.clover.reboot-codes.com/clients/unauthorize") .unwrap() .query_pairs_mut() .append_pair("id", send_client.id.clone().as_str()) diff --git a/clover-hub/src/server/mod.rs b/clover-hub/src/server/mod.rs index 953986c..d270376 100644 --- a/clover-hub/src/server/mod.rs +++ b/clover-hub/src/server/mod.rs @@ -19,6 +19,9 @@ use arbiter::arbiter_main; use renderer::renderer_main; use modman::modman_main; use inference_engine::inference_engine_main; +use tokio::sync::mpsc::UnboundedReceiver; + +use crate::utils::gen_ipc_message; async fn handle_ipc_send(sender: &mpsc::UnboundedSender, msg: IPCMessageWithId, user_config: CoreUserConfig, store: &Store) { let users_mutex = &store.users.to_owned(); @@ -54,7 +57,7 @@ async fn handle_ipc_send(sender: &mpsc::UnboundedSender, msg: } } -pub async fn server_main(port: u16) { +pub async fn server_main(port: u16, mut signal_rx: UnboundedReceiver) { info!("Starting CloverHub..."); let ( @@ -79,7 +82,7 @@ pub async fn server_main(port: u16) { let (arbiter_to_tx, arbiter_to_rx) = mpsc::unbounded_channel::(); let arbiter_store = Arc::new(store.clone()); let arbiter_uca = Arc::new(arbiter_user_config.clone()); - let arbiter_handler = tokio::task::spawn(async move { + let arbiter_handle = tokio::task::spawn(async move { arbiter_main(arbiter_from_tx, arbiter_to_rx, arbiter_store.clone(), arbiter_uca.clone()).await; }); @@ -88,7 +91,7 @@ pub async fn server_main(port: u16) { let (renderer_to_tx, renderer_to_rx) = mpsc::unbounded_channel::(); let renderer_store = Arc::new(store.clone()); let renderer_uca = Arc::new(renderer_user_config.clone()); - let renderer_handler = tokio::task::spawn(async move { + let renderer_handle = tokio::task::spawn(async move { renderer_main(renderer_from_tx, renderer_to_rx, renderer_store.clone(), renderer_uca.clone()).await; }); @@ -97,7 +100,7 @@ pub async fn server_main(port: u16) { let (modman_to_tx, modman_to_rx) = mpsc::unbounded_channel::(); let modman_store = Arc::new(store.clone()); let modman_uca = Arc::new(modman_user_config.clone()); - let modman_handler = tokio::task::spawn(async move { + let modman_handle = tokio::task::spawn(async move { modman_main(modman_from_tx, modman_to_rx, modman_store.clone(), modman_uca.clone()).await; }); @@ -106,7 +109,7 @@ pub async fn server_main(port: u16) { let (inference_engine_to_tx, inference_engine_to_rx) = mpsc::unbounded_channel::(); let inference_engine_store = Arc::new(store.clone()); let inference_engine_uca = Arc::new(inference_engine_user_config.clone()); - let inference_engine_handler = tokio::task::spawn(async move { + let inference_engine_handle = tokio::task::spawn(async move { inference_engine_main(inference_engine_from_tx, inference_engine_to_rx, inference_engine_store.clone(), inference_engine_uca.clone()).await; }); @@ -115,7 +118,7 @@ pub async fn server_main(port: u16) { let (appd_to_tx, appd_to_rx) = mpsc::unbounded_channel::(); let appd_store = Arc::new(store.clone()); let appd_uca = Arc::new(appd_user_config.clone()); - let appd_handler = tokio::task::spawn(async move { + let appd_handle = tokio::task::spawn(async move { appd_main(appd_from_tx, appd_to_rx, appd_store.clone(), appd_uca.clone()).await; }); @@ -129,7 +132,7 @@ pub async fn server_main(port: u16) { let evtbuzz_modman_user_config_arc = Arc::new(modman_user_config.clone()); let evtbuzz_inference_engine_user_config_arc = Arc::new(inference_engine_user_config.clone()); let evtbuzz_appd_user_config_arc = Arc::new(appd_user_config.clone()); - let evtbuzz_handler = tokio::task::spawn(async move { + let evtbuzz_handle = tokio::task::spawn(async move { evtbuzz_listener( *evtbuzz_port.to_owned(), evtbuzz_from_tx, @@ -140,13 +143,14 @@ pub async fn server_main(port: u16) { (&evtbuzz_modman_user_config_arc.clone(), modman_from_rx), (&evtbuzz_inference_engine_user_config_arc.clone(), inference_engine_from_rx), (&evtbuzz_appd_user_config_arc.clone(), appd_from_rx), + signal_rx, evtbuzz_uca.clone() ).await; }); // Get messages from EvtBuzz (incl ones from the other threads), and pass them around. Yes, this does include looping events back into EvtBuzz. let ipc_listener_dispatch_store = Arc::new(store.clone()); - let ipc_from_listener_dispatch = tokio::task::spawn(async move { + let ipc_from_listener_dispatch_handle = tokio::task::spawn(async move { while let Some(msg) = evtbuzz_from_rx.recv().await { handle_ipc_send(&evtbuzz_to_tx, msg.clone(), evtbuzz_user_config.clone(), &ipc_listener_dispatch_store.clone()).await; handle_ipc_send(&arbiter_to_tx, msg.clone(), arbiter_user_config.clone(), &ipc_listener_dispatch_store.clone()).await; @@ -158,12 +162,12 @@ pub async fn server_main(port: u16) { }); futures::future::join_all(vec![ - evtbuzz_handler, - ipc_from_listener_dispatch, - arbiter_handler, - renderer_handler, - modman_handler, - inference_engine_handler, - appd_handler + evtbuzz_handle, + ipc_from_listener_dispatch_handle, + arbiter_handle, + renderer_handle, + modman_handle, + inference_engine_handle, + appd_handle ]).await; } diff --git a/clover-hub/src/server/modman/displays/mod.rs b/clover-hub/src/server/modman/displays/mod.rs new file mode 100644 index 0000000..8726a44 --- /dev/null +++ b/clover-hub/src/server/modman/displays/mod.rs @@ -0,0 +1,6 @@ +use crate::server::evtbuzz::models::Store; +use super::models::{Component, Module}; + +pub fn init_display(store: &Store, module: Module, id: String, component: Component) -> bool { + true +} diff --git a/clover-hub/src/server/modman/mod.rs b/clover-hub/src/server/modman/mod.rs index c90842a..e5deea0 100644 --- a/clover-hub/src/server/modman/mod.rs +++ b/clover-hub/src/server/modman/mod.rs @@ -1,15 +1,99 @@ pub mod models; +pub mod displays; use std::sync::Arc; -use log::info; +use displays::init_display; +use log::{debug, error, info, warn}; +use models::Module; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +use url::Url; use super::evtbuzz::models::{IPCMessageWithId, CoreUserConfig, Store}; +async fn init_module(store: &Store, id: String, module: Module) -> (bool, usize) { + let mut initialized_module = module.initialized; + let mut initialized_module_components = 0; + + if !initialized_module { + if module.components.len() == 0 { + warn!("Module: {}, does not have any components, skipping.", id.clone()); + initialized_module = true; + } else { + for (component_id, component) in module.components.iter() { + if component.component_type.clone().starts_with(&"com.reboot-codes.clover.display".to_string()) { + if init_display(&store, module.clone(), component_id.clone(), component.clone()) { initialized_module_components += 1; } + } + + // TODO: Add init functions for other component types. + } + + if initialized_module_components != module.components.len() { + if initialized_module_components > 0 { + warn!("Module: {}, only initialized {} out of {} components!", id.clone(), initialized_module_components, module.components.len()); + initialized_module = true; + } else { + error!("Module: {}, failed to initialize!", id.clone()); + } + } + } + } + + (initialized_module, initialized_module_components) +} + pub async fn modman_main( ipc_tx: UnboundedSender, - ipc_rx: UnboundedReceiver, + mut ipc_rx: UnboundedReceiver, store: Arc, user_config: Arc ) { info!("Starting ModMan..."); + + // Initialize modules that were registered already via persistence. + for (id, module) in store.modules.lock().await.iter() { + info!("Initializing pre configured module: {}:\n type: {}\n name: {}", id.clone(), module.module_type.clone(), module.pretty_name.clone()); + let (initialized, _components_initialized) = init_module(&store, id.clone(), module.clone()).await; + + // Update the store with new state of the module. + if initialized { + store.modules.lock().await.insert(id.clone(), Module { + module_type: module.module_type.clone(), + pretty_name: module.pretty_name.clone(), + initialized: true, + components: module.components.clone() + }); + } + } + + let ipc_recv_handle = tokio::task::spawn(async move { + while let Some(msg) = ipc_rx.recv().await { + let kind = Url::parse(&msg.kind.clone()).unwrap(); + + // Verify that we care about this event. + if kind.host().unwrap() == url::Host::Domain("modman.clover.reboot-codes.com") { + debug!("Processing: {}", msg.kind.clone()); + } + } + }); + + futures::future::join_all(vec![ipc_recv_handle]).await; + + // Clean up all modules on shutdown. + info!("Cleaning up modules..."); + for (id, module) in store.modules.lock().await.iter() { + if module.initialized { + info!("De-initializing configured module: {}:\n type: {}\n name: {}", id.clone(), module.module_type.clone(), module.pretty_name.clone()); + // let (de_initialized, _components_de_initialized) = de_init_module(&store, id.clone(), module.clone()).await; + let de_initialized = true; + + // Update the store with new state of the module. + if de_initialized { + store.modules.lock().await.insert(id.clone(), Module { + module_type: module.module_type.clone(), + pretty_name: module.pretty_name.clone(), + initialized: false, + components: module.components.clone() + }); + } + } + } } diff --git a/clover-hub/src/server/modman/models.rs b/clover-hub/src/server/modman/models.rs index 1144218..31aecf6 100644 --- a/clover-hub/src/server/modman/models.rs +++ b/clover-hub/src/server/modman/models.rs @@ -1,6 +1,16 @@ -// TODO: Add module models!!!! +use std::collections::HashMap; + #[derive(Debug, Clone)] pub struct Module { pub module_type: String, pub pretty_name: String, + pub initialized: bool, + pub components: HashMap } + +#[derive(Debug, Clone)] +pub struct Component { + pub component_type: String +} + +// TODO: Add component types diff --git a/clover-hub/src/tui/mod.rs b/clover-hub/src/tui/mod.rs index fc94e39..069573e 100644 --- a/clover-hub/src/tui/mod.rs +++ b/clover-hub/src/tui/mod.rs @@ -1,5 +1,6 @@ use std::{io, thread, time::Duration}; use log::info; +use tokio::sync::mpsc::UnboundedReceiver; use tui::{ backend::CrosstermBackend, layout::{Constraint, Direction, Layout}, style::{Color, Style}, text::{Span, Spans}, widgets::{Block, Borders, Paragraph}, Terminal }; @@ -9,7 +10,7 @@ use crossterm::{ terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen}, }; -pub async fn tui_main(port: u16, host: Option) -> Result<(), io::Error> { +pub async fn tui_main(port: u16, host: Option, signal_rx: UnboundedReceiver) -> Result<(), io::Error> { info!("Starting TUI..."); // setup terminal diff --git a/clover-hub/src/utils.rs b/clover-hub/src/utils.rs index 51d6672..c781372 100644 --- a/clover-hub/src/utils.rs +++ b/clover-hub/src/utils.rs @@ -1,4 +1,4 @@ -use std::{hash::{DefaultHasher, Hash, Hasher}, sync::Arc}; +use std::hash::{DefaultHasher, Hash, Hasher}; use api_key::types::{ApiKeyResults, Default, StringGenerator}; use chrono::prelude::{DateTime, Utc}; use uuid::Uuid; @@ -34,7 +34,7 @@ pub fn gen_api_key() -> String { /// Generates a new API key after checking that it is not currently in the Store. pub async fn gen_api_key_with_check(store: &Store) -> String { loop { - let api_key = Uuid::new_v4().to_string(); + let api_key = gen_api_key(); match store.api_keys.lock().await.get(&api_key.clone()) { Some(_) => {}, None => { diff --git a/docs/docs/clover-hub/server/evtbuzz/events.md b/docs/docs/clover-hub/server/evtbuzz/events.md index 0e68048..6e6363c 100644 --- a/docs/docs/clover-hub/server/evtbuzz/events.md +++ b/docs/docs/clover-hub/server/evtbuzz/events.md @@ -4,7 +4,7 @@ These events are agnostic to what's connected to CloverHub. ## Unauthorize Client -`clover://hub.clover.reboot-codes.com/server/listener/clients/unauthorize` +`clover://evtbuzz.clover.reboot-codes.com/clients/unauthorize` Params: diff --git a/docs/docs/clover-hub/server/evtbuzz/store.md b/docs/docs/clover-hub/server/evtbuzz/store.md new file mode 100644 index 0000000..2ba3830 --- /dev/null +++ b/docs/docs/clover-hub/server/evtbuzz/store.md @@ -0,0 +1,7 @@ +# Store + +The store is an in memory state machine managed by EvtBuzz. All configurations and ephemeral states are stored here. + +## Ephemeral Data + +Ephemeral data in the store is dumped when CloverHub exits by default. IN THE FUTURE, you may supply configuration to keep certain models in persistent storage, or make normally persistent models into ephemeral ones.