diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/AllInOneExecutionContextSerializer.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/AllInOneExecutionContextSerializer.java new file mode 100644 index 0000000000..f855e3cc44 --- /dev/null +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/AllInOneExecutionContextSerializer.java @@ -0,0 +1,65 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.cloud.dataflow.server.batch; + +import org.apache.commons.io.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.repository.dao.Jackson2ExecutionContextStringSerializer; + +import java.io.*; +import java.util.Base64; +import java.util.HashMap; +import java.util.Map; + +/** + * Implements the same logic as used in Batch 5.x + * @author Corneil du Plessis + */ +public class AllInOneExecutionContextSerializer extends Jackson2ExecutionContextStringSerializer { + private final static Logger logger = LoggerFactory.getLogger(AllInOneExecutionContextSerializer.class); + @SuppressWarnings({"unchecked", "NullableProblems"}) + @Override + public Map deserialize(InputStream inputStream) throws IOException { + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + IOUtils.copy(inputStream, buffer); + Map result = new HashMap<>(); + // Try Jackson + try { + return super.deserialize(new ByteArrayInputStream(buffer.toByteArray())); + } catch (Throwable x) { + result.put("context.deserialize.error.jackson", x.toString()); + } + InputStream decodingStream = new ByteArrayInputStream(buffer.toByteArray()); + try { + // Try decode base64 + decodingStream = Base64.getDecoder().wrap(decodingStream); + } catch (Throwable x) { + // Use original input for java deserialization + decodingStream = new ByteArrayInputStream(buffer.toByteArray()); + result.put("context.deserialize.error.base64.decode", x.toString()); + } + try { + ObjectInputStream objectInputStream = new ObjectInputStream(decodingStream); + return (Map) objectInputStream.readObject(); + } catch (Throwable x) { + result.put("context.deserialize.error.java.deserialization", x.toString()); + } + // They may have a custom serializer or custom classes. + logger.warn("deserialization failed:{}", result); + return result; + } +} diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/JobService.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/JobService.java index 3063892c31..1fd8afd86a 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/JobService.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/JobService.java @@ -1,5 +1,5 @@ /* - * Copyright 2009-2010 the original author or authors. + * Copyright 2009-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -45,7 +45,7 @@ * * @author Dave Syer * @author Glenn Renfro - * + * @author Corneil du Plessis */ public interface JobService { @@ -287,7 +287,6 @@ Collection getJobExecutionsForJobInstance(String jobName, Long job * * @throws NoSuchJobExecutionException thrown if job execution specified does not exist */ - @Deprecated Collection getStepExecutions(Long jobExecutionId) throws NoSuchJobExecutionException; Collection getStepExecutions(JobExecution jobExecution) throws NoSuchJobExecutionException; void addStepExecutions(JobExecution jobExecution); diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/config/DataFlowControllerAutoConfiguration.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/config/DataFlowControllerAutoConfiguration.java index a638efc232..c1e99eb963 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/config/DataFlowControllerAutoConfiguration.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/config/DataFlowControllerAutoConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2021 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -158,6 +158,7 @@ * @author Andy Clement * @author Glenn Renfro * @author Christian Tzolov + * @author Corneil du Plessis */ @SuppressWarnings("all") @Configuration @@ -341,8 +342,8 @@ public JobExecutionThinController jobExecutionThinController(TaskJobService repo } @Bean - public JobStepExecutionController jobStepExecutionController(TaskJobService taskJobService) { - return new JobStepExecutionController(taskJobService); + public JobStepExecutionController jobStepExecutionController(JobServiceContainer jobServiceContainer) { + return new JobStepExecutionController(jobServiceContainer); } @Bean diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/JobStepExecutionController.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/JobStepExecutionController.java index 280268eba2..5a4c121e4f 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/JobStepExecutionController.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/JobStepExecutionController.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,14 +23,12 @@ import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.launch.NoSuchJobExecutionException; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cloud.dataflow.rest.job.TaskJobExecution; import org.springframework.cloud.dataflow.rest.resource.StepExecutionResource; import org.springframework.cloud.dataflow.schema.SchemaVersionTarget; import org.springframework.cloud.dataflow.server.batch.JobService; import org.springframework.cloud.dataflow.server.batch.NoSuchStepExecutionException; import org.springframework.cloud.dataflow.server.job.support.StepExecutionResourceBuilder; import org.springframework.cloud.dataflow.server.service.JobServiceContainer; -import org.springframework.cloud.dataflow.server.service.TaskJobService; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageImpl; import org.springframework.data.domain.Pageable; @@ -39,7 +37,6 @@ import org.springframework.hateoas.server.ExposesResourceFor; import org.springframework.hateoas.server.mvc.RepresentationModelAssemblerSupport; import org.springframework.http.HttpStatus; -import org.springframework.lang.NonNull; import org.springframework.util.Assert; import org.springframework.util.StringUtils; import org.springframework.web.bind.annotation.PathVariable; @@ -49,29 +46,26 @@ import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.bind.annotation.RestController; -import static org.springframework.hateoas.server.mvc.WebMvcLinkBuilder.linkTo; - /** * @author Glenn Renfro + * @author Corneil du Plessis */ @RestController @RequestMapping("/jobs/executions/{jobExecutionId}/steps") @ExposesResourceFor(StepExecutionResource.class) public class JobStepExecutionController { - - private final TaskJobService taskJobService; - + private final JobServiceContainer jobServiceContainer; /** * Creates a {@code JobStepExecutionsController} that retrieves Job Step Execution * information from a the {@link JobServiceContainer} * - * @param taskJobService TaskJobService can query all schemas. + * @param jobServiceContainer JobServiceContainer to select the JobService */ @Autowired - public JobStepExecutionController(TaskJobService taskJobService) { - Assert.notNull(taskJobService, "taskJobService required"); - this.taskJobService = taskJobService; + public JobStepExecutionController(JobServiceContainer jobServiceContainer) { + Assert.notNull(jobServiceContainer, "jobServiceContainer required"); + this.jobServiceContainer = jobServiceContainer; } /** @@ -95,8 +89,8 @@ public PagedModel stepExecutions( if(!StringUtils.hasText(schemaTarget)) { schemaTarget = SchemaVersionTarget.defaultTarget().getName(); } - TaskJobExecution taskJobExecution = taskJobService.getJobExecution(id, schemaTarget); - List result = new ArrayList<>(taskJobExecution.getJobExecution().getStepExecutions()); + JobService jobService = jobServiceContainer.get(schemaTarget); + List result = new ArrayList<>(jobService.getStepExecutions(id)); Page page = new PageImpl<>(result, pageable, result.size()); final Assembler stepAssembler = new Assembler(schemaTarget); return assembler.toModel(page, stepAssembler); @@ -122,13 +116,9 @@ public StepExecutionResource getStepExecution( if(!StringUtils.hasText(schemaTarget)) { schemaTarget = SchemaVersionTarget.defaultTarget().getName(); } - TaskJobExecution taskJobExecution = taskJobService.getJobExecution(id, schemaTarget); + JobService jobService = jobServiceContainer.get(schemaTarget); + StepExecution stepExecution = jobService.getStepExecution(id, stepId); final Assembler stepAssembler = new Assembler(schemaTarget); - StepExecution stepExecution = taskJobExecution.getJobExecution().getStepExecutions() - .stream() - .filter(s -> s.getId().equals(stepId)) - .findFirst() - .orElseThrow(() -> new NoSuchStepExecutionException("Step " + stepId + " in Job " + id + " not found")); return stepAssembler.toModel(stepExecution); } diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/JobStepExecutionProgressController.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/JobStepExecutionProgressController.java index 9dc1d505d8..388dec86d4 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/JobStepExecutionProgressController.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/JobStepExecutionProgressController.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,7 +21,6 @@ import org.springframework.batch.core.launch.NoSuchJobExecutionException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.dataflow.rest.job.StepExecutionHistory; -import org.springframework.cloud.dataflow.rest.job.TaskJobExecution; import org.springframework.cloud.dataflow.rest.resource.StepExecutionProgressInfoResource; import org.springframework.cloud.dataflow.schema.SchemaVersionTarget; import org.springframework.cloud.dataflow.server.batch.JobService; @@ -45,6 +44,7 @@ /** * @author Glenn Renfro + * @author Corneil du Plessis */ @RestController @RequestMapping("/jobs/executions/{jobExecutionId}/steps") @@ -92,12 +92,8 @@ public StepExecutionProgressInfoResource progress( if (!StringUtils.hasText(schemaTarget)) { schemaTarget = SchemaVersionTarget.defaultTarget().getName(); } - TaskJobExecution taskJobExecution = taskJobService.getJobExecution(jobExecutionId, schemaTarget); - StepExecution stepExecution = taskJobExecution.getJobExecution().getStepExecutions() - .stream() - .filter(s -> s.getId().equals(stepExecutionId)) - .findFirst() - .orElseThrow(() -> new NoSuchStepExecutionException("Step execution " + stepExecutionId + " for Job " + jobExecutionId + " not found")); + JobService jobService = jobServiceContainer.get(schemaTarget); + StepExecution stepExecution = jobService.getStepExecution(jobExecutionId, stepExecutionId); String stepName = stepExecution.getStepName(); if (stepName.contains(":partition")) { // assume we want to compare all partitions diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/JobServiceContainer.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/JobServiceContainer.java index 95af1f6455..f8dcffc582 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/JobServiceContainer.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/JobServiceContainer.java @@ -1,15 +1,32 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.springframework.cloud.dataflow.server.service; -import javax.sql.DataSource; import java.util.HashMap; import java.util.Map; +import javax.sql.DataSource; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.launch.support.SimpleJobLauncher; import org.springframework.cloud.dataflow.schema.SchemaVersionTarget; import org.springframework.cloud.dataflow.schema.service.SchemaService; +import org.springframework.cloud.dataflow.server.batch.AllInOneExecutionContextSerializer; import org.springframework.cloud.dataflow.server.batch.JobService; import org.springframework.cloud.dataflow.server.batch.SimpleJobServiceFactoryBean; import org.springframework.cloud.dataflow.server.controller.NoSuchSchemaTargetException; @@ -18,6 +35,11 @@ import org.springframework.transaction.PlatformTransactionManager; import org.springframework.util.StringUtils; +/** + * The container provides implementations of JobService for each SchemaTarget. + * + * @author Corneil du Plessis + */ public class JobServiceContainer { private final static Logger logger = LoggerFactory.getLogger(JobServiceContainer.class); private final Map container = new HashMap<>(); @@ -42,6 +64,7 @@ public JobServiceContainer( factoryBean.setTablePrefix(target.getBatchPrefix()); factoryBean.setAppBootSchemaVersionTarget(target); factoryBean.setSchemaService(schemaService); + factoryBean.setSerializer(new AllInOneExecutionContextSerializer()); try { factoryBean.afterPropertiesSet(); container.put(target.getName(), factoryBean.getObject()); diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/JobDependencies.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/JobDependencies.java index f3c72381d8..0854ca5390 100644 --- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/JobDependencies.java +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/JobDependencies.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -197,8 +197,8 @@ public JobExecutionThinController jobExecutionThinController(TaskJobService repo } @Bean - public JobStepExecutionController jobStepExecutionController(TaskJobService taskJobService) { - return new JobStepExecutionController(taskJobService); + public JobStepExecutionController jobStepExecutionController(JobServiceContainer jobServiceContainer) { + return new JobStepExecutionController(jobServiceContainer); } @Bean diff --git a/spring-cloud-dataflow-server/src/test/java/org/springframework/cloud/dataflow/server/db/migration/DB2SmokeTest.java b/spring-cloud-dataflow-server/src/test/java/org/springframework/cloud/dataflow/server/db/migration/DB2SmokeTest.java index efd0d0f6e7..76320dfe78 100644 --- a/spring-cloud-dataflow-server/src/test/java/org/springframework/cloud/dataflow/server/db/migration/DB2SmokeTest.java +++ b/spring-cloud-dataflow-server/src/test/java/org/springframework/cloud/dataflow/server/db/migration/DB2SmokeTest.java @@ -27,7 +27,7 @@ public class DB2SmokeTest extends AbstractSmokeTest { @BeforeAll static void startContainer() { - container = new Db2Container().acceptLicense(); + container = new Db2Container("ibmcom/db2:11.5.0.0a").acceptLicense(); container.start(); } }