Skip to content

Commit

Permalink
agent,common: Improve process of ensuring subgraphs are deployed
Browse files Browse the repository at this point in the history
- Ahead of executing a batch, ensure all deployments to be allocated to
are ensured
- Check status of subgraph deployment assignments: if already syncing
then no-op, if paused then resume, if doesn't exist then create and
deploy
  • Loading branch information
fordN committed Jul 11, 2024
1 parent 84fe045 commit 5c788c8
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 7 deletions.
5 changes: 4 additions & 1 deletion packages/indexer-agent/src/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import {
networkIsL2,
networkIsL1,
DeploymentManagementMode,
SubgraphStatus,
} from '@graphprotocol/indexer-common'

import PQueue from 'p-queue'
Expand Down Expand Up @@ -970,6 +971,8 @@ export class Agent {
// Deploy/remove up to 10 subgraphs in parallel
const queue = new PQueue({ concurrency: 10 })

const currentAssignments =
await this.graphNode.subgraphDeploymentsAssignments(SubgraphStatus.ALL)
// Index all new deployments worth indexing
await queue.addAll(
deploy.map(deployment => async () => {
Expand All @@ -981,7 +984,7 @@ export class Agent {
})

// Ensure the deployment is deployed to the indexer
await this.graphNode.ensure(name, deployment)
await this.graphNode.ensure(name, deployment, currentAssignments)
}),
)

