Skip to content

Commit

Permalink
feat: better download handling in sync engine
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed Apr 23, 2024
1 parent d6ff0cf commit e189e2a
Show file tree
Hide file tree
Showing 4 changed files with 317 additions and 262 deletions.
3 changes: 1 addition & 2 deletions iroh/src/sync_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl SyncEngine {
endpoint.clone(),
gossip.clone(),
bao_store,
downloader.clone(),
downloader,
to_live_actor_recv,
live_actor_tx.clone(),
to_gossip_actor,
Expand All @@ -87,7 +87,6 @@ impl SyncEngine {
to_gossip_actor_recv,
sync.clone(),
gossip,
downloader,
live_actor_tx.clone(),
);
let live_actor_task = tokio::task::spawn(
Expand Down
16 changes: 7 additions & 9 deletions iroh/src/sync_engine/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use tokio::{
use tracing::{debug, error, trace};

use super::live::{Op, ToLiveActor};
use iroh_bytes::downloader::Downloader;

#[derive(strum::Display, Debug)]
pub enum ToGossipActor {
Expand All @@ -35,7 +34,6 @@ pub struct GossipActor {
inbox: mpsc::Receiver<ToGossipActor>,
sync: SyncHandle,
gossip: Gossip,
downloader: Downloader,
to_sync_actor: mpsc::Sender<ToLiveActor>,
joined: HashSet<NamespaceId>,
want_join: HashSet<NamespaceId>,
Expand All @@ -47,14 +45,12 @@ impl GossipActor {
inbox: mpsc::Receiver<ToGossipActor>,
sync: SyncHandle,
gossip: Gossip,
downloader: Downloader,
to_sync_actor: mpsc::Sender<ToLiveActor>,
) -> Self {
Self {
inbox,
sync,
gossip,
downloader,
to_sync_actor,
joined: Default::default(),
want_join: Default::default(),
Expand Down Expand Up @@ -176,11 +172,13 @@ impl GossipActor {
.await?;
}
Op::ContentReady(hash) => {
// Inform the downloader that we now know that this peer has the content
// for this hash.
self.downloader
.nodes_have(hash, vec![msg.delivered_from])
.await;
self.to_sync_actor
.send(ToLiveActor::NeighborContentReady {
namespace,
node: msg.delivered_from,
hash,
})
.await?;
}
Op::SyncReport(report) => {
self.to_sync_actor
Expand Down
116 changes: 88 additions & 28 deletions iroh/src/sync_engine/live.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
#![allow(missing_docs)]

use std::collections::HashSet;
use std::{collections::HashMap, time::SystemTime};

use anyhow::{Context, Result};
use futures::FutureExt;
use iroh_bytes::downloader::{DownloadRequest, Downloader};
use iroh_bytes::downloader::{DownloadError, DownloadRequest, Downloader};
use iroh_bytes::get::Stats;
use iroh_bytes::HashAndFormat;
use iroh_bytes::{store::EntryStatus, Hash};
use iroh_gossip::{net::Gossip, proto::TopicId};
use iroh_net::NodeId;
use iroh_net::{key::PublicKey, MagicEndpoint, NodeAddr};
use iroh_sync::{
actor::{OpenOpts, SyncHandle},
Expand Down Expand Up @@ -91,6 +94,11 @@ pub enum ToLiveActor {
from: PublicKey,
report: SyncReport,
},
NeighborContentReady {
namespace: NamespaceId,
node: PublicKey,
hash: Hash,
},
NeighborUp {
namespace: NamespaceId,
peer: PublicKey,
Expand Down Expand Up @@ -124,6 +132,7 @@ type SyncConnectRes = (
Result<SyncFinished, ConnectError>,
);
type SyncAcceptRes = Result<SyncFinished, AcceptError>;
type DownloadRes = (NamespaceId, Hash, Result<Stats, DownloadError>);

// Currently peers might double-sync in both directions.
pub struct LiveActor<B: iroh_bytes::store::Store> {
Expand All @@ -148,7 +157,11 @@ pub struct LiveActor<B: iroh_bytes::store::Store> {
/// Running sync futures (from accept).
running_sync_accept: JoinSet<SyncAcceptRes>,
/// Running download futures.
pending_downloads: JoinSet<Option<(NamespaceId, Hash)>>,
download_tasks: JoinSet<DownloadRes>,
/// Content hashes which are wanted but not yet queued because no provider was found.
missing_hashes: HashSet<Hash>,
/// Content hashes queued in downloader.
queued_hashes: HashSet<Hash>,

/// Subscribers to actor events
subscribers: SubscribersMap,
Expand Down Expand Up @@ -184,8 +197,10 @@ impl<B: iroh_bytes::store::Store> LiveActor<B> {
running_sync_connect: Default::default(),
running_sync_accept: Default::default(),
subscribers: Default::default(),
pending_downloads: Default::default(),
download_tasks: Default::default(),
state: Default::default(),
missing_hashes: Default::default(),
queued_hashes: Default::default(),
}
}

Expand Down Expand Up @@ -230,14 +245,10 @@ impl<B: iroh_bytes::store::Store> LiveActor<B> {
let res = res.context("running_sync_accept closed")?;
self.on_sync_via_accept_finished(res).await;
}
Some(res) = self.pending_downloads.join_next(), if !self.pending_downloads.is_empty() => {
Some(res) = self.download_tasks.join_next(), if !self.download_tasks.is_empty() => {
trace!(?i, "tick: pending_downloads");
let res = res.context("pending_downloads closed")?;
if let Some((namespace, hash)) = res {
self.subscribers.send(&namespace, Event::ContentReady { hash }).await;
// Inform our neighbors that we have new content ready.
self.broadcast_neighbors(namespace, &Op::ContentReady(hash)).await;
}
let (namespace, hash, res) = res.context("pending_downloads closed")?;
self.on_download_ready(namespace, hash, res).await;

}
}
Expand Down Expand Up @@ -310,6 +321,13 @@ impl<B: iroh_bytes::store::Store> LiveActor<B> {
let outcome = self.accept_sync_request(namespace, peer);
reply.send(outcome).ok();
}
ToLiveActor::NeighborContentReady {
namespace,
node,
hash,
} => {
self.on_neighbor_content_ready(namespace, node, hash).await;
}
};
Ok(true)
}
Expand Down Expand Up @@ -583,6 +601,34 @@ impl<B: iroh_bytes::store::Store> LiveActor<B> {
}
}

async fn on_download_ready(
&mut self,
namespace: NamespaceId,
hash: Hash,
res: Result<Stats, DownloadError>,
) {
self.queued_hashes.remove(&hash);
if res.is_ok() {
self.subscribers
.send(&namespace, Event::ContentReady { hash })
.await;
// Inform our neighbors that we have new content ready.
self.broadcast_neighbors(namespace, &Op::ContentReady(hash))
.await;
} else {
self.missing_hashes.insert(hash);
}
}

async fn on_neighbor_content_ready(
&mut self,
namespace: NamespaceId,
node: NodeId,
hash: Hash,
) {
self.start_download(namespace, hash, node, true).await;
}

#[instrument("on_sync_report", skip_all, fields(peer = %from.fmt_short(), namespace = %report.namespace.fmt_short()))]
async fn on_sync_report(&mut self, from: PublicKey, report: SyncReport) {
let namespace = report.namespace;
Expand Down Expand Up @@ -630,32 +676,46 @@ impl<B: iroh_bytes::store::Store> LiveActor<B> {
} => {
// A new entry was inserted from initial sync or gossip. Queue downloading the
// content.
let hash = entry.content_hash();
let entry_status = self.bao_store.entry_status(&hash).await?;
// TODO: Make downloads configurable.
if matches!(entry_status, EntryStatus::NotFound | EntryStatus::Partial)
&& should_download
{
let mut nodes = vec![];
if let ContentStatus::Complete = remote_content_status {
if should_download {
let hash = entry.content_hash();
if matches!(remote_content_status, ContentStatus::Complete) {
let node_id = PublicKey::from_bytes(&from)?;
nodes.push(node_id);
};
let req = DownloadRequest::untagged(HashAndFormat::raw(hash), nodes);
let handle = self.downloader.queue(req).await;

self.pending_downloads.spawn(async move {
// NOTE: this ignores the result for now, simply keeping the option
let res = handle.await.ok();
res.map(|_| (namespace, hash))
});
self.start_download(namespace, hash, node_id, false).await;
} else {
self.missing_hashes.insert(hash);
}
}
}
}

Ok(())
}

async fn start_download(
&mut self,
namespace: NamespaceId,
hash: Hash,
node: PublicKey,
only_if_missing: bool,
) {
let entry_status = self.bao_store.entry_status(&hash).await;
if matches!(entry_status, Ok(EntryStatus::Complete)) {
self.missing_hashes.remove(&hash);
return;
}
if self.queued_hashes.contains(&hash) {
self.downloader.nodes_have(hash, vec![node]).await;
} else if !only_if_missing || self.missing_hashes.contains(&hash) {
let req = DownloadRequest::untagged(HashAndFormat::raw(hash), vec![node]);
let handle = self.downloader.queue(req).await;

self.queued_hashes.insert(hash);
self.missing_hashes.remove(&hash);
self.download_tasks
.spawn(async move { (namespace, hash, handle.await) });
}
}

#[instrument("accept", skip_all)]
pub async fn handle_connection(&mut self, conn: quinn::Connecting) {
let to_actor_tx = self.sync_actor_tx.clone();
Expand Down
Loading

0 comments on commit e189e2a

Please sign in to comment.