Skip to content

Commit

Permalink
L1 timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron committed Nov 27, 2024
1 parent 51ea888 commit bbfbb8f
Showing 1 changed file with 66 additions and 50 deletions.
116 changes: 66 additions & 50 deletions types/src/v0/impls/l1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,59 +394,75 @@ impl L1Client {
};

tracing::info!("established L1 block stream");
while let Some(head) = block_stream.next().await {
let Some(head) = head.number else {
// This shouldn't happen, but if it does, it means the block stream has
// erroneously given us a pending block. We are only interested in committed
// blocks, so just skip this one.
tracing::warn!("got block from L1 block stream with no number");
continue;
};
let head = head.as_u64();
tracing::debug!(head, "received L1 block");

// A new block has been produced. This happens fairly rarely, so it is now ok to
// poll to see if a new block has been finalized.
let finalized = loop {
match get_finalized_block(&rpc).await {
Ok(finalized) => break finalized,
Err(err) => {
tracing::warn!("error getting finalized block: {err:#}");
sleep(retry_delay).await;
loop {
// Wait for a block, timing out if we don't get one within 60 seconds
let block_timeout = tokio::time::timeout(Duration::from_secs(60), block_stream.next()).await;

match block_timeout {
// We got a block
Ok(Some(head)) => {
let Some(head_number) = head.number else {
// This shouldn't happen, but if it does, it means the block stream has
// erroneously given us a pending block. We are only interested in committed
// blocks, so just skip this one.
tracing::warn!("got block from L1 block stream with no number");
continue;
};
let head = head_number.as_u64();
tracing::debug!(head, "received L1 block");

// A new block has been produced. This happens fairly rarely, so it is now ok to
// poll to see if a new block has been finalized.
let finalized = loop {
match get_finalized_block(&rpc).await {
Ok(finalized) => break finalized,
Err(err) => {
tracing::warn!("error getting finalized block: {err:#}");
sleep(retry_delay).await;
}
}
};

// Update the state snapshot;
let mut state = state.lock().await;
if head > state.snapshot.head {
tracing::debug!(head, old_head = state.snapshot.head, "L1 head updated");
state.snapshot.head = head;
// Emit an event about the new L1 head. Ignore send errors; it just means no
// one is listening to events right now.
sender
.broadcast_direct(L1Event::NewHead { head })
.await
.ok();
}
if finalized > state.snapshot.finalized {
tracing::info!(
?finalized,
old_finalized = ?state.snapshot.finalized,
"L1 finalized updated",
);
state.snapshot.finalized = finalized;
if let Some(finalized) = finalized {
sender
.broadcast_direct(L1Event::NewFinalized { finalized })
.await
.ok();
}
}
tracing::debug!("updated L1 snapshot to {:?}", state.snapshot);
}
};

// Update the state snapshot;
let mut state = state.lock().await;
if head > state.snapshot.head {
tracing::debug!(head, old_head = state.snapshot.head, "L1 head updated");
state.snapshot.head = head;
// Emit an event about the new L1 head. Ignore send errors; it just means no
// one is listening to events right now.
sender
.broadcast_direct(L1Event::NewHead { head })
.await
.ok();
}
if finalized > state.snapshot.finalized {
tracing::info!(
?finalized,
old_finalized = ?state.snapshot.finalized,
"L1 finalized updated",
);
state.snapshot.finalized = finalized;
if let Some(finalized) = finalized {
sender
.broadcast_direct(L1Event::NewFinalized { finalized })
.await
.ok();
// The stream ended
Ok(None) => {
tracing::error!("L1 block stream ended unexpectedly, trying to re-establish block stream");
break;
}
// We timed out waiting for a block
Err(_) => {
tracing::error!("No block received for 60 seconds, trying to re-establish block stream");
break;
}
}
tracing::debug!("updated L1 snapshot to {:?}", state.snapshot);
}

tracing::error!("L1 block stream ended unexpectedly, trying to re-establish");
}
}.instrument(span)
}
Expand Down Expand Up @@ -525,13 +541,13 @@ impl L1Client {
continue;
};
if finalized.number >= number {
tracing::info!(number, ?finalized, "got finalized block");
tracing::info!(number, ?finalized, "got finalized L! block");
return self
.get_finalized_block(self.state.lock().await, number)
.await
.1;
}
tracing::debug!(number, ?finalized, "waiting for L1 finalized block");
tracing::debug!(number, ?finalized, "waiting for finalized L1 block");
}

// This should not happen: the event stream ended. All we can do is try again.
Expand Down

0 comments on commit bbfbb8f

Please sign in to comment.