Skip to content

Commit

Permalink
common: add pagination to tap collector
Browse files Browse the repository at this point in the history
Signed-off-by: Gustavo Inacio <[email protected]>
  • Loading branch information
gusinacio committed Oct 10, 2024
1 parent 7223b49 commit d5fab3e
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 62 deletions.
3 changes: 3 additions & 0 deletions packages/indexer-common/src/allocations/__tests__/tap.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ const setupEach = async () => {
_meta: {
block: {
timestamp: Date.now(),
hash: 'str',
},
},
}
Expand Down Expand Up @@ -460,6 +461,7 @@ describe('TAP', () => {
return {
transactions: [
{
id: 'test',
allocationID: ALLOCATION_ID_2.toString().toLowerCase().replace('0x', ''),
timestamp: redeemDateSecs,
sender: {
Expand All @@ -470,6 +472,7 @@ describe('TAP', () => {
_meta: {
block: {
timestamp: nowSecs,
hash: 'test',
},
},
}
Expand Down
189 changes: 127 additions & 62 deletions packages/indexer-common/src/allocations/tap-collector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import {
import { BigNumber } from 'ethers'
import pReduce from 'p-reduce'
import { TAPSubgraph } from '../tap-subgraph'
import { NetworkSubgraph } from '../network-subgraph'
import { NetworkSubgraph, QueryResult } from '../network-subgraph'
import gql from 'graphql-tag'

// every 15 minutes
Expand Down Expand Up @@ -65,16 +65,32 @@ interface RavWithAllocation {
}

export interface TapSubgraphResponse {
transactions: {
allocationID: string
transactions: TapTransaction[]
_meta: TapMeta
}

interface TapMeta {
block: {
timestamp: number
sender: {
id: string
}
}[]
_meta: {
hash: string
}
}

interface TapTransaction {
id: string
allocationID: string
timestamp: number
sender: {
id: string
}
}

export interface AllocationsResponse {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
allocations: any[]
meta: {
block: {
timestamp: number
hash: string
}
}
}
Expand Down Expand Up @@ -202,12 +218,34 @@ export class TapCollector {
const allocationIds: string[] = ravs.map((rav) =>
rav.getSignedRAV().rav.allocationId.toLowerCase(),
)

let hash: string | undefined = undefined
let lastId = ''
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const returnedAllocations: any[] = (
await this.networkSubgraph.query(
const returnedAllocations: any[] = []
const pageSize = 1000

for (;;) {
const result = await this.networkSubgraph.query<AllocationsResponse>(
gql`
query allocations($allocationIds: [String!]!) {
allocations(where: { id_in: $allocationIds }) {
query allocations(
$lastId: String!
$pageSize: Int!
$hash: String
$allocationIds: [String!]!
) {
meta: _meta(block: { hash: $hash }) {
block {
number
hash
timestamp
}
}
allocations(
first: $pageSize
block: { hash: $hash }
where: { id_gt: $lastId, id_in: $allocationIds }
) {
id
status
subgraphDeployment {
Expand All @@ -232,9 +270,19 @@ export class TapCollector {
}
}
`,
{ allocationIds },
{ allocationIds, lastId, pageSize, hash },
)
).data.allocations
if (!result.data) {
throw `There was an error while querying Network Subgraph. Errors: ${result.error}`
}

if (result.data.allocations.length < pageSize) {
break
}
hash = result.data.meta.block.hash
lastId = result.data.allocations.slice(-1)[0].id
returnedAllocations.push(...result.data.allocations)
}

if (returnedAllocations.length == 0) {
this.logger.error(
Expand Down Expand Up @@ -337,46 +385,73 @@ export class TapCollector {
public async findTransactionsForRavs(
ravs: ReceiptAggregateVoucher[],
): Promise<TapSubgraphResponse> {
const response = await this.tapSubgraph!.query<TapSubgraphResponse>(
gql`
query transactions(
$unfinalizedRavsAllocationIds: [String!]!
$senderAddresses: [String!]!
) {
transactions(
where: {
type: "redeem"
allocationID_in: $unfinalizedRavsAllocationIds
sender_: { id_in: $senderAddresses }
}
) {
allocationID
timestamp
sender {
id
}
}
_meta {
block {
timestamp
let meta: TapMeta | undefined = undefined
let lastId = ''
const transactions: TapTransaction[] = []
const pageSize = 1000

for (;;) {
const hash = meta?.block?.hash
const result: QueryResult<TapSubgraphResponse> =
await this.tapSubgraph.query<TapSubgraphResponse>(
gql`
query transactions(
$lastId: String!
$pageSize: Int!
$hash: String
$unfinalizedRavsAllocationIds: [String!]!
$senderAddresses: [String!]!
) {
transactions(
where: {
type: "redeem"
allocationID_in: $unfinalizedRavsAllocationIds
sender_: { id_in: $senderAddresses }
}
) {
id
allocationID
timestamp
sender {
id
}
}
_meta {
block {
hash
timestamp
}
}
}
}
}
`,
{
unfinalizedRavsAllocationIds: ravs.map((value) =>
toAddress(value.allocationId).toLowerCase(),
),
senderAddresses: ravs.map((value) =>
toAddress(value.senderAddress).toLowerCase(),
),
},
)
if (!response.data) {
throw `There was an error while querying Tap Subgraph. Errors: ${response.error}`
`,
{
lastId,
pageSize,
hash,
unfinalizedRavsAllocationIds: ravs.map((value) =>
toAddress(value.allocationId).toLowerCase(),
),
senderAddresses: ravs.map((value) =>
toAddress(value.senderAddress).toLowerCase(),
),
},
)

if (!result.data) {
throw `There was an error while querying Tap Subgraph. Errors: ${result.error}`
}
meta = result.data._meta
if (result.data.transactions.length < pageSize) {
break
}
lastId = result.data.transactions.slice(-1)[0].id
transactions.push(...result.data.transactions)
}

return response.data
return {
transactions,
_meta: meta!,
}
}

// for every allocation_id of this list that contains the redeemedAt less than the current
Expand Down Expand Up @@ -438,16 +513,6 @@ export class TapCollector {
function: 'submitRAVs()',
ravsToSubmit: signedRavs.length,
})
if (!this.tapContracts) {
logger.error(
`Undefined escrow contracts, but this shouldn't happen as RAV process is only triggered when escrow is provided. \n
If this error is encountered please report and oepn an issue at https://github.com/graphprotocol/indexer/issues`,
{
signedRavs,
},
)
return
}
const escrow = this.tapContracts

logger.info(`Redeem last RAVs on chain individually`, {
Expand Down

0 comments on commit d5fab3e

Please sign in to comment.