Skip to content

Commit

Permalink
agent, common: Add 'syncingNetwork' column to Actions table
Browse files Browse the repository at this point in the history
- Syncing network is the network/chain that the subgraph deployment
extracts data from.
  • Loading branch information
fordN committed Nov 15, 2024
1 parent 6d8975f commit 1e046be
Show file tree
Hide file tree
Showing 10 changed files with 159 additions and 40 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { Logger } from '@graphprotocol/common-ts'
import { DataTypes, QueryInterface } from 'sequelize'

interface MigrationContext {
queryInterface: QueryInterface
logger: Logger
}

interface Context {
context: MigrationContext
}

export async function up({ context }: Context): Promise<void> {
const { queryInterface, logger } = context

logger.debug(`Checking if actions table exists`)
const tables = await queryInterface.showAllTables()
if (!tables.includes('Actions')) {
logger.info(`Actions table does not exist, migration not necessary`)
return
}

logger.debug(`Checking if 'Actions' table needs to be migrated`)
const table = await queryInterface.describeTable('Actions')
const syncingNetworkColumn = table.syncingNetwork
if (syncingNetworkColumn) {
logger.info(
`'syncingNetwork' columns already exist, migration not necessary`,
)
return
}

logger.info(`Add 'syncingNetwork' column to 'Actions' table`)
await queryInterface.addColumn('Actions', 'syncingNetwork', {
type: DataTypes.BOOLEAN,
defaultValue: false,
})
}

