Skip to content

Commit

Permalink
chore: Implement message reference
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 committed Nov 15, 2023
1 parent 1439cc1 commit 886c876
Show file tree
Hide file tree
Showing 5 changed files with 276 additions and 32 deletions.
28 changes: 24 additions & 4 deletions extensions/warp-ipfs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ use warp::crypto::keypair::PhraseType;
use warp::crypto::zeroize::Zeroizing;
use warp::raygun::{
AttachmentEventStream, Conversation, EmbedState, Location, Message, MessageEvent,
MessageEventStream, MessageOptions, MessageStatus, Messages, PinState, RayGun,
RayGunAttachment, RayGunEventKind, RayGunEventStream, RayGunEvents, RayGunGroupConversation,
RayGunStream, ReactionState,
MessageEventStream, MessageOptions, MessageReference, MessageStatus, Messages, PinState,
RayGun, RayGunAttachment, RayGunEventKind, RayGunEventStream, RayGunEvents,
RayGunGroupConversation, RayGunStream, ReactionState,
};
use warp::sync::{Arc, RwLock};

Expand Down Expand Up @@ -304,7 +304,7 @@ impl WarpIpfs {
.set_keypair(keypair)
.with_rendezvous_client()
.set_transport_configuration(TransportConfig {
yamux_receive_window_size: 256*1024,
yamux_receive_window_size: 256 * 1024,
yamux_max_buffer_size: 1024 * 1024,
yamux_update_mode: UpdateMode::Read,
..Default::default()
Expand Down Expand Up @@ -1286,6 +1286,26 @@ impl RayGun for WarpIpfs {
.await
}

async fn get_message_references(
&self,
conversation_id: Uuid,
opt: MessageOptions,
) -> Result<BoxStream<'static, MessageReference>, Error> {
self.messaging_store()?
.get_message_references(conversation_id, opt)
.await
}

async fn get_message_reference(
&self,
conversation_id: Uuid,
message_id: Uuid,
) -> Result<MessageReference, Error> {
self.messaging_store()?
.get_message_reference(conversation_id, message_id)
.await
}

async fn message_status(
&self,
conversation_id: Uuid,
Expand Down
125 changes: 101 additions & 24 deletions extensions/warp-ipfs/src/store/conversation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use futures::{
stream::{self, BoxStream},
StreamExt, TryFutureExt,
};
use libipld::{Cid, Ipld};
use libipld::Cid;
use rust_ipfs::Ipfs;
use serde::{Deserialize, Deserializer, Serialize};
use std::collections::{BTreeSet, HashMap};
Expand All @@ -15,8 +15,8 @@ use warp::{
error::Error,
logging::tracing::log::info,
raygun::{
Conversation, ConversationType, Message, MessageOptions, MessagePage, Messages,
MessagesType,
Conversation, ConversationType, Message, MessageOptions, MessagePage, MessageReference,
Messages, MessagesType,
},
};

Expand Down Expand Up @@ -286,19 +286,13 @@ impl ConversationDocument {
}

pub async fn messages_length(&self, ipfs: &Ipfs) -> Result<usize, Error> {
let document = self.get_raw_message_list(ipfs).await?;
if let Ipld::List(list) = document {
return Ok(list.len());
}
Err(Error::InvalidDataType)
self.get_message_list(ipfs).await.map(|l| l.len())
}

pub async fn get_message_list(&self, ipfs: &Ipfs) -> Result<BTreeSet<MessageDocument>, Error> {
match self.messages {
Some(cid) => ipfs
.dag()
.get()
.path(cid)
.get_dag(cid)
.local()
.deserialized()
.await
Expand All @@ -308,19 +302,6 @@ impl ConversationDocument {
}
}

pub async fn get_raw_message_list(&self, ipfs: &Ipfs) -> Result<Ipld, Error> {
match self.messages {
Some(cid) => ipfs
.dag()
.get()
.path(cid)
.local()
.await
.map_err(anyhow::Error::from)
.map_err(Error::from),
None => Ok(Ipld::List(vec![])),
}
}
pub async fn set_message_list(
&mut self,
ipfs: &Ipfs,
Expand All @@ -347,6 +328,65 @@ impl ConversationDocument {
Ok(list)
}

pub async fn get_messages_reference_stream<'a>(
&self,
ipfs: &Ipfs,
option: MessageOptions,
) -> Result<BoxStream<'a, MessageReference>, Error> {
let message_list = self.get_message_list(ipfs).await?;

if message_list.is_empty() {
return Ok(stream::empty().boxed());
}

let mut messages = Vec::from_iter(message_list);

if option.reverse() {
messages.reverse()
}

if option.first_message() && !messages.is_empty() {
let message = messages.first().copied().ok_or(Error::MessageNotFound)?;
return Ok(stream::once(async move { message.into() }).boxed());
}

if option.last_message() && !messages.is_empty() {
let message = messages.last().copied().ok_or(Error::MessageNotFound)?;
return Ok(stream::once(async move { message.into() }).boxed());
}

let stream = async_stream::stream! {
let mut remaining = option.limit();
for (index, document) in messages.iter().enumerate() {
if remaining.as_ref().map(|x| *x == 0).unwrap_or_default() {
break;
}
if let Some(range) = option.range() {
if range.start > index || range.end < index {
continue
}
}
if let Some(range) = option.date_range() {
if !(document.date >= range.start && document.date <= range.end) {
continue
}
}

if option.pinned() && !document.pinned {
continue;
}

if let Some(remaining) = remaining.as_mut() {
*remaining = remaining.saturating_sub(1);
}

yield document.into()
}
};

Ok(stream.boxed())
}

pub async fn get_messages_stream<'a>(
&self,
ipfs: &Ipfs,
Expand Down Expand Up @@ -587,9 +627,37 @@ pub struct MessageDocument {
pub conversation_id: Uuid,
pub sender: DIDEd25519Reference,
pub date: DateTime<Utc>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub modified: Option<DateTime<Utc>>,
#[serde(default)]
pub pinned: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub replied: Option<Uuid>,
pub message: Cid,
}

impl From<MessageDocument> for MessageReference {
fn from(document: MessageDocument) -> Self {
Self::from(&document)
}
}

impl From<&MessageDocument> for MessageReference {
fn from(document: &MessageDocument) -> Self {
let mut reference = MessageReference::default();
reference.set_id(document.id);
reference.set_conversation_id(document.conversation_id);
reference.set_date(document.date);
if let Some(modified) = document.modified {
reference.set_modified(modified);
}
reference.set_pinned(document.pinned);
reference.set_replied(document.replied);
reference.set_sender(document.sender.to_did());
reference
}
}

impl PartialOrd for MessageDocument {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
Expand All @@ -613,6 +681,9 @@ impl MessageDocument {
let conversation_id = message.conversation_id();
let date = message.date();
let sender = message.sender();
let pinned = message.pinned();
let modified = message.modified();
let replied = message.replied();

let bytes = serde_json::to_vec(&message)?;

Expand All @@ -634,6 +705,9 @@ impl MessageDocument {
conversation_id,
date,
message,
pinned,
modified,
replied,
};

Ok(document)
Expand Down Expand Up @@ -677,6 +751,9 @@ impl MessageDocument {
None => ecdh_encrypt(did, Some(&self.sender.to_did()), &bytes)?,
};

self.pinned = message.pinned();
self.modified = message.modified();

let message_cid = ipfs.dag().put().serialize(data)?.await?;

info!("Setting Message to document");
Expand Down
30 changes: 27 additions & 3 deletions extensions/warp-ipfs/src/store/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::time::{Duration, Instant};
use chrono::Utc;
use futures::channel::mpsc::{unbounded, UnboundedSender};
use futures::channel::oneshot::{self, Sender as OneshotSender};
use futures::stream::SelectAll;
use futures::stream::{BoxStream, SelectAll};
use futures::{SinkExt, Stream, StreamExt};
use rust_ipfs::{Ipfs, IpfsPath, PeerId, SubscriptionStream};

Expand All @@ -25,8 +25,9 @@ use warp::logging::tracing::warn;
use warp::multipass::MultiPassEventKind;
use warp::raygun::{
AttachmentEventStream, AttachmentKind, Conversation, ConversationType, EmbedState, Location,
Message, MessageEvent, MessageEventKind, MessageOptions, MessageStatus, MessageStream,
MessageType, Messages, MessagesType, PinState, RayGunEventKind, Reaction, ReactionState,
Message, MessageEvent, MessageEventKind, MessageOptions, MessageReference, MessageStatus,
MessageStream, MessageType, Messages, MessagesType, PinState, RayGunEventKind, Reaction,
ReactionState,
};
use warp::sync::Arc;

Expand Down Expand Up @@ -2188,6 +2189,29 @@ impl MessageStore {
.await
}

pub async fn get_message_references<'a>(
&self,
conversation_id: Uuid,
opt: MessageOptions,
) -> Result<BoxStream<'a, MessageReference>, Error> {
let conversation = self.conversations.get(conversation_id).await?;
conversation
.get_messages_reference_stream(&self.ipfs, opt)
.await
}

pub async fn get_message_reference(
&self,
conversation_id: Uuid,
message_id: Uuid,
) -> Result<MessageReference, Error> {
let conversation = self.conversations.get(conversation_id).await?;
conversation
.get_message_document(&self.ipfs, message_id)
.await
.map(|document| document.into())
}

//TODO: Send a request to recipient(s) of the chat to ack if message been delivered if message is marked "sent" unless we receive an event acknowledging the message itself
//Note:
// - For group chat, this can be ignored unless we decide to have a full acknowledgement from all recipients in which case, we can mark it as "sent"
Expand Down
2 changes: 1 addition & 1 deletion tools/relay-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ struct Opt {
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();

let opts = Opt::parse();

let path = opts.path;
Expand Down
Loading

0 comments on commit 886c876

Please sign in to comment.