Skip to content

Commit

Permalink
Refactor task submit (#2368)
Browse files Browse the repository at this point in the history
* fix dag npe

* refactor task submit

* merge dev

* merge dev and formate code

* fix some problem

* merge dev

* remove useSession

* formate code

* formate code

* fix task vesion time

* change task version create

* formate code
  • Loading branch information
gaoyan1998 authored Oct 11, 2023
1 parent a76beb7 commit b4b18f7
Show file tree
Hide file tree
Showing 52 changed files with 1,288 additions and 2,851 deletions.
284 changes: 62 additions & 222 deletions dinky-admin/src/main/java/org/dinky/controller/APIController.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,22 @@

package org.dinky.controller;

import org.dinky.data.dto.APICancelDTO;
import org.dinky.data.dto.APIExecuteJarDTO;
import org.dinky.data.dto.APIExecuteSqlDTO;
import org.dinky.data.dto.APIExplainSqlDTO;
import org.dinky.data.dto.APISavePointDTO;
import org.dinky.data.dto.APISavePointTaskDTO;
import org.dinky.data.annotation.Log;
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.enums.BusinessType;
import org.dinky.data.enums.Status;
import org.dinky.data.exception.NotSupportExplainExcepition;
import org.dinky.data.model.JobInstance;
import org.dinky.data.result.APIJobResult;
import org.dinky.data.result.ExplainResult;
import org.dinky.data.result.Result;
import org.dinky.data.result.SelectResult;
import org.dinky.data.result.SqlExplainResult;
import org.dinky.gateway.result.SavePointResult;
import org.dinky.job.JobResult;
import org.dinky.service.APIService;
import org.dinky.process.exception.ExcuteException;
import org.dinky.service.JobInstanceService;
import org.dinky.service.StudioService;
import org.dinky.service.TaskService;

import java.util.List;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
Expand All @@ -49,15 +46,12 @@

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/**
* APIController
*
* @since 2021/12/11 21:44
*/
@SuppressWarnings("AlibabaClassNamingShouldBeCamel")
@Slf4j
Expand All @@ -67,237 +61,71 @@
@RequiredArgsConstructor
public class APIController {

private final APIService apiService;
private final StudioService studioService;
private final TaskService taskService;
private final JobInstanceService jobInstanceService;

@GetMapping("/submitTask")
@PostMapping("/submitTask")
@ApiOperation("Submit Task")
// @Log(title = "Submit Task", businessType = BusinessType.SUBMIT)
@ApiImplicitParam(name = "id", value = "Task Id", required = true, dataType = "Integer")
public Result<JobResult> submitTask(@RequestParam Integer id) {
taskService.initTenantByTaskId(id);
return Result.succeed(taskService.submitTask(id), Status.EXECUTE_SUCCESS);
}

@PostMapping("/executeSql")
@ApiOperation("Execute Sql")
// @Log(title = "Execute Sql", businessType = BusinessType.EXECUTE)
@ApiImplicitParam(
name = "apiExecuteSqlDTO",
value = "API Execute Sql DTO",
required = true,
dataType = "APIExecuteSqlDTO",
dataTypeClass = APIExecuteSqlDTO.class)
public Result<APIJobResult> executeSql(@RequestBody APIExecuteSqlDTO apiExecuteSqlDTO) {
return Result.succeed(apiService.executeSql(apiExecuteSqlDTO), Status.EXECUTE_SUCCESS);
}

@PostMapping("/explainSql")
@ApiOperation("Explain Sql")
// @Log(title = "Explain Sql", businessType = BusinessType.EXECUTE)
@ApiImplicitParam(
name = "apiExecuteSqlDTO",
value = "API Execute Sql DTO",
required = true,
dataType = "APIExecuteSqlDTO",
dataTypeClass = APIExecuteSqlDTO.class)
public Result<ExplainResult> explainSql(@RequestBody APIExplainSqlDTO apiExecuteSqlDTO) {
return Result.succeed(apiService.explainSql(apiExecuteSqlDTO), Status.EXECUTE_SUCCESS);
public Result<JobResult> submitTask(@RequestBody TaskDTO taskDTO) throws ExcuteException {
JobResult jobResult = taskService.submitTask(taskDTO.getId(), null);
if (jobResult.isSuccess()) {
return Result.succeed(jobResult, Status.EXECUTE_SUCCESS);
} else {
return Result.failed(jobResult, jobResult.getError());
}
}

@PostMapping("/getJobPlan")
@ApiOperation("Get Job Plan")
@ApiImplicitParam(
name = "apiExecuteSqlDTO",
value = "API Execute Sql DTO",
required = true,
dataType = "APIExecuteSqlDTO",
dataTypeClass = APIExecuteSqlDTO.class)
public Result<ObjectNode> getJobPlan(@RequestBody APIExplainSqlDTO apiExecuteSqlDTO) {
return Result.succeed(apiService.getJobPlan(apiExecuteSqlDTO), Status.EXECUTE_SUCCESS);
}

@PostMapping("/getStreamGraph")
@ApiOperation("Get Stream Graph")
@ApiImplicitParam(
name = "apiExecuteSqlDTO",
value = "API Execute Sql DTO",
required = true,
dataType = "APIExecuteSqlDTO",
dataTypeClass = APIExecuteSqlDTO.class)
public Result<ObjectNode> getStreamGraph(@RequestBody APIExplainSqlDTO apiExecuteSqlDTO) {
return Result.succeed(apiService.getStreamGraph(apiExecuteSqlDTO), Status.EXECUTE_SUCCESS);
}

@GetMapping("/getJobData")
@ApiOperation("Get Job Data")
@ApiImplicitParam(
name = "jobId",
value = "Job Id",
required = true,
dataType = "String",
dataTypeClass = String.class)
public Result<SelectResult> getJobData(@RequestParam String jobId) {
return Result.succeed(studioService.getJobData(jobId));
}

@PostMapping("/cancel")
@GetMapping("/cancel")
// @Log(title = "Cancel Flink Job", businessType = BusinessType.TRIGGER)
@ApiOperation("Cancel Flink Job")
@ApiImplicitParam(
name = "apiCancelDTO",
value = "API Cancel DTO",
required = true,
dataType = "APICancelDTO",
dataTypeClass = APICancelDTO.class)
public Result<Boolean> cancel(@RequestBody APICancelDTO apiCancelDTO) {
return Result.succeed(apiService.cancel(apiCancelDTO), Status.EXECUTE_SUCCESS);
public Result<Boolean> cancel(@RequestParam Integer id) {
return Result.succeed(taskService.cancelTaskJob(taskService.getTaskInfoById(id)), Status.EXECUTE_SUCCESS);
}

/**
* 重启任务
*/
@GetMapping(value = "/restartTask")
@ApiOperation("Restart Task")
// @Log(title = "Restart Task", businessType = BusinessType.REMOTE_OPERATION)
public Result<JobResult> restartTask(@RequestParam Integer id, @RequestParam String savePointPath)
throws ExcuteException {
return Result.succeed(taskService.restartTask(id, savePointPath));
}

@PostMapping("/savepoint")
// @Log(title = "Savepoint Trigger", businessType = BusinessType.TRIGGER)
@ApiOperation("Savepoint Trigger")
@ApiImplicitParam(
name = "apiSavePointDTO",
value = "API SavePoint DTO",
required = true,
dataType = "APISavePointDTO",
dataTypeClass = APISavePointDTO.class)
public Result<SavePointResult> savepoint(@RequestBody APISavePointDTO apiSavePointDTO) {
return Result.succeed(apiService.savepoint(apiSavePointDTO), Status.EXECUTE_SUCCESS);
}

@PostMapping("/executeJar")
@ApiOperation("Execute Jar")
// @Log(title = "Execute Jar", businessType = BusinessType.EXECUTE)
@ApiImplicitParam(
name = "apiExecuteJarDTO",
value = "API Execute Jar DTO",
required = true,
dataType = "APIExecuteJarDTO",
dataTypeClass = APIExecuteJarDTO.class)
public Result<APIJobResult> executeJar(@RequestBody APIExecuteJarDTO apiExecuteJarDTO) {
return Result.succeed(apiService.executeJar(apiExecuteJarDTO), Status.EXECUTE_SUCCESS);
}

@PostMapping("/savepointTask")
@ApiOperation("Savepoint Task")
// @Log(title = "Savepoint Task", businessType = BusinessType.TRIGGER)
@ApiImplicitParam(
name = "apiSavePointTaskDTO",
value = "API SavePoint Task DTO",
required = true,
dataType = "APISavePointTaskDTO",
dataTypeClass = APISavePointTaskDTO.class)
public Result<Boolean> savepointTask(@RequestBody APISavePointTaskDTO apiSavePointTaskDTO) {
public Result<SavePointResult> savepoint(@RequestParam Integer taskId, @RequestParam String savePointType) {
return Result.succeed(
taskService.savepointTask(apiSavePointTaskDTO.getTaskId(), apiSavePointTaskDTO.getType()), "执行成功");
taskService.savepointTaskJob(taskService.getTaskInfoById(taskId), savePointType),
Status.EXECUTE_SUCCESS);
}

/** 重启任务 */
@GetMapping("/restartTask")
@ApiOperation("Restart Task")
// @Log(title = "Restart Task", businessType = BusinessType.EXECUTE)
@ApiImplicitParam(
name = "id",
value = "Task Id",
required = true,
dataType = "Integer",
dataTypeClass = Integer.class)
public Result<JobResult> restartTask(@RequestParam Integer id) {
taskService.initTenantByTaskId(id);
return Result.succeed(taskService.restartTask(id, null), Status.RESTART_SUCCESS);
}

/** 选择保存点重启任务 */
@GetMapping("/selectSavePointRestartTask")
@ApiOperation("Select SavePoint Restart Task")
// @Log(title = "Select SavePoint Restart Task", businessType = BusinessType.EXECUTE)
@ApiImplicitParams({
@ApiImplicitParam(
name = "id",
value = "Task Id",
required = true,
dataType = "Integer",
dataTypeClass = Integer.class),
@ApiImplicitParam(
name = "savePointPath",
value = "SavePoint Path",
required = true,
dataType = "String",
dataTypeClass = String.class)
})
public Result<JobResult> restartTask(@RequestParam Integer id, @RequestParam String savePointPath) {
taskService.initTenantByTaskId(id);
return Result.succeed(taskService.restartTask(id, savePointPath), Status.RESTART_SUCCESS);
}

/** 上线任务 */
@GetMapping("/onLineTask")
@ApiOperation("Online Task")
// @Log(title = "Online Task", businessType = BusinessType.EXECUTE)
@ApiImplicitParam(
name = "id",
value = "Task Id",
required = true,
dataType = "Integer",
dataTypeClass = Integer.class)
public Result<JobResult> onLineTask(@RequestParam Integer id) {
taskService.initTenantByTaskId(id);
return taskService.onLineTask(id);
}

/** 下线任务 */
@GetMapping("/offLineTask")
@ApiOperation("Offline Task")
// @Log(title = "Offline Task", businessType = BusinessType.EXECUTE)
@ApiImplicitParam(
name = "id",
value = "Task Id",
required = true,
dataType = "Integer",
dataTypeClass = Integer.class)
public Result<Void> offLineTask(@RequestParam Integer id) {
taskService.initTenantByTaskId(id);
return taskService.offLineTask(id, null);
@PostMapping("/explainSql")
@ApiOperation("Explain Sql")
public Result<List<SqlExplainResult>> explainSql(@RequestBody TaskDTO taskDTO) throws NotSupportExplainExcepition {
return Result.succeed(taskService.explainTask(taskDTO), Status.EXECUTE_SUCCESS);
}

/** 重新上线任务 */
@GetMapping("/reOnLineTask")
@ApiOperation("ReOnline Task")
// @Log(title = "ReOnline Task", businessType = BusinessType.EXECUTE)
@ApiImplicitParam(
name = "id",
value = "Task Id",
required = true,
dataType = "Integer",
dataTypeClass = Integer.class)
public Result<JobResult> reOnLineTask(@RequestParam Integer id) {
taskService.initTenantByTaskId(id);
return taskService.reOnLineTask(id, null);
@PostMapping("/getJobPlan")
@ApiOperation("Get Job Plan")
public Result<ObjectNode> getJobPlan(@RequestBody TaskDTO taskDTO) {
return Result.succeed(taskService.getJobPlan(taskDTO), Status.EXECUTE_SUCCESS);
}

/** 选择保存点重新上线任务 */
@GetMapping("/selectSavePointReOnLineTask")
@ApiOperation("Select SavePoint ReOnline Task")
// @Log(title = "Select SavePoint ReOnline Task", businessType = BusinessType.EXECUTE)
@ApiImplicitParam(
name = "id",
value = "Task Id",
required = true,
dataType = "Integer",
dataTypeClass = Integer.class)
public Result<JobResult> selectSavePointReOnLineTask(@RequestParam Integer id, @RequestParam String savePointPath) {
taskService.initTenantByTaskId(id);
return taskService.reOnLineTask(id, savePointPath);
@PostMapping("/getStreamGraph")
@ApiOperation("Get Stream Graph")
public Result<ObjectNode> getStreamGraph(@RequestBody TaskDTO taskDTO) {
return Result.succeed(taskService.getStreamGraph(taskDTO), Status.EXECUTE_SUCCESS);
}

/** 获取Job实例的信息 */
/**
* 获取Job实例的信息
*/
@GetMapping("/getJobInstance")
@ApiOperation("Get Job Instance")
// @Log(title = "Get Job Instance", businessType = BusinessType.QUERY)
@ApiImplicitParam(
name = "id",
value = "Job Instance Id",
Expand All @@ -309,10 +137,8 @@ public Result<JobInstance> getJobInstance(@RequestParam Integer id) {
return Result.succeed(jobInstanceService.getById(id));
}

/** 通过 taskId 获取 Task 对应的 Job 实例的信息 */
@GetMapping("/getJobInstanceByTaskId")
@ApiOperation("Get Job Instance By Task Id")
// @Log(title = "Get Job Instance By Task Id", businessType = BusinessType.QUERY)
@ApiImplicitParam(
name = "id",
value = "Task Id",
Expand All @@ -323,4 +149,18 @@ public Result<JobInstance> getJobInstanceByTaskId(@RequestParam Integer id) {
taskService.initTenantByTaskId(id);
return Result.succeed(jobInstanceService.getJobInstanceByTaskId(id));
}

@GetMapping(value = "/exportSql")
@ApiOperation("Export Sql")
@Log(title = "Export Sql", businessType = BusinessType.EXPORT)
@ApiImplicitParam(
name = "id",
value = "Task Id",
required = true,
dataType = "Integer",
paramType = "query",
dataTypeClass = Integer.class)
public Result<String> exportSql(@RequestParam Integer id) {
return Result.succeed(taskService.exportSql(id));
}
}
Loading

0 comments on commit b4b18f7

Please sign in to comment.