From 13a46ab77c5d9a94e98687273f7aa0af2fac168a Mon Sep 17 00:00:00 2001 From: Glenn Renfro Date: Wed, 16 Oct 2024 10:02:03 -0400 Subject: [PATCH] Provide method for stopping Batch 5 Jobs upon user request In the previous release of SCDF we used the JsrJobOperator to stop job executions. The 2 stages of stopping jobs are as follows: 1) Sets the Batch Status of the job execution to STOPPING. This signals to Spring Batch to stop execution at the next step. 2) If the Job is a StepLocator it will go through each of the StoppableTasklets and stop them. So when using Batch 4.x it was just a quick check of the JobRegistry to retrieve the job, which was always empty since SCDF never deals with Jobs directly. But with Batch 5.x they loaded the Job Registry and attempted to retrieve the Job in a different way using the SimpleJobOperator. In the updated solution, SCDF doesn't use the SimpleJobOperator since SCDF doesn't have access to the StepLocator for the Job, nor does it use the JobRegistry and, even if it did, it would always be empty. * Modify stopAll so that it calls stop() instead of using its own logic * Add Test for stopAll Updated based on code review Extract job stop code from stop(Long) and place into stopJobExecution method. The stopJobExecution method will be used by stop(Long) and stopAll. Updated Based on Code Review Moved Job Stop Code from assert methods to the tests. 
Updated the JobExecution Create methods so that they verify that the job is not Stopping --- .../server/batch/SimpleJobService.java | 19 ++++++----- .../batch/AbstractSimpleJobServiceTests.java | 32 +++++++++++++++---- 2 files changed, 36 insertions(+), 15 deletions(-) diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/SimpleJobService.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/SimpleJobService.java index d71711648f..261a3acc5c 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/SimpleJobService.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/SimpleJobService.java @@ -225,9 +225,7 @@ public int stopAll() { Collection result = jobExecutionDao.getRunningJobExecutions(); for (JobExecution jobExecution : result) { try { - jobExecution.getStepExecutions().forEach(StepExecution::setTerminateOnly); - jobExecution.setStatus( BatchStatus.STOPPING); - jobRepository.update(jobExecution); + stopJobExecution(jobExecution); } catch (Exception e) { throw new IllegalArgumentException("The following JobExecutionId was not found: " + jobExecution.getId(), e); } @@ -238,14 +236,19 @@ public int stopAll() { @Override public JobExecution stop(Long jobExecutionId) throws NoSuchJobExecutionException, JobExecutionNotRunningException { - JobExecution jobExecution = getJobExecution(jobExecutionId); + return stopJobExecution(getJobExecution(jobExecutionId)); + } + + private JobExecution stopJobExecution(JobExecution jobExecution) throws JobExecutionNotRunningException{ if (!jobExecution.isRunning()) { throw new JobExecutionNotRunningException("JobExecution is not running and therefore cannot be stopped"); } - - logger.info("Stopping job execution: " + jobExecution); - - jobExecution.setStatus(BatchStatus.STOPPED); + // Indicate the execution should be stopped by 
setting its status to + // 'STOPPING'. It is assumed that + // the step implementation will check this status at chunk boundaries. + logger.info("Stopping job execution: {}", jobExecution); + jobExecution.getStepExecutions().forEach(StepExecution::setTerminateOnly); + jobExecution.setStatus(BatchStatus.STOPPING); jobRepository.update(jobExecution); return jobExecution; diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractSimpleJobServiceTests.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractSimpleJobServiceTests.java index e5f5e3c3da..0ed92681de 100644 --- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractSimpleJobServiceTests.java +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractSimpleJobServiceTests.java @@ -28,6 +28,8 @@ import javax.sql.DataSource; import org.junit.jupiter.api.Test; +import org.springframework.batch.core.launch.JobExecutionNotRunningException; +import org.springframework.batch.core.launch.NoSuchJobExecutionException; import org.testcontainers.containers.JdbcDatabaseContainer; import org.springframework.batch.core.BatchStatus; @@ -191,13 +193,25 @@ void exceptionsShouldBeThrownIfRequestForNonExistingJobInstance() { @Test void stoppingJobExecutionShouldLeaveJobExecutionWithStatusOfStopping() throws Exception { - JobExecution jobExecution = createJobExecution(BASE_JOB_INST_NAME,true); - jobExecution = jobService.getJobExecution(jobExecution.getId()); - assertThat(jobExecution.isRunning()).isTrue(); - assertThat(jobExecution.getStatus()).isNotEqualTo(BatchStatus.STOPPING); + JobExecution jobExecution = createRunningJobExecution(BASE_JOB_INST_NAME); jobService.stop(jobExecution.getId()); + assertJobHasStopped(jobExecution); + } + + @Test + void 
stoppingAllJobExecutionsShouldLeaveJobExecutionsWithStatusOfStopping() throws Exception { + JobExecution jobExecutionOne = createRunningJobExecution(BASE_JOB_INST_NAME); + JobExecution jobExecutionTwo = createRunningJobExecution(BASE_JOB_INST_NAME+"_TWO"); + jobService.stop(jobExecutionOne.getId()); + assertJobHasStopped(jobExecutionOne); + jobService.stop(jobExecutionTwo.getId()); + assertJobHasStopped(jobExecutionTwo); + } + + private void assertJobHasStopped(JobExecution jobExecution) throws NoSuchJobExecutionException, JobExecutionNotRunningException { jobExecution = jobService.getJobExecution(jobExecution.getId()); - assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.STOPPED); + assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.STOPPING); + assertThat(jobExecution.isRunning()).isTrue(); } private void verifyJobInstance(long id, String name) throws Exception { @@ -221,9 +235,13 @@ private JobExecution createJobExecution(String name) throws Exception { return createJobExecution(name, BatchStatus.STARTING, false); } - private JobExecution createJobExecution(String name, boolean isRunning) + private JobExecution createRunningJobExecution(String name) throws Exception { - return createJobExecution(name, BatchStatus.STARTING, isRunning); + JobExecution jobExecution = createJobExecution(name, BatchStatus.STARTING, true); + jobExecution = jobService.getJobExecution(jobExecution.getId()); + assertThat(jobExecution.isRunning()).isTrue(); + assertThat(jobExecution.getStatus()).isNotEqualTo(BatchStatus.STOPPING); + return jobExecution; } private JobExecution createJobExecution(String name, BatchStatus batchStatus, boolean isRunning) throws Exception {