Skip to content

Commit

Permalink
feat: add scalar tap table migrations
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Dec 14, 2023
1 parent 68d587f commit cbadd3b
Show file tree
Hide file tree
Showing 3 changed files with 255 additions and 9 deletions.
244 changes: 244 additions & 0 deletions packages/indexer-agent/src/db/migrations/12-add-scalar-tap-table.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
import { Logger } from '@graphprotocol/common-ts'
import { QueryInterface, DataTypes, QueryTypes } from 'sequelize'

interface MigrationContext {
queryInterface: QueryInterface
logger: Logger
}

interface Context {
context: MigrationContext
}

export async function up({ context }: Context): Promise<void> {
const { queryInterface, logger } = context

const tables = await queryInterface.showAllTables()
logger.debug(`Checking if scalar_tap_receipts table exists`, { tables })

if (tables.includes('scalar_tap_receipts')) {
logger.debug(`scalar_tap_receipts already exist, migration not necessary`)
} else {
logger.info(`Create scalar_tap_receipts`)
await queryInterface.createTable('scalar_tap_receipts', {
id: {
type: DataTypes.BIGINT,
primaryKey: true,
autoIncrement: true,
},
allocation_id: {
type: DataTypes.CHAR(40),
allowNull: false,
},
sender_address: {
type: DataTypes.CHAR(40),
allowNull: false,
},
timestamp_ns: {
type: DataTypes.DECIMAL(20),
allowNull: false,
},
value: {
type: DataTypes.DECIMAL(39),
allowNull: false,
},
receipt: {
type: DataTypes.JSON,
allowNull: false,
},
})
}

logger.debug('Create function and trigger using raw SQL')
// const schemas = await queryInterface.showAllSchemas()
const functionSQL = `DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_proc
INNER JOIN pg_namespace ON pg_proc.pronamespace = pg_namespace.oid
WHERE proname = 'scalar_tap_receipt_notify'
AND nspname = 'public' -- or your specific schema if not public
) THEN
EXECUTE $func$
CREATE FUNCTION scalar_tap_receipt_notify()
RETURNS trigger AS $body$
BEGIN
PERFORM pg_notify('scalar_tap_receipt_notification', format('{"id": %s, "allocation_id": "%s", "sender_address": "%s", "timestamp_ns": %s, "value": %s}', NEW.id, NEW.allocation_id, NEW.sender_address, NEW.timestamp_ns, NEW.value));
RETURN NEW;
END;
$body$ LANGUAGE plpgsql;
$func$;
END IF;
END $$;`
await queryInterface.sequelize.query(functionSQL)

const triggerExists = async (triggerName: string, tableName: string) => {
const query = `
SELECT EXISTS (
SELECT 1
FROM pg_trigger
WHERE tgname = '${triggerName}'
AND tgenabled = 'O'
AND tgrelid = (
SELECT oid
FROM pg_class
WHERE relname = '${tableName}'
)
)`
const result = await queryInterface.sequelize.query(query, {
type: QueryTypes.SELECT,
})
return result.length > 0
}

if (!(await triggerExists('receipt_update', 'scalar_tap_receipts'))) {
logger.info('Create trigger for receipt update')
const triggerSQL = `
CREATE TRIGGER receipt_update AFTER INSERT OR UPDATE
ON scalar_tap_receipts
FOR EACH ROW EXECUTE PROCEDURE scalar_tap_receipt_notify();
`
await queryInterface.sequelize.query(triggerSQL)
}

const indexExists = async (indexName: string, tableName: string) => {
const query = `
SELECT EXISTS (
SELECT 1
FROM pg_class t
INNER JOIN pg_index d ON t.oid = d.indrelid
INNER JOIN pg_class i ON d.indexrelid = i.oid
WHERE i.relkind = 'i'
AND i.relname = '${indexName}'
AND t.relname = '${tableName}'
)`
const result = await queryInterface.sequelize.query(query, {
type: QueryTypes.SELECT,
})
return result.length > 0
}

if (
!(await indexExists(
'scalar_tap_receipts_allocation_id_idx',
'scalar_tap_receipts',
))
) {
logger.debug('Create indices for allocation_id')
await queryInterface.addIndex('scalar_tap_receipts', ['allocation_id'], {
name: 'scalar_tap_receipts_allocation_id_idx',
})
}
if (
!(await indexExists(
'scalar_tap_receipts_timestamp_ns_idx',
'scalar_tap_receipts',
))
) {
logger.info('Create indices for timestamp_ns')
await queryInterface.addIndex('scalar_tap_receipts', ['timestamp_ns'], {
name: 'scalar_tap_receipts_timestamp_ns_idx',
})
}

if (tables.includes('scalar_tap_ravs')) {
logger.info(`scalar_tap_ravs already exist, migration not necessary`)
return
}
// Create the scalar_tap_ravs table if it doesn't exist
await queryInterface.createTable('scalar_tap_ravs', {
allocation_id: {
type: DataTypes.CHAR(40),
allowNull: false,
},
sender_address: {
type: DataTypes.CHAR(40),
allowNull: false,
},
rav: {
type: DataTypes.JSON,
allowNull: false,
},
final: {
type: DataTypes.BOOLEAN,
allowNull: false,
defaultValue: false,
},
createdAt: {
allowNull: false,
type: DataTypes.DATE,
},
updatedAt: {
allowNull: false,
type: DataTypes.DATE,
},
})

logger.info(`Add primary key`)
await queryInterface.addConstraint('scalar_tap_ravs', {
fields: ['allocation_id', 'sender_address'],
type: 'primary key',
name: 'pk_scalar_tap_ravs',
})

logger.info(
`Remove one-to-one relationship between AllocationSummary and Voucher`,
)
await queryInterface.removeConstraint('allocation_summaries', 'voucher')

logger.info(`Add RAV association with AllocationSummary`)
await queryInterface.addConstraint('scalar_tap_ravs', {
fields: ['allocation_id'],
type: 'foreign key',
name: 'allocation_summary',
references: {
table: 'allocation_summaries',
field: 'allocation',
},
onDelete: 'cascade',
onUpdate: 'cascade',
})
}

