diff --git a/.gitignore b/.gitignore index 4f69103f..51c23f50 100644 --- a/.gitignore +++ b/.gitignore @@ -14,6 +14,9 @@ /vendor /wallets test-results.xml +/scripts/import-data/bundles +/scripts/import-data/transactions +/scripts/import-data/parquet # Generated docs /docs/sqlite/bundles diff --git a/docker-compose.yaml b/docker-compose.yaml index 3d10bf7e..6a2bf532 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -108,6 +108,7 @@ services: - ENABLE_BACKGROUND_DATA_VERIFICATION=${ENABLE_BACKGROUND_DATA_VERIFICATION:-} - BACKGROUND_DATA_VERIFICATION_INTERVAL_SECONDS=${BACKGROUND_DATA_VERIFICATION_INTERVAL_SECONDS:-} - CLICKHOUSE_URL=${CLICKHOUSE_URL:-} + - BUNDLE_DATA_IMPORTER_QUEUE_SIZE=${BUNDLE_DATA_IMPORTER_QUEUE_SIZE:-} networks: - ar-io-network depends_on: diff --git a/scripts/import-data/count-fetched-ids.ts b/scripts/import-data/count-fetched-ids.ts new file mode 100644 index 00000000..69a3c851 --- /dev/null +++ b/scripts/import-data/count-fetched-ids.ts @@ -0,0 +1,134 @@ +/** + * AR.IO Gateway + * Copyright (C) 2022-2023 Permanent Data Solutions, Inc. All Rights Reserved. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ */ +import * as fs from 'node:fs/promises'; +import path from 'node:path'; +import { fileURLToPath } from 'node:url'; +import { getFilesInRange } from './utils.js'; +const args = process.argv.slice(2); +const __filename = fileURLToPath(import.meta.url); +const __dirname = path.dirname(__filename); + +let TRANSACTIONS_DIR = path.join(__dirname, 'transactions'); +let BUNDLES_DIR = path.join(__dirname, 'bundles'); +let MIN_BLOCK_HEIGHT = 0; +let MAX_BLOCK_HEIGHT = Infinity; + +args.forEach((arg, index) => { + switch (arg) { + case '--transactionsDir': + if (args[index + 1]) { + TRANSACTIONS_DIR = args[index + 1]; + } else { + console.error('Missing value for --transactionsDir'); + process.exit(1); + } + break; + case '--bundlesDir': + if (args[index + 1]) { + BUNDLES_DIR = args[index + 1]; + } else { + console.error('Missing value for --bundlesDir'); + process.exit(1); + } + break; + case '--minHeight': + if (args[index + 1]) { + MIN_BLOCK_HEIGHT = parseInt(args[index + 1], 10); + } else { + console.error('Missing value for --minHeight'); + process.exit(1); + } + break; + case '--maxHeight': + if (args[index + 1]) { + MAX_BLOCK_HEIGHT = parseInt(args[index + 1], 10); + } else { + console.error('Missing value for --maxHeight'); + process.exit(1); + } + break; + default: + break; + } +}); + +const countIds = async ({ + folder, + files, +}: { + folder: string; + files: string[]; +}) => { + let counter = 0; + for (const file of files) { + const filePath = path.join(folder, file); + const ids = JSON.parse(await fs.readFile(filePath, 'utf-8')) as string[]; + counter += ids.length; + } + return counter; +}; + +(async () => { + const transactionFiles = await getFilesInRange({ + folder: TRANSACTIONS_DIR, + min: MIN_BLOCK_HEIGHT, + max: MAX_BLOCK_HEIGHT, + }); + const bundleFiles = await getFilesInRange({ + folder: BUNDLES_DIR, + min: MIN_BLOCK_HEIGHT, + max: MAX_BLOCK_HEIGHT, + }); + + console.log({ transactionFiles, bundleFiles }); + + if (transactionFiles.length > 0) { + 
const firstTransactionHeight = parseInt( + transactionFiles[0].split('.')[0], + 10, + ); + const lastTransactionHeight = parseInt( + transactionFiles[transactionFiles.length - 1].split('.')[0], + 10, + ); + const transactionCount = await countIds({ + folder: TRANSACTIONS_DIR, + files: transactionFiles, + }); + + console.log( + `Total transactions from ${firstTransactionHeight} to ${lastTransactionHeight}: ${transactionCount}`, + ); + } + + if (bundleFiles.length > 0) { + const firstBundleHeight = parseInt(bundleFiles[0].split('.')[0], 10); + const lastBundleHeight = parseInt( + bundleFiles[bundleFiles.length - 1].split('.')[0], + 10, + ); + const bundleCount = await countIds({ + folder: BUNDLES_DIR, + files: bundleFiles, + }); + + console.log( + `Total bundles from ${firstBundleHeight} to ${lastBundleHeight}: ${bundleCount}`, + ); + } +})(); diff --git a/scripts/import-data/export-parquet.ts b/scripts/import-data/export-parquet.ts new file mode 100644 index 00000000..6b504042 --- /dev/null +++ b/scripts/import-data/export-parquet.ts @@ -0,0 +1,138 @@ +/** + * AR.IO Gateway + * Copyright (C) 2022-2023 Permanent Data Solutions, Inc. All Rights Reserved. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ */ +import path from 'node:path'; +import { fileURLToPath } from 'node:url'; +import { fetchLatestBlockHeight, fetchWithRetry } from './utils.js'; +const args = process.argv.slice(2); +const __filename = fileURLToPath(import.meta.url); +const __dirname = path.dirname(__filename); + +let ARIO_ENDPOINT = 'http://localhost:4000'; +let ADMIN_KEY: string | undefined; +let OUTPUT_DIR = path.join(__dirname, 'parquet'); +let MAX_FILE_ROWS = 1_000_000; +let MIN_BLOCK_HEIGHT = 0; +let MAX_BLOCK_HEIGHT: number | undefined; + +args.forEach((arg, index) => { + switch (arg) { + case '--adminKey': + if (args[index + 1]) { + ADMIN_KEY = args[index + 1]; + } else { + console.error('Missing value for --adminKey'); + process.exit(1); + } + break; + case '--arioNode': + if (args[index + 1]) { + ARIO_ENDPOINT = args[index + 1]; + } else { + console.error('Missing value for --arioNode'); + process.exit(1); + } + break; + case '--outputDir': + if (args[index + 1]) { + OUTPUT_DIR = args[index + 1]; + } else { + console.error('Missing value for --outputDir'); + process.exit(1); + } + break; + case '--minHeight': + if (args[index + 1]) { + MIN_BLOCK_HEIGHT = parseInt(args[index + 1], 10); + } else { + console.error('Missing value for --minHeight'); + process.exit(1); + } + break; + case '--maxHeight': + if (args[index + 1]) { + MAX_BLOCK_HEIGHT = parseInt(args[index + 1], 10); + } else { + console.error('Missing value for --maxHeight'); + process.exit(1); + } + break; + + case '--maxFileRows': + if (args[index + 1]) { + MAX_FILE_ROWS = parseInt(args[index + 1], 10); + } else { + console.error('Missing value for --maxFileRows'); + process.exit(1); + } + break; + default: + break; + } +}); + +(async () => { + if (ADMIN_KEY === undefined) { + throw new Error('Missing admin key'); + } + + if (MAX_BLOCK_HEIGHT === undefined) { + MAX_BLOCK_HEIGHT = await fetchLatestBlockHeight(); + } + + await fetchWithRetry(`${ARIO_ENDPOINT}/ar-io/admin/export-parquet`, { + method: 'POST', + headers: { + 
'Content-Type': 'application/json', + Authorization: `Bearer ${ADMIN_KEY}`, + }, + body: JSON.stringify({ + outputDir: OUTPUT_DIR, + startHeight: MIN_BLOCK_HEIGHT, + endHeight: MAX_BLOCK_HEIGHT, + maxFileRows: MAX_FILE_ROWS, + }), + }); + + console.log( + `Parquet export started from block ${MIN_BLOCK_HEIGHT} to ${MAX_BLOCK_HEIGHT}`, + ); + + let isComplete = false; + + while (!isComplete) { + const response = await fetchWithRetry( + `${ARIO_ENDPOINT}/ar-io/admin/export-parquet/status`, + { + method: 'GET', + headers: { + Authorization: `Bearer ${ADMIN_KEY}`, + }, + }, + ); + + const data = await response.json(); + isComplete = data.status === 'completed'; + + if (isComplete) { + console.log('Parquet export finished!'); + console.log(data); + } else { + await new Promise((resolve) => setTimeout(resolve, 5000)); + } + } +})(); diff --git a/scripts/import-data/fetch-data-gql.ts b/scripts/import-data/fetch-data-gql.ts new file mode 100644 index 00000000..fe4fd2ce --- /dev/null +++ b/scripts/import-data/fetch-data-gql.ts @@ -0,0 +1,401 @@ +/** + * AR.IO Gateway + * Copyright (C) 2022-2023 Permanent Data Solutions, Inc. All Rights Reserved. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ */ +import * as fs from 'node:fs/promises'; +import path from 'node:path'; +import { fileURLToPath } from 'node:url'; +import { fetchLatestBlockHeight, fetchWithRetry } from './utils.js'; +const args = process.argv.slice(2); +const __filename = fileURLToPath(import.meta.url); +const __dirname = path.dirname(__filename); + +let GQL_ENDPOINT = 'https://arweave-search.goldsky.com/graphql'; +let MIN_BLOCK_HEIGHT = 0; +let MAX_BLOCK_HEIGHT: number | undefined; +let BLOCK_RANGE_SIZE = 100; +let BUNDLES_FETCH_ROOT_TX = true; +let GQL_TAGS = `[ + { + name: "App-Name" + values: [ + "ArDrive-App" + "ArDrive-Web" + "ArDrive-CLI" + "ArDrive-Desktop" + "ArDrive-Mobile" + "ArDrive-Core" + "ArDrive-Sync" + ] + } +]`; + +args.forEach((arg, index) => { + switch (arg) { + case '--gqlEndpoint': + if (args[index + 1]) { + GQL_ENDPOINT = args[index + 1]; + } else { + console.error('Missing value for --gqlEndpoint'); + process.exit(1); + } + break; + case '--minHeight': + if (args[index + 1]) { + MIN_BLOCK_HEIGHT = parseInt(args[index + 1], 10); + } else { + console.error('Missing value for --minHeight'); + process.exit(1); + } + break; + case '--maxHeight': + if (args[index + 1]) { + MAX_BLOCK_HEIGHT = parseInt(args[index + 1], 10); + } else { + console.error('Missing value for --maxHeight'); + process.exit(1); + } + break; + case '--blockRangeSize': + if (args[index + 1]) { + BLOCK_RANGE_SIZE = parseInt(args[index + 1], 10); + } else { + console.error('Missing value for --blockRangeSize'); + process.exit(1); + } + break; + case '--fetchOnlyRootTx': + if (args[index + 1]) { + BUNDLES_FETCH_ROOT_TX = args[index + 1] === 'true'; + } else { + console.error('Missing value for --fetchOnlyRootTx'); + process.exit(1); + } + break; + case '--gqlTags': + if (args[index + 1]) { + GQL_TAGS = args[index + 1]; + } else { + console.error('Missing value for --gqlTags'); + process.exit(1); + } + break; + default: + break; + } +}); + +type BlockRange = { min: number; max: number }; +const 
getBlockRanges = ({ + minBlock, + maxBlock, + rangeSize, +}: { + minBlock: number; + maxBlock: number; + rangeSize: number; +}) => { + if (minBlock >= maxBlock || rangeSize <= 0) { + throw new Error( + 'Invalid input: ensure minBlock < maxBlock and rangeSize > 0', + ); + } + + const ranges: BlockRange[] = []; + let currentMin = minBlock; + + while (currentMin < maxBlock) { + const currentMax = Math.min(currentMin + rangeSize - 1, maxBlock); + ranges.push({ min: currentMin, max: currentMax }); + currentMin = currentMax + 1; + } + + return ranges; +}; + +const txsGqlQuery = ({ + minBlock, + maxBlock, + cursor, +}: { + minBlock: number; + maxBlock: number; + cursor?: string; +}) => ` +query { + transactions( + block: { + min: ${minBlock} + max: ${maxBlock} + } + tags: ${GQL_TAGS} + first: 100 + sort: HEIGHT_ASC + after: "${cursor !== undefined ? cursor : ''}" + ) { + pageInfo { + hasNextPage + } + edges { + cursor + node { + id + bundledIn { + id + } + block { + height + } + } + } + } +} +`; + +const rootTxGqlQuery = (txId: string) => ` +query { + transaction( + id: "${txId}" + ) { + bundledIn { + id + } + } +} +`; + +const getRootTxId = async (txId: string) => { + let rootTxId: string | undefined; + let currentId = txId; + + while (rootTxId === undefined) { + const response = await fetchWithRetry(GQL_ENDPOINT, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + query: rootTxGqlQuery(currentId), + }), + }); + + const result = await response.json(); + + if (result.errors) { + throw new Error(`GraphQL error: ${JSON.stringify(result.errors)}`); + } + + const { data } = result; + + if (data.transaction === null) { + console.warn("Can't get any results while fetching rootTxId for", txId); + return; + } + + const bundleId = data.transaction.bundledIn?.id; + + if (bundleId === undefined) { + rootTxId = currentId; + } else { + currentId = bundleId; + } + } + + return rootTxId; +}; + +const fetchGql = async ({ + minBlock, + 
maxBlock, + cursor, +}: { + minBlock: number; + maxBlock: number; + cursor?: string; +}) => { + const response = await fetchWithRetry(GQL_ENDPOINT, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + query: txsGqlQuery({ minBlock, maxBlock, cursor }), + }), + }); + const result = await response.json(); + if (result.errors) { + throw new Error(`GraphQL error: ${JSON.stringify(result.errors)}`); + } + const { data } = result; + return data; +}; + +const txsMissingRootTx = new Set(); + +type BlockTransactions = Map>; +const getTransactionsForRange = async ({ min, max }: BlockRange) => { + let cursor: string | undefined; + let hasNextPage = true; + const transactions: BlockTransactions = new Map(); + const bundles: BlockTransactions = new Map(); + const rootTxIdsForBundles: Map = new Map(); + + while (hasNextPage) { + const { + transactions: { edges, pageInfo }, + } = await fetchGql({ + minBlock: min, + maxBlock: max, + cursor, + }); + + hasNextPage = pageInfo.hasNextPage; + cursor = hasNextPage ? edges[edges.length - 1].cursor : undefined; + + for (const edge of edges) { + const blockHeight = edge.node.block.height; + const bundleId = edge.node.bundledIn?.id; + const id = edge.node.id; + + if (!transactions.has(blockHeight)) { + transactions.set(blockHeight, new Set()); + } + if (!bundles.has(blockHeight)) { + bundles.set(blockHeight, new Set()); + } + + if (bundleId !== undefined) { + if (BUNDLES_FETCH_ROOT_TX) { + const cachedRootTxId = rootTxIdsForBundles.get(bundleId); + const rootTxId = cachedRootTxId ?? 
(await getRootTxId(bundleId)); + + if (rootTxId === undefined) { + txsMissingRootTx.add(bundleId); + } else { + rootTxIdsForBundles.set(bundleId, rootTxId); + bundles.get(blockHeight)?.add(rootTxId); + } + } else { + bundles.get(blockHeight)?.add(bundleId); + } + } else { + transactions.get(blockHeight)?.add(id); + } + } + } + + return { transactions, bundles }; +}; + +const writeTransactionsToFile = async ({ + outputDir, + transactions, +}: { + outputDir: string; + transactions: BlockTransactions; +}) => { + try { + await fs.mkdir(outputDir, { recursive: true }); + } catch (error) { + console.error(`Failed to create directory: ${error}`); + throw error; + } + + for (const [height, ids] of transactions.entries()) { + if (ids.size === 0) continue; + + const content = JSON.stringify([...ids]); + const filePath = path.join(outputDir, `${height}.json`); + + try { + await fs.writeFile(filePath, content); + } catch (error) { + console.error(`Failed to write ${filePath}: ${error}`); + throw error; + } + } +}; + +const countTransactions = (map: BlockTransactions) => { + let total = 0; + map.forEach((set) => { + total += set.size; + }); + return total; +}; + +(async () => { + if (MAX_BLOCK_HEIGHT === undefined) { + MAX_BLOCK_HEIGHT = await fetchLatestBlockHeight(); + } + + const blockRanges = getBlockRanges({ + minBlock: MIN_BLOCK_HEIGHT, + maxBlock: MAX_BLOCK_HEIGHT, + rangeSize: BLOCK_RANGE_SIZE, + }); + + console.log( + `Starting to fetch transactions and bundles from block ${MIN_BLOCK_HEIGHT} to ${MAX_BLOCK_HEIGHT}`, + ); + + let txCount = 0; + let bundleCount = 0; + + for (const range of blockRanges) { + const { transactions, bundles } = await getTransactionsForRange(range); + + await writeTransactionsToFile({ + outputDir: path.join(__dirname, 'transactions'), + transactions, + }); + await writeTransactionsToFile({ + outputDir: path.join(__dirname, 'bundles'), + transactions: bundles, + }); + + txCount += countTransactions(transactions); + bundleCount += 
countTransactions(bundles); + + if (transactions.size > 0 || bundles.size > 0) { + console.log( + `Transactions and bundles from block ${range.min} to ${range.max} saved!`, + ); + console.log( + `Saved transactions: ${txCount}, Saved bundles: ${bundleCount}`, + ); + } + + if (txsMissingRootTx.size > 0) { + console.warn( + `Failed to fetch rootTxId for ${txsMissingRootTx.size} transactions`, + ); + + const txMissingRootTxPath = path.join( + __dirname, + 'txs-missing-root-tx-id.json', + ); + await fs.writeFile( + txMissingRootTxPath, + JSON.stringify([...txsMissingRootTx]), + ); + + console.log( + 'Transactions missing root transaction id were written to txs-missing-root-tx-id.json file', + ); + } + } +})(); diff --git a/scripts/import-data/import-data.ts b/scripts/import-data/import-data.ts new file mode 100644 index 00000000..57f30eb9 --- /dev/null +++ b/scripts/import-data/import-data.ts @@ -0,0 +1,217 @@ +/** + * AR.IO Gateway + * Copyright (C) 2022-2023 Permanent Data Solutions, Inc. All Rights Reserved. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ */ +import * as fs from 'node:fs/promises'; +import path from 'node:path'; +import { fileURLToPath } from 'node:url'; +import { + fetchLatestBlockHeight, + fetchWithRetry, + getFilesInRange, +} from './utils.js'; +const args = process.argv.slice(2); +const __filename = fileURLToPath(import.meta.url); +const __dirname = path.dirname(__filename); + +let ARIO_ENDPOINT = 'http://localhost:4000'; +let ADMIN_KEY: string | undefined; +let MIN_BLOCK_HEIGHT = 0; +let MAX_BLOCK_HEIGHT: number | undefined; +let TRANSACTIONS_DIR = path.join(__dirname, 'transactions'); +let BUNDLES_DIR = path.join(__dirname, 'bundles'); + +type ImportType = 'transaction' | 'bundle' | 'both'; +let IMPORT_TYPE: ImportType | undefined; + +args.forEach((arg, index) => { + switch (arg) { + case '--adminKey': + if (args[index + 1]) { + ADMIN_KEY = args[index + 1]; + } else { + console.error('Missing value for --adminKey'); + process.exit(1); + } + break; + case '--arioNode': + if (args[index + 1]) { + ARIO_ENDPOINT = args[index + 1]; + } else { + console.error('Missing value for --arioNode'); + process.exit(1); + } + break; + case '--minHeight': + if (args[index + 1]) { + MIN_BLOCK_HEIGHT = parseInt(args[index + 1], 10); + } else { + console.error('Missing value for --minHeight'); + process.exit(1); + } + break; + case '--maxHeight': + if (args[index + 1]) { + MAX_BLOCK_HEIGHT = parseInt(args[index + 1], 10); + } else { + console.error('Missing value for --maxHeight'); + process.exit(1); + } + break; + case '--transactionsDir': + if (args[index + 1]) { + TRANSACTIONS_DIR = args[index + 1]; + } else { + console.error('Missing value for --transactionsDir'); + process.exit(1); + } + break; + case '--bundlesDir': + if (args[index + 1]) { + BUNDLES_DIR = args[index + 1]; + } else { + console.error('Missing value for --bundlesDir'); + process.exit(1); + } + break; + case '--importType': { + const importType = args[index + 1]; + if ( + importType === 'transaction' || + importType === 'bundle' || + 
importType === 'both' + ) { + IMPORT_TYPE = importType; + } else { + console.error('Invalid or missing value for --importType'); + process.exit(1); + } + break; + } + default: + break; + } +}); + +const importFromFiles = async ({ + files, + type, +}: { + files: string[]; + type: 'transactions' | 'bundles'; +}) => { + let counter = 0; + let folder: string; + let endpoint: string; + switch (type) { + case 'transactions': + folder = TRANSACTIONS_DIR; + endpoint = `${ARIO_ENDPOINT}/ar-io/admin/queue-tx`; + break; + case 'bundles': + folder = BUNDLES_DIR; + endpoint = `${ARIO_ENDPOINT}/ar-io/admin/queue-bundle`; + break; + default: + throw new Error('Invalid type'); + } + + for (const file of files) { + const filePath = path.join(folder, file); + const ids = JSON.parse(await fs.readFile(filePath, 'utf-8')) as string[]; + console.log( + `Importing ${ids.length} ${type} from block ${file.split('.')[0]}`, + ); + + for (const id of ids) { + counter++; + await fetchWithRetry(endpoint, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${ADMIN_KEY}`, + }, + body: JSON.stringify({ id }), + }); + } + } + + return { queued: counter }; +}; + +(async () => { + if (ADMIN_KEY === undefined) { + throw new Error('Missing admin key'); + } + + if (MAX_BLOCK_HEIGHT === undefined) { + MAX_BLOCK_HEIGHT = await fetchLatestBlockHeight(); + } + + let transactionFiles: string[] = []; + let bundleFiles: string[] = []; + + switch (IMPORT_TYPE ?? 
'both') { + case 'transaction': + transactionFiles = await getFilesInRange({ + folder: TRANSACTIONS_DIR, + min: MIN_BLOCK_HEIGHT, + max: MAX_BLOCK_HEIGHT, + }); + break; + case 'bundle': + bundleFiles = await getFilesInRange({ + folder: BUNDLES_DIR, + min: MIN_BLOCK_HEIGHT, + max: MAX_BLOCK_HEIGHT, + }); + break; + case 'both': + transactionFiles = await getFilesInRange({ + folder: TRANSACTIONS_DIR, + min: MIN_BLOCK_HEIGHT, + max: MAX_BLOCK_HEIGHT, + }); + bundleFiles = await getFilesInRange({ + folder: BUNDLES_DIR, + min: MIN_BLOCK_HEIGHT, + max: MAX_BLOCK_HEIGHT, + }); + break; + } + + console.log( + `Starting to import transactions and bundles from block ${MIN_BLOCK_HEIGHT} to ${MAX_BLOCK_HEIGHT}`, + ); + + const queuedTransactions = await importFromFiles({ + files: transactionFiles, + type: 'transactions', + }); + + if (queuedTransactions.queued > 0) { + console.log(`Finished queueing ${queuedTransactions.queued} transactions`); + } + + const queuedBundles = await importFromFiles({ + files: bundleFiles, + type: 'bundles', + }); + + if (queuedBundles.queued > 0) { + console.log(`Finished queueing ${queuedBundles.queued} bundles`); + } +})(); diff --git a/scripts/import-data/utils.ts b/scripts/import-data/utils.ts new file mode 100644 index 00000000..b2e7cecc --- /dev/null +++ b/scripts/import-data/utils.ts @@ -0,0 +1,99 @@ +/** + * AR.IO Gateway + * Copyright (C) 2022-2023 Permanent Data Solutions, Inc. All Rights Reserved. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. 
+ * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +import * as fs from 'node:fs/promises'; +import path from 'node:path'; + +export const fetchWithRetry = async ( + url: string, + options: RequestInit = {}, + retries = 15, + retryInterval = 300, // interval in milliseconds +): Promise => { + let attempt = 0; + + while (attempt < retries) { + try { + const response = await fetch(url, options); + + if (response.ok) { + return response; + } + if (response.status === 429) { + console.warn( + `Import queue is full! Waiting 30 seconds before retrying...`, + ); + await new Promise((resolve) => setTimeout(resolve, 30000)); + continue; + } + + throw new Error(`HTTP error! status: ${response.status}`); + } catch (error) { + attempt++; + + if (attempt >= retries) { + throw new Error( + `Fetch failed after ${retries} attempts: ${(error as Error).message}`, + ); + } + + const waitTime = retryInterval * attempt; + console.warn( + `Fetch attempt ${attempt} failed. Retrying in ${waitTime}ms...`, + ); + + await new Promise((resolve) => setTimeout(resolve, waitTime)); + } + } + + throw new Error('Unexpected error in fetchWithRetry'); +}; + +export const fetchLatestBlockHeight = async () => { + const response = await fetchWithRetry('https://arweave.net/info', { + method: 'GET', + }); + const { blocks } = await response.json(); + return blocks as number; +}; + +export const getFilesInRange = async ({ + folder, + min, + max, +}: { + folder: string; + min: number; + max: number; +}): Promise => { + try { + const files = await fs.readdir(folder); + + const filesInRange = files + .filter((file) => path.extname(file) === '.json') + .filter((file) => { + const match = file.match(/^\d+/); + const number = match ? 
parseInt(match[0], 10) : null; + return number !== null && number >= min && number <= max; + }); + + return filesInRange; + } catch (error) { + throw new Error(`Error processing files: ${(error as Error).message}`); + } +}; diff --git a/src/config.ts b/src/config.ts index a0feac1d..fba4d2e8 100644 --- a/src/config.ts +++ b/src/config.ts @@ -198,6 +198,12 @@ export const MAX_DATA_ITEM_QUEUE_SIZE = +env.varOrDefault( '100000', ); +// The maximum number of bundles to queue for unbundling before skipping +export const BUNDLE_DATA_IMPORTER_QUEUE_SIZE = +env.varOrDefault( + 'BUNDLE_DATA_IMPORTER_QUEUE_SIZE', + '1000', +); + // // Verification // diff --git a/src/routes/ar-io.ts b/src/routes/ar-io.ts index cbe5aa58..067fcf3a 100644 --- a/src/routes/ar-io.ts +++ b/src/routes/ar-io.ts @@ -210,6 +210,11 @@ arIoRouter.post( return; } + if (await system.bundleDataImporter.isQueueFull()) { + res.status(429).send('Bundle importer queue is full'); + return; + } + system.eventEmitter.emit(events.ANS104_BUNDLE_QUEUED, { id, root_tx_id: id, diff --git a/src/system.ts b/src/system.ts index 65c1560e..b802057e 100644 --- a/src/system.ts +++ b/src/system.ts @@ -469,11 +469,12 @@ metrics.registerQueueLengthGauge('ans104Unbundler', { length: () => ans104Unbundler.queueDepth(), }); -const bundleDataImporter = new BundleDataImporter({ +export const bundleDataImporter = new BundleDataImporter({ log, contiguousDataSource: backgroundContiguousDataSource, ans104Unbundler, workerCount: config.ANS104_DOWNLOAD_WORKERS, + maxQueueSize: config.BUNDLE_DATA_IMPORTER_QUEUE_SIZE, }); metrics.registerQueueLengthGauge('bundleDataImporter', { length: () => bundleDataImporter.queueDepth(), diff --git a/src/workers/bundle-data-importer.ts b/src/workers/bundle-data-importer.ts index 8ef9741d..de4d253b 100644 --- a/src/workers/bundle-data-importer.ts +++ b/src/workers/bundle-data-importer.ts @@ -25,8 +25,7 @@ import { NormalizedDataItem, PartialJsonTransaction, } from '../types.js'; - -const 
DEFAULT_MAX_QUEUE_SIZE = 1000; +import * as config from '../config.js'; interface IndexProperty { index: number; @@ -56,7 +55,7 @@ export class BundleDataImporter { contiguousDataSource, ans104Unbundler, workerCount, - maxQueueSize = DEFAULT_MAX_QUEUE_SIZE, + maxQueueSize = config.BUNDLE_DATA_IMPORTER_QUEUE_SIZE, }: { log: winston.Logger; contiguousDataSource: ContiguousDataSource; @@ -138,4 +137,8 @@ export class BundleDataImporter { queueDepth(): number { return this.queue.length(); } + + async isQueueFull(): Promise { + return this.queue.length() >= this.maxQueueSize; + } } diff --git a/tsconfig.json b/tsconfig.json index 70473ac3..ada5b8b8 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -25,5 +25,5 @@ "swc": true, "esm": true }, - "include": ["src", "test"] + "include": ["src", "test", "scripts"] }