Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: multiple ingestion #178

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ import { Test, TestingModule } from '@nestjs/testing';
import { JSONSchema4 } from 'json-schema';
import { QueryBuilderService } from './query-builder.service';

// JSON imports
import * as createStatementInput from './test-fixtures/createStatement.json';
import * as insertStatementInput from './test-fixtures/insertStatement.json';

describe('QueryBuilderService', () => {
let service: QueryBuilderService;

Expand Down Expand Up @@ -241,4 +245,22 @@ describe('QueryBuilderService', () => {
);`),
);
});

// The bulk-insert SQL generated from the fixture's schema and rows must match
// the fixture's recorded output exactly.
it('checks for bulk upserting statement', () => {
  const { schema, data } = insertStatementInput.args;
  const generated = service.generateBulkInsertStatement(
    schema as any,
    data as any,
  );

  expect(generated).toBe(insertStatementInput.output);
});

// The CREATE TABLE SQL generated from the fixture's schema and autoPrimaryKey
// flag must match the fixture's recorded output exactly.
it('checks for create statement with proper unique constraint', async () => {
  const { schema, autoPrimaryKey } = createStatementInput.args;
  const generated = service.generateCreateStatement(
    schema as any,
    autoPrimaryKey,
  );

  expect(generated).toBe(createStatementInput.output);
});
});
74 changes: 59 additions & 15 deletions impl/c-qube/src/services/query-builder/query-builder.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,19 @@ export class QueryBuilderService {
}

generateCreateStatement(schema: JSONSchema4, autoPrimaryKey = false): string {
// console.log('schema: ', schema);
const tableName = schema.title;
const psqlSchema = schema.psql_schema;
const primaryKeySegment = autoPrimaryKey ? '\n id SERIAL PRIMARY KEY,' : '';
let createStatement = `CREATE TABLE ${psqlSchema}.${tableName} (${primaryKeySegment}\n`;

const properties = schema.properties;

const propsForUniqueConstraint = [];
for (const property in properties) {
if (['date', 'week', 'year', 'month'].includes(property.trim())) {
propsForUniqueConstraint.push(property);
}
const column: JSONSchema4 = properties[property];
createStatement += ` ${property} `;
if (column.type === 'string' && column.format === 'date-time') {
Expand Down Expand Up @@ -84,11 +90,31 @@ export class QueryBuilderService {
createStatement += '\n);';

if (schema.fk !== undefined) {
// console.log('fk constraints called');
createStatement = this.addFKConstraintDuringCreation(
schema,
createStatement,
);

// adding unique constraint
let uniqueStatements = `,\nconstraint unique_${tableName} UNIQUE (`;
schema.fk.forEach((fk: fk) => {
uniqueStatements += `${fk.column}, `;
});
propsForUniqueConstraint.forEach((prop) => {
uniqueStatements += `${prop}, `;
});

uniqueStatements = uniqueStatements.slice(0, -2) + ')';
createStatement = createStatement
.slice(0, -2)
.concat(uniqueStatements)
.concat(');');
console.log('sql:', createStatement);
// console.log('schema: ', schema);
}

// console.log('create statement: ', createStatement);
return this.cleanStatement(createStatement);
}

Expand All @@ -100,8 +126,6 @@ export class QueryBuilderService {

for (const index of indexes) {
for (const column of index.columns) {
// if no indexes are specified in grammar, skip
if (column.length === 0) continue;
const indexName = `${schema.title}_${column.join('_')}_idx`;
const columns = column.join(', ');
const statement = `CREATE INDEX ${indexName} ON ${psqlSchema}.${schema.title} (${columns});`;
Expand Down Expand Up @@ -137,6 +161,7 @@ export class QueryBuilderService {
const query = `INSERT INTO ${psqlSchema}.${tableName} (${fields.join(
', ',
)}) VALUES (${values.join(', ')});`;
// ON CONFLICT ON CONSTRAINT unique_${tableName} DO UPDATE SET sum=sum+EXCLUDED.sum, count=count+EXCLUDED.count, avg=(sum+EXCLUDED.sum)/(count+EXCLUDED.count);`;
return this.cleanStatement(query);
}

Expand Down Expand Up @@ -173,9 +198,18 @@ export class QueryBuilderService {
const query = `INSERT INTO ${psqlSchema}.${tableName} (${fields.join(
', ',
)}) VALUES ${values.join(', ')};`;
// console.log('insert statement: ', query);
return this.cleanStatement(query);
}

/**
 * Appends an `ON CONFLICT ... DO UPDATE` clause to an INSERT statement so that
 * a row colliding with the `unique_<tableName>` constraint is merged into the
 * existing row: `sum` and `count` are added to the stored values and `avg` is
 * recomputed from the merged totals.
 *
 * @param tableName - Unqualified table name; also used to derive the
 *   constraint name `unique_<tableName>`.
 * @param query - The INSERT statement. One optional trailing `;` and any
 *   trailing whitespace are removed before the clause is appended.
 * @param psqlSchema - Schema used to qualify the existing row's columns in the
 *   SET expressions. Defaults to `datasets`, the value that was previously
 *   hard-coded.
 * @returns The statement with the conflict clause appended, terminated by `; `.
 */
addOnConflictStatement(
  tableName: string,
  query: string,
  psqlSchema = 'datasets',
): string {
  // Only strip a terminator that is really there. Blindly dropping the last
  // character would eat a real character of an unterminated query, and would
  // leave the `;` in place when the query ends with `; `.
  const insert = query.replace(/;?\s*$/, '');
  const target = `${psqlSchema}.${tableName}`;
  return `${insert} ON CONFLICT ON CONSTRAINT unique_${tableName} DO UPDATE SET sum = ${target}.sum + EXCLUDED.sum, count = ${target}.count + EXCLUDED.count, avg = (${target}.sum + EXCLUDED.sum) / (${target}.count + EXCLUDED.count); `;
}

generateBulkInsertStatement(schema: JSONSchema4, data: any[]): string {
const tableName = schema.title;
const psqlSchema = schema.psql_schema;
Expand All @@ -192,10 +226,10 @@ export class QueryBuilderService {
fields.push(property);
}

const tempTableName = `temp_${tableName}`;
const createTempTable = `CREATE TABLE IF NOT EXISTS ${tempTableName} (LIKE ${psqlSchema}.${tableName});`;
const tempTableName = `temp_${tableName} `;
const createTempTable = `CREATE TABLE IF NOT EXISTS ${tempTableName} (LIKE ${psqlSchema}.${tableName}); `;
queries.push(createTempTable);
const autoGen = `ALTER TABLE ${tempTableName} ADD COLUMN id SERIAL PRIMARY KEY;`;
const autoGen = `ALTER TABLE ${tempTableName} ADD COLUMN id SERIAL PRIMARY KEY; `;
queries.push(autoGen);
const rows = [];
let id = 1;
Expand All @@ -220,7 +254,7 @@ export class QueryBuilderService {
const insertTempTable = `INSERT INTO ${tempTableName} (${tempTableFields.join(
', ',
)}) VALUES `;
const insertTempTableRows = `${insertTempTable}${rows.join(', ')};`;
const insertTempTableRows = `${insertTempTable}${rows.join(', ')}; `;
queries.push(this.cleanStatement(insertTempTable));
let joinStatements = '';
let whereStatements = '';
Expand All @@ -230,7 +264,7 @@ export class QueryBuilderService {
const referenceTable = fk.reference.table;
const referenceColumn = fk.reference.column;
const childColumn = fk.column;
joinStatements += ` LEFT JOIN dimensions.${referenceTable} ON ${tempTableName}.${childColumn} = dimensions.${referenceTable}.${childColumn}`;
joinStatements += ` LEFT JOIN dimensions.${referenceTable} ON ${tempTableName}.${childColumn} = dimensions.${referenceTable}.${childColumn} `;
whereStatements += ` AND dimensions.${referenceTable}.${childColumn} IS NOT NULL`;
});
}
Expand All @@ -239,17 +273,18 @@ export class QueryBuilderService {
', ',
)})
SELECT ${fields
.map((field) => `${tempTableName}.${field}`)
.join(', ')} FROM ${tempTableName}
.map((field) => `${tempTableName}.${field}`)
.join(', ')} FROM ${tempTableName}
${joinStatements === '' ? ' ' : joinStatements}
WHERE TRUE${whereStatements === '' ? ' ' : whereStatements};`;
WHERE TRUE${whereStatements === '' ? ' ' : whereStatements}
ON CONFLICT ON CONSTRAINT unique_${tableName} DO UPDATE SET sum = ${psqlSchema}.${tableName}.sum + EXCLUDED.sum, count = ${psqlSchema}.${tableName}.count + EXCLUDED.count, avg = (${psqlSchema}.${tableName}.sum + EXCLUDED.sum) / (${psqlSchema}.${tableName}.count + EXCLUDED.count);`;

queries.push(filteredInsert);

const dropTempTable = `DROP TABLE ${tempTableName};`;
const dropTempTable = `DROP TABLE ${tempTableName}; `;
queries.push(dropTempTable);
const query = `${createTempTable}\n${insertTempTableRows}\n${filteredInsert}\n${dropTempTable}`;
// const query = `${createTempTable}\n${insertTempTableRows}\n${filteredInsert}`;
const query = `${createTempTable} \n${insertTempTableRows} \n${filteredInsert} \n${dropTempTable} `;
// const query = `${ createTempTable } \n${ insertTempTableRows } \n${ filteredInsert } `;
// if (query.toLowerCase().includes('null')) {
// console.log('NULL Query: ', query);
// }
Expand All @@ -258,7 +293,16 @@ export class QueryBuilderService {
return this.cleanStatement(query);
}

generateUpdateStatement(schema: JSONSchema4, data: any): string[] {
throw new Error('Method not implemented.');
/**
 * Builds an UPDATE statement that folds incoming aggregate values into the
 * rows selected by `where`: `sum` grows by `data.sum`, `count` changes by
 * `data.count - 2 * data.negcount`, and `avg` is recomputed from the new
 * totals.
 *
 * @param schema - Either a wrapper exposing `schema.psql_schema` and
 *   `tableName` (the shape originally read here), or a flat schema exposing
 *   `psql_schema` and `title` (the shape the other generators in this service
 *   read).
 * @param data - Object providing numeric `sum`, `count` and `negcount`.
 *   NOTE(review): a missing `negcount` yields `NaN` in the SQL text - confirm
 *   callers always supply it.
 * @param where - Raw SQL condition placed after `WHERE`.
 * @returns The UPDATE statement as a single string.
 */
generateUpdateStatement(
  schema: JSONSchema4,
  data: any,
  where: string,
): string {
  // Prefer the wrapped shape for backward compatibility, but fall back to the
  // flat shape instead of throwing a TypeError when `schema.schema` is absent.
  const nested = schema.schema;
  const psqlSchema =
    nested !== undefined && nested !== null && nested.psql_schema !== undefined
      ? nested.psql_schema
      : schema.psql_schema;
  const tableName =
    schema.tableName !== undefined ? schema.tableName : schema.title;

  // Computed once so the count and avg expressions cannot drift apart.
  const countDelta = data.count - 2 * data.negcount;

  // NOTE(review): `where` and the values from `data` are interpolated into the
  // SQL text unescaped; callers must only pass trusted input.
  return `UPDATE ${psqlSchema}.${tableName}
SET sum = sum + ${data.sum},
count = count + ${countDelta},
avg = (sum + ${data.sum}) /(count+${countDelta})
WHERE ${where} `;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"args":{"schema":{"title":"nishtha_totalmedium_eGFra0JofUpoa0QRHgIC","psql_schema":"datasets","properties":{"state_id":{"type":"string"},"program_name":{"type":"string"},"count":{"type":"number","format":"float"},"sum":{"type":"number","format":"float"},"avg":{"type":"number","format":"float"}},"fk":[{"column":"state_id","reference":{"table":"dimensions.state","column":"state_id"}},{"column":"program_name","reference":{"table":"dimensions.programnishtha","column":"program_name"}}]},"autoPrimaryKey":true},"output":"CREATE TABLE datasets.nishtha_totalmedium_eGFra0JofUpoa0QRHgIC (id SERIAL PRIMARY KEY, state_id VARCHAR, program_name VARCHAR, count FLOAT8, sum FLOAT8, avg FLOAT8,constraint fk_state_id FOREIGN KEY (state_id) REFERENCES dimensions.state(state_id),constraint fk_program_name FOREIGN KEY (program_name) REFERENCES dimensions.programnishtha(program_name),constraint unique_nishtha_totalmedium_eGFra0JofUpoa0QRHgIC UNIQUE (state_id, program_name));"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
{
"args": {
"schema": {
"fk": [
{
"column": "gender",
"reference": { "table": "gender", "column": "gender_id" }
},
{
"column": "district_id",
"reference": { "table": "district", "column": "district_id" }
}
],
"title": "sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf",
"properties": {
"gender": { "type": "string" },
"district_id": { "type": "string" },
"count": { "type": "number", "format": "float" },
"sum": { "type": "number", "format": "float" },
"avg": { "type": "number", "format": "float" },
"week": { "type": "integer" },
"year": { "type": "integer" }
},
"psql_schema": "datasets"
},
"data": [
{
"count": 120,
"sum": 4200,
"avg": 35,
"week": 9,
"year": 2023,
"gender": "male",
"district_id": "101"
},
{
"count": 120,
"sum": 4176,
"avg": 34.8,
"week": 8,
"year": 2023,
"gender": "male",
"district_id": "101"
},
{
"count": 120,
"sum": 4083,
"avg": 34.025,
"week": 6,
"year": 2023,
"gender": "male",
"district_id": "101"
},
{
"count": 239,
"sum": 8269,
"avg": 34.59832635983263,
"week": 7,
"year": 2023,
"gender": "male",
"district_id": "101"
},
{
"count": 120,
"sum": 4184,
"avg": 34.86666666666667,
"week": 9,
"year": 2023,
"gender": "female",
"district_id": "101"
},
{
"count": 120,
"sum": 4111,
"avg": 34.25833333333333,
"week": 8,
"year": 2023,
"gender": "female",
"district_id": "101"
},
{
"count": 120,
"sum": 4141,
"avg": 34.50833333333333,
"week": 6,
"year": 2023,
"gender": "female",
"district_id": "101"
},
{
"count": 240,
"sum": 8278,
"avg": 34.49166666666667,
"week": 7,
"year": 2023,
"gender": "female",
"district_id": "101"
}
]
},
"output": "CREATE TABLE IF NOT EXISTS temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf (LIKE datasets.sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf); INSERT INTO temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf (gender, district_id, count, sum, avg, week, year, id) VALUES ('male', '101', '120', '4200', '35', '9', '2023', 1), ('male', '101', '120', '4176', '34.8', '8', '2023', 2), ('male', '101', '120', '4083', '34.025', '6', '2023', 3), ('male', '101', '239', '8269', '34.59832635983263', '7', '2023', 4), ('female', '101', '120', '4184', '34.86666666666667', '9', '2023', 5), ('female', '101', '120', '4111', '34.25833333333333', '8', '2023', 6), ('female', '101', '120', '4141', '34.50833333333333', '6', '2023', 7), ('female', '101', '240', '8278', '34.49166666666667', '7', '2023', 8); INSERT INTO datasets.sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf (gender, district_id, count, sum, avg, week, year) SELECT temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf .gender, temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf .district_id, temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf .count, temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf .sum, temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf .avg, temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf .week, temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf .year FROM temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf LEFT JOIN dimensions.gender ON temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf .gender = dimensions.gender.gender LEFT JOIN dimensions.district ON temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf .district_id = dimensions.district.district_id WHERE TRUE AND dimensions.gender.gender IS NOT NULL AND dimensions.district.district_id IS NOT NULL ON CONFLICT ON CONSTRAINT unique_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf DO UPDATE SET sum = datasets.sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf.sum + EXCLUDED.sum, count = datasets.sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf.count + EXCLUDED.count, avg = 
(datasets.sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf.sum + EXCLUDED.sum) / (datasets.sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf.count + EXCLUDED.count); DROP TABLE temp_sch_att_studentsmarked_Z2tnSW9jIGRNYmR9YHRf ;"
}
34 changes: 34 additions & 0 deletions impl/c-qube/test/fixtures/ingestionConfigs/config.multiple.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{
"globals": {
"onlyCreateWhitelisted": true
},
"dimensions": {
"namespace": "dimensions",
"fileNameFormat": "${dimensionName}.${index}.dimensions.data.csv",
"input": {
"files": "./ingest/dimensions"
}
},
"programs": [
{
"name": "DIKSHA",
"namespace": "diksha",
"description": "DIKSHA",
"shouldIngestToDB": true,
"input": {
"files": "./ingest/programs/diksha"
},
"./output": {
"location": "./output/programs/diksha"
},
"dimensions": {
"whitelisted": [
"state,grade,subject,medium,board",
"textbookdiksha,grade,subject,medium",
"textbookdiksha,grade,subject,medium"
],
"blacklisted": []
}
}
]
}
51 changes: 51 additions & 0 deletions impl/c-qube/test/fixtures/outputDatasets/multiple_first.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
[
{
"grade_diksha": "Class 2",
"avg": 2,
"count": 1,
"sum": 2,
"id": 1
},
{
"grade_diksha": "Class 1",
"avg": 2,
"count": 2,
"sum": 4,
"id": 2
},
{
"grade_diksha": "Class 6",
"avg": 2.25,
"count": 4,
"sum": 9,
"id": 3
},
{
"grade_diksha": "Class 7",
"avg": 1.5,
"count": 2,
"sum": 3,
"id": 4
},
{
"grade_diksha": "Class 8",
"avg": 3,
"count": 1,
"sum": 3,
"id": 5
},
{
"grade_diksha": "Class 3",
"avg": 3.3333333333333335,
"count": 3,
"sum": 10,
"id": 6
},
{
"grade_diksha": "Class 4",
"avg": 1,
"count": 1,
"sum": 1,
"id": 7
}
]
Loading
Loading