diff --git a/Cargo.lock b/Cargo.lock index 2dd3be0..987f220 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5262,7 +5262,7 @@ dependencies = [ [[package]] name = "trident-storage" -version = "0.0.13" +version = "0.0.14" dependencies = [ "anyhow", "async-stream", diff --git a/src/iroh_node.rs b/src/iroh_node.rs index f38512c..25f4090 100644 --- a/src/iroh_node.rs +++ b/src/iroh_node.rs @@ -271,15 +271,16 @@ impl IrohNode { let ticket = DocTicket::from_str(table_ticket).map_err(Error::doc)?; match self.table_storages.entry(table_name.to_string()) { Entry::Occupied(entry) => { - if entry.get().iroh_doc().id() != ticket.capability.id() { + let iroh_doc = entry.get().iroh_doc(); + if iroh_doc.id() != ticket.capability.id() { return Err(Error::existing_table(table_name)); } - entry - .get() - .iroh_doc() + iroh_doc .start_sync(ticket.nodes) .await .map_err(Error::doc)?; + let storage0 = entry.get().clone(); + tokio::spawn(async move { storage0.download_missing().await }); Ok(entry.get().iroh_doc().id()) } Entry::Vacant(entry) => { diff --git a/src/storage.rs b/src/storage.rs index 24f3ac2..68a1687 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -6,7 +6,7 @@ use crate::sinks::Sink; use crate::utils::key_to_bytes; use crate::{IrohClient, IrohDoc}; use async_stream::stream; -use futures::{Stream, StreamExt}; +use futures::{Stream, StreamExt, TryStreamExt}; use iroh::bytes::store::ExportMode; use iroh::bytes::Hash; use iroh::client::{Entry, LiveEvent}; @@ -14,7 +14,7 @@ use iroh::net::key::PublicKey; use iroh::net::NodeAddr; use iroh::rpc_protocol::{BlobDownloadRequest, DownloadLocation, SetTagOption, ShareMode}; use iroh::sync::store::{Query, SortBy, SortDirection}; -use iroh::sync::{AuthorId, ContentStatus}; +use iroh::sync::{AuthorId, ContentStatus, PeerIdBytes}; use iroh::ticket::DocTicket; use iroh_base::hash::BlobFormat; use lru::LruCache; @@ -219,6 +219,49 @@ impl Storage { Ok(storage) } + async fn download_entry_from_peers(&self, entry: &Entry, peers: &[PeerIdBytes]) -> Result<()> { + let key = std::str::from_utf8(entry.key()).map_err(Error::incorrect_key)?; + for peer in peers { + let node_addr = match PublicKey::from_bytes(&peer) { + Ok(public_key) => NodeAddr::new(public_key), + Err(_signing_error) => { + warn!("potential db corruption: peers per doc can't be decoded"); + continue; + } + }; + let progress = self + .sync_client + .blobs + .download(BlobDownloadRequest { + hash: entry.content_hash(), + format: BlobFormat::Raw, + peer: node_addr, + tag: SetTagOption::Auto, + out: DownloadLocation::External { + path: self.get_path(key), + in_place: true, + }, + }) + .await + .map_err(Error::io_error)?; + if progress.finish().await.map_err(Error::storage).is_ok() { + break; + } + } + Ok(()) + } + + pub async fn download_missing(&self) -> Result<()> { + let Ok(Some(peers)) = self.iroh_doc.get_sync_peers().await else { + return Ok(()); + }; + let mut stream = self.iroh_doc.get_many(Query::all()).await.unwrap(); + while let Some(entry) = stream.try_next().await.unwrap() { + self.download_entry_from_peers(&entry, &peers).await?; + } + Ok(()) + } + fn get_path(&self, key: &str) -> PathBuf { if let Some(file_shard_config) = self.hash_ring.range(key, 1).into_iter().next() { let file_shard = &self.shards[&file_shard_config.name]; @@ -349,37 +392,7 @@ impl Storage { .map_err(Error::entry)?; if let Some(entry) = entry { if let Ok(Some(peers)) = self.iroh_doc.get_sync_peers().await { - for peer in peers { - let node_addr = match PublicKey::from_bytes(&peer) { - Ok(public_key) => NodeAddr::new(public_key), - Err(_signing_error) => { - warn!("potential db corruption: peers per doc can't be decoded"); - continue; - } - }; - let progress = self - .sync_client - .blobs - .download(BlobDownloadRequest { - hash: entry.content_hash(), - format: BlobFormat::Raw, - peer: node_addr, - tag: SetTagOption::Auto, - out: DownloadLocation::External { - path: shard.get_path_for(key), - in_place: true, - }, - }) - .await - .map_err(Error::io_error)?; - match progress.finish().await.map_err(Error::storage) { - Ok(_) => break, - Err(e) => { - eprintln!("{:?}", e); - panic!() - } - } - } + self.download_entry_from_peers(&entry, &peers).await?; } return Ok(Some(Box::new( entry