Skip to content

Commit

Permalink
chore: Use tracing imports
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 committed Dec 3, 2023
1 parent 8cd15e8 commit 8b8f6bd
Show file tree
Hide file tree
Showing 11 changed files with 129 additions and 190 deletions.
3 changes: 1 addition & 2 deletions extensions/warp-ipfs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ use store::files::FileStore;
use store::identity::{IdentityStore, LookupBy};
use store::message::MessageStore;
use tokio_util::compat::TokioAsyncReadCompatExt;
use tracing::log::{error, info, warn};
use tracing::{debug, trace};
use tracing::{debug, error, info, trace, warn};
use utils::ExtensionType;
use uuid::Uuid;
use warp::constellation::directory::Directory;
Expand Down
32 changes: 12 additions & 20 deletions extensions/warp-ipfs/src/store/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,12 @@ use std::{
};

use futures::StreamExt;
use rust_ipfs::{
libp2p::swarm::dial_opts::{DialOpts, PeerCondition},
p2p::MultiaddrExt,
Ipfs, Multiaddr, PeerId,
};
use rust_ipfs::{libp2p::swarm::dial_opts::DialOpts, p2p::MultiaddrExt, Ipfs, Multiaddr, PeerId};
use tokio::{
sync::{broadcast, RwLock},
task::JoinHandle,
time::Instant,
};
use tracing::log;
use warp::{crypto::DID, error::Error};

