Improve substreams error handling (#5160)
Filipe Azevedo authored Feb 1, 2024
1 parent 6067090 commit 4641d04
Showing 12 changed files with 139 additions and 66 deletions.
2 changes: 1 addition & 1 deletion chain/near/src/chain.rs
@@ -95,7 +95,7 @@ impl BlockStreamBuilder<Chain> for NearStreamBuilder {
             deployment.hash,
             chain.chain_client(),
             subgraph_current_block,
-            block_cursor.as_ref().clone(),
+            block_cursor.clone(),
             mapper,
             package.modules.clone(),
             NEAR_FILTER_MODULE_NAME.to_string(),
4 changes: 2 additions & 2 deletions chain/substreams/examples/substreams.rs
@@ -1,5 +1,5 @@
 use anyhow::{format_err, Context, Error};
-use graph::blockchain::block_stream::BlockStreamEvent;
+use graph::blockchain::block_stream::{BlockStreamEvent, FirehoseCursor};
 use graph::blockchain::client::ChainClient;
 use graph::blockchain::substreams_block_stream::SubstreamsBlockStream;
 use graph::endpoint::EndpointMetrics;
@@ -67,7 +67,7 @@ async fn main() -> Result<(), Error> {
         DeploymentHash::new("substreams".to_string()).unwrap(),
         client,
         None,
-        None,
+        FirehoseCursor::None,
         Arc::new(Mapper {
             schema: None,
             skip_empty_blocks: false,
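
Note on the example change above: passing `FirehoseCursor::None` instead of a bare `None` makes the "no cursor yet" case explicit at the call site. Below is a minimal, self-contained sketch of that newtype-with-associated-constant pattern; this `FirehoseCursor` is a simplified stand-in, not the full type from `graph::blockchain::block_stream`.

```rust
// Simplified stand-in for the cursor newtype used above; the real
// graph::blockchain::block_stream::FirehoseCursor carries more behavior.
#[derive(Clone, Debug, Default)]
pub struct FirehoseCursor(Option<String>);

impl FirehoseCursor {
    // An associated constant lets callers write `FirehoseCursor::None`
    // instead of a bare `None`, keeping the intent visible at call sites.
    #[allow(non_upper_case_globals)]
    pub const None: FirehoseCursor = FirehoseCursor(None);

    pub fn is_none(&self) -> bool {
        self.0.is_none()
    }
}

impl From<String> for FirehoseCursor {
    fn from(cursor: String) -> Self {
        FirehoseCursor(Some(cursor))
    }
}

fn main() {
    let fresh = FirehoseCursor::None; // start streaming from the beginning
    let resumed = FirehoseCursor::from("cursor-123".to_string()); // resume from a saved cursor
    assert!(fresh.is_none());
    assert!(!resumed.is_none());
}
```
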
34 changes: 27 additions & 7 deletions chain/substreams/src/block_ingestor.rs
@@ -2,6 +2,7 @@ use std::{sync::Arc, time::Duration};
 
 use crate::mapper::Mapper;
 use anyhow::{Context, Error};
+use graph::blockchain::block_stream::{BlockStreamError, FirehoseCursor};
 use graph::blockchain::{
     client::ChainClient, substreams_block_stream::SubstreamsBlockStream, BlockIngestor,
 };
@@ -65,11 +66,12 @@ impl SubstreamsBlockIngestor {
     /// Consumes the incoming stream of blocks infinitely until it hits an error. In which case
     /// the error is logged right away and the latest available cursor is returned
     /// upstream for future consumption.
+    /// If an error is returned it indicates a fatal/deterministic error which should not be retried.
     async fn process_blocks(
         &self,
-        cursor: String,
+        cursor: FirehoseCursor,
         mut stream: SubstreamsBlockStream<super::Chain>,
-    ) -> String {
+    ) -> Result<FirehoseCursor, BlockStreamError> {
         let mut latest_cursor = cursor;
 
         while let Some(message) = stream.next().await {
@@ -90,6 +92,9 @@ impl SubstreamsBlockIngestor {
                     trace!(self.logger, "Received undo block to ingest, skipping");
                     continue;
                 }
+                Err(e) if e.is_deterministic() => {
+                    return Err(e);
+                }
                 Err(e) => {
                     info!(
                         self.logger,
@@ -105,14 +110,15 @@ impl SubstreamsBlockIngestor {
                 break;
             }
 
-            latest_cursor = cursor.to_string()
+            latest_cursor = cursor
         }
 
         error!(
             self.logger,
             "Stream blocks complete unexpectedly, expecting stream to always stream blocks"
         );
-        latest_cursor
+
+        Ok(latest_cursor)
     }
 
     async fn process_new_block(
@@ -139,7 +145,7 @@ impl BlockIngestor for SubstreamsBlockIngestor {
             schema: None,
             skip_empty_blocks: false,
         });
-        let mut latest_cursor = self.fetch_head_cursor().await;
+        let mut latest_cursor = FirehoseCursor::from(self.fetch_head_cursor().await);
         let mut backoff =
             ExponentialBackoff::new(Duration::from_millis(250), Duration::from_secs(30));
         let package = Package::decode(SUBSTREAMS_HEAD_TRACKER_BYTES.to_vec().as_ref()).unwrap();
@@ -149,7 +155,7 @@ impl BlockIngestor for SubstreamsBlockIngestor {
             DeploymentHash::default(),
             self.client.cheap_clone(),
             None,
-            Some(latest_cursor.clone()),
+            latest_cursor.clone(),
             mapper.cheap_clone(),
             package.modules.clone(),
             "map_blocks".to_string(),
@@ -160,7 +166,21 @@ impl BlockIngestor for SubstreamsBlockIngestor {
         );
 
         // Consume the stream of blocks until an error is hit
-        latest_cursor = self.process_blocks(latest_cursor, stream).await;
+        // If the error is retryable it will print the error and return the cursor
+        // therefore if we get an error here it has to be a fatal error.
+        // This is a bit brittle and should probably be improved at some point.
+        let res = self.process_blocks(latest_cursor, stream).await;
+        match res {
+            Ok(cursor) => latest_cursor = cursor,
+            Err(BlockStreamError::Fatal(e)) => {
+                error!(
+                    self.logger,
+                    "fatal error while ingesting substream blocks: {}", e
+                );
+                return;
+            }
+            _ => unreachable!("Nobody should ever see this error message, something is wrong"),
+        }
 
         // If we reach this point, we must wait a bit before retrying
         backoff.sleep_async().await;
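
The ingestor change above threads a `FirehoseCursor` through `process_blocks` and splits errors into two classes: deterministic errors abort ingestion, everything else is retried with backoff. The sketch below illustrates that retry loop in isolation; the types and the `process_blocks` body are simplified stand-ins, not the real `SubstreamsBlockIngestor`.

```rust
use std::time::Duration;

// Simplified stand-ins for the graph-node types used above.
#[derive(Debug)]
enum BlockStreamError {
    Unknown(String), // assumed retryable
    Fatal(String),   // deterministic, retrying cannot help
}

impl BlockStreamError {
    fn is_deterministic(&self) -> bool {
        matches!(self, Self::Fatal(_))
    }
}

#[derive(Clone, Debug)]
struct FirehoseCursor(Option<String>);

// Pretend block processor: fails deterministically on a "bad" cursor and
// transiently for the first couple of attempts otherwise.
fn process_blocks(cursor: FirehoseCursor, attempt: u32) -> Result<FirehoseCursor, BlockStreamError> {
    if cursor.0.as_deref() == Some("bad") {
        return Err(BlockStreamError::Fatal("invalid module output".to_string()));
    }
    if attempt < 2 {
        return Err(BlockStreamError::Unknown("transient disconnect".to_string()));
    }
    Ok(FirehoseCursor(Some(format!("cursor-{attempt}"))))
}

fn run(mut latest_cursor: FirehoseCursor) {
    let mut attempt = 0;
    loop {
        match process_blocks(latest_cursor.clone(), attempt) {
            // Remember the latest cursor so a later restart can resume from it.
            Ok(cursor) => {
                latest_cursor = cursor;
                println!("caught up, latest cursor: {latest_cursor:?}");
                break;
            }
            // Deterministic errors will fail the same way on every retry: stop.
            Err(e) if e.is_deterministic() => {
                eprintln!("fatal error while ingesting blocks: {e:?}");
                return;
            }
            // Anything else is treated as transient: back off and try again.
            Err(e) => {
                eprintln!("retryable error: {e:?}");
                attempt += 1;
                std::thread::sleep(Duration::from_millis(250));
            }
        }
    }
}

fn main() {
    run(FirehoseCursor(None));
}
```
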
4 changes: 2 additions & 2 deletions chain/substreams/src/block_stream.rs
@@ -53,7 +53,7 @@ impl BlockStreamBuilderTrait<Chain> for BlockStreamBuilder {
             deployment.hash,
             chain.chain_client(),
             subgraph_current_block,
-            block_cursor.as_ref().clone(),
+            block_cursor.clone(),
             Arc::new(WasmBlockMapper {
                 handler: handler.clone(),
             }),
@@ -69,7 +69,7 @@ impl BlockStreamBuilderTrait<Chain> for BlockStreamBuilder {
             deployment.hash,
             chain.chain_client(),
             subgraph_current_block,
-            block_cursor.as_ref().clone(),
+            block_cursor.clone(),
             Arc::new(Mapper {
                 schema: Some(schema),
                 skip_empty_blocks: true,
39 changes: 34 additions & 5 deletions core/src/subgraph/runner.rs
@@ -4,7 +4,9 @@ use crate::subgraph::inputs::IndexingInputs;
 use crate::subgraph::state::IndexingState;
 use crate::subgraph::stream::new_block_stream;
 use atomic_refcell::AtomicRefCell;
-use graph::blockchain::block_stream::{BlockStreamEvent, BlockWithTriggers, FirehoseCursor};
+use graph::blockchain::block_stream::{
+    BlockStreamError, BlockStreamEvent, BlockWithTriggers, FirehoseCursor,
+};
 use graph::blockchain::{Block, BlockTime, Blockchain, DataSource as _, TriggerFilter as _};
 use graph::components::store::{EmptyStore, GetScope, ReadStore, StoredDynamicDataSource};
 use graph::components::{
@@ -206,7 +208,7 @@ where
                 &self.metrics.subgraph,
             )
             .await?
-            .map_err(CancelableError::Error)
+            .map_err(CancelableError::from)
             .cancelable(&block_stream_canceler, || Err(CancelableError::Cancel));
 
         // Keep the stream's cancel guard around to be able to shut it down when the subgraph
@@ -910,7 +912,7 @@ where
 {
     async fn handle_stream_event(
         &mut self,
-        event: Option<Result<BlockStreamEvent<C>, CancelableError<Error>>>,
+        event: Option<Result<BlockStreamEvent<C>, CancelableError<BlockStreamError>>>,
         cancel_handle: &CancelHandle,
     ) -> Result<Action, Error> {
         let action = match event {
@@ -1087,7 +1089,7 @@ trait StreamEventHandler<C: Blockchain> {
     ) -> Result<Action, Error>;
     async fn handle_err(
         &mut self,
-        err: CancelableError<Error>,
+        err: CancelableError<BlockStreamError>,
         cancel_handle: &CancelHandle,
     ) -> Result<Action, Error>;
     fn needs_restart(&self, revert_to_ptr: BlockPtr, subgraph_ptr: BlockPtr) -> bool;
@@ -1399,14 +1401,41 @@ where
 
     async fn handle_err(
         &mut self,
-        err: CancelableError<Error>,
+        err: CancelableError<BlockStreamError>,
         cancel_handle: &CancelHandle,
     ) -> Result<Action, Error> {
         if cancel_handle.is_canceled() {
             debug!(&self.logger, "Subgraph block stream shut down cleanly");
             return Ok(Action::Stop);
         }
 
+        let err = match err {
+            CancelableError::Error(BlockStreamError::Fatal(msg)) => {
+                error!(
+                    &self.logger,
+                    "The block stream encountered a substreams fatal error and will not retry: {}",
+                    msg
+                );
+
+                // If substreams returns a deterministic error we may not necessarily have a specific block
+                // but we should not retry since it will keep failing.
+                self.inputs
+                    .store
+                    .fail_subgraph(SubgraphError {
+                        subgraph_id: self.inputs.deployment.hash.clone(),
+                        message: msg,
+                        block_ptr: None,
+                        handler: None,
+                        deterministic: true,
+                    })
+                    .await
+                    .context("Failed to set subgraph status to `failed`")?;
+
+                return Ok(Action::Stop);
+            }
+            e => e,
+        };
+
         debug!(
             &self.logger,
             "Block stream produced a non-fatal error";
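
The `handle_err` change above gives fatal substreams errors their own path: the subgraph is marked as failed deterministically (with no specific block pointer) and the runner stops instead of restarting the stream. Below is a condensed sketch of that branching; `Action`, `SubgraphError`, and `fail_subgraph` are simplified stand-ins for the graph-node equivalents, which are async and go through the store.

```rust
// Simplified stand-ins; the real runner works with CancelableError and a store handle.
#[derive(Debug)]
enum BlockStreamError {
    Unknown(String),
    Fatal(String),
}

#[derive(Debug, PartialEq)]
enum Action {
    Continue,
    Stop,
}

struct SubgraphError {
    message: String,
    block_ptr: Option<u64>, // a fatal substreams error may not name a block
    deterministic: bool,
}

fn fail_subgraph(err: SubgraphError) {
    println!(
        "marking subgraph as failed (deterministic: {}, block: {:?}): {}",
        err.deterministic, err.block_ptr, err.message
    );
}

fn handle_err(err: BlockStreamError) -> Action {
    match err {
        // Deterministic failure: record it and stop, retrying cannot help.
        BlockStreamError::Fatal(message) => {
            fail_subgraph(SubgraphError {
                message,
                block_ptr: None,
                deterministic: true,
            });
            Action::Stop
        }
        // Non-fatal: log it and let the caller restart the block stream.
        BlockStreamError::Unknown(message) => {
            eprintln!("block stream produced a non-fatal error: {message}");
            Action::Continue
        }
    }
}

fn main() {
    assert_eq!(handle_err(BlockStreamError::Unknown("timeout".into())), Action::Continue);
    assert_eq!(handle_err(BlockStreamError::Fatal("bad module output".into())), Action::Stop);
}
```
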
33 changes: 24 additions & 9 deletions graph/src/blockchain/block_stream.rs
@@ -26,21 +26,22 @@ pub const FIREHOSE_BUFFER_STREAM_SIZE: usize = 1;
 pub const SUBSTREAMS_BUFFER_STREAM_SIZE: usize = 100;
 
 pub struct BufferedBlockStream<C: Blockchain> {
-    inner: Pin<Box<dyn Stream<Item = Result<BlockStreamEvent<C>, Error>> + Send>>,
+    inner: Pin<Box<dyn Stream<Item = Result<BlockStreamEvent<C>, BlockStreamError>> + Send>>,
 }
 
 impl<C: Blockchain + 'static> BufferedBlockStream<C> {
     pub fn spawn_from_stream(
         size_hint: usize,
         stream: Box<dyn BlockStream<C>>,
     ) -> Box<dyn BlockStream<C>> {
-        let (sender, receiver) = mpsc::channel::<Result<BlockStreamEvent<C>, Error>>(size_hint);
+        let (sender, receiver) =
+            mpsc::channel::<Result<BlockStreamEvent<C>, BlockStreamError>>(size_hint);
         crate::spawn(async move { BufferedBlockStream::stream_blocks(stream, sender).await });
 
         Box::new(BufferedBlockStream::new(receiver))
     }
 
-    pub fn new(mut receiver: Receiver<Result<BlockStreamEvent<C>, Error>>) -> Self {
+    pub fn new(mut receiver: Receiver<Result<BlockStreamEvent<C>, BlockStreamError>>) -> Self {
         let inner = stream! {
             loop {
                 let event = match receiver.recv().await {
@@ -59,7 +60,7 @@ impl<C: Blockchain + 'static> BufferedBlockStream<C> {
 
     pub async fn stream_blocks(
         mut stream: Box<dyn BlockStream<C>>,
-        sender: Sender<Result<BlockStreamEvent<C>, Error>>,
+        sender: Sender<Result<BlockStreamEvent<C>, BlockStreamError>>,
     ) -> Result<(), Error> {
         while let Some(event) = stream.next().await {
             match sender.send(event).await {
@@ -84,7 +85,7 @@ impl<C: Blockchain> BlockStream<C> for BufferedBlockStream<C> {
 }
 
 impl<C: Blockchain> Stream for BufferedBlockStream<C> {
-    type Item = Result<BlockStreamEvent<C>, Error>;
+    type Item = Result<BlockStreamEvent<C>, BlockStreamError>;
 
     fn poll_next(
         mut self: Pin<&mut Self>,
@@ -95,7 +96,7 @@ impl<C: Blockchain> Stream for BufferedBlockStream<C> {
 }
 
 pub trait BlockStream<C: Blockchain>:
-    Stream<Item = Result<BlockStreamEvent<C>, Error>> + Unpin + Send
+    Stream<Item = Result<BlockStreamEvent<C>, BlockStreamError>> + Unpin + Send
 {
     fn buffer_size_hint(&self) -> usize;
 }
@@ -482,6 +483,20 @@ pub enum SubstreamsError {
     UnexpectedStoreDeltaOutput,
 }
 
+#[derive(Debug, Error)]
+pub enum BlockStreamError {
+    #[error("block stream error")]
+    Unknown(#[from] anyhow::Error),
+    #[error("block stream fatal error")]
+    Fatal(String),
+}
+
+impl BlockStreamError {
+    pub fn is_deterministic(&self) -> bool {
+        matches!(self, Self::Fatal(_))
+    }
+}
+
 #[derive(Debug)]
 pub enum BlockStreamEvent<C: Blockchain> {
     // The payload is the block the subgraph should revert to, so it becomes the new subgraph head.
@@ -576,7 +591,6 @@ pub trait ChainHeadUpdateListener: Send + Sync + 'static {
 mod test {
     use std::{collections::HashSet, task::Poll};
 
-    use anyhow::Error;
     use futures03::{Stream, StreamExt, TryStreamExt};
 
     use crate::{
@@ -585,7 +599,8 @@ mod test {
     };
 
     use super::{
-        BlockStream, BlockStreamEvent, BlockWithTriggers, BufferedBlockStream, FirehoseCursor,
+        BlockStream, BlockStreamError, BlockStreamEvent, BlockWithTriggers, BufferedBlockStream,
+        FirehoseCursor,
     };
 
     #[derive(Debug)]
@@ -600,7 +615,7 @@ mod test {
     }
 
     impl Stream for TestStream {
-        type Item = Result<BlockStreamEvent<MockBlockchain>, Error>;
+        type Item = Result<BlockStreamEvent<MockBlockchain>, BlockStreamError>;
 
         fn poll_next(
             mut self: std::pin::Pin<&mut Self>,
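
For reference, the new `BlockStreamError` introduced above follows the usual `thiserror` pattern: the `#[from]` attribute on `Unknown` generates a `From<anyhow::Error>` impl, which is what lets existing call sites convert with `?` or `map_err(BlockStreamError::from)`. A standalone sketch, requiring only the `anyhow` and `thiserror` crates:

```rust
use thiserror::Error;

// Mirrors the shape of the enum added above, but stands alone.
#[derive(Debug, Error)]
pub enum BlockStreamError {
    // `#[from]` generates `impl From<anyhow::Error> for BlockStreamError`,
    // so `?` and `map_err(BlockStreamError::from)` work without extra code.
    #[error("block stream error")]
    Unknown(#[from] anyhow::Error),
    #[error("block stream fatal error")]
    Fatal(String),
}

impl BlockStreamError {
    pub fn is_deterministic(&self) -> bool {
        matches!(self, Self::Fatal(_))
    }
}

fn transient_failure() -> Result<(), BlockStreamError> {
    let inner: Result<(), anyhow::Error> = Err(anyhow::anyhow!("connection reset"));
    inner?; // the generated `From` converts anyhow::Error into Unknown
    Ok(())
}

fn main() {
    let retryable = transient_failure().unwrap_err();
    assert!(!retryable.is_deterministic());

    let fatal = BlockStreamError::Fatal("deterministic substreams failure".to_string());
    assert!(fatal.is_deterministic());

    println!("{retryable} / {fatal}");
}
```
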
8 changes: 4 additions & 4 deletions graph/src/blockchain/firehose_block_stream.rs
@@ -1,5 +1,5 @@
 use super::block_stream::{
-    BlockStream, BlockStreamEvent, FirehoseMapper, FIREHOSE_BUFFER_STREAM_SIZE,
+    BlockStream, BlockStreamError, BlockStreamEvent, FirehoseMapper, FIREHOSE_BUFFER_STREAM_SIZE,
 };
 use super::client::ChainClient;
 use super::Blockchain;
@@ -100,7 +100,7 @@ impl FirehoseBlockStreamMetrics {
 }
 
 pub struct FirehoseBlockStream<C: Blockchain> {
-    stream: Pin<Box<dyn Stream<Item = Result<BlockStreamEvent<C>, Error>> + Send>>,
+    stream: Pin<Box<dyn Stream<Item = Result<BlockStreamEvent<C>, BlockStreamError>> + Send>>,
 }
 
 impl<C> FirehoseBlockStream<C>
@@ -156,7 +156,7 @@ fn stream_blocks<C: Blockchain, F: FirehoseMapper<C>>(
     subgraph_current_block: Option<BlockPtr>,
     logger: Logger,
     metrics: FirehoseBlockStreamMetrics,
-) -> impl Stream<Item = Result<BlockStreamEvent<C>, Error>> {
+) -> impl Stream<Item = Result<BlockStreamEvent<C>, BlockStreamError>> {
     let mut subgraph_current_block = subgraph_current_block;
     let mut start_block_num = subgraph_current_block
         .as_ref()
@@ -406,7 +406,7 @@ async fn process_firehose_response<C: Blockchain, F: FirehoseMapper<C>>(
 }
 
 impl<C: Blockchain> Stream for FirehoseBlockStream<C> {
-    type Item = Result<BlockStreamEvent<C>, Error>;
+    type Item = Result<BlockStreamEvent<C>, BlockStreamError>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         self.stream.poll_next_unpin(cx)
12 changes: 6 additions & 6 deletions graph/src/blockchain/polling_block_stream.rs
@@ -8,8 +8,8 @@ use std::task::{Context, Poll};
 use std::time::Duration;
 
 use super::block_stream::{
-    BlockStream, BlockStreamEvent, BlockWithTriggers, ChainHeadUpdateStream, FirehoseCursor,
-    TriggersAdapter, BUFFERED_BLOCK_STREAM_SIZE,
+    BlockStream, BlockStreamError, BlockStreamEvent, BlockWithTriggers, ChainHeadUpdateStream,
+    FirehoseCursor, TriggersAdapter, BUFFERED_BLOCK_STREAM_SIZE,
 };
 use super::{Block, BlockPtr, Blockchain};
 
@@ -470,7 +470,7 @@ impl<C: Blockchain> BlockStream<C> for PollingBlockStream<C> {
 }
 
 impl<C: Blockchain> Stream for PollingBlockStream<C> {
-    type Item = Result<BlockStreamEvent<C>, Error>;
+    type Item = Result<BlockStreamEvent<C>, BlockStreamError>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         let result = loop {
@@ -599,8 +599,8 @@ impl<C: Blockchain> Stream for PollingBlockStream<C> {
                 // Chain head update stream ended
                 Poll::Ready(None) => {
                     // Should not happen
-                    return Poll::Ready(Some(Err(anyhow::anyhow!(
-                        "chain head update stream ended unexpectedly"
+                    return Poll::Ready(Some(Err(BlockStreamError::from(
+                        anyhow::anyhow!("chain head update stream ended unexpectedly"),
                     ))));
                 }
 
@@ -610,6 +610,6 @@ impl<C: Blockchain> Stream for PollingBlockStream<C> {
             }
         };
 
-        result
+        result.map_err(BlockStreamError::from)
    }
 }
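
The polling and firehose streams now advertise the richer error in their `Stream` item type and convert internal `anyhow` errors at the poll boundary. Here is a minimal stand-alone stream with the same shape, using the `futures`, `anyhow`, and `thiserror` crates; `PollingStream` and `BlockEvent` are illustrative stand-ins, not the graph-node types:

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::stream::{Stream, StreamExt};
use thiserror::Error;

#[derive(Debug, Error)]
pub enum BlockStreamError {
    #[error("block stream error")]
    Unknown(#[from] anyhow::Error),
    #[error("block stream fatal error")]
    Fatal(String),
}

// Stand-in for BlockStreamEvent<C>.
#[derive(Debug)]
struct BlockEvent(u64);

struct PollingStream {
    next_block: u64,
}

impl Stream for PollingStream {
    // The item type carries BlockStreamError instead of a bare anyhow::Error.
    type Item = Result<BlockEvent, BlockStreamError>;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Internals still produce anyhow errors; map them at the boundary.
        let result: Result<BlockEvent, anyhow::Error> = if self.next_block < 3 {
            let event = BlockEvent(self.next_block);
            self.next_block += 1;
            Ok(event)
        } else {
            Err(anyhow::anyhow!("chain head update stream ended unexpectedly"))
        };
        Poll::Ready(Some(result.map_err(BlockStreamError::from)))
    }
}

fn main() {
    futures::executor::block_on(async {
        let mut stream = PollingStream { next_block: 0 };
        while let Some(item) = stream.next().await {
            match item {
                Ok(event) => println!("got {:?}", event),
                Err(e) => {
                    eprintln!("stream error: {e}");
                    break;
                }
            }
        }
    });
}
```
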
(Diffs for the remaining 4 changed files were not loaded in this view.)
