From 64f98036b6b8d14ce5f198803ee72180e96ceeae Mon Sep 17 00:00:00 2001 From: Darius Date: Mon, 6 Nov 2023 22:29:15 -0500 Subject: [PATCH] feat: Encrypt and decrypt files --- extensions/warp-ipfs/src/lib.rs | 2 +- extensions/warp-ipfs/src/store/files.rs | 220 +++++++++++++++++++++--- warp/src/constellation/mod.rs | 2 +- warp/src/crypto/cipher.rs | 10 +- 4 files changed, 207 insertions(+), 27 deletions(-) diff --git a/extensions/warp-ipfs/src/lib.rs b/extensions/warp-ipfs/src/lib.rs index 178da780d..fcccde621 100644 --- a/extensions/warp-ipfs/src/lib.rs +++ b/extensions/warp-ipfs/src/lib.rs @@ -1516,7 +1516,7 @@ impl Constellation for WarpIpfs { &mut self, name: &str, total_size: Option, - stream: BoxStream<'static, Vec>, + stream: BoxStream<'static, std::io::Result>>, ) -> Result { self.file_store()? .put_stream(name, total_size, stream) diff --git a/extensions/warp-ipfs/src/store/files.rs b/extensions/warp-ipfs/src/store/files.rs index d0371aea7..3aa72ea5a 100644 --- a/extensions/warp-ipfs/src/store/files.rs +++ b/extensions/warp-ipfs/src/store/files.rs @@ -17,6 +17,7 @@ use warp::{ constellation::{ directory::Directory, ConstellationEventKind, ConstellationProgressStream, Progression, }, + crypto::cipher::Cipher, error::Error, sync::RwLock, }; @@ -289,6 +290,7 @@ impl FileStore { } let ipfs = self.ipfs.clone(); + let key = self.ipfs.keypair().and_then(get_keypair_did)?; let path = PathBuf::from(path); if !path.is_file() { @@ -328,6 +330,18 @@ impl FileStore { let background = self.config.thumbnail_task; + let file = tokio::fs::File::open(&path).await?; + + let reader = ReaderStream::new(file).map(|result| result.map(|x| x.into())); + + let cipher = Cipher::new(); + let encrypted_key = ecdh_encrypt(&key, None, cipher.private_key())?; + let reader = cipher + .encrypt_async_stream(reader) + .await? + .map(|r| r.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))) + .boxed(); + let name = name.to_string(); let fs = self.clone(); let progress_stream = async_stream::stream! { @@ -337,7 +351,7 @@ impl FileStore { let mut total_written = 0; let mut returned_path = None; - let mut stream = ipfs.unixfs().add(path.clone(), Some(AddOption { pin: true, ..Default::default() })); + let mut stream = ipfs.unixfs().add(reader, Some(AddOption { pin: true, ..Default::default() })); while let Some(status) = stream.next().await { let name = name.clone(); @@ -386,6 +400,30 @@ impl FileStore { } }; + let cid = ipfs_path.root().cid().copied().expect("Cid to exist"); + + let root_file = ConstellationRootFileDag { + link: cid, + size: total_written as _, + key: encrypted_key, + }; + + let fut = async move { + ipfs.dag().put().serialize(root_file)?.pin(true).await.map(IpfsPath::from) + }; + + let ipfs_path = match fut.await { + Ok(path) => path, + Err(e) => { + yield Progression::ProgressFailed { + name, + last_size: Some(last_written), + error: Some(e.to_string()), + }; + return; + } + }; + let file = warp::constellation::file::File::new(&name); file.set_size(total_written); file.set_reference(&format!("{ipfs_path}")); @@ -446,18 +484,40 @@ impl FileStore { } pub async fn get(&self, name: &str, path: &str) -> Result<(), Error> { + use tokio::io::AsyncWriteExt; + let ipfs = self.ipfs.clone(); let item = self.current_directory()?.get_item_by_path(name)?; let file = item.get_file()?; let reference = file.reference().ok_or(Error::Other)?; //Reference not found - let mut stream = ipfs.get_unixfs(reference.parse::()?, path); + let root = ipfs + .get_dag(reference.parse::()?) + .local() + .deserialized::() + .await?; - while let Some(status) = stream.next().await { - if let UnixfsStatus::FailedStatus { error, .. } = status { - return Err(error.map(Error::Any).unwrap_or(Error::Other)); - } + let didkey = self.ipfs.keypair().and_then(get_keypair_did)?; + + let key = ecdh_decrypt(&didkey, None, root.key)?; + + let cipber = Cipher::from(key); + + let ipfs_path = IpfsPath::from(root.link); + + let stream = ipfs + .cat_unixfs(ipfs_path, None) + .map(|s| s.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))) + .boxed(); + + let mut stream = cipber.decrypt_async_stream(stream).await?.boxed(); + + let mut f = tokio::fs::File::create(path).await?; + + while let Some(data) = stream.try_next().await? { + f.write_all(&data).await?; + f.sync_all().await?; } //TODO: Validate file against the hashed reference @@ -508,8 +568,15 @@ impl FileStore { .insert_buffer(&name, buffer, width, height, exact) .await; - let reader = ReaderStream::new(std::io::Cursor::new(buffer)) - .map(|result| result.map(|x| x.into())) + let cipher = Cipher::new(); + + let reader = + ReaderStream::new(std::io::Cursor::new(buffer)).map(|result| result.map(|x| x.into())); + + let reader = cipher + .encrypt_async_stream(reader) + .await? + .map(|r| r.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))) .boxed(); let mut total_written = 0; @@ -538,9 +605,25 @@ impl FileStore { let ipfs_path = returned_path.ok_or_else(|| anyhow::anyhow!("Cid was never set"))?; + let key = self.ipfs.keypair().and_then(get_keypair_did)?; + + let encrypted_key = ecdh_encrypt(&key, None, cipher.private_key())?; + + let cid = ipfs_path.root().cid().copied().expect("Cid to exist"); + + let root_file = ConstellationRootFileDag { + link: cid, + size: total_written as _, + key: encrypted_key, + }; + + let dag_cid = ipfs.dag().put().serialize(root_file)?.pin(true).await?; + + let new_path = IpfsPath::from(dag_cid); + let file = warp::constellation::file::File::new(name); file.set_size(total_written); - file.set_reference(&format!("{ipfs_path}")); + file.set_reference(&format!("{new_path}")); file.set_file_type(to_file_type(name)); file.hash_mut().hash_from_slice(buffer)?; @@ -566,20 +649,43 @@ impl FileStore { } pub async fn get_buffer(&self, name: &str) -> Result, Error> { + use futures::AsyncWriteExt; let ipfs = self.ipfs.clone(); let item = self.current_directory()?.get_item_by_path(name)?; let file = item.get_file()?; let reference = file.reference().ok_or(Error::Other)?; //Reference not found - let stream = ipfs.cat_unixfs(reference.parse::()?, None); - pin_mut!(stream); - let mut buffer = vec![]; - while let Some(data) = stream.next().await { - let mut bytes = data.map_err(anyhow::Error::from)?; - buffer.append(&mut bytes); + let root = ipfs + .get_dag(reference.parse::()?) + .local() + .deserialized::() + .await?; + + let didkey = self.ipfs.keypair().and_then(get_keypair_did)?; + + let key = ecdh_decrypt(&didkey, None, root.key)?; + + let cipber = Cipher::from(key); + + let ipfs_path = IpfsPath::from(root.link); + + let stream = ipfs + .cat_unixfs(ipfs_path, None) + .map(|s| s.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))) + .boxed(); + + let mut stream = cipber.decrypt_async_stream(stream).await?.boxed(); + + let mut f = futures::io::Cursor::new(vec![]); + + while let Some(data) = stream.try_next().await? { + f.write_all(&data).await?; + f.flush().await?; } + let buffer = f.into_inner(); + //TODO: Validate file against the hashed reference let _ = self .constellation_tx @@ -597,7 +703,7 @@ impl FileStore { &mut self, name: &str, total_size: Option, - stream: BoxStream<'static, Vec>, + stream: BoxStream<'static, std::io::Result>>, ) -> Result { let name = name.trim(); if name.len() < 2 || name.len() > 256 { @@ -620,11 +726,35 @@ impl FileStore { return Err(Error::FileExist); } + let key = self.ipfs.keypair().and_then(get_keypair_did)?; + let fs = self.clone(); let name = name.to_string(); - let stream = stream.map(Ok::<_, std::io::Error>).boxed(); + + let cipher = Cipher::new(); + + let encrypted_key = ecdh_encrypt(&key, None, cipher.private_key())?; + let progress_stream = async_stream::stream! { + let stream = match cipher + .encrypt_async_stream(stream) + .await + .map(|s| { + s.map(|r| r.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))) + .boxed() + }) { + Ok(s) => s, + Err(e) => { + yield Progression::ProgressFailed { + name, + last_size: None, + error: Some(e.to_string()) + }; + return; + } + }; + let mut last_written = 0; let mut total_written = 0; @@ -694,6 +824,30 @@ impl FileStore { } }; + let cid = ipfs_path.root().cid().copied().expect("Cid to exist"); + + let root_file = ConstellationRootFileDag { + link: cid, + size: total_written as _, + key: encrypted_key, + }; + + let fut = async move { + ipfs.dag().put().serialize(root_file)?.pin(true).await.map(IpfsPath::from) + }; + + let ipfs_path = match fut.await { + Ok(path) => path, + Err(e) => { + yield Progression::ProgressFailed { + name, + last_size: Some(last_written), + error: Some(e.to_string()), + }; + return; + } + }; + let file = warp::constellation::file::File::new(&name); file.set_size(total_written); file.set_reference(&format!("{ipfs_path}")); @@ -741,11 +895,30 @@ impl FileStore { let tx = self.constellation_tx.clone(); + let didkey = self.ipfs.keypair().and_then(get_keypair_did)?; + let stream = async_stream::stream! { - let cat_stream = ipfs - .cat_unixfs(reference.parse::()?, None); - for await data in cat_stream { + let root = ipfs + .get_dag(reference.parse::()?) + .local() + .deserialized::() + .await?; + + let key = ecdh_decrypt(&didkey, None, root.key)?; + + let cipber = Cipher::from(key); + + let ipfs_path = IpfsPath::from(root.link); + + let stream = ipfs + .cat_unixfs(ipfs_path, None) + .map(|s| s.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))) + .boxed(); + + let stream = cipber.decrypt_async_stream(stream).await?.boxed(); + + for await data in stream { match data { Ok(data) => yield Ok(data), Err(e) => yield Err(Error::from(anyhow::anyhow!("{e}"))), @@ -923,3 +1096,10 @@ impl FileStore { PathBuf::from(self.path.read().to_string_lossy().replace('\\', "/")) } } + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub struct ConstellationRootFileDag { + pub link: Cid, + pub size: u64, + pub key: Vec, +} diff --git a/warp/src/constellation/mod.rs b/warp/src/constellation/mod.rs index f79560522..3aa299d1e 100644 --- a/warp/src/constellation/mod.rs +++ b/warp/src/constellation/mod.rs @@ -198,7 +198,7 @@ pub trait Constellation: &mut self, _: &str, _: Option, - _: BoxStream<'static, Vec>, + _: BoxStream<'static, std::io::Result>>, ) -> Result { Err(Error::Unimplemented) } diff --git a/warp/src/crypto/cipher.rs b/warp/src/crypto/cipher.rs index 6d8bb1031..2bb2554a4 100644 --- a/warp/src/crypto/cipher.rs +++ b/warp/src/crypto/cipher.rs @@ -59,8 +59,8 @@ impl Cipher { } /// Returns the stored key - pub fn private_key(&self) -> Vec { - self.private_key.to_owned() + pub fn private_key(&self) -> &[u8] { + &self.private_key } /// Used to generate and encrypt data with a random key @@ -130,7 +130,7 @@ impl Cipher { stream: impl Stream, std::io::Error>> + Unpin + Send + 'a, ) -> Result>> + Send + 'a> { let cipher = Cipher::new(); - let key_stream = stream::iter(Ok::<_, Error>(Ok(cipher.private_key()))); + let key_stream = stream::iter(Ok::<_, Error>(Ok(cipher.private_key().to_vec()))); let cipher_stream = cipher.encrypt_async_stream(stream).await?; @@ -156,7 +156,7 @@ impl Cipher { reader: R, ) -> Result>> + Send + 'a> { let cipher = Cipher::new(); - let key_stream = stream::iter(Ok::<_, Error>(Ok(cipher.private_key()))); + let key_stream = stream::iter(Ok::<_, Error>(Ok(cipher.private_key().to_vec()))); let cipher_stream = cipher.encrypt_async_read_to_stream(reader).await?; let stream = key_stream.chain(cipher_stream); @@ -381,7 +381,7 @@ impl Cipher { /// Encrypts and embeds private key into writer stream pub fn self_encrypt_stream(reader: &mut impl Read, writer: &mut impl Write) -> Result<()> { let cipher = Cipher::new(); - writer.write_all(&cipher.private_key())?; + writer.write_all(cipher.private_key())?; cipher.encrypt_stream(reader, writer)?; Ok(()) }