Skip to content

Commit

Permalink
refactor: migrate package plugin.core.execution to dynamic properties
Browse files Browse the repository at this point in the history
migrate Count
migrate Fail
migrate PurgeExecutions
migrate Resume
  • Loading branch information
mgabelle committed Jan 10, 2025
1 parent 063ac10 commit 80a5ecb
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 71 deletions.
34 changes: 16 additions & 18 deletions core/src/main/java/io/kestra/plugin/core/execution/Count.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.kestra.core.models.executions.statistics.ExecutionCount;
import io.kestra.core.models.executions.statistics.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
Expand Down Expand Up @@ -41,7 +42,7 @@
code = """
id: executions_count
namespace: company.team
tasks:
- id: counts
type: io.kestra.plugin.core.execution.Counts
Expand All @@ -64,7 +65,7 @@
"text": ":warning: Flow `{{ jq taskrun.value '.namespace' true }}`.`{{ jq taskrun.value '.flowId' true }}` has no execution for last 24h!"
}
url: "https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX"
triggers:
- id: schedule
type: io.kestra.plugin.core.trigger.Schedule
Expand All @@ -86,21 +87,18 @@ public class Count extends Task implements RunnableTask<Count.Output> {
@Schema(
title = "A list of states to be filtered."
)
@PluginProperty
protected List<State.Type> states;
protected Property<List<State.Type>> states;

@NotNull
@Schema(
title = "The start date."
)
@PluginProperty(dynamic = true)
protected String startDate;
protected Property<String> startDate;

@Schema(
title = "The end date."
)
@PluginProperty(dynamic = true)
protected String endDate;
protected Property<String> endDate;

@NotNull
@Schema(
Expand All @@ -110,11 +108,9 @@ public class Count extends Task implements RunnableTask<Count.Output> {
"- ```yaml {{ eq count 0 }} ```: no execution found\n" +
"- ```yaml {{ gte count 5 }} ```: more than 5 executions\n"
)
@PluginProperty(dynamic = true)
protected String expression;
protected Property<String> expression;

@PluginProperty
protected List<String> namespaces;
protected Property<List<String>> namespaces;

@Override
public Output run(RunContext runContext) throws Exception {
Expand All @@ -134,17 +130,19 @@ public Output run(RunContext runContext) throws Exception {
if (flows != null) {
flows.forEach(flow -> flowService.checkAllowedNamespace(flowInfo.tenantId(), flow.getNamespace(), flowInfo.tenantId(), flowInfo.namespace()));
}

if (namespaces != null) {
namespaces.forEach(namespace -> flowService.checkAllowedNamespace(flowInfo.tenantId(), namespace, flowInfo.tenantId(), flowInfo.namespace()));
var renderedNamespaces = runContext.render(this.namespaces).asList(String.class);
renderedNamespaces.forEach(namespace -> flowService.checkAllowedNamespace(flowInfo.tenantId(), namespace, flowInfo.tenantId(), flowInfo.namespace()));
}

List<ExecutionCount> executionCounts = executionRepository.executionCounts(
flowInfo.tenantId(),
flows,
this.states,
startDate != null ? ZonedDateTime.parse(runContext.render(startDate)) : null,
endDate != null ? ZonedDateTime.parse(runContext.render(endDate)) : null,
namespaces
runContext.render(this.states).asList(State.Type.class),
startDate != null ? ZonedDateTime.parse(runContext.render(startDate).as(String.class).orElseThrow()) : null,
endDate != null ? ZonedDateTime.parse(runContext.render(endDate).as(String.class).orElseThrow()) : null,
runContext.render(this.namespaces).asList(String.class)
);

logger.trace("{} flows matching filters", executionCounts.size());
Expand All @@ -153,7 +151,7 @@ public Output run(RunContext runContext) throws Exception {
.stream()
.filter(throwPredicate(item -> runContext
.render(
this.expression,
runContext.render(this.expression).as(String.class).orElseThrow(),
ImmutableMap.of("count", item.getCount().intValue())
)
.equals("true")
Expand Down
13 changes: 6 additions & 7 deletions core/src/main/java/io/kestra/plugin/core/execution/Fail.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.VoidOutput;
Expand Down Expand Up @@ -89,29 +90,27 @@
aliases = "io.kestra.core.tasks.executions.Fail"
)
public class Fail extends Task implements RunnableTask<VoidOutput> {
@PluginProperty(dynamic = true)
@Schema(
title = "Optional condition, must coerce to a boolean.",
description = "Boolean coercion allows 0, -0, and '' to coerce to false, all other values to coerce to true."
)
private String condition;
private Property<String> condition;

@PluginProperty(dynamic = true)
@Schema(title = "Optional error message.")
@Builder.Default
private String errorMessage = "Task failure";
private Property<String> errorMessage = Property.of("Task failure");

@Override
public VoidOutput run(RunContext runContext) throws Exception {
if (condition != null) {
String rendered = runContext.render(condition);
String rendered = runContext.render(condition).as(String.class).orElse(null);
if (TruthUtils.isTruthy(rendered)) {
runContext.logger().error(runContext.render(errorMessage));
runContext.logger().error(runContext.render(errorMessage).as(String.class).orElse(null));
throw new Exception("Fail on a condition");
}
return null;
}

throw new Exception(runContext.render(errorMessage));
throw new Exception(runContext.render(errorMessage).as(String.class).orElse(null));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.DefaultRunContext;
Expand Down Expand Up @@ -48,68 +49,59 @@ public class PurgeExecutions extends Task implements RunnableTask<PurgeExecution
title = "Namespace whose flows need to be purged, or namespace of the flow that needs to be purged.",
description = "If `flowId` isn't provided, this is a namespace prefix, else the namespace of the flow."
)
@PluginProperty(dynamic = true)
private String namespace;
private Property<String> namespace;

@Schema(
title = "The flow ID to be purged.",
description = "You need to provide the `namespace` properties if you want to purge a flow."
)
@PluginProperty(dynamic = true)
private String flowId;
private Property<String> flowId;

@Schema(
title = "The minimum date to be purged.",
description = "All data of flows executed after this date will be purged."
)
@PluginProperty(dynamic = true)
private String startDate;
private Property<String> startDate;

@Schema(
title = "The maximum date to be purged.",
description = "All data of flows executed before this date will be purged."
)
@PluginProperty(dynamic = true)
@NotNull
private String endDate;
private Property<String> endDate;

@Schema(
title = "The state of the executions to be purged.",
description = "If not set, executions for any states will be purged."
)
@PluginProperty
private List<State.Type> states;
private Property<List<State.Type>> states;

@Schema(
title = "Whether to purge executions."
)
@PluginProperty
@Builder.Default
private boolean purgeExecution = true;
private Property<Boolean> purgeExecution = Property.of(true);

@Schema(
title = "Whether to purge execution's logs.",
description = """
This will only purge logs from executions not from triggers, and it will do it execution by execution.
The `io.kestra.plugin.core.log.PurgeLogs` task is a better fit to purge logs as it will purge logs in bulk, and will also purge logs not tied to an execution like trigger logs."""
)
@PluginProperty
@Builder.Default
private boolean purgeLog = true;
private Property<Boolean> purgeLog = Property.of(true);

@Schema(
title = "Whether to purge execution's metrics."
)
@PluginProperty
@Builder.Default
private boolean purgeMetric = true;
private Property<Boolean> purgeMetric = Property.of(true);

@Schema(
title = "Whether to purge execution's files from the Kestra's internal storage."
)
@PluginProperty
@Builder.Default
private boolean purgeStorage = true;
private Property<Boolean> purgeStorage = Property.of(true);

@Override
public PurgeExecutions.Output run(RunContext runContext) throws Exception {
Expand All @@ -118,23 +110,24 @@ public PurgeExecutions.Output run(RunContext runContext) throws Exception {

// validate that this namespace is authorized on the target namespace / all namespaces
var flowInfo = runContext.flowInfo();
if (namespace == null){
String renderedNamespace = runContext.render(this.namespace).as(String.class).orElse(null);
if (renderedNamespace == null){
flowService.checkAllowedAllNamespaces(flowInfo.tenantId(), flowInfo.tenantId(), flowInfo.namespace());
} else if (!runContext.render(namespace).equals(flowInfo.namespace())) {
flowService.checkAllowedNamespace(flowInfo.tenantId(), runContext.render(namespace), flowInfo.tenantId(), flowInfo.namespace());
} else if (!renderedNamespace.equals(flowInfo.namespace())) {
flowService.checkAllowedNamespace(flowInfo.tenantId(), renderedNamespace, flowInfo.tenantId(), flowInfo.namespace());
}

ExecutionService.PurgeResult purgeResult = executionService.purge(
purgeExecution,
purgeLog,
purgeMetric,
purgeStorage,
runContext.render(this.purgeExecution).as(Boolean.class).orElseThrow(),
runContext.render(this.purgeLog).as(Boolean.class).orElseThrow(),
runContext.render(this.purgeMetric).as(Boolean.class).orElseThrow(),
runContext.render(this.purgeStorage).as(Boolean.class).orElseThrow(),
flowInfo.tenantId(),
runContext.render(namespace),
runContext.render(flowId),
startDate != null ? ZonedDateTime.parse(runContext.render(startDate)) : null,
ZonedDateTime.parse(runContext.render(endDate)),
states
renderedNamespace,
runContext.render(flowId).as(String.class).orElse(null),
startDate != null ? ZonedDateTime.parse(runContext.render(startDate).as(String.class).orElseThrow()) : null,
ZonedDateTime.parse(runContext.render(endDate).as(String.class).orElseThrow()),
this.states == null ? null : runContext.render(this.states).asList(State.Type.class)
);

return Output.builder()
Expand Down
25 changes: 15 additions & 10 deletions core/src/main/java/io/kestra/plugin/core/execution/Resume.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package io.kestra.plugin.core.execution;

import com.ctc.wstx.util.PrefixedName;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.VoidOutput;
Expand Down Expand Up @@ -49,14 +51,12 @@ public class Resume extends Task implements RunnableTask<VoidOutput> {
@Schema(
title = "Filter for a specific namespace in case `executionId` is set. In case you wonder why `executionId` is not enough — we require specifying the namespace to make permissions explicit. The Enterprise Edition of Kestra allows you to resume executions from another namespaces only if the permissions allow it. Check the [Allowed Namespaces](https://kestra.io/docs/enterprise/allowed-namespaces) documentation for more details."
)
@PluginProperty(dynamic = true)
private String namespace;
private Property<String> namespace;

@Schema(
title = "Filter for a specific flow identifier in case `executionId` is set."
)
@PluginProperty(dynamic = true)
private String flowId;
private Property<String> flowId;

@Schema(
title = "Filter for a specific execution.",
Expand All @@ -67,19 +67,22 @@ public class Resume extends Task implements RunnableTask<VoidOutput> {
If `executionId` is not set, the task will use the ID of the current execution."""
)
@PluginProperty(dynamic = true)
private String executionId;
private Property<String> executionId;

