From 9baf3a719197cd02ea6ed25ae5cd4f736ece1875 Mon Sep 17 00:00:00 2001 From: Glenn Renfro Date: Wed, 21 Feb 2024 09:16:43 -0500 Subject: [PATCH] Update Code to use JobRepository bean directly Since we will only support BOOT 3 we do not need the JobRepositoryContainer to retrieve BOOT3 or BOOT 2 based JobRepositories. Update test code to set the default time for the local date time sample to a default of midnight Allow services to use JobService and JobExplorer directly Currently we use containers to allocate the JobService and JobExplorer based on boot version. This is no longer necessary. So this PR removes these containers Update Project as to restore tests success percentage to original state After the updates below tests that were passing started to fail. Some because of Batch 5 updates, but others because of the removal of some of the container classes. Update code based on code review comments Removed todo Removed ExtendsWith statement --- .../JobExecutionsDocumentation.java | 10 +- .../JobInstancesDocumentation.java | 6 +- .../JobStepExecutionsDocumentation.java | 6 +- .../batch/JdbcSearchableJobExecutionDao.java | 72 ++++--------- .../batch/SimpleJobServiceFactoryBean.java | 18 ++-- .../AggregateDataFlowTaskConfiguration.java | 55 +++++++--- .../DataFlowControllerAutoConfiguration.java | 10 +- .../config/features/TaskConfiguration.java | 6 +- .../JobStepExecutionController.java | 15 ++- .../JobStepExecutionProgressController.java | 13 +-- .../repository/JdbcAggregateJobQueryDao.java | 10 +- .../repository/JobExecutionDaoContainer.java | 6 +- .../repository/JobRepositoryContainer.java | 59 ----------- .../server/service/JobExplorerContainer.java | 42 -------- .../server/service/JobServiceContainer.java | 86 --------------- .../service/impl/DefaultTaskJobService.java | 10 +- ...AbstractJdbcAggregateJobQueryDaoTests.java | 5 +- .../batch/AbstractSimpleJobServiceTests.java | 100 ++++++++++-------- .../DataFlowServerConfigurationTests.java | 20 +++- 
.../server/configuration/JobDependencies.java | 32 ++++-- .../TaskServiceDependencies.java | 17 ++- .../configuration/TestDependencies.java | 19 +++- .../JobExecutionControllerTests.java | 21 ++-- .../JobExecutionThinControllerTests.java | 6 +- .../server/controller/JobExecutionUtils.java | 43 ++++---- .../JobInstanceControllerTests.java | 5 +- .../JobStepExecutionControllerTests.java | 9 +- .../TaskExecutionControllerTests.java | 5 +- .../controller/TasksInfoControllerTests.java | 5 +- ...JobQueryDaoRowNumberOptimizationTests.java | 8 +- .../impl/DefaultTaskDeleteServiceTests.java | 4 +- .../impl/DefaultTaskJobServiceTests.java | 5 +- .../shell/command/JobCommandTests.java | 6 +- 33 files changed, 278 insertions(+), 456 deletions(-) delete mode 100644 spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/repository/JobRepositoryContainer.java delete mode 100644 spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/JobExplorerContainer.java delete mode 100644 spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/JobServiceContainer.java diff --git a/spring-cloud-dataflow-classic-docs/src/test/java/org/springframework/cloud/dataflow/server/rest/documentation/JobExecutionsDocumentation.java b/spring-cloud-dataflow-classic-docs/src/test/java/org/springframework/cloud/dataflow/server/rest/documentation/JobExecutionsDocumentation.java index 5b8886ae3b..b970411e27 100644 --- a/spring-cloud-dataflow-classic-docs/src/test/java/org/springframework/cloud/dataflow/server/rest/documentation/JobExecutionsDocumentation.java +++ b/spring-cloud-dataflow-classic-docs/src/test/java/org/springframework/cloud/dataflow/server/rest/documentation/JobExecutionsDocumentation.java @@ -43,7 +43,6 @@ import org.springframework.cloud.dataflow.schema.SchemaVersionTarget; import org.springframework.cloud.dataflow.server.repository.DataflowTaskExecutionMetadataDao; import 
org.springframework.cloud.dataflow.server.repository.DataflowTaskExecutionMetadataDaoContainer; -import org.springframework.cloud.dataflow.server.repository.JobRepositoryContainer; import org.springframework.cloud.dataflow.server.repository.TaskBatchDaoContainer; import org.springframework.cloud.dataflow.server.repository.TaskExecutionDaoContainer; import org.springframework.cloud.task.batch.listener.TaskBatchDao; @@ -83,7 +82,7 @@ public class JobExecutionsDocumentation extends BaseDocumentation { private static boolean initialized; - private JobRepositoryContainer jobRepositoryContainer; + private JobRepository jobRepository; private TaskExecutionDaoContainer daoContainer; @@ -370,7 +369,7 @@ public void jobRestart() throws Exception { private void initialize() { this.daoContainer = context.getBean(TaskExecutionDaoContainer.class); this.taskBatchDaoContainer = context.getBean(TaskBatchDaoContainer.class); - this.jobRepositoryContainer = context.getBean(JobRepositoryContainer.class); + this.jobRepository = context.getBean(JobRepository.class); this.dataflowTaskExecutionMetadataDaoContainer = context.getBean(DataflowTaskExecutionMetadataDaoContainer.class); this.aggregateExecutionSupport = context.getBean(AggregateExecutionSupport.class); this.taskDefinitionReader = context.getBean(TaskDefinitionReader.class); @@ -383,13 +382,12 @@ private void createJobExecution(String name, BatchStatus status) throws JobInsta TaskExecution taskExecution = dao.createTaskExecution(name, LocalDateTime.now(), Collections.singletonList("--spring.cloud.data.flow.platformname=default"), null); Map> jobParameterMap = new HashMap<>(); JobParameters jobParameters = new JobParameters(jobParameterMap); - JobRepository jobRepository = this.jobRepositoryContainer.get(schemaVersionTarget.getName()); - JobExecution jobExecution = jobRepository.createJobExecution(name, jobParameters); + JobExecution jobExecution = this.jobRepository.createJobExecution(name, jobParameters); TaskBatchDao 
taskBatchDao = this.taskBatchDaoContainer.get(schemaVersionTarget.getName()); taskBatchDao.saveRelationship(taskExecution, jobExecution); jobExecution.setStatus(status); jobExecution.setStartTime(LocalDateTime.now()); - jobRepository.update(jobExecution); + this.jobRepository.update(jobExecution); final TaskManifest manifest = new TaskManifest(); manifest.setPlatformName("default"); DataflowTaskExecutionMetadataDao metadataDao = dataflowTaskExecutionMetadataDaoContainer.get(schemaVersionTarget.getName()); diff --git a/spring-cloud-dataflow-classic-docs/src/test/java/org/springframework/cloud/dataflow/server/rest/documentation/JobInstancesDocumentation.java b/spring-cloud-dataflow-classic-docs/src/test/java/org/springframework/cloud/dataflow/server/rest/documentation/JobInstancesDocumentation.java index afbd9d82f7..6d9f5e179d 100644 --- a/spring-cloud-dataflow-classic-docs/src/test/java/org/springframework/cloud/dataflow/server/rest/documentation/JobInstancesDocumentation.java +++ b/spring-cloud-dataflow-classic-docs/src/test/java/org/springframework/cloud/dataflow/server/rest/documentation/JobInstancesDocumentation.java @@ -36,7 +36,6 @@ import org.springframework.cloud.dataflow.aggregate.task.TaskDefinitionReader; import org.springframework.cloud.dataflow.core.ApplicationType; import org.springframework.cloud.dataflow.schema.SchemaVersionTarget; -import org.springframework.cloud.dataflow.server.repository.JobRepositoryContainer; import org.springframework.cloud.dataflow.server.repository.TaskBatchDaoContainer; import org.springframework.cloud.dataflow.server.repository.TaskExecutionDaoContainer; import org.springframework.cloud.task.batch.listener.TaskBatchDao; @@ -71,7 +70,7 @@ public class JobInstancesDocumentation extends BaseDocumentation { private final static String JOB_NAME = "DOCJOB"; private static boolean initialized; - private JobRepositoryContainer jobRepositoryContainer; + private JobRepository jobRepository; private TaskExecutionDaoContainer 
daoContainer; private TaskBatchDaoContainer taskBatchDaoContainer; private AggregateExecutionSupport aggregateExecutionSupport; @@ -136,7 +135,7 @@ public void jobDisplayDetail() throws Exception { private void initialize() { this.taskDefinitionReader = context.getBean(TaskDefinitionReader.class); this.aggregateExecutionSupport = context.getBean(AggregateExecutionSupport.class); - this.jobRepositoryContainer = context.getBean(JobRepositoryContainer.class); + this.jobRepository = context.getBean(JobRepository.class); this.daoContainer = context.getBean(TaskExecutionDaoContainer.class); this.taskBatchDaoContainer = context.getBean(TaskBatchDaoContainer.class); } @@ -145,7 +144,6 @@ private void createJobExecution(String name, BatchStatus status) throws JobInsta SchemaVersionTarget schemaVersionTarget = this.aggregateExecutionSupport.findSchemaVersionTarget(name, taskDefinitionReader); TaskExecutionDao dao = this.daoContainer.get(schemaVersionTarget.getName()); TaskExecution taskExecution = dao.createTaskExecution(name, LocalDateTime.now(), new ArrayList<>(), null); - JobRepository jobRepository = this.jobRepositoryContainer.get(schemaVersionTarget.getName()); JobExecution jobExecution = jobRepository.createJobExecution(name, new JobParameters()); TaskBatchDao taskBatchDao = this.taskBatchDaoContainer.get(schemaVersionTarget.getName()); taskBatchDao.saveRelationship(taskExecution, jobExecution); diff --git a/spring-cloud-dataflow-classic-docs/src/test/java/org/springframework/cloud/dataflow/server/rest/documentation/JobStepExecutionsDocumentation.java b/spring-cloud-dataflow-classic-docs/src/test/java/org/springframework/cloud/dataflow/server/rest/documentation/JobStepExecutionsDocumentation.java index 3f9ad263fd..ec09a5d26b 100644 --- a/spring-cloud-dataflow-classic-docs/src/test/java/org/springframework/cloud/dataflow/server/rest/documentation/JobStepExecutionsDocumentation.java +++ 
b/spring-cloud-dataflow-classic-docs/src/test/java/org/springframework/cloud/dataflow/server/rest/documentation/JobStepExecutionsDocumentation.java @@ -37,7 +37,6 @@ import org.springframework.cloud.dataflow.aggregate.task.TaskDefinitionReader; import org.springframework.cloud.dataflow.core.ApplicationType; import org.springframework.cloud.dataflow.schema.SchemaVersionTarget; -import org.springframework.cloud.dataflow.server.repository.JobRepositoryContainer; import org.springframework.cloud.dataflow.server.repository.TaskBatchDaoContainer; import org.springframework.cloud.dataflow.server.repository.TaskExecutionDaoContainer; import org.springframework.cloud.task.batch.listener.TaskBatchDao; @@ -72,7 +71,7 @@ public class JobStepExecutionsDocumentation extends BaseDocumentation { private static boolean initialized; - private JobRepositoryContainer jobRepositoryContainer; + private JobRepository jobRepository; private TaskExecutionDaoContainer daoContainer; @@ -171,7 +170,7 @@ public void stepProgress() throws Exception { private void initialize() { this.aggregateExecutionSupport = context.getBean(AggregateExecutionSupport.class); - this.jobRepositoryContainer = context.getBean(JobRepositoryContainer.class); + this.jobRepository = context.getBean(JobRepository.class); this.daoContainer = context.getBean(TaskExecutionDaoContainer.class); this.taskBatchDaoContainer = context.getBean(TaskBatchDaoContainer.class); this.taskDefinitionReader = context.getBean(TaskDefinitionReader.class); @@ -182,7 +181,6 @@ private void createJobExecution(String name, BatchStatus status) throws JobInsta SchemaVersionTarget schemaVersionTarget = this.aggregateExecutionSupport.findSchemaVersionTarget(name, taskDefinitionReader); TaskExecutionDao dao = this.daoContainer.get(schemaVersionTarget.getName()); TaskExecution taskExecution = dao.createTaskExecution(name, LocalDateTime.now(), new ArrayList<>(), null); - JobRepository jobRepository = 
this.jobRepositoryContainer.get(schemaVersionTarget.getName()); JobExecution jobExecution = jobRepository.createJobExecution(name, new JobParameters()); StepExecution stepExecution = new StepExecution(name + "_STEP", jobExecution, jobExecution.getId()); stepExecution.setId(null); diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/JdbcSearchableJobExecutionDao.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/JdbcSearchableJobExecutionDao.java index c320abee42..5f4da3a1c7 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/JdbcSearchableJobExecutionDao.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/JdbcSearchableJobExecutionDao.java @@ -1,5 +1,5 @@ /* - * Copyright 2006-2014 the original author or authors. + * Copyright 2006-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -60,7 +60,7 @@ */ public class JdbcSearchableJobExecutionDao extends JdbcJobExecutionDao implements SearchableJobExecutionDao { - private static final String FIND_PARAMS_FROM_ID_5 = "SELECT JOB_EXECUTION_ID, PARAMETER_NAME, PARAMETER_TYPE, PARAMETER_VALUE, IDENTIFYING FROM %PREFIX%JOB_EXECUTION_PARAMS WHERE JOB_EXECUTION_ID = ?"; + private static final String FIND_PARAMS_FROM_ID = "SELECT JOB_EXECUTION_ID, PARAMETER_NAME, PARAMETER_TYPE, PARAMETER_VALUE, IDENTIFYING FROM %PREFIX%JOB_EXECUTION_PARAMS WHERE JOB_EXECUTION_ID = ?"; private static final String GET_COUNT = "SELECT COUNT(1) from %PREFIX%JOB_EXECUTION"; @@ -94,28 +94,16 @@ public class JdbcSearchableJobExecutionDao extends JdbcJobExecutionDao implement private static final String TASK_EXECUTION_ID_FILTER = "B.JOB_EXECUTION_ID = E.JOB_EXECUTION_ID AND B.TASK_EXECUTION_ID = ?"; - private static final String FIND_JOB_EXECUTIONS_4 = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION, JOB_CONFIGURATION_LOCATION" + private static final String FIND_JOB_EXECUTIONS = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION" + " from %PREFIX%JOB_EXECUTION where JOB_INSTANCE_ID = ? order by JOB_EXECUTION_ID desc"; - private static final String FIND_JOB_EXECUTIONS_5 = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION" - + " from %PREFIX%JOB_EXECUTION where JOB_INSTANCE_ID = ? order by JOB_EXECUTION_ID desc"; - - private static final String GET_LAST_EXECUTION_4 = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION, JOB_CONFIGURATION_LOCATION" + private static final String GET_LAST_EXECUTION = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION" + " from %PREFIX%JOB_EXECUTION E where JOB_INSTANCE_ID = ? 
and JOB_EXECUTION_ID in (SELECT max(JOB_EXECUTION_ID) from %PREFIX%JOB_EXECUTION E2 where E2.JOB_INSTANCE_ID = ?)"; - private static final String GET_LAST_EXECUTION_5 = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION" - + " from %PREFIX%JOB_EXECUTION E where JOB_INSTANCE_ID = ? and JOB_EXECUTION_ID in (SELECT max(JOB_EXECUTION_ID) from %PREFIX%JOB_EXECUTION E2 where E2.JOB_INSTANCE_ID = ?)"; - - private static final String GET_RUNNING_EXECUTIONS_4 = "SELECT E.JOB_EXECUTION_ID, E.START_TIME, E.END_TIME, E.STATUS, E.EXIT_CODE, E.EXIT_MESSAGE, E.CREATE_TIME, E.LAST_UPDATED, E.VERSION, " - + "E.JOB_INSTANCE_ID, E.JOB_CONFIGURATION_LOCATION from %PREFIX%JOB_EXECUTION E, %PREFIX%JOB_INSTANCE I where E.JOB_INSTANCE_ID=I.JOB_INSTANCE_ID and I.JOB_NAME=? and E.START_TIME is not NULL and E.END_TIME is NULL order by E.JOB_EXECUTION_ID desc"; - - private static final String GET_RUNNING_EXECUTIONS_5 = "SELECT E.JOB_EXECUTION_ID, E.START_TIME, E.END_TIME, E.STATUS, E.EXIT_CODE, E.EXIT_MESSAGE, E.CREATE_TIME, E.LAST_UPDATED, E.VERSION, " + private static final String GET_RUNNING_EXECUTIONS_BY_JOB_NAME = "SELECT E.JOB_EXECUTION_ID, E.START_TIME, E.END_TIME, E.STATUS, E.EXIT_CODE, E.EXIT_MESSAGE, E.CREATE_TIME, E.LAST_UPDATED, E.VERSION, " + "E.JOB_INSTANCE_ID from %PREFIX%JOB_EXECUTION E, %PREFIX%JOB_INSTANCE I where E.JOB_INSTANCE_ID=I.JOB_INSTANCE_ID and I.JOB_NAME=? 
and E.START_TIME is not NULL and E.END_TIME is NULL order by E.JOB_EXECUTION_ID desc"; - private static final String GET_EXECUTION_BY_ID_4 = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION, JOB_CONFIGURATION_LOCATION" - + " from %PREFIX%JOB_EXECUTION where JOB_EXECUTION_ID = ?"; - - private static final String GET_EXECUTION_BY_ID_5 = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION" + private static final String GET_EXECUTION_BY_ID = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION" + " from %PREFIX%JOB_EXECUTION where JOB_EXECUTION_ID = ?"; private static final String FROM_CLAUSE_TASK_TASK_BATCH = "%PREFIX%TASK_BATCH B"; @@ -160,15 +148,7 @@ public class JdbcSearchableJobExecutionDao extends JdbcJobExecutionDao implement private DataSource dataSource; - private BatchVersion batchVersion; - public JdbcSearchableJobExecutionDao() { - this(BatchVersion.BATCH_4); - } - - @SuppressWarnings("deprecation") - public JdbcSearchableJobExecutionDao(BatchVersion batchVersion) { - this.batchVersion = batchVersion; conversionService = new DefaultConversionService(); conversionService.addConverter(new StringToDateConverter()); } @@ -245,17 +225,17 @@ public List findJobExecutions(JobInstance job) { Assert.notNull(job, "Job cannot be null."); Assert.notNull(job.getId(), "Job Id cannot be null."); - String sqlQuery = batchVersion.equals(BatchVersion.BATCH_4) ? 
FIND_JOB_EXECUTIONS_4 : FIND_JOB_EXECUTIONS_5; - return getJdbcTemplate().query(getQuery(sqlQuery), new JobExecutionRowMapper(batchVersion, job), job.getId()); + String sqlQuery = FIND_JOB_EXECUTIONS; + return getJdbcTemplate().query(getQuery(sqlQuery), new JobExecutionRowMapper(job), job.getId()); } @Override public JobExecution getLastJobExecution(JobInstance jobInstance) { Long id = jobInstance.getId(); - String sqlQuery = batchVersion.equals(BatchVersion.BATCH_4) ? GET_LAST_EXECUTION_4 : GET_LAST_EXECUTION_5; + String sqlQuery = GET_LAST_EXECUTION; List executions = getJdbcTemplate().query(getQuery(sqlQuery), - new JobExecutionRowMapper(batchVersion, jobInstance), id, id); + new JobExecutionRowMapper(jobInstance), id, id); Assert.state(executions.size() <= 1, "There must be at most one latest job execution"); @@ -270,9 +250,8 @@ public JobExecution getLastJobExecution(JobInstance jobInstance) { @Override public Set findRunningJobExecutions(String jobName) { Set result = new HashSet<>(); - String sqlQuery = batchVersion.equals(BatchVersion.BATCH_4) ? GET_RUNNING_EXECUTIONS_4 - : GET_RUNNING_EXECUTIONS_5; - getJdbcTemplate().query(getQuery(sqlQuery), new JobExecutionRowMapper(batchVersion), jobName); + String sqlQuery = GET_RUNNING_EXECUTIONS_BY_JOB_NAME; + getJdbcTemplate().query(getQuery(sqlQuery), new JobExecutionRowMapper(), jobName); return result; } @@ -280,8 +259,8 @@ public Set findRunningJobExecutions(String jobName) { @Override public JobExecution getJobExecution(Long executionId) { try { - String sqlQuery = batchVersion.equals(BatchVersion.BATCH_4) ? 
GET_EXECUTION_BY_ID_4 : GET_EXECUTION_BY_ID_5; - return getJdbcTemplate().queryForObject(getQuery(sqlQuery), new JobExecutionRowMapper(batchVersion), + String sqlQuery = GET_EXECUTION_BY_ID; + return getJdbcTemplate().queryForObject(getQuery(sqlQuery), new JobExecutionRowMapper(), executionId); } catch (EmptyResultDataAccessException e) { @@ -642,7 +621,7 @@ public JobExecutionWithStepCount mapRow(ResultSet rs, int rowNum) throws SQLExce } //TODO: Boot3x followup - need to handle LocalDateTime and possibly Integer - protected JobParameters getJobParametersBatch5(Long executionId) { + protected JobParameters getJobParameters(Long executionId) { Map> map = new HashMap<>(); RowCallbackHandler handler = rs -> { String parameterName = rs.getString("PARAMETER_NAME"); @@ -686,21 +665,11 @@ else if (typedValue instanceof Date) { } }; - getJdbcTemplate().query(getQuery(FIND_PARAMS_FROM_ID_5), handler, executionId); + getJdbcTemplate().query(getQuery(FIND_PARAMS_FROM_ID), handler, executionId); return new JobParameters(map); } - @Override - protected JobParameters getJobParameters(Long executionId) { - if (batchVersion == BatchVersion.BATCH_4) { - return super.getJobParameters(executionId); - } - else { - return getJobParametersBatch5(executionId); - } - } - JobExecution createJobExecutionFromResultSet(ResultSet rs, int rowNum) throws SQLException { Long id = rs.getLong(1); JobExecution jobExecution; @@ -723,16 +692,13 @@ JobExecution createJobExecutionFromResultSet(ResultSet rs, int rowNum) throws SQ private final class JobExecutionRowMapper implements RowMapper { - private final BatchVersion batchVersion; - private JobInstance jobInstance; - public JobExecutionRowMapper(BatchVersion batchVersion) { - this.batchVersion = batchVersion; + public JobExecutionRowMapper() { + } - public JobExecutionRowMapper(BatchVersion batchVersion, JobInstance jobInstance) { - this.batchVersion = batchVersion; + public JobExecutionRowMapper(JobInstance jobInstance) { this.jobInstance = 
jobInstance; } diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/SimpleJobServiceFactoryBean.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/SimpleJobServiceFactoryBean.java index 5cd2d704e3..084c171340 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/SimpleJobServiceFactoryBean.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/SimpleJobServiceFactoryBean.java @@ -1,5 +1,5 @@ /* - * Copyright 2009-2023 the original author or authors. + * Copyright 2009-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -43,7 +43,6 @@ import org.springframework.cloud.dataflow.schema.service.SchemaService; import org.springframework.cloud.dataflow.server.repository.AggregateJobQueryDao; import org.springframework.cloud.dataflow.server.repository.JdbcAggregateJobQueryDao; -import org.springframework.cloud.dataflow.server.service.JobServiceContainer; import org.springframework.context.EnvironmentAware; import org.springframework.core.env.Environment; import org.springframework.jdbc.core.JdbcOperations; @@ -90,7 +89,7 @@ public class SimpleJobServiceFactoryBean implements FactoryBean, Ini private PlatformTransactionManager transactionManager; - private JobServiceContainer jobServiceContainer; + private JobService jobService; private SchemaService schemaService; @@ -166,11 +165,11 @@ public void setTablePrefix(String tablePrefix) { } /** - * Sets the {@link JobServiceContainer} for the service. - * @param jobServiceContainer the JobServiceContainer for this service. + * Sets the {@link JobService} for the factory bean. + * @param jobService the JobService for this Factory Bean. 
*/ - public void setJobServiceContainer(JobServiceContainer jobServiceContainer) { - this.jobServiceContainer = jobServiceContainer; + public void setJobService(JobService jobService) { + this.jobService = jobService; } /** @@ -264,8 +263,7 @@ protected SearchableJobInstanceDao createJobInstanceDao() throws Exception { } protected SearchableJobExecutionDao createJobExecutionDao() throws Exception { - BatchVersion batchVersion = BatchVersion.from(this.schemaVersionTarget); - JdbcSearchableJobExecutionDao dao = new JdbcSearchableJobExecutionDao(batchVersion); + JdbcSearchableJobExecutionDao dao = new JdbcSearchableJobExecutionDao(); dao.setDataSource(dataSource); dao.setJobExecutionIncrementer(incrementerFactory.getIncrementer(databaseType, tablePrefix + "JOB_EXECUTION_SEQ")); @@ -313,7 +311,7 @@ private int determineClobTypeToUse(String databaseType) { } protected AggregateJobQueryDao createAggregateJobQueryDao() throws Exception { - return new JdbcAggregateJobQueryDao(this.dataSource, this.schemaService, this.jobServiceContainer, this.environment); + return new JdbcAggregateJobQueryDao(this.dataSource, this.schemaService, this.jobService, this.environment); } /** diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/config/AggregateDataFlowTaskConfiguration.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/config/AggregateDataFlowTaskConfiguration.java index 5bdcb6abbe..758b41162b 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/config/AggregateDataFlowTaskConfiguration.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/config/AggregateDataFlowTaskConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2023-2023 the original author or authors. + * Copyright 2023-2024 the original author or authors. 
* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.batch.core.explore.JobExplorer; +import org.springframework.batch.core.launch.support.SimpleJobLauncher; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean; import org.springframework.batch.item.database.support.DataFieldMaxValueIncrementerFactory; import org.springframework.beans.BeanUtils; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; @@ -28,6 +32,9 @@ import org.springframework.cloud.dataflow.core.database.support.MultiSchemaIncrementerFactory; import org.springframework.cloud.dataflow.schema.SchemaVersionTarget; import org.springframework.cloud.dataflow.schema.service.SchemaService; +import org.springframework.cloud.dataflow.server.batch.AllInOneExecutionContextSerializer; +import org.springframework.cloud.dataflow.server.batch.JobService; +import org.springframework.cloud.dataflow.server.batch.SimpleJobServiceFactoryBean; import org.springframework.cloud.dataflow.server.repository.AggregateJobQueryDao; import org.springframework.cloud.dataflow.server.repository.DataflowJobExecutionDao; import org.springframework.cloud.dataflow.server.repository.DataflowJobExecutionDaoContainer; @@ -42,14 +49,11 @@ import org.springframework.cloud.dataflow.server.repository.JdbcDataflowTaskExecutionDao; import org.springframework.cloud.dataflow.server.repository.JdbcDataflowTaskExecutionMetadataDao; import org.springframework.cloud.dataflow.server.repository.JobExecutionDaoContainer; -import org.springframework.cloud.dataflow.server.repository.JobRepositoryContainer; import org.springframework.cloud.dataflow.server.repository.TaskBatchDaoContainer; import 
org.springframework.cloud.dataflow.server.repository.TaskDefinitionRepository; import org.springframework.cloud.dataflow.server.repository.TaskDeploymentRepository; import org.springframework.cloud.dataflow.server.repository.TaskExecutionDaoContainer; import org.springframework.cloud.dataflow.server.repository.support.SchemaUtilities; -import org.springframework.cloud.dataflow.server.service.JobExplorerContainer; -import org.springframework.cloud.dataflow.server.service.JobServiceContainer; import org.springframework.cloud.task.configuration.TaskProperties; import org.springframework.cloud.task.repository.support.DatabaseType; import org.springframework.context.annotation.Bean; @@ -124,21 +128,38 @@ public TaskExecutionDaoContainer taskExecutionDaoContainer(DataSource dataSource } @Bean - public JobRepositoryContainer jobRepositoryContainer(DataSource dataSource, PlatformTransactionManager platformTransactionManager, SchemaService schemaService) { - return new JobRepositoryContainer(dataSource, platformTransactionManager, schemaService); - } + public JobRepository jobRepositoryContainer(DataSource dataSource, + PlatformTransactionManager platformTransactionManager) throws Exception{ + JobRepositoryFactoryBean factoryBean = new JobRepositoryFactoryBean(); + factoryBean.setDataSource(dataSource); + factoryBean.setTransactionManager(platformTransactionManager); - @Bean - public JobExplorerContainer jobExplorerContainer(DataSource dataSource, SchemaService schemaService, PlatformTransactionManager platformTransactionManager) { - return new JobExplorerContainer(dataSource, schemaService, platformTransactionManager); + try { + factoryBean.afterPropertiesSet(); + } catch (Throwable x) { + throw new RuntimeException("Exception creating JobRepository", x); + } + return factoryBean.getObject(); } @Bean - public JobServiceContainer jobServiceContainer(DataSource dataSource, PlatformTransactionManager platformTransactionManager, - SchemaService schemaService, 
JobRepositoryContainer jobRepositoryContainer, - JobExplorerContainer jobExplorerContainer, Environment environment) { - return new JobServiceContainer(dataSource, platformTransactionManager, schemaService, jobRepositoryContainer, - jobExplorerContainer, environment); + public JobService jobService(DataSource dataSource, PlatformTransactionManager platformTransactionManager, + JobRepository jobRepository, JobExplorer jobExplorer, Environment environment) + throws Exception{ + SimpleJobServiceFactoryBean factoryBean = new SimpleJobServiceFactoryBean(); + factoryBean.setEnvironment(environment); + factoryBean.setDataSource(dataSource); + factoryBean.setTransactionManager(platformTransactionManager); + factoryBean.setJobLauncher(new SimpleJobLauncher()); + factoryBean.setJobExplorer(jobExplorer); + factoryBean.setJobRepository(jobRepository); + factoryBean.setSerializer(new AllInOneExecutionContextSerializer()); + try { + factoryBean.afterPropertiesSet(); + } catch (Throwable x) { + throw new RuntimeException("Exception creating JobService", x); + } + return factoryBean.getObject(); } @Bean @@ -160,8 +181,8 @@ public TaskDeploymentReader taskDeploymentReader(TaskDeploymentRepository reposi @Bean public AggregateJobQueryDao aggregateJobQueryDao(DataSource dataSource, SchemaService schemaService, - JobServiceContainer jobServiceContainer, Environment environment) throws Exception { - return new JdbcAggregateJobQueryDao(dataSource, schemaService, jobServiceContainer, environment); + JobService jobService, Environment environment) throws Exception { + return new JdbcAggregateJobQueryDao(dataSource, schemaService, jobService, environment); } @Bean diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/config/DataFlowControllerAutoConfiguration.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/config/DataFlowControllerAutoConfiguration.java index 7b718e3e3e..fa7d1a9879 100644 --- 
a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/config/DataFlowControllerAutoConfiguration.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/config/DataFlowControllerAutoConfiguration.java @@ -60,6 +60,7 @@ import org.springframework.cloud.dataflow.schema.service.SchemaService; import org.springframework.cloud.dataflow.server.DockerValidatorProperties; import org.springframework.cloud.dataflow.server.TaskValidationController; +import org.springframework.cloud.dataflow.server.batch.JobService; import org.springframework.cloud.dataflow.server.config.apps.CommonApplicationProperties; import org.springframework.cloud.dataflow.server.config.features.ConditionalOnStreamsEnabled; import org.springframework.cloud.dataflow.server.config.features.ConditionalOnTasksEnabled; @@ -104,7 +105,6 @@ import org.springframework.cloud.dataflow.server.repository.TaskDefinitionRepository; import org.springframework.cloud.dataflow.aggregate.task.AggregateExecutionSupport; import org.springframework.cloud.dataflow.aggregate.task.AggregateTaskExplorer; -import org.springframework.cloud.dataflow.server.service.JobServiceContainer; import org.springframework.cloud.dataflow.server.service.LauncherService; import org.springframework.cloud.dataflow.server.service.SchedulerService; import org.springframework.cloud.dataflow.server.service.SpringSecurityAuditorAware; @@ -342,13 +342,13 @@ public JobExecutionThinController jobExecutionThinController(TaskJobService repo } @Bean - public JobStepExecutionController jobStepExecutionController(JobServiceContainer jobServiceContainer) { - return new JobStepExecutionController(jobServiceContainer); + public JobStepExecutionController jobStepExecutionController(JobService jobService) { + return new JobStepExecutionController(jobService); } @Bean - public JobStepExecutionProgressController jobStepExecutionProgressController(JobServiceContainer jobServiceContainer, 
TaskJobService taskJobService) { - return new JobStepExecutionProgressController(jobServiceContainer, taskJobService); + public JobStepExecutionProgressController jobStepExecutionProgressController(JobService jobService, TaskJobService taskJobService) { + return new JobStepExecutionProgressController(jobService, taskJobService); } @Bean diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/config/features/TaskConfiguration.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/config/features/TaskConfiguration.java index a0f20c5b76..a63d38e93c 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/config/features/TaskConfiguration.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/config/features/TaskConfiguration.java @@ -40,6 +40,7 @@ import org.springframework.cloud.dataflow.schema.service.SchemaService; import org.springframework.cloud.dataflow.schema.service.SchemaServiceConfiguration; import org.springframework.cloud.dataflow.server.DockerValidatorProperties; +import org.springframework.cloud.dataflow.server.batch.JobService; import org.springframework.cloud.dataflow.server.config.AggregateDataFlowTaskConfiguration; import org.springframework.cloud.dataflow.server.config.apps.CommonApplicationProperties; import org.springframework.cloud.dataflow.server.job.LauncherRepository; @@ -52,7 +53,6 @@ import org.springframework.cloud.dataflow.server.repository.TaskDefinitionRepository; import org.springframework.cloud.dataflow.server.repository.TaskDeploymentRepository; import org.springframework.cloud.dataflow.server.service.DeployerConfigurationMetadataResolver; -import org.springframework.cloud.dataflow.server.service.JobServiceContainer; import org.springframework.cloud.dataflow.server.service.LauncherInitializationService; import 
org.springframework.cloud.dataflow.server.service.SchedulerService; import org.springframework.cloud.dataflow.server.service.TaskDeleteService; @@ -264,7 +264,7 @@ public TaskExecutionService taskService( public static class TaskJobServiceConfig { @Bean public TaskJobService taskJobExecutionRepository( - JobServiceContainer serviceContainer, + JobService service, AggregateTaskExplorer taskExplorer, TaskDefinitionRepository taskDefinitionRepository, TaskExecutionService taskExecutionService, @@ -274,7 +274,7 @@ public TaskJobService taskJobExecutionRepository( TaskDefinitionReader taskDefinitionReader ) { return new DefaultTaskJobService( - serviceContainer, + service, taskExplorer, taskDefinitionRepository, taskExecutionService, diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/JobStepExecutionController.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/JobStepExecutionController.java index 5a4c121e4f..6854e66400 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/JobStepExecutionController.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/JobStepExecutionController.java @@ -28,7 +28,6 @@ import org.springframework.cloud.dataflow.server.batch.JobService; import org.springframework.cloud.dataflow.server.batch.NoSuchStepExecutionException; import org.springframework.cloud.dataflow.server.job.support.StepExecutionResourceBuilder; -import org.springframework.cloud.dataflow.server.service.JobServiceContainer; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageImpl; import org.springframework.data.domain.Pageable; @@ -55,17 +54,17 @@ @ExposesResourceFor(StepExecutionResource.class) public class JobStepExecutionController { - private final JobServiceContainer jobServiceContainer; + private 
final JobService jobService; /** * Creates a {@code JobStepExecutionsController} that retrieves Job Step Execution - * information from a the {@link JobServiceContainer} + * information from a the {@link JobService} * - * @param jobServiceContainer JobServiceContainer to select the JobService + * @param jobService JobService used for this controller */ @Autowired - public JobStepExecutionController(JobServiceContainer jobServiceContainer) { - Assert.notNull(jobServiceContainer, "jobServiceContainer required"); - this.jobServiceContainer = jobServiceContainer; + public JobStepExecutionController(JobService jobService) { + Assert.notNull(jobService, "jobService required"); + this.jobService = jobService; } /** @@ -89,7 +88,6 @@ public PagedModel stepExecutions( if(!StringUtils.hasText(schemaTarget)) { schemaTarget = SchemaVersionTarget.defaultTarget().getName(); } - JobService jobService = jobServiceContainer.get(schemaTarget); List result = new ArrayList<>(jobService.getStepExecutions(id)); Page page = new PageImpl<>(result, pageable, result.size()); final Assembler stepAssembler = new Assembler(schemaTarget); @@ -116,7 +114,6 @@ public StepExecutionResource getStepExecution( if(!StringUtils.hasText(schemaTarget)) { schemaTarget = SchemaVersionTarget.defaultTarget().getName(); } - JobService jobService = jobServiceContainer.get(schemaTarget); StepExecution stepExecution = jobService.getStepExecution(id, stepId); final Assembler stepAssembler = new Assembler(schemaTarget); return stepAssembler.toModel(stepExecution); diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/JobStepExecutionProgressController.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/JobStepExecutionProgressController.java index 388dec86d4..99d9cb1a6a 100644 --- 
a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/JobStepExecutionProgressController.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/JobStepExecutionProgressController.java @@ -26,7 +26,6 @@ import org.springframework.cloud.dataflow.server.batch.JobService; import org.springframework.cloud.dataflow.server.batch.NoSuchStepExecutionException; import org.springframework.cloud.dataflow.server.job.support.StepExecutionProgressInfo; -import org.springframework.cloud.dataflow.server.service.JobServiceContainer; import org.springframework.cloud.dataflow.server.service.TaskJobService; import org.springframework.hateoas.server.ExposesResourceFor; import org.springframework.hateoas.server.mvc.RepresentationModelAssemblerSupport; @@ -53,20 +52,20 @@ public class JobStepExecutionProgressController { private final TaskJobService taskJobService; - private final JobServiceContainer jobServiceContainer; + private final JobService jobService; /** * Creates a {@code JobStepProgressInfoExecutionsController} that retrieves Job Step - * Progress Execution information from a the {@link JobServiceContainer} + * Progress Execution information from a the {@link JobService} * - * @param jobServiceContainer A container of JobServices that this controller will use for retrieving job step + * @param jobService The JobService this controller will use for retrieving job step * progress execution information. * @param taskJobService Queries both schemas. 
*/ @Autowired - public JobStepExecutionProgressController(JobServiceContainer jobServiceContainer, TaskJobService taskJobService) { + public JobStepExecutionProgressController(JobService jobService, TaskJobService taskJobService) { this.taskJobService = taskJobService; - this.jobServiceContainer = jobServiceContainer; + this.jobService = jobService; } /** @@ -92,7 +91,6 @@ public StepExecutionProgressInfoResource progress( if (!StringUtils.hasText(schemaTarget)) { schemaTarget = SchemaVersionTarget.defaultTarget().getName(); } - JobService jobService = jobServiceContainer.get(schemaTarget); StepExecution stepExecution = jobService.getStepExecution(jobExecutionId, stepExecutionId); String stepName = stepExecution.getStepName(); if (stepName.contains(":partition")) { @@ -118,7 +116,6 @@ public StepExecutionProgressInfoResource progress( * @return the step execution history for the given step */ private StepExecutionHistory computeHistory(String jobName, String stepName, String schemaTarget) { - JobService jobService = jobServiceContainer.get(schemaTarget); int total = jobService.countStepExecutionsForStep(jobName, stepName); StepExecutionHistory stepExecutionHistory = new StepExecutionHistory(stepName); for (int i = 0; i < total; i += 1000) { diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/repository/JdbcAggregateJobQueryDao.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/repository/JdbcAggregateJobQueryDao.java index ff3dfdc879..fcf93c9ab3 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/repository/JdbcAggregateJobQueryDao.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/repository/JdbcAggregateJobQueryDao.java @@ -65,7 +65,6 @@ import org.springframework.cloud.dataflow.server.batch.JobService; import 
org.springframework.cloud.dataflow.server.converter.DateToStringConverter; import org.springframework.cloud.dataflow.server.converter.StringToDateConverter; -import org.springframework.cloud.dataflow.server.service.JobServiceContainer; import org.springframework.cloud.dataflow.server.service.impl.OffsetOutOfBoundsException; import org.springframework.core.convert.support.ConfigurableConversionService; import org.springframework.core.convert.support.DefaultConversionService; @@ -242,7 +241,7 @@ public class JdbcAggregateJobQueryDao implements AggregateJobQueryDao { private final SchemaService schemaService; - private final JobServiceContainer jobServiceContainer; + private final JobService jobService; private final ConfigurableConversionService conversionService = new DefaultConversionService(); @@ -251,12 +250,12 @@ public class JdbcAggregateJobQueryDao implements AggregateJobQueryDao { public JdbcAggregateJobQueryDao( DataSource dataSource, SchemaService schemaService, - JobServiceContainer jobServiceContainer, + JobService jobService, Environment environment) throws Exception { this.dataSource = dataSource; this.jdbcTemplate = new JdbcTemplate(dataSource); this.schemaService = schemaService; - this.jobServiceContainer = jobServiceContainer; + this.jobService = jobService; this.useRowNumberOptimization = determineUseRowNumberOptimization(environment); conversionService.addConverter(new DateToStringConverter()); @@ -337,7 +336,7 @@ public JobInstanceExecutions getJobInstanceExecutions(long jobInstanceId, String JobInstanceExecutions jobInstanceExecution = executions.get(0); if (!ObjectUtils.isEmpty(jobInstanceExecution.getTaskJobExecutions())) { jobInstanceExecution.getTaskJobExecutions().forEach((execution) -> - jobServiceContainer.get(execution.getSchemaTarget()).addStepExecutions(execution.getJobExecution()) + jobService.addStepExecutions(execution.getJobExecution()) ); } return jobInstanceExecution; @@ -433,7 +432,6 @@ public TaskJobExecution 
getJobExecution(long jobExecutionId, String schemaTarget } TaskJobExecution taskJobExecution = jobExecutions.get(0); - JobService jobService = jobServiceContainer.get(taskJobExecution.getSchemaTarget()); jobService.addStepExecutions(taskJobExecution.getJobExecution()); return taskJobExecution; } diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/repository/JobExecutionDaoContainer.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/repository/JobExecutionDaoContainer.java index 4876834e69..fe7b7b6a70 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/repository/JobExecutionDaoContainer.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/repository/JobExecutionDaoContainer.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2023-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -22,7 +22,6 @@ import org.springframework.cloud.dataflow.schema.SchemaVersionTarget; import org.springframework.cloud.dataflow.schema.service.SchemaService; -import org.springframework.cloud.dataflow.server.batch.BatchVersion; import org.springframework.cloud.dataflow.server.batch.JdbcSearchableJobExecutionDao; import org.springframework.cloud.dataflow.server.batch.SearchableJobExecutionDao; import org.springframework.cloud.dataflow.server.controller.NoSuchSchemaTargetException; @@ -37,8 +36,7 @@ public class JobExecutionDaoContainer { public JobExecutionDaoContainer(DataSource dataSource, SchemaService schemaService) { for (SchemaVersionTarget target : schemaService.getTargets().getSchemas()) { - BatchVersion batchVersion = BatchVersion.from(target); - JdbcSearchableJobExecutionDao jdbcSearchableJobExecutionDao = new JdbcSearchableJobExecutionDao(batchVersion); + JdbcSearchableJobExecutionDao jdbcSearchableJobExecutionDao = new JdbcSearchableJobExecutionDao(); jdbcSearchableJobExecutionDao.setDataSource(dataSource); jdbcSearchableJobExecutionDao.setTablePrefix(target.getBatchPrefix()); try { diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/repository/JobRepositoryContainer.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/repository/JobRepositoryContainer.java deleted file mode 100644 index c3914de4b1..0000000000 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/repository/JobRepositoryContainer.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright 2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.dataflow.server.repository; - -import javax.sql.DataSource; -import java.util.HashMap; -import java.util.Map; - -import org.springframework.batch.core.repository.JobRepository; -import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean; -import org.springframework.cloud.dataflow.schema.SchemaVersionTarget; -import org.springframework.cloud.dataflow.schema.service.SchemaService; -import org.springframework.cloud.dataflow.server.controller.NoSuchSchemaTargetException; -import org.springframework.transaction.PlatformTransactionManager; -import org.springframework.util.StringUtils; - -public class JobRepositoryContainer { - private final Map container = new HashMap<>(); - - public JobRepositoryContainer(DataSource dataSource, PlatformTransactionManager transactionManager, SchemaService schemaService) { - for (SchemaVersionTarget target : schemaService.getTargets().getSchemas()) { - JobRepositoryFactoryBean factoryBean = new JobRepositoryFactoryBean(); - factoryBean.setDataSource(dataSource); - factoryBean.setTablePrefix(target.getBatchPrefix()); - factoryBean.setTransactionManager(transactionManager); - - try { - factoryBean.afterPropertiesSet(); - container.put(target.getName(), factoryBean.getObject()); - } catch (Throwable x) { - throw new RuntimeException("Exception creating JobRepository for:" + target.getName(), x); - } - } - } - - public JobRepository get(String schemaTarget) { - if(!StringUtils.hasText(schemaTarget)) { - schemaTarget = 
SchemaVersionTarget.defaultTarget().getName(); - } - if(!container.containsKey(schemaTarget)) { - throw new NoSuchSchemaTargetException(schemaTarget); - } - return container.get(schemaTarget); - } -} diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/JobExplorerContainer.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/JobExplorerContainer.java deleted file mode 100644 index 841224fba6..0000000000 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/JobExplorerContainer.java +++ /dev/null @@ -1,42 +0,0 @@ -package org.springframework.cloud.dataflow.server.service; - -import javax.sql.DataSource; -import java.util.HashMap; -import java.util.Map; - -import org.springframework.batch.core.explore.JobExplorer; -import org.springframework.batch.core.explore.support.JobExplorerFactoryBean; -import org.springframework.cloud.dataflow.schema.SchemaVersionTarget; -import org.springframework.cloud.dataflow.schema.service.SchemaService; -import org.springframework.cloud.dataflow.server.controller.NoSuchSchemaTargetException; -import org.springframework.transaction.PlatformTransactionManager; -import org.springframework.util.StringUtils; - -public class JobExplorerContainer { - private final Map container = new HashMap<>(); - - public JobExplorerContainer(DataSource dataSource, SchemaService schemaService, PlatformTransactionManager platformTransactionManager) { - for (SchemaVersionTarget target : schemaService.getTargets().getSchemas()) { - JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean(); - factoryBean.setDataSource(dataSource); - factoryBean.setTablePrefix(target.getBatchPrefix()); - factoryBean.setTransactionManager(platformTransactionManager); - try { - factoryBean.afterPropertiesSet(); - container.put(target.getName(), factoryBean.getObject()); - } catch (Throwable x) { - throw new 
RuntimeException("Exception creating JobExplorer for " + target.getName(), x); - } - } - } - - public JobExplorer get(String schemaTarget) { - if(!StringUtils.hasText(schemaTarget)) { - schemaTarget = SchemaVersionTarget.defaultTarget().getName(); - } - if(!container.containsKey(schemaTarget)) { - throw new NoSuchSchemaTargetException(schemaTarget); - } - return container.get(schemaTarget); - } -} diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/JobServiceContainer.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/JobServiceContainer.java deleted file mode 100644 index f8dcffc582..0000000000 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/JobServiceContainer.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright 2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.springframework.cloud.dataflow.server.service; - -import java.util.HashMap; -import java.util.Map; - -import javax.sql.DataSource; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.springframework.batch.core.launch.support.SimpleJobLauncher; -import org.springframework.cloud.dataflow.schema.SchemaVersionTarget; -import org.springframework.cloud.dataflow.schema.service.SchemaService; -import org.springframework.cloud.dataflow.server.batch.AllInOneExecutionContextSerializer; -import org.springframework.cloud.dataflow.server.batch.JobService; -import org.springframework.cloud.dataflow.server.batch.SimpleJobServiceFactoryBean; -import org.springframework.cloud.dataflow.server.controller.NoSuchSchemaTargetException; -import org.springframework.cloud.dataflow.server.repository.JobRepositoryContainer; -import org.springframework.core.env.Environment; -import org.springframework.transaction.PlatformTransactionManager; -import org.springframework.util.StringUtils; - -/** - * The container provides implementations of JobService for each SchemaTarget. 
- * - * @author Corneil du Plessis - */ -public class JobServiceContainer { - private final static Logger logger = LoggerFactory.getLogger(JobServiceContainer.class); - private final Map container = new HashMap<>(); - - public JobServiceContainer( - DataSource dataSource, - PlatformTransactionManager platformTransactionManager, - SchemaService schemaService, - JobRepositoryContainer jobRepositoryContainer, - JobExplorerContainer jobExplorerContainer, - Environment environment) { - - for(SchemaVersionTarget target : schemaService.getTargets().getSchemas()) { - SimpleJobServiceFactoryBean factoryBean = new SimpleJobServiceFactoryBean(); - factoryBean.setEnvironment(environment); - factoryBean.setDataSource(dataSource); - factoryBean.setTransactionManager(platformTransactionManager); - factoryBean.setJobServiceContainer(this); - factoryBean.setJobLauncher(new SimpleJobLauncher()); - factoryBean.setJobExplorer(jobExplorerContainer.get(target.getName())); - factoryBean.setJobRepository(jobRepositoryContainer.get(target.getName())); - factoryBean.setTablePrefix(target.getBatchPrefix()); - factoryBean.setAppBootSchemaVersionTarget(target); - factoryBean.setSchemaService(schemaService); - factoryBean.setSerializer(new AllInOneExecutionContextSerializer()); - try { - factoryBean.afterPropertiesSet(); - container.put(target.getName(), factoryBean.getObject()); - } catch (Throwable x) { - throw new RuntimeException("Exception creating JobService for " + target.getName(), x); - } - } - } - public JobService get(String schemaTarget) { - if(!StringUtils.hasText(schemaTarget)) { - schemaTarget = SchemaVersionTarget.defaultTarget().getName(); - logger.info("get:default={}", schemaTarget); - } - if(!container.containsKey(schemaTarget)) { - throw new NoSuchSchemaTargetException(schemaTarget); - } - return container.get(schemaTarget); - } -} diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskJobService.java 
b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskJobService.java index 92d4158971..58870a605b 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskJobService.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskJobService.java @@ -55,7 +55,6 @@ import org.springframework.cloud.dataflow.server.repository.NoSuchTaskBatchException; import org.springframework.cloud.dataflow.server.repository.NoSuchTaskDefinitionException; import org.springframework.cloud.dataflow.server.repository.TaskDefinitionRepository; -import org.springframework.cloud.dataflow.server.service.JobServiceContainer; import org.springframework.cloud.dataflow.server.service.TaskExecutionService; import org.springframework.cloud.dataflow.server.service.TaskJobService; import org.springframework.data.domain.Page; @@ -83,7 +82,7 @@ public class DefaultTaskJobService implements TaskJobService { private final AggregateTaskExplorer taskExplorer; - private final JobServiceContainer jobServiceContainer; + private final JobService jobService; private final TaskDefinitionRepository taskDefinitionRepository; @@ -96,7 +95,7 @@ public class DefaultTaskJobService implements TaskJobService { private final TaskDefinitionReader taskDefinitionReader; public DefaultTaskJobService( - JobServiceContainer jobServiceContainer, + JobService jobService, AggregateTaskExplorer taskExplorer, TaskDefinitionRepository taskDefinitionRepository, TaskExecutionService taskExecutionService, @@ -105,14 +104,14 @@ public DefaultTaskJobService( AggregateJobQueryDao aggregateJobQueryDao, TaskDefinitionReader taskDefinitionReader) { this.aggregateJobQueryDao = aggregateJobQueryDao; - Assert.notNull(jobServiceContainer, "jobService must not be null"); + Assert.notNull(jobService, "jobService must not be null"); 
Assert.notNull(taskExplorer, "taskExplorer must not be null"); Assert.notNull(taskDefinitionRepository, "taskDefinitionRepository must not be null"); Assert.notNull(taskDefinitionReader, "taskDefinitionReader must not be null"); Assert.notNull(taskExecutionService, "taskExecutionService must not be null"); Assert.notNull(launcherRepository, "launcherRepository must not be null"); Assert.notNull(aggregateExecutionSupport, "CompositeExecutionSupport must not be null"); - this.jobServiceContainer = jobServiceContainer; + this.jobService = jobService; this.taskExplorer = taskExplorer; this.taskDefinitionRepository = taskDefinitionRepository; this.taskDefinitionReader = taskDefinitionReader; @@ -284,7 +283,6 @@ public void stopJobExecution(long jobExecutionId, String schemaTarget) throws No if (!StringUtils.hasText(schemaTarget)) { schemaTarget = SchemaVersionTarget.defaultTarget().getName(); } - JobService jobService = jobServiceContainer.get(schemaTarget); BatchStatus status = jobService.stop(jobExecutionId).getStatus(); logger.info("stopped:{}:{}:status={}", jobExecutionId, schemaTarget, status); } diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractJdbcAggregateJobQueryDaoTests.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractJdbcAggregateJobQueryDaoTests.java index 32788119d6..225d3f3b70 100644 --- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractJdbcAggregateJobQueryDaoTests.java +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractJdbcAggregateJobQueryDaoTests.java @@ -30,7 +30,6 @@ import org.springframework.cloud.dataflow.schema.SchemaVersionTarget; import org.springframework.cloud.dataflow.schema.service.impl.DefaultSchemaService; import org.springframework.cloud.dataflow.server.repository.JdbcAggregateJobQueryDao; -import 
org.springframework.cloud.dataflow.server.service.JobServiceContainer; import org.springframework.mock.env.MockEnvironment; import static org.assertj.core.api.Assertions.assertThat; @@ -43,7 +42,7 @@ abstract class AbstractJdbcAggregateJobQueryDaoTests extends AbstractDaoTests { public JdbcSearchableJobInstanceDao jdbcSearchableJobInstanceDao; @Mock - private JobServiceContainer jobServiceContainer; + private JobService jobService; private JdbcAggregateJobQueryDao jdbcAggregateJobQueryDao; @@ -56,7 +55,7 @@ protected void prepareForTest(JdbcDatabaseContainer dbContainer, String schemaN MockEnvironment environment = new MockEnvironment(); environment.setProperty("spring.cloud.dataflow.task.jdbc.row-number-optimization.enabled", "true"); this.jdbcAggregateJobQueryDao = new JdbcAggregateJobQueryDao(super.getDataSource(), new DefaultSchemaService(), - this.jobServiceContainer, environment); + this.jobService, environment); jdbcSearchableJobInstanceDao = new JdbcSearchableJobInstanceDao(); jdbcSearchableJobInstanceDao.setJdbcTemplate(super.getJdbcTemplate()); incrementerFactory = new MultiSchemaIncrementerFactory(super.getDataSource()); diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractSimpleJobServiceTests.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractSimpleJobServiceTests.java index 5da5ce06ce..4d1677dece 100644 --- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractSimpleJobServiceTests.java +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractSimpleJobServiceTests.java @@ -23,7 +23,6 @@ import java.time.ZoneId; import java.util.ArrayList; import java.util.Collection; -import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -31,6 +30,11 @@ import javax.sql.DataSource; import 
org.junit.jupiter.api.Test; +import org.springframework.batch.core.explore.JobExplorer; +import org.springframework.batch.core.explore.support.JobExplorerFactoryBean; +import org.springframework.batch.core.launch.support.SimpleJobLauncher; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean; import org.testcontainers.containers.JdbcDatabaseContainer; import org.springframework.batch.core.BatchStatus; @@ -51,9 +55,6 @@ import org.springframework.cloud.dataflow.schema.SchemaVersionTarget; import org.springframework.cloud.dataflow.schema.service.SchemaService; import org.springframework.cloud.dataflow.schema.service.impl.DefaultSchemaService; -import org.springframework.cloud.dataflow.server.repository.JobRepositoryContainer; -import org.springframework.cloud.dataflow.server.service.JobExplorerContainer; -import org.springframework.cloud.dataflow.server.service.JobServiceContainer; import org.springframework.cloud.task.repository.TaskExecution; import org.springframework.cloud.task.repository.TaskRepository; import org.springframework.cloud.task.repository.support.SimpleTaskRepository; @@ -86,7 +87,7 @@ public abstract class AbstractSimpleJobServiceTests extends AbstractDaoTests { private DataFieldMaxValueIncrementerFactory incrementerFactory; @Autowired - private JobServiceContainer jobServiceContainer; + private JobService jobService; private DatabaseType databaseType; @@ -132,7 +133,6 @@ void retrieveJobExecutionCountBeforeAndAfterJobExecutionBoot3() throws Exception } private void doRetrieveJobExecutionCountBeforeAndAfter(SchemaVersionTarget schemaVersionTarget) throws Exception { - JobService jobService = jobServiceContainer.get(schemaVersionTarget.getName()); assertThat(jobService.countJobExecutions()).isEqualTo(0); createJobExecution(BASE_JOB_INST_NAME, schemaVersionTarget.getSchemaVersion()); assertThat(jobService.countJobExecutions()).isEqualTo(1); @@ -150,7 +150,6 
@@ void retrieveJobExecutionsByTypeAfterJobExeuctionBoot3() throws Exception { private void doRetrieveJobExecutionsByTypeAfter(SchemaVersionTarget schemaVersionTarget) throws Exception { String suffix = "_BY_NAME"; - JobService jobService = jobServiceContainer.get(schemaVersionTarget.getName()); assertThat(jobService.listJobExecutionsForJob(BASE_JOB_INST_NAME + suffix, BatchStatus.COMPLETED, 0, 5).size()) .isEqualTo(0); createJobExecutions(BASE_JOB_INST_NAME + suffix, BatchStatus.COMPLETED, schemaVersionTarget.getSchemaVersion(), @@ -179,7 +178,6 @@ void retrieveJobExecutionCountWithoutFilterBoot3() throws Exception { private void doRetrieveJobExecutionCountWithoutFilter(SchemaVersionTarget schemaVersionTarget) throws Exception { String suffix = "_BY_NAME"; String suffixFailed = suffix + "_FAILED"; - JobService jobService = jobServiceContainer.get(schemaVersionTarget.getName()); assertThat(jobService.listJobExecutionsForJob(BASE_JOB_INST_NAME + suffix, BatchStatus.COMPLETED, 0, 20).size()) .isEqualTo(0); createJobExecutions(BASE_JOB_INST_NAME + suffix, BatchStatus.COMPLETED, schemaVersionTarget.getSchemaVersion(), @@ -204,7 +202,6 @@ void retrieveJobExecutionCountFilteredByNameBoot3() throws Exception { private void doRetrieveJobExecutionCountFilteredByName(SchemaVersionTarget schemaVersionTarget) throws Exception { String suffix = "COUNT_BY_NAME"; - JobService jobService = jobServiceContainer.get(schemaVersionTarget.getName()); assertThat(jobService.listJobExecutionsForJob(BASE_JOB_INST_NAME + suffix, null, 0, 20).size()).isEqualTo(0); createJobExecutions(BASE_JOB_INST_NAME + suffix, BatchStatus.COMPLETED, schemaVersionTarget.getSchemaVersion(), false, 5); @@ -225,7 +222,6 @@ void retrieveJobExecutionCountFilteredByStatusBoot3() throws Exception { private void doRetrieveJobExecutionCountFilteredByStatus(SchemaVersionTarget schemaVersionTarget) throws Exception { String suffix = "_COUNT_BY_NAME"; - JobService jobService = 
jobServiceContainer.get(schemaVersionTarget.getName()); assertThat(jobService.countJobExecutionsForJob(null, BatchStatus.COMPLETED)).isEqualTo(0); createJobExecutions(BASE_JOB_INST_NAME + suffix, BatchStatus.COMPLETED, schemaVersionTarget.getSchemaVersion(), false, 5); @@ -246,7 +242,6 @@ void retrieveJobExecutionCountFilteredNameAndStatusBoot3() throws Exception { private void doRetrieveJobExecutionCountFilteredNameAndStatus(SchemaVersionTarget schemaVersionTarget) throws Exception { - JobService jobService = jobServiceContainer.get(schemaVersionTarget.getName()); String suffix = "_COUNT_BY_NAME_STATUS"; assertThat(jobService.listJobExecutionsForJob(BASE_JOB_INST_NAME + suffix, BatchStatus.COMPLETED, 0, 20).size()) .isEqualTo(0); @@ -272,7 +267,6 @@ void retrieveJobExecutionWithStepCountBoot3() throws Exception { private void doRetrieveJobExecutionWithStepCount(SchemaVersionTarget schemaVersionTarget) throws Exception { String suffix = "_JOB_EXECUTIONS_WITH_STEP_COUNT"; - JobService jobService = jobServiceContainer.get(schemaVersionTarget.getName()); createJobExecutions(BASE_JOB_INST_NAME + suffix, BatchStatus.COMPLETED, schemaVersionTarget.getSchemaVersion(), false, 5); Collection jobExecutionsWithStepCount = jobService.listJobExecutionsWithStepCount(0, @@ -288,8 +282,7 @@ private void doRetrieveJobExecutionWithStepCount(SchemaVersionTarget schemaVersi void getJobInstancesThatExist() throws Exception { createJobInstance(BASE_JOB_INST_NAME + "BOOT2", AppBootSchemaVersion.BOOT2); createJobInstance(BASE_JOB_INST_NAME + "BOOT3", AppBootSchemaVersion.BOOT3); - verifyJobInstance(1, "boot2", BASE_JOB_INST_NAME + "BOOT2"); - verifyJobInstance(1, "boot3", BASE_JOB_INST_NAME + "BOOT3"); + verifyJobInstance(1, BASE_JOB_INST_NAME + "BOOT3"); } @Test @@ -304,40 +297,29 @@ void getJobExecutionsThatExist() throws Exception { @Test void exceptionsShouldBeThrownIfRequestForNonExistingJobInstance() { assertThatThrownBy(() -> { - 
this.jobServiceContainer.get("boot2").getJobInstance(1); - }).isInstanceOf(NoSuchJobInstanceException.class).hasMessageContaining("JobInstance with id=1 does not exist"); - assertThatThrownBy(() -> { - this.jobServiceContainer.get("boot3").getJobInstance(1); + jobService.getJobInstance(1); }).isInstanceOf(NoSuchJobInstanceException.class).hasMessageContaining("JobInstance with id=1 does not exist"); } @Test void stoppingJobExecutionShouldLeaveJobExecutionWithStatusOfStopping() throws Exception { - JobExecution jobExecution = createJobExecution(BASE_JOB_INST_NAME + "BOOT3", AppBootSchemaVersion.BOOT3, true); - jobExecution = this.jobServiceContainer.get("boot3").getJobExecution(jobExecution.getId()); - assertThat(jobExecution.isRunning()).isTrue(); - assertThat(jobExecution.getStatus()).isNotEqualTo(BatchStatus.STOPPING); - this.jobServiceContainer.get("boot3").stop(jobExecution.getId()); - jobExecution = this.jobServiceContainer.get("boot3").getJobExecution(jobExecution.getId()); - assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.STOPPING); - - jobExecution = createJobExecution(BASE_JOB_INST_NAME + "BOOT2", AppBootSchemaVersion.BOOT2, true); - jobExecution = this.jobServiceContainer.get("boot2").getJobExecution(jobExecution.getId()); + JobExecution jobExecution = createJobExecution(BASE_JOB_INST_NAME, AppBootSchemaVersion.BOOT3, true); + jobExecution = jobService.getJobExecution(jobExecution.getId()); assertThat(jobExecution.isRunning()).isTrue(); assertThat(jobExecution.getStatus()).isNotEqualTo(BatchStatus.STOPPING); - this.jobServiceContainer.get("boot2").stop(jobExecution.getId()); - jobExecution = this.jobServiceContainer.get("boot2").getJobExecution(jobExecution.getId()); + jobService.stop(jobExecution.getId()); + jobExecution = jobService.getJobExecution(jobExecution.getId()); assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.STOPPING); } - private void verifyJobInstance(long id, String schemaTarget, String name) throws Exception { - 
JobInstance jobInstance = this.jobServiceContainer.get(schemaTarget).getJobInstance(id); + private void verifyJobInstance(long id, String name) throws Exception { + JobInstance jobInstance = jobService.getJobInstance(id); assertThat(jobInstance).isNotNull(); assertThat(jobInstance.getJobName()).isEqualTo(name); } private void verifyJobExecution(long id, String schemaTarget, String name) throws Exception { - JobExecution jobExecution = this.jobServiceContainer.get(schemaTarget).getJobExecution(id); + JobExecution jobExecution = jobService.getJobExecution(id); assertThat(jobExecution).isNotNull(); assertThat(jobExecution.getId()).isEqualTo(id); assertThat(jobExecution.getJobInstance().getJobName()).isEqualTo(name); @@ -496,23 +478,53 @@ public SchemaService schemaService() { } @Bean - public JobRepositoryContainer jobRepositoryContainer(DataSource dataSource, - PlatformTransactionManager transactionManager, SchemaService schemaService) { - return new JobRepositoryContainer(dataSource, transactionManager, schemaService); + public JobRepository jobRepository(DataSource dataSource, + PlatformTransactionManager transactionManager) throws Exception { + JobRepositoryFactoryBean factoryBean = new JobRepositoryFactoryBean(); + factoryBean.setDataSource(dataSource); + factoryBean.setTransactionManager(transactionManager); + + try { + factoryBean.afterPropertiesSet(); + } catch (Throwable x) { + throw new RuntimeException("Exception creating JobRepository", x); + } + return factoryBean.getObject(); } @Bean - public JobExplorerContainer jobExplorerContainer(DataSource dataSource, SchemaService schemaService, PlatformTransactionManager platformTransactionManager) { - return new JobExplorerContainer(dataSource, schemaService, platformTransactionManager); + public JobExplorer jobExplorer(DataSource dataSource, PlatformTransactionManager platformTransactionManager) + throws Exception { + JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean(); + 
factoryBean.setDataSource(dataSource); + factoryBean.setTransactionManager(platformTransactionManager); + try { + factoryBean.afterPropertiesSet(); + } catch (Throwable x) { + throw new RuntimeException("Exception creating JobExplorer", x); + } + return factoryBean.getObject(); } @Bean - public JobServiceContainer jobServiceContainer(DataSource dataSource, - PlatformTransactionManager platformTransactionManager, SchemaService schemaService, - JobRepositoryContainer jobRepositoryContainer, JobExplorerContainer jobExplorerContainer, - Environment environment) { - return new JobServiceContainer(dataSource, platformTransactionManager, schemaService, - jobRepositoryContainer, jobExplorerContainer, environment); + public JobService jobService(DataSource dataSource, + PlatformTransactionManager platformTransactionManager, + JobRepository jobRepository, JobExplorer jobExplorer, + Environment environment) throws Exception { + SimpleJobServiceFactoryBean factoryBean = new SimpleJobServiceFactoryBean(); + factoryBean.setEnvironment(environment); + factoryBean.setDataSource(dataSource); + factoryBean.setTransactionManager(platformTransactionManager); + factoryBean.setJobLauncher(new SimpleJobLauncher()); + factoryBean.setJobExplorer(jobExplorer); + factoryBean.setJobRepository(jobRepository); + factoryBean.setSerializer(new AllInOneExecutionContextSerializer()); + try { + factoryBean.afterPropertiesSet(); + } catch (Throwable x) { + throw new RuntimeException("Exception creating JobService", x); + } + return factoryBean.getObject(); } } diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/config/DataFlowServerConfigurationTests.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/config/DataFlowServerConfigurationTests.java index 6a4377338b..c1fdc86015 100644 --- 
a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/config/DataFlowServerConfigurationTests.java +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/config/DataFlowServerConfigurationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,11 +16,14 @@ package org.springframework.cloud.dataflow.server.config; +import javax.sql.DataSource; import java.net.ConnectException; import org.h2.tools.Server; import org.junit.jupiter.api.Test; +import org.springframework.batch.core.explore.JobExplorer; +import org.springframework.batch.core.explore.support.JobExplorerFactoryBean; import org.springframework.beans.factory.BeanCreationException; import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration; import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration; @@ -49,6 +52,7 @@ import org.springframework.core.NestedExceptionUtils; import org.springframework.hateoas.config.EnableHypermediaSupport; import org.springframework.security.authentication.AuthenticationManager; +import org.springframework.transaction.PlatformTransactionManager; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; @@ -172,5 +176,19 @@ public StreamDefinitionService streamDefinitionService() { public ContainerRegistryService containerRegistryService() { return mock(ContainerRegistryService.class); } + + @Bean + public JobExplorer jobExplorer(DataSource dataSource, PlatformTransactionManager platformTransactionManager) + throws Exception { + JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean(); + factoryBean.setDataSource(dataSource); + 
factoryBean.setTransactionManager(platformTransactionManager); + try { + factoryBean.afterPropertiesSet(); + } catch (Throwable x) { + throw new RuntimeException("Exception creating JobExplorer", x); + } + return factoryBean.getObject(); + } } } diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/JobDependencies.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/JobDependencies.java index 896883caa0..5081e2babd 100644 --- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/JobDependencies.java +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/JobDependencies.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -24,6 +24,8 @@ import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.springframework.batch.core.explore.JobExplorer; +import org.springframework.batch.core.explore.support.JobExplorerFactoryBean; import org.springframework.boot.autoconfigure.ImportAutoConfiguration; import org.springframework.boot.autoconfigure.batch.BatchDataSourceScriptDatabaseInitializer; import org.springframework.boot.autoconfigure.batch.BatchProperties; @@ -60,6 +62,7 @@ import org.springframework.cloud.dataflow.schema.service.SchemaService; import org.springframework.cloud.dataflow.schema.service.SchemaServiceConfiguration; import org.springframework.cloud.dataflow.server.DockerValidatorProperties; +import org.springframework.cloud.dataflow.server.batch.JobService; import org.springframework.cloud.dataflow.server.config.AggregateDataFlowTaskConfiguration; import org.springframework.cloud.dataflow.server.config.apps.CommonApplicationProperties; import org.springframework.cloud.dataflow.server.controller.JobExecutionController; @@ -80,7 +83,6 @@ import org.springframework.cloud.dataflow.server.repository.TaskDefinitionRepository; import org.springframework.cloud.dataflow.server.repository.TaskDeploymentRepository; import org.springframework.cloud.dataflow.server.repository.AggregateJobQueryDao; -import org.springframework.cloud.dataflow.server.service.JobServiceContainer; import org.springframework.cloud.dataflow.server.service.LauncherService; import org.springframework.cloud.dataflow.server.service.SchedulerService; import org.springframework.cloud.dataflow.server.service.TaskDeleteService; @@ -160,6 +162,20 @@ @EnableMapRepositories(basePackages = "org.springframework.cloud.dataflow.server.job") public class JobDependencies { + @Bean + public JobExplorer jobExplorer(DataSource dataSource, PlatformTransactionManager platformTransactionManager) + throws Exception { + JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean(); + 
factoryBean.setDataSource(dataSource); + factoryBean.setTransactionManager(platformTransactionManager); + try { + factoryBean.afterPropertiesSet(); + } catch (Throwable x) { + throw new RuntimeException("Exception creating JobExplorer", x); + } + return factoryBean.getObject(); + } + @Bean public Jackson2ObjectMapperBuilderCustomizer dataflowObjectMapperBuilderCustomizer() { return (builder) -> { @@ -196,13 +212,13 @@ public JobExecutionThinController jobExecutionThinController(TaskJobService repo } @Bean - public JobStepExecutionController jobStepExecutionController(JobServiceContainer jobServiceContainer) { - return new JobStepExecutionController(jobServiceContainer); + public JobStepExecutionController jobStepExecutionController(JobService jobService) { + return new JobStepExecutionController(jobService); } @Bean - public JobStepExecutionProgressController jobStepExecutionProgressController(JobServiceContainer jobServiceContainer, TaskJobService taskJobService) { - return new JobStepExecutionProgressController(jobServiceContainer, taskJobService); + public JobStepExecutionProgressController jobStepExecutionProgressController(JobService jobService, TaskJobService taskJobService) { + return new JobStepExecutionProgressController(jobService, taskJobService); } @Bean @@ -261,7 +277,7 @@ public TaskLogsController taskLogsController(TaskExecutionService taskExecutionS @Bean public TaskJobService taskJobExecutionRepository( - JobServiceContainer jobServiceContainer, + JobService jobService, AggregateTaskExplorer taskExplorer, TaskDefinitionRepository taskDefinitionRepository, TaskExecutionService taskExecutionService, @@ -271,7 +287,7 @@ public TaskJobService taskJobExecutionRepository( TaskDefinitionReader taskDefinitionReader ) { return new DefaultTaskJobService( - jobServiceContainer, + jobService, taskExplorer, taskDefinitionRepository, taskExecutionService, diff --git 
a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/TaskServiceDependencies.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/TaskServiceDependencies.java index e13df8df39..2826de42ba 100644 --- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/TaskServiceDependencies.java +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/TaskServiceDependencies.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2023 the original author or authors. + * Copyright 2015-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,8 @@ import javax.sql.DataSource; +import org.springframework.batch.core.explore.JobExplorer; +import org.springframework.batch.core.explore.support.JobExplorerFactoryBean; import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.ImportAutoConfiguration; @@ -418,5 +420,18 @@ public OAuth2TokenUtilsService oauth2TokenUtilsService() { when(oauth2TokenUtilsService.getAccessTokenOfAuthenticatedUser()).thenReturn("foo-bar-123-token"); return oauth2TokenUtilsService; } + @Bean + public JobExplorer jobExplorer(DataSource dataSource, PlatformTransactionManager platformTransactionManager) + throws Exception { + JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean(); + factoryBean.setDataSource(dataSource); + factoryBean.setTransactionManager(platformTransactionManager); + try { + factoryBean.afterPropertiesSet(); + } catch (Throwable x) { + throw new RuntimeException("Exception creating JobExplorer", x); + } + return factoryBean.getObject(); + } } diff --git 
a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/TestDependencies.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/TestDependencies.java index fbc4347544..87eafb376e 100644 --- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/TestDependencies.java +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/TestDependencies.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2021 the original author or authors. + * Copyright 2015-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +30,8 @@ import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import org.mockito.Mockito; +import org.springframework.batch.core.explore.JobExplorer; +import org.springframework.batch.core.explore.support.JobExplorerFactoryBean; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.actuate.info.BuildInfoContributor; import org.springframework.boot.actuate.info.GitInfoContributor; @@ -183,7 +185,6 @@ import org.springframework.validation.beanvalidation.MethodValidationPostProcessor; import org.springframework.web.servlet.config.annotation.EnableWebMvc; import org.springframework.web.servlet.config.annotation.PathMatchConfigurer; -import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport; import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; import static org.mockito.Mockito.mock; @@ -239,6 +240,20 @@ @EnableMapRepositories("org.springframework.cloud.dataflow.server.job") @EnableTransactionManagement public class TestDependencies implements WebMvcConfigurer { + @Bean + public JobExplorer jobExplorer(DataSource dataSource, PlatformTransactionManager 
platformTransactionManager) + throws Exception { + JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean(); + factoryBean.setDataSource(dataSource); + factoryBean.setTransactionManager(platformTransactionManager); + try { + factoryBean.afterPropertiesSet(); + } catch (Throwable x) { + throw new RuntimeException("Exception creating JobExplorer", x); + } + return factoryBean.getObject(); + } + @Override public void configurePathMatch(PathMatchConfigurer configurer) { diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/JobExecutionControllerTests.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/JobExecutionControllerTests.java index f889d77abe..f201631aa5 100644 --- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/JobExecutionControllerTests.java +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/JobExecutionControllerTests.java @@ -19,14 +19,11 @@ import java.time.LocalDateTime; import org.hamcrest.Matchers; -import org.junit.Before; -import org.junit.Test; -import org.junit.jupiter.api.Disabled; -import org.junit.runner.RunWith; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.springframework.batch.core.BatchStatus; import org.springframework.batch.core.JobExecution; -import org.springframework.batch.core.JobInstance; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; @@ -44,12 +41,10 @@ import org.springframework.cloud.dataflow.schema.SchemaVersionTarget; import org.springframework.cloud.dataflow.server.config.apps.CommonApplicationProperties; import 
org.springframework.cloud.dataflow.server.configuration.JobDependencies; -import org.springframework.cloud.dataflow.server.repository.JobRepositoryContainer; import org.springframework.cloud.dataflow.server.repository.TaskBatchDaoContainer; import org.springframework.cloud.dataflow.server.repository.TaskExecutionDaoContainer; import org.springframework.http.MediaType; import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.web.servlet.MockMvc; import org.springframework.test.web.servlet.MvcResult; import org.springframework.test.web.servlet.request.MockHttpServletRequestBuilder; @@ -73,8 +68,7 @@ */ //TODO: Boot3x followup -@Disabled("TODO: Boot3 followup after boot3/boot2 task changes are complete") -@RunWith(SpringRunner.class) +//@Disabled("TODO: Boot3 followup after boot3/boot2 task changes are complete") @SpringBootTest(classes = {JobDependencies.class, PropertyPlaceholderAutoConfiguration.class, BatchProperties.class}) @EnableConfigurationProperties({CommonApplicationProperties.class}) @@ -86,7 +80,7 @@ public class JobExecutionControllerTests { TaskExecutionDaoContainer daoContainer; @Autowired - JobRepositoryContainer jobRepositoryContainer; + JobRepository jobRepository; @Autowired TaskBatchDaoContainer taskBatchDaoContainer; @@ -105,10 +99,10 @@ public class JobExecutionControllerTests { @Autowired TaskDefinitionReader taskDefinitionReader; - @Before + @BeforeEach public void setupMockMVC() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobRestartException { this.mockMvc = JobExecutionUtils.createBaseJobExecutionMockMvc( - jobRepositoryContainer, + jobRepository, taskBatchDaoContainer, daoContainer, aggregateExecutionSupport, @@ -175,7 +169,6 @@ public void testStopStartedJobExecutionTwice() throws Exception { .andDo(print()) .andExpect(status().isOk()); SchemaVersionTarget schemaVersionTarget = 
aggregateExecutionSupport.findSchemaVersionTarget(JobExecutionUtils.JOB_NAME_STARTED, taskDefinitionReader); - JobRepository jobRepository = jobRepositoryContainer.get(schemaVersionTarget.getName()); final JobExecution jobExecution = jobRepository.getLastJobExecution(JobExecutionUtils.JOB_NAME_STARTED, new JobParameters()); assertThat(jobExecution).isNotNull(); @@ -193,7 +186,6 @@ public void testStopStoppedJobExecution() throws Exception { .andDo(print()) .andExpect(status().isUnprocessableEntity()); SchemaVersionTarget schemaVersionTarget = aggregateExecutionSupport.findSchemaVersionTarget(JobExecutionUtils.JOB_NAME_STOPPED, taskDefinitionReader); - JobRepository jobRepository = jobRepositoryContainer.get(schemaVersionTarget.getName()); final JobExecution jobExecution = jobRepository.getLastJobExecution(JobExecutionUtils.JOB_NAME_STOPPED, new JobParameters()); assertThat(jobExecution).isNotNull(); @@ -350,7 +342,6 @@ public void testWildcardMatchSingleResult() throws Exception { } private void createDirtyJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobRestartException { - JobRepository jobRepository = jobRepositoryContainer.get(SchemaVersionTarget.defaultTarget().getName()); JobExecution jobExecution = jobRepository.createJobExecution( JobExecutionUtils.BASE_JOB_NAME + "_NO_TASK", new JobParameters()); jobExecution.setStatus(BatchStatus.STOPPED); diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/JobExecutionThinControllerTests.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/JobExecutionThinControllerTests.java index bbd88e0ec3..ed9cd1d9a1 100644 --- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/JobExecutionThinControllerTests.java +++ 
b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/JobExecutionThinControllerTests.java @@ -27,6 +27,7 @@ import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; +import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.JobRestartException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.batch.BatchProperties; @@ -40,7 +41,6 @@ import org.springframework.cloud.dataflow.rest.job.support.TimeUtils; import org.springframework.cloud.dataflow.server.config.apps.CommonApplicationProperties; import org.springframework.cloud.dataflow.server.configuration.JobDependencies; -import org.springframework.cloud.dataflow.server.repository.JobRepositoryContainer; import org.springframework.cloud.dataflow.server.repository.TaskBatchDaoContainer; import org.springframework.cloud.dataflow.server.repository.TaskExecutionDaoContainer; import org.springframework.http.MediaType; @@ -77,7 +77,7 @@ public class JobExecutionThinControllerTests { private TaskExecutionDaoContainer daoContainer; @Autowired - private JobRepositoryContainer jobRepositoryContainer; + private JobRepository jobRepository; @Autowired private TaskBatchDaoContainer taskBatchDaoContainer; @@ -98,7 +98,7 @@ public class JobExecutionThinControllerTests { @Before public void setupMockMVC() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobRestartException { this.mockMvc = JobExecutionUtils.createBaseJobExecutionMockMvc( - jobRepositoryContainer, + jobRepository, taskBatchDaoContainer, daoContainer, aggregateExecutionSupport, diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/JobExecutionUtils.java 
b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/JobExecutionUtils.java index 1d24fae268..05b7618e7e 100644 --- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/JobExecutionUtils.java +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/JobExecutionUtils.java @@ -16,17 +16,16 @@ package org.springframework.cloud.dataflow.server.controller; -import java.text.ParseException; -import java.text.SimpleDateFormat; +import java.time.LocalDate; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.HashMap; +import java.util.Locale; import java.util.Map; import org.springframework.batch.core.BatchStatus; import org.springframework.batch.core.JobExecution; -import org.springframework.batch.core.JobInstance; import org.springframework.batch.core.JobParameter; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.StepExecution; @@ -40,7 +39,6 @@ import org.springframework.cloud.dataflow.rest.support.jackson.ISO8601DateFormatWithMilliSeconds; import org.springframework.cloud.dataflow.rest.support.jackson.Jackson2DataflowModule; import org.springframework.cloud.dataflow.schema.SchemaVersionTarget; -import org.springframework.cloud.dataflow.server.repository.JobRepositoryContainer; import org.springframework.cloud.dataflow.server.repository.TaskBatchDaoContainer; import org.springframework.cloud.dataflow.server.repository.TaskExecutionDaoContainer; import org.springframework.cloud.task.batch.listener.TaskBatchDao; @@ -87,7 +85,7 @@ class JobExecutionUtils static MockMvc createBaseJobExecutionMockMvc( - JobRepositoryContainer jobRepositoryContainer, + JobRepository jobRepository, TaskBatchDaoContainer taskBatchDaoContainer, TaskExecutionDaoContainer taskExecutionDaoContainer, AggregateExecutionSupport 
aggregateExecutionSupport, @@ -97,21 +95,21 @@ static MockMvc createBaseJobExecutionMockMvc( throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobRestartException { MockMvc mockMvc = MockMvcBuilders.webAppContextSetup(wac) .defaultRequest(get("/").accept(MediaType.APPLICATION_JSON)).build(); - JobExecutionUtils.createSampleJob(jobRepositoryContainer, taskBatchDaoContainer, taskExecutionDaoContainer, aggregateExecutionSupport, JOB_NAME_ORIG, 1, taskDefinitionReader); - JobExecutionUtils.createSampleJob(jobRepositoryContainer, taskBatchDaoContainer, taskExecutionDaoContainer, aggregateExecutionSupport, JOB_NAME_FOO, 1, taskDefinitionReader); - JobExecutionUtils.createSampleJob(jobRepositoryContainer, taskBatchDaoContainer, taskExecutionDaoContainer, aggregateExecutionSupport,JOB_NAME_FOOBAR, 2, taskDefinitionReader); - JobExecutionUtils.createSampleJob(jobRepositoryContainer, taskBatchDaoContainer, taskExecutionDaoContainer, aggregateExecutionSupport, JOB_NAME_COMPLETED, 1, BatchStatus.COMPLETED, taskDefinitionReader); - JobExecutionUtils.createSampleJob(jobRepositoryContainer, taskBatchDaoContainer, taskExecutionDaoContainer, aggregateExecutionSupport, JOB_NAME_STARTED, 1, BatchStatus.STARTED, taskDefinitionReader); - JobExecutionUtils.createSampleJob(jobRepositoryContainer, taskBatchDaoContainer, taskExecutionDaoContainer, aggregateExecutionSupport, JOB_NAME_STOPPED, 1, BatchStatus.STOPPED, taskDefinitionReader); - JobExecutionUtils.createSampleJob(jobRepositoryContainer, taskBatchDaoContainer, taskExecutionDaoContainer, aggregateExecutionSupport, JOB_NAME_FAILED1, 1, BatchStatus.FAILED, taskDefinitionReader); - JobExecutionUtils.createSampleJob(jobRepositoryContainer, taskBatchDaoContainer, taskExecutionDaoContainer, aggregateExecutionSupport, JOB_NAME_FAILED2, 1, BatchStatus.FAILED, taskDefinitionReader); + JobExecutionUtils.createSampleJob(jobRepository, taskBatchDaoContainer, taskExecutionDaoContainer, aggregateExecutionSupport, 
JOB_NAME_ORIG, 1, taskDefinitionReader); + JobExecutionUtils.createSampleJob(jobRepository, taskBatchDaoContainer, taskExecutionDaoContainer, aggregateExecutionSupport, JOB_NAME_FOO, 1, taskDefinitionReader); + JobExecutionUtils.createSampleJob(jobRepository, taskBatchDaoContainer, taskExecutionDaoContainer, aggregateExecutionSupport,JOB_NAME_FOOBAR, 2, BatchStatus.COMPLETED,taskDefinitionReader); + JobExecutionUtils.createSampleJob(jobRepository, taskBatchDaoContainer, taskExecutionDaoContainer, aggregateExecutionSupport, JOB_NAME_COMPLETED, 1, BatchStatus.COMPLETED, taskDefinitionReader); + JobExecutionUtils.createSampleJob(jobRepository, taskBatchDaoContainer, taskExecutionDaoContainer, aggregateExecutionSupport, JOB_NAME_STARTED, 1, BatchStatus.STARTED, taskDefinitionReader); + JobExecutionUtils.createSampleJob(jobRepository, taskBatchDaoContainer, taskExecutionDaoContainer, aggregateExecutionSupport, JOB_NAME_STOPPED, 1, BatchStatus.STOPPED, taskDefinitionReader); + JobExecutionUtils.createSampleJob(jobRepository, taskBatchDaoContainer, taskExecutionDaoContainer, aggregateExecutionSupport, JOB_NAME_FAILED1, 1, BatchStatus.FAILED, taskDefinitionReader); + JobExecutionUtils.createSampleJob(jobRepository, taskBatchDaoContainer, taskExecutionDaoContainer, aggregateExecutionSupport, JOB_NAME_FAILED2, 1, BatchStatus.FAILED, taskDefinitionReader); Map> jobParameterMap = new HashMap<>(); - String dateInString = "7-Jun-2023"; - DateTimeFormatter formatter = DateTimeFormatter.ofPattern("dd-MMM-yyyy"); - LocalDateTime date = LocalDateTime.parse(dateInString, formatter); + String dateInString = "07-Jun-2023"; + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("dd-MMM-yyyy", Locale.US); + LocalDateTime date = LocalDate.parse(dateInString, formatter).atStartOfDay(); jobParameterMap.put("javaUtilDate", new JobParameter( date, LocalDateTime.class,false)); - JobExecutionUtils.createSampleJob(jobRepositoryContainer, taskBatchDaoContainer, taskExecutionDaoContainer, + 
JobExecutionUtils.createSampleJob(jobRepository, taskBatchDaoContainer, taskExecutionDaoContainer, aggregateExecutionSupport, JOB_NAME_ORIG_WITH_PARAM, 1, BatchStatus.UNKNOWN, taskDefinitionReader, new JobParameters(jobParameterMap)); @@ -126,7 +124,7 @@ static MockMvc createBaseJobExecutionMockMvc( } private static void createSampleJob( - JobRepositoryContainer jobRepositoryContainer, + JobRepository jobRepository, TaskBatchDaoContainer taskBatchDaoContainer, TaskExecutionDaoContainer taskExecutionDaoContainer, AggregateExecutionSupport aggregateExecutionSupport, @@ -135,7 +133,7 @@ private static void createSampleJob( TaskDefinitionReader taskDefinitionReader ) throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobRestartException { createSampleJob( - jobRepositoryContainer, + jobRepository, taskBatchDaoContainer, taskExecutionDaoContainer, aggregateExecutionSupport, @@ -148,7 +146,7 @@ private static void createSampleJob( } private static void createSampleJob( - JobRepositoryContainer jobRepositoryContainer, + JobRepository jobRepository, TaskBatchDaoContainer taskBatchDaoContainer, TaskExecutionDaoContainer taskExecutionDaoContainer, AggregateExecutionSupport aggregateExecutionSupport, @@ -158,7 +156,7 @@ private static void createSampleJob( TaskDefinitionReader taskDefinitionReader ) throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobRestartException { createSampleJob( - jobRepositoryContainer, + jobRepository, taskBatchDaoContainer, taskExecutionDaoContainer, aggregateExecutionSupport, @@ -171,7 +169,7 @@ private static void createSampleJob( } private static void createSampleJob( - JobRepositoryContainer jobRepositoryContainer, + JobRepository jobRepository, TaskBatchDaoContainer taskBatchDaoContainer, TaskExecutionDaoContainer taskExecutionDaoContainer, AggregateExecutionSupport aggregateExecutionSupport, @@ -182,7 +180,6 @@ private static void createSampleJob( JobParameters jobParameters ) 
throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobRestartException { SchemaVersionTarget schemaVersionTarget = aggregateExecutionSupport.findSchemaVersionTarget(jobName, taskDefinitionReader); - JobRepository jobRepository = jobRepositoryContainer.get(schemaVersionTarget.getName()); TaskExecutionDao taskExecutionDao = taskExecutionDaoContainer.get(schemaVersionTarget.getName()); TaskExecution taskExecution = taskExecutionDao.createTaskExecution(jobName, LocalDateTime.now(), new ArrayList<>(), null); JobExecution jobExecution; diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/JobInstanceControllerTests.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/JobInstanceControllerTests.java index aee69c299e..aa9b78c3a0 100644 --- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/JobInstanceControllerTests.java +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/JobInstanceControllerTests.java @@ -25,7 +25,6 @@ import org.junit.runner.RunWith; import org.springframework.batch.core.JobExecution; -import org.springframework.batch.core.JobInstance; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; @@ -44,7 +43,6 @@ import org.springframework.cloud.dataflow.schema.SchemaVersionTarget; import org.springframework.cloud.dataflow.server.config.apps.CommonApplicationProperties; import org.springframework.cloud.dataflow.server.configuration.JobDependencies; -import org.springframework.cloud.dataflow.server.repository.JobRepositoryContainer; import org.springframework.cloud.dataflow.server.repository.TaskBatchDaoContainer; import 
org.springframework.cloud.dataflow.server.repository.TaskExecutionDaoContainer; import org.springframework.cloud.task.batch.listener.TaskBatchDao; @@ -94,7 +92,7 @@ public class JobInstanceControllerTests { TaskExecutionDaoContainer daoContainer; @Autowired - JobRepositoryContainer jobRepositoryContainer; + JobRepository jobRepository; @Autowired TaskBatchDaoContainer taskBatchDaoContainer; @@ -170,7 +168,6 @@ public void testGetInstanceByNameNotFound() throws Exception { private void createSampleJob(String jobName, int jobExecutionCount) throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobRestartException { String defaultSchemaTarget = SchemaVersionTarget.defaultTarget().getName(); - JobRepository jobRepository = jobRepositoryContainer.get(defaultSchemaTarget); TaskExecutionDao dao = daoContainer.get(defaultSchemaTarget); TaskExecution taskExecution = dao.createTaskExecution(jobName, LocalDateTime.now(), new ArrayList(), null); diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/JobStepExecutionControllerTests.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/JobStepExecutionControllerTests.java index 4df8791dc3..df09b68d70 100644 --- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/JobStepExecutionControllerTests.java +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/JobStepExecutionControllerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -25,7 +25,6 @@ import org.junit.runner.RunWith; import org.springframework.batch.core.JobExecution; -import org.springframework.batch.core.JobInstance; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; @@ -47,7 +46,6 @@ import org.springframework.cloud.dataflow.schema.SchemaVersionTarget; import org.springframework.cloud.dataflow.server.config.apps.CommonApplicationProperties; import org.springframework.cloud.dataflow.server.configuration.JobDependencies; -import org.springframework.cloud.dataflow.server.repository.JobRepositoryContainer; import org.springframework.cloud.dataflow.server.repository.TaskBatchDaoContainer; import org.springframework.cloud.dataflow.server.repository.TaskExecutionDaoContainer; import org.springframework.cloud.dataflow.server.service.TaskJobService; @@ -76,8 +74,6 @@ * @author Glenn Renfro * @author Corneil du Plessis */ - -//TODO: Boot3x followup @Disabled("TODO: Boot3 followup after boot3/boot2 task changes are complete") @RunWith(SpringRunner.class) @SpringBootTest(classes = { JobDependencies.class, @@ -109,7 +105,7 @@ public class JobStepExecutionControllerTests { TaskExecutionDaoContainer daoContainer; @Autowired - JobRepositoryContainer jobRepositoryContainer; + JobRepository jobRepository; @Autowired TaskBatchDaoContainer taskBatchDaoContainer; @@ -202,7 +198,6 @@ public void testSingleGetStepExecutionProgress() throws Exception { private void createStepExecution(String jobName, String... 
stepNames) throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobRestartException { SchemaVersionTarget schemaVersionTarget = aggregateExecutionSupport.findSchemaVersionTarget(jobName, taskDefinitionReader); - JobRepository jobRepository = jobRepositoryContainer.get(schemaVersionTarget.getName()); JobExecution jobExecution = jobRepository.createJobExecution(jobName, new JobParameters()); for (String stepName : stepNames) { StepExecution stepExecution = new StepExecution(stepName, jobExecution, 1L); diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionControllerTests.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionControllerTests.java index 34ef65f710..7e2a6de9d7 100644 --- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionControllerTests.java +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionControllerTests.java @@ -33,7 +33,6 @@ import org.junit.jupiter.api.Test; import org.springframework.batch.core.JobExecution; -import org.springframework.batch.core.JobInstance; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; @@ -65,7 +64,6 @@ import org.springframework.cloud.dataflow.server.config.apps.CommonApplicationProperties; import org.springframework.cloud.dataflow.server.configuration.JobDependencies; import org.springframework.cloud.dataflow.server.job.LauncherRepository; -import org.springframework.cloud.dataflow.server.repository.JobRepositoryContainer; import org.springframework.cloud.dataflow.server.repository.TaskBatchDaoContainer; import 
org.springframework.cloud.dataflow.server.repository.TaskDefinitionRepository; import org.springframework.cloud.dataflow.server.repository.TaskDeploymentRepository; @@ -142,7 +140,7 @@ public class TaskExecutionControllerTests { private TaskExecutionDaoContainer daoContainer; @Autowired - private JobRepositoryContainer jobRepositoryContainer; + private JobRepository jobRepository; @Autowired private TaskDefinitionRepository taskDefinitionRepository; @@ -235,7 +233,6 @@ public void setupMockMVC() throws JobInstanceAlreadyCompleteException, JobExecut TaskExecution taskExecution = dao.createTaskExecution(TASK_NAME_FOOBAR, LocalDateTime.now(), SAMPLE_ARGUMENT_LIST, null); SchemaVersionTarget fooBarTarget = aggregateExecutionSupport.findSchemaVersionTarget(TASK_NAME_FOOBAR, taskDefinitionReader); - JobRepository jobRepository = jobRepositoryContainer.get(fooBarTarget.getName()); JobExecution jobExecution = jobRepository.createJobExecution(TASK_NAME_FOOBAR, new JobParameters()); TaskBatchDao taskBatchDao = taskBatchDaoContainer.get(fooBarTarget.getName()); taskBatchDao.saveRelationship(taskExecution, jobExecution); diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/TasksInfoControllerTests.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/TasksInfoControllerTests.java index 6abcf9fa85..3d36f7326d 100644 --- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/TasksInfoControllerTests.java +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/TasksInfoControllerTests.java @@ -28,7 +28,6 @@ import org.junit.runner.RunWith; import org.springframework.batch.core.JobExecution; -import org.springframework.batch.core.JobInstance; import org.springframework.batch.core.JobParameters; import 
org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; @@ -51,7 +50,6 @@ import org.springframework.cloud.dataflow.server.config.apps.CommonApplicationProperties; import org.springframework.cloud.dataflow.server.configuration.JobDependencies; import org.springframework.cloud.dataflow.server.job.LauncherRepository; -import org.springframework.cloud.dataflow.server.repository.JobRepositoryContainer; import org.springframework.cloud.dataflow.server.repository.TaskBatchDaoContainer; import org.springframework.cloud.dataflow.server.repository.TaskDefinitionRepository; import org.springframework.cloud.dataflow.server.repository.TaskDeploymentRepository; @@ -106,7 +104,7 @@ public class TasksInfoControllerTests { TaskExecutionDaoContainer daoContainer; @Autowired - JobRepositoryContainer jobRepositoryContainer; + JobRepository jobRepository; @Autowired TaskDefinitionRepository taskDefinitionRepository; @@ -178,7 +176,6 @@ public void setupMockMVC() throws JobInstanceAlreadyCompleteException, JobExecut dao.createTaskExecution(TASK_NAME_FOO, LocalDateTime.now(), SAMPLE_ARGUMENT_LIST, null); TaskExecution taskExecution = dao.createTaskExecution(TASK_NAME_FOOBAR, LocalDateTime.now(), SAMPLE_ARGUMENT_LIST, null); - JobRepository jobRepository = jobRepositoryContainer.get(target.getName()); JobExecution jobExecution = jobRepository.createJobExecution(TASK_NAME_FOOBAR, new JobParameters()); TaskBatchDao taskBatchDao = taskBatchDaoContainer.get(target.getName()); taskBatchDao.saveRelationship(taskExecution, jobExecution); diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/repository/JdbcAggregateJobQueryDaoRowNumberOptimizationTests.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/repository/JdbcAggregateJobQueryDaoRowNumberOptimizationTests.java index 
5d11c111d0..afa179fefa 100644 --- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/repository/JdbcAggregateJobQueryDaoRowNumberOptimizationTests.java +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/repository/JdbcAggregateJobQueryDaoRowNumberOptimizationTests.java @@ -20,6 +20,7 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.springframework.cloud.dataflow.server.batch.JobService; import org.testcontainers.containers.JdbcDatabaseContainer; import org.testcontainers.containers.MariaDBContainer; import org.testcontainers.junit.jupiter.Container; @@ -27,7 +28,6 @@ import org.springframework.boot.jdbc.DataSourceBuilder; import org.springframework.cloud.dataflow.schema.service.SchemaService; -import org.springframework.cloud.dataflow.server.service.JobServiceContainer; import org.springframework.mock.env.MockEnvironment; import static org.assertj.core.api.Assertions.assertThat; @@ -59,7 +59,7 @@ static void startContainer() { @Test void shouldUseOptimizationWhenPropertyNotSpecified() throws Exception { MockEnvironment mockEnv = new MockEnvironment(); - JdbcAggregateJobQueryDao dao = new JdbcAggregateJobQueryDao(dataSource, mock(SchemaService.class), mock(JobServiceContainer.class), mockEnv); + JdbcAggregateJobQueryDao dao = new JdbcAggregateJobQueryDao(dataSource, mock(SchemaService.class), mock(JobService.class), mockEnv); assertThat(dao).hasFieldOrPropertyWithValue("useRowNumberOptimization", true); } @@ -67,7 +67,7 @@ void shouldUseOptimizationWhenPropertyNotSpecified() throws Exception { void shouldUseOptimizationWhenPropertyEnabled() throws Exception { MockEnvironment mockEnv = new MockEnvironment(); mockEnv.setProperty("spring.cloud.dataflow.task.jdbc.row-number-optimization.enabled", "true"); - JdbcAggregateJobQueryDao dao = new JdbcAggregateJobQueryDao(dataSource, mock(SchemaService.class), mock(JobServiceContainer.class), 
mockEnv); + JdbcAggregateJobQueryDao dao = new JdbcAggregateJobQueryDao(dataSource, mock(SchemaService.class), mock(JobService.class), mockEnv); assertThat(dao).hasFieldOrPropertyWithValue("useRowNumberOptimization", true); } @@ -75,7 +75,7 @@ void shouldUseOptimizationWhenPropertyEnabled() throws Exception { void shouldNotUseOptimizationWhenPropertyDisabled() throws Exception { MockEnvironment mockEnv = new MockEnvironment(); mockEnv.setProperty("spring.cloud.dataflow.task.jdbc.row-number-optimization.enabled", "false"); - JdbcAggregateJobQueryDao dao = new JdbcAggregateJobQueryDao(dataSource, mock(SchemaService.class), mock(JobServiceContainer.class), mockEnv); + JdbcAggregateJobQueryDao dao = new JdbcAggregateJobQueryDao(dataSource, mock(SchemaService.class), mock(JobService.class), mockEnv); assertThat(dao).hasFieldOrPropertyWithValue("useRowNumberOptimization", false); } } diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskDeleteServiceTests.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskDeleteServiceTests.java index f8cbc0d8d2..ee59d9d63f 100644 --- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskDeleteServiceTests.java +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskDeleteServiceTests.java @@ -51,7 +51,6 @@ import org.springframework.cloud.dataflow.server.configuration.TaskServiceDependencies; import org.springframework.cloud.dataflow.server.job.LauncherRepository; import org.springframework.cloud.dataflow.server.repository.JobExecutionDaoContainer; -import org.springframework.cloud.dataflow.server.repository.JobRepositoryContainer; import org.springframework.cloud.dataflow.server.repository.TaskBatchDaoContainer; import 
org.springframework.cloud.dataflow.server.repository.TaskDefinitionRepository; import org.springframework.cloud.dataflow.server.service.TaskDeleteService; @@ -116,7 +115,7 @@ public abstract class DefaultTaskDeleteServiceTests { TaskExecutionService taskExecutionService; @Autowired - JobRepositoryContainer jobRepositoryContainer; + JobRepository jobRepository; @Autowired TaskBatchDaoContainer taskBatchDaoContainer; @@ -224,7 +223,6 @@ JobLauncher jobLauncher(JobRepository jobRepository) { public JobLauncherTestUtils jobLauncherTestUtils() { JobLauncherTestUtils jobLauncherTestUtils = new JobLauncherTestUtils(); - JobRepository jobRepository = jobRepositoryContainer.get(SchemaVersionTarget.defaultTarget().getName()); jobLauncherTestUtils.setJobRepository(jobRepository); jobLauncherTestUtils.setJobLauncher(jobLauncher(jobRepository)); jobLauncherTestUtils.setJob(new Job() { diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskJobServiceTests.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskJobServiceTests.java index 88e6248d76..6c38c0b2b5 100644 --- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskJobServiceTests.java +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskJobServiceTests.java @@ -23,7 +23,6 @@ import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Collections; -import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -62,7 +61,6 @@ import org.springframework.cloud.dataflow.server.configuration.JobDependencies; import org.springframework.cloud.dataflow.server.configuration.TaskServiceDependencies; import org.springframework.cloud.dataflow.server.job.LauncherRepository; -import 
org.springframework.cloud.dataflow.server.repository.JobRepositoryContainer; import org.springframework.cloud.dataflow.server.repository.TaskBatchDaoContainer; import org.springframework.cloud.dataflow.server.repository.TaskDefinitionRepository; import org.springframework.cloud.dataflow.server.repository.TaskExecutionDaoContainer; @@ -132,7 +130,7 @@ public class DefaultTaskJobServiceTests { DataSourceProperties dataSourceProperties; @Autowired - JobRepositoryContainer jobRepositoryContainer; + JobRepository jobRepository; @Autowired TaskBatchDaoContainer taskBatchDaoContainer; @@ -238,7 +236,6 @@ private void initializeJobs(boolean insertTaskExecutionMetadata, SchemaVersionTa String definitionName = (AppBootSchemaVersion.BOOT3.equals(schemaVersionTarget.getSchemaVersion())) ? "some-name-boot3" : "some-name"; this.taskDefinitionRepository.save(new TaskDefinition(JOB_NAME_ORIG + jobInstanceCount, definitionName )); - JobRepository jobRepository = jobRepositoryContainer.get(schemaVersionTarget.getName()); TaskBatchDao taskBatchDao = taskBatchDaoContainer.get(schemaVersionTarget.getName()); TaskExecutionDao taskExecutionDao = taskExecutionDaoContainer.get(schemaVersionTarget.getName()); createSampleJob( diff --git a/spring-cloud-dataflow-shell-core/src/test/java/org/springframework/cloud/dataflow/shell/command/JobCommandTests.java b/spring-cloud-dataflow-shell-core/src/test/java/org/springframework/cloud/dataflow/shell/command/JobCommandTests.java index b9e5267f08..c0d2255600 100644 --- a/spring-cloud-dataflow-shell-core/src/test/java/org/springframework/cloud/dataflow/shell/command/JobCommandTests.java +++ b/spring-cloud-dataflow-shell-core/src/test/java/org/springframework/cloud/dataflow/shell/command/JobCommandTests.java @@ -41,7 +41,6 @@ import org.springframework.cloud.dataflow.aggregate.task.AggregateExecutionSupport; import org.springframework.cloud.dataflow.aggregate.task.TaskDefinitionReader; import 
org.springframework.cloud.dataflow.schema.SchemaVersionTarget; -import org.springframework.cloud.dataflow.server.repository.JobRepositoryContainer; import org.springframework.cloud.dataflow.server.repository.TaskBatchDaoContainer; import org.springframework.cloud.dataflow.server.repository.TaskExecutionDaoContainer; import org.springframework.cloud.dataflow.shell.AbstractShellIntegrationTest; @@ -73,7 +72,7 @@ public class JobCommandTests extends AbstractShellIntegrationTest { private static TaskExecutionDaoContainer daoContainer; - private static JobRepositoryContainer jobRepositoryContainer; + private static JobRepository jobRepository; private static TaskBatchDaoContainer taskBatchDaoContainer; @@ -91,7 +90,7 @@ public static void setUp() throws Exception { taskDefinitionReader = applicationContext.getBean(TaskDefinitionReader.class); aggregateExecutionSupport = applicationContext.getBean(AggregateExecutionSupport.class); taskBatchDaoContainer = applicationContext.getBean(TaskBatchDaoContainer.class); - jobRepositoryContainer = applicationContext.getBean(JobRepositoryContainer.class); + jobRepository = applicationContext.getBean(JobRepository.class); taskBatchDaoContainer = applicationContext.getBean(TaskBatchDaoContainer.class); taskExecutionIds.add(createSampleJob(JOB_NAME_ORIG, 1)); @@ -119,7 +118,6 @@ public static void tearDown() { private static long createSampleJob(String jobName, int jobExecutionCount) throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobRestartException { SchemaVersionTarget schemaVersionTarget = aggregateExecutionSupport.findSchemaVersionTarget(jobName, taskDefinitionReader); - JobRepository jobRepository = jobRepositoryContainer.get(schemaVersionTarget.getName()); JobInstance instance = jobRepository.createJobInstance(jobName, new JobParameters()); jobInstances.add(instance); TaskExecutionDao dao = daoContainer.get(schemaVersionTarget.getName());