From a9bbf8e4102dedd3f79c9b3121496671e637e9cd Mon Sep 17 00:00:00 2001 From: Darius Date: Mon, 23 Dec 2024 10:05:36 -0500 Subject: [PATCH] refactor: use PayloadMessage for handling identity and friends requests/response. --- extensions/warp-ipfs/src/store/identity.rs | 67 +++++++++++----------- extensions/warp-ipfs/src/store/queue.rs | 8 +-- 2 files changed, 38 insertions(+), 37 deletions(-) diff --git a/extensions/warp-ipfs/src/store/identity.rs b/extensions/warp-ipfs/src/store/identity.rs index e840e3b16..9c81a826a 100644 --- a/extensions/warp-ipfs/src/store/identity.rs +++ b/extensions/warp-ipfs/src/store/identity.rs @@ -41,7 +41,7 @@ use super::{ cache::IdentityCache, identity::IdentityDocument, image_dag::get_image, root::RootDocumentMap, ResolvedRootDocument, RootDocument, }, - ecdh_decrypt, ecdh_encrypt, + ecdh_encrypt, event_subscription::EventSubscription, payload::PayloadMessage, phonebook::PhoneBook, @@ -530,12 +530,18 @@ impl IdentityStore { tracing::info!("Received event from {in_did}"); - let event = match ecdh_decrypt(store.root_document().keypair(), Some(&in_did), &message.data).and_then(|bytes| { - serde_json::from_slice::(&bytes).map_err(Error::from) - }) { - Ok(e) => e, + let payload: PayloadMessage = match PayloadMessage::from_bytes(&message.data) { + Ok(p) => p, Err(e) => { - tracing::error!("Failed to decrypt payload from {in_did}: {e}"); + tracing::error!(error = %e, from = %in_did, "failed to process payload"); + continue; + } + }; + + let event = match payload.message(store.root_document().keypair()) { + Ok(event) => event, + Err(e) => { + tracing::error!("Failed to decrypt payload from {in_did}: {e}"); continue; } }; @@ -545,11 +551,9 @@ impl IdentityStore { if let Err(e) = store.process_message(&in_did, event, false).await { tracing::error!("Failed to process identity message from {in_did}: {e}"); } - - } Some(event) = friend_stream.next() => { - let payload = match PayloadMessage::>::from_bytes(&event.data) { + let payload = match PayloadMessage::::from_bytes(&event.data) { Ok(p) => p, Err(_e) => { continue; @@ -568,17 +572,8 @@ impl IdentityStore { tracing::info!("Received event from {did}"); - let msg = match payload.message(None) { + let data = match payload.message(store.root_document.keypair()) { Ok(m) => m, - Err(_) => { - continue; - } - }; - - let data = match ecdh_decrypt(store.root_document().keypair(), Some(&did), msg).and_then(|bytes| { - serde_json::from_slice::(&bytes).map_err(Error::from) - }) { - Ok(pl) => pl, Err(e) => { if let Some(tx) = signal { let _ = tx.send(Err(e)); @@ -920,9 +915,11 @@ impl IdentityStore { let event = IdentityEvent::Request { option }; - let payload_bytes = serde_json::to_vec(&event)?; + let payload = PayloadBuilder::new(pk_did, event.clone()) + .add_recipient(out_did)? + .await?; - let bytes = ecdh_encrypt(pk_did, Some(out_did), payload_bytes)?; + let bytes = payload.to_bytes()?; tracing::info!(to = %out_did, event = ?event, payload_size = bytes.len(), "Sending event"); @@ -950,8 +947,6 @@ impl IdentityStore { return Err(Error::IdentityDoesntExist); } - let pk_did = self.root_document.keypair(); - let mut identity = self.own_identity_document().await?; let is_friend = self.is_friend(out_did).await.unwrap_or_default(); @@ -980,9 +975,11 @@ impl IdentityStore { option: ResponseOption::Identity { identity: payload }, }; - let payload_bytes = serde_json::to_vec(&event)?; + let payload = PayloadBuilder::new(kp_did, event.clone()) + .add_recipient(out_did)? + .await?; - let bytes = ecdh_encrypt(pk_did, Some(out_did), payload_bytes)?; + let bytes = payload.to_bytes()?; tracing::info!(to = %out_did, event = ?event, payload_size = bytes.len(), "Sending event"); @@ -1035,9 +1032,11 @@ impl IdentityStore { }, }; - let payload_bytes = serde_json::to_vec(&event)?; + let payload = PayloadBuilder::new(pk_did, event.clone()) + .add_recipient(out_did)? + .await?; - let bytes = ecdh_encrypt(pk_did, Some(out_did), payload_bytes)?; + let bytes = payload.to_bytes()?; tracing::info!(to = %out_did, event = ?event, payload_size = bytes.len(), "Sending event"); @@ -1112,9 +1111,11 @@ impl IdentityStore { option: ResponseOption::Metadata { data: metadata }, }; - let payload_bytes = serde_json::to_vec(&event)?; + let payload = PayloadBuilder::new(pk_did, event.clone()) + .add_recipient(out_did)? + .await?; - let bytes = ecdh_encrypt(pk_did, Some(out_did), payload_bytes)?; + let bytes = payload.to_bytes()?; tracing::info!(to = %out_did, event = ?event, payload_size = bytes.len(), "Sending event"); @@ -2960,10 +2961,10 @@ impl IdentityStore { let kp = self.root_document.keypair(); - let payload_bytes = serde_json::to_vec(&payload)?; - - let bytes = ecdh_encrypt(kp, Some(recipient), payload_bytes)?; - let message = PayloadBuilder::new(kp, bytes).build()?; + let message = PayloadBuilder::new(kp, payload.clone()) + .add_recipient(recipient)? + .from_ipfs(&self.ipfs) + .await?; let message_bytes = message.to_bytes()?; diff --git a/extensions/warp-ipfs/src/store/queue.rs b/extensions/warp-ipfs/src/store/queue.rs index 8eb27ab0b..730775415 100644 --- a/extensions/warp-ipfs/src/store/queue.rs +++ b/extensions/warp-ipfs/src/store/queue.rs @@ -281,11 +281,11 @@ impl QueueEntry { let res = async move { let kp = &entry.keypair; - let payload_bytes = serde_json::to_vec(&entry.item)?; - let bytes = ecdh_encrypt(kp, Some(&recipient), payload_bytes)?; - - let message = PayloadBuilder::new(kp, bytes).build()?; + let message = PayloadBuilder::new(kp, entry.item) + .add_recipient(&recipient)? + .from_ipfs(&entry.ipfs) + .await?; let message_bytes = message.to_bytes()?;