From a47d1a9c2f7002527dcb572bbf135d18b7a06134 Mon Sep 17 00:00:00 2001 From: Darius Date: Thu, 5 Dec 2024 00:13:10 -0500 Subject: [PATCH] chore: enable load from mailbox --- .../warp-ipfs/src/store/message/task.rs | 330 ++++++++++-------- 1 file changed, 175 insertions(+), 155 deletions(-) diff --git a/extensions/warp-ipfs/src/store/message/task.rs b/extensions/warp-ipfs/src/store/message/task.rs index 6b35b04fc..96fbf0d55 100644 --- a/extensions/warp-ipfs/src/store/message/task.rs +++ b/extensions/warp-ipfs/src/store/message/task.rs @@ -3,16 +3,18 @@ use chrono::Utc; use either::Either; use futures::channel::oneshot; use futures::stream::BoxStream; -use futures::{StreamExt, TryFutureExt}; +use futures::{pin_mut, StreamExt, TryFutureExt}; use futures_timer::Delay; use indexmap::{IndexMap, IndexSet}; use ipld_core::cid::Cid; +use pollable_map::futures::FutureMap; +use rust_ipfs::p2p::MultiaddrExt; use rust_ipfs::{libp2p::gossipsub::Message, Ipfs}; use rust_ipfs::{IpfsPath, PeerId, SubscriptionStream}; use serde::{Deserialize, Serialize}; use std::borrow::BorrowMut; use std::collections::hash_map::Entry; -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::future::Future; use std::path::PathBuf; use std::pin::Pin; @@ -48,7 +50,7 @@ use crate::store::event_subscription::EventSubscription; use crate::store::message::attachment::AttachmentStream; use crate::store::topics::PeerTopic; use crate::store::{ - ecdh_shared_key, verify_serde_sig, ConversationEvents, ConversationImageType, + ecdh_shared_key, protocols, verify_serde_sig, ConversationEvents, ConversationImageType, MAX_CONVERSATION_BANNER_SIZE, MAX_CONVERSATION_ICON_SIZE, }; use crate::utils::{ByteCollection, ExtensionType}; @@ -443,158 +445,176 @@ impl ConversationTask { impl ConversationTask { async fn load_from_mailbox(&mut self) -> Result<(), Error> { - // let config::Discovery::Shuttle { addresses } = self.discovery.discovery_config().clone() - // else { - // return Ok(()); - // }; - // - // let ipfs = self.ipfs.clone(); - // let message_command = self.message_command.clone(); - // let addresses = addresses.clone(); - // let conversation_id = self.conversation_id; - // - // let mut mailbox = BTreeMap::new(); - // let mut providers = vec![]; - // for peer_id in addresses.iter().filter_map(|addr| addr.peer_id()) { - // let (tx, rx) = futures::channel::oneshot::channel(); - // let _ = message_command - // .clone() - // .send(MessageCommand::FetchMailbox { - // peer_id, - // conversation_id, - // response: tx, - // }) - // .await; - // - // match rx.timeout(SHUTTLE_TIMEOUT).await { - // Ok(Ok(Ok(list))) => { - // providers.push(peer_id); - // mailbox.extend(list); - // break; - // } - // Ok(Ok(Err(e))) => { - // tracing::error!("unable to get mailbox to conversation {conversation_id} from {peer_id}: {e}"); - // break; - // } - // Ok(Err(_)) => { - // tracing::error!("Channel been unexpectedly closed for {peer_id}"); - // continue; - // } - // Err(_) => { - // tracing::error!("Request timed out for {peer_id}"); - // continue; - // } - // } - // } - // - // let conversation_mailbox = mailbox - // .into_iter() - // .filter_map(|(id, cid)| { - // let id = Uuid::from_str(&id).ok()?; - // Some((id, cid)) - // }) - // .collect::>(); - // - // let mut messages = - // FuturesUnordered::from_iter(conversation_mailbox.into_iter().map(|(id, cid)| { - // let ipfs = ipfs.clone(); - // async move { - // ipfs.fetch(&cid).recursive().await?; - // Ok((id, cid)) - // } - // .boxed() - // })) - // .filter_map(|res: Result<_, anyhow::Error>| async move { res.ok() }) - // .filter_map(|(_, cid)| { - // let ipfs = ipfs.clone(); - // let providers = providers.clone(); - // let addresses = addresses.clone(); - // let message_command = message_command.clone(); - // async move { - // let message_document = ipfs - // .get_dag(cid) - // .providers(&providers) - // .deserialized::() - // .await - // .ok()?; - // - // if !message_document.verify() { - // return None; - // } - // - // for peer_id in addresses.into_iter().filter_map(|addr| addr.peer_id()) { - // let _ = message_command - // .clone() - // .send(MessageCommand::MessageDelivered { - // peer_id, - // conversation_id, - // message_id: message_document.id, - // }) - // .await; - // } - // Some(message_document) - // } - // }) - // .collect::>() - // .await; - // - // messages.sort_by(|a, b| b.cmp(a)); - // - // for message in messages { - // if !message.verify() { - // continue; - // } - // let message_id = message.id; - // match self - // .document - // .contains(&self.ipfs, message_id) - // .await - // .unwrap_or_default() - // { - // true => { - // let current_message = self - // .document - // .get_message_document(&self.ipfs, message_id) - // .await?; - // - // self.document - // .update_message_document(&self.ipfs, &message) - // .await?; - // - // let is_edited = matches!((message.modified, current_message.modified), (Some(modified), Some(current_modified)) if modified > current_modified ) - // | matches!( - // (message.modified, current_message.modified), - // (Some(_), None) - // ); - // - // match is_edited { - // true => { - // let _ = self.event_broadcast.send(MessageEventKind::MessageEdited { - // conversation_id, - // message_id, - // }); - // } - // false => { - // //TODO: Emit event showing message was updated in some way - // } - // } - // } - // false => { - // self.document - // .insert_message_document(&self.ipfs, &message) - // .await?; - // - // let _ = self - // .event_broadcast - // .send(MessageEventKind::MessageReceived { - // conversation_id, - // message_id, - // }); - // } - // } - // } - // - // self.set_document().await?; + let crate::config::Discovery::Shuttle { addresses } = + self.discovery.discovery_config().clone() + else { + return Ok(()); + }; + + if addresses.is_empty() { + return Err(Error::Other); + } + + let payload = PayloadBuilder::new( + self.identity.root_document().keypair(), + crate::shuttle::message::protocol::Request::FetchMailBox { + conversation_id: self.conversation_id, + }, + ) + .build()?; + + let bytes = payload.to_bytes().expect("valid deserialization"); + + let ipfs = self.ipfs.clone(); + + let addresses = addresses.clone(); + let conversation_id = self.conversation_id; + + let mut mailbox = BTreeMap::new(); + let mut providers = vec![]; + for peer_id in addresses.iter().filter_map(|addr| addr.peer_id()) { + let response = match ipfs + .send_request(peer_id, (protocols::SHUTTLE_MESSAGE, bytes.clone())) + .await + .and_then(|response| { + PayloadMessage::::from_bytes( + &response, + ) + .map_err(anyhow::Error::from) + }) { + Ok(response) => response, + Err(_e) => { + continue; + } + }; + + match response.message() { + crate::shuttle::message::protocol::Response::Mailbox { + conversation_id: retrieved_id, + content, + } => { + debug_assert_eq!(*retrieved_id, conversation_id); + providers.push(peer_id); + mailbox.extend(content.clone()); + break; + } + crate::shuttle::message::protocol::Response::Error(e) => { + tracing::error!(error = %e, %peer_id, "error handling request"); + } + _ => { + tracing::error!(%peer_id, "response from shuttle node was invalid"); + continue; + } + } + } + + let conversation_mailbox = mailbox + .into_iter() + .filter_map(|(id, cid)| { + let id = Uuid::from_str(&id).ok()?; + Some((id, cid)) + }) + .collect::>(); + + let mut messages = FutureMap::new(); + for (id, cid) in conversation_mailbox { + let ipfs = ipfs.clone(); + let providers = providers.clone(); + let root = self.identity.root_document().clone(); + let fut = async move { + ipfs.fetch(&cid).recursive().await?; + let message_document = ipfs + .get_dag(cid) + .providers(&providers) + .deserialized::() + .await?; + + if !message_document.verify() { + return Err(Error::InvalidMessage); + } + + let payload = PayloadBuilder::new( + root.keypair(), + crate::shuttle::message::protocol::Request::FetchMailBox { conversation_id }, + ) + .build()?; + + let bytes = payload.to_bytes().expect("valid deserialization"); + for peer_id in providers { + let _response = ipfs + .send_request(peer_id, (protocols::SHUTTLE_MESSAGE, bytes.clone())) + .await; + } + + Ok(message_document) + }; + messages.insert(id, Box::pin(fut)); + } + + pin_mut!(messages); + + let mut messages = messages + .filter_map(|(_, result)| async move { result.ok() }) + .collect::>() + .await; + + messages.sort_by(|a, b| b.cmp(a)); + + for message in messages { + if !message.verify() { + continue; + } + let message_id = message.id; + match self + .document + .contains(&self.ipfs, message_id) + .await + .unwrap_or_default() + { + true => { + let current_message = self + .document + .get_message_document(&self.ipfs, message_id) + .await?; + + self.document + .update_message_document(&self.ipfs, &message) + .await?; + + let is_edited = matches!((message.modified, current_message.modified), (Some(modified), Some(current_modified)) if modified > current_modified ) + | matches!( + (message.modified, current_message.modified), + (Some(_), None) + ); + + match is_edited { + true => { + let _ = self.event_broadcast.send(MessageEventKind::MessageEdited { + conversation_id, + message_id, + }); + } + false => { + //TODO: Emit event showing message was updated in some way + } + } + } + false => { + self.document + .insert_message_document(&self.ipfs, &message) + .await?; + + let _ = self + .event_broadcast + .send(MessageEventKind::MessageReceived { + conversation_id, + message_id, + }); + } + } + } + + self.set_document().await?; Ok(()) }