diff --git a/extensions/warp-ipfs/src/store/keystore.rs b/extensions/warp-ipfs/src/store/keystore.rs index d05667cb0..ce15fcbdd 100644 --- a/extensions/warp-ipfs/src/store/keystore.rs +++ b/extensions/warp-ipfs/src/store/keystore.rs @@ -210,10 +210,10 @@ mod test { let mut keystore = Keystore::default(); let keypair = DID::default(); - let recipients = (0..100).map(|_| DID::default()).collect::>(); + let recipients = (0..10).map(|_| DID::default()).collect::>(); for recipient in recipients.iter() { - for key in (0..100).map(|_| generate::<32>()) { + for key in (0..recipients.len()).map(|_| generate::<32>()) { keystore.insert(&keypair, recipient, key)?; } } diff --git a/extensions/warp-ipfs/src/store/message.rs b/extensions/warp-ipfs/src/store/message.rs index 74606c682..c725bff3d 100644 --- a/extensions/warp-ipfs/src/store/message.rs +++ b/extensions/warp-ipfs/src/store/message.rs @@ -157,7 +157,6 @@ impl MessageStore { conversation_sender, conversation_task_tx, identity, - // friends, discovery, filesystem, queue, @@ -170,46 +169,59 @@ impl MessageStore { }; info!("Loading existing conversations task"); - if let Err(_e) = store.load_conversations().await {} + if let Err(e) = store.load_conversations().await { + warn!("Failed to load conversations: {e}"); + } + + info!("Loading queue"); + if let Err(e) = store.load_queue().await { + warn!("Failed to load queue: {e}"); + } + + let mut stream = store + .ipfs + .pubsub_subscribe(store.did.messaging()) + .await? + .boxed(); tokio::spawn({ let mut store = store.clone(); async move { info!("MessagingStore task created"); - tokio::spawn({ - let store = store.clone(); - async move { - info!("Loading queue"); - // Load the queue in a separate task in case it is large - // Note: In the future this will not be needed once a req/res system - // is implemented - if let Err(_e) = store.load_queue().await {} - } - }); - - let did = &*(store.did.clone()); - let Ok(stream) = store.ipfs.pubsub_subscribe(did.messaging()).await else { - error!("Unable to create subscription stream. Terminating task"); - //TODO: Maybe panic? - return; - }; - futures::pin_mut!(stream); let mut interval = tokio::time::interval(Duration::from_millis(interval_ms)); + loop { tokio::select! { - message = stream.next() => { - if let Some(message) = message { - if let Ok(payload) = Payload::from_bytes(&message.data) { - if let Ok(data) = ecdh_decrypt(&store.did, Some(&payload.sender()), payload.data()) { - if let Ok(events) = serde_json::from_slice::(&data) { - if let Err(e) = store.process_conversation(payload, events).await { - error!("Error processing conversation: {e}"); - } - } - } + Some(message) = stream.next() => { + let payload = match Payload::from_bytes(&message.data) { + Ok(payload) => payload, + Err(e) => { + tracing::log::warn!("Failed to parse payload data: {e}"); + continue; } - } + }; + + let data = match ecdh_decrypt(&store.did, Some(&payload.sender()), payload.data()) { + Ok(d) => d, + Err(e) => { + tracing::log::warn!("Failed to decrypt message from {}: {e}", payload.sender()); + continue; + } + }; + + let events = match serde_json::from_slice::(&data) { + Ok(ev) => ev, + Err(e) => { + tracing::log::warn!("Failed to parse message: {e}"); + continue; + } + }; + + if let Err(e) = store.process_conversation(payload, events).await { + error!("Error processing conversation: {e}"); + } + } _ = interval.tick() => { if let Err(e) = store.process_queue().await { @@ -1463,17 +1475,11 @@ impl MessageStore { return Ok(()); } - tokio::spawn({ - let discovery = self.discovery.clone(); - let recipients = list.clone(); - async move { - for recipient in recipients { - if !discovery.contains(&recipient).await { - let _ = discovery.insert(recipient).await.ok(); - } - } + for recipient in &list { + if !self.discovery.contains(recipient).await { + let _ = self.discovery.insert(recipient).await; } - }); + } info!("Creating conversation"); let convo = ConversationDocument::new( @@ -2171,10 +2177,7 @@ impl MessageStore { } } - let _ = self - .remove_conversation_keystore(conversation_id) - .await - .ok(); + let _ = self.remove_conversation_keystore(conversation_id).await; if let Err(e) = self .event @@ -2857,21 +2860,16 @@ impl MessageStore { }; //We want to send the event to the recipients until the creator can remove them from the conversation directly - tokio::spawn({ - let event = event.clone(); - let mut store = self.clone(); - let list = list.to_vec(); - async move { - for did in list.iter() { - if let Err(_e) = store - .send_single_conversation_event(did, conversation_id, event.clone()) - .await - { - // - } - } + + for did in list.iter() { + if let Err(e) = self + .send_single_conversation_event(did, conversation_id, event.clone()) + .await + { + tracing::log::error!("Error sending conversation event to {did}: {e}"); + continue; } - }); + } self.send_single_conversation_event(creator, conversation_id, event) .await