Skip to content

Commit

Permalink
Support remote in dynamic
Browse files Browse the repository at this point in the history
Signed-off-by: Hongxin Liang <[email protected]>
  • Loading branch information
honnix committed Sep 27, 2023
1 parent 1dcb818 commit e81d44b
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

/** Overrides project, domain and version for nodes in {@link WorkflowTemplate}. */
@AutoValue
abstract class IdentifierRewrite {
public abstract class IdentifierRewrite {

abstract String domain();

Expand All @@ -47,12 +47,11 @@ WorkflowTemplate apply(WorkflowTemplate template) {
return visitor().visitWorkflowTemplate(template);
}

@VisibleForTesting
Visitor visitor() {
public Visitor visitor() {
return new Visitor();
}

class Visitor extends WorkflowNodeVisitor {
public class Visitor extends WorkflowNodeVisitor {
@Override
protected PartialTaskIdentifier visitTaskIdentifier(PartialTaskIdentifier value) {
return apply(value);
Expand Down Expand Up @@ -201,20 +200,20 @@ private static <T> T coalesce(T value1, Supplier<T> value2) {
return value1 != null ? value1 : value2.get();
}

static Builder builder() {
public static Builder builder() {
return new AutoValue_IdentifierRewrite.Builder();
}

@AutoValue.Builder
abstract static class Builder {
abstract Builder domain(String domain);
public abstract static class Builder {
public abstract Builder domain(String domain);

abstract Builder project(String project);
public abstract Builder project(String project);

abstract Builder version(String version);
public abstract Builder version(String version);

abstract Builder adminClient(FlyteAdminClient adminClient);
public abstract Builder adminClient(FlyteAdminClient adminClient);

abstract IdentifierRewrite build();
public abstract IdentifierRewrite build();
}
}
79 changes: 10 additions & 69 deletions jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@
import org.flyte.api.v1.DynamicWorkflowTaskRegistrar;
import org.flyte.api.v1.Literal;
import org.flyte.api.v1.Node;
import org.flyte.api.v1.PartialLaunchPlanIdentifier;
import org.flyte.api.v1.PartialTaskIdentifier;
import org.flyte.api.v1.PartialWorkflowIdentifier;
import org.flyte.api.v1.RunnableTask;
import org.flyte.api.v1.RunnableTaskRegistrar;
import org.flyte.api.v1.Struct;
Expand All @@ -57,6 +54,7 @@
import org.flyte.jflyte.utils.Config;
import org.flyte.jflyte.utils.ExecutionConfig;
import org.flyte.jflyte.utils.FileSystemLoader;
import org.flyte.jflyte.utils.IdentifierRewrite;
import org.flyte.jflyte.utils.JFlyteCustom;
import org.flyte.jflyte.utils.PackageLoader;
import org.flyte.jflyte.utils.ProjectClosure;
Expand Down Expand Up @@ -203,10 +201,16 @@ static DynamicJobSpec rewrite(
Map<TaskIdentifier, TaskTemplate> taskTemplates,
Map<WorkflowIdentifier, WorkflowTemplate> workflowTemplates) {

DynamicWorkflowIdentifierRewrite rewrite = new DynamicWorkflowIdentifierRewrite(config);
WorkflowNodeVisitor workflowNodeVisitor =
IdentifierRewrite.builder()
.domain(config.domain())
.project(config.project())
.version(config.version())
.build()
.visitor();

List<Node> rewrittenNodes =
spec.nodes().stream().map(rewrite::visitNode).collect(toUnmodifiableList());
spec.nodes().stream().map(workflowNodeVisitor::visitNode).collect(toUnmodifiableList());

Map<WorkflowIdentifier, WorkflowTemplate> usedSubWorkflows =
ProjectClosure.collectSubWorkflows(rewrittenNodes, workflowTemplates);
Expand All @@ -218,7 +222,7 @@ static DynamicJobSpec rewrite(
// and workflows

Map<WorkflowIdentifier, WorkflowTemplate> rewrittenUsedSubWorkflows =
mapValues(usedSubWorkflows, rewrite::visitWorkflowTemplate);
mapValues(usedSubWorkflows, workflowNodeVisitor::visitWorkflowTemplate);

return spec.toBuilder()
.nodes(rewrittenNodes)
Expand All @@ -235,69 +239,6 @@ static DynamicJobSpec rewrite(
.build();
}

static class DynamicWorkflowIdentifierRewrite extends WorkflowNodeVisitor {
private final ExecutionConfig config;

DynamicWorkflowIdentifierRewrite(ExecutionConfig config) {
this.config = config;
}

@Override
protected PartialTaskIdentifier visitTaskIdentifier(PartialTaskIdentifier value) {
if (value.project() == null && value.domain() == null && value.version() == null) {
return PartialTaskIdentifier.builder()
.name(value.name())
.project(config.project())
.domain(config.domain())
.version(config.version())
.build();
}

throw new IllegalArgumentException(
"Dynamic workflow tasks don't support remote tasks: " + value);
}

@Override
protected PartialWorkflowIdentifier visitWorkflowIdentifier(PartialWorkflowIdentifier value) {
if (value.project() == null && value.domain() == null && value.version() == null) {
return PartialWorkflowIdentifier.builder()
.name(value.name())
.project(config.project())
.domain(config.domain())
.version(config.version())
.build();
}

// in these cases all referenced workflows are sub-workflows, and we can't include
// templates for tasks used in them

throw new IllegalArgumentException(
"Dynamic workflow tasks don't support remote workflows: " + value);
}

@Override
protected PartialLaunchPlanIdentifier visitLaunchPlanIdentifier(
PartialLaunchPlanIdentifier value) {
if (value.project() == null && value.domain() == null && value.version() == null) {
return PartialLaunchPlanIdentifier.builder()
.name(value.name())
.project(config.project())
.domain(config.domain())
.version(config.version())
.build();
}

// we don't need to fetch anything, so we can use this reference, because
// for launch plans we don't need to include task and workflow templates into closure
if (value.project() != null && value.domain() != null && value.version() != null) {
return value;
}

throw new IllegalArgumentException(
"Dynamic workflow tasks don't support remote launch plans: " + value);
}
}

private static DynamicWorkflowTask getDynamicWorkflowTask(String name) {
// be careful not to pass extra
Map<String, String> env = getEnv();
Expand Down

0 comments on commit e81d44b

Please sign in to comment.