diff --git a/.gitignore b/.gitignore
index 4f69103f..51c23f50 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,6 +14,9 @@
/vendor
/wallets
test-results.xml
+/scripts/import-data/bundles
+/scripts/import-data/transactions
+/scripts/import-data/parquet
# Generated docs
/docs/sqlite/bundles
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 3d10bf7e..6a2bf532 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -108,6 +108,7 @@ services:
- ENABLE_BACKGROUND_DATA_VERIFICATION=${ENABLE_BACKGROUND_DATA_VERIFICATION:-}
- BACKGROUND_DATA_VERIFICATION_INTERVAL_SECONDS=${BACKGROUND_DATA_VERIFICATION_INTERVAL_SECONDS:-}
- CLICKHOUSE_URL=${CLICKHOUSE_URL:-}
+ - BUNDLE_DATA_IMPORTER_QUEUE_SIZE=${BUNDLE_DATA_IMPORTER_QUEUE_SIZE:-}
networks:
- ar-io-network
depends_on:
diff --git a/scripts/import-data/count-fetched-ids.ts b/scripts/import-data/count-fetched-ids.ts
new file mode 100644
index 00000000..69a3c851
--- /dev/null
+++ b/scripts/import-data/count-fetched-ids.ts
@@ -0,0 +1,134 @@
+/**
+ * AR.IO Gateway
+ * Copyright (C) 2022-2023 Permanent Data Solutions, Inc. All Rights Reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+import * as fs from 'node:fs/promises';
+import path from 'node:path';
+import { fileURLToPath } from 'node:url';
+import { getFilesInRange } from './utils.js';
+const args = process.argv.slice(2);
+const __filename = fileURLToPath(import.meta.url);
+const __dirname = path.dirname(__filename);
+
+let TRANSACTIONS_DIR = path.join(__dirname, 'transactions');
+let BUNDLES_DIR = path.join(__dirname, 'bundles');
+let MIN_BLOCK_HEIGHT = 0;
+let MAX_BLOCK_HEIGHT = Infinity;
+
+args.forEach((arg, index) => {
+ switch (arg) {
+ case '--transactionsDir':
+ if (args[index + 1]) {
+ TRANSACTIONS_DIR = args[index + 1];
+ } else {
+ console.error('Missing value for --transactionsDir');
+ process.exit(1);
+ }
+ break;
+ case '--bundlesDir':
+ if (args[index + 1]) {
+ BUNDLES_DIR = args[index + 1];
+ } else {
+ console.error('Missing value for --bundlesDir');
+ process.exit(1);
+ }
+ break;
+ case '--minHeight':
+ if (args[index + 1]) {
+ MIN_BLOCK_HEIGHT = parseInt(args[index + 1], 10);
+ } else {
+ console.error('Missing value for --minHeight');
+ process.exit(1);
+ }
+ break;
+ case '--maxHeight':
+ if (args[index + 1]) {
+ MAX_BLOCK_HEIGHT = parseInt(args[index + 1], 10);
+ } else {
+ console.error('Missing value for --maxHeight');
+ process.exit(1);
+ }
+ break;
+ default:
+ break;
+ }
+});
+
+const countIds = async ({
+ folder,
+ files,
+}: {
+ folder: string;
+ files: string[];
+}) => {
+ let counter = 0;
+ for (const file of files) {
+ const filePath = path.join(folder, file);
+ const ids = JSON.parse(await fs.readFile(filePath, 'utf-8')) as string[];
+ counter += ids.length;
+ }
+ return counter;
+};
+
+(async () => {
+ const transactionFiles = await getFilesInRange({
+ folder: TRANSACTIONS_DIR,
+ min: MIN_BLOCK_HEIGHT,
+ max: MAX_BLOCK_HEIGHT,
+ });
+ const bundleFiles = await getFilesInRange({
+ folder: BUNDLES_DIR,
+ min: MIN_BLOCK_HEIGHT,
+ max: MAX_BLOCK_HEIGHT,
+ });
+
+ console.log({ transactionFiles, bundleFiles });
+
+ if (transactionFiles.length > 0) {
+ const firstTransactionHeight = parseInt(
+ transactionFiles[0].split('.')[0],
+ 10,
+ );
+ const lastTransactionHeight = parseInt(
+ transactionFiles[transactionFiles.length - 1].split('.')[0],
+ 10,
+ );
+ const transactionCount = await countIds({
+ folder: TRANSACTIONS_DIR,
+ files: transactionFiles,
+ });
+
+ console.log(
+ `Total transactions from ${firstTransactionHeight} to ${lastTransactionHeight}: ${transactionCount}`,
+ );
+ }
+
+ if (bundleFiles.length > 0) {
+ const firstBundleHeight = parseInt(bundleFiles[0].split('.')[0], 10);
+ const lastBundleHeight = parseInt(
+ bundleFiles[bundleFiles.length - 1].split('.')[0],
+ 10,
+ );
+ const bundleCount = await countIds({
+ folder: BUNDLES_DIR,
+ files: bundleFiles,
+ });
+
+ console.log(
+ `Total bundles from ${firstBundleHeight} to ${lastBundleHeight}: ${bundleCount}`,
+ );
+ }
+})();
diff --git a/scripts/import-data/export-parquet.ts b/scripts/import-data/export-parquet.ts
new file mode 100644
index 00000000..6b504042
--- /dev/null
+++ b/scripts/import-data/export-parquet.ts
@@ -0,0 +1,138 @@
+/**
+ * AR.IO Gateway
+ * Copyright (C) 2022-2023 Permanent Data Solutions, Inc. All Rights Reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+import path from 'node:path';
+import { fileURLToPath } from 'node:url';
+import { fetchLatestBlockHeight, fetchWithRetry } from './utils.js';
+const args = process.argv.slice(2);
+const __filename = fileURLToPath(import.meta.url);
+const __dirname = path.dirname(__filename);
+
+let ARIO_ENDPOINT = 'http://localhost:4000';
+let ADMIN_KEY: string | undefined;
+let OUTPUT_DIR = path.join(__dirname, 'parquet');
+let MAX_FILE_ROWS = 1_000_000;
+let MIN_BLOCK_HEIGHT = 0;
+let MAX_BLOCK_HEIGHT: number | undefined;
+
+args.forEach((arg, index) => {
+ switch (arg) {
+ case '--adminKey':
+ if (args[index + 1]) {
+ ADMIN_KEY = args[index + 1];
+ } else {
+ console.error('Missing value for --adminKey');
+ process.exit(1);
+ }
+ break;
+ case '--arioNode':
+ if (args[index + 1]) {
+ ARIO_ENDPOINT = args[index + 1];
+ } else {
+ console.error('Missing value for --arioNode');
+ process.exit(1);
+ }
+ break;
+ case '--outputDir':
+ if (args[index + 1]) {
+ OUTPUT_DIR = args[index + 1];
+ } else {
+ console.error('Missing value for --outputDir');
+ process.exit(1);
+ }
+ break;
+ case '--minHeight':
+ if (args[index + 1]) {
+ MIN_BLOCK_HEIGHT = parseInt(args[index + 1], 10);
+ } else {
+ console.error('Missing value for --minHeight');
+ process.exit(1);
+ }
+ break;
+ case '--maxHeight':
+ if (args[index + 1]) {
+ MAX_BLOCK_HEIGHT = parseInt(args[index + 1], 10);
+ } else {
+ console.error('Missing value for --maxHeight');
+ process.exit(1);
+ }
+ break;
+
+ case '--maxFileRows':
+ if (args[index + 1]) {
+ MAX_FILE_ROWS = parseInt(args[index + 1], 10);
+ } else {
+ console.error('Missing value for --maxFileRows');
+ process.exit(1);
+ }
+ break;
+ default:
+ break;
+ }
+});
+
+(async () => {
+ if (ADMIN_KEY === undefined) {
+ throw new Error('Missing admin key');
+ }
+
+ if (MAX_BLOCK_HEIGHT === undefined) {
+ MAX_BLOCK_HEIGHT = await fetchLatestBlockHeight();
+ }
+
+ await fetchWithRetry(`${ARIO_ENDPOINT}/ar-io/admin/export-parquet`, {
+ method: 'POST',
+ headers: {
+ 'Content-Type': 'application/json',
+ Authorization: `Bearer ${ADMIN_KEY}`,
+ },
+ body: JSON.stringify({
+ outputDir: OUTPUT_DIR,
+ startHeight: MIN_BLOCK_HEIGHT,
+ endHeight: MAX_BLOCK_HEIGHT,
+ maxFileRows: MAX_FILE_ROWS,
+ }),
+ });
+
+ console.log(
+ `Parquet export started from block ${MIN_BLOCK_HEIGHT} to ${MAX_BLOCK_HEIGHT}`,
+ );
+
+ let isComplete = false;
+
+ while (!isComplete) {
+ const response = await fetchWithRetry(
+ `${ARIO_ENDPOINT}/ar-io/admin/export-parquet/status`,
+ {
+ method: 'GET',
+ headers: {
+ Authorization: `Bearer ${ADMIN_KEY}`,
+ },
+ },
+ );
+
+ const data = await response.json();
+ isComplete = data.status === 'completed';
+
+ if (isComplete) {
+      console.log('Parquet export finished!');
+ console.log(data);
+ } else {
+ await new Promise((resolve) => setTimeout(resolve, 5000));
+ }
+ }
+})();
diff --git a/scripts/import-data/fetch-data-gql.ts b/scripts/import-data/fetch-data-gql.ts
new file mode 100644
index 00000000..fe4fd2ce
--- /dev/null
+++ b/scripts/import-data/fetch-data-gql.ts
@@ -0,0 +1,401 @@
+/**
+ * AR.IO Gateway
+ * Copyright (C) 2022-2023 Permanent Data Solutions, Inc. All Rights Reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+import * as fs from 'node:fs/promises';
+import path from 'node:path';
+import { fileURLToPath } from 'node:url';
+import { fetchLatestBlockHeight, fetchWithRetry } from './utils.js';
+const args = process.argv.slice(2);
+const __filename = fileURLToPath(import.meta.url);
+const __dirname = path.dirname(__filename);
+
+let GQL_ENDPOINT = 'https://arweave-search.goldsky.com/graphql';
+let MIN_BLOCK_HEIGHT = 0;
+let MAX_BLOCK_HEIGHT: number | undefined;
+let BLOCK_RANGE_SIZE = 100;
+let BUNDLES_FETCH_ROOT_TX = true;
+let GQL_TAGS = `[
+ {
+ name: "App-Name"
+ values: [
+ "ArDrive-App"
+ "ArDrive-Web"
+ "ArDrive-CLI"
+ "ArDrive-Desktop"
+ "ArDrive-Mobile"
+ "ArDrive-Core"
+ "ArDrive-Sync"
+ ]
+ }
+]`;
+
+args.forEach((arg, index) => {
+ switch (arg) {
+ case '--gqlEndpoint':
+ if (args[index + 1]) {
+ GQL_ENDPOINT = args[index + 1];
+ } else {
+ console.error('Missing value for --gqlEndpoint');
+ process.exit(1);
+ }
+ break;
+ case '--minHeight':
+ if (args[index + 1]) {
+ MIN_BLOCK_HEIGHT = parseInt(args[index + 1], 10);
+ } else {
+ console.error('Missing value for --minHeight');
+ process.exit(1);
+ }
+ break;
+ case '--maxHeight':
+ if (args[index + 1]) {
+ MAX_BLOCK_HEIGHT = parseInt(args[index + 1], 10);
+ } else {
+ console.error('Missing value for --maxHeight');
+ process.exit(1);
+ }
+ break;
+ case '--blockRangeSize':
+ if (args[index + 1]) {
+ BLOCK_RANGE_SIZE = parseInt(args[index + 1], 10);
+ } else {
+ console.error('Missing value for --blockRangeSize');
+ process.exit(1);
+ }
+ break;
+ case '--fetchOnlyRootTx':
+ if (args[index + 1]) {
+ BUNDLES_FETCH_ROOT_TX = args[index + 1] === 'true';
+ } else {
+ console.error('Missing value for --fetchOnlyRootTx');
+ process.exit(1);
+ }
+ break;
+ case '--gqlTags':
+ if (args[index + 1]) {
+ GQL_TAGS = args[index + 1];
+ } else {
+ console.error('Missing value for --gqlTags');
+ process.exit(1);
+ }
+ break;
+ default:
+ break;
+ }
+});
+
+type BlockRange = { min: number; max: number };
+const getBlockRanges = ({
+ minBlock,
+ maxBlock,
+ rangeSize,
+}: {
+ minBlock: number;
+ maxBlock: number;
+ rangeSize: number;
+}) => {
+ if (minBlock >= maxBlock || rangeSize <= 0) {
+ throw new Error(
+ 'Invalid input: ensure minBlock < maxBlock and rangeSize > 0',
+ );
+ }
+
+ const ranges: BlockRange[] = [];
+ let currentMin = minBlock;
+
+ while (currentMin < maxBlock) {
+ const currentMax = Math.min(currentMin + rangeSize - 1, maxBlock);
+ ranges.push({ min: currentMin, max: currentMax });
+ currentMin = currentMax + 1;
+ }
+
+ return ranges;
+};
+
+const txsGqlQuery = ({
+ minBlock,
+ maxBlock,
+ cursor,
+}: {
+ minBlock: number;
+ maxBlock: number;
+ cursor?: string;
+}) => `
+query {
+ transactions(
+ block: {
+ min: ${minBlock}
+ max: ${maxBlock}
+ }
+ tags: ${GQL_TAGS}
+ first: 100
+ sort: HEIGHT_ASC
+ after: "${cursor !== undefined ? cursor : ''}"
+ ) {
+ pageInfo {
+ hasNextPage
+ }
+ edges {
+ cursor
+ node {
+ id
+ bundledIn {
+ id
+ }
+ block {
+ height
+ }
+ }
+ }
+ }
+}
+`;
+
+const rootTxGqlQuery = (txId: string) => `
+query {
+ transaction(
+ id: "${txId}"
+ ) {
+ bundledIn {
+ id
+ }
+ }
+}
+`;
+
+const getRootTxId = async (txId: string) => {
+ let rootTxId: string | undefined;
+ let currentId = txId;
+
+ while (rootTxId === undefined) {
+ const response = await fetchWithRetry(GQL_ENDPOINT, {
+ method: 'POST',
+ headers: {
+ 'Content-Type': 'application/json',
+ },
+ body: JSON.stringify({
+ query: rootTxGqlQuery(currentId),
+ }),
+ });
+
+ const result = await response.json();
+
+ if (result.errors) {
+ throw new Error(`GraphQL error: ${JSON.stringify(result.errors)}`);
+ }
+
+ const { data } = result;
+
+ if (data.transaction === null) {
+ console.warn("Can't get any results while fetching rootTxId for", txId);
+ return;
+ }
+
+ const bundleId = data.transaction.bundledIn?.id;
+
+ if (bundleId === undefined) {
+ rootTxId = currentId;
+ } else {
+ currentId = bundleId;
+ }
+ }
+
+ return rootTxId;
+};
+
+const fetchGql = async ({
+ minBlock,
+ maxBlock,
+ cursor,
+}: {
+ minBlock: number;
+ maxBlock: number;
+ cursor?: string;
+}) => {
+ const response = await fetchWithRetry(GQL_ENDPOINT, {
+ method: 'POST',
+ headers: {
+ 'Content-Type': 'application/json',
+ },
+ body: JSON.stringify({
+ query: txsGqlQuery({ minBlock, maxBlock, cursor }),
+ }),
+ });
+ const result = await response.json();
+ if (result.errors) {
+ throw new Error(`GraphQL error: ${JSON.stringify(result.errors)}`);
+ }
+ const { data } = result;
+ return data;
+};
+
+const txsMissingRootTx = new Set();
+
+type BlockTransactions = Map<number, Set<string>>;
+const getTransactionsForRange = async ({ min, max }: BlockRange) => {
+ let cursor: string | undefined;
+ let hasNextPage = true;
+ const transactions: BlockTransactions = new Map();
+ const bundles: BlockTransactions = new Map();
+  const rootTxIdsForBundles: Map<string, string> = new Map();
+
+ while (hasNextPage) {
+ const {
+ transactions: { edges, pageInfo },
+ } = await fetchGql({
+ minBlock: min,
+ maxBlock: max,
+ cursor,
+ });
+
+ hasNextPage = pageInfo.hasNextPage;
+ cursor = hasNextPage ? edges[edges.length - 1].cursor : undefined;
+
+ for (const edge of edges) {
+ const blockHeight = edge.node.block.height;
+ const bundleId = edge.node.bundledIn?.id;
+ const id = edge.node.id;
+
+ if (!transactions.has(blockHeight)) {
+ transactions.set(blockHeight, new Set());
+ }
+ if (!bundles.has(blockHeight)) {
+ bundles.set(blockHeight, new Set());
+ }
+
+ if (bundleId !== undefined) {
+ if (BUNDLES_FETCH_ROOT_TX) {
+ const cachedRootTxId = rootTxIdsForBundles.get(bundleId);
+ const rootTxId = cachedRootTxId ?? (await getRootTxId(bundleId));
+
+ if (rootTxId === undefined) {
+ txsMissingRootTx.add(bundleId);
+ } else {
+ rootTxIdsForBundles.set(bundleId, rootTxId);
+ bundles.get(blockHeight)?.add(rootTxId);
+ }
+ } else {
+ bundles.get(blockHeight)?.add(bundleId);
+ }
+ } else {
+ transactions.get(blockHeight)?.add(id);
+ }
+ }
+ }
+
+ return { transactions, bundles };
+};
+
+const writeTransactionsToFile = async ({
+ outputDir,
+ transactions,
+}: {
+ outputDir: string;
+ transactions: BlockTransactions;
+}) => {
+ try {
+ await fs.mkdir(outputDir, { recursive: true });
+ } catch (error) {
+ console.error(`Failed to create directory: ${error}`);
+ throw error;
+ }
+
+ for (const [height, ids] of transactions.entries()) {
+ if (ids.size === 0) continue;
+
+ const content = JSON.stringify([...ids]);
+ const filePath = path.join(outputDir, `${height}.json`);
+
+ try {
+ await fs.writeFile(filePath, content);
+ } catch (error) {
+ console.error(`Failed to write ${filePath}: ${error}`);
+ throw error;
+ }
+ }
+};
+
+const countTransactions = (map: BlockTransactions) => {
+ let total = 0;
+ map.forEach((set) => {
+ total += set.size;
+ });
+ return total;
+};
+
+(async () => {
+ if (MAX_BLOCK_HEIGHT === undefined) {
+ MAX_BLOCK_HEIGHT = await fetchLatestBlockHeight();
+ }
+
+ const blockRanges = getBlockRanges({
+ minBlock: MIN_BLOCK_HEIGHT,
+ maxBlock: MAX_BLOCK_HEIGHT,
+ rangeSize: BLOCK_RANGE_SIZE,
+ });
+
+ console.log(
+ `Starting to fetch transactions and bundles from block ${MIN_BLOCK_HEIGHT} to ${MAX_BLOCK_HEIGHT}`,
+ );
+
+ let txCount = 0;
+ let bundleCount = 0;
+
+ for (const range of blockRanges) {
+ const { transactions, bundles } = await getTransactionsForRange(range);
+
+ await writeTransactionsToFile({
+ outputDir: path.join(__dirname, 'transactions'),
+ transactions,
+ });
+ await writeTransactionsToFile({
+ outputDir: path.join(__dirname, 'bundles'),
+ transactions: bundles,
+ });
+
+ txCount += countTransactions(transactions);
+ bundleCount += countTransactions(bundles);
+
+ if (transactions.size > 0 || bundles.size > 0) {
+ console.log(
+ `Transactions and bundles from block ${range.min} to ${range.max} saved!`,
+ );
+ console.log(
+ `Saved transactions: ${txCount}, Saved bundles: ${bundleCount}`,
+ );
+ }
+
+ if (txsMissingRootTx.size > 0) {
+ console.warn(
+ `Failed to fetch rootTxId for ${txsMissingRootTx.size} transactions`,
+ );
+
+ const txMissingRootTxPath = path.join(
+ __dirname,
+ 'txs-missing-root-tx-id.json',
+ );
+ await fs.writeFile(
+ txMissingRootTxPath,
+ JSON.stringify([...txsMissingRootTx]),
+ );
+
+ console.log(
+ 'Transactions missing root transaction id were written to txs-missing-root-tx-id.json file',
+ );
+ }
+ }
+})();
diff --git a/scripts/import-data/import-data.ts b/scripts/import-data/import-data.ts
new file mode 100644
index 00000000..57f30eb9
--- /dev/null
+++ b/scripts/import-data/import-data.ts
@@ -0,0 +1,217 @@
+/**
+ * AR.IO Gateway
+ * Copyright (C) 2022-2023 Permanent Data Solutions, Inc. All Rights Reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+import * as fs from 'node:fs/promises';
+import path from 'node:path';
+import { fileURLToPath } from 'node:url';
+import {
+ fetchLatestBlockHeight,
+ fetchWithRetry,
+ getFilesInRange,
+} from './utils.js';
+const args = process.argv.slice(2);
+const __filename = fileURLToPath(import.meta.url);
+const __dirname = path.dirname(__filename);
+
+let ARIO_ENDPOINT = 'http://localhost:4000';
+let ADMIN_KEY: string | undefined;
+let MIN_BLOCK_HEIGHT = 0;
+let MAX_BLOCK_HEIGHT: number | undefined;
+let TRANSACTIONS_DIR = path.join(__dirname, 'transactions');
+let BUNDLES_DIR = path.join(__dirname, 'bundles');
+
+type ImportType = 'transaction' | 'bundle' | 'both';
+let IMPORT_TYPE: ImportType | undefined;
+
+args.forEach((arg, index) => {
+ switch (arg) {
+ case '--adminKey':
+ if (args[index + 1]) {
+ ADMIN_KEY = args[index + 1];
+ } else {
+ console.error('Missing value for --adminKey');
+ process.exit(1);
+ }
+ break;
+ case '--arioNode':
+ if (args[index + 1]) {
+ ARIO_ENDPOINT = args[index + 1];
+ } else {
+ console.error('Missing value for --arioNode');
+ process.exit(1);
+ }
+ break;
+ case '--minHeight':
+ if (args[index + 1]) {
+ MIN_BLOCK_HEIGHT = parseInt(args[index + 1], 10);
+ } else {
+ console.error('Missing value for --minHeight');
+ process.exit(1);
+ }
+ break;
+ case '--maxHeight':
+ if (args[index + 1]) {
+ MAX_BLOCK_HEIGHT = parseInt(args[index + 1], 10);
+ } else {
+ console.error('Missing value for --maxHeight');
+ process.exit(1);
+ }
+ break;
+ case '--transactionsDir':
+ if (args[index + 1]) {
+ TRANSACTIONS_DIR = args[index + 1];
+ } else {
+ console.error('Missing value for --transactionsDir');
+ process.exit(1);
+ }
+ break;
+ case '--bundlesDir':
+ if (args[index + 1]) {
+ BUNDLES_DIR = args[index + 1];
+ } else {
+ console.error('Missing value for --bundlesDir');
+ process.exit(1);
+ }
+ break;
+ case '--importType': {
+ const importType = args[index + 1];
+ if (
+ importType === 'transaction' ||
+ importType === 'bundle' ||
+ importType === 'both'
+ ) {
+ IMPORT_TYPE = importType;
+ } else {
+ console.error('Missing value for --importType');
+ process.exit(1);
+ }
+ break;
+ }
+ default:
+ break;
+ }
+});
+
+const importFromFiles = async ({
+ files,
+ type,
+}: {
+ files: string[];
+ type: 'transactions' | 'bundles';
+}) => {
+ let counter = 0;
+ let folder: string;
+ let endpoint: string;
+ switch (type) {
+ case 'transactions':
+ folder = TRANSACTIONS_DIR;
+ endpoint = `${ARIO_ENDPOINT}/ar-io/admin/queue-tx`;
+ break;
+ case 'bundles':
+ folder = BUNDLES_DIR;
+ endpoint = `${ARIO_ENDPOINT}/ar-io/admin/queue-bundle`;
+ break;
+ default:
+ throw new Error('Invalid type');
+ }
+
+ for (const file of files) {
+ const filePath = path.join(folder, file);
+ const ids = JSON.parse(await fs.readFile(filePath, 'utf-8')) as string[];
+ console.log(
+ `Importing ${ids.length} ${type} from block ${file.split('.')[0]}`,
+ );
+
+ for (const id of ids) {
+ counter++;
+ await fetchWithRetry(endpoint, {
+ method: 'POST',
+ headers: {
+ 'Content-Type': 'application/json',
+ Authorization: `Bearer ${ADMIN_KEY}`,
+ },
+ body: JSON.stringify({ id }),
+ });
+ }
+ }
+
+ return { queued: counter };
+};
+
+(async () => {
+ if (ADMIN_KEY === undefined) {
+ throw new Error('Missing admin key');
+ }
+
+ if (MAX_BLOCK_HEIGHT === undefined) {
+ MAX_BLOCK_HEIGHT = await fetchLatestBlockHeight();
+ }
+
+ let transactionFiles: string[] = [];
+ let bundleFiles: string[] = [];
+
+ switch (IMPORT_TYPE ?? 'both') {
+ case 'transaction':
+ transactionFiles = await getFilesInRange({
+ folder: TRANSACTIONS_DIR,
+ min: MIN_BLOCK_HEIGHT,
+ max: MAX_BLOCK_HEIGHT,
+ });
+ break;
+ case 'bundle':
+ bundleFiles = await getFilesInRange({
+ folder: BUNDLES_DIR,
+ min: MIN_BLOCK_HEIGHT,
+ max: MAX_BLOCK_HEIGHT,
+ });
+ break;
+ case 'both':
+ transactionFiles = await getFilesInRange({
+ folder: TRANSACTIONS_DIR,
+ min: MIN_BLOCK_HEIGHT,
+ max: MAX_BLOCK_HEIGHT,
+ });
+ bundleFiles = await getFilesInRange({
+ folder: BUNDLES_DIR,
+ min: MIN_BLOCK_HEIGHT,
+ max: MAX_BLOCK_HEIGHT,
+ });
+ break;
+ }
+
+ console.log(
+ `Starting to import transactions and bundles from block ${MIN_BLOCK_HEIGHT} to ${MAX_BLOCK_HEIGHT}`,
+ );
+
+ const queuedTransactions = await importFromFiles({
+ files: transactionFiles,
+ type: 'transactions',
+ });
+
+ if (queuedTransactions.queued > 0) {
+ console.log(`Finished queueing ${queuedTransactions.queued} transactions`);
+ }
+
+ const queuedBundles = await importFromFiles({
+ files: bundleFiles,
+ type: 'bundles',
+ });
+
+ if (queuedBundles.queued > 0) {
+ console.log(`Finished queueing ${queuedBundles.queued} bundles`);
+ }
+})();
diff --git a/scripts/import-data/utils.ts b/scripts/import-data/utils.ts
new file mode 100644
index 00000000..b2e7cecc
--- /dev/null
+++ b/scripts/import-data/utils.ts
@@ -0,0 +1,99 @@
+/**
+ * AR.IO Gateway
+ * Copyright (C) 2022-2023 Permanent Data Solutions, Inc. All Rights Reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+import * as fs from 'node:fs/promises';
+import path from 'node:path';
+
+export const fetchWithRetry = async (
+ url: string,
+ options: RequestInit = {},
+ retries = 15,
+ retryInterval = 300, // interval in milliseconds
+): Promise<Response> => {
+ let attempt = 0;
+
+ while (attempt < retries) {
+ try {
+ const response = await fetch(url, options);
+
+ if (response.ok) {
+ return response;
+ }
+ if (response.status === 429) {
+ console.warn(
+ `Import queue is full! Waiting 30 seconds before retrying...`,
+ );
+ await new Promise((resolve) => setTimeout(resolve, 30000));
+ continue;
+ }
+
+ throw new Error(`HTTP error! status: ${response.status}`);
+ } catch (error) {
+ attempt++;
+
+ if (attempt >= retries) {
+ throw new Error(
+ `Fetch failed after ${retries} attempts: ${(error as Error).message}`,
+ );
+ }
+
+ const waitTime = retryInterval * attempt;
+ console.warn(
+ `Fetch attempt ${attempt} failed. Retrying in ${waitTime}ms...`,
+ );
+
+ await new Promise((resolve) => setTimeout(resolve, waitTime));
+ }
+ }
+
+ throw new Error('Unexpected error in fetchWithRetry');
+};
+
+export const fetchLatestBlockHeight = async () => {
+ const response = await fetchWithRetry('https://arweave.net/info', {
+ method: 'GET',
+ });
+ const { blocks } = await response.json();
+ return blocks as number;
+};
+
+export const getFilesInRange = async ({
+ folder,
+ min,
+ max,
+}: {
+ folder: string;
+ min: number;
+ max: number;
+}): Promise<string[]> => {
+ try {
+ const files = await fs.readdir(folder);
+
+ const filesInRange = files
+ .filter((file) => path.extname(file) === '.json')
+ .filter((file) => {
+ const match = file.match(/^\d+/);
+ const number = match ? parseInt(match[0], 10) : null;
+ return number !== null && number >= min && number <= max;
+ });
+
+ return filesInRange;
+ } catch (error) {
+ throw new Error(`Error processing files: ${(error as Error).message}`);
+ }
+};
diff --git a/src/config.ts b/src/config.ts
index a0feac1d..fba4d2e8 100644
--- a/src/config.ts
+++ b/src/config.ts
@@ -198,6 +198,12 @@ export const MAX_DATA_ITEM_QUEUE_SIZE = +env.varOrDefault(
'100000',
);
+// The maximum number of bundles to queue for unbundling before skipping
+export const BUNDLE_DATA_IMPORTER_QUEUE_SIZE = +env.varOrDefault(
+ 'BUNDLE_DATA_IMPORTER_QUEUE_SIZE',
+ '1000',
+);
+
//
// Verification
//
diff --git a/src/routes/ar-io.ts b/src/routes/ar-io.ts
index cbe5aa58..067fcf3a 100644
--- a/src/routes/ar-io.ts
+++ b/src/routes/ar-io.ts
@@ -210,6 +210,11 @@ arIoRouter.post(
return;
}
+ if (await system.bundleDataImporter.isQueueFull()) {
+ res.status(429).send('Bundle importer queue is full');
+ return;
+ }
+
system.eventEmitter.emit(events.ANS104_BUNDLE_QUEUED, {
id,
root_tx_id: id,
diff --git a/src/system.ts b/src/system.ts
index 65c1560e..b802057e 100644
--- a/src/system.ts
+++ b/src/system.ts
@@ -469,11 +469,12 @@ metrics.registerQueueLengthGauge('ans104Unbundler', {
length: () => ans104Unbundler.queueDepth(),
});
-const bundleDataImporter = new BundleDataImporter({
+export const bundleDataImporter = new BundleDataImporter({
log,
contiguousDataSource: backgroundContiguousDataSource,
ans104Unbundler,
workerCount: config.ANS104_DOWNLOAD_WORKERS,
+ maxQueueSize: config.BUNDLE_DATA_IMPORTER_QUEUE_SIZE,
});
metrics.registerQueueLengthGauge('bundleDataImporter', {
length: () => bundleDataImporter.queueDepth(),
diff --git a/src/workers/bundle-data-importer.ts b/src/workers/bundle-data-importer.ts
index 8ef9741d..de4d253b 100644
--- a/src/workers/bundle-data-importer.ts
+++ b/src/workers/bundle-data-importer.ts
@@ -25,8 +25,7 @@ import {
NormalizedDataItem,
PartialJsonTransaction,
} from '../types.js';
-
-const DEFAULT_MAX_QUEUE_SIZE = 1000;
+import * as config from '../config.js';
interface IndexProperty {
index: number;
@@ -56,7 +55,7 @@ export class BundleDataImporter {
contiguousDataSource,
ans104Unbundler,
workerCount,
- maxQueueSize = DEFAULT_MAX_QUEUE_SIZE,
+ maxQueueSize = config.BUNDLE_DATA_IMPORTER_QUEUE_SIZE,
}: {
log: winston.Logger;
contiguousDataSource: ContiguousDataSource;
@@ -138,4 +137,8 @@ export class BundleDataImporter {
queueDepth(): number {
return this.queue.length();
}
+
+  async isQueueFull(): Promise<boolean> {
+ return this.queue.length() >= this.maxQueueSize;
+ }
}
diff --git a/tsconfig.json b/tsconfig.json
index 70473ac3..ada5b8b8 100644
--- a/tsconfig.json
+++ b/tsconfig.json
@@ -25,5 +25,5 @@
"swc": true,
"esm": true
},
- "include": ["src", "test"]
+ "include": ["src", "test", "scripts"]
}