Skip to content

Commit

Permalink
fix: add incident try catch around base watcher and also added incide…
Browse files Browse the repository at this point in the history
…nt on failure
  • Loading branch information
horpeazy committed Aug 22, 2024
1 parent 127a604 commit ea9811a
Showing 1 changed file with 154 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import type NcMetaIO from '../../meta/NcMetaIO';
import type { HookType } from 'nocodb-sdk';
import type { Base } from '../../models';
import type Connection from 'mysql2/typings/mysql/lib/Connection';
import { createIncidentLog } from '../../incidentLogger';

export type PSQLRecordOperationEvent = {
base: Base;
Expand Down Expand Up @@ -398,150 +399,164 @@ export class PSQLRecordOperationWatcher extends EventEmitter {
}

private async _watchBaseInternal(base: Base, rewatch: boolean) {
if (!base.id) {
// TODO: Log this error and report as incident
return;
}

if (base.is_meta) return;

const foundModelData: Record<string, any>[] = await this.ncMeta.metaList2(
base.project_id,
base.id,
MetaTable.MODELS,
{
condition: {
type: 'table',
try {
if (!base.id) {
// TODO: Log this error and report as incident
return;
}

if (base.is_meta) return;

const foundModelData: Record<string, any>[] = await this.ncMeta.metaList2(
base.project_id,
base.id,
MetaTable.MODELS,
{
condition: {
type: 'table',
},
}
);

const models = foundModelData.map(
(foundModelDatum) => new Model(foundModelDatum)
);

const obsoleteModels: Model[] = [];

let skippedModels: Model[] = [];
let newModels: Model[] = [];

let baseData: IBaseData = this.allBaseData.get(base.id);

this.log(`watching base ${base.alias}`);
this.log(`watching base ${base.id}`);

const connectionOptions = (await base.getConnectionConfig()).connection;

const createNewBaseData =
!baseData ||
// if connection options have changed which includes database name, port, host etc.
!isEqual(connectionOptions, baseData.connectionOptions);

if (baseData && createNewBaseData) {
// //////// will never get called as basedata will always be empty at this point
///// and will never dispose all resources
// ////// Needs to be fixed in case connection options change
await this.unwatchBase(baseData.base);
}

if (createNewBaseData) {
const knex = await this.createKnex(base);
baseData = {
base,
models,
knex,
connectionOptions,
};

newModels.push(...models);
} else {
//////// this never even gets used as baseData will be empty
const modelIds = models.map((model) => model.id); /////// //////// // why is tablename being mapped to modelIDs
obsoleteModels.push(
...baseData.models.filter((model) => !modelIds.includes(model.id))
);
const prevModelIds = baseData.models.map((model) => model.id); // this will always be empty, add logs to see intdev tests
newModels.push(
...models.filter((model) => !prevModelIds.includes(model.id))
);

baseData.base = base;
baseData.models = models;
}

/**
* WARNING: do not watch on table ( especially notification table ) created by this class to avoid exaustive loop of death because nocodb will also have a model for the table. If the model is
* watched( an sql trigger registered for it per se ), then a direct insert, update, delete action OR an insertion of notification event from trigger of other tables will cause a
* notification event to be inserted again, which causes another insertion, hence an unending loop.
* Also do not watch audit trail tables except explicitely setup to watch them.
*/
let shouldWatchAuditTables: boolean = false;
if (process.env.SHOULD_WATCH_AUDIT_TABLES === 'true') {
shouldWatchAuditTables = true;
}

[newModels, skippedModels] = newModels.reduce(
(results, newModel) => {
results[
newModel.table_name.startsWith(
this.createSqlIdentifierPrefix(baseData)
) ||
(!shouldWatchAuditTables && newModel.table_name.endsWith('_audit'))
? 1
: 0
].push(newModel);
return results;
},
[[], []]
);

const pickedFields = ['id', 'table_name', 'title'];
this.log(
`watching base : ${base.id} , ${
(await base.getConnectionConfig()).database
}`
);
this.log(
`watched models`,
newModels.map((model) => pick(model, pickedFields))
);
this.log(
'skipped models',
skippedModels.map((model) => pick(model, pickedFields))
);
this.log(
'obsolete models',
obsoleteModels.map((model) => pick(model, pickedFields))
);

// this.log(JSON.stringify({ base }));

await this.setupSQLResources(baseData, newModels);
// this.log('finished seting up sql resources ...');

// this.log('about to consume notifications');
void this.consumeNotifications(baseData);
// this.log('finished consuming notifications ..... ');

if (createNewBaseData || rewatch) {
const connection = await baseData.knex.client.acquireConnection();

// this.log('about to register listeners');
// start watching
// ///////////
await this.registerListeners(baseData, connection);
// this.log('finished registering listeners ...');
}
);

const models = foundModelData.map(
(foundModelDatum) => new Model(foundModelDatum)
);

const obsoleteModels: Model[] = [];

let skippedModels: Model[] = [];
let newModels: Model[] = [];

let baseData: IBaseData = this.allBaseData.get(base.id);

this.log(`watching base ${base.alias}`);
this.log(`watching base ${base.id}`);

const connectionOptions = (await base.getConnectionConfig()).connection;

const createNewBaseData =
!baseData ||
// if connection options have changed which includes database name, port, host etc.
!isEqual(connectionOptions, baseData.connectionOptions);

if (baseData && createNewBaseData) {
// //////// will never get called as basedata will always be empty at this point
///// and will never dispose all resources
// ////// Needs to be fixed in case connection options change
await this.unwatchBase(baseData.base);
}

if (createNewBaseData) {
const knex = await this.createKnex(base);
baseData = {
base,
models,
knex,
connectionOptions,
};

newModels.push(...models);
} else {
//////// this never even gets used as baseData will be empty
const modelIds = models.map((model) => model.id); /////// //////// // why is tablename being mapped to modelIDs
obsoleteModels.push(
...baseData.models.filter((model) => !modelIds.includes(model.id))

// this.log('about to dispose sql resources for obsolete models');
await Promise.all(
obsoleteModels.map((obsoleteModel) =>
this.disposeSQLResourcesForModel(baseData, obsoleteModel)
)
);
const prevModelIds = baseData.models.map((model) => model.id); // this will always be empty, add logs to see intdev tests
newModels.push(
...models.filter((model) => !prevModelIds.includes(model.id))
// this.log('done disposing sql resources for obsolete models ....');

this.allBaseData.set(base.id, baseData);
} catch (err) {
await createIncidentLog(
{
errorMessage: `Could not setup base watcher for base: ${JSON.stringify(base)}: error: ${err?.message}`,
errorStackTrace: err.stack || '',
incidentTime: new Date(),
},
{},
(defaultTitle) => {
return `System triggered - ${defaultTitle}`;
}
);

baseData.base = base;
baseData.models = models;
}

/**
* WARNING: do not watch on table ( especially notification table ) created by this class to avoid exaustive loop of death because nocodb will also have a model for the table. If the model is
* watched( an sql trigger registered for it per se ), then a direct insert, update, delete action OR an insertion of notification event from trigger of other tables will cause a
* notification event to be inserted again, which causes another insertion, hence an unending loop.
* Also do not watch audit trail tables except explicitely setup to watch them.
*/
let shouldWatchAuditTables: boolean = false;
if (process.env.SHOULD_WATCH_AUDIT_TABLES === 'true') {
shouldWatchAuditTables = true;
}

[newModels, skippedModels] = newModels.reduce(
(results, newModel) => {
results[
newModel.table_name.startsWith(
this.createSqlIdentifierPrefix(baseData)
) ||
(!shouldWatchAuditTables && newModel.table_name.endsWith('_audit'))
? 1
: 0
].push(newModel);
return results;
},
[[], []]
);

const pickedFields = ['id', 'table_name', 'title'];
this.log(
`watching base : ${base.id} , ${
(await base.getConnectionConfig()).database
}`
);
this.log(
`watched models`,
newModels.map((model) => pick(model, pickedFields))
);
this.log(
'skipped models',
skippedModels.map((model) => pick(model, pickedFields))
);
this.log(
'obsolete models',
obsoleteModels.map((model) => pick(model, pickedFields))
);

this.log(JSON.stringify({ base }));

await this.setupSQLResources(baseData, newModels);
// this.log('finished seting up sql resources ...');

// this.log('about to consume notifications');
void this.consumeNotifications(baseData);
// this.log('finished consuming notifications ..... ');

if (createNewBaseData || rewatch) {
const connection = await baseData.knex.client.acquireConnection();

// this.log('about to register listeners');
// start watching
// ///////////
await this.registerListeners(baseData, connection);
// this.log('finished registering listeners ...');
}

// this.log('about to dispose sql resources for obsolete models');
await Promise.all(
obsoleteModels.map((obsoleteModel) =>
this.disposeSQLResourcesForModel(baseData, obsoleteModel)
)
);
// this.log('done disposing sql resources for obsolete models ....');

this.allBaseData.set(base.id, baseData);
}

async unwatchBase(base: Base) {
Expand Down

0 comments on commit ea9811a

Please sign in to comment.