Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: multiple ingestion #178

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions impl/c-qube/.env.sample
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
DATABASE_URL="postgres://timescaledb:postgrespassword@localhost:5432/postgres?sslmode=disable"
DB_USERNAME="timescaledb"
DB_HOST="localhost"
DB_NAME="postgres"
DB_PASSWORD="postgrespassword"
DB_PORT="5432"
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/*
Warnings:

- Added the required column `metric` to the `EventGrammar` table without a default value. This is not possible if the table is not empty.

*/
-- AlterTable
ALTER TABLE "spec"."EventGrammar" ADD COLUMN "metric" TEXT NOT NULL;
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/*
Warnings:

- Added the required column `egSchema` to the `EventGrammar` table without a default value. This is not possible if the table is not empty.

*/
-- AlterTable
ALTER TABLE "spec"."EventGrammar" ADD COLUMN "egSchema" JSONB NOT NULL;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "spec"."EventGrammar" ALTER COLUMN "egSchema" SET DATA TYPE TEXT;
2 changes: 2 additions & 0 deletions impl/c-qube/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ model EventGrammar {
file String?
eventType EventType @default(INTERNAL)
DatasetGrammar DatasetGrammar[]
metric String
egSchema String

@@schema("spec")
}
Expand Down
146 changes: 73 additions & 73 deletions impl/c-qube/src/services/csv-adapter/csv-adapter.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import {
} from './parser/dataset/dataset-grammar.helper';
import {
createEventGrammarFromCSVDefinition,
getEGDefFromFile,
getEGDefFromDB,
} from './parser/event-grammar/event-grammar.service';
import {
createCompoundDatasetGrammars,
Expand Down Expand Up @@ -486,85 +486,85 @@ export class CsvAdapterService {
),
);
}

this.logger.verbose('Ingested single DatasetGrammars');
const compoundDatasetGrammars: DatasetGrammar[] =
await this.datasetService.getCompoundDatasetGrammars(filter);

