Skip to content

Commit

Permalink
fix: unique constraint throwing due to char limit in psql, skipping unavailable files during `ingest-data`
Browse files Browse the repository at this point in the history
  • Loading branch information
techsavvyash committed Oct 31, 2023
1 parent ef6f60b commit 88b78ec
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 8 deletions.
File renamed without changes.
6 changes: 6 additions & 0 deletions impl/c-qube/.env.sample
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Sample database settings. NOTE(review): presumably meant to be copied to a
# real .env file and adjusted per environment — confirm against the setup docs.
#
# Full PostgreSQL connection URL (user, password, host, port and database in
# one string). `sslmode=disable` turns TLS off for this connection.
DATABASE_URL="postgres://timescaledb:postgrespassword@localhost:5432/postgres?sslmode=disable"
# The discrete values below repeat the same user, host, database, password and
# port that DATABASE_URL encodes; keep both forms in sync when editing.
DB_USERNAME="timescaledb"
DB_HOST="localhost"
DB_NAME="postgres"
# Placeholder credential shipped in the sample file.
DB_PASSWORD="postgrespassword"
DB_PORT="5432"
3 changes: 1 addition & 2 deletions impl/c-qube/src/services/csv-adapter/csv-adapter.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import {
import {
createEventGrammarFromCSVDefinition,
getEGDefFromDB,
getEGDefFromFile,
} from './parser/event-grammar/event-grammar.service';
import {
createCompoundDatasetGrammars,
Expand Down Expand Up @@ -487,7 +486,7 @@ export class CsvAdapterService {
),
);
}

