Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Turmoil integration #137

Merged
merged 3 commits into from
Oct 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- core/config: impl `FromStr` for `Secret` ([#135]).
- core/init: emit the `elfo_start_time_seconds` metric.
- core/actor: implement `Display` for `ActorMeta` ([#74]).
- network: add `idle_timeout` to detect and disconnect stuck connections ([#137]).
- network: add the `turmoil` transport for testing distributed actors ([#137]).

### Changed
- **BREAKING** core/node: remove `node` module, `NodeNo` is moved to `addr`.
Expand All @@ -27,6 +29,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
[#74]: https://github.com/elfo-rs/elfo/issues/74
[#135]: https://github.com/elfo-rs/elfo/pull/135
[#136]: https://github.com/elfo-rs/elfo/pull/136
[#137]: https://github.com/elfo-rs/elfo/pull/137

## [0.2.0-alpha.16] - 2024-07-24
### Added
Expand Down
5 changes: 4 additions & 1 deletion elfo-network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ rust-version.workspace = true
[lints]
workspace = true

[features]
turmoil06 = ["dep:turmoil06"]

[dependencies]
elfo-core = { version = "0.2.0-alpha.16", path = "../elfo-core", features = ["unstable", "network"] }
elfo-utils = { version = "0.2.6", path = "../elfo-utils" }
Expand All @@ -27,14 +30,14 @@ eyre = "0.6.8"
fxhash = "0.2.1"
futures = "0.3.21"
tokio = { workspace = true, features = ["net", "io-util"] }
tokio-util = "0.7"
tracing = "0.1.25"
parking_lot = "0.12"
humantime-serde = "1"
kanal = "0.1.0-pre8"
bitflags = "2.3.2"
lz4_flex = { version = "0.11.1", default-features = false, features = ["std"] }
byteorder = "1.4.3"
turmoil06 = { package = "turmoil", version = "0.6", optional = true }

[dev-dependencies]
tracing-test = "0.2.4" # TODO: actually unused?
47 changes: 43 additions & 4 deletions elfo-network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,33 @@ use serde::{
#[derive(Debug, Deserialize)]
pub struct Config {
/// A list of addresses to listen on.
#[serde(default)]
pub listen: Vec<Transport>,
/// How often nodes should ping each other.
/// `5s` by default
#[serde(with = "humantime_serde", default = "default_ping_interval")]
pub ping_interval: Duration,
/// How to discover other nodes.
#[serde(default)]
pub discovery: DiscoveryConfig, // TODO: optional?
/// Compression settings.
#[serde(default)]
pub compression: CompressionConfig,
/// How often nodes should ping each other.
///
/// Pings are used to measure RTT and detect dead connections.
/// For the latter purpose, see `idle_timeout`.
///
/// `5s` by default.
#[serde(with = "humantime_serde", default = "default_ping_interval")]
pub ping_interval: Duration,
/// The maximum inactivity time of every connection.
///
/// If no data is received on a connection for over `idle_timeout` time,
/// the connection is considered dead and will be automatically closed.
///
/// This timeout is checked every `ping_interval` time, so the actual time
/// lies in the range of `idle_timeout` to `idle_timeout + ping_interval`.
///
/// `30s` by default.
#[serde(with = "humantime_serde", default = "default_idle_timeout")]
pub idle_timeout: Duration,
}

/// Compression settings.
Expand All @@ -64,6 +80,10 @@ fn default_ping_interval() -> Duration {
Duration::from_secs(5)
}

/// Returns the default value of `idle_timeout`: 30 seconds.
///
/// Referenced by the `serde` `default` attribute of the `idle_timeout` field.
fn default_idle_timeout() -> Duration {
    const DEFAULT_IDLE_TIMEOUT_SECS: u64 = 30;

    Duration::from_secs(DEFAULT_IDLE_TIMEOUT_SECS)
}

/// How to discover other nodes.
#[derive(Debug, Deserialize, Default)]
pub struct DiscoveryConfig {
Expand All @@ -90,6 +110,12 @@ pub enum Transport {
#[cfg(unix)]
#[display("uds://{}", "_0.display()")]
Uds(PathBuf),
/// Turmoil v0.6 transport ("turmoil06://host").
///
/// Useful for testing purposes only.
#[cfg(feature = "turmoil06")]
#[display("turmoil06://{_0}")]
Turmoil06(String),
}

impl FromStr for Transport {
Expand All @@ -114,6 +140,8 @@ impl FromStr for Transport {
);
Ok(Transport::Uds(PathBuf::from(addr)))
}
#[cfg(feature = "turmoil06")]
"turmoil06" => Ok(Transport::Turmoil06(addr.into())),
proto => bail!("unknown protocol: {proto}"),
}
}
Expand Down Expand Up @@ -164,6 +192,10 @@ mod tests {
Transport::from_str("tcp://127.0.0.1:4242").unwrap(),
Transport::Tcp("127.0.0.1:4242".into())
);
assert_eq!(
Transport::from_str("tcp://alice:4242").unwrap(),
Transport::Tcp("alice:4242".into())
);

// UDS
#[cfg(unix)]
Expand All @@ -181,5 +213,12 @@ mod tests {
"path to UDS socket cannot be directory"
);
}

// Turmoil06
#[cfg(feature = "turmoil06")]
assert_eq!(
Transport::from_str("turmoil06://alice").unwrap(),
Transport::Turmoil06("alice".into())
);
}
}
62 changes: 62 additions & 0 deletions elfo-network/src/socket/idleness.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use std::{
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
time::Duration,
};

use tokio::time::Instant;

// === IdleTracker ===

/// Measures the time since the last activity on a socket.
/// It's used to implement health checks based on idle timeout.
pub(crate) struct IdleTracker {
track: IdleTrack,
prev_value: u32,
prev_time: Instant,
}

impl IdleTracker {
pub(super) fn new() -> (Self, IdleTrack) {
let track = IdleTrack(<_>::default());
let this = Self {
track: track.clone(),
prev_value: track.get(),
prev_time: Instant::now(),
};

(this, track)
}

/// Returns the elapsed time since the last `check()` call
/// that observed any [`IdleTrack::update()`] calls.
pub(crate) fn check(&mut self) -> Duration {
let now = Instant::now();
let new_value = self.track.get();

if self.prev_value != new_value {
self.prev_value = new_value;
self.prev_time = now;
}

now.duration_since(self.prev_time)
}
}

// === IdleTrack ===

/// A shared activity counter of a socket.
///
/// Clones refer to the same counter, so an update made through
/// any clone is visible through all of them.
#[derive(Clone)]
pub(super) struct IdleTrack(Arc<AtomicU32>);

impl IdleTrack {
    /// Marks this socket as non-idle.
    ///
    /// Increments the shared counter; the counter wraps around on overflow.
    pub(super) fn update(&self) {
        let Self(counter) = self;
        counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Reads the current value of the shared counter.
    fn get(&self) -> u32 {
        let Self(counter) = self;
        counter.load(Ordering::Relaxed)
    }
}
31 changes: 25 additions & 6 deletions elfo-network/src/socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ use std::{future::Future, time::Duration};

use derive_more::{Constructor, Display};
use eyre::{eyre, Result, WrapErr};
use futures::StreamExt;
use futures::{stream::BoxStream, StreamExt};
use metrics::counter;
use tokio::io;
use tracing::{trace, warn};

use elfo_core::addr::{NodeLaunchId, NodeNo};
use elfo_utils::likely;

use self::idleness::{IdleTrack, IdleTracker};
use crate::{
codec::{decode::EnvelopeDetails, encode::EncodeError, format::NetworkEnvelope},
config::Transport,
Expand All @@ -20,6 +21,7 @@ use crate::{
};

mod handshake;
mod idleness;
mod raw;

bitflags::bitflags! {
Expand All @@ -34,6 +36,7 @@ pub(crate) struct Socket {
pub(crate) peer: Peer,
pub(crate) read: ReadHalf,
pub(crate) write: WriteHalf,
pub(crate) idle: IdleTracker,
}

#[derive(Display, Clone, Constructor)]
Expand All @@ -53,18 +56,22 @@ impl Socket {
(FramedRead::none(), FramedWrite::none(None))
};

let (idle_tracker, idle_track) = IdleTracker::new();

Self {
info: raw.info,
peer: Peer::new(handshake.node_no, handshake.launch_id),
read: ReadHalf::new(framed_read, raw.read),
read: ReadHalf::new(framed_read, raw.read, idle_track),
write: WriteHalf::new(framed_write, raw.write),
idle: idle_tracker,
}
}
}

/// The reading side of a `Socket`.
pub(crate) struct ReadHalf {
// Framing strategy: either yields a decoded (or skipped) envelope
// or asks for more data to be read into its buffer.
framing: FramedRead,
// Raw read half that bytes are actually read from.
read: raw::OwnedReadHalf,
// Activity marker: updated after every raw read and on every decoded
// or skipped envelope, so the connection isn't considered idle.
idle: IdleTrack,
}

#[derive(Debug)]
Expand All @@ -83,8 +90,12 @@ where
}

impl ReadHalf {
fn new(framing: FramedRead, read: raw::OwnedReadHalf) -> Self {
Self { framing, read }
fn new(framing: FramedRead, read: raw::OwnedReadHalf, idle: IdleTrack) -> Self {
Self {
framing,
read,
idle,
}
}

fn report_framing_metrics(&mut self) {
Expand All @@ -104,13 +115,15 @@ impl ReadHalf {
let envelope = loop {
let buffer = match self.framing.read()? {
FramedReadState::NeedMoreData { buffer } => {
trace!(message = "framed read strategy requested more data");
trace!("framed read strategy requested more data");
buffer
}
FramedReadState::EnvelopeSkipped(details) => {
self.idle.update();
return Err(ReadError::EnvelopeSkipped(details));
}
FramedReadState::Done { decoded } => {
self.idle.update();
let (protocol, name) = decoded.payload.protocol_and_name();
trace!(
message = "framed read strategy decoded single envelope",
Expand All @@ -123,6 +136,12 @@ impl ReadHalf {
};

let bytes_read = io::AsyncReadExt::read(&mut self.read, buffer).await?;

// Large messages cannot be read in a single `read()` call, so we should
// additionally update the idle tracker even without waiting until the
// message is fully decoded to prevent false positive disconnects.
self.idle.update();

if bytes_read == 0 {
// EOF.
return Ok(None);
Expand Down Expand Up @@ -239,7 +258,7 @@ pub(crate) async fn listen(
node_no: NodeNo,
launch_id: NodeLaunchId,
capabilities: Capabilities,
) -> Result<futures::stream::BoxStream<'static, Socket>> {
) -> Result<BoxStream<'static, Socket>> {
let stream = timeout(LISTEN_TIMEOUT, raw::listen(addr)).await?;
let stream = stream
.map(move |mut raw_socket| async move {
Expand Down
Loading