From f5f833f2deb2d22b26d7d995612983854f39b07a Mon Sep 17 00:00:00 2001 From: ZackYoung Date: Mon, 16 Dec 2024 21:18:46 +0800 Subject: [PATCH] [BugFix][FlinkJar]Fix the issue where FlinkJar cannot use global variables (#4052) Co-authored-by: zackyoungh --- .../java/org/dinky/data/model/JarSubmitParam.java | 13 +++++++++++++ .../main/java/org/dinky/executor/Executor.java | 7 ++++++- .../main/java/org/dinky/explainer/Explainer.java | 15 --------------- 3 files changed, 19 insertions(+), 16 deletions(-) diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/data/model/JarSubmitParam.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/data/model/JarSubmitParam.java index dc4bb9b222..8da95a15d9 100644 --- a/dinky-client/dinky-client-base/src/main/java/org/dinky/data/model/JarSubmitParam.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/data/model/JarSubmitParam.java @@ -93,4 +93,17 @@ public String getArgs() { } return args; } + + public String getStatement() { + return StrUtil.format( + "EXECUTE JAR WITH (\n" + "'uri'='{}',\n" + + "'main-class'='{}',\n" + + "'args'='{}',\n" + + "'allowNonRestoredState'='{}'\n" + + ");", + getUri(), + getMainClass(), + getArgs(), + getAllowNonRestoredState()); + } } diff --git a/dinky-core/src/main/java/org/dinky/executor/Executor.java b/dinky-core/src/main/java/org/dinky/executor/Executor.java index b68bf736b3..1490eef394 100644 --- a/dinky-core/src/main/java/org/dinky/executor/Executor.java +++ b/dinky-core/src/main/java/org/dinky/executor/Executor.java @@ -25,6 +25,7 @@ import org.dinky.data.job.JobStatement; import org.dinky.data.job.JobStatementType; import org.dinky.data.job.SqlType; +import org.dinky.data.model.JarSubmitParam; import org.dinky.data.model.LineageRel; import org.dinky.data.result.SqlExplainResult; import org.dinky.explainer.print_table.PrintStatementExplainer; @@ -66,6 +67,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import cn.hutool.core.codec.Base64; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.ReflectUtil; import cn.hutool.core.util.URLUtil; @@ -214,7 +216,10 @@ public JobStatementPlan parseStatementIntoJobStatementPlan(String[] statements) if (operationType.equals(SqlType.SET) || operationType.equals(SqlType.RESET)) { jobStatementPlan.addJobStatement(statement, JobStatementType.SET, operationType); } else if (operationType.equals(SqlType.EXECUTE_JAR)) { - jobStatementPlan.addJobStatement(statement, JobStatementType.EXECUTE_JAR, operationType); + JarSubmitParam jarSubmitParam = JarSubmitParam.build(statement); + jarSubmitParam.setUri("base64@" + Base64.encode(pretreatStatement(jarSubmitParam.getArgs()))); + jobStatementPlan.addJobStatement( + jarSubmitParam.toString(), JobStatementType.EXECUTE_JAR, operationType); } else if (operationType.equals(SqlType.EXECUTE)) { jobStatementPlan.addJobStatement(statement, JobStatementType.PIPELINE, operationType); } else if (operationType.equals(SqlType.PRINT)) { diff --git a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java index 0971e510c1..7b6e78583e 100644 --- a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java +++ b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java @@ -32,7 +32,6 @@ import org.dinky.explainer.mock.MockStatementExplainer; import org.dinky.function.data.model.UDF; import org.dinky.function.pool.UdfCodePool; -import org.dinky.function.util.UDFUtil; import org.dinky.job.JobConfig; import org.dinky.job.JobManager; import org.dinky.job.JobRunnerFactory; @@ -108,20 +107,6 @@ private void generateUDFStatement(JobStatementPlan jobStatementPlan) { } } - public List parseUDFFromStatements(String[] statements) { - List udfList = new ArrayList<>(); - for (String statement : statements) { - if (statement.isEmpty()) { - continue; - } - UDF udf = UDFUtil.toUDF(statement, jobManager.getDinkyClassLoader()); - if (Asserts.isNotNull(udf)) { - udfList.add(udf); - } - } - return udfList; - } - public ExplainResult explainSql(String statement) { log.info("Start explain FlinkSQL..."); JobStatementPlan jobStatementPlan;