From 87e5c936949ba331e1ef48326d718f9d8d0ca04f Mon Sep 17 00:00:00 2001 From: Darius Date: Fri, 13 Dec 2024 08:10:13 -0600 Subject: [PATCH] chore: resolve conversation list and file index --- extensions/warp-ipfs/src/store/document.rs | 27 ++++++ .../warp-ipfs/src/store/document/files.rs | 97 +++++++++++++++++-- 2 files changed, 118 insertions(+), 6 deletions(-) diff --git a/extensions/warp-ipfs/src/store/document.rs b/extensions/warp-ipfs/src/store/document.rs index c8537a19a..b47a6a1f9 100644 --- a/extensions/warp-ipfs/src/store/document.rs +++ b/extensions/warp-ipfs/src/store/document.rs @@ -294,6 +294,17 @@ impl RootDocument { } }); + let fut_conversation_list = + futures::future::ready(self.conversations.ok_or(Error::Other)).and_then(|document| { + let ipfs = ipfs.clone(); + async move { + ipfs.get_dag(document) + .await + .map_err(anyhow::Error::from) + .map_err(Error::from) + } + }); + let fut_blocked_by_list = futures::future::ready(self.block_by.ok_or(Error::Other)) .and_then(|document| { let ipfs = ipfs.clone(); @@ -316,6 +327,20 @@ impl RootDocument { } }); + let fut_file_index = futures::future::ready(self.file_index.ok_or(Error::Other)).and_then(|document| { + let ipfs = ipfs.clone(); + async move { + let index: DirectoryDocument = ipfs.get_dag(document).deserialized() + .await + .map_err(anyhow::Error::from) + .map_err(Error::from)?; + + index.reconstruct_document_path(&ipfs, true).await?; + + Ok(()) + } + }); + let fut_keystore = futures::future::ready(self.keystore.ok_or(Error::Other)).and_then( |document| { let ipfs = ipfs.clone(); @@ -348,7 +373,9 @@ impl RootDocument { let _ = tokio::join!( fut_friends, + fut_file_index, fut_block_list, + fut_conversation_list, fut_blocked_by_list, fut_requests_list, fut_keystore diff --git a/extensions/warp-ipfs/src/store/document/files.rs b/extensions/warp-ipfs/src/store/document/files.rs index 7ebb7c69a..63fa406b2 100644 --- a/extensions/warp-ipfs/src/store/document/files.rs +++ b/extensions/warp-ipfs/src/store/document/files.rs @@ -5,6 +5,7 @@ use chrono::{DateTime, Utc}; use futures::stream::FuturesUnordered; use futures::{StreamExt, TryFutureExt}; use ipld_core::cid::Cid; +use pollable_map::futures::FutureMap; use rust_ipfs::{Ipfs, IpfsPath}; use serde::{Deserialize, Serialize}; use uuid::Uuid; @@ -85,7 +86,6 @@ impl DirectoryDocument { if let Some(cid) = self.items { let list = ipfs .get_dag(cid) - .timeout(Duration::from_secs(10)) .deserialized::>() .await .unwrap_or_default(); @@ -107,7 +107,6 @@ impl DirectoryDocument { directory.set_thumbnail_reference(&IpfsPath::from(cid).to_string()); let image: ImageDag = ipfs .get_dag(cid) - .timeout(Duration::from_secs(10)) .deserialized() .await?; @@ -116,7 +115,6 @@ impl DirectoryDocument { if resolve_thumbnail { let data = ipfs .cat_unixfs(image.link) - .timeout(Duration::from_secs(10)) .max_length(MAX_THUMBNAIL_SIZE) .await .unwrap_or_default(); @@ -128,9 +126,49 @@ impl DirectoryDocument { directory.rebuild_paths(&None); Ok(directory) } + + // #[async_recursion::async_recursion] + pub(crate) async fn reconstruct_document_path(&self, ipfs: &Ipfs, resolve_thumbnail: bool) -> Result<(), Error> { + if let Some(cid) = self.items { + let list = ipfs + .get_dag(cid) + .deserialized::>() + .await + .unwrap_or_default(); + + let items_resolved = FuturesUnordered::from_iter( + list.into_iter() + .map(|item| item.reconstruct_document_path(ipfs, resolve_thumbnail).into_future()), + ); + + futures::pin_mut!(items_resolved); + + while let Some(item) = items_resolved.next().await { + if let Err(e) = item { + tracing::warn!(error = %e, "unable to reconstruct item document"); + } + } + } + + if let Some(cid) = self.thumbnail { + let image: ImageDag = ipfs + .get_dag(cid) + .deserialized() + .await?; + + if resolve_thumbnail { + ipfs + .cat_unixfs(image.link) + .max_length(MAX_THUMBNAIL_SIZE) + .await + .unwrap_or_default(); + } + } + Ok(()) + } } -#[derive(Clone, Debug, Copy, Deserialize, Serialize)] +#[derive(Clone, Debug, Copy, Deserialize, Serialize, Ord, PartialOrd, Eq, PartialEq)] #[serde(rename_all = "snake_case")] pub enum ItemDocument { Directory(Cid), @@ -155,6 +193,29 @@ impl ItemDocument { Ok(document) } + pub async fn reconstruct_document_path(&self, ipfs: &Ipfs, resolve_thumbnail: bool) -> Result<(), Error> { + match self { + ItemDocument::Directory(cid) => { + let document: DirectoryDocument = ipfs + .get_dag(cid) + .deserialized() + .await + .map_err(anyhow::Error::from)?; + + document.reconstruct_document_path(ipfs, resolve_thumbnail).await + } + ItemDocument::File(cid) => { + let document: FileDocument = ipfs + .get_dag(cid) + .deserialized() + .await + .map_err(anyhow::Error::from)?; + + document.reconstruct_document_path(ipfs, resolve_thumbnail).await + } + } + } + pub async fn resolve(&self, ipfs: &Ipfs, resolve_thumbnail: bool) -> Result { let item = match *self { ItemDocument::Directory(cid) => { @@ -241,6 +302,32 @@ impl FileDocument { Ok(document) } + pub async fn reconstruct_document_path(&self, ipfs: &Ipfs, resolve_thumbnail: bool) -> Result<(), Error> { + if let Some(cid) = self.thumbnail { + let image: ImageDag = ipfs + .get_dag(cid) + .deserialized() + .await?; + + + if resolve_thumbnail { + ipfs + .cat_unixfs(image.link) + .max_length(MAX_THUMBNAIL_SIZE) + .await.map_err(anyhow::Error::from)?; + } + } + + // TODO: determine if we should also concurrently fetch the file + // if let Some(cid) = self + // .reference + // .as_ref() + // .and_then(|cid| Cid::from_str(cid).ok()) + // { + // } + Ok(()) + } + pub fn to_attachment(&self) -> Result { let data = self.reference.clone().ok_or(Error::FileNotFound)?; Ok(FileAttachmentDocument { @@ -268,7 +355,6 @@ impl FileDocument { file.set_thumbnail_reference(&IpfsPath::from(cid).to_string()); let image: ImageDag = ipfs .get_dag(cid) - .timeout(Duration::from_secs(10)) .deserialized() .await?; @@ -277,7 +363,6 @@ impl FileDocument { if resolve_thumbnail { let data = ipfs .cat_unixfs(image.link) - .timeout(Duration::from_secs(10)) .max_length(MAX_THUMBNAIL_SIZE) .await .unwrap_or_default();