From 93c43d1c4cbf9b00333116419b5e2ad528131bbe Mon Sep 17 00:00:00 2001 From: ZackYoung Date: Fri, 1 Mar 2024 09:24:55 +0800 Subject: [PATCH] [Feature] [Flink SQL] Added ADD FILE syntax not only submits JAR to Java ClassPath (#3205) Co-authored-by: zackyoungh --- .../dinky/configure/cache/PaimonCache.java | 2 +- .../resource/impl/ResourceServiceImpl.java | 4 +- .../org/dinky/app/flinksql/Submitter.java | 7 ++ .../main/java/org/dinky/parser/SqlType.java | 1 + .../dinky/trans/ddl/AddFilerOperation.java | 50 ++++++++++ .../dinky/trans/dml/ExecuteJarOperation.java | 18 +++- .../trans/parse/AddFileSqlParseStrategy.java | 96 +++++++++++++++++++ .../context/FlinkUdfPathContextHolder.java | 21 +++- .../java/org/dinky/explainer/Explainer.java | 14 ++- .../main/java/org/dinky/job/JobManager.java | 14 +++ .../job/builder/JobJarStreamGraphBuilder.java | 9 +- .../org/dinky/job/builder/JobUDFBuilder.java | 14 +-- .../org/dinky/utils/DinkyClassLoaderUtil.java | 3 +- .../org/dinky/gateway/yarn/YarnGateway.java | 2 +- 14 files changed, 231 insertions(+), 24 deletions(-) create mode 100644 dinky-client/dinky-client-base/src/main/java/org/dinky/trans/ddl/AddFilerOperation.java create mode 100644 dinky-client/dinky-client-base/src/main/java/org/dinky/trans/parse/AddFileSqlParseStrategy.java diff --git a/dinky-admin/src/main/java/org/dinky/configure/cache/PaimonCache.java b/dinky-admin/src/main/java/org/dinky/configure/cache/PaimonCache.java index 4f10645517..7076e35222 100644 --- a/dinky-admin/src/main/java/org/dinky/configure/cache/PaimonCache.java +++ b/dinky-admin/src/main/java/org/dinky/configure/cache/PaimonCache.java @@ -46,7 +46,7 @@ public class PaimonCache extends AbstractValueAdaptingCache { /** * TIMEOUT CACHE */ - private final cn.hutool.cache.Cache cache = new TimedCache<>(1000 * 60 * 10); + private final cn.hutool.cache.Cache cache = new TimedCache<>(1000 * 60); public PaimonCache(String cacheName) { super(true); diff --git a/dinky-admin/src/main/java/org/dinky/service/resource/impl/ResourceServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/resource/impl/ResourceServiceImpl.java index ddd234904d..38c73da3ea 100644 --- a/dinky-admin/src/main/java/org/dinky/service/resource/impl/ResourceServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/resource/impl/ResourceServiceImpl.java @@ -255,7 +255,7 @@ public void uploadFile(Integer pid, String desc, File file) { * @param size size */ @Transactional(rollbackFor = Exception.class) - private void upload( + public void upload( Integer pid, String desc, Consumer uploadAction, String fileName, Resources pResource, long size) { Resources currentUploadResource = getOne( new LambdaQueryWrapper().eq(Resources::getPid, pid).eq(Resources::getFileName, fileName)); @@ -272,7 +272,7 @@ private void upload( resources.setIsDirectory(false); resources.setType(0); String prefixPath = pResource == null ? 
"" : pResource.getFullName(); - fullName = prefixPath + "/" + fileName; + fullName = StrUtil.removePrefix(prefixPath + "/" + fileName, StrUtil.SLASH); resources.setFullName(fullName); resources.setSize(size); diff --git a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java index 13f533730b..ddc0ebb93f 100644 --- a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java +++ b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java @@ -38,6 +38,7 @@ import org.dinky.resource.BaseResourceManager; import org.dinky.trans.Operations; import org.dinky.trans.dml.ExecuteJarOperation; +import org.dinky.trans.parse.AddFileSqlParseStrategy; import org.dinky.trans.parse.AddJarSqlParseStrategy; import org.dinky.trans.parse.ExecuteJarParseStrategy; import org.dinky.url.RsURLStreamHandlerFactory; @@ -274,6 +275,12 @@ public static Optional executeJarJob(String type, Executor executor, if ("kubernetes-application".equals(type)) { executor.addJar(info); } + } else if (Operations.getOperationType(sqlStatement) == SqlType.ADD_FILE) { + File[] info = AddFileSqlParseStrategy.getInfo(sqlStatement); + Arrays.stream(info).forEach(executor.getDinkyClassLoader().getUdfPathContextHolder()::addFile); + if ("kubernetes-application".equals(type)) { + executor.addJar(info); + } } } return jobClient; diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/parser/SqlType.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/parser/SqlType.java index 7db21aec4c..3dbae13f3e 100644 --- a/dinky-client/dinky-client-base/src/main/java/org/dinky/parser/SqlType.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/parser/SqlType.java @@ -59,6 +59,7 @@ public enum SqlType { ADD_JAR("ADD_JAR", "^ADD\\s+JAR\\s+\\S+"), ADD("ADD", "^ADD\\s+CUSTOMJAR\\s+\\S+"), + ADD_FILE("ADD_FILE", "^ADD\\s+FILE\\s+\\S+"), PRINT("PRINT", "^PRINT.*"), diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/ddl/AddFilerOperation.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/ddl/AddFilerOperation.java new file mode 100644 index 0000000000..8dfa31ff55 --- /dev/null +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/ddl/AddFilerOperation.java @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.dinky.trans.ddl; + +import org.dinky.executor.CustomTableEnvironment; +import org.dinky.trans.AbstractOperation; +import org.dinky.trans.ExtendOperation; + +import org.apache.flink.table.api.TableResult; + +import java.util.Optional; + +/** + * @since 0.7.0 + */ +public class AddFilerOperation extends AbstractOperation implements ExtendOperation { + + public AddFilerOperation(String statement) { + super(statement); + } + + public AddFilerOperation() {} + + @Override + public Optional execute(CustomTableEnvironment tEnv) { + return Optional.of(TABLE_RESULT_OK); + } + + @Override + public String asSummaryString() { + return statement; + } +} diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java index 61209ef87e..d745144553 100644 --- a/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java @@ -36,6 +36,9 @@ import org.apache.flink.table.api.TableResult; import java.io.File; +import java.net.URL; +import java.util.Collections; +import java.util.List; import java.util.Optional; import cn.hutool.core.convert.Convert; @@ -63,11 +66,16 @@ public Optional execute(CustomTableEnvironment tEnv) { } public StreamGraph getStreamGraph(CustomTableEnvironment tEnv) { + return getStreamGraph(tEnv, Collections.emptyList()); + } + + public StreamGraph getStreamGraph(CustomTableEnvironment tEnv, List classpaths) { JarSubmitParam submitParam = JarSubmitParam.build(statement); - return getStreamGraph(submitParam, tEnv); + return getStreamGraph(submitParam, tEnv, classpaths); } - public static StreamGraph getStreamGraph(JarSubmitParam submitParam, CustomTableEnvironment tEnv) { + public static StreamGraph getStreamGraph( + JarSubmitParam submitParam, CustomTableEnvironment tEnv, List classpaths) { SavepointRestoreSettings savepointRestoreSettings = StrUtil.isBlank(submitParam.getSavepointPath()) ? SavepointRestoreSettings.none() : SavepointRestoreSettings.forPath( @@ -85,12 +93,14 @@ public static StreamGraph getStreamGraph(JarSubmitParam submitParam, CustomTable + Opt.ofBlankAble(submitParam.getArgs()).orElse("")); file = null; } + program = PackagedProgram.newBuilder() .setJarFile(file) .setEntryPointClassName(submitParam.getMainClass()) .setConfiguration(configuration) .setSavepointRestoreSettings(savepointRestoreSettings) .setArguments(RunTimeUtil.handleCmds(submitParam.getArgs())) + .setUserClassPaths(classpaths) .build(); int parallelism = StrUtil.isNumeric(submitParam.getParallelism()) ? Convert.toInt(submitParam.getParallelism()) @@ -113,6 +123,10 @@ public StreamGraph explain(CustomTableEnvironment tEnv) { return getStreamGraph(tEnv); } + public StreamGraph explain(CustomTableEnvironment tEnv, List classpaths) { + return getStreamGraph(tEnv, classpaths); + } + @Setter @Getter public static class JarSubmitParam { diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/parse/AddFileSqlParseStrategy.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/parse/AddFileSqlParseStrategy.java new file mode 100644 index 0000000000..f291b6b6c6 --- /dev/null +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/parse/AddFileSqlParseStrategy.java @@ -0,0 +1,96 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.trans.parse; + +import org.dinky.data.exception.DinkyException; +import org.dinky.trans.ddl.AddFilerOperation; +import org.dinky.utils.URLUtils; + +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.planner.parse.AbstractRegexParseStrategy; + +import java.io.File; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import cn.hutool.core.util.ReUtil; +import cn.hutool.core.util.StrUtil; + +/** + * @since 0.7.0 + */ +public class AddFileSqlParseStrategy extends AbstractRegexParseStrategy { + + private static final String ADD_FILE = "(add\\s+file)\\s+'(.*)'"; + private static final Pattern ADD_FILE_PATTERN = Pattern.compile(ADD_FILE, Pattern.CASE_INSENSITIVE); + public static final AddFileSqlParseStrategy INSTANCE = new AddFileSqlParseStrategy(); + + public AddFileSqlParseStrategy() { + super(ADD_FILE_PATTERN); + } + + public static File[] getInfo(String statement) { + return getAllFilePath(statement).toArray(new File[0]); + } + + protected static List patternStatements(String[] statements) { + return Stream.of(statements) + .filter(s -> ReUtil.isMatch(ADD_FILE_PATTERN, s)) + .map(x -> ReUtil.findAllGroup0(ADD_FILE_PATTERN, x).get(0)) + .collect(Collectors.toList()); + } + + public static Set getAllFilePath(String... 
statements) { + Set fileSet = new HashSet<>(); + patternStatements(statements).stream() + .map(x -> ReUtil.findAll(ADD_FILE_PATTERN, x, 2).get(0)) + .distinct() + .forEach(urlPath -> { + try { + File file = URLUtils.toFile(urlPath); + if (file == null || !file.exists()) { + throw new DinkyException(StrUtil.format("file : {} not exists!", urlPath)); + } + fileSet.add(file); + } catch (Exception e) { + throw new DinkyException(StrUtil.format("url:{} request failed!", urlPath), e); + } + }); + return fileSet; + } + + public static Set getAllFilePath(String statements) { + return getAllFilePath(new String[] {statements}); + } + + @Override + public Operation convert(String statement) { + return new AddFilerOperation(statement); + } + + @Override + public String[] getHints() { + return new String[0]; + } +} diff --git a/dinky-common/src/main/java/org/dinky/context/FlinkUdfPathContextHolder.java b/dinky-common/src/main/java/org/dinky/context/FlinkUdfPathContextHolder.java index 59c9305ba4..0b00a6b9ac 100644 --- a/dinky-common/src/main/java/org/dinky/context/FlinkUdfPathContextHolder.java +++ b/dinky-common/src/main/java/org/dinky/context/FlinkUdfPathContextHolder.java @@ -23,17 +23,24 @@ import java.util.HashSet; import java.util.Set; -/** @since 0.7.0 */ +/** + * @since 0.7.0 + */ public class FlinkUdfPathContextHolder { private final Set UDF_PATH_CONTEXT = new HashSet<>(); private final Set OTHER_PLUGINS_PATH_CONTEXT = new HashSet<>(); private final Set PYTHON_UDF_FILE = new HashSet<>(); + private final Set FILES = new HashSet<>(); public void addUdfPath(File file) { getUdfFile().add(file); } + public void addFile(File file) { + getFiles().add(file); + } + public void addPyUdfPath(File file) { getPyUdfFile().add(file); } @@ -53,4 +60,16 @@ public Set getPyUdfFile() { public Set getOtherPluginsFiles() { return OTHER_PLUGINS_PATH_CONTEXT; } + + public Set getAllFileSet() { + Set allFileSet = new HashSet<>(); + allFileSet.addAll(getUdfFile()); + allFileSet.addAll(getOtherPluginsFiles()); + allFileSet.addAll(getFiles()); + return allFileSet; + } + + public Set getFiles() { + return FILES; + } } diff --git a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java index 25064b357f..f151b84701 100644 --- a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java +++ b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java @@ -38,6 +38,7 @@ import org.dinky.parser.SqlType; import org.dinky.trans.Operations; import org.dinky.trans.dml.ExecuteJarOperation; +import org.dinky.trans.parse.AddFileSqlParseStrategy; import org.dinky.trans.parse.AddJarSqlParseStrategy; import org.dinky.trans.parse.ExecuteJarParseStrategy; import org.dinky.utils.DinkyClassLoaderUtil; @@ -51,6 +52,7 @@ import org.apache.flink.runtime.rest.messages.JobPlanInfo; import org.apache.flink.streaming.api.graph.StreamGraph; +import java.net.URL; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; @@ -119,6 +121,12 @@ public JobParam pretreatStatements(String[] statements) { (executor.getDinkyClassLoader()) .addURLs(URLUtils.getURLs( jobManager.getUdfPathContextHolder().getOtherPluginsFiles())); + } else if (operationType.equals(SqlType.ADD_FILE)) { + AddFileSqlParseStrategy.getAllFilePath(statement) + .forEach(t -> jobManager.getUdfPathContextHolder().addFile(t)); + (executor.getDinkyClassLoader()) + .addURLs(URLUtils.getURLs( + jobManager.getUdfPathContextHolder().getFiles())); } else if (operationType.equals(SqlType.ADD_JAR)) { 
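    // The ADD_JAR branch below builds the merged configuration and initializes Flink's
    // FileSystem with it before the jar path is handled, so paths on remote storage
    // (e.g. hdfs://) can be resolved.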
Configuration combinationConfig = getCombinationConfig(); FileSystem.initialize(combinationConfig, null); @@ -285,8 +293,10 @@ public ExplainResult explainSql(String statement) { if (Asserts.isNull(sqlExplainResult)) { sqlExplainResult = new SqlExplainResult(); } else if (ExecuteJarParseStrategy.INSTANCE.match(item.getValue())) { - StreamGraph streamGraph = - new ExecuteJarOperation(item.getValue()).explain(executor.getCustomTableEnvironment()); + + List allFileByAdd = jobManager.getAllFileSet(); + StreamGraph streamGraph = new ExecuteJarOperation(item.getValue()) + .explain(executor.getCustomTableEnvironment(), allFileByAdd); sqlExplainResult.setExplain(streamGraph.getStreamingPlanAsJSON()); } else { executor.executeSql(item.getValue()); diff --git a/dinky-core/src/main/java/org/dinky/job/JobManager.java b/dinky-core/src/main/java/org/dinky/job/JobManager.java index 7bffba5162..ec7d8a1421 100644 --- a/dinky-core/src/main/java/org/dinky/job/JobManager.java +++ b/dinky-core/src/main/java/org/dinky/job/JobManager.java @@ -57,6 +57,7 @@ import org.dinky.job.builder.JobUDFBuilder; import org.dinky.parser.SqlType; import org.dinky.trans.Operations; +import org.dinky.trans.parse.AddFileSqlParseStrategy; import org.dinky.trans.parse.AddJarSqlParseStrategy; import org.dinky.utils.DinkyClassLoaderUtil; import org.dinky.utils.JsonUtils; @@ -81,8 +82,10 @@ import java.io.File; import java.io.IOException; import java.lang.ref.WeakReference; +import java.net.URL; import java.time.LocalDateTime; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -90,6 +93,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; +import cn.hutool.core.collection.CollUtil; import cn.hutool.core.text.StrFormatter; import lombok.extern.slf4j.Slf4j; @@ -374,6 +378,9 @@ public IResult executeDDL(String statement) { } else if (operationType.equals(SqlType.ADD) || operationType.equals(SqlType.ADD_JAR)) { Set allFilePath = AddJarSqlParseStrategy.getAllFilePath(item); getExecutor().getDinkyClassLoader().addURLs(allFilePath); + } else if (operationType.equals(SqlType.ADD_FILE)) { + Set allFilePath = AddFileSqlParseStrategy.getAllFilePath(item); + getExecutor().getDinkyClassLoader().addURLs(allFilePath); } LocalDateTime startTime = LocalDateTime.now(); TableResult tableResult = executor.executeSql(newStatement); @@ -505,4 +512,11 @@ public String exportSql(String sql) { sb.append(statement); return sb.toString(); } + + public List getAllFileSet() { + return CollUtil.isEmpty(getUdfPathContextHolder().getAllFileSet()) + ? 
Collections.emptyList() + : Arrays.asList(URLUtils.getURLs( + getUdfPathContextHolder().getAllFileSet().toArray(new File[0]))); + } } diff --git a/dinky-core/src/main/java/org/dinky/job/builder/JobJarStreamGraphBuilder.java b/dinky-core/src/main/java/org/dinky/job/builder/JobJarStreamGraphBuilder.java index 59d613f2f7..4cd6653076 100644 --- a/dinky-core/src/main/java/org/dinky/job/builder/JobJarStreamGraphBuilder.java +++ b/dinky-core/src/main/java/org/dinky/job/builder/JobJarStreamGraphBuilder.java @@ -27,6 +27,7 @@ import org.dinky.trans.Operations; import org.dinky.trans.ddl.CustomSetOperation; import org.dinky.trans.dml.ExecuteJarOperation; +import org.dinky.trans.parse.AddFileSqlParseStrategy; import org.dinky.trans.parse.AddJarSqlParseStrategy; import org.dinky.trans.parse.ExecuteJarParseStrategy; import org.dinky.trans.parse.SetSqlParseStrategy; @@ -36,6 +37,7 @@ import org.apache.flink.streaming.api.graph.StreamGraph; import java.io.File; +import java.net.URL; import java.util.ArrayList; import java.util.List; import java.util.Set; @@ -76,10 +78,15 @@ public StreamGraph getJarStreamGraph(String statement, DinkyClassLoader dinkyCla Set files = AddJarSqlParseStrategy.getAllFilePath(sqlStatement); files.forEach(executor::addJar); files.forEach(jobManager.getUdfPathContextHolder()::addOtherPlugins); + } else if (operationType.equals(SqlType.ADD_FILE)) { + Set files = AddFileSqlParseStrategy.getAllFilePath(sqlStatement); + files.forEach(executor::addJar); + files.forEach(jobManager.getUdfPathContextHolder()::addFile); } } Assert.notNull(executeJarOperation, () -> new DinkyException("Not found execute jar operation.")); - return executeJarOperation.explain(executor.getCustomTableEnvironment()); + List urLs = jobManager.getAllFileSet(); + return executeJarOperation.explain(executor.getCustomTableEnvironment(), urLs); } public List getUris(String statement) { diff --git a/dinky-core/src/main/java/org/dinky/job/builder/JobUDFBuilder.java b/dinky-core/src/main/java/org/dinky/job/builder/JobUDFBuilder.java index d62fd49222..ee1a1dca5b 100644 --- a/dinky-core/src/main/java/org/dinky/job/builder/JobUDFBuilder.java +++ b/dinky-core/src/main/java/org/dinky/job/builder/JobUDFBuilder.java @@ -34,10 +34,8 @@ import java.io.File; import java.net.URL; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Set; -import java.util.stream.Collectors; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.ArrayUtil; @@ -70,15 +68,7 @@ public void run() throws Exception { } // 1. 
Obtain the path of the jar package and inject it into the remote environment List jarFiles = - new ArrayList<>(jobManager.getUdfPathContextHolder().getUdfFile()); - - Set otherPluginsFiles = jobManager.getUdfPathContextHolder().getOtherPluginsFiles(); - jarFiles.addAll(otherPluginsFiles); - - List udfJars = Arrays.stream(UDFUtil.initJavaUDF(udfList, runMode, taskId)) - .map(File::new) - .collect(Collectors.toList()); - jarFiles.addAll(udfJars); + new ArrayList<>(jobManager.getUdfPathContextHolder().getAllFileSet()); String[] jarPaths = CollUtil.removeNull(jarFiles).stream() .map(File::getAbsolutePath) @@ -117,7 +107,7 @@ public void run() throws Exception { UDFUtil.addConfigurationClsAndJars( jobManager.getExecutor().getCustomTableEnvironment(), jarList, - CollUtil.newArrayList(URLUtils.getURLs(otherPluginsFiles))); + CollUtil.newArrayList(URLUtils.getURLs(jarFiles))); } catch (Exception e) { throw new RuntimeException("add configuration failed: ", e); } diff --git a/dinky-core/src/main/java/org/dinky/utils/DinkyClassLoaderUtil.java b/dinky-core/src/main/java/org/dinky/utils/DinkyClassLoaderUtil.java index bd3cf09349..37be2b6f25 100644 --- a/dinky-core/src/main/java/org/dinky/utils/DinkyClassLoaderUtil.java +++ b/dinky-core/src/main/java/org/dinky/utils/DinkyClassLoaderUtil.java @@ -64,7 +64,6 @@ public static void initClassLoader(JobConfig config, DinkyClassLoader dinkyClass } } - dinkyClassLoader.addURLs( - CollUtil.addAll(udfPathContextHolder.getUdfFile(), udfPathContextHolder.getOtherPluginsFiles())); + dinkyClassLoader.addURLs(udfPathContextHolder.getAllFileSet()); } } diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java b/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java index 8989fef7f7..a5a05d2d3a 100644 --- a/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java +++ b/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java @@ -312,7 +312,7 @@ protected YarnClusterDescriptor createYarnClusterDescriptorWithJar(FlinkUdfPathC Arrays.stream(config.getJarPaths()).map(FileUtil::file).collect(Collectors.toList())); yarnClusterDescriptor.addShipFiles(new ArrayList<>(udfPathContextHolder.getPyUdfFile())); } - Set otherPluginsFiles = udfPathContextHolder.getOtherPluginsFiles(); + Set otherPluginsFiles = udfPathContextHolder.getAllFileSet(); if (CollUtil.isNotEmpty(otherPluginsFiles)) { yarnClusterDescriptor.addShipFiles(CollUtil.newArrayList(otherPluginsFiles));
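Usage sketch (illustrative only, not part of the commit): the snippet below shows how the new ADD FILE statement is matched and resolved by AddFileSqlParseStrategy, assuming the classes added in this patch are on the classpath. The statement and file path are hypothetical; getAllFilePath throws a DinkyException when the referenced file does not exist, so the path must point to a real file.

import java.io.File;

import org.dinky.trans.parse.AddFileSqlParseStrategy;

public class AddFileSyntaxSketch {
    public static void main(String[] args) {
        // Hypothetical statement: the pattern "(add\s+file)\s+'(.*)'" is compiled with
        // CASE_INSENSITIVE, so both ADD FILE and add file work, and the path must be single-quoted.
        String statement = "ADD FILE '/opt/dinky/files/my-lookup-data.txt'";

        if (AddFileSqlParseStrategy.INSTANCE.match(statement)) {
            // getInfo resolves every quoted path to a java.io.File and fails fast for missing files.
            File[] files = AddFileSqlParseStrategy.getInfo(statement);
            for (File file : files) {
                // In the patch these files are handed to FlinkUdfPathContextHolder#addFile, which
                // makes them part of getAllFileSet(): they are shipped to YARN as ship files and
                // placed on the user classpath of EXECUTE JAR jobs.
                System.out.println(file.getAbsolutePath());
            }
        }
    }
}

In a FlinkSQL task the statement is written directly as ADD FILE '/path/to/file' alongside the other statements (for example before EXECUTE JAR), mirroring the existing ADD JAR / ADD CUSTOMJAR syntax.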