Skip to content

Commit

Permalink
optimization direct points.
Browse files Browse the repository at this point in the history
  • Loading branch information
robert-zklink committed Jul 25, 2024
1 parent d4eed32 commit 9b7d3f4
Show file tree
Hide file tree
Showing 11 changed files with 193 additions and 56 deletions.
8 changes: 6 additions & 2 deletions src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import {
SeasonTotalPointRepository,
OtherPointRepository,
SupplementPointRepository,
DirectHoldProcessingStatusRepository,
} from "./repositories";
import {
Block,
Expand Down Expand Up @@ -71,7 +72,8 @@ import {
Invites,
SeasonTotalPoint,
OtherPoint,
supplementPoint,
SupplementPoint,
DirectHoldProcessingStatus,
} from "./entities";
import { typeOrmReferModuleOptions, typeOrmLrtModuleOptions, typeOrmExplorerModuleOptions } from "./typeorm.config";
import { RetryDelayProvider } from "./retryDelay.provider";
Expand Down Expand Up @@ -156,7 +158,8 @@ import { CalTxPointService } from "./points/calTxPoint.service";
TvlProcessingStatus,
TxProcessingStatus,
SeasonTotalPoint,
supplementPoint,
SupplementPoint,
DirectHoldProcessingStatus,
],
"lrt"
),
Expand Down Expand Up @@ -232,6 +235,7 @@ import { CalTxPointService } from "./points/calTxPoint.service";
OtherPointRepository,
ProjectTvlService,
SupplementPointRepository,
DirectHoldProcessingStatusRepository,
],
})
export class AppModule {}
10 changes: 6 additions & 4 deletions src/app.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { RedistributePointService } from "./points/redistributePoint.service";
import { BaseDataService } from "./points/baseData.service";
import { ReferralPointService } from "./points/referralPoints.service";
import { SeasonTotalPointService } from "./points/seasonTotalPoint.service";
import { DirectPointService } from "./points/directPoint.service";

@Injectable()
export class AppService implements OnModuleInit, OnModuleDestroy {
Expand All @@ -28,7 +29,8 @@ export class AppService implements OnModuleInit, OnModuleDestroy {
private readonly calTxPointService: CalTxPointService,
private readonly redistributePointService: RedistributePointService,
private readonly referralPointService: ReferralPointService,
private readonly seasonTotalPointService: SeasonTotalPointService
private readonly seasonTotalPointService: SeasonTotalPointService,
private readonly directPointService: DirectPointService
) {
this.logger = new Logger(AppService.name);
}
Expand All @@ -39,9 +41,7 @@ export class AppService implements OnModuleInit, OnModuleDestroy {
// second params is utc+8
// await this.tvlPointService.handleHoldPoint(1395273, new Date(1715159940 * 1000).toISOString());
// this.compensatePoints()
await this.seasonTotalPointService.handlePoint();
await this.referralPointService.handleReferralPoint();
await this.seasonTotalPointService.handlePoint();
await this.directPointService.runProcess();
this.redistributePointService.runProcess();
this.startWorkers();
}
Expand All @@ -68,6 +68,7 @@ export class AppService implements OnModuleInit, OnModuleDestroy {
this.calTvlPointService.start(),
this.calTxPointService.start(),
this.seasonTotalPointService.start(),
this.directPointService.start(),
]);
}

Expand All @@ -80,6 +81,7 @@ export class AppService implements OnModuleInit, OnModuleDestroy {
this.calTvlPointService.stop(),
this.calTxPointService.stop(),
this.seasonTotalPointService.stop(),
this.directPointService.stop(),
]);
}

Expand Down
12 changes: 12 additions & 0 deletions src/entities/directHoldProcessingStatus.entity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { Entity, Column, Unique, Index, PrimaryColumn } from "typeorm";
import { BaseEntity } from "./base.entity";

