Skip to content

Commit

Permalink
refactor(iroha_torii): spawn a single server task with only one `TcpL…
Browse files Browse the repository at this point in the history
…istener` (#5122)

Signed-off-by: 0x009922 <[email protected]>
  • Loading branch information
0x009922 authored Oct 8, 2024
1 parent 0059c39 commit 5e45ade
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 79 deletions.
2 changes: 1 addition & 1 deletion crates/iroha_torii/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ schema = ["iroha_schema", "iroha_schema_gen"]
[dependencies]
iroha_core = { workspace = true }
iroha_config = { workspace = true }
iroha_primitives = { workspace = true }
iroha_primitives = { workspace = true, features = ["std"] }
iroha_logger = { workspace = true }
iroha_data_model = { workspace = true, features = ["http"] }
iroha_version = { workspace = true }
Expand Down
99 changes: 26 additions & 73 deletions crates/iroha_torii/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! - `telemetry`: enables Status, Metrics, and API Version endpoints
//! - `schema`: enables Data Model Schema endpoint
use std::{fmt::Debug, net::ToSocketAddrs, sync::Arc, time::Duration};
use std::{fmt::Debug, sync::Arc, time::Duration};

use axum::{
extract::{DefaultBodyLimit, WebSocketUpgrade},
Expand All @@ -14,8 +14,7 @@ use axum::{
routing::{get, post},
Router,
};
use error_stack::IntoReportCompat;
use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt};
use error_stack::ResultExt;
use iroha_config::{
base::{util::Bytes, WithOrigin},
parameters::actual::Torii as Config,
Expand All @@ -35,7 +34,7 @@ use iroha_data_model::ChainId;
use iroha_futures::supervisor::ShutdownSignal;
use iroha_primitives::addr::SocketAddr;
use iroha_torii_const::uri;
use tokio::{net::TcpListener, task};
use tokio::net::TcpListener;
use tower_http::{
timeout::TimeoutLayer,
trace::{DefaultMakeSpan, TraceLayer},
Expand Down Expand Up @@ -239,79 +238,28 @@ impl Torii {
))
}

/// Start main API endpoints.
///
/// # Errors
/// Can fail due to listening to network or if http server fails
async fn start_api(
self: Arc<Self>,
shutdown_signal: &ShutdownSignal,
) -> eyre::Result<Vec<task::JoinHandle<eyre::Result<()>>>> {
let torii_address = self.address.value();

let handles = torii_address
.to_socket_addrs()?
.map(TcpListener::bind)
.collect::<FuturesUnordered<_>>()
.try_collect::<Vec<TcpListener>>()
.await?
.into_iter()
.map(|listener| {
let signal = shutdown_signal.clone();
let signal = async move { signal.receive().await };
let torii = Arc::clone(&self);
let api_router = torii.create_api_router();

let serve_fut = async move {
axum::serve(listener, api_router)
.with_graceful_shutdown(signal)
.await
.map_err(eyre::Report::from)
};
task::spawn(serve_fut)
})
.collect();

Ok(handles)
}

/// To handle incoming requests `Torii` should be started first.
///
/// # Errors
/// Can fail due to listening to network or if http server fails
#[iroha_futures::telemetry_future]
pub async fn start(
self,
shutdown_signal: &ShutdownSignal,
) -> error_stack::Result<impl core::future::Future<Output = ()>, eyre::Report> {
let torii = Arc::new(self);
let mut handles = vec![];

handles.extend(
Arc::clone(&torii)
.start_api(shutdown_signal)
.await
.into_report()
.map_err(|err| err.attach_printable(torii.address.clone().into_attachment()))?,
);

let run = handles
.into_iter()
.collect::<FuturesUnordered<_>>()
.for_each(|handle| {
match handle {
Err(error) => {
iroha_logger::error!(%error, "Join handle error");
}
Ok(Err(error)) => {
iroha_logger::error!(%error, "Error while running torii");
}
_ => {}
}
futures::future::ready(())
});

Ok(run)
// #[iroha_futures::telemetry_future]
pub async fn start(self, shutdown_signal: ShutdownSignal) -> error_stack::Result<(), Error> {
let torii_address = self.address.value().clone();

let listener = match torii_address {
SocketAddr::Ipv4(v) => TcpListener::bind(std::net::SocketAddr::V4(v.into())).await,
SocketAddr::Ipv6(v) => TcpListener::bind(std::net::SocketAddr::V6(v.into())).await,
SocketAddr::Host(v) => TcpListener::bind((v.host.as_ref(), v.port)).await,
}
.change_context(Error::StartServer)
.attach_printable("failed to bind to the specified address")
.attach_printable_lazy(|| self.address.clone().into_attachment())?;
let api_router = self.create_api_router();

axum::serve(listener, api_router)
.with_graceful_shutdown(async move { shutdown_signal.receive().await })
.await
.change_context(Error::FailedExit)
}
}

Expand Down Expand Up @@ -339,6 +287,10 @@ pub enum Error {
ConfigurationFailure(#[from] KisoError),
/// Failed to find status segment by provided path
StatusSegmentNotFound(#[source] eyre::Report),
/// Failed to start Torii
StartServer,
/// Torii server terminated with an error
FailedExit,
}

impl IntoResponse for Error {
Expand Down Expand Up @@ -367,6 +319,7 @@ impl Error {
#[cfg(feature = "profiling")]
Pprof(_) => StatusCode::INTERNAL_SERVER_ERROR,
ConfigurationFailure(_) => StatusCode::INTERNAL_SERVER_ERROR,
StartServer | FailedExit => unreachable!("these never occur during request handling"),
}
}

Expand Down
19 changes: 14 additions & 5 deletions crates/irohad/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::{
future::Future,
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};

use clap::Parser;
Expand All @@ -34,7 +35,7 @@ use iroha_core::{
IrohaNetwork,
};
use iroha_data_model::{block::SignedBlock, prelude::*};
use iroha_futures::supervisor::{ShutdownSignal, Supervisor};
use iroha_futures::supervisor::{Child, OnShutdown, ShutdownSignal, Supervisor};
use iroha_genesis::GenesisBlock;
use iroha_logger::{actor::LoggerHandle, InitConfig as LoggerInitConfig};
use iroha_primitives::addr::SocketAddr;
Expand Down Expand Up @@ -323,10 +324,18 @@ impl Iroha {
#[cfg(feature = "telemetry")]
metrics_reporter,
)
.start(&supervisor.shutdown_signal())
.await
.map_err(|report| report.change_context(StartError::StartTorii))?;
supervisor.monitor(tokio::spawn(torii_run));
.start(supervisor.shutdown_signal());
supervisor.monitor(Child::new(
tokio::spawn(async move {
if let Err(err) = torii_run.await {
iroha_logger::error!(?err, "Torii failed to terminate gracefully");
// TODO: produce non-zero exit code or something
} else {
iroha_logger::debug!("Torii exited normally");
};
}),
OnShutdown::Wait(Duration::from_secs(5)),
));

supervisor.monitor(tokio::task::spawn(config_updates_relay(kiso, logger)));

Expand Down

0 comments on commit 5e45ade

Please sign in to comment.