diff --git a/firestore-bigquery-export/guides/IMPORT_EXISTING_DOCUMENTS.md b/firestore-bigquery-export/guides/IMPORT_EXISTING_DOCUMENTS.md index cc47b3e29..fd8b107c3 100644 --- a/firestore-bigquery-export/guides/IMPORT_EXISTING_DOCUMENTS.md +++ b/firestore-bigquery-export/guides/IMPORT_EXISTING_DOCUMENTS.md @@ -17,11 +17,15 @@ You may pause and resume the import script from the last batch at any point. - If document changes occur in the time between installing the extension and running the import script. - If you run the import script multiple times over the same collection. -- You cannot use wildcard notation in the collection path (i.e. `/collection/{document}/sub_collection}`). Instead, you can use a [collectionGroup](https://firebase.google.com/docs/firestore/query-data/queries#collection-group-query) query. To use a `collectionGroup` query, provide the collection name value as `${COLLECTION_PATH}`, and set `${COLLECTION_GROUP_QUERY}` to `true`. For example, if you are trying to import `/collection/{document}/sub_collection`, the value for the `${COLLECTION_PATH}` should be provided as `sub_collection`. Keep in mind that if you have another sub collection with the same name (e.g. `/collection2/{document}/sub_collection`, that will be imported too. +- You can use wildcard notation in the collection path. Suppose, for example, you have collections `users/user1/pets` and `users/user2/pets`, but also `admins/admin1/pets`. If you set `${COLLECTION_GROUP_QUERY}` to `true` and provide the collection path as `${users/{uid}/pets}`, the import script will import the former two collections but not the later, and will populate the `path_params` column of the big query table with the relevant `uid`s. + +- You can also use a [collectionGroup](https://firebase.google.com/docs/firestore/query-data/queries#collection-group-query) query. To use a `collectionGroup` query, provide the collection name value as `${COLLECTION_PATH}`, and set `${COLLECTION_GROUP_QUERY}` to `true`. For example, if you are trying to import `/collection/{document}/sub_collection`, the value for the `${COLLECTION_PATH}` should be provided as `sub_collection`. Keep in mind that if you have another sub collection with the same name (e.g. `/collection2/{document}/sub_collection`, that will be imported too. - Warning: The import operation is not idempotent; running it twice, or running it after documents have been imported will likely produce duplicate data in your bigquery table. - Warning: A `collectionGroup` query will target every collection in your Firestore project with the provided `${COLLECTION_PATH}`. For example, if you have 10,000 documents with a sub-collection named: `landmarks`, the import script will query every document in 10,000 `landmarks` collections. +You can also use a simple [collectionGroup](https://firebase.google.com/docs/firestore/query-data/queries#collection-group-query) query. To use a `collectionGroup` query, provide the collection name value as `${COLLECTION_PATH}`, and set `${COLLECTION_GROUP_QUERY}` to `true`. + +Warning: A `collectionGroup` query will target every collection in your Firestore project with the provided `${COLLECTION_PATH}`. For example, if you have 10,000 documents with a sub-collection named: `landmarks`, the import script will query every document in 10,000 `landmarks` collections. ### Run the script diff --git a/firestore-bigquery-export/scripts/import/__tests__/e2e.test.ts b/firestore-bigquery-export/scripts/import/__tests__/e2e.test.ts new file mode 100644 index 000000000..04d74e5cf --- /dev/null +++ b/firestore-bigquery-export/scripts/import/__tests__/e2e.test.ts @@ -0,0 +1,1161 @@ +import { BigQuery } from "@google-cloud/bigquery"; +import axios from "axios"; +import * as childProcess from "child_process"; +import * as admin from "firebase-admin"; +import * as path from "path"; + +import { repeat } from "./helpers/waitFor"; + +const scriptPath = path.join(__dirname, "../lib/index.js"); +const projectId = "extensions-testing"; + +const bigquery = new BigQuery({ projectId }); + +async function runScript(scriptPath: string, callback, args?: string[]) { + return new Promise((resolve, reject) => { + // keep track of whether callback has been invoked to prevent multiple invocations + let invoked = false; + const child = childProcess.fork(scriptPath, args, { + cwd: __dirname, + stdio: [process.stdin, process.stdout, process.stderr, "ipc"], + env: { + ...process.env, + }, + }); + + // listen for errors as they may prevent the exit event from firing + child.on("error", (err) => { + if (invoked) return; + invoked = true; + callback(err); + reject(err); + }); + + // execute the callback once the process has finished running + child.on("exit", (code) => { + if (invoked) return; + invoked = true; + const err = code === 0 ? null : new Error("exit code " + code); + callback(err); + resolve(); + }); + }); +} + +describe("e2e test CLI", () => { + let firestore; + let collectionName = "testCollection"; + let datasetName = "testDataset"; + let tableName = "testTable"; + + beforeEach(async () => { + const randomID = Math.random().toString(36).substring(7); + collectionName = `testCollection_${randomID}`; + datasetName = `testDataset_${randomID}`; + tableName = `testTable_${randomID}`; + + //This is live config, should be emulator? + process.env.FIRESTORE_EMULATOR_HOST = "localhost:8080"; + process.env.FIREBASE_CONFIG = JSON.stringify({ + apiKey: "AIzaSyAJTgFI-OVRjgd_10JDWc9T3kxvxY-fUe4", + authDomain: "extensions-testing.firebaseapp.com", + databaseURL: "https://extensions-testing.firebaseio.com", + projectId: "extensions-testing", + storageBucket: "extensions-testing.appspot.com", + messagingSenderId: "219368645393", + appId: "1:219368645393:web:e92083eba0c53f366862b0", + measurementId: "G-QF38ZM1SZN", + }); + // if there is no app, initialize one + if (!admin.apps.length) { + admin.initializeApp(); + } + firestore = admin.firestore(); + await firestore + .collection(collectionName) + .doc("test") + .set({ test: "test" }); + }); + + afterEach(async () => { + // wait 5 seconds for all bigquery data to settle + await new Promise((resolve) => setTimeout(resolve, 5000)); + // if dataset exists, delete it + if ((await bigquery.dataset(datasetName).exists())[0]) { + await bigquery.dataset(datasetName).delete({ force: true }); + } + // delete all data from firestore via axios request + await axios.delete( + `http://localhost:8080/emulator/v1/projects/extensions-testing/databases/(default)/documents` + ); + }); + + test(`should import data with old script`, async () => { + const args = [ + "--non-interactive", + "-P", + "extensions-testing", + "-u", + "-s", + collectionName, + "-d", + datasetName, + "-t", + tableName, + "-q", + "false", + "-l", + "us", + "-e", + "true", + ]; + + await runScript( + scriptPath, + () => { + console.log("complete!"); + }, + args + ); + + const [rows] = await repeat( + () => + bigquery + .dataset(datasetName) + .table(`${tableName}_raw_changelog`) + .getRows(), + (rows) => rows[0].length > 0, + 10, + 20000 + ); + + const { + operation, + timestamp, + document_name, + document_id, + data, + event_id, + old_data, + } = rows[0]; + + expect(operation).toBe("IMPORT"); + expect(document_name).toBe( + `projects/extensions-testing/databases/(default)/documents/${collectionName}/test` + ); + expect(document_id).toBe("test"); + expect(JSON.parse(data)).toEqual({ test: "test" }); + expect(event_id).toBe(""); + expect(old_data).toBeNull(); + expect(timestamp).toBeDefined(); + }); + test(`should import data with new script, and add the correct view`, async () => { + const args = [ + "--non-interactive", + "-P", + "extensions-testing", + "-u", + "-s", + collectionName, + "-d", + datasetName, + "-t", + tableName, + "-q", + "false", + "-l", + "us", + "-u", + "true", + "-e", + "true", + ]; + + await runScript( + scriptPath, + () => { + console.log("complete!"); + }, + args + ); + + const [rows] = await repeat( + () => + bigquery + .dataset(datasetName) + .table(`${tableName}_raw_changelog`) + .getRows(), + (rows) => rows[0].length > 0, + 10, + 20000 + ); + + expect(rows.length).toBeGreaterThanOrEqual(1); + const { + operation, + timestamp, + document_name, + document_id, + data, + event_id, + old_data, + } = rows[0]; + + expect(operation).toBe("IMPORT"); + expect(document_name).toBe( + `projects/extensions-testing/databases/(default)/documents/${collectionName}/test` + ); + expect(document_id).toBe("test"); + expect(JSON.parse(data)).toEqual({ test: "test" }); + + expect(event_id).toBe(""); + expect(old_data).toBeNull(); + expect(timestamp).toBeDefined(); + + const [view] = await bigquery + .dataset(datasetName) + .table(`${tableName}_raw_latest`) + .get(); + + const query = view.metadata.view.query; + expect(query).toBeDefined(); + const isOldQuery = query.includes("FIRST_VALUE"); + expect(isOldQuery).toBe(false); + }); + + test("basic collection group query", async () => { + process.env.FIRESTORE_EMULATOR_HOST = "localhost:8080"; + // if there is no app, initialize one + if (!admin.apps.length) { + admin.initializeApp(); + } + firestore = admin.firestore(); + + await firestore + .collection("regions/europe/countries") + .doc("france") + .set({ name: "France" }); + + await firestore + .collection("notregions/asia/countries") + .doc("japan") + .set({ name: "Japan" }); + + const args = [ + "--non-interactive", + "-P", + "extensions-testing", + "-u", + "-s", + "countries", + "-d", + datasetName, + "-t", + tableName, + "-q", + "true", + "-l", + "us", + "-u", + "true", + "-e", + "true", + ]; + + await runScript( + scriptPath, + () => { + console.log("complete!"); + }, + args + ); + + console.log(datasetName); + + const [rows] = await repeat( + () => + bigquery + .dataset(datasetName) + .table(`${tableName}_raw_changelog`) + .getRows(), + (rows) => rows[0].length > 1, + 10, + 20000 + ); + + console.log(rows); + expect(rows.length).toBe(2); + + const { operation, timestamp, event_id, old_data } = rows[0]; + + expect(operation).toBe("IMPORT"); + // expect(document_name).toBe( + // `projects/extensions-testing/databases/(default)/documents/${collectionName}/test` + // ); + // expect(document_id).toBe("test"); + // expect(JSON.parse(data)).toEqual({ test: "test" }); + + expect(event_id).toBe(""); + expect(old_data).toBeNull(); + expect(timestamp).toBeDefined(); + + const [view] = await bigquery + .dataset(datasetName) + .table(`${tableName}_raw_latest`) + .get(); + + const query = view.metadata.view.query; + expect(query).toBeDefined(); + const isOldQuery = query.includes("FIRST_VALUE"); + expect(isOldQuery).toBe(false); + }); + + test("wildcarded collection group query", async () => { + process.env.FIRESTORE_EMULATOR_HOST = "localhost:8080"; + process.env.FIREBASE_CONFIG = JSON.stringify({ + apiKey: "AIzaSyAJTgFI-OVRjgd_10JDWc9T3kxvxY-fUe4", + authDomain: "extensions-testing.firebaseapp.com", + databaseURL: "https://extensions-testing.firebaseio.com", + projectId: "extensions-testing", + storageBucket: "extensions-testing.appspot.com", + messagingSenderId: "219368645393", + appId: "1:219368645393:web:e92083eba0c53f366862b0", + measurementId: "G-QF38ZM1SZN", + }); + // if there is no app, initialize one + if (!admin.apps.length) { + admin.initializeApp(); + } + firestore = admin.firestore(); + + await firestore + .collection("regions/europe/countries") + .doc("france") + .set({ name: "France" }); + + await firestore + .collection("regions/asia/countries") + .doc("japan") + .set({ name: "Japan" }); + + const args = [ + "--non-interactive", + "-P", + "extensions-testing", + "-u", + "-s", + "regions/{regionId}/countries", + "-d", + datasetName, + "-t", + tableName, + "-q", + "true", + "-l", + "us", + "-u", + "true", + "-e", + "true", + ]; + + await runScript( + scriptPath, + () => { + console.log("complete!"); + }, + args + ); + + console.log(datasetName); + + const [rows] = await repeat( + () => + bigquery + .dataset(datasetName) + .table(`${tableName}_raw_changelog`) + .getRows(), + (rows) => rows[0].length > 1, + 10, + 20000 + ); + + expect(rows.length).toBe(2); + const { + operation, + timestamp, + document_name, + document_id, + data, + event_id, + old_data, + } = rows[0]; + + expect(operation).toBe("IMPORT"); + // expect(document_name).toBe( + // `projects/extensions-testing/databases/(default)/documents/${collectionName}/test` + // ); + // expect(document_id).toBe("test"); + // expect(JSON.parse(data)).toEqual({ test: "test" }); + + expect(event_id).toBe(""); + expect(old_data).toBeNull(); + expect(timestamp).toBeDefined(); + + const [view] = await bigquery + .dataset(datasetName) + .table(`${tableName}_raw_latest`) + .get(); + + const query = view.metadata.view.query; + expect(query).toBeDefined(); + const isOldQuery = query.includes("FIRST_VALUE"); + expect(isOldQuery).toBe(false); + }); + test("shouldn't export non-matching results from collection group query", async () => { + process.env.FIRESTORE_EMULATOR_HOST = "localhost:8080"; + process.env.FIREBASE_CONFIG = JSON.stringify({ + apiKey: "AIzaSyAJTgFI-OVRjgd_10JDWc9T3kxvxY-fUe4", + authDomain: "extensions-testing.firebaseapp.com", + databaseURL: "https://extensions-testing.firebaseio.com", + projectId: "extensions-testing", + storageBucket: "extensions-testing.appspot.com", + messagingSenderId: "219368645393", + appId: "1:219368645393:web:e92083eba0c53f366862b0", + measurementId: "G-QF38ZM1SZN", + }); + // if there is no app, initialize one + if (!admin.apps.length) { + admin.initializeApp(); + } + firestore = admin.firestore(); + + await firestore + .collection("regions/europe/countries") + .doc("france") + .set({ name: "France" }); + + await firestore + .collection("regions/asia/countries") + .doc("japan") + .set({ name: "Japan" }); + + await firestore + .collection("notregions/asia/countries") + .doc("foo") + .set({ name: "Bar" }); + + const args = [ + "--non-interactive", + "-P", + "extensions-testing", + "-u", + "-s", + "regions/{regionId}/countries", + "-d", + datasetName, + "-t", + tableName, + "-q", + "true", + "-l", + "us", + "-u", + "true", + "-e", + "true", + ]; + + await runScript( + scriptPath, + () => { + console.log("complete!"); + }, + args + ); + //sleep for 2 seconds + await new Promise((resolve) => setTimeout(resolve, 5000)); + + console.log(datasetName); + + const [rows] = await repeat( + () => + bigquery + .dataset(datasetName) + .table(`${tableName}_raw_changelog`) + .getRows(), + (rows) => rows[0].length > 1, + 10, + 20000 + ); + + expect(rows.length).toBe(2); + const { + operation, + timestamp, + document_name, + document_id, + data, + event_id, + old_data, + } = rows[0]; + + expect(operation).toBe("IMPORT"); + // expect(document_name).toBe( + // `projects/extensions-testing/databases/(default)/documents/${collectionName}/test` + // ); + // expect(document_id).toBe("test"); + // expect(JSON.parse(data)).toEqual({ test: "test" }); + + expect(event_id).toBe(""); + expect(old_data).toBeNull(); + expect(timestamp).toBeDefined(); + + const [view] = await bigquery + .dataset(datasetName) + .table(`${tableName}_raw_latest`) + .get(); + + const query = view.metadata.view.query; + expect(query).toBeDefined(); + const isOldQuery = query.includes("FIRST_VALUE"); + expect(isOldQuery).toBe(false); + }); + + test("should match several wildcards in one query", async () => { + process.env.FIRESTORE_EMULATOR_HOST = "localhost:8080"; + process.env.FIREBASE_CONFIG = JSON.stringify({ + apiKey: "AIzaSyAJTgFI-OVRjgd_10JDWc9T3kxvxY-fUe4", + authDomain: "extensions-testing.firebaseapp.com", + databaseURL: "https://extensions-testing.firebaseio.com", + projectId: "extensions-testing", + storageBucket: "extensions-testing.appspot.com", + messagingSenderId: "219368645393", + appId: "1:219368645393:web:e92083eba0c53f366862b0", + measurementId: "G-QF38ZM1SZN", + }); + // if there is no app, initialize one + if (!admin.apps.length) { + admin.initializeApp(); + } + firestore = admin.firestore(); + + await firestore + .collection("regions/europe/countries/france/cities") + .doc("paris") + .set({ name: "Paris" }); + + const args = [ + "--non-interactive", + "-P", + "extensions-testing", + "-u", + "-s", + "regions/{regionId}/countries/{countryId}/cities", + "-d", + datasetName, + "-t", + tableName, + "-q", + "true", + "-l", + "us", + "-u", + "true", + "-e", + "true", + ]; + + await runScript( + scriptPath, + () => { + console.log("complete!"); + }, + args + ); + + const [rows] = await repeat( + () => + bigquery + .dataset(datasetName) + .table(`${tableName}_raw_changelog`) + .getRows(), + (rows) => rows[0].length >= 1, + 10, + 20000 + ); + + expect(rows.length).toBe(1); + console.log(rows[0]); + const { path_params } = rows[0]; + + const pathParams = JSON.parse(path_params); + + expect(pathParams).toHaveProperty("regionId"); + expect(pathParams).toHaveProperty("countryId"); + expect(pathParams.regionId).toBe("europe"); + expect(pathParams.countryId).toBe("france"); + }); +}); + +describe("e2e multi thread", () => { + let firestore; + let collectionName = "testCollection"; + let datasetName = "testDataset"; + let tableName = "testTable"; + + beforeEach(async () => { + const randomID = Math.random().toString(36).substring(7); + collectionName = `testCollection_${randomID}`; + datasetName = `testDataset_${randomID}`; + tableName = `testTable_${randomID}`; + + //This is live config, should be emulator? + process.env.FIRESTORE_EMULATOR_HOST = "localhost:8080"; + process.env.FIREBASE_CONFIG = JSON.stringify({ + apiKey: "AIzaSyAJTgFI-OVRjgd_10JDWc9T3kxvxY-fUe4", + authDomain: "extensions-testing.firebaseapp.com", + databaseURL: "https://extensions-testing.firebaseio.com", + projectId: "extensions-testing", + storageBucket: "extensions-testing.appspot.com", + messagingSenderId: "219368645393", + appId: "1:219368645393:web:e92083eba0c53f366862b0", + measurementId: "G-QF38ZM1SZN", + }); + // if there is no app, initialize one + if (!admin.apps.length) { + admin.initializeApp(); + } + firestore = admin.firestore(); + await firestore + .collection(collectionName) + .doc("test") + .set({ test: "test" }); + }); + + afterEach(async () => { + // wait 5 seconds for all bigquery data to settle + await new Promise((resolve) => setTimeout(resolve, 5000)); + + // if dataset exists, delete it + if ((await bigquery.dataset(datasetName).exists())[0]) { + await bigquery.dataset(datasetName).delete({ force: true }); + } + // delete all data from firestore via axios request + await axios.delete( + `http://localhost:8080/emulator/v1/projects/extensions-testing/databases/(default)/documents` + ); + }); + + test(`should import data with old script`, async () => { + const args = [ + "--non-interactive", + "-P", + "extensions-testing", + "-u", + "-s", + collectionName, + "-d", + datasetName, + "-t", + tableName, + "-q", + "false", + "-l", + "us", + "-e", + "true", + "-m", + ]; + + await runScript( + scriptPath, + () => { + console.log("complete!"); + }, + args + ); + + const [rows] = await repeat( + () => + bigquery + .dataset(datasetName) + .table(`${tableName}_raw_changelog`) + .getRows(), + (rows) => rows[0].length > 0, + 10, + 20000 + ); + + const { + operation, + timestamp, + document_name, + document_id, + data, + event_id, + old_data, + } = rows[0]; + + expect(operation).toBe("IMPORT"); + expect(document_name).toBe( + `projects/extensions-testing/databases/(default)/documents/${collectionName}/test` + ); + expect(document_id).toBe("test"); + expect(JSON.parse(data)).toEqual({ test: "test" }); + expect(event_id).toBe(""); + expect(old_data).toBeNull(); + expect(timestamp).toBeDefined(); + }); + test(`should import data with new script, and add the correct view`, async () => { + const args = [ + "--non-interactive", + "-P", + "extensions-testing", + "-u", + "-s", + collectionName, + "-d", + datasetName, + "-t", + tableName, + "-q", + "false", + "-l", + "us", + "-u", + "true", + "-e", + "true", + "-m", + ]; + + await runScript( + scriptPath, + () => { + console.log("complete!"); + }, + args + ); + + const [rows] = await repeat( + () => + bigquery + .dataset(datasetName) + .table(`${tableName}_raw_changelog`) + .getRows(), + (rows) => rows[0].length > 0, + 10, + 20000 + ); + + expect(rows.length).toBeGreaterThanOrEqual(1); + const { + operation, + timestamp, + document_name, + document_id, + data, + event_id, + old_data, + } = rows[0]; + + expect(operation).toBe("IMPORT"); + expect(document_name).toBe( + `projects/extensions-testing/databases/(default)/documents/${collectionName}/test` + ); + expect(document_id).toBe("test"); + expect(JSON.parse(data)).toEqual({ test: "test" }); + + expect(event_id).toBe(""); + expect(old_data).toBeNull(); + expect(timestamp).toBeDefined(); + + const [view] = await bigquery + .dataset(datasetName) + .table(`${tableName}_raw_latest`) + .get(); + + const query = view.metadata.view.query; + expect(query).toBeDefined(); + const isOldQuery = query.includes("FIRST_VALUE"); + expect(isOldQuery).toBe(false); + }); + + test("basic collection group query", async () => { + process.env.FIRESTORE_EMULATOR_HOST = "localhost:8080"; + // if there is no app, initialize one + if (!admin.apps.length) { + admin.initializeApp(); + } + firestore = admin.firestore(); + + await firestore + .collection("regions/europe/countries") + .doc("france") + .set({ name: "France" }); + + await firestore + .collection("notregions/asia/countries") + .doc("japan") + .set({ name: "Japan" }); + + const args = [ + "--non-interactive", + "-P", + "extensions-testing", + "-u", + "-s", + "countries", + "-d", + datasetName, + "-t", + tableName, + "-q", + "true", + "-l", + "us", + "-u", + "true", + "-e", + "true", + "-m", + ]; + + await runScript( + scriptPath, + () => { + console.log("complete!"); + }, + args + ); + + console.log(datasetName); + + const [rows] = await repeat( + () => + bigquery + .dataset(datasetName) + .table(`${tableName}_raw_changelog`) + .getRows(), + (rows) => rows[0].length > 1, + 10, + 20000 + ); + + console.log(rows); + expect(rows.length).toBe(2); + + const { operation, timestamp, event_id, old_data } = rows[0]; + + expect(operation).toBe("IMPORT"); + // expect(document_name).toBe( + // `projects/extensions-testing/databases/(default)/documents/${collectionName}/test` + // ); + // expect(document_id).toBe("test"); + // expect(JSON.parse(data)).toEqual({ test: "test" }); + + expect(event_id).toBe(""); + expect(old_data).toBeNull(); + expect(timestamp).toBeDefined(); + + const [view] = await bigquery + .dataset(datasetName) + .table(`${tableName}_raw_latest`) + .get(); + + const query = view.metadata.view.query; + expect(query).toBeDefined(); + const isOldQuery = query.includes("FIRST_VALUE"); + expect(isOldQuery).toBe(false); + }); + + test("wildcarded collection group query", async () => { + process.env.FIRESTORE_EMULATOR_HOST = "localhost:8080"; + process.env.FIREBASE_CONFIG = JSON.stringify({ + apiKey: "AIzaSyAJTgFI-OVRjgd_10JDWc9T3kxvxY-fUe4", + authDomain: "extensions-testing.firebaseapp.com", + databaseURL: "https://extensions-testing.firebaseio.com", + projectId: "extensions-testing", + storageBucket: "extensions-testing.appspot.com", + messagingSenderId: "219368645393", + appId: "1:219368645393:web:e92083eba0c53f366862b0", + measurementId: "G-QF38ZM1SZN", + }); + // if there is no app, initialize one + if (!admin.apps.length) { + admin.initializeApp(); + } + firestore = admin.firestore(); + + await firestore + .collection("regions/europe/countries") + .doc("france") + .set({ name: "France" }); + + await firestore + .collection("regions/asia/countries") + .doc("japan") + .set({ name: "Japan" }); + + const args = [ + "--non-interactive", + "-P", + "extensions-testing", + "-u", + "-s", + "regions/{regionId}/countries", + "-d", + datasetName, + "-t", + tableName, + "-q", + "true", + "-l", + "us", + "-u", + "true", + "-e", + "true", + "-m", + ]; + + await runScript( + scriptPath, + () => { + console.log("complete!"); + }, + args + ); + + console.log(datasetName); + + const [rows] = await repeat( + () => + bigquery + .dataset(datasetName) + .table(`${tableName}_raw_changelog`) + .getRows(), + (rows) => rows[0].length > 1, + 10, + 20000 + ); + + expect(rows.length).toBe(2); + const { + operation, + timestamp, + document_name, + document_id, + data, + event_id, + old_data, + } = rows[0]; + + expect(operation).toBe("IMPORT"); + // expect(document_name).toBe( + // `projects/extensions-testing/databases/(default)/documents/${collectionName}/test` + // ); + // expect(document_id).toBe("test"); + // expect(JSON.parse(data)).toEqual({ test: "test" }); + + expect(event_id).toBe(""); + expect(old_data).toBeNull(); + expect(timestamp).toBeDefined(); + + const [view] = await bigquery + .dataset(datasetName) + .table(`${tableName}_raw_latest`) + .get(); + + const query = view.metadata.view.query; + expect(query).toBeDefined(); + const isOldQuery = query.includes("FIRST_VALUE"); + expect(isOldQuery).toBe(false); + }); + test("shouldn't export non-matching results from collection group query", async () => { + process.env.FIRESTORE_EMULATOR_HOST = "localhost:8080"; + process.env.FIREBASE_CONFIG = JSON.stringify({ + apiKey: "AIzaSyAJTgFI-OVRjgd_10JDWc9T3kxvxY-fUe4", + authDomain: "extensions-testing.firebaseapp.com", + databaseURL: "https://extensions-testing.firebaseio.com", + projectId: "extensions-testing", + storageBucket: "extensions-testing.appspot.com", + messagingSenderId: "219368645393", + appId: "1:219368645393:web:e92083eba0c53f366862b0", + measurementId: "G-QF38ZM1SZN", + }); + // if there is no app, initialize one + if (!admin.apps.length) { + admin.initializeApp(); + } + firestore = admin.firestore(); + + await firestore + .collection("regions/europe/countries") + .doc("france") + .set({ name: "France" }); + + await firestore + .collection("regions/asia/countries") + .doc("japan") + .set({ name: "Japan" }); + + await firestore + .collection("notregions/asia/countries") + .doc("foo") + .set({ name: "Bar" }); + + const args = [ + "--non-interactive", + "-P", + "extensions-testing", + "-u", + "-s", + "regions/{regionId}/countries", + "-d", + datasetName, + "-t", + tableName, + "-q", + "true", + "-l", + "us", + "-u", + "true", + "-e", + "true", + "-m", + ]; + + await runScript( + scriptPath, + () => { + console.log("complete!"); + }, + args + ); + //sleep for 2 seconds + await new Promise((resolve) => setTimeout(resolve, 5000)); + + console.log(datasetName); + + const [rows] = await repeat( + () => + bigquery + .dataset(datasetName) + .table(`${tableName}_raw_changelog`) + .getRows(), + (rows) => rows[0].length > 1, + 10, + 20000 + ); + + expect(rows.length).toBe(2); + const { + operation, + timestamp, + document_name, + document_id, + data, + event_id, + old_data, + } = rows[0]; + + expect(operation).toBe("IMPORT"); + // expect(document_name).toBe( + // `projects/extensions-testing/databases/(default)/documents/${collectionName}/test` + // ); + // expect(document_id).toBe("test"); + // expect(JSON.parse(data)).toEqual({ test: "test" }); + + expect(event_id).toBe(""); + expect(old_data).toBeNull(); + expect(timestamp).toBeDefined(); + + const [view] = await bigquery + .dataset(datasetName) + .table(`${tableName}_raw_latest`) + .get(); + + const query = view.metadata.view.query; + expect(query).toBeDefined(); + const isOldQuery = query.includes("FIRST_VALUE"); + expect(isOldQuery).toBe(false); + }); + + test("should match several wildcards in one query", async () => { + process.env.FIRESTORE_EMULATOR_HOST = "localhost:8080"; + process.env.FIREBASE_CONFIG = JSON.stringify({ + apiKey: "AIzaSyAJTgFI-OVRjgd_10JDWc9T3kxvxY-fUe4", + authDomain: "extensions-testing.firebaseapp.com", + databaseURL: "https://extensions-testing.firebaseio.com", + projectId: "extensions-testing", + storageBucket: "extensions-testing.appspot.com", + messagingSenderId: "219368645393", + appId: "1:219368645393:web:e92083eba0c53f366862b0", + measurementId: "G-QF38ZM1SZN", + }); + // if there is no app, initialize one + if (!admin.apps.length) { + admin.initializeApp(); + } + firestore = admin.firestore(); + + await firestore + .collection("regions/europe/countries/france/cities") + .doc("paris") + .set({ name: "Paris" }); + + const args = [ + "--non-interactive", + "-P", + "extensions-testing", + "-u", + "-s", + "regions/{regionId}/countries/{countryId}/cities", + "-d", + datasetName, + "-t", + tableName, + "-q", + "true", + "-l", + "us", + "-u", + "true", + "-e", + "true", + ]; + + await runScript( + scriptPath, + () => { + console.log("complete!"); + }, + args + ); + + const [rows] = await repeat( + () => + bigquery + .dataset(datasetName) + .table(`${tableName}_raw_changelog`) + .getRows(), + (rows) => rows[0].length >= 1, + 10, + 20000 + ); + + expect(rows.length).toBe(1); + console.log(rows[0]); + const { path_params } = rows[0]; + + const pathParams = JSON.parse(path_params); + + expect(pathParams).toHaveProperty("regionId"); + expect(pathParams).toHaveProperty("countryId"); + expect(pathParams.regionId).toBe("europe"); + expect(pathParams.countryId).toBe("france"); + }); +}); diff --git a/firestore-bigquery-export/scripts/import/__tests__/emulator-params.env b/firestore-bigquery-export/scripts/import/__tests__/emulator-params.env deleted file mode 100644 index b44891e9d..000000000 --- a/firestore-bigquery-export/scripts/import/__tests__/emulator-params.env +++ /dev/null @@ -1,2 +0,0 @@ -LOCATION=europe-west2 -PROJECT_ID=extensions-testing diff --git a/firestore-bigquery-export/scripts/import/__tests__/example.test.ts b/firestore-bigquery-export/scripts/import/__tests__/example.test.ts deleted file mode 100644 index c0d820923..000000000 --- a/firestore-bigquery-export/scripts/import/__tests__/example.test.ts +++ /dev/null @@ -1,217 +0,0 @@ -import { BigQuery } from "@google-cloud/bigquery"; - -import * as childProcess from "child_process"; -import * as path from "path"; -import * as admin from "firebase-admin"; -import { repeat } from "./helpers/waitFor"; - -const scriptPath = path.join(__dirname, "../lib/index.js"); -const projectId = "extensions-testing"; - -const bigquery = new BigQuery({ projectId }); - -async function runScript(scriptPath, callback, args?: string[]) { - return new Promise((resolve, reject) => { - // keep track of whether callback has been invoked to prevent multiple invocations - let invoked = false; - const child = childProcess.fork(scriptPath, args, { - cwd: __dirname, - stdio: [process.stdin, process.stdout, process.stderr, "ipc"], - env: { - ...process.env, - }, - }); - - // listen for errors as they may prevent the exit event from firing - child.on("error", (err) => { - if (invoked) return; - invoked = true; - callback(err); - reject(err); - }); - - // execute the callback once the process has finished running - child.on("exit", (code) => { - if (invoked) return; - invoked = true; - const err = code === 0 ? null : new Error("exit code " + code); - callback(err); - resolve(); - }); - }); -} - -describe("CLI", () => { - let firestore; - let collectionName = "testCollection"; - let datasetName = "testDataset"; - const tableName = "testTable"; - - const randomID = Math.random().toString(36).substring(7); - - beforeEach(async () => { - collectionName = `testCollection_${randomID}`; - datasetName = `testDataset_${randomID}`; - - //This is live config, should be emulator? - process.env.FIRESTORE_EMULATOR_HOST = "127.0.0.1:8080"; - process.env.FIREBASE_CONFIG = JSON.stringify({ - apiKey: "AIzaSyAJTgFI-OVRjgd_10JDWc9T3kxvxY-fUe4", - authDomain: "extensions-testing.firebaseapp.com", - databaseURL: "https://extensions-testing.firebaseio.com", - projectId: "extensions-testing", - storageBucket: "extensions-testing.appspot.com", - messagingSenderId: "219368645393", - appId: "1:219368645393:web:e92083eba0c53f366862b0", - measurementId: "G-QF38ZM1SZN", - }); - // if there is no app, initialize one - if (!admin.apps.length) { - admin.initializeApp(); - } - firestore = admin.firestore(); - await firestore - .collection(collectionName) - .doc("test") - .set({ test: "test" }); - }); - - afterEach(async () => { - // if dataset exists, delete it - if ((await bigquery.dataset(datasetName).exists())[0]) { - await bigquery.dataset(datasetName).delete({ force: true }); - } - }); - - test(`should import data with old script`, async () => { - const args = [ - "--non-interactive", - "-P", - "extensions-testing", - "-u", - "-s", - collectionName, - "-d", - datasetName, - "-t", - tableName, - "-q", - "false", - "-l", - "us", - "-e", - "true", - ]; - - await runScript( - scriptPath, - () => { - console.log("complete!"); - }, - args - ); - - const [rows] = await repeat( - () => - bigquery - .dataset(datasetName) - .table(`${tableName}_raw_changelog`) - .getRows(), - (rows) => rows.length > 0, - 10, - 8000 - ); - - const { - operation, - timestamp, - document_name, - document_id, - data, - event_id, - old_data, - } = rows[0]; - console.log(rows[0]); - - expect(operation).toBe("IMPORT"); - expect(document_name).toBe( - `projects/extensions-testing/databases/(default)/documents/${collectionName}/test` - ); - expect(document_id).toBe("test"); - expect(JSON.parse(data)).toEqual({ test: "test" }); - expect(event_id).toBe(""); - expect(old_data).toBeNull(); - expect(timestamp).toBeDefined(); - }); - test(`should import data with new script, and add the correct view`, async () => { - const args = [ - "--non-interactive", - "-P", - "extensions-testing", - "-u", - "-s", - collectionName, - "-d", - datasetName, - "-t", - tableName, - "-q", - "false", - "-l", - "us", - "-u", - "true", - "-e", - "true", - ]; - - await runScript( - scriptPath, - () => { - console.log("complete!"); - }, - args - ); - - const [rows] = await repeat( - () => - bigquery - .dataset(datasetName) - .table(`${tableName}_raw_changelog`) - .getRows(), - (rows) => rows.length > 0, - 10, - 8000 - ); - - const { - operation, - timestamp, - document_name, - document_id, - data, - event_id, - old_data, - } = rows[0]; - - expect(operation).toBe("IMPORT"); - expect(document_name).toBe( - `projects/extensions-testing/databases/(default)/documents/${collectionName}/test` - ); - expect(document_id).toBe("test"); - expect(JSON.parse(data)).toEqual({ test: "test" }); - expect(event_id).toBe(""); - expect(old_data).toBeNull(); - expect(timestamp).toBeDefined(); - - const [view] = await bigquery - .dataset(datasetName) - .table(`${tableName}_raw_latest`) - .get(); - - const query = view.metadata.view.query; - expect(query).toBeDefined(); - const isOldQuery = query.includes("FIRST_VALUE"); - expect(isOldQuery).toBe(false); - }); -}); diff --git a/firestore-bigquery-export/scripts/import/package-lock.json b/firestore-bigquery-export/scripts/import/package-lock.json index dc03cae03..43afc13c7 100644 --- a/firestore-bigquery-export/scripts/import/package-lock.json +++ b/firestore-bigquery-export/scripts/import/package-lock.json @@ -26,7 +26,9 @@ "devDependencies": { "@types/chai": "^4.2.0", "@types/workerpool": "^6.0.0", + "axios": "^1.3.2", "chai": "^4.2.0", + "dotenv": "^16.3.1", "jest": "^29.3.1", "nanoid": "^4.0.0", "rimraf": "^2.6.3", @@ -2248,6 +2250,23 @@ "retry": "0.13.1" } }, + "node_modules/asynckit": { + "version": "0.4.0", + "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", + "integrity": "sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q==", + "dev": true + }, + "node_modules/axios": { + "version": "1.5.1", + "resolved": "https://registry.npmjs.org/axios/-/axios-1.5.1.tgz", + "integrity": "sha512-Q28iYCWzNHjAm+yEAot5QaAMxhMghWLFVf7rRdwhUI+c2jix2DUXjAHXVi+s1ibs3mjPO/cCgbA++3BjD0vP/A==", + "dev": true, + "dependencies": { + "follow-redirects": "^1.15.0", + "form-data": "^4.0.0", + "proxy-from-env": "^1.1.0" + } + }, "node_modules/babel-jest": { "version": "29.3.1", "dev": true, @@ -2722,6 +2741,18 @@ "version": "1.1.4", "license": "MIT" }, + "node_modules/combined-stream": { + "version": "1.0.8", + "resolved": "https://registry.npmjs.org/combined-stream/-/combined-stream-1.0.8.tgz", + "integrity": "sha512-FQN4MRfuJeHf7cBbBMJFXhKSDq+2kAArBlmRBvcvFE5BB1HZKXtSFASDhdlz9zOYwxh8lDdnvmMOe/+5cdoEdg==", + "dev": true, + "dependencies": { + "delayed-stream": "~1.0.0" + }, + "engines": { + "node": ">= 0.8" + } + }, "node_modules/commander": { "version": "5.0.0", "license": "MIT", @@ -2856,6 +2887,15 @@ "node": ">=0.10.0" } }, + "node_modules/delayed-stream": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/delayed-stream/-/delayed-stream-1.0.0.tgz", + "integrity": "sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ==", + "dev": true, + "engines": { + "node": ">=0.4.0" + } + }, "node_modules/depd": { "version": "2.0.0", "license": "MIT", @@ -2895,6 +2935,18 @@ "node": "^14.15.0 || ^16.10.0 || >=18.0.0" } }, + "node_modules/dotenv": { + "version": "16.3.1", + "resolved": "https://registry.npmjs.org/dotenv/-/dotenv-16.3.1.tgz", + "integrity": "sha512-IPzF4w4/Rd94bA9imS68tZBaYyBWSCE47V1RGuMrB94iyTOIEwRmVL2x/4An+6mETpLrKJ5hQkB8W4kFAadeIQ==", + "dev": true, + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://github.com/motdotla/dotenv?sponsor=1" + } + }, "node_modules/duplexify": { "version": "4.1.2", "license": "MIT", @@ -3393,6 +3445,40 @@ "@types/serve-static": "*" } }, + "node_modules/follow-redirects": { + "version": "1.15.3", + "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.3.tgz", + "integrity": "sha512-1VzOtuEM8pC9SFU1E+8KfTjZyMztRsgEfwQl44z8A25uy13jSzTj6dyK2Df52iV0vgHCfBwLhDWevLn95w5v6Q==", + "dev": true, + "funding": [ + { + "type": "individual", + "url": "https://github.com/sponsors/RubenVerborgh" + } + ], + "engines": { + "node": ">=4.0" + }, + "peerDependenciesMeta": { + "debug": { + "optional": true + } + } + }, + "node_modules/form-data": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/form-data/-/form-data-4.0.0.tgz", + "integrity": "sha512-ETEklSGi5t0QMZuiXoA/Q6vcnxcLQP5vdugSpuAyi6SVGi2clPPp+xgEhuMaHC+zGgn31Kd235W35f7Hykkaww==", + "dev": true, + "dependencies": { + "asynckit": "^0.4.0", + "combined-stream": "^1.0.8", + "mime-types": "^2.1.12" + }, + "engines": { + "node": ">= 6" + } + }, "node_modules/forwarded": { "version": "0.2.0", "license": "MIT", @@ -6260,6 +6346,12 @@ "node": ">= 0.10" } }, + "node_modules/proxy-from-env": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/proxy-from-env/-/proxy-from-env-1.1.0.tgz", + "integrity": "sha512-D+zkORCbA9f1tdWRK0RaCR3GPv50cMxcrz4X8k5LTSUD1Dkw47mKJEZQNunItRTkWwgtaUSo1RVFRIG9ZXiFYg==", + "dev": true + }, "node_modules/pseudomap": { "version": "1.0.2", "license": "ISC" @@ -8839,6 +8931,23 @@ "retry": "0.13.1" } }, + "asynckit": { + "version": "0.4.0", + "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", + "integrity": "sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q==", + "dev": true + }, + "axios": { + "version": "1.5.1", + "resolved": "https://registry.npmjs.org/axios/-/axios-1.5.1.tgz", + "integrity": "sha512-Q28iYCWzNHjAm+yEAot5QaAMxhMghWLFVf7rRdwhUI+c2jix2DUXjAHXVi+s1ibs3mjPO/cCgbA++3BjD0vP/A==", + "dev": true, + "requires": { + "follow-redirects": "^1.15.0", + "form-data": "^4.0.0", + "proxy-from-env": "^1.1.0" + } + }, "babel-jest": { "version": "29.3.1", "dev": true, @@ -9131,6 +9240,15 @@ "color-name": { "version": "1.1.4" }, + "combined-stream": { + "version": "1.0.8", + "resolved": "https://registry.npmjs.org/combined-stream/-/combined-stream-1.0.8.tgz", + "integrity": "sha512-FQN4MRfuJeHf7cBbBMJFXhKSDq+2kAArBlmRBvcvFE5BB1HZKXtSFASDhdlz9zOYwxh8lDdnvmMOe/+5cdoEdg==", + "dev": true, + "requires": { + "delayed-stream": "~1.0.0" + } + }, "commander": { "version": "5.0.0" }, @@ -9214,6 +9332,12 @@ "version": "4.2.2", "dev": true }, + "delayed-stream": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/delayed-stream/-/delayed-stream-1.0.0.tgz", + "integrity": "sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ==", + "dev": true + }, "depd": { "version": "2.0.0" }, @@ -9232,6 +9356,12 @@ "version": "29.3.1", "dev": true }, + "dotenv": { + "version": "16.3.1", + "resolved": "https://registry.npmjs.org/dotenv/-/dotenv-16.3.1.tgz", + "integrity": "sha512-IPzF4w4/Rd94bA9imS68tZBaYyBWSCE47V1RGuMrB94iyTOIEwRmVL2x/4An+6mETpLrKJ5hQkB8W4kFAadeIQ==", + "dev": true + }, "duplexify": { "version": "4.1.2", "requires": { @@ -9566,6 +9696,23 @@ } } }, + "follow-redirects": { + "version": "1.15.3", + "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.3.tgz", + "integrity": "sha512-1VzOtuEM8pC9SFU1E+8KfTjZyMztRsgEfwQl44z8A25uy13jSzTj6dyK2Df52iV0vgHCfBwLhDWevLn95w5v6Q==", + "dev": true + }, + "form-data": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/form-data/-/form-data-4.0.0.tgz", + "integrity": "sha512-ETEklSGi5t0QMZuiXoA/Q6vcnxcLQP5vdugSpuAyi6SVGi2clPPp+xgEhuMaHC+zGgn31Kd235W35f7Hykkaww==", + "dev": true, + "requires": { + "asynckit": "^0.4.0", + "combined-stream": "^1.0.8", + "mime-types": "^2.1.12" + } + }, "forwarded": { "version": "0.2.0" }, @@ -11409,6 +11556,12 @@ "ipaddr.js": "1.9.1" } }, + "proxy-from-env": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/proxy-from-env/-/proxy-from-env-1.1.0.tgz", + "integrity": "sha512-D+zkORCbA9f1tdWRK0RaCR3GPv50cMxcrz4X8k5LTSUD1Dkw47mKJEZQNunItRTkWwgtaUSo1RVFRIG9ZXiFYg==", + "dev": true + }, "pseudomap": { "version": "1.0.2" }, diff --git a/firestore-bigquery-export/scripts/import/package.json b/firestore-bigquery-export/scripts/import/package.json index ddda834fc..91c18a09b 100644 --- a/firestore-bigquery-export/scripts/import/package.json +++ b/firestore-bigquery-export/scripts/import/package.json @@ -15,7 +15,7 @@ "compile": "tsc", "import": "node ./lib/index.js", "prepare": "npm run build", - "test:local": "firebase ext:dev:emulators:exec ./node_modules/.bin/jest --test-params=./__tests__/emulator-params.env --project=extensions-testing --config=./__tests__/firebase.json" + "test:local": "jest" }, "files": [ "lib" @@ -40,7 +40,9 @@ "devDependencies": { "@types/chai": "^4.2.0", "@types/workerpool": "^6.0.0", + "axios": "^1.3.2", "chai": "^4.2.0", + "dotenv": "^16.3.1", "jest": "^29.3.1", "nanoid": "^4.0.0", "rimraf": "^2.6.3", diff --git a/firestore-bigquery-export/scripts/import/src/config.ts b/firestore-bigquery-export/scripts/import/src/config.ts index e437a811b..f48768a9a 100644 --- a/firestore-bigquery-export/scripts/import/src/config.ts +++ b/firestore-bigquery-export/scripts/import/src/config.ts @@ -1,13 +1,16 @@ -import * as inquirer from "inquirer"; import * as program from "commander"; +import * as filenamify from "filenamify"; +import * as inquirer from "inquirer"; import { CliConfig, CliConfigError } from "./types"; const BIGQUERY_VALID_CHARACTERS = /^[a-zA-Z0-9_]+$/; -const FIRESTORE_VALID_CHARACTERS = /^[^\/]+$/; +// regex of ^[^/]+(/[^/]+/[^/]+)*$ +export const FIRESTORE_VALID_CHARACTERS = new RegExp("^[^/]+(/[^/]+/[^/]+)*$"); +// export const FIRESTORE_VALID_CHARACTERS = /^[^/]+(/[^/]+/[^/]+)*$/; const PROJECT_ID_MAX_CHARS = 6144; -const FIRESTORE_COLLECTION_NAME_MAX_CHARS = 6144; +export const FIRESTORE_COLLECTION_NAME_MAX_CHARS = 6144; const BIGQUERY_RESOURCE_NAME_MAX_CHARS = 1024; const validateBatchSize = (value: string) => { @@ -49,7 +52,7 @@ const validateLocation = (value: string) => { return index !== -1; }; -const validateInput = ( +export const validateInput = ( value: string, name: string, regex: RegExp, @@ -196,6 +199,14 @@ export async function parseConfig(): Promise { return { kind: "ERROR", errors }; } + const rawChangeLogName = `${program.tableNamePrefix}_raw_changelog`; + const cursorPositionFile = getCursorPositionFile( + program.sourceCollectionPath, + program.project, + program.dataset, + rawChangeLogName + ); + return { kind: "CONFIG", projectId: program.project, @@ -208,6 +219,8 @@ export async function parseConfig(): Promise { multiThreaded: program.multiThreaded === "true", useNewSnapshotQuerySyntax: program.useNewSnapshotQuerySyntax === "true", useEmulator: program.useEmulator === "true", + rawChangeLogName, + cursorPositionFile, }; } const { @@ -223,6 +236,14 @@ export async function parseConfig(): Promise { useEmulator, } = await inquirer.prompt(questions); + const rawChangeLogName = `${table}_raw_changelog`; + const cursorPositionFile = getCursorPositionFile( + sourceCollectionPath, + project, + dataset, + rawChangeLogName + ); + return { kind: "CONFIG", projectId: project, @@ -235,6 +256,8 @@ export async function parseConfig(): Promise { multiThreaded: multiThreaded, useNewSnapshotQuerySyntax: useNewSnapshotQuerySyntax, useEmulator: useEmulator, + rawChangeLogName, + cursorPositionFile, }; } @@ -256,3 +279,17 @@ export const resolveWildcardIds = (template: string, text: string) => { return previousValue; }, {}); }; + +function getCursorPositionFile( + sourceCollectionPath: string, + projectId: string, + datasetId: string, + rawChangeLogName: string +) { + // TODO: make this part of config, set it in CliConfig + const formattedPath = filenamify(sourceCollectionPath); + return ( + __dirname + + `/from-${formattedPath}-to-${projectId}_${datasetId}_${rawChangeLogName}` + ); +} diff --git a/firestore-bigquery-export/scripts/import/src/helper.ts b/firestore-bigquery-export/scripts/import/src/helper.ts new file mode 100644 index 000000000..d5d929cdc --- /dev/null +++ b/firestore-bigquery-export/scripts/import/src/helper.ts @@ -0,0 +1,103 @@ +import { FirestoreBigQueryEventHistoryTracker } from "@firebaseextensions/firestore-bigquery-change-tracker"; +import { BigQuery } from "@google-cloud/bigquery"; + +import { CliConfig } from "./types"; + +import { + ChangeType, + FirestoreDocumentChangeEvent, +} from "@firebaseextensions/firestore-bigquery-change-tracker"; +import * as firebase from "firebase-admin"; +import * as fs from "fs"; +import * as util from "util"; + +import { resolveWildcardIds } from "./config"; + +const write = util.promisify(fs.writeFile); + +const FIRESTORE_DEFAULT_DATABASE = "(default)"; + +// TODO: do we need this logic? +export const initializeDataSink = async ( + dataSink: FirestoreBigQueryEventHistoryTracker, + config: CliConfig +) => { + const bigquery = new BigQuery(); + const dataset = bigquery.dataset(config.datasetId); + const table = dataset.table(config.rawChangeLogName); + const [tableExists] = await table.exists(); + await dataSink.initialize(); + if (!tableExists) { + console.log("Wait a few seconds for the dataset to initialize..."); + await new Promise((resolve) => setTimeout(resolve, 5000, [])); // Wait for the dataset to initialize + } +}; + +export function getRowsFromDocs( + docs: firebase.firestore.QueryDocumentSnapshot[], + config: CliConfig +): FirestoreDocumentChangeEvent[] { + let rows: FirestoreDocumentChangeEvent[] = []; + + const templateSegments = config.sourceCollectionPath.split("/"); + + if (config.queryCollectionGroup && templateSegments.length > 1) { + for (const doc of docs) { + let pathParams = {}; + const path = doc.ref.path; + + const pathSegments = path.split("/"); + const isSameLength = pathSegments.length === templateSegments.length + 1; + + if (isSameLength) { + let isMatch = true; + for (let i = 0; i < templateSegments.length; i++) { + if ( + templateSegments[i].startsWith("{") && + templateSegments[i].endsWith("}") + ) { + const key = templateSegments[i].substring( + 1, + templateSegments[i].length - 1 + ); + const value = pathSegments[i]; + pathParams = { + ...pathParams, + [key]: value, + }; + } else if (templateSegments[i] !== pathSegments[i]) { + isMatch = false; + break; + } + } + if (isMatch) { + rows.push({ + timestamp: new Date().toISOString(), // epoch + operation: ChangeType.IMPORT, + documentName: `projects/${config.projectId}/databases/${FIRESTORE_DEFAULT_DATABASE}/documents/${path}`, + documentId: doc.id, + pathParams, + eventId: "", + data: doc.data(), + }); + } + } + } + } else { + rows = docs.map((snapshot) => { + return { + timestamp: new Date().toISOString(), // epoch + operation: ChangeType.IMPORT, + documentName: `projects/${config.projectId}/databases/${FIRESTORE_DEFAULT_DATABASE}/documents/${snapshot.ref.path}`, + documentId: snapshot.id, + pathParams: resolveWildcardIds( + config.sourceCollectionPath, + snapshot.ref.path + ), + eventId: "", + data: snapshot.data(), + }; + }); + } + return rows; +} diff --git a/firestore-bigquery-export/scripts/import/src/index.ts b/firestore-bigquery-export/scripts/import/src/index.ts index c24b96bf9..d3273a106 100644 --- a/firestore-bigquery-export/scripts/import/src/index.ts +++ b/firestore-bigquery-export/scripts/import/src/index.ts @@ -15,109 +15,48 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - +import { FirestoreBigQueryEventHistoryTracker } from "@firebaseextensions/firestore-bigquery-change-tracker"; import * as firebase from "firebase-admin"; -import * as program from "commander"; import * as fs from "fs"; import * as util from "util"; -import * as filenamify from "filenamify"; -import { runMultiThread } from "./run"; -import { resolveWildcardIds } from "./config"; -import { - ChangeType, - FirestoreBigQueryEventHistoryTracker, - FirestoreDocumentChangeEvent, -} from "@firebaseextensions/firestore-bigquery-change-tracker"; import { parseConfig } from "./config"; +import { initializeDataSink } from "./helper"; +import * as logs from "./logs"; +import { getCLIOptions } from "./program"; +import { runMultiThread } from "./run-multi-thread"; +import { runSingleThread } from "./run-single-thread"; // For reading cursor position. const exists = util.promisify(fs.exists); -const write = util.promisify(fs.writeFile); const read = util.promisify(fs.readFile); const unlink = util.promisify(fs.unlink); - -const FIRESTORE_DEFAULT_DATABASE = "(default)"; - -const packageJson = require("../package.json"); - -program - .name("fs-bq-import-collection") - .description(packageJson.description) - .version(packageJson.version) - .option( - "--non-interactive", - "Parse all input from command line flags instead of prompting the caller.", - false - ) - .option( - "-P, --project ", - "Firebase Project ID for project containing the Cloud Firestore database." - ) - .option( - "-s, --source-collection-path ", - "The path of the the Cloud Firestore Collection to import from. (This may, or may not, be the same Collection for which you plan to mirror changes.)" - ) - .option( - "-q, --query-collection-group [true|false]", - "Use 'true' for a collection group query, otherwise a collection query is performed." - ) - .option( - "-d, --dataset ", - "The ID of the BigQuery dataset to import to. (A dataset will be created if it doesn't already exist.)" - ) - .option( - "-t, --table-name-prefix ", - "The identifying prefix of the BigQuery table to import to. (A table will be created if one doesn't already exist.)" - ) - .option( - "-b, --batch-size [batch-size]", - "Number of documents to stream into BigQuery at once.", - (value) => parseInt(value, 10), - 300 - ) - .option( - "-l, --dataset-location ", - "Location of the BigQuery dataset." - ) - .option( - "-m, --multi-threaded [true|false]", - "Whether to run standard or multi-thread import version" - ) - .option( - "-u, --use-new-snapshot-query-syntax [true|false]", - "Whether to use updated latest snapshot query" - ) - .option( - "-e, --use-emulator [true|false]", - "Whether to use the firestore emulator" - ); - +getCLIOptions(); const run = async (): Promise => { const config = await parseConfig(); if (config.kind === "ERROR") { config.errors?.forEach((e) => console.error(`[ERROR] ${e}`)); process.exit(1); } - const { projectId, - sourceCollectionPath, datasetId, tableId, - batchSize, queryCollectionGroup, datasetLocation, multiThreaded, useNewSnapshotQuerySyntax, useEmulator, + cursorPositionFile, } = config; - if (useEmulator) { console.log("Using emulator"); process.env.FIRESTORE_EMULATOR_HOST = "127.0.0.1:8080"; } - + // Set project ID, so it can be used in BigQuery initialization + process.env.PROJECT_ID = projectId; + process.env.GOOGLE_CLOUD_PROJECT = projectId; + process.env.GCLOUD_PROJECT = projectId; // Initialize Firebase // This uses applicationDefault to authenticate // Please see https://cloud.google.com/docs/authentication/production @@ -127,13 +66,7 @@ const run = async (): Promise => { databaseURL: `https://${projectId}.firebaseio.com`, }); } - // Set project ID, so it can be used in BigQuery initialization - process.env.PROJECT_ID = projectId; - process.env.GOOGLE_CLOUD_PROJECT = projectId; - - const rawChangeLogName = `${tableId}_raw_changelog`; - if (multiThreaded) return runMultiThread(config, rawChangeLogName); // We pass in the application-level "tableId" here. The tracker determines // the name of the raw changelog from this field. const dataSink = new FirestoreBigQueryEventHistoryTracker({ @@ -144,94 +77,42 @@ const run = async (): Promise => { useNewSnapshotQuerySyntax, }); - console.log( - `Importing data from Cloud Firestore Collection${ - queryCollectionGroup ? " (via a Collection Group query)" : "" - }: ${sourceCollectionPath}, to BigQuery Dataset: ${datasetId}, Table: ${rawChangeLogName}` - ); + await initializeDataSink(dataSink, config); + + logs.importingData(config); + if (multiThreaded && queryCollectionGroup) { + if (queryCollectionGroup) { + return runMultiThread(config); + } + logs.warningMultiThreadedCollectionGroupOnly(); + } // Build the data row with a 0 timestamp. This ensures that all other // operations supersede imports when listing the live documents. - let cursor; - - const formattedPath = filenamify(sourceCollectionPath); + let cursor: + | firebase.firestore.DocumentSnapshot + | undefined = undefined; - let cursorPositionFile = - __dirname + - `/from-${formattedPath}-to-${projectId}_${datasetId}_${rawChangeLogName}`; if (await exists(cursorPositionFile)) { let cursorDocumentId = (await read(cursorPositionFile)).toString(); cursor = await firebase.firestore().doc(cursorDocumentId).get(); - console.log( - `Resuming import of Cloud Firestore Collection ${sourceCollectionPath} ${ - queryCollectionGroup ? " (via a Collection Group query)" : "" - } from document ${cursorDocumentId}.` - ); + logs.resumingImport(config, cursorDocumentId); } - - let totalRowsImported = 0; - - do { - if (cursor) { - await write(cursorPositionFile, cursor.ref.path); - } - - let query: firebase.firestore.Query; - - if (queryCollectionGroup) { - query = firebase.firestore().collectionGroup(sourceCollectionPath); - } else { - query = firebase.firestore().collection(sourceCollectionPath); - } - - query = query.limit(batchSize); - - if (cursor) { - query = query.startAfter(cursor); - } - const snapshot = await query.get(); - const docs = snapshot.docs; - if (docs.length === 0) { - break; - } - cursor = docs[docs.length - 1]; - const rows: FirestoreDocumentChangeEvent[] = docs.map((snapshot) => { - return { - timestamp: new Date().toISOString(), // epoch - operation: ChangeType.IMPORT, - documentName: `projects/${projectId}/databases/${FIRESTORE_DEFAULT_DATABASE}/documents/${snapshot.ref.path}`, - documentId: snapshot.id, - pathParams: resolveWildcardIds(sourceCollectionPath, snapshot.ref.path), - eventId: "", - data: snapshot.data(), - }; - }); - await dataSink.record(rows); - totalRowsImported += rows.length; - } while (true); - + const totalRowsImported = runSingleThread(dataSink, config, cursor); try { await unlink(cursorPositionFile); } catch (e) { - console.log(e); - console.log( - `Error unlinking journal file ${cursorPositionFile} after successful import: ${e.toString()}` - ); + logs.warningUnlinkingJournalFile(cursorPositionFile, e); } - return totalRowsImported; }; run() .then((rowCount) => { - console.log("---------------------------------------------------------"); - console.log(`Finished importing ${rowCount} Firestore rows to BigQuery`); - console.log("---------------------------------------------------------"); + logs.finishedImporting(rowCount); process.exit(); }) .catch((error) => { - console.error( - `Error importing Collection to BigQuery: ${error.toString()}` - ); + logs.errorImporting(error); process.exit(1); }); diff --git a/firestore-bigquery-export/scripts/import/src/logs.ts b/firestore-bigquery-export/scripts/import/src/logs.ts new file mode 100644 index 000000000..1072ebb6e --- /dev/null +++ b/firestore-bigquery-export/scripts/import/src/logs.ts @@ -0,0 +1,65 @@ +import { CliConfig } from "./types"; + +export const finishedImporting = (rowCount: number) => { + console.log("---------------------------------------------------------"); + console.log(`Finished importing ${rowCount} Firestore rows to BigQuery`); + console.log("---------------------------------------------------------"); +}; + +export const errorImporting = (error: unknown) => { + console.error(`Error importing Collection to BigQuery: ${error.toString()}`); +}; + +export const warningUnlinkingJournalFile = ( + cursorPositionFile: string, + e: unknown +) => { + console.warn(e); + console.warn( + `Error unlinking journal file ${cursorPositionFile} after successful import: ${e.toString()}` + ); +}; + +export const importingData = (config: CliConfig) => { + console.log( + `Importing data from Cloud Firestore Collection${ + config.queryCollectionGroup ? " (via a Collection Group query)" : "" + }: ${config.sourceCollectionPath}, to BigQuery Dataset: ${ + config.datasetId + }, Table: ${config.rawChangeLogName}` + ); +}; + +export const waitingToInitialize = () => { + console.log("Wait a few seconds for the dataset to initialize..."); +}; + +export const finishedImportingParallel = ( + config: CliConfig, + total: number, + partitions: number +) => { + console.log(`Imported ${total} documents in ${partitions} partitions.`); + + console.log("---------------------------------------------------------"); + console.log( + `Please see https://console.cloud.google.com/bigquery?p=${config.projectId}&d=${config.datasetId}&t=${config.tableId}_raw_changelog&page=table` + ); + console.log("---------------------------------------------------------"); +}; + +export const resumingImport = (config: CliConfig, cursorDocumentId: string) => { + console.log( + `Resuming import of Cloud Firestore Collection ${ + config.sourceCollectionPath + } ${ + config.queryCollectionGroup ? " (via a Collection Group query)" : "" + } from document ${cursorDocumentId}.` + ); +}; + +export const warningMultiThreadedCollectionGroupOnly = () => { + console.warn( + "Multi-threaded imports are only supported for Collection Group queries. Proceeding with a single thread." + ); +}; diff --git a/firestore-bigquery-export/scripts/import/src/program.ts b/firestore-bigquery-export/scripts/import/src/program.ts new file mode 100644 index 000000000..8269bb8c0 --- /dev/null +++ b/firestore-bigquery-export/scripts/import/src/program.ts @@ -0,0 +1,57 @@ +import * as program from "commander"; + +const packageJson = require("../package.json"); + +export const getCLIOptions = () => { + program + .name("fs-bq-import-collection") + .description(packageJson.description) + .version(packageJson.version) + .option( + "--non-interactive", + "Parse all input from command line flags instead of prompting the caller.", + false + ) + .option( + "-P, --project ", + "Firebase Project ID for project containing the Cloud Firestore database." + ) + .option( + "-q, --query-collection-group [true|false]", + "Use 'true' for a collection group query, otherwise a collection query is performed." + ) + .option( + "-s, --source-collection-path ", + "The path of the the Cloud Firestore Collection to import from. (This may or may not be the same Collection for which you plan to mirror changes.)" + ) + .option( + "-d, --dataset ", + "The ID of the BigQuery dataset to import to. (A dataset will be created if it doesn't already exist.)" + ) + .option( + "-t, --table-name-prefix ", + "The identifying prefix of the BigQuery table to import to. (A table will be created if one doesn't already exist.)" + ) + .option( + "-b, --batch-size [batch-size]", + "Number of documents to stream into BigQuery at once.", + (value) => parseInt(value, 10), + 300 + ) + .option( + "-l, --dataset-location ", + "Location of the BigQuery dataset." + ) + .option( + "-m, --multi-threaded [true|false]", + "Whether to run standard or multi-thread import version" + ) + .option( + "-u, --use-new-snapshot-query-syntax [true|false]", + "Whether to use updated latest snapshot query" + ) + .option( + "-e, --use-emulator [true|false]", + "Whether to use the firestore emulator" + ); +}; diff --git a/firestore-bigquery-export/scripts/import/src/run-multi-thread.ts b/firestore-bigquery-export/scripts/import/src/run-multi-thread.ts new file mode 100644 index 000000000..6ab3d1530 --- /dev/null +++ b/firestore-bigquery-export/scripts/import/src/run-multi-thread.ts @@ -0,0 +1,92 @@ +import * as firebase from "firebase-admin"; +import { cpus } from "os"; +import { pool } from "workerpool"; + +import * as logs from "./logs"; +import { CliConfig } from "./types"; + +/** + * Import data from a collection group in parallel using workers. + */ +export async function runMultiThread(config: CliConfig): Promise { + const maxWorkers = Math.ceil(cpus().length / 2); + const workerPool = pool(__dirname + "/worker.js", { + maxWorkers, + forkOpts: { + env: { + PROJECT_ID: config.projectId, + GOOGLE_CLOUD_PROJECT: config.projectId, + GCLOUD_PROJECT: config.projectId, + ...process.env, + }, + }, + }); + + const query = firebase + .firestore() + .collectionGroup( + config.sourceCollectionPath.split("/")[ + config.sourceCollectionPath.split("/").length - 1 + ] + ); + + const partitionsList = query.getPartitions(config.batchSize); + + let total = 0; + let partitions = 0; + + while (true) { + const inProgressTasks = + workerPool.stats().activeTasks + workerPool.stats().pendingTasks; + if (inProgressTasks >= maxWorkers) { + // A timeout is needed here to stop infinite rechecking of workpool.stats(). + await new Promise((resolve) => setTimeout(resolve, 150, [])); + continue; + } + + // @ts-ignore, iterator not typed correctly. + const { value: partition, done } = await partitionsList.next(); + if (done || !partition) { + break; + } + + partitions++; + + const query = partition.toQuery(); + + const serializedQuery = { + startAt: query._queryOptions.startAt, + endAt: query._queryOptions.endAt, + limit: query._queryOptions.limit, + offset: query._queryOptions.offset, + }; + + workerPool + .exec("processDocuments", [serializedQuery, config]) + .then((count) => { + total += count; + console.log(`${total} documents processed`); + }) + .catch((error) => { + console.error( + "An error has occurred on the following documents, please re-run or insert the following query documents manually...", + JSON.stringify(serializedQuery) + ); + console.error(error); + process.exit(1); + }); + } + + // Wait for all tasks to be complete. + while (workerPool.stats().activeTasks + workerPool.stats().pendingTasks > 0) { + // Return a default promise + // A timeout is needed here to stop infinite rechecking of workpool.stats(). + await new Promise((resolve) => setTimeout(resolve, 150, [])); + } + + await workerPool.terminate(); + + logs.finishedImportingParallel(config, total, partitions); + + return Promise.resolve(total); +} diff --git a/firestore-bigquery-export/scripts/import/src/run-single-thread.ts b/firestore-bigquery-export/scripts/import/src/run-single-thread.ts new file mode 100644 index 000000000..38157d07f --- /dev/null +++ b/firestore-bigquery-export/scripts/import/src/run-single-thread.ts @@ -0,0 +1,78 @@ +import { + ChangeType, + FirestoreDocumentChangeEvent, +} from "@firebaseextensions/firestore-bigquery-change-tracker"; +import { FirestoreBigQueryEventHistoryTracker } from "@firebaseextensions/firestore-bigquery-change-tracker"; +import * as firebase from "firebase-admin"; +import * as fs from "fs"; +import * as util from "util"; + +import { getRowsFromDocs } from "./helper"; +import { CliConfig } from "./types"; + +const write = util.promisify(fs.writeFile); +export function getQuery( + config: CliConfig, + cursor?: firebase.firestore.DocumentSnapshot +): firebase.firestore.Query { + const { sourceCollectionPath, batchSize, queryCollectionGroup } = config; + + let collectionOrCollectionGroup: + | firebase.firestore.CollectionGroup + | firebase.firestore.CollectionReference; + if (queryCollectionGroup) { + collectionOrCollectionGroup = firebase + .firestore() + .collectionGroup( + sourceCollectionPath.split("/")[ + sourceCollectionPath.split("/").length - 1 + ] + ); + } else { + console.log("\x1b[36m%s\x1b[0m", "HERE 1.75"); //cyan + collectionOrCollectionGroup = firebase + .firestore() + .collection(sourceCollectionPath); + } + + let query = collectionOrCollectionGroup.limit(batchSize); + if (cursor) { + console.log("\x1b[36m%s\x1b[0m", "we have cursor"); //cyan + query = query.startAfter(cursor); + } + console.log("\x1b[36m%s\x1b[0m", `QUERY: ${JSON.stringify(query)}`); //cyan + return query; +} + +export async function runSingleThread( + dataSink: FirestoreBigQueryEventHistoryTracker, + config: CliConfig, + cursor: + | firebase.firestore.DocumentSnapshot + | undefined +) { + let totalRowsImported = 0; + + while (true) { + if (cursor) { + await write(config.cursorPositionFile, cursor.ref.path); + } + + let query = getQuery(config, cursor); + + const snapshot = await query.get(); + + const docs = snapshot.docs; + + if (docs.length === 0) { + break; + } + cursor = docs[docs.length - 1]; + + const rows: FirestoreDocumentChangeEvent[] = getRowsFromDocs(docs, config); + + await dataSink.record(rows); + totalRowsImported += rows.length; + } + return totalRowsImported; +} diff --git a/firestore-bigquery-export/scripts/import/src/run.ts b/firestore-bigquery-export/scripts/import/src/run.ts deleted file mode 100644 index cce271549..000000000 --- a/firestore-bigquery-export/scripts/import/src/run.ts +++ /dev/null @@ -1,198 +0,0 @@ -import { cpus } from "os"; -import * as firebase from "firebase-admin"; -import { pool } from "workerpool"; - -import { CliConfig } from "./types"; - -const { BigQuery } = require("@google-cloud/bigquery"); -const bigquery = new BigQuery(); - -import { - ChangeType, - FirestoreBigQueryEventHistoryTracker, - FirestoreDocumentChangeEvent, -} from "@firebaseextensions/firestore-bigquery-change-tracker"; - -/** - * Import data from a collection group in parallel using workers. - */ -async function processCollectionGroup(config: CliConfig): Promise { - const maxWorkers = Math.ceil(cpus().length / 2); - const workerPool = pool(__dirname + "/worker.js", { - maxWorkers, - forkOpts: { - env: { - PROJECT_ID: config.projectId, - GOOGLE_CLOUD_PROJECT: config.projectId, - GCLOUD_PROJECT: config.projectId, - ...process.env, - }, - }, - }); - - const query = firebase - .firestore() - .collectionGroup(config.sourceCollectionPath); - - const partitionsList = query.getPartitions(config.batchSize); - - let total = 0; - let partitions = 0; - - while (true) { - const inProgressTasks = - workerPool.stats().activeTasks + workerPool.stats().pendingTasks; - if (inProgressTasks >= maxWorkers) { - // A timeout is needed here to stop infinite rechecking of workpool.stats(). - await new Promise((resolve) => setTimeout(resolve, 150, [])); - continue; - } - - // @ts-ignore, iterator not typed correctly. - const { value: partition, done } = await partitionsList.next(); - if (done || !partition) { - break; - } - - partitions++; - - const query = partition.toQuery(); - - const serializedQuery = { - startAt: query._queryOptions.startAt, - endAt: query._queryOptions.endAt, - limit: query._queryOptions.limit, - offset: query._queryOptions.offset, - }; - - workerPool - .exec("processDocuments", [serializedQuery, config]) - .then((count) => { - total += count; - console.log(`${total} documents processed`); - }) - .catch((error) => { - console.error( - "An error has occurred on the following documents, please re-run or insert the following query documents manually...", - JSON.stringify(serializedQuery) - ); - console.error(error); - process.exit(1); - }); - } - - // Wait for all tasks to be complete. - while (workerPool.stats().activeTasks + workerPool.stats().pendingTasks > 0) { - // Return a default promise - // A timeout is needed here to stop infinite rechecking of workpool.stats(). - await new Promise((resolve) => setTimeout(resolve, 150, [])); - } - - await workerPool.terminate(); - - console.log(`Imported ${total} documents in ${partitions} partitions.`); - - console.log("---------------------------------------------------------"); - console.log( - `Please see https://console.cloud.google.com/bigquery?p=${config.projectId}&d=${config.datasetId}&t=${config.tableId}_raw_changelog&page=table` - ); - console.log("---------------------------------------------------------"); - - return Promise.resolve(total); -} - -/** - * Batch import data from a collection. - */ -async function processCollection(config: CliConfig): Promise { - let total = 0; - let batches = 0; - let lastDocument = null; - let lastBatchSize: number = config.batchSize; - - while (lastBatchSize === config.batchSize) { - batches++; - let query = firebase - .firestore() - .collection(config.sourceCollectionPath) - .limit(config.batchSize); - - if (lastDocument != null) { - query = query.startAfter(lastDocument); - } - - const snapshot = await query.get(); - const { docs } = snapshot; - - const dataSink = new FirestoreBigQueryEventHistoryTracker({ - tableId: config.tableId, - datasetId: config.datasetId, - datasetLocation: config.datasetLocation, - }); - - const rows: FirestoreDocumentChangeEvent = docs.map((document) => { - return { - timestamp: new Date().toISOString(), - operation: ChangeType.IMPORT, - documentName: `projects/${config.projectId}/databases/(default)/documents/${document.ref.path}`, - documentId: document.id, - eventId: "", - data: document.data(), - }; - }); - - await dataSink.record(rows); - - if (docs.length) { - lastDocument = docs[docs.length - 1]; - } - lastBatchSize = docs.length; - total += docs.length; - } - - console.log(`Imported ${total} documents in ${batches} batches.`); - - return total; -} - -export async function runMultiThread( - config: CliConfig, - rawChangeLogName: string -): Promise { - const { - sourceCollectionPath, - datasetId, - tableId, - queryCollectionGroup, - datasetLocation, - } = config; - - const dataset = bigquery.dataset(datasetId, { - location: datasetLocation, - }); - - const table = dataset.table(rawChangeLogName); - const [exists] = await table.exists(); - - const dataSink = new FirestoreBigQueryEventHistoryTracker({ - tableId, - datasetId, - datasetLocation, - }); - - await dataSink.initialize(); - if (!exists) { - console.log("Wait a few seconds for the dataset to initialize..."); - await new Promise((resolve) => setTimeout(resolve, 5000, [])); // Wait for the dataset to initialize - } - - console.log( - `Importing data from Cloud Firestore Collection${ - queryCollectionGroup ? " (via a Collection Group query)" : "" - }: ${sourceCollectionPath}, to BigQuery Dataset: ${datasetId}, Table: ${rawChangeLogName}` - ); - - return queryCollectionGroup - ? processCollectionGroup(config) - : processCollection(config); -} diff --git a/firestore-bigquery-export/scripts/import/src/types.ts b/firestore-bigquery-export/scripts/import/src/types.ts index d394cabbb..15c205729 100644 --- a/firestore-bigquery-export/scripts/import/src/types.ts +++ b/firestore-bigquery-export/scripts/import/src/types.ts @@ -12,6 +12,8 @@ export interface CliConfig { multiThreaded: boolean; useNewSnapshotQuerySyntax: boolean; useEmulator: boolean; + rawChangeLogName: string; + cursorPositionFile: string; } export interface CliConfigError { diff --git a/firestore-bigquery-export/scripts/import/src/worker.ts b/firestore-bigquery-export/scripts/import/src/worker.ts index 01b0662c8..7ee5692fe 100644 --- a/firestore-bigquery-export/scripts/import/src/worker.ts +++ b/firestore-bigquery-export/scripts/import/src/worker.ts @@ -1,12 +1,13 @@ -import * as firebase from "firebase-admin"; -import { CliConfig, SerializableQuery, QueryOptions } from "./types"; -import { worker } from "workerpool"; - import { ChangeType, FirestoreBigQueryEventHistoryTracker, FirestoreDocumentChangeEvent, } from "@firebaseextensions/firestore-bigquery-change-tracker"; +import * as firebase from "firebase-admin"; +import { worker } from "workerpool"; +import { getRowsFromDocs } from "./helper"; + +import { CliConfig, QueryOptions, SerializableQuery } from "./types"; async function processDocuments( serializableQuery: SerializableQuery, @@ -30,7 +31,11 @@ async function processDocuments( const query = firebase .firestore() - .collectionGroup(sourceCollectionPath) + .collectionGroup( + sourceCollectionPath.split("/")[ + sourceCollectionPath.split("/").length - 1 + ] + ) .orderBy(firebase.firestore.FieldPath.documentId(), "asc") as QueryOptions; query._queryOptions.startAt = serializableQuery.startAt; @@ -52,16 +57,7 @@ async function processDocuments( datasetLocation, }); - const rows: FirestoreDocumentChangeEvent = docs.map((document) => { - return { - timestamp: new Date().toISOString(), - operation: ChangeType.IMPORT, - documentName: `projects/${projectId}/databases/(default)/documents/${document.ref.path}`, - documentId: document.id, - eventId: "", - data: document.data(), - }; - }); + const rows: FirestoreDocumentChangeEvent = getRowsFromDocs(docs, config); await dataSink.record(rows); return rows.length;