Skip to content

Commit

Permalink
feat: Encrypt and decrypt files
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 committed Nov 7, 2023
1 parent 021d570 commit 64f9803
Show file tree
Hide file tree
Showing 4 changed files with 207 additions and 27 deletions.
2 changes: 1 addition & 1 deletion extensions/warp-ipfs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1516,7 +1516,7 @@ impl Constellation for WarpIpfs {
&mut self,
name: &str,
total_size: Option<usize>,
stream: BoxStream<'static, Vec<u8>>,
stream: BoxStream<'static, std::io::Result<Vec<u8>>>,
) -> Result<ConstellationProgressStream, Error> {
self.file_store()?
.put_stream(name, total_size, stream)
Expand Down
220 changes: 200 additions & 20 deletions extensions/warp-ipfs/src/store/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use warp::{
constellation::{
directory::Directory, ConstellationEventKind, ConstellationProgressStream, Progression,
},
crypto::cipher::Cipher,
error::Error,
sync::RwLock,
};
Expand Down Expand Up @@ -289,6 +290,7 @@ impl FileStore {
}

let ipfs = self.ipfs.clone();
let key = self.ipfs.keypair().and_then(get_keypair_did)?;

let path = PathBuf::from(path);
if !path.is_file() {
Expand Down Expand Up @@ -328,6 +330,18 @@ impl FileStore {

let background = self.config.thumbnail_task;

let file = tokio::fs::File::open(&path).await?;

let reader = ReaderStream::new(file).map(|result| result.map(|x| x.into()));

let cipher = Cipher::new();
let encrypted_key = ecdh_encrypt(&key, None, cipher.private_key())?;
let reader = cipher
.encrypt_async_stream(reader)
.await?
.map(|r| r.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)))
.boxed();

let name = name.to_string();
let fs = self.clone();
let progress_stream = async_stream::stream! {
Expand All @@ -337,7 +351,7 @@ impl FileStore {
let mut total_written = 0;
let mut returned_path = None;

let mut stream = ipfs.unixfs().add(path.clone(), Some(AddOption { pin: true, ..Default::default() }));
let mut stream = ipfs.unixfs().add(reader, Some(AddOption { pin: true, ..Default::default() }));

while let Some(status) = stream.next().await {
let name = name.clone();
Expand Down Expand Up @@ -386,6 +400,30 @@ impl FileStore {
}
};

let cid = ipfs_path.root().cid().copied().expect("Cid to exist");

let root_file = ConstellationRootFileDag {
link: cid,
size: total_written as _,
key: encrypted_key,
};

let fut = async move {
ipfs.dag().put().serialize(root_file)?.pin(true).await.map(IpfsPath::from)
};

let ipfs_path = match fut.await {
Ok(path) => path,
Err(e) => {
yield Progression::ProgressFailed {
name,
last_size: Some(last_written),
error: Some(e.to_string()),
};
return;
}
};

let file = warp::constellation::file::File::new(&name);
file.set_size(total_written);
file.set_reference(&format!("{ipfs_path}"));
Expand Down Expand Up @@ -446,18 +484,40 @@ impl FileStore {
}

pub async fn get(&self, name: &str, path: &str) -> Result<(), Error> {
use tokio::io::AsyncWriteExt;

let ipfs = self.ipfs.clone();

let item = self.current_directory()?.get_item_by_path(name)?;
let file = item.get_file()?;
let reference = file.reference().ok_or(Error::Other)?; //Reference not found

let mut stream = ipfs.get_unixfs(reference.parse::<IpfsPath>()?, path);
let root = ipfs
.get_dag(reference.parse::<IpfsPath>()?)
.local()
.deserialized::<ConstellationRootFileDag>()
.await?;

while let Some(status) = stream.next().await {
if let UnixfsStatus::FailedStatus { error, .. } = status {
return Err(error.map(Error::Any).unwrap_or(Error::Other));
}
let didkey = self.ipfs.keypair().and_then(get_keypair_did)?;

let key = ecdh_decrypt(&didkey, None, root.key)?;

let cipber = Cipher::from(key);

let ipfs_path = IpfsPath::from(root.link);

let stream = ipfs
.cat_unixfs(ipfs_path, None)
.map(|s| s.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)))
.boxed();

let mut stream = cipber.decrypt_async_stream(stream).await?.boxed();

let mut f = tokio::fs::File::create(path).await?;

while let Some(data) = stream.try_next().await? {
f.write_all(&data).await?;
f.sync_all().await?;
}

//TODO: Validate file against the hashed reference
Expand Down Expand Up @@ -508,8 +568,15 @@ impl FileStore {
.insert_buffer(&name, buffer, width, height, exact)
.await;

let reader = ReaderStream::new(std::io::Cursor::new(buffer))
.map(|result| result.map(|x| x.into()))
let cipher = Cipher::new();

let reader =
ReaderStream::new(std::io::Cursor::new(buffer)).map(|result| result.map(|x| x.into()));

let reader = cipher
.encrypt_async_stream(reader)
.await?
.map(|r| r.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)))
.boxed();

let mut total_written = 0;
Expand Down Expand Up @@ -538,9 +605,25 @@ impl FileStore {

let ipfs_path = returned_path.ok_or_else(|| anyhow::anyhow!("Cid was never set"))?;

let key = self.ipfs.keypair().and_then(get_keypair_did)?;

let encrypted_key = ecdh_encrypt(&key, None, cipher.private_key())?;

let cid = ipfs_path.root().cid().copied().expect("Cid to exist");

let root_file = ConstellationRootFileDag {
link: cid,
size: total_written as _,
key: encrypted_key,
};

let dag_cid = ipfs.dag().put().serialize(root_file)?.pin(true).await?;

let new_path = IpfsPath::from(dag_cid);

let file = warp::constellation::file::File::new(name);
file.set_size(total_written);
file.set_reference(&format!("{ipfs_path}"));
file.set_reference(&format!("{new_path}"));
file.set_file_type(to_file_type(name));
file.hash_mut().hash_from_slice(buffer)?;

Expand All @@ -566,20 +649,43 @@ impl FileStore {
}

pub async fn get_buffer(&self, name: &str) -> Result<Vec<u8>, Error> {
use futures::AsyncWriteExt;
let ipfs = self.ipfs.clone();

let item = self.current_directory()?.get_item_by_path(name)?;
let file = item.get_file()?;
let reference = file.reference().ok_or(Error::Other)?; //Reference not found
let stream = ipfs.cat_unixfs(reference.parse::<IpfsPath>()?, None);
pin_mut!(stream);

let mut buffer = vec![];
while let Some(data) = stream.next().await {
let mut bytes = data.map_err(anyhow::Error::from)?;
buffer.append(&mut bytes);
let root = ipfs
.get_dag(reference.parse::<IpfsPath>()?)
.local()
.deserialized::<ConstellationRootFileDag>()
.await?;

let didkey = self.ipfs.keypair().and_then(get_keypair_did)?;

let key = ecdh_decrypt(&didkey, None, root.key)?;

let cipber = Cipher::from(key);

let ipfs_path = IpfsPath::from(root.link);

let stream = ipfs
.cat_unixfs(ipfs_path, None)
.map(|s| s.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)))
.boxed();

let mut stream = cipber.decrypt_async_stream(stream).await?.boxed();

let mut f = futures::io::Cursor::new(vec![]);

while let Some(data) = stream.try_next().await? {
f.write_all(&data).await?;
f.flush().await?;
}

let buffer = f.into_inner();

//TODO: Validate file against the hashed reference
let _ = self
.constellation_tx
Expand All @@ -597,7 +703,7 @@ impl FileStore {
&mut self,
name: &str,
total_size: Option<usize>,
stream: BoxStream<'static, Vec<u8>>,
stream: BoxStream<'static, std::io::Result<Vec<u8>>>,
) -> Result<ConstellationProgressStream, Error> {
let name = name.trim();
if name.len() < 2 || name.len() > 256 {
Expand All @@ -620,11 +726,35 @@ impl FileStore {
return Err(Error::FileExist);
}

let key = self.ipfs.keypair().and_then(get_keypair_did)?;

let fs = self.clone();
let name = name.to_string();
let stream = stream.map(Ok::<_, std::io::Error>).boxed();

let cipher = Cipher::new();

let encrypted_key = ecdh_encrypt(&key, None, cipher.private_key())?;

let progress_stream = async_stream::stream! {

let stream = match cipher
.encrypt_async_stream(stream)
.await
.map(|s| {
s.map(|r| r.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)))
.boxed()
}) {
Ok(s) => s,
Err(e) => {
yield Progression::ProgressFailed {
name,
last_size: None,
error: Some(e.to_string())
};
return;
}
};

let mut last_written = 0;

let mut total_written = 0;
Expand Down Expand Up @@ -694,6 +824,30 @@ impl FileStore {
}
};

let cid = ipfs_path.root().cid().copied().expect("Cid to exist");

let root_file = ConstellationRootFileDag {
link: cid,
size: total_written as _,
key: encrypted_key,
};

let fut = async move {
ipfs.dag().put().serialize(root_file)?.pin(true).await.map(IpfsPath::from)
};

let ipfs_path = match fut.await {
Ok(path) => path,
Err(e) => {
yield Progression::ProgressFailed {
name,
last_size: Some(last_written),
error: Some(e.to_string()),
};
return;
}
};

let file = warp::constellation::file::File::new(&name);
file.set_size(total_written);
file.set_reference(&format!("{ipfs_path}"));
Expand Down Expand Up @@ -741,11 +895,30 @@ impl FileStore {

let tx = self.constellation_tx.clone();

let didkey = self.ipfs.keypair().and_then(get_keypair_did)?;

let stream = async_stream::stream! {
let cat_stream = ipfs
.cat_unixfs(reference.parse::<IpfsPath>()?, None);

for await data in cat_stream {
let root = ipfs
.get_dag(reference.parse::<IpfsPath>()?)
.local()
.deserialized::<ConstellationRootFileDag>()
.await?;

let key = ecdh_decrypt(&didkey, None, root.key)?;

let cipber = Cipher::from(key);

let ipfs_path = IpfsPath::from(root.link);

let stream = ipfs
.cat_unixfs(ipfs_path, None)
.map(|s| s.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)))
.boxed();

let stream = cipber.decrypt_async_stream(stream).await?.boxed();

for await data in stream {
match data {
Ok(data) => yield Ok(data),
Err(e) => yield Err(Error::from(anyhow::anyhow!("{e}"))),
Expand Down Expand Up @@ -923,3 +1096,10 @@ impl FileStore {
PathBuf::from(self.path.read().to_string_lossy().replace('\\', "/"))
}
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct ConstellationRootFileDag {
pub link: Cid,
pub size: u64,
pub key: Vec<u8>,
}
2 changes: 1 addition & 1 deletion warp/src/constellation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ pub trait Constellation:
&mut self,
_: &str,
_: Option<usize>,
_: BoxStream<'static, Vec<u8>>,
_: BoxStream<'static, std::io::Result<Vec<u8>>>,
) -> Result<ConstellationProgressStream, Error> {
Err(Error::Unimplemented)
}
Expand Down
10 changes: 5 additions & 5 deletions warp/src/crypto/cipher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ impl Cipher {
}

/// Returns the stored key
pub fn private_key(&self) -> Vec<u8> {
self.private_key.to_owned()
pub fn private_key(&self) -> &[u8] {
&self.private_key
}

/// Used to generate and encrypt data with a random key
Expand Down Expand Up @@ -130,7 +130,7 @@ impl Cipher {
stream: impl Stream<Item = std::result::Result<Vec<u8>, std::io::Error>> + Unpin + Send + 'a,
) -> Result<impl Stream<Item = Result<Vec<u8>>> + Send + 'a> {
let cipher = Cipher::new();
let key_stream = stream::iter(Ok::<_, Error>(Ok(cipher.private_key())));
let key_stream = stream::iter(Ok::<_, Error>(Ok(cipher.private_key().to_vec())));

let cipher_stream = cipher.encrypt_async_stream(stream).await?;

Expand All @@ -156,7 +156,7 @@ impl Cipher {
reader: R,
) -> Result<impl Stream<Item = Result<Vec<u8>>> + Send + 'a> {
let cipher = Cipher::new();
let key_stream = stream::iter(Ok::<_, Error>(Ok(cipher.private_key())));
let key_stream = stream::iter(Ok::<_, Error>(Ok(cipher.private_key().to_vec())));

let cipher_stream = cipher.encrypt_async_read_to_stream(reader).await?;
let stream = key_stream.chain(cipher_stream);
Expand Down Expand Up @@ -381,7 +381,7 @@ impl Cipher {
/// Encrypts and embeds private key into writer stream
pub fn self_encrypt_stream(reader: &mut impl Read, writer: &mut impl Write) -> Result<()> {
let cipher = Cipher::new();
writer.write_all(&cipher.private_key())?;
writer.write_all(cipher.private_key())?;
cipher.encrypt_stream(reader, writer)?;
Ok(())
}
Expand Down

0 comments on commit 64f9803

Please sign in to comment.