Skip to content

Commit

Permalink
Merge branch 'main' into feat/rg-id-event
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 authored Oct 27, 2023
2 parents 3e83c78 + ac0dbef commit c50d5c3
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 55 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ either = "1"
void = "1"

#ipfs dependency
rust-ipfs = "0.5.0"
rust-ipfs = "0.6.0"

# Blink related crates
# av-data is needed to use libaom. need to ensure that Warp and libaom use the same version of av-data
Expand Down
45 changes: 24 additions & 21 deletions extensions/warp-ipfs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,18 +272,7 @@ impl WarpIpfs {

info!("Starting ipfs");
let mut uninitialized = UninitializedIpfs::empty()
.set_listening_addrs(config.listen_on.clone())
.set_custom_behaviour(behaviour)
.set_keypair(keypair)
.enable_rendezvous_client()
.set_transport_configuration(TransportConfig {
yamux_update_mode: UpdateMode::Read,
..Default::default()
})
.listen_as_external_addr()
.enable_relay(true)
.set_swarm_configuration(swarm_configuration)
.set_identify_configuration({
.with_identify(Some({
let mut idconfig = IdentifyConfiguration {
protocol_version: "/satellite/warp/0.1".into(),
..Default::default()
Expand All @@ -292,21 +281,35 @@ impl WarpIpfs {
idconfig.agent_version = agent.clone();
}
idconfig
})
.set_kad_configuration(
KadConfig {
}))
.with_autonat()
.with_bitswap(None)
.with_kademlia(
Some(either::Either::Left(KadConfig {
query_timeout: std::time::Duration::from_secs(60),
publication_interval: Some(Duration::from_secs(30 * 60)),
provider_record_ttl: Some(Duration::from_secs(60 * 60)),
insert_method: KadInserts::Manual,
..Default::default()
},
})),
Default::default(),
)
.set_pubsub_configuration(PubsubConfig {
.with_ping(None)
.with_pubsub(Some(PubsubConfig {
max_transmit_size: config.ipfs_setting.pubsub.max_transmit_size,
..Default::default()
});
}))
.with_relay(true)
.set_listening_addrs(config.listen_on.clone())
.with_custom_behaviour(behaviour)
.set_keypair(keypair)
.with_rendezvous_client()
.set_transport_configuration(TransportConfig {
yamux_update_mode: UpdateMode::Read,
..Default::default()
})
.listen_as_external_addr()
.set_swarm_configuration(swarm_configuration);

if let Some(path) = self.config.path.as_ref() {
info!("Instance will be persistent");
Expand All @@ -324,7 +327,7 @@ impl WarpIpfs {
}

if config.ipfs_setting.memory_transport {
uninitialized = uninitialized.set_custom_transport(Box::new(
uninitialized = uninitialized.with_custom_transport(Box::new(
|keypair, relay| -> std::io::Result<Boxed<(PeerId, StreamMuxerBox)>> {
let noise_config = rust_ipfs::libp2p::noise::Config::new(keypair)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
Expand All @@ -350,11 +353,11 @@ impl WarpIpfs {
}

if config.ipfs_setting.portmapping {
uninitialized = uninitialized.enable_upnp();
uninitialized = uninitialized.with_upnp();
}

if config.ipfs_setting.mdns.enable {
uninitialized = uninitialized.enable_mdns();
uninitialized = uninitialized.with_mdns();
}

let ipfs = uninitialized.start().await?;
Expand Down
108 changes: 105 additions & 3 deletions extensions/warp-ipfs/src/store/document/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ use futures::{
use libipld::Cid;
use rust_ipfs::{Ipfs, IpfsPath};
use uuid::Uuid;
use warp::{crypto::DID, error::Error};
use warp::{crypto::DID, error::Error, multipass::identity::IdentityStatus};

use crate::store::{identity::Request, keystore::Keystore, VecExt};
use crate::store::{ecdh_encrypt, identity::Request, keystore::Keystore, VecExt};

use super::{
identity::IdentityDocument,
utils::{GetLocalDag, ToCid},
RootDocument,
ExtractedRootDocument, RootDocument,
};

#[allow(clippy::large_enum_variant)]
Expand All @@ -25,6 +26,13 @@ pub enum RootDocumentCommand {
document: RootDocument,
response: oneshot::Sender<Result<(), Error>>,
},
Identity {
response: oneshot::Sender<Result<IdentityDocument, Error>>,
},
SetIdentityStatus {
status: IdentityStatus,
response: oneshot::Sender<Result<(), Error>>,
},
AddFriend {
did: DID,
response: oneshot::Sender<Result<(), Error>>,
Expand Down Expand Up @@ -80,6 +88,12 @@ pub enum RootDocumentCommand {
id: Uuid,
response: oneshot::Sender<Result<Keystore, Error>>,
},
Export {
response: oneshot::Sender<Result<ExtractedRootDocument, Error>>,
},
ExportEncrypted {
response: oneshot::Sender<Result<Vec<u8>, Error>>,
},
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -150,6 +164,29 @@ impl RootDocumentMap {
rx.await.map_err(anyhow::Error::from)?
}

pub async fn identity(&self) -> Result<IdentityDocument, Error> {
let (tx, rx) = oneshot::channel();
let _ = self
.tx
.clone()
.send(RootDocumentCommand::Identity { response: tx })
.await;
rx.await.map_err(anyhow::Error::from)?
}

pub async fn set_status_indicator(&self, status: IdentityStatus) -> Result<(), Error> {
let (tx, rx) = oneshot::channel();
let _ = self
.tx
.clone()
.send(RootDocumentCommand::SetIdentityStatus {
status,
response: tx,
})
.await;
rx.await.map_err(anyhow::Error::from)?
}

pub async fn add_friend(&self, did: &DID) -> Result<(), Error> {
let (tx, rx) = oneshot::channel();
let _ = self
Expand Down Expand Up @@ -294,6 +331,26 @@ impl RootDocumentMap {
rx.await.map_err(anyhow::Error::from)?
}

pub async fn export(&self) -> Result<ExtractedRootDocument, Error> {
let (tx, rx) = oneshot::channel();
let _ = self
.tx
.clone()
.send(RootDocumentCommand::Export { response: tx })
.await;
rx.await.map_err(anyhow::Error::from)?
}

pub async fn export_bytes(&self) -> Result<Vec<u8>, Error> {
let (tx, rx) = oneshot::channel();
let _ = self
.tx
.clone()
.send(RootDocumentCommand::ExportEncrypted { response: tx })
.await;
rx.await.map_err(anyhow::Error::from)?
}

pub async fn get_conversation_keystore_map(&self) -> Result<BTreeMap<String, Cid>, Error> {
let (tx, rx) = oneshot::channel();
let _ = self
Expand Down Expand Up @@ -349,6 +406,9 @@ impl RootDocumentTask {
RootDocumentCommand::Set { document, response } => {
let _ = response.send(self.set_root_document(document).await);
}
RootDocumentCommand::Identity { response } => {
let _ = response.send(self.identity().await);
}
RootDocumentCommand::AddFriend { did, response } => {
let _ = response.send(self.add_friend(did).await);
}
Expand Down Expand Up @@ -394,6 +454,15 @@ impl RootDocumentTask {
RootDocumentCommand::GetKeystoreMap { response } => {
let _ = response.send(self.get_conversation_keystore_map().await);
}
RootDocumentCommand::Export { response } => {
let _ = response.send(self.export().await);
}
RootDocumentCommand::ExportEncrypted { response } => {
let _ = response.send(self.export_bytes().await);
}
RootDocumentCommand::SetIdentityStatus { status, response } => {
let _ = response.send(self.set_identity_status(status).await);
}
}
}
}
Expand All @@ -411,6 +480,15 @@ impl RootDocumentTask {
Ok(document)
}

async fn identity(&self) -> Result<IdentityDocument, Error> {
let root = self.get_root_document().await?;
let path = IpfsPath::from(root.identity);
let document: IdentityDocument = path.get_local_dag(&self.ipfs).await?;
document.verify()?;

Ok(document)
}

async fn set_root_document(&mut self, document: RootDocument) -> Result<(), Error> {
let old_cid = self.cid;

Expand Down Expand Up @@ -444,6 +522,18 @@ impl RootDocumentTask {
Ok(())
}

async fn set_identity_status(&mut self, status: IdentityStatus) -> Result<(), Error> {
let mut root = self.get_root_document().await?;
let mut identity = self.identity().await?;
root.status = Some(status);
identity.status = Some(status);

let identity = identity.sign(&self.keypair)?;
root.identity = identity.to_cid(&self.ipfs).await?;

self.set_root_document(root).await
}

async fn request_list(&self) -> Result<Vec<Request>, Error> {
let cid = match self.cid {
Some(cid) => cid,
Expand Down Expand Up @@ -708,4 +798,16 @@ impl RootDocumentTask {
let path = IpfsPath::from(cid).sub_path(&id.to_string())?;
path.get_local_dag(&self.ipfs).await
}

async fn export(&self) -> Result<ExtractedRootDocument, Error> {
let document = self.get_root_document().await?;
document.export(&self.ipfs).await
}

async fn export_bytes(&self) -> Result<Vec<u8>, Error> {
let export = self.export().await?;

let bytes = serde_json::to_vec(&export)?;
ecdh_encrypt(&self.keypair, None, bytes)
}
}
30 changes: 6 additions & 24 deletions extensions/warp-ipfs/src/store/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use super::{
cache::IdentityCache, identity::IdentityDocument, image_dag::get_image,
root::RootDocumentMap, utils::GetLocalDag, ExtractedRootDocument, RootDocument, ToCid,
},
ecdh_decrypt, ecdh_encrypt, libp2p_pub_to_did,
ecdh_decrypt, ecdh_encrypt,
phonebook::PhoneBook,
queue::Queue,
};
Expand Down Expand Up @@ -1542,9 +1542,7 @@ impl IdentityStore {

#[tracing::instrument(skip(self))]
pub async fn set_identity_status(&mut self, status: IdentityStatus) -> Result<(), Error> {
let mut root_document = self.root_document.get().await?;
root_document.status = Some(status);
self.root_document.set(root_document).await?;
self.root_document.set_status_indicator(status).await?;
*self.online_status.write().await = Some(status);
self.push_to_all().await;
Ok(())
Expand Down Expand Up @@ -1606,31 +1604,15 @@ impl IdentityStore {
}

pub async fn own_identity_document(&self) -> Result<IdentityDocument, Error> {
let root_document = self.root_document.get().await?;
let path = IpfsPath::from(root_document.identity);
let identity: IdentityDocument = path.get_local_dag(&self.ipfs).await?;
let identity = self.root_document.identity().await?;
identity.verify()?;
Ok(identity)
}

pub async fn own_identity(&self) -> Result<Identity, Error> {
let root_document = self.root_document.get().await?;

let path = IpfsPath::from(root_document.identity);
let identity = self
.get_local_dag::<IdentityDocument>(path)
.await?
.resolve()?;

let public_key = identity.did_key();
let kp_public_key = libp2p_pub_to_did(&self.get_keypair()?.public())?;
if public_key != kp_public_key {
//Note if we reach this point, the identity would need to be reconstructed
return Err(Error::IdentityDoesntExist);
}

*self.online_status.write().await = root_document.status;
Ok(identity)
let identity = self.own_identity_document().await?;
*self.online_status.write().await = identity.status;
Ok(identity.into())
}

#[tracing::instrument(skip(self))]
Expand Down
12 changes: 6 additions & 6 deletions extensions/warp-ipfs/src/store/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1725,9 +1725,9 @@ impl MessageStore {
}

//Temporary limit
if self.list_conversations().await.unwrap_or_default().len() >= 32 {
return Err(Error::ConversationLimitReached);
}
// if self.list_conversations().await.unwrap_or_default().len() >= 256 {
// return Err(Error::ConversationLimitReached);
// }

if !self.discovery.contains(did_key).await {
self.discovery.insert(did_key).await?;
Expand Down Expand Up @@ -1836,9 +1836,9 @@ impl MessageStore {
}

//Temporary limit
if self.list_conversations().await.unwrap_or_default().len() >= 32 {
return Err(Error::ConversationLimitReached);
}
// if self.list_conversations().await.unwrap_or_default().len() >= 256 {
// return Err(Error::ConversationLimitReached);
// }

for recipient in &recipients {
if !self.discovery.contains(recipient).await {
Expand Down

0 comments on commit c50d5c3

Please sign in to comment.