diff --git a/pom.xml b/pom.xml
index 67a68f459a..6591f0e050 100644
--- a/pom.xml
+++ b/pom.xml
@@ -76,8 +76,7 @@
spring-cloud-dataflow-server
spring-cloud-dataflow-tasklauncher
spring-cloud-dataflow-single-step-batch-job
-
-
+ spring-cloud-dataflow-composed-task-runner
spring-cloud-dataflow-test
spring-cloud-dataflow-dependencies
spring-cloud-dataflow-classic-docs
diff --git a/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedBatchConfigurer.java b/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedBatchConfigurer.java
deleted file mode 100644
index 12f2118cf0..0000000000
--- a/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedBatchConfigurer.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Copyright 2017-2021 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.springframework.cloud.dataflow.composedtaskrunner;
-
-import javax.sql.DataSource;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.springframework.batch.core.repository.JobRepository;
-import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
-
-import org.springframework.boot.autoconfigure.batch.BatchProperties;
-import org.springframework.boot.autoconfigure.transaction.TransactionManagerCustomizers;
-import org.springframework.cloud.dataflow.composedtaskrunner.properties.ComposedTaskProperties;
-import org.springframework.cloud.dataflow.composedtaskrunner.support.ComposedTaskException;
-import org.springframework.cloud.dataflow.core.database.support.MultiSchemaIncrementerFactory;
-import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer;
-
-/**
- * A BatchConfigurer for CTR that will establish the transaction isolation level to ISOLATION_REPEATABLE_READ by default.
- *
- * @author Glenn Renfro
- */
-public class ComposedBatchConfigurer extends BasicBatchConfigurer {
-
- private static final Logger logger = LoggerFactory.getLogger(ComposedBatchConfigurer.class);
-
- private DataSource incrementerDataSource;
-
- private Map incrementerMap;
-
- private ComposedTaskProperties composedTaskProperties;
-
- /**
- * Create a new {@link BasicBatchConfigurer} instance.
- *
- * @param properties the batch properties
- * @param dataSource the underlying data source
- * @param transactionManagerCustomizers transaction manager customizers (or
- * {@code null})
- * @param composedTaskProperties composed task properties
- */
- protected ComposedBatchConfigurer(BatchProperties properties, DataSource dataSource,
- TransactionManagerCustomizers transactionManagerCustomizers, ComposedTaskProperties composedTaskProperties) {
- super(properties, dataSource, transactionManagerCustomizers);
- this.incrementerDataSource = dataSource;
- incrementerMap = new HashMap<>();
- this.composedTaskProperties = composedTaskProperties;
- }
-
- protected JobRepository createJobRepository() {
- return getJobRepository();
- }
-
- @Override
- public JobRepository getJobRepository() {
- JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
- MultiSchemaIncrementerFactory incrementerFactory = new MultiSchemaIncrementerFactory(this.incrementerDataSource);
- factory.setIncrementerFactory(incrementerFactory);
- factory.setDataSource(this.incrementerDataSource);
- factory.setTransactionManager(this.getTransactionManager());
- factory.setIsolationLevelForCreate(this.composedTaskProperties.getTransactionIsolationLevel());
- try {
- factory.afterPropertiesSet();
- return factory.getObject();
- }
- catch (Exception exception) {
- throw new ComposedTaskException(exception.getMessage());
- }
- }
-}
diff --git a/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedRunnerJobFactory.java b/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedRunnerJobFactory.java
index e19f537255..7013edd36b 100644
--- a/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedRunnerJobFactory.java
+++ b/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedRunnerJobFactory.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2017-2021 the original author or authors.
+ * Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,11 +27,12 @@
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersIncrementer;
import org.springframework.batch.core.Step;
-import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.builder.FlowJobBuilder;
+import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
+import org.springframework.batch.core.repository.JobRepository;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.dataflow.composedtaskrunner.properties.ComposedTaskProperties;
@@ -65,7 +66,7 @@ public class ComposedRunnerJobFactory implements FactoryBean {
private TaskExecutor taskExecutor;
@Autowired
- private JobBuilderFactory jobBuilderFactory;
+ private JobRepository jobRepository;
@Autowired
private TaskNameResolver taskNameResolver;
@@ -105,9 +106,8 @@ public Job getObject() throws Exception {
taskParser.parse().accept(composedRunnerVisitor);
this.visitorDeque = composedRunnerVisitor.getFlow();
-
- FlowJobBuilder builder = this.jobBuilderFactory
- .get(this.taskNameResolver.getTaskName())
+ JobBuilder jobBuilder = new JobBuilder(this.taskNameResolver.getTaskName(), jobRepository);
+ FlowJobBuilder builder = jobBuilder
.start(this.flowBuilder
.start(createFlow())
.end())
diff --git a/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskRunnerConfiguration.java b/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskRunnerConfiguration.java
index 3b6c526600..e13f4bd945 100644
--- a/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskRunnerConfiguration.java
+++ b/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskRunnerConfiguration.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2017-2023 the original author or authors.
+ * Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,34 +17,21 @@
package org.springframework.cloud.dataflow.composedtaskrunner;
import javax.sql.DataSource;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepExecutionListener;
-import org.springframework.batch.core.configuration.annotation.BatchConfigurer;
-import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
-import org.springframework.boot.autoconfigure.batch.BatchProperties;
-import org.springframework.boot.autoconfigure.transaction.TransactionManagerCustomizers;
+import org.springframework.batch.core.repository.JobRepository;
+import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.dataflow.composedtaskrunner.properties.ComposedTaskProperties;
-import org.springframework.cloud.dataflow.core.database.support.MultiSchemaTaskExecutionDaoFactoryBean;
-import org.springframework.cloud.dataflow.core.dsl.TaskParser;
import org.springframework.cloud.task.configuration.EnableTask;
import org.springframework.cloud.task.listener.TaskExecutionListener;
import org.springframework.cloud.task.repository.TaskExplorer;
-import org.springframework.cloud.task.repository.support.SimpleTaskExplorer;
-import org.springframework.cloud.task.repository.support.TaskExecutionDaoFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
-import org.springframework.core.env.Environment;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.util.StringUtils;
+import org.springframework.transaction.PlatformTransactionManager;
/**
* Configures the Job that will execute the Composed Task Execution.
@@ -52,13 +39,11 @@
* @author Glenn Renfro
* @author Corneil du Plessis
*/
-@EnableBatchProcessing
@EnableTask
@EnableConfigurationProperties(ComposedTaskProperties.class)
@Configuration
@Import(org.springframework.cloud.dataflow.composedtaskrunner.StepBeanDefinitionRegistrar.class)
public class ComposedTaskRunnerConfiguration {
- private final static Logger logger = LoggerFactory.getLogger(ComposedTaskRunnerConfiguration.class);
@Bean
public TaskExecutionListener taskExecutionListener() {
@@ -66,54 +51,8 @@ public TaskExecutionListener taskExecutionListener() {
}
@Bean
- public StepExecutionListener composedTaskStepExecutionListener(TaskExplorerContainer taskExplorerContainer) {
- return new org.springframework.cloud.dataflow.composedtaskrunner.ComposedTaskStepExecutionListener(taskExplorerContainer);
- }
-
- @Bean
- TaskExplorerContainer taskExplorerContainer(TaskExplorer taskExplorer, DataSource dataSource, ComposedTaskProperties properties, Environment env) {
- Map explorers = new HashMap<>();
- String ctrName = env.getProperty("spring.cloud.task.name");
- if (!StringUtils.hasText(ctrName)) {
- throw new IllegalStateException("spring.cloud.task.name property must have a value.");
- }
- TaskParser parser = new TaskParser("ctr", properties.getGraph(), false, true);
- StepBeanDefinitionRegistrar.TaskAppsMapCollector collector = new StepBeanDefinitionRegistrar.TaskAppsMapCollector();
- parser.parse().accept(collector);
- Set taskNames = collector.getTaskApps().keySet();
- logger.debug("taskExplorerContainer:taskNames:{}", taskNames);
- for (String taskName : taskNames) {
- addTaskExplorer(dataSource, properties, env, explorers, taskName);
- String appName = taskName.replace(ctrName + "-", "");
- addTaskExplorer(dataSource, properties, env, explorers, appName);
- if(taskName.length() > ctrName.length()) {
- String shortTaskName = taskName.substring(ctrName.length() + 1);
- addTaskExplorer(dataSource, properties, env, explorers, shortTaskName);
- }
- }
- return new TaskExplorerContainer(explorers, taskExplorer);
- }
-
- private static void addTaskExplorer(
- DataSource dataSource,
- ComposedTaskProperties properties,
- Environment env,
- Map explorers,
- String taskName
- ) {
- logger.debug("addTaskExplorer:{}", taskName);
- String propertyName = String.format("app.%s.spring.cloud.task.tablePrefix", taskName);
- String prefix = properties.getComposedTaskAppProperties().get(propertyName);
- if (prefix == null) {
- prefix = env.getProperty(propertyName);
- }
- if (prefix != null) {
- TaskExecutionDaoFactoryBean factoryBean = new MultiSchemaTaskExecutionDaoFactoryBean(dataSource, prefix);
- logger.debug("taskExplorerContainer:adding:{}:{}", taskName, prefix);
- explorers.put(taskName, new SimpleTaskExplorer(factoryBean));
- } else {
- logger.warn("Cannot find {} in {} ", propertyName, properties.getComposedTaskAppProperties());
- }
+ public StepExecutionListener composedTaskStepExecutionListener(TaskExplorer taskExplorer) {
+ return new org.springframework.cloud.dataflow.composedtaskrunner.ComposedTaskStepExecutionListener(taskExplorer);
}
@Bean
@@ -128,25 +67,21 @@ public TaskExecutor taskExecutor(ComposedTaskProperties properties) {
taskExecutor.setMaxPoolSize(properties.getSplitThreadMaxPoolSize());
taskExecutor.setKeepAliveSeconds(properties.getSplitThreadKeepAliveSeconds());
taskExecutor.setAllowCoreThreadTimeOut(
- properties.isSplitThreadAllowCoreThreadTimeout());
+ properties.isSplitThreadAllowCoreThreadTimeout());
taskExecutor.setQueueCapacity(properties.getSplitThreadQueueCapacity());
taskExecutor.setWaitForTasksToCompleteOnShutdown(
- properties.isSplitThreadWaitForTasksToCompleteOnShutdown());
+ properties.isSplitThreadWaitForTasksToCompleteOnShutdown());
return taskExecutor;
}
+ /**
+ * Provides the {@link JobRepository} that is configured to be used by the composed task runner.
+ */
@Bean
- public BatchConfigurer getComposedBatchConfigurer(
- BatchProperties properties,
- DataSource dataSource,
- TransactionManagerCustomizers transactionManagerCustomizers,
- ComposedTaskProperties composedTaskProperties
- ) {
- return new ComposedBatchConfigurer(
- properties,
- dataSource,
- transactionManagerCustomizers,
- composedTaskProperties
- );
+ public BeanPostProcessor jobRepositoryBeanPostProcessor(PlatformTransactionManager transactionManager,
+ DataSource incrementerDataSource,
+ ComposedTaskProperties composedTaskProperties) {
+ return new JobRepositoryBeanPostProcessor(transactionManager, incrementerDataSource, composedTaskProperties);
}
+
}
diff --git a/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskRunnerStepFactory.java b/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskRunnerStepFactory.java
index 789cc83ed4..8a361b8cff 100644
--- a/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskRunnerStepFactory.java
+++ b/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskRunnerStepFactory.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2017-2021 the original author or authors.
+ * Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -31,18 +31,21 @@
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecutionListener;
-import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
+import org.springframework.batch.core.repository.JobRepository;
+import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.dataflow.composedtaskrunner.properties.ComposedTaskProperties;
import org.springframework.cloud.dataflow.core.Base64Utils;
import org.springframework.cloud.dataflow.rest.support.jackson.Jackson2DataflowModule;
import org.springframework.cloud.task.configuration.TaskProperties;
+import org.springframework.cloud.task.repository.TaskExplorer;
import org.springframework.core.env.Environment;
import org.springframework.hateoas.mediatype.hal.Jackson2HalModule;
import org.springframework.security.oauth2.client.endpoint.OAuth2AccessTokenResponseClient;
import org.springframework.security.oauth2.client.endpoint.OAuth2ClientCredentialsGrantRequest;
import org.springframework.security.oauth2.client.registration.ClientRegistrationRepository;
+import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
import org.springframework.transaction.interceptor.TransactionAttribute;
@@ -74,13 +77,16 @@ public class ComposedTaskRunnerStepFactory implements FactoryBean {
private List arguments = new ArrayList<>();
@Autowired
- private StepBuilderFactory steps;
+ private JobRepository jobRepository;
+
+ @Autowired
+ private PlatformTransactionManager transactionManager;
@Autowired
private StepExecutionListener composedTaskStepExecutionListener;
@Autowired
- private TaskExplorerContainer taskExplorerContainer;
+ private TaskExplorer taskExplorer;
@Autowired
private TaskProperties taskProperties;
@@ -133,7 +139,7 @@ public Step getObject() {
TaskLauncherTasklet taskLauncherTasklet = new TaskLauncherTasklet(
this.clientRegistrations,
this.clientCredentialsTokenResponseClient,
- this.taskExplorerContainer.get(this.taskNameId),
+ this.taskExplorer,
this.composedTaskPropertiesFromEnv,
this.taskName,
taskProperties,
@@ -168,9 +174,9 @@ public Step getObject() {
taskLauncherTasklet.setProperties(propertiesToUse);
logger.debug("Properties to use {}", propertiesToUse);
-
- return this.steps.get(this.taskName)
- .tasklet(taskLauncherTasklet)
+ StepBuilder stepBuilder = new StepBuilder(this.taskName, this.jobRepository);
+ return stepBuilder
+ .tasklet(taskLauncherTasklet, this.transactionManager)
.transactionAttribute(getTransactionAttribute())
.listener(this.composedTaskStepExecutionListener)
.build();
diff --git a/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskStepExecutionListener.java b/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskStepExecutionListener.java
index d494195569..da7fa5a541 100644
--- a/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskStepExecutionListener.java
+++ b/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskStepExecutionListener.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2017-2023 the original author or authors.
+ * Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -38,12 +38,11 @@
public class ComposedTaskStepExecutionListener extends StepExecutionListenerSupport {
private final static Logger logger = LoggerFactory.getLogger(ComposedTaskStepExecutionListener.class);
- private final TaskExplorerContainer taskExplorerContainer;
+ private final TaskExplorer taskExplorer;
- public ComposedTaskStepExecutionListener(TaskExplorerContainer taskExplorerContainer) {
- Assert.notNull(taskExplorerContainer, "taskExplorerContainer must not be null.");
- this.taskExplorerContainer = taskExplorerContainer;
- logger.info("ComposedTaskStepExecutionListener supporting {}", taskExplorerContainer.getKeys());
+ public ComposedTaskStepExecutionListener(TaskExplorer taskExplorer) {
+ Assert.notNull(taskExplorer, "taskExplorer must not be null.");
+ this.taskExplorer = taskExplorer;
}
/**
@@ -66,18 +65,6 @@ public ExitStatus afterStep(StepExecution stepExecution) {
Long executionId = (Long) stepExecution.getExecutionContext().get("task-execution-id");
Assert.notNull(executionId, "TaskLauncherTasklet for job " + stepExecution.getJobExecutionId() +
" did not return a task-execution-id. Check to see if task exists.");
- String schemaTarget = stepExecution.getExecutionContext().getString("schema-target");
- String taskName = stepExecution.getExecutionContext().getString("task-name");
- Assert.notNull(taskName, "TaskLauncherTasklet for job " + stepExecution.getJobExecutionId() +
- " did not return a task-name. Check to see if task exists.");
- String explorerName = taskName;
- if (!this.taskExplorerContainer.getKeys().contains(taskName)) {
- Assert.notNull(schemaTarget, "TaskLauncherTasklet for job " + stepExecution.getJobExecutionId() +
- " did not return a schema-target. Check to see if task exists.");
- explorerName = schemaTarget;
- }
- logger.info("AfterStep for {}:{}:{}:{}:{}", stepExecution.getStepName(), stepExecution.getJobExecutionId(), taskName, executionId, schemaTarget);
- TaskExplorer taskExplorer = this.taskExplorerContainer.get(explorerName);
TaskExecution resultExecution = taskExplorer.getTaskExecution(executionId);
if (!stepExecution.getExecutionContext().containsKey(TaskLauncherTasklet.IGNORE_EXIT_MESSAGE) &&
StringUtils.hasText(resultExecution.getExitMessage())) {
@@ -85,16 +72,7 @@ public ExitStatus afterStep(StepExecution stepExecution) {
} else if (resultExecution.getExitCode() != 0) {
result = ExitStatus.FAILED;
}
- logger.info("AfterStep processing complete for stepExecution {} with taskExecution {}:{}:{}:{}", stepExecution.getStepName(), stepExecution.getJobExecutionId(), taskName, executionId, schemaTarget);
+ logger.info("AfterStep processing complete for stepExecution {} with taskExecution {}:{}", stepExecution.getStepName(), stepExecution.getJobExecutionId());
return result;
}
-
- @Override
- public void beforeStep(StepExecution stepExecution) {
- logger.info("beforeStep:{}:{}>>>>", stepExecution.getStepName(), stepExecution.getJobExecutionId());
- super.beforeStep(stepExecution);
- logger.debug("beforeStep:{}", stepExecution.getExecutionContext());
- logger.info("beforeStep:{}:{}<<<", stepExecution.getStepName(), stepExecution.getJobExecutionId());
-
- }
}
diff --git a/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/JobRepositoryBeanPostProcessor.java b/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/JobRepositoryBeanPostProcessor.java
new file mode 100644
index 0000000000..99c544cf2b
--- /dev/null
+++ b/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/JobRepositoryBeanPostProcessor.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2024 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.dataflow.composedtaskrunner;
+
+import javax.sql.DataSource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.repository.JobRepository;
+import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.config.BeanPostProcessor;
+import org.springframework.cloud.dataflow.composedtaskrunner.properties.ComposedTaskProperties;
+import org.springframework.cloud.dataflow.composedtaskrunner.support.ComposedTaskException;
+import org.springframework.cloud.dataflow.core.database.support.MultiSchemaIncrementerFactory;
+import org.springframework.core.Ordered;
+import org.springframework.transaction.PlatformTransactionManager;
+
+/**
+ * CTR requires that the JobRepository that it uses to have its own {@link MultiSchemaIncrementerFactory}.
+ * As of Batch 5.x DefaultBatchConfiguration is now used to override default beans, however this disables
+ * BatchAutoConfiguration. To work around this we use a bean post processor to create our own {@link JobRepository}.
+ *
+ * @author Glenn Renfro
+ */
+public class JobRepositoryBeanPostProcessor implements BeanPostProcessor, Ordered {
+ private static final Logger logger = LoggerFactory.getLogger(JobRepositoryBeanPostProcessor.class);
+
+ private PlatformTransactionManager transactionManager;
+ private DataSource incrementerDataSource;
+ private ComposedTaskProperties composedTaskProperties;
+
+ public JobRepositoryBeanPostProcessor(PlatformTransactionManager transactionManager, DataSource incrementerDataSource,
+ ComposedTaskProperties composedTaskProperties) {
+ this.transactionManager = transactionManager;
+ this.incrementerDataSource = incrementerDataSource;
+ this.composedTaskProperties = composedTaskProperties;
+ }
+
+ @Override
+ public int getOrder() {
+ return Ordered.HIGHEST_PRECEDENCE;
+ }
+
+ @Override
+ public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
+ if (beanName.equals("jobRepository")) {
+ logger.debug("Replacing BatchAutoConfiguration's jobRepository Bean with one provided by composed task runner.");
+ bean = jobRepository(transactionManager, incrementerDataSource, composedTaskProperties);
+ }
+ return bean;
+ }
+
+ private JobRepository jobRepository(PlatformTransactionManager transactionManager, DataSource incrementerDataSource,
+ ComposedTaskProperties composedTaskProperties) {
+ JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
+ MultiSchemaIncrementerFactory incrementerFactory = new MultiSchemaIncrementerFactory(incrementerDataSource);
+ factory.setIncrementerFactory(incrementerFactory);
+ factory.setDataSource(incrementerDataSource);
+ factory.setTransactionManager(transactionManager);
+ factory.setIsolationLevelForCreate(composedTaskProperties.getTransactionIsolationLevel());
+ try {
+ factory.afterPropertiesSet();
+ return factory.getObject();
+ }
+ catch (Exception exception) {
+ throw new ComposedTaskException(exception.getMessage());
+ }
+ }
+}
diff --git a/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/TaskExplorerContainer.java b/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/TaskExplorerContainer.java
deleted file mode 100644
index 4cd95b1727..0000000000
--- a/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/TaskExplorerContainer.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright 2023 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.springframework.cloud.dataflow.composedtaskrunner;
-
-import java.util.Map;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.springframework.cloud.dataflow.schema.SchemaVersionTarget;
-import org.springframework.cloud.task.repository.TaskExplorer;
-
-/**
- * A container for the TaskExplorers for each Task by name.
- * @author Corneil du Plessis
- */
-public class TaskExplorerContainer {
- private final static Logger logger = LoggerFactory.getLogger(TaskExplorerContainer.class);
-
- private final Map taskExplorers;
-
- private final TaskExplorer defaultTaskExplorer;
-
- public TaskExplorerContainer(Map taskExplorers, TaskExplorer defaultTaskExplorer) {
- this.taskExplorers = taskExplorers;
- this.defaultTaskExplorer = defaultTaskExplorer;
-
- }
-
- public TaskExplorer get(String name) {
- TaskExplorer result = taskExplorers.get(name);
- if (result == null) {
- result = taskExplorers.get(SchemaVersionTarget.defaultTarget().getName());
- }
- if(result == null) {
- logger.warn("Cannot find TaskExplorer for {}. Using default", name);
- result = defaultTaskExplorer;
- }
- return result;
- }
- public Set getKeys() {
- return taskExplorers.keySet();
- }
-}
diff --git a/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/TaskLauncherTasklet.java b/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/TaskLauncherTasklet.java
index 7a8696b511..91fa2480fe 100644
--- a/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/TaskLauncherTasklet.java
+++ b/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/TaskLauncherTasklet.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2017-2023 the original author or authors.
+ * Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -46,7 +46,6 @@
import org.springframework.cloud.dataflow.rest.resource.LaunchResponseResource;
import org.springframework.cloud.dataflow.rest.support.jackson.Jackson2DataflowModule;
import org.springframework.cloud.dataflow.rest.util.HttpClientConfigurer;
-import org.springframework.cloud.dataflow.schema.SchemaVersionTarget;
import org.springframework.cloud.task.configuration.TaskProperties;
import org.springframework.cloud.task.repository.TaskExecution;
import org.springframework.cloud.task.repository.TaskExplorer;
@@ -89,8 +88,6 @@ public class TaskLauncherTasklet implements Tasklet {
private Long executionId;
- private final String ctrSchemaTarget;
-
private long startTimeout;
private long timeout;
@@ -134,7 +131,6 @@ public TaskLauncherTasklet(
this.taskProperties = taskProperties;
this.clientRegistrations = clientRegistrations;
this.clientCredentialsTokenResponseClient = clientCredentialsTokenResponseClient;
- this.ctrSchemaTarget = environment.getProperty("spring.cloud.task.schemaTarget");
}
public void setProperties(Map properties) {
@@ -203,9 +199,6 @@ public RepeatStatus execute(StepContribution contribution, ChunkContext chunkCon
Long parentTaskExecutionId = getParentTaskExecutionId(contribution);
if (parentTaskExecutionId != null) {
args.add("--spring.cloud.task.parent-execution-id=" + parentTaskExecutionId);
- String parentSchemaTarget = StringUtils.hasText(ctrSchemaTarget) ? ctrSchemaTarget : SchemaVersionTarget.defaultTarget().getName();
- args.add("--spring.cloud.task.parent-schema-target=" + parentSchemaTarget);
-
} else {
logger.error("Cannot find task execution id");
}
@@ -219,7 +212,6 @@ public RepeatStatus execute(StepContribution contribution, ChunkContext chunkCon
this.executionId = response.getExecutionId();
stepExecutionContext.put("task-execution-id", response.getExecutionId());
- stepExecutionContext.put("schema-target", response.getSchemaTarget());
stepExecutionContext.put("task-name", tmpTaskName);
if (!args.isEmpty()) {
diff --git a/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/support/UnexpectedTaskExecutionException.java b/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/support/UnexpectedTaskExecutionException.java
index 4f0c54339c..6056ae7f52 100644
--- a/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/support/UnexpectedTaskExecutionException.java
+++ b/spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/support/UnexpectedTaskExecutionException.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2017-2023 the original author or authors.
+ * Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,7 +16,7 @@
package org.springframework.cloud.dataflow.composedtaskrunner.support;
-import java.util.Date;
+import java.time.LocalDateTime;
import org.springframework.batch.core.UnexpectedJobExecutionException;
import org.springframework.boot.ExitCodeGenerator;
@@ -55,12 +55,12 @@ public class UnexpectedTaskExecutionException extends UnexpectedJobExecutionExce
/**
* Time of when the task was started.
*/
- private Date startTime;
+ private LocalDateTime startTime;
/**
* Timestamp of when the task was completed/terminated.
*/
- private Date endTime;
+ private LocalDateTime endTime;
/**
* Message returned from the task or stacktrace.
@@ -160,12 +160,12 @@ public String getTaskName() {
return this.taskName;
}
- public Date getStartTime() {
- return (this.startTime != null) ? (Date) this.startTime.clone() : null;
+ public LocalDateTime getStartTime() {
+ return (this.startTime != null) ? this.startTime: null;
}
- public Date getEndTime() {
- return (this.endTime != null) ? (Date) this.endTime.clone() : null;
+ public LocalDateTime getEndTime() {
+ return (this.endTime != null) ? this.endTime : null;
}
public String getExitMessage() {
diff --git a/spring-cloud-dataflow-composed-task-runner/src/test/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedRunnerVisitorTests.java b/spring-cloud-dataflow-composed-task-runner/src/test/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedRunnerVisitorTests.java
index 60e644d6d2..f5be909bfc 100644
--- a/spring-cloud-dataflow-composed-task-runner/src/test/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedRunnerVisitorTests.java
+++ b/spring-cloud-dataflow-composed-task-runner/src/test/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedRunnerVisitorTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2017-2020 the original author or authors.
+ * Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
package org.springframework.cloud.dataflow.composedtaskrunner;
+import javax.sql.DataSource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -38,6 +39,7 @@
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.beans.factory.BeanCreationException;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.batch.BatchAutoConfiguration;
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
@@ -46,6 +48,10 @@
import org.springframework.cloud.task.batch.configuration.TaskBatchAutoConfiguration;
import org.springframework.cloud.task.configuration.SimpleTaskAutoConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.jdbc.support.JdbcTransactionManager;
+import org.springframework.transaction.PlatformTransactionManager;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -379,8 +385,10 @@ private void setupContextForGraph(String graph, String... args) {
setupContextForGraph(argsForCtx.toArray(new String[0]));
}
- private void setupContextForGraph(String[] args) {
- this.applicationContext = SpringApplication.run(new Class[]{ComposedRunnerVisitorConfiguration.class,
+ private void setupContextForGraph(String[] args) throws RuntimeException{
+ this.applicationContext = SpringApplication.
+ run(new Class[]{ ComposedRunnerVisitorTestsConfiguration.class,
+ ComposedRunnerVisitorConfiguration.class,
PropertyPlaceholderAutoConfiguration.class,
EmbeddedDataSourceConfiguration.class,
BatchAutoConfiguration.class,
@@ -403,7 +411,7 @@ private Collection getStepExecutions(boolean isCTR) {
if(isCTR) {
assertThat(jobExecution.getJobParameters().getParameters().get("ctr.id")).isNotNull();
} else {
- assertThat(jobExecution.getJobParameters().getParameters().get("run.id")).isEqualTo(new JobParameter(1L));
+ assertThat(jobExecution.getJobParameters().getParameters().get("run.id")).isEqualTo(new JobParameter(1L, Long.class));
}
return jobExecution.getStepExecutions();
}
@@ -419,4 +427,14 @@ private void verifyExceptionThrown(String message, String graph) {
assertThat(exception.getCause().getCause().getMessage()).isEqualTo(message);
}
+ @Configuration
+ public static class ComposedRunnerVisitorTestsConfiguration {
+ @Autowired
+ DataSource dataSource;
+ @Bean
+ public PlatformTransactionManager transactionManager() {
+ return new JdbcTransactionManager(dataSource);
+ }
+ }
+
}
diff --git a/spring-cloud-dataflow-composed-task-runner/src/test/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskRunnerConfigurationNoPropertiesTests.java b/spring-cloud-dataflow-composed-task-runner/src/test/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskRunnerConfigurationNoPropertiesTests.java
index 09d8d2b0af..e4a6160ee2 100644
--- a/spring-cloud-dataflow-composed-task-runner/src/test/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskRunnerConfigurationNoPropertiesTests.java
+++ b/spring-cloud-dataflow-composed-task-runner/src/test/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskRunnerConfigurationNoPropertiesTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2017-2022 the original author or authors.
+ * Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -86,7 +86,7 @@ public void testComposedConfiguration() throws Exception {
verify(taskOperations).launch(
"AAA",
Collections.emptyMap(),
- Arrays.asList("--spring.cloud.task.parent-execution-id=1", "--spring.cloud.task.parent-schema-target=boot2")
+ Arrays.asList("--spring.cloud.task.parent-execution-id=1")
);
}
}
diff --git a/spring-cloud-dataflow-composed-task-runner/src/test/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskRunnerConfigurationWithPropertiesTests.java b/spring-cloud-dataflow-composed-task-runner/src/test/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskRunnerConfigurationWithPropertiesTests.java
index 7a78b0aa0f..b06976cc8a 100644
--- a/spring-cloud-dataflow-composed-task-runner/src/test/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskRunnerConfigurationWithPropertiesTests.java
+++ b/spring-cloud-dataflow-composed-task-runner/src/test/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskRunnerConfigurationWithPropertiesTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2017-2022 the original author or authors.
+ * Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -111,7 +111,6 @@ public void testComposedConfiguration() throws Exception {
List args = new ArrayList<>(2);
args.add("--baz=boo --foo=bar");
args.add("--spring.cloud.task.parent-execution-id=1");
- args.add("--spring.cloud.task.parent-schema-target=boot2");
Assert.notNull(job.getJobParametersIncrementer(), "JobParametersIncrementer must not be null.");
verify(taskOperations).launch("ComposedTest-AAA", props, args);
}
diff --git a/spring-cloud-dataflow-composed-task-runner/src/test/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskRunnerStepFactoryTests.java b/spring-cloud-dataflow-composed-task-runner/src/test/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskRunnerStepFactoryTests.java
index 3218544f3c..40ffe4ff19 100644
--- a/spring-cloud-dataflow-composed-task-runner/src/test/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskRunnerStepFactoryTests.java
+++ b/spring-cloud-dataflow-composed-task-runner/src/test/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskRunnerStepFactoryTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2017-2021 the original author or authors.
+ * Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,26 +18,32 @@
import javax.sql.DataSource;
-import java.util.Collections;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecutionListener;
-import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.repository.JobRepository;
+import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.autoconfigure.jdbc.EmbeddedDataSourceConfiguration;
import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.cloud.common.security.CommonSecurityAutoConfiguration;
+import org.springframework.cloud.dataflow.composedtaskrunner.configuration.DataFlowTestConfiguration;
import org.springframework.cloud.dataflow.composedtaskrunner.properties.ComposedTaskProperties;
import org.springframework.cloud.dataflow.rest.client.TaskOperations;
import org.springframework.cloud.task.configuration.TaskConfigurer;
import org.springframework.cloud.task.configuration.TaskProperties;
import org.springframework.cloud.task.repository.TaskExplorer;
+import org.springframework.cloud.task.repository.TaskNameResolver;
import org.springframework.cloud.task.repository.TaskRepository;
+import org.springframework.cloud.task.repository.support.SimpleTaskNameResolver;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.transaction.PlatformTransactionManager;
@@ -49,7 +55,12 @@
* @author Corneil du Plessis
*/
@ExtendWith(SpringExtension.class)
-@ContextConfiguration(classes = {org.springframework.cloud.dataflow.composedtaskrunner.ComposedTaskRunnerStepFactoryTests.StepFactoryConfiguration.class})
+@ContextConfiguration(classes={EmbeddedDataSourceConfiguration.class,
+ DataFlowTestConfiguration.class,StepBeanDefinitionRegistrar.class,
+ ComposedTaskRunnerConfiguration.class,
+ StepBeanDefinitionRegistrar.class})
+@EnableAutoConfiguration(exclude = { CommonSecurityAutoConfiguration.class})
+@TestPropertySource(properties = {"graph=FOOBAR","max-wait-time=1000", "increment-instance-enabled=true", "spring.cloud.task.name=footest"})
public class ComposedTaskRunnerStepFactoryTests {
@Autowired
@@ -59,7 +70,7 @@ public class ComposedTaskRunnerStepFactoryTests {
public void testStep() throws Exception {
Step step = stepFactory.getObject();
assertThat(step).isNotNull();
- assertThat(step.getName()).isEqualTo("FOOBAR");
+ assertThat(step.getName()).isEqualTo("FOOBAR_0");
assertThat(step.getStartLimit()).isEqualTo(Integer.MAX_VALUE);
}
@@ -72,12 +83,6 @@ public static class StepFactoryConfiguration {
@MockBean
public TaskOperations taskOperations;
- @Bean
- public TaskExplorerContainer taskExplorerContainer() {
- TaskExplorer taskExplorer = mock(TaskExplorer.class);
- return new TaskExplorerContainer(Collections.emptyMap(), taskExplorer);
- }
-
@Bean
public ComposedTaskProperties composedTaskProperties() {
return new ComposedTaskProperties();
@@ -89,8 +94,8 @@ public TaskProperties taskProperties() {
}
@Bean
- public StepBuilderFactory steps() {
- return new StepBuilderFactory(mock(JobRepository.class), mock(PlatformTransactionManager.class));
+ public StepBuilder steps() {
+ return new StepBuilder("foo", mock(JobRepository.class));
}
@Bean
@@ -115,12 +120,12 @@ public TaskExplorer getTaskExplorer() {
public DataSource getTaskDataSource() {
return mock(DataSource.class);
}
- };
- }
- @Bean
- public ComposedTaskRunnerStepFactory stepFactory(TaskProperties taskProperties) {
- return new ComposedTaskRunnerStepFactory(new ComposedTaskProperties(), "FOOBAR", "BAR");
+ @Override
+ public TaskNameResolver getTaskNameResolver() {
+ return new SimpleTaskNameResolver();
+ }
+ };
}
}
}
diff --git a/spring-cloud-dataflow-composed-task-runner/src/test/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskStepExecutionListenerTests.java b/spring-cloud-dataflow-composed-task-runner/src/test/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskStepExecutionListenerTests.java
index 08f31756a0..9c52e97030 100644
--- a/spring-cloud-dataflow-composed-task-runner/src/test/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskStepExecutionListenerTests.java
+++ b/spring-cloud-dataflow-composed-task-runner/src/test/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskStepExecutionListenerTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2017-2020 the original author or authors.
+ * Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,8 +16,7 @@
package org.springframework.cloud.dataflow.composedtaskrunner;
-import java.util.Collections;
-import java.util.Date;
+import java.time.LocalDateTime;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -25,10 +24,8 @@
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
-import org.springframework.cloud.dataflow.schema.SchemaVersionTarget;
import org.springframework.cloud.task.repository.TaskExecution;
import org.springframework.cloud.task.repository.TaskExplorer;
-import org.springframework.test.util.ReflectionTestUtils;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -40,7 +37,6 @@
*/
public class ComposedTaskStepExecutionListenerTests {
- private TaskExplorerContainer taskExplorerContainer;
private TaskExplorer taskExplorer;
private StepExecution stepExecution;
@@ -50,16 +46,15 @@ public class ComposedTaskStepExecutionListenerTests {
@BeforeEach
public void setup() {
this.taskExplorer = mock(TaskExplorer.class);
- this.taskExplorerContainer = new TaskExplorerContainer(Collections.emptyMap(), taskExplorer);
this.stepExecution = getStepExecution();
- this.taskListener = new ComposedTaskStepExecutionListener(this.taskExplorerContainer);
+ this.taskListener = new ComposedTaskStepExecutionListener(taskExplorer);
}
@Test
public void testSuccessfulRun() {
TaskExecution taskExecution = getDefaultTaskExecution(0, null);
when(this.taskExplorer.getTaskExecution(anyLong())).thenReturn(taskExecution);
- populateExecutionContext(taskExecution.getTaskName(),111L, SchemaVersionTarget.defaultTarget().getName());
+ populateExecutionContext(taskExecution.getTaskName(),111L);
assertThat(this.taskListener.afterStep(this.stepExecution)).isEqualTo(ExitStatus.COMPLETED);
}
@@ -69,7 +64,7 @@ public void testExitMessageRunSuccess() {
TaskExecution taskExecution = getDefaultTaskExecution(0,
expectedTaskStatus.getExitCode());
when(this.taskExplorer.getTaskExecution(anyLong())).thenReturn(taskExecution);
- populateExecutionContext(taskExecution.getTaskName(), 111L, SchemaVersionTarget.defaultTarget().getName());
+ populateExecutionContext(taskExecution.getTaskName(), 111L);
assertThat(this.taskListener.afterStep(this.stepExecution)).isEqualTo(expectedTaskStatus);
}
@@ -80,7 +75,7 @@ public void testExitMessageRunFail() {
TaskExecution taskExecution = getDefaultTaskExecution(1,
expectedTaskStatus.getExitCode());
when(this.taskExplorer.getTaskExecution(anyLong())).thenReturn(taskExecution);
- populateExecutionContext(taskExecution.getTaskName(), 111L, SchemaVersionTarget.defaultTarget().getName());
+ populateExecutionContext(taskExecution.getTaskName(), 111L);
assertThat(this.taskListener.afterStep(this.stepExecution)).isEqualTo(expectedTaskStatus);
}
@@ -89,7 +84,7 @@ public void testExitMessageRunFail() {
public void testFailedRun() {
TaskExecution taskExecution = getDefaultTaskExecution(1, null);
when(this.taskExplorer.getTaskExecution(anyLong())).thenReturn(taskExecution);
- populateExecutionContext(taskExecution.getTaskName(), 111L, SchemaVersionTarget.defaultTarget().getName());
+ populateExecutionContext(taskExecution.getTaskName(), 111L);
assertThat(this.taskListener.afterStep(this.stepExecution)).isEqualTo(ExitStatus.FAILED);
}
@@ -110,10 +105,9 @@ private StepExecution getStepExecution() {
return new StepExecution(STEP_NAME, jobExecution);
}
- private void populateExecutionContext(String taskName, Long taskExecutionId, String schemaTarget) {
+ private void populateExecutionContext(String taskName, Long taskExecutionId) {
this.stepExecution.getExecutionContext().put("task-name", taskName);
this.stepExecution.getExecutionContext().put("task-execution-id", taskExecutionId);
- this.stepExecution.getExecutionContext().put("schema-target", schemaTarget);
}
private TaskExecution getDefaultTaskExecution (int exitCode,
@@ -122,7 +116,7 @@ private TaskExecution getDefaultTaskExecution (int exitCode,
taskExecution.setTaskName("test-ctr");
taskExecution.setExitMessage(exitMessage);
taskExecution.setExitCode(exitCode);
- taskExecution.setEndTime(new Date());
+ taskExecution.setEndTime(LocalDateTime.now());
return taskExecution;
}
}
diff --git a/spring-cloud-dataflow-composed-task-runner/src/test/java/org/springframework/cloud/dataflow/composedtaskrunner/TaskLauncherTaskletTests.java b/spring-cloud-dataflow-composed-task-runner/src/test/java/org/springframework/cloud/dataflow/composedtaskrunner/TaskLauncherTaskletTests.java
index 82f56d6497..ea143c4c58 100644
--- a/spring-cloud-dataflow-composed-task-runner/src/test/java/org/springframework/cloud/dataflow/composedtaskrunner/TaskLauncherTaskletTests.java
+++ b/spring-cloud-dataflow-composed-task-runner/src/test/java/org/springframework/cloud/dataflow/composedtaskrunner/TaskLauncherTaskletTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2017-2022 the original author or authors.
+ * Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,9 +16,9 @@
package org.springframework.cloud.dataflow.composedtaskrunner;
+import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Date;
import java.util.List;
import javax.sql.DataSource;
@@ -40,7 +40,6 @@
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
-import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.scope.context.StepContext;
import org.springframework.batch.item.ExecutionContext;
@@ -52,13 +51,11 @@
import org.springframework.cloud.dataflow.composedtaskrunner.support.ComposedTaskException;
import org.springframework.cloud.dataflow.composedtaskrunner.support.TaskExecutionTimeoutException;
import org.springframework.cloud.dataflow.composedtaskrunner.support.UnexpectedTaskExecutionException;
-import org.springframework.cloud.dataflow.core.database.support.MultiSchemaTaskExecutionDaoFactoryBean;
import org.springframework.cloud.dataflow.rest.client.DataFlowClientException;
import org.springframework.cloud.dataflow.rest.client.DataFlowOperations;
import org.springframework.cloud.dataflow.rest.client.TaskOperations;
import org.springframework.cloud.dataflow.rest.resource.LaunchResponseResource;
import org.springframework.cloud.dataflow.rest.support.jackson.Jackson2DataflowModule;
-import org.springframework.cloud.dataflow.schema.SchemaVersionTarget;
import org.springframework.cloud.task.batch.listener.support.JdbcTaskBatchDao;
import org.springframework.cloud.task.configuration.TaskProperties;
import org.springframework.cloud.task.repository.TaskExecution;
@@ -138,7 +135,7 @@ public void setup() throws Exception{
this.taskRepositoryInitializer.afterPropertiesSet();
this.taskOperations = mock(TaskOperations.class);
TaskExecutionDaoFactoryBean taskExecutionDaoFactoryBean =
- new MultiSchemaTaskExecutionDaoFactoryBean(this.dataSource, "TASK_");
+ new TaskExecutionDaoFactoryBean(this.dataSource);
this.taskRepository = new SimpleTaskRepository(taskExecutionDaoFactoryBean);
this.taskExplorer = new SimpleTaskExplorer(taskExecutionDaoFactoryBean);
this.composedTaskProperties.setIntervalTimeBetweenChecks(500);
@@ -156,9 +153,6 @@ public void testTaskLauncherTasklet() {
assertThat(chunkContext.getStepContext()
.getStepExecution().getExecutionContext()
.get("task-execution-id")).isEqualTo(1L);
- assertThat(chunkContext.getStepContext()
- .getStepExecution().getExecutionContext()
- .get("schema-target")).isEqualTo(SchemaVersionTarget.defaultTarget().getName());
mockReturnValForTaskExecution(2L);
chunkContext = chunkContext();
@@ -168,9 +162,6 @@ public void testTaskLauncherTasklet() {
assertThat(chunkContext.getStepContext()
.getStepExecution().getExecutionContext()
.get("task-execution-id")).isEqualTo(2L);
- assertThat(chunkContext.getStepContext()
- .getStepExecution().getExecutionContext()
- .get("schema-target")).isEqualTo(SchemaVersionTarget.defaultTarget().getName());
}
@Test
@@ -209,9 +200,6 @@ public void testTaskLauncherTaskletWithTaskExecutionId() {
assertThat(chunkContext.getStepContext()
.getStepExecution().getExecutionContext()
.get("task-execution-id")).isEqualTo(2L);
- assertThat(chunkContext.getStepContext()
- .getStepExecution().getExecutionContext()
- .get("schema-target")).isEqualTo(SchemaVersionTarget.defaultTarget().getName());
assertThat(((List>) chunkContext.getStepContext()
.getStepExecution().getExecutionContext()
.get("task-arguments")).get(0)).isEqualTo("--spring.cloud.task.parent-execution-id=88");
@@ -235,7 +223,6 @@ public void testTaskLauncherTaskletWithoutTaskExecutionId() {
ExecutionContext executionContext = chunkContext.getStepContext().getStepExecution().getExecutionContext();
logger.info("execution-context:{}", executionContext.entrySet());
assertThat(executionContext.get("task-execution-id")).isEqualTo(2L);
- assertThat(executionContext.get("schema-target")).isEqualTo(SchemaVersionTarget.defaultTarget().getName());
assertThat(executionContext.get("task-arguments")).as("task-arguments not null").isNotNull();
assertThat(((List>) executionContext.get("task-arguments")).get(0)).isEqualTo("--spring.cloud.task.parent-execution-id=1");
}
@@ -261,7 +248,6 @@ public void testTaskLauncherTaskletWithTaskExecutionIdWithPreviousParentID() {
executionContext = chunkContext.getStepContext().getStepExecution().getExecutionContext();
taskArguments = (List) executionContext.get("task-arguments");
assertThat(executionContext.get("task-execution-id")).isEqualTo(2L);
- assertThat(executionContext.get("schema-target")).isEqualTo(SchemaVersionTarget.defaultTarget().getName());
assertThat(((List>) taskArguments).get(0)).isEqualTo("--spring.cloud.task.parent-execution-id=88");
}
@@ -402,9 +388,6 @@ public void testTaskLauncherTaskletIgnoreExitMessage() {
Assertions.assertThat(chunkContext.getStepContext()
.getStepExecution().getExecutionContext()
.get("task-execution-id")).isEqualTo(1L);
- assertThat(chunkContext.getStepContext()
- .getStepExecution().getExecutionContext()
- .get("schema-target")).isEqualTo(SchemaVersionTarget.defaultTarget().getName());
Assertions.assertThat(chunkContext.getStepContext()
.getStepExecution().getExecutionContext()
.containsKey(TaskLauncherTasklet.IGNORE_EXIT_MESSAGE)).isTrue();
@@ -424,9 +407,6 @@ public void testTaskLauncherTaskletIgnoreExitMessageViaProperties() {
Assertions.assertThat(chunkContext.getStepContext()
.getStepExecution().getExecutionContext()
.get("task-execution-id")).isEqualTo(1L);
- assertThat(chunkContext.getStepContext()
- .getStepExecution().getExecutionContext()
- .get("schema-target")).isEqualTo(SchemaVersionTarget.defaultTarget().getName());
Assertions.assertThat(chunkContext.getStepContext()
.getStepExecution().getExecutionContext()
.containsKey(TaskLauncherTasklet.IGNORE_EXIT_MESSAGE)).isTrue();
@@ -447,9 +427,6 @@ public void testTaskLauncherTaskletIgnoreExitMessageViaCommandLineOverride() {
Assertions.assertThat(chunkContext.getStepContext()
.getStepExecution().getExecutionContext()
.get("task-execution-id")).isEqualTo(1L);
- assertThat(chunkContext.getStepContext()
- .getStepExecution().getExecutionContext()
- .get("schema-target")).isEqualTo(SchemaVersionTarget.defaultTarget().getName());
boolean value = chunkContext.getStepContext()
.getStepExecution().getExecutionContext()
.containsKey(TaskLauncherTasklet.IGNORE_EXIT_MESSAGE);
@@ -482,7 +459,7 @@ public void testTaskOperationsConfiguredWithMissingUsername() {
private void createCompleteTaskExecution(int exitCode, String... message) {
TaskExecution taskExecution = this.taskRepository.createTaskExecution();
this.taskRepository.completeTaskExecution(taskExecution.getExecutionId(),
- exitCode, new Date(), message != null && message.length > 0 ? message[0] : "");
+ exitCode, LocalDateTime.now(), message != null && message.length > 0 ? message[0] : "");
}
private void createAndStartCompleteTaskExecution(int exitCode, JobExecution jobExecution) {
@@ -490,12 +467,13 @@ private void createAndStartCompleteTaskExecution(int exitCode, JobExecution jobE
JdbcTaskBatchDao taskBatchDao = new JdbcTaskBatchDao(this.dataSource);
taskBatchDao.saveRelationship(taskExecution, jobExecution);
this.taskRepository.completeTaskExecution(taskExecution.getExecutionId(),
- exitCode, new Date(), "");
+ exitCode, LocalDateTime.now(), "");
}
private TaskExecution getCompleteTaskExecutionWithNull() {
TaskExecution taskExecution = this.taskRepository.createTaskExecution();
- taskExecutionDao.completeTaskExecution(taskExecution.getExecutionId(), null, new Date(), "hello", "goodbye");
+ taskExecutionDao.completeTaskExecution(taskExecution.getExecutionId(), null, LocalDateTime.now(),
+ "hello", "goodbye");
return taskExecution;
}
@@ -521,11 +499,9 @@ private ChunkContext chunkContext ()
StepContext stepContext = new StepContext(stepExecution);
return new ChunkContext(stepContext);
}
+
private void mockReturnValForTaskExecution(long executionId) {
- mockReturnValForTaskExecution(executionId, SchemaVersionTarget.defaultTarget().getName());
- }
- private void mockReturnValForTaskExecution(long executionId, String schemaTarget) {
- Mockito.doReturn(new LaunchResponseResource(executionId, schemaTarget))
+ Mockito.doReturn(new LaunchResponseResource(executionId))
.when(this.taskOperations)
.launch(ArgumentMatchers.anyString(),
ArgumentMatchers.any(),
@@ -533,7 +509,6 @@ private void mockReturnValForTaskExecution(long executionId, String schemaTarget
}
@Configuration
- @EnableBatchProcessing
@EnableConfigurationProperties(ComposedTaskProperties.class)
public static class TestConfiguration {
diff --git a/spring-cloud-dataflow-composed-task-runner/src/test/java/org/springframework/cloud/dataflow/composedtaskrunner/configuration/ComposedRunnerVisitorConfiguration.java b/spring-cloud-dataflow-composed-task-runner/src/test/java/org/springframework/cloud/dataflow/composedtaskrunner/configuration/ComposedRunnerVisitorConfiguration.java
index 1126386af0..3b8ffff91f 100644
--- a/spring-cloud-dataflow-composed-task-runner/src/test/java/org/springframework/cloud/dataflow/composedtaskrunner/configuration/ComposedRunnerVisitorConfiguration.java
+++ b/spring-cloud-dataflow-composed-task-runner/src/test/java/org/springframework/cloud/dataflow/composedtaskrunner/configuration/ComposedRunnerVisitorConfiguration.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2017-2020 the original author or authors.
+ * Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,9 +21,9 @@
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
-import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
-import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
+import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.scope.context.ChunkContext;
+import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
@@ -35,6 +35,7 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
import org.springframework.transaction.interceptor.TransactionAttribute;
@@ -44,12 +45,14 @@
* @author Ilayaperumal Gopinathan
*/
@Configuration
-@EnableBatchProcessing
@EnableConfigurationProperties(ComposedTaskProperties.class)
public class ComposedRunnerVisitorConfiguration {
@Autowired
- private StepBuilderFactory steps;
+ private JobRepository jobRepository;
+
+ @Autowired
+ private PlatformTransactionManager transactionManager;
@Autowired
private ComposedTaskProperties composedTaskProperties;
@@ -173,26 +176,28 @@ public ExitStatus afterStep(StepExecution stepExecution) {
private Step createTaskletStepWithListener(final String taskName,
StepExecutionListener stepExecutionListener) {
- return this.steps.get(taskName)
+ StepBuilder stepBuilder = new StepBuilder(taskName, jobRepository);
+ return stepBuilder
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
return RepeatStatus.FINISHED;
}
- })
+ }, this.transactionManager)
.transactionAttribute(getTransactionAttribute())
.listener(stepExecutionListener)
.build();
}
private Step createTaskletStep(final String taskName) {
- return this.steps.get(taskName)
+ StepBuilder stepBuilder = new StepBuilder(taskName, jobRepository);
+ return stepBuilder
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
return RepeatStatus.FINISHED;
}
- })
+ }, transactionManager)
.transactionAttribute(getTransactionAttribute())
.build();
}
diff --git a/spring-cloud-dataflow-composed-task-runner/src/test/java/org/springframework/cloud/dataflow/composedtaskrunner/properties/ComposedTaskPropertiesTests.java b/spring-cloud-dataflow-composed-task-runner/src/test/java/org/springframework/cloud/dataflow/composedtaskrunner/properties/ComposedTaskPropertiesTests.java
index af57f49b8b..a8312ef81b 100644
--- a/spring-cloud-dataflow-composed-task-runner/src/test/java/org/springframework/cloud/dataflow/composedtaskrunner/properties/ComposedTaskPropertiesTests.java
+++ b/spring-cloud-dataflow-composed-task-runner/src/test/java/org/springframework/cloud/dataflow/composedtaskrunner/properties/ComposedTaskPropertiesTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2017-2021 the original author or authors.
+ * Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -127,12 +127,12 @@ public void testComposedTaskAppArguments() {
@Test
public void testAssignmentOfOauth2ClientCredentialsClientAuthenticationMethod(){
this.contextRunner
- .withSystemProperties("OAUTH2_CLIENT_CREDENTIALS_CLIENT_AUTHENTICATION_METHOD=POST")
+ .withSystemProperties("OAUTH2_CLIENT_CREDENTIALS_CLIENT_AUTHENTICATION_METHOD=client_secret_post")
.withUserConfiguration(Config1.class).run((context) -> {
ComposedTaskProperties properties = context.getBean(ComposedTaskProperties.class);
assertThat(properties.getOauth2ClientCredentialsClientAuthenticationMethod())
.withFailMessage("The OAuth2 client credentials client authentication method couldn't be assigned correctly.")
- .isEqualTo(ClientAuthenticationMethod.POST);
+ .isEqualTo(ClientAuthenticationMethod.CLIENT_SECRET_POST);
});
}
diff --git a/spring-cloud-dataflow-single-step-batch-job/src/main/java/org/springframework/cloud/dataflow/singlestepbatchjob/SingleStepBatchJobApplication.java b/spring-cloud-dataflow-single-step-batch-job/src/main/java/org/springframework/cloud/dataflow/singlestepbatchjob/SingleStepBatchJobApplication.java
index d06ebe247d..7abb2586c0 100644
--- a/spring-cloud-dataflow-single-step-batch-job/src/main/java/org/springframework/cloud/dataflow/singlestepbatchjob/SingleStepBatchJobApplication.java
+++ b/spring-cloud-dataflow-single-step-batch-job/src/main/java/org/springframework/cloud/dataflow/singlestepbatchjob/SingleStepBatchJobApplication.java
@@ -16,7 +16,6 @@
package org.springframework.cloud.dataflow.singlestepbatchjob;
-import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.task.configuration.EnableTask;