diff --git a/extensions/warp-ipfs/src/store/identity.rs b/extensions/warp-ipfs/src/store/identity.rs index e840e3b16..9c81a826a 100644 --- a/extensions/warp-ipfs/src/store/identity.rs +++ b/extensions/warp-ipfs/src/store/identity.rs @@ -41,7 +41,7 @@ use super::{ cache::IdentityCache, identity::IdentityDocument, image_dag::get_image, root::RootDocumentMap, ResolvedRootDocument, RootDocument, }, - ecdh_decrypt, ecdh_encrypt, + ecdh_encrypt, event_subscription::EventSubscription, payload::PayloadMessage, phonebook::PhoneBook, @@ -530,12 +530,18 @@ impl IdentityStore { tracing::info!("Received event from {in_did}"); - let event = match ecdh_decrypt(store.root_document().keypair(), Some(&in_did), &message.data).and_then(|bytes| { - serde_json::from_slice::(&bytes).map_err(Error::from) - }) { - Ok(e) => e, + let payload: PayloadMessage = match PayloadMessage::from_bytes(&message.data) { + Ok(p) => p, Err(e) => { - tracing::error!("Failed to decrypt payload from {in_did}: {e}"); + tracing::error!(error = %e, from = %in_did, "failed to process payload"); + continue; + } + }; + + let event = match payload.message(store.root_document().keypair()) { + Ok(event) => event, + Err(e) => { + tracing::error!("Failed to decrypt payload from {in_did}: {e}"); continue; } }; @@ -545,11 +551,9 @@ impl IdentityStore { if let Err(e) = store.process_message(&in_did, event, false).await { tracing::error!("Failed to process identity message from {in_did}: {e}"); } - - } Some(event) = friend_stream.next() => { - let payload = match PayloadMessage::>::from_bytes(&event.data) { + let payload = match PayloadMessage::::from_bytes(&event.data) { Ok(p) => p, Err(_e) => { continue; @@ -568,17 +572,8 @@ impl IdentityStore { tracing::info!("Received event from {did}"); - let msg = match payload.message(None) { + let data = match payload.message(store.root_document.keypair()) { Ok(m) => m, - Err(_) => { - continue; - } - }; - - let data = match ecdh_decrypt(store.root_document().keypair(), Some(&did), msg).and_then(|bytes| { - serde_json::from_slice::(&bytes).map_err(Error::from) - }) { - Ok(pl) => pl, Err(e) => { if let Some(tx) = signal { let _ = tx.send(Err(e)); @@ -920,9 +915,11 @@ impl IdentityStore { let event = IdentityEvent::Request { option }; - let payload_bytes = serde_json::to_vec(&event)?; + let payload = PayloadBuilder::new(pk_did, event.clone()) + .add_recipient(out_did)? + .await?; - let bytes = ecdh_encrypt(pk_did, Some(out_did), payload_bytes)?; + let bytes = payload.to_bytes()?; tracing::info!(to = %out_did, event = ?event, payload_size = bytes.len(), "Sending event"); @@ -950,8 +947,6 @@ impl IdentityStore { return Err(Error::IdentityDoesntExist); } - let pk_did = self.root_document.keypair(); - let mut identity = self.own_identity_document().await?; let is_friend = self.is_friend(out_did).await.unwrap_or_default(); @@ -980,9 +975,11 @@ impl IdentityStore { option: ResponseOption::Identity { identity: payload }, }; - let payload_bytes = serde_json::to_vec(&event)?; + let payload = PayloadBuilder::new(kp_did, event.clone()) + .add_recipient(out_did)? + .await?; - let bytes = ecdh_encrypt(pk_did, Some(out_did), payload_bytes)?; + let bytes = payload.to_bytes()?; tracing::info!(to = %out_did, event = ?event, payload_size = bytes.len(), "Sending event"); @@ -1035,9 +1032,11 @@ impl IdentityStore { }, }; - let payload_bytes = serde_json::to_vec(&event)?; + let payload = PayloadBuilder::new(pk_did, event.clone()) + .add_recipient(out_did)? + .await?; - let bytes = ecdh_encrypt(pk_did, Some(out_did), payload_bytes)?; + let bytes = payload.to_bytes()?; tracing::info!(to = %out_did, event = ?event, payload_size = bytes.len(), "Sending event"); @@ -1112,9 +1111,11 @@ impl IdentityStore { option: ResponseOption::Metadata { data: metadata }, }; - let payload_bytes = serde_json::to_vec(&event)?; + let payload = PayloadBuilder::new(pk_did, event.clone()) + .add_recipient(out_did)? + .await?; - let bytes = ecdh_encrypt(pk_did, Some(out_did), payload_bytes)?; + let bytes = payload.to_bytes()?; tracing::info!(to = %out_did, event = ?event, payload_size = bytes.len(), "Sending event"); @@ -2960,10 +2961,10 @@ impl IdentityStore { let kp = self.root_document.keypair(); - let payload_bytes = serde_json::to_vec(&payload)?; - - let bytes = ecdh_encrypt(kp, Some(recipient), payload_bytes)?; - let message = PayloadBuilder::new(kp, bytes).build()?; + let message = PayloadBuilder::new(kp, payload.clone()) + .add_recipient(recipient)? + .from_ipfs(&self.ipfs) + .await?; let message_bytes = message.to_bytes()?; diff --git a/extensions/warp-ipfs/src/store/message.rs b/extensions/warp-ipfs/src/store/message.rs index 8fd0c1f06..464c8d784 100644 --- a/extensions/warp-ipfs/src/store/message.rs +++ b/extensions/warp-ipfs/src/store/message.rs @@ -35,7 +35,6 @@ use super::{document::root::RootDocumentMap, ds_key::DataStoreKey, PeerIdExt}; use crate::store::{ conversation::ConversationDocument, discovery::Discovery, - ecdh_decrypt, ecdh_encrypt, event_subscription::EventSubscription, files::FileStore, generate_shared_topic, @@ -2053,7 +2052,7 @@ impl ConversationTask { } } Some(message) = stream.next() => { - let payload = match PayloadMessage::>::from_bytes(&message.data) { + let payload = match PayloadMessage::::from_bytes(&message.data) { Ok(payload) => payload, Err(e) => { tracing::warn!("Failed to parse payload data: {e}"); @@ -2061,38 +2060,25 @@ impl ConversationTask { } }; - let sender = match payload.sender().to_did() { + let sender_peer_id = payload.sender(); + + let sender = match sender_peer_id.to_did() { Ok(did) => did, Err(e) => { - tracing::warn!(sender = %payload.sender(), error = %e, "unable to convert to did"); + tracing::warn!(sender = %sender_peer_id, error = %e, "unable to convert to did"); continue; } }; - let msg = match payload.message(None) { + let event = match payload.message(self.identity.root_document().keypair()) { Ok(m) => m, - Err(_) => { - continue - } - }; - - let data = match ecdh_decrypt(self.identity.root_document().keypair(), Some(&sender), msg) { - Ok(d) => d, - Err(e) => { - tracing::warn!(%sender, error = %e, "failed to decrypt message"); - continue; - } - }; - - let events = match serde_json::from_slice::(&data) { - Ok(ev) => ev, Err(e) => { - tracing::warn!(%sender, error = %e, "failed to parse message"); - continue; + tracing::error!(%sender, error = %e, "unable to obtain message from payload"); + continue } }; - if let Err(e) = process_conversation(&mut *self.inner.write().await, payload, events).await { + if let Err(e) = process_conversation(&mut *self.inner.write().await, *sender_peer_id, event).await { tracing::error!(%sender, error = %e, "error processing conversation"); } } @@ -2259,26 +2245,27 @@ impl ConversationInner { recipient: own_did.clone(), }; - let bytes = ecdh_encrypt(self.root.keypair(), Some(did), serde_json::to_vec(&event)?)?; - - let payload = PayloadBuilder::new(self.root.keypair(), bytes) + let payload = PayloadBuilder::new(self.root.keypair(), event) + .add_recipient(did)? .from_ipfs(&self.ipfs) .await?; + let payload_bytes = payload.to_bytes()?; + let peers = self.ipfs.pubsub_peers(Some(did.messaging())).await?; if !peers.contains(&peer_id) || (peers.contains(&peer_id) && self .ipfs - .pubsub_publish(did.messaging(), payload.to_bytes()?) + .pubsub_publish(did.messaging(), payload_bytes.clone()) .await .is_err()) { tracing::warn!(conversation_id = %convo_id, "Unable to publish to topic. Queuing event"); self.queue_event( did.clone(), - Queue::direct(peer_id, did.messaging(), payload.message(None)?.to_vec()), + Queue::direct(peer_id, did.messaging(), payload_bytes.to_vec()), ) .await; } @@ -2367,30 +2354,31 @@ impl ConversationInner { .filter_map(|(a, b)| b.to_peer_id().map(|pk| (a, pk)).ok()) .collect::>(); - let event = serde_json::to_vec(&ConversationEvents::NewGroupConversation { + let event = ConversationEvents::NewGroupConversation { conversation: conversation.clone(), - })?; + }; - for (did, peer_id) in peer_id_list { - let bytes = ecdh_encrypt(self.root.keypair(), Some(&did), &event)?; + let payload = PayloadBuilder::new(self.root.keypair(), event) + .add_recipients(peer_id_list.iter().map(|(did, _)| did))? + .from_ipfs(&self.ipfs) + .await?; - let payload = PayloadBuilder::new(self.root.keypair(), bytes) - .from_ipfs(&self.ipfs) - .await?; + let payload_bytes = payload.to_bytes()?; + for (did, peer_id) in peer_id_list { let peers = self.ipfs.pubsub_peers(Some(did.messaging())).await?; if !peers.contains(&peer_id) || (peers.contains(&peer_id) && self .ipfs - .pubsub_publish(did.messaging(), payload.to_bytes()?) + .pubsub_publish(did.messaging(), payload_bytes.clone()) .await .is_err()) { tracing::warn!("Unable to publish to topic. Queuing event"); self.queue_event( did.clone(), - Queue::direct(peer_id, did.messaging(), payload.message(None)?.to_vec()), + Queue::direct(peer_id, did.messaging(), payload_bytes.to_vec()), ) .await; } @@ -2602,12 +2590,13 @@ impl ConversationInner { let keypair = self.root.keypair(); - let bytes = ecdh_encrypt(keypair, Some(did), serde_json::to_vec(&request)?)?; - - let payload = PayloadBuilder::new(keypair, bytes) + let payload = PayloadBuilder::new(keypair, request) + .add_recipient(did)? .from_ipfs(&self.ipfs) .await?; + let payload_bytes = payload.to_bytes()?; + let topic = conversation.exchange_topic(did); let peers = self.ipfs.pubsub_peers(Some(topic.clone())).await?; @@ -2616,14 +2605,14 @@ impl ConversationInner { || (peers.contains(&peer_id) && self .ipfs - .pubsub_publish(topic.clone(), payload.to_bytes()?) + .pubsub_publish(topic.clone(), payload_bytes.clone()) .await .is_err()) { tracing::warn!(%conversation_id, "Unable to publish to topic"); self.queue_event( did.clone(), - Queue::direct(peer_id, topic.clone(), payload.message(None)?.to_vec()), + Queue::direct(peer_id, topic.clone(), payload_bytes.clone()), ) .await; } @@ -2648,9 +2637,8 @@ impl ConversationInner { let keypair = self.root.keypair(); - let bytes = ecdh_encrypt(keypair, Some(did), serde_json::to_vec(&request)?)?; - - let payload = PayloadBuilder::new(keypair, bytes) + let payload = PayloadBuilder::new(keypair, request) + .add_recipient(did)? .from_ipfs(&self.ipfs) .await?; @@ -2669,7 +2657,7 @@ impl ConversationInner { tracing::warn!(%community_id, "Unable to publish to topic"); self.queue_event( did.clone(), - Queue::direct(peer_id, topic.clone(), payload.message(None)?.to_vec()), + Queue::direct(peer_id, topic.clone(), payload.to_bytes()?), ) .await; } @@ -2725,19 +2713,19 @@ impl ConversationInner { .filter_map(|(a, b)| b.to_peer_id().map(|pk| (a, pk)).ok()) .collect::>(); - let event = serde_json::to_vec(&ConversationEvents::DeleteConversation { + let event = ConversationEvents::DeleteConversation { conversation_id: document_type.id(), - })?; + }; - let main_timer = Instant::now(); - for (recipient, peer_id) in peer_id_list { - let keypair = self.root.keypair(); - let bytes = ecdh_encrypt(keypair, Some(&recipient), &event)?; + let payload = PayloadBuilder::new(self.root.keypair(), event) + .add_recipients(peer_id_list.iter().map(|(did, _)| did))? + .from_ipfs(&self.ipfs) + .await?; - let payload = PayloadBuilder::new(keypair, bytes) - .from_ipfs(&self.ipfs) - .await?; + let payload_bytes = payload.to_bytes()?; + let main_timer = Instant::now(); + for (recipient, peer_id) in peer_id_list { let peers = self.ipfs.pubsub_peers(Some(recipient.messaging())).await?; let timer = Instant::now(); let mut time = true; @@ -2745,7 +2733,7 @@ impl ConversationInner { || (peers.contains(&peer_id) && self .ipfs - .pubsub_publish(recipient.messaging(), payload.to_bytes()?) + .pubsub_publish(recipient.messaging(), payload_bytes.clone()) .await .is_err()) { @@ -2756,11 +2744,7 @@ impl ConversationInner { // For now we will queue the message if we hit an error self.queue_event( recipient.clone(), - Queue::direct( - peer_id, - recipient.messaging(), - payload.message(None)?.to_vec(), - ), + Queue::direct(peer_id, recipient.messaging(), payload_bytes.clone()), ) .await; time = false; @@ -2829,16 +2813,15 @@ impl ConversationInner { did_key: &DID, event: ConversationEvents, ) -> Result<(), Error> { - let event = serde_json::to_vec(&event)?; - let keypair = self.root.keypair(); - let bytes = ecdh_encrypt(keypair, Some(did_key), &event)?; - - let payload = PayloadBuilder::new(keypair, bytes) + let payload = PayloadBuilder::new(keypair, event) + .add_recipient(did_key)? .from_ipfs(&self.ipfs) .await?; + let payload_bytes = payload.to_bytes()?; + let peer_id = did_key.to_peer_id()?; let peers = self.ipfs.pubsub_peers(Some(did_key.messaging())).await?; @@ -2848,18 +2831,14 @@ impl ConversationInner { || (peers.contains(&peer_id) && self .ipfs - .pubsub_publish(did_key.messaging(), payload.to_bytes()?) + .pubsub_publish(did_key.messaging(), payload_bytes.clone()) .await .is_err()) { tracing::warn!(%conversation_id, "Unable to publish to topic. Queuing event"); self.queue_event( did_key.clone(), - Queue::direct( - peer_id, - did_key.messaging(), - payload.message(None)?.to_vec(), - ), + Queue::direct(peer_id, did_key.messaging(), payload_bytes.clone()), ) .await; time = false; @@ -2984,19 +2963,18 @@ impl ConversationInner { .filter_map(|(a, b)| b.to_peer_id().map(|pk| (a, pk)).ok()) .collect::>(); - let event = serde_json::to_vec(&ConversationEvents::DeleteCommunity { + let event = ConversationEvents::DeleteCommunity { community_id: doc.id(), - })?; + }; + + let keypair = self.root.keypair(); + let payload = PayloadBuilder::new(keypair, event) + .add_recipients(peer_id_list.iter().map(|(did, _)| did))? + .from_ipfs(&self.ipfs) + .await?; let main_timer = Instant::now(); for (recipient, peer_id) in peer_id_list { - let keypair = self.root.keypair(); - let bytes = ecdh_encrypt(keypair, Some(&recipient), &event)?; - - let payload = PayloadBuilder::new(keypair, bytes) - .from_ipfs(&self.ipfs) - .await?; - let peers = self.ipfs.pubsub_peers(Some(recipient.messaging())).await?; let timer = Instant::now(); let mut time = true; @@ -3015,11 +2993,7 @@ impl ConversationInner { // For now we will queue the message if we hit an error self.queue_event( recipient.clone(), - Queue::direct( - peer_id, - recipient.messaging(), - payload.message(None)?.to_vec(), - ), + Queue::direct(peer_id, recipient.messaging(), payload.to_bytes()?), ) .await; time = false; @@ -3090,7 +3064,7 @@ impl ConversationInner { async fn process_conversation( this: &mut ConversationInner, - data: PayloadMessage>, + sender: PeerId, event: ConversationEvents, ) -> Result<(), Error> { match event { @@ -3202,7 +3176,7 @@ async fn process_conversation( return Err(anyhow::anyhow!("Conversation {conversation_id} doesnt exist").into()); } - let sender = data.sender().to_did()?; + let sender = sender.to_did()?; match this.get(conversation_id).await { Ok(conversation) @@ -3265,7 +3239,7 @@ async fn process_conversation( return Err(anyhow::anyhow!("Community {community_id} doesnt exist").into()); } - let sender = data.sender().to_did()?; + let sender = sender.to_did()?; match this.get_community_document(community_id).await { Ok(community) if community.owner.eq(&sender) => community, @@ -3460,12 +3434,13 @@ async fn process_identity_events( struct Queue { peer: PeerId, topic: String, - data: Vec, + data: Bytes, sent: bool, } impl Queue { - pub fn direct(peer: PeerId, topic: String, data: Vec) -> Self { + pub fn direct(peer: PeerId, topic: String, data: impl Into) -> Self { + let data = data.into(); Queue { peer, topic, @@ -3478,7 +3453,6 @@ impl Queue { //TODO: Replace async fn _process_queue(this: &mut ConversationInner) { let mut changed = false; - let keypair = &this.root.keypair().clone(); for (did, items) in this.queue.iter_mut() { let Ok(peer_id) = did.to_peer_id() else { continue; @@ -3511,22 +3485,7 @@ async fn _process_queue(this: &mut ConversationInner) { continue; } - let payload = match PayloadBuilder::<_>::new(keypair, data.clone()) - .from_ipfs(&this.ipfs) - .await - { - Ok(p) => p, - Err(_e) => { - // tracing::warn!(error = %_e, "unable to build payload") - continue; - } - }; - - let Ok(bytes) = payload.to_bytes() else { - continue; - }; - - if let Err(e) = this.ipfs.pubsub_publish(topic.clone(), bytes).await { + if let Err(e) = this.ipfs.pubsub_publish(topic.clone(), data.clone()).await { tracing::error!("Error publishing to topic: {e}"); continue; } diff --git a/extensions/warp-ipfs/src/store/message/community_task.rs b/extensions/warp-ipfs/src/store/message/community_task.rs index de0ab4909..ad3a0a6a2 100644 --- a/extensions/warp-ipfs/src/store/message/community_task.rs +++ b/extensions/warp-ipfs/src/store/message/community_task.rs @@ -31,11 +31,7 @@ use warp::raygun::{ MessageReference, MessageStatus, MessageType, Messages, MessagesType, PinState, RayGunEventKind, ReactionState, }; -use warp::{ - crypto::{cipher::Cipher, generate}, - error::Error, - raygun::MessageEventKind, -}; +use warp::{crypto::generate, error::Error, raygun::MessageEventKind}; use web_time::Instant; use crate::store::community::{ @@ -326,7 +322,7 @@ pub struct CommunityTask { file: FileStore, identity: IdentityStore, discovery: Discovery, - pending_key_exchange: IndexMap, bool)>>, + pending_key_exchange: IndexMap>, document: CommunityDocument, keystore: Keystore, @@ -1180,16 +1176,15 @@ impl CommunityTask { did_key: &DID, event: ConversationEvents, ) -> Result<(), Error> { - let event = serde_json::to_vec(&event)?; - let keypair = self.root.keypair(); - let bytes = ecdh_encrypt(keypair, Some(did_key), &event)?; - - let payload = PayloadBuilder::new(keypair, bytes) + let payload = PayloadBuilder::new(keypair, event) + .add_recipient(did_key)? .from_ipfs(&self.ipfs) .await?; + let bytes = payload.to_bytes()?; + let peer_id = did_key.to_peer_id()?; let peers = self.ipfs.pubsub_peers(Some(did_key.messaging())).await?; @@ -1199,19 +1194,14 @@ impl CommunityTask { || (peers.contains(&peer_id) && self .ipfs - .pubsub_publish(did_key.messaging(), payload.to_bytes()?) + .pubsub_publish(did_key.messaging(), bytes.clone()) .await .is_err()) { tracing::warn!(id=%&self.community_id, "Unable to publish to topic. Queuing event"); self.queue_event( did_key.clone(), - QueueItem::direct( - None, - peer_id, - did_key.messaging(), - payload.message(None)?.to_vec(), - ), + QueueItem::direct(None, peer_id, did_key.messaging(), bytes.clone()), ) .await; time = false; @@ -1226,51 +1216,49 @@ impl CommunityTask { } async fn process_msg_event(&mut self, msg: Message) -> Result<(), Error> { - let data = PayloadMessage::>::from_bytes(&msg.data)?; + let data = PayloadMessage::::from_bytes(&msg.data)?; let sender = data.sender().to_did()?; let keypair = self.root.keypair(); let id = self.community_id; - let bytes = { - let key = match self.keystore.get_latest(keypair, &sender) { - Ok(key) => key, - Err(Error::PublicKeyDoesntExist) => { - // If we are not able to get the latest key from the store, this is because we are still awaiting on the response from the key exchange - // So what we should so instead is set aside the payload until we receive the key exchange then attempt to process it again - - // Note: We can set aside the data without the payload being owned directly due to the data already been verified - // so we can own the data directly without worrying about the lifetime - // however, we may want to eventually validate the data to ensure it havent been tampered in some way - // while waiting for the response. - - self.pending_key_exchange - .entry(sender) - .or_default() - .push((data.message(None)?, false)); - - // Maybe send a request? Although we could, we should check to determine if one was previously sent or queued first, - // but for now we can leave this commented until the queue is removed and refactored. - // _ = self.request_key(id, &data.sender()).await; - - // Note: We will mark this as `Ok` since this is pending request to be resolved - return Ok(()); - } - Err(e) => { - tracing::warn!(id = %id, sender = %data.sender(), error = %e, "Failed to obtain key"); - return Err(e); - } - }; - - Cipher::direct_decrypt(&data.message(None)?, &key)? + let event = match self.keystore.get_latest(keypair, &sender) { + Ok(key) => data.message_from_key(&key)?, + Err(Error::PublicKeyDoesntExist) => { + // match data.message(keypair) { + // Ok(message) => { + // message + // } + // _ => { + // If we are not able to get the latest key from the store, this is because we are still awaiting on the response from the key exchange + // So what we should so instead is set aside the payload until we receive the key exchange then attempt to process it again + + // Note: We can set aside the data without the payload being owned directly due to the data already been verified + // so we can own the data directly without worrying about the lifetime + // however, we may want to eventually validate the data to ensure it havent been tampered in some way + // while waiting for the response. + let bytes = data.to_bytes()?; + self.pending_key_exchange + .entry(sender) + .or_default() + .push((bytes, false)); + + // Maybe send a request? Although we could, we should check to determine if one was previously sent or queued first, + // but for now we can leave this commented until the queue is removed and refactored. + // _ = self.request_key(id, &data.sender()).await; + + // Note: We will mark this as `Ok` since this is pending request to be resolved + return Ok(()); + // } + // } + } + Err(e) => { + tracing::warn!(id = %id, sender = %data.sender(), error = %e, "Failed to obtain key"); + return Err(e); + } }; - let event = serde_json::from_slice::(&bytes).map_err(|e| { - tracing::warn!(id = %id, sender = %data.sender(), error = %e, "Failed to deserialize message"); - e - })?; - message_event(self, &sender, event).await?; Ok(()) @@ -1300,12 +1288,13 @@ impl CommunityTask { let keypair = self.root.keypair(); - let bytes = ecdh_encrypt(keypair, Some(did), serde_json::to_vec(&request)?)?; - - let payload = PayloadBuilder::new(keypair, bytes) + let payload = PayloadBuilder::new(keypair, request) + .add_recipient(did)? .from_ipfs(&self.ipfs) .await?; + let bytes = payload.to_bytes()?; + let topic = community.exchange_topic(did); let peers = self.ipfs.pubsub_peers(Some(topic.clone())).await?; @@ -1314,19 +1303,14 @@ impl CommunityTask { || (peers.contains(&peer_id) && self .ipfs - .pubsub_publish(topic.clone(), payload.to_bytes()?) + .pubsub_publish(topic.clone(), bytes.clone()) .await .is_err()) { tracing::warn!(id = %self.community_id, "Unable to publish to topic"); self.queue_event( did.clone(), - QueueItem::direct( - None, - peer_id, - topic.clone(), - payload.message(None)?.to_vec(), - ), + QueueItem::direct(None, peer_id, topic.clone(), bytes), ) .await; } @@ -1337,13 +1321,13 @@ impl CommunityTask { } pub async fn send_message_event(&self, event: CommunityMessagingEvents) -> Result<(), Error> { - let event = serde_json::to_vec(&event)?; - let key = self.community_key(None)?; - let bytes = Cipher::direct_encrypt(&event, &key)?; + let recipients = self.document.participants(); - let payload = PayloadBuilder::new(self.root.keypair(), bytes) + let payload = PayloadBuilder::new(self.root.keypair(), event) + .add_recipients(recipients)? + .set_key(key) .from_ipfs(&self.ipfs) .await?; @@ -3667,15 +3651,21 @@ impl CommunityTask { queue: bool, exclude: Vec, ) -> Result<(), Error> { - let event = serde_json::to_vec(&event)?; let keypair = self.root.keypair(); let own_did = self.identity.did_key(); let key = self.community_key(None)?; - let bytes = Cipher::direct_encrypt(&event, &key)?; + let recipients = self.document.participants(); - let payload = PayloadBuilder::new(keypair, bytes) + let payload = PayloadBuilder::new(keypair, event) + .add_recipients( + recipients + .iter() + .filter(|did| own_did.ne(did)) + .filter(|did| !exclude.contains(did)), + )? + .set_key(key) .from_ipfs(&self.ipfs) .await?; @@ -3685,6 +3675,8 @@ impl CommunityTask { let recipients = self.document.participants().clone(); + let bytes = payload.to_bytes()?; + for recipient in recipients .iter() .filter(|did| own_did.ne(did)) @@ -3705,7 +3697,7 @@ impl CommunityTask { message_id, peer_id, self.document.topic(), - payload.message(None)?.to_vec(), + bytes.clone(), ), ) .await; @@ -3715,7 +3707,6 @@ impl CommunityTask { } if can_publish { - let bytes = payload.to_bytes()?; tracing::trace!(id = %self.community_id, "Payload size: {} bytes", bytes.len()); let timer = Instant::now(); let mut time = true; @@ -4522,13 +4513,11 @@ async fn process_request_response_event( let keypair = &this.root.keypair().clone(); let own_did = this.identity.did_key(); - let payload = PayloadMessage::>::from_bytes(&req.data)?; + let payload = PayloadMessage::::from_bytes(&req.data)?; let sender = payload.sender().to_did()?; - let data = ecdh_decrypt(keypair, Some(&sender), payload.message(None)?)?; - - let event = serde_json::from_slice::(&data)?; + let event = payload.message(keypair)?; tracing::debug!(id=%this.community_id, ?event, "Event received"); match event { @@ -4568,9 +4557,8 @@ async fn process_request_response_event( let topic = this.document.exchange_topic(&sender); - let bytes = ecdh_encrypt(keypair, Some(&sender), serde_json::to_vec(&response)?)?; - - let payload = PayloadBuilder::new(keypair, bytes) + let payload = PayloadBuilder::new(keypair, response) + .add_recipient(&sender)? .from_ipfs(&this.ipfs) .await?; @@ -4588,7 +4576,7 @@ async fn process_request_response_event( || (peers.contains(&peer_id) && this .ipfs - .pubsub_publish(topic.clone(), bytes) + .pubsub_publish(topic.clone(), bytes.clone()) .await .is_err()) { @@ -4596,12 +4584,7 @@ async fn process_request_response_event( // TODO this.queue_event( sender.clone(), - QueueItem::direct( - None, - peer_id, - topic.clone(), - payload.message(None)?.to_vec(), - ), + QueueItem::direct(None, peer_id, topic.clone(), bytes.clone()), ) .await; } @@ -4671,8 +4654,8 @@ async fn process_pending_payload(this: &mut CommunityTask) { let event_fn = || { let keypair = root.keypair(); let key = store.get_latest(keypair, &sender)?; - let data = Cipher::direct_decrypt(&data, &key)?; - let event = serde_json::from_slice(&data)?; + let payload = PayloadMessage::<_>::from_bytes(&data)?; + let event = payload.message_from_key(&key)?; Ok::<_, Error>(event) }; @@ -4691,14 +4674,12 @@ async fn process_pending_payload(this: &mut CommunityTask) { } async fn process_community_event(this: &mut CommunityTask, message: Message) -> Result<(), Error> { - let payload = PayloadMessage::>::from_bytes(&message.data)?; + let payload = PayloadMessage::::from_bytes(&message.data)?; let sender = payload.sender().to_did()?; let key = this.community_key(Some(&sender))?; - let data = Cipher::direct_decrypt(&payload.message(None)?, &key)?; - - let event = match serde_json::from_slice::(&data)? { + let event = match payload.message_from_key(&key)? { event @ CommunityMessagingEvents::Event { .. } => event, _ => return Err(Error::Other), }; @@ -4739,12 +4720,13 @@ struct QueueItem { m_id: Option, peer: PeerId, topic: String, - data: Vec, + data: Bytes, sent: bool, } impl QueueItem { - pub fn direct(m_id: Option, peer: PeerId, topic: String, data: Vec) -> Self { + pub fn direct(m_id: Option, peer: PeerId, topic: String, data: impl Into) -> Self { + let data = data.into(); QueueItem { m_id, peer, @@ -4758,7 +4740,6 @@ impl QueueItem { //TODO: Replace async fn process_queue(this: &mut CommunityTask) { let mut changed = false; - let keypair = &this.root.keypair().clone(); for (did, items) in this.queue.iter_mut() { let Ok(peer_id) = did.to_peer_id() else { continue; @@ -4792,22 +4773,7 @@ async fn process_queue(this: &mut CommunityTask) { continue; } - let payload = match PayloadBuilder::<_>::new(keypair, data.clone()) - .from_ipfs(&this.ipfs) - .await - { - Ok(p) => p, - Err(_e) => { - // tracing::warn!(error = %_e, "unable to build payload") - continue; - } - }; - - let Ok(bytes) = payload.to_bytes() else { - continue; - }; - - if let Err(e) = this.ipfs.pubsub_publish(topic.clone(), bytes).await { + if let Err(e) = this.ipfs.pubsub_publish(topic.clone(), data.clone()).await { tracing::error!("Error publishing to topic: {e}"); continue; } diff --git a/extensions/warp-ipfs/src/store/message/task.rs b/extensions/warp-ipfs/src/store/message/task.rs index 204eac62c..bdae073e2 100644 --- a/extensions/warp-ipfs/src/store/message/task.rs +++ b/extensions/warp-ipfs/src/store/message/task.rs @@ -29,7 +29,7 @@ use warp::raygun::{ RayGunEventKind, }; use warp::{ - crypto::{cipher::Cipher, generate}, + crypto::generate, error::Error, raygun::{ ConversationType, GroupPermission, ImplGroupPermissions, MessageEventKind, PinState, @@ -235,7 +235,7 @@ pub struct ConversationTask { identity: IdentityStore, discovery: Discovery, pending_key_request_sent: IndexSet, - pending_key_exchange: IndexMap, bool)>>, + pending_key_exchange: IndexMap>, document: ConversationDocument, keystore: Keystore, @@ -964,16 +964,15 @@ impl ConversationTask { did_key: &DID, event: ConversationEvents, ) -> Result<(), Error> { - let event = serde_json::to_vec(&event)?; - let keypair = self.root.keypair(); - let bytes = ecdh_encrypt(keypair, Some(did_key), &event)?; - - let payload = PayloadBuilder::new(keypair, bytes) + let payload = PayloadBuilder::new(keypair, event) + .add_recipient(did_key)? .from_ipfs(&self.ipfs) .await?; + let bytes = payload.to_bytes()?; + let peer_id = did_key.to_peer_id()?; let peers = self.ipfs.pubsub_peers(Some(did_key.messaging())).await?; @@ -983,19 +982,14 @@ impl ConversationTask { || (peers.contains(&peer_id) && self .ipfs - .pubsub_publish(did_key.messaging(), payload.to_bytes()?) + .pubsub_publish(did_key.messaging(), bytes.clone()) .await .is_err()) { tracing::warn!(id=%&self.conversation_id, "Unable to publish to topic. Queuing event"); self.queue_event( did_key.clone(), - QueueItem::direct( - None, - peer_id, - did_key.messaging(), - payload.message(None)?.to_vec(), - ), + QueueItem::direct(None, peer_id, did_key.messaging(), bytes), ) .await; time = false; @@ -1091,7 +1085,7 @@ impl ConversationTask { } async fn process_msg_event(&mut self, msg: Message) -> Result<(), Error> { - let data = PayloadMessage::>::from_bytes(&msg.data)?; + let data = PayloadMessage::::from_bytes(&msg.data)?; let sender = data.sender().to_did()?; let keypair = self.root.keypair(); @@ -1100,7 +1094,7 @@ impl ConversationTask { let id = self.conversation_id; - let bytes = match self.document.conversation_type() { + let event = match self.document.conversation_type() { ConversationType::Direct => { let list = self.document.recipients(); @@ -1114,44 +1108,56 @@ impl ConversationTask { return Err(Error::IdentityDoesntExist); }; - ecdh_decrypt(keypair, Some(member), data.message(None)?)? + if &sender != *member { + return Err(Error::IdentityDoesntExist); + } + + data.message(keypair)? } ConversationType::Group => { - let key = match self.keystore.get_latest(keypair, &sender) { - Ok(key) => key, + let bytes = data.to_bytes()?; + match self.keystore.get_latest(keypair, &sender) { + Ok(key) => data.message_from_key(&key)?, Err(Error::PublicKeyDoesntExist) => { - // If we are not able to get the latest key from the store, this is because we are still awaiting on the response from the key exchange - // So what we should so instead is set aside the payload until we receive the key exchange then attempt to process it again - _ = self.ping(&sender).await; - - // Note: We can set aside the data without the payload being owned directly due to the data already been verified - // so we can own the data directly without worrying about the lifetime - // however, we may want to eventually validate the data to ensure it havent been tampered in some way - // while waiting for the response. - - self.pending_key_exchange - .entry(sender) - .or_default() - .push((data.message(None)?, false)); - - // Note: We will mark this as `Ok` since this is pending request to be resolved - return Ok(()); + // Lets first try to get the message from the payload. If we are not apart of the list of recipients, we will then + // queue the payload itself. + match data.message(keypair) { + Ok(message) => message, + _ => { + // If we are not able to get the latest key from the store, this is because we are still awaiting on the response from the key exchange + // So what we should so instead is set aside the payload until we receive the key exchange then attempt to process it again + _ = self.ping(&sender).await; + + // If we are not able to get the latest key from the store, this is because we are still awaiting on the response from the key exchange + // So what we should so instead is set aside the payload until we receive the key exchange then attempt to process it again + + // Note: We can set aside the data without the payload being owned directly due to the data already been verified + // so we can own the data directly without worrying about the lifetime + // however, we may want to eventually validate the data to ensure it havent been tampered in some way + // while waiting for the response. + + self.pending_key_exchange + .entry(sender) + .or_default() + .push((bytes, false)); + + // Maybe send a request? Although we could, we should check to determine if one was previously sent or queued first, + // but for now we can leave this commented until the queue is removed and refactored. + // _ = self.request_key(id, &data.sender()).await; + + // Note: We will mark this as `Ok` since this is pending request to be resolved + return Ok(()); + } + } } Err(e) => { tracing::warn!(id = %id, sender = %data.sender(), error = %e, "Failed to obtain key"); return Err(e); } - }; - - Cipher::direct_decrypt(&data.message(None)?, &key)? + } } }; - let event = serde_json::from_slice::(&bytes).map_err(|e| { - tracing::warn!(id = %id, sender = %data.sender(), error = %e, "Failed to deserialize message"); - e - })?; - message_event(self, &sender, event).await?; Ok(()) @@ -1260,12 +1266,13 @@ impl ConversationTask { let keypair = self.root.keypair(); - let bytes = ecdh_encrypt(keypair, Some(did), serde_json::to_vec(&request)?)?; - - let payload = PayloadBuilder::new(keypair, bytes) + let payload = PayloadBuilder::new(keypair, request) + .add_recipient(did)? .from_ipfs(&self.ipfs) .await?; + let bytes = payload.to_bytes()?; + let topic = conversation.exchange_topic(did); let peers = self.ipfs.pubsub_peers(Some(topic.clone())).await?; @@ -1274,14 +1281,14 @@ impl ConversationTask { || (peers.contains(&peer_id) && self .ipfs - .pubsub_publish(topic.clone(), payload.to_bytes()?) + .pubsub_publish(topic.clone(), bytes.clone()) .await .is_err()) { tracing::warn!(id = %self.conversation_id, "Unable to publish to topic"); self.queue_event( did.clone(), - QueueItem::direct(None, peer_id, topic.clone(), payload.message(None)?), + QueueItem::direct(None, peer_id, topic.clone(), bytes), ) .await; } @@ -1900,14 +1907,14 @@ impl ConversationTask { } pub async fn send_message_event(&self, event: MessagingEvents) -> Result<(), Error> { - let event = serde_json::to_vec(&event)?; - let key = self.conversation_key(None)?; - let bytes = Cipher::direct_encrypt(&event, &key)?; + let recipients = self.document.recipients(); - let payload = PayloadBuilder::new(self.root.keypair(), bytes) + let payload = PayloadBuilder::new(self.root.keypair(), event) + .set_key(key) .from_ipfs(&self.ipfs) + .add_recipients(recipients)? .await?; let peers = self @@ -2634,28 +2641,34 @@ impl ConversationTask { event: MessagingEvents, queue: bool, ) -> Result<(), Error> { - let event = serde_json::to_vec(&event)?; let keypair = self.root.keypair(); let own_did = self.identity.did_key(); - let key = self.conversation_key(None)?; + let recipients = self.document.recipients(); - let bytes = Cipher::direct_encrypt(&event, &key)?; + let participants = recipients + .iter() + .filter(|did| own_did.ne(did)) + .collect::>(); - let payload = PayloadBuilder::new(keypair, bytes) + let key = self.conversation_key(None)?; + + let payload = PayloadBuilder::new(keypair, event) + .add_recipients(participants)? + // Note: We should probably not use the conversation key here but have each payload message be encrypted with a unique key while the underlining message + // could be encrypted with the conversation key + // TODO: Determine if we should use the conversation key at the payload level. + .set_key(key) .from_ipfs(&self.ipfs) .await?; + let payload_bytes = payload.to_bytes()?; + let peers = self.ipfs.pubsub_peers(Some(self.document.topic())).await?; let mut can_publish = false; - for recipient in self - .document - .recipients() - .iter() - .filter(|did| own_did.ne(did)) - { + for recipient in recipients.iter().filter(|did| own_did.ne(did)) { let peer_id = recipient.to_peer_id()?; // We want to confirm that there is atleast one peer subscribed before attempting to send a message @@ -2671,7 +2684,7 @@ impl ConversationTask { message_id, peer_id, self.document.topic(), - payload.message(None)?, + payload_bytes.clone(), ), ) .await; @@ -3393,13 +3406,11 @@ async fn process_request_response_event( let keypair = &this.root.keypair().clone(); let own_did = this.identity.did_key(); - let payload = PayloadMessage::>::from_bytes(&req.data)?; + let payload = PayloadMessage::::from_bytes(&req.data)?; let sender = payload.sender().to_did()?; - let data = ecdh_decrypt(keypair, Some(&sender), payload.message(None)?)?; - - let event = serde_json::from_slice::(&data)?; + let event = payload.message(keypair)?; tracing::debug!(id=%this.conversation_id, ?event, "Event received"); match event { @@ -3444,9 +3455,8 @@ async fn process_request_response_event( let topic = this.document.exchange_topic(&sender); - let bytes = ecdh_encrypt(keypair, Some(&sender), serde_json::to_vec(&response)?)?; - - let payload = PayloadBuilder::new(keypair, bytes) + let payload = PayloadBuilder::new(keypair, response) + .add_recipient(&sender)? .from_ipfs(&this.ipfs) .await?; @@ -3464,7 +3474,7 @@ async fn process_request_response_event( || (peers.contains(&peer_id) && this .ipfs - .pubsub_publish(topic.clone(), bytes) + .pubsub_publish(topic.clone(), bytes.clone()) .await .is_err()) { @@ -3472,7 +3482,7 @@ async fn process_request_response_event( // TODO this.queue_event( sender.clone(), - QueueItem::direct(None, peer_id, topic.clone(), payload.message(None)?), + QueueItem::direct(None, peer_id, topic.clone(), bytes.clone()), ) .await; } @@ -3604,8 +3614,8 @@ async fn process_pending_payload(this: &mut ConversationTask) { let event_fn = || { let keypair = root.keypair(); let key = store.get_latest(keypair, &sender)?; - let data = Cipher::direct_decrypt(&data, &key)?; - let event = serde_json::from_slice(&data)?; + let payload = PayloadMessage::::from_bytes(&data)?; + let event = payload.message_from_key(&key)?; Ok::<_, Error>(event) }; @@ -3627,14 +3637,12 @@ async fn process_conversation_event( this: &mut ConversationTask, message: Message, ) -> Result<(), Error> { - let payload = PayloadMessage::>::from_bytes(&message.data)?; + let payload = PayloadMessage::::from_bytes(&message.data)?; let sender = payload.sender().to_did()?; let key = this.conversation_key(Some(&sender))?; - let data = Cipher::direct_decrypt(&payload.message(None)?, &key)?; - - let event = match serde_json::from_slice::(&data)? { + let event = match payload.message_from_key(&key)? { event @ MessagingEvents::Event { .. } => event, _ => return Err(Error::Other), }; @@ -3672,12 +3680,13 @@ struct QueueItem { m_id: Option, peer: PeerId, topic: String, - data: Vec, + data: Bytes, sent: bool, } impl QueueItem { - pub fn direct(m_id: Option, peer: PeerId, topic: String, data: Vec) -> Self { + pub fn direct(m_id: Option, peer: PeerId, topic: String, data: impl Into) -> Self { + let data = data.into(); QueueItem { m_id, peer, @@ -3691,7 +3700,6 @@ impl QueueItem { //TODO: Replace async fn process_queue(this: &mut ConversationTask) { let mut changed = false; - let keypair = &this.root.keypair().clone(); for (did, items) in this.queue.iter_mut() { let Ok(peer_id) = did.to_peer_id() else { continue; @@ -3725,22 +3733,7 @@ async fn process_queue(this: &mut ConversationTask) { continue; } - let payload = match PayloadBuilder::<_>::new(keypair, data.clone()) - .from_ipfs(&this.ipfs) - .await - { - Ok(p) => p, - Err(_e) => { - // tracing::warn!(error = %_e, "unable to build payload") - continue; - } - }; - - let Ok(bytes) = payload.to_bytes() else { - continue; - }; - - if let Err(e) = this.ipfs.pubsub_publish(topic.clone(), bytes).await { + if let Err(e) = this.ipfs.pubsub_publish(topic.clone(), data.clone()).await { tracing::error!("Error publishing to topic: {e}"); continue; } diff --git a/extensions/warp-ipfs/src/store/mod.rs b/extensions/warp-ipfs/src/store/mod.rs index cbc30a34a..5ca8e00a9 100644 --- a/extensions/warp-ipfs/src/store/mod.rs +++ b/extensions/warp-ipfs/src/store/mod.rs @@ -128,7 +128,6 @@ pub(super) mod topics { } pub(super) mod ds_key { - use rust_ipfs::{Ipfs, Keypair, PeerId, PublicKey}; pub trait DataStoreKey { @@ -258,6 +257,20 @@ impl DidExt for DID { } } +impl DidExt for &DID { + fn to_public_key(&self) -> Result { + (*self).to_public_key() + } + + fn to_peer_id(&self) -> Result { + (*self).to_peer_id() + } + + fn to_keypair(&self) -> Result { + (*self).to_keypair() + } +} + pub trait VecExt { fn insert_item(&mut self, item: T) -> bool; fn remove_item(&mut self, item: &T) -> bool; diff --git a/extensions/warp-ipfs/src/store/payload.rs b/extensions/warp-ipfs/src/store/payload.rs index 8b3a43eaa..62f4fba41 100644 --- a/extensions/warp-ipfs/src/store/payload.rs +++ b/extensions/warp-ipfs/src/store/payload.rs @@ -219,7 +219,7 @@ impl PayloadMessage { co_signature: None, }; - if !recipients.is_empty() { + if !recipients.is_empty() || key.is_some() { let keypair = cosigner.unwrap_or(keypair); let new_key = generate::<64>(); @@ -244,10 +244,6 @@ impl PayloadMessage { new_map.insert(recipient, key_set); } - if new_map.is_empty() { - return Err(Error::EmptyMessage); // TODO: error for arb message being empty - } - let sender = payload.sender(); // Although we could decrypt any of the keys above, we will have an entry for the sender diff --git a/extensions/warp-ipfs/src/store/queue.rs b/extensions/warp-ipfs/src/store/queue.rs index 8eb27ab0b..730775415 100644 --- a/extensions/warp-ipfs/src/store/queue.rs +++ b/extensions/warp-ipfs/src/store/queue.rs @@ -281,11 +281,11 @@ impl QueueEntry { let res = async move { let kp = &entry.keypair; - let payload_bytes = serde_json::to_vec(&entry.item)?; - let bytes = ecdh_encrypt(kp, Some(&recipient), payload_bytes)?; - - let message = PayloadBuilder::new(kp, bytes).build()?; + let message = PayloadBuilder::new(kp, entry.item) + .add_recipient(&recipient)? + .from_ipfs(&entry.ipfs) + .await?; let message_bytes = message.to_bytes()?;