Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add parameter --maxConcurrency to prevent different load errors. #848

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/bin/bin-common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ const commandLineParams: { [param: string]: Params } =
key: 'noWait',
description: 'Use with unattended confirmation to remove the 5 second delay.',
},
maxConcurrency: {
shortKey: 'm',
key: 'maxConcurrency',
args: '<maxConcurrency>',
description: `Maximum import concurrency to prevent bandwidth exhausted and other load errors. The default (0) is unlimited.`,
},
prettyPrint: {
shortKey: 'p',
key: 'prettyPrint',
Expand Down
3 changes: 2 additions & 1 deletion src/bin/firestore-clear.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
commandLineParams as params,
packageInfo,
} from './bin-common';
import {measureTimeAsync} from "../lib/helpers";


commander.version(packageInfo.version)
Expand Down Expand Up @@ -64,7 +65,7 @@ const noWait = commander[params.yesToNoWait.key];
await sleep(5000);
}
console.log(colors.bold(colors.green('Starting clearing of records 🏋️')));
await firestoreClear(pathReference, true);
await measureTimeAsync("firestore-clear", () => firestoreClear(pathReference, true));
console.log(colors.bold(colors.green('All done 🎉')));
})().catch((error) => {
if (error instanceof ActionAbortedError) {
Expand Down
9 changes: 6 additions & 3 deletions src/bin/firestore-export.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import fs from 'fs';
import {firestoreExport} from '../lib';
import {getCredentialsFromFile, getDBReferenceFromPath, getFirestoreDBReference} from '../lib/firestore-helpers';
import {accountCredentialsEnvironmentKey, buildOption, commandLineParams as params, packageInfo} from './bin-common';
import {measureTimeAsync} from "../lib/helpers";

commander.version(packageInfo.version)
.option(...buildOption(params.accountCredentialsPath))
Expand Down Expand Up @@ -54,9 +55,11 @@ const nodePath = commander[params.nodePath.key];
const db = getFirestoreDBReference(credentials);
const pathReference = getDBReferenceFromPath(db, nodePath);
console.log(colors.bold(colors.green('Starting Export 🏋️')));
const results = await firestoreExport(pathReference, true);
const stringResults = JSON.stringify(results, undefined, prettyPrint ? 2 : undefined);
await writeResults(stringResults, backupFile);
await measureTimeAsync("firestore-export", async () => {
const results = await firestoreExport(pathReference, true);
const stringResults = JSON.stringify(results, undefined, prettyPrint ? 2 : undefined);
await writeResults(stringResults, backupFile);
});
console.log(colors.yellow(`Results were saved to ${backupFile}`));
console.log(colors.bold(colors.green('All done 🎉')));
})().catch((error) => {
Expand Down
6 changes: 5 additions & 1 deletion src/bin/firestore-import.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ import {
commandLineParams as params,
packageInfo,
} from './bin-common';
import {measureTimeAsync} from "../lib/helpers";

commander.version(packageInfo.version)
.option(...buildOption(params.accountCredentialsPath))
.option(...buildOption(params.backupFileImport))
.option(...buildOption(params.nodePath))
.option(...buildOption(params.yesToImport))
.option(...buildOption(params.maxConcurrency))
.parse(process.argv);

const accountCredentialsPath = commander[params.accountCredentialsPath.key] || process.env[accountCredentialsEnvironmentKey];
Expand Down Expand Up @@ -52,6 +54,8 @@ const nodePath = commander[params.nodePath.key];

const unattendedConfirmation = commander[params.yesToImport.key];

const maxConcurrency = parseInt(commander[params.maxConcurrency.key]) || 0;

(async () => {
const credentials = await getCredentialsFromFile(accountCredentialsPath);
const db = getFirestoreDBReference(credentials);
Expand Down Expand Up @@ -79,7 +83,7 @@ const unattendedConfirmation = commander[params.yesToImport.key];
}

console.log(colors.bold(colors.green('Starting Import 🏋️')));
await firestoreImport(data, pathReference, true, true);
await measureTimeAsync("firestore-import", () => firestoreImport(data, pathReference, true, maxConcurrency, true));
console.log(colors.bold(colors.green('All done 🎉')));
})().catch((error) => {
if (error instanceof ActionAbortedError) {
Expand Down
59 changes: 58 additions & 1 deletion src/lib/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {IGeopoint} from '../interfaces/IGeopoint';
import {IDocumentReference} from '../interfaces/IDocumentReference';
import DocumentReference = admin.firestore.DocumentReference;
import GeoPoint = admin.firestore.GeoPoint;
import Timeout = NodeJS.Timeout;

// From https://stackoverflow.com/questions/8495687/split-array-into-chunks
const array_chunks = (array: Array<any>, chunk_size: number): Array<Array<any>> => {
Expand Down Expand Up @@ -93,4 +94,60 @@ const isScalar = (val: any) => (typeof val === 'string' || val instanceof String
|| (val === null)
|| (typeof val === 'boolean');

export {array_chunks, serializeSpecialTypes, unserializeSpecialTypes};
interface ConcurrencyLimit {
wait(): Promise<void>;
done(): void
}

function limitConcurrency(maxConcurrency: number = 0, interval: number = 10): ConcurrencyLimit {
if (maxConcurrency === 0) {
return {
async wait(): Promise<void> { },
done() { }
}
}
let unfinishedCount = 0;
let resolveQueue: Function[] = [];
let intervalId: Timeout;
let started = false;

function start() {
started = true;
intervalId = setInterval(() => {
if (resolveQueue.length === 0) {
started = false;
clearInterval(intervalId);
return;
}

while (unfinishedCount <= maxConcurrency && resolveQueue.length > 0) {
const resolveFn = resolveQueue.shift();
unfinishedCount++;
if (resolveFn) resolveFn();
}

}, interval);
}

return {
wait(): Promise<void> {
return new Promise(resolve => {
if (!started) start();
resolveQueue.push(resolve)
});
},
done() {
unfinishedCount--;
}
}
}

const measureTimeAsync = async <T>( info: string, fn: () => Promise<T>): Promise<T> => {
const startTime = Date.now();
const result = await fn();
const timeDiff = Date.now() - startTime;
console.log(`${info} took ${timeDiff}ms`);
return result;
}

export {array_chunks, serializeSpecialTypes, unserializeSpecialTypes, ConcurrencyLimit, limitConcurrency, measureTimeAsync};
20 changes: 12 additions & 8 deletions src/lib/import.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import {anyFirebaseRef, batchExecutor, isLikeDocument, isRootOfDatabase} from './firestore-helpers';
import {array_chunks, unserializeSpecialTypes} from './helpers';
import {array_chunks, ConcurrencyLimit, limitConcurrency, unserializeSpecialTypes} from './helpers';
import {ICollection} from '../interfaces/ICollection';

const importData = (data: any,
startingRef: anyFirebaseRef,
mergeWithExisting: boolean = true,
maxConcurrency: number = 0,
logs = false,
): Promise<any> => {

const dataToImport = {...data};
const writeLimit = limitConcurrency(maxConcurrency);
if (isLikeDocument(startingRef)) {
if (!dataToImport.hasOwnProperty('__collections__')) {
throw new Error('Root or document reference doesn\'t contain a __collections__ property.');
Expand All @@ -17,7 +19,7 @@ const importData = (data: any,
const collectionPromises: Array<Promise<any>> = [];
for (const collection in collections) {
if (collections.hasOwnProperty(collection)) {
collectionPromises.push(setDocuments(collections[collection], startingRef.collection(collection), mergeWithExisting, logs));
collectionPromises.push(setDocuments(collections[collection], startingRef.collection(collection), mergeWithExisting, writeLimit,logs));
}
}
if (isRootOfDatabase(startingRef)) {
Expand All @@ -26,23 +28,25 @@ const importData = (data: any,
const documentID = startingRef.id;
const documentData: any = {};
documentData[documentID] = dataToImport;
const documentPromise = setDocuments(documentData, startingRef.parent, mergeWithExisting, logs);
const documentPromise = setDocuments(documentData, startingRef.parent, mergeWithExisting, writeLimit, logs);
return documentPromise.then(() => batchExecutor(collectionPromises));
}
} else {
return setDocuments(dataToImport, <FirebaseFirestore.CollectionReference>startingRef, mergeWithExisting, logs);
return setDocuments(dataToImport, <FirebaseFirestore.CollectionReference>startingRef, mergeWithExisting, writeLimit, logs);
}
};

const setDocuments = (data: ICollection, startingRef: FirebaseFirestore.CollectionReference, mergeWithExisting: boolean = true, logs = false): Promise<any> => {
const setDocuments = async (data: ICollection, startingRef: FirebaseFirestore.CollectionReference, mergeWithExisting: boolean = true, batchLimit: ConcurrencyLimit, logs = false): Promise<any> => {
logs && console.log(`Writing documents for ${startingRef.path}`);
if ('__collections__' in data) {
throw new Error('Found unexpected "__collection__" in collection data. Does the starting node match' +
' the root of the incoming data?');
}
const collections: Array<any> = [];
const chunks = array_chunks(Object.keys(data), 500);
const chunkPromises = chunks.map((documentKeys: string[]) => {
const chunkPromises = chunks.map(async (documentKeys: string[], index: number) => {
await batchLimit.wait();

const batch = startingRef.firestore.batch();
documentKeys.map((documentKey: string) => {
if (data[documentKey]['__collections__']) {
Expand All @@ -57,12 +61,12 @@ const setDocuments = (data: ICollection, startingRef: FirebaseFirestore.Collecti
const documentData: any = unserializeSpecialTypes(documents);
batch.set(startingRef.doc(documentKey), documentData, {merge: mergeWithExisting});
});
return batch.commit();
return batch.commit().finally(batchLimit.done);
});
return batchExecutor(chunkPromises)
.then(() => {
return collections.map((col) => {
return setDocuments(col.collection, col.path, mergeWithExisting, logs);
return setDocuments(col.collection, col.path, mergeWithExisting, batchLimit, logs);
});
})
.then(subCollectionPromises => batchExecutor(subCollectionPromises))
Expand Down