Skip to content

Commit

Permalink
chore: log identity announcements for last seen
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 committed Dec 8, 2024
1 parent 383933b commit 5888108
Showing 1 changed file with 41 additions and 5 deletions.
46 changes: 41 additions & 5 deletions extensions/warp-ipfs/src/shuttle/server.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use chrono::Utc;
use futures::stream::BoxStream;
use futures::{future::BoxFuture, FutureExt, StreamExt};
use pollable_map::futures::ordered::OrderedFutureSet;
use pollable_map::stream::StreamMap;
use rust_ipfs::libp2p::request_response::InboundRequestId;
use rust_ipfs::SubscriptionStream;
use rust_ipfs::{
libp2p::swarm::behaviour::toggle::Toggle,
p2p::{IdentifyConfiguration, RelayConfig, TransportConfig},
Expand All @@ -12,6 +14,7 @@ use std::{path::Path, time::Duration};
use warp::error::{Error as WarpError, Error};

use crate::rt::{AbortableJoinHandle, Executor, LocalExecutor};
use crate::store::topics::IDENTITY_ANNOUNCEMENT;
// use crate::shuttle::identity::protocol::RegisterError;
use crate::store::{
document::identity::IdentityDocument,
Expand Down Expand Up @@ -72,9 +75,9 @@ struct ShuttleTask {
message_storage: super::store::messages::MessageStorage,
subscriptions: super::subscription_stream::Subscriptions,
requests: StreamMap<PeerId, OrderedFutureSet<BoxFuture<'static, ()>>>,

identity_request_response: IdReqSt,
message_request_response: MsgReqSt,
identity_announcement: SubscriptionStream,
}

impl ShuttleServer {
Expand Down Expand Up @@ -203,10 +206,10 @@ impl ShuttleServer {
})
.boxed();

let mut subscriptions = Subscriptions::new(&ipfs, &identity, &message);
_ = subscriptions
.subscribe("/identity/announce/v0".into())
.await;
let identity_announcement = ipfs.pubsub_subscribe(IDENTITY_ANNOUNCEMENT).await?;

let subscriptions = Subscriptions::new(&ipfs, &identity, &message);

let mut server_event = ShuttleTask {
ipfs: ipfs.clone(),
subscriptions,
Expand All @@ -216,6 +219,7 @@ impl ShuttleServer {
requests: Default::default(),
identity_request_response,
message_request_response,
identity_announcement,
};

let _handle = executor.spawn_abortable(async move {
Expand All @@ -242,6 +246,38 @@ impl ShuttleTask {
Some((peer_id, id, request)) = self.message_request_response.next() => {
self.process_message_events(peer_id, id, request);
}
Some(message) = self.identity_announcement.next() => {
// TODO: Score against the peer if it becomes a pattern of invalid data to eventually terminate their
// connection
if message.source.is_none() {
continue;
}

let sender_peer_id = message.source.expect("valid peer id");

let Ok(payload) = PayloadMessage::<IdentityDocument>::from_bytes(&message.data) else {
continue;
};

// We check the sender of the pubsub message to ensure that the peer is the original sender (either directly or indirectly) and not
// due to registration from another shuttle node
if sender_peer_id.ne(payload.sender()) || sender_peer_id.ne(payload.original_sender()) {
continue;
}

let document = payload.message();

if document.verify().is_err() {
continue;
}

// TODO: confirm that identity has been registered

let timestamp = Utc::now();

tracing::info!(peer_id = %payload.sender(), did = %document.did, last_seen = %timestamp);

}
_ = self.requests.next() => {
//
}
Expand Down

0 comments on commit 5888108

Please sign in to comment.