From 203cfd8e7394f826e79d328d1fc9a0257b828154 Mon Sep 17 00:00:00 2001 From: Ford Date: Mon, 13 Nov 2023 13:00:19 -0800 Subject: [PATCH] indexer-agent: Improve efficiency of batch action preparation - Fetch 'currentEpoch' and 'activeAllocations' ony once; instead of for each action being prepared - Check indexing status before allocating; instead of calling ensure() --- .../src/indexer-management/allocations.ts | 116 ++++++++++++------ 1 file changed, 79 insertions(+), 37 deletions(-) diff --git a/packages/indexer-common/src/indexer-management/allocations.ts b/packages/indexer-common/src/indexer-management/allocations.ts index c42f51796..5e08ed1d4 100644 --- a/packages/indexer-common/src/indexer-management/allocations.ts +++ b/packages/indexer-common/src/indexer-management/allocations.ts @@ -9,6 +9,7 @@ import { Action, ActionFailure, ActionType, + Allocation, allocationIdProof, AllocationResult, AllocationStatus, @@ -22,6 +23,7 @@ import { IndexerManagementModels, IndexingDecisionBasis, IndexingRuleAttributes, + IndexingStatus, isActionFailure, isDeploymentWorthAllocatingTowards, Network, @@ -42,6 +44,12 @@ import { import { BytesLike } from '@ethersproject/bytes' import pMap from 'p-map' +export interface TransactionPreparationContext { + activeAllocations: Allocation[] + currentEpoch: BigNumber + indexingStatuses: IndexingStatus[] +} + export interface AllocateTransactionParams { indexer: string subgraphDeploymentID: BytesLike @@ -228,15 +236,26 @@ export class AllocationManager { } async prepareTransactions(actions: Action[]): Promise { + const context: TransactionPreparationContext = { + activeAllocations: await this.network.networkMonitor.allocations( + AllocationStatus.ACTIVE, + ), + currentEpoch: await this.network.contracts.epochManager.currentEpoch(), + indexingStatuses: await this.graphNode.indexingStatus([]), + } return await pMap( actions, - async (action: Action) => await this.prepareTransaction(action), + async (action: Action) => await this.prepareTransaction(action, context), { stopOnError: false, }, ) } - async prepareTransaction(action: Action): Promise { + + async prepareTransaction( + action: Action, + context: TransactionPreparationContext, + ): Promise { const logger = this.logger.child({ action: action.id }) logger.trace('Preparing transaction', { action, @@ -246,6 +265,7 @@ export class AllocationManager { case ActionType.ALLOCATE: return await this.prepareAllocate( logger, + context, // eslint-disable-next-line @typescript-eslint/no-non-null-assertion new SubgraphDeploymentID(action.deploymentID!), // eslint-disable-next-line @typescript-eslint/no-non-null-assertion @@ -254,6 +274,7 @@ export class AllocationManager { case ActionType.UNALLOCATE: return await this.prepareUnallocate( logger, + context, // eslint-disable-next-line @typescript-eslint/no-non-null-assertion action.allocationID!, action.poi === null ? undefined : action.poi, @@ -262,6 +283,7 @@ export class AllocationManager { case ActionType.REALLOCATE: return await this.prepareReallocate( logger, + context, // eslint-disable-next-line @typescript-eslint/no-non-null-assertion action.allocationID!, action.poi === null ? undefined : action.poi, @@ -287,6 +309,7 @@ export class AllocationManager { async prepareAllocateParams( logger: Logger, + context: TransactionPreparationContext, deployment: SubgraphDeploymentID, amount: BigNumber, ): Promise { @@ -295,10 +318,7 @@ export class AllocationManager { amount: amount.toString(), }) - const activeAllocations = await this.network.networkMonitor.allocations( - AllocationStatus.ACTIVE, - ) - const allocation = activeAllocations.find( + const allocation = context.activeAllocations.find( (allocation) => allocation.subgraphDeployment.id.toString() === deployment.toString(), ) @@ -323,22 +343,32 @@ export class AllocationManager { ) } - const currentEpoch = await this.network.contracts.epochManager.currentEpoch() - - // Ensure subgraph is deployed before allocating - await this.graphNode.ensure( - `indexer-agent/${deployment.ipfsHash.slice(-10)}`, - deployment, + // Check that the subgraph is syncing and healthy before allocating + // Throw error if: + // - subgraph deployment is not syncing, + // - subgraph deployment is failed + const status = context.indexingStatuses.find( + (status) => status.subgraphDeployment.ipfsHash == deployment.ipfsHash, ) + if (!status) { + throw indexerError( + IndexerErrorCode.IE020, + `Subgraph deployment, '${deployment.ipfsHash}', is not syncing`, + ) + } + if (status && status.health == 'failed') { + throw indexerError( + IndexerErrorCode.IE020, + `Subgraph deployment, '${deployment.ipfsHash}', failed during syncing`, + ) + } logger.debug('Obtain a unique Allocation ID') - - // Obtain a unique allocation ID const { allocationSigner, allocationId } = uniqueAllocationID( this.network.transactionManager.wallet.mnemonic.phrase, - currentEpoch.toNumber(), + context.currentEpoch.toNumber(), deployment, - activeAllocations.map((allocation) => allocation.id), + context.activeAllocations.map((allocation) => allocation.id), ) // Double-check whether the allocationID already exists on chain, to @@ -460,10 +490,11 @@ export class AllocationManager { async prepareAllocate( logger: Logger, + context: TransactionPreparationContext, deployment: SubgraphDeploymentID, amount: BigNumber, ): Promise { - const params = await this.prepareAllocateParams(logger, deployment, amount) + const params = await this.prepareAllocateParams(logger, context, deployment, amount) logger.debug(`Populating allocateFrom transaction`, { indexer: params.indexer, subgraphDeployment: params.subgraphDeploymentID, @@ -483,6 +514,7 @@ export class AllocationManager { async prepareUnallocateParams( logger: Logger, + context: TransactionPreparationContext, allocationID: string, poi: string | undefined, force: boolean, @@ -492,12 +524,14 @@ export class AllocationManager { poi: poi || 'none provided', }) const allocation = await this.network.networkMonitor.allocation(allocationID) + // Ensure allocation is old enough to close - const currentEpoch = await this.network.contracts.epochManager.currentEpoch() - if (BigNumber.from(allocation.createdAtEpoch).eq(currentEpoch)) { + if (BigNumber.from(allocation.createdAtEpoch).eq(context.currentEpoch)) { throw indexerError( IndexerErrorCode.IE064, - `Allocation '${allocation.id}' cannot be closed until epoch ${currentEpoch.add( + `Allocation '${ + allocation.id + }' cannot be closed until epoch ${context.currentEpoch.add( 1, )}. (Allocations cannot be closed in the same epoch they were created)`, ) @@ -635,16 +669,24 @@ export class AllocationManager { async prepareUnallocate( logger: Logger, + context: TransactionPreparationContext, allocationID: string, poi: string | undefined, force: boolean, ): Promise { - const params = await this.prepareUnallocateParams(logger, allocationID, poi, force) + const params = await this.prepareUnallocateParams( + logger, + context, + allocationID, + poi, + force, + ) return await this.populateUnallocateTransaction(logger, params) } async prepareReallocateParams( logger: Logger, + context: TransactionPreparationContext, allocationID: string, poi: string | undefined, amount: BigNumber, @@ -657,14 +699,9 @@ export class AllocationManager { force, }) - /* Fetch all active allocations and search for our input parameter `allocationID`. - * We don't call `fetchAllocations` here because all allocations will be required - * later when generating a new `uniqueAllocationID`. */ - const activeAllocations = await this.network.networkMonitor.allocations( - AllocationStatus.ACTIVE, - ) + // Validate that the allocation exists and is old enough to close const allocationAddress = toAddress(allocationID) - const allocation = activeAllocations.find((allocation) => { + const allocation = context.activeAllocations.find((allocation) => { return allocation.id === allocationAddress }) if (!allocation) { @@ -674,25 +711,28 @@ export class AllocationManager { `Reallocation failed: No active allocation with id '${allocationID}' found`, ) } - - // Ensure allocation is old enough to close - const currentEpoch = await this.network.contracts.epochManager.currentEpoch() - if (BigNumber.from(allocation.createdAtEpoch).eq(currentEpoch)) { + if (BigNumber.from(allocation.createdAtEpoch).eq(context.currentEpoch)) { throw indexerError( IndexerErrorCode.IE064, - `Allocation '${allocation.id}' cannot be closed until epoch ${currentEpoch.add( + `Allocation '${ + allocation.id + }' cannot be closed until epoch ${context.currentEpoch.add( 1, )}. (Allocations cannot be closed in the same epoch they were created)`, ) } - logger.debug('Resolving POI') + logger.debug('Resolving POI', { + allocation: allocationID, + deployment: allocation.subgraphDeployment.id.ipfsHash, + }) const allocationPOI = await this.network.networkMonitor.resolvePOI( allocation, poi, force, ) logger.debug('POI resolved', { + deployment: allocation.subgraphDeployment.id.ipfsHash, userProvidedPOI: poi, poi: allocationPOI, }) @@ -723,9 +763,9 @@ export class AllocationManager { logger.debug('Generating a new unique Allocation ID') const { allocationSigner, allocationId: newAllocationId } = uniqueAllocationID( this.network.transactionManager.wallet.mnemonic.phrase, - currentEpoch.toNumber(), + context.currentEpoch.toNumber(), allocation.subgraphDeployment.id, - activeAllocations.map((allocation) => allocation.id), + context.activeAllocations.map((allocation) => allocation.id), ) logger.debug('New unique Allocation ID generated', { @@ -774,7 +814,7 @@ export class AllocationManager { deployment: allocation.subgraphDeployment.id.toString(), poi: allocationPOI, proof, - epoch: currentEpoch.toString(), + epoch: context.currentEpoch.toString(), }) return { @@ -921,6 +961,7 @@ export class AllocationManager { async prepareReallocate( logger: Logger, + context: TransactionPreparationContext, allocationID: string, poi: string | undefined, amount: BigNumber, @@ -928,6 +969,7 @@ export class AllocationManager { ): Promise { const params = await this.prepareReallocateParams( logger, + context, allocationID, poi, amount,