Skip to content

Commit

Permalink
Fix display of StepExecutionContext. (#5604)
Browse files Browse the repository at this point in the history
* Fix StepExecutionControllers and add AllInOneExecutionContextSerializer to support Jackson and other serialization options.
Fixes #5557
  • Loading branch information
Corneil du Plessis authored Dec 14, 2023
1 parent e972661 commit 2b3c13b
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 40 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.dataflow.server.batch;

import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.repository.dao.Jackson2ExecutionContextStringSerializer;

import java.io.*;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

/**
* Implements the same logic as used in Batch 5.x
* @author Corneil du Plessis
*/
public class AllInOneExecutionContextSerializer extends Jackson2ExecutionContextStringSerializer {
private final static Logger logger = LoggerFactory.getLogger(AllInOneExecutionContextSerializer.class);
@SuppressWarnings({"unchecked", "NullableProblems"})
@Override
public Map<String, Object> deserialize(InputStream inputStream) throws IOException {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
IOUtils.copy(inputStream, buffer);
Map<String, Object> result = new HashMap<>();
// Try Jackson
try {
return super.deserialize(new ByteArrayInputStream(buffer.toByteArray()));
} catch (Throwable x) {
result.put("context.deserialize.error.jackson", x.toString());
}
InputStream decodingStream = new ByteArrayInputStream(buffer.toByteArray());
try {
// Try decode base64
decodingStream = Base64.getDecoder().wrap(decodingStream);
} catch (Throwable x) {
// Use original input for java deserialization
decodingStream = new ByteArrayInputStream(buffer.toByteArray());
result.put("context.deserialize.error.base64.decode", x.toString());
}
try {
ObjectInputStream objectInputStream = new ObjectInputStream(decodingStream);
return (Map<String, Object>) objectInputStream.readObject();
} catch (Throwable x) {
result.put("context.deserialize.error.java.deserialization", x.toString());
}
// They may have a custom serializer or custom classes.
logger.warn("deserialization failed:{}", result);
return result;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2009-2010 the original author or authors.
* Copyright 2009-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -45,7 +45,7 @@
*
* @author Dave Syer
* @author Glenn Renfro
*
* @author Corneil du Plessis
*/
public interface JobService {

Expand Down Expand Up @@ -287,7 +287,6 @@ Collection<JobExecution> getJobExecutionsForJobInstance(String jobName, Long job
*
* @throws NoSuchJobExecutionException thrown if job execution specified does not exist
*/
@Deprecated
Collection<StepExecution> getStepExecutions(Long jobExecutionId) throws NoSuchJobExecutionException;
Collection<StepExecution> getStepExecutions(JobExecution jobExecution) throws NoSuchJobExecutionException;
void addStepExecutions(JobExecution jobExecution);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -158,6 +158,7 @@
* @author Andy Clement
* @author Glenn Renfro
* @author Christian Tzolov
* @author Corneil du Plessis
*/
@SuppressWarnings("all")
@Configuration
Expand Down Expand Up @@ -341,8 +342,8 @@ public JobExecutionThinController jobExecutionThinController(TaskJobService repo
}

@Bean
public JobStepExecutionController jobStepExecutionController(TaskJobService taskJobService) {
return new JobStepExecutionController(taskJobService);
public JobStepExecutionController jobStepExecutionController(JobServiceContainer jobServiceContainer) {
return new JobStepExecutionController(jobServiceContainer);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016 the original author or authors.
* Copyright 2016-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,14 +23,12 @@
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.launch.NoSuchJobExecutionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.dataflow.rest.job.TaskJobExecution;
import org.springframework.cloud.dataflow.rest.resource.StepExecutionResource;
import org.springframework.cloud.dataflow.schema.SchemaVersionTarget;
import org.springframework.cloud.dataflow.server.batch.JobService;
import org.springframework.cloud.dataflow.server.batch.NoSuchStepExecutionException;
import org.springframework.cloud.dataflow.server.job.support.StepExecutionResourceBuilder;
import org.springframework.cloud.dataflow.server.service.JobServiceContainer;
import org.springframework.cloud.dataflow.server.service.TaskJobService;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.Pageable;
Expand All @@ -39,7 +37,6 @@
import org.springframework.hateoas.server.ExposesResourceFor;
import org.springframework.hateoas.server.mvc.RepresentationModelAssemblerSupport;
import org.springframework.http.HttpStatus;
import org.springframework.lang.NonNull;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PathVariable;
Expand All @@ -49,29 +46,26 @@
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;

import static org.springframework.hateoas.server.mvc.WebMvcLinkBuilder.linkTo;

/**
* @author Glenn Renfro
* @author Corneil du Plessis
*/
@RestController
@RequestMapping("/jobs/executions/{jobExecutionId}/steps")
@ExposesResourceFor(StepExecutionResource.class)
public class JobStepExecutionController {


private final TaskJobService taskJobService;

private final JobServiceContainer jobServiceContainer;
/**
* Creates a {@code JobStepExecutionsController} that retrieves Job Step Execution
* information from a the {@link JobServiceContainer}
*
* @param taskJobService TaskJobService can query all schemas.
* @param jobServiceContainer JobServiceContainer to select the JobService
*/
@Autowired
public JobStepExecutionController(TaskJobService taskJobService) {
Assert.notNull(taskJobService, "taskJobService required");
this.taskJobService = taskJobService;
public JobStepExecutionController(JobServiceContainer jobServiceContainer) {
Assert.notNull(jobServiceContainer, "jobServiceContainer required");
this.jobServiceContainer = jobServiceContainer;
}

/**
Expand All @@ -95,8 +89,8 @@ public PagedModel<StepExecutionResource> stepExecutions(
if(!StringUtils.hasText(schemaTarget)) {
schemaTarget = SchemaVersionTarget.defaultTarget().getName();
}
TaskJobExecution taskJobExecution = taskJobService.getJobExecution(id, schemaTarget);
List<StepExecution> result = new ArrayList<>(taskJobExecution.getJobExecution().getStepExecutions());
JobService jobService = jobServiceContainer.get(schemaTarget);
List<StepExecution> result = new ArrayList<>(jobService.getStepExecutions(id));
Page<StepExecution> page = new PageImpl<>(result, pageable, result.size());
final Assembler stepAssembler = new Assembler(schemaTarget);
return assembler.toModel(page, stepAssembler);
Expand All @@ -122,13 +116,9 @@ public StepExecutionResource getStepExecution(
if(!StringUtils.hasText(schemaTarget)) {
schemaTarget = SchemaVersionTarget.defaultTarget().getName();
}
TaskJobExecution taskJobExecution = taskJobService.getJobExecution(id, schemaTarget);
JobService jobService = jobServiceContainer.get(schemaTarget);
StepExecution stepExecution = jobService.getStepExecution(id, stepId);
final Assembler stepAssembler = new Assembler(schemaTarget);
StepExecution stepExecution = taskJobExecution.getJobExecution().getStepExecutions()
.stream()
.filter(s -> s.getId().equals(stepId))
.findFirst()
.orElseThrow(() -> new NoSuchStepExecutionException("Step " + stepId + " in Job " + id + " not found"));
return stepAssembler.toModel(stepExecution);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016 the original author or authors.
* Copyright 2016-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,7 +21,6 @@
import org.springframework.batch.core.launch.NoSuchJobExecutionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.dataflow.rest.job.StepExecutionHistory;
import org.springframework.cloud.dataflow.rest.job.TaskJobExecution;
import org.springframework.cloud.dataflow.rest.resource.StepExecutionProgressInfoResource;
import org.springframework.cloud.dataflow.schema.SchemaVersionTarget;
import org.springframework.cloud.dataflow.server.batch.JobService;
Expand All @@ -45,6 +44,7 @@

/**
* @author Glenn Renfro
* @author Corneil du Plessis
*/
@RestController
@RequestMapping("/jobs/executions/{jobExecutionId}/steps")
Expand Down Expand Up @@ -92,12 +92,8 @@ public StepExecutionProgressInfoResource progress(
if (!StringUtils.hasText(schemaTarget)) {
schemaTarget = SchemaVersionTarget.defaultTarget().getName();
}
TaskJobExecution taskJobExecution = taskJobService.getJobExecution(jobExecutionId, schemaTarget);
StepExecution stepExecution = taskJobExecution.getJobExecution().getStepExecutions()
.stream()
.filter(s -> s.getId().equals(stepExecutionId))
.findFirst()
.orElseThrow(() -> new NoSuchStepExecutionException("Step execution " + stepExecutionId + " for Job " + jobExecutionId + " not found"));
JobService jobService = jobServiceContainer.get(schemaTarget);
StepExecution stepExecution = jobService.getStepExecution(jobExecutionId, stepExecutionId);
String stepName = stepExecution.getStepName();
if (stepName.contains(":partition")) {
// assume we want to compare all partitions
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,32 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.dataflow.server.service;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.cloud.dataflow.schema.SchemaVersionTarget;
import org.springframework.cloud.dataflow.schema.service.SchemaService;
import org.springframework.cloud.dataflow.server.batch.AllInOneExecutionContextSerializer;
import org.springframework.cloud.dataflow.server.batch.JobService;
import org.springframework.cloud.dataflow.server.batch.SimpleJobServiceFactoryBean;
import org.springframework.cloud.dataflow.server.controller.NoSuchSchemaTargetException;
Expand All @@ -18,6 +35,11 @@
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.util.StringUtils;

/**
* The container provides implementations of JobService for each SchemaTarget.
*
* @author Corneil du Plessis
*/
public class JobServiceContainer {
private final static Logger logger = LoggerFactory.getLogger(JobServiceContainer.class);
private final Map<String, JobService> container = new HashMap<>();
Expand All @@ -42,6 +64,7 @@ public JobServiceContainer(
factoryBean.setTablePrefix(target.getBatchPrefix());
factoryBean.setAppBootSchemaVersionTarget(target);
factoryBean.setSchemaService(schemaService);
factoryBean.setSerializer(new AllInOneExecutionContextSerializer());
try {
factoryBean.afterPropertiesSet();
container.put(target.getName(), factoryBean.getObject());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 the original author or authors.
* Copyright 2016-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -197,8 +197,8 @@ public JobExecutionThinController jobExecutionThinController(TaskJobService repo
}

@Bean
public JobStepExecutionController jobStepExecutionController(TaskJobService taskJobService) {
return new JobStepExecutionController(taskJobService);
public JobStepExecutionController jobStepExecutionController(JobServiceContainer jobServiceContainer) {
return new JobStepExecutionController(jobServiceContainer);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
public class DB2SmokeTest extends AbstractSmokeTest {
@BeforeAll
static void startContainer() {
container = new Db2Container().acceptLicense();
container = new Db2Container("ibmcom/db2:11.5.0.0a").acceptLicense();
container.start();
}
}

0 comments on commit 2b3c13b

Please sign in to comment.