diff --git a/extensions/warp-ipfs/src/lib.rs b/extensions/warp-ipfs/src/lib.rs index fee90c845..1f803cbf9 100644 --- a/extensions/warp-ipfs/src/lib.rs +++ b/extensions/warp-ipfs/src/lib.rs @@ -1244,19 +1244,8 @@ impl Friends for WarpIpfs { #[async_trait::async_trait] impl FriendsEvent for WarpIpfs { async fn subscribe(&mut self) -> Result { - let mut rx = self.multipass_tx.subscribe(); - - let stream = async_stream::stream! { - loop { - match rx.recv().await { - Ok(event) => yield event, - Err(broadcast::error::RecvError::Closed) => break, - Err(_) => {} - }; - } - }; - - Ok(MultiPassEventStream(Box::pin(stream))) + let store = self.identity_store(true).await?; + Ok(MultiPassEventStream(store.subscribe())) } } diff --git a/extensions/warp-ipfs/src/store/identity.rs b/extensions/warp-ipfs/src/store/identity.rs index 8a7be2dea..b689f0ec5 100644 --- a/extensions/warp-ipfs/src/store/identity.rs +++ b/extensions/warp-ipfs/src/store/identity.rs @@ -2398,6 +2398,23 @@ impl IdentityStore { pub async fn is_friend(&self, pubkey: &DID) -> Result { self.friends_list().await.map(|list| list.contains(pubkey)) } + + #[tracing::instrument(skip(self))] + pub fn subscribe(&self) -> futures::stream::BoxStream<'static, MultiPassEventKind> { + let mut rx = self.event.subscribe(); + + let stream = async_stream::stream! { + loop { + match rx.recv().await { + Ok(event) => yield event, + Err(broadcast::error::RecvError::Closed) => break, + Err(_) => {} + }; + } + }; + + stream.boxed() + } } impl IdentityStore { diff --git a/extensions/warp-ipfs/src/store/message.rs b/extensions/warp-ipfs/src/store/message.rs index fc14e0602..dc4a34d70 100644 --- a/extensions/warp-ipfs/src/store/message.rs +++ b/extensions/warp-ipfs/src/store/message.rs @@ -22,6 +22,7 @@ use warp::crypto::{generate, DID}; use warp::error::Error; use warp::logging::tracing::log::{debug, error, info, trace}; use warp::logging::tracing::warn; +use warp::multipass::MultiPassEventKind; use warp::raygun::{ AttachmentEventStream, AttachmentKind, Conversation, ConversationType, EmbedState, Location, Message, MessageEvent, MessageEventKind, MessageOptions, MessageStatus, MessageStream, @@ -33,7 +34,8 @@ use crate::spam_filter::SpamFilter; use crate::store::payload::Payload; use crate::store::{ connected_to_peer, ecdh_decrypt, ecdh_encrypt, get_keypair_did, sign_serde, - ConversationRequestKind, ConversationRequestResponse, ConversationResponseKind, PeerTopic, + ConversationRequestKind, ConversationRequestResponse, ConversationResponseKind, DidExt, + PeerTopic, }; use super::conversation::{ConversationDocument, MessageDocument}; @@ -41,7 +43,7 @@ use super::discovery::Discovery; use super::document::conversation::Conversations; use super::identity::IdentityStore; use super::keystore::Keystore; -use super::{did_to_libp2p_pub, verify_serde_sig, ConversationEvents, MessagingEvents}; +use super::{verify_serde_sig, ConversationEvents, MessagingEvents}; type ConversationSender = UnboundedSender<(MessagingEvents, Option>>)>; @@ -163,6 +165,8 @@ impl MessageStore { let mut interval = tokio::time::interval(Duration::from_millis(interval_ms)); + let mut identity_stream = store.identity.subscribe(); + loop { tokio::select! { Some(message) = stream.next() => { @@ -195,6 +199,11 @@ impl MessageStore { } } + Some(id_event) = identity_stream.next() => { + if let Err(e) = store.process_identity_events(id_event).await { + warn!("Failed to process event: {e}"); + } + } _ = interval.tick() => { if let Err(e) = store.process_queue().await { error!("Error processing queue: {e}"); @@ -209,6 +218,81 @@ impl MessageStore { Ok(store) } + async fn process_identity_events(&mut self, event: MultiPassEventKind) -> Result<(), Error> { + match event { + MultiPassEventKind::FriendAdded { did } => { + if !self.with_friends.load(Ordering::SeqCst) { + return Ok(()); + } + + match self.create_conversation(&did).await { + Ok(_) | Err(Error::ConversationExist { .. }) => return Ok(()), + Err(e) => return Err(e), + } + } + + MultiPassEventKind::Blocked { did } | MultiPassEventKind::BlockedBy { did } => { + let list = self.conversations.list().await?; + + for conversation in list.iter().filter(|c| c.recipients().contains(&did)) { + let id = conversation.id(); + match conversation.conversation_type { + ConversationType::Direct => { + if let Err(e) = self.delete_conversation(id, true).await { + warn!("Failed to delete conversation {id}: {e}"); + continue; + } + } + ConversationType::Group => { + if conversation.creator != Some((*self.did).clone()) { + continue; + } + + if let Err(e) = self.remove_recipient(id, &did, true).await { + warn!("Failed to remove {did} from conversation {id}: {e}"); + continue; + } + } + } + } + } + MultiPassEventKind::FriendRemoved { did } => { + if !self.with_friends.load(Ordering::SeqCst) { + return Ok(()); + } + + let list = self.conversations.list().await?; + + for conversation in list.iter().filter(|c| c.recipients().contains(&did)) { + let id = conversation.id(); + match conversation.conversation_type { + ConversationType::Direct => { + if let Err(e) = self.delete_conversation(id, true).await { + warn!("Failed to delete conversation {id}: {e}"); + continue; + } + } + ConversationType::Group => { + if conversation.creator != Some((*self.did).clone()) { + continue; + } + + if let Err(e) = self.remove_recipient(id, &did, true).await { + warn!("Failed to remove {did} from conversation {id}: {e}"); + continue; + } + } + } + } + } + MultiPassEventKind::IdentityOnline { .. } => { + //TODO: Check queue and process any entry once peer is subscribed to the respective topics. + } + _ => {} + } + Ok(()) + } + async fn start_event_task(&self, conversation_id: Uuid) { info!("Event Task started for {conversation_id}"); @@ -466,8 +550,8 @@ impl MessageStore { .ipfs .pubsub_peers(Some(topic.clone())) .await?; - let peer_id = did_to_libp2p_pub(&sender) - .map(|pk| pk.to_peer_id())?; + + let peer_id = sender.to_peer_id()?; let bytes = payload.to_bytes()?; @@ -628,7 +712,7 @@ impl MessageStore { let topic = conversation.reqres_topic(did); let peers = self.ipfs.pubsub_peers(Some(topic.clone())).await?; - let peer_id = did_to_libp2p_pub(did).map(|pk| pk.to_peer_id())?; + let peer_id = did.to_peer_id()?; if !peers.contains(&peer_id) || (peers.contains(&peer_id) @@ -715,7 +799,6 @@ impl MessageStore { Cipher::direct_decrypt(data.data(), &key) } }; - drop(conversation); let bytes = match bytes_results { Ok(b) => b, @@ -1656,7 +1739,7 @@ impl MessageStore { self.start_task(convo_id, stream).await; - let peer_id = did_to_libp2p_pub(did_key)?.to_peer_id(); + let peer_id = did_key.to_peer_id()?; let event = ConversationEvents::NewConversation { recipient: own_did.clone(), @@ -1781,8 +1864,7 @@ impl MessageStore { .iter() .filter(|did| own_did.ne(did)) .map(|did| (did.clone(), did)) - .filter_map(|(a, b)| did_to_libp2p_pub(b).map(|pk| (a, pk)).ok()) - .map(|(did, pk)| (did, pk.to_peer_id())) + .filter_map(|(a, b)| b.to_peer_id().map(|pk| (a, pk)).ok()) .collect::>(); let conversation = self.conversations.get(convo_id).await?; @@ -1891,8 +1973,8 @@ impl MessageStore { .iter() .filter(|did| own_did.ne(did)) .map(|did| (did.clone(), did)) - .filter_map(|(a, b)| did_to_libp2p_pub(b).map(|pk| (a, pk)).ok()) - .map(|(did, pk)| (did, pk.to_peer_id())) + .filter_map(|(a, b)| b.to_peer_id().map(|pk| (a, pk)).ok()) + .map(|(did, pk)| (did, pk)) .collect::>(); let event = serde_json::to_vec(&ConversationEvents::DeleteConversation { @@ -2051,7 +2133,7 @@ impl MessageStore { let payload = Payload::new(own_did, &bytes, &signature); - let peer_id = did_to_libp2p_pub(did_key)?.to_peer_id(); + let peer_id = did_key.to_peer_id()?; let peers = self.ipfs.pubsub_peers(Some(did_key.messaging())).await?; let mut time = true; @@ -3238,7 +3320,7 @@ impl MessageStore { .iter() .filter(|did| own_did.ne(did)) { - let peer_id = did_to_libp2p_pub(recipient)?.to_peer_id(); + let peer_id = recipient.to_peer_id()?; // We want to confirm that there is atleast one peer subscribed before attempting to send a message match peers.contains(&peer_id) { diff --git a/extensions/warp-ipfs/tests/direct.rs b/extensions/warp-ipfs/tests/direct.rs index f1c612479..3721a4f46 100644 --- a/extensions/warp-ipfs/tests/direct.rs +++ b/extensions/warp-ipfs/tests/direct.rs @@ -4,8 +4,12 @@ mod common; mod test { use futures::StreamExt; use std::time::Duration; - use warp::raygun::{ - ConversationType, MessageEvent, MessageEventKind, PinState, RayGunEventKind, ReactionState, + use warp::{ + multipass::MultiPassEventKind, + raygun::{ + ConversationType, MessageEvent, MessageEventKind, PinState, RayGunEventKind, + ReactionState, + }, }; use crate::common::create_accounts_and_chat; @@ -808,4 +812,110 @@ mod test { Ok(()) } + + #[tokio::test] + async fn delete_conversation_when_blocked() -> anyhow::Result<()> { + let accounts = create_accounts_and_chat(vec![ + ( + None, + None, + Some("test::delete_conversation_when_blocked".into()), + ), + ( + None, + None, + Some("test::delete_conversation_when_blocked".into()), + ), + ]) + .await?; + + let (mut _account_a, mut chat_a, did_a, _) = accounts.first().cloned().unwrap(); + let (mut _account_b, mut chat_b, did_b, _) = accounts.last().cloned().unwrap(); + + let mut account_subscribe_a = _account_a.subscribe().await?; + let mut account_subscribe_b = _account_b.subscribe().await?; + + let mut chat_subscribe_a = chat_a.subscribe().await?; + let mut chat_subscribe_b = chat_b.subscribe().await?; + + chat_a.create_conversation(&did_b).await?; + + let id_a = tokio::time::timeout(Duration::from_secs(60), async { + loop { + if let Some(RayGunEventKind::ConversationCreated { conversation_id }) = + chat_subscribe_a.next().await + { + break conversation_id; + } + } + }) + .await?; + + let id_b = tokio::time::timeout(Duration::from_secs(60), async { + loop { + if let Some(RayGunEventKind::ConversationCreated { conversation_id }) = + chat_subscribe_b.next().await + { + break conversation_id; + } + } + }) + .await?; + + assert_eq!(id_a, id_b); + + let conversation = chat_a.get_conversation(id_a).await?; + assert_eq!(conversation.conversation_type(), ConversationType::Direct); + assert_eq!(conversation.recipients().len(), 2); + assert!(conversation.recipients().contains(&did_a)); + assert!(conversation.recipients().contains(&did_b)); + + _account_a.block(&did_b).await?; + + tokio::time::timeout(Duration::from_secs(60), async { + loop { + if let Some(MultiPassEventKind::Blocked { did }) = account_subscribe_a.next().await + { + assert_eq!(did, did_b); + break; + } + } + }) + .await?; + + tokio::time::timeout(Duration::from_secs(60), async { + loop { + if let Some(RayGunEventKind::ConversationDeleted { .. }) = + chat_subscribe_a.next().await + { + break; + } + } + }) + .await?; + + tokio::time::timeout(Duration::from_secs(60), async { + loop { + if let Some(MultiPassEventKind::BlockedBy { did }) = + account_subscribe_b.next().await + { + assert_eq!(did, did_a); + break; + } + } + }) + .await?; + + tokio::time::timeout(Duration::from_secs(60), async { + loop { + if let Some(RayGunEventKind::ConversationDeleted { .. }) = + chat_subscribe_b.next().await + { + break; + } + } + }) + .await?; + Ok(()) + } } diff --git a/extensions/warp-ipfs/tests/group.rs b/extensions/warp-ipfs/tests/group.rs index 5613f303b..d7124de82 100644 --- a/extensions/warp-ipfs/tests/group.rs +++ b/extensions/warp-ipfs/tests/group.rs @@ -5,7 +5,10 @@ mod test { use crate::common::create_accounts_and_chat; use futures::StreamExt; - use warp::raygun::{ConversationType, MessageEventKind, RayGunEventKind}; + use warp::{ + multipass::MultiPassEventKind, + raygun::{ConversationType, MessageEventKind, RayGunEventKind}, + }; #[tokio::test] async fn create_empty_group_conversation() -> anyhow::Result<()> { @@ -683,4 +686,308 @@ mod test { assert_eq!(message_c, message_d); Ok(()) } + + #[tokio::test] + async fn remove_recipient_from_conversation_when_blocked() -> anyhow::Result<()> { + let accounts = create_accounts_and_chat(vec![ + ( + None, + None, + Some("test::remove_recipient_from_conversation_when_blocked".into()), + ), + ( + None, + None, + Some("test::remove_recipient_from_conversation_when_blocked".into()), + ), + ( + None, + None, + Some("test::remove_recipient_from_conversation_when_blocked".into()), + ), + ]) + .await?; + + let (mut _account_a, mut chat_a, did_a, _) = accounts[0].clone(); + let (_account_b, mut chat_b, did_b, _) = accounts[1].clone(); + let (mut _account_c, mut chat_c, did_c, _) = accounts[2].clone(); + + let mut account_subscribe_a = _account_a.subscribe().await?; + let mut account_subscribe_c = _account_c.subscribe().await?; + + let mut chat_subscribe_a = chat_a.subscribe().await?; + let mut chat_subscribe_b = chat_b.subscribe().await?; + let mut chat_subscribe_c = chat_c.subscribe().await?; + + chat_a + .create_group_conversation(None, vec![did_b.clone(), did_c.clone()]) + .await?; + + let id_a = tokio::time::timeout(Duration::from_secs(60), async { + loop { + if let Some(RayGunEventKind::ConversationCreated { conversation_id }) = + chat_subscribe_a.next().await + { + break conversation_id; + } + } + }) + .await?; + + let id_b = tokio::time::timeout(Duration::from_secs(60), async { + loop { + if let Some(RayGunEventKind::ConversationCreated { conversation_id }) = + chat_subscribe_b.next().await + { + break conversation_id; + } + } + }) + .await?; + + let id_c = tokio::time::timeout(Duration::from_secs(60), async { + loop { + if let Some(RayGunEventKind::ConversationCreated { conversation_id }) = + chat_subscribe_c.next().await + { + break conversation_id; + } + } + }) + .await?; + + assert_eq!(id_a, id_b); + assert_eq!(id_b, id_c); + + let conversation = chat_a.get_conversation(id_a).await?; + assert_eq!(conversation.conversation_type(), ConversationType::Group); + assert_eq!(conversation.recipients().len(), 3); + assert!(conversation.recipients().contains(&did_a)); + assert!(conversation.recipients().contains(&did_b)); + assert!(conversation.recipients().contains(&did_c)); + + + let mut conversation_a = chat_a.get_conversation_stream(id_a).await?; + let mut conversation_b = chat_b.get_conversation_stream(id_b).await?; + + _account_a.block(&did_c).await?; + + tokio::time::timeout(Duration::from_secs(60), async { + loop { + if let Some(MultiPassEventKind::Blocked { did }) = account_subscribe_a.next().await + { + assert_eq!(did, did_c); + break; + } + } + }) + .await?; + + tokio::time::timeout(Duration::from_secs(60), async { + loop { + if let Some(MessageEventKind::RecipientRemoved { + conversation_id, + recipient, + }) = conversation_a.next().await + { + assert_eq!(conversation_id, id_a); + assert_eq!(recipient, did_c); + break; + } + } + }) + .await?; + + tokio::time::timeout(Duration::from_secs(60), async { + loop { + if let Some(MessageEventKind::RecipientRemoved { + conversation_id, + recipient, + }) = conversation_b.next().await + { + assert_eq!(conversation_id, id_a); + assert_eq!(recipient, did_c); + break; + } + } + }) + .await?; + + tokio::time::timeout(Duration::from_secs(60), async { + loop { + if let Some(MultiPassEventKind::BlockedBy { did }) = + account_subscribe_c.next().await + { + assert_eq!(did, did_a); + break; + } + } + }) + .await?; + + tokio::time::timeout(Duration::from_secs(60), async { + loop { + if let Some(RayGunEventKind::ConversationDeleted { .. }) = + chat_subscribe_c.next().await + { + break; + } + } + }) + .await?; + + Ok(()) + } + + #[tokio::test] + async fn delete_group_conversation_when_blocking_creator() -> anyhow::Result<()> { + let accounts = create_accounts_and_chat(vec![ + ( + None, + None, + Some("test::delete_group_conversation_when_blocking_creator".into()), + ), + ( + None, + None, + Some("test::delete_group_conversation_when_blocking_creator".into()), + ), + ( + None, + None, + Some("test::delete_group_conversation_when_blocking_creator".into()), + ), + ]) + .await?; + + let (mut _account_a, mut chat_a, did_a, _) = accounts[0].clone(); + let (_account_b, mut chat_b, did_b, _) = accounts[1].clone(); + let (mut _account_c, mut chat_c, did_c, _) = accounts[2].clone(); + + let mut account_subscribe_a = _account_a.subscribe().await?; + let mut account_subscribe_c = _account_c.subscribe().await?; + + let mut chat_subscribe_a = chat_a.subscribe().await?; + let mut chat_subscribe_b = chat_b.subscribe().await?; + let mut chat_subscribe_c = chat_c.subscribe().await?; + + chat_a + .create_group_conversation(None, vec![did_b.clone(), did_c.clone()]) + .await?; + + let id_a = tokio::time::timeout(Duration::from_secs(60), async { + loop { + if let Some(RayGunEventKind::ConversationCreated { conversation_id }) = + chat_subscribe_a.next().await + { + break conversation_id; + } + } + }) + .await?; + + let id_b = tokio::time::timeout(Duration::from_secs(60), async { + loop { + if let Some(RayGunEventKind::ConversationCreated { conversation_id }) = + chat_subscribe_b.next().await + { + break conversation_id; + } + } + }) + .await?; + + let id_c = tokio::time::timeout(Duration::from_secs(60), async { + loop { + if let Some(RayGunEventKind::ConversationCreated { conversation_id }) = + chat_subscribe_c.next().await + { + break conversation_id; + } + } + }) + .await?; + + assert_eq!(id_a, id_b); + assert_eq!(id_b, id_c); + + let conversation = chat_a.get_conversation(id_a).await?; + assert_eq!(conversation.conversation_type(), ConversationType::Group); + assert_eq!(conversation.recipients().len(), 3); + assert!(conversation.recipients().contains(&did_a)); + assert!(conversation.recipients().contains(&did_b)); + assert!(conversation.recipients().contains(&did_c)); + + + let mut conversation_a = chat_a.get_conversation_stream(id_a).await?; + let mut conversation_b = chat_b.get_conversation_stream(id_b).await?; + + _account_c.block(&did_a).await?; + + tokio::time::timeout(Duration::from_secs(60), async { + loop { + if let Some(MultiPassEventKind::BlockedBy { did }) = account_subscribe_a.next().await + { + assert_eq!(did, did_c); + break; + } + } + }) + .await?; + + tokio::time::timeout(Duration::from_secs(60), async { + loop { + if let Some(MessageEventKind::RecipientRemoved { + conversation_id, + recipient, + }) = conversation_a.next().await + { + assert_eq!(conversation_id, id_a); + assert_eq!(recipient, did_c); + break; + } + } + }) + .await?; + + tokio::time::timeout(Duration::from_secs(60), async { + loop { + if let Some(MessageEventKind::RecipientRemoved { + conversation_id, + recipient, + }) = conversation_b.next().await + { + assert_eq!(conversation_id, id_a); + assert_eq!(recipient, did_c); + break; + } + } + }) + .await?; + + tokio::time::timeout(Duration::from_secs(60), async { + loop { + if let Some(MultiPassEventKind::Blocked { did }) = + account_subscribe_c.next().await + { + assert_eq!(did, did_a); + break; + } + } + }) + .await?; + + tokio::time::timeout(Duration::from_secs(60), async { + loop { + if let Some(RayGunEventKind::ConversationDeleted { .. }) = + chat_subscribe_c.next().await + { + break; + } + } + }) + .await?; + + Ok(()) + } }