Skip to content

Commit

Permalink
chore: Reduce spawned task and cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 committed Oct 16, 2023
1 parent efbc292 commit 3c6fbc2
Showing 1 changed file with 35 additions and 56 deletions.
91 changes: 35 additions & 56 deletions extensions/warp-ipfs/src/store/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ impl MessageStore {
conversation_sender,
conversation_task_tx,
identity,
// friends,
discovery,
filesystem,
queue,
Expand All @@ -170,42 +169,36 @@ impl MessageStore {
};

info!("Loading existing conversations task");
if let Err(_e) = store.load_conversations().await {}
if let Err(e) = store.load_conversations().await {
warn!("Failed to load conversations: {e}");
}

info!("Loading queue");
if let Err(e) = store.load_queue().await {
warn!("Failed to load queue: {e}");
}

let mut stream = store
.ipfs
.pubsub_subscribe(store.did.messaging())
.await?
.boxed();

tokio::spawn({
let mut store = store.clone();
async move {
info!("MessagingStore task created");

tokio::spawn({
let store = store.clone();
async move {
info!("Loading queue");
// Load the queue in a separate task in case it is large
// Note: In the future this will not be needed once a req/res system
// is implemented
if let Err(_e) = store.load_queue().await {}
}
});

let did = &*(store.did.clone());
let Ok(stream) = store.ipfs.pubsub_subscribe(did.messaging()).await else {
error!("Unable to create subscription stream. Terminating task");
//TODO: Maybe panic?
return;
};
futures::pin_mut!(stream);
let mut interval = tokio::time::interval(Duration::from_millis(interval_ms));

loop {
tokio::select! {
message = stream.next() => {
if let Some(message) = message {
if let Ok(payload) = Payload::from_bytes(&message.data) {
if let Ok(data) = ecdh_decrypt(&store.did, Some(&payload.sender()), payload.data()) {
if let Ok(events) = serde_json::from_slice::<ConversationEvents>(&data) {
if let Err(e) = store.process_conversation(payload, events).await {
error!("Error processing conversation: {e}");
}
Some(message) = stream.next() => {
if let Ok(payload) = Payload::from_bytes(&message.data) {
if let Ok(data) = ecdh_decrypt(&store.did, Some(&payload.sender()), payload.data()) {
if let Ok(events) = serde_json::from_slice::<ConversationEvents>(&data) {
if let Err(e) = store.process_conversation(payload, events).await {
error!("Error processing conversation: {e}");
}
}
}
Expand Down Expand Up @@ -1463,17 +1456,11 @@ impl MessageStore {
return Ok(());
}

tokio::spawn({
let discovery = self.discovery.clone();
let recipients = list.clone();
async move {
for recipient in recipients {
if !discovery.contains(&recipient).await {
let _ = discovery.insert(recipient).await.ok();
}
}
for recipient in &list {
if !self.discovery.contains(recipient).await {
let _ = self.discovery.insert(recipient).await;
}
});
}

info!("Creating conversation");
let convo = ConversationDocument::new(
Expand Down Expand Up @@ -2171,10 +2158,7 @@ impl MessageStore {
}
}

let _ = self
.remove_conversation_keystore(conversation_id)
.await
.ok();
let _ = self.remove_conversation_keystore(conversation_id).await;

if let Err(e) = self
.event
Expand Down Expand Up @@ -2857,21 +2841,16 @@ impl MessageStore {
};

//We want to send the event to the recipients until the creator can remove them from the conversation directly
tokio::spawn({
let event = event.clone();
let mut store = self.clone();
let list = list.to_vec();
async move {
for did in list.iter() {
if let Err(_e) = store
.send_single_conversation_event(did, conversation_id, event.clone())
.await
{
//
}
}

for did in list.iter() {
if let Err(e) = self
.send_single_conversation_event(did, conversation_id, event.clone())
.await
{
tracing::log::error!("Error sending conversation event to {did}: {e}");
continue;
}
});
}

self.send_single_conversation_event(creator, conversation_id, event)
.await
Expand Down

0 comments on commit 3c6fbc2

Please sign in to comment.