export async function down({ context }: Context): Promise<void> {
const { queryInterface, logger } = context

logger.info(`Remove foreign relationship`)
await queryInterface.removeConstraint('scalar_tap_ravs', 'allocationSummary')

// Drop the scalar_tap_ravs table
logger.info(`Drop table`)
await queryInterface.dropTable('scalar_tap_ravs')

logger.info(
`Re-add the one-to-one relationship between AllocationSummary and Voucher`,
)
await queryInterface.addConstraint('vouchers', {
fields: ['allocation'],
type: 'foreign key',
name: 'allocationSummary',
references: {
table: 'allocation_summaries',
field: 'allocation',
},
onUpdate: 'CASCADE',
onDelete: 'SET NULL',
})

logger.info(`Drop function, trigger, indices, and table`)
await queryInterface.sequelize.query(
'DROP TRIGGER IF EXISTS receipt_update ON scalar_tap_receipts',
)
await queryInterface.sequelize.query(
'DROP FUNCTION IF EXISTS scalar_tap_receipt_notify',
)
await queryInterface.removeIndex(
'scalar_tap_receipts',
'scalar_tap_receipts_allocation_id_idx',
)
await queryInterface.removeIndex(
'scalar_tap_receipts',
'scalar_tap_receipts_timestamp_ns_idx',
)
await queryInterface.dropTable('scalar_tap_receipts')
}
6 changes: 4 additions & 2 deletions packages/indexer-common/src/allocations/query-fees.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ export class AllocationReceiptCollector implements ReceiptCollector {
collector.startReceiptCollecting()
collector.startVoucherProcessing()
if (collector.escrowContracts) {
collector.logger.info(`RAV processing is initiated`);
collector.startRAVProcessing()
}
await collector.queuePendingReceiptsFromDatabase()
Expand Down Expand Up @@ -404,8 +405,9 @@ export class AllocationReceiptCollector implements ReceiptCollector {
let pendingRAVs: ReceiptAggregateVoucher[] = []
try {
pendingRAVs = await this.pendingRAVs()
this.logger.debug(`Pending RAVs`, { pendingRAVs })
} catch (err) {
this.logger.warn(`Failed to query pending vouchers`, { err })
this.logger.warn(`Failed to query pending RAVs`, { err })
return
}

Expand Down Expand Up @@ -822,7 +824,7 @@ export class AllocationReceiptCollector implements ReceiptCollector {

await this.models.receiptAggregateVouchers.destroy({
where: {
allocationId: signedRavs.map((signedRav) => signedRav.rav.allocationId),
allocation_id: signedRavs.map((signedRav) => signedRav.rav.allocationId),
},
})
signedRavs.map((signedRav) =>
Expand Down
14 changes: 7 additions & 7 deletions packages/indexer-common/src/query-fees/models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ export class Voucher extends Model<VoucherAttributes> implements VoucherAttribut
}

export interface ReceiptAggregateVoucherAttributes {
allocationId: Address
senderAddress: string
allocation_id: Address
sender_address: string
rav: JSON // JSON object mapped from SignedRav
final: boolean
}
Expand All @@ -59,8 +59,8 @@ export class ReceiptAggregateVoucher
extends Model<ReceiptAggregateVoucherAttributes>
implements ReceiptAggregateVoucherAttributes
{
public allocationId!: Address
public senderAddress!: string
public allocation_id!: Address
public sender_address!: string
public rav!: JSON
public final!: boolean

Expand Down Expand Up @@ -263,12 +263,12 @@ export function defineQueryFeeModels(sequelize: Sequelize): QueryFeeModels {

ReceiptAggregateVoucher.init(
{
allocationId: {
allocation_id: {
type: DataTypes.CHAR(40), // 40 because prefix '0x' gets removed by TAP agent
allowNull: false,
primaryKey: true,
},
senderAddress: {
sender_address: {
type: DataTypes.CHAR(40), // 40 because prefix '0x' gets removed by TAP agent
allowNull: false,
primaryKey: true,
Expand Down Expand Up @@ -456,7 +456,7 @@ export function defineQueryFeeModels(sequelize: Sequelize): QueryFeeModels {

ReceiptAggregateVoucher.belongsTo(AllocationSummary, {
targetKey: 'allocation',
foreignKey: 'allocation',
foreignKey: 'allocation_id',
as: 'allocationSummary',
})

Expand Down

0 comments on commit cbadd3b

Please sign in to comment.