Skip to content

Commit

Permalink
Fix loading of StepExecutionContext.
Browse files Browse the repository at this point in the history
  • Loading branch information
Corneil du Plessis committed Dec 14, 2023
1 parent e45fcd8 commit 0ca26b3
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 47 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2009-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.dataflow.server.batch;

import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.repository.dao.DefaultExecutionContextSerializer;

import java.io.*;
import java.util.Base64;
import java.util.Map;

/**
* Implements the same logic as used in Batch 5.x
* @author Corneil du Plessis
*/
public class Base64ExecutionContextSerializer extends DefaultExecutionContextSerializer {
private final static Logger logger = LoggerFactory.getLogger(Base64ExecutionContextSerializer.class);
@SuppressWarnings({"unchecked", "NullableProblems"})
@Override
public Map<String, Object> deserialize(InputStream inputStream) throws IOException {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
IOUtils.copy(inputStream, buffer);
InputStream decodingStream = new ByteArrayInputStream(buffer.toByteArray());
try {
decodingStream = Base64.getDecoder().wrap(decodingStream);
} catch (Throwable x) {
logger.info("Cannot decode input as base");
}
try {
ObjectInputStream objectInputStream = new ObjectInputStream(decodingStream);
return (Map<String, Object>) objectInputStream.readObject();
} catch (IOException ex) {
throw new IllegalArgumentException("Failed to deserialize object", ex);
} catch (ClassNotFoundException ex) {
throw new IllegalStateException("Failed to deserialize object type", ex);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
*
* @author Dave Syer
* @author Glenn Renfro
*
* @author Corneil du Plessis
*/
public interface JobService {

Expand Down Expand Up @@ -287,7 +287,6 @@ Collection<JobExecution> getJobExecutionsForJobInstance(String jobName, Long job
*
* @throws NoSuchJobExecutionException thrown if job execution specified does not exist
*/
@Deprecated
Collection<StepExecution> getStepExecutions(Long jobExecutionId) throws NoSuchJobExecutionException;
Collection<StepExecution> getStepExecutions(JobExecution jobExecution) throws NoSuchJobExecutionException;
void addStepExecutions(JobExecution jobExecution);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@
* @author Andy Clement
* @author Glenn Renfro
* @author Christian Tzolov
* @author Corneil du Plessis
*/
@SuppressWarnings("all")
@Configuration
Expand Down Expand Up @@ -341,8 +342,8 @@ public JobExecutionThinController jobExecutionThinController(TaskJobService repo
}

@Bean
public JobStepExecutionController jobStepExecutionController(TaskJobService taskJobService) {
return new JobStepExecutionController(taskJobService);
public JobStepExecutionController jobStepExecutionController(JobServiceContainer jobServiceContainer) {
return new JobStepExecutionController(jobServiceContainer);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.launch.NoSuchJobExecutionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.dataflow.rest.job.TaskJobExecution;
import org.springframework.cloud.dataflow.rest.resource.StepExecutionResource;
import org.springframework.cloud.dataflow.schema.SchemaVersionTarget;
import org.springframework.cloud.dataflow.server.batch.JobService;
import org.springframework.cloud.dataflow.server.batch.NoSuchStepExecutionException;
import org.springframework.cloud.dataflow.server.job.support.StepExecutionResourceBuilder;
import org.springframework.cloud.dataflow.server.service.JobServiceContainer;
import org.springframework.cloud.dataflow.server.service.TaskJobService;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.Pageable;
Expand All @@ -39,39 +37,28 @@
import org.springframework.hateoas.server.ExposesResourceFor;
import org.springframework.hateoas.server.mvc.RepresentationModelAssemblerSupport;
import org.springframework.http.HttpStatus;
import org.springframework.lang.NonNull;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;

import static org.springframework.hateoas.server.mvc.WebMvcLinkBuilder.linkTo;
import org.springframework.web.bind.annotation.*;

/**
* @author Glenn Renfro
* @author Corneil du Plessis
*/
@RestController
@RequestMapping("/jobs/executions/{jobExecutionId}/steps")
@ExposesResourceFor(StepExecutionResource.class)
public class JobStepExecutionController {


private final TaskJobService taskJobService;

private final JobServiceContainer jobServiceContainer;
/**
* Creates a {@code JobStepExecutionsController} that retrieves Job Step Execution
* information from a the {@link JobServiceContainer}
*
* @param taskJobService TaskJobService can query all schemas.
* @param jobServiceContainer JobServiceContainer to select the JobService
*/
@Autowired
public JobStepExecutionController(TaskJobService taskJobService) {
Assert.notNull(taskJobService, "taskJobService required");
this.taskJobService = taskJobService;
public JobStepExecutionController(JobServiceContainer jobServiceContainer) {
this.jobServiceContainer = jobServiceContainer;
}

/**
Expand All @@ -95,8 +82,8 @@ public PagedModel<StepExecutionResource> stepExecutions(
if(!StringUtils.hasText(schemaTarget)) {
schemaTarget = SchemaVersionTarget.defaultTarget().getName();
}
TaskJobExecution taskJobExecution = taskJobService.getJobExecution(id, schemaTarget);
List<StepExecution> result = new ArrayList<>(taskJobExecution.getJobExecution().getStepExecutions());
JobService jobService = jobServiceContainer.get(schemaTarget);
List<StepExecution> result = new ArrayList<>(jobService.getStepExecutions(id));
Page<StepExecution> page = new PageImpl<>(result, pageable, result.size());
final Assembler stepAssembler = new Assembler(schemaTarget);
return assembler.toModel(page, stepAssembler);
Expand All @@ -122,13 +109,9 @@ public StepExecutionResource getStepExecution(
if(!StringUtils.hasText(schemaTarget)) {
schemaTarget = SchemaVersionTarget.defaultTarget().getName();
}
TaskJobExecution taskJobExecution = taskJobService.getJobExecution(id, schemaTarget);
JobService jobService = jobServiceContainer.get(schemaTarget);
StepExecution stepExecution = jobService.getStepExecution(id, stepId);
final Assembler stepAssembler = new Assembler(schemaTarget);
StepExecution stepExecution = taskJobExecution.getJobExecution().getStepExecutions()
.stream()
.filter(s -> s.getId().equals(stepId))
.findFirst()
.orElseThrow(() -> new NoSuchStepExecutionException("Step " + stepId + " in Job " + id + " not found"));
return stepAssembler.toModel(stepExecution);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.springframework.batch.core.launch.NoSuchJobExecutionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.dataflow.rest.job.StepExecutionHistory;
import org.springframework.cloud.dataflow.rest.job.TaskJobExecution;
import org.springframework.cloud.dataflow.rest.resource.StepExecutionProgressInfoResource;
import org.springframework.cloud.dataflow.schema.SchemaVersionTarget;
import org.springframework.cloud.dataflow.server.batch.JobService;
Expand All @@ -33,18 +32,14 @@
import org.springframework.hateoas.server.mvc.RepresentationModelAssemblerSupport;
import org.springframework.http.HttpStatus;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;

import static org.springframework.hateoas.server.mvc.WebMvcLinkBuilder.linkTo;
import static org.springframework.hateoas.server.mvc.WebMvcLinkBuilder.methodOn;

/**
* @author Glenn Renfro
* @author Corneil du Plessis
*/
@RestController
@RequestMapping("/jobs/executions/{jobExecutionId}/steps")
Expand Down Expand Up @@ -92,12 +87,8 @@ public StepExecutionProgressInfoResource progress(
if (!StringUtils.hasText(schemaTarget)) {
schemaTarget = SchemaVersionTarget.defaultTarget().getName();
}
TaskJobExecution taskJobExecution = taskJobService.getJobExecution(jobExecutionId, schemaTarget);
StepExecution stepExecution = taskJobExecution.getJobExecution().getStepExecutions()
.stream()
.filter(s -> s.getId().equals(stepExecutionId))
.findFirst()
.orElseThrow(() -> new NoSuchStepExecutionException("Step execution " + stepExecutionId + " for Job " + jobExecutionId + " not found"));
JobService jobService = jobServiceContainer.get(schemaTarget);
StepExecution stepExecution = jobService.getStepExecution(jobExecutionId, stepExecutionId);
String stepName = stepExecution.getStepName();
if (stepName.contains(":partition")) {
// assume we want to compare all partitions
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,33 @@
/*
* Copyright 2009-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.dataflow.server.service;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.dao.Jackson2ExecutionContextStringSerializer;
import org.springframework.cloud.dataflow.schema.SchemaVersionTarget;
import org.springframework.cloud.dataflow.schema.service.SchemaService;
import org.springframework.cloud.dataflow.server.batch.Base64ExecutionContextSerializer;
import org.springframework.cloud.dataflow.server.batch.JobService;
import org.springframework.cloud.dataflow.server.batch.SimpleJobServiceFactoryBean;
import org.springframework.cloud.dataflow.server.controller.NoSuchSchemaTargetException;
Expand All @@ -18,6 +36,11 @@
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.util.StringUtils;

/**
* The container provides implementations of JobService for each SchemaTarget.
*
* @author Corneil du Plessis
*/
public class JobServiceContainer {
private final static Logger logger = LoggerFactory.getLogger(JobServiceContainer.class);
private final Map<String, JobService> container = new HashMap<>();
Expand All @@ -42,6 +65,14 @@ public JobServiceContainer(
factoryBean.setTablePrefix(target.getBatchPrefix());
factoryBean.setAppBootSchemaVersionTarget(target);
factoryBean.setSchemaService(schemaService);
switch (target.getSchemaVersion()) {
case BOOT3:
factoryBean.setSerializer(new Base64ExecutionContextSerializer());
break;
case BOOT2:
factoryBean.setSerializer(new Jackson2ExecutionContextStringSerializer());
break;
}
try {
factoryBean.afterPropertiesSet();
container.put(target.getName(), factoryBean.getObject());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,8 @@ public JobExecutionThinController jobExecutionThinController(TaskJobService repo
}

@Bean
public JobStepExecutionController jobStepExecutionController(TaskJobService taskJobService) {
return new JobStepExecutionController(taskJobService);
public JobStepExecutionController jobStepExecutionController(JobServiceContainer jobServiceContainer) {
return new JobStepExecutionController(jobServiceContainer);
}

@Bean
Expand Down

0 comments on commit 0ca26b3

Please sign in to comment.