@Entity({ name: "directHoldProcessingStatus" })
@Index(["pointProcessed"])
export class DirectHoldProcessingStatus extends BaseEntity {
@PrimaryColumn()
blockNumber: number;

@Column({ default: false })
pointProcessed: boolean;
}
1 change: 1 addition & 0 deletions src/entities/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ export * from "./seasonTotalPoint.entity";
export * from "./invites.entity";
export * from "./otherPoints.entity";
export * from "./supplementPoint.entity";
export * from "./directHoldProcessingStatus.entity";
2 changes: 1 addition & 1 deletion src/entities/supplementPoint.entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export enum SupplementPointType {

@Entity({ name: "supplementPoint" })
@Index("idx_supplementPoint_1", ["address", "batchString", "type"])
export class supplementPoint extends BaseEntity {
export class SupplementPoint extends BaseEntity {
@Column({ type: "bytea", transformer: hexTransformer })
public address: string;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { MigrationInterface, QueryRunner } from "typeorm";

export class GenerateDirectPointProcessingStatusTable1721893869511 implements MigrationInterface {
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`CREATE TABLE "directHoldProcessingStatus" (
"createdAt" TIMESTAMP NOT NULL DEFAULT now(),
"updatedAt" TIMESTAMP NOT NULL DEFAULT now(),
"blockNumber" integer NOT NULL,
"pointProcessed" boolean NOT NULL DEFAULT false,
CONSTRAINT "PK_directHoldProcessingStatus" PRIMARY KEY ("blockNumber"))`
);
await queryRunner.query(
`CREATE INDEX "IDX_directHoldProcessingStatus_1" ON "directHoldProcessingStatus" ("pointProcessed") `
);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`DROP INDEX "public"."IDX_directHoldProcessingStatus_1"`);
await queryRunner.query(`DROP TABLE "directHoldProcessingStatus"`);
}
}
141 changes: 103 additions & 38 deletions src/points/directPoint.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ import {
BlockTokenPriceRepository,
BlockAddressPointRepository,
AddressFirstDepositRepository,
DirectHoldProcessingStatusRepository,
} from "../repositories";
import { TokenMultiplier, TokenService } from "../token/token.service";
import BigNumber from "bignumber.js";
import { hexTransformer } from "../transformers/hex.transformer";
import { ConfigService } from "@nestjs/config";
import { getETHPrice, getTokenPrice, STABLE_COIN_TYPE } from "./baseData.service";
import { BaseDataService, getETHPrice, getTokenPrice, STABLE_COIN_TYPE } from "./baseData.service";
import addressMultipliers from "../config/addressMultipliers";
import waitFor from "src/utils/waitFor";
import { LrtUnitOfWork } from "src/unitOfWork";

