diff --git a/core/src/subgraph/inputs.rs b/core/src/subgraph/inputs.rs index b2e95c753f5..02b20c089e3 100644 --- a/core/src/subgraph/inputs.rs +++ b/core/src/subgraph/inputs.rs @@ -17,6 +17,7 @@ pub struct IndexingInputs { pub start_blocks: Vec, pub end_blocks: BTreeSet, pub stop_block: Option, + pub max_end_block: Option, pub store: Arc, pub debug_fork: Option>, pub triggers_adapter: Arc>, @@ -40,6 +41,7 @@ impl IndexingInputs { start_blocks, end_blocks, stop_block, + max_end_block, store: _, debug_fork, triggers_adapter, @@ -57,6 +59,7 @@ impl IndexingInputs { start_blocks: start_blocks.clone(), end_blocks: end_blocks.clone(), stop_block: stop_block.clone(), + max_end_block: max_end_block.clone(), store, debug_fork: debug_fork.clone(), triggers_adapter: triggers_adapter.clone(), diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index c98641539d9..39f829066a9 100644 --- a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -331,6 +331,19 @@ impl SubgraphInstanceManager { }) .collect(); + // We can set `max_end_block` to the maximum of `end_blocks` and stop the subgraph + // at that point only when there are no dynamic data sources present and offchain + // data sources exist. This is because: + // - Dynamic data sources do not have a defined `end_block`, so we can't determine + // when to stop processing them. + // - Offchain data sources might still require processing beyond the end block of + // onchain data sources, necessitating the continuation of the subgraph. + let max_end_block: Option = if data_sources.len() == end_blocks.len() { + end_blocks.iter().max().cloned() + } else { + None + }; + let templates = Arc::new(manifest.templates.clone()); // Obtain the debug fork from the subgraph store @@ -419,6 +432,7 @@ impl SubgraphInstanceManager { start_blocks, end_blocks, stop_block, + max_end_block, store, debug_fork, triggers_adapter, diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs index cd341ce2f99..9b81c420ec2 100644 --- a/core/src/subgraph/runner.rs +++ b/core/src/subgraph/runner.rs @@ -197,6 +197,17 @@ where .unfail_deterministic_error(¤t_ptr, &parent_ptr) .await?; } + + // Stop subgraph when we reach maximum endblock. + if let Some(max_end_block) = self.inputs.max_end_block { + if max_end_block <= current_ptr.block_number() { + info!(self.logger, "Stopping subgraph as we reached maximum endBlock"; + "max_end_block" => max_end_block, + "current_block" => current_ptr.block_number()); + self.inputs.store.flush().await?; + return Ok(self); + } + } } loop { @@ -837,9 +848,21 @@ where } } - if let Some(stop_block) = &self.inputs.stop_block { - if block_ptr.number >= *stop_block { - info!(self.logger, "stop block reached for subgraph"); + if let Some(stop_block) = self.inputs.stop_block { + if block_ptr.number >= stop_block { + info!(self.logger, "Stop block reached for subgraph"); + return Ok(Action::Stop); + } + } + + if let Some(max_end_block) = self.inputs.max_end_block { + if block_ptr.number >= max_end_block { + info!( + self.logger, + "Stopping subgraph as maximum endBlock reached"; + "max_end_block" => max_end_block, + "current_block" => block_ptr.number + ); return Ok(Action::Stop); } } diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs index 25a923dd502..0daf4c33eda 100644 --- a/graph/src/blockchain/block_stream.rs +++ b/graph/src/blockchain/block_stream.rs @@ -541,6 +541,16 @@ pub enum BlockStreamEvent { ProcessWasmBlock(BlockPtr, BlockTime, Box<[u8]>, String, FirehoseCursor), } +impl BlockStreamEvent { + pub fn block_ptr(&self) -> BlockPtr { + match self { + BlockStreamEvent::Revert(ptr, _) => ptr.clone(), + BlockStreamEvent::ProcessBlock(block, _) => block.ptr(), + BlockStreamEvent::ProcessWasmBlock(ptr, _, _, _, _) => ptr.clone(), + } + } +} + impl Clone for BlockStreamEvent where C::TriggerData: Clone,