-
Notifications
You must be signed in to change notification settings - Fork 53
Add support raw container in the map task #329
base: master
Are you sure you want to change the base?
Changes from 6 commits
47d5add
57a2d3a
62577a2
79095a0
2e76f29
d201378
b9551e3
b1be556
ea31d6c
77e62b8
33893f7
2a9d132
4ec9d31
06877a8
10be909
b80c8a9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,8 @@ package pod | |
import ( | ||
"context" | ||
|
||
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config" | ||
|
||
pluginserrors "github.com/flyteorg/flyteplugins/go/tasks/errors" | ||
"github.com/flyteorg/flyteplugins/go/tasks/logs" | ||
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery" | ||
|
@@ -132,6 +134,11 @@ func (p plugin) BuildResource(ctx context.Context, taskCtx pluginsCore.TaskExecu | |
pod.ObjectMeta = *objectMeta | ||
pod.Spec = *podSpec | ||
|
||
if taskTemplate.GetContainer() != nil && taskTemplate.GetContainer().DataConfig != nil && taskTemplate.GetContainer().DataConfig.Enabled { | ||
pod.Annotations[flytek8s.PrimaryContainerKey] = primaryContainerName | ||
pod.Annotations[flytek8s.FlyteCopilotName] = config.GetK8sPluginConfig().CoPilot.NamePrefix + flytek8s.Sidecar | ||
} | ||
|
||
return pod, nil | ||
} | ||
|
||
|
@@ -184,8 +191,17 @@ func (plugin) GetTaskPhaseWithLogs(ctx context.Context, pluginContext k8s.Plugin | |
return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, &info), nil | ||
} | ||
|
||
// When the copilot is running, we should wait until the data is uploaded by the copilot. | ||
copilotContainerName, exists := r.GetAnnotations()[flytek8s.FlyteCopilotName] | ||
if exists { | ||
copilotContainerPhase := flytek8s.DetermineContainerPhase(copilotContainerName, pod.Status.ContainerStatuses, &info) | ||
if copilotContainerPhase.Phase() == pluginsCore.PhaseRunning && len(info.Logs) > 0 { | ||
return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion+1, copilotContainerPhase.Info()), nil | ||
} | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So right now this is done in I think the issue is that here we always add a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The issue is the propeller will only pass array index to primary container, so we have to set raw-container to primary. However, if we set it to primary, propeller won't wait for the uploader complete, so I added There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, so we should probably update this logic then to support This means we could keep the logic in |
||
// if the primary container annotation exists, we use the status of the specified container | ||
primaryContainerPhase := flytek8s.DeterminePrimaryContainerPhase(primaryContainerName, pod.Status.ContainerStatuses, &info) | ||
primaryContainerPhase := flytek8s.DetermineContainerPhase(primaryContainerName, pod.Status.ContainerStatuses, &info) | ||
if primaryContainerPhase.Phase() == pluginsCore.PhaseRunning && len(info.Logs) > 0 { | ||
return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion+1, primaryContainerPhase.Info()), nil | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @hamersaw should we pass env
FlyteK8sArrayIndex
to copilot, and construct final output prefix in the copilot?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't have strong feelings here. How are we passing the array index to the inputs downloader? Because in flytekit we pass the input data ref and a subtask index, IIUC it reads the full list of inputs and only uses the value at the subtask index. We need to do the same thing here right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we pass array index to primary container instead of downloader, and raw container task will read the value at subtask index. here is an example. flyteorg/flytekit#1547.
The problem is that in the regular map task, we construct the final output prefix in the flytekit (output_prefix + array index.), but the raw container doesn't know the output prefix, it write to a local share dir instead. uploader will read the data in the share dir and upload to s3.