export const LOYALTY_BOOSTER_FACTOR: BigNumber = new BigNumber(0.005);
type BlockAddressTvl = {
Expand All @@ -39,6 +41,9 @@ export class DirectPointService extends Worker {
private readonly balanceRepository: BalanceRepository,
private readonly blockRepository: BlockRepository,
private readonly addressFirstDepositRepository: AddressFirstDepositRepository,
private readonly directHoldProcessingStatusRepository: DirectHoldProcessingStatusRepository,
private readonly baseDataService: BaseDataService,
private readonly unitOfWork: LrtUnitOfWork,
private readonly configService: ConfigService
) {
super();
Expand All @@ -54,15 +59,7 @@ export class DirectPointService extends Worker {
}

@Cron("0 2,10,18 * * *")
protected async runProcess(): Promise<void> {
try {
await this.handleHoldPoint();
} catch (error) {
this.logger.error("Failed to calculate hold point", error.stack);
}
}

async handleHoldPoint() {
public async saveBlockNumber(): Promise<void> {
const currentBlockNumber = await this.balanceRepository.getLatesBlockNumber();
if (!currentBlockNumber) {
this.logger.log(`Wait for the next hold point statistical block`);
Expand All @@ -73,23 +70,54 @@ export class DirectPointService extends Worker {
number: currentBlockNumber,
},
});
await waitFor(() => false, 60 * 1000, 60 * 1000);
if (!currentStatisticalBlock) {
this.logger.error(`No block found for block number: ${currentBlockNumber}`);
return;
}
await this.baseDataService.updateTokenPrice(currentStatisticalBlock);
await this.directHoldProcessingStatusRepository.upsertStatus({
blockNumber: currentBlockNumber,
pointProcessed: false,
});
}

public async runProcess(): Promise<void> {
const blockNumbers = await this.directHoldProcessingStatusRepository.getUnprocessedBlockNumber();
if (blockNumbers.length === 0) {
this.logger.log(`No block to process`);
return;
}
for (const blockNumber of blockNumbers) {
try {
await this.handleHoldPoint(blockNumber.blockNumber);
} catch (error) {
this.logger.error("Failed to calculate hold point", error.stack);
}
}
await waitFor(() => false, 1000, 1000);
}

async handleHoldPoint(currentBlockNumber: number) {
const addressHoldPoints: {
address: string;
holdPoint: number;
blockNumber: number;
}[] = [];
const currentStatisticalBlock = await this.blockRepository.getLastBlock({
where: {
number: currentBlockNumber,
},
});

const statisticStartTime = new Date();
const earlyBirdMultiplier = this.getEarlyBirdMultiplier(currentStatisticalBlock.timestamp);
this.logger.log(`Early bird multiplier: ${earlyBirdMultiplier}`);
const tokenPriceMap = await this.getTokenPriceMap(currentStatisticalBlock.number);
const blockTs = currentStatisticalBlock.timestamp.getTime();
const addressTvlMap = await this.getAddressTvlMap(currentStatisticalBlock.number, blockTs, tokenPriceMap);
for (const address of addressTvlMap.keys()) {
const fromBlockAddressPoint = await this.blockAddressPointRepository.getBlockAddressPoint(
currentStatisticalBlock.number,
address
);
if (!!fromBlockAddressPoint && fromBlockAddressPoint.holdPoint > 0) {
this.logger.log(`Address hold point calculated: ${address}`);
continue;
}
const addresses = Array.from(addressTvlMap.keys());
this.logger.log(`Start loop address, address count: ${addresses.length}`);
for (const address of addresses) {
const addressTvl = addressTvlMap.get(address);
const addressMultiplier = this.getAddressMultiplier(address, blockTs);

Expand All @@ -110,8 +138,14 @@ export class DirectPointService extends Worker {
.multipliedBy(groupBooster)
.multipliedBy(addressMultiplier)
.multipliedBy(loyaltyBooster);
await this.updateHoldPoint(currentStatisticalBlock.number, address, newHoldPoint);
addressHoldPoints.push({
address,
holdPoint: newHoldPoint.toNumber(),
blockNumber: currentStatisticalBlock.number,
});
}
this.logger.log(`Finishloop address`);
await this.updateHoldPoint(addressHoldPoints, currentStatisticalBlock.number);
const statisticEndTime = new Date();
const statisticElapsedTime = statisticEndTime.getTime() - statisticStartTime.getTime();
this.logger.log(
Expand All @@ -129,12 +163,13 @@ export class DirectPointService extends Worker {
const addressTvlMap: Map<string, BlockAddressTvl> = new Map();
const addressBufferList = await this.balanceRepository.getAllAddressesByBlock(blockNumber);
this.logger.log(`The address list length: ${addressBufferList.length}`);
for (const addressBuffer of addressBufferList) {
// for (const addressBuffer of addressBufferList) {
for (const addressBuffer of addressBufferList.slice(0, 100)) {
const address = hexTransformer.from(addressBuffer);
const addressTvl = await this.calculateAddressTvl(address, blockNumber, tokenPriceMap, blockTs);
if (addressTvl.tvl.gt(new BigNumber(0))) {
//this.logger.log(`Address ${address}: [tvl: ${addressTvl.tvl}, holdBasePoint: ${addressTvl.holdBasePoint}]`);
}
// if (addressTvl.tvl.gt(new BigNumber(0))) {
// this.logger.log(`Address ${address}: [tvl: ${addressTvl.tvl}, holdBasePoint: ${addressTvl.holdBasePoint}]`);
// }
addressTvlMap.set(address, addressTvl);
}
return addressTvlMap;
Expand Down Expand Up @@ -207,21 +242,51 @@ export class DirectPointService extends Worker {
return tokenPrices;
}

async updateHoldPoint(blockNumber: number, from: string, holdPoint: BigNumber) {
// update point of user
let fromBlockAddressPoint = await this.blockAddressPointRepository.getBlockAddressPoint(blockNumber, from);
if (!fromBlockAddressPoint) {
fromBlockAddressPoint = this.blockAddressPointRepository.createDefaultBlockAddressPoint(blockNumber, from);
async updateHoldPoint(
addressHoldPoints: { address: string; holdPoint: number; blockNumber: number }[],
blockNumber: number
) {
const blockAddressPointsMap = new Map<string, number>();
const blockAddressPoints = [];
const addresses = [];
const newAddressPoints = [];
for (const item of addressHoldPoints) {
blockAddressPointsMap.set(item.address, item.holdPoint);
addresses.push(item.address);
blockAddressPoints.push({
blockNumber: item.blockNumber,
address: item.address,
depositPoint: 0,
refPoint: 0,
holdPoint: item.holdPoint,
});
newAddressPoints.push(this.pointsRepository.createDefaultPoint(item.address, item.holdPoint));
}
let fromAddressPoint = await this.pointsRepository.getPointByAddress(from);
if (!fromAddressPoint) {
fromAddressPoint = this.pointsRepository.createDefaultPoint(from);
const addressPoints = await this.pointsRepository.getPoints();
const addressPointsMap = new Map<string, number>();
for (const item of addressPoints) {
addressPointsMap.set(item.address, item.stakePoint);
}
fromBlockAddressPoint.holdPoint = holdPoint.toNumber();
fromAddressPoint.stakePoint = Number(fromAddressPoint.stakePoint) + holdPoint.toNumber();
this.logger.log(`Address ${from} get hold point: ${holdPoint}`);
// update point of referrer
await this.blockAddressPointRepository.upsertUserAndReferrerPoint(fromBlockAddressPoint, fromAddressPoint);
for (const item of newAddressPoints) {
const holdPoint = addressPointsMap.get(item.address);
if (!holdPoint) {
continue;
}
item.stakePoint = Number(item.stakePoint) + Number(holdPoint);
}
return new Promise<void>((resolve) => {
this.unitOfWork.useTransaction(async () => {
this.logger.log(`Start insert directHolding point into db for block: ${blockNumber}`);
await this.blockAddressPointRepository.addManyIgnoreConflicts(blockAddressPoints);
this.logger.log(`Finish directHolding point for block: ${blockNumber}, length: ${blockAddressPoints.length}`);
await this.pointsRepository.addManyOrUpdate(newAddressPoints, ["stakePoint"], ["address"]);
this.logger.log(`Finish directHolding point for block: ${blockNumber}, length: ${newAddressPoints.length}`);

await this.directHoldProcessingStatusRepository.upsertStatus({ blockNumber, pointProcessed: true });
this.logger.log(`Finish directHolding point statistic for block: ${blockNumber}`);
resolve();
});
});
}

isWithdrawStartPhase(blockTs: number): boolean {
Expand Down
28 changes: 28 additions & 0 deletions src/repositories/directHoldProcessingStatus.repository.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { Injectable } from "@nestjs/common";
import { LrtUnitOfWork } from "../unitOfWork";
import { BaseRepository } from "./base.repository";
import { DirectHoldProcessingStatus } from "../entities";

@Injectable()
export class DirectHoldProcessingStatusRepository extends BaseRepository<DirectHoldProcessingStatus> {
public constructor(unitOfWork: LrtUnitOfWork) {
super(DirectHoldProcessingStatus, unitOfWork);
}

public async upsertStatus(updateData: Partial<DirectHoldProcessingStatus>): Promise<DirectHoldProcessingStatus> {
const entityManager = this.unitOfWork.getTransactionManager();
const result = await entityManager.upsert(DirectHoldProcessingStatus, updateData, ["blockNumber"]);

return result.raw[0];
}

public async getUnprocessedBlockNumber(): Promise<DirectHoldProcessingStatus[]> {
const entityManager = this.unitOfWork.getTransactionManager();
const result = await entityManager.find(DirectHoldProcessingStatus, {
where: [{ pointProcessed: false }],
order: { blockNumber: "ASC" },
});

return result;
}
}
1 change: 1 addition & 0 deletions src/repositories/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ export * from "./seasonTotalPoint.repository";
export * from "./invites.repository";
export * from "./otherPoint.repository";
export * from "./supplementPoint.repository";
export * from "./directHoldProcessingStatus.repository";
Loading

0 comments on commit 9b7d3f4

Please sign in to comment.