Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timeout L1 provider #2340

Merged
merged 6 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions sequencer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,9 @@ pub async fn init_node<P: PersistenceOptions, V: Versions>(
)
.with_context(|| "Failed to derive Libp2p peer ID")?;

// Print the libp2p public key
info!("Starting Libp2p with PeerID: {}", libp2p_public_key);

let persistence = persistence_opt.clone().create().await?;
let (mut network_config, wait_for_orchestrator) = match (
persistence.load_config().await?,
Expand Down
116 changes: 66 additions & 50 deletions types/src/v0/impls/l1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,59 +394,75 @@ impl L1Client {
};

tracing::info!("established L1 block stream");
while let Some(head) = block_stream.next().await {
let Some(head) = head.number else {
// This shouldn't happen, but if it does, it means the block stream has
// erroneously given us a pending block. We are only interested in committed
// blocks, so just skip this one.
tracing::warn!("got block from L1 block stream with no number");
continue;
};
let head = head.as_u64();
tracing::debug!(head, "received L1 block");

// A new block has been produced. This happens fairly rarely, so it is now ok to
// poll to see if a new block has been finalized.
let finalized = loop {
match get_finalized_block(&rpc).await {
Ok(finalized) => break finalized,
Err(err) => {
tracing::warn!("error getting finalized block: {err:#}");
sleep(retry_delay).await;
loop {
// Wait for a block, timing out if we don't get one within 60 seconds
let block_timeout = tokio::time::timeout(Duration::from_secs(60), block_stream.next()).await;

match block_timeout {
// We got a block
Ok(Some(head)) => {
let Some(head_number) = head.number else {
// This shouldn't happen, but if it does, it means the block stream has
// erroneously given us a pending block. We are only interested in committed
// blocks, so just skip this one.
tracing::warn!("got block from L1 block stream with no number");
continue;
};
let head = head_number.as_u64();
tracing::debug!(head, "received L1 block");

// A new block has been produced. This happens fairly rarely, so it is now ok to
// poll to see if a new block has been finalized.
let finalized = loop {
match get_finalized_block(&rpc).await {
Ok(finalized) => break finalized,
Err(err) => {
tracing::warn!("error getting finalized block: {err:#}");
sleep(retry_delay).await;
}
}
};

// Update the state snapshot;
let mut state = state.lock().await;
if head > state.snapshot.head {
tracing::debug!(head, old_head = state.snapshot.head, "L1 head updated");
state.snapshot.head = head;
// Emit an event about the new L1 head. Ignore send errors; it just means no
// one is listening to events right now.
sender
.broadcast_direct(L1Event::NewHead { head })
.await
.ok();
}
if finalized > state.snapshot.finalized {
tracing::info!(
?finalized,
old_finalized = ?state.snapshot.finalized,
"L1 finalized updated",
);
state.snapshot.finalized = finalized;
if let Some(finalized) = finalized {
sender
.broadcast_direct(L1Event::NewFinalized { finalized })
.await
.ok();
}
}
tracing::debug!("updated L1 snapshot to {:?}", state.snapshot);
}
};

// Update the state snapshot;
let mut state = state.lock().await;
if head > state.snapshot.head {
tracing::debug!(head, old_head = state.snapshot.head, "L1 head updated");
state.snapshot.head = head;
// Emit an event about the new L1 head. Ignore send errors; it just means no
// one is listening to events right now.
sender
.broadcast_direct(L1Event::NewHead { head })
.await
.ok();
}
if finalized > state.snapshot.finalized {
tracing::info!(
?finalized,
old_finalized = ?state.snapshot.finalized,
"L1 finalized updated",
);
state.snapshot.finalized = finalized;
if let Some(finalized) = finalized {
sender
.broadcast_direct(L1Event::NewFinalized { finalized })
.await
.ok();
// The stream ended
Ok(None) => {
tracing::error!("L1 block stream ended unexpectedly, trying to re-establish block stream");
break;
}
// We timed out waiting for a block
Err(_) => {
tracing::error!("No block received for 60 seconds, trying to re-establish block stream");
break;
}
}
tracing::debug!("updated L1 snapshot to {:?}", state.snapshot);
}

tracing::error!("L1 block stream ended unexpectedly, trying to re-establish");
}
}.instrument(span)
}
Expand Down Expand Up @@ -525,13 +541,13 @@ impl L1Client {
continue;
};
if finalized.number >= number {
tracing::info!(number, ?finalized, "got finalized block");
tracing::info!(number, ?finalized, "got finalized L1 block");
return self
.get_finalized_block(self.state.lock().await, number)
.await
.1;
}
tracing::debug!(number, ?finalized, "waiting for L1 finalized block");
tracing::debug!(number, ?finalized, "waiting for finalized L1 block");
}

// This should not happen: the event stream ended. All we can do is try again.
Expand Down