diff --git a/extensions/warp-ipfs/src/store/community.rs b/extensions/warp-ipfs/src/store/community.rs index 209406db5..e2707b003 100644 --- a/extensions/warp-ipfs/src/store/community.rs +++ b/extensions/warp-ipfs/src/store/community.rs @@ -99,6 +99,10 @@ pub struct CommunityDocument { #[serde(default)] pub deleted: bool, #[serde(skip_serializing_if = "Option::is_none")] + pub icon: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub banner: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub signature: Option, } @@ -226,6 +230,8 @@ impl CommunityDocument { permissions, invites: IndexMap::new(), deleted: false, + icon: None, + banner: None, signature: None, }; document.sign(keypair)?; diff --git a/extensions/warp-ipfs/src/store/message/community_task.rs b/extensions/warp-ipfs/src/store/message/community_task.rs index c3773a6df..417f87503 100644 --- a/extensions/warp-ipfs/src/store/message/community_task.rs +++ b/extensions/warp-ipfs/src/store/message/community_task.rs @@ -10,7 +10,7 @@ use indexmap::{IndexMap, IndexSet}; use ipld_core::cid::Cid; use rust_ipfs::libp2p::gossipsub::Message; use rust_ipfs::p2p::MultiaddrExt; -use rust_ipfs::Ipfs; +use rust_ipfs::{Ipfs, IpfsPath}; use rust_ipfs::{PeerId, SubscriptionStream}; use serde::{Deserialize, Serialize}; use std::borrow::BorrowMut; @@ -47,13 +47,17 @@ use crate::store::community::{ }; use crate::store::conversation::message::MessageDocument; use crate::store::discovery::Discovery; +use crate::store::document::files::FileDocument; +use crate::store::document::image_dag::ImageDag; use crate::store::ds_key::DataStoreKey; use crate::store::event_subscription::EventSubscription; use crate::store::topics::PeerTopic; use crate::store::{ - CommunityUpdateKind, ConversationEvents, MAX_COMMUNITY_CHANNELS, MAX_COMMUNITY_DESCRIPTION, + CommunityUpdateKind, ConversationEvents, ConversationImageType, MAX_COMMUNITY_CHANNELS, + MAX_COMMUNITY_DESCRIPTION, MAX_CONVERSATION_BANNER_SIZE, MAX_CONVERSATION_ICON_SIZE, MAX_MESSAGE_SIZE, MAX_REACTIONS, MIN_MESSAGE_SIZE, SHUTTLE_TIMEOUT, }; +use crate::utils::{ByteCollection, ExtensionType}; use crate::{ // rt::LocalExecutor, store::{ @@ -702,19 +706,25 @@ impl CommunityTask { let _ = response.send(result); } CommunityTaskCommand::GetCommunityIcon { response } => { - let result = self.get_community_icon().await; + let result = self.get_community_image(ConversationImageType::Icon).await; let _ = response.send(result); } CommunityTaskCommand::GetCommunityBanner { response } => { - let result = self.get_community_banner().await; + let result = self + .get_community_image(ConversationImageType::Banner) + .await; let _ = response.send(result); } CommunityTaskCommand::EditCommunityIcon { response, location } => { - let result = self.edit_community_icon(location).await; + let result = self + .edit_community_image(location, ConversationImageType::Icon) + .await; let _ = response.send(result); } CommunityTaskCommand::EditCommunityBanner { response, location } => { - let result = self.edit_community_banner(location).await; + let result = self + .edit_community_image(location, ConversationImageType::Banner) + .await; let _ = response.send(result); } CommunityTaskCommand::CreateCommunityInvite { @@ -1379,32 +1389,194 @@ impl CommunityTask { ) .await } + async fn get_community_image( + &self, + image_type: ConversationImageType, + ) -> Result { + let (cid, max_size) = match image_type { + ConversationImageType::Icon => { + let cid = self.document.icon.ok_or(Error::Other)?; + (cid, MAX_CONVERSATION_ICON_SIZE) + } + ConversationImageType::Banner => { + let cid = self.document.banner.ok_or(Error::Other)?; + (cid, MAX_CONVERSATION_BANNER_SIZE) + } + }; - pub async fn get_community_icon(&self) -> Result { - Err(Error::Unimplemented) - } - pub async fn get_community_banner(&self) -> Result { - Err(Error::Unimplemented) - } - pub async fn edit_community_icon(&mut self, _location: Location) -> Result<(), Error> { - let own_did = &self.identity.did_key(); - if !self - .document - .has_permission(own_did, &CommunityPermission::EditIcon) - { - return Err(Error::Unauthorized); + let dag: ImageDag = self.ipfs.get_dag(cid).deserialized().await?; + + if dag.size > max_size as _ { + return Err(Error::InvalidLength { + context: "image".into(), + current: dag.size as _, + minimum: None, + maximum: Some(max_size), + }); } - Err(Error::Unimplemented) + + let image = self + .ipfs + .cat_unixfs(dag.link) + .max_length(dag.size as _) + .await + .map_err(anyhow::Error::from)?; + + let mut img = ConversationImage::default(); + img.set_image_type(dag.mime); + img.set_data(image.into()); + Ok(img) } - pub async fn edit_community_banner(&mut self, _location: Location) -> Result<(), Error> { + async fn edit_community_image( + &mut self, + location: Location, + image_type: ConversationImageType, + ) -> Result<(), Error> { + let max_size = match image_type { + ConversationImageType::Banner => MAX_CONVERSATION_BANNER_SIZE, + ConversationImageType::Icon => MAX_CONVERSATION_ICON_SIZE, + }; let own_did = &self.identity.did_key(); - if !self - .document - .has_permission(own_did, &CommunityPermission::EditBanner) - { - return Err(Error::Unauthorized); + match image_type { + ConversationImageType::Icon => { + if !self + .document + .has_permission(own_did, &CommunityPermission::EditIcon) + { + return Err(Error::Unauthorized); + } + } + ConversationImageType::Banner => { + if !self + .document + .has_permission(own_did, &CommunityPermission::EditBanner) + { + return Err(Error::Unauthorized); + } + } } - Err(Error::Unimplemented) + let (cid, size, ext) = match location { + Location::Constellation { path } => { + let file = self + .file + .root_directory() + .get_item_by_path(&path) + .and_then(|item| item.get_file())?; + + let extension = file.file_type(); + + if file.size() > max_size { + return Err(Error::InvalidLength { + context: "image".into(), + current: file.size(), + minimum: Some(1), + maximum: Some(max_size), + }); + } + + let document = FileDocument::new(&self.ipfs, &file).await?; + let cid = document + .reference + .as_ref() + .and_then(|reference| IpfsPath::from_str(reference).ok()) + .and_then(|path| path.root().cid().copied()) + .ok_or(Error::OtherWithContext("invalid reference".into()))?; + + (cid, document.size, extension) + } + Location::Disk { path } => { + #[cfg(target_arch = "wasm32")] + { + _ = path; + unreachable!() + } + #[cfg(not(target_arch = "wasm32"))] + { + use crate::utils::ReaderStream; + use tokio_util::compat::TokioAsyncReadCompatExt; + + let extension = path + .extension() + .and_then(std::ffi::OsStr::to_str) + .map(ExtensionType::from) + .unwrap_or(ExtensionType::Other) + .into(); + + let file = tokio::fs::File::open(path).await?; + let size = file.metadata().await?.len() as _; + let stream = + ReaderStream::from_reader_with_cap(file.compat(), 512, Some(max_size)) + .boxed(); + let path = self.ipfs.add_unixfs(stream).pin(false).await?; + let cid = path.root().cid().copied().expect("valid cid in path"); + (cid, size, extension) + } + } + Location::Stream { + // NOTE: `name` and `size` would not be used here as we are only storing the data. If we are to store in constellation too, we would make use of these fields + name: _, + size: _, + stream, + } => { + let bytes = ByteCollection::new_with_max_capacity(stream, max_size).await?; + + let bytes_len = bytes.len(); + + let path = self.ipfs.add_unixfs(bytes.clone()).pin(false).await?; + let cid = path.root().cid().copied().expect("valid cid in path"); + + let cursor = std::io::Cursor::new(bytes); + + let image = image::ImageReader::new(cursor).with_guessed_format()?; + + let format = image + .format() + .and_then(|format| ExtensionType::try_from(format).ok()) + .unwrap_or(ExtensionType::Other) + .into(); + + (cid, bytes_len, format) + } + }; + + let dag = ImageDag { + link: cid, + size: size as _, + mime: ext, + }; + + let cid = self.ipfs.put_dag(dag).await?; + + let kind = match image_type { + ConversationImageType::Icon => { + self.document.icon.replace(cid); + CommunityUpdateKind::EditIcon + } + ConversationImageType::Banner => { + self.document.banner.replace(cid); + CommunityUpdateKind::EditBanner + } + }; + + self.set_document().await?; + + let event = CommunityMessagingEvents::UpdateCommunity { + community: self.document.clone(), + kind, + }; + + let message_event = match image_type { + ConversationImageType::Icon => MessageEventKind::EditedCommunityIcon { + community_id: self.community_id, + }, + ConversationImageType::Banner => MessageEventKind::EditedCommunityBanner { + community_id: self.community_id, + }, + }; + + let _ = self.event_broadcast.send(message_event); + + self.publish(None, event, true, vec![]).await } pub async fn create_community_invite( @@ -4159,6 +4331,24 @@ async fn message_event( tracing::warn!(%community_id, error = %e, "Error broadcasting event"); } } + CommunityUpdateKind::EditIcon => { + this.replace_document(community).await?; + if let Err(e) = this + .event_broadcast + .send(MessageEventKind::EditedCommunityIcon { community_id }) + { + tracing::warn!(%community_id, error = %e, "Error broadcasting event"); + } + } + CommunityUpdateKind::EditBanner => { + this.replace_document(community).await?; + if let Err(e) = this + .event_broadcast + .send(MessageEventKind::EditedCommunityBanner { community_id }) + { + tracing::warn!(%community_id, error = %e, "Error broadcasting event"); + } + } CommunityUpdateKind::GrantCommunityPermission { permission, role_id, diff --git a/extensions/warp-ipfs/src/store/mod.rs b/extensions/warp-ipfs/src/store/mod.rs index 664312b8c..04f5c057d 100644 --- a/extensions/warp-ipfs/src/store/mod.rs +++ b/extensions/warp-ipfs/src/store/mod.rs @@ -519,6 +519,8 @@ pub enum CommunityUpdateKind { EditCommunityDescription { description: Option, }, + EditIcon, + EditBanner, GrantCommunityPermission { permission: CommunityPermission, role_id: RoleId, diff --git a/extensions/warp-ipfs/tests/community.rs b/extensions/warp-ipfs/tests/community.rs index b9bcf63fe..470b34963 100644 --- a/extensions/warp-ipfs/tests/community.rs +++ b/extensions/warp-ipfs/tests/community.rs @@ -29,6 +29,11 @@ mod test { use crate::common::create_accounts; + const RED_PNG: [u8; 30] = [ + 66, 77, 30, 0, 0, 0, 0, 0, 0, 0, 26, 0, 0, 0, 12, 0, 0, 0, 1, 0, 1, 0, 1, 0, 24, 0, 0, 0, + 255, 0, + ]; + async fn assert_next_msg_event( streams: Vec<&mut MessageEventStream>, timeout: Duration, @@ -427,27 +432,227 @@ mod test { // assert!(false); // Ok(()) // } - // #[async_test] - // async fn unauthorized_edit_community_icon() -> anyhow::Result<()> { - // assert!(false); - // Ok(()) - // } - // #[async_test] - // async fn authorized_edit_community_icon() -> anyhow::Result<()> { - // assert!(false); - // Ok(()) - // } - // #[async_test] - // async fn unauthorized_edit_community_banner() -> anyhow::Result<()> { - // assert!(false); - // Ok(()) - // } - // #[async_test] - // async fn authorized_edit_community_banner() -> anyhow::Result<()> { - // assert!(false); - // Ok(()) - // } + #[async_test] + async fn unauthorized_edit_community_icon() -> anyhow::Result<()> { + let context = Some("test::unauthorized_edit_community_icon".into()); + let acc = (None, None, context); + let accounts = create_accounts(vec![acc.clone(), acc]).await?; + let (instance_a, _, _) = &mut accounts[0].clone(); + let (instance_b, did_b, _) = &mut accounts[1].clone(); + + let community = instance_a.create_community("Community0").await?; + + let mut rg_stream_b = instance_b.raygun_subscribe().await?; + let invite = instance_a + .create_community_invite(community.id(), Some(did_b.clone()), None) + .await?; + assert_eq!( + next_event(&mut rg_stream_b, Duration::from_secs(60)).await?, + RayGunEventKind::CommunityInvited { + community_id: community.id(), + invite_id: invite.id() + } + ); + + let mut stream_a = instance_a.get_community_stream(community.id()).await?; + instance_b + .accept_community_invite(community.id(), invite.id()) + .await?; + assert_eq!( + next_event(&mut stream_a, Duration::from_secs(60)).await?, + MessageEventKind::AcceptedCommunityInvite { + community_id: community.id(), + invite_id: invite.id(), + user: did_b.clone() + } + ); + + let file_name = "red.png"; + instance_b.put_buffer(file_name, &RED_PNG).await?; + let location = Location::Constellation { + path: file_name.to_string(), + }; + let result = instance_b + .edit_community_icon(community.id(), location) + .await; + assert_eq!( + format!("{:?}", result), + format!("{:?}", Err::<(), Error>(Error::Unauthorized)), + ); + Ok(()) + } + #[async_test] + async fn authorized_edit_community_icon() -> anyhow::Result<()> { + let context = Some("test::authorized_edit_community_icon".into()); + let acc = (None, None, context); + let accounts = create_accounts(vec![acc.clone(), acc]).await?; + let (instance_a, _, _) = &mut accounts[0].clone(); + let (instance_b, did_b, _) = &mut accounts[1].clone(); + + let community = instance_a.create_community("Community0").await?; + instance_a + .grant_community_permission_for_all(community.id(), CommunityPermission::EditIcon) + .await?; + + let mut rg_stream_b = instance_b.raygun_subscribe().await?; + let invite = instance_a + .create_community_invite(community.id(), Some(did_b.clone()), None) + .await?; + assert_eq!( + next_event(&mut rg_stream_b, Duration::from_secs(60)).await?, + RayGunEventKind::CommunityInvited { + community_id: community.id(), + invite_id: invite.id() + } + ); + + let mut stream_a = instance_a.get_community_stream(community.id()).await?; + instance_b + .accept_community_invite(community.id(), invite.id()) + .await?; + assert_eq!( + next_event(&mut stream_a, Duration::from_secs(60)).await?, + MessageEventKind::AcceptedCommunityInvite { + community_id: community.id(), + invite_id: invite.id(), + user: did_b.clone() + } + ); + + let file_name = "red.png"; + instance_b.put_buffer(file_name, &RED_PNG).await?; + let location = Location::Constellation { + path: file_name.to_string(), + }; + + let mut stream_b = instance_b.get_community_stream(community.id()).await?; + instance_b + .edit_community_icon(community.id(), location) + .await?; + assert_next_msg_event( + vec![&mut stream_a, &mut stream_b], + Duration::from_secs(60), + MessageEventKind::EditedCommunityIcon { + community_id: community.id(), + }, + ) + .await?; + + let image = instance_a.get_community_icon(community.id()).await?; + assert_eq!(image.data(), &RED_PNG); + Ok(()) + } + #[async_test] + async fn unauthorized_edit_community_banner() -> anyhow::Result<()> { + let context = Some("test::unauthorized_edit_community_banner".into()); + let acc = (None, None, context); + let accounts = create_accounts(vec![acc.clone(), acc]).await?; + let (instance_a, _, _) = &mut accounts[0].clone(); + let (instance_b, did_b, _) = &mut accounts[1].clone(); + + let community = instance_a.create_community("Community0").await?; + + let mut rg_stream_b = instance_b.raygun_subscribe().await?; + let invite = instance_a + .create_community_invite(community.id(), Some(did_b.clone()), None) + .await?; + assert_eq!( + next_event(&mut rg_stream_b, Duration::from_secs(60)).await?, + RayGunEventKind::CommunityInvited { + community_id: community.id(), + invite_id: invite.id() + } + ); + + let mut stream_a = instance_a.get_community_stream(community.id()).await?; + instance_b + .accept_community_invite(community.id(), invite.id()) + .await?; + assert_eq!( + next_event(&mut stream_a, Duration::from_secs(60)).await?, + MessageEventKind::AcceptedCommunityInvite { + community_id: community.id(), + invite_id: invite.id(), + user: did_b.clone() + } + ); + + let file_name = "red.png"; + instance_b.put_buffer(file_name, &RED_PNG).await?; + let location = Location::Constellation { + path: file_name.to_string(), + }; + let result = instance_b + .edit_community_banner(community.id(), location) + .await; + assert_eq!( + format!("{:?}", result), + format!("{:?}", Err::<(), Error>(Error::Unauthorized)), + ); + Ok(()) + } + #[async_test] + async fn authorized_edit_community_banner() -> anyhow::Result<()> { + let context = Some("test::authorized_edit_community_icon".into()); + let acc = (None, None, context); + let accounts = create_accounts(vec![acc.clone(), acc]).await?; + let (instance_a, _, _) = &mut accounts[0].clone(); + let (instance_b, did_b, _) = &mut accounts[1].clone(); + + let community = instance_a.create_community("Community0").await?; + instance_a + .grant_community_permission_for_all(community.id(), CommunityPermission::EditBanner) + .await?; + + let mut rg_stream_b = instance_b.raygun_subscribe().await?; + let invite = instance_a + .create_community_invite(community.id(), Some(did_b.clone()), None) + .await?; + assert_eq!( + next_event(&mut rg_stream_b, Duration::from_secs(60)).await?, + RayGunEventKind::CommunityInvited { + community_id: community.id(), + invite_id: invite.id() + } + ); + + let mut stream_a = instance_a.get_community_stream(community.id()).await?; + instance_b + .accept_community_invite(community.id(), invite.id()) + .await?; + assert_eq!( + next_event(&mut stream_a, Duration::from_secs(60)).await?, + MessageEventKind::AcceptedCommunityInvite { + community_id: community.id(), + invite_id: invite.id(), + user: did_b.clone() + } + ); + + let file_name = "red.png"; + instance_b.put_buffer(file_name, &RED_PNG).await?; + let location = Location::Constellation { + path: file_name.to_string(), + }; + + let mut stream_b = instance_b.get_community_stream(community.id()).await?; + instance_b + .edit_community_banner(community.id(), location) + .await?; + assert_next_msg_event( + vec![&mut stream_a, &mut stream_b], + Duration::from_secs(60), + MessageEventKind::EditedCommunityBanner { + community_id: community.id(), + }, + ) + .await?; + + let image = instance_a.get_community_banner(community.id()).await?; + assert_eq!(image.data(), &RED_PNG); + Ok(()) + } #[async_test] async fn unauthorized_create_community_invite() -> anyhow::Result<()> { let context = Some("test::unauthorized_create_community_invite".into()); @@ -4371,8 +4576,7 @@ mod test { .await?; let file_name = "my_file.txt"; - let file = b"Thanks for reading my file"; - instance_b.put_buffer(file_name, file).await?; + instance_b.put_buffer(file_name, &RED_PNG).await?; let locations = vec![Location::Constellation { path: file_name.to_string(), }]; @@ -4434,8 +4638,7 @@ mod test { ); let file_name = "my_file.txt"; - let file = b"Thanks for reading my file"; - instance_b.put_buffer(file_name, file).await?; + instance_b.put_buffer(file_name, &RED_PNG).await?; let locations = vec![Location::Constellation { path: file_name.to_string(), }]; @@ -4491,8 +4694,7 @@ mod test { let mut stream_b = instance_b.get_community_stream(community.id()).await?; let file_name = "my_file.txt"; - let file = b"Thanks for reading my file"; - instance_b.put_buffer(file_name, file).await?; + instance_b.put_buffer(file_name, &RED_PNG).await?; let locations = vec![Location::Constellation { path: file_name.to_string(), }]; @@ -4601,8 +4803,7 @@ mod test { let mut stream_b = instance_b.get_community_stream(community.id()).await?; let file_name = "my_file.txt"; - let file = b"Thanks for reading my file"; - instance_b.put_buffer(file_name, file).await?; + instance_b.put_buffer(file_name, &RED_PNG).await?; let locations = vec![Location::Constellation { path: file_name.to_string(), }]; @@ -4651,7 +4852,7 @@ mod test { if let Some(progress) = constellation_progress_stream.next().await { match progress { Progression::ProgressComplete { name: _, total } => { - assert_eq!(total, Some(file.len())); + assert_eq!(total, Some(RED_PNG.len())); break; } Progression::ProgressFailed { @@ -4670,7 +4871,7 @@ mod test { } } let contents = fs::read(path.clone()).await?; - assert_eq!(contents, file.to_vec()); + assert_eq!(contents, RED_PNG.clone()); fs::remove_file(path).await?; let msg = instance_a @@ -4721,8 +4922,7 @@ mod test { let mut stream_b = instance_b.get_community_stream(community.id()).await?; let file_name = "my_file.txt"; - let file = b"Thanks for reading my file"; - instance_b.put_buffer(file_name, file).await?; + instance_b.put_buffer(file_name, &RED_PNG).await?; let locations = vec![Location::Constellation { path: file_name.to_string(), }]; @@ -4831,8 +5031,7 @@ mod test { let mut stream_b = instance_b.get_community_stream(community.id()).await?; let file_name = "my_file.txt"; - let file = b"Thanks for reading my file"; - instance_b.put_buffer(file_name, file).await?; + instance_b.put_buffer(file_name, &RED_PNG).await?; let locations = vec![Location::Constellation { path: file_name.to_string(), }]; @@ -4876,14 +5075,14 @@ mod test { ) .await?; - let mut bytes = Vec::with_capacity(file.len()); + let mut bytes = Vec::with_capacity(RED_PNG.len()); while let Some(result) = download_stream.next().await { let b = result.expect("valid"); bytes.extend(b.to_vec()); } - assert_eq!(bytes, file.to_vec()); + assert_eq!(bytes, RED_PNG); let msg = instance_a .get_community_channel_message(community.id(), channel.id(), message_id) diff --git a/warp/src/raygun/mod.rs b/warp/src/raygun/mod.rs index c9c329cc8..c0f7f0994 100644 --- a/warp/src/raygun/mod.rs +++ b/warp/src/raygun/mod.rs @@ -188,6 +188,12 @@ pub enum MessageEventKind { community_id: Uuid, description: Option, }, + EditedCommunityIcon { + community_id: Uuid, + }, + EditedCommunityBanner { + community_id: Uuid, + }, GrantedCommunityPermission { community_id: Uuid, permission: CommunityPermission,