Replace job execution and task batch containers with Task implementations

Replace JobExecutionDaoContainer with JdbcSearchableJobExecutionDao.
Replace TaskBatchDaoContainer with TaskBatchDao.

Replace JdbcTaskExecutionDao with the DAO from Spring Cloud Task.

Apply the changes requested in code review.
cppwfs committed Feb 28, 2024
1 parent 2f05ded commit c1f4178
Showing 25 changed files with 207 additions and 424 deletions.
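In short, the commit swaps per-schema DAO container lookups for single DAO beans. A minimal sketch of the before/after pattern, in the spirit of the documentation tests changed below (the class and method names in the sketch are illustrative, not part of the commit):

import org.springframework.cloud.task.batch.listener.TaskBatchDao;
import org.springframework.cloud.task.repository.dao.TaskExecutionDao;
import org.springframework.context.ApplicationContext;

// Illustrative sketch only: after this commit the tests resolve the single
// Task-provided DAO beans directly, instead of asking a container for the
// DAO matching a SchemaVersionTarget.
class DaoLookupSketch {

	// Old pattern (removed in this commit):
	//   TaskExecutionDao dao = daoContainer.get(schemaVersionTarget.getName());
	//   TaskBatchDao batchDao = taskBatchDaoContainer.get(schemaVersionTarget.getName());

	static void resolveDaos(ApplicationContext context) {
		// New pattern: one bean of each DAO type in the application context.
		TaskExecutionDao taskExecutionDao = context.getBean(TaskExecutionDao.class);
		TaskBatchDao taskBatchDao = context.getBean(TaskBatchDao.class);
	}
}

The SchemaVersionTarget is still computed where the tests need it, but it is no longer used to select a DAO.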

@@ -43,8 +43,6 @@
import org.springframework.cloud.dataflow.schema.SchemaVersionTarget;
import org.springframework.cloud.dataflow.server.repository.DataflowTaskExecutionMetadataDao;
import org.springframework.cloud.dataflow.server.repository.DataflowTaskExecutionMetadataDaoContainer;
import org.springframework.cloud.dataflow.server.repository.TaskBatchDaoContainer;
import org.springframework.cloud.dataflow.server.repository.TaskExecutionDaoContainer;
import org.springframework.cloud.task.batch.listener.TaskBatchDao;
import org.springframework.cloud.task.repository.TaskExecution;
import org.springframework.cloud.task.repository.dao.TaskExecutionDao;
@@ -84,9 +82,9 @@ public class JobExecutionsDocumentation extends BaseDocumentation {

private JobRepository jobRepository;

private TaskExecutionDaoContainer daoContainer;
private TaskExecutionDao taskExecutionDao;

private TaskBatchDaoContainer taskBatchDaoContainer;
private TaskBatchDao taskBatchDao;

private JdbcTemplate jdbcTemplate;

@@ -367,8 +365,8 @@ public void jobRestart() throws Exception {
}

private void initialize() {
this.daoContainer = context.getBean(TaskExecutionDaoContainer.class);
this.taskBatchDaoContainer = context.getBean(TaskBatchDaoContainer.class);
this.taskExecutionDao = context.getBean(TaskExecutionDao.class);
this.taskBatchDao = context.getBean(TaskBatchDao.class);
this.jobRepository = context.getBean(JobRepository.class);
this.dataflowTaskExecutionMetadataDaoContainer = context.getBean(DataflowTaskExecutionMetadataDaoContainer.class);
this.aggregateExecutionSupport = context.getBean(AggregateExecutionSupport.class);
@@ -378,12 +376,10 @@ private void initialize() {

private void createJobExecution(String name, BatchStatus status) throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobRestartException {
SchemaVersionTarget schemaVersionTarget = this.aggregateExecutionSupport.findSchemaVersionTarget(name, taskDefinitionReader);
TaskExecutionDao dao = this.daoContainer.get(schemaVersionTarget.getName());
TaskExecution taskExecution = dao.createTaskExecution(name, LocalDateTime.now(), Collections.singletonList("--spring.cloud.data.flow.platformname=default"), null);
TaskExecution taskExecution = taskExecutionDao.createTaskExecution(name, LocalDateTime.now(), Collections.singletonList("--spring.cloud.data.flow.platformname=default"), null);
Map<String, JobParameter<?>> jobParameterMap = new HashMap<>();
JobParameters jobParameters = new JobParameters(jobParameterMap);
JobExecution jobExecution = this.jobRepository.createJobExecution(name, jobParameters);
TaskBatchDao taskBatchDao = this.taskBatchDaoContainer.get(schemaVersionTarget.getName());
taskBatchDao.saveRelationship(taskExecution, jobExecution);
jobExecution.setStatus(status);
jobExecution.setStartTime(LocalDateTime.now());

@@ -36,8 +36,6 @@
import org.springframework.cloud.dataflow.aggregate.task.TaskDefinitionReader;
import org.springframework.cloud.dataflow.core.ApplicationType;
import org.springframework.cloud.dataflow.schema.SchemaVersionTarget;
import org.springframework.cloud.dataflow.server.repository.TaskBatchDaoContainer;
import org.springframework.cloud.dataflow.server.repository.TaskExecutionDaoContainer;
import org.springframework.cloud.task.batch.listener.TaskBatchDao;
import org.springframework.cloud.task.repository.TaskExecution;
import org.springframework.cloud.task.repository.dao.TaskExecutionDao;
@@ -71,8 +69,8 @@ public class JobInstancesDocumentation extends BaseDocumentation {

private static boolean initialized;
private JobRepository jobRepository;
private TaskExecutionDaoContainer daoContainer;
private TaskBatchDaoContainer taskBatchDaoContainer;
private TaskExecutionDao taskExecutionDao;
private TaskBatchDao taskBatchDao;
private AggregateExecutionSupport aggregateExecutionSupport;
private TaskDefinitionReader taskDefinitionReader;

@@ -136,16 +134,14 @@ private void initialize() {
this.taskDefinitionReader = context.getBean(TaskDefinitionReader.class);
this.aggregateExecutionSupport = context.getBean(AggregateExecutionSupport.class);
this.jobRepository = context.getBean(JobRepository.class);
this.daoContainer = context.getBean(TaskExecutionDaoContainer.class);
this.taskBatchDaoContainer = context.getBean(TaskBatchDaoContainer.class);
this.taskExecutionDao = context.getBean(TaskExecutionDao.class);
this.taskBatchDao = context.getBean(TaskBatchDao.class);
}

private void createJobExecution(String name, BatchStatus status) throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobRestartException {
SchemaVersionTarget schemaVersionTarget = this.aggregateExecutionSupport.findSchemaVersionTarget(name, taskDefinitionReader);
TaskExecutionDao dao = this.daoContainer.get(schemaVersionTarget.getName());
TaskExecution taskExecution = dao.createTaskExecution(name, LocalDateTime.now(), new ArrayList<>(), null);
TaskExecution taskExecution = taskExecutionDao.createTaskExecution(name, LocalDateTime.now(), new ArrayList<>(), null);
JobExecution jobExecution = jobRepository.createJobExecution(name, new JobParameters());
TaskBatchDao taskBatchDao = this.taskBatchDaoContainer.get(schemaVersionTarget.getName());
taskBatchDao.saveRelationship(taskExecution, jobExecution);
jobExecution.setStatus(status);
jobExecution.setStartTime(LocalDateTime.now());

@@ -37,8 +37,6 @@
import org.springframework.cloud.dataflow.aggregate.task.TaskDefinitionReader;
import org.springframework.cloud.dataflow.core.ApplicationType;
import org.springframework.cloud.dataflow.schema.SchemaVersionTarget;
import org.springframework.cloud.dataflow.server.repository.TaskBatchDaoContainer;
import org.springframework.cloud.dataflow.server.repository.TaskExecutionDaoContainer;
import org.springframework.cloud.task.batch.listener.TaskBatchDao;
import org.springframework.cloud.task.repository.TaskExecution;
import org.springframework.cloud.task.repository.dao.TaskExecutionDao;
@@ -73,9 +71,9 @@ public class JobStepExecutionsDocumentation extends BaseDocumentation {

private JobRepository jobRepository;

private TaskExecutionDaoContainer daoContainer;
private TaskExecutionDao taskExecutionDao;

private TaskBatchDaoContainer taskBatchDaoContainer;
private TaskBatchDao taskBatchDao;

private AggregateExecutionSupport aggregateExecutionSupport;

@@ -171,21 +169,19 @@ public void stepProgress() throws Exception {
private void initialize() {
this.aggregateExecutionSupport = context.getBean(AggregateExecutionSupport.class);
this.jobRepository = context.getBean(JobRepository.class);
this.daoContainer = context.getBean(TaskExecutionDaoContainer.class);
this.taskBatchDaoContainer = context.getBean(TaskBatchDaoContainer.class);
this.taskExecutionDao = context.getBean(TaskExecutionDao.class);
this.taskBatchDao = context.getBean(TaskBatchDao.class);
this.taskDefinitionReader = context.getBean(TaskDefinitionReader.class);
}

private void createJobExecution(String name, BatchStatus status) throws JobInstanceAlreadyCompleteException,
JobExecutionAlreadyRunningException, JobRestartException {
SchemaVersionTarget schemaVersionTarget = this.aggregateExecutionSupport.findSchemaVersionTarget(name, taskDefinitionReader);
TaskExecutionDao dao = this.daoContainer.get(schemaVersionTarget.getName());
TaskExecution taskExecution = dao.createTaskExecution(name, LocalDateTime.now(), new ArrayList<>(), null);
TaskExecution taskExecution = taskExecutionDao.createTaskExecution(name, LocalDateTime.now(), new ArrayList<>(), null);
JobExecution jobExecution = jobRepository.createJobExecution(name, new JobParameters());
StepExecution stepExecution = new StepExecution(name + "_STEP", jobExecution, jobExecution.getId());
stepExecution.setId(null);
jobRepository.add(stepExecution);
TaskBatchDao taskBatchDao = taskBatchDaoContainer.get(schemaVersionTarget.getName());
taskBatchDao.saveRelationship(taskExecution, jobExecution);
jobExecution.setStatus(status);
jobExecution.setStartTime(LocalDateTime.now());

@@ -17,13 +17,9 @@

import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.database.support.DataFieldMaxValueIncrementerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
@@ -33,6 +29,7 @@
import org.springframework.cloud.dataflow.schema.SchemaVersionTarget;
import org.springframework.cloud.dataflow.schema.service.SchemaService;
import org.springframework.cloud.dataflow.server.batch.AllInOneExecutionContextSerializer;
import org.springframework.cloud.dataflow.server.batch.JdbcSearchableJobExecutionDao;
import org.springframework.cloud.dataflow.server.batch.JobService;
import org.springframework.cloud.dataflow.server.batch.SimpleJobServiceFactoryBean;
import org.springframework.cloud.dataflow.server.repository.AggregateJobQueryDao;
@@ -48,13 +45,13 @@
import org.springframework.cloud.dataflow.server.repository.JdbcDataflowJobExecutionDao;
import org.springframework.cloud.dataflow.server.repository.JdbcDataflowTaskExecutionDao;
import org.springframework.cloud.dataflow.server.repository.JdbcDataflowTaskExecutionMetadataDao;
import org.springframework.cloud.dataflow.server.repository.JobExecutionDaoContainer;
import org.springframework.cloud.dataflow.server.repository.TaskBatchDaoContainer;
import org.springframework.cloud.dataflow.server.repository.TaskDefinitionRepository;
import org.springframework.cloud.dataflow.server.repository.TaskDeploymentRepository;
import org.springframework.cloud.dataflow.server.repository.TaskExecutionDaoContainer;
import org.springframework.cloud.dataflow.server.repository.support.SchemaUtilities;
import org.springframework.cloud.task.batch.listener.support.JdbcTaskBatchDao;
import org.springframework.cloud.task.configuration.TaskProperties;
import org.springframework.cloud.task.repository.dao.JdbcTaskExecutionDao;
import org.springframework.cloud.task.repository.dao.TaskExecutionDao;
import org.springframework.cloud.task.repository.support.DatabaseType;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -71,7 +68,6 @@
*/
@Configuration
public class AggregateDataFlowTaskConfiguration {
private static final Logger logger = LoggerFactory.getLogger(AggregateDataFlowTaskConfiguration.class);

@Bean
public DataflowJobExecutionDaoContainer dataflowJobExecutionDao(DataSource dataSource, SchemaService schemaService) {
@@ -123,23 +119,18 @@ public DataflowTaskExecutionMetadataDaoContainer dataflowTaskExecutionMetadataDa
}

@Bean
public TaskExecutionDaoContainer taskExecutionDaoContainer(DataSource dataSource, SchemaService schemaService) {
return new TaskExecutionDaoContainer(dataSource, schemaService);
}

@Bean
public JobRepository jobRepositoryContainer(DataSource dataSource,
PlatformTransactionManager platformTransactionManager) throws Exception{
JobRepositoryFactoryBean factoryBean = new JobRepositoryFactoryBean();
factoryBean.setDataSource(dataSource);
factoryBean.setTransactionManager(platformTransactionManager);

public TaskExecutionDao taskExecutionDaoContainer(DataSource dataSource) throws Exception{
DataFieldMaxValueIncrementerFactory incrementerFactory = new MultiSchemaIncrementerFactory(dataSource);
JdbcTaskExecutionDao dao = new JdbcTaskExecutionDao(dataSource);
String databaseType;
try {
factoryBean.afterPropertiesSet();
} catch (Throwable x) {
throw new RuntimeException("Exception creating JobRepository", x);
databaseType = DatabaseType.fromMetaData(dataSource).name();
}
return factoryBean.getObject();
catch (MetaDataAccessException e) {
throw new IllegalStateException(e);
}
dao.setTaskIncrementer(incrementerFactory.getIncrementer(databaseType, "TASK_SEQ"));
return dao;
}

@Bean
@@ -163,8 +154,15 @@ public JobService jobService(DataSource dataSource, PlatformTransactionManager p
}

@Bean
public JobExecutionDaoContainer jobExecutionDaoContainer(DataSource dataSource, SchemaService schemaService) {
return new JobExecutionDaoContainer(dataSource, schemaService);
public JdbcSearchableJobExecutionDao jobExecutionDao(DataSource dataSource) {
JdbcSearchableJobExecutionDao jdbcSearchableJobExecutionDao = new JdbcSearchableJobExecutionDao();
jdbcSearchableJobExecutionDao.setDataSource(dataSource);
try {
jdbcSearchableJobExecutionDao.afterPropertiesSet();
} catch (Throwable x) {
throw new RuntimeException("Exception creating JdbcSearchableJobExecutionDao", x);
}
return jdbcSearchableJobExecutionDao;
}

@Bean
Expand All @@ -186,7 +184,7 @@ public AggregateJobQueryDao aggregateJobQueryDao(DataSource dataSource, SchemaSe
}

@Bean
public TaskBatchDaoContainer taskBatchDaoContainer(DataSource dataSource, SchemaService schemaService) {
return new TaskBatchDaoContainer(dataSource, schemaService);
public JdbcTaskBatchDao taskBatchDao(DataSource dataSource) {
return new JdbcTaskBatchDao(dataSource);
}
}
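As the documentation tests above show, linking a task execution to a job execution now needs no schema-target indirection. A minimal usage sketch, assuming the DAO and JobRepository beans are wired as in the configuration above (the helper class and method are hypothetical):

import java.time.LocalDateTime;
import java.util.ArrayList;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.cloud.task.batch.listener.TaskBatchDao;
import org.springframework.cloud.task.repository.TaskExecution;
import org.springframework.cloud.task.repository.dao.TaskExecutionDao;

// Hypothetical helper mirroring the createJobExecution(..) methods in the
// documentation tests: record a task execution, create a job execution, and
// persist the relationship through the single TaskBatchDao bean.
class TaskJobLinkSketch {

	static JobExecution link(String name, TaskExecutionDao taskExecutionDao,
			TaskBatchDao taskBatchDao, JobRepository jobRepository) throws Exception {
		TaskExecution taskExecution = taskExecutionDao.createTaskExecution(
				name, LocalDateTime.now(), new ArrayList<>(), null);
		JobExecution jobExecution = jobRepository.createJobExecution(name, new JobParameters());
		taskBatchDao.saveRelationship(taskExecution, jobExecution);
		return jobExecution;
	}
}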

This file was deleted.

This file was deleted.
