diff --git a/src/ingestion/controller/ingestion.controller.ts b/src/ingestion/controller/ingestion.controller.ts index 1a8bad0..aa7d6b7 100644 --- a/src/ingestion/controller/ingestion.controller.ts +++ b/src/ingestion/controller/ingestion.controller.ts @@ -158,6 +158,24 @@ export class IngestionController { } } + @Post('/telemetryEvent') + @UseGuards(JwtGuard) + async createTelemetryEvent(@Body() inputData: IEvent, @Res()response: Response){ + try { + let result: Result = await this.eventService.createTelemetryEvent(inputData); + if (result.code == 400) { + response.status(400).send({"message": result.error}); + } else { + response.status(200).send({ + "message": result.message, invalid_record_count: result.errorCounter, + valid_record_count: result.validCounter + }); + } + } catch (error) { + + } + } + @UseInterceptors(FileInterceptor('file', { storage: diskStorage({ destination: './files', diff --git a/src/ingestion/services/event/event.service.ts b/src/ingestion/services/event/event.service.ts index a6d273c..134d890 100644 --- a/src/ingestion/services/event/event.service.ts +++ b/src/ingestion/services/event/event.service.ts @@ -196,5 +196,76 @@ export class EventService { reject('No grammar found for the given id'); } }); - } + } + + async createTelemetryEvent(inputData) { + try { + if (inputData?.event_name) { + let eventName = inputData?.event_name; + let queryStr = await IngestionDatasetQuery.getEvents(eventName); + const queryResult = await this.DatabaseService.executeQuery(queryStr?.query, queryStr?.values); + if (queryResult?.length === 1) { + let validArray = [], invalidArray = []; + if (inputData?.event && inputData?.event.length > 0) { + for (let record of inputData.event) { + let schema = queryResult[0].schema; + validArray.push(await this.service.formatDataToCSVBySchema(record, schema)); + } + let file; + if (validArray?.length > 0) { + if(!inputData?.isTelemetryWritingEnd){ + file = `./emission-files/` + eventName + '.csv'; + await this.service.writeTelemetryToCSV(file, validArray); + } + } + + queryStr = await IngestionDatasetQuery.updateTotalCounter(inputData.file_tracker_pid); + await this.DatabaseService.executeQuery(queryStr.query, queryStr.values); + + invalidArray = undefined; + validArray = undefined; + return { + code: 200, + message: "Event added to file successfully" + } + } else { + if(inputData?.isTelemetryWritingEnd){ + let filePath = `./emission-files/` + eventName + ".csv"; + let folderName = await this.service.getDate(); + if (process.env.STORAGE_TYPE === 'local') { + await this.uploadService.uploadFiles('local', `${process.env.MINIO_BUCKET}`, filePath, `emission/${folderName}/`); + } else if (process.env.STORAGE_TYPE === 'azure') { + await this.uploadService.uploadFiles('azure', `${process.env.AZURE_CONTAINER}`, filePath, `emission/${folderName}/`); + } else if (process.env.STORAGE_TYPE === 'oracle') { + await this.uploadService.uploadFiles('oracle', `${process.env.ORACLE_BUCKET}`, filePath, `emission/${folderName}/`); + } else { + await this.uploadService.uploadFiles('aws', `${process.env.AWS_BUCKET}`, filePath, `emission/${folderName}/`); + } + // delete the file + await this.service.deleteLocalFile(filePath) + return { + code: 200, + message: "Event data uploaded successfully" + } + } + + } + } + else { + return { + code: 400, + error: "No Event" + } + } + } else { + return { + code: 400, + error: "Event name is missing" + } + } + } catch (e) { + console.error('create-event-impl.executeQueryAndReturnResults: ', e.message); + throw new Error(e); + } + } } \ No newline at end of file diff --git a/src/ingestion/services/grammar/grammar.service.ts b/src/ingestion/services/grammar/grammar.service.ts index 5bf56c8..6605dc6 100644 --- a/src/ingestion/services/grammar/grammar.service.ts +++ b/src/ingestion/services/grammar/grammar.service.ts @@ -7,11 +7,11 @@ export class GrammarService { } async getEventSchemas() { - return await this._databaseService.executeQuery(`select id, name, schema from spec."EventGrammar"`); + return await this._databaseService.executeQuery(`select id, name, schema from spec."EventGrammar" WHERE eventType='EXTERNAL'`); } async getDimensionSchemas() { - return await this._databaseService.executeQuery(`select id, name, schema from spec."DimensionGrammar"`); + return await this._databaseService.executeQuery(`select id, name, schema from spec."DimensionGrammar" WHERE dimensionType='EXTERNAL'`); } async getEventSchemaByID(id) {