diff --git a/src/bin/bin-common.ts b/src/bin/bin-common.ts index c955f3a..231b567 100644 --- a/src/bin/bin-common.ts +++ b/src/bin/bin-common.ts @@ -44,6 +44,12 @@ const commandLineParams: { [param: string]: Params } = key: 'noWait', description: 'Use with unattended confirmation to remove the 5 second delay.', }, + maxConcurrency: { + shortKey: 'm', + key: 'maxConcurrency', + args: '', + description: `Maximum import concurrency to prevent bandwidth exhausted and other load errors. The default (0) is unlimited.`, + }, prettyPrint: { shortKey: 'p', key: 'prettyPrint', diff --git a/src/bin/firestore-clear.ts b/src/bin/firestore-clear.ts index 534a329..6f7da98 100644 --- a/src/bin/firestore-clear.ts +++ b/src/bin/firestore-clear.ts @@ -13,6 +13,7 @@ import { commandLineParams as params, packageInfo, } from './bin-common'; +import {measureTimeAsync} from "../lib/helpers"; commander.version(packageInfo.version) @@ -64,7 +65,7 @@ const noWait = commander[params.yesToNoWait.key]; await sleep(5000); } console.log(colors.bold(colors.green('Starting clearing of records 🏋️'))); - await firestoreClear(pathReference, true); + await measureTimeAsync("firestore-clear", () => firestoreClear(pathReference, true)); console.log(colors.bold(colors.green('All done 🎉'))); })().catch((error) => { if (error instanceof ActionAbortedError) { diff --git a/src/bin/firestore-export.ts b/src/bin/firestore-export.ts index 17c498a..20487be1 100644 --- a/src/bin/firestore-export.ts +++ b/src/bin/firestore-export.ts @@ -6,6 +6,7 @@ import fs from 'fs'; import {firestoreExport} from '../lib'; import {getCredentialsFromFile, getDBReferenceFromPath, getFirestoreDBReference} from '../lib/firestore-helpers'; import {accountCredentialsEnvironmentKey, buildOption, commandLineParams as params, packageInfo} from './bin-common'; +import {measureTimeAsync} from "../lib/helpers"; commander.version(packageInfo.version) .option(...buildOption(params.accountCredentialsPath)) @@ -54,9 +55,11 @@ const nodePath = commander[params.nodePath.key]; const db = getFirestoreDBReference(credentials); const pathReference = getDBReferenceFromPath(db, nodePath); console.log(colors.bold(colors.green('Starting Export 🏋️'))); - const results = await firestoreExport(pathReference, true); - const stringResults = JSON.stringify(results, undefined, prettyPrint ? 2 : undefined); - await writeResults(stringResults, backupFile); + await measureTimeAsync("firestore-export", async () => { + const results = await firestoreExport(pathReference, true); + const stringResults = JSON.stringify(results, undefined, prettyPrint ? 2 : undefined); + await writeResults(stringResults, backupFile); + }); console.log(colors.yellow(`Results were saved to ${backupFile}`)); console.log(colors.bold(colors.green('All done 🎉'))); })().catch((error) => { diff --git a/src/bin/firestore-import.ts b/src/bin/firestore-import.ts index ba246e4..f54aa6d 100644 --- a/src/bin/firestore-import.ts +++ b/src/bin/firestore-import.ts @@ -14,12 +14,14 @@ import { commandLineParams as params, packageInfo, } from './bin-common'; +import {measureTimeAsync} from "../lib/helpers"; commander.version(packageInfo.version) .option(...buildOption(params.accountCredentialsPath)) .option(...buildOption(params.backupFileImport)) .option(...buildOption(params.nodePath)) .option(...buildOption(params.yesToImport)) + .option(...buildOption(params.maxConcurrency)) .parse(process.argv); const accountCredentialsPath = commander[params.accountCredentialsPath.key] || process.env[accountCredentialsEnvironmentKey]; @@ -52,6 +54,8 @@ const nodePath = commander[params.nodePath.key]; const unattendedConfirmation = commander[params.yesToImport.key]; +const maxConcurrency = parseInt(commander[params.maxConcurrency.key]) || 0; + (async () => { const credentials = await getCredentialsFromFile(accountCredentialsPath); const db = getFirestoreDBReference(credentials); @@ -79,7 +83,7 @@ const unattendedConfirmation = commander[params.yesToImport.key]; } console.log(colors.bold(colors.green('Starting Import 🏋️'))); - await firestoreImport(data, pathReference, true, true); + await measureTimeAsync("firestore-import", () => firestoreImport(data, pathReference, true, maxConcurrency, true)); console.log(colors.bold(colors.green('All done 🎉'))); })().catch((error) => { if (error instanceof ActionAbortedError) { diff --git a/src/lib/helpers.ts b/src/lib/helpers.ts index eadcd69..628b692 100644 --- a/src/lib/helpers.ts +++ b/src/lib/helpers.ts @@ -4,6 +4,7 @@ import {IGeopoint} from '../interfaces/IGeopoint'; import {IDocumentReference} from '../interfaces/IDocumentReference'; import DocumentReference = admin.firestore.DocumentReference; import GeoPoint = admin.firestore.GeoPoint; +import Timeout = NodeJS.Timeout; // From https://stackoverflow.com/questions/8495687/split-array-into-chunks const array_chunks = (array: Array, chunk_size: number): Array> => { @@ -93,4 +94,60 @@ const isScalar = (val: any) => (typeof val === 'string' || val instanceof String || (val === null) || (typeof val === 'boolean'); -export {array_chunks, serializeSpecialTypes, unserializeSpecialTypes}; \ No newline at end of file +interface ConcurrencyLimit { + wait(): Promise; + done(): void +} + +function limitConcurrency(maxConcurrency: number = 0, interval: number = 10): ConcurrencyLimit { + if (maxConcurrency === 0) { + return { + async wait(): Promise { }, + done() { } + } + } + let unfinishedCount = 0; + let resolveQueue: Function[] = []; + let intervalId: Timeout; + let started = false; + + function start() { + started = true; + intervalId = setInterval(() => { + if (resolveQueue.length === 0) { + started = false; + clearInterval(intervalId); + return; + } + + while (unfinishedCount <= maxConcurrency && resolveQueue.length > 0) { + const resolveFn = resolveQueue.shift(); + unfinishedCount++; + if (resolveFn) resolveFn(); + } + + }, interval); + } + + return { + wait(): Promise { + return new Promise(resolve => { + if (!started) start(); + resolveQueue.push(resolve) + }); + }, + done() { + unfinishedCount--; + } + } +} + +const measureTimeAsync = async ( info: string, fn: () => Promise): Promise => { + const startTime = Date.now(); + const result = await fn(); + const timeDiff = Date.now() - startTime; + console.log(`${info} took ${timeDiff}ms`); + return result; +} + +export {array_chunks, serializeSpecialTypes, unserializeSpecialTypes, ConcurrencyLimit, limitConcurrency, measureTimeAsync}; \ No newline at end of file diff --git a/src/lib/import.ts b/src/lib/import.ts index 3aa471b..d4555c0 100644 --- a/src/lib/import.ts +++ b/src/lib/import.ts @@ -1,14 +1,16 @@ import {anyFirebaseRef, batchExecutor, isLikeDocument, isRootOfDatabase} from './firestore-helpers'; -import {array_chunks, unserializeSpecialTypes} from './helpers'; +import {array_chunks, ConcurrencyLimit, limitConcurrency, unserializeSpecialTypes} from './helpers'; import {ICollection} from '../interfaces/ICollection'; const importData = (data: any, startingRef: anyFirebaseRef, mergeWithExisting: boolean = true, + maxConcurrency: number = 0, logs = false, ): Promise => { const dataToImport = {...data}; + const writeLimit = limitConcurrency(maxConcurrency); if (isLikeDocument(startingRef)) { if (!dataToImport.hasOwnProperty('__collections__')) { throw new Error('Root or document reference doesn\'t contain a __collections__ property.'); @@ -17,7 +19,7 @@ const importData = (data: any, const collectionPromises: Array> = []; for (const collection in collections) { if (collections.hasOwnProperty(collection)) { - collectionPromises.push(setDocuments(collections[collection], startingRef.collection(collection), mergeWithExisting, logs)); + collectionPromises.push(setDocuments(collections[collection], startingRef.collection(collection), mergeWithExisting, writeLimit,logs)); } } if (isRootOfDatabase(startingRef)) { @@ -26,15 +28,15 @@ const importData = (data: any, const documentID = startingRef.id; const documentData: any = {}; documentData[documentID] = dataToImport; - const documentPromise = setDocuments(documentData, startingRef.parent, mergeWithExisting, logs); + const documentPromise = setDocuments(documentData, startingRef.parent, mergeWithExisting, writeLimit, logs); return documentPromise.then(() => batchExecutor(collectionPromises)); } } else { - return setDocuments(dataToImport, startingRef, mergeWithExisting, logs); + return setDocuments(dataToImport, startingRef, mergeWithExisting, writeLimit, logs); } }; -const setDocuments = (data: ICollection, startingRef: FirebaseFirestore.CollectionReference, mergeWithExisting: boolean = true, logs = false): Promise => { +const setDocuments = async (data: ICollection, startingRef: FirebaseFirestore.CollectionReference, mergeWithExisting: boolean = true, batchLimit: ConcurrencyLimit, logs = false): Promise => { logs && console.log(`Writing documents for ${startingRef.path}`); if ('__collections__' in data) { throw new Error('Found unexpected "__collection__" in collection data. Does the starting node match' + @@ -42,7 +44,9 @@ const setDocuments = (data: ICollection, startingRef: FirebaseFirestore.Collecti } const collections: Array = []; const chunks = array_chunks(Object.keys(data), 500); - const chunkPromises = chunks.map((documentKeys: string[]) => { + const chunkPromises = chunks.map(async (documentKeys: string[], index: number) => { + await batchLimit.wait(); + const batch = startingRef.firestore.batch(); documentKeys.map((documentKey: string) => { if (data[documentKey]['__collections__']) { @@ -57,12 +61,12 @@ const setDocuments = (data: ICollection, startingRef: FirebaseFirestore.Collecti const documentData: any = unserializeSpecialTypes(documents); batch.set(startingRef.doc(documentKey), documentData, {merge: mergeWithExisting}); }); - return batch.commit(); + return batch.commit().finally(batchLimit.done); }); return batchExecutor(chunkPromises) .then(() => { return collections.map((col) => { - return setDocuments(col.collection, col.path, mergeWithExisting, logs); + return setDocuments(col.collection, col.path, mergeWithExisting, batchLimit, logs); }); }) .then(subCollectionPromises => batchExecutor(subCollectionPromises))