Skip to content

Commit

Permalink
Merge branch 'main' into refactor/conversation-enum-merge
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 authored Dec 20, 2024
2 parents 21bb413 + 0846cbd commit 18933a2
Show file tree
Hide file tree
Showing 6 changed files with 448 additions and 167 deletions.
78 changes: 53 additions & 25 deletions extensions/warp-ipfs/src/shuttle/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ use rust_ipfs::{
use std::{path::Path, time::Duration};
use warp::error::{Error as WarpError, Error};

use crate::rt::{AbortableJoinHandle, Executor, LocalExecutor};
use crate::store::topics::IDENTITY_ANNOUNCEMENT;
// use crate::shuttle::identity::protocol::RegisterError;
use super::{
identity::{
Expand All @@ -28,6 +26,8 @@ use super::{
},
subscription_stream::Subscriptions,
};
use crate::rt::{AbortableJoinHandle, Executor, LocalExecutor};
use crate::store::topics::IDENTITY_ANNOUNCEMENT;
use crate::store::{
document::identity::IdentityDocument,
payload::{PayloadBuilder, PayloadMessage},
Expand Down Expand Up @@ -285,7 +285,9 @@ impl ShuttleTask {
continue;
}

let document = payload.message();
let Ok(document) = payload.message(None) else {
continue;
};

if document.verify().is_err() {
continue;
Expand Down Expand Up @@ -338,7 +340,27 @@ impl ShuttleTask {

tracing::info!(%sender_peer_id, "Processing Incoming Request");
let sender = payload.sender();
match payload.message() {

let message = match payload.message(None) {
Ok(message) => message,
Err(_e) => {
tracing::warn!(%sender, error = %_e, "could not parse payload");
let payload = payload_message_construct(
keypair,
None,
MessageResponse::Error("public key is invalid".into()),
)
.expect("Valid payload construction");

let bytes = payload.to_bytes().expect("valid deserialization");
_ = ipfs
.send_response(sender_peer_id, id, (protocols::SHUTTLE_IDENTITY, bytes))
.await;
return;
}
};

match message {
identity::protocol::Request::Register(Register::IsRegistered) => {
let peer_id = payload.sender();
let Ok(did) = peer_id.to_did() else {
Expand Down Expand Up @@ -391,8 +413,6 @@ impl ShuttleTask {
.await;
}
identity::protocol::Request::Register(Register::RegisterIdentity { root_cid }) => {
let root_cid = *root_cid;

tracing::debug!(%sender, %root_cid, "preloading root document");
if let Err(e) = ipfs.fetch(&root_cid).recursive().await {
tracing::warn!(%sender, %root_cid, error = %e, "unable to preload root document");
Expand Down Expand Up @@ -604,7 +624,7 @@ impl ShuttleTask {
.await;
}
identity::protocol::Mailbox::Send { did: to, request } => {
if !identity_storage.contains(to).await {
if !identity_storage.contains(&to).await {
tracing::warn!(%did, "Identity is not registered");
let payload = payload_message_construct(
keypair,
Expand Down Expand Up @@ -654,7 +674,7 @@ impl ShuttleTask {
return;
}

if let Err(e) = identity_storage.deliver_request(to, request).await {
if let Err(e) = identity_storage.deliver_request(&to, &request).await {
match e {
WarpError::InvalidSignature => {
tracing::warn!(%did, to = %to, "request could not be vertified");
Expand Down Expand Up @@ -751,7 +771,7 @@ impl ShuttleTask {

let keypair = ipfs.keypair();
tracing::debug!(%did, %package, "preloading root document");
if let Err(e) = ipfs.fetch(package).recursive().await {
if let Err(e) = ipfs.fetch(&package).recursive().await {
tracing::warn!(%did, %package, error = %e, "unable to preload root document");
return;
}
Expand All @@ -768,7 +788,7 @@ impl ShuttleTask {
}
};

let path = IpfsPath::from(*package)
let path = IpfsPath::from(package)
.sub_path("identity")
.expect("valid path");

Expand All @@ -786,7 +806,7 @@ impl ShuttleTask {
};

tracing::debug!(%did, %package, "root document preloaded");
if let Err(e) = identity_storage.update_user_document(&did, *package).await {
if let Err(e) = identity_storage.update_user_document(&did, package).await {
tracing::warn!(%did, %package, error = %e, "unable to store document");
return;
}
Expand Down Expand Up @@ -957,7 +977,26 @@ impl ShuttleTask {
};

tracing::info!(%peer_id, %did, "Processing Incoming Message Request");
match payload.message() {
let message = match payload.message(None) {
Ok(message) => message,
Err(_e) => {
tracing::warn!(%peer_id, error = %_e, "could not parse payload");
let payload = message::protocol::payload_message_construct(
keypair,
None,
MessageResponse::Error("public key is invalid".into()),
)
.expect("Valid payload construction");

let bytes = payload.to_bytes().expect("valid deserialization");
_ = ipfs
.send_response(sender_peer_id, id, (protocols::SHUTTLE_MESSAGE, bytes))
.await;
return;
}
};

match message {
message::protocol::Request::RegisterConversation(RegisterConversation {
..
}) => todo!(),
Expand All @@ -968,11 +1007,6 @@ impl ShuttleTask {
recipients,
message_cid,
} => {
let conversation_id = *conversation_id;
let message_id = *message_id;
let recipients = recipients.to_owned();
let message_cid = *message_cid;

tracing::info!(%conversation_id, %message_id, %did, "inserting message into mailbox");
if let Err(e) = message_storage
.insert_or_update(
Expand All @@ -993,9 +1027,6 @@ impl ShuttleTask {
conversation_id,
message_id,
} => {
let conversation_id = *conversation_id;
let message_id = *message_id;

tracing::info!(%conversation_id, %message_id, %did, "marking message as delivered");
if let Err(e) = message_storage
.message_delivered(&did, conversation_id, message_id)
Expand All @@ -1010,9 +1041,6 @@ impl ShuttleTask {
conversation_id,
message_id,
} => {
let conversation_id = *conversation_id;
let message_id = *message_id;

tracing::info!(%conversation_id, %message_id, %did, "removing message from mailbox");
if let Err(e) = message_storage
.remove_message(&did, conversation_id, message_id)
Expand All @@ -1026,11 +1054,11 @@ impl ShuttleTask {
},
message::protocol::Request::FetchMailBox { conversation_id } => {
let message = match message_storage
.get_unsent_messages(did, *conversation_id)
.get_unsent_messages(did, conversation_id)
.await
{
Ok(content) => message::protocol::Response::Mailbox {
conversation_id: *conversation_id,
conversation_id,
content,
},
Err(e) => message::protocol::Response::Error(e.to_string()),
Expand Down
Loading

0 comments on commit 18933a2

Please sign in to comment.