this.logger.verbose('Ingested single DatasetGrammars');
const compoundDatasetGrammars: DatasetGrammar[] =
await this.datasetService.getCompoundDatasetGrammars(filter);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ export const createDatasetDataToBeInserted = async (

const filePath = eventGrammar.file.replace('grammar', 'data');

fs.access(filePath, fs.constants.F_OK, (err) => {
if (err) {
console.error(`File at $${filePath} does not exist`);
return;
}
});

const df = await readCSV(filePath);
if (!df || !df[0]) return;

Expand Down Expand Up @@ -94,6 +101,12 @@ export const createCompoundDatasetDataToBeInserted = async (
delete properties.year;

// checking if the file is empty or not
fs.access(eventFilePath, fs.constants.F_OK, (err) => {
if (err) {
console.error('File does not exist');
return;
}
});
const stats = fs.statSync(eventFilePath);
if (stats.size === 0) {
console.log(`File at ${eventFilePath} is empty`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,14 @@ export const createDimensionGrammarFromCSVDefinition = async (
csvFilePath: string,
readFile: (path: string, encoding: string) => Promise<string> = fs.readFile,
): Promise<DimensionGrammar | null> => {
fs.access(csvFilePath, fs.constants.F_OK, (err) => {
if (err) {
console.error('File does not exist');
return null;
}
});
const fileContent = await readFile(csvFilePath, 'utf-8');

const [row1, row2, row3] = fileContent.split('\n').map((row) => row.trim());

if (!isValidCSVFormat(row1, row2, row3)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ export class DimensionGrammarService {
async createDimensionGrammarFromCSVDefinition(
csvFilePath: string,
): Promise<DimensionGrammar | null> {
try {
await fs.access(csvFilePath, fs.constants.F_OK);
} catch (err) {
console.error(`File at: ${csvFilePath} does not exist`);
return null;
}

const fileContent = await fs.readFile(csvFilePath, 'utf-8');
const [row1, row2, row3] = fileContent.split('\n').map((row) => row.trim());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ export async function getEGDefFromDB(
csvFilePath: string,
prisma: PrismaService,
) {
console.log('csvFilePath: ', csvFilePath);
const metrics = await prisma.eventGrammar.findMany({
where: {
file: csvFilePath,
Expand All @@ -54,7 +53,6 @@ export async function getEGDefFromDB(
metric: true,
},
});
console.log('metrics: ', metrics);

return {
eventGrammarDef: metrics[0]?.egSchema,
Expand Down
13 changes: 13 additions & 0 deletions impl/c-qube/src/services/csv-adapter/parser/utils/csvreader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ const fs = require('fs').promises;
import * as csv from 'csv-parser';

export async function readCSV(filePath: string): Promise<string[][]> {
try {
await fs.access(filePath, fs.constants.F_OK);
} catch (err) {
console.error(`File at: ${filePath} does not exist`);
return null;
}

return new Promise((resolve, reject) => {
const rows: string[][] = [];
// TODO: Add checking here
Expand All @@ -23,6 +30,12 @@ export async function readCSV(filePath: string): Promise<string[][]> {
}

export async function readCSVFile(filePath: string): Promise<string[]> {
try {
await fs.access(filePath, fs.constants.F_OK);
} catch (err) {
console.error(`File at: ${filePath} does not exist`);
return null;
}
const fileContent = await fs.readFile(filePath, 'utf-8');

return fileContent
Expand Down
20 changes: 16 additions & 4 deletions impl/c-qube/src/services/query-builder/query-builder.service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { Injectable } from '@nestjs/common';
import { JSONSchema4 } from 'json-schema';
import { hash } from '../../utils/hash';

const fs = require('fs');
const crypto = require('crypto');

type fk = {
column: string;
Expand Down Expand Up @@ -49,7 +51,7 @@ export class QueryBuilderService {
const tableName = schema.title;
const psqlSchema = schema.psql_schema;
const primaryKeySegment = autoPrimaryKey ? '\n id SERIAL PRIMARY KEY,' : '';
let createStatement = `CREATE TABLE ${psqlSchema}.${tableName} (${primaryKeySegment}\n`;
let createStatement = `CREATE TABLE IF NOT EXISTS ${psqlSchema}.${tableName} (${primaryKeySegment}\n`;

const properties = schema.properties;

Expand Down Expand Up @@ -97,7 +99,8 @@ export class QueryBuilderService {
);

// adding unique constraint
let uniqueStatements = `,\nconstraint unique_${tableName} UNIQUE (`;
const hashedTableName = hash(tableName, 'secret', {});
let uniqueStatements = `,\nconstraint unique_${hashedTableName} UNIQUE (`;
schema.fk.forEach((fk: fk) => {
uniqueStatements += `${fk.column}, `;
});
Expand Down Expand Up @@ -126,6 +129,7 @@ export class QueryBuilderService {

for (const index of indexes) {
for (const column of index.columns) {
if (column.length === 0) continue;
const indexName = `${schema.title}_${column.join('_')}_idx`;
const columns = column.join(', ');
const statement = `CREATE INDEX ${indexName} ON ${psqlSchema}.${schema.title} (${columns});`;
Expand Down Expand Up @@ -206,7 +210,11 @@ export class QueryBuilderService {
return query
.slice(0, -1)
.concat(
` ON CONFLICT ON CONSTRAINT unique_${tableName} DO UPDATE SET sum = datasets.${tableName}.sum + EXCLUDED.sum, count = datasets.${tableName}.count + EXCLUDED.count, avg = (datasets.${tableName}.sum + EXCLUDED.sum) / (datasets.${tableName}.count + EXCLUDED.count); `,
` ON CONFLICT ON CONSTRAINT unique_${hash(
tableName,
'secret',
{},
)} DO UPDATE SET sum = datasets.${tableName}.sum + EXCLUDED.sum, count = datasets.${tableName}.count + EXCLUDED.count, avg = (datasets.${tableName}.sum + EXCLUDED.sum) / (datasets.${tableName}.count + EXCLUDED.count); `,
);
}

Expand Down Expand Up @@ -277,7 +285,11 @@ export class QueryBuilderService {
.join(', ')} FROM ${tempTableName}
${joinStatements === '' ? ' ' : joinStatements}
WHERE TRUE${whereStatements === '' ? ' ' : whereStatements}
ON CONFLICT ON CONSTRAINT unique_${tableName} DO UPDATE SET sum = ${psqlSchema}.${tableName}.sum + EXCLUDED.sum, count = ${psqlSchema}.${tableName}.count + EXCLUDED.count, avg = (${psqlSchema}.${tableName}.sum + EXCLUDED.sum) / (${psqlSchema}.${tableName}.count + EXCLUDED.count);`;
ON CONFLICT ON CONSTRAINT unique_${hash(
tableName,
'secret',
{},
)} DO UPDATE SET sum = ${psqlSchema}.${tableName}.sum + EXCLUDED.sum, count = ${psqlSchema}.${tableName}.count + EXCLUDED.count, avg = (${psqlSchema}.${tableName}.sum + EXCLUDED.sum) / (${psqlSchema}.${tableName}.count + EXCLUDED.count);`;

queries.push(filteredInsert);

Expand Down

0 comments on commit 88b78ec

Please sign in to comment.