Skip to content

Commit

Permalink
graph: Stop subgraphs passing max endBlock
Browse files Browse the repository at this point in the history
  • Loading branch information
shuaibbapputty committed Nov 14, 2024
1 parent 9458fc5 commit c066e3d
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 3 deletions.
3 changes: 3 additions & 0 deletions core/src/subgraph/inputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub struct IndexingInputs<C: Blockchain> {
pub start_blocks: Vec<BlockNumber>,
pub end_blocks: BTreeSet<BlockNumber>,
pub stop_block: Option<BlockNumber>,
pub max_end_block: Option<BlockNumber>,
pub store: Arc<dyn WritableStore>,
pub debug_fork: Option<Arc<dyn SubgraphFork>>,
pub triggers_adapter: Arc<dyn TriggersAdapter<C>>,
Expand All @@ -40,6 +41,7 @@ impl<C: Blockchain> IndexingInputs<C> {
start_blocks,
end_blocks,
stop_block,
max_end_block,
store: _,
debug_fork,
triggers_adapter,
Expand All @@ -57,6 +59,7 @@ impl<C: Blockchain> IndexingInputs<C> {
start_blocks: start_blocks.clone(),
end_blocks: end_blocks.clone(),
stop_block: stop_block.clone(),
max_end_block: max_end_block.clone(),
store,
debug_fork: debug_fork.clone(),
triggers_adapter: triggers_adapter.clone(),
Expand Down
14 changes: 14 additions & 0 deletions core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,19 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
})
.collect();

// We can set `max_end_block` to the maximum of `end_blocks` and stop the subgraph
// at that point only when there are no dynamic data sources present and offchain
// data sources exist. This is because:
// - Dynamic data sources do not have a defined `end_block`, so we can't determine
// when to stop processing them.
// - Offchain data sources might still require processing beyond the end block of
// onchain data sources, necessitating the continuation of the subgraph.
let max_end_block: Option<BlockNumber> = if data_sources.len() == end_blocks.len() {
end_blocks.iter().max().cloned()
} else {
None
};

let templates = Arc::new(manifest.templates.clone());

// Obtain the debug fork from the subgraph store
Expand Down Expand Up @@ -419,6 +432,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
start_blocks,
end_blocks,
stop_block,
max_end_block,
store,
debug_fork,
triggers_adapter,
Expand Down
29 changes: 26 additions & 3 deletions core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,17 @@ where
.unfail_deterministic_error(&current_ptr, &parent_ptr)
.await?;
}

// Stop subgraph when we reach maximum endblock.
if let Some(max_end_block) = self.inputs.max_end_block {
if max_end_block <= current_ptr.block_number() {
info!(self.logger, "Stopping subgraph as we reached maximum endBlock";
"max_end_block" => max_end_block,
"current_block" => current_ptr.block_number());
self.inputs.store.flush().await?;
return Ok(self);
}
}
}

loop {
Expand Down Expand Up @@ -837,9 +848,21 @@ where
}
}

if let Some(stop_block) = &self.inputs.stop_block {
if block_ptr.number >= *stop_block {
info!(self.logger, "stop block reached for subgraph");
if let Some(stop_block) = self.inputs.stop_block {
if block_ptr.number >= stop_block {
info!(self.logger, "Stop block reached for subgraph");
return Ok(Action::Stop);
}
}

if let Some(max_end_block) = self.inputs.max_end_block {
if block_ptr.number >= max_end_block {
info!(
self.logger,
"Stopping subgraph as maximum endBlock reached";
"max_end_block" => max_end_block,
"current_block" => block_ptr.number
);
return Ok(Action::Stop);
}
}
Expand Down
10 changes: 10 additions & 0 deletions graph/src/blockchain/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,16 @@ pub enum BlockStreamEvent<C: Blockchain> {
ProcessWasmBlock(BlockPtr, BlockTime, Box<[u8]>, String, FirehoseCursor),
}

impl<C: Blockchain> BlockStreamEvent<C> {
pub fn block_ptr(&self) -> BlockPtr {
match self {
BlockStreamEvent::Revert(ptr, _) => ptr.clone(),
BlockStreamEvent::ProcessBlock(block, _) => block.ptr(),
BlockStreamEvent::ProcessWasmBlock(ptr, _, _, _, _) => ptr.clone(),
}
}
}

impl<C: Blockchain> Clone for BlockStreamEvent<C>
where
C::TriggerData: Clone,
Expand Down

0 comments on commit c066e3d

Please sign in to comment.