use crate::config::{self, Discovery as DiscoveryConfig, DiscoveryType};
Expand Down Expand Up @@ -70,7 +65,7 @@ impl Discovery {

if let Err(e) = discovery.ipfs.provide(cid).await {
//Maybe panic?
log::error!("Error providing key: {e}");
tracing::error!("Error providing key: {e}");
return;
}

Expand Down Expand Up @@ -119,7 +114,7 @@ impl Discovery {
};

if let Err(e) = self.ipfs.add_peer(peer_id, addr).await {
log::error!("Error adding peer to address book {e}");
tracing::error!("Error adding peer to address book {e}");
continue;
}

Expand All @@ -135,7 +130,7 @@ impl Discovery {
.rendezvous_register_namespace(namespace.clone(), None, *peer_id)
.await
{
log::error!("Error registering to namespace: {e}");
tracing::error!("Error registering to namespace: {e}");
continue;
}

Expand Down Expand Up @@ -170,7 +165,7 @@ impl Discovery {
.rendezvous_register_namespace(namespace.clone(), None, *peer_id)
.await
{
log::error!("Error registering to namespace: {e}");
tracing::error!("Error registering to namespace: {e}");
continue;
}
}
Expand Down Expand Up @@ -218,7 +213,7 @@ impl Discovery {
}
}
Err(e) => {
log::error!("Error performing discovery over {namespace}: {e}");
tracing::error!("Error performing discovery over {namespace}: {e}");
}
}
}
Expand Down Expand Up @@ -406,14 +401,12 @@ impl DiscoveryEntry {
discovery_type: DiscoveryType::RzPoint { .. },
..
} => {
let opts = DialOpts::peer_id(peer_id)
.condition(PeerCondition::Disconnected)
.build();
let opts = DialOpts::peer_id(peer_id).build();

log::debug!("Dialing {peer_id}");
tracing::debug!("Dialing {peer_id}");

if let Err(_e) = ipfs.connect(opts).await {
log::error!("Error connecting to {peer_id}: {_e}");
tracing::error!("Error connecting to {peer_id}: {_e}");
tokio::time::sleep(Duration::from_secs(10)).await;
continue;
}
Expand All @@ -430,13 +423,12 @@ impl DiscoveryEntry {
config::Discovery::None => {
let opts = DialOpts::peer_id(peer_id)
.addresses(entry.relays.clone())
.condition(PeerCondition::Disconnected)
.build();

log::debug!("Dialing {peer_id}");
tracing::debug!("Dialing {peer_id}");

if let Err(_e) = ipfs.connect(opts).await {
log::error!("Error connecting to {peer_id}: {_e}");
tracing::error!("Error connecting to {peer_id}: {_e}");
tokio::time::sleep(Duration::from_secs(10)).await;
continue;
}
Expand All @@ -449,7 +441,7 @@ impl DiscoveryEntry {
{
if let Ok(did) = peer_id.to_did() {
tokio::time::sleep(Duration::from_millis(500)).await;
log::info!("Connected to {did}. Emitting initial event");
tracing::info!("Connected to {did}. Emitting initial event");
let topic = format!("/peer/{did}/events");
let subscribed = ipfs
.pubsub_peers(Some(topic))
Expand Down
36 changes: 19 additions & 17 deletions extensions/warp-ipfs/src/store/document/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ impl IdentityCacheTask {
let mut list: HashSet<IdentityDocument> = match self.list {
Some(cid) => self
.ipfs
.get_dag(IpfsPath::from(cid))
.get_dag(cid)
.local()
.deserialized()
.await
Expand All @@ -194,17 +194,19 @@ impl IdentityCacheTask {

list.replace(document);

let cid = self.ipfs.dag().put().serialize(list)?.await?;
let cid = self.ipfs.dag().put().serialize(list)?.pin(false).await?;

let old_cid = self.list.take();

let remove_pin_and_block = async {
if let Some(old_cid) = old_cid {
if self.ipfs.is_pinned(&old_cid).await? {
self.ipfs.remove_pin(&old_cid, false).await?;
if old_cid != cid {
if self.ipfs.is_pinned(&old_cid).await? {
self.ipfs.remove_pin(&old_cid, false).await?;
}
// Do we want to remove the old block?
self.ipfs.remove_block(old_cid, false).await?;
}
// Do we want to remove the old block?
self.ipfs.remove_block(old_cid, false).await?;
}
Ok::<_, Error>(())
};
Expand All @@ -225,9 +227,7 @@ impl IdentityCacheTask {
None => {
list.insert(document);

let cid = self.ipfs.dag().put().serialize(list)?.await?;

self.ipfs.insert_pin(&cid, false).await?;
let cid = self.ipfs.dag().put().serialize(list)?.pin(false).await?;

if let Some(path) = self.path.as_ref() {
let cid = cid.to_string();
Expand All @@ -247,7 +247,7 @@ impl IdentityCacheTask {
let list: HashSet<IdentityDocument> = match self.list {
Some(cid) => self
.ipfs
.get_dag(IpfsPath::from(cid))
.get_dag(cid)
.local()
.deserialized()
.await
Expand All @@ -268,7 +268,7 @@ impl IdentityCacheTask {
let mut list: HashSet<IdentityDocument> = match self.list {
Some(cid) => self
.ipfs
.get_dag(IpfsPath::from(cid))
.get_dag(cid)
.local()
.deserialized()
.await
Expand All @@ -290,16 +290,18 @@ impl IdentityCacheTask {
return Err(Error::IdentityDoesntExist);
}

let cid = self.ipfs.dag().put().serialize(list)?.await?;
let cid = self.ipfs.dag().put().serialize(list)?.pin(false).await?;

let old_cid = self.list.take();

if let Some(old_cid) = old_cid {
if self.ipfs.is_pinned(&old_cid).await? {
self.ipfs.remove_pin(&old_cid, false).await?;
if cid != old_cid {
if self.ipfs.is_pinned(&old_cid).await? {
self.ipfs.remove_pin(&old_cid, false).await?;
}
// Do we want to remove the old block?
self.ipfs.remove_block(old_cid, false).await?;
}
// Do we want to remove the old block?
self.ipfs.remove_block(old_cid, false).await?;
}

if let Some(path) = self.path.as_ref() {
Expand All @@ -318,7 +320,7 @@ impl IdentityCacheTask {
let list: HashSet<IdentityDocument> = match self.list {
Some(cid) => self
.ipfs
.get_dag(IpfsPath::from(cid))
.get_dag(cid)
.local()
.deserialized()
.await
Expand Down
55 changes: 17 additions & 38 deletions extensions/warp-ipfs/src/store/document/conversation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,12 +291,8 @@ impl ConversationTask {
None => return Err(Error::InvalidConversation),
};

let mut conversation_map: BTreeMap<String, Cid> = self
.ipfs
.get_dag(IpfsPath::from(cid))
.local()
.deserialized()
.await?;
let mut conversation_map: BTreeMap<String, Cid> =
self.ipfs.get_dag(cid).local().deserialized().await?;

let document_cid = match conversation_map.remove(&id.to_string()) {
Some(cid) => cid,
Expand All @@ -315,7 +311,7 @@ impl ConversationTask {

let document: ConversationDocument = self
.ipfs
.get_dag(IpfsPath::from(document_cid))
.get_dag(document_cid)
.local()
.deserialized()
.await?;
Expand All @@ -328,20 +324,14 @@ impl ConversationTask {
None => return Ok(Vec::new()),
};

let conversation_map: BTreeMap<String, Cid> = self
.ipfs
.get_dag(IpfsPath::from(cid))
.local()
.deserialized()
.await?;
let conversation_map: BTreeMap<String, Cid> =
self.ipfs.get_dag(cid).local().deserialized().await?;

let list = FuturesUnordered::from_iter(conversation_map.values().map(|cid| {
self.ipfs
.get_dag(IpfsPath::from(*cid))
.local()
.deserialized()
.into_future()
}))
let list = FuturesUnordered::from_iter(
conversation_map
.values()
.map(|cid| self.ipfs.get_dag(*cid).local().deserialized().into_future()),
)
.filter_map(|result: Result<ConversationDocument, _>| async move { result.ok() })
.collect::<Vec<_>>()
.await;
Expand All @@ -355,16 +345,11 @@ impl ConversationTask {
None => return false,
};

let conversation_map: BTreeMap<String, Cid> = match self
.ipfs
.get_dag(IpfsPath::from(cid))
.local()
.deserialized()
.await
{
Ok(document) => document,
Err(_) => return false,
};
let conversation_map: BTreeMap<String, Cid> =
match self.ipfs.get_dag(cid).local().deserialized().await {
Ok(document) => document,
Err(_) => return false,
};

conversation_map.contains_key(&id.to_string())
}
Expand All @@ -381,7 +366,7 @@ impl ConversationTask {
self.ipfs.insert_pin(&cid, true).await?;

if let Some(old_cid) = old_map_cid {
if self.ipfs.is_pinned(&old_cid).await.unwrap_or_default() {
if old_cid != cid && self.ipfs.is_pinned(&old_cid).await.unwrap_or_default() {
self.ipfs.remove_pin(&old_cid, true).await?;
}
}
Expand All @@ -408,13 +393,7 @@ impl ConversationTask {
document.verify()?;

let mut map = match self.cid {
Some(cid) => {
self.ipfs
.get_dag(IpfsPath::from(cid))
.local()
.deserialized()
.await?
}
Some(cid) => self.ipfs.get_dag(cid).local().deserialized().await?,
None => BTreeMap::new(),
};

Expand Down
31 changes: 1 addition & 30 deletions extensions/warp-ipfs/src/store/document/identity.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use chrono::{DateTime, Utc};
use libipld::Cid;
use rust_ipfs::{Ipfs, IpfsPath, PeerId};
use serde::{Deserialize, Serialize};
use std::{hash::Hash, time::Duration};
use std::hash::Hash;
use warp::{
crypto::{did_key::CoreSign, Fingerprint, DID},
error::Error,
Expand Down Expand Up @@ -193,31 +192,3 @@ impl IdentityDocument {
Ok(())
}
}

pub async fn unixfs_fetch(
ipfs: &Ipfs,
cid: Cid,
timeout: Option<Duration>,
peers: &[PeerId],
local: bool,
limit: Option<usize>,
) -> Result<Vec<u8>, Error> {
let data = ipfs
.unixfs()
.cat(IpfsPath::from(cid), None, peers, local, timeout)
.await
.map_err(anyhow::Error::from)?;

if let Some(limit) = limit {
if data.len() > limit {
return Err(Error::InvalidLength {
context: "data".into(),
current: data.len(),
minimum: None,
maximum: Some(limit),
});
}
}

Ok(data)
}
8 changes: 5 additions & 3 deletions extensions/warp-ipfs/src/store/document/image_dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ use std::task::Poll;
use tracing::log;
use warp::{constellation::file::FileType, error::Error, multipass::identity::IdentityImage};

use super::identity::unixfs_fetch;

#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct ImageDag {
pub link: Cid,
Expand Down Expand Up @@ -118,7 +116,11 @@ pub async fn get_image(
None => {}
}

let image = unixfs_fetch(ipfs, dag.link, None, peers, local, limit).await?;
let image = ipfs
.unixfs()
.cat(dag.link, None, peers, local, None)
.await
.map_err(anyhow::Error::from)?;

let mut id_img = IdentityImage::default();

Expand Down
Loading

0 comments on commit 8b8f6bd

Please sign in to comment.