From 29b40ffcf743f7ce326798246f25fea0bbbc7ddb Mon Sep 17 00:00:00 2001
From: Corneil du Plessis
Date: Mon, 13 May 2024 15:17:25 +0200
Subject: [PATCH] Added test for
 org.springframework.cloud.dataflow.server.service.TaskJobService.populateComposeTaskRunnerStatus.

Fix DefaultTaskJobServiceTests to use the just-created ids in subsequent methods.
Fix error in JdbcAggregateJobQueryDao where the schema target was hard-coded in a query parameter.
Remove logging of the SQL query.

Polished the tests in the PR:
- Used AssertJ instead of JUnit asserts.
- Polished the code that cleans up the database.
- Updated the populateCtrStatus test so that the status is checked for `FAILED` instead of only notNull.
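Note on the assertion changes: the test diff below replaces JUnit's assertTrue(collection.contains(...))
with AssertJ's assertThat(collection).contains(...). A minimal, self-contained sketch of that migration
pattern (the class and variable names here are illustrative, not taken from the patch):

    import static org.assertj.core.api.Assertions.assertThat;

    import java.util.List;

    class AssertjMigrationSketch {
        void verifyArguments(List<String> commandlineArguments) {
            // Before: assertTrue(commandlineArguments.contains("identifying.param(string)=testparam"));
            // After: the AssertJ form reads left to right and reports the actual list contents on failure.
            assertThat(commandlineArguments).contains("identifying.param(string)=testparam");
        }
    }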
---
 .../repository/JdbcAggregateJobQueryDao.java |   4 +-
 .../impl/DefaultTaskJobServiceTests.java     | 170 +++++++++++-------
 2 files changed, 112 insertions(+), 62 deletions(-)

diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/repository/JdbcAggregateJobQueryDao.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/repository/JdbcAggregateJobQueryDao.java
index e4e360667a..ec0ca0b581 100644
--- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/repository/JdbcAggregateJobQueryDao.java
+++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/repository/JdbcAggregateJobQueryDao.java
@@ -184,7 +184,7 @@ public class JdbcAggregateJobQueryDao implements AggregateJobQueryDao {
 			" where (select count(*) from AGGREGATE_TASK_EXECUTION_PARAMS where" +
 			" CT.TASK_EXECUTION_ID = TASK_EXECUTION_ID and" +
 			" CT.SCHEMA_TARGET = SCHEMA_TARGET and" +
-			" TASK_PARAM = '--spring.cloud.task.parent-schema-target=boot2') > 0" +
+			" TASK_PARAM = '--spring.cloud.task.parent-schema-target=:schemaTarget') > 0" +
 			" AND CT.PARENT_EXECUTION_ID = T.TASK_EXECUTION_ID) > 0";
 
 	private static final String FIND_JOB_BY_NAME_INSTANCE_ID = FIND_JOB_BY +
@@ -308,7 +308,7 @@ public void populateCtrStatus(Collection<AggregateTaskExecution> aggregateTaskEx
 				execution.setCtrTaskStatus(ctrStatus);
 			});
 		}
-		LOG.debug("updated {} ctr statuses", updated.get());
+		LOG.debug("updated {} ctr statuses", updated);
 	}
 
 	@Override
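For context on the query change above: the '--spring.cloud.task.parent-schema-target=...' TASK_PARAM it
matches is the launch argument the reworked test below seeds on every task execution it creates. A minimal
sketch of that seeding step (the wrapper class and method names are illustrative; the DAO call is the one
the test itself uses):

    import java.util.Collections;
    import java.util.Date;
    import java.util.List;

    import org.springframework.cloud.dataflow.schema.SchemaVersionTarget;
    import org.springframework.cloud.task.repository.TaskExecution;
    import org.springframework.cloud.task.repository.dao.TaskExecutionDao;

    class CtrStatusSeedingSketch {
        // Creates one task execution carrying the parent-schema-target argument that the
        // CTR-status query above looks up in AGGREGATE_TASK_EXECUTION_PARAMS.
        TaskExecution seed(TaskExecutionDao dao, SchemaVersionTarget target, String taskName) {
            List<String> args = Collections.singletonList(
                    "--spring.cloud.task.parent-schema-target=" + target.getName());
            return dao.createTaskExecution(taskName, new Date(), args, null);
        }
    }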
"JOB_EXECUTION"); + deleteTable(batchPrefix, "JOB_INSTANCE"); + this.jdbcTemplate.execute(String.format("INSERT INTO %s%s", taskPrefix, "EXECUTION (TASK_EXECUTION_ID, TASK_NAME) VALUES (0, 'myTask_ORIG');")); + } + + private void deleteTable(String prefix, String tableName) { + this.jdbcTemplate.execute(String.format("DELETE FROM %s%s", prefix, tableName)); } @Test public void testRestart() throws Exception { createBaseLaunchers(); - initializeJobs(true); - - this.taskJobService.restartJobExecution(jobInstanceCount, SchemaVersionTarget.defaultTarget().getName()); + Pair executionPair = initializeJobs(true, SchemaVersionTarget.defaultTarget()); + this.taskJobService.restartJobExecution(executionPair.getSecond().getId(), SchemaVersionTarget.defaultTarget().getName()); final ArgumentCaptor argument = ArgumentCaptor.forClass(AppDeploymentRequest.class); verify(this.taskLauncher, times(1)).launch(argument.capture()); AppDeploymentRequest appDeploymentRequest = argument.getAllValues().get(0); - assertTrue(appDeploymentRequest.getCommandlineArguments().contains("identifying.param(string)=testparam")); + assertThat(appDeploymentRequest.getCommandlineArguments()).contains("identifying.param(string)=testparam"); } @Test public void testRestartBoot3() throws Exception { - SchemaVersionTarget schemaVersionTarget = new SchemaVersionTarget("boot3", AppBootSchemaVersion.BOOT3, - "BOOT3_TASK_", "BOOT3_BATCH_", "H2"); + SchemaVersionTarget schemaVersionTarget = new SchemaVersionTarget("boot3", AppBootSchemaVersion.BOOT3,"BOOT3_TASK_", "BOOT3_BATCH_", "H2"); createBaseLaunchers(); - initializeJobs(true, schemaVersionTarget); - this.taskJobService.restartJobExecution(boot3JobInstanceCount, - SchemaVersionTarget.createDefault(AppBootSchemaVersion.BOOT3).getName()); + Pair executionPair = initializeJobs(true, schemaVersionTarget); + this.taskJobService.restartJobExecution(executionPair.getSecond().getId(), schemaVersionTarget.getName()); final ArgumentCaptor argument = ArgumentCaptor.forClass(AppDeploymentRequest.class); verify(this.taskLauncher, times(1)).launch(argument.capture()); AppDeploymentRequest appDeploymentRequest = argument.getAllValues().get(0); - assertTrue(appDeploymentRequest.getCommandlineArguments().contains("identifying.param=testparm,java.lang.String")); + assertThat(appDeploymentRequest.getCommandlineArguments()).contains("identifying.param=testparm,java.lang.String"); } @Test public void testRestartNoPlatform() { createBaseLaunchers(); - initializeJobs(false); + Pair executionPair = initializeJobs(false, SchemaVersionTarget.defaultTarget()); Exception exception = assertThrows(IllegalStateException.class, () -> { - this.taskJobService.restartJobExecution(jobInstanceCount, SchemaVersionTarget.defaultTarget().getName()); + JobExecution jobExecution = executionPair.getSecond(); + this.taskJobService.restartJobExecution(jobExecution.getId(), SchemaVersionTarget.defaultTarget().getName()); }); - assertTrue(exception.getMessage().contains("Did not find platform for taskName=[myJob_ORIG")); + TaskExecution execution = executionPair.getFirst(); + assertThat(exception.getMessage()).contains("Did not find platform for taskName=[" + execution.getTaskName() + "]"); } @Test public void testRestartOnePlatform() throws Exception { this.launcherRepository.save(new Launcher("demo", TaskPlatformFactory.LOCAL_PLATFORM_TYPE, this.taskLauncher)); - initializeJobs(false); - this.taskJobService.restartJobExecution(jobInstanceCount, SchemaVersionTarget.defaultTarget().getName()); + Pair executionPair = 
@@ -151,116 +159,148 @@ public void setup() {
         this.jobParameters = new JobParameters(jobParameterMap);
         this.jdbcTemplate = new JdbcTemplate(this.dataSource);
-        resetTaskTables("TASK_");
+        resetTables("TASK_", "BATCH_");
         initializeSuccessfulRegistry(this.appRegistry);
-        resetTaskTables("BOOT3_TASK_");
+        resetTables("BOOT3_TASK_", "BOOT3_BATCH_");
         reset(this.taskLauncher);
         when(this.taskLauncher.launch(any())).thenReturn("1234");
         clearLaunchers();
     }
 
-    private void resetTaskTables(String prefix) {
-        this.jdbcTemplate.execute("DELETE FROM " + prefix + "EXECUTION_PARAMS");
-        this.jdbcTemplate.execute("DELETE FROM " + prefix + "TASK_BATCH");
-        this.jdbcTemplate.execute("DELETE FROM " + prefix + "EXECUTION_METADATA");
-        this.jdbcTemplate.execute("DELETE FROM " + prefix + "EXECUTION;");
-        this.jdbcTemplate.execute("ALTER SEQUENCE " + prefix + "EXECUTION_METADATA_SEQ RESTART WITH 50");
-        this.jdbcTemplate.execute("ALTER SEQUENCE " + prefix + "SEQ RESTART WITH 1");
-        this.jdbcTemplate.execute("INSERT INTO " + prefix + "EXECUTION (TASK_EXECUTION_ID, TASK_NAME) VALUES (0, 'myTask_ORIG');");
+    private void resetTables(String taskPrefix, String batchPrefix) {
+        deleteTable(taskPrefix, "EXECUTION_PARAMS");
+        deleteTable(taskPrefix, "TASK_BATCH");
+        deleteTable(taskPrefix, "EXECUTION_METADATA");
+        deleteTable(taskPrefix, "EXECUTION");
+        this.jdbcTemplate.execute(String.format("ALTER SEQUENCE %s%s", taskPrefix, "EXECUTION_METADATA_SEQ RESTART WITH 50"));
+        this.jdbcTemplate.execute(String.format("ALTER SEQUENCE %s%s", taskPrefix, "SEQ RESTART WITH 1"));
+        deleteTable(batchPrefix, "STEP_EXECUTION_CONTEXT");
+        deleteTable(batchPrefix, "STEP_EXECUTION");
+        deleteTable(batchPrefix, "JOB_EXECUTION_CONTEXT");
+        deleteTable(batchPrefix, "JOB_EXECUTION_PARAMS");
+        deleteTable(batchPrefix, "JOB_EXECUTION");
+        deleteTable(batchPrefix, "JOB_INSTANCE");
+        this.jdbcTemplate.execute(String.format("INSERT INTO %s%s", taskPrefix, "EXECUTION (TASK_EXECUTION_ID, TASK_NAME) VALUES (0, 'myTask_ORIG');"));
+    }
+
+    private void deleteTable(String prefix, String tableName) {
+        this.jdbcTemplate.execute(String.format("DELETE FROM %s%s", prefix, tableName));
     }
 
     @Test
     public void testRestart() throws Exception {
         createBaseLaunchers();
-        initializeJobs(true);
-
-        this.taskJobService.restartJobExecution(jobInstanceCount, SchemaVersionTarget.defaultTarget().getName());
+        Pair<TaskExecution, JobExecution> executionPair = initializeJobs(true, SchemaVersionTarget.defaultTarget());
+        this.taskJobService.restartJobExecution(executionPair.getSecond().getId(), SchemaVersionTarget.defaultTarget().getName());
         final ArgumentCaptor<AppDeploymentRequest> argument = ArgumentCaptor.forClass(AppDeploymentRequest.class);
         verify(this.taskLauncher, times(1)).launch(argument.capture());
         AppDeploymentRequest appDeploymentRequest = argument.getAllValues().get(0);
-        assertTrue(appDeploymentRequest.getCommandlineArguments().contains("identifying.param(string)=testparam"));
+        assertThat(appDeploymentRequest.getCommandlineArguments()).contains("identifying.param(string)=testparam");
     }
 
     @Test
     public void testRestartBoot3() throws Exception {
-        SchemaVersionTarget schemaVersionTarget = new SchemaVersionTarget("boot3", AppBootSchemaVersion.BOOT3,
-                "BOOT3_TASK_", "BOOT3_BATCH_", "H2");
+        SchemaVersionTarget schemaVersionTarget = new SchemaVersionTarget("boot3", AppBootSchemaVersion.BOOT3, "BOOT3_TASK_", "BOOT3_BATCH_", "H2");
         createBaseLaunchers();
-        initializeJobs(true, schemaVersionTarget);
-        this.taskJobService.restartJobExecution(boot3JobInstanceCount,
-                SchemaVersionTarget.createDefault(AppBootSchemaVersion.BOOT3).getName());
+        Pair<TaskExecution, JobExecution> executionPair = initializeJobs(true, schemaVersionTarget);
+        this.taskJobService.restartJobExecution(executionPair.getSecond().getId(), schemaVersionTarget.getName());
         final ArgumentCaptor<AppDeploymentRequest> argument = ArgumentCaptor.forClass(AppDeploymentRequest.class);
         verify(this.taskLauncher, times(1)).launch(argument.capture());
         AppDeploymentRequest appDeploymentRequest = argument.getAllValues().get(0);
-        assertTrue(appDeploymentRequest.getCommandlineArguments().contains("identifying.param=testparm,java.lang.String"));
+        assertThat(appDeploymentRequest.getCommandlineArguments()).contains("identifying.param=testparm,java.lang.String");
     }
 
     @Test
     public void testRestartNoPlatform() {
         createBaseLaunchers();
-        initializeJobs(false);
+        Pair<TaskExecution, JobExecution> executionPair = initializeJobs(false, SchemaVersionTarget.defaultTarget());
         Exception exception = assertThrows(IllegalStateException.class, () -> {
-            this.taskJobService.restartJobExecution(jobInstanceCount, SchemaVersionTarget.defaultTarget().getName());
+            JobExecution jobExecution = executionPair.getSecond();
+            this.taskJobService.restartJobExecution(jobExecution.getId(), SchemaVersionTarget.defaultTarget().getName());
         });
-        assertTrue(exception.getMessage().contains("Did not find platform for taskName=[myJob_ORIG"));
+        TaskExecution execution = executionPair.getFirst();
+        assertThat(exception.getMessage()).contains("Did not find platform for taskName=[" + execution.getTaskName() + "]");
    }
 
     @Test
     public void testRestartOnePlatform() throws Exception {
         this.launcherRepository.save(new Launcher("demo", TaskPlatformFactory.LOCAL_PLATFORM_TYPE, this.taskLauncher));
-        initializeJobs(false);
-        this.taskJobService.restartJobExecution(jobInstanceCount, SchemaVersionTarget.defaultTarget().getName());
+        Pair<TaskExecution, JobExecution> executionPair = initializeJobs(false);
+        this.taskJobService.restartJobExecution(executionPair.getSecond().getId(), SchemaVersionTarget.defaultTarget().getName());
         final ArgumentCaptor<AppDeploymentRequest> argument = ArgumentCaptor.forClass(AppDeploymentRequest.class);
         verify(this.taskLauncher, times(1)).launch(argument.capture());
         AppDeploymentRequest appDeploymentRequest = argument.getAllValues().get(0);
-        assertTrue(appDeploymentRequest.getCommandlineArguments().contains("identifying.param(string)=testparam"));
+        assertThat(appDeploymentRequest.getCommandlineArguments()).contains("identifying.param(string)=testparam");
     }
 
"some-name-boot3" : "some-name"; - this.taskDefinitionRepository.save(new TaskDefinition(JOB_NAME_ORIG + jobInstanceCount, definitionName )); + String definition = JOB_NAME_ORIG + jobInstanceCount; + return initialiseJob(insertTaskExecutionMetadata, schemaVersionTarget, definition, definitionName); + + } + + private Pair initialiseJob(boolean insertTaskExecutionMetadata, + SchemaVersionTarget schemaVersionTarget, String definition, String definitionName) { + return initialiseJob(insertTaskExecutionMetadata, schemaVersionTarget, definition, definitionName, null); + } + + private Pair initialiseJob(boolean insertTaskExecutionMetadata, + SchemaVersionTarget schemaVersionTarget, String definition, String definitionName, Long parentId) { + this.taskDefinitionRepository.save(new TaskDefinition(definitionName, definition)); JobRepository jobRepository = jobRepositoryContainer.get(schemaVersionTarget.getName()); TaskBatchDao taskBatchDao = taskBatchDaoContainer.get(schemaVersionTarget.getName()); TaskExecutionDao taskExecutionDao = taskExecutionDaoContainer.get(schemaVersionTarget.getName()); - createSampleJob( - jobRepository, - taskBatchDao, - taskExecutionDao, - JOB_NAME_ORIG + jobInstanceCount, - BatchStatus.FAILED, - insertTaskExecutionMetadata, - schemaVersionTarget - ); - if(AppBootSchemaVersion.BOOT2.equals(schemaVersionTarget.getSchemaVersion())) { - jobInstanceCount++; - } - else { - boot3JobInstanceCount++; - } + Pair jobExecutionPair = createSampleJob(jobRepository, taskBatchDao, + taskExecutionDao, definitionName, BatchStatus.FAILED, insertTaskExecutionMetadata, schemaVersionTarget, + parentId + ); + return jobExecutionPair; } - private void createSampleJob( + private Pair createSampleJob( JobRepository jobRepository, TaskBatchDao taskBatchDao, TaskExecutionDao taskExecutionDao, String jobName, BatchStatus status, boolean insertTaskExecutionMetadata, - SchemaVersionTarget schemaVersionTarget + SchemaVersionTarget schemaVersionTarget, Long parentId ) { JobInstance instance = jobRepository.createJobInstance(jobName, new JobParameters()); - - TaskExecution taskExecution = taskExecutionDao.createTaskExecution(jobName, new Date(), Collections.emptyList(), null); + jobInstanceCount++; + TaskExecution taskExecution = parentId != null + ? 
-    private void createSampleJob(
+    private Pair<TaskExecution, JobExecution> createSampleJob(
             JobRepository jobRepository,
             TaskBatchDao taskBatchDao,
             TaskExecutionDao taskExecutionDao,
             String jobName,
             BatchStatus status,
             boolean insertTaskExecutionMetadata,
-            SchemaVersionTarget schemaVersionTarget
+            SchemaVersionTarget schemaVersionTarget, Long parentId
     ) {
         JobInstance instance = jobRepository.createJobInstance(jobName, new JobParameters());
-
-        TaskExecution taskExecution = taskExecutionDao.createTaskExecution(jobName, new Date(), Collections.emptyList(), null);
+        jobInstanceCount++;
+        TaskExecution taskExecution = parentId != null
+                ? taskExecutionDao.createTaskExecution(jobName, new Date(), Collections.singletonList("--spring.cloud.task.parent-schema-target=" + schemaVersionTarget.getName()), null, parentId)
+                : taskExecutionDao.createTaskExecution(jobName, new Date(), Collections.singletonList("--spring.cloud.task.parent-schema-target=" + schemaVersionTarget.getName()), null);
         JobExecution jobExecution;
         JdbcTemplate template = new JdbcTemplate(this.dataSource);
"0" : "1", status.toString()); + jobExecution.setExitStatus(exitStatus); jobRepository.update(jobExecution); + return Pair.of(taskExecution, jobExecution); } private void clearLaunchers() { @@ -307,10 +352,15 @@ private void createBaseLaunchers() { } private static void initializeSuccessfulRegistry(AppRegistryService appRegistry) { - when(appRegistry.find(eq("some-name"), any(ApplicationType.class))).thenReturn( - new AppRegistration("some-name", ApplicationType.task, URI.create("https://helloworld"))); - when(appRegistry.find(eq("some-name-boot3"), any(ApplicationType.class))).thenReturn( - new AppRegistration("some-name-boot3", ApplicationType.task, "", URI.create("https://helloworld"), URI.create("https://helloworld"), AppBootSchemaVersion.fromBootVersion("3"))); + AppRegistration someName = new AppRegistration("some-name", ApplicationType.task, URI.create("https://helloworld")); + when(appRegistry.find(eq("some-name"), any(ApplicationType.class))).thenReturn(someName); + AppRegistration someNameBoot3 = new AppRegistration("some-name-boot3", ApplicationType.task, "", URI.create("https://helloworld"), URI.create("https://helloworld"), AppBootSchemaVersion.fromBootVersion("3")); + when(appRegistry.find(eq("some-name-boot3"), any(ApplicationType.class))).thenReturn(someNameBoot3); + AppRegistration myJobOrig = new AppRegistration("myJob_ORIG", ApplicationType.task, URI.create("https://myjob")); + AppRegistration myJobOrigBoot3 = new AppRegistration("myJob_ORIG", ApplicationType.task, "3.0.0", URI.create("https://myjob"), URI.create("https:/myjob/metadata"), AppBootSchemaVersion.fromBootVersion("3")); + when(appRegistry.find(contains("myJob_ORIG"), any(ApplicationType.class), eq("3.0.0"))).thenReturn(myJobOrigBoot3); + when(appRegistry.find(contains("myJob_ORIG"), any(ApplicationType.class))).thenReturn(myJobOrig); + try { when(appRegistry.getAppResource(any())).thenReturn(new FileUrlResource("src/test/resources/apps/foo-task")); }