Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API added to upload files for dimension #260

Merged
merged 2 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
"csvtojson": "^2.0.10",
"express": "^4.18.2",
"fast-csv": "^4.3.6",
"form-data": "^4.0.0",
"fs": "^0.0.1-security",
"jsonwebtoken": "^9.0.0",
"mime-types": "^2.1.35",
Expand Down
6 changes: 5 additions & 1 deletion src/database/database.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ export class DatabaseService {
this.logger.debug(`Executing query: ${queryText} (${values})`);
return this.pool.query(queryText, values).then((result: QueryResult) => {
return result.rows;
});
},
(error)=>{
return error
}
);
}
}
23 changes: 12 additions & 11 deletions src/ingestion/controller/ingestion.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ import {UpdateFileStatusService} from '../services/update-file-status/update-fil
import {ApiConsumes, ApiTags} from '@nestjs/swagger';
import {DatabaseService} from '../../database/database.service';
import {DataEmissionService} from '../services/data-emission/data-emission.service';
import {V4DataEmissionService} from "../services/v4-data-emission/v4-data-emission.service";
import {JwtGuard} from '../../guards/jwt.guard';
import * as jwt from 'jsonwebtoken';
import { NvskApiService } from '../services/nvsk-api/nvsk-api.service';
import { UploadDimensionFileService } from '../services/upload-dimension-file/upload-dimension-file.service';
import { GrammarService } from '../services/grammar/grammar.service';
import { GenericFunction } from '../services/generic-function';

