Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: migrate package plugin.core.execution to dynamic properties #6708

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 16 additions & 18 deletions core/src/main/java/io/kestra/plugin/core/execution/Count.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.kestra.core.models.executions.statistics.ExecutionCount;
import io.kestra.core.models.executions.statistics.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
Expand Down Expand Up @@ -41,7 +42,7 @@
code = """
id: executions_count
namespace: company.team
tasks:
- id: counts
type: io.kestra.plugin.core.execution.Counts
Expand All @@ -64,7 +65,7 @@
"text": ":warning: Flow `{{ jq taskrun.value '.namespace' true }}`.`{{ jq taskrun.value '.flowId' true }}` has no execution for last 24h!"
}
url: "https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX"
triggers:
- id: schedule
type: io.kestra.plugin.core.trigger.Schedule
Expand All @@ -86,21 +87,18 @@ public class Count extends Task implements RunnableTask<Count.Output> {
@Schema(
title = "A list of states to be filtered."
)
@PluginProperty
protected List<State.Type> states;
protected Property<List<State.Type>> states;

@NotNull
@Schema(
title = "The start date."
)
@PluginProperty(dynamic = true)
protected String startDate;
protected Property<String> startDate;

@Schema(
title = "The end date."
)
@PluginProperty(dynamic = true)
protected String endDate;
protected Property<String> endDate;

@NotNull
@Schema(
Expand All @@ -110,11 +108,9 @@ public class Count extends Task implements RunnableTask<Count.Output> {
"- ```yaml {{ eq count 0 }} ```: no execution found\n" +
"- ```yaml {{ gte count 5 }} ```: more than 5 executions\n"
)
@PluginProperty(dynamic = true)
protected String expression;
protected Property<String> expression;

@PluginProperty
protected List<String> namespaces;
protected Property<List<String>> namespaces;

@Override
public Output run(RunContext runContext) throws Exception {
Expand All @@ -134,17 +130,19 @@ public Output run(RunContext runContext) throws Exception {
if (flows != null) {
flows.forEach(flow -> flowService.checkAllowedNamespace(flowInfo.tenantId(), flow.getNamespace(), flowInfo.tenantId(), flowInfo.namespace()));
}

if (namespaces != null) {
namespaces.forEach(namespace -> flowService.checkAllowedNamespace(flowInfo.tenantId(), namespace, flowInfo.tenantId(), flowInfo.namespace()));
var renderedNamespaces = runContext.render(this.namespaces).asList(String.class);
renderedNamespaces.forEach(namespace -> flowService.checkAllowedNamespace(flowInfo.tenantId(), namespace, flowInfo.tenantId(), flowInfo.namespace()));
}

List<ExecutionCount> executionCounts = executionRepository.executionCounts(
flowInfo.tenantId(),
flows,
this.states,
startDate != null ? ZonedDateTime.parse(runContext.render(startDate)) : null,
endDate != null ? ZonedDateTime.parse(runContext.render(endDate)) : null,
namespaces
runContext.render(this.states).asList(State.Type.class),
startDate != null ? ZonedDateTime.parse(runContext.render(startDate).as(String.class).orElseThrow()) : null,
endDate != null ? ZonedDateTime.parse(runContext.render(endDate).as(String.class).orElseThrow()) : null,
runContext.render(this.namespaces).asList(String.class)
);

logger.trace("{} flows matching filters", executionCounts.size());
Expand All @@ -153,7 +151,7 @@ public Output run(RunContext runContext) throws Exception {
.stream()
.filter(throwPredicate(item -> runContext
.render(
this.expression,
runContext.render(this.expression).as(String.class).orElseThrow(),
ImmutableMap.of("count", item.getCount().intValue())
)
.equals("true")
Expand Down
13 changes: 6 additions & 7 deletions core/src/main/java/io/kestra/plugin/core/execution/Fail.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.VoidOutput;
Expand Down Expand Up @@ -89,29 +90,27 @@
aliases = "io.kestra.core.tasks.executions.Fail"
)
public class Fail extends Task implements RunnableTask<VoidOutput> {
@PluginProperty(dynamic = true)
@Schema(
title = "Optional condition, must coerce to a boolean.",
description = "Boolean coercion allows 0, -0, and '' to coerce to false, all other values to coerce to true."
)
private String condition;
private Property<String> condition;

@PluginProperty(dynamic = true)
@Schema(title = "Optional error message.")
@Builder.Default
private String errorMessage = "Task failure";
private Property<String> errorMessage = Property.of("Task failure");

@Override
public VoidOutput run(RunContext runContext) throws Exception {
if (condition != null) {
String rendered = runContext.render(condition);
String rendered = runContext.render(condition).as(String.class).orElse(null);
if (TruthUtils.isTruthy(rendered)) {
runContext.logger().error(runContext.render(errorMessage));
runContext.logger().error(runContext.render(errorMessage).as(String.class).orElse(null));
throw new Exception("Fail on a condition");
}
return null;
}

throw new Exception(runContext.render(errorMessage));
throw new Exception(runContext.render(errorMessage).as(String.class).orElse(null));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.DefaultRunContext;
Expand Down Expand Up @@ -48,68 +49,59 @@ public class PurgeExecutions extends Task implements RunnableTask<PurgeExecution
title = "Namespace whose flows need to be purged, or namespace of the flow that needs to be purged.",
description = "If `flowId` isn't provided, this is a namespace prefix, else the namespace of the flow."
)
@PluginProperty(dynamic = true)
private String namespace;
private Property<String> namespace;

@Schema(
title = "The flow ID to be purged.",
description = "You need to provide the `namespace` properties if you want to purge a flow."
)
@PluginProperty(dynamic = true)
private String flowId;
private Property<String> flowId;

@Schema(
title = "The minimum date to be purged.",
description = "All data of flows executed after this date will be purged."
)
@PluginProperty(dynamic = true)
private String startDate;
private Property<String> startDate;

@Schema(
title = "The maximum date to be purged.",
description = "All data of flows executed before this date will be purged."
)
@PluginProperty(dynamic = true)
@NotNull
private String endDate;
private Property<String> endDate;

@Schema(
title = "The state of the executions to be purged.",
description = "If not set, executions for any states will be purged."
)
@PluginProperty
private List<State.Type> states;
private Property<List<State.Type>> states;

@Schema(
title = "Whether to purge executions."
)
@PluginProperty
@Builder.Default
private boolean purgeExecution = true;
private Property<Boolean> purgeExecution = Property.of(true);

@Schema(
title = "Whether to purge execution's logs.",
description = """
This will only purge logs from executions not from triggers, and it will do it execution by execution.
The `io.kestra.plugin.core.log.PurgeLogs` task is a better fit to purge logs as it will purge logs in bulk, and will also purge logs not tied to an execution like trigger logs."""
)
@PluginProperty
@Builder.Default
private boolean purgeLog = true;
private Property<Boolean> purgeLog = Property.of(true);

@Schema(
title = "Whether to purge execution's metrics."
)
@PluginProperty
@Builder.Default
private boolean purgeMetric = true;
private Property<Boolean> purgeMetric = Property.of(true);

@Schema(
title = "Whether to purge execution's files from the Kestra's internal storage."
)
@PluginProperty
@Builder.Default
private boolean purgeStorage = true;
private Property<Boolean> purgeStorage = Property.of(true);

@Override
public PurgeExecutions.Output run(RunContext runContext) throws Exception {
Expand All @@ -118,23 +110,24 @@ public PurgeExecutions.Output run(RunContext runContext) throws Exception {

// validate that this namespace is authorized on the target namespace / all namespaces
var flowInfo = runContext.flowInfo();
if (namespace == null){
String renderedNamespace = runContext.render(this.namespace).as(String.class).orElse(null);
if (renderedNamespace == null){
flowService.checkAllowedAllNamespaces(flowInfo.tenantId(), flowInfo.tenantId(), flowInfo.namespace());
} else if (!runContext.render(namespace).equals(flowInfo.namespace())) {
flowService.checkAllowedNamespace(flowInfo.tenantId(), runContext.render(namespace), flowInfo.tenantId(), flowInfo.namespace());
} else if (!renderedNamespace.equals(flowInfo.namespace())) {
flowService.checkAllowedNamespace(flowInfo.tenantId(), renderedNamespace, flowInfo.tenantId(), flowInfo.namespace());
}

ExecutionService.PurgeResult purgeResult = executionService.purge(
purgeExecution,
purgeLog,
purgeMetric,
purgeStorage,
runContext.render(this.purgeExecution).as(Boolean.class).orElseThrow(),
runContext.render(this.purgeLog).as(Boolean.class).orElseThrow(),
runContext.render(this.purgeMetric).as(Boolean.class).orElseThrow(),
runContext.render(this.purgeStorage).as(Boolean.class).orElseThrow(),
flowInfo.tenantId(),
runContext.render(namespace),
runContext.render(flowId),
startDate != null ? ZonedDateTime.parse(runContext.render(startDate)) : null,
ZonedDateTime.parse(runContext.render(endDate)),
states
renderedNamespace,
runContext.render(flowId).as(String.class).orElse(null),
startDate != null ? ZonedDateTime.parse(runContext.render(startDate).as(String.class).orElseThrow()) : null,
ZonedDateTime.parse(runContext.render(endDate).as(String.class).orElseThrow()),
this.states == null ? null : runContext.render(this.states).asList(State.Type.class)
);

return Output.builder()
Expand Down
25 changes: 15 additions & 10 deletions core/src/main/java/io/kestra/plugin/core/execution/Resume.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package io.kestra.plugin.core.execution;

import com.ctc.wstx.util.PrefixedName;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.VoidOutput;
Expand Down Expand Up @@ -49,14 +51,12 @@ public class Resume extends Task implements RunnableTask<VoidOutput> {
@Schema(
title = "Filter for a specific namespace in case `executionId` is set. In case you wonder why `executionId` is not enough — we require specifying the namespace to make permissions explicit. The Enterprise Edition of Kestra allows you to resume executions from another namespaces only if the permissions allow it. Check the [Allowed Namespaces](https://kestra.io/docs/enterprise/allowed-namespaces) documentation for more details."
)
@PluginProperty(dynamic = true)
private String namespace;
private Property<String> namespace;

@Schema(
title = "Filter for a specific flow identifier in case `executionId` is set."
)
@PluginProperty(dynamic = true)
private String flowId;
private Property<String> flowId;

@Schema(
title = "Filter for a specific execution.",
Expand All @@ -67,19 +67,22 @@ public class Resume extends Task implements RunnableTask<VoidOutput> {
If `executionId` is not set, the task will use the ID of the current execution."""
)
@PluginProperty(dynamic = true)
private String executionId;
private Property<String> executionId;

