Skip to content

Commit

Permalink
Migrate CTR to Boot 3.x and Batch 5.x (spring-cloud#5839)
Browse files Browse the repository at this point in the history
* Migrate CTR to Boot 3.3 and Batch 5

* Remove the Batch Configurer and replace with a Configuration
* Update Tests so that they will work with Boot3
* Removed EnableBatchAutoConfiguration no longer needed if using BatchAutoConfiguration
* Removed schema requirements

* Re-enable CTR Module build in main pom.xml

* Add ability for composed task runner to use the proper JobRepository and TaskExecutor

* BeanPostProcessor has been added so that CTR can use its jobRepository vs. the one provided by BatchAutoConfiguration
  • Loading branch information
cppwfs authored Aug 1, 2024
1 parent ff19e23 commit e1c8485
Show file tree
Hide file tree
Showing 19 changed files with 219 additions and 377 deletions.
3 changes: 1 addition & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@
<module>spring-cloud-dataflow-server</module>
<module>spring-cloud-dataflow-tasklauncher</module>
<module>spring-cloud-dataflow-single-step-batch-job</module>
<!-- TODO: Boot3x followup -->
<!-- <module>spring-cloud-dataflow-composed-task-runner</module>-->
<module>spring-cloud-dataflow-composed-task-runner</module>
<module>spring-cloud-dataflow-test</module>
<module>spring-cloud-dataflow-dependencies</module>
<module>spring-cloud-dataflow-classic-docs</module>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2021 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,11 +27,12 @@
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersIncrementer;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.builder.FlowJobBuilder;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.dataflow.composedtaskrunner.properties.ComposedTaskProperties;
Expand Down Expand Up @@ -65,7 +66,7 @@ public class ComposedRunnerJobFactory implements FactoryBean<Job> {
private TaskExecutor taskExecutor;

@Autowired
private JobBuilderFactory jobBuilderFactory;
private JobRepository jobRepository;

@Autowired
private TaskNameResolver taskNameResolver;
Expand Down Expand Up @@ -105,9 +106,8 @@ public Job getObject() throws Exception {
taskParser.parse().accept(composedRunnerVisitor);

this.visitorDeque = composedRunnerVisitor.getFlow();

FlowJobBuilder builder = this.jobBuilderFactory
.get(this.taskNameResolver.getTaskName())
JobBuilder jobBuilder = new JobBuilder(this.taskNameResolver.getTaskName(), jobRepository);
FlowJobBuilder builder = jobBuilder
.start(this.flowBuilder
.start(createFlow())
.end())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2023 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,103 +17,42 @@
package org.springframework.cloud.dataflow.composedtaskrunner;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.configuration.annotation.BatchConfigurer;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.autoconfigure.batch.BatchProperties;
import org.springframework.boot.autoconfigure.transaction.TransactionManagerCustomizers;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.dataflow.composedtaskrunner.properties.ComposedTaskProperties;
import org.springframework.cloud.dataflow.core.database.support.MultiSchemaTaskExecutionDaoFactoryBean;
import org.springframework.cloud.dataflow.core.dsl.TaskParser;
import org.springframework.cloud.task.configuration.EnableTask;
import org.springframework.cloud.task.listener.TaskExecutionListener;
import org.springframework.cloud.task.repository.TaskExplorer;
import org.springframework.cloud.task.repository.support.SimpleTaskExplorer;
import org.springframework.cloud.task.repository.support.TaskExecutionDaoFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.env.Environment;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.StringUtils;
import org.springframework.transaction.PlatformTransactionManager;

/**
* Configures the Job that will execute the Composed Task Execution.
*
* @author Glenn Renfro
* @author Corneil du Plessis
*/
@EnableBatchProcessing
@EnableTask
@EnableConfigurationProperties(ComposedTaskProperties.class)
@Configuration
@Import(org.springframework.cloud.dataflow.composedtaskrunner.StepBeanDefinitionRegistrar.class)
public class ComposedTaskRunnerConfiguration {
private final static Logger logger = LoggerFactory.getLogger(ComposedTaskRunnerConfiguration.class);

@Bean
public TaskExecutionListener taskExecutionListener() {
return new ComposedTaskRunnerTaskListener();
}

@Bean
public StepExecutionListener composedTaskStepExecutionListener(TaskExplorerContainer taskExplorerContainer) {
return new org.springframework.cloud.dataflow.composedtaskrunner.ComposedTaskStepExecutionListener(taskExplorerContainer);
}

@Bean
TaskExplorerContainer taskExplorerContainer(TaskExplorer taskExplorer, DataSource dataSource, ComposedTaskProperties properties, Environment env) {
Map<String, TaskExplorer> explorers = new HashMap<>();
String ctrName = env.getProperty("spring.cloud.task.name");
if (!StringUtils.hasText(ctrName)) {
throw new IllegalStateException("spring.cloud.task.name property must have a value.");
}
TaskParser parser = new TaskParser("ctr", properties.getGraph(), false, true);
StepBeanDefinitionRegistrar.TaskAppsMapCollector collector = new StepBeanDefinitionRegistrar.TaskAppsMapCollector();
parser.parse().accept(collector);
Set<String> taskNames = collector.getTaskApps().keySet();
logger.debug("taskExplorerContainer:taskNames:{}", taskNames);
for (String taskName : taskNames) {
addTaskExplorer(dataSource, properties, env, explorers, taskName);
String appName = taskName.replace(ctrName + "-", "");
addTaskExplorer(dataSource, properties, env, explorers, appName);
if(taskName.length() > ctrName.length()) {
String shortTaskName = taskName.substring(ctrName.length() + 1);
addTaskExplorer(dataSource, properties, env, explorers, shortTaskName);
}
}
return new TaskExplorerContainer(explorers, taskExplorer);
}

private static void addTaskExplorer(
DataSource dataSource,
ComposedTaskProperties properties,
Environment env,
Map<String, TaskExplorer> explorers,
String taskName
) {
logger.debug("addTaskExplorer:{}", taskName);
String propertyName = String.format("app.%s.spring.cloud.task.tablePrefix", taskName);
String prefix = properties.getComposedTaskAppProperties().get(propertyName);
if (prefix == null) {
prefix = env.getProperty(propertyName);
}
if (prefix != null) {
TaskExecutionDaoFactoryBean factoryBean = new MultiSchemaTaskExecutionDaoFactoryBean(dataSource, prefix);
logger.debug("taskExplorerContainer:adding:{}:{}", taskName, prefix);
explorers.put(taskName, new SimpleTaskExplorer(factoryBean));
} else {
logger.warn("Cannot find {} in {} ", propertyName, properties.getComposedTaskAppProperties());
}
public StepExecutionListener composedTaskStepExecutionListener(TaskExplorer taskExplorer) {
return new org.springframework.cloud.dataflow.composedtaskrunner.ComposedTaskStepExecutionListener(taskExplorer);
}

@Bean
Expand All @@ -128,25 +67,21 @@ public TaskExecutor taskExecutor(ComposedTaskProperties properties) {
taskExecutor.setMaxPoolSize(properties.getSplitThreadMaxPoolSize());
taskExecutor.setKeepAliveSeconds(properties.getSplitThreadKeepAliveSeconds());
taskExecutor.setAllowCoreThreadTimeOut(
properties.isSplitThreadAllowCoreThreadTimeout());
properties.isSplitThreadAllowCoreThreadTimeout());
taskExecutor.setQueueCapacity(properties.getSplitThreadQueueCapacity());
taskExecutor.setWaitForTasksToCompleteOnShutdown(
properties.isSplitThreadWaitForTasksToCompleteOnShutdown());
properties.isSplitThreadWaitForTasksToCompleteOnShutdown());
return taskExecutor;
}

/**
* Provides the {@link JobRepository} that is configured to be used by the composed task runner.
*/
@Bean
public BatchConfigurer getComposedBatchConfigurer(
BatchProperties properties,
DataSource dataSource,
TransactionManagerCustomizers transactionManagerCustomizers,
ComposedTaskProperties composedTaskProperties
) {
return new ComposedBatchConfigurer(
properties,
dataSource,
transactionManagerCustomizers,
composedTaskProperties
);
public BeanPostProcessor jobRepositoryBeanPostProcessor(PlatformTransactionManager transactionManager,
DataSource incrementerDataSource,
ComposedTaskProperties composedTaskProperties) {
return new JobRepositoryBeanPostProcessor(transactionManager, incrementerDataSource, composedTaskProperties);
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2021 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,18 +31,21 @@

import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.dataflow.composedtaskrunner.properties.ComposedTaskProperties;
import org.springframework.cloud.dataflow.core.Base64Utils;
import org.springframework.cloud.dataflow.rest.support.jackson.Jackson2DataflowModule;
import org.springframework.cloud.task.configuration.TaskProperties;
import org.springframework.cloud.task.repository.TaskExplorer;
import org.springframework.core.env.Environment;
import org.springframework.hateoas.mediatype.hal.Jackson2HalModule;
import org.springframework.security.oauth2.client.endpoint.OAuth2AccessTokenResponseClient;
import org.springframework.security.oauth2.client.endpoint.OAuth2ClientCredentialsGrantRequest;
import org.springframework.security.oauth2.client.registration.ClientRegistrationRepository;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
import org.springframework.transaction.interceptor.TransactionAttribute;
Expand Down Expand Up @@ -74,13 +77,16 @@ public class ComposedTaskRunnerStepFactory implements FactoryBean<Step> {
private List<String> arguments = new ArrayList<>();

@Autowired
private StepBuilderFactory steps;
private JobRepository jobRepository;

@Autowired
private PlatformTransactionManager transactionManager;

@Autowired
private StepExecutionListener composedTaskStepExecutionListener;

@Autowired
private TaskExplorerContainer taskExplorerContainer;
private TaskExplorer taskExplorer;

@Autowired
private TaskProperties taskProperties;
Expand Down Expand Up @@ -133,7 +139,7 @@ public Step getObject() {
TaskLauncherTasklet taskLauncherTasklet = new TaskLauncherTasklet(
this.clientRegistrations,
this.clientCredentialsTokenResponseClient,
this.taskExplorerContainer.get(this.taskNameId),
this.taskExplorer,
this.composedTaskPropertiesFromEnv,
this.taskName,
taskProperties,
Expand Down Expand Up @@ -168,9 +174,9 @@ public Step getObject() {

taskLauncherTasklet.setProperties(propertiesToUse);
logger.debug("Properties to use {}", propertiesToUse);

return this.steps.get(this.taskName)
.tasklet(taskLauncherTasklet)
StepBuilder stepBuilder = new StepBuilder(this.taskName, this.jobRepository);
return stepBuilder
.tasklet(taskLauncherTasklet, this.transactionManager)
.transactionAttribute(getTransactionAttribute())
.listener(this.composedTaskStepExecutionListener)
.build();
Expand Down
Loading

0 comments on commit e1c8485

Please sign in to comment.