Skip to content

Commit

Permalink
Implement a simple DHT fallback on HTTPS
Browse files Browse the repository at this point in the history
Configured in the config file.
  • Loading branch information
hrxi committed Nov 25, 2024
1 parent 8336a5b commit ffae71c
Show file tree
Hide file tree
Showing 15 changed files with 199 additions and 14 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions lib/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,8 @@ impl ClientInner {
// Load validator address
let automatic_reactivate = validator_config.automatic_reactivate;

let dht_fallback_url = validator_config.dht_fallback_url;

// Load signing key (before we give away ownership of the storage config)
let signing_key = config.storage.signing_keypair()?;

Expand All @@ -565,8 +567,10 @@ impl ClientInner {
// Load fee key (before we give away ownership of the storage config)
let fee_key = config.storage.fee_keypair()?;

let validator_network =
Arc::new(ValidatorNetworkImpl::new(Arc::clone(&network)));
let validator_network = Arc::new(ValidatorNetworkImpl::new(
Arc::clone(&network),
dht_fallback_url,
));

let validator = Validator::new(
environment.clone(),
Expand Down
4 changes: 4 additions & 0 deletions lib/src/config/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use nimiq_utils::key_rng::SecureGenerate;
use nimiq_utils::{file_store::FileStore, Sensitive};
use nimiq_zkp_circuits::DEFAULT_PROVER_KEYS_PATH;
use subtle::ConstantTimeEq;
use url::Url;

#[cfg(feature = "database-storage")]
use crate::config::config_file::DatabaseSettings;
Expand Down Expand Up @@ -603,6 +604,8 @@ pub struct ValidatorConfig {
/// The validator address.
pub validator_address: Address,

pub dht_fallback_url: Option<Url>,

/// Config if the validator automatically reactivates itself.
pub automatic_reactivate: bool,
}
Expand Down Expand Up @@ -900,6 +903,7 @@ impl ClientConfigBuilder {
if let Some(validator_config) = config_file.validator.as_ref() {
self.validator(ValidatorConfig {
validator_address: Address::from_any_str(&validator_config.validator_address)?,
dht_fallback_url: validator_config.dht_fallback_url.clone(),
automatic_reactivate: validator_config.automatic_reactivate,
});

Expand Down
1 change: 1 addition & 0 deletions lib/src/config/config_file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,7 @@ pub struct ValidatorSettings {
pub voting_key: Option<Sensitive<String>>,
pub fee_key_file: Option<String>,
pub fee_key: Option<Sensitive<String>>,
pub dht_fallback_url: Option<Url>,
#[serde(default)]
pub automatic_reactivate: bool,
}
Expand Down
3 changes: 2 additions & 1 deletion network-interface/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
fmt::{Debug, Display},
hash::Hash,
str::FromStr,
time::Duration,
};

Expand Down Expand Up @@ -92,7 +93,7 @@ pub trait RequestResponse {

#[async_trait]
pub trait Network: Send + Sync + Unpin + 'static {
type PeerId: Copy + Debug + Display + Ord + Hash + Send + Sync + Unpin + 'static;
type PeerId: Copy + Debug + Display + FromStr + Ord + Hash + Send + Sync + Unpin + 'static;
type AddressType: Debug + Display + 'static;
type Error: std::error::Error;
type PubsubId: PubsubId<Self::PeerId> + Send + Sync + Unpin;
Expand Down
2 changes: 1 addition & 1 deletion network-mock/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ workspace = true

[dependencies]
async-trait = "0.1"
derive_more = { version = "1.0", features = ["display", "from", "into"] }
derive_more = { version = "1.0", features = ["display", "from", "from_str", "into"] }
futures = { workspace = true }
log = { workspace = true }
parking_lot = "0.12"
Expand Down
3 changes: 2 additions & 1 deletion network-mock/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ mod hub;
mod network;
mod observable_hash_map;

use derive_more::{Display, From, Into};
use derive_more::{Display, From, FromStr, Into};
pub use hub::MockHub;
pub use network::{MockId, MockNetwork};
use nimiq_network_interface::{multiaddr, Multiaddr};
Expand All @@ -28,6 +28,7 @@ pub struct MockAddress(u64);
PartialOrd,
Ord,
Display,
FromStr,
From,
Into,
)]
Expand Down
5 changes: 4 additions & 1 deletion test-utils/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ where
let node =
Node::<N>::history_with_genesis_info(peer_id, genesis_info, hub, is_prover_active).await;
let consensus = node.consensus.expect("Could not create consensus");
let validator_network = Arc::new(ValidatorNetworkImpl::new(Arc::clone(&consensus.network)));
let validator_network = Arc::new(ValidatorNetworkImpl::new(
Arc::clone(&consensus.network),
None,
));
(
Validator::<ValidatorNetworkImpl<N>>::new(
node.environment,
Expand Down
12 changes: 12 additions & 0 deletions validator-network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,20 @@ thiserror = "2.0"
time = { version = "0.3" }
tokio = { version = "1.41", features = ["rt"] }

http = "1"
http-body-util = "0.1"
hyper = { version = "1", default-features = false }
hyper-rustls = { version = "0.27", features = ["native-tokio", "http1", "tls12", "logging", "ring"], default-features = false }
hyper-util = { version = "0.1", default-features = false, features = ["client-legacy", "tokio"] }
rustls = { version = "0.23", default-features = false }
serde_json = "1.0"

nimiq-keys = { workspace = true, features = ["serde-derive"] }
nimiq-network-interface = { workspace = true }
nimiq-primitives = { workspace = true, features = ["slots"] }
nimiq-serde = { workspace = true }
nimiq-utils = { workspace = true, features = ["futures", "spawn", "tagged-signing"] }
url = "2.5.3"

[dev-dependencies]
tokio = { version = "1.41", features = ["macros"] }
120 changes: 120 additions & 0 deletions validator-network/src/dht_fallback.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
use std::{io, str::FromStr};

use http::Uri;
use http_body_util::{BodyExt, Empty};
use hyper::body::Bytes;
use hyper_rustls::{ConfigBuilderExt, HttpsConnector};
use hyper_util::{
client::legacy::{connect::HttpConnector, Client},
rt::TokioExecutor,
};
use log::error;
use nimiq_keys::Address;
use serde::Deserialize;
use url::Url;

#[derive(Deserialize)]
struct Fallback {
validators: Vec<FallbackValidator>,
}

#[derive(Deserialize)]
struct FallbackValidator {
address: Address,
peer_id: String,
}

pub struct DhtFallback {
client: Client<HttpsConnector<HttpConnector>, Empty<Bytes>>,
uri: Uri,
}

impl DhtFallback {
fn new_inner(url: Url) -> io::Result<DhtFallback> {
let tls = rustls::ClientConfig::builder()
.with_native_roots()?
.with_no_client_auth();

let https = hyper_rustls::HttpsConnectorBuilder::new()
.with_tls_config(tls)
.https_or_http()
.enable_http1()
.build();

let client = Client::builder(TokioExecutor::new()).build(https);
let uri = url.as_str().parse().map_err(|_| {
io::Error::new(io::ErrorKind::InvalidInput, format!("invalid URI: {}", url))
})?;
Ok(DhtFallback { client, uri })
}
pub fn new(url: Url) -> Option<DhtFallback> {
DhtFallback::new_inner(url)
.inspect_err(|error| error!(%error, "couldn't create http client"))
.ok()
}

async fn resolve_inner<T: FromStr>(
&self,
validator_address: Address,
) -> Result<Option<T>, String> {
let response = self
.client
.get(self.uri.clone())
.await
.map_err(|error| error.to_string())?;

if !response.status().is_success() {
return Err(format!("bad http response: {}", response.status()));
}

let response = response
.into_body()
.collect()
.await
.map_err(|error| error.to_string())?
.to_bytes();

let fallback: Fallback =
serde_json::from_slice(&response).map_err(|error| format!("invalid JSON: {error}"))?;

for validator in fallback.validators {
if validator.address == validator_address {
return Ok(Some(validator.peer_id.parse().map_err(|_| {
format!("invalid peer ID: {:?}", validator.peer_id)
})?));
}
}

Ok(None)
}
pub async fn resolve<T: FromStr>(&self, validator_address: Address) -> Option<T> {
self.resolve_inner(validator_address.clone())
.await
.inspect_err(|error| error!(%error, %validator_address, "couldn't resolve"))
.ok()
.flatten()
}
}

#[cfg(test)]
mod test {
use url::Url;

use super::DhtFallback;

#[tokio::test]
async fn resolve() {
assert_eq!(
DhtFallback::new(Url::parse("https://gist.githubusercontent.com/hrxi/50dc18caa17826e72cc05542cfe8946f/raw/dht.json").unwrap())
.unwrap()
.resolve(
"NQ36 U0BH 0BHM J0EH UAE5 FMV6 D2EY 8TBP 50M3"
.parse()
.unwrap()
)
.await,
Some(String::from(
"12D3KooW9tKu6QesTCCqADjhVSf4hvWinzjuLpxv4mefBzKLkiae"))
);
}
}
2 changes: 2 additions & 0 deletions validator-network/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
mod dht_fallback;

pub mod error;
pub mod network_impl;
pub mod single_response_requester;
Expand Down
32 changes: 29 additions & 3 deletions validator-network/src/network_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ use nimiq_serde::{Deserialize, Serialize};
use nimiq_utils::spawn;
use parking_lot::RwLock;
use time::OffsetDateTime;
use url::Url;

use super::{MessageStream, NetworkError, PubsubId, ValidatorNetwork};
use crate::validator_record::ValidatorRecord;
use crate::{dht_fallback::DhtFallback, validator_record::ValidatorRecord};

/// Validator `PeerId` cache state
#[derive(Clone, Copy)]
Expand Down Expand Up @@ -65,6 +66,7 @@ where
validators: Arc<RwLock<Option<Validators>>>,
/// Cache for mapping validator public keys to peer IDs
validator_peer_id_cache: Arc<RwLock<BTreeMap<Address, CacheState<N::PeerId>>>>,
dht_fallback: Arc<Option<DhtFallback>>,
}

impl<N> ValidatorNetworkImpl<N>
Expand All @@ -73,12 +75,13 @@ where
N::PeerId: Serialize + Deserialize,
N::Error: Sync + Send,
{
pub fn new(network: Arc<N>) -> Self {
pub fn new(network: Arc<N>, dht_fallback_url: Option<Url>) -> Self {
Self {
network,
own_validator_id: Arc::new(RwLock::new(None)),
validators: Arc::new(RwLock::new(None)),
validator_peer_id_cache: Arc::new(RwLock::new(BTreeMap::new())),
dht_fallback: Arc::new(dht_fallback_url.and_then(|url| DhtFallback::new(url))),

Check warning on line 84 in validator-network/src/network_impl.rs

View workflow job for this annotation

GitHub Actions / Clippy Report

redundant closure

warning: redundant closure --> validator-network/src/network_impl.rs:84:62 | 84 | dht_fallback: Arc::new(dht_fallback_url.and_then(|url| DhtFallback::new(url))), | ^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: replace the closure with the function itself: `DhtFallback::new` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#redundant_closure = note: `#[warn(clippy::redundant_closure)]` on by default
}
}

Expand All @@ -89,6 +92,7 @@ where
own_validator_id: Arc::clone(&self.own_validator_id),
validators: Arc::clone(&self.validators),
validator_peer_id_cache: Arc::clone(&self.validator_peer_id_cache),
dht_fallback: Arc::clone(&self.dht_fallback),
}
}

Expand All @@ -111,6 +115,22 @@ where
async fn resolve_peer_id(
network: &N,
validator_address: &Address,
fallback: Arc<Option<DhtFallback>>,
) -> Result<Option<N::PeerId>, NetworkError<N::Error>> {
let result = Self::resolve_peer_id_dht(network, validator_address).await;
if !matches!(result, Ok(Some(_))) {
if let Some(fallback) = &*fallback {
if let Some(peer_id) = fallback.resolve(validator_address.clone()).await {
return Ok(Some(peer_id));
}
}
}
result
}

async fn resolve_peer_id_dht(
network: &N,
validator_address: &Address,
) -> Result<Option<N::PeerId>, NetworkError<N::Error>> {
if let Some(record) = network
.dht_get::<_, ValidatorRecord<N::PeerId>, KeyPair>(validator_address)
Expand All @@ -130,7 +150,13 @@ where
///
/// The given `validator_id` is used for logging purposes only.
async fn update_peer_id_cache(&self, validator_id: u16, validator_address: &Address) {
let cache_value = match Self::resolve_peer_id(&self.network, validator_address).await {
let cache_value = match Self::resolve_peer_id(
&self.network,
validator_address,
Arc::clone(&self.dht_fallback),
)
.await
{
Ok(Some(peer_id)) => {
log::trace!(
%peer_id,
Expand Down
1 change: 1 addition & 0 deletions validator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ nimiq-transaction-builder = { workspace = true }
nimiq-utils = { workspace = true, features = ["futures", "time"] }
nimiq-validator-network = { workspace = true }
nimiq-vrf = { workspace = true }
url = "2.5.3"

[dev-dependencies]
hex = "0.4"
Expand Down
Loading

0 comments on commit ffae71c

Please sign in to comment.