From 0891dce2d119c9e141bff864bf9a0e376e3e8458 Mon Sep 17 00:00:00 2001 From: Darius Date: Sat, 7 Oct 2023 08:42:54 -0400 Subject: [PATCH] refactor: Merge friend module into identity --- extensions/warp-ipfs/src/lib.rs | 78 +- extensions/warp-ipfs/src/store/document.rs | 4 +- extensions/warp-ipfs/src/store/friends.rs | 1041 -------------------- extensions/warp-ipfs/src/store/identity.rs | 994 +++++++++++++++++-- extensions/warp-ipfs/src/store/message.rs | 19 +- extensions/warp-ipfs/src/store/mod.rs | 1 - extensions/warp-ipfs/src/store/queue.rs | 2 +- 7 files changed, 973 insertions(+), 1166 deletions(-) delete mode 100644 extensions/warp-ipfs/src/store/friends.rs diff --git a/extensions/warp-ipfs/src/lib.rs b/extensions/warp-ipfs/src/lib.rs index e0ef926ea..bc3340c0e 100644 --- a/extensions/warp-ipfs/src/lib.rs +++ b/extensions/warp-ipfs/src/lib.rs @@ -28,7 +28,6 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use store::document::ExtractedRootDocument; use store::files::FileStore; -use store::friends::FriendsStore; use store::identity::{IdentityStore, LookupBy}; use store::message::MessageStore; use tokio::sync::broadcast; @@ -78,7 +77,6 @@ pub struct WarpIpfs { identity_guard: Arc>, ipfs: Arc>>, tesseract: Tesseract, - friend_store: Arc>>, identity_store: Arc>>, message_store: Arc>>, file_store: Arc>>, @@ -148,7 +146,6 @@ impl WarpIpfs { config, tesseract, ipfs: Default::default(), - friend_store: Default::default(), identity_store: Default::default(), message_store: Default::default(), file_store: Default::default(), @@ -179,9 +176,7 @@ impl WarpIpfs { async fn initialize_store(&self, init: bool) -> anyhow::Result<()> { let tesseract = self.tesseract.clone(); - if init && self.identity_store.read().is_some() && self.friend_store.read().is_some() - || self.initialized.load(Ordering::SeqCst) - { + if init && self.identity_store.read().is_some() || self.initialized.load(Ordering::SeqCst) { warn!("Identity is already loaded"); anyhow::bail!(Error::IdentityExist) } @@ -462,28 +457,29 @@ impl WarpIpfs { tesseract.clone(), config.store_setting.auto_push, self.multipass_tx.clone(), + pb_tx, &config, discovery.clone(), ) .await?; info!("Identity store initialized"); - let friend_store = FriendsStore::new( - ipfs.clone(), - identity_store.clone(), - discovery.clone(), - config.clone(), - tesseract.clone(), - self.multipass_tx.clone(), - pb_tx, - ) - .await?; - info!("friends store initialized"); + // let friend_store = FriendsStore::new( + // ipfs.clone(), + // identity_store.clone(), + // discovery.clone(), + // config.clone(), + // tesseract.clone(), + // self.multipass_tx.clone(), + // + // ) + // .await?; + // info!("friends store initialized"); - identity_store.set_friend_store(friend_store.clone()).await; + // identity_store.set_friend_store(friend_store.clone()).await; *self.identity_store.write() = Some(identity_store.clone()); - *self.friend_store.write() = Some(friend_store.clone()); + // *self.friend_store.write() = Some(friend_store.clone()); *self.ipfs.write() = Some(ipfs.clone()); @@ -496,7 +492,7 @@ impl WarpIpfs { ipfs.clone(), config.path.map(|path| path.join("messages")), identity_store, - friend_store, + // friend_store, discovery, Some(Box::new(self.clone()) as Box), false, @@ -517,14 +513,6 @@ impl WarpIpfs { Ok(()) } - pub(crate) async fn friend_store(&self) -> Result { - self.identity_store(true).await?; - self.friend_store - .read() - .clone() - .ok_or(Error::MultiPassExtensionUnavailable) - } - pub(crate) async fn identity_store(&self, created: bool) -> Result { let store = self.identity_store_sync()?; if created && !store.local_id_created().await { @@ -578,8 +566,8 @@ impl WarpIpfs { } pub(crate) async fn is_blocked_by(&self, pubkey: &DID) -> Result { - let friends = self.friend_store().await?; - friends.is_blocked_by(pubkey).await + let identity = self.identity_store(true).await?; + identity.is_blocked_by(pubkey).await } } @@ -1059,77 +1047,77 @@ impl MultiPassImportExport for WarpIpfs { #[async_trait::async_trait] impl Friends for WarpIpfs { async fn send_request(&mut self, pubkey: &DID) -> Result<(), Error> { - let mut store = self.friend_store().await?; + let mut store = self.identity_store(true).await?; store.send_request(pubkey).await } async fn accept_request(&mut self, pubkey: &DID) -> Result<(), Error> { - let mut store = self.friend_store().await?; + let mut store = self.identity_store(true).await?; store.accept_request(pubkey).await } async fn deny_request(&mut self, pubkey: &DID) -> Result<(), Error> { - let mut store = self.friend_store().await?; + let mut store = self.identity_store(true).await?; store.reject_request(pubkey).await } async fn close_request(&mut self, pubkey: &DID) -> Result<(), Error> { - let mut store = self.friend_store().await?; + let mut store = self.identity_store(true).await?; store.close_request(pubkey).await } async fn list_incoming_request(&self) -> Result, Error> { - let store = self.friend_store().await?; + let store = self.identity_store(true).await?; store.list_incoming_request().await } async fn list_outgoing_request(&self) -> Result, Error> { - let store = self.friend_store().await?; + let store = self.identity_store(true).await?; store.list_outgoing_request().await } async fn received_friend_request_from(&self, did: &DID) -> Result { - let store = self.friend_store().await?; + let store = self.identity_store(true).await?; store.received_friend_request_from(did).await } async fn sent_friend_request_to(&self, did: &DID) -> Result { - let store = self.friend_store().await?; + let store = self.identity_store(true).await?; store.sent_friend_request_to(did).await } async fn remove_friend(&mut self, pubkey: &DID) -> Result<(), Error> { - let mut store = self.friend_store().await?; + let mut store = self.identity_store(true).await?; store.remove_friend(pubkey, true).await } async fn block(&mut self, pubkey: &DID) -> Result<(), Error> { - let mut store = self.friend_store().await?; + let mut store = self.identity_store(true).await?; store.block(pubkey).await } async fn is_blocked(&self, did: &DID) -> Result { - let store = self.friend_store().await?; + let store = self.identity_store(true).await?; store.is_blocked(did).await } async fn unblock(&mut self, pubkey: &DID) -> Result<(), Error> { - let mut store = self.friend_store().await?; + let mut store = self.identity_store(true).await?; store.unblock(pubkey).await } async fn block_list(&self) -> Result, Error> { - let store = self.friend_store().await?; + let store = self.identity_store(true).await?; store.block_list().await.map(Vec::from_iter) } async fn list_friends(&self) -> Result, Error> { - let store = self.friend_store().await?; + let store = self.identity_store(true).await?; store.friends_list().await.map(Vec::from_iter) } async fn has_friend(&self, pubkey: &DID) -> Result { - let store = self.friend_store().await?; + let store = self.identity_store(true).await?; store.is_friend(pubkey).await } } diff --git a/extensions/warp-ipfs/src/store/document.rs b/extensions/warp-ipfs/src/store/document.rs index af8caad37..2d7855957 100644 --- a/extensions/warp-ipfs/src/store/document.rs +++ b/extensions/warp-ipfs/src/store/document.rs @@ -19,7 +19,7 @@ use crate::store::get_keypair_did; use self::{identity::IdentityDocument, utils::GetLocalDag}; -use super::friends::Request; +use super::identity::Request; #[async_trait::async_trait] pub(crate) trait ToCid: Sized { @@ -76,7 +76,7 @@ pub struct ExtractedRootDocument { pub friends: Vec, pub block_list: Vec, pub block_by_list: Vec, - pub request: Vec, + pub request: Vec, pub signature: Option>, } diff --git a/extensions/warp-ipfs/src/store/friends.rs b/extensions/warp-ipfs/src/store/friends.rs deleted file mode 100644 index 1ccff442d..000000000 --- a/extensions/warp-ipfs/src/store/friends.rs +++ /dev/null @@ -1,1041 +0,0 @@ -#![allow(clippy::await_holding_lock)] -use futures::channel::oneshot; -use futures::StreamExt; -use ipfs::{Ipfs, PeerId}; -use rust_ipfs as ipfs; -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::time::{Duration, Instant}; -use tokio::sync::{broadcast, RwLock}; -use tracing::log::{self, error, warn}; -use warp::crypto::DID; -use warp::error::Error; -use warp::multipass::MultiPassEventKind; -use warp::sync::Arc; - -use warp::tesseract::Tesseract; - -use crate::behaviour::phonebook::PhoneBookCommand; -use crate::config::Config as MpIpfsConfig; -use crate::store::{ecdh_decrypt, ecdh_encrypt, PeerIdExt, PeerTopic}; - -use super::identity::{IdentityStore, LookupBy, RequestOption}; -use super::phonebook::PhoneBook; -use super::queue::Queue; -use super::{did_keypair, did_to_libp2p_pub, discovery, libp2p_pub_to_did}; - -#[allow(clippy::type_complexity)] -#[derive(Clone)] -pub struct FriendsStore { - ipfs: Ipfs, - - // Identity Store - identity: IdentityStore, - - discovery: discovery::Discovery, - - // keypair - did_key: Arc, - - // Queue to handle sending friend request - queue: Queue, - - phonebook: PhoneBook, - - wait_on_response: Option, - - signal: Arc>>>>, - - tx: broadcast::Sender, -} - -#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)] -pub enum Request { - In(DID), - Out(DID), -} - -impl From for RequestType { - fn from(request: Request) -> Self { - RequestType::from(&request) - } -} - -impl From<&Request> for RequestType { - fn from(request: &Request) -> Self { - match request { - Request::In(_) => RequestType::Incoming, - Request::Out(_) => RequestType::Outgoing, - } - } -} - -impl Request { - pub fn r#type(&self) -> RequestType { - self.into() - } - - pub fn did(&self) -> &DID { - match self { - Request::In(did) => did, - Request::Out(did) => did, - } - } -} - -#[derive(Deserialize, Serialize, Debug, Clone, Copy, PartialEq, Hash, Eq)] -#[serde(rename_all = "lowercase", tag = "type")] -pub enum Event { - /// Event indicating a friend request - Request, - /// Event accepting the request - Accept, - /// Remove identity as a friend - Remove, - /// Reject friend request, if any - Reject, - /// Retract a sent friend request - Retract, - /// Block user - Block, - /// Unblock user - Unblock, - /// Indiciation of a response to a request - Response, -} - -#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Hash, Eq)] -pub struct RequestResponsePayload { - pub sender: DID, - pub event: Event, -} - -#[derive(Deserialize, Serialize, Debug, Clone, Copy, PartialEq, Eq)] -pub enum RequestType { - Incoming, - Outgoing, -} - -impl FriendsStore { - pub async fn new( - ipfs: Ipfs, - identity: IdentityStore, - discovery: discovery::Discovery, - config: MpIpfsConfig, - tesseract: Tesseract, - tx: broadcast::Sender, - pb_tx: futures::channel::mpsc::Sender, - ) -> anyhow::Result { - let did_key = Arc::new(did_keypair(&tesseract)?); - - let queue = Queue::new( - ipfs.clone(), - did_key.clone(), - config.path.clone(), - discovery.clone(), - ); - - let phonebook = PhoneBook::new( - ipfs.clone(), - discovery.clone(), - tx.clone(), - config.store_setting.emit_online_event, - pb_tx, - ); - - let signal = Default::default(); - let wait_on_response = config.store_setting.friend_request_response_duration; - let store = Self { - ipfs, - identity, - discovery, - did_key, - queue, - phonebook, - tx, - signal, - wait_on_response, - }; - - log::info!("Loading queue"); - if let Err(_e) = store.queue.load().await {} - - let phonebook = store.phonebook(); - log::info!("Loading friends list into phonebook"); - if let Ok(friends) = store.friends_list().await { - if let Err(_e) = phonebook.add_friend_list(friends).await { - error!("Error adding friends in phonebook: {_e}"); - } - } - - // scan through friends list to see if there is any incoming request or outgoing request matching - // and clear them out of the request list as a precautionary measure - let friends = store.friends_list().await.unwrap_or_default(); - - for friend in friends { - let list = store.list_all_raw_request().await.unwrap_or_default(); - - // cleanup outgoing - for req in list.iter().filter(|req| req.did().eq(&friend)) { - let _ = store.identity.root_document_remove_request(req).await.ok(); - } - } - - let stream = store - .ipfs - .pubsub_subscribe(store.did_key.inbox()) - .await?; - - tokio::spawn({ - let mut store = store.clone(); - async move { - // autoban the blocklist - // TODO: implement configuration open to autoban at the libp2p level - // Note: If this is done, we would need to attempt reconnection if the user - // is ever unblocked - // match store.block_list().await { - // Ok(list) => { - // for pubkey in list { - // if let Ok(peer_id) = did_to_libp2p_pub(&pubkey).map(|p| p.to_peer_id()) - // { - // if let Err(e) = store.ipfs.ban_peer(peer_id).await { - // error!("Error banning peer: {e}"); - // } - // } - // } - // } - // Err(e) => { - // error!("Error loading block list: {e}"); - // } - // }; - - futures::pin_mut!(stream); - - while let Some(message) = stream.next().await { - let Some(peer_id) = message.source else { - //Note: Due to configuration, we should ALWAYS have a peer set in its source - // thus we can ignore the request if no peer is provided - continue; - }; - - let Ok(did) = peer_id.to_did() else { - //Note: The peer id is embedded with ed25519 public key, therefore we can decode it into a did key - // otherwise we can ignore - continue; - }; - - if let Err(e) = store.check_request_message(&did, &message.data).await { - error!("Error: {e}"); - } - } - } - }); - tokio::task::yield_now().await; - Ok(store) - } - - pub(crate) fn phonebook(&self) -> &PhoneBook { - &self.phonebook - } - - //TODO: Implement Errors - #[tracing::instrument(skip(self, data))] - async fn check_request_message(&mut self, did: &DID, data: &[u8]) -> anyhow::Result<()> { - let pk_did = &*self.did_key; - - let bytes = ecdh_decrypt(pk_did, Some(did), data)?; - - log::trace!("received payload size: {} bytes", bytes.len()); - - let data = serde_json::from_slice::(&bytes)?; - - log::info!("Received event from {did}"); - - if self - .list_incoming_request() - .await - .unwrap_or_default() - .contains(&data.sender) - && data.event == Event::Request - { - warn!("Request exist locally. Skipping"); - return Ok(()); - } - - //TODO: Send error if dropped early due to error when processing request - let mut signal = self.signal.write().await.remove(&data.sender); - - log::debug!("Event {:?}", data.event); - - // Before we validate the request, we should check to see if the key is blocked - // If it is, skip the request so we dont wait resources storing it. - if self.is_blocked(&data.sender).await? && !matches!(data.event, Event::Block) { - log::warn!("Received event from a blocked identity."); - let payload = RequestResponsePayload { - sender: (*self.did_key).clone(), - event: Event::Block, - }; - - self.broadcast_request((&data.sender, &payload), false, true) - .await?; - - return Ok(()); - } - - match data.event { - Event::Accept => { - let list = self.list_all_raw_request().await?; - - let Some(item) = list - .iter() - .filter(|req| req.r#type() == RequestType::Outgoing) - .find(|req| data.sender.eq(req.did())) - .cloned() - else { - anyhow::bail!( - "Unable to locate pending request. Already been accepted or rejected?" - ) - }; - - // Maybe just try the function instead and have it be a hard error? - if self - .identity - .root_document_remove_request(&item) - .await - .is_err() - { - anyhow::bail!( - "Unable to locate pending request. Already been accepted or rejected?" - ) - } - - self.add_friend(item.did()).await?; - } - Event::Request => { - if self.is_friend(&data.sender).await? { - log::debug!("Friend already exist. Remitting event"); - let payload = RequestResponsePayload { - sender: (*self.did_key).clone(), - event: Event::Accept, - }; - - self.broadcast_request((&data.sender, &payload), false, false) - .await?; - - return Ok(()); - } - - let list = self.list_all_raw_request().await?; - - if let Some(inner_req) = list - .iter() - .find(|request| { - request.r#type() == RequestType::Outgoing && data.sender.eq(request.did()) - }) - .cloned() - { - //Because there is also a corresponding outgoing request for the incoming request - //we can automatically add them - self.identity - .root_document_remove_request(&inner_req) - .await?; - self.add_friend(inner_req.did()).await?; - } else { - self.identity - .root_document_add_request(&Request::In(data.sender.clone())) - .await?; - - tokio::spawn({ - let store = self.identity.clone(); - let from = data.sender.clone(); - async move { - let _ = tokio::time::timeout(Duration::from_secs(10), async { - loop { - if let Ok(list) = - store.lookup(LookupBy::DidKey(from.clone())).await - { - if !list.is_empty() { - break; - } - } - tokio::time::sleep(Duration::from_secs(1)).await; - } - }) - .await - .ok(); - - store.emit_event(MultiPassEventKind::FriendRequestReceived { from }); - } - }); - } - let payload = RequestResponsePayload { - sender: (*self.did_key).clone(), - event: Event::Response, - }; - - self.broadcast_request((&data.sender, &payload), false, false) - .await?; - } - Event::Reject => { - let list = self.list_all_raw_request().await?; - let internal_request = list - .iter() - .find(|request| { - request.r#type() == RequestType::Outgoing && data.sender.eq(request.did()) - }) - .cloned() - .ok_or(Error::FriendRequestDoesntExist)?; - - self.identity - .root_document_remove_request(&internal_request) - .await?; - - self.emit_event(MultiPassEventKind::OutgoingFriendRequestRejected { - did: data.sender, - }); - } - Event::Remove => { - if self.is_friend(&data.sender).await? { - self.remove_friend(&data.sender, false).await?; - } - } - Event::Retract => { - let list = self.list_all_raw_request().await?; - let internal_request = list - .iter() - .find(|request| { - request.r#type() == RequestType::Incoming && data.sender.eq(request.did()) - }) - .cloned() - .ok_or(Error::FriendRequestDoesntExist)?; - - self.identity - .root_document_remove_request(&internal_request) - .await?; - - self.emit_event(MultiPassEventKind::IncomingFriendRequestClosed { - did: data.sender, - }); - } - Event::Block => { - if self.has_request_from(&data.sender).await? { - self.emit_event(MultiPassEventKind::IncomingFriendRequestClosed { - did: data.sender.clone(), - }); - } else if self.sent_friend_request_to(&data.sender).await? { - self.emit_event(MultiPassEventKind::OutgoingFriendRequestRejected { - did: data.sender.clone(), - }); - } - - let list = self.list_all_raw_request().await?; - for req in list.iter().filter(|req| req.did().eq(&data.sender)) { - self.identity.root_document_remove_request(req).await?; - } - - if self.is_friend(&data.sender).await? { - self.remove_friend(&data.sender, false).await?; - } - - let completed = self - .identity - .root_document_add_block_by(&data.sender) - .await - .is_ok(); - if completed { - tokio::spawn({ - let store = self.identity.clone(); - let sender = data.sender.clone(); - async move { - let _ = store.push(&sender).await.ok(); - let _ = store.request(&sender, RequestOption::Identity).await.ok(); - } - }); - - if let Err(e) = self - .tx - .send(MultiPassEventKind::BlockedBy { did: data.sender }) - { - error!("Error broadcasting event: {e}"); - } - } - - if let Some(tx) = std::mem::take(&mut signal) { - log::debug!("Signaling broadcast of response..."); - let _ = tx.send(Err(Error::BlockedByUser)); - } - } - Event::Unblock => { - let completed = self - .identity - .root_document_remove_block_by(&data.sender) - .await - .is_ok(); - - if completed { - tokio::spawn({ - let store = self.identity.clone(); - let sender = data.sender.clone(); - async move { - let _ = store.push(&sender).await.ok(); - let _ = store.request(&sender, RequestOption::Identity).await.ok(); - } - }); - self.emit_event(MultiPassEventKind::UnblockedBy { did: data.sender }); - } - } - Event::Response => { - if let Some(tx) = std::mem::take(&mut signal) { - log::debug!("Signaling broadcast of response..."); - let _ = tx.send(Ok(())); - } - } - }; - if let Some(tx) = std::mem::take(&mut signal) { - log::debug!("Signaling broadcast of response..."); - let _ = tx.send(Ok(())); - } - - Ok(()) - } - - async fn local(&self) -> anyhow::Result<(ipfs::libp2p::identity::PublicKey, PeerId)> { - let (local_ipfs_public_key, local_peer_id) = self - .ipfs - .identity(None) - .await - .map(|info| (info.public_key.clone(), info.peer_id))?; - Ok((local_ipfs_public_key, local_peer_id)) - } -} - -impl FriendsStore { - #[tracing::instrument(skip(self))] - pub async fn send_request(&mut self, pubkey: &DID) -> Result<(), Error> { - let (local_ipfs_public_key, _) = self.local().await?; - let local_public_key = libp2p_pub_to_did(&local_ipfs_public_key)?; - - if local_public_key.eq(pubkey) { - return Err(Error::CannotSendSelfFriendRequest); - } - - if self.is_friend(pubkey).await? { - return Err(Error::FriendExist); - } - - if self.is_blocked_by(pubkey).await? { - return Err(Error::BlockedByUser); - } - - if self.is_blocked(pubkey).await? { - return Err(Error::PublicKeyIsBlocked); - } - - if self.has_request_from(pubkey).await? { - return self.accept_request(pubkey).await; - } - - let list = self.list_all_raw_request().await?; - - if list - .iter() - .any(|request| request.r#type() == RequestType::Outgoing && request.did().eq(pubkey)) - { - // since the request has already been sent, we should not be sending it again - return Err(Error::FriendRequestExist); - } - - let payload = RequestResponsePayload { - sender: local_public_key, - event: Event::Request, - }; - - self.broadcast_request((pubkey, &payload), true, true).await - } - - #[tracing::instrument(skip(self))] - pub async fn accept_request(&mut self, pubkey: &DID) -> Result<(), Error> { - let (local_ipfs_public_key, _) = self.local().await?; - - let local_public_key = libp2p_pub_to_did(&local_ipfs_public_key)?; - - if local_public_key.eq(pubkey) { - return Err(Error::CannotAcceptSelfAsFriend); - } - - if !self.has_request_from(pubkey).await? { - return Err(Error::FriendRequestDoesntExist); - } - - let list = self.list_all_raw_request().await?; - - let internal_request = list - .iter() - .find(|request| request.r#type() == RequestType::Incoming && request.did().eq(pubkey)) - .cloned() - .ok_or(Error::CannotFindFriendRequest)?; - - if self.is_friend(pubkey).await? { - warn!("Already friends. Removing request"); - - self.identity - .root_document_remove_request(&internal_request) - .await?; - - return Ok(()); - } - - let payload = RequestResponsePayload { - event: Event::Accept, - sender: local_public_key, - }; - - self.add_friend(pubkey).await?; - - self.identity - .root_document_remove_request(&internal_request) - .await?; - - self.broadcast_request((pubkey, &payload), false, true) - .await - } - - #[tracing::instrument(skip(self))] - pub async fn reject_request(&mut self, pubkey: &DID) -> Result<(), Error> { - let (local_ipfs_public_key, _) = self.local().await?; - - let local_public_key = libp2p_pub_to_did(&local_ipfs_public_key)?; - - if local_public_key.eq(pubkey) { - return Err(Error::CannotDenySelfAsFriend); - } - - if !self.has_request_from(pubkey).await? { - return Err(Error::FriendRequestDoesntExist); - } - - let list = self.list_all_raw_request().await?; - - // Although the request been validated before storing, we should validate again just to be safe - let internal_request = list - .iter() - .find(|request| request.r#type() == RequestType::Incoming && request.did().eq(pubkey)) - .cloned() - .ok_or(Error::CannotFindFriendRequest)?; - - let payload = RequestResponsePayload { - sender: local_public_key, - event: Event::Reject, - }; - - self.identity - .root_document_remove_request(&internal_request) - .await?; - - self.broadcast_request((pubkey, &payload), false, true) - .await - } - - #[tracing::instrument(skip(self))] - pub async fn close_request(&mut self, pubkey: &DID) -> Result<(), Error> { - let (local_ipfs_public_key, _) = self.local().await?; - - let local_public_key = libp2p_pub_to_did(&local_ipfs_public_key)?; - - let list = self.list_all_raw_request().await?; - - let internal_request = list - .iter() - .find(|request| request.r#type() == RequestType::Outgoing && request.did().eq(pubkey)) - .cloned() - .ok_or(Error::CannotFindFriendRequest)?; - - let payload = RequestResponsePayload { - sender: local_public_key, - event: Event::Retract, - }; - - self.identity - .root_document_remove_request(&internal_request) - .await?; - - if let Some(entry) = self.queue.get(pubkey).await { - if entry.event == Event::Request { - self.queue.remove(pubkey).await; - self.emit_event(MultiPassEventKind::OutgoingFriendRequestClosed { - did: pubkey.clone(), - }); - - return Ok(()); - } - } - - self.broadcast_request((pubkey, &payload), false, true) - .await - } - - #[tracing::instrument(skip(self))] - pub async fn has_request_from(&self, pubkey: &DID) -> Result { - self.list_incoming_request() - .await - .map(|list| list.contains(pubkey)) - } -} - -impl FriendsStore { - #[tracing::instrument(skip(self))] - pub async fn block_list(&self) -> Result, Error> { - self.identity.root_document_get_blocks().await - } - - #[tracing::instrument(skip(self))] - pub async fn is_blocked(&self, public_key: &DID) -> Result { - self.block_list() - .await - .map(|list| list.contains(public_key)) - } - - #[tracing::instrument(skip(self))] - pub async fn block(&mut self, pubkey: &DID) -> Result<(), Error> { - let (local_ipfs_public_key, _) = self.local().await?; - - let local_public_key = libp2p_pub_to_did(&local_ipfs_public_key)?; - - if local_public_key.eq(pubkey) { - return Err(Error::CannotBlockOwnKey); - } - - if self.is_blocked(pubkey).await? { - return Err(Error::PublicKeyIsBlocked); - } - - self.identity.root_document_add_block(pubkey).await?; - - // Remove anything from queue related to the key - self.queue.remove(pubkey).await; - - let list = self.list_all_raw_request().await?; - for req in list.iter().filter(|req| req.did().eq(pubkey)) { - self.identity.root_document_remove_request(req).await?; - } - - if self.is_friend(pubkey).await? { - if let Err(e) = self.remove_friend(pubkey, false).await { - error!("Error removing item from friend list: {e}"); - } - } - - // Since we want to broadcast the remove request, banning the peer after would not allow that to happen - // Although this may get uncomment in the future to block connections regardless if its sent or not, or - // if we decide to send the request through a relay to broadcast it to the peer, however - // the moment this extension is reloaded the block list are considered as a "banned peer" in libp2p - - // let peer_id = did_to_libp2p_pub(pubkey)?.to_peer_id(); - - // self.ipfs.ban_peer(peer_id).await?; - let payload = RequestResponsePayload { - sender: local_public_key, - event: Event::Block, - }; - - self.broadcast_request((pubkey, &payload), false, true) - .await - } - - #[tracing::instrument(skip(self))] - pub async fn unblock(&mut self, pubkey: &DID) -> Result<(), Error> { - let (local_ipfs_public_key, _) = self.local().await?; - - let local_public_key = libp2p_pub_to_did(&local_ipfs_public_key)?; - - if local_public_key.eq(pubkey) { - return Err(Error::CannotUnblockOwnKey); - } - - if !self.is_blocked(pubkey).await? { - return Err(Error::PublicKeyIsntBlocked); - } - - self.identity.root_document_remove_block(pubkey).await?; - - let peer_id = did_to_libp2p_pub(pubkey)?.to_peer_id(); - self.ipfs.unban_peer(peer_id).await?; - - let payload = RequestResponsePayload { - sender: local_public_key, - event: Event::Unblock, - }; - - self.broadcast_request((pubkey, &payload), false, true) - .await - } -} - -impl FriendsStore { - pub async fn block_by_list(&self) -> Result, Error> { - self.identity.root_document_get_block_by().await - } - - pub async fn is_blocked_by(&self, pubkey: &DID) -> Result { - self.block_by_list().await.map(|list| list.contains(pubkey)) - } -} - -impl FriendsStore { - pub async fn friends_list(&self) -> Result, Error> { - self.identity.root_document_get_friends().await - } - - // Should not be called directly but only after a request is accepted - #[tracing::instrument(skip(self))] - pub async fn add_friend(&mut self, pubkey: &DID) -> Result<(), Error> { - if self.is_friend(pubkey).await? { - return Err(Error::FriendExist); - } - - if self.is_blocked(pubkey).await? { - return Err(Error::PublicKeyIsBlocked); - } - - self.identity.root_document_add_friend(pubkey).await?; - - let phonebook = self.phonebook(); - if let Err(_e) = phonebook.add_friend(pubkey).await { - error!("Error: {_e}"); - } - - // Push to give an update in the event any wasnt transmitted during the initial push - // We dont care if this errors or not. - let _ = self.identity.push(pubkey).await.ok(); - - self.emit_event(MultiPassEventKind::FriendAdded { - did: pubkey.clone(), - }); - - Ok(()) - } - - #[tracing::instrument(skip(self, broadcast))] - pub async fn remove_friend(&mut self, pubkey: &DID, broadcast: bool) -> Result<(), Error> { - if !self.is_friend(pubkey).await? { - return Err(Error::FriendDoesntExist); - } - - self.identity.root_document_remove_friend(pubkey).await?; - - let phonebook = self.phonebook(); - - if let Err(_e) = phonebook.remove_friend(pubkey).await { - error!("Error: {_e}"); - } - - if broadcast { - let (local_ipfs_public_key, _) = self.local().await?; - let local_public_key = libp2p_pub_to_did(&local_ipfs_public_key)?; - - let payload = RequestResponsePayload { - sender: local_public_key, - event: Event::Remove, - }; - - self.broadcast_request((pubkey, &payload), false, true) - .await?; - } - - self.emit_event(MultiPassEventKind::FriendRemoved { - did: pubkey.clone(), - }); - - Ok(()) - } - - #[tracing::instrument(skip(self))] - pub async fn is_friend(&self, pubkey: &DID) -> Result { - self.friends_list().await.map(|list| list.contains(pubkey)) - } -} - -impl FriendsStore { - pub async fn list_all_raw_request(&self) -> Result, Error> { - self.identity.root_document_get_requests().await - } - - pub async fn received_friend_request_from(&self, did: &DID) -> Result { - self.list_incoming_request() - .await - .map(|list| list.iter().any(|request| request.eq(did))) - } - - #[tracing::instrument(skip(self))] - pub async fn list_incoming_request(&self) -> Result, Error> { - self.list_all_raw_request().await.map(|list| { - list.iter() - .filter_map(|request| match request { - Request::In(request) => Some(request), - _ => None, - }) - .cloned() - .collect::>() - }) - } - - #[tracing::instrument(skip(self))] - pub async fn sent_friend_request_to(&self, did: &DID) -> Result { - self.list_outgoing_request() - .await - .map(|list| list.iter().any(|request| request.eq(did))) - } - - #[tracing::instrument(skip(self))] - pub async fn list_outgoing_request(&self) -> Result, Error> { - self.list_all_raw_request().await.map(|list| { - list.iter() - .filter_map(|request| match request { - Request::Out(request) => Some(request), - _ => None, - }) - .cloned() - .collect::>() - }) - } - - #[tracing::instrument(skip(self))] - pub async fn broadcast_request( - &mut self, - (recipient, payload): (&DID, &RequestResponsePayload), - store_request: bool, - queue_broadcast: bool, - ) -> Result<(), Error> { - let remote_peer_id = did_to_libp2p_pub(recipient)?.to_peer_id(); - - if !self.discovery.contains(recipient).await { - self.discovery.insert(recipient).await?; - } - - if store_request { - let outgoing_request = Request::Out(recipient.clone()); - let list = self.list_all_raw_request().await?; - if !list.contains(&outgoing_request) { - self.identity - .root_document_add_request(&outgoing_request) - .await?; - } - } - - let kp = &*self.did_key; - - let payload_bytes = serde_json::to_vec(&payload)?; - - let bytes = ecdh_encrypt(kp, Some(recipient), payload_bytes)?; - - log::trace!("Request Payload size: {} bytes", bytes.len()); - - log::info!("Sending event to {recipient}"); - - let peers = self.ipfs.pubsub_peers(Some(recipient.inbox())).await?; - - let mut queued = false; - - let wait = self.wait_on_response.is_some(); - - let mut rx = (matches!(payload.event, Event::Request) && wait).then_some({ - let (tx, rx) = oneshot::channel(); - self.signal.write().await.insert(recipient.clone(), tx); - rx - }); - - let start = Instant::now(); - if !peers.contains(&remote_peer_id) - || (peers.contains(&remote_peer_id) - && self - .ipfs - .pubsub_publish(recipient.inbox(), bytes) - .await - .is_err()) - && queue_broadcast - { - self.queue.insert(recipient, payload.clone()).await; - queued = true; - self.signal.write().await.remove(recipient); - } - - if !queued { - let end = start.elapsed(); - log::trace!("Took {}ms to send event", end.as_millis()); - } - - if !queued && matches!(payload.event, Event::Request) { - if let Some(rx) = std::mem::take(&mut rx) { - if let Some(timeout) = self.wait_on_response { - let start = Instant::now(); - if let Ok(Ok(res)) = tokio::time::timeout(timeout, rx).await { - let end = start.elapsed(); - log::trace!("Took {}ms to receive a response", end.as_millis()); - res? - } - } - } - } - - match payload.event { - Event::Request => { - self.emit_event(MultiPassEventKind::FriendRequestSent { - to: recipient.clone(), - }); - } - Event::Retract => { - self.emit_event(MultiPassEventKind::OutgoingFriendRequestClosed { - did: recipient.clone(), - }); - } - Event::Reject => { - self.emit_event(MultiPassEventKind::IncomingFriendRequestRejected { - did: recipient.clone(), - }); - } - Event::Block => { - tokio::spawn({ - let store = self.identity.clone(); - let recipient = recipient.clone(); - async move { - let _ = store.push(&recipient).await.ok(); - let _ = store - .request(&recipient, RequestOption::Identity) - .await - .ok(); - } - }); - self.emit_event(MultiPassEventKind::Blocked { - did: recipient.clone(), - }); - } - Event::Unblock => { - tokio::spawn({ - let store = self.identity.clone(); - let recipient = recipient.clone(); - async move { - let _ = store.push(&recipient).await.ok(); - let _ = store - .request(&recipient, RequestOption::Identity) - .await - .ok(); - } - }); - - self.emit_event(MultiPassEventKind::Unblocked { - did: recipient.clone(), - }); - } - _ => {} - }; - Ok(()) - } -} - -impl FriendsStore { - pub fn emit_event(&self, event: MultiPassEventKind) { - if let Err(e) = self.tx.send(event) { - error!("Error broadcasting event: {e}"); - } - } -} diff --git a/extensions/warp-ipfs/src/store/identity.rs b/extensions/warp-ipfs/src/store/identity.rs index 3809914c5..b51126782 100644 --- a/extensions/warp-ipfs/src/store/identity.rs +++ b/extensions/warp-ipfs/src/store/identity.rs @@ -2,6 +2,7 @@ //onto the lock. #![allow(clippy::clone_on_copy)] use crate::{ + behaviour::phonebook::PhoneBookCommand, config::{self, Discovery as DiscoveryConfig, UpdateEvents}, store::{did_to_libp2p_pub, discovery::Discovery, PeerIdExt, PeerTopic, VecExt}, }; @@ -15,12 +16,12 @@ use libipld::Cid; use rust_ipfs as ipfs; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use std::{ - collections::HashSet, + collections::{HashMap, HashSet}, path::PathBuf, time::{Duration, Instant}, }; -use tokio::sync::broadcast; +use tokio::sync::{broadcast, RwLock}; use tracing::{ log::{self, error}, warn, @@ -39,15 +40,15 @@ use warp::{ }; use super::{ - connected_to_peer, + connected_to_peer, did_keypair, document::{ identity::{unixfs_fetch, IdentityDocument}, utils::GetLocalDag, ExtractedRootDocument, RootDocument, ToCid, }, - ecdh_decrypt, ecdh_encrypt, - friends::{FriendsStore, Request}, - libp2p_pub_to_did, + ecdh_decrypt, ecdh_encrypt, libp2p_pub_to_did, + phonebook::PhoneBook, + queue::Queue, }; #[derive(Clone)] @@ -64,6 +65,18 @@ pub struct IdentityStore { online_status: Arc>>, + // keypair + did_key: Arc, + + // Queue to handle sending friend request + queue: Queue, + + phonebook: PhoneBook, + + wait_on_response: Option, + + signal: Arc>>>>, + discovery: Discovery, config: config::Config, @@ -76,8 +89,73 @@ pub struct IdentityStore { Arc>>>, event: broadcast::Sender, +} - friend_store: Arc>>, +#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)] +pub enum Request { + In(DID), + Out(DID), +} + +impl From for RequestType { + fn from(request: Request) -> Self { + RequestType::from(&request) + } +} + +impl From<&Request> for RequestType { + fn from(request: &Request) -> Self { + match request { + Request::In(_) => RequestType::Incoming, + Request::Out(_) => RequestType::Outgoing, + } + } +} + +impl Request { + pub fn r#type(&self) -> RequestType { + self.into() + } + + pub fn did(&self) -> &DID { + match self { + Request::In(did) => did, + Request::Out(did) => did, + } + } +} + +#[derive(Deserialize, Serialize, Debug, Clone, Copy, PartialEq, Hash, Eq)] +#[serde(rename_all = "lowercase", tag = "type")] +pub enum Event { + /// Event indicating a friend request + Request, + /// Event accepting the request + Accept, + /// Remove identity as a friend + Remove, + /// Reject friend request, if any + Reject, + /// Retract a sent friend request + Retract, + /// Block user + Block, + /// Unblock user + Unblock, + /// Indiciation of a response to a request + Response, +} + +#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Hash, Eq)] +pub struct RequestResponsePayload { + pub sender: DID, + pub event: Event, +} + +#[derive(Deserialize, Serialize, Debug, Clone, Copy, PartialEq, Eq)] +pub enum RequestType { + Incoming, + Outgoing, } #[allow(clippy::large_enum_variant)] @@ -156,12 +234,13 @@ impl std::fmt::Debug for ResponseOption { } impl IdentityStore { - pub async fn new( + pub async fn new( ipfs: Ipfs, path: Option, tesseract: Tesseract, interval: Option, tx: broadcast::Sender, + pb_tx: futures::channel::mpsc::Sender, config: &config::Config, discovery: Discovery, ) -> Result { @@ -177,8 +256,27 @@ impl IdentityStore { let online_status = Arc::default(); let root_task = Arc::default(); let task_send = Arc::default(); - let event = tx; - let friend_store = Arc::default(); + let event = tx.clone(); + + let did_key = Arc::new(did_keypair(&tesseract)?); + + let queue = Queue::new( + ipfs.clone(), + did_key.clone(), + config.path.clone(), + discovery.clone(), + ); + + let phonebook = PhoneBook::new( + ipfs.clone(), + discovery.clone(), + tx.clone(), + config.store_setting.emit_online_event, + pb_tx, + ); + + let signal = Default::default(); + let wait_on_response = config.store_setting.friend_request_response_duration; let store = Self { ipfs, @@ -193,7 +291,11 @@ impl IdentityStore { root_task, task_send, event, - friend_store, + did_key, + queue, + phonebook, + signal, + wait_on_response, }; if store.path.is_some() { @@ -221,15 +323,39 @@ impl IdentityStore { let mut discovery_rx = store.discovery.events(); + log::info!("Loading queue"); + if let Err(_e) = store.queue.load().await {} + + let phonebook = &store.phonebook; + log::info!("Loading friends list into phonebook"); + if let Ok(friends) = store.friends_list().await { + if let Err(_e) = phonebook.add_friend_list(friends).await { + error!("Error adding friends in phonebook: {_e}"); + } + } + + // scan through friends list to see if there is any incoming request or outgoing request matching + // and clear them out of the request list as a precautionary measure + let friends = store.friends_list().await.unwrap_or_default(); + + for friend in friends { + let list = store.list_all_raw_request().await.unwrap_or_default(); + + // cleanup outgoing + for req in list.iter().filter(|req| req.did().eq(&friend)) { + let _ = store.root_document_remove_request(req).await.ok(); + } + } + + let friend_stream = store.ipfs.pubsub_subscribe(store.did_key.inbox()).await?; + tokio::spawn({ let mut store = store.clone(); async move { let _main_stream = main_stream; - if let Err(e) = store.discovery.start().await { - warn!("Error starting discovery service: {e}. Will not be able to discover peers over namespace"); - } futures::pin_mut!(event_stream); + futures::pin_mut!(friend_stream); let auto_push = interval.is_some(); @@ -247,24 +373,40 @@ impl IdentityStore { loop { tokio::select! { - message = event_stream.next() => { - if let Some(message) = message { - let entry = match message.source { - Some(peer_id) => match store.discovery.get(peer_id).await.ok() { - Some(entry) => entry.peer_id().to_did().ok(), - None => { - let _ = store.discovery.insert(peer_id).await.ok(); - peer_id.to_did().ok() - }, + Some(message) = event_stream.next() => { + let entry = match message.source { + Some(peer_id) => match store.discovery.get(peer_id).await.ok() { + Some(entry) => entry.peer_id().to_did().ok(), + None => { + let _ = store.discovery.insert(peer_id).await.ok(); + peer_id.to_did().ok() }, - None => continue, - }; - if let Some(in_did) = entry { - if let Err(e) = store.process_message(in_did, &message.data).await { - error!("Error: {e}"); - } + }, + None => continue, + }; + if let Some(in_did) = entry { + if let Err(e) = store.process_message(in_did, &message.data).await { + error!("Error: {e}"); } } + + } + Some(event) = friend_stream.next() => { + let Some(peer_id) = event.source else { + //Note: Due to configuration, we should ALWAYS have a peer set in its source + // thus we can ignore the request if no peer is provided + continue; + }; + + let Ok(did) = peer_id.to_did() else { + //Note: The peer id is embedded with ed25519 public key, therefore we can decode it into a did key + // otherwise we can ignore + continue; + }; + + if let Err(e) = store.check_request_message(&did, &event.data).await { + error!("Error: {e}"); + } } // Used as the initial request/push Ok(push) = discovery_rx.recv() => { @@ -289,12 +431,8 @@ impl IdentityStore { Ok(store) } - pub async fn set_friend_store(&self, store: FriendsStore) { - *self.friend_store.write().await = Some(store) - } - - async fn friend_store(&self) -> Result { - self.friend_store.read().await.clone().ok_or(Error::Other) + pub(crate) fn phonebook(&self) -> &PhoneBook { + &self.phonebook } async fn start_root_task(&self) { @@ -696,6 +834,244 @@ impl IdentityStore { *self.task_send.write().await = Some(tx); *self.root_task.write().await = Some(task); } + //TODO: Implement Errors + #[tracing::instrument(skip(self, data))] + async fn check_request_message(&mut self, did: &DID, data: &[u8]) -> anyhow::Result<()> { + let pk_did = &*self.did_key; + + let bytes = ecdh_decrypt(pk_did, Some(did), data)?; + + log::trace!("received payload size: {} bytes", bytes.len()); + + let data = serde_json::from_slice::(&bytes)?; + + log::info!("Received event from {did}"); + + if self + .list_incoming_request() + .await + .unwrap_or_default() + .contains(&data.sender) + && data.event == Event::Request + { + warn!("Request exist locally. Skipping"); + return Ok(()); + } + + //TODO: Send error if dropped early due to error when processing request + let mut signal = self.signal.write().await.remove(&data.sender); + + log::debug!("Event {:?}", data.event); + + // Before we validate the request, we should check to see if the key is blocked + // If it is, skip the request so we dont wait resources storing it. + if self.is_blocked(&data.sender).await? && !matches!(data.event, Event::Block) { + log::warn!("Received event from a blocked identity."); + let payload = RequestResponsePayload { + sender: (*self.did_key).clone(), + event: Event::Block, + }; + + self.broadcast_request((&data.sender, &payload), false, true) + .await?; + + return Ok(()); + } + + match data.event { + Event::Accept => { + let list = self.list_all_raw_request().await?; + + let Some(item) = list + .iter() + .filter(|req| req.r#type() == RequestType::Outgoing) + .find(|req| data.sender.eq(req.did())) + .cloned() + else { + anyhow::bail!( + "Unable to locate pending request. Already been accepted or rejected?" + ) + }; + + // Maybe just try the function instead and have it be a hard error? + if self.root_document_remove_request(&item).await.is_err() { + anyhow::bail!( + "Unable to locate pending request. Already been accepted or rejected?" + ) + } + + self.add_friend(item.did()).await?; + } + Event::Request => { + if self.is_friend(&data.sender).await? { + log::debug!("Friend already exist. Remitting event"); + let payload = RequestResponsePayload { + sender: (*self.did_key).clone(), + event: Event::Accept, + }; + + self.broadcast_request((&data.sender, &payload), false, false) + .await?; + + return Ok(()); + } + + let list = self.list_all_raw_request().await?; + + if let Some(inner_req) = list + .iter() + .find(|request| { + request.r#type() == RequestType::Outgoing && data.sender.eq(request.did()) + }) + .cloned() + { + //Because there is also a corresponding outgoing request for the incoming request + //we can automatically add them + self.root_document_remove_request(&inner_req).await?; + self.add_friend(inner_req.did()).await?; + } else { + self.root_document_add_request(&Request::In(data.sender.clone())) + .await?; + + tokio::spawn({ + let store = self.clone(); + let from = data.sender.clone(); + async move { + let _ = tokio::time::timeout(Duration::from_secs(10), async { + loop { + if let Ok(list) = + store.lookup(LookupBy::DidKey(from.clone())).await + { + if !list.is_empty() { + break; + } + } + tokio::time::sleep(Duration::from_secs(1)).await; + } + }) + .await + .ok(); + + store.emit_event(MultiPassEventKind::FriendRequestReceived { from }); + } + }); + } + let payload = RequestResponsePayload { + sender: (*self.did_key).clone(), + event: Event::Response, + }; + + self.broadcast_request((&data.sender, &payload), false, false) + .await?; + } + Event::Reject => { + let list = self.list_all_raw_request().await?; + let internal_request = list + .iter() + .find(|request| { + request.r#type() == RequestType::Outgoing && data.sender.eq(request.did()) + }) + .cloned() + .ok_or(Error::FriendRequestDoesntExist)?; + + self.root_document_remove_request(&internal_request).await?; + + self.emit_event(MultiPassEventKind::OutgoingFriendRequestRejected { + did: data.sender, + }); + } + Event::Remove => { + if self.is_friend(&data.sender).await? { + self.remove_friend(&data.sender, false).await?; + } + } + Event::Retract => { + let list = self.list_all_raw_request().await?; + let internal_request = list + .iter() + .find(|request| { + request.r#type() == RequestType::Incoming && data.sender.eq(request.did()) + }) + .cloned() + .ok_or(Error::FriendRequestDoesntExist)?; + + self.root_document_remove_request(&internal_request).await?; + + self.emit_event(MultiPassEventKind::IncomingFriendRequestClosed { + did: data.sender, + }); + } + Event::Block => { + if self.has_request_from(&data.sender).await? { + self.emit_event(MultiPassEventKind::IncomingFriendRequestClosed { + did: data.sender.clone(), + }); + } else if self.sent_friend_request_to(&data.sender).await? { + self.emit_event(MultiPassEventKind::OutgoingFriendRequestRejected { + did: data.sender.clone(), + }); + } + + let list = self.list_all_raw_request().await?; + for req in list.iter().filter(|req| req.did().eq(&data.sender)) { + self.root_document_remove_request(req).await?; + } + + if self.is_friend(&data.sender).await? { + self.remove_friend(&data.sender, false).await?; + } + + let completed = self.root_document_add_block_by(&data.sender).await.is_ok(); + if completed { + tokio::spawn({ + let store = self.clone(); + let sender = data.sender.clone(); + async move { + let _ = store.push(&sender).await.ok(); + let _ = store.request(&sender, RequestOption::Identity).await.ok(); + } + }); + + self.emit_event(MultiPassEventKind::BlockedBy { did: data.sender }); + } + + if let Some(tx) = std::mem::take(&mut signal) { + log::debug!("Signaling broadcast of response..."); + let _ = tx.send(Err(Error::BlockedByUser)); + } + } + Event::Unblock => { + let completed = self + .root_document_remove_block_by(&data.sender) + .await + .is_ok(); + + if completed { + tokio::spawn({ + let store = self.clone(); + let sender = data.sender.clone(); + async move { + let _ = store.push(&sender).await.ok(); + let _ = store.request(&sender, RequestOption::Identity).await.ok(); + } + }); + self.emit_event(MultiPassEventKind::UnblockedBy { did: data.sender }); + } + } + Event::Response => { + if let Some(tx) = std::mem::take(&mut signal) { + log::debug!("Signaling broadcast of response..."); + let _ = tx.send(Ok(())); + } + } + }; + if let Some(tx) = std::mem::take(&mut signal) { + log::debug!("Signaling broadcast of response..."); + let _ = tx.send(Ok(())); + } + + Ok(()) + } async fn push_iter>(&self, list: I) { for did in list { @@ -756,20 +1132,11 @@ impl IdentityStore { let mut identity = self.own_identity_document().await?; - let is_friend = match self.friend_store().await { - Ok(store) => store.is_friend(out_did).await.unwrap_or_default(), - _ => false, - }; + let is_friend = self.is_friend(out_did).await.unwrap_or_default(); - let is_blocked = match self.friend_store().await { - Ok(store) => store.is_blocked(out_did).await.unwrap_or_default(), - _ => false, - }; + let is_blocked = self.is_blocked(out_did).await.unwrap_or_default(); - let is_blocked_by = match self.friend_store().await { - Ok(store) => store.is_blocked_by(out_did).await.unwrap_or_default(), - _ => false, - }; + let is_blocked_by = self.is_blocked_by(out_did).await.unwrap_or_default(); let share_platform = self.config.store_setting.share_platform; @@ -932,6 +1299,7 @@ impl IdentityStore { } #[tracing::instrument(skip(self, message))] + #[allow(clippy::if_same_then_else)] async fn process_message(&mut self, in_did: DID, message: &[u8]) -> anyhow::Result<()> { let pk_did = self.get_keypair_did()?; @@ -1024,12 +1392,9 @@ impl IdentityStore { } else if matches!( self.config.store_setting.update_events, UpdateEvents::FriendsOnly | UpdateEvents::EmitFriendsOnly - ) { - if let Ok(store) = self.friend_store().await { - if store.is_friend(&document_did).await.unwrap_or_default() { - emit = true; - } - } + ) && self.is_friend(&document_did).await.unwrap_or_default() + { + emit = true; } tokio::spawn({ let store = self.clone(); @@ -1193,12 +1558,9 @@ impl IdentityStore { } else if matches!( self.config.store_setting.update_events, UpdateEvents::FriendsOnly | UpdateEvents::EmitFriendsOnly - ) { - if let Ok(store) = self.friend_store().await { - if store.is_friend(&document_did).await.unwrap_or_default() { - emit = true; - } - } + ) && self.is_friend(&document_did).await.unwrap_or_default() + { + emit = true; } if emit { @@ -1380,16 +1742,15 @@ impl IdentityStore { self.save_cid(root_cid).await?; self.update_identity().await?; - if let Ok(store) = self.friend_store().await { - log::info!("Loading friends list into phonebook"); - if let Ok(friends) = store.friends_list().await { - let phonebook = store.phonebook(); + log::info!("Loading friends list into phonebook"); + if let Ok(friends) = self.friends_list().await { + let phonebook = self.phonebook(); - if let Err(_e) = phonebook.add_friend_list(friends).await { - error!("Error adding friends in phonebook: {_e}"); - } + if let Err(_e) = phonebook.add_friend_list(friends).await { + error!("Error adding friends in phonebook: {_e}"); } } + Ok(identity) } @@ -2169,3 +2530,504 @@ impl IdentityStore { let _ = self.event.send(event); } } + +impl IdentityStore { + #[tracing::instrument(skip(self))] + pub async fn send_request(&mut self, pubkey: &DID) -> Result<(), Error> { + let local_public_key = (&*self.did_key).clone(); + + if local_public_key.eq(pubkey) { + return Err(Error::CannotSendSelfFriendRequest); + } + + if self.is_friend(pubkey).await? { + return Err(Error::FriendExist); + } + + if self.is_blocked_by(pubkey).await? { + return Err(Error::BlockedByUser); + } + + if self.is_blocked(pubkey).await? { + return Err(Error::PublicKeyIsBlocked); + } + + if self.has_request_from(pubkey).await? { + return self.accept_request(pubkey).await; + } + + let list = self.list_all_raw_request().await?; + + if list + .iter() + .any(|request| request.r#type() == RequestType::Outgoing && request.did().eq(pubkey)) + { + // since the request has already been sent, we should not be sending it again + return Err(Error::FriendRequestExist); + } + + let payload = RequestResponsePayload { + sender: local_public_key, + event: Event::Request, + }; + + self.broadcast_request((pubkey, &payload), true, true).await + } + + #[tracing::instrument(skip(self))] + pub async fn accept_request(&mut self, pubkey: &DID) -> Result<(), Error> { + let local_public_key = (&*self.did_key).clone(); + + if local_public_key.eq(pubkey) { + return Err(Error::CannotAcceptSelfAsFriend); + } + + if !self.has_request_from(pubkey).await? { + return Err(Error::FriendRequestDoesntExist); + } + + let list = self.list_all_raw_request().await?; + + let internal_request = list + .iter() + .find(|request| request.r#type() == RequestType::Incoming && request.did().eq(pubkey)) + .cloned() + .ok_or(Error::CannotFindFriendRequest)?; + + if self.is_friend(pubkey).await? { + warn!("Already friends. Removing request"); + + self.root_document_remove_request(&internal_request).await?; + + return Ok(()); + } + + let payload = RequestResponsePayload { + event: Event::Accept, + sender: local_public_key, + }; + + self.add_friend(pubkey).await?; + + self.root_document_remove_request(&internal_request).await?; + + self.broadcast_request((pubkey, &payload), false, true) + .await + } + + #[tracing::instrument(skip(self))] + pub async fn reject_request(&mut self, pubkey: &DID) -> Result<(), Error> { + let local_public_key = (&*self.did_key).clone(); + + if local_public_key.eq(pubkey) { + return Err(Error::CannotDenySelfAsFriend); + } + + if !self.has_request_from(pubkey).await? { + return Err(Error::FriendRequestDoesntExist); + } + + let list = self.list_all_raw_request().await?; + + // Although the request been validated before storing, we should validate again just to be safe + let internal_request = list + .iter() + .find(|request| request.r#type() == RequestType::Incoming && request.did().eq(pubkey)) + .cloned() + .ok_or(Error::CannotFindFriendRequest)?; + + let payload = RequestResponsePayload { + sender: local_public_key, + event: Event::Reject, + }; + + self.root_document_remove_request(&internal_request).await?; + + self.broadcast_request((pubkey, &payload), false, true) + .await + } + + #[tracing::instrument(skip(self))] + pub async fn close_request(&mut self, pubkey: &DID) -> Result<(), Error> { + let local_public_key = (&*self.did_key).clone(); + + let list = self.list_all_raw_request().await?; + + let internal_request = list + .iter() + .find(|request| request.r#type() == RequestType::Outgoing && request.did().eq(pubkey)) + .cloned() + .ok_or(Error::CannotFindFriendRequest)?; + + let payload = RequestResponsePayload { + sender: local_public_key, + event: Event::Retract, + }; + + self.root_document_remove_request(&internal_request).await?; + + if let Some(entry) = self.queue.get(pubkey).await { + if entry.event == Event::Request { + self.queue.remove(pubkey).await; + self.emit_event(MultiPassEventKind::OutgoingFriendRequestClosed { + did: pubkey.clone(), + }); + + return Ok(()); + } + } + + self.broadcast_request((pubkey, &payload), false, true) + .await + } + + #[tracing::instrument(skip(self))] + pub async fn has_request_from(&self, pubkey: &DID) -> Result { + self.list_incoming_request() + .await + .map(|list| list.contains(pubkey)) + } +} + +impl IdentityStore { + #[tracing::instrument(skip(self))] + pub async fn block_list(&self) -> Result, Error> { + self.root_document_get_blocks().await + } + + #[tracing::instrument(skip(self))] + pub async fn is_blocked(&self, public_key: &DID) -> Result { + self.block_list() + .await + .map(|list| list.contains(public_key)) + } + + #[tracing::instrument(skip(self))] + pub async fn block(&mut self, pubkey: &DID) -> Result<(), Error> { + let local_public_key = (&*self.did_key).clone(); + + if local_public_key.eq(pubkey) { + return Err(Error::CannotBlockOwnKey); + } + + if self.is_blocked(pubkey).await? { + return Err(Error::PublicKeyIsBlocked); + } + + self.root_document_add_block(pubkey).await?; + + // Remove anything from queue related to the key + self.queue.remove(pubkey).await; + + let list = self.list_all_raw_request().await?; + for req in list.iter().filter(|req| req.did().eq(pubkey)) { + self.root_document_remove_request(req).await?; + } + + if self.is_friend(pubkey).await? { + if let Err(e) = self.remove_friend(pubkey, false).await { + error!("Error removing item from friend list: {e}"); + } + } + + // Since we want to broadcast the remove request, banning the peer after would not allow that to happen + // Although this may get uncomment in the future to block connections regardless if its sent or not, or + // if we decide to send the request through a relay to broadcast it to the peer, however + // the moment this extension is reloaded the block list are considered as a "banned peer" in libp2p + + // let peer_id = did_to_libp2p_pub(pubkey)?.to_peer_id(); + + // self.ipfs.ban_peer(peer_id).await?; + let payload = RequestResponsePayload { + sender: local_public_key, + event: Event::Block, + }; + + self.broadcast_request((pubkey, &payload), false, true) + .await + } + + #[tracing::instrument(skip(self))] + pub async fn unblock(&mut self, pubkey: &DID) -> Result<(), Error> { + let local_public_key = (&*self.did_key).clone(); + + if local_public_key.eq(pubkey) { + return Err(Error::CannotUnblockOwnKey); + } + + if !self.is_blocked(pubkey).await? { + return Err(Error::PublicKeyIsntBlocked); + } + + self.root_document_remove_block(pubkey).await?; + + let peer_id = did_to_libp2p_pub(pubkey)?.to_peer_id(); + self.ipfs.unban_peer(peer_id).await?; + + let payload = RequestResponsePayload { + sender: local_public_key, + event: Event::Unblock, + }; + + self.broadcast_request((pubkey, &payload), false, true) + .await + } +} + +impl IdentityStore { + pub async fn block_by_list(&self) -> Result, Error> { + self.root_document_get_block_by().await + } + + pub async fn is_blocked_by(&self, pubkey: &DID) -> Result { + self.block_by_list().await.map(|list| list.contains(pubkey)) + } +} + +impl IdentityStore { + pub async fn friends_list(&self) -> Result, Error> { + self.root_document_get_friends().await + } + + // Should not be called directly but only after a request is accepted + #[tracing::instrument(skip(self))] + pub async fn add_friend(&mut self, pubkey: &DID) -> Result<(), Error> { + if self.is_friend(pubkey).await? { + return Err(Error::FriendExist); + } + + if self.is_blocked(pubkey).await? { + return Err(Error::PublicKeyIsBlocked); + } + + self.root_document_add_friend(pubkey).await?; + + let phonebook = self.phonebook(); + if let Err(_e) = phonebook.add_friend(pubkey).await { + error!("Error: {_e}"); + } + + // Push to give an update in the event any wasnt transmitted during the initial push + // We dont care if this errors or not. + let _ = self.push(pubkey).await.ok(); + + self.emit_event(MultiPassEventKind::FriendAdded { + did: pubkey.clone(), + }); + + Ok(()) + } + + #[tracing::instrument(skip(self, broadcast))] + pub async fn remove_friend(&mut self, pubkey: &DID, broadcast: bool) -> Result<(), Error> { + if !self.is_friend(pubkey).await? { + return Err(Error::FriendDoesntExist); + } + + self.root_document_remove_friend(pubkey).await?; + + let phonebook = self.phonebook(); + + if let Err(_e) = phonebook.remove_friend(pubkey).await { + error!("Error: {_e}"); + } + + if broadcast { + let local_public_key = (&*self.did_key).clone(); + + let payload = RequestResponsePayload { + sender: local_public_key, + event: Event::Remove, + }; + + self.broadcast_request((pubkey, &payload), false, true) + .await?; + } + + self.emit_event(MultiPassEventKind::FriendRemoved { + did: pubkey.clone(), + }); + + Ok(()) + } + + #[tracing::instrument(skip(self))] + pub async fn is_friend(&self, pubkey: &DID) -> Result { + self.friends_list().await.map(|list| list.contains(pubkey)) + } +} + +impl IdentityStore { + pub async fn list_all_raw_request(&self) -> Result, Error> { + self.root_document_get_requests().await + } + + pub async fn received_friend_request_from(&self, did: &DID) -> Result { + self.list_incoming_request() + .await + .map(|list| list.iter().any(|request| request.eq(did))) + } + + #[tracing::instrument(skip(self))] + pub async fn list_incoming_request(&self) -> Result, Error> { + self.list_all_raw_request().await.map(|list| { + list.iter() + .filter_map(|request| match request { + Request::In(request) => Some(request), + _ => None, + }) + .cloned() + .collect::>() + }) + } + + #[tracing::instrument(skip(self))] + pub async fn sent_friend_request_to(&self, did: &DID) -> Result { + self.list_outgoing_request() + .await + .map(|list| list.iter().any(|request| request.eq(did))) + } + + #[tracing::instrument(skip(self))] + pub async fn list_outgoing_request(&self) -> Result, Error> { + self.list_all_raw_request().await.map(|list| { + list.iter() + .filter_map(|request| match request { + Request::Out(request) => Some(request), + _ => None, + }) + .cloned() + .collect::>() + }) + } + + #[tracing::instrument(skip(self))] + pub async fn broadcast_request( + &mut self, + (recipient, payload): (&DID, &RequestResponsePayload), + store_request: bool, + queue_broadcast: bool, + ) -> Result<(), Error> { + let remote_peer_id = did_to_libp2p_pub(recipient)?.to_peer_id(); + + if !self.discovery.contains(recipient).await { + self.discovery.insert(recipient).await?; + } + + if store_request { + let outgoing_request = Request::Out(recipient.clone()); + let list = self.list_all_raw_request().await?; + if !list.contains(&outgoing_request) { + self.root_document_add_request(&outgoing_request).await?; + } + } + + let kp = &*self.did_key; + + let payload_bytes = serde_json::to_vec(&payload)?; + + let bytes = ecdh_encrypt(kp, Some(recipient), payload_bytes)?; + + log::trace!("Request Payload size: {} bytes", bytes.len()); + + log::info!("Sending event to {recipient}"); + + let peers = self.ipfs.pubsub_peers(Some(recipient.inbox())).await?; + + let mut queued = false; + + let wait = self.wait_on_response.is_some(); + + let mut rx = (matches!(payload.event, Event::Request) && wait).then_some({ + let (tx, rx) = oneshot::channel(); + self.signal.write().await.insert(recipient.clone(), tx); + rx + }); + + let start = Instant::now(); + if !peers.contains(&remote_peer_id) + || (peers.contains(&remote_peer_id) + && self + .ipfs + .pubsub_publish(recipient.inbox(), bytes) + .await + .is_err()) + && queue_broadcast + { + self.queue.insert(recipient, payload.clone()).await; + queued = true; + self.signal.write().await.remove(recipient); + } + + if !queued { + let end = start.elapsed(); + log::trace!("Took {}ms to send event", end.as_millis()); + } + + if !queued && matches!(payload.event, Event::Request) { + if let Some(rx) = std::mem::take(&mut rx) { + if let Some(timeout) = self.wait_on_response { + let start = Instant::now(); + if let Ok(Ok(res)) = tokio::time::timeout(timeout, rx).await { + let end = start.elapsed(); + log::trace!("Took {}ms to receive a response", end.as_millis()); + res? + } + } + } + } + + match payload.event { + Event::Request => { + self.emit_event(MultiPassEventKind::FriendRequestSent { + to: recipient.clone(), + }); + } + Event::Retract => { + self.emit_event(MultiPassEventKind::OutgoingFriendRequestClosed { + did: recipient.clone(), + }); + } + Event::Reject => { + self.emit_event(MultiPassEventKind::IncomingFriendRequestRejected { + did: recipient.clone(), + }); + } + Event::Block => { + tokio::spawn({ + let store = self.clone(); + let recipient = recipient.clone(); + async move { + let _ = store.push(&recipient).await.ok(); + let _ = store + .request(&recipient, RequestOption::Identity) + .await + .ok(); + } + }); + self.emit_event(MultiPassEventKind::Blocked { + did: recipient.clone(), + }); + } + Event::Unblock => { + tokio::spawn({ + let store = self.clone(); + let recipient = recipient.clone(); + async move { + let _ = store.push(&recipient).await.ok(); + let _ = store + .request(&recipient, RequestOption::Identity) + .await + .ok(); + } + }); + + self.emit_event(MultiPassEventKind::Unblocked { + did: recipient.clone(), + }); + } + _ => {} + }; + Ok(()) + } +} diff --git a/extensions/warp-ipfs/src/store/message.rs b/extensions/warp-ipfs/src/store/message.rs index 72e10179f..88e6cf6f1 100644 --- a/extensions/warp-ipfs/src/store/message.rs +++ b/extensions/warp-ipfs/src/store/message.rs @@ -44,7 +44,6 @@ use crate::store::{ use super::conversation::{ConversationDocument, MessageDocument}; use super::discovery::Discovery; use super::document::utils::{GetLocalDag, ToCid}; -use super::friends::FriendsStore; use super::identity::IdentityStore; use super::keystore::Keystore; use super::{did_to_libp2p_pub, verify_serde_sig, ConversationEvents, DidExt, MessagingEvents}; @@ -83,7 +82,7 @@ pub struct MessageStore { identity: IdentityStore, // friend store - friends: FriendsStore, + // friends: FriendsStore, // discovery discovery: Discovery, @@ -117,7 +116,7 @@ impl MessageStore { ipfs: Ipfs, path: Option, identity: IdentityStore, - friends: FriendsStore, + // friends: FriendsStore, discovery: Discovery, filesystem: Option>, _: bool, @@ -160,7 +159,7 @@ impl MessageStore { conversation_sender, conversation_task_tx, identity, - friends, + // friends, discovery, filesystem, queue, @@ -1433,7 +1432,7 @@ impl MessageStore { return Ok(()); } - if let Ok(true) = self.friends.is_blocked(&recipient).await { + if let Ok(true) = self.identity.is_blocked(&recipient).await { //TODO: Signal back to close conversation warn!("{recipient} is blocked"); return Ok(()); @@ -1800,11 +1799,11 @@ impl MessageStore { impl MessageStore { pub async fn create_conversation(&mut self, did_key: &DID) -> Result { - if self.with_friends.load(Ordering::SeqCst) && !self.friends.is_friend(did_key).await? { + if self.with_friends.load(Ordering::SeqCst) && !self.identity.is_friend(did_key).await? { return Err(Error::FriendDoesntExist); } - if let Ok(true) = self.friends.is_blocked(did_key).await { + if let Ok(true) = self.identity.is_blocked(did_key).await { return Err(Error::PublicKeyIsBlocked); } @@ -1935,12 +1934,12 @@ impl MessageStore { let mut removal = vec![]; for did in recipients.iter() { - if self.with_friends.load(Ordering::SeqCst) && !self.friends.is_friend(did).await? { + if self.with_friends.load(Ordering::SeqCst) && !self.identity.is_friend(did).await? { info!("{did} is not on the friends list.. removing from list"); removal.push(did.clone()); } - if let Ok(true) = self.friends.is_blocked(did).await { + if let Ok(true) = self.identity.is_blocked(did).await { info!("{did} is blocked.. removing from list"); removal.push(did.clone()); } @@ -2749,7 +2748,7 @@ impl MessageStore { return Err(Error::PublicKeyInvalid); } - if self.friends.is_blocked(did_key).await? { + if self.identity.is_blocked(did_key).await? { return Err(Error::PublicKeyIsBlocked); } diff --git a/extensions/warp-ipfs/src/store/mod.rs b/extensions/warp-ipfs/src/store/mod.rs index 258deb436..a696ba312 100644 --- a/extensions/warp-ipfs/src/store/mod.rs +++ b/extensions/warp-ipfs/src/store/mod.rs @@ -2,7 +2,6 @@ pub mod conversation; pub mod discovery; pub mod document; pub mod files; -pub mod friends; pub mod identity; pub mod keystore; pub mod message; diff --git a/extensions/warp-ipfs/src/store/queue.rs b/extensions/warp-ipfs/src/store/queue.rs index 400e210ac..c113a2a9a 100644 --- a/extensions/warp-ipfs/src/store/queue.rs +++ b/extensions/warp-ipfs/src/store/queue.rs @@ -21,7 +21,7 @@ use warp::{ use crate::store::{ecdh_encrypt, PeerIdExt, PeerTopic}; -use super::{connected_to_peer, discovery::Discovery, friends::RequestResponsePayload}; +use super::{connected_to_peer, discovery::Discovery, identity::RequestResponsePayload}; pub struct Queue { path: Option,