From 5e45adea6250bee70ce1f063704a28192bd70b2e Mon Sep 17 00:00:00 2001 From: 0x009922 <43530070+0x009922@users.noreply.github.com> Date: Tue, 8 Oct 2024 05:47:25 +0000 Subject: [PATCH] refactor(iroha_torii): spawn a single server task with only one `TcpListener` (#5122) Signed-off-by: 0x009922 <43530070+0x009922@users.noreply.github.com> --- crates/iroha_torii/Cargo.toml | 2 +- crates/iroha_torii/src/lib.rs | 99 +++++++++-------------------------- crates/irohad/src/lib.rs | 19 +++++-- 3 files changed, 41 insertions(+), 79 deletions(-) diff --git a/crates/iroha_torii/Cargo.toml b/crates/iroha_torii/Cargo.toml index 7a541e7fb86..c4155efe778 100644 --- a/crates/iroha_torii/Cargo.toml +++ b/crates/iroha_torii/Cargo.toml @@ -28,7 +28,7 @@ schema = ["iroha_schema", "iroha_schema_gen"] [dependencies] iroha_core = { workspace = true } iroha_config = { workspace = true } -iroha_primitives = { workspace = true } +iroha_primitives = { workspace = true, features = ["std"] } iroha_logger = { workspace = true } iroha_data_model = { workspace = true, features = ["http"] } iroha_version = { workspace = true } diff --git a/crates/iroha_torii/src/lib.rs b/crates/iroha_torii/src/lib.rs index 2ff9261d9b4..136fedeeecb 100644 --- a/crates/iroha_torii/src/lib.rs +++ b/crates/iroha_torii/src/lib.rs @@ -5,7 +5,7 @@ //! - `telemetry`: enables Status, Metrics, and API Version endpoints //! - `schema`: enables Data Model Schema endpoint -use std::{fmt::Debug, net::ToSocketAddrs, sync::Arc, time::Duration}; +use std::{fmt::Debug, sync::Arc, time::Duration}; use axum::{ extract::{DefaultBodyLimit, WebSocketUpgrade}, @@ -14,8 +14,7 @@ use axum::{ routing::{get, post}, Router, }; -use error_stack::IntoReportCompat; -use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt}; +use error_stack::ResultExt; use iroha_config::{ base::{util::Bytes, WithOrigin}, parameters::actual::Torii as Config, @@ -35,7 +34,7 @@ use iroha_data_model::ChainId; use iroha_futures::supervisor::ShutdownSignal; use iroha_primitives::addr::SocketAddr; use iroha_torii_const::uri; -use tokio::{net::TcpListener, task}; +use tokio::net::TcpListener; use tower_http::{ timeout::TimeoutLayer, trace::{DefaultMakeSpan, TraceLayer}, @@ -239,79 +238,28 @@ impl Torii { )) } - /// Start main API endpoints. - /// - /// # Errors - /// Can fail due to listening to network or if http server fails - async fn start_api( - self: Arc, - shutdown_signal: &ShutdownSignal, - ) -> eyre::Result>>> { - let torii_address = self.address.value(); - - let handles = torii_address - .to_socket_addrs()? - .map(TcpListener::bind) - .collect::>() - .try_collect::>() - .await? - .into_iter() - .map(|listener| { - let signal = shutdown_signal.clone(); - let signal = async move { signal.receive().await }; - let torii = Arc::clone(&self); - let api_router = torii.create_api_router(); - - let serve_fut = async move { - axum::serve(listener, api_router) - .with_graceful_shutdown(signal) - .await - .map_err(eyre::Report::from) - }; - task::spawn(serve_fut) - }) - .collect(); - - Ok(handles) - } - /// To handle incoming requests `Torii` should be started first. /// /// # Errors /// Can fail due to listening to network or if http server fails - #[iroha_futures::telemetry_future] - pub async fn start( - self, - shutdown_signal: &ShutdownSignal, - ) -> error_stack::Result, eyre::Report> { - let torii = Arc::new(self); - let mut handles = vec![]; - - handles.extend( - Arc::clone(&torii) - .start_api(shutdown_signal) - .await - .into_report() - .map_err(|err| err.attach_printable(torii.address.clone().into_attachment()))?, - ); - - let run = handles - .into_iter() - .collect::>() - .for_each(|handle| { - match handle { - Err(error) => { - iroha_logger::error!(%error, "Join handle error"); - } - Ok(Err(error)) => { - iroha_logger::error!(%error, "Error while running torii"); - } - _ => {} - } - futures::future::ready(()) - }); - - Ok(run) + // #[iroha_futures::telemetry_future] + pub async fn start(self, shutdown_signal: ShutdownSignal) -> error_stack::Result<(), Error> { + let torii_address = self.address.value().clone(); + + let listener = match torii_address { + SocketAddr::Ipv4(v) => TcpListener::bind(std::net::SocketAddr::V4(v.into())).await, + SocketAddr::Ipv6(v) => TcpListener::bind(std::net::SocketAddr::V6(v.into())).await, + SocketAddr::Host(v) => TcpListener::bind((v.host.as_ref(), v.port)).await, + } + .change_context(Error::StartServer) + .attach_printable("failed to bind to the specified address") + .attach_printable_lazy(|| self.address.clone().into_attachment())?; + let api_router = self.create_api_router(); + + axum::serve(listener, api_router) + .with_graceful_shutdown(async move { shutdown_signal.receive().await }) + .await + .change_context(Error::FailedExit) } } @@ -339,6 +287,10 @@ pub enum Error { ConfigurationFailure(#[from] KisoError), /// Failed to find status segment by provided path StatusSegmentNotFound(#[source] eyre::Report), + /// Failed to start Torii + StartServer, + /// Torii server terminated with an error + FailedExit, } impl IntoResponse for Error { @@ -367,6 +319,7 @@ impl Error { #[cfg(feature = "profiling")] Pprof(_) => StatusCode::INTERNAL_SERVER_ERROR, ConfigurationFailure(_) => StatusCode::INTERNAL_SERVER_ERROR, + StartServer | FailedExit => unreachable!("these never occur during request handling"), } } diff --git a/crates/irohad/src/lib.rs b/crates/irohad/src/lib.rs index 9563e94970a..7875c67f69f 100644 --- a/crates/irohad/src/lib.rs +++ b/crates/irohad/src/lib.rs @@ -10,6 +10,7 @@ use std::{ future::Future, path::{Path, PathBuf}, sync::Arc, + time::Duration, }; use clap::Parser; @@ -34,7 +35,7 @@ use iroha_core::{ IrohaNetwork, }; use iroha_data_model::{block::SignedBlock, prelude::*}; -use iroha_futures::supervisor::{ShutdownSignal, Supervisor}; +use iroha_futures::supervisor::{Child, OnShutdown, ShutdownSignal, Supervisor}; use iroha_genesis::GenesisBlock; use iroha_logger::{actor::LoggerHandle, InitConfig as LoggerInitConfig}; use iroha_primitives::addr::SocketAddr; @@ -323,10 +324,18 @@ impl Iroha { #[cfg(feature = "telemetry")] metrics_reporter, ) - .start(&supervisor.shutdown_signal()) - .await - .map_err(|report| report.change_context(StartError::StartTorii))?; - supervisor.monitor(tokio::spawn(torii_run)); + .start(supervisor.shutdown_signal()); + supervisor.monitor(Child::new( + tokio::spawn(async move { + if let Err(err) = torii_run.await { + iroha_logger::error!(?err, "Torii failed to terminate gracefully"); + // TODO: produce non-zero exit code or something + } else { + iroha_logger::debug!("Torii exited normally"); + }; + }), + OnShutdown::Wait(Duration::from_secs(5)), + )); supervisor.monitor(tokio::task::spawn(config_updates_relay(kiso, logger)));