From f7992d771461b744be86301211bc56b73d87847d Mon Sep 17 00:00:00 2001 From: ashneverdawn <8341280+ashneverdawn@users.noreply.github.com> Date: Tue, 3 Dec 2024 13:37:20 -0400 Subject: [PATCH 1/3] Feat: community message attachments (#640) --- .../src/store/message/community_task.rs | 145 ++++- extensions/warp-ipfs/tests/community.rs | 599 +++++++++++++++++- warp/src/raygun/community.rs | 1 + 3 files changed, 718 insertions(+), 27 deletions(-) diff --git a/extensions/warp-ipfs/src/store/message/community_task.rs b/extensions/warp-ipfs/src/store/message/community_task.rs index cf13174ed..c3773a6df 100644 --- a/extensions/warp-ipfs/src/store/message/community_task.rs +++ b/extensions/warp-ipfs/src/store/message/community_task.rs @@ -30,8 +30,8 @@ use warp::raygun::community::{ }; use warp::raygun::{ AttachmentEventStream, ConversationImage, Location, MessageEvent, MessageOptions, - MessageReference, MessageStatus, Messages, MessagesType, PinState, RayGunEventKind, - ReactionState, + MessageReference, MessageStatus, MessageType, Messages, MessagesType, PinState, + RayGunEventKind, ReactionState, }; use warp::{ crypto::{cipher::Cipher, generate}, @@ -68,6 +68,8 @@ use crate::{ }, }; +use super::attachment::AttachmentStream; + type AttachmentOneshot = (MessageDocument, oneshot::Sender>); #[allow(dead_code)] @@ -321,7 +323,7 @@ pub struct CommunityTask { community_id: Uuid, ipfs: Ipfs, root: RootDocumentMap, - _file: FileStore, + file: FileStore, identity: IdentityStore, discovery: Discovery, pending_key_exchange: IndexMap, bool)>>, @@ -332,7 +334,7 @@ pub struct CommunityTask { event_stream: SubscriptionStream, request_stream: SubscriptionStream, - _attachment_tx: futures::channel::mpsc::Sender, + attachment_tx: futures::channel::mpsc::Sender, attachment_rx: futures::channel::mpsc::Receiver, message_command: futures::channel::mpsc::Sender, event_broadcast: tokio::sync::broadcast::Sender, @@ -403,7 +405,7 @@ impl CommunityTask { community_id, ipfs: ipfs.clone(), root: root.clone(), - _file: file.clone(), + file: file.clone(), identity: identity.clone(), discovery: discovery.clone(), pending_key_exchange: Default::default(), @@ -414,7 +416,7 @@ impl CommunityTask { request_stream, event_stream, - _attachment_tx: atx, + attachment_tx: atx, attachment_rx: arx, event_broadcast: btx, _event_subscription, @@ -3304,29 +3306,132 @@ impl CommunityTask { } pub async fn attach_to_community_channel_message( &mut self, - _channel_id: Uuid, - _message_id: Option, - _locations: Vec, - _message: Vec, + channel_id: Uuid, + message_id: Option, + locations: Vec, + messages: Vec, ) -> Result<(Uuid, AttachmentEventStream), Error> { - Err(Error::Unimplemented) + let own_did = &self.identity.did_key(); + if !self.document.has_channel_permission( + own_did, + &CommunityChannelPermission::SendMessages, + channel_id, + ) { + return Err(Error::Unauthorized); + } + if !self.document.has_channel_permission( + own_did, + &CommunityChannelPermission::SendAttachments, + channel_id, + ) { + return Err(Error::Unauthorized); + } + + let keystore = pubkey_or_keystore(&*self)?; + + let stream = AttachmentStream::new( + &self.ipfs, + self.root.keypair(), + &self.identity.did_key(), + &self.file, + channel_id, + keystore, + self.attachment_tx.clone(), + ) + .set_reply(message_id) + .set_locations(locations)? + .set_lines(messages)?; + + let message_id = stream.message_id(); + + Ok((message_id, stream.boxed())) } pub async fn download_from_community_channel_message( &self, - _channel_id: Uuid, - _message_id: Uuid, - _file: String, - _path: PathBuf, + channel_id: Uuid, + message_id: Uuid, + file: String, + path: PathBuf, ) -> Result { - Err(Error::Unimplemented) + let own_did = &self.identity.did_key(); + if !self.document.has_channel_permission( + own_did, + &CommunityChannelPermission::ViewChannel, + channel_id, + ) { + return Err(Error::Unauthorized); + } + + let channel = match self.document.channels.get(&channel_id.to_string()) { + Some(c) => c, + None => return Err(Error::CommunityChannelDoesntExist), + }; + + let members = self + .document + .participants() + .iter() + .filter_map(|did| did.to_peer_id().ok()) + .collect::>(); + + let message = channel.get_message_document(&self.ipfs, message_id).await?; + + if message.message_type != MessageType::Attachment { + return Err(Error::InvalidMessage); + } + + let attachment = message + .attachments() + .iter() + .find(|attachment| attachment.name == file) + .ok_or(Error::FileNotFound)?; + + let stream = attachment.download(&self.ipfs, path, &members, None); + + Ok(stream) } pub async fn download_stream_from_community_channel_message( &self, - _channel_id: Uuid, - _message_id: Uuid, - _file: String, + channel_id: Uuid, + message_id: Uuid, + file: String, ) -> Result>, Error> { - Err(Error::Unimplemented) + let own_did = &self.identity.did_key(); + if !self.document.has_channel_permission( + own_did, + &CommunityChannelPermission::ViewChannel, + channel_id, + ) { + return Err(Error::Unauthorized); + } + + let channel = match self.document.channels.get(&channel_id.to_string()) { + Some(c) => c, + None => return Err(Error::CommunityChannelDoesntExist), + }; + + let members = self + .document + .participants() + .iter() + .filter_map(|did| did.to_peer_id().ok()) + .collect::>(); + + let message = channel.get_message_document(&self.ipfs, message_id).await?; + + if message.message_type != MessageType::Attachment { + return Err(Error::InvalidMessage); + } + + let attachment = message + .attachments() + .iter() + .find(|attachment| attachment.name == file) + .ok_or(Error::FileNotFound)?; + + let stream = attachment.download_stream(&self.ipfs, &members, None); + + Ok(stream) } async fn store_direct_for_attachment(&mut self, message: MessageDocument) -> Result<(), Error> { diff --git a/extensions/warp-ipfs/tests/community.rs b/extensions/warp-ipfs/tests/community.rs index cd6abd0af..93d511765 100644 --- a/extensions/warp-ipfs/tests/community.rs +++ b/extensions/warp-ipfs/tests/community.rs @@ -2,15 +2,18 @@ mod common; #[cfg(test)] mod test { use futures::{stream::FuturesUnordered, FutureExt, Stream, StreamExt, TryStreamExt}; - use std::time::Duration; + use std::{path::PathBuf, time::Duration}; use uuid::Uuid; - use warp::raygun::{ - community::{ - Community, CommunityChannelPermission, CommunityChannelType, CommunityInvite, - CommunityPermission, RayGunCommunity, + use warp::{ + constellation::{Constellation, Progression}, + raygun::{ + community::{ + Community, CommunityChannelPermission, CommunityChannelType, CommunityInvite, + CommunityPermission, RayGunCommunity, + }, + Location, Message, MessageEvent, MessageEventKind, MessageEventStream, MessageOptions, + MessageReference, MessageStatus, Messages, RayGunEventKind, RayGunStream, }, - Message, MessageEvent, MessageEventKind, MessageEventStream, MessageOptions, - MessageReference, MessageStatus, Messages, RayGunEventKind, RayGunStream, }; #[cfg(target_arch = "wasm32")] @@ -4310,4 +4313,586 @@ mod test { .await?; Ok(()) } + #[async_test] + async fn unauthorized_attach_to_community_channel_message() -> anyhow::Result<()> { + let context = Some("test::unauthorized_attach_to_community_channel_message".into()); + let acc = (None, None, context); + let accounts = create_accounts(vec![acc.clone(), acc]).await?; + let (instance_a, _, _) = &mut accounts[0].clone(); + let (instance_b, did_b, _) = &mut accounts[1].clone(); + + let community = instance_a.create_community("Community0").await?; + let channel = instance_a + .create_community_channel(community.id(), "Channel0", CommunityChannelType::Standard) + .await?; + + let mut rg_stream_b = instance_b.raygun_subscribe().await?; + let invite = instance_a + .create_community_invite(community.id(), Some(did_b.clone()), None) + .await?; + assert_eq!( + next_event(&mut rg_stream_b, Duration::from_secs(60)).await?, + RayGunEventKind::CommunityInvited { + community_id: community.id(), + invite_id: invite.id() + } + ); + + let mut stream_a = instance_a.get_community_stream(community.id()).await?; + instance_b + .accept_community_invite(community.id(), invite.id()) + .await?; + assert_eq!( + next_event(&mut stream_a, Duration::from_secs(60)).await?, + MessageEventKind::AcceptedCommunityInvite { + community_id: community.id(), + invite_id: invite.id(), + user: did_b.clone() + } + ); + + let mut stream_b = instance_b.get_community_stream(community.id()).await?; + instance_a + .revoke_community_channel_permission_for_all( + community.id(), + channel.id(), + CommunityChannelPermission::SendAttachments, + ) + .await?; + assert_next_msg_event( + vec![&mut stream_a, &mut stream_b], + Duration::from_secs(60), + MessageEventKind::RevokedCommunityChannelPermissionForAll { + community_id: community.id(), + channel_id: channel.id(), + permission: CommunityChannelPermission::SendAttachments, + }, + ) + .await?; + + let file_name = "my_file.txt"; + let file = b"Thanks for reading my file"; + instance_b.put_buffer(file_name, file).await?; + let locations = vec![Location::Constellation { + path: file_name.to_string(), + }]; + let message = vec!["here is my file".to_string()]; + let result = instance_b + .attach_to_community_channel_message( + community.id(), + channel.id(), + None, + locations, + message, + ) + .await; + + match result { + Ok(_) => panic!("should not have succeeded"), + Err(e) => { + assert_eq!(format!("{:?}", e), format!("{:?}", Error::Unauthorized)); + } + } + Ok(()) + } + #[async_test] + async fn authorized_attach_to_community_channel_message() -> anyhow::Result<()> { + let context = Some("test::authorized_attach_to_community_channel_message".into()); + let acc = (None, None, context); + let accounts = create_accounts(vec![acc.clone(), acc]).await?; + let (instance_a, _, _) = &mut accounts[0].clone(); + let (instance_b, did_b, _) = &mut accounts[1].clone(); + + let community = instance_a.create_community("Community0").await?; + let channel = instance_a + .create_community_channel(community.id(), "Channel0", CommunityChannelType::Standard) + .await?; + + let mut rg_stream_b = instance_b.raygun_subscribe().await?; + let invite = instance_a + .create_community_invite(community.id(), Some(did_b.clone()), None) + .await?; + assert_eq!( + next_event(&mut rg_stream_b, Duration::from_secs(60)).await?, + RayGunEventKind::CommunityInvited { + community_id: community.id(), + invite_id: invite.id() + } + ); + + let mut stream_a = instance_a.get_community_stream(community.id()).await?; + instance_b + .accept_community_invite(community.id(), invite.id()) + .await?; + assert_eq!( + next_event(&mut stream_a, Duration::from_secs(60)).await?, + MessageEventKind::AcceptedCommunityInvite { + community_id: community.id(), + invite_id: invite.id(), + user: did_b.clone() + } + ); + + let file_name = "my_file.txt"; + let file = b"Thanks for reading my file"; + instance_b.put_buffer(file_name, file).await?; + let locations = vec![Location::Constellation { + path: file_name.to_string(), + }]; + let message = vec!["here is my file".to_string()]; + let _ = instance_b + .attach_to_community_channel_message( + community.id(), + channel.id(), + None, + locations, + message, + ) + .await?; + Ok(()) + } + #[async_test] + async fn unauthorized_download_from_community_channel_message() -> anyhow::Result<()> { + let context = Some("test::unauthorized_download_from_community_channel_message".into()); + let acc = (None, None, context); + let accounts = create_accounts(vec![acc.clone(), acc]).await?; + let (instance_a, _, _) = &mut accounts[0].clone(); + let (instance_b, did_b, _) = &mut accounts[1].clone(); + + let community = instance_a.create_community("Community0").await?; + let channel = instance_a + .create_community_channel(community.id(), "Channel0", CommunityChannelType::Standard) + .await?; + + let mut rg_stream_b = instance_b.raygun_subscribe().await?; + let invite = instance_a + .create_community_invite(community.id(), Some(did_b.clone()), None) + .await?; + assert_eq!( + next_event(&mut rg_stream_b, Duration::from_secs(60)).await?, + RayGunEventKind::CommunityInvited { + community_id: community.id(), + invite_id: invite.id() + } + ); + + let mut stream_a = instance_a.get_community_stream(community.id()).await?; + instance_b + .accept_community_invite(community.id(), invite.id()) + .await?; + assert_eq!( + next_event(&mut stream_a, Duration::from_secs(60)).await?, + MessageEventKind::AcceptedCommunityInvite { + community_id: community.id(), + invite_id: invite.id(), + user: did_b.clone() + } + ); + + let mut stream_b = instance_b.get_community_stream(community.id()).await?; + let file_name = "my_file.txt"; + let file = b"Thanks for reading my file"; + instance_b.put_buffer(file_name, file).await?; + let locations = vec![Location::Constellation { + path: file_name.to_string(), + }]; + let message = vec!["here is my file".to_string()]; + let (message_id, mut attachment_event_stream) = instance_b + .attach_to_community_channel_message( + community.id(), + channel.id(), + None, + locations, + message, + ) + .await?; + loop { + if attachment_event_stream.next().await.is_none() { + break; + } + } + assert_eq!( + next_event(&mut stream_b, Duration::from_secs(60)).await?, + MessageEventKind::CommunityMessageSent { + community_id: community.id(), + channel_id: channel.id(), + message_id, + } + ); + assert_eq!( + next_event(&mut stream_a, Duration::from_secs(60)).await?, + MessageEventKind::CommunityMessageReceived { + community_id: community.id(), + channel_id: channel.id(), + message_id + } + ); + + instance_a + .revoke_community_channel_permission_for_all( + community.id(), + channel.id(), + CommunityChannelPermission::ViewChannel, + ) + .await?; + assert_next_msg_event( + vec![&mut stream_a, &mut stream_b], + Duration::from_secs(60), + MessageEventKind::RevokedCommunityChannelPermissionForAll { + community_id: community.id(), + channel_id: channel.id(), + permission: CommunityChannelPermission::ViewChannel, + }, + ) + .await?; + + let result = instance_b + .download_from_community_channel_message( + community.id(), + channel.id(), + message_id, + file_name.to_string(), + PathBuf::new(), + ) + .await; + match result { + Ok(_) => panic!("should not have succeeded"), + Err(e) => assert_eq!(format!("{:?}", e), format!("{:?}", Error::Unauthorized)), + } + Ok(()) + } + #[async_test] + async fn authorized_download_from_community_channel_message() -> anyhow::Result<()> { + let context = Some("test::authorized_download_from_community_channel_message".into()); + let acc = (None, None, context); + let accounts = create_accounts(vec![acc.clone(), acc]).await?; + let (instance_a, _, _) = &mut accounts[0].clone(); + let (instance_b, did_b, _) = &mut accounts[1].clone(); + + let community = instance_a.create_community("Community0").await?; + let channel = instance_a + .create_community_channel(community.id(), "Channel0", CommunityChannelType::Standard) + .await?; + + let mut rg_stream_b = instance_b.raygun_subscribe().await?; + let invite = instance_a + .create_community_invite(community.id(), Some(did_b.clone()), None) + .await?; + assert_eq!( + next_event(&mut rg_stream_b, Duration::from_secs(60)).await?, + RayGunEventKind::CommunityInvited { + community_id: community.id(), + invite_id: invite.id() + } + ); + + let mut stream_a = instance_a.get_community_stream(community.id()).await?; + instance_b + .accept_community_invite(community.id(), invite.id()) + .await?; + assert_eq!( + next_event(&mut stream_a, Duration::from_secs(60)).await?, + MessageEventKind::AcceptedCommunityInvite { + community_id: community.id(), + invite_id: invite.id(), + user: did_b.clone() + } + ); + + let mut stream_b = instance_b.get_community_stream(community.id()).await?; + let file_name = "my_file.txt"; + let file = b"Thanks for reading my file"; + instance_b.put_buffer(file_name, file).await?; + let locations = vec![Location::Constellation { + path: file_name.to_string(), + }]; + let message = vec!["here is my file".to_string()]; + let (message_id, mut attachment_event_stream) = instance_b + .attach_to_community_channel_message( + community.id(), + channel.id(), + None, + locations, + message.clone(), + ) + .await?; + loop { + if attachment_event_stream.next().await.is_none() { + break; + } + } + assert_eq!( + next_event(&mut stream_b, Duration::from_secs(60)).await?, + MessageEventKind::CommunityMessageSent { + community_id: community.id(), + channel_id: channel.id(), + message_id, + } + ); + assert_eq!( + next_event(&mut stream_a, Duration::from_secs(60)).await?, + MessageEventKind::CommunityMessageReceived { + community_id: community.id(), + channel_id: channel.id(), + message_id + } + ); + let path = PathBuf::from("test::authorized_download_from_community_channel_message"); + let mut constellation_progress_stream = instance_b + .download_from_community_channel_message( + community.id(), + channel.id(), + message_id, + file_name.to_string(), + path.clone(), + ) + .await?; + loop { + if let Some(progress) = constellation_progress_stream.next().await { + match progress { + Progression::ProgressComplete { name: _, total } => { + assert_eq!(total, Some(file.len())); + break; + } + Progression::ProgressFailed { + name: _, + last_size: _, + error, + } => panic!("progress shouldn't have failed: {}", error), + Progression::CurrentProgress { + name, + current, + total, + } => { + println!("name: {}, current: {}, total: {:?}", name, current, total) + } + } + } + } + let contents = fs::read(path.clone()).await?; + assert_eq!(contents, file.to_vec()); + fs::remove_file(path).await?; + + let msg = instance_a + .get_community_channel_message(community.id(), channel.id(), message_id) + .await?; + assert_eq!(msg.lines(), &message); + assert_eq!(msg.attachments().len(), 1); + Ok(()) + } + #[async_test] + async fn unauthorized_download_stream_from_community_channel_message() -> anyhow::Result<()> { + let context = + Some("test::unauthorized_download_stream_from_community_channel_message".into()); + let acc = (None, None, context); + let accounts = create_accounts(vec![acc.clone(), acc]).await?; + let (instance_a, _, _) = &mut accounts[0].clone(); + let (instance_b, did_b, _) = &mut accounts[1].clone(); + + let community = instance_a.create_community("Community0").await?; + let channel = instance_a + .create_community_channel(community.id(), "Channel0", CommunityChannelType::Standard) + .await?; + + let mut rg_stream_b = instance_b.raygun_subscribe().await?; + let invite = instance_a + .create_community_invite(community.id(), Some(did_b.clone()), None) + .await?; + assert_eq!( + next_event(&mut rg_stream_b, Duration::from_secs(60)).await?, + RayGunEventKind::CommunityInvited { + community_id: community.id(), + invite_id: invite.id() + } + ); + + let mut stream_a = instance_a.get_community_stream(community.id()).await?; + instance_b + .accept_community_invite(community.id(), invite.id()) + .await?; + assert_eq!( + next_event(&mut stream_a, Duration::from_secs(60)).await?, + MessageEventKind::AcceptedCommunityInvite { + community_id: community.id(), + invite_id: invite.id(), + user: did_b.clone() + } + ); + + let mut stream_b = instance_b.get_community_stream(community.id()).await?; + let file_name = "my_file.txt"; + let file = b"Thanks for reading my file"; + instance_b.put_buffer(file_name, file).await?; + let locations = vec![Location::Constellation { + path: file_name.to_string(), + }]; + let message = vec!["here is my file".to_string()]; + let (message_id, mut attachment_event_stream) = instance_b + .attach_to_community_channel_message( + community.id(), + channel.id(), + None, + locations, + message.clone(), + ) + .await?; + loop { + if attachment_event_stream.next().await.is_none() { + break; + } + } + assert_eq!( + next_event(&mut stream_b, Duration::from_secs(60)).await?, + MessageEventKind::CommunityMessageSent { + community_id: community.id(), + channel_id: channel.id(), + message_id, + } + ); + assert_eq!( + next_event(&mut stream_a, Duration::from_secs(60)).await?, + MessageEventKind::CommunityMessageReceived { + community_id: community.id(), + channel_id: channel.id(), + message_id + } + ); + + instance_a + .revoke_community_channel_permission_for_all( + community.id(), + channel.id(), + CommunityChannelPermission::ViewChannel, + ) + .await?; + assert_next_msg_event( + vec![&mut stream_a, &mut stream_b], + Duration::from_secs(60), + MessageEventKind::RevokedCommunityChannelPermissionForAll { + community_id: community.id(), + channel_id: channel.id(), + permission: CommunityChannelPermission::ViewChannel, + }, + ) + .await?; + + let result = instance_b + .download_stream_from_community_channel_message( + community.id(), + channel.id(), + message_id, + file_name, + ) + .await; + match result { + Ok(_) => panic!("should not have succeeded"), + Err(e) => assert_eq!(format!("{:?}", e), format!("{:?}", Error::Unauthorized)), + } + Ok(()) + } + #[async_test] + async fn authorized_download_stream_from_community_channel_message() -> anyhow::Result<()> { + let context = + Some("test::authorized_download_stream_from_community_channel_message".into()); + let acc = (None, None, context); + let accounts = create_accounts(vec![acc.clone(), acc]).await?; + let (instance_a, _, _) = &mut accounts[0].clone(); + let (instance_b, did_b, _) = &mut accounts[1].clone(); + + let community = instance_a.create_community("Community0").await?; + let channel = instance_a + .create_community_channel(community.id(), "Channel0", CommunityChannelType::Standard) + .await?; + + let mut rg_stream_b = instance_b.raygun_subscribe().await?; + let invite = instance_a + .create_community_invite(community.id(), Some(did_b.clone()), None) + .await?; + assert_eq!( + next_event(&mut rg_stream_b, Duration::from_secs(60)).await?, + RayGunEventKind::CommunityInvited { + community_id: community.id(), + invite_id: invite.id() + } + ); + + let mut stream_a = instance_a.get_community_stream(community.id()).await?; + instance_b + .accept_community_invite(community.id(), invite.id()) + .await?; + assert_eq!( + next_event(&mut stream_a, Duration::from_secs(60)).await?, + MessageEventKind::AcceptedCommunityInvite { + community_id: community.id(), + invite_id: invite.id(), + user: did_b.clone() + } + ); + + let mut stream_b = instance_b.get_community_stream(community.id()).await?; + let file_name = "my_file.txt"; + let file = b"Thanks for reading my file"; + instance_b.put_buffer(file_name, file).await?; + let locations = vec![Location::Constellation { + path: file_name.to_string(), + }]; + let message = vec!["here is my file".to_string()]; + let (message_id, mut attachment_event_stream) = instance_b + .attach_to_community_channel_message( + community.id(), + channel.id(), + None, + locations, + message.clone(), + ) + .await?; + loop { + if attachment_event_stream.next().await.is_none() { + break; + } + } + assert_eq!( + next_event(&mut stream_b, Duration::from_secs(60)).await?, + MessageEventKind::CommunityMessageSent { + community_id: community.id(), + channel_id: channel.id(), + message_id, + } + ); + assert_eq!( + next_event(&mut stream_a, Duration::from_secs(60)).await?, + MessageEventKind::CommunityMessageReceived { + community_id: community.id(), + channel_id: channel.id(), + message_id + } + ); + let mut download_stream = instance_b + .download_stream_from_community_channel_message( + community.id(), + channel.id(), + message_id, + file_name, + ) + .await?; + + let mut bytes = vec![]; + loop { + match download_stream.next().await { + Some(b) => { + let mut b = b.unwrap().to_vec(); + bytes.append(&mut b); + } + None => break, + } + } + assert_eq!(bytes, file.to_vec()); + + let msg = instance_a + .get_community_channel_message(community.id(), channel.id(), message_id) + .await?; + assert_eq!(msg.lines(), &message); + assert_eq!(msg.attachments().len(), 1); + Ok(()) + } } diff --git a/warp/src/raygun/community.rs b/warp/src/raygun/community.rs index 65e6cda34..219470a05 100644 --- a/warp/src/raygun/community.rs +++ b/warp/src/raygun/community.rs @@ -274,6 +274,7 @@ pub enum CommunityPermission { pub enum CommunityChannelPermission { ViewChannel, SendMessages, + SendAttachments, } #[async_trait::async_trait] From c08b8bfad0ab22b344237f53680ad37f5853776d Mon Sep 17 00:00:00 2001 From: Darius Clark Date: Wed, 4 Dec 2024 12:26:38 -0600 Subject: [PATCH 2/3] chore: replace CancellationToken with AbortableJoinHandle (#642) --- .../warp-ipfs/src/store/event_subscription.rs | 22 +++++-------------- extensions/warp-ipfs/src/store/files.rs | 18 +++++---------- extensions/warp-ipfs/src/store/message.rs | 20 +++-------------- extensions/warp-ipfs/src/store/queue.rs | 19 ++++------------ 4 files changed, 17 insertions(+), 62 deletions(-) diff --git a/extensions/warp-ipfs/src/store/event_subscription.rs b/extensions/warp-ipfs/src/store/event_subscription.rs index 47f1e1bee..d6d22caba 100644 --- a/extensions/warp-ipfs/src/store/event_subscription.rs +++ b/extensions/warp-ipfs/src/store/event_subscription.rs @@ -1,4 +1,4 @@ -use crate::rt::{Executor, LocalExecutor}; +use crate::rt::{AbortableJoinHandle, Executor, LocalExecutor}; use futures::{ channel::{ mpsc::{channel, Receiver, Sender}, @@ -7,11 +7,9 @@ use futures::{ stream::BoxStream, SinkExt, StreamExt, }; +use std::collections::VecDeque; use std::fmt::Debug; use std::task::{Poll, Waker}; -use std::{collections::VecDeque, sync::Arc}; -use tokio::select; -use tokio_util::sync::{CancellationToken, DropGuard}; use warp::error::Error; #[allow(clippy::large_enum_variant)] @@ -27,7 +25,7 @@ enum Command { #[derive(Clone, Debug)] pub struct EventSubscription { tx: Sender>, - _task_cancellation: Arc, + _handle: AbortableJoinHandle<()>, } impl EventSubscription { @@ -43,19 +41,9 @@ impl EventSubscription { rx, }; - let token = CancellationToken::new(); - let drop_guard = token.clone().drop_guard(); - executor.dispatch(async move { - select! { - _ = token.cancelled() => {} - _ = task.run() => {} - } - }); + let _handle = executor.spawn_abortable(async move { task.run().await }); - Self { - tx, - _task_cancellation: Arc::new(drop_guard), - } + Self { tx, _handle } } pub async fn subscribe<'a>(&self) -> Result, Error> { diff --git a/extensions/warp-ipfs/src/store/files.rs b/extensions/warp-ipfs/src/store/files.rs index 5385b08b5..b20e0df6a 100644 --- a/extensions/warp-ipfs/src/store/files.rs +++ b/extensions/warp-ipfs/src/store/files.rs @@ -15,7 +15,6 @@ use futures_finally::try_stream::FinallyTryStreamExt; use rust_ipfs::{unixfs::UnixfsStatus, Ipfs, IpfsPath}; -use tokio_util::sync::{CancellationToken, DropGuard}; use tracing::{Instrument, Span}; use warp::{ constellation::{ @@ -31,7 +30,7 @@ use super::{ document::root::RootDocumentMap, event_subscription::EventSubscription, MAX_THUMBNAIL_STREAM_SIZE, }; -use crate::rt::{Executor, LocalExecutor}; +use crate::rt::{AbortableJoinHandle, Executor, LocalExecutor}; use crate::{ config::{self, Config}, thumbnail::ThumbnailGenerator, @@ -44,7 +43,7 @@ pub struct FileStore { path: Arc>, config: config::Config, command_sender: mpsc::Sender, - _guard: Arc, + _handle: AbortableJoinHandle<()>, } impl FileStore { @@ -55,6 +54,7 @@ impl FileStore { constellation_tx: EventSubscription, span: &Span, ) -> Self { + let executor = LocalExecutor; let config = config.clone(); let index = Directory::new("root"); @@ -92,24 +92,16 @@ impl FileStore { let signal = Some(task.signal_tx.clone()); index.rebuild_paths(&signal); - let token = CancellationToken::new(); - let _guard = Arc::new(token.clone().drop_guard()); - let span = span.clone(); - LocalExecutor.dispatch(async move { - tokio::select! { - _ = task.run().instrument(span) => {} - _ = token.cancelled() => {} - } - }); + let _handle = executor.spawn_abortable(async move { task.run().await }.instrument(span)); FileStore { index, config, path, command_sender, - _guard, + _handle, } } } diff --git a/extensions/warp-ipfs/src/store/message.rs b/extensions/warp-ipfs/src/store/message.rs index d7144de71..af697b000 100644 --- a/extensions/warp-ipfs/src/store/message.rs +++ b/extensions/warp-ipfs/src/store/message.rs @@ -29,7 +29,6 @@ use ipld_core::cid::Cid; use rust_ipfs::{Ipfs, PeerId}; use serde::{Deserialize, Serialize}; -use tokio_util::sync::{CancellationToken, DropGuard}; use uuid::Uuid; use super::{document::root::RootDocumentMap, ds_key::DataStoreKey, PeerIdExt}; @@ -79,7 +78,7 @@ pub type DownloadStream = BoxStream<'static, Result>; #[derive(Clone)] pub struct MessageStore { inner: Arc>, - _task_cancellation: Arc, + _handle: AbortableJoinHandle<()>, } impl MessageStore { @@ -94,9 +93,6 @@ impl MessageStore { let executor = LocalExecutor; tracing::info!("Initializing MessageStore"); - let token = CancellationToken::new(); - let drop_guard = token.clone().drop_guard(); - let root = identity.root_document().clone(); let mut inner = ConversationInner { @@ -127,19 +123,9 @@ impl MessageStore { identity: identity.clone(), }; - executor.dispatch({ - async move { - tokio::select! { - _ = token.cancelled() => {} - _ = task.run() => {} - } - } - }); + let _handle = executor.spawn_abortable(task.run()); - Self { - inner, - _task_cancellation: Arc::new(drop_guard), - } + Self { inner, _handle } } } diff --git a/extensions/warp-ipfs/src/store/queue.rs b/extensions/warp-ipfs/src/store/queue.rs index 2d6013d88..8eb27ab0b 100644 --- a/extensions/warp-ipfs/src/store/queue.rs +++ b/extensions/warp-ipfs/src/store/queue.rs @@ -1,6 +1,6 @@ use futures::{channel::mpsc, StreamExt, TryFutureExt}; -use crate::rt::{Executor, LocalExecutor}; +use crate::rt::{AbortableJoinHandle, Executor, LocalExecutor}; use crate::store::{ ds_key::DataStoreKey, ecdh_encrypt, payload::PayloadBuilder, topics::PeerTopic, PeerIdExt, }; @@ -8,7 +8,6 @@ use ipld_core::cid::Cid; use rust_ipfs::{Ipfs, Keypair}; use std::{collections::HashMap, sync::Arc, time::Duration}; use tokio::sync::RwLock; -use tokio_util::sync::{CancellationToken, DropGuard}; use warp::{crypto::DID, error::Error}; use web_time::Instant; @@ -228,7 +227,7 @@ pub struct QueueEntry { recipient: DID, keypair: Keypair, item: RequestResponsePayload, - drop_guard: Arc>>, + drop_guard: Arc>>>, executor: LocalExecutor, } @@ -249,9 +248,6 @@ impl QueueEntry { executor: LocalExecutor, }; - let token = CancellationToken::new(); - let drop_guard = token.clone().drop_guard(); - let fut = { let entry = entry.clone(); async move { @@ -332,16 +328,9 @@ impl QueueEntry { } }; - entry.executor.dispatch(async move { - futures::pin_mut!(fut); - - tokio::select! { - _ = token.cancelled() => {} - _ = &mut fut => {} - } - }); + let _handle = entry.executor.spawn_abortable(fut); - *entry.drop_guard.write().await = Some(drop_guard); + *entry.drop_guard.write().await = Some(_handle); entry } From fded32abc3818e3223d4914f01332df9465fdb7d Mon Sep 17 00:00:00 2001 From: Darius Date: Wed, 4 Dec 2024 13:43:39 -0500 Subject: [PATCH 3/3] chore: cleanup linting in test --- extensions/warp-ipfs/tests/community.rs | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/extensions/warp-ipfs/tests/community.rs b/extensions/warp-ipfs/tests/community.rs index 93d511765..b9bcf63fe 100644 --- a/extensions/warp-ipfs/tests/community.rs +++ b/extensions/warp-ipfs/tests/community.rs @@ -4876,16 +4876,13 @@ mod test { ) .await?; - let mut bytes = vec![]; - loop { - match download_stream.next().await { - Some(b) => { - let mut b = b.unwrap().to_vec(); - bytes.append(&mut b); - } - None => break, - } + let mut bytes = Vec::with_capacity(file.len()); + + while let Some(result) = download_stream.next().await { + let b = result.expect("valid"); + bytes.extend(b.to_vec()); } + assert_eq!(bytes, file.to_vec()); let msg = instance_a