Expand Down Expand Up @@ -71,11 +71,12 @@ export class IngestionController {
constructor(
private datasetService: DatasetService, private dimensionService: DimensionService
, private eventService: EventService, private csvImportService: CsvImportService, private fileStatus: FileStatusService, private updateFileStatus: UpdateFileStatusService,
private databaseService: DatabaseService, private dataEmissionService: DataEmissionService, private v4DataEmissionService: V4DataEmissionService,
private databaseService: DatabaseService, private dataEmissionService: DataEmissionService,
private rawDataImportService:RawDataImportService,
private nvskService:NvskApiService,
private grammarService: GrammarService,
private service: GenericFunction) {
private service: GenericFunction,
private uploadDimension:UploadDimensionFileService) {
}

@Get('generatejwt')
Expand Down Expand Up @@ -237,24 +238,23 @@ export class IngestionController {
}


@Get('/v4-data-emission')
@UseGuards(JwtGuard)
@Get('/upload-dimension')
async dataEmission(@Res()response: Response) {
try {
const result: any = await this.v4DataEmissionService.uploadFiles();
const result: any = await this.uploadDimension.uploadFiles();
if (result.code == 400) {
response.status(400).send({message: result.error});
} else {
response.status(200).send({message: result.message});
}
} catch (e) {
console.error('ingestion.controller.v4dataEmission: ', e.message);
console.error('ingestion.controller.uploadDimensionFiles: ', e.message);
throw new Error(e);
}
}

@Post('/getRawData')
// @UseGuards(JwtGuard)
@UseGuards(JwtGuard)
async getPresignedUrls(@Body() inputData: RawDataPullBody, @Res()response: Response){
try {

Expand All @@ -265,10 +265,11 @@ export class IngestionController {
throw new Error(e);
}
}
@Get('/data-emitter')
async fetchData(@Body()inputData: NvskApiService,@Res()response: Response){
@Post('/data-emitter')
@UseGuards(JwtGuard)
async fetchData(@Body()inputData:RawDataPullBody ,@Res()response: Response,@Req() request: Request){
try {
const result: any = await this.nvskService.getEmitterData();
const result: any = await this.nvskService.getEmitterData(inputData?.program_names, request);
console.log("result is", result);
if (result?.code == 400) {
response.status(400).send({message: result.error});
Expand Down
4 changes: 2 additions & 2 deletions src/ingestion/ingestion.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@ import {CsvImportService} from "./services/csvImport/csvImport.service";
import {FileStatusService} from './services/file-status/file-status.service';
import {UpdateFileStatusService} from "./services/update-file-status/update-file-status.service";
import {DataEmissionService} from "./services/data-emission/data-emission.service";
import {V4DataEmissionService} from "./services/v4-data-emission/v4-data-emission.service";
import {UploadService} from "./services/file-uploader-service";
import { RawDataImportService } from './services/rawDataImport/rawDataImport.service';
import { NvskApiService } from './services/nvsk-api/nvsk-api.service';
import { DateService } from './services/dateService';
import { UploadDimensionFileService } from './services/upload-dimension-file/upload-dimension-file.service';
import { GrammarService } from './services/grammar/grammar.service';

@Module({
controllers: [IngestionController],
providers: [DatasetService, DimensionService, EventService, GenericFunction, HttpCustomService, CsvImportService, FileStatusService,
UpdateFileStatusService, DataEmissionService, V4DataEmissionService, UploadService,RawDataImportService,NvskApiService,DateService,GrammarService],
UpdateFileStatusService, DataEmissionService, UploadDimensionFileService, UploadService,RawDataImportService,NvskApiService,DateService,GrammarService],
imports: [DatabaseModule, HttpModule]
})
export class IngestionModule {
Expand Down
4 changes: 2 additions & 2 deletions src/ingestion/interfaces/Ingestion-data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,9 @@ export class EmissionBody {

export class RawDataPullBody{
@ApiProperty()
program_names: string[];
program_names?: string[];
@ApiProperty()
token:string;
token?:string;
}

export interface RawDataResponse {
Expand Down
8 changes: 4 additions & 4 deletions src/ingestion/query/ingestionQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ export const IngestionDatasetQuery = {
ingestion."FileTracker" where pid = $1`;
return {query:queryStr,values: [fileTrackerPid]};
},
async insertIntoEmission(program_name:string,presignedurl:string,file_status:string){
const queryStr =`INSERT into emission."vsk_tracker"(program_name,presignedurl,file_status)VALUES
($1,$2,$3)`;
return {query:queryStr,values:[program_name,presignedurl,file_status]}
async insertIntoEmission(program_name:string,presignedurl:string,jwt_token:string,file_status:string){
const queryStr =`INSERT into emission."vsk_tracker"(program_name,presignedurl,jwt_token,file_status)VALUES
($1,$2,$3,$4)`;
return {query:queryStr,values:[program_name,presignedurl,jwt_token,file_status]}
}
};
6 changes: 5 additions & 1 deletion src/ingestion/services/dateService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@ export class DateService{
if (this.isTimeGreaterThan12PM(currentTime)) {
// Convert to IST (add 5 hours and 30 minutes for the UTC+5:30 offset)
const [hours, minutes] = currentTime.split(':');
// const hour = parseInt(hours, 10) + 5;
// const minute = parseInt(minutes, 10) + 30;

console.log("Hours is:", hours);
console.log("Minutes is:",minutes);
return [hours,minutes]
let min = parseInt(minutes) + 2
return [hours,min]
} else {
return currentTime
}
Expand Down
5 changes: 3 additions & 2 deletions src/ingestion/services/file-uploader-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,14 @@ export class UploadService {
signatureVersion: "v4",
accessKeyId: process.env.AWS_ACCESS_KEY,
secretAccessKey: process.env.AWS_SECRET_KEY,
region: 'ap-south-1'
region: process.env.AWS_REGION
});
return new Promise((resolve, reject) => {
// file name
const params = {
Bucket: process.env.AWS_BUCKET,
Key: fileName,
Expires: 3600
};
s3.getSignedUrl('getObject', params, (err, url) => {
if (err) {
Expand All @@ -171,7 +172,7 @@ export class UploadService {
secretKey: process.env.MINIO_SECRET_KEY,
});
return new Promise((resolve, reject) => {
minioClient.presignedUrl('GET', process.env.MINIO_BUCKET, fileName, 24 * 60 * 60, function (err, presignedUrl) {
minioClient.presignedUrl('GET', process.env.MINIO_BUCKET, fileName, 3600, function (err, presignedUrl) {
if (err) {
console.log(err)
} else {
Expand Down
42 changes: 29 additions & 13 deletions src/ingestion/services/nvsk-api/nvsk-api.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { Body, Injectable } from '@nestjs/common';
import axios from 'axios';
import { IngestionDatasetQuery } from 'src/ingestion/query/ingestionQuery';
import { DateService } from '../dateService';
import { Request } from 'express';
const csv = require('csv-parser');
const fs = require('fs');
@Injectable()
Expand All @@ -15,35 +16,46 @@ export class NvskApiService {

}
/* NVSK side implementations */
async getEmitterData() {
async getEmitterData(inputData: string[],request: Request) {
let urlData;
let names = process.env.PROGRAM_NAMES.split(',')
let names;
if(!inputData || inputData.length == 0){
names = process.env.PROGRAM_NAMES?.split(',')
}else{
names = inputData;
}
let body: any = {
"program_names": names
}
try {
const result = await this.httpService.post(process.env.NVSK_URL + '/getRawData', body)
const headers = {
Authorization: request.headers.authorization
};
const result = await this.httpService.post(process.env.NVSK_URL + '/getRawData', body,{headers: headers})
if (result?.data['code'] === 200) {
urlData = result?.data['data']
} else {
console.log("Error ocurred::", JSON.stringify(result.data));
return { code: 400, error: result?.data['error'] ? result?.data['error'] : "Error occured during the NVSK data emission" }
}
this.writeRawDataFromUrl(urlData)
this.writeRawDataFromUrl(urlData,headers.Authorization)
return { code: 200, message: "VSK Writing to the file in process" }
} catch (error) {
return { code: 400, errorMsg: error }
}

}
async writeRawDataFromUrl(urlData: Array<{ program_name: string, urls: string[] }>) {
async writeRawDataFromUrl(urlData: Array<{ program_name: string, urls: string[] }>,jwtToken:string) {
try {
if (urlData?.length > 0) {
for (let data of urlData) {
let pgname = data.program_name;
for (let url of data.urls) {
const parsedUrl = new URL(url);
const fileName = `./rawdata-files/` + parsedUrl.pathname.split('/').pop();
if(fs.existsSync(fileName)){
this.service.deleteLocalFile(fileName);
}
const stream = (await axios.get(url, { responseType: 'stream' })).data
const filteredCsvStream = fs.createWriteStream(`${fileName}`);
let isFirstRow = true;
Expand Down Expand Up @@ -74,17 +86,19 @@ export class NvskApiService {
}
this.service.deleteLocalFile(fileName)

const queryStr = await IngestionDatasetQuery.insertIntoEmission(pgname, url, 'Uploaded')
const queryStr = await IngestionDatasetQuery.insertIntoEmission(pgname, url,jwtToken.split(' ')[1],'Uploaded')
const result = await this.databaseService.executeQuery(queryStr.query, queryStr.values)
console.log("The result is:", result);
console.log(`Filtered data saved to ${fileName}`);
} catch (error) {
this.service.deleteLocalFile(fileName)
}
})
})
.on('error', async (error) => {
const queryStr = await IngestionDatasetQuery.insertIntoEmission(pgname, url, error)
const result = await this.databaseService.executeQuery(queryStr.query, queryStr.values)
const queryStr = await IngestionDatasetQuery.insertIntoEmission(pgname, url,jwtToken.split(' ')[1], error)
const result = await this.databaseService.executeQuery(queryStr.query, queryStr.values);

this.service.deleteLocalFile(fileName);
console.error('Error processing CSV:', error);
});
Expand All @@ -95,13 +109,15 @@ export class NvskApiService {
let cronExpression = `0 ${dateResult[1]} ${dateResult[0]} * * ?`
console.log("Cron expression is:", cronExpression);
let url = `${process.env.SPEC_URL}` + '/schedule'
let body = {
let scheduleBody = {
"processor_group_name": "Run_adapters",
"scheduled_at": `"${cronExpression}"`
"scheduled_at": `${cronExpression}`
}
let scheduleResult = await this.httpService.post(url, body)
if (scheduleResult?.data['code'] === 200) {
return { code: 200, message: "successfully scheduled the adapters" }
console.log("The schedule body is:",scheduleBody)
let scheduleResult = await this.httpService.post(url, scheduleBody)
console.log('The scheudle result is:',scheduleResult?.data['message']);
if (scheduleResult.status === 200) {
return { code: 200, message: scheduleResult?.['data']['message'] }
} else {
return { code: 400, error: "Adapter schedule failed" }
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { Test, TestingModule } from '@nestjs/testing';
import { UploadDimensionFileService } from './upload-dimension-file.service';

describe('UploadDimensionFileService', () => {
let service: UploadDimensionFileService;

beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [UploadDimensionFileService],
}).compile();

service = module.get<UploadDimensionFileService>(UploadDimensionFileService);
});

it('should be defined', () => {
expect(service).toBeDefined();
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { HttpCustomService } from './../HttpCustomService';
import { Injectable } from '@nestjs/common';
const fs = require('fs');
const FormData = require('form-data');

@Injectable()
export class UploadDimensionFileService {
constructor(private httpService:HttpCustomService){}

async uploadFiles(){
let folderPath = './dimension-files'
try{
let result = await this.httpService.get(process.env.URL + '/generatejwt')
let token: any = result?.data;
let res = await this.callCsvImportAPI(token, folderPath)
return res
}catch(error){
return {code:400, error:error}
}
}
async callCsvImportAPI(data: string, folderPath:string){
const files = fs.readdirSync(folderPath);
let promises = [];
for(let i=0;i<files?.length;i++){
const filePath = folderPath + '/' +files[i]
const fileName: string = files[i]?.split('-')[0]
console.log("The file name is:", fileName)
const formData = new FormData();
formData.append('ingestion_type','dimension');
formData.append('ingestion_name',`${fileName}`);
formData.append('file',fs.createReadStream(filePath));
try{
let url = `${process.env.URL}` + '/new_programs';
const headers = {
Authorization: 'Bearer'+ ' '+data,

};
promises.push(this.httpService.post(url,formData,{headers: headers}))

}catch(error){
console.error("error during csv import:", error)
}
}
try{
await Promise.all(promises)
return { code: 200, message:"Uploading the files"}
}catch(error){
console.error("error is", error)
return {code:400, error:"Error occured during the upload"}
}

}
}

This file was deleted.