From bcffad2e4b19715877c495ddb63817fa85aec944 Mon Sep 17 00:00:00 2001 From: dhanush-2397 Date: Tue, 12 Sep 2023 09:57:28 +0530 Subject: [PATCH] API's added --- package.json | 1 + src/database/database.service.ts | 6 ++- .../controller/ingestion.controller.ts | 23 ++++---- src/ingestion/ingestion.module.ts | 4 +- src/ingestion/interfaces/Ingestion-data.ts | 4 +- src/ingestion/query/ingestionQuery.ts | 8 +-- src/ingestion/services/dateService.ts | 6 ++- .../services/file-uploader-service.ts | 5 +- .../services/nvsk-api/nvsk-api.service.ts | 42 ++++++++++----- .../upload-dimension-file.service.spec.ts | 18 +++++++ .../upload-dimension-file.service.ts | 53 +++++++++++++++++++ .../v4-data-emission.service.ts | 34 ------------ 12 files changed, 134 insertions(+), 70 deletions(-) create mode 100644 src/ingestion/services/upload-dimension-file/upload-dimension-file.service.spec.ts create mode 100644 src/ingestion/services/upload-dimension-file/upload-dimension-file.service.ts delete mode 100644 src/ingestion/services/v4-data-emission/v4-data-emission.service.ts diff --git a/package.json b/package.json index 35e55d0..bc1e625 100644 --- a/package.json +++ b/package.json @@ -45,6 +45,7 @@ "csvtojson": "^2.0.10", "express": "^4.18.2", "fast-csv": "^4.3.6", + "form-data": "^4.0.0", "fs": "^0.0.1-security", "jsonwebtoken": "^9.0.0", "mime-types": "^2.1.35", diff --git a/src/database/database.service.ts b/src/database/database.service.ts index bfde38d..d173702 100644 --- a/src/database/database.service.ts +++ b/src/database/database.service.ts @@ -12,6 +12,10 @@ export class DatabaseService { this.logger.debug(`Executing query: ${queryText} (${values})`); return this.pool.query(queryText, values).then((result: QueryResult) => { return result.rows; - }); + }, + (error)=>{ + return error + } + ); } } \ No newline at end of file diff --git a/src/ingestion/controller/ingestion.controller.ts b/src/ingestion/controller/ingestion.controller.ts index e3d9026..2a1aa49 100644 --- a/src/ingestion/controller/ingestion.controller.ts +++ b/src/ingestion/controller/ingestion.controller.ts @@ -37,10 +37,10 @@ import {UpdateFileStatusService} from '../services/update-file-status/update-fil import {ApiConsumes, ApiTags} from '@nestjs/swagger'; import {DatabaseService} from '../../database/database.service'; import {DataEmissionService} from '../services/data-emission/data-emission.service'; -import {V4DataEmissionService} from "../services/v4-data-emission/v4-data-emission.service"; import {JwtGuard} from '../../guards/jwt.guard'; import * as jwt from 'jsonwebtoken'; import { NvskApiService } from '../services/nvsk-api/nvsk-api.service'; +import { UploadDimensionFileService } from '../services/upload-dimension-file/upload-dimension-file.service'; @ApiTags('ingestion') @Controller('') @@ -48,9 +48,10 @@ export class IngestionController { constructor( private datasetService: DatasetService, private dimensionService: DimensionService , private eventService: EventService, private csvImportService: CsvImportService, private fileStatus: FileStatusService, private updateFileStatus: UpdateFileStatusService, - private databaseService: DatabaseService, private dataEmissionService: DataEmissionService, private v4DataEmissionService: V4DataEmissionService, + private databaseService: DatabaseService, private dataEmissionService: DataEmissionService, private rawDataImportService:RawDataImportService, - private nvskService:NvskApiService) { + private nvskService:NvskApiService, + private uploadDimension:UploadDimensionFileService) { } @Get('generatejwt') @@ -212,24 +213,23 @@ export class IngestionController { } - @Get('/v4-data-emission') - @UseGuards(JwtGuard) + @Get('/upload-dimension') async dataEmission(@Res()response: Response) { try { - const result: any = await this.v4DataEmissionService.uploadFiles(); + const result: any = await this.uploadDimension.uploadFiles(); if (result.code == 400) { response.status(400).send({message: result.error}); } else { response.status(200).send({message: result.message}); } } catch (e) { - console.error('ingestion.controller.v4dataEmission: ', e.message); + console.error('ingestion.controller.uploadDimensionFiles: ', e.message); throw new Error(e); } } @Post('/getRawData') - // @UseGuards(JwtGuard) + @UseGuards(JwtGuard) async getPresignedUrls(@Body() inputData: RawDataPullBody, @Res()response: Response){ try { @@ -240,10 +240,11 @@ export class IngestionController { throw new Error(e); } } - @Get('/data-emitter') - async fetchData(@Body()inputData: NvskApiService,@Res()response: Response){ + @Post('/data-emitter') + @UseGuards(JwtGuard) + async fetchData(@Body()inputData:RawDataPullBody ,@Res()response: Response,@Req() request: Request){ try { - const result: any = await this.nvskService.getEmitterData(); + const result: any = await this.nvskService.getEmitterData(inputData?.program_names, request); console.log("result is", result); if (result?.code == 400) { response.status(400).send({message: result.error}); diff --git a/src/ingestion/ingestion.module.ts b/src/ingestion/ingestion.module.ts index dadfa80..3f2633d 100644 --- a/src/ingestion/ingestion.module.ts +++ b/src/ingestion/ingestion.module.ts @@ -11,16 +11,16 @@ import {CsvImportService} from "./services/csvImport/csvImport.service"; import {FileStatusService} from './services/file-status/file-status.service'; import {UpdateFileStatusService} from "./services/update-file-status/update-file-status.service"; import {DataEmissionService} from "./services/data-emission/data-emission.service"; -import {V4DataEmissionService} from "./services/v4-data-emission/v4-data-emission.service"; import {UploadService} from "./services/file-uploader-service"; import { RawDataImportService } from './services/rawDataImport/rawDataImport.service'; import { NvskApiService } from './services/nvsk-api/nvsk-api.service'; import { DateService } from './services/dateService'; +import { UploadDimensionFileService } from './services/upload-dimension-file/upload-dimension-file.service'; @Module({ controllers: [IngestionController], providers: [DatasetService, DimensionService, EventService, GenericFunction, HttpCustomService, CsvImportService, FileStatusService, - UpdateFileStatusService, DataEmissionService, V4DataEmissionService, UploadService,RawDataImportService,NvskApiService,DateService], + UpdateFileStatusService, DataEmissionService, UploadService,RawDataImportService,NvskApiService,DateService,UploadDimensionFileService], imports: [DatabaseModule, HttpModule] }) export class IngestionModule { diff --git a/src/ingestion/interfaces/Ingestion-data.ts b/src/ingestion/interfaces/Ingestion-data.ts index a035cc3..70d7daa 100644 --- a/src/ingestion/interfaces/Ingestion-data.ts +++ b/src/ingestion/interfaces/Ingestion-data.ts @@ -162,9 +162,9 @@ export class EmissionBody { export class RawDataPullBody{ @ApiProperty() - program_names: string[]; + program_names?: string[]; @ApiProperty() - token:string; + token?:string; } export interface RawDataResponse { diff --git a/src/ingestion/query/ingestionQuery.ts b/src/ingestion/query/ingestionQuery.ts index f1afec5..61eff68 100644 --- a/src/ingestion/query/ingestionQuery.ts +++ b/src/ingestion/query/ingestionQuery.ts @@ -79,9 +79,9 @@ export const IngestionDatasetQuery = { ingestion."FileTracker" where pid = $1`; return {query:queryStr,values: [fileTrackerPid]}; }, - async insertIntoEmission(program_name:string,presignedurl:string,file_status:string){ - const queryStr =`INSERT into emission."vsk_tracker"(program_name,presignedurl,file_status)VALUES - ($1,$2,$3)`; - return {query:queryStr,values:[program_name,presignedurl,file_status]} + async insertIntoEmission(program_name:string,presignedurl:string,jwt_token:string,file_status:string){ + const queryStr =`INSERT into emission."vsk_tracker"(program_name,presignedurl,jwt_token,file_status)VALUES + ($1,$2,$3,$4)`; + return {query:queryStr,values:[program_name,presignedurl,jwt_token,file_status]} } }; \ No newline at end of file diff --git a/src/ingestion/services/dateService.ts b/src/ingestion/services/dateService.ts index bf29907..479a5dc 100644 --- a/src/ingestion/services/dateService.ts +++ b/src/ingestion/services/dateService.ts @@ -24,9 +24,13 @@ export class DateService{ if (this.isTimeGreaterThan12PM(currentTime)) { // Convert to IST (add 5 hours and 30 minutes for the UTC+5:30 offset) const [hours, minutes] = currentTime.split(':'); + // const hour = parseInt(hours, 10) + 5; + // const minute = parseInt(minutes, 10) + 30; + console.log("Hours is:", hours); console.log("Minutes is:",minutes); - return [hours,minutes] + let min = parseInt(minutes) + 2 + return [hours,min] } else { return currentTime } diff --git a/src/ingestion/services/file-uploader-service.ts b/src/ingestion/services/file-uploader-service.ts index 4ca7b2c..6d23ca6 100644 --- a/src/ingestion/services/file-uploader-service.ts +++ b/src/ingestion/services/file-uploader-service.ts @@ -145,13 +145,14 @@ export class UploadService { signatureVersion: "v4", accessKeyId: process.env.AWS_ACCESS_KEY, secretAccessKey: process.env.AWS_SECRET_KEY, - region: 'ap-south-1' + region: process.env.AWS_REGION }); return new Promise((resolve, reject) => { // file name const params = { Bucket: process.env.AWS_BUCKET, Key: fileName, + Expires: 3600 }; s3.getSignedUrl('getObject', params, (err, url) => { if (err) { @@ -171,7 +172,7 @@ export class UploadService { secretKey: process.env.MINIO_SECRET_KEY, }); return new Promise((resolve, reject) => { - minioClient.presignedUrl('GET', process.env.MINIO_BUCKET, fileName, 24 * 60 * 60, function (err, presignedUrl) { + minioClient.presignedUrl('GET', process.env.MINIO_BUCKET, fileName, 3600, function (err, presignedUrl) { if (err) { console.log(err) } else { diff --git a/src/ingestion/services/nvsk-api/nvsk-api.service.ts b/src/ingestion/services/nvsk-api/nvsk-api.service.ts index f7da95e..0b7f748 100644 --- a/src/ingestion/services/nvsk-api/nvsk-api.service.ts +++ b/src/ingestion/services/nvsk-api/nvsk-api.service.ts @@ -6,6 +6,7 @@ import { Body, Injectable } from '@nestjs/common'; import axios from 'axios'; import { IngestionDatasetQuery } from 'src/ingestion/query/ingestionQuery'; import { DateService } from '../dateService'; +import { Request } from 'express'; const csv = require('csv-parser'); const fs = require('fs'); @Injectable() @@ -15,28 +16,36 @@ export class NvskApiService { } /* NVSK side implementations */ - async getEmitterData() { + async getEmitterData(inputData: string[],request: Request) { let urlData; - let names = process.env.PROGRAM_NAMES.split(',') + let names; + if(!inputData || inputData.length == 0){ + names = process.env.PROGRAM_NAMES?.split(',') + }else{ + names = inputData; + } let body: any = { "program_names": names } try { - const result = await this.httpService.post(process.env.NVSK_URL + '/getRawData', body) + const headers = { + Authorization: request.headers.authorization + }; + const result = await this.httpService.post(process.env.NVSK_URL + '/getRawData', body,{headers: headers}) if (result?.data['code'] === 200) { urlData = result?.data['data'] } else { console.log("Error ocurred::", JSON.stringify(result.data)); return { code: 400, error: result?.data['error'] ? result?.data['error'] : "Error occured during the NVSK data emission" } } - this.writeRawDataFromUrl(urlData) + this.writeRawDataFromUrl(urlData,headers.Authorization) return { code: 200, message: "VSK Writing to the file in process" } } catch (error) { return { code: 400, errorMsg: error } } } - async writeRawDataFromUrl(urlData: Array<{ program_name: string, urls: string[] }>) { + async writeRawDataFromUrl(urlData: Array<{ program_name: string, urls: string[] }>,jwtToken:string) { try { if (urlData?.length > 0) { for (let data of urlData) { @@ -44,6 +53,9 @@ export class NvskApiService { for (let url of data.urls) { const parsedUrl = new URL(url); const fileName = `./rawdata-files/` + parsedUrl.pathname.split('/').pop(); + if(fs.existsSync(fileName)){ + this.service.deleteLocalFile(fileName); + } const stream = (await axios.get(url, { responseType: 'stream' })).data const filteredCsvStream = fs.createWriteStream(`${fileName}`); let isFirstRow = true; @@ -74,8 +86,9 @@ export class NvskApiService { } this.service.deleteLocalFile(fileName) - const queryStr = await IngestionDatasetQuery.insertIntoEmission(pgname, url, 'Uploaded') + const queryStr = await IngestionDatasetQuery.insertIntoEmission(pgname, url,jwtToken.split(' ')[1],'Uploaded') const result = await this.databaseService.executeQuery(queryStr.query, queryStr.values) + console.log("The result is:", result); console.log(`Filtered data saved to ${fileName}`); } catch (error) { this.service.deleteLocalFile(fileName) @@ -83,8 +96,9 @@ export class NvskApiService { }) }) .on('error', async (error) => { - const queryStr = await IngestionDatasetQuery.insertIntoEmission(pgname, url, error) - const result = await this.databaseService.executeQuery(queryStr.query, queryStr.values) + const queryStr = await IngestionDatasetQuery.insertIntoEmission(pgname, url,jwtToken.split(' ')[1], error) + const result = await this.databaseService.executeQuery(queryStr.query, queryStr.values); + this.service.deleteLocalFile(fileName); console.error('Error processing CSV:', error); }); @@ -95,13 +109,15 @@ export class NvskApiService { let cronExpression = `0 ${dateResult[1]} ${dateResult[0]} * * ?` console.log("Cron expression is:", cronExpression); let url = `${process.env.SPEC_URL}` + '/schedule' - let body = { + let scheduleBody = { "processor_group_name": "Run_adapters", - "scheduled_at": `"${cronExpression}"` + "scheduled_at": `${cronExpression}` } - let scheduleResult = await this.httpService.post(url, body) - if (scheduleResult?.data['code'] === 200) { - return { code: 200, message: "successfully scheduled the adapters" } + console.log("The schedule body is:",scheduleBody) + let scheduleResult = await this.httpService.post(url, scheduleBody) + console.log('The scheudle result is:',scheduleResult?.data['message']); + if (scheduleResult.status === 200) { + return { code: 200, message: scheduleResult?.['data']['message'] } } else { return { code: 400, error: "Adapter schedule failed" } } diff --git a/src/ingestion/services/upload-dimension-file/upload-dimension-file.service.spec.ts b/src/ingestion/services/upload-dimension-file/upload-dimension-file.service.spec.ts new file mode 100644 index 0000000..b22502e --- /dev/null +++ b/src/ingestion/services/upload-dimension-file/upload-dimension-file.service.spec.ts @@ -0,0 +1,18 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { UploadDimensionFileService } from './upload-dimension-file.service'; + +describe('UploadDimensionFileService', () => { + let service: UploadDimensionFileService; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [UploadDimensionFileService], + }).compile(); + + service = module.get(UploadDimensionFileService); + }); + + it('should be defined', () => { + expect(service).toBeDefined(); + }); +}); diff --git a/src/ingestion/services/upload-dimension-file/upload-dimension-file.service.ts b/src/ingestion/services/upload-dimension-file/upload-dimension-file.service.ts new file mode 100644 index 0000000..e563ad7 --- /dev/null +++ b/src/ingestion/services/upload-dimension-file/upload-dimension-file.service.ts @@ -0,0 +1,53 @@ +import { HttpCustomService } from './../HttpCustomService'; +import { Injectable } from '@nestjs/common'; +const fs = require('fs'); +const FormData = require('form-data'); + +@Injectable() +export class UploadDimensionFileService { + constructor(private httpService:HttpCustomService){} + + async uploadFiles(){ + let folderPath = './dimension-files' + try{ + let result = await this.httpService.get(process.env.URL + '/generatejwt') + let token: any = result?.data; + let res = await this.callCsvImportAPI(token, folderPath) + return res + }catch(error){ + return {code:400, error:error} + } + } + async callCsvImportAPI(data: string, folderPath:string){ + const files = fs.readdirSync(folderPath); + let promises = []; + for(let i=0;i