Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[subgraph] feat: use balance deltas over RPC calls #1948

Merged
merged 17 commits into from
Sep 19, 2024

Conversation

kasparkallas
Copy link
Contributor

@kasparkallas kasparkallas commented May 30, 2024

Why?

Out protocol subgraphs do massive amounts of RPC calls to fetch the balances, putting a lot of strain on the RPC and subgraph indexing speed.

There should exist a low-hanging fruit to remove a lot of these calls.

How?

Pass balanceDelta to updateATSStreamedAndBalanceUntilUpdatedAt and use it if it's an account not a receiver of an IDA with units, GDA with units or an adjustment flow. EDIT: Only use it for accounts without any IDA/GDA/CFA interactions.

Track balanceLastUpdatedFromRpcBlocknumber to know the last time the balance was updated from an RPC.

2 rules:

  • If balance in a block was updated from an RPC, no need to update the balance again in the block. The RPC returns the final balance of a block.
  • If balance is updated a second time in a block (without an RPC) then do an RPC call.

Other

There are still gotchas to figure out!

  • An account should be marked as "dirty" when they're receiving an adjustment flow. EDIT: Added totalPoolAdmins and account for it when calculating isLiquidationEstimationOptimistic
  • There are situations where multiple events are emitted for the same thing and we should avoid double accounting. (A correlation ID would be helpful.) Done: use RPC when balance updated multiple times.
  • We should save 'lastRpcCall' on the ATS entity so we would never do more than one RPC call for an account in the same TX. Done.
  • When dealing with balanceDelta and flows, no need to pass "streamed amount" as that's accounted for separately. We only need pass the buffer delta.

@kasparkallas kasparkallas requested a review from d10r May 30, 2024 09:01
@kasparkallas kasparkallas self-assigned this May 30, 2024
@kasparkallas kasparkallas requested a review from hellwolf as a code owner May 30, 2024 09:01
@kasparkallas kasparkallas marked this pull request as draft May 30, 2024 09:32
- be smart when to query from rpc again
- avoid double rpc calling in the same block
@kasparkallas
Copy link
Contributor Author

Subgraph deployed to Polygon here: https://api.goldsky.com/api/public/project_clsnd6xsoma5j012qepvucfpp/subgraphs/protocol-v1-polygon-mainnet/1.7.2/gn

I will compare to the previous version to see how much it is off.

@kasparkallas
Copy link
Contributor Author

kasparkallas commented Jun 10, 2024

I haven't ran the numbers yet.

This is a quick script to efficiently query all the pages from subgraph, btw:

import { request } from "graphql-request@6";

// https://thegraph.com/docs/en/querying/graphql-api/#example-using-first-and-id_ge
export async function main(subgraphUrl: string, query: string) {
  const allItems = [];

  let lastID = "";
  while (true) {
    const response = await request<{
      items: Array<{ id: string }>
    }>(subgraphUrl, query, { id: lastID });

    console.log("Item count in response:", response.items.length);
    if (response.items.length === 0) {
      break;
    }

    if (response.items === undefined) {
      throw response;
    }

    allItems.push(response.items);
    lastID = response.items[response.items.length - 1].id;
    console.log("New last ID:", lastID);
  }

  return { result: allItems }
}

The query needs an alias items for the collection being paginated and the query needs to contain id_ge with the $lastID variable.

Example:

query($lastID: ID!) {
  items: accountTokenSnapshots(first: 1000, where: { id_gt: $lastID }) {
    id
    updatedAtTimestamp
    updatedAtBlockNumber
    balanceUntilUpdatedAt
    balanceLastUpdatedFromRpcBlocknumber
  }
}

@kasparkallas
Copy link
Contributor Author

Definitely not adding up currently.

@kasparkallas kasparkallas changed the title [wip][subgraph] feat: use balance deltas over rpc calls [subgraph] feat: use balance deltas over rpc calls Sep 13, 2024
@kasparkallas kasparkallas marked this pull request as ready for review September 13, 2024 07:51
@kasparkallas kasparkallas changed the title [subgraph] feat: use balance deltas over rpc calls [subgraph] feat: use balance deltas over RPC calls Sep 13, 2024
@kasparkallas
Copy link
Contributor Author

I made the solution only skip the RPC call for the most certain scenarios. Now pending a new subgraph indexing and balance comparison with an existing subgraph.

@kasparkallas
Copy link
Contributor Author

kasparkallas commented Sep 18, 2024

Confirmed, the balances are correct.

The script I used to check (by comparing the outputs), for historical purposes:

import { request } from "graphql-request";
import { groupBy } from "lodash";

type SubgraphResponse = {
  items: Array<{ 
    id: string;
    balanceUntilUpdatedAt: string;
    token: {
      id: string;
      symbol: string;
    }
   }>
}

