Skip to content

Commit

Permalink
Merge pull request #260 from dhanush-2397/dev
Browse files Browse the repository at this point in the history
API added to upload files for dimension
  • Loading branch information
htvenkatesh authored Sep 12, 2023
2 parents e26254e + 96022d2 commit 805c957
Show file tree
Hide file tree
Showing 12 changed files with 134 additions and 70 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
"csvtojson": "^2.0.10",
"express": "^4.18.2",
"fast-csv": "^4.3.6",
"form-data": "^4.0.0",
"fs": "^0.0.1-security",
"jsonwebtoken": "^9.0.0",
"mime-types": "^2.1.35",
Expand Down
6 changes: 5 additions & 1 deletion src/database/database.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ export class DatabaseService {
this.logger.debug(`Executing query: ${queryText} (${values})`);
return this.pool.query(queryText, values).then((result: QueryResult) => {
return result.rows;
});
},
(error)=>{
return error
}
);
}
}
23 changes: 12 additions & 11 deletions src/ingestion/controller/ingestion.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ import {UpdateFileStatusService} from '../services/update-file-status/update-fil
import {ApiConsumes, ApiTags} from '@nestjs/swagger';
import {DatabaseService} from '../../database/database.service';
import {DataEmissionService} from '../services/data-emission/data-emission.service';
import {V4DataEmissionService} from "../services/v4-data-emission/v4-data-emission.service";
import {JwtGuard} from '../../guards/jwt.guard';
import * as jwt from 'jsonwebtoken';
import { NvskApiService } from '../services/nvsk-api/nvsk-api.service';
import { UploadDimensionFileService } from '../services/upload-dimension-file/upload-dimension-file.service';
import { GrammarService } from '../services/grammar/grammar.service';
import { GenericFunction } from '../services/generic-function';

