From 86d5256177ce0e9b73bae466f89ce2b90a54cf75 Mon Sep 17 00:00:00 2001 From: pandutibil Date: Mon, 11 Sep 2023 01:00:56 +0530 Subject: [PATCH] Event and dimension validation APIs are implemented --- .../controller/ingestion.controller.ts | 86 ++++++++++++++++++- src/ingestion/ingestion.module.ts | 3 +- .../services/dimension/dimension.service.ts | 53 +++++++++++- src/ingestion/services/event/event.service.ts | 53 +++++++++++- src/ingestion/services/generic-function.ts | 2 +- .../services/grammar/grammar.service.spec.ts | 18 ++++ .../services/grammar/grammar.service.ts | 24 ++++++ 7 files changed, 232 insertions(+), 7 deletions(-) create mode 100644 src/ingestion/services/grammar/grammar.service.spec.ts create mode 100644 src/ingestion/services/grammar/grammar.service.ts diff --git a/src/ingestion/controller/ingestion.controller.ts b/src/ingestion/controller/ingestion.controller.ts index e3d9026..741eaf6 100644 --- a/src/ingestion/controller/ingestion.controller.ts +++ b/src/ingestion/controller/ingestion.controller.ts @@ -21,7 +21,7 @@ import { UploadedFile, UseInterceptors, Put, - UseGuards, Req + UseGuards, Req, Param } from '@nestjs/common'; import {DatasetService} from '../services/dataset/dataset.service'; import {DimensionService} from '../services/dimension/dimension.service'; @@ -41,6 +41,29 @@ import {V4DataEmissionService} from "../services/v4-data-emission/v4-data-emissi import {JwtGuard} from '../../guards/jwt.guard'; import * as jwt from 'jsonwebtoken'; import { NvskApiService } from '../services/nvsk-api/nvsk-api.service'; +import { GrammarService } from '../services/grammar/grammar.service'; +import { GenericFunction } from '../services/generic-function'; + +let validateBodySchema = { + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": [ + "event", + "dimension" + ] + }, + "id": { + "type": "string", + "shouldnotnull": true + } + }, + "required": [ + "type", + "id" + ] +}; @ApiTags('ingestion') @Controller('') @@ -50,7 +73,9 @@ export class IngestionController { , private eventService: EventService, private csvImportService: CsvImportService, private fileStatus: FileStatusService, private updateFileStatus: UpdateFileStatusService, private databaseService: DatabaseService, private dataEmissionService: DataEmissionService, private v4DataEmissionService: V4DataEmissionService, private rawDataImportService:RawDataImportService, - private nvskService:NvskApiService) { + private nvskService:NvskApiService, + private grammarService: GrammarService, + private service: GenericFunction) { } @Get('generatejwt') @@ -256,6 +281,61 @@ export class IngestionController { } } -} + @Get('/grammar/:type') + async fetchGrammar(@Param() params: any, @Res()response: Response){ + try { + const { type } = params; + let result; + if (type === "event") { + result = await this.grammarService.getEventSchemas(); + } else if (type === "dimension") { + result = await this.grammarService.getDimensionSchemas(); + } else { + throw new Error(`Invalid type found ${type}`); + } + + response.status(200).send(result); + } catch (e) { + console.error('ingestion.controller.nvskAPI service: ', e.message); + throw new Error(e); + } + } + + @UseInterceptors(FileInterceptor('file', { + storage: diskStorage({ + destination: './files', + }) + })) + + @Post('/validate') + @ApiConsumes('multipart/form-data') + async validateEventOrDimension(@Body() body: any, @Res()response: Response, @UploadedFile( + new ParseFilePipe({ + validators: [ + new FileIsDefinedValidator(), + new FileTypeValidator({fileType: 'text/csv'}), + ], + }), + ) file: Express.Multer.File, @Req() request: Request) { + try { + let isValidSchema: any = await this.service.ajvValidator(validateBodySchema, body); + if (isValidSchema && isValidSchema.errors) { + response.status(400).send({error: isValidSchema.errors}); + } else { + let result: any; + if (body.type === "dimension") { + result = await this.dimensionService.validateDimension(body, file, request); + } else if (body.type === "event") { + result = await this.eventService.validateEvent(body, file, request); + } + response.status(200).send({data: result}); + } + } catch (e) { + console.error('ingestion.controller.csv: ', e); + response.status(400).send({message: e.error || e.message || e}); + // throw new Error(e); + } + } +} diff --git a/src/ingestion/ingestion.module.ts b/src/ingestion/ingestion.module.ts index dadfa80..41a5b82 100644 --- a/src/ingestion/ingestion.module.ts +++ b/src/ingestion/ingestion.module.ts @@ -16,11 +16,12 @@ import {UploadService} from "./services/file-uploader-service"; import { RawDataImportService } from './services/rawDataImport/rawDataImport.service'; import { NvskApiService } from './services/nvsk-api/nvsk-api.service'; import { DateService } from './services/dateService'; +import { GrammarService } from './services/grammar/grammar.service'; @Module({ controllers: [IngestionController], providers: [DatasetService, DimensionService, EventService, GenericFunction, HttpCustomService, CsvImportService, FileStatusService, - UpdateFileStatusService, DataEmissionService, V4DataEmissionService, UploadService,RawDataImportService,NvskApiService,DateService], + UpdateFileStatusService, DataEmissionService, V4DataEmissionService, UploadService,RawDataImportService,NvskApiService,DateService,GrammarService], imports: [DatabaseModule, HttpModule] }) export class IngestionModule { diff --git a/src/ingestion/services/dimension/dimension.service.ts b/src/ingestion/services/dimension/dimension.service.ts index 96d2b72..a3a84d3 100644 --- a/src/ingestion/services/dimension/dimension.service.ts +++ b/src/ingestion/services/dimension/dimension.service.ts @@ -3,10 +3,14 @@ import {IngestionDatasetQuery} from '../../query/ingestionQuery'; import {DatabaseService} from '../../../database/database.service'; import {GenericFunction} from '../generic-function'; import {UploadService} from '../file-uploader-service'; +import { Request } from 'express'; +import { GrammarService } from '../grammar/grammar.service'; +const fs = require('fs'); +const {parse} = require('@fast-csv/parse'); @Injectable() export class DimensionService { - constructor(private DatabaseService: DatabaseService, private service: GenericFunction, private uploadService: UploadService) { + constructor(private DatabaseService: DatabaseService, private service: GenericFunction, private uploadService: UploadService, private grammarService: GrammarService) { } async createDimension(inputData) { @@ -118,4 +122,51 @@ export class DimensionService { throw new Error(e); } } + + async validateDimension(inputData, file: Express.Multer.File, request?: Request) { + return new Promise(async (resolve, reject) => { + const { id } = inputData; + const fileCompletePath = file.path; + const fileSize = file.size; + const uploadedFileName = file.originalname; + const dimensionGrammar = await this.grammarService.getDimensionSchemaByID(+id); + + if (dimensionGrammar?.length > 0) { + const schema = dimensionGrammar[0].schema; + const rows = []; + const csvReadStream = fs.createReadStream(fileCompletePath) + .pipe(parse({headers: true})) + .on('data', async (csvrow) => { + this.service.formatDataToCSVBySchema(csvrow, schema, false); + const isValidSchema: any = await this.service.ajvValidator(schema, csvrow); + if (isValidSchema.errors) { + csvrow['error_description'] = isValidSchema.errors; + } + + rows.push(csvrow); + }) + .on('error', async (err) => { + // delete the file + try { + await fs.unlinkSync(fileCompletePath); + reject(`Error -> file stream error ${err}`); + } catch (e) { + console.error('csvImport.service.file delete error: ', e); + reject(`csvImport.service.file delete error: ${e}`); + } + }) + .on('end', async () => { + try { + await fs.unlinkSync(fileCompletePath); + resolve(rows); + } catch (e) { + console.error('csvImport.service.file delete error: ', e); + reject('csvImport.service.file delete error: ' + e); + } + }); + } else { + reject('No grammar found for the given id'); + } + }); + } } diff --git a/src/ingestion/services/event/event.service.ts b/src/ingestion/services/event/event.service.ts index 6886e95..a6d273c 100644 --- a/src/ingestion/services/event/event.service.ts +++ b/src/ingestion/services/event/event.service.ts @@ -3,10 +3,14 @@ import { IngestionDatasetQuery } from '../../query/ingestionQuery'; import { DatabaseService } from '../../../database/database.service'; import { GenericFunction } from '../generic-function'; import { UploadService } from '../file-uploader-service'; +import { GrammarService } from '../grammar/grammar.service'; +import { Request } from 'express'; +const fs = require('fs'); +const {parse} = require('@fast-csv/parse'); @Injectable() export class EventService { - constructor(private DatabaseService: DatabaseService, private service: GenericFunction, private uploadService: UploadService) { + constructor(private DatabaseService: DatabaseService, private service: GenericFunction, private uploadService: UploadService, private grammarService: GrammarService) { } async createEvent(inputData) { @@ -146,4 +150,51 @@ export class EventService { throw new Error(e); } } + + async validateEvent(inputData, file: Express.Multer.File, request?: Request) { + return new Promise(async (resolve, reject) => { + const { id } = inputData; + const fileCompletePath = file.path; + const fileSize = file.size; + const uploadedFileName = file.originalname; + const eventGrammar = await this.grammarService.getEventSchemaByID(+id); + + if (eventGrammar?.length > 0) { + const schema = eventGrammar[0].schema; + const rows = []; + const csvReadStream = fs.createReadStream(fileCompletePath) + .pipe(parse({headers: true})) + .on('data', async (csvrow) => { + this.service.formatDataToCSVBySchema(csvrow, schema, false); + const isValidSchema: any = await this.service.ajvValidator(schema, csvrow); + if (isValidSchema.errors) { + csvrow['error_description'] = isValidSchema.errors; + } + + rows.push(csvrow); + }) + .on('error', async (err) => { + // delete the file + try { + await fs.unlinkSync(fileCompletePath); + reject(`Error -> file stream error ${err}`); + } catch (e) { + console.error('csvImport.service.file delete error: ', e); + reject(`csvImport.service.file delete error: ${e}`); + } + }) + .on('end', async () => { + try { + await fs.unlinkSync(fileCompletePath); + resolve(rows); + } catch (e) { + console.error('csvImport.service.file delete error: ', e); + reject('csvImport.service.file delete error: ' + e); + } + }); + } else { + reject('No grammar found for the given id'); + } + }); + } } \ No newline at end of file diff --git a/src/ingestion/services/generic-function.ts b/src/ingestion/services/generic-function.ts index 4e3b6bd..d204f06 100644 --- a/src/ingestion/services/generic-function.ts +++ b/src/ingestion/services/generic-function.ts @@ -7,7 +7,7 @@ import * as fs from 'fs'; const path = require('path'); const csv = require('fast-csv'); -const ajv = new Ajv2019(); +const ajv = new Ajv2019({allErrors: true}); addFormats(ajv); const ObjectsToCsv = require('objects-to-csv'); diff --git a/src/ingestion/services/grammar/grammar.service.spec.ts b/src/ingestion/services/grammar/grammar.service.spec.ts new file mode 100644 index 0000000..4c8272f --- /dev/null +++ b/src/ingestion/services/grammar/grammar.service.spec.ts @@ -0,0 +1,18 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { GrammarService } from './grammar.service'; + +describe('GrammarService', () => { + let service: GrammarService; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [GrammarService], + }).compile(); + + service = module.get(GrammarService); + }); + + it('should be defined', () => { + expect(service).toBeDefined(); + }); +}); diff --git a/src/ingestion/services/grammar/grammar.service.ts b/src/ingestion/services/grammar/grammar.service.ts new file mode 100644 index 0000000..5bf56c8 --- /dev/null +++ b/src/ingestion/services/grammar/grammar.service.ts @@ -0,0 +1,24 @@ +import { Injectable } from '@nestjs/common'; +import { DatabaseService } from 'src/database/database.service'; + +@Injectable() +export class GrammarService { + constructor(private _databaseService: DatabaseService) { + } + + async getEventSchemas() { + return await this._databaseService.executeQuery(`select id, name, schema from spec."EventGrammar"`); + } + + async getDimensionSchemas() { + return await this._databaseService.executeQuery(`select id, name, schema from spec."DimensionGrammar"`); + } + + async getEventSchemaByID(id) { + return await this._databaseService.executeQuery(`select id, name, schema from spec."EventGrammar" where id = $1`, [id]); + } + + async getDimensionSchemaByID(id) { + return await this._databaseService.executeQuery(`select id, name, schema from spec."DimensionGrammar" where id = $1`, [id]); + } +}