diff --git a/ds/src/chat_client.rs b/ds/src/chat_client.rs index 136659b..8c31f02 100644 --- a/ds/src/chat_client.rs +++ b/ds/src/chat_client.rs @@ -8,62 +8,82 @@ use tokio::sync::{mpsc, Mutex}; use tokio_tungstenite::tungstenite::protocol::Message; use crate::chat_server::ServerMessage; -use crate::ChatServiceError; +use crate::DeliveryServiceError; -// pub const REQUEST: &str = "You are joining the group with smart contract: "; - -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, PartialEq)] pub enum ChatMessages { Request(RequestMLSPayload), Response(ResponseMLSPayload), Welcome(String), } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, PartialEq)] pub enum ReqMessageType { InviteToGroup, RemoveFromGroup, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, PartialEq)] pub struct RequestMLSPayload { - pub msg: String, + sc_address: String, + group_name: String, pub msg_type: ReqMessageType, } impl RequestMLSPayload { - pub fn new(sc_address: String, msg_type: ReqMessageType) -> Self { + pub fn new(sc_address: String, group_name: String, msg_type: ReqMessageType) -> Self { RequestMLSPayload { - msg: sc_address, + sc_address, + group_name, msg_type, } } + + pub fn msg_to_sign(&self) -> String { + self.sc_address.to_owned() + &self.group_name + } + + pub fn group_name(&self) -> String { + self.group_name.clone() + } } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, PartialEq)] pub struct ResponseMLSPayload { signature: String, user_address: String, + pub group_name: String, key_package: Vec, } impl ResponseMLSPayload { - pub fn new(signature: String, user_address: String, key_package: Vec) -> Self { + pub fn new( + signature: String, + user_address: String, + group_name: String, + key_package: Vec, + ) -> Self { Self { signature, user_address, + group_name, key_package, } } - pub fn validate(&self, sc_address: String) -> Result<(String, Vec), ChatServiceError> { + pub fn validate( + &self, + sc_address: String, + group_name: String, + ) -> Result<(String, Vec), DeliveryServiceError> { let recover_sig: Signature = serde_json::from_str(&self.signature)?; let addr = Address::from_str(&self.user_address)?; // Recover the signer from the message. - let recovered = recover_sig.recover_address_from_msg(sc_address)?; + let recovered = + recover_sig.recover_address_from_msg(sc_address.to_owned() + &group_name)?; if recovered.ne(&addr) { - return Err(ChatServiceError::ValidationError); + return Err(DeliveryServiceError::ValidationError(recovered.to_string())); } Ok((self.user_address.clone(), self.key_package.clone())) } @@ -75,10 +95,10 @@ pub struct ChatClient { impl ChatClient { pub async fn connect( - addr: &str, - username: &str, - ) -> Result<(Self, mpsc::UnboundedReceiver), ChatServiceError> { - let (ws_stream, _) = tokio_tungstenite::connect_async(addr).await?; + server_addr: &str, + username: String, + ) -> Result<(Self, mpsc::UnboundedReceiver), DeliveryServiceError> { + let (ws_stream, _) = tokio_tungstenite::connect_async(server_addr).await?; let (mut write, read) = ws_stream.split(); let (sender, receiver) = mpsc::unbounded_channel(); let (msg_sender, msg_receiver) = mpsc::unbounded_channel(); @@ -88,23 +108,22 @@ impl ChatClient { // Spawn a task to handle outgoing messages tokio::spawn(async move { while let Some(message) = receiver.lock().await.recv().await { - // println!("Message from reciever: {}", message); - if let Err(e) = write.send(message).await { - eprintln!("Error sending message: {}", e); + if let Err(err) = write.send(message).await { + return Err(DeliveryServiceError::SenderError(err.to_string())); } } + Ok(()) }); // Spawn a task to handle incoming messages tokio::spawn(async move { let mut read = read; - while let Some(message) = read.next().await { - if let Ok(msg) = message { - if let Err(e) = msg_sender.send(msg) { - eprintln!("Failed to send message to channel: {}", e); - } + while let Some(Ok(message)) = read.next().await { + if let Err(err) = msg_sender.send(message) { + return Err(DeliveryServiceError::SenderError(err.to_string())); } } + Ok(()) }); // Send a SystemJoin message when registering @@ -114,26 +133,16 @@ impl ChatClient { let join_json = serde_json::to_string(&join_msg).unwrap(); sender .send(Message::Text(join_json)) - .map_err(|_| ChatServiceError::SendError)?; + .map_err(|err| DeliveryServiceError::SenderError(err.to_string()))?; Ok((ChatClient { sender }, msg_receiver)) } - pub async fn send_request(&self, msg: ServerMessage) -> Result<(), ChatServiceError> { - self.send_message_to_server(msg)?; - Ok(()) - } - - pub async fn handle_response(&self) -> Result<(), ChatServiceError> { - Ok(()) - } - - pub fn send_message_to_server(&self, msg: ServerMessage) -> Result<(), ChatServiceError> { + pub fn send_message(&self, msg: ServerMessage) -> Result<(), DeliveryServiceError> { let msg_json = serde_json::to_string(&msg).unwrap(); - // println!("Message to sender: {}", msg_json); self.sender .send(Message::Text(msg_json)) - .map_err(|_| ChatServiceError::SendError)?; + .map_err(|err| DeliveryServiceError::SenderError(err.to_string()))?; Ok(()) } } @@ -163,6 +172,7 @@ fn test_sign() { fn json_test() { let inner_msg = ChatMessages::Request(RequestMLSPayload::new( "sc_address".to_string(), + "group_name".to_string(), ReqMessageType::InviteToGroup, )); @@ -183,19 +193,14 @@ fn json_test() { //// if let Ok(chat_message) = serde_json::from_str::(&json_server_msg) { - println!("Server: {:?}", chat_message); + assert_eq!(chat_message, server_msg); match chat_message { ServerMessage::InMessage { from, to, msg } => { - println!("Chat: {:?}", msg); if let Ok(chat_msg) = serde_json::from_str::(&msg) { - match chat_msg { - ChatMessages::Request(req) => println!("Request: {:?}", req), - ChatMessages::Response(_) => println!("Response"), - ChatMessages::Welcome(_) => println!("Welcome"), - } + assert_eq!(chat_msg, inner_msg); } } - ServerMessage::SystemJoin { username } => println!("SystemJoin"), + ServerMessage::SystemJoin { username } => {} } } } diff --git a/ds/src/chat_server.rs b/ds/src/chat_server.rs index 978e372..9703bdd 100644 --- a/ds/src/chat_server.rs +++ b/ds/src/chat_server.rs @@ -8,12 +8,12 @@ use tokio::{ }; use tokio_tungstenite::{accept_async, tungstenite::protocol::Message}; -use crate::ChatServiceError; +use crate::DeliveryServiceError; type Tx = mpsc::UnboundedSender; type PeerMap = Arc>>; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, PartialEq)] #[serde(tag = "type")] pub enum ServerMessage { InMessage { @@ -26,7 +26,7 @@ pub enum ServerMessage { }, } -pub async fn start_server(addr: &str) -> Result<(), ChatServiceError> { +pub async fn start_server(addr: &str) -> Result<(), DeliveryServiceError> { let listener = TcpListener::bind(addr).await?; let peers = PeerMap::new(Mutex::new(HashMap::new())); @@ -46,7 +46,7 @@ pub async fn start_server(addr: &str) -> Result<(), ChatServiceError> { async fn handle_connection( peers: PeerMap, stream: tokio::net::TcpStream, -) -> Result<(), ChatServiceError> { +) -> Result<(), DeliveryServiceError> { let ws_stream = accept_async(stream).await?; let (mut write, mut read) = ws_stream.split(); let (sender, receiver) = mpsc::unbounded_channel(); @@ -57,16 +57,15 @@ async fn handle_connection( // Spawn a task to handle outgoing messages tokio::spawn(async move { while let Some(message) = receiver.lock().await.recv().await { - println!("raw message out: {}", message); - if let Err(e) = write.send(message).await { - eprintln!("Error sending message: {}", e); + if let Err(err) = write.send(message).await { + return Err(DeliveryServiceError::SenderError(err.to_string())); } } + Ok(()) }); // Handle incoming messages while let Some(Ok(Message::Text(text))) = read.next().await { - println!("raw message in {}", text); if let Ok(chat_message) = serde_json::from_str::(&text) { match chat_message { ServerMessage::SystemJoin { @@ -81,12 +80,7 @@ async fn handle_connection( } ServerMessage::InMessage { from, to, msg } => { println!("Received message from {} to {:?}: {}", from, to, msg); - println!( - "\t got contact list {:?}", - peers.lock().await.keys().collect::>() - ); for recipient in to { - println!("\t rcpt {}", recipient); if let Some(recipient_sender) = peers.lock().await.get(&recipient) { let message = ServerMessage::InMessage { from: from.clone(), @@ -96,7 +90,9 @@ async fn handle_connection( let message_json = serde_json::to_string(&message).unwrap(); recipient_sender .send(Message::Text(message_json)) - .map_err(|_| ChatServiceError::SendError)?; + .map_err(|err| { + DeliveryServiceError::SenderError(err.to_string()) + })?; } } } diff --git a/ds/src/ds.rs b/ds/src/ds.rs index 4742b1b..b7b9992 100644 --- a/ds/src/ds.rs +++ b/ds/src/ds.rs @@ -1,21 +1,15 @@ use fred::{ clients::{RedisClient, SubscriberClient}, - error::RedisError, prelude::*, types::Message, }; use serde::{Deserialize, Serialize}; -use tokio::sync::{ - broadcast::Receiver, - mpsc::{self, error::SendError, UnboundedReceiver, UnboundedSender}, -}; -use tokio_util::sync::CancellationToken; +use tokio::sync::broadcast::Receiver; -use openmls::{framing::MlsMessageOut, prelude::TlsSerializeTrait}; +use crate::DeliveryServiceError; // use waku_bindings::*; pub struct RClient { - group_id: String, client: RedisClient, sub_client: SubscriberClient, // broadcaster: Receiver, @@ -28,7 +22,7 @@ pub struct SenderStruct { } impl RClient { - pub async fn new_for_group( + pub async fn new_with_group( group_id: String, ) -> Result<(Self, Receiver), DeliveryServiceError> { let redis_client = RedisClient::default(); @@ -39,157 +33,28 @@ impl RClient { subscriber.subscribe(group_id.clone()).await?; Ok(( RClient { - group_id, client: redis_client, sub_client: subscriber.clone(), - // broadcaster: subscriber.message_rx(), }, subscriber.message_rx(), )) } - pub async fn remove_from_group(&mut self) -> Result<(), DeliveryServiceError> { - self.sub_client.unsubscribe(self.group_id.clone()).await?; - self.sub_client.quit().await?; - self.client.quit().await?; + pub async fn remove_group(&mut self, group_id: String) -> Result<(), DeliveryServiceError> { + self.sub_client.unsubscribe(group_id).await?; Ok(()) } pub async fn msg_send( &mut self, - msg: MlsMessageOut, + msg: Vec, sender: String, + group_id: String, ) -> Result<(), DeliveryServiceError> { - let buf = msg.tls_serialize_detached()?; - - let json_value = SenderStruct { sender, msg: buf }; + let json_value = SenderStruct { sender, msg }; let bytes = serde_json::to_vec(&json_value)?; - self.client - .publish(self.group_id.clone(), bytes.as_slice()) - .await?; - - Ok(()) - } -} - -pub struct RClientPrivate { - chat_id: String, - sender_id: String, - token: CancellationToken, - sender: UnboundedSender>, - reciever: UnboundedReceiver>, -} - -impl RClientPrivate { - pub async fn new_for_users( - chat_id: String, - sender_id: String, - ) -> Result { - let redis_client = RedisClient::default(); - let subscriber: SubscriberClient = - Builder::default_centralized().build_subscriber_client()?; - redis_client.init().await?; - subscriber.init().await?; - subscriber.subscribe(chat_id.clone()).await?; + self.client.publish(group_id, bytes.as_slice()).await?; - let (sender_publish, mut receiver_publish) = mpsc::unbounded_channel::>(); - let (sender_subscriber, receiver_subscriber) = mpsc::unbounded_channel::>(); - - let chat_id_c = chat_id.clone(); - let redis_client_c = redis_client.clone(); - tokio::spawn(async move { - while let Some(msg) = receiver_publish.recv().await { - redis_client_c - .publish(chat_id_c.to_owned(), msg.as_slice()) - .await?; - } - Ok::<_, DeliveryServiceError>(()) - }); - let subscriber_c = subscriber.clone(); - tokio::spawn(async move { - let mut ss = subscriber_c.message_rx(); - while let Ok(msg) = ss.recv().await { - let bytes: Vec = msg.value.convert()?; - if let Err(e) = sender_subscriber.send(bytes) { - return Err(DeliveryServiceError::TokioSendError(e)); - } - } - Ok(()) - }); - - let token = CancellationToken::new(); - - let cloned_token = token.clone(); - let chat_id_c = chat_id.clone(); - tokio::spawn(async move { - token.cancelled().await; - subscriber.unsubscribe(chat_id_c).await?; - subscriber.quit().await?; - redis_client.quit().await?; - Ok::<_, DeliveryServiceError>(()) - }); - - Ok(RClientPrivate { - chat_id: chat_id.clone(), - sender_id, - token: cloned_token, - sender: sender_publish, - reciever: receiver_subscriber, - }) - } - - pub fn msg_send(&mut self, msg: Vec) -> Result<(), DeliveryServiceError> { - let json_value = SenderStruct { - sender: self.sender_id.clone(), - msg, - }; - let bytes = serde_json::to_vec(&json_value)?; - self.sender.send(bytes)?; Ok(()) } - - pub async fn msg_recv(&mut self) -> Result, DeliveryServiceError> { - while let Some(msg) = self.reciever.recv().await { - let m: SenderStruct = serde_json::from_slice(&msg)?; - if m.sender == self.sender_id { - continue; - } - return Ok(m.msg); - } - Err(DeliveryServiceError::EmptyMsgError) - } -} - -#[derive(Debug, thiserror::Error)] -pub enum DeliveryServiceError { - #[error("Empty msg")] - EmptyMsgError, - #[error("Json error: {0}")] - JsonError(#[from] serde_json::Error), - #[error("Redis error: {0}")] - RedisError(#[from] RedisError), - #[error("Channel sender error: {0}")] - TokioSendError(#[from] SendError>), - #[error("Serialization problem: {0}")] - TlsError(#[from] tls_codec::Error), - #[error("Unknown error: {0}")] - Other(anyhow::Error), -} - -#[tokio::test] -async fn private_test() { - let mut alice = RClientPrivate::new_for_users("alicebob".to_string(), "alice".to_string()) - .await - .unwrap(); - let mut bob = RClientPrivate::new_for_users("alicebob".to_string(), "bob".to_string()) - .await - .unwrap(); - - alice.msg_send("Hi bob".as_bytes().to_vec()).unwrap(); - let msg = bob.msg_recv().await.unwrap(); - assert_eq!("Hi bob".to_string(), String::from_utf8(msg).unwrap()); - - bob.msg_send("Hi alice".as_bytes().to_vec()).unwrap(); - let msg = alice.msg_recv().await.unwrap(); - assert_eq!("Hi alice".to_string(), String::from_utf8(msg).unwrap()); } diff --git a/ds/src/lib.rs b/ds/src/lib.rs index 7dd89af..debb230 100644 --- a/ds/src/lib.rs +++ b/ds/src/lib.rs @@ -1,30 +1,34 @@ use alloy::{hex::FromHexError, primitives::SignatureError}; +use fred::error::RedisError; pub mod chat_client; pub mod chat_server; pub mod ds; -#[derive(thiserror::Error, Debug)] -pub enum ChatServiceError { - #[error("Failed to bind to address")] - BindError(#[from] std::io::Error), - #[error("WebSocket handshake error")] +#[derive(Debug, thiserror::Error)] +pub enum DeliveryServiceError { + #[error("Validation failed: {0}")] + ValidationError(String), + + #[error("Redis operation failed: {0}")] + RedisError(#[from] RedisError), + #[error("Failed to send message to channel: {0}")] + SenderError(String), + #[error("WebSocket handshake failed.")] HandshakeError(#[from] tokio_tungstenite::tungstenite::Error), - #[error("Json error: {0}")] + #[error("Serialization error: {0}")] + TlsError(#[from] tls_codec::Error), + #[error("JSON processing error: {0}")] JsonError(#[from] serde_json::Error), - #[error("Unable to parce the address: {0}")] - AlloyFromHexError(#[from] FromHexError), + #[error("Failed to bind to the address.")] + BindError(#[from] std::io::Error), - #[error("Unable to recover the signature: {0}")] + #[error("Failed to parse address: {0}")] + AlloyFromHexError(#[from] FromHexError), + #[error("Failed to recover signature: {0}")] AlloySignatureError(#[from] SignatureError), - #[error("Lock poisoned")] - LockError, - #[error("Send error")] - SendError, - #[error("Task error")] - TaskError, - #[error("Validation error")] - ValidationError, + #[error("An unknown error occurred: {0}")] + Other(anyhow::Error), } diff --git a/sc_key_store/src/lib.rs b/sc_key_store/src/lib.rs index 88e3c4a..f5d2a7a 100644 --- a/sc_key_store/src/lib.rs +++ b/sc_key_store/src/lib.rs @@ -19,14 +19,18 @@ pub trait SCKeyStoreService { #[derive(Debug, thiserror::Error)] pub enum KeyStoreError { - #[error("User already exist")] - AlreadyExistedUserError, - #[error("Unknown user")] - UnknownUserError, - #[error("Alloy contract error: {0}")] - AlloyError(#[from] alloy::contract::Error), - #[error("Unable to parce the address: {0}")] - AlloyFromHexError(#[from] FromHexError), - #[error("Unknown error: {0}")] - Other(anyhow::Error), + #[error("User already exists.")] + UserAlreadyExistsError, + + #[error("User not found.")] + UserNotFoundError, + + #[error("Alloy contract operation failed: {0}")] + AlloyContractError(#[from] alloy::contract::Error), + + #[error("Failed to parse address: {0}")] + AddressParseError(#[from] FromHexError), + + #[error("An unexpected error occurred: {0}")] + UnexpectedError(anyhow::Error), } diff --git a/sc_key_store/src/sc_ks.rs b/sc_key_store/src/sc_ks.rs index 8af043f..434218d 100644 --- a/sc_key_store/src/sc_ks.rs +++ b/sc_key_store/src/sc_ks.rs @@ -22,7 +22,7 @@ where } } - pub fn get_sc_adsress(&self) -> String { + pub fn sc_adsress(&self) -> String { self.address.clone() } } @@ -35,33 +35,33 @@ impl, N: Network> SCKeyStoreService let res = self.instance.userExists(address).call().await; match res { Ok(is_exist) => Ok(is_exist._0), - Err(err) => Err(KeyStoreError::AlloyError(err)), + Err(err) => Err(KeyStoreError::AlloyContractError(err)), } } async fn add_user(&mut self, address: &str) -> Result<(), KeyStoreError> { if self.does_user_exist(address).await? { - return Err(KeyStoreError::AlreadyExistedUserError); + return Err(KeyStoreError::UserAlreadyExistsError); } let add_to_acl_binding = self.instance.addUser(Address::from_str(address)?); let res = add_to_acl_binding.send().await; match res { Ok(_) => Ok(()), - Err(err) => Err(KeyStoreError::AlloyError(err)), + Err(err) => Err(KeyStoreError::AlloyContractError(err)), } } async fn remove_user(&self, address: &str) -> Result<(), KeyStoreError> { if !self.does_user_exist(address).await? { - return Err(KeyStoreError::UnknownUserError); + return Err(KeyStoreError::UserNotFoundError); } let remove_user_binding = self.instance.removeUser(Address::from_str(address)?); let res = remove_user_binding.send().await; match res { Ok(_) => Ok(()), - Err(err) => Err(KeyStoreError::AlloyError(err)), + Err(err) => Err(KeyStoreError::AlloyContractError(err)), } } } diff --git a/src/cli.rs b/src/cli.rs index f9e56f9..65a3f54 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -4,8 +4,6 @@ use crossterm::{ execute, terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen}, }; - -use fred::error::RedisError; use ratatui::{ backend::CrosstermBackend, layout::{Constraint, Direction, Layout}, @@ -19,14 +17,14 @@ use std::{ sync::Arc, }; use tokio::{ - sync::mpsc::error::SendError, sync::mpsc::{Receiver, Sender}, sync::Mutex, - task::JoinError, }; use tokio_util::sync::CancellationToken; use url::Url; +use crate::CliError; + #[derive(Parser, Debug)] #[command(version, about, long_about = None)] pub struct Args { @@ -113,19 +111,29 @@ pub async fn event_handler( if cli.is_err() { messages_tx .send(Msg::Input(Message::System("Unknown command".to_string()))) - .await?; + .await + .map_err(|err| CliError::SenderError(err.to_string()))?; continue; } - cli_tx.send(cli.unwrap().command).await?; + cli_tx + .send(cli.unwrap().command) + .await + .map_err(|err| CliError::SenderError(err.to_string()))?; } KeyCode::Esc => { - messages_tx.send(Msg::Exit).await?; + messages_tx + .send(Msg::Exit) + .await + .map_err(|err| CliError::SenderError(err.to_string()))?; token.cancel(); break; } _ => {} } - messages_tx.send(Msg::Refresh(input.clone())).await?; + messages_tx + .send(Msg::Refresh(input.clone())) + .await + .map_err(|err| CliError::SenderError(err.to_string()))?; } } Ok::<_, CliError>(()) @@ -232,27 +240,3 @@ pub async fn terminal_handler( terminal_lock.show_cursor()?; Ok(()) } - -#[derive(Debug, thiserror::Error)] -pub enum CliError { - #[error("Can't split the line")] - SplitLineError, - - #[error("Problem from std::io library: {0}")] - IoError(#[from] std::io::Error), - - #[error("Can't send control message into channel: {0}")] - SendMsgError(#[from] SendError), - #[error("Can't send bytes into channel: {0}")] - SendVecError(#[from] SendError>), - #[error("Can't send command into channel: {0}")] - SendCommandError(#[from] SendError), - - #[error("Redis error: {0}")] - RedisError(#[from] RedisError), - #[error("Failed from tokio join: {0}")] - TokioJoinError(#[from] JoinError), - - #[error("Unknown error: {0}")] - AnyHowError(anyhow::Error), -} diff --git a/src/contact.rs b/src/contact.rs index aacfe6e..825fb8f 100644 --- a/src/contact.rs +++ b/src/contact.rs @@ -1,10 +1,9 @@ -use alloy::{hex::ToHexExt, primitives::SignatureError}; +use alloy::hex::ToHexExt; use ds::{ chat_client::{ ChatClient, ChatMessages, ReqMessageType, RequestMLSPayload, ResponseMLSPayload, }, chat_server::ServerMessage, - ChatServiceError, }; // use waku_bindings::*; @@ -14,26 +13,28 @@ use tls_codec::Serialize; use tokio::sync::Mutex; use tokio_util::sync::CancellationToken; +use crate::ContactError; + pub const CHAT_SERVER_ADDR: &str = "ws://127.0.0.1:8080"; pub struct ContactsList { contacts: Arc>>, + group_id2sc: HashMap, pub future_req: HashMap, pub chat_client: ChatClient, } pub struct Contact { - user_address: String, // map group_name to key_package bytes - user_kp2group: HashMap>, + group_id2user_kp: HashMap>, // user_p2p_addr: WakuPeers, } impl Contact { pub fn get_relevant_kp(&mut self, group_name: String) -> Result, ContactError> { - match self.user_kp2group.remove(&group_name) { + match self.group_id2user_kp.remove(&group_name) { Some(kp) => Ok(kp.clone()), - None => Err(ContactError::UnknownKeyPackageForGroupError), + None => Err(ContactError::MissingKeyPackageForGroup), } } @@ -42,8 +43,8 @@ impl Contact { key_package: Vec, group_name: String, ) -> Result<(), ContactError> { - match self.user_kp2group.insert(group_name, key_package) { - Some(_) => Err(ContactError::DoubleUserError), + match self.group_id2user_kp.insert(group_name, key_package) { + Some(_) => Err(ContactError::DuplicateUserError), None => Ok(()), } } @@ -53,6 +54,7 @@ impl ContactsList { pub async fn new(chat_client: ChatClient) -> Result { Ok(ContactsList { contacts: Arc::new(Mutex::new(HashMap::new())), + group_id2sc: HashMap::new(), future_req: HashMap::new(), chat_client, }) @@ -68,34 +70,36 @@ impl ContactsList { let welcome_str: String = bytes.encode_hex(); let msg = ChatMessages::Welcome(welcome_str); - self.chat_client - .send_message_to_server(ServerMessage::InMessage { - from: self_address, - to: users_address, - msg: serde_json::to_string(&msg)?, - })?; + self.chat_client.send_message(ServerMessage::InMessage { + from: self_address, + to: users_address, + msg: serde_json::to_string(&msg)?, + })?; Ok(()) } - pub async fn send_req_msg_to_user( + pub fn send_msg_req( &mut self, self_address: String, - user_address: &str, - sc_address: String, + user_address: String, + group_name: String, msg_type: ReqMessageType, ) -> Result<(), ContactError> { self.future_req - .insert(user_address.to_string(), CancellationToken::new()); + .insert(user_address.clone(), CancellationToken::new()); + + let sc_address = match self.group_id2sc.get(&group_name).cloned() { + Some(sc) => sc, + None => return Err(ContactError::MissingSmartContractForGroup), + }; - let req = ChatMessages::Request(RequestMLSPayload::new(sc_address, msg_type)); - self.chat_client - .send_request(ServerMessage::InMessage { - from: self_address, - to: vec![user_address.to_string()], - msg: serde_json::to_string(&req)?, - }) - .await?; + let req = ChatMessages::Request(RequestMLSPayload::new(sc_address, group_name, msg_type)); + self.chat_client.send_message(ServerMessage::InMessage { + from: self_address, + to: vec![user_address], + msg: serde_json::to_string(&req)?, + })?; Ok(()) } @@ -107,51 +111,27 @@ impl ContactsList { resp: ResponseMLSPayload, ) -> Result<(), ContactError> { let resp_j = ChatMessages::Response(resp); - self.chat_client - .send_message_to_server(ServerMessage::InMessage { - from: self_address, - to: vec![user_address.to_string()], - msg: serde_json::to_string(&resp_j)?, - })?; + self.chat_client.send_message(ServerMessage::InMessage { + from: self_address, + to: vec![user_address.to_string()], + msg: serde_json::to_string(&resp_j)?, + })?; Ok(()) } - // pub fn recive_resp_msg( - // &mut self, - // msg_bytes: &str, - // sc_address: &str, - // ) -> Result<(), ContactError> { - // let resp: ResponseMLSPayload = serde_json::from_str(msg_bytes)?; - // let check_msg = REQUEST.to_owned() + sc_address; - - // if !self.contacts.contains_key(&resp.user_address) { - // return Err(ContactError::UnknownUserError); - // } - - // // Verify msg: https://alloy.rs/examples/wallets/verify_message.html - // let signature = Signature::from_str(&resp.signature)?; - // let addr = signature.recover_address_from_msg(&check_msg)?; - // if addr.to_string() != resp.user_address { - // return Err(ContactError::InvalidSignature); - // } - - // Ok(()) - // } - pub async fn add_new_contact(&mut self, user_address: &str) -> Result<(), ContactError> { let mut contacts = self.contacts.lock().await; if contacts.contains_key(user_address) { - return Err(ContactError::DoubleUserError); + return Err(ContactError::DuplicateUserError); } match contacts.insert( user_address.to_string(), Contact { - user_address: user_address.to_string(), - user_kp2group: HashMap::new(), + group_id2user_kp: HashMap::new(), }, ) { - Some(_) => Err(ContactError::DoubleUserError), + Some(_) => Err(ContactError::DuplicateUserError), None => Ok(()), } } @@ -165,7 +145,7 @@ impl ContactsList { let mut contacts = self.contacts.lock().await; match contacts.get_mut(user_wallet) { Some(user) => user.add_key_package(key_package, group_name)?, - None => return Err(ContactError::UnknownUserError), + None => return Err(ContactError::UserNotFoundError), } Ok(()) } @@ -184,19 +164,19 @@ impl ContactsList { for user_wallet in user_wallets { if joiners_kp.contains_key(&user_wallet) { - return Err(ContactError::DoubleUserError); + return Err(ContactError::DuplicateUserError); } let mut contacts = self.contacts.lock().await; match contacts.get_mut(&user_wallet) { Some(contact) => match contact.get_relevant_kp(group_name.clone()) { Ok(kp) => match joiners_kp.insert(user_wallet, kp) { - Some(_) => return Err(ContactError::DoubleUserError), + Some(_) => return Err(ContactError::DuplicateUserError), None => continue, }, Err(err) => return Err(err), }, - None => return Err(ContactError::UnknownUserError), + None => return Err(ContactError::UserNotFoundError), } } @@ -209,29 +189,25 @@ impl ContactsList { token.cancel(); Ok(()) } - None => Err(ContactError::UnknownUserError), + None => Err(ContactError::UserNotFoundError), } } -} -#[derive(Debug, thiserror::Error)] -pub enum ContactError { - #[error("Key package for given group doesnt exist")] - UnknownKeyPackageForGroupError, - #[error("Unknown user")] - UnknownUserError, - #[error("Double user in joiners users list")] - DoubleUserError, - #[error("Invalid user address inside signature")] - InvalidSignature, - - #[error(transparent)] - ChatServiceError(#[from] ChatServiceError), - - #[error("Can't parce signature: {0}")] - AlloySignatureError(#[from] SignatureError), - #[error("Json error: {0}")] - JsonError(#[from] serde_json::Error), - #[error("Serialization problem: {0}")] - TlsError(#[from] tls_codec::Error), + pub fn insert_group2sc( + &mut self, + group_name: String, + sc_address: String, + ) -> Result<(), ContactError> { + match self.group_id2sc.insert(group_name, sc_address) { + Some(_) => Err(ContactError::GroupAlreadyExistsError), + None => Ok(()), + } + } + + pub fn group2sc(&self, group_name: String) -> Result { + match self.group_id2sc.get(&group_name).cloned() { + Some(addr) => Ok(addr), + None => Err(ContactError::GroupNotFoundError(group_name)), + } + } } diff --git a/src/identity.rs b/src/identity.rs index ba58488..c5bff39 100644 --- a/src/identity.rs +++ b/src/identity.rs @@ -3,11 +3,12 @@ use std::collections::HashMap; use openmls::{credentials::CredentialWithKey, key_packages::*, prelude::*}; use openmls_basic_credential::SignatureKeyPair; -use openmls_rust_crypto::MemoryKeyStoreError; use openmls_traits::types::Ciphersuite; use mls_crypto::openmls_provider::MlsCryptoProvider; +use crate::IdentityError; + pub struct Identity { pub(crate) kp: HashMap, KeyPackage>, pub(crate) credential_with_key: CredentialWithKey, @@ -81,19 +82,3 @@ impl ToString for Identity { Address::from_slice(self.credential_with_key.credential.identity()).to_string() } } - -#[derive(Debug, thiserror::Error)] -pub enum IdentityError { - #[error("Something wrong while creating new key package: {0}")] - MlsKeyPackageNewError(#[from] KeyPackageNewError), - #[error(transparent)] - MlsLibraryError(#[from] LibraryError), - #[error("Something wrong with signature: {0}")] - MlsCryptoError(#[from] CryptoError), - #[error("Can't save signature key")] - MlsKeyStoreError(#[from] MemoryKeyStoreError), - #[error("Something wrong with credential: {0}")] - MlsCredentialError(#[from] CredentialError), - #[error("Unknown error: {0}")] - Other(anyhow::Error), -} diff --git a/src/lib.rs b/src/lib.rs index d3fcbd8..bfd0a58 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,48 +1,153 @@ +use alloy::{hex::FromHexError, primitives::SignatureError, signers::local::LocalSignerError}; +use ds::DeliveryServiceError; +use fred::error::RedisError; +use openmls::{error::LibraryError, prelude::*}; +use openmls_rust_crypto::MemoryKeyStoreError; +use sc_key_store::KeyStoreError; +use std::{str::Utf8Error, string::FromUtf8Error}; +use tokio::task::JoinError; + pub mod cli; pub mod contact; pub mod conversation; pub mod identity; pub mod user; -use std::{fs::File, io}; +#[derive(Debug, thiserror::Error)] +pub enum CliError { + #[error("Can't split the line")] + SplitLineError, + #[error("Failed to cancel token")] + TokenCancellingError, + + #[error("Problem from std::io library: {0}")] + IoError(#[from] std::io::Error), -const SC_ADDRESS: &str = "contracts/broadcast/Deploy.s.sol/31337/run-latest.json"; + #[error("Failed to send message to channel: {0}")] + SenderError(String), -pub fn get_contract_address() -> Result { - let file = File::open(SC_ADDRESS)?; - let json: serde_json::Value = serde_json::from_reader(file)?; + #[error("Redis error: {0}")] + RedisError(#[from] RedisError), + #[error("Failed from tokio join: {0}")] + TokioJoinError(#[from] JoinError), - let returns = match json.get("returns") { - Some(v) => v, - None => return Err(HelperError::UnknownJsonFieldError), - }; + #[error("Unknown error: {0}")] + AnyHowError(anyhow::Error), +} - let sc_keystore = match returns.get("scKeystore") { - Some(v) => v, - None => return Err(HelperError::UnknownJsonFieldError), - }; +#[derive(Debug, thiserror::Error)] +pub enum ContactError { + #[error("Key package for the specified group does not exist.")] + MissingKeyPackageForGroup, + #[error("SmartContract address for the specified group does not exist.")] + MissingSmartContractForGroup, + #[error("User not found.")] + UserNotFoundError, + #[error("Group not found: {0}")] + GroupNotFoundError(String), + #[error("Duplicate user found in joiners list.")] + DuplicateUserError, + #[error("Group already exists")] + GroupAlreadyExistsError, + #[error("Invalid user address in signature.")] + InvalidUserSignatureError, - let address = match sc_keystore.get("value") { - Some(v) => v, - None => return Err(HelperError::UnknownJsonFieldError), - }; + #[error(transparent)] + DeliveryServiceError(#[from] DeliveryServiceError), - match address.as_str() { - Some(res) => Ok(res.to_string()), - None => Err(HelperError::ParserError), - } + #[error("Failed to parse signature: {0}")] + AlloySignatureParsingError(#[from] SignatureError), + #[error("JSON processing error: {0}")] + JsonProcessingError(#[from] serde_json::Error), + #[error("Serialization error: {0}")] + SerializationError(#[from] tls_codec::Error), } #[derive(Debug, thiserror::Error)] -pub enum HelperError { - #[error("Field doesn't exist")] - UnknownJsonFieldError, - #[error("Parser Error")] - ParserError, - #[error("Can't read file: {0}")] - IoError(#[from] io::Error), - #[error("Json Error: {0}")] - JsonError(#[from] serde_json::Error), - #[error("Unknown error: {0}")] +pub enum IdentityError { + #[error("Failed to create new key package: {0}")] + MlsKeyPackageCreationError(#[from] KeyPackageNewError), + #[error(transparent)] + MlsLibraryError(#[from] LibraryError), + #[error("Failed to create signature: {0}")] + MlsCryptoError(#[from] CryptoError), + #[error("Failed to save signature key: {0}")] + MlsKeyStoreError(#[from] MemoryKeyStoreError), + #[error("Failed to create credential: {0}")] + MlsCredentialError(#[from] CredentialError), + #[error("An unknown error occurred: {0}")] Other(anyhow::Error), } + +#[derive(Debug, thiserror::Error)] +pub enum UserError { + #[error("User lacks connection to the smart contract.")] + MissingSmartContractConnection, + #[error("Group not found: {0}")] + GroupNotFoundError(String), + #[error("Group already exists: {0}")] + GroupAlreadyExistsError(String), + #[error("Unsupported message type.")] + UnsupportedMessageType, + #[error("User already exists: {0}")] + UserAlreadyExistsError(String), + #[error("Welcome message cannot be empty.")] + EmptyWelcomeMessageError, + #[error("Message from user is invalid")] + InvalidChatMessageError, + #[error("Message from server is invalid")] + InvalidServerMessageError, + #[error("User not found.")] + UserNotFoundError, + + #[error(transparent)] + DeliveryServiceError(#[from] DeliveryServiceError), + #[error(transparent)] + KeyStoreError(#[from] KeyStoreError), + #[error(transparent)] + IdentityError(#[from] IdentityError), + #[error(transparent)] + ContactError(#[from] ContactError), + + #[error("Error while creating MLS group: {0}")] + MlsGroupCreationError(#[from] NewGroupError), + #[error("Error while adding member to MLS group: {0}")] + MlsAddMemberError(#[from] AddMembersError), + #[error("Error while merging pending commit in MLS group: {0}")] + MlsMergePendingCommitError(#[from] MergePendingCommitError), + #[error("Error while merging commit in MLS group: {0}")] + MlsMergeCommitError(#[from] MergeCommitError), + #[error("Error processing unverified message: {0}")] + MlsProcessMessageError(#[from] ProcessMessageError), + #[error("Error while creating message: {0}")] + MlsCreateMessageError(#[from] CreateMessageError), + #[error("Failed to create staged join: {0}")] + MlsWelcomeError(#[from] WelcomeError), + #[error("Failed to remove member from MLS group: {0}")] + MlsRemoveMemberError(#[from] RemoveMembersError), + #[error("Failed to validate user key package: {0}")] + MlsKeyPackageVerificationError(#[from] KeyPackageVerifyError), + + #[error("UTF-8 parsing error: {0}")] + Utf8ParsingError(#[from] FromUtf8Error), + #[error("UTF-8 string parsing error: {0}")] + Utf8StringParsingError(#[from] Utf8Error), + + #[error("JSON processing error: {0}")] + JsonError(#[from] serde_json::Error), + #[error("Serialization error: {0}")] + SerializationError(#[from] tls_codec::Error), + + #[error("Failed to parse address: {0}")] + AddressParsingError(#[from] FromHexError), + #[error("Failed to parse signer: {0}")] + SignerParsingError(#[from] LocalSignerError), + + #[error("Signing error: {0}")] + SigningError(#[from] alloy::signers::Error), + #[error("I/O error: {0}")] + IoError(#[from] std::io::Error), + + #[error("An unknown error occurred: {0}")] + UnknownError(anyhow::Error), +} diff --git a/src/main.rs b/src/main.rs index 5ca564c..d0d2d04 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,18 +1,14 @@ use alloy::{providers::ProviderBuilder, signers::local::PrivateKeySigner}; use clap::Parser; -use ds::{ - chat_client::{ChatClient, ChatMessages, ReqMessageType}, - chat_server::ServerMessage, -}; - -use std::{borrow::BorrowMut, error::Error, str::FromStr, sync::Arc}; +use std::{error::Error, str::FromStr, sync::Arc}; use tokio::sync::{mpsc, Mutex}; use tokio_tungstenite::tungstenite::protocol::Message as TokioMessage; use tokio_util::sync::CancellationToken; -use de_mls::{ - cli::*, - user::{User, UserError}, +use de_mls::{cli::*, user::User, CliError, UserError}; +use ds::{ + chat_client::{ChatClient, ChatMessages}, + chat_server::ServerMessage, }; #[tokio::main] @@ -23,9 +19,9 @@ async fn main() -> Result<(), Box> { let args = Args::parse(); let signer = PrivateKeySigner::from_str(&args.user_priv_key)?; - let user_address = signer.address(); + let user_address = signer.address().to_string(); let (client, mut client_recv) = - ChatClient::connect("ws://127.0.0.1:8080", &user_address.to_string()).await?; + ChatClient::connect("ws://127.0.0.1:8080", user_address.clone()).await?; //// Create user let user_n = User::new(&args.user_priv_key, client).await?; let user_arc = Arc::new(Mutex::new(user_n)); @@ -34,7 +30,7 @@ async fn main() -> Result<(), Box> { messages_tx .send(Msg::Input(Message::System(format!( "Hello, {:}", - user_address + user_address.clone() )))) .await?; @@ -56,19 +52,19 @@ async fn main() -> Result<(), Box> { if let Ok(chat_msg) = serde_json::from_str::(&msg) { match chat_msg { ChatMessages::Request(req) => { - let res = user.as_ref().lock().await.send_responce_on_request(req, to[0].clone(), &from); + let res = user.as_ref().lock().await.send_responce_on_request(req, &from); if let Err(err) = res { res_msg_tx .send(Msg::Input(Message::Error(err.to_string()))) - .await?; + .await.map_err(|err| CliError::SenderError(err.to_string()))?; } }, ChatMessages::Response(resp) => { - let res = user.as_ref().lock().await.parce_responce(resp, "test".to_string()).await; + let res = user.as_ref().lock().await.parce_responce(resp).await; if let Err(err) = res { res_msg_tx .send(Msg::Input(Message::Error(err.to_string()))) - .await?; + .await.map_err(|err| CliError::SenderError(err.to_string()))?; } }, ChatMessages::Welcome(welcome) => { @@ -76,13 +72,13 @@ async fn main() -> Result<(), Box> { match res { Ok(mut buf) => { let msg = format!("Succesfully join to the group: {:#?}", buf.1); - res_msg_tx.send(Msg::Input(Message::System(msg))).await?; + res_msg_tx.send(Msg::Input(Message::System(msg))).await.map_err(|err| CliError::SenderError(err.to_string()))?; let redis_tx = redis_tx.clone(); tokio::spawn(async move { while let Ok(msg) = buf.0.recv().await { let bytes: Vec = msg.value.convert()?; - redis_tx.send(bytes).await?; + redis_tx.send(bytes).await.map_err(|err| CliError::SenderError(err.to_string()))?; } Ok::<_, CliError>(()) }); @@ -90,7 +86,7 @@ async fn main() -> Result<(), Box> { Err(err) => { res_msg_tx .send(Msg::Input(Message::Error(err.to_string()))) - .await?; + .await.map_err(|err| CliError::SenderError(err.to_string()))?; }, }; }, @@ -98,13 +94,13 @@ async fn main() -> Result<(), Box> { } else { res_msg_tx .send(Msg::Input(Message::Error(UserError::InvalidChatMessageError.to_string()))) - .await?; + .await.map_err(|err| CliError::SenderError(err.to_string()))?; } }; } else { res_msg_tx .send(Msg::Input(Message::Error(UserError::InvalidServerMessageError.to_string()))) - .await?; + .await.map_err(|err| CliError::SenderError(err.to_string()))?; } } } @@ -113,14 +109,14 @@ async fn main() -> Result<(), Box> { match res { Ok(msg) => { match msg { - Some(m) => res_msg_tx.send(Msg::Input(Message::Incoming(m.group, m.author, m.message))).await?, + Some(m) => res_msg_tx.send(Msg::Input(Message::Incoming(m.group, m.author, m.message))).await.map_err(|err| CliError::SenderError(err.to_string()))?, None => continue } }, Err(err) => { res_msg_tx .send(Msg::Input(Message::Error(err.to_string()))) - .await?; + .await.map_err(|err| CliError::SenderError(err.to_string()))?; }, }; } @@ -130,18 +126,18 @@ async fn main() -> Result<(), Box> { Commands::CreateGroup { group_name, storage_address, storage_url } => { let client_provider = ProviderBuilder::new() .with_recommended_fillers() - .wallet(user.as_ref().lock().await.get_wallet()) + .wallet(user.as_ref().lock().await.wallet()) .on_http(storage_url); let res = user.as_ref().lock().await.connect_to_smart_contract(&storage_address, client_provider).await; match res { Ok(_) => { let msg = format!("Successfully connect to Smart Contract on address {:}\n", storage_address); - res_msg_tx.send(Msg::Input(Message::System(msg))).await?; + res_msg_tx.send(Msg::Input(Message::System(msg))).await.map_err(|err| CliError::SenderError(err.to_string()))?; }, Err(err) => { res_msg_tx .send(Msg::Input(Message::Error(err.to_string()))) - .await?; + .await.map_err(|err| CliError::SenderError(err.to_string()))?; }, }; @@ -149,13 +145,13 @@ async fn main() -> Result<(), Box> { match res { Ok(mut br) => { let msg = format!("Successfully create group: {:?}", group_name.clone()); - res_msg_tx.send(Msg::Input(Message::System(msg))).await?; + res_msg_tx.send(Msg::Input(Message::System(msg))).await.map_err(|err| CliError::SenderError(err.to_string()))?; let redis_tx = redis_tx.clone(); tokio::spawn(async move { while let Ok(msg) = br.recv().await { let bytes: Vec = msg.value.convert()?; - redis_tx.send(bytes).await?; + redis_tx.send(bytes).await.map_err(|err| CliError::SenderError(err.to_string()))?; } Ok::<_, CliError>(()) }); @@ -163,59 +159,63 @@ async fn main() -> Result<(), Box> { Err(err) => { res_msg_tx .send(Msg::Input(Message::Error(err.to_string()))) - .await?; + .await.map_err(|err| CliError::SenderError(err.to_string()))?; }, }; }, Commands::Invite { group_name, users_wallet_addrs } => { - let u_c = user.clone(); + let user_clone = user.clone(); let res_msg_tx_c = messages_tx.clone(); tokio::spawn(async move { for user_wallet in users_wallet_addrs.iter() { - let u_c_f = u_c.as_ref(); + let user_clone_ref = user_clone.as_ref(); let opt_token = { - let mut u_c_ff = u_c_f.lock().await; - if !u_c_ff.contacts.does_user_in_contacts(user_wallet).await { - u_c_ff.contacts.add_new_contact(user_wallet).await.unwrap(); + let mut user_clone_ref_lock = user_clone_ref.lock().await; + let res = user_clone_ref_lock.handle_send_req(user_wallet, group_name.clone()).await; + match res { + Ok(token) => { + token + }, + Err(err) => { + res_msg_tx_c + .send(Msg::Input(Message::Error(err.to_string()))) + .await.map_err(|err| CliError::SenderError(err.to_string()))?; + None + }, } - u_c_ff.contacts - .send_req_msg_to_user( - user_address.to_string(), - user_wallet, - "0xe7f1725E7734CE288F8367e1Bb143E90bb3F0512".to_string(), - ReqMessageType::InviteToGroup, - ) - .await.unwrap(); - - u_c_ff.contacts.future_req.get(user_wallet).cloned() }; match opt_token { Some(token) => token.cancelled().await, - None => return Err(CliError::SplitLineError), + None => return Err(CliError::TokenCancellingError), }; { - let mut u_c = u_c.as_ref().lock().await; - u_c.contacts.future_req.remove(user_wallet); - u_c.add_user_to_acl(user_wallet).await.unwrap(); + let mut user_clone_ref_lock = user_clone.as_ref().lock().await; + user_clone_ref_lock.contacts.future_req.remove(user_wallet); + let res = user_clone_ref_lock.add_user_to_acl(user_wallet).await; + if let Err(err) = res { + res_msg_tx_c + .send(Msg::Input(Message::Error(err.to_string()))) + .await.map_err(|err| CliError::SenderError(err.to_string()))?; + }; } } - let res = u_c.as_ref().lock().await.invite(users_wallet_addrs.clone(), group_name.clone()).await; + let res = user_clone.as_ref().lock().await.invite(users_wallet_addrs.clone(), group_name.clone()).await; match res { Ok(_) => { let msg = format!("Invite {:?} to the group {:}\n", users_wallet_addrs, group_name ); - res_msg_tx_c.send(Msg::Input(Message::System(msg))).await?; + res_msg_tx_c.send(Msg::Input(Message::System(msg))).await.map_err(|err| CliError::SenderError(err.to_string()))?; }, Err(err) => { res_msg_tx_c .send(Msg::Input(Message::Error(err.to_string()))) - .await?; + .await.map_err(|err| CliError::SenderError(err.to_string()))?; }, }; Ok::<_, CliError>(()) @@ -223,20 +223,20 @@ async fn main() -> Result<(), Box> { }, Commands::SendMessage { group_name, msg } => { let message = msg.join(" "); - let res = user.as_ref().lock().await.send_msg(&message, group_name.clone(), user_address.to_string()).await; + let res = user.as_ref().lock().await.send_msg(&message, group_name.clone(), user_address.clone()).await; match res { Ok(_) => { - res_msg_tx.send(Msg::Input(Message::Mine(group_name, user_address.to_string(), message ))).await?; + res_msg_tx.send(Msg::Input(Message::Mine(group_name, user_address.clone(), message ))).await.map_err(|err| CliError::SenderError(err.to_string()))?; }, Err(err) => { res_msg_tx .send(Msg::Input(Message::Error(err.to_string()))) - .await?; + .await.map_err(|err| CliError::SenderError(err.to_string()))?; }, }; }, Commands::Exit => { - res_msg_tx.send(Msg::Input(Message::System("Bye!".to_string()))).await?; + res_msg_tx.send(Msg::Input(Message::System("Bye!".to_string()))).await.map_err(|err| CliError::SenderError(err.to_string()))?; break }, } @@ -245,7 +245,7 @@ async fn main() -> Result<(), Box> { break; } else => { - res_msg_tx.send(Msg::Input(Message::System("Something went wrong".to_string()))).await?; + res_msg_tx.send(Msg::Input(Message::System("Something went wrong".to_string()))).await.map_err(|err| CliError::SenderError(err.to_string()))?; break } }; diff --git a/src/user.rs b/src/user.rs index f881905..c93e58b 100644 --- a/src/user.rs +++ b/src/user.rs @@ -1,39 +1,31 @@ use alloy::{ - hex::{self, FromHexError}, + hex::{self}, network::{EthereumWallet, Network}, primitives::Address, providers::Provider, - signers::{ - local::{LocalSignerError, PrivateKeySigner}, - SignerSync, - }, + signers::{local::PrivateKeySigner, SignerSync}, transports::Transport, }; use fred::types::Message; use openmls::{group::*, prelude::*}; -use openmls_rust_crypto::MemoryKeyStoreError; use std::{ cell::RefCell, collections::HashMap, fmt::Display, - str::{from_utf8, FromStr, Utf8Error}, - string::FromUtf8Error, + str::{from_utf8, FromStr}, }; use tokio::sync::broadcast::Receiver; +use tokio_util::sync::CancellationToken; use ds::{ chat_client::{ChatClient, ReqMessageType, RequestMLSPayload, ResponseMLSPayload}, ds::*, - ChatServiceError, }; use mls_crypto::openmls_provider::*; use sc_key_store::{sc_ks::ScKeyStorage, *}; -use crate::{ - contact::ContactError, - identity::{Identity, IdentityError}, -}; use crate::{contact::ContactsList, conversation::*}; +use crate::{identity::Identity, UserError}; pub struct Group { group_name: String, @@ -105,7 +97,7 @@ where let group_id = group_name.as_bytes(); if self.groups.contains_key(&group_name) { - return Err(UserError::AlreadyExistedGroupError(group_name)); + return Err(UserError::GroupAlreadyExistsError(group_name)); } let group_config = MlsGroupConfig::builder() @@ -120,7 +112,7 @@ where self.identity.credential_with_key.clone(), )?; - let (rc, broadcaster) = RClient::new_for_group(group_name.clone()).await?; + let (rc, broadcaster) = RClient::new_with_group(group_name.clone()).await?; let group = Group { group_name: group_name.clone(), conversation: Conversation::default(), @@ -130,13 +122,15 @@ where // content_topics: Vec::new(), }; - self.groups.insert(group_name, group); + self.groups.insert(group_name.clone(), group); + self.contacts + .insert_group2sc(group_name, self.sc_address()?)?; Ok(broadcaster) } pub async fn add_user_to_acl(&mut self, user_address: &str) -> Result<(), UserError> { if self.sc_ks.is_none() { - return Err(UserError::EmptyScConnection); + return Err(UserError::MissingSmartContractConnection); } self.sc_ks.as_mut().unwrap().add_user(user_address).await?; Ok(()) @@ -147,7 +141,7 @@ where mut signed_kp: &[u8], ) -> Result { if self.sc_ks.is_none() { - return Err(UserError::EmptyScConnection); + return Err(UserError::MissingSmartContractConnection); } let key_package_in = KeyPackageIn::tls_deserialize(&mut signed_kp)?; @@ -163,32 +157,9 @@ where group_name: String, ) -> Result<(), UserError> { if self.sc_ks.is_none() { - return Err(UserError::EmptyScConnection); + return Err(UserError::MissingSmartContractConnection); } - // for user_wallet in users.iter() { - // if !self.contacts.does_user_in_contacts(user_wallet).await { - // self.contacts.add_new_contact(user_wallet).await?; - // } - // self.contacts - // .send_req_msg_to_user( - // self.identity.to_string(), - // user_wallet, - // self.sc_ks.as_mut().unwrap().get_sc_adsress(), - // ReqMessageType::InviteToGroup, - // ) - // .await?; - // println!("waiting token"); - // match self.contacts.future_req.get(user_wallet) { - // Some(token) => token.cancelled().await, - // None => return Err(UserError::UnknownUserError), - // }; - // println!("cancelled token"); - // self.contacts.future_req.remove(user_wallet); - - // self.add_user_to_acl(user_wallet).await?; - // } - let users_for_invite = self .contacts .prepare_joiners(users.clone(), group_name.clone()) @@ -204,7 +175,7 @@ where // Build a proposal with this key package and do the MLS bits. let group = match self.groups.get_mut(&group_name) { Some(g) => g, - None => return Err(UserError::UnknownGroupError(group_name)), + None => return Err(UserError::GroupNotFoundError(group_name)), }; let (out_messages, welcome, _group_info) = group.mls_group.borrow_mut().add_members( @@ -215,7 +186,11 @@ where group .rc_client - .msg_send(out_messages, self.identity.to_string()) + .msg_send( + out_messages.tls_serialize_detached()?, + self.identity.to_string(), + group_name, + ) .await?; // Second, process the invitation on our end. group @@ -245,7 +220,7 @@ where MlsMessageInBody::PublicMessage(message) => { self.process_protocol_msg(message.into())? } - _ => return Err(UserError::MessageTypeError), + _ => return Err(UserError::UnsupportedMessageType), }; Ok(msg) } @@ -257,7 +232,7 @@ where let group_name = from_utf8(message.group_id().as_slice())?.to_string(); let group = match self.groups.get_mut(&group_name) { Some(g) => g, - None => return Err(UserError::UnknownGroupError(group_name)), + None => return Err(UserError::GroupNotFoundError(group_name)), }; let mut mls_group = group.mls_group.borrow_mut(); @@ -314,7 +289,7 @@ where ) -> Result<(), UserError> { let group = match self.groups.get_mut(&group_name) { Some(g) => g, - None => return Err(UserError::UnknownGroupError(group_name)), + None => return Err(UserError::GroupNotFoundError(group_name)), }; let message_out = group.mls_group.borrow_mut().create_message( @@ -323,7 +298,10 @@ where msg.as_bytes(), )?; - group.rc_client.msg_send(message_out, sender).await?; + group + .rc_client + .msg_send(message_out.tls_serialize_detached()?, sender, group_name) + .await?; Ok(()) } @@ -349,7 +327,7 @@ where let group_id = mls_group.group_id().to_vec(); let group_name = String::from_utf8(group_id)?; - let (rc, br) = RClient::new_for_group(group_name.clone()).await?; + let (rc, br) = RClient::new_with_group(group_name.clone()).await?; let group = Group { group_name: group_name.clone(), conversation: Conversation::default(), @@ -358,7 +336,7 @@ where }; match self.groups.insert(group_name.clone(), group) { - Some(old) => Err(UserError::AlreadyExistedGroupError(old.group_name)), + Some(old) => Err(UserError::GroupAlreadyExistsError(old.group_name)), None => Ok((br, group_name)), } } @@ -397,7 +375,7 @@ where group_name: String, ) -> Result>, UserError> { self.groups.get(&group_name).map_or_else( - || Err(UserError::UnknownGroupError(group_name)), + || Err(UserError::GroupNotFoundError(group_name)), |g| { Ok(g.conversation .get(100) @@ -409,7 +387,7 @@ where pub fn group_members(&self, group_name: String) -> Result, UserError> { let group = match self.groups.get(&group_name) { Some(g) => g, - None => return Err(UserError::UnknownGroupError(group_name)), + None => return Err(UserError::GroupNotFoundError(group_name)), }; Ok(group.group_members(self.identity.signature_pub_key().as_slice())) } @@ -421,11 +399,11 @@ where Ok(self.groups.keys().map(|k| k.to_owned()).collect()) } - pub fn get_wallet(&self) -> EthereumWallet { + pub fn wallet(&self) -> EthereumWallet { EthereumWallet::from(self.eth_signer.clone()) } - fn generate_signature(&self, msg: String) -> Result { + fn sign(&self, msg: String) -> Result { let signature = self.eth_signer.sign_message_sync(msg.as_bytes())?; let res = serde_json::to_string(&signature)?; Ok(res) @@ -434,18 +412,19 @@ where pub fn send_responce_on_request( &mut self, req: RequestMLSPayload, - self_address: String, user_address: &str, ) -> Result<(), UserError> { + let self_address = self.identity.to_string(); match req.msg_type { ReqMessageType::InviteToGroup => { - let signature = self.generate_signature(req.msg)?; + let signature = self.sign(req.msg_to_sign())?; let key_package = self .identity .generate_key_package(CIPHERSUITE, &self.provider)?; let resp = ResponseMLSPayload::new( signature, self_address.clone(), + req.group_name(), key_package.tls_serialize_detached()?, ); self.contacts @@ -457,15 +436,13 @@ where } } - pub async fn parce_responce( - &mut self, - resp: ResponseMLSPayload, - group_name: String, - ) -> Result<(), UserError> { + pub async fn parce_responce(&mut self, resp: ResponseMLSPayload) -> Result<(), UserError> { if self.sc_ks.is_none() { - return Err(UserError::EmptyScConnection); + return Err(UserError::MissingSmartContractConnection); } - let (user_wallet, kp) = resp.validate(self.sc_ks.as_ref().unwrap().get_sc_adsress())?; + let group_name = resp.group_name.clone(); + let sc_address = self.contacts.group2sc(group_name.clone())?; + let (user_wallet, kp) = resp.validate(sc_address, group_name.clone())?; self.contacts .add_key_package_to_contact(&user_wallet, kp, group_name.clone()) @@ -475,11 +452,31 @@ where Ok(()) } - pub fn get_sc_address(&self) -> Result { + pub fn sc_address(&self) -> Result { if self.sc_ks.is_none() { - return Err(UserError::EmptyScConnection); + return Err(UserError::MissingSmartContractConnection); } - Ok(self.sc_ks.as_ref().unwrap().get_sc_adsress()) + Ok(self.sc_ks.as_ref().unwrap().sc_adsress()) + } + + pub async fn handle_send_req( + &mut self, + user_wallet: &str, + group_name: String, + ) -> Result, UserError> { + if !self.contacts.does_user_in_contacts(user_wallet).await { + self.contacts.add_new_contact(user_wallet).await?; + } + self.contacts + .send_msg_req( + self.identity.to_string(), + user_wallet.to_owned(), + group_name, + ReqMessageType::InviteToGroup, + ) + .unwrap(); + + Ok(self.contacts.future_req.get(user_wallet).cloned()) } } @@ -513,75 +510,3 @@ pub enum GroupError { #[error("Unknown group member : {0}")] UnknownGroupMemberError(String), } - -#[derive(Debug, thiserror::Error)] -pub enum UserError { - #[error("User doesn't have connection to smart contract")] - EmptyScConnection, - #[error("Unknown group: {0}")] - UnknownGroupError(String), - #[error("Group already exist: {0}")] - AlreadyExistedGroupError(String), - #[error("Unsupported message type")] - MessageTypeError, - #[error("User already exist: {0}")] - AlreadyExistedUserError(String), - #[error("Empty welcome message")] - EmptyWelcomeMessageError, - #[error("Inner Message from server is invalid")] - InvalidChatMessageError, - #[error("Message from server is invalid")] - InvalidServerMessageError, - #[error("Unknown user")] - UnknownUserError, - - #[error(transparent)] - DeliveryServiceError(#[from] DeliveryServiceError), - #[error(transparent)] - KeyStoreError(#[from] KeyStoreError), - #[error(transparent)] - IdentityError(#[from] IdentityError), - #[error(transparent)] - ContactError(#[from] ContactError), - #[error(transparent)] - ChatServiceError(#[from] ChatServiceError), - - #[error("Something wrong while creating Mls group: {0}")] - MlsGroupCreationError(#[from] NewGroupError), - #[error("Something wrong while adding member to Mls group: {0}")] - MlsAddMemberError(#[from] AddMembersError), - #[error("Something wrong while merging pending commit: {0}")] - MlsMergePendingCommitError(#[from] MergePendingCommitError), - #[error("Something wrong while merging commit: {0}")] - MlsMergeCommitError(#[from] MergeCommitError), - #[error("Error processing unverified message: {0}")] - MlsProcessMessageError(#[from] ProcessMessageError), - #[error("Something wrong while creating message: {0}")] - MlsCreateMessageError(#[from] CreateMessageError), - #[error("Failed to create staged join: {0}")] - MlsWelcomeError(#[from] WelcomeError), - #[error("Failed to remove member from group: {0}")] - MlsRemoveMembersError(#[from] RemoveMembersError), - #[error("Failed to validate user key_pacakge: {0}")] - MlsKeyPackageVerifyError(#[from] KeyPackageVerifyError), - - #[error("Parse String UTF8 error: {0}")] - ParseUTF8Error(#[from] FromUtf8Error), - #[error("Parse str UTF8 error: {0}")] - ParseStrUTF8Error(#[from] Utf8Error), - #[error("Json error: {0}")] - JsonError(#[from] serde_json::Error), - #[error("Serialization problem: {0}")] - TlsError(#[from] tls_codec::Error), - #[error("Unable to parce the address: {0}")] - AlloyFromHexError(#[from] FromHexError), - #[error("Unable to parce the signer: {0}")] - AlloyParceSignerError(#[from] LocalSignerError), - #[error("Unable to sign the message: {0}")] - AlloySignersError(#[from] alloy::signers::Error), - #[error("Write to stdout error")] - IoError(#[from] std::io::Error), - - #[error("Unknown error: {0}")] - Other(anyhow::Error), -} diff --git a/tests/sc_key_store_test.rs b/tests/sc_key_store_test.rs deleted file mode 100644 index 8546ff3..0000000 --- a/tests/sc_key_store_test.rs +++ /dev/null @@ -1,242 +0,0 @@ -use alloy::providers::ProviderBuilder; -use alloy::{network::EthereumWallet, primitives::Address, signers::local::PrivateKeySigner}; -use de_mls::contact::CHAT_SERVER_ADDR; -use de_mls::user::User; -use ds::chat_client::{self, ChatClient, ChatMessages}; -use ds::chat_server::ServerMessage; -use std::str::FromStr; -use std::time::Duration; -use tungstenite::Message; - -use openmls::prelude::*; -use openmls_basic_credential::SignatureKeyPair; - -use mls_crypto::openmls_provider::{MlsCryptoProvider, CIPHERSUITE}; - -pub fn alice_addr_test() -> (Address, EthereumWallet) { - let alice_address = Address::from_str("0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266").unwrap(); // anvil default key 0 - let signer = PrivateKeySigner::from_str( - "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80", - ) - .unwrap(); - let wallet = EthereumWallet::from(signer); - (alice_address, wallet) -} - -pub fn bob_addr_test() -> (Address, EthereumWallet) { - let bob_address = Address::from_str("0x70997970C51812dc3A010C7d01b50e0d17dc79C8").unwrap(); // anvil default key 0 - let signer = PrivateKeySigner::from_str( - "0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d", - ) - .unwrap(); - let wallet = EthereumWallet::from(signer); - (bob_address, wallet) -} - -pub fn carla_addr_test() -> (Address, EthereumWallet) { - let carla_address = Address::from_str("0x3C44CdDdB6a900fa2b585dd299e03d12FA4293BC").unwrap(); // anvil default key 0 - let signer = PrivateKeySigner::from_str( - "0x5de4111afa1a4b94908f83103eb1f1706367c2e68ca870fc3fb9a804cdab365a", - ) - .unwrap(); - let wallet = EthereumWallet::from(signer); - (carla_address, wallet) -} - -pub(crate) fn test_identity( - address: Address, - crypto: &MlsCryptoProvider, -) -> (KeyPackage, SignatureKeyPair) { - let ciphersuite = CIPHERSUITE; - let signature_keys = SignatureKeyPair::new(ciphersuite.signature_algorithm()).unwrap(); - let credential = Credential::new(address.to_vec(), CredentialType::Basic).unwrap(); - - let credential_with_key = CredentialWithKey { - credential, - signature_key: signature_keys.to_public_vec().into(), - }; - signature_keys.store(crypto.key_store()).unwrap(); - - let key_package = KeyPackage::builder() - .build( - CryptoConfig { - ciphersuite, - version: ProtocolVersion::default(), - }, - crypto, - &signature_keys, - credential_with_key.clone(), - ) - .unwrap(); - - (key_package, signature_keys) -} - -#[tokio::test] -async fn test_input_request() { - let (chat_client, mut client_recv) = ChatClient::connect( - CHAT_SERVER_ADDR, - "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266", - ) - .await - .unwrap(); - let mut alice = User::new( - "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80", - chat_client, - ) - .await - .unwrap(); - - let client_provider = ProviderBuilder::new() - .with_recommended_fillers() - .wallet(alice.get_wallet()) - .on_http(url::Url::from_str("http://localhost:8545").unwrap()); - - let res = alice - .connect_to_smart_contract( - "0xe7f1725E7734CE288F8367e1Bb143E90bb3F0512", - client_provider, - ) - .await; - - assert!(res.is_ok()); - - let h2 = tokio::spawn(async move { - while let Some(msg) = client_recv.recv().await { - if let Message::Text(text) = msg { - if let Ok(chat_message) = serde_json::from_str::(&text) { - match chat_message { - // ServerMessage::InMessage { from, to, msg } => { - // println!("alice received TextMsg from {}: {}", from, msg); - // } - ServerMessage::SystemJoin { username } => { - println!("Client1 received SystemJoin message for user: {}", username); - } - ServerMessage::InMessage { from, to, msg } => { - println!("Client1 received inmessage message for user: {}", from); - if let Ok(chat_msg) = serde_json::from_str::(&msg) { - match chat_msg { - ChatMessages::Request(req) => { - let res = alice.send_responce_on_request( - req, - to[0].clone(), - &from, - ); - match res { - Ok(_) => { - println!("Succesfully create responce"); - } - Err(err) => { - eprintln!("Error: {}", err); - } - } - } - ChatMessages::Response(resp) => { - let res = - alice.parce_responce(resp, "test".to_string()).await; - match res { - Ok(_) => { - println!("Succesfully parse responce"); - } - Err(err) => { - eprintln!("Error: {}", err); - } - } - } - ChatMessages::Welcome(welcome) => { - let res = alice.join_group(welcome).await; - match res { - Ok(mut buf) => { - let msg = format!( - "Succesfully join to the group: {:#?}", - buf.1 - ); - } - Err(err) => { - eprintln!("Error: {}", err); - } - }; - } - } - } - } - } - } - } - } - }); - - let (chat_client_bob, mut client_recv_bo) = ChatClient::connect( - CHAT_SERVER_ADDR, - "0x70997970C51812dc3A010C7d01b50e0d17dc79C8", - ) - .await - .unwrap(); - // let mut bob = User::new( - // "0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d", - // chat_client_bob, - // ) - // .await - // .unwrap(); - // - let client2_send_task = tokio::spawn(async move { - loop { - println!("send msg"); - tokio::time::sleep(Duration::from_secs(7)).await; // Delay between messages - let message = ServerMessage::InMessage { - from: "0x70997970C51812dc3A010C7d01b50e0d17dc79C8".to_string(), - to: vec!["0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266".to_string()], - msg: "Hello from user1!".to_string(), - }; - if let Err(e) = chat_client_bob.send_message_to_server(message) { - eprintln!("Client2 failed to send message: {}", e); - } - } - }); - - let _ = tokio::join!(h2, client2_send_task); - - // let bob_signer = PrivateKeySigner::from_str( - // "0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d", - // ) - // .unwrap(); - // let bob_address = bob_signer.address(); - - // let crypto = MlsCryptoProvider::default(); - // let (bob_key, bob_sign_pk) = test_identity(bob_address, &crypto); - - // let bytes_key = bob_key.tls_serialize_detached().unwrap(); - - // let res = alice.add_user_to_acl(&bob_address.to_string()).await; - - // assert!(res.is_ok()); - - // let res = alice.restore_key_package(&bytes_key).await; - - // assert!(res.is_ok()) -} - -#[tokio::test] -async fn bob_test() { - - // let client_provider_bob = ProviderBuilder::new() - // .with_recommended_fillers() - // .wallet(bob.get_wallet()) - // .on_http(url::Url::from_str("http://localhost:8545").unwrap()); - - // let res = bob - // .connect_to_smart_contract( - // "0xe7f1725E7734CE288F8367e1Bb143E90bb3F0512", - // client_provider_bob, - // ) - // .await; - - // bob.contacts - // .send_req_msg_to_user( - // "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266".to_string(), - // "0x70997970C51812dc3A010C7d01b50e0d17dc79C8", - // "0xe7f1725E7734CE288F8367e1Bb143E90bb3F0512".to_string(), - // chat_client::ReqMessageType::InviteToGroup, - // ) - // .unwrap(); -}