diff --git a/extensions/warp-ipfs/src/shuttle/server.rs b/extensions/warp-ipfs/src/shuttle/server.rs index 579be4f3c..b7265f1ce 100644 --- a/extensions/warp-ipfs/src/shuttle/server.rs +++ b/extensions/warp-ipfs/src/shuttle/server.rs @@ -1,8 +1,10 @@ +use chrono::Utc; use futures::stream::BoxStream; use futures::{future::BoxFuture, FutureExt, StreamExt}; use pollable_map::futures::ordered::OrderedFutureSet; use pollable_map::stream::StreamMap; use rust_ipfs::libp2p::request_response::InboundRequestId; +use rust_ipfs::SubscriptionStream; use rust_ipfs::{ libp2p::swarm::behaviour::toggle::Toggle, p2p::{IdentifyConfiguration, RelayConfig, TransportConfig}, @@ -12,6 +14,7 @@ use std::{path::Path, time::Duration}; use warp::error::{Error as WarpError, Error}; use crate::rt::{AbortableJoinHandle, Executor, LocalExecutor}; +use crate::store::topics::IDENTITY_ANNOUNCEMENT; // use crate::shuttle::identity::protocol::RegisterError; use crate::store::{ document::identity::IdentityDocument, @@ -72,9 +75,9 @@ struct ShuttleTask { message_storage: super::store::messages::MessageStorage, subscriptions: super::subscription_stream::Subscriptions, requests: StreamMap>>, - identity_request_response: IdReqSt, message_request_response: MsgReqSt, + identity_announcement: SubscriptionStream, } impl ShuttleServer { @@ -203,10 +206,10 @@ impl ShuttleServer { }) .boxed(); - let mut subscriptions = Subscriptions::new(&ipfs, &identity, &message); - _ = subscriptions - .subscribe("/identity/announce/v0".into()) - .await; + let identity_announcement = ipfs.pubsub_subscribe(IDENTITY_ANNOUNCEMENT).await?; + + let subscriptions = Subscriptions::new(&ipfs, &identity, &message); + let mut server_event = ShuttleTask { ipfs: ipfs.clone(), subscriptions, @@ -216,6 +219,7 @@ impl ShuttleServer { requests: Default::default(), identity_request_response, message_request_response, + identity_announcement, }; let _handle = executor.spawn_abortable(async move { @@ -242,6 +246,38 @@ impl ShuttleTask { Some((peer_id, id, request)) = self.message_request_response.next() => { self.process_message_events(peer_id, id, request); } + Some(message) = self.identity_announcement.next() => { + // TODO: Score against the peer if it becomes a pattern of invalid data to eventually terminate their + // connection + if message.source.is_none() { + continue; + } + + let sender_peer_id = message.source.expect("valid peer id"); + + let Ok(payload) = PayloadMessage::::from_bytes(&message.data) else { + continue; + }; + + // We check the sender of the pubsub message to ensure that the peer is the original sender (either directly or indirectly) and not + // due to registration from another shuttle node + if sender_peer_id.ne(payload.sender()) || sender_peer_id.ne(payload.original_sender()) { + continue; + } + + let document = payload.message(); + + if document.verify().is_err() { + continue; + } + + // TODO: confirm that identity has been registered + + let timestamp = Utc::now(); + + tracing::info!(peer_id = %payload.sender(), did = %document.did, last_seen = %timestamp); + + } _ = self.requests.next() => { // }