Skip to content

Commit

Permalink
Added a processor group state change API
Browse files Browse the repository at this point in the history
  • Loading branch information
dhanush-2397 committed Nov 17, 2023
1 parent 4312d99 commit ca7a5e3
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 119 deletions.
20 changes: 10 additions & 10 deletions src/specifications/controller/specification.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,20 @@ import {
specDimensionDTO,
scheduleDto,
specEventDTO,
s3DTO, GetGrammar
ProcessorDto
} from '../dto/specData.dto';
import {DatasetService} from '../service/dataset/dataset.service';
import {ScheduleService} from '../service/schedule/schedule.service';
import {ApiTags} from '@nestjs/swagger';
import {Grammar} from '../service/grammar/grammar.service';
import { ReadSchemaService } from '../service/read-schema/read-schema.service';
import { ProcessorGroupStateService } from '../service/processor-group-state/processor-group-state.service';

@ApiTags('spec-ms')
@Controller('')
export class SpecificationController {
constructor(private dimensionService: DimensionService, private EventService: EventService, private datasetService: DatasetService,
private scheduleService: ScheduleService, private readJsonFiles: ReadSchemaService, private grammar: Grammar, private pipelineService: PipelineService) {
private scheduleService: ScheduleService, private readJsonFiles: ReadSchemaService, private pipelineService: PipelineService,
private processorStateService: ProcessorGroupStateService) {
}

@Get('/hello')
Expand Down Expand Up @@ -134,19 +135,18 @@ export class SpecificationController {
}
}

@Get('/grammar')
async getGrammar(@Query()getGrammar: GetGrammar, @Res() response: Response) {
@Post('/change-state')
async processorState(@Body() inputBody:ProcessorDto, @Res()response: Response) {
try {
const result: any = await this.grammar.getGrammar(getGrammar);
const result: any = await this.processorStateService.changeProcessorGroupState(inputBody);
if (result?.code == 400) {
response.status(400).send({"message": result.error});
}
else {
response.status(200).send({"schema": result.data.schema});
response.status(200).send({"message": result.message});
}
} catch (error) {
console.error('specification.controller.getGrammar: ', error);
throw new Error(error);
console.error('changeprocessorState: ', error);
}
}
}
}
14 changes: 4 additions & 10 deletions src/specifications/dto/specData.dto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,17 +91,11 @@ export class scheduleDto {
program_name?: string
}


export class s3DTO {
export class ProcessorDto{
@ApiProperty()
scheduled_at?: string;
processor_group_name?: string;
@ApiProperty()
scheduled_type: string;
state: string;
}

export class GetGrammar {
@ApiProperty()
grammar_type: string;
@ApiProperty()
grammar_name: string;
}

77 changes: 0 additions & 77 deletions src/specifications/service/grammar/grammar.service.ts

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { Test, TestingModule } from '@nestjs/testing';
import { ProcessorGroupStateService } from './processor-group-state.service';

describe('ProcessorGroupStateService', () => {
let service: ProcessorGroupStateService;

beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [ProcessorGroupStateService],
}).compile();

service = module.get<ProcessorGroupStateService>(ProcessorGroupStateService);
});

