From e81d44ba48ce56725a52a42c189cf9d911cfc2cc Mon Sep 17 00:00:00 2001 From: Hongxin Liang Date: Wed, 27 Sep 2023 23:21:01 +0200 Subject: [PATCH] Support remote in dynamic Signed-off-by: Hongxin Liang --- .../flyte/jflyte/utils/IdentifierRewrite.java | 21 +++-- .../flyte/jflyte/ExecuteDynamicWorkflow.java | 79 +++---------------- 2 files changed, 20 insertions(+), 80 deletions(-) diff --git a/jflyte-utils/src/main/java/org/flyte/jflyte/utils/IdentifierRewrite.java b/jflyte-utils/src/main/java/org/flyte/jflyte/utils/IdentifierRewrite.java index f9591bae2..570971d30 100644 --- a/jflyte-utils/src/main/java/org/flyte/jflyte/utils/IdentifierRewrite.java +++ b/jflyte-utils/src/main/java/org/flyte/jflyte/utils/IdentifierRewrite.java @@ -33,7 +33,7 @@ /** Overrides project, domain and version for nodes in {@link WorkflowTemplate}. */ @AutoValue -abstract class IdentifierRewrite { +public abstract class IdentifierRewrite { abstract String domain(); @@ -47,12 +47,11 @@ WorkflowTemplate apply(WorkflowTemplate template) { return visitor().visitWorkflowTemplate(template); } - @VisibleForTesting - Visitor visitor() { + public Visitor visitor() { return new Visitor(); } - class Visitor extends WorkflowNodeVisitor { + public class Visitor extends WorkflowNodeVisitor { @Override protected PartialTaskIdentifier visitTaskIdentifier(PartialTaskIdentifier value) { return apply(value); @@ -201,20 +200,20 @@ private static T coalesce(T value1, Supplier value2) { return value1 != null ? value1 : value2.get(); } - static Builder builder() { + public static Builder builder() { return new AutoValue_IdentifierRewrite.Builder(); } @AutoValue.Builder - abstract static class Builder { - abstract Builder domain(String domain); + public abstract static class Builder { + public abstract Builder domain(String domain); - abstract Builder project(String project); + public abstract Builder project(String project); - abstract Builder version(String version); + public abstract Builder version(String version); - abstract Builder adminClient(FlyteAdminClient adminClient); + public abstract Builder adminClient(FlyteAdminClient adminClient); - abstract IdentifierRewrite build(); + public abstract IdentifierRewrite build(); } } diff --git a/jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java b/jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java index 03af3a40f..e7fb8d352 100644 --- a/jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java +++ b/jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java @@ -41,9 +41,6 @@ import org.flyte.api.v1.DynamicWorkflowTaskRegistrar; import org.flyte.api.v1.Literal; import org.flyte.api.v1.Node; -import org.flyte.api.v1.PartialLaunchPlanIdentifier; -import org.flyte.api.v1.PartialTaskIdentifier; -import org.flyte.api.v1.PartialWorkflowIdentifier; import org.flyte.api.v1.RunnableTask; import org.flyte.api.v1.RunnableTaskRegistrar; import org.flyte.api.v1.Struct; @@ -57,6 +54,7 @@ import org.flyte.jflyte.utils.Config; import org.flyte.jflyte.utils.ExecutionConfig; import org.flyte.jflyte.utils.FileSystemLoader; +import org.flyte.jflyte.utils.IdentifierRewrite; import org.flyte.jflyte.utils.JFlyteCustom; import org.flyte.jflyte.utils.PackageLoader; import org.flyte.jflyte.utils.ProjectClosure; @@ -203,10 +201,16 @@ static DynamicJobSpec rewrite( Map taskTemplates, Map workflowTemplates) { - DynamicWorkflowIdentifierRewrite rewrite = new DynamicWorkflowIdentifierRewrite(config); + WorkflowNodeVisitor workflowNodeVisitor = + IdentifierRewrite.builder() + .domain(config.domain()) + .project(config.project()) + .version(config.version()) + .build() + .visitor(); List rewrittenNodes = - spec.nodes().stream().map(rewrite::visitNode).collect(toUnmodifiableList()); + spec.nodes().stream().map(workflowNodeVisitor::visitNode).collect(toUnmodifiableList()); Map usedSubWorkflows = ProjectClosure.collectSubWorkflows(rewrittenNodes, workflowTemplates); @@ -218,7 +222,7 @@ static DynamicJobSpec rewrite( // and workflows Map rewrittenUsedSubWorkflows = - mapValues(usedSubWorkflows, rewrite::visitWorkflowTemplate); + mapValues(usedSubWorkflows, workflowNodeVisitor::visitWorkflowTemplate); return spec.toBuilder() .nodes(rewrittenNodes) @@ -235,69 +239,6 @@ static DynamicJobSpec rewrite( .build(); } - static class DynamicWorkflowIdentifierRewrite extends WorkflowNodeVisitor { - private final ExecutionConfig config; - - DynamicWorkflowIdentifierRewrite(ExecutionConfig config) { - this.config = config; - } - - @Override - protected PartialTaskIdentifier visitTaskIdentifier(PartialTaskIdentifier value) { - if (value.project() == null && value.domain() == null && value.version() == null) { - return PartialTaskIdentifier.builder() - .name(value.name()) - .project(config.project()) - .domain(config.domain()) - .version(config.version()) - .build(); - } - - throw new IllegalArgumentException( - "Dynamic workflow tasks don't support remote tasks: " + value); - } - - @Override - protected PartialWorkflowIdentifier visitWorkflowIdentifier(PartialWorkflowIdentifier value) { - if (value.project() == null && value.domain() == null && value.version() == null) { - return PartialWorkflowIdentifier.builder() - .name(value.name()) - .project(config.project()) - .domain(config.domain()) - .version(config.version()) - .build(); - } - - // in these cases all referenced workflows are sub-workflows, and we can't include - // templates for tasks used in them - - throw new IllegalArgumentException( - "Dynamic workflow tasks don't support remote workflows: " + value); - } - - @Override - protected PartialLaunchPlanIdentifier visitLaunchPlanIdentifier( - PartialLaunchPlanIdentifier value) { - if (value.project() == null && value.domain() == null && value.version() == null) { - return PartialLaunchPlanIdentifier.builder() - .name(value.name()) - .project(config.project()) - .domain(config.domain()) - .version(config.version()) - .build(); - } - - // we don't need to fetch anything, so we can use this reference, because - // for launch plans we don't need to include task and workflow templates into closure - if (value.project() != null && value.domain() != null && value.version() != null) { - return value; - } - - throw new IllegalArgumentException( - "Dynamic workflow tasks don't support remote launch plans: " + value); - } - } - private static DynamicWorkflowTask getDynamicWorkflowTask(String name) { // be careful not to pass extra Map env = getEnv();