Skip to content

Commit

Permalink
Migrate Task Components to Boot 3, Task 3
Browse files Browse the repository at this point in the history
This is Phase 1 of migrate the Task and Job components to Boot3/Task3/Batch5
Its purpose was to focus on migrating as much Task related features over as possible.

Status:
* All Task only tests in Spring Cloud Data Flow Server Core are passing
* Job related migrations have not started.  Nor those that are Batch/Task related
* Dataflow does start and a person can register a Task app and create a task definition.
* Task launches are still failing but Chris is looking to resolve that due a dependency hiccup from deployer

Noteable changes
* Establish property that tells which platformTransactionManager Task should use
* Update Task components to run Boot3 only components.
* Remove Schema related code from Task Launches and exploration
* Remove AggregateExecutionSupport from project and remove usages.
* AggregateTaskExplorer is still present and debating renaming it and keeping it because of some of the features it offered.
* Removed the AggregateTaskExecution

Updated based on code review
  • Loading branch information
cppwfs committed Mar 5, 2024
1 parent 64f8bd0 commit 3eea971
Show file tree
Hide file tree
Showing 83 changed files with 706 additions and 2,031 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@

import javax.sql.DataSource;

import org.springframework.cloud.dataflow.aggregate.task.impl.DefaultAggregateExecutionSupport;
import org.springframework.cloud.dataflow.aggregate.task.impl.DefaultAggregateTaskExplorer;
import org.springframework.cloud.dataflow.registry.service.AppRegistryService;
import org.springframework.cloud.dataflow.schema.service.SchemaService;
import org.springframework.cloud.dataflow.schema.service.SchemaServiceConfiguration;
import org.springframework.context.annotation.Bean;
Expand All @@ -36,33 +34,21 @@
@Import(SchemaServiceConfiguration.class)
public class AggregateTaskConfiguration {

@Bean
public AggregateExecutionSupport aggregateExecutionSupport(
AppRegistryService registryService,
SchemaService schemaService
) {
return new DefaultAggregateExecutionSupport(registryService, schemaService);
}

@Bean
public AggregateTaskExplorer aggregateTaskExplorer(
DataSource dataSource,
DataflowTaskExecutionQueryDao taskExecutionQueryDao,
SchemaService schemaService,
AggregateExecutionSupport aggregateExecutionSupport,
TaskDefinitionReader taskDefinitionReader,
TaskDeploymentReader taskDeploymentReader
) {
Assert.notNull(dataSource, "dataSource required");
Assert.notNull(taskExecutionQueryDao, "taskExecutionQueryDao required");
Assert.notNull(schemaService, "schemaService required");
Assert.notNull(aggregateExecutionSupport, "aggregateExecutionSupport required");
Assert.notNull(taskDefinitionReader, "taskDefinitionReader required");
Assert.notNull(taskDeploymentReader, "taskDeploymentReader required");
return new DefaultAggregateTaskExplorer(dataSource,
taskExecutionQueryDao,
schemaService,
aggregateExecutionSupport,
taskDefinitionReader,
taskDeploymentReader);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.List;
import java.util.Set;

import org.springframework.cloud.dataflow.schema.AggregateTaskExecution;
import org.springframework.cloud.task.repository.TaskExecution;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
Expand All @@ -35,10 +34,9 @@ public interface AggregateTaskExplorer {
* find a task execution given an execution id and schema target.
*
* @param executionId the task execution id
* @param schemaTarget the schema target
* @return the task execution
*/
AggregateTaskExecution getTaskExecution(long executionId, String schemaTarget);
TaskExecution getTaskExecution(long executionId);

/**
* find a task execution given an external execution id and platform name.
Expand All @@ -47,11 +45,11 @@ public interface AggregateTaskExplorer {
* @param platform the platform name
* @return the task execution
*/
AggregateTaskExecution getTaskExecutionByExternalExecutionId(String externalExecutionId, String platform);
TaskExecution getTaskExecutionByExternalExecutionId(String externalExecutionId, String platform);

List<AggregateTaskExecution> findChildTaskExecutions(long executionId, String schemaTarget);
List<TaskExecution> findChildTaskExecutions(long executionId);

List<AggregateTaskExecution> findChildTaskExecutions(Collection<Long> parentIds, String schemaTarget);
List<TaskExecution> findChildTaskExecutions(Collection<Long> parentIds);

/**
* Retrieve a collection of taskExecutions that have the task name provided.
Expand All @@ -60,7 +58,7 @@ public interface AggregateTaskExplorer {
* @param pageable the constraints for the search
* @return the set of running executions for tasks with the specified name
*/
Page<AggregateTaskExecution> findRunningTaskExecutions(String taskName, Pageable pageable);
Page<TaskExecution> findRunningTaskExecutions(String taskName, Pageable pageable);

/**
* Retrieve a list of available task names.
Expand Down Expand Up @@ -98,7 +96,7 @@ public interface AggregateTaskExplorer {
* @param onlyCompleted whether to include only completed tasks
* @return list of task executions
*/
List<AggregateTaskExecution> findTaskExecutions(String taskName, boolean onlyCompleted);
List<TaskExecution> findTaskExecutions(String taskName, boolean onlyCompleted);

/**
* Get a list of executions for a task by name, completion status and end time.
Expand All @@ -108,7 +106,7 @@ public interface AggregateTaskExplorer {
* @return list of task executions
* @since 2.11.0
*/
List<AggregateTaskExecution> findTaskExecutionsBeforeEndTime(String taskName, Date endTime);
List<TaskExecution> findTaskExecutionsBeforeEndTime(String taskName, Date endTime);

/**
* Get a collection/page of executions.
Expand All @@ -117,7 +115,7 @@ public interface AggregateTaskExplorer {
* @param pageable the constraints for the search
* @return list of task executions
*/
Page<AggregateTaskExecution> findTaskExecutionsByName(String taskName, Pageable pageable);
Page<TaskExecution> findTaskExecutionsByName(String taskName, Pageable pageable);

/**
* Retrieves all the task executions within the pageable constraints sorted by start
Expand All @@ -126,28 +124,26 @@ public interface AggregateTaskExplorer {
* @param pageable the constraints for the search
* @return page containing the results from the search
*/
Page<AggregateTaskExecution> findAll(Pageable pageable);
Page<TaskExecution> findAll(Pageable pageable);

/**
* Returns the id of the TaskExecution that the requested Spring Batch job execution
* was executed within the context of. Returns null if none were found.
*
* @param jobExecutionId the id of the JobExecution
* @param schemaTarget the schema target
* @return the id of the {@link TaskExecution}
*/
Long getTaskExecutionIdByJobExecutionId(long jobExecutionId, String schemaTarget);
Long getTaskExecutionIdByJobExecutionId(long jobExecutionId);

/**
* Returns a Set of JobExecution ids for the jobs that were executed within the scope
* of the requested task.
*
* @param taskExecutionId id of the {@link TaskExecution}
* @param schemaTarget the schema target
* @return a <code>Set</code> of the ids of the job executions executed within the
* task.
*/
Set<Long> getJobExecutionIdsByTaskExecutionId(long taskExecutionId, String schemaTarget);
Set<Long> getJobExecutionIdsByTaskExecutionId(long taskExecutionId);

/**
* Returns a {@link List} of the latest {@link TaskExecution} for 1 or more task
Expand All @@ -167,7 +163,7 @@ public interface AggregateTaskExplorer {
* @param taskNames At least 1 task name must be provided
* @return List of TaskExecutions. May be empty but never null.
*/
List<AggregateTaskExecution> getLatestTaskExecutionsByTaskNames(String... taskNames);
List<TaskExecution> getLatestTaskExecutionsByTaskNames(String... taskNames);

/**
* Returns the latest task execution for a given task name. Will ultimately apply the
Expand All @@ -178,5 +174,5 @@ public interface AggregateTaskExplorer {
* @return The latest Task Execution or null
* @see #getLatestTaskExecutionsByTaskNames(String...)
*/
AggregateTaskExecution getLatestTaskExecutionForTaskName(String taskName);
TaskExecution getLatestTaskExecutionForTaskName(String taskName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.Date;
import java.util.List;

import org.springframework.cloud.dataflow.schema.AggregateTaskExecution;
import org.springframework.cloud.task.repository.TaskExecution;
import org.springframework.cloud.task.repository.dao.TaskExecutionDao;
import org.springframework.data.domain.Page;
Expand All @@ -39,28 +38,25 @@ public interface DataflowTaskExecutionQueryDao {
* Retrieves a task execution from the task repository.
*
* @param executionId the id associated with the task execution.
* @param schemaTarget the schema target.
* @return a fully qualified TaskExecution instance.
*/
AggregateTaskExecution getTaskExecution(long executionId, String schemaTarget);
TaskExecution getTaskExecution(long executionId);

/**
* Retrieves a list of task executions where the provided execution id and schemaTarget represents the parent of task execution.
*
* @param executionId parent task execution id
* @param schemaTarget parent task schema target
* @return the task executions
*/
List<AggregateTaskExecution> findChildTaskExecutions(long executionId, String schemaTarget);
List<TaskExecution> findChildTaskExecutions(long executionId);

/**
* Retrieves a list of task executions where the provided execution ids and schemaTarget represents the parents of task executions.
*
* @param parentIds parent task execution ids
* @param schemaTarget parent task schema target
* @return the task executions
*/
List<AggregateTaskExecution> findChildTaskExecutions(Collection<Long> parentIds, String schemaTarget);
List<TaskExecution> findChildTaskExecutions(Collection<Long> parentIds);

/**
* Find task executions by task name and completion status.
Expand All @@ -69,7 +65,7 @@ public interface DataflowTaskExecutionQueryDao {
* @param completed whether to include only completed task executions.
* @return list of task executions
*/
List<AggregateTaskExecution> findTaskExecutions(String taskName, boolean completed);
List<TaskExecution> findTaskExecutions(String taskName, boolean completed);

/**
* Find task executions by task name whose end date is before the specified date.
Expand All @@ -78,7 +74,7 @@ public interface DataflowTaskExecutionQueryDao {
* @param endTime the time before the task ended.
* @return list of task executions.
*/
List<AggregateTaskExecution> findTaskExecutionsBeforeEndTime(String taskName, @NonNull Date endTime);
List<TaskExecution> findTaskExecutionsBeforeEndTime(String taskName, @NonNull Date endTime);

/**
* Retrieves current number of task executions for a taskName.
Expand Down Expand Up @@ -135,7 +131,7 @@ public interface DataflowTaskExecutionQueryDao {
* @param pageable the constraints for the search.
* @return set of running task executions.
*/
Page<AggregateTaskExecution> findRunningTaskExecutions(String taskName, Pageable pageable);
Page<TaskExecution> findRunningTaskExecutions(String taskName, Pageable pageable);

/**
* Retrieves a subset of task executions by task name, start location and size.
Expand All @@ -145,7 +141,7 @@ public interface DataflowTaskExecutionQueryDao {
* @return a list that contains task executions from the query bound by the start
* position and count specified by the user.
*/
Page<AggregateTaskExecution> findTaskExecutionsByName(String taskName, Pageable pageable);
Page<TaskExecution> findTaskExecutionsByName(String taskName, Pageable pageable);

/**
* Retrieves a sorted list of distinct task names for the task executions.
Expand All @@ -161,7 +157,7 @@ public interface DataflowTaskExecutionQueryDao {
* @return page containing the results from the search
*/

Page<AggregateTaskExecution> findAll(Pageable pageable);
Page<TaskExecution> findAll(Pageable pageable);

/**
* Returns a {@link List} of the latest {@link TaskExecution} for 1 or more task
Expand All @@ -181,7 +177,7 @@ public interface DataflowTaskExecutionQueryDao {
* @param taskNames At least 1 task name must be provided
* @return List of TaskExecutions. May be empty but never null.
*/
List<AggregateTaskExecution> getLatestTaskExecutionsByTaskNames(String... taskNames);
List<TaskExecution> getLatestTaskExecutionsByTaskNames(String... taskNames);

/**
* Returns the latest task execution for a given task name. Will ultimately apply the
Expand All @@ -192,8 +188,8 @@ public interface DataflowTaskExecutionQueryDao {
* @return The latest Task Execution or null
* @see #getLatestTaskExecutionsByTaskNames(String...)
*/
AggregateTaskExecution getLatestTaskExecutionForTaskName(String taskName);
TaskExecution getLatestTaskExecutionForTaskName(String taskName);

AggregateTaskExecution geTaskExecutionByExecutionId(String executionId, String taskName);
TaskExecution geTaskExecutionByExecutionId(String executionId, String taskName);

}
Loading

0 comments on commit 3eea971

Please sign in to comment.