Skip to content

Commit

Permalink
Added test for org.springframework.cloud.dataflow.server.service.Task…
Browse files Browse the repository at this point in the history
…JobService.populateComposeTaskRunnerStatus.

Fix DefaultTaskJobServiceTests to use just created ids in subsequent methods.
Fix error in JdbcAggregateJobQueryDao with fixed schema in parameter.

Remove logging of sql query.

Polished the tests in the PR

Used AssertJ instead of Junit Asserts
polished the code that cleaned up the database
Updated the populateCtrStatus so that the status was checked for `FAILED` vs notNull
  • Loading branch information
corneil authored and cppwfs committed May 13, 2024
1 parent fa65ce2 commit 29b40ff
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public class JdbcAggregateJobQueryDao implements AggregateJobQueryDao {
" where (select count(*) from AGGREGATE_TASK_EXECUTION_PARAMS where" +
" CT.TASK_EXECUTION_ID = TASK_EXECUTION_ID and" +
" CT.SCHEMA_TARGET = SCHEMA_TARGET and" +
" TASK_PARAM = '--spring.cloud.task.parent-schema-target=boot2') > 0" +
" TASK_PARAM = '--spring.cloud.task.parent-schema-target=:schemaTarget') > 0" +
" AND CT.PARENT_EXECUTION_ID = T.TASK_EXECUTION_ID) > 0";

private static final String FIND_JOB_BY_NAME_INSTANCE_ID = FIND_JOB_BY +
Expand Down Expand Up @@ -308,7 +308,7 @@ public void populateCtrStatus(Collection<AggregateTaskExecution> aggregateTaskEx
execution.setCtrTaskStatus(ctrStatus);
});
}
LOG.debug("updated {} ctr statuses", updated.get());
LOG.debug("updated {} ctr statuses", updated);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.sql.DataSource;

