From cceaa0b1eb32af16260b3230b67b179e1c1446a7 Mon Sep 17 00:00:00 2001 From: htvenkatesh <68595990+htvenkatesh@users.noreply.github.com> Date: Tue, 29 Aug 2023 15:07:13 +0530 Subject: [PATCH 1/7] Update package.json --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index d33ee5e..63658ec 100644 --- a/package.json +++ b/package.json @@ -63,7 +63,7 @@ "@types/express": "^4.17.13", "@types/jest": "28.1.8", "@types/mime-types": "^2.1.1", - "@types/minio": "^7.0.17", + "@types/minio": "7.0.18", "@types/multer": "^1.4.7", "@types/node": "^16.0.0", "@types/supertest": "^2.0.11", From ef464634d9cf941568b575105070aa0693139281 Mon Sep 17 00:00:00 2001 From: htvenkatesh <68595990+htvenkatesh@users.noreply.github.com> Date: Tue, 12 Sep 2023 20:30:49 +0530 Subject: [PATCH 2/7] Update upload-dimension-file.service.ts --- .../upload-dimension-file/upload-dimension-file.service.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingestion/services/upload-dimension-file/upload-dimension-file.service.ts b/src/ingestion/services/upload-dimension-file/upload-dimension-file.service.ts index e563ad7..13ac2f2 100644 --- a/src/ingestion/services/upload-dimension-file/upload-dimension-file.service.ts +++ b/src/ingestion/services/upload-dimension-file/upload-dimension-file.service.ts @@ -8,7 +8,7 @@ export class UploadDimensionFileService { constructor(private httpService:HttpCustomService){} async uploadFiles(){ - let folderPath = './dimension-files' + let folderPath = './dimension_files' try{ let result = await this.httpService.get(process.env.URL + '/generatejwt') let token: any = result?.data; From b3b00b2432a675d183b9e70db791a85d7c4bf789 Mon Sep 17 00:00:00 2001 From: pandutibil Date: Thu, 14 Sep 2023 11:28:42 +0530 Subject: [PATCH 3/7] External schema check added for grammar service --- src/ingestion/services/grammar/grammar.service.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ingestion/services/grammar/grammar.service.ts b/src/ingestion/services/grammar/grammar.service.ts index 5bf56c8..6605dc6 100644 --- a/src/ingestion/services/grammar/grammar.service.ts +++ b/src/ingestion/services/grammar/grammar.service.ts @@ -7,11 +7,11 @@ export class GrammarService { } async getEventSchemas() { - return await this._databaseService.executeQuery(`select id, name, schema from spec."EventGrammar"`); + return await this._databaseService.executeQuery(`select id, name, schema from spec."EventGrammar" WHERE eventType='EXTERNAL'`); } async getDimensionSchemas() { - return await this._databaseService.executeQuery(`select id, name, schema from spec."DimensionGrammar"`); + return await this._databaseService.executeQuery(`select id, name, schema from spec."DimensionGrammar" WHERE dimensionType='EXTERNAL'`); } async getEventSchemaByID(id) { From 4409cd611187c80a632a3e716a2ce54c6a041222 Mon Sep 17 00:00:00 2001 From: htvenkatesh Date: Thu, 14 Sep 2023 12:40:51 +0530 Subject: [PATCH 4/7] changed telemetry calling function --- .../controller/ingestion.controller.ts | 18 +++++ src/ingestion/services/event/event.service.ts | 77 ++++++++++++++++++- 2 files changed, 94 insertions(+), 1 deletion(-) diff --git a/src/ingestion/controller/ingestion.controller.ts b/src/ingestion/controller/ingestion.controller.ts index 98d397a..f4dd167 100644 --- a/src/ingestion/controller/ingestion.controller.ts +++ b/src/ingestion/controller/ingestion.controller.ts @@ -158,6 +158,24 @@ export class IngestionController { } } + @Post('/telemetryEvent') + @UseGuards(JwtGuard) + async createTelemetryEvent(@Body() inputData: IEvent, @Res()response: Response){ + try { + let result: Result = await this.eventService.createTelemetryEvent(inputData); + if (result.code == 400) { + response.status(400).send({"message": result.error}); + } else { + response.status(200).send({ + "message": result.message, invalid_record_count: result.errorCounter, + valid_record_count: result.validCounter + }); + } + } catch (error) { + + } + } + @UseInterceptors(FileInterceptor('file', { storage: diskStorage({ destination: './files', diff --git a/src/ingestion/services/event/event.service.ts b/src/ingestion/services/event/event.service.ts index a6d273c..b301649 100644 --- a/src/ingestion/services/event/event.service.ts +++ b/src/ingestion/services/event/event.service.ts @@ -196,5 +196,80 @@ export class EventService { reject('No grammar found for the given id'); } }); - } + } + + async createTelemetryEvent(inputData) { + try { + if (inputData.event_name) { + let eventName = inputData.event_name; + let queryStr = await IngestionDatasetQuery.getEvents(eventName); + const queryResult = await this.DatabaseService.executeQuery(queryStr.query, queryStr.values); + if (queryResult?.length === 1) { + if(inputData?.isTelemetryWritingEnd){ + let filePath = `./emission-files/` + eventName + ".csv"; + let folderName = await this.service.getDate(); + if (process.env.STORAGE_TYPE === 'local') { + await this.uploadService.uploadFiles('local', `${process.env.MINIO_BUCKET}`, filePath, `emission/${folderName}/`); + } else if (process.env.STORAGE_TYPE === 'azure') { + await this.uploadService.uploadFiles('azure', `${process.env.AZURE_CONTAINER}`, filePath, `emission/${folderName}/`); + } else if (process.env.STORAGE_TYPE === 'oracle') { + await this.uploadService.uploadFiles('oracle', `${process.env.ORACLE_BUCKET}`, filePath, `emission/${folderName}/`); + } else { + await this.uploadService.uploadFiles('aws', `${process.env.AWS_BUCKET}`, filePath, `emission/${folderName}/`); + } + // delete the file + await this.service.deleteLocalFile(filePath) + return { + code: 200, + message: "Event data uploaded successfully" + } + } + + let validArray = [], invalidArray = []; + if (inputData.event && inputData.event.length > 0) { + for (let record of inputData.event) { + let schema = queryResult[0].schema; + validArray.push(await this.service.formatDataToCSVBySchema(record, schema)); + } + let file; + if (validArray.length > 0) { + if(!inputData?.isTelemetryWritingEnd){ + file = `./emission-files/` + eventName + '.csv'; + await this.service.writeTelemetryToCSV(file, validArray); + } + } + + queryStr = await IngestionDatasetQuery.updateTotalCounter(inputData.file_tracker_pid); + await this.DatabaseService.executeQuery(queryStr.query, queryStr.values); + + invalidArray = undefined; + validArray = undefined; + return { + code: 200, + message: "Event added successfully" + } + } else { + return { + code: 400, + error: "Event array is required and cannot be empty" + } + } + } + else { + return { + code: 400, + error: "No event found" + } + } + } else { + return { + code: 400, + error: "Event name is missing" + } + } + } catch (e) { + console.error('create-event-impl.executeQueryAndReturnResults: ', e.message); + throw new Error(e); + } + } } \ No newline at end of file From d34c98885299ceea67fdce80b96f4d128b33ca0f Mon Sep 17 00:00:00 2001 From: htvenkatesh Date: Thu, 14 Sep 2023 12:48:06 +0530 Subject: [PATCH 5/7] changes for telemetry --- src/ingestion/services/event/event.service.ts | 41 ++++++++----------- 1 file changed, 18 insertions(+), 23 deletions(-) diff --git a/src/ingestion/services/event/event.service.ts b/src/ingestion/services/event/event.service.ts index b301649..49b6010 100644 --- a/src/ingestion/services/event/event.service.ts +++ b/src/ingestion/services/event/event.service.ts @@ -205,26 +205,6 @@ export class EventService { let queryStr = await IngestionDatasetQuery.getEvents(eventName); const queryResult = await this.DatabaseService.executeQuery(queryStr.query, queryStr.values); if (queryResult?.length === 1) { - if(inputData?.isTelemetryWritingEnd){ - let filePath = `./emission-files/` + eventName + ".csv"; - let folderName = await this.service.getDate(); - if (process.env.STORAGE_TYPE === 'local') { - await this.uploadService.uploadFiles('local', `${process.env.MINIO_BUCKET}`, filePath, `emission/${folderName}/`); - } else if (process.env.STORAGE_TYPE === 'azure') { - await this.uploadService.uploadFiles('azure', `${process.env.AZURE_CONTAINER}`, filePath, `emission/${folderName}/`); - } else if (process.env.STORAGE_TYPE === 'oracle') { - await this.uploadService.uploadFiles('oracle', `${process.env.ORACLE_BUCKET}`, filePath, `emission/${folderName}/`); - } else { - await this.uploadService.uploadFiles('aws', `${process.env.AWS_BUCKET}`, filePath, `emission/${folderName}/`); - } - // delete the file - await this.service.deleteLocalFile(filePath) - return { - code: 200, - message: "Event data uploaded successfully" - } - } - let validArray = [], invalidArray = []; if (inputData.event && inputData.event.length > 0) { for (let record of inputData.event) { @@ -256,9 +236,24 @@ export class EventService { } } else { - return { - code: 400, - error: "No event found" + if(inputData?.isTelemetryWritingEnd){ + let filePath = `./emission-files/` + eventName + ".csv"; + let folderName = await this.service.getDate(); + if (process.env.STORAGE_TYPE === 'local') { + await this.uploadService.uploadFiles('local', `${process.env.MINIO_BUCKET}`, filePath, `emission/${folderName}/`); + } else if (process.env.STORAGE_TYPE === 'azure') { + await this.uploadService.uploadFiles('azure', `${process.env.AZURE_CONTAINER}`, filePath, `emission/${folderName}/`); + } else if (process.env.STORAGE_TYPE === 'oracle') { + await this.uploadService.uploadFiles('oracle', `${process.env.ORACLE_BUCKET}`, filePath, `emission/${folderName}/`); + } else { + await this.uploadService.uploadFiles('aws', `${process.env.AWS_BUCKET}`, filePath, `emission/${folderName}/`); + } + // delete the file + await this.service.deleteLocalFile(filePath) + return { + code: 200, + message: "Event data uploaded successfully" + } } } } else { From 46127aaf52eee3ace1a14a821175c9cfb087d575 Mon Sep 17 00:00:00 2001 From: htvenkatesh Date: Thu, 14 Sep 2023 12:49:40 +0530 Subject: [PATCH 6/7] chages --- src/ingestion/services/event/event.service.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/ingestion/services/event/event.service.ts b/src/ingestion/services/event/event.service.ts index 49b6010..31f8b8e 100644 --- a/src/ingestion/services/event/event.service.ts +++ b/src/ingestion/services/event/event.service.ts @@ -200,10 +200,10 @@ export class EventService { async createTelemetryEvent(inputData) { try { - if (inputData.event_name) { - let eventName = inputData.event_name; + if (inputData?.event_name) { + let eventName = inputData?.event_name; let queryStr = await IngestionDatasetQuery.getEvents(eventName); - const queryResult = await this.DatabaseService.executeQuery(queryStr.query, queryStr.values); + const queryResult = await this.DatabaseService.executeQuery(queryStr?.query, queryStr?.values); if (queryResult?.length === 1) { let validArray = [], invalidArray = []; if (inputData.event && inputData.event.length > 0) { From 879e46ad3742c11cc853a8e54b16e5f2fc7be2db Mon Sep 17 00:00:00 2001 From: htvenkatesh Date: Thu, 14 Sep 2023 12:57:12 +0530 Subject: [PATCH 7/7] condition changed for telemetry --- src/ingestion/services/event/event.service.ts | 49 ++++++++++--------- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/src/ingestion/services/event/event.service.ts b/src/ingestion/services/event/event.service.ts index 31f8b8e..134d890 100644 --- a/src/ingestion/services/event/event.service.ts +++ b/src/ingestion/services/event/event.service.ts @@ -206,13 +206,13 @@ export class EventService { const queryResult = await this.DatabaseService.executeQuery(queryStr?.query, queryStr?.values); if (queryResult?.length === 1) { let validArray = [], invalidArray = []; - if (inputData.event && inputData.event.length > 0) { + if (inputData?.event && inputData?.event.length > 0) { for (let record of inputData.event) { let schema = queryResult[0].schema; validArray.push(await this.service.formatDataToCSVBySchema(record, schema)); } let file; - if (validArray.length > 0) { + if (validArray?.length > 0) { if(!inputData?.isTelemetryWritingEnd){ file = `./emission-files/` + eventName + '.csv'; await this.service.writeTelemetryToCSV(file, validArray); @@ -226,34 +226,35 @@ export class EventService { validArray = undefined; return { code: 200, - message: "Event added successfully" + message: "Event added to file successfully" } } else { - return { - code: 400, - error: "Event array is required and cannot be empty" + if(inputData?.isTelemetryWritingEnd){ + let filePath = `./emission-files/` + eventName + ".csv"; + let folderName = await this.service.getDate(); + if (process.env.STORAGE_TYPE === 'local') { + await this.uploadService.uploadFiles('local', `${process.env.MINIO_BUCKET}`, filePath, `emission/${folderName}/`); + } else if (process.env.STORAGE_TYPE === 'azure') { + await this.uploadService.uploadFiles('azure', `${process.env.AZURE_CONTAINER}`, filePath, `emission/${folderName}/`); + } else if (process.env.STORAGE_TYPE === 'oracle') { + await this.uploadService.uploadFiles('oracle', `${process.env.ORACLE_BUCKET}`, filePath, `emission/${folderName}/`); + } else { + await this.uploadService.uploadFiles('aws', `${process.env.AWS_BUCKET}`, filePath, `emission/${folderName}/`); + } + // delete the file + await this.service.deleteLocalFile(filePath) + return { + code: 200, + message: "Event data uploaded successfully" + } } + } } else { - if(inputData?.isTelemetryWritingEnd){ - let filePath = `./emission-files/` + eventName + ".csv"; - let folderName = await this.service.getDate(); - if (process.env.STORAGE_TYPE === 'local') { - await this.uploadService.uploadFiles('local', `${process.env.MINIO_BUCKET}`, filePath, `emission/${folderName}/`); - } else if (process.env.STORAGE_TYPE === 'azure') { - await this.uploadService.uploadFiles('azure', `${process.env.AZURE_CONTAINER}`, filePath, `emission/${folderName}/`); - } else if (process.env.STORAGE_TYPE === 'oracle') { - await this.uploadService.uploadFiles('oracle', `${process.env.ORACLE_BUCKET}`, filePath, `emission/${folderName}/`); - } else { - await this.uploadService.uploadFiles('aws', `${process.env.AWS_BUCKET}`, filePath, `emission/${folderName}/`); - } - // delete the file - await this.service.deleteLocalFile(filePath) - return { - code: 200, - message: "Event data uploaded successfully" - } + return { + code: 400, + error: "No Event" } } } else {