diff --git a/packages/indexer-agent/src/agent.ts b/packages/indexer-agent/src/agent.ts index abf1c5dde..15166f345 100644 --- a/packages/indexer-agent/src/agent.ts +++ b/packages/indexer-agent/src/agent.ts @@ -37,6 +37,7 @@ import { networkIsL1, DeploymentManagementMode, SubgraphStatus, + sequentialTimerMap, } from '@graphprotocol/indexer-common' import PQueue from 'p-queue' @@ -253,40 +254,41 @@ export class Agent { const requestIntervalSmall = this.pollingInterval const requestIntervalLarge = this.pollingInterval * 5 const logger = this.logger.child({ component: 'ReconciliationLoop' }) - const currentEpochNumber: Eventual> = timer( - requestIntervalLarge, - ).tryMap( - async () => - await this.multiNetworks.map(({ network }) => { - logger.trace('Fetching current epoch number', { - protocolNetwork: network.specification.networkIdentifier, - }) - return network.networkMonitor.currentEpochNumber() - }), - { - onError: error => - logger.warn(`Failed to fetch current epoch`, { error }), - }, - ) + const currentEpochNumber: Eventual> = + sequentialTimerMap( + { logger, milliseconds: requestIntervalLarge }, + async () => + await this.multiNetworks.map(({ network }) => { + logger.trace('Fetching current epoch number', { + protocolNetwork: network.specification.networkIdentifier, + }) + return network.networkMonitor.currentEpochNumber() + }), + { + onError: error => + logger.warn(`Failed to fetch current epoch`, { error }), + }, + ) - const maxAllocationEpochs: Eventual> = timer( - requestIntervalLarge, - ).tryMap( - () => - this.multiNetworks.map(({ network }) => { - logger.trace('Fetching max allocation epochs', { - protocolNetwork: network.specification.networkIdentifier, - }) - return network.contracts.staking.maxAllocationEpochs() - }), - { - onError: error => - logger.warn(`Failed to fetch max allocation epochs`, { error }), - }, - ) + const maxAllocationEpochs: Eventual> = + sequentialTimerMap( + { logger, milliseconds: requestIntervalLarge }, + () => + this.multiNetworks.map(({ network }) => { + logger.trace('Fetching max allocation epochs', { + protocolNetwork: network.specification.networkIdentifier, + }) + return network.contracts.staking.maxAllocationEpochs() + }), + { + onError: error => + logger.warn(`Failed to fetch max allocation epochs`, { error }), + }, + ) const indexingRules: Eventual> = - timer(requestIntervalSmall).tryMap( + sequentialTimerMap( + { logger, milliseconds: requestIntervalSmall }, async () => { return this.multiNetworks.map(async ({ network, operator }) => { logger.trace('Fetching indexing rules', { @@ -322,24 +324,25 @@ export class Agent { }, ) - const activeDeployments: Eventual = timer( - requestIntervalSmall, - ).tryMap( - () => { - logger.trace('Fetching active deployments') - return this.graphNode.subgraphDeployments() - }, - { - onError: error => - logger.warn( - `Failed to obtain active deployments, trying again later`, - { error }, - ), - }, - ) + const activeDeployments: Eventual = + sequentialTimerMap( + { logger, milliseconds: requestIntervalSmall }, + () => { + logger.trace('Fetching active deployments') + return this.graphNode.subgraphDeployments() + }, + { + onError: error => + logger.warn( + `Failed to obtain active deployments, trying again later`, + { error }, + ), + }, + ) const networkDeployments: Eventual> = - timer(requestIntervalSmall).tryMap( + sequentialTimerMap( + { logger, milliseconds: requestIntervalSmall }, async () => await this.multiNetworks.map(({ network }) => { logger.trace('Fetching network deployments', { @@ -358,7 +361,8 @@ export class Agent { const eligibleTransferDeployments: Eventual< NetworkMapped - > = timer(requestIntervalLarge).tryMap( + > = sequentialTimerMap( + { logger, milliseconds: requestIntervalLarge }, async () => { // Return early if the auto migration feature is disabled. if (!this.autoMigrationSupport) { @@ -558,23 +562,23 @@ export class Agent { }, ) - const activeAllocations: Eventual> = timer( - requestIntervalSmall, - ).tryMap( - () => - this.multiNetworks.map(({ network }) => { - logger.trace('Fetching active allocations', { - protocolNetwork: network.specification.networkIdentifier, - }) - return network.networkMonitor.allocations(AllocationStatus.ACTIVE) - }), - { - onError: () => - logger.warn( - `Failed to obtain active allocations, trying again later`, - ), - }, - ) + const activeAllocations: Eventual> = + sequentialTimerMap( + { logger, milliseconds: requestIntervalSmall }, + () => + this.multiNetworks.map(({ network }) => { + logger.trace('Fetching active allocations', { + protocolNetwork: network.specification.networkIdentifier, + }) + return network.networkMonitor.allocations(AllocationStatus.ACTIVE) + }), + { + onError: () => + logger.warn( + `Failed to obtain active allocations, trying again later`, + ), + }, + ) // `activeAllocations` is used to trigger this Eventual, but not really needed // inside. diff --git a/packages/indexer-common/src/allocations/monitor.ts b/packages/indexer-common/src/allocations/monitor.ts index 5ef5c9555..c67742f7c 100644 --- a/packages/indexer-common/src/allocations/monitor.ts +++ b/packages/indexer-common/src/allocations/monitor.ts @@ -2,12 +2,13 @@ import { indexerError, IndexerErrorCode, parseGraphQLAllocation, + sequentialTimerReduce, } from '@graphprotocol/indexer-common' import { Allocation, MonitorEligibleAllocationsOptions } from './types' import gql from 'graphql-tag' -import { Eventual, timer } from '@graphprotocol/common-ts' +import { Eventual } from '@graphprotocol/common-ts' export const monitorEligibleAllocations = ({ indexer, @@ -168,7 +169,14 @@ export const monitorEligibleAllocations = ({ } } - const allocations = timer(interval).reduce(refreshAllocations, []) + const allocations = sequentialTimerReduce( + { + logger, + milliseconds: interval, + }, + refreshAllocations, + [], + ) allocations.pipe((allocations) => { logger.info(`Eligible allocations`, { diff --git a/packages/indexer-common/src/allocations/query-fees.ts b/packages/indexer-common/src/allocations/query-fees.ts index 53fff3e66..31ba222c0 100644 --- a/packages/indexer-common/src/allocations/query-fees.ts +++ b/packages/indexer-common/src/allocations/query-fees.ts @@ -2,7 +2,6 @@ import { Counter, Gauge, Histogram } from 'prom-client' import axios from 'axios' import { Logger, - timer, BytesWriter, toAddress, formatGRT, @@ -20,6 +19,7 @@ import { ensureAllocationSummary, TransactionManager, specification as spec, + sequentialTimerMap, } from '..' import { DHeap } from '@thi.ng/heaps' import { BigNumber, BigNumberish, Contract } from 'ethers' @@ -264,7 +264,7 @@ export class AllocationReceiptCollector implements ReceiptCollector { } // Check if there's another batch of receipts to collect every 10s - timer(10_000).pipe(async () => { + sequentialTimerMap({ logger: this.logger, milliseconds: 10_000 }, async () => { while (hasReceiptsReadyForCollecting()) { // Remove the batch from the processing queue // eslint-disable-next-line @typescript-eslint/no-non-null-assertion @@ -283,7 +283,7 @@ export class AllocationReceiptCollector implements ReceiptCollector { } private startVoucherProcessing() { - timer(30_000).pipe(async () => { + sequentialTimerMap({ logger: this.logger, milliseconds: 30_000 }, async () => { let pendingVouchers: Voucher[] = [] try { pendingVouchers = await this.pendingVouchers() // Ordered by value diff --git a/packages/indexer-common/src/allocations/tap-collector.ts b/packages/indexer-common/src/allocations/tap-collector.ts index 7628cfa78..57b05f4f1 100644 --- a/packages/indexer-common/src/allocations/tap-collector.ts +++ b/packages/indexer-common/src/allocations/tap-collector.ts @@ -1,13 +1,11 @@ import { Counter, Gauge, Histogram } from 'prom-client' import { Logger, - timer, toAddress, formatGRT, Address, Metrics, Eventual, - join as joinEventual, } from '@graphprotocol/common-ts' import { NetworkContracts as TapContracts } from '@semiotic-labs/tap-contracts-bindings' import { @@ -23,6 +21,7 @@ import { allocationSigner, tapAllocationIdProof, parseGraphQLAllocation, + sequentialTimerMap, } from '..' import { BigNumber } from 'ethers' import pReduce from 'p-reduce' @@ -184,9 +183,11 @@ export class TapCollector { } private getPendingRAVs(): Eventual { - return joinEventual({ - timer: timer(RAV_CHECK_INTERVAL_MS), - }).tryMap( + return sequentialTimerMap( + { + logger: this.logger, + milliseconds: RAV_CHECK_INTERVAL_MS, + }, async () => { let ravs = await this.pendingRAVs() if (ravs.length === 0) { diff --git a/packages/indexer-common/src/index.ts b/packages/indexer-common/src/index.ts index 0090cad0d..c26f3cd2f 100644 --- a/packages/indexer-common/src/index.ts +++ b/packages/indexer-common/src/index.ts @@ -17,3 +17,4 @@ export * from './utils' export * from './parsers' export * as specification from './network-specification' export * from './multi-networks' +export * from './sequential-timer' diff --git a/packages/indexer-common/src/indexer-management/actions.ts b/packages/indexer-common/src/indexer-management/actions.ts index fde4fd0ee..3eb56a44b 100644 --- a/packages/indexer-common/src/indexer-management/actions.ts +++ b/packages/indexer-common/src/indexer-management/actions.ts @@ -18,10 +18,11 @@ import { Network, OrderDirection, GraphNode, + sequentialTimerMap, } from '@graphprotocol/indexer-common' import { Order, Transaction } from 'sequelize' -import { Eventual, join, Logger, timer } from '@graphprotocol/common-ts' +import { Eventual, join, Logger } from '@graphprotocol/common-ts' import groupBy from 'lodash.groupby' export class ActionManager { @@ -116,7 +117,11 @@ export class ActionManager { async monitorQueue(): Promise { const logger = this.logger.child({ component: 'QueueMonitor' }) - const approvedActions: Eventual = timer(30_000).tryMap( + const approvedActions: Eventual = sequentialTimerMap( + { + logger, + milliseconds: 30_000, + }, async () => { logger.trace('Fetching approved actions') let actions: Action[] = [] diff --git a/packages/indexer-common/src/indexer-management/monitor.ts b/packages/indexer-common/src/indexer-management/monitor.ts index 900efb9d3..3e78ab3ce 100644 --- a/packages/indexer-common/src/indexer-management/monitor.ts +++ b/packages/indexer-common/src/indexer-management/monitor.ts @@ -19,6 +19,7 @@ import { resolveChainId, resolveChainAlias, TransferredSubgraphDeployment, + sequentialTimerReduce, } from '@graphprotocol/indexer-common' import { Address, @@ -27,7 +28,6 @@ import { mutable, NetworkContracts, SubgraphDeploymentID, - timer, toAddress, formatGRT, } from '@graphprotocol/common-ts' @@ -991,8 +991,12 @@ Please submit an issue at https://github.com/graphprotocol/block-oracle/issues/n const initialPauseValue = await contracts.controller.paused().catch((_) => { return false }) - return timer(60_000) - .reduce(async (currentlyPaused) => { + return sequentialTimerReduce( + { + logger, + milliseconds: 60_000, + }, + async (currentlyPaused) => { try { logger.debug('Query network subgraph isPaused state') const result = await networkSubgraph.checkedQuery(gql` @@ -1022,11 +1026,12 @@ Please submit an issue at https://github.com/graphprotocol/block-oracle/issues/n }) return currentlyPaused } - }, initialPauseValue) - .map((paused) => { - logger.info(paused ? `Network paused` : `Network active`) - return paused - }) + }, + initialPauseValue, + ).map((paused) => { + logger.info(paused ? `Network paused` : `Network active`) + return paused + }) } async monitorIsOperator( @@ -1042,30 +1047,32 @@ Please submit an issue at https://github.com/graphprotocol/block-oracle/issues/n return mutable(true) } - return timer(300_000) - .reduce( - async (isOperator) => { - try { - logger.debug('Check operator status') - return await contracts.staking.isOperator(wallet.address, indexerAddress) - } catch (err) { - logger.warn( - `Failed to check operator status for indexer, assuming it has not changed`, - { err: indexerError(IndexerErrorCode.IE008, err), isOperator }, - ) - return isOperator - } - }, - await contracts.staking.isOperator(wallet.address, indexerAddress), + return sequentialTimerReduce( + { + logger, + milliseconds: 300_000, + }, + async (isOperator) => { + try { + logger.debug('Check operator status') + return await contracts.staking.isOperator(wallet.address, indexerAddress) + } catch (err) { + logger.warn( + `Failed to check operator status for indexer, assuming it has not changed`, + { err: indexerError(IndexerErrorCode.IE008, err), isOperator }, + ) + return isOperator + } + }, + await contracts.staking.isOperator(wallet.address, indexerAddress), + ).map((isOperator) => { + logger.info( + isOperator + ? `Have operator status for indexer` + : `No operator status for indexer`, ) - .map((isOperator) => { - logger.info( - isOperator - ? `Have operator status for indexer` - : `No operator status for indexer`, - ) - return isOperator - }) + return isOperator + }) } async claimableAllocations(disputableEpoch: number): Promise { diff --git a/packages/indexer-common/src/network-subgraph.ts b/packages/indexer-common/src/network-subgraph.ts index ff8b4766d..d3140d383 100644 --- a/packages/indexer-common/src/network-subgraph.ts +++ b/packages/indexer-common/src/network-subgraph.ts @@ -1,10 +1,11 @@ import axios, { AxiosInstance, AxiosResponse } from 'axios' -import { Eventual, Logger, SubgraphDeploymentID, timer } from '@graphprotocol/common-ts' +import { Eventual, Logger, SubgraphDeploymentID } from '@graphprotocol/common-ts' import { DocumentNode, print } from 'graphql' import { OperationResult, CombinedError } from '@urql/core' import { BlockPointer, IndexingError } from './types' import { GraphNode } from './graph-node' import { SubgraphFreshnessChecker } from './subgraphs' +import { sequentialTimerReduce } from './sequential-timer' export interface NetworkSubgraphCreateOptions { logger: Logger @@ -233,52 +234,59 @@ const monitorDeployment = async ({ fatalError: undefined, } - return timer(60_000).reduce(async (lastStatus) => { - try { - logger.trace(`Checking the network subgraph deployment status`) + return sequentialTimerReduce( + { + logger, + milliseconds: 60_000, + }, + async (lastStatus) => { + try { + logger.trace(`Checking the network subgraph deployment status`) + + const indexingStatuses = await graphNode.indexingStatus([deployment]) + const indexingStatus = indexingStatuses.pop() + if (!indexingStatus) { + throw `No indexing status found` + } - const indexingStatuses = await graphNode.indexingStatus([deployment]) - const indexingStatus = indexingStatuses.pop() - if (!indexingStatus) { - throw `No indexing status found` - } + const status = { + health: indexingStatus.health, + synced: indexingStatus.synced, + latestBlock: indexingStatus.chains[0]?.latestBlock, + chainHeadBlock: indexingStatus.chains[0]?.chainHeadBlock, + fatalError: indexingStatus.fatalError, + } - const status = { - health: indexingStatus.health, - synced: indexingStatus.synced, - latestBlock: indexingStatus.chains[0]?.latestBlock, - chainHeadBlock: indexingStatus.chains[0]?.chainHeadBlock, - fatalError: indexingStatus.fatalError, - } + // If failed for the first time, log an error + if (!lastStatus || (!lastStatus.fatalError && status.fatalError)) { + logger.error(`Failed to index network subgraph deployment`, { + err: status.fatalError, + latestBlock: status.latestBlock, + }) + } - // If failed for the first time, log an error - if (!lastStatus || (!lastStatus.fatalError && status.fatalError)) { - logger.error(`Failed to index network subgraph deployment`, { - err: status.fatalError, - latestBlock: status.latestBlock, - }) - } + // Don't log anything else after the subgraph has failed + if (status.fatalError) { + return status + } - // Don't log anything else after the subgraph has failed - if (status.fatalError) { - return status - } + // If not synced yet, log the progress so far + if (!status.synced) { + const latestBlock = status.latestBlock?.number || 0 + const chainHeadBlock = status.chainHeadBlock?.number || 1 - // If not synced yet, log the progress so far - if (!status.synced) { - const latestBlock = status.latestBlock?.number || 0 - const chainHeadBlock = status.chainHeadBlock?.number || 1 + const syncedPercent = ((100 * latestBlock) / chainHeadBlock).toFixed(2) - const syncedPercent = ((100 * latestBlock) / chainHeadBlock).toFixed(2) + logger.info( + `Network subgraph is synced ${syncedPercent}% (block #${latestBlock} of #${chainHeadBlock})`, + ) + } - logger.info( - `Network subgraph is synced ${syncedPercent}% (block #${latestBlock} of #${chainHeadBlock})`, - ) + return status + } catch (err) { + return lastStatus } - - return status - } catch (err) { - return lastStatus - } - }, initialStatus) + }, + initialStatus, + ) } diff --git a/packages/indexer-common/src/sequential-timer.ts b/packages/indexer-common/src/sequential-timer.ts new file mode 100644 index 000000000..701cf36fa --- /dev/null +++ b/packages/indexer-common/src/sequential-timer.ts @@ -0,0 +1,164 @@ +import { + equal, + Eventual, + Logger, + Mapper, + mutable, + Reducer, + TryMapOptions, +} from '@graphprotocol/common-ts' + +function isPromiseLike(value: T | PromiseLike): value is PromiseLike { + return value && typeof (value as PromiseLike).then === 'function' +} + +export interface TimerTaskContext { + logger: Logger + milliseconds: number +} + +function logWorkTime( + workStarted: number, + logger: Logger, + loopTime: number, + caller: string | undefined, + milliseconds: number, +) { + const workTimeWarningThreshold = 1000 + const workTime = Date.now() - workStarted + logger.debug(`timer loop took ${loopTime}ms workTime ${workTime} caller(${caller})`) + if (loopTime > milliseconds + workTimeWarningThreshold) { + logger.warn( + 'timer work took longer than the sequential timer was configured for (+1s)', + { + loopTime, + milliseconds, + }, + ) + } +} + +/** + * Create an eventual that performs the work in the Reducer function every `milliseconds` milliseconds. + * The main difference between this and `timer(...).reduce(...)` is that this function will wait for the previous work to complete before starting the next one. + * + * @param milliseconds number + * @param reducer Reducer + * @param initial U + * @returns Eventual + */ +export function sequentialTimerReduce( + { logger, milliseconds }: TimerTaskContext, + reducer: Reducer, + initial: U, +): Eventual { + const output = mutable(initial) + // obtain the calling method name from the call stack + const stack = new Error().stack + const caller = stack?.split('\n')[2].trim() + let lastWorkStarted = Date.now() + + let acc: U = initial + let previousT: T | undefined + let latestT: T | undefined + + function outputReduce(value: U) { + previousT = latestT + acc = value + if (!equal(latestT, previousT)) { + output.push(value) + } + } + + function work() { + const workStarted = Date.now() + const promiseOrT = reducer(acc, workStarted) + const loopTime = workStarted - lastWorkStarted + + lastWorkStarted = workStarted + if (isPromiseLike(promiseOrT)) { + promiseOrT.then( + function onfulfilled(value) { + outputReduce(value) + logWorkTime(workStarted, logger, loopTime, caller, milliseconds) + setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted))) + }, + function onrejected(err) { + console.error(err) + logWorkTime(workStarted, logger, loopTime, caller, milliseconds) + setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted))) + }, + ) + } else { + outputReduce(promiseOrT) + logWorkTime(workStarted, logger, loopTime, caller, milliseconds) + setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted))) + } + } + // initial call + setTimeout(work, milliseconds) + return output +} + +/** + * Create an eventual that performs the work in the Mapper function every `milliseconds` milliseconds. + * The main difference between this and `timer(...).tryMap(...)` is that this function will wait for the previous work to complete before starting the next one. + * + * @param milliseconds number + * @param mapper Mapper + * @param options TryMapOptions + * @returns Eventual + */ +export function sequentialTimerMap( + { logger, milliseconds }: TimerTaskContext, + mapper: Mapper, + options?: TryMapOptions, +): Eventual { + // obtain the calling method name from the call stack + const stack = new Error().stack + const caller = stack?.split('\n')[2].trim() + let lastWorkStarted = Date.now() + + const output = mutable() + + let latestU: U | undefined + + // this emulates the behavior of Eventual.tryMap + function checkMappedValue(value: U) { + if (!equal(latestU, value)) { + latestU = value + output.push(value) + } + } + + function work() { + const workStarted = Date.now() + const promiseOrU = mapper(workStarted) + const loopTime = workStarted - lastWorkStarted + lastWorkStarted = workStarted + + if (isPromiseLike(promiseOrU)) { + promiseOrU.then( + function onfulfilled(value) { + checkMappedValue(value) + logWorkTime(workStarted, logger, loopTime, caller, milliseconds) + setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted))) + }, + function onrejected(err) { + options?.onError(err) + logWorkTime(workStarted, logger, loopTime, caller, milliseconds) + setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted))) + }, + ) + } else { + // resolved value + checkMappedValue(promiseOrU) + logWorkTime(workStarted, logger, loopTime, caller, milliseconds) + setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted))) + } + } + + // initial call + setTimeout(work, milliseconds) + return output +} diff --git a/packages/indexer-common/src/transactions.ts b/packages/indexer-common/src/transactions.ts index 9a73c08c9..0194e4769 100644 --- a/packages/indexer-common/src/transactions.ts +++ b/packages/indexer-common/src/transactions.ts @@ -13,7 +13,6 @@ import { Logger, mutable, NetworkContracts, - timer, toAddress, } from '@graphprotocol/common-ts' import delay from 'delay' @@ -22,6 +21,7 @@ import { IndexerError, indexerError, IndexerErrorCode } from './errors' import { TransactionConfig, TransactionType } from './types' import { NetworkSubgraph } from './network-subgraph' import gql from 'graphql-tag' +import { sequentialTimerReduce } from './sequential-timer' export class TransactionManager { ethereum: providers.BaseProvider @@ -323,44 +323,43 @@ export class TransactionManager { contracts: NetworkContracts, networkSubgraph: NetworkSubgraph, ): Promise> { - return timer(60_000) - .reduce( - async (currentlyPaused) => { - try { - const result = await networkSubgraph.checkedQuery(gql` - { - graphNetworks { - isPaused - } + return sequentialTimerReduce( + { + logger, + milliseconds: 60_000, + }, + async (currentlyPaused) => { + try { + const result = await networkSubgraph.checkedQuery(gql` + { + graphNetworks { + isPaused } - `) - - if (result.error) { - throw result.error } + `) - if (!result.data || result.data.length === 0) { - throw new Error(`No data returned by network subgraph`) - } + if (result.error) { + throw result.error + } - return result.data.graphNetworks[0].isPaused - } catch (err) { - logger.warn( - `Failed to check for network pause, assuming it has not changed`, - { - err: indexerError(IndexerErrorCode.IE007, err), - paused: currentlyPaused, - }, - ) - return currentlyPaused + if (!result.data || result.data.length === 0) { + throw new Error(`No data returned by network subgraph`) } - }, - await contracts.controller.paused(), - ) - .map((paused) => { - logger.info(paused ? `Network paused` : `Network active`) - return paused - }) + + return result.data.graphNetworks[0].isPaused + } catch (err) { + logger.warn(`Failed to check for network pause, assuming it has not changed`, { + err: indexerError(IndexerErrorCode.IE007, err), + paused: currentlyPaused, + }) + return currentlyPaused + } + }, + await contracts.controller.paused(), + ).map((paused) => { + logger.info(paused ? `Network paused` : `Network active`) + return paused + }) } async monitorIsOperator( @@ -376,29 +375,31 @@ export class TransactionManager { return mutable(true) } - return timer(60_000) - .reduce( - async (isOperator) => { - try { - return await contracts.staking.isOperator(wallet.address, indexerAddress) - } catch (err) { - logger.warn( - `Failed to check operator status for indexer, assuming it has not changed`, - { err: indexerError(IndexerErrorCode.IE008, err), isOperator }, - ) - return isOperator - } - }, - await contracts.staking.isOperator(wallet.address, indexerAddress), + return sequentialTimerReduce( + { + logger, + milliseconds: 60_000, + }, + async (isOperator) => { + try { + return await contracts.staking.isOperator(wallet.address, indexerAddress) + } catch (err) { + logger.warn( + `Failed to check operator status for indexer, assuming it has not changed`, + { err: indexerError(IndexerErrorCode.IE008, err), isOperator }, + ) + return isOperator + } + }, + await contracts.staking.isOperator(wallet.address, indexerAddress), + ).map((isOperator) => { + logger.info( + isOperator + ? `Have operator status for indexer` + : `No operator status for indexer`, ) - .map((isOperator) => { - logger.info( - isOperator - ? `Have operator status for indexer` - : `No operator status for indexer`, - ) - return isOperator - }) + return isOperator + }) } findEvent( diff --git a/packages/indexer-common/src/utils.ts b/packages/indexer-common/src/utils.ts index a486424a0..f9c4b014d 100644 --- a/packages/indexer-common/src/utils.ts +++ b/packages/indexer-common/src/utils.ts @@ -4,10 +4,11 @@ import { JsonRpcProvider, getDefaultProvider, } from '@ethersproject/providers' -import { Logger, Metrics, timer } from '@graphprotocol/common-ts' +import { Logger, Metrics } from '@graphprotocol/common-ts' import { indexerError, IndexerErrorCode } from './errors' import { DocumentNode, SelectionSetNode, Kind } from 'graphql' import cloneDeep from 'lodash.clonedeep' +import { sequentialTimerMap } from './sequential-timer' export const parseBoolean = ( val: string | boolean | number | undefined | null, @@ -49,7 +50,7 @@ export async function monitorEthBalance( const balanceMetrics = registerMetrics(metrics, networkIdentifier) - timer(120_000).pipe(async () => { + sequentialTimerMap({ logger, milliseconds: 120_000 }, async () => { try { const balance = await wallet.getBalance() const eth = parseFloat(utils.formatEther(balance)) diff --git a/packages/indexer-service/src/query-fees/allocations.ts b/packages/indexer-service/src/query-fees/allocations.ts index cef0c00a3..49ab07c8f 100644 --- a/packages/indexer-service/src/query-fees/allocations.ts +++ b/packages/indexer-service/src/query-fees/allocations.ts @@ -5,9 +5,10 @@ import { QueryFeeModels, AllocationReceiptAttributes, ensureAllocationSummary, + sequentialTimerMap, } from '@graphprotocol/indexer-common' import { NativeSignatureVerifier } from '@graphprotocol/indexer-native' -import { Address, Logger, timer, toAddress } from '@graphprotocol/common-ts' +import { Address, Logger, toAddress } from '@graphprotocol/common-ts' import { Sequelize, Transaction } from 'sequelize' import pRetry from 'p-retry' import { ReceiptManager } from '.' @@ -61,7 +62,7 @@ export class AllocationReceiptManager implements ReceiptManager { this._allocationReceiptVerifier = new NativeSignatureVerifier(clientSignerAddress) this.protocolNetwork = protocolNetwork - timer(30_000).pipe(async () => { + sequentialTimerMap({ logger: this.logger, milliseconds: 30_000 }, async () => { try { await this._flushOutstanding() } catch (err) {