@Schema(
title = "Inputs to be passed to the execution when it's resumed."
)
@PluginProperty(dynamic = true)
private Map<String, Object> inputs;
private Property<Map<String, Object>> inputs;

@SuppressWarnings("unchecked")
@Override
public VoidOutput run(RunContext runContext) throws Exception {
var executionInfo = PluginUtilsService.executionFromTaskParameters(runContext, this.namespace, this.flowId, this.executionId);
var executionInfo = PluginUtilsService.executionFromTaskParameters(
runContext,
runContext.render(this.namespace).as(String.class).orElse(null),
runContext.render(this.flowId).as(String.class).orElse(null),
runContext.render(this.executionId).as(String.class).orElse(null)
);

ApplicationContext applicationContext = ((DefaultRunContext)runContext).getApplicationContext();
ExecutionService executionService = applicationContext.getBean(ExecutionService.class);
Expand All @@ -90,7 +93,9 @@ public VoidOutput run(RunContext runContext) throws Exception {
Execution execution = executionRepository.findById(executionInfo.tenantId(), executionInfo.id())
.orElseThrow(() -> new IllegalArgumentException("No execution found for execution id " + executionInfo.id()));
Flow flow = flowExecutor.findByExecution(execution).orElseThrow(() -> new IllegalArgumentException("Flow not found for execution id " + executionInfo.id()));
Map<String, Object> renderedInputs = inputs != null ? runContext.render(inputs) : null;

Map<String, Object> renderedInputs = runContext.render(this.inputs).asMap(String.class, Object.class);
renderedInputs = !renderedInputs.isEmpty() ? renderedInputs : null;
Execution resumed = executionService.resume(execution, flow, State.Type.RUNNING, renderedInputs);
executionQueue.emit(resumed);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.property.Property;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.utils.IdUtils;
Expand Down Expand Up @@ -39,9 +40,9 @@ void run() throws Exception {
executionRepository.save(execution);

var purge = PurgeExecutions.builder()
.flowId(flowId)
.namespace(namespace)
.endDate(ZonedDateTime.now().plusMinutes(1).format(DateTimeFormatter.ISO_ZONED_DATE_TIME))
.flowId(Property.of(flowId))
.namespace(Property.of(namespace))
.endDate(Property.of(ZonedDateTime.now().plusMinutes(1).format(DateTimeFormatter.ISO_ZONED_DATE_TIME)))
.build();
var runContext = runContextFactory.of(Map.of("flow", Map.of("namespace", namespace, "id", flowId)));
var output = purge.run(runContext);
Expand All @@ -65,9 +66,9 @@ void deleted() throws Exception {
executionRepository.delete(execution);

var purge = PurgeExecutions.builder()
.namespace(namespace)
.flowId(flowId)
.endDate(ZonedDateTime.now().plusMinutes(1).format(DateTimeFormatter.ISO_ZONED_DATE_TIME))
.namespace(Property.of(namespace))
.flowId(Property.of(flowId))
.endDate(Property.of(ZonedDateTime.now().plusMinutes(1).format(DateTimeFormatter.ISO_ZONED_DATE_TIME)))
.build();
var runContext = runContextFactory.of(Map.of("flow", Map.of("namespace", namespace, "id", flowId)));
var output = purge.run(runContext);
Expand Down
Loading