diff --git a/Cargo.toml b/Cargo.toml index 7f5937428..00c1c6986 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ members = [ "extensions/*", "warp", "tools/*", + "extensions/warp-ipfs/hotspot", "extensions/warp-ipfs/shuttle", "extensions/warp-ipfs/examples/wasm-ipfs-identity", "extensions/warp-ipfs/examples/wasm-ipfs-friends", diff --git a/extensions/warp-ipfs/examples/identity-interface.rs b/extensions/warp-ipfs/examples/identity-interface.rs index 43a1bc3fe..da96fac68 100644 --- a/extensions/warp-ipfs/examples/identity-interface.rs +++ b/extensions/warp-ipfs/examples/identity-interface.rs @@ -53,6 +53,8 @@ struct Opt { import: Option, #[clap(long)] phrase: Option, + #[clap(long)] + bootstrap_preload: Vec, } async fn account( @@ -93,6 +95,8 @@ async fn account( *config.enable_relay_mut() = false; } + config.ipfs_setting_mut().preload = opt.bootstrap_preload.clone(); + if opt.upnp { config.ipfs_setting_mut().portmapping = true; } diff --git a/extensions/warp-ipfs/hotspot/Cargo.toml b/extensions/warp-ipfs/hotspot/Cargo.toml new file mode 100644 index 000000000..984c5a5eb --- /dev/null +++ b/extensions/warp-ipfs/hotspot/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "hotspot" +version = "0.1.0" +edition = "2021" + +[dependencies] +warp-ipfs = { path = "../" } +rust-ipfs = { workspace = true, features = ["webrtc_transport", "experimental_stream"] } +anyhow.workspace = true +serde.workspace = true +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } + +clap = { version = "4.4", features = ["derive"] } +zeroize.workspace = true +base64 = "0.22" + +tokio = { workspace = true } +toml = "0.8.19" diff --git a/extensions/warp-ipfs/hotspot/src/main.rs b/extensions/warp-ipfs/hotspot/src/main.rs new file mode 100644 index 000000000..307b42205 --- /dev/null +++ b/extensions/warp-ipfs/hotspot/src/main.rs @@ -0,0 +1,254 @@ +use base64::alphabet::STANDARD; +use base64::engine::general_purpose::PAD; +use base64::engine::GeneralPurpose; +use base64::Engine; +use clap::Parser; +use rust_ipfs::p2p::{RateLimit, RelayConfig}; +use rust_ipfs::{Keypair, Multiaddr}; +use serde::{Deserialize, Serialize}; +use std::convert::TryInto; +use std::{num::NonZeroU32, path::PathBuf, time::Duration}; +use warp_ipfs::hotspot; +use zeroize::Zeroizing; + +fn decode_kp(kp: &str) -> anyhow::Result { + let engine = GeneralPurpose::new(&STANDARD, PAD); + let keypair_bytes = Zeroizing::new(engine.decode(kp.as_bytes())?); + let keypair = Keypair::from_protobuf_encoding(&keypair_bytes)?; + Ok(keypair) +} + +fn encode_kp(kp: &Keypair) -> anyhow::Result { + let bytes = kp.to_protobuf_encoding()?; + let engine = GeneralPurpose::new(&STANDARD, PAD); + let kp_encoded = engine.encode(bytes); + Ok(kp_encoded) +} + +#[derive(Clone, Deserialize, Serialize)] +struct Config { + pub max_circuits: Option, + pub max_circuits_per_peer: Option, + pub max_circuit_duration: Option, + pub max_circuit_bytes: Option, + pub circuit_rate_limiters: Option>, + pub max_reservations_per_peer: Option, + pub max_reservations: Option, + pub reservation_duration: Option, + pub reservation_rate_limiters: Option>, +} + +#[derive(Clone, Copy, Deserialize, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum Rate { + PerPeer { + limit: NonZeroU32, + interval: Duration, + }, + PerIp { + limit: NonZeroU32, + interval: Duration, + }, +} + +impl From for RateLimit { + fn from(rate: Rate) -> Self { + match rate { + Rate::PerPeer { limit, interval } => RateLimit::PerPeer { limit, interval }, + Rate::PerIp { limit, interval } => RateLimit::PerIp { limit, interval }, + } + } +} + +impl Default for Config { + fn default() -> Self { + Self { + max_circuits: Some(32768), + max_circuits_per_peer: Some(32768), + max_circuit_duration: Some(Duration::from_secs(60 * 2)), + max_circuit_bytes: Some(512 * 1024 * 1024), + circuit_rate_limiters: Some(vec![ + Rate::PerPeer { + limit: 32768.try_into().expect("greater than zero"), + interval: Duration::from_secs(60 * 2), + }, + Rate::PerIp { + limit: 32768.try_into().expect("greater than zero"), + interval: Duration::from_secs(30), + }, + ]), + max_reservations_per_peer: Some(32768), + max_reservations: Some(32768), + reservation_duration: Some(Duration::from_secs(60 * 60)), + reservation_rate_limiters: Some(vec![ + Rate::PerPeer { + limit: 32768.try_into().expect("greater than zero"), + interval: Duration::from_secs(30), + }, + Rate::PerIp { + limit: 32768.try_into().expect("greater than zero"), + interval: Duration::from_secs(30), + }, + ]), + } + } +} + +impl From for RelayConfig { + fn from(config: Config) -> Self { + let mut circuit_src_rate_limiters = vec![]; + let circuit_rate = config.circuit_rate_limiters.unwrap_or_default(); + circuit_src_rate_limiters.extend(circuit_rate.iter().map(|s| (*s).into())); + + let mut reservation_rate_limiters = vec![]; + let reservation_rate = config.reservation_rate_limiters.unwrap_or_default(); + reservation_rate_limiters.extend(reservation_rate.iter().map(|s| (*s).into())); + + RelayConfig { + max_circuits: config.max_circuits.unwrap_or(32768), + max_circuits_per_peer: config.max_circuits_per_peer.unwrap_or(32768), + max_circuit_duration: config + .max_circuit_duration + .unwrap_or(Duration::from_secs(2 * 60)), + max_circuit_bytes: config.max_circuit_bytes.unwrap_or(512 * 1024 * 1024), + circuit_src_rate_limiters, + max_reservations_per_peer: config.max_reservations_per_peer.unwrap_or(21768), + max_reservations: config.max_reservations.unwrap_or(32768), + reservation_duration: config + .reservation_duration + .unwrap_or(Duration::from_secs(60 * 60)), + reservation_rate_limiters, + } + } +} + +#[derive(Debug, Parser)] +#[clap(name = "shuttle-hotspot")] +struct Opt { + /// Listening addresses in multiaddr format. If empty, will listen on all addresses available + #[clap(long)] + listen_addr: Vec, + + /// External address in multiaddr format that would indicate how the node can be reached. + /// If empty, all listening addresses will be used as an external address + #[clap(long)] + external_addr: Vec, + + /// Path to key file + #[clap(long)] + keyfile: Option, + + /// Path to a configuration file to adjust relay setting + #[clap(long)] + relay_config: Option, + + /// Enables secured websockets + #[clap(long)] + enable_wss: bool, + + /// Enables webrtc + #[clap(long)] + enable_webrtc: bool, + + /// Use unbounded configuration with higher limits + #[clap(long)] + unbounded: bool, + + /// TLS Certificate when websocket is used + /// Note: websocket required a signed certificate. + #[clap(long)] + ws_tls_certificate: Option>, + + /// TLS Private Key when websocket is used + #[clap(long)] + ws_tls_private_key: Option, +} + +#[cfg(not(target_arch = "wasm32"))] +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + + let opts = Opt::parse(); + + let keypair = match opts.keyfile { + Some(kp) => match kp.is_file() { + true => { + tracing::info!("Reading keypair from {}", kp.display()); + let kp_str = tokio::fs::read_to_string(&kp).await?; + decode_kp(&kp_str)? + } + false => { + tracing::info!("Generating keypair"); + let k = Keypair::generate_ed25519(); + let encoded_kp = encode_kp(&k)?; + tracing::info!("Saving keypair to {}", kp.display()); + tokio::fs::write(kp, &encoded_kp).await?; + k + } + }, + None => { + tracing::info!("Generating keypair"); + Keypair::generate_ed25519() + } + }; + + let config = match opts.relay_config { + Some(path) => match path.is_file() { + true => { + let conf = tokio::fs::read_to_string(path).await?; + let config: Config = toml::from_str(&conf)?; + config + } + false => { + let config = Config::default(); + let bytes = toml::to_string(&config)?; + tokio::fs::write(path, &bytes).await?; + config + } + }, + None => Config::default(), + }; + + let (ws_cert, ws_pk) = match (opts.ws_tls_certificate, opts.ws_tls_private_key) { + (Some(cert), Some(prv)) => { + let mut certs = Vec::with_capacity(cert.len()); + for c in cert { + let Ok(cert) = tokio::fs::read_to_string(c).await else { + continue; + }; + certs.push(cert); + } + + let prv = tokio::fs::read_to_string(prv).await.ok(); + ((!certs.is_empty()).then_some(certs), prv) + } + _ => (None, None), + }; + + let wss_opt = ws_cert.and_then(|list| ws_pk.map(|k| (list, k))); + + let local_peer_id = keypair.public().to_peer_id(); + println!("Local PeerID: {local_peer_id}"); + + let _handle = hotspot::Hotspot::new( + &keypair, + opts.enable_wss, + wss_opt, + opts.enable_webrtc, + true, + false, + &opts.listen_addr, + &opts.external_addr, + Some(config.into()), + true, + ) + .await?; + + tokio::signal::ctrl_c().await?; + + Ok(()) +} + +#[cfg(target_arch = "wasm32")] +fn main() {} diff --git a/extensions/warp-ipfs/src/config.rs b/extensions/warp-ipfs/src/config.rs index 827652675..88d158281 100644 --- a/extensions/warp-ipfs/src/config.rs +++ b/extensions/warp-ipfs/src/config.rs @@ -76,24 +76,24 @@ impl Default for RelayClient { #[cfg(not(target_arch = "wasm32"))] relay_address: vec![ //NYC-1 - "/ip4/146.190.184.59/tcp/4001/p2p/12D3KooWCHWLQXTR2N6ukWM99pZYc4TM82VS7eVaDE4Ryk8ked8h".parse().unwrap(), + "/ip4/146.190.184.59/tcp/4001/p2p/12D3KooWCHWLQXTR2N6ukWM99pZYc4TM82VS7eVaDE4Ryk8ked8h".parse().unwrap(), "/ip4/146.190.184.59/udp/4001/quic-v1/p2p/12D3KooWCHWLQXTR2N6ukWM99pZYc4TM82VS7eVaDE4Ryk8ked8h".parse().unwrap(), //SF-1 - "/ip4/64.225.88.100/udp/4001/quic-v1/p2p/12D3KooWMfyuTCbehQYy68zPH6vpGUwg8raKbrS7pd3qZrG7bFuB".parse().unwrap(), - "/ip4/64.225.88.100/tcp/4001/p2p/12D3KooWMfyuTCbehQYy68zPH6vpGUwg8raKbrS7pd3qZrG7bFuB".parse().unwrap(), + "/ip4/64.225.88.100/udp/4001/quic-v1/p2p/12D3KooWMfyuTCbehQYy68zPH6vpGUwg8raKbrS7pd3qZrG7bFuB".parse().unwrap(), + "/ip4/64.225.88.100/tcp/4001/p2p/12D3KooWMfyuTCbehQYy68zPH6vpGUwg8raKbrS7pd3qZrG7bFuB".parse().unwrap(), //NYC-1-EXP "/ip4/24.199.86.91/udp/46315/quic-v1/p2p/12D3KooWQcyxuNXxpiM7xyoXRZC7Vhfbh2yCtRg272CerbpFkhE6".parse().unwrap(), "/ip4/24.199.86.91/tcp/46315/p2p/12D3KooWQcyxuNXxpiM7xyoXRZC7Vhfbh2yCtRg272CerbpFkhE6".parse().unwrap() ], // Relays that are meant to be used from a web standpoint. // Note: webrtc addresses are prone to change due an upstream issue and shouldnt be relied on for primary connections - #[cfg(target_arch="wasm32")] + #[cfg(target_arch = "wasm32")] relay_address: vec![ //NYC-1 "/dns4/nyc-3-dev.relay.satellite.im/tcp/4410/wss/p2p/12D3KooWJWw4KG2KKpUxQAc8kZZDqmownRvjWGxnr5Y6XRur8WSx".parse().unwrap(), ], background: true, - quorum: Default::default() + quorum: Default::default(), } } } @@ -107,6 +107,7 @@ pub struct IpfsSetting { /// Used for testing with a memory transport pub memory_transport: bool, pub dht_client: bool, + pub preload: Vec, } pub type DefaultPfpFn = std::sync::Arc< @@ -116,14 +117,8 @@ pub type DefaultPfpFn = std::sync::Arc< #[derive(Clone)] pub struct StoreSetting { /// Allow only interactions with friends - /// Note: This is ignored when it comes to chating between group chat recipients + /// Note: This is ignored when it comes to chatting between group chat recipients pub with_friends: bool, - /// Interval for broadcasting out identity (cannot be less than 3 minutes) - /// Note: - /// - If `None`, this will be disabled - /// - Will default to 3 minutes if less than - /// - This may be removed in the future - pub auto_push: Option, /// Discovery type pub discovery: Discovery, @@ -133,10 +128,11 @@ pub struct StoreSetting { pub friend_request_response_duration: Option, /// Disable providing images for identities pub disable_images: bool, - /// Announce to mesh network - pub announce_to_mesh: bool, - /// Function to call to provide data for a default profile picture if one is not apart of the identity + /// Function to call to provide data for a default profile picture if one is not a part of the identity pub default_profile_picture: Option, + /// Duration in seconds before the identity document is announced + /// Note: This field should be left as default unless it is used for testing + pub auto_push_duration: Duration, } impl std::fmt::Debug for StoreSetting { @@ -148,7 +144,6 @@ impl std::fmt::Debug for StoreSetting { impl Default for StoreSetting { fn default() -> Self { Self { - auto_push: None, discovery: Discovery::Namespace { namespace: None, discovery_type: Default::default(), @@ -158,7 +153,7 @@ impl Default for StoreSetting { disable_images: false, with_friends: false, default_profile_picture: None, - announce_to_mesh: false, + auto_push_duration: Duration::from_secs(60), } } } diff --git a/extensions/warp-ipfs/src/hotspot/mod.rs b/extensions/warp-ipfs/src/hotspot/mod.rs new file mode 100644 index 000000000..147eb795e --- /dev/null +++ b/extensions/warp-ipfs/src/hotspot/mod.rs @@ -0,0 +1,452 @@ +use crate::rt::{AbortableJoinHandle, Executor, LocalExecutor}; +use crate::store::document::identity::IdentityDocument; +use crate::store::payload::PayloadMessage; +use crate::store::topics::{PeerTopic, IDENTITY_ANNOUNCEMENT}; +use chrono::{DateTime, Utc}; +use futures::future::BoxFuture; +use futures::{FutureExt, StreamExt}; +use futures_timer::Delay; +use pollable_map::futures::FutureMap; +use rust_ipfs::libp2p::gossipsub::Message; +use rust_ipfs::libp2p::swarm::behaviour::toggle::Toggle; +use rust_ipfs::p2p::{ + IdentifyConfiguration, PubsubConfig, RelayConfig, TransportConfig, UpgradeVersion, +}; +use rust_ipfs::Keypair; +use rust_ipfs::{FDLimit, Ipfs, Multiaddr, SubscriptionStream, UninitializedIpfs}; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Duration; +use warp::crypto::DID; + +pub struct Hotspot { + _ipfs: Ipfs, + _handle: AbortableJoinHandle<()>, +} + +impl Hotspot { + #[allow(clippy::too_many_arguments)] + pub async fn new( + keypair: &Keypair, + enable_wss: bool, + wss_certs_and_key: Option<(Vec, String)>, + webrtc_enable: bool, + enable_relay_server: bool, + memory_transport: bool, + listen_addrs: &[Multiaddr], + external_addrs: &[Multiaddr], + relay_config: Option, + ext: bool, + ) -> anyhow::Result { + let executor = LocalExecutor; + + let addrs = match listen_addrs { + [] => vec![ + "/ip4/0.0.0.0/tcp/0".parse().unwrap(), + "/ip4/0.0.0.0/tcp/0/ws".parse().unwrap(), + "/ip4/0.0.0.0/tcp/0/wss".parse().unwrap(), + "/ip4/0.0.0.0/udp/0/quic-v1".parse().unwrap(), + "/ip4/0.0.0.0/udp/0/webrtc-direct".parse().unwrap(), + ], + addrs => addrs.to_vec(), + }; + + let mut uninitialized = UninitializedIpfs::new() + .with_identify(IdentifyConfiguration { + agent_version: format!("shuttle/hotspot/{}", env!("CARGO_PKG_VERSION")), + ..Default::default() + }) + .with_ping(Default::default()) + .fd_limit(FDLimit::Max) + .set_keypair(keypair) + .with_pubsub(PubsubConfig { + max_transmit_size: 4 * 1024 * 1024, + ..Default::default() + }) + .set_listening_addrs(addrs) + .set_transport_configuration(TransportConfig { + enable_webrtc: webrtc_enable, + enable_memory_transport: memory_transport, + enable_websocket: enable_wss, + enable_secure_websocket: enable_wss, + websocket_pem: wss_certs_and_key, + version: UpgradeVersion::Standard, + ..Default::default() + }) + .with_custom_behaviour(Toggle::from(ext.then_some(ext_behaviour::Behaviour::new( + keypair.public().to_peer_id(), + !external_addrs.is_empty(), + )))); + + if enable_relay_server { + uninitialized = uninitialized + .with_relay_server(relay_config.unwrap_or_default()) + .with_relay(true); + } + + if external_addrs.is_empty() { + uninitialized = uninitialized.listen_as_external_addr(); + } + + let _ipfs = uninitialized.start().await?; + + for external_addr in external_addrs { + _ipfs.add_external_address(external_addr.clone()).await?; + } + + let task = HotspotTask::new(&_ipfs).await; + + let _handle = executor.spawn_abortable(task); + + let hotspot = Hotspot { _ipfs, _handle }; + + Ok(hotspot) + } +} + +struct HotspotTask { + ipfs: Ipfs, + valid_identities: FutureMap, + announcement_stream_st: SubscriptionStream, +} + +impl HotspotTask { + pub async fn new(ipfs: &Ipfs) -> Self { + let announcement_stream_st = ipfs + .pubsub_subscribe(IDENTITY_ANNOUNCEMENT) + .await + .expect("valid subscription"); + let valid_identities = FutureMap::new(); + Self { + ipfs: ipfs.clone(), + valid_identities, + announcement_stream_st, + } + } +} + +impl HotspotTask { + fn handle_announcement_payload(&mut self, message: Message) -> anyhow::Result<()> { + let sender_peer_id = message.source.expect("valid peer id"); + + let payload = PayloadMessage::::from_bytes(&message.data)?; + + // We check the sender of the pubsub message to ensure that the peer is the original sender (either directly or indirectly) and not + // due to registration from another shuttle node + if sender_peer_id.ne(payload.sender()) || sender_peer_id.ne(payload.original_sender()) { + anyhow::bail!("sender is not the original sender"); + } + + let document = payload.message(None)?; + + document.verify()?; + + match self.valid_identities.get_mut(&document.did) { + Some(fut) => { + fut.update_identity_document(document); + } + None => { + let document_did = document.did.clone(); + let user = HotspotUser::new(&self.ipfs, document); + self.valid_identities.insert(document_did, user); + } + }; + + // TODO: manually propagate initial message to mesh network + + Ok(()) + } +} + +impl Future for HotspotTask { + type Output = (); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + while let Poll::Ready(Some(message)) = self.announcement_stream_st.poll_next_unpin(cx) { + if let Err(e) = self.handle_announcement_payload(message) { + tracing::error!(error = %e, "unable to handle announcement payload"); + } + } + + while let Poll::Ready(Some((did, _))) = self.valid_identities.poll_next_unpin(cx) { + tracing::info!(identity = %did, "identity timed out"); + } + + Poll::Pending + } +} + +enum HotspotStreamState { + Pending { + future: BoxFuture<'static, anyhow::Result>, + }, + Usable { + stream: SubscriptionStream, + }, +} + +impl HotspotStreamState { + pub fn poll_state(&mut self, cx: &mut Context<'_>) -> Poll> { + loop { + match self { + Self::Pending { future } => { + let item = + futures::ready!(future.poll_unpin(cx)).expect("valid subscription stream"); + *self = Self::Usable { stream: item }; + } + Self::Usable { stream } => { + return stream.poll_next_unpin(cx); + } + } + } + } +} + +struct HotspotUser { + identity: IdentityDocument, + identity_stream: HotspotStreamState, + friend_stream: HotspotStreamState, + conversation_stream: HotspotStreamState, + last_seen: DateTime, + last_seen_timer: Delay, +} + +impl HotspotUser { + pub fn new(ipfs: &Ipfs, identity_document: IdentityDocument) -> Self { + let identity_stream = HotspotStreamState::Pending { + future: { + let ipfs = ipfs.clone(); + let topic = identity_document.did.inbox(); + Box::pin(async move { ipfs.pubsub_subscribe(topic).await }) + }, + }; + let friend_stream = HotspotStreamState::Pending { + future: { + let ipfs = ipfs.clone(); + let topic = identity_document.did.events(); + Box::pin(async move { ipfs.pubsub_subscribe(topic).await }) + }, + }; + let conversation_stream = HotspotStreamState::Pending { + future: { + let ipfs = ipfs.clone(); + let topic = identity_document.did.messaging(); + Box::pin(async move { ipfs.pubsub_subscribe(topic).await }) + }, + }; + + Self { + identity: identity_document, + identity_stream, + friend_stream, + conversation_stream, + last_seen: Utc::now(), + last_seen_timer: Delay::new(Duration::from_secs(2 * 60)), + } + } +} + +impl HotspotUser { + pub fn update_identity_document(&mut self, identity_document: IdentityDocument) { + if self.identity.modified > identity_document.modified { + tracing::warn!(identity = %self.identity.did, "identity is older than previous entry. Ignoring."); + return; + } + self.identity = identity_document; + self.last_seen = Utc::now(); + self.last_seen_timer = Delay::new(Duration::from_secs(2 * 60)); + tracing::info!(identity = %self.identity.did, last_seen = %self.last_seen, "last seen identity."); + } +} + +impl Future for HotspotUser { + type Output = (); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = &mut *self; + if this.last_seen_timer.poll_unpin(cx).is_ready() { + return Poll::Ready(()); + } + + loop { + match this.identity_stream.poll_state(cx) { + Poll::Ready(Some(_)) => { + // TODO: Maybe determine sender and compare it against the identity here and update last seen + } + Poll::Ready(None) => { + break; + } + Poll::Pending => break, + } + } + + loop { + match this.friend_stream.poll_state(cx) { + Poll::Ready(Some(_)) => {} + Poll::Ready(None) => { + break; + } + Poll::Pending => break, + } + } + + loop { + match this.conversation_stream.poll_state(cx) { + Poll::Ready(Some(_)) => {} + Poll::Ready(None) => { + break; + } + Poll::Pending => break, + } + } + + Poll::Pending + } +} + +mod ext_behaviour { + use rust_ipfs::libp2p::core::transport::PortUse; + use rust_ipfs::libp2p::{ + core::Endpoint, + swarm::{ + ConnectionDenied, ConnectionId, ExternalAddrExpired, FromSwarm, ListenerClosed, + NewListenAddr, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, + }, + Multiaddr, PeerId, + }; + use rust_ipfs::{ListenerId, NetworkBehaviour}; + use std::{ + collections::{HashMap, HashSet}, + task::{Context, Poll}, + }; + + #[derive(Debug)] + pub struct Behaviour { + peer_id: PeerId, + addrs: HashSet, + listened: HashMap>, + external: bool, + } + + impl Behaviour { + pub fn new(local_peer_id: PeerId, external: bool) -> Self { + println!("PeerID: {}", local_peer_id); + Self { + peer_id: local_peer_id, + addrs: Default::default(), + listened: Default::default(), + external, + } + } + } + + impl NetworkBehaviour for Behaviour { + type ConnectionHandler = rust_ipfs::libp2p::swarm::dummy::ConnectionHandler; + type ToSwarm = void::Void; + + fn handle_pending_inbound_connection( + &mut self, + _: ConnectionId, + _: &Multiaddr, + _: &Multiaddr, + ) -> Result<(), ConnectionDenied> { + Ok(()) + } + + fn handle_pending_outbound_connection( + &mut self, + _: ConnectionId, + _: Option, + _: &[Multiaddr], + _: Endpoint, + ) -> Result, ConnectionDenied> { + Ok(vec![]) + } + + fn handle_established_inbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: &Multiaddr, + ) -> Result, ConnectionDenied> { + Ok(rust_ipfs::libp2p::swarm::dummy::ConnectionHandler) + } + + fn handle_established_outbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: Endpoint, + _: PortUse, + ) -> Result, ConnectionDenied> { + Ok(rust_ipfs::libp2p::swarm::dummy::ConnectionHandler) + } + + fn on_connection_handler_event( + &mut self, + _: PeerId, + _: ConnectionId, + _: THandlerOutEvent, + ) { + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + match event { + FromSwarm::NewListenAddr(NewListenAddr { + listener_id, addr, .. + }) => { + let addr = addr.clone(); + + let addr = match addr.with_p2p(self.peer_id) { + Ok(a) => a, + Err(a) => a, + }; + + if !self.external && self.addrs.insert(addr.clone()) { + self.listened + .entry(listener_id) + .or_default() + .insert(addr.clone()); + + println!("Listening on {addr}"); + } + } + + FromSwarm::ExternalAddrConfirmed(ev) => { + let addr = ev.addr.clone(); + let addr = match addr.with_p2p(self.peer_id) { + Ok(a) => a, + Err(a) => a, + }; + + if self.external && self.addrs.insert(addr.clone()) { + println!("Listening on {}", addr); + } + } + FromSwarm::ExternalAddrExpired(ExternalAddrExpired { addr }) => { + let addr = addr.clone(); + let addr = addr.with_p2p(self.peer_id).unwrap(); + + if self.addrs.remove(&addr) { + println!("No longer listening on {addr}"); + } + } + FromSwarm::ListenerClosed(ListenerClosed { listener_id, .. }) => { + if let Some(addrs) = self.listened.remove(&listener_id) { + for addr in addrs { + let addr = addr.with_p2p(self.peer_id).unwrap(); + self.addrs.remove(&addr); + println!("No longer listening on {addr}"); + } + } + } + _ => {} + } + } + + fn poll(&mut self, _: &mut Context) -> Poll>> { + Poll::Pending + } + } +} diff --git a/extensions/warp-ipfs/src/lib.rs b/extensions/warp-ipfs/src/lib.rs index aa956cdee..d7c500e50 100644 --- a/extensions/warp-ipfs/src/lib.rs +++ b/extensions/warp-ipfs/src/lib.rs @@ -13,6 +13,7 @@ use ipfs::{DhtMode, Ipfs, Keypair, Protocol, UninitializedIpfs}; use parking_lot::RwLock; use rust_ipfs as ipfs; use rust_ipfs::p2p::{RequestResponseConfig, UpgradeVersion}; +use rust_ipfs::AddPeerOpt; use std::any::Any; use std::collections::HashSet; use std::ffi::OsStr; @@ -78,6 +79,7 @@ use warp::{Extension, SingleHandle}; mod behaviour; pub mod config; +pub mod hotspot; pub(crate) mod rt; pub mod shuttle; pub mod store; @@ -632,6 +634,34 @@ impl WarpIpfs { } } + let preload = self.inner.config.ipfs_setting().preload.clone(); + + self.executor.dispatch({ + let ipfs = ipfs.clone(); + async move { + for addr in preload { + let Some(peer_id) = addr.peer_id() else { + tracing::warn!("{addr} does not contain a peer id. Skipping"); + continue; + }; + + let opt = AddPeerOpt::with_peer_id(peer_id) + .add_address(addr.clone()) + .set_dial(true) + .set_keepalive(true); + + if let Err(e) = ipfs.add_peer(opt).await { + tracing::error!(error = %e, "unable to add {addr} to address book"); + continue; + } + + if !ipfs.is_connected(peer_id).await.unwrap_or_default() { + let _ = ipfs.connect(peer_id).await; + } + } + } + }); + let discovery = Discovery::new(&ipfs, &self.inner.config.store_setting().discovery, &relays); diff --git a/extensions/warp-ipfs/src/store/identity.rs b/extensions/warp-ipfs/src/store/identity.rs index 9c81a826a..120e66d1c 100644 --- a/extensions/warp-ipfs/src/store/identity.rs +++ b/extensions/warp-ipfs/src/store/identity.rs @@ -439,13 +439,9 @@ impl IdentityStore { futures::pin_mut!(event_stream); futures::pin_mut!(friend_stream); - let auto_push = store.config.store_setting().auto_push.is_some(); + let interval = store.config.store_setting().auto_push_duration; - let interval = store - .config - .store_setting() - .auto_push - .unwrap_or(Duration::from_millis(300000)); + assert!(interval != Duration::ZERO); let mut tick = Delay::new(interval); @@ -479,6 +475,12 @@ impl IdentityStore { } }; + if identity.verify().is_err() { + tracing::warn!(from = %from_did, "invalid identity document"); + //TODO: Blacklist? + continue; + } + //Maybe establish a connection? //Note: Although it would be prefer not to establish a connection, it may be ideal to check to determine // the actual source of the payload to determine if its a message propagated over the mesh from the peer @@ -603,9 +605,7 @@ impl IdentityStore { } } _ = &mut tick => { - if auto_push { - store.push_to_all().await; - } + store.push_to_all().await; tick.reset(interval) } } @@ -863,41 +863,21 @@ impl IdentityStore { Ok(()) } - async fn push_iter>(&self, list: I) { - for did in list { - if let Err(e) = self.push(&did).await { - tracing::error!("Error pushing identity to {did}: {e}"); - } - } - } - pub async fn push_to_all(&self) { - //TODO: Possibly announce only to mesh, though this *might* require changing the logic to establish connection - // if profile pictures and banners are supplied in this push too. - let list = self - .discovery - .list() - .await - .iter() - .filter_map(|entry| entry.peer_id().to_did().ok()) - .collect::>(); - self.push_iter(list).await; let _ = self.announce_identity_to_mesh().await; } pub async fn announce_identity_to_mesh(&self) -> Result<(), Error> { - if self.config.store_setting().announce_to_mesh { - let kp = self.ipfs.keypair(); - let document = self.own_identity_document().await?; - tracing::debug!("announcing identity to mesh"); - let payload = PayloadBuilder::new(kp, document) - .from_ipfs(&self.ipfs) - .await?; - let bytes = payload.to_bytes()?; - match self.ipfs.pubsub_publish(IDENTITY_ANNOUNCEMENT, bytes).await { - Ok(_) => tracing::debug!("identity announced to mesh"), - Err(_) => tracing::warn!("unable to announce identity to mesh"), - } + let kp = self.ipfs.keypair(); + let document = self.own_identity_document().await?; + tracing::debug!("announcing identity to mesh"); + let payload = PayloadBuilder::new(kp, document) + .from_ipfs(&self.ipfs) + .await?; + let bytes = payload.to_bytes()?; + match self.ipfs.pubsub_publish(IDENTITY_ANNOUNCEMENT, bytes).await { + Ok(_) => tracing::debug!("identity announced to mesh"), + Err(_) => tracing::warn!("unable to announce identity to mesh"), } Ok(()) diff --git a/extensions/warp-ipfs/tests/common.rs b/extensions/warp-ipfs/tests/common.rs index 270fcaebb..e4abe841e 100644 --- a/extensions/warp-ipfs/tests/common.rs +++ b/extensions/warp-ipfs/tests/common.rs @@ -62,10 +62,9 @@ pub async fn create_account( *config.listen_on_mut() = vec![Multiaddr::empty().with(Protocol::Memory(0))]; config.ipfs_setting_mut().memory_transport = true; config.store_setting_mut().discovery = Discovery::None; + config.store_setting_mut().auto_push_duration = Duration::from_secs(1); config.ipfs_setting_mut().relay_client.relay_address = vec![]; config.ipfs_setting_mut().mdns.enable = false; - config.store_setting_mut().announce_to_mesh = true; - config.store_setting_mut().auto_push = Some(Duration::from_secs(1)); *config.bootstrap_mut() = Bootstrap::None;