From 299da4a7c878f7c786c00e0a17addcae505eaa0e Mon Sep 17 00:00:00 2001 From: Darius Clark Date: Fri, 20 Oct 2023 16:28:10 -0400 Subject: [PATCH] refactor: Move conversation functionality into its own task (#340) --- extensions/warp-ipfs/src/lib.rs | 1 - .../warp-ipfs/src/store/conversation.rs | 67 +- extensions/warp-ipfs/src/store/document.rs | 188 ++--- .../src/store/document/conversation.rs | 419 ++++++++++ .../warp-ipfs/src/store/document/root.rs | 90 ++- extensions/warp-ipfs/src/store/message.rs | 719 ++++-------------- 6 files changed, 710 insertions(+), 774 deletions(-) create mode 100644 extensions/warp-ipfs/src/store/document/conversation.rs diff --git a/extensions/warp-ipfs/src/lib.rs b/extensions/warp-ipfs/src/lib.rs index 30342ed76..06181af60 100644 --- a/extensions/warp-ipfs/src/lib.rs +++ b/extensions/warp-ipfs/src/lib.rs @@ -1214,7 +1214,6 @@ impl RayGun for WarpIpfs { self.messaging_store()? .get_conversation(conversation_id) .await - .map(|convo| convo.into()) } async fn list_conversations(&self) -> Result, Error> { diff --git a/extensions/warp-ipfs/src/store/conversation.rs b/extensions/warp-ipfs/src/store/conversation.rs index 26211899c..fb69b773a 100644 --- a/extensions/warp-ipfs/src/store/conversation.rs +++ b/extensions/warp-ipfs/src/store/conversation.rs @@ -1,7 +1,7 @@ use chrono::{DateTime, Utc}; use core::hash::Hash; use futures::{ - stream::{self, BoxStream, FuturesOrdered}, + stream::{self, BoxStream}, StreamExt, TryFutureExt, }; use libipld::{Cid, Ipld}; @@ -301,23 +301,8 @@ impl ConversationDocument { ipfs: &Ipfs, list: BTreeSet, ) -> Result<(), Error> { - let old_cid = self.messages; let cid = list.to_cid(ipfs).await?; - if !ipfs.is_pinned(&cid).await? { - ipfs.insert_pin(&cid, false).await?; - } - - if let Some(old_cid) = old_cid { - if old_cid != cid { - if ipfs.is_pinned(&old_cid).await? { - ipfs.remove_pin(&old_cid, false).await?; - } - ipfs.remove_block(old_cid).await?; - } - } - self.messages = Some(cid); - Ok(()) } @@ -524,7 +509,7 @@ impl ConversationDocument { .ok_or(Error::MessageNotFound)?; messages.remove(&document); self.set_message_list(ipfs, messages).await?; - document.remove(ipfs).await + Ok(()) } pub async fn delete_all_message(&mut self, ipfs: Ipfs) -> Result<(), Error> { @@ -538,27 +523,11 @@ impl ConversationDocument { ipfs.remove_pin(&cid, false).await?; } - let mut ids = vec![]; - for document in messages { if ipfs.is_pinned(&document.message).await? { ipfs.remove_pin(&document.message, false).await?; } - ids.push(document.message); } - - ipfs.remove_block(cid).await?; - - //TODO: Replace with gc within ipfs (when completed) in the future - // so we dont need to manually delete the blocks - FuturesOrdered::from_iter(ids.iter().map(|cid| { - let ipfs = ipfs.clone(); - async move { (ipfs, *cid) } - })) - .for_each_concurrent(None, |(ipfs, cid)| async move { - let _ = ipfs.remove_block(cid).await.ok(); - }) - .await; Ok(()) } } @@ -632,10 +601,6 @@ impl MessageDocument { let message = data.to_cid(ipfs).await?; - if !ipfs.is_pinned(&message).await? { - ipfs.insert_pin(&message, false).await?; - } - let sender = DIDEd25519Reference::from_did(&sender); let document = MessageDocument { @@ -649,15 +614,15 @@ impl MessageDocument { Ok(document) } - pub async fn remove(&self, ipfs: &Ipfs) -> Result<(), Error> { - let cid = self.message; - if ipfs.is_pinned(&cid).await? { - ipfs.remove_pin(&cid, false).await?; - } - ipfs.remove_block(cid).await?; + // pub async fn remove(&self, ipfs: &Ipfs) -> Result<(), Error> { + // let cid = self.message; + // if ipfs.is_pinned(&cid).await? { + // ipfs.remove_pin(&cid, false).await?; + // } + // ipfs.remove_block(cid).await?; - Ok(()) - } + // Ok(()) + // } pub async fn update( &mut self, @@ -668,7 +633,6 @@ impl MessageDocument { ) -> Result<(), Error> { info!("Updating message {} for {}", self.id, self.conversation_id); let old_message = self.resolve(ipfs, did, keystore).await?; - let old_document = self.message; if old_message.id() != message.id() || old_message.conversation_id() != message.conversation_id() @@ -690,20 +654,9 @@ impl MessageDocument { let message_cid = data.to_cid(ipfs).await?; - if !ipfs.is_pinned(&message_cid).await? { - ipfs.insert_pin(&message_cid, false).await?; - } - info!("Setting Message to document"); self.message = message_cid; info!("Message is updated"); - if old_document != message_cid { - if ipfs.is_pinned(&old_document).await? { - info!("Removing pin for {old_document}"); - ipfs.remove_pin(&old_document, false).await?; - } - ipfs.remove_block(old_document).await?; - } Ok(()) } diff --git a/extensions/warp-ipfs/src/store/document.rs b/extensions/warp-ipfs/src/store/document.rs index 42cf34af1..d884d1462 100644 --- a/extensions/warp-ipfs/src/store/document.rs +++ b/extensions/warp-ipfs/src/store/document.rs @@ -1,4 +1,5 @@ pub mod cache; +pub mod conversation; pub mod identity; pub mod root; pub mod utils; @@ -12,7 +13,8 @@ use libipld::{ }; use rust_ipfs as ipfs; use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use std::time::Duration; +use std::{collections::BTreeMap, str::FromStr, time::Duration}; +use uuid::Uuid; use warp::{ crypto::{did_key::CoreSign, DID}, error::Error, @@ -23,7 +25,7 @@ use crate::store::get_keypair_did; use self::{identity::IdentityDocument, utils::GetLocalDag}; -use super::identity::Request; +use super::{identity::Request, keystore::Keystore}; #[async_trait::async_trait] pub(crate) trait ToCid: Sized { @@ -83,6 +85,7 @@ pub struct ExtractedRootDocument { pub block_list: Vec, pub block_by_list: Vec, pub request: Vec, + pub conversation_keystore: BTreeMap, pub signature: Option>, } @@ -123,6 +126,12 @@ pub struct RootDocument { /// array of request (Request) #[serde(skip_serializing_if = "Option::is_none")] pub request: Option, + /// map of conversations + #[serde(skip_serializing_if = "Option::is_none")] + pub conversations: Option, + /// map of keystore for group chat conversations + #[serde(skip_serializing_if = "Option::is_none")] + pub conversations_keystore: Option, /// Online/Away/Busy/Offline status #[serde(skip_serializing_if = "Option::is_none")] pub status: Option, @@ -182,6 +191,7 @@ impl RootDocument { Vec, Vec, Vec, + BTreeMap, ), Error, > { @@ -213,6 +223,24 @@ impl RootDocument { .await .unwrap_or_default(); + let conversation_keystore = + futures::future::ready(self.conversations_keystore.ok_or(Error::Other)) + .and_then(|document| async move { + let map: BTreeMap = document.get_local_dag(ipfs).await?; + let mut resolved_map: BTreeMap = BTreeMap::new(); + for (k, v) in map + .iter() + .filter_map(|(k, v)| Uuid::from_str(k).map(|k| (k, *v)).ok()) + { + if let Ok(store) = v.get_local_dag(ipfs).await { + resolved_map.insert(k, store); + } + } + Ok(resolved_map) + }) + .await + .unwrap_or_default(); + Ok(( identity, self.created, @@ -221,6 +249,7 @@ impl RootDocument { block_list, block_by_list, request, + conversation_keystore, )) } @@ -239,6 +268,7 @@ impl RootDocument { let has_blocks = !data.block_list.is_empty(); let has_block_by_list = !data.block_by_list.is_empty(); let has_requests = !data.request.is_empty(); + let has_keystore = !data.conversation_keystore.is_empty(); let friends = has_friends .then_some(data.friends.to_cid(ipfs).await.ok()) @@ -253,10 +283,24 @@ impl RootDocument { .then_some(data.request.to_cid(ipfs).await.ok()) .flatten(); + let conversations_keystore = has_keystore + .then_some({ + let mut pointer_map: BTreeMap = BTreeMap::new(); + for (k, v) in data.conversation_keystore { + if let Ok(cid) = v.to_cid(ipfs).await { + pointer_map.insert(k.to_string(), cid); + } + } + pointer_map.to_cid(ipfs).await.ok() + }) + .flatten(); + let root_document = RootDocument { identity, created: Some(data.created), modified: Some(data.modified), + conversations: None, + conversations_keystore, friends, blocks, block_by, @@ -270,8 +314,16 @@ impl RootDocument { } pub async fn export(&self, ipfs: &Ipfs) -> Result { - let (identity, created, modified, friends, block_list, block_by_list, request) = - self.resolve(ipfs).await?; + let ( + identity, + created, + modified, + friends, + block_list, + block_by_list, + request, + conversation_keystore, + ) = self.resolve(ipfs).await?; let mut exported = ExtractedRootDocument { identity, @@ -281,6 +333,7 @@ impl RootDocument { block_list, block_by_list, request, + conversation_keystore, signature: None, }; @@ -293,130 +346,3 @@ impl RootDocument { Ok(exported) } } - -// Note: This is commented out temporarily due to a race condition that was found while testing. This may get reenabled and used in the near future -// #[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -// pub struct ConversationRootDocument { -// pub did: DID, -// pub conversations: HashSet>, -// } - -// impl ConversationRootDocument { -// pub fn new(did: DID) -> Self { -// Self { -// did, -// conversations: Default::default(), -// } -// } -// } - -// impl ConversationRootDocument { -// pub async fn get_conversation( -// &self, -// ipfs: Ipfs, -// conversation_id: Uuid, -// ) -> Result { -// let document_type = self -// .get_conversation_document(ipfs.clone(), conversation_id) -// .await?; -// document_type.resolve(ipfs, None).await -// } - -// pub async fn get_conversation_document( -// &self, -// ipfs: Ipfs, -// conversation_id: Uuid, -// ) -> Result, Error> { -// FuturesUnordered::from_iter(self.conversations.iter().map(|document| { -// let ipfs = ipfs.clone(); -// async move { -// let document_type = document.clone(); -// document -// .resolve(ipfs, None) -// .await -// .map(|document| (document_type, document)) -// } -// })) -// .filter_map(|result| async { result.ok() }) -// .filter(|(_, document)| { -// let id = document.id; -// async move { id == conversation_id } -// }) -// .map(|(document_type, _)| document_type) -// .collect::>() -// .await -// .first() -// .cloned() -// .ok_or(Error::InvalidConversation) -// } - -// pub async fn list_conversations( -// &self, -// ipfs: Ipfs, -// ) -> Result, Error> { -// debug!("Loading conversations"); -// let list = FuturesUnordered::from_iter( -// self.conversations -// .iter() -// .map(|document| async { document.resolve(ipfs.clone(), None).await }), -// ) -// .filter_map(|res| async { res.ok() }) -// .collect::>() -// .await; -// info!("Conversations loaded"); -// Ok(list) -// } - -// pub async fn remove_conversation( -// &mut self, -// ipfs: Ipfs, -// conversation_id: Uuid, -// ) -> Result { -// info!("Removing conversation"); -// let document_type = self -// .get_conversation_document(ipfs.clone(), conversation_id) -// .await?; - -// if !self.conversations.remove(&document_type) { -// error!("Conversation doesnt exist"); -// return Err(Error::InvalidConversation); -// } - -// let conversation = document_type.resolve(ipfs.clone(), None).await?; -// if ipfs.is_pinned(&document_type.document).await? { -// info!("Unpinning document"); -// ipfs.remove_pin(&document_type.document, false).await?; -// info!("Document unpinned"); -// } -// ipfs.remove_block(document_type.document).await?; -// info!("Block removed"); - -// Ok(conversation) -// } - -// pub async fn update_conversation( -// &mut self, -// ipfs: Ipfs, -// conversation_id: Uuid, -// document: ConversationDocument, -// ) -> Result<(), Error> { -// let document_type = self -// .get_conversation_document(ipfs.clone(), conversation_id) -// .await?; - -// if !self.conversations.remove(&document_type) { -// return Err(Error::InvalidConversation); -// } - -// let document = document.to_document(ipfs.clone()).await?; - -// self.conversations.insert(document); - -// if ipfs.is_pinned(&document_type.document).await? { -// ipfs.remove_pin(&document_type.document, false).await?; -// } -// ipfs.remove_block(document_type.document).await?; - -// Ok(()) -// } -// } diff --git a/extensions/warp-ipfs/src/store/document/conversation.rs b/extensions/warp-ipfs/src/store/document/conversation.rs new file mode 100644 index 000000000..c413cde8d --- /dev/null +++ b/extensions/warp-ipfs/src/store/document/conversation.rs @@ -0,0 +1,419 @@ +use std::{ + collections::{BTreeMap, HashMap}, + path::PathBuf, + sync::Arc, +}; + +use futures::{ + channel::{mpsc, oneshot}, + stream::FuturesUnordered, + FutureExt, SinkExt, StreamExt, +}; +use libipld::Cid; +use rust_ipfs::{Ipfs, IpfsPath}; +use tracing::warn; +use uuid::Uuid; +use warp::{ + crypto::DID, + error::Error, + raygun::{ConversationType, MessageEventKind}, +}; + +use crate::store::{conversation::ConversationDocument, keystore::Keystore}; + +use super::{root::RootDocumentMap, utils::GetLocalDag, ToCid}; + +#[allow(clippy::large_enum_variant)] +enum ConversationCommand { + GetDocument { + id: Uuid, + response: oneshot::Sender>, + }, + GetKeystore { + id: Uuid, + response: oneshot::Sender>, + }, + SetDocument { + document: ConversationDocument, + response: oneshot::Sender>, + }, + SetKeystore { + id: Uuid, + document: Keystore, + response: oneshot::Sender>, + }, + Delete { + id: Uuid, + response: oneshot::Sender>, + }, + Contains { + id: Uuid, + response: oneshot::Sender>, + }, + List { + response: oneshot::Sender, Error>>, + }, + Subscribe { + id: Uuid, + response: oneshot::Sender, Error>>, + }, +} + +#[derive(Debug, Clone)] +pub struct Conversations { + tx: mpsc::Sender, + task: Arc>, +} + +impl Drop for Conversations { + fn drop(&mut self) { + if Arc::strong_count(&self.task) == 1 && !self.task.is_finished() { + self.task.abort(); + } + } +} + +impl Conversations { + pub async fn new( + ipfs: &Ipfs, + path: Option, + keypair: Arc, + root: RootDocumentMap, + ) -> Self { + let cid = match path.as_ref() { + Some(path) => tokio::fs::read(path.join(".message_id")) + .await + .map(|bytes| String::from_utf8_lossy(&bytes).to_string()) + .ok() + .and_then(|cid_str| cid_str.parse().ok()), + None => None, + }; + + let (tx, rx) = futures::channel::mpsc::channel(1); + + let mut task = ConversationTask { + ipfs: ipfs.clone(), + event_handler: Default::default(), + keypair, + path, + cid, + rx, + root, + }; + + let handle = tokio::spawn(async move { + task.start().await; + }); + + Self { + tx, + task: Arc::new(handle), + } + } + + pub async fn get(&self, id: Uuid) -> Result { + let (tx, rx) = oneshot::channel(); + let _ = self + .tx + .clone() + .send(ConversationCommand::GetDocument { id, response: tx }) + .await; + rx.await.map_err(anyhow::Error::from)? + } + + pub async fn get_keystore(&self, id: Uuid) -> Result { + let (tx, rx) = oneshot::channel(); + let _ = self + .tx + .clone() + .send(ConversationCommand::GetKeystore { id, response: tx }) + .await; + rx.await.map_err(anyhow::Error::from)? + } + + pub async fn contains(&self, id: Uuid) -> Result { + let (tx, rx) = oneshot::channel(); + let _ = self + .tx + .clone() + .send(ConversationCommand::Contains { id, response: tx }) + .await; + rx.await.map_err(anyhow::Error::from)? + } + + pub async fn set(&self, document: ConversationDocument) -> Result<(), Error> { + let (tx, rx) = oneshot::channel(); + let _ = self + .tx + .clone() + .send(ConversationCommand::SetDocument { + document, + response: tx, + }) + .await; + rx.await.map_err(anyhow::Error::from)? + } + + pub async fn set_keystore(&self, id: Uuid, document: Keystore) -> Result<(), Error> { + let (tx, rx) = oneshot::channel(); + let _ = self + .tx + .clone() + .send(ConversationCommand::SetKeystore { + id, + document, + response: tx, + }) + .await; + rx.await.map_err(anyhow::Error::from)? + } + + pub async fn delete(&self, id: Uuid) -> Result { + let (tx, rx) = oneshot::channel(); + let _ = self + .tx + .clone() + .send(ConversationCommand::Delete { id, response: tx }) + .await; + rx.await.map_err(anyhow::Error::from)? + } + + pub async fn list(&self) -> Result, Error> { + let (tx, rx) = oneshot::channel(); + let _ = self + .tx + .clone() + .send(ConversationCommand::List { response: tx }) + .await; + rx.await.map_err(anyhow::Error::from)? + } + + pub async fn subscribe( + &self, + id: Uuid, + ) -> Result, Error> { + let (tx, rx) = oneshot::channel(); + let _ = self + .tx + .clone() + .send(ConversationCommand::Subscribe { id, response: tx }) + .await; + rx.await.map_err(anyhow::Error::from)? + } +} + +struct ConversationTask { + ipfs: Ipfs, + cid: Option, + path: Option, + keypair: Arc, + event_handler: HashMap>, + root: RootDocumentMap, + rx: mpsc::Receiver, +} + +impl ConversationTask { + async fn start(&mut self) { + while let Some(command) = self.rx.next().await { + match command { + ConversationCommand::GetDocument { id, response } => { + let _ = response.send(self.get(id).await); + } + ConversationCommand::SetDocument { document, response } => { + let _ = response.send(self.set_document(document).await); + } + ConversationCommand::List { response } => { + let _ = response.send(self.list().await); + } + ConversationCommand::Delete { id, response } => { + let _ = response.send(self.delete(id).await); + } + ConversationCommand::Subscribe { id, response } => { + let _ = response.send(self.subscribe(id).await); + } + ConversationCommand::Contains { id, response } => { + let _ = response.send(Ok(self.contains(id).await)); + } + ConversationCommand::GetKeystore { id, response } => { + let _ = response.send(self.get_keystore(id).await); + } + ConversationCommand::SetKeystore { + id, + document, + response, + } => { + let _ = response.send(self.set_keystore(id, document).await); + } + } + } + } + + async fn get(&self, id: Uuid) -> Result { + let cid = match self.cid { + Some(cid) => cid, + None => return Err(Error::InvalidConversation), + }; + + let path = IpfsPath::from(cid).sub_path(&id.to_string())?; + + let document: ConversationDocument = path.get_local_dag(&self.ipfs).await?; + document.verify()?; + Ok(document) + } + + async fn get_keystore(&self, id: Uuid) -> Result { + if !self.contains(id).await { + return Err(Error::InvalidConversation); + } + + self.root.get_conversation_keystore(id).await + } + + async fn set_keystore(&mut self, id: Uuid, document: Keystore) -> Result<(), Error> { + if !self.contains(id).await { + return Err(Error::InvalidConversation); + } + + let mut map = self.root.get_conversation_keystore_map().await?; + + let id = id.to_string(); + let cid = document.to_cid(&self.ipfs).await?; + + map.insert(id, cid); + + self.set_keystore_map(map).await + } + + async fn delete(&mut self, id: Uuid) -> Result { + let cid = match self.cid { + Some(cid) => cid, + None => return Err(Error::InvalidConversation), + }; + + let mut conversation_map: BTreeMap = cid.get_local_dag(&self.ipfs).await?; + + let document_cid = match conversation_map.remove(&id.to_string()) { + Some(cid) => cid, + None => return Err(Error::InvalidConversation), + }; + + self.set_map(conversation_map).await?; + + if let Ok(mut ks_map) = self.root.get_conversation_keystore_map().await { + if ks_map.remove(&id.to_string()).is_some() { + if let Err(e) = self.set_keystore_map(ks_map).await { + warn!("Failed to remove keystore for {id}: {e}"); + } + } + } + + let document: ConversationDocument = document_cid.get_local_dag(&self.ipfs).await?; + Ok(document) + } + + async fn list(&self) -> Result, Error> { + let cid = match self.cid { + Some(cid) => cid, + None => return Ok(Vec::new()), + }; + + let conversation_map: BTreeMap = cid.get_local_dag(&self.ipfs).await?; + + let list = FuturesUnordered::from_iter( + conversation_map + .values() + .map(|cid| (*cid).get_local_dag(&self.ipfs).boxed()), + ) + .filter_map(|result: Result| async move { result.ok() }) + .collect::>() + .await; + + Ok(list) + } + + async fn contains(&self, id: Uuid) -> bool { + let cid = match self.cid { + Some(cid) => cid, + None => return false, + }; + + let conversation_map: BTreeMap = match cid.get_local_dag(&self.ipfs).await { + Ok(document) => document, + Err(_) => return false, + }; + + conversation_map.contains_key(&id.to_string()) + } + + async fn set_keystore_map(&mut self, map: BTreeMap) -> Result<(), Error> { + self.root.set_conversation_keystore_map(map).await + } + + async fn set_map(&mut self, map: BTreeMap) -> Result<(), Error> { + let cid = map.to_cid(&self.ipfs).await?; + + let old_map_cid = self.cid.replace(cid); + + self.ipfs.insert_pin(&cid, true).await?; + + if let Some(old_cid) = old_map_cid { + if self.ipfs.is_pinned(&old_cid).await.unwrap_or_default() { + self.ipfs.remove_pin(&old_cid, true).await?; + } + } + + self.cid = Some(cid); + + if let Some(path) = self.path.as_ref() { + let cid = cid.to_string(); + if let Err(e) = tokio::fs::write(path.join(".message_id"), cid).await { + tracing::log::error!("Error writing to '.message_id': {e}.") + } + } + + Ok(()) + } + + async fn set_document(&mut self, mut document: ConversationDocument) -> Result<(), Error> { + if let Some(creator) = document.creator.as_ref() { + if creator.eq(&self.keypair) + && matches!(document.conversation_type, ConversationType::Group) + { + document.sign(&self.keypair)?; + } + } + + document.verify()?; + + let mut map = match self.cid { + Some(cid) => cid.get_local_dag(&self.ipfs).await?, + None => BTreeMap::new(), + }; + + let id = document.id().to_string(); + let cid = document.to_cid(&self.ipfs).await?; + + map.insert(id, cid); + + self.set_map(map).await + } + + async fn subscribe( + &mut self, + id: Uuid, + ) -> Result, Error> { + if !self.contains(id).await { + return Err(Error::InvalidConversation); + } + + if let Some(tx) = self.event_handler.get(&id) { + return Ok(tx.clone()); + } + + let (tx, _) = tokio::sync::broadcast::channel(1024); + + self.event_handler.insert(id, tx.clone()); + + Ok(tx) + } +} diff --git a/extensions/warp-ipfs/src/store/document/root.rs b/extensions/warp-ipfs/src/store/document/root.rs index b5d3a6fcb..b05f475d6 100644 --- a/extensions/warp-ipfs/src/store/document/root.rs +++ b/extensions/warp-ipfs/src/store/document/root.rs @@ -1,4 +1,4 @@ -use std::{path::PathBuf, sync::Arc}; +use std::{collections::BTreeMap, path::PathBuf, sync::Arc}; use futures::{ channel::{mpsc::Receiver, oneshot}, @@ -6,9 +6,10 @@ use futures::{ }; use libipld::Cid; use rust_ipfs::{Ipfs, IpfsPath}; +use uuid::Uuid; use warp::{crypto::DID, error::Error}; -use crate::store::{identity::Request, VecExt}; +use crate::store::{identity::Request, keystore::Keystore, VecExt}; use super::{ utils::{GetLocalDag, ToCid}, @@ -68,6 +69,17 @@ pub enum RootDocumentCommand { GetBlockByList { response: oneshot::Sender, Error>>, }, + SetKeystore { + document: BTreeMap, + response: oneshot::Sender>, + }, + GetKeystoreMap { + response: oneshot::Sender, Error>>, + }, + GetKeystore { + id: Uuid, + response: oneshot::Sender>, + }, } #[derive(Debug, Clone)] @@ -281,6 +293,42 @@ impl RootDocumentMap { .await; rx.await.map_err(anyhow::Error::from)? } + + pub async fn get_conversation_keystore_map(&self) -> Result, Error> { + let (tx, rx) = oneshot::channel(); + let _ = self + .tx + .clone() + .send(RootDocumentCommand::GetKeystoreMap { response: tx }) + .await; + rx.await.map_err(anyhow::Error::from)? + } + + pub async fn get_conversation_keystore(&self, id: Uuid) -> Result { + let (tx, rx) = oneshot::channel(); + let _ = self + .tx + .clone() + .send(RootDocumentCommand::GetKeystore { id, response: tx }) + .await; + rx.await.map_err(anyhow::Error::from)? + } + + pub async fn set_conversation_keystore_map( + &self, + document: BTreeMap, + ) -> Result<(), Error> { + let (tx, rx) = oneshot::channel(); + let _ = self + .tx + .clone() + .send(RootDocumentCommand::SetKeystore { + document, + response: tx, + }) + .await; + rx.await.map_err(anyhow::Error::from)? + } } struct RootDocumentTask { @@ -337,6 +385,15 @@ impl RootDocumentTask { RootDocumentCommand::GetBlockByList { response } => { let _ = response.send(self.blockby_list().await); } + RootDocumentCommand::SetKeystore { document, response } => { + let _ = response.send(self.set_conversation_keystore(document).await); + } + RootDocumentCommand::GetKeystore { id, response } => { + let _ = response.send(self.get_conversation_keystore(id).await); + } + RootDocumentCommand::GetKeystoreMap { response } => { + let _ = response.send(self.get_conversation_keystore_map().await); + } } } } @@ -622,4 +679,33 @@ impl RootDocumentTask { } Ok(()) } + + async fn set_conversation_keystore(&mut self, map: BTreeMap) -> Result<(), Error> { + let mut document = self.get_root_document().await?; + document.conversations_keystore = Some(map.to_cid(&self.ipfs).await?); + self.set_root_document(document).await + } + + async fn get_conversation_keystore_map(&self) -> Result, Error> { + let document = self.get_root_document().await?; + + let cid = match document.conversations_keystore { + Some(cid) => cid, + None => return Ok(BTreeMap::new()), + }; + + cid.get_local_dag(&self.ipfs).await + } + + async fn get_conversation_keystore(&self, id: Uuid) -> Result { + let document = self.get_root_document().await?; + + let cid = match document.conversations_keystore { + Some(cid) => cid, + None => return Ok(Keystore::new(id)), + }; + + let path = IpfsPath::from(cid).sub_path(&id.to_string())?; + path.get_local_dag(&self.ipfs).await + } } diff --git a/extensions/warp-ipfs/src/store/message.rs b/extensions/warp-ipfs/src/store/message.rs index 325a9735c..fb1446c33 100644 --- a/extensions/warp-ipfs/src/store/message.rs +++ b/extensions/warp-ipfs/src/store/message.rs @@ -7,17 +7,14 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::time::{Duration, Instant}; use chrono::Utc; -use futures::channel::mpsc::{unbounded, Sender, UnboundedSender}; +use futures::channel::mpsc::{unbounded, UnboundedSender}; use futures::channel::oneshot::{self, Sender as OneshotSender}; -use futures::stream::{FuturesUnordered, SelectAll}; +use futures::stream::SelectAll; use futures::{SinkExt, Stream, StreamExt}; use rust_ipfs::{Ipfs, IpfsPath, PeerId, SubscriptionStream}; -use libipld::Cid; use serde::{Deserialize, Serialize}; -use tokio::sync::broadcast::{self, Receiver as BroadcastReceiver, Sender as BroadcastSender}; -use tokio::sync::{OwnedSemaphorePermit, Semaphore}; -use tokio_stream::wrappers::ReadDirStream; +use tokio::sync::broadcast::{Receiver as BroadcastReceiver, Sender as BroadcastSender}; use uuid::Uuid; use warp::constellation::{Constellation, ConstellationProgressStream, Progression}; use warp::crypto::cipher::Cipher; @@ -41,22 +38,14 @@ use crate::store::{ use super::conversation::{ConversationDocument, MessageDocument}; use super::discovery::Discovery; -use super::document::utils::{GetLocalDag, ToCid}; +use super::document::conversation::Conversations; use super::identity::IdentityStore; use super::keystore::Keystore; use super::{did_to_libp2p_pub, verify_serde_sig, ConversationEvents, MessagingEvents}; -const PERMIT_AMOUNT: usize = 1; - type ConversationSender = UnboundedSender<(MessagingEvents, Option>>)>; -#[allow(clippy::large_enum_variant)] -enum ConversationEventHandle { - Set(ConversationDocument, oneshot::Sender>), - Get(oneshot::Sender>), -} - #[derive(Clone)] #[allow(dead_code)] pub struct MessageStore { @@ -67,20 +56,12 @@ pub struct MessageStore { path: Option, // conversation cid - conversation_cid: Arc>>, - conversation_keystore_cid: Arc>>, - - conversation_lock: Arc>>>, - conversation_sender: Arc>>, - conversation_task_tx: Arc>>>, // identity store - // Note: Not used for now; identity: IdentityStore, - // friend store - // friends: FriendsStore, + conversations: Conversations, // discovery discovery: Discovery, @@ -88,12 +69,11 @@ pub struct MessageStore { // filesystem instance filesystem: Option>, - stream_sender: Arc>>>, - stream_task: Arc>>>, stream_reqres_task: Arc>>>, stream_event_task: Arc>>>, stream_conversation_task: Arc>>>, + // Queue queue: Arc>>>, @@ -131,31 +111,27 @@ impl MessageStore { } let queue = Arc::new(Default::default()); - let conversation_cid = Arc::new(Default::default()); let did = Arc::new(get_keypair_did(ipfs.keypair()?)?); let spam_filter = Arc::new(check_spam.then_some(SpamFilter::default()?)); let stream_task = Arc::new(Default::default()); let stream_event_task = Arc::new(Default::default()); let with_friends = Arc::new(AtomicBool::new(with_friends)); - let stream_sender = Arc::new(Default::default()); - let conversation_lock = Arc::new(Default::default()); - let conversation_sender = Arc::default(); - let conversation_keystore_cid = Arc::default(); - let conversation_task_tx = Arc::default(); let stream_reqres_task = Arc::default(); + let conversation_sender = Arc::default(); let stream_conversation_task = Arc::default(); + let root = identity.root_document().clone(); + + let conversations = Conversations::new(&ipfs, path.clone(), did.clone(), root).await; + let store = Self { path, ipfs, - stream_sender, stream_task, stream_event_task, stream_reqres_task, - conversation_cid, - conversation_lock, conversation_sender, - conversation_task_tx, + conversations, identity, discovery, filesystem, @@ -164,15 +140,9 @@ impl MessageStore { event, spam_filter, with_friends, - conversation_keystore_cid, stream_conversation_task, }; - info!("Loading existing conversations task"); - if let Err(e) = store.load_conversations().await { - warn!("Failed to load conversations: {e}"); - } - info!("Loading queue"); if let Err(e) = store.load_queue().await { warn!("Failed to load queue: {e}"); @@ -184,6 +154,8 @@ impl MessageStore { .await? .boxed(); + let _ = store.load_conversations().await; + tokio::spawn({ let mut store = store.clone(); async move { @@ -209,7 +181,7 @@ impl MessageStore { continue; } }; - + let events = match serde_json::from_slice::(&data) { Ok(ev) => ev, Err(e) => { @@ -217,11 +189,11 @@ impl MessageStore { continue; } }; - + if let Err(e) = store.process_conversation(payload, events).await { error!("Error processing conversation: {e}"); - } - + } + } _ = interval.tick() => { if let Err(e) = store.process_queue().await { @@ -237,108 +209,10 @@ impl MessageStore { Ok(store) } - async fn conversation_event_handle(&self, conversation_id: Uuid) { - let (tx, mut rx) = futures::channel::mpsc::channel(1); - let conversation_cid = self.conversation_cid.clone(); - let ipfs = self.ipfs.clone(); - let own_did = self.did.clone(); - let path = self.path.clone(); - let task = tokio::spawn(async move { - while let Some(event) = rx.next().await { - match event { - ConversationEventHandle::Set(mut document, ret) => { - let result = { - let own_did = own_did.clone(); - let ipfs = ipfs.clone(); - let conversation_cid = conversation_cid.clone(); - let path = path.clone(); - async move { - let own_did = &*own_did; - - if let Some(creator) = document.creator.as_ref() { - if creator.eq(own_did) - && matches!( - document.conversation_type, - ConversationType::Group - ) - { - document.sign(own_did)?; - } - } - - document.verify()?; - - let new_cid = document.to_cid(&ipfs).await?; - - let old_cid = conversation_cid - .write() - .await - .insert(conversation_id, new_cid); - - if let Some(old_cid) = old_cid { - if new_cid != old_cid { - if ipfs.is_pinned(&old_cid).await? { - if let Err(e) = ipfs.remove_pin(&old_cid, false).await { - error!("Unable to remove pin on {old_cid}: {e}"); - } - } - if let Err(e) = ipfs.remove_block(old_cid).await { - error!("Unable to remove {old_cid}: {e}"); - } - } - } - - if let Some(path) = path.as_ref() { - let cid = new_cid.to_string(); - if let Err(e) = tokio::fs::write( - path.join(conversation_id.to_string()), - cid, - ) - .await - { - error!("Unable to save info to file: {e}"); - } - } - Ok::<_, Error>(()) - } - }; - - let _ = ret.send(result.await); - } - ConversationEventHandle::Get(ret) => { - let result = { - let conversation_cid = conversation_cid.clone(); - let ipfs = ipfs.clone(); - async move { - let map = conversation_cid.read().await; - let cid = map - .get(&conversation_id) - .ok_or(Error::InvalidConversation)?; - let conversation: ConversationDocument = - (*cid).get_local_dag(&ipfs).await?; - conversation.verify().map(|_| conversation) - } - }; - - let _ = ret.send(result.await); - } - } - } - }); - self.conversation_task_tx - .write() - .await - .insert(conversation_id, tx); - self.stream_conversation_task - .write() - .await - .insert(conversation_id, task); - } - async fn start_event_task(&self, conversation_id: Uuid) { info!("Event Task started for {conversation_id}"); let did = self.did.clone(); - let Ok(mut conversation) = self.get_conversation(conversation_id).await else { + let Ok(mut conversation) = self.conversations.get(conversation_id).await else { return; }; conversation.recipients.clear(); @@ -368,7 +242,8 @@ impl MessageStore { match conversation_type { ConversationType::Direct => { let recipient = store - .get_conversation(conversation_id) + .conversations + .get(conversation_id) .await .map(|c| c.recipients()) .unwrap_or_default() @@ -430,7 +305,7 @@ impl MessageStore { async fn start_reqres_task(&self, conversation_id: Uuid) { info!("RequestResponse Task started for {conversation_id}"); let did = self.did.clone(); - let Ok(conversation) = self.get_conversation(conversation_id).await else { + let Ok(conversation) = self.conversations.get(conversation_id).await else { return; }; @@ -464,7 +339,7 @@ impl MessageStore { } => match kind { ConversationRequestKind::Key => { let Ok(conversation) = - store.get_conversation(conversation_id).await + store.conversations.get(conversation_id).await else { continue; }; @@ -510,7 +385,7 @@ impl MessageStore { if let Err(e) = store .set_conversation_keystore( conversation_id, - &keystore, + keystore, ) .await { @@ -612,7 +487,7 @@ impl MessageStore { ConversationResponseKind::Key { key } => { let sender = payload.sender(); let Ok(conversation) = - store.get_conversation(conversation_id).await + store.conversations.get(conversation_id).await else { continue; }; @@ -669,7 +544,7 @@ impl MessageStore { if let Err(e) = store .set_conversation_keystore( conversation_id, - &keystore, + keystore, ) .await { @@ -699,7 +574,7 @@ impl MessageStore { kind: ConversationRequestKind::Key, }; - let mut conversation = self.get_conversation(conversation_id).await?; + let mut conversation = self.conversations.get(conversation_id).await?; if !conversation.recipients().contains(did) { //TODO: user is not a recipient of the conversation @@ -750,23 +625,12 @@ impl MessageStore { } async fn start_task(&self, conversation_id: Uuid, stream: SubscriptionStream) { - self.conversation_event_handle(conversation_id).await; - let (tx, mut rx) = unbounded(); self.conversation_sender .write() .await .insert(conversation_id, tx); - let (tx, _) = broadcast::channel(1024); - - self.stream_sender.write().await.insert(conversation_id, tx); - - self.conversation_lock - .write() - .await - .insert(conversation_id, Arc::new(Semaphore::new(PERMIT_AMOUNT))); - info!("Task started for {conversation_id}"); let did = self.did.clone(); @@ -795,7 +659,7 @@ impl MessageStore { let own_did = &*did; - let Ok(conversation) = store.get_conversation(conversation_id).await else { + let Ok(conversation) = store.conversations.get(conversation_id).await else { continue; }; @@ -864,10 +728,9 @@ impl MessageStore { direction: MessageDirection, opt: EventOpt, ) -> Result { - let _guard = self.conversation_queue(conversation_id).await?; let tx = self.get_conversation_sender(conversation_id).await?; - let mut document = self.get_conversation(conversation_id).await?; + let mut document = self.conversations.get(conversation_id).await?; let keystore = match document.conversation_type { ConversationType::Direct => None, @@ -934,7 +797,7 @@ impl MessageStore { messages.insert(message_document); document.set_message_list(&self.ipfs, messages).await?; - self.set_conversation(conversation_id, document).await?; + self.conversations.set(document).await?; let event = match direction { MessageDirection::In => MessageEventKind::MessageReceived { @@ -1033,7 +896,7 @@ impl MessageStore { .await?; list.replace(message_document); document.set_message_list(&self.ipfs, list).await?; - self.set_conversation(conversation_id, document).await?; + self.conversations.set(document).await?; if let Err(e) = tx.send(MessageEventKind::MessageEdited { conversation_id, @@ -1074,7 +937,7 @@ impl MessageStore { document.delete_message(&self.ipfs, message_id).await?; - self.set_conversation(conversation_id, document).await?; + self.conversations.set(document).await?; if let Err(e) = tx.send(MessageEventKind::MessageDeleted { conversation_id, @@ -1128,7 +991,7 @@ impl MessageStore { list.replace(message_document); document.set_message_list(&self.ipfs, list).await?; - self.set_conversation(conversation_id, document).await?; + self.conversations.set(document).await?; if let Err(e) = tx.send(event) { error!("Error broadcasting event: {e}"); @@ -1181,7 +1044,7 @@ impl MessageStore { list.replace(message_document); document.set_message_list(&self.ipfs, list).await?; - self.set_conversation(conversation_id, document).await?; + self.conversations.set(document).await?; if let Err(e) = tx.send(MessageEventKind::MessageReactionAdded { conversation_id, @@ -1221,7 +1084,8 @@ impl MessageStore { list.replace(message_document); document.set_message_list(&self.ipfs, list).await?; - self.set_conversation(conversation_id, document).await?; + + self.conversations.set(document).await?; if let Err(e) = tx.send(MessageEventKind::MessageReactionRemoved { conversation_id, @@ -1250,7 +1114,7 @@ impl MessageStore { document.recipients = list; document.signature = Some(signature); - self.set_conversation(conversation_id, document).await?; + self.conversations.set(document).await?; tokio::spawn({ let store = self.clone(); @@ -1284,7 +1148,7 @@ impl MessageStore { document.recipients = list; document.excluded.remove(&recipient); document.signature = Some(signature); - self.set_conversation(conversation_id, document).await?; + self.conversations.set(document).await?; if can_emit { if let Err(e) = tx.send(MessageEventKind::RecipientRemoved { @@ -1321,7 +1185,7 @@ impl MessageStore { document.name = (!name.is_empty()).then_some(name.to_string()); document.signature = Some(signature); - self.set_conversation(conversation_id, document).await?; + self.conversations.set(document).await?; if let Err(e) = tx.send(MessageEventKind::ConversationNameUpdated { conversation_id, @@ -1363,8 +1227,6 @@ impl MessageStore { task.abort(); } - self.stream_sender.write().await.remove(&conversation_id); - if let Some(task) = self.stream_task.write().await.remove(&conversation_id) { info!("Attempting to end task for {conversation_id}"); task.abort(); @@ -1377,25 +1239,6 @@ impl MessageStore { { tx.close_channel(); } - - if let Some(mut tx) = self - .conversation_task_tx - .write() - .await - .remove(&conversation_id) - { - tx.close_channel(); - } - - if let Some(permit) = self - .conversation_lock - .write() - .await - .remove(&conversation_id) - { - permit.close(); - drop(permit); - } } } @@ -1431,12 +1274,7 @@ impl MessageStore { convo.id() ); - let cid = convo.to_cid(&self.ipfs).await?; - if !self.ipfs.is_pinned(&cid).await? { - self.ipfs.insert_pin(&cid, false).await?; - } - - self.conversation_cid.write().await.insert(convo.id(), cid); + self.conversations.set(convo.clone()).await?; let stream = match self.ipfs.pubsub_subscribe(convo.topic()).await { Ok(stream) => stream, @@ -1448,12 +1286,6 @@ impl MessageStore { self.start_task(convo.id(), stream).await; - if let Some(path) = self.path.as_ref() { - let cid = cid.to_string(); - if let Err(e) = tokio::fs::write(path.join(convo.id().to_string()), cid).await { - error!("Unable to save info to file: {e}"); - } - } if let Err(e) = self.event.send(RayGunEventKind::ConversationCreated { conversation_id: convo.id(), }) { @@ -1503,13 +1335,11 @@ impl MessageStore { //Although we verify internally, this is just as a precaution convo.verify()?; - let cid = convo.to_cid(&self.ipfs).await?; - if !self.ipfs.is_pinned(&cid).await? { - self.ipfs.insert_pin(&cid, false).await?; - } - self.conversation_cid.write().await.insert(convo.id(), cid); + let topic = convo.topic(); - let stream = match self.ipfs.pubsub_subscribe(convo.topic()).await { + self.conversations.set(convo).await?; + + let stream = match self.ipfs.pubsub_subscribe(topic).await { Ok(stream) => stream, Err(e) => { error!("Error subscribing to conversation: {e}"); @@ -1517,42 +1347,30 @@ impl MessageStore { } }; - self.set_conversation_keystore(conversation_id, &keystore) + self.set_conversation_keystore(conversation_id, keystore) .await?; - self.start_task(convo.id(), stream).await; - if let Some(path) = self.path.as_ref() { - let cid = cid.to_string(); - if let Err(e) = tokio::fs::write(path.join(convo.id().to_string()), cid).await { - error!("Unable to save info to file: {e}"); - } - } - if let Err(e) = self.event.send(RayGunEventKind::ConversationCreated { - conversation_id: convo.id(), - }) { + self.start_task(conversation_id, stream).await; + + if let Err(e) = self + .event + .send(RayGunEventKind::ConversationCreated { conversation_id }) + { error!("Error broadcasting event: {e}"); } - tokio::spawn({ - let store = self.clone(); - let recipients = list - .iter() - .filter(|d| did.ne(d)) - .cloned() - .collect::>(); - async move { - for recipient in recipients { - if let Err(_e) = store.request_key(conversation_id, &recipient).await {} - } + for recipient in list.iter().filter(|d| did.ne(d)) { + if let Err(e) = self.request_key(conversation_id, recipient).await { + tracing::log::warn!("Failed to send exchange request to {recipient}: {e}"); } - }); + } } ConversationEvents::LeaveConversation { conversation_id, recipient, signature, } => { - let conversation = self.get_conversation(conversation_id).await?; + let conversation = self.conversations.get(conversation_id).await?; if !matches!(conversation.conversation_type, ConversationType::Group) { return Err(anyhow::anyhow!("Can only leave from a group conversation")); @@ -1585,8 +1403,6 @@ impl MessageStore { //so we can exclude the recipient drop(conversation); - let _guard = self.conversation_queue(conversation_id).await?; - { //Small validation context let context = format!("exclude {}", recipient); @@ -1594,7 +1410,7 @@ impl MessageStore { verify_serde_sig(recipient.clone(), &context, &signature)?; } - let mut conversation = self.get_conversation(conversation_id).await?; + let mut conversation = self.conversations.get(conversation_id).await?; //Validate again since we have a permit if !conversation.recipients.contains(&recipient) { @@ -1609,7 +1425,7 @@ impl MessageStore { entry.insert(signature); can_emit = true; } - self.set_conversation(conversation_id, conversation).await?; + self.conversations.set(conversation).await?; if can_emit { let tx = self.get_conversation_sender(conversation_id).await?; if let Err(e) = tx.send(MessageEventKind::RecipientRemoved { @@ -1629,7 +1445,7 @@ impl MessageStore { let sender = data.sender(); - match self.get_conversation(conversation_id).await { + match self.conversations.get(conversation_id).await { Ok(conversation) if conversation.recipients().contains(&sender) && matches!( @@ -1650,29 +1466,16 @@ impl MessageStore { self.end_task(conversation_id).await; - let conversation_cid = self - .conversation_cid - .write() - .await - .remove(&conversation_id) - .ok_or(Error::InvalidConversation)?; - - if self.ipfs.is_pinned(&conversation_cid).await? { - if let Err(e) = self.ipfs.remove_pin(&conversation_cid, false).await { - error!("Unable to remove pin from {conversation_cid}: {e}"); - } - } - let mut document: ConversationDocument = - conversation_cid.get_local_dag(&self.ipfs).await?; + self.conversations.delete(conversation_id).await?; + let topic = document.topic(); self.queue.write().await.remove(&sender); tokio::spawn({ let ipfs = self.ipfs.clone(); async move { - let _ = document.delete_all_message(ipfs.clone()).await.ok(); - ipfs.remove_block(conversation_cid).await.ok(); + let _ = document.delete_all_message(ipfs.clone()).await; } }); @@ -1680,20 +1483,6 @@ impl MessageStore { warn!("topic should have been unsubscribed after dropping conversation."); } - if let Some(path) = self.path.as_ref() { - if let Err(e) = - tokio::fs::remove_file(path.join(conversation_id.to_string())).await - { - error!("Unable to remove conversation: {e}"); - } - } - - // Delete a keystore, if any, assigned to the conversation. - let _ = self - .remove_conversation_keystore(conversation_id) - .await - .ok(); - if let Err(e) = self .event .send(RayGunEventKind::ConversationDeleted { conversation_id }) @@ -1776,6 +1565,12 @@ impl MessageStore { } impl MessageStore { + pub async fn get_conversation(&self, id: Uuid) -> Result { + let document = self.conversations.get(id).await?; + + Ok(document.into()) + } + pub async fn create_conversation(&mut self, did_key: &DID) -> Result { if self.with_friends.load(Ordering::SeqCst) && !self.identity.is_friend(did_key).await? { return Err(Error::FriendDoesntExist); @@ -1818,16 +1613,14 @@ impl MessageStore { let conversation = ConversationDocument::new_direct(own_did, [own_did.clone(), did_key.clone()])?; - let cid = conversation.to_cid(&self.ipfs).await?; - let convo_id = conversation.id(); let topic = conversation.topic(); - self.conversation_cid.write().await.insert(convo_id, cid); + self.conversations.set(conversation.clone()).await?; let stream = self.ipfs.pubsub_subscribe(topic).await?; - self.start_task(conversation.id(), stream).await; + self.start_task(convo_id, stream).await; let peer_id = did_to_libp2p_pub(did_key)?.to_peer_id(); @@ -1869,19 +1662,11 @@ impl MessageStore { } if let Err(e) = self.event.send(RayGunEventKind::ConversationCreated { - conversation_id: conversation.id(), + conversation_id: convo_id, }) { error!("Error broadcasting event: {e}"); } - if let Some(path) = self.path.as_ref() { - let cid = cid.to_string(); - - if let Err(e) = tokio::fs::write(path.join(conversation.id().to_string()), cid).await { - error!("Unable to save info to file: {e}"); - } - } - Ok(Conversation::from(&conversation)) } @@ -1943,19 +1728,19 @@ impl MessageStore { let recipient = conversation.recipients(); - let cid = conversation.to_cid(&self.ipfs).await?; - let convo_id = conversation.id(); let topic = conversation.topic(); - self.conversation_cid.write().await.insert(convo_id, cid); - let mut keystore = Keystore::new(conversation.id()); + self.conversations.set(conversation).await?; + + let mut keystore = Keystore::new(convo_id); keystore.insert(own_did, own_did, warp::crypto::generate::<64>())?; - self.set_conversation_keystore(convo_id, &keystore).await?; + + self.set_conversation_keystore(convo_id, keystore).await?; let stream = self.ipfs.pubsub_subscribe(topic).await?; - self.start_task(conversation.id(), stream).await; + self.start_task(convo_id, stream).await; let peer_id_list = recipient .clone() @@ -1966,13 +1751,8 @@ impl MessageStore { .map(|(did, pk)| (did, pk.to_peer_id())) .collect::>(); - if let Some(path) = self.path.as_ref() { - let cid = cid.to_string(); + let conversation = self.conversations.get(convo_id).await?; - if let Err(e) = tokio::fs::write(path.join(conversation.id().to_string()), cid).await { - error!("Unable to save info to file: {e}"); - } - } let event = serde_json::to_vec(&ConversationEvents::NewGroupConversation { creator: own_did.clone(), name: conversation.name(), @@ -2021,20 +1801,11 @@ impl MessageStore { error!("Error broadcasting event: {e}"); } - tokio::spawn({ - let store = self.clone(); - let conversation_id = conversation.id(); - let recipients = recipient - .iter() - .filter(|d| own_did.ne(d)) - .cloned() - .collect::>(); - async move { - for recipient in recipients { - if let Err(_e) = store.request_key(conversation_id, &recipient).await {} - } + for recipient in recipient.iter().filter(|d| own_did.ne(d)) { + if let Err(e) = self.request_key(conversation.id(), recipient).await { + tracing::log::warn!("Failed to send exchange request to {recipient}: {e}"); } - }); + } Ok(Conversation::from(&conversation)) } @@ -2046,22 +1817,8 @@ impl MessageStore { ) -> Result<(), Error> { self.end_task(conversation_id).await; - let conversation_cid = self - .conversation_cid - .write() - .await - .remove(&conversation_id) - .ok_or(Error::InvalidConversation)?; - - if self.ipfs.is_pinned(&conversation_cid).await? { - if let Err(e) = self.ipfs.remove_pin(&conversation_cid, false).await { - error!("Unable to remove pin from {conversation_cid}: {e}"); - } - } let mut document_type: ConversationDocument = - conversation_cid.get_local_dag(&self.ipfs).await?; - - self.ipfs.remove_block(conversation_cid).await?; + self.conversations.delete(conversation_id).await?; if broadcast { let recipients = document_type.recipients(); @@ -2171,14 +1928,6 @@ impl MessageStore { } }); - if let Some(path) = self.path.as_ref() { - if let Err(e) = tokio::fs::remove_file(path.join(conversation_id.to_string())).await { - error!("Unable to remove conversation: {e}"); - } - } - - let _ = self.remove_conversation_keystore(conversation_id).await; - if let Err(e) = self .event .send(RayGunEventKind::ConversationDeleted { conversation_id }) @@ -2188,102 +1937,33 @@ impl MessageStore { Ok(()) } - async fn conversation_queue( - &self, - conversation_id: Uuid, - ) -> Result { - let permit = self - .conversation_lock - .read() - .await - .get(&conversation_id) - .cloned() - .ok_or(Error::InvalidConversation)?; - - permit - .acquire_owned() - .await - .map_err(anyhow::Error::from) - .map_err(Error::from) - } - pub async fn load_conversations(&self) -> Result<(), Error> { - let Some(path) = self.path.as_ref() else { - return Ok(()); - }; - - if !path.is_dir() { - return Err(Error::InvalidDirectory); - } - - let mut entry_stream = ReadDirStream::new(tokio::fs::read_dir(path).await?); + let list = self.conversations.list().await.unwrap_or_default(); - while let Some(entry) = entry_stream.next().await { - let entry = entry?; - let entry_path = entry.path(); - if entry_path.is_file() && !entry_path.ends_with(".messaging_queue") { - let Some(filename) = entry_path - .file_name() - .map(|file| file.to_string_lossy().to_string()) - else { - continue; - }; - - //TODO: Maybe check file extension instead - let slices = filename.split('.').collect::>(); - - let keystore = slices - .last() - .map(|s| s.ends_with("keystore")) - .unwrap_or_default(); - - let Some(file_id) = slices.first() else { - continue; - }; + for conversation in &list { + let task = { + let store = self.clone(); + async move { + conversation.verify()?; - let Ok(id) = Uuid::from_str(file_id) else { - continue; - }; + let recipients = conversation.recipients(); - let Ok(cid_str) = tokio::fs::read(entry_path) - .await - .map(|bytes| String::from_utf8_lossy(&bytes).to_string()) - else { - continue; - }; - if let Ok(cid) = cid_str.parse::() { - if keystore { - self.conversation_keystore_cid.write().await.insert(id, cid); - } else { - let task = { - let store = self.clone(); - async move { - let conversation: ConversationDocument = - cid.get_local_dag(&store.ipfs).await?; - conversation.verify()?; - store.conversation_cid.write().await.insert(id, cid); - let recipients = conversation.recipients(); - - for recipient in recipients { - if !store.discovery.contains(&recipient).await { - let _ = store.discovery.insert(recipient).await.ok(); - } - } + for recipient in recipients { + if !store.discovery.contains(&recipient).await { + let _ = store.discovery.insert(recipient).await.ok(); + } + } - let stream = - store.ipfs.pubsub_subscribe(conversation.topic()).await?; + let stream = store.ipfs.pubsub_subscribe(conversation.topic()).await?; - store.start_task(conversation.id(), stream).await; + store.start_task(conversation.id(), stream).await; - Ok::<_, Error>(()) - } - }; - - if let Err(e) = task.await { - error!("Error loading conversation: {e}"); - } - } + Ok::<_, Error>(()) } + }; + + if let Err(e) = task.await { + error!("Error loading conversation {}: {e}", conversation.id()); } } @@ -2291,17 +1971,7 @@ impl MessageStore { } pub async fn list_conversation_documents(&self) -> Result, Error> { - let list = FuturesUnordered::from_iter( - self.conversation_cid - .read() - .await - .values() - .map(|cid| async { (*cid).get_local_dag(&self.ipfs).await }), - ) - .filter_map(|res| async { res.ok() }) - .collect::>() - .await; - Ok(list) + self.conversations.list().await } pub async fn list_conversations(&self) -> Result, Error> { @@ -2311,84 +1981,25 @@ impl MessageStore { } pub async fn messages_count(&self, conversation_id: Uuid) -> Result { - self.get_conversation(conversation_id) + self.conversations + .get(conversation_id) .await? .messages_length(&self.ipfs) .await } pub async fn conversation_keystore(&self, conversation_id: Uuid) -> Result { - let guard = self.conversation_keystore_cid.read().await; - - let cid = guard - .get(&conversation_id) - .ok_or(Error::InvalidConversation)?; - - cid.get_local_dag(&self.ipfs).await + self.conversations.get_keystore(conversation_id).await } pub async fn set_conversation_keystore( &self, conversation_id: Uuid, - keystore: &Keystore, + keystore: Keystore, ) -> Result<(), Error> { - let cid = keystore.to_cid(&self.ipfs).await?; - - if !self.ipfs.is_pinned(&cid).await? { - self.ipfs.insert_pin(&cid, false).await?; - } - - let old_cid = self - .conversation_keystore_cid - .write() - .await - .insert(conversation_id, cid); - - if let Some(old_cid) = old_cid { - if old_cid != cid { - if self.ipfs.is_pinned(&old_cid).await? { - self.ipfs.remove_pin(&old_cid, false).await?; - } - if let Err(_e) = self.ipfs.remove_block(old_cid).await {} - } - } - - if let Some(path) = self.path.as_ref() { - let keystore_cid = cid.to_string(); - if let Err(e) = tokio::fs::write( - path.join(format!("{}.keystore", conversation_id)), - keystore_cid, - ) + self.conversations + .set_keystore(conversation_id, keystore) .await - { - error!("Unable to save info to file: {e}"); - } - } - - Ok(()) - } - - pub async fn remove_conversation_keystore(&self, conversation_id: Uuid) -> Result<(), Error> { - let mut writer = self.conversation_keystore_cid.write().await; - let cid = writer - .remove(&conversation_id) - .ok_or(Error::InvalidConversation)?; - drop(writer); - - if self.ipfs.is_pinned(&cid).await? { - self.ipfs.remove_pin(&cid, false).await?; - } - if let Err(_e) = self.ipfs.remove_block(cid).await {} - - if let Some(path) = self.path.as_ref() { - if let Err(e) = - tokio::fs::remove_file(path.join(format!("{}.keystore", conversation_id))).await - { - error!("Unable to remove keystore: {e}"); - } - } - - Ok(()) } async fn send_single_conversation_event( @@ -2451,7 +2062,7 @@ impl MessageStore { conversation_id: Uuid, message_id: Uuid, ) -> Result { - let conversation = self.get_conversation(conversation_id).await?; + let conversation = self.conversations.get(conversation_id).await?; let keystore = match conversation.conversation_type { ConversationType::Direct => None, ConversationType::Group => self.conversation_keystore(conversation.id()).await.ok(), @@ -2472,7 +2083,7 @@ impl MessageStore { conversation_id: Uuid, message_id: Uuid, ) -> Result { - let conversation = self.get_conversation(conversation_id).await?; + let conversation = self.conversations.get(conversation_id).await?; if matches!(conversation.conversation_type, ConversationType::Group) { //TODO: Handle message status for group @@ -2518,7 +2129,7 @@ impl MessageStore { conversation: Uuid, opt: MessageOptions, ) -> Result { - let conversation = self.get_conversation(conversation).await?; + let conversation = self.conversations.get(conversation).await?; let keystore = match conversation.conversation_type { ConversationType::Direct => None, ConversationType::Group => self.conversation_keystore(conversation.id()).await.ok(), @@ -2547,65 +2158,17 @@ impl MessageStore { } pub async fn exist(&self, conversation: Uuid) -> bool { - self.conversation_cid - .read() + self.conversations + .contains(conversation) .await - .contains_key(&conversation) - } - - pub async fn get_conversation( - &self, - conversation_id: Uuid, - ) -> Result { - let mut task_tx = self - .conversation_task_tx - .read() - .await - .get(&conversation_id) - .cloned() - .ok_or(Error::InvalidConversation)?; - - let (tx, rx) = oneshot::channel(); - task_tx - .send(ConversationEventHandle::Get(tx)) - .await - .map_err(anyhow::Error::from)?; - rx.await.map_err(anyhow::Error::from)? - } - - pub async fn set_conversation( - &self, - conversation_id: Uuid, - document: ConversationDocument, - ) -> Result<(), Error> { - let mut task_tx = self - .conversation_task_tx - .read() - .await - .get(&conversation_id) - .cloned() - .ok_or(Error::InvalidConversation)?; - - let (tx, rx) = oneshot::channel(); - task_tx - .send(ConversationEventHandle::Set(document, tx)) - .await - .map_err(anyhow::Error::from)?; - rx.await.map_err(anyhow::Error::from)? + .unwrap_or_default() } pub async fn get_conversation_sender( &self, conversation_id: Uuid, ) -> Result, Error> { - let tx = self - .stream_sender - .read() - .await - .get(&conversation_id) - .ok_or(Error::InvalidConversation)? - .clone(); - Ok(tx) + self.conversations.subscribe(conversation_id).await } pub async fn get_conversation_receiver( @@ -2624,7 +2187,6 @@ impl MessageStore { conversation_id: Uuid, ) -> Result, Error> { let mut rx = self.get_conversation_receiver(conversation_id).await?; - Ok(async_stream::stream! { loop { match rx.recv().await { @@ -2653,8 +2215,7 @@ impl MessageStore { }); } - let _guard = self.conversation_queue(conversation_id).await?; - let mut conversation = self.get_conversation(conversation_id).await?; + let mut conversation = self.conversations.get(conversation_id).await?; if matches!(conversation.conversation_type, ConversationType::Direct) { return Err(Error::InvalidConversation); @@ -2672,9 +2233,9 @@ impl MessageStore { conversation.name = (!name.is_empty()).then_some(name.to_string()); - self.set_conversation(conversation_id, conversation).await?; + self.conversations.set(conversation).await?; - let conversation = self.get_conversation(conversation_id).await?; + let conversation = self.conversations.get(conversation_id).await?; let Some(signature) = conversation.signature.clone() else { return Err(Error::InvalidSignature); @@ -2700,10 +2261,7 @@ impl MessageStore { conversation_id: Uuid, did_key: &DID, ) -> Result<(), Error> { - //Note: The Semaphore is used to "block" the stream while we add a recipient so we could have the next chance of - let _guard = self.conversation_queue(conversation_id).await?; - - let mut conversation = self.get_conversation(conversation_id).await?; + let mut conversation = self.conversations.get(conversation_id).await?; if matches!(conversation.conversation_type, ConversationType::Direct) { return Err(Error::InvalidConversation); @@ -2733,9 +2291,9 @@ impl MessageStore { conversation.recipients.push(did_key.clone()); - self.set_conversation(conversation_id, conversation).await?; + self.conversations.set(conversation).await?; - let conversation = self.get_conversation(conversation_id).await?; + let conversation = self.conversations.get(conversation_id).await?; let Some(signature) = conversation.signature.clone() else { return Err(Error::InvalidSignature); }; @@ -2768,7 +2326,6 @@ impl MessageStore { .await?; if let Err(_e) = self.request_key(conversation_id, did_key).await {} - drop(_guard); Ok(()) } @@ -2778,9 +2335,7 @@ impl MessageStore { did_key: &DID, broadcast: bool, ) -> Result<(), Error> { - let _guard = self.conversation_queue(conversation_id).await?; - - let mut conversation = self.get_conversation(conversation_id).await?; + let mut conversation = self.conversations.get(conversation_id).await?; if matches!(conversation.conversation_type, ConversationType::Direct) { return Err(Error::InvalidConversation); @@ -2805,11 +2360,9 @@ impl MessageStore { } conversation.recipients.retain(|did| did.ne(did_key)); - self.set_conversation(conversation_id, conversation).await?; - - drop(_guard); + self.conversations.set(conversation).await?; - let conversation = self.get_conversation(conversation_id).await?; + let conversation = self.conversations.get(conversation_id).await?; let Some(signature) = conversation.signature.clone() else { return Err(Error::InvalidSignature); @@ -2892,7 +2445,7 @@ impl MessageStore { conversation_id: Uuid, messages: Vec, ) -> Result<(), Error> { - let conversation = self.get_conversation(conversation_id).await?; + let conversation = self.conversations.get(conversation_id).await?; let mut tx = self.conversation_tx(conversation_id).await?; if messages.is_empty() { @@ -2961,7 +2514,7 @@ impl MessageStore { message_id: Uuid, messages: Vec, ) -> Result<(), Error> { - let conversation = self.get_conversation(conversation_id).await?; + let conversation = self.conversations.get(conversation_id).await?; let mut tx = self.conversation_tx(conversation_id).await?; if messages.is_empty() { return Err(Error::EmptyMessage); @@ -3024,7 +2577,7 @@ impl MessageStore { message_id: Uuid, messages: Vec, ) -> Result<(), Error> { - let conversation = self.get_conversation(conversation_id).await?; + let conversation = self.conversations.get(conversation_id).await?; let mut tx = self.conversation_tx(conversation_id).await?; if messages.is_empty() { @@ -3090,7 +2643,7 @@ impl MessageStore { message_id: Uuid, broadcast: bool, ) -> Result<(), Error> { - let conversation = self.get_conversation(conversation_id).await?; + let conversation = self.conversations.get(conversation_id).await?; let mut tx = self.conversation_tx(conversation_id).await?; let event = MessagingEvents::Delete { @@ -3118,7 +2671,7 @@ impl MessageStore { message_id: Uuid, state: PinState, ) -> Result<(), Error> { - let conversation = self.get_conversation(conversation_id).await?; + let conversation = self.conversations.get(conversation_id).await?; let mut tx = self.conversation_tx(conversation_id).await?; let own_did = &*self.did; @@ -3156,7 +2709,7 @@ impl MessageStore { state: ReactionState, emoji: String, ) -> Result<(), Error> { - let conversation = self.get_conversation(conversation_id).await?; + let conversation = self.conversations.get(conversation_id).await?; let mut tx = self.conversation_tx(conversation_id).await?; let own_did = &*self.did; @@ -3212,7 +2765,7 @@ impl MessageStore { }); } } - let conversation = self.get_conversation(conversation_id).await?; + let conversation = self.conversations.get(conversation_id).await?; let mut tx = self.conversation_tx(conversation_id).await?; //TODO: Send directly if constellation isnt present // this will require uploading to ipfs directly from here @@ -3525,7 +3078,7 @@ impl MessageStore { conversation_id: Uuid, event: MessageEvent, ) -> Result<(), Error> { - let conversation = self.get_conversation(conversation_id).await?; + let conversation = self.conversations.get(conversation_id).await?; let own_did = &*self.did; let event = MessagingEvents::Event { @@ -3542,7 +3095,7 @@ impl MessageStore { conversation_id: Uuid, event: MessageEvent, ) -> Result<(), Error> { - let conversation = self.get_conversation(conversation_id).await?; + let conversation = self.conversations.get(conversation_id).await?; let own_did = &*self.did; let event = MessagingEvents::Event { @@ -3559,7 +3112,7 @@ impl MessageStore { conversation_id: Uuid, event: MessagingEvents, ) -> Result<(), Error> { - let conversation = self.get_conversation(conversation_id).await?; + let conversation = self.conversations.get(conversation_id).await?; let own_did = &*self.did; @@ -3612,7 +3165,7 @@ impl MessageStore { event: MessagingEvents, queue: bool, ) -> Result<(), Error> { - let conversation = self.get_conversation(conversation).await?; + let conversation = self.conversations.get(conversation).await?; let own_did = &*self.did;