diff --git a/pom.xml b/pom.xml index a2ee37d1ce..f8c3df2436 100644 --- a/pom.xml +++ b/pom.xml @@ -70,7 +70,7 @@ spring-cloud-dataflow-shell spring-cloud-dataflow-shell-core spring-cloud-dataflow-completion - spring-cloud-skipper + spring-cloud-starter-dataflow-server spring-cloud-starter-dataflow-ui spring-cloud-dataflow-server diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/SimpleJobService.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/SimpleJobService.java index d71711648f..261a3acc5c 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/SimpleJobService.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/SimpleJobService.java @@ -225,9 +225,7 @@ public int stopAll() { Collection result = jobExecutionDao.getRunningJobExecutions(); for (JobExecution jobExecution : result) { try { - jobExecution.getStepExecutions().forEach(StepExecution::setTerminateOnly); - jobExecution.setStatus( BatchStatus.STOPPING); - jobRepository.update(jobExecution); + stopJobExecution(jobExecution); } catch (Exception e) { throw new IllegalArgumentException("The following JobExecutionId was not found: " + jobExecution.getId(), e); } @@ -238,14 +236,19 @@ public int stopAll() { @Override public JobExecution stop(Long jobExecutionId) throws NoSuchJobExecutionException, JobExecutionNotRunningException { - JobExecution jobExecution = getJobExecution(jobExecutionId); + return stopJobExecution(getJobExecution(jobExecutionId)); + } + + private JobExecution stopJobExecution(JobExecution jobExecution) throws JobExecutionNotRunningException{ if (!jobExecution.isRunning()) { throw new JobExecutionNotRunningException("JobExecution is not running and therefore cannot be stopped"); } - - logger.info("Stopping job execution: " + jobExecution); - - jobExecution.setStatus(BatchStatus.STOPPED); + // Indicate the execution should be stopped by setting it's status to + // 'STOPPING'. It is assumed that + // the step implementation will check this status at chunk boundaries. + logger.info("Stopping job execution: {}", jobExecution); + jobExecution.getStepExecutions().forEach(StepExecution::setTerminateOnly); + jobExecution.setStatus(BatchStatus.STOPPING); jobRepository.update(jobExecution); return jobExecution; diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractSimpleJobServiceTests.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractSimpleJobServiceTests.java index e5f5e3c3da..0ed92681de 100644 --- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractSimpleJobServiceTests.java +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractSimpleJobServiceTests.java @@ -28,6 +28,8 @@ import javax.sql.DataSource; import org.junit.jupiter.api.Test; +import org.springframework.batch.core.launch.JobExecutionNotRunningException; +import org.springframework.batch.core.launch.NoSuchJobExecutionException; import org.testcontainers.containers.JdbcDatabaseContainer; import org.springframework.batch.core.BatchStatus; @@ -191,13 +193,25 @@ void exceptionsShouldBeThrownIfRequestForNonExistingJobInstance() { @Test void stoppingJobExecutionShouldLeaveJobExecutionWithStatusOfStopping() throws Exception { - JobExecution jobExecution = createJobExecution(BASE_JOB_INST_NAME,true); - jobExecution = jobService.getJobExecution(jobExecution.getId()); - assertThat(jobExecution.isRunning()).isTrue(); - assertThat(jobExecution.getStatus()).isNotEqualTo(BatchStatus.STOPPING); + JobExecution jobExecution = createRunningJobExecution(BASE_JOB_INST_NAME); jobService.stop(jobExecution.getId()); + assertJobHasStopped(jobExecution); + } + + @Test + void stoppingAllJobExecutionsShouldLeaveJobExecutionsWithStatusOfStopping() throws Exception { + JobExecution jobExecutionOne = createRunningJobExecution(BASE_JOB_INST_NAME); + JobExecution jobExecutionTwo = createRunningJobExecution(BASE_JOB_INST_NAME+"_TWO"); + jobService.stop(jobExecutionOne.getId()); + assertJobHasStopped(jobExecutionOne); + jobService.stop(jobExecutionTwo.getId()); + assertJobHasStopped(jobExecutionTwo); + } + + private void assertJobHasStopped(JobExecution jobExecution) throws NoSuchJobExecutionException, JobExecutionNotRunningException { jobExecution = jobService.getJobExecution(jobExecution.getId()); - assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.STOPPED); + assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.STOPPING); + assertThat(jobExecution.isRunning()).isTrue(); } private void verifyJobInstance(long id, String name) throws Exception { @@ -221,9 +235,13 @@ private JobExecution createJobExecution(String name) throws Exception { return createJobExecution(name, BatchStatus.STARTING, false); } - private JobExecution createJobExecution(String name, boolean isRunning) + private JobExecution createRunningJobExecution(String name) throws Exception { - return createJobExecution(name, BatchStatus.STARTING, isRunning); + JobExecution jobExecution = createJobExecution(name, BatchStatus.STARTING, true); + jobExecution = jobService.getJobExecution(jobExecution.getId()); + assertThat(jobExecution.isRunning()).isTrue(); + assertThat(jobExecution.getStatus()).isNotEqualTo(BatchStatus.STOPPING); + return jobExecution; } private JobExecution createJobExecution(String name, BatchStatus batchStatus, boolean isRunning) throws Exception {