diff --git a/iroh/src/sync_engine.rs b/iroh/src/sync_engine.rs
index 3ea70ef2b7..33ce2ec1e7 100644
--- a/iroh/src/sync_engine.rs
+++ b/iroh/src/sync_engine.rs
@@ -78,7 +78,7 @@ impl SyncEngine {
             endpoint.clone(),
             gossip.clone(),
             bao_store,
-            downloader.clone(),
+            downloader,
             to_live_actor_recv,
             live_actor_tx.clone(),
             to_gossip_actor,
@@ -87,7 +87,6 @@ impl SyncEngine {
             to_gossip_actor_recv,
             sync.clone(),
             gossip,
-            downloader,
             live_actor_tx.clone(),
         );
         let live_actor_task = tokio::task::spawn(
diff --git a/iroh/src/sync_engine/gossip.rs b/iroh/src/sync_engine/gossip.rs
index 8912887a9f..7835c28232 100644
--- a/iroh/src/sync_engine/gossip.rs
+++ b/iroh/src/sync_engine/gossip.rs
@@ -15,7 +15,6 @@ use tokio::{
 use tracing::{debug, error, trace};
 
 use super::live::{Op, ToLiveActor};
-use iroh_bytes::downloader::Downloader;
 
 #[derive(strum::Display, Debug)]
 pub enum ToGossipActor {
@@ -35,7 +34,6 @@ pub struct GossipActor {
     inbox: mpsc::Receiver<ToGossipActor>,
     sync: SyncHandle,
     gossip: Gossip,
-    downloader: Downloader,
     to_sync_actor: mpsc::Sender<ToLiveActor>,
     joined: HashSet<NamespaceId>,
     want_join: HashSet<NamespaceId>,
@@ -47,14 +45,12 @@ impl GossipActor {
         inbox: mpsc::Receiver<ToGossipActor>,
         sync: SyncHandle,
         gossip: Gossip,
-        downloader: Downloader,
         to_sync_actor: mpsc::Sender<ToLiveActor>,
     ) -> Self {
         Self {
             inbox,
             sync,
             gossip,
-            downloader,
             to_sync_actor,
             joined: Default::default(),
             want_join: Default::default(),
@@ -176,11 +172,13 @@ impl GossipActor {
                     .await?;
             }
             Op::ContentReady(hash) => {
-                // Inform the downloader that we now know that this peer has the content
-                // for this hash.
-                self.downloader
-                    .nodes_have(hash, vec![msg.delivered_from])
-                    .await;
+                self.to_sync_actor
+                    .send(ToLiveActor::NeighborContentReady {
+                        namespace,
+                        node: msg.delivered_from,
+                        hash,
+                    })
+                    .await?;
             }
             Op::SyncReport(report) => {
                 self.to_sync_actor
diff --git a/iroh/src/sync_engine/live.rs b/iroh/src/sync_engine/live.rs
index ac39a2aa89..8a1f2a5997 100644
--- a/iroh/src/sync_engine/live.rs
+++ b/iroh/src/sync_engine/live.rs
@@ -1,13 +1,16 @@
 #![allow(missing_docs)]
 
+use std::collections::HashSet;
 use std::{collections::HashMap, time::SystemTime};
 
 use anyhow::{Context, Result};
 use futures::FutureExt;
-use iroh_bytes::downloader::{DownloadRequest, Downloader};
+use iroh_bytes::downloader::{DownloadError, DownloadRequest, Downloader};
+use iroh_bytes::get::Stats;
 use iroh_bytes::HashAndFormat;
 use iroh_bytes::{store::EntryStatus, Hash};
 use iroh_gossip::{net::Gossip, proto::TopicId};
+use iroh_net::NodeId;
 use iroh_net::{key::PublicKey, MagicEndpoint, NodeAddr};
 use iroh_sync::{
     actor::{OpenOpts, SyncHandle},
@@ -91,6 +94,11 @@ pub enum ToLiveActor {
         from: PublicKey,
         report: SyncReport,
     },
+    NeighborContentReady {
+        namespace: NamespaceId,
+        node: PublicKey,
+        hash: Hash,
+    },
     NeighborUp {
         namespace: NamespaceId,
         peer: PublicKey,
@@ -124,6 +132,7 @@ type SyncConnectRes = (
     Result<SyncFinished, ConnectError>,
 );
 type SyncAcceptRes = Result<SyncFinished, AcceptError>;
+type DownloadRes = (NamespaceId, Hash, Result<Stats, DownloadError>);
 
 // Currently peers might double-sync in both directions.
 pub struct LiveActor {
@@ -148,7 +157,11 @@ pub struct LiveActor {
     /// Running sync futures (from accept).
     running_sync_accept: JoinSet<SyncAcceptRes>,
     /// Running download futures.
-    pending_downloads: JoinSet<Option<(NamespaceId, Hash)>>,
+    download_tasks: JoinSet<DownloadRes>,
+    /// Content hashes which are wanted but not yet queued because no provider was found.
+    missing_hashes: HashSet<Hash>,
+    /// Content hashes queued in downloader.
+    queued_hashes: HashSet<Hash>,
     /// Subscribers to actor events
     subscribers: SubscribersMap,
 
@@ -184,8 +197,10 @@ impl LiveActor {
             running_sync_connect: Default::default(),
             running_sync_accept: Default::default(),
             subscribers: Default::default(),
-            pending_downloads: Default::default(),
+            download_tasks: Default::default(),
             state: Default::default(),
+            missing_hashes: Default::default(),
+            queued_hashes: Default::default(),
         }
     }
 
@@ -230,14 +245,10 @@ impl LiveActor {
                     let res = res.context("running_sync_accept closed")?;
                     self.on_sync_via_accept_finished(res).await;
                 }
-                Some(res) = self.pending_downloads.join_next(), if !self.pending_downloads.is_empty() => {
+                Some(res) = self.download_tasks.join_next(), if !self.download_tasks.is_empty() => {
                     trace!(?i, "tick: pending_downloads");
-                    let res = res.context("pending_downloads closed")?;
-                    if let Some((namespace, hash)) = res {
-                        self.subscribers.send(&namespace, Event::ContentReady { hash }).await;
-                        // Inform our neighbors that we have new content ready.
-                        self.broadcast_neighbors(namespace, &Op::ContentReady(hash)).await;
-                    }
+                    let (namespace, hash, res) = res.context("pending_downloads closed")?;
+                    self.on_download_ready(namespace, hash, res).await;
                 }
             }
 
@@ -310,6 +321,13 @@ impl LiveActor {
                 let outcome = self.accept_sync_request(namespace, peer);
                 reply.send(outcome).ok();
             }
+            ToLiveActor::NeighborContentReady {
+                namespace,
+                node,
+                hash,
+            } => {
+                self.on_neighbor_content_ready(namespace, node, hash).await;
+            }
         };
         Ok(true)
     }
@@ -583,6 +601,34 @@ impl LiveActor {
         }
     }
 
+    async fn on_download_ready(
+        &mut self,
+        namespace: NamespaceId,
+        hash: Hash,
+        res: Result<Stats, DownloadError>,
+    ) {
+        self.queued_hashes.remove(&hash);
+        if res.is_ok() {
+            self.subscribers
+                .send(&namespace, Event::ContentReady { hash })
+                .await;
+            // Inform our neighbors that we have new content ready.
+            self.broadcast_neighbors(namespace, &Op::ContentReady(hash))
+                .await;
+        } else {
+            self.missing_hashes.insert(hash);
+        }
+    }
+
+    async fn on_neighbor_content_ready(
+        &mut self,
+        namespace: NamespaceId,
+        node: NodeId,
+        hash: Hash,
+    ) {
+        self.start_download(namespace, hash, node, true).await;
+    }
+
     #[instrument("on_sync_report", skip_all, fields(peer = %from.fmt_short(), namespace = %report.namespace.fmt_short()))]
     async fn on_sync_report(&mut self, from: PublicKey, report: SyncReport) {
         let namespace = report.namespace;
@@ -630,25 +676,14 @@ impl LiveActor {
             } => {
                 // A new entry was inserted from initial sync or gossip. Queue downloading the
                 // content.
-                let hash = entry.content_hash();
-                let entry_status = self.bao_store.entry_status(&hash).await?;
-                // TODO: Make downloads configurable.
-                if matches!(entry_status, EntryStatus::NotFound | EntryStatus::Partial)
-                    && should_download
-                {
-                    let mut nodes = vec![];
-                    if let ContentStatus::Complete = remote_content_status {
+                if should_download {
+                    let hash = entry.content_hash();
+                    if matches!(remote_content_status, ContentStatus::Complete) {
                         let node_id = PublicKey::from_bytes(&from)?;
-                        nodes.push(node_id);
-                    };
-                    let req = DownloadRequest::untagged(HashAndFormat::raw(hash), nodes);
-                    let handle = self.downloader.queue(req).await;
-
-                    self.pending_downloads.spawn(async move {
-                        // NOTE: this ignores the result for now, simply keeping the option
-                        let res = handle.await.ok();
-                        res.map(|_| (namespace, hash))
-                    });
+                        self.start_download(namespace, hash, node_id, false).await;
+                    } else {
+                        self.missing_hashes.insert(hash);
+                    }
                 }
             }
         }
@@ -656,6 +691,31 @@
         Ok(())
     }
 
+    async fn start_download(
+        &mut self,
+        namespace: NamespaceId,
+        hash: Hash,
+        node: PublicKey,
+        only_if_missing: bool,
+    ) {
+        let entry_status = self.bao_store.entry_status(&hash).await;
+        if matches!(entry_status, Ok(EntryStatus::Complete)) {
+            self.missing_hashes.remove(&hash);
+            return;
+        }
+        if self.queued_hashes.contains(&hash) {
+            self.downloader.nodes_have(hash, vec![node]).await;
+        } else if !only_if_missing || self.missing_hashes.contains(&hash) {
+            let req = DownloadRequest::untagged(HashAndFormat::raw(hash), vec![node]);
+            let handle = self.downloader.queue(req).await;
+
+            self.queued_hashes.insert(hash);
+            self.missing_hashes.remove(&hash);
+            self.download_tasks
+                .spawn(async move { (namespace, hash, handle.await) });
+        }
+    }
+
     #[instrument("accept", skip_all)]
     pub async fn handle_connection(&mut self, conn: quinn::Connecting) {
         let to_actor_tx = self.sync_actor_tx.clone();
diff --git a/iroh/tests/sync.rs b/iroh/tests/sync.rs
index d2161932d4..8436639de8 100644
--- a/iroh/tests/sync.rs
+++ b/iroh/tests/sync.rs
@@ -7,7 +7,7 @@ use std::{
 
 use anyhow::{anyhow, bail, Context, Result};
 use bytes::Bytes;
-use futures::{Stream, StreamExt};
+use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
 use iroh::{
     client::{mem::Doc, Entry, LiveEvent},
     node::{Builder, Node},
@@ -16,7 +16,7 @@ use iroh::{
 use iroh_net::key::{PublicKey, SecretKey};
 use quic_rpc::transport::misc::DummyServerEndpoint;
 use rand::{CryptoRng, Rng, SeedableRng};
-use tracing::{debug, info};
+use tracing::{debug, error_span, info, Instrument};
 use tracing_subscriber::{prelude::*, EnvFilter};
 
 use iroh_bytes::Hash;
@@ -683,227 +683,225 @@ async fn test_download_policies() -> Result<()> {
     Ok(())
 }
 
-// TODO: reenable when passing consistently
-// /// Test sync between many nodes with propagation through sync reports.
-// #[tokio::test(flavor = "multi_thread")]
-// async fn sync_big() -> Result<()> {
-//     setup_logging();
-//     let mut rng = test_rng(b"sync_big");
-//     let rt = test_runtime();
-//     let n_nodes = std::env::var("NODES")
-//         .map(|v| v.parse().expect("NODES must be a number"))
-//         .unwrap_or(10);
-//     let n_entries_init = 1;
-
-//     tokio::task::spawn(async move {
-//         for i in 0.. {
-//             tokio::time::sleep(Duration::from_secs(1)).await;
-//             info!("tick {i}");
-//         }
-//     });
-
-//     let nodes = spawn_nodes(rt, n_nodes, &mut rng).await?;
-//     let peer_ids = nodes.iter().map(|node| node.node_id()).collect::<Vec<_>>();
-//     let clients = nodes.iter().map(|node| node.client()).collect::<Vec<_>>();
-//     let authors = collect_futures(clients.iter().map(|c| c.authors.create())).await?;
-
-//     let doc0 = clients[0].docs.create().await?;
-//     let mut ticket = doc0.share(ShareMode::Write).await?;
-//     // do not join for now, just import without any peer info
-//     let peer0 = ticket.nodes[0].clone();
-//     ticket.nodes = vec![];
-
-//     let mut docs = vec![];
-//     docs.push(doc0);
-//     docs.extend_from_slice(
-//         &collect_futures(
-//             clients
-//                 .iter()
-//                 .skip(1)
-//                 .map(|c| c.docs.import(ticket.clone())),
-//         )
-//         .await?,
-//     );
-
-//     let mut expected = vec![];
-
-//     // create initial data on each node
-//     publish(&docs, &mut expected, n_entries_init, |i, j| {
-//         (
-//             authors[i],
-//             format!("init/{}/{j}", peer_ids[i].fmt_short()),
-//             format!("init:{i}:{j}"),
-//         )
-//     })
-//     .await?;
-
-//     // assert initial data
-//     for (i, doc) in docs.iter().enumerate() {
-//         let entries = get_all_with_content(doc).await?;
-//         let mut expected = expected
-//             .iter()
-//             .filter(|e| e.author == authors[i])
-//             .cloned()
-//             .collect::<Vec<_>>();
-//         expected.sort();
-//         assert_eq!(entries, expected, "phase1 pre-sync correct");
-//     }
-
-//     // setup event streams
-//     let events = collect_futures(docs.iter().map(|d| d.subscribe())).await?;
-
-//     // join nodes together
-//     for (i, doc) in docs.iter().enumerate().skip(1) {
-//         info!(me = %peer_ids[i].fmt_short(), peer = %peer0.peer_id.fmt_short(), "join");
-//         doc.start_sync(vec![peer0.clone()]).await?;
-//     }
-
-//     // wait for InsertRemote events stuff to happen
-//     info!("wait for all peers to receive insert events");
-//     let expected_inserts = (n_nodes - 1) * n_entries_init;
-//     let mut tasks = tokio::task::JoinSet::default();
-//     for (i, events) in events.into_iter().enumerate() {
-//         let doc = docs[i].clone();
-//         let me = doc.id().fmt_short();
-//         let expected = expected.clone();
-//         let fut = async move {
-//             wait_for_events(events, expected_inserts, TIMEOUT, |e| {
-//                 matches!(e, LiveEvent::InsertRemote { .. })
-//             })
-//             .await?;
-//             let entries = get_all(&doc).await?;
-//             if entries != expected {
-//                 Err(anyhow!(
-//                     "node {i} failed (has {} entries but expected to have {})",
-//                     entries.len(),
-//                     expected.len()
-//                 ))
-//             } else {
-//                 info!(
-//                     "received and checked all {} expected entries",
-//                     expected.len()
-//                 );
-//                 Ok(())
-//             }
-//         }
-//         .instrument(error_span!("sync-test", %me));
-//         let fut = fut.map(move |r| r.with_context(move || format!("node {i} ({me})")));
-//         tasks.spawn(fut);
-//     }
-
-//     while let Some(res) = tasks.join_next().await {
-//         res??;
-//     }
-
-//     assert_all_docs(&docs, &peer_ids, &expected, "after initial sync").await;
-
-//     info!("shutdown");
-//     for node in nodes {
-//         node.shutdown();
-//     }
-
-//     Ok(())
-// }
-
-// /// Get all entries of a document.
-// async fn get_all(doc: &Doc) -> anyhow::Result<Vec<Entry>> {
-//     let entries = doc.get_many(GetFilter::All).await?;
-//     let entries = entries.collect::<Vec<_>>().await;
-//     entries.into_iter().collect()
-// }
-
-// /// Get all entries of a document with the blob content.
-// async fn get_all_with_content(doc: &Doc) -> anyhow::Result<Vec<(Entry, Bytes)>> {
-//     let entries = doc.get_many(GetFilter::All).await?;
-//     let entries = entries.and_then(|entry| async {
-//         let content = doc.read_to_bytes(&entry).await;
-//         content.map(|c| (entry, c))
-//     });
-//     let entries = entries.collect::<Vec<_>>().await;
-//     let entries = entries.into_iter().collect::<Result<Vec<_>>>()?;
-//     Ok(entries)
-// }
-
-// async fn publish(
-//     docs: &[Doc],
-//     expected: &mut Vec<ExpectedEntry>,
-//     n: usize,
-//     cb: impl Fn(usize, usize) -> (AuthorId, String, String),
-// ) -> anyhow::Result<()> {
-//     for (i, doc) in docs.iter().enumerate() {
-//         for j in 0..n {
-//             let (author, key, value) = cb(i, j);
-//             doc.set_bytes(author, key.as_bytes().to_vec(), value.as_bytes().to_vec())
-//                 .await?;
-//             expected.push(ExpectedEntry { author, key, value });
-//         }
-//     }
-//     expected.sort();
-//     Ok(())
-// }
-
-// /// Collect an iterator into futures by joining them all and failing if any future failed.
-// async fn collect_futures<T>(
-//     futs: impl IntoIterator<Item = impl Future<Output = anyhow::Result<T>>>,
-// ) -> anyhow::Result<Vec<T>> {
-//     futures::future::join_all(futs)
-//         .await
-//         .into_iter()
-//         .collect::<anyhow::Result<Vec<T>>>()
-// }
-
-// /// Collect `count` events from the `events` stream, only collecting events for which `matcher`
-// /// returns true.
-// async fn wait_for_events(
-//     mut events: impl Stream<Item = anyhow::Result<LiveEvent>> + Send + Unpin + 'static,
-//     count: usize,
-//     timeout: Duration,
-//     matcher: impl Fn(&LiveEvent) -> bool,
-// ) -> anyhow::Result<Vec<LiveEvent>> {
-//     let mut res = Vec::with_capacity(count);
-//     let sleep = tokio::time::sleep(timeout);
-//     tokio::pin!(sleep);
-//     while res.len() < count {
-//         tokio::select! {
-//             () = &mut sleep => {
-//                 bail!("Failed to collect {count} elements in {timeout:?} (collected only {})", res.len());
-//             },
-//             event = events.try_next() => {
-//                 let event = event?;
-//                 match event {
-//                     None => bail!("stream ended after {} items, but expected {count}", res.len()),
-//                     Some(event) => if matcher(&event) {
-//                         res.push(event);
-//                         debug!("recv event {} of {count}", res.len());
-//                     }
-//                 }
-//             }
-//         }
-//     }
-//     Ok(res)
-// }
-
-// async fn assert_all_docs(
-//     docs: &[Doc],
-//     peer_ids: &[PublicKey],
-//     expected: &Vec<ExpectedEntry>,
-//     label: &str,
-// ) {
-//     info!("validate all peers: {label}");
-//     for (i, doc) in docs.iter().enumerate() {
-//         let entries = get_all(doc).await.unwrap_or_else(|err| {
-//             panic!("failed to get entries for peer {:?}: {err:?}", peer_ids[i])
-//         });
-//         assert_eq!(
-//             &entries,
-//             expected,
-//             "{label}: peer {i} {:?} failed (have {} but expected {})",
-//             peer_ids[i],
-//             entries.len(),
-//             expected.len()
-//         );
-//     }
-// }
+/// Test sync between many nodes with propagation through sync reports.
+#[tokio::test(flavor = "multi_thread")]
+async fn sync_big() -> Result<()> {
+    setup_logging();
+    let mut rng = test_rng(b"sync_big");
+    let n_nodes = std::env::var("NODES")
+        .map(|v| v.parse().expect("NODES must be a number"))
+        .unwrap_or(10);
+    let n_entries_init = 1;
+
+    tokio::task::spawn(async move {
+        for i in 0.. {
+            tokio::time::sleep(Duration::from_secs(1)).await;
+            info!("tick {i}");
+        }
+    });
+
+    let nodes = spawn_nodes(n_nodes, &mut rng).await?;
+    let node_ids = nodes.iter().map(|node| node.node_id()).collect::<Vec<_>>();
+    let clients = nodes.iter().map(|node| node.client()).collect::<Vec<_>>();
+    let authors = collect_futures(clients.iter().map(|c| c.authors.create())).await?;
+
+    let doc0 = clients[0].docs.create().await?;
+    let mut ticket = doc0.share(ShareMode::Write).await?;
+    // do not join for now, just import without any peer info
+    let peer0 = ticket.nodes[0].clone();
+    ticket.nodes = vec![];
+
+    let mut docs = vec![];
+    docs.push(doc0);
+    docs.extend_from_slice(
+        &collect_futures(
+            clients
+                .iter()
+                .skip(1)
+                .map(|c| c.docs.import(ticket.clone())),
+        )
+        .await?,
+    );
+
+    let mut expected = vec![];
+
+    // create initial data on each node
+    publish(&docs, &mut expected, n_entries_init, |i, j| {
+        (
+            authors[i],
+            format!("init/{}/{j}", node_ids[i].fmt_short()),
+            format!("init:{i}:{j}"),
+        )
+    })
+    .await?;
+
+    // assert initial data
+    for (i, doc) in docs.iter().enumerate() {
+        let entries = get_all_with_content(doc).await?;
+        let mut expected = expected
+            .iter()
+            .filter(|e| e.author == authors[i])
+            .cloned()
+            .collect::<Vec<_>>();
+        expected.sort();
+        assert_eq!(entries, expected, "phase1 pre-sync correct");
+    }
+
+    // setup event streams
+    let events = collect_futures(docs.iter().map(|d| d.subscribe())).await?;
+
+    // join nodes together
+    for (i, doc) in docs.iter().enumerate().skip(1) {
+        info!(me = %node_ids[i].fmt_short(), peer = %peer0.node_id.fmt_short(), "join");
+        doc.start_sync(vec![peer0.clone()]).await?;
+    }
+
+    // wait for InsertRemote events stuff to happen
+    info!("wait for all peers to receive insert events");
+    let expected_inserts = (n_nodes - 1) * n_entries_init;
+    let mut tasks = tokio::task::JoinSet::default();
+    for (i, events) in events.into_iter().enumerate() {
+        let doc = docs[i].clone();
+        let me = doc.id().fmt_short();
+        let expected = expected.clone();
+        let fut = async move {
+            wait_for_events(events, expected_inserts, TIMEOUT, |e| {
+                matches!(e, LiveEvent::InsertRemote { .. })
+            })
+            .await?;
+            let entries = get_all(&doc).await?;
+            if entries != expected {
+                Err(anyhow!(
+                    "node {i} failed (has {} entries but expected to have {})",
+                    entries.len(),
+                    expected.len()
+                ))
+            } else {
+                info!(
+                    "received and checked all {} expected entries",
+                    expected.len()
+                );
+                Ok(())
+            }
+        }
+        .instrument(error_span!("sync-test", %me));
+        let fut = fut.map(move |r| r.with_context(move || format!("node {i} ({me})")));
+        tasks.spawn(fut);
+    }
+
+    while let Some(res) = tasks.join_next().await {
+        res??;
+    }
+
+    assert_all_docs(&docs, &node_ids, &expected, "after initial sync").await;
+
+    info!("shutdown");
+    for node in nodes {
+        node.shutdown();
+    }
+
+    Ok(())
+}
+
+/// Get all entries of a document.
+async fn get_all(doc: &Doc) -> anyhow::Result<Vec<Entry>> {
+    let entries = doc.get_many(Query::all()).await?;
+    let entries = entries.collect::<Vec<_>>().await;
+    entries.into_iter().collect()
+}
+
+/// Get all entries of a document with the blob content.
+async fn get_all_with_content(doc: &Doc) -> anyhow::Result<Vec<(Entry, Bytes)>> {
+    let entries = doc.get_many(Query::all()).await?;
+    let entries = entries.and_then(|entry| async {
+        let content = entry.content_bytes(doc).await;
+        content.map(|c| (entry, c))
+    });
+    let entries = entries.collect::<Vec<_>>().await;
+    let entries = entries.into_iter().collect::<Result<Vec<_>>>()?;
+    Ok(entries)
+}
+
+async fn publish(
+    docs: &[Doc],
+    expected: &mut Vec<ExpectedEntry>,
+    n: usize,
+    cb: impl Fn(usize, usize) -> (AuthorId, String, String),
+) -> anyhow::Result<()> {
+    for (i, doc) in docs.iter().enumerate() {
+        for j in 0..n {
+            let (author, key, value) = cb(i, j);
+            doc.set_bytes(author, key.as_bytes().to_vec(), value.as_bytes().to_vec())
+                .await?;
+            expected.push(ExpectedEntry { author, key, value });
+        }
+    }
+    expected.sort();
+    Ok(())
+}
+
+/// Collect an iterator into futures by joining them all and failing if any future failed.
+async fn collect_futures<T>(
+    futs: impl IntoIterator<Item = impl Future<Output = anyhow::Result<T>>>,
+) -> anyhow::Result<Vec<T>> {
+    futures::future::join_all(futs)
+        .await
+        .into_iter()
+        .collect::<anyhow::Result<Vec<T>>>()
+}
+
+/// Collect `count` events from the `events` stream, only collecting events for which `matcher`
+/// returns true.
+async fn wait_for_events(
+    mut events: impl Stream<Item = anyhow::Result<LiveEvent>> + Send + Unpin + 'static,
+    count: usize,
+    timeout: Duration,
+    matcher: impl Fn(&LiveEvent) -> bool,
+) -> anyhow::Result<Vec<LiveEvent>> {
+    let mut res = Vec::with_capacity(count);
+    let sleep = tokio::time::sleep(timeout);
+    tokio::pin!(sleep);
+    while res.len() < count {
+        tokio::select! {
+            () = &mut sleep => {
+                bail!("Failed to collect {count} elements in {timeout:?} (collected only {})", res.len());
+            },
+            event = events.try_next() => {
+                let event = event?;
+                match event {
+                    None => bail!("stream ended after {} items, but expected {count}", res.len()),
+                    Some(event) => if matcher(&event) {
+                        res.push(event);
+                        debug!("recv event {} of {count}", res.len());
+                    }
+                }
+            }
+        }
+    }
+    Ok(res)
+}
+
+async fn assert_all_docs(
+    docs: &[Doc],
+    node_ids: &[PublicKey],
+    expected: &Vec<ExpectedEntry>,
+    label: &str,
+) {
+    info!("validate all peers: {label}");
+    for (i, doc) in docs.iter().enumerate() {
+        let entries = get_all(doc).await.unwrap_or_else(|err| {
+            panic!("failed to get entries for peer {:?}: {err:?}", node_ids[i])
+        });
+        assert_eq!(
+            &entries,
+            expected,
+            "{label}: peer {i} {:?} failed (have {} but expected {})",
+            node_ids[i],
+            entries.len(),
+            expected.len()
+        );
+    }
+}
 
 #[derive(Debug, Ord, Eq, PartialEq, PartialOrd, Clone)]
 struct ExpectedEntry {