From 8e185347d88dac00f594ae27af4a635c8026e957 Mon Sep 17 00:00:00 2001 From: belopash Date: Fri, 20 Dec 2024 21:15:53 +0300 Subject: [PATCH] use ReadableStream --- .../portal-api_2024-12-16-21-16.json | 10 - common/config/rush/pnpm-lock.yaml | 44 ++-- test/erc20-transfers/src/processor.ts | 22 +- util/portal-client/src/client.ts | 211 ++++++++++-------- 4 files changed, 154 insertions(+), 133 deletions(-) delete mode 100644 common/changes/@subsquid/http-client/portal-api_2024-12-16-21-16.json diff --git a/common/changes/@subsquid/http-client/portal-api_2024-12-16-21-16.json b/common/changes/@subsquid/http-client/portal-api_2024-12-16-21-16.json deleted file mode 100644 index ff35dda8c..000000000 --- a/common/changes/@subsquid/http-client/portal-api_2024-12-16-21-16.json +++ /dev/null @@ -1,10 +0,0 @@ -{ - "changes": [ - { - "packageName": "@subsquid/http-client", - "comment": "add `retry-after` header support", - "type": "minor" - } - ], - "packageName": "@subsquid/http-client" -} \ No newline at end of file diff --git a/common/config/rush/pnpm-lock.yaml b/common/config/rush/pnpm-lock.yaml index 901ed1a9b..9614d4bd8 100644 --- a/common/config/rush/pnpm-lock.yaml +++ b/common/config/rush/pnpm-lock.yaml @@ -122,6 +122,9 @@ dependencies: '@rush-temp/ops-xcm-typegen': specifier: file:./projects/ops-xcm-typegen.tgz version: file:projects/ops-xcm-typegen.tgz + '@rush-temp/portal-client': + specifier: file:./projects/portal-client.tgz + version: file:projects/portal-client.tgz '@rush-temp/raw-archive-validator': specifier: file:./projects/raw-archive-validator.tgz version: file:projects/raw-archive-validator.tgz @@ -6991,7 +6994,7 @@ packages: dev: false file:projects/data-test.tgz: - resolution: {integrity: sha512-vfocRZTAM/R/+T+dR+5OwAzOim5k+JWu+uWq5NqTINBuin6gQ4H6ufbTXjXeAvD3OvAyaUqS2TPYENntSFAHdA==, tarball: file:projects/data-test.tgz} + resolution: {integrity: sha512-s+hwonZVmCEShGErXBavkrZKxMnQxPGBIsraWM+8LK74aGUAD1nYqshUh9jh05RlwavSWEilNTo+BbI81fFriw==, tarball: file:projects/data-test.tgz} name: '@rush-temp/data-test' version: 0.0.0 dependencies: @@ -7100,7 +7103,7 @@ packages: dev: false file:projects/evm-processor.tgz: - resolution: {integrity: sha512-JEYc/4KCDHotB8lMj+SjVPzNPOPpzsWXWwreQy+hAjZQes/fWnMg4FGQ71NFQOYwZ/zL0k+imKUpvvcTQ69ALw==, tarball: file:projects/evm-processor.tgz} + resolution: {integrity: sha512-ZgPmppneDuwcDlXNnE+VhlGQxlqHoG/WtfYidVuwqgUKbX4jEKYKXJE3hzkBlNegrmIOvOnF3FStRgcuWucnvQ==, tarball: file:projects/evm-processor.tgz} name: '@rush-temp/evm-processor' version: 0.0.0 dependencies: @@ -7109,7 +7112,7 @@ packages: dev: false file:projects/evm-typegen.tgz: - resolution: {integrity: sha512-zze/6ha5YyYINCNglygQHh63jV8/nsVqPbKpZeeznnnEldNG2n6/WMajR0vdAnGDRTMR+nmP9/TveOPbDTX+mg==, tarball: file:projects/evm-typegen.tgz} + resolution: {integrity: sha512-3DiW6rLdLmAUqUB+P1iYHfRHi9123lAVabeIrJB0klQj16q70mvel+mrY9hCk1nUvxiEVEnPCueay3Ldf2ZQTg==, tarball: file:projects/evm-typegen.tgz} name: '@rush-temp/evm-typegen' version: 0.0.0 dependencies: @@ -7140,7 +7143,7 @@ packages: dev: false file:projects/fuel-data.tgz: - resolution: {integrity: sha512-6XW1Dvmzit570V/bEwNjiD5jJ9ZHbvJ3gyNoS6HE2qoR+C2/NPqPjMcev6xhOd/XS2/raaw6y8c6ywT3lTWTUw==, tarball: file:projects/fuel-data.tgz} + resolution: {integrity: sha512-LZwxHIPqKJw/590kj/j7tba2SzlHejChA/x2JTnSD9HW1ijOrrmit8TYH9zPf4mcW4gZTM9hcZFFtjIOfPLsCw==, tarball: file:projects/fuel-data.tgz} name: '@rush-temp/fuel-data' version: 0.0.0 dependencies: @@ -7149,7 +7152,7 @@ packages: dev: false file:projects/fuel-dump.tgz: - resolution: {integrity: sha512-P93UXjAjSIwTD/ZyemYajxts+j65qTByNGxAApyDXZOWywHdt0K3EPnDyLDz4/gm8LG6OtalcJfOhyPU/iKk2A==, tarball: file:projects/fuel-dump.tgz} + resolution: {integrity: sha512-ImKKGnItSKiCytmEqLaz148rhsH0b2kxYWkAMAR+JN3s89/rVDLKcFrqOh6CJHinIeZ1/zcT4ZoFzVXyYIS9mQ==, tarball: file:projects/fuel-dump.tgz} name: '@rush-temp/fuel-dump' version: 0.0.0 dependencies: @@ -7216,7 +7219,7 @@ packages: dev: false file:projects/fuel-stream.tgz: - resolution: {integrity: sha512-iEuYdfArMof7F87gKvpnuGx4j/PzsgkMrruS29W2UTx+7FhwkUnujQxqLQx2E2++SuKSb18Ga+eI+GP0lsl07g==, tarball: file:projects/fuel-stream.tgz} + resolution: {integrity: sha512-6J2VBe48XWIqSZ3M8lZtBrsHZr+p8zRTofEkolzhhT4kighSSCG/HVPp3tp1ZXc5rPGpQGT/JZeCArOsGo3EhQ==, tarball: file:projects/fuel-stream.tgz} name: '@rush-temp/fuel-stream' version: 0.0.0 dependencies: @@ -7225,7 +7228,7 @@ packages: dev: false file:projects/gql-test-client.tgz(graphql@15.8.0): - resolution: {integrity: sha512-JGE+gV8EgQ8u90IfJW06yab+oC1gw+HsJdwvSj6LdzY6dbtlIDXNRFeJMV988eLN8IxLSdDhd98h4YYt6n3suw==, tarball: file:projects/gql-test-client.tgz} + resolution: {integrity: sha512-bivyHNQZ2H4YrNdCo4dwEc53cIe+iXpU2UF6vMjBwbgHqiIbrhGpMbtzgKoZLdNAkbqchh0E543pkDQy318iow==, tarball: file:projects/gql-test-client.tgz} id: file:projects/gql-test-client.tgz name: '@rush-temp/gql-test-client' version: 0.0.0 @@ -7393,6 +7396,15 @@ packages: typescript: 5.5.4 dev: false + file:projects/portal-client.tgz: + resolution: {integrity: sha512-pbgQKxXzHf+VnuzfaxJ6HPz0cHEdqcufxV+ztN/6aTP4tDaAaHGnS7iKdUZNmj/ZOxhkeMRPrxdjsJQQyuy+Iw==, tarball: file:projects/portal-client.tgz} + name: '@rush-temp/portal-client' + version: 0.0.0 + dependencies: + '@types/node': 18.19.0 + typescript: 5.5.4 + dev: false + file:projects/raw-archive-validator.tgz: resolution: {integrity: sha512-ySuPCdXOui/7IYLbvb6mbtiZaYR6jo96wxUe+TcVy9RsNpXKCjKMESmmaIbL2rHFty95SClZrwKkchoKdNScWw==, tarball: file:projects/raw-archive-validator.tgz} name: '@rush-temp/raw-archive-validator' @@ -7404,7 +7416,7 @@ packages: dev: false file:projects/rpc-client.tgz: - resolution: {integrity: sha512-B54boDboO5+HelVasKnnBZs/VahkkLvB9ETLN4nlILcTBwv2SZP0UbC4VYNaSRm5n1PtYb52954pqKlNkEecmA==, tarball: file:projects/rpc-client.tgz} + resolution: {integrity: sha512-T49gc/C/cmRUB956khwoKu41Izdk4BPYLlFBZAWkPn0xDje0kbmmjb76WyueHTu7mrjvSdjVNqrIkg/RYxGV8Q==, tarball: file:projects/rpc-client.tgz} name: '@rush-temp/rpc-client' version: 0.0.0 dependencies: @@ -7580,7 +7592,7 @@ packages: dev: false file:projects/solana-stream.tgz: - resolution: {integrity: sha512-xt4F3+TWZqYkHFwWyxO9FFeCi1ZwfDjgt3AkM9qPwhnxBfyawaGdEeTUwfJtPJsNrxaroMLXmLRFmKp65iXO8w==, tarball: file:projects/solana-stream.tgz} + resolution: {integrity: sha512-krZzJP5QBey2MWE6BMGBn4fRnV4P1weoVts3iAyfNNYs+ukKIZLn76qFrzJ8yIpYlZUF4tq49CK0v0jnZ+XHEA==, tarball: file:projects/solana-stream.tgz} name: '@rush-temp/solana-stream' version: 0.0.0 dependencies: @@ -7590,7 +7602,7 @@ packages: dev: false file:projects/solana-typegen.tgz: - resolution: {integrity: sha512-Qva321wiPxpOzXBUf6Qj5vlQ2eZILvwFdBCfchCmxQY7EqzLILY+FjMfNcj/VzaA3tTnfp8adYgO1YfoKHdCMw==, tarball: file:projects/solana-typegen.tgz} + resolution: {integrity: sha512-P2R//97dE6hHXv65zV5e3SuD+ArfjY1jxt9Ct2j5ckibvec646B47UHZsBRTlu6u9tVJSJIFth4OvWa7CogkYw==, tarball: file:projects/solana-typegen.tgz} name: '@rush-temp/solana-typegen' version: 0.0.0 dependencies: @@ -7687,7 +7699,7 @@ packages: dev: false file:projects/starknet-stream.tgz: - resolution: {integrity: sha512-WSp/9Dd2y5+GeN0F01Az1HS5BtnOAjlDQ6o1qwlUptfoYMB6DBf4udXGAA7i2Sj3AijE+t3pAxoRXkX82Y3GmA==, tarball: file:projects/starknet-stream.tgz} + resolution: {integrity: sha512-xOStr+km+Cj7Q9yY8YNp0ledJQhcYDzparQYEVgg55Na1fPHHJls2qvHiGb5mqofRjVfQlYsmWkD1QeyXpojUg==, tarball: file:projects/starknet-stream.tgz} name: '@rush-temp/starknet-stream' version: 0.0.0 dependencies: @@ -7760,7 +7772,7 @@ packages: dev: false file:projects/substrate-processor.tgz: - resolution: {integrity: sha512-bv9yCssIgRcDJfSIiJqw97p9t03FHIBMk5Qvfs3cQuN9MOOCB4retTUPhzT+yhgtD1czIriXIcAgmGBgcEeOHw==, tarball: file:projects/substrate-processor.tgz} + resolution: {integrity: sha512-i4cnuhfZZGKSH8as/BE4usnpoRdUwlB+LS6PONw4c8zpX5pl1yATKfDZiBOwUudAuZyFv514fXc0ejOx5y3/zw==, tarball: file:projects/substrate-processor.tgz} name: '@rush-temp/substrate-processor' version: 0.0.0 dependencies: @@ -7782,7 +7794,7 @@ packages: dev: false file:projects/substrate-typegen.tgz: - resolution: {integrity: sha512-HDrlRIPga2gKlNtr8I+7jJPb4UMygW8+7BEIBJsIOVjpk/jPBQX5EjWOg6bmuiL6PtODQTN4gDutgrmptMOLZg==, tarball: file:projects/substrate-typegen.tgz} + resolution: {integrity: sha512-hYFLl7+BgkKy15M2J31XXmpXDo67jmhmh1/j/C0uJbDPwiTKkLleFaDM9bPBgynB+WF3AmGv9XUkvCGvLawzbQ==, tarball: file:projects/substrate-typegen.tgz} name: '@rush-temp/substrate-typegen' version: 0.0.0 dependencies: @@ -7792,7 +7804,7 @@ packages: dev: false file:projects/tron-data.tgz: - resolution: {integrity: sha512-TSoknC0M1/aV4Jg070uz8i/V4YBRBcXkZzrSZwc1SaTQUOYVtQh8UpBdQyJLcRlVTi+K6SZtUM99wsE/VJgB3Q==, tarball: file:projects/tron-data.tgz} + resolution: {integrity: sha512-AnZ2mjIhDmxqQ6je661F2AzDRvUJHoZTkdqa0lWo5QScjjvvdEOGv42VAGXqPuYe4n4gY4Dj+K/eFzfeDCAQ+Q==, tarball: file:projects/tron-data.tgz} name: '@rush-temp/tron-data' version: 0.0.0 dependencies: @@ -7840,7 +7852,7 @@ packages: dev: false file:projects/tron-processor.tgz: - resolution: {integrity: sha512-jxOST5hrGQAEkbehlZuOZPDQ47SX+nQ1u0vp0UAmaj5XcgxoRm3u9oX807X2YOWxAfgrjX7fBWeIassnFewLVg==, tarball: file:projects/tron-processor.tgz} + resolution: {integrity: sha512-hyWAMCO/NbpJPmnw4Jo+CpML5m6wcS1j3gXe+IL7nZYUAjZzvNEa4NCtsaKaVRSNR1qr8kNz1++56Kt2VvQQ1A==, tarball: file:projects/tron-processor.tgz} name: '@rush-temp/tron-processor' version: 0.0.0 dependencies: @@ -7995,7 +8007,7 @@ packages: dev: false file:projects/util-internal-archive-client.tgz: - resolution: {integrity: sha512-cq/2eIibQ3DuQCV1bCylf0IPexvdRTf27x7BhOjR4lkGMS6DeqlF0qgVfct9WY4TCHljIey09EMnRwTJzn+7uQ==, tarball: file:projects/util-internal-archive-client.tgz} + resolution: {integrity: sha512-aTWT++wNI2gux8EwA+nSZgiaU6tM7Rh+DVnumLCpJF3z44ujTs0ipFTbnktt0pVOtkbN7G7oyTrZUXUMQgjthA==, tarball: file:projects/util-internal-archive-client.tgz} name: '@rush-temp/util-internal-archive-client' version: 0.0.0 dependencies: diff --git a/test/erc20-transfers/src/processor.ts b/test/erc20-transfers/src/processor.ts index 9533af597..fadd48850 100644 --- a/test/erc20-transfers/src/processor.ts +++ b/test/erc20-transfers/src/processor.ts @@ -10,12 +10,12 @@ const CONTRACT = '0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48'.toLowerCase() const processor = new EvmBatchProcessor() .setPortal({ url: 'https://portal.sqd.dev/datasets/ethereum-mainnet', - bufferThreshold: 100 * 1024 * 1024, + bufferThreshold: 10 * 1024 * 1024, newBlockTimeout: 5000, }) .setRpcEndpoint('https://rpc.ankr.com/eth') .setFinalityConfirmation(500) - .setBlockRange({from: 20801368}) + .setBlockRange({from: 0}) .setFields({ block: {size: true}, log: {transactionHash: true}, @@ -32,20 +32,20 @@ processor.run(new TypeormDatabase({supportHotBlocks: true}), async ctx => { for (let block of ctx.blocks) { for (let log of block.logs) { if (log.address == CONTRACT && erc20.events.Transfer.is(log)) { - // let {from, to, value} = erc20.events.Transfer.decode(log) + let {from, to, value} = erc20.events.Transfer.decode(log) transfers.push(new Transfer({ - // id: log.id, - // blockNumber: block.header.height, - // timestamp: new Date(block.header.timestamp), - // tx: log.transactionHash, - // from, - // to, - // amount: value + id: log.id, + blockNumber: block.header.height, + timestamp: new Date(block.header.timestamp), + tx: log.transactionHash, + from, + to, + amount: value })) } } } ctx.log.info(`found ${transfers.length} transfers`) - // await ctx.store.insert(transfers) + await ctx.store.insert(transfers) }) diff --git a/util/portal-client/src/client.ts b/util/portal-client/src/client.ts index 8a33d4ec5..c55791d23 100644 --- a/util/portal-client/src/client.ts +++ b/util/portal-client/src/client.ts @@ -1,10 +1,9 @@ import {HttpClient} from '@subsquid/http-client' import type {Logger} from '@subsquid/logger' -import {AsyncQueue, ensureError, last, wait, withErrorContext} from '@subsquid/util-internal' +import {AsyncQueue, last, wait, withErrorContext} from '@subsquid/util-internal' import {splitLines} from '@subsquid/util-internal-archive-layout' import {addTimeout, TimeoutError} from '@subsquid/util-timeout' import assert from 'assert' -import {Readable} from 'stream' export interface PortalQuery { fromBlock: number @@ -104,133 +103,153 @@ export class PortalClient { }) } - async *finalizedStream( + finalizedStream( query: Q, stopOnHead = false - ): AsyncIterable { - let queue = new AsyncQueue(1) - let bufferSize = 0 - let isReady = false - let cache: B[] = [] - - const getBuffer = () => { - if (queue.isClosed()) return - let peeked = queue.peek() - // FIXME: is it a valid case? - if (peeked instanceof Error) return - - // buffer has been consumed, we need to reset - if (isReady && !peeked) { - reset() - } - - return peeked ?? cache - } - - const reset = () => { - bufferSize = 0 - isReady = false - cache.length = 0 - } - - const setReady = () => { - if (queue.isClosed()) return - if (isReady) return - queue.forcePut(cache) - isReady = true - cache = [] - } - - const waitForReset = async () => { - if (queue.isClosed()) return - await queue.wait() - reset() - } + ): ReadableStream { + let buffer = new BlocksBuffer(this.bufferThreshold) + let abortStream = new AbortController() const ingest = async () => { - let fromBlock = query.fromBlock - let toBlock = query.toBlock ?? Infinity + let startBlock = query.fromBlock + let endBlock = query.toBlock ?? Infinity - while (fromBlock <= toBlock) { - let archiveQuery = {...query, fromBlock} + while (startBlock <= endBlock && !abortStream.signal.aborted) { + let abortRequest = new AbortController() - let res = await this.http - .request('POST', this.getDatasetUrl(`finalized-stream`), { + let archiveQuery = {...query, fromBlock: startBlock} + let response = await this.http + .request('POST', this.getDatasetUrl('finalized-stream'), { json: archiveQuery, - retryAttempts: this.retryAttempts, httpTimeout: this.requestTimeout, + retryAttempts: this.retryAttempts, stream: true, + abort: anySignal([abortRequest.signal, abortStream.signal]), }) .catch( withErrorContext({ - archiveQuery, + query: archiveQuery, }) ) - // no blocks left - if (res.status == 204) { + if (response.status == 204) { if (stopOnHead) return - - await wait(1000) + await wait(1000, abortStream.signal) continue } try { - let stream = splitLines(res.body) - - while (true) { - let lines = await addTimeout(stream.next(), this.newBlockTimeout) - if (lines.done) break - - let buffer = getBuffer() - if (buffer == null) break - - let blocks = lines.value.map((line) => { - bufferSize += line.length - return JSON.parse(line) as B - }) - - // FIXME: won't it overflow stack? - buffer.push(...blocks) - - fromBlock = last(blocks).header.number + 1 - - if (bufferSize > this.bufferThreshold) { - setReady() - await waitForReset() + let newBlockTimeout = this.newBlockTimeout + let stream = (async function* () { + let chunks = response.body[Symbol.asyncIterator]() + while (true) { + let chunk = await addTimeout(chunks.next(), newBlockTimeout) + if (chunk.done) break + yield chunk.value as Buffer } - } + })() - if (bufferSize > 0) { - setReady() + for await (let lines of splitLines(stream)) { + let lastBlock = await buffer.put(lines) + startBlock = lastBlock + 1 } + + buffer.ready() } catch (err) { + abortRequest.abort() if (err instanceof TimeoutError) { - this.log?.warn( - `resetting stream, because we haven't seen a new blocks for ${this.newBlockTimeout} ms` - ) + this.log?.warn(`resetting stream due to inactivity for ${this.newBlockTimeout} ms`) } else { throw err } - } finally { - // FIXME: is it needed? - res.body.destroy() } } } - ingest().then( - () => queue.close(), - (err) => { - if (queue.isClosed()) return - queue.forcePut(ensureError(err)) - queue.close() - } - ) + return new ReadableStream({ + start: async (controller) => { + ingest().then( + () => buffer.close(), + (error) => { + if (buffer.isClosed()) return + controller.error(error) + buffer.close() + } + ) + }, + pull: async (controller) => { + let value = await buffer.take() + if (value) { + controller.enqueue(value) + } else { + controller.close() + } + }, + cancel: () => { + abortStream.abort() + }, + }) + } +} + +class BlocksBuffer { + private blocks: B[] = [] + private queue = new AsyncQueue(1) + private size = 0 + + constructor(private bufferSizeThreshold: number) {} + + async put(lines: string[]) { + for (let line of lines) { + this.size += line.length + this.blocks.push(JSON.parse(line)) + } + + let lastBlock = last(this.blocks).header.number + + if (this.size > this.bufferSizeThreshold) { + this.ready() + await this.queue.wait() + } + + return lastBlock + } + + async take() { + let value = await this.queue.take() + this.blocks = [] + this.size = 0 + return value + } + + ready() { + if (this.blocks.length == 0) return + this.queue.forcePut(this.blocks) + } + + close() { + return this.queue.close() + } + + isClosed() { + return this.queue.isClosed() + } +} + +// AbortSignal.any is available only in Node.js >=20 +function anySignal(signals: AbortSignal[]): AbortSignal { + const controller = new AbortController() - for await (let valueOrError of queue.iterate()) { - if (valueOrError instanceof Error) throw valueOrError - yield valueOrError + for (const signal of signals) { + if (signal.aborted) { + controller.abort(signal.reason) + break } + + signal.addEventListener('abort', () => controller.abort(signal.reason), { + signal: controller.signal, + }) } + + return controller.signal }