diff --git a/elfo-network/src/discovery/mod.rs b/elfo-network/src/discovery/mod.rs index 428a52ae..6a0dee2e 100644 --- a/elfo-network/src/discovery/mod.rs +++ b/elfo-network/src/discovery/mod.rs @@ -1,13 +1,13 @@ -use std::{sync::Arc, time::Duration}; +use std::{future::Future, sync::Arc, time::Duration}; use eyre::{bail, eyre, Result, WrapErr}; use futures::StreamExt; use tracing::{debug, error, info, warn}; use elfo_core::{ - message, msg, scope, AnyMessage, Envelope, Message, MoveOwnership, RestartPolicy, - _priv::MessageKind, addr::GroupNo, messages::ConfigUpdated, stream::Stream, RestartParams, - Topology, + message, msg, scope, tracing::TraceId, AnyMessage, Envelope, Message, MoveOwnership, + RestartPolicy, _priv::MessageKind, addr::GroupNo, messages::ConfigUpdated, stream::Stream, + RestartParams, Topology, }; use crate::{ @@ -73,10 +73,12 @@ pub(super) struct Discovery { node_map: Arc, } +// TODO: move control connections to dedicated actors. // TODO: detect duplicate nodes. // TODO: discover tick. // TODO: status of in-progress connections // TODO: launch_id changed. +// TODO: graceful termination. impl Discovery { pub(super) fn new(ctx: NetworkContext, topology: Topology) -> Self { @@ -252,11 +254,13 @@ impl Discovery { ); let node_map = self.node_map.clone(); + let idle_timeout = self.ctx.config().idle_timeout; self.ctx.attach(Stream::once(async move { let info = socket.info.clone(); let peer = socket.peer.clone(); - let result = accept_connection(socket, msg.role, transport, &node_map.this).await; + let result = + accept_connection(socket, msg.role, transport, &node_map.this, idle_timeout).await; match result { Ok(accepted) => Ok(accepted), Err(err) => { @@ -328,8 +332,6 @@ impl Discovery { }), ); }); - - // TODO: start ping-pong process on the socket. } ConnectionRole::Data(remote) => { let local_group_name = self @@ -392,14 +394,13 @@ impl Discovery { fn control_maintenance(&mut self, mut socket: Socket, transport: Option) { self.ctx.attach(Stream::once(async move { - // TODO: graceful termination. let err = control_maintenance(&mut socket).await.unwrap_err(); info!( message = "control connection closed", socket = %socket.info, peer = %socket.peer, - reason = %err, + reason = format!("{:#}", err), // TODO: use `AsRef` ); ControlConnectionFailed { transport } @@ -412,15 +413,16 @@ async fn accept_connection( role: ConnectionRole, transport: Option, this_node: &NodeInfo, + idle_timeout: Duration, ) -> Result { let role = match role { ConnectionRole::Unknown => { - msg!(match recv(&mut socket).await? { + msg!(match recv(&mut socket, idle_timeout).await? { msg @ internode::SwitchToControl => { let my_msg = internode::SwitchToControl { groups: this_node.groups.clone(), }; - send_regular(&mut socket, my_msg).await?; + send_regular(&mut socket, idle_timeout, my_msg).await?; ConnectionRole::Control(msg) } msg @ internode::SwitchToData => { @@ -429,7 +431,7 @@ async fn accept_connection( your_group_no: msg.my_group_no, initial_window: INITIAL_WINDOW_SIZE, }; - send_regular(&mut socket, my_msg).await?; + send_regular(&mut socket, idle_timeout, my_msg).await?; ConnectionRole::Data(msg) } envelope => @@ -440,13 +442,13 @@ async fn accept_connection( }) } ConnectionRole::Control(msg) => { - send_regular(&mut socket, msg).await?; - let msg = recv_regular::(&mut socket).await?; + send_regular(&mut socket, idle_timeout, msg).await?; + let msg = recv_regular::(&mut socket, idle_timeout).await?; ConnectionRole::Control(msg) } ConnectionRole::Data(msg) => { - send_regular(&mut socket, msg).await?; - let msg = recv_regular::(&mut socket).await?; + send_regular(&mut socket, idle_timeout, msg).await?; + let msg = recv_regular::(&mut socket, idle_timeout).await?; ConnectionRole::Data(msg) } }; @@ -459,12 +461,23 @@ async fn accept_connection( } async fn control_maintenance(socket: &mut Socket) -> Result<()> { + // TODO: we should use these values from the config. + // However, prior to it, this code should be rewritten to split logic of sending + // pings and responding to pings. So, for now, we use hardcoded large + // values. It detects dead connections, but not configurable. + // Also, we should reuse `IdleTracker` (used in data connections) here. + let ping_interval = Duration::from_secs(10); + let idle_timeout = Duration::from_secs(120); + + let mut interval = tokio::time::interval(ping_interval); + loop { - send_regular(socket, internode::Ping { payload: 0 }).await?; - recv_regular::(socket).await?; - send_regular(socket, internode::Pong { payload: 0 }).await?; - recv_regular::(socket).await?; - tokio::time::sleep(std::time::Duration::from_secs(10)).await; + interval.tick().await; + scope::set_trace_id(TraceId::generate()); + send_regular(socket, idle_timeout, internode::Ping { payload: 0 }).await?; + recv_regular::(socket, idle_timeout).await?; + send_regular(socket, idle_timeout, internode::Pong { payload: 0 }).await?; + recv_regular::(socket, idle_timeout).await?; } } @@ -479,34 +492,39 @@ fn infer_connections<'a>( }) } -async fn send_regular(socket: &mut Socket, msg: M) -> Result<()> { - let name = msg.name(); +async fn send_regular( + socket: &mut Socket, + idle_timeout: Duration, + message: M, +) -> Result<()> { + let name = message.name(); let envelope = NetworkEnvelope { sender: NetworkAddr::NULL, // doesn't matter recipient: NetworkAddr::NULL, // doesn't matter trace_id: scope::trace_id(), payload: NetworkEnvelopePayload::Regular { - message: AnyMessage::new(msg), + message: AnyMessage::new(message), }, }; let send_future = socket.write.send(&envelope); - send_future + timeout(idle_timeout, send_future) .await .wrap_err_with(|| eyre!("cannot send {}", name)) } -async fn recv(socket: &mut Socket) -> Result { - let envelope = socket - .read - .recv() - .await - .map_err(|e| match e { +async fn recv(socket: &mut Socket, idle_timeout: Duration) -> Result { + let receiving = async { + socket.read.recv().await.map_err(|err| match err { ReadError::EnvelopeSkipped(..) => eyre!("failed to decode message"), ReadError::Fatal(report) => report, }) + }; + + let envelope = timeout(idle_timeout, receiving) + .await .wrap_err("cannot receive a message")? - .ok_or_else(|| eyre!("connection closed before receiving any messages"))?; + .ok_or_else(|| eyre!("connection closed by peer"))?; let message = match envelope.payload { NetworkEnvelopePayload::Regular { message } => message, @@ -522,8 +540,8 @@ async fn recv(socket: &mut Socket) -> Result { )) } -async fn recv_regular(socket: &mut Socket) -> Result { - msg!(match recv(socket).await? { +async fn recv_regular(socket: &mut Socket, idle_timeout: Duration) -> Result { + msg!(match recv(socket, idle_timeout).await? { msg @ M => Ok(msg), envelope => Err(unexpected_message_error( envelope, @@ -539,3 +557,7 @@ fn unexpected_message_error(envelope: Envelope, expected: &[&str]) -> eyre::Repo expected.join(" or "), ) } + +async fn timeout(duration: Duration, fut: impl Future>) -> Result { + tokio::time::timeout(duration, fut).await? +}