// Ingest Compound DatasetGrammar
for (let m = 0; m < compoundDatasetGrammars.length; m++) {
promises.push(
limit(() =>
getEGDefFromFile(compoundDatasetGrammars[m].eventGrammarFile).then(
async (s) => {
const {
instrumentField,
}: {
eventGrammarDef: EventGrammarCSVFormat[];
instrumentField: string;
} = s;
const compoundEventGrammar: EventGrammar = {
name: '',
description: '',
dimension: [],
instrument_field: instrumentField,
is_active: true,
schema: {},
instrument: {
type: InstrumentType.COUNTER,
name: 'counter',
},
};
const events: Event[] =
await createCompoundDatasetDataToBeInserted(
compoundDatasetGrammars[m].eventGrammarFile.replace(
'grammar',
'data',
),
compoundEventGrammar,
compoundDatasetGrammars[m],
);
// Create Pipes
const pipe: Pipe = {
event: compoundEventGrammar,
transformer: defaultTransformers[0],
dataset: compoundDatasetGrammars[m],
};
const transformContext: TransformerContext = {
dataset: compoundDatasetGrammars[m],
events: events,
isChainable: false,
pipeContext: {},
};
if (events && events.length > 0) {
const datasetUpdateRequest: DatasetUpdateRequest[] =
pipe.transformer.transformSync(
callback,
transformContext,
events,
) as DatasetUpdateRequest[];

// console.log(datasetUpdateRequest.length, datasetUpdateRequest[0]);

await this.datasetService
.processDatasetUpdateRequest(datasetUpdateRequest)
.then(() => {
this.logger.verbose(
`Ingested Compound Dataset without any error ${events.length} events for ${compoundDatasetGrammars[m].name}`,
);
})
.catch((e) => {
this.logger.verbose(
`Ingested Compound Dataset with error ${events.length} events for ${compoundDatasetGrammars[m].name}`,
);
});
} else {
console.error(
'No relevant events for this dataset',
compoundDatasetGrammars[m].name,
);
}
},
),
getEGDefFromDB(
compoundDatasetGrammars[m].eventGrammarFile,
this.prisma,
).then(async (s) => {
const {
instrumentField,
}: {
eventGrammarDef: EventGrammarCSVFormat[];
instrumentField: string;
} = s as any;
const compoundEventGrammar: EventGrammar = {
name: '',
description: '',
dimension: [],
instrument_field: instrumentField,
is_active: true,
schema: {},
instrument: {
type: InstrumentType.COUNTER,
name: 'counter',
},
};
const events: Event[] = await createCompoundDatasetDataToBeInserted(
compoundDatasetGrammars[m].eventGrammarFile.replace(
'grammar',
'data',
),
compoundEventGrammar,
compoundDatasetGrammars[m],
);
// Create Pipes
const pipe: Pipe = {
event: compoundEventGrammar,
transformer: defaultTransformers[0],
dataset: compoundDatasetGrammars[m],
};
const transformContext: TransformerContext = {
dataset: compoundDatasetGrammars[m],
events: events,
isChainable: false,
pipeContext: {},
};
if (events && events.length > 0) {
const datasetUpdateRequest: DatasetUpdateRequest[] =
pipe.transformer.transformSync(
callback,
transformContext,
events,
) as DatasetUpdateRequest[];

// console.log(datasetUpdateRequest.length, datasetUpdateRequest[0]);

await this.datasetService
.processDatasetUpdateRequest(datasetUpdateRequest)
.then(() => {
this.logger.verbose(
`Ingested Compound Dataset without any error ${events.length} events for ${compoundDatasetGrammars[m].name}`,
);
})
.catch((e) => {
this.logger.verbose(
`Ingested Compound Dataset with error ${events.length} events for ${compoundDatasetGrammars[m].name}`,
);
});
} else {
console.error(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use the default logger instead of console?

'No relevant events for this dataset',
compoundDatasetGrammars[m].name,
);
}
}),
),
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ export const createDatasetDataToBeInserted = async (

const filePath = eventGrammar.file.replace('grammar', 'data');

fs.access(filePath, fs.constants.F_OK, (err) => {
if (err) {
console.error(`File at $${filePath} does not exist`);
return;
}
});

const df = await readCSV(filePath);
if (!df || !df[0]) return;

Expand Down Expand Up @@ -94,6 +101,12 @@ export const createCompoundDatasetDataToBeInserted = async (
delete properties.year;

// checking if the file is empty or not
fs.access(eventFilePath, fs.constants.F_OK, (err) => {
if (err) {
console.error('File does not exist');
return;
}
});
const stats = fs.statSync(eventFilePath);
if (stats.size === 0) {
console.log(`File at ${eventFilePath} is empty`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,14 @@ export const createDimensionGrammarFromCSVDefinition = async (
csvFilePath: string,
readFile: (path: string, encoding: string) => Promise<string> = fs.readFile,
): Promise<DimensionGrammar | null> => {
fs.access(csvFilePath, fs.constants.F_OK, (err) => {
if (err) {
console.error('File does not exist');
return null;
}
});
const fileContent = await readFile(csvFilePath, 'utf-8');

const [row1, row2, row3] = fileContent.split('\n').map((row) => row.trim());

if (!isValidCSVFormat(row1, row2, row3)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ export class DimensionGrammarService {
async createDimensionGrammarFromCSVDefinition(
csvFilePath: string,
): Promise<DimensionGrammar | null> {
try {
await fs.access(csvFilePath, fs.constants.F_OK);
} catch (err) {
console.error(`File at: ${csvFilePath} does not exist`);
return null;
}

const fileContent = await fs.readFile(csvFilePath, 'utf-8');
const [row1, row2, row3] = fileContent.split('\n').map((row) => row.trim());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
FieldType,
} from '../../types/parser';
import { readCSVFile } from '../utils/csvreader';
import { PrismaService } from 'src/prisma.service';

export async function getEGDefFromFile(csvFilePath: string) {
const [
Expand All @@ -39,6 +40,26 @@ export async function getEGDefFromFile(csvFilePath: string) {
return { eventGrammarDef, instrumentField };
}

export async function getEGDefFromDB(
csvFilePath: string,
prisma: PrismaService,
) {
const metrics = await prisma.eventGrammar.findMany({
where: {
file: csvFilePath,
},
select: {
egSchema: true,
metric: true,
},
});

return {
eventGrammarDef: metrics[0]?.egSchema,
instrumentField: metrics[0]?.metric,
};
}

export const createEventGrammarFromCSVDefinition = async (
csvFilePath: string,
dimensionFileBasePath: string,
Expand Down
13 changes: 13 additions & 0 deletions impl/c-qube/src/services/csv-adapter/parser/utils/csvreader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ const fs = require('fs').promises;
import * as csv from 'csv-parser';

export async function readCSV(filePath: string): Promise<string[][]> {
try {
await fs.access(filePath, fs.constants.F_OK);
} catch (err) {
console.error(`File at: ${filePath} does not exist`);
return null;
}

return new Promise((resolve, reject) => {
const rows: string[][] = [];
// TODO: Add checking here
Expand All @@ -23,6 +30,12 @@ export async function readCSV(filePath: string): Promise<string[][]> {
}

export async function readCSVFile(filePath: string): Promise<string[]> {
try {
await fs.access(filePath, fs.constants.F_OK);
} catch (err) {
console.error(`File at: ${filePath} does not exist`);
return null;
}
const fileContent = await fs.readFile(filePath, 'utf-8');

return fileContent
Expand Down
10 changes: 10 additions & 0 deletions impl/c-qube/src/services/event/event.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ import { QueryBuilderService } from '../query-builder/query-builder.service';
import { DimensionService } from '../dimension/dimension.service';
import { DimensionMapping } from 'src/types/dataset';
import { DimensionGrammar } from 'src/types/dimension';
import {
getEGDefFromDB,
getEGDefFromFile,
} from '../csv-adapter/parser/event-grammar/event-grammar.service';

@Injectable()
export class EventService {
Expand Down Expand Up @@ -65,6 +69,10 @@ export class EventService {
await this.dimensionService.getDimensionGrammaModelByName(
eventGrammar.dimension[0].dimension.name.name,
);
const { eventGrammarDef, instrumentField } = await getEGDefFromFile(
eventGrammar.file,
);

return this.prisma.eventGrammar
.create({
data: {
Expand All @@ -82,6 +90,8 @@ export class EventService {
},
},
]),
metric: eventGrammar.instrument_field,
egSchema: JSON.stringify(eventGrammarDef),
instrument: {
connect: {
name: 'COUNTER', //TODO: Change this to eventGrammar.instrument.name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ import { Test, TestingModule } from '@nestjs/testing';
import { JSONSchema4 } from 'json-schema';
import { QueryBuilderService } from './query-builder.service';

// JSON imports
import * as createStatementInput from './test-fixtures/createStatement.json';
import * as insertStatementInput from './test-fixtures/insertStatement.json';

describe('QueryBuilderService', () => {
let service: QueryBuilderService;

Expand Down Expand Up @@ -241,4 +245,22 @@ describe('QueryBuilderService', () => {
);`),
);
});

it('checks for bulk upserting statement', () => {
const res = service.generateBulkInsertStatement(
insertStatementInput.args.schema as any,
insertStatementInput.args.data as any,
);

expect(res).toBe(insertStatementInput.output);
});

it('checks for create statement with proper unique constraint', async () => {
const res = service.generateCreateStatement(
createStatementInput.args.schema as any,
createStatementInput.args.autoPrimaryKey,
);

expect(res).toBe(createStatementInput.output);
});
});
Loading
Loading