import org.junit.jupiter.api.BeforeEach;
Expand All @@ -45,13 +46,15 @@
import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.dataflow.aggregate.task.AggregateExecutionSupport;
import org.springframework.cloud.dataflow.aggregate.task.AggregateTaskExplorer;
import org.springframework.cloud.dataflow.aggregate.task.TaskDefinitionReader;
import org.springframework.cloud.dataflow.core.AppRegistration;
import org.springframework.cloud.dataflow.core.ApplicationType;
import org.springframework.cloud.dataflow.core.Launcher;
import org.springframework.cloud.dataflow.core.TaskDefinition;
import org.springframework.cloud.dataflow.core.TaskPlatformFactory;
import org.springframework.cloud.dataflow.registry.service.AppRegistryService;
import org.springframework.cloud.dataflow.schema.AggregateTaskExecution;
import org.springframework.cloud.dataflow.schema.AppBootSchemaVersion;
import org.springframework.cloud.dataflow.schema.SchemaVersionTarget;
import org.springframework.cloud.dataflow.server.configuration.JobDependencies;
Expand All @@ -68,11 +71,15 @@
import org.springframework.cloud.task.repository.TaskExecution;
import org.springframework.cloud.task.repository.dao.TaskExecutionDao;
import org.springframework.core.io.FileUrlResource;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.util.Pair;
import org.springframework.jdbc.core.JdbcTemplate;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.contains;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -102,8 +109,6 @@ public class DefaultTaskJobServiceTests {

private static long jobInstanceCount = 0;

private static long boot3JobInstanceCount = 0;

@Autowired
TaskDefinitionRepository taskDefinitionRepository;

Expand Down Expand Up @@ -139,6 +144,9 @@ public class DefaultTaskJobServiceTests {
@Autowired
AggregateExecutionSupport aggregateExecutionSupport;

@Autowired
AggregateTaskExplorer aggregateTaskExplorer;

private JobParameters jobParameters;

@Autowired
Expand All @@ -151,146 +159,183 @@ public void setup() {
this.jobParameters = new JobParameters(jobParameterMap);

this.jdbcTemplate = new JdbcTemplate(this.dataSource);
resetTaskTables("TASK_");
resetTables("TASK_", "BATCH_");
initializeSuccessfulRegistry(this.appRegistry);
resetTaskTables("BOOT3_TASK_");
resetTables("BOOT3_TASK_", "BOOT3_BATCH_");

reset(this.taskLauncher);
when(this.taskLauncher.launch(any())).thenReturn("1234");
clearLaunchers();
}

private void resetTaskTables(String prefix) {
this.jdbcTemplate.execute("DELETE FROM " + prefix + "EXECUTION_PARAMS");
this.jdbcTemplate.execute("DELETE FROM " + prefix + "TASK_BATCH");
this.jdbcTemplate.execute("DELETE FROM " + prefix + "EXECUTION_METADATA");
this.jdbcTemplate.execute("DELETE FROM " + prefix + "EXECUTION;");
this.jdbcTemplate.execute("ALTER SEQUENCE " + prefix + "EXECUTION_METADATA_SEQ RESTART WITH 50");
this.jdbcTemplate.execute("ALTER SEQUENCE " + prefix + "SEQ RESTART WITH 1");
this.jdbcTemplate.execute("INSERT INTO " + prefix + "EXECUTION (TASK_EXECUTION_ID, TASK_NAME) VALUES (0, 'myTask_ORIG');");
private void resetTables(String taskPrefix, String batchPrefix) {
deleteTable(taskPrefix, "EXECUTION_PARAMS");
deleteTable(taskPrefix, "TASK_BATCH");
deleteTable(taskPrefix, "EXECUTION_METADATA");
deleteTable(taskPrefix, "EXECUTION");
this.jdbcTemplate.execute(String.format("ALTER SEQUENCE %s%s", taskPrefix, "EXECUTION_METADATA_SEQ RESTART WITH 50"));
this.jdbcTemplate.execute(String.format("ALTER SEQUENCE %s%s", taskPrefix, "SEQ RESTART WITH 1"));
deleteTable(batchPrefix, "STEP_EXECUTION_CONTEXT");
deleteTable(batchPrefix, "STEP_EXECUTION");
deleteTable(batchPrefix, "JOB_EXECUTION_CONTEXT");
deleteTable(batchPrefix, "JOB_EXECUTION_PARAMS");
deleteTable(batchPrefix, "JOB_EXECUTION");
deleteTable(batchPrefix, "JOB_INSTANCE");
this.jdbcTemplate.execute(String.format("INSERT INTO %s%s", taskPrefix, "EXECUTION (TASK_EXECUTION_ID, TASK_NAME) VALUES (0, 'myTask_ORIG');"));
}

private void deleteTable(String prefix, String tableName) {
this.jdbcTemplate.execute(String.format("DELETE FROM %s%s", prefix, tableName));
}

@Test
public void testRestart() throws Exception {
createBaseLaunchers();
initializeJobs(true);

this.taskJobService.restartJobExecution(jobInstanceCount, SchemaVersionTarget.defaultTarget().getName());
Pair<TaskExecution, JobExecution> executionPair = initializeJobs(true, SchemaVersionTarget.defaultTarget());
this.taskJobService.restartJobExecution(executionPair.getSecond().getId(), SchemaVersionTarget.defaultTarget().getName());
final ArgumentCaptor<AppDeploymentRequest> argument = ArgumentCaptor.forClass(AppDeploymentRequest.class);
verify(this.taskLauncher, times(1)).launch(argument.capture());
AppDeploymentRequest appDeploymentRequest = argument.getAllValues().get(0);

assertTrue(appDeploymentRequest.getCommandlineArguments().contains("identifying.param(string)=testparam"));
assertThat(appDeploymentRequest.getCommandlineArguments()).contains("identifying.param(string)=testparam");
}

@Test
public void testRestartBoot3() throws Exception {
SchemaVersionTarget schemaVersionTarget = new SchemaVersionTarget("boot3", AppBootSchemaVersion.BOOT3,
"BOOT3_TASK_", "BOOT3_BATCH_", "H2");
SchemaVersionTarget schemaVersionTarget = new SchemaVersionTarget("boot3", AppBootSchemaVersion.BOOT3,"BOOT3_TASK_", "BOOT3_BATCH_", "H2");
createBaseLaunchers();
initializeJobs(true, schemaVersionTarget);
this.taskJobService.restartJobExecution(boot3JobInstanceCount,
SchemaVersionTarget.createDefault(AppBootSchemaVersion.BOOT3).getName());
Pair<TaskExecution, JobExecution> executionPair = initializeJobs(true, schemaVersionTarget);
this.taskJobService.restartJobExecution(executionPair.getSecond().getId(), schemaVersionTarget.getName());
final ArgumentCaptor<AppDeploymentRequest> argument = ArgumentCaptor.forClass(AppDeploymentRequest.class);
verify(this.taskLauncher, times(1)).launch(argument.capture());
AppDeploymentRequest appDeploymentRequest = argument.getAllValues().get(0);
assertTrue(appDeploymentRequest.getCommandlineArguments().contains("identifying.param=testparm,java.lang.String"));
assertThat(appDeploymentRequest.getCommandlineArguments()).contains("identifying.param=testparm,java.lang.String");
}

@Test
public void testRestartNoPlatform() {
createBaseLaunchers();
initializeJobs(false);
Pair<TaskExecution, JobExecution> executionPair = initializeJobs(false, SchemaVersionTarget.defaultTarget());
Exception exception = assertThrows(IllegalStateException.class, () -> {
this.taskJobService.restartJobExecution(jobInstanceCount, SchemaVersionTarget.defaultTarget().getName());
JobExecution jobExecution = executionPair.getSecond();
this.taskJobService.restartJobExecution(jobExecution.getId(), SchemaVersionTarget.defaultTarget().getName());
});
assertTrue(exception.getMessage().contains("Did not find platform for taskName=[myJob_ORIG"));
TaskExecution execution = executionPair.getFirst();
assertThat(exception.getMessage()).contains("Did not find platform for taskName=[" + execution.getTaskName() + "]");
}

@Test
public void testRestartOnePlatform() throws Exception {
this.launcherRepository.save(new Launcher("demo", TaskPlatformFactory.LOCAL_PLATFORM_TYPE, this.taskLauncher));

initializeJobs(false);
this.taskJobService.restartJobExecution(jobInstanceCount, SchemaVersionTarget.defaultTarget().getName());
Pair<TaskExecution, JobExecution> executionPair = initializeJobs(false);
this.taskJobService.restartJobExecution(executionPair.getSecond().getId(), SchemaVersionTarget.defaultTarget().getName());
final ArgumentCaptor<AppDeploymentRequest> argument = ArgumentCaptor.forClass(AppDeploymentRequest.class);
verify(this.taskLauncher, times(1)).launch(argument.capture());
AppDeploymentRequest appDeploymentRequest = argument.getAllValues().get(0);
assertTrue(appDeploymentRequest.getCommandlineArguments().contains("identifying.param(string)=testparam"));
assertThat(appDeploymentRequest.getCommandlineArguments()).contains("identifying.param(string)=testparam");
}

private void initializeJobs(boolean insertTaskExecutionMetadata) {
initializeJobs(insertTaskExecutionMetadata,
@Test
public void populateCtrStatus() {
Pair<TaskExecution, JobExecution> ctr = initialiseJob(true, SchemaVersionTarget.defaultTarget(), "a && b",
"a-b");
initialiseJob(true, SchemaVersionTarget.defaultTarget(), "a", "a", ctr.getFirst().getExecutionId());
initialiseJob(true, SchemaVersionTarget.createDefault(AppBootSchemaVersion.BOOT3), "b", "b",
ctr.getFirst().getExecutionId());
Page<AggregateTaskExecution> page = aggregateTaskExplorer.findAll(Pageable.ofSize(100));
assertThat(page.getContent().size()).isEqualTo(5);
AggregateTaskExecution ctrTask = page.stream()
.filter(aggregateTaskExecution -> aggregateTaskExecution.getTaskName().equals("a-b"))
.findFirst()
.orElse(null);
assertThat(ctrTask).isNotNull();
assertThat(ctrTask.getCtrTaskStatus()).isNull();
taskJobService.populateComposeTaskRunnerStatus(page.getContent());
assertThat(page.stream()).anyMatch(aggregateTaskExecution -> aggregateTaskExecution.getExecutionId() == ctrTask.getExecutionId());
assertThat(ctrTask.getCtrTaskStatus()).isEqualTo("FAILED");
}
private Pair<TaskExecution, JobExecution> initializeJobs(boolean insertTaskExecutionMetadata) {
return initializeJobs(insertTaskExecutionMetadata,
new SchemaVersionTarget("boot2", AppBootSchemaVersion.BOOT2, "TASK_",
"BATCH_", "H2"));
}
private void initializeJobs(boolean insertTaskExecutionMetadata, SchemaVersionTarget schemaVersionTarget) {
private Pair<TaskExecution, JobExecution> initializeJobs(boolean insertTaskExecutionMetadata, SchemaVersionTarget schemaVersionTarget) {
String definitionName = (AppBootSchemaVersion.BOOT3.equals(schemaVersionTarget.getSchemaVersion())) ?
"some-name-boot3" : "some-name";
this.taskDefinitionRepository.save(new TaskDefinition(JOB_NAME_ORIG + jobInstanceCount, definitionName ));
String definition = JOB_NAME_ORIG + jobInstanceCount;
return initialiseJob(insertTaskExecutionMetadata, schemaVersionTarget, definition, definitionName);

}

private Pair<TaskExecution, JobExecution> initialiseJob(boolean insertTaskExecutionMetadata,
SchemaVersionTarget schemaVersionTarget, String definition, String definitionName) {
return initialiseJob(insertTaskExecutionMetadata, schemaVersionTarget, definition, definitionName, null);
}

private Pair<TaskExecution, JobExecution> initialiseJob(boolean insertTaskExecutionMetadata,
SchemaVersionTarget schemaVersionTarget, String definition, String definitionName, Long parentId) {
this.taskDefinitionRepository.save(new TaskDefinition(definitionName, definition));
JobRepository jobRepository = jobRepositoryContainer.get(schemaVersionTarget.getName());
TaskBatchDao taskBatchDao = taskBatchDaoContainer.get(schemaVersionTarget.getName());
TaskExecutionDao taskExecutionDao = taskExecutionDaoContainer.get(schemaVersionTarget.getName());
createSampleJob(
jobRepository,
taskBatchDao,
taskExecutionDao,
JOB_NAME_ORIG + jobInstanceCount,
BatchStatus.FAILED,
insertTaskExecutionMetadata,
schemaVersionTarget
);
if(AppBootSchemaVersion.BOOT2.equals(schemaVersionTarget.getSchemaVersion())) {
jobInstanceCount++;
}
else {
boot3JobInstanceCount++;
}

Pair<TaskExecution, JobExecution> jobExecutionPair = createSampleJob(jobRepository, taskBatchDao,
taskExecutionDao, definitionName, BatchStatus.FAILED, insertTaskExecutionMetadata, schemaVersionTarget,
parentId
);
return jobExecutionPair;
}

private void createSampleJob(
private Pair<TaskExecution, JobExecution> createSampleJob(
JobRepository jobRepository,
TaskBatchDao taskBatchDao,
TaskExecutionDao taskExecutionDao,
String jobName,
BatchStatus status,
boolean insertTaskExecutionMetadata,
SchemaVersionTarget schemaVersionTarget
SchemaVersionTarget schemaVersionTarget, Long parentId
) {
JobInstance instance = jobRepository.createJobInstance(jobName, new JobParameters());

TaskExecution taskExecution = taskExecutionDao.createTaskExecution(jobName, new Date(), Collections.emptyList(), null);
jobInstanceCount++;
TaskExecution taskExecution = parentId != null
? taskExecutionDao.createTaskExecution(jobName, new Date(), Collections.singletonList("--spring.cloud.task.parent-schema-target=" + schemaVersionTarget.getName()), null, parentId)
: taskExecutionDao.createTaskExecution(jobName, new Date(), Collections.singletonList("--spring.cloud.task.parent-schema-target=" + schemaVersionTarget.getName()), null);
JobExecution jobExecution;
JdbcTemplate template = new JdbcTemplate(this.dataSource);

if (insertTaskExecutionMetadata) {
template.execute(String.format("INSERT INTO " + schemaVersionTarget.getTaskPrefix() + "EXECUTION_METADATA (ID, TASK_EXECUTION_ID, TASK_EXECUTION_MANIFEST) VALUES (%s, %s, '{\"taskDeploymentRequest\":{\"definition\":{\"name\":\"bd0917a\",\"properties\":{\"spring.datasource.username\":\"root\",\"spring.cloud.task.name\":\"bd0917a\",\"spring.datasource.url\":\"jdbc:mariadb://localhost:3306/task\",\"spring.datasource.driverClassName\":\"org.mariadb.jdbc.Driver\",\"spring.datasource.password\":\"password\"}},\"resource\":\"file:/Users/glennrenfro/tmp/batchdemo-0.0.1-SNAPSHOT.jar\",\"deploymentProperties\":{},\"commandlineArguments\":[\"run.id_long=1\",\"--spring.cloud.task.executionid=201\"]},\"platformName\":\"demo\"}')", taskExecution.getExecutionId(), taskExecution.getExecutionId()));
}
if(AppBootSchemaVersion.BOOT3.equals(schemaVersionTarget.getSchemaVersion())) {
jobExecution = new JobExecution(instance, 1L, this.jobParameters, "foo");
jobExecution = new JobExecution(instance, taskExecution.getExecutionId(), this.jobParameters, "foo");
jobExecution.setCreateTime(new Date());
jobExecution.setVersion(1);
Object[] jobExecutionParameters = new Object[] { 1, 1, new Date(), new Date(),

Object[] jobExecutionParameters = new Object[] { jobExecution.getId(), instance.getInstanceId(), new Date(), new Date(),
BatchStatus.COMPLETED, ExitStatus.COMPLETED,
ExitStatus.COMPLETED.getExitDescription(), 1, new Date(), new Date() };
Object[] jobExecutionParmParameters = new Object[] { 1, "identifying.param", "java.lang.String", "testparm", "Y"};
this.jdbcTemplate.update(SAVE_JOB_EXECUTION, jobExecutionParameters,
new int[] { Types.BIGINT, Types.BIGINT, Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR, Types.INTEGER, Types.TIMESTAMP, Types.TIMESTAMP });

Object[] jobExecutionParmParameters = new Object[] { jobExecution.getId(), "identifying.param", "java.lang.String", "testparm", "Y"};
this.jdbcTemplate.update(SAVE_JOB_EXECUTION_PARAM, jobExecutionParmParameters, new int[] { Types.BIGINT,
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.CHAR});
} else {
jobExecution = jobRepository.createJobExecution(instance,
this.jobParameters, null);
StepExecution stepExecution = new StepExecution("foo", jobExecution, 1L);
StepExecution stepExecution = new StepExecution("foo", jobExecution, jobExecution.getJobId());
stepExecution.setId(null);
jobRepository.add(stepExecution);
}
taskBatchDao.saveRelationship(taskExecution, jobExecution);
jobExecution.setStatus(status);
jobExecution.setStartTime(new Date());
ExitStatus exitStatus = new ExitStatus(BatchStatus.COMPLETED.equals(status.getBatchStatus()) ? "0" : "1", status.toString());
jobExecution.setExitStatus(exitStatus);
jobRepository.update(jobExecution);
return Pair.of(taskExecution, jobExecution);
}

private void clearLaunchers() {
Expand All @@ -307,10 +352,15 @@ private void createBaseLaunchers() {
}

private static void initializeSuccessfulRegistry(AppRegistryService appRegistry) {
when(appRegistry.find(eq("some-name"), any(ApplicationType.class))).thenReturn(
new AppRegistration("some-name", ApplicationType.task, URI.create("https://helloworld")));
when(appRegistry.find(eq("some-name-boot3"), any(ApplicationType.class))).thenReturn(
new AppRegistration("some-name-boot3", ApplicationType.task, "", URI.create("https://helloworld"), URI.create("https://helloworld"), AppBootSchemaVersion.fromBootVersion("3")));
AppRegistration someName = new AppRegistration("some-name", ApplicationType.task, URI.create("https://helloworld"));
when(appRegistry.find(eq("some-name"), any(ApplicationType.class))).thenReturn(someName);
AppRegistration someNameBoot3 = new AppRegistration("some-name-boot3", ApplicationType.task, "", URI.create("https://helloworld"), URI.create("https://helloworld"), AppBootSchemaVersion.fromBootVersion("3"));
when(appRegistry.find(eq("some-name-boot3"), any(ApplicationType.class))).thenReturn(someNameBoot3);
AppRegistration myJobOrig = new AppRegistration("myJob_ORIG", ApplicationType.task, URI.create("https://myjob"));
AppRegistration myJobOrigBoot3 = new AppRegistration("myJob_ORIG", ApplicationType.task, "3.0.0", URI.create("https://myjob"), URI.create("https:/myjob/metadata"), AppBootSchemaVersion.fromBootVersion("3"));
when(appRegistry.find(contains("myJob_ORIG"), any(ApplicationType.class), eq("3.0.0"))).thenReturn(myJobOrigBoot3);
when(appRegistry.find(contains("myJob_ORIG"), any(ApplicationType.class))).thenReturn(myJobOrig);

try {
when(appRegistry.getAppResource(any())).thenReturn(new FileUrlResource("src/test/resources/apps/foo-task"));
}
Expand Down

0 comments on commit 29b40ff

Please sign in to comment.