Skip to content

Commit

Permalink
Feat: community message attachments (#640)
Browse files Browse the repository at this point in the history
  • Loading branch information
ashneverdawn authored Dec 3, 2024
1 parent ae3158c commit f7992d7
Show file tree
Hide file tree
Showing 3 changed files with 718 additions and 27 deletions.
145 changes: 125 additions & 20 deletions extensions/warp-ipfs/src/store/message/community_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use warp::raygun::community::{
};
use warp::raygun::{
AttachmentEventStream, ConversationImage, Location, MessageEvent, MessageOptions,
MessageReference, MessageStatus, Messages, MessagesType, PinState, RayGunEventKind,
ReactionState,
MessageReference, MessageStatus, MessageType, Messages, MessagesType, PinState,
RayGunEventKind, ReactionState,
};
use warp::{
crypto::{cipher::Cipher, generate},
Expand Down Expand Up @@ -68,6 +68,8 @@ use crate::{
},
};

use super::attachment::AttachmentStream;

type AttachmentOneshot = (MessageDocument, oneshot::Sender<Result<(), Error>>);

#[allow(dead_code)]
Expand Down Expand Up @@ -321,7 +323,7 @@ pub struct CommunityTask {
community_id: Uuid,
ipfs: Ipfs,
root: RootDocumentMap,
_file: FileStore,
file: FileStore,
identity: IdentityStore,
discovery: Discovery,
pending_key_exchange: IndexMap<DID, Vec<(Vec<u8>, bool)>>,
Expand All @@ -332,7 +334,7 @@ pub struct CommunityTask {
event_stream: SubscriptionStream,
request_stream: SubscriptionStream,

_attachment_tx: futures::channel::mpsc::Sender<AttachmentOneshot>,
attachment_tx: futures::channel::mpsc::Sender<AttachmentOneshot>,
attachment_rx: futures::channel::mpsc::Receiver<AttachmentOneshot>,
message_command: futures::channel::mpsc::Sender<MessageCommand>,
event_broadcast: tokio::sync::broadcast::Sender<MessageEventKind>,
Expand Down Expand Up @@ -403,7 +405,7 @@ impl CommunityTask {
community_id,
ipfs: ipfs.clone(),
root: root.clone(),
_file: file.clone(),
file: file.clone(),
identity: identity.clone(),
discovery: discovery.clone(),
pending_key_exchange: Default::default(),
Expand All @@ -414,7 +416,7 @@ impl CommunityTask {
request_stream,
event_stream,

_attachment_tx: atx,
attachment_tx: atx,
attachment_rx: arx,
event_broadcast: btx,
_event_subscription,
Expand Down Expand Up @@ -3304,29 +3306,132 @@ impl CommunityTask {
}
pub async fn attach_to_community_channel_message(
&mut self,
_channel_id: Uuid,
_message_id: Option<Uuid>,
_locations: Vec<Location>,
_message: Vec<String>,
channel_id: Uuid,
message_id: Option<Uuid>,
locations: Vec<Location>,
messages: Vec<String>,
) -> Result<(Uuid, AttachmentEventStream), Error> {
Err(Error::Unimplemented)
let own_did = &self.identity.did_key();
if !self.document.has_channel_permission(
own_did,
&CommunityChannelPermission::SendMessages,
channel_id,
) {
return Err(Error::Unauthorized);
}
if !self.document.has_channel_permission(
own_did,
&CommunityChannelPermission::SendAttachments,
channel_id,
) {
return Err(Error::Unauthorized);
}

let keystore = pubkey_or_keystore(&*self)?;

let stream = AttachmentStream::new(
&self.ipfs,
self.root.keypair(),
&self.identity.did_key(),
&self.file,
channel_id,
keystore,
self.attachment_tx.clone(),
)
.set_reply(message_id)
.set_locations(locations)?
.set_lines(messages)?;

let message_id = stream.message_id();

Ok((message_id, stream.boxed()))
}
pub async fn download_from_community_channel_message(
&self,
_channel_id: Uuid,
_message_id: Uuid,
_file: String,
_path: PathBuf,
channel_id: Uuid,
message_id: Uuid,
file: String,
path: PathBuf,
) -> Result<ConstellationProgressStream, Error> {
Err(Error::Unimplemented)
let own_did = &self.identity.did_key();
if !self.document.has_channel_permission(
own_did,
&CommunityChannelPermission::ViewChannel,
channel_id,
) {
return Err(Error::Unauthorized);
}

let channel = match self.document.channels.get(&channel_id.to_string()) {
Some(c) => c,
None => return Err(Error::CommunityChannelDoesntExist),
};

let members = self
.document
.participants()
.iter()
.filter_map(|did| did.to_peer_id().ok())
.collect::<Vec<_>>();

let message = channel.get_message_document(&self.ipfs, message_id).await?;

if message.message_type != MessageType::Attachment {
return Err(Error::InvalidMessage);
}

let attachment = message
.attachments()
.iter()
.find(|attachment| attachment.name == file)
.ok_or(Error::FileNotFound)?;

let stream = attachment.download(&self.ipfs, path, &members, None);

Ok(stream)
}
pub async fn download_stream_from_community_channel_message(
&self,
_channel_id: Uuid,
_message_id: Uuid,
_file: String,
channel_id: Uuid,
message_id: Uuid,
file: String,
) -> Result<BoxStream<'static, Result<Bytes, std::io::Error>>, Error> {
Err(Error::Unimplemented)
let own_did = &self.identity.did_key();
if !self.document.has_channel_permission(
own_did,
&CommunityChannelPermission::ViewChannel,
channel_id,
) {
return Err(Error::Unauthorized);
}

let channel = match self.document.channels.get(&channel_id.to_string()) {
Some(c) => c,
None => return Err(Error::CommunityChannelDoesntExist),
};

let members = self
.document
.participants()
.iter()
.filter_map(|did| did.to_peer_id().ok())
.collect::<Vec<_>>();

let message = channel.get_message_document(&self.ipfs, message_id).await?;

if message.message_type != MessageType::Attachment {
return Err(Error::InvalidMessage);
}

let attachment = message
.attachments()
.iter()
.find(|attachment| attachment.name == file)
.ok_or(Error::FileNotFound)?;

let stream = attachment.download_stream(&self.ipfs, &members, None);

Ok(stream)
}

async fn store_direct_for_attachment(&mut self, message: MessageDocument) -> Result<(), Error> {
Expand Down
Loading

0 comments on commit f7992d7

Please sign in to comment.