[Optimization-2349][core] Optimize dinky executor (#2350)
* [Optimization-2349][core] Optimize dinky executor

* Optimize code

---------

Co-authored-by: wenmo <[email protected]>
aiwenmo authored Oct 7, 2023
1 parent 496e118 commit 0459e09
Showing 34 changed files with 867 additions and 786 deletions.
@@ -24,6 +24,7 @@
import org.dinky.data.annotation.Log;
import org.dinky.data.enums.BusinessType;
import org.dinky.data.enums.Status;
import org.dinky.data.model.ID;
import org.dinky.data.model.JobInfoDetail;
import org.dinky.data.model.JobInstance;
import org.dinky.data.model.JobManagerConfiguration;
@@ -106,6 +107,18 @@ public Result<JobInfoDetail> getJobInfoDetail(@RequestParam Integer id) {
return Result.succeed(jobInstanceService.getJobInfoDetail(id));
}

@PostMapping("/getOneById")
@ApiOperation("Get job instance info by job instance id")
@ApiImplicitParam(
name = "id",
value = "Job instance id",
dataType = "Integer",
paramType = "query",
required = true)
public Result getOneById(@RequestBody ID id) {
return Result.succeed(jobInstanceService.getById(id.getId()));
}

/** Refresh all information of the job instance. */
@GetMapping("/refreshJobInfoDetail")
@ApiOperation("Refresh job info detail")
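For context, a hedged sketch of how a client might call the new getOneById endpoint using Java 11's built-in HTTP client. The host, port, and /api/jobInstance route prefix are assumptions not shown in this diff; the JSON body maps onto the new ID model.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class GetOneByIdClient {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        // The controller binds the request body to the ID model, so we post {"id": 1}.
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://127.0.0.1:8888/api/jobInstance/getOneById")) // base URL assumed
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString("{\"id\": 1}"))
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body()); // Result wrapper with the JobInstance payload
    }
}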
@@ -24,7 +24,8 @@
import org.dinky.assertion.Asserts;
import org.dinky.constant.FlinkSQLConstant;
import org.dinky.executor.Executor;
import org.dinky.executor.ExecutorSetting;
import org.dinky.executor.ExecutorConfig;
import org.dinky.executor.ExecutorFactory;
import org.dinky.interceptor.FlinkInterceptor;
import org.dinky.parser.SqlType;
import org.dinky.trans.Operations;
@@ -170,13 +171,13 @@ public static void submit(Integer id, DBConfig dbConfig, String dinkyAddr) {
// Add custom global variable information
sb.append(getFlinkSQLStatement(id, dbConfig));
List<String> statements = Submitter.getStatements(sb.toString());
ExecutorSetting executorSetting = ExecutorSetting.build(taskConfig);
ExecutorConfig executorConfig = ExecutorConfig.buildFromMap(taskConfig);

// Load third-party JARs
loadDep(taskConfig.get("type"), id, dinkyAddr, executorSetting);
loadDep(taskConfig.get("type"), id, dinkyAddr, executorConfig);

logger.info("作业配置如下: {}", executorSetting);
Executor executor = Executor.buildAppStreamExecutor(executorSetting);
logger.info("The job configuration is as follows: {}", executorConfig);
Executor executor = ExecutorFactory.buildAppStreamExecutor(executorConfig);
List<StatementParam> ddl = new ArrayList<>();
List<StatementParam> trans = new ArrayList<>();
List<StatementParam> execute = new ArrayList<>();
@@ -188,39 +189,39 @@ public static void submit(Integer id, DBConfig dbConfig, String dinkyAddr) {
SqlType operationType = Operations.getOperationType(statement);
if (operationType.equals(SqlType.INSERT) || operationType.equals(SqlType.SELECT)) {
trans.add(new StatementParam(statement, operationType));
if (!executorSetting.isUseStatementSet()) {
if (!executorConfig.isUseStatementSet()) {
break;
}
} else if (operationType.equals(SqlType.EXECUTE)) {
execute.add(new StatementParam(statement, operationType));
if (!executorSetting.isUseStatementSet()) {
if (!executorConfig.isUseStatementSet()) {
break;
}
} else {
ddl.add(new StatementParam(statement, operationType));
}
}
for (StatementParam item : ddl) {
logger.info("正在执行 FlinkSQL " + item.getValue());
executor.submitSql(item.getValue());
logger.info("执行成功");
logger.info("Executing FlinkSQL: " + item.getValue());
executor.executeSql(item.getValue());
logger.info("Execution succeeded.");
}
if (trans.size() > 0) {
if (executorSetting.isUseStatementSet()) {
if (executorConfig.isUseStatementSet()) {
List<String> inserts = new ArrayList<>();
for (StatementParam item : trans) {
if (item.getType().equals(SqlType.INSERT)) {
inserts.add(item.getValue());
}
}
logger.info("正在执行 FlinkSQL 语句集: " + String.join(FlinkSQLConstant.SEPARATOR, inserts));
executor.submitStatementSet(inserts);
logger.info("执行成功");
logger.info("Executing FlinkSQL statement set: " + String.join(FlinkSQLConstant.SEPARATOR, inserts));
executor.executeStatementSet(inserts);
logger.info("Execution succeeded.");
} else {
for (StatementParam item : trans) {
logger.info("正在执行 FlinkSQL " + item.getValue());
executor.submitSql(item.getValue());
logger.info("执行成功");
logger.info("Executing FlinkSQL: " + item.getValue());
executor.executeSql(item.getValue());
logger.info("Execution succeeded.");
break;
}
}
@@ -230,13 +231,13 @@ public static void submit(Integer id, DBConfig dbConfig, String dinkyAddr) {
for (StatementParam item : execute) {
executes.add(item.getValue());
executor.executeSql(item.getValue());
if (!executorSetting.isUseStatementSet()) {
if (!executorConfig.isUseStatementSet()) {
break;
}
}
logger.info("正在执行 FlinkSQL 语句集: " + String.join(FlinkSQLConstant.SEPARATOR, executes));
try {
executor.execute(executorSetting.getJobName());
executor.execute(executorConfig.getJobName());
logger.info("执行成功");
} catch (Exception e) {
logger.error("执行失败, {}", e.getMessage(), e);
Expand All @@ -245,7 +246,7 @@ public static void submit(Integer id, DBConfig dbConfig, String dinkyAddr) {
logger.info("{}任务提交成功", LocalDateTime.now());
}
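To make the submit() flow above concrete, here is a self-contained sketch of the statement routing. SqlType and the classifier are simplified stand-ins for org.dinky.parser.SqlType and Operations.getOperationType, for illustration only.

import java.util.ArrayList;
import java.util.List;

public class StatementRoutingSketch {

    // Simplified stand-in for org.dinky.parser.SqlType.
    enum SqlType { INSERT, SELECT, EXECUTE, DDL }

    public static void main(String[] args) {
        List<String> ddl = new ArrayList<>();
        List<String> trans = new ArrayList<>();
        List<String> execute = new ArrayList<>();
        boolean useStatementSet = true;

        String[] statements = {
            "CREATE TABLE t1 (id INT)",
            "INSERT INTO t2 SELECT id FROM t1",
            "INSERT INTO t3 SELECT id FROM t1"
        };

        for (String statement : statements) {
            SqlType type = classify(statement);
            if (type == SqlType.INSERT || type == SqlType.SELECT) {
                trans.add(statement);
                // Without statement sets, only the first INSERT/SELECT is kept.
                if (!useStatementSet) break;
            } else if (type == SqlType.EXECUTE) {
                execute.add(statement);
                if (!useStatementSet) break;
            } else {
                // DDL is collected and later executed eagerly, one statement at a time.
                ddl.add(statement);
            }
        }
        System.out.println("ddl=" + ddl + ", trans=" + trans + ", execute=" + execute);
    }

    // Naive keyword classifier; the real Operations.getOperationType is far more complete.
    private static SqlType classify(String sql) {
        String s = sql.trim().toUpperCase();
        if (s.startsWith("INSERT")) return SqlType.INSERT;
        if (s.startsWith("SELECT")) return SqlType.SELECT;
        if (s.startsWith("EXECUTE")) return SqlType.EXECUTE;
        return SqlType.DDL;
    }
}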

private static void loadDep(String type, Integer taskId, String dinkyAddr, ExecutorSetting executorSetting) {
private static void loadDep(String type, Integer taskId, String dinkyAddr, ExecutorConfig executorConfig) {
if (StringUtils.isBlank(dinkyAddr)) {
return;
}
@@ -275,13 +276,13 @@ private static void loadDep(String type, Integer taskId, String dinkyAddr, Execu
.toArray(URL[]::new);

addURLs(jarUrls);
executorSetting
executorConfig
.getConfig()
.put(
PipelineOptions.JARS.key(),
Arrays.stream(jarUrls).map(URL::toString).collect(Collectors.joining(";")));
if (ArrayUtil.isNotEmpty(pyUrls)) {
executorSetting
executorConfig
.getConfig()
.put(
PythonOptions.PYTHON_FILES.key(),
@@ -295,7 +296,7 @@ private static void loadDep(String type, Integer taskId, String dinkyAddr, Execu
throw new RuntimeException(e);
}
}
executorSetting.getConfig().put("python.files", "./python_udf.zip");
executorConfig.getConfig().put("python.files", "./python_udf.zip");
}

private static void addURLs(URL[] jarUrls) {
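As a hedged illustration of loadDep above: the downloaded JAR URLs end up in Flink's pipeline.jars option as a ';'-joined string. The paths below are hypothetical.

import java.net.MalformedURLException;
import java.net.URL;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.flink.configuration.PipelineOptions;

public class LoadDepSketch {
    public static void main(String[] args) throws MalformedURLException {
        // Hypothetical dependency jars fetched from the Dinky server.
        URL[] jarUrls = {
            new URL("file:/tmp/dinky/dep/jar/connector-a.jar"),
            new URL("file:/tmp/dinky/dep/jar/connector-b.jar")
        };

        Map<String, String> config = new HashMap<>();
        // Same joining logic as the diff: URL strings separated by ';'.
        config.put(
                PipelineOptions.JARS.key(),
                Arrays.stream(jarUrls).map(URL::toString).collect(Collectors.joining(";")));

        // Prints {pipeline.jars=file:/tmp/dinky/dep/jar/connector-a.jar;file:/tmp/dinky/dep/jar/connector-b.jar}
        System.out.println(config);
    }
}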
29 changes: 29 additions & 0 deletions dinky-common/src/main/java/org/dinky/data/model/ID.java
@@ -0,0 +1,29 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.data.model;

import io.swagger.annotations.ApiModel;
import lombok.Data;

@Data
@ApiModel(value = "ID", description = "The ID of an object")
public class ID {
private Integer id;
}
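Lombok's @Data generates the accessors the controller relies on (id.getId()), plus equals, hashCode, and toString. A rough desugared equivalent, for illustration only:

package org.dinky.data.model;

// Illustrative desugaring of the Lombok-annotated ID model above (equals/hashCode/toString omitted).
public class ID {
    private Integer id;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }
}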
13 changes: 9 additions & 4 deletions dinky-core/src/main/java/org/dinky/data/result/ResultPool.java
@@ -19,8 +19,10 @@

package org.dinky.data.result;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import cn.hutool.cache.Cache;
import cn.hutool.cache.impl.TimedCache;

/**
* ResultPool
Expand All @@ -31,7 +33,7 @@ public final class ResultPool {

private ResultPool() {}

private static final Map<String, SelectResult> results = new ConcurrentHashMap<>();
private static final Cache<String, SelectResult> results = new TimedCache<>(TimeUnit.MINUTES.toMillis(10));

public static boolean containsKey(String key) {
return results.containsKey(key);
@@ -42,7 +44,10 @@ public static void put(SelectResult result) {
}

public static SelectResult get(String key) {
return results.getOrDefault(key, SelectResult.buildDestruction(key));
if (containsKey(key)) {
return results.get(key);
}
return SelectResult.buildDestruction(key);
}

public static boolean remove(String key) {
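The switch from ConcurrentHashMap to hutool's TimedCache gives cached query results a 10-minute TTL instead of living until explicitly removed. A minimal sketch of the assumed TimedCache semantics:

import java.util.concurrent.TimeUnit;

import cn.hutool.cache.Cache;
import cn.hutool.cache.impl.TimedCache;

public class TimedCacheSketch {
    public static void main(String[] args) throws InterruptedException {
        // Entries expire after 10 minutes, mirroring the new ResultPool field.
        Cache<String, String> results = new TimedCache<>(TimeUnit.MINUTES.toMillis(10));
        results.put("job-1", "select-result");
        System.out.println(results.containsKey("job-1")); // true, still within its TTL

        // A per-entry timeout can also be passed explicitly (here 100 ms).
        results.put("job-2", "short-lived", 100);
        Thread.sleep(200);
        System.out.println(results.containsKey("job-2")); // false, expired lazily on access
    }
}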
10 changes: 5 additions & 5 deletions dinky-core/src/main/java/org/dinky/data/result/RunResult.java
@@ -19,7 +19,7 @@

package org.dinky.data.result;

import org.dinky.executor.ExecutorSetting;
import org.dinky.executor.ExecutorConfig;

import java.time.LocalDateTime;

@@ -42,7 +42,7 @@ public class RunResult {
private String msg;
private String error;
private IResult result;
private ExecutorSetting setting;
private ExecutorConfig setting;

public RunResult() {}

@@ -51,7 +51,7 @@ public RunResult(
String statement,
String flinkHost,
Integer flinkPort,
ExecutorSetting setting,
ExecutorConfig setting,
String jobName) {
this.sessionId = sessionId;
this.statement = statement;
@@ -77,11 +77,11 @@ public void setJobId(String jobId) {
this.jobId = jobId;
}

public ExecutorSetting getSetting() {
public ExecutorConfig getSetting() {
return setting;
}

public void setSetting(ExecutorSetting setting) {
public void setSetting(ExecutorConfig setting) {
this.setting = setting;
}

@@ -19,6 +19,8 @@

package org.dinky.data.result;

import org.dinky.assertion.Asserts;

import org.apache.flink.table.api.TableResult;

/**
@@ -34,7 +36,7 @@ public class SelectResultBuilder implements ResultBuilder {
private final String timeZone;

public SelectResultBuilder(Integer maxRowNum, boolean isChangeLog, boolean isAutoCancel, String timeZone) {
this.maxRowNum = maxRowNum;
this.maxRowNum = Asserts.isNotNull(maxRowNum) ? maxRowNum : 100;
this.isChangeLog = isChangeLog;
this.isAutoCancel = isAutoCancel;
this.timeZone = timeZone;
3 changes: 2 additions & 1 deletion dinky-core/src/main/java/org/dinky/explainer/Explainer.java
@@ -42,6 +42,7 @@
import org.dinky.process.context.ProcessContextHolder;
import org.dinky.process.model.ProcessEntity;
import org.dinky.trans.Operations;
import org.dinky.utils.DinkyClassLoaderUtil;
import org.dinky.utils.LogUtil;
import org.dinky.utils.SqlUtil;
import org.dinky.utils.URLUtils;
@@ -98,7 +99,7 @@ public static Explainer build(Executor executor, boolean useStatementSet, String
}

public Explainer initialize(JobManager jobManager, JobConfig config, String statement) {
jobManager.initClassLoader(config);
DinkyClassLoaderUtil.initClassLoader(config);
String[] statements = SqlUtil.getStatements(SqlUtil.removeNote(statement), sqlSeparator);
jobManager.initUDF(parseUDFFromStatements(statements));
return this;
@@ -20,7 +20,7 @@
package org.dinky.explainer.lineage;

import org.dinky.data.model.LineageRel;
import org.dinky.executor.Executor;
import org.dinky.executor.ExecutorFactory;
import org.dinky.explainer.Explainer;

import java.util.ArrayList;
@@ -36,7 +36,7 @@
public class LineageBuilder {

public static LineageResult getColumnLineageByLogicalPlan(String statement) {
Explainer explainer = new Explainer(Executor.build(), false);
Explainer explainer = new Explainer(ExecutorFactory.getExecutor(), false);
List<LineageRel> lineageRelList = explainer.getLineage(statement);
List<LineageRelation> relations = new ArrayList<>();
Map<String, LineageTable> tableMap = new HashMap<>();
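The lineage entry point keeps its public signature while swapping Executor.build() for ExecutorFactory.getExecutor(). A hedged usage sketch (the FlinkSQL script is invented, and a Flink runtime plus Dinky's core module must be on the classpath):

import org.dinky.explainer.lineage.LineageBuilder;
import org.dinky.explainer.lineage.LineageResult;

public class LineageSketch {
    public static void main(String[] args) {
        // Column lineage is derived from the logical plan of the script.
        String statement = "CREATE TABLE src (id INT, name STRING) WITH ('connector' = 'datagen');\n"
                + "CREATE TABLE dst (id INT, name STRING) WITH ('connector' = 'print');\n"
                + "INSERT INTO dst SELECT id, name FROM src";
        LineageResult result = LineageBuilder.getColumnLineageByLogicalPlan(statement);
        System.out.println(result); // tables and column-level relations
    }
}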
12 changes: 6 additions & 6 deletions dinky-core/src/main/java/org/dinky/job/Job.java
@@ -21,7 +21,7 @@

import org.dinky.data.result.IResult;
import org.dinky.executor.Executor;
import org.dinky.executor.ExecutorSetting;
import org.dinky.executor.ExecutorConfig;
import org.dinky.gateway.enums.GatewayType;

import java.time.LocalDateTime;
@@ -48,7 +48,7 @@ public class Job {
private String jobId;
private String error;
private IResult result;
private ExecutorSetting executorSetting;
private ExecutorConfig executorConfig;
private LocalDateTime startTime;
private LocalDateTime endTime;
private Executor executor;
@@ -68,14 +68,14 @@ public Job(
GatewayType type,
JobStatus status,
String statement,
ExecutorSetting executorSetting,
ExecutorConfig executorConfig,
Executor executor,
boolean useGateway) {
this.jobConfig = jobConfig;
this.type = type;
this.status = status;
this.statement = statement;
this.executorSetting = executorSetting;
this.executorConfig = executorConfig;
this.startTime = LocalDateTime.now();
this.executor = executor;
this.useGateway = useGateway;
@@ -84,11 +84,11 @@ public Job(
public static Job init(
GatewayType type,
JobConfig jobConfig,
ExecutorSetting executorSetting,
ExecutorConfig executorConfig,
Executor executor,
String statement,
boolean useGateway) {
return new Job(jobConfig, type, JobStatus.INITIALIZE, statement, executorSetting, executor, useGateway);
return new Job(jobConfig, type, JobStatus.INITIALIZE, statement, executorConfig, executor, useGateway);
}

public JobResult getJobResult() {
10 changes: 6 additions & 4 deletions dinky-core/src/main/java/org/dinky/job/JobConfig.java
@@ -21,7 +21,7 @@

import org.dinky.assertion.Asserts;
import org.dinky.data.constant.NetConstant;
import org.dinky.executor.ExecutorSetting;
import org.dinky.executor.ExecutorConfig;
import org.dinky.gateway.config.GatewayConfig;
import org.dinky.gateway.enums.GatewayType;
import org.dinky.gateway.enums.SavePointStrategy;
@@ -387,16 +387,18 @@ public JobConfig(
this.isJarTask = isJarTask;
}

public ExecutorSetting getExecutorSetting() {
return new ExecutorSetting(
public ExecutorConfig getExecutorSetting() {
return ExecutorConfig.build(
address,
checkpoint,
parallelism,
useSqlFragment,
useStatementSet,
useBatchModel,
savePointPath,
jobName,
configJson);
configJson,
variables);
}

public void buildGatewayConfig(FlinkClusterConfig config) {