From cbadd3b1040e68954c873154b9e5c0e8dc9fb79f Mon Sep 17 00:00:00 2001 From: hopeyen Date: Thu, 14 Dec 2023 16:11:48 -0600 Subject: [PATCH] feat: add scalar tap table migrations --- .../db/migrations/12-add-scalar-tap-table.ts | 244 ++++++++++++++++++ .../src/allocations/query-fees.ts | 6 +- .../indexer-common/src/query-fees/models.ts | 14 +- 3 files changed, 255 insertions(+), 9 deletions(-) create mode 100644 packages/indexer-agent/src/db/migrations/12-add-scalar-tap-table.ts diff --git a/packages/indexer-agent/src/db/migrations/12-add-scalar-tap-table.ts b/packages/indexer-agent/src/db/migrations/12-add-scalar-tap-table.ts new file mode 100644 index 000000000..3a9cac28c --- /dev/null +++ b/packages/indexer-agent/src/db/migrations/12-add-scalar-tap-table.ts @@ -0,0 +1,244 @@ +import { Logger } from '@graphprotocol/common-ts' +import { QueryInterface, DataTypes, QueryTypes } from 'sequelize' + +interface MigrationContext { + queryInterface: QueryInterface + logger: Logger +} + +interface Context { + context: MigrationContext +} + +export async function up({ context }: Context): Promise { + const { queryInterface, logger } = context + + const tables = await queryInterface.showAllTables() + logger.debug(`Checking if scalar_tap_receipts table exists`, { tables }) + + if (tables.includes('scalar_tap_receipts')) { + logger.debug(`scalar_tap_receipts already exist, migration not necessary`) + } else { + logger.info(`Create scalar_tap_receipts`) + await queryInterface.createTable('scalar_tap_receipts', { + id: { + type: DataTypes.BIGINT, + primaryKey: true, + autoIncrement: true, + }, + allocation_id: { + type: DataTypes.CHAR(40), + allowNull: false, + }, + sender_address: { + type: DataTypes.CHAR(40), + allowNull: false, + }, + timestamp_ns: { + type: DataTypes.DECIMAL(20), + allowNull: false, + }, + value: { + type: DataTypes.DECIMAL(39), + allowNull: false, + }, + receipt: { + type: DataTypes.JSON, + allowNull: false, + }, + }) + } + + logger.debug('Create function and trigger using raw SQL') + // const schemas = await queryInterface.showAllSchemas() + const functionSQL = `DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_proc + INNER JOIN pg_namespace ON pg_proc.pronamespace = pg_namespace.oid + WHERE proname = 'scalar_tap_receipt_notify' + AND nspname = 'public' -- or your specific schema if not public + ) THEN + EXECUTE $func$ + CREATE FUNCTION scalar_tap_receipt_notify() + RETURNS trigger AS $body$ + BEGIN + PERFORM pg_notify('scalar_tap_receipt_notification', format('{"id": %s, "allocation_id": "%s", "sender_address": "%s", "timestamp_ns": %s, "value": %s}', NEW.id, NEW.allocation_id, NEW.sender_address, NEW.timestamp_ns, NEW.value)); + RETURN NEW; + END; + $body$ LANGUAGE plpgsql; + $func$; + END IF; + END $$;` + await queryInterface.sequelize.query(functionSQL) + + const triggerExists = async (triggerName: string, tableName: string) => { + const query = ` + SELECT EXISTS ( + SELECT 1 + FROM pg_trigger + WHERE tgname = '${triggerName}' + AND tgenabled = 'O' + AND tgrelid = ( + SELECT oid + FROM pg_class + WHERE relname = '${tableName}' + ) + )` + const result = await queryInterface.sequelize.query(query, { + type: QueryTypes.SELECT, + }) + return result.length > 0 + } + + if (!(await triggerExists('receipt_update', 'scalar_tap_receipts'))) { + logger.info('Create trigger for receipt update') + const triggerSQL = ` + CREATE TRIGGER receipt_update AFTER INSERT OR UPDATE + ON scalar_tap_receipts + FOR EACH ROW EXECUTE PROCEDURE scalar_tap_receipt_notify(); + ` + await queryInterface.sequelize.query(triggerSQL) + } + + const indexExists = async (indexName: string, tableName: string) => { + const query = ` + SELECT EXISTS ( + SELECT 1 + FROM pg_class t + INNER JOIN pg_index d ON t.oid = d.indrelid + INNER JOIN pg_class i ON d.indexrelid = i.oid + WHERE i.relkind = 'i' + AND i.relname = '${indexName}' + AND t.relname = '${tableName}' + )` + const result = await queryInterface.sequelize.query(query, { + type: QueryTypes.SELECT, + }) + return result.length > 0 + } + + if ( + !(await indexExists( + 'scalar_tap_receipts_allocation_id_idx', + 'scalar_tap_receipts', + )) + ) { + logger.debug('Create indices for allocation_id') + await queryInterface.addIndex('scalar_tap_receipts', ['allocation_id'], { + name: 'scalar_tap_receipts_allocation_id_idx', + }) + } + if ( + !(await indexExists( + 'scalar_tap_receipts_timestamp_ns_idx', + 'scalar_tap_receipts', + )) + ) { + logger.info('Create indices for timestamp_ns') + await queryInterface.addIndex('scalar_tap_receipts', ['timestamp_ns'], { + name: 'scalar_tap_receipts_timestamp_ns_idx', + }) + } + + if (tables.includes('scalar_tap_ravs')) { + logger.info(`scalar_tap_ravs already exist, migration not necessary`) + return + } + // Create the scalar_tap_ravs table if it doesn't exist + await queryInterface.createTable('scalar_tap_ravs', { + allocation_id: { + type: DataTypes.CHAR(40), + allowNull: false, + }, + sender_address: { + type: DataTypes.CHAR(40), + allowNull: false, + }, + rav: { + type: DataTypes.JSON, + allowNull: false, + }, + final: { + type: DataTypes.BOOLEAN, + allowNull: false, + defaultValue: false, + }, + createdAt: { + allowNull: false, + type: DataTypes.DATE, + }, + updatedAt: { + allowNull: false, + type: DataTypes.DATE, + }, + }) + + logger.info(`Add primary key`) + await queryInterface.addConstraint('scalar_tap_ravs', { + fields: ['allocation_id', 'sender_address'], + type: 'primary key', + name: 'pk_scalar_tap_ravs', + }) + + logger.info( + `Remove one-to-one relationship between AllocationSummary and Voucher`, + ) + await queryInterface.removeConstraint('allocation_summaries', 'voucher') + + logger.info(`Add RAV association with AllocationSummary`) + await queryInterface.addConstraint('scalar_tap_ravs', { + fields: ['allocation_id'], + type: 'foreign key', + name: 'allocation_summary', + references: { + table: 'allocation_summaries', + field: 'allocation', + }, + onDelete: 'cascade', + onUpdate: 'cascade', + }) +} + +export async function down({ context }: Context): Promise { + const { queryInterface, logger } = context + + logger.info(`Remove foreign relationship`) + await queryInterface.removeConstraint('scalar_tap_ravs', 'allocationSummary') + + // Drop the scalar_tap_ravs table + logger.info(`Drop table`) + await queryInterface.dropTable('scalar_tap_ravs') + + logger.info( + `Re-add the one-to-one relationship between AllocationSummary and Voucher`, + ) + await queryInterface.addConstraint('vouchers', { + fields: ['allocation'], + type: 'foreign key', + name: 'allocationSummary', + references: { + table: 'allocation_summaries', + field: 'allocation', + }, + onUpdate: 'CASCADE', + onDelete: 'SET NULL', + }) + + logger.info(`Drop function, trigger, indices, and table`) + await queryInterface.sequelize.query( + 'DROP TRIGGER IF EXISTS receipt_update ON scalar_tap_receipts', + ) + await queryInterface.sequelize.query( + 'DROP FUNCTION IF EXISTS scalar_tap_receipt_notify', + ) + await queryInterface.removeIndex( + 'scalar_tap_receipts', + 'scalar_tap_receipts_allocation_id_idx', + ) + await queryInterface.removeIndex( + 'scalar_tap_receipts', + 'scalar_tap_receipts_timestamp_ns_idx', + ) + await queryInterface.dropTable('scalar_tap_receipts') +} diff --git a/packages/indexer-common/src/allocations/query-fees.ts b/packages/indexer-common/src/allocations/query-fees.ts index a56c73ac8..b8b9990dc 100644 --- a/packages/indexer-common/src/allocations/query-fees.ts +++ b/packages/indexer-common/src/allocations/query-fees.ts @@ -152,6 +152,7 @@ export class AllocationReceiptCollector implements ReceiptCollector { collector.startReceiptCollecting() collector.startVoucherProcessing() if (collector.escrowContracts) { + collector.logger.info(`RAV processing is initiated`); collector.startRAVProcessing() } await collector.queuePendingReceiptsFromDatabase() @@ -404,8 +405,9 @@ export class AllocationReceiptCollector implements ReceiptCollector { let pendingRAVs: ReceiptAggregateVoucher[] = [] try { pendingRAVs = await this.pendingRAVs() + this.logger.debug(`Pending RAVs`, { pendingRAVs }) } catch (err) { - this.logger.warn(`Failed to query pending vouchers`, { err }) + this.logger.warn(`Failed to query pending RAVs`, { err }) return } @@ -822,7 +824,7 @@ export class AllocationReceiptCollector implements ReceiptCollector { await this.models.receiptAggregateVouchers.destroy({ where: { - allocationId: signedRavs.map((signedRav) => signedRav.rav.allocationId), + allocation_id: signedRavs.map((signedRav) => signedRav.rav.allocationId), }, }) signedRavs.map((signedRav) => diff --git a/packages/indexer-common/src/query-fees/models.ts b/packages/indexer-common/src/query-fees/models.ts index bcaae0f90..ac66dae1c 100644 --- a/packages/indexer-common/src/query-fees/models.ts +++ b/packages/indexer-common/src/query-fees/models.ts @@ -49,8 +49,8 @@ export class Voucher extends Model implements VoucherAttribut } export interface ReceiptAggregateVoucherAttributes { - allocationId: Address - senderAddress: string + allocation_id: Address + sender_address: string rav: JSON // JSON object mapped from SignedRav final: boolean } @@ -59,8 +59,8 @@ export class ReceiptAggregateVoucher extends Model implements ReceiptAggregateVoucherAttributes { - public allocationId!: Address - public senderAddress!: string + public allocation_id!: Address + public sender_address!: string public rav!: JSON public final!: boolean @@ -263,12 +263,12 @@ export function defineQueryFeeModels(sequelize: Sequelize): QueryFeeModels { ReceiptAggregateVoucher.init( { - allocationId: { + allocation_id: { type: DataTypes.CHAR(40), // 40 because prefix '0x' gets removed by TAP agent allowNull: false, primaryKey: true, }, - senderAddress: { + sender_address: { type: DataTypes.CHAR(40), // 40 because prefix '0x' gets removed by TAP agent allowNull: false, primaryKey: true, @@ -456,7 +456,7 @@ export function defineQueryFeeModels(sequelize: Sequelize): QueryFeeModels { ReceiptAggregateVoucher.belongsTo(AllocationSummary, { targetKey: 'allocation', - foreignKey: 'allocation', + foreignKey: 'allocation_id', as: 'allocationSummary', })