Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): introduce an always block on flow & flowable #6686

Draft
wants to merge 3 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ void run() {
Integer call = PicocliRunner.call(FlowDotCommand.class, ctx, args);

assertThat(call, is(0));
assertThat(out.toString(), containsString("\"root.date\"[shape=box,label=\"date\"];"));
assertThat(out.toString(), containsString("\"root.date\"[shape=box];"));
}
}
}
39 changes: 31 additions & 8 deletions core/src/main/java/io/kestra/core/models/executions/Execution.java
Original file line number Diff line number Diff line change
Expand Up @@ -335,11 +335,15 @@ public TaskRun findTaskRunByTaskIdAndValue(String id, List<String> values)
*
* @param resolvedTasks normal tasks
* @param resolvedErrors errors tasks
* @param resolvedErrors afters tasks
* @return the flow we need to follow
*/
public List<ResolvedTask> findTaskDependingFlowState(List<ResolvedTask> resolvedTasks,
List<ResolvedTask> resolvedErrors) {
return this.findTaskDependingFlowState(resolvedTasks, resolvedErrors, null);
public List<ResolvedTask> findTaskDependingFlowState(
List<ResolvedTask> resolvedTasks,
List<ResolvedTask> resolvedErrors,
List<ResolvedTask> resolvedAfters
) {
return this.findTaskDependingFlowState(resolvedTasks, resolvedErrors, resolvedAfters, null);
}

/**
Expand All @@ -349,15 +353,28 @@ public List<ResolvedTask> findTaskDependingFlowState(List<ResolvedTask> resolved
*
* @param resolvedTasks normal tasks
* @param resolvedErrors errors tasks
* @param resolvedFinally afters tasks
* @param parentTaskRun the parent task
* @return the flow we need to follow
*/
public List<ResolvedTask> findTaskDependingFlowState(List<ResolvedTask> resolvedTasks,
@Nullable List<ResolvedTask> resolvedErrors, TaskRun parentTaskRun) {
public List<ResolvedTask> findTaskDependingFlowState(
List<ResolvedTask> resolvedTasks,
@Nullable List<ResolvedTask> resolvedErrors,
@Nullable List<ResolvedTask> resolvedFinally,
TaskRun parentTaskRun
) {
resolvedTasks = removeDisabled(resolvedTasks);
resolvedErrors = removeDisabled(resolvedErrors);
resolvedFinally = removeDisabled(resolvedFinally);


List<TaskRun> errorsFlow = this.findTaskRunByTasks(resolvedErrors, parentTaskRun);
List<TaskRun> finallyFlow = this.findTaskRunByTasks(resolvedFinally, parentTaskRun);

// finally is already started, just continue theses finally
if (!finallyFlow.isEmpty()) {
return resolvedFinally == null ? Collections.emptyList() : resolvedFinally;
}

// Check if flow has failed task
if (!errorsFlow.isEmpty() || this.hasFailed(resolvedTasks, parentTaskRun)) {
Expand All @@ -366,8 +383,15 @@ public List<ResolvedTask> findTaskDependingFlowState(List<ResolvedTask> resolved
return Collections.emptyList();
}

return resolvedErrors == null ? Collections.emptyList() : resolvedErrors;
if (resolvedFinally != null && resolvedErrors != null && !this.isTerminated(resolvedErrors, parentTaskRun)) {
return resolvedErrors;
} else if (resolvedFinally == null) {
return resolvedErrors == null ? Collections.emptyList() : resolvedErrors;
}
}

if (this.isTerminated(resolvedTasks, parentTaskRun) && resolvedFinally != null) {
return resolvedFinally;
}

return resolvedTasks;
Expand All @@ -390,8 +414,7 @@ private List<ResolvedTask> removeDisabled(List<ResolvedTask> tasks) {
.toList();
}

public List<TaskRun> findTaskRunByTasks(List<ResolvedTask> resolvedTasks,
TaskRun parentTaskRun) {
public List<TaskRun> findTaskRunByTasks(List<ResolvedTask> resolvedTasks, TaskRun parentTaskRun) {
if (resolvedTasks == null || this.taskRunList == null) {
return Collections.emptyList();
}
Expand Down
16 changes: 12 additions & 4 deletions core/src/main/java/io/kestra/core/models/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
Expand Down Expand Up @@ -32,10 +33,7 @@
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -81,6 +79,15 @@ public boolean hasIgnoreMarker(final AnnotatedMember m) {
@Valid
List<Task> errors;

@Valid
@JsonProperty("finally")
@Getter(AccessLevel.NONE)
protected List<Task> _finally;

public List<Task> getFinally() {
return this._finally;
}

@Valid
@Deprecated
List<Listener> listeners;
Expand Down Expand Up @@ -188,6 +195,7 @@ public Stream<Task> allTasks() {
return Stream.of(
this.tasks != null ? this.tasks : new ArrayList<Task>(),
this.errors != null ? this.errors : new ArrayList<Task>(),
this._finally != null ? this._finally : new ArrayList<Task>(),
this.listenersTasks()
)
.flatMap(Collection::stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ public class FlowForExecution extends AbstractFlow {
@Valid
List<TaskForExecution> errors;

@Valid
List<TaskForExecution> _finally;

@Valid
List<AbstractTriggerForExecution> triggers;

Expand All @@ -36,6 +39,7 @@ public static FlowForExecution of(Flow flow) {
.inputs(flow.getInputs())
.tasks(flow.getTasks().stream().map(TaskForExecution::of).toList())
.errors(ListUtils.emptyOnNull(flow.getErrors()).stream().map(TaskForExecution::of).toList())
._finally(ListUtils.emptyOnNull(flow.getFinally()).stream().map(TaskForExecution::of).toList())
.triggers(ListUtils.emptyOnNull(flow.getTriggers()).stream().map(AbstractTriggerForExecution::of).toList())
.disabled(flow.isDisabled())
.deleted(flow.isDeleted())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public Flow toFlow() {
.variables(this.variables)
.tasks(this.tasks)
.errors(this.errors)
._finally(this._finally)
.listeners(this.listeners)
.triggers(this.triggers)
.pluginDefaults(this.pluginDefaults)
Expand Down Expand Up @@ -69,6 +70,7 @@ public static FlowWithSource of(Flow flow, String source) {
.variables(flow.variables)
.tasks(flow.tasks)
.errors(flow.errors)
._finally(flow._finally)
.listeners(flow.listeners)
.triggers(flow.triggers)
.pluginDefaults(flow.pluginDefaults)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ public Graph<T, V> addEdge(T previous, T next, V value) {
return this;
}

public Graph<T, V> removeEdge(T previous, T next) {
this.graph.removeEdge(previous, next);

return this;
}

public Set<T> nodes() {
return this.graph.nodes();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.tasks.Task;
import lombok.Builder;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;

Expand All @@ -25,6 +25,14 @@ public class GraphCluster extends AbstractGraph {
@JsonIgnore
private final GraphClusterRoot root;

@JsonIgnore
@Getter(AccessLevel.NONE)
private final GraphClusterFinally _finally;

public GraphClusterFinally getFinally() {
return _finally;
}

@JsonIgnore
private final GraphClusterEnd end;

Expand All @@ -41,17 +49,22 @@ public GraphCluster(String uid) {

this.relationType = null;
this.root = new GraphClusterRoot();
this._finally = new GraphClusterFinally();
this.end = new GraphClusterEnd();
this.taskNode = null;

this.addNode(this.root);
this.addNode(this._finally);
this.addNode(this.end);

this.addEdge(this.getFinally(), this.getEnd(), new Relation());
}

public GraphCluster(Task task, TaskRun taskRun, List<String> values, RelationType relationType) {
this(new GraphTask(task.getId(), task, taskRun, values, relationType), task.getId(), relationType);

this.addNode(this.taskNode, false);

this.addEdge(this.getRoot(), this.taskNode, new Relation());
}

Expand All @@ -60,11 +73,15 @@ protected GraphCluster(AbstractGraphTask taskNode, String uid, RelationType rela

this.relationType = relationType;
this.root = new GraphClusterRoot();
this._finally = new GraphClusterFinally();
this.end = new GraphClusterEnd();
this.taskNode = taskNode;

this.addNode(this.root);
this.addNode(this._finally);
this.addNode(this.end);

this.addEdge(this.getFinally(), this.getEnd(), new Relation());
}

public void addNode(AbstractGraph node) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
@Getter
public class GraphClusterEnd extends AbstractGraph {
public GraphClusterEnd() {
super(IdUtils.create());
super("end-" + IdUtils.create());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.kestra.core.models.hierarchies;

import io.kestra.core.utils.IdUtils;
import lombok.Getter;

@Getter
public class GraphClusterFinally extends AbstractGraph {
public GraphClusterFinally() {
super("finally-" + IdUtils.create());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@
@Getter
public class GraphClusterRoot extends AbstractGraph {
public GraphClusterRoot() {
super(IdUtils.create());
super("root-" + IdUtils.create());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ public enum RelationType {
SEQUENTIAL,
CHOICE,
ERROR,
FINALLY,
PARALLEL,
DYNAMIC
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ public interface FlowableTask <T extends Output> {
@PluginProperty
List<Task> getErrors();

@Schema(
title = "List of tasks to run after any tasks failed or success on this FlowableTask."
)
@PluginProperty
List<Task> getFinally();

/**
* Create the topology representation of a flowable task.
* <p>
Expand Down Expand Up @@ -71,6 +77,7 @@ default Optional<State.Type> resolveState(RunContext runContext, Execution execu
execution,
this.childTasks(runContext, parentTaskRun),
FlowableUtils.resolveTasks(this.getErrors(), parentTaskRun),
FlowableUtils.resolveTasks(this.getFinally(), parentTaskRun),
parentTaskRun,
runContext,
isAllowFailure(),
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/java/io/kestra/core/models/templates/Template.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.introspect.AnnotatedMember;
Expand Down Expand Up @@ -67,6 +68,15 @@ public boolean hasIgnoreMarker(final AnnotatedMember m) {
@Valid
private List<Task> errors;

@Valid
@JsonProperty("finally")
@Getter(AccessLevel.NONE)
protected List<Task> _finally;

public List<Task> getFinally() {
return this._finally;
}

@NotNull
@Builder.Default
private final boolean deleted = false;
Expand Down Expand Up @@ -138,6 +148,7 @@ public Template toDeleted() {
this.description,
this.tasks,
this.errors,
this._finally,
true
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,8 @@ private Optional<WorkerTaskResult> childWorkerTaskResult(Flow flow, Execution ex
// Then wait for completion (KILLED or whatever) on child tasks to KILLED the parent one.
List<ResolvedTask> currentTasks = execution.findTaskDependingFlowState(
flowableParent.childTasks(runContext, parentTaskRun),
FlowableUtils.resolveTasks(flowableParent.getErrors(), parentTaskRun)
FlowableUtils.resolveTasks(flowableParent.getErrors(), parentTaskRun),
FlowableUtils.resolveTasks(flowableParent.getFinally(), parentTaskRun)
);

List<TaskRun> taskRunByTasks = execution.findTaskRunByTasks(currentTasks, parentTaskRun);
Expand Down Expand Up @@ -426,7 +427,8 @@ private Executor handleNext(Executor executor) {
.resolveSequentialNexts(
executor.getExecution(),
ResolvedTask.of(executor.getFlow().getTasks()),
ResolvedTask.of(executor.getFlow().getErrors())
ResolvedTask.of(executor.getFlow().getErrors()),
ResolvedTask.of(executor.getFlow().getFinally())
);

if (nextTaskRuns.isEmpty()) {
Expand Down Expand Up @@ -686,7 +688,8 @@ private Executor handleEnd(Executor executor) {

List<ResolvedTask> currentTasks = executor.getExecution().findTaskDependingFlowState(
ResolvedTask.of(executor.getFlow().getTasks()),
ResolvedTask.of(executor.getFlow().getErrors())
ResolvedTask.of(executor.getFlow().getErrors()),
ResolvedTask.of(executor.getFlow().getFinally())
);

if (!executor.getExecution().isTerminated(currentTasks)) {
Expand Down
Loading
Loading