diff --git a/extensions/warp-ipfs/src/store/files.rs b/extensions/warp-ipfs/src/store/files.rs index 3aa72ea5a..188d8eaad 100644 --- a/extensions/warp-ipfs/src/store/files.rs +++ b/extensions/warp-ipfs/src/store/files.rs @@ -17,7 +17,7 @@ use warp::{ constellation::{ directory::Directory, ConstellationEventKind, ConstellationProgressStream, Progression, }, - crypto::cipher::Cipher, + crypto::{cipher::Cipher, DID}, error::Error, sync::RwLock, }; @@ -1103,3 +1103,23 @@ pub struct ConstellationRootFileDag { pub size: u64, pub key: Vec, } + +impl ConstellationRootFileDag { + pub fn assign_with_did(mut self, keypair: &DID, did: &DID) -> Result { + let key = ecdh_decrypt(keypair, None, &self.key)?; + let new_key = ecdh_encrypt(keypair, Some(did), key)?; + self.key = new_key; + Ok(self) + } + + pub fn assign_with_key( + mut self, + keypair: &DID, + cipher_key: impl AsRef<[u8]>, + ) -> Result { + let key = ecdh_decrypt(keypair, None, &self.key)?; + let new_key = Cipher::direct_encrypt(&key, cipher_key.as_ref())?; + self.key = new_key; + Ok(self) + } +} diff --git a/extensions/warp-ipfs/src/store/message.rs b/extensions/warp-ipfs/src/store/message.rs index d43f7d218..76c765b30 100644 --- a/extensions/warp-ipfs/src/store/message.rs +++ b/extensions/warp-ipfs/src/store/message.rs @@ -7,6 +7,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::time::{Duration, Instant}; use chrono::Utc; +use either::Either; use futures::channel::mpsc::{unbounded, UnboundedSender}; use futures::channel::oneshot::{self, Sender as OneshotSender}; use futures::stream::SelectAll; @@ -41,6 +42,7 @@ use crate::store::{ use super::conversation::{ConversationDocument, MessageDocument}; use super::discovery::Discovery; use super::document::conversation::Conversations; +use super::files::ConstellationRootFileDag; use super::identity::IdentityStore; use super::keystore::Keystore; use super::{verify_serde_sig, ConversationEvents, MessagingEvents}; @@ -2905,8 +2907,32 @@ impl MessageStore { return Err(Error::NoAttachments); } + let ipfs = self.ipfs.clone(); + let store = self.clone(); + let own_did = self.did.clone(); + + let chat_key = match conversation.conversation_type { + ConversationType::Direct => { + let recipient = conversation + .recipients() + .iter() + .filter(|did| (*own_did).ne(did)) + .cloned() + .collect::>() + .first() + .cloned() + .ok_or(Error::InvalidConversation)?; + Either::Left(recipient) + } + ConversationType::Group => { + let keystore = self.conversation_keystore(conversation.id()).await?; + let key = keystore.get_latest(&own_did, &own_did)?; + Either::Right(key) + } + }; + let stream = async_stream::stream! { let mut in_stack = vec![]; @@ -3032,6 +3058,8 @@ impl MessageStore { }; } + let chat_key = chat_key.clone(); + for await (progress, file) in streams { yield AttachmentKind::AttachedProgress(progress); if let Some(file) = file { @@ -3047,9 +3075,54 @@ impl MessageStore { new_file.set_thumbnail(&thumbnail); total_thumbnail_size += thumbnail.len(); } + + let reference = match file.reference() + .ok_or(Error::Other) + .and_then(|r| r.parse::().map_err(anyhow::Error::from).map_err(Error::from)) { + Ok(r) => r, + Err(e) => { + yield AttachmentKind::AttachedProgress(Progression::ProgressFailed { name: file.name(), last_size: Some(file.size()), error: Some(e.to_string()) }); + continue; + } + }; + + let dag = match ipfs.get_dag(reference) + .local() + .deserialized::() + .await { + Ok(d) => d, + Err(e) => { + yield AttachmentKind::AttachedProgress(Progression::ProgressFailed { name: file.name(), last_size: Some(file.size()), error: Some(e.to_string()) }); + continue; + } + }; + + let keypair = own_did.clone(); + + let result = match chat_key.as_ref() { + Either::Left(recipient) => dag.assign_with_did(&keypair, recipient), + Either::Right(key) => dag.assign_with_key(&keypair, key), + }; + + + let ipfs = ipfs.clone(); + + let fut = async move { + let dag = result?; + ipfs.dag().put().pin(false).serialize(dag)?.await.map(IpfsPath::from) + }; + + let ipfs_path = match fut.await { + Ok(path) => path, + Err(e) => { + yield AttachmentKind::AttachedProgress(Progression::ProgressFailed { name: file.name(), last_size: Some(file.size()), error: Some(e.to_string()) }); + continue; + } + }; + new_file.set_size(file.size()); new_file.set_hash(file.hash()); - new_file.set_reference(&file.reference().unwrap_or_default()); + new_file.set_reference(&ipfs_path.to_string()); attachments.push(new_file); } } @@ -3105,18 +3178,24 @@ impl MessageStore { pub async fn download( &self, - conversation: Uuid, + conversation_id: Uuid, message_id: Uuid, file: &str, path: PathBuf, _: bool, ) -> Result { + use tokio::io::AsyncWriteExt; + let constellation = self .filesystem .clone() .ok_or(Error::ConstellationExtensionUnavailable)?; - let message = self.get_message(conversation, message_id).await?; + let conv = self.get_conversation(conversation_id).await?; + + let message = self.get_message(conversation_id, message_id).await?; + + let own_did = &*self.did; if message.message_type() != MessageType::Attachment { return Err(Error::InvalidMessage); @@ -3137,7 +3216,38 @@ impl MessageStore { .ok_or(Error::FileNotFound)?; let ipfs = self.ipfs.clone(); - let _constellation = constellation.clone(); + + let root = ipfs + .get_dag(reference) + .deserialized::() + .await?; + + let key = match conv.conversation_type() { + ConversationType::Direct => { + let recipient = conv + .recipients() + .iter() + .filter(|did| (*own_did).ne(did)) + .cloned() + .collect::>() + .first() + .cloned() + .ok_or(Error::InvalidConversation)?; + ecdh_decrypt(own_did, Some(&recipient), &root.key)? + } + ConversationType::Group => { + let keystore = self.conversation_keystore(conversation_id).await?; + let key = keystore.get_latest(own_did, &message.sender())?; + Cipher::direct_decrypt(&root.key, &key)? + } + }; + + let cipher = Cipher::from(key); + + let link_ref = IpfsPath::from(root.link); + + let total_size = root.size; + let progress_stream = async_stream::stream! { yield Progression::CurrentProgress { name: attachment.name(), @@ -3145,35 +3255,87 @@ impl MessageStore { total: Some(attachment.size()), }; - let stream = ipfs.get_unixfs(reference, &path); + let mut f = match tokio::fs::File::create(&path).await { + Ok(f) => f, + Err(e) => { + yield Progression::ProgressFailed { + name: attachment.name(), + last_size: None, + error: Some(e.to_string()), + }; + return; + } + }; + + let stream = ipfs.cat_unixfs(link_ref, None) + .map(|s| s.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))) + .boxed(); + + let stream = match cipher.decrypt_async_stream(stream).await.map(|s| s.boxed()) { + Ok(s) => s, + Err(e) => { + yield Progression::ProgressFailed { + name: attachment.name(), + last_size: None, + error: Some(e.to_string()), + }; + return; + } + }; + + + let mut total_written = 0; for await event in stream { match event { - rust_ipfs::unixfs::UnixfsStatus::ProgressStatus { written, total_size } => { - yield Progression::CurrentProgress { - name: attachment.name(), - current: written, - total: total_size + Ok(data) => { + let fut = async { + f.write_all(&data).await?; + f.sync_all().await?; + Ok::<_, std::io::Error>(()) }; - }, - rust_ipfs::unixfs::UnixfsStatus::CompletedStatus { total_size, .. } => { - yield Progression::ProgressComplete { + + if let Err(e) = fut.await { + drop(f); + if let Err(e) = tokio::fs::remove_file(&path).await { + error!("Error removing file: {e}"); + } + yield Progression::ProgressFailed { + name: attachment.name(), + last_size: Some(total_written), + error: Some(e.to_string()), + }; + return; + } + + total_written += data.len(); + + yield Progression::CurrentProgress { name: attachment.name(), - total: total_size, + current: total_written, + total: Some(total_size as _), }; }, - rust_ipfs::unixfs::UnixfsStatus::FailedStatus { written, error, .. } => { + Err(e) => { + drop(f); if let Err(e) = tokio::fs::remove_file(&path).await { error!("Error removing file: {e}"); } yield Progression::ProgressFailed { name: attachment.name(), - last_size: Some(written), - error: error.map(|e| e.to_string()), + last_size: Some(total_written), + error: Some(e.to_string()), }; - }, + return; + } } } + + yield Progression::ProgressComplete { + name: attachment.name(), + total: Some(total_size as _), + }; + }; Ok(ConstellationProgressStream(progress_stream.boxed()))