Skip to content

Commit

Permalink
feat(network): use idle_timeout also for control connections
Browse files Browse the repository at this point in the history
  • Loading branch information
loyd committed Nov 21, 2024
1 parent 6afe445 commit 9c647f3
Showing 1 changed file with 56 additions and 34 deletions.
90 changes: 56 additions & 34 deletions elfo-network/src/discovery/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::{sync::Arc, time::Duration};
use std::{future::Future, sync::Arc, time::Duration};

use eyre::{bail, eyre, Result, WrapErr};
use futures::StreamExt;
use tracing::{debug, error, info, warn};

use elfo_core::{
message, msg, scope, AnyMessage, Envelope, Message, MoveOwnership, RestartPolicy,
_priv::MessageKind, addr::GroupNo, messages::ConfigUpdated, stream::Stream, RestartParams,
Topology,
message, msg, scope, tracing::TraceId, AnyMessage, Envelope, Message, MoveOwnership,
RestartPolicy, _priv::MessageKind, addr::GroupNo, messages::ConfigUpdated, stream::Stream,
RestartParams, Topology,
};

use crate::{
Expand Down Expand Up @@ -73,10 +73,12 @@ pub(super) struct Discovery {
node_map: Arc<NodeMap>,
}

// TODO: move control connections to dedicated actors.
// TODO: detect duplicate nodes.
// TODO: discover tick.
// TODO: status of in-progress connections.
// TODO: launch_id changed.
// TODO: graceful termination.

impl Discovery {
pub(super) fn new(ctx: NetworkContext, topology: Topology) -> Self {
Expand Down Expand Up @@ -252,11 +254,13 @@ impl Discovery {
);

let node_map = self.node_map.clone();
let idle_timeout = self.ctx.config().idle_timeout;
self.ctx.attach(Stream::once(async move {
let info = socket.info.clone();
let peer = socket.peer.clone();

let result = accept_connection(socket, msg.role, transport, &node_map.this).await;
let result =
accept_connection(socket, msg.role, transport, &node_map.this, idle_timeout).await;
match result {
Ok(accepted) => Ok(accepted),
Err(err) => {
Expand Down Expand Up @@ -328,8 +332,6 @@ impl Discovery {
}),
);
});

// TODO: start ping-pong process on the socket.
}
ConnectionRole::Data(remote) => {
let local_group_name = self
Expand Down Expand Up @@ -392,14 +394,13 @@ impl Discovery {

fn control_maintenance(&mut self, mut socket: Socket, transport: Option<Transport>) {
self.ctx.attach(Stream::once(async move {
// TODO: graceful termination.
let err = control_maintenance(&mut socket).await.unwrap_err();

info!(
message = "control connection closed",
socket = %socket.info,
peer = %socket.peer,
reason = %err,
reason = format!("{:#}", err), // TODO: use `AsRef<dyn Error>`
);

ControlConnectionFailed { transport }
Expand All @@ -412,15 +413,16 @@ async fn accept_connection(
role: ConnectionRole,
transport: Option<Transport>,
this_node: &NodeInfo,
idle_timeout: Duration,
) -> Result<ConnectionAccepted> {
let role = match role {
ConnectionRole::Unknown => {
msg!(match recv(&mut socket).await? {
msg!(match recv(&mut socket, idle_timeout).await? {
msg @ internode::SwitchToControl => {
let my_msg = internode::SwitchToControl {
groups: this_node.groups.clone(),
};
send_regular(&mut socket, my_msg).await?;
send_regular(&mut socket, idle_timeout, my_msg).await?;
ConnectionRole::Control(msg)
}
msg @ internode::SwitchToData => {
Expand All @@ -429,7 +431,7 @@ async fn accept_connection(
your_group_no: msg.my_group_no,
initial_window: INITIAL_WINDOW_SIZE,
};
send_regular(&mut socket, my_msg).await?;
send_regular(&mut socket, idle_timeout, my_msg).await?;
ConnectionRole::Data(msg)
}
envelope =>
Expand All @@ -440,13 +442,13 @@ async fn accept_connection(
})
}
ConnectionRole::Control(msg) => {
send_regular(&mut socket, msg).await?;
let msg = recv_regular::<internode::SwitchToControl>(&mut socket).await?;
send_regular(&mut socket, idle_timeout, msg).await?;
let msg = recv_regular::<internode::SwitchToControl>(&mut socket, idle_timeout).await?;
ConnectionRole::Control(msg)
}
ConnectionRole::Data(msg) => {
send_regular(&mut socket, msg).await?;
let msg = recv_regular::<internode::SwitchToData>(&mut socket).await?;
send_regular(&mut socket, idle_timeout, msg).await?;
let msg = recv_regular::<internode::SwitchToData>(&mut socket, idle_timeout).await?;
ConnectionRole::Data(msg)
}
};
Expand All @@ -459,12 +461,23 @@ async fn accept_connection(
}

// Keeps a control connection alive by exchanging pings and pongs with the peer.
//
// Every round sends a `Ping`, waits for a `Ping`, sends a `Pong` and waits for
// a `Pong`. The loop has no `break`, so the function only ever returns through
// `?`, i.e. with an error.
//
// NOTE(review): this span is taken from a rendered diff without +/- markers, so
// it appears to hold two variants of the loop body back to back: one calling
// `send_regular`/`recv_regular` without a timeout and then sleeping, and one
// that ticks `interval`, sets a trace id and passes `idle_timeout`. Confirm
// against the repository which lines are current before changing the code.
async fn control_maintenance(socket: &mut Socket) -> Result<()> {
// TODO: take these values from the config.
// However, before that, this code should be rewritten to split the logic of
// sending pings from the logic of responding to pings. So, for now, we use
// large hardcoded values: dead connections are detected, but the thresholds
// are not configurable.
// Also, we should reuse `IdleTracker` (used in data connections) here.
let ping_interval = Duration::from_secs(10);
let idle_timeout = Duration::from_secs(120);

let mut interval = tokio::time::interval(ping_interval);

loop {
send_regular(socket, internode::Ping { payload: 0 }).await?;
recv_regular::<internode::Ping>(socket).await?;
send_regular(socket, internode::Pong { payload: 0 }).await?;
recv_regular::<internode::Pong>(socket).await?;
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
// Wait for the next tick of the `ping_interval` timer before starting a round.
interval.tick().await;
// Give every round its own freshly generated trace id.
scope::set_trace_id(TraceId::generate());
// Each send/receive below is passed `idle_timeout` as its time limit.
send_regular(socket, idle_timeout, internode::Ping { payload: 0 }).await?;
recv_regular::<internode::Ping>(socket, idle_timeout).await?;
send_regular(socket, idle_timeout, internode::Pong { payload: 0 }).await?;
recv_regular::<internode::Pong>(socket, idle_timeout).await?;
}
}

Expand All @@ -479,34 +492,39 @@ fn infer_connections<'a>(
})
}

// Wraps a message into a `NetworkEnvelope` with a `Regular` payload and writes
// it to `socket`. Sender and recipient are set to `NetworkAddr::NULL`, and the
// trace id is taken from the current scope. A failure is reported with the
// context "cannot send <message name>".
//
// NOTE(review): this span is taken from a rendered diff without +/- markers, so
// two signatures and two variants of several statements appear back to back
// (with and without the `idle_timeout` parameter). Confirm against the
// repository which lines are current before changing the code.
async fn send_regular<M: Message>(socket: &mut Socket, msg: M) -> Result<()> {
let name = msg.name();
async fn send_regular<M: Message>(
socket: &mut Socket,
idle_timeout: Duration,
message: M,
) -> Result<()> {
// Grab the name first: the message itself is moved into the envelope below,
// while the name is still needed for the error context at the end.
let name = message.name();
let envelope = NetworkEnvelope {
sender: NetworkAddr::NULL, // doesn't matter
recipient: NetworkAddr::NULL, // doesn't matter
trace_id: scope::trace_id(),
payload: NetworkEnvelopePayload::Regular {
message: AnyMessage::new(msg),
message: AnyMessage::new(message),
},
};

let send_future = socket.write.send(&envelope);
send_future
// Bound the write by `idle_timeout` via the local `timeout` helper.
timeout(idle_timeout, send_future)
.await
.wrap_err_with(|| eyre!("cannot send {}", name))
}

async fn recv(socket: &mut Socket) -> Result<Envelope> {
let envelope = socket
.read
.recv()
.await
.map_err(|e| match e {
async fn recv(socket: &mut Socket, idle_timeout: Duration) -> Result<Envelope> {
let receiving = async {
socket.read.recv().await.map_err(|err| match err {
ReadError::EnvelopeSkipped(..) => eyre!("failed to decode message"),
ReadError::Fatal(report) => report,
})
};

let envelope = timeout(idle_timeout, receiving)
.await
.wrap_err("cannot receive a message")?
.ok_or_else(|| eyre!("connection closed before receiving any messages"))?;
.ok_or_else(|| eyre!("connection closed by peer"))?;

let message = match envelope.payload {
NetworkEnvelopePayload::Regular { message } => message,
Expand All @@ -522,8 +540,8 @@ async fn recv(socket: &mut Socket) -> Result<Envelope> {
))
}

async fn recv_regular<M: Message>(socket: &mut Socket) -> Result<M> {
msg!(match recv(socket).await? {
async fn recv_regular<M: Message>(socket: &mut Socket, idle_timeout: Duration) -> Result<M> {
msg!(match recv(socket, idle_timeout).await? {
msg @ M => Ok(msg),
envelope => Err(unexpected_message_error(
envelope,
Expand All @@ -539,3 +557,7 @@ fn unexpected_message_error(envelope: Envelope, expected: &[&str]) -> eyre::Repo
expected.join(" or "),
)
}

/// Awaits `fut`, giving up once `duration` has passed.
///
/// When the future finishes in time, its own `Result` is returned unchanged.
/// When the deadline expires first, the error produced by
/// `tokio::time::timeout` is converted into this function's error type.
async fn timeout<T>(duration: Duration, fut: impl Future<Output = Result<T>>) -> Result<T> {
    match tokio::time::timeout(duration, fut).await {
        // Completed before the deadline: hand back the inner result as is.
        Ok(outcome) => outcome,
        // Deadline hit: surface the elapsed error through the same conversion `?` would use.
        Err(elapsed) => Err(elapsed.into()),
    }
}

0 comments on commit 9c647f3

Please sign in to comment.