From 886c876bcb83c14ae3a76e8ad04cf32dff14eeb0 Mon Sep 17 00:00:00 2001 From: Darius Date: Wed, 15 Nov 2023 12:02:18 -0500 Subject: [PATCH] chore: Implement message reference --- extensions/warp-ipfs/src/lib.rs | 28 +++- .../warp-ipfs/src/store/conversation.rs | 125 ++++++++++++++---- extensions/warp-ipfs/src/store/message.rs | 30 ++++- tools/relay-server/src/main.rs | 2 +- warp/src/raygun/mod.rs | 123 +++++++++++++++++ 5 files changed, 276 insertions(+), 32 deletions(-) diff --git a/extensions/warp-ipfs/src/lib.rs b/extensions/warp-ipfs/src/lib.rs index cf40f0bca..37a6cbc94 100644 --- a/extensions/warp-ipfs/src/lib.rs +++ b/extensions/warp-ipfs/src/lib.rs @@ -46,9 +46,9 @@ use warp::crypto::keypair::PhraseType; use warp::crypto::zeroize::Zeroizing; use warp::raygun::{ AttachmentEventStream, Conversation, EmbedState, Location, Message, MessageEvent, - MessageEventStream, MessageOptions, MessageStatus, Messages, PinState, RayGun, - RayGunAttachment, RayGunEventKind, RayGunEventStream, RayGunEvents, RayGunGroupConversation, - RayGunStream, ReactionState, + MessageEventStream, MessageOptions, MessageReference, MessageStatus, Messages, PinState, + RayGun, RayGunAttachment, RayGunEventKind, RayGunEventStream, RayGunEvents, + RayGunGroupConversation, RayGunStream, ReactionState, }; use warp::sync::{Arc, RwLock}; @@ -304,7 +304,7 @@ impl WarpIpfs { .set_keypair(keypair) .with_rendezvous_client() .set_transport_configuration(TransportConfig { - yamux_receive_window_size: 256*1024, + yamux_receive_window_size: 256 * 1024, yamux_max_buffer_size: 1024 * 1024, yamux_update_mode: UpdateMode::Read, ..Default::default() @@ -1286,6 +1286,26 @@ impl RayGun for WarpIpfs { .await } + async fn get_message_references( + &self, + conversation_id: Uuid, + opt: MessageOptions, + ) -> Result, Error> { + self.messaging_store()? + .get_message_references(conversation_id, opt) + .await + } + + async fn get_message_reference( + &self, + conversation_id: Uuid, + message_id: Uuid, + ) -> Result { + self.messaging_store()? + .get_message_reference(conversation_id, message_id) + .await + } + async fn message_status( &self, conversation_id: Uuid, diff --git a/extensions/warp-ipfs/src/store/conversation.rs b/extensions/warp-ipfs/src/store/conversation.rs index 3aa836708..91cecf1d7 100644 --- a/extensions/warp-ipfs/src/store/conversation.rs +++ b/extensions/warp-ipfs/src/store/conversation.rs @@ -4,7 +4,7 @@ use futures::{ stream::{self, BoxStream}, StreamExt, TryFutureExt, }; -use libipld::{Cid, Ipld}; +use libipld::Cid; use rust_ipfs::Ipfs; use serde::{Deserialize, Deserializer, Serialize}; use std::collections::{BTreeSet, HashMap}; @@ -15,8 +15,8 @@ use warp::{ error::Error, logging::tracing::log::info, raygun::{ - Conversation, ConversationType, Message, MessageOptions, MessagePage, Messages, - MessagesType, + Conversation, ConversationType, Message, MessageOptions, MessagePage, MessageReference, + Messages, MessagesType, }, }; @@ -286,19 +286,13 @@ impl ConversationDocument { } pub async fn messages_length(&self, ipfs: &Ipfs) -> Result { - let document = self.get_raw_message_list(ipfs).await?; - if let Ipld::List(list) = document { - return Ok(list.len()); - } - Err(Error::InvalidDataType) + self.get_message_list(ipfs).await.map(|l| l.len()) } pub async fn get_message_list(&self, ipfs: &Ipfs) -> Result, Error> { match self.messages { Some(cid) => ipfs - .dag() - .get() - .path(cid) + .get_dag(cid) .local() .deserialized() .await @@ -308,19 +302,6 @@ impl ConversationDocument { } } - pub async fn get_raw_message_list(&self, ipfs: &Ipfs) -> Result { - match self.messages { - Some(cid) => ipfs - .dag() - .get() - .path(cid) - .local() - .await - .map_err(anyhow::Error::from) - .map_err(Error::from), - None => Ok(Ipld::List(vec![])), - } - } pub async fn set_message_list( &mut self, ipfs: &Ipfs, @@ -347,6 +328,65 @@ impl ConversationDocument { Ok(list) } + pub async fn get_messages_reference_stream<'a>( + &self, + ipfs: &Ipfs, + option: MessageOptions, + ) -> Result, Error> { + let message_list = self.get_message_list(ipfs).await?; + + if message_list.is_empty() { + return Ok(stream::empty().boxed()); + } + + let mut messages = Vec::from_iter(message_list); + + if option.reverse() { + messages.reverse() + } + + if option.first_message() && !messages.is_empty() { + let message = messages.first().copied().ok_or(Error::MessageNotFound)?; + return Ok(stream::once(async move { message.into() }).boxed()); + } + + if option.last_message() && !messages.is_empty() { + let message = messages.last().copied().ok_or(Error::MessageNotFound)?; + return Ok(stream::once(async move { message.into() }).boxed()); + } + + let stream = async_stream::stream! { + let mut remaining = option.limit(); + for (index, document) in messages.iter().enumerate() { + if remaining.as_ref().map(|x| *x == 0).unwrap_or_default() { + break; + } + if let Some(range) = option.range() { + if range.start > index || range.end < index { + continue + } + } + if let Some(range) = option.date_range() { + if !(document.date >= range.start && document.date <= range.end) { + continue + } + } + + if option.pinned() && !document.pinned { + continue; + } + + if let Some(remaining) = remaining.as_mut() { + *remaining = remaining.saturating_sub(1); + } + + yield document.into() + } + }; + + Ok(stream.boxed()) + } + pub async fn get_messages_stream<'a>( &self, ipfs: &Ipfs, @@ -587,9 +627,37 @@ pub struct MessageDocument { pub conversation_id: Uuid, pub sender: DIDEd25519Reference, pub date: DateTime, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub modified: Option>, + #[serde(default)] + pub pinned: bool, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub replied: Option, pub message: Cid, } +impl From for MessageReference { + fn from(document: MessageDocument) -> Self { + Self::from(&document) + } +} + +impl From<&MessageDocument> for MessageReference { + fn from(document: &MessageDocument) -> Self { + let mut reference = MessageReference::default(); + reference.set_id(document.id); + reference.set_conversation_id(document.conversation_id); + reference.set_date(document.date); + if let Some(modified) = document.modified { + reference.set_modified(modified); + } + reference.set_pinned(document.pinned); + reference.set_replied(document.replied); + reference.set_sender(document.sender.to_did()); + reference + } +} + impl PartialOrd for MessageDocument { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) @@ -613,6 +681,9 @@ impl MessageDocument { let conversation_id = message.conversation_id(); let date = message.date(); let sender = message.sender(); + let pinned = message.pinned(); + let modified = message.modified(); + let replied = message.replied(); let bytes = serde_json::to_vec(&message)?; @@ -634,6 +705,9 @@ impl MessageDocument { conversation_id, date, message, + pinned, + modified, + replied, }; Ok(document) @@ -677,6 +751,9 @@ impl MessageDocument { None => ecdh_encrypt(did, Some(&self.sender.to_did()), &bytes)?, }; + self.pinned = message.pinned(); + self.modified = message.modified(); + let message_cid = ipfs.dag().put().serialize(data)?.await?; info!("Setting Message to document"); diff --git a/extensions/warp-ipfs/src/store/message.rs b/extensions/warp-ipfs/src/store/message.rs index d43f7d218..dd6eb1fc0 100644 --- a/extensions/warp-ipfs/src/store/message.rs +++ b/extensions/warp-ipfs/src/store/message.rs @@ -9,7 +9,7 @@ use std::time::{Duration, Instant}; use chrono::Utc; use futures::channel::mpsc::{unbounded, UnboundedSender}; use futures::channel::oneshot::{self, Sender as OneshotSender}; -use futures::stream::SelectAll; +use futures::stream::{BoxStream, SelectAll}; use futures::{SinkExt, Stream, StreamExt}; use rust_ipfs::{Ipfs, IpfsPath, PeerId, SubscriptionStream}; @@ -25,8 +25,9 @@ use warp::logging::tracing::warn; use warp::multipass::MultiPassEventKind; use warp::raygun::{ AttachmentEventStream, AttachmentKind, Conversation, ConversationType, EmbedState, Location, - Message, MessageEvent, MessageEventKind, MessageOptions, MessageStatus, MessageStream, - MessageType, Messages, MessagesType, PinState, RayGunEventKind, Reaction, ReactionState, + Message, MessageEvent, MessageEventKind, MessageOptions, MessageReference, MessageStatus, + MessageStream, MessageType, Messages, MessagesType, PinState, RayGunEventKind, Reaction, + ReactionState, }; use warp::sync::Arc; @@ -2188,6 +2189,29 @@ impl MessageStore { .await } + pub async fn get_message_references<'a>( + &self, + conversation_id: Uuid, + opt: MessageOptions, + ) -> Result, Error> { + let conversation = self.conversations.get(conversation_id).await?; + conversation + .get_messages_reference_stream(&self.ipfs, opt) + .await + } + + pub async fn get_message_reference( + &self, + conversation_id: Uuid, + message_id: Uuid, + ) -> Result { + let conversation = self.conversations.get(conversation_id).await?; + conversation + .get_message_document(&self.ipfs, message_id) + .await + .map(|document| document.into()) + } + //TODO: Send a request to recipient(s) of the chat to ack if message been delivered if message is marked "sent" unless we receive an event acknowledging the message itself //Note: // - For group chat, this can be ignored unless we decide to have a full acknowledgement from all recipients in which case, we can mark it as "sent" diff --git a/tools/relay-server/src/main.rs b/tools/relay-server/src/main.rs index 40bc2dc18..88fac69d5 100644 --- a/tools/relay-server/src/main.rs +++ b/tools/relay-server/src/main.rs @@ -53,7 +53,7 @@ struct Opt { #[tokio::main] async fn main() -> Result<(), Box> { tracing_subscriber::fmt::init(); - + let opts = Opt::parse(); let path = opts.path; diff --git a/warp/src/raygun/mod.rs b/warp/src/raygun/mod.rs index a9734be1d..2937b0cf2 100644 --- a/warp/src/raygun/mod.rs +++ b/warp/src/raygun/mod.rs @@ -518,6 +518,115 @@ pub enum MessageType { #[display(fmt = "event")] Event, } + +#[derive(Default, Clone, Debug, PartialEq, Eq)] +pub struct MessageReference { + /// ID of the Message + id: Uuid, + + /// Conversion id where `Message` is associated with. + conversation_id: Uuid, + + /// ID of the sender of the message + sender: DID, + + /// Timestamp of the message + date: DateTime, + + /// Timestamp of when message was modified + modified: Option>, + + /// Pin a message over other messages + pinned: bool, + + /// ID of the message being replied to + replied: Option, + + /// Indication that a message been deleted + deleted: bool, +} + +impl PartialOrd for MessageReference { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for MessageReference { + fn cmp(&self, other: &Self) -> core::cmp::Ordering { + self.date.cmp(&other.date) + } +} + +// Getter functions +impl MessageReference { + pub fn id(&self) -> Uuid { + self.id + } + + pub fn conversation_id(&self) -> Uuid { + self.conversation_id + } + + pub fn sender(&self) -> DID { + self.sender.clone() + } + + pub fn date(&self) -> DateTime { + self.date + } + + pub fn modified(&self) -> Option> { + self.modified + } + + pub fn pinned(&self) -> bool { + self.pinned + } + + pub fn replied(&self) -> Option { + self.replied + } + + pub fn deleted(&self) -> bool { + self.deleted + } +} + +impl MessageReference { + pub fn set_id(&mut self, id: Uuid) { + self.id = id + } + + pub fn set_conversation_id(&mut self, id: Uuid) { + self.conversation_id = id + } + + pub fn set_sender(&mut self, id: DID) { + self.sender = id + } + + pub fn set_date(&mut self, date: DateTime) { + self.date = date + } + + pub fn set_modified(&mut self, date: DateTime) { + self.modified = Some(date) + } + + pub fn set_pinned(&mut self, pin: bool) { + self.pinned = pin + } + + pub fn set_replied(&mut self, replied: Option) { + self.replied = replied + } + + pub fn set_delete(&mut self, deleted: bool) { + self.deleted = deleted + } +} + #[derive(Clone, Deserialize, Serialize, Debug, PartialEq, Eq, warp_derive::FFIVec, FFIFree)] pub struct Message { /// ID of the Message @@ -866,6 +975,20 @@ pub trait RayGun: Err(Error::Unimplemented) } + /// Retrieve all message references from a conversation + async fn get_message_references( + &self, + _: Uuid, + _: MessageOptions, + ) -> Result, Error> { + Err(Error::Unimplemented) + } + + /// Retrieve a message reference from a conversation + async fn get_message_reference(&self, _: Uuid, _: Uuid) -> Result { + Err(Error::Unimplemented) + } + /// Retrieve all messages from a conversation async fn get_messages( &self,