Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/dev' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
dhanush-2397 committed Sep 14, 2023
2 parents 46a8cf3 + 879e46a commit f152c51
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 3 deletions.
18 changes: 18 additions & 0 deletions src/ingestion/controller/ingestion.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,24 @@ export class IngestionController {
}
}

@Post('/telemetryEvent')
@UseGuards(JwtGuard)
async createTelemetryEvent(@Body() inputData: IEvent, @Res()response: Response){
try {
let result: Result = await this.eventService.createTelemetryEvent(inputData);
if (result.code == 400) {
response.status(400).send({"message": result.error});
} else {
response.status(200).send({
"message": result.message, invalid_record_count: result.errorCounter,
valid_record_count: result.validCounter
});
}
} catch (error) {

}
}

@UseInterceptors(FileInterceptor('file', {
storage: diskStorage({
destination: './files',
Expand Down
73 changes: 72 additions & 1 deletion src/ingestion/services/event/event.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,5 +196,76 @@ export class EventService {
reject('No grammar found for the given id');
}
});
}
}

async createTelemetryEvent(inputData) {
try {
if (inputData?.event_name) {
let eventName = inputData?.event_name;
let queryStr = await IngestionDatasetQuery.getEvents(eventName);
const queryResult = await this.DatabaseService.executeQuery(queryStr?.query, queryStr?.values);
if (queryResult?.length === 1) {
let validArray = [], invalidArray = [];
if (inputData?.event && inputData?.event.length > 0) {
for (let record of inputData.event) {
let schema = queryResult[0].schema;
validArray.push(await this.service.formatDataToCSVBySchema(record, schema));
}
let file;
if (validArray?.length > 0) {
if(!inputData?.isTelemetryWritingEnd){
file = `./emission-files/` + eventName + '.csv';
await this.service.writeTelemetryToCSV(file, validArray);
}
}

queryStr = await IngestionDatasetQuery.updateTotalCounter(inputData.file_tracker_pid);
await this.DatabaseService.executeQuery(queryStr.query, queryStr.values);

invalidArray = undefined;
validArray = undefined;
return {
code: 200,
message: "Event added to file successfully"
}
} else {
if(inputData?.isTelemetryWritingEnd){
let filePath = `./emission-files/` + eventName + ".csv";
let folderName = await this.service.getDate();
if (process.env.STORAGE_TYPE === 'local') {
await this.uploadService.uploadFiles('local', `${process.env.MINIO_BUCKET}`, filePath, `emission/${folderName}/`);
} else if (process.env.STORAGE_TYPE === 'azure') {
await this.uploadService.uploadFiles('azure', `${process.env.AZURE_CONTAINER}`, filePath, `emission/${folderName}/`);
} else if (process.env.STORAGE_TYPE === 'oracle') {
await this.uploadService.uploadFiles('oracle', `${process.env.ORACLE_BUCKET}`, filePath, `emission/${folderName}/`);
} else {
await this.uploadService.uploadFiles('aws', `${process.env.AWS_BUCKET}`, filePath, `emission/${folderName}/`);
}
// delete the file
await this.service.deleteLocalFile(filePath)
return {
code: 200,
message: "Event data uploaded successfully"
}
}

}
}
else {
return {
code: 400,
error: "No Event"
}
}
} else {
return {
code: 400,
error: "Event name is missing"
}
}
} catch (e) {
console.error('create-event-impl.executeQueryAndReturnResults: ', e.message);
throw new Error(e);
}
}
}
4 changes: 2 additions & 2 deletions src/ingestion/services/grammar/grammar.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ export class GrammarService {
}

async getEventSchemas() {
return await this._databaseService.executeQuery(`select id, name, schema from spec."EventGrammar"`);
return await this._databaseService.executeQuery(`select id, name, schema from spec."EventGrammar" WHERE eventType='EXTERNAL'`);
}

async getDimensionSchemas() {
return await this._databaseService.executeQuery(`select id, name, schema from spec."DimensionGrammar"`);
return await this._databaseService.executeQuery(`select id, name, schema from spec."DimensionGrammar" WHERE dimensionType='EXTERNAL'`);
}

async getEventSchemaByID(id) {
Expand Down

0 comments on commit f152c51

Please sign in to comment.