From 021d57041dce757d0a9f671400c50e906bc92d15 Mon Sep 17 00:00:00 2001 From: Darius Clark Date: Mon, 6 Nov 2023 14:36:44 -0500 Subject: [PATCH] chore: Update dependency (#356) --- Cargo.toml | 2 +- .../warp-ipfs/src/behaviour/phonebook.rs | 5 +- extensions/warp-ipfs/src/lib.rs | 1 - .../warp-ipfs/src/store/conversation.rs | 45 ++- extensions/warp-ipfs/src/store/discovery.rs | 6 + extensions/warp-ipfs/src/store/document.rs | 147 ++++---- .../warp-ipfs/src/store/document/cache.rs | 318 +++++++++--------- .../src/store/document/conversation.rs | 64 +++- .../warp-ipfs/src/store/document/identity.rs | 2 - .../warp-ipfs/src/store/document/image_dag.rs | 20 +- .../warp-ipfs/src/store/document/root.rs | 206 +++++++++--- .../warp-ipfs/src/store/document/utils.rs | 97 ------ extensions/warp-ipfs/src/store/files.rs | 83 ++--- extensions/warp-ipfs/src/store/identity.rs | 18 +- extensions/warp-ipfs/src/store/message.rs | 12 +- 15 files changed, 522 insertions(+), 504 deletions(-) delete mode 100644 extensions/warp-ipfs/src/store/document/utils.rs diff --git a/Cargo.toml b/Cargo.toml index a96b6b9e1..889188e01 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,7 +81,7 @@ either = "1" void = "1" #ipfs dependency -rust-ipfs = "0.6.0" +rust-ipfs = "0.7.0" # Blink related crates # av-data is needed to use libaom. need to ensure that Warp and libaom use the same version of av-data diff --git a/extensions/warp-ipfs/src/behaviour/phonebook.rs b/extensions/warp-ipfs/src/behaviour/phonebook.rs index 09da18254..686e1b562 100644 --- a/extensions/warp-ipfs/src/behaviour/phonebook.rs +++ b/extensions/warp-ipfs/src/behaviour/phonebook.rs @@ -7,7 +7,7 @@ use rust_ipfs::libp2p::{ core::Endpoint, swarm::{ derive_prelude::ConnectionEstablished, ConnectionClosed, ConnectionDenied, ConnectionId, - FromSwarm, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, + FromSwarm, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }, Multiaddr, PeerId, }; @@ -170,7 +170,7 @@ impl NetworkBehaviour for Behaviour { ) { } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, @@ -220,7 +220,6 @@ impl NetworkBehaviour for Behaviour { fn poll( &mut self, cx: &mut Context, - _: &mut impl PollParameters, ) -> Poll>> { if let Some(event) = self.events.pop_front() { return Poll::Ready(event); diff --git a/extensions/warp-ipfs/src/lib.rs b/extensions/warp-ipfs/src/lib.rs index 5b0bf65ab..178da780d 100644 --- a/extensions/warp-ipfs/src/lib.rs +++ b/extensions/warp-ipfs/src/lib.rs @@ -282,7 +282,6 @@ impl WarpIpfs { } idconfig })) - .with_autonat() .with_bitswap(None) .with_kademlia( Some(either::Either::Left(KadConfig { diff --git a/extensions/warp-ipfs/src/store/conversation.rs b/extensions/warp-ipfs/src/store/conversation.rs index eeb70829d..3aa836708 100644 --- a/extensions/warp-ipfs/src/store/conversation.rs +++ b/extensions/warp-ipfs/src/store/conversation.rs @@ -22,12 +22,7 @@ use warp::{ use crate::store::ecdh_encrypt; -use super::{ - document::utils::{GetIpldDag, GetLocalDag, ToCid}, - ecdh_decrypt, - keystore::Keystore, - verify_serde_sig, -}; +use super::{ecdh_decrypt, keystore::Keystore, verify_serde_sig}; #[derive(Debug, Clone, Serialize, Deserialize, Eq)] pub struct ConversationDocument { @@ -300,14 +295,29 @@ impl ConversationDocument { pub async fn get_message_list(&self, ipfs: &Ipfs) -> Result, Error> { match self.messages { - Some(cid) => cid.get_local_dag(ipfs).await, + Some(cid) => ipfs + .dag() + .get() + .path(cid) + .local() + .deserialized() + .await + .map_err(anyhow::Error::from) + .map_err(Error::from), None => Ok(BTreeSet::new()), } } pub async fn get_raw_message_list(&self, ipfs: &Ipfs) -> Result { match self.messages { - Some(cid) => cid.get_ipld_dag(ipfs).await, + Some(cid) => ipfs + .dag() + .get() + .path(cid) + .local() + .await + .map_err(anyhow::Error::from) + .map_err(Error::from), None => Ok(Ipld::List(vec![])), } } @@ -317,7 +327,7 @@ impl ConversationDocument { list: BTreeSet, ) -> Result<(), Error> { self.modified = Utc::now(); - let cid = list.to_cid(ipfs).await?; + let cid = ipfs.dag().put().serialize(list)?.await?; self.messages = Some(cid); Ok(()) } @@ -531,7 +541,10 @@ impl ConversationDocument { pub async fn delete_all_message(&mut self, ipfs: Ipfs) -> Result<(), Error> { let cid = std::mem::take(&mut self.messages); let (cid, messages): (Cid, BTreeSet) = match cid { - Some(cid) => (cid, cid.get_local_dag(&ipfs).await?), + Some(cid) => ( + cid, + ipfs.dag().get().path(cid).local().deserialized().await?, + ), None => return Ok(()), }; @@ -611,7 +624,7 @@ impl MessageDocument { None => ecdh_encrypt(&did, Some(&sender), &bytes)?, }; - let message = data.to_cid(ipfs).await?; + let message = ipfs.dag().put().serialize(data)?.await?; let sender = DIDEd25519Reference::from_did(&sender); @@ -664,7 +677,7 @@ impl MessageDocument { None => ecdh_encrypt(did, Some(&self.sender.to_did()), &bytes)?, }; - let message_cid = data.to_cid(ipfs).await?; + let message_cid = ipfs.dag().put().serialize(data)?.await?; info!("Setting Message to document"); self.message = message_cid; @@ -678,7 +691,13 @@ impl MessageDocument { did: &DID, keystore: Option<&Keystore>, ) -> Result { - let bytes: Vec = self.message.get_local_dag(ipfs).await?; + let bytes: Vec = ipfs + .dag() + .get() + .path(self.message) + .local() + .deserialized() + .await?; let sender = self.sender.to_did(); let data = match keystore { diff --git a/extensions/warp-ipfs/src/store/discovery.rs b/extensions/warp-ipfs/src/store/discovery.rs index 5c71f012a..f18284ab5 100644 --- a/extensions/warp-ipfs/src/store/discovery.rs +++ b/extensions/warp-ipfs/src/store/discovery.rs @@ -254,6 +254,12 @@ impl Discovery { return Ok(()); } + let own_peer_id = self.ipfs.keypair()?.public().to_peer_id(); + + if peer_id == own_peer_id { + return Ok(()); + } + let entry = DiscoveryEntry::new( &self.ipfs, peer_id, diff --git a/extensions/warp-ipfs/src/store/document.rs b/extensions/warp-ipfs/src/store/document.rs index 3e48d5829..dc16c9136 100644 --- a/extensions/warp-ipfs/src/store/document.rs +++ b/extensions/warp-ipfs/src/store/document.rs @@ -3,18 +3,14 @@ pub mod conversation; pub mod identity; pub mod image_dag; pub mod root; -pub mod utils; use chrono::{DateTime, Utc}; use futures::TryFutureExt; -use ipfs::{Ipfs, IpfsPath}; -use libipld::{ - serde::{from_ipld, to_ipld}, - Cid, -}; +use ipfs::Ipfs; +use libipld::Cid; use rust_ipfs as ipfs; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use std::{collections::BTreeMap, str::FromStr, time::Duration}; +use serde::{Deserialize, Serialize}; +use std::{collections::BTreeMap, str::FromStr}; use uuid::Uuid; use warp::{ crypto::{did_key::CoreSign, DID}, @@ -24,59 +20,10 @@ use warp::{ use crate::store::get_keypair_did; -use self::{identity::IdentityDocument, utils::GetLocalDag}; +use self::identity::IdentityDocument; use super::{identity::Request, keystore::Keystore}; -#[async_trait::async_trait] -pub(crate) trait ToCid: Sized { - async fn to_cid(&self, ipfs: &Ipfs) -> Result; -} - -#[async_trait::async_trait] -pub(crate) trait GetDag: Sized { - async fn get_dag(&self, ipfs: &Ipfs, timeout: Option) -> Result; -} - -#[async_trait::async_trait] -impl GetDag for Cid { - async fn get_dag(&self, ipfs: &Ipfs, timeout: Option) -> Result { - IpfsPath::from(*self).get_dag(ipfs, timeout).await - } -} - -#[async_trait::async_trait] -impl GetDag for &Cid { - async fn get_dag(&self, ipfs: &Ipfs, timeout: Option) -> Result { - IpfsPath::from(**self).get_dag(ipfs, timeout).await - } -} - -#[async_trait::async_trait] -impl GetDag for IpfsPath { - async fn get_dag(&self, ipfs: &Ipfs, timeout: Option) -> Result { - let timeout = timeout.unwrap_or(std::time::Duration::from_secs(10)); - match tokio::time::timeout(timeout, ipfs.get_dag(self.clone())).await { - Ok(Ok(ipld)) => from_ipld(ipld) - .map_err(anyhow::Error::from) - .map_err(Error::from), - Ok(Err(e)) => Err(Error::Any(e)), - Err(e) => Err(Error::from(anyhow::anyhow!("Timeout at {e}"))), - } - } -} - -#[async_trait::async_trait] -impl ToCid for T -where - T: Serialize + Clone + Send + Sync, -{ - async fn to_cid(&self, ipfs: &Ipfs) -> Result { - let ipld = to_ipld(self.clone()).map_err(anyhow::Error::from)?; - ipfs.put_dag(ipld).await.map_err(Error::from) - } -} - #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] pub struct ExtractedRootDocument { pub identity: Identity, @@ -159,9 +106,12 @@ impl RootDocument { #[tracing::instrument(skip(self, ipfs))] pub async fn verify(&self, ipfs: &Ipfs) -> Result<(), Error> { - let identity: IdentityDocument = self - .identity - .get_local_dag(ipfs) + let identity: IdentityDocument = ipfs + .dag() + .get() + .path(self.identity) + .local() + .deserialized() .await .map_err(|_| Error::IdentityInvalid)?; @@ -196,44 +146,85 @@ impl RootDocument { ), Error, > { - let document: IdentityDocument = self - .identity - .get_local_dag(ipfs) + let document: IdentityDocument = ipfs + .dag() + .get() + .path(self.identity) + .local() + .deserialized() .await .map_err(|_| Error::IdentityInvalid)?; let identity = document.resolve()?; let friends = futures::future::ready(self.friends.ok_or(Error::Other)) - .and_then(|document| async move { document.get_local_dag(ipfs).await }) + .and_then(|document| async move { + ipfs.dag() + .get() + .path(document) + .local() + .deserialized() + .await + .map_err(Error::from) + }) .await .unwrap_or_default(); let block_list = futures::future::ready(self.blocks.ok_or(Error::Other)) - .and_then(|document| async move { document.get_local_dag(ipfs).await }) + .and_then(|document| async move { + ipfs.dag() + .get() + .path(document) + .local() + .deserialized() + .await + .map_err(Error::from) + }) .await .unwrap_or_default(); let block_by_list = futures::future::ready(self.block_by.ok_or(Error::Other)) - .and_then(|document| async move { document.get_local_dag(ipfs).await }) + .and_then(|document| async move { + ipfs.dag() + .get() + .path(document) + .local() + .deserialized() + .await + .map_err(Error::from) + }) .await .unwrap_or_default(); let request = futures::future::ready(self.request.ok_or(Error::Other)) - .and_then(|document| async move { document.get_local_dag(ipfs).await }) + .and_then(|document| async move { + ipfs.dag() + .get() + .path(document) + .local() + .deserialized() + .await + .map_err(Error::from) + }) .await .unwrap_or_default(); let conversation_keystore = futures::future::ready(self.conversations_keystore.ok_or(Error::Other)) .and_then(|document| async move { - let map: BTreeMap = document.get_local_dag(ipfs).await?; + let map: BTreeMap = ipfs + .dag() + .get() + .path(document) + .local() + .deserialized() + .await?; let mut resolved_map: BTreeMap = BTreeMap::new(); for (k, v) in map .iter() .filter_map(|(k, v)| Uuid::from_str(k).map(|k| (k, *v)).ok()) { - if let Ok(store) = v.get_local_dag(ipfs).await { + if let Ok(store) = ipfs.dag().get().path(v).local().deserialized().await { resolved_map.insert(k, store); } } @@ -264,7 +255,7 @@ impl RootDocument { let document = document.sign(&did_kp)?; - let identity = document.to_cid(ipfs).await?; + let identity = ipfs.dag().put().serialize(document)?.await?; let has_friends = !data.friends.is_empty(); let has_blocks = !data.block_list.is_empty(); let has_block_by_list = !data.block_by_list.is_empty(); @@ -272,27 +263,29 @@ impl RootDocument { let has_keystore = !data.conversation_keystore.is_empty(); let friends = has_friends - .then_some(data.friends.to_cid(ipfs).await.ok()) + .then_some(ipfs.dag().put().serialize(data.friends)?.await.ok()) .flatten(); + let blocks = has_blocks - .then_some(data.block_list.to_cid(ipfs).await.ok()) + .then_some(ipfs.dag().put().serialize(data.block_list)?.await.ok()) .flatten(); let block_by = has_block_by_list - .then_some(data.block_by_list.to_cid(ipfs).await.ok()) + .then_some(ipfs.dag().put().serialize(data.block_by_list)?.await.ok()) .flatten(); let request = has_requests - .then_some(data.request.to_cid(ipfs).await.ok()) + .then_some(ipfs.dag().put().serialize(data.request)?.await.ok()) .flatten(); let conversations_keystore = has_keystore .then_some({ let mut pointer_map: BTreeMap = BTreeMap::new(); for (k, v) in data.conversation_keystore { - if let Ok(cid) = v.to_cid(ipfs).await { + if let Ok(cid) = ipfs.dag().put().serialize(v)?.await { pointer_map.insert(k.to_string(), cid); } } - pointer_map.to_cid(ipfs).await.ok() + + ipfs.dag().put().serialize(pointer_map)?.await.ok() }) .flatten(); diff --git a/extensions/warp-ipfs/src/store/document/cache.rs b/extensions/warp-ipfs/src/store/document/cache.rs index e71fa5a2f..dce36fa16 100644 --- a/extensions/warp-ipfs/src/store/document/cache.rs +++ b/extensions/warp-ipfs/src/store/document/cache.rs @@ -8,10 +8,10 @@ use futures::{ SinkExt, StreamExt, }; use libipld::Cid; -use rust_ipfs::Ipfs; +use rust_ipfs::{Ipfs, IpfsPath}; use warp::{crypto::DID, error::Error}; -use super::{identity::IdentityDocument, utils::GetLocalDag, ToCid}; +use super::identity::IdentityDocument; #[allow(clippy::large_enum_variant)] enum IdentityCacheCommand { @@ -149,183 +149,183 @@ impl IdentityCacheTask { while let Some(command) = self.rx.next().await { match command { IdentityCacheCommand::Insert { document, response } => { - if let Err(e) = document.verify() { - let _ = response.send(Err(e)); - continue; - } - - let mut list: HashSet = match self.list { - Some(cid) => cid.get_local_dag(&self.ipfs).await.unwrap_or_default(), - None => HashSet::new(), - }; - - let old_document = list - .iter() - .find(|old_doc| { - document.did == old_doc.did && document.short_id == old_doc.short_id - }) - .cloned(); - - match old_document { - Some(old_document) => { - if !old_document.different(&document) { - let _ = response.send(Ok(None)); - continue; - } - - list.replace(document); - - let cid = match list.to_cid(&self.ipfs).await { - Ok(cid) => cid, - Err(e) => { - let _ = response.send(Err(e)); - continue; - } - }; - - let old_cid = self.list.take(); - - let remove_pin_and_block = async { - if let Some(old_cid) = old_cid { - if self.ipfs.is_pinned(&old_cid).await? { - self.ipfs.remove_pin(&old_cid, false).await?; - } - // Do we want to remove the old block? - self.ipfs.remove_block(old_cid).await?; - } - Ok::<_, Error>(()) - }; - - if let Err(e) = remove_pin_and_block.await { - let _ = response.send(Err(e)); - continue; - } - - if let Some(path) = self.path.as_ref() { - let cid = cid.to_string(); - if let Err(e) = tokio::fs::write(path.join(".cache_id"), cid).await - { - tracing::log::error!("Error writing cid to file: {e}"); - } - } - - self.list = Some(cid); - - let _ = response.send(Ok(Some(old_document.clone()))); - } - None => { - list.insert(document); - - let cid = match list.to_cid(&self.ipfs).await { - Ok(cid) => cid, - Err(e) => { - let _ = response.send(Err(e)); - continue; - } - }; - - if let Err(e) = self.ipfs.insert_pin(&cid, false).await { - let _ = response.send(Err(e.into())); - continue; - } - - if let Some(path) = self.path.as_ref() { - let cid = cid.to_string(); - if let Err(e) = tokio::fs::write(path.join(".cache_id"), cid).await - { - tracing::log::error!("Error writing cid to file: {e}"); - } - } - - self.list = Some(cid); - - let _ = response.send(Ok(None)); - } - } + _ = response.send(self.insert(document).await) } IdentityCacheCommand::Get { did, response } => { - let list: HashSet = match self.list { - Some(cid) => cid.get_local_dag(&self.ipfs).await.unwrap_or_default(), - None => HashSet::new(), - }; - - let document = list - .iter() - .find(|document| document.did == did) - .cloned() - .ok_or(Error::IdentityDoesntExist); - - let _ = response.send(document); + let _ = response.send(self.get(did).await); } IdentityCacheCommand::Remove { did, response } => { - let mut list: HashSet = match self.list { - Some(cid) => cid.get_local_dag(&self.ipfs).await.unwrap_or_default(), - None => { - let _ = response.send(Err(Error::IdentityDoesntExist)); - continue; - } - }; + let _ = response.send(self.remove(did).await); + } + IdentityCacheCommand::List { response } => { + let _ = response.send(self.list().await); + } + } + } + } - let old_document = list.iter().find(|document| document.did == did).cloned(); + async fn insert( + &mut self, + document: IdentityDocument, + ) -> Result, Error> { + document.verify()?; + + let mut list: HashSet = match self.list { + Some(cid) => self + .ipfs + .get_dag(IpfsPath::from(cid)) + .local() + .deserialized() + .await + .unwrap_or_default(), + None => HashSet::new(), + }; - if old_document.is_none() { - let _ = response.send(Err(Error::IdentityDoesntExist)); - continue; - } + let old_document = list + .iter() + .find(|old_doc| document.did == old_doc.did && document.short_id == old_doc.short_id) + .cloned(); - let document = old_document.expect("Exist"); + match old_document { + Some(old_document) => { + if !old_document.different(&document) { + return Ok(None); + } - if !list.remove(&document) { - let _ = response.send(Err(Error::IdentityDoesntExist)); - continue; - } + list.replace(document); - let cid = match list.to_cid(&self.ipfs).await { - Ok(cid) => cid, - Err(e) => { - let _ = response.send(Err(e)); - continue; - } - }; + let cid = self.ipfs.dag().put().serialize(list)?.await?; - let old_cid = self.list.take(); + let old_cid = self.list.take(); - let remove_pin_and_block = async { - if let Some(old_cid) = old_cid { - if self.ipfs.is_pinned(&old_cid).await? { - self.ipfs.remove_pin(&old_cid, false).await?; - } - // Do we want to remove the old block? - self.ipfs.remove_block(old_cid).await?; + let remove_pin_and_block = async { + if let Some(old_cid) = old_cid { + if self.ipfs.is_pinned(&old_cid).await? { + self.ipfs.remove_pin(&old_cid, false).await?; } - Ok::<_, Error>(()) - }; - - if let Err(e) = remove_pin_and_block.await { - let _ = response.send(Err(e)); - continue; + // Do we want to remove the old block? + self.ipfs.remove_block(old_cid, false).await?; } + Ok::<_, Error>(()) + }; - if let Some(path) = self.path.as_ref() { - let cid = cid.to_string(); - if let Err(e) = tokio::fs::write(path.join(".cache_id"), cid).await { - tracing::log::error!("Error writing cid to file: {e}"); - } + remove_pin_and_block.await?; + + if let Some(path) = self.path.as_ref() { + let cid = cid.to_string(); + if let Err(e) = tokio::fs::write(path.join(".cache_id"), cid).await { + tracing::log::error!("Error writing cid to file: {e}"); } + } - self.list = Some(cid); + self.list = Some(cid); - let _ = response.send(Ok(())); - } - IdentityCacheCommand::List { response } => { - let list: HashSet = match self.list { - Some(cid) => cid.get_local_dag(&self.ipfs).await.unwrap_or_default(), - None => HashSet::new(), - }; + Ok(Some(old_document.clone())) + } + None => { + list.insert(document); + + let cid = self.ipfs.dag().put().serialize(list)?.await?; - let _ = response.send(Ok(Vec::from_iter(list))); + self.ipfs.insert_pin(&cid, false).await?; + + if let Some(path) = self.path.as_ref() { + let cid = cid.to_string(); + if let Err(e) = tokio::fs::write(path.join(".cache_id"), cid).await { + tracing::log::error!("Error writing cid to file: {e}"); + } } + + self.list = Some(cid); + + Ok(None) } } } + + async fn get(&self, did: DID) -> Result { + let list: HashSet = match self.list { + Some(cid) => self + .ipfs + .get_dag(IpfsPath::from(cid)) + .local() + .deserialized() + .await + .unwrap_or_default(), + None => HashSet::new(), + }; + + let document = list + .iter() + .find(|document| document.did == did) + .cloned() + .ok_or(Error::IdentityDoesntExist)?; + + Ok(document) + } + + async fn remove(&mut self, did: DID) -> Result<(), Error> { + let mut list: HashSet = match self.list { + Some(cid) => self + .ipfs + .get_dag(IpfsPath::from(cid)) + .local() + .deserialized() + .await + .unwrap_or_default(), + None => { + return Err(Error::IdentityDoesntExist); + } + }; + + let old_document = list.iter().find(|document| document.did == did).cloned(); + + if old_document.is_none() { + return Err(Error::IdentityDoesntExist); + } + + let document = old_document.expect("Exist"); + + if !list.remove(&document) { + return Err(Error::IdentityDoesntExist); + } + + let cid = self.ipfs.dag().put().serialize(list)?.await?; + + let old_cid = self.list.take(); + + if let Some(old_cid) = old_cid { + if self.ipfs.is_pinned(&old_cid).await? { + self.ipfs.remove_pin(&old_cid, false).await?; + } + // Do we want to remove the old block? + self.ipfs.remove_block(old_cid, false).await?; + } + + if let Some(path) = self.path.as_ref() { + let cid = cid.to_string(); + if let Err(e) = tokio::fs::write(path.join(".cache_id"), cid).await { + tracing::log::error!("Error writing cid to file: {e}"); + } + } + + self.list = Some(cid); + + Ok(()) + } + + async fn list(&self) -> Result, Error> { + let list: HashSet = match self.list { + Some(cid) => self + .ipfs + .get_dag(IpfsPath::from(cid)) + .local() + .deserialized() + .await + .unwrap_or_default(), + None => HashSet::new(), + }; + + Ok(Vec::from_iter(list)) + } } diff --git a/extensions/warp-ipfs/src/store/document/conversation.rs b/extensions/warp-ipfs/src/store/document/conversation.rs index efc066267..881fcdabc 100644 --- a/extensions/warp-ipfs/src/store/document/conversation.rs +++ b/extensions/warp-ipfs/src/store/document/conversation.rs @@ -1,5 +1,6 @@ use std::{ collections::{BTreeMap, HashMap}, + future::IntoFuture, path::PathBuf, sync::Arc, }; @@ -7,7 +8,7 @@ use std::{ use futures::{ channel::{mpsc, oneshot}, stream::FuturesUnordered, - FutureExt, SinkExt, StreamExt, + SinkExt, StreamExt, }; use libipld::Cid; use rust_ipfs::{Ipfs, IpfsPath}; @@ -21,7 +22,7 @@ use warp::{ use crate::store::{conversation::ConversationDocument, keystore::Keystore}; -use super::{root::RootDocumentMap, utils::GetLocalDag, ToCid}; +use super::root::RootDocumentMap; #[allow(clippy::large_enum_variant)] enum ConversationCommand { @@ -256,7 +257,7 @@ impl ConversationTask { let path = IpfsPath::from(cid).sub_path(&id.to_string())?; - let document: ConversationDocument = path.get_local_dag(&self.ipfs).await?; + let document: ConversationDocument = self.ipfs.get_dag(path).local().deserialized().await?; document.verify()?; Ok(document) } @@ -277,7 +278,7 @@ impl ConversationTask { let mut map = self.root.get_conversation_keystore_map().await?; let id = id.to_string(); - let cid = document.to_cid(&self.ipfs).await?; + let cid = self.ipfs.dag().put().serialize(document)?.await?; map.insert(id, cid); @@ -290,7 +291,12 @@ impl ConversationTask { None => return Err(Error::InvalidConversation), }; - let mut conversation_map: BTreeMap = cid.get_local_dag(&self.ipfs).await?; + let mut conversation_map: BTreeMap = self + .ipfs + .get_dag(IpfsPath::from(cid)) + .local() + .deserialized() + .await?; let document_cid = match conversation_map.remove(&id.to_string()) { Some(cid) => cid, @@ -307,7 +313,12 @@ impl ConversationTask { } } - let document: ConversationDocument = document_cid.get_local_dag(&self.ipfs).await?; + let document: ConversationDocument = self + .ipfs + .get_dag(IpfsPath::from(document_cid)) + .local() + .deserialized() + .await?; Ok(document) } @@ -317,13 +328,20 @@ impl ConversationTask { None => return Ok(Vec::new()), }; - let conversation_map: BTreeMap = cid.get_local_dag(&self.ipfs).await?; - - let list = FuturesUnordered::from_iter( - conversation_map - .values() - .map(|cid| (*cid).get_local_dag(&self.ipfs).boxed()), - ) + let conversation_map: BTreeMap = self + .ipfs + .get_dag(IpfsPath::from(cid)) + .local() + .deserialized() + .await?; + + let list = FuturesUnordered::from_iter(conversation_map.values().map(|cid| { + self.ipfs + .get_dag(IpfsPath::from(*cid)) + .local() + .deserialized() + .into_future() + })) .filter_map(|result: Result| async move { result.ok() }) .collect::>() .await; @@ -337,7 +355,13 @@ impl ConversationTask { None => return false, }; - let conversation_map: BTreeMap = match cid.get_local_dag(&self.ipfs).await { + let conversation_map: BTreeMap = match self + .ipfs + .get_dag(IpfsPath::from(cid)) + .local() + .deserialized() + .await + { Ok(document) => document, Err(_) => return false, }; @@ -350,7 +374,7 @@ impl ConversationTask { } async fn set_map(&mut self, map: BTreeMap) -> Result<(), Error> { - let cid = map.to_cid(&self.ipfs).await?; + let cid = self.ipfs.dag().put().serialize(map)?.await?; let old_map_cid = self.cid.replace(cid); @@ -386,12 +410,18 @@ impl ConversationTask { document.verify()?; let mut map = match self.cid { - Some(cid) => cid.get_local_dag(&self.ipfs).await?, + Some(cid) => { + self.ipfs + .get_dag(IpfsPath::from(cid)) + .local() + .deserialized() + .await? + } None => BTreeMap::new(), }; let id = document.id().to_string(); - let cid = document.to_cid(&self.ipfs).await?; + let cid = self.ipfs.dag().put().serialize(document)?.await?; map.insert(id, cid); diff --git a/extensions/warp-ipfs/src/store/document/identity.rs b/extensions/warp-ipfs/src/store/document/identity.rs index 5951192dd..d878bcdcb 100644 --- a/extensions/warp-ipfs/src/store/document/identity.rs +++ b/extensions/warp-ipfs/src/store/document/identity.rs @@ -208,8 +208,6 @@ pub async fn unixfs_fetch( let mut stream = ipfs .unixfs() .cat(IpfsPath::from(cid), None, peers, local, timeout) - .await - .map_err(anyhow::Error::from)? .boxed(); let mut data = vec![]; diff --git a/extensions/warp-ipfs/src/store/document/image_dag.rs b/extensions/warp-ipfs/src/store/document/image_dag.rs index 87dc7868a..30e752b21 100644 --- a/extensions/warp-ipfs/src/store/document/image_dag.rs +++ b/extensions/warp-ipfs/src/store/document/image_dag.rs @@ -1,15 +1,12 @@ use futures::{stream::BoxStream, StreamExt}; use libipld::{serde::to_ipld, Cid}; -use rust_ipfs::{Ipfs, PeerId}; +use rust_ipfs::{Ipfs, IpfsPath, PeerId}; use serde::{Deserialize, Serialize}; use std::task::Poll; use tracing::log; use warp::{constellation::file::FileType, error::Error, multipass::identity::IdentityImage}; -use super::{ - identity::unixfs_fetch, - utils::{GetDag, GetLocalDag}, -}; +use super::identity::unixfs_fetch; #[derive(Deserialize, Serialize, Debug, Clone)] pub struct ImageDag { @@ -25,7 +22,7 @@ pub async fn store_photo( file_type: FileType, limit: Option, ) -> Result { - let mut stream = ipfs.add_unixfs(stream).await?; + let mut stream = ipfs.add_unixfs(stream); let mut size = 0; @@ -106,10 +103,13 @@ pub async fn get_image( local: bool, limit: Option, ) -> Result { - let dag: ImageDag = match local { - true => cid.get_local_dag(ipfs).await?, - false => cid.get_dag(ipfs, None).await?, - }; + let mut dag = ipfs.dag().get_dag(IpfsPath::from(cid)); + + if local { + dag = dag.local(); + } + + let dag: ImageDag = dag.deserialized().await?; match limit { Some(size) if dag.size > size as _ => { diff --git a/extensions/warp-ipfs/src/store/document/root.rs b/extensions/warp-ipfs/src/store/document/root.rs index 89963da7a..70d39f2aa 100644 --- a/extensions/warp-ipfs/src/store/document/root.rs +++ b/extensions/warp-ipfs/src/store/document/root.rs @@ -11,11 +11,7 @@ use warp::{crypto::DID, error::Error, multipass::identity::IdentityStatus}; use crate::store::{ecdh_encrypt, identity::Request, keystore::Keystore, VecExt}; -use super::{ - identity::IdentityDocument, - utils::{GetLocalDag, ToCid}, - ExtractedRootDocument, RootDocument, -}; +use super::{identity::IdentityDocument, ExtractedRootDocument, RootDocument}; #[allow(clippy::large_enum_variant)] pub enum RootDocumentCommand { @@ -471,7 +467,15 @@ impl RootDocumentTask { impl RootDocumentTask { async fn get_root_document(&self) -> Result { let document: RootDocument = match self.cid { - Some(cid) => cid.get_local_dag(&self.ipfs).await?, + Some(cid) => { + self.ipfs + .dag() + .get() + .path(cid) + .local() + .deserialized() + .await? + } None => return Err(Error::Other), }; @@ -483,7 +487,8 @@ impl RootDocumentTask { async fn identity(&self) -> Result { let root = self.get_root_document().await?; let path = IpfsPath::from(root.identity); - let document: IdentityDocument = path.get_local_dag(&self.ipfs).await?; + let document: IdentityDocument = + self.ipfs.dag().get_dag(path).local().deserialized().await?; document.verify()?; Ok(document) @@ -497,7 +502,8 @@ impl RootDocumentTask { //Precautionary check document.verify(&self.ipfs).await?; - let root_cid = document.to_cid(&self.ipfs).await?; + let root_cid = self.ipfs.dag().put().serialize(document)?.await?; + if !self.ipfs.is_pinned(&root_cid).await? { self.ipfs.insert_pin(&root_cid, true).await?; } @@ -507,7 +513,7 @@ impl RootDocumentTask { if self.ipfs.is_pinned(&old_cid).await? { self.ipfs.remove_pin(&old_cid, true).await?; } - self.ipfs.remove_block(old_cid).await?; + self.ipfs.remove_block(old_cid, false).await?; } } @@ -529,7 +535,7 @@ impl RootDocumentTask { identity.status = Some(status); let identity = identity.sign(&self.keypair)?; - root.identity = identity.to_cid(&self.ipfs).await?; + root.identity = self.ipfs.dag().put().serialize(identity)?.await?; self.set_root_document(root).await } @@ -540,7 +546,15 @@ impl RootDocumentTask { None => return Ok(vec![]), }; let path = IpfsPath::from(cid).sub_path("request")?; - let list: Vec = path.get_local_dag(&self.ipfs).await.unwrap_or_default(); + let list: Vec = self + .ipfs + .dag() + .get() + .path(path) + .local() + .deserialized() + .await + .unwrap_or_default(); Ok(list) } @@ -548,7 +562,15 @@ impl RootDocumentTask { let mut document = self.get_root_document().await?; let old_document = document.request; let mut list: Vec = match document.request { - Some(cid) => cid.get_local_dag(&self.ipfs).await.unwrap_or_default(), + Some(cid) => self + .ipfs + .dag() + .get() + .path(cid) + .local() + .deserialized() + .await + .unwrap_or_default(), None => vec![], }; @@ -556,13 +578,14 @@ impl RootDocumentTask { return Err(Error::FriendRequestExist); } - document.request = (!list.is_empty()).then_some(list.to_cid(&self.ipfs).await?); + document.request = + (!list.is_empty()).then_some(self.ipfs.dag().put().serialize(list)?.await?); self.set_root_document(document).await?; if let Some(cid) = old_document { if !self.ipfs.is_pinned(&cid).await? { - self.ipfs.remove_block(cid).await?; + self.ipfs.remove_block(cid, false).await?; } } @@ -573,7 +596,15 @@ impl RootDocumentTask { let mut document = self.get_root_document().await?; let old_document = document.request; let mut list: Vec = match document.request { - Some(cid) => cid.get_local_dag(&self.ipfs).await.unwrap_or_default(), + Some(cid) => self + .ipfs + .dag() + .get() + .path(cid) + .local() + .deserialized() + .await + .unwrap_or_default(), None => vec![], }; @@ -581,13 +612,14 @@ impl RootDocumentTask { return Err(Error::FriendRequestExist); } - document.request = (!list.is_empty()).then_some(list.to_cid(&self.ipfs).await?); + document.request = + (!list.is_empty()).then_some(self.ipfs.dag().put().serialize(list)?.await?); self.set_root_document(document).await?; if let Some(cid) = old_document { if !self.ipfs.is_pinned(&cid).await? { - self.ipfs.remove_block(cid).await?; + self.ipfs.remove_block(cid, false).await?; } } @@ -600,7 +632,15 @@ impl RootDocumentTask { None => return Ok(vec![]), }; let path = IpfsPath::from(cid).sub_path("friends")?; - let list: Vec = path.get_local_dag(&self.ipfs).await.unwrap_or_default(); + let list: Vec = self + .ipfs + .dag() + .get() + .path(path) + .local() + .deserialized() + .await + .unwrap_or_default(); Ok(list) } @@ -608,7 +648,15 @@ impl RootDocumentTask { let mut document = self.get_root_document().await?; let old_document = document.friends; let mut list: Vec = match document.friends { - Some(cid) => cid.get_local_dag(&self.ipfs).await.unwrap_or_default(), + Some(cid) => self + .ipfs + .dag() + .get() + .path(cid) + .local() + .deserialized() + .await + .unwrap_or_default(), None => vec![], }; @@ -616,13 +664,14 @@ impl RootDocumentTask { return Err::<_, Error>(Error::FriendExist); } - document.friends = (!list.is_empty()).then_some(list.to_cid(&self.ipfs).await?); + document.friends = + (!list.is_empty()).then_some(self.ipfs.dag().put().serialize(list)?.await?); self.set_root_document(document).await?; if let Some(cid) = old_document { if !self.ipfs.is_pinned(&cid).await? { - self.ipfs.remove_block(cid).await?; + self.ipfs.remove_block(cid, false).await?; } } @@ -633,7 +682,15 @@ impl RootDocumentTask { let mut document = self.get_root_document().await?; let old_document = document.friends; let mut list: Vec = match document.friends { - Some(cid) => cid.get_local_dag(&self.ipfs).await.unwrap_or_default(), + Some(cid) => self + .ipfs + .dag() + .get() + .path(cid) + .local() + .deserialized() + .await + .unwrap_or_default(), None => vec![], }; @@ -641,13 +698,14 @@ impl RootDocumentTask { return Err::<_, Error>(Error::FriendDoesntExist); } - document.friends = (!list.is_empty()).then_some(list.to_cid(&self.ipfs).await?); + document.friends = + (!list.is_empty()).then_some(self.ipfs.dag().put().serialize(list)?.await?); self.set_root_document(document).await?; if let Some(cid) = old_document { if !self.ipfs.is_pinned(&cid).await? { - self.ipfs.remove_block(cid).await?; + self.ipfs.remove_block(cid, false).await?; } } @@ -660,7 +718,15 @@ impl RootDocumentTask { None => return Ok(vec![]), }; let path = IpfsPath::from(cid).sub_path("blocks")?; - let list: Vec = path.get_local_dag(&self.ipfs).await.unwrap_or_default(); + let list: Vec = self + .ipfs + .dag() + .get() + .path(path) + .local() + .deserialized() + .await + .unwrap_or_default(); Ok(list) } @@ -668,7 +734,15 @@ impl RootDocumentTask { let mut document = self.get_root_document().await?; let old_document = document.blocks; let mut list: Vec = match document.blocks { - Some(cid) => cid.get_local_dag(&self.ipfs).await.unwrap_or_default(), + Some(cid) => self + .ipfs + .dag() + .get() + .path(cid) + .local() + .deserialized() + .await + .unwrap_or_default(), None => vec![], }; @@ -676,13 +750,14 @@ impl RootDocumentTask { return Err::<_, Error>(Error::PublicKeyIsBlocked); } - document.blocks = (!list.is_empty()).then_some(list.to_cid(&self.ipfs).await?); + document.blocks = + (!list.is_empty()).then_some(self.ipfs.dag().put().serialize(list)?.await?); self.set_root_document(document).await?; if let Some(cid) = old_document { if !self.ipfs.is_pinned(&cid).await? { - self.ipfs.remove_block(cid).await?; + self.ipfs.remove_block(cid, false).await?; } } Ok(()) @@ -692,7 +767,15 @@ impl RootDocumentTask { let mut document = self.get_root_document().await?; let old_document = document.blocks; let mut list: Vec = match document.blocks { - Some(cid) => cid.get_local_dag(&self.ipfs).await.unwrap_or_default(), + Some(cid) => self + .ipfs + .dag() + .get() + .path(cid) + .local() + .deserialized() + .await + .unwrap_or_default(), None => vec![], }; @@ -700,13 +783,14 @@ impl RootDocumentTask { return Err::<_, Error>(Error::PublicKeyIsntBlocked); } - document.blocks = (!list.is_empty()).then_some(list.to_cid(&self.ipfs).await?); + document.blocks = + (!list.is_empty()).then_some(self.ipfs.dag().put().serialize(list)?.await?); self.set_root_document(document).await?; if let Some(cid) = old_document { if !self.ipfs.is_pinned(&cid).await? { - self.ipfs.remove_block(cid).await?; + self.ipfs.remove_block(cid, false).await?; } } Ok(()) @@ -718,7 +802,15 @@ impl RootDocumentTask { None => return Ok(vec![]), }; let path = IpfsPath::from(cid).sub_path("block_by")?; - let list: Vec = path.get_local_dag(&self.ipfs).await.unwrap_or_default(); + let list: Vec = self + .ipfs + .dag() + .get() + .path(path) + .local() + .deserialized() + .await + .unwrap_or_default(); Ok(list) } @@ -726,7 +818,15 @@ impl RootDocumentTask { let mut document = self.get_root_document().await?; let old_document = document.block_by; let mut list: Vec = match document.block_by { - Some(cid) => cid.get_local_dag(&self.ipfs).await.unwrap_or_default(), + Some(cid) => self + .ipfs + .dag() + .get() + .path(cid) + .local() + .deserialized() + .await + .unwrap_or_default(), None => vec![], }; @@ -734,13 +834,14 @@ impl RootDocumentTask { return Err::<_, Error>(Error::PublicKeyIsntBlocked); } - document.block_by = (!list.is_empty()).then_some(list.to_cid(&self.ipfs).await?); + document.block_by = + (!list.is_empty()).then_some(self.ipfs.dag().put().serialize(list)?.await?); self.set_root_document(document).await?; if let Some(cid) = old_document { if !self.ipfs.is_pinned(&cid).await? { - self.ipfs.remove_block(cid).await?; + self.ipfs.remove_block(cid, false).await?; } } Ok(()) @@ -750,7 +851,15 @@ impl RootDocumentTask { let mut document = self.get_root_document().await?; let old_document = document.block_by; let mut list: Vec = match document.block_by { - Some(cid) => cid.get_local_dag(&self.ipfs).await.unwrap_or_default(), + Some(cid) => self + .ipfs + .dag() + .get() + .path(cid) + .local() + .deserialized() + .await + .unwrap_or_default(), None => vec![], }; @@ -758,13 +867,14 @@ impl RootDocumentTask { return Err::<_, Error>(Error::PublicKeyIsntBlocked); } - document.block_by = (!list.is_empty()).then_some(list.to_cid(&self.ipfs).await?); + document.block_by = + (!list.is_empty()).then_some(self.ipfs.dag().put().serialize(list)?.await?); self.set_root_document(document).await?; if let Some(cid) = old_document { if !self.ipfs.is_pinned(&cid).await? { - self.ipfs.remove_block(cid).await?; + self.ipfs.remove_block(cid, false).await?; } } Ok(()) @@ -772,7 +882,7 @@ impl RootDocumentTask { async fn set_conversation_keystore(&mut self, map: BTreeMap) -> Result<(), Error> { let mut document = self.get_root_document().await?; - document.conversations_keystore = Some(map.to_cid(&self.ipfs).await?); + document.conversations_keystore = Some(self.ipfs.dag().put().serialize(map)?.await?); self.set_root_document(document).await } @@ -784,7 +894,14 @@ impl RootDocumentTask { None => return Ok(BTreeMap::new()), }; - cid.get_local_dag(&self.ipfs).await + self.ipfs + .dag() + .get() + .path(cid) + .local() + .deserialized() + .await + .map_err(Error::from) } async fn get_conversation_keystore(&self, id: Uuid) -> Result { @@ -796,7 +913,14 @@ impl RootDocumentTask { }; let path = IpfsPath::from(cid).sub_path(&id.to_string())?; - path.get_local_dag(&self.ipfs).await + self.ipfs + .dag() + .get() + .path(path) + .local() + .deserialized() + .await + .map_err(Error::from) } async fn export(&self) -> Result { diff --git a/extensions/warp-ipfs/src/store/document/utils.rs b/extensions/warp-ipfs/src/store/document/utils.rs deleted file mode 100644 index 753c55def..000000000 --- a/extensions/warp-ipfs/src/store/document/utils.rs +++ /dev/null @@ -1,97 +0,0 @@ -use std::time::Duration; - -use libipld::{ - serde::{from_ipld, to_ipld}, - Cid, Ipld, -}; -use rust_ipfs::{Ipfs, IpfsPath}; -use serde::{de::DeserializeOwned, Serialize}; -use warp::error::Error; - -#[async_trait::async_trait] -pub(crate) trait ToCid: Sized { - async fn to_cid(&self, ipfs: &Ipfs) -> Result; -} - -#[async_trait::async_trait] -impl ToCid for T -where - T: Serialize + Clone + Send + Sync, -{ - async fn to_cid(&self, ipfs: &Ipfs) -> Result { - let ipld = to_ipld(self.clone()).map_err(anyhow::Error::from)?; - ipfs.put_dag(ipld).await.map_err(Error::from) - } -} - -#[async_trait::async_trait] -pub(crate) trait GetDag: Sized { - async fn get_dag(&self, ipfs: &Ipfs, timeout: Option) -> Result; -} - -#[async_trait::async_trait] -impl GetDag for Cid { - async fn get_dag(&self, ipfs: &Ipfs, timeout: Option) -> Result { - IpfsPath::from(*self).get_dag(ipfs, timeout).await - } -} - -#[async_trait::async_trait] -pub(crate) trait GetIpldDag: Sized { - async fn get_ipld_dag(&self, ipfs: &Ipfs) -> Result; -} - -#[async_trait::async_trait] -impl GetDag for IpfsPath { - async fn get_dag(&self, ipfs: &Ipfs, timeout: Option) -> Result { - let timeout = timeout.unwrap_or(std::time::Duration::from_secs(10)); - match tokio::time::timeout(timeout, ipfs.get_dag(self.clone())).await { - Ok(Ok(ipld)) => from_ipld(ipld) - .map_err(anyhow::Error::from) - .map_err(Error::from), - Ok(Err(e)) => Err(Error::Any(e)), - Err(e) => Err(Error::from(anyhow::anyhow!("Timeout at {e}"))), - } - } -} - -#[async_trait::async_trait] -pub(crate) trait GetLocalDag: Sized { - async fn get_local_dag(&self, ipfs: &Ipfs) -> Result; -} - -#[async_trait::async_trait] -impl GetLocalDag for Cid { - async fn get_local_dag(&self, ipfs: &Ipfs) -> Result { - IpfsPath::from(*self).get_local_dag(ipfs).await - } -} - -#[async_trait::async_trait] -impl GetLocalDag for IpfsPath { - async fn get_local_dag(&self, ipfs: &Ipfs) -> Result { - match ipfs.dag().get(self.clone(), &[], true).await { - Ok(ipld) => from_ipld(ipld) - .map_err(anyhow::Error::from) - .map_err(Error::from), - Err(e) => Err(Error::Any(e.into())), - } - } -} - -#[async_trait::async_trait] -impl GetIpldDag for Cid { - async fn get_ipld_dag(&self, ipfs: &Ipfs) -> Result { - IpfsPath::from(*self).get_ipld_dag(ipfs).await - } -} - -#[async_trait::async_trait] -impl GetIpldDag for IpfsPath { - async fn get_ipld_dag(&self, ipfs: &Ipfs) -> Result { - match ipfs.dag().get(self.clone(), &[], true).await { - Ok(ipld) => Ok(ipld), - Err(e) => Err(Error::from(anyhow::anyhow!("{e}"))), - } - } -} diff --git a/extensions/warp-ipfs/src/store/files.rs b/extensions/warp-ipfs/src/store/files.rs index 3b407bd6d..d0371aea7 100644 --- a/extensions/warp-ipfs/src/store/files.rs +++ b/extensions/warp-ipfs/src/store/files.rs @@ -124,8 +124,6 @@ impl FileStore { .ipfs .unixfs() .cat(IpfsPath::from(cid), None, &[], true, None) - .await - .map_err(anyhow::Error::from)? .boxed(); let mut data = vec![]; @@ -154,17 +152,13 @@ impl FileStore { let data_stream = stream::once(async move { Ok::<_, std::io::Error>(data) }).boxed(); - let mut stream = self - .ipfs - .unixfs() - .add( - AddOpt::Stream(data_stream), - Some(AddOption { - pin: true, - ..Default::default() - }), - ) - .await?; + let mut stream = self.ipfs.unixfs().add( + AddOpt::Stream(data_stream), + Some(AddOption { + pin: true, + ..Default::default() + }), + ); let mut ipfs_path = None; @@ -231,7 +225,7 @@ impl FileStore { } for cid in pinned_blocks { - self.ipfs.remove_block(cid).await?; + self.ipfs.remove_block(cid, false).await?; } } } @@ -343,17 +337,7 @@ impl FileStore { let mut total_written = 0; let mut returned_path = None; - let mut stream = match ipfs.unixfs().add(path.clone(), Some(AddOption { pin: true, ..Default::default() })).await { - Ok(ste) => ste, - Err(e) => { - yield Progression::ProgressFailed { - name, - last_size: Some(last_written), - error: Some(e.to_string()), - }; - return; - } - }; + let mut stream = ipfs.unixfs().add(path.clone(), Some(AddOption { pin: true, ..Default::default() })); while let Some(status) = stream.next().await { let name = name.clone(); @@ -468,9 +452,7 @@ impl FileStore { let file = item.get_file()?; let reference = file.reference().ok_or(Error::Other)?; //Reference not found - let mut stream = ipfs - .get_unixfs(reference.parse::()?, path) - .await?; + let mut stream = ipfs.get_unixfs(reference.parse::()?, path); while let Some(status) = stream.next().await { if let UnixfsStatus::FailedStatus { error, .. } = status { @@ -533,16 +515,13 @@ impl FileStore { let mut total_written = 0; let mut returned_path = None; - let mut stream = ipfs - .unixfs() - .add( - reader, - Some(AddOption { - pin: true, - ..Default::default() - }), - ) - .await?; + let mut stream = ipfs.unixfs().add( + reader, + Some(AddOption { + pin: true, + ..Default::default() + }), + ); while let Some(status) = stream.next().await { match status { @@ -592,10 +571,7 @@ impl FileStore { let item = self.current_directory()?.get_item_by_path(name)?; let file = item.get_file()?; let reference = file.reference().ok_or(Error::Other)?; //Reference not found - let stream = ipfs - .cat_unixfs(reference.parse::()?, None) - .await - .map_err(anyhow::Error::from)?; + let stream = ipfs.cat_unixfs(reference.parse::()?, None); pin_mut!(stream); let mut buffer = vec![]; @@ -654,17 +630,7 @@ impl FileStore { let mut total_written = 0; let mut returned_path = None; - let mut stream = match ipfs.unixfs().add(stream, Some(AddOption { pin: true, ..Default::default() })).await { - Ok(ste) => ste, - Err(e) => { - yield Progression::ProgressFailed { - name, - last_size: Some(last_written), - error: Some(e.to_string()), - }; - return; - } - }; + let mut stream = ipfs.unixfs().add(stream, Some(AddOption { pin: true, ..Default::default() })); while let Some(status) = stream.next().await { let n = name.clone(); @@ -777,9 +743,7 @@ impl FileStore { let stream = async_stream::stream! { let cat_stream = ipfs - .cat_unixfs(reference.parse::()?, None) - .await - .map_err(anyhow::Error::from)?; + .cat_unixfs(reference.parse::()?, None); for await data in cat_stream { match data { @@ -850,7 +814,7 @@ impl FileStore { } for cid in pinned_blocks { - ipfs.remove_block(cid).await?; + ipfs.remove_block(cid, false).await?; } directory.remove_item(&item.name())?; @@ -920,10 +884,7 @@ impl FileStore { let reference = file.reference().ok_or(Error::FileNotFound)?; - let stream = ipfs - .cat_unixfs(reference.parse::()?, None) - .await - .map_err(anyhow::Error::from)?; + let stream = ipfs.cat_unixfs(reference.parse::()?, None); pin_mut!(stream); diff --git a/extensions/warp-ipfs/src/store/identity.rs b/extensions/warp-ipfs/src/store/identity.rs index f74b77dbf..1e8690050 100644 --- a/extensions/warp-ipfs/src/store/identity.rs +++ b/extensions/warp-ipfs/src/store/identity.rs @@ -8,10 +8,10 @@ use crate::{ use chrono::Utc; use futures::{channel::oneshot, StreamExt}; -use ipfs::{Ipfs, IpfsPath, Keypair}; +use ipfs::{Ipfs, Keypair}; use libipld::Cid; use rust_ipfs as ipfs; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use serde::{Deserialize, Serialize}; use std::{ collections::{HashMap, HashSet}, path::PathBuf, @@ -44,7 +44,7 @@ use super::{ connected_to_peer, did_keypair, document::{ cache::IdentityCache, identity::IdentityDocument, image_dag::get_image, - root::RootDocumentMap, utils::GetLocalDag, ExtractedRootDocument, RootDocument, ToCid, + root::RootDocumentMap, ExtractedRootDocument, RootDocument, }, ecdh_decrypt, ecdh_encrypt, phonebook::PhoneBook, @@ -1322,7 +1322,7 @@ impl IdentityStore { let did_kp = self.get_keypair_did()?; let identity = identity.sign(&did_kp)?; - let ident_cid = identity.to_cid(&self.ipfs).await?; + let ident_cid = self.ipfs.dag().put().serialize(identity)?.await?; let root_document = RootDocument { identity: ident_cid, @@ -1331,7 +1331,7 @@ impl IdentityStore { self.root_document.set(root_document).await?; - let identity = identity.resolve()?; + let identity = self.root_document.identity().await?.resolve()?; Ok(identity) } @@ -1476,7 +1476,7 @@ impl IdentityStore { log::debug!("Updating document"); let mut root_document = self.root_document.get().await?; - let ident_cid = identity.to_cid(&self.ipfs).await?; + let ident_cid = self.ipfs.dag().put().serialize(identity)?.await?; root_document.identity = ident_cid; self.root_document @@ -1597,10 +1597,6 @@ impl IdentityStore { .map_err(anyhow::Error::from) } - pub async fn get_local_dag(&self, path: IpfsPath) -> Result { - path.get_local_dag(&self.ipfs).await - } - pub async fn own_identity_document(&self) -> Result { let identity = self.root_document.identity().await?; identity.verify()?; @@ -1712,7 +1708,7 @@ impl IdentityStore { } for cid in pinned_blocks { - ipfs.remove_block(cid).await?; + ipfs.remove_block(cid, false).await?; } Ok(()) diff --git a/extensions/warp-ipfs/src/store/message.rs b/extensions/warp-ipfs/src/store/message.rs index dc4a34d70..d43f7d218 100644 --- a/extensions/warp-ipfs/src/store/message.rs +++ b/extensions/warp-ipfs/src/store/message.rs @@ -3145,17 +3145,7 @@ impl MessageStore { total: Some(attachment.size()), }; - let stream = match ipfs.get_unixfs(reference, &path).await { - Ok(stream) => stream, - Err(e) => { - yield Progression::ProgressFailed { - name: attachment.name(), - last_size: None, - error: Some(e.to_string()), - }; - return; - }, - }; + let stream = ipfs.get_unixfs(reference, &path); for await event in stream { match event {