Skip to content

Commit

Permalink
Merge branch 'main' into fix/blink-signaling
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 authored Nov 6, 2023
2 parents 401b3c5 + 021d570 commit 71fb505
Show file tree
Hide file tree
Showing 15 changed files with 522 additions and 504 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ either = "1"
void = "1"

#ipfs dependency
rust-ipfs = "0.6.0"
rust-ipfs = "0.7.0"

# Blink related crates
# av-data is needed to use libaom. need to ensure that Warp and libaom use the same version of av-data
Expand Down
5 changes: 2 additions & 3 deletions extensions/warp-ipfs/src/behaviour/phonebook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use rust_ipfs::libp2p::{
core::Endpoint,
swarm::{
derive_prelude::ConnectionEstablished, ConnectionClosed, ConnectionDenied, ConnectionId,
FromSwarm, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
FromSwarm, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
},
Multiaddr, PeerId,
};
Expand Down Expand Up @@ -170,7 +170,7 @@ impl NetworkBehaviour for Behaviour {
) {
}

fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
fn on_swarm_event(&mut self, event: FromSwarm) {
match event {
FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id,
Expand Down Expand Up @@ -220,7 +220,6 @@ impl NetworkBehaviour for Behaviour {
fn poll(
&mut self,
cx: &mut Context,
_: &mut impl PollParameters,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(event);
Expand Down
1 change: 0 additions & 1 deletion extensions/warp-ipfs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,6 @@ impl WarpIpfs {
}
idconfig
}))
.with_autonat()
.with_bitswap(None)
.with_kademlia(
Some(either::Either::Left(KadConfig {
Expand Down
45 changes: 32 additions & 13 deletions extensions/warp-ipfs/src/store/conversation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,7 @@ use warp::{

use crate::store::ecdh_encrypt;

use super::{
document::utils::{GetIpldDag, GetLocalDag, ToCid},
ecdh_decrypt,
keystore::Keystore,
verify_serde_sig,
};
use super::{ecdh_decrypt, keystore::Keystore, verify_serde_sig};

#[derive(Debug, Clone, Serialize, Deserialize, Eq)]
pub struct ConversationDocument {
Expand Down Expand Up @@ -300,14 +295,29 @@ impl ConversationDocument {

pub async fn get_message_list(&self, ipfs: &Ipfs) -> Result<BTreeSet<MessageDocument>, Error> {
match self.messages {
Some(cid) => cid.get_local_dag(ipfs).await,
Some(cid) => ipfs
.dag()
.get()
.path(cid)
.local()
.deserialized()
.await
.map_err(anyhow::Error::from)
.map_err(Error::from),
None => Ok(BTreeSet::new()),
}
}

pub async fn get_raw_message_list(&self, ipfs: &Ipfs) -> Result<Ipld, Error> {
match self.messages {
Some(cid) => cid.get_ipld_dag(ipfs).await,
Some(cid) => ipfs
.dag()
.get()
.path(cid)
.local()
.await
.map_err(anyhow::Error::from)
.map_err(Error::from),
None => Ok(Ipld::List(vec![])),
}
}
Expand All @@ -317,7 +327,7 @@ impl ConversationDocument {
list: BTreeSet<MessageDocument>,
) -> Result<(), Error> {
self.modified = Utc::now();
let cid = list.to_cid(ipfs).await?;
let cid = ipfs.dag().put().serialize(list)?.await?;
self.messages = Some(cid);
Ok(())
}
Expand Down Expand Up @@ -531,7 +541,10 @@ impl ConversationDocument {
pub async fn delete_all_message(&mut self, ipfs: Ipfs) -> Result<(), Error> {
let cid = std::mem::take(&mut self.messages);
let (cid, messages): (Cid, BTreeSet<MessageDocument>) = match cid {
Some(cid) => (cid, cid.get_local_dag(&ipfs).await?),
Some(cid) => (
cid,
ipfs.dag().get().path(cid).local().deserialized().await?,
),
None => return Ok(()),
};

Expand Down Expand Up @@ -611,7 +624,7 @@ impl MessageDocument {
None => ecdh_encrypt(&did, Some(&sender), &bytes)?,
};

let message = data.to_cid(ipfs).await?;
let message = ipfs.dag().put().serialize(data)?.await?;

let sender = DIDEd25519Reference::from_did(&sender);

Expand Down Expand Up @@ -664,7 +677,7 @@ impl MessageDocument {
None => ecdh_encrypt(did, Some(&self.sender.to_did()), &bytes)?,
};

let message_cid = data.to_cid(ipfs).await?;
let message_cid = ipfs.dag().put().serialize(data)?.await?;

info!("Setting Message to document");
self.message = message_cid;
Expand All @@ -678,7 +691,13 @@ impl MessageDocument {
did: &DID,
keystore: Option<&Keystore>,
) -> Result<Message, Error> {
let bytes: Vec<u8> = self.message.get_local_dag(ipfs).await?;
let bytes: Vec<u8> = ipfs
.dag()
.get()
.path(self.message)
.local()
.deserialized()
.await?;

let sender = self.sender.to_did();
let data = match keystore {
Expand Down
6 changes: 6 additions & 0 deletions extensions/warp-ipfs/src/store/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,12 @@ impl Discovery {
return Ok(());
}

let own_peer_id = self.ipfs.keypair()?.public().to_peer_id();

if peer_id == own_peer_id {
return Ok(());
}

let entry = DiscoveryEntry::new(
&self.ipfs,
peer_id,
Expand Down
147 changes: 70 additions & 77 deletions extensions/warp-ipfs/src/store/document.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,14 @@ pub mod conversation;
pub mod identity;
pub mod image_dag;
pub mod root;
pub mod utils;

use chrono::{DateTime, Utc};
use futures::TryFutureExt;
use ipfs::{Ipfs, IpfsPath};
use libipld::{
serde::{from_ipld, to_ipld},
Cid,
};
use ipfs::Ipfs;
use libipld::Cid;
use rust_ipfs as ipfs;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{collections::BTreeMap, str::FromStr, time::Duration};
use serde::{Deserialize, Serialize};
use std::{collections::BTreeMap, str::FromStr};
use uuid::Uuid;
use warp::{
crypto::{did_key::CoreSign, DID},
Expand All @@ -24,59 +20,10 @@ use warp::{

use crate::store::get_keypair_did;

use self::{identity::IdentityDocument, utils::GetLocalDag};
use self::identity::IdentityDocument;

use super::{identity::Request, keystore::Keystore};

#[async_trait::async_trait]
pub(crate) trait ToCid: Sized {
async fn to_cid(&self, ipfs: &Ipfs) -> Result<Cid, Error>;
}

#[async_trait::async_trait]
pub(crate) trait GetDag<D>: Sized {
async fn get_dag(&self, ipfs: &Ipfs, timeout: Option<Duration>) -> Result<D, Error>;
}

#[async_trait::async_trait]
impl<D: DeserializeOwned> GetDag<D> for Cid {
async fn get_dag(&self, ipfs: &Ipfs, timeout: Option<Duration>) -> Result<D, Error> {
IpfsPath::from(*self).get_dag(ipfs, timeout).await
}
}

#[async_trait::async_trait]
impl<D: DeserializeOwned> GetDag<D> for &Cid {
async fn get_dag(&self, ipfs: &Ipfs, timeout: Option<Duration>) -> Result<D, Error> {
IpfsPath::from(**self).get_dag(ipfs, timeout).await
}
}

#[async_trait::async_trait]
impl<D: DeserializeOwned> GetDag<D> for IpfsPath {
async fn get_dag(&self, ipfs: &Ipfs, timeout: Option<Duration>) -> Result<D, Error> {
let timeout = timeout.unwrap_or(std::time::Duration::from_secs(10));
match tokio::time::timeout(timeout, ipfs.get_dag(self.clone())).await {
Ok(Ok(ipld)) => from_ipld(ipld)
.map_err(anyhow::Error::from)
.map_err(Error::from),
Ok(Err(e)) => Err(Error::Any(e)),
Err(e) => Err(Error::from(anyhow::anyhow!("Timeout at {e}"))),
}
}
}

#[async_trait::async_trait]
impl<T> ToCid for T
where
T: Serialize + Clone + Send + Sync,
{
async fn to_cid(&self, ipfs: &Ipfs) -> Result<Cid, Error> {
let ipld = to_ipld(self.clone()).map_err(anyhow::Error::from)?;
ipfs.put_dag(ipld).await.map_err(Error::from)
}
}

#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
pub struct ExtractedRootDocument {
pub identity: Identity,
Expand Down Expand Up @@ -159,9 +106,12 @@ impl RootDocument {

#[tracing::instrument(skip(self, ipfs))]
pub async fn verify(&self, ipfs: &Ipfs) -> Result<(), Error> {
let identity: IdentityDocument = self
.identity
.get_local_dag(ipfs)
let identity: IdentityDocument = ipfs
.dag()
.get()
.path(self.identity)
.local()
.deserialized()
.await
.map_err(|_| Error::IdentityInvalid)?;

Expand Down Expand Up @@ -196,44 +146,85 @@ impl RootDocument {
),
Error,
> {
let document: IdentityDocument = self
.identity
.get_local_dag(ipfs)
let document: IdentityDocument = ipfs
.dag()
.get()
.path(self.identity)
.local()
.deserialized()
.await
.map_err(|_| Error::IdentityInvalid)?;

let identity = document.resolve()?;

let friends = futures::future::ready(self.friends.ok_or(Error::Other))
.and_then(|document| async move { document.get_local_dag(ipfs).await })
.and_then(|document| async move {
ipfs.dag()
.get()
.path(document)
.local()
.deserialized()
.await
.map_err(Error::from)
})
.await
.unwrap_or_default();

let block_list = futures::future::ready(self.blocks.ok_or(Error::Other))
.and_then(|document| async move { document.get_local_dag(ipfs).await })
.and_then(|document| async move {
ipfs.dag()
.get()
.path(document)
.local()
.deserialized()
.await
.map_err(Error::from)
})
.await
.unwrap_or_default();

let block_by_list = futures::future::ready(self.block_by.ok_or(Error::Other))
.and_then(|document| async move { document.get_local_dag(ipfs).await })
.and_then(|document| async move {
ipfs.dag()
.get()
.path(document)
.local()
.deserialized()
.await
.map_err(Error::from)
})
.await
.unwrap_or_default();

let request = futures::future::ready(self.request.ok_or(Error::Other))
.and_then(|document| async move { document.get_local_dag(ipfs).await })
.and_then(|document| async move {
ipfs.dag()
.get()
.path(document)
.local()
.deserialized()
.await
.map_err(Error::from)
})
.await
.unwrap_or_default();

let conversation_keystore =
futures::future::ready(self.conversations_keystore.ok_or(Error::Other))
.and_then(|document| async move {
let map: BTreeMap<String, Cid> = document.get_local_dag(ipfs).await?;
let map: BTreeMap<String, Cid> = ipfs
.dag()
.get()
.path(document)
.local()
.deserialized()
.await?;
let mut resolved_map: BTreeMap<Uuid, Keystore> = BTreeMap::new();
for (k, v) in map
.iter()
.filter_map(|(k, v)| Uuid::from_str(k).map(|k| (k, *v)).ok())
{
if let Ok(store) = v.get_local_dag(ipfs).await {
if let Ok(store) = ipfs.dag().get().path(v).local().deserialized().await {
resolved_map.insert(k, store);
}
}
Expand Down Expand Up @@ -264,35 +255,37 @@ impl RootDocument {

let document = document.sign(&did_kp)?;

let identity = document.to_cid(ipfs).await?;
let identity = ipfs.dag().put().serialize(document)?.await?;
let has_friends = !data.friends.is_empty();
let has_blocks = !data.block_list.is_empty();
let has_block_by_list = !data.block_by_list.is_empty();
let has_requests = !data.request.is_empty();
let has_keystore = !data.conversation_keystore.is_empty();

let friends = has_friends
.then_some(data.friends.to_cid(ipfs).await.ok())
.then_some(ipfs.dag().put().serialize(data.friends)?.await.ok())
.flatten();

let blocks = has_blocks
.then_some(data.block_list.to_cid(ipfs).await.ok())
.then_some(ipfs.dag().put().serialize(data.block_list)?.await.ok())
.flatten();
let block_by = has_block_by_list
.then_some(data.block_by_list.to_cid(ipfs).await.ok())
.then_some(ipfs.dag().put().serialize(data.block_by_list)?.await.ok())
.flatten();
let request = has_requests
.then_some(data.request.to_cid(ipfs).await.ok())
.then_some(ipfs.dag().put().serialize(data.request)?.await.ok())
.flatten();

let conversations_keystore = has_keystore
.then_some({
let mut pointer_map: BTreeMap<String, Cid> = BTreeMap::new();
for (k, v) in data.conversation_keystore {
if let Ok(cid) = v.to_cid(ipfs).await {
if let Ok(cid) = ipfs.dag().put().serialize(v)?.await {
pointer_map.insert(k.to_string(), cid);
}
}
pointer_map.to_cid(ipfs).await.ok()

ipfs.dag().put().serialize(pointer_map)?.await.ok()
})
.flatten();

Expand Down
Loading

0 comments on commit 71fb505

Please sign in to comment.