Expand Down Expand Up @@ -71,11 +71,12 @@ export class IngestionController {
constructor(
private datasetService: DatasetService, private dimensionService: DimensionService
, private eventService: EventService, private csvImportService: CsvImportService, private fileStatus: FileStatusService, private updateFileStatus: UpdateFileStatusService,
private databaseService: DatabaseService, private dataEmissionService: DataEmissionService, private v4DataEmissionService: V4DataEmissionService,
private databaseService: DatabaseService, private dataEmissionService: DataEmissionService,
private rawDataImportService:RawDataImportService,
private nvskService:NvskApiService,
private grammarService: GrammarService,
private service: GenericFunction) {
private service: GenericFunction,
private uploadDimension:UploadDimensionFileService) {
}

@Get('generatejwt')
Expand Down Expand Up @@ -237,24 +238,23 @@ export class IngestionController {
}


@Get('/v4-data-emission')
@UseGuards(JwtGuard)
@Get('/upload-dimension')
async dataEmission(@Res()response: Response) {
try {
const result: any = await this.v4DataEmissionService.uploadFiles();
const result: any = await this.uploadDimension.uploadFiles();
if (result.code == 400) {
response.status(400).send({message: result.error});
} else {
response.status(200).send({message: result.message});
}
} catch (e) {
console.error('ingestion.controller.v4dataEmission: ', e.message);
console.error('ingestion.controller.uploadDimensionFiles: ', e.message);
throw new Error(e);
}
}

@Post('/getRawData')
// @UseGuards(JwtGuard)
@UseGuards(JwtGuard)
async getPresignedUrls(@Body() inputData: RawDataPullBody, @Res()response: Response){
try {

Expand All @@ -265,10 +265,11 @@ export class IngestionController {
throw new Error(e);
}
}
@Get('/data-emitter')
async fetchData(@Body()inputData: NvskApiService,@Res()response: Response){
@Post('/data-emitter')
@UseGuards(JwtGuard)
async fetchData(@Body()inputData:RawDataPullBody ,@Res()response: Response,@Req() request: Request){
try {
const result: any = await this.nvskService.getEmitterData();
const result: any = await this.nvskService.getEmitterData(inputData?.program_names, request);
console.log("result is", result);
if (result?.code == 400) {
response.status(400).send({message: result.error});
Expand Down
4 changes: 2 additions & 2 deletions src/ingestion/ingestion.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@ import {CsvImportService} from "./services/csvImport/csvImport.service";
import {FileStatusService} from './services/file-status/file-status.service';
import {UpdateFileStatusService} from "./services/update-file-status/update-file-status.service";
import {DataEmissionService} from "./services/data-emission/data-emission.service";
import {V4DataEmissionService} from "./services/v4-data-emission/v4-data-emission.service";
import {UploadService} from "./services/file-uploader-service";
import { RawDataImportService } from './services/rawDataImport/rawDataImport.service';
import { NvskApiService } from './services/nvsk-api/nvsk-api.service';
import { DateService } from './services/dateService';
import { UploadDimensionFileService } from './services/upload-dimension-file/upload-dimension-file.service';
import { GrammarService } from './services/grammar/grammar.service';

@Module({
controllers: [IngestionController],
providers: [DatasetService, DimensionService, EventService, GenericFunction, HttpCustomService, CsvImportService, FileStatusService,
UpdateFileStatusService, DataEmissionService, V4DataEmissionService, UploadService,RawDataImportService,NvskApiService,DateService,GrammarService],
UpdateFileStatusService, DataEmissionService, UploadDimensionFileService, UploadService,RawDataImportService,NvskApiService,DateService,GrammarService],
imports: [DatabaseModule, HttpModule]
})
export class IngestionModule {
Expand Down
4 changes: 2 additions & 2 deletions src/ingestion/interfaces/Ingestion-data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,9 @@ export class EmissionBody {

export class RawDataPullBody{
@ApiProperty()
program_names: string[];
program_names?: string[];
@ApiProperty()
token:string;
token?:string;
}

export interface RawDataResponse {
Expand Down
8 changes: 4 additions & 4 deletions src/ingestion/query/ingestionQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ export const IngestionDatasetQuery = {
ingestion."FileTracker" where pid = $1`;
return {query:queryStr,values: [fileTrackerPid]};
},
async insertIntoEmission(program_name:string,presignedurl:string,file_status:string){
const queryStr =`INSERT into emission."vsk_tracker"(program_name,presignedurl,file_status)VALUES
($1,$2,$3)`;
return {query:queryStr,values:[program_name,presignedurl,file_status]}
async insertIntoEmission(program_name:string,presignedurl:string,jwt_token:string,file_status:string){
const queryStr =`INSERT into emission."vsk_tracker"(program_name,presignedurl,jwt_token,file_status)VALUES
($1,$2,$3,$4)`;
return {query:queryStr,values:[program_name,presignedurl,jwt_token,file_status]}
}
};
6 changes: 5 additions & 1 deletion src/ingestion/services/dateService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@ export class DateService{
if (this.isTimeGreaterThan12PM(currentTime)) {
// Convert to IST (add 5 hours and 30 minutes for the UTC+5:30 offset)
const [hours, minutes] = currentTime.split(':');
// const hour = parseInt(hours, 10) + 5;
// const minute = parseInt(minutes, 10) + 30;

console.log("Hours is:", hours);
console.log("Minutes is:",minutes);
return [hours,minutes]
let min = parseInt(minutes) + 2
return [hours,min]
} else {
return currentTime
}
Expand Down
5 changes: 3 additions & 2 deletions src/ingestion/services/file-uploader-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,14 @@ export class UploadService {
signatureVersion: "v4",
accessKeyId: process.env.AWS_ACCESS_KEY,
secretAccessKey: process.env.AWS_SECRET_KEY,
region: 'ap-south-1'
region: process.env.AWS_REGION
});
return new Promise((resolve, reject) => {
// file name
const params = {
Bucket: process.env.AWS_BUCKET,
Key: fileName,
Expires: 3600
};
s3.getSignedUrl('getObject', params, (err, url) => {
if (err) {
Expand All @@ -171,7 +172,7 @@ export class UploadService {
secretKey: process.env.MINIO_SECRET_KEY,
});
return new Promise((resolve, reject) => {
minioClient.presignedUrl('GET', process.env.MINIO_BUCKET, fileName, 24 * 60 * 60, function (err, presignedUrl) {
minioClient.presignedUrl('GET', process.env.MINIO_BUCKET, fileName, 3600, function (err, presignedUrl) {
if (err) {
console.log(err)
} else {
Expand Down
42 changes: 29 additions & 13 deletions src/ingestion/services/nvsk-api/nvsk-api.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { Body, Injectable } from '@nestjs/common';
import axios from 'axios';
import { IngestionDatasetQuery } from 'src/ingestion/query/ingestionQuery';
import { DateService } from '../dateService';
import { Request } from 'express';
const csv = require('csv-parser');
const fs = require('fs');
@Injectable()
Expand All @@ -15,35 +16,46 @@ export class NvskApiService {

}
/* NVSK side implementations */
async getEmitterData() {
async getEmitterData(inputData: string[],request: Request) {
let urlData;
let names = process.env.PROGRAM_NAMES.split(',')
let names;
if(!inputData || inputData.length == 0){
names = process.env.PROGRAM_NAMES?.split(',')
}else{
names = inputData;
}
let body: any = {
"program_names": names
}
try {
const result = await this.httpService.post(process.env.NVSK_URL + '/getRawData', body)
const headers = {
Authorization: request.headers.authorization
};
const result = await this.httpService.post(process.env.NVSK_URL + '/getRawData', body,{headers: headers})
if (result?.data['code'] === 200) {
urlData = result?.data['data']
} else {
console.log("Error ocurred::", JSON.stringify(result.data));
return { code: 400, error: result?.data['error'] ? result?.data['error'] : "Error occured during the NVSK data emission" }
}
this.writeRawDataFromUrl(urlData)
this.writeRawDataFromUrl(urlData,headers.Authorization)
return { code: 200, message: "VSK Writing to the file in process" }
} catch (error) {
return { code: 400, errorMsg: error }
}

}
async writeRawDataFromUrl(urlData: Array<{ program_name: string, urls: string[] }>) {
async writeRawDataFromUrl(urlData: Array<{ program_name: string, urls: string[] }>,jwtToken:string) {
try {
if (urlData?.length > 0) {
for (let data of urlData) {
let pgname = data.program_name;
for (let url of data.urls) {
const parsedUrl = new URL(url);
const fileName = `./rawdata-files/` + parsedUrl.pathname.split('/').pop();
if(fs.existsSync(fileName)){
this.service.deleteLocalFile(fileName);
}
const stream = (await axios.get(url, { responseType: 'stream' })).data
const filteredCsvStream = fs.createWriteStream(`${fileName}`);
let isFirstRow = true;
Expand Down Expand Up @@ -74,17 +86,19 @@ export class NvskApiService {
}
this.service.deleteLocalFile(fileName)

const queryStr = await IngestionDatasetQuery.insertIntoEmission(pgname, url, 'Uploaded')
const queryStr = await IngestionDatasetQuery.insertIntoEmission(pgname, url,jwtToken.split(' ')[1],'Uploaded')
const result = await this.databaseService.executeQuery(queryStr.query, queryStr.values)
console.log("The result is:", result);
console.log(`Filtered data saved to ${fileName}`);
} catch (error) {
this.service.deleteLocalFile(fileName)
}
})
})
.on('error', async (error) => {
const queryStr = await IngestionDatasetQuery.insertIntoEmission(pgname, url, error)
const result = await this.databaseService.executeQuery(queryStr.query, queryStr.values)
const queryStr = await IngestionDatasetQuery.insertIntoEmission(pgname, url,jwtToken.split(' ')[1], error)
const result = await this.databaseService.executeQuery(queryStr.query, queryStr.values);

this.service.deleteLocalFile(fileName);
console.error('Error processing CSV:', error);
});
Expand All @@ -95,13 +109,15 @@ export class NvskApiService {
let cronExpression = `0 ${dateResult[1]} ${dateResult[0]} * * ?`
console.log("Cron expression is:", cronExpression);
let url = `${process.env.SPEC_URL}` + '/schedule'
let body = {
let scheduleBody = {
"processor_group_name": "Run_adapters",
"scheduled_at": `"${cronExpression}"`
"scheduled_at": `${cronExpression}`
}
let scheduleResult = await this.httpService.post(url, body)
if (scheduleResult?.data['code'] === 200) {
return { code: 200, message: "successfully scheduled the adapters" }
console.log("The schedule body is:",scheduleBody)
let scheduleResult = await this.httpService.post(url, scheduleBody)
console.log('The scheudle result is:',scheduleResult?.data['message']);
if (scheduleResult.status === 200) {
return { code: 200, message: scheduleResult?.['data']['message'] }
} else {
return { code: 400, error: "Adapter schedule failed" }
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { Test, TestingModule } from '@nestjs/testing';
import { UploadDimensionFileService } from './upload-dimension-file.service';

describe('UploadDimensionFileService', () => {
let service: UploadDimensionFileService;

beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [UploadDimensionFileService],
}).compile();

service = module.get<UploadDimensionFileService>(UploadDimensionFileService);
});

it('should be defined', () => {
expect(service).toBeDefined();
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { HttpCustomService } from './../HttpCustomService';
import { Injectable } from '@nestjs/common';
const fs = require('fs');
const FormData = require('form-data');

@Injectable()
export class UploadDimensionFileService {
constructor(private httpService:HttpCustomService){}

async uploadFiles(){
let folderPath = './dimension-files'
try{
let result = await this.httpService.get(process.env.URL + '/generatejwt')
let token: any = result?.data;
let res = await this.callCsvImportAPI(token, folderPath)
return res
}catch(error){
return {code:400, error:error}
}
}
async callCsvImportAPI(data: string, folderPath:string){
const files = fs.readdirSync(folderPath);
let promises = [];
for(let i=0;i<files?.length;i++){
const filePath = folderPath + '/' +files[i]
const fileName: string = files[i]?.split('-')[0]
console.log("The file name is:", fileName)
const formData = new FormData();
formData.append('ingestion_type','dimension');
formData.append('ingestion_name',`${fileName}`);
formData.append('file',fs.createReadStream(filePath));
try{
let url = `${process.env.URL}` + '/new_programs';
const headers = {
Authorization: 'Bearer'+ ' '+data,

};
promises.push(this.httpService.post(url,formData,{headers: headers}))

}catch(error){
console.error("error during csv import:", error)
}
}
try{
await Promise.all(promises)
return { code: 200, message:"Uploading the files"}
}catch(error){
console.error("error is", error)
return {code:400, error:"Error occured during the upload"}
}

}
}

This file was deleted.

0 comments on commit 805c957

Please sign in to comment.