Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support remote in dynamic #248

Merged
merged 9 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,19 @@
*/
package org.flyte.examples;

import static org.flyte.examples.FlyteEnvironment.DOMAIN;
import static org.flyte.examples.FlyteEnvironment.PROJECT;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import com.google.errorprone.annotations.Var;
import org.flyte.examples.SumTask.SumInput;
import org.flyte.flytekit.SdkBindingData;
import org.flyte.flytekit.SdkBindingDataFactory;
import org.flyte.flytekit.SdkDynamicWorkflowTask;
import org.flyte.flytekit.SdkNode;
import org.flyte.flytekit.SdkRemoteTask;
import org.flyte.flytekit.SdkTypes;
import org.flyte.flytekit.SdkWorkflowBuilder;
import org.flyte.flytekit.jackson.JacksonSdkType;

Expand Down Expand Up @@ -59,11 +65,23 @@ public Output run(SdkWorkflowBuilder builder, Input input) {
} else if (input.n().get() == 0) {
return Output.create(SdkBindingDataFactory.of(0));
} else {
SdkNode<Void> hello =
builder.apply(
"hello",
SdkRemoteTask.create(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the remote task we reference from a dynamic workflow task.

DOMAIN,
PROJECT,
HelloWorldTask.class.getName(),
SdkTypes.nulls(),
SdkTypes.nulls()));
@Var SdkBindingData<Long> prev = SdkBindingDataFactory.of(0);
@Var SdkBindingData<Long> value = SdkBindingDataFactory.of(1);
for (int i = 2; i <= input.n().get(); i++) {
SdkBindingData<Long> next =
builder.apply("fib-" + i, new SumTask(), SumInput.create(value, prev)).getOutputs();
builder
.apply(
"fib-" + i, new SumTask().withUpstreamNode(hello), SumInput.create(value, prev))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to make a connection for fun, as it is not required.

.getOutputs();
prev = value;
value = next;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2023 Flyte Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.flyte.examples;

public final class FlyteEnvironment {

public static final String DOMAIN = "development";
public static final String PROJECT = "flytesnacks";

private FlyteEnvironment() {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ public SumWorkflow.Output expand(SdkWorkflowBuilder builder, Input input) {
.result();
SdkBindingData<Long> abcd =
builder.apply("post-sum", new SumTask(), SumTask.SumInput.create(abc, d)).getOutputs();
return SumWorkflow.Output.create(abcd);
SdkBindingData<Long> result =
builder
.apply(
"fibonacci",
new DynamicFibonacciWorkflowTask(),
DynamicFibonacciWorkflowTask.Input.create(abcd))
.getOutputs()
.output();
return SumWorkflow.Output.create(result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,13 @@ public void testMockTasks() {
new SumTask(),
SumTask.SumInput.create(SdkBindingDataFactory.of(0L), SdkBindingDataFactory.of(4L)),
SdkBindingDataFactory.of(42L))
.withTaskOutput(
new DynamicFibonacciWorkflowTask(),
DynamicFibonacciWorkflowTask.Input.create(SdkBindingDataFactory.of(42L)),
DynamicFibonacciWorkflowTask.Output.create(SdkBindingDataFactory.of(123L)))
.execute();

assertEquals(42L, result.getIntegerOutput("result"));
assertEquals(123L, result.getIntegerOutput("result"));
}

@Test
Expand Down Expand Up @@ -87,9 +91,12 @@ public void testMockSubWorkflow() {
new SumTask(),
SumInput.create(SdkBindingDataFactory.of(10L), SdkBindingDataFactory.of(4L)),
SdkBindingDataFactory.of(15L))
.withTask(
new DynamicFibonacciWorkflowTask(),
input -> DynamicFibonacciWorkflowTask.Output.create(SdkBindingDataFactory.of(42L)))
.execute();

assertEquals(15L, result.getIntegerOutput("result"));
assertEquals(42L, result.getIntegerOutput("result"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@
import org.flyte.api.v1.WorkflowNode;
import org.flyte.api.v1.WorkflowNode.Reference;
import org.flyte.api.v1.WorkflowTemplate;
import org.flyte.flytekit.SdkDynamicWorkflowTask;
import org.flyte.flytekit.SdkRemoteLaunchPlan;
import org.flyte.flytekit.SdkRemoteTask;
import org.flyte.flytekit.SdkRunnableTask;
import org.flyte.flytekit.SdkTransform;
import org.flyte.flytekit.SdkType;
import org.flyte.flytekit.SdkWorkflow;
import org.flyte.localengine.ExecutionContext;
Expand Down Expand Up @@ -321,20 +323,32 @@ public <T> SdkTestingExecutor withFixedInputs(SdkType<T> type, T value) {

public <InputT, OutputT> SdkTestingExecutor withTaskOutput(
SdkRunnableTask<InputT, OutputT> task, InputT input, OutputT output) {
return withTaskOutput0(task, input, output);
}

public <InputT, OutputT> SdkTestingExecutor withTaskOutput(
SdkDynamicWorkflowTask<InputT, OutputT> task, InputT input, OutputT output) {
return withTaskOutput0(task, input, output);
}

public <InputT, OutputT> SdkTestingExecutor withTaskOutput(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The diff is weird, but there is no change made to withTaskOutput(SdkRemoteTask, ...),

SdkRemoteTask<InputT, OutputT> task, InputT input, OutputT output) {
TestingRunnableTask<InputT, OutputT> fixedTask =
getFixedTaskOrDefault(task.getName(), task.getInputType(), task.getOutputType());
getFixedTaskOrDefault(task.name(), task.inputs(), task.outputs());

return toBuilder()
.putFixedTask(task.getName(), fixedTask.withFixedOutput(input, output))
.build();
}

public <InputT, OutputT> SdkTestingExecutor withTaskOutput(
SdkRemoteTask<InputT, OutputT> task, InputT input, OutputT output) {
private <InputT, OutputT> SdkTestingExecutor withTaskOutput0(
SdkTransform<InputT, OutputT> task, InputT input, OutputT output) {
TestingRunnableTask<InputT, OutputT> fixedTask =
getFixedTaskOrDefault(task.name(), task.inputs(), task.outputs());
getFixedTaskOrDefault(task.getName(), task.getInputType(), task.getOutputType());

return toBuilder().putFixedTask(task.name(), fixedTask.withFixedOutput(input, output)).build();
return toBuilder()
.putFixedTask(task.getName(), fixedTask.withFixedOutput(input, output))
.build();
}

public <InputT, OutputT> SdkTestingExecutor withLaunchPlanOutput(
Expand All @@ -361,6 +375,16 @@ public <InputT, OutputT> SdkTestingExecutor withLaunchPlan(

public <InputT, OutputT> SdkTestingExecutor withTask(
SdkRunnableTask<InputT, OutputT> task, Function<InputT, OutputT> runFn) {
return withTask0(task, runFn);
}

public <InputT, OutputT> SdkTestingExecutor withTask(
SdkDynamicWorkflowTask<InputT, OutputT> task, Function<InputT, OutputT> runFn) {
return withTask0(task, runFn);
}

private <InputT, OutputT> SdkTestingExecutor withTask0(
SdkTransform<InputT, OutputT> task, Function<InputT, OutputT> runFn) {
TestingRunnableTask<InputT, OutputT> fixedTask =
getFixedTaskOrDefault(task.getName(), task.getInputType(), task.getOutputType());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package org.flyte.utils;

import static org.flyte.examples.FlyteEnvironment.DOMAIN;
import static org.flyte.examples.FlyteEnvironment.PROJECT;

import flyteidl.admin.ExecutionOuterClass;
import flyteidl.core.Execution;
import flyteidl.core.IdentifierOuterClass;
Expand All @@ -28,8 +31,6 @@
import org.rnorth.ducttape.unreliables.Unreliables;

public class FlyteSandboxClient {
private static final String DOMAIN = "development";
private static final String PROJECT = "flytesnacks";
private static final int EXECUTION_TIMEOUT_SECONDS = 300;

private final String version;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

public class FlyteSandboxContainer extends GenericContainer<FlyteSandboxContainer> {

public static final String IMAGE_NAME = "ghcr.io/flyteorg/flyte-sandbox:v1.1.0";
public static final String IMAGE_NAME = "ghcr.io/flyteorg/flyte-sandbox:v1.9.1";
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not required by this PR, but it's nice to be on latest version.


public static final FlyteSandboxContainer INSTANCE =
new FlyteSandboxContainer()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

/** Overrides project, domain and version for nodes in {@link WorkflowTemplate}. */
@AutoValue
abstract class IdentifierRewrite {
public abstract class IdentifierRewrite {

abstract String domain();

Expand All @@ -47,12 +47,11 @@ WorkflowTemplate apply(WorkflowTemplate template) {
return visitor().visitWorkflowTemplate(template);
}

@VisibleForTesting
Visitor visitor() {
public Visitor visitor() {
return new Visitor();
}

class Visitor extends WorkflowNodeVisitor {
public class Visitor extends WorkflowNodeVisitor {
@Override
protected PartialTaskIdentifier visitTaskIdentifier(PartialTaskIdentifier value) {
return apply(value);
Expand Down Expand Up @@ -201,20 +200,20 @@ private static <T> T coalesce(T value1, Supplier<T> value2) {
return value1 != null ? value1 : value2.get();
}

static Builder builder() {
public static Builder builder() {
return new AutoValue_IdentifierRewrite.Builder();
}

@AutoValue.Builder
abstract static class Builder {
abstract Builder domain(String domain);
public abstract static class Builder {
public abstract Builder domain(String domain);

abstract Builder project(String project);
public abstract Builder project(String project);

abstract Builder version(String version);
public abstract Builder version(String version);

abstract Builder adminClient(FlyteAdminClient adminClient);
public abstract Builder adminClient(FlyteAdminClient adminClient);

abstract IdentifierRewrite build();
public abstract IdentifierRewrite build();
}
}
2 changes: 1 addition & 1 deletion jflyte/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@
<environment>
<FLYTE_INTERNAL_MODULE_DIR>/jflyte/modules</FLYTE_INTERNAL_MODULE_DIR>
<FLYTE_INTERNAL_IMAGE>${docker.image}:${docker.tag}</FLYTE_INTERNAL_IMAGE>
<FLYTE_PLATFORM_URL>flyte:30081</FLYTE_PLATFORM_URL>
<FLYTE_PLATFORM_URL>flyteadmin.flyte.svc.cluster.local:81</FLYTE_PLATFORM_URL>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is required for remote lookup to work because the task container running in k3 will need to talk to flyteadmin and using service DNS name is the best option.

Strictly speaking this is a breaking change but it is extremely unlikely that the existing config flyte:30081 would work for any users anyway and they have likely already customized it.

<FLYTE_PLATFORM_INSECURE>True</FLYTE_PLATFORM_INSECURE>
</environment>
<jvmFlags>
Expand Down
Loading
Loading