Skip to content

Commit

Permalink
Support stopping Boot3 jobs (#5581)
Browse files Browse the repository at this point in the history
This commit adds support to SimpleJobService for stopping Boot3 based 
Spring Batch jobs. 

- Update tests to use new DAO infrastructure for setting up the DB
- Deprecate unused methods
  • Loading branch information
cppwfs authored Dec 9, 2023
1 parent b76aeec commit 433d9a3
Show file tree
Hide file tree
Showing 16 changed files with 697 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public interface JobService {
* @throws JobInstanceAlreadyCompleteException thrown if job was already complete
* @throws JobParametersInvalidException thrown if job parameters are invalid
*/
@Deprecated
JobExecution launch(String jobName, JobParameters params) throws NoSuchJobException,
JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException,
JobParametersInvalidException;
Expand All @@ -75,6 +76,7 @@ JobExecution launch(String jobName, JobParameters params) throws NoSuchJobExcept
*
* @throws NoSuchJobException thrown if job specified does not exist
*/
@Deprecated
JobParameters getLastJobParameters(String jobName) throws NoSuchJobException;

/**
Expand All @@ -90,6 +92,7 @@ JobExecution launch(String jobName, JobParameters params) throws NoSuchJobExcept
* @throws JobInstanceAlreadyCompleteException thrown if job was already complete
* @throws JobParametersInvalidException thrown if job parameters are invalid
*/
@Deprecated
JobExecution restart(Long jobExecutionId) throws NoSuchJobExecutionException, JobExecutionAlreadyRunningException,
JobRestartException, JobInstanceAlreadyCompleteException, NoSuchJobException, JobParametersInvalidException;

Expand All @@ -108,6 +111,7 @@ JobExecution restart(Long jobExecutionId) throws NoSuchJobExecutionException, Jo
* @throws JobInstanceAlreadyCompleteException thrown if job was already complete
* @throws JobParametersInvalidException thrown if job parameters are invalid
*/
@Deprecated
JobExecution restart(Long jobExecutionId, JobParameters params)
throws NoSuchJobExecutionException, JobExecutionAlreadyRunningException,
JobRestartException, JobInstanceAlreadyCompleteException, NoSuchJobException, JobParametersInvalidException;
Expand Down Expand Up @@ -138,6 +142,7 @@ JobExecution restart(Long jobExecutionId, JobParameters params)
* @throws JobExecutionAlreadyRunningException thrown if the job is running (it should be
* stopped first)
*/
@Deprecated
JobExecution abandon(Long jobExecutionId) throws NoSuchJobExecutionException, JobExecutionAlreadyRunningException;

/**
Expand All @@ -149,13 +154,15 @@ JobExecution restart(Long jobExecutionId, JobParameters params)
* @param count the maximum number of job names to return
* @return a collection of job names
*/
@Deprecated
Collection<String> listJobs(int start, int count);

/**
* Count the total number of jobs that can be returned by {@link #listJobs(int, int)}.
*
* @return the total number of jobs
*/
@Deprecated
int countJobs();

/**
Expand All @@ -177,6 +184,8 @@ JobExecution restart(Long jobExecutionId, JobParameters params)
* @return a collection of {@link JobInstance job instances}
* @throws NoSuchJobException thrown if job specified does not exist
*/

@Deprecated
Collection<JobInstance> listJobInstances(String jobName, int start, int count) throws NoSuchJobException;

/**
Expand All @@ -187,6 +196,7 @@ JobExecution restart(Long jobExecutionId, JobParameters params)
* @return the number of job instances available
* @throws NoSuchJobException thrown if job specified does not exist
*/
@Deprecated
int countJobInstances(String jobName) throws NoSuchJobException;

/**
Expand All @@ -199,6 +209,7 @@ JobExecution restart(Long jobExecutionId, JobParameters params)
* @return a collection of {@link JobExecutionWithStepCount}
* @throws NoSuchJobException thrown if job specified does not exist
*/
@Deprecated
Collection<JobExecutionWithStepCount> listJobExecutionsForJobWithStepCount(String jobName, int start, int count)
throws NoSuchJobException;

Expand All @@ -210,6 +221,7 @@ Collection<JobExecutionWithStepCount> listJobExecutionsForJobWithStepCount(Strin
* @return the number of executions
* @throws NoSuchJobException thrown if job specified does not exist
*/
@Deprecated
int countJobExecutionsForJob(String jobName, BatchStatus status) throws NoSuchJobException;

/**
Expand All @@ -222,6 +234,7 @@ Collection<JobExecutionWithStepCount> listJobExecutionsForJobWithStepCount(Strin
* @return all the job executions
* @throws NoSuchJobException thrown if job specified does not exist
*/
@Deprecated
Collection<JobExecution> getJobExecutionsForJobInstance(String jobName, Long jobInstanceId)
throws NoSuchJobException;

Expand All @@ -233,6 +246,7 @@ Collection<JobExecution> getJobExecutionsForJobInstance(String jobName, Long job
* @param count the maximum number of executions
* @return a collection of {@link JobExecution}
*/
@Deprecated
Collection<JobExecution> listJobExecutions(int start, int count);

/**
Expand All @@ -243,6 +257,7 @@ Collection<JobExecution> getJobExecutionsForJobInstance(String jobName, Long job
* @param count the maximum number of executions
* @return a collection of {@link JobExecutionWithStepCount}
*/
@Deprecated
Collection<JobExecutionWithStepCount> listJobExecutionsWithStepCount(int start, int count);

/**
Expand All @@ -251,6 +266,7 @@ Collection<JobExecution> getJobExecutionsForJobInstance(String jobName, Long job
*
* @return the number of job executions in the job repository
*/
@Deprecated
int countJobExecutions();

/**
Expand All @@ -271,6 +287,7 @@ Collection<JobExecution> getJobExecutionsForJobInstance(String jobName, Long job
*
* @throws NoSuchJobExecutionException thrown if job execution specified does not exist
*/
@Deprecated
Collection<StepExecution> getStepExecutions(Long jobExecutionId) throws NoSuchJobExecutionException;
Collection<StepExecution> getStepExecutions(JobExecution jobExecution) throws NoSuchJobExecutionException;
void addStepExecutions(JobExecution jobExecution);
Expand Down Expand Up @@ -305,6 +322,7 @@ Collection<StepExecution> listStepExecutionsForStep(String jobName, String stepN
*
* @return the number of executions.
*/
@Deprecated
int countStepExecutionsForJobExecution(long jobExecutionId);

/**
Expand All @@ -327,6 +345,7 @@ StepExecution getStepExecution(Long jobExecutionId, Long stepExecutionId) throws
*
* @return the number of executions affected
*/
@Deprecated
int stopAll();

/**
Expand All @@ -337,6 +356,7 @@ StepExecution getStepExecution(Long jobExecutionId, Long stepExecutionId) throws
* @return {@link Collection} of step names.
* @throws NoSuchJobException thrown if the job name cannot be located
*/
@Deprecated
Collection<String> getStepNamesForJob(String jobName) throws NoSuchJobException;

/**
Expand All @@ -363,6 +383,7 @@ Collection<JobExecution> listJobExecutionsForJob(String jobName, BatchStatus sta
* @param count the maximum number of executions to return
* @return a collection of {@link JobExecutionWithStepCount}
*/
@Deprecated
Collection<JobExecutionWithStepCount> listJobExecutionsForJobWithStepCount(Date fromDate,
Date toDate, int start, int count);

Expand All @@ -375,6 +396,7 @@ Collection<JobExecutionWithStepCount> listJobExecutionsForJobWithStepCount(Date
* @param count the maximum number of executions to return
* @return a collection of {@link JobExecutionWithStepCount}
*/
@Deprecated
Collection<JobExecutionWithStepCount> listJobExecutionsForJobWithStepCountFilteredByJobInstanceId(int jobInstanceId, int start, int count);

/**
Expand All @@ -386,5 +408,6 @@ Collection<JobExecutionWithStepCount> listJobExecutionsForJobWithStepCount(Date
* @param count the maximum number of executions to return
* @return a collection of {@link JobExecutionWithStepCount}
*/
@Deprecated
Collection<JobExecutionWithStepCount> listJobExecutionsForJobWithStepCountFilteredByTaskExecutionId(int taskExecutionId, int start, int count);
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.springframework.batch.core.repository.dao.ExecutionContextDao;
import org.springframework.batch.core.step.NoSuchStepException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.cloud.dataflow.schema.SchemaVersionTarget;
import org.springframework.cloud.dataflow.server.repository.AggregateJobQueryDao;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.scheduling.annotation.Scheduled;
Expand Down Expand Up @@ -83,17 +85,24 @@ public class SimpleJobService implements JobService, DisposableBean {

private JobOperator jsrJobOperator;

private final AggregateJobQueryDao aggregateJobQueryDao;

private int shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT;

private final SchemaVersionTarget schemaVersionTarget;

public SimpleJobService(SearchableJobInstanceDao jobInstanceDao, SearchableJobExecutionDao jobExecutionDao,
SearchableStepExecutionDao stepExecutionDao, JobRepository jobRepository,
ExecutionContextDao executionContextDao, JobOperator jsrJobOperator) {
ExecutionContextDao executionContextDao, JobOperator jsrJobOperator, AggregateJobQueryDao aggregateJobQueryDao,
SchemaVersionTarget schemaVersionTarget) {
super();
this.jobInstanceDao = jobInstanceDao;
this.jobExecutionDao = jobExecutionDao;
this.stepExecutionDao = stepExecutionDao;
this.jobRepository = jobRepository;
this.executionContextDao = executionContextDao;
this.aggregateJobQueryDao = aggregateJobQueryDao;
this.schemaVersionTarget = schemaVersionTarget;

if (jsrJobOperator == null) {
logger.warn("No JobOperator compatible with JSR-352 was provided.");
Expand Down Expand Up @@ -336,17 +345,13 @@ public int countJobInstances(String name) {

@Override
public JobExecution getJobExecution(Long jobExecutionId) throws NoSuchJobExecutionException {
JobExecution jobExecution = jobExecutionDao.getJobExecution(jobExecutionId);
if (jobExecution == null) {
throw new NoSuchJobExecutionException("There is no JobExecution with id=" + jobExecutionId);
}
jobExecution.setJobInstance(Objects.requireNonNull(jobInstanceDao.getJobInstance(jobExecution)));
JobExecution jobExecution = this.aggregateJobQueryDao.getJobExecution(jobExecutionId, this.schemaVersionTarget.getName()).getJobExecution();
jobExecution.setJobInstance(Objects.requireNonNull(this.jobInstanceDao.getJobInstance(jobExecution)));
try {
jobExecution.setExecutionContext(executionContextDao.getExecutionContext(jobExecution));
jobExecution.setExecutionContext(this.executionContextDao.getExecutionContext(jobExecution));
} catch (Exception e) {
logger.info("Cannot load execution context for job execution: " + jobExecution);
this.logger.info("Cannot load execution context for job execution: " + jobExecution);
}
stepExecutionDao.addStepExecutions(jobExecution);
return jobExecution;
}

Expand Down Expand Up @@ -412,10 +417,7 @@ public int countStepExecutionsForJobExecution(long jobExecutionId) {

@Override
public JobInstance getJobInstance(long jobInstanceId) throws NoSuchJobInstanceException {
JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
if (jobInstance == null) {
throw new NoSuchJobInstanceException("JobInstance with id=" + jobInstanceId + " does not exist");
}
JobInstance jobInstance = this.aggregateJobQueryDao.getJobInstance(jobInstanceId, this.schemaVersionTarget.getName());
return jobInstance;
}

Expand Down Expand Up @@ -545,5 +547,4 @@ public void removeInactiveExecutions() {
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.cloud.dataflow.core.database.support.MultiSchemaIncrementerFactory;
import org.springframework.cloud.dataflow.schema.SchemaVersionTarget;
import org.springframework.cloud.dataflow.schema.service.SchemaService;
import org.springframework.cloud.dataflow.server.repository.AggregateJobQueryDao;
import org.springframework.cloud.dataflow.server.repository.JdbcAggregateJobQueryDao;
import org.springframework.cloud.dataflow.server.service.JobServiceContainer;
import org.springframework.jdbc.core.JdbcOperations;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.support.lob.DefaultLobHandler;
Expand Down Expand Up @@ -83,10 +88,24 @@ public class SimpleJobServiceFactoryBean implements FactoryBean<JobService>, Ini

private PlatformTransactionManager transactionManager;

private JobServiceContainer jobServiceContainer;

private SchemaService schemaService;

private SchemaVersionTarget schemaVersionTarget;

public void setTransactionManager(PlatformTransactionManager transactionManager) {
this.transactionManager = transactionManager;
}

/**
* Set the schemaVersionTarget to be used by the created SimpleJobService.
* @param schemaVersionTarget the schemaVersionTarget to be associated with this service.
*/
public void setAppBootSchemaVersionTarget(SchemaVersionTarget schemaVersionTarget) {
this.schemaVersionTarget = schemaVersionTarget;
}

/**
* A special handler for large objects. The default is usually fine, except for some
* (usually older) versions of Oracle. The default is determined from the data base type.
Expand Down Expand Up @@ -137,6 +156,22 @@ public void setTablePrefix(String tablePrefix) {
this.tablePrefix = tablePrefix;
}

/**
* Sets the {@link JobServiceContainer} for the service.
* @param jobServiceContainer the JobServiceContainer for this service.
*/
public void setJobServiceContainer(JobServiceContainer jobServiceContainer) {
this.jobServiceContainer = jobServiceContainer;
}

/**
* Sets the {@link SchemaService} for this factory bean.
* @param schemaService the schemaService for this factory bean.
*/
public void setSchemaService(SchemaService schemaService) {
this.schemaService = schemaService;
}

/**
* A factory for incrementers (used to build primary keys for meta data). Defaults to
* {@link DefaultDataFieldMaxValueIncrementerFactory}.
Expand Down Expand Up @@ -266,7 +301,10 @@ private int determineClobTypeToUse(String databaseType) {
return Types.CLOB;
}
}

protected AggregateJobQueryDao createAggregateJobQueryDao() throws Exception {
AggregateJobQueryDao dao = new JdbcAggregateJobQueryDao(this.dataSource, this.schemaService, this.jobServiceContainer);
return dao;
}
/**
* Create a {@link SimpleJobService} from the configuration provided.
*
Expand All @@ -280,7 +318,7 @@ public JobService getObject() throws Exception {
transactionManager);
jsrJobOperator.afterPropertiesSet();
return new SimpleJobService(createJobInstanceDao(), createJobExecutionDao(), createStepExecutionDao(),
jobRepository, createExecutionContextDao(), jsrJobOperator);
jobRepository, createExecutionContextDao(), jsrJobOperator, createAggregateJobQueryDao(), schemaVersionTarget);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import java.util.List;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.launch.NoSuchJobException;
import org.springframework.batch.core.launch.NoSuchJobExecutionException;
import org.springframework.batch.core.launch.NoSuchJobInstanceException;
import org.springframework.cloud.dataflow.rest.job.JobInstanceExecutions;
import org.springframework.cloud.dataflow.rest.job.TaskJobExecution;
import org.springframework.data.domain.Page;
Expand Down Expand Up @@ -54,4 +56,7 @@ public interface AggregateJobQueryDao {
JobInstanceExecutions getJobInstanceExecution(String jobName, long instanceId);

JobInstanceExecutions getJobInstanceExecutions(long id, String schemaTarget);

JobInstance getJobInstance(long id, String schemaTarget) throws NoSuchJobInstanceException;

}
Loading

0 comments on commit 433d9a3

Please sign in to comment.