From 3c5e02113d7eae1e576d19599478c3d709c78624 Mon Sep 17 00:00:00 2001 From: ZackYoung Date: Mon, 19 Feb 2024 18:49:22 +0800 Subject: [PATCH] [BugFix][Jar] Fix Flink jar submission and set parallelism failure issue (#3165) Co-authored-by: zackyoungh --- .../java/org/dinky/trans/dml/ExecuteJarOperation.java | 6 +++++- docs/docs/extend/expand_statements/execute_jar.md | 10 +++++----- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java index 255583da41..61209ef87e 100644 --- a/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java @@ -38,6 +38,7 @@ import java.io.File; import java.util.Optional; +import cn.hutool.core.convert.Convert; import cn.hutool.core.lang.Assert; import cn.hutool.core.lang.Opt; import cn.hutool.core.util.StrUtil; @@ -91,7 +92,10 @@ public static StreamGraph getStreamGraph(JarSubmitParam submitParam, CustomTable .setSavepointRestoreSettings(savepointRestoreSettings) .setArguments(RunTimeUtil.handleCmds(submitParam.getArgs())) .build(); - Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(program, configuration, 1, true); + int parallelism = StrUtil.isNumeric(submitParam.getParallelism()) + ? Convert.toInt(submitParam.getParallelism()) + : tEnv.getStreamExecutionEnvironment().getParallelism(); + Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(program, configuration, parallelism, true); program.close(); Assert.isTrue(pipeline instanceof StreamGraph, "can not translate"); return (StreamGraph) pipeline; diff --git a/docs/docs/extend/expand_statements/execute_jar.md b/docs/docs/extend/expand_statements/execute_jar.md index 91fc879a7a..e40565b56b 100644 --- a/docs/docs/extend/expand_statements/execute_jar.md +++ b/docs/docs/extend/expand_statements/execute_jar.md @@ -14,6 +14,8 @@ title: EXECUTE JAR 如果使用 `Checkpoint` 或 `Savepoint` ,请在右边作业,选择 `Savepoint策略`,其次检查点 跳过 请使用 execution.savepoint.ignore-unclaimed-state: true 参数控制 + +此flink sql jar任务支持 `set` 和 `add customjar` 联动使用 ::: ## 语法结构 @@ -23,8 +25,7 @@ title: EXECUTE JAR EXECUTE JAR WITH ( 'uri'='.jar', -- 该参数 必填 'main-class'='', -- 该参数 必填 -'args'='', -- 主类入参 该参数可选 -'parallelism'='', -- 任务并行度 该参数可选 +'args'='' -- 主类入参 该参数可选 ); ``` @@ -35,15 +36,14 @@ EXECUTE JAR WITH ( EXECUTE JAR WITH ( 'uri'='rs:/jar/flink/demo/SocketWindowWordCount.jar', 'main-class'='org.apache.flink.streaming.examples.socket', -'args'=' --hostname localhost ', -'parallelism'='' +'args'=' --hostname localhost ' ); ``` :::warning 注意 1. 以上示例中, uri 的值为 rs:/jar/flink/demo/SocketWindowWordCount.jar, 该值为资源中心中的资源路径, 请确保资源中心中存在该资源,请忽略资源中心 Root 节点(该节点为虚拟节点) -2. 如果要读取S3,HDFS,LCOAL等存储上面的文件均可通过rs协议进行桥接使用,请参考 [资源管理](../../user_guide/register_center/resource) 中 rs 协议使用方式 +2. 如果要读取S3,HDFS,LOCAL等存储上面的文件均可通过rs协议进行桥接使用,请参考 [资源管理](../../user_guide/register_center/resource) 中 rs 协议使用方式 ::: ## PyFlink 任务提交: