diff --git a/impl/c-qube/.env.sample b/impl/c-qube/.env.sample new file mode 100644 index 00000000..a4bd3be3 --- /dev/null +++ b/impl/c-qube/.env.sample @@ -0,0 +1,6 @@ +DATABASE_URL="postgres://timescaledb:postgrespassword@localhost:5432/postgres?sslmode=disable" +DB_USERNAME="timescaledb" +DB_HOST="localhost" +DB_NAME="postgres" +DB_PASSWORD="postgrespassword" +DB_PORT="5432" \ No newline at end of file diff --git a/impl/c-qube/prisma/migrations/20231030113226_add_metric_in_eg/migration.sql b/impl/c-qube/prisma/migrations/20231030113226_add_metric_in_eg/migration.sql new file mode 100644 index 00000000..22591ae7 --- /dev/null +++ b/impl/c-qube/prisma/migrations/20231030113226_add_metric_in_eg/migration.sql @@ -0,0 +1,8 @@ +/* + Warnings: + + - Added the required column `metric` to the `EventGrammar` table without a default value. This is not possible if the table is not empty. + +*/ +-- AlterTable +ALTER TABLE "spec"."EventGrammar" ADD COLUMN "metric" TEXT NOT NULL; diff --git a/impl/c-qube/prisma/migrations/20231030115722_add_eg_schema/migration.sql b/impl/c-qube/prisma/migrations/20231030115722_add_eg_schema/migration.sql new file mode 100644 index 00000000..a70e7e32 --- /dev/null +++ b/impl/c-qube/prisma/migrations/20231030115722_add_eg_schema/migration.sql @@ -0,0 +1,8 @@ +/* + Warnings: + + - Added the required column `egSchema` to the `EventGrammar` table without a default value. This is not possible if the table is not empty. + +*/ +-- AlterTable +ALTER TABLE "spec"."EventGrammar" ADD COLUMN "egSchema" JSONB NOT NULL; diff --git a/impl/c-qube/prisma/migrations/20231030115911_change_egschema_datatype/migration.sql b/impl/c-qube/prisma/migrations/20231030115911_change_egschema_datatype/migration.sql new file mode 100644 index 00000000..b0d49bd4 --- /dev/null +++ b/impl/c-qube/prisma/migrations/20231030115911_change_egschema_datatype/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "spec"."EventGrammar" ALTER COLUMN "egSchema" SET DATA TYPE TEXT; diff --git a/impl/c-qube/prisma/schema.prisma b/impl/c-qube/prisma/schema.prisma index d73dcd65..ed1af19c 100644 --- a/impl/c-qube/prisma/schema.prisma +++ b/impl/c-qube/prisma/schema.prisma @@ -72,6 +72,8 @@ model EventGrammar { file String? 
eventType EventType @default(INTERNAL) DatasetGrammar DatasetGrammar[] + metric String + egSchema String @@schema("spec") } diff --git a/impl/c-qube/src/services/csv-adapter/csv-adapter.service.ts b/impl/c-qube/src/services/csv-adapter/csv-adapter.service.ts index e206fcee..992b3c6a 100644 --- a/impl/c-qube/src/services/csv-adapter/csv-adapter.service.ts +++ b/impl/c-qube/src/services/csv-adapter/csv-adapter.service.ts @@ -27,7 +27,7 @@ import { } from './parser/dataset/dataset-grammar.helper'; import { createEventGrammarFromCSVDefinition, - getEGDefFromFile, + getEGDefFromDB, } from './parser/event-grammar/event-grammar.service'; import { createCompoundDatasetGrammars, @@ -486,7 +486,7 @@ export class CsvAdapterService { ), ); } - + this.logger.verbose('Ingested single DatasetGrammars'); const compoundDatasetGrammars: DatasetGrammar[] = await this.datasetService.getCompoundDatasetGrammars(filter); @@ -494,77 +494,77 @@ export class CsvAdapterService { for (let m = 0; m < compoundDatasetGrammars.length; m++) { promises.push( limit(() => - getEGDefFromFile(compoundDatasetGrammars[m].eventGrammarFile).then( - async (s) => { - const { - instrumentField, - }: { - eventGrammarDef: EventGrammarCSVFormat[]; - instrumentField: string; - } = s; - const compoundEventGrammar: EventGrammar = { - name: '', - description: '', - dimension: [], - instrument_field: instrumentField, - is_active: true, - schema: {}, - instrument: { - type: InstrumentType.COUNTER, - name: 'counter', - }, - }; - const events: Event[] = - await createCompoundDatasetDataToBeInserted( - compoundDatasetGrammars[m].eventGrammarFile.replace( - 'grammar', - 'data', - ), - compoundEventGrammar, - compoundDatasetGrammars[m], - ); - // Create Pipes - const pipe: Pipe = { - event: compoundEventGrammar, - transformer: defaultTransformers[0], - dataset: compoundDatasetGrammars[m], - }; - const transformContext: TransformerContext = { - dataset: compoundDatasetGrammars[m], - events: events, - isChainable: false, - pipeContext: {}, - }; - if (events && events.length > 0) { - const datasetUpdateRequest: DatasetUpdateRequest[] = - pipe.transformer.transformSync( - callback, - transformContext, - events, - ) as DatasetUpdateRequest[]; - - // console.log(datasetUpdateRequest.length, datasetUpdateRequest[0]); - - await this.datasetService - .processDatasetUpdateRequest(datasetUpdateRequest) - .then(() => { - this.logger.verbose( - `Ingested Compound Dataset without any error ${events.length} events for ${compoundDatasetGrammars[m].name}`, - ); - }) - .catch((e) => { - this.logger.verbose( - `Ingested Compound Dataset with error ${events.length} events for ${compoundDatasetGrammars[m].name}`, - ); - }); - } else { - console.error( - 'No relevant events for this dataset', - compoundDatasetGrammars[m].name, - ); - } - }, - ), + getEGDefFromDB( + compoundDatasetGrammars[m].eventGrammarFile, + this.prisma, + ).then(async (s) => { + const { + instrumentField, + }: { + eventGrammarDef: EventGrammarCSVFormat[]; + instrumentField: string; + } = s as any; + const compoundEventGrammar: EventGrammar = { + name: '', + description: '', + dimension: [], + instrument_field: instrumentField, + is_active: true, + schema: {}, + instrument: { + type: InstrumentType.COUNTER, + name: 'counter', + }, + }; + const events: Event[] = await createCompoundDatasetDataToBeInserted( + compoundDatasetGrammars[m].eventGrammarFile.replace( + 'grammar', + 'data', + ), + compoundEventGrammar, + compoundDatasetGrammars[m], + ); + // Create Pipes + const pipe: Pipe = { + event: 
compoundEventGrammar,
+            transformer: defaultTransformers[0],
+            dataset: compoundDatasetGrammars[m],
+          };
+          const transformContext: TransformerContext = {
+            dataset: compoundDatasetGrammars[m],
+            events: events,
+            isChainable: false,
+            pipeContext: {},
+          };
+          if (events && events.length > 0) {
+            const datasetUpdateRequest: DatasetUpdateRequest[] =
+              pipe.transformer.transformSync(
+                callback,
+                transformContext,
+                events,
+              ) as DatasetUpdateRequest[];
+
+            // console.log(datasetUpdateRequest.length, datasetUpdateRequest[0]);
+
+            await this.datasetService
+              .processDatasetUpdateRequest(datasetUpdateRequest)
+              .then(() => {
+                this.logger.verbose(
+                  `Ingested Compound Dataset without any error ${events.length} events for ${compoundDatasetGrammars[m].name}`,
+                );
+              })
+              .catch((e) => {
+                this.logger.verbose(
+                  `Ingested Compound Dataset with error ${events.length} events for ${compoundDatasetGrammars[m].name}`,
+                );
+              });
+          } else {
+            console.error(
+              'No relevant events for this dataset',
+              compoundDatasetGrammars[m].name,
+            );
+          }
+        }),
       ),
     );
   }
diff --git a/impl/c-qube/src/services/csv-adapter/parser/dataset/dataset-grammar.helper.ts b/impl/c-qube/src/services/csv-adapter/parser/dataset/dataset-grammar.helper.ts
index d387ccb3..9169937b 100644
--- a/impl/c-qube/src/services/csv-adapter/parser/dataset/dataset-grammar.helper.ts
+++ b/impl/c-qube/src/services/csv-adapter/parser/dataset/dataset-grammar.helper.ts
@@ -18,6 +18,13 @@ export const createDatasetDataToBeInserted = async (
 
   const filePath = eventGrammar.file.replace('grammar', 'data');
 
+  // fs.access with a callback cannot short-circuit this function; use the
+  // synchronous check instead (this file already relies on fs.statSync).
+  if (!fs.existsSync(filePath)) {
+    console.error(`File at ${filePath} does not exist`);
+    return;
+  }
+
   const df = await readCSV(filePath);
 
   if (!df || !df[0]) return;
@@ -94,6 +101,10 @@
   delete properties.year;
 
   // checking if the file is empty or not
+  if (!fs.existsSync(eventFilePath)) {
+    console.error(`File at ${eventFilePath} does not exist`);
+    return;
+  }
   const stats = fs.statSync(eventFilePath);
   if (stats.size === 0) {
     console.log(`File at ${eventFilePath} is empty`);
diff --git a/impl/c-qube/src/services/csv-adapter/parser/dimension-grammar/dimension-grammar.helpers.ts b/impl/c-qube/src/services/csv-adapter/parser/dimension-grammar/dimension-grammar.helpers.ts
index f6ea3a7f..39cb07de 100644
--- a/impl/c-qube/src/services/csv-adapter/parser/dimension-grammar/dimension-grammar.helpers.ts
+++ b/impl/c-qube/src/services/csv-adapter/parser/dimension-grammar/dimension-grammar.helpers.ts
@@ -6,7 +6,15 @@ export const createDimensionGrammarFromCSVDefinition = async (
   csvFilePath: string,
   readFile: (path: string, encoding: string) => Promise<string> = fs.readFile,
 ): Promise<DimensionGrammar | null> => {
+  // fs is promise-based here (readFile returns a Promise), so await the check
+  try {
+    await fs.access(csvFilePath, fs.constants.F_OK);
+  } catch (err) {
+    console.error(`File at ${csvFilePath} does not exist`);
+    return null;
+  }
   const fileContent = await readFile(csvFilePath, 'utf-8');
+
   const [row1, row2, row3] = fileContent.split('\n').map((row) => row.trim());
 
   if (!isValidCSVFormat(row1, row2, row3)) {
diff --git a/impl/c-qube/src/services/csv-adapter/parser/dimension-grammar/dimension-grammar.service.ts b/impl/c-qube/src/services/csv-adapter/parser/dimension-grammar/dimension-grammar.service.ts
index 9594f9a1..3b361daf 100644
--- a/impl/c-qube/src/services/csv-adapter/parser/dimension-grammar/dimension-grammar.service.ts
+++ b/impl/c-qube/src/services/csv-adapter/parser/dimension-grammar/dimension-grammar.service.ts
@@ -15,6 +15,13 @@ export class DimensionGrammarService {
   async createDimensionGrammarFromCSVDefinition(
     csvFilePath: string,
   ): Promise<DimensionGrammar | null> {
+    try {
+      await fs.access(csvFilePath, fs.constants.F_OK);
+    } catch (err) {
+      console.error(`File at: ${csvFilePath} does not exist`);
+      return null;
+    }
+
     const fileContent = await fs.readFile(csvFilePath, 'utf-8');
     const [row1, row2, row3] = fileContent.split('\n').map((row) => row.trim());
diff --git a/impl/c-qube/src/services/csv-adapter/parser/event-grammar/event-grammar.service.ts b/impl/c-qube/src/services/csv-adapter/parser/event-grammar/event-grammar.service.ts
index e3c1493e..60a833c6 100644
--- a/impl/c-qube/src/services/csv-adapter/parser/event-grammar/event-grammar.service.ts
+++ b/impl/c-qube/src/services/csv-adapter/parser/event-grammar/event-grammar.service.ts
@@ -15,6 +15,7 @@ import {
   FieldType,
 } from '../../types/parser';
 import { readCSVFile } from '../utils/csvreader';
+import { PrismaService } from 'src/prisma.service';
 
 export async function getEGDefFromFile(csvFilePath: string) {
   const [
@@ -39,6 +40,31 @@ export async function getEGDefFromFile(csvFilePath: string) {
   return { eventGrammarDef, instrumentField };
 }
 
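+// Rebuilds the event-grammar definition from the EventGrammar table instead
+// of re-parsing the grammar CSV. EventService persists egSchema via
+// JSON.stringify, so eventGrammarDef comes back as a JSON string rather than
+// a parsed EventGrammarCSVFormat[]; callers that need the structured form
+// have to JSON.parse it.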
+export async function getEGDefFromDB(
+  csvFilePath: string,
+  prisma: PrismaService,
+) {
+  const metrics = await prisma.eventGrammar.findMany({
+    where: {
+      file: csvFilePath,
+    },
+    select: {
+      egSchema: true,
+      metric: true,
+    },
+  });
+
+  return {
+    eventGrammarDef: metrics[0]?.egSchema,
+    instrumentField: metrics[0]?.metric,
+  };
+}
+
 export const createEventGrammarFromCSVDefinition = async (
   csvFilePath: string,
   dimensionFileBasePath: string,
diff --git a/impl/c-qube/src/services/csv-adapter/parser/utils/csvreader.ts b/impl/c-qube/src/services/csv-adapter/parser/utils/csvreader.ts
index d3ae2962..b9555e45 100644
--- a/impl/c-qube/src/services/csv-adapter/parser/utils/csvreader.ts
+++ b/impl/c-qube/src/services/csv-adapter/parser/utils/csvreader.ts
@@ -4,6 +4,13 @@ const fs = require('fs').promises;
 import * as csv from 'csv-parser';
 
 export async function readCSV(filePath: string): Promise<string[][] | null> {
+  try {
+    await fs.access(filePath, fs.constants.F_OK);
+  } catch (err) {
+    console.error(`File at: ${filePath} does not exist`);
+    return null;
+  }
+
   return new Promise((resolve, reject) => {
     const rows: string[][] = [];
     // TODO: Add checking here
@@ -23,6 +30,12 @@
 }
 
 export async function readCSVFile(filePath: string): Promise<string[] | null> {
+  try {
+    await fs.access(filePath, fs.constants.F_OK);
+  } catch (err) {
+    console.error(`File at: ${filePath} does not exist`);
+    return null;
+  }
   const fileContent = await fs.readFile(filePath, 'utf-8');
 
   return fileContent
diff --git a/impl/c-qube/src/services/event/event.service.ts b/impl/c-qube/src/services/event/event.service.ts
index 84c1adfe..f2064ce0 100644
--- a/impl/c-qube/src/services/event/event.service.ts
+++ b/impl/c-qube/src/services/event/event.service.ts
@@ -15,6 +15,10 @@ import { QueryBuilderService } from '../query-builder/query-builder.service';
 import { DimensionService } from '../dimension/dimension.service';
 import { DimensionMapping } from 'src/types/dataset';
 import { DimensionGrammar } from 'src/types/dimension';
+import {
+  getEGDefFromDB,
+  getEGDefFromFile,
+} from '../csv-adapter/parser/event-grammar/event-grammar.service';
 
 @Injectable()
 export class EventService {
@@ -65,6 +69,10 @@
       await this.dimensionService.getDimensionGrammaModelByName(
         eventGrammar.dimension[0].dimension.name.name,
       );
+    const { eventGrammarDef, instrumentField } = await getEGDefFromFile(
+      eventGrammar.file,
+    );
+
     return this.prisma.eventGrammar
       .create({
         data: {
@@ -82,6 +90,11 @@
              },
            },
          ]),
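+          // persist the metric column and the stringified grammar definition
+          // so ingestion can later rebuild the event grammar straight from
+          // the DB via getEGDefFromDB instead of re-reading the grammar CSV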
+          metric: eventGrammar.instrument_field,
+          egSchema: JSON.stringify(eventGrammarDef),
           instrument: {
             connect: {
               name: 'COUNTER', //TODO: Change this to eventGrammar.instrument.name
diff --git a/impl/c-qube/src/services/query-builder/query-builder.service.spec.ts b/impl/c-qube/src/services/query-builder/query-builder.service.spec.ts
index edf23ee4..48fea205 100644
--- a/impl/c-qube/src/services/query-builder/query-builder.service.spec.ts
+++ b/impl/c-qube/src/services/query-builder/query-builder.service.spec.ts
@@ -2,6 +2,10 @@ import { Test, TestingModule } from '@nestjs/testing';
 import { JSONSchema4 } from 'json-schema';
 import { QueryBuilderService } from './query-builder.service';
 
+// JSON imports
+import * as createStatementInput from './test-fixtures/createStatement.json';
+import * as insertStatementInput from './test-fixtures/insertStatement.json';
+
 describe('QueryBuilderService', () => {
   let service: QueryBuilderService;
 
@@ -241,4 +245,22 @@
       );`),
     );
   });
+
+  it('checks for bulk upserting statement', () => {
+    const res = service.generateBulkInsertStatement(
+      insertStatementInput.args.schema as any,
+      insertStatementInput.args.data as any,
+    );
+
+    expect(res).toBe(insertStatementInput.output);
+  });
+
+  it('checks for create statement with proper unique constraint', async () => {
+    const res = service.generateCreateStatement(
+      createStatementInput.args.schema as any,
+      createStatementInput.args.autoPrimaryKey,
+    );
+
+    expect(res).toBe(createStatementInput.output);
+  });
 });
diff --git a/impl/c-qube/src/services/query-builder/query-builder.service.ts b/impl/c-qube/src/services/query-builder/query-builder.service.ts
index df36f1ba..2b7323e0 100644
--- a/impl/c-qube/src/services/query-builder/query-builder.service.ts
+++ b/impl/c-qube/src/services/query-builder/query-builder.service.ts
@@ -1,7 +1,9 @@
 import { Injectable } from '@nestjs/common';
 import { JSONSchema4 } from 'json-schema';
+import { hash } from '../../utils/hash';
 
 const fs = require('fs');
+const crypto = require('crypto');
 
 type fk = {
   column: string;
@@ -45,13 +47,19 @@
   }
 
   generateCreateStatement(schema: JSONSchema4, autoPrimaryKey = false): string {
+    // console.log('schema: ', schema);
     const tableName = schema.title;
     const psqlSchema = schema.psql_schema;
     const primaryKeySegment = autoPrimaryKey ?
'\n id SERIAL PRIMARY KEY,' : ''; - let createStatement = `CREATE TABLE ${psqlSchema}.${tableName} (${primaryKeySegment}\n`; + let createStatement = `CREATE TABLE IF NOT EXISTS ${psqlSchema}.${tableName} (${primaryKeySegment}\n`; const properties = schema.properties; + + const propsForUniqueConstraint = []; for (const property in properties) { + if (['date', 'week', 'year', 'month'].includes(property.trim())) { + propsForUniqueConstraint.push(property); + } const column: JSONSchema4 = properties[property]; createStatement += ` ${property} `; if (column.type === 'string' && column.format === 'date-time') { @@ -84,11 +92,32 @@ export class QueryBuilderService { createStatement += '\n);'; if (schema.fk !== undefined) { + // console.log('fk constraints called'); createStatement = this.addFKConstraintDuringCreation( schema, createStatement, ); + + // adding unique constraint + const hashedTableName = hash(tableName, 'secret', {}); + let uniqueStatements = `,\nconstraint unique_${hashedTableName} UNIQUE (`; + schema.fk.forEach((fk: fk) => { + uniqueStatements += `${fk.column}, `; + }); + propsForUniqueConstraint.forEach((prop) => { + uniqueStatements += `${prop}, `; + }); + + uniqueStatements = uniqueStatements.slice(0, -2) + ')'; + createStatement = createStatement + .slice(0, -2) + .concat(uniqueStatements) + .concat(');'); + // console.log('sql:', createStatement); + // console.log('schema: ', schema); } + + // console.log('create statement: ', createStatement); return this.cleanStatement(createStatement); } @@ -100,7 +129,6 @@ export class QueryBuilderService { for (const index of indexes) { for (const column of index.columns) { - // if no indexes are specified in grammar, skip if (column.length === 0) continue; const indexName = `${schema.title}_${column.join('_')}_idx`; const columns = column.join(', '); @@ -137,6 +165,7 @@ export class QueryBuilderService { const query = `INSERT INTO ${psqlSchema}.${tableName} (${fields.join( ', ', )}) VALUES (${values.join(', ')});`; + // ON CONFLICT ON CONSTRAINT unique_${tableName} DO UPDATE SET sum=sum+EXCLUDED.sum, count=count+EXCLUDED.count, avg=(sum+EXCLUDED.sum)/(count+EXCLUDED.count);`; return this.cleanStatement(query); } @@ -173,9 +202,22 @@ export class QueryBuilderService { const query = `INSERT INTO ${psqlSchema}.${tableName} (${fields.join( ', ', )}) VALUES ${values.join(', ')};`; + // console.log('insert statement: ', query); return this.cleanStatement(query); } + addOnConflictStatement(tableName: string, query: string): string { + return query + .slice(0, -1) + .concat( + ` ON CONFLICT ON CONSTRAINT unique_${hash( + tableName, + 'secret', + {}, + )} DO UPDATE SET sum = datasets.${tableName}.sum + EXCLUDED.sum, count = datasets.${tableName}.count + EXCLUDED.count, avg = (datasets.${tableName}.sum + EXCLUDED.sum) / (datasets.${tableName}.count + EXCLUDED.count); `, + ); + } + generateBulkInsertStatement(schema: JSONSchema4, data: any[]): string { const tableName = schema.title; const psqlSchema = schema.psql_schema; @@ -192,10 +234,10 @@ export class QueryBuilderService { fields.push(property); } - const tempTableName = `temp_${tableName}`; - const createTempTable = `CREATE TABLE IF NOT EXISTS ${tempTableName} (LIKE ${psqlSchema}.${tableName});`; + const tempTableName = `temp_${tableName} `; + const createTempTable = `CREATE TABLE IF NOT EXISTS ${tempTableName} (LIKE ${psqlSchema}.${tableName}); `; queries.push(createTempTable); - const autoGen = `ALTER TABLE ${tempTableName} ADD COLUMN id SERIAL PRIMARY KEY;`; + const autoGen = `ALTER 
TABLE ${tempTableName} ADD COLUMN id SERIAL PRIMARY KEY; `; queries.push(autoGen); const rows = []; let id = 1; @@ -220,7 +262,7 @@ export class QueryBuilderService { const insertTempTable = `INSERT INTO ${tempTableName} (${tempTableFields.join( ', ', )}) VALUES `; - const insertTempTableRows = `${insertTempTable}${rows.join(', ')};`; + const insertTempTableRows = `${insertTempTable}${rows.join(', ')}; `; queries.push(this.cleanStatement(insertTempTable)); let joinStatements = ''; let whereStatements = ''; @@ -230,7 +272,7 @@ export class QueryBuilderService { const referenceTable = fk.reference.table; const referenceColumn = fk.reference.column; const childColumn = fk.column; - joinStatements += ` LEFT JOIN dimensions.${referenceTable} ON ${tempTableName}.${childColumn} = dimensions.${referenceTable}.${childColumn}`; + joinStatements += ` LEFT JOIN dimensions.${referenceTable} ON ${tempTableName}.${childColumn} = dimensions.${referenceTable}.${childColumn} `; whereStatements += ` AND dimensions.${referenceTable}.${childColumn} IS NOT NULL`; }); } @@ -239,17 +281,22 @@ export class QueryBuilderService { ', ', )}) SELECT ${fields - .map((field) => `${tempTableName}.${field}`) - .join(', ')} FROM ${tempTableName} + .map((field) => `${tempTableName}.${field}`) + .join(', ')} FROM ${tempTableName} ${joinStatements === '' ? ' ' : joinStatements} - WHERE TRUE${whereStatements === '' ? ' ' : whereStatements};`; + WHERE TRUE${whereStatements === '' ? ' ' : whereStatements} + ON CONFLICT ON CONSTRAINT unique_${hash( + tableName, + 'secret', + {}, + )} DO UPDATE SET sum = ${psqlSchema}.${tableName}.sum + EXCLUDED.sum, count = ${psqlSchema}.${tableName}.count + EXCLUDED.count, avg = (${psqlSchema}.${tableName}.sum + EXCLUDED.sum) / (${psqlSchema}.${tableName}.count + EXCLUDED.count);`; queries.push(filteredInsert); - const dropTempTable = `DROP TABLE ${tempTableName};`; + const dropTempTable = `DROP TABLE ${tempTableName}; `; queries.push(dropTempTable); - const query = `${createTempTable}\n${insertTempTableRows}\n${filteredInsert}\n${dropTempTable}`; - // const query = `${createTempTable}\n${insertTempTableRows}\n${filteredInsert}`; + const query = `${createTempTable} \n${insertTempTableRows} \n${filteredInsert} \n${dropTempTable} `; + // const query = `${ createTempTable } \n${ insertTempTableRows } \n${ filteredInsert } `; // if (query.toLowerCase().includes('null')) { // console.log('NULL Query: ', query); // } @@ -258,7 +305,16 @@ export class QueryBuilderService { return this.cleanStatement(query); } - generateUpdateStatement(schema: JSONSchema4, data: any): string[] { - throw new Error('Method not implemented.'); + generateUpdateStatement( + schema: JSONSchema4, + data: any, + where: string, + ): string { + // throw new Error('Method not implemented.'); + return `UPDATE ${schema.schema.psql_schema}.${schema.tableName} +SET sum = sum + ${data.sum}, + count = count + ${data.count - 2 * data.negcount}, + avg = (sum + ${data.sum}) /(count+${data.count - 2 * data.negcount}) +WHERE ${where} `; } } diff --git a/impl/c-qube/src/services/query-builder/test-fixtures/createStatement.json b/impl/c-qube/src/services/query-builder/test-fixtures/createStatement.json new file mode 100644 index 00000000..1d8186f2 --- /dev/null +++ b/impl/c-qube/src/services/query-builder/test-fixtures/createStatement.json @@ -0,0 +1 @@ 
+{"args":{"schema":{"title":"nishtha_totalmedium_eGFra0JofUpoa0QRHgIC","psql_schema":"datasets","properties":{"state_id":{"type":"string"},"program_name":{"type":"string"},"count":{"type":"number","format":"float"},"sum":{"type":"number","format":"float"},"avg":{"type":"number","format":"float"}},"fk":[{"column":"state_id","reference":{"table":"dimensions.state","column":"state_id"}},{"column":"program_name","reference":{"table":"dimensions.programnishtha","column":"program_name"}}]},"autoPrimaryKey":true},"output":"CREATE TABLE datasets.nishtha_totalmedium_eGFra0JofUpoa0QRHgIC (id SERIAL PRIMARY KEY, state_id VARCHAR, program_name VARCHAR, count FLOAT8, sum FLOAT8, avg FLOAT8,constraint fk_state_id FOREIGN KEY (state_id) REFERENCES dimensions.state(state_id),constraint fk_program_name FOREIGN KEY (program_name) REFERENCES dimensions.programnishtha(program_name),constraint unique_nishtha_totalmedium_eGFra0JofUpoa0QRHgIC UNIQUE (state_id, program_name));"} \ No newline at end of file diff --git a/impl/c-qube/src/services/query-builder/test-fixtures/insertStatement.json b/impl/c-qube/src/services/query-builder/test-fixtures/insertStatement.json new file mode 100644 index 00000000..cc5cc00f --- /dev/null +++ b/impl/c-qube/src/services/query-builder/test-fixtures/insertStatement.json @@ -0,0 +1,102 @@ +{ + "args": { + "schema": { + "fk": [ + { + "column": "gender", + "reference": { "table": "gender", "column": "gender_id" } + }, + { + "column": "district_id", + "reference": { "table": "district", "column": "district_id" } + } + ], + "title": "sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf", + "properties": { + "gender": { "type": "string" }, + "district_id": { "type": "string" }, + "count": { "type": "number", "format": "float" }, + "sum": { "type": "number", "format": "float" }, + "avg": { "type": "number", "format": "float" }, + "week": { "type": "integer" }, + "year": { "type": "integer" } + }, + "psql_schema": "datasets" + }, + "data": [ + { + "count": 120, + "sum": 4200, + "avg": 35, + "week": 9, + "year": 2023, + "gender": "male", + "district_id": "101" + }, + { + "count": 120, + "sum": 4176, + "avg": 34.8, + "week": 8, + "year": 2023, + "gender": "male", + "district_id": "101" + }, + { + "count": 120, + "sum": 4083, + "avg": 34.025, + "week": 6, + "year": 2023, + "gender": "male", + "district_id": "101" + }, + { + "count": 239, + "sum": 8269, + "avg": 34.59832635983263, + "week": 7, + "year": 2023, + "gender": "male", + "district_id": "101" + }, + { + "count": 120, + "sum": 4184, + "avg": 34.86666666666667, + "week": 9, + "year": 2023, + "gender": "female", + "district_id": "101" + }, + { + "count": 120, + "sum": 4111, + "avg": 34.25833333333333, + "week": 8, + "year": 2023, + "gender": "female", + "district_id": "101" + }, + { + "count": 120, + "sum": 4141, + "avg": 34.50833333333333, + "week": 6, + "year": 2023, + "gender": "female", + "district_id": "101" + }, + { + "count": 240, + "sum": 8278, + "avg": 34.49166666666667, + "week": 7, + "year": 2023, + "gender": "female", + "district_id": "101" + } + ] + }, + "output": "CREATE TABLE IF NOT EXISTS temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf (LIKE datasets.sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf); INSERT INTO temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf (gender, district_id, count, sum, avg, week, year, id) VALUES ('male', '101', '120', '4200', '35', '9', '2023', 1), ('male', '101', '120', '4176', '34.8', '8', '2023', 2), ('male', '101', '120', '4083', '34.025', '6', '2023', 3), ('male', '101', '239', '8269', 
'34.59832635983263', '7', '2023', 4), ('female', '101', '120', '4184', '34.86666666666667', '9', '2023', 5), ('female', '101', '120', '4111', '34.25833333333333', '8', '2023', 6), ('female', '101', '120', '4141', '34.50833333333333', '6', '2023', 7), ('female', '101', '240', '8278', '34.49166666666667', '7', '2023', 8); INSERT INTO datasets.sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf (gender, district_id, count, sum, avg, week, year) SELECT temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf .gender, temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf .district_id, temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf .count, temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf .sum, temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf .avg, temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf .week, temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf .year FROM temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf LEFT JOIN dimensions.gender ON temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf .gender = dimensions.gender.gender LEFT JOIN dimensions.district ON temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf .district_id = dimensions.district.district_id WHERE TRUE AND dimensions.gender.gender IS NOT NULL AND dimensions.district.district_id IS NOT NULL ON CONFLICT ON CONSTRAINT unique_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf DO UPDATE SET sum = datasets.sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf.sum + EXCLUDED.sum, count = datasets.sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf.count + EXCLUDED.count, avg = (datasets.sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf.sum + EXCLUDED.sum) / (datasets.sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf.count + EXCLUDED.count); DROP TABLE temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf ;" +} diff --git a/impl/c-qube/test/fixtures/ingestionConfigs/config.multiple.json b/impl/c-qube/test/fixtures/ingestionConfigs/config.multiple.json new file mode 100644 index 00000000..2848dbb2 --- /dev/null +++ b/impl/c-qube/test/fixtures/ingestionConfigs/config.multiple.json @@ -0,0 +1,34 @@ +{ + "globals": { + "onlyCreateWhitelisted": true + }, + "dimensions": { + "namespace": "dimensions", + "fileNameFormat": "${dimensionName}.${index}.dimensions.data.csv", + "input": { + "files": "./ingest/dimensions" + } + }, + "programs": [ + { + "name": "DIKSHA", + "namespace": "diksha", + "description": "DIKSHA", + "shouldIngestToDB": true, + "input": { + "files": "./ingest/programs/diksha" + }, + "./output": { + "location": "./output/programs/diksha" + }, + "dimensions": { + "whitelisted": [ + "state,grade,subject,medium,board", + "textbookdiksha,grade,subject,medium", + "textbookdiksha,grade,subject,medium" + ], + "blacklisted": [] + } + } + ] +} diff --git a/impl/c-qube/test/fixtures/outputDatasets/multiple_first.json b/impl/c-qube/test/fixtures/outputDatasets/multiple_first.json new file mode 100644 index 00000000..967256f1 --- /dev/null +++ b/impl/c-qube/test/fixtures/outputDatasets/multiple_first.json @@ -0,0 +1,51 @@ +[ + { + "grade_diksha": "Class 2", + "avg": 2, + "count": 1, + "sum": 2, + "id": 1 + }, + { + "grade_diksha": "Class 1", + "avg": 2, + "count": 2, + "sum": 4, + "id": 2 + }, + { + "grade_diksha": "Class 6", + "avg": 2.25, + "count": 4, + "sum": 9, + "id": 3 + }, + { + "grade_diksha": "Class 7", + "avg": 1.5, + "count": 2, + "sum": 3, + "id": 4 + }, + { + "grade_diksha": "Class 8", + "avg": 3, + "count": 1, + "sum": 3, + "id": 5 + }, + { + "grade_diksha": "Class 3", + "avg": 3.3333333333333335, + "count": 3, + "sum": 10, + "id": 6 + }, + { + "grade_diksha": "Class 4", + "avg": 1, + "count": 
1, + "sum": 1, + "id": 7 + } +] \ No newline at end of file diff --git a/impl/c-qube/test/fixtures/outputDatasets/multiple_second.json b/impl/c-qube/test/fixtures/outputDatasets/multiple_second.json new file mode 100644 index 00000000..5e86e39d --- /dev/null +++ b/impl/c-qube/test/fixtures/outputDatasets/multiple_second.json @@ -0,0 +1,51 @@ +[ + { + "grade_diksha": "Class 2", + "avg": 2, + "count": 2, + "sum": 4, + "id": 1 + }, + { + "grade_diksha": "Class 1", + "avg": 2, + "count": 4, + "sum": 8, + "id": 2 + }, + { + "grade_diksha": "Class 4", + "avg": 1, + "count": 2, + "sum": 2, + "id": 7 + }, + { + "grade_diksha": "Class 8", + "avg": 3, + "count": 2, + "sum": 6, + "id": 5 + }, + { + "grade_diksha": "Class 7", + "avg": 1.5, + "count": 4, + "sum": 6, + "id": 4 + }, + { + "grade_diksha": "Class 3", + "avg": 3.3333333333333335, + "count": 6, + "sum": 20, + "id": 6 + }, + { + "grade_diksha": "Class 6", + "avg": 2.25, + "count": 8, + "sum": 18, + "id": 3 + } +] \ No newline at end of file diff --git a/impl/c-qube/test/modules/csv-adapter/csv-adapter.e2e-spec.ts b/impl/c-qube/test/modules/csv-adapter/csv-adapter.e2e-spec.ts index e535335f..868bad02 100644 --- a/impl/c-qube/test/modules/csv-adapter/csv-adapter.e2e-spec.ts +++ b/impl/c-qube/test/modules/csv-adapter/csv-adapter.e2e-spec.ts @@ -12,6 +12,10 @@ import { DimensionGrammarService } from './../../../src/services/csv-adapter/par import { Pool } from 'pg'; import { ConfigModule, ConfigService } from '@nestjs/config'; +// importing JSONs +import * as multiple_first from '../../fixtures/outputDatasets/multiple_first.json'; +import * as multiple_second from '../../fixtures/outputDatasets/multiple_second.json'; + describe('AppController (e2e)', () => { let app: INestApplication; let csvAdapterService: CsvAdapterService; @@ -120,4 +124,42 @@ describe('AppController (e2e)', () => { expect(data).toBeDefined(); expect(data).toEqual([]); }); + + it('should test for multiple ingestion', async () => { + await csvAdapterService.nuke(); + await csvAdapterService.ingest( + './test/fixtures/ingestionConfigs', + 'config.multiple.json', + ); + await csvAdapterService.ingestData({}); + + // check that the values are correct + + let data = await csvAdapterService.prisma.$queryRawUnsafe( + 'SELECT grade_diksha, sum, count, avg FROM datasets.diksha_avg_play_time_in_mins_on_app_and_portal_grade', + ); + // remove id col + + expect(data).toEqual( + expect.arrayContaining( + multiple_first.map((item) => { + delete item.id; + return item; + }), + ), + ); + + await csvAdapterService.ingestData({}); + data = await csvAdapterService.prisma.$queryRawUnsafe( + 'SELECT grade_diksha, sum, count, avg FROM datasets.diksha_avg_play_time_in_mins_on_app_and_portal_grade', + ); + expect(data).toEqual( + expect.arrayContaining( + multiple_second.map((item) => { + delete item.id; + return item; + }), + ), + ); + }); });