-
Notifications
You must be signed in to change notification settings - Fork 242
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[subgraph] feat: use balance deltas over RPC calls #1948
Conversation
- be smart when to query from rpc again - avoid double rpc calling in the same block
Subgraph deployed to Polygon here: https://api.goldsky.com/api/public/project_clsnd6xsoma5j012qepvucfpp/subgraphs/protocol-v1-polygon-mainnet/1.7.2/gn I will compare to the previous version to see how much it is off. |
I haven't ran the numbers yet. This is a quick script to efficiently query all the pages from subgraph, btw: import { request } from "graphql-request@6";
// https://thegraph.com/docs/en/querying/graphql-api/#example-using-first-and-id_ge
export async function main(subgraphUrl: string, query: string) {
const allItems = [];
let lastID = "";
while (true) {
const response = await request<{
items: Array<{ id: string }>
}>(subgraphUrl, query, { id: lastID });
console.log("Item count in response:", response.items.length);
if (response.items.length === 0) {
break;
}
if (response.items === undefined) {
throw response;
}
allItems.push(response.items);
lastID = response.items[response.items.length - 1].id;
console.log("New last ID:", lastID);
}
return { result: allItems }
} The query needs an alias Example: query($lastID: ID!) {
items: accountTokenSnapshots(first: 1000, where: { id_gt: $lastID }) {
id
updatedAtTimestamp
updatedAtBlockNumber
balanceUntilUpdatedAt
balanceLastUpdatedFromRpcBlocknumber
}
} |
Definitely not adding up currently. |
I made the solution only skip the RPC call for the most certain scenarios. Now pending a new subgraph indexing and balance comparison with an existing subgraph. |
Confirmed, the balances are correct. The script I used to check (by comparing the outputs), for historical purposes: import { request } from "graphql-request";
import { groupBy } from "lodash";
type SubgraphResponse = {
items: Array<{
id: string;
balanceUntilUpdatedAt: string;
token: {
id: string;
symbol: string;
}
}>
}
// https://thegraph.com/docs/en/querying/graphql-api/#example-using-first-and-id_ge
export async function main(subgraphUrl: string, query: string) {
let allItems: SubgraphResponse["items"] = [];
let lastID = "";
while (true) {
const response = await request<SubgraphResponse>(subgraphUrl, query, { lastID: lastID });
console.log("Item count in response:", response.items.length);
if (response.items.length === 0) {
break;
}
if (response.items === undefined) {
throw response;
}
allItems = allItems.concat(response.items);
lastID = response.items[response.items.length - 1].id;
console.log("New last ID:", lastID);
}
return { items: allItems }
}
const newResultPath = "./newResult.txt";
const newResultFile = Bun.file(newResultPath);
let newResult: SubgraphResponse["items"] = null!;
if (await newResultFile.exists()) {
newResult = await newResultFile.json();
} else {
newResult = await main("https://api.goldsky.com/api/public/project_clsnd6xsoma5j012qepvucfpp/subgraphs/protocol-test-optimism-mainnet/2.1.0/gn", `query($lastID: ID!) {
items: accountTokenSnapshots(block: { number: 125503380 }, first: 1000, where: { id_gt: $lastID }) {
id
balanceUntilUpdatedAt
token {
id
symbol
}
}
}`).then(x => x.items);
await Bun.write(newResultPath, JSON.stringify(newResult, undefined, 2));
}
const oldResultPath = "./oldResult.txt";
const oldResultFile = Bun.file(oldResultPath);
let oldResult: SubgraphResponse["items"] = null!;
if (await oldResultFile.exists()) {
oldResult = await oldResultFile.json();
} else {
oldResult = await main("https://api.goldsky.com/api/public/project_clsnd6xsoma5j012qepvucfpp/subgraphs/protocol-v1-optimism-mainnet/prod/gn", `query($lastID: ID!) {
items: accountTokenSnapshots(block: { number: 125503380 }, first: 1000, where: { id_gt: $lastID }) {
id
balanceUntilUpdatedAt
token {
id
symbol
}
}
}`).then(x => x.items);
await Bun.write(oldResultPath, JSON.stringify(oldResult, undefined, 2));
}
const newGrouped = groupBy(newResult, x => x.token.id);
const oldGrouped = groupBy(oldResult, x => x.token.id);
const newGroupMap = new Map<string, string>();
for (const [key, group] of Object.entries(newGrouped)) {
const total = group.reduce((acc, curr) => { return acc + BigInt(curr.balanceUntilUpdatedAt) }, 0n)
newGroupMap.set(key, total.toString());
}
const oldGroupMap = new Map<string, string>();
for (const [key, group] of Object.entries(oldGrouped)) {
const total = group.reduce((acc, curr) => { return acc + BigInt(curr.balanceUntilUpdatedAt) }, 0n)
oldGroupMap.set(key, total.toString());
}
const newGroupMapPath = "./newGroupMap.txt";
const oldGroupMapPath = "./oldGroupMap.txt";
await Bun.write(newGroupMapPath, JSON.stringify([...newGroupMap], undefined, 2));
await Bun.write(oldGroupMapPath, JSON.stringify([...oldGroupMap], undefined, 2)); |
I'll follow up with a test on another network as well, but in general, safe to review and merge. |
@@ -497,6 +506,7 @@ function _createFlowDistributionUpdatedEntity( | |||
ev.poolDistributor = poolDistributorId; | |||
ev.totalUnits = totalUnits; | |||
ev.userData = event.params.userData; | |||
// TODO: Why not have data about buffer here? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, a stray TODO. I don't quite remember the reasoning anymore...
@@ -938,7 +939,9 @@ export function updateAggregateDistributionAgreementData( | |||
|
|||
accountTokenSnapshot.isLiquidationEstimateOptimistic = | |||
accountTokenSnapshot.totalSubscriptionsWithUnits > 0 || | |||
accountTokenSnapshot.totalMembershipsWithUnits > 0; | |||
accountTokenSnapshot.totalMembershipsWithUnits > 0 || | |||
accountTokenSnapshot.totalPoolAdmins > 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should comment that this is because of the adjustment flow.
please also make a new release, so we can proceed straight to deployment |
XKCD Comic RelifLink: https://xkcd.com/1948 |
Why?
Out protocol subgraphs do massive amounts of RPC calls to fetch the balances, putting a lot of strain on the RPC and subgraph indexing speed.
There should exist a low-hanging fruit to remove a lot of these calls.
How?
Pass
balanceDelta
toupdateATSStreamedAndBalanceUntilUpdatedAt
and use it if it's an account not a receiver of an IDA with units, GDA with units or an adjustment flow. EDIT: Only use it for accounts without any IDA/GDA/CFA interactions.Track
balanceLastUpdatedFromRpcBlocknumber
to know the last time the balance was updated from an RPC.2 rules:
Other
There are still gotchas to figure out!
An account should be marked as "dirty" when they're receiving an adjustment flow.EDIT: AddedtotalPoolAdmins
and account for it when calculatingisLiquidationEstimationOptimistic
There are situations where multiple events are emitted for the same thing and we should avoid double accounting. (A correlation ID would be helpful.)Done: use RPC when balance updated multiple times.We should save 'lastRpcCall' on the ATS entity so we would never do more than one RPC call for an account in the same TX.Done.balanceDelta
and flows, no need to pass "streamed amount" as that's accounted for separately. We only need pass the buffer delta.