Skip to content

Commit

Permalink
chore: Decrypt attachments
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 committed Nov 7, 2023
1 parent 64f9803 commit 5f24c79
Show file tree
Hide file tree
Showing 2 changed files with 201 additions and 19 deletions.
22 changes: 21 additions & 1 deletion extensions/warp-ipfs/src/store/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use warp::{
constellation::{
directory::Directory, ConstellationEventKind, ConstellationProgressStream, Progression,
},
crypto::cipher::Cipher,
crypto::{cipher::Cipher, DID},
error::Error,
sync::RwLock,
};
Expand Down Expand Up @@ -1103,3 +1103,23 @@ pub struct ConstellationRootFileDag {
pub size: u64,
pub key: Vec<u8>,
}

impl ConstellationRootFileDag {
pub fn assign_with_did(mut self, keypair: &DID, did: &DID) -> Result<Self, Error> {
let key = ecdh_decrypt(keypair, None, &self.key)?;
let new_key = ecdh_encrypt(keypair, Some(did), key)?;
self.key = new_key;
Ok(self)
}

pub fn assign_with_key(
mut self,
keypair: &DID,
cipher_key: impl AsRef<[u8]>,
) -> Result<Self, Error> {
let key = ecdh_decrypt(keypair, None, &self.key)?;
let new_key = Cipher::direct_encrypt(&key, cipher_key.as_ref())?;
self.key = new_key;
Ok(self)
}
}
198 changes: 180 additions & 18 deletions extensions/warp-ipfs/src/store/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};

use chrono::Utc;
use either::Either;
use futures::channel::mpsc::{unbounded, UnboundedSender};
use futures::channel::oneshot::{self, Sender as OneshotSender};
use futures::stream::SelectAll;
Expand Down Expand Up @@ -41,6 +42,7 @@ use crate::store::{
use super::conversation::{ConversationDocument, MessageDocument};
use super::discovery::Discovery;
use super::document::conversation::Conversations;
use super::files::ConstellationRootFileDag;
use super::identity::IdentityStore;
use super::keystore::Keystore;
use super::{verify_serde_sig, ConversationEvents, MessagingEvents};
Expand Down Expand Up @@ -2905,8 +2907,32 @@ impl MessageStore {
return Err(Error::NoAttachments);
}

let ipfs = self.ipfs.clone();

let store = self.clone();

let own_did = self.did.clone();

let chat_key = match conversation.conversation_type {
ConversationType::Direct => {
let recipient = conversation
.recipients()
.iter()
.filter(|did| (*own_did).ne(did))
.cloned()
.collect::<Vec<_>>()
.first()
.cloned()
.ok_or(Error::InvalidConversation)?;
Either::Left(recipient)
}
ConversationType::Group => {
let keystore = self.conversation_keystore(conversation.id()).await?;
let key = keystore.get_latest(&own_did, &own_did)?;
Either::Right(key)
}
};

let stream = async_stream::stream! {
let mut in_stack = vec![];

Expand Down Expand Up @@ -3032,6 +3058,8 @@ impl MessageStore {
};
}

let chat_key = chat_key.clone();

for await (progress, file) in streams {
yield AttachmentKind::AttachedProgress(progress);
if let Some(file) = file {
Expand All @@ -3047,9 +3075,54 @@ impl MessageStore {
new_file.set_thumbnail(&thumbnail);
total_thumbnail_size += thumbnail.len();
}

let reference = match file.reference()
.ok_or(Error::Other)
.and_then(|r| r.parse::<IpfsPath>().map_err(anyhow::Error::from).map_err(Error::from)) {
Ok(r) => r,
Err(e) => {
yield AttachmentKind::AttachedProgress(Progression::ProgressFailed { name: file.name(), last_size: Some(file.size()), error: Some(e.to_string()) });
continue;
}
};

let dag = match ipfs.get_dag(reference)
.local()
.deserialized::<ConstellationRootFileDag>()
.await {
Ok(d) => d,
Err(e) => {
yield AttachmentKind::AttachedProgress(Progression::ProgressFailed { name: file.name(), last_size: Some(file.size()), error: Some(e.to_string()) });
continue;
}
};

let keypair = own_did.clone();

let result = match chat_key.as_ref() {
Either::Left(recipient) => dag.assign_with_did(&keypair, recipient),
Either::Right(key) => dag.assign_with_key(&keypair, key),
};


let ipfs = ipfs.clone();

let fut = async move {
let dag = result?;
ipfs.dag().put().pin(false).serialize(dag)?.await.map(IpfsPath::from)
};

let ipfs_path = match fut.await {
Ok(path) => path,
Err(e) => {
yield AttachmentKind::AttachedProgress(Progression::ProgressFailed { name: file.name(), last_size: Some(file.size()), error: Some(e.to_string()) });
continue;
}
};

new_file.set_size(file.size());
new_file.set_hash(file.hash());
new_file.set_reference(&file.reference().unwrap_or_default());
new_file.set_reference(&ipfs_path.to_string());
attachments.push(new_file);
}
}
Expand Down Expand Up @@ -3105,18 +3178,24 @@ impl MessageStore {

pub async fn download(
&self,
conversation: Uuid,
conversation_id: Uuid,
message_id: Uuid,
file: &str,
path: PathBuf,
_: bool,
) -> Result<ConstellationProgressStream, Error> {
use tokio::io::AsyncWriteExt;

let constellation = self
.filesystem
.clone()
.ok_or(Error::ConstellationExtensionUnavailable)?;

let message = self.get_message(conversation, message_id).await?;
let conv = self.get_conversation(conversation_id).await?;

let message = self.get_message(conversation_id, message_id).await?;

let own_did = &*self.did;

if message.message_type() != MessageType::Attachment {
return Err(Error::InvalidMessage);
Expand All @@ -3137,43 +3216,126 @@ impl MessageStore {
.ok_or(Error::FileNotFound)?;

let ipfs = self.ipfs.clone();
let _constellation = constellation.clone();

let root = ipfs
.get_dag(reference)
.deserialized::<ConstellationRootFileDag>()
.await?;

let key = match conv.conversation_type() {
ConversationType::Direct => {
let recipient = conv
.recipients()
.iter()
.filter(|did| (*own_did).ne(did))
.cloned()
.collect::<Vec<_>>()
.first()
.cloned()
.ok_or(Error::InvalidConversation)?;
ecdh_decrypt(own_did, Some(&recipient), &root.key)?
}
ConversationType::Group => {
let keystore = self.conversation_keystore(conversation_id).await?;
let key = keystore.get_latest(own_did, &message.sender())?;
Cipher::direct_decrypt(&root.key, &key)?
}
};

let cipher = Cipher::from(key);

let link_ref = IpfsPath::from(root.link);

let total_size = root.size;

let progress_stream = async_stream::stream! {
yield Progression::CurrentProgress {
name: attachment.name(),
current: 0,
total: Some(attachment.size()),
};

let stream = ipfs.get_unixfs(reference, &path);
let mut f = match tokio::fs::File::create(&path).await {
Ok(f) => f,
Err(e) => {
yield Progression::ProgressFailed {
name: attachment.name(),
last_size: None,
error: Some(e.to_string()),
};
return;
}
};

let stream = ipfs.cat_unixfs(link_ref, None)
.map(|s| s.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)))
.boxed();

let stream = match cipher.decrypt_async_stream(stream).await.map(|s| s.boxed()) {
Ok(s) => s,
Err(e) => {
yield Progression::ProgressFailed {
name: attachment.name(),
last_size: None,
error: Some(e.to_string()),
};
return;
}
};


let mut total_written = 0;

for await event in stream {
match event {
rust_ipfs::unixfs::UnixfsStatus::ProgressStatus { written, total_size } => {
yield Progression::CurrentProgress {
name: attachment.name(),
current: written,
total: total_size
Ok(data) => {
let fut = async {
f.write_all(&data).await?;
f.sync_all().await?;
Ok::<_, std::io::Error>(())
};
},
rust_ipfs::unixfs::UnixfsStatus::CompletedStatus { total_size, .. } => {
yield Progression::ProgressComplete {

if let Err(e) = fut.await {
drop(f);
if let Err(e) = tokio::fs::remove_file(&path).await {
error!("Error removing file: {e}");
}
yield Progression::ProgressFailed {
name: attachment.name(),
last_size: Some(total_written),
error: Some(e.to_string()),
};
return;
}

total_written += data.len();

yield Progression::CurrentProgress {
name: attachment.name(),
total: total_size,
current: total_written,
total: Some(total_size as _),
};
},
rust_ipfs::unixfs::UnixfsStatus::FailedStatus { written, error, .. } => {
Err(e) => {
drop(f);
if let Err(e) = tokio::fs::remove_file(&path).await {
error!("Error removing file: {e}");
}
yield Progression::ProgressFailed {
name: attachment.name(),
last_size: Some(written),
error: error.map(|e| e.to_string()),
last_size: Some(total_written),
error: Some(e.to_string()),
};
},
return;
}
}
}

yield Progression::ProgressComplete {
name: attachment.name(),
total: Some(total_size as _),
};

};

Ok(ConstellationProgressStream(progress_stream.boxed()))
Expand Down

0 comments on commit 5f24c79

Please sign in to comment.