Skip to content

Commit

Permalink
Support subgraph pagination limit
Browse files Browse the repository at this point in the history
Enabled db sync on production
  • Loading branch information
aminlatifi committed Aug 31, 2023
1 parent 53279c3 commit a11ab0d
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 5 deletions.
2 changes: 2 additions & 0 deletions config/example.env
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ DATABASE_USER=postgres
DATABASE_PASSWORD=postgres
DATABASE_NAME=balance-aggregator

SUBGRAPH_PAGINATION_LIMIT=5000

BLOCKCHAIN_CONFIG_FILE_NAME=test.yaml
45 changes: 41 additions & 4 deletions src/modules/data-fetcher/data-fetch-agent.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ export class DataFetchAgentService {
this.dataFetchStateService,
this.graphqlClientAdapterService,
this.tokenBalanceService,
this.loadBlockChainConfigService,
);
return fetchAgent.run();
}),
Expand All @@ -54,6 +55,7 @@ class FetchAgent {
readonly dataFetchStateService: DataFetchStateService,
readonly graphqlClientAdapterService: GraphqlClientAdapterService,
readonly tokenBalanceService: TokenBalanceService,
readonly loadBlockChainConfigService: LoadBlockchainConfigService,
) {}

async run() {
Expand All @@ -76,6 +78,9 @@ class FetchAgent {
return;
}

const subgraphPaginationLimit =
await this.loadBlockChainConfigService.getSubgraphPaginationLimit();

this.logger.debug('Fetching id ' + this.fetchId);

try {
Expand All @@ -86,7 +91,7 @@ class FetchAgent {
const { lastUpdateTime, paginationSkip } = fetchState;
const { subgraphUrl, contractAddress } = this.fetchConfig;
let latestBalanceChange: SubgraphBalanceChangeEntity;
const take = 100;
const take = 500;
let skip = paginationSkip;
let result: {
balanceChanges: SubgraphBalanceChangeEntity[];
Expand All @@ -106,9 +111,33 @@ Fetch id ${this.fetchId} - subgraph url ${subgraphUrl} - last update time ${last
take,
});
const { balanceChanges } = result;
let reachedPaginationLimit = false;
if (balanceChanges.length > 0) {
latestBalanceChange = balanceChanges[balanceChanges.length - 1];
reachedPaginationLimit =
skip + balanceChanges.length >= subgraphPaginationLimit;
if (reachedPaginationLimit) {
this.logger.debug(
`Reached pagination limit. Fetch id ${this.fetchId}, last update time ${latestBalanceChange.time}\n`,
`Next time start from the latest balance change time minus 1 second\n`,
`Popping all balance changes with time ${latestBalanceChange.time}`,
);

do {
this.logger.debug(
`Popping balance change with time ${
balanceChanges[balanceChanges.length - 1].time
}`,
);
balanceChanges.pop();
} while (
balanceChanges.length > 0 &&
balanceChanges[balanceChanges.length - 1].time ===
latestBalanceChange.time
);
}

// TODO: save token balance and updating fetch state should be in a transaction
await this.tokenBalanceService.saveTokenBalanceFromSubgraphMany(
balanceChanges,
this.fetchConfig.network,
Expand All @@ -124,17 +153,25 @@ Fetch id ${this.fetchId} - subgraph url ${subgraphUrl} - last update time ${last
);
}

if (balanceChanges.length < take) {
if (balanceChanges.length < take || reachedPaginationLimit) {
// Update last update time and block number and reset pagination skip
if (latestBalanceChange) {
this.logger.debug(
`Fetching completed. Fetch id ${this.fetchId}, last update time ${latestBalanceChange.time}`,
);
let lastUpdateTime = +latestBalanceChange.time;
let lastBlockNumber = +latestBalanceChange.block;

if (reachedPaginationLimit) {
lastUpdateTime = lastUpdateTime > 0 ? lastUpdateTime - 1 : 0;
lastBlockNumber = lastBlockNumber > 0 ? lastBlockNumber - 1 : 0;
}

await this.dataFetchStateService.updateLastUpdateTimeAndBlockNumber(
this.fetchId,
{
lastUpdateTime: +latestBalanceChange.time,
lastBlockNumber: +latestBalanceChange.block,
lastUpdateTime,
lastBlockNumber,
},
);
} else {
Expand Down
6 changes: 6 additions & 0 deletions src/modules/data-fetcher/load-blockchain-config.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,10 @@ export class LoadBlockchainConfigService {

return blockChainConfig;
}

async getSubgraphPaginationLimit(): Promise<number> {
return Number(
this.configService.get<number>('SUBGRAPH_PAGINATION_LIMIT') || 4000,
);
}
}
2 changes: 1 addition & 1 deletion src/modules/database.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import { ConnectionOptions, DataSource } from 'typeorm';
password: configService.get<string>('DATABASE_PASSWORD'),
database: configService.get<string>('DATABASE_NAME'),
entities: [TokenBalance, TokenBalanceUpdate, DataFetchState],
synchronize: process.env.NODE_ENV !== 'production',
synchronize: true, // process.env.NODE_ENV !== 'production', TODO: change to false in production
migrations: [CreateTokenBalanceInsertTrigger1684090897120],
ssl:
configService.get<string>('DATABASE_SSL') !== 'false'
Expand Down

0 comments on commit a11ab0d

Please sign in to comment.