diff --git a/ds/Cargo.toml b/ds/Cargo.toml index e7615c4..242e789 100644 --- a/ds/Cargo.toml +++ b/ds/Cargo.toml @@ -23,6 +23,7 @@ alloy = { git = "https://github.com/alloy-rs/alloy", features = [ "k256", "rlp", ] } +tokio-util = "=0.7.11" openmls = { version = "=0.5.0", features = ["test-utils"] } rand = { version = "^0.8" } diff --git a/ds/src/chat_client.rs b/ds/src/chat_client.rs index 6a0efa1..136659b 100644 --- a/ds/src/chat_client.rs +++ b/ds/src/chat_client.rs @@ -1,8 +1,5 @@ use alloy::primitives::Address; -use alloy::providers::ProviderBuilder; -use alloy::rlp::Encodable; use alloy::signers::Signature; -use alloy::signers::{local::PrivateKeySigner, SignerSync}; use futures_util::{SinkExt, StreamExt}; use serde::{Deserialize, Serialize}; use std::str::FromStr; @@ -74,7 +71,6 @@ impl ResponseMLSPayload { pub struct ChatClient { sender: mpsc::UnboundedSender, - pub request_in_progress: Arc>, } impl ChatClient { @@ -92,7 +88,7 @@ impl ChatClient { // Spawn a task to handle outgoing messages tokio::spawn(async move { while let Some(message) = receiver.lock().await.recv().await { - println!("Message from reciever: {}", message); + // println!("Message from reciever: {}", message); if let Err(e) = write.send(message).await { eprintln!("Error sending message: {}", e); } @@ -104,7 +100,6 @@ impl ChatClient { let mut read = read; while let Some(message) = read.next().await { if let Ok(msg) = message { - println!("Message from read: {}", msg); if let Err(e) = msg_sender.send(msg) { eprintln!("Failed to send message to channel: {}", e); } @@ -121,29 +116,21 @@ impl ChatClient { .send(Message::Text(join_json)) .map_err(|_| ChatServiceError::SendError)?; - Ok(( - ChatClient { - sender, - request_in_progress: Arc::new(Mutex::new(())), - }, - msg_receiver, - )) + Ok((ChatClient { sender }, msg_receiver)) } pub async fn send_request(&self, msg: ServerMessage) -> Result<(), ChatServiceError> { - let _guard = self.request_in_progress.lock().await; self.send_message_to_server(msg)?; Ok(()) } pub async fn handle_response(&self) -> Result<(), ChatServiceError> { - drop(self.request_in_progress.lock().await); Ok(()) } pub fn send_message_to_server(&self, msg: ServerMessage) -> Result<(), ChatServiceError> { let msg_json = serde_json::to_string(&msg).unwrap(); - println!("Message to sender: {}", msg_json); + // println!("Message to sender: {}", msg_json); self.sender .send(Message::Text(msg_json)) .map_err(|_| ChatServiceError::SendError)?; @@ -153,7 +140,8 @@ impl ChatClient { #[test] fn test_sign() { - let signer = PrivateKeySigner::from_str( + use alloy::signers::SignerSync; + let signer = alloy::signers::local::PrivateKeySigner::from_str( "0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d", ) .unwrap(); diff --git a/ds/src/chat_server.rs b/ds/src/chat_server.rs index 62092ca..978e372 100644 --- a/ds/src/chat_server.rs +++ b/ds/src/chat_server.rs @@ -3,14 +3,11 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; use tokio::{ - net::{TcpListener, TcpStream}, + net::TcpListener, sync::{mpsc, Mutex}, - task, }; -use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_tungstenite::{accept_async, tungstenite::protocol::Message}; -use crate::chat_client::ChatMessages; use crate::ChatServiceError; type Tx = mpsc::UnboundedSender; diff --git a/ds/src/ds.rs b/ds/src/ds.rs index e4c761d..4742b1b 100644 --- a/ds/src/ds.rs +++ b/ds/src/ds.rs @@ -5,7 +5,11 @@ use fred::{ types::Message, }; use serde::{Deserialize, Serialize}; -use tokio::sync::broadcast::Receiver; +use tokio::sync::{ + broadcast::Receiver, + mpsc::{self, error::SendError, UnboundedReceiver, UnboundedSender}, +}; +use tokio_util::sync::CancellationToken; use openmls::{framing::MlsMessageOut, prelude::TlsSerializeTrait}; // use waku_bindings::*; @@ -68,14 +72,124 @@ impl RClient { } } +pub struct RClientPrivate { + chat_id: String, + sender_id: String, + token: CancellationToken, + sender: UnboundedSender>, + reciever: UnboundedReceiver>, +} + +impl RClientPrivate { + pub async fn new_for_users( + chat_id: String, + sender_id: String, + ) -> Result { + let redis_client = RedisClient::default(); + let subscriber: SubscriberClient = + Builder::default_centralized().build_subscriber_client()?; + redis_client.init().await?; + subscriber.init().await?; + subscriber.subscribe(chat_id.clone()).await?; + + let (sender_publish, mut receiver_publish) = mpsc::unbounded_channel::>(); + let (sender_subscriber, receiver_subscriber) = mpsc::unbounded_channel::>(); + + let chat_id_c = chat_id.clone(); + let redis_client_c = redis_client.clone(); + tokio::spawn(async move { + while let Some(msg) = receiver_publish.recv().await { + redis_client_c + .publish(chat_id_c.to_owned(), msg.as_slice()) + .await?; + } + Ok::<_, DeliveryServiceError>(()) + }); + let subscriber_c = subscriber.clone(); + tokio::spawn(async move { + let mut ss = subscriber_c.message_rx(); + while let Ok(msg) = ss.recv().await { + let bytes: Vec = msg.value.convert()?; + if let Err(e) = sender_subscriber.send(bytes) { + return Err(DeliveryServiceError::TokioSendError(e)); + } + } + Ok(()) + }); + + let token = CancellationToken::new(); + + let cloned_token = token.clone(); + let chat_id_c = chat_id.clone(); + tokio::spawn(async move { + token.cancelled().await; + subscriber.unsubscribe(chat_id_c).await?; + subscriber.quit().await?; + redis_client.quit().await?; + Ok::<_, DeliveryServiceError>(()) + }); + + Ok(RClientPrivate { + chat_id: chat_id.clone(), + sender_id, + token: cloned_token, + sender: sender_publish, + reciever: receiver_subscriber, + }) + } + + pub fn msg_send(&mut self, msg: Vec) -> Result<(), DeliveryServiceError> { + let json_value = SenderStruct { + sender: self.sender_id.clone(), + msg, + }; + let bytes = serde_json::to_vec(&json_value)?; + self.sender.send(bytes)?; + Ok(()) + } + + pub async fn msg_recv(&mut self) -> Result, DeliveryServiceError> { + while let Some(msg) = self.reciever.recv().await { + let m: SenderStruct = serde_json::from_slice(&msg)?; + if m.sender == self.sender_id { + continue; + } + return Ok(m.msg); + } + Err(DeliveryServiceError::EmptyMsgError) + } +} + #[derive(Debug, thiserror::Error)] pub enum DeliveryServiceError { + #[error("Empty msg")] + EmptyMsgError, #[error("Json error: {0}")] JsonError(#[from] serde_json::Error), #[error("Redis error: {0}")] RedisError(#[from] RedisError), + #[error("Channel sender error: {0}")] + TokioSendError(#[from] SendError>), #[error("Serialization problem: {0}")] TlsError(#[from] tls_codec::Error), #[error("Unknown error: {0}")] Other(anyhow::Error), } + +#[tokio::test] +async fn private_test() { + let mut alice = RClientPrivate::new_for_users("alicebob".to_string(), "alice".to_string()) + .await + .unwrap(); + let mut bob = RClientPrivate::new_for_users("alicebob".to_string(), "bob".to_string()) + .await + .unwrap(); + + alice.msg_send("Hi bob".as_bytes().to_vec()).unwrap(); + let msg = bob.msg_recv().await.unwrap(); + assert_eq!("Hi bob".to_string(), String::from_utf8(msg).unwrap()); + + bob.msg_send("Hi alice".as_bytes().to_vec()).unwrap(); + let msg = alice.msg_recv().await.unwrap(); + assert_eq!("Hi alice".to_string(), String::from_utf8(msg).unwrap()); +} diff --git a/sc_key_store/src/sc_ks.rs b/sc_key_store/src/sc_ks.rs index 73bd0a0..8af043f 100644 --- a/sc_key_store/src/sc_ks.rs +++ b/sc_key_store/src/sc_ks.rs @@ -41,9 +41,7 @@ impl, N: Network> SCKeyStoreService async fn add_user(&mut self, address: &str) -> Result<(), KeyStoreError> { if self.does_user_exist(address).await? { - println!("User already exist"); - // return Err(KeyStoreError::AlreadyExistedUserError); - return Ok(()); + return Err(KeyStoreError::AlreadyExistedUserError); } let add_to_acl_binding = self.instance.addUser(Address::from_str(address)?); diff --git a/src/contact.rs b/src/contact.rs index a3224b3..aacfe6e 100644 --- a/src/contact.rs +++ b/src/contact.rs @@ -1,4 +1,4 @@ -use alloy::{hex::ToHexExt, primitives::SignatureError, signers::Signature}; +use alloy::{hex::ToHexExt, primitives::SignatureError}; use ds::{ chat_client::{ ChatClient, ChatMessages, ReqMessageType, RequestMLSPayload, ResponseMLSPayload, @@ -12,11 +12,13 @@ use openmls::prelude::MlsMessageOut; use std::{collections::HashMap, sync::Arc}; use tls_codec::Serialize; use tokio::sync::Mutex; +use tokio_util::sync::CancellationToken; pub const CHAT_SERVER_ADDR: &str = "ws://127.0.0.1:8080"; pub struct ContactsList { contacts: Arc>>, + pub future_req: HashMap, pub chat_client: ChatClient, } @@ -51,6 +53,7 @@ impl ContactsList { pub async fn new(chat_client: ChatClient) -> Result { Ok(ContactsList { contacts: Arc::new(Mutex::new(HashMap::new())), + future_req: HashMap::new(), chat_client, }) } @@ -82,6 +85,9 @@ impl ContactsList { sc_address: String, msg_type: ReqMessageType, ) -> Result<(), ContactError> { + self.future_req + .insert(user_address.to_string(), CancellationToken::new()); + let req = ChatMessages::Request(RequestMLSPayload::new(sc_address, msg_type)); self.chat_client .send_request(ServerMessage::InMessage { @@ -90,6 +96,7 @@ impl ContactsList { msg: serde_json::to_string(&req)?, }) .await?; + Ok(()) } @@ -195,6 +202,16 @@ impl ContactsList { Ok(joiners_kp) } + + pub fn handle_response(&mut self, user_address: &str) -> Result<(), ContactError> { + match self.future_req.get(user_address) { + Some(token) => { + token.cancel(); + Ok(()) + } + None => Err(ContactError::UnknownUserError), + } + } } #[derive(Debug, thiserror::Error)] diff --git a/src/identity.rs b/src/identity.rs index bf530b4..ba58488 100644 --- a/src/identity.rs +++ b/src/identity.rs @@ -1,5 +1,4 @@ use alloy::primitives::Address; -use hex; use std::collections::HashMap; use openmls::{credentials::CredentialWithKey, key_packages::*, prelude::*}; diff --git a/src/main.rs b/src/main.rs index 0fd8bfe..5ca564c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,13 +1,12 @@ -use alloy::{network::NetworkWallet, providers::ProviderBuilder, signers::local::PrivateKeySigner}; +use alloy::{providers::ProviderBuilder, signers::local::PrivateKeySigner}; use clap::Parser; use ds::{ chat_client::{ChatClient, ChatMessages, ReqMessageType}, - chat_server::{start_server, ServerMessage}, + chat_server::ServerMessage, }; -use openmls::framing::MlsMessageIn; -use std::{any::Any, error::Error, fs::File, io::Read, str::FromStr}; -use tls_codec::Deserialize; -use tokio::sync::mpsc; + +use std::{borrow::BorrowMut, error::Error, str::FromStr, sync::Arc}; +use tokio::sync::{mpsc, Mutex}; use tokio_tungstenite::tungstenite::protocol::Message as TokioMessage; use tokio_util::sync::CancellationToken; @@ -18,7 +17,6 @@ use de_mls::{ #[tokio::main] async fn main() -> Result<(), Box> { - // console_subscriber::init(); let token = CancellationToken::new(); let (cli_tx, mut cli_gr_rx) = mpsc::channel::(100); @@ -29,7 +27,8 @@ async fn main() -> Result<(), Box> { let (client, mut client_recv) = ChatClient::connect("ws://127.0.0.1:8080", &user_address.to_string()).await?; //// Create user - let mut user = User::new(&args.user_priv_key, client).await?; + let user_n = User::new(&args.user_priv_key, client).await?; + let user_arc = Arc::new(Mutex::new(user_n)); let (messages_tx, messages_rx) = mpsc::channel::(100); messages_tx @@ -45,6 +44,7 @@ async fn main() -> Result<(), Box> { let res_msg_tx = messages_tx.clone(); let main_token = token.clone(); + let user = user_arc.clone(); let h2 = tokio::spawn(async move { let (redis_tx, mut redis_rx) = mpsc::channel::>(100); loop { @@ -52,100 +52,55 @@ async fn main() -> Result<(), Box> { Some(msg) = client_recv.recv() => { if let TokioMessage::Text(text) = msg { if let Ok(chat_message) = serde_json::from_str::(&text) { - let m = format!("MSG:\n{:?}", chat_message); - res_msg_tx.send(Msg::Input(Message::System(m))).await?; - match chat_message { - ServerMessage::SystemJoin{username}=>{ - let msg = format!("Client1 received SystemJoin message for user: {}",username); - res_msg_tx.send(Msg::Input(Message::System(msg))).await?; - } - ServerMessage::InMessage { from, to, msg } => { - let m = format!("S::InMessage:\n{:?}", msg); - res_msg_tx.send(Msg::Input(Message::System(m))).await?; - if let Ok(chat_msg) = serde_json::from_str::(&msg) { - let m = format!("ChatMessage: {:?}", chat_msg); - res_msg_tx.send(Msg::Input(Message::System(m))).await?; - match chat_msg { - ChatMessages::Request(req) => { - let res = user.send_responce_on_request(req, &from); - match res { - Ok(_) => { - let msg = format!("Succesfully create responce"); - res_msg_tx.send(Msg::Input(Message::System(msg))).await?; - }, - Err(err) => { - res_msg_tx - .send(Msg::Input(Message::Error(err.to_string()))) - .await?; - } - } - }, - ChatMessages::Response(resp) => { - let res = user.parce_responce(resp, "test".to_string()).await; - match res { - Ok(_) => { - let msg = format!("Succesfully parse responce"); - res_msg_tx.send(Msg::Input(Message::System(msg))).await?; - }, - Err(err) => { - res_msg_tx - .send(Msg::Input(Message::Error(err.to_string()))) - .await?; - } - } - let res = user.contacts.chat_client.handle_response().await; - match res { - Ok(_) => { - let msg = format!("Succesfully handle responce"); - res_msg_tx.send(Msg::Input(Message::System(msg))).await?; - }, - Err(err) => { - res_msg_tx - .send(Msg::Input(Message::Error(err.to_string()))) - .await?; - } - } - }, - ChatMessages::Welcome(welcome) => { - let wbytes = hex::decode(welcome).unwrap(); - let welc = MlsMessageIn::tls_deserialize_bytes(wbytes).unwrap(); - let welcome = welc.into_welcome(); - if welcome.is_some() { - let res = user.join_group(welcome.unwrap()).await; - match res { - Ok(mut buf) => { - let msg = format!("Succesfully join to the group: {:#?}", buf.1); - res_msg_tx.send(Msg::Input(Message::System(msg))).await?; + if let ServerMessage::InMessage { from, to, msg } = chat_message { + if let Ok(chat_msg) = serde_json::from_str::(&msg) { + match chat_msg { + ChatMessages::Request(req) => { + let res = user.as_ref().lock().await.send_responce_on_request(req, to[0].clone(), &from); + if let Err(err) = res { + res_msg_tx + .send(Msg::Input(Message::Error(err.to_string()))) + .await?; + } + }, + ChatMessages::Response(resp) => { + let res = user.as_ref().lock().await.parce_responce(resp, "test".to_string()).await; + if let Err(err) = res { + res_msg_tx + .send(Msg::Input(Message::Error(err.to_string()))) + .await?; + } + }, + ChatMessages::Welcome(welcome) => { + let res = user.as_ref().lock().await.join_group(welcome).await; + match res { + Ok(mut buf) => { + let msg = format!("Succesfully join to the group: {:#?}", buf.1); + res_msg_tx.send(Msg::Input(Message::System(msg))).await?; - let redis_tx = redis_tx.clone(); - tokio::spawn(async move { - while let Ok(msg) = buf.0.recv().await { - let bytes: Vec = msg.value.convert()?; - redis_tx.send(bytes).await?; - } - Ok::<_, CliError>(()) - }); - }, - Err(err) => { - res_msg_tx - .send(Msg::Input(Message::Error(err.to_string()))) - .await?; - }, - }; - } else { + let redis_tx = redis_tx.clone(); + tokio::spawn(async move { + while let Ok(msg) = buf.0.recv().await { + let bytes: Vec = msg.value.convert()?; + redis_tx.send(bytes).await?; + } + Ok::<_, CliError>(()) + }); + }, + Err(err) => { res_msg_tx - .send(Msg::Input(Message::Error(UserError::EmptyWelcomeMessageError.to_string()))) + .send(Msg::Input(Message::Error(err.to_string()))) .await?; - } - }, - } - } else { - res_msg_tx - .send(Msg::Input(Message::Error(UserError::InvalidChatMessageError.to_string()))) - .await?; + }, + }; + }, } - }, - } + } else { + res_msg_tx + .send(Msg::Input(Message::Error(UserError::InvalidChatMessageError.to_string()))) + .await?; + } + }; } else { res_msg_tx .send(Msg::Input(Message::Error(UserError::InvalidServerMessageError.to_string()))) @@ -154,7 +109,7 @@ async fn main() -> Result<(), Box> { } } Some(val) = redis_rx.recv() =>{ - let res = user.receive_msg(val).await; + let res = user.as_ref().lock().await.receive_msg(val).await; match res { Ok(msg) => { match msg { @@ -175,9 +130,9 @@ async fn main() -> Result<(), Box> { Commands::CreateGroup { group_name, storage_address, storage_url } => { let client_provider = ProviderBuilder::new() .with_recommended_fillers() - .wallet(user.get_wallet()) + .wallet(user.as_ref().lock().await.get_wallet()) .on_http(storage_url); - let res = user.connect_to_smart_contract(&storage_address, client_provider).await; + let res = user.as_ref().lock().await.connect_to_smart_contract(&storage_address, client_provider).await; match res { Ok(_) => { let msg = format!("Successfully connect to Smart Contract on address {:}\n", storage_address); @@ -190,7 +145,7 @@ async fn main() -> Result<(), Box> { }, }; - let res = user.create_group(group_name.clone()).await; + let res = user.as_ref().lock().await.create_group(group_name.clone()).await; match res { Ok(mut br) => { let msg = format!("Successfully create group: {:?}", group_name.clone()); @@ -213,81 +168,65 @@ async fn main() -> Result<(), Box> { }; }, Commands::Invite { group_name, users_wallet_addrs } => { - let res = user.contacts.send_req_msg_to_user( - user.identity.to_string(), - &users_wallet_addrs[0], - "0xe7f1725E7734CE288F8367e1Bb143E90bb3F0512".to_string(), - ReqMessageType::InviteToGroup, - ).await; - match res { - Ok(_) => { - let msg = format!("Send request {:?} to the group {:}\n", - users_wallet_addrs, group_name - ); - res_msg_tx.send(Msg::Input(Message::System(msg))).await?; - }, - Err(err) => { - res_msg_tx - .send(Msg::Input(Message::Error(err.to_string()))) - .await?; - }, - }; + let u_c = user.clone(); + let res_msg_tx_c = messages_tx.clone(); + tokio::spawn(async move { + for user_wallet in users_wallet_addrs.iter() { + let u_c_f = u_c.as_ref(); + let opt_token = + { + let mut u_c_ff = u_c_f.lock().await; + if !u_c_ff.contacts.does_user_in_contacts(user_wallet).await { + u_c_ff.contacts.add_new_contact(user_wallet).await.unwrap(); + } + u_c_ff.contacts + .send_req_msg_to_user( + user_address.to_string(), + user_wallet, + "0xe7f1725E7734CE288F8367e1Bb143E90bb3F0512".to_string(), + ReqMessageType::InviteToGroup, + ) + .await.unwrap(); - let _ = user.contacts.chat_client.request_in_progress.lock().await; + u_c_ff.contacts.future_req.get(user_wallet).cloned() + }; - let res = user.add_user_to_acl(&users_wallet_addrs[0]).await; - match res { - Ok(_) => { - let msg = format!("Add user to acl {} for group {}\n", - &users_wallet_addrs[0], group_name - ); - res_msg_tx.send(Msg::Input(Message::System(msg))).await?;}, - Err(err) => {res_msg_tx - .send(Msg::Input(Message::Error(err.to_string()))) - .await?;}, - } + match opt_token { + Some(token) => token.cancelled().await, + None => return Err(CliError::SplitLineError), + }; - // for user_wallet in users_wallet_addrs.iter() { - // if user.contacts.does_user_in_contacts(user_wallet) { - // user.add_user_to_acl(user_wallet).await?; - // user.contacts.send_req_msg_to_user( - // user.identity.to_string(), - // user_wallet, - // user.sc_ks.as_mut().unwrap().get_sc_adsress(), - // ReqMessageType::InviteToGroup, - // )?; - // } else { - // user.contacts.send_req_msg_to_user( - // user.identity.to_string(), - // user_wallet, - // user.sc_ks.as_mut().unwrap().get_sc_adsress(), - // ReqMessageType::InviteToGroup, - // )?; - // user.add_user_to_acl(user_wallet).await?; - // } - // } + { + let mut u_c = u_c.as_ref().lock().await; + u_c.contacts.future_req.remove(user_wallet); + u_c.add_user_to_acl(user_wallet).await.unwrap(); + } - let res = user.invite(users_wallet_addrs.clone(), group_name.clone()).await; - match res { - Ok(_) => { - let msg = format!("Invite {:?} to the group {:}\n", - users_wallet_addrs, group_name - ); - res_msg_tx.send(Msg::Input(Message::System(msg))).await?; - }, - Err(err) => { - res_msg_tx - .send(Msg::Input(Message::Error(err.to_string()))) - .await?; - }, - }; + } + + let res = u_c.as_ref().lock().await.invite(users_wallet_addrs.clone(), group_name.clone()).await; + match res { + Ok(_) => { + let msg = format!("Invite {:?} to the group {:}\n", + users_wallet_addrs, group_name + ); + res_msg_tx_c.send(Msg::Input(Message::System(msg))).await?; + }, + Err(err) => { + res_msg_tx_c + .send(Msg::Input(Message::Error(err.to_string()))) + .await?; + }, + }; + Ok::<_, CliError>(()) + }); }, Commands::SendMessage { group_name, msg } => { let message = msg.join(" "); - let res = user.send_msg(&message, group_name.clone(), user.identity.to_string()).await; + let res = user.as_ref().lock().await.send_msg(&message, group_name.clone(), user_address.to_string()).await; match res { Ok(_) => { - res_msg_tx.send(Msg::Input(Message::Mine(group_name, user.identity.to_string(), message ))).await?; + res_msg_tx.send(Msg::Input(Message::Mine(group_name, user_address.to_string(), message ))).await?; }, Err(err) => { res_msg_tx diff --git a/src/user.rs b/src/user.rs index 6a9b7b0..f881905 100644 --- a/src/user.rs +++ b/src/user.rs @@ -139,8 +139,6 @@ where return Err(UserError::EmptyScConnection); } self.sc_ks.as_mut().unwrap().add_user(user_address).await?; - - self.contacts.add_new_contact(user_address).await?; Ok(()) } @@ -168,6 +166,29 @@ where return Err(UserError::EmptyScConnection); } + // for user_wallet in users.iter() { + // if !self.contacts.does_user_in_contacts(user_wallet).await { + // self.contacts.add_new_contact(user_wallet).await?; + // } + // self.contacts + // .send_req_msg_to_user( + // self.identity.to_string(), + // user_wallet, + // self.sc_ks.as_mut().unwrap().get_sc_adsress(), + // ReqMessageType::InviteToGroup, + // ) + // .await?; + // println!("waiting token"); + // match self.contacts.future_req.get(user_wallet) { + // Some(token) => token.cancelled().await, + // None => return Err(UserError::UnknownUserError), + // }; + // println!("cancelled token"); + // self.contacts.future_req.remove(user_wallet); + + // self.add_user_to_acl(user_wallet).await?; + // } + let users_for_invite = self .contacts .prepare_joiners(users.clone(), group_name.clone()) @@ -308,14 +329,22 @@ where pub async fn join_group( &mut self, - welcome: Welcome, + welcome: String, ) -> Result<(Receiver, String), UserError> { + let wbytes = hex::decode(welcome).unwrap(); + let welc = MlsMessageIn::tls_deserialize_bytes(wbytes).unwrap(); + let welcome = welc.into_welcome(); + if welcome.is_none() { + return Err(UserError::EmptyWelcomeMessageError); + } + let group_config = MlsGroupConfig::builder() .use_ratchet_tree_extension(true) .build(); // TODO: After we move from openmls, we will have to delete the used key package here ourselves. - let mls_group = MlsGroup::new_from_welcome(&self.provider, &group_config, welcome, None)?; + let mls_group = + MlsGroup::new_from_welcome(&self.provider, &group_config, welcome.unwrap(), None)?; let group_id = mls_group.group_id().to_vec(); let group_name = String::from_utf8(group_id)?; @@ -405,6 +434,7 @@ where pub fn send_responce_on_request( &mut self, req: RequestMLSPayload, + self_address: String, user_address: &str, ) -> Result<(), UserError> { match req.msg_type { @@ -415,14 +445,11 @@ where .generate_key_package(CIPHERSUITE, &self.provider)?; let resp = ResponseMLSPayload::new( signature, - self.identity.to_string(), + self_address.clone(), key_package.tls_serialize_detached()?, ); - self.contacts.send_resp_msg_to_user( - self.identity.to_string(), - user_address, - resp, - )?; + self.contacts + .send_resp_msg_to_user(self_address, user_address, resp)?; Ok(()) } @@ -439,12 +466,21 @@ where return Err(UserError::EmptyScConnection); } let (user_wallet, kp) = resp.validate(self.sc_ks.as_ref().unwrap().get_sc_adsress())?; + self.contacts - .add_key_package_to_contact(&user_wallet, kp, group_name) + .add_key_package_to_contact(&user_wallet, kp, group_name.clone()) .await?; + self.contacts.handle_response(&user_wallet)?; Ok(()) } + + pub fn get_sc_address(&self) -> Result { + if self.sc_ks.is_none() { + return Err(UserError::EmptyScConnection); + } + Ok(self.sc_ks.as_ref().unwrap().get_sc_adsress()) + } } impl Group { @@ -496,6 +532,8 @@ pub enum UserError { InvalidChatMessageError, #[error("Message from server is invalid")] InvalidServerMessageError, + #[error("Unknown user")] + UnknownUserError, #[error(transparent)] DeliveryServiceError(#[from] DeliveryServiceError), diff --git a/tests/sc_key_store_test.rs b/tests/sc_key_store_test.rs index d38a5b9..8546ff3 100644 --- a/tests/sc_key_store_test.rs +++ b/tests/sc_key_store_test.rs @@ -117,7 +117,11 @@ async fn test_input_request() { if let Ok(chat_msg) = serde_json::from_str::(&msg) { match chat_msg { ChatMessages::Request(req) => { - let res = alice.send_responce_on_request(req, &from); + let res = alice.send_responce_on_request( + req, + to[0].clone(), + &from, + ); match res { Ok(_) => { println!("Succesfully create responce"); @@ -140,26 +144,18 @@ async fn test_input_request() { } } ChatMessages::Welcome(welcome) => { - let wbytes = hex::decode(welcome).unwrap(); - let welc = - MlsMessageIn::tls_deserialize_bytes(wbytes).unwrap(); - let welcome = welc.into_welcome(); - if welcome.is_some() { - let res = alice.join_group(welcome.unwrap()).await; - match res { - Ok(mut buf) => { - let msg = format!( - "Succesfully join to the group: {:#?}", - buf.1 - ); - } - Err(err) => { - eprintln!("Error: {}", err); - } - }; - } else { - eprintln!("Error: empty welcome"); - } + let res = alice.join_group(welcome).await; + match res { + Ok(mut buf) => { + let msg = format!( + "Succesfully join to the group: {:#?}", + buf.1 + ); + } + Err(err) => { + eprintln!("Error: {}", err); + } + }; } } }