From 4641d041916fb73ee4b1b82df5158198aa8e65ae Mon Sep 17 00:00:00 2001 From: Filipe Azevedo Date: Thu, 1 Feb 2024 16:07:54 +0000 Subject: [PATCH] Improve substreams error handling (#5160) --- chain/near/src/chain.rs | 2 +- chain/substreams/examples/substreams.rs | 4 +- chain/substreams/src/block_ingestor.rs | 34 +++++++++++--- chain/substreams/src/block_stream.rs | 4 +- core/src/subgraph/runner.rs | 39 +++++++++++++-- graph/src/blockchain/block_stream.rs | 33 +++++++++---- graph/src/blockchain/firehose_block_stream.rs | 8 ++-- graph/src/blockchain/polling_block_stream.rs | 12 ++--- .../src/blockchain/substreams_block_stream.rs | 47 ++++++++++--------- graph/src/ext/futures.rs | 7 +++ store/postgres/src/deployment.rs | 5 +- tests/src/fixture/mod.rs | 10 ++-- 12 files changed, 139 insertions(+), 66 deletions(-) diff --git a/chain/near/src/chain.rs b/chain/near/src/chain.rs index e8925c89366..88fe69fcf6e 100644 --- a/chain/near/src/chain.rs +++ b/chain/near/src/chain.rs @@ -95,7 +95,7 @@ impl BlockStreamBuilder for NearStreamBuilder { deployment.hash, chain.chain_client(), subgraph_current_block, - block_cursor.as_ref().clone(), + block_cursor.clone(), mapper, package.modules.clone(), NEAR_FILTER_MODULE_NAME.to_string(), diff --git a/chain/substreams/examples/substreams.rs b/chain/substreams/examples/substreams.rs index fbf245856ac..b60f990437a 100644 --- a/chain/substreams/examples/substreams.rs +++ b/chain/substreams/examples/substreams.rs @@ -1,5 +1,5 @@ use anyhow::{format_err, Context, Error}; -use graph::blockchain::block_stream::BlockStreamEvent; +use graph::blockchain::block_stream::{BlockStreamEvent, FirehoseCursor}; use graph::blockchain::client::ChainClient; use graph::blockchain::substreams_block_stream::SubstreamsBlockStream; use graph::endpoint::EndpointMetrics; @@ -67,7 +67,7 @@ async fn main() -> Result<(), Error> { DeploymentHash::new("substreams".to_string()).unwrap(), client, None, - None, + FirehoseCursor::None, Arc::new(Mapper { schema: None, skip_empty_blocks: false, diff --git a/chain/substreams/src/block_ingestor.rs b/chain/substreams/src/block_ingestor.rs index 520e98dd461..21565f6ce2d 100644 --- a/chain/substreams/src/block_ingestor.rs +++ b/chain/substreams/src/block_ingestor.rs @@ -2,6 +2,7 @@ use std::{sync::Arc, time::Duration}; use crate::mapper::Mapper; use anyhow::{Context, Error}; +use graph::blockchain::block_stream::{BlockStreamError, FirehoseCursor}; use graph::blockchain::{ client::ChainClient, substreams_block_stream::SubstreamsBlockStream, BlockIngestor, }; @@ -65,11 +66,12 @@ impl SubstreamsBlockIngestor { /// Consumes the incoming stream of blocks infinitely until it hits an error. In which case /// the error is logged right away and the latest available cursor is returned /// upstream for future consumption. + /// If an error is returned it indicates a fatal/deterministic error which should not be retried. 
async fn process_blocks( &self, - cursor: String, + cursor: FirehoseCursor, mut stream: SubstreamsBlockStream, - ) -> String { + ) -> Result { let mut latest_cursor = cursor; while let Some(message) = stream.next().await { @@ -90,6 +92,9 @@ impl SubstreamsBlockIngestor { trace!(self.logger, "Received undo block to ingest, skipping"); continue; } + Err(e) if e.is_deterministic() => { + return Err(e); + } Err(e) => { info!( self.logger, @@ -105,14 +110,15 @@ impl SubstreamsBlockIngestor { break; } - latest_cursor = cursor.to_string() + latest_cursor = cursor } error!( self.logger, "Stream blocks complete unexpectedly, expecting stream to always stream blocks" ); - latest_cursor + + Ok(latest_cursor) } async fn process_new_block( @@ -139,7 +145,7 @@ impl BlockIngestor for SubstreamsBlockIngestor { schema: None, skip_empty_blocks: false, }); - let mut latest_cursor = self.fetch_head_cursor().await; + let mut latest_cursor = FirehoseCursor::from(self.fetch_head_cursor().await); let mut backoff = ExponentialBackoff::new(Duration::from_millis(250), Duration::from_secs(30)); let package = Package::decode(SUBSTREAMS_HEAD_TRACKER_BYTES.to_vec().as_ref()).unwrap(); @@ -149,7 +155,7 @@ impl BlockIngestor for SubstreamsBlockIngestor { DeploymentHash::default(), self.client.cheap_clone(), None, - Some(latest_cursor.clone()), + latest_cursor.clone(), mapper.cheap_clone(), package.modules.clone(), "map_blocks".to_string(), @@ -160,7 +166,21 @@ impl BlockIngestor for SubstreamsBlockIngestor { ); // Consume the stream of blocks until an error is hit - latest_cursor = self.process_blocks(latest_cursor, stream).await; + // If the error is retryable it will print the error and return the cursor + // therefore if we get an error here it has to be a fatal error. + // This is a bit brittle and should probably be improved at some point. 
+ let res = self.process_blocks(latest_cursor, stream).await; + match res { + Ok(cursor) => latest_cursor = cursor, + Err(BlockStreamError::Fatal(e)) => { + error!( + self.logger, + "fatal error while ingesting substream blocks: {}", e + ); + return; + } + _ => unreachable!("Nobody should ever see this error message, something is wrong"), + } // If we reach this point, we must wait a bit before retrying backoff.sleep_async().await; diff --git a/chain/substreams/src/block_stream.rs b/chain/substreams/src/block_stream.rs index 4cdb8fdf895..286996745ac 100644 --- a/chain/substreams/src/block_stream.rs +++ b/chain/substreams/src/block_stream.rs @@ -53,7 +53,7 @@ impl BlockStreamBuilderTrait for BlockStreamBuilder { deployment.hash, chain.chain_client(), subgraph_current_block, - block_cursor.as_ref().clone(), + block_cursor.clone(), Arc::new(WasmBlockMapper { handler: handler.clone(), }), @@ -69,7 +69,7 @@ impl BlockStreamBuilderTrait for BlockStreamBuilder { deployment.hash, chain.chain_client(), subgraph_current_block, - block_cursor.as_ref().clone(), + block_cursor.clone(), Arc::new(Mapper { schema: Some(schema), skip_empty_blocks: true, diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs index 2bc03f37214..b1ea38193c4 100644 --- a/core/src/subgraph/runner.rs +++ b/core/src/subgraph/runner.rs @@ -4,7 +4,9 @@ use crate::subgraph::inputs::IndexingInputs; use crate::subgraph::state::IndexingState; use crate::subgraph::stream::new_block_stream; use atomic_refcell::AtomicRefCell; -use graph::blockchain::block_stream::{BlockStreamEvent, BlockWithTriggers, FirehoseCursor}; +use graph::blockchain::block_stream::{ + BlockStreamError, BlockStreamEvent, BlockWithTriggers, FirehoseCursor, +}; use graph::blockchain::{Block, BlockTime, Blockchain, DataSource as _, TriggerFilter as _}; use graph::components::store::{EmptyStore, GetScope, ReadStore, StoredDynamicDataSource}; use graph::components::{ @@ -206,7 +208,7 @@ where &self.metrics.subgraph, ) .await? - .map_err(CancelableError::Error) + .map_err(CancelableError::from) .cancelable(&block_stream_canceler, || Err(CancelableError::Cancel)); // Keep the stream's cancel guard around to be able to shut it down when the subgraph @@ -910,7 +912,7 @@ where { async fn handle_stream_event( &mut self, - event: Option, CancelableError>>, + event: Option, CancelableError>>, cancel_handle: &CancelHandle, ) -> Result { let action = match event { @@ -1087,7 +1089,7 @@ trait StreamEventHandler { ) -> Result; async fn handle_err( &mut self, - err: CancelableError, + err: CancelableError, cancel_handle: &CancelHandle, ) -> Result; fn needs_restart(&self, revert_to_ptr: BlockPtr, subgraph_ptr: BlockPtr) -> bool; @@ -1399,7 +1401,7 @@ where async fn handle_err( &mut self, - err: CancelableError, + err: CancelableError, cancel_handle: &CancelHandle, ) -> Result { if cancel_handle.is_canceled() { @@ -1407,6 +1409,33 @@ where return Ok(Action::Stop); } + let err = match err { + CancelableError::Error(BlockStreamError::Fatal(msg)) => { + error!( + &self.logger, + "The block stream encountered a substreams fatal error and will not retry: {}", + msg + ); + + // If substreams returns a deterministic error we may not necessarily have a specific block + // but we should not retry since it will keep failing. 
+ self.inputs + .store + .fail_subgraph(SubgraphError { + subgraph_id: self.inputs.deployment.hash.clone(), + message: msg, + block_ptr: None, + handler: None, + deterministic: true, + }) + .await + .context("Failed to set subgraph status to `failed`")?; + + return Ok(Action::Stop); + } + e => e, + }; + debug!( &self.logger, "Block stream produced a non-fatal error"; diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs index fc650252201..bd6da6fc6c0 100644 --- a/graph/src/blockchain/block_stream.rs +++ b/graph/src/blockchain/block_stream.rs @@ -26,7 +26,7 @@ pub const FIREHOSE_BUFFER_STREAM_SIZE: usize = 1; pub const SUBSTREAMS_BUFFER_STREAM_SIZE: usize = 100; pub struct BufferedBlockStream { - inner: Pin, Error>> + Send>>, + inner: Pin, BlockStreamError>> + Send>>, } impl BufferedBlockStream { @@ -34,13 +34,14 @@ impl BufferedBlockStream { size_hint: usize, stream: Box>, ) -> Box> { - let (sender, receiver) = mpsc::channel::, Error>>(size_hint); + let (sender, receiver) = + mpsc::channel::, BlockStreamError>>(size_hint); crate::spawn(async move { BufferedBlockStream::stream_blocks(stream, sender).await }); Box::new(BufferedBlockStream::new(receiver)) } - pub fn new(mut receiver: Receiver, Error>>) -> Self { + pub fn new(mut receiver: Receiver, BlockStreamError>>) -> Self { let inner = stream! { loop { let event = match receiver.recv().await { @@ -59,7 +60,7 @@ impl BufferedBlockStream { pub async fn stream_blocks( mut stream: Box>, - sender: Sender, Error>>, + sender: Sender, BlockStreamError>>, ) -> Result<(), Error> { while let Some(event) = stream.next().await { match sender.send(event).await { @@ -84,7 +85,7 @@ impl BlockStream for BufferedBlockStream { } impl Stream for BufferedBlockStream { - type Item = Result, Error>; + type Item = Result, BlockStreamError>; fn poll_next( mut self: Pin<&mut Self>, @@ -95,7 +96,7 @@ impl Stream for BufferedBlockStream { } pub trait BlockStream: - Stream, Error>> + Unpin + Send + Stream, BlockStreamError>> + Unpin + Send { fn buffer_size_hint(&self) -> usize; } @@ -482,6 +483,20 @@ pub enum SubstreamsError { UnexpectedStoreDeltaOutput, } +#[derive(Debug, Error)] +pub enum BlockStreamError { + #[error("block stream error")] + Unknown(#[from] anyhow::Error), + #[error("block stream fatal error")] + Fatal(String), +} + +impl BlockStreamError { + pub fn is_deterministic(&self) -> bool { + matches!(self, Self::Fatal(_)) + } +} + #[derive(Debug)] pub enum BlockStreamEvent { // The payload is the block the subgraph should revert to, so it becomes the new subgraph head. 
@@ -576,7 +591,6 @@ pub trait ChainHeadUpdateListener: Send + Sync + 'static { mod test { use std::{collections::HashSet, task::Poll}; - use anyhow::Error; use futures03::{Stream, StreamExt, TryStreamExt}; use crate::{ @@ -585,7 +599,8 @@ mod test { }; use super::{ - BlockStream, BlockStreamEvent, BlockWithTriggers, BufferedBlockStream, FirehoseCursor, + BlockStream, BlockStreamError, BlockStreamEvent, BlockWithTriggers, BufferedBlockStream, + FirehoseCursor, }; #[derive(Debug)] @@ -600,7 +615,7 @@ mod test { } impl Stream for TestStream { - type Item = Result, Error>; + type Item = Result, BlockStreamError>; fn poll_next( mut self: std::pin::Pin<&mut Self>, diff --git a/graph/src/blockchain/firehose_block_stream.rs b/graph/src/blockchain/firehose_block_stream.rs index 29febc67108..f39e7861733 100644 --- a/graph/src/blockchain/firehose_block_stream.rs +++ b/graph/src/blockchain/firehose_block_stream.rs @@ -1,5 +1,5 @@ use super::block_stream::{ - BlockStream, BlockStreamEvent, FirehoseMapper, FIREHOSE_BUFFER_STREAM_SIZE, + BlockStream, BlockStreamError, BlockStreamEvent, FirehoseMapper, FIREHOSE_BUFFER_STREAM_SIZE, }; use super::client::ChainClient; use super::Blockchain; @@ -100,7 +100,7 @@ impl FirehoseBlockStreamMetrics { } pub struct FirehoseBlockStream { - stream: Pin, Error>> + Send>>, + stream: Pin, BlockStreamError>> + Send>>, } impl FirehoseBlockStream @@ -156,7 +156,7 @@ fn stream_blocks>( subgraph_current_block: Option, logger: Logger, metrics: FirehoseBlockStreamMetrics, -) -> impl Stream, Error>> { +) -> impl Stream, BlockStreamError>> { let mut subgraph_current_block = subgraph_current_block; let mut start_block_num = subgraph_current_block .as_ref() @@ -406,7 +406,7 @@ async fn process_firehose_response>( } impl Stream for FirehoseBlockStream { - type Item = Result, Error>; + type Item = Result, BlockStreamError>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.stream.poll_next_unpin(cx) diff --git a/graph/src/blockchain/polling_block_stream.rs b/graph/src/blockchain/polling_block_stream.rs index a9c52294c18..85ebbf0240a 100644 --- a/graph/src/blockchain/polling_block_stream.rs +++ b/graph/src/blockchain/polling_block_stream.rs @@ -8,8 +8,8 @@ use std::task::{Context, Poll}; use std::time::Duration; use super::block_stream::{ - BlockStream, BlockStreamEvent, BlockWithTriggers, ChainHeadUpdateStream, FirehoseCursor, - TriggersAdapter, BUFFERED_BLOCK_STREAM_SIZE, + BlockStream, BlockStreamError, BlockStreamEvent, BlockWithTriggers, ChainHeadUpdateStream, + FirehoseCursor, TriggersAdapter, BUFFERED_BLOCK_STREAM_SIZE, }; use super::{Block, BlockPtr, Blockchain}; @@ -470,7 +470,7 @@ impl BlockStream for PollingBlockStream { } impl Stream for PollingBlockStream { - type Item = Result, Error>; + type Item = Result, BlockStreamError>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let result = loop { @@ -599,8 +599,8 @@ impl Stream for PollingBlockStream { // Chain head update stream ended Poll::Ready(None) => { // Should not happen - return Poll::Ready(Some(Err(anyhow::anyhow!( - "chain head update stream ended unexpectedly" + return Poll::Ready(Some(Err(BlockStreamError::from( + anyhow::anyhow!("chain head update stream ended unexpectedly"), )))); } @@ -610,6 +610,6 @@ impl Stream for PollingBlockStream { } }; - result + result.map_err(BlockStreamError::from) } } diff --git a/graph/src/blockchain/substreams_block_stream.rs b/graph/src/blockchain/substreams_block_stream.rs index 8a89d391597..91451e27eaa 100644 --- 
a/graph/src/blockchain/substreams_block_stream.rs +++ b/graph/src/blockchain/substreams_block_stream.rs @@ -1,4 +1,6 @@ -use super::block_stream::{BlockStreamMapper, SUBSTREAMS_BUFFER_STREAM_SIZE}; +use super::block_stream::{ + BlockStreamError, BlockStreamMapper, FirehoseCursor, SUBSTREAMS_BUFFER_STREAM_SIZE, +}; use super::client::ChainClient; use crate::blockchain::block_stream::{BlockStream, BlockStreamEvent}; use crate::blockchain::Blockchain; @@ -12,7 +14,7 @@ use humantime::format_duration; use std::sync::Arc; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; -use tonic::Status; +use tonic::{Code, Status}; struct SubstreamsBlockStreamMetrics { deployment: DeploymentHash, @@ -100,7 +102,7 @@ impl SubstreamsBlockStreamMetrics { pub struct SubstreamsBlockStream { //fixme: not sure if this is ok to be set as public, maybe // we do not want to expose the stream to the caller - stream: Pin, Error>> + Send>>, + stream: Pin, BlockStreamError>> + Send>>, } impl SubstreamsBlockStream @@ -111,7 +113,7 @@ where deployment: DeploymentHash, client: Arc>, subgraph_current_block: Option, - cursor: Option, + cursor: FirehoseCursor, mapper: Arc, modules: Option, module_name: String, @@ -149,7 +151,7 @@ where fn stream_blocks>( client: Arc>, - cursor: Option, + cursor: FirehoseCursor, deployment: DeploymentHash, mapper: Arc, modules: Option, @@ -159,8 +161,8 @@ fn stream_blocks>( subgraph_current_block: Option, logger: Logger, metrics: SubstreamsBlockStreamMetrics, -) -> impl Stream, Error>> { - let mut latest_cursor = cursor.unwrap_or_default(); +) -> impl Stream, BlockStreamError>> { + let mut latest_cursor = cursor.to_string(); let start_block_num = subgraph_current_block .as_ref() @@ -186,23 +188,13 @@ fn stream_blocks>( let mut logger = logger.new(o!("deployment" => deployment.clone(), "provider" => endpoint.provider.to_string())); loop { - info!( - &logger, - "Blockstreams disconnected, connecting"; - "endpoint_uri" => format_args!("{}", endpoint), - "subgraph" => &deployment, - "start_block" => start_block_num, - "cursor" => &latest_cursor, - "provider_err_count" => endpoint.current_error_count(), - ); - // We just reconnected, assume that we want to back off on errors skip_backoff = false; let mut connect_start = Instant::now(); let request = Request { start_block_num, - start_cursor: latest_cursor.clone(), + start_cursor: latest_cursor.to_string(), stop_block_num, modules: modules.clone(), output_module: module_name.clone(), @@ -245,7 +237,11 @@ fn stream_blocks>( } } }, + Err(BlockStreamError::Fatal(msg)) => { + Err(BlockStreamError::Fatal(msg))? + } Err(err) => { + info!(&logger, "received err"); // We have an open connection but there was an error processing the Firehose // response. 
We will reconnect the stream after this; this is the case where @@ -294,10 +290,19 @@ async fn process_substreams_response>( mapper: &F, logger: &mut Logger, log_data: &mut SubstreamsLogData, -) -> Result>, Error> { +) -> Result>, BlockStreamError> { let response = match result { Ok(v) => v, - Err(e) => return Err(anyhow!("An error occurred while streaming blocks: {:#}", e)), + Err(e) => { + if e.code() == Code::InvalidArgument { + return Err(BlockStreamError::Fatal(e.message().to_string())); + } + + return Err(BlockStreamError::from(anyhow!( + "An error occurred while streaming blocks: {:#}", + e + ))); + } }; match mapper @@ -320,7 +325,7 @@ async fn process_substreams_response>( } impl Stream for SubstreamsBlockStream { - type Item = Result, Error>; + type Item = Result, BlockStreamError>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.stream.poll_next_unpin(cx) diff --git a/graph/src/ext/futures.rs b/graph/src/ext/futures.rs index 57eae6933cd..c25550a426f 100644 --- a/graph/src/ext/futures.rs +++ b/graph/src/ext/futures.rs @@ -1,3 +1,4 @@ +use crate::blockchain::block_stream::BlockStreamError; use crate::prelude::tokio::macros::support::Poll; use crate::prelude::{Pin, StoreError}; use futures03::channel::oneshot; @@ -313,6 +314,12 @@ impl From for CancelableError { } } +impl From for CancelableError { + fn from(e: BlockStreamError) -> Self { + Self::Error(e) + } +} + impl From for CancelableError { fn from(e: anyhow::Error) -> Self { Self::Error(e) diff --git a/store/postgres/src/deployment.rs b/store/postgres/src/deployment.rs index 4f46c5eb60d..12868a941c9 100644 --- a/store/postgres/src/deployment.rs +++ b/store/postgres/src/deployment.rs @@ -748,10 +748,7 @@ fn insert_subgraph_error(conn: &PgConnection, error: &SubgraphError) -> anyhow:: } = error; let block_num = match &block_ptr { - None => { - assert_eq!(*deterministic, false); - crate::block_range::BLOCK_UNVERSIONED - } + None => crate::block_range::BLOCK_UNVERSIONED, Some(block) => crate::block_range::block_number(block), }; diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index 1aaecee7605..f02433101c0 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -10,8 +10,8 @@ use anyhow::Error; use async_stream::stream; use futures::{Stream, StreamExt}; use graph::blockchain::block_stream::{ - BlockRefetcher, BlockStream, BlockStreamBuilder, BlockStreamEvent, BlockWithTriggers, - FirehoseCursor, + BlockRefetcher, BlockStream, BlockStreamBuilder, BlockStreamError, BlockStreamEvent, + BlockWithTriggers, FirehoseCursor, }; use graph::blockchain::{ Block, BlockHash, BlockPtr, Blockchain, BlockchainMap, ChainIdentifier, RuntimeAdapter, @@ -784,7 +784,7 @@ where } struct StaticStream { - stream: Pin, Error>> + Send>>, + stream: Pin, BlockStreamError>> + Send>>, } impl BlockStream for StaticStream { @@ -794,7 +794,7 @@ impl BlockStream for StaticStream { } impl Stream for StaticStream { - type Item = Result, Error>; + type Item = Result, BlockStreamError>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.stream.poll_next_unpin(cx) @@ -804,7 +804,7 @@ impl Stream for StaticStream { fn stream_events( blocks: Vec>, current_idx: Option, -) -> impl Stream, Error>> +) -> impl Stream, BlockStreamError>> where C::TriggerData: Clone, {
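
Note: the hunks above all serve one pattern — the block stream now distinguishes deterministic (fatal) substreams errors from transient ones via `BlockStreamError::Fatal` and `is_deterministic()`, and consumers (the block ingestor, the subgraph runner) stop retrying and fail the subgraph when the error is deterministic. The following is a minimal, self-contained Rust sketch of that pattern only; `Cursor`, `next_message`, and the retry loop are illustrative stand-ins, not graph-node APIs — only the `Unknown`/`Fatal` split and the `is_deterministic` check mirror the patch directly.

use std::fmt;

/// Simplified analogue of the `BlockStreamError` added in this patch.
#[derive(Debug)]
enum BlockStreamError {
    /// Transient problem (e.g. provider disconnect): log and retry.
    Unknown(String),
    /// Deterministic/fatal problem (e.g. an invalid substreams request):
    /// retrying will always fail, so the consumer must stop.
    Fatal(String),
}

impl BlockStreamError {
    fn is_deterministic(&self) -> bool {
        matches!(self, Self::Fatal(_))
    }
}

impl fmt::Display for BlockStreamError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Unknown(msg) => write!(f, "block stream error: {msg}"),
            Self::Fatal(msg) => write!(f, "block stream fatal error: {msg}"),
        }
    }
}

/// Stand-in for the opaque cursor the ingestor persists between retries.
#[derive(Clone, Default, Debug)]
struct Cursor(String);

/// Stand-in for pulling one message off the block stream.
fn next_message(attempt: u32) -> Result<Cursor, BlockStreamError> {
    match attempt {
        0 => Err(BlockStreamError::Unknown("provider disconnected".into())),
        1 => Ok(Cursor(format!("cursor-{attempt}"))),
        _ => Err(BlockStreamError::Fatal("invalid module output type".into())),
    }
}

fn main() {
    let mut latest_cursor = Cursor::default();

    // Keep retrying from the last good cursor on transient errors, but bail
    // out as soon as the error is deterministic -- the same shape as
    // `process_blocks` and the ingestor loop in the patch.
    for attempt in 0u32.. {
        match next_message(attempt) {
            Ok(cursor) => {
                latest_cursor = cursor;
                println!("ingested block, cursor = {:?}", latest_cursor);
            }
            Err(e) if e.is_deterministic() => {
                eprintln!("fatal error, not retrying: {e}");
                // graph-node would mark the subgraph as failed here.
                break;
            }
            Err(e) => {
                eprintln!("transient error, retrying from {:?}: {e}", latest_cursor);
            }
        }
    }
}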