Skip to content

Commit

Permalink
Merge branch 'main' into feat/shuttle-gateway
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 authored Oct 28, 2023
2 parents 90cc705 + 3063ede commit 5288636
Show file tree
Hide file tree
Showing 5 changed files with 534 additions and 29 deletions.
15 changes: 2 additions & 13 deletions extensions/warp-ipfs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1244,19 +1244,8 @@ impl Friends for WarpIpfs {
#[async_trait::async_trait]
impl FriendsEvent for WarpIpfs {
async fn subscribe(&mut self) -> Result<MultiPassEventStream, Error> {
let mut rx = self.multipass_tx.subscribe();

let stream = async_stream::stream! {
loop {
match rx.recv().await {
Ok(event) => yield event,
Err(broadcast::error::RecvError::Closed) => break,
Err(_) => {}
};
}
};

Ok(MultiPassEventStream(Box::pin(stream)))
let store = self.identity_store(true).await?;
Ok(MultiPassEventStream(store.subscribe()))
}
}

Expand Down
17 changes: 17 additions & 0 deletions extensions/warp-ipfs/src/store/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2398,6 +2398,23 @@ impl IdentityStore {
pub async fn is_friend(&self, pubkey: &DID) -> Result<bool, Error> {
self.friends_list().await.map(|list| list.contains(pubkey))
}

#[tracing::instrument(skip(self))]
pub fn subscribe(&self) -> futures::stream::BoxStream<'static, MultiPassEventKind> {
let mut rx = self.event.subscribe();

let stream = async_stream::stream! {
loop {
match rx.recv().await {
Ok(event) => yield event,
Err(broadcast::error::RecvError::Closed) => break,
Err(_) => {}
};
}
};

stream.boxed()
}
}

