Skip to content

Commit

Permalink
fix: log data and old data on fail to enqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
cabljac committed Sep 18, 2024
1 parent 09ec773 commit fdaaef3
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 37 deletions.
97 changes: 62 additions & 35 deletions firestore-bigquery-export/functions/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,52 +97,79 @@ export const fsexportbigquery = functions
.document(config.collectionPath)
.onWrite(async (change, context) => {
logs.start();
try {
const changeType = getChangeType(change);
const documentId = getDocumentId(change);

const isCreated = changeType === ChangeType.CREATE;
const isDeleted = changeType === ChangeType.DELETE;
/**
* enqueue cannot currently handle Firestore DocumentData directly,
* so serialize early, before queueing in the Cloud Task.
* Cloud Tasks currently have a 1 MB payload limit; serializing here also ensures payloads are kept to a minimum.
*/
const changeType = getChangeType(change);
const documentId = getDocumentId(change);

const data = isDeleted ? undefined : change.after.data();
const oldData =
isCreated || config.excludeOldData ? undefined : change.before.data();
const isCreated = changeType === ChangeType.CREATE;
const isDeleted = changeType === ChangeType.DELETE;

await events.recordStartEvent({
documentId,
changeType,
before: {
data: change.before.data(),
},
after: {
data: change.after.data(),
},
context: context.resource,
});
const data = isDeleted ? undefined : change.after.data();
const oldData =
isCreated || config.excludeOldData ? undefined : change.before.data();

let serializedData: any;
let serializedOldData: any;

try {
serializedData = eventTracker.serializeData(data);
} catch (err) {
functions.logger.error("failed to serialize data", err);
throw err;
}
try {
serializedOldData = eventTracker.serializeData(oldData);
} catch (err) {
functions.logger.error("failed to serialize old data", err);
throw err;
}

try {
await events
.recordStartEvent({
documentId,
changeType,
before: {
data: change.before.data(),
},
after: {
data: change.after.data(),
},
context: context.resource,
})
.catch((err) => {
functions.logger.error("failed to record start event", err);

throw err;
});

const queue = getFunctions().taskQueue(
`locations/${config.location}/functions/syncBigQuery`,
config.instanceId
);

/**
* enqueue cannot currently handle Firestore DocumentData directly,
* so serialize early, before queueing in the Cloud Task.
* Cloud Tasks currently have a 1 MB payload limit; serializing here also ensures payloads are kept to a minimum.
*/
const seializedData = eventTracker.serializeData(data);
const serializedOldData = eventTracker.serializeData(oldData);

await queue.enqueue({
context,
changeType,
documentId,
data: seializedData,
oldData: serializedOldData,
});
await queue
.enqueue({
context,
changeType,
documentId,
data: serializedData,
oldData: serializedOldData,
})
.catch((err) => {
functions.logger.error("failed to enqueue task", err);

throw err;
});
} catch (err) {
await events.recordErrorEvent(err as Error);
logs.error(err);

logs.error(err, serializedData, serializedOldData);
const eventAgeMs = Date.now() - Date.parse(context.timestamp);
const eventMaxAgeMs = 10000;

Expand Down
8 changes: 6 additions & 2 deletions firestore-bigquery-export/functions/src/logs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,12 @@ export const dataTypeInvalid = (
);
};

export const error = (err: Error) => {
logger.error("Error when mirroring data to BigQuery", err);
/**
 * Logs a BigQuery mirroring failure together with the document payloads
 * involved, so failed syncs can be diagnosed from the logs alone.
 *
 * @param err - The error raised while mirroring data to BigQuery.
 * @param data - Serialized current document data, if available
 *   (the caller passes `undefined` on deletes).
 * @param oldData - Serialized previous document data, if available
 *   (the caller passes `undefined` on creates or when old data is excluded).
 */
export const error = (err: Error, data?: unknown, oldData?: unknown) => {
  // Structured log entry: keeps the error alongside the payloads that
  // failed to sync, rather than logging the error in isolation.
  logger.error("Error when mirroring data to BigQuery", {
    error: err,
    data,
    oldData,
  });
};

export const init = () => {
Expand Down

0 comments on commit fdaaef3

Please sign in to comment.