From 3c308f1d6f14d20d3d7c9db9dcf2201b7e6f63bf Mon Sep 17 00:00:00 2001 From: Darius Clark Date: Thu, 23 May 2024 12:58:45 -0400 Subject: [PATCH] refactor: Use keypair internally within warp-ipfs (#522) --- extensions/warp-ipfs/src/lib.rs | 39 +- .../warp-ipfs/src/store/conversation.rs | 107 ++--- extensions/warp-ipfs/src/store/discovery.rs | 4 +- extensions/warp-ipfs/src/store/document.rs | 51 +-- .../warp-ipfs/src/store/document/cache.rs | 24 +- .../warp-ipfs/src/store/document/files.rs | 6 +- .../warp-ipfs/src/store/document/identity.rs | 17 +- .../warp-ipfs/src/store/document/root.rs | 69 ++-- extensions/warp-ipfs/src/store/identity.rs | 136 +++---- extensions/warp-ipfs/src/store/keystore.rs | 37 +- extensions/warp-ipfs/src/store/message.rs | 384 +++++++++--------- extensions/warp-ipfs/src/store/mod.rs | 132 +++--- extensions/warp-ipfs/src/store/queue.rs | 69 +--- 13 files changed, 553 insertions(+), 522 deletions(-) diff --git a/extensions/warp-ipfs/src/lib.rs b/extensions/warp-ipfs/src/lib.rs index 010564b1b..229184068 100644 --- a/extensions/warp-ipfs/src/lib.rs +++ b/extensions/warp-ipfs/src/lib.rs @@ -1166,15 +1166,18 @@ impl MultiPassImportExport for WarpIpfs { } => { let keypair = warp::crypto::keypair::did_from_mnemonic(&passphrase, None)?; + let bytes = Zeroizing::new(keypair.private_key_bytes()); + let internal_keypair = rust_ipfs::Keypair::ed25519_from_bytes(bytes) + .map_err(|_| Error::PrivateKeyInvalid)?; + let bytes = fs::read(path).await?; - let decrypted_bundle = ecdh_decrypt(&keypair, None, bytes)?; + + let decrypted_bundle = ecdh_decrypt(&internal_keypair, None, bytes)?; let exported_document = serde_json::from_slice::(&decrypted_bundle)?; exported_document.verify()?; - let bytes = Zeroizing::new(keypair.private_key_bytes()); - warp::crypto::keypair::mnemonic_into_tesseract( &mut self.tesseract, &passphrase, @@ -1183,11 +1186,7 @@ impl MultiPassImportExport for WarpIpfs { false, )?; - self.init_ipfs( - rust_ipfs::Keypair::ed25519_from_bytes(bytes) - .map_err(|_| Error::PrivateKeyInvalid)?, - ) - .await?; + self.init_ipfs(internal_keypair).await?; let mut store = self.identity_store(false).await?; @@ -1199,16 +1198,19 @@ impl MultiPassImportExport for WarpIpfs { } => { let keypair = warp::crypto::keypair::did_from_mnemonic(&passphrase, None)?; + let bytes = Zeroizing::new(keypair.private_key_bytes()); + let internal_keypair = rust_ipfs::Keypair::ed25519_from_bytes(bytes) + .map_err(|_| Error::PrivateKeyInvalid)?; + let bytes = std::mem::take(buffer); - let decrypted_bundle = ecdh_decrypt(&keypair, None, bytes)?; + let decrypted_bundle = ecdh_decrypt(&internal_keypair, None, bytes)?; + let exported_document = serde_json::from_slice::(&decrypted_bundle)?; exported_document.verify()?; - let bytes = Zeroizing::new(keypair.private_key_bytes()); - warp::crypto::keypair::mnemonic_into_tesseract( &mut self.tesseract, &passphrase, @@ -1217,11 +1219,7 @@ impl MultiPassImportExport for WarpIpfs { false, )?; - self.init_ipfs( - rust_ipfs::Keypair::ed25519_from_bytes(bytes) - .map_err(|_| Error::PrivateKeyInvalid)?, - ) - .await?; + self.init_ipfs(internal_keypair).await?; let mut store = self.identity_store(false).await?; @@ -1233,6 +1231,9 @@ impl MultiPassImportExport for WarpIpfs { } => { let keypair = warp::crypto::keypair::did_from_mnemonic(&passphrase, None)?; let bytes = Zeroizing::new(keypair.private_key_bytes()); + let internal_keypair = rust_ipfs::Keypair::ed25519_from_bytes(bytes) + .map_err(|_| Error::PrivateKeyInvalid)?; + warp::crypto::keypair::mnemonic_into_tesseract( &mut self.tesseract, &passphrase, @@ -1241,11 +1242,7 @@ impl MultiPassImportExport for WarpIpfs { false, )?; - self.init_ipfs( - rust_ipfs::Keypair::ed25519_from_bytes(bytes) - .map_err(|_| Error::PrivateKeyInvalid)?, - ) - .await?; + self.init_ipfs(internal_keypair).await?; let mut store = self.identity_store(false).await?; diff --git a/extensions/warp-ipfs/src/store/conversation.rs b/extensions/warp-ipfs/src/store/conversation.rs index 2806720ed..c0c633616 100644 --- a/extensions/warp-ipfs/src/store/conversation.rs +++ b/extensions/warp-ipfs/src/store/conversation.rs @@ -6,19 +6,16 @@ use futures::{ StreamExt, TryFutureExt, }; use libipld::Cid; -use rust_ipfs::{Ipfs, IpfsPath}; +use rust_ipfs::{Ipfs, IpfsPath, Keypair}; use serde::{Deserialize, Deserializer, Serialize}; -use std::{collections::BTreeMap, sync::Arc}; +use std::collections::BTreeMap; use std::{ collections::{BTreeSet, HashMap}, time::Duration, }; use uuid::Uuid; use warp::{ - crypto::{ - cipher::Cipher, did_key::CoreSign, hash::sha256_iter, DIDKey, Ed25519KeyPair, KeyMaterial, - DID, - }, + crypto::{cipher::Cipher, hash::sha256_iter, DIDKey, Ed25519KeyPair, KeyMaterial, DID}, error::Error, raygun::{ Conversation, ConversationSettings, ConversationType, DirectConversationSettings, @@ -27,11 +24,11 @@ use warp::{ }, }; -use crate::store::{ecdh_encrypt, ecdh_encrypt_with_nonce}; +use crate::store::{ecdh_encrypt, ecdh_encrypt_with_nonce, DidExt}; use super::{ document::FileAttachmentDocument, ecdh_decrypt, keystore::Keystore, verify_serde_sig, - MAX_ATTACHMENT, MAX_MESSAGE_SIZE, MIN_MESSAGE_SIZE, + PeerIdExt, MAX_ATTACHMENT, MAX_MESSAGE_SIZE, MIN_MESSAGE_SIZE, }; #[derive(Default, Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] @@ -138,7 +135,7 @@ impl ConversationDocument { impl ConversationDocument { #[allow(clippy::too_many_arguments)] pub fn new( - did: &DID, + keypair: &Keypair, name: Option, mut recipients: Vec, restrict: Vec, @@ -149,9 +146,10 @@ impl ConversationDocument { creator: Option, signature: Option, ) -> Result { + let did = keypair.to_did()?; let id = id.unwrap_or_else(Uuid::new_v4); - if !recipients.contains(did) { + if !recipients.contains(&did) { recipients.push(did.clone()); } @@ -186,8 +184,8 @@ impl ConversationDocument { } if let Some(creator) = document.creator.as_ref() { - if creator.eq(did) { - document.sign(did)?; + if creator.eq(&did) { + document.sign(keypair)?; } } @@ -195,12 +193,13 @@ impl ConversationDocument { } pub fn new_direct( - did: &DID, + keypair: &Keypair, recipients: [DID; 2], settings: DirectConversationSettings, ) -> Result { + let did = keypair.to_did()?; let conversation_id = Some(super::generate_shared_topic( - did, + keypair, recipients .iter() .filter(|peer| did.ne(peer)) @@ -211,7 +210,7 @@ impl ConversationDocument { )?); Self::new( - did, + keypair, None, recipients.to_vec(), vec![], @@ -225,15 +224,16 @@ impl ConversationDocument { } pub fn new_group( - did: &DID, + keypair: &Keypair, name: Option, recipients: impl IntoIterator, restrict: &[DID], settings: GroupSettings, ) -> Result { let conversation_id = Some(Uuid::new_v4()); + let creator = Some(keypair.to_did()?); Self::new( - did, + keypair, name, recipients.into_iter().collect(), restrict.to_vec(), @@ -241,21 +241,22 @@ impl ConversationDocument { ConversationSettings::Group(settings), None, None, - Some(did.clone()), + creator, None, ) } } impl ConversationDocument { - pub fn sign(&mut self, did: &DID) -> Result<(), Error> { + pub fn sign(&mut self, keypair: &Keypair) -> Result<(), Error> { if let ConversationSettings::Group(settings) = self.settings { assert_eq!(self.conversation_type(), ConversationType::Group); + let did = keypair.to_did()?; let Some(creator) = self.creator.clone() else { return Err(Error::PublicKeyInvalid); }; - if !settings.members_can_add_participants() && !creator.eq(did) { + if !settings.members_can_add_participants() && !creator.eq(&did) { return Err(Error::PublicKeyInvalid); } @@ -283,7 +284,7 @@ impl ConversationDocument { None, ); - let signature = did.sign(&construct); + let signature = keypair.sign(&construct).expect("not RSA"); self.signature = Some(bs58::encode(signature).into_string()); } Ok(()) @@ -296,6 +297,8 @@ impl ConversationDocument { return Err(Error::PublicKeyInvalid); }; + let creator_pk = creator.to_public_key()?; + let Some(signature) = &self.signature else { return Err(Error::InvalidSignature); }; @@ -335,9 +338,9 @@ impl ConversationDocument { ), }; - creator - .verify(&construct, &signature) - .map_err(|e| anyhow::anyhow!("{:?}", e))?; + if !creator_pk.verify(&construct, &signature) { + return Err(Error::InvalidSignature); + } } Ok(()) } @@ -408,12 +411,12 @@ impl ConversationDocument { pub async fn get_messages( &self, ipfs: &Ipfs, - did: Arc, + keypair: &Keypair, option: MessageOptions, keystore: Either, ) -> Result, Error> { let list = self - .get_messages_stream(ipfs, did, option, keystore) + .get_messages_stream(ipfs, keypair, option, keystore) .await? .collect::>() .await; @@ -482,7 +485,7 @@ impl ConversationDocument { pub async fn get_messages_stream<'a>( &self, ipfs: &Ipfs, - did: Arc, + keypair: &Keypair, option: MessageOptions, keystore: Either, ) -> Result, Error> { @@ -502,7 +505,7 @@ impl ConversationDocument { let message = messages .first() .ok_or(Error::MessageNotFound)? - .resolve(ipfs, &did, true, keystore.as_ref()) + .resolve(ipfs, keypair, true, keystore.as_ref()) .await?; return Ok(stream::once(async { message }).boxed()); } @@ -511,12 +514,13 @@ impl ConversationDocument { let message = messages .last() .ok_or(Error::MessageNotFound)? - .resolve(ipfs, &did, true, keystore.as_ref()) + .resolve(ipfs, keypair, true, keystore.as_ref()) .await?; return Ok(stream::once(async { message }).boxed()); } let keystore = keystore.clone(); let ipfs = ipfs.clone(); + let keypair = keypair.clone(); let stream = async_stream::stream! { let mut remaining = option.limit(); for (index, document) in messages.iter().enumerate() { @@ -538,7 +542,7 @@ impl ConversationDocument { continue; } - if let Ok(message) = document.resolve(&ipfs, &did, true, keystore.as_ref()).await { + if let Ok(message) = document.resolve(&ipfs, &keypair, true, keystore.as_ref()).await { let should_yield = if let Some(keyword) = option.keyword() { message .lines() @@ -563,7 +567,7 @@ impl ConversationDocument { pub async fn get_messages_pages( &self, ipfs: &Ipfs, - did: &DID, + did: &Keypair, option: MessageOptions, keystore: Either<&DID, &Keystore>, ) -> Result { @@ -647,12 +651,12 @@ impl ConversationDocument { pub async fn get_message( &self, ipfs: &Ipfs, - did: &DID, + keypair: &Keypair, message_id: Uuid, keystore: Either<&DID, &Keystore>, ) -> Result { self.get_message_document(ipfs, message_id) - .and_then(|doc| async move { doc.resolve(ipfs, did, true, keystore).await }) + .and_then(|doc| async move { doc.resolve(ipfs, keypair, true, keystore).await }) .await } @@ -744,7 +748,7 @@ impl Ord for MessageDocument { impl MessageDocument { pub async fn new( ipfs: &Ipfs, - keypair: &DID, + keypair: &Keypair, message: Message, key: Either<&DID, &Keystore>, ) -> Result { @@ -832,6 +836,11 @@ impl MessageDocument { }; let sender = self.sender.to_did(); + let Ok(sender_pk) = sender.to_public_key() else { + // Note: Although unlikely, we will return false instead of refactoring this function to return an error + // since an invalid public key also signals a invalid message. + return false; + }; let hash = sha256_iter( [ Some(self.conversation_id.as_bytes().to_vec()), @@ -847,7 +856,7 @@ impl MessageDocument { None, ); - sender.verify(&hash, signature.as_ref()).is_ok() + sender_pk.verify(&hash, signature.as_ref()) } pub async fn raw_encrypted_message(&self, ipfs: &Ipfs) -> Result, Error> { @@ -881,14 +890,15 @@ impl MessageDocument { pub async fn update( &mut self, ipfs: &Ipfs, - did: &DID, + keypair: &Keypair, message: Message, signature: Option>, key: Either<&DID, &Keystore>, nonce: Option<&[u8]>, ) -> Result<(), Error> { + let did = &keypair.to_did()?; tracing::info!(id = %self.conversation_id, message_id = %self.id, "Updating message"); - let old_message = self.resolve(ipfs, did, true, key).await?; + let old_message = self.resolve(ipfs, keypair, true, key).await?; let sender = self.sender.to_did(); @@ -943,17 +953,17 @@ impl MessageDocument { let data = match (key, nonce) { (Either::Right(keystore), Some(nonce)) => { - let key = keystore.get_latest(did, &sender)?; + let key = keystore.get_latest(keypair, &sender)?; Cipher::direct_encrypt_with_nonce(&bytes, &key, nonce)? } (Either::Left(key), Some(nonce)) => { - ecdh_encrypt_with_nonce(did, Some(key), &bytes, nonce)? + ecdh_encrypt_with_nonce(keypair, Some(key), &bytes, nonce)? } (Either::Right(keystore), None) => { - let key = keystore.get_latest(did, &sender)?; + let key = keystore.get_latest(keypair, &sender)?; Cipher::direct_encrypt(&bytes, &key)? } - (Either::Left(key), None) => ecdh_encrypt(did, Some(key), &bytes)?, + (Either::Left(key), None) => ecdh_encrypt(keypair, Some(key), &bytes)?, }; let message = ipfs.dag().put().serialize(data).await?; @@ -962,7 +972,7 @@ impl MessageDocument { match (sender.eq(did), signature) { (true, None) => { - *self = self.sign(did)?; + *self = self.sign(keypair)?; } (false, None) | (true, Some(_)) => return Err(Error::InvalidMessage), (false, Some(sig)) => { @@ -982,7 +992,7 @@ impl MessageDocument { pub async fn resolve( &self, ipfs: &Ipfs, - did: &DID, + keypair: &Keypair, local: bool, key: Either<&DID, &Keystore>, ) -> Result { @@ -1054,8 +1064,8 @@ impl MessageDocument { let sender = self.sender.to_did(); let data = match key { - Either::Left(exchange) => ecdh_decrypt(did, Some(exchange), &bytes)?, - Either::Right(keystore) => keystore.try_decrypt(did, &sender, &bytes)?, + Either::Left(exchange) => ecdh_decrypt(keypair, Some(exchange), &bytes)?, + Either::Right(keystore) => keystore.try_decrypt(keypair, &sender, &bytes)?, }; let lines: Vec = serde_json::from_slice(&data)?; @@ -1081,9 +1091,10 @@ impl MessageDocument { Ok(message) } - fn sign(mut self, keypair: &DID) -> Result { + fn sign(mut self, keypair: &Keypair) -> Result { + let did = &keypair.to_did()?; let sender = self.sender.to_did(); - if !sender.eq(keypair) { + if !sender.eq(did) { return Err(Error::PublicKeyInvalid); } @@ -1102,7 +1113,7 @@ impl MessageDocument { None, ); - let signature = keypair.sign(&hash); + let signature = keypair.sign(&hash).expect("not RSA"); self.signature = Some(MessageSignature::try_from(signature)?); Ok(self) diff --git a/extensions/warp-ipfs/src/store/discovery.rs b/extensions/warp-ipfs/src/store/discovery.rs index 80dcc9006..986570c5b 100644 --- a/extensions/warp-ipfs/src/store/discovery.rs +++ b/extensions/warp-ipfs/src/store/discovery.rs @@ -17,7 +17,7 @@ use warp::{crypto::DID, error::Error}; use crate::config::{Discovery as DiscoveryConfig, DiscoveryType}; -use super::{did_to_libp2p_pub, DidExt, PeerIdExt, PeerType}; +use super::{DidExt, PeerIdExt, PeerType}; //TODO: Deprecate for separate discovery service #[derive(Clone)] @@ -271,7 +271,7 @@ impl Discovery { pub async fn get>(&self, peer_type: P) -> Result { let peer_id = match &peer_type.into() { PeerType::PeerId(peer_id) => *peer_id, - PeerType::DID(did_key) => did_to_libp2p_pub(did_key).map(|pk| pk.to_peer_id())?, + PeerType::DID(did_key) => did_key.to_peer_id()?, }; if !self.contains(peer_id).await { diff --git a/extensions/warp-ipfs/src/store/document.rs b/extensions/warp-ipfs/src/store/document.rs index 67826ee64..ca7eea0a4 100644 --- a/extensions/warp-ipfs/src/store/document.rs +++ b/extensions/warp-ipfs/src/store/document.rs @@ -1,9 +1,3 @@ -pub mod cache; -pub mod files; -pub mod identity; -pub mod image_dag; -pub mod root; - use std::{collections::BTreeMap, path::Path, str::FromStr, time::Duration}; use chrono::{DateTime, Utc}; @@ -23,14 +17,11 @@ use warp::{ file::{File, FileType}, Progression, }, - crypto::{did_key::CoreSign, DID}, error::Error, multipass::identity::{Identity, IdentityStatus}, }; -use crate::store::get_keypair_did; - -use super::keystore::Keystore; +use super::{keystore::Keystore, DidExt}; use self::{ files::{DirectoryDocument, FileDocument}, @@ -38,6 +29,12 @@ use self::{ image_dag::ImageDag, }; +pub mod cache; +pub mod files; +pub mod identity; +pub mod image_dag; +pub mod root; + #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] pub struct ResolvedRootDocument { pub identity: Identity, @@ -58,10 +55,12 @@ impl ResolvedRootDocument { let mut doc = self.clone(); let signature = doc.signature.take().ok_or(Error::InvalidSignature)?; let bytes = serde_json::to_vec(&doc)?; - self.identity - .did_key() - .verify(&bytes, &signature) - .map_err(|_| Error::InvalidSignature)?; + let identity_public_key = self.identity.did_key().to_public_key()?; + + if !identity_public_key.verify(&bytes, &signature) { + return Err(Error::InvalidSignature); + } + Ok(()) } } @@ -106,15 +105,15 @@ pub struct RootDocument { } impl RootDocument { - #[tracing::instrument(skip(self, did))] - pub fn sign(mut self, did: &DID) -> Result { + #[tracing::instrument(skip(self, keypair))] + pub fn sign(mut self, keypair: &Keypair) -> Result { //In case there is a signature already exist self.signature = None; self.modified = Utc::now(); let bytes = serde_json::to_vec(&self)?; - let signature = did.sign(&bytes); + let signature = keypair.sign(&bytes).expect("not RSA"); self.signature = Some(bs58::encode(signature).into_string()); Ok(self) } @@ -128,16 +127,18 @@ impl RootDocument { .await .map_err(|_| Error::IdentityInvalid)?; + let identity_public_key = identity.did.to_public_key()?; + let mut root_document = self.clone(); let signature = std::mem::take(&mut root_document.signature).ok_or(Error::InvalidSignature)?; let bytes = serde_json::to_vec(&root_document)?; let sig = bs58::decode(&signature).into_vec()?; - identity - .did - .verify(&bytes, &sig) - .map_err(|_| Error::InvalidSignature)?; + if !identity_public_key.verify(&bytes, &sig) { + return Err(Error::InvalidSignature); + } + Ok(()) } @@ -249,7 +250,7 @@ impl RootDocument { let bytes = serde_json::to_vec(&exported)?; let kp = keypair.unwrap_or_else(|| ipfs.keypair()); - let signature = kp.sign(&bytes).map_err(anyhow::Error::from)?; + let signature = kp.sign(&bytes).expect("not RSA key"); exported.signature = Some(signature); Ok(exported) @@ -320,15 +321,15 @@ impl RootDocument { self.verify(ipfs).await } + // TODO: Include optional keypair to represent the actual keypair pub async fn import(ipfs: &Ipfs, data: ResolvedRootDocument) -> Result { data.verify()?; let keypair = ipfs.keypair(); - let did_kp = get_keypair_did(keypair)?; let document: IdentityDocument = data.identity.into(); - let document = document.sign(&did_kp)?; + let document = document.sign(keypair)?; let identity = ipfs.dag().put().serialize(document).await?; @@ -392,7 +393,7 @@ impl RootDocument { root_document.file_index = cid; } - let root_document = root_document.sign(&did_kp)?; + let root_document = root_document.sign(keypair)?; Ok(root_document) } diff --git a/extensions/warp-ipfs/src/store/document/cache.rs b/extensions/warp-ipfs/src/store/document/cache.rs index bcc647868..5d17322f3 100644 --- a/extensions/warp-ipfs/src/store/document/cache.rs +++ b/extensions/warp-ipfs/src/store/document/cache.rs @@ -250,7 +250,7 @@ mod test { use chrono::Utc; use futures::StreamExt; - use rust_ipfs::UninitializedIpfsNoop; + use rust_ipfs::{Keypair, UninitializedIpfsNoop}; use warp::{ crypto::{ rand::{self, seq::SliceRandom}, @@ -259,10 +259,14 @@ mod test { multipass::identity::SHORT_ID_SIZE, }; - use crate::store::document::{cache::IdentityCache, identity::IdentityDocument}; + use crate::store::{ + document::{cache::IdentityCache, identity::IdentityDocument}, + PeerIdExt, + }; - fn random_document() -> (DID, IdentityDocument) { - let did_key = DID::default(); + fn random_document() -> (Keypair, DID, IdentityDocument) { + let keypair = Keypair::generate_ed25519(); + let did_key = keypair.to_did().expect("valid keypair"); let fingerprint = did_key.fingerprint(); let bytes = fingerprint.as_bytes(); let time = Utc::now(); @@ -281,11 +285,11 @@ mod test { signature: None, }; - let document = document.sign(&did_key).expect("valid"); + let document = document.sign(&keypair).expect("valid"); document.verify().expect("valid"); - (did_key, document) + (keypair, did_key, document) } async fn pregenerated_cache() -> IdentityCache { @@ -297,7 +301,7 @@ mod test { let cache = IdentityCache::new(&ipfs).await; for _ in 0..N { - let (_, document) = random_document(); + let (_, _, document) = random_document(); cache.insert(&document).await.expect("inserted"); } @@ -308,7 +312,7 @@ mod test { async fn new_identity_cache() -> anyhow::Result<()> { let cache = pregenerated_cache::<0>().await; - let (_, document) = random_document(); + let (_, _, document) = random_document(); cache.insert(&document).await?; @@ -323,7 +327,7 @@ mod test { async fn update_existing_identity_cache() -> anyhow::Result<()> { let cache = pregenerated_cache::<0>().await; - let (did_key, mut document) = random_document(); + let (keypair, _, mut document) = random_document(); let old_doc = document.clone(); @@ -331,7 +335,7 @@ mod test { document.username = String::from("NewName"); - let document = document.sign(&did_key).expect("valid"); + let document = document.sign(&keypair).expect("valid"); let old_document = cache .insert(&document) diff --git a/extensions/warp-ipfs/src/store/document/files.rs b/extensions/warp-ipfs/src/store/document/files.rs index 09add455e..c62c24639 100644 --- a/extensions/warp-ipfs/src/store/document/files.rs +++ b/extensions/warp-ipfs/src/store/document/files.rs @@ -302,7 +302,6 @@ impl FileDocument { #[cfg(test)] mod test { - use std::sync::Arc; use rust_ipfs::{Ipfs, UninitializedIpfsNoop}; use tracing::Span; @@ -313,16 +312,13 @@ mod test { use super::DirectoryDocument; use crate::config::Config; use crate::store::document::root::RootDocumentMap; - use crate::store::get_keypair_did; use crate::store::{event_subscription::EventSubscription, files::FileStore}; async fn file_store( ipfs: &Ipfs, event: &EventSubscription, ) -> Result { - let key = get_keypair_did(ipfs.keypair())?; - - let root_document = RootDocumentMap::new(ipfs, Arc::new(key)).await; + let root_document = RootDocumentMap::new(ipfs, None).await; let store = FileStore::new( ipfs.clone(), root_document, diff --git a/extensions/warp-ipfs/src/store/document/identity.rs b/extensions/warp-ipfs/src/store/document/identity.rs index 5a8616757..28ebaff66 100644 --- a/extensions/warp-ipfs/src/store/document/identity.rs +++ b/extensions/warp-ipfs/src/store/document/identity.rs @@ -1,14 +1,15 @@ use chrono::{DateTime, Utc}; use libipld::Cid; +use rust_ipfs::Keypair; use serde::{Deserialize, Serialize}; use std::hash::Hash; use warp::{ - crypto::{did_key::CoreSign, Fingerprint, DID}, + crypto::{Fingerprint, DID}, error::Error, multipass::identity::{Identity, IdentityStatus, Platform, SHORT_ID_SIZE}, }; -use crate::store::{MAX_STATUS_LENGTH, MAX_USERNAME_LENGTH, MIN_USERNAME_LENGTH}; +use crate::store::{DidExt, MAX_STATUS_LENGTH, MAX_USERNAME_LENGTH, MIN_USERNAME_LENGTH}; #[derive(Debug, Default, Clone, Deserialize, Serialize, PartialEq, Eq)] #[serde(rename_all = "lowercase")] @@ -208,7 +209,7 @@ impl IdentityDocument { Ok(self.into()) } - pub fn sign(mut self, did: &DID) -> Result { + pub fn sign(mut self, keypair: &Keypair) -> Result { let metadata = self.metadata; //We blank out the metadata since it will not be used as apart of the @@ -219,7 +220,7 @@ impl IdentityDocument { self.modified = Utc::now(); let bytes = serde_json::to_vec(&self)?; - let signature = bs58::encode(did.sign(&bytes)).into_string(); + let signature = bs58::encode(keypair.sign(&bytes).expect("not RSA")).into_string(); self.metadata = metadata; self.signature = Some(signature); Ok(self) @@ -273,9 +274,11 @@ impl IdentityDocument { let signature = std::mem::take(&mut payload.signature).ok_or(Error::InvalidSignature)?; let signature_bytes = bs58::decode(signature).into_vec()?; let bytes = serde_json::to_vec(&payload)?; - self.did - .verify(&bytes, &signature_bytes) - .map_err(|_| Error::InvalidSignature)?; + let pk = self.did.to_public_key()?; + if !pk.verify(&bytes, &signature_bytes) { + return Err(Error::InvalidSignature); + } + Ok(()) } } diff --git a/extensions/warp-ipfs/src/store/document/root.rs b/extensions/warp-ipfs/src/store/document/root.rs index 3ae0c8b46..8a76a1fe9 100644 --- a/extensions/warp-ipfs/src/store/document/root.rs +++ b/extensions/warp-ipfs/src/store/document/root.rs @@ -6,9 +6,10 @@ use futures::{ StreamExt, }; use libipld::Cid; -use rust_ipfs::{Ipfs, IpfsPath}; +use rust_ipfs::{Ipfs, IpfsPath, Keypair}; use tokio::sync::RwLock; use uuid::Uuid; + use warp::{ constellation::directory::Directory, crypto::DID, error::Error, multipass::identity::IdentityStatus, @@ -25,11 +26,13 @@ use super::{ #[derive(Debug, Clone)] pub struct RootDocumentMap { + ipfs: Ipfs, + keypair: Option, inner: Arc>, } impl RootDocumentMap { - pub async fn new(ipfs: &Ipfs, keypair: Arc) -> Self { + pub async fn new(ipfs: &Ipfs, keypair: Option) -> Self { let key = ipfs.root(); let cid = ipfs @@ -43,13 +46,15 @@ impl RootDocumentMap { let mut inner = RootDocumentInner { ipfs: ipfs.clone(), - keypair, + keypair: keypair.clone(), cid, }; inner.migrate().await; Self { + ipfs: ipfs.clone(), + keypair, inner: Arc::new(RwLock::new(inner)), } } @@ -209,16 +214,23 @@ impl RootDocumentMap { let inner = &mut *self.inner.write().await; inner.set_root_index(root).await } + + pub fn keypair(&self) -> &Keypair { + self.keypair.as_ref().unwrap_or(self.ipfs.keypair()) + } } #[derive(Debug)] struct RootDocumentInner { - keypair: Arc, + keypair: Option, ipfs: Ipfs, cid: Option, } impl RootDocumentInner { + fn keypair(&self) -> &Keypair { + self.keypair.as_ref().unwrap_or(self.ipfs.keypair()) + } async fn migrate(&mut self) { let mut root = match self.get_root_document().await { Ok(r) => r, @@ -304,7 +316,7 @@ impl RootDocumentInner { document: RootDocument, local: bool, ) -> Result<(), Error> { - let document = document.sign(&self.keypair)?; + let document = document.sign(self.keypair())?; //Precautionary check document.verify(&self.ipfs).await?; @@ -348,7 +360,7 @@ impl RootDocumentInner { let mut root = self.get_root_document().await?; let mut identity = self.identity().await?; identity.metadata.status = Some(status); - let identity = identity.sign(&self.keypair)?; + let identity = identity.sign(self.keypair())?; root.identity = self.ipfs.dag().put().serialize(identity).await?; self.set_root_document(root).await @@ -367,7 +379,7 @@ impl RootDocumentInner { .deserialized::>() .await .and_then(|bytes| { - let bytes = ecdh_decrypt(&self.keypair, None, bytes)?; + let bytes = ecdh_decrypt(self.keypair(), None, bytes)?; serde_json::from_slice(&bytes).map_err(anyhow::Error::from) }) .unwrap_or_default(); @@ -385,7 +397,7 @@ impl RootDocumentInner { .deserialized::>() .await .and_then(|bytes| { - let bytes = ecdh_decrypt(&self.keypair, None, bytes)?; + let bytes = ecdh_decrypt(self.keypair(), None, bytes)?; serde_json::from_slice(&bytes).map_err(anyhow::Error::from) }) .unwrap_or_default(), @@ -398,7 +410,7 @@ impl RootDocumentInner { document.request = match !list.is_empty() { true => { - let bytes = ecdh_encrypt(&self.keypair, None, serde_json::to_vec(&list)?)?; + let bytes = ecdh_encrypt(self.keypair(), None, serde_json::to_vec(&list)?)?; Some(self.ipfs.dag().put().serialize(bytes).await?) } false => None, @@ -419,7 +431,7 @@ impl RootDocumentInner { .deserialized::>() .await .and_then(|bytes| { - let bytes = ecdh_decrypt(&self.keypair, None, bytes)?; + let bytes = ecdh_decrypt(self.keypair(), None, bytes)?; serde_json::from_slice(&bytes).map_err(anyhow::Error::from) }) .unwrap_or_default(), @@ -432,7 +444,7 @@ impl RootDocumentInner { document.request = match !list.is_empty() { true => { - let bytes = ecdh_encrypt(&self.keypair, None, serde_json::to_vec(&list)?)?; + let bytes = ecdh_encrypt(self.keypair(), None, serde_json::to_vec(&list)?)?; Some(self.ipfs.dag().put().serialize(bytes).await?) } false => None, @@ -455,7 +467,7 @@ impl RootDocumentInner { .deserialized::>() .await .and_then(|bytes| { - let bytes = ecdh_decrypt(&self.keypair, None, bytes)?; + let bytes = ecdh_decrypt(self.keypair(), None, bytes)?; serde_json::from_slice(&bytes).map_err(anyhow::Error::from) }) .unwrap_or_default(); @@ -473,7 +485,7 @@ impl RootDocumentInner { .deserialized::>() .await .and_then(|bytes| { - let bytes = ecdh_decrypt(&self.keypair, None, bytes)?; + let bytes = ecdh_decrypt(self.keypair(), None, bytes)?; serde_json::from_slice(&bytes).map_err(anyhow::Error::from) }) .unwrap_or_default(), @@ -486,7 +498,7 @@ impl RootDocumentInner { document.friends = match !list.is_empty() { true => { - let bytes = ecdh_encrypt(&self.keypair, None, serde_json::to_vec(&list)?)?; + let bytes = ecdh_encrypt(self.keypair(), None, serde_json::to_vec(&list)?)?; Some(self.ipfs.dag().put().serialize(bytes).await?) } false => None, @@ -538,7 +550,7 @@ impl RootDocumentInner { .deserialized::>() .await .and_then(|bytes| { - let bytes = ecdh_decrypt(&self.keypair, None, bytes)?; + let bytes = ecdh_decrypt(self.keypair(), None, bytes)?; serde_json::from_slice(&bytes).map_err(anyhow::Error::from) }) .unwrap_or_default(), @@ -551,7 +563,7 @@ impl RootDocumentInner { document.friends = match !list.is_empty() { true => { - let bytes = ecdh_encrypt(&self.keypair, None, serde_json::to_vec(&list)?)?; + let bytes = ecdh_encrypt(self.keypair(), None, serde_json::to_vec(&list)?)?; Some(self.ipfs.dag().put().serialize(bytes).await?) } false => None, @@ -575,7 +587,7 @@ impl RootDocumentInner { .deserialized::>() .await .and_then(|bytes| { - let bytes = ecdh_decrypt(&self.keypair, None, bytes)?; + let bytes = ecdh_decrypt(self.keypair(), None, bytes)?; serde_json::from_slice(&bytes).map_err(anyhow::Error::from) }) .unwrap_or_default(); @@ -605,7 +617,7 @@ impl RootDocumentInner { .deserialized::>() .await .and_then(|bytes| { - let bytes = ecdh_decrypt(&self.keypair, None, bytes)?; + let bytes = ecdh_decrypt(self.keypair(), None, bytes)?; serde_json::from_slice(&bytes).map_err(anyhow::Error::from) }) .unwrap_or_default(), @@ -618,7 +630,7 @@ impl RootDocumentInner { document.blocks = match !list.is_empty() { true => { - let bytes = ecdh_encrypt(&self.keypair, None, serde_json::to_vec(&list)?)?; + let bytes = ecdh_encrypt(self.keypair(), None, serde_json::to_vec(&list)?)?; Some(self.ipfs.dag().put().serialize(bytes).await?) } false => None, @@ -640,7 +652,7 @@ impl RootDocumentInner { .deserialized::>() .await .and_then(|bytes| { - let bytes = ecdh_decrypt(&self.keypair, None, bytes)?; + let bytes = ecdh_decrypt(self.keypair(), None, bytes)?; serde_json::from_slice(&bytes).map_err(anyhow::Error::from) }) .unwrap_or_default(), @@ -653,7 +665,7 @@ impl RootDocumentInner { document.blocks = match !list.is_empty() { true => { - let bytes = ecdh_encrypt(&self.keypair, None, serde_json::to_vec(&list)?)?; + let bytes = ecdh_encrypt(self.keypair(), None, serde_json::to_vec(&list)?)?; Some(self.ipfs.dag().put().serialize(bytes).await?) } false => None, @@ -677,7 +689,7 @@ impl RootDocumentInner { .deserialized::>() .await .and_then(|bytes| { - let bytes = ecdh_decrypt(&self.keypair, None, bytes)?; + let bytes = ecdh_decrypt(self.keypair(), None, bytes)?; serde_json::from_slice(&bytes).map_err(anyhow::Error::from) }) .unwrap_or_default(); @@ -695,7 +707,7 @@ impl RootDocumentInner { .deserialized::>() .await .and_then(|bytes| { - let bytes = ecdh_decrypt(&self.keypair, None, bytes)?; + let bytes = ecdh_decrypt(self.keypair(), None, bytes)?; serde_json::from_slice(&bytes).map_err(anyhow::Error::from) }) .unwrap_or_default(), @@ -708,7 +720,7 @@ impl RootDocumentInner { document.block_by = match !list.is_empty() { true => { - let bytes = ecdh_encrypt(&self.keypair, None, serde_json::to_vec(&list)?)?; + let bytes = ecdh_encrypt(self.keypair(), None, serde_json::to_vec(&list)?)?; Some(self.ipfs.dag().put().serialize(bytes).await?) } false => None, @@ -730,7 +742,7 @@ impl RootDocumentInner { .deserialized::>() .await .and_then(|bytes| { - let bytes = ecdh_decrypt(&self.keypair, None, bytes)?; + let bytes = ecdh_decrypt(self.keypair(), None, bytes)?; serde_json::from_slice(&bytes).map_err(anyhow::Error::from) }) .unwrap_or_default(), @@ -743,7 +755,7 @@ impl RootDocumentInner { document.block_by = match !list.is_empty() { true => { - let bytes = ecdh_encrypt(&self.keypair, None, serde_json::to_vec(&list)?)?; + let bytes = ecdh_encrypt(self.keypair(), None, serde_json::to_vec(&list)?)?; Some(self.ipfs.dag().put().serialize(bytes).await?) } false => None, @@ -897,14 +909,15 @@ impl RootDocumentInner { async fn export(&self) -> Result { let document = self.get_root_document().await?; - document.resolve(&self.ipfs, None).await + document.resolve(&self.ipfs, self.keypair.as_ref()).await } async fn export_bytes(&self) -> Result, Error> { let export = self.export().await?; let bytes = serde_json::to_vec(&export)?; - ecdh_encrypt(&self.keypair, None, bytes) + + ecdh_encrypt(self.keypair(), None, bytes) } async fn set_root_cid(&mut self, cid: Cid) -> Result<(), Error> { diff --git a/extensions/warp-ipfs/src/store/identity.rs b/extensions/warp-ipfs/src/store/identity.rs index 9cacff26a..c97ca244e 100644 --- a/extensions/warp-ipfs/src/store/identity.rs +++ b/extensions/warp-ipfs/src/store/identity.rs @@ -13,7 +13,8 @@ use futures::{ }; use futures_timeout::TimeoutExt; use futures_timer::Delay; -use ipfs::{p2p::MultiaddrExt, Ipfs, Keypair}; +use ipfs::Keypair; +use ipfs::{p2p::MultiaddrExt, Ipfs}; use libipld::Cid; use rust_ipfs as ipfs; use serde::{Deserialize, Serialize}; @@ -24,11 +25,10 @@ use web_time::Instant; use shuttle::identity::{RequestEvent, RequestPayload}; use warp::{ constellation::file::FileType, - crypto::{did_key::CoreSign, zeroize::Zeroizing}, multipass::identity::{IdentityImage, Platform}, }; use warp::{ - crypto::{did_key::Generate, DIDKey, Ed25519KeyPair, Fingerprint, DID}, + crypto::{DIDKey, Ed25519KeyPair, Fingerprint, DID}, error::Error, multipass::{ identity::{Identity, IdentityStatus, SHORT_ID_SIZE}, @@ -39,11 +39,11 @@ use warp::{ use crate::{ config::{self, Discovery as DiscoveryConfig}, - store::{did_to_libp2p_pub, discovery::Discovery, topics::PeerTopic, DidExt, PeerIdExt}, + store::{discovery::Discovery, topics::PeerTopic, DidExt, PeerIdExt}, }; use super::{ - connected_to_peer, did_keypair, + connected_to_peer, document::{ cache::IdentityCache, identity::IdentityDocument, image_dag::get_image, root::RootDocumentMap, ResolvedRootDocument, RootDocument, @@ -68,8 +68,8 @@ pub struct IdentityStore { identity_cache: IdentityCache, - // keypair - did_key: Arc, + // public key representation in did format + did_key: DID, // Queue to handle sending friend request queue: Queue, @@ -251,26 +251,30 @@ pub enum RequestResponsePayloadVersion { } impl RequestResponsePayload { - pub fn new(keypair: &DID, event: Event) -> Result { + pub fn new(keypair: &Keypair, event: Event) -> Result { let request = Self::new_unsigned(keypair, event); request.sign(keypair) } - pub fn new_unsigned(keypair: &DID, event: Event) -> Self { + pub fn new_unsigned(keypair: &Keypair, event: Event) -> Self { + // Note: We can expect here because: + // - Any invalid keypair would have already triggered an error a head of time + // - We dont accept any non-ed25519 keypairs at this time + let sender = keypair.to_did().expect("valid ed25519"); Self { version: RequestResponsePayloadVersion::V1, - sender: keypair.clone(), + sender, event, created: None, signature: None, } } - pub fn sign(mut self, keypair: &DID) -> Result { + pub fn sign(mut self, keypair: &Keypair) -> Result { self.signature = None; self.created = Some(Utc::now()); let bytes = serde_json::to_vec(&self)?; - let signature = keypair.sign(&bytes); + let signature = keypair.sign(&bytes).expect("not RSA"); self.signature = Some(signature); Ok(self) } @@ -279,9 +283,13 @@ impl RequestResponsePayload { let mut doc = self.clone(); let signature = doc.signature.take().ok_or(Error::InvalidSignature)?; let bytes = serde_json::to_vec(&doc)?; - doc.sender - .verify(&bytes, &signature) - .map_err(|_| Error::InvalidSignature) + let sender_pk = doc.sender.to_public_key()?; + + if !sender_pk.verify(&bytes, &signature) { + return Err(Error::InvalidSignature); + } + + Ok(()) } } @@ -373,11 +381,14 @@ impl IdentityStore { let event = tx.clone(); - let did_key = Arc::new(did_keypair(&tesseract)?); + let root_document = RootDocumentMap::new(&ipfs, None).await; - let root_document = RootDocumentMap::new(&ipfs, did_key.clone()).await; + let did_key = root_document + .keypair() + .to_did() + .expect("valid ed25519 keypair"); - let queue = Queue::new(ipfs.clone(), did_key.clone(), discovery.clone()); + let queue = Queue::new(ipfs.clone(), &root_document, discovery.clone()); let signal = Default::default(); @@ -412,7 +423,7 @@ impl IdentityStore { } false => { if let Err(e) = store.register().await { - tracing::warn!(did = %store.did_key, error = %e, "Unable to register identity"); + tracing::warn!(did = %ident.did_key(), error = %e, "Unable to register identity"); } } } @@ -549,7 +560,7 @@ impl IdentityStore { tracing::info!("Received event from {in_did}"); - let event = match ecdh_decrypt(&store.did_key, Some(&in_did), &message.data).and_then(|bytes| { + let event = match ecdh_decrypt(store.root_document().keypair(), Some(&in_did), &message.data).and_then(|bytes| { serde_json::from_slice::(&bytes).map_err(Error::from) }) { Ok(e) => e, @@ -586,7 +597,7 @@ impl IdentityStore { tracing::info!("Received event from {did}"); - let data = match ecdh_decrypt(&store.did_key, Some(&did), &event.data).and_then(|bytes| { + let data = match ecdh_decrypt(store.root_document().keypair(), Some(&did), &event.data).and_then(|bytes| { serde_json::from_slice::(&bytes).map_err(Error::from) }) { Ok(pl) => pl, @@ -637,8 +648,7 @@ impl IdentityStore { &self.phonebook } - /// did key with private key embedded - pub(crate) fn did_key(&self) -> Arc { + pub fn did_key(&self) -> DID { self.did_key.clone() } @@ -684,7 +694,7 @@ impl IdentityStore { // If it is, skip the request so we dont wait resources storing it. if self.is_blocked(&data.sender).await? && !matches!(data.event, Event::Block) { tracing::warn!("Received event from a blocked identity."); - let payload = RequestResponsePayload::new(&self.did_key, Event::Block)?; + let payload = RequestResponsePayload::new(self.root_document.keypair(), Event::Block)?; return self .broadcast_request(&data.sender, &payload, false, true) @@ -718,7 +728,8 @@ impl IdentityStore { if self.is_friend(&data.sender).await? { tracing::debug!("Friend already exist. Remitting event"); - let payload = RequestResponsePayload::new(&self.did_key, Event::Accept)?; + let payload = + RequestResponsePayload::new(self.root_document.keypair(), Event::Accept)?; return self .broadcast_request(&data.sender, &payload, false, false) @@ -759,7 +770,8 @@ impl IdentityStore { .await; } - let payload = RequestResponsePayload::new(&self.did_key, Event::Response)?; + let payload = + RequestResponsePayload::new(self.root_document.keypair(), Event::Response)?; self.broadcast_request(&data.sender, &payload, false, false) .await?; @@ -916,7 +928,7 @@ impl IdentityStore { return Err(Error::IdentityDoesntExist); } - let pk_did = &*self.did_key; + let pk_did = self.root_document.keypair(); let event = IdentityEvent::Request { option }; @@ -950,7 +962,7 @@ impl IdentityStore { return Err(Error::IdentityDoesntExist); } - let pk_did = &*self.did_key; + let pk_did = self.root_document.keypair(); let mut identity = self.own_identity_document().await?; @@ -989,9 +1001,9 @@ impl IdentityStore { identity.metadata = metadata; } - let kp_did = self.did_key(); + let kp_did = self.root_document.keypair(); - let payload = identity.sign(&kp_did)?; + let payload = identity.sign(kp_did)?; let event = IdentityEvent::Receive { option: ResponseOption::Identity { identity: payload }, @@ -1027,7 +1039,7 @@ impl IdentityStore { return Err(Error::IdentityDoesntExist); } - let pk_did = &*self.did_key; + let pk_did = self.root_document.keypair(); let identity = self.own_identity_document().await?; @@ -1082,7 +1094,7 @@ impl IdentityStore { return Err(Error::IdentityDoesntExist); } - let pk_did = &*self.did_key; + let pk_did = self.root_document.keypair(); let identity = self.own_identity_document().await?; @@ -1583,8 +1595,7 @@ impl IdentityStore { signature: None, }; - let did_kp = self.did_key(); - let identity = identity.sign(&did_kp)?; + let identity = identity.sign(self.root_document.keypair())?; let ident_cid = self.ipfs.dag().put().serialize(identity).await?; @@ -2047,9 +2058,9 @@ impl IdentityStore { } pub async fn identity_update(&mut self, identity: IdentityDocument) -> Result<(), Error> { - let kp = self.did_key(); + let kp = self.root_document.keypair(); - let identity = identity.sign(&kp)?; + let identity = identity.sign(kp)?; tracing::debug!("Updating document"); let mut root_document = self.root_document.get().await?; @@ -2149,27 +2160,10 @@ impl IdentityStore { .ok_or(Error::IdentityDoesntExist) } - pub fn get_keypair(&self) -> anyhow::Result { - match self.tesseract.retrieve("keypair") { - Ok(keypair) => { - let kp = bs58::decode(keypair).into_vec()?; - let id_kp = warp::crypto::ed25519_dalek::Keypair::from_bytes(&kp)?; - let bytes = Zeroizing::new(id_kp.secret.to_bytes()); - Ok(Keypair::ed25519_from_bytes(bytes)?) - } - Err(_) => anyhow::bail!(Error::PrivateKeyInvalid), - } - } - - pub fn get_keypair_did(&self) -> anyhow::Result { - let kp = Zeroizing::new(self.get_raw_keypair()?.to_bytes()); - let kp = warp::crypto::ed25519_dalek::Keypair::from_bytes(&*kp)?; - let did = DIDKey::Ed25519(Ed25519KeyPair::from_secret_key(kp.secret.as_bytes())); - Ok(did.into()) - } - pub fn get_raw_keypair(&self) -> anyhow::Result { - self.get_keypair()? + self.root_document + .keypair() + .clone() .try_into_ed25519() .map_err(anyhow::Error::from) } @@ -2312,7 +2306,7 @@ impl IdentityStore { impl IdentityStore { #[tracing::instrument(skip(self))] pub async fn send_request(&mut self, pubkey: &DID) -> Result<(), Error> { - let local_public_key = (*self.did_key).clone(); + let local_public_key = self.did_key.clone(); if local_public_key.eq(pubkey) { return Err(Error::CannotSendSelfFriendRequest); @@ -2344,14 +2338,14 @@ impl IdentityStore { return Err(Error::FriendRequestExist); } - let payload = RequestResponsePayload::new(&self.did_key, Event::Request)?; + let payload = RequestResponsePayload::new(self.root_document.keypair(), Event::Request)?; self.broadcast_request(pubkey, &payload, true, true).await } #[tracing::instrument(skip(self))] pub async fn accept_request(&mut self, pubkey: &DID) -> Result<(), Error> { - let local_public_key = (*self.did_key).clone(); + let local_public_key = self.did_key.clone(); if local_public_key.eq(pubkey) { return Err(Error::CannotAcceptSelfAsFriend); @@ -2378,7 +2372,7 @@ impl IdentityStore { return Ok(()); } - let payload = RequestResponsePayload::new(&self.did_key, Event::Accept)?; + let payload = RequestResponsePayload::new(self.root_document.keypair(), Event::Accept)?; self.root_document.remove_request(internal_request).await?; self.add_friend(pubkey).await?; @@ -2388,7 +2382,7 @@ impl IdentityStore { #[tracing::instrument(skip(self))] pub async fn reject_request(&mut self, pubkey: &DID) -> Result<(), Error> { - let local_public_key = (*self.did_key).clone(); + let local_public_key = self.did_key.clone(); if local_public_key.eq(pubkey) { return Err(Error::CannotDenySelfAsFriend); @@ -2406,7 +2400,7 @@ impl IdentityStore { .find(|request| request.r#type() == RequestType::Incoming && request.did().eq(pubkey)) .ok_or(Error::CannotFindFriendRequest)?; - let payload = RequestResponsePayload::new(&self.did_key, Event::Reject)?; + let payload = RequestResponsePayload::new(self.root_document.keypair(), Event::Reject)?; self.root_document.remove_request(internal_request).await?; @@ -2424,7 +2418,7 @@ impl IdentityStore { .find(|request| request.r#type() == RequestType::Outgoing && request.did().eq(pubkey)) .ok_or(Error::CannotFindFriendRequest)?; - let payload = RequestResponsePayload::new(&self.did_key, Event::Retract)?; + let payload = RequestResponsePayload::new(self.root_document.keypair(), Event::Retract)?; self.root_document.remove_request(internal_request).await?; @@ -2468,7 +2462,7 @@ impl IdentityStore { #[tracing::instrument(skip(self))] pub async fn block(&mut self, pubkey: &DID) -> Result<(), Error> { - let local_public_key = (*self.did_key).clone(); + let local_public_key = self.did_key.clone(); if local_public_key.eq(pubkey) { return Err(Error::CannotBlockOwnKey); @@ -2504,14 +2498,15 @@ impl IdentityStore { // let peer_id = did_to_libp2p_pub(pubkey)?.to_peer_id(); // self.ipfs.ban_peer(peer_id).await?; - let payload = RequestResponsePayload::new(&self.did_key, Event::Block)?; + let payload = RequestResponsePayload::new(self.root_document.keypair(), Event::Block)?; self.broadcast_request(pubkey, &payload, false, true).await } #[tracing::instrument(skip(self))] pub async fn unblock(&mut self, pubkey: &DID) -> Result<(), Error> { - let local_public_key = (*self.did_key).clone(); + let peer_id = pubkey.to_peer_id()?; + let local_public_key = self.did_key.clone(); if local_public_key.eq(pubkey) { return Err(Error::CannotUnblockOwnKey); @@ -2525,10 +2520,9 @@ impl IdentityStore { _ = self.export_root_document().await; - let peer_id = did_to_libp2p_pub(pubkey)?.to_peer_id(); self.ipfs.unban_peer(peer_id).await?; - let payload = RequestResponsePayload::new(&self.did_key, Event::Unblock)?; + let payload = RequestResponsePayload::new(self.root_document.keypair(), Event::Unblock)?; self.broadcast_request(pubkey, &payload, false, true).await } @@ -2599,7 +2593,7 @@ impl IdentityStore { } if broadcast { - let payload = RequestResponsePayload::new(&self.did_key, Event::Remove)?; + let payload = RequestResponsePayload::new(self.root_document.keypair(), Event::Remove)?; self.broadcast_request(pubkey, &payload, false, true) .await?; @@ -2678,7 +2672,7 @@ impl IdentityStore { store_request: bool, queue_broadcast: bool, ) -> Result<(), Error> { - let remote_peer_id = did_to_libp2p_pub(recipient)?.to_peer_id(); + let remote_peer_id = recipient.to_peer_id()?; if !self.discovery.contains(recipient).await { self.discovery.insert(recipient).await?; @@ -2697,7 +2691,7 @@ impl IdentityStore { } } - let kp = &*self.did_key; + let kp = self.root_document.keypair(); let payload_bytes = serde_json::to_vec(&payload)?; diff --git a/extensions/warp-ipfs/src/store/keystore.rs b/extensions/warp-ipfs/src/store/keystore.rs index 3b7f684b4..99754aa44 100644 --- a/extensions/warp-ipfs/src/store/keystore.rs +++ b/extensions/warp-ipfs/src/store/keystore.rs @@ -3,6 +3,7 @@ use std::{ fmt::Debug, }; +use rust_ipfs::Keypair; use serde::{Deserialize, Serialize}; use uuid::Uuid; use warp::{ @@ -33,11 +34,11 @@ impl Keystore { pub fn insert>( &mut self, - did: &DID, + keypair: &Keypair, recipient: &DID, key: K, ) -> Result<(), Error> { - let key = super::ecdh_encrypt(did, None, key)?; + let key = super::ecdh_encrypt(keypair, None, key)?; match self.recipient_key.entry(recipient.clone()) { Entry::Occupied(mut entry) => { @@ -61,22 +62,22 @@ impl Keystore { self.recipient_key.contains_key(recipient) } - pub fn get_latest(&self, did: &DID, recipient: &DID) -> Result, Error> { + pub fn get_latest(&self, keypair: &Keypair, recipient: &DID) -> Result, Error> { self.recipient_key .get(recipient) .and_then(|list| { list.last() - .and_then(|entry| super::ecdh_decrypt(did, None, entry).ok()) + .and_then(|entry| super::ecdh_decrypt(keypair, None, entry).ok()) }) .ok_or(Error::PublicKeyDoesntExist) } - pub fn get_all(&self, did: &DID, recipient: &DID) -> Result>, Error> { + pub fn get_all(&self, keypair: &Keypair, recipient: &DID) -> Result>, Error> { self.recipient_key .get(recipient) .map(|list| { list.iter() - .filter_map(|entry| super::ecdh_decrypt(did, None, entry).ok()) + .filter_map(|entry| super::ecdh_decrypt(keypair, None, entry).ok()) .collect::>() }) .ok_or(Error::PublicKeyDoesntExist) @@ -92,8 +93,13 @@ impl Keystore { #[allow(dead_code)] impl Keystore { - pub fn try_decrypt(&self, did: &DID, recipient: &DID, data: &[u8]) -> Result, Error> { - let keys = self.get_all(did, recipient)?; + pub fn try_decrypt( + &self, + keypair: &Keypair, + recipient: &DID, + data: &[u8], + ) -> Result, Error> { + let keys = self.get_all(keypair, recipient)?; for key in keys { if let Ok(data) = Cipher::direct_decrypt(data, &key) { return Ok(data); @@ -155,7 +161,10 @@ impl Eq for KeyEntry {} #[cfg(test)] mod test { + use crate::store::PeerIdExt; + use super::Keystore; + use rust_ipfs::Keypair; use warp::crypto::{ cipher::Cipher, generate, @@ -167,10 +176,11 @@ mod test { fn keystore_test() -> anyhow::Result<()> { let mut keystore = Keystore::default(); - let keypair = DID::default(); + let keypair = Keypair::generate_ed25519(); + let kp_did = keypair.to_did()?; let recipient = DID::default(); - assert_ne!(keypair, recipient); + assert_ne!(kp_did, recipient); let key = generate::<32>(); @@ -186,10 +196,11 @@ mod test { fn keystore_get_latest() -> anyhow::Result<()> { let mut keystore = Keystore::default(); - let keypair = DID::default(); + let keypair = Keypair::generate_ed25519(); + let kp_did = keypair.to_did()?; let recipient = DID::default(); - assert_ne!(keypair, recipient); + assert_ne!(kp_did, recipient); let key_1 = generate::<32>(); let key_2 = generate::<32>(); @@ -209,7 +220,7 @@ mod test { let mut rng = rand::thread_rng(); let mut keystore = Keystore::default(); - let keypair = DID::default(); + let keypair = Keypair::generate_ed25519(); let recipients = (0..10).map(|_| DID::default()).collect::>(); for recipient in recipients.iter() { diff --git a/extensions/warp-ipfs/src/store/message.rs b/extensions/warp-ipfs/src/store/message.rs index 19712c6a6..347452dad 100644 --- a/extensions/warp-ipfs/src/store/message.rs +++ b/extensions/warp-ipfs/src/store/message.rs @@ -25,7 +25,7 @@ use futures::{ FutureExt, SinkExt, Stream, StreamExt, TryFutureExt, }; use libipld::Cid; -use rust_ipfs::{libp2p::gossipsub::Message, p2p::MultiaddrExt, Ipfs, PeerId}; +use rust_ipfs::{libp2p::gossipsub::Message, p2p::MultiaddrExt, Ipfs, Keypair, PeerId}; use serde::{Deserialize, Serialize}; use tokio::select; @@ -67,7 +67,8 @@ use crate::{ }; use super::{ - document::root::RootDocumentMap, ds_key::DataStoreKey, MAX_MESSAGE_SIZE, SHUTTLE_TIMEOUT, + document::root::RootDocumentMap, ds_key::DataStoreKey, PeerIdExt, MAX_MESSAGE_SIZE, + SHUTTLE_TIMEOUT, }; const CHAT_DIRECTORY: &str = "chat_media"; @@ -97,8 +98,6 @@ impl MessageStore { ) -> Self { info!("Initializing MessageStore"); - let keypair = identity.did_key(); - let (tx, rx) = futures::channel::mpsc::channel(1024); let token = CancellationToken::new(); @@ -111,7 +110,6 @@ impl MessageStore { let mut inner = ConversationInner { ipfs: ipfs.clone(), event_handler: Default::default(), - keypair: keypair.clone(), conversation_task: HashMap::new(), command_tx: tx, identity: identity.clone(), @@ -517,7 +515,7 @@ impl ConversationTask { let sender = payload.sender(); - let data = match ecdh_decrypt(&self.identity.did_key(), Some(&sender), payload.data()) { + let data = match ecdh_decrypt(self.identity.root_document().keypair(), Some(&sender), payload.data()) { Ok(d) => d, Err(e) => { tracing::warn!(%sender, error = %e, "failed to decrypt message"); @@ -611,7 +609,6 @@ impl ConversationTask { struct ConversationInner { ipfs: Ipfs, - keypair: Arc, event_handler: HashMap>, conversation_task: HashMap, root: RootDocumentMap, @@ -863,7 +860,9 @@ impl ConversationInner { return Err(Error::PublicKeyIsBlocked); } - if did == &*self.keypair { + let own_did = self.identity.did_key(); + + if did == &own_did { return Err(Error::CannotCreateConversation); } @@ -874,7 +873,7 @@ impl ConversationInner { .find(|conversation| { conversation.conversation_type() == ConversationType::Direct && conversation.recipients().contains(did) - && conversation.recipients().contains(&self.keypair) + && conversation.recipients().contains(&own_did) }) .map(Conversation::from) { @@ -892,8 +891,8 @@ impl ConversationInner { let settings = DirectConversationSettings::default(); let conversation = ConversationDocument::new_direct( - &self.keypair, - [(*self.keypair).clone(), did.clone()], + self.root.keypair(), + [own_did.clone(), did.clone()], settings, )?; @@ -906,14 +905,14 @@ impl ConversationInner { let peer_id = did.to_peer_id()?; let event = ConversationEvents::NewConversation { - recipient: (*self.keypair).clone(), + recipient: own_did.clone(), settings, }; - let bytes = ecdh_encrypt(&self.keypair, Some(did), serde_json::to_vec(&event)?)?; - let signature = sign_serde(&self.keypair, &bytes)?; + let bytes = ecdh_encrypt(self.root.keypair(), Some(did), serde_json::to_vec(&event)?)?; + let signature = sign_serde(self.root.keypair(), &bytes)?; - let payload = Payload::new(&self.keypair, &bytes, &signature); + let payload = Payload::new(&own_did, &bytes, &signature); let peers = self.ipfs.pubsub_peers(Some(did.messaging())).await?; @@ -954,7 +953,7 @@ impl ConversationInner { mut recipients: HashSet, settings: GroupSettings, ) -> Result { - let own_did = &*(self.keypair.clone()); + let own_did = &self.identity.did_key(); if recipients.contains(own_did) { return Err(Error::CannotCreateConversation); @@ -1001,8 +1000,13 @@ impl ConversationInner { let restricted = self.root.get_blocks().await.unwrap_or_default(); - let conversation = - ConversationDocument::new_group(own_did, name, recipients, &restricted, settings)?; + let conversation = ConversationDocument::new_group( + self.root.keypair(), + name, + recipients, + &restricted, + settings, + )?; let recipient = conversation.recipients(); @@ -1011,7 +1015,7 @@ impl ConversationInner { self.set_document(conversation).await?; let mut keystore = Keystore::new(conversation_id); - keystore.insert(own_did, own_did, warp::crypto::generate::<64>())?; + keystore.insert(self.root.keypair(), own_did, warp::crypto::generate::<64>())?; self.set_keystore(conversation_id, keystore).await?; @@ -1031,8 +1035,8 @@ impl ConversationInner { })?; for (did, peer_id) in peer_id_list { - let bytes = ecdh_encrypt(own_did, Some(&did), &event)?; - let signature = sign_serde(own_did, &bytes)?; + let bytes = ecdh_encrypt(self.root.keypair(), Some(&did), &event)?; + let signature = sign_serde(self.root.keypair(), &bytes)?; let payload = Payload::new(own_did, &bytes, &signature); @@ -1147,11 +1151,11 @@ impl ConversationInner { } pub async fn set_document(&mut self, mut document: ConversationDocument) -> Result<(), Error> { + let keypair = self.root.keypair(); if let Some(creator) = document.creator.as_ref() { - if creator.eq(&self.keypair) - && matches!(document.conversation_type(), ConversationType::Group) - { - document.sign(&self.keypair)?; + let did = keypair.to_did()?; + if creator.eq(&did) && matches!(document.conversation_type(), ConversationType::Group) { + document.sign(keypair)?; } } @@ -1233,7 +1237,9 @@ impl ConversationInner { async fn process_msg_event(&mut self, id: Uuid, msg: Message) -> Result<(), Error> { let data = Payload::from_bytes(&msg.data)?; - let own_did = &*self.keypair; + let keypair = self.root.keypair(); + + let own_did = keypair.to_did()?; let conversation = self.get(id).await?; @@ -1243,7 +1249,7 @@ impl ConversationInner { let recipients = list .iter() - .filter(|did| (*self.keypair).ne(did)) + .filter(|did| own_did.ne(did)) .collect::>(); let Some(member) = recipients.first() else { @@ -1251,12 +1257,12 @@ impl ConversationInner { return Err(Error::IdentityDoesntExist); }; - ecdh_decrypt(own_did, Some(member), data.data())? + ecdh_decrypt(keypair, Some(member), data.data())? } ConversationType::Group => { let store = self.get_keystore(id).await?; - let key = match store.get_latest(own_did, &data.sender()) { + let key = match store.get_latest(keypair, &data.sender()) { Ok(key) => key, Err(Error::PublicKeyDoesntExist) => { // If we are not able to get the latest key from the store, this is because we are still awaiting on the response from the key exchange @@ -1313,12 +1319,14 @@ impl ConversationInner { return Err(Error::PublicKeyInvalid); } - let own_did = &self.keypair; + let keypair = self.root.keypair(); + + let own_did = keypair.to_did()?; - let bytes = ecdh_encrypt(own_did, Some(did), serde_json::to_vec(&request)?)?; - let signature = sign_serde(own_did, &bytes)?; + let bytes = ecdh_encrypt(keypair, Some(did), serde_json::to_vec(&request)?)?; + let signature = sign_serde(keypair, &bytes)?; - let payload = Payload::new(own_did, &bytes, &signature); + let payload = Payload::new(&own_did, &bytes, &signature); let topic = conversation.reqres_topic(did); @@ -1365,10 +1373,12 @@ impl ConversationInner { ) -> Result { let conversation = self.get(conversation_id).await?; - let keystore = pubkey_or_keystore(self, conversation_id, &self.keypair).await?; + let keypair = self.root.keypair(); + + let keystore = pubkey_or_keystore(self, conversation_id, keypair).await?; conversation - .get_message(&self.ipfs, &self.keypair, message_id, keystore.as_ref()) + .get_message(&self.ipfs, keypair, message_id, keystore.as_ref()) .await } @@ -1402,30 +1412,27 @@ impl ConversationInner { ) -> Result { let conversation = self.get(conversation_id).await?; - // let keystore = match conversation.conversation_type { - // ConversationType::Direct => None, - // ConversationType::Group { .. } => self.get_keystore(conversation_id).await.ok(), - // }; + let keypair = self.root.keypair(); - let keystore = pubkey_or_keystore(self, conversation_id, &self.keypair).await?; + let keystore = pubkey_or_keystore(self, conversation_id, keypair).await?; let m_type = opt.messages_type(); match m_type { MessagesType::Stream => { let stream = conversation - .get_messages_stream(&self.ipfs, self.keypair.clone(), opt, keystore) + .get_messages_stream(&self.ipfs, keypair, opt, keystore) .await?; Ok(Messages::Stream(stream)) } MessagesType::List => { let list = conversation - .get_messages(&self.ipfs, self.keypair.clone(), opt, keystore) + .get_messages(&self.ipfs, keypair, opt, keystore) .await?; Ok(Messages::List(list)) } MessagesType::Pages { .. } => { conversation - .get_messages_pages(&self.ipfs, &self.keypair, opt, keystore.as_ref()) + .get_messages_pages(&self.ipfs, keypair, opt, keystore.as_ref()) .await } } @@ -1455,10 +1462,12 @@ impl ConversationInner { return Err(Error::MessageNotFound); } + let own_did = self.identity.did_key(); + let list = conversation .recipients() .iter() - .filter(|did| (*self.keypair).ne(did)) + .filter(|did| own_did.ne(did)) .cloned() .collect::>(); @@ -1514,7 +1523,8 @@ impl ConversationInner { }); } - let own_did = &*self.keypair; + let keypair = self.root.keypair(); + let own_did = self.identity.did_key(); let mut message = warp::raygun::Message::default(); message.set_conversation_id(conversation.id()); @@ -1522,10 +1532,9 @@ impl ConversationInner { message.set_lines(messages.clone()); let message_id = message.id(); - let keystore = pubkey_or_keystore(self, conversation.id(), &self.keypair).await?; + let keystore = pubkey_or_keystore(self, conversation.id(), keypair).await?; - let message = - MessageDocument::new(&self.ipfs, &self.keypair, message, keystore.as_ref()).await?; + let message = MessageDocument::new(&self.ipfs, keypair, message, keystore.as_ref()).await?; let message_cid = conversation .insert_message_document(&self.ipfs, message) @@ -1603,19 +1612,21 @@ impl ConversationInner { }); } - let keystore = pubkey_or_keystore(&*self, conversation.id(), &self.keypair).await?; + let keypair = self.root.keypair(); + + let keystore = pubkey_or_keystore(self, conversation.id(), keypair).await?; let mut message_document = conversation .get_message_document(&self.ipfs, message_id) .await?; let mut message = message_document - .resolve(&self.ipfs, &self.keypair, true, keystore.as_ref()) + .resolve(&self.ipfs, keypair, true, keystore.as_ref()) .await?; let sender = message.sender(); - let own_did = &*self.keypair; + let own_did = &self.identity.did_key(); if sender.ne(own_did) { return Err(Error::InvalidMessage); @@ -1625,14 +1636,7 @@ impl ConversationInner { message.set_modified(Utc::now()); message_document - .update( - &self.ipfs, - &self.keypair, - message, - None, - keystore.as_ref(), - None, - ) + .update(&self.ipfs, keypair, message, None, keystore.as_ref(), None) .await?; let nonce = message_document.nonce_from_message(&self.ipfs).await?; @@ -1715,7 +1719,9 @@ impl ConversationInner { }); } - let own_did = &*self.keypair; + let keypair = self.root.keypair(); + + let own_did = self.identity.did_key(); let mut message = warp::raygun::Message::default(); message.set_conversation_id(conversation.id()); @@ -1723,10 +1729,9 @@ impl ConversationInner { message.set_lines(messages); message.set_replied(Some(message_id)); - let keystore = pubkey_or_keystore(self, conversation.id(), &self.keypair).await?; + let keystore = pubkey_or_keystore(self, conversation.id(), keypair).await?; - let message = - MessageDocument::new(&self.ipfs, &self.keypair, message, keystore.as_ref()).await?; + let message = MessageDocument::new(&self.ipfs, keypair, message, keystore.as_ref()).await?; let message_id = message.id; @@ -1825,14 +1830,17 @@ impl ConversationInner { let mut conversation = self.get(conversation_id).await?; let tx = self.subscribe(conversation_id).await?; - let keystore = pubkey_or_keystore(self, conversation.id(), &self.keypair).await?; + let keypair = self.root.keypair(); + let own_did = self.identity.did_key(); + + let keystore = pubkey_or_keystore(self, conversation.id(), keypair).await?; let mut message_document = conversation .get_message_document(&self.ipfs, message_id) .await?; let mut message = message_document - .resolve(&self.ipfs, &self.keypair, true, keystore.as_ref()) + .resolve(&self.ipfs, keypair, true, keystore.as_ref()) .await?; let event = match state { @@ -1859,14 +1867,7 @@ impl ConversationInner { }; message_document - .update( - &self.ipfs, - &self.keypair, - message, - None, - keystore.as_ref(), - None, - ) + .update(&self.ipfs, keypair, message, None, keystore.as_ref(), None) .await?; let message_cid = conversation @@ -1899,7 +1900,7 @@ impl ConversationInner { let event = MessagingEvents::Pin { conversation_id, - member: (*self.keypair).clone(), + member: own_did, message_id, state, }; @@ -1917,14 +1918,18 @@ impl ConversationInner { let mut conversation = self.get(conversation_id).await?; let tx = self.subscribe(conversation_id).await?; - let keystore = pubkey_or_keystore(self, conversation.id(), &self.keypair).await?; + let keypair = self.root.keypair(); + + let own_did = self.identity.did_key(); + + let keystore = pubkey_or_keystore(self, conversation.id(), keypair).await?; let mut message_document = conversation .get_message_document(&self.ipfs, message_id) .await?; let mut message = message_document - .resolve(&self.ipfs, &self.keypair, true, keystore.as_ref()) + .resolve(&self.ipfs, keypair, true, keystore.as_ref()) .await?; let recipients = conversation.recipients(); @@ -1937,21 +1942,14 @@ impl ConversationInner { ReactionState::Add => { let entry = reactions.entry(emoji.clone()).or_default(); - if entry.contains(&self.keypair) { + if entry.contains(&own_did) { return Err(Error::ReactionExist); } - entry.push((*self.keypair).clone()); + entry.push(own_did.clone()); message_document - .update( - &self.ipfs, - &self.keypair, - message, - None, - keystore.as_ref(), - None, - ) + .update(&self.ipfs, keypair, message, None, keystore.as_ref(), None) .await?; message_cid = conversation @@ -1962,7 +1960,7 @@ impl ConversationInner { _ = tx.send(MessageEventKind::MessageReactionAdded { conversation_id, message_id, - did_key: (*self.keypair).clone(), + did_key: own_did.clone(), reaction: emoji.clone(), }); } @@ -1971,11 +1969,11 @@ impl ConversationInner { BTreeEntry::Occupied(mut e) => { let list = e.get_mut(); - if !list.contains(&self.keypair) { + if !list.contains(&own_did) { return Err(Error::ReactionDoesntExist); } - list.retain(|did| did != &(*self.keypair).clone()); + list.retain(|did| did != &own_did); if list.is_empty() { e.remove(); } @@ -1984,14 +1982,7 @@ impl ConversationInner { }; message_document - .update( - &self.ipfs, - &self.keypair, - message, - None, - keystore.as_ref(), - None, - ) + .update(&self.ipfs, keypair, message, None, keystore.as_ref(), None) .await?; message_cid = conversation @@ -2003,7 +1994,7 @@ impl ConversationInner { _ = tx.send(MessageEventKind::MessageReactionRemoved { conversation_id, message_id, - did_key: (*self.keypair).clone(), + did_key: own_did.clone(), reaction: emoji.clone(), }); } @@ -2011,7 +2002,7 @@ impl ConversationInner { let event = MessagingEvents::React { conversation_id, - reactor: (*self.keypair).clone(), + reactor: own_did, message_id, state, emoji, @@ -2078,6 +2069,8 @@ impl ConversationInner { } let conversation = self.get(conversation_id).await?; + let keypair = self.root.keypair(); + let mut constellation = self.file.clone(); let files = locations @@ -2114,9 +2107,11 @@ impl ConversationInner { assert_eq!(media_dir.name(), conversation_id.to_string()); let mut atx = self.attachment_tx.clone(); - let keystore = pubkey_or_keystore(self, conversation_id, &self.keypair).await?; + let keystore = pubkey_or_keystore(self, conversation_id, keypair).await?; let ipfs = self.ipfs.clone(); - let keypair = self.keypair.clone(); + let own_did = self.identity.did_key(); + + let keypair = keypair.clone(); let message_id = Uuid::new_v4(); @@ -2243,12 +2238,11 @@ impl ConversationInner { return Err(Error::NoAttachments); } - let own_did = &*keypair; let mut message = warp::raygun::Message::default(); message.set_id(message_id); message.set_message_type(MessageType::Attachment); message.set_conversation_id(conversation.id()); - message.set_sender(own_did.clone()); + message.set_sender(own_did); message.set_attachment(attachments); message.set_lines(messages.clone()); message.set_replied(reply_id); @@ -2407,7 +2401,7 @@ impl ConversationInner { return Err(Error::InvalidConversation); }; - let own_did = &*self.keypair; + let own_did = &self.identity.did_key(); if creator.ne(own_did) { return Err(Error::PublicKeyInvalid); @@ -2455,7 +2449,7 @@ impl ConversationInner { return Err(Error::InvalidConversation); }; - let own_did = &*self.keypair; + let own_did = &self.identity.did_key(); if creator.ne(own_did) { return Err(Error::PublicKeyInvalid); @@ -2519,7 +2513,7 @@ impl ConversationInner { return Err(Error::InvalidConversation); }; - let own_did = &*self.keypair; + let own_did = &self.identity.did_key(); if !settings.members_can_change_name() && creator.ne(own_did) { return Err(Error::PublicKeyInvalid); @@ -2563,7 +2557,7 @@ impl ConversationInner { return Err(Error::InvalidConversation); }; - let own_did = &*self.keypair; + let own_did = &self.identity.did_key(); if !settings.members_can_add_participants() && creator.ne(own_did) { return Err(Error::PublicKeyInvalid); @@ -2630,7 +2624,7 @@ impl ConversationInner { return Err(Error::InvalidConversation); }; - let own_did = &*self.keypair; + let own_did = &self.identity.did_key(); if creator.ne(own_did) { return Err(Error::PublicKeyInvalid); @@ -2683,13 +2677,14 @@ impl ConversationInner { let document_type = self.delete(conversation_id).await?; + let own_did = &self.identity.did_key(); + if broadcast { let recipients = document_type.recipients(); let mut can_broadcast = true; if matches!(document_type.conversation_type(), ConversationType::Group) { - let own_did = &*self.keypair; let creator = document_type .creator .as_ref() @@ -2712,9 +2707,6 @@ impl ConversationInner { } } - let keypair = self.keypair.clone(); - let own_did = &*keypair; - if can_broadcast { let peer_id_list = recipients .clone() @@ -2730,8 +2722,9 @@ impl ConversationInner { let main_timer = Instant::now(); for (recipient, peer_id) in peer_id_list { - let bytes = ecdh_encrypt(own_did, Some(&recipient), &event)?; - let signature = sign_serde(own_did, &bytes)?; + let keypair = self.root.keypair(); + let bytes = ecdh_encrypt(keypair, Some(&recipient), &event)?; + let signature = sign_serde(keypair, &bytes)?; let payload = Payload::new(own_did, &bytes, &signature); @@ -2794,10 +2787,10 @@ impl ConversationInner { list: &[DID], conversation_id: Uuid, ) -> Result<(), Error> { - let own_did = &*self.keypair; + let own_did = self.identity.did_key(); let context = format!("exclude {}", own_did); - let signature = sign_serde(own_did, &context)?; + let signature = sign_serde(self.root.keypair(), &context)?; let signature = bs58::encode(signature).into_string(); let event = ConversationEvents::LeaveConversation { @@ -2827,11 +2820,11 @@ impl ConversationInner { conversation_id: Uuid, event: MessageEvent, ) -> Result<(), Error> { - let own_did = &*self.keypair; + let member = self.identity.did_key(); let event = MessagingEvents::Event { conversation_id, - member: own_did.clone(), + member, event, cancelled: false, }; @@ -2843,11 +2836,11 @@ impl ConversationInner { conversation_id: Uuid, event: MessageEvent, ) -> Result<(), Error> { - let own_did = &*self.keypair; + let member = self.identity.did_key(); let event = MessagingEvents::Event { conversation_id, - member: own_did.clone(), + member, event, cancelled: true, }; @@ -2861,7 +2854,7 @@ impl ConversationInner { ) -> Result<(), Error> { let conversation = self.get(conversation_id).await?; - let own_did = &*self.keypair; + let own_did = self.identity.did_key(); let event = serde_json::to_vec(&event)?; @@ -2869,8 +2862,8 @@ impl ConversationInner { let bytes = Cipher::direct_encrypt(&event, &key)?; - let signature = sign_serde(own_did, &bytes)?; - let payload = Payload::new(own_did, &bytes, &signature); + let signature = sign_serde(self.root.keypair(), &bytes)?; + let payload = Payload::new(&own_did, &bytes, &signature); let peers = self .ipfs @@ -2895,12 +2888,12 @@ impl ConversationInner { settings: ConversationSettings, ) -> Result<(), Error> { let mut conversation = self.get(conversation_id).await?; - let own_did = &*self.keypair; + let own_did = self.identity.did_key(); let Some(creator) = &conversation.creator else { return Err(Error::InvalidConversation); }; - if creator != own_did { + if creator != &own_did { return Err(Error::PublicKeyInvalid); } @@ -2934,15 +2927,16 @@ impl ConversationInner { let conversation = self.get(conversation_id).await?; let event = serde_json::to_vec(&event)?; - let keypair = self.keypair.clone(); + let keypair = self.root.keypair(); + let own_did = self.identity.did_key(); let key = self.conversation_key(conversation_id, None).await?; let bytes = Cipher::direct_encrypt(&event, &key)?; - let signature = sign_serde(&keypair, &bytes)?; + let signature = sign_serde(keypair, &bytes)?; - let payload = Payload::new(&keypair, &bytes, &signature); + let payload = Payload::new(&own_did, &bytes, &signature); let peers = self.ipfs.pubsub_peers(Some(conversation.topic())).await?; @@ -2951,7 +2945,7 @@ impl ConversationInner { for recipient in conversation .recipients() .iter() - .filter(|did| (*keypair).ne(did)) + .filter(|did| own_did.ne(did)) { let peer_id = recipient.to_peer_id()?; @@ -3004,10 +2998,13 @@ impl ConversationInner { ) -> Result<(), Error> { let event = serde_json::to_vec(&event)?; - let bytes = ecdh_encrypt(&self.keypair, Some(did_key), &event)?; - let signature = sign_serde(&self.keypair, &bytes)?; + let keypair = self.root.keypair(); + let own_did = self.identity.did_key(); - let payload = Payload::new(&self.keypair, &bytes, &signature); + let bytes = ecdh_encrypt(keypair, Some(did_key), &event)?; + let signature = sign_serde(keypair, &bytes)?; + + let payload = Payload::new(&own_did, &bytes, &signature); let peer_id = did_key.to_peer_id()?; let peers = self.ipfs.pubsub_peers(Some(did_key.messaging())).await?; @@ -3050,7 +3047,7 @@ impl ConversationInner { let main_topic = conversation.topic(); let event_topic = conversation.event_topic(); - let request_topic = conversation.reqres_topic(&self.keypair); + let request_topic = conversation.reqres_topic(&self.identity.did_key()); let messaging_stream = self .ipfs @@ -3121,22 +3118,25 @@ impl ConversationInner { member: Option<&DID>, ) -> Result, Error> { let conversation = self.get(conversation_id).await?; + let keypair = self.root.keypair(); + let own_did = self.identity.did_key(); + match conversation.conversation_type() { ConversationType::Direct => { let list = conversation.recipients(); let recipients = list .iter() - .filter(|did| (*self.keypair).ne(did)) + .filter(|did| own_did.ne(did)) .collect::>(); let member = recipients.first().ok_or(Error::InvalidConversation)?; - ecdh_shared_key(&self.keypair, Some(member)) + ecdh_shared_key(keypair, Some(member)) } ConversationType::Group => { - let recipient = member.unwrap_or(&*self.keypair); + let recipient = member.unwrap_or(&own_did); let keystore = self.get_keystore(conversation.id()).await?; - keystore.get_latest(&self.keypair, recipient) + keystore.get_latest(keypair, recipient) } } } @@ -3158,10 +3158,11 @@ async fn process_conversation( recipient, settings, } => { - let did = &*this.keypair; + let keypair = this.root.keypair(); + let did = this.identity.did_key(); tracing::info!("New conversation event received from {recipient}"); let conversation_id = - generate_shared_topic(did, &recipient, Some("direct-conversation"))?; + generate_shared_topic(keypair, &recipient, Some("direct-conversation"))?; if this.contains(conversation_id).await { tracing::warn!(%conversation_id, "Conversation exist"); @@ -3179,7 +3180,7 @@ async fn process_conversation( let list = [did.clone(), recipient]; tracing::info!(%conversation_id, "Creating conversation"); - let convo = ConversationDocument::new_direct(did, list, settings)?; + let convo = ConversationDocument::new_direct(keypair, list, settings)?; let conversation_type = convo.conversation_type(); this.set_document(convo).await?; @@ -3193,6 +3194,9 @@ async fn process_conversation( .await; } ConversationEvents::NewGroupConversation { mut conversation } => { + let keypair = this.root.keypair(); + let did = this.identity.did_key(); + let conversation_id = conversation.id; tracing::info!(%conversation_id, "New group conversation event received"); @@ -3201,7 +3205,7 @@ async fn process_conversation( return Ok(()); } - if !conversation.recipients.contains(&this.keypair) { + if !conversation.recipients.contains(&did) { warn!(%conversation_id, "was added to conversation but never was apart of the conversation."); return Ok(()); } @@ -3217,7 +3221,7 @@ async fn process_conversation( let conversation_type = conversation.conversation_type(); let mut keystore = Keystore::new(conversation_id); - keystore.insert(&this.keypair, &this.keypair, warp::crypto::generate::<64>())?; + keystore.insert(keypair, &did, warp::crypto::generate::<64>())?; conversation.verify()?; @@ -3233,9 +3237,8 @@ async fn process_conversation( let conversation = this.get(conversation_id).await?; tracing::info!(%conversation_id, "{} conversation created", conversation_type); - let keypair = this.keypair.clone(); - for recipient in conversation.recipients.iter().filter(|d| (*keypair).ne(d)) { + for recipient in conversation.recipients.iter().filter(|d| did.ne(d)) { if let Err(e) = this.request_key(conversation_id, recipient).await { tracing::warn!(%conversation_id, error = %e, %recipient, "Failed to send exchange request"); } @@ -3260,7 +3263,7 @@ async fn process_conversation( return Err(anyhow::anyhow!("Group conversation requires a creator").into()); }; - let own_did = &*this.keypair; + let own_did = this.identity.did_key(); // Precaution if recipient.eq(creator) { @@ -3275,7 +3278,7 @@ async fn process_conversation( tracing::info!("{recipient} is leaving group conversation {conversation_id}"); - if creator.eq(own_did) { + if creator.eq(&own_did) { this.remove_recipient(conversation_id, &recipient, false) .await?; } else { @@ -3354,7 +3357,10 @@ async fn message_event( let mut document = this.get(conversation_id).await?; let tx = this.subscribe(conversation_id).await?; - let keystore = pubkey_or_keystore(this, conversation_id, &this.keypair).await?; + let keypair = this.root.keypair(); + let own_did = this.identity.did_key(); + + let keystore = pubkey_or_keystore(this, conversation_id, keypair).await?; match events { MessagingEvents::New { message } => { @@ -3377,7 +3383,7 @@ async fn message_event( } let resolved_message = message - .resolve(&this.ipfs, &this.keypair, false, keystore.as_ref()) + .resolve(&this.ipfs, keypair, false, keystore.as_ref()) .await?; let lines_value_length: usize = resolved_message @@ -3429,7 +3435,7 @@ async fn message_event( .await?; let mut message = message_document - .resolve(&this.ipfs, &this.keypair, true, keystore.as_ref()) + .resolve(&this.ipfs, keypair, true, keystore.as_ref()) .await?; let lines_value_length: usize = lines @@ -3461,9 +3467,9 @@ async fn message_event( message_document .update( &this.ipfs, - &this.keypair, + keypair, message, - (!signature.is_empty() && sender.ne(&this.keypair)).then_some(signature), + (!signature.is_empty() && sender.ne(&own_did)).then_some(signature), keystore.as_ref(), Some(nonce.as_slice()), ) @@ -3522,7 +3528,7 @@ async fn message_event( .await?; let mut message = message_document - .resolve(&this.ipfs, &this.keypair, true, keystore.as_ref()) + .resolve(&this.ipfs, keypair, true, keystore.as_ref()) .await?; let event = match state { @@ -3549,14 +3555,7 @@ async fn message_event( }; message_document - .update( - &this.ipfs, - &this.keypair, - message, - None, - keystore.as_ref(), - None, - ) + .update(&this.ipfs, keypair, message, None, keystore.as_ref(), None) .await?; document @@ -3581,7 +3580,7 @@ async fn message_event( .await?; let mut message = message_document - .resolve(&this.ipfs, &this.keypair, true, keystore.as_ref()) + .resolve(&this.ipfs, keypair, true, keystore.as_ref()) .await?; let reactions = message.reactions_mut(); @@ -3597,14 +3596,7 @@ async fn message_event( entry.push(reactor.clone()); message_document - .update( - &this.ipfs, - &this.keypair, - message, - None, - keystore.as_ref(), - None, - ) + .update(&this.ipfs, keypair, message, None, keystore.as_ref(), None) .await?; document @@ -3640,14 +3632,7 @@ async fn message_event( }; message_document - .update( - &this.ipfs, - &this.keypair, - message, - None, - keystore.as_ref(), - None, - ) + .update(&this.ipfs, keypair, message, None, keystore.as_ref(), None) .await?; document @@ -3797,6 +3782,8 @@ async fn process_identity_events( //TODO: Tie this into a configuration let with_friends = false; + let own_did = this.identity.did_key(); + match event { MultiPassEventKind::FriendAdded { did } => { if !with_friends { @@ -3822,7 +3809,7 @@ async fn process_identity_events( } } ConversationType::Group => { - if conversation.creator != Some((*this.keypair).clone()) { + if conversation.creator != Some(own_did.clone()) { continue; } @@ -3839,7 +3826,6 @@ async fn process_identity_events( } } MultiPassEventKind::Unblocked { did } => { - let own_did = (*this.keypair).clone(); let list = this.list().await; for conversation in list @@ -3874,7 +3860,7 @@ async fn process_identity_events( } } ConversationType::Group => { - if conversation.creator != Some((*this.keypair).clone()) { + if conversation.creator != Some(own_did.clone()) { continue; } @@ -3899,13 +3885,16 @@ async fn process_request_response_event( conversation_id: Uuid, req: Message, ) -> Result<(), Error> { + let keypair = &this.root.keypair().clone(); + let own_did = this.identity.did_key(); + let conversation = this.get(conversation_id).await?; let payload = Payload::from_bytes(&req.data)?; let sender = payload.sender(); - let data = ecdh_decrypt(&this.keypair, Some(&sender), payload.data())?; + let data = ecdh_decrypt(keypair, Some(&sender), payload.data())?; let event = serde_json::from_slice::(&data)?; @@ -3928,11 +3917,11 @@ async fn process_request_response_event( let mut keystore = this.get_keystore(conversation_id).await?; - let raw_key = match keystore.get_latest(&this.keypair, &this.keypair) { + let raw_key = match keystore.get_latest(keypair, &own_did) { Ok(key) => key, Err(Error::PublicKeyDoesntExist) => { let key = generate::<64>().into(); - keystore.insert(&this.keypair, &this.keypair, &key)?; + keystore.insert(keypair, &own_did, &key)?; this.set_keystore(conversation_id, keystore).await?; key @@ -3943,7 +3932,7 @@ async fn process_request_response_event( } }; - let key = ecdh_encrypt(&this.keypair, Some(&sender), raw_key)?; + let key = ecdh_encrypt(keypair, Some(&sender), raw_key)?; let response = ConversationRequestResponse::Response { conversation_id, @@ -3952,11 +3941,10 @@ async fn process_request_response_event( let topic = conversation.reqres_topic(&sender); - let bytes = - ecdh_encrypt(&this.keypair, Some(&sender), serde_json::to_vec(&response)?)?; - let signature = sign_serde(&this.keypair, &bytes)?; + let bytes = ecdh_encrypt(keypair, Some(&sender), serde_json::to_vec(&response)?)?; + let signature = sign_serde(keypair, &bytes)?; - let payload = Payload::new(&this.keypair, &bytes, &signature); + let payload = Payload::new(&own_did, &bytes, &signature); let peers = this.ipfs.pubsub_peers(Some(topic.clone())).await?; @@ -4010,9 +3998,9 @@ async fn process_request_response_event( } let mut keystore = this.get_keystore(conversation_id).await?; - let raw_key = ecdh_decrypt(&this.keypair, Some(&sender), key)?; + let raw_key = ecdh_decrypt(keypair, Some(&sender), key)?; - keystore.insert(&this.keypair, &sender, raw_key)?; + keystore.insert(keypair, &sender, raw_key)?; this.set_keystore(conversation_id, keystore).await?; @@ -4058,9 +4046,11 @@ async fn process_pending_payload(this: &mut ConversationInner) { continue; }; + let keypair = &this.root.keypair().clone(); + for (sender, data) in list { let fut = async { - let key = store.get_latest(&this.keypair, &sender)?; + let key = store.get_latest(keypair, &sender)?; let data = Cipher::direct_decrypt(&data, &key)?; let event = serde_json::from_slice(&data)?; message_event(this, conversation_id, event).await @@ -4154,6 +4144,8 @@ impl Queue { //TODO: Replace async fn process_queue(this: &mut ConversationInner) { let mut changed = false; + let keypair = &this.root.keypair().clone(); + let own_did = this.identity.did_key(); for (did, items) in this.queue.iter_mut() { let Ok(peer_id) = did.to_peer_id() else { @@ -4187,11 +4179,11 @@ async fn process_queue(this: &mut ConversationInner) { continue; } - let Ok(signature) = sign_serde(&this.keypair, &data) else { + let Ok(signature) = sign_serde(keypair, &data) else { continue; }; - let payload = Payload::new(&this.keypair, data, &signature); + let payload = Payload::new(&own_did, data, &signature); let Ok(bytes) = payload.to_bytes() else { continue; @@ -4221,16 +4213,18 @@ async fn process_queue(this: &mut ConversationInner) { async fn pubkey_or_keystore( conversation: &ConversationInner, conversation_id: Uuid, - keypair: &DID, + keypair: &Keypair, ) -> Result, Error> { let document = conversation.get(conversation_id).await?; let keystore = match document.conversation_type() { ConversationType::Direct => { let list = document.recipients(); + let own_did = keypair.to_did()?; + let recipients = list .into_iter() - .filter(|did| keypair.ne(did)) + .filter(|did| own_did.ne(did)) .collect::>(); let member = recipients diff --git a/extensions/warp-ipfs/src/store/mod.rs b/extensions/warp-ipfs/src/store/mod.rs index 9979d98c9..58b6a20f5 100644 --- a/extensions/warp-ipfs/src/store/mod.rs +++ b/extensions/warp-ipfs/src/store/mod.rs @@ -17,21 +17,20 @@ use serde::{Deserialize, Serialize}; use std::time::Duration; use uuid::Uuid; -use ipfs::{Keypair, PeerId, PublicKey}; +use ipfs::{libp2p::identity::KeyType, Keypair, PeerId, PublicKey}; use warp::{ crypto::{ cipher::Cipher, - did_key::{CoreSign, Generate, ECDH}, + did_key::{Generate, ECDH}, hash::sha256_hash, zeroize::Zeroizing, - DIDKey, Ed25519KeyPair, KeyMaterial, DID, + Ed25519KeyPair, KeyMaterial, DID, }, error::Error, multipass::identity::IdentityStatus, raygun::{ ConversationSettings, DirectConversationSettings, MessageEvent, PinState, ReactionState, }, - tesseract::Tesseract, }; pub const MAX_THUMBNAIL_SIZE: usize = 5_242_880; @@ -209,18 +208,43 @@ impl PeerIdExt for PeerId { anyhow::bail!("PeerId does not contain inline public key"); } let public_key = PublicKey::try_decode_protobuf(multihash.digest())?; - libp2p_pub_to_did(&public_key) + sealed::libp2p_pub_to_did(&public_key) + } +} + +impl PeerIdExt for PublicKey { + fn to_public_key(&self) -> Result { + Ok(self.clone()) + } + + fn to_did(&self) -> Result { + sealed::libp2p_pub_to_did(self) + } +} + +impl PeerIdExt for Keypair { + fn to_public_key(&self) -> Result { + Ok(self.public()) + } + + fn to_did(&self) -> Result { + sealed::libp2p_pub_to_did(&self.public()) } } pub trait DidExt { + fn to_public_key(&self) -> Result; fn to_peer_id(&self) -> Result; fn to_keypair(&self) -> Result; } impl DidExt for DID { + fn to_public_key(&self) -> Result { + sealed::did_to_libp2p_pub(self) + } + fn to_peer_id(&self) -> Result { - did_to_libp2p_pub(self).map(|p| p.to_peer_id()) + self.to_public_key().map(|p| p.to_peer_id()) } fn to_keypair(&self) -> Result { @@ -392,21 +416,28 @@ pub enum ConversationUpdateKind { } // Note that this are temporary -fn sign_serde(did: &DID, data: &D) -> anyhow::Result> { +fn sign_serde(keypair: &Keypair, data: &D) -> anyhow::Result> { let bytes = serde_json::to_vec(data)?; - Ok(did.as_ref().sign(&bytes)) + Ok(keypair.sign(&bytes).expect("not RSA")) } // Note that this are temporary fn verify_serde_sig(pk: DID, data: &D, signature: &[u8]) -> anyhow::Result<()> { let bytes = serde_json::to_vec(data)?; - pk.as_ref() - .verify(&bytes, signature) - .map_err(|e| anyhow::anyhow!("{:?}", e))?; + let pk = pk.to_public_key()?; + if !pk.verify(&bytes, signature) { + return Err(Error::InvalidSignature.into()); + } + Ok(()) } -pub fn generate_shared_topic(did_a: &DID, did_b: &DID, seed: Option<&str>) -> anyhow::Result { +pub fn generate_shared_topic( + keypair: &Keypair, + did_b: &DID, + seed: Option<&str>, +) -> anyhow::Result { + let did_a = sealed::get_keypair_did(keypair)?; let x25519_a = Ed25519KeyPair::from_secret_key(&did_a.private_key_bytes()).get_x25519(); let x25519_b = Ed25519KeyPair::from_public_key(&did_b.public_key_bytes()).get_x25519(); let shared_key = x25519_a.key_exchange(&x25519_b); @@ -414,39 +445,45 @@ pub fn generate_shared_topic(did_a: &DID, did_b: &DID, seed: Option<&str>) -> an Uuid::from_slice(&topic_hash[..topic_hash.len() / 2]).map_err(anyhow::Error::from) } -pub fn get_keypair_did(keypair: &ipfs::Keypair) -> anyhow::Result { - let kp = Zeroizing::new(keypair.clone().try_into_ed25519()?.to_bytes()); - let kp = warp::crypto::ed25519_dalek::Keypair::from_bytes(&*kp)?; - let did = DIDKey::Ed25519(Ed25519KeyPair::from_secret_key(kp.secret.as_bytes())); - Ok(did.into()) -} +mod sealed { + use rust_ipfs::{Keypair, PublicKey}; + use warp::crypto::KeyMaterial; + use warp::crypto::{zeroize::Zeroizing, DIDKey, Ed25519KeyPair, DID}; + use warp::{crypto::did_key::Generate, error::Error}; + pub fn get_keypair_did(keypair: &Keypair) -> anyhow::Result { + let kp = Zeroizing::new(keypair.clone().try_into_ed25519()?.to_bytes()); + let kp = warp::crypto::ed25519_dalek::Keypair::from_bytes(&*kp)?; + let did = DIDKey::Ed25519(Ed25519KeyPair::from_secret_key(kp.secret.as_bytes())); + Ok(did.into()) + } -fn did_to_libp2p_pub(public_key: &DID) -> anyhow::Result { - let pub_key = - ipfs::libp2p::identity::ed25519::PublicKey::try_from_bytes(&public_key.public_key_bytes())?; - Ok(ipfs::libp2p::identity::PublicKey::from(pub_key)) -} + pub fn did_to_libp2p_pub(public_key: &DID) -> anyhow::Result { + let pub_key = rust_ipfs::libp2p::identity::ed25519::PublicKey::try_from_bytes( + &public_key.public_key_bytes(), + )?; + Ok(PublicKey::from(pub_key)) + } -fn libp2p_pub_to_did(public_key: &ipfs::libp2p::identity::PublicKey) -> anyhow::Result { - let pk = match public_key.clone().try_into_ed25519() { - Ok(pk) => { - let did: DIDKey = Ed25519KeyPair::from_public_key(&pk.to_bytes()).into(); - did.into() - } - _ => anyhow::bail!(Error::PublicKeyInvalid), - }; - Ok(pk) + pub fn libp2p_pub_to_did(public_key: &PublicKey) -> anyhow::Result { + let pk = match public_key.clone().try_into_ed25519() { + Ok(pk) => { + let did: DIDKey = Ed25519KeyPair::from_public_key(&pk.to_bytes()).into(); + did.into() + } + _ => anyhow::bail!(Error::PublicKeyInvalid), + }; + Ok(pk) + } } -fn did_keypair(tesseract: &Tesseract) -> anyhow::Result { - let kp = tesseract.retrieve("keypair")?; - let kp = bs58::decode(kp).into_vec()?; - let id_kp = warp::crypto::ed25519_dalek::Keypair::from_bytes(&kp)?; - let did = DIDKey::Ed25519(Ed25519KeyPair::from_secret_key(id_kp.secret.as_bytes())); - Ok(did.into()) -} +pub(crate) fn ecdh_shared_key( + keypair: &Keypair, + recipient: Option<&DID>, +) -> Result, Error> { + assert!(keypair.key_type() != KeyType::RSA); + + let did = sealed::get_keypair_did(keypair)?; -pub(crate) fn ecdh_shared_key(did: &DID, recipient: Option<&DID>) -> Result, Error> { let prikey = Ed25519KeyPair::from_secret_key(&did.private_key_bytes()).get_x25519(); let did_pubkey = match recipient { Some(did) => did.public_key_bytes(), @@ -460,7 +497,7 @@ pub(crate) fn ecdh_shared_key(did: &DID, recipient: Option<&DID>) -> Result>( - did: &DID, + did: &Keypair, recipient: Option<&DID>, data: K, ) -> Result, Error> { @@ -471,11 +508,14 @@ pub(crate) fn ecdh_encrypt>( } pub(crate) fn ecdh_encrypt_with_nonce>( - did: &DID, + keypair: &Keypair, recipient: Option<&DID>, data: K, nonce: &[u8], ) -> Result, Error> { + assert!(keypair.key_type() != KeyType::RSA); + + let did = sealed::get_keypair_did(keypair)?; let prikey = Ed25519KeyPair::from_secret_key(&did.private_key_bytes()).get_x25519(); let did_pubkey = match recipient { Some(did) => did.public_key_bytes(), @@ -490,11 +530,11 @@ pub(crate) fn ecdh_encrypt_with_nonce>( } pub(crate) fn ecdh_decrypt>( - did: &DID, + keypair: &Keypair, recipient: Option<&DID>, data: K, ) -> Result, Error> { - let prik = Zeroizing::new(ecdh_shared_key(did, recipient)?); + let prik = Zeroizing::new(ecdh_shared_key(keypair, recipient)?); let data = Cipher::direct_decrypt(data.as_ref(), &prik)?; Ok(data) @@ -574,7 +614,7 @@ mod test { use rust_ipfs::Keypair; use warp::crypto::DID; - use crate::store::did_to_libp2p_pub; + use crate::store::DidExt; use super::PeerIdExt; @@ -584,7 +624,7 @@ mod test { assert!(peer_id.to_did().is_ok()); let random_did = DID::default(); - let public_key = did_to_libp2p_pub(&random_did)?; + let public_key = random_did.to_public_key()?; let peer_id = public_key.to_peer_id(); diff --git a/extensions/warp-ipfs/src/store/queue.rs b/extensions/warp-ipfs/src/store/queue.rs index 8954e8885..e46f4b7e4 100644 --- a/extensions/warp-ipfs/src/store/queue.rs +++ b/extensions/warp-ipfs/src/store/queue.rs @@ -1,54 +1,39 @@ use futures::{channel::mpsc, StreamExt, TryFutureExt}; use libipld::Cid; -use rust_ipfs::Ipfs; +use rust_ipfs::{Ipfs, Keypair}; use std::{collections::HashMap, sync::Arc, time::Duration}; use tokio::sync::RwLock; use tokio_util::sync::{CancellationToken, DropGuard}; use tracing::error; -use warp::{ - crypto::{ - cipher::Cipher, - did_key::{Generate, ECDH}, - zeroize::Zeroizing, - Ed25519KeyPair, KeyMaterial, DID, - }, - error::Error, -}; +use warp::{crypto::DID, error::Error}; use web_time::Instant; use crate::store::{ds_key::DataStoreKey, ecdh_encrypt, topics::PeerTopic, PeerIdExt}; -use super::{connected_to_peer, discovery::Discovery, identity::RequestResponsePayload}; +use super::{ + connected_to_peer, discovery::Discovery, document::root::RootDocumentMap, ecdh_decrypt, + identity::RequestResponsePayload, +}; +#[derive(Clone)] pub struct Queue { ipfs: Ipfs, entries: Arc>>, removal: mpsc::UnboundedSender, - did: Arc, + keypair: Keypair, discovery: Discovery, } -impl Clone for Queue { - fn clone(&self) -> Self { - Self { - ipfs: self.ipfs.clone(), - entries: self.entries.clone(), - removal: self.removal.clone(), - did: self.did.clone(), - discovery: self.discovery.clone(), - } - } -} - impl Queue { - pub fn new(ipfs: Ipfs, did: Arc, discovery: Discovery) -> Queue { + pub fn new(ipfs: Ipfs, root: &RootDocumentMap, discovery: Discovery) -> Queue { let (tx, mut rx) = mpsc::unbounded(); + let keypair = root.keypair().clone(); let queue = Queue { ipfs, entries: Default::default(), removal: tx, - did, + keypair, discovery, }; @@ -83,7 +68,7 @@ impl Queue { self.ipfs.clone(), did.clone(), payload, - self.did.clone(), + &self.keypair, self.removal.clone(), ) .await; @@ -160,14 +145,7 @@ impl Queue { } }; - let prikey = Ed25519KeyPair::from_secret_key(&self.did.private_key_bytes()).get_x25519(); - let pubkey = Ed25519KeyPair::from_public_key(&self.did.public_key_bytes()).get_x25519(); - - let prik = std::panic::catch_unwind(|| prikey.key_exchange(&pubkey)) - .map(Zeroizing::new) - .map_err(|_| anyhow::anyhow!("Error performing key exchange"))?; - - let data = Cipher::direct_decrypt(&data, &prik)?; + let data = ecdh_decrypt(&self.keypair, None, data)?; let map: HashMap = serde_json::from_slice(&data)?; @@ -200,18 +178,7 @@ impl Queue { } }; - let prikey = Ed25519KeyPair::from_secret_key(&self.did.private_key_bytes()).get_x25519(); - let pubkey = Ed25519KeyPair::from_public_key(&self.did.public_key_bytes()).get_x25519(); - - let prik = match std::panic::catch_unwind(|| prikey.key_exchange(&pubkey)) { - Ok(pri) => Zeroizing::new(pri), - Err(e) => { - error!("Error generating key: {e:?}"); - return; - } - }; - - let data = match Cipher::direct_encrypt(&bytes, &prik) { + let data = match ecdh_encrypt(&self.keypair, None, bytes) { Ok(d) => d, Err(e) => { error!("Error encrypting queue: {e}"); @@ -256,7 +223,7 @@ impl Queue { pub struct QueueEntry { ipfs: Ipfs, recipient: DID, - did: Arc, + keypair: Keypair, item: RequestResponsePayload, drop_guard: Arc>>, } @@ -266,13 +233,13 @@ impl QueueEntry { ipfs: Ipfs, recipient: DID, item: RequestResponsePayload, - did: Arc, + keypair: &Keypair, tx: mpsc::UnboundedSender, ) -> QueueEntry { let entry = QueueEntry { ipfs, recipient, - did, + keypair: keypair.clone(), item, drop_guard: Default::default(), }; @@ -313,7 +280,7 @@ impl QueueEntry { let recipient = entry.recipient.clone(); let res = async move { - let kp = &*entry.did; + let kp = &entry.keypair; let payload_bytes = serde_json::to_vec(&entry.item)?; let bytes = ecdh_encrypt(kp, Some(&recipient), payload_bytes)?;