diff --git a/extensions/warp-ipfs/src/store/discovery.rs b/extensions/warp-ipfs/src/store/discovery.rs index 571545fd7..f5e194a06 100644 --- a/extensions/warp-ipfs/src/store/discovery.rs +++ b/extensions/warp-ipfs/src/store/discovery.rs @@ -75,177 +75,6 @@ impl Discovery { } } - // Start discovery task - // Note: This starting will only work across a provided namespace - // pub async fn start(&self) -> Result<(), Error> { - // match &self.config { - // DiscoveryConfig::Namespace { - // discovery_type: DiscoveryType::DHT, - // namespace, - // } => { - // let namespace = namespace.clone().unwrap_or_else(|| "warp-mp-ipfs".into()); - // let cid = self.ipfs.put_dag(format!("discovery:{namespace}")).await?; - // - // let task = self.executor.spawn_abortable({ - // let discovery = self.clone(); - // async move { - // let mut cached = HashSet::new(); - // - // if let Err(e) = discovery.ipfs.provide(cid).await { - // //Maybe panic? - // tracing::error!("Error providing key: {e}"); - // return; - // } - // - // loop { - // if let Ok(mut stream) = discovery.ipfs.get_providers(cid).await { - // while let Some(peer_id) = stream.next().await { - // if discovery - // .ipfs - // .is_connected(peer_id) - // .await - // .unwrap_or_default() - // && cached.insert(peer_id) - // && !discovery.contains(peer_id).await - // { - // let entry = DiscoveryEntry::new( - // &discovery.ipfs, - // peer_id, - // discovery.config.clone(), - // discovery.events.clone(), - // discovery.relays.clone(), - // ) - // .await; - // if discovery.entries.write().await.insert(entry.clone()) { - // entry.start().await; - // } - // } - // } - // } - // futures_timer::Delay::new(Duration::from_secs(1)).await; - // } - // } - // }); - // - // *self.task.write().await = Some(task); - // } - // DiscoveryConfig::Namespace { - // discovery_type: DiscoveryType::RzPoint { addresses }, - // namespace, - // } => { - // let mut peers = vec![]; - // for mut addr in addresses.iter().cloned() { - // let Some(peer_id) = addr.extract_peer_id() else { - // continue; - // }; - // - // if let Err(e) = self.ipfs.add_peer((peer_id, addr)).await { - // tracing::error!("Error adding peer to address book {e}"); - // continue; - // } - // - // peers.push(peer_id); - // } - // - // let namespace = namespace.clone().unwrap_or_else(|| "warp-mp-ipfs".into()); - // let mut register_id = vec![]; - // - // for peer_id in &peers { - // if let Err(e) = self - // .ipfs - // .rendezvous_register_namespace(namespace.clone(), None, *peer_id) - // .await - // { - // tracing::error!("Error registering to namespace: {e}"); - // continue; - // } - // - // register_id.push(*peer_id); - // } - // - // if register_id.is_empty() { - // return Err(Error::OtherWithContext( - // "Unable to register to any external nodes".into(), - // )); - // } - // - // let task = self.executor.spawn_abortable({ - // let discovery = self.clone(); - // let register_id = register_id; - // async move { - // let mut meshed_map: HashMap> = - // HashMap::new(); - // - // loop { - // for peer_id in ®ister_id { - // let map = match discovery - // .ipfs - // .rendezvous_namespace_discovery( - // namespace.clone(), - // None, - // *peer_id, - // ) - // .await - // { - // Ok(map) => map, - // Err(e) => { - // tracing::error!(namespace = %namespace, error = %e, "failed to perform discovery over given namespace"); - // continue; - // } - // }; - // - // for (peer_id, addrs) in map { - // match meshed_map.entry(peer_id) { - // Entry::Occupied(mut entry) => { - // entry.get_mut().extend(addrs); - // } - // Entry::Vacant(entry) => { - // entry.insert(HashSet::from_iter( - // addrs.iter().cloned(), - // )); - // if !discovery - // .ipfs - // .is_connected(peer_id) - // .await - // .unwrap_or_default() - // && discovery.ipfs.connect(peer_id).await.is_ok() - // && !discovery.contains(peer_id).await - // { - // let entry = DiscoveryEntry::new( - // &discovery.ipfs, - // peer_id, - // discovery.config.clone(), - // discovery.events.clone(), - // discovery.relays.clone(), - // ) - // .await; - // - // if discovery - // .entries - // .write() - // .await - // .insert(entry.clone()) - // { - // entry.start().await; - // } - // } - // } - // } - // } - // } - // futures_timer::Delay::new(Duration::from_secs(5)).await; - // } - // } - // }); - // - // *self.task.write().await = Some(task); - // } - // DiscoveryConfig::Shuttle { addresses: _ } => {} - // _ => {} - // } - // Ok(()) - // } - pub fn events(&self) -> broadcast::Receiver { self.broadcast_tx.subscribe() }