From 27ea2d3e82b4a62c11c1ae8fb214876beb8b81f4 Mon Sep 17 00:00:00 2001 From: Darius Date: Mon, 16 Dec 2024 23:12:03 -0600 Subject: [PATCH 01/13] feat: Add shuttle hotspot --- Cargo.toml | 1 + extensions/warp-ipfs/hotspot/Cargo.toml | 25 ++ extensions/warp-ipfs/hotspot/src/main.rs | 254 +++++++++++++ extensions/warp-ipfs/src/hotspot/mod.rs | 451 +++++++++++++++++++++++ extensions/warp-ipfs/src/lib.rs | 1 + 5 files changed, 732 insertions(+) create mode 100644 extensions/warp-ipfs/hotspot/Cargo.toml create mode 100644 extensions/warp-ipfs/hotspot/src/main.rs create mode 100644 extensions/warp-ipfs/src/hotspot/mod.rs diff --git a/Cargo.toml b/Cargo.toml index 7f5937428..00c1c6986 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ members = [ "extensions/*", "warp", "tools/*", + "extensions/warp-ipfs/hotspot", "extensions/warp-ipfs/shuttle", "extensions/warp-ipfs/examples/wasm-ipfs-identity", "extensions/warp-ipfs/examples/wasm-ipfs-friends", diff --git a/extensions/warp-ipfs/hotspot/Cargo.toml b/extensions/warp-ipfs/hotspot/Cargo.toml new file mode 100644 index 000000000..e5ba83117 --- /dev/null +++ b/extensions/warp-ipfs/hotspot/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "hotspot" +version = "0.1.0" +edition = "2021" + +[dependencies] +warp-ipfs = { path = "../" } +rust-ipfs = { workspace = true, features = ["webrtc_transport", "experimental_stream"] } +anyhow.workspace = true +serde.workspace = true +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +tracing-appender = "0.2" +pollable-map = "0.1.0-alpha.1" + +clap = { version = "4.4", features = ["derive"] } +zeroize.workspace = true +dotenv = "0.15" +base64 = "0.21" + +bs58.workspace = true + +tokio = { workspace = true } +toml = "0.5.11" +serde_json = "1.0.133" \ No newline at end of file diff --git a/extensions/warp-ipfs/hotspot/src/main.rs b/extensions/warp-ipfs/hotspot/src/main.rs new file mode 100644 index 000000000..36d725f96 --- /dev/null +++ b/extensions/warp-ipfs/hotspot/src/main.rs @@ -0,0 +1,254 @@ +use base64::alphabet::STANDARD; +use base64::engine::general_purpose::PAD; +use base64::engine::GeneralPurpose; +use base64::Engine; +use clap::Parser; +use serde::{Deserialize, Serialize}; +use std::convert::TryInto; +use std::{num::NonZeroU32, path::PathBuf, time::Duration}; +use rust_ipfs::{Keypair, Multiaddr}; +use rust_ipfs::p2p::{RateLimit, RelayConfig}; +use warp_ipfs::hotspot; +use zeroize::Zeroizing; + +fn decode_kp(kp: &str) -> anyhow::Result { + let engine = GeneralPurpose::new(&STANDARD, PAD); + let keypair_bytes = Zeroizing::new(engine.decode(kp.as_bytes())?); + let keypair = Keypair::from_protobuf_encoding(&keypair_bytes)?; + Ok(keypair) +} + +fn encode_kp(kp: &Keypair) -> anyhow::Result { + let bytes = kp.to_protobuf_encoding()?; + let engine = GeneralPurpose::new(&STANDARD, PAD); + let kp_encoded = engine.encode(bytes); + Ok(kp_encoded) +} + +#[derive(Clone, Deserialize, Serialize)] +struct Config { + pub max_circuits: Option, + pub max_circuits_per_peer: Option, + pub max_circuit_duration: Option, + pub max_circuit_bytes: Option, + pub circuit_rate_limiters: Option>, + pub max_reservations_per_peer: Option, + pub max_reservations: Option, + pub reservation_duration: Option, + pub reservation_rate_limiters: Option>, +} + +#[derive(Clone, Copy, Deserialize, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum Rate { + PerPeer { + limit: NonZeroU32, + interval: Duration, + }, + PerIp { + limit: NonZeroU32, + interval: Duration, + }, +} + +impl From for RateLimit { + fn from(rate: Rate) -> Self { + match rate { + Rate::PerPeer { limit, interval } => RateLimit::PerPeer { limit, interval }, + Rate::PerIp { limit, interval } => RateLimit::PerIp { limit, interval }, + } + } +} + +impl Default for Config { + fn default() -> Self { + Self { + max_circuits: Some(32768), + max_circuits_per_peer: Some(32768), + max_circuit_duration: Some(Duration::from_secs(60 * 2)), + max_circuit_bytes: Some(512 * 1024 * 1024), + circuit_rate_limiters: Some(vec![ + Rate::PerPeer { + limit: 32768.try_into().expect("greater than zero"), + interval: Duration::from_secs(60 * 2), + }, + Rate::PerIp { + limit: 32768.try_into().expect("greater than zero"), + interval: Duration::from_secs(30), + }, + ]), + max_reservations_per_peer: Some(32768), + max_reservations: Some(32768), + reservation_duration: Some(Duration::from_secs(60 * 60)), + reservation_rate_limiters: Some(vec![ + Rate::PerPeer { + limit: 32768.try_into().expect("greater than zero"), + interval: Duration::from_secs(30), + }, + Rate::PerIp { + limit: 32768.try_into().expect("greater than zero"), + interval: Duration::from_secs(30), + }, + ]), + } + } +} + +impl From for RelayConfig { + fn from(config: Config) -> Self { + let mut circuit_src_rate_limiters = vec![]; + let circuit_rate = config.circuit_rate_limiters.unwrap_or_default(); + circuit_src_rate_limiters.extend(circuit_rate.iter().map(|s| (*s).into())); + + let mut reservation_rate_limiters = vec![]; + let reservation_rate = config.reservation_rate_limiters.unwrap_or_default(); + reservation_rate_limiters.extend(reservation_rate.iter().map(|s| (*s).into())); + + RelayConfig { + max_circuits: config.max_circuits.unwrap_or(32768), + max_circuits_per_peer: config.max_circuits_per_peer.unwrap_or(32768), + max_circuit_duration: config + .max_circuit_duration + .unwrap_or(Duration::from_secs(2 * 60)), + max_circuit_bytes: config.max_circuit_bytes.unwrap_or(512 * 1024 * 1024), + circuit_src_rate_limiters, + max_reservations_per_peer: config.max_reservations_per_peer.unwrap_or(21768), + max_reservations: config.max_reservations.unwrap_or(32768), + reservation_duration: config + .reservation_duration + .unwrap_or(Duration::from_secs(60 * 60)), + reservation_rate_limiters, + } + } +} + +#[derive(Debug, Parser)] +#[clap(name = "shuttle-hotspot")] +struct Opt { + /// Listening addresses in multiaddr format. If empty, will listen on all addresses available + #[clap(long)] + listen_addr: Vec, + + /// External address in multiaddr format that would indicate how the node can be reached. + /// If empty, all listening addresses will be used as an external address + #[clap(long)] + external_addr: Vec, + + /// Path to key file + #[clap(long)] + keyfile: Option, + + /// Path to a configuration file to adjust relay setting + #[clap(long)] + relay_config: Option, + + /// Enables secured websockets + #[clap(long)] + enable_wss: bool, + + /// Enables webrtc + #[clap(long)] + enable_webrtc: bool, + + /// Use unbounded configuration with higher limits + #[clap(long)] + unbounded: bool, + + /// TLS Certificate when websocket is used + /// Note: websocket required a signed certificate. + #[clap(long)] + ws_tls_certificate: Option>, + + /// TLS Private Key when websocket is used + #[clap(long)] + ws_tls_private_key: Option, +} + +#[cfg(not(target_arch = "wasm32"))] +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + + let opts = Opt::parse(); + + let keypair = match opts.keyfile { + Some(kp) => match kp.is_file() { + true => { + tracing::info!("Reading keypair from {}", kp.display()); + let kp_str = tokio::fs::read_to_string(&kp).await?; + decode_kp(&kp_str)? + } + false => { + tracing::info!("Generating keypair"); + let k = Keypair::generate_ed25519(); + let encoded_kp = encode_kp(&k)?; + tracing::info!("Saving keypair to {}", kp.display()); + tokio::fs::write(kp, &encoded_kp).await?; + k + } + }, + None => { + tracing::info!("Generating keypair"); + Keypair::generate_ed25519() + } + }; + + let config = match opts.relay_config { + Some(path) => match path.is_file() { + true => { + let conf = tokio::fs::read_to_string(path).await?; + let config: Config = toml::from_str(&conf)?; + config + } + false => { + let config = Config::default(); + let bytes = toml::to_string(&config)?; + tokio::fs::write(path, &bytes).await?; + config + } + }, + None => Config::default(), + }; + + let (ws_cert, ws_pk) = match (opts.ws_tls_certificate, opts.ws_tls_private_key) { + (Some(cert), Some(prv)) => { + let mut certs = Vec::with_capacity(cert.len()); + for c in cert { + let Ok(cert) = tokio::fs::read_to_string(c).await else { + continue; + }; + certs.push(cert); + } + + let prv = tokio::fs::read_to_string(prv).await.ok(); + ((!certs.is_empty()).then_some(certs), prv) + } + _ => (None, None), + }; + + let wss_opt = ws_cert.and_then(|list| ws_pk.map(|k| (list, k))); + + let local_peer_id = keypair.public().to_peer_id(); + println!("Local PeerID: {local_peer_id}"); + + let _handle = hotspot::Hotspot::new( + &keypair, + opts.enable_wss, + wss_opt, + opts.enable_webrtc, + true, + false, + &opts.listen_addr, + &opts.external_addr, + Some(config.into()), + true, + ) + .await?; + + tokio::signal::ctrl_c().await?; + + Ok(()) +} + +#[cfg(target_arch = "wasm32")] +fn main() {} diff --git a/extensions/warp-ipfs/src/hotspot/mod.rs b/extensions/warp-ipfs/src/hotspot/mod.rs new file mode 100644 index 000000000..177a0f859 --- /dev/null +++ b/extensions/warp-ipfs/src/hotspot/mod.rs @@ -0,0 +1,451 @@ +use crate::rt::{AbortableJoinHandle, Executor, LocalExecutor}; +use crate::store::document::identity::IdentityDocument; +use crate::store::payload::PayloadMessage; +use crate::store::topics::{PeerTopic, IDENTITY_ANNOUNCEMENT}; +use chrono::{DateTime, Utc}; +use futures::future::BoxFuture; +use futures::{FutureExt, StreamExt}; +use futures_timer::Delay; +use pollable_map::futures::FutureMap; +use rust_ipfs::libp2p::gossipsub::Message; +use rust_ipfs::p2p::{ + IdentifyConfiguration, PubsubConfig, RelayConfig, TransportConfig, UpgradeVersion, +}; +use rust_ipfs::Keypair; +use rust_ipfs::{ + FDLimit, Ipfs, Multiaddr, SubscriptionStream, + UninitializedIpfs, +}; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Duration; +use rust_ipfs::libp2p::swarm::behaviour::toggle::Toggle; +use warp::crypto::DID; + +pub struct Hotspot { + _ipfs: Ipfs, + _handle: AbortableJoinHandle<()>, +} + +impl Hotspot { + pub async fn new( + keypair: &Keypair, + enable_wss: bool, + wss_certs_and_key: Option<(Vec, String)>, + webrtc_enable: bool, + enable_relay_server: bool, + memory_transport: bool, + listen_addrs: &[Multiaddr], + external_addrs: &[Multiaddr], + relay_config: Option, + ext: bool, + ) -> anyhow::Result { + let executor = LocalExecutor; + + let addrs = match listen_addrs { + [] => vec![ + "/ip4/0.0.0.0/tcp/0".parse().unwrap(), + "/ip4/0.0.0.0/tcp/0/ws".parse().unwrap(), + "/ip4/0.0.0.0/tcp/0/wss".parse().unwrap(), + "/ip4/0.0.0.0/udp/0/quic-v1".parse().unwrap(), + "/ip4/0.0.0.0/udp/0/webrtc-direct".parse().unwrap(), + ], + addrs => addrs.to_vec(), + }; + + let mut uninitialized = UninitializedIpfs::new() + .with_identify(IdentifyConfiguration { + agent_version: format!("shuttle/hotspot/{}", env!("CARGO_PKG_VERSION")), + ..Default::default() + }) + .with_ping(Default::default()) + .fd_limit(FDLimit::Max) + .set_keypair(&keypair) + .with_pubsub(PubsubConfig { + max_transmit_size: 4 * 1024 * 1024, + ..Default::default() + }).set_listening_addrs(addrs) + .set_transport_configuration(TransportConfig { + enable_webrtc: webrtc_enable, + enable_memory_transport: memory_transport, + enable_websocket: enable_wss, + enable_secure_websocket: enable_wss, + websocket_pem: wss_certs_and_key, + version: UpgradeVersion::Standard, + ..Default::default() + }) + .with_custom_behaviour(Toggle::from(ext.then_some(ext_behaviour::Behaviour::new( + keypair.public().to_peer_id(), + !external_addrs.is_empty(), + )))); + + if enable_relay_server { + uninitialized = uninitialized + .with_relay_server(relay_config.unwrap_or_default()) + .with_relay(true); + } + + if external_addrs.is_empty() { + uninitialized = uninitialized.listen_as_external_addr(); + } + + let _ipfs = uninitialized.start().await?; + + for external_addr in external_addrs { + _ipfs.add_external_address(external_addr.clone()).await?; + } + + let task = HotspotTask::new(&_ipfs).await; + + let _handle = executor.spawn_abortable(task); + + let hotspot = Hotspot { _ipfs, _handle }; + + Ok(hotspot) + } +} + +struct HotspotTask { + ipfs: Ipfs, + valid_identities: FutureMap, + announcement_stream_st: SubscriptionStream, +} + +impl HotspotTask { + pub async fn new(ipfs: &Ipfs) -> Self { + let announcement_stream_st = ipfs + .pubsub_subscribe(IDENTITY_ANNOUNCEMENT) + .await + .expect("valid subscription"); + let valid_identities = FutureMap::new(); + Self { + ipfs: ipfs.clone(), + valid_identities, + announcement_stream_st, + } + } +} + +impl HotspotTask { + fn handle_announcement_payload(&mut self, message: Message) -> anyhow::Result<()> { + let sender_peer_id = message.source.expect("valid peer id"); + + let payload = PayloadMessage::::from_bytes(&message.data)?; + + // We check the sender of the pubsub message to ensure that the peer is the original sender (either directly or indirectly) and not + // due to registration from another shuttle node + if sender_peer_id.ne(payload.sender()) || sender_peer_id.ne(payload.original_sender()) { + anyhow::bail!("sender is not the original sender"); + } + + let document = payload.message(); + + let document_did = &document.did; + + document.verify()?; + + let user = match self.valid_identities.get_mut(document_did) { + Some(fut) => fut, + None => { + let user = HotspotUser::new(&self.ipfs, document.clone()); + self.valid_identities.insert(document_did.clone(), user); + self.valid_identities + .get_mut(document_did) + .expect("identity exist due to previous insertion") + } + }; + + // TODO: Maybe perform in match condition to prevent nesdless update if the document entry is new? + user.update_identity_document(document.clone()); + + Ok(()) + } +} + +impl Future for HotspotTask { + type Output = (); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + while let Poll::Ready(Some(message)) = self.announcement_stream_st.poll_next_unpin(cx) { + if let Err(e) = self.handle_announcement_payload(message) { + tracing::error!(error = %e, "unable to handle announcement payload"); + } + } + + while let Poll::Ready(Some((did, _))) = self.valid_identities.poll_next_unpin(cx) { + tracing::info!(identity = %did, "identity timed out"); + } + + Poll::Pending + } +} + +enum HotspotStreamState { + Pending { + future: BoxFuture<'static, anyhow::Result>, + }, + Usable { + stream: SubscriptionStream, + }, +} + +impl HotspotStreamState { + pub fn poll_state(&mut self, cx: &mut Context<'_>) -> Poll> { + loop { + match self { + Self::Pending { future } => { + let item = + futures::ready!(future.poll_unpin(cx)).expect("valid subscription stream"); + *self = Self::Usable { stream: item }; + } + Self::Usable { stream } => { + return stream.poll_next_unpin(cx); + } + } + } + } +} + +struct HotspotUser { + identity: IdentityDocument, + identity_stream: HotspotStreamState, + friend_stream: HotspotStreamState, + conversation_stream: HotspotStreamState, + last_seen: DateTime, + last_seen_timer: Delay, +} + +impl HotspotUser { + pub fn new(ipfs: &Ipfs, identity_document: IdentityDocument) -> Self { + let identity_stream = HotspotStreamState::Pending { + future: { + let ipfs = ipfs.clone(); + let topic = identity_document.did.inbox(); + Box::pin(async move { ipfs.pubsub_subscribe(topic).await }) + }, + }; + let friend_stream = HotspotStreamState::Pending { + future: { + let ipfs = ipfs.clone(); + let topic = identity_document.did.events(); + Box::pin(async move { ipfs.pubsub_subscribe(topic).await }) + }, + }; + let conversation_stream = HotspotStreamState::Pending { + future: { + let ipfs = ipfs.clone(); + let topic = identity_document.did.messaging(); + Box::pin(async move { ipfs.pubsub_subscribe(topic).await }) + }, + }; + + Self { + identity: identity_document, + identity_stream, + friend_stream, + conversation_stream, + last_seen: Utc::now(), + last_seen_timer: Delay::new(Duration::from_secs(2 * 60)), + } + } +} + +impl HotspotUser { + pub fn update_identity_document(&mut self, identity_document: IdentityDocument) { + self.identity = identity_document; + self.last_seen = Utc::now(); + self.last_seen_timer = Delay::new(Duration::from_secs(2 * 60)); + } +} + +impl Future for HotspotUser { + type Output = (); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = &mut *self; + if this.last_seen_timer.poll_unpin(cx).is_ready() { + return Poll::Ready(()); + } + + loop { + match this.identity_stream.poll_state(cx) { + Poll::Ready(Some(_)) => { + // TODO: Maybe determine sender and compare it against the identity here and update last seen + } + Poll::Ready(None) => { + break; + } + Poll::Pending => break, + } + } + + loop { + match this.friend_stream.poll_state(cx) { + Poll::Ready(Some(_)) => {} + Poll::Ready(None) => { + break; + } + Poll::Pending => break, + } + } + + loop { + match this.conversation_stream.poll_state(cx) { + Poll::Ready(Some(_)) => {} + Poll::Ready(None) => { + break; + } + Poll::Pending => break, + } + } + + Poll::Pending + } +} + +mod ext_behaviour { + use rust_ipfs::libp2p::core::transport::PortUse; + use rust_ipfs::libp2p::{ + core::Endpoint, + swarm::{ + ConnectionDenied, ConnectionId, ExternalAddrExpired, FromSwarm, ListenerClosed, + NewListenAddr, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, + }, + Multiaddr, PeerId, + }; + use rust_ipfs::{ListenerId, NetworkBehaviour}; + use std::{ + collections::{HashMap, HashSet}, + task::{Context, Poll}, + }; + + #[derive(Debug)] + pub struct Behaviour { + peer_id: PeerId, + addrs: HashSet, + listened: HashMap>, + external: bool, + } + + impl Behaviour { + pub fn new(local_peer_id: PeerId, external: bool) -> Self { + println!("PeerID: {}", local_peer_id); + Self { + peer_id: local_peer_id, + addrs: Default::default(), + listened: Default::default(), + external, + } + } + } + + impl NetworkBehaviour for Behaviour { + type ConnectionHandler = rust_ipfs::libp2p::swarm::dummy::ConnectionHandler; + type ToSwarm = void::Void; + + fn handle_pending_inbound_connection( + &mut self, + _: ConnectionId, + _: &Multiaddr, + _: &Multiaddr, + ) -> Result<(), ConnectionDenied> { + Ok(()) + } + + fn handle_pending_outbound_connection( + &mut self, + _: ConnectionId, + _: Option, + _: &[Multiaddr], + _: Endpoint, + ) -> Result, ConnectionDenied> { + Ok(vec![]) + } + + fn handle_established_inbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: &Multiaddr, + ) -> Result, ConnectionDenied> { + Ok(rust_ipfs::libp2p::swarm::dummy::ConnectionHandler) + } + + fn handle_established_outbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: Endpoint, + _: PortUse, + ) -> Result, ConnectionDenied> { + Ok(rust_ipfs::libp2p::swarm::dummy::ConnectionHandler) + } + + fn on_connection_handler_event( + &mut self, + _: PeerId, + _: ConnectionId, + _: THandlerOutEvent, + ) { + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + match event { + FromSwarm::NewListenAddr(NewListenAddr { + listener_id, addr, .. + }) => { + let addr = addr.clone(); + + let addr = match addr.with_p2p(self.peer_id) { + Ok(a) => a, + Err(a) => a, + }; + + if !self.external && self.addrs.insert(addr.clone()) { + self.listened + .entry(listener_id) + .or_default() + .insert(addr.clone()); + + println!("Listening on {addr}"); + } + } + + FromSwarm::ExternalAddrConfirmed(ev) => { + let addr = ev.addr.clone(); + let addr = match addr.with_p2p(self.peer_id) { + Ok(a) => a, + Err(a) => a, + }; + + if self.external && self.addrs.insert(addr.clone()) { + println!("Listening on {}", addr); + } + } + FromSwarm::ExternalAddrExpired(ExternalAddrExpired { addr }) => { + let addr = addr.clone(); + let addr = addr.with_p2p(self.peer_id).unwrap(); + + if self.addrs.remove(&addr) { + println!("No longer listening on {addr}"); + } + } + FromSwarm::ListenerClosed(ListenerClosed { listener_id, .. }) => { + if let Some(addrs) = self.listened.remove(&listener_id) { + for addr in addrs { + let addr = addr.with_p2p(self.peer_id).unwrap(); + self.addrs.remove(&addr); + println!("No longer listening on {addr}"); + } + } + } + _ => {} + } + } + + fn poll(&mut self, _: &mut Context) -> Poll>> { + Poll::Pending + } + } +} diff --git a/extensions/warp-ipfs/src/lib.rs b/extensions/warp-ipfs/src/lib.rs index aa956cdee..26a4d03bb 100644 --- a/extensions/warp-ipfs/src/lib.rs +++ b/extensions/warp-ipfs/src/lib.rs @@ -78,6 +78,7 @@ use warp::{Extension, SingleHandle}; mod behaviour; pub mod config; +pub mod hotspot; pub(crate) mod rt; pub mod shuttle; pub mod store; From 38d68edee9f1f46c11da66d503bdf530ada09646 Mon Sep 17 00:00:00 2001 From: Darius Date: Mon, 16 Dec 2024 23:35:07 -0600 Subject: [PATCH 02/13] chore: fmt --- extensions/warp-ipfs/hotspot/Cargo.toml | 6 ------ extensions/warp-ipfs/hotspot/src/main.rs | 6 +++--- extensions/warp-ipfs/src/hotspot/mod.rs | 12 +++++------- 3 files changed, 8 insertions(+), 16 deletions(-) diff --git a/extensions/warp-ipfs/hotspot/Cargo.toml b/extensions/warp-ipfs/hotspot/Cargo.toml index e5ba83117..796512ea2 100644 --- a/extensions/warp-ipfs/hotspot/Cargo.toml +++ b/extensions/warp-ipfs/hotspot/Cargo.toml @@ -10,16 +10,10 @@ anyhow.workspace = true serde.workspace = true tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } -tracing-appender = "0.2" -pollable-map = "0.1.0-alpha.1" clap = { version = "4.4", features = ["derive"] } zeroize.workspace = true -dotenv = "0.15" base64 = "0.21" -bs58.workspace = true - tokio = { workspace = true } toml = "0.5.11" -serde_json = "1.0.133" \ No newline at end of file diff --git a/extensions/warp-ipfs/hotspot/src/main.rs b/extensions/warp-ipfs/hotspot/src/main.rs index 36d725f96..307b42205 100644 --- a/extensions/warp-ipfs/hotspot/src/main.rs +++ b/extensions/warp-ipfs/hotspot/src/main.rs @@ -3,11 +3,11 @@ use base64::engine::general_purpose::PAD; use base64::engine::GeneralPurpose; use base64::Engine; use clap::Parser; +use rust_ipfs::p2p::{RateLimit, RelayConfig}; +use rust_ipfs::{Keypair, Multiaddr}; use serde::{Deserialize, Serialize}; use std::convert::TryInto; use std::{num::NonZeroU32, path::PathBuf, time::Duration}; -use rust_ipfs::{Keypair, Multiaddr}; -use rust_ipfs::p2p::{RateLimit, RelayConfig}; use warp_ipfs::hotspot; use zeroize::Zeroizing; @@ -246,7 +246,7 @@ async fn main() -> anyhow::Result<()> { .await?; tokio::signal::ctrl_c().await?; - + Ok(()) } diff --git a/extensions/warp-ipfs/src/hotspot/mod.rs b/extensions/warp-ipfs/src/hotspot/mod.rs index 177a0f859..c72ba3b65 100644 --- a/extensions/warp-ipfs/src/hotspot/mod.rs +++ b/extensions/warp-ipfs/src/hotspot/mod.rs @@ -8,19 +8,16 @@ use futures::{FutureExt, StreamExt}; use futures_timer::Delay; use pollable_map::futures::FutureMap; use rust_ipfs::libp2p::gossipsub::Message; +use rust_ipfs::libp2p::swarm::behaviour::toggle::Toggle; use rust_ipfs::p2p::{ IdentifyConfiguration, PubsubConfig, RelayConfig, TransportConfig, UpgradeVersion, }; use rust_ipfs::Keypair; -use rust_ipfs::{ - FDLimit, Ipfs, Multiaddr, SubscriptionStream, - UninitializedIpfs, -}; +use rust_ipfs::{FDLimit, Ipfs, Multiaddr, SubscriptionStream, UninitializedIpfs}; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; -use rust_ipfs::libp2p::swarm::behaviour::toggle::Toggle; use warp::crypto::DID; pub struct Hotspot { @@ -53,7 +50,7 @@ impl Hotspot { ], addrs => addrs.to_vec(), }; - + let mut uninitialized = UninitializedIpfs::new() .with_identify(IdentifyConfiguration { agent_version: format!("shuttle/hotspot/{}", env!("CARGO_PKG_VERSION")), @@ -65,7 +62,8 @@ impl Hotspot { .with_pubsub(PubsubConfig { max_transmit_size: 4 * 1024 * 1024, ..Default::default() - }).set_listening_addrs(addrs) + }) + .set_listening_addrs(addrs) .set_transport_configuration(TransportConfig { enable_webrtc: webrtc_enable, enable_memory_transport: memory_transport, From 4278331af96edd495da521d8485e8cff2355178b Mon Sep 17 00:00:00 2001 From: Darius Date: Wed, 18 Dec 2024 22:32:43 -0600 Subject: [PATCH 03/13] chore: enable auto push by default and announcing to mesh by default; add provider preload list --- extensions/warp-ipfs/hotspot/Cargo.toml | 4 +-- extensions/warp-ipfs/src/config.rs | 11 +----- extensions/warp-ipfs/src/hotspot/mod.rs | 3 ++ extensions/warp-ipfs/src/lib.rs | 35 +++++++++++++++--- extensions/warp-ipfs/src/store/identity.rs | 42 ++++++++++------------ 5 files changed, 56 insertions(+), 39 deletions(-) diff --git a/extensions/warp-ipfs/hotspot/Cargo.toml b/extensions/warp-ipfs/hotspot/Cargo.toml index 796512ea2..984c5a5eb 100644 --- a/extensions/warp-ipfs/hotspot/Cargo.toml +++ b/extensions/warp-ipfs/hotspot/Cargo.toml @@ -13,7 +13,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } clap = { version = "4.4", features = ["derive"] } zeroize.workspace = true -base64 = "0.21" +base64 = "0.22" tokio = { workspace = true } -toml = "0.5.11" +toml = "0.8.19" diff --git a/extensions/warp-ipfs/src/config.rs b/extensions/warp-ipfs/src/config.rs index 827652675..a5a53fbe8 100644 --- a/extensions/warp-ipfs/src/config.rs +++ b/extensions/warp-ipfs/src/config.rs @@ -107,6 +107,7 @@ pub struct IpfsSetting { /// Used for testing with a memory transport pub memory_transport: bool, pub dht_client: bool, + pub preload: Vec, } pub type DefaultPfpFn = std::sync::Arc< @@ -118,12 +119,6 @@ pub struct StoreSetting { /// Allow only interactions with friends /// Note: This is ignored when it comes to chating between group chat recipients pub with_friends: bool, - /// Interval for broadcasting out identity (cannot be less than 3 minutes) - /// Note: - /// - If `None`, this will be disabled - /// - Will default to 3 minutes if less than - /// - This may be removed in the future - pub auto_push: Option, /// Discovery type pub discovery: Discovery, @@ -133,8 +128,6 @@ pub struct StoreSetting { pub friend_request_response_duration: Option, /// Disable providing images for identities pub disable_images: bool, - /// Announce to mesh network - pub announce_to_mesh: bool, /// Function to call to provide data for a default profile picture if one is not apart of the identity pub default_profile_picture: Option, } @@ -148,7 +141,6 @@ impl std::fmt::Debug for StoreSetting { impl Default for StoreSetting { fn default() -> Self { Self { - auto_push: None, discovery: Discovery::Namespace { namespace: None, discovery_type: Default::default(), @@ -158,7 +150,6 @@ impl Default for StoreSetting { disable_images: false, with_friends: false, default_profile_picture: None, - announce_to_mesh: false, } } } diff --git a/extensions/warp-ipfs/src/hotspot/mod.rs b/extensions/warp-ipfs/src/hotspot/mod.rs index c72ba3b65..d8494a18e 100644 --- a/extensions/warp-ipfs/src/hotspot/mod.rs +++ b/extensions/warp-ipfs/src/hotspot/mod.rs @@ -157,6 +157,8 @@ impl HotspotTask { // TODO: Maybe perform in match condition to prevent nesdless update if the document entry is new? user.update_identity_document(document.clone()); + // TODO: manually propagate initial message to mesh network + Ok(()) } } @@ -253,6 +255,7 @@ impl HotspotUser { self.identity = identity_document; self.last_seen = Utc::now(); self.last_seen_timer = Delay::new(Duration::from_secs(2 * 60)); + tracing::info!(identity = %self.identity.did, last_seen = %self.last_seen, "last seen identity."); } } diff --git a/extensions/warp-ipfs/src/lib.rs b/extensions/warp-ipfs/src/lib.rs index 26a4d03bb..193f70824 100644 --- a/extensions/warp-ipfs/src/lib.rs +++ b/extensions/warp-ipfs/src/lib.rs @@ -13,6 +13,7 @@ use ipfs::{DhtMode, Ipfs, Keypair, Protocol, UninitializedIpfs}; use parking_lot::RwLock; use rust_ipfs as ipfs; use rust_ipfs::p2p::{RequestResponseConfig, UpgradeVersion}; +use rust_ipfs::AddPeerOpt; use std::any::Any; use std::collections::HashSet; use std::ffi::OsStr; @@ -525,6 +526,7 @@ impl WarpIpfs { } } + if let config::Discovery::Shuttle { addresses } = self.inner.config.store_setting().discovery.clone() { @@ -633,6 +635,31 @@ impl WarpIpfs { } } + let preload = self.inner.config.ipfs_setting().preload.clone(); + + self.executor.dispatch({ + let ipfs = ipfs.clone(); + async move { + for addr in preload { + let Some(peer_id) = addr.peer_id() else { + tracing::warn!("{addr} does not contain a peer id. Skipping"); + continue; + }; + + let opt = AddPeerOpt::with_peer_id(peer_id).add_address(addr.clone()).set_dial(); + + if let Err(e) = ipfs.add_peer(addr.clone()).await { + tracing::error!(error = %e, "unable to add {addr} to address book"); + continue; + } + + if !ipfs.is_connected(peer_id).await.unwrap_or_default() { + let _ = ipfs.connect(peer_id).await; + } + } + } + }); + let discovery = Discovery::new(&ipfs, &self.inner.config.store_setting().discovery, &relays); @@ -647,7 +674,7 @@ impl WarpIpfs { &discovery, &span, ) - .await?; + .await?; tracing::info!("Identity initialized"); @@ -660,7 +687,7 @@ impl WarpIpfs { self.constellation_tx.clone(), &span, ) - .await; + .await; let message_store = MessageStore::new( &ipfs, @@ -669,7 +696,7 @@ impl WarpIpfs { self.raygun_tx.clone(), &identity_store, ) - .await; + .await; tracing::info!("Messaging store initialized"); @@ -1125,7 +1152,7 @@ impl LocalIdentity for WarpIpfs { format.into(), Some(MAX_IMAGE_SIZE), ) - .await?; + .await?; tracing::debug!("Image cid: {cid}"); diff --git a/extensions/warp-ipfs/src/store/identity.rs b/extensions/warp-ipfs/src/store/identity.rs index 3b9e779e0..1e5613880 100644 --- a/extensions/warp-ipfs/src/store/identity.rs +++ b/extensions/warp-ipfs/src/store/identity.rs @@ -439,13 +439,7 @@ impl IdentityStore { futures::pin_mut!(event_stream); futures::pin_mut!(friend_stream); - let auto_push = store.config.store_setting().auto_push.is_some(); - - let interval = store - .config - .store_setting() - .auto_push - .unwrap_or(Duration::from_millis(300000)); + let interval = Duration::from_secs(60); let mut tick = Delay::new(interval); @@ -472,7 +466,13 @@ impl IdentityStore { }; let identity = payload.message().clone(); - + + if identity.verify().is_err() { + tracing::warn!(from = %from_did, "invalid identity document"); + //TODO: Blacklist? + continue; + } + //Maybe establish a connection? //Note: Although it would be prefer not to establish a connection, it may be ideal to check to determine // the actual source of the payload to determine if its a message propagated over the mesh from the peer @@ -595,9 +595,7 @@ impl IdentityStore { } } _ = &mut tick => { - if auto_push { - store.push_to_all().await; - } + store.push_to_all().await; tick.reset(interval) } } @@ -878,18 +876,16 @@ impl IdentityStore { } pub async fn announce_identity_to_mesh(&self) -> Result<(), Error> { - if self.config.store_setting().announce_to_mesh { - let kp = self.ipfs.keypair(); - let document = self.own_identity_document().await?; - tracing::debug!("announcing identity to mesh"); - let payload = PayloadBuilder::new(kp, document) - .from_ipfs(&self.ipfs) - .await?; - let bytes = payload.to_bytes()?; - match self.ipfs.pubsub_publish(IDENTITY_ANNOUNCEMENT, bytes).await { - Ok(_) => tracing::debug!("identity announced to mesh"), - Err(_) => tracing::warn!("unable to announce identity to mesh"), - } + let kp = self.ipfs.keypair(); + let document = self.own_identity_document().await?; + tracing::debug!("announcing identity to mesh"); + let payload = PayloadBuilder::new(kp, document) + .from_ipfs(&self.ipfs) + .await?; + let bytes = payload.to_bytes()?; + match self.ipfs.pubsub_publish(IDENTITY_ANNOUNCEMENT, bytes).await { + Ok(_) => tracing::debug!("identity announced to mesh"), + Err(_) => tracing::warn!("unable to announce identity to mesh"), } Ok(()) From 5ee3b2a01697a522893b378825437e3876a7398b Mon Sep 17 00:00:00 2001 From: Darius Date: Wed, 18 Dec 2024 22:35:43 -0600 Subject: [PATCH 04/13] chore: fix and fmt --- extensions/warp-ipfs/src/lib.rs | 16 +- extensions/warp-ipfs/src/store/identity.rs | 166 ++++++++++----------- 2 files changed, 92 insertions(+), 90 deletions(-) diff --git a/extensions/warp-ipfs/src/lib.rs b/extensions/warp-ipfs/src/lib.rs index 193f70824..d7c500e50 100644 --- a/extensions/warp-ipfs/src/lib.rs +++ b/extensions/warp-ipfs/src/lib.rs @@ -526,7 +526,6 @@ impl WarpIpfs { } } - if let config::Discovery::Shuttle { addresses } = self.inner.config.store_setting().discovery.clone() { @@ -646,9 +645,12 @@ impl WarpIpfs { continue; }; - let opt = AddPeerOpt::with_peer_id(peer_id).add_address(addr.clone()).set_dial(); + let opt = AddPeerOpt::with_peer_id(peer_id) + .add_address(addr.clone()) + .set_dial(true) + .set_keepalive(true); - if let Err(e) = ipfs.add_peer(addr.clone()).await { + if let Err(e) = ipfs.add_peer(opt).await { tracing::error!(error = %e, "unable to add {addr} to address book"); continue; } @@ -674,7 +676,7 @@ impl WarpIpfs { &discovery, &span, ) - .await?; + .await?; tracing::info!("Identity initialized"); @@ -687,7 +689,7 @@ impl WarpIpfs { self.constellation_tx.clone(), &span, ) - .await; + .await; let message_store = MessageStore::new( &ipfs, @@ -696,7 +698,7 @@ impl WarpIpfs { self.raygun_tx.clone(), &identity_store, ) - .await; + .await; tracing::info!("Messaging store initialized"); @@ -1152,7 +1154,7 @@ impl LocalIdentity for WarpIpfs { format.into(), Some(MAX_IMAGE_SIZE), ) - .await?; + .await?; tracing::debug!("Image cid: {cid}"); diff --git a/extensions/warp-ipfs/src/store/identity.rs b/extensions/warp-ipfs/src/store/identity.rs index 1e5613880..140efcad5 100644 --- a/extensions/warp-ipfs/src/store/identity.rs +++ b/extensions/warp-ipfs/src/store/identity.rs @@ -466,13 +466,13 @@ impl IdentityStore { }; let identity = payload.message().clone(); - + if identity.verify().is_err() { tracing::warn!(from = %from_did, "invalid identity document"); //TODO: Blacklist? continue; } - + //Maybe establish a connection? //Note: Although it would be prefer not to establish a connection, it may be ideal to check to determine // the actual source of the payload to determine if its a message propagated over the mesh from the peer @@ -1305,14 +1305,14 @@ impl IdentityStore { let store = self.clone(); let did = in_did.clone(); async move { - let peer_id = vec![did.to_peer_id()?]; - let _ = super::document::image_dag::get_image( - &ipfs, - identity_profile_picture, - &peer_id, - false, - Some(MAX_IMAGE_SIZE), - ) + let peer_id = vec![did.to_peer_id()?]; + let _ = super::document::image_dag::get_image( + &ipfs, + identity_profile_picture, + &peer_id, + false, + Some(MAX_IMAGE_SIZE), + ) .await .map_err(|e| { tracing::error!( @@ -1321,17 +1321,17 @@ impl IdentityStore { e })?; - tracing::trace!("Image pointed to {identity_profile_picture} for {did} downloaded"); + tracing::trace!("Image pointed to {identity_profile_picture} for {did} downloaded"); - store - .emit_event( - MultiPassEventKind::IdentityUpdate { - did, - }, - ) - .await; + store + .emit_event( + MultiPassEventKind::IdentityUpdate { + did, + }, + ) + .await; - Ok::<_, anyhow::Error>(()) + Ok::<_, anyhow::Error>(()) } }); } @@ -1370,15 +1370,15 @@ impl IdentityStore { let did = in_did.clone(); let store = self.clone(); async move { - let peer_id = vec![did.to_peer_id()?]; - - let _ = super::document::image_dag::get_image( - &ipfs, - identity_profile_banner, - &peer_id, - false, - Some(MAX_IMAGE_SIZE), - ) + let peer_id = vec![did.to_peer_id()?]; + + let _ = super::document::image_dag::get_image( + &ipfs, + identity_profile_banner, + &peer_id, + false, + Some(MAX_IMAGE_SIZE), + ) .await .map_err(|e| { tracing::error!( @@ -1387,17 +1387,17 @@ impl IdentityStore { e })?; - tracing::trace!("Image pointed to {identity_profile_banner} for {did} downloaded"); + tracing::trace!("Image pointed to {identity_profile_banner} for {did} downloaded"); - store - .emit_event( - MultiPassEventKind::IdentityUpdate { - did, - }, - ) - .await; + store + .emit_event( + MultiPassEventKind::IdentityUpdate { + did, + }, + ) + .await; - Ok::<_, anyhow::Error>(()) + Ok::<_, anyhow::Error>(()) } }); } @@ -1430,34 +1430,34 @@ impl IdentityStore { let did = in_did.clone(); let store = self.clone(); async move { - let peer_id = vec![did.to_peer_id()?]; - let _ = - super::document::image_dag::get_image( - &ipfs, - picture, - &peer_id, - false, - Some(MAX_IMAGE_SIZE), - ) - .await - .map_err(|e| { - tracing::error!( + let peer_id = vec![did.to_peer_id()?]; + let _ = + super::document::image_dag::get_image( + &ipfs, + picture, + &peer_id, + false, + Some(MAX_IMAGE_SIZE), + ) + .await + .map_err(|e| { + tracing::error!( "Error fetching image from {did}: {e}" ); - e - })?; + e + })?; - tracing::trace!("Image pointed to {picture} for {did} downloaded"); + tracing::trace!("Image pointed to {picture} for {did} downloaded"); - store - .emit_event( - MultiPassEventKind::IdentityUpdate { - did, - }, - ) - .await; + store + .emit_event( + MultiPassEventKind::IdentityUpdate { + did, + }, + ) + .await; - Ok::<_, anyhow::Error>(()) + Ok::<_, anyhow::Error>(()) } }); } @@ -1468,34 +1468,34 @@ impl IdentityStore { let did = in_did.clone(); async move { - let peer_id = vec![did.to_peer_id()?]; - let _ = - super::document::image_dag::get_image( - &ipfs, - banner, - &peer_id, - false, - Some(MAX_IMAGE_SIZE), - ) - .await - .map_err(|e| { - tracing::error!( + let peer_id = vec![did.to_peer_id()?]; + let _ = + super::document::image_dag::get_image( + &ipfs, + banner, + &peer_id, + false, + Some(MAX_IMAGE_SIZE), + ) + .await + .map_err(|e| { + tracing::error!( "Error fetching image from {did}: {e}" ); - e - })?; + e + })?; - tracing::trace!("Image pointed to {banner} for {did} downloaded"); + tracing::trace!("Image pointed to {banner} for {did} downloaded"); - store - .emit_event( - MultiPassEventKind::IdentityUpdate { - did, - }, - ) - .await; + store + .emit_event( + MultiPassEventKind::IdentityUpdate { + did, + }, + ) + .await; - Ok::<_, anyhow::Error>(()) + Ok::<_, anyhow::Error>(()) } }); } From b9b01deca259d444617eb9c9a0402143d229507f Mon Sep 17 00:00:00 2001 From: Darius Date: Wed, 18 Dec 2024 23:25:18 -0600 Subject: [PATCH 05/13] chore: update example --- extensions/warp-ipfs/examples/identity-interface.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/extensions/warp-ipfs/examples/identity-interface.rs b/extensions/warp-ipfs/examples/identity-interface.rs index 43a1bc3fe..da96fac68 100644 --- a/extensions/warp-ipfs/examples/identity-interface.rs +++ b/extensions/warp-ipfs/examples/identity-interface.rs @@ -53,6 +53,8 @@ struct Opt { import: Option, #[clap(long)] phrase: Option, + #[clap(long)] + bootstrap_preload: Vec, } async fn account( @@ -93,6 +95,8 @@ async fn account( *config.enable_relay_mut() = false; } + config.ipfs_setting_mut().preload = opt.bootstrap_preload.clone(); + if opt.upnp { config.ipfs_setting_mut().portmapping = true; } From cb3c721e212c318cb46b5c38987054a9306afd0d Mon Sep 17 00:00:00 2001 From: Darius Date: Wed, 18 Dec 2024 23:51:41 -0600 Subject: [PATCH 06/13] chore: update test --- extensions/warp-ipfs/tests/common.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/extensions/warp-ipfs/tests/common.rs b/extensions/warp-ipfs/tests/common.rs index 270fcaebb..5309859b3 100644 --- a/extensions/warp-ipfs/tests/common.rs +++ b/extensions/warp-ipfs/tests/common.rs @@ -64,8 +64,6 @@ pub async fn create_account( config.store_setting_mut().discovery = Discovery::None; config.ipfs_setting_mut().relay_client.relay_address = vec![]; config.ipfs_setting_mut().mdns.enable = false; - config.store_setting_mut().announce_to_mesh = true; - config.store_setting_mut().auto_push = Some(Duration::from_secs(1)); *config.bootstrap_mut() = Bootstrap::None; From 93f8a93f3061cc3635a7f577ecf5d624c2644a9b Mon Sep 17 00:00:00 2001 From: Darius Date: Thu, 19 Dec 2024 06:43:43 -0600 Subject: [PATCH 07/13] chore: readd duration for test --- extensions/warp-ipfs/src/config.rs | 19 ++++++++++++------- extensions/warp-ipfs/src/store/identity.rs | 4 +++- extensions/warp-ipfs/tests/common.rs | 1 + 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/extensions/warp-ipfs/src/config.rs b/extensions/warp-ipfs/src/config.rs index a5a53fbe8..399e4ff23 100644 --- a/extensions/warp-ipfs/src/config.rs +++ b/extensions/warp-ipfs/src/config.rs @@ -76,24 +76,24 @@ impl Default for RelayClient { #[cfg(not(target_arch = "wasm32"))] relay_address: vec![ //NYC-1 - "/ip4/146.190.184.59/tcp/4001/p2p/12D3KooWCHWLQXTR2N6ukWM99pZYc4TM82VS7eVaDE4Ryk8ked8h".parse().unwrap(), + "/ip4/146.190.184.59/tcp/4001/p2p/12D3KooWCHWLQXTR2N6ukWM99pZYc4TM82VS7eVaDE4Ryk8ked8h".parse().unwrap(), "/ip4/146.190.184.59/udp/4001/quic-v1/p2p/12D3KooWCHWLQXTR2N6ukWM99pZYc4TM82VS7eVaDE4Ryk8ked8h".parse().unwrap(), //SF-1 - "/ip4/64.225.88.100/udp/4001/quic-v1/p2p/12D3KooWMfyuTCbehQYy68zPH6vpGUwg8raKbrS7pd3qZrG7bFuB".parse().unwrap(), - "/ip4/64.225.88.100/tcp/4001/p2p/12D3KooWMfyuTCbehQYy68zPH6vpGUwg8raKbrS7pd3qZrG7bFuB".parse().unwrap(), + "/ip4/64.225.88.100/udp/4001/quic-v1/p2p/12D3KooWMfyuTCbehQYy68zPH6vpGUwg8raKbrS7pd3qZrG7bFuB".parse().unwrap(), + "/ip4/64.225.88.100/tcp/4001/p2p/12D3KooWMfyuTCbehQYy68zPH6vpGUwg8raKbrS7pd3qZrG7bFuB".parse().unwrap(), //NYC-1-EXP "/ip4/24.199.86.91/udp/46315/quic-v1/p2p/12D3KooWQcyxuNXxpiM7xyoXRZC7Vhfbh2yCtRg272CerbpFkhE6".parse().unwrap(), "/ip4/24.199.86.91/tcp/46315/p2p/12D3KooWQcyxuNXxpiM7xyoXRZC7Vhfbh2yCtRg272CerbpFkhE6".parse().unwrap() ], // Relays that are meant to be used from a web standpoint. // Note: webrtc addresses are prone to change due an upstream issue and shouldnt be relied on for primary connections - #[cfg(target_arch="wasm32")] + #[cfg(target_arch = "wasm32")] relay_address: vec![ //NYC-1 "/dns4/nyc-3-dev.relay.satellite.im/tcp/4410/wss/p2p/12D3KooWJWw4KG2KKpUxQAc8kZZDqmownRvjWGxnr5Y6XRur8WSx".parse().unwrap(), ], background: true, - quorum: Default::default() + quorum: Default::default(), } } } @@ -117,7 +117,7 @@ pub type DefaultPfpFn = std::sync::Arc< #[derive(Clone)] pub struct StoreSetting { /// Allow only interactions with friends - /// Note: This is ignored when it comes to chating between group chat recipients + /// Note: This is ignored when it comes to chatting between group chat recipients pub with_friends: bool, /// Discovery type pub discovery: Discovery, @@ -128,8 +128,12 @@ pub struct StoreSetting { pub friend_request_response_duration: Option, /// Disable providing images for identities pub disable_images: bool, - /// Function to call to provide data for a default profile picture if one is not apart of the identity + /// Function to call to provide data for a default profile picture if one is not a part of the identity pub default_profile_picture: Option, + /// Duration in seconds before the identity document is announced + /// Note: This field should be left as default unless it is used for testing + pub auto_push_duration: Duration, + } impl std::fmt::Debug for StoreSetting { @@ -150,6 +154,7 @@ impl Default for StoreSetting { disable_images: false, with_friends: false, default_profile_picture: None, + auto_push_duration: Duration::from_secs(60), } } } diff --git a/extensions/warp-ipfs/src/store/identity.rs b/extensions/warp-ipfs/src/store/identity.rs index 140efcad5..00b212f1f 100644 --- a/extensions/warp-ipfs/src/store/identity.rs +++ b/extensions/warp-ipfs/src/store/identity.rs @@ -439,7 +439,9 @@ impl IdentityStore { futures::pin_mut!(event_stream); futures::pin_mut!(friend_stream); - let interval = Duration::from_secs(60); + let interval = store.config.store_setting().auto_push_duration; + + assert!(interval != Duration::ZERO); let mut tick = Delay::new(interval); diff --git a/extensions/warp-ipfs/tests/common.rs b/extensions/warp-ipfs/tests/common.rs index 5309859b3..e4abe841e 100644 --- a/extensions/warp-ipfs/tests/common.rs +++ b/extensions/warp-ipfs/tests/common.rs @@ -62,6 +62,7 @@ pub async fn create_account( *config.listen_on_mut() = vec![Multiaddr::empty().with(Protocol::Memory(0))]; config.ipfs_setting_mut().memory_transport = true; config.store_setting_mut().discovery = Discovery::None; + config.store_setting_mut().auto_push_duration = Duration::from_secs(1); config.ipfs_setting_mut().relay_client.relay_address = vec![]; config.ipfs_setting_mut().mdns.enable = false; From 65605f41ec8a9183240277c1fb73fd4edf27e6dc Mon Sep 17 00:00:00 2001 From: Darius Date: Thu, 19 Dec 2024 07:24:13 -0600 Subject: [PATCH 08/13] chore: fmt --- extensions/warp-ipfs/src/config.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/extensions/warp-ipfs/src/config.rs b/extensions/warp-ipfs/src/config.rs index 399e4ff23..88d158281 100644 --- a/extensions/warp-ipfs/src/config.rs +++ b/extensions/warp-ipfs/src/config.rs @@ -133,7 +133,6 @@ pub struct StoreSetting { /// Duration in seconds before the identity document is announced /// Note: This field should be left as default unless it is used for testing pub auto_push_duration: Duration, - } impl std::fmt::Debug for StoreSetting { From 74f40d43cd2cd1920c052a329d236bd6bdccfce6 Mon Sep 17 00:00:00 2001 From: Darius Date: Thu, 19 Dec 2024 07:37:51 -0600 Subject: [PATCH 09/13] chore: fmt --- extensions/warp-ipfs/src/hotspot/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/extensions/warp-ipfs/src/hotspot/mod.rs b/extensions/warp-ipfs/src/hotspot/mod.rs index d8494a18e..db32b19ab 100644 --- a/extensions/warp-ipfs/src/hotspot/mod.rs +++ b/extensions/warp-ipfs/src/hotspot/mod.rs @@ -26,6 +26,7 @@ pub struct Hotspot { } impl Hotspot { + #[allow(clippy::too_many_arguments)] pub async fn new( keypair: &Keypair, enable_wss: bool, @@ -58,7 +59,7 @@ impl Hotspot { }) .with_ping(Default::default()) .fd_limit(FDLimit::Max) - .set_keypair(&keypair) + .set_keypair(keypair) .with_pubsub(PubsubConfig { max_transmit_size: 4 * 1024 * 1024, ..Default::default() From 85bc6f9e0438c9f8ee5c8f1fd838f19d3d5c9502 Mon Sep 17 00:00:00 2001 From: Darius Date: Thu, 19 Dec 2024 12:01:32 -0600 Subject: [PATCH 10/13] chore: remove push iter --- extensions/warp-ipfs/src/store/identity.rs | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/extensions/warp-ipfs/src/store/identity.rs b/extensions/warp-ipfs/src/store/identity.rs index 00b212f1f..7d012f8bb 100644 --- a/extensions/warp-ipfs/src/store/identity.rs +++ b/extensions/warp-ipfs/src/store/identity.rs @@ -855,25 +855,7 @@ impl IdentityStore { Ok(()) } - async fn push_iter>(&self, list: I) { - for did in list { - if let Err(e) = self.push(&did).await { - tracing::error!("Error pushing identity to {did}: {e}"); - } - } - } - pub async fn push_to_all(&self) { - //TODO: Possibly announce only to mesh, though this *might* require changing the logic to establish connection - // if profile pictures and banners are supplied in this push too. - let list = self - .discovery - .list() - .await - .iter() - .filter_map(|entry| entry.peer_id().to_did().ok()) - .collect::>(); - self.push_iter(list).await; let _ = self.announce_identity_to_mesh().await; } From 68b72f51669a7fad8f7fd643b4164b36c325803c Mon Sep 17 00:00:00 2001 From: Darius Date: Fri, 20 Dec 2024 17:37:35 -0600 Subject: [PATCH 11/13] fix: try the result from the payload message --- extensions/warp-ipfs/src/hotspot/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions/warp-ipfs/src/hotspot/mod.rs b/extensions/warp-ipfs/src/hotspot/mod.rs index db32b19ab..a723d0b42 100644 --- a/extensions/warp-ipfs/src/hotspot/mod.rs +++ b/extensions/warp-ipfs/src/hotspot/mod.rs @@ -138,7 +138,7 @@ impl HotspotTask { anyhow::bail!("sender is not the original sender"); } - let document = payload.message(); + let document = payload.message(None)?; let document_did = &document.did; From adfe6ad0ac1f8b83c1eac20019db0f93106a87b1 Mon Sep 17 00:00:00 2001 From: Darius Date: Mon, 23 Dec 2024 18:27:13 -0500 Subject: [PATCH 12/13] chore: move update into match condition --- extensions/warp-ipfs/src/hotspot/mod.rs | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/extensions/warp-ipfs/src/hotspot/mod.rs b/extensions/warp-ipfs/src/hotspot/mod.rs index a723d0b42..fa1f1dc29 100644 --- a/extensions/warp-ipfs/src/hotspot/mod.rs +++ b/extensions/warp-ipfs/src/hotspot/mod.rs @@ -140,24 +140,19 @@ impl HotspotTask { let document = payload.message(None)?; - let document_did = &document.did; - document.verify()?; - let user = match self.valid_identities.get_mut(document_did) { - Some(fut) => fut, + match self.valid_identities.get_mut(&document.did) { + Some(fut) => { + fut.update_identity_document(document); + } None => { - let user = HotspotUser::new(&self.ipfs, document.clone()); - self.valid_identities.insert(document_did.clone(), user); - self.valid_identities - .get_mut(document_did) - .expect("identity exist due to previous insertion") + let document_did = document.did.clone(); + let user = HotspotUser::new(&self.ipfs, document); + self.valid_identities.insert(document_did, user); } }; - // TODO: Maybe perform in match condition to prevent nesdless update if the document entry is new? - user.update_identity_document(document.clone()); - // TODO: manually propagate initial message to mesh network Ok(()) From 8a64bef399f69c6561d579ffcb480c4a690bc7b0 Mon Sep 17 00:00:00 2001 From: Darius Date: Mon, 23 Dec 2024 18:29:48 -0500 Subject: [PATCH 13/13] chore: add check to modified time --- extensions/warp-ipfs/src/hotspot/mod.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/extensions/warp-ipfs/src/hotspot/mod.rs b/extensions/warp-ipfs/src/hotspot/mod.rs index fa1f1dc29..147eb795e 100644 --- a/extensions/warp-ipfs/src/hotspot/mod.rs +++ b/extensions/warp-ipfs/src/hotspot/mod.rs @@ -248,6 +248,10 @@ impl HotspotUser { impl HotspotUser { pub fn update_identity_document(&mut self, identity_document: IdentityDocument) { + if self.identity.modified > identity_document.modified { + tracing::warn!(identity = %self.identity.did, "identity is older than previous entry. Ignoring."); + return; + } self.identity = identity_document; self.last_seen = Utc::now(); self.last_seen_timer = Delay::new(Duration::from_secs(2 * 60));