From 009a62398e95c59f9e286860f40a2d4acd20e1e8 Mon Sep 17 00:00:00 2001
From: Yash Mittal
Date: Thu, 19 Oct 2023 01:02:57 +0530
Subject: [PATCH 1/5] fix: multiple ingestion creating duplicate dataset rows

---
 .../query-builder/query-builder.service.ts | 74 +++++++++++++++----
 1 file changed, 59 insertions(+), 15 deletions(-)

diff --git a/impl/c-qube/src/services/query-builder/query-builder.service.ts b/impl/c-qube/src/services/query-builder/query-builder.service.ts
index df36f1ba..e76e1ce8 100644
--- a/impl/c-qube/src/services/query-builder/query-builder.service.ts
+++ b/impl/c-qube/src/services/query-builder/query-builder.service.ts
@@ -45,13 +45,19 @@ export class QueryBuilderService {
   }

   generateCreateStatement(schema: JSONSchema4, autoPrimaryKey = false): string {
+    // console.log('schema: ', schema);
     const tableName = schema.title;
     const psqlSchema = schema.psql_schema;
     const primaryKeySegment = autoPrimaryKey ? '\n id SERIAL PRIMARY KEY,' : '';
     let createStatement = `CREATE TABLE ${psqlSchema}.${tableName} (${primaryKeySegment}\n`;

     const properties = schema.properties;
+
+    const propsForUniqueConstraint = [];
     for (const property in properties) {
+      if (['date', 'week', 'year', 'month'].includes(property.trim())) {
+        propsForUniqueConstraint.push(property);
+      }
       const column: JSONSchema4 = properties[property];
       createStatement += ` ${property} `;
       if (column.type === 'string' && column.format === 'date-time') {
@@ -84,11 +90,31 @@
     createStatement += '\n);';

     if (schema.fk !== undefined) {
+      // console.log('fk constraints called');
       createStatement = this.addFKConstraintDuringCreation(
         schema,
         createStatement,
       );
+
+      // adding unique constraint
+      let uniqueStatements = `,\nconstraint unique_${tableName} UNIQUE (`;
+      schema.fk.forEach((fk: fk) => {
+        uniqueStatements += `${fk.column}, `;
+      });
+      propsForUniqueConstraint.forEach((prop) => {
+        uniqueStatements += `${prop}, `;
+      });
+
+      uniqueStatements = uniqueStatements.slice(0, -2) + ')';
+      createStatement = createStatement
+        .slice(0, -2)
+        .concat(uniqueStatements)
+        .concat(');');
+      console.log('sql:', createStatement);
+      // console.log('schema: ', schema);
     }
+
+    // console.log('create statement: ', createStatement);
     return this.cleanStatement(createStatement);
   }

@@ -100,8 +126,6 @@
     for (const index of indexes) {
       for (const column of index.columns) {
-        // if no indexes are specified in grammar, skip
-        if (column.length === 0) continue;
         const indexName = `${schema.title}_${column.join('_')}_idx`;
         const columns = column.join(', ');
         const statement = `CREATE INDEX ${indexName} ON ${psqlSchema}.${schema.title} (${columns});`;
@@ -137,6 +161,7 @@
     const query = `INSERT INTO ${psqlSchema}.${tableName} (${fields.join(
       ', ',
     )}) VALUES (${values.join(', ')});`;
+    // ON CONFLICT ON CONSTRAINT unique_${tableName} DO UPDATE SET sum=sum+EXCLUDED.sum, count=count+EXCLUDED.count, avg=(sum+EXCLUDED.sum)/(count+EXCLUDED.count);`;
     return this.cleanStatement(query);
   }

@@ -173,9 +198,18 @@
     const query = `INSERT INTO ${psqlSchema}.${tableName} (${fields.join(
       ', ',
     )}) VALUES ${values.join(', ')};`;
+    // console.log('insert statement: ', query);
     return this.cleanStatement(query);
   }

+  addOnConflictStatement(tableName: string, query: string): string {
+    return query
+      .slice(0, -1)
+      .concat(
+        ` ON CONFLICT ON CONSTRAINT unique_${tableName} DO UPDATE SET sum = datasets.${tableName}.sum + EXCLUDED.sum, count = datasets.${tableName}.count + EXCLUDED.count, avg = (datasets.${tableName}.sum + EXCLUDED.sum) / (datasets.${tableName}.count + EXCLUDED.count); `,
+      );
+  }
+
   generateBulkInsertStatement(schema: JSONSchema4, data: any[]): string {
     const tableName = schema.title;
     const psqlSchema = schema.psql_schema;
@@ -192,10 +226,10 @@
       fields.push(property);
     }

-    const tempTableName = `temp_${tableName}`;
-    const createTempTable = `CREATE TABLE IF NOT EXISTS ${tempTableName} (LIKE ${psqlSchema}.${tableName});`;
+    const tempTableName = `temp_${tableName} `;
+    const createTempTable = `CREATE TABLE IF NOT EXISTS ${tempTableName} (LIKE ${psqlSchema}.${tableName}); `;
     queries.push(createTempTable);
-    const autoGen = `ALTER TABLE ${tempTableName} ADD COLUMN id SERIAL PRIMARY KEY;`;
+    const autoGen = `ALTER TABLE ${tempTableName} ADD COLUMN id SERIAL PRIMARY KEY; `;
     queries.push(autoGen);
     const rows = [];
     let id = 1;
@@ -220,7 +254,7 @@
     const insertTempTable = `INSERT INTO ${tempTableName} (${tempTableFields.join(
       ', ',
     )}) VALUES `;
-    const insertTempTableRows = `${insertTempTable}${rows.join(', ')};`;
+    const insertTempTableRows = `${insertTempTable}${rows.join(', ')}; `;
     queries.push(this.cleanStatement(insertTempTable));
     let joinStatements = '';
     let whereStatements = '';
@@ -230,7 +264,7 @@
       const referenceTable = fk.reference.table;
       const referenceColumn = fk.reference.column;
       const childColumn = fk.column;
-      joinStatements += ` LEFT JOIN dimensions.${referenceTable} ON ${tempTableName}.${childColumn} = dimensions.${referenceTable}.${childColumn}`;
+      joinStatements += ` LEFT JOIN dimensions.${referenceTable} ON ${tempTableName}.${childColumn} = dimensions.${referenceTable}.${childColumn} `;
       whereStatements += ` AND dimensions.${referenceTable}.${childColumn} IS NOT NULL`;
     });
   }
@@ -239,17 +273,18 @@
       ', ',
     )}) SELECT ${fields
-      .map((field) => `${tempTableName}.${field}`)
-      .join(', ')} FROM ${tempTableName}
+        .map((field) => `${tempTableName}.${field}`)
+        .join(', ')} FROM ${tempTableName}
     ${joinStatements === '' ? ' ' : joinStatements}
-    WHERE TRUE${whereStatements === '' ? ' ' : whereStatements};`;
+    WHERE TRUE${whereStatements === '' ? ' ' : whereStatements}
+    ON CONFLICT ON CONSTRAINT unique_${tableName} DO UPDATE SET sum = ${psqlSchema}.${tableName}.sum + EXCLUDED.sum, count = ${psqlSchema}.${tableName}.count + EXCLUDED.count, avg = (${psqlSchema}.${tableName}.sum + EXCLUDED.sum) / (${psqlSchema}.${tableName}.count + EXCLUDED.count);`;

     queries.push(filteredInsert);
-    const dropTempTable = `DROP TABLE ${tempTableName};`;
+    const dropTempTable = `DROP TABLE ${tempTableName}; `;
     queries.push(dropTempTable);
-    const query = `${createTempTable}\n${insertTempTableRows}\n${filteredInsert}\n${dropTempTable}`;
-    // const query = `${createTempTable}\n${insertTempTableRows}\n${filteredInsert}`;
+    const query = `${createTempTable} \n${insertTempTableRows} \n${filteredInsert} \n${dropTempTable} `;
+    // const query = `${ createTempTable } \n${ insertTempTableRows } \n${ filteredInsert } `;
     // if (query.toLowerCase().includes('null')) {
     //   console.log('NULL Query: ', query);
     // }
@@ -258,7 +293,16 @@
     return this.cleanStatement(query);
   }

-  generateUpdateStatement(schema: JSONSchema4, data: any): string[] {
-    throw new Error('Method not implemented.');
+  generateUpdateStatement(
+    schema: JSONSchema4,
+    data: any,
+    where: string,
+  ): string {
+    // throw new Error('Method not implemented.');
+    return `UPDATE ${schema.schema.psql_schema}.${schema.tableName}
+SET sum = sum + ${data.sum},
+    count = count + ${data.count - 2 * data.negcount},
+    avg = (sum + ${data.sum}) / (count + ${data.count - 2 * data.negcount})
+WHERE ${where} `;
   }
 }
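The DO UPDATE arithmetic introduced above is the standard rule for merging pre-aggregated rows: sums and counts add, while the average must be recomputed from the merged sum and count rather than averaged directly. A minimal TypeScript sketch of that rule, checked against the "Class 2" row of the patch-2 fixtures below (the names here are illustrative, not part of the patch):

type Agg = { sum: number; count: number; avg: number };

// Merge an existing aggregate row with an incoming one, the same way the
// generated ON CONFLICT ... DO UPDATE clause does.
function mergeAgg(existing: Agg, incoming: Agg): Agg {
  const sum = existing.sum + incoming.sum;
  const count = existing.count + incoming.count;
  return { sum, count, avg: sum / count };
}

const firstIngest: Agg = { sum: 2, count: 1, avg: 2 };
// Re-ingesting identical data doubles sum and count but leaves avg unchanged:
console.log(mergeAgg(firstIngest, firstIngest)); // { sum: 4, count: 2, avg: 2 }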
From 1b42f876f1527fd067898215bdfad34e0a3c2498 Mon Sep 17 00:00:00 2001
From: Yash Mittal
Date: Thu, 19 Oct 2023 22:52:06 +0530
Subject: [PATCH 2/5] feat(test): unit + e2e test for the multiple ingestion
 bug fix.

---
 .../query-builder.service.spec.ts             |  22 ++++
 .../test-fixtures/createStatement.json        |   1 +
 .../test-fixtures/insertStatement.json        | 102 ++++++++++++++++++
 .../ingestionConfigs/config.multiple.json     |  34 ++++++
 .../outputDatasets/multiple_first.json        |  51 +++++++++
 .../outputDatasets/multiple_second.json       |  51 +++++++++
 .../csv-adapter/csv-adapter.e2e-spec.ts       |  42 ++++++++
 7 files changed, 303 insertions(+)
 create mode 100644 impl/c-qube/src/services/query-builder/test-fixtures/createStatement.json
 create mode 100644 impl/c-qube/src/services/query-builder/test-fixtures/insertStatement.json
 create mode 100644 impl/c-qube/test/fixtures/ingestionConfigs/config.multiple.json
 create mode 100644 impl/c-qube/test/fixtures/outputDatasets/multiple_first.json
 create mode 100644 impl/c-qube/test/fixtures/outputDatasets/multiple_second.json

diff --git a/impl/c-qube/src/services/query-builder/query-builder.service.spec.ts b/impl/c-qube/src/services/query-builder/query-builder.service.spec.ts
index edf23ee4..48fea205 100644
--- a/impl/c-qube/src/services/query-builder/query-builder.service.spec.ts
+++ b/impl/c-qube/src/services/query-builder/query-builder.service.spec.ts
@@ -2,6 +2,10 @@
 import { Test, TestingModule } from '@nestjs/testing';
 import { JSONSchema4 } from 'json-schema';
 import { QueryBuilderService } from './query-builder.service';

+// JSON imports
+import * as createStatementInput from './test-fixtures/createStatement.json';
+import * as insertStatementInput from './test-fixtures/insertStatement.json';
+
 describe('QueryBuilderService', () => {
   let service: QueryBuilderService;

@@ -241,4 +245,22 @@
       );`),
     );
   });
+
+  it('checks for bulk upserting statement', () => {
+    const res = service.generateBulkInsertStatement(
+      insertStatementInput.args.schema as any,
+      insertStatementInput.args.data as any,
+    );
+
+    expect(res).toBe(insertStatementInput.output);
+  });
+
+  it('checks for create statement with proper unique constraint', async () => {
+    const res = service.generateCreateStatement(
+      createStatementInput.args.schema as any,
+      createStatementInput.args.autoPrimaryKey,
+    );
+
+    expect(res).toBe(createStatementInput.output);
+  });
 });

diff --git a/impl/c-qube/src/services/query-builder/test-fixtures/createStatement.json b/impl/c-qube/src/services/query-builder/test-fixtures/createStatement.json
new file mode 100644
index 00000000..1d8186f2
--- /dev/null
+++ b/impl/c-qube/src/services/query-builder/test-fixtures/createStatement.json
@@ -0,0 +1 @@
+{"args":{"schema":{"title":"nishtha_totalmedium_eGFra0JofUpoa0QRHgIC","psql_schema":"datasets","properties":{"state_id":{"type":"string"},"program_name":{"type":"string"},"count":{"type":"number","format":"float"},"sum":{"type":"number","format":"float"},"avg":{"type":"number","format":"float"}},"fk":[{"column":"state_id","reference":{"table":"dimensions.state","column":"state_id"}},{"column":"program_name","reference":{"table":"dimensions.programnishtha","column":"program_name"}}]},"autoPrimaryKey":true},"output":"CREATE TABLE datasets.nishtha_totalmedium_eGFra0JofUpoa0QRHgIC (id SERIAL PRIMARY KEY, state_id VARCHAR, program_name VARCHAR, count FLOAT8, sum FLOAT8, avg FLOAT8,constraint fk_state_id FOREIGN KEY (state_id) REFERENCES dimensions.state(state_id),constraint fk_program_name FOREIGN KEY (program_name) REFERENCES dimensions.programnishtha(program_name),constraint unique_nishtha_totalmedium_eGFra0JofUpoa0QRHgIC UNIQUE (state_id, program_name));"}
\ No newline at end of file

diff --git a/impl/c-qube/src/services/query-builder/test-fixtures/insertStatement.json b/impl/c-qube/src/services/query-builder/test-fixtures/insertStatement.json
new file mode 100644
index 00000000..cc5cc00f
--- /dev/null
+++ b/impl/c-qube/src/services/query-builder/test-fixtures/insertStatement.json
@@ -0,0 +1,102 @@
+{
+  "args": {
+    "schema": {
+      "fk": [
+        {
+          "column": "gender",
+          "reference": { "table": "gender", "column": "gender_id" }
+        },
+        {
+          "column": "district_id",
+          "reference": { "table": "district", "column": "district_id" }
+        }
+      ],
+      "title": "sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf",
+      "properties": {
+        "gender": { "type": "string" },
+        "district_id": { "type": "string" },
+        "count": { "type": "number", "format": "float" },
+        "sum": { "type": "number", "format": "float" },
+        "avg": { "type": "number", "format": "float" },
+        "week": { "type": "integer" },
+        "year": { "type": "integer" }
+      },
+      "psql_schema": "datasets"
+    },
+    "data": [
+      {
+        "count": 120,
+        "sum": 4200,
+        "avg": 35,
+        "week": 9,
+        "year": 2023,
+        "gender": "male",
+        "district_id": "101"
+      },
+      {
+        "count": 120,
+        "sum": 4176,
+        "avg": 34.8,
+        "week": 8,
+        "year": 2023,
+        "gender": "male",
+        "district_id": "101"
+      },
+      {
+        "count": 120,
+        "sum": 4083,
+        "avg": 34.025,
+        "week": 6,
+        "year": 2023,
+        "gender": "male",
+        "district_id": "101"
+      },
+      {
+        "count": 239,
+        "sum": 8269,
+        "avg": 34.59832635983263,
+        "week": 7,
+        "year": 2023,
+        "gender": "male",
+        "district_id": "101"
+      },
+      {
+        "count": 120,
+        "sum": 4184,
+        "avg": 34.86666666666667,
+        "week": 9,
+        "year": 2023,
+        "gender": "female",
+        "district_id": "101"
+      },
+      {
+        "count": 120,
+        "sum": 4111,
+        "avg": 34.25833333333333,
+        "week": 8,
+        "year": 2023,
+        "gender": "female",
+        "district_id": "101"
+      },
+      {
+        "count": 120,
+        "sum": 4141,
+        "avg": 34.50833333333333,
+        "week": 6,
+        "year": 2023,
+        "gender": "female",
+        "district_id": "101"
+      },
+      {
+        "count": 240,
+        "sum": 8278,
+        "avg": 34.49166666666667,
+        "week": 7,
+        "year": 2023,
+        "gender": "female",
+        "district_id": "101"
+      }
+    ]
+  },
+  "output": "CREATE TABLE IF NOT EXISTS temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf (LIKE datasets.sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf); INSERT INTO temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf (gender, district_id, count, sum, avg, week, year, id) VALUES ('male', '101', '120', '4200', '35', '9', '2023', 1), ('male', '101', '120', '4176', '34.8', '8', '2023', 2), ('male', '101', '120', '4083', '34.025', '6', '2023', 3), ('male', '101', '239', '8269', '34.59832635983263', '7', '2023', 4), ('female', '101', '120', '4184', '34.86666666666667', '9', '2023', 5), ('female', '101', '120', '4111', '34.25833333333333', '8', '2023', 6), ('female', '101', '120', '4141', '34.50833333333333', '6', '2023', 7), ('female', '101', '240', '8278', '34.49166666666667', '7', '2023', 8); INSERT INTO datasets.sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf (gender, district_id, count, sum, avg, week, year) SELECT temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf .gender, temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf .district_id, temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf .count, temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf .sum, temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf .avg, temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf .week, temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf .year FROM temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf LEFT JOIN dimensions.gender ON temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf .gender = dimensions.gender.gender LEFT JOIN dimensions.district ON temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf .district_id = dimensions.district.district_id WHERE TRUE AND dimensions.gender.gender IS NOT NULL AND dimensions.district.district_id IS NOT NULL ON CONFLICT ON CONSTRAINT unique_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf DO UPDATE SET sum = datasets.sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf.sum + EXCLUDED.sum, count = datasets.sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf.count + EXCLUDED.count, avg = (datasets.sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf.sum + EXCLUDED.sum) / (datasets.sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf.count + EXCLUDED.count); DROP TABLE temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf ;"
+}

diff --git a/impl/c-qube/test/fixtures/ingestionConfigs/config.multiple.json b/impl/c-qube/test/fixtures/ingestionConfigs/config.multiple.json
new file mode 100644
index 00000000..2848dbb2
--- /dev/null
+++ b/impl/c-qube/test/fixtures/ingestionConfigs/config.multiple.json
@@ -0,0 +1,34 @@
+{
+  "globals": {
+    "onlyCreateWhitelisted": true
+  },
+  "dimensions": {
+    "namespace": "dimensions",
+    "fileNameFormat": "${dimensionName}.${index}.dimensions.data.csv",
+    "input": {
+      "files": "./ingest/dimensions"
+    }
+  },
+  "programs": [
+    {
+      "name": "DIKSHA",
+      "namespace": "diksha",
+      "description": "DIKSHA",
+      "shouldIngestToDB": true,
+      "input": {
+        "files": "./ingest/programs/diksha"
+      },
+      "./output": {
+        "location": "./output/programs/diksha"
+      },
+      "dimensions": {
+        "whitelisted": [
+          "state,grade,subject,medium,board",
+          "textbookdiksha,grade,subject,medium",
+          "textbookdiksha,grade,subject,medium"
+        ],
+        "blacklisted": []
+      }
+    }
+  ]
+}

diff --git a/impl/c-qube/test/fixtures/outputDatasets/multiple_first.json b/impl/c-qube/test/fixtures/outputDatasets/multiple_first.json
new file mode 100644
index 00000000..967256f1
--- /dev/null
+++ b/impl/c-qube/test/fixtures/outputDatasets/multiple_first.json
@@ -0,0 +1,51 @@
+[
+  {
+    "grade_diksha": "Class 2",
+    "avg": 2,
+    "count": 1,
+    "sum": 2,
+    "id": 1
+  },
+  {
+    "grade_diksha": "Class 1",
+    "avg": 2,
+    "count": 2,
+    "sum": 4,
+    "id": 2
+  },
+  {
+    "grade_diksha": "Class 6",
+    "avg": 2.25,
+    "count": 4,
+    "sum": 9,
+    "id": 3
+  },
+  {
+    "grade_diksha": "Class 7",
+    "avg": 1.5,
+    "count": 2,
+    "sum": 3,
+    "id": 4
+  },
+  {
+    "grade_diksha": "Class 8",
+    "avg": 3,
+    "count": 1,
+    "sum": 3,
+    "id": 5
+  },
+  {
+    "grade_diksha": "Class 3",
+    "avg": 3.3333333333333335,
+    "count": 3,
+    "sum": 10,
+    "id": 6
+  },
+  {
+    "grade_diksha": "Class 4",
+    "avg": 1,
+    "count": 1,
+    "sum": 1,
+    "id": 7
+  }
+]
\ No newline at end of file

diff --git a/impl/c-qube/test/fixtures/outputDatasets/multiple_second.json b/impl/c-qube/test/fixtures/outputDatasets/multiple_second.json
new file mode 100644
index 00000000..5e86e39d
--- /dev/null
+++ b/impl/c-qube/test/fixtures/outputDatasets/multiple_second.json
@@ -0,0 +1,51 @@
+[
+  {
+    "grade_diksha": "Class 2",
+    "avg": 2,
+    "count": 2,
+    "sum": 4,
+    "id": 1
+  },
+  {
+    "grade_diksha": "Class 1",
+    "avg": 2,
+    "count": 4,
+    "sum": 8,
+    "id": 2
+  },
+  {
+    "grade_diksha": "Class 4",
+    "avg": 1,
+    "count": 2,
+    "sum": 2,
+    "id": 7
+  },
+  {
+    "grade_diksha": "Class 8",
+    "avg": 3,
+    "count": 2,
+    "sum": 6,
+    "id": 5
+  },
+  {
+    "grade_diksha": "Class 7",
+    "avg": 1.5,
+    "count": 4,
+    "sum": 6,
+    "id": 4
+  },
+  {
+    "grade_diksha": "Class 3",
+    "avg": 3.3333333333333335,
+    "count": 6,
+    "sum": 20,
+    "id": 6
+  },
+  {
+    "grade_diksha": "Class 6",
+    "avg": 2.25,
+    "count": 8,
+    "sum": 18,
+    "id": 3
+  }
+]
\ No newline at end of file

diff --git a/impl/c-qube/test/modules/csv-adapter/csv-adapter.e2e-spec.ts b/impl/c-qube/test/modules/csv-adapter/csv-adapter.e2e-spec.ts
index e535335f..868bad02 100644
--- a/impl/c-qube/test/modules/csv-adapter/csv-adapter.e2e-spec.ts
+++ b/impl/c-qube/test/modules/csv-adapter/csv-adapter.e2e-spec.ts
@@ -12,6 +12,10 @@
 import { DimensionGrammarService } from './../../../src/services/csv-adapter/par
 import { Pool } from 'pg';
 import { ConfigModule, ConfigService } from '@nestjs/config';

+// importing JSONs
+import * as multiple_first from '../../fixtures/outputDatasets/multiple_first.json';
+import * as multiple_second from '../../fixtures/outputDatasets/multiple_second.json';
+
 describe('AppController (e2e)', () => {
   let app: INestApplication;
   let csvAdapterService: CsvAdapterService;
@@ -120,4 +124,42 @@
     expect(data).toBeDefined();
     expect(data).toEqual([]);
   });
+
+  it('should test for multiple ingestion', async () => {
+    await csvAdapterService.nuke();
+    await csvAdapterService.ingest(
+      './test/fixtures/ingestionConfigs',
+      'config.multiple.json',
+    );
+    await csvAdapterService.ingestData({});
+
+    // check that the values are correct
+
+    let data = await csvAdapterService.prisma.$queryRawUnsafe(
+      'SELECT grade_diksha, sum, count, avg FROM datasets.diksha_avg_play_time_in_mins_on_app_and_portal_grade',
+    );
+    // remove id col
+
+    expect(data).toEqual(
+      expect.arrayContaining(
+        multiple_first.map((item) => {
+          delete item.id;
+          return item;
+        }),
+      ),
+    );
+
+    await csvAdapterService.ingestData({});
+    data = await csvAdapterService.prisma.$queryRawUnsafe(
+      'SELECT grade_diksha, sum, count, avg FROM datasets.diksha_avg_play_time_in_mins_on_app_and_portal_grade',
+    );
+    expect(data).toEqual(
+      expect.arrayContaining(
+        multiple_second.map((item) => {
+          delete item.id;
+          return item;
+        }),
+      ),
+    );
+  });
 });
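One caveat in the e2e spec above: `multiple_first.map((item) => { delete item.id; ... })` mutates the imported JSON module in place, which is fine while each fixture is read once but could surprise a later test that reuses it. A non-mutating alternative, as a sketch only (not part of the patch):

// Drop the id column without mutating the shared fixture objects.
const withoutId = multiple_first.map(({ id, ...rest }) => rest);
expect(data).toEqual(expect.arrayContaining(withoutId));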
From ef6f60bd257e8e09e2e5243881527ba3f32e710f Mon Sep 17 00:00:00 2001
From: Yash Mittal
Date: Mon, 30 Oct 2023 18:28:40 +0530
Subject: [PATCH 3/5] feat: remove dependency on grammar files during
 `ingest-data`

---
 .../migration.sql                             |   8 +
 .../migration.sql                             |   8 +
 .../migration.sql                             |   2 +
 impl/c-qube/prisma/schema.prisma              |   2 +
 .../csv-adapter/csv-adapter.service.ts        | 143 +++++++++---------
 .../event-grammar/event-grammar.service.ts    |  23 +++
 .../src/services/event/event.service.ts       |  10 ++
 .../query-builder/query-builder.service.ts    |   2 +-
 8 files changed, 126 insertions(+), 72 deletions(-)
 create mode 100644 impl/c-qube/prisma/migrations/20231030113226_add_metric_in_eg/migration.sql
 create mode 100644 impl/c-qube/prisma/migrations/20231030115722_add_eg_schema/migration.sql
 create mode 100644 impl/c-qube/prisma/migrations/20231030115911_change_egschema_datatype/migration.sql

diff --git a/impl/c-qube/prisma/migrations/20231030113226_add_metric_in_eg/migration.sql b/impl/c-qube/prisma/migrations/20231030113226_add_metric_in_eg/migration.sql
new file mode 100644
index 00000000..22591ae7
--- /dev/null
+++ b/impl/c-qube/prisma/migrations/20231030113226_add_metric_in_eg/migration.sql
@@ -0,0 +1,8 @@
+/*
+  Warnings:
+
+  - Added the required column `metric` to the `EventGrammar` table without a default value. This is not possible if the table is not empty.
+
+*/
+-- AlterTable
+ALTER TABLE "spec"."EventGrammar" ADD COLUMN "metric" TEXT NOT NULL;

diff --git a/impl/c-qube/prisma/migrations/20231030115722_add_eg_schema/migration.sql b/impl/c-qube/prisma/migrations/20231030115722_add_eg_schema/migration.sql
new file mode 100644
index 00000000..a70e7e32
--- /dev/null
+++ b/impl/c-qube/prisma/migrations/20231030115722_add_eg_schema/migration.sql
@@ -0,0 +1,8 @@
+/*
+  Warnings:
+
+  - Added the required column `egSchema` to the `EventGrammar` table without a default value. This is not possible if the table is not empty.
+
+*/
+-- AlterTable
+ALTER TABLE "spec"."EventGrammar" ADD COLUMN "egSchema" JSONB NOT NULL;

diff --git a/impl/c-qube/prisma/migrations/20231030115911_change_egschema_datatype/migration.sql b/impl/c-qube/prisma/migrations/20231030115911_change_egschema_datatype/migration.sql
new file mode 100644
index 00000000..b0d49bd4
--- /dev/null
+++ b/impl/c-qube/prisma/migrations/20231030115911_change_egschema_datatype/migration.sql
@@ -0,0 +1,2 @@
+-- AlterTable
+ALTER TABLE "spec"."EventGrammar" ALTER COLUMN "egSchema" SET DATA TYPE TEXT;

diff --git a/impl/c-qube/prisma/schema.prisma b/impl/c-qube/prisma/schema.prisma
index d73dcd65..ed1af19c 100644
--- a/impl/c-qube/prisma/schema.prisma
+++ b/impl/c-qube/prisma/schema.prisma
@@ -72,6 +72,8 @@ model EventGrammar {
   file           String?
   eventType      EventType        @default(INTERNAL)
   DatasetGrammar DatasetGrammar[]
+  metric         String
+  egSchema       String

   @@schema("spec")
 }

diff --git a/impl/c-qube/src/services/csv-adapter/csv-adapter.service.ts b/impl/c-qube/src/services/csv-adapter/csv-adapter.service.ts
index e206fcee..e1512022 100644
--- a/impl/c-qube/src/services/csv-adapter/csv-adapter.service.ts
+++ b/impl/c-qube/src/services/csv-adapter/csv-adapter.service.ts
@@ -27,6 +27,7 @@
 } from './parser/dataset/dataset-grammar.helper';
 import {
   createEventGrammarFromCSVDefinition,
+  getEGDefFromDB,
   getEGDefFromFile,
 } from './parser/event-grammar/event-grammar.service';
 import {
@@ -494,77 +495,77 @@
     for (let m = 0; m < compoundDatasetGrammars.length; m++) {
       promises.push(
         limit(() =>
-          getEGDefFromFile(compoundDatasetGrammars[m].eventGrammarFile).then(
-            async (s) => {
-              const {
-                instrumentField,
-              }: {
-                eventGrammarDef: EventGrammarCSVFormat[];
-                instrumentField: string;
-              } = s;
-              const compoundEventGrammar: EventGrammar = {
-                name: '',
-                description: '',
-                dimension: [],
-                instrument_field: instrumentField,
-                is_active: true,
-                schema: {},
-                instrument: {
-                  type: InstrumentType.COUNTER,
-                  name: 'counter',
-                },
-              };
-              const events: Event[] =
-                await createCompoundDatasetDataToBeInserted(
-                  compoundDatasetGrammars[m].eventGrammarFile.replace(
-                    'grammar',
-                    'data',
-                  ),
-                  compoundEventGrammar,
-                  compoundDatasetGrammars[m],
-                );
-              // Create Pipes
-              const pipe: Pipe = {
-                event: compoundEventGrammar,
-                transformer: defaultTransformers[0],
-                dataset: compoundDatasetGrammars[m],
-              };
-              const transformContext: TransformerContext = {
-                dataset: compoundDatasetGrammars[m],
-                events: events,
-                isChainable: false,
-                pipeContext: {},
-              };
-              if (events && events.length > 0) {
-                const datasetUpdateRequest: DatasetUpdateRequest[] =
-                  pipe.transformer.transformSync(
-                    callback,
-                    transformContext,
-                    events,
-                  ) as DatasetUpdateRequest[];
-
-                // console.log(datasetUpdateRequest.length, datasetUpdateRequest[0]);
-
-                await this.datasetService
-                  .processDatasetUpdateRequest(datasetUpdateRequest)
-                  .then(() => {
-                    this.logger.verbose(
-                      `Ingested Compound Dataset without any error ${events.length} events for ${compoundDatasetGrammars[m].name}`,
-                    );
-                  })
-                  .catch((e) => {
-                    this.logger.verbose(
-                      `Ingested Compound Dataset with error ${events.length} events for ${compoundDatasetGrammars[m].name}`,
-                    );
-                  });
-              } else {
-                console.error(
-                  'No relevant events for this dataset',
-                  compoundDatasetGrammars[m].name,
-                );
-              }
-            },
-          ),
+          getEGDefFromDB(
+            compoundDatasetGrammars[m].eventGrammarFile,
+            this.prisma,
+          ).then(async (s) => {
+            const {
+              instrumentField,
+            }: {
+              eventGrammarDef: EventGrammarCSVFormat[];
+              instrumentField: string;
+            } = s as any;
+            const compoundEventGrammar: EventGrammar = {
+              name: '',
+              description: '',
+              dimension: [],
+              instrument_field: instrumentField,
+              is_active: true,
+              schema: {},
+              instrument: {
+                type: InstrumentType.COUNTER,
+                name: 'counter',
+              },
+            };
+            const events: Event[] = await createCompoundDatasetDataToBeInserted(
+              compoundDatasetGrammars[m].eventGrammarFile.replace(
+                'grammar',
+                'data',
+              ),
+              compoundEventGrammar,
+              compoundDatasetGrammars[m],
+            );
+            // Create Pipes
+            const pipe: Pipe = {
+              event: compoundEventGrammar,
+              transformer: defaultTransformers[0],
+              dataset: compoundDatasetGrammars[m],
+            };
+            const transformContext: TransformerContext = {
+              dataset: compoundDatasetGrammars[m],
+              events: events,
+              isChainable: false,
+              pipeContext: {},
+            };
+            if (events && events.length > 0) {
+              const datasetUpdateRequest: DatasetUpdateRequest[] =
+                pipe.transformer.transformSync(
+                  callback,
+                  transformContext,
+                  events,
+                ) as DatasetUpdateRequest[];
+
+              // console.log(datasetUpdateRequest.length, datasetUpdateRequest[0]);
+
+              await this.datasetService
+                .processDatasetUpdateRequest(datasetUpdateRequest)
+                .then(() => {
+                  this.logger.verbose(
+                    `Ingested Compound Dataset without any error ${events.length} events for ${compoundDatasetGrammars[m].name}`,
+                  );
+                })
+                .catch((e) => {
+                  this.logger.verbose(
+                    `Ingested Compound Dataset with error ${events.length} events for ${compoundDatasetGrammars[m].name}`,
+                  );
+                });
+            } else {
+              console.error(
+                'No relevant events for this dataset',
+                compoundDatasetGrammars[m].name,
+              );
+            }
+          }),
         ),
       );
     }

diff --git a/impl/c-qube/src/services/csv-adapter/parser/event-grammar/event-grammar.service.ts b/impl/c-qube/src/services/csv-adapter/parser/event-grammar/event-grammar.service.ts
index e3c1493e..edf3eeae 100644
--- a/impl/c-qube/src/services/csv-adapter/parser/event-grammar/event-grammar.service.ts
+++ b/impl/c-qube/src/services/csv-adapter/parser/event-grammar/event-grammar.service.ts
@@ -15,6 +15,7 @@
   FieldType,
 } from '../../types/parser';
 import { readCSVFile } from '../utils/csvreader';
+import { PrismaService } from 'src/prisma.service';

 export async function getEGDefFromFile(csvFilePath: string) {
   const [
@@ -39,6 +40,28 @@
   return { eventGrammarDef, instrumentField };
 }

+export async function getEGDefFromDB(
+  csvFilePath: string,
+  prisma: PrismaService,
+) {
+  console.log('csvFilePath: ', csvFilePath);
+  const metrics = await prisma.eventGrammar.findMany({
+    where: {
+      file: csvFilePath,
+    },
+    select: {
+      egSchema: true,
+      metric: true,
+    },
+  });
+  console.log('metrics: ', metrics);
+
+  return {
+    eventGrammarDef: metrics[0]?.egSchema,
+    instrumentField: metrics[0]?.metric,
+  };
+}
+
 export const createEventGrammarFromCSVDefinition = async (
   csvFilePath: string,
   dimensionFileBasePath: string,

diff --git a/impl/c-qube/src/services/event/event.service.ts b/impl/c-qube/src/services/event/event.service.ts
index 84c1adfe..f2064ce0 100644
--- a/impl/c-qube/src/services/event/event.service.ts
+++ b/impl/c-qube/src/services/event/event.service.ts
@@ -15,6 +15,10 @@
 import { QueryBuilderService } from '../query-builder/query-builder.service';
 import { DimensionService } from '../dimension/dimension.service';
 import { DimensionMapping } from 'src/types/dataset';
 import { DimensionGrammar } from 'src/types/dimension';
+import {
+  getEGDefFromDB,
+  getEGDefFromFile,
+} from '../csv-adapter/parser/event-grammar/event-grammar.service';

 @Injectable()
 export class EventService {
@@ -65,6 +69,10 @@
       await this.dimensionService.getDimensionGrammaModelByName(
         eventGrammar.dimension[0].dimension.name.name,
       );
+    const { eventGrammarDef, instrumentField } = await getEGDefFromFile(
+      eventGrammar.file,
+    );
+
     return this.prisma.eventGrammar
       .create({
         data: {
@@ -82,6 +90,8 @@
             },
           },
         ]),
+        metric: eventGrammar.instrument_field,
+        egSchema: JSON.stringify(eventGrammarDef),
         instrument: {
           connect: {
             name: 'COUNTER', //TODO: Change this to eventGrammar.instrument.name

diff --git a/impl/c-qube/src/services/query-builder/query-builder.service.ts b/impl/c-qube/src/services/query-builder/query-builder.service.ts
index e76e1ce8..06665f0b 100644
--- a/impl/c-qube/src/services/query-builder/query-builder.service.ts
+++ b/impl/c-qube/src/services/query-builder/query-builder.service.ts
@@ -110,7 +110,7 @@
         .slice(0, -2)
         .concat(uniqueStatements)
         .concat(');');
-      console.log('sql:', createStatement);
+      // console.log('sql:', createStatement);
       // console.log('schema: ', schema);
     }
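Worth noting about the patch above: `egSchema` is persisted as a JSON string (`JSON.stringify(eventGrammarDef)` in event.service.ts), and `getEGDefFromDB` returns it exactly as stored. The compound-dataset path only consumes `instrumentField`, so the raw string is harmless there, but a caller that needs the structured definition would parse it back. A sketch reusing the names from the diffs above (`eventGrammarFile` here is a stand-in for any grammar file path recorded in spec.EventGrammar):

const { eventGrammarDef, instrumentField } = await getEGDefFromDB(
  eventGrammarFile,
  this.prisma,
);
// egSchema was stored with JSON.stringify, so parse before treating it as
// EventGrammarCSVFormat[]; instrumentField is already a plain string.
const parsedDef: EventGrammarCSVFormat[] | null = eventGrammarDef
  ? JSON.parse(eventGrammarDef as string)
  : null;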
From 88b78ec270557d229e901ba0b2f2e3bf80ef99b1 Mon Sep 17 00:00:00 2001
From: Yash Mittal
Date: Wed, 1 Nov 2023 02:01:14 +0530
Subject: [PATCH 4/5] fix: unique constraint name exceeding the Postgres
 identifier length limit; skip unavailable files during `ingest-data`

---
 .env.example => .env.sample                   |  0
 impl/c-qube/.env.sample                       |  6 ++++++
 .../csv-adapter/csv-adapter.service.ts        |  3 +--
 .../parser/dataset/dataset-grammar.helper.ts  | 13 ++++++++++++
 .../dimension-grammar.helpers.ts              |  7 +++++++
 .../dimension-grammar.service.ts              |  7 +++++++
 .../event-grammar/event-grammar.service.ts    |  2 --
 .../csv-adapter/parser/utils/csvreader.ts     | 13 ++++++++++++
 .../query-builder/query-builder.service.ts    | 20 +++++++++++++++----
 9 files changed, 63 insertions(+), 8 deletions(-)
 rename .env.example => .env.sample (100%)
 create mode 100644 impl/c-qube/.env.sample

diff --git a/.env.example b/.env.sample
similarity index 100%
rename from .env.example
rename to .env.sample

diff --git a/impl/c-qube/.env.sample b/impl/c-qube/.env.sample
new file mode 100644
index 00000000..a4bd3be3
--- /dev/null
+++ b/impl/c-qube/.env.sample
@@ -0,0 +1,6 @@
+DATABASE_URL="postgres://timescaledb:postgrespassword@localhost:5432/postgres?sslmode=disable"
+DB_USERNAME="timescaledb"
+DB_HOST="localhost"
+DB_NAME="postgres"
+DB_PASSWORD="postgrespassword"
+DB_PORT="5432"
\ No newline at end of file

diff --git a/impl/c-qube/src/services/csv-adapter/csv-adapter.service.ts b/impl/c-qube/src/services/csv-adapter/csv-adapter.service.ts
index e1512022..992b3c6a 100644
--- a/impl/c-qube/src/services/csv-adapter/csv-adapter.service.ts
+++ b/impl/c-qube/src/services/csv-adapter/csv-adapter.service.ts
@@ -28,7 +28,6 @@
 import {
   createEventGrammarFromCSVDefinition,
   getEGDefFromDB,
-  getEGDefFromFile,
 } from './parser/event-grammar/event-grammar.service';
 import {
   createCompoundDatasetGrammars,
@@ -487,7 +486,7 @@
       ),
     );
   }
-
+    this.logger.verbose('Ingested single DatasetGrammars');
     const compoundDatasetGrammars: DatasetGrammar[] =
       await this.datasetService.getCompoundDatasetGrammars(filter);

diff --git a/impl/c-qube/src/services/csv-adapter/parser/dataset/dataset-grammar.helper.ts b/impl/c-qube/src/services/csv-adapter/parser/dataset/dataset-grammar.helper.ts
index d387ccb3..9169937b 100644
--- a/impl/c-qube/src/services/csv-adapter/parser/dataset/dataset-grammar.helper.ts
+++ b/impl/c-qube/src/services/csv-adapter/parser/dataset/dataset-grammar.helper.ts
@@ -18,6 +18,13 @@

   const filePath = eventGrammar.file.replace('grammar', 'data');

+  fs.access(filePath, fs.constants.F_OK, (err) => {
+    if (err) {
+      console.error(`File at ${filePath} does not exist`);
+      return;
+    }
+  });
+
   const df = await readCSV(filePath);
   if (!df || !df[0]) return;

@@ -94,6 +101,12 @@
   delete properties.year;

   // checking if the file is empty or not
+  fs.access(eventFilePath, fs.constants.F_OK, (err) => {
+    if (err) {
+      console.error('File does not exist');
+      return;
+    }
+  });
   const stats = fs.statSync(eventFilePath);
   if (stats.size === 0) {
     console.log(`File at ${eventFilePath} is empty`);

diff --git a/impl/c-qube/src/services/csv-adapter/parser/dimension-grammar/dimension-grammar.helpers.ts b/impl/c-qube/src/services/csv-adapter/parser/dimension-grammar/dimension-grammar.helpers.ts
index f6ea3a7f..39cb07de 100644
--- a/impl/c-qube/src/services/csv-adapter/parser/dimension-grammar/dimension-grammar.helpers.ts
+++ b/impl/c-qube/src/services/csv-adapter/parser/dimension-grammar/dimension-grammar.helpers.ts
@@ -6,7 +6,14 @@
 export const createDimensionGrammarFromCSVDefinition = async (
   csvFilePath: string,
   readFile: (path: string, encoding: string) => Promise<string> = fs.readFile,
 ): Promise<DimensionGrammar> => {
+  fs.access(csvFilePath, fs.constants.F_OK, (err) => {
+    if (err) {
+      console.error('File does not exist');
+      return null;
+    }
+  });
   const fileContent = await readFile(csvFilePath, 'utf-8');
+
   const [row1, row2, row3] = fileContent.split('\n').map((row) => row.trim());

   if (!isValidCSVFormat(row1, row2, row3)) {

diff --git a/impl/c-qube/src/services/csv-adapter/parser/dimension-grammar/dimension-grammar.service.ts b/impl/c-qube/src/services/csv-adapter/parser/dimension-grammar/dimension-grammar.service.ts
index 9594f9a1..3b361daf 100644
--- a/impl/c-qube/src/services/csv-adapter/parser/dimension-grammar/dimension-grammar.service.ts
+++ b/impl/c-qube/src/services/csv-adapter/parser/dimension-grammar/dimension-grammar.service.ts
@@ -15,6 +15,13 @@
   async createDimensionGrammarFromCSVDefinition(
     csvFilePath: string,
   ): Promise<DimensionGrammar> {
+    try {
+      await fs.access(csvFilePath, fs.constants.F_OK);
+    } catch (err) {
+      console.error(`File at: ${csvFilePath} does not exist`);
+      return null;
+    }
+
     const fileContent = await fs.readFile(csvFilePath, 'utf-8');
     const [row1, row2, row3] = fileContent.split('\n').map((row) => row.trim());

diff --git a/impl/c-qube/src/services/csv-adapter/parser/event-grammar/event-grammar.service.ts b/impl/c-qube/src/services/csv-adapter/parser/event-grammar/event-grammar.service.ts
index edf3eeae..60a833c6 100644
--- a/impl/c-qube/src/services/csv-adapter/parser/event-grammar/event-grammar.service.ts
+++ b/impl/c-qube/src/services/csv-adapter/parser/event-grammar/event-grammar.service.ts
@@ -44,7 +44,6 @@
 export async function getEGDefFromDB(
   csvFilePath: string,
   prisma: PrismaService,
 ) {
-  console.log('csvFilePath: ', csvFilePath);
   const metrics = await prisma.eventGrammar.findMany({
     where: {
       file: csvFilePath,
     },
     select: {
       egSchema: true,
       metric: true,
     },
   });
-  console.log('metrics: ', metrics);

   return {
     eventGrammarDef: metrics[0]?.egSchema,

diff --git a/impl/c-qube/src/services/csv-adapter/parser/utils/csvreader.ts b/impl/c-qube/src/services/csv-adapter/parser/utils/csvreader.ts
index d3ae2962..b9555e45 100644
--- a/impl/c-qube/src/services/csv-adapter/parser/utils/csvreader.ts
+++ b/impl/c-qube/src/services/csv-adapter/parser/utils/csvreader.ts
@@ -4,6 +4,13 @@
 const fs = require('fs').promises;
 import * as csv from 'csv-parser';

 export async function readCSV(filePath: string): Promise<string[][]> {
+  try {
+    await fs.access(filePath, fs.constants.F_OK);
+  } catch (err) {
+    console.error(`File at: ${filePath} does not exist`);
+    return null;
+  }
+
   return new Promise((resolve, reject) => {
     const rows: string[][] = [];
     // TODO: Add checking here
@@ -23,6 +30,12 @@
 }

 export async function readCSVFile(filePath: string): Promise<string[]> {
+  try {
+    await fs.access(filePath, fs.constants.F_OK);
+  } catch (err) {
+    console.error(`File at: ${filePath} does not exist`);
+    return null;
+  }
   const fileContent = await fs.readFile(filePath, 'utf-8');

   return fileContent

diff --git a/impl/c-qube/src/services/query-builder/query-builder.service.ts b/impl/c-qube/src/services/query-builder/query-builder.service.ts
index 06665f0b..2b7323e0 100644
--- a/impl/c-qube/src/services/query-builder/query-builder.service.ts
+++ b/impl/c-qube/src/services/query-builder/query-builder.service.ts
@@ -1,7 +1,9 @@
 import { Injectable } from '@nestjs/common';
 import { JSONSchema4 } from 'json-schema';
+import { hash } from '../../utils/hash';

 const fs = require('fs');
+const crypto = require('crypto');

 type fk = {
   column: string;
@@ -49,7 +51,7 @@
     const tableName = schema.title;
     const psqlSchema = schema.psql_schema;
     const primaryKeySegment = autoPrimaryKey ? '\n id SERIAL PRIMARY KEY,' : '';
-    let createStatement = `CREATE TABLE ${psqlSchema}.${tableName} (${primaryKeySegment}\n`;
+    let createStatement = `CREATE TABLE IF NOT EXISTS ${psqlSchema}.${tableName} (${primaryKeySegment}\n`;

     const properties = schema.properties;

@@ -97,7 +99,8 @@
       );

       // adding unique constraint
-      let uniqueStatements = `,\nconstraint unique_${tableName} UNIQUE (`;
+      const hashedTableName = hash(tableName, 'secret', {});
+      let uniqueStatements = `,\nconstraint unique_${hashedTableName} UNIQUE (`;
       schema.fk.forEach((fk: fk) => {
         uniqueStatements += `${fk.column}, `;
       });
@@ -126,6 +129,7 @@
     for (const index of indexes) {
       for (const column of index.columns) {
+        if (column.length === 0) continue;
         const indexName = `${schema.title}_${column.join('_')}_idx`;
         const columns = column.join(', ');
         const statement = `CREATE INDEX ${indexName} ON ${psqlSchema}.${schema.title} (${columns});`;
@@ -206,7 +210,11 @@
     return query
       .slice(0, -1)
       .concat(
-        ` ON CONFLICT ON CONSTRAINT unique_${tableName} DO UPDATE SET sum = datasets.${tableName}.sum + EXCLUDED.sum, count = datasets.${tableName}.count + EXCLUDED.count, avg = (datasets.${tableName}.sum + EXCLUDED.sum) / (datasets.${tableName}.count + EXCLUDED.count); `,
+        ` ON CONFLICT ON CONSTRAINT unique_${hash(
+          tableName,
+          'secret',
+          {},
+        )} DO UPDATE SET sum = datasets.${tableName}.sum + EXCLUDED.sum, count = datasets.${tableName}.count + EXCLUDED.count, avg = (datasets.${tableName}.sum + EXCLUDED.sum) / (datasets.${tableName}.count + EXCLUDED.count); `,
       );
@@ -277,7 +285,11 @@
         .map((field) => `${tempTableName}.${field}`)
         .join(', ')} FROM ${tempTableName}
     ${joinStatements === '' ? ' ' : joinStatements}
     WHERE TRUE${whereStatements === '' ? ' ' : whereStatements}
-    ON CONFLICT ON CONSTRAINT unique_${tableName} DO UPDATE SET sum = ${psqlSchema}.${tableName}.sum + EXCLUDED.sum, count = ${psqlSchema}.${tableName}.count + EXCLUDED.count, avg = (${psqlSchema}.${tableName}.sum + EXCLUDED.sum) / (${psqlSchema}.${tableName}.count + EXCLUDED.count);`;
+    ON CONFLICT ON CONSTRAINT unique_${hash(
+      tableName,
+      'secret',
+      {},
+    )} DO UPDATE SET sum = ${psqlSchema}.${tableName}.sum + EXCLUDED.sum, count = ${psqlSchema}.${tableName}.count + EXCLUDED.count, avg = (${psqlSchema}.${tableName}.sum + EXCLUDED.sum) / (${psqlSchema}.${tableName}.count + EXCLUDED.count);`;

     queries.push(filteredInsert);

From 191f3b8a014d0c6055f59544548ea56acc5b202b Mon Sep 17 00:00:00 2001
From: Yash Mittal
Date: Wed, 1 Nov 2023 02:03:50 +0530
Subject: [PATCH 5/5] fix: rename `.env.sample` to `.env.example`

---
 .env.sample => .env.example | 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 rename .env.sample => .env.example (100%)

diff --git a/.env.sample b/.env.example
similarity index 100%
rename from .env.sample
rename to .env.example
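For context on the patch-4 constraint rename: PostgreSQL truncates identifiers to 63 bytes, so `unique_${tableName}` could be silently truncated, and then collide, for long generated dataset names, while a fixed-length digest cannot. A sketch of the idea using Node's built-in crypto (the repository's actual `hash` utility in `../../utils/hash` is not shown in these patches and may differ):

import { createHmac } from 'crypto';

// Derive a short, deterministic, Postgres-safe constraint name.
function constraintNameFor(tableName: string): string {
  const digest = createHmac('sha256', 'secret')
    .update(tableName)
    .digest('hex')
    .slice(0, 32); // comfortably under the 63-byte identifier limit
  return `unique_${digest}`;
}

console.log(constraintNameFor('nishtha_totalmedium_eGFra0JofUpoa0QRHgIC'));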