-
Notifications
You must be signed in to change notification settings - Fork 28
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support remote in dynamic #248
Changes from all commits
8b95d48
fc22187
ba3abc4
04fea71
4f52b36
eb0af74
848dc42
fa0176b
d87ab1b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,13 +16,19 @@ | |
*/ | ||
package org.flyte.examples; | ||
|
||
import static org.flyte.examples.FlyteEnvironment.DOMAIN; | ||
import static org.flyte.examples.FlyteEnvironment.PROJECT; | ||
|
||
import com.google.auto.service.AutoService; | ||
import com.google.auto.value.AutoValue; | ||
import com.google.errorprone.annotations.Var; | ||
import org.flyte.examples.SumTask.SumInput; | ||
import org.flyte.flytekit.SdkBindingData; | ||
import org.flyte.flytekit.SdkBindingDataFactory; | ||
import org.flyte.flytekit.SdkDynamicWorkflowTask; | ||
import org.flyte.flytekit.SdkNode; | ||
import org.flyte.flytekit.SdkRemoteTask; | ||
import org.flyte.flytekit.SdkTypes; | ||
import org.flyte.flytekit.SdkWorkflowBuilder; | ||
import org.flyte.flytekit.jackson.JacksonSdkType; | ||
|
||
|
@@ -59,11 +65,23 @@ public Output run(SdkWorkflowBuilder builder, Input input) { | |
} else if (input.n().get() == 0) { | ||
return Output.create(SdkBindingDataFactory.of(0)); | ||
} else { | ||
SdkNode<Void> hello = | ||
builder.apply( | ||
"hello", | ||
SdkRemoteTask.create( | ||
DOMAIN, | ||
PROJECT, | ||
HelloWorldTask.class.getName(), | ||
SdkTypes.nulls(), | ||
SdkTypes.nulls())); | ||
@Var SdkBindingData<Long> prev = SdkBindingDataFactory.of(0); | ||
@Var SdkBindingData<Long> value = SdkBindingDataFactory.of(1); | ||
for (int i = 2; i <= input.n().get(); i++) { | ||
SdkBindingData<Long> next = | ||
builder.apply("fib-" + i, new SumTask(), SumInput.create(value, prev)).getOutputs(); | ||
builder | ||
.apply( | ||
"fib-" + i, new SumTask().withUpstreamNode(hello), SumInput.create(value, prev)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just to make a connection for fun, as it is not required. |
||
.getOutputs(); | ||
prev = value; | ||
value = next; | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
/* | ||
* Copyright 2023 Flyte Authors | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.flyte.examples; | ||
|
||
public final class FlyteEnvironment { | ||
|
||
public static final String DOMAIN = "development"; | ||
public static final String PROJECT = "flytesnacks"; | ||
|
||
private FlyteEnvironment() { | ||
throw new UnsupportedOperationException(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -42,9 +42,11 @@ | |
import org.flyte.api.v1.WorkflowNode; | ||
import org.flyte.api.v1.WorkflowNode.Reference; | ||
import org.flyte.api.v1.WorkflowTemplate; | ||
import org.flyte.flytekit.SdkDynamicWorkflowTask; | ||
import org.flyte.flytekit.SdkRemoteLaunchPlan; | ||
import org.flyte.flytekit.SdkRemoteTask; | ||
import org.flyte.flytekit.SdkRunnableTask; | ||
import org.flyte.flytekit.SdkTransform; | ||
import org.flyte.flytekit.SdkType; | ||
import org.flyte.flytekit.SdkWorkflow; | ||
import org.flyte.localengine.ExecutionContext; | ||
|
@@ -321,20 +323,32 @@ public <T> SdkTestingExecutor withFixedInputs(SdkType<T> type, T value) { | |
|
||
public <InputT, OutputT> SdkTestingExecutor withTaskOutput( | ||
SdkRunnableTask<InputT, OutputT> task, InputT input, OutputT output) { | ||
return withTaskOutput0(task, input, output); | ||
} | ||
|
||
public <InputT, OutputT> SdkTestingExecutor withTaskOutput( | ||
SdkDynamicWorkflowTask<InputT, OutputT> task, InputT input, OutputT output) { | ||
return withTaskOutput0(task, input, output); | ||
} | ||
|
||
public <InputT, OutputT> SdkTestingExecutor withTaskOutput( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The diff is weird, but there is no change made to |
||
SdkRemoteTask<InputT, OutputT> task, InputT input, OutputT output) { | ||
TestingRunnableTask<InputT, OutputT> fixedTask = | ||
getFixedTaskOrDefault(task.getName(), task.getInputType(), task.getOutputType()); | ||
getFixedTaskOrDefault(task.name(), task.inputs(), task.outputs()); | ||
|
||
return toBuilder() | ||
.putFixedTask(task.getName(), fixedTask.withFixedOutput(input, output)) | ||
.build(); | ||
} | ||
|
||
public <InputT, OutputT> SdkTestingExecutor withTaskOutput( | ||
SdkRemoteTask<InputT, OutputT> task, InputT input, OutputT output) { | ||
private <InputT, OutputT> SdkTestingExecutor withTaskOutput0( | ||
SdkTransform<InputT, OutputT> task, InputT input, OutputT output) { | ||
TestingRunnableTask<InputT, OutputT> fixedTask = | ||
getFixedTaskOrDefault(task.name(), task.inputs(), task.outputs()); | ||
getFixedTaskOrDefault(task.getName(), task.getInputType(), task.getOutputType()); | ||
|
||
return toBuilder().putFixedTask(task.name(), fixedTask.withFixedOutput(input, output)).build(); | ||
return toBuilder() | ||
.putFixedTask(task.getName(), fixedTask.withFixedOutput(input, output)) | ||
.build(); | ||
} | ||
|
||
public <InputT, OutputT> SdkTestingExecutor withLaunchPlanOutput( | ||
|
@@ -361,6 +375,16 @@ public <InputT, OutputT> SdkTestingExecutor withLaunchPlan( | |
|
||
public <InputT, OutputT> SdkTestingExecutor withTask( | ||
SdkRunnableTask<InputT, OutputT> task, Function<InputT, OutputT> runFn) { | ||
return withTask0(task, runFn); | ||
} | ||
|
||
public <InputT, OutputT> SdkTestingExecutor withTask( | ||
SdkDynamicWorkflowTask<InputT, OutputT> task, Function<InputT, OutputT> runFn) { | ||
return withTask0(task, runFn); | ||
} | ||
|
||
private <InputT, OutputT> SdkTestingExecutor withTask0( | ||
SdkTransform<InputT, OutputT> task, Function<InputT, OutputT> runFn) { | ||
TestingRunnableTask<InputT, OutputT> fixedTask = | ||
getFixedTaskOrDefault(task.getName(), task.getInputType(), task.getOutputType()); | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,7 +33,7 @@ | |
|
||
public class FlyteSandboxContainer extends GenericContainer<FlyteSandboxContainer> { | ||
|
||
public static final String IMAGE_NAME = "ghcr.io/flyteorg/flyte-sandbox:v1.1.0"; | ||
public static final String IMAGE_NAME = "ghcr.io/flyteorg/flyte-sandbox:v1.9.1"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not required by this PR, but it's nice to be on latest version. |
||
|
||
public static final FlyteSandboxContainer INSTANCE = | ||
new FlyteSandboxContainer() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -199,7 +199,7 @@ | |
<environment> | ||
<FLYTE_INTERNAL_MODULE_DIR>/jflyte/modules</FLYTE_INTERNAL_MODULE_DIR> | ||
<FLYTE_INTERNAL_IMAGE>${docker.image}:${docker.tag}</FLYTE_INTERNAL_IMAGE> | ||
<FLYTE_PLATFORM_URL>flyte:30081</FLYTE_PLATFORM_URL> | ||
<FLYTE_PLATFORM_URL>flyteadmin.flyte.svc.cluster.local:81</FLYTE_PLATFORM_URL> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is required for remote lookup to work because the task container running in k3 will need to talk to flyteadmin and using service DNS name is the best option. Strictly speaking this is a breaking change but it is extremely unlikely that the existing config |
||
<FLYTE_PLATFORM_INSECURE>True</FLYTE_PLATFORM_INSECURE> | ||
</environment> | ||
<jvmFlags> | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the remote task we reference from a dynamic workflow task.