diff --git a/extensions/warp-ipfs/examples/identity-interface.rs b/extensions/warp-ipfs/examples/identity-interface.rs index b8c3b5533..32d6dab1a 100644 --- a/extensions/warp-ipfs/examples/identity-interface.rs +++ b/extensions/warp-ipfs/examples/identity-interface.rs @@ -12,7 +12,7 @@ use warp::crypto::DID; use warp::multipass::identity::{Identifier, IdentityProfile, IdentityStatus, IdentityUpdate}; use warp::multipass::{IdentityImportOption, ImportLocation, MultiPass}; use warp::tesseract::Tesseract; -use warp_ipfs::config::{Config, Discovery, DiscoveryType}; +use warp_ipfs::config::{Bootstrap, Config, Discovery, DiscoveryType}; use warp_ipfs::WarpIpfsBuilder; #[derive(Debug, Parser)] @@ -96,6 +96,7 @@ async fn account( } if opt.no_discovery { config.store_setting.discovery = Discovery::None; + config.bootstrap = Bootstrap::None; config.ipfs_setting.bootstrap = false; } diff --git a/extensions/warp-ipfs/src/behaviour/phonebook.rs b/extensions/warp-ipfs/src/behaviour/phonebook.rs index 686e1b562..b777e1a9d 100644 --- a/extensions/warp-ipfs/src/behaviour/phonebook.rs +++ b/extensions/warp-ipfs/src/behaviour/phonebook.rs @@ -217,10 +217,7 @@ impl NetworkBehaviour for Behaviour { } } - fn poll( - &mut self, - cx: &mut Context, - ) -> Poll>> { + fn poll(&mut self, cx: &mut Context) -> Poll>> { if let Some(event) = self.events.pop_front() { return Poll::Ready(event); } diff --git a/extensions/warp-ipfs/src/config.rs b/extensions/warp-ipfs/src/config.rs index 9128b4f55..bb7b0c68a 100644 --- a/extensions/warp-ipfs/src/config.rs +++ b/extensions/warp-ipfs/src/config.rs @@ -6,7 +6,7 @@ use std::{ str::FromStr, time::Duration, }; -use warp::{multipass::identity::Identity, constellation::file::FileType}; +use warp::{constellation::file::FileType, multipass::identity::Identity}; #[derive(Default, Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] @@ -249,8 +249,9 @@ pub enum UpdateEvents { Disable, } -pub type DefaultPfpFn = - std::sync::Arc Result<(Vec, FileType), std::io::Error> + Send + Sync + 'static>; +pub type DefaultPfpFn = std::sync::Arc< + dyn Fn(&Identity) -> Result<(Vec, FileType), std::io::Error> + Send + Sync + 'static, +>; #[derive(Clone, Serialize, Deserialize)] pub struct StoreSetting { diff --git a/extensions/warp-ipfs/src/lib.rs b/extensions/warp-ipfs/src/lib.rs index 15b700142..03b44fcd4 100644 --- a/extensions/warp-ipfs/src/lib.rs +++ b/extensions/warp-ipfs/src/lib.rs @@ -321,8 +321,10 @@ impl WarpIpfs { uninitialized = uninitialized.set_path(path); } - for addr in config.bootstrap.address() { - uninitialized = uninitialized.add_bootstrap(addr); + if config.ipfs_setting.bootstrap { + for addr in config.bootstrap.address() { + uninitialized = uninitialized.add_bootstrap(addr); + } } if config.ipfs_setting.memory_transport { @@ -398,10 +400,19 @@ impl WarpIpfs { } for relay_peer in relay_peers { - if let Err(e) = ipfs.enable_relay(Some(relay_peer)).await { - error!("Failed to use {relay_peer} as a relay: {e}"); - continue; - } + match tokio::time::timeout(Duration::from_secs(15), ipfs.enable_relay(Some(relay_peer))) + .await + { + Ok(Ok(_)) => {} + Ok(Err(e)) => { + error!("Failed to use {relay_peer} as a relay: {e}"); + continue; + } + Err(_) => { + error!("Relay connection timed out"); + continue; + } + }; let list = ipfs.list_relays(true).await.unwrap_or_default(); for addr in list diff --git a/extensions/warp-ipfs/tests/group.rs b/extensions/warp-ipfs/tests/group.rs index d7124de82..0d50a4e0a 100644 --- a/extensions/warp-ipfs/tests/group.rs +++ b/extensions/warp-ipfs/tests/group.rs @@ -766,7 +766,6 @@ mod test { assert!(conversation.recipients().contains(&did_b)); assert!(conversation.recipients().contains(&did_c)); - let mut conversation_a = chat_a.get_conversation_stream(id_a).await?; let mut conversation_b = chat_b.get_conversation_stream(id_b).await?; @@ -918,7 +917,6 @@ mod test { assert!(conversation.recipients().contains(&did_b)); assert!(conversation.recipients().contains(&did_c)); - let mut conversation_a = chat_a.get_conversation_stream(id_a).await?; let mut conversation_b = chat_b.get_conversation_stream(id_b).await?; @@ -926,7 +924,8 @@ mod test { tokio::time::timeout(Duration::from_secs(60), async { loop { - if let Some(MultiPassEventKind::BlockedBy { did }) = account_subscribe_a.next().await + if let Some(MultiPassEventKind::BlockedBy { did }) = + account_subscribe_a.next().await { assert_eq!(did, did_c); break; @@ -967,8 +966,7 @@ mod test { tokio::time::timeout(Duration::from_secs(60), async { loop { - if let Some(MultiPassEventKind::Blocked { did }) = - account_subscribe_c.next().await + if let Some(MultiPassEventKind::Blocked { did }) = account_subscribe_c.next().await { assert_eq!(did, did_a); break; diff --git a/tools/relay-server/Cargo.toml b/tools/relay-server/Cargo.toml new file mode 100644 index 000000000..2f86dba75 --- /dev/null +++ b/tools/relay-server/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "relay-server" +version.workspace = true +edition.workspace = true +license.workspace = true +rust-version.workspace = true +repository.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +rust-ipfs = { workspace = true } +tokio = { workspace = true, features = ["full"] } +tokio-util = { workspace = true, features = ["full"] } +tokio-stream = { workspace = true, features = ["net"] } +futures.workspace = true +futures-timer.workspace = true +async-trait.workspace = true +async-stream.workspace = true +anyhow.workspace = true +serde.workspace = true +serde_json.workspace = true +void.workspace = true +tracing.workspace = true +clap = { version = "4.4", features = ["derive"] } +zeroize = "1" +dotenv = "0.15" +base64 = "0.21" \ No newline at end of file diff --git a/tools/relay-server/src/config.rs b/tools/relay-server/src/config.rs new file mode 100644 index 000000000..793f42934 --- /dev/null +++ b/tools/relay-server/src/config.rs @@ -0,0 +1,47 @@ +use std::{error::Error, path::Path}; + +use base64::{ + alphabet::STANDARD, + engine::{general_purpose::PAD, GeneralPurpose}, + Engine, +}; +use rust_ipfs::{Keypair, PeerId}; +use serde::Deserialize; +use zeroize::Zeroizing; + +#[derive(Clone, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct IpfsConfig { + pub identity: Identity, +} + +impl IpfsConfig { + pub fn load>(path: P) -> Result> { + let file = std::fs::File::open(path)?; + let config = serde_json::from_reader(file)?; + Ok(config) + } +} + +#[derive(Deserialize, Clone)] +#[serde(rename_all = "PascalCase")] +pub struct Identity { + #[serde(rename = "PeerID")] + pub peer_id: PeerId, + pub priv_key: String, +} + +impl Identity { + pub fn keypair(&self) -> Result> { + let engine = GeneralPurpose::new(&STANDARD, PAD); + let keypair_bytes = Zeroizing::new(engine.decode(self.priv_key.as_bytes())?); + let keypair = Keypair::from_protobuf_encoding(&keypair_bytes)?; + Ok(keypair) + } +} + +impl zeroize::Zeroize for IpfsConfig { + fn zeroize(&mut self) { + self.identity.priv_key.zeroize(); + } +} diff --git a/tools/relay-server/src/main.rs b/tools/relay-server/src/main.rs new file mode 100644 index 000000000..e20f07fc6 --- /dev/null +++ b/tools/relay-server/src/main.rs @@ -0,0 +1,237 @@ +mod config; + +use std::{path::PathBuf, time::Duration}; + +use base64::{ + alphabet::STANDARD, + engine::{general_purpose::PAD, GeneralPurpose}, + Engine, +}; +use clap::Parser; +use rust_ipfs::{ + p2p::{RateLimit, RelayConfig, TransportConfig}, + FDLimit, Keypair, Multiaddr, UninitializedIpfs, +}; + +use zeroize::Zeroizing; + +use crate::config::IpfsConfig; + +fn decode_kp(kp: &str) -> anyhow::Result { + let engine = GeneralPurpose::new(&STANDARD, PAD); + let keypair_bytes = Zeroizing::new(engine.decode(kp.as_bytes())?); + let keypair = Keypair::from_protobuf_encoding(&keypair_bytes)?; + Ok(keypair) +} + +fn encode_kp(kp: &Keypair) -> anyhow::Result { + let bytes = kp.to_protobuf_encoding()?; + let engine = GeneralPurpose::new(&STANDARD, PAD); + let kp_encoded = engine.encode(bytes); + Ok(kp_encoded) +} + +#[derive(Debug, Parser)] +#[clap(name = "relay-server")] +struct Opt { + /// Listening addresses in multiaddr format. If empty, will listen on all addresses available + #[clap(long)] + listen_addr: Vec, + + #[clap(long)] + keyfile: Option, + + /// Path to the ipfs instance + #[clap(long)] + path: Option, + + /// Path to ipfs config to use existing keypair + #[clap(long)] + ipfs_config: Option, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let opts = Opt::parse(); + + let path = opts.path; + + if let Some(path) = path.as_ref() { + tokio::fs::create_dir_all(path).await?; + } + + let keypair = match opts + .keyfile + .map(|kp| path.as_ref().map(|p| p.join(kp.clone())).unwrap_or(kp)) + { + Some(kp) => match kp.is_file() { + true => { + tracing::info!("Reading keypair from {}", kp.display()); + let kp_str = tokio::fs::read_to_string(&kp).await?; + decode_kp(&kp_str)? + } + false => { + tracing::info!("Generating keypair"); + let k = Keypair::generate_ed25519(); + let encoded_kp = encode_kp(&k)?; + let kp = path.as_ref().map(|p| p.join(kp.clone())).unwrap_or(kp); + tracing::info!("Saving keypair to {}", kp.display()); + tokio::fs::write(kp, &encoded_kp).await?; + k + } + }, + None => { + if let Some(config) = opts.ipfs_config { + let config = IpfsConfig::load(config)?; + config.identity.keypair()? + } else { + tracing::info!("Generating keypair"); + Keypair::generate_ed25519() + } + } + }; + + let local_peer_id = keypair.public().to_peer_id(); + println!("Local PeerID: {local_peer_id}"); + + let mut uninitialized = UninitializedIpfs::new() + .with_identify(None) + .with_ping(None) + .with_relay_server(Some(RelayConfig { + max_circuits: 8198, + max_circuits_per_peer: 8198, + max_circuit_duration: Duration::from_secs(2 * 60), + max_circuit_bytes: 8 * 1024 * 1024, + circuit_src_rate_limiters: vec![ + RateLimit::PerIp { + limit: 256.try_into().expect("Greater than 0"), + interval: Duration::from_secs(60 * 2), + }, + RateLimit::PerPeer { + limit: 256.try_into().expect("Greater than 0"), + interval: Duration::from_secs(60), + }, + ], + max_reservations_per_peer: 512, + max_reservations: 8198, + reservation_duration: Duration::from_secs(60 * 60), + reservation_rate_limiters: vec![ + RateLimit::PerIp { + limit: 256.try_into().expect("Greater than 0"), + interval: Duration::from_secs(60), + }, + RateLimit::PerPeer { + limit: 256.try_into().expect("Greater than 0"), + interval: Duration::from_secs(60), + }, + ], + })) + .fd_limit(FDLimit::Max) + .set_keypair(keypair) + .set_idle_connection_timeout(30) + .set_transport_configuration(TransportConfig { + enable_quic: true, + ..Default::default() + }) + .listen_as_external_addr() + .with_custom_behaviour(ext_behaviour::Behaviour); + + let addrs = match opts.listen_addr.as_slice() { + [] => vec![ + "/ip4/0.0.0.0/tcp/0".parse().unwrap(), + "/ip4/0.0.0.0/udp/0/quic-v1".parse().unwrap(), + ], + addrs => addrs.to_vec(), + }; + + if let Some(path) = path { + uninitialized = uninitialized.set_path(path); + } + + uninitialized = uninitialized.set_listening_addrs(addrs); + + let _ipfs = uninitialized.start().await?; + + tokio::signal::ctrl_c().await?; + + Ok(()) +} + +mod ext_behaviour { + use std::task::{Context, Poll}; + + use rust_ipfs::libp2p::{ + core::Endpoint, + swarm::{ + ConnectionDenied, ConnectionId, FromSwarm, NewListenAddr, THandler, THandlerInEvent, + THandlerOutEvent, ToSwarm, + }, + Multiaddr, PeerId, + }; + use rust_ipfs::NetworkBehaviour; + + #[derive(Default, Debug)] + pub struct Behaviour; + + impl NetworkBehaviour for Behaviour { + type ConnectionHandler = rust_ipfs::libp2p::swarm::dummy::ConnectionHandler; + type ToSwarm = void::Void; + + fn handle_pending_inbound_connection( + &mut self, + _: ConnectionId, + _: &Multiaddr, + _: &Multiaddr, + ) -> Result<(), ConnectionDenied> { + Ok(()) + } + + fn handle_pending_outbound_connection( + &mut self, + _: ConnectionId, + _: Option, + _: &[Multiaddr], + _: Endpoint, + ) -> Result, ConnectionDenied> { + Ok(vec![]) + } + + fn handle_established_inbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: &Multiaddr, + ) -> Result, ConnectionDenied> { + Ok(rust_ipfs::libp2p::swarm::dummy::ConnectionHandler) + } + + fn handle_established_outbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: Endpoint, + ) -> Result, ConnectionDenied> { + Ok(rust_ipfs::libp2p::swarm::dummy::ConnectionHandler) + } + + fn on_connection_handler_event( + &mut self, + _: PeerId, + _: ConnectionId, + _: THandlerOutEvent, + ) { + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + if let FromSwarm::NewListenAddr(NewListenAddr { addr, .. }) = event { + println!("Listening on {addr}"); + } + } + + fn poll(&mut self, _: &mut Context) -> Poll>> { + Poll::Pending + } + } +} diff --git a/warp/src/multipass/mod.rs b/warp/src/multipass/mod.rs index 35464efbd..de430ac31 100644 --- a/warp/src/multipass/mod.rs +++ b/warp/src/multipass/mod.rs @@ -17,7 +17,7 @@ use identity::Identity; use crate::crypto::DID; use crate::multipass::identity::{Identifier, IdentityUpdate}; -use self::identity::{IdentityProfile, IdentityStatus, Platform, Relationship, IdentityImage}; +use self::identity::{IdentityImage, IdentityProfile, IdentityStatus, Platform, Relationship}; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, FFIFree)] #[serde(rename_all = "snake_case")]