Skip to content

Commit

Permalink
Update Code to use JobRepository bean directly
Browse files Browse the repository at this point in the history
Since we will only support Boot 3, we do not need the JobRepositoryContainer
to retrieve Boot 3- or Boot 2-based JobRepositories.

Update test code to set the time for the local date time sample to
a default of midnight.

Allow services to use JobService and JobExplorer directly

Currently we use containers to allocate the JobService and JobExplorer based on the Boot version.

This is no longer necessary, so this PR removes these containers.

Update project to restore the test success percentage to its original state

After the updates below, tests that were passing started to fail —
some because of Batch 5 updates, others because of the removal
of some of the container classes.

Update code based on code review comments

Removed todo
Removed ExtendWith annotation
  • Loading branch information
cppwfs committed Feb 27, 2024
1 parent 39715e9 commit 9baf3a7
Show file tree
Hide file tree
Showing 33 changed files with 278 additions and 456 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.springframework.cloud.dataflow.schema.SchemaVersionTarget;
import org.springframework.cloud.dataflow.server.repository.DataflowTaskExecutionMetadataDao;
import org.springframework.cloud.dataflow.server.repository.DataflowTaskExecutionMetadataDaoContainer;
import org.springframework.cloud.dataflow.server.repository.JobRepositoryContainer;
import org.springframework.cloud.dataflow.server.repository.TaskBatchDaoContainer;
import org.springframework.cloud.dataflow.server.repository.TaskExecutionDaoContainer;
import org.springframework.cloud.task.batch.listener.TaskBatchDao;
Expand Down Expand Up @@ -83,7 +82,7 @@ public class JobExecutionsDocumentation extends BaseDocumentation {

private static boolean initialized;

private JobRepositoryContainer jobRepositoryContainer;
private JobRepository jobRepository;

private TaskExecutionDaoContainer daoContainer;

Expand Down Expand Up @@ -370,7 +369,7 @@ public void jobRestart() throws Exception {
private void initialize() {
this.daoContainer = context.getBean(TaskExecutionDaoContainer.class);
this.taskBatchDaoContainer = context.getBean(TaskBatchDaoContainer.class);
this.jobRepositoryContainer = context.getBean(JobRepositoryContainer.class);
this.jobRepository = context.getBean(JobRepository.class);
this.dataflowTaskExecutionMetadataDaoContainer = context.getBean(DataflowTaskExecutionMetadataDaoContainer.class);
this.aggregateExecutionSupport = context.getBean(AggregateExecutionSupport.class);
this.taskDefinitionReader = context.getBean(TaskDefinitionReader.class);
Expand All @@ -383,13 +382,12 @@ private void createJobExecution(String name, BatchStatus status) throws JobInsta
TaskExecution taskExecution = dao.createTaskExecution(name, LocalDateTime.now(), Collections.singletonList("--spring.cloud.data.flow.platformname=default"), null);
Map<String, JobParameter<?>> jobParameterMap = new HashMap<>();
JobParameters jobParameters = new JobParameters(jobParameterMap);
JobRepository jobRepository = this.jobRepositoryContainer.get(schemaVersionTarget.getName());
JobExecution jobExecution = jobRepository.createJobExecution(name, jobParameters);
JobExecution jobExecution = this.jobRepository.createJobExecution(name, jobParameters);
TaskBatchDao taskBatchDao = this.taskBatchDaoContainer.get(schemaVersionTarget.getName());
taskBatchDao.saveRelationship(taskExecution, jobExecution);
jobExecution.setStatus(status);
jobExecution.setStartTime(LocalDateTime.now());
jobRepository.update(jobExecution);
this.jobRepository.update(jobExecution);
final TaskManifest manifest = new TaskManifest();
manifest.setPlatformName("default");
DataflowTaskExecutionMetadataDao metadataDao = dataflowTaskExecutionMetadataDaoContainer.get(schemaVersionTarget.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.springframework.cloud.dataflow.aggregate.task.TaskDefinitionReader;
import org.springframework.cloud.dataflow.core.ApplicationType;
import org.springframework.cloud.dataflow.schema.SchemaVersionTarget;
import org.springframework.cloud.dataflow.server.repository.JobRepositoryContainer;
import org.springframework.cloud.dataflow.server.repository.TaskBatchDaoContainer;
import org.springframework.cloud.dataflow.server.repository.TaskExecutionDaoContainer;
import org.springframework.cloud.task.batch.listener.TaskBatchDao;
Expand Down Expand Up @@ -71,7 +70,7 @@ public class JobInstancesDocumentation extends BaseDocumentation {
private final static String JOB_NAME = "DOCJOB";

private static boolean initialized;
private JobRepositoryContainer jobRepositoryContainer;
private JobRepository jobRepository;
private TaskExecutionDaoContainer daoContainer;
private TaskBatchDaoContainer taskBatchDaoContainer;
private AggregateExecutionSupport aggregateExecutionSupport;
Expand Down Expand Up @@ -136,7 +135,7 @@ public void jobDisplayDetail() throws Exception {
private void initialize() {
this.taskDefinitionReader = context.getBean(TaskDefinitionReader.class);
this.aggregateExecutionSupport = context.getBean(AggregateExecutionSupport.class);
this.jobRepositoryContainer = context.getBean(JobRepositoryContainer.class);
this.jobRepository = context.getBean(JobRepository.class);
this.daoContainer = context.getBean(TaskExecutionDaoContainer.class);
this.taskBatchDaoContainer = context.getBean(TaskBatchDaoContainer.class);
}
Expand All @@ -145,7 +144,6 @@ private void createJobExecution(String name, BatchStatus status) throws JobInsta
SchemaVersionTarget schemaVersionTarget = this.aggregateExecutionSupport.findSchemaVersionTarget(name, taskDefinitionReader);
TaskExecutionDao dao = this.daoContainer.get(schemaVersionTarget.getName());
TaskExecution taskExecution = dao.createTaskExecution(name, LocalDateTime.now(), new ArrayList<>(), null);
JobRepository jobRepository = this.jobRepositoryContainer.get(schemaVersionTarget.getName());
JobExecution jobExecution = jobRepository.createJobExecution(name, new JobParameters());
TaskBatchDao taskBatchDao = this.taskBatchDaoContainer.get(schemaVersionTarget.getName());
taskBatchDao.saveRelationship(taskExecution, jobExecution);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.springframework.cloud.dataflow.aggregate.task.TaskDefinitionReader;
import org.springframework.cloud.dataflow.core.ApplicationType;
import org.springframework.cloud.dataflow.schema.SchemaVersionTarget;
import org.springframework.cloud.dataflow.server.repository.JobRepositoryContainer;
import org.springframework.cloud.dataflow.server.repository.TaskBatchDaoContainer;
import org.springframework.cloud.dataflow.server.repository.TaskExecutionDaoContainer;
import org.springframework.cloud.task.batch.listener.TaskBatchDao;
Expand Down Expand Up @@ -72,7 +71,7 @@ public class JobStepExecutionsDocumentation extends BaseDocumentation {

private static boolean initialized;

private JobRepositoryContainer jobRepositoryContainer;
private JobRepository jobRepository;

private TaskExecutionDaoContainer daoContainer;

Expand Down Expand Up @@ -171,7 +170,7 @@ public void stepProgress() throws Exception {

private void initialize() {
this.aggregateExecutionSupport = context.getBean(AggregateExecutionSupport.class);
this.jobRepositoryContainer = context.getBean(JobRepositoryContainer.class);
this.jobRepository = context.getBean(JobRepository.class);
this.daoContainer = context.getBean(TaskExecutionDaoContainer.class);
this.taskBatchDaoContainer = context.getBean(TaskBatchDaoContainer.class);
this.taskDefinitionReader = context.getBean(TaskDefinitionReader.class);
Expand All @@ -182,7 +181,6 @@ private void createJobExecution(String name, BatchStatus status) throws JobInsta
SchemaVersionTarget schemaVersionTarget = this.aggregateExecutionSupport.findSchemaVersionTarget(name, taskDefinitionReader);
TaskExecutionDao dao = this.daoContainer.get(schemaVersionTarget.getName());
TaskExecution taskExecution = dao.createTaskExecution(name, LocalDateTime.now(), new ArrayList<>(), null);
JobRepository jobRepository = this.jobRepositoryContainer.get(schemaVersionTarget.getName());
JobExecution jobExecution = jobRepository.createJobExecution(name, new JobParameters());
StepExecution stepExecution = new StepExecution(name + "_STEP", jobExecution, jobExecution.getId());
stepExecution.setId(null);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2006-2014 the original author or authors.
* Copyright 2006-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -60,7 +60,7 @@
*/
public class JdbcSearchableJobExecutionDao extends JdbcJobExecutionDao implements SearchableJobExecutionDao {

private static final String FIND_PARAMS_FROM_ID_5 = "SELECT JOB_EXECUTION_ID, PARAMETER_NAME, PARAMETER_TYPE, PARAMETER_VALUE, IDENTIFYING FROM %PREFIX%JOB_EXECUTION_PARAMS WHERE JOB_EXECUTION_ID = ?";
private static final String FIND_PARAMS_FROM_ID = "SELECT JOB_EXECUTION_ID, PARAMETER_NAME, PARAMETER_TYPE, PARAMETER_VALUE, IDENTIFYING FROM %PREFIX%JOB_EXECUTION_PARAMS WHERE JOB_EXECUTION_ID = ?";

private static final String GET_COUNT = "SELECT COUNT(1) from %PREFIX%JOB_EXECUTION";

Expand Down Expand Up @@ -94,28 +94,16 @@ public class JdbcSearchableJobExecutionDao extends JdbcJobExecutionDao implement

private static final String TASK_EXECUTION_ID_FILTER = "B.JOB_EXECUTION_ID = E.JOB_EXECUTION_ID AND B.TASK_EXECUTION_ID = ?";

private static final String FIND_JOB_EXECUTIONS_4 = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION, JOB_CONFIGURATION_LOCATION"
private static final String FIND_JOB_EXECUTIONS = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION"
+ " from %PREFIX%JOB_EXECUTION where JOB_INSTANCE_ID = ? order by JOB_EXECUTION_ID desc";

private static final String FIND_JOB_EXECUTIONS_5 = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION"
+ " from %PREFIX%JOB_EXECUTION where JOB_INSTANCE_ID = ? order by JOB_EXECUTION_ID desc";

private static final String GET_LAST_EXECUTION_4 = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION, JOB_CONFIGURATION_LOCATION"
private static final String GET_LAST_EXECUTION = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION"
+ " from %PREFIX%JOB_EXECUTION E where JOB_INSTANCE_ID = ? and JOB_EXECUTION_ID in (SELECT max(JOB_EXECUTION_ID) from %PREFIX%JOB_EXECUTION E2 where E2.JOB_INSTANCE_ID = ?)";

private static final String GET_LAST_EXECUTION_5 = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION"
+ " from %PREFIX%JOB_EXECUTION E where JOB_INSTANCE_ID = ? and JOB_EXECUTION_ID in (SELECT max(JOB_EXECUTION_ID) from %PREFIX%JOB_EXECUTION E2 where E2.JOB_INSTANCE_ID = ?)";

private static final String GET_RUNNING_EXECUTIONS_4 = "SELECT E.JOB_EXECUTION_ID, E.START_TIME, E.END_TIME, E.STATUS, E.EXIT_CODE, E.EXIT_MESSAGE, E.CREATE_TIME, E.LAST_UPDATED, E.VERSION, "
+ "E.JOB_INSTANCE_ID, E.JOB_CONFIGURATION_LOCATION from %PREFIX%JOB_EXECUTION E, %PREFIX%JOB_INSTANCE I where E.JOB_INSTANCE_ID=I.JOB_INSTANCE_ID and I.JOB_NAME=? and E.START_TIME is not NULL and E.END_TIME is NULL order by E.JOB_EXECUTION_ID desc";

private static final String GET_RUNNING_EXECUTIONS_5 = "SELECT E.JOB_EXECUTION_ID, E.START_TIME, E.END_TIME, E.STATUS, E.EXIT_CODE, E.EXIT_MESSAGE, E.CREATE_TIME, E.LAST_UPDATED, E.VERSION, "
private static final String GET_RUNNING_EXECUTIONS_BY_JOB_NAME = "SELECT E.JOB_EXECUTION_ID, E.START_TIME, E.END_TIME, E.STATUS, E.EXIT_CODE, E.EXIT_MESSAGE, E.CREATE_TIME, E.LAST_UPDATED, E.VERSION, "
+ "E.JOB_INSTANCE_ID from %PREFIX%JOB_EXECUTION E, %PREFIX%JOB_INSTANCE I where E.JOB_INSTANCE_ID=I.JOB_INSTANCE_ID and I.JOB_NAME=? and E.START_TIME is not NULL and E.END_TIME is NULL order by E.JOB_EXECUTION_ID desc";

private static final String GET_EXECUTION_BY_ID_4 = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION, JOB_CONFIGURATION_LOCATION"
+ " from %PREFIX%JOB_EXECUTION where JOB_EXECUTION_ID = ?";

private static final String GET_EXECUTION_BY_ID_5 = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION"
private static final String GET_EXECUTION_BY_ID = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION"
+ " from %PREFIX%JOB_EXECUTION where JOB_EXECUTION_ID = ?";

private static final String FROM_CLAUSE_TASK_TASK_BATCH = "%PREFIX%TASK_BATCH B";
Expand Down Expand Up @@ -160,15 +148,7 @@ public class JdbcSearchableJobExecutionDao extends JdbcJobExecutionDao implement

private DataSource dataSource;

private BatchVersion batchVersion;

public JdbcSearchableJobExecutionDao() {
this(BatchVersion.BATCH_4);
}

@SuppressWarnings("deprecation")
public JdbcSearchableJobExecutionDao(BatchVersion batchVersion) {
this.batchVersion = batchVersion;
conversionService = new DefaultConversionService();
conversionService.addConverter(new StringToDateConverter());
}
Expand Down Expand Up @@ -245,17 +225,17 @@ public List<JobExecution> findJobExecutions(JobInstance job) {
Assert.notNull(job, "Job cannot be null.");
Assert.notNull(job.getId(), "Job Id cannot be null.");

String sqlQuery = batchVersion.equals(BatchVersion.BATCH_4) ? FIND_JOB_EXECUTIONS_4 : FIND_JOB_EXECUTIONS_5;
return getJdbcTemplate().query(getQuery(sqlQuery), new JobExecutionRowMapper(batchVersion, job), job.getId());
String sqlQuery = FIND_JOB_EXECUTIONS;
return getJdbcTemplate().query(getQuery(sqlQuery), new JobExecutionRowMapper(job), job.getId());

}

@Override
public JobExecution getLastJobExecution(JobInstance jobInstance) {
Long id = jobInstance.getId();
String sqlQuery = batchVersion.equals(BatchVersion.BATCH_4) ? GET_LAST_EXECUTION_4 : GET_LAST_EXECUTION_5;
String sqlQuery = GET_LAST_EXECUTION;
List<JobExecution> executions = getJdbcTemplate().query(getQuery(sqlQuery),
new JobExecutionRowMapper(batchVersion, jobInstance), id, id);
new JobExecutionRowMapper(jobInstance), id, id);

Assert.state(executions.size() <= 1, "There must be at most one latest job execution");

Expand All @@ -270,18 +250,17 @@ public JobExecution getLastJobExecution(JobInstance jobInstance) {
@Override
public Set<JobExecution> findRunningJobExecutions(String jobName) {
Set<JobExecution> result = new HashSet<>();
String sqlQuery = batchVersion.equals(BatchVersion.BATCH_4) ? GET_RUNNING_EXECUTIONS_4
: GET_RUNNING_EXECUTIONS_5;
getJdbcTemplate().query(getQuery(sqlQuery), new JobExecutionRowMapper(batchVersion), jobName);
String sqlQuery = GET_RUNNING_EXECUTIONS_BY_JOB_NAME;
getJdbcTemplate().query(getQuery(sqlQuery), new JobExecutionRowMapper(), jobName);

return result;
}

@Override
public JobExecution getJobExecution(Long executionId) {
try {
String sqlQuery = batchVersion.equals(BatchVersion.BATCH_4) ? GET_EXECUTION_BY_ID_4 : GET_EXECUTION_BY_ID_5;
return getJdbcTemplate().queryForObject(getQuery(sqlQuery), new JobExecutionRowMapper(batchVersion),
String sqlQuery = GET_EXECUTION_BY_ID;
return getJdbcTemplate().queryForObject(getQuery(sqlQuery), new JobExecutionRowMapper(),
executionId);
}
catch (EmptyResultDataAccessException e) {
Expand Down Expand Up @@ -642,7 +621,7 @@ public JobExecutionWithStepCount mapRow(ResultSet rs, int rowNum) throws SQLExce
}

//TODO: Boot3x followup - need to handle LocalDateTime and possibly Integer
protected JobParameters getJobParametersBatch5(Long executionId) {
protected JobParameters getJobParameters(Long executionId) {
Map<String, JobParameter<?>> map = new HashMap<>();
RowCallbackHandler handler = rs -> {
String parameterName = rs.getString("PARAMETER_NAME");
Expand Down Expand Up @@ -686,21 +665,11 @@ else if (typedValue instanceof Date) {
}
};

getJdbcTemplate().query(getQuery(FIND_PARAMS_FROM_ID_5), handler, executionId);
getJdbcTemplate().query(getQuery(FIND_PARAMS_FROM_ID), handler, executionId);

return new JobParameters(map);
}

@Override
protected JobParameters getJobParameters(Long executionId) {
if (batchVersion == BatchVersion.BATCH_4) {
return super.getJobParameters(executionId);
}
else {
return getJobParametersBatch5(executionId);
}
}

JobExecution createJobExecutionFromResultSet(ResultSet rs, int rowNum) throws SQLException {
Long id = rs.getLong(1);
JobExecution jobExecution;
Expand All @@ -723,16 +692,13 @@ JobExecution createJobExecutionFromResultSet(ResultSet rs, int rowNum) throws SQ

private final class JobExecutionRowMapper implements RowMapper<JobExecution> {

private final BatchVersion batchVersion;

private JobInstance jobInstance;

public JobExecutionRowMapper(BatchVersion batchVersion) {
this.batchVersion = batchVersion;
public JobExecutionRowMapper() {

}

public JobExecutionRowMapper(BatchVersion batchVersion, JobInstance jobInstance) {
this.batchVersion = batchVersion;
public JobExecutionRowMapper(JobInstance jobInstance) {
this.jobInstance = jobInstance;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2009-2023 the original author or authors.
* Copyright 2009-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -43,7 +43,6 @@
import org.springframework.cloud.dataflow.schema.service.SchemaService;
import org.springframework.cloud.dataflow.server.repository.AggregateJobQueryDao;
import org.springframework.cloud.dataflow.server.repository.JdbcAggregateJobQueryDao;
import org.springframework.cloud.dataflow.server.service.JobServiceContainer;
import org.springframework.context.EnvironmentAware;
import org.springframework.core.env.Environment;
import org.springframework.jdbc.core.JdbcOperations;
Expand Down Expand Up @@ -90,7 +89,7 @@ public class SimpleJobServiceFactoryBean implements FactoryBean<JobService>, Ini

private PlatformTransactionManager transactionManager;

private JobServiceContainer jobServiceContainer;
private JobService jobService;

private SchemaService schemaService;

Expand Down Expand Up @@ -166,11 +165,11 @@ public void setTablePrefix(String tablePrefix) {
}

/**
* Sets the {@link JobServiceContainer} for the service.
* @param jobServiceContainer the JobServiceContainer for this service.
* Sets the {@link JobService} for the factory bean.
* @param jobService the JobService for this Factory Bean.
*/
public void setJobServiceContainer(JobServiceContainer jobServiceContainer) {
this.jobServiceContainer = jobServiceContainer;
public void setJobService(JobService jobService) {
this.jobService = jobService;
}

/**
Expand Down Expand Up @@ -264,8 +263,7 @@ protected SearchableJobInstanceDao createJobInstanceDao() throws Exception {
}

protected SearchableJobExecutionDao createJobExecutionDao() throws Exception {
BatchVersion batchVersion = BatchVersion.from(this.schemaVersionTarget);
JdbcSearchableJobExecutionDao dao = new JdbcSearchableJobExecutionDao(batchVersion);
JdbcSearchableJobExecutionDao dao = new JdbcSearchableJobExecutionDao();
dao.setDataSource(dataSource);
dao.setJobExecutionIncrementer(incrementerFactory.getIncrementer(databaseType, tablePrefix
+ "JOB_EXECUTION_SEQ"));
Expand Down Expand Up @@ -313,7 +311,7 @@ private int determineClobTypeToUse(String databaseType) {
}

protected AggregateJobQueryDao createAggregateJobQueryDao() throws Exception {
return new JdbcAggregateJobQueryDao(this.dataSource, this.schemaService, this.jobServiceContainer, this.environment);
return new JdbcAggregateJobQueryDao(this.dataSource, this.schemaService, this.jobService, this.environment);
}

/**
Expand Down
Loading

0 comments on commit 9baf3a7

Please sign in to comment.