From 2e980c7683511dbe0ed605e0221b2a16100e8314 Mon Sep 17 00:00:00 2001 From: hschiau Date: Wed, 23 Oct 2024 21:17:47 +0300 Subject: [PATCH 1/3] SERVICES-2669: add cron job that handles indexing sessions --- .../analytics.indexer.module.ts | 2 + .../services/indexer.cron.service.ts | 215 ++++++++++++++++++ .../services/indexer.persistence.service.ts | 7 + src/private.app.module.ts | 2 + 4 files changed, 226 insertions(+) create mode 100644 src/modules/analytics-indexer/services/indexer.cron.service.ts diff --git a/src/modules/analytics-indexer/analytics.indexer.module.ts b/src/modules/analytics-indexer/analytics.indexer.module.ts index 913fbd48b..4d877688c 100644 --- a/src/modules/analytics-indexer/analytics.indexer.module.ts +++ b/src/modules/analytics-indexer/analytics.indexer.module.ts @@ -23,6 +23,7 @@ import { IndexerSessionSchema, } from './schemas/indexer.session.schema'; import { MXCommunicationModule } from 'src/services/multiversx-communication/mx.communication.module'; +import { IndexerCronService } from './services/indexer.cron.service'; @Module({ imports: [ @@ -49,6 +50,7 @@ import { MXCommunicationModule } from 'src/services/multiversx-communication/mx. IndexerPriceDiscoveryHandlerService, IndexerSessionRepositoryService, IndexerPersistenceService, + IndexerCronService, ], exports: [IndexerService], controllers: [AnalyticsIndexerController], diff --git a/src/modules/analytics-indexer/services/indexer.cron.service.ts b/src/modules/analytics-indexer/services/indexer.cron.service.ts new file mode 100644 index 000000000..02f0822ac --- /dev/null +++ b/src/modules/analytics-indexer/services/indexer.cron.service.ts @@ -0,0 +1,215 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { Cron, CronExpression } from '@nestjs/schedule'; +import { WINSTON_MODULE_PROVIDER } from 'nest-winston'; +import { Logger } from 'winston'; +import moment from 'moment'; +import { generateLogMessage } from 'src/utils/generate-log-message'; +import { Lock } from '@multiversx/sdk-nestjs-common'; +import { + IndexerJob, + IndexerSession, + IndexerStatus, +} from '../schemas/indexer.session.schema'; +import { PerformanceProfiler } from '@multiversx/sdk-nestjs-monitoring'; +import { CacheService } from '@multiversx/sdk-nestjs-cache'; +import { IndexerService } from './indexer.service'; +import { IndexerPersistenceService } from './indexer.persistence.service'; +import { IndexerEventTypes } from '../entities/indexer.event.types'; + +const JOB_MAX_ATTEMPTS = 3; + +@Injectable() +export class IndexerCronService { + constructor( + private readonly indexerService: IndexerService, + private readonly indexerPersistence: IndexerPersistenceService, + private readonly cachingService: CacheService, + @Inject(WINSTON_MODULE_PROVIDER) protected readonly logger: Logger, + ) {} + + async onModuleInit() { + await this.resetStuckJobsAndSessions(); + } + + @Cron(CronExpression.EVERY_10_SECONDS) + @Lock({ name: 'indexAnalytics' }) + public async indexAnalytics() { + let activeSession: IndexerSession; + + try { + activeSession = await this.indexerPersistence.getActiveSession(); + if (!activeSession) { + return; + } + + const sessionAbortSignal = await this.getSessionAbortSignal( + activeSession, + ); + if (sessionAbortSignal) { + return await this.markSessionAborted(activeSession); + } + + if (activeSession.status === IndexerStatus.PENDING) { + activeSession.status = IndexerStatus.IN_PROGRESS; + await this.indexerPersistence.updateSession(activeSession); + } + + await this.processIndexingJobs(activeSession); + } catch (error) { + const logMessage = generateLogMessage( + IndexerCronService.name, + this.indexAnalytics.name, + '', + error, + ); + this.logger.error(logMessage); + } + } + + private async processIndexingJobs(session: IndexerSession): Promise { + for (const [index, job] of session.jobs.entries()) { + if ( + job.status !== IndexerStatus.IN_PROGRESS && + job.status !== IndexerStatus.PENDING + ) { + continue; + } + + try { + job.status = IndexerStatus.IN_PROGRESS; + + await this.indexerPersistence.updateSession(session); + + session.jobs[index] = await this.runIndexingJob( + job, + session.eventTypes, + ); + + await this.indexerPersistence.updateSession(session); + } catch (error) { + await this.markSessionFailed(session); + break; + } + } + + return await this.markSessionCompleted(session); + } + + private async runIndexingJob( + job: IndexerJob, + eventTypes: IndexerEventTypes[], + ): Promise { + const profiler = new PerformanceProfiler(); + + const startDate = moment + .unix(job.startTimestamp) + .format('YYYY-MM-DD HH:mm:ss'); + const endDate = moment + .unix(job.endTimestamp) + .format('YYYY-MM-DD HH:mm:ss'); + + let errorsCount = 0; + while (job.runAttempts < JOB_MAX_ATTEMPTS) { + try { + errorsCount = await this.indexerService.indexAnalytics( + job.startTimestamp, + job.endTimestamp, + eventTypes, + ); + + profiler.stop(); + + this.logger.info( + `Finished indexing analytics data between '${startDate}' and '${endDate}' in ${profiler.duration}ms`, + { + context: 'IndexerCronService', + }, + ); + + job.runAttempts += 1; + job.errorCount = errorsCount; + job.durationMs = profiler.duration; + job.status = IndexerStatus.COMPLETED; + + return job; + } catch (error) { + job.runAttempts += 1; + console.log('EXTRA ATTEMPT', job.runAttempts); + } + } + + profiler.stop(); + + throw new Error( + `Failed to index analytics data between '${startDate}' and '${endDate}'`, + ); + } + + private async getSessionAbortSignal( + session: IndexerSession, + ): Promise { + const abortSignal = await this.cachingService.get( + `indexer.abortSession.${session.name}`, + ); + + if (abortSignal) { + return true; + } + + return false; + } + + private async markSessionAborted(session: IndexerSession): Promise { + session = this.updateSessionAndJobsStatus( + session, + [IndexerStatus.PENDING, IndexerStatus.IN_PROGRESS], + IndexerStatus.ABORTED, + ); + + await this.indexerPersistence.updateSession(session); + + await this.cachingService.delete( + `indexer.abortSession.${session.name}`, + ); + } + + private async markSessionFailed(session: IndexerSession): Promise { + session = this.updateSessionAndJobsStatus( + session, + [ + IndexerStatus.PENDING, + IndexerStatus.IN_PROGRESS, + IndexerStatus.ABORTED, + ], + IndexerStatus.FAILED, + ); + + await this.indexerPersistence.updateSession(session); + } + + private async markSessionCompleted(session: IndexerSession): Promise { + session.status = IndexerStatus.COMPLETED; + + await this.indexerPersistence.updateSession(session); + } + + private updateSessionAndJobsStatus( + session: IndexerSession, + affectedStatuses: IndexerStatus[], + newStatus: IndexerStatus, + ): IndexerSession { + session.status = newStatus; + + for (const job of session.jobs) { + if (affectedStatuses.includes(job.status)) { + job.status = newStatus; + } + } + + return session; + } + + private async resetStuckJobsAndSessions(): Promise { + // TODO implement reset functionality + } +} diff --git a/src/modules/analytics-indexer/services/indexer.persistence.service.ts b/src/modules/analytics-indexer/services/indexer.persistence.service.ts index 9accd5784..55e60d939 100644 --- a/src/modules/analytics-indexer/services/indexer.persistence.service.ts +++ b/src/modules/analytics-indexer/services/indexer.persistence.service.ts @@ -57,6 +57,13 @@ export class IndexerPersistenceService { }); } + public async updateSession(session: IndexerSession): Promise { + await this.indexerSessionRepository.findOneAndUpdate( + { name: session.name }, + session, + ); + } + private createSessionJobs(start: number, end: number): IndexerJob[] { const jobs: IndexerJob[] = []; const oneWeek = Constants.oneWeek(); diff --git a/src/private.app.module.ts b/src/private.app.module.ts index fae2896b8..fe8f12a2a 100644 --- a/src/private.app.module.ts +++ b/src/private.app.module.ts @@ -9,6 +9,7 @@ import { TokenModule } from './modules/tokens/token.module'; import { DynamicModuleUtils } from './utils/dynamic.module.utils'; import { ESTransactionsService } from './services/elastic-search/services/es.transactions.service'; import { AnalyticsIndexerModule } from './modules/analytics-indexer/analytics.indexer.module'; +import { ScheduleModule } from '@nestjs/schedule'; @Module({ imports: [ @@ -18,6 +19,7 @@ import { AnalyticsIndexerModule } from './modules/analytics-indexer/analytics.in RemoteConfigModule, DynamicModuleUtils.getCacheModule(), AnalyticsIndexerModule, + ScheduleModule.forRoot(), ], controllers: [MetricsController, TokenController, RemoteConfigController], providers: [ESTransactionsService], From ab7926cc2134c1252a3c2921ddaec4141aa475d5 Mon Sep 17 00:00:00 2001 From: hschiau Date: Thu, 24 Oct 2024 18:06:58 +0300 Subject: [PATCH 2/3] SERVICES-2669: index 'addInitialLiquidity' events + fixes - add default scroll timeout of '5m' to prevent failure --- .../analytics-indexer/entities/indexer.event.types.ts | 1 + src/modules/analytics-indexer/services/indexer.service.ts | 6 +++++- src/services/elastic-search/services/es.events.service.ts | 4 ++++ 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/modules/analytics-indexer/entities/indexer.event.types.ts b/src/modules/analytics-indexer/entities/indexer.event.types.ts index f72d91c42..cf4f7916f 100644 --- a/src/modules/analytics-indexer/entities/indexer.event.types.ts +++ b/src/modules/analytics-indexer/entities/indexer.event.types.ts @@ -2,6 +2,7 @@ export enum IndexerEventIdentifiers { SWAP_FIXED_INPUT = 'swapTokensFixedInput', SWAP_FIXED_OUTPUT = 'swapTokensFixedOutput', ADD_LIQUIDITY = 'addLiquidity', + ADD_INITIAL_LIQUIDITY = 'addInitialLiquidity', REMOVE_LIQUIDITY = 'removeLiquidity', PRICE_DISCOVERY_DEPOSIT = 'deposit', PRICE_DISCOVERY_WITHDRAW = 'withdraw', diff --git a/src/modules/analytics-indexer/services/indexer.service.ts b/src/modules/analytics-indexer/services/indexer.service.ts index ef013be20..5a7999343 100644 --- a/src/modules/analytics-indexer/services/indexer.service.ts +++ b/src/modules/analytics-indexer/services/indexer.service.ts @@ -67,7 +67,6 @@ export class IndexerService { const pairs = this.stateService.getPairsMetadata(); this.filterAddresses.push(...pairs.map((pair) => pair.address)); - this.filterAddresses.push(...scAddress.priceDiscovery); this.handleSwapEvents = eventTypes.includes( IndexerEventTypes.SWAP_EVENTS, @@ -90,6 +89,9 @@ export class IndexerService { if (this.handleLiquidityEvents) { this.eventIdentifiers.push(IndexerEventIdentifiers.ADD_LIQUIDITY); + this.eventIdentifiers.push( + IndexerEventIdentifiers.ADD_INITIAL_LIQUIDITY, + ); this.eventIdentifiers.push( IndexerEventIdentifiers.REMOVE_LIQUIDITY, ); @@ -102,6 +104,7 @@ export class IndexerService { this.eventIdentifiers.push( IndexerEventIdentifiers.PRICE_DISCOVERY_WITHDRAW, ); + this.filterAddresses.push(...scAddress.priceDiscovery); } } @@ -201,6 +204,7 @@ export class IndexerService { new SwapEvent(rawEvent), ); break; + case IndexerEventIdentifiers.ADD_INITIAL_LIQUIDITY: case IndexerEventIdentifiers.ADD_LIQUIDITY: if (!this.handleLiquidityEvents) { break; diff --git a/src/services/elastic-search/services/es.events.service.ts b/src/services/elastic-search/services/es.events.service.ts index 93f513c91..9369fb804 100644 --- a/src/services/elastic-search/services/es.events.service.ts +++ b/src/services/elastic-search/services/es.events.service.ts @@ -243,6 +243,10 @@ export class ElasticSearchEventsService { '', elasticQueryAdapter, action, + { + delayBetweenScrolls: 0, + scrollTimeout: '5m', + }, ); } } From 14153abc95588f0e21b968ae94d4dc666403da04 Mon Sep 17 00:00:00 2001 From: hschiau Date: Mon, 28 Oct 2024 18:56:23 +0200 Subject: [PATCH 3/3] SERVICES-2669: add extra logging --- .../analytics-indexer/services/indexer.cron.service.ts | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/modules/analytics-indexer/services/indexer.cron.service.ts b/src/modules/analytics-indexer/services/indexer.cron.service.ts index 02f0822ac..c9e28236a 100644 --- a/src/modules/analytics-indexer/services/indexer.cron.service.ts +++ b/src/modules/analytics-indexer/services/indexer.cron.service.ts @@ -87,6 +87,10 @@ export class IndexerCronService { await this.indexerPersistence.updateSession(session); } catch (error) { + this.logger.error( + `Indexing session ${session.name} failed`, + error, + ); await this.markSessionFailed(session); break; } @@ -134,7 +138,10 @@ export class IndexerCronService { return job; } catch (error) { job.runAttempts += 1; - console.log('EXTRA ATTEMPT', job.runAttempts); + this.logger.error( + `Failed attempt #${job.runAttempts} while indexing analytics data between '${startDate}' and '${endDate}'`, + error, + ); } }