diff --git a/packages/nocodb/src/lib/elitesoftwareautomation/db/PSQLRecordOperationWatcher.ts b/packages/nocodb/src/lib/elitesoftwareautomation/db/PSQLRecordOperationWatcher.ts index 237173668a41..03a6fbe4f369 100644 --- a/packages/nocodb/src/lib/elitesoftwareautomation/db/PSQLRecordOperationWatcher.ts +++ b/packages/nocodb/src/lib/elitesoftwareautomation/db/PSQLRecordOperationWatcher.ts @@ -12,6 +12,7 @@ import type NcMetaIO from '../../meta/NcMetaIO'; import type { HookType } from 'nocodb-sdk'; import type { Base } from '../../models'; import type Connection from 'mysql2/typings/mysql/lib/Connection'; +import { createIncidentLog } from '../../incidentLogger'; export type PSQLRecordOperationEvent = { base: Base; @@ -398,150 +399,164 @@ export class PSQLRecordOperationWatcher extends EventEmitter { } private async _watchBaseInternal(base: Base, rewatch: boolean) { - if (!base.id) { - // TODO: Log this error and report as incident - return; - } - - if (base.is_meta) return; - - const foundModelData: Record[] = await this.ncMeta.metaList2( - base.project_id, - base.id, - MetaTable.MODELS, - { - condition: { - type: 'table', + try { + if (!base.id) { + // TODO: Log this error and report as incident + return; + } + + if (base.is_meta) return; + + const foundModelData: Record[] = await this.ncMeta.metaList2( + base.project_id, + base.id, + MetaTable.MODELS, + { + condition: { + type: 'table', + }, + } + ); + + const models = foundModelData.map( + (foundModelDatum) => new Model(foundModelDatum) + ); + + const obsoleteModels: Model[] = []; + + let skippedModels: Model[] = []; + let newModels: Model[] = []; + + let baseData: IBaseData = this.allBaseData.get(base.id); + + this.log(`watching base ${base.alias}`); + this.log(`watching base ${base.id}`); + + const connectionOptions = (await base.getConnectionConfig()).connection; + + const createNewBaseData = + !baseData || + // if connection options have changed which includes database name, port, host etc. + !isEqual(connectionOptions, baseData.connectionOptions); + + if (baseData && createNewBaseData) { + // //////// will never get called as basedata will always be empty at this point + ///// and will never dispose all resources + // ////// Needs to be fixed in case connection options change + await this.unwatchBase(baseData.base); + } + + if (createNewBaseData) { + const knex = await this.createKnex(base); + baseData = { + base, + models, + knex, + connectionOptions, + }; + + newModels.push(...models); + } else { + //////// this never even gets used as baseData will be empty + const modelIds = models.map((model) => model.id); /////// //////// // why is tablename being mapped to modelIDs + obsoleteModels.push( + ...baseData.models.filter((model) => !modelIds.includes(model.id)) + ); + const prevModelIds = baseData.models.map((model) => model.id); // this will always be empty, add logs to see intdev tests + newModels.push( + ...models.filter((model) => !prevModelIds.includes(model.id)) + ); + + baseData.base = base; + baseData.models = models; + } + + /** + * WARNING: do not watch on table ( especially notification table ) created by this class to avoid exaustive loop of death because nocodb will also have a model for the table. If the model is + * watched( an sql trigger registered for it per se ), then a direct insert, update, delete action OR an insertion of notification event from trigger of other tables will cause a + * notification event to be inserted again, which causes another insertion, hence an unending loop. + * Also do not watch audit trail tables except explicitely setup to watch them. + */ + let shouldWatchAuditTables: boolean = false; + if (process.env.SHOULD_WATCH_AUDIT_TABLES === 'true') { + shouldWatchAuditTables = true; + } + + [newModels, skippedModels] = newModels.reduce( + (results, newModel) => { + results[ + newModel.table_name.startsWith( + this.createSqlIdentifierPrefix(baseData) + ) || + (!shouldWatchAuditTables && newModel.table_name.endsWith('_audit')) + ? 1 + : 0 + ].push(newModel); + return results; }, + [[], []] + ); + + const pickedFields = ['id', 'table_name', 'title']; + this.log( + `watching base : ${base.id} , ${ + (await base.getConnectionConfig()).database + }` + ); + this.log( + `watched models`, + newModels.map((model) => pick(model, pickedFields)) + ); + this.log( + 'skipped models', + skippedModels.map((model) => pick(model, pickedFields)) + ); + this.log( + 'obsolete models', + obsoleteModels.map((model) => pick(model, pickedFields)) + ); + + // this.log(JSON.stringify({ base })); + + await this.setupSQLResources(baseData, newModels); + // this.log('finished seting up sql resources ...'); + + // this.log('about to consume notifications'); + void this.consumeNotifications(baseData); + // this.log('finished consuming notifications ..... '); + + if (createNewBaseData || rewatch) { + const connection = await baseData.knex.client.acquireConnection(); + + // this.log('about to register listeners'); + // start watching + // /////////// + await this.registerListeners(baseData, connection); + // this.log('finished registering listeners ...'); } - ); - - const models = foundModelData.map( - (foundModelDatum) => new Model(foundModelDatum) - ); - - const obsoleteModels: Model[] = []; - - let skippedModels: Model[] = []; - let newModels: Model[] = []; - - let baseData: IBaseData = this.allBaseData.get(base.id); - - this.log(`watching base ${base.alias}`); - this.log(`watching base ${base.id}`); - - const connectionOptions = (await base.getConnectionConfig()).connection; - - const createNewBaseData = - !baseData || - // if connection options have changed which includes database name, port, host etc. - !isEqual(connectionOptions, baseData.connectionOptions); - - if (baseData && createNewBaseData) { - // //////// will never get called as basedata will always be empty at this point - ///// and will never dispose all resources - // ////// Needs to be fixed in case connection options change - await this.unwatchBase(baseData.base); - } - - if (createNewBaseData) { - const knex = await this.createKnex(base); - baseData = { - base, - models, - knex, - connectionOptions, - }; - - newModels.push(...models); - } else { - //////// this never even gets used as baseData will be empty - const modelIds = models.map((model) => model.id); /////// //////// // why is tablename being mapped to modelIDs - obsoleteModels.push( - ...baseData.models.filter((model) => !modelIds.includes(model.id)) + + // this.log('about to dispose sql resources for obsolete models'); + await Promise.all( + obsoleteModels.map((obsoleteModel) => + this.disposeSQLResourcesForModel(baseData, obsoleteModel) + ) ); - const prevModelIds = baseData.models.map((model) => model.id); // this will always be empty, add logs to see intdev tests - newModels.push( - ...models.filter((model) => !prevModelIds.includes(model.id)) + // this.log('done disposing sql resources for obsolete models ....'); + + this.allBaseData.set(base.id, baseData); + } catch (err) { + await createIncidentLog( + { + errorMessage: `Could not setup base watcher for base: ${JSON.stringify(base)}: error: ${err?.message}`, + errorStackTrace: err.stack || '', + incidentTime: new Date(), + }, + {}, + (defaultTitle) => { + return `System triggered - ${defaultTitle}`; + } ); - - baseData.base = base; - baseData.models = models; } - - /** - * WARNING: do not watch on table ( especially notification table ) created by this class to avoid exaustive loop of death because nocodb will also have a model for the table. If the model is - * watched( an sql trigger registered for it per se ), then a direct insert, update, delete action OR an insertion of notification event from trigger of other tables will cause a - * notification event to be inserted again, which causes another insertion, hence an unending loop. - * Also do not watch audit trail tables except explicitely setup to watch them. - */ - let shouldWatchAuditTables: boolean = false; - if (process.env.SHOULD_WATCH_AUDIT_TABLES === 'true') { - shouldWatchAuditTables = true; - } - - [newModels, skippedModels] = newModels.reduce( - (results, newModel) => { - results[ - newModel.table_name.startsWith( - this.createSqlIdentifierPrefix(baseData) - ) || - (!shouldWatchAuditTables && newModel.table_name.endsWith('_audit')) - ? 1 - : 0 - ].push(newModel); - return results; - }, - [[], []] - ); - - const pickedFields = ['id', 'table_name', 'title']; - this.log( - `watching base : ${base.id} , ${ - (await base.getConnectionConfig()).database - }` - ); - this.log( - `watched models`, - newModels.map((model) => pick(model, pickedFields)) - ); - this.log( - 'skipped models', - skippedModels.map((model) => pick(model, pickedFields)) - ); - this.log( - 'obsolete models', - obsoleteModels.map((model) => pick(model, pickedFields)) - ); - - this.log(JSON.stringify({ base })); - - await this.setupSQLResources(baseData, newModels); - // this.log('finished seting up sql resources ...'); - - // this.log('about to consume notifications'); - void this.consumeNotifications(baseData); - // this.log('finished consuming notifications ..... '); - - if (createNewBaseData || rewatch) { - const connection = await baseData.knex.client.acquireConnection(); - - // this.log('about to register listeners'); - // start watching - // /////////// - await this.registerListeners(baseData, connection); - // this.log('finished registering listeners ...'); - } - - // this.log('about to dispose sql resources for obsolete models'); - await Promise.all( - obsoleteModels.map((obsoleteModel) => - this.disposeSQLResourcesForModel(baseData, obsoleteModel) - ) - ); - // this.log('done disposing sql resources for obsolete models ....'); - - this.allBaseData.set(base.id, baseData); } async unwatchBase(base: Base) {