diff --git a/src/main/java/org/ohdsi/webapi/JobConfig.java b/src/main/java/org/ohdsi/webapi/JobConfig.java
index c974d1eca..7731d5ed5 100644
--- a/src/main/java/org/ohdsi/webapi/JobConfig.java
+++ b/src/main/java/org/ohdsi/webapi/JobConfig.java
@@ -2,24 +2,17 @@
 import javax.sql.DataSource;
 
-import jakarta.annotation.PostConstruct;
 import org.apache.commons.lang3.StringUtils;
 import org.ohdsi.webapi.audittrail.listeners.AuditTrailJobListener;
 import org.ohdsi.webapi.common.generation.AutoremoveJobListener;
 import org.ohdsi.webapi.common.generation.CancelJobListener;
 import org.ohdsi.webapi.job.JobTemplate;
 import org.ohdsi.webapi.shiro.management.Security;
-import org.ohdsi.webapi.util.ManagedThreadPoolTaskExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.batch.admin.service.JdbcSearchableJobExecutionDao;
-import org.springframework.batch.admin.service.JdbcSearchableJobInstanceDao;
-import org.springframework.batch.admin.service.SearchableJobExecutionDao;
-import org.springframework.batch.admin.service.SearchableJobInstanceDao;
 import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
 import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
 import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
-import org.springframework.batch.core.configuration.support.DefaultBatchConfiguration;
 import org.springframework.batch.core.explore.JobExplorer;
 import org.springframework.batch.core.explore.support.JobExplorerFactoryBean;
 import org.springframework.batch.core.job.builder.JobBuilder;
@@ -27,31 +20,24 @@
 import org.springframework.batch.core.launch.support.SimpleJobLauncher;
 import org.springframework.batch.core.repository.JobRepository;
 import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
-import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
+import org.springframework.jdbc.datasource.DataSourceTransactionManager;
+import org.springframework.transaction.PlatformTransactionManager;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.DependsOn;
-import org.springframework.context.annotation.Primary;
 import org.springframework.core.task.TaskExecutor;
-import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.transaction.PlatformTransactionManager;
 
 @Configuration
 @EnableBatchProcessing
-//@DependsOn({"batchDatabaseInitializer"})
-public class JobConfig extends DefaultBatchConfiguration {
+public class JobConfig {
 
     private static final Logger log = LoggerFactory.getLogger(JobConfig.class);
 
     @Value("${spring.batch.repository.tableprefix}")
     private String tablePrefix;
 
-    @Value("${spring.batch.repository.isolationLevelForCreate}")
-    private String isolationLevelForCreate;
-
     @Value("${spring.batch.taskExecutor.corePoolSize}")
     private Integer corePoolSize;
 
@@ -61,128 +47,74 @@ public class JobConfig extends DefaultBatchConfiguration {
     @Value("${spring.batch.taskExecutor.queueCapacity}")
     private Integer queueCapacity;
 
-    @Value("${spring.batch.taskExecutor.threadGroupName}")
-    private String threadGroupName;
-
     @Value("${spring.batch.taskExecutor.threadNamePrefix}")
     private String threadNamePrefix;
 
     @Autowired
-    private DataSource dataSource;
+    private DataSource primaryDataSource;
 
     @Autowired
     private AuditTrailJobListener auditTrailJobListener;
 
     @Bean
-    String batchTablePrefix() {
-        return this.tablePrefix;
-    }
-
-    @Bean
-    TaskExecutor taskExecutor() {
-        final ThreadPoolTaskExecutor taskExecutor = new ManagedThreadPoolTaskExecutor();
+    public TaskExecutor taskExecutor() {
+        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
         taskExecutor.setCorePoolSize(corePoolSize);
         taskExecutor.setMaxPoolSize(maxPoolSize);
         taskExecutor.setQueueCapacity(queueCapacity);
-        if (StringUtils.isNotBlank(threadGroupName)) {
-            taskExecutor.setThreadGroupName(threadGroupName);
-        }
-        if (StringUtils.isNotBlank(threadNamePrefix)) {
-            taskExecutor.setThreadNamePrefix(threadNamePrefix);
-        }
-        taskExecutor.afterPropertiesSet();
+        taskExecutor.setThreadNamePrefix(threadNamePrefix);
+        taskExecutor.initialize();
         return taskExecutor;
     }
 
     @Bean
     public PlatformTransactionManager transactionManager() {
-        return new ResourcelessTransactionManager();
+        return new DataSourceTransactionManager(primaryDataSource);
    }
 
     @Bean
     public JobRepository jobRepository() {
         JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
-        factory.setDataSource(dataSource);
-        factory.setIsolationLevelForCreate(isolationLevelForCreate);
+        factory.setDataSource(primaryDataSource);
+        factory.setTransactionManager(transactionManager());
         factory.setTablePrefix(tablePrefix);
-        factory.setTransactionManager(new ResourcelessTransactionManager());
         factory.setValidateTransactionState(false);
-        try {
-            factory.afterPropertiesSet();
-        } catch (Exception e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        }
-        try {
-            return factory.getObject();
-        } catch (Exception e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        }
-        return null;
-    }
-
-    @Bean
-    public JobLauncher jobLauncher() {
-        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
-        jobLauncher.setJobRepository(jobRepository());
-        jobLauncher.setTaskExecutor(taskExecutor());
         try {
-            jobLauncher.afterPropertiesSet();
-        } catch (Exception e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        }
-        return jobLauncher;
+            factory.afterPropertiesSet();
+            return factory.getObject();
+        } catch (Exception e) {
+            throw new IllegalStateException("Could not initialize JobRepository", e);
+        }
     }
 
     @Bean
-    public JobExplorer jobExplorer() {
-        JobExplorerFactoryBean jobExplorerFactoryBean = new JobExplorerFactoryBean();
-        jobExplorerFactoryBean.setDataSource(dataSource);
-        jobExplorerFactoryBean.setTablePrefix(tablePrefix);
-        jobExplorerFactoryBean.setTransactionManager(getTransactionManager());
-        try {
-            jobExplorerFactoryBean.afterPropertiesSet();
-        } catch (Exception e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        }
-        try {
-            return jobExplorerFactoryBean.getObject();
-        } catch (Exception e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        }
-        return null;
+    public JobExplorer jobExplorer() throws Exception {
+        JobExplorerFactoryBean factory = new JobExplorerFactoryBean();
+        factory.setDataSource(primaryDataSource);
+        factory.setTransactionManager(transactionManager());
+        factory.setTablePrefix(tablePrefix);
+        factory.afterPropertiesSet();
+        return factory.getObject();
     }
 
     @Bean
-    JobTemplate jobTemplate(final JobLauncher jobLauncher, final JobBuilderFactory jobBuilders,
-                            final StepBuilderFactory stepBuilders, final Security security) {
+    public JobTemplate jobTemplate(JobLauncher jobLauncher, JobBuilderFactory jobBuilders,
+                                   StepBuilderFactory stepBuilders, Security security) {
         return new JobTemplate(jobLauncher, jobBuilders, stepBuilders, security);
     }
-
+
     @Bean
-    SearchableJobExecutionDao searchableJobExecutionDao(DataSource dataSource) {
-        JdbcSearchableJobExecutionDao dao = new JdbcSearchableJobExecutionDao();
-        dao.setDataSource(dataSource);
-        dao.setTablePrefix(this.tablePrefix);
-        return dao;
-    }
-
-    @Bean
-    SearchableJobInstanceDao searchableJobInstanceDao(JdbcTemplate jdbcTemplate) {
-        JdbcSearchableJobInstanceDao dao = new JdbcSearchableJobInstanceDao();
-        dao.setJdbcTemplate(jdbcTemplate);
-        dao.setTablePrefix(this.tablePrefix);
-        return dao;
+    public JobLauncher jobLauncher(JobRepository jobRepository, TaskExecutor taskExecutor) throws Exception {
+        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
+        jobLauncher.setJobRepository(jobRepository);
+        jobLauncher.setTaskExecutor(taskExecutor);
+        jobLauncher.afterPropertiesSet();
+        return jobLauncher;
     }
-
-
-    @Primary
+
     @Bean
-    JobBuilderFactory jobBuilders(JobRepository jobRepository) {
+    public JobBuilderFactory jobBuilders(JobRepository jobRepository) {
         return new JobBuilderFactory(jobRepository) {
             @Override
             public JobBuilder get(String name) {
@@ -194,7 +126,7 @@ public JobBuilder get(String name) {
     }
 
     @Bean
-    StepBuilderFactory stepBuilders(JobRepository jobRepository) {
+    public StepBuilderFactory stepBuilders(JobRepository jobRepository) {
         return new StepBuilderFactory(jobRepository);
     }
-}
\ No newline at end of file
+}
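// --- Illustrative sketch (not part of the patch) -----------------------------
// A minimal, hedged view of the wiring the new JobConfig performs: JobRepository
// and JobExplorer now share one DataSource-backed transaction manager instead of
// a ResourcelessTransactionManager. The JDBC URL, credentials, and the "BATCH_"
// prefix below are placeholders, not values taken from WebAPI configuration.
import javax.sql.DataSource;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.explore.support.JobExplorerFactoryBean;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.jdbc.datasource.DriverManagerDataSource;

class BatchWiringSketch {

    static void wire() throws Exception {
        DataSource dataSource =
                new DriverManagerDataSource("jdbc:postgresql://localhost:5432/ohdsi", "user", "secret");
        DataSourceTransactionManager txManager = new DataSourceTransactionManager(dataSource);

        // JobRepository writes the BATCH_* tables inside real JDBC transactions.
        JobRepositoryFactoryBean repositoryFactory = new JobRepositoryFactoryBean();
        repositoryFactory.setDataSource(dataSource);
        repositoryFactory.setTransactionManager(txManager);
        repositoryFactory.setTablePrefix("BATCH_");
        repositoryFactory.afterPropertiesSet();
        JobRepository jobRepository = repositoryFactory.getObject();

        // JobExplorer reads the same tables; it replaces the removed
        // spring-batch-admin Searchable*Dao beans for lookups.
        JobExplorerFactoryBean explorerFactory = new JobExplorerFactoryBean();
        explorerFactory.setDataSource(dataSource);
        explorerFactory.setTransactionManager(txManager);
        explorerFactory.setTablePrefix("BATCH_");
        explorerFactory.afterPropertiesSet();
        JobExplorer jobExplorer = explorerFactory.getObject();
    }
}
// -----------------------------------------------------------------------------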
diff --git a/src/main/java/org/ohdsi/webapi/JobInvalidator.java b/src/main/java/org/ohdsi/webapi/JobInvalidator.java
index 7acd34a7e..ede0e6d26 100644
--- a/src/main/java/org/ohdsi/webapi/JobInvalidator.java
+++ b/src/main/java/org/ohdsi/webapi/JobInvalidator.java
@@ -4,18 +4,18 @@
 import org.ohdsi.webapi.executionengine.entity.ExecutionEngineAnalysisStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.batch.admin.service.SearchableJobExecutionDao;
 import org.springframework.batch.core.BatchStatus;
 import org.springframework.batch.core.ExitStatus;
 import org.springframework.batch.core.JobExecution;
+import org.springframework.batch.core.explore.JobExplorer;
 import org.springframework.batch.core.repository.JobRepository;
 import org.springframework.context.annotation.DependsOn;
 import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
 import org.springframework.transaction.support.TransactionTemplate;
 
 import java.time.LocalDateTime;
-import java.time.ZoneId;
-import java.util.Calendar;
+import java.util.Set;
 
 @Component
 @DependsOn("flyway")
@@ -27,38 +27,60 @@ public class JobInvalidator {
 
     private final JobRepository jobRepository;
     private final TransactionTemplate transactionTemplateRequiresNew;
-    private final SearchableJobExecutionDao jobExecutionDao;
+    private final JobExplorer jobExplorer;
 
-    public JobInvalidator(JobRepository repository, TransactionTemplate transactionTemplateRequiresNew,
-                          SearchableJobExecutionDao jobExecutionDao) {
-        this.jobRepository = repository;
+    public JobInvalidator(JobRepository jobRepository,
+                          TransactionTemplate transactionTemplateRequiresNew,
+                          JobExplorer jobExplorer) {
+        this.jobRepository = jobRepository;
         this.transactionTemplateRequiresNew = transactionTemplateRequiresNew;
-        this.jobExecutionDao = jobExecutionDao;
+        this.jobExplorer = jobExplorer;
     }
 
+    /**
+     * Invalidates all running job executions during initialization.
+     */
     @PostConstruct
     private void invalidateGenerations() {
-        transactionTemplateRequiresNew.execute(s -> {
-            jobExecutionDao.getRunningJobExecutions().forEach(this::invalidationJobExecution);
+        transactionTemplateRequiresNew.execute(status -> {
+            // Retrieve all running job executions for all job names
+            jobExplorer.getJobNames().forEach(jobName -> {
+                Set<JobExecution> runningJobs = jobExplorer.findRunningJobExecutions(jobName);
+                runningJobs.forEach(this::invalidateJobExecution);
+            });
             return null;
         });
     }
 
+    /**
+     * Invalidates a specific job execution based on an analysis status.
+     *
+     * @param executionEngineAnalysisStatus The status containing the job execution ID.
+     */
+    @Transactional
     public void invalidateJobExecutionById(ExecutionEngineAnalysisStatus executionEngineAnalysisStatus) {
-        JobExecution job = jobExecutionDao.getJobExecution(executionEngineAnalysisStatus.getExecutionEngineGeneration().getId());
-        if (job == null || job.getJobId() == null) {
-            log.error("Cannot invalidate job. There is no job for execution-engine-analysis-status with id = {}", executionEngineAnalysisStatus.getId());
+        Long executionId = executionEngineAnalysisStatus.getExecutionEngineGeneration().getId();
+        JobExecution jobExecution = jobExplorer.getJobExecution(executionId);
+
+        if (jobExecution == null || jobExecution.getJobId() == null) {
+            log.error("Cannot invalidate job. No job found for execution-engine-analysis-status with id = {}",
+                    executionEngineAnalysisStatus.getId());
             return;
         }
-        invalidationJobExecution(job);
-
+        invalidateJobExecution(jobExecution);
     }
 
-    public void invalidationJobExecution(JobExecution job) {
-        job.setStatus(BatchStatus.FAILED);
-        job.setExitStatus(new ExitStatus(ExitStatus.FAILED.getExitCode(), INVALIDATED_BY_SYSTEM_EXIT_MESSAGE));
-        job.setEndTime(LocalDateTime.ofInstant(Calendar.getInstance().getTime().toInstant(), ZoneId.systemDefault()));
-        jobRepository.update(job);
+    /**
+     * Marks a job execution as failed and updates its status in the repository.
+     *
+     * @param jobExecution The job execution to invalidate.
+     */
+    public void invalidateJobExecution(JobExecution jobExecution) {
+        jobExecution.setStatus(BatchStatus.FAILED);
+        jobExecution.setExitStatus(new ExitStatus(ExitStatus.FAILED.getExitCode(), INVALIDATED_BY_SYSTEM_EXIT_MESSAGE));
+        jobExecution.setEndTime(LocalDateTime.now());
+        jobRepository.update(jobExecution);
+        log.info("Job execution with ID {} has been invalidated.", jobExecution.getId());
     }
-}
+}
\ No newline at end of file
diff --git a/src/main/java/org/ohdsi/webapi/cohortcharacterization/CcServiceImpl.java b/src/main/java/org/ohdsi/webapi/cohortcharacterization/CcServiceImpl.java
index 4a269f34a..2e4a4b0f3 100644
--- a/src/main/java/org/ohdsi/webapi/cohortcharacterization/CcServiceImpl.java
+++ b/src/main/java/org/ohdsi/webapi/cohortcharacterization/CcServiceImpl.java
@@ -1228,8 +1228,8 @@ private void invalidateGenerations() {
         getTransactionTemplateRequiresNew().execute(transactionStatus -> {
             List<CcGenerationEntity> generations = findAllIncompleteGenerations();
             generations.forEach(gen -> {
-                JobExecution job = jobService.getJobExecution(gen.getId());
-                jobInvalidator.invalidationJobExecution(job);
+                JobExecution job = jobService.findJobExecution(gen.getId());
+                jobInvalidator.invalidateJobExecution(job);
             });
             return null;
         });
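// --- Illustrative sketch (not part of the patch) -----------------------------
// The JobInvalidator above now walks JobExplorer instead of the removed
// SearchableJobExecutionDao. This shows the same sweep in isolation; the exit
// message below is a placeholder, not the WebAPI constant.
import java.time.LocalDateTime;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.repository.JobRepository;

class RunningExecutionSweepSketch {

    static void failAllRunning(JobExplorer jobExplorer, JobRepository jobRepository) {
        // Every known job name is checked for executions still flagged as running.
        for (String jobName : jobExplorer.getJobNames()) {
            for (JobExecution execution : jobExplorer.findRunningJobExecutions(jobName)) {
                execution.setStatus(BatchStatus.FAILED);
                execution.setExitStatus(
                        new ExitStatus(ExitStatus.FAILED.getExitCode(), "Invalidated at startup"));
                execution.setEndTime(LocalDateTime.now());
                jobRepository.update(execution);
            }
        }
    }
}
// -----------------------------------------------------------------------------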
diff --git a/src/main/java/org/ohdsi/webapi/job/JobTemplate.java b/src/main/java/org/ohdsi/webapi/job/JobTemplate.java
index 7919a3087..c478d31df 100644
--- a/src/main/java/org/ohdsi/webapi/job/JobTemplate.java
+++ b/src/main/java/org/ohdsi/webapi/job/JobTemplate.java
@@ -10,6 +10,11 @@
 import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
 import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
 import org.springframework.batch.core.step.tasklet.Tasklet;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.transaction.PlatformTransactionManager;
+import org.springframework.transaction.TransactionDefinition;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.transaction.support.TransactionTemplate;
 
 import jakarta.ws.rs.WebApplicationException;
 import jakarta.ws.rs.core.Response;
@@ -21,9 +26,8 @@
 import static org.ohdsi.webapi.Constants.WARM_CACHE;
 import static org.ohdsi.webapi.util.SecurityUtils.whitelist;
 
-/**
- *
- */
+import javax.sql.DataSource;
+
 public class JobTemplate {
 
     private static final Logger log = LoggerFactory.getLogger(JobTemplate.class);
@@ -33,59 +37,61 @@ public class JobTemplate {
     private final StepBuilderFactory stepBuilders;
     private final Security security;
 
-    public JobTemplate(final JobLauncher jobLauncher, final JobBuilderFactory jobBuilders,
-                       final StepBuilderFactory stepBuilders, final Security security) {
+    @Autowired
+    private PlatformTransactionManager transactionManager;
+
+    public JobTemplate(JobLauncher jobLauncher, JobBuilderFactory jobBuilders,
+                       StepBuilderFactory stepBuilders, Security security) {
         this.jobLauncher = jobLauncher;
         this.jobBuilders = jobBuilders;
         this.stepBuilders = stepBuilders;
         this.security = security;
     }
 
-    public JobExecutionResource launch(final Job job, JobParameters jobParameters) throws WebApplicationException {
-        JobExecution exec;
-        try {
-            JobParametersBuilder builder = new JobParametersBuilder(jobParameters);
-            builder.addLong(JOB_START_TIME, System.currentTimeMillis());
-            if (jobParameters.getString(JOB_AUTHOR) == null) {
-                builder.addString(JOB_AUTHOR, security.getSubject());
-            }
-            jobParameters = builder.toJobParameters();
-            exec = this.jobLauncher.run(job, jobParameters);
-            if (log.isDebugEnabled()) {
-                log.debug("JobExecution queued: {}", exec);
-            }
-        } catch (final JobExecutionAlreadyRunningException e) {
-            throw new WebApplicationException(e, Response.status(Status.CONFLICT).entity(whitelist(e)).build());
-        } catch (final Exception e) {
-            throw new WebApplicationException(e, Response.status(Status.INTERNAL_SERVER_ERROR).entity(whitelist(e)).build());
-        }
-        return JobUtils.toJobExecutionResource(exec);
+    public JobExecutionResource launch(Job job, JobParameters jobParameters) throws WebApplicationException {
+
+        TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager);
+        transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
+
+        return transactionTemplate.execute(status -> {
+            JobExecution exec;
+            try {
+                JobParametersBuilder builder = new JobParametersBuilder(jobParameters);
+                builder.addLong(JOB_START_TIME, System.currentTimeMillis());
+                if (jobParameters.getString(JOB_AUTHOR) == null) {
+                    builder.addString(JOB_AUTHOR, security.getSubject());
+                }
+                final JobParameters jobParams = builder.toJobParameters();
+                exec = this.jobLauncher.run(job, jobParams);
+                if (log.isDebugEnabled()) {
+                    log.debug("JobExecution queued: {}", exec);
+                }
+            } catch (final JobExecutionAlreadyRunningException e) {
+                throw new WebApplicationException(e, Response.status(Status.CONFLICT).entity(whitelist(e)).build());
+            } catch (final Exception e) {
+                throw new WebApplicationException(e, Response.status(Status.INTERNAL_SERVER_ERROR).entity(whitelist(e)).build());
+            }
+            return JobUtils.toJobExecutionResource(exec);
+        });
     }
 
-    public JobExecutionResource launchTasklet(final String jobName, final String stepName, final Tasklet tasklet,
-                                              JobParameters jobParameters) throws WebApplicationException {
-        JobExecution exec;
+    public JobExecutionResource launchTasklet(String jobName, String stepName, Tasklet tasklet,
+                                              JobParameters jobParameters) {
         try {
-            //TODO Consider JobParametersIncrementer
             jobParameters = new JobParametersBuilder(jobParameters)
-                    .addLong(JOB_START_TIME, System.currentTimeMillis())
-                    .addString(JOB_AUTHOR, getAuthorForTasklet(jobName))
+                    .addLong(JOB_START_TIME, System.currentTimeMillis())
+                    .addString(JOB_AUTHOR, getAuthorForTasklet(jobName))
                     .toJobParameters();
-            //TODO Consider our own check (since adding unique JobParameter) to see if related-job is running and throw "already running"
-            final Step step = this.stepBuilders.get(stepName).tasklet(tasklet).allowStartIfComplete(true).build();
-            final Job job = this.jobBuilders.get(jobName).start(step).build();
-            exec = this.jobLauncher.run(job, jobParameters);
-        } catch (final JobExecutionAlreadyRunningException e) {
-            throw new WebApplicationException(Response.status(Status.CONFLICT).entity(whitelist(e.getMessage())).build());
-        } catch (final JobInstanceAlreadyCompleteException e) {
-            throw new WebApplicationException(Response.status(Status.CONFLICT).entity(whitelist(e.getMessage())).build());
-        } catch (final Exception e) {
-            throw new WebApplicationException(Response.status(Status.INTERNAL_SERVER_ERROR).entity(whitelist(e.getMessage())).build());
+            Step step = this.stepBuilders.get(stepName).tasklet(tasklet).build();
+            Job job = this.jobBuilders.get(jobName).start(step).build();
+            JobExecution execution = this.jobLauncher.run(job, jobParameters);
+            return JobUtils.toJobExecutionResource(execution);
+        } catch (Exception e) {
+            throw new WebApplicationException(e, Response.status(Status.INTERNAL_SERVER_ERROR).entity(whitelist(e.getMessage())).build());
         }
-        return JobUtils.toJobExecutionResource(exec);
     }
 
-    private String getAuthorForTasklet(final String jobName) {
-        return WARM_CACHE.equals(jobName) ? SYSTEM_USER : security.getSubject();
+    private String getAuthorForTasklet(String jobName) {
+        return WARM_CACHE.equals(jobName) ? SYSTEM_USER : security.getSubject();
     }
 }
diff --git a/src/main/java/org/ohdsi/webapi/job/JobUtils.java b/src/main/java/org/ohdsi/webapi/job/JobUtils.java
index 7a3e109ab..916d72b81 100644
--- a/src/main/java/org/ohdsi/webapi/job/JobUtils.java
+++ b/src/main/java/org/ohdsi/webapi/job/JobUtils.java
@@ -32,8 +32,14 @@ public static JobExecutionResource toJobExecutionResource(final JobExecution jobExecution) {
         final JobExecutionResource execution = new JobExecutionResource(
                 toJobInstanceResource(jobExecution.getJobInstance()), jobExecution.getId());
         execution.setStatus(jobExecution.getStatus().name());
-        execution.setStartDate(Timestamp.valueOf(jobExecution.getStartTime()));
-        execution.setEndDate(Timestamp.valueOf(jobExecution.getEndTime()));
+
+        if (jobExecution.getStartTime() != null) {
+            execution.setStartDate(Timestamp.valueOf(jobExecution.getStartTime()));
+        }
+        if (jobExecution.getEndTime() != null) {
+            execution.setEndDate(Timestamp.valueOf(jobExecution.getEndTime()));
+        }
+
         execution.setExitStatus(jobExecution.getExitStatus().getExitCode());
         JobParameters jobParams = jobExecution.getJobParameters();
         if (jobParams != null) {
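// --- Illustrative sketch (not part of the patch) -----------------------------
// launch() above now wraps the JobLauncher call in a REQUIRES_NEW transaction so
// the BATCH_* bookkeeping commits independently of any caller transaction. The
// pattern in isolation; the body of the callback is a stand-in, not WebAPI code.
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.support.TransactionTemplate;

class RequiresNewSketch {

    static Long runInNewTransaction(PlatformTransactionManager transactionManager) {
        TransactionTemplate template = new TransactionTemplate(transactionManager);
        template.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);

        // Work done inside execute() is suspended from the surrounding
        // transaction and committed (or rolled back) on its own.
        return template.execute(status -> {
            Long jobExecutionId = 42L; // stand-in for jobLauncher.run(job, params)
            return jobExecutionId;
        });
    }
}
// -----------------------------------------------------------------------------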
diff --git a/src/main/java/org/ohdsi/webapi/service/JobService.java b/src/main/java/org/ohdsi/webapi/service/JobService.java
index 90eff1fae..930334ab0 100644
--- a/src/main/java/org/ohdsi/webapi/service/JobService.java
+++ b/src/main/java/org/ohdsi/webapi/service/JobService.java
@@ -5,14 +5,7 @@
 import org.ohdsi.webapi.job.JobInstanceResource;
 import org.ohdsi.webapi.job.JobTemplate;
 import org.ohdsi.webapi.job.JobUtils;
-import org.ohdsi.webapi.util.PreparedStatementRenderer;
-import org.springframework.batch.admin.service.SearchableJobExecutionDao;
-import org.springframework.batch.core.BatchStatus;
-import org.springframework.batch.core.Job;
-import org.springframework.batch.core.JobExecution;
-import org.springframework.batch.core.JobInstance;
-import org.springframework.batch.core.JobParameters;
-import org.springframework.batch.core.Step;
+import org.springframework.batch.core.*;
 import org.springframework.batch.core.explore.JobExplorer;
 import org.springframework.batch.core.launch.NoSuchJobException;
 import org.springframework.batch.core.repository.JobRepository;
@@ -21,12 +14,14 @@
 import org.springframework.batch.core.step.tasklet.StoppableTasklet;
 import org.springframework.batch.core.step.tasklet.Tasklet;
 import org.springframework.batch.core.step.tasklet.TaskletStep;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.dao.DataAccessException;
 import org.springframework.data.domain.Page;
 import org.springframework.data.domain.PageImpl;
 import org.springframework.data.domain.PageRequest;
 import org.springframework.jdbc.core.ResultSetExtractor;
 import org.springframework.stereotype.Component;
+import org.springframework.transaction.PlatformTransactionManager;
 import org.springframework.transaction.annotation.Transactional;
 
 import jakarta.ws.rs.DefaultValue;
@@ -36,241 +31,180 @@
 import jakarta.ws.rs.Produces;
 import jakarta.ws.rs.QueryParam;
 import jakarta.ws.rs.core.MediaType;
-import net.bytebuddy.description.annotation.AnnotationValue.Sort;
 
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
+import java.util.*;
 import java.util.function.Predicate;
 
-/**
- * REST Services related to working with the Spring Batch jobs
- *
- * @summary Jobs
- */
 @Path("/job/")
 @Component
-public class JobService extends AbstractDaoService {
+public class JobService {
 
-    private final JobExplorer jobExplorer;
+    private final JobExplorer jobExplorer;
+    private final JobRepository jobRepository;
+    private final JobTemplate jobTemplate;
 
-    private final SearchableJobExecutionDao jobExecutionDao;
+    private final Map<Long, Job> jobMap = new HashMap<>();
 
-    private final JobRepository jobRepository;
-
-    private final JobTemplate jobTemplate;
-
-    private Map<Long, Job> jobMap = new HashMap<>();
-
-    public JobService(JobExplorer jobExplorer, SearchableJobExecutionDao jobExecutionDao, JobRepository jobRepository, JobTemplate jobTemplate) {
-
-        this.jobExplorer = jobExplorer;
-        this.jobExecutionDao = jobExecutionDao;
-        this.jobRepository = jobRepository;
-        this.jobTemplate = jobTemplate;
-    }
+    public JobService(JobExplorer jobExplorer, JobRepository jobRepository, JobTemplate jobTemplate) {
+        this.jobExplorer = jobExplorer;
+        this.jobRepository = jobRepository;
+        this.jobTemplate = jobTemplate;
+    }
 
-    /**
-     * Get the job information by job ID
-     *
-     * @summary Get job by ID
-     * @param jobId The job ID
-     * @return The job information
-     */
-    @GET
-    @Path("{jobId}")
-    @Produces(MediaType.APPLICATION_JSON)
-    public JobInstanceResource findJob(@PathParam("jobId") final Long jobId) {
-        final JobInstance job = this.jobExplorer.getJobInstance(jobId);
-        if (job == null) {
-            return null;//TODO #8 conventions under review
+    @GET
+    @Path("{jobId}")
+    @Produces(MediaType.APPLICATION_JSON)
+    public JobInstanceResource findJob(@PathParam("jobId") final Long jobId) {
+        JobInstance job = jobExplorer.getJobInstance(jobId);
+        return job == null ? null : JobUtils.toJobInstanceResource(job);
     }
-        return JobUtils.toJobInstanceResource(job);
-    }
 
-    /**
-     * Get the job execution information by job type and name
-     *
-     * @summary Get job by name and type
-     * @param jobName The job name
-     * @param jobType The job type
-     * @return JobExecutionResource
-     */
     @GET
     @Path("/type/{jobType}/name/{jobName}")
     @Produces(MediaType.APPLICATION_JSON)
-    public JobExecutionResource findJobByName(@PathParam("jobName") final String jobName, @PathParam("jobType") final String jobType) {
-        final Optional<JobExecution> jobExecution = jobExplorer.findRunningJobExecutions(jobType).stream()
-                .filter(job -> jobName.equals(job.getJobParameters().getString(Constants.Params.JOB_NAME)))
-                .findFirst();
-        return jobExecution.isPresent() ? JobUtils.toJobExecutionResource(jobExecution.get()) : null;
+    public JobExecutionResource findJobByName(@PathParam("jobName") String jobName, @PathParam("jobType") String jobType) {
+        Optional<JobExecution> jobExecution = jobExplorer.findRunningJobExecutions(jobType).stream()
+                .filter(job -> jobName.equals(job.getJobParameters().getString(Constants.Params.JOB_NAME)))
+                .findFirst();
+        return jobExecution.map(JobUtils::toJobExecutionResource).orElse(null);
     }
 
-    /**
-     * Get the job execution information by execution ID and job ID
-     *
-     * @summary Get job by job ID and execution ID
-     * @param jobId The job ID
-     * @param executionId The execution ID
-     * @return JobExecutionResource
-     */
-    @GET
-    @Path("{jobId}/execution/{executionId}")
-    @Produces(MediaType.APPLICATION_JSON)
-    public JobExecutionResource findJobExecution(@PathParam("jobId") final Long jobId,
-                                                 @PathParam("executionId") final Long executionId) {
-        return service(jobId, executionId);
-    }
-
-    /**
-     * Find job execution by execution ID
-     *
-     * @summary Get job by execution ID
-     * @param executionId The job execution ID
-     * @return JobExecutionResource
-     */
-    @GET
-    @Path("/execution/{executionId}")
-    @Produces(MediaType.APPLICATION_JSON)
-    public JobExecutionResource findJobExecution(@PathParam("executionId") final Long executionId) {
-        return service(null, executionId);
-    }
-
-    private JobExecutionResource service(final Long jobId, final Long executionId) {
-        final JobExecution exec = this.jobExplorer.getJobExecution(executionId);
-        if ((exec == null) || ((jobId != null) && !jobId.equals(exec.getJobId()))) {
-            return null;//TODO #8 conventions under review
+    @GET
+    @Path("{jobId}/execution/{executionId}")
+    @Produces(MediaType.APPLICATION_JSON)
+    public JobExecutionResource findJobExecution(@PathParam("jobId") Long jobId, @PathParam("executionId") Long executionId) {
+        return service(jobId, executionId);
     }
-        return JobUtils.toJobExecutionResource(exec);
-    }
 
-    /**
-     * Get job names (unique names). Note: this path (GET /job) should really
-     * return pages of job instances. This could be implemented should the need
-     * arise. See {@link JobService#list(String, Integer, Integer)} to obtain
-     * executions and filter by job name.
-     *
-     * @summary Get list of jobs
-     * @return A list of jobs
-     */
-    @GET
-    @Produces(MediaType.APPLICATION_JSON)
-    public List<String> findJobNames() {
-        return this.jobExplorer.getJobNames();
-    }
-
-    /**
-     * Variation of spring-batch-admin support:
-     * org.springframework.batch.admin.web.BatchJobExecutionsController.
-     * <p>
-     * Return a paged collection of job executions. Filter for a given job.
-     * Returned in pages.
-     *
-     * @summary Get job executions with filters
-     * @param jobName name of the job
-     * @param pageIndex start index for the job execution list
-     * @param pageSize page size for the list
-     * @param comprehensivePage boolean if true returns a comprehensive resultset
-     *                          as a page (i.e. pageRequest(0,resultset.size()))
-     * @return collection of JobExecutionInfo
-     * @throws NoSuchJobException
-     */
-    @GET
-    @Path("/execution")
-    @Produces(MediaType.APPLICATION_JSON)
-    public Page<JobExecutionResource> list(@QueryParam("jobName") final String jobName,
-                                           @DefaultValue("0") @QueryParam("pageIndex") final Integer pageIndex,
-                                           @DefaultValue("20") @QueryParam("pageSize") final Integer pageSize,
-                                           @QueryParam("comprehensivePage") boolean comprehensivePage)
-            throws NoSuchJobException {
+    @GET
+    @Path("/execution/{executionId}")
+    @Produces(MediaType.APPLICATION_JSON)
+    public JobExecutionResource findJobExecutionResource(@PathParam("executionId") Long executionId) {
+        return service(null, executionId);
    }
 
-        List<JobExecutionResource> resources = null;
+    public JobExecution findJobExecution(Long executionId) {
+        return jobExplorer.getJobExecution(executionId);
    }
 
-        if (comprehensivePage) {
-            String sqlPath = "/resources/job/sql/jobExecutions.sql";
-            String tqName = "ohdsi_schema";
-            String tqValue = getOhdsiSchema();
-            PreparedStatementRenderer psr = new PreparedStatementRenderer(null, sqlPath, tqName, tqValue);
-            resources = getJdbcTemplate().query(psr.getSql(), psr.getSetter(), new ResultSetExtractor<List<JobExecutionResource>>() {
-                @Override
-                public List<JobExecutionResource> extractData(ResultSet rs) throws SQLException, DataAccessException {
-                    return JobUtils.toJobExecutionResource(rs);
+    private JobExecutionResource service(Long jobId, Long executionId) {
+        JobExecution exec = jobExplorer.getJobExecution(executionId);
+        if (exec == null || (jobId != null && !jobId.equals(exec.getJobId()))) {
+            return null;
         }
-            });
-            return new PageImpl<>(resources, PageRequest.of(0, pageSize), resources.size());
-        } else {
-            resources = new ArrayList<>();
-            for (final JobExecution jobExecution : (jobName == null ? this.jobExecutionDao.getJobExecutions(pageIndex,
-                    pageSize) : this.jobExecutionDao.getJobExecutions(jobName, pageIndex, pageSize))) {
-                resources.add(JobUtils.toJobExecutionResource(jobExecution));
-            }
-            return new PageImpl<>(resources, PageRequest.of(pageIndex, pageSize),
-                    this.jobExecutionDao.countJobExecutions());
+        return JobUtils.toJobExecutionResource(exec);
    }
-        }
-
-    public void stopJob(JobExecution jobExecution, Job job) {
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    public List<String> findJobNames() {
+        return jobExplorer.getJobNames();
+    }
 
-        if (Objects.nonNull(job)) {
-            jobExecution.getStepExecutions().stream()
-                    .filter(step -> step.getStatus().isRunning())
-                    .forEach(stepExec -> {
-                        Step step = ((StepLocator) job).getStep(stepExec.getStepName());
-                        if (step instanceof TaskletStep taskletStep) {
-                            Tasklet tasklet = taskletStep.getTasklet();
-                            if (tasklet instanceof StoppableTasklet stoppableTasklet) {
-                                StepSynchronizationManager.register(stepExec);
-                                stoppableTasklet.stop();
-                                StepSynchronizationManager.release();
-                            }
+    @GET
+    @Path("/execution")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Page<JobExecutionResource> list(
+            @QueryParam("jobName") String jobName,
+            @DefaultValue("0") @QueryParam("pageIndex") Integer pageIndex,
+            @DefaultValue("20") @QueryParam("pageSize") Integer pageSize
+    ) throws NoSuchJobException {
+        List<JobExecutionResource> resources = new ArrayList<>();
+        int offset = pageIndex * pageSize;
+
+        if (jobName == null) {
+            // Get all job names and fetch job instances and executions
+            List<String> jobNames = jobExplorer.getJobNames();
+            for (String name : jobNames) {
+                List<JobInstance> jobInstances = jobExplorer.findJobInstancesByJobName(name, 0, Integer.MAX_VALUE);
+                for (JobInstance instance : jobInstances) {
+                    resources.addAll(fetchJobExecutionResources(instance));
+                }
+            }
+        } else {
+            // Fetch job instances and executions for the given job name
+            List<JobInstance> jobInstances = jobExplorer.findJobInstancesByJobName(jobName, 0, Integer.MAX_VALUE);
+            for (JobInstance instance : jobInstances) {
+                resources.addAll(fetchJobExecutionResources(instance));
            }
-                    });
-        }
-        if (jobExecution.getEndTime() == null) {
-            jobExecution.setStatus(BatchStatus.STOPPING);
-            jobRepository.update(jobExecution);
-        }
-    }
+        }
 
-    public JobExecution getJobExecution(Long jobExecutionId) {
+        // Create a paginated result
+        int totalSize = resources.size();
+        int endIndex = Math.min(offset + pageSize, totalSize);
+        List<JobExecutionResource> paginatedResources = resources.subList(Math.min(offset, totalSize), endIndex);
+        return new PageImpl<>(paginatedResources, PageRequest.of(pageIndex, pageSize), totalSize);
+    }
 
-        return jobExplorer.getJobExecution(jobExecutionId);
-    }
+    /**
+     * Fetches job execution resources for a given job instance.
+     */
+    private List<JobExecutionResource> fetchJobExecutionResources(JobInstance jobInstance) {
+        List<JobExecutionResource> resources = new ArrayList<>();
+        List<JobExecution> executions = jobExplorer.getJobExecutions(jobInstance);
+        for (JobExecution execution : executions) {
+            resources.add(JobUtils.toJobExecutionResource(execution));
+        }
+        return resources;
+    }
 
-    public Job getRunningJob(Long jobExecutionId) {
-        return jobMap.get(jobExecutionId);
-    }
+    public void stopJob(JobExecution jobExecution, Job job) {
+        if (Objects.nonNull(job)) {
+            jobExecution.getStepExecutions().stream()
+                    .filter(step -> step.getStatus().isRunning())
+                    .forEach(stepExec -> {
+                        Step step = ((StepLocator) job).getStep(stepExec.getStepName());
+                        if (step instanceof TaskletStep taskletStep) {
+                            Tasklet tasklet = taskletStep.getTasklet();
+                            if (tasklet instanceof StoppableTasklet stoppableTasklet) {
+                                StepSynchronizationManager.register(stepExec);
+                                stoppableTasklet.stop();
+                                StepSynchronizationManager.release();
+                            }
+                        }
+                    });
+        }
+        if (jobExecution.getEndTime() == null) {
+            jobExecution.setStatus(BatchStatus.STOPPING);
+            jobRepository.update(jobExecution);
+        }
+    }
 
-    public void removeJob(Long jobExecutionId) {
+    public JobExecution getJobExecution(Long jobExecutionId) {
+        return jobExplorer.getJobExecution(jobExecutionId);
+    }
 
-        jobMap.remove(jobExecutionId);
-    }
+    public Job getRunningJob(Long jobExecutionId) {
+        return jobMap.get(jobExecutionId);
+    }
 
-    public JobExecutionResource runJob(Job job, JobParameters jobParameters) {
+    public void removeJob(Long jobExecutionId) {
+        jobMap.remove(jobExecutionId);
+    }
 
-        JobExecutionResource jobExecution = this.jobTemplate.launch(job, jobParameters);
-        jobMap.put(jobExecution.getExecutionId(), job);
-        return jobExecution;
-    }
+    public JobExecutionResource runJob(Job job, JobParameters jobParameters) {
+        JobExecutionResource jobExecution = jobTemplate.launch(job, jobParameters);
+        jobMap.put(jobExecution.getExecutionId(), job);
+        return jobExecution;
+    }
 
-    @Transactional
-    public void cancelJobExecution(Predicate<JobExecution> filterPredicate) {
-        jobExecutionDao.getRunningJobExecutions().stream()
+    @Transactional
+    public void cancelJobExecution(Predicate<JobExecution> filterPredicate) {
+        jobExplorer.getJobNames().stream()
+                .flatMap(jobName -> jobExplorer.findRunningJobExecutions(jobName).stream())
                 .filter(filterPredicate)
                 .findFirst()
                 .ifPresent(jobExecution -> {
-                    Job job = getRunningJob(jobExecution.getJobId());
-                    if (Objects.nonNull(job)) {
-                        stopJob(jobExecution, job);
-                    }
+                    Job job = getRunningJob(jobExecution.getJobId());
+                    if (Objects.nonNull(job)) {
+                        stopJob(jobExecution, job);
+                    }
                 });
-    }
+    }
+
 }
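// --- Illustrative sketch (not part of the patch) -----------------------------
// The /job/execution endpoint above now pages in memory over JobExplorer data
// rather than querying spring-batch-admin DAOs. A guarded version of that
// slicing, shown standalone (class and method names here are illustrative only):
import java.util.ArrayList;
import java.util.List;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.explore.JobExplorer;

class ExecutionPagingSketch {

    static List<JobExecution> page(JobExplorer jobExplorer, String jobName, int pageIndex, int pageSize) {
        List<JobExecution> all = new ArrayList<>();
        for (JobInstance instance : jobExplorer.findJobInstancesByJobName(jobName, 0, Integer.MAX_VALUE)) {
            all.addAll(jobExplorer.getJobExecutions(instance));
        }
        // Clamp both indices so an out-of-range page returns an empty list
        // instead of throwing IndexOutOfBoundsException.
        int from = Math.min(pageIndex * pageSize, all.size());
        int to = Math.min(from + pageSize, all.size());
        return all.subList(from, to);
    }
}
// -----------------------------------------------------------------------------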
diff --git a/src/main/resources/db/migration/postgresql/V1.0.0.1__schema-create_spring_batch.sql b/src/main/resources/db/migration/postgresql/V1.0.0.1__schema-create_spring_batch.sql
index d7fcde724..c0a2d8383 100644
--- a/src/main/resources/db/migration/postgresql/V1.0.0.1__schema-create_spring_batch.sql
+++ b/src/main/resources/db/migration/postgresql/V1.0.0.1__schema-create_spring_batch.sql
@@ -1,19 +1,12 @@
-DROP TABLE IF EXISTS ${ohdsiSchema}.BATCH_JOB_INSTANCE CASCADE;
-DROP TABLE IF EXISTS ${ohdsiSchema}.BATCH_JOB_EXECUTION CASCADE;
-DROP TABLE IF EXISTS ${ohdsiSchema}.BATCH_JOB_EXECUTION_PARAMS CASCADE;
-DROP TABLE IF EXISTS ${ohdsiSchema}.BATCH_STEP_EXECUTION CASCADE;
-DROP TABLE IF EXISTS ${ohdsiSchema}.BATCH_STEP_EXECUTION_CONTEXT CASCADE;
-DROP TABLE IF EXISTS ${ohdsiSchema}.BATCH_JOB_EXECUTION_CONTEXT CASCADE;
-
-CREATE TABLE IF NOT EXISTS ${ohdsiSchema}.BATCH_JOB_INSTANCE (
+CREATE TABLE ${ohdsiSchema}.BATCH_JOB_INSTANCE (
     JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY ,
     VERSION BIGINT ,
     JOB_NAME VARCHAR(100) NOT NULL,
     JOB_KEY VARCHAR(32) NOT NULL,
     constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
 ) ;
-commit;
-CREATE TABLE IF NOT EXISTS ${ohdsiSchema}.BATCH_JOB_EXECUTION (
+
+CREATE TABLE ${ohdsiSchema}.BATCH_JOB_EXECUTION (
     JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
     VERSION BIGINT ,
     JOB_INSTANCE_ID BIGINT NOT NULL,
@@ -29,7 +22,7 @@ CREATE TABLE IF NOT EXISTS ${ohdsiSchema}.BATCH_JOB_EXECUTION (
     references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
 ) ;
 
-CREATE TABLE IF NOT EXISTS ${ohdsiSchema}.BATCH_JOB_EXECUTION_PARAMS (
+CREATE TABLE ${ohdsiSchema}.BATCH_JOB_EXECUTION_PARAMS (
     JOB_EXECUTION_ID BIGINT NOT NULL ,
     TYPE_CD VARCHAR(6) NOT NULL ,
     KEY_NAME VARCHAR(100) NOT NULL ,
@@ -42,7 +35,7 @@ CREATE TABLE IF NOT EXISTS ${ohdsiSchema}.BATCH_JOB_EXECUTION_PARAMS (
     references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
 ) ;
 
-CREATE TABLE IF NOT EXISTS ${ohdsiSchema}.BATCH_STEP_EXECUTION (
+CREATE TABLE ${ohdsiSchema}.BATCH_STEP_EXECUTION (
     STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
     VERSION BIGINT NOT NULL,
     STEP_NAME VARCHAR(100) NOT NULL,
@@ -65,7 +58,7 @@ CREATE TABLE IF NOT EXISTS ${ohdsiSchema}.BATCH_STEP_EXECUTION (
     references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
 ) ;
 
-CREATE TABLE IF NOT EXISTS ${ohdsiSchema}.BATCH_STEP_EXECUTION_CONTEXT (
+CREATE TABLE ${ohdsiSchema}.BATCH_STEP_EXECUTION_CONTEXT (
     STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
     SHORT_CONTEXT VARCHAR(2500) NOT NULL,
     SERIALIZED_CONTEXT TEXT ,
@@ -73,7 +66,7 @@ CREATE TABLE IF NOT EXISTS ${ohdsiSchema}.BATCH_STEP_EXECUTION_CONTEXT (
     references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
 ) ;
 
-CREATE TABLE IF NOT EXISTS ${ohdsiSchema}.BATCH_JOB_EXECUTION_CONTEXT (
+CREATE TABLE ${ohdsiSchema}.BATCH_JOB_EXECUTION_CONTEXT (
     JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
     SHORT_CONTEXT VARCHAR(2500) NOT NULL,
     SERIALIZED_CONTEXT TEXT ,