diff --git a/extensions/warp-ipfs/shuttle/src/main.rs b/extensions/warp-ipfs/shuttle/src/main.rs index 6b00d75d3..e13e2c807 100644 --- a/extensions/warp-ipfs/shuttle/src/main.rs +++ b/extensions/warp-ipfs/shuttle/src/main.rs @@ -117,7 +117,7 @@ async fn main() -> Result<(), Box> { let local_peer_id = keypair.public().to_peer_id(); println!("Local PeerID: {local_peer_id}"); - let _ = shuttle::server::ShuttleServer::new( + let _handle = shuttle::server::ShuttleServer::new( &keypair, path, opts.enable_relay_server, diff --git a/extensions/warp-ipfs/src/shuttle/server.rs b/extensions/warp-ipfs/src/shuttle/server.rs index 8fa3177d7..fbed6f2be 100644 --- a/extensions/warp-ipfs/src/shuttle/server.rs +++ b/extensions/warp-ipfs/src/shuttle/server.rs @@ -1,17 +1,13 @@ -use bytes::Bytes; -use futures::channel::oneshot; use futures::stream::BoxStream; use futures::{future::BoxFuture, FutureExt, StreamExt}; use pollable_map::futures::ordered::OrderedFutureSet; use pollable_map::stream::StreamMap; +use rust_ipfs::libp2p::request_response::InboundRequestId; use rust_ipfs::{ libp2p::swarm::behaviour::toggle::Toggle, p2p::{IdentifyConfiguration, RelayConfig, TransportConfig}, FDLimit, Ipfs, IpfsPath, Keypair, Multiaddr, NetworkBehaviour, PeerId, UninitializedIpfs, }; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; use std::{path::Path, time::Duration}; use warp::error::{Error as WarpError, Error}; @@ -45,7 +41,7 @@ use super::{ subscription_stream::Subscriptions, }; -type OneshotSender = oneshot::Sender; +// type OneshotSender = oneshot::Sender; type IdentityMessage = identity::protocol::Request; type IdentityPayload = PayloadMessage; @@ -64,22 +60,8 @@ pub struct ShuttleServer { _handle: AbortableJoinHandle<()>, } -type IdReqSt = BoxStream< - 'static, - ( - PeerId, - Result, - oneshot::Sender, - ), ->; -type MsgReqSt = BoxStream< - 'static, - ( - PeerId, - Result, - oneshot::Sender, - ), ->; +type IdReqSt = BoxStream<'static, (PeerId, InboundRequestId, Result)>; +type MsgReqSt = BoxStream<'static, (PeerId, InboundRequestId, Result)>; #[allow(clippy::type_complexity)] #[allow(dead_code)] @@ -205,20 +187,20 @@ impl ShuttleServer { let identity_request_response = ipfs .requests_subscribe(protocols::SHUTTLE_IDENTITY) .await? - .map(|(peer_id, request, response)| { + .map(|(peer_id, id, request)| { let payload: Result, _> = PayloadMessage::from_bytes(&request); - (peer_id, payload, response) + (peer_id, id, payload) }) .boxed(); let message_request_response = ipfs .requests_subscribe(protocols::SHUTTLE_MESSAGE) .await? - .map(|(peer_id, request, response)| { + .map(|(peer_id, id, request)| { let payload: Result, _> = PayloadMessage::from_bytes(&request); - (peer_id, payload, response) + (peer_id, id, payload) }) .boxed(); @@ -255,11 +237,11 @@ impl ShuttleTask { loop { tokio::select! { biased; - Some((peer_id, request, response)) = self.identity_request_response.next() => { - self.process_identity_events(peer_id, request, response); + Some((peer_id, id, request)) = self.identity_request_response.next() => { + self.process_identity_events(peer_id, id, request); } - Some((peer_id, request, response)) = self.message_request_response.next() => { - self.process_message_events(peer_id, request, response); + Some((peer_id, id, request)) = self.message_request_response.next() => { + self.process_message_events(peer_id, id, request); } _ = self.requests.next() => { // @@ -270,32 +252,37 @@ impl ShuttleTask { fn process_identity_events( &mut self, - peer_id: PeerId, + sender_peer_id: PeerId, + id: InboundRequestId, payload: Result, - resp: OneshotSender, ) { let ipfs = self.ipfs.clone(); let identity_storage = self.identity_storage.clone(); let mut subscriptions = self.subscriptions.clone(); - let payload = match payload { - Ok(payload) => payload, - Err(e) => { - let payload = - payload_message_construct(ipfs.keypair(), None, Response::Error(e.to_string())) - .expect("Valid payload construction"); - - let bytes = payload.to_bytes().expect("valid deserialization"); - _ = resp.send(bytes); - return; - } - }; - let fut = async move { let keypair = ipfs.keypair(); - tracing::info!(%peer_id, "Processing Incoming Request"); + + let payload = match payload { + Ok(payload) => payload, + Err(e) => { + let payload = payload_message_construct( + ipfs.keypair(), + None, + Response::Error(e.to_string()), + ) + .expect("Valid payload construction"); + + let bytes = payload.to_bytes().expect("valid deserialization"); + _ = ipfs + .send_response(sender_peer_id, id, (protocols::SHUTTLE_IDENTITY, bytes)) + .await; + return; + } + }; + + tracing::info!(%sender_peer_id, "Processing Incoming Request"); let sender = payload.sender(); - let resp = resp; match payload.message() { identity::protocol::Request::Register(Register::IsRegistered) => { let peer_id = payload.sender(); @@ -311,7 +298,9 @@ impl ShuttleTask { .expect("Valid payload construction"); let bytes = payload.to_bytes().expect("valid deserialization"); - _ = resp.send(bytes); + _ = ipfs + .send_response(sender_peer_id, id, (protocols::SHUTTLE_IDENTITY, bytes)) + .await; return; }; @@ -327,7 +316,9 @@ impl ShuttleTask { .expect("Valid payload construction"); let bytes = payload.to_bytes().expect("valid deserialization"); - _ = resp.send(bytes); + _ = ipfs + .send_response(sender_peer_id, id, (protocols::SHUTTLE_IDENTITY, bytes)) + .await; return; } @@ -340,7 +331,9 @@ impl ShuttleTask { .expect("Valid payload construction"); let bytes = payload.to_bytes().expect("valid deserialization"); - _ = resp.send(bytes); + _ = ipfs + .send_response(sender_peer_id, id, (protocols::SHUTTLE_IDENTITY, bytes)) + .await; } identity::protocol::Request::Register(Register::RegisterIdentity { root_cid }) => { let root_cid = *root_cid; @@ -383,7 +376,9 @@ impl ShuttleTask { .expect("Valid payload construction"); let bytes = payload.to_bytes().expect("valid deserialization"); - _ = resp.send(bytes); + _ = ipfs + .send_response(sender_peer_id, id, (protocols::SHUTTLE_IDENTITY, bytes)) + .await; return; } @@ -400,7 +395,9 @@ impl ShuttleTask { .expect("Valid payload construction"); let bytes = payload.to_bytes().expect("valid deserialization"); - _ = resp.send(bytes); + _ = ipfs + .send_response(sender_peer_id, id, (protocols::SHUTTLE_IDENTITY, bytes)) + .await; return; } @@ -422,7 +419,9 @@ impl ShuttleTask { .expect("Valid payload construction"); let bytes = payload.to_bytes().expect("valid deserialization"); - _ = resp.send(bytes); + _ = ipfs + .send_response(sender_peer_id, id, (protocols::SHUTTLE_IDENTITY, bytes)) + .await; return; } @@ -453,7 +452,9 @@ impl ShuttleTask { ) .expect("Valid payload construction"); let bytes = payload.to_bytes().expect("valid deserialization"); - _ = resp.send(bytes); + _ = ipfs + .send_response(sender_peer_id, id, (protocols::SHUTTLE_IDENTITY, bytes)) + .await; } identity::protocol::Request::Mailbox(event) => { let peer_id = payload.sender(); @@ -469,7 +470,9 @@ impl ShuttleTask { .expect("Valid payload construction"); let bytes = payload.to_bytes().expect("valid deserialization"); - _ = resp.send(bytes); + _ = ipfs + .send_response(sender_peer_id, id, (protocols::SHUTTLE_IDENTITY, bytes)) + .await; return; }; @@ -486,7 +489,9 @@ impl ShuttleTask { .expect("Valid payload construction"); let bytes = payload.to_bytes().expect("valid deserialization"); - _ = resp.send(bytes); + _ = ipfs + .send_response(sender_peer_id, id, (protocols::SHUTTLE_IDENTITY, bytes)) + .await; return; } @@ -512,7 +517,13 @@ impl ShuttleTask { .expect("Valid payload construction"); let bytes = payload.to_bytes().expect("valid deserialization"); - _ = resp.send(bytes); + _ = ipfs + .send_response( + sender_peer_id, + id, + (protocols::SHUTTLE_IDENTITY, bytes), + ) + .await; } identity::protocol::Mailbox::FetchFrom { .. } => { tracing::warn!(%did, "accessed to unimplemented request"); @@ -529,7 +540,13 @@ impl ShuttleTask { .expect("Valid payload construction"); let bytes = payload.to_bytes().expect("valid deserialization"); - _ = resp.send(bytes); + _ = ipfs + .send_response( + sender_peer_id, + id, + (protocols::SHUTTLE_IDENTITY, bytes), + ) + .await; } identity::protocol::Mailbox::Send { did: to, request } => { if !identity_storage.contains(to).await { @@ -546,7 +563,13 @@ impl ShuttleTask { .expect("Valid payload construction"); let bytes = payload.to_bytes().expect("valid deserialization"); - _ = resp.send(bytes); + _ = ipfs + .send_response( + sender_peer_id, + id, + (protocols::SHUTTLE_IDENTITY, bytes), + ) + .await; return; } @@ -565,7 +588,13 @@ impl ShuttleTask { .expect("Valid payload construction"); let bytes = payload.to_bytes().expect("valid deserialization"); - _ = resp.send(bytes); + _ = ipfs + .send_response( + sender_peer_id, + id, + (protocols::SHUTTLE_IDENTITY, bytes), + ) + .await; return; } @@ -587,7 +616,13 @@ impl ShuttleTask { let bytes = payload.to_bytes().expect("valid deserialization"); - _ = resp.send(bytes); + _ = ipfs + .send_response( + sender_peer_id, + id, + (protocols::SHUTTLE_IDENTITY, bytes), + ) + .await; } e => { tracing::warn!(%did, to = %to, "could not deliver request"); @@ -606,7 +641,13 @@ impl ShuttleTask { let bytes = payload.to_bytes().expect("valid deserialization"); - _ = resp.send(bytes); + _ = ipfs + .send_response( + sender_peer_id, + id, + (protocols::SHUTTLE_IDENTITY, bytes), + ) + .await; } } return; @@ -624,7 +665,13 @@ impl ShuttleTask { .expect("Valid payload construction"); let bytes = payload.to_bytes().expect("valid deserialization"); - _ = resp.send(bytes); + _ = ipfs + .send_response( + sender_peer_id, + id, + (protocols::SHUTTLE_IDENTITY, bytes), + ) + .await; } } } @@ -633,11 +680,12 @@ impl ShuttleTask { .expect("Valid payload construction"); let bytes = payload.to_bytes().expect("valid deserialization"); - _ = resp.send(bytes); + _ = ipfs + .send_response(sender_peer_id, id, (protocols::SHUTTLE_IDENTITY, bytes)) + .await; - let peer_id = payload.sender(); - let Ok(did) = peer_id.to_did() else { - tracing::warn!(%peer_id, "Could not convert to did key"); + let Ok(did) = sender.to_did() else { + tracing::warn!(%sender, "Could not convert to did key"); return; }; @@ -721,7 +769,13 @@ impl ShuttleTask { .expect("Valid payload construction"); let bytes = payload.to_bytes().expect("valid deserialization"); - _ = resp.send(bytes); + _ = ipfs + .send_response( + sender_peer_id, + id, + (protocols::SHUTTLE_IDENTITY, bytes), + ) + .await; return; } @@ -742,7 +796,9 @@ impl ShuttleTask { .expect("Valid payload construction"); let bytes = payload.to_bytes().expect("valid deserialization"); - _ = resp.send(bytes); + _ = ipfs + .send_response(sender_peer_id, id, (protocols::SHUTTLE_IDENTITY, bytes)) + .await; return; } @@ -767,7 +823,9 @@ impl ShuttleTask { .expect("Valid payload construction"); let bytes = payload.to_bytes().expect("valid deserialization"); - _ = resp.send(bytes); + _ = ipfs + .send_response(sender_peer_id, id, (protocols::SHUTTLE_IDENTITY, bytes)) + .await; } identity::protocol::Request::Lookup(kind) => { let peer_id = payload.sender(); @@ -784,16 +842,19 @@ impl ShuttleTask { .expect("Valid payload construction"); let bytes = payload.to_bytes().expect("valid deserialization"); - _ = resp.send(bytes); + _ = ipfs + .send_response(sender_peer_id, id, (protocols::SHUTTLE_IDENTITY, bytes)) + .await; } } }; - let tasks = match self.requests.get_mut(&peer_id) { + let tasks = match self.requests.get_mut(&sender_peer_id) { Some(task) => task, None => { - self.requests.insert(peer_id, OrderedFutureSet::new()); - self.requests.get_mut(&peer_id).expect("valid entry") + self.requests + .insert(sender_peer_id, OrderedFutureSet::new()); + self.requests.get_mut(&sender_peer_id).expect("valid entry") } }; tasks.push(fut.boxed()); @@ -801,29 +862,34 @@ impl ShuttleTask { fn process_message_events( &mut self, - peer_id: PeerId, + sender_peer_id: PeerId, + id: InboundRequestId, payload: Result, - resp: OneshotSender, ) { let ipfs = self.ipfs.clone(); let message_storage = self.message_storage.clone(); - let payload = match payload { - Ok(payload) => payload, - Err(e) => { - let payload = - payload_message_construct(ipfs.keypair(), None, Response::Error(e.to_string())) - .expect("Valid payload construction"); - - let bytes = payload.to_bytes().expect("valid deserialization"); - _ = resp.send(bytes); - return; - } - }; - let fut = async move { let keypair = ipfs.keypair(); - tracing::info!(%peer_id, "Processing Incoming Request"); + tracing::info!(%sender_peer_id, "Processing Incoming Request"); + + let payload = match payload { + Ok(payload) => payload, + Err(e) => { + let payload = payload_message_construct( + ipfs.keypair(), + None, + Response::Error(e.to_string()), + ) + .expect("Valid payload construction"); + + let bytes = payload.to_bytes().expect("valid deserialization"); + _ = ipfs + .send_response(sender_peer_id, id, (protocols::SHUTTLE_MESSAGE, bytes)) + .await; + return; + } + }; let peer_id = payload.sender(); let Ok(did) = peer_id.to_did() else { @@ -836,7 +902,9 @@ impl ShuttleTask { .expect("Valid payload construction"); let bytes = payload.to_bytes().expect("valid deserialization"); - _ = resp.send(bytes); + _ = ipfs + .send_response(sender_peer_id, id, (protocols::SHUTTLE_MESSAGE, bytes)) + .await; return; }; @@ -926,16 +994,19 @@ impl ShuttleTask { .expect("Valid payload construction"); let bytes = payload.to_bytes().expect("valid deserialization"); - _ = resp.send(bytes); + _ = ipfs + .send_response(sender_peer_id, id, (protocols::SHUTTLE_MESSAGE, bytes)) + .await; } } }; - let tasks = match self.requests.get_mut(&peer_id) { + let tasks = match self.requests.get_mut(&sender_peer_id) { Some(task) => task, None => { - self.requests.insert(peer_id, OrderedFutureSet::new()); - self.requests.get_mut(&peer_id).expect("valid entry") + self.requests + .insert(sender_peer_id, OrderedFutureSet::new()); + self.requests.get_mut(&sender_peer_id).expect("valid entry") } }; tasks.push(fut.boxed());