diff --git a/pom.xml b/pom.xml
index a2ee37d1ce..f8c3df2436 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,7 +70,7 @@
spring-cloud-dataflow-shell
spring-cloud-dataflow-shell-core
spring-cloud-dataflow-completion
- spring-cloud-skipper
+
spring-cloud-starter-dataflow-server
spring-cloud-starter-dataflow-ui
spring-cloud-dataflow-server
diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/SimpleJobService.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/SimpleJobService.java
index d71711648f..261a3acc5c 100644
--- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/SimpleJobService.java
+++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/SimpleJobService.java
@@ -225,9 +225,7 @@ public int stopAll() {
Collection result = jobExecutionDao.getRunningJobExecutions();
for (JobExecution jobExecution : result) {
try {
- jobExecution.getStepExecutions().forEach(StepExecution::setTerminateOnly);
- jobExecution.setStatus( BatchStatus.STOPPING);
- jobRepository.update(jobExecution);
+ stopJobExecution(jobExecution);
} catch (Exception e) {
throw new IllegalArgumentException("The following JobExecutionId was not found: " + jobExecution.getId(), e);
}
@@ -238,14 +236,19 @@ public int stopAll() {
@Override
public JobExecution stop(Long jobExecutionId) throws NoSuchJobExecutionException, JobExecutionNotRunningException {
- JobExecution jobExecution = getJobExecution(jobExecutionId);
+ return stopJobExecution(getJobExecution(jobExecutionId));
+ }
+
+ private JobExecution stopJobExecution(JobExecution jobExecution) throws JobExecutionNotRunningException{
if (!jobExecution.isRunning()) {
throw new JobExecutionNotRunningException("JobExecution is not running and therefore cannot be stopped");
}
-
- logger.info("Stopping job execution: " + jobExecution);
-
- jobExecution.setStatus(BatchStatus.STOPPED);
+ // Indicate the execution should be stopped by setting it's status to
+ // 'STOPPING'. It is assumed that
+ // the step implementation will check this status at chunk boundaries.
+ logger.info("Stopping job execution: {}", jobExecution);
+ jobExecution.getStepExecutions().forEach(StepExecution::setTerminateOnly);
+ jobExecution.setStatus(BatchStatus.STOPPING);
jobRepository.update(jobExecution);
return jobExecution;
diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractSimpleJobServiceTests.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractSimpleJobServiceTests.java
index e5f5e3c3da..0ed92681de 100644
--- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractSimpleJobServiceTests.java
+++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractSimpleJobServiceTests.java
@@ -28,6 +28,8 @@
import javax.sql.DataSource;
import org.junit.jupiter.api.Test;
+import org.springframework.batch.core.launch.JobExecutionNotRunningException;
+import org.springframework.batch.core.launch.NoSuchJobExecutionException;
import org.testcontainers.containers.JdbcDatabaseContainer;
import org.springframework.batch.core.BatchStatus;
@@ -191,13 +193,25 @@ void exceptionsShouldBeThrownIfRequestForNonExistingJobInstance() {
@Test
void stoppingJobExecutionShouldLeaveJobExecutionWithStatusOfStopping() throws Exception {
- JobExecution jobExecution = createJobExecution(BASE_JOB_INST_NAME,true);
- jobExecution = jobService.getJobExecution(jobExecution.getId());
- assertThat(jobExecution.isRunning()).isTrue();
- assertThat(jobExecution.getStatus()).isNotEqualTo(BatchStatus.STOPPING);
+ JobExecution jobExecution = createRunningJobExecution(BASE_JOB_INST_NAME);
jobService.stop(jobExecution.getId());
+ assertJobHasStopped(jobExecution);
+ }
+
+ @Test
+ void stoppingAllJobExecutionsShouldLeaveJobExecutionsWithStatusOfStopping() throws Exception {
+ JobExecution jobExecutionOne = createRunningJobExecution(BASE_JOB_INST_NAME);
+ JobExecution jobExecutionTwo = createRunningJobExecution(BASE_JOB_INST_NAME+"_TWO");
+ jobService.stop(jobExecutionOne.getId());
+ assertJobHasStopped(jobExecutionOne);
+ jobService.stop(jobExecutionTwo.getId());
+ assertJobHasStopped(jobExecutionTwo);
+ }
+
+ private void assertJobHasStopped(JobExecution jobExecution) throws NoSuchJobExecutionException, JobExecutionNotRunningException {
jobExecution = jobService.getJobExecution(jobExecution.getId());
- assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.STOPPED);
+ assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.STOPPING);
+ assertThat(jobExecution.isRunning()).isTrue();
}
private void verifyJobInstance(long id, String name) throws Exception {
@@ -221,9 +235,13 @@ private JobExecution createJobExecution(String name) throws Exception {
return createJobExecution(name, BatchStatus.STARTING, false);
}
- private JobExecution createJobExecution(String name, boolean isRunning)
+ private JobExecution createRunningJobExecution(String name)
throws Exception {
- return createJobExecution(name, BatchStatus.STARTING, isRunning);
+ JobExecution jobExecution = createJobExecution(name, BatchStatus.STARTING, true);
+ jobExecution = jobService.getJobExecution(jobExecution.getId());
+ assertThat(jobExecution.isRunning()).isTrue();
+ assertThat(jobExecution.getStatus()).isNotEqualTo(BatchStatus.STOPPING);
+ return jobExecution;
}
private JobExecution createJobExecution(String name, BatchStatus batchStatus, boolean isRunning) throws Exception {