Skip to content

Commit

Permalink
chore: Cleanup messaging (#339)
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 authored Oct 16, 2023
1 parent efbc292 commit eeb7d14
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 61 deletions.
4 changes: 2 additions & 2 deletions extensions/warp-ipfs/src/store/keystore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,10 @@ mod test {
let mut keystore = Keystore::default();

let keypair = DID::default();
let recipients = (0..100).map(|_| DID::default()).collect::<Vec<_>>();
let recipients = (0..10).map(|_| DID::default()).collect::<Vec<_>>();

for recipient in recipients.iter() {
for key in (0..100).map(|_| generate::<32>()) {
for key in (0..recipients.len()).map(|_| generate::<32>()) {
keystore.insert(&keypair, recipient, key)?;
}
}
Expand Down
116 changes: 57 additions & 59 deletions extensions/warp-ipfs/src/store/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ impl MessageStore {
conversation_sender,
conversation_task_tx,
identity,
// friends,
discovery,
filesystem,
queue,
Expand All @@ -170,46 +169,59 @@ impl MessageStore {
};

info!("Loading existing conversations task");
if let Err(_e) = store.load_conversations().await {}
if let Err(e) = store.load_conversations().await {
warn!("Failed to load conversations: {e}");
}

info!("Loading queue");
if let Err(e) = store.load_queue().await {
warn!("Failed to load queue: {e}");
}

let mut stream = store
.ipfs
.pubsub_subscribe(store.did.messaging())
.await?
.boxed();

tokio::spawn({
let mut store = store.clone();
async move {
info!("MessagingStore task created");

tokio::spawn({
let store = store.clone();
async move {
info!("Loading queue");
// Load the queue in a separate task in case it is large
// Note: In the future this will not be needed once a req/res system
// is implemented
if let Err(_e) = store.load_queue().await {}
}
});

let did = &*(store.did.clone());
let Ok(stream) = store.ipfs.pubsub_subscribe(did.messaging()).await else {
error!("Unable to create subscription stream. Terminating task");
//TODO: Maybe panic?
return;
};
futures::pin_mut!(stream);
let mut interval = tokio::time::interval(Duration::from_millis(interval_ms));

loop {
tokio::select! {
message = stream.next() => {
if let Some(message) = message {
if let Ok(payload) = Payload::from_bytes(&message.data) {
if let Ok(data) = ecdh_decrypt(&store.did, Some(&payload.sender()), payload.data()) {
if let Ok(events) = serde_json::from_slice::<ConversationEvents>(&data) {
if let Err(e) = store.process_conversation(payload, events).await {
error!("Error processing conversation: {e}");
}
}
}
Some(message) = stream.next() => {
let payload = match Payload::from_bytes(&message.data) {
Ok(payload) => payload,
Err(e) => {
tracing::log::warn!("Failed to parse payload data: {e}");
continue;
}
}
};

let data = match ecdh_decrypt(&store.did, Some(&payload.sender()), payload.data()) {
Ok(d) => d,
Err(e) => {
tracing::log::warn!("Failed to decrypt message from {}: {e}", payload.sender());
continue;
}
};

let events = match serde_json::from_slice::<ConversationEvents>(&data) {
Ok(ev) => ev,
Err(e) => {
tracing::log::warn!("Failed to parse message: {e}");
continue;
}
};

if let Err(e) = store.process_conversation(payload, events).await {
error!("Error processing conversation: {e}");
}

}
_ = interval.tick() => {
if let Err(e) = store.process_queue().await {
Expand Down Expand Up @@ -1463,17 +1475,11 @@ impl MessageStore {
return Ok(());
}

tokio::spawn({
let discovery = self.discovery.clone();
let recipients = list.clone();
async move {
for recipient in recipients {
if !discovery.contains(&recipient).await {
let _ = discovery.insert(recipient).await.ok();
}
}
for recipient in &list {
if !self.discovery.contains(recipient).await {
let _ = self.discovery.insert(recipient).await;
}
});
}

info!("Creating conversation");
let convo = ConversationDocument::new(
Expand Down Expand Up @@ -2171,10 +2177,7 @@ impl MessageStore {
}
}

let _ = self
.remove_conversation_keystore(conversation_id)
.await
.ok();
let _ = self.remove_conversation_keystore(conversation_id).await;

if let Err(e) = self
.event
Expand Down Expand Up @@ -2857,21 +2860,16 @@ impl MessageStore {
};

//We want to send the event to the recipients until the creator can remove them from the conversation directly
tokio::spawn({
let event = event.clone();
let mut store = self.clone();
let list = list.to_vec();
async move {
for did in list.iter() {
if let Err(_e) = store
.send_single_conversation_event(did, conversation_id, event.clone())
.await
{
//
}
}

for did in list.iter() {
if let Err(e) = self
.send_single_conversation_event(did, conversation_id, event.clone())
.await
{
tracing::log::error!("Error sending conversation event to {did}: {e}");
continue;
}
});
}

self.send_single_conversation_event(creator, conversation_id, event)
.await
Expand Down

0 comments on commit eeb7d14

Please sign in to comment.