Skip to content

Commit

Permalink
chore: enable load from mailbox
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 committed Dec 5, 2024
1 parent c7a17d6 commit a47d1a9
Showing 1 changed file with 175 additions and 155 deletions.
330 changes: 175 additions & 155 deletions extensions/warp-ipfs/src/store/message/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@ use chrono::Utc;
use either::Either;
use futures::channel::oneshot;
use futures::stream::BoxStream;
use futures::{StreamExt, TryFutureExt};
use futures::{pin_mut, StreamExt, TryFutureExt};
use futures_timer::Delay;
use indexmap::{IndexMap, IndexSet};
use ipld_core::cid::Cid;
use pollable_map::futures::FutureMap;
use rust_ipfs::p2p::MultiaddrExt;
use rust_ipfs::{libp2p::gossipsub::Message, Ipfs};
use rust_ipfs::{IpfsPath, PeerId, SubscriptionStream};
use serde::{Deserialize, Serialize};
use std::borrow::BorrowMut;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;
Expand Down Expand Up @@ -48,7 +50,7 @@ use crate::store::event_subscription::EventSubscription;
use crate::store::message::attachment::AttachmentStream;
use crate::store::topics::PeerTopic;
use crate::store::{
ecdh_shared_key, verify_serde_sig, ConversationEvents, ConversationImageType,
ecdh_shared_key, protocols, verify_serde_sig, ConversationEvents, ConversationImageType,
MAX_CONVERSATION_BANNER_SIZE, MAX_CONVERSATION_ICON_SIZE,
};
use crate::utils::{ByteCollection, ExtensionType};
Expand Down Expand Up @@ -443,158 +445,176 @@ impl ConversationTask {

impl ConversationTask {
async fn load_from_mailbox(&mut self) -> Result<(), Error> {
// let config::Discovery::Shuttle { addresses } = self.discovery.discovery_config().clone()
// else {
// return Ok(());
// };
//
// let ipfs = self.ipfs.clone();
// let message_command = self.message_command.clone();
// let addresses = addresses.clone();
// let conversation_id = self.conversation_id;
//
// let mut mailbox = BTreeMap::new();
// let mut providers = vec![];
// for peer_id in addresses.iter().filter_map(|addr| addr.peer_id()) {
// let (tx, rx) = futures::channel::oneshot::channel();
// let _ = message_command
// .clone()
// .send(MessageCommand::FetchMailbox {
// peer_id,
// conversation_id,
// response: tx,
// })
// .await;
//
// match rx.timeout(SHUTTLE_TIMEOUT).await {
// Ok(Ok(Ok(list))) => {
// providers.push(peer_id);
// mailbox.extend(list);
// break;
// }
// Ok(Ok(Err(e))) => {
// tracing::error!("unable to get mailbox to conversation {conversation_id} from {peer_id}: {e}");
// break;
// }
// Ok(Err(_)) => {
// tracing::error!("Channel been unexpectedly closed for {peer_id}");
// continue;
// }
// Err(_) => {
// tracing::error!("Request timed out for {peer_id}");
// continue;
// }
// }
// }
//
// let conversation_mailbox = mailbox
// .into_iter()
// .filter_map(|(id, cid)| {
// let id = Uuid::from_str(&id).ok()?;
// Some((id, cid))
// })
// .collect::<BTreeMap<Uuid, Cid>>();
//
// let mut messages =
// FuturesUnordered::from_iter(conversation_mailbox.into_iter().map(|(id, cid)| {
// let ipfs = ipfs.clone();
// async move {
// ipfs.fetch(&cid).recursive().await?;
// Ok((id, cid))
// }
// .boxed()
// }))
// .filter_map(|res: Result<_, anyhow::Error>| async move { res.ok() })
// .filter_map(|(_, cid)| {
// let ipfs = ipfs.clone();
// let providers = providers.clone();
// let addresses = addresses.clone();
// let message_command = message_command.clone();
// async move {
// let message_document = ipfs
// .get_dag(cid)
// .providers(&providers)
// .deserialized::<MessageDocument>()
// .await
// .ok()?;
//
// if !message_document.verify() {
// return None;
// }
//
// for peer_id in addresses.into_iter().filter_map(|addr| addr.peer_id()) {
// let _ = message_command
// .clone()
// .send(MessageCommand::MessageDelivered {
// peer_id,
// conversation_id,
// message_id: message_document.id,
// })
// .await;
// }
// Some(message_document)
// }
// })
// .collect::<Vec<_>>()
// .await;
//
// messages.sort_by(|a, b| b.cmp(a));
//
// for message in messages {
// if !message.verify() {
// continue;
// }
// let message_id = message.id;
// match self
// .document
// .contains(&self.ipfs, message_id)
// .await
// .unwrap_or_default()
// {
// true => {
// let current_message = self
// .document
// .get_message_document(&self.ipfs, message_id)
// .await?;
//
// self.document
// .update_message_document(&self.ipfs, &message)
// .await?;
//
// let is_edited = matches!((message.modified, current_message.modified), (Some(modified), Some(current_modified)) if modified > current_modified )
// | matches!(
// (message.modified, current_message.modified),
// (Some(_), None)
// );
//
// match is_edited {
// true => {
// let _ = self.event_broadcast.send(MessageEventKind::MessageEdited {
// conversation_id,
// message_id,
// });
// }
// false => {
// //TODO: Emit event showing message was updated in some way
// }
// }
// }
// false => {
// self.document
// .insert_message_document(&self.ipfs, &message)
// .await?;
//
// let _ = self
// .event_broadcast
// .send(MessageEventKind::MessageReceived {
// conversation_id,
// message_id,
// });
// }
// }
// }
//
// self.set_document().await?;
let crate::config::Discovery::Shuttle { addresses } =
self.discovery.discovery_config().clone()
else {
return Ok(());
};

if addresses.is_empty() {
return Err(Error::Other);
}

let payload = PayloadBuilder::new(
self.identity.root_document().keypair(),
crate::shuttle::message::protocol::Request::FetchMailBox {
conversation_id: self.conversation_id,
},
)
.build()?;

let bytes = payload.to_bytes().expect("valid deserialization");

let ipfs = self.ipfs.clone();

let addresses = addresses.clone();
let conversation_id = self.conversation_id;

let mut mailbox = BTreeMap::new();
let mut providers = vec![];
for peer_id in addresses.iter().filter_map(|addr| addr.peer_id()) {
let response = match ipfs
.send_request(peer_id, (protocols::SHUTTLE_MESSAGE, bytes.clone()))
.await
.and_then(|response| {
PayloadMessage::<crate::shuttle::message::protocol::Response>::from_bytes(
&response,
)
.map_err(anyhow::Error::from)
}) {
Ok(response) => response,
Err(_e) => {
continue;
}
};

match response.message() {
crate::shuttle::message::protocol::Response::Mailbox {
conversation_id: retrieved_id,
content,
} => {
debug_assert_eq!(*retrieved_id, conversation_id);
providers.push(peer_id);
mailbox.extend(content.clone());
break;
}
crate::shuttle::message::protocol::Response::Error(e) => {
tracing::error!(error = %e, %peer_id, "error handling request");
}
_ => {
tracing::error!(%peer_id, "response from shuttle node was invalid");
continue;
}
}
}

let conversation_mailbox = mailbox
.into_iter()
.filter_map(|(id, cid)| {
let id = Uuid::from_str(&id).ok()?;
Some((id, cid))
})
.collect::<BTreeMap<Uuid, Cid>>();

let mut messages = FutureMap::new();
for (id, cid) in conversation_mailbox {
let ipfs = ipfs.clone();
let providers = providers.clone();
let root = self.identity.root_document().clone();
let fut = async move {
ipfs.fetch(&cid).recursive().await?;
let message_document = ipfs
.get_dag(cid)
.providers(&providers)
.deserialized::<MessageDocument>()
.await?;

if !message_document.verify() {
return Err(Error::InvalidMessage);
}

let payload = PayloadBuilder::new(
root.keypair(),
crate::shuttle::message::protocol::Request::FetchMailBox { conversation_id },
)
.build()?;

let bytes = payload.to_bytes().expect("valid deserialization");
for peer_id in providers {
let _response = ipfs
.send_request(peer_id, (protocols::SHUTTLE_MESSAGE, bytes.clone()))
.await;
}

Ok(message_document)
};
messages.insert(id, Box::pin(fut));
}

pin_mut!(messages);

let mut messages = messages
.filter_map(|(_, result)| async move { result.ok() })
.collect::<Vec<_>>()
.await;

messages.sort_by(|a, b| b.cmp(a));

for message in messages {
if !message.verify() {
continue;
}
let message_id = message.id;
match self
.document
.contains(&self.ipfs, message_id)
.await
.unwrap_or_default()
{
true => {
let current_message = self
.document
.get_message_document(&self.ipfs, message_id)
.await?;

self.document
.update_message_document(&self.ipfs, &message)
.await?;

let is_edited = matches!((message.modified, current_message.modified), (Some(modified), Some(current_modified)) if modified > current_modified )
| matches!(
(message.modified, current_message.modified),
(Some(_), None)
);

match is_edited {
true => {
let _ = self.event_broadcast.send(MessageEventKind::MessageEdited {
conversation_id,
message_id,
});
}
false => {
//TODO: Emit event showing message was updated in some way
}
}
}
false => {
self.document
.insert_message_document(&self.ipfs, &message)
.await?;

let _ = self
.event_broadcast
.send(MessageEventKind::MessageReceived {
conversation_id,
message_id,
});
}
}
}

self.set_document().await?;

Ok(())
}
Expand Down

0 comments on commit a47d1a9

Please sign in to comment.