From 196bb16aa30c0d8a5af947362db25d5190be6eb8 Mon Sep 17 00:00:00 2001 From: ryardley Date: Thu, 19 Dec 2024 18:01:08 +1100 Subject: [PATCH] Compiling --- .deploy/build.sh | 11 +- .deploy/deploy.sh | 29 ++- .deploy/docker-compose.yml | 22 ++- packages/ciphernode/Cargo.lock | 31 +++ packages/ciphernode/Cargo.toml | 1 + packages/ciphernode/enclave/Cargo.toml | 3 + packages/ciphernode/enclave/src/compile_id.rs | 13 ++ packages/ciphernode/enclave/src/main.rs | 7 +- packages/ciphernode/net/src/bin/p2p_test.rs | 20 +- packages/ciphernode/net/src/correlation_id.rs | 24 +++ packages/ciphernode/net/src/dialer.rs | 182 ++++++++++++++++++ packages/ciphernode/net/src/events.rs | 44 +++++ packages/ciphernode/net/src/lib.rs | 5 + .../ciphernode/net/src/network_manager.rs | 60 ++++-- packages/ciphernode/net/src/network_peer.rs | 140 +++++++++----- packages/ciphernode/net/src/retry.rs | 71 +++++++ 16 files changed, 577 insertions(+), 86 deletions(-) create mode 100644 packages/ciphernode/enclave/src/compile_id.rs create mode 100644 packages/ciphernode/net/src/correlation_id.rs create mode 100644 packages/ciphernode/net/src/dialer.rs create mode 100644 packages/ciphernode/net/src/events.rs create mode 100644 packages/ciphernode/net/src/retry.rs diff --git a/.deploy/build.sh b/.deploy/build.sh index dbde3504..f3bdfbdc 100755 --- a/.deploy/build.sh +++ b/.deploy/build.sh @@ -1,3 +1,12 @@ #!/usr/bin/env bash -docker build -t ghcr.io/gnosisguild/ciphernode:mytest -f ./packages/ciphernode/Dockerfile . +# Enable BuildKit +export DOCKER_BUILDKIT=1 + +mkdir -p /tmp/docker-cache + +time docker buildx build \ + --cache-from=type=local,src=/tmp/docker-cache \ + --cache-to=type=local,dest=/tmp/docker-cache \ + --load \ + -t ghcr.io/gnosisguild/ciphernode -f ./packages/ciphernode/Dockerfile . diff --git a/.deploy/deploy.sh b/.deploy/deploy.sh index 929d9a6c..397c8ec8 100755 --- a/.deploy/deploy.sh +++ b/.deploy/deploy.sh @@ -1,8 +1,27 @@ #!/usr/bin/env bash -if [ ! -f "./.deploy/.env" ]; then - echo "Environment file ./.deploy/.env not found!" - exit 1 -fi +wait_ready() { + local stack_name="$1" + until [ "$(docker stack services $stack_name --format '{{.Replicas}}' | awk -F'/' '$1 != $2')" = "" ]; do + printf "." + sleep 1 + done + echo -ne "\r\033[K" + echo "Stack $stack_name is ready!" +} -docker stack deploy -c .deploy/docker-compose.yml enclave-stack --detach=false +wait_removed() { + local stack_name="$1" + while docker stack ps $stack_name >/dev/null 2>&1; do + printf "." + sleep 1 + done + echo -ne "\r\033[K" + echo "Stack $stack_name is removed" +} + +stack_name=${1:-enclave} +docker stack rm $stack_name +wait_removed $stack_name +docker stack deploy -c docker-compose.yml --prune $stack_name +wait_ready $stack_name diff --git a/.deploy/docker-compose.yml b/.deploy/docker-compose.yml index 7c363abf..3a3d45e2 100644 --- a/.deploy/docker-compose.yml +++ b/.deploy/docker-compose.yml @@ -1,6 +1,6 @@ services: cn1: - image: ghcr.io/gnosisguild/ciphernode:20241210-44aac5b0 + image: ghcr.io/gnosisguild/ciphernode:latest volumes: - ./cn1.yaml:/home/ciphernode/.config/enclave/config.yaml:ro - cn1-data:/home/ciphernode/.local/share/enclave @@ -14,9 +14,11 @@ services: ADDRESS: "0xbDA5747bFD65F08deb54cb465eB87D40e51B197E" QUIC_PORT: 9091 ports: - - "9091:9091" + - "9091:9091/udp" + - "9091:9091/tcp" deploy: replicas: 1 + endpoint_mode: dnsrr update_config: parallelism: 1 order: stop-first @@ -27,7 +29,7 @@ services: - global-network cn2: - image: ghcr.io/gnosisguild/ciphernode:20241210-44aac5b0 + image: ghcr.io/gnosisguild/ciphernode:latest depends_on: - cn1 volumes: @@ -43,7 +45,8 @@ services: ADDRESS: "0xdD2FD4581271e230360230F9337D5c0430Bf44C0" QUIC_PORT: 9092 ports: - - "9092:9092" + - "9092:9092/udp" + - "9092:9092/tcp" deploy: replicas: 1 update_config: @@ -56,7 +59,7 @@ services: - global-network cn3: - image: ghcr.io/gnosisguild/ciphernode:20241210-44aac5b0 + image: ghcr.io/gnosisguild/ciphernode:latest depends_on: - cn1 volumes: @@ -72,7 +75,8 @@ services: ADDRESS: "0x2546BcD3c84621e976D8185a91A922aE77ECEc30" QUIC_PORT: 9093 ports: - - "9093:9093" + - "9093:9093/udp" + - "9093:9093/tcp" deploy: replicas: 1 update_config: @@ -85,7 +89,7 @@ services: - global-network aggregator: - image: ghcr.io/gnosisguild/ciphernode:20241210-44aac5b0 + image: ghcr.io/gnosisguild/ciphernode:latest depends_on: - cn1 volumes: @@ -101,8 +105,8 @@ services: ADDRESS: "0x8626a6940E2eb28930eFb4CeF49B2d1F2C9C1199" QUIC_PORT: 9094 ports: - - "9094:9094" - + - "9094:9094/udp" + - "9094:9094/tcp" deploy: replicas: 1 update_config: diff --git a/packages/ciphernode/Cargo.lock b/packages/ciphernode/Cargo.lock index 0684a617..ffdfb98e 100644 --- a/packages/ciphernode/Cargo.lock +++ b/packages/ciphernode/Cargo.lock @@ -1676,6 +1676,20 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0" +[[package]] +name = "compile-time" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e55ede5279d4d7c528906853743abeb26353ae1e6c440fcd6d18316c2c2dd903" +dependencies = [ + "once_cell", + "proc-macro2", + "quote", + "rustc_version 0.4.0", + "semver 1.0.23", + "time", +] + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -2164,6 +2178,7 @@ dependencies = [ "anyhow", "cipher 0.1.0", "clap", + "compile-time", "config", "data", "dialoguer", @@ -2172,7 +2187,9 @@ dependencies = [ "enclave_node", "hex", "once_cell", + "petname", "phf", + "rand", "router", "serde", "serde_json", @@ -4583,6 +4600,20 @@ dependencies = [ "indexmap", ] +[[package]] +name = "petname" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cd31dcfdbbd7431a807ef4df6edd6473228e94d5c805e8cf671227a21bad068" +dependencies = [ + "anyhow", + "clap", + "itertools 0.13.0", + "proc-macro2", + "quote", + "rand", +] + [[package]] name = "pharos" version = "0.5.3" diff --git a/packages/ciphernode/Cargo.toml b/packages/ciphernode/Cargo.toml index 8ce8f55a..ae75b191 100644 --- a/packages/ciphernode/Cargo.toml +++ b/packages/ciphernode/Cargo.toml @@ -37,6 +37,7 @@ bs58 = "0.5.1" base64 = "0.22.1" clap = { version = "4.5.17", features = ["derive"] } cipher = { path = "./cipher" } +compile-time = "0.2.0" dirs = "5.0.1" data = { path = "./data" } shellexpand = "3.1.0" diff --git a/packages/ciphernode/enclave/Cargo.toml b/packages/ciphernode/enclave/Cargo.toml index d38cf579..cbec2fca 100644 --- a/packages/ciphernode/enclave/Cargo.toml +++ b/packages/ciphernode/enclave/Cargo.toml @@ -28,6 +28,9 @@ tracing = { workspace = true } tracing-subscriber = { workspace = true } zeroize = { workspace = true } phf = { version = "0.11", features = ["macros"] } +compile-time = { workspace = true } +rand = { workspace = true } +petname = "2.0.2" [build-dependencies] serde_json = { workspace = true } diff --git a/packages/ciphernode/enclave/src/compile_id.rs b/packages/ciphernode/enclave/src/compile_id.rs new file mode 100644 index 00000000..38b4b9d4 --- /dev/null +++ b/packages/ciphernode/enclave/src/compile_id.rs @@ -0,0 +1,13 @@ +use petname::{Generator, Petnames}; +use rand::rngs::StdRng; +use rand::SeedableRng; + +static COMPILE_ID: u64 = compile_time::unix!(); + +/// Generate a unique compilation ID for the build based on the time of compilation +pub fn generate_id() -> String { + let mut rng = StdRng::seed_from_u64(COMPILE_ID); + Petnames::small() + .generate(&mut rng, 3, "_") + .unwrap_or("default-name".to_owned()) +} diff --git a/packages/ciphernode/enclave/src/main.rs b/packages/ciphernode/enclave/src/main.rs index 808b7629..3b2c4d5d 100644 --- a/packages/ciphernode/enclave/src/main.rs +++ b/packages/ciphernode/enclave/src/main.rs @@ -3,9 +3,11 @@ use clap::Parser; use commands::{aggregator, init, net, password, start, wallet, Commands}; use config::load_config; use enclave_core::{get_tag, set_tag}; -use tracing::instrument; +use tracing::{info, instrument}; use tracing_subscriber::EnvFilter; + pub mod commands; +mod compile_id; const OWO: &str = r#" ___ ___ ___ ___ ___ @@ -85,6 +87,9 @@ pub async fn main() { // .with_env_filter("[app{id=cn4}]=info") // .with_env_filter("[app{id=ag}]=info") .init(); + + info!("COMPILATION ID: '{}'", compile_id::generate_id()); + let cli = Cli::parse(); // Set the tag for all future traces diff --git a/packages/ciphernode/net/src/bin/p2p_test.rs b/packages/ciphernode/net/src/bin/p2p_test.rs index e44f43b8..bea5b098 100644 --- a/packages/ciphernode/net/src/bin/p2p_test.rs +++ b/packages/ciphernode/net/src/bin/p2p_test.rs @@ -1,4 +1,6 @@ use anyhow::Result; +use net::correlation_id::CorrelationId; +use net::events::{NetworkPeerCommand, NetworkPeerEvent}; use net::NetworkPeer; use std::time::Duration; use std::{collections::HashSet, env, process}; @@ -19,7 +21,7 @@ async fn main() -> Result<()> { .with(tracing_subscriber::fmt::layer()) .init(); let name = env::args().nth(1).expect("need name"); - + let topic = "test-topic"; println!("{} starting up", name); let udp_port = env::var("QUIC_PORT") @@ -42,7 +44,7 @@ async fn main() -> Result<()> { // Extract input and outputs let tx = peer.tx(); - let mut rx = peer.rx().unwrap(); + let mut rx = peer.rx(); let router_task = tokio::spawn({ let name = name.clone(); @@ -60,7 +62,12 @@ async fn main() -> Result<()> { // Send our message first println!("{} sending message", name); - tx.send(name.as_bytes().to_vec()).await?; + tx.send(NetworkPeerCommand::GossipPublish { + correlation_id: CorrelationId::new(), + topic: topic.to_string(), + data: name.as_bytes().to_vec(), + }) + .await?; println!("{} message sent", name); let expected: HashSet = vec![ @@ -79,8 +86,8 @@ async fn main() -> Result<()> { // Wrap the message receiving loop in a timeout let receive_result = timeout(Duration::from_secs(10), async { while received != expected { - if let Some(msg) = rx.recv().await { - match String::from_utf8(msg) { + match rx.recv().await? { + NetworkPeerEvent::GossipData(msg) => match String::from_utf8(msg) { Ok(msg) => { if !received.contains(&msg) { println!("{} received '{}'", name, msg); @@ -88,7 +95,8 @@ async fn main() -> Result<()> { } } Err(e) => println!("{} received invalid UTF8: {}", name, e), - } + }, + _ => (), } } Ok::<(), anyhow::Error>(()) diff --git a/packages/ciphernode/net/src/correlation_id.rs b/packages/ciphernode/net/src/correlation_id.rs new file mode 100644 index 00000000..3d14bcab --- /dev/null +++ b/packages/ciphernode/net/src/correlation_id.rs @@ -0,0 +1,24 @@ +use std::{ + fmt::Display, + sync::atomic::{AtomicUsize, Ordering}, +}; + +static NEXT_CORRELATION_ID: AtomicUsize = AtomicUsize::new(1); + +#[derive(Debug,Clone)] +pub struct CorrelationId { + id: usize, +} + +impl CorrelationId { + pub fn new() -> Self { + let id = NEXT_CORRELATION_ID.fetch_add(1, Ordering::SeqCst); + Self { id } + } +} + +impl Display for CorrelationId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.id) + } +} diff --git a/packages/ciphernode/net/src/dialer.rs b/packages/ciphernode/net/src/dialer.rs new file mode 100644 index 00000000..7447fe88 --- /dev/null +++ b/packages/ciphernode/net/src/dialer.rs @@ -0,0 +1,182 @@ +use anyhow::Context; +use anyhow::Result; +use futures::future::join_all; +use libp2p::{ + multiaddr::Protocol, + swarm::{dial_opts::DialOpts, ConnectionId, DialError}, + Multiaddr, +}; +use std::net::ToSocketAddrs; +use tokio::sync::{broadcast, mpsc}; +use tracing::error; + +use crate::{ + events::{NetworkPeerCommand, NetworkPeerEvent}, + retry::{retry_with_backoff, to_retry, RetryError, BACKOFF_DELAY, BACKOFF_MAX_RETRIES}, +}; + +async fn dial_multiaddr( + cmd_tx: &mpsc::Sender, + event_tx: &broadcast::Sender, + multiaddr_str: &str, +) -> Result<()> { + let multiaddr = &multiaddr_str.parse()?; + println!("Now dialing in to {}", multiaddr); + retry_with_backoff( + || attempt_connection(cmd_tx, event_tx, multiaddr), + BACKOFF_MAX_RETRIES, + BACKOFF_DELAY, + ) + .await?; + Ok(()) +} + +fn trace_error(r: Result<()>) { + if let Err(err) = r { + error!("{}", err); + } +} + +pub async fn dial_peers( + cmd_tx: &mpsc::Sender, + event_tx: &broadcast::Sender, + peers: &Vec, +) -> Result<()> { + let futures: Vec<_> = peers + .iter() + .map(|addr| dial_multiaddr(cmd_tx, event_tx, addr)) + .collect(); + let results = join_all(futures).await; + results.into_iter().for_each(trace_error); + Ok(()) +} + +async fn attempt_connection( + cmd_tx: &mpsc::Sender, + event_tx: &broadcast::Sender, + multiaddr: &Multiaddr, +) -> Result<(), RetryError> { + let mut event_rx = event_tx.subscribe(); + let multi = get_resolved_multiaddr(multiaddr).map_err(to_retry)?; + let opts: DialOpts = multi.clone().into(); + let dial_connection = opts.connection_id(); + println!("Dialing: '{}' with connection '{}'", multi, dial_connection); + cmd_tx + .send(NetworkPeerCommand::Dial(opts)) + .await + .map_err(to_retry)?; + wait_for_connection(&mut event_rx, dial_connection).await +} + +async fn wait_for_connection( + event_rx: &mut broadcast::Receiver, + dial_connection: ConnectionId, +) -> Result<(), RetryError> { + loop { + match event_rx.recv().await.map_err(to_retry)? { + NetworkPeerEvent::ConnectionEstablished { connection_id } => { + if connection_id == dial_connection { + println!("Connection Established"); + return Ok(()); + } + } + NetworkPeerEvent::DialError { error } => { + println!("DialError!"); + return match error.as_ref() { + // If we are dialing ourself then we should just fail + DialError::NoAddresses { .. } => { + println!("DialError received. Returning RetryError::Failure"); + Err(RetryError::Failure(error.clone().into())) + } + // Try again otherwise + _ => Err(RetryError::Retry(error.clone().into())), + }; + } + NetworkPeerEvent::OutgoingConnectionError { + connection_id, + error, + } => { + println!("OutgoingConnectionError!"); + if connection_id == dial_connection { + println!( + "Connection {} failed because of error {}. Retrying...", + connection_id, error + ); + return match error.as_ref() { + // If we are dialing ourself then we should just fail + DialError::NoAddresses { .. } => { + Err(RetryError::Failure(error.clone().into())) + } + // Try again otherwise + _ => Err(RetryError::Retry(error.clone().into())), + }; + } + } + _ => (), + } + } +} + +fn dns_to_ip_addr(original: &Multiaddr, ip_str: &str) -> Result { + let ip = ip_str.parse()?; + let mut new_addr = Multiaddr::empty(); + let mut skip_next = false; + + for proto in original.iter() { + if skip_next { + skip_next = false; + continue; + } + + match proto { + Protocol::Dns4(_) | Protocol::Dns6(_) => { + new_addr.push(Protocol::Ip4(ip)); + skip_next = false; + } + _ => new_addr.push(proto), + } + } + + Ok(new_addr) +} + +fn extract_dns_host(addr: &Multiaddr) -> Option { + // Iterate through the protocols in the multiaddr + for proto in addr.iter() { + match proto { + // Match on DNS4 or DNS6 protocols + Protocol::Dns4(hostname) | Protocol::Dns6(hostname) => { + return Some(hostname.to_string()) + } + _ => continue, + } + } + None +} + +fn get_resolved_multiaddr(value: &Multiaddr) -> Result { + let maybe_domain = extract_dns_host(value); + if let Some(domain) = maybe_domain { + let ip = resolve_ipv4(&domain)?; + let multi = dns_to_ip_addr(value, &ip)?; + return Ok(multi); + } else { + Ok(value.clone()) + } +} + +fn resolve_ipv4(domain: &str) -> Result { + let addr = format!("{}:0", domain) + .to_socket_addrs()? + .find(|addr| addr.ip().is_ipv4()) + .context("no IPv4 addresses found")?; + Ok(addr.ip().to_string()) +} + +fn resolve_ipv6(domain: &str) -> Result { + let addr = format!("{}:0", domain) + .to_socket_addrs()? + .find(|addr| addr.ip().is_ipv6()) + .context("no IPv6 addresses found")?; + Ok(addr.ip().to_string()) +} diff --git a/packages/ciphernode/net/src/events.rs b/packages/ciphernode/net/src/events.rs new file mode 100644 index 00000000..7719279e --- /dev/null +++ b/packages/ciphernode/net/src/events.rs @@ -0,0 +1,44 @@ +use std::sync::Arc; + +use actix::Message; +use libp2p::{ + gossipsub::{MessageId, PublishError}, + swarm::{dial_opts::DialOpts, ConnectionId, DialError}, +}; + +use crate::correlation_id::CorrelationId; + +pub enum NetworkPeerCommand { + GossipPublish { + topic: String, + data: Vec, + correlation_id: CorrelationId, + }, + Dial(DialOpts), +} + +#[derive(Message, Clone, Debug)] +#[rtype(result = "anyhow::Result<()>")] +pub enum NetworkPeerEvent { + GossipData(Vec), + GossipPublishError { + // TODO: return an error here? DialError is not Clonable so we have + // avoided passing it on + correlation_id: CorrelationId, + error: Arc, + }, + GossipPublished { + correlation_id: CorrelationId, + message_id: MessageId, + }, + DialError { + error: Arc, + }, + ConnectionEstablished { + connection_id: ConnectionId, + }, + OutgoingConnectionError { + connection_id: ConnectionId, + error: Arc, + }, +} diff --git a/packages/ciphernode/net/src/lib.rs b/packages/ciphernode/net/src/lib.rs index 42f3da11..394055bf 100644 --- a/packages/ciphernode/net/src/lib.rs +++ b/packages/ciphernode/net/src/lib.rs @@ -3,6 +3,11 @@ mod network_manager; mod network_peer; +mod dialer; +pub mod events; +mod retry; +pub mod correlation_id; pub use network_manager::*; pub use network_peer::*; + diff --git a/packages/ciphernode/net/src/network_manager.rs b/packages/ciphernode/net/src/network_manager.rs index 8969c908..bd2b4f87 100644 --- a/packages/ciphernode/net/src/network_manager.rs +++ b/packages/ciphernode/net/src/network_manager.rs @@ -1,8 +1,10 @@ +use crate::correlation_id::CorrelationId; +use crate::events::NetworkPeerCommand; +use crate::events::NetworkPeerEvent; use crate::NetworkPeer; /// Actor for connecting to an libp2p client via it's mpsc channel interface /// This Actor should be responsible for use actix::prelude::*; -use anyhow::anyhow; use anyhow::Result; use cipher::Cipher; use data::Repository; @@ -10,15 +12,18 @@ use enclave_core::{EnclaveEvent, EventBus, EventId, Subscribe}; use libp2p::identity::ed25519; use std::collections::HashSet; use std::sync::Arc; -use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::select; +use tokio::sync::broadcast; +use tokio::sync::mpsc; use tracing::{error, info, instrument, trace}; /// NetworkManager Actor converts between EventBus events and Libp2p events forwarding them to a /// NetworkPeer for propagation over the p2p network pub struct NetworkManager { bus: Addr, - tx: Sender>, + tx: mpsc::Sender, sent_events: HashSet, + topic: String, } impl Actor for NetworkManager { @@ -31,20 +36,22 @@ struct LibP2pEvent(pub Vec); impl NetworkManager { /// Create a new NetworkManager actor - pub fn new(bus: Addr, tx: Sender>) -> Self { + pub fn new(bus: Addr, tx: mpsc::Sender, topic: &str) -> Self { Self { bus, tx, sent_events: HashSet::new(), + topic: topic.to_string(), } } pub fn setup( bus: Addr, - tx: Sender>, - mut rx: Receiver>, + tx: mpsc::Sender, + mut rx: broadcast::Receiver, + topic: &str, ) -> Addr { - let addr = NetworkManager::new(bus.clone(), tx).start(); + let addr = NetworkManager::new(bus.clone(), tx, topic).start(); // Listen on all events bus.do_send(Subscribe { @@ -54,10 +61,18 @@ impl NetworkManager { tokio::spawn({ let addr = addr.clone(); - async move { - while let Some(msg) = rx.recv().await { - addr.do_send(LibP2pEvent(msg)) + loop { + select! { + Ok(event) = rx.recv() => { + match event { + NetworkPeerEvent::GossipData(data) => { + addr.do_send(LibP2pEvent(data)) + }, + _ => () + } + } + } } } }); @@ -75,6 +90,7 @@ impl NetworkManager { enable_mdns: bool, repository: Repository>, ) -> Result<(Addr, tokio::task::JoinHandle>, String)> { + let topic = "tmp-enclave-gossip-topic"; info!("Reading from repository"); let mut bytes = if let Some(bytes) = repository.read().await? { let decrypted = cipher.decrypt_data(&bytes)?; @@ -94,15 +110,9 @@ impl NetworkManager { let ed25519_keypair = ed25519::Keypair::try_from_bytes(&mut bytes)?; let keypair: libp2p::identity::Keypair = ed25519_keypair.try_into()?; - let mut peer = NetworkPeer::new( - &keypair, - peers, - Some(quic_port), - "tmp-enclave-gossip-topic", - enable_mdns, - )?; - let rx = peer.rx().ok_or(anyhow!("Peer rx already taken"))?; - let p2p_addr = NetworkManager::setup(bus, peer.tx(), rx); + let mut peer = NetworkPeer::new(&keypair, peers, Some(quic_port), topic, enable_mdns)?; + let rx = peer.rx(); + let p2p_addr = NetworkManager::setup(bus, peer.tx(), rx, topic); let handle = tokio::spawn(async move { Ok(peer.start().await?) }); Ok((p2p_addr, handle, keypair.public().to_peer_id().to_string())) } @@ -129,6 +139,7 @@ impl Handler for NetworkManager { let sent_events = self.sent_events.clone(); let tx = self.tx.clone(); let evt = event.clone(); + let topic = self.topic.clone(); Box::pin(async move { let id: EventId = evt.clone().into(); @@ -145,8 +156,15 @@ impl Handler for NetworkManager { } match evt.to_bytes() { - Ok(bytes) => { - if let Err(e) = tx.send(bytes).await { + Ok(data) => { + if let Err(e) = tx + .send(NetworkPeerCommand::GossipPublish { + topic, + data, + correlation_id: CorrelationId::new(), + }) + .await + { error!(error=?e, "Error sending bytes to libp2p"); }; } diff --git a/packages/ciphernode/net/src/network_peer.rs b/packages/ciphernode/net/src/network_peer.rs index 17151076..00366439 100644 --- a/packages/ciphernode/net/src/network_peer.rs +++ b/packages/ciphernode/net/src/network_peer.rs @@ -8,15 +8,17 @@ use libp2p::{ kad::{store::MemoryStore, Behaviour as KademliaBehaviour}, mdns, swarm::{behaviour::toggle::Toggle, NetworkBehaviour, SwarmEvent}, - Multiaddr, Swarm, + Swarm, }; use std::hash::{Hash, Hasher}; +use std::sync::Arc; use std::{hash::DefaultHasher, io::Error, time::Duration}; -use tokio::{ - select, - sync::mpsc::{channel, Receiver, Sender}, -}; -use tracing::{debug, error, info, trace, warn}; +use tokio::{select, sync::broadcast, sync::mpsc}; +use tracing::{debug, info, trace, warn}; + +use crate::dialer::dial_peers; +use crate::events::NetworkPeerCommand; +use crate::events::NetworkPeerEvent; #[derive(NetworkBehaviour)] pub struct NodeBehaviour { @@ -32,10 +34,9 @@ pub struct NetworkPeer { peers: Vec, udp_port: Option, topic: gossipsub::IdentTopic, - to_bus_tx: Sender>, // to event bus - from_net_rx: Option>>, // from network - to_net_tx: Sender>, // to network - from_bus_rx: Receiver>, // from event bus + event_tx: broadcast::Sender, // to event bus + cmd_tx: mpsc::Sender, // to network + cmd_rx: mpsc::Receiver, // from event bus } impl NetworkPeer { @@ -46,14 +47,13 @@ impl NetworkPeer { topic: &str, enable_mdns: bool, ) -> Result { - let (to_bus_tx, from_net_rx) = channel(100); // TODO : tune this param - let (to_net_tx, from_bus_rx) = channel(100); // TODO : tune this param + let (event_tx, _) = broadcast::channel(100); // TODO : tune this param + let (cmd_tx, cmd_rx) = mpsc::channel(100); // TODO : tune this param let swarm = libp2p::SwarmBuilder::with_existing_identity(id.clone()) .with_tokio() .with_quic() .with_behaviour(|key| create_mdns_kad_behaviour(enable_mdns, key))? - .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60))) .build(); // TODO: Use topics to manage network traffic instead of just using a single topic @@ -64,52 +64,84 @@ impl NetworkPeer { peers, udp_port, topic, - to_bus_tx, - from_net_rx: Some(from_net_rx), - to_net_tx, - from_bus_rx, + event_tx, + cmd_tx, + cmd_rx, }) } - pub fn rx(&mut self) -> Option>> { - self.from_net_rx.take() + pub fn rx(&mut self) -> broadcast::Receiver { + self.event_tx.subscribe() } - pub fn tx(&self) -> Sender> { - self.to_net_tx.clone() + pub fn tx(&self) -> mpsc::Sender { + self.cmd_tx.clone() } pub async fn start(&mut self) -> Result<()> { - let addr = match self.udp_port { - Some(port) => format!("/ip4/0.0.0.0/udp/{}/quic-v1", port), - None => "/ip4/0.0.0.0/udp/0/quic-v1".to_string(), - }; - info!("Requesting node.listen_on('{}')", addr); + let event_tx = self.event_tx.clone(); + let cmd_tx = self.cmd_tx.clone(); + let cmd_rx = &mut self.cmd_rx; + // Subscribe to topic self.swarm .behaviour_mut() .gossipsub .subscribe(&self.topic)?; + + // Listen on the quic port + let addr = match self.udp_port { + Some(port) => format!("/ip4/0.0.0.0/udp/{}/quic-v1", port), + None => "/ip4/0.0.0.0/udp/0/quic-v1".to_string(), + }; + + info!("Requesting node.listen_on('{}')", addr); self.swarm.listen_on(addr.parse()?)?; info!("Peers to dial: {:?}", self.peers); - for addr in self.peers.clone() { - let multiaddr: Multiaddr = addr.parse()?; - self.swarm.dial(multiaddr)?; - } + tokio::spawn({ + let event_tx = event_tx.clone(); + let peers = self.peers.clone(); + async move { + dial_peers(&cmd_tx, &event_tx, &peers).await?; + + return anyhow::Ok(()); + } + }); loop { select! { - Some(line) = self.from_bus_rx.recv() => { - if let Err(e) = self.swarm - .behaviour_mut().gossipsub - .publish(self.topic.clone(), line) { - error!(error=?e, "Error publishing line to swarm"); + // Process commands + Some(command) = cmd_rx.recv() => { + match command { + NetworkPeerCommand::GossipPublish { data, topic, correlation_id } => { + let gossipsub_behaviour = &mut self.swarm.behaviour_mut().gossipsub; + match gossipsub_behaviour + .publish(gossipsub::IdentTopic::new(topic), data) { + Ok(message_id) => { + event_tx.send(NetworkPeerEvent::GossipPublished { correlation_id, message_id })?; + }, + Err(e) => { + warn!(error=?e, "Could not publish to swarm. Retrying..."); + event_tx.send(NetworkPeerEvent::GossipPublishError { correlation_id, error: Arc::new(e) })?; + } + } + }, + NetworkPeerCommand::Dial(multi) => { + println!("DIAL: {:?}", multi); + match self.swarm.dial(multi) { + Ok(v) => println!("Dial returned {:?}", v), + Err(error) => { + println!("Dialing error! {}", error); + event_tx.send(NetworkPeerEvent::DialError { error: error.into() })?; + } + } + } } } - + // Process events event = self.swarm.select_next_some() => { - process_swarm_event(&mut self.swarm, &mut self.to_bus_tx, event).await? + process_swarm_event(&mut self.swarm, &event_tx, event).await? } } } @@ -167,16 +199,39 @@ fn create_mdns_kad_behaviour( async fn process_swarm_event( swarm: &mut Swarm, - to_bus_tx: &mut Sender>, + event_tx: &broadcast::Sender, event: SwarmEvent, ) -> Result<()> { match event { - SwarmEvent::ConnectionEstablished { peer_id, .. } => { + SwarmEvent::ConnectionEstablished { + peer_id, + endpoint, + connection_id, + .. + } => { info!("Connected to {peer_id}"); + let remote_addr = endpoint.get_remote_address().clone(); + swarm + .behaviour_mut() + .kademlia + .add_address(&peer_id, remote_addr.clone()); + + info!("Added address to kademlia {}", remote_addr); + swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id); + info!("Added peer to gossipsub {}", remote_addr); + event_tx.send(NetworkPeerEvent::ConnectionEstablished { connection_id })?; } - SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => { - warn!("Failed to dial {peer_id:?}: {error}"); + SwarmEvent::OutgoingConnectionError { + peer_id, + error, + connection_id, + } => { + info!("Failed to dial {peer_id:?}: {error}"); + event_tx.send(NetworkPeerEvent::OutgoingConnectionError { + connection_id, + error: Arc::new(error), + })?; } SwarmEvent::IncomingConnectionError { error, .. } => { @@ -210,8 +265,7 @@ async fn process_swarm_event( message, })) => { trace!("Got message with id: {id} from peer: {peer_id}",); - trace!("{:?}", message); - to_bus_tx.send(message.data).await?; + event_tx.send(NetworkPeerEvent::GossipData(message.data))?; } SwarmEvent::NewListenAddr { address, .. } => { warn!("Local node is listening on {address}"); diff --git a/packages/ciphernode/net/src/retry.rs b/packages/ciphernode/net/src/retry.rs new file mode 100644 index 00000000..55ab151e --- /dev/null +++ b/packages/ciphernode/net/src/retry.rs @@ -0,0 +1,71 @@ +use std::{future::Future, time::Duration}; +use anyhow::Result; +use tokio::time::sleep; + +pub enum RetryError { + Failure(anyhow::Error), + Retry(anyhow::Error), +} + +pub fn to_retry(e: impl Into) -> RetryError { + RetryError::Retry(e.into()) +} + +pub const BACKOFF_DELAY: u64 = 500; +pub const BACKOFF_MAX_RETRIES: u32 = 10; + +/// Retries an async operation with exponential backoff +/// +/// # Arguments +/// * `operation` - Async function to retry +/// * `max_attempts` - Maximum number of retry attempts +/// * `initial_delay_ms` - Initial delay between retries in milliseconds +/// +/// # Returns +/// * `Result<()>` - Ok if the operation succeeded, Err if all retries failed +pub async fn retry_with_backoff( + operation: F, + max_attempts: u32, + initial_delay_ms: u64, +) -> Result<()> +where + F: Fn() -> Fut, + Fut: Future>, +{ + let mut current_attempt = 1; + let mut delay_ms = initial_delay_ms; + + loop { + match operation().await { + Ok(_) => return Ok(()), + Err(re) => { + match re { + RetryError::Retry(e) => { + if current_attempt >= max_attempts { + return Err(anyhow::anyhow!( + "Operation failed after {} attempts. Last error: {}", + max_attempts, + e + )); + } + + println!( + "Attempt {}/{} failed, retrying in {}ms: {}", + current_attempt, max_attempts, delay_ms, e + ); + + sleep(Duration::from_millis(delay_ms)).await; + current_attempt += 1; + delay_ms *= 2; // Exponential backoff + } + RetryError::Failure(e) => { + println!("FAILURE!: returning to caller."); + return Err(e); + } + } + } + } + } +} + +