export async function down({ context }: Context): Promise<void> {
const { queryInterface, logger } = context

return await queryInterface.sequelize.transaction({}, async transaction => {
const tables = await queryInterface.showAllTables()

if (tables.includes('Actions')) {
logger.info(`Remove 'syncingNetwork' column`)
await context.queryInterface.removeColumn('Actions', 'syncingNetwork', {
transaction,
})
}
})
}
4 changes: 4 additions & 0 deletions packages/indexer-cli/src/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ export async function buildActionInput(
status,
priority,
protocolNetwork,
syncingNetwork: 'unknown',
}
case ActionType.UNALLOCATE: {
let poi = actionParams.param2
Expand All @@ -64,6 +65,7 @@ export async function buildActionInput(
status,
priority,
protocolNetwork,
syncingNetwork: 'unknown',
}
}
case ActionType.REALLOCATE: {
Expand All @@ -83,6 +85,7 @@ export async function buildActionInput(
status,
priority,
protocolNetwork,
syncingNetwork: 'unknown',
}
}
}
Expand Down Expand Up @@ -399,6 +402,7 @@ export async function fetchActions(
) {
id
protocolNetwork
syncingNetwork
type
allocationID
deploymentID
Expand Down
74 changes: 44 additions & 30 deletions packages/indexer-common/src/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,36 +46,36 @@ export interface ActionInput {
status: ActionStatus
priority: number | undefined
protocolNetwork: string
syncingNetwork: string
}

export const isValidActionInput = (
/* eslint-disable @typescript-eslint/no-explicit-any */
variableToCheck: any,
): variableToCheck is ActionInput => {
if (!('type' in variableToCheck)) {
actionToCheck: any,
): actionToCheck is ActionInput => {
if (!('type' in actionToCheck)) {
return false
}
let hasActionParams = false
switch (variableToCheck.type) {
switch (actionToCheck.type) {
case ActionType.ALLOCATE:
hasActionParams = 'deploymentID' in variableToCheck && 'amount' in variableToCheck
hasActionParams = 'deploymentID' in actionToCheck && 'amount' in actionToCheck
break
case ActionType.UNALLOCATE:
hasActionParams =
'deploymentID' in variableToCheck && 'allocationID' in variableToCheck
hasActionParams = 'deploymentID' in actionToCheck && 'allocationID' in actionToCheck
break
case ActionType.REALLOCATE:
hasActionParams =
'deploymentID' in variableToCheck &&
'allocationID' in variableToCheck &&
'amount' in variableToCheck
'deploymentID' in actionToCheck &&
'allocationID' in actionToCheck &&
'amount' in actionToCheck
}
return (
hasActionParams &&
'source' in variableToCheck &&
'reason' in variableToCheck &&
'status' in variableToCheck &&
'priority' in variableToCheck
'source' in actionToCheck &&
'reason' in actionToCheck &&
'status' in actionToCheck &&
'priority' in actionToCheck
)
}

Expand All @@ -92,22 +92,6 @@ export const validateActionInputs = async (
throw Error("Cannot set an action without the field 'protocolNetwork'")
}

try {
// Set the parsed network identifier back in the action input object
action.protocolNetwork = validateNetworkIdentifier(action.protocolNetwork)
} catch (e) {
throw Error(`Invalid value for the field 'protocolNetwork'. ${e}`)
}

// Must have the required params for the action type
if (!isValidActionInput(action)) {
throw new Error(
`Failed to queue action: Invalid action input, actionInput: ${JSON.stringify(
action,
)}`,
)
}

// Must have status QUEUED or APPROVED
if (
[
Expand All @@ -122,6 +106,15 @@ export const validateActionInputs = async (
)
}

// Must have the required params for the action type
if (!isValidActionInput(action)) {
throw new Error(
`Failed to queue action: Invalid action input, actionInput: ${JSON.stringify(
action,
)}`,
)
}

// Action must target an existing subgraph deployment
const subgraphDeployment = await networkMonitor.subgraphDeployment(
action.deploymentID,
Expand All @@ -132,6 +125,25 @@ export const validateActionInputs = async (
)
}

try {
// Set the parsed protocol network identifier back in the action input object
action.protocolNetwork = validateNetworkIdentifier(action.protocolNetwork)
} catch (e) {
throw Error(`Invalid value for the field 'protocolNetwork'. ${e}`)
}

try {
// Fetch syncing network, parse alias, and set the parsed value back in the action input object
const syncingNetwork = await networkMonitor.deploymentSyncingNetwork(
action.deploymentID,
)
action.syncingNetwork = validateNetworkIdentifier(syncingNetwork)
} catch (e) {
throw Error(
`Could not resolve 'syncingNetwork' for deployment '${action.deploymentID}'. ${e}`,
)
}

// Unallocate & reallocate actions must target an active allocationID
if ([ActionType.UNALLOCATE, ActionType.REALLOCATE].includes(action.type)) {
// allocationID must belong to active allocation
Expand Down Expand Up @@ -161,6 +173,7 @@ export interface ActionFilter {
reason?: string
updatedAt?: WhereOperators
protocolNetwork?: string
syncingNetwork?: string
}

export const actionFilterToWhereOptions = (filter: ActionFilter): WhereOptions => {
Expand Down Expand Up @@ -192,6 +205,7 @@ export interface ActionResult {
failureReason: string | null
transaction: string | null
protocolNetwork: string
syncingNetwork: string
}

export enum ActionType {
Expand Down
2 changes: 2 additions & 0 deletions packages/indexer-common/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ export enum IndexerErrorCode {
IE075 = 'IE075',
IE076 = 'IE076',
IE077 = 'IE077',
IE078 = 'IE078',
}

export const INDEXER_ERROR_MESSAGES: Record<IndexerErrorCode, string> = {
Expand Down Expand Up @@ -169,6 +170,7 @@ export const INDEXER_ERROR_MESSAGES: Record<IndexerErrorCode, string> = {
IE075: 'Failed to connect to network contracts',
IE076: 'Failed to resume subgraph deployment',
IE077: 'Failed to allocate: subgraph not healthily syncing',
IE078: 'Failed to query subgraph features from network subgraph',
}

export type IndexerErrorCause = unknown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ describe('Actions', () => {
priority: 0,
// When writing directly to the database, `protocolNetwork` must be in the CAIP2-ID format.
protocolNetwork: 'eip155:421614',
syncingNetwork: 'eip155:1',
}

await models.Action.upsert(action)
Expand Down
3 changes: 3 additions & 0 deletions packages/indexer-common/src/indexer-management/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ const SCHEMA_SDL = gql`
createdAt: BigInt!
updatedAt: BigInt
protocolNetwork: String!
syncingNetwork: String!
}
input ActionInput {
Expand All @@ -149,6 +150,7 @@ const SCHEMA_SDL = gql`
reason: String!
priority: Int!
protocolNetwork: String!
syncingNetwork: String!
}
input ActionUpdateInput {
Expand Down Expand Up @@ -201,6 +203,7 @@ const SCHEMA_SDL = gql`
input ActionFilter {
id: Int
protocolNetwork: String
syncingNetwork: String
type: ActionType
status: String
source: String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export class Action extends Model<
declare updatedAt: CreationOptional<Date>

declare protocolNetwork: string
declare syncingNetwork: string

// eslint-disable-next-line @typescript-eslint/ban-types
public toGraphQL(): object {
Expand Down Expand Up @@ -151,6 +152,14 @@ export const defineActionModels = (sequelize: Sequelize): ActionModels => {
is: caip2IdRegex,
},
},
syncingNetwork: {
type: DataTypes.STRING(50),
primaryKey: false,
allowNull: false,
validate: {
is: caip2IdRegex,
},
},
},
{
modelName: 'Action',
Expand Down
42 changes: 42 additions & 0 deletions packages/indexer-common/src/indexer-management/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,48 @@ export class NetworkMonitor {
}
}

async deploymentSyncingNetwork(ipfsHash: string): Promise<string> {
try {
const result = await this.networkSubgraph.checkedQuery(
gql`
query subgraphDeploymentManifest($ipfsHash: String!) {
subgraphDeploymentManifest(id: $ipfsHash) {
network
}
}
`,
{
ipfsHash: ipfsHash,
},
)

if (result.error) {
throw result.error
}

if (!result.data || !result.data.subgraphDeploymentManifest) {
throw new Error(
`SubgraphDeployment with ipfsHash = ${ipfsHash} not found on chain`,
)
}

if (result.data.subgraphDeploymentManifest.network == undefined) {
return 'unknown'
}

return result.data.subgraphDeploymentManifest.network
} catch (error) {
const err = indexerError(IndexerErrorCode.IE078, error)
this.logger.error(
`Failed to query subgraphDeploymentManifest with ipfsHash = ${ipfsHash}`,
{
err,
},
)
throw err
}
}

async transferredDeployments(): Promise<TransferredSubgraphDeployment[]> {
this.logger.debug('Querying the Network for transferred subgraph deployments')
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import {
NetworkMapped,
OrderDirection,
validateActionInputs,
validateNetworkIdentifier,
} from '@graphprotocol/indexer-common'
import { literal, Op, Transaction } from 'sequelize'
import { ActionManager } from '../actions'
Expand Down Expand Up @@ -161,15 +160,6 @@ export default {
throw Error('IndexerManagementClient must be in `network` mode to modify actions')
}

// Sanitize protocol network identifier
actions.forEach((action) => {
try {
action.protocolNetwork = validateNetworkIdentifier(action.protocolNetwork)
} catch (e) {
throw Error(`Invalid value for the field 'protocolNetwork'. ${e}`)
}
})

// Let Network Monitors validate actions based on their protocol networks
await multiNetworks.mapNetworkMapped(
groupBy(actions, (action) => action.protocolNetwork),
Expand Down
1 change: 1 addition & 0 deletions packages/indexer-common/src/operator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ export class Operator {
reason: action.reason,
priority: 0,
protocolNetwork: action.protocolNetwork,
syncingNetork: 'unknown',
}
this.logger.trace(`Queueing action input`, {
actionInput,
Expand Down

0 comments on commit 1e046be

Please sign in to comment.