Skip to content

Commit

Permalink
feat: remove dependency on grammar files during ingest-data
Browse files Browse the repository at this point in the history
  • Loading branch information
techsavvyash committed Oct 30, 2023
1 parent 1b42f87 commit ef6f60b
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 72 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/*
Warnings:
- Added the required column `metric` to the `EventGrammar` table without a default value. This is not possible if the table is not empty.
*/
-- AlterTable
ALTER TABLE "spec"."EventGrammar" ADD COLUMN "metric" TEXT NOT NULL;
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/*
Warnings:
- Added the required column `egSchema` to the `EventGrammar` table without a default value. This is not possible if the table is not empty.
*/
-- AlterTable
ALTER TABLE "spec"."EventGrammar" ADD COLUMN "egSchema" JSONB NOT NULL;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "spec"."EventGrammar" ALTER COLUMN "egSchema" SET DATA TYPE TEXT;
2 changes: 2 additions & 0 deletions impl/c-qube/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ model EventGrammar {
file String?
eventType EventType @default(INTERNAL)
DatasetGrammar DatasetGrammar[]
metric String
egSchema String
@@schema("spec")
}
Expand Down
143 changes: 72 additions & 71 deletions impl/c-qube/src/services/csv-adapter/csv-adapter.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
} from './parser/dataset/dataset-grammar.helper';
import {
createEventGrammarFromCSVDefinition,
getEGDefFromDB,
getEGDefFromFile,
} from './parser/event-grammar/event-grammar.service';
import {
Expand Down Expand Up @@ -494,77 +495,77 @@ export class CsvAdapterService {
for (let m = 0; m < compoundDatasetGrammars.length; m++) {
promises.push(
limit(() =>
getEGDefFromFile(compoundDatasetGrammars[m].eventGrammarFile).then(
async (s) => {
const {
instrumentField,
}: {
eventGrammarDef: EventGrammarCSVFormat[];
instrumentField: string;
} = s;
const compoundEventGrammar: EventGrammar = {
name: '',
description: '',
dimension: [],
instrument_field: instrumentField,
is_active: true,
schema: {},
instrument: {
type: InstrumentType.COUNTER,
name: 'counter',
},
};
const events: Event[] =
await createCompoundDatasetDataToBeInserted(
compoundDatasetGrammars[m].eventGrammarFile.replace(
'grammar',
'data',
),
compoundEventGrammar,
compoundDatasetGrammars[m],
);
// Create Pipes
const pipe: Pipe = {
event: compoundEventGrammar,
transformer: defaultTransformers[0],
dataset: compoundDatasetGrammars[m],
};
const transformContext: TransformerContext = {
dataset: compoundDatasetGrammars[m],
events: events,
isChainable: false,
pipeContext: {},
};
if (events && events.length > 0) {
const datasetUpdateRequest: DatasetUpdateRequest[] =
pipe.transformer.transformSync(
callback,
transformContext,
events,
) as DatasetUpdateRequest[];

// console.log(datasetUpdateRequest.length, datasetUpdateRequest[0]);

await this.datasetService
.processDatasetUpdateRequest(datasetUpdateRequest)
.then(() => {
this.logger.verbose(
`Ingested Compound Dataset without any error ${events.length} events for ${compoundDatasetGrammars[m].name}`,
);
})
.catch((e) => {
this.logger.verbose(
`Ingested Compound Dataset with error ${events.length} events for ${compoundDatasetGrammars[m].name}`,
);
});
} else {
console.error(
'No relevant events for this dataset',
compoundDatasetGrammars[m].name,
);
}
},
),
getEGDefFromDB(
compoundDatasetGrammars[m].eventGrammarFile,
this.prisma,
).then(async (s) => {
const {
instrumentField,
}: {
eventGrammarDef: EventGrammarCSVFormat[];
instrumentField: string;
} = s as any;
const compoundEventGrammar: EventGrammar = {
name: '',
description: '',
dimension: [],
instrument_field: instrumentField,
is_active: true,
schema: {},
instrument: {
type: InstrumentType.COUNTER,
name: 'counter',
},
};
const events: Event[] = await createCompoundDatasetDataToBeInserted(
compoundDatasetGrammars[m].eventGrammarFile.replace(
'grammar',
'data',
),
compoundEventGrammar,
compoundDatasetGrammars[m],
);
// Create Pipes
const pipe: Pipe = {
event: compoundEventGrammar,
transformer: defaultTransformers[0],
dataset: compoundDatasetGrammars[m],
};
const transformContext: TransformerContext = {
dataset: compoundDatasetGrammars[m],
events: events,
isChainable: false,
pipeContext: {},
};
if (events && events.length > 0) {
const datasetUpdateRequest: DatasetUpdateRequest[] =
pipe.transformer.transformSync(
callback,
transformContext,
events,
) as DatasetUpdateRequest[];

// console.log(datasetUpdateRequest.length, datasetUpdateRequest[0]);

await this.datasetService
.processDatasetUpdateRequest(datasetUpdateRequest)
.then(() => {
this.logger.verbose(
`Ingested Compound Dataset without any error ${events.length} events for ${compoundDatasetGrammars[m].name}`,
);
})
.catch((e) => {
this.logger.verbose(
`Ingested Compound Dataset with error ${events.length} events for ${compoundDatasetGrammars[m].name}`,
);
});
} else {
console.error(
'No relevant events for this dataset',
compoundDatasetGrammars[m].name,
);
}
}),
),
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
FieldType,
} from '../../types/parser';
import { readCSVFile } from '../utils/csvreader';
import { PrismaService } from 'src/prisma.service';

export async function getEGDefFromFile(csvFilePath: string) {
const [
Expand All @@ -39,6 +40,28 @@ export async function getEGDefFromFile(csvFilePath: string) {
return { eventGrammarDef, instrumentField };
}

export async function getEGDefFromDB(
csvFilePath: string,
prisma: PrismaService,
) {
console.log('csvFilePath: ', csvFilePath);
const metrics = await prisma.eventGrammar.findMany({
where: {
file: csvFilePath,
},
select: {
egSchema: true,
metric: true,
},
});
console.log('metrics: ', metrics);

return {
eventGrammarDef: metrics[0]?.egSchema,
instrumentField: metrics[0]?.metric,
};
}

export const createEventGrammarFromCSVDefinition = async (
csvFilePath: string,
dimensionFileBasePath: string,
Expand Down
10 changes: 10 additions & 0 deletions impl/c-qube/src/services/event/event.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ import { QueryBuilderService } from '../query-builder/query-builder.service';
import { DimensionService } from '../dimension/dimension.service';
import { DimensionMapping } from 'src/types/dataset';
import { DimensionGrammar } from 'src/types/dimension';
import {
getEGDefFromDB,
getEGDefFromFile,
} from '../csv-adapter/parser/event-grammar/event-grammar.service';

@Injectable()
export class EventService {
Expand Down Expand Up @@ -65,6 +69,10 @@ export class EventService {
await this.dimensionService.getDimensionGrammaModelByName(
eventGrammar.dimension[0].dimension.name.name,
);
const { eventGrammarDef, instrumentField } = await getEGDefFromFile(
eventGrammar.file,
);

return this.prisma.eventGrammar
.create({
data: {
Expand All @@ -82,6 +90,8 @@ export class EventService {
},
},
]),
metric: eventGrammar.instrument_field,
egSchema: JSON.stringify(eventGrammarDef),
instrument: {
connect: {
name: 'COUNTER', //TODO: Change this to eventGrammar.instrument.name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ export class QueryBuilderService {
.slice(0, -2)
.concat(uniqueStatements)
.concat(');');
console.log('sql:', createStatement);
// console.log('sql:', createStatement);
// console.log('schema: ', schema);
}

Expand Down

0 comments on commit ef6f60b

Please sign in to comment.