Skip to content

Commit

Permalink
chore: resolve conversation list and file index
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 committed Dec 13, 2024
1 parent c76b33a commit 87e5c93
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 6 deletions.
27 changes: 27 additions & 0 deletions extensions/warp-ipfs/src/store/document.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,17 @@ impl RootDocument {
}
});

let fut_conversation_list =
futures::future::ready(self.conversations.ok_or(Error::Other)).and_then(|document| {
let ipfs = ipfs.clone();
async move {
ipfs.get_dag(document)
.await
.map_err(anyhow::Error::from)
.map_err(Error::from)
}
});

let fut_blocked_by_list = futures::future::ready(self.block_by.ok_or(Error::Other))
.and_then(|document| {
let ipfs = ipfs.clone();
Expand All @@ -316,6 +327,20 @@ impl RootDocument {
}
});

let fut_file_index = futures::future::ready(self.file_index.ok_or(Error::Other)).and_then(|document| {
let ipfs = ipfs.clone();
async move {
let index: DirectoryDocument = ipfs.get_dag(document).deserialized()
.await
.map_err(anyhow::Error::from)
.map_err(Error::from)?;

index.reconstruct_document_path(&ipfs, true).await?;

Ok(())
}
});

let fut_keystore = futures::future::ready(self.keystore.ok_or(Error::Other)).and_then(
|document| {
let ipfs = ipfs.clone();
Expand Down Expand Up @@ -348,7 +373,9 @@ impl RootDocument {

let _ = tokio::join!(
fut_friends,
fut_file_index,
fut_block_list,
fut_conversation_list,
fut_blocked_by_list,
fut_requests_list,
fut_keystore
Expand Down
97 changes: 91 additions & 6 deletions extensions/warp-ipfs/src/store/document/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use chrono::{DateTime, Utc};
use futures::stream::FuturesUnordered;
use futures::{StreamExt, TryFutureExt};
use ipld_core::cid::Cid;
use pollable_map::futures::FutureMap;
use rust_ipfs::{Ipfs, IpfsPath};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
Expand Down Expand Up @@ -85,7 +86,6 @@ impl DirectoryDocument {
if let Some(cid) = self.items {
let list = ipfs
.get_dag(cid)
.timeout(Duration::from_secs(10))
.deserialized::<Vec<ItemDocument>>()
.await
.unwrap_or_default();
Expand All @@ -107,7 +107,6 @@ impl DirectoryDocument {
directory.set_thumbnail_reference(&IpfsPath::from(cid).to_string());
let image: ImageDag = ipfs
.get_dag(cid)
.timeout(Duration::from_secs(10))
.deserialized()
.await?;

Expand All @@ -116,7 +115,6 @@ impl DirectoryDocument {
if resolve_thumbnail {
let data = ipfs
.cat_unixfs(image.link)
.timeout(Duration::from_secs(10))
.max_length(MAX_THUMBNAIL_SIZE)
.await
.unwrap_or_default();
Expand All @@ -128,9 +126,49 @@ impl DirectoryDocument {
directory.rebuild_paths(&None);
Ok(directory)
}

// #[async_recursion::async_recursion]
pub(crate) async fn reconstruct_document_path(&self, ipfs: &Ipfs, resolve_thumbnail: bool) -> Result<(), Error> {
if let Some(cid) = self.items {
let list = ipfs
.get_dag(cid)
.deserialized::<Vec<ItemDocument>>()
.await
.unwrap_or_default();

let items_resolved = FuturesUnordered::from_iter(
list.into_iter()
.map(|item| item.reconstruct_document_path(ipfs, resolve_thumbnail).into_future()),
);

futures::pin_mut!(items_resolved);

while let Some(item) = items_resolved.next().await {
if let Err(e) = item {
tracing::warn!(error = %e, "unable to reconstruct item document");
}
}
}

if let Some(cid) = self.thumbnail {
let image: ImageDag = ipfs
.get_dag(cid)
.deserialized()
.await?;

if resolve_thumbnail {
ipfs
.cat_unixfs(image.link)
.max_length(MAX_THUMBNAIL_SIZE)
.await
.unwrap_or_default();
}
}
Ok(())
}
}

#[derive(Clone, Debug, Copy, Deserialize, Serialize)]
#[derive(Clone, Debug, Copy, Deserialize, Serialize, Ord, PartialOrd, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum ItemDocument {
Directory(Cid),
Expand All @@ -155,6 +193,29 @@ impl ItemDocument {
Ok(document)
}

pub async fn reconstruct_document_path(&self, ipfs: &Ipfs, resolve_thumbnail: bool) -> Result<(), Error> {
match self {
ItemDocument::Directory(cid) => {
let document: DirectoryDocument = ipfs
.get_dag(cid)
.deserialized()
.await
.map_err(anyhow::Error::from)?;

document.reconstruct_document_path(ipfs, resolve_thumbnail).await
}
ItemDocument::File(cid) => {
let document: FileDocument = ipfs
.get_dag(cid)
.deserialized()
.await
.map_err(anyhow::Error::from)?;

document.reconstruct_document_path(ipfs, resolve_thumbnail).await
}
}
}

pub async fn resolve(&self, ipfs: &Ipfs, resolve_thumbnail: bool) -> Result<Item, Error> {
let item = match *self {
ItemDocument::Directory(cid) => {
Expand Down Expand Up @@ -241,6 +302,32 @@ impl FileDocument {
Ok(document)
}

pub async fn reconstruct_document_path(&self, ipfs: &Ipfs, resolve_thumbnail: bool) -> Result<(), Error> {
if let Some(cid) = self.thumbnail {
let image: ImageDag = ipfs
.get_dag(cid)
.deserialized()
.await?;


if resolve_thumbnail {
ipfs
.cat_unixfs(image.link)
.max_length(MAX_THUMBNAIL_SIZE)
.await.map_err(anyhow::Error::from)?;
}
}

// TODO: determine if we should also concurrently fetch the file
// if let Some(cid) = self
// .reference
// .as_ref()
// .and_then(|cid| Cid::from_str(cid).ok())
// {
// }
Ok(())
}

pub fn to_attachment(&self) -> Result<FileAttachmentDocument, Error> {
let data = self.reference.clone().ok_or(Error::FileNotFound)?;
Ok(FileAttachmentDocument {
Expand Down Expand Up @@ -268,7 +355,6 @@ impl FileDocument {
file.set_thumbnail_reference(&IpfsPath::from(cid).to_string());
let image: ImageDag = ipfs
.get_dag(cid)
.timeout(Duration::from_secs(10))
.deserialized()
.await?;

Expand All @@ -277,7 +363,6 @@ impl FileDocument {
if resolve_thumbnail {
let data = ipfs
.cat_unixfs(image.link)
.timeout(Duration::from_secs(10))
.max_length(MAX_THUMBNAIL_SIZE)
.await
.unwrap_or_default();
Expand Down

0 comments on commit 87e5c93

Please sign in to comment.