Skip to content

Commit

Permalink
[feat] Download missing blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
ppodolsky committed Mar 2, 2024
1 parent b864a63 commit 74a68bf
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 38 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions src/iroh_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,15 +271,16 @@ impl IrohNode {
let ticket = DocTicket::from_str(table_ticket).map_err(Error::doc)?;
match self.table_storages.entry(table_name.to_string()) {
Entry::Occupied(entry) => {
if entry.get().iroh_doc().id() != ticket.capability.id() {
let iroh_doc = entry.get().iroh_doc();
if iroh_doc.id() != ticket.capability.id() {
return Err(Error::existing_table(table_name));
}
entry
.get()
.iroh_doc()
iroh_doc
.start_sync(ticket.nodes)
.await
.map_err(Error::doc)?;
let storage0 = entry.get().clone();
tokio::spawn(async move { storage0.download_missing().await });
Ok(entry.get().iroh_doc().id())
}
Entry::Vacant(entry) => {
Expand Down
79 changes: 46 additions & 33 deletions src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ use crate::sinks::Sink;
use crate::utils::key_to_bytes;
use crate::{IrohClient, IrohDoc};
use async_stream::stream;
use futures::{Stream, StreamExt};
use futures::{Stream, StreamExt, TryStreamExt};
use iroh::bytes::store::ExportMode;
use iroh::bytes::Hash;
use iroh::client::{Entry, LiveEvent};
use iroh::net::key::PublicKey;
use iroh::net::NodeAddr;
use iroh::rpc_protocol::{BlobDownloadRequest, DownloadLocation, SetTagOption, ShareMode};
use iroh::sync::store::{Query, SortBy, SortDirection};
use iroh::sync::{AuthorId, ContentStatus};
use iroh::sync::{AuthorId, ContentStatus, PeerIdBytes};
use iroh::ticket::DocTicket;
use iroh_base::hash::BlobFormat;
use lru::LruCache;
Expand Down Expand Up @@ -219,6 +219,49 @@ impl Storage {
Ok(storage)
}

async fn download_entry_from_peers(&self, entry: &Entry, peers: &[PeerIdBytes]) -> Result<()> {
let key = std::str::from_utf8(entry.key()).map_err(Error::incorrect_key)?;
for peer in peers {
let node_addr = match PublicKey::from_bytes(&peer) {
Ok(public_key) => NodeAddr::new(public_key),
Err(_signing_error) => {
warn!("potential db corruption: peers per doc can't be decoded");
continue;
}
};
let progress = self
.sync_client
.blobs
.download(BlobDownloadRequest {
hash: entry.content_hash(),
format: BlobFormat::Raw,
peer: node_addr,
tag: SetTagOption::Auto,
out: DownloadLocation::External {
path: self.get_path(key),
in_place: true,
},
})
.await
.map_err(Error::io_error)?;
if progress.finish().await.map_err(Error::storage).is_ok() {
break;
}
}
Ok(())
}

pub async fn download_missing(&self) -> Result<()> {
let Ok(Some(peers)) = self.iroh_doc.get_sync_peers().await else {
return Ok(());
};
let mut stream = self.iroh_doc.get_many(Query::all()).await.unwrap();
while let Some(entry) = stream.try_next().await.unwrap() {
self.download_entry_from_peers(&entry, &peers).await?;
}
Ok(())
}

fn get_path(&self, key: &str) -> PathBuf {
if let Some(file_shard_config) = self.hash_ring.range(key, 1).into_iter().next() {
let file_shard = &self.shards[&file_shard_config.name];
Expand Down Expand Up @@ -349,37 +392,7 @@ impl Storage {
.map_err(Error::entry)?;
if let Some(entry) = entry {
if let Ok(Some(peers)) = self.iroh_doc.get_sync_peers().await {
for peer in peers {
let node_addr = match PublicKey::from_bytes(&peer) {
Ok(public_key) => NodeAddr::new(public_key),
Err(_signing_error) => {
warn!("potential db corruption: peers per doc can't be decoded");
continue;
}
};
let progress = self
.sync_client
.blobs
.download(BlobDownloadRequest {
hash: entry.content_hash(),
format: BlobFormat::Raw,
peer: node_addr,
tag: SetTagOption::Auto,
out: DownloadLocation::External {
path: shard.get_path_for(key),
in_place: true,
},
})
.await
.map_err(Error::io_error)?;
match progress.finish().await.map_err(Error::storage) {
Ok(_) => break,
Err(e) => {
eprintln!("{:?}", e);
panic!()
}
}
}
self.download_entry_from_peers(&entry, &peers).await?;
}
return Ok(Some(Box::new(
entry
Expand Down

0 comments on commit 74a68bf

Please sign in to comment.