Skip to content

Commit

Permalink
chore: Expand stream branch and log errors
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 committed Oct 16, 2023
1 parent 3c6fbc2 commit ab0fd9a
Showing 1 changed file with 27 additions and 8 deletions.
35 changes: 27 additions & 8 deletions extensions/warp-ipfs/src/store/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,34 @@ impl MessageStore {
loop {
tokio::select! {
Some(message) = stream.next() => {
if let Ok(payload) = Payload::from_bytes(&message.data) {
if let Ok(data) = ecdh_decrypt(&store.did, Some(&payload.sender()), payload.data()) {
if let Ok(events) = serde_json::from_slice::<ConversationEvents>(&data) {
if let Err(e) = store.process_conversation(payload, events).await {
error!("Error processing conversation: {e}");
}
}
let payload = match Payload::from_bytes(&message.data) {
Ok(payload) => payload,
Err(e) => {
tracing::log::warn!("Failed to parse payload data: {e}");
continue;
}
}
};

let data = match ecdh_decrypt(&store.did, Some(&payload.sender()), payload.data()) {
Ok(d) => d,
Err(e) => {
tracing::log::warn!("Failed to decrypt message from {}: {e}", payload.sender());
continue;
}
};

let events = match serde_json::from_slice::<ConversationEvents>(&data) {
Ok(ev) => ev,
Err(e) => {
tracing::log::warn!("Failed to parse message: {e}");
continue;
}
};

if let Err(e) = store.process_conversation(payload, events).await {
error!("Error processing conversation: {e}");
}

}
_ = interval.tick() => {
if let Err(e) = store.process_queue().await {
Expand Down

0 comments on commit ab0fd9a

Please sign in to comment.