From f7992d771461b744be86301211bc56b73d87847d Mon Sep 17 00:00:00 2001 From: ashneverdawn <8341280+ashneverdawn@users.noreply.github.com> Date: Tue, 3 Dec 2024 13:37:20 -0400 Subject: [PATCH] Feat: community message attachments (#640) --- .../src/store/message/community_task.rs | 145 ++++- extensions/warp-ipfs/tests/community.rs | 599 +++++++++++++++++- warp/src/raygun/community.rs | 1 + 3 files changed, 718 insertions(+), 27 deletions(-) diff --git a/extensions/warp-ipfs/src/store/message/community_task.rs b/extensions/warp-ipfs/src/store/message/community_task.rs index cf13174ed..c3773a6df 100644 --- a/extensions/warp-ipfs/src/store/message/community_task.rs +++ b/extensions/warp-ipfs/src/store/message/community_task.rs @@ -30,8 +30,8 @@ use warp::raygun::community::{ }; use warp::raygun::{ AttachmentEventStream, ConversationImage, Location, MessageEvent, MessageOptions, - MessageReference, MessageStatus, Messages, MessagesType, PinState, RayGunEventKind, - ReactionState, + MessageReference, MessageStatus, MessageType, Messages, MessagesType, PinState, + RayGunEventKind, ReactionState, }; use warp::{ crypto::{cipher::Cipher, generate}, @@ -68,6 +68,8 @@ use crate::{ }, }; +use super::attachment::AttachmentStream; + type AttachmentOneshot = (MessageDocument, oneshot::Sender>); #[allow(dead_code)] @@ -321,7 +323,7 @@ pub struct CommunityTask { community_id: Uuid, ipfs: Ipfs, root: RootDocumentMap, - _file: FileStore, + file: FileStore, identity: IdentityStore, discovery: Discovery, pending_key_exchange: IndexMap, bool)>>, @@ -332,7 +334,7 @@ pub struct CommunityTask { event_stream: SubscriptionStream, request_stream: SubscriptionStream, - _attachment_tx: futures::channel::mpsc::Sender, + attachment_tx: futures::channel::mpsc::Sender, attachment_rx: futures::channel::mpsc::Receiver, message_command: futures::channel::mpsc::Sender, event_broadcast: tokio::sync::broadcast::Sender, @@ -403,7 +405,7 @@ impl CommunityTask { community_id, ipfs: ipfs.clone(), root: root.clone(), - _file: file.clone(), + file: file.clone(), identity: identity.clone(), discovery: discovery.clone(), pending_key_exchange: Default::default(), @@ -414,7 +416,7 @@ impl CommunityTask { request_stream, event_stream, - _attachment_tx: atx, + attachment_tx: atx, attachment_rx: arx, event_broadcast: btx, _event_subscription, @@ -3304,29 +3306,132 @@ impl CommunityTask { } pub async fn attach_to_community_channel_message( &mut self, - _channel_id: Uuid, - _message_id: Option, - _locations: Vec, - _message: Vec, + channel_id: Uuid, + message_id: Option, + locations: Vec, + messages: Vec, ) -> Result<(Uuid, AttachmentEventStream), Error> { - Err(Error::Unimplemented) + let own_did = &self.identity.did_key(); + if !self.document.has_channel_permission( + own_did, + &CommunityChannelPermission::SendMessages, + channel_id, + ) { + return Err(Error::Unauthorized); + } + if !self.document.has_channel_permission( + own_did, + &CommunityChannelPermission::SendAttachments, + channel_id, + ) { + return Err(Error::Unauthorized); + } + + let keystore = pubkey_or_keystore(&*self)?; + + let stream = AttachmentStream::new( + &self.ipfs, + self.root.keypair(), + &self.identity.did_key(), + &self.file, + channel_id, + keystore, + self.attachment_tx.clone(), + ) + .set_reply(message_id) + .set_locations(locations)? + .set_lines(messages)?; + + let message_id = stream.message_id(); + + Ok((message_id, stream.boxed())) } pub async fn download_from_community_channel_message( &self, - _channel_id: Uuid, - _message_id: Uuid, - _file: String, - _path: PathBuf, + channel_id: Uuid, + message_id: Uuid, + file: String, + path: PathBuf, ) -> Result { - Err(Error::Unimplemented) + let own_did = &self.identity.did_key(); + if !self.document.has_channel_permission( + own_did, + &CommunityChannelPermission::ViewChannel, + channel_id, + ) { + return Err(Error::Unauthorized); + } + + let channel = match self.document.channels.get(&channel_id.to_string()) { + Some(c) => c, + None => return Err(Error::CommunityChannelDoesntExist), + }; + + let members = self + .document + .participants() + .iter() + .filter_map(|did| did.to_peer_id().ok()) + .collect::>(); + + let message = channel.get_message_document(&self.ipfs, message_id).await?; + + if message.message_type != MessageType::Attachment { + return Err(Error::InvalidMessage); + } + + let attachment = message + .attachments() + .iter() + .find(|attachment| attachment.name == file) + .ok_or(Error::FileNotFound)?; + + let stream = attachment.download(&self.ipfs, path, &members, None); + + Ok(stream) } pub async fn download_stream_from_community_channel_message( &self, - _channel_id: Uuid, - _message_id: Uuid, - _file: String, + channel_id: Uuid, + message_id: Uuid, + file: String, ) -> Result>, Error> { - Err(Error::Unimplemented) + let own_did = &self.identity.did_key(); + if !self.document.has_channel_permission( + own_did, + &CommunityChannelPermission::ViewChannel, + channel_id, + ) { + return Err(Error::Unauthorized); + } + + let channel = match self.document.channels.get(&channel_id.to_string()) { + Some(c) => c, + None => return Err(Error::CommunityChannelDoesntExist), + }; + + let members = self + .document + .participants() + .iter() + .filter_map(|did| did.to_peer_id().ok()) + .collect::>(); + + let message = channel.get_message_document(&self.ipfs, message_id).await?; + + if message.message_type != MessageType::Attachment { + return Err(Error::InvalidMessage); + } + + let attachment = message + .attachments() + .iter() + .find(|attachment| attachment.name == file) + .ok_or(Error::FileNotFound)?; + + let stream = attachment.download_stream(&self.ipfs, &members, None); + + Ok(stream) } async fn store_direct_for_attachment(&mut self, message: MessageDocument) -> Result<(), Error> { diff --git a/extensions/warp-ipfs/tests/community.rs b/extensions/warp-ipfs/tests/community.rs index cd6abd0af..93d511765 100644 --- a/extensions/warp-ipfs/tests/community.rs +++ b/extensions/warp-ipfs/tests/community.rs @@ -2,15 +2,18 @@ mod common; #[cfg(test)] mod test { use futures::{stream::FuturesUnordered, FutureExt, Stream, StreamExt, TryStreamExt}; - use std::time::Duration; + use std::{path::PathBuf, time::Duration}; use uuid::Uuid; - use warp::raygun::{ - community::{ - Community, CommunityChannelPermission, CommunityChannelType, CommunityInvite, - CommunityPermission, RayGunCommunity, + use warp::{ + constellation::{Constellation, Progression}, + raygun::{ + community::{ + Community, CommunityChannelPermission, CommunityChannelType, CommunityInvite, + CommunityPermission, RayGunCommunity, + }, + Location, Message, MessageEvent, MessageEventKind, MessageEventStream, MessageOptions, + MessageReference, MessageStatus, Messages, RayGunEventKind, RayGunStream, }, - Message, MessageEvent, MessageEventKind, MessageEventStream, MessageOptions, - MessageReference, MessageStatus, Messages, RayGunEventKind, RayGunStream, }; #[cfg(target_arch = "wasm32")] @@ -4310,4 +4313,586 @@ mod test { .await?; Ok(()) } + #[async_test] + async fn unauthorized_attach_to_community_channel_message() -> anyhow::Result<()> { + let context = Some("test::unauthorized_attach_to_community_channel_message".into()); + let acc = (None, None, context); + let accounts = create_accounts(vec![acc.clone(), acc]).await?; + let (instance_a, _, _) = &mut accounts[0].clone(); + let (instance_b, did_b, _) = &mut accounts[1].clone(); + + let community = instance_a.create_community("Community0").await?; + let channel = instance_a + .create_community_channel(community.id(), "Channel0", CommunityChannelType::Standard) + .await?; + + let mut rg_stream_b = instance_b.raygun_subscribe().await?; + let invite = instance_a + .create_community_invite(community.id(), Some(did_b.clone()), None) + .await?; + assert_eq!( + next_event(&mut rg_stream_b, Duration::from_secs(60)).await?, + RayGunEventKind::CommunityInvited { + community_id: community.id(), + invite_id: invite.id() + } + ); + + let mut stream_a = instance_a.get_community_stream(community.id()).await?; + instance_b + .accept_community_invite(community.id(), invite.id()) + .await?; + assert_eq!( + next_event(&mut stream_a, Duration::from_secs(60)).await?, + MessageEventKind::AcceptedCommunityInvite { + community_id: community.id(), + invite_id: invite.id(), + user: did_b.clone() + } + ); + + let mut stream_b = instance_b.get_community_stream(community.id()).await?; + instance_a + .revoke_community_channel_permission_for_all( + community.id(), + channel.id(), + CommunityChannelPermission::SendAttachments, + ) + .await?; + assert_next_msg_event( + vec![&mut stream_a, &mut stream_b], + Duration::from_secs(60), + MessageEventKind::RevokedCommunityChannelPermissionForAll { + community_id: community.id(), + channel_id: channel.id(), + permission: CommunityChannelPermission::SendAttachments, + }, + ) + .await?; + + let file_name = "my_file.txt"; + let file = b"Thanks for reading my file"; + instance_b.put_buffer(file_name, file).await?; + let locations = vec![Location::Constellation { + path: file_name.to_string(), + }]; + let message = vec!["here is my file".to_string()]; + let result = instance_b + .attach_to_community_channel_message( + community.id(), + channel.id(), + None, + locations, + message, + ) + .await; + + match result { + Ok(_) => panic!("should not have succeeded"), + Err(e) => { + assert_eq!(format!("{:?}", e), format!("{:?}", Error::Unauthorized)); + } + } + Ok(()) + } + #[async_test] + async fn authorized_attach_to_community_channel_message() -> anyhow::Result<()> { + let context = Some("test::authorized_attach_to_community_channel_message".into()); + let acc = (None, None, context); + let accounts = create_accounts(vec![acc.clone(), acc]).await?; + let (instance_a, _, _) = &mut accounts[0].clone(); + let (instance_b, did_b, _) = &mut accounts[1].clone(); + + let community = instance_a.create_community("Community0").await?; + let channel = instance_a + .create_community_channel(community.id(), "Channel0", CommunityChannelType::Standard) + .await?; + + let mut rg_stream_b = instance_b.raygun_subscribe().await?; + let invite = instance_a + .create_community_invite(community.id(), Some(did_b.clone()), None) + .await?; + assert_eq!( + next_event(&mut rg_stream_b, Duration::from_secs(60)).await?, + RayGunEventKind::CommunityInvited { + community_id: community.id(), + invite_id: invite.id() + } + ); + + let mut stream_a = instance_a.get_community_stream(community.id()).await?; + instance_b + .accept_community_invite(community.id(), invite.id()) + .await?; + assert_eq!( + next_event(&mut stream_a, Duration::from_secs(60)).await?, + MessageEventKind::AcceptedCommunityInvite { + community_id: community.id(), + invite_id: invite.id(), + user: did_b.clone() + } + ); + + let file_name = "my_file.txt"; + let file = b"Thanks for reading my file"; + instance_b.put_buffer(file_name, file).await?; + let locations = vec![Location::Constellation { + path: file_name.to_string(), + }]; + let message = vec!["here is my file".to_string()]; + let _ = instance_b + .attach_to_community_channel_message( + community.id(), + channel.id(), + None, + locations, + message, + ) + .await?; + Ok(()) + } + #[async_test] + async fn unauthorized_download_from_community_channel_message() -> anyhow::Result<()> { + let context = Some("test::unauthorized_download_from_community_channel_message".into()); + let acc = (None, None, context); + let accounts = create_accounts(vec![acc.clone(), acc]).await?; + let (instance_a, _, _) = &mut accounts[0].clone(); + let (instance_b, did_b, _) = &mut accounts[1].clone(); + + let community = instance_a.create_community("Community0").await?; + let channel = instance_a + .create_community_channel(community.id(), "Channel0", CommunityChannelType::Standard) + .await?; + + let mut rg_stream_b = instance_b.raygun_subscribe().await?; + let invite = instance_a + .create_community_invite(community.id(), Some(did_b.clone()), None) + .await?; + assert_eq!( + next_event(&mut rg_stream_b, Duration::from_secs(60)).await?, + RayGunEventKind::CommunityInvited { + community_id: community.id(), + invite_id: invite.id() + } + ); + + let mut stream_a = instance_a.get_community_stream(community.id()).await?; + instance_b + .accept_community_invite(community.id(), invite.id()) + .await?; + assert_eq!( + next_event(&mut stream_a, Duration::from_secs(60)).await?, + MessageEventKind::AcceptedCommunityInvite { + community_id: community.id(), + invite_id: invite.id(), + user: did_b.clone() + } + ); + + let mut stream_b = instance_b.get_community_stream(community.id()).await?; + let file_name = "my_file.txt"; + let file = b"Thanks for reading my file"; + instance_b.put_buffer(file_name, file).await?; + let locations = vec![Location::Constellation { + path: file_name.to_string(), + }]; + let message = vec!["here is my file".to_string()]; + let (message_id, mut attachment_event_stream) = instance_b + .attach_to_community_channel_message( + community.id(), + channel.id(), + None, + locations, + message, + ) + .await?; + loop { + if attachment_event_stream.next().await.is_none() { + break; + } + } + assert_eq!( + next_event(&mut stream_b, Duration::from_secs(60)).await?, + MessageEventKind::CommunityMessageSent { + community_id: community.id(), + channel_id: channel.id(), + message_id, + } + ); + assert_eq!( + next_event(&mut stream_a, Duration::from_secs(60)).await?, + MessageEventKind::CommunityMessageReceived { + community_id: community.id(), + channel_id: channel.id(), + message_id + } + ); + + instance_a + .revoke_community_channel_permission_for_all( + community.id(), + channel.id(), + CommunityChannelPermission::ViewChannel, + ) + .await?; + assert_next_msg_event( + vec![&mut stream_a, &mut stream_b], + Duration::from_secs(60), + MessageEventKind::RevokedCommunityChannelPermissionForAll { + community_id: community.id(), + channel_id: channel.id(), + permission: CommunityChannelPermission::ViewChannel, + }, + ) + .await?; + + let result = instance_b + .download_from_community_channel_message( + community.id(), + channel.id(), + message_id, + file_name.to_string(), + PathBuf::new(), + ) + .await; + match result { + Ok(_) => panic!("should not have succeeded"), + Err(e) => assert_eq!(format!("{:?}", e), format!("{:?}", Error::Unauthorized)), + } + Ok(()) + } + #[async_test] + async fn authorized_download_from_community_channel_message() -> anyhow::Result<()> { + let context = Some("test::authorized_download_from_community_channel_message".into()); + let acc = (None, None, context); + let accounts = create_accounts(vec![acc.clone(), acc]).await?; + let (instance_a, _, _) = &mut accounts[0].clone(); + let (instance_b, did_b, _) = &mut accounts[1].clone(); + + let community = instance_a.create_community("Community0").await?; + let channel = instance_a + .create_community_channel(community.id(), "Channel0", CommunityChannelType::Standard) + .await?; + + let mut rg_stream_b = instance_b.raygun_subscribe().await?; + let invite = instance_a + .create_community_invite(community.id(), Some(did_b.clone()), None) + .await?; + assert_eq!( + next_event(&mut rg_stream_b, Duration::from_secs(60)).await?, + RayGunEventKind::CommunityInvited { + community_id: community.id(), + invite_id: invite.id() + } + ); + + let mut stream_a = instance_a.get_community_stream(community.id()).await?; + instance_b + .accept_community_invite(community.id(), invite.id()) + .await?; + assert_eq!( + next_event(&mut stream_a, Duration::from_secs(60)).await?, + MessageEventKind::AcceptedCommunityInvite { + community_id: community.id(), + invite_id: invite.id(), + user: did_b.clone() + } + ); + + let mut stream_b = instance_b.get_community_stream(community.id()).await?; + let file_name = "my_file.txt"; + let file = b"Thanks for reading my file"; + instance_b.put_buffer(file_name, file).await?; + let locations = vec![Location::Constellation { + path: file_name.to_string(), + }]; + let message = vec!["here is my file".to_string()]; + let (message_id, mut attachment_event_stream) = instance_b + .attach_to_community_channel_message( + community.id(), + channel.id(), + None, + locations, + message.clone(), + ) + .await?; + loop { + if attachment_event_stream.next().await.is_none() { + break; + } + } + assert_eq!( + next_event(&mut stream_b, Duration::from_secs(60)).await?, + MessageEventKind::CommunityMessageSent { + community_id: community.id(), + channel_id: channel.id(), + message_id, + } + ); + assert_eq!( + next_event(&mut stream_a, Duration::from_secs(60)).await?, + MessageEventKind::CommunityMessageReceived { + community_id: community.id(), + channel_id: channel.id(), + message_id + } + ); + let path = PathBuf::from("test::authorized_download_from_community_channel_message"); + let mut constellation_progress_stream = instance_b + .download_from_community_channel_message( + community.id(), + channel.id(), + message_id, + file_name.to_string(), + path.clone(), + ) + .await?; + loop { + if let Some(progress) = constellation_progress_stream.next().await { + match progress { + Progression::ProgressComplete { name: _, total } => { + assert_eq!(total, Some(file.len())); + break; + } + Progression::ProgressFailed { + name: _, + last_size: _, + error, + } => panic!("progress shouldn't have failed: {}", error), + Progression::CurrentProgress { + name, + current, + total, + } => { + println!("name: {}, current: {}, total: {:?}", name, current, total) + } + } + } + } + let contents = fs::read(path.clone()).await?; + assert_eq!(contents, file.to_vec()); + fs::remove_file(path).await?; + + let msg = instance_a + .get_community_channel_message(community.id(), channel.id(), message_id) + .await?; + assert_eq!(msg.lines(), &message); + assert_eq!(msg.attachments().len(), 1); + Ok(()) + } + #[async_test] + async fn unauthorized_download_stream_from_community_channel_message() -> anyhow::Result<()> { + let context = + Some("test::unauthorized_download_stream_from_community_channel_message".into()); + let acc = (None, None, context); + let accounts = create_accounts(vec![acc.clone(), acc]).await?; + let (instance_a, _, _) = &mut accounts[0].clone(); + let (instance_b, did_b, _) = &mut accounts[1].clone(); + + let community = instance_a.create_community("Community0").await?; + let channel = instance_a + .create_community_channel(community.id(), "Channel0", CommunityChannelType::Standard) + .await?; + + let mut rg_stream_b = instance_b.raygun_subscribe().await?; + let invite = instance_a + .create_community_invite(community.id(), Some(did_b.clone()), None) + .await?; + assert_eq!( + next_event(&mut rg_stream_b, Duration::from_secs(60)).await?, + RayGunEventKind::CommunityInvited { + community_id: community.id(), + invite_id: invite.id() + } + ); + + let mut stream_a = instance_a.get_community_stream(community.id()).await?; + instance_b + .accept_community_invite(community.id(), invite.id()) + .await?; + assert_eq!( + next_event(&mut stream_a, Duration::from_secs(60)).await?, + MessageEventKind::AcceptedCommunityInvite { + community_id: community.id(), + invite_id: invite.id(), + user: did_b.clone() + } + ); + + let mut stream_b = instance_b.get_community_stream(community.id()).await?; + let file_name = "my_file.txt"; + let file = b"Thanks for reading my file"; + instance_b.put_buffer(file_name, file).await?; + let locations = vec![Location::Constellation { + path: file_name.to_string(), + }]; + let message = vec!["here is my file".to_string()]; + let (message_id, mut attachment_event_stream) = instance_b + .attach_to_community_channel_message( + community.id(), + channel.id(), + None, + locations, + message.clone(), + ) + .await?; + loop { + if attachment_event_stream.next().await.is_none() { + break; + } + } + assert_eq!( + next_event(&mut stream_b, Duration::from_secs(60)).await?, + MessageEventKind::CommunityMessageSent { + community_id: community.id(), + channel_id: channel.id(), + message_id, + } + ); + assert_eq!( + next_event(&mut stream_a, Duration::from_secs(60)).await?, + MessageEventKind::CommunityMessageReceived { + community_id: community.id(), + channel_id: channel.id(), + message_id + } + ); + + instance_a + .revoke_community_channel_permission_for_all( + community.id(), + channel.id(), + CommunityChannelPermission::ViewChannel, + ) + .await?; + assert_next_msg_event( + vec![&mut stream_a, &mut stream_b], + Duration::from_secs(60), + MessageEventKind::RevokedCommunityChannelPermissionForAll { + community_id: community.id(), + channel_id: channel.id(), + permission: CommunityChannelPermission::ViewChannel, + }, + ) + .await?; + + let result = instance_b + .download_stream_from_community_channel_message( + community.id(), + channel.id(), + message_id, + file_name, + ) + .await; + match result { + Ok(_) => panic!("should not have succeeded"), + Err(e) => assert_eq!(format!("{:?}", e), format!("{:?}", Error::Unauthorized)), + } + Ok(()) + } + #[async_test] + async fn authorized_download_stream_from_community_channel_message() -> anyhow::Result<()> { + let context = + Some("test::authorized_download_stream_from_community_channel_message".into()); + let acc = (None, None, context); + let accounts = create_accounts(vec![acc.clone(), acc]).await?; + let (instance_a, _, _) = &mut accounts[0].clone(); + let (instance_b, did_b, _) = &mut accounts[1].clone(); + + let community = instance_a.create_community("Community0").await?; + let channel = instance_a + .create_community_channel(community.id(), "Channel0", CommunityChannelType::Standard) + .await?; + + let mut rg_stream_b = instance_b.raygun_subscribe().await?; + let invite = instance_a + .create_community_invite(community.id(), Some(did_b.clone()), None) + .await?; + assert_eq!( + next_event(&mut rg_stream_b, Duration::from_secs(60)).await?, + RayGunEventKind::CommunityInvited { + community_id: community.id(), + invite_id: invite.id() + } + ); + + let mut stream_a = instance_a.get_community_stream(community.id()).await?; + instance_b + .accept_community_invite(community.id(), invite.id()) + .await?; + assert_eq!( + next_event(&mut stream_a, Duration::from_secs(60)).await?, + MessageEventKind::AcceptedCommunityInvite { + community_id: community.id(), + invite_id: invite.id(), + user: did_b.clone() + } + ); + + let mut stream_b = instance_b.get_community_stream(community.id()).await?; + let file_name = "my_file.txt"; + let file = b"Thanks for reading my file"; + instance_b.put_buffer(file_name, file).await?; + let locations = vec![Location::Constellation { + path: file_name.to_string(), + }]; + let message = vec!["here is my file".to_string()]; + let (message_id, mut attachment_event_stream) = instance_b + .attach_to_community_channel_message( + community.id(), + channel.id(), + None, + locations, + message.clone(), + ) + .await?; + loop { + if attachment_event_stream.next().await.is_none() { + break; + } + } + assert_eq!( + next_event(&mut stream_b, Duration::from_secs(60)).await?, + MessageEventKind::CommunityMessageSent { + community_id: community.id(), + channel_id: channel.id(), + message_id, + } + ); + assert_eq!( + next_event(&mut stream_a, Duration::from_secs(60)).await?, + MessageEventKind::CommunityMessageReceived { + community_id: community.id(), + channel_id: channel.id(), + message_id + } + ); + let mut download_stream = instance_b + .download_stream_from_community_channel_message( + community.id(), + channel.id(), + message_id, + file_name, + ) + .await?; + + let mut bytes = vec![]; + loop { + match download_stream.next().await { + Some(b) => { + let mut b = b.unwrap().to_vec(); + bytes.append(&mut b); + } + None => break, + } + } + assert_eq!(bytes, file.to_vec()); + + let msg = instance_a + .get_community_channel_message(community.id(), channel.id(), message_id) + .await?; + assert_eq!(msg.lines(), &message); + assert_eq!(msg.attachments().len(), 1); + Ok(()) + } } diff --git a/warp/src/raygun/community.rs b/warp/src/raygun/community.rs index 65e6cda34..219470a05 100644 --- a/warp/src/raygun/community.rs +++ b/warp/src/raygun/community.rs @@ -274,6 +274,7 @@ pub enum CommunityPermission { pub enum CommunityChannelPermission { ViewChannel, SendMessages, + SendAttachments, } #[async_trait::async_trait]