diff --git a/packages/indexer-common/src/allocations/__tests__/tap.test.ts b/packages/indexer-common/src/allocations/__tests__/tap.test.ts index b644ba528..f0697d4d3 100644 --- a/packages/indexer-common/src/allocations/__tests__/tap.test.ts +++ b/packages/indexer-common/src/allocations/__tests__/tap.test.ts @@ -1,10 +1,10 @@ import { - AllocationReceiptCollector, defineQueryFeeModels, GraphNode, Network, QueryFeeModels, TapSubgraphResponse, + TapCollector, } from '@graphprotocol/indexer-common' import { Address, @@ -24,16 +24,13 @@ import { utils } from 'ethers' declare const __DATABASE__: any declare const __LOG_LEVEL__: never let logger: Logger -let receiptCollector: AllocationReceiptCollector +let tapCollector: TapCollector let metrics: Metrics let queryFeeModels: QueryFeeModels let sequelize: Sequelize const timeout = 30000 -const startRAVProcessing = jest.spyOn( - AllocationReceiptCollector.prototype, - 'startRAVProcessing', -) +const startRAVProcessing = jest.spyOn(TapCollector.prototype, 'startRAVProcessing') const setup = async () => { logger = createLogger({ name: 'Indexer API Client', @@ -61,7 +58,7 @@ const setup = async () => { graphNode, metrics, ) - receiptCollector = network.receiptCollector + tapCollector = network.tapCollector! } const ALLOCATION_ID_1 = toAddress('edde47df40c29949a75a6693c77834c00b8ad626') @@ -97,7 +94,7 @@ const setupEach = async () => { await queryFeeModels.receiptAggregateVouchers.create(rav) jest - .spyOn(receiptCollector, 'findTransactionsForRavs') + .spyOn(tapCollector, 'findTransactionsForRavs') .mockImplementation(async (): Promise => { return { transactions: [], @@ -134,7 +131,7 @@ describe('TAP', () => { test( 'test getPendingRAVs', async () => { - const ravs = await receiptCollector['pendingRAVs']() + const ravs = await tapCollector['pendingRAVs']() expect(ravs).toEqual([ expect.objectContaining({ @@ -174,7 +171,7 @@ describe('TAP', () => { // it's not showing on the subgraph on a specific point in time // the timestamp of the subgraph is greater than the receipt id // should revert the rav - await receiptCollector['revertRavsRedeemed'](ravList, nowSecs - 1) + await tapCollector['revertRavsRedeemed'](ravList, nowSecs - 1) let lastRedeemedRavs = await queryFeeModels.receiptAggregateVouchers.findAll({ where: { @@ -191,7 +188,7 @@ describe('TAP', () => { expect.objectContaining(ravList[2]), ]) - await receiptCollector['revertRavsRedeemed'](ravList, nowSecs) + await tapCollector['revertRavsRedeemed'](ravList, nowSecs) lastRedeemedRavs = await queryFeeModels.receiptAggregateVouchers.findAll({ where: { @@ -207,7 +204,7 @@ describe('TAP', () => { expect.objectContaining(ravList[2]), ]) - await receiptCollector['revertRavsRedeemed'](ravList, nowSecs + 1) + await tapCollector['revertRavsRedeemed'](ravList, nowSecs + 1) lastRedeemedRavs = await queryFeeModels.receiptAggregateVouchers.findAll({ where: { @@ -220,7 +217,7 @@ describe('TAP', () => { }) expect(lastRedeemedRavs).toEqual([expect.objectContaining(ravList[2])]) - await receiptCollector['revertRavsRedeemed'](ravList, nowSecs + 2) + await tapCollector['revertRavsRedeemed'](ravList, nowSecs + 2) lastRedeemedRavs = await queryFeeModels.receiptAggregateVouchers.findAll({ where: { @@ -253,7 +250,7 @@ describe('TAP', () => { await queryFeeModels.receiptAggregateVouchers.bulkCreate(ravList) // it's showing on the subgraph on a specific point in time - await receiptCollector['revertRavsRedeemed']( + await tapCollector['revertRavsRedeemed']( [ { allocationId: ALLOCATION_ID_1, @@ -305,7 +302,7 @@ describe('TAP', () => { ] await queryFeeModels.receiptAggregateVouchers.bulkCreate(ravList) - await receiptCollector['markRavsAsFinal'](nowSecs - 1) + await tapCollector['markRavsAsFinal'](nowSecs - 1) let finalRavs = await queryFeeModels.receiptAggregateVouchers.findAll({ where: { last: true, final: true }, @@ -313,13 +310,13 @@ describe('TAP', () => { expect(finalRavs).toEqual([]) - await receiptCollector['markRavsAsFinal'](nowSecs) + await tapCollector['markRavsAsFinal'](nowSecs) finalRavs = await queryFeeModels.receiptAggregateVouchers.findAll({ where: { last: true, final: true }, }) expect(finalRavs).toEqual([expect.objectContaining({ ...ravList[0], final: true })]) - await receiptCollector['markRavsAsFinal'](nowSecs + 1) + await tapCollector['markRavsAsFinal'](nowSecs + 1) finalRavs = await queryFeeModels.receiptAggregateVouchers.findAll({ where: { last: true, final: true }, }) @@ -328,7 +325,7 @@ describe('TAP', () => { expect.objectContaining({ ...ravList[1], final: true }), ]) - await receiptCollector['markRavsAsFinal'](nowSecs + 2) + await tapCollector['markRavsAsFinal'](nowSecs + 2) finalRavs = await queryFeeModels.receiptAggregateVouchers.findAll({ where: { last: true, final: true }, }) @@ -354,7 +351,7 @@ describe('TAP', () => { redeemedAt: new Date(redeemDate), } await queryFeeModels.receiptAggregateVouchers.create(rav2) - const ravs = await receiptCollector['pendingRAVs']() + const ravs = await tapCollector['pendingRAVs']() // The point is it will only return the rav that is not final expect(ravs.length).toEqual(1) expect(ravs).toEqual([ @@ -390,8 +387,8 @@ describe('TAP', () => { } await queryFeeModels.receiptAggregateVouchers.create(rav2) - let ravs = await receiptCollector['pendingRAVs']() - ravs = await receiptCollector['filterAndUpdateRavs'](ravs) + let ravs = await tapCollector['pendingRAVs']() + ravs = await tapCollector['filterAndUpdateRavs'](ravs) // The point is it will only return the rav that is not final expect(ravs).toEqual([ @@ -458,7 +455,7 @@ describe('TAP', () => { const redeemDateSecs = Math.floor(redeemDate / 1000) const nowSecs = Math.floor(Date.now() / 1000) const anotherFuncSpy = jest - .spyOn(receiptCollector, 'findTransactionsForRavs') + .spyOn(tapCollector, 'findTransactionsForRavs') .mockImplementation(async (): Promise => { return { transactions: [ @@ -489,8 +486,8 @@ describe('TAP', () => { redeemedAt: new Date(redeemDate), } await queryFeeModels.receiptAggregateVouchers.create(rav2) - let ravs = await receiptCollector['pendingRAVs']() - ravs = await receiptCollector['filterAndUpdateRavs'](ravs) + let ravs = await tapCollector['pendingRAVs']() + ravs = await tapCollector['filterAndUpdateRavs'](ravs) expect(anotherFuncSpy).toBeCalled() const finalRavs = await queryFeeModels.receiptAggregateVouchers.findAll({ where: { last: true, final: true }, diff --git a/packages/indexer-common/src/allocations/index.ts b/packages/indexer-common/src/allocations/index.ts index 022489129..a52b572fb 100644 --- a/packages/indexer-common/src/allocations/index.ts +++ b/packages/indexer-common/src/allocations/index.ts @@ -1,4 +1,5 @@ export * from './keys' export * from './query-fees' +export * from './tap-collector' export * from './monitor' export * from './types' diff --git a/packages/indexer-common/src/allocations/query-fees.ts b/packages/indexer-common/src/allocations/query-fees.ts index a914b3d93..53fff3e66 100644 --- a/packages/indexer-common/src/allocations/query-fees.ts +++ b/packages/indexer-common/src/allocations/query-fees.ts @@ -9,9 +9,7 @@ import { Address, Metrics, Eventual, - join as joinEventual, } from '@graphprotocol/common-ts' -import { NetworkContracts as TapContracts } from '@semiotic-labs/tap-contracts-bindings' import { Allocation, AllocationReceipt, @@ -19,22 +17,15 @@ import { IndexerErrorCode, QueryFeeModels, Voucher, - ReceiptAggregateVoucher, ensureAllocationSummary, TransactionManager, specification as spec, - SignedRAV, - allocationSigner, - tapAllocationIdProof, - parseGraphQLAllocation, } from '..' import { DHeap } from '@thi.ng/heaps' import { BigNumber, BigNumberish, Contract } from 'ethers' import { Op } from 'sequelize' import pReduce from 'p-reduce' -import { TAPSubgraph } from '../tap-subgraph' import { NetworkSubgraph } from '../network-subgraph' -import gql from 'graphql-tag' // Receipts are collected with a delay of 20 minutes after // the corresponding allocation was closed @@ -65,11 +56,6 @@ interface ReceiptMetrics { vouchersRedeemDuration: Histogram vouchersBatchRedeemSize: Gauge voucherCollectedFees: Gauge - ravRedeemsSuccess: Counter - ravRedeemsInvalid: Counter - ravRedeemsFailed: Counter - ravsRedeemDuration: Histogram - ravCollectedFees: Gauge } export interface AllocationPartialVouchers { @@ -82,11 +68,9 @@ export interface AllocationReceiptCollectorOptions { metrics: Metrics transactionManager: TransactionManager allocationExchange: Contract - tapContracts?: TapContracts allocations: Eventual models: QueryFeeModels networkSpecification: spec.NetworkSpecification - tapSubgraph: TAPSubgraph | undefined networkSubgraph: NetworkSubgraph } @@ -95,39 +79,12 @@ export interface ReceiptCollector { collectReceipts(actionID: number, allocation: Allocation): Promise } -interface ValidRavs { - belowThreshold: RavWithAllocation[] - eligible: RavWithAllocation[] -} - -interface RavWithAllocation { - rav: SignedRAV - allocation: Allocation - sender: Address -} - -export interface TapSubgraphResponse { - transactions: { - allocationID: string - timestamp: number - sender: { - id: string - } - }[] - _meta: { - block: { - timestamp: number - } - } -} - export class AllocationReceiptCollector implements ReceiptCollector { declare logger: Logger declare metrics: ReceiptMetrics declare models: QueryFeeModels declare transactionManager: TransactionManager declare allocationExchange: Contract - declare tapContracts: TapContracts | undefined declare allocations: Eventual declare collectEndpoint: URL declare partialVoucherEndpoint: URL @@ -137,9 +94,7 @@ export class AllocationReceiptCollector implements ReceiptCollector { declare voucherRedemptionBatchThreshold: BigNumber declare voucherRedemptionMaxBatchSize: number declare protocolNetwork: string - declare tapSubgraph: TAPSubgraph | undefined declare networkSubgraph: NetworkSubgraph - declare finalityTime: number // eslint-disable-next-line @typescript-eslint/no-empty-function -- Private constructor to prevent direct instantiation private constructor() {} @@ -150,10 +105,8 @@ export class AllocationReceiptCollector implements ReceiptCollector { transactionManager, models, allocationExchange, - tapContracts, allocations, networkSpecification, - tapSubgraph, networkSubgraph, }: AllocationReceiptCollectorOptions): Promise { const collector = new AllocationReceiptCollector() @@ -165,10 +118,8 @@ export class AllocationReceiptCollector implements ReceiptCollector { collector.transactionManager = transactionManager collector.models = models collector.allocationExchange = allocationExchange - collector.tapContracts = tapContracts collector.allocations = allocations collector.protocolNetwork = networkSpecification.networkIdentifier - collector.tapSubgraph = tapSubgraph collector.networkSubgraph = networkSubgraph // Process Gateway routes @@ -181,26 +132,16 @@ export class AllocationReceiptCollector implements ReceiptCollector { voucherRedemptionThreshold, voucherRedemptionBatchThreshold, voucherRedemptionMaxBatchSize, - finalityTime, } = networkSpecification.indexerOptions collector.voucherRedemptionThreshold = voucherRedemptionThreshold collector.voucherRedemptionBatchThreshold = voucherRedemptionBatchThreshold collector.voucherRedemptionMaxBatchSize = voucherRedemptionMaxBatchSize - collector.finalityTime = finalityTime // Start the AllocationReceiptCollector // TODO: Consider calling methods conditionally based on a boolean // flag during startup. collector.startReceiptCollecting() collector.startVoucherProcessing() - if (collector.tapContracts && collector.tapSubgraph) { - collector.logger.info(`RAV processing is initiated`) - collector.startRAVProcessing() - } else { - collector.logger.info(`RAV process not initiated. - Tap Contracts: ${!!collector.tapContracts}. - Tap Subgraph: ${!!collector.tapSubgraph}.`) - } await collector.queuePendingReceiptsFromDatabase() return collector } @@ -444,311 +385,6 @@ export class AllocationReceiptCollector implements ReceiptCollector { }) } - startRAVProcessing() { - const notifyAndMapEligible = (signedRavs: ValidRavs) => { - if (signedRavs.belowThreshold.length > 0) { - const logger = this.logger.child({ function: 'startRAVProcessing()' }) - const totalValueGRT = formatGRT( - signedRavs.belowThreshold.reduce( - (total, signedRav) => - total.add(BigNumber.from(signedRav.rav.rav.valueAggregate)), - BigNumber.from(0), - ), - ) - logger.info(`Query RAVs below the redemption threshold`, { - hint: 'If you would like to redeem vouchers like this, reduce the voucher redemption threshold', - voucherRedemptionThreshold: formatGRT(this.voucherRedemptionThreshold), - belowThresholdCount: signedRavs.belowThreshold.length, - totalValueGRT, - allocations: signedRavs.belowThreshold.map( - (signedRav) => signedRav.rav.rav.allocationId, - ), - }) - } - return signedRavs.eligible - } - - const pendingRAVs = this.getPendingRAVs() - const signedRAVs = this.getSignedRAVsEventual(pendingRAVs) - const eligibleRAVs = signedRAVs - .map(notifyAndMapEligible) - .filter((signedRavs) => signedRavs.length > 0) - eligibleRAVs.pipe(async (ravs) => await this.submitRAVs(ravs)) - } - - private getPendingRAVs(): Eventual { - return joinEventual({ - timer: timer(30_000), - }).tryMap( - async () => { - let ravs = await this.pendingRAVs() - if (ravs.length === 0) { - this.logger.info(`No pending RAVs to process`) - return [] - } - if (ravs.length > 0) { - ravs = await this.filterAndUpdateRavs(ravs) - } - const allocations: Allocation[] = await this.getAllocationsfromAllocationIds(ravs) - this.logger.info( - `Retrieved allocations for pending RAVs \n: ${JSON.stringify(allocations)}`, - ) - return ravs - .map((rav) => { - const signedRav = rav.getSignedRAV() - return { - rav: signedRav, - allocation: allocations.find( - (a) => a.id === toAddress(signedRav.rav.allocationId), - ), - sender: rav.senderAddress, - } - }) - .filter((rav) => rav.allocation !== undefined) as RavWithAllocation[] // this is safe because we filter out undefined allocations - }, - { onError: (err) => this.logger.error(`Failed to query pending RAVs`, { err }) }, - ) - } - - private async getAllocationsfromAllocationIds( - ravs: ReceiptAggregateVoucher[], - ): Promise { - const allocationIds: string[] = ravs.map((rav) => - rav.getSignedRAV().rav.allocationId.toLowerCase(), - ) - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const returnedAllocations: any[] = ( - await this.networkSubgraph.query( - gql` - query allocations($allocationIds: [String!]!) { - allocations(where: { id_in: $allocationIds }) { - id - status - subgraphDeployment { - id - stakedTokens - signalledTokens - queryFeesAmount - deniedAt - } - indexer { - id - } - allocatedTokens - createdAtEpoch - createdAtBlockHash - closedAtEpoch - closedAtEpoch - closedAtBlockHash - poi - queryFeeRebates - queryFeesCollected - } - } - `, - { allocationIds }, - ) - ).data.allocations - - if (returnedAllocations.length == 0) { - this.logger.error( - `No allocations returned for ${allocationIds} in network subgraph`, - ) - } - // eslint-disable-next-line @typescript-eslint/no-explicit-any - return returnedAllocations.map((x) => parseGraphQLAllocation(x, this.protocolNetwork)) - } - - private getSignedRAVsEventual( - pendingRAVs: Eventual, - ): Eventual { - return pendingRAVs.tryMap( - async (pendingRAVs) => { - return await pReduce( - pendingRAVs, - async (results, rav) => { - if ( - BigNumber.from(rav.rav.rav.valueAggregate).lt( - this.voucherRedemptionThreshold, - ) - ) { - results.belowThreshold.push(rav) - } else { - results.eligible.push(rav) - } - return results - }, - { belowThreshold: [], eligible: [] }, - ) - }, - { onError: (err) => this.logger.error(`Failed to reduce to signed RAVs`, { err }) }, - ) - } - - // redeem only if last is true - // Later can add order and limit - private async pendingRAVs(): Promise { - return await this.models.receiptAggregateVouchers.findAll({ - where: { last: true, final: false }, - }) - } - - private async filterAndUpdateRavs( - ravsLastNotFinal: ReceiptAggregateVoucher[], - ): Promise { - const tapSubgraphResponse = await this.findTransactionsForRavs(ravsLastNotFinal) - - const redeemedRavsNotOnOurDatabase = tapSubgraphResponse.transactions.filter( - (tx) => - !ravsLastNotFinal.find( - (rav) => - toAddress(rav.senderAddress) === toAddress(tx.sender.id) && - toAddress(rav.allocationId) === toAddress(tx.allocationID), - ), - ) - - // for each transaction that is not redeemed on our database - // but was redeemed on the blockchain, update it to redeemed - if (redeemedRavsNotOnOurDatabase.length > 0) { - for (const rav of redeemedRavsNotOnOurDatabase) { - await this.markRavAsRedeemed( - toAddress(rav.allocationID), - toAddress(rav.sender.id), - rav.timestamp, - ) - } - } - - // Filter unfinalized RAVS fetched from DB, keeping RAVs that have not yet been redeemed on-chain - const nonRedeemedRavs = ravsLastNotFinal - .filter((rav) => !!rav.redeemedAt) - .filter( - (rav) => - !tapSubgraphResponse.transactions.find( - (tx) => - toAddress(rav.senderAddress) === toAddress(tx.sender.id) && - toAddress(rav.allocationId) === toAddress(tx.allocationID), - ), - ) - - // we use the subgraph timestamp to make decisions - // block timestamp minus 1 minute (because of blockchain timestamp uncertainty) - const ONE_MINUTE = 60 - const blockTimestampSecs = tapSubgraphResponse._meta.block.timestamp - ONE_MINUTE - - // Mark RAVs as unredeemed in DB if the TAP subgraph couldn't find the redeem Tx. - // To handle a chain reorg that "unredeemed" the RAVs. - if (nonRedeemedRavs.length > 0) { - await this.revertRavsRedeemed(nonRedeemedRavs, blockTimestampSecs) - } - - // For all RAVs that passed finality time, we mark it as final - await this.markRavsAsFinal(blockTimestampSecs) - - return await this.models.receiptAggregateVouchers.findAll({ - where: { redeemedAt: null, final: false, last: true }, - }) - } - - public async findTransactionsForRavs( - ravs: ReceiptAggregateVoucher[], - ): Promise { - const response = await this.tapSubgraph!.query( - gql` - query transactions( - $unfinalizedRavsAllocationIds: [String!]! - $senderAddresses: [String!]! - ) { - transactions( - where: { - type: "redeem" - allocationID_in: $unfinalizedRavsAllocationIds - sender_: { id_in: $senderAddresses } - } - ) { - allocationID - timestamp - sender { - id - } - } - _meta { - block { - timestamp - } - } - } - `, - { - unfinalizedRavsAllocationIds: ravs.map((value) => - toAddress(value.allocationId).toLowerCase(), - ), - senderAddresses: ravs.map((value) => - toAddress(value.senderAddress).toLowerCase(), - ), - }, - ) - if (!response.data) { - throw `There was an error while querying Tap Subgraph. Errors: ${response.error}` - } - - return response.data - } - - // for every allocation_id of this list that contains the redeemedAt less than the current - // subgraph timestamp - private async revertRavsRedeemed( - ravsNotRedeemed: { allocationId: Address; senderAddress: Address }[], - blockTimestampSecs: number, - ) { - if (ravsNotRedeemed.length == 0) { - return - } - - // WE use sql directly due to a bug in sequelize update: - // https://github.com/sequelize/sequelize/issues/7664 (bug been open for 7 years no fix yet or ever) - const query = ` - UPDATE scalar_tap_ravs - SET redeemed_at = NULL - WHERE (allocation_id::char(40), sender_address::char(40)) IN (VALUES ${ravsNotRedeemed - .map( - (rav) => - `('${rav.allocationId - .toString() - .toLowerCase() - .replace('0x', '')}'::char(40), '${rav.senderAddress - .toString() - .toLowerCase() - .replace('0x', '')}'::char(40))`, - ) - .join(', ')}) - AND redeemed_at < to_timestamp(${blockTimestampSecs}) - ` - - await this.models.receiptAggregateVouchers.sequelize?.query(query) - - this.logger.warn( - `Reverted Redeemed RAVs: ${ravsNotRedeemed - .map((rav) => `(${rav.senderAddress},${rav.allocationId})`) - .join(', ')}`, - ) - } - - // we use blockTimestamp instead of NOW() because we must be older than - // the subgraph timestamp - private async markRavsAsFinal(blockTimestampSecs: number) { - const query = ` - UPDATE scalar_tap_ravs - SET final = TRUE - WHERE last = TRUE - AND final = FALSE - AND redeemed_at IS NOT NULL - AND redeemed_at < to_timestamp(${blockTimestampSecs - this.finalityTime}) - ` - - await this.models.receiptAggregateVouchers.sequelize?.query(query) - } - private encodeReceiptBatch(receipts: AllocationReceipt[]): BytesWriter { // Encode the receipt batch to a buffer // [allocationId, receipts[]] (in bytes) @@ -998,143 +634,6 @@ export class AllocationReceiptCollector implements ReceiptCollector { } } - private async submitRAVs(signedRavs: RavWithAllocation[]): Promise { - const logger = this.logger.child({ - function: 'submitRAVs()', - ravsToSubmit: signedRavs.length, - }) - if (!this.tapContracts) { - logger.error( - `Undefined escrow contracts, but this shouldn't happen as RAV process is only triggered when escrow is provided. \n - If this error is encountered please report and oepn an issue at https://github.com/graphprotocol/indexer/issues`, - { - signedRavs, - }, - ) - return - } - const escrow = this.tapContracts - - logger.info(`Redeem last RAVs on chain individually`, { - signedRavs, - }) - - // Redeem RAV one-by-one as no plual version available - for (const { rav: signedRav, allocation, sender } of signedRavs) { - const { rav } = signedRav - const stopTimer = this.metrics.ravsRedeemDuration.startTimer({ - allocation: rav.allocationId, - }) - try { - const proof = await tapAllocationIdProof( - allocationSigner(this.transactionManager.wallet, allocation), - parseInt(this.protocolNetwork.split(':')[1]), - sender, - toAddress(rav.allocationId), - toAddress(escrow.escrow.address), - ) - this.logger.debug(`Computed allocationIdProof`, { - allocationId: rav.allocationId, - proof, - }) - // Submit the signed RAV on chain - const txReceipt = await this.transactionManager.executeTransaction( - () => escrow.escrow.estimateGas.redeem(signedRav, proof), - (gasLimit) => - escrow.escrow.redeem(signedRav, proof, { - gasLimit, - }), - logger.child({ function: 'redeem' }), - ) - - // get tx receipt and post process - if (txReceipt === 'paused' || txReceipt === 'unauthorized') { - this.metrics.ravRedeemsInvalid.inc({ allocation: rav.allocationId }) - return - } - this.metrics.ravCollectedFees.set( - { allocation: rav.allocationId }, - parseFloat(rav.valueAggregate.toString()), - ) - - try { - await this.markRavAsRedeemed(toAddress(rav.allocationId), sender) - logger.info( - `Updated receipt aggregate vouchers table with redeemed_at for allocation ${rav.allocationId} and sender ${sender}`, - ) - } catch (err) { - logger.warn( - `Failed to update receipt aggregate voucher table with redeemed_at for allocation ${rav.allocationId}`, - { - err, - }, - ) - } - } catch (err) { - this.metrics.ravRedeemsFailed.inc({ allocation: rav.allocationId }) - logger.error(`Failed to redeem RAV`, { - err: indexerError(IndexerErrorCode.IE055, err), - }) - return - } - stopTimer() - } - - try { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - await this.models.allocationSummaries.sequelize!.transaction( - async (transaction) => { - for (const { rav: signedRav } of signedRavs) { - const { rav } = signedRav - const [summary] = await ensureAllocationSummary( - this.models, - toAddress(rav.allocationId), - transaction, - this.protocolNetwork, - ) - summary.withdrawnFees = BigNumber.from(summary.withdrawnFees) - .add(rav.valueAggregate) - .toString() - await summary.save({ transaction }) - } - }, - ) - - logger.info(`Updated allocation summaries table with withdrawn fees`) - } catch (err) { - logger.warn(`Failed to update allocation summaries`, { - err, - }) - } - - signedRavs.map((signedRav) => - this.metrics.ravRedeemsSuccess.inc({ allocation: signedRav.allocation.id }), - ) - } - - private async markRavAsRedeemed( - allocationId: Address, - senderAddress: Address, - timestamp?: number, - ) { - // WE use sql directly due to a bug in sequelize update: - // https://github.com/sequelize/sequelize/issues/7664 (bug been open for 7 years no fix yet or ever) - const query = ` - UPDATE scalar_tap_ravs - SET redeemed_at = ${timestamp ? timestamp : 'NOW()'} - WHERE allocation_id = '${allocationId - .toString() - .toLowerCase() - .replace('0x', '')}' - AND sender_address = '${senderAddress - .toString() - .toLowerCase() - .replace('0x', '')}' - ` - - await this.models.receiptAggregateVouchers.sequelize?.query(query) - } - public async queuePendingReceiptsFromDatabase(): Promise { // Obtain all closed allocations const closedAllocations = await this.models.allocationSummaries.findAll({ @@ -1280,41 +779,6 @@ const registerReceiptMetrics = (metrics: Metrics, networkIdentifier: string) => registers: [metrics.registry], labelNames: ['allocation'], }), - - ravRedeemsSuccess: new metrics.client.Counter({ - name: `indexer_agent_rav_exchanges_ok_${networkIdentifier}`, - help: 'Successfully redeemed ravs', - registers: [metrics.registry], - labelNames: ['allocation'], - }), - - ravRedeemsInvalid: new metrics.client.Counter({ - name: `indexer_agent_rav_exchanges_invalid_${networkIdentifier}`, - help: 'Invalid ravs redeems - tx paused or unauthorized', - registers: [metrics.registry], - labelNames: ['allocation'], - }), - - ravRedeemsFailed: new metrics.client.Counter({ - name: `indexer_agent_rav_redeems_failed_${networkIdentifier}`, - help: 'Failed redeems for ravs', - registers: [metrics.registry], - labelNames: ['allocation'], - }), - - ravsRedeemDuration: new metrics.client.Histogram({ - name: `indexer_agent_ravs_redeem_duration_${networkIdentifier}`, - help: 'Duration of redeeming ravs', - registers: [metrics.registry], - labelNames: ['allocation'], - }), - - ravCollectedFees: new metrics.client.Gauge({ - name: `indexer_agent_rav_collected_fees_${networkIdentifier}`, - help: 'Amount of query fees collected for a rav', - registers: [metrics.registry], - labelNames: ['allocation'], - }), }) interface GatewayRoutes { diff --git a/packages/indexer-common/src/allocations/tap-collector.ts b/packages/indexer-common/src/allocations/tap-collector.ts new file mode 100644 index 000000000..95a51564e --- /dev/null +++ b/packages/indexer-common/src/allocations/tap-collector.ts @@ -0,0 +1,608 @@ +import { Counter, Gauge, Histogram } from 'prom-client' +import { + Logger, + timer, + toAddress, + formatGRT, + Address, + Metrics, + Eventual, + join as joinEventual, +} from '@graphprotocol/common-ts' +import { NetworkContracts as TapContracts } from '@semiotic-labs/tap-contracts-bindings' +import { + Allocation, + indexerError, + IndexerErrorCode, + QueryFeeModels, + ReceiptAggregateVoucher, + ensureAllocationSummary, + TransactionManager, + specification as spec, + SignedRAV, + allocationSigner, + tapAllocationIdProof, + parseGraphQLAllocation, +} from '..' +import { BigNumber } from 'ethers' +import pReduce from 'p-reduce' +import { TAPSubgraph } from '../tap-subgraph' +import { NetworkSubgraph } from '../network-subgraph' +import gql from 'graphql-tag' + +const RAV_CHECK_INTERVAL_MS = 30_000 + +interface RavMetrics { + ravRedeemsSuccess: Counter + ravRedeemsInvalid: Counter + ravRedeemsFailed: Counter + ravsRedeemDuration: Histogram + ravCollectedFees: Gauge +} + +interface TapCollectorOptions { + logger: Logger + metrics: Metrics + transactionManager: TransactionManager + tapContracts: TapContracts + allocations: Eventual + models: QueryFeeModels + networkSpecification: spec.NetworkSpecification + tapSubgraph: TAPSubgraph + networkSubgraph: NetworkSubgraph +} + +interface ValidRavs { + belowThreshold: RavWithAllocation[] + eligible: RavWithAllocation[] +} + +interface RavWithAllocation { + rav: SignedRAV + allocation: Allocation + sender: Address +} + +export interface TapSubgraphResponse { + transactions: { + allocationID: string + timestamp: number + sender: { + id: string + } + }[] + _meta: { + block: { + timestamp: number + } + } +} + +export class TapCollector { + declare logger: Logger + declare metrics: RavMetrics + declare models: QueryFeeModels + declare transactionManager: TransactionManager + declare tapContracts: TapContracts + declare allocations: Eventual + declare ravRedemptionThreshold: BigNumber + declare protocolNetwork: string + declare tapSubgraph: TAPSubgraph + declare networkSubgraph: NetworkSubgraph + declare finalityTime: number + + // eslint-disable-next-line @typescript-eslint/no-empty-function -- Private constructor to prevent direct instantiation + private constructor() {} + + public static async create({ + logger, + metrics, + transactionManager, + models, + tapContracts, + allocations, + networkSpecification, + tapSubgraph, + networkSubgraph, + }: TapCollectorOptions): Promise { + const collector = new TapCollector() + collector.logger = logger.child({ component: 'AllocationReceiptCollector' }) + collector.metrics = registerReceiptMetrics( + metrics, + networkSpecification.networkIdentifier, + ) + collector.transactionManager = transactionManager + collector.models = models + collector.tapContracts = tapContracts + collector.allocations = allocations + collector.protocolNetwork = networkSpecification.networkIdentifier + collector.tapSubgraph = tapSubgraph + collector.networkSubgraph = networkSubgraph + + const { voucherRedemptionThreshold, finalityTime } = + networkSpecification.indexerOptions + collector.ravRedemptionThreshold = voucherRedemptionThreshold + collector.finalityTime = finalityTime + + collector.logger.info(`RAV processing is initiated`) + collector.startRAVProcessing() + return collector + } + + startRAVProcessing() { + const notifyAndMapEligible = (signedRavs: ValidRavs) => { + if (signedRavs.belowThreshold.length > 0) { + const logger = this.logger.child({ function: 'startRAVProcessing()' }) + const totalValueGRT = formatGRT( + signedRavs.belowThreshold.reduce( + (total, signedRav) => + total.add(BigNumber.from(signedRav.rav.rav.valueAggregate)), + BigNumber.from(0), + ), + ) + logger.info(`Query RAVs below the redemption threshold`, { + hint: 'If you would like to redeem RAVs like this, reduce the voucher redemption threshold', + ravRedemptionThreshold: formatGRT(this.ravRedemptionThreshold), + belowThresholdCount: signedRavs.belowThreshold.length, + totalValueGRT, + allocations: signedRavs.belowThreshold.map( + (signedRav) => signedRav.rav.rav.allocationId, + ), + }) + } + return signedRavs.eligible + } + + const pendingRAVs = this.getPendingRAVs() + const signedRAVs = this.getSignedRAVsEventual(pendingRAVs) + const eligibleRAVs = signedRAVs + .map(notifyAndMapEligible) + .filter((signedRavs) => signedRavs.length > 0) + eligibleRAVs.pipe(async (ravs) => await this.submitRAVs(ravs)) + } + + private getPendingRAVs(): Eventual { + return joinEventual({ + timer: timer(RAV_CHECK_INTERVAL_MS), + }).tryMap( + async () => { + let ravs = await this.pendingRAVs() + if (ravs.length === 0) { + this.logger.info(`No pending RAVs to process`) + return [] + } + if (ravs.length > 0) { + ravs = await this.filterAndUpdateRavs(ravs) + } + const allocations: Allocation[] = await this.getAllocationsfromAllocationIds(ravs) + this.logger.info( + `Retrieved allocations for pending RAVs \n: ${JSON.stringify(allocations)}`, + ) + return ravs + .map((rav) => { + const signedRav = rav.getSignedRAV() + return { + rav: signedRav, + allocation: allocations.find( + (a) => a.id === toAddress(signedRav.rav.allocationId), + ), + sender: rav.senderAddress, + } + }) + .filter((rav) => rav.allocation !== undefined) as RavWithAllocation[] // this is safe because we filter out undefined allocations + }, + { onError: (err) => this.logger.error(`Failed to query pending RAVs`, { err }) }, + ) + } + + private async getAllocationsfromAllocationIds( + ravs: ReceiptAggregateVoucher[], + ): Promise { + const allocationIds: string[] = ravs.map((rav) => + rav.getSignedRAV().rav.allocationId.toLowerCase(), + ) + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const returnedAllocations: any[] = ( + await this.networkSubgraph.query( + gql` + query allocations($allocationIds: [String!]!) { + allocations(where: { id_in: $allocationIds }) { + id + status + subgraphDeployment { + id + stakedTokens + signalledTokens + queryFeesAmount + deniedAt + } + indexer { + id + } + allocatedTokens + createdAtEpoch + createdAtBlockHash + closedAtEpoch + closedAtEpoch + closedAtBlockHash + poi + queryFeeRebates + queryFeesCollected + } + } + `, + { allocationIds }, + ) + ).data.allocations + + if (returnedAllocations.length == 0) { + this.logger.error( + `No allocations returned for ${allocationIds} in network subgraph`, + ) + } + // eslint-disable-next-line @typescript-eslint/no-explicit-any + return returnedAllocations.map((x) => parseGraphQLAllocation(x, this.protocolNetwork)) + } + + private getSignedRAVsEventual( + pendingRAVs: Eventual, + ): Eventual { + return pendingRAVs.tryMap( + async (pendingRAVs) => { + return await pReduce( + pendingRAVs, + async (results, rav) => { + if ( + BigNumber.from(rav.rav.rav.valueAggregate).lt(this.ravRedemptionThreshold) + ) { + results.belowThreshold.push(rav) + } else { + results.eligible.push(rav) + } + return results + }, + { belowThreshold: [], eligible: [] }, + ) + }, + { onError: (err) => this.logger.error(`Failed to reduce to signed RAVs`, { err }) }, + ) + } + + // redeem only if last is true + // Later can add order and limit + private async pendingRAVs(): Promise { + return await this.models.receiptAggregateVouchers.findAll({ + where: { last: true, final: false }, + }) + } + + private async filterAndUpdateRavs( + ravsLastNotFinal: ReceiptAggregateVoucher[], + ): Promise { + const tapSubgraphResponse = await this.findTransactionsForRavs(ravsLastNotFinal) + + const redeemedRavsNotOnOurDatabase = tapSubgraphResponse.transactions.filter( + (tx) => + !ravsLastNotFinal.find( + (rav) => + toAddress(rav.senderAddress) === toAddress(tx.sender.id) && + toAddress(rav.allocationId) === toAddress(tx.allocationID), + ), + ) + + // for each transaction that is not redeemed on our database + // but was redeemed on the blockchain, update it to redeemed + if (redeemedRavsNotOnOurDatabase.length > 0) { + for (const rav of redeemedRavsNotOnOurDatabase) { + await this.markRavAsRedeemed( + toAddress(rav.allocationID), + toAddress(rav.sender.id), + rav.timestamp, + ) + } + } + + // Filter unfinalized RAVS fetched from DB, keeping RAVs that have not yet been redeemed on-chain + const nonRedeemedRavs = ravsLastNotFinal + .filter((rav) => !!rav.redeemedAt) + .filter( + (rav) => + !tapSubgraphResponse.transactions.find( + (tx) => + toAddress(rav.senderAddress) === toAddress(tx.sender.id) && + toAddress(rav.allocationId) === toAddress(tx.allocationID), + ), + ) + + // we use the subgraph timestamp to make decisions + // block timestamp minus 1 minute (because of blockchain timestamp uncertainty) + const ONE_MINUTE = 60 + const blockTimestampSecs = tapSubgraphResponse._meta.block.timestamp - ONE_MINUTE + + // Mark RAVs as unredeemed in DB if the TAP subgraph couldn't find the redeem Tx. + // To handle a chain reorg that "unredeemed" the RAVs. + if (nonRedeemedRavs.length > 0) { + await this.revertRavsRedeemed(nonRedeemedRavs, blockTimestampSecs) + } + + // For all RAVs that passed finality time, we mark it as final + await this.markRavsAsFinal(blockTimestampSecs) + + return await this.models.receiptAggregateVouchers.findAll({ + where: { redeemedAt: null, final: false, last: true }, + }) + } + + public async findTransactionsForRavs( + ravs: ReceiptAggregateVoucher[], + ): Promise { + const response = await this.tapSubgraph!.query( + gql` + query transactions( + $unfinalizedRavsAllocationIds: [String!]! + $senderAddresses: [String!]! + ) { + transactions( + where: { + type: "redeem" + allocationID_in: $unfinalizedRavsAllocationIds + sender_: { id_in: $senderAddresses } + } + ) { + allocationID + timestamp + sender { + id + } + } + _meta { + block { + timestamp + } + } + } + `, + { + unfinalizedRavsAllocationIds: ravs.map((value) => + toAddress(value.allocationId).toLowerCase(), + ), + senderAddresses: ravs.map((value) => + toAddress(value.senderAddress).toLowerCase(), + ), + }, + ) + if (!response.data) { + throw `There was an error while querying Tap Subgraph. Errors: ${response.error}` + } + + return response.data + } + + // for every allocation_id of this list that contains the redeemedAt less than the current + // subgraph timestamp + private async revertRavsRedeemed( + ravsNotRedeemed: { allocationId: Address; senderAddress: Address }[], + blockTimestampSecs: number, + ) { + if (ravsNotRedeemed.length == 0) { + return + } + + // WE use sql directly due to a bug in sequelize update: + // https://github.com/sequelize/sequelize/issues/7664 (bug been open for 7 years no fix yet or ever) + const query = ` + UPDATE scalar_tap_ravs + SET redeemed_at = NULL + WHERE (allocation_id::char(40), sender_address::char(40)) IN (VALUES ${ravsNotRedeemed + .map( + (rav) => + `('${rav.allocationId + .toString() + .toLowerCase() + .replace('0x', '')}'::char(40), '${rav.senderAddress + .toString() + .toLowerCase() + .replace('0x', '')}'::char(40))`, + ) + .join(', ')}) + AND redeemed_at < to_timestamp(${blockTimestampSecs}) + ` + + await this.models.receiptAggregateVouchers.sequelize?.query(query) + + this.logger.warn( + `Reverted Redeemed RAVs: ${ravsNotRedeemed + .map((rav) => `(${rav.senderAddress},${rav.allocationId})`) + .join(', ')}`, + ) + } + + // we use blockTimestamp instead of NOW() because we must be older than + // the subgraph timestamp + private async markRavsAsFinal(blockTimestampSecs: number) { + const query = ` + UPDATE scalar_tap_ravs + SET final = TRUE + WHERE last = TRUE + AND final = FALSE + AND redeemed_at IS NOT NULL + AND redeemed_at < to_timestamp(${blockTimestampSecs - this.finalityTime}) + ` + + await this.models.receiptAggregateVouchers.sequelize?.query(query) + } + + private async submitRAVs(signedRavs: RavWithAllocation[]): Promise { + const logger = this.logger.child({ + function: 'submitRAVs()', + ravsToSubmit: signedRavs.length, + }) + if (!this.tapContracts) { + logger.error( + `Undefined escrow contracts, but this shouldn't happen as RAV process is only triggered when escrow is provided. \n + If this error is encountered please report and oepn an issue at https://github.com/graphprotocol/indexer/issues`, + { + signedRavs, + }, + ) + return + } + const escrow = this.tapContracts + + logger.info(`Redeem last RAVs on chain individually`, { + signedRavs, + }) + + // Redeem RAV one-by-one as no plual version available + for (const { rav: signedRav, allocation, sender } of signedRavs) { + const { rav } = signedRav + const stopTimer = this.metrics.ravsRedeemDuration.startTimer({ + allocation: rav.allocationId, + }) + try { + const proof = await tapAllocationIdProof( + allocationSigner(this.transactionManager.wallet, allocation), + parseInt(this.protocolNetwork.split(':')[1]), + sender, + toAddress(rav.allocationId), + toAddress(escrow.escrow.address), + ) + this.logger.debug(`Computed allocationIdProof`, { + allocationId: rav.allocationId, + proof, + }) + // Submit the signed RAV on chain + const txReceipt = await this.transactionManager.executeTransaction( + () => escrow.escrow.estimateGas.redeem(signedRav, proof), + (gasLimit) => + escrow.escrow.redeem(signedRav, proof, { + gasLimit, + }), + logger.child({ function: 'redeem' }), + ) + + // get tx receipt and post process + if (txReceipt === 'paused' || txReceipt === 'unauthorized') { + this.metrics.ravRedeemsInvalid.inc({ allocation: rav.allocationId }) + return + } + this.metrics.ravCollectedFees.set( + { allocation: rav.allocationId }, + parseFloat(rav.valueAggregate.toString()), + ) + + try { + await this.markRavAsRedeemed(toAddress(rav.allocationId), sender) + logger.info( + `Updated receipt aggregate vouchers table with redeemed_at for allocation ${rav.allocationId} and sender ${sender}`, + ) + } catch (err) { + logger.warn( + `Failed to update receipt aggregate voucher table with redeemed_at for allocation ${rav.allocationId}`, + { + err, + }, + ) + } + } catch (err) { + this.metrics.ravRedeemsFailed.inc({ allocation: rav.allocationId }) + logger.error(`Failed to redeem RAV`, { + err: indexerError(IndexerErrorCode.IE055, err), + }) + return + } + stopTimer() + } + + try { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + await this.models.allocationSummaries.sequelize!.transaction( + async (transaction) => { + for (const { rav: signedRav } of signedRavs) { + const { rav } = signedRav + const [summary] = await ensureAllocationSummary( + this.models, + toAddress(rav.allocationId), + transaction, + this.protocolNetwork, + ) + summary.withdrawnFees = BigNumber.from(summary.withdrawnFees) + .add(rav.valueAggregate) + .toString() + await summary.save({ transaction }) + } + }, + ) + + logger.info(`Updated allocation summaries table with withdrawn fees`) + } catch (err) { + logger.warn(`Failed to update allocation summaries`, { + err, + }) + } + + signedRavs.map((signedRav) => + this.metrics.ravRedeemsSuccess.inc({ allocation: signedRav.allocation.id }), + ) + } + + private async markRavAsRedeemed( + allocationId: Address, + senderAddress: Address, + timestamp?: number, + ) { + // WE use sql directly due to a bug in sequelize update: + // https://github.com/sequelize/sequelize/issues/7664 (bug been open for 7 years no fix yet or ever) + const query = ` + UPDATE scalar_tap_ravs + SET redeemed_at = ${timestamp ? timestamp : 'NOW()'} + WHERE allocation_id = '${allocationId + .toString() + .toLowerCase() + .replace('0x', '')}' + AND sender_address = '${senderAddress + .toString() + .toLowerCase() + .replace('0x', '')}' + ` + + await this.models.receiptAggregateVouchers.sequelize?.query(query) + } +} + +const registerReceiptMetrics = (metrics: Metrics, networkIdentifier: string) => ({ + ravRedeemsSuccess: new metrics.client.Counter({ + name: `indexer_agent_rav_exchanges_ok_${networkIdentifier}`, + help: 'Successfully redeemed ravs', + registers: [metrics.registry], + labelNames: ['allocation'], + }), + + ravRedeemsInvalid: new metrics.client.Counter({ + name: `indexer_agent_rav_exchanges_invalid_${networkIdentifier}`, + help: 'Invalid ravs redeems - tx paused or unauthorized', + registers: [metrics.registry], + labelNames: ['allocation'], + }), + + ravRedeemsFailed: new metrics.client.Counter({ + name: `indexer_agent_rav_redeems_failed_${networkIdentifier}`, + help: 'Failed redeems for ravs', + registers: [metrics.registry], + labelNames: ['allocation'], + }), + + ravsRedeemDuration: new metrics.client.Histogram({ + name: `indexer_agent_ravs_redeem_duration_${networkIdentifier}`, + help: 'Duration of redeeming ravs', + registers: [metrics.registry], + labelNames: ['allocation'], + }), + + ravCollectedFees: new metrics.client.Gauge({ + name: `indexer_agent_rav_collected_fees_${networkIdentifier}`, + help: 'Amount of query fees collected for a rav', + registers: [metrics.registry], + labelNames: ['allocation'], + }), +}) diff --git a/packages/indexer-common/src/network.ts b/packages/indexer-common/src/network.ts index 2dda0d353..29f8fecb6 100644 --- a/packages/indexer-common/src/network.ts +++ b/packages/indexer-common/src/network.ts @@ -37,6 +37,7 @@ import { QueryFeeModels } from './query-fees' import { readFileSync } from 'fs' import { TAPSubgraph } from './tap-subgraph' +import { TapCollector } from './allocations/tap-collector' export class Network { logger: Logger @@ -47,6 +48,7 @@ export class Network { transactionManager: TransactionManager networkMonitor: NetworkMonitor receiptCollector: AllocationReceiptCollector + tapCollector: TapCollector | undefined specification: spec.NetworkSpecification paused: Eventual isOperator: Eventual @@ -60,6 +62,7 @@ export class Network { transactionManager: TransactionManager, networkMonitor: NetworkMonitor, receiptCollector: AllocationReceiptCollector, + tapCollector: TapCollector | undefined, specification: spec.NetworkSpecification, paused: Eventual, isOperator: Eventual, @@ -72,6 +75,7 @@ export class Network { this.transactionManager = transactionManager this.networkMonitor = networkMonitor this.receiptCollector = receiptCollector + this.tapCollector = tapCollector this.specification = specification this.paused = paused this.isOperator = isOperator @@ -268,19 +272,39 @@ export class Network { // -------------------------------------------------------------------------------- // * Allocation Receipt Collector // -------------------------------------------------------------------------------- - const receiptCollector = await AllocationReceiptCollector.create({ + const scalarCollector = await AllocationReceiptCollector.create({ logger, metrics, transactionManager: transactionManager, models: queryFeeModels, allocationExchange: contracts.allocationExchange, - tapContracts, allocations, networkSpecification: specification, - tapSubgraph, networkSubgraph, }) + // -------------------------------------------------------------------------------- + // * TAP Collector + // -------------------------------------------------------------------------------- + let tapCollector: TapCollector | undefined = undefined + if (tapContracts && tapSubgraph) { + tapCollector = await TapCollector.create({ + logger, + metrics, + transactionManager: transactionManager, + models: queryFeeModels, + tapContracts, + allocations, + networkSpecification: specification, + tapSubgraph, + networkSubgraph, + }) + } else { + logger.info(`RAV process not initiated. + Tap Contracts: ${!!tapContracts}. + Tap Subgraph: ${!!tapSubgraph}.`) + } + // -------------------------------------------------------------------------------- // * Network // -------------------------------------------------------------------------------- @@ -292,7 +316,8 @@ export class Network { networkProvider, transactionManager, networkMonitor, - receiptCollector, + scalarCollector, + tapCollector, specification, paused, isOperator,