diff --git a/types/src/v0/impls/l1.rs b/types/src/v0/impls/l1.rs index 51137ae05..b5c5fe8b6 100644 --- a/types/src/v0/impls/l1.rs +++ b/types/src/v0/impls/l1.rs @@ -394,59 +394,75 @@ impl L1Client { }; tracing::info!("established L1 block stream"); - while let Some(head) = block_stream.next().await { - let Some(head) = head.number else { - // This shouldn't happen, but if it does, it means the block stream has - // erroneously given us a pending block. We are only interested in committed - // blocks, so just skip this one. - tracing::warn!("got block from L1 block stream with no number"); - continue; - }; - let head = head.as_u64(); - tracing::debug!(head, "received L1 block"); - - // A new block has been produced. This happens fairly rarely, so it is now ok to - // poll to see if a new block has been finalized. - let finalized = loop { - match get_finalized_block(&rpc).await { - Ok(finalized) => break finalized, - Err(err) => { - tracing::warn!("error getting finalized block: {err:#}"); - sleep(retry_delay).await; + loop { + // Wait for a block, timing out if we don't get one within 60 seconds + let block_timeout = tokio::time::timeout(Duration::from_secs(60), block_stream.next()).await; + + match block_timeout { + // We got a block + Ok(Some(head)) => { + let Some(head_number) = head.number else { + // This shouldn't happen, but if it does, it means the block stream has + // erroneously given us a pending block. We are only interested in committed + // blocks, so just skip this one. + tracing::warn!("got block from L1 block stream with no number"); + continue; + }; + let head = head_number.as_u64(); + tracing::debug!(head, "received L1 block"); + + // A new block has been produced. This happens fairly rarely, so it is now ok to + // poll to see if a new block has been finalized. + let finalized = loop { + match get_finalized_block(&rpc).await { + Ok(finalized) => break finalized, + Err(err) => { + tracing::warn!("error getting finalized block: {err:#}"); + sleep(retry_delay).await; + } + } + }; + + // Update the state snapshot; + let mut state = state.lock().await; + if head > state.snapshot.head { + tracing::debug!(head, old_head = state.snapshot.head, "L1 head updated"); + state.snapshot.head = head; + // Emit an event about the new L1 head. Ignore send errors; it just means no + // one is listening to events right now. + sender + .broadcast_direct(L1Event::NewHead { head }) + .await + .ok(); } + if finalized > state.snapshot.finalized { + tracing::info!( + ?finalized, + old_finalized = ?state.snapshot.finalized, + "L1 finalized updated", + ); + state.snapshot.finalized = finalized; + if let Some(finalized) = finalized { + sender + .broadcast_direct(L1Event::NewFinalized { finalized }) + .await + .ok(); + } + } + tracing::debug!("updated L1 snapshot to {:?}", state.snapshot); } - }; - - // Update the state snapshot; - let mut state = state.lock().await; - if head > state.snapshot.head { - tracing::debug!(head, old_head = state.snapshot.head, "L1 head updated"); - state.snapshot.head = head; - // Emit an event about the new L1 head. Ignore send errors; it just means no - // one is listening to events right now. - sender - .broadcast_direct(L1Event::NewHead { head }) - .await - .ok(); - } - if finalized > state.snapshot.finalized { - tracing::info!( - ?finalized, - old_finalized = ?state.snapshot.finalized, - "L1 finalized updated", - ); - state.snapshot.finalized = finalized; - if let Some(finalized) = finalized { - sender - .broadcast_direct(L1Event::NewFinalized { finalized }) - .await - .ok(); + // The stream ended + Ok(None) => { + tracing::error!("L1 block stream ended unexpectedly, trying to re-establish block stream"); + break; + } + // We timed out waiting for a block + Err(_) => { + tracing::error!("No block received for 60 seconds, trying to re-establish block stream"); + break; } } - tracing::debug!("updated L1 snapshot to {:?}", state.snapshot); } - - tracing::error!("L1 block stream ended unexpectedly, trying to re-establish"); } }.instrument(span) } @@ -525,13 +541,13 @@ impl L1Client { continue; }; if finalized.number >= number { - tracing::info!(number, ?finalized, "got finalized block"); + tracing::info!(number, ?finalized, "got finalized L! block"); return self .get_finalized_block(self.state.lock().await, number) .await .1; } - tracing::debug!(number, ?finalized, "waiting for L1 finalized block"); + tracing::debug!(number, ?finalized, "waiting for finalized L1 block"); } // This should not happen: the event stream ended. All we can do is try again.