Skip to content

Commit

Permalink
Implement a simple DHT fallback on HTTPS
Browse files Browse the repository at this point in the history
Configured in the config file.
  • Loading branch information
hrxi committed Nov 25, 2024
1 parent c23e917 commit 22f01a7
Show file tree
Hide file tree
Showing 9 changed files with 223 additions and 5 deletions.
9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,20 @@ console-subscriber = { version = "0.4", features = [
], optional = true }
derive_builder = "0.20"
directories = "5.0"
futures = { workspace = true }
hex = "0.4"
http = { version = "1", optional = true }
http-body-util = { version = "0.1", optional = true }
hyper = { version = "1", default-features = false, optional = true }
hyper-rustls = { version = "0.27", features = ["native-tokio", "http1", "tls12", "logging", "ring"], default-features = false, optional = true }
hyper-util = { version = "0.1", default-features = false, features = ["client-legacy", "tokio"], optional = true }
instant = { version = "0.1", features = ["wasm-bindgen"] }
log = { workspace = true }
log-panics = { version = "2.1", features = ["with-backtrace"], optional = true }
parking_lot = "0.12"
rand = "0.8"
rand_chacha = "0.3.1"
rustls = { version = "0.23", default-features = false, optional = true }
rustls-pemfile = "2.2"
serde = "1.0"
serde_json = { version = "1.0", features = ["preserve_order"], optional = true }
Expand Down Expand Up @@ -85,15 +92,26 @@ nimiq-zkp-component = { workspace = true }
nimiq-zkp-primitives = { workspace = true }

[dev-dependencies]
tokio = { version = "1.41", features = ["macros"] }
nimiq-test-log = { workspace = true }

[features]
database-storage = ["nimiq-database", "nimiq-zkp-component/database-storage"]
deadlock = ["parking_lot/deadlock_detection"]
default = ["full-consensus"]
dht-fallback = [
"http",
"http-body-util",
"hyper",
"hyper-rustls",
"hyper-util",
"rustls",
"serde_json",
]
extended-metrics = ["nimiq-metrics-server/extended-staking"]
full-consensus = [
"database-storage",
"dht-fallback",
"nimiq-blockchain",
"nimiq-consensus/full",
"nimiq-dht",
Expand Down
37 changes: 35 additions & 2 deletions lib/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,37 @@ impl ClientInner {
// Load validator address
let automatic_reactivate = validator_config.automatic_reactivate;

let dht_fallback_url = validator_config.dht_fallback_url;

let dht_fallback = {
#[cfg(feature = "dht-fallback")]
{
use futures::future::FutureExt as _;

use crate::extras::dht_fallback::DhtFallback;
let fallback = Arc::new(dht_fallback_url.and_then(DhtFallback::new));
move |address| {
let fallback = fallback.clone();
async move {
if let Some(fallback) = &*fallback {
fallback.resolve(address).await
} else {
None
}
}
.boxed()
}
}
#[cfg(not(feature = "dht-fallback"))]
{
assert!(
dht_fallback_url.is_none(),
"DHT fallback support not compiled in"
);
|_| future::ready(None).boxed()
}
};

// Load signing key (before we give away ownership of the storage config)
let signing_key = config.storage.signing_keypair()?;

Expand All @@ -565,8 +596,10 @@ impl ClientInner {
// Load fee key (before we give away ownership of the storage config)
let fee_key = config.storage.fee_keypair()?;

let validator_network =
Arc::new(ValidatorNetworkImpl::new(Arc::clone(&network)));
let validator_network = Arc::new(ValidatorNetworkImpl::new_with_fallback(
Arc::clone(&network),
Arc::new(dht_fallback),
));

let validator = Validator::new(
environment.clone(),
Expand Down
3 changes: 3 additions & 0 deletions lib/src/config/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,8 @@ pub struct ValidatorConfig {
/// The validator address.
pub validator_address: Address,

pub dht_fallback_url: Option<url::Url>,

/// Config if the validator automatically reactivates itself.
pub automatic_reactivate: bool,
}
Expand Down Expand Up @@ -900,6 +902,7 @@ impl ClientConfigBuilder {
if let Some(validator_config) = config_file.validator.as_ref() {
self.validator(ValidatorConfig {
validator_address: Address::from_any_str(&validator_config.validator_address)?,
dht_fallback_url: validator_config.dht_fallback_url.clone(),
automatic_reactivate: validator_config.automatic_reactivate,
});

Expand Down
1 change: 1 addition & 0 deletions lib/src/config/config_file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,7 @@ pub struct ValidatorSettings {
pub voting_key: Option<Sensitive<String>>,
pub fee_key_file: Option<String>,
pub fee_key: Option<Sensitive<String>>,
pub dht_fallback_url: Option<Url>,
#[serde(default)]
pub automatic_reactivate: bool,
}
Expand Down
118 changes: 118 additions & 0 deletions lib/src/extras/dht_fallback.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
use std::io;

use http::Uri;
use http_body_util::{BodyExt, Empty};
use hyper::body::Bytes;
use hyper_rustls::{ConfigBuilderExt, HttpsConnector};
use hyper_util::{
client::legacy::{connect::HttpConnector, Client},
rt::TokioExecutor,
};
use log::error;
use nimiq_keys::Address;
use nimiq_network_libp2p::PeerId;
use serde::Deserialize;
use url::Url;

#[derive(Deserialize)]
struct Fallback {
validators: Vec<FallbackValidator>,
}

#[derive(Deserialize)]
struct FallbackValidator {
address: Address,
peer_id: String,
}

pub struct DhtFallback {
client: Client<HttpsConnector<HttpConnector>, Empty<Bytes>>,
uri: Uri,
}

impl DhtFallback {
fn new_inner(url: Url) -> io::Result<DhtFallback> {
let tls = rustls::ClientConfig::builder()
.with_native_roots()?
.with_no_client_auth();

let https = hyper_rustls::HttpsConnectorBuilder::new()
.with_tls_config(tls)
.https_or_http()
.enable_http1()
.build();

let client = Client::builder(TokioExecutor::new()).build(https);
let uri = url.as_str().parse().map_err(|_| {
io::Error::new(io::ErrorKind::InvalidInput, format!("invalid URI: {}", url))
})?;
Ok(DhtFallback { client, uri })
}
pub fn new(url: Url) -> Option<DhtFallback> {
DhtFallback::new_inner(url)
.inspect_err(|error| error!(%error, "couldn't create http client"))
.ok()
}

async fn resolve_inner(&self, validator_address: Address) -> Result<Option<PeerId>, String> {
let response = self
.client
.get(self.uri.clone())
.await
.map_err(|error| error.to_string())?;

if !response.status().is_success() {
return Err(format!("bad http response: {}", response.status()));
}

let response = response
.into_body()
.collect()
.await
.map_err(|error| error.to_string())?
.to_bytes();

let fallback: Fallback =
serde_json::from_slice(&response).map_err(|error| format!("invalid JSON: {error}"))?;

for validator in fallback.validators {
if validator.address == validator_address {
return Ok(Some(validator.peer_id.parse().map_err(|_| {
format!("invalid peer ID: {:?}", validator.peer_id)
})?));
}
}

Ok(None)
}
pub async fn resolve(&self, validator_address: Address) -> Option<PeerId> {
self.resolve_inner(validator_address.clone())
.await
.inspect_err(|error| error!(%error, %validator_address, "couldn't resolve"))
.ok()
.flatten()
}
}

#[cfg(test)]
mod test {
use url::Url;

use super::DhtFallback;

#[tokio::test]
async fn resolve() {
assert_eq!(
DhtFallback::new(Url::parse("https://gist.githubusercontent.com/hrxi/50dc18caa17826e72cc05542cfe8946f/raw/dht.json").unwrap())
.unwrap()
.resolve(
"NQ36 U0BH 0BHM J0EH UAE5 FMV6 D2EY 8TBP 50M3"
.parse()
.unwrap()
)
.await,
Some(
"12D3KooW9tKu6QesTCCqADjhVSf4hvWinzjuLpxv4mefBzKLkiae".parse().unwrap())
);
}
}
2 changes: 2 additions & 0 deletions lib/src/extras/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#[cfg(feature = "deadlock")]
pub mod deadlock;
#[cfg(feature = "dht-fallback")]
pub mod dht_fallback;
#[cfg(feature = "launcher")]
pub mod launcher;
#[cfg(feature = "logging")]
Expand Down
4 changes: 4 additions & 0 deletions validator-network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,7 @@ nimiq-network-interface = { workspace = true }
nimiq-primitives = { workspace = true, features = ["slots"] }
nimiq-serde = { workspace = true }
nimiq-utils = { workspace = true, features = ["futures", "spawn", "tagged-signing"] }
url = "2.5.3"

[dev-dependencies]
tokio = { version = "1.41", features = ["macros"] }
36 changes: 33 additions & 3 deletions validator-network/src/network_impl.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{collections::BTreeMap, error::Error, fmt::Debug, sync::Arc};
use std::{collections::BTreeMap, error::Error, fmt::Debug, future, sync::Arc};

use async_trait::async_trait;
use futures::{stream::BoxStream, StreamExt, TryFutureExt};
use futures::{future::BoxFuture, stream::BoxStream, FutureExt, StreamExt, TryFutureExt};
use log::warn;
use nimiq_keys::{Address, KeyPair};
use nimiq_network_interface::{
Expand Down Expand Up @@ -51,6 +51,9 @@ impl<TPeerId: Clone> CacheState<TPeerId> {
}
}

pub type DhtFallback<N> =
dyn Fn(Address) -> BoxFuture<'static, Option<<N as Network>::PeerId>> + Send + Sync;

/// Validator Network implementation
pub struct ValidatorNetworkImpl<N>
where
Expand All @@ -65,6 +68,7 @@ where
validators: Arc<RwLock<Option<Validators>>>,
/// Cache for mapping validator public keys to peer IDs
validator_peer_id_cache: Arc<RwLock<BTreeMap<Address, CacheState<N::PeerId>>>>,
dht_fallback: Arc<DhtFallback<N>>,
}

impl<N> ValidatorNetworkImpl<N>
Expand All @@ -74,11 +78,16 @@ where
N::Error: Sync + Send,
{
pub fn new(network: Arc<N>) -> Self {
Self::new_with_fallback(network, Arc::new(|_| future::ready(None).boxed()))
}

pub fn new_with_fallback(network: Arc<N>, dht_fallback: Arc<DhtFallback<N>>) -> Self {
Self {
network,
own_validator_id: Arc::new(RwLock::new(None)),
validators: Arc::new(RwLock::new(None)),
validator_peer_id_cache: Arc::new(RwLock::new(BTreeMap::new())),
dht_fallback,
}
}

Expand All @@ -89,6 +98,7 @@ where
own_validator_id: Arc::clone(&self.own_validator_id),
validators: Arc::clone(&self.validators),
validator_peer_id_cache: Arc::clone(&self.validator_peer_id_cache),
dht_fallback: Arc::clone(&self.dht_fallback),
}
}

Expand All @@ -111,6 +121,20 @@ where
async fn resolve_peer_id(
network: &N,
validator_address: &Address,
fallback: Arc<DhtFallback<N>>,
) -> Result<Option<N::PeerId>, NetworkError<N::Error>> {
let result = Self::resolve_peer_id_dht(network, validator_address).await;
if !matches!(result, Ok(Some(_))) {
if let Some(peer_id) = fallback(validator_address.clone()).await {
return Ok(Some(peer_id));
}
}
result
}

async fn resolve_peer_id_dht(
network: &N,
validator_address: &Address,
) -> Result<Option<N::PeerId>, NetworkError<N::Error>> {
if let Some(record) = network
.dht_get::<_, ValidatorRecord<N::PeerId>, KeyPair>(validator_address)
Expand All @@ -130,7 +154,13 @@ where
///
/// The given `validator_id` is used for logging purposes only.
async fn update_peer_id_cache(&self, validator_id: u16, validator_address: &Address) {
let cache_value = match Self::resolve_peer_id(&self.network, validator_address).await {
let cache_value = match Self::resolve_peer_id(
&self.network,
validator_address,
Arc::clone(&self.dht_fallback),
)
.await
{
Ok(Some(peer_id)) => {
log::trace!(
%peer_id,
Expand Down

0 comments on commit 22f01a7

Please sign in to comment.