From f384110c2c9e59ab3c98c0631bdbe3b3d619fc10 Mon Sep 17 00:00:00 2001 From: Karl Prieb Date: Tue, 10 Dec 2024 14:31:11 -0300 Subject: [PATCH 01/16] feat: add import data from gql script This Nodejs script fetches ArDrive L1 transactions and bundles from a gateway through GQL interface. The script can receive some parameters like: - --gqlEndpoint # defaults to Goldsky - --minHeight # defaults to 0 - --maxHeight # defaults to the latest Arweave block - --blockRangeSize # defaults to 100 Example: `node --import=./register.js scripts/import-data/fetch-data-gql.ts --minHeight 1000000 --maxHeight 1000500` --- .gitignore | 2 + scripts/import-data/fetch-data-gql.ts | 303 ++++++++++++++++++++++++++ tsconfig.json | 2 +- 3 files changed, 306 insertions(+), 1 deletion(-) create mode 100644 scripts/import-data/fetch-data-gql.ts diff --git a/.gitignore b/.gitignore index 4f69103f..b2b5210d 100644 --- a/.gitignore +++ b/.gitignore @@ -14,6 +14,8 @@ /vendor /wallets test-results.xml +/scripts/import-data/bundles +/scripts/import-data/transactions # Generated docs /docs/sqlite/bundles diff --git a/scripts/import-data/fetch-data-gql.ts b/scripts/import-data/fetch-data-gql.ts new file mode 100644 index 00000000..0377abbe --- /dev/null +++ b/scripts/import-data/fetch-data-gql.ts @@ -0,0 +1,303 @@ +/** + * AR.IO Gateway + * Copyright (C) 2022-2023 Permanent Data Solutions, Inc. All Rights Reserved. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +import * as fs from 'node:fs/promises'; +import path from 'node:path'; +import { fileURLToPath } from 'node:url'; +const args = process.argv.slice(2); +const __filename = fileURLToPath(import.meta.url); +const __dirname = path.dirname(__filename); + +let GQL_ENDPOINT = 'https://arweave-search.goldsky.com/graphql'; +let MIN_BLOCK_HEIGHT = 0; +let MAX_BLOCK_HEIGHT: number | undefined; +let BLOCK_RANGE_SIZE = 100; + +args.forEach((arg, index) => { + switch (arg) { + case '--gqlEndpoint': + GQL_ENDPOINT = args[index + 1]; + break; + case '--minHeight': + MIN_BLOCK_HEIGHT = parseInt(args[index + 1], 10); + break; + case '--maxHeight': + MAX_BLOCK_HEIGHT = parseInt(args[index + 1], 10); + break; + case '--blockRangeSize': + BLOCK_RANGE_SIZE = parseInt(args[index + 1], 10); + break; + default: + break; + } +}); + +const fetchWithRetry = async ( + url: string, + options: RequestInit = {}, + retries = 5, + retryInterval = 300, // interval in milliseconds +): Promise => { + let attempt = 0; + + while (attempt < retries) { + try { + const response = await fetch(url, options); + + if (response.ok) { + return response; + } + + throw new Error(`HTTP error! status: ${response.status}`); + } catch (error) { + attempt++; + + if (attempt >= retries) { + throw new Error( + `Fetch failed after ${retries} attempts: ${(error as Error).message}`, + ); + } + + const waitTime = retryInterval * attempt; + console.warn( + `Fetch attempt ${attempt} failed. Retrying in ${waitTime}ms...`, + ); + + await new Promise((resolve) => setTimeout(resolve, waitTime)); + } + } + + throw new Error('Unexpected error in fetchWithRetry'); +}; + +const fetchLatestBlockHeight = async () => { + const response = await fetchWithRetry('https://arweave.net/info', { + method: 'GET', + }); + const { blocks } = await response.json(); + return blocks as number; +}; + +type BlockRange = { min: number; max: number }; +const getBlockRanges = ({ + minBlock, + maxBlock, + rangeSize, +}: { + minBlock: number; + maxBlock: number; + rangeSize: number; +}) => { + if (minBlock >= maxBlock || rangeSize <= 0) { + throw new Error( + 'Invalid input: ensure minBlock < maxBlock and rangeSize > 0', + ); + } + + const ranges: BlockRange[] = []; + let currentMin = minBlock; + + while (currentMin < maxBlock) { + const currentMax = Math.min(currentMin + rangeSize - 1, maxBlock); + ranges.push({ min: currentMin, max: currentMax }); + currentMin = currentMax + 1; + } + + return ranges; +}; + +const gqlQuery = ({ + minBlock, + maxBlock, + cursor, +}: { + minBlock: number; + maxBlock: number; + cursor?: string; +}) => ` +query { + transactions( + block: { + min: ${minBlock} + max: ${maxBlock} + } + tags: [ + { + name: "App-Name" + values: [ + "ArDrive-App" + "ArDrive-Web" + "ArDrive-CLI" + "ArDrive-Desktop" + "ArDrive-Mobile" + "ArDrive-Core" + "ArDrive-Sync" + ] + } + ] + first: 100 + sort: HEIGHT_ASC + after: "${cursor !== undefined ? cursor : ''}" + ) { + pageInfo { + hasNextPage + } + edges { + cursor + node { + id + bundledIn { + id + } + block { + height + } + } + } + } +} +`; + +const fetchGql = async ({ + minBlock, + maxBlock, + cursor, +}: { + minBlock: number; + maxBlock: number; + cursor?: string; +}) => { + const response = await fetchWithRetry(GQL_ENDPOINT, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ query: gqlQuery({ minBlock, maxBlock, cursor }) }), + }); + const { data } = await response.json(); + return data; +}; + +type BlockTransactions = Map>; +const getTransactionsForRange = async ({ min, max }: BlockRange) => { + let cursor: string | undefined; + let hasNextPage = true; + let page = 0; + const transactions: BlockTransactions = new Map(); + const bundles: BlockTransactions = new Map(); + + while (hasNextPage) { + console.log( + `Fetching transactions and bundles from block ${min} to ${max}. Page ${page}`, + ); + const { + transactions: { edges, pageInfo }, + } = await fetchGql({ + minBlock: min, + maxBlock: max, + cursor, + }); + + hasNextPage = pageInfo.hasNextPage; + cursor = hasNextPage ? edges[edges.length - 1].cursor : undefined; + + for (const edge of edges) { + const blockHeight = edge.node.block.height; + const bundleId = edge.node.bundledIn?.id; + const id = edge.node.id; + + if (!transactions.has(blockHeight)) { + transactions.set(blockHeight, new Set()); + } + if (!bundles.has(blockHeight)) { + bundles.set(blockHeight, new Set()); + } + + if (bundleId !== undefined) { + bundles.get(blockHeight)?.add(bundleId); + } else { + transactions.get(blockHeight)?.add(id); + } + } + + page++; + } + + return { transactions, bundles }; +}; + +const writeTransactionsToFile = async ({ + outputDir, + transactions, +}: { + outputDir: string; + transactions: BlockTransactions; +}) => { + try { + await fs.mkdir(outputDir, { recursive: true }); + } catch (error) { + console.error(`Failed to create directory: ${error}`); + throw error; + } + + for (const [height, ids] of transactions.entries()) { + if (ids.size === 0) continue; + + const content = JSON.stringify([...ids], null, 2); + const filePath = path.join(outputDir, `${height}.json`); + + try { + await fs.writeFile(filePath, content); + } catch (error) { + console.error(`Failed to write ${filePath}: ${error}`); + throw error; + } + } +}; + +(async () => { + if (MAX_BLOCK_HEIGHT === undefined) { + MAX_BLOCK_HEIGHT = await fetchLatestBlockHeight(); + } + + const blockRanges = getBlockRanges({ + minBlock: MIN_BLOCK_HEIGHT, + maxBlock: MAX_BLOCK_HEIGHT, + rangeSize: BLOCK_RANGE_SIZE, + }); + + console.log( + `Starting to fetch transactions and bundles from block ${MIN_BLOCK_HEIGHT} to ${MAX_BLOCK_HEIGHT}`, + ); + + for (const range of blockRanges) { + const { transactions, bundles } = await getTransactionsForRange(range); + + await writeTransactionsToFile({ + outputDir: path.join(__dirname, 'transactions'), + transactions, + }); + await writeTransactionsToFile({ + outputDir: path.join(__dirname, 'bundles'), + transactions: bundles, + }); + + console.log( + `Transactions and bundles from block ${MIN_BLOCK_HEIGHT} to ${MAX_BLOCK_HEIGHT} saved!`, + ); + } +})(); diff --git a/tsconfig.json b/tsconfig.json index 70473ac3..ada5b8b8 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -25,5 +25,5 @@ "swc": true, "esm": true }, - "include": ["src", "test"] + "include": ["src", "test", "scripts"] } From 6a5088073b8b242101100a3bad0f9d5350d32090 Mon Sep 17 00:00:00 2001 From: Karl Prieb Date: Tue, 10 Dec 2024 16:53:28 -0300 Subject: [PATCH 02/16] Update scripts/import-data/fetch-data-gql.ts Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- scripts/import-data/fetch-data-gql.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/scripts/import-data/fetch-data-gql.ts b/scripts/import-data/fetch-data-gql.ts index 0377abbe..7c020013 100644 --- a/scripts/import-data/fetch-data-gql.ts +++ b/scripts/import-data/fetch-data-gql.ts @@ -188,7 +188,11 @@ const fetchGql = async ({ }, body: JSON.stringify({ query: gqlQuery({ minBlock, maxBlock, cursor }) }), }); - const { data } = await response.json(); + const result = await response.json(); + if (result.errors) { + throw new Error(`GraphQL error: ${JSON.stringify(result.errors)}`); + } + const { data } = result; return data; }; From d6cfb5f2809475c9418b0c28f61062ac5d61a95b Mon Sep 17 00:00:00 2001 From: Karl Prieb Date: Tue, 10 Dec 2024 16:54:48 -0300 Subject: [PATCH 03/16] Update scripts/import-data/fetch-data-gql.ts Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- scripts/import-data/fetch-data-gql.ts | 28 +++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/scripts/import-data/fetch-data-gql.ts b/scripts/import-data/fetch-data-gql.ts index 7c020013..c380fa0e 100644 --- a/scripts/import-data/fetch-data-gql.ts +++ b/scripts/import-data/fetch-data-gql.ts @@ -30,16 +30,36 @@ let BLOCK_RANGE_SIZE = 100; args.forEach((arg, index) => { switch (arg) { case '--gqlEndpoint': - GQL_ENDPOINT = args[index + 1]; + if (args[index + 1]) { + GQL_ENDPOINT = args[index + 1]; + } else { + console.error('Missing value for --gqlEndpoint'); + process.exit(1); + } break; case '--minHeight': - MIN_BLOCK_HEIGHT = parseInt(args[index + 1], 10); + if (args[index + 1]) { + MIN_BLOCK_HEIGHT = parseInt(args[index + 1], 10); + } else { + console.error('Missing value for --minHeight'); + process.exit(1); + } break; case '--maxHeight': - MAX_BLOCK_HEIGHT = parseInt(args[index + 1], 10); + if (args[index + 1]) { + MAX_BLOCK_HEIGHT = parseInt(args[index + 1], 10); + } else { + console.error('Missing value for --maxHeight'); + process.exit(1); + } break; case '--blockRangeSize': - BLOCK_RANGE_SIZE = parseInt(args[index + 1], 10); + if (args[index + 1]) { + BLOCK_RANGE_SIZE = parseInt(args[index + 1], 10); + } else { + console.error('Missing value for --blockRangeSize'); + process.exit(1); + } break; default: break; From 16f07ca8a0ac56f712fdea84ea3c9926acc6189b Mon Sep 17 00:00:00 2001 From: Karl Prieb Date: Tue, 10 Dec 2024 17:03:49 -0300 Subject: [PATCH 04/16] chore: update log --- scripts/import-data/fetch-data-gql.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/import-data/fetch-data-gql.ts b/scripts/import-data/fetch-data-gql.ts index c380fa0e..fd23c460 100644 --- a/scripts/import-data/fetch-data-gql.ts +++ b/scripts/import-data/fetch-data-gql.ts @@ -321,7 +321,7 @@ const writeTransactionsToFile = async ({ }); console.log( - `Transactions and bundles from block ${MIN_BLOCK_HEIGHT} to ${MAX_BLOCK_HEIGHT} saved!`, + `Transactions and bundles from block ${range.min} to ${range.max} saved!`, ); } })(); From 4bbf4b67bca5c7004725c77c79aa7ea563df6a9a Mon Sep 17 00:00:00 2001 From: Karl Prieb Date: Tue, 10 Dec 2024 18:12:50 -0300 Subject: [PATCH 05/16] chore: update to 15 retry attempts --- scripts/import-data/fetch-data-gql.ts | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/scripts/import-data/fetch-data-gql.ts b/scripts/import-data/fetch-data-gql.ts index fd23c460..579bcebb 100644 --- a/scripts/import-data/fetch-data-gql.ts +++ b/scripts/import-data/fetch-data-gql.ts @@ -69,7 +69,7 @@ args.forEach((arg, index) => { const fetchWithRetry = async ( url: string, options: RequestInit = {}, - retries = 5, + retries = 15, retryInterval = 300, // interval in milliseconds ): Promise => { let attempt = 0; @@ -320,8 +320,10 @@ const writeTransactionsToFile = async ({ transactions: bundles, }); - console.log( - `Transactions and bundles from block ${range.min} to ${range.max} saved!`, - ); + if (transactions.size !== 0 || bundles.size !== 0) { + console.log( + `Transactions and bundles from block ${range.min} to ${range.max} saved!`, + ); + } } })(); From 1f15119808584abd6ccb44a533b8bfa1dba83345 Mon Sep 17 00:00:00 2001 From: Karl Prieb Date: Tue, 10 Dec 2024 19:57:26 -0300 Subject: [PATCH 06/16] feat(routes): return 429 when bundle importer queue is full --- docker-compose.yaml | 1 + src/config.ts | 6 ++++++ src/routes/ar-io.ts | 5 +++++ src/system.ts | 3 ++- src/workers/bundle-data-importer.ts | 9 ++++++--- 5 files changed, 20 insertions(+), 4 deletions(-) diff --git a/docker-compose.yaml b/docker-compose.yaml index 3d10bf7e..6a2bf532 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -108,6 +108,7 @@ services: - ENABLE_BACKGROUND_DATA_VERIFICATION=${ENABLE_BACKGROUND_DATA_VERIFICATION:-} - BACKGROUND_DATA_VERIFICATION_INTERVAL_SECONDS=${BACKGROUND_DATA_VERIFICATION_INTERVAL_SECONDS:-} - CLICKHOUSE_URL=${CLICKHOUSE_URL:-} + - BUNDLE_DATA_IMPORTER_QUEUE_SIZE=${BUNDLE_DATA_IMPORTER_QUEUE_SIZE:-} networks: - ar-io-network depends_on: diff --git a/src/config.ts b/src/config.ts index a0feac1d..fba4d2e8 100644 --- a/src/config.ts +++ b/src/config.ts @@ -198,6 +198,12 @@ export const MAX_DATA_ITEM_QUEUE_SIZE = +env.varOrDefault( '100000', ); +// The maximum number of bundles to queue for unbundling before skipping +export const BUNDLE_DATA_IMPORTER_QUEUE_SIZE = +env.varOrDefault( + 'BUNDLE_DATA_IMPORTER_QUEUE_SIZE', + '1000', +); + // // Verification // diff --git a/src/routes/ar-io.ts b/src/routes/ar-io.ts index cbe5aa58..067fcf3a 100644 --- a/src/routes/ar-io.ts +++ b/src/routes/ar-io.ts @@ -210,6 +210,11 @@ arIoRouter.post( return; } + if (await system.bundleDataImporter.isQueueFull()) { + res.status(429).send('Bundle importer queue is full'); + return; + } + system.eventEmitter.emit(events.ANS104_BUNDLE_QUEUED, { id, root_tx_id: id, diff --git a/src/system.ts b/src/system.ts index 65c1560e..b802057e 100644 --- a/src/system.ts +++ b/src/system.ts @@ -469,11 +469,12 @@ metrics.registerQueueLengthGauge('ans104Unbundler', { length: () => ans104Unbundler.queueDepth(), }); -const bundleDataImporter = new BundleDataImporter({ +export const bundleDataImporter = new BundleDataImporter({ log, contiguousDataSource: backgroundContiguousDataSource, ans104Unbundler, workerCount: config.ANS104_DOWNLOAD_WORKERS, + maxQueueSize: config.BUNDLE_DATA_IMPORTER_QUEUE_SIZE, }); metrics.registerQueueLengthGauge('bundleDataImporter', { length: () => bundleDataImporter.queueDepth(), diff --git a/src/workers/bundle-data-importer.ts b/src/workers/bundle-data-importer.ts index 8ef9741d..de4d253b 100644 --- a/src/workers/bundle-data-importer.ts +++ b/src/workers/bundle-data-importer.ts @@ -25,8 +25,7 @@ import { NormalizedDataItem, PartialJsonTransaction, } from '../types.js'; - -const DEFAULT_MAX_QUEUE_SIZE = 1000; +import * as config from '../config.js'; interface IndexProperty { index: number; @@ -56,7 +55,7 @@ export class BundleDataImporter { contiguousDataSource, ans104Unbundler, workerCount, - maxQueueSize = DEFAULT_MAX_QUEUE_SIZE, + maxQueueSize = config.BUNDLE_DATA_IMPORTER_QUEUE_SIZE, }: { log: winston.Logger; contiguousDataSource: ContiguousDataSource; @@ -138,4 +137,8 @@ export class BundleDataImporter { queueDepth(): number { return this.queue.length(); } + + async isQueueFull(): Promise { + return this.queue.length() >= this.maxQueueSize; + } } From b106aab6d69e28d51e8f31e83978f731086762a5 Mon Sep 17 00:00:00 2001 From: Karl Prieb Date: Thu, 12 Dec 2024 10:51:04 -0300 Subject: [PATCH 07/16] fix: option to fetch root tx id --- scripts/import-data/fetch-data-gql.ts | 68 +++++++++++++++++++++++++-- 1 file changed, 65 insertions(+), 3 deletions(-) diff --git a/scripts/import-data/fetch-data-gql.ts b/scripts/import-data/fetch-data-gql.ts index 579bcebb..6dee0bfd 100644 --- a/scripts/import-data/fetch-data-gql.ts +++ b/scripts/import-data/fetch-data-gql.ts @@ -26,6 +26,7 @@ let GQL_ENDPOINT = 'https://arweave-search.goldsky.com/graphql'; let MIN_BLOCK_HEIGHT = 0; let MAX_BLOCK_HEIGHT: number | undefined; let BLOCK_RANGE_SIZE = 100; +let BUNDLES_FETCH_ROOT_TX = true; args.forEach((arg, index) => { switch (arg) { @@ -61,6 +62,14 @@ args.forEach((arg, index) => { process.exit(1); } break; + case '--fetchOnlyRootTx': + if (args[index + 1]) { + BUNDLES_FETCH_ROOT_TX = args[index + 1] === 'true'; + } else { + console.error('Missing value for --fetchOnlyRootTx'); + process.exit(1); + } + break; default: break; } @@ -140,7 +149,7 @@ const getBlockRanges = ({ return ranges; }; -const gqlQuery = ({ +const txsGqlQuery = ({ minBlock, maxBlock, cursor, @@ -192,6 +201,52 @@ query { } `; +const rootTxGqlQuery = (txId: string) => ` +query { + transaction( + id: "${txId}" + ) { + bundledIn { + id + } + } +} +`; + +const getRootTxId = async (txId: string) => { + let rootTxId: string | undefined; + let currentId = txId; + + while (rootTxId === undefined) { + const response = await fetchWithRetry(GQL_ENDPOINT, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + query: rootTxGqlQuery(currentId), + }), + }); + + const result = await response.json(); + + if (result.errors) { + throw new Error(`GraphQL error: ${JSON.stringify(result.errors)}`); + } + + const { data } = result; + const bundleId = data.transaction.bundledIn?.id; + + if (bundleId === undefined) { + rootTxId = currentId; + } else { + currentId = bundleId; + } + } + + return rootTxId; +}; + const fetchGql = async ({ minBlock, maxBlock, @@ -206,7 +261,9 @@ const fetchGql = async ({ headers: { 'Content-Type': 'application/json', }, - body: JSON.stringify({ query: gqlQuery({ minBlock, maxBlock, cursor }) }), + body: JSON.stringify({ + query: txsGqlQuery({ minBlock, maxBlock, cursor }), + }), }); const result = await response.json(); if (result.errors) { @@ -252,7 +309,12 @@ const getTransactionsForRange = async ({ min, max }: BlockRange) => { } if (bundleId !== undefined) { - bundles.get(blockHeight)?.add(bundleId); + if (BUNDLES_FETCH_ROOT_TX) { + const rootTxId = await getRootTxId(bundleId); + bundles.get(blockHeight)?.add(rootTxId); + } else { + bundles.get(blockHeight)?.add(bundleId); + } } else { transactions.get(blockHeight)?.add(id); } From 4c7ab81534397ba1d950539deae61f1279087fa2 Mon Sep 17 00:00:00 2001 From: Karl Prieb Date: Thu, 12 Dec 2024 10:54:18 -0300 Subject: [PATCH 08/16] fix: option for gql tags --- scripts/import-data/fetch-data-gql.ts | 37 +++++++++++++++++---------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/scripts/import-data/fetch-data-gql.ts b/scripts/import-data/fetch-data-gql.ts index 6dee0bfd..29cfac9d 100644 --- a/scripts/import-data/fetch-data-gql.ts +++ b/scripts/import-data/fetch-data-gql.ts @@ -27,6 +27,20 @@ let MIN_BLOCK_HEIGHT = 0; let MAX_BLOCK_HEIGHT: number | undefined; let BLOCK_RANGE_SIZE = 100; let BUNDLES_FETCH_ROOT_TX = true; +let GQL_TAGS = `[ + { + name: "App-Name" + values: [ + "ArDrive-App" + "ArDrive-Web" + "ArDrive-CLI" + "ArDrive-Desktop" + "ArDrive-Mobile" + "ArDrive-Core" + "ArDrive-Sync" + ] + } +]`; args.forEach((arg, index) => { switch (arg) { @@ -70,6 +84,14 @@ args.forEach((arg, index) => { process.exit(1); } break; + case '--gqlTags': + if (args[index + 1]) { + GQL_TAGS = args[index + 1]; + } else { + console.error('Missing value for --gqlTags'); + process.exit(1); + } + break; default: break; } @@ -164,20 +186,7 @@ query { min: ${minBlock} max: ${maxBlock} } - tags: [ - { - name: "App-Name" - values: [ - "ArDrive-App" - "ArDrive-Web" - "ArDrive-CLI" - "ArDrive-Desktop" - "ArDrive-Mobile" - "ArDrive-Core" - "ArDrive-Sync" - ] - } - ] + tags: ${GQL_TAGS} first: 100 sort: HEIGHT_ASC after: "${cursor !== undefined ? cursor : ''}" From 1da55570048717f2ddfce8b7835c156c041480ee Mon Sep 17 00:00:00 2001 From: Karl Prieb Date: Thu, 12 Dec 2024 11:33:22 -0300 Subject: [PATCH 09/16] better logging --- scripts/import-data/fetch-data-gql.ts | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/scripts/import-data/fetch-data-gql.ts b/scripts/import-data/fetch-data-gql.ts index 29cfac9d..47e1d60a 100644 --- a/scripts/import-data/fetch-data-gql.ts +++ b/scripts/import-data/fetch-data-gql.ts @@ -18,6 +18,7 @@ import * as fs from 'node:fs/promises'; import path from 'node:path'; import { fileURLToPath } from 'node:url'; +import { transport } from 'winston'; const args = process.argv.slice(2); const __filename = fileURLToPath(import.meta.url); const __dirname = path.dirname(__filename); @@ -286,14 +287,10 @@ type BlockTransactions = Map>; const getTransactionsForRange = async ({ min, max }: BlockRange) => { let cursor: string | undefined; let hasNextPage = true; - let page = 0; const transactions: BlockTransactions = new Map(); const bundles: BlockTransactions = new Map(); while (hasNextPage) { - console.log( - `Fetching transactions and bundles from block ${min} to ${max}. Page ${page}`, - ); const { transactions: { edges, pageInfo }, } = await fetchGql({ @@ -328,8 +325,6 @@ const getTransactionsForRange = async ({ min, max }: BlockRange) => { transactions.get(blockHeight)?.add(id); } } - - page++; } return { transactions, bundles }; @@ -364,6 +359,14 @@ const writeTransactionsToFile = async ({ } }; +const countTransactions = (map: BlockTransactions) => { + let total = 0; + map.forEach((set) => { + total += set.size; + }); + return total; +}; + (async () => { if (MAX_BLOCK_HEIGHT === undefined) { MAX_BLOCK_HEIGHT = await fetchLatestBlockHeight(); @@ -379,6 +382,9 @@ const writeTransactionsToFile = async ({ `Starting to fetch transactions and bundles from block ${MIN_BLOCK_HEIGHT} to ${MAX_BLOCK_HEIGHT}`, ); + let txCount = 0; + let bundleCount = 0; + for (const range of blockRanges) { const { transactions, bundles } = await getTransactionsForRange(range); @@ -391,10 +397,16 @@ const writeTransactionsToFile = async ({ transactions: bundles, }); + txCount += countTransactions(transactions); + bundleCount += countTransactions(bundles); + if (transactions.size !== 0 || bundles.size !== 0) { console.log( `Transactions and bundles from block ${range.min} to ${range.max} saved!`, ); + console.log( + `Saved transactions: ${txCount}, Saved bundles: ${bundleCount}`, + ); } } })(); From eef376a5ee07b04248dca727443e33c21a62f82a Mon Sep 17 00:00:00 2001 From: Karl Prieb Date: Thu, 12 Dec 2024 11:35:49 -0300 Subject: [PATCH 10/16] writes minified json --- scripts/import-data/fetch-data-gql.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/import-data/fetch-data-gql.ts b/scripts/import-data/fetch-data-gql.ts index 47e1d60a..7bb2d55b 100644 --- a/scripts/import-data/fetch-data-gql.ts +++ b/scripts/import-data/fetch-data-gql.ts @@ -347,7 +347,7 @@ const writeTransactionsToFile = async ({ for (const [height, ids] of transactions.entries()) { if (ids.size === 0) continue; - const content = JSON.stringify([...ids], null, 2); + const content = JSON.stringify([...ids]); const filePath = path.join(outputDir, `${height}.json`); try { From 2561dd19302d1636c147e172cd6b3eb59acf9c92 Mon Sep 17 00:00:00 2001 From: Karl Prieb Date: Thu, 12 Dec 2024 14:51:28 -0300 Subject: [PATCH 11/16] create utils --- scripts/import-data/fetch-data-gql.ts | 48 +----------------- scripts/import-data/utils.ts | 70 +++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 47 deletions(-) create mode 100644 scripts/import-data/utils.ts diff --git a/scripts/import-data/fetch-data-gql.ts b/scripts/import-data/fetch-data-gql.ts index 7bb2d55b..7b6d4945 100644 --- a/scripts/import-data/fetch-data-gql.ts +++ b/scripts/import-data/fetch-data-gql.ts @@ -18,7 +18,7 @@ import * as fs from 'node:fs/promises'; import path from 'node:path'; import { fileURLToPath } from 'node:url'; -import { transport } from 'winston'; +import { fetchLatestBlockHeight, fetchWithRetry } from './utils.js'; const args = process.argv.slice(2); const __filename = fileURLToPath(import.meta.url); const __dirname = path.dirname(__filename); @@ -98,52 +98,6 @@ args.forEach((arg, index) => { } }); -const fetchWithRetry = async ( - url: string, - options: RequestInit = {}, - retries = 15, - retryInterval = 300, // interval in milliseconds -): Promise => { - let attempt = 0; - - while (attempt < retries) { - try { - const response = await fetch(url, options); - - if (response.ok) { - return response; - } - - throw new Error(`HTTP error! status: ${response.status}`); - } catch (error) { - attempt++; - - if (attempt >= retries) { - throw new Error( - `Fetch failed after ${retries} attempts: ${(error as Error).message}`, - ); - } - - const waitTime = retryInterval * attempt; - console.warn( - `Fetch attempt ${attempt} failed. Retrying in ${waitTime}ms...`, - ); - - await new Promise((resolve) => setTimeout(resolve, waitTime)); - } - } - - throw new Error('Unexpected error in fetchWithRetry'); -}; - -const fetchLatestBlockHeight = async () => { - const response = await fetchWithRetry('https://arweave.net/info', { - method: 'GET', - }); - const { blocks } = await response.json(); - return blocks as number; -}; - type BlockRange = { min: number; max: number }; const getBlockRanges = ({ minBlock, diff --git a/scripts/import-data/utils.ts b/scripts/import-data/utils.ts new file mode 100644 index 00000000..ad3fe90a --- /dev/null +++ b/scripts/import-data/utils.ts @@ -0,0 +1,70 @@ +/** + * AR.IO Gateway + * Copyright (C) 2022-2023 Permanent Data Solutions, Inc. All Rights Reserved. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +export const fetchWithRetry = async ( + url: string, + options: RequestInit = {}, + retries = 15, + retryInterval = 300, // interval in milliseconds +): Promise => { + let attempt = 0; + + while (attempt < retries) { + try { + const response = await fetch(url, options); + + if (response.ok) { + return response; + } + if (response.status === 429) { + console.warn( + `Import queue is full! Waiting 30 seconds before retrying...`, + ); + await new Promise((resolve) => setTimeout(resolve, 30000)); + continue; + } + + throw new Error(`HTTP error! status: ${response.status}`); + } catch (error) { + attempt++; + + if (attempt >= retries) { + throw new Error( + `Fetch failed after ${retries} attempts: ${(error as Error).message}`, + ); + } + + const waitTime = retryInterval * attempt; + console.warn( + `Fetch attempt ${attempt} failed. Retrying in ${waitTime}ms...`, + ); + + await new Promise((resolve) => setTimeout(resolve, waitTime)); + } + } + + throw new Error('Unexpected error in fetchWithRetry'); +}; + +export const fetchLatestBlockHeight = async () => { + const response = await fetchWithRetry('https://arweave.net/info', { + method: 'GET', + }); + const { blocks } = await response.json(); + return blocks as number; +}; From 4b57bb4080aadc4e2e91549a47a1388a28f65781 Mon Sep 17 00:00:00 2001 From: Karl Prieb Date: Thu, 12 Dec 2024 14:51:55 -0300 Subject: [PATCH 12/16] feat: queue transactions and bundles --- scripts/import-data/import-data.ts | 239 +++++++++++++++++++++++++++++ 1 file changed, 239 insertions(+) create mode 100644 scripts/import-data/import-data.ts diff --git a/scripts/import-data/import-data.ts b/scripts/import-data/import-data.ts new file mode 100644 index 00000000..f0a32831 --- /dev/null +++ b/scripts/import-data/import-data.ts @@ -0,0 +1,239 @@ +/** + * AR.IO Gateway + * Copyright (C) 2022-2023 Permanent Data Solutions, Inc. All Rights Reserved. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +import * as fs from 'node:fs/promises'; +import path from 'node:path'; +import { fileURLToPath } from 'node:url'; +import { fetchLatestBlockHeight, fetchWithRetry } from './utils.js'; +const args = process.argv.slice(2); +const __filename = fileURLToPath(import.meta.url); +const __dirname = path.dirname(__filename); + +let ARIO_ENDPOINT = 'http://localhost:4000'; +let ADMIN_KEY: string | undefined; +let MIN_BLOCK_HEIGHT = 0; +let MAX_BLOCK_HEIGHT: number | undefined; +let TRANSACTIONS_DIR = path.join(__dirname, 'transactions'); +let BUNDLES_DIR = path.join(__dirname, 'bundles'); + +type ImportType = 'transaction' | 'bundle' | 'both'; +let IMPORT_TYPE: ImportType | undefined; + +args.forEach((arg, index) => { + switch (arg) { + case '--adminKey': + if (args[index + 1]) { + ADMIN_KEY = args[index + 1]; + } else { + console.error('Missing value for --adminKey'); + process.exit(1); + } + break; + case '--arioNode': + if (args[index + 1]) { + ARIO_ENDPOINT = args[index + 1]; + } else { + console.error('Missing value for --arioNode'); + process.exit(1); + } + break; + case '--minHeight': + if (args[index + 1]) { + MIN_BLOCK_HEIGHT = parseInt(args[index + 1], 10); + } else { + console.error('Missing value for --minHeight'); + process.exit(1); + } + break; + case '--maxHeight': + if (args[index + 1]) { + MAX_BLOCK_HEIGHT = parseInt(args[index + 1], 10); + } else { + console.error('Missing value for --maxHeight'); + process.exit(1); + } + break; + case '--transactionsDir': + if (args[index + 1]) { + TRANSACTIONS_DIR = args[index + 1]; + } else { + console.error('Missing value for --transactionsDir'); + process.exit(1); + } + break; + case '--bundlesDir': + if (args[index + 1]) { + BUNDLES_DIR = args[index + 1]; + } else { + console.error('Missing value for --bundlesDir'); + process.exit(1); + } + break; + case '--importType': { + const importType = args[index + 1]; + if ( + importType === 'transaction' || + importType === 'bundle' || + importType === 'both' + ) { + IMPORT_TYPE = importType; + } else { + console.error('Missing value for --importType'); + process.exit(1); + } + break; + } + default: + break; + } +}); + +const getFilesInRange = async ({ + folder, + min, + max, +}: { + folder: string; + min: number; + max: number; +}): Promise => { + try { + const files = await fs.readdir(folder); + + const filesInRange = files + .filter((file) => path.extname(file) === '.json') + .filter((file) => { + const match = file.match(/^\d+/); + const number = match ? parseInt(match[0], 10) : null; + return number !== null && number >= min && number <= max; + }); + + return filesInRange; + } catch (error) { + throw new Error(`Error processing files: ${(error as Error).message}`); + } +}; + +const importFromFiles = async ({ + files, + type, +}: { + files: string[]; + type: 'transactions' | 'bundles'; +}) => { + let counter = 0; + let folder: string; + let endpoint: string; + switch (type) { + case 'transactions': + folder = TRANSACTIONS_DIR; + endpoint = `${ARIO_ENDPOINT}/ar-io/admin/queue-tx`; + break; + case 'bundles': + folder = BUNDLES_DIR; + endpoint = `${ARIO_ENDPOINT}/ar-io/admin/queue-bundle`; + break; + default: + throw new Error('Invalid type'); + } + + for (const file of files) { + const filePath = path.join(folder, file); + const ids = JSON.parse(await fs.readFile(filePath, 'utf-8')) as string[]; + console.log( + `Importing ${ids.length} ${type} from block ${file.split('.')[0]}`, + ); + + for (const id of ids) { + counter++; + await fetchWithRetry(endpoint, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${ADMIN_KEY}`, + }, + body: JSON.stringify({ id }), + }); + } + } + + return { queued: counter }; +}; + +(async () => { + if (ADMIN_KEY === undefined) { + throw new Error('Missing admin key'); + } + + if (MAX_BLOCK_HEIGHT === undefined) { + MAX_BLOCK_HEIGHT = await fetchLatestBlockHeight(); + } + + let transactionFiles: string[] = []; + let bundleFiles: string[] = []; + + switch (IMPORT_TYPE ?? 'both') { + case 'transaction': + transactionFiles = await getFilesInRange({ + folder: TRANSACTIONS_DIR, + min: MIN_BLOCK_HEIGHT, + max: MAX_BLOCK_HEIGHT, + }); + break; + case 'bundle': + bundleFiles = await getFilesInRange({ + folder: BUNDLES_DIR, + min: MIN_BLOCK_HEIGHT, + max: MAX_BLOCK_HEIGHT, + }); + break; + case 'both': + transactionFiles = await getFilesInRange({ + folder: TRANSACTIONS_DIR, + min: MIN_BLOCK_HEIGHT, + max: MAX_BLOCK_HEIGHT, + }); + bundleFiles = await getFilesInRange({ + folder: BUNDLES_DIR, + min: MIN_BLOCK_HEIGHT, + max: MAX_BLOCK_HEIGHT, + }); + break; + } + + console.log( + `Starting to import transactions and bundles from block ${MIN_BLOCK_HEIGHT} to ${MAX_BLOCK_HEIGHT}`, + ); + + const queuedTransactions = await importFromFiles({ + files: transactionFiles, + type: 'transactions', + }); + + if (queuedTransactions.queued > 0) { + console.log(`Finished queueing ${queuedTransactions.queued} transactions`); + } + + const queuedBundles = await importFromFiles({ + files: bundleFiles, + type: 'bundles', + }); + + if (queuedBundles.queued > 0) { + console.log(`Finished queueing ${queuedBundles.queued} bundles`); + } +})(); From 72c992235f602fa75deaf54f02dd78b60dc03366 Mon Sep 17 00:00:00 2001 From: Karl Prieb Date: Thu, 12 Dec 2024 16:17:37 -0300 Subject: [PATCH 13/16] chore: add script to count fetched ids --- scripts/import-data/count-fetched-ids.ts | 174 +++++++++++++++++++++++ scripts/import-data/import-data.ts | 32 +---- scripts/import-data/utils.ts | 30 ++++ 3 files changed, 209 insertions(+), 27 deletions(-) create mode 100644 scripts/import-data/count-fetched-ids.ts diff --git a/scripts/import-data/count-fetched-ids.ts b/scripts/import-data/count-fetched-ids.ts new file mode 100644 index 00000000..7f9dbd69 --- /dev/null +++ b/scripts/import-data/count-fetched-ids.ts @@ -0,0 +1,174 @@ +/** + * AR.IO Gateway + * Copyright (C) 2022-2023 Permanent Data Solutions, Inc. All Rights Reserved. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +import * as fs from 'node:fs/promises'; +import path from 'node:path'; +import { fileURLToPath } from 'node:url'; +import { getFilesInRange } from './utils.js'; +const args = process.argv.slice(2); +const __filename = fileURLToPath(import.meta.url); +const __dirname = path.dirname(__filename); + +let TRANSACTIONS_DIR = path.join(__dirname, 'transactions'); +let BUNDLES_DIR = path.join(__dirname, 'bundles'); +let MIN_BLOCK_HEIGHT = 0; +let MAX_BLOCK_HEIGHT = Infinity; + +args.forEach((arg, index) => { + switch (arg) { + case '--transactionsDir': + if (args[index + 1]) { + TRANSACTIONS_DIR = args[index + 1]; + } else { + console.error('Missing value for --transactionsDir'); + process.exit(1); + } + break; + case '--bundlesDir': + if (args[index + 1]) { + BUNDLES_DIR = args[index + 1]; + } else { + console.error('Missing value for --bundlesDir'); + process.exit(1); + } + break; + case '--minHeight': + if (args[index + 1]) { + MIN_BLOCK_HEIGHT = parseInt(args[index + 1], 10); + } else { + console.error('Missing value for --minHeight'); + process.exit(1); + } + break; + case '--maxHeight': + if (args[index + 1]) { + MAX_BLOCK_HEIGHT = parseInt(args[index + 1], 10); + } else { + console.error('Missing value for --maxHeight'); + process.exit(1); + } + break; + default: + break; + } +}); + +const countIds = async ({ + folder, + files, +}: { + folder: string; + files: string[]; +}) => { + let counter = 0; + for (const file of files) { + const filePath = path.join(folder, file); + const ids = JSON.parse(await fs.readFile(filePath, 'utf-8')) as string[]; + counter += ids.length; + } + return counter; +}; + +// const importFromFiles = async ({ +// files, +// type, +// }: { +// files: string[]; +// type: 'transactions' | 'bundles'; +// }) => { +// let counter = 0; +// let folder: string; +// let endpoint: string; +// switch (type) { +// case 'transactions': +// folder = TRANSACTIONS_DIR; +// endpoint = `${ARIO_ENDPOINT}/ar-io/admin/queue-tx`; +// break; +// case 'bundles': +// folder = BUNDLES_DIR; +// endpoint = `${ARIO_ENDPOINT}/ar-io/admin/queue-bundle`; +// break; +// default: +// throw new Error('Invalid type'); +// } + +// for (const file of files) { +// const filePath = path.join(folder, file); +// const ids = JSON.parse(await fs.readFile(filePath, 'utf-8')) as string[]; +// console.log( +// `Importing ${ids.length} ${type} from block ${file.split('.')[0]}`, +// ); + +// for (const id of ids) { +// counter++; +// await fetchWithRetry(endpoint, { +// method: 'POST', +// headers: { +// 'Content-Type': 'application/json', +// Authorization: `Bearer ${ADMIN_KEY}`, +// }, +// body: JSON.stringify({ id }), +// }); +// } +// } + +// return { queued: counter }; +// }; + +(async () => { + const transactionFiles = await getFilesInRange({ + folder: TRANSACTIONS_DIR, + min: MIN_BLOCK_HEIGHT, + max: MAX_BLOCK_HEIGHT, + }); + const bundleFiles = await getFilesInRange({ + folder: BUNDLES_DIR, + min: MIN_BLOCK_HEIGHT, + max: MAX_BLOCK_HEIGHT, + }); + + const firstTransactionHeight = parseInt( + transactionFiles[0].split('.')[0], + 10, + ); + const lastTransactionHeight = parseInt( + transactionFiles[transactionFiles.length - 1].split('.')[0], + 10, + ); + const transactionCount = await countIds({ + folder: TRANSACTIONS_DIR, + files: transactionFiles, + }); + + const firstBundleHeight = parseInt(bundleFiles[0].split('.')[0], 10); + const lastBundleHeight = parseInt( + bundleFiles[bundleFiles.length - 1].split('.')[0], + 10, + ); + const bundleCount = await countIds({ + folder: BUNDLES_DIR, + files: bundleFiles, + }); + + console.log( + `Total transactions from ${firstTransactionHeight} to ${lastTransactionHeight}: ${transactionCount}`, + ); + + console.log( + `Total bundles from ${firstBundleHeight} to ${lastBundleHeight}: ${bundleCount}`, + ); +})(); diff --git a/scripts/import-data/import-data.ts b/scripts/import-data/import-data.ts index f0a32831..57f30eb9 100644 --- a/scripts/import-data/import-data.ts +++ b/scripts/import-data/import-data.ts @@ -18,7 +18,11 @@ import * as fs from 'node:fs/promises'; import path from 'node:path'; import { fileURLToPath } from 'node:url'; -import { fetchLatestBlockHeight, fetchWithRetry } from './utils.js'; +import { + fetchLatestBlockHeight, + fetchWithRetry, + getFilesInRange, +} from './utils.js'; const args = process.argv.slice(2); const __filename = fileURLToPath(import.meta.url); const __dirname = path.dirname(__filename); @@ -102,32 +106,6 @@ args.forEach((arg, index) => { } }); -const getFilesInRange = async ({ - folder, - min, - max, -}: { - folder: string; - min: number; - max: number; -}): Promise => { - try { - const files = await fs.readdir(folder); - - const filesInRange = files - .filter((file) => path.extname(file) === '.json') - .filter((file) => { - const match = file.match(/^\d+/); - const number = match ? parseInt(match[0], 10) : null; - return number !== null && number >= min && number <= max; - }); - - return filesInRange; - } catch (error) { - throw new Error(`Error processing files: ${(error as Error).message}`); - } -}; - const importFromFiles = async ({ files, type, diff --git a/scripts/import-data/utils.ts b/scripts/import-data/utils.ts index ad3fe90a..ae4b13f5 100644 --- a/scripts/import-data/utils.ts +++ b/scripts/import-data/utils.ts @@ -16,6 +16,9 @@ * along with this program. If not, see . */ +import * as fs from 'node:fs/promises'; +import path from 'node:path'; + export const fetchWithRetry = async ( url: string, options: RequestInit = {}, @@ -28,6 +31,7 @@ export const fetchWithRetry = async ( try { const response = await fetch(url, options); + console.log({ status: response.status }); if (response.ok) { return response; } @@ -68,3 +72,29 @@ export const fetchLatestBlockHeight = async () => { const { blocks } = await response.json(); return blocks as number; }; + +export const getFilesInRange = async ({ + folder, + min, + max, +}: { + folder: string; + min: number; + max: number; +}): Promise => { + try { + const files = await fs.readdir(folder); + + const filesInRange = files + .filter((file) => path.extname(file) === '.json') + .filter((file) => { + const match = file.match(/^\d+/); + const number = match ? parseInt(match[0], 10) : null; + return number !== null && number >= min && number <= max; + }); + + return filesInRange; + } catch (error) { + throw new Error(`Error processing files: ${(error as Error).message}`); + } +}; From 035c895cf3290302d3352b32efcebe8deeb7e44a Mon Sep 17 00:00:00 2001 From: Karl Prieb Date: Fri, 13 Dec 2024 13:37:03 -0300 Subject: [PATCH 14/16] chore: add export parquet and fixes to fetch-data-gql --- .gitignore | 1 + scripts/import-data/count-fetched-ids.ts | 106 +++++++-------------- scripts/import-data/export-parquet.ts | 114 +++++++++++++++++++++++ scripts/import-data/fetch-data-gql.ts | 9 +- scripts/import-data/utils.ts | 2 +- 5 files changed, 156 insertions(+), 76 deletions(-) create mode 100644 scripts/import-data/export-parquet.ts diff --git a/.gitignore b/.gitignore index b2b5210d..51c23f50 100644 --- a/.gitignore +++ b/.gitignore @@ -16,6 +16,7 @@ test-results.xml /scripts/import-data/bundles /scripts/import-data/transactions +/scripts/import-data/parquet # Generated docs /docs/sqlite/bundles diff --git a/scripts/import-data/count-fetched-ids.ts b/scripts/import-data/count-fetched-ids.ts index 7f9dbd69..69a3c851 100644 --- a/scripts/import-data/count-fetched-ids.ts +++ b/scripts/import-data/count-fetched-ids.ts @@ -83,52 +83,6 @@ const countIds = async ({ return counter; }; -// const importFromFiles = async ({ -// files, -// type, -// }: { -// files: string[]; -// type: 'transactions' | 'bundles'; -// }) => { -// let counter = 0; -// let folder: string; -// let endpoint: string; -// switch (type) { -// case 'transactions': -// folder = TRANSACTIONS_DIR; -// endpoint = `${ARIO_ENDPOINT}/ar-io/admin/queue-tx`; -// break; -// case 'bundles': -// folder = BUNDLES_DIR; -// endpoint = `${ARIO_ENDPOINT}/ar-io/admin/queue-bundle`; -// break; -// default: -// throw new Error('Invalid type'); -// } - -// for (const file of files) { -// const filePath = path.join(folder, file); -// const ids = JSON.parse(await fs.readFile(filePath, 'utf-8')) as string[]; -// console.log( -// `Importing ${ids.length} ${type} from block ${file.split('.')[0]}`, -// ); - -// for (const id of ids) { -// counter++; -// await fetchWithRetry(endpoint, { -// method: 'POST', -// headers: { -// 'Content-Type': 'application/json', -// Authorization: `Bearer ${ADMIN_KEY}`, -// }, -// body: JSON.stringify({ id }), -// }); -// } -// } - -// return { queued: counter }; -// }; - (async () => { const transactionFiles = await getFilesInRange({ folder: TRANSACTIONS_DIR, @@ -141,34 +95,40 @@ const countIds = async ({ max: MAX_BLOCK_HEIGHT, }); - const firstTransactionHeight = parseInt( - transactionFiles[0].split('.')[0], - 10, - ); - const lastTransactionHeight = parseInt( - transactionFiles[transactionFiles.length - 1].split('.')[0], - 10, - ); - const transactionCount = await countIds({ - folder: TRANSACTIONS_DIR, - files: transactionFiles, - }); + console.log({ transactionFiles, bundleFiles }); - const firstBundleHeight = parseInt(bundleFiles[0].split('.')[0], 10); - const lastBundleHeight = parseInt( - bundleFiles[bundleFiles.length - 1].split('.')[0], - 10, - ); - const bundleCount = await countIds({ - folder: BUNDLES_DIR, - files: bundleFiles, - }); + if (transactionFiles.length > 0) { + const firstTransactionHeight = parseInt( + transactionFiles[0].split('.')[0], + 10, + ); + const lastTransactionHeight = parseInt( + transactionFiles[transactionFiles.length - 1].split('.')[0], + 10, + ); + const transactionCount = await countIds({ + folder: TRANSACTIONS_DIR, + files: transactionFiles, + }); - console.log( - `Total transactions from ${firstTransactionHeight} to ${lastTransactionHeight}: ${transactionCount}`, - ); + console.log( + `Total transactions from ${firstTransactionHeight} to ${lastTransactionHeight}: ${transactionCount}`, + ); + } - console.log( - `Total bundles from ${firstBundleHeight} to ${lastBundleHeight}: ${bundleCount}`, - ); + if (bundleFiles.length > 0) { + const firstBundleHeight = parseInt(bundleFiles[0].split('.')[0], 10); + const lastBundleHeight = parseInt( + bundleFiles[bundleFiles.length - 1].split('.')[0], + 10, + ); + const bundleCount = await countIds({ + folder: BUNDLES_DIR, + files: bundleFiles, + }); + + console.log( + `Total bundles from ${firstBundleHeight} to ${lastBundleHeight}: ${bundleCount}`, + ); + } })(); diff --git a/scripts/import-data/export-parquet.ts b/scripts/import-data/export-parquet.ts new file mode 100644 index 00000000..4e041542 --- /dev/null +++ b/scripts/import-data/export-parquet.ts @@ -0,0 +1,114 @@ +/** + * AR.IO Gateway + * Copyright (C) 2022-2023 Permanent Data Solutions, Inc. All Rights Reserved. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +import path from 'node:path'; +import { fileURLToPath } from 'node:url'; +import { fetchLatestBlockHeight, fetchWithRetry } from './utils.js'; +const args = process.argv.slice(2); +const __filename = fileURLToPath(import.meta.url); +const __dirname = path.dirname(__filename); + +let ARIO_ENDPOINT = 'http://localhost:4000'; +let ADMIN_KEY: string | undefined; +let OUTPUT_DIR = path.join(__dirname, 'parquet'); +let MAX_FILE_ROWS = 1_000_000; +let MIN_BLOCK_HEIGHT = 0; +let MAX_BLOCK_HEIGHT: number | undefined; + +args.forEach((arg, index) => { + switch (arg) { + case '--adminKey': + if (args[index + 1]) { + ADMIN_KEY = args[index + 1]; + } else { + console.error('Missing value for --adminKey'); + process.exit(1); + } + break; + case '--arioNode': + if (args[index + 1]) { + ARIO_ENDPOINT = args[index + 1]; + } else { + console.error('Missing value for --arioNode'); + process.exit(1); + } + break; + case '--outputDir': + if (args[index + 1]) { + OUTPUT_DIR = args[index + 1]; + } else { + console.error('Missing value for --outputDir'); + process.exit(1); + } + break; + case '--minHeight': + if (args[index + 1]) { + MIN_BLOCK_HEIGHT = parseInt(args[index + 1], 10); + } else { + console.error('Missing value for --minHeight'); + process.exit(1); + } + break; + case '--maxHeight': + if (args[index + 1]) { + MAX_BLOCK_HEIGHT = parseInt(args[index + 1], 10); + } else { + console.error('Missing value for --maxHeight'); + process.exit(1); + } + break; + + case '--maxFileRows': + if (args[index + 1]) { + MAX_FILE_ROWS = parseInt(args[index + 1], 10); + } else { + console.error('Missing value for --maxFileRows'); + process.exit(1); + } + break; + default: + break; + } +}); + +(async () => { + if (ADMIN_KEY === undefined) { + throw new Error('Missing admin key'); + } + + if (MAX_BLOCK_HEIGHT === undefined) { + MAX_BLOCK_HEIGHT = await fetchLatestBlockHeight(); + } + + await fetchWithRetry(`${ARIO_ENDPOINT}/ar-io/admin/export-parquet`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${ADMIN_KEY}`, + }, + body: JSON.stringify({ + outputDir: OUTPUT_DIR, + startHeight: MIN_BLOCK_HEIGHT, + endHeight: MAX_BLOCK_HEIGHT, + maxFileRows: MAX_FILE_ROWS, + }), + }); + + console.log( + `Parquet export started from block ${MIN_BLOCK_HEIGHT} to ${MAX_BLOCK_HEIGHT}`, + ); +})(); diff --git a/scripts/import-data/fetch-data-gql.ts b/scripts/import-data/fetch-data-gql.ts index 7b6d4945..69bf3f70 100644 --- a/scripts/import-data/fetch-data-gql.ts +++ b/scripts/import-data/fetch-data-gql.ts @@ -199,7 +199,7 @@ const getRootTxId = async (txId: string) => { } const { data } = result; - const bundleId = data.transaction.bundledIn?.id; + const bundleId = data.transaction?.bundledIn?.id; if (bundleId === undefined) { rootTxId = currentId; @@ -243,6 +243,7 @@ const getTransactionsForRange = async ({ min, max }: BlockRange) => { let hasNextPage = true; const transactions: BlockTransactions = new Map(); const bundles: BlockTransactions = new Map(); + const rootTxIdsForBundles: Map = new Map(); while (hasNextPage) { const { @@ -270,7 +271,11 @@ const getTransactionsForRange = async ({ min, max }: BlockRange) => { if (bundleId !== undefined) { if (BUNDLES_FETCH_ROOT_TX) { - const rootTxId = await getRootTxId(bundleId); + let rootTxId = rootTxIdsForBundles.get(bundleId); + if (rootTxId === undefined) { + rootTxId = await getRootTxId(bundleId); + rootTxIdsForBundles.set(bundleId, rootTxId); + } bundles.get(blockHeight)?.add(rootTxId); } else { bundles.get(blockHeight)?.add(bundleId); diff --git a/scripts/import-data/utils.ts b/scripts/import-data/utils.ts index ae4b13f5..59315f4f 100644 --- a/scripts/import-data/utils.ts +++ b/scripts/import-data/utils.ts @@ -31,7 +31,7 @@ export const fetchWithRetry = async ( try { const response = await fetch(url, options); - console.log({ status: response.status }); + console.log(response); if (response.ok) { return response; } From 3e409f457faeadbe05c45a7b698995e62aa50eeb Mon Sep 17 00:00:00 2001 From: Karl Prieb Date: Fri, 13 Dec 2024 14:18:17 -0300 Subject: [PATCH 15/16] fix: track missing root tx ids --- scripts/import-data/fetch-data-gql.ts | 40 +++++++++++++++++++++++---- scripts/import-data/utils.ts | 1 - 2 files changed, 35 insertions(+), 6 deletions(-) diff --git a/scripts/import-data/fetch-data-gql.ts b/scripts/import-data/fetch-data-gql.ts index 69bf3f70..fe4fd2ce 100644 --- a/scripts/import-data/fetch-data-gql.ts +++ b/scripts/import-data/fetch-data-gql.ts @@ -199,7 +199,13 @@ const getRootTxId = async (txId: string) => { } const { data } = result; - const bundleId = data.transaction?.bundledIn?.id; + + if (data.transaction === null) { + console.warn("Can't get any results while fetching rootTxId for", txId); + return; + } + + const bundleId = data.transaction.bundledIn?.id; if (bundleId === undefined) { rootTxId = currentId; @@ -237,6 +243,8 @@ const fetchGql = async ({ return data; }; +const txsMissingRootTx = new Set(); + type BlockTransactions = Map>; const getTransactionsForRange = async ({ min, max }: BlockRange) => { let cursor: string | undefined; @@ -271,12 +279,15 @@ const getTransactionsForRange = async ({ min, max }: BlockRange) => { if (bundleId !== undefined) { if (BUNDLES_FETCH_ROOT_TX) { - let rootTxId = rootTxIdsForBundles.get(bundleId); + const cachedRootTxId = rootTxIdsForBundles.get(bundleId); + const rootTxId = cachedRootTxId ?? (await getRootTxId(bundleId)); + if (rootTxId === undefined) { - rootTxId = await getRootTxId(bundleId); + txsMissingRootTx.add(bundleId); + } else { rootTxIdsForBundles.set(bundleId, rootTxId); + bundles.get(blockHeight)?.add(rootTxId); } - bundles.get(blockHeight)?.add(rootTxId); } else { bundles.get(blockHeight)?.add(bundleId); } @@ -359,7 +370,7 @@ const countTransactions = (map: BlockTransactions) => { txCount += countTransactions(transactions); bundleCount += countTransactions(bundles); - if (transactions.size !== 0 || bundles.size !== 0) { + if (transactions.size > 0 || bundles.size > 0) { console.log( `Transactions and bundles from block ${range.min} to ${range.max} saved!`, ); @@ -367,5 +378,24 @@ const countTransactions = (map: BlockTransactions) => { `Saved transactions: ${txCount}, Saved bundles: ${bundleCount}`, ); } + + if (txsMissingRootTx.size > 0) { + console.warn( + `Failed to fetch rootTxId for ${txsMissingRootTx.size} transactions`, + ); + + const txMissingRootTxPath = path.join( + __dirname, + 'txs-missing-root-tx-id.json', + ); + await fs.writeFile( + txMissingRootTxPath, + JSON.stringify([...txsMissingRootTx]), + ); + + console.log( + 'Transactions missing root transaction id were written to txs-missing-root-tx-id.json file', + ); + } } })(); diff --git a/scripts/import-data/utils.ts b/scripts/import-data/utils.ts index 59315f4f..b2e7cecc 100644 --- a/scripts/import-data/utils.ts +++ b/scripts/import-data/utils.ts @@ -31,7 +31,6 @@ export const fetchWithRetry = async ( try { const response = await fetch(url, options); - console.log(response); if (response.ok) { return response; } From 5e8fb3885cbac272655ffddc3c30defd805db653 Mon Sep 17 00:00:00 2001 From: Karl Prieb Date: Fri, 13 Dec 2024 14:38:12 -0300 Subject: [PATCH 16/16] fix: wait for parquet export to finish --- scripts/import-data/export-parquet.ts | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/scripts/import-data/export-parquet.ts b/scripts/import-data/export-parquet.ts index 4e041542..6b504042 100644 --- a/scripts/import-data/export-parquet.ts +++ b/scripts/import-data/export-parquet.ts @@ -111,4 +111,28 @@ args.forEach((arg, index) => { console.log( `Parquet export started from block ${MIN_BLOCK_HEIGHT} to ${MAX_BLOCK_HEIGHT}`, ); + + let isComplete = false; + + while (!isComplete) { + const response = await fetchWithRetry( + `${ARIO_ENDPOINT}/ar-io/admin/export-parquet/status`, + { + method: 'GET', + headers: { + Authorization: `Bearer ${ADMIN_KEY}`, + }, + }, + ); + + const data = await response.json(); + isComplete = data.status === 'completed'; + + if (isComplete) { + console.log('Parque export finished!'); + console.log(data); + } else { + await new Promise((resolve) => setTimeout(resolve, 5000)); + } + } })();