Expand Down
4 changes: 3 additions & 1 deletion packages/indexer-common/src/allocations/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,9 @@ export const monitorEligibleAllocations = ({
const allocations = [...activeAllocations, ...recentlyClosedAllocations]

if (allocations.length == 0) {
throw new Error(`No data / indexer not found on chain`)
logger.warn(`No data / indexer not found on chain`, {
allocations: [],
})
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand Down
4 changes: 4 additions & 0 deletions packages/indexer-common/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ export enum IndexerErrorCode {
IE073 = 'IE073',
IE074 = 'IE074',
IE075 = 'IE075',
IE076 = 'IE076',
IE077 = 'IE077',
}

export const INDEXER_ERROR_MESSAGES: Record<IndexerErrorCode, string> = {
Expand Down Expand Up @@ -165,6 +167,8 @@ export const INDEXER_ERROR_MESSAGES: Record<IndexerErrorCode, string> = {
IE073: 'Failed to query subgraph features from indexing statuses endpoint',
IE074: 'Failed to deploy subgraph: network not supported',
IE075: 'Failed to connect to network contracts',
IE076: 'Failed to resume subgraph deployment',
IE077: 'Failed to allocate: subgraph not healthily syncing',
}

export type IndexerErrorCause = unknown
Expand Down
67 changes: 63 additions & 4 deletions packages/indexer-common/src/graph-node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ export class GraphNode {
return result.data.indexingStatuses
.filter((status: QueryResult) => {
if (subgraphStatus === SubgraphStatus.ACTIVE) {
return status.node !== 'removed'
return !status.paused
} else if (subgraphStatus === SubgraphStatus.PAUSED) {
return status.node === 'removed' || status.paused === true
} else {
Expand All @@ -192,6 +192,7 @@ export class GraphNode {
return {
id: new SubgraphDeploymentID(status.subgraphDeployment),
node: status.node,
paused: status.paused,
}
})
} catch (error) {
Expand Down Expand Up @@ -343,6 +344,29 @@ export class GraphNode {
}
}

async resume(deployment: SubgraphDeploymentID): Promise<void> {
try {
this.logger.info(`Resume subgraph deployment`, {
deployment: deployment.display,
})
const response = await this.admin.request('subgraph_resume', {
deployment: deployment.ipfsHash,
})
if (response.error) {
throw response.error
}
this.logger.info(`Successfully resumed subgraph deployment`, {
deployment: deployment.display,
})
} catch (error) {
const errorCode = IndexerErrorCode.IE076
this.logger.error(INDEXER_ERROR_MESSAGES[errorCode], {
deployment: deployment.display,
error: indexerError(errorCode, error),
})
}
}

async reassign(deployment: SubgraphDeploymentID, node: string): Promise<void> {
try {
this.logger.info(`Reassign subgraph deployment`, {
Expand Down Expand Up @@ -374,10 +398,45 @@ export class GraphNode {
}
}

async ensure(name: string, deployment: SubgraphDeploymentID): Promise<void> {
async ensure(
name: string,
deployment: SubgraphDeploymentID,
currentAssignments?: SubgraphDeploymentAssignment[],
): Promise<void> {
this.logger.debug('Ensure subgraph deployment is syncing', {
name,
deployment: deployment.ipfsHash,
})
try {
await this.create(name)
await this.deploy(name, deployment)
const deploymentAssignments =
currentAssignments ??
(await this.subgraphDeploymentsAssignments(SubgraphStatus.ALL))
const matchingAssignment = deploymentAssignments.find(
(deploymentAssignment) => deploymentAssignment.id.ipfsHash == deployment.ipfsHash,
)

if (matchingAssignment?.paused == false) {
this.logger.debug('Subgraph deployment already syncing, ensure() is a no-op', {
name,
deployment: deployment.ipfsHash,
})
} else if (matchingAssignment?.paused == true) {
this.logger.debug('Subgraph deployment paused, resuming', {
name,
deployment: deployment.ipfsHash,
})
await this.resume(deployment)
} else {
this.logger.debug(
'Subgraph deployment not found, creating subgraph name and deploying...',
{
name,
deployment: deployment.ipfsHash,
},
)
await this.create(name)
await this.deploy(name, deployment)
}
} catch (error) {
if (!(error instanceof IndexerError)) {
const errorCode = IndexerErrorCode.IE020
Expand Down
33 changes: 32 additions & 1 deletion packages/indexer-common/src/indexer-management/allocations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
Network,
ReallocateAllocationResult,
SubgraphIdentifierType,
SubgraphStatus,
uniqueAllocationID,
upsertIndexingRule,
} from '@graphprotocol/indexer-common'
Expand Down Expand Up @@ -124,6 +125,8 @@ export class AllocationManager {
const validatedActions = await this.validateActionBatchFeasibilty(actions)
logger.trace('Validated actions', { validatedActions })

await this.deployBeforeAllocating(logger, validatedActions)

const populateTransactionsResults = await this.prepareTransactions(validatedActions)

const failedTransactionPreparations = populateTransactionsResults
Expand Down Expand Up @@ -307,6 +310,28 @@ export class AllocationManager {
}
}

async deployBeforeAllocating(logger: Logger, actions: Action[]): Promise<void> {
const allocateActions = actions.filter((action) => action.type == ActionType.ALLOCATE)
logger.info('Ensure subgraph deployments are deployed before we allocate to them', {
allocateActions,
})
const currentAssignments = await this.graphNode.subgraphDeploymentsAssignments(
SubgraphStatus.ALL,
)
await pMap(
allocateActions,
async (action: Action) =>
await this.graphNode.ensure(
`indexer-agent/${action.deploymentID!.slice(-10)}`,
new SubgraphDeploymentID(action.deploymentID!),
currentAssignments,
),
{
stopOnError: false,
},
)
}

async prepareAllocateParams(
logger: Logger,
context: TransactionPreparationContext,
Expand Down Expand Up @@ -352,10 +377,16 @@ export class AllocationManager {
)
if (!status) {
throw indexerError(
IndexerErrorCode.IE020,
IndexerErrorCode.IE077,
`Subgraph deployment, '${deployment.ipfsHash}', is not syncing`,
)
}
if (status?.health == 'failed') {
throw indexerError(
IndexerErrorCode.IE077,
`Subgraph deployment, '${deployment.ipfsHash}', has failed`,
)
}

logger.debug('Obtain a unique Allocation ID')
const { allocationSigner, allocationId } = uniqueAllocationID(
Expand Down

0 comments on commit 5c788c8

Please sign in to comment.