impl IdentityStore {
Expand Down
108 changes: 95 additions & 13 deletions extensions/warp-ipfs/src/store/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use warp::crypto::{generate, DID};
use warp::error::Error;
use warp::logging::tracing::log::{debug, error, info, trace};
use warp::logging::tracing::warn;
use warp::multipass::MultiPassEventKind;
use warp::raygun::{
AttachmentEventStream, AttachmentKind, Conversation, ConversationType, EmbedState, Location,
Message, MessageEvent, MessageEventKind, MessageOptions, MessageStatus, MessageStream,
Expand All @@ -33,15 +34,16 @@ use crate::spam_filter::SpamFilter;
use crate::store::payload::Payload;
use crate::store::{
connected_to_peer, ecdh_decrypt, ecdh_encrypt, get_keypair_did, sign_serde,
ConversationRequestKind, ConversationRequestResponse, ConversationResponseKind, PeerTopic,
ConversationRequestKind, ConversationRequestResponse, ConversationResponseKind, DidExt,
PeerTopic,
};

use super::conversation::{ConversationDocument, MessageDocument};
use super::discovery::Discovery;
use super::document::conversation::Conversations;
use super::identity::IdentityStore;
use super::keystore::Keystore;
use super::{did_to_libp2p_pub, verify_serde_sig, ConversationEvents, MessagingEvents};
use super::{verify_serde_sig, ConversationEvents, MessagingEvents};

type ConversationSender =
UnboundedSender<(MessagingEvents, Option<OneshotSender<Result<(), Error>>>)>;
Expand Down Expand Up @@ -163,6 +165,8 @@ impl MessageStore {

let mut interval = tokio::time::interval(Duration::from_millis(interval_ms));

let mut identity_stream = store.identity.subscribe();

loop {
tokio::select! {
Some(message) = stream.next() => {
Expand Down Expand Up @@ -195,6 +199,11 @@ impl MessageStore {
}

}
Some(id_event) = identity_stream.next() => {
if let Err(e) = store.process_identity_events(id_event).await {
warn!("Failed to process event: {e}");
}
}
_ = interval.tick() => {
if let Err(e) = store.process_queue().await {
error!("Error processing queue: {e}");
Expand All @@ -209,6 +218,81 @@ impl MessageStore {
Ok(store)
}

async fn process_identity_events(&mut self, event: MultiPassEventKind) -> Result<(), Error> {
match event {
MultiPassEventKind::FriendAdded { did } => {
if !self.with_friends.load(Ordering::SeqCst) {
return Ok(());
}

match self.create_conversation(&did).await {
Ok(_) | Err(Error::ConversationExist { .. }) => return Ok(()),
Err(e) => return Err(e),
}
}

MultiPassEventKind::Blocked { did } | MultiPassEventKind::BlockedBy { did } => {
let list = self.conversations.list().await?;

for conversation in list.iter().filter(|c| c.recipients().contains(&did)) {
let id = conversation.id();
match conversation.conversation_type {
ConversationType::Direct => {
if let Err(e) = self.delete_conversation(id, true).await {
warn!("Failed to delete conversation {id}: {e}");
continue;
}
}
ConversationType::Group => {
if conversation.creator != Some((*self.did).clone()) {
continue;
}

if let Err(e) = self.remove_recipient(id, &did, true).await {
warn!("Failed to remove {did} from conversation {id}: {e}");
continue;
}
}
}
}
}
MultiPassEventKind::FriendRemoved { did } => {
if !self.with_friends.load(Ordering::SeqCst) {
return Ok(());
}

let list = self.conversations.list().await?;

for conversation in list.iter().filter(|c| c.recipients().contains(&did)) {
let id = conversation.id();
match conversation.conversation_type {
ConversationType::Direct => {
if let Err(e) = self.delete_conversation(id, true).await {
warn!("Failed to delete conversation {id}: {e}");
continue;
}
}
ConversationType::Group => {
if conversation.creator != Some((*self.did).clone()) {
continue;
}

if let Err(e) = self.remove_recipient(id, &did, true).await {
warn!("Failed to remove {did} from conversation {id}: {e}");
continue;
}
}
}
}
}
MultiPassEventKind::IdentityOnline { .. } => {
//TODO: Check queue and process any entry once peer is subscribed to the respective topics.
}
_ => {}
}
Ok(())
}

async fn start_event_task(&self, conversation_id: Uuid) {
info!("Event Task started for {conversation_id}");

Expand Down Expand Up @@ -466,8 +550,8 @@ impl MessageStore {
.ipfs
.pubsub_peers(Some(topic.clone()))
.await?;
let peer_id = did_to_libp2p_pub(&sender)
.map(|pk| pk.to_peer_id())?;

let peer_id = sender.to_peer_id()?;

let bytes = payload.to_bytes()?;

Expand Down Expand Up @@ -628,7 +712,7 @@ impl MessageStore {
let topic = conversation.reqres_topic(did);

let peers = self.ipfs.pubsub_peers(Some(topic.clone())).await?;
let peer_id = did_to_libp2p_pub(did).map(|pk| pk.to_peer_id())?;
let peer_id = did.to_peer_id()?;

if !peers.contains(&peer_id)
|| (peers.contains(&peer_id)
Expand Down Expand Up @@ -715,7 +799,6 @@ impl MessageStore {
Cipher::direct_decrypt(data.data(), &key)
}
};
drop(conversation);

let bytes = match bytes_results {
Ok(b) => b,
Expand Down Expand Up @@ -1656,7 +1739,7 @@ impl MessageStore {

self.start_task(convo_id, stream).await;

let peer_id = did_to_libp2p_pub(did_key)?.to_peer_id();
let peer_id = did_key.to_peer_id()?;

let event = ConversationEvents::NewConversation {
recipient: own_did.clone(),
Expand Down Expand Up @@ -1781,8 +1864,7 @@ impl MessageStore {
.iter()
.filter(|did| own_did.ne(did))
.map(|did| (did.clone(), did))
.filter_map(|(a, b)| did_to_libp2p_pub(b).map(|pk| (a, pk)).ok())
.map(|(did, pk)| (did, pk.to_peer_id()))
.filter_map(|(a, b)| b.to_peer_id().map(|pk| (a, pk)).ok())
.collect::<Vec<_>>();

let conversation = self.conversations.get(convo_id).await?;
Expand Down Expand Up @@ -1891,8 +1973,8 @@ impl MessageStore {
.iter()
.filter(|did| own_did.ne(did))
.map(|did| (did.clone(), did))
.filter_map(|(a, b)| did_to_libp2p_pub(b).map(|pk| (a, pk)).ok())
.map(|(did, pk)| (did, pk.to_peer_id()))
.filter_map(|(a, b)| b.to_peer_id().map(|pk| (a, pk)).ok())
.map(|(did, pk)| (did, pk))
.collect::<Vec<_>>();

let event = serde_json::to_vec(&ConversationEvents::DeleteConversation {
Expand Down Expand Up @@ -2051,7 +2133,7 @@ impl MessageStore {

let payload = Payload::new(own_did, &bytes, &signature);

let peer_id = did_to_libp2p_pub(did_key)?.to_peer_id();
let peer_id = did_key.to_peer_id()?;
let peers = self.ipfs.pubsub_peers(Some(did_key.messaging())).await?;

let mut time = true;
Expand Down Expand Up @@ -3238,7 +3320,7 @@ impl MessageStore {
.iter()
.filter(|did| own_did.ne(did))
{
let peer_id = did_to_libp2p_pub(recipient)?.to_peer_id();
let peer_id = recipient.to_peer_id()?;

// We want to confirm that there is atleast one peer subscribed before attempting to send a message
match peers.contains(&peer_id) {
Expand Down
114 changes: 112 additions & 2 deletions extensions/warp-ipfs/tests/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ mod common;
mod test {
use futures::StreamExt;
use std::time::Duration;
use warp::raygun::{
ConversationType, MessageEvent, MessageEventKind, PinState, RayGunEventKind, ReactionState,
use warp::{
multipass::MultiPassEventKind,
raygun::{
ConversationType, MessageEvent, MessageEventKind, PinState, RayGunEventKind,
ReactionState,
},
};

use crate::common::create_accounts_and_chat;
Expand Down Expand Up @@ -808,4 +812,110 @@ mod test {

Ok(())
}

#[tokio::test]
async fn delete_conversation_when_blocked() -> anyhow::Result<()> {
let accounts = create_accounts_and_chat(vec![
(
None,
None,
Some("test::delete_conversation_when_blocked".into()),
),
(
None,
None,
Some("test::delete_conversation_when_blocked".into()),
),
])
.await?;

let (mut _account_a, mut chat_a, did_a, _) = accounts.first().cloned().unwrap();
let (mut _account_b, mut chat_b, did_b, _) = accounts.last().cloned().unwrap();

let mut account_subscribe_a = _account_a.subscribe().await?;
let mut account_subscribe_b = _account_b.subscribe().await?;

let mut chat_subscribe_a = chat_a.subscribe().await?;
let mut chat_subscribe_b = chat_b.subscribe().await?;

chat_a.create_conversation(&did_b).await?;

let id_a = tokio::time::timeout(Duration::from_secs(60), async {
loop {
if let Some(RayGunEventKind::ConversationCreated { conversation_id }) =
chat_subscribe_a.next().await
{
break conversation_id;
}
}
})
.await?;

let id_b = tokio::time::timeout(Duration::from_secs(60), async {
loop {
if let Some(RayGunEventKind::ConversationCreated { conversation_id }) =
chat_subscribe_b.next().await
{
break conversation_id;
}
}
})
.await?;

assert_eq!(id_a, id_b);

let conversation = chat_a.get_conversation(id_a).await?;
assert_eq!(conversation.conversation_type(), ConversationType::Direct);
assert_eq!(conversation.recipients().len(), 2);
assert!(conversation.recipients().contains(&did_a));
assert!(conversation.recipients().contains(&did_b));

_account_a.block(&did_b).await?;

tokio::time::timeout(Duration::from_secs(60), async {
loop {
if let Some(MultiPassEventKind::Blocked { did }) = account_subscribe_a.next().await
{
assert_eq!(did, did_b);
break;
}
}
})
.await?;

tokio::time::timeout(Duration::from_secs(60), async {
loop {
if let Some(RayGunEventKind::ConversationDeleted { .. }) =
chat_subscribe_a.next().await
{
break;
}
}
})
.await?;

tokio::time::timeout(Duration::from_secs(60), async {
loop {
if let Some(MultiPassEventKind::BlockedBy { did }) =
account_subscribe_b.next().await
{
assert_eq!(did, did_a);
break;
}
}
})
.await?;

tokio::time::timeout(Duration::from_secs(60), async {
loop {
if let Some(RayGunEventKind::ConversationDeleted { .. }) =
chat_subscribe_b.next().await
{
break;
}
}
})
.await?;
Ok(())
}
}
Loading

0 comments on commit 5288636

Please sign in to comment.