Skip to content

Commit

Permalink
new flow works good
Browse files Browse the repository at this point in the history
  • Loading branch information
seemenkina committed Aug 8, 2024
1 parent 5ad3220 commit ebe7085
Show file tree
Hide file tree
Showing 10 changed files with 314 additions and 227 deletions.
1 change: 1 addition & 0 deletions ds/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ alloy = { git = "https://github.com/alloy-rs/alloy", features = [
"k256",
"rlp",
] }
tokio-util = "=0.7.11"

openmls = { version = "=0.5.0", features = ["test-utils"] }
rand = { version = "^0.8" }
Expand Down
22 changes: 5 additions & 17 deletions ds/src/chat_client.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use alloy::primitives::Address;
use alloy::providers::ProviderBuilder;
use alloy::rlp::Encodable;
use alloy::signers::Signature;
use alloy::signers::{local::PrivateKeySigner, SignerSync};
use futures_util::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use std::str::FromStr;
Expand Down Expand Up @@ -74,7 +71,6 @@ impl ResponseMLSPayload {

pub struct ChatClient {
sender: mpsc::UnboundedSender<Message>,
pub request_in_progress: Arc<Mutex<()>>,
}

impl ChatClient {
Expand All @@ -92,7 +88,7 @@ impl ChatClient {
// Spawn a task to handle outgoing messages
tokio::spawn(async move {
while let Some(message) = receiver.lock().await.recv().await {
println!("Message from reciever: {}", message);
// println!("Message from reciever: {}", message);
if let Err(e) = write.send(message).await {
eprintln!("Error sending message: {}", e);
}
Expand All @@ -104,7 +100,6 @@ impl ChatClient {
let mut read = read;
while let Some(message) = read.next().await {
if let Ok(msg) = message {
println!("Message from read: {}", msg);
if let Err(e) = msg_sender.send(msg) {
eprintln!("Failed to send message to channel: {}", e);
}
Expand All @@ -121,29 +116,21 @@ impl ChatClient {
.send(Message::Text(join_json))
.map_err(|_| ChatServiceError::SendError)?;

Ok((
ChatClient {
sender,
request_in_progress: Arc::new(Mutex::new(())),
},
msg_receiver,
))
Ok((ChatClient { sender }, msg_receiver))
}

pub async fn send_request(&self, msg: ServerMessage) -> Result<(), ChatServiceError> {
let _guard = self.request_in_progress.lock().await;
self.send_message_to_server(msg)?;
Ok(())
}

pub async fn handle_response(&self) -> Result<(), ChatServiceError> {
drop(self.request_in_progress.lock().await);
Ok(())
}

pub fn send_message_to_server(&self, msg: ServerMessage) -> Result<(), ChatServiceError> {
let msg_json = serde_json::to_string(&msg).unwrap();
println!("Message to sender: {}", msg_json);
// println!("Message to sender: {}", msg_json);
self.sender
.send(Message::Text(msg_json))
.map_err(|_| ChatServiceError::SendError)?;
Expand All @@ -153,7 +140,8 @@ impl ChatClient {

#[test]
fn test_sign() {
let signer = PrivateKeySigner::from_str(
use alloy::signers::SignerSync;
let signer = alloy::signers::local::PrivateKeySigner::from_str(
"0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d",
)
.unwrap();
Expand Down
5 changes: 1 addition & 4 deletions ds/src/chat_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,11 @@ use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::{
net::{TcpListener, TcpStream},
net::TcpListener,
sync::{mpsc, Mutex},
task,
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_tungstenite::{accept_async, tungstenite::protocol::Message};

use crate::chat_client::ChatMessages;
use crate::ChatServiceError;

type Tx = mpsc::UnboundedSender<Message>;
Expand Down
116 changes: 115 additions & 1 deletion ds/src/ds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ use fred::{
types::Message,
};
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast::Receiver;
use tokio::sync::{
broadcast::Receiver,
mpsc::{self, error::SendError, UnboundedReceiver, UnboundedSender},
};
use tokio_util::sync::CancellationToken;

use openmls::{framing::MlsMessageOut, prelude::TlsSerializeTrait};
// use waku_bindings::*;
Expand Down Expand Up @@ -68,14 +72,124 @@ impl RClient {
}
}

pub struct RClientPrivate {
chat_id: String,
sender_id: String,
token: CancellationToken,
sender: UnboundedSender<Vec<u8>>,
reciever: UnboundedReceiver<Vec<u8>>,
}

impl RClientPrivate {
pub async fn new_for_users(
chat_id: String,
sender_id: String,
) -> Result<Self, DeliveryServiceError> {
let redis_client = RedisClient::default();
let subscriber: SubscriberClient =
Builder::default_centralized().build_subscriber_client()?;
redis_client.init().await?;
subscriber.init().await?;
subscriber.subscribe(chat_id.clone()).await?;

let (sender_publish, mut receiver_publish) = mpsc::unbounded_channel::<Vec<u8>>();
let (sender_subscriber, receiver_subscriber) = mpsc::unbounded_channel::<Vec<u8>>();

let chat_id_c = chat_id.clone();
let redis_client_c = redis_client.clone();
tokio::spawn(async move {
while let Some(msg) = receiver_publish.recv().await {
redis_client_c
.publish(chat_id_c.to_owned(), msg.as_slice())
.await?;
}
Ok::<_, DeliveryServiceError>(())
});
let subscriber_c = subscriber.clone();
tokio::spawn(async move {
let mut ss = subscriber_c.message_rx();
while let Ok(msg) = ss.recv().await {
let bytes: Vec<u8> = msg.value.convert()?;
if let Err(e) = sender_subscriber.send(bytes) {
return Err(DeliveryServiceError::TokioSendError(e));
}
}
Ok(())
});

let token = CancellationToken::new();

let cloned_token = token.clone();
let chat_id_c = chat_id.clone();
tokio::spawn(async move {
token.cancelled().await;
subscriber.unsubscribe(chat_id_c).await?;
subscriber.quit().await?;
redis_client.quit().await?;
Ok::<_, DeliveryServiceError>(())
});

Ok(RClientPrivate {
chat_id: chat_id.clone(),
sender_id,
token: cloned_token,
sender: sender_publish,
reciever: receiver_subscriber,
})
}

pub fn msg_send(&mut self, msg: Vec<u8>) -> Result<(), DeliveryServiceError> {
let json_value = SenderStruct {
sender: self.sender_id.clone(),
msg,
};
let bytes = serde_json::to_vec(&json_value)?;
self.sender.send(bytes)?;
Ok(())
}

pub async fn msg_recv(&mut self) -> Result<Vec<u8>, DeliveryServiceError> {
while let Some(msg) = self.reciever.recv().await {
let m: SenderStruct = serde_json::from_slice(&msg)?;
if m.sender == self.sender_id {
continue;
}
return Ok(m.msg);
}
Err(DeliveryServiceError::EmptyMsgError)
}
}

#[derive(Debug, thiserror::Error)]
pub enum DeliveryServiceError {
#[error("Empty msg")]
EmptyMsgError,
#[error("Json error: {0}")]
JsonError(#[from] serde_json::Error),
#[error("Redis error: {0}")]
RedisError(#[from] RedisError),
#[error("Channel sender error: {0}")]
TokioSendError(#[from] SendError<Vec<u8>>),
#[error("Serialization problem: {0}")]
TlsError(#[from] tls_codec::Error),
#[error("Unknown error: {0}")]
Other(anyhow::Error),
}

#[tokio::test]
async fn private_test() {
let mut alice = RClientPrivate::new_for_users("alicebob".to_string(), "alice".to_string())
.await
.unwrap();
let mut bob = RClientPrivate::new_for_users("alicebob".to_string(), "bob".to_string())
.await
.unwrap();

alice.msg_send("Hi bob".as_bytes().to_vec()).unwrap();
let msg = bob.msg_recv().await.unwrap();
assert_eq!("Hi bob".to_string(), String::from_utf8(msg).unwrap());

bob.msg_send("Hi alice".as_bytes().to_vec()).unwrap();
let msg = alice.msg_recv().await.unwrap();
assert_eq!("Hi alice".to_string(), String::from_utf8(msg).unwrap());
}
4 changes: 1 addition & 3 deletions sc_key_store/src/sc_ks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ impl<T: Transport + Clone, P: Provider<T, N>, N: Network> SCKeyStoreService

async fn add_user(&mut self, address: &str) -> Result<(), KeyStoreError> {
if self.does_user_exist(address).await? {
println!("User already exist");
// return Err(KeyStoreError::AlreadyExistedUserError);
return Ok(());
return Err(KeyStoreError::AlreadyExistedUserError);
}

let add_to_acl_binding = self.instance.addUser(Address::from_str(address)?);
Expand Down
19 changes: 18 additions & 1 deletion src/contact.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use alloy::{hex::ToHexExt, primitives::SignatureError, signers::Signature};
use alloy::{hex::ToHexExt, primitives::SignatureError};
use ds::{
chat_client::{
ChatClient, ChatMessages, ReqMessageType, RequestMLSPayload, ResponseMLSPayload,
Expand All @@ -12,11 +12,13 @@ use openmls::prelude::MlsMessageOut;
use std::{collections::HashMap, sync::Arc};
use tls_codec::Serialize;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;

pub const CHAT_SERVER_ADDR: &str = "ws://127.0.0.1:8080";

pub struct ContactsList {
contacts: Arc<Mutex<HashMap<String, Contact>>>,
pub future_req: HashMap<String, CancellationToken>,
pub chat_client: ChatClient,
}

Expand Down Expand Up @@ -51,6 +53,7 @@ impl ContactsList {
pub async fn new(chat_client: ChatClient) -> Result<Self, ContactError> {
Ok(ContactsList {
contacts: Arc::new(Mutex::new(HashMap::new())),
future_req: HashMap::new(),
chat_client,
})
}
Expand Down Expand Up @@ -82,6 +85,9 @@ impl ContactsList {
sc_address: String,
msg_type: ReqMessageType,
) -> Result<(), ContactError> {
self.future_req
.insert(user_address.to_string(), CancellationToken::new());

let req = ChatMessages::Request(RequestMLSPayload::new(sc_address, msg_type));
self.chat_client
.send_request(ServerMessage::InMessage {
Expand All @@ -90,6 +96,7 @@ impl ContactsList {
msg: serde_json::to_string(&req)?,
})
.await?;

Ok(())
}

Expand Down Expand Up @@ -195,6 +202,16 @@ impl ContactsList {

Ok(joiners_kp)
}

pub fn handle_response(&mut self, user_address: &str) -> Result<(), ContactError> {
match self.future_req.get(user_address) {
Some(token) => {
token.cancel();
Ok(())
}
None => Err(ContactError::UnknownUserError),
}
}
}

#[derive(Debug, thiserror::Error)]
Expand Down
1 change: 0 additions & 1 deletion src/identity.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use alloy::primitives::Address;
use hex;
use std::collections::HashMap;

use openmls::{credentials::CredentialWithKey, key_packages::*, prelude::*};
Expand Down
Loading

0 comments on commit ebe7085

Please sign in to comment.