Skip to content

Commit

Permalink
Completed refactoring of jobs to Spring Batch 5, reverted changes to …
Browse files Browse the repository at this point in the history
…migration script
  • Loading branch information
spinsysmpeterson committed Dec 18, 2024
1 parent f0de900 commit 9bbf929
Show file tree
Hide file tree
Showing 7 changed files with 283 additions and 390 deletions.
140 changes: 36 additions & 104 deletions src/main/java/org/ohdsi/webapi/JobConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,56 +2,42 @@

import javax.sql.DataSource;

import jakarta.annotation.PostConstruct;
import org.apache.commons.lang3.StringUtils;
import org.ohdsi.webapi.audittrail.listeners.AuditTrailJobListener;
import org.ohdsi.webapi.common.generation.AutoremoveJobListener;
import org.ohdsi.webapi.common.generation.CancelJobListener;
import org.ohdsi.webapi.job.JobTemplate;
import org.ohdsi.webapi.shiro.management.Security;
import org.ohdsi.webapi.util.ManagedThreadPoolTaskExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.admin.service.JdbcSearchableJobExecutionDao;
import org.springframework.batch.admin.service.JdbcSearchableJobInstanceDao;
import org.springframework.batch.admin.service.SearchableJobExecutionDao;
import org.springframework.batch.admin.service.SearchableJobInstanceDao;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.support.DefaultBatchConfiguration;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.explore.support.JobExplorerFactoryBean;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.Primary;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
@EnableBatchProcessing
//@DependsOn({"batchDatabaseInitializer"})
public class JobConfig extends DefaultBatchConfiguration {
public class JobConfig {

private static final Logger log = LoggerFactory.getLogger(JobConfig.class);

@Value("${spring.batch.repository.tableprefix}")
private String tablePrefix;

@Value("${spring.batch.repository.isolationLevelForCreate}")
private String isolationLevelForCreate;

@Value("${spring.batch.taskExecutor.corePoolSize}")
private Integer corePoolSize;

Expand All @@ -61,128 +47,74 @@ public class JobConfig extends DefaultBatchConfiguration {
@Value("${spring.batch.taskExecutor.queueCapacity}")
private Integer queueCapacity;

@Value("${spring.batch.taskExecutor.threadGroupName}")
private String threadGroupName;

@Value("${spring.batch.taskExecutor.threadNamePrefix}")
private String threadNamePrefix;

@Autowired
private DataSource dataSource;
private DataSource primaryDataSource;

@Autowired
private AuditTrailJobListener auditTrailJobListener;

@Bean
String batchTablePrefix() {
return this.tablePrefix;
}

@Bean
TaskExecutor taskExecutor() {
final ThreadPoolTaskExecutor taskExecutor = new ManagedThreadPoolTaskExecutor();
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(corePoolSize);
taskExecutor.setMaxPoolSize(maxPoolSize);
taskExecutor.setQueueCapacity(queueCapacity);
if (StringUtils.isNotBlank(threadGroupName)) {
taskExecutor.setThreadGroupName(threadGroupName);
}
if (StringUtils.isNotBlank(threadNamePrefix)) {
taskExecutor.setThreadNamePrefix(threadNamePrefix);
}
taskExecutor.afterPropertiesSet();
taskExecutor.setThreadNamePrefix(threadNamePrefix);
taskExecutor.initialize();
return taskExecutor;
}

@Bean
public PlatformTransactionManager transactionManager() {
return new ResourcelessTransactionManager();
return new DataSourceTransactionManager(primaryDataSource);
}

@Bean
public JobRepository jobRepository() {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(dataSource);
factory.setIsolationLevelForCreate(isolationLevelForCreate);
factory.setDataSource(primaryDataSource);
factory.setTransactionManager(transactionManager());
factory.setTablePrefix(tablePrefix);
factory.setTransactionManager(new ResourcelessTransactionManager());
factory.setValidateTransactionState(false);
try {
factory.afterPropertiesSet();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
try {
return factory.getObject();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
}

@Bean
public JobLauncher jobLauncher() {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository());
jobLauncher.setTaskExecutor(taskExecutor());
try {
jobLauncher.afterPropertiesSet();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return jobLauncher;
factory.afterPropertiesSet();
return factory.getObject();
} catch (Exception e) {
throw new IllegalStateException("Could not initialize JobRepository", e);
}
}

@Bean
public JobExplorer jobExplorer() {
JobExplorerFactoryBean jobExplorerFactoryBean = new JobExplorerFactoryBean();
jobExplorerFactoryBean.setDataSource(dataSource);
jobExplorerFactoryBean.setTablePrefix(tablePrefix);
jobExplorerFactoryBean.setTransactionManager(getTransactionManager());
try {
jobExplorerFactoryBean.afterPropertiesSet();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
try {
return jobExplorerFactoryBean.getObject();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
public JobExplorer jobExplorer() throws Exception {
JobExplorerFactoryBean factory = new JobExplorerFactoryBean();
factory.setDataSource(primaryDataSource);
factory.setTransactionManager(transactionManager());
factory.setTablePrefix(tablePrefix);
factory.afterPropertiesSet();
return factory.getObject();
}

@Bean
JobTemplate jobTemplate(final JobLauncher jobLauncher, final JobBuilderFactory jobBuilders,
final StepBuilderFactory stepBuilders, final Security security) {
public JobTemplate jobTemplate(JobLauncher jobLauncher, JobBuilderFactory jobBuilders,
StepBuilderFactory stepBuilders, Security security) {
return new JobTemplate(jobLauncher, jobBuilders, stepBuilders, security);
}

@Bean
SearchableJobExecutionDao searchableJobExecutionDao(DataSource dataSource) {
JdbcSearchableJobExecutionDao dao = new JdbcSearchableJobExecutionDao();
dao.setDataSource(dataSource);
dao.setTablePrefix(this.tablePrefix);
return dao;
}

@Bean
SearchableJobInstanceDao searchableJobInstanceDao(JdbcTemplate jdbcTemplate) {
JdbcSearchableJobInstanceDao dao = new JdbcSearchableJobInstanceDao();
dao.setJdbcTemplate(jdbcTemplate);
dao.setTablePrefix(this.tablePrefix);
return dao;
public JobLauncher jobLauncher(JobRepository jobRepository, TaskExecutor taskExecutor) throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(taskExecutor);
jobLauncher.afterPropertiesSet();
return jobLauncher;
}


@Primary

@Bean
JobBuilderFactory jobBuilders(JobRepository jobRepository) {
public JobBuilderFactory jobBuilders(JobRepository jobRepository) {
return new JobBuilderFactory(jobRepository) {
@Override
public JobBuilder get(String name) {
Expand All @@ -194,7 +126,7 @@ public JobBuilder get(String name) {
}

@Bean
StepBuilderFactory stepBuilders(JobRepository jobRepository) {
public StepBuilderFactory stepBuilders(JobRepository jobRepository) {
return new StepBuilderFactory(jobRepository);
}
}
}
64 changes: 43 additions & 21 deletions src/main/java/org/ohdsi/webapi/JobInvalidator.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@
import org.ohdsi.webapi.executionengine.entity.ExecutionEngineAnalysisStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.admin.service.SearchableJobExecutionDao;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionTemplate;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Calendar;
import java.util.Set;

@Component
@DependsOn("flyway")
Expand All @@ -27,38 +27,60 @@ public class JobInvalidator {

private final JobRepository jobRepository;
private final TransactionTemplate transactionTemplateRequiresNew;
private final SearchableJobExecutionDao jobExecutionDao;
private final JobExplorer jobExplorer;

public JobInvalidator(JobRepository repository, TransactionTemplate transactionTemplateRequiresNew,
SearchableJobExecutionDao jobExecutionDao) {
this.jobRepository = repository;
public JobInvalidator(JobRepository jobRepository,
TransactionTemplate transactionTemplateRequiresNew,
JobExplorer jobExplorer) {
this.jobRepository = jobRepository;
this.transactionTemplateRequiresNew = transactionTemplateRequiresNew;
this.jobExecutionDao = jobExecutionDao;
this.jobExplorer = jobExplorer;
}

/**
* Invalidates all running job executions during initialization.
*/
@PostConstruct
private void invalidateGenerations() {
transactionTemplateRequiresNew.execute(s -> {
jobExecutionDao.getRunningJobExecutions().forEach(this::invalidationJobExecution);
transactionTemplateRequiresNew.execute(status -> {
// Retrieve all running job executions for all job names
jobExplorer.getJobNames().forEach(jobName -> {
Set<JobExecution> runningJobs = jobExplorer.findRunningJobExecutions(jobName);
runningJobs.forEach(this::invalidateJobExecution);
});
return null;
});
}

/**
* Invalidates a specific job execution based on an analysis status.
*
* @param executionEngineAnalysisStatus The status containing the job execution ID.
*/
@Transactional
public void invalidateJobExecutionById(ExecutionEngineAnalysisStatus executionEngineAnalysisStatus) {
JobExecution job = jobExecutionDao.getJobExecution(executionEngineAnalysisStatus.getExecutionEngineGeneration().getId());
if (job == null || job.getJobId() == null) {
log.error("Cannot invalidate job. There is no job for execution-engine-analysis-status with id = {}", executionEngineAnalysisStatus.getId());
Long executionId = executionEngineAnalysisStatus.getExecutionEngineGeneration().getId();
JobExecution jobExecution = jobExplorer.getJobExecution(executionId);

if (jobExecution == null || jobExecution.getJobId() == null) {
log.error("Cannot invalidate job. No job found for execution-engine-analysis-status with id = {}",
executionEngineAnalysisStatus.getId());
return;
}

invalidationJobExecution(job);

invalidateJobExecution(jobExecution);
}

public void invalidationJobExecution(JobExecution job) {
job.setStatus(BatchStatus.FAILED);
job.setExitStatus(new ExitStatus(ExitStatus.FAILED.getExitCode(), INVALIDATED_BY_SYSTEM_EXIT_MESSAGE));
job.setEndTime(LocalDateTime.ofInstant(Calendar.getInstance().getTime().toInstant(), ZoneId.systemDefault()));
jobRepository.update(job);
/**
* Marks a job execution as failed and updates its status in the repository.
*
* @param jobExecution The job execution to invalidate.
*/
public void invalidateJobExecution(JobExecution jobExecution) {
jobExecution.setStatus(BatchStatus.FAILED);
jobExecution.setExitStatus(new ExitStatus(ExitStatus.FAILED.getExitCode(), INVALIDATED_BY_SYSTEM_EXIT_MESSAGE));
jobExecution.setEndTime(LocalDateTime.now());
jobRepository.update(jobExecution);
log.info("Job execution with ID {} has been invalidated.", jobExecution.getId());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1228,8 +1228,8 @@ private void invalidateGenerations() {
getTransactionTemplateRequiresNew().execute(transactionStatus -> {
List<CcGenerationEntity> generations = findAllIncompleteGenerations();
generations.forEach(gen -> {
JobExecution job = jobService.getJobExecution(gen.getId());
jobInvalidator.invalidationJobExecution(job);
JobExecution job = jobService.findJobExecution(gen.getId());
jobInvalidator.invalidateJobExecution(job);
});
return null;
});
Expand Down
Loading

0 comments on commit 9bbf929

Please sign in to comment.