it('should be defined', () => {
expect(service).toBeDefined();
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { Injectable } from "@nestjs/common";
import { ScheduleService } from "../schedule/schedule.service";
import { HttpCustomService } from "../HttpCustomService";
import { ProcessorDto } from "src/specifications/dto/specData.dto";
import { GenericFunction } from "../genericFunction";

@Injectable()
export class ProcessorGroupStateService {
scheduleSchema = {
type: "object",
properties: {
processor_group_name: {
type: "string",
shouldnotnull: true,
},
state: {
type: "string",
shouldnotnull: true,
enum: ["RUNNING", "STOPPED"],
},
},
required: ["processor_group_name", "state"],
};
nifiUrl: string = `${process.env.NIFI_HOST}:${process.env.NIFI_PORT}`;
constructor(
private scheduleService: ScheduleService,
private http: HttpCustomService,
private specService: GenericFunction
) {}
async changeProcessorGroupState(inputProcessorData: ProcessorDto) {
let isValidSchema: any;
isValidSchema = await this.specService.ajvValidator(
this.scheduleSchema,
inputProcessorData
);
if (isValidSchema.errors) {
return { code: 400, error: isValidSchema.errors };
} else {
try {
const processorGroups = await this.scheduleService.getRootDetails();
let pg_list = processorGroups.data;
let processorGroupName = inputProcessorData.processor_group_name;
let counter = 0;
let data = {};
let pg_group = pg_list["processGroupFlow"]["flow"]["processGroups"];
for (let pg of pg_group) {
if (pg.component.name == processorGroupName) {
let pg_source = pg;
counter = counter + 1;
data = {
id: pg_source["component"]["id"],
state: inputProcessorData.state, // RUNNING or STOP
disconnectedNodeAcknowledged: false,
};
const result = await this.http.put(
`${this.nifiUrl}/nifi-api/flow/process-groups/${pg_source["component"]["id"]}`,
data
);
console.log("the result is:",result.data);
return {code: 200,message:`changed the state of ${processorGroupName}`}
}
}
return {code:400,error:"Could not find the processor group"}
} catch (error) {
return {code:400,error:error?.message}

}
}
}
}
54 changes: 32 additions & 22 deletions src/specifications/specifications.module.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,35 @@
import {HttpCustomService} from './service/HttpCustomService';
import {EventService} from './service/event/event.service';
import {Dimension} from './../typeorm/dimension.entity';
import {Module} from '@nestjs/common';
import {SpecificationController} from './controller/specification.controller';
import {TypeOrmModule} from '@nestjs/typeorm';
import {DimensionService} from './service/dimension/dimension.service';
import {GenericFunction} from './service/genericFunction';
import {TransformerService} from './service/transformer/transformer.service';
import {DatasetService} from './service/dataset/dataset.service';
import {HttpModule} from '@nestjs/axios';
import { PipelineService } from '../specifications/service/pipeline-old/pipeline.service';
import {ScheduleService} from './service/schedule/schedule.service';
import { PipelineGenericService } from './service/pipeline-generic/pipeline-generic.service';
import {Grammar} from "./service/grammar/grammar.service";
import { ReadSchemaService } from './service/read-schema/read-schema.service';
import { HttpCustomService } from "./service/HttpCustomService";
import { EventService } from "./service/event/event.service";
import { Dimension } from "./../typeorm/dimension.entity";
import { Module } from "@nestjs/common";
import { SpecificationController } from "./controller/specification.controller";
import { TypeOrmModule } from "@nestjs/typeorm";
import { DimensionService } from "./service/dimension/dimension.service";
import { GenericFunction } from "./service/genericFunction";
import { TransformerService } from "./service/transformer/transformer.service";
import { DatasetService } from "./service/dataset/dataset.service";
import { HttpModule } from "@nestjs/axios";
import { PipelineService } from "../specifications/service/pipeline-old/pipeline.service";
import { ScheduleService } from "./service/schedule/schedule.service";
import { PipelineGenericService } from "./service/pipeline-generic/pipeline-generic.service";
import { ReadSchemaService } from "./service/read-schema/read-schema.service";
import { ProcessorGroupStateService } from "./service/processor-group-state/processor-group-state.service";

@Module({
imports: [HttpModule],
controllers: [SpecificationController],
providers: [DimensionService, EventService, GenericFunction, TransformerService, DatasetService, PipelineService, HttpCustomService, ScheduleService, Grammar, PipelineGenericService,ReadSchemaService],

imports: [HttpModule],
controllers: [SpecificationController],
providers: [
DimensionService,
EventService,
GenericFunction,
TransformerService,
DatasetService,
PipelineService,
HttpCustomService,
ScheduleService,
PipelineGenericService,
ReadSchemaService,
ProcessorGroupStateService
],
})
export class SpecificationsModule {
}
export class SpecificationsModule {}

0 comments on commit ca7a5e3

Please sign in to comment.