// https://thegraph.com/docs/en/querying/graphql-api/#example-using-first-and-id_ge
export async function main(subgraphUrl: string, query: string) {
  let allItems: SubgraphResponse["items"] = [];

  let lastID = "";
  while (true) {
    const response = await request<SubgraphResponse>(subgraphUrl, query, { lastID: lastID });

    console.log("Item count in response:", response.items.length);
    if (response.items.length === 0) {
      break;
    }

    if (response.items === undefined) {
      throw response;
    }
    
    allItems = allItems.concat(response.items);
    lastID = response.items[response.items.length - 1].id;
    console.log("New last ID:", lastID);
  }

  return { items: allItems }
}

const newResultPath = "./newResult.txt";
const newResultFile = Bun.file(newResultPath);

let newResult: SubgraphResponse["items"] = null!;
if (await newResultFile.exists()) {
  newResult = await newResultFile.json();
} else {
  newResult = await main("https://api.goldsky.com/api/public/project_clsnd6xsoma5j012qepvucfpp/subgraphs/protocol-test-optimism-mainnet/2.1.0/gn", `query($lastID: ID!) {
    items: accountTokenSnapshots(block: { number: 125503380 }, first: 1000, where: { id_gt: $lastID }) {
      id
      balanceUntilUpdatedAt
      token {
        id
        symbol
      }
    }
  }`).then(x => x.items);
  await Bun.write(newResultPath, JSON.stringify(newResult, undefined, 2));
}

const oldResultPath = "./oldResult.txt";
const oldResultFile = Bun.file(oldResultPath);

let oldResult: SubgraphResponse["items"] = null!;
if (await oldResultFile.exists()) {
  oldResult = await oldResultFile.json();
} else {
  oldResult = await main("https://api.goldsky.com/api/public/project_clsnd6xsoma5j012qepvucfpp/subgraphs/protocol-v1-optimism-mainnet/prod/gn", `query($lastID: ID!) {
    items: accountTokenSnapshots(block: { number: 125503380 }, first: 1000, where: { id_gt: $lastID }) {
      id
      balanceUntilUpdatedAt
      token {
        id
        symbol
      }
    }
  }`).then(x => x.items);
  await Bun.write(oldResultPath, JSON.stringify(oldResult, undefined, 2));
}

const newGrouped = groupBy(newResult, x => x.token.id);
const oldGrouped = groupBy(oldResult, x => x.token.id);

const newGroupMap = new Map<string, string>();
for (const [key, group] of Object.entries(newGrouped)) {
  const total = group.reduce((acc, curr) => { return acc + BigInt(curr.balanceUntilUpdatedAt) }, 0n)
  newGroupMap.set(key, total.toString());
}

const oldGroupMap = new Map<string, string>();
for (const [key, group] of Object.entries(oldGrouped)) {
  const total = group.reduce((acc, curr) => { return acc + BigInt(curr.balanceUntilUpdatedAt) }, 0n)
  oldGroupMap.set(key, total.toString());
}

const newGroupMapPath = "./newGroupMap.txt";
const oldGroupMapPath = "./oldGroupMap.txt";

await Bun.write(newGroupMapPath, JSON.stringify([...newGroupMap], undefined, 2));
await Bun.write(oldGroupMapPath, JSON.stringify([...oldGroupMap], undefined, 2));

@kasparkallas
Copy link
Contributor Author

I'll follow up with a test on another network as well, but in general, safe to review and merge.

@@ -497,6 +506,7 @@ function _createFlowDistributionUpdatedEntity(
ev.poolDistributor = poolDistributorId;
ev.totalUnits = totalUnits;
ev.userData = event.params.userData;
// TODO: Why not have data about buffer here?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, a stray TODO. I don't quite remember the reasoning anymore...

@@ -938,7 +939,9 @@ export function updateAggregateDistributionAgreementData(

accountTokenSnapshot.isLiquidationEstimateOptimistic =
accountTokenSnapshot.totalSubscriptionsWithUnits > 0 ||
accountTokenSnapshot.totalMembershipsWithUnits > 0;
accountTokenSnapshot.totalMembershipsWithUnits > 0 ||
accountTokenSnapshot.totalPoolAdmins > 0;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should comment that this is because of the adjustment flow.

packages/subgraph/schema.graphql Outdated Show resolved Hide resolved
packages/subgraph/schema.graphql Outdated Show resolved Hide resolved
@d10r
Copy link
Collaborator

d10r commented Sep 18, 2024

please also make a new release, so we can proceed straight to deployment

@kasparkallas kasparkallas merged commit 2898aaa into dev Sep 19, 2024
14 checks passed
@kasparkallas kasparkallas deleted the optimize-subgraph-indexing branch September 19, 2024 09:58
Copy link

XKCD Comic Relif

Link: https://xkcd.com/1948
https://xkcd.com/1948

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants