Skip to content

Commit

Permalink
refactor: Move conversation functionality into its own task (#340)
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 authored Oct 20, 2023
1 parent b6c69d6 commit 299da4a
Show file tree
Hide file tree
Showing 6 changed files with 710 additions and 774 deletions.
1 change: 0 additions & 1 deletion extensions/warp-ipfs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1214,7 +1214,6 @@ impl RayGun for WarpIpfs {
self.messaging_store()?
.get_conversation(conversation_id)
.await
.map(|convo| convo.into())
}

async fn list_conversations(&self) -> Result<Vec<Conversation>, Error> {
Expand Down
67 changes: 10 additions & 57 deletions extensions/warp-ipfs/src/store/conversation.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use chrono::{DateTime, Utc};
use core::hash::Hash;
use futures::{
stream::{self, BoxStream, FuturesOrdered},
stream::{self, BoxStream},
StreamExt, TryFutureExt,
};
use libipld::{Cid, Ipld};
Expand Down Expand Up @@ -301,23 +301,8 @@ impl ConversationDocument {
ipfs: &Ipfs,
list: BTreeSet<MessageDocument>,
) -> Result<(), Error> {
let old_cid = self.messages;
let cid = list.to_cid(ipfs).await?;
if !ipfs.is_pinned(&cid).await? {
ipfs.insert_pin(&cid, false).await?;
}

if let Some(old_cid) = old_cid {
if old_cid != cid {
if ipfs.is_pinned(&old_cid).await? {
ipfs.remove_pin(&old_cid, false).await?;
}
ipfs.remove_block(old_cid).await?;
}
}

self.messages = Some(cid);

Ok(())
}

Expand Down Expand Up @@ -524,7 +509,7 @@ impl ConversationDocument {
.ok_or(Error::MessageNotFound)?;
messages.remove(&document);
self.set_message_list(ipfs, messages).await?;
document.remove(ipfs).await
Ok(())
}

pub async fn delete_all_message(&mut self, ipfs: Ipfs) -> Result<(), Error> {
Expand All @@ -538,27 +523,11 @@ impl ConversationDocument {
ipfs.remove_pin(&cid, false).await?;
}

let mut ids = vec![];

for document in messages {
if ipfs.is_pinned(&document.message).await? {
ipfs.remove_pin(&document.message, false).await?;
}
ids.push(document.message);
}

ipfs.remove_block(cid).await?;

//TODO: Replace with gc within ipfs (when completed) in the future
// so we dont need to manually delete the blocks
FuturesOrdered::from_iter(ids.iter().map(|cid| {
let ipfs = ipfs.clone();
async move { (ipfs, *cid) }
}))
.for_each_concurrent(None, |(ipfs, cid)| async move {
let _ = ipfs.remove_block(cid).await.ok();
})
.await;
Ok(())
}
}
Expand Down Expand Up @@ -632,10 +601,6 @@ impl MessageDocument {

let message = data.to_cid(ipfs).await?;

if !ipfs.is_pinned(&message).await? {
ipfs.insert_pin(&message, false).await?;
}

let sender = DIDEd25519Reference::from_did(&sender);

let document = MessageDocument {
Expand All @@ -649,15 +614,15 @@ impl MessageDocument {
Ok(document)
}

pub async fn remove(&self, ipfs: &Ipfs) -> Result<(), Error> {
let cid = self.message;
if ipfs.is_pinned(&cid).await? {
ipfs.remove_pin(&cid, false).await?;
}
ipfs.remove_block(cid).await?;
// pub async fn remove(&self, ipfs: &Ipfs) -> Result<(), Error> {
// let cid = self.message;
// if ipfs.is_pinned(&cid).await? {
// ipfs.remove_pin(&cid, false).await?;
// }
// ipfs.remove_block(cid).await?;

Ok(())
}
// Ok(())
// }

pub async fn update(
&mut self,
Expand All @@ -668,7 +633,6 @@ impl MessageDocument {
) -> Result<(), Error> {
info!("Updating message {} for {}", self.id, self.conversation_id);
let old_message = self.resolve(ipfs, did, keystore).await?;
let old_document = self.message;

if old_message.id() != message.id()
|| old_message.conversation_id() != message.conversation_id()
Expand All @@ -690,20 +654,9 @@ impl MessageDocument {

let message_cid = data.to_cid(ipfs).await?;

if !ipfs.is_pinned(&message_cid).await? {
ipfs.insert_pin(&message_cid, false).await?;
}

info!("Setting Message to document");
self.message = message_cid;
info!("Message is updated");
if old_document != message_cid {
if ipfs.is_pinned(&old_document).await? {
info!("Removing pin for {old_document}");
ipfs.remove_pin(&old_document, false).await?;
}
ipfs.remove_block(old_document).await?;
}
Ok(())
}

Expand Down
Loading

0 comments on commit 299da4a

Please sign in to comment.