Skip to content

Commit

Permalink
refactor: remove shuttle behaviour and send requests directly (#641)
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 authored Dec 10, 2024
1 parent 8280796 commit 00d4b78
Show file tree
Hide file tree
Showing 22 changed files with 1,971 additions and 3,587 deletions.
46 changes: 45 additions & 1 deletion extensions/warp-ipfs/shuttle/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ struct Opt {
#[clap(long)]
listen_addr: Vec<Multiaddr>,

/// External address in multiaddr format that would indicate how the node can be reached.
/// If empty, all listening addresses will be used as an external address
#[clap(long)]
external_addr: Vec<Multiaddr>,

/// Primary node in multiaddr format for bootstrap, discovery and building out mesh network
#[clap(long)]
primary_nodes: Vec<Multiaddr>,
Expand All @@ -45,15 +50,26 @@ struct Opt {
#[clap(long)]
trusted_nodes: Vec<Multiaddr>,

/// Path to keyfile
#[clap(long)]
keyfile: Option<PathBuf>,

/// Path to the ipfs instance
#[clap(long)]
path: Option<PathBuf>,

/// Enable relay server
#[clap(long)]
enable_relay_server: bool,

/// TLS Certificate when websocket is used
/// Note: websocket required a signed certificate.
#[clap(long)]
ws_tls_certificate: Option<Vec<PathBuf>>,

/// TLS Private Key when websocket is used
#[clap(long)]
ws_tls_private_key: Option<PathBuf>,
}

#[cfg(not(target_arch = "wasm32"))]
Expand Down Expand Up @@ -109,15 +125,43 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
};

let (ws_cert, ws_pk) = match (
opts.ws_tls_certificate.map(|list| {
list.into_iter()
.map(|conf| path.as_ref().map(|p| p.join(conf.clone())).unwrap_or(conf))
.collect::<Vec<_>>()
}),
opts.ws_tls_private_key
.map(|conf| path.as_ref().map(|p| p.join(conf.clone())).unwrap_or(conf)),
) {
(Some(cert), Some(prv)) => {
let mut certs = Vec::with_capacity(cert.len());
for c in cert {
let Ok(cert) = tokio::fs::read_to_string(c).await else {
continue;
};
certs.push(cert);
}

let prv = tokio::fs::read_to_string(prv).await.ok();
((!certs.is_empty()).then_some(certs), prv)
}
_ => (None, None),
};

let wss_opt = ws_cert.and_then(|list| ws_pk.map(|k| (list, k)));

let local_peer_id = keypair.public().to_peer_id();
println!("Local PeerID: {local_peer_id}");

let _ = shuttle::server::ShuttleServer::new(
let _handle = shuttle::server::ShuttleServer::new(
&keypair,
wss_opt,
path,
opts.enable_relay_server,
false,
&opts.listen_addr,
&opts.external_addr,
true,
)
.await?;
Expand Down
4 changes: 1 addition & 3 deletions extensions/warp-ipfs/src/behaviour/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
pub mod phonebook;

use libp2p::swarm::NetworkBehaviour;
use rust_ipfs::libp2p::{self, swarm::behaviour::toggle::Toggle};
use rust_ipfs::libp2p;

#[derive(NetworkBehaviour)]
#[behaviour(prelude = "libp2p::swarm::derive_prelude", to_swarm = "void::Void")]
pub struct Behaviour {
pub shuttle_identity: Toggle<crate::shuttle::identity::client::Behaviour>,
pub shuttle_message: Toggle<crate::shuttle::message::client::Behaviour>,
pub phonebook: phonebook::Behaviour,
}
116 changes: 64 additions & 52 deletions extensions/warp-ipfs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,28 +292,49 @@ impl WarpIpfs {
}

let (pb_tx, pb_rx) = channel(50);
let (id_sh_tx, id_sh_rx) = futures::channel::mpsc::channel(1);
let (msg_sh_tx, msg_sh_rx) = futures::channel::mpsc::channel(1);

let (enable, nodes) = match &self.inner.config.store_setting().discovery {
config::Discovery::Shuttle { addresses } => (true, addresses.clone()),
_ => Default::default(),
};

let behaviour = behaviour::Behaviour {
shuttle_identity: enable
.then(|| {
shuttle::identity::client::Behaviour::new(&keypair, None, id_sh_rx, &nodes)
})
.into(),
shuttle_message: enable
.then(|| {
shuttle::message::client::Behaviour::new(&keypair, None, msg_sh_rx, &nodes)
})
.into(),
phonebook: behaviour::phonebook::Behaviour::new(self.multipass_tx.clone(), pb_rx),
};

let mut request_response_configs = vec![
RequestResponseConfig {
protocol: protocols::EXCHANGE_PROTOCOL.as_ref().into(),
max_request_size: 8 * 1024,
max_response_size: 16 * 1024,
..Default::default()
},
RequestResponseConfig {
protocol: protocols::IDENTITY_PROTOCOL.as_ref().into(),
max_request_size: 256 * 1024,
max_response_size: 512 * 1024,
..Default::default()
},
RequestResponseConfig {
protocol: protocols::DISCOVERY_PROTOCOL.as_ref().into(),
max_request_size: 256 * 1024,
max_response_size: 512 * 1024,
..Default::default()
},
];

if let config::Discovery::Shuttle { .. } = &self.inner.config.store_setting().discovery {
request_response_configs.extend([
RequestResponseConfig {
protocol: protocols::SHUTTLE_IDENTITY.as_ref().into(),
max_request_size: 256 * 1024,
max_response_size: 512 * 1024,
..Default::default()
},
RequestResponseConfig {
protocol: protocols::SHUTTLE_MESSAGE.as_ref().into(),
max_request_size: 256 * 1024,
max_response_size: 512 * 1024,
..Default::default()
},
]);
}

tracing::info!("Starting ipfs");
let mut uninitialized = UninitializedIpfs::new()
.with_identify({
Expand All @@ -333,39 +354,7 @@ impl WarpIpfs {
..Default::default()
})
.with_relay(true)
.with_request_response(vec![
RequestResponseConfig {
protocol: protocols::EXCHANGE_PROTOCOL.as_ref().into(),
max_request_size: 8 * 1024,
max_response_size: 16 * 1024,
..Default::default()
},
RequestResponseConfig {
protocol: protocols::IDENTITY_PROTOCOL.as_ref().into(),
max_request_size: 256 * 1024,
max_response_size: 512 * 1024,
..Default::default()
},
RequestResponseConfig {
protocol: protocols::DISCOVERY_PROTOCOL.as_ref().into(),
max_request_size: 256 * 1024,
max_response_size: 512 * 1024,
..Default::default()
},
// TODO: Uncomment or remove during shuttle protocol refactor
// RequestResponseConfig {
// protocol: protocols::SHUTTLE_IDENTITY.as_ref().into(),
// max_request_size: 256 * 1024,
// max_response_size: 512 * 1024,
// ..Default::default()
// },
// RequestResponseConfig {
// protocol: protocols::SHUTTLE_MESSAGE.as_ref().into(),
// max_request_size: 256 * 1024,
// max_response_size: 512 * 1024,
// ..Default::default()
// },
])
.with_request_response(request_response_configs)
.set_listening_addrs(self.inner.config.listen_on().to_vec())
.with_custom_behaviour(behaviour)
.set_keypair(&keypair)
Expand Down Expand Up @@ -535,6 +524,31 @@ impl WarpIpfs {
}
}

if let config::Discovery::Shuttle { addresses } =
self.inner.config.store_setting().discovery.clone()
{
let mut nodes = IndexSet::new();
for mut addr in addresses {
let Some(peer_id) = addr.extract_peer_id() else {
tracing::warn!("{addr} does not contain a peer id. Skipping");
continue;
};

if let Err(_e) = ipfs.add_peer((peer_id, addr)).await {
// TODO:
continue;
}

nodes.insert(peer_id);
}

for node in nodes {
if let Err(_e) = ipfs.connect(node).await {
// TODO
}
}
}

if matches!(
self.inner.config.store_setting().discovery,
config::Discovery::Namespace {
Expand Down Expand Up @@ -630,7 +644,6 @@ impl WarpIpfs {
self.multipass_tx.clone(),
&phonebook,
&discovery,
id_sh_tx,
&span,
)
.await?;
Expand All @@ -654,7 +667,6 @@ impl WarpIpfs {
&filestore,
self.raygun_tx.clone(),
&identity_store,
msg_sh_tx,
)
.await;

Expand Down
79 changes: 0 additions & 79 deletions extensions/warp-ipfs/src/shuttle/identity.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,9 @@
use crate::store::DidExt;
use chrono::{DateTime, Utc};
use ipld_core::cid::Cid;
use rust_ipfs::Keypair;
use serde::{Deserialize, Serialize};
use warp::crypto::DID;

use crate::store::document::identity::IdentityDocument;

pub mod client;
pub mod protocol;
#[cfg(not(target_arch = "wasm32"))]
pub mod server;

#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Hash)]
pub struct IdentityDag {
Expand All @@ -20,75 +13,3 @@ pub struct IdentityDag {
#[serde(skip_serializing_if = "Option::is_none")]
pub mailbox: Option<Cid>,
}

#[derive(Deserialize, Serialize, Debug, Clone, Copy, PartialEq, Hash, Eq)]
#[serde(rename_all = "lowercase")]
pub enum RequestEvent {
/// Event indicating a friend request
Request,
/// Event accepting the request
Accept,
/// Remove identity as a friend
Remove,
/// Reject friend request, if any
Reject,
/// Retract a sent friend request
Retract,
/// Block user
Block,
/// Unblock user
Unblock,
}

#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)]
pub struct RequestPayload {
pub sender: DID,
pub event: RequestEvent,
pub created: DateTime<Utc>,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub original_signature: Vec<u8>,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub signature: Vec<u8>,
}

impl std::hash::Hash for RequestPayload {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.sender.hash(state);
}
}

impl PartialOrd for RequestPayload {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl Ord for RequestPayload {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.created.cmp(&other.created)
}
}

impl RequestPayload {
pub fn sign(mut self, keypair: &Keypair) -> Result<Self, Box<dyn std::error::Error>> {
if !self.signature.is_empty() {
return Err(Box::new(warp::error::Error::InvalidSignature));
}

let bytes = serde_json::to_vec(&self)?;
let signature = keypair.sign(&bytes)?;
self.signature = signature;
Ok(self)
}

pub fn verify(&self) -> Result<(), Box<dyn std::error::Error>> {
let pk = self.sender.to_public_key()?;
let mut doc = self.clone();
let signature = std::mem::take(&mut doc.signature);
let bytes = serde_json::to_vec(&doc)?;
if !pk.verify(&bytes, &signature) {
return Err(Box::new(warp::error::Error::InvalidSignature));
}
Ok(())
}
}
Loading

0 comments on commit 00d4b78

Please sign in to comment.