From 433d9a3a7d86d3155cb2010b41a754f8d6dd5f3c Mon Sep 17 00:00:00 2001 From: Glenn Renfro Date: Sat, 9 Dec 2023 00:02:42 -0500 Subject: [PATCH] Support stopping Boot3 jobs (#5581) This commit adds support to SimpleJobService for stopping Boot3 based Spring Batch jobs. - Update tests to use new DAO infrastructure for setting up the DB - Deprecate unused methods --- .../dataflow/server/batch/JobService.java | 23 ++ .../server/batch/SimpleJobService.java | 29 +- .../batch/SimpleJobServiceFactoryBean.java | 42 ++- .../repository/AggregateJobQueryDao.java | 5 + .../repository/JdbcAggregateJobQueryDao.java | 28 ++ .../server/service/JobServiceContainer.java | 3 + .../server/batch/AbstractDaoTests.java | 8 + ...AbstractJdbcAggregateJobQueryDaoTests.java | 93 ++++++ ...actJdbcJobSearchableExecutionDaoTests.java | 10 +- .../batch/AbstractSimpleJobServiceTests.java | 281 ++++++++++++++++++ .../JdbcAggregateJobQueryMariadbDaoTests.java | 38 +++ ...JdbcAggregateJobQueryPostgresDaoTests.java | 38 +++ ...JobSearchableExecutionMariadbDaoTests.java | 2 +- .../batch/SimpleJobServiceMariadbTests.java | 60 ++++ .../batch/SimpleJobServicePostgresTests.java | 60 ++++ .../schemas/drop-table-schema-mariadb.sql | 1 + 16 files changed, 697 insertions(+), 24 deletions(-) create mode 100644 spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractJdbcAggregateJobQueryDaoTests.java create mode 100644 spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractSimpleJobServiceTests.java create mode 100644 spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/JdbcAggregateJobQueryMariadbDaoTests.java create mode 100644 spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/JdbcAggregateJobQueryPostgresDaoTests.java create mode 100644 spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/SimpleJobServiceMariadbTests.java create mode 100644 spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/SimpleJobServicePostgresTests.java diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/JobService.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/JobService.java index 32048cad06..3063892c31 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/JobService.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/JobService.java @@ -63,6 +63,7 @@ public interface JobService { * @throws JobInstanceAlreadyCompleteException thrown if job was already complete * @throws JobParametersInvalidException thrown if job parameters are invalid */ + @Deprecated JobExecution launch(String jobName, JobParameters params) throws NoSuchJobException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException; @@ -75,6 +76,7 @@ JobExecution launch(String jobName, JobParameters params) throws NoSuchJobExcept * * @throws NoSuchJobException thrown if job specified does not exist */ + @Deprecated JobParameters getLastJobParameters(String jobName) throws NoSuchJobException; /** @@ -90,6 +92,7 @@ JobExecution launch(String jobName, JobParameters params) throws NoSuchJobExcept * @throws JobInstanceAlreadyCompleteException thrown 
if job was already complete * @throws JobParametersInvalidException thrown if job parameters are invalid */ + @Deprecated JobExecution restart(Long jobExecutionId) throws NoSuchJobExecutionException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, NoSuchJobException, JobParametersInvalidException; @@ -108,6 +111,7 @@ JobExecution restart(Long jobExecutionId) throws NoSuchJobExecutionException, Jo * @throws JobInstanceAlreadyCompleteException thrown if job was already complete * @throws JobParametersInvalidException thrown if job parameters are invalid */ + @Deprecated JobExecution restart(Long jobExecutionId, JobParameters params) throws NoSuchJobExecutionException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, NoSuchJobException, JobParametersInvalidException; @@ -138,6 +142,7 @@ JobExecution restart(Long jobExecutionId, JobParameters params) * @throws JobExecutionAlreadyRunningException thrown if the job is running (it should be * stopped first) */ + @Deprecated JobExecution abandon(Long jobExecutionId) throws NoSuchJobExecutionException, JobExecutionAlreadyRunningException; /** @@ -149,6 +154,7 @@ JobExecution restart(Long jobExecutionId, JobParameters params) * @param count the maximum number of job names to return * @return a collection of job names */ + @Deprecated Collection listJobs(int start, int count); /** @@ -156,6 +162,7 @@ JobExecution restart(Long jobExecutionId, JobParameters params) * * @return the total number of jobs */ + @Deprecated int countJobs(); /** @@ -177,6 +184,8 @@ JobExecution restart(Long jobExecutionId, JobParameters params) * @return a collection of {@link JobInstance job instances} * @throws NoSuchJobException thrown if job specified does not exist */ + + @Deprecated Collection listJobInstances(String jobName, int start, int count) throws NoSuchJobException; /** @@ -187,6 +196,7 @@ JobExecution restart(Long jobExecutionId, JobParameters params) * @return the number of job instances available * @throws NoSuchJobException thrown if job specified does not exist */ + @Deprecated int countJobInstances(String jobName) throws NoSuchJobException; /** @@ -199,6 +209,7 @@ JobExecution restart(Long jobExecutionId, JobParameters params) * @return a collection of {@link JobExecutionWithStepCount} * @throws NoSuchJobException thrown if job specified does not exist */ + @Deprecated Collection listJobExecutionsForJobWithStepCount(String jobName, int start, int count) throws NoSuchJobException; @@ -210,6 +221,7 @@ Collection listJobExecutionsForJobWithStepCount(Strin * @return the number of executions * @throws NoSuchJobException thrown if job specified does not exist */ + @Deprecated int countJobExecutionsForJob(String jobName, BatchStatus status) throws NoSuchJobException; /** @@ -222,6 +234,7 @@ Collection listJobExecutionsForJobWithStepCount(Strin * @return all the job executions * @throws NoSuchJobException thrown if job specified does not exist */ + @Deprecated Collection getJobExecutionsForJobInstance(String jobName, Long jobInstanceId) throws NoSuchJobException; @@ -233,6 +246,7 @@ Collection getJobExecutionsForJobInstance(String jobName, Long job * @param count the maximum number of executions * @return a collection of {@link JobExecution} */ + @Deprecated Collection listJobExecutions(int start, int count); /** @@ -243,6 +257,7 @@ Collection getJobExecutionsForJobInstance(String jobName, Long job * @param count the maximum number of executions * @return a collection of 
{@link JobExecutionWithStepCount} */ + @Deprecated Collection listJobExecutionsWithStepCount(int start, int count); /** @@ -251,6 +266,7 @@ Collection getJobExecutionsForJobInstance(String jobName, Long job * * @return the number of job executions in the job repository */ + @Deprecated int countJobExecutions(); /** @@ -271,6 +287,7 @@ Collection getJobExecutionsForJobInstance(String jobName, Long job * * @throws NoSuchJobExecutionException thrown if job execution specified does not exist */ + @Deprecated Collection getStepExecutions(Long jobExecutionId) throws NoSuchJobExecutionException; Collection getStepExecutions(JobExecution jobExecution) throws NoSuchJobExecutionException; void addStepExecutions(JobExecution jobExecution); @@ -305,6 +322,7 @@ Collection listStepExecutionsForStep(String jobName, String stepN * * @return the number of executions. */ + @Deprecated int countStepExecutionsForJobExecution(long jobExecutionId); /** @@ -327,6 +345,7 @@ StepExecution getStepExecution(Long jobExecutionId, Long stepExecutionId) throws * * @return the number of executions affected */ + @Deprecated int stopAll(); /** @@ -337,6 +356,7 @@ StepExecution getStepExecution(Long jobExecutionId, Long stepExecutionId) throws * @return {@link Collection} of step names. * @throws NoSuchJobException thrown if the job name cannot be located */ + @Deprecated Collection getStepNamesForJob(String jobName) throws NoSuchJobException; /** @@ -363,6 +383,7 @@ Collection listJobExecutionsForJob(String jobName, BatchStatus sta * @param count the maximum number of executions to return * @return a collection of {@link JobExecutionWithStepCount} */ + @Deprecated Collection listJobExecutionsForJobWithStepCount(Date fromDate, Date toDate, int start, int count); @@ -375,6 +396,7 @@ Collection listJobExecutionsForJobWithStepCount(Date * @param count the maximum number of executions to return * @return a collection of {@link JobExecutionWithStepCount} */ + @Deprecated Collection listJobExecutionsForJobWithStepCountFilteredByJobInstanceId(int jobInstanceId, int start, int count); /** @@ -386,5 +408,6 @@ Collection listJobExecutionsForJobWithStepCount(Date * @param count the maximum number of executions to return * @return a collection of {@link JobExecutionWithStepCount} */ + @Deprecated Collection listJobExecutionsForJobWithStepCountFilteredByTaskExecutionId(int taskExecutionId, int start, int count); } diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/SimpleJobService.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/SimpleJobService.java index ee974ad11c..9f36cff283 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/SimpleJobService.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/SimpleJobService.java @@ -46,6 +46,8 @@ import org.springframework.batch.core.repository.dao.ExecutionContextDao; import org.springframework.batch.core.step.NoSuchStepException; import org.springframework.beans.factory.DisposableBean; +import org.springframework.cloud.dataflow.schema.SchemaVersionTarget; +import org.springframework.cloud.dataflow.server.repository.AggregateJobQueryDao; import org.springframework.core.io.Resource; import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import org.springframework.scheduling.annotation.Scheduled; @@ -83,17 +85,24 @@ public class SimpleJobService 
implements JobService, DisposableBean { private JobOperator jsrJobOperator; + private final AggregateJobQueryDao aggregateJobQueryDao; + private int shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT; + private final SchemaVersionTarget schemaVersionTarget; + public SimpleJobService(SearchableJobInstanceDao jobInstanceDao, SearchableJobExecutionDao jobExecutionDao, SearchableStepExecutionDao stepExecutionDao, JobRepository jobRepository, - ExecutionContextDao executionContextDao, JobOperator jsrJobOperator) { + ExecutionContextDao executionContextDao, JobOperator jsrJobOperator, AggregateJobQueryDao aggregateJobQueryDao, + SchemaVersionTarget schemaVersionTarget) { super(); this.jobInstanceDao = jobInstanceDao; this.jobExecutionDao = jobExecutionDao; this.stepExecutionDao = stepExecutionDao; this.jobRepository = jobRepository; this.executionContextDao = executionContextDao; + this.aggregateJobQueryDao = aggregateJobQueryDao; + this.schemaVersionTarget = schemaVersionTarget; if (jsrJobOperator == null) { logger.warn("No JobOperator compatible with JSR-352 was provided."); @@ -336,17 +345,13 @@ public int countJobInstances(String name) { @Override public JobExecution getJobExecution(Long jobExecutionId) throws NoSuchJobExecutionException { - JobExecution jobExecution = jobExecutionDao.getJobExecution(jobExecutionId); - if (jobExecution == null) { - throw new NoSuchJobExecutionException("There is no JobExecution with id=" + jobExecutionId); - } - jobExecution.setJobInstance(Objects.requireNonNull(jobInstanceDao.getJobInstance(jobExecution))); + JobExecution jobExecution = this.aggregateJobQueryDao.getJobExecution(jobExecutionId, this.schemaVersionTarget.getName()).getJobExecution(); + jobExecution.setJobInstance(Objects.requireNonNull(this.jobInstanceDao.getJobInstance(jobExecution))); try { - jobExecution.setExecutionContext(executionContextDao.getExecutionContext(jobExecution)); + jobExecution.setExecutionContext(this.executionContextDao.getExecutionContext(jobExecution)); } catch (Exception e) { - logger.info("Cannot load execution context for job execution: " + jobExecution); + this.logger.info("Cannot load execution context for job execution: " + jobExecution); } - stepExecutionDao.addStepExecutions(jobExecution); return jobExecution; } @@ -412,10 +417,7 @@ public int countStepExecutionsForJobExecution(long jobExecutionId) { @Override public JobInstance getJobInstance(long jobInstanceId) throws NoSuchJobInstanceException { - JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId); - if (jobInstance == null) { - throw new NoSuchJobInstanceException("JobInstance with id=" + jobInstanceId + " does not exist"); - } + JobInstance jobInstance = this.aggregateJobQueryDao.getJobInstance(jobInstanceId, this.schemaVersionTarget.getName()); return jobInstance; } @@ -545,5 +547,4 @@ public void removeInactiveExecutions() { } } - } diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/SimpleJobServiceFactoryBean.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/SimpleJobServiceFactoryBean.java index f502a8e974..15c3dba1df 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/SimpleJobServiceFactoryBean.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/SimpleJobServiceFactoryBean.java @@ -40,6 +40,11 @@ import org.springframework.beans.factory.FactoryBean; import 
org.springframework.beans.factory.InitializingBean; import org.springframework.cloud.dataflow.core.database.support.MultiSchemaIncrementerFactory; +import org.springframework.cloud.dataflow.schema.SchemaVersionTarget; +import org.springframework.cloud.dataflow.schema.service.SchemaService; +import org.springframework.cloud.dataflow.server.repository.AggregateJobQueryDao; +import org.springframework.cloud.dataflow.server.repository.JdbcAggregateJobQueryDao; +import org.springframework.cloud.dataflow.server.service.JobServiceContainer; import org.springframework.jdbc.core.JdbcOperations; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.support.lob.DefaultLobHandler; @@ -83,10 +88,24 @@ public class SimpleJobServiceFactoryBean implements FactoryBean, Ini private PlatformTransactionManager transactionManager; + private JobServiceContainer jobServiceContainer; + + private SchemaService schemaService; + + private SchemaVersionTarget schemaVersionTarget; + public void setTransactionManager(PlatformTransactionManager transactionManager) { this.transactionManager = transactionManager; } + /** + * Set the schemaVersionTarget to be used by the created SimpleJobService. + * @param schemaVersionTarget the schemaVersionTarget to be associated with this service. + */ + public void setAppBootSchemaVersionTarget(SchemaVersionTarget schemaVersionTarget) { + this.schemaVersionTarget = schemaVersionTarget; + } + /** * A special handler for large objects. The default is usually fine, except for some * (usually older) versions of Oracle. The default is determined from the data base type. @@ -137,6 +156,22 @@ public void setTablePrefix(String tablePrefix) { this.tablePrefix = tablePrefix; } + /** + * Sets the {@link JobServiceContainer} for the service. + * @param jobServiceContainer the JobServiceContainer for this service. + */ + public void setJobServiceContainer(JobServiceContainer jobServiceContainer) { + this.jobServiceContainer = jobServiceContainer; + } + + /** + * Sets the {@link SchemaService} for this factory bean. + * @param schemaService the schemaService for this factory bean. + */ + public void setSchemaService(SchemaService schemaService) { + this.schemaService = schemaService; + } + /** * A factory for incrementers (used to build primary keys for meta data). Defaults to * {@link DefaultDataFieldMaxValueIncrementerFactory}. @@ -266,7 +301,10 @@ private int determineClobTypeToUse(String databaseType) { return Types.CLOB; } } - + protected AggregateJobQueryDao createAggregateJobQueryDao() throws Exception { + AggregateJobQueryDao dao = new JdbcAggregateJobQueryDao(this.dataSource, this.schemaService, this.jobServiceContainer); + return dao; + } /** * Create a {@link SimpleJobService} from the configuration provided. 
* @@ -280,7 +318,7 @@ public JobService getObject() throws Exception { transactionManager); jsrJobOperator.afterPropertiesSet(); return new SimpleJobService(createJobInstanceDao(), createJobExecutionDao(), createStepExecutionDao(), - jobRepository, createExecutionContextDao(), jsrJobOperator); + jobRepository, createExecutionContextDao(), jsrJobOperator, createAggregateJobQueryDao(), schemaVersionTarget); } /** diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/repository/AggregateJobQueryDao.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/repository/AggregateJobQueryDao.java index 9b0ff62dec..c8f6e9e3e0 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/repository/AggregateJobQueryDao.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/repository/AggregateJobQueryDao.java @@ -20,8 +20,10 @@ import java.util.List; import org.springframework.batch.core.BatchStatus; +import org.springframework.batch.core.JobInstance; import org.springframework.batch.core.launch.NoSuchJobException; import org.springframework.batch.core.launch.NoSuchJobExecutionException; +import org.springframework.batch.core.launch.NoSuchJobInstanceException; import org.springframework.cloud.dataflow.rest.job.JobInstanceExecutions; import org.springframework.cloud.dataflow.rest.job.TaskJobExecution; import org.springframework.data.domain.Page; @@ -54,4 +56,7 @@ public interface AggregateJobQueryDao { JobInstanceExecutions getJobInstanceExecution(String jobName, long instanceId); JobInstanceExecutions getJobInstanceExecutions(long id, String schemaTarget); + + JobInstance getJobInstance(long id, String schemaTarget) throws NoSuchJobInstanceException; + } diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/repository/JdbcAggregateJobQueryDao.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/repository/JdbcAggregateJobQueryDao.java index 58ce5ab97a..0a019597c8 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/repository/JdbcAggregateJobQueryDao.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/repository/JdbcAggregateJobQueryDao.java @@ -41,6 +41,7 @@ import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.launch.NoSuchJobException; import org.springframework.batch.core.launch.NoSuchJobExecutionException; +import org.springframework.batch.core.launch.NoSuchJobInstanceException; import org.springframework.batch.core.repository.dao.JdbcJobExecutionDao; import org.springframework.batch.core.repository.dao.StepExecutionDao; import org.springframework.batch.item.database.Order; @@ -129,6 +130,10 @@ public class JdbcAggregateJobQueryDao implements AggregateJobQueryDao { " join AGGREGATE_JOB_INSTANCE I ON E.JOB_INSTANCE_ID = I.JOB_INSTANCE_ID AND E.SCHEMA_TARGET = I.SCHEMA_TARGET" + " where and E.END_TIME is NULL"; + private static final String GET_JOB_INSTANCE_BY_ID = "SELECT I.JOB_INSTANCE_ID, I.VERSION, I.JOB_NAME, I.JOB_KEY" + + " FROM AGGREGATE_JOB_INSTANCE I" + + " WHERE I.JOB_INSTANCE_ID = ? AND I.SCHEMA_TARGET = ?"; + private static final String NAME_FILTER = "I.JOB_NAME LIKE ?"; private static final String DATE_RANGE_FILTER = "E.START_TIME BETWEEN ? 
AND ?"; @@ -284,6 +289,17 @@ public JobInstanceExecutions getJobInstanceExecutions(long jobInstanceId, String return jobInstanceExecution; } + @Override + public JobInstance getJobInstance(long id, String schemaTarget) throws NoSuchJobInstanceException { + List instances = jdbcTemplate.query(GET_JOB_INSTANCE_BY_ID, new JobInstanceExtractor(), id, schemaTarget); + if (ObjectUtils.isEmpty(instances)) { + throw new NoSuchJobInstanceException(String.format("JobInstance with id=%d does not exist", id)); + } else if (instances.size() > 1) { + throw new NoSuchJobInstanceException(String.format("More than one Job Instance exists for ID %d ", id)); + } + return instances.get(0); + } + @Override public Page listJobExecutions(String jobName, BatchStatus status, Pageable pageable) throws NoSuchJobExecutionException { int total = countJobExecutions(jobName, status); @@ -665,6 +681,18 @@ private List getTaskJobExecutionsByDate(Date startDate, Date e endDate ); } + private class JobInstanceExtractor implements ResultSetExtractor> { + + @Override + public List extractData(ResultSet rs) throws SQLException, + DataAccessException { + List jobInstances = new ArrayList(); + while (rs.next()) { + jobInstances.add( new JobInstance(rs.getLong("JOB_INSTANCE_ID"), rs.getString("JOB_NAME"))); + } + return jobInstances; + } + } private class JobInstanceExecutionsExtractor implements ResultSetExtractor> { final boolean readStepCount; diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/JobServiceContainer.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/JobServiceContainer.java index 91e4629f01..7c76a1040a 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/JobServiceContainer.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/JobServiceContainer.java @@ -33,10 +33,13 @@ public JobServiceContainer( SimpleJobServiceFactoryBean factoryBean = new SimpleJobServiceFactoryBean(); factoryBean.setDataSource(dataSource); factoryBean.setTransactionManager(platformTransactionManager); + factoryBean.setJobServiceContainer(this); factoryBean.setJobLauncher(new SimpleJobLauncher()); factoryBean.setJobExplorer(jobExplorerContainer.get(target.getName())); factoryBean.setJobRepository(jobRepositoryContainer.get(target.getName())); factoryBean.setTablePrefix(target.getBatchPrefix()); + factoryBean.setAppBootSchemaVersionTarget(target); + factoryBean.setSchemaService(schemaService); try { factoryBean.afterPropertiesSet(); container.put(target.getName(), factoryBean.getObject()); diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractDaoTests.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractDaoTests.java index 2c0d26b43d..3df757c2d2 100644 --- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractDaoTests.java +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractDaoTests.java @@ -26,6 +26,7 @@ import javax.sql.DataSource; +import org.springframework.cloud.dataflow.core.database.support.DatabaseType; import org.testcontainers.containers.JdbcDatabaseContainer; import org.testcontainers.ext.ScriptUtils; import org.testcontainers.jdbc.JdbcDatabaseDelegate; @@ -46,10 +47,13 @@ protected 
JdbcTemplate getJdbcTemplate() { return this.jdbcTemplate; } + protected JdbcDatabaseContainer dbContainer; + protected void prepareForTest(JdbcDatabaseContainer dbContainer, String schemaName) throws Exception { this.dataSource = createDataSourceForContainer(dbContainer); this.jdbcTemplate = new JdbcTemplate(this.dataSource); createDataFlowSchema(dbContainer, schemaName); + this.dbContainer = dbContainer; } protected DataSource createDataSourceForContainer(JdbcDatabaseContainer dbContainer) { @@ -70,6 +74,10 @@ protected void createDataFlowSchema(JdbcDatabaseContainer dbContainer, String sc }); } + protected DatabaseType determineDatabaseType(DatabaseType databaseType) { + return (databaseType != DatabaseType.MARIADB) ? databaseType: DatabaseType.MYSQL; + } + private List getResourceFiles(String path) throws IOException { List fileNames = new ArrayList<>(); try ( diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractJdbcAggregateJobQueryDaoTests.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractJdbcAggregateJobQueryDaoTests.java new file mode 100644 index 0000000000..8336076ecc --- /dev/null +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractJdbcAggregateJobQueryDaoTests.java @@ -0,0 +1,93 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.springframework.cloud.dataflow.server.batch; + +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.springframework.batch.core.launch.NoSuchJobInstanceException; +import org.springframework.cloud.dataflow.schema.SchemaVersionTarget; +import org.testcontainers.containers.JdbcDatabaseContainer; + +import org.springframework.batch.core.JobInstance; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.repository.dao.AbstractJdbcBatchMetadataDao; +import org.springframework.batch.item.database.support.DataFieldMaxValueIncrementerFactory; +import org.springframework.cloud.dataflow.core.database.support.DatabaseType; +import org.springframework.cloud.dataflow.core.database.support.MultiSchemaIncrementerFactory; +import org.springframework.cloud.dataflow.schema.AppBootSchemaVersion; +import org.springframework.cloud.dataflow.schema.service.impl.DefaultSchemaService; +import org.springframework.cloud.dataflow.server.repository.JdbcAggregateJobQueryDao; +import org.springframework.cloud.dataflow.server.service.JobServiceContainer; +import org.springframework.jdbc.core.JdbcTemplate; + + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +abstract class AbstractJdbcAggregateJobQueryDaoTests extends AbstractDaoTests { + + private static final String BASE_JOB_INST_NAME = "JOB_INST_"; + + public JdbcSearchableJobInstanceDao jdbcSearchableJobInstanceDao; + + @Mock + private JobServiceContainer jobServiceContainer; + + private JdbcAggregateJobQueryDao jdbcAggregateJobQueryDao; + + private DataFieldMaxValueIncrementerFactory incrementerFactory; + + private DatabaseType databaseType; + + protected void prepareForTest(JdbcDatabaseContainer dbContainer, String schemaName, DatabaseType databaseType) throws Exception { + super.prepareForTest(dbContainer, schemaName); + this.jdbcAggregateJobQueryDao = new JdbcAggregateJobQueryDao(super.getDataSource(), new DefaultSchemaService(), + this.jobServiceContainer); + jdbcSearchableJobInstanceDao = new JdbcSearchableJobInstanceDao(); + jdbcSearchableJobInstanceDao.setJdbcTemplate(super.getJdbcTemplate()); + incrementerFactory = new MultiSchemaIncrementerFactory(super.getDataSource()); + this.databaseType = databaseType; + } + + @Test + void getJobInstancesForBoot3AndBoot2Instances() throws Exception { + assertThatThrownBy( () -> this.jdbcAggregateJobQueryDao.getJobInstance(1, "boot2")) + .isInstanceOf(NoSuchJobInstanceException.class) + .hasMessageContaining("JobInstance with id=1 does not exist"); + assertThatThrownBy( () -> this.jdbcAggregateJobQueryDao.getJobInstance(1, "boot3")) + .isInstanceOf(NoSuchJobInstanceException.class) + .hasMessageContaining("JobInstance with id=1 does not exist"); + createJobInstance("BOOT2", SchemaVersionTarget.defaultTarget()); + createJobInstance("BOOT3", SchemaVersionTarget.createDefault(AppBootSchemaVersion.BOOT3)); + verifyJobInstance(1, "boot2", "BOOT2"); + verifyJobInstance(1, "boot3", "BOOT3"); + } + + private void verifyJobInstance(long id, String schemaTarget, String suffix) throws Exception{ + JobInstance jobInstance = this.jdbcAggregateJobQueryDao.getJobInstance(id, schemaTarget); + assertThat(jobInstance).isNotNull(); + assertThat(jobInstance.getJobName()).isEqualTo(BASE_JOB_INST_NAME + suffix ); + } + + private JobInstance createJobInstance(String suffix, SchemaVersionTarget schemaVersionTarget) { + 
this.jdbcSearchableJobInstanceDao.setJobIncrementer(incrementerFactory.getIncrementer(this.databaseType.name(), + schemaVersionTarget.getBatchPrefix()+ "JOB_SEQ")); + this.jdbcSearchableJobInstanceDao.setTablePrefix(schemaVersionTarget.getBatchPrefix()); + return jdbcSearchableJobInstanceDao.createJobInstance(BASE_JOB_INST_NAME + suffix, + new JobParameters()); + } +} diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractJdbcJobSearchableExecutionDaoTests.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractJdbcJobSearchableExecutionDaoTests.java index 9b75fec042..e673b50df2 100644 --- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractJdbcJobSearchableExecutionDaoTests.java +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractJdbcJobSearchableExecutionDaoTests.java @@ -52,10 +52,6 @@ abstract class AbstractJdbcJobSearchableExecutionDaoTests extends AbstractDaoTes protected JdbcStepExecutionDao stepExecutionDao; - protected String determineDatabaseType(DatabaseType databaseType) { - return (databaseType != DatabaseType.MARIADB) ? databaseType.name() : DatabaseType.MYSQL.name(); - } - protected void prepareForTest(JdbcDatabaseContainer dbContainer, String schemaName, DatabaseType databaseType) throws Exception { super.prepareForTest(dbContainer, schemaName); @@ -67,18 +63,18 @@ protected void prepareForTest(JdbcDatabaseContainer dbContainer, String schemaNa this.jdbcSearchableJobInstanceDao.setJdbcTemplate(getJdbcTemplate()); incrementerFactory = new MultiSchemaIncrementerFactory(getDataSource()); this.jdbcSearchableJobInstanceDao.afterPropertiesSet(); - this.jdbcSearchableJobInstanceDao.setJobIncrementer(incrementerFactory.getIncrementer(determineDatabaseType(databaseType), + this.jdbcSearchableJobInstanceDao.setJobIncrementer(incrementerFactory.getIncrementer(determineDatabaseType(databaseType).name(), AbstractJdbcBatchMetadataDao.DEFAULT_TABLE_PREFIX + "JOB_SEQ")); jobExecutionDao = new JdbcJobExecutionDao(); - jobExecutionDao.setJobExecutionIncrementer(incrementerFactory.getIncrementer(determineDatabaseType(databaseType), + jobExecutionDao.setJobExecutionIncrementer(incrementerFactory.getIncrementer(determineDatabaseType(databaseType).name(), AbstractJdbcBatchMetadataDao.DEFAULT_TABLE_PREFIX + "JOB_EXECUTION_SEQ")); this.jobExecutionDao.setJdbcTemplate(new JdbcTemplate(getDataSource())); jobExecutionDao.afterPropertiesSet(); this.stepExecutionDao = new JdbcStepExecutionDao(); this.stepExecutionDao.setJdbcTemplate(getJdbcTemplate()); - this.stepExecutionDao.setStepExecutionIncrementer(incrementerFactory.getIncrementer(determineDatabaseType(databaseType), + this.stepExecutionDao.setStepExecutionIncrementer(incrementerFactory.getIncrementer(determineDatabaseType(databaseType).name(), AbstractJdbcBatchMetadataDao.DEFAULT_TABLE_PREFIX + "STEP_EXECUTION_SEQ")); this.stepExecutionDao.afterPropertiesSet(); } diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractSimpleJobServiceTests.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractSimpleJobServiceTests.java new file mode 100644 index 0000000000..c81672d185 --- /dev/null +++ 
b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractSimpleJobServiceTests.java @@ -0,0 +1,281 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.dataflow.server.batch; + +import javax.sql.DataSource; + +import java.sql.Timestamp; +import java.sql.Types; +import java.time.ZoneId; +import java.util.Date; + +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.JdbcDatabaseContainer; + +import org.springframework.batch.core.BatchStatus; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobInstance; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.launch.NoSuchJobInstanceException; +import org.springframework.batch.core.repository.dao.AbstractJdbcBatchMetadataDao; +import org.springframework.batch.item.database.support.DataFieldMaxValueIncrementerFactory; +import org.springframework.batch.support.transaction.ResourcelessTransactionManager; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.dataflow.core.database.support.DatabaseType; +import org.springframework.cloud.dataflow.core.database.support.MultiSchemaIncrementerFactory; +import org.springframework.cloud.dataflow.core.database.support.MultiSchemaTaskExecutionDaoFactoryBean; +import org.springframework.cloud.dataflow.schema.AppBootSchemaVersion; +import org.springframework.cloud.dataflow.schema.service.SchemaService; +import org.springframework.cloud.dataflow.schema.service.impl.DefaultSchemaService; +import org.springframework.cloud.dataflow.server.repository.JobRepositoryContainer; +import org.springframework.cloud.dataflow.server.service.JobExplorerContainer; +import org.springframework.cloud.dataflow.server.service.JobServiceContainer; +import org.springframework.cloud.task.repository.TaskExecution; +import org.springframework.cloud.task.repository.TaskRepository; +import org.springframework.cloud.task.repository.support.SimpleTaskRepository; +import org.springframework.cloud.task.repository.support.TaskExecutionDaoFactoryBean; +import org.springframework.context.annotation.Bean; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.util.StringUtils; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; + + +public abstract class AbstractSimpleJobServiceTests extends AbstractDaoTests { + + private static final String SAVE_JOB_EXECUTION = "INSERT INTO %PREFIX%JOB_EXECUTION(JOB_EXECUTION_ID, " + + "JOB_INSTANCE_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, VERSION, CREATE_TIME, LAST_UPDATED) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + private static 
final String BASE_JOB_INST_NAME = "JOB_INST_"; + + + private JdbcSearchableJobExecutionDao jdbcSearchableJobExecutionDao; + + private JdbcSearchableJobInstanceDao jdbcSearchableJobInstanceDao; + + private DataFieldMaxValueIncrementerFactory incrementerFactory; + + @Autowired + private JobServiceContainer jobServiceContainer; + + private DatabaseType databaseType; + + private TaskRepository taskRepositoryBoot2; + + private TaskRepository taskRepositoryBoot3; + + protected void prepareForTest(JdbcDatabaseContainer dbContainer, String schemaName, DatabaseType databaseType) throws Exception { + this.databaseType = databaseType; + super.prepareForTest(dbContainer, schemaName); + this.jdbcSearchableJobExecutionDao = new JdbcSearchableJobExecutionDao(); + this.jdbcSearchableJobExecutionDao.setDataSource(getDataSource()); + this.jdbcSearchableJobExecutionDao.afterPropertiesSet(); + this.jdbcSearchableJobInstanceDao = new JdbcSearchableJobInstanceDao(); + this.jdbcSearchableJobInstanceDao.setJdbcTemplate(getJdbcTemplate()); + incrementerFactory = new MultiSchemaIncrementerFactory(getDataSource()); + + this.jdbcSearchableJobInstanceDao.setJobIncrementer(incrementerFactory.getIncrementer(databaseType.name(), + AbstractJdbcBatchMetadataDao.DEFAULT_TABLE_PREFIX + "JOB_SEQ")); + TaskExecutionDaoFactoryBean teFactory = new MultiSchemaTaskExecutionDaoFactoryBean(getDataSource(), "TASK_") ; + taskRepositoryBoot2 = new SimpleTaskRepository(teFactory); + teFactory = new MultiSchemaTaskExecutionDaoFactoryBean(getDataSource(), "BOOT3_TASK_") ; + taskRepositoryBoot3 = new SimpleTaskRepository(teFactory); + } + + @Test + void getJobInstancesThatExist() throws Exception { + createJobInstance("BOOT2", AppBootSchemaVersion.BOOT2); + createJobInstance("BOOT3", AppBootSchemaVersion.BOOT3); + verifyJobInstance(1, "boot2", "BOOT2"); + verifyJobInstance(1, "boot3", "BOOT3"); + } + + @Test + void getJobExecutionsThatExist() throws Exception { + createJobExecution("BOOT2", AppBootSchemaVersion.BOOT2); + verifyJobExecution(1, "boot2", "BOOT2"); + createJobExecution("BOOT3", AppBootSchemaVersion.BOOT3); + createJobExecution("BOOT3A", AppBootSchemaVersion.BOOT3); + verifyJobExecution(2, "boot3", "BOOT3A"); + } + + @Test + void exceptionsShouldBeThrownIfRequestForNonExistingJobInstance() { + assertThatThrownBy(() -> { + this.jobServiceContainer.get("boot2").getJobInstance(1); + }).isInstanceOf(NoSuchJobInstanceException.class) + .hasMessageContaining("JobInstance with id=1 does not exist"); + assertThatThrownBy(() -> { + this.jobServiceContainer.get("boot3").getJobInstance(1); + }).isInstanceOf(NoSuchJobInstanceException.class) + .hasMessageContaining("JobInstance with id=1 does not exist"); + } + + @Test + void stoppingJobExecutionShouldLeaveJobExecutionwithStatusOfStopping() throws Exception{ + JobExecution jobExecution = createJobExecution("BOOT3", AppBootSchemaVersion.BOOT3, true); + jobExecution = this.jobServiceContainer.get("boot3").getJobExecution(jobExecution.getId()); + assertThat(jobExecution.isRunning()).isTrue(); + assertThat(jobExecution.getStatus()).isNotEqualTo(BatchStatus.STOPPING); + this.jobServiceContainer.get("boot3").stop(jobExecution.getId()); + jobExecution = this.jobServiceContainer.get("boot3").getJobExecution(jobExecution.getId()); + assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.STOPPING); + + jobExecution = createJobExecution("BOOT2", AppBootSchemaVersion.BOOT2, true); + jobExecution = this.jobServiceContainer.get("boot2").getJobExecution(jobExecution.getId()); + 
assertThat(jobExecution.isRunning()).isTrue(); + assertThat(jobExecution.getStatus()).isNotEqualTo(BatchStatus.STOPPING); + this.jobServiceContainer.get("boot2").stop(jobExecution.getId()); + jobExecution = this.jobServiceContainer.get("boot2").getJobExecution(jobExecution.getId()); + assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.STOPPING); + } + + private void verifyJobInstance(long id, String schemaTarget, String suffix) throws Exception { + JobInstance jobInstance = this.jobServiceContainer.get(schemaTarget).getJobInstance(id); + assertThat(jobInstance).isNotNull(); + assertThat(jobInstance.getJobName()).isEqualTo(BASE_JOB_INST_NAME + suffix); + } + + private void verifyJobExecution(long id, String schemaTarget, String suffix) throws Exception { + JobExecution jobExecution = this.jobServiceContainer.get(schemaTarget).getJobExecution(id); + assertThat(jobExecution).isNotNull(); + assertThat(jobExecution.getId()).isEqualTo(id); + assertThat(jobExecution.getJobInstance().getJobName()).isEqualTo(BASE_JOB_INST_NAME + suffix); + } + + private JobInstance createJobInstance(String suffix, AppBootSchemaVersion appBootSchemaVersion) throws Exception { + String prefix = AbstractJdbcBatchMetadataDao.DEFAULT_TABLE_PREFIX; + + if (appBootSchemaVersion.equals(AppBootSchemaVersion.BOOT3)) { + prefix = "BOOT3_BATCH_"; + } + this.jdbcSearchableJobInstanceDao.setJobIncrementer(incrementerFactory.getIncrementer(this.databaseType.name(), + prefix + "JOB_SEQ")); + this.jdbcSearchableJobInstanceDao.setTablePrefix(prefix); + return jdbcSearchableJobInstanceDao.createJobInstance(BASE_JOB_INST_NAME + suffix, + new JobParameters()); + } + + private JobExecution createJobExecution(String suffix, AppBootSchemaVersion appBootSchemaVersion) throws Exception{ + return createJobExecution(suffix, appBootSchemaVersion, false); + } + private JobExecution createJobExecution(String suffix, AppBootSchemaVersion appBootSchemaVersion, boolean isRunning) + throws Exception { + String prefix = AbstractJdbcBatchMetadataDao.DEFAULT_TABLE_PREFIX; + + if (appBootSchemaVersion.equals(AppBootSchemaVersion.BOOT3)) { + prefix = "BOOT3_BATCH_"; + } + JobInstance jobInstance = createJobInstance(suffix, appBootSchemaVersion); + JobExecution jobExecution = new JobExecution(jobInstance, null, "foo"); + DataFieldMaxValueIncrementer jobExecutionIncrementer = incrementerFactory.getIncrementer(databaseType.name(), + prefix+ "JOB_EXECUTION_SEQ"); + jobExecution.setId(jobExecutionIncrementer.nextLongValue()); + jobExecution.setStartTime(new Date()); + if(!isRunning) { + jobExecution.setEndTime(new Date()); + } + jobExecution.setVersion(3); + Timestamp startTime = jobExecution.getStartTime() == null ? null + : Timestamp.valueOf(jobExecution.getStartTime().toInstant() + .atZone(ZoneId.systemDefault()) + .toLocalDateTime()); + Timestamp endTime = jobExecution.getEndTime() == null ? null : Timestamp.valueOf(jobExecution.getEndTime().toInstant() + .atZone(ZoneId.systemDefault()) + .toLocalDateTime()); + Timestamp createTime = jobExecution.getCreateTime() == null ? null + : Timestamp.valueOf(jobExecution.getCreateTime().toInstant() + .atZone(ZoneId.systemDefault()) + .toLocalDateTime()); + Timestamp lastUpdated = jobExecution.getLastUpdated() == null ? 
null + : Timestamp.valueOf(jobExecution.getLastUpdated().toInstant() + .atZone(ZoneId.systemDefault()) + .toLocalDateTime()); + Object[] parameters = new Object[] { jobExecution.getId(), jobExecution.getJobId(), startTime, endTime, + jobExecution.getStatus().toString(), jobExecution.getExitStatus().getExitCode(), + jobExecution.getExitStatus().getExitDescription(), jobExecution.getVersion(), createTime, lastUpdated }; + getJdbcTemplate().update(getQuery(SAVE_JOB_EXECUTION, appBootSchemaVersion), parameters, + new int[] { Types.BIGINT, Types.BIGINT, Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, + Types.VARCHAR, Types.INTEGER, Types.TIMESTAMP, Types.TIMESTAMP }); + createTaskExecution(appBootSchemaVersion, jobExecution); + return jobExecution; + } + + private TaskExecution createTaskExecution(AppBootSchemaVersion appBootSchemaVersion, JobExecution jobExecution) { + TaskRepository taskRepository = this.taskRepositoryBoot2; + String taskPrefix = "TASK_"; + if (appBootSchemaVersion.equals(AppBootSchemaVersion.BOOT3)) { + taskPrefix = "BOOT3_TASK_"; + taskRepository = this.taskRepositoryBoot3; + } + TaskExecution taskExecution = new TaskExecution(); + taskExecution.setStartTime(new Date()); + taskExecution = taskRepository.createTaskExecution(taskExecution); + getJdbcTemplate().execute("INSERT INTO " + taskPrefix + "TASK_BATCH (TASK_EXECUTION_ID, JOB_EXECUTION_ID) VALUES (" + + taskExecution.getExecutionId() + ", " + jobExecution.getJobId() + ")"); + return taskExecution; + } + + private String getQuery(String base, AppBootSchemaVersion appBootSchemaVersion) { + String tablePrefix = "BATCH_"; + if(appBootSchemaVersion.equals(AppBootSchemaVersion.BOOT3)) { + tablePrefix = "BOOT3_BATCH_"; + } + return StringUtils.replace(base, "%PREFIX%", tablePrefix); + } + protected static class SimpleJobTestConfiguration { + + @Bean + public JdbcTemplate jdbcTemplate(DataSource dataSource) { + return new JdbcTemplate(dataSource); + } + + @Bean + public PlatformTransactionManager platformTransactionManager() { + return new ResourcelessTransactionManager(); + } + + @Bean + public SchemaService schemaService() { + return new DefaultSchemaService(); + } + + @Bean + public JobRepositoryContainer jobRepositoryContainer(DataSource dataSource, PlatformTransactionManager transactionManager, + SchemaService schemaService) { + return new JobRepositoryContainer(dataSource, transactionManager, schemaService); + } + + @Bean + public JobExplorerContainer jobExplorerContainer(DataSource dataSource, SchemaService schemaService) { + return new JobExplorerContainer(dataSource, schemaService); + } + + @Bean + public JobServiceContainer jobServiceContainer(DataSource dataSource, + PlatformTransactionManager platformTransactionManager, + SchemaService schemaService, + JobRepositoryContainer jobRepositoryContainer, + JobExplorerContainer jobExplorerContainer) { + return new JobServiceContainer(dataSource, platformTransactionManager, schemaService, jobRepositoryContainer, jobExplorerContainer); + } + } +} diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/JdbcAggregateJobQueryMariadbDaoTests.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/JdbcAggregateJobQueryMariadbDaoTests.java new file mode 100644 index 0000000000..146225d955 --- /dev/null +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/JdbcAggregateJobQueryMariadbDaoTests.java @@ -0,0 +1,38 @@ +/* + 
* Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.dataflow.server.batch; + +import org.junit.jupiter.api.BeforeEach; +import org.testcontainers.containers.JdbcDatabaseContainer; +import org.testcontainers.containers.MariaDBContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import org.springframework.cloud.dataflow.core.database.support.DatabaseType; + + +@Testcontainers +public class JdbcAggregateJobQueryMariadbDaoTests extends AbstractJdbcAggregateJobQueryDaoTests{ + + @Container + private static final JdbcDatabaseContainer dbContainer = new MariaDBContainer("mariadb:10.9.3"); + + @BeforeEach + void prepareForTest() throws Exception { + super.prepareForTest(dbContainer, "mariadb", determineDatabaseType(DatabaseType.MARIADB)); + } +} diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/JdbcAggregateJobQueryPostgresDaoTests.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/JdbcAggregateJobQueryPostgresDaoTests.java new file mode 100644 index 0000000000..111b2c88b0 --- /dev/null +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/JdbcAggregateJobQueryPostgresDaoTests.java @@ -0,0 +1,38 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.springframework.cloud.dataflow.server.batch; + +import org.junit.jupiter.api.BeforeEach; +import org.testcontainers.containers.JdbcDatabaseContainer; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import org.springframework.cloud.dataflow.core.database.support.DatabaseType; + + +@Testcontainers +public class JdbcAggregateJobQueryPostgresDaoTests extends AbstractJdbcAggregateJobQueryDaoTests{ + + @Container + private static final JdbcDatabaseContainer dbContainer = new PostgreSQLContainer("postgres:11.1"); + + @BeforeEach + void prepareForTest() throws Exception { + super.prepareForTest(dbContainer, "postgresql", DatabaseType.POSTGRES); + } +} diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/JdbcJobSearchableExecutionMariadbDaoTests.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/JdbcJobSearchableExecutionMariadbDaoTests.java index 3fd01dfdb7..78ff7ed422 100644 --- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/JdbcJobSearchableExecutionMariadbDaoTests.java +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/JdbcJobSearchableExecutionMariadbDaoTests.java @@ -32,6 +32,6 @@ class JdbcJobSearchableExecutionMariadbDaoTests extends AbstractJdbcJobSearchabl @BeforeEach void prepareForTest() throws Exception { - super.prepareForTest(dbContainer, "mariadb", DatabaseType.MARIADB); + super.prepareForTest(dbContainer, "mariadb", determineDatabaseType(DatabaseType.MARIADB)); } } diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/SimpleJobServiceMariadbTests.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/SimpleJobServiceMariadbTests.java new file mode 100644 index 0000000000..36adb991bd --- /dev/null +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/SimpleJobServiceMariadbTests.java @@ -0,0 +1,60 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.springframework.cloud.dataflow.server.batch; + + import org.junit.jupiter.api.BeforeEach; + import org.junit.jupiter.api.extension.ExtendWith; + import org.testcontainers.containers.JdbcDatabaseContainer; + import org.testcontainers.containers.MariaDBContainer; + import org.testcontainers.junit.jupiter.Container; + import org.testcontainers.junit.jupiter.Testcontainers; + + import org.springframework.cloud.dataflow.core.database.support.DatabaseType; + import org.springframework.context.annotation.Bean; + import org.springframework.context.annotation.Configuration; + import org.springframework.jdbc.datasource.DriverManagerDataSource; + import org.springframework.test.context.ContextConfiguration; + import org.springframework.test.context.junit.jupiter.SpringExtension; + + import javax.sql.DataSource; + + @ExtendWith(SpringExtension.class) + @ContextConfiguration(classes = SimpleJobServiceMariadbTests.SimpleJobTestMariadbConfiguration.class) + @Testcontainers + public class SimpleJobServiceMariadbTests extends AbstractSimpleJobServiceTests{ + + @Container + private static final JdbcDatabaseContainer mariaDBContainer = new MariaDBContainer("mariadb:10.9.3"); + + @BeforeEach + void setup() throws Exception { + super.prepareForTest(mariaDBContainer, "mariadb", determineDatabaseType(DatabaseType.MARIADB)); + } + + @Configuration + public static class SimpleJobTestMariadbConfiguration extends SimpleJobTestConfiguration { + @Bean + public DataSource dataSource() { + DriverManagerDataSource dataSource = new DriverManagerDataSource(); + dataSource.setDriverClassName(mariaDBContainer.getDriverClassName()); + dataSource.setUrl(mariaDBContainer.getJdbcUrl()); + dataSource.setUsername(mariaDBContainer.getUsername()); + dataSource.setPassword(mariaDBContainer.getPassword()); + return dataSource; + } + } +} diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/SimpleJobServicePostgresTests.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/SimpleJobServicePostgresTests.java new file mode 100644 index 0000000000..29513ea562 --- /dev/null +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/SimpleJobServicePostgresTests.java @@ -0,0 +1,60 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.springframework.cloud.dataflow.server.batch; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.testcontainers.containers.JdbcDatabaseContainer; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import org.springframework.cloud.dataflow.core.database.support.DatabaseType; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.datasource.DriverManagerDataSource; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +import javax.sql.DataSource; + +@ExtendWith(SpringExtension.class) +@ContextConfiguration(classes = SimpleJobServicePostgresTests.SimpleJobTestPostgresConfiguration.class) +@Testcontainers +public class SimpleJobServicePostgresTests extends AbstractSimpleJobServiceTests{ + + @Container + private static final JdbcDatabaseContainer postgreSQLContainer = new PostgreSQLContainer("postgres:11.1"); + + @BeforeEach + void setup() throws Exception { + super.prepareForTest(postgreSQLContainer, "postgresql", determineDatabaseType(DatabaseType.POSTGRES)); + } + + @Configuration + public static class SimpleJobTestPostgresConfiguration extends SimpleJobTestConfiguration { + @Bean + public DataSource dataSource() { + DriverManagerDataSource dataSource = new DriverManagerDataSource(); + dataSource.setDriverClassName(postgreSQLContainer.getDriverClassName()); + dataSource.setUrl(postgreSQLContainer.getJdbcUrl()); + dataSource.setUsername(postgreSQLContainer.getUsername()); + dataSource.setPassword(postgreSQLContainer.getPassword()); + return dataSource; + } + } +} diff --git a/spring-cloud-dataflow-server-core/src/test/resources/schemas/drop-table-schema-mariadb.sql b/spring-cloud-dataflow-server-core/src/test/resources/schemas/drop-table-schema-mariadb.sql index 89b8d86d02..bfdd2b3fa9 100644 --- a/spring-cloud-dataflow-server-core/src/test/resources/schemas/drop-table-schema-mariadb.sql +++ b/spring-cloud-dataflow-server-core/src/test/resources/schemas/drop-table-schema-mariadb.sql @@ -65,3 +65,4 @@ DROP SEQUENCE IF EXISTS BOOT3_BATCH_JOB_EXECUTION_SEQ; DROP SEQUENCE IF EXISTS BOOT3_BATCH_JOB_SEQ; DROP SEQUENCE IF EXISTS BOOT3_TASK_SEQ; DROP SEQUENCE IF EXISTS BOOT3_TASK_EXECUTION_METADATA_SEQ; +
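
For reference, a minimal sketch of how the stop path enabled by this commit can be exercised, following the flow in AbstractSimpleJobServiceTests: the JobService for a given schema target is obtained from the JobServiceContainer, the execution is read back through the new AggregateJobQueryDao lookup, and stop() is issued. The class name StopBoot3JobSketch, its method, and the executionId parameter are illustrative assumptions, not part of this change.

    import org.springframework.batch.core.BatchStatus;
    import org.springframework.batch.core.JobExecution;
    import org.springframework.cloud.dataflow.server.batch.JobService;
    import org.springframework.cloud.dataflow.server.service.JobServiceContainer;

    // Illustrative sketch only; the names below are not introduced by this commit.
    public class StopBoot3JobSketch {

        // Stops a running Boot 3 job execution through the schema-aware container.
        static void stopBoot3Execution(JobServiceContainer jobServiceContainer, Long executionId) throws Exception {
            // Resolve the JobService wired for the "boot3" schema target;
            // "boot2" addresses executions stored under the classic BATCH_ prefix.
            JobService jobService = jobServiceContainer.get("boot3");

            // The lookup now goes through AggregateJobQueryDao, so executions recorded
            // in the BOOT3_BATCH_ tables resolve to a populated JobExecution.
            JobExecution execution = jobService.getJobExecution(executionId);

            if (execution.isRunning()) {
                // Request the stop; a subsequent read reports BatchStatus.STOPPING.
                jobService.stop(executionId);
            }

            JobExecution refreshed = jobService.getJobExecution(executionId);
            if (refreshed.getStatus() == BatchStatus.STOPPING) {
                // The execution has been asked to stop and is winding down.
            }
        }
    }

The updated tests run this same sequence against both the BATCH_ and BOOT3_BATCH_ tables and assert that the second read of the execution reports BatchStatus.STOPPING.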