From 6f6a096675f7e2f9c4ddd55e22c3ed1574c82247 Mon Sep 17 00:00:00 2001 From: Corneil du Plessis Date: Fri, 15 Sep 2023 14:00:52 +0200 Subject: [PATCH] Provide for correctly resolving app registration from version supplied during launch. Fixes #5467 --- .../task/AggregateExecutionSupport.java | 3 + .../DefaultAggregateExecutionSupport.java | 41 ++++- .../dataflow/core/DataFlowAppDefinition.java | 2 + .../service/DefaultAppRegistryService.java | 4 +- .../service/TaskExecutionCreationService.java | 2 +- .../service/TaskExecutionInfoService.java | 1 + .../service/impl/DefaultSchedulerService.java | 22 ++- .../impl/DefaultTaskExecutionInfoService.java | 153 ++++++++++-------- ...DefaultTaskExecutionRepositoryService.java | 4 +- .../impl/DefaultTaskExecutionService.java | 57 ++++--- .../controller/TaskControllerTests.java | 19 ++- .../DefaultTaskExecutionServiceTests.java | 52 +++++- 12 files changed, 251 insertions(+), 109 deletions(-) diff --git a/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/AggregateExecutionSupport.java b/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/AggregateExecutionSupport.java index 7cfc3ba5d6..1b62fa81a4 100644 --- a/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/AggregateExecutionSupport.java +++ b/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/AggregateExecutionSupport.java @@ -43,12 +43,15 @@ public interface AggregateExecutionSupport { * @return The {@link SchemaVersionTarget} for the taskName specified. */ SchemaVersionTarget findSchemaVersionTarget(String taskName, TaskDefinitionReader taskDefinitionReader); + SchemaVersionTarget findSchemaVersionTarget(String taskName, String version, TaskDefinitionReader taskDefinitionReader); SchemaVersionTarget findSchemaVersionTarget(String taskName, TaskDefinition taskDefinition); + SchemaVersionTarget findSchemaVersionTarget(String taskName, String version, TaskDefinition taskDefinition); /** * Retrieve the {@link AppRegistration} for the registeredName. */ AppRegistration findTaskAppRegistration(String registeredName); + AppRegistration findTaskAppRegistration(String registeredName, String version); /** * Return the {@link AggregateTaskExecution} for the {@link TaskExecution} and Schema Target name specified. diff --git a/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/impl/DefaultAggregateExecutionSupport.java b/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/impl/DefaultAggregateExecutionSupport.java index 0e49f165f4..c660c95471 100644 --- a/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/impl/DefaultAggregateExecutionSupport.java +++ b/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/impl/DefaultAggregateExecutionSupport.java @@ -42,7 +42,7 @@ */ public class DefaultAggregateExecutionSupport implements AggregateExecutionSupport { - private static Logger logger = LoggerFactory.getLogger(AggregateExecutionSupport.class); + private static final Logger logger = LoggerFactory.getLogger(AggregateExecutionSupport.class); private final AppRegistryService registryService; @@ -80,12 +80,29 @@ public SchemaVersionTarget findSchemaVersionTarget(String taskName, TaskDefiniti return findSchemaVersionTarget(taskName, definition); } + @Override + public SchemaVersionTarget findSchemaVersionTarget(String taskName, String version, TaskDefinitionReader taskDefinitionReader) { + logger.debug("findSchemaVersionTarget:{}:{}", taskName, version); + TaskDefinition definition = taskDefinitionReader.findTaskDefinition(taskName); + return findSchemaVersionTarget(taskName, version, definition); + } + @Override public SchemaVersionTarget findSchemaVersionTarget(String taskName, TaskDefinition taskDefinition) { + return findSchemaVersionTarget(taskName, null, taskDefinition); + } + + @Override + public SchemaVersionTarget findSchemaVersionTarget(String taskName, String version, TaskDefinition taskDefinition) { + logger.debug("findSchemaVersionTarget:{}:{}", taskName, version); String registeredName = taskDefinition != null ? taskDefinition.getRegisteredAppName() : taskName; - AppRegistration registration = findTaskAppRegistration(registeredName); + AppRegistration registration = findTaskAppRegistration(registeredName, version); if (registration == null) { - logger.warn("Cannot find AppRegistration for {}", taskName); + if(StringUtils.hasLength(version)) { + logger.warn("Cannot find AppRegistration for {}:{}", taskName, version); + } else { + logger.warn("Cannot find AppRegistration for {}", taskName); + } return SchemaVersionTarget.defaultTarget(); } final AppRegistration finalRegistration = registration; @@ -101,16 +118,26 @@ public SchemaVersionTarget findSchemaVersionTarget(String taskName, TaskDefiniti throw new IllegalStateException("Multiple SchemaVersionTargets for " + registration.getBootVersion()); } SchemaVersionTarget schemaVersionTarget = versionTargets.get(0); - logger.debug("findSchemaVersionTarget:{}={},{}", taskName, registeredName, schemaVersionTarget); + logger.debug("findSchemaVersionTarget:{}:{}:{}={}", taskName, registeredName, version, schemaVersionTarget); return schemaVersionTarget; } @Override - public AppRegistration findTaskAppRegistration(String registeredAppName) { - AppRegistration registration = registryService.find(registeredAppName, ApplicationType.task); + public AppRegistration findTaskAppRegistration(String registeredName) { + return findTaskAppRegistration(registeredName, null); + } + + @Override + public AppRegistration findTaskAppRegistration(String registeredAppName, String version) { + AppRegistration registration = StringUtils.hasLength(version) ? + registryService.find(registeredAppName, ApplicationType.task, version) : + registryService.find(registeredAppName, ApplicationType.task); if (registration == null) { - registration = registryService.find(registeredAppName, ApplicationType.app); + registration = StringUtils.hasLength(version) ? + registryService.find(registeredAppName, ApplicationType.app, version) : + registryService.find(registeredAppName, ApplicationType.app); } + logger.debug("findTaskAppRegistration:{}:{}={}", registeredAppName, version, registration); return registration; } diff --git a/spring-cloud-dataflow-core/src/main/java/org/springframework/cloud/dataflow/core/DataFlowAppDefinition.java b/spring-cloud-dataflow-core/src/main/java/org/springframework/cloud/dataflow/core/DataFlowAppDefinition.java index dc604a4002..acbeec7d6b 100644 --- a/spring-cloud-dataflow-core/src/main/java/org/springframework/cloud/dataflow/core/DataFlowAppDefinition.java +++ b/spring-cloud-dataflow-core/src/main/java/org/springframework/cloud/dataflow/core/DataFlowAppDefinition.java @@ -109,6 +109,8 @@ public ApplicationType getApplicationType() { return applicationType; } + public AppDefinition getAppDefinition() { return appDefinition; } + @Override public boolean equals(Object obj) { if (this == obj) diff --git a/spring-cloud-dataflow-registry/src/main/java/org/springframework/cloud/dataflow/registry/service/DefaultAppRegistryService.java b/spring-cloud-dataflow-registry/src/main/java/org/springframework/cloud/dataflow/registry/service/DefaultAppRegistryService.java index 292e93fcd2..65845f0270 100644 --- a/spring-cloud-dataflow-registry/src/main/java/org/springframework/cloud/dataflow/registry/service/DefaultAppRegistryService.java +++ b/spring-cloud-dataflow-registry/src/main/java/org/springframework/cloud/dataflow/registry/service/DefaultAppRegistryService.java @@ -106,7 +106,9 @@ public AppRegistration find(String name, ApplicationType type) { @Override public AppRegistration find(String name, ApplicationType type, String version) { - return this.appRegistrationRepository.findAppRegistrationByNameAndTypeAndVersion(name, type, version); + AppRegistration registration = this.appRegistrationRepository.findAppRegistrationByNameAndTypeAndVersion(name, type, version); + logger.debug("find:{}:{}:{}={}", name, type, version, registration); + return registration; } @Override diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/TaskExecutionCreationService.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/TaskExecutionCreationService.java index 2fc649d463..e9fae6a26f 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/TaskExecutionCreationService.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/TaskExecutionCreationService.java @@ -30,5 +30,5 @@ public interface TaskExecutionCreationService { * @param taskName the name to be associated with the {@link TaskExecution} * @return {@link TaskExecution} */ - TaskExecution createTaskExecution(String taskName); + TaskExecution createTaskExecution(String taskName, String version); } diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/TaskExecutionInfoService.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/TaskExecutionInfoService.java index 8367916a27..1392b869e2 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/TaskExecutionInfoService.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/TaskExecutionInfoService.java @@ -61,4 +61,5 @@ TaskExecutionInformation findTaskExecutionInformation(String taskName, List createTaskDeploymentRequests(String taskName, String dslText); Set composedTaskChildNames(String taskName); + Set taskNames(String taskName); } diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultSchedulerService.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultSchedulerService.java index c53e13d76b..089540bad3 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultSchedulerService.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultSchedulerService.java @@ -222,7 +222,12 @@ public void schedule( } String taskAppName = taskDefinition.getRegisteredAppName(); - SchemaVersionTarget schemaVersionTarget = aggregateExecutionSupport.findSchemaVersionTarget(taskAppName, taskDefinitionReader); + String taskLabel = taskDefinition.getAppDefinition().getName(); + if(!StringUtils.hasText(taskLabel)) { + taskLabel = taskAppName; + } + String version = taskDeploymentProperties.get("version." + taskLabel); + SchemaVersionTarget schemaVersionTarget = aggregateExecutionSupport.findSchemaVersionTarget(taskAppName, version, taskDefinition); Assert.notNull(schemaVersionTarget, "schemaVersionTarget not found for " + taskAppName); TaskParser taskParser = new TaskParser(taskDefinition.getName(), taskDefinition.getDslText(), true, true); TaskNode taskNode = taskParser.parse(); @@ -258,8 +263,15 @@ public void schedule( if (names.size() > 1) { appId = names.get(1); } - SchemaVersionTarget appSchemaTarget = this.aggregateExecutionSupport.findSchemaVersionTarget(registeredName, taskDefinitionReader); - logger.debug("ctr:{}:registeredName={}, schemaTarget={}", names, registeredName, appSchemaTarget.getName()); + String appVersion = taskDeploymentProperties.get("version." + taskAppName + "-" + appId + "." + appId); + if(!StringUtils.hasText(appVersion)) { + appVersion = taskDeploymentProperties.get("version." + taskAppName + "-" + appId); + } + if(!StringUtils.hasText(appVersion)) { + appVersion = taskDeploymentProperties.get("version." + appId); + } + SchemaVersionTarget appSchemaTarget = this.aggregateExecutionSupport.findSchemaVersionTarget(registeredName, appVersion, taskDefinitionReader); + logger.debug("ctr:{}:registeredName={}, version={}, schemaTarget={}", names, registeredName, appVersion, appSchemaTarget.getName()); taskDeploymentProperties.put("app.composed-task-runner.composed-task-app-properties.app." + scheduleName + "-" + appId + ".spring.cloud.task.tablePrefix", appSchemaTarget.getTaskPrefix()); taskDeploymentProperties.put("app.composed-task-runner.composed-task-app-properties.app." + appId + ".spring.cloud.task.tablePrefix", @@ -437,9 +449,7 @@ private Launcher getTaskLauncher(String platformName) { private List getLaunchers() { List launchers = new ArrayList<>(); for (TaskPlatform taskPlatform : this.taskPlatforms) { - for (Launcher launcher : taskPlatform.getLaunchers()) { - launchers.add(launcher); - } + launchers.addAll(taskPlatform.getLaunchers()); } return launchers; } diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskExecutionInfoService.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskExecutionInfoService.java index c5f7629a5c..ca504b3d89 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskExecutionInfoService.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskExecutionInfoService.java @@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory; import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties; +import org.springframework.cloud.dataflow.aggregate.task.AggregateTaskExplorer; import org.springframework.cloud.dataflow.core.AllPlatformsTaskExecutionInformation; import org.springframework.cloud.dataflow.core.AppRegistration; import org.springframework.cloud.dataflow.core.ApplicationType; @@ -38,7 +39,6 @@ import org.springframework.cloud.dataflow.core.dsl.TaskNode; import org.springframework.cloud.dataflow.core.dsl.TaskParser; import org.springframework.cloud.dataflow.registry.service.AppRegistryService; -import org.springframework.cloud.dataflow.aggregate.task.AggregateTaskExplorer; import org.springframework.cloud.dataflow.server.job.LauncherRepository; import org.springframework.cloud.dataflow.server.repository.NoSuchTaskDefinitionException; import org.springframework.cloud.dataflow.server.repository.TaskDefinitionRepository; @@ -64,6 +64,7 @@ * @author Michael Wirth * @author David Turanski * @author Daniel Serleg + * @author Corneil du Plessis */ public class DefaultTaskExecutionInfoService implements TaskExecutionInfoService { private final static Logger logger = LoggerFactory.getLogger(DefaultTaskExecutionInfoService.class); @@ -103,21 +104,23 @@ public class DefaultTaskExecutionInfoService implements TaskExecutionInfoService * @param taskPlatforms the task platforms */ @Deprecated - public DefaultTaskExecutionInfoService(DataSourceProperties dataSourceProperties, - AppRegistryService appRegistryService, - AggregateTaskExplorer taskExplorer, - TaskDefinitionRepository taskDefinitionRepository, - TaskConfigurationProperties taskConfigurationProperties, - LauncherRepository launcherRepository, - List taskPlatforms) { + public DefaultTaskExecutionInfoService( + DataSourceProperties dataSourceProperties, + AppRegistryService appRegistryService, + AggregateTaskExplorer taskExplorer, + TaskDefinitionRepository taskDefinitionRepository, + TaskConfigurationProperties taskConfigurationProperties, + LauncherRepository launcherRepository, + List taskPlatforms + ) { this(dataSourceProperties, - appRegistryService, - taskExplorer, - taskDefinitionRepository, - taskConfigurationProperties, - launcherRepository, - taskPlatforms, - null); + appRegistryService, + taskExplorer, + taskDefinitionRepository, + taskConfigurationProperties, + launcherRepository, + taskPlatforms, + null); } /** @@ -133,14 +136,16 @@ public DefaultTaskExecutionInfoService(DataSourceProperties dataSourceProperties * @param taskPlatforms the task platforms * @param composedTaskRunnerConfigurationProperties the properties used to define the behavior of CTR */ - public DefaultTaskExecutionInfoService(DataSourceProperties dataSourceProperties, - AppRegistryService appRegistryService, - AggregateTaskExplorer taskExplorer, - TaskDefinitionRepository taskDefinitionRepository, - TaskConfigurationProperties taskConfigurationProperties, - LauncherRepository launcherRepository, - List taskPlatforms, - ComposedTaskRunnerConfigurationProperties composedTaskRunnerConfigurationProperties) { + public DefaultTaskExecutionInfoService( + DataSourceProperties dataSourceProperties, + AppRegistryService appRegistryService, + AggregateTaskExplorer taskExplorer, + TaskDefinitionRepository taskDefinitionRepository, + TaskConfigurationProperties taskConfigurationProperties, + LauncherRepository launcherRepository, + List taskPlatforms, + ComposedTaskRunnerConfigurationProperties composedTaskRunnerConfigurationProperties + ) { Assert.notNull(dataSourceProperties, "DataSourceProperties must not be null"); Assert.notNull(appRegistryService, "AppRegistryService must not be null"); Assert.notNull(taskDefinitionRepository, "TaskDefinitionRepository must not be null"); @@ -160,8 +165,10 @@ public DefaultTaskExecutionInfoService(DataSourceProperties dataSourceProperties } @Override - public TaskExecutionInformation findTaskExecutionInformation(String taskName, - Map taskDeploymentProperties, boolean addDatabaseCredentials, Map previousTaskDeploymentProperties) { + public TaskExecutionInformation findTaskExecutionInformation( + String taskName, + Map taskDeploymentProperties, boolean addDatabaseCredentials, Map previousTaskDeploymentProperties + ) { Assert.hasText(taskName, "The provided taskName must not be null or empty."); Assert.notNull(taskDeploymentProperties, "The provided runtimeProperties must not be null."); @@ -169,11 +176,11 @@ public TaskExecutionInformation findTaskExecutionInformation(String taskName, taskExecutionInformation.setTaskDeploymentProperties(taskDeploymentProperties); TaskDefinition originalTaskDefinition = taskDefinitionRepository.findById(taskName) - .orElseThrow(() -> new NoSuchTaskDefinitionException(taskName)); + .orElseThrow(() -> new NoSuchTaskDefinitionException(taskName)); //TODO: This normally called by JPA automatically but `AutoCreateTaskDefinitionTests` fails without this. originalTaskDefinition.initialize(); TaskParser taskParser = new TaskParser(originalTaskDefinition.getName(), originalTaskDefinition.getDslText(), - true, true); + true, true); TaskNode taskNode = taskParser.parse(); // if composed task definition replace definition with one composed task // runner and executable graph. @@ -181,24 +188,24 @@ public TaskExecutionInformation findTaskExecutionInformation(String taskName, AppRegistration appRegistration; if (taskNode.isComposed()) { taskDefinitionToUse = new TaskDefinition(originalTaskDefinition.getName(), - TaskServiceUtils.createComposedTaskDefinition(taskNode.toExecutableDSL())); + TaskServiceUtils.createComposedTaskDefinition(taskNode.toExecutableDSL())); taskExecutionInformation.setTaskDeploymentProperties( - TaskServiceUtils.establishComposedTaskProperties(taskDeploymentProperties, - taskNode)); + TaskServiceUtils.establishComposedTaskProperties(taskDeploymentProperties, + taskNode)); taskDefinitionToUse = TaskServiceUtils.updateTaskProperties(taskDefinitionToUse, - dataSourceProperties, addDatabaseCredentials); + dataSourceProperties, addDatabaseCredentials); try { appRegistration = new AppRegistration(ComposedTaskRunnerConfigurationProperties.COMPOSED_TASK_RUNNER_NAME, - ApplicationType.task, - new URI(TaskServiceUtils.getComposedTaskLauncherUri(this.taskConfigurationProperties, - this.composedTaskRunnerConfigurationProperties))); + ApplicationType.task, + new URI(TaskServiceUtils.getComposedTaskLauncherUri(this.taskConfigurationProperties, + this.composedTaskRunnerConfigurationProperties))); } catch (URISyntaxException e) { throw new IllegalStateException("Invalid Compose Task Runner Resource", e); } } else { taskDefinitionToUse = TaskServiceUtils.updateTaskProperties(originalTaskDefinition, - dataSourceProperties, addDatabaseCredentials); + dataSourceProperties, addDatabaseCredentials); String label = null; if (taskNode.getTaskApp() != null) { @@ -217,10 +224,10 @@ public TaskExecutionInformation findTaskExecutionInformation(String taskName, // if we have version, use that or rely on default version set if (version == null) { appRegistration = appRegistryService.find(taskDefinitionToUse.getRegisteredAppName(), - ApplicationType.task); + ApplicationType.task); } else { appRegistration = appRegistryService.find(taskDefinitionToUse.getRegisteredAppName(), - ApplicationType.task, version); + ApplicationType.task, version); } } @@ -241,42 +248,56 @@ public Set composedTaskChildNames(String taskName) { Set result = new HashSet<>(); TaskNode taskNode = taskParser.parse(); if (taskNode.isComposed()) { - for (TaskApp subTask : taskNode.getTaskApps()) { - logger.debug("subTask:{}:{}:{}", subTask.getName(), subTask.getTaskName(), subTask); - TaskDefinition subTaskDefinition = taskDefinitionRepository.findByTaskName(subTask.getName()); - if (subTaskDefinition != null) { - result.add(subTaskDefinition.getRegisteredAppName() + "," + subTask.getLabel()); - TaskParser subTaskParser = new TaskParser(subTaskDefinition.getTaskName(), subTaskDefinition.getDslText(), true, true); - TaskNode subTaskNode = subTaskParser.parse(); - if (subTaskNode != null && subTaskNode.getTaskApp() != null) { - for (TaskApp subSubTask : subTaskNode.getTaskApps()) { - logger.debug("subSubTask:{}:{}:{}", subSubTask.getName(), subSubTask.getTaskName(), subSubTask); - TaskDefinition subSubTaskDefinition = taskDefinitionRepository.findByTaskName(subSubTask.getName()); - if (subSubTaskDefinition != null) { - if(!subTask.getLabel().contains("$")) { - result.add(subSubTaskDefinition.getRegisteredAppName() + "," + subSubTask.getLabel()); - } else { - result.add(subSubTaskDefinition.getRegisteredAppName()); - } + extractNames(taskNode, result); + } + return result; + } + + @Override + public Set taskNames(String taskName) { + TaskDefinition taskDefinition = taskDefinitionRepository.findByTaskName(taskName); + TaskParser taskParser = new TaskParser(taskDefinition.getTaskName(), taskDefinition.getDslText(), true, true); + Set result = new HashSet<>(); + TaskNode taskNode = taskParser.parse(); + extractNames(taskNode, result); + return result; + } + + private void extractNames(TaskNode taskNode, Set result) { + for (TaskApp subTask : taskNode.getTaskApps()) { + logger.debug("subTask:{}:{}:{}:{}", subTask.getName(), subTask.getTaskName(), subTask.getLabel(), subTask); + TaskDefinition subTaskDefinition = taskDefinitionRepository.findByTaskName(subTask.getName()); + if (subTaskDefinition != null) { + result.add(subTaskDefinition.getRegisteredAppName() + "," + subTask.getLabel()); + TaskParser subTaskParser = new TaskParser(subTaskDefinition.getTaskName(), subTaskDefinition.getDslText(), true, true); + TaskNode subTaskNode = subTaskParser.parse(); + if (subTaskNode != null && subTaskNode.getTaskApp() != null) { + for (TaskApp subSubTask : subTaskNode.getTaskApps()) { + logger.debug("subSubTask:{}:{}:{}:{}", subSubTask.getName(), subSubTask.getTaskName(), subSubTask.getLabel(), subSubTask); + TaskDefinition subSubTaskDefinition = taskDefinitionRepository.findByTaskName(subSubTask.getName()); + if (subSubTaskDefinition != null) { + if (!subTask.getLabel().contains("$")) { + result.add(subSubTaskDefinition.getRegisteredAppName() + "," + subSubTask.getLabel()); + } else { + result.add(subSubTaskDefinition.getRegisteredAppName()); } } } + } + } else { + if ((subTask.getLabel() == null || subTask.getLabel().equals(subTask.getName())) && !subTask.getName().contains("$")) { + result.add(subTask.getName()); } else { - if((subTask.getLabel() == null || subTask.getLabel().equals(subTask.getName())) && !subTask.getName().contains("$")) { + if (!subTask.getName().contains("$") && !subTask.getLabel().contains("$")) { + result.add(subTask.getName() + "," + subTask.getLabel()); + } else if (!subTask.getName().contains("$")) { result.add(subTask.getName()); - } else { - if(!subTask.getName().contains("$") && !subTask.getLabel().contains("$")) { - result.add(subTask.getName() + "," + subTask.getLabel()); - } else if(!subTask.getName().contains("$")) { - result.add(subTask.getName()); - } else if(!subTask.getTaskName().contains("$")) { - result.add(subTask.getTaskName()); - } + } else if (!subTask.getTaskName().contains("$")) { + result.add(subTask.getTaskName()); } } } } - return result; } @Override @@ -293,7 +314,7 @@ public List createTaskDeploymentRequests(String taskName, TaskNode subTaskNode = subTaskParser.parse(); String subTaskName = subTaskNode.getTaskApp().getName(); AppRegistration appRegistration = appRegistryService.find(subTaskName, - ApplicationType.task); + ApplicationType.task); Assert.notNull(appRegistration, "Unknown task app: " + subTask.getName()); Resource appResource = appRegistryService.getAppResource(appRegistration); @@ -302,7 +323,7 @@ public List createTaskDeploymentRequests(String taskName, AppDefinition appDefinition = new AppDefinition(subTask.getName(), subTaskNode.getTaskApp().getArgumentsAsMap()); AppDeploymentRequest appDeploymentRequest = new AppDeploymentRequest(appDefinition, - appResource, null, null); + appResource, null, null); appDeploymentRequests.add(appDeploymentRequest); } } diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskExecutionRepositoryService.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskExecutionRepositoryService.java index aadd817d39..79c449cd63 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskExecutionRepositoryService.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskExecutionRepositoryService.java @@ -54,8 +54,8 @@ public DefaultTaskExecutionRepositoryService( @Override @Transactional(propagation = Propagation.REQUIRES_NEW) - public TaskExecution createTaskExecution(String taskName) { - SchemaVersionTarget schemaVersionTarget = this.aggregateExecutionSupport.findSchemaVersionTarget(taskName, taskDefinitionReader); + public TaskExecution createTaskExecution(String taskName, String version) { + SchemaVersionTarget schemaVersionTarget = this.aggregateExecutionSupport.findSchemaVersionTarget(taskName, version, taskDefinitionReader); TaskRepository taskRepository = this.taskRepositoryContainer.get(schemaVersionTarget.getName()); return taskRepository.createTaskExecution(taskName); } diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskExecutionService.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskExecutionService.java index 25576fd0a3..7d10cff4c0 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskExecutionService.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskExecutionService.java @@ -379,7 +379,8 @@ public LaunchResponse executeTask(String taskName, Map taskDeplo TaskDefinition taskDefinition = taskDefinitionRepository.findByTaskName(taskName); String taskAppName = taskDefinition != null ? taskDefinition.getRegisteredAppName() : taskName; - SchemaVersionTarget schemaVersionTarget = aggregateExecutionSupport.findSchemaVersionTarget(taskAppName, taskDefinitionReader); + + SchemaVersionTarget schemaVersionTarget = aggregateExecutionSupport.findSchemaVersionTarget(taskAppName, taskDefinition); Assert.notNull(schemaVersionTarget, "schemaVersionTarget not found for " + taskAppName); DataflowTaskExecutionMetadataDao dataflowTaskExecutionMetadataDao = dataflowTaskExecutionMetadataDaoContainer.get(schemaVersionTarget.getName()); @@ -394,23 +395,32 @@ public LaunchResponse executeTask(String taskName, Map taskDeplo TaskExecutionInformation taskExecutionInformation = findOrCreateTaskExecutionInformation(taskName, deploymentProperties, launcher.getType(), previousTaskDeploymentProperties); - - // pre process command-line args - // moving things like app.