diff --git a/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/impl/AggregateDataFlowTaskExecutionQueryDao.java b/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/impl/AggregateDataFlowTaskExecutionQueryDao.java index 2b5724239e..e494b87ee7 100644 --- a/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/impl/AggregateDataFlowTaskExecutionQueryDao.java +++ b/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/impl/AggregateDataFlowTaskExecutionQueryDao.java @@ -581,6 +581,7 @@ public AggregateTaskExecution mapRow(ResultSet rs, int rowNum) throws SQLExcepti rs.getString("EXTERNAL_EXECUTION_ID"), parentExecutionId, null, + null, schemaTarget ); } diff --git a/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/impl/DefaultAggregateExecutionSupport.java b/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/impl/DefaultAggregateExecutionSupport.java index c660c95471..342b09e4d4 100644 --- a/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/impl/DefaultAggregateExecutionSupport.java +++ b/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/impl/DefaultAggregateExecutionSupport.java @@ -156,6 +156,7 @@ public AggregateTaskExecution from(TaskExecution execution, String schemaTarget, execution.getExternalExecutionId(), execution.getParentExecutionId(), platformName, + null, schemaTarget); } return null; diff --git a/spring-cloud-dataflow-rest-resource/src/main/java/org/springframework/cloud/dataflow/rest/resource/TaskExecutionThinResource.java b/spring-cloud-dataflow-rest-resource/src/main/java/org/springframework/cloud/dataflow/rest/resource/TaskExecutionThinResource.java index a3493c3934..dac01ad2e7 100644 --- a/spring-cloud-dataflow-rest-resource/src/main/java/org/springframework/cloud/dataflow/rest/resource/TaskExecutionThinResource.java +++ b/spring-cloud-dataflow-rest-resource/src/main/java/org/springframework/cloud/dataflow/rest/resource/TaskExecutionThinResource.java @@ -71,6 +71,8 @@ public class TaskExecutionThinResource extends RepresentationModel arguments, - String errorMessage, String externalExecutionId, Long parentExecutionId, String platformName, String schemaTarget) { + String errorMessage, String externalExecutionId, Long parentExecutionId, String platformName, + String ctrTaskStatus, String schemaTarget) { Assert.notNull(arguments, "arguments must not be null"); this.executionId = executionId; @@ -107,14 +110,15 @@ public AggregateTaskExecution(long executionId, Integer exitCode, String taskNam this.parentExecutionId = parentExecutionId; this.schemaTarget = schemaTarget; this.platformName = platformName; + this.ctrTaskStatus = ctrTaskStatus; } public AggregateTaskExecution(long executionId, Integer exitCode, String taskName, Date startTime, Date endTime, String exitMessage, List arguments, - String errorMessage, String externalExecutionId, String platformName, String schemaTarget) { + String errorMessage, String externalExecutionId, String platformName, String ctrTaskStatus, String schemaTarget) { this(executionId, exitCode, taskName, startTime, endTime, exitMessage, arguments, - errorMessage, externalExecutionId, null, platformName, schemaTarget); + errorMessage, externalExecutionId, null, platformName, ctrTaskStatus, schemaTarget); } public long getExecutionId() { @@ -209,22 +213,31 @@ public void setPlatformName(String platformName) { this.platformName = platformName; } + public String getCtrTaskStatus() { + return ctrTaskStatus; + } + + public void setCtrTaskStatus(String ctrTaskStatus) { + this.ctrTaskStatus = ctrTaskStatus; + } + @Override public String toString() { return "AggregateTaskExecution{" + - "executionId=" + executionId + - ", parentExecutionId=" + parentExecutionId + - ", exitCode=" + exitCode + - ", taskName='" + taskName + '\'' + - ", startTime=" + startTime + - ", endTime=" + endTime + - ", exitMessage='" + exitMessage + '\'' + - ", externalExecutionId='" + externalExecutionId + '\'' + - ", errorMessage='" + errorMessage + '\'' + - ", schemaTarget='" + schemaTarget + '\'' + - ", platformName='" + platformName + '\'' + - ", arguments=" + arguments + - '}'; + "executionId=" + executionId + + ", parentExecutionId=" + parentExecutionId + + ", exitCode=" + exitCode + + ", taskName='" + taskName + '\'' + + ", startTime=" + startTime + + ", endTime=" + endTime + + ", exitMessage='" + exitMessage + '\'' + + ", externalExecutionId='" + externalExecutionId + '\'' + + ", errorMessage='" + errorMessage + '\'' + + ", schemaTarget='" + schemaTarget + '\'' + + ", platformName='" + platformName + '\'' + + ", ctrTaskStatus='" + ctrTaskStatus + '\'' + + ", arguments=" + arguments + + '}'; } public TaskExecution toTaskExecution() { diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/config/DataFlowControllerAutoConfiguration.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/config/DataFlowControllerAutoConfiguration.java index 8f6e467bc4..122f69ca91 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/config/DataFlowControllerAutoConfiguration.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/config/DataFlowControllerAutoConfiguration.java @@ -300,8 +300,8 @@ public TaskExecutionController taskExecutionController( } @Bean - public TaskExecutionThinController taskExecutionThinController(AggregateTaskExplorer aggregateTaskExplorer) { - return new TaskExecutionThinController(aggregateTaskExplorer); + public TaskExecutionThinController taskExecutionThinController(AggregateTaskExplorer aggregateTaskExplorer, TaskJobService taskJobService) { + return new TaskExecutionThinController(aggregateTaskExplorer, taskJobService); } @Bean diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionThinController.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionThinController.java index 64ad6d5925..210911b3c3 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionThinController.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionThinController.java @@ -18,6 +18,8 @@ import org.springframework.cloud.dataflow.aggregate.task.AggregateTaskExplorer; import org.springframework.cloud.dataflow.rest.resource.TaskExecutionThinResource; import org.springframework.cloud.dataflow.schema.AggregateTaskExecution; +import org.springframework.cloud.dataflow.server.service.TaskJobService; +import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; import org.springframework.data.web.PagedResourcesAssembler; import org.springframework.hateoas.PagedModel; @@ -26,6 +28,8 @@ import org.springframework.http.HttpStatus; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.bind.annotation.RestController; @@ -44,15 +48,29 @@ public class TaskExecutionThinController { private final AggregateTaskExplorer explorer; private final TaskExecutionThinResourceAssembler resourceAssembler; - public TaskExecutionThinController(AggregateTaskExplorer explorer) { + private final TaskJobService taskJobService; + + public TaskExecutionThinController(AggregateTaskExplorer explorer, TaskJobService taskJobService) { this.explorer = explorer; + this.taskJobService = taskJobService; this.resourceAssembler = new TaskExecutionThinResourceAssembler(); } @GetMapping(produces = "application/json") @ResponseStatus(HttpStatus.OK) public PagedModel listTasks(Pageable pageable, PagedResourcesAssembler pagedAssembler) { - return pagedAssembler.toModel(explorer.findAll(pageable, true), resourceAssembler); + Page page = explorer.findAll(pageable, true); + taskJobService.populateComposeTaskRunnerStatus(page.getContent()); + return pagedAssembler.toModel(page, resourceAssembler); + } + + @RequestMapping(value = "", method = RequestMethod.GET, params = "name") + @ResponseStatus(HttpStatus.OK) + public PagedModel retrieveTasksByName(@RequestParam("name") String taskName, + Pageable pageable, PagedResourcesAssembler pagedAssembler) { + Page page = this.explorer.findTaskExecutionsByName(taskName, pageable); + taskJobService.populateComposeTaskRunnerStatus(page.getContent()); + return pagedAssembler.toModel(page, resourceAssembler); } static class TaskExecutionThinResourceAssembler extends RepresentationModelAssemblerSupport { diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/db/migration/AbstractCreateTaskParentIndexMigration.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/db/migration/AbstractCreateTaskParentIndexMigration.java new file mode 100644 index 0000000000..11d44e7d18 --- /dev/null +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/db/migration/AbstractCreateTaskParentIndexMigration.java @@ -0,0 +1,45 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.cloud.dataflow.server.db.migration; + +import java.util.Arrays; +import java.util.List; + +import org.springframework.cloud.dataflow.common.flyway.AbstractMigration; +import org.springframework.cloud.dataflow.common.flyway.SqlCommand; + +/** + * Provide indexes to improve performance of finding child tasks. + * @author Corneil du Plessis + */ +public abstract class AbstractCreateTaskParentIndexMigration extends AbstractMigration { + protected static final String CREATE_TASK_PARENT_INDEX = + "create index TASK_EXECUTION_PARENT_IX on TASK_EXECUTION(PARENT_EXECUTION_ID)"; + protected static final String CREATE_BOOT3_TASK_PARENT_INDEX = + "create index BOOT3_TASK_EXECUTION_PARENT_IX on BOOT3_TASK_EXECUTION(PARENT_EXECUTION_ID)"; + + public AbstractCreateTaskParentIndexMigration() { + super(null); + } + + @Override + public List getCommands() { + return Arrays.asList( + SqlCommand.from(CREATE_TASK_PARENT_INDEX), + SqlCommand.from(CREATE_BOOT3_TASK_PARENT_INDEX) + ); + } +} diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/db/migration/db2/V11__CreateTaskParentIndex.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/db/migration/db2/V11__CreateTaskParentIndex.java new file mode 100644 index 0000000000..6efeb5db52 --- /dev/null +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/db/migration/db2/V11__CreateTaskParentIndex.java @@ -0,0 +1,7 @@ +package org.springframework.cloud.dataflow.server.db.migration.db2; + +import org.springframework.cloud.dataflow.server.db.migration.AbstractCreateTaskParentIndexMigration; + +public class V11__CreateTaskParentIndex extends AbstractCreateTaskParentIndexMigration { + +} diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/db/migration/mariadb/V12__CreateTaskParentIndex.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/db/migration/mariadb/V12__CreateTaskParentIndex.java new file mode 100644 index 0000000000..3d48380d07 --- /dev/null +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/db/migration/mariadb/V12__CreateTaskParentIndex.java @@ -0,0 +1,7 @@ +package org.springframework.cloud.dataflow.server.db.migration.mariadb; + +import org.springframework.cloud.dataflow.server.db.migration.AbstractCreateTaskParentIndexMigration; + +public class V12__CreateTaskParentIndex extends AbstractCreateTaskParentIndexMigration { + +} diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/db/migration/mysql/V12__CreateTaskParentIndex.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/db/migration/mysql/V12__CreateTaskParentIndex.java new file mode 100644 index 0000000000..e4a651874d --- /dev/null +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/db/migration/mysql/V12__CreateTaskParentIndex.java @@ -0,0 +1,7 @@ +package org.springframework.cloud.dataflow.server.db.migration.mysql; + +import org.springframework.cloud.dataflow.server.db.migration.AbstractCreateTaskParentIndexMigration; + +public class V12__CreateTaskParentIndex extends AbstractCreateTaskParentIndexMigration { + +} diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/db/migration/oracle/V12__CreateTaskParentIndex.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/db/migration/oracle/V12__CreateTaskParentIndex.java new file mode 100644 index 0000000000..1c712a627e --- /dev/null +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/db/migration/oracle/V12__CreateTaskParentIndex.java @@ -0,0 +1,7 @@ +package org.springframework.cloud.dataflow.server.db.migration.oracle; + +import org.springframework.cloud.dataflow.server.db.migration.AbstractCreateTaskParentIndexMigration; + +public class V12__CreateTaskParentIndex extends AbstractCreateTaskParentIndexMigration { + +} diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/db/migration/postgresql/V13__CreateTaskParentIndex.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/db/migration/postgresql/V13__CreateTaskParentIndex.java new file mode 100644 index 0000000000..e37e523924 --- /dev/null +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/db/migration/postgresql/V13__CreateTaskParentIndex.java @@ -0,0 +1,7 @@ +package org.springframework.cloud.dataflow.server.db.migration.postgresql; + +import org.springframework.cloud.dataflow.server.db.migration.AbstractCreateTaskParentIndexMigration; + +public class V13__CreateTaskParentIndex extends AbstractCreateTaskParentIndexMigration { + +} diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/db/migration/sqlserver/V11__CreateTaskParentIndex.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/db/migration/sqlserver/V11__CreateTaskParentIndex.java new file mode 100644 index 0000000000..a6000b3372 --- /dev/null +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/db/migration/sqlserver/V11__CreateTaskParentIndex.java @@ -0,0 +1,7 @@ +package org.springframework.cloud.dataflow.server.db.migration.sqlserver; + +import org.springframework.cloud.dataflow.server.db.migration.AbstractCreateTaskParentIndexMigration; + +public class V11__CreateTaskParentIndex extends AbstractCreateTaskParentIndexMigration { + +} diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/repository/AggregateJobQueryDao.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/repository/AggregateJobQueryDao.java index 81a18384af..3d77fc8d30 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/repository/AggregateJobQueryDao.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/repository/AggregateJobQueryDao.java @@ -16,8 +16,8 @@ package org.springframework.cloud.dataflow.server.repository; +import java.util.Collection; import java.util.Date; -import java.util.List; import org.springframework.batch.core.BatchStatus; import org.springframework.batch.core.JobInstance; @@ -26,6 +26,7 @@ import org.springframework.batch.core.launch.NoSuchJobInstanceException; import org.springframework.cloud.dataflow.rest.job.JobInstanceExecutions; import org.springframework.cloud.dataflow.rest.job.TaskJobExecution; +import org.springframework.cloud.dataflow.schema.AggregateTaskExecution; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; @@ -60,4 +61,6 @@ public interface AggregateJobQueryDao { JobInstance getJobInstance(long id, String schemaTarget) throws NoSuchJobInstanceException; + void populateCtrStatus(Collection aggregateTaskExecutions); + } diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/repository/JdbcAggregateJobQueryDao.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/repository/JdbcAggregateJobQueryDao.java index 90eca34ebd..e4e360667a 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/repository/JdbcAggregateJobQueryDao.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/repository/JdbcAggregateJobQueryDao.java @@ -22,13 +22,15 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import java.util.stream.Collectors; import javax.sql.DataSource; @@ -58,6 +60,7 @@ import org.springframework.cloud.dataflow.core.database.support.DatabaseType; import org.springframework.cloud.dataflow.rest.job.JobInstanceExecutions; import org.springframework.cloud.dataflow.rest.job.TaskJobExecution; +import org.springframework.cloud.dataflow.schema.AggregateTaskExecution; import org.springframework.cloud.dataflow.schema.AppBootSchemaVersion; import org.springframework.cloud.dataflow.schema.SchemaVersionTarget; import org.springframework.cloud.dataflow.schema.service.SchemaService; @@ -171,6 +174,19 @@ public class JdbcAggregateJobQueryDao implements AggregateJobQueryDao { " LEFT OUTER JOIN AGGREGATE_TASK_BATCH TT ON E.JOB_EXECUTION_ID = TT.JOB_EXECUTION_ID AND E.SCHEMA_TARGET = TT.SCHEMA_TARGET" + " LEFT OUTER JOIN AGGREGATE_TASK_EXECUTION T ON TT.TASK_EXECUTION_ID = T.TASK_EXECUTION_ID AND TT.SCHEMA_TARGET = T.SCHEMA_TARGET"; + private static final String FIND_CTR_STATUS = "SELECT T.TASK_EXECUTION_ID as TASK_EXECUTION_ID, J.EXIT_MESSAGE as CTR_STATUS" + + " from AGGREGATE_TASK_EXECUTION T" + + " JOIN AGGREGATE_TASK_BATCH TB ON TB.TASK_EXECUTION_ID=T.TASK_EXECUTION_ID AND TB.SCHEMA_TARGET=T.SCHEMA_TARGET" + + " JOIN AGGREGATE_JOB_EXECUTION J ON J.JOB_EXECUTION_ID=TB.JOB_EXECUTION_ID AND J.SCHEMA_TARGET=TB.SCHEMA_TARGET" + + " WHERE T.TASK_EXECUTION_ID in (:taskExecutionIds) " + + " AND T.SCHEMA_TARGET = ':schemaTarget'" + + " AND (select count(*) from AGGREGATE_TASK_EXECUTION CT" + + " where (select count(*) from AGGREGATE_TASK_EXECUTION_PARAMS where" + + " CT.TASK_EXECUTION_ID = TASK_EXECUTION_ID and" + + " CT.SCHEMA_TARGET = SCHEMA_TARGET and" + + " TASK_PARAM = '--spring.cloud.task.parent-schema-target=boot2') > 0" + + " AND CT.PARENT_EXECUTION_ID = T.TASK_EXECUTION_ID) > 0"; + private static final String FIND_JOB_BY_NAME_INSTANCE_ID = FIND_JOB_BY + " where I.JOB_NAME = ? AND I.JOB_INSTANCE_ID = ?"; @@ -269,6 +285,32 @@ public Page listJobInstances(String jobName, Pageable pag } + @Override + public void populateCtrStatus(Collection aggregateTaskExecutions) { + Map> targets = aggregateTaskExecutions.stream().collect(Collectors.groupingBy(aggregateTaskExecution -> aggregateTaskExecution.getSchemaTarget())); + final AtomicInteger updated = new AtomicInteger(0); + for(Map.Entry> entry : targets.entrySet()) { + String target = entry.getKey(); + Map aggregateTaskExecutionMap = entry.getValue().stream() + .collect(Collectors.toMap(AggregateTaskExecution::getExecutionId, Function.identity())); + String ids = aggregateTaskExecutionMap.keySet() + .stream() + .map(Object::toString) + .collect(Collectors.joining(",")); + String sql = FIND_CTR_STATUS.replace(":taskExecutionIds", ids).replace(":schemaTarget", target); + jdbcTemplate.query(sql, rs -> { + Long id = rs.getLong("TASK_EXECUTION_ID"); + String ctrStatus = rs.getString("CTR_STATUS"); + LOG.debug("populateCtrStatus:{}={}", id, ctrStatus); + AggregateTaskExecution execution = aggregateTaskExecutionMap.get(id); + Assert.notNull(execution, "Expected AggregateTaskExecution for " + id + " from " + ids); + updated.incrementAndGet(); + execution.setCtrTaskStatus(ctrStatus); + }); + } + LOG.debug("updated {} ctr statuses", updated.get()); + } + @Override public JobInstanceExecutions getJobInstanceExecution(String jobName, long instanceId) { LOG.debug("getJobInstanceExecution:{}:{}:{}", jobName, instanceId, FIND_JOB_BY_NAME_INSTANCE_ID); diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/TaskJobService.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/TaskJobService.java index 35f79cd9a4..079745b245 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/TaskJobService.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/TaskJobService.java @@ -18,6 +18,7 @@ import java.util.Collection; import java.util.Date; +import java.util.List; import java.util.Map; import java.util.Set; @@ -31,6 +32,7 @@ import org.springframework.batch.core.launch.NoSuchJobInstanceException; import org.springframework.cloud.dataflow.rest.job.JobInstanceExecutions; import org.springframework.cloud.dataflow.rest.job.TaskJobExecution; +import org.springframework.cloud.dataflow.schema.AggregateTaskExecution; import org.springframework.cloud.dataflow.server.batch.JobExecutionWithStepCount; import org.springframework.cloud.dataflow.server.job.support.JobNotRestartableException; import org.springframework.cloud.task.repository.TaskExecution; @@ -206,4 +208,6 @@ Page listJobExecutionsForJobWithStepCountFilteredByTaskExecuti Map> getJobExecutionIdsByTaskExecutionIds(Collection taskExecutionIds, String schemaTarget); + void populateComposeTaskRunnerStatus(Collection taskExecutions); + } diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskJobService.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskJobService.java index 17cb501d56..9c4cadfa00 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskJobService.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskJobService.java @@ -313,6 +313,10 @@ public void stopJobExecution(long jobExecutionId, String schemaTarget) throws No logger.info("stopped:{}:{}:status={}", jobExecutionId, schemaTarget, status); } + @Override + public void populateComposeTaskRunnerStatus(Collection taskExecutions) { + aggregateJobQueryDao.populateCtrStatus(taskExecutions); + } private TaskJobExecution getTaskJobExecution(JobExecution jobExecution, String schemaTarget) { return new TaskJobExecution( diff --git a/spring-cloud-dataflow-server-core/src/main/resources/org/springframework/cloud/dataflow/server/db/migration/h2/V1__INITIAL_SETUP.sql b/spring-cloud-dataflow-server-core/src/main/resources/org/springframework/cloud/dataflow/server/db/migration/h2/V1__INITIAL_SETUP.sql index 0c37d000ca..1099b9c086 100644 --- a/spring-cloud-dataflow-server-core/src/main/resources/org/springframework/cloud/dataflow/server/db/migration/h2/V1__INITIAL_SETUP.sql +++ b/spring-cloud-dataflow-server-core/src/main/resources/org/springframework/cloud/dataflow/server/db/migration/h2/V1__INITIAL_SETUP.sql @@ -351,4 +351,6 @@ create index BOOT3_BATCH_STEP_EXECUTION_JOB_EXECUTION_ID_IX on BOOT3_BATCH_STEP_ create index BOOT3_TASK_TASK_BATCH_JOB_EXECUTION_ID_IX on BOOT3_TASK_TASK_BATCH(JOB_EXECUTION_ID); create index TASK_TASK_BATCH_JOB_EXECUTION_ID_IX on TASK_TASK_BATCH(JOB_EXECUTION_ID); create index BATCH_JOB_EXECUTION_START_TIME_IX on BATCH_JOB_EXECUTION(START_TIME); -create index BOOT3_BATCH_JOB_EXECUTION_START_TIME_IX on BOOT3_BATCH_JOB_EXECUTION(START_TIME); \ No newline at end of file +create index BOOT3_BATCH_JOB_EXECUTION_START_TIME_IX on BOOT3_BATCH_JOB_EXECUTION(START_TIME); +create index TASK_EXECUTION_PARENT_IX on TASK_EXECUTION(PARENT_EXECUTION_ID); +create index BOOT3_TASK_EXECUTION_PARENT_IX on BOOT3_TASK_EXECUTION(PARENT_EXECUTION_ID); \ No newline at end of file diff --git a/spring-cloud-dataflow-server-core/src/main/resources/schemas/db2/V10-dataflow.sql b/spring-cloud-dataflow-server-core/src/main/resources/schemas/db2/V10-dataflow.sql new file mode 100644 index 0000000000..b932385f30 --- /dev/null +++ b/spring-cloud-dataflow-server-core/src/main/resources/schemas/db2/V10-dataflow.sql @@ -0,0 +1,2 @@ +create index TASK_EXECUTION_PARENT_IX on TASK_EXECUTION(PARENT_EXECUTION_ID); +create index BOOT3_TASK_EXECUTION_PARENT_IX on BOOT3_TASK_EXECUTION(PARENT_EXECUTION_ID); \ No newline at end of file diff --git a/spring-cloud-dataflow-server-core/src/main/resources/schemas/mariadb/V12-dataflow.sql b/spring-cloud-dataflow-server-core/src/main/resources/schemas/mariadb/V12-dataflow.sql new file mode 100644 index 0000000000..b932385f30 --- /dev/null +++ b/spring-cloud-dataflow-server-core/src/main/resources/schemas/mariadb/V12-dataflow.sql @@ -0,0 +1,2 @@ +create index TASK_EXECUTION_PARENT_IX on TASK_EXECUTION(PARENT_EXECUTION_ID); +create index BOOT3_TASK_EXECUTION_PARENT_IX on BOOT3_TASK_EXECUTION(PARENT_EXECUTION_ID); \ No newline at end of file diff --git a/spring-cloud-dataflow-server-core/src/main/resources/schemas/mysql/V11-dataflow.sql b/spring-cloud-dataflow-server-core/src/main/resources/schemas/mysql/V11-dataflow.sql new file mode 100644 index 0000000000..b932385f30 --- /dev/null +++ b/spring-cloud-dataflow-server-core/src/main/resources/schemas/mysql/V11-dataflow.sql @@ -0,0 +1,2 @@ +create index TASK_EXECUTION_PARENT_IX on TASK_EXECUTION(PARENT_EXECUTION_ID); +create index BOOT3_TASK_EXECUTION_PARENT_IX on BOOT3_TASK_EXECUTION(PARENT_EXECUTION_ID); \ No newline at end of file diff --git a/spring-cloud-dataflow-server-core/src/main/resources/schemas/oracle/V10-dataflow.sql b/spring-cloud-dataflow-server-core/src/main/resources/schemas/oracle/V10-dataflow.sql new file mode 100644 index 0000000000..b932385f30 --- /dev/null +++ b/spring-cloud-dataflow-server-core/src/main/resources/schemas/oracle/V10-dataflow.sql @@ -0,0 +1,2 @@ +create index TASK_EXECUTION_PARENT_IX on TASK_EXECUTION(PARENT_EXECUTION_ID); +create index BOOT3_TASK_EXECUTION_PARENT_IX on BOOT3_TASK_EXECUTION(PARENT_EXECUTION_ID); \ No newline at end of file diff --git a/spring-cloud-dataflow-server-core/src/main/resources/schemas/postgresql/V11-dataflow.sql b/spring-cloud-dataflow-server-core/src/main/resources/schemas/postgresql/V11-dataflow.sql new file mode 100644 index 0000000000..b932385f30 --- /dev/null +++ b/spring-cloud-dataflow-server-core/src/main/resources/schemas/postgresql/V11-dataflow.sql @@ -0,0 +1,2 @@ +create index TASK_EXECUTION_PARENT_IX on TASK_EXECUTION(PARENT_EXECUTION_ID); +create index BOOT3_TASK_EXECUTION_PARENT_IX on BOOT3_TASK_EXECUTION(PARENT_EXECUTION_ID); \ No newline at end of file diff --git a/spring-cloud-dataflow-server-core/src/main/resources/schemas/sqlserver/V10-dataflow.sql b/spring-cloud-dataflow-server-core/src/main/resources/schemas/sqlserver/V10-dataflow.sql new file mode 100644 index 0000000000..b932385f30 --- /dev/null +++ b/spring-cloud-dataflow-server-core/src/main/resources/schemas/sqlserver/V10-dataflow.sql @@ -0,0 +1,2 @@ +create index TASK_EXECUTION_PARENT_IX on TASK_EXECUTION(PARENT_EXECUTION_ID); +create index BOOT3_TASK_EXECUTION_PARENT_IX on BOOT3_TASK_EXECUTION(PARENT_EXECUTION_ID); \ No newline at end of file diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/JobDependencies.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/JobDependencies.java index 8c033241a8..5022cff975 100644 --- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/JobDependencies.java +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/JobDependencies.java @@ -241,8 +241,8 @@ public TaskExecutionController taskExecutionController( } @Bean - public TaskExecutionThinController taskExecutionThinController(AggregateTaskExplorer aggregateTaskExplorer) { - return new TaskExecutionThinController(aggregateTaskExplorer); + public TaskExecutionThinController taskExecutionThinController(AggregateTaskExplorer aggregateTaskExplorer, TaskJobService taskJobService) { + return new TaskExecutionThinController(aggregateTaskExplorer, taskJobService); } @Bean diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/TestDependencies.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/TestDependencies.java index 936f6cfeca..71367fe520 100644 --- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/TestDependencies.java +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/TestDependencies.java @@ -550,8 +550,8 @@ public TaskExecutionController taskExecutionController( ); } @Bean - public TaskExecutionThinController taskExecutionThinController(AggregateTaskExplorer explorer) { - return new TaskExecutionThinController(explorer); + public TaskExecutionThinController taskExecutionThinController(AggregateTaskExplorer explorer, TaskJobService taskJobService) { + return new TaskExecutionThinController(explorer, taskJobService); } @Bean diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionControllerTests.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionControllerTests.java index 6b30ed741a..6401dd4052 100644 --- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionControllerTests.java +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionControllerTests.java @@ -359,6 +359,7 @@ void getAllExecutions() throws Exception { .andExpect(status().isOk())) .andExpect(jsonPath("$._embedded.taskExecutionResourceList[*].executionId", containsInAnyOrder(4, 3, 2, 1))) .andExpect(jsonPath("$._embedded.taskExecutionResourceList[*].parentExecutionId", containsInAnyOrder(null, null, null, 1))) + .andExpect(jsonPath("$._embedded.taskExecutionResourceList[*].taskExecutionStatus", containsInAnyOrder("RUNNING", "RUNNING","RUNNING","RUNNING"))) .andExpect(jsonPath("$._embedded.taskExecutionResourceList", hasSize(4))); } @@ -369,6 +370,7 @@ void getAllThinExecutions() throws Exception { .andExpect(status().isOk()) .andExpect(jsonPath("$._embedded.taskExecutionThinResourceList[*].executionId", containsInAnyOrder(4, 3, 2, 1))) .andExpect(jsonPath("$._embedded.taskExecutionThinResourceList[*].parentExecutionId", containsInAnyOrder(null, null, null, 1))) + .andExpect(jsonPath("$._embedded.taskExecutionThinResourceList[*].taskExecutionStatus", containsInAnyOrder("RUNNING", "RUNNING","RUNNING","RUNNING"))) .andExpect(jsonPath("$._embedded.taskExecutionThinResourceList", hasSize(4))); } diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/repository/JdbcDataflowTaskExecutionDaoTests.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/repository/JdbcDataflowTaskExecutionDaoTests.java index 3689e28913..e1891f466d 100644 --- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/repository/JdbcDataflowTaskExecutionDaoTests.java +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/repository/JdbcDataflowTaskExecutionDaoTests.java @@ -28,10 +28,10 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase; import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.cloud.dataflow.aggregate.task.TaskDefinitionReader; -import org.springframework.cloud.dataflow.schema.AggregateTaskExecution; import org.springframework.cloud.dataflow.aggregate.task.AggregateExecutionSupport; +import org.springframework.cloud.dataflow.aggregate.task.TaskDefinitionReader; import org.springframework.cloud.dataflow.aggregate.task.TaskRepositoryContainer; +import org.springframework.cloud.dataflow.schema.AggregateTaskExecution; import org.springframework.cloud.dataflow.schema.SchemaVersionTarget; import org.springframework.cloud.dataflow.server.configuration.TaskServiceDependencies; import org.springframework.cloud.task.repository.TaskRepository; @@ -108,7 +108,8 @@ private List createSampleTaskExecutions(String taskName, for (int i = 0; i < numExecutions; i++) { long executionId = randomGenerator.nextLong(); taskExecutions.add(new AggregateTaskExecution(executionId, null, taskName, startTime, - null, null, new ArrayList<>(), null, externalExecutionId, schemaVersionTarget.getName(), null)); + null, null, new ArrayList<>(), null, externalExecutionId, null, + schemaVersionTarget.getName(), null)); } return taskExecutions; } diff --git a/spring-cloud-dataflow-server/src/test/java/org/springframework/cloud/dataflow/integration/test/tasks/TaskExecutionQueryIT.java b/spring-cloud-dataflow-server/src/test/java/org/springframework/cloud/dataflow/integration/test/tasks/TaskExecutionQueryIT.java index 04b38e2d96..ae2193df3d 100644 --- a/spring-cloud-dataflow-server/src/test/java/org/springframework/cloud/dataflow/integration/test/tasks/TaskExecutionQueryIT.java +++ b/spring-cloud-dataflow-server/src/test/java/org/springframework/cloud/dataflow/integration/test/tasks/TaskExecutionQueryIT.java @@ -160,27 +160,27 @@ void queryWithLargeNumberOfTaskExecutions() throws Exception { .andExpect(status().isOk()) .andExpect(jsonPath("$._embedded.taskExecutionResourceList", hasSize(greaterThanOrEqualTo(20)))); long totalTime = System.currentTimeMillis() - startTime; - logger.info("result:totalTime={}ms", totalTime); long startTime2 = System.currentTimeMillis(); mockMvc.perform( get("/tasks/executions").accept(MediaType.APPLICATION_JSON).param("size", "200").param("page", "2")) .andExpect(status().isOk()) .andExpect(jsonPath("$._embedded.taskExecutionResourceList", hasSize(greaterThanOrEqualTo(200)))); long totalTime2 = System.currentTimeMillis() - startTime2; - logger.info("result:totalTime2={}ms", totalTime2); long startTime3 = System.currentTimeMillis(); mockMvc.perform( get("/tasks/thinexecutions").accept(MediaType.APPLICATION_JSON).param("size", "20").param("page", "3")) .andExpect(status().isOk()) .andExpect(jsonPath("$._embedded.taskExecutionThinResourceList", hasSize(greaterThanOrEqualTo(20)))); long totalTime3 = System.currentTimeMillis() - startTime3; - logger.info("result:totalTime3={}ms", totalTime3); long startTime4 = System.currentTimeMillis(); mockMvc.perform( get("/tasks/thinexecutions").accept(MediaType.APPLICATION_JSON).param("size", "200").param("page", "2")) .andExpect(status().isOk()) .andExpect(jsonPath("$._embedded.taskExecutionThinResourceList", hasSize(greaterThanOrEqualTo(200)))); long totalTime4 = System.currentTimeMillis() - startTime4; + logger.info("result:totalTime={}ms", totalTime); + logger.info("result:totalTime2={}ms", totalTime2); + logger.info("result:totalTime3={}ms", totalTime3); logger.info("result:totalTime4={}ms", totalTime4); double ratioExecution = (double) totalTime / (double) totalTime2; double ratioThinExecution = (double) totalTime3 / (double) totalTime4; @@ -188,12 +188,9 @@ void queryWithLargeNumberOfTaskExecutions() throws Exception { logger.info("Ratio for tasks/executions:{}", ratioExecution); logger.info("Ratio for tasks/thinexecutions:{}", ratioThinExecution); logger.info("Ratio for tasks/executions to thinexecutions:{}", ratioThinToExecution); - assertThat(totalTime).isLessThan(totalTime2); - assertThat(totalTime3).isLessThan(totalTime); - assertThat(totalTime3).isLessThan(totalTime4); assertThat(totalTime4).isLessThan(totalTime2); assertThat(ratioThinExecution).isGreaterThan(ratioExecution); - assertThat(ratioThinToExecution).isGreaterThan(3.0); + assertThat(ratioThinToExecution).isGreaterThan(2.0); } }