Skip to content

Commit

Permalink
Merge branch 'main' into hotshot/0.5.82
Browse files Browse the repository at this point in the history
  • Loading branch information
QuentinI authored Dec 4, 2024
2 parents cc39a1f + abf9a9b commit b7e7e65
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 18 deletions.
6 changes: 5 additions & 1 deletion sequencer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,11 @@ pub async fn init_node<P: PersistenceOptions, V: Versions>(
genesis_state.prefund_account(address, amount);
}

let l1_client = l1_params.options.connect(l1_params.url).await?;
let l1_client = l1_params
.options
.with_metrics(metrics)
.connect(l1_params.url)
.await?;
l1_client.spawn_tasks().await;
let l1_genesis = match genesis.l1_finalized {
L1Finalized::Block(b) => b,
Expand Down
87 changes: 73 additions & 14 deletions types/src/v0/impls/l1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use futures::{
future::Future,
stream::{self, StreamExt},
};
use hotshot_types::traits::metrics::Metrics;
use lru::LruCache;
use serde::{de::DeserializeOwned, Serialize};
use tokio::{
Expand All @@ -29,7 +30,7 @@ use tokio::{
use tracing::Instrument;
use url::Url;

use super::{L1BlockInfo, L1State, L1UpdateTask, RpcClient};
use super::{L1BlockInfo, L1ClientMetrics, L1State, L1UpdateTask, RpcClient};
use crate::{FeeInfo, L1Client, L1ClientOptions, L1Event, L1ReconnectTask, L1Snapshot};

impl PartialOrd for L1BlockInfo {
Expand Down Expand Up @@ -79,16 +80,24 @@ impl L1BlockInfo {
}

impl RpcClient {
fn http(url: Url) -> Self {
Self::Http(Http::new(url))
fn http(url: Url, metrics: Arc<L1ClientMetrics>) -> Self {
Self::Http {
conn: Http::new(url),
metrics,
}
}

async fn ws(url: Url, retry_delay: Duration) -> anyhow::Result<Self> {
async fn ws(
url: Url,
metrics: Arc<L1ClientMetrics>,
retry_delay: Duration,
) -> anyhow::Result<Self> {
Ok(Self::Ws {
conn: Arc::new(RwLock::new(Ws::connect(url.clone()).await?)),
reconnect: Default::default(),
retry_delay,
url,
metrics,
})
}

Expand All @@ -97,6 +106,13 @@ impl RpcClient {
*reconnect.lock().await = L1ReconnectTask::Cancelled;
}
}

fn metrics(&self) -> &Arc<L1ClientMetrics> {
match self {
Self::Http { metrics, .. } => metrics,
Self::Ws { metrics, .. } => metrics,
}
}
}

#[async_trait]
Expand All @@ -109,12 +125,16 @@ impl JsonRpcClient for RpcClient {
R: DeserializeOwned + Send,
{
let res = match self {
Self::Http(client) => client.request(method, params).await?,
Self::Http { conn, .. } => conn
.request(method, params)
.await
.inspect_err(|err| tracing::warn!(method, "L1 RPC error: {err:#}"))?,
Self::Ws {
conn,
reconnect,
url,
retry_delay,
metrics,
} => {
let conn_guard = conn
.try_read()
Expand All @@ -131,6 +151,7 @@ impl JsonRpcClient for RpcClient {
if let Ok(mut reconnect_guard) = reconnect.try_lock() {
if matches!(*reconnect_guard, L1ReconnectTask::Idle) {
// No one is currently resetting this connection, so it's up to us.
metrics.ws_reconnects.add(1);
let conn = conn.clone();
let reconnect = reconnect.clone();
let url = url.clone();
Expand Down Expand Up @@ -174,7 +195,10 @@ impl JsonRpcClient for RpcClient {
}
Err(err)?
}
Err(err) => Err(err)?,
Err(err) => {
tracing::warn!(method, "L1 RPC error: {err:#}");
Err(err)?
}
}
}
};
Expand All @@ -190,7 +214,7 @@ impl PubsubClient for RpcClient {
T: Into<U256>,
{
match self {
Self::Http(_) => Err(ProviderError::CustomError(
Self::Http { .. } => Err(ProviderError::CustomError(
"subscriptions not supported with HTTP client".into(),
)),
Self::Ws { conn, .. } => Ok(conn
Expand All @@ -210,7 +234,7 @@ impl PubsubClient for RpcClient {
T: Into<U256>,
{
match self {
Self::Http(_) => Err(ProviderError::CustomError(
Self::Http { .. } => Err(ProviderError::CustomError(
"subscriptions not supported with HTTP client".into(),
)),
Self::Ws { conn, .. } => Ok(conn
Expand Down Expand Up @@ -250,6 +274,12 @@ impl Default for L1ClientOptions {
}

impl L1ClientOptions {
/// Use the given metrics collector to publish metrics related to the L1 client.
pub fn with_metrics(mut self, metrics: &(impl Metrics + ?Sized)) -> Self {
self.metrics = Arc::new(metrics.subgroup("l1".into()));
self
}

/// Instantiate an `L1Client` for a given `Url`.
///
/// The type of the JSON-RPC client is inferred from the scheme of the URL. Supported schemes
Expand All @@ -266,19 +296,36 @@ impl L1ClientOptions {
///
/// `url` must have a scheme `http` or `https`.
pub fn http(self, url: Url) -> L1Client {
L1Client::with_provider(self, Provider::new(RpcClient::http(url)))
let metrics = self.create_metrics();
L1Client::with_provider(self, Provider::new(RpcClient::http(url, metrics)))
}

/// Construct a new WebSockets client.
///
/// `url` must have a scheme `ws` or `wss`.
pub async fn ws(self, url: Url) -> anyhow::Result<L1Client> {
let metrics = self.create_metrics();
let retry_delay = self.l1_retry_delay;
Ok(L1Client::with_provider(
self,
Provider::new(RpcClient::ws(url, retry_delay).await?),
Provider::new(RpcClient::ws(url, metrics, retry_delay).await?),
))
}

fn create_metrics(&self) -> Arc<L1ClientMetrics> {
Arc::new(L1ClientMetrics::new(&**self.metrics))
}
}

impl L1ClientMetrics {
fn new(metrics: &(impl Metrics + ?Sized)) -> Self {
Self {
head: metrics.create_gauge("head".into(), None),
finalized: metrics.create_gauge("finalized".into(), None),
ws_reconnects: metrics.create_counter("ws_reconnects".into(), None),
stream_reconnects: metrics.create_counter("stream_reconnects".into(), None),
}
}
}

impl L1Client {
Expand Down Expand Up @@ -346,6 +393,7 @@ impl L1Client {
let retry_delay = self.retry_delay;
let state = self.state.clone();
let sender = self.sender.clone();
let metrics = (*rpc).as_ref().metrics().clone();

let span = tracing::warn_span!("L1 client update");
async move {
Expand All @@ -354,7 +402,7 @@ impl L1Client {
let mut block_stream = loop {
let res = match (*rpc).as_ref() {
RpcClient::Ws { .. } => rpc.subscribe_blocks().await.map(StreamExt::boxed),
RpcClient::Http(_) => rpc
RpcClient::Http { .. } => rpc
.watch_blocks()
.await
.map(|stream| {
Expand Down Expand Up @@ -427,6 +475,7 @@ impl L1Client {
let mut state = state.lock().await;
if head > state.snapshot.head {
tracing::debug!(head, old_head = state.snapshot.head, "L1 head updated");
metrics.head.set(head as usize);
state.snapshot.head = head;
// Emit an event about the new L1 head. Ignore send errors; it just means no
// one is listening to events right now.
Expand All @@ -441,6 +490,9 @@ impl L1Client {
old_finalized = ?state.snapshot.finalized,
"L1 finalized updated",
);
if let Some(finalized) = finalized {
metrics.finalized.set(finalized.number as usize);
}
state.snapshot.finalized = finalized;
if let Some(finalized) = finalized {
sender
Expand All @@ -463,6 +515,8 @@ impl L1Client {
}
}
}

metrics.stream_reconnects.add(1);
}
}.instrument(span)
}
Expand Down Expand Up @@ -787,6 +841,7 @@ mod test {
prelude::{LocalWallet, Signer, SignerMiddleware, H160, U64},
utils::{hex, parse_ether, Anvil, AnvilInstance},
};
use hotshot_types::traits::metrics::NoMetrics;
use portpicker::pick_unused_port;
use sequencer_utils::test_utils::setup_test;
use std::time::Duration;
Expand Down Expand Up @@ -1088,9 +1143,13 @@ mod test {
let port = pick_unused_port().unwrap();
let mut anvil = Anvil::new().block_time(1u32).port(port).spawn();
let provider = Provider::new(
RpcClient::ws(anvil.ws_endpoint().parse().unwrap(), Duration::from_secs(1))
.await
.unwrap(),
RpcClient::ws(
anvil.ws_endpoint().parse().unwrap(),
Arc::new(L1ClientMetrics::new(&NoMetrics)),
Duration::from_secs(1),
)
.await
.unwrap(),
);

// Check the provider is working.
Expand Down
4 changes: 3 additions & 1 deletion types/src/v0/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ reexport_unchanged_types!(
ViewBasedUpgrade,
BlockSize,
);
pub(crate) use v0_3::{L1Event, L1ReconnectTask, L1State, L1UpdateTask, RpcClient};
pub(crate) use v0_3::{
L1ClientMetrics, L1Event, L1ReconnectTask, L1State, L1UpdateTask, RpcClient,
};

#[derive(
Clone, Copy, Debug, Default, Hash, Eq, PartialEq, PartialOrd, Ord, Deserialize, Serialize,
Expand Down
18 changes: 17 additions & 1 deletion types/src/v0/v0_1/l1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use ethers::{
prelude::{H256, U256},
providers::{Http, Provider, Ws},
};
use hotshot_types::traits::metrics::{Counter, Gauge, Metrics, NoMetrics};
use lru::LruCache;
use serde::{Deserialize, Serialize};
use std::{num::NonZeroUsize, sync::Arc, time::Duration};
Expand Down Expand Up @@ -86,6 +87,9 @@ pub struct L1ClientOptions {
default_value = "10000"
)]
pub l1_events_max_block_range: u64,

#[clap(skip = Arc::<Box<dyn Metrics>>::new(Box::new(NoMetrics)))]
pub metrics: Arc<Box<dyn Metrics>>,
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -115,12 +119,16 @@ pub struct L1Client {
/// An Ethereum RPC client over HTTP or WebSockets.
#[derive(Clone, Debug)]
pub(crate) enum RpcClient {
Http(Http),
Http {
conn: Http,
metrics: Arc<L1ClientMetrics>,
},
Ws {
conn: Arc<RwLock<Ws>>,
reconnect: Arc<Mutex<L1ReconnectTask>>,
url: Url,
retry_delay: Duration,
metrics: Arc<L1ClientMetrics>,
},
}

Expand All @@ -147,3 +155,11 @@ pub(crate) enum L1ReconnectTask {
Idle,
Cancelled,
}

#[derive(Debug)]
pub(crate) struct L1ClientMetrics {
pub(crate) head: Box<dyn Gauge>,
pub(crate) finalized: Box<dyn Gauge>,
pub(crate) ws_reconnects: Box<dyn Counter>,
pub(crate) stream_reconnects: Box<dyn Counter>,
}
4 changes: 3 additions & 1 deletion types/src/v0/v0_3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ pub use super::v0_1::{
UpgradeType, ViewBasedUpgrade, BLOCK_MERKLE_TREE_HEIGHT, FEE_MERKLE_TREE_HEIGHT,
NS_ID_BYTE_LEN, NS_OFFSET_BYTE_LEN, NUM_NSS_BYTE_LEN, NUM_TXS_BYTE_LEN, TX_OFFSET_BYTE_LEN,
};
pub(crate) use super::v0_1::{L1Event, L1ReconnectTask, L1State, L1UpdateTask, RpcClient};
pub(crate) use super::v0_1::{
L1ClientMetrics, L1Event, L1ReconnectTask, L1State, L1UpdateTask, RpcClient,
};

pub const VERSION: Version = Version { major: 0, minor: 3 };

Expand Down

0 comments on commit b7e7e65

Please sign in to comment.