From f57b4cb7e2071fd27a087f0309c2850a1cdec8a5 Mon Sep 17 00:00:00 2001 From: Darius Clark Date: Mon, 24 Jun 2024 13:30:45 -0400 Subject: [PATCH 1/2] refactor: Change MultiPass::get_identity to return a stream (#554) --- .../warp-ipfs/examples/identity-interface.rs | 177 +++++----- extensions/warp-ipfs/examples/ipfs-friends.rs | 21 +- extensions/warp-ipfs/examples/messenger.rs | 70 +--- .../examples/wasm-ipfs-friends/src/lib.rs | 20 +- extensions/warp-ipfs/src/lib.rs | 53 +-- extensions/warp-ipfs/src/store/identity.rs | 310 ++++++++---------- extensions/warp-ipfs/tests/accounts.rs | 14 +- tools/inspect/src/main.rs | 16 +- warp/src/js_exports/multipass.rs | 17 +- warp/src/multipass/identity.rs | 6 + warp/src/multipass/mod.rs | 60 +++- 11 files changed, 367 insertions(+), 397 deletions(-) diff --git a/extensions/warp-ipfs/examples/identity-interface.rs b/extensions/warp-ipfs/examples/identity-interface.rs index 2677b3ef3..411d92374 100644 --- a/extensions/warp-ipfs/examples/identity-interface.rs +++ b/extensions/warp-ipfs/examples/identity-interface.rs @@ -210,10 +210,9 @@ async fn main() -> anyhow::Result<()> { warp::multipass::MultiPassEventKind::FriendRequestReceived { from: did } => { let username = account .get_identity(Identifier::did_key(did.clone())).await - .ok() - .and_then(|list| list.first().cloned()) .map(|ident| ident.username()) - .unwrap_or_else(|| did.to_string()); + .unwrap_or_else(|_| did.to_string()); + if !opt.autoaccept_friend { writeln!(stdout, "> Pending request from {username}. Do \"request accept {did}\" to accept.")?; } else { @@ -223,139 +222,116 @@ async fn main() -> anyhow::Result<()> { warp::multipass::MultiPassEventKind::FriendRequestSent { to: did } => { let username = account .get_identity(Identifier::did_key(did.clone())).await - .ok() - .and_then(|list| list.first().cloned()) .map(|ident| ident.username()) - .unwrap_or_else(|| did.to_string()); + .unwrap_or_else(|_| did.to_string()); writeln!(stdout, "> A request has been sent to {username}. Do \"request close {did}\" to if you wish to close the request")?; } warp::multipass::MultiPassEventKind::IncomingFriendRequestRejected { did } => { let username = account .get_identity(Identifier::did_key(did.clone())).await - .ok() - .and_then(|list| list.first().cloned()) .map(|ident| ident.username()) - .unwrap_or_else(|| did.to_string()); + .unwrap_or_else(|_| did.to_string()); writeln!(stdout, "> You've rejected {username} request")?; }, warp::multipass::MultiPassEventKind::OutgoingFriendRequestRejected { did } => { let username = account .get_identity(Identifier::did_key(did.clone())).await - .ok() - .and_then(|list| list.first().cloned()) .map(|ident| ident.username()) - .unwrap_or_else(|| did.to_string()); + .unwrap_or_else(|_| did.to_string()); writeln!(stdout, "> {username} rejected your request")?; }, warp::multipass::MultiPassEventKind::IncomingFriendRequestClosed { did } => { let username = account .get_identity(Identifier::did_key(did.clone())).await - .ok() - .and_then(|list| list.first().cloned()) .map(|ident| ident.username()) - .unwrap_or_else(|| did.to_string()); + .unwrap_or_else(|_| did.to_string()); writeln!(stdout, "> {username} has retracted their request")?; }, warp::multipass::MultiPassEventKind::OutgoingFriendRequestClosed { did } => { let username = account .get_identity(Identifier::did_key(did.clone())).await - .ok() - .and_then(|list| list.first().cloned()) .map(|ident| ident.username()) - .unwrap_or_else(|| did.to_string()); + .unwrap_or_else(|_| did.to_string()); + writeln!(stdout, "> Request for {username} has been retracted")?; }, warp::multipass::MultiPassEventKind::FriendAdded { did } => { let username = account .get_identity(Identifier::did_key(did.clone())).await - .ok() - .and_then(|list| list.first().cloned()) .map(|ident| ident.username()) - .unwrap_or_else(|| did.to_string()); + .unwrap_or_else(|_| did.to_string()); writeln!(stdout, "> You are now friends with {username}")?; }, warp::multipass::MultiPassEventKind::FriendRemoved { did } => { let username = account .get_identity(Identifier::did_key(did.clone())).await - .ok() - .and_then(|list| list.first().cloned()) .map(|ident| ident.username()) - .unwrap_or_else(|| did.to_string()); + .unwrap_or_else(|_| did.to_string()); writeln!(stdout, "> {username} has been removed from friends list")?; }, warp::multipass::MultiPassEventKind::IdentityOnline { did } => { let username = account .get_identity(Identifier::did_key(did.clone())).await - .ok() - .and_then(|list| list.first().cloned()) .map(|ident| ident.username()) - .unwrap_or_else(|| did.to_string()); + .unwrap_or_else(|_| did.to_string()); writeln!(stdout, "> {username} has came online")?; }, warp::multipass::MultiPassEventKind::IdentityOffline { did } => { let username = account .get_identity(Identifier::did_key(did.clone())).await - .ok() - .and_then(|list| list.first().cloned()) .map(|ident| ident.username()) - .unwrap_or_else(|| did.to_string()); + .unwrap_or_else(|_| did.to_string()); writeln!(stdout, "> {username} went offline")?; }, warp::multipass::MultiPassEventKind::Blocked { did } => { let username = account .get_identity(Identifier::did_key(did.clone())).await - .ok() - .and_then(|list| list.first().cloned()) .map(|ident| ident.username()) - .unwrap_or_else(|| did.to_string()); + .unwrap_or_else(|_| did.to_string()); writeln!(stdout, "> {username} was blocked")?; }, warp::multipass::MultiPassEventKind::Unblocked { did } => { let username = account .get_identity(Identifier::did_key(did.clone())).await - .ok() - .and_then(|list| list.first().cloned()) .map(|ident| ident.username()) - .unwrap_or_else(|| did.to_string()); + .unwrap_or_else(|_| did.to_string()); + writeln!(stdout, "> {username} was unblocked")?; }, warp::multipass::MultiPassEventKind::UnblockedBy { did } => { let username = account .get_identity(Identifier::did_key(did.clone())).await - .ok() - .and_then(|list| list.first().cloned()) .map(|ident| ident.username()) - .unwrap_or_else(|| did.to_string()); + .unwrap_or_else(|_| did.to_string()); + writeln!(stdout, "> {username} unblocked you")?; }, warp::multipass::MultiPassEventKind::BlockedBy { did } => { let username = account .get_identity(Identifier::did_key(did.clone())).await - .ok() - .and_then(|list| list.first().cloned()) .map(|ident| ident.username()) - .unwrap_or_else(|| did.to_string()); + .unwrap_or_else(|_| did.to_string()); + writeln!(stdout, "> {username} blocked you")?; }, warp::multipass::MultiPassEventKind::IdentityUpdate { did } => { let username = account .get_identity(Identifier::did_key(did.clone())).await - .ok() - .and_then(|list| list.first().cloned()) .map(|ident| ident.username()) - .unwrap_or_else(|| did.to_string()); + .unwrap_or_else(|_| did.to_string()); + writeln!(stdout, "> {username} has been updated ")?; } } @@ -392,7 +368,7 @@ async fn main() -> anyhow::Result<()> { }; for friend in friends.iter() { let username = match account.get_identity(Identifier::did_key(friend.clone())).await { - Ok(idents) => idents.iter().filter(|ident| ident.did_key().eq(friend)).map(|ident| ident.username()).collect::>().first().cloned().unwrap_or_default(), + Ok(ident) => ident.username(), Err(_) => String::from("N/A") }; table.add_row(vec![ @@ -414,7 +390,7 @@ async fn main() -> anyhow::Result<()> { }; for item in block_list.iter() { let username = match account.get_identity(Identifier::did_key(item.clone())).await { - Ok(idents) => idents.iter().filter(|ident| ident.did_key().eq(item)).map(|ident| ident.username()).collect::>().first().cloned().unwrap_or_default(), + Ok(ident) => ident.username(), Err(_) => String::from("N/A") }; table.add_row(vec![ @@ -621,8 +597,8 @@ async fn main() -> anyhow::Result<()> { } }; for request in list.iter() { - let username = match account.get_identity(Identifier::did_key(request.clone())).await { - Ok(idents) => idents.iter().filter(|ident| ident.did_key().eq(request)).map(|ident| ident.username()).collect::>().first().cloned().unwrap_or(String::from("N/A")), + let username = match account.get_identity(Identifier::did_key(request.clone())).await { + Ok(ident) => ident.username(), Err(_) => String::from("N/A") }; table.add_row(vec![ @@ -644,7 +620,7 @@ async fn main() -> anyhow::Result<()> { }; for request in list.iter() { let username = match account.get_identity(Identifier::did_key(request.clone())).await { - Ok(idents) => idents.iter().filter(|ident| ident.did_key().eq(request)).map(|ident| ident.username()).collect::>().first().cloned().unwrap_or(String::from("N/A")), + Ok(ident) => ident.username(), Err(_) => String::from("N/A") }; table.add_row(vec![ @@ -717,7 +693,7 @@ async fn main() -> anyhow::Result<()> { writeln!(stdout, "banner updated")?; }, Some("lookup") => { - let idents = match cmd_line.next() { + let id = match cmd_line.next() { Some("username") => { let username = match cmd_line.next() { Some(username) => username, @@ -726,13 +702,7 @@ async fn main() -> anyhow::Result<()> { continue; } }; - match account.get_identity(Identifier::user_name(username)).await { - Ok(identity) => identity, - Err(e) => { - writeln!(stdout, "Error obtaining identity by username: {e}")?; - continue; - } - } + Identifier::user_name(username) }, Some("publickey") | Some("public-key") | Some("didkey") | Some("did-key") | Some("did") => { let mut keys = vec![]; @@ -746,51 +716,76 @@ async fn main() -> anyhow::Result<()> { }; keys.push(pk); } - match account.get_identity(Identifier::did_keys(keys)).await { - Ok(identity) => identity, - Err(e) => { - writeln!(stdout, "Error obtaining identity by public key: {e}")?; - continue; - } - } + Identifier::did_keys(keys) }, Some("own") | None => { - match account.identity().await { - Ok(identity) => vec![identity], + let identity = match account.identity().await { + Ok(identity) => identity, Err(e) => { writeln!(stdout, "Error obtaining own identity: {e}")?; continue; } - } + }; + + let mut table = Table::new(); + table.set_header(vec!["Username", "Public Key", "Created", "Last Updated", "Status Message", "Banner", "Picture", "Platform", "Status"]); + let status = account.identity_status(&identity.did_key()).await.unwrap_or(IdentityStatus::Offline); + let platform = account.identity_platform(&identity.did_key()).await.unwrap_or_default(); + let profile_picture = account.identity_picture(&identity.did_key()).await.unwrap_or_default(); + let profile_banner = account.identity_banner(&identity.did_key()).await.unwrap_or_default(); + let created = identity.created(); + let modified = identity.modified(); + + table.add_row(vec![ + identity.username(), + identity.did_key().to_string(), + created.to_string(), + modified.to_string(), + identity.status_message().unwrap_or_default(), + (!profile_banner.data().is_empty()).to_string(), + (!profile_picture.data().is_empty()).to_string(), + platform.to_string(), + format!("{status:?}"), + ]); + writeln!(stdout, "{table}")?; + continue; }, _ => { writeln!(stdout, "/lookup [username | publickey ...]")?; continue } }; - let mut table = Table::new(); - table.set_header(vec!["Username", "Public Key", "Created", "Last Updated", "Status Message", "Banner", "Picture", "Platform", "Status"]); - for identity in idents { - let status = account.identity_status(&identity.did_key()).await.unwrap_or(IdentityStatus::Offline); - let platform = account.identity_platform(&identity.did_key()).await.unwrap_or_default(); - let profile_picture = account.identity_picture(&identity.did_key()).await.unwrap_or_default(); - let profile_banner = account.identity_banner(&identity.did_key()).await.unwrap_or_default(); - let created = identity.created(); - let modified = identity.modified(); - - table.add_row(vec![ - identity.username(), - identity.did_key().to_string(), - created.to_string(), - modified.to_string(), - identity.status_message().unwrap_or_default(), - (!profile_banner.data().is_empty()).to_string(), - (!profile_picture.data().is_empty()).to_string(), - platform.to_string(), - format!("{status:?}"), - ]); - } - writeln!(stdout, "{table}")?; + let stream = account.get_identity(id); + tokio::spawn({ + let mut stdout = stdout.clone(); + let account = account.clone(); + async move { + let idents = stream.collect::>().await; + let mut table = Table::new(); + table.set_header(vec!["Username", "Public Key", "Created", "Last Updated", "Status Message", "Banner", "Picture", "Platform", "Status"]); + for identity in idents { + let status = account.identity_status(&identity.did_key()).await.unwrap_or(IdentityStatus::Offline); + let platform = account.identity_platform(&identity.did_key()).await.unwrap_or_default(); + let profile_picture = account.identity_picture(&identity.did_key()).await.unwrap_or_default(); + let profile_banner = account.identity_banner(&identity.did_key()).await.unwrap_or_default(); + let created = identity.created(); + let modified = identity.modified(); + + table.add_row(vec![ + identity.username(), + identity.did_key().to_string(), + created.to_string(), + modified.to_string(), + identity.status_message().unwrap_or_default(), + (!profile_banner.data().is_empty()).to_string(), + (!profile_picture.data().is_empty()).to_string(), + platform.to_string(), + format!("{status:?}"), + ]); + } + writeln!(stdout, "{table}").unwrap(); + } + }); } _ => continue } diff --git a/extensions/warp-ipfs/examples/ipfs-friends.rs b/extensions/warp-ipfs/examples/ipfs-friends.rs index 4f169b627..4c64f081f 100644 --- a/extensions/warp-ipfs/examples/ipfs-friends.rs +++ b/extensions/warp-ipfs/examples/ipfs-friends.rs @@ -3,7 +3,6 @@ use std::time::Duration; use futures::StreamExt; use warp::crypto::rand::{self, prelude::*}; -use warp::error::Error; use warp::multipass::identity::{Identifier, Identity}; use warp::multipass::{MultiPass, MultiPassEventKind}; use warp_ipfs::config::Config; @@ -77,20 +76,14 @@ async fn main() -> anyhow::Result<()> { println!("{} Outgoing request:", username(&ident_a)); for outgoing in account_a.list_outgoing_request().await? { - let ident = account_a - .get_identity(Identifier::from(outgoing)) - .await - .and_then(|list| list.first().cloned().ok_or(Error::IdentityDoesntExist))?; + let ident = account_a.get_identity(Identifier::from(outgoing)).await?; println!("To: {}", username(&ident)); println!(); } println!("{} Incoming request:", username(&ident_b)); for incoming in account_b.list_incoming_request().await? { - let ident = account_b - .get_identity(Identifier::from(incoming)) - .await - .and_then(|list| list.first().cloned().ok_or(Error::IdentityDoesntExist))?; + let ident = account_b.get_identity(Identifier::from(incoming)).await?; println!("From: {}", username(&ident)); println!(); @@ -115,10 +108,7 @@ async fn main() -> anyhow::Result<()> { println!("{} Friends:", username(&ident_a)); for friend in account_a.list_friends().await? { - let friend = account_a - .get_identity(Identifier::did_key(friend)) - .await - .and_then(|list| list.first().cloned().ok_or(Error::IdentityDoesntExist))?; + let friend = account_a.get_identity(Identifier::did_key(friend)).await?; println!("Username: {}", username(&friend)); println!("Public Key: {}", friend.did_key()); println!(); @@ -126,10 +116,7 @@ async fn main() -> anyhow::Result<()> { println!("{} Friends:", username(&ident_b)); for friend in account_b.list_friends().await? { - let friend = account_b - .get_identity(Identifier::did_key(friend)) - .await - .and_then(|list| list.first().cloned().ok_or(Error::IdentityDoesntExist))?; + let friend = account_b.get_identity(Identifier::did_key(friend)).await?; println!("Username: {}", username(&friend)); println!("Public Key: {}", friend.did_key()); println!(); diff --git a/extensions/warp-ipfs/examples/messenger.rs b/extensions/warp-ipfs/examples/messenger.rs index c1514f0f8..d6fb586c3 100644 --- a/extensions/warp-ipfs/examples/messenger.rs +++ b/extensions/warp-ipfs/examples/messenger.rs @@ -271,10 +271,8 @@ async fn main() -> anyhow::Result<()> { warp::multipass::MultiPassEventKind::FriendRequestReceived { from: did } => { let username = new_account .get_identity(Identifier::did_key(did.clone())).await - .ok() - .and_then(|list| list.first().cloned()) .map(|ident| ident.username()) - .unwrap_or_else(|| did.to_string()); + .unwrap_or_else(|_| did.to_string()); if !opt.autoaccept_friend { writeln!(stdout, "> Pending request from {username}. Do \"/accept-request {did}\" to accept.")?; } else { @@ -284,129 +282,103 @@ async fn main() -> anyhow::Result<()> { warp::multipass::MultiPassEventKind::FriendRequestSent { to: did } => { let username = new_account .get_identity(Identifier::did_key(did.clone())).await - .ok() - .and_then(|list| list.first().cloned()) .map(|ident| ident.username()) - .unwrap_or_else(|| did.to_string()); + .unwrap_or_else(|_| did.to_string()); writeln!(stdout, "> A request has been sent to {username}. Do \"/close-request {did}\" to if you wish to close the request")?; } warp::multipass::MultiPassEventKind::IncomingFriendRequestRejected { did } => { let username = new_account .get_identity(Identifier::did_key(did.clone())).await - .ok() - .and_then(|list| list.first().cloned()) .map(|ident| ident.username()) - .unwrap_or_else(|| did.to_string()); + .unwrap_or_else(|_| did.to_string()); writeln!(stdout, "> You've rejected {username} request")?; }, warp::multipass::MultiPassEventKind::OutgoingFriendRequestRejected { did } => { let username = new_account .get_identity(Identifier::did_key(did.clone())).await - .ok() - .and_then(|list| list.first().cloned()) .map(|ident| ident.username()) - .unwrap_or_else(|| did.to_string()); + .unwrap_or_else(|_| did.to_string()); writeln!(stdout, "> {username} rejected your request")?; }, warp::multipass::MultiPassEventKind::IncomingFriendRequestClosed { did } => { let username = new_account .get_identity(Identifier::did_key(did.clone())).await - .ok() - .and_then(|list| list.first().cloned()) .map(|ident| ident.username()) - .unwrap_or_else(|| did.to_string()); + .unwrap_or_else(|_| did.to_string()); writeln!(stdout, "> {username} has retracted their request")?; }, warp::multipass::MultiPassEventKind::OutgoingFriendRequestClosed { did } => { let username = new_account .get_identity(Identifier::did_key(did.clone())).await - .ok() - .and_then(|list| list.first().cloned()) .map(|ident| ident.username()) - .unwrap_or_else(|| did.to_string()); + .unwrap_or_else(|_| did.to_string()); writeln!(stdout, "> Request for {username} has been retracted")?; }, warp::multipass::MultiPassEventKind::FriendAdded { did } => { let username = new_account .get_identity(Identifier::did_key(did.clone())).await - .ok() - .and_then(|list| list.first().cloned()) .map(|ident| ident.username()) - .unwrap_or_else(|| did.to_string()); + .unwrap_or_else(|_| did.to_string()); writeln!(stdout, "> You are now friends with {username}")?; }, warp::multipass::MultiPassEventKind::FriendRemoved { did } => { let username = new_account .get_identity(Identifier::did_key(did.clone())).await - .ok() - .and_then(|list| list.first().cloned()) .map(|ident| ident.username()) - .unwrap_or_else(|| did.to_string()); + .unwrap_or_else(|_| did.to_string()); writeln!(stdout, "> {username} has been removed from friends list")?; }, warp::multipass::MultiPassEventKind::IdentityOnline { did } => { let username = new_account .get_identity(Identifier::did_key(did.clone())).await - .ok() - .and_then(|list| list.first().cloned()) .map(|ident| ident.username()) - .unwrap_or_else(|| did.to_string()); + .unwrap_or_else(|_| did.to_string()); writeln!(stdout, "> {username} has came online")?; }, warp::multipass::MultiPassEventKind::IdentityOffline { did } => { let username = new_account .get_identity(Identifier::did_key(did.clone())).await - .ok() - .and_then(|list| list.first().cloned()) .map(|ident| ident.username()) - .unwrap_or_else(|| did.to_string()); + .unwrap_or_else(|_| did.to_string()); writeln!(stdout, "> {username} went offline")?; }, warp::multipass::MultiPassEventKind::Blocked { did } => { let username = new_account .get_identity(Identifier::did_key(did.clone())).await - .ok() - .and_then(|list| list.first().cloned()) .map(|ident| ident.username()) - .unwrap_or_else(|| did.to_string()); + .unwrap_or_else(|_| did.to_string()); writeln!(stdout, "> {username} was blocked")?; }, warp::multipass::MultiPassEventKind::Unblocked { did } => { let username = new_account .get_identity(Identifier::did_key(did.clone())).await - .ok() - .and_then(|list| list.first().cloned()) .map(|ident| ident.username()) - .unwrap_or_else(|| did.to_string()); + .unwrap_or_else(|_| did.to_string()); writeln!(stdout, "> {username} was unblocked")?; }, warp::multipass::MultiPassEventKind::UnblockedBy { did } => { let username = new_account .get_identity(Identifier::did_key(did.clone())).await - .ok() - .and_then(|list| list.first().cloned()) .map(|ident| ident.username()) - .unwrap_or_else(|| did.to_string()); + .unwrap_or_else(|_| did.to_string()); writeln!(stdout, "> {username} unblocked you")?; }, warp::multipass::MultiPassEventKind::BlockedBy { did } => { let username = new_account .get_identity(Identifier::did_key(did.clone())).await - .ok() - .and_then(|list| list.first().cloned()) .map(|ident| ident.username()) - .unwrap_or_else(|| did.to_string()); + .unwrap_or_else(|_| did.to_string()); writeln!(stdout, "> {username} blocked you")?; }, @@ -962,13 +934,7 @@ async fn main() -> anyhow::Result<()> { } }, Lookup(id) => { - let identities = match new_account.get_identity(id).await { - Ok(identity) => identity, - Err(e) => { - writeln!(stdout, "Error obtaining own identity: {e}")?; - continue; - } - }; + let identities = new_account.get_identity(id).collect::>().await; let mut table = Table::new(); table.set_header(vec!["Username", "Public Key", "Created", "Last Updated", "Status Message", "Banner", "Picture", "Platform", "Status"]); for identity in identities { @@ -1010,11 +976,7 @@ async fn get_username(account: &dyn MultiPass, did: DID) -> String { account .get_identity(Identifier::did_key(did.clone())) .await - .map(|list| { - list.first() - .map(|identity| format!("{}#{}", identity.username(), identity.short_id())) - .unwrap_or(did.to_string()) - }) + .map(|id| format!("{}#{}", id.username(), id.short_id())) .unwrap_or(did.to_string()) } diff --git a/extensions/warp-ipfs/examples/wasm-ipfs-friends/src/lib.rs b/extensions/warp-ipfs/examples/wasm-ipfs-friends/src/lib.rs index 50e545166..4433726e9 100644 --- a/extensions/warp-ipfs/examples/wasm-ipfs-friends/src/lib.rs +++ b/extensions/warp-ipfs/examples/wasm-ipfs-friends/src/lib.rs @@ -93,20 +93,14 @@ pub async fn run() -> Result<(), JsError> { body.append_p(&format!("{} Outgoing request:", username(&ident_a)))?; for outgoing in account_a.list_outgoing_request().await? { - let ident = account_a - .get_identity(Identifier::from(outgoing)) - .await - .and_then(|list| list.first().cloned().ok_or(Error::IdentityDoesntExist))?; + let ident = account_a.get_identity(Identifier::from(outgoing)).await?; body.append_p(&format!("To: {}", username(&ident)))?; body.append_p("")?; } body.append_p(&format!("{} Incoming request:", username(&ident_b)))?; for incoming in account_b.list_incoming_request().await? { - let ident = account_b - .get_identity(Identifier::from(incoming)) - .await - .and_then(|list| list.first().cloned().ok_or(Error::IdentityDoesntExist))?; + let ident = account_b.get_identity(Identifier::from(incoming)).await?; body.append_p(&format!("From: {}", username(&ident)))?; body.append_p("")?; @@ -131,10 +125,7 @@ pub async fn run() -> Result<(), JsError> { body.append_p(&format!("{} Friends:", username(&ident_a)))?; for friend in account_a.list_friends().await? { - let friend = account_a - .get_identity(Identifier::did_key(friend)) - .await - .and_then(|list| list.first().cloned().ok_or(Error::IdentityDoesntExist))?; + let friend = account_a.get_identity(Identifier::did_key(friend)).await?; body.append_p(&format!("Username: {}", username(&friend)))?; body.append_p(&format!("Public Key: {}", friend.did_key()))?; body.append_p("")?; @@ -142,10 +133,7 @@ pub async fn run() -> Result<(), JsError> { body.append_p(&format!("{} Friends:", username(&ident_b)))?; for friend in account_b.list_friends().await? { - let friend = account_b - .get_identity(Identifier::did_key(friend)) - .await - .and_then(|list| list.first().cloned().ok_or(Error::IdentityDoesntExist))?; + let friend = account_b.get_identity(Identifier::did_key(friend)).await?; body.append_p(&format!("Username: {}", username(&friend)))?; body.append_p(&format!("Public Key: {}", friend.did_key()))?; body.append_p("")?; diff --git a/extensions/warp-ipfs/src/lib.rs b/extensions/warp-ipfs/src/lib.rs index 22885d9b6..d95a834d6 100644 --- a/extensions/warp-ipfs/src/lib.rs +++ b/extensions/warp-ipfs/src/lib.rs @@ -29,7 +29,7 @@ use config::Config; use store::document::ResolvedRootDocument; use store::event_subscription::EventSubscription; use store::files::FileStore; -use store::identity::{IdentityStore, LookupBy}; +use store::identity::IdentityStore; use store::message::MessageStore; use utils::ExtensionType; use warp::constellation::directory::Directory; @@ -47,8 +47,9 @@ use warp::multipass::identity::{ Identifier, Identity, IdentityImage, IdentityProfile, IdentityUpdate, Relationship, }; use warp::multipass::{ - identity, Friends, IdentityImportOption, IdentityInformation, ImportLocation, LocalIdentity, - MultiPass, MultiPassEvent, MultiPassEventKind, MultiPassEventStream, MultiPassImportExport, + identity, Friends, GetIdentity, IdentityImportOption, IdentityInformation, ImportLocation, + LocalIdentity, MultiPass, MultiPassEvent, MultiPassEventKind, MultiPassEventStream, + MultiPassImportExport, }; use warp::raygun::{ AttachmentEventStream, Conversation, ConversationSettings, EmbedState, GroupSettings, Location, @@ -743,6 +744,18 @@ impl WarpIpfs { .ok_or(Error::ConstellationExtensionUnavailable) } + pub(crate) fn direct_identity_store(&self) -> std::result::Result { + let store = self + .inner + .components + .read() + .as_ref() + .map(|com| com.identity_store.clone()) + .ok_or(Error::MultiPassExtensionUnavailable)?; + + Ok(store) + } + pub(crate) fn ipfs(&self) -> Result { self.inner .components @@ -751,11 +764,6 @@ impl WarpIpfs { .map(|com| com.ipfs.clone()) .ok_or(Error::MultiPassExtensionUnavailable) } - - pub(crate) async fn is_blocked_by(&self, pubkey: &DID) -> Result { - let identity = self.identity_store(true).await?; - identity.is_blocked_by(pubkey).await - } } impl Extension for WarpIpfs { @@ -852,16 +860,13 @@ impl MultiPass for WarpIpfs { Ok(profile) } - async fn get_identity(&self, id: Identifier) -> Result, Error> { - let store = self.identity_store(true).await?; - - let kind = match id { - Identifier::DID(pk) => LookupBy::DidKey(pk), - Identifier::Username(username) => LookupBy::Username(username), - Identifier::DIDList(list) => LookupBy::DidKeys(list), + fn get_identity(&self, id: Identifier) -> GetIdentity { + let store = match self.direct_identity_store() { + Ok(store) => store, + _ => return GetIdentity::new(id, stream::empty().boxed()), }; - store.lookup(kind).await + store.lookup(id) } } @@ -1436,15 +1441,13 @@ impl IdentityInformation for WarpIpfs { } async fn identity_relationship(&self, did: &DID) -> Result { - self.get_identity(Identifier::did_key(did.clone())) - .await? - .first() - .ok_or(Error::IdentityDoesntExist)?; - let friends = self.has_friend(did).await?; - let received_friend_request = self.received_friend_request_from(did).await?; - let sent_friend_request = self.sent_friend_request_to(did).await?; - let blocked = self.is_blocked(did).await?; - let blocked_by = self.is_blocked_by(did).await?; + let store = self.identity_store(true).await?; + store.lookup(did).await?; + let friends = store.is_friend(did).await?; + let received_friend_request = store.received_friend_request_from(did).await?; + let sent_friend_request = store.sent_friend_request_to(did).await?; + let blocked = store.is_blocked(did).await?; + let blocked_by = store.is_blocked_by(did).await?; let mut relationship = Relationship::default(); relationship.set_friends(friends); diff --git a/extensions/warp-ipfs/src/store/identity.rs b/extensions/warp-ipfs/src/store/identity.rs index cced06f5f..9996cd32f 100644 --- a/extensions/warp-ipfs/src/store/identity.rs +++ b/extensions/warp-ipfs/src/store/identity.rs @@ -8,7 +8,6 @@ use bytes::Bytes; use chrono::{DateTime, Utc}; use futures::{ channel::oneshot::{self, Canceled}, - stream::SelectAll, SinkExt, StreamExt, }; use futures_timeout::TimeoutExt; @@ -24,6 +23,8 @@ use web_time::Instant; use crate::shuttle::identity::client::IdentityCommand; use crate::shuttle::identity::{RequestEvent, RequestPayload}; +use warp::multipass::identity::{Identifier, ShortId}; +use warp::multipass::GetIdentity; use warp::{ constellation::file::FileType, multipass::identity::{IdentityImage, Platform}, @@ -307,7 +308,7 @@ pub enum LookupBy { DidKey(DID), DidKeys(Vec), Username(String), - ShortId(String), + ShortId(ShortId), } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -767,7 +768,7 @@ impl IdentityStore { if self.identity_cache.get(&from).await.is_err() { // Attempt to send identity request to peer if identity is not available locally. if self.request(&from, RequestOption::Identity).await.is_err() { - if let Err(e) = self.lookup(LookupBy::DidKey(from.clone())).await { + if let Err(e) = self.lookup(from.clone()).await { tracing::warn!("Failed to request identity from {from}: {e}."); } } @@ -1877,193 +1878,174 @@ impl IdentityStore { &self.root_document } - //Note: We are calling `IdentityStore::cache` multiple times, but shouldnt have any impact on performance. - pub async fn lookup(&self, lookup: LookupBy) -> Result, Error> { - let own_did = self - .own_identity() - .await - .map(|identity| identity.did_key()) - .map_err(|_| Error::OtherWithContext("Identity store may not be initialized".into()))?; + pub fn lookup(&self, id: impl Into) -> GetIdentity { + let store = self.clone(); + let id = id.into(); + let lookup = match id { + Identifier::DID(ref pk) => LookupBy::DidKey(pk.clone()), + Identifier::Username(ref username) => LookupBy::Username(username.clone()), + Identifier::DIDList(ref list) => LookupBy::DidKeys(list.clone()), + }; - let cache = self.identity_cache.list().await; + let stream = async_stream::stream! { + // first lets evaluate the cache + let cache = store.identity_cache.list().await; - let mut idents_docs = match &lookup { - //Note: If this returns more than one identity, then its likely due to frontend cache not clearing out. - //TODO: Maybe move cache into the backend to serve as a secondary cache - LookupBy::DidKey(pubkey) => { - //Maybe we should omit our own key here? - if *pubkey == own_did { - return self.own_identity().await.map(|i| vec![i]); - } + // anything missing we will push off to additional discovery service + let mut missing = HashSet::new(); - if !self.discovery.contains(pubkey).await { - self.discovery.insert(pubkey).await?; - } - cache - .filter(|ident| { - let ident = ident.clone(); - async move { ident.did == *pubkey } - }) - .collect::>() - .await - } - LookupBy::DidKeys(list) => { - for pubkey in list { - if !pubkey.eq(&own_did) && !self.discovery.contains(pubkey).await { - if let Err(e) = self.discovery.insert(pubkey).await { - tracing::error!("Error inserting {pubkey} into discovery: {e}") + match lookup { + LookupBy::DidKey(ref did) => { + if did == &store.did_key { + // Note: We can ignore errors if node own identity if invalid or not created + if let Ok(id) = store.own_identity().await { + yield id; + return; } + return; } - } - let mut prestream = SelectAll::new(); + for await document in cache.filter(|ident| { + let val = &ident.did == did; + async move { val } + }){ + let id = document.into(); + yield id; + return + } - if list.contains(&own_did) { - if let Ok(own_identity) = self.own_identity_document().await { - prestream.push(futures::stream::iter(vec![own_identity])); + missing.insert(did.clone()); + }, + LookupBy::DidKeys(ref list) => { + for pubkey in list { + if !pubkey.eq(&store.did_key) && !store.discovery.contains(pubkey).await { + if let Err(e) = store.discovery.insert(pubkey).await { + tracing::error!("Error inserting {pubkey} into discovery: {e}") + } + } } - } - cache - .filter(|id| { - let id = id.clone(); - async move { list.contains(&id.did) } - }) - .chain(prestream.boxed()) - .collect::>() - .await - } - LookupBy::Username(username) if username.contains('#') => { - let split_data = username.split('#').collect::>(); - - if split_data.len() != 2 { - cache - .filter(|ident| { - let ident = ident.clone(); - async move { - ident - .username - .to_lowercase() - .contains(&username.to_lowercase()) + if list.contains(&store.did_key) { + if let Ok(own_identity) = store.own_identity_document().await { + yield own_identity.into(); + } + } + + let mut found = HashSet::new(); + + for await document in cache { + if list.contains(&document.did) { + found.insert(document.did.clone()); + yield document.into(); + } + } + + missing.extend(list.iter().filter(|did| !found.contains(did)).cloned()); + }, + LookupBy::Username(ref username) if username.contains('#') => { + let split_data = username.split('#').collect::>(); + + if split_data.len() != 2 { + for await document in cache { + if document.username.to_lowercase().contains(&username.to_lowercase()) { + yield document.into(); } - }) - .collect::>() - .await - } else { - match ( + } + } else if let (Some(name), Some(code)) = ( split_data.first().map(|s| s.to_lowercase()), split_data.last().map(|s| s.to_lowercase()), ) { - (Some(name), Some(code)) => { - cache - .filter(|ident| { - let ident = ident.clone(); - let name = name.clone(); - let code = code.clone(); - async move { - ident.username.to_lowercase().eq(&name) - && String::from_utf8_lossy(&ident.short_id) - .to_lowercase() - .eq(&code) - } - }) - .collect::>() - .await + for await document in cache { + if document.username.to_lowercase().eq(&name) && String::from_utf8_lossy(&document.short_id).to_lowercase().eq(&code) { + yield document.into(); + } } - _ => HashSet::new(), } } - } - LookupBy::Username(username) => { - let username = username.to_lowercase(); - cache - .filter(|ident| { - let ident = ident.clone(); - let username = username.clone(); - async move { ident.username.to_lowercase().contains(&username) } - }) - .collect::>() - .await - } - LookupBy::ShortId(id) => { - cache - .filter(|ident| { - let ident = ident.clone(); - let id = id.clone(); - async move { String::from_utf8_lossy(&ident.short_id).eq(&id) } - }) - .collect::>() - .await - } - }; - if idents_docs.is_empty() { - let kind = match lookup { - LookupBy::DidKey(did) => { - crate::shuttle::identity::protocol::Lookup::PublicKey { did } - } - LookupBy::DidKeys(list) => { - crate::shuttle::identity::protocol::Lookup::PublicKeys { dids: list } - } - LookupBy::Username(username) => { - crate::shuttle::identity::protocol::Lookup::Username { username, count: 0 } + LookupBy::Username(ref username) => { + let username = username.to_lowercase(); + for await document in cache { + if document.username.to_lowercase().eq(&username) { + yield document.into(); + } + } } LookupBy::ShortId(short_id) => { - crate::shuttle::identity::protocol::Lookup::ShortId { - short_id: short_id.try_into()?, + for await document in cache { + let id = ShortId::from(document.short_id); + if id == short_id { + yield document.into(); + return; + } } } - }; - if let DiscoveryConfig::Shuttle { addresses } = self.discovery.discovery_config() { - for peer_id in addresses.iter().filter_map(|addr| addr.peer_id()) { - let (tx, rx) = futures::channel::oneshot::channel(); - let _ = self - .identity_command - .clone() - .send(IdentityCommand::Lookup { - peer_id, - kind: kind.clone(), - response: tx, - }) - .await; + } - match rx.timeout(SHUTTLE_TIMEOUT).await { - Ok(Ok(Ok(list))) => { - for ident in &list { - let ident = ident.clone(); - _ = self.identity_cache.insert(&ident).await; + if !missing.is_empty() || matches!(lookup, LookupBy::Username(_) | LookupBy::ShortId(_)) { + let kind = match lookup { + LookupBy::DidKey(did) => { + crate::shuttle::identity::protocol::Lookup::PublicKey { did } + } + LookupBy::DidKeys(list) => { + crate::shuttle::identity::protocol::Lookup::PublicKeys { dids: list } + } + LookupBy::Username(username) => { + crate::shuttle::identity::protocol::Lookup::Username { username, count: 0 } + } + LookupBy::ShortId(short_id) => { + crate::shuttle::identity::protocol::Lookup::ShortId { short_id } + } + }; + if let DiscoveryConfig::Shuttle { addresses } = store.discovery.discovery_config() { + for peer_id in addresses.iter().filter_map(|addr| addr.peer_id()) { + let (tx, rx) = futures::channel::oneshot::channel(); + let _ = store + .identity_command + .clone() + .send(IdentityCommand::Lookup { + peer_id, + kind: kind.clone(), + response: tx, + }) + .await; - if self.discovery.contains(&ident.did).await { - continue; + match rx.timeout(SHUTTLE_TIMEOUT).await { + Ok(Ok(Ok(list))) => { + for ident in &list { + let ident = ident.clone(); + let did = ident.did.clone(); + + _ = store.identity_cache.insert(&ident).await; + + yield ident.into(); + + if store.discovery.contains(&did).await { + continue; + } + let _ = store.discovery.insert(&did).await; } - let _ = self.discovery.insert(&ident.did).await; - } - idents_docs.extend(list.iter().cloned()); - break; - } - Ok(Ok(Err(e))) => { - error!("Error registering identity to {peer_id}: {e}"); - break; - } - Ok(Err(Canceled)) => { - error!("Channel been unexpectedly closed for {peer_id}"); - continue; - } - Err(_) => { - error!("Request timed out for {peer_id}"); - continue; + break; + } + Ok(Ok(Err(e))) => { + error!("Error registering identity to {peer_id}: {e}"); + break; + } + Ok(Err(Canceled)) => { + error!("Channel been unexpectedly closed for {peer_id}"); + continue; + } + Err(_) => { + error!("Request timed out for {peer_id}"); + continue; + } } } } } - } - - let list = idents_docs - .iter() - .filter_map(|doc| doc.resolve().ok()) - .collect::>(); + }; - Ok(list) + GetIdentity::new(id, stream.boxed()) } pub async fn identity_update(&mut self, identity: IdentityDocument) -> Result<(), Error> { @@ -2108,11 +2090,7 @@ impl IdentityStore { self.discovery_type(), DiscoveryConfig::None | DiscoveryConfig::Shuttle { .. } ) { - self.lookup(LookupBy::DidKey(did.clone())) - .await? - .first() - .cloned() - .ok_or(Error::IdentityDoesntExist)?; + self.lookup(did).await?; } let status: IdentityStatus = connected_to_peer(&self.ipfs, did.clone()) diff --git a/extensions/warp-ipfs/tests/accounts.rs b/extensions/warp-ipfs/tests/accounts.rs index 4b393df09..15ecd4849 100644 --- a/extensions/warp-ipfs/tests/accounts.rs +++ b/extensions/warp-ipfs/tests/accounts.rs @@ -71,11 +71,7 @@ mod test { //used to wait for the nodes to discover eachother and provide their identity to each other let identity_b = crate::common::timeout(Duration::from_secs(60), async { loop { - if let Ok(Some(id)) = account_a - .get_identity(did_b.clone().into()) - .await - .map(|s| s.first().cloned()) - { + if let Ok(id) = account_a.get_identity(did_b.clone().into()).await { break id; } } @@ -111,13 +107,7 @@ mod test { let identity_b = crate::common::timeout(Duration::from_secs(60), async { loop { - if let Some(id) = account_a - .get_identity(String::from("JaneDoe").into()) - .await - .expect("Should not fail") - .first() - .cloned() - { + if let Ok(id) = account_a.get_identity(String::from("JaneDoe").into()).await { break id; } } diff --git a/tools/inspect/src/main.rs b/tools/inspect/src/main.rs index 758a31284..6af74d2f4 100644 --- a/tools/inspect/src/main.rs +++ b/tools/inspect/src/main.rs @@ -3,6 +3,7 @@ use std::time::Instant; use clap::Parser; use comfy_table::Table; +use futures::StreamExt; use warp::constellation::Constellation; use warp::crypto::zeroize::Zeroizing; @@ -97,13 +98,11 @@ async fn main() -> anyhow::Result<()> { table.set_header(vec!["Username", "DID"]); let start_time = Instant::now(); - let identites = account - .get_identity(Identifier::DIDList(friends.clone())) - .await?; + let mut identites = account.get_identity(Identifier::DIDList(friends.clone())); let end_time = start_time.elapsed(); println!("Took {}ms to load friends identities", end_time.as_millis()); - for identity in identites { + while let Some(identity) = identites.next().await { table.add_row(vec![ format!("{}#{}", identity.username(), identity.short_id()), identity.did_key().to_string(), @@ -135,12 +134,9 @@ async fn main() -> anyhow::Result<()> { for convo in conversations { let recipients = account .get_identity(Identifier::DIDList(convo.recipients())) - .await - .map(|list| { - list.iter() - .map(|id| format!("{}#{}", id.username(), id.short_id())) - .collect::>() - })?; + .map(|id| format!("{}#{}", id.username(), id.short_id())) + .collect::>() + .await; let count = rg.get_message_count(convo.id()).await?; diff --git a/warp/src/js_exports/multipass.rs b/warp/src/js_exports/multipass.rs index 3cd19b8aa..ce1535858 100644 --- a/warp/src/js_exports/multipass.rs +++ b/warp/src/js_exports/multipass.rs @@ -1,3 +1,4 @@ +use crate::error::Error; use crate::{ crypto::DID, js_exports::stream::AsyncIterator, @@ -43,11 +44,17 @@ impl MultiPassBox { id_variant: Identifier, id_value: JsValue, ) -> Result { - self.inner - .get_identity(to_identifier_enum(id_variant, id_value)?) - .await - .map_err(|e| e.into()) - .map(|ok| serde_wasm_bindgen::to_value(&ok).unwrap()) + let id = to_identifier_enum(id_variant, id_value)?; + let single_id = matches!(id, crate::multipass::identity::Identifier::DID(_)); + let list = self + .inner + .get_identity(id.clone()) + .collect::>() + .await; + match (single_id, list.is_empty()) { + (true, true) => Err(Error::IdentityDoesntExist.into()), + (_, false) | (_, true) => Ok(serde_wasm_bindgen::to_value(&list).unwrap()), + } } pub async fn identity(&self) -> Result { diff --git a/warp/src/multipass/identity.rs b/warp/src/multipass/identity.rs index 14b407201..216367c6b 100644 --- a/warp/src/multipass/identity.rs +++ b/warp/src/multipass/identity.rs @@ -359,6 +359,12 @@ impl From for Identifier { } } +impl From<&DID> for Identifier { + fn from(did: &DID) -> Self { + Self::DID(did.clone()) + } +} + impl From for Identifier { fn from(username: String) -> Self { Self::Username(username) diff --git a/warp/src/multipass/mod.rs b/warp/src/multipass/mod.rs index b1538eaa6..06d7f9743 100644 --- a/warp/src/multipass/mod.rs +++ b/warp/src/multipass/mod.rs @@ -1,9 +1,13 @@ #![allow(clippy::result_large_err)] +use std::future::Future; use std::path::PathBuf; +use std::pin::Pin; +use std::task::{Context, Poll}; use dyn_clone::DynClone; use futures::stream::BoxStream; +use futures::{Stream, StreamExt}; use serde::{Deserialize, Serialize}; use identity::Identity; @@ -86,7 +90,7 @@ pub trait MultiPass: ) -> Result; /// Obtain an [`Identity`] using [`Identifier`] - async fn get_identity(&self, id: Identifier) -> Result, Error>; + fn get_identity(&self, id: Identifier) -> GetIdentity; } dyn_clone::clone_trait_object!(MultiPass); @@ -242,3 +246,57 @@ pub trait IdentityInformation: Send + Sync { Err(Error::Unimplemented) } } + +pub struct GetIdentity { + identifier: Identifier, + stream: Option>, +} + +impl GetIdentity { + pub fn new(identifier: Identifier, stream: BoxStream<'static, Identity>) -> Self { + GetIdentity { + identifier, + stream: Some(stream), + } + } +} + +impl Stream for GetIdentity { + type Item = Identity; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let Some(stream) = self.stream.as_mut() else { + return Poll::Ready(None); + }; + + match futures::ready!(stream.poll_next_unpin(cx)) { + Some(item) => Poll::Ready(Some(item)), + None => { + self.stream.take(); + Poll::Ready(None) + } + } + } +} + +impl Future for GetIdentity { + type Output = Result; + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = &mut *self; + + let Some(stream) = this.stream.as_mut() else { + return Poll::Ready(Err(Error::IdentityDoesntExist)); + }; + + if matches!(this.identifier, Identifier::DIDList(_)) { + this.stream.take(); + return Poll::Ready(Err(Error::InvalidIdentifierCondition)); + } + + let result = match futures::ready!(stream.poll_next_unpin(cx)) { + Some(identity) => Ok(identity), + None => Err(Error::IdentityDoesntExist), + }; + this.stream.take(); + Poll::Ready(result) + } +} From e1b16c6c6729073aedd7a08f5d766c3635a21f50 Mon Sep 17 00:00:00 2001 From: Darius Clark Date: Mon, 24 Jun 2024 14:26:46 -0400 Subject: [PATCH 2/2] refactor: Change conversation topics and additional subscription (#538) --- extensions/warp-ipfs/src/shuttle/message.rs | 21 +++++++++++ extensions/warp-ipfs/src/shuttle/server.rs | 2 +- .../warp-ipfs/src/shuttle/store/messages.rs | 35 +++++++++++++++++-- .../src/shuttle/subscription_stream.rs | 15 +++++--- .../warp-ipfs/src/store/conversation.rs | 20 ++++------- extensions/warp-ipfs/src/store/message.rs | 6 ++-- extensions/warp-ipfs/src/store/mod.rs | 18 ++++++++++ 7 files changed, 92 insertions(+), 25 deletions(-) diff --git a/extensions/warp-ipfs/src/shuttle/message.rs b/extensions/warp-ipfs/src/shuttle/message.rs index e6f985632..57636c0e6 100644 --- a/extensions/warp-ipfs/src/shuttle/message.rs +++ b/extensions/warp-ipfs/src/shuttle/message.rs @@ -1,5 +1,26 @@ +use std::fmt::Display; + +use uuid::Uuid; +use warp::crypto::DID; + pub mod client; pub mod protocol; #[cfg(not(target_arch = "wasm32"))] pub mod server; + +pub trait ConversationTopic: Display { + fn base(&self) -> String; + fn event_topic(&self) -> String { + format!("{}/events", self.base()) + } + fn exchange_topic(&self, did: &DID) -> String { + format!("{}/exchange/{}", self.base(), did) + } +} + +impl ConversationTopic for Uuid { + fn base(&self) -> String { + format!("/conversation/{self}") + } +} diff --git a/extensions/warp-ipfs/src/shuttle/server.rs b/extensions/warp-ipfs/src/shuttle/server.rs index 15ebba09f..bfa224a84 100644 --- a/extensions/warp-ipfs/src/shuttle/server.rs +++ b/extensions/warp-ipfs/src/shuttle/server.rs @@ -178,7 +178,7 @@ impl ShuttleServer { identity.list().await.count().await ); - let mut subscriptions = Subscriptions::new(&ipfs, &identity); + let mut subscriptions = Subscriptions::new(&ipfs, &identity, &message); _ = subscriptions .subscribe("/identity/announce/v0".into()) .await; diff --git a/extensions/warp-ipfs/src/shuttle/store/messages.rs b/extensions/warp-ipfs/src/shuttle/store/messages.rs index 9c994d3fc..3a4e1f2be 100644 --- a/extensions/warp-ipfs/src/shuttle/store/messages.rs +++ b/extensions/warp-ipfs/src/shuttle/store/messages.rs @@ -1,8 +1,8 @@ // This module handles storing items in a mailbox for its intended recipients to fetch, download, and notify about this node // about it being delivered. Messages that are new or updated will be inserted in the same manner -use std::{collections::BTreeMap, sync::Arc, time::Duration}; +use std::{collections::BTreeMap, str::FromStr, sync::Arc, time::Duration}; -use futures::{stream, StreamExt}; +use futures::{stream, Stream, StreamExt}; use libipld::Cid; use rust_ipfs::{Ipfs, IpfsPath}; use std::path::PathBuf; @@ -95,6 +95,11 @@ impl MessageStorage { .message_delivered(member, conversation_id, message_id) .await } + + pub async fn list_conversations(&self) -> impl Stream { + let inner = &*self.inner.read().await; + inner.list_conversations() + } } impl MessageStorageInner { @@ -450,4 +455,30 @@ impl MessageStorageInner { Ok(()) } + + //TODO: Expose conversation type (after registration is impl) + fn list_conversations(&self) -> impl Stream { + let list = self.list; + let ipfs = self.ipfs.clone(); + + async_stream::stream! { + let list: BTreeMap = match list { + Some(cid) => ipfs + .get_dag(cid) + .local() + .deserialized() + .await + .unwrap_or_default(), + None => return, + }; + + for id in list.keys() { + let Ok(id) = Uuid::from_str(id) else { + continue; + }; + + yield id; + } + } + } } diff --git a/extensions/warp-ipfs/src/shuttle/subscription_stream.rs b/extensions/warp-ipfs/src/shuttle/subscription_stream.rs index 55f10ebda..fff20c8f7 100644 --- a/extensions/warp-ipfs/src/shuttle/subscription_stream.rs +++ b/extensions/warp-ipfs/src/shuttle/subscription_stream.rs @@ -5,17 +5,15 @@ use rust_ipfs::libp2p::gossipsub::Message; use rust_ipfs::Ipfs; use tokio_stream::StreamMap; -use crate::store::topics::PeerTopic; - -use super::store::identity::IdentityStorage; - +use super::store::{identity::IdentityStorage, messages::MessageStorage}; +use crate::store::topics::{ConversationTopic, PeerTopic}; #[derive(Clone)] pub struct Subscriptions { tx: futures::channel::mpsc::Sender, } impl Subscriptions { - pub fn new(ipfs: &Ipfs, identity: &IdentityStorage) -> Self { + pub fn new(ipfs: &Ipfs, identity: &IdentityStorage, message: &MessageStorage) -> Self { let (tx, rx) = futures::channel::mpsc::channel(1); let mut task = SubscriptionTask { @@ -28,14 +26,21 @@ impl Subscriptions { .insert("pending".into(), futures::stream::pending().boxed()); let identity = identity.clone(); + let message = message.clone(); tokio::spawn(async move { { let mut list = identity.list().await; + let mut conversations = message.list_conversations().await.boxed(); while let Some(id) = list.next().await { _ = task.subscribe(id.did.inbox()).await; _ = task.subscribe(id.did.messaging()).await; } + + while let Some(id) = conversations.next().await { + _ = task.subscribe(id.base()).await; + _ = task.subscribe(id.event_topic()).await; + } } task.run().await diff --git a/extensions/warp-ipfs/src/store/conversation.rs b/extensions/warp-ipfs/src/store/conversation.rs index 5330c3aa2..11f23fd87 100644 --- a/extensions/warp-ipfs/src/store/conversation.rs +++ b/extensions/warp-ipfs/src/store/conversation.rs @@ -27,8 +27,8 @@ use warp::{ use crate::store::{ecdh_encrypt, ecdh_encrypt_with_nonce, DidExt}; use super::{ - document::FileAttachmentDocument, ecdh_decrypt, keystore::Keystore, verify_serde_sig, - PeerIdExt, MAX_ATTACHMENT, MAX_MESSAGE_SIZE, MIN_MESSAGE_SIZE, + document::FileAttachmentDocument, ecdh_decrypt, keystore::Keystore, topics::ConversationTopic, + verify_serde_sig, PeerIdExt, MAX_ATTACHMENT, MAX_MESSAGE_SIZE, MIN_MESSAGE_SIZE, }; #[derive(Default, Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] @@ -87,23 +87,15 @@ impl ConversationDocument { } pub fn topic(&self) -> String { - format!("{}/{}", self.conversation_type(), self.id()) + self.id.base() } pub fn event_topic(&self) -> String { - format!("{}/events", self.topic()) + self.id.event_topic() } - pub fn files_topic(&self) -> String { - format!("{}/files", self.topic()) - } - - pub fn reqres_topic(&self, did: &DID) -> String { - format!("{}/reqres/{}", self.topic(), did) - } - - pub fn files_transfer(&self, id: Uuid) -> String { - format!("{}/{id}", self.files_topic()) + pub fn exchange_topic(&self, did: &DID) -> String { + self.id.exchange_topic(did) } pub fn recipients(&self) -> Vec { diff --git a/extensions/warp-ipfs/src/store/message.rs b/extensions/warp-ipfs/src/store/message.rs index b79793554..787dfff20 100644 --- a/extensions/warp-ipfs/src/store/message.rs +++ b/extensions/warp-ipfs/src/store/message.rs @@ -1358,7 +1358,7 @@ impl ConversationInner { .from_ipfs(&self.ipfs) .await?; - let topic = conversation.reqres_topic(did); + let topic = conversation.exchange_topic(did); let peers = self.ipfs.pubsub_peers(Some(topic.clone())).await?; let peer_id = did.to_peer_id()?; @@ -3161,7 +3161,7 @@ impl ConversationInner { let main_topic = conversation.topic(); let event_topic = conversation.event_topic(); - let request_topic = conversation.reqres_topic(&self.identity.did_key()); + let request_topic = conversation.exchange_topic(&self.identity.did_key()); let messaging_stream = self .ipfs @@ -4060,7 +4060,7 @@ async fn process_request_response_event( kind: ConversationResponseKind::Key { key }, }; - let topic = conversation.reqres_topic(&sender); + let topic = conversation.exchange_topic(&sender); let bytes = ecdh_encrypt(keypair, Some(&sender), serde_json::to_vec(&response)?)?; diff --git a/extensions/warp-ipfs/src/store/mod.rs b/extensions/warp-ipfs/src/store/mod.rs index 59616353b..f449c7629 100644 --- a/extensions/warp-ipfs/src/store/mod.rs +++ b/extensions/warp-ipfs/src/store/mod.rs @@ -48,6 +48,7 @@ pub const MAX_REQUEST: usize = 1_000; pub(super) mod topics { use std::fmt::Display; + use uuid::Uuid; use warp::crypto::DID; /// Topic to announce identity updates to the network @@ -67,6 +68,23 @@ pub(super) mod topics { } impl PeerTopic for DID {} + + pub trait ConversationTopic: Display { + fn base(&self) -> String; + fn event_topic(&self) -> String { + format!("{}/events", self.base()) + } + + fn exchange_topic(&self, did: &DID) -> String { + format!("{}/exchange/{}", self.base(), did) + } + } + + impl ConversationTopic for Uuid { + fn base(&self) -> String { + format!("/conversation/{self}") + } + } } pub(super) mod ds_key {