diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/Base64ExecutionContextSerializer.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/Base64ExecutionContextSerializer.java new file mode 100644 index 0000000000..bc250bc25e --- /dev/null +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/Base64ExecutionContextSerializer.java @@ -0,0 +1,53 @@ +/* + * Copyright 2009-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.cloud.dataflow.server.batch; + +import org.apache.commons.io.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.repository.dao.DefaultExecutionContextSerializer; + +import java.io.*; +import java.util.Base64; +import java.util.Map; + +/** + * Implements the same logic as used in Batch 5.x + * @author Corneil du Plessis + */ +public class Base64ExecutionContextSerializer extends DefaultExecutionContextSerializer { + private final static Logger logger = LoggerFactory.getLogger(Base64ExecutionContextSerializer.class); + @SuppressWarnings({"unchecked", "NullableProblems"}) + @Override + public Map deserialize(InputStream inputStream) throws IOException { + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + IOUtils.copy(inputStream, buffer); + InputStream decodingStream = new ByteArrayInputStream(buffer.toByteArray()); + try { + decodingStream = Base64.getDecoder().wrap(decodingStream); + } catch (Throwable x) { + logger.info("Cannot decode input as base"); + } + try { + ObjectInputStream objectInputStream = new ObjectInputStream(decodingStream); + return (Map) objectInputStream.readObject(); + } catch (IOException ex) { + throw new IllegalArgumentException("Failed to deserialize object", ex); + } catch (ClassNotFoundException ex) { + throw new IllegalStateException("Failed to deserialize object type", ex); + } + } +} diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/JobService.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/JobService.java index 3063892c31..dd7669bbe4 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/JobService.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/JobService.java @@ -45,7 +45,7 @@ * * @author Dave Syer * @author Glenn Renfro - * + * @author Corneil du Plessis */ public interface JobService { @@ -287,7 +287,6 @@ Collection getJobExecutionsForJobInstance(String jobName, Long job * * @throws NoSuchJobExecutionException thrown if job execution specified does not exist */ - @Deprecated Collection getStepExecutions(Long jobExecutionId) throws NoSuchJobExecutionException; Collection getStepExecutions(JobExecution jobExecution) throws NoSuchJobExecutionException; void addStepExecutions(JobExecution jobExecution); diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/config/DataFlowControllerAutoConfiguration.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/config/DataFlowControllerAutoConfiguration.java index a638efc232..7b0322fa34 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/config/DataFlowControllerAutoConfiguration.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/config/DataFlowControllerAutoConfiguration.java @@ -158,6 +158,7 @@ * @author Andy Clement * @author Glenn Renfro * @author Christian Tzolov + * @author Corneil du Plessis */ @SuppressWarnings("all") @Configuration @@ -341,8 +342,8 @@ public JobExecutionThinController jobExecutionThinController(TaskJobService repo } @Bean - public JobStepExecutionController jobStepExecutionController(TaskJobService taskJobService) { - return new JobStepExecutionController(taskJobService); + public JobStepExecutionController jobStepExecutionController(JobServiceContainer jobServiceContainer) { + return new JobStepExecutionController(jobServiceContainer); } @Bean diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/JobStepExecutionController.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/JobStepExecutionController.java index 280268eba2..b39ff14236 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/JobStepExecutionController.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/JobStepExecutionController.java @@ -23,14 +23,12 @@ import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.launch.NoSuchJobExecutionException; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cloud.dataflow.rest.job.TaskJobExecution; import org.springframework.cloud.dataflow.rest.resource.StepExecutionResource; import org.springframework.cloud.dataflow.schema.SchemaVersionTarget; import org.springframework.cloud.dataflow.server.batch.JobService; import org.springframework.cloud.dataflow.server.batch.NoSuchStepExecutionException; import org.springframework.cloud.dataflow.server.job.support.StepExecutionResourceBuilder; import org.springframework.cloud.dataflow.server.service.JobServiceContainer; -import org.springframework.cloud.dataflow.server.service.TaskJobService; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageImpl; import org.springframework.data.domain.Pageable; @@ -39,39 +37,28 @@ import org.springframework.hateoas.server.ExposesResourceFor; import org.springframework.hateoas.server.mvc.RepresentationModelAssemblerSupport; import org.springframework.http.HttpStatus; -import org.springframework.lang.NonNull; -import org.springframework.util.Assert; import org.springframework.util.StringUtils; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestMethod; -import org.springframework.web.bind.annotation.RequestParam; -import org.springframework.web.bind.annotation.ResponseStatus; -import org.springframework.web.bind.annotation.RestController; - -import static org.springframework.hateoas.server.mvc.WebMvcLinkBuilder.linkTo; +import org.springframework.web.bind.annotation.*; /** * @author Glenn Renfro + * @author Corneil du Plessis */ @RestController @RequestMapping("/jobs/executions/{jobExecutionId}/steps") @ExposesResourceFor(StepExecutionResource.class) public class JobStepExecutionController { - - private final TaskJobService taskJobService; - + private final JobServiceContainer jobServiceContainer; /** * Creates a {@code JobStepExecutionsController} that retrieves Job Step Execution * information from a the {@link JobServiceContainer} * - * @param taskJobService TaskJobService can query all schemas. + * @param jobServiceContainer JobServiceContainer to select the JobService */ @Autowired - public JobStepExecutionController(TaskJobService taskJobService) { - Assert.notNull(taskJobService, "taskJobService required"); - this.taskJobService = taskJobService; + public JobStepExecutionController(JobServiceContainer jobServiceContainer) { + this.jobServiceContainer = jobServiceContainer; } /** @@ -95,8 +82,8 @@ public PagedModel stepExecutions( if(!StringUtils.hasText(schemaTarget)) { schemaTarget = SchemaVersionTarget.defaultTarget().getName(); } - TaskJobExecution taskJobExecution = taskJobService.getJobExecution(id, schemaTarget); - List result = new ArrayList<>(taskJobExecution.getJobExecution().getStepExecutions()); + JobService jobService = jobServiceContainer.get(schemaTarget); + List result = new ArrayList<>(jobService.getStepExecutions(id)); Page page = new PageImpl<>(result, pageable, result.size()); final Assembler stepAssembler = new Assembler(schemaTarget); return assembler.toModel(page, stepAssembler); @@ -122,13 +109,9 @@ public StepExecutionResource getStepExecution( if(!StringUtils.hasText(schemaTarget)) { schemaTarget = SchemaVersionTarget.defaultTarget().getName(); } - TaskJobExecution taskJobExecution = taskJobService.getJobExecution(id, schemaTarget); + JobService jobService = jobServiceContainer.get(schemaTarget); + StepExecution stepExecution = jobService.getStepExecution(id, stepId); final Assembler stepAssembler = new Assembler(schemaTarget); - StepExecution stepExecution = taskJobExecution.getJobExecution().getStepExecutions() - .stream() - .filter(s -> s.getId().equals(stepId)) - .findFirst() - .orElseThrow(() -> new NoSuchStepExecutionException("Step " + stepId + " in Job " + id + " not found")); return stepAssembler.toModel(stepExecution); } diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/JobStepExecutionProgressController.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/JobStepExecutionProgressController.java index 9dc1d505d8..2e454d10d5 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/JobStepExecutionProgressController.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/JobStepExecutionProgressController.java @@ -21,7 +21,6 @@ import org.springframework.batch.core.launch.NoSuchJobExecutionException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.dataflow.rest.job.StepExecutionHistory; -import org.springframework.cloud.dataflow.rest.job.TaskJobExecution; import org.springframework.cloud.dataflow.rest.resource.StepExecutionProgressInfoResource; import org.springframework.cloud.dataflow.schema.SchemaVersionTarget; import org.springframework.cloud.dataflow.server.batch.JobService; @@ -33,18 +32,14 @@ import org.springframework.hateoas.server.mvc.RepresentationModelAssemblerSupport; import org.springframework.http.HttpStatus; import org.springframework.util.StringUtils; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestMethod; -import org.springframework.web.bind.annotation.RequestParam; -import org.springframework.web.bind.annotation.ResponseStatus; -import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.bind.annotation.*; import static org.springframework.hateoas.server.mvc.WebMvcLinkBuilder.linkTo; import static org.springframework.hateoas.server.mvc.WebMvcLinkBuilder.methodOn; /** * @author Glenn Renfro + * @author Corneil du Plessis */ @RestController @RequestMapping("/jobs/executions/{jobExecutionId}/steps") @@ -92,12 +87,8 @@ public StepExecutionProgressInfoResource progress( if (!StringUtils.hasText(schemaTarget)) { schemaTarget = SchemaVersionTarget.defaultTarget().getName(); } - TaskJobExecution taskJobExecution = taskJobService.getJobExecution(jobExecutionId, schemaTarget); - StepExecution stepExecution = taskJobExecution.getJobExecution().getStepExecutions() - .stream() - .filter(s -> s.getId().equals(stepExecutionId)) - .findFirst() - .orElseThrow(() -> new NoSuchStepExecutionException("Step execution " + stepExecutionId + " for Job " + jobExecutionId + " not found")); + JobService jobService = jobServiceContainer.get(schemaTarget); + StepExecution stepExecution = jobService.getStepExecution(jobExecutionId, stepExecutionId); String stepName = stepExecution.getStepName(); if (stepName.contains(":partition")) { // assume we want to compare all partitions diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/JobServiceContainer.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/JobServiceContainer.java index 95af1f6455..faf239963f 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/JobServiceContainer.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/JobServiceContainer.java @@ -1,15 +1,33 @@ +/* + * Copyright 2009-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.springframework.cloud.dataflow.server.service; -import javax.sql.DataSource; import java.util.HashMap; import java.util.Map; +import javax.sql.DataSource; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.launch.support.SimpleJobLauncher; +import org.springframework.batch.core.repository.dao.Jackson2ExecutionContextStringSerializer; import org.springframework.cloud.dataflow.schema.SchemaVersionTarget; import org.springframework.cloud.dataflow.schema.service.SchemaService; +import org.springframework.cloud.dataflow.server.batch.Base64ExecutionContextSerializer; import org.springframework.cloud.dataflow.server.batch.JobService; import org.springframework.cloud.dataflow.server.batch.SimpleJobServiceFactoryBean; import org.springframework.cloud.dataflow.server.controller.NoSuchSchemaTargetException; @@ -18,6 +36,11 @@ import org.springframework.transaction.PlatformTransactionManager; import org.springframework.util.StringUtils; +/** + * The container provides implementations of JobService for each SchemaTarget. + * + * @author Corneil du Plessis + */ public class JobServiceContainer { private final static Logger logger = LoggerFactory.getLogger(JobServiceContainer.class); private final Map container = new HashMap<>(); @@ -42,6 +65,14 @@ public JobServiceContainer( factoryBean.setTablePrefix(target.getBatchPrefix()); factoryBean.setAppBootSchemaVersionTarget(target); factoryBean.setSchemaService(schemaService); + switch (target.getSchemaVersion()) { + case BOOT3: + factoryBean.setSerializer(new Base64ExecutionContextSerializer()); + break; + case BOOT2: + factoryBean.setSerializer(new Jackson2ExecutionContextStringSerializer()); + break; + } try { factoryBean.afterPropertiesSet(); container.put(target.getName(), factoryBean.getObject()); diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/JobDependencies.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/JobDependencies.java index f3c72381d8..167ba87757 100644 --- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/JobDependencies.java +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/JobDependencies.java @@ -197,8 +197,8 @@ public JobExecutionThinController jobExecutionThinController(TaskJobService repo } @Bean - public JobStepExecutionController jobStepExecutionController(TaskJobService taskJobService) { - return new JobStepExecutionController(taskJobService); + public JobStepExecutionController jobStepExecutionController(JobServiceContainer jobServiceContainer) { + return new JobStepExecutionController(jobServiceContainer); } @Bean