@Schema(
title = "Inputs to be passed to the execution when it's resumed."
)
@PluginProperty(dynamic = true)
private Map<String, Object> inputs;
private Property<Map<String, Object>> inputs;

@SuppressWarnings("unchecked")
@Override
public VoidOutput run(RunContext runContext) throws Exception {
var executionInfo = PluginUtilsService.executionFromTaskParameters(runContext, this.namespace, this.flowId, this.executionId);
var executionInfo = PluginUtilsService.executionFromTaskParameters(
runContext,
runContext.render(this.namespace).as(String.class).orElse(null),
runContext.render(this.flowId).as(String.class).orElse(null),
runContext.render(this.executionId).as(String.class).orElse(null)
);

ApplicationContext applicationContext = ((DefaultRunContext)runContext).getApplicationContext();
ExecutionService executionService = applicationContext.getBean(ExecutionService.class);
Expand All @@ -90,7 +93,9 @@ public VoidOutput run(RunContext runContext) throws Exception {
Execution execution = executionRepository.findById(executionInfo.tenantId(), executionInfo.id())
.orElseThrow(() -> new IllegalArgumentException("No execution found for execution id " + executionInfo.id()));
Flow flow = flowExecutor.findByExecution(execution).orElseThrow(() -> new IllegalArgumentException("Flow not found for execution id " + executionInfo.id()));
Map<String, Object> renderedInputs = inputs != null ? runContext.render(inputs) : null;

Map<String, Object> renderedInputs = runContext.render(this.inputs).asMap(String.class, Object.class);
renderedInputs = !renderedInputs.isEmpty() ? renderedInputs : null;
Execution resumed = executionService.resume(execution, flow, State.Type.RUNNING, renderedInputs);
executionQueue.emit(resumed);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.property.Property;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.utils.IdUtils;
Expand Down Expand Up @@ -39,9 +40,9 @@ void run() throws Exception {
executionRepository.save(execution);

var purge = PurgeExecutions.builder()
.flowId(flowId)
.namespace(namespace)
.endDate(ZonedDateTime.now().plusMinutes(1).format(DateTimeFormatter.ISO_ZONED_DATE_TIME))
.flowId(Property.of(flowId))
.namespace(Property.of(namespace))
.endDate(Property.of(ZonedDateTime.now().plusMinutes(1).format(DateTimeFormatter.ISO_ZONED_DATE_TIME)))
.build();
var runContext = runContextFactory.of(Map.of("flow", Map.of("namespace", namespace, "id", flowId)));
var output = purge.run(runContext);
Expand All @@ -65,9 +66,9 @@ void deleted() throws Exception {
executionRepository.delete(execution);

var purge = PurgeExecutions.builder()
.namespace(namespace)
.flowId(flowId)
.endDate(ZonedDateTime.now().plusMinutes(1).format(DateTimeFormatter.ISO_ZONED_DATE_TIME))
.namespace(Property.of(namespace))
.flowId(Property.of(flowId))
.endDate(Property.of(ZonedDateTime.now().plusMinutes(1).format(DateTimeFormatter.ISO_ZONED_DATE_TIME)))
.build();
var runContext = runContextFactory.of(Map.of("flow", Map.of("namespace", namespace, "id", flowId)));
var output = purge.run(runContext);
Expand Down

0 comments on commit 80a5ecb

Please sign in to comment.