Skip to content

Commit

Permalink
fix(firestore-bigquery-export): add path_params to multi-thread
Browse files Browse the repository at this point in the history
  • Loading branch information
cabljac committed Oct 20, 2023
1 parent 97c056b commit 0241872
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 80 deletions.
83 changes: 83 additions & 0 deletions firestore-bigquery-export/scripts/import/src/helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,20 @@ import { BigQuery } from "@google-cloud/bigquery";

import { CliConfig } from "./types";

import {
ChangeType,
FirestoreDocumentChangeEvent,
} from "@firebaseextensions/firestore-bigquery-change-tracker";
import * as firebase from "firebase-admin";
import * as fs from "fs";
import * as util from "util";

import { resolveWildcardIds } from "./config";

// Promise-returning wrapper around the callback-style fs.writeFile.
const write = util.promisify(fs.writeFile);

// Database id segment interpolated into the `documentName` of every row
// produced by getRowsFromDocs.
const FIRESTORE_DEFAULT_DATABASE = "(default)";

// TODO: do we need this logic?
export const initializeDataSink = async (
dataSink: FirestoreBigQueryEventHistoryTracker,
Expand All @@ -18,3 +32,72 @@ export const initializeDataSink = async (
await new Promise((resolve) => setTimeout(resolve, 5000, [])); // Wait for the dataset to initialize
}
};

/**
 * Converts Firestore document snapshots into IMPORT change events.
 *
 * When `config.queryCollectionGroup` is set and `config.sourceCollectionPath`
 * has more than one segment, each document path is compared against that
 * template segment by segment: `{name}` segments are captured into
 * `pathParams`, literal segments must be equal, and documents that do not
 * match are left out of the result. In every other case each snapshot is
 * converted and its `pathParams` comes from `resolveWildcardIds`.
 *
 * @param docs Snapshots to convert.
 * @param config Supplies `projectId`, `sourceCollectionPath` and
 *   `queryCollectionGroup`.
 * @returns The change events, in the same order as the snapshots they came from.
 */
export function getRowsFromDocs(
  docs: firebase.firestore.QueryDocumentSnapshot<firebase.firestore.DocumentData>[],
  config: CliConfig
): FirestoreDocumentChangeEvent[] {
  const templateParts = config.sourceCollectionPath.split("/");

  // Builds the full resource name for a document path.
  const toDocumentName = (docPath: string): string =>
    `projects/${config.projectId}/databases/${FIRESTORE_DEFAULT_DATABASE}/documents/${docPath}`;

  // Plain collection (or single-segment collection group): convert everything.
  if (!(config.queryCollectionGroup && templateParts.length > 1)) {
    return docs.map((snapshot) => ({
      timestamp: new Date().toISOString(),
      operation: ChangeType.IMPORT,
      documentName: toDocumentName(snapshot.ref.path),
      documentId: snapshot.id,
      pathParams: resolveWildcardIds(
        config.sourceCollectionPath,
        snapshot.ref.path
      ),
      eventId: "",
      data: snapshot.data(),
    }));
  }

  const matchedRows: FirestoreDocumentChangeEvent[] = [];

  docs.forEach((doc) => {
    const docPath = doc.ref.path;
    const docParts = docPath.split("/");

    // A document path carries exactly one more segment (the document id)
    // than the collection template it belongs to.
    if (docParts.length !== templateParts.length + 1) {
      return;
    }

    let captured = {};
    // `every` stops at the first literal segment that differs.
    const matchesTemplate = templateParts.every((templatePart, position) => {
      const isWildcard =
        templatePart.startsWith("{") && templatePart.endsWith("}");
      if (!isWildcard) {
        return templatePart === docParts[position];
      }
      // Strip the surrounding braces to get the parameter name.
      captured = {
        ...captured,
        [templatePart.slice(1, -1)]: docParts[position],
      };
      return true;
    });

    if (matchesTemplate) {
      matchedRows.push({
        timestamp: new Date().toISOString(),
        operation: ChangeType.IMPORT,
        documentName: toDocumentName(docPath),
        documentId: doc.id,
        pathParams: captured,
        eventId: "",
        data: doc.data(),
      });
    }
  });

  return matchedRows;
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,81 +8,13 @@ import * as fs from "fs";
import * as util from "util";

import { resolveWildcardIds } from "./config";
import { getRowsFromDocs } from "./helper";
import { CliConfig } from "./types";

// Promise-returning wrapper around the callback-style fs.writeFile.
const write = util.promisify(fs.writeFile);

// Database id segment interpolated into the `documentName` of every row
// produced by getRowsFromDocs.
const FIRESTORE_DEFAULT_DATABASE = "(default)";

/**
 * Builds one IMPORT change event per Firestore snapshot.
 *
 * Two modes, chosen from `config`:
 *  - collection-group query whose `sourceCollectionPath` has several
 *    segments: a snapshot is kept only if its path has one segment more than
 *    the template and every literal template segment equals the path segment
 *    at the same position; `{name}` template segments are recorded in
 *    `pathParams` under `name`.
 *  - anything else: every snapshot is kept and `pathParams` is taken from
 *    `resolveWildcardIds`.
 *
 * @param docs Snapshots to turn into rows.
 * @param config Provides `projectId`, `sourceCollectionPath` and
 *   `queryCollectionGroup`.
 * @returns The resulting rows, ordered like `docs`.
 */
export function getRowsFromDocs(
  docs: firebase.firestore.QueryDocumentSnapshot<firebase.firestore.DocumentData>[],
  config: CliConfig
): FirestoreDocumentChangeEvent[] {
  const collectionTemplate = config.sourceCollectionPath.split("/");
  const matchAgainstTemplate =
    config.queryCollectionGroup && collectionTemplate.length > 1;

  if (!matchAgainstTemplate) {
    return docs.map((snap) => {
      return {
        timestamp: new Date().toISOString(),
        operation: ChangeType.IMPORT,
        documentName: `projects/${config.projectId}/databases/${FIRESTORE_DEFAULT_DATABASE}/documents/${snap.ref.path}`,
        documentId: snap.id,
        pathParams: resolveWildcardIds(
          config.sourceCollectionPath,
          snap.ref.path
        ),
        eventId: "",
        data: snap.data(),
      };
    });
  }

  const events: FirestoreDocumentChangeEvent[] = [];

  for (const doc of docs) {
    const fullPath = doc.ref.path;
    const actual = fullPath.split("/");

    // The extra segment on the document side is the document id.
    if (actual.length !== collectionTemplate.length + 1) {
      continue;
    }

    let wildcardValues = {};
    let mismatch = false;
    let index = 0;
    while (index < collectionTemplate.length && !mismatch) {
      const expected = collectionTemplate[index];
      if (expected.startsWith("{") && expected.endsWith("}")) {
        // Wildcard segment: remember the concrete value under the name
        // found between the braces.
        const name = expected.substring(1, expected.length - 1);
        wildcardValues = { ...wildcardValues, [name]: actual[index] };
      } else if (expected !== actual[index]) {
        mismatch = true;
      }
      index++;
    }

    if (mismatch) {
      continue;
    }

    events.push({
      timestamp: new Date().toISOString(),
      operation: ChangeType.IMPORT,
      documentName: `projects/${config.projectId}/databases/${FIRESTORE_DEFAULT_DATABASE}/documents/${fullPath}`,
      documentId: doc.id,
      pathParams: wildcardValues,
      eventId: "",
      data: doc.data(),
    });
  }

  return events;
}

export function getQuery(
config: CliConfig,
cursor?: firebase.firestore.DocumentSnapshot<firebase.firestore.DocumentData>
Expand Down
18 changes: 7 additions & 11 deletions firestore-bigquery-export/scripts/import/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
} from "@firebaseextensions/firestore-bigquery-change-tracker";
import * as firebase from "firebase-admin";
import { worker } from "workerpool";
import { getRowsFromDocs } from "./helper";

import { CliConfig, QueryOptions, SerializableQuery } from "./types";

Expand All @@ -30,7 +31,11 @@ async function processDocuments(

const query = firebase
.firestore()
.collectionGroup(sourceCollectionPath)
.collectionGroup(
sourceCollectionPath.split("/")[
sourceCollectionPath.split("/").length - 1
]
)
.orderBy(firebase.firestore.FieldPath.documentId(), "asc") as QueryOptions;

query._queryOptions.startAt = serializableQuery.startAt;
Expand All @@ -52,16 +57,7 @@ async function processDocuments(
datasetLocation,
});

const rows: FirestoreDocumentChangeEvent = docs.map((document) => {
return {
timestamp: new Date().toISOString(),
operation: ChangeType.IMPORT,
documentName: `projects/${projectId}/databases/(default)/documents/${document.ref.path}`,
documentId: document.id,
eventId: "",
data: document.data(),
};
});
const rows: FirestoreDocumentChangeEvent = getRowsFromDocs(docs, config);

await dataSink.record(rows);
return rows.length;
Expand Down

0 comments on commit 0241872

Please sign in to comment.