From 9b7d3f4c3ce235b21100db5bcc39bde746cbf840 Mon Sep 17 00:00:00 2001 From: robert Date: Thu, 25 Jul 2024 18:59:17 +0800 Subject: [PATCH] optimization direct points. --- src/app.module.ts | 8 +- src/app.service.ts | 10 +- .../directHoldProcessingStatus.entity.ts | 12 ++ src/entities/index.ts | 1 + src/entities/supplementPoint.entity.ts | 2 +- ...enerateDirectPointProcessingStatusTable.ts | 22 +++ src/points/directPoint.service.ts | 141 +++++++++++++----- .../directHoldProcessingStatus.repository.ts | 28 ++++ src/repositories/index.ts | 1 + src/repositories/points.repository.ts | 18 ++- .../supplementPoint.repository.ts | 6 +- 11 files changed, 193 insertions(+), 56 deletions(-) create mode 100644 src/entities/directHoldProcessingStatus.entity.ts create mode 100644 src/migrations/1721893869511-GenerateDirectPointProcessingStatusTable.ts create mode 100644 src/repositories/directHoldProcessingStatus.repository.ts diff --git a/src/app.module.ts b/src/app.module.ts index 5a44f84..fae8be9 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -44,6 +44,7 @@ import { SeasonTotalPointRepository, OtherPointRepository, SupplementPointRepository, + DirectHoldProcessingStatusRepository, } from "./repositories"; import { Block, @@ -71,7 +72,8 @@ import { Invites, SeasonTotalPoint, OtherPoint, - supplementPoint, + SupplementPoint, + DirectHoldProcessingStatus, } from "./entities"; import { typeOrmReferModuleOptions, typeOrmLrtModuleOptions, typeOrmExplorerModuleOptions } from "./typeorm.config"; import { RetryDelayProvider } from "./retryDelay.provider"; @@ -156,7 +158,8 @@ import { CalTxPointService } from "./points/calTxPoint.service"; TvlProcessingStatus, TxProcessingStatus, SeasonTotalPoint, - supplementPoint, + SupplementPoint, + DirectHoldProcessingStatus, ], "lrt" ), @@ -232,6 +235,7 @@ import { CalTxPointService } from "./points/calTxPoint.service"; OtherPointRepository, ProjectTvlService, SupplementPointRepository, + DirectHoldProcessingStatusRepository, ], }) export class AppModule {} diff --git a/src/app.service.ts b/src/app.service.ts index ed18900..ee31491 100644 --- a/src/app.service.ts +++ b/src/app.service.ts @@ -12,6 +12,7 @@ import { RedistributePointService } from "./points/redistributePoint.service"; import { BaseDataService } from "./points/baseData.service"; import { ReferralPointService } from "./points/referralPoints.service"; import { SeasonTotalPointService } from "./points/seasonTotalPoint.service"; +import { DirectPointService } from "./points/directPoint.service"; @Injectable() export class AppService implements OnModuleInit, OnModuleDestroy { @@ -28,7 +29,8 @@ export class AppService implements OnModuleInit, OnModuleDestroy { private readonly calTxPointService: CalTxPointService, private readonly redistributePointService: RedistributePointService, private readonly referralPointService: ReferralPointService, - private readonly seasonTotalPointService: SeasonTotalPointService + private readonly seasonTotalPointService: SeasonTotalPointService, + private readonly directPointService: DirectPointService ) { this.logger = new Logger(AppService.name); } @@ -39,9 +41,7 @@ export class AppService implements OnModuleInit, OnModuleDestroy { // second params is utc+8 // await this.tvlPointService.handleHoldPoint(1395273, new Date(1715159940 * 1000).toISOString()); // this.compensatePoints() - await this.seasonTotalPointService.handlePoint(); - await this.referralPointService.handleReferralPoint(); - await this.seasonTotalPointService.handlePoint(); + await this.directPointService.runProcess(); this.redistributePointService.runProcess(); this.startWorkers(); } @@ -68,6 +68,7 @@ export class AppService implements OnModuleInit, OnModuleDestroy { this.calTvlPointService.start(), this.calTxPointService.start(), this.seasonTotalPointService.start(), + this.directPointService.start(), ]); } @@ -80,6 +81,7 @@ export class AppService implements OnModuleInit, OnModuleDestroy { this.calTvlPointService.stop(), this.calTxPointService.stop(), this.seasonTotalPointService.stop(), + this.directPointService.stop(), ]); } diff --git a/src/entities/directHoldProcessingStatus.entity.ts b/src/entities/directHoldProcessingStatus.entity.ts new file mode 100644 index 0000000..8fe0a3d --- /dev/null +++ b/src/entities/directHoldProcessingStatus.entity.ts @@ -0,0 +1,12 @@ +import { Entity, Column, Unique, Index, PrimaryColumn } from "typeorm"; +import { BaseEntity } from "./base.entity"; + +@Entity({ name: "directHoldProcessingStatus" }) +@Index(["pointProcessed"]) +export class DirectHoldProcessingStatus extends BaseEntity { + @PrimaryColumn() + blockNumber: number; + + @Column({ default: false }) + pointProcessed: boolean; +} diff --git a/src/entities/index.ts b/src/entities/index.ts index 2ab496b..4c6a925 100644 --- a/src/entities/index.ts +++ b/src/entities/index.ts @@ -27,3 +27,4 @@ export * from "./seasonTotalPoint.entity"; export * from "./invites.entity"; export * from "./otherPoints.entity"; export * from "./supplementPoint.entity"; +export * from "./directHoldProcessingStatus.entity"; diff --git a/src/entities/supplementPoint.entity.ts b/src/entities/supplementPoint.entity.ts index e1df883..28f1cc1 100644 --- a/src/entities/supplementPoint.entity.ts +++ b/src/entities/supplementPoint.entity.ts @@ -8,7 +8,7 @@ export enum SupplementPointType { @Entity({ name: "supplementPoint" }) @Index("idx_supplementPoint_1", ["address", "batchString", "type"]) -export class supplementPoint extends BaseEntity { +export class SupplementPoint extends BaseEntity { @Column({ type: "bytea", transformer: hexTransformer }) public address: string; diff --git a/src/migrations/1721893869511-GenerateDirectPointProcessingStatusTable.ts b/src/migrations/1721893869511-GenerateDirectPointProcessingStatusTable.ts new file mode 100644 index 0000000..576d9a5 --- /dev/null +++ b/src/migrations/1721893869511-GenerateDirectPointProcessingStatusTable.ts @@ -0,0 +1,22 @@ +import { MigrationInterface, QueryRunner } from "typeorm"; + +export class GenerateDirectPointProcessingStatusTable1721893869511 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `CREATE TABLE "directHoldProcessingStatus" ( + "createdAt" TIMESTAMP NOT NULL DEFAULT now(), + "updatedAt" TIMESTAMP NOT NULL DEFAULT now(), + "blockNumber" integer NOT NULL, + "pointProcessed" boolean NOT NULL DEFAULT false, + CONSTRAINT "PK_directHoldProcessingStatus" PRIMARY KEY ("blockNumber"))` + ); + await queryRunner.query( + `CREATE INDEX "IDX_directHoldProcessingStatus_1" ON "directHoldProcessingStatus" ("pointProcessed") ` + ); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP INDEX "public"."IDX_directHoldProcessingStatus_1"`); + await queryRunner.query(`DROP TABLE "directHoldProcessingStatus"`); + } +} diff --git a/src/points/directPoint.service.ts b/src/points/directPoint.service.ts index 9c16b62..39194de 100644 --- a/src/points/directPoint.service.ts +++ b/src/points/directPoint.service.ts @@ -8,14 +8,16 @@ import { BlockTokenPriceRepository, BlockAddressPointRepository, AddressFirstDepositRepository, + DirectHoldProcessingStatusRepository, } from "../repositories"; import { TokenMultiplier, TokenService } from "../token/token.service"; import BigNumber from "bignumber.js"; import { hexTransformer } from "../transformers/hex.transformer"; import { ConfigService } from "@nestjs/config"; -import { getETHPrice, getTokenPrice, STABLE_COIN_TYPE } from "./baseData.service"; +import { BaseDataService, getETHPrice, getTokenPrice, STABLE_COIN_TYPE } from "./baseData.service"; import addressMultipliers from "../config/addressMultipliers"; import waitFor from "src/utils/waitFor"; +import { LrtUnitOfWork } from "src/unitOfWork"; export const LOYALTY_BOOSTER_FACTOR: BigNumber = new BigNumber(0.005); type BlockAddressTvl = { @@ -39,6 +41,9 @@ export class DirectPointService extends Worker { private readonly balanceRepository: BalanceRepository, private readonly blockRepository: BlockRepository, private readonly addressFirstDepositRepository: AddressFirstDepositRepository, + private readonly directHoldProcessingStatusRepository: DirectHoldProcessingStatusRepository, + private readonly baseDataService: BaseDataService, + private readonly unitOfWork: LrtUnitOfWork, private readonly configService: ConfigService ) { super(); @@ -54,15 +59,7 @@ export class DirectPointService extends Worker { } @Cron("0 2,10,18 * * *") - protected async runProcess(): Promise { - try { - await this.handleHoldPoint(); - } catch (error) { - this.logger.error("Failed to calculate hold point", error.stack); - } - } - - async handleHoldPoint() { + public async saveBlockNumber(): Promise { const currentBlockNumber = await this.balanceRepository.getLatesBlockNumber(); if (!currentBlockNumber) { this.logger.log(`Wait for the next hold point statistical block`); @@ -73,7 +70,44 @@ export class DirectPointService extends Worker { number: currentBlockNumber, }, }); - await waitFor(() => false, 60 * 1000, 60 * 1000); + if (!currentStatisticalBlock) { + this.logger.error(`No block found for block number: ${currentBlockNumber}`); + return; + } + await this.baseDataService.updateTokenPrice(currentStatisticalBlock); + await this.directHoldProcessingStatusRepository.upsertStatus({ + blockNumber: currentBlockNumber, + pointProcessed: false, + }); + } + + public async runProcess(): Promise { + const blockNumbers = await this.directHoldProcessingStatusRepository.getUnprocessedBlockNumber(); + if (blockNumbers.length === 0) { + this.logger.log(`No block to process`); + return; + } + for (const blockNumber of blockNumbers) { + try { + await this.handleHoldPoint(blockNumber.blockNumber); + } catch (error) { + this.logger.error("Failed to calculate hold point", error.stack); + } + } + await waitFor(() => false, 1000, 1000); + } + + async handleHoldPoint(currentBlockNumber: number) { + const addressHoldPoints: { + address: string; + holdPoint: number; + blockNumber: number; + }[] = []; + const currentStatisticalBlock = await this.blockRepository.getLastBlock({ + where: { + number: currentBlockNumber, + }, + }); const statisticStartTime = new Date(); const earlyBirdMultiplier = this.getEarlyBirdMultiplier(currentStatisticalBlock.timestamp); @@ -81,15 +115,9 @@ export class DirectPointService extends Worker { const tokenPriceMap = await this.getTokenPriceMap(currentStatisticalBlock.number); const blockTs = currentStatisticalBlock.timestamp.getTime(); const addressTvlMap = await this.getAddressTvlMap(currentStatisticalBlock.number, blockTs, tokenPriceMap); - for (const address of addressTvlMap.keys()) { - const fromBlockAddressPoint = await this.blockAddressPointRepository.getBlockAddressPoint( - currentStatisticalBlock.number, - address - ); - if (!!fromBlockAddressPoint && fromBlockAddressPoint.holdPoint > 0) { - this.logger.log(`Address hold point calculated: ${address}`); - continue; - } + const addresses = Array.from(addressTvlMap.keys()); + this.logger.log(`Start loop address, address count: ${addresses.length}`); + for (const address of addresses) { const addressTvl = addressTvlMap.get(address); const addressMultiplier = this.getAddressMultiplier(address, blockTs); @@ -110,8 +138,14 @@ export class DirectPointService extends Worker { .multipliedBy(groupBooster) .multipliedBy(addressMultiplier) .multipliedBy(loyaltyBooster); - await this.updateHoldPoint(currentStatisticalBlock.number, address, newHoldPoint); + addressHoldPoints.push({ + address, + holdPoint: newHoldPoint.toNumber(), + blockNumber: currentStatisticalBlock.number, + }); } + this.logger.log(`Finishloop address`); + await this.updateHoldPoint(addressHoldPoints, currentStatisticalBlock.number); const statisticEndTime = new Date(); const statisticElapsedTime = statisticEndTime.getTime() - statisticStartTime.getTime(); this.logger.log( @@ -129,12 +163,13 @@ export class DirectPointService extends Worker { const addressTvlMap: Map = new Map(); const addressBufferList = await this.balanceRepository.getAllAddressesByBlock(blockNumber); this.logger.log(`The address list length: ${addressBufferList.length}`); - for (const addressBuffer of addressBufferList) { + // for (const addressBuffer of addressBufferList) { + for (const addressBuffer of addressBufferList.slice(0, 100)) { const address = hexTransformer.from(addressBuffer); const addressTvl = await this.calculateAddressTvl(address, blockNumber, tokenPriceMap, blockTs); - if (addressTvl.tvl.gt(new BigNumber(0))) { - //this.logger.log(`Address ${address}: [tvl: ${addressTvl.tvl}, holdBasePoint: ${addressTvl.holdBasePoint}]`); - } + // if (addressTvl.tvl.gt(new BigNumber(0))) { + // this.logger.log(`Address ${address}: [tvl: ${addressTvl.tvl}, holdBasePoint: ${addressTvl.holdBasePoint}]`); + // } addressTvlMap.set(address, addressTvl); } return addressTvlMap; @@ -207,21 +242,51 @@ export class DirectPointService extends Worker { return tokenPrices; } - async updateHoldPoint(blockNumber: number, from: string, holdPoint: BigNumber) { - // update point of user - let fromBlockAddressPoint = await this.blockAddressPointRepository.getBlockAddressPoint(blockNumber, from); - if (!fromBlockAddressPoint) { - fromBlockAddressPoint = this.blockAddressPointRepository.createDefaultBlockAddressPoint(blockNumber, from); + async updateHoldPoint( + addressHoldPoints: { address: string; holdPoint: number; blockNumber: number }[], + blockNumber: number + ) { + const blockAddressPointsMap = new Map(); + const blockAddressPoints = []; + const addresses = []; + const newAddressPoints = []; + for (const item of addressHoldPoints) { + blockAddressPointsMap.set(item.address, item.holdPoint); + addresses.push(item.address); + blockAddressPoints.push({ + blockNumber: item.blockNumber, + address: item.address, + depositPoint: 0, + refPoint: 0, + holdPoint: item.holdPoint, + }); + newAddressPoints.push(this.pointsRepository.createDefaultPoint(item.address, item.holdPoint)); } - let fromAddressPoint = await this.pointsRepository.getPointByAddress(from); - if (!fromAddressPoint) { - fromAddressPoint = this.pointsRepository.createDefaultPoint(from); + const addressPoints = await this.pointsRepository.getPoints(); + const addressPointsMap = new Map(); + for (const item of addressPoints) { + addressPointsMap.set(item.address, item.stakePoint); } - fromBlockAddressPoint.holdPoint = holdPoint.toNumber(); - fromAddressPoint.stakePoint = Number(fromAddressPoint.stakePoint) + holdPoint.toNumber(); - this.logger.log(`Address ${from} get hold point: ${holdPoint}`); - // update point of referrer - await this.blockAddressPointRepository.upsertUserAndReferrerPoint(fromBlockAddressPoint, fromAddressPoint); + for (const item of newAddressPoints) { + const holdPoint = addressPointsMap.get(item.address); + if (!holdPoint) { + continue; + } + item.stakePoint = Number(item.stakePoint) + Number(holdPoint); + } + return new Promise((resolve) => { + this.unitOfWork.useTransaction(async () => { + this.logger.log(`Start insert directHolding point into db for block: ${blockNumber}`); + await this.blockAddressPointRepository.addManyIgnoreConflicts(blockAddressPoints); + this.logger.log(`Finish directHolding point for block: ${blockNumber}, length: ${blockAddressPoints.length}`); + await this.pointsRepository.addManyOrUpdate(newAddressPoints, ["stakePoint"], ["address"]); + this.logger.log(`Finish directHolding point for block: ${blockNumber}, length: ${newAddressPoints.length}`); + + await this.directHoldProcessingStatusRepository.upsertStatus({ blockNumber, pointProcessed: true }); + this.logger.log(`Finish directHolding point statistic for block: ${blockNumber}`); + resolve(); + }); + }); } isWithdrawStartPhase(blockTs: number): boolean { diff --git a/src/repositories/directHoldProcessingStatus.repository.ts b/src/repositories/directHoldProcessingStatus.repository.ts new file mode 100644 index 0000000..0a7eeb4 --- /dev/null +++ b/src/repositories/directHoldProcessingStatus.repository.ts @@ -0,0 +1,28 @@ +import { Injectable } from "@nestjs/common"; +import { LrtUnitOfWork } from "../unitOfWork"; +import { BaseRepository } from "./base.repository"; +import { DirectHoldProcessingStatus } from "../entities"; + +@Injectable() +export class DirectHoldProcessingStatusRepository extends BaseRepository { + public constructor(unitOfWork: LrtUnitOfWork) { + super(DirectHoldProcessingStatus, unitOfWork); + } + + public async upsertStatus(updateData: Partial): Promise { + const entityManager = this.unitOfWork.getTransactionManager(); + const result = await entityManager.upsert(DirectHoldProcessingStatus, updateData, ["blockNumber"]); + + return result.raw[0]; + } + + public async getUnprocessedBlockNumber(): Promise { + const entityManager = this.unitOfWork.getTransactionManager(); + const result = await entityManager.find(DirectHoldProcessingStatus, { + where: [{ pointProcessed: false }], + order: { blockNumber: "ASC" }, + }); + + return result; + } +} diff --git a/src/repositories/index.ts b/src/repositories/index.ts index 7148b3c..2837ddf 100644 --- a/src/repositories/index.ts +++ b/src/repositories/index.ts @@ -26,3 +26,4 @@ export * from "./seasonTotalPoint.repository"; export * from "./invites.repository"; export * from "./otherPoint.repository"; export * from "./supplementPoint.repository"; +export * from "./directHoldProcessingStatus.repository"; diff --git a/src/repositories/points.repository.ts b/src/repositories/points.repository.ts index 9180e2b..86860f4 100644 --- a/src/repositories/points.repository.ts +++ b/src/repositories/points.repository.ts @@ -1,23 +1,25 @@ import { Injectable } from "@nestjs/common"; import { LrtUnitOfWork as UnitOfWork } from "../unitOfWork"; import { Point } from "../entities"; +import { In } from "typeorm"; +import { BaseRepository } from "./base.repository"; @Injectable() -export class PointsRepository { - public constructor(private readonly unitOfWork: UnitOfWork) {} +export class PointsRepository extends BaseRepository { + public constructor(unitOfWork: UnitOfWork) { + super(Point, unitOfWork); + } - public async getPointByAddress(address: string): Promise { + public async getPoints(): Promise { const transactionManager = this.unitOfWork.getTransactionManager(); - return await transactionManager.findOne(Point, { - where: { address }, - }); + return await transactionManager.find(Point); } - public createDefaultPoint(address: string): Point { + public createDefaultPoint(address: string, stakePoint: number = 0): Point { return { id: 0, address, - stakePoint: 0, + stakePoint, refPoint: 0, }; } diff --git a/src/repositories/supplementPoint.repository.ts b/src/repositories/supplementPoint.repository.ts index 6a1aa29..9246462 100644 --- a/src/repositories/supplementPoint.repository.ts +++ b/src/repositories/supplementPoint.repository.ts @@ -1,12 +1,12 @@ import { Injectable } from "@nestjs/common"; import { LrtUnitOfWork as UnitOfWork } from "../unitOfWork"; import { BaseRepository } from "./base.repository"; -import { supplementPoint } from "../entities"; +import { SupplementPoint } from "../entities"; @Injectable() -export class SupplementPointRepository extends BaseRepository { +export class SupplementPointRepository extends BaseRepository { public constructor(unitOfWork: UnitOfWork) { - super(supplementPoint, unitOfWork); + super(SupplementPoint, unitOfWork); } public async getSupplementPointByAddress(