diff --git a/go/tasks/pluginmachinery/flytek8s/config/config.go b/go/tasks/pluginmachinery/flytek8s/config/config.go index b7ca58928..be83a8c0a 100755 --- a/go/tasks/pluginmachinery/flytek8s/config/config.go +++ b/go/tasks/pluginmachinery/flytek8s/config/config.go @@ -179,6 +179,9 @@ type FlyteCoPilotConfig struct { // Time for which the sidecar container should wait after starting up, for the primary process to appear. If it does not show up in this time // the process will be assumed to be dead or in a terminal condition and will trigger an abort. StartTimeout config2.Duration `json:"start-timeout" pflag:"-,Time for which the sidecar should wait on startup before assuming the primary container to have failed startup."` + // Time for which the sidecar container should wait for the primary process to complete. If it does not end up in this time + // the process will be assumed to be dead or in a terminal condition and will trigger an abort. + FinishTimeout config2.Duration `json:"finish-timeout" pflag:"-,Time for which the sidecar should wait for primary container to complete."` // Resources for CoPilot Containers CPU string `json:"cpu" pflag:",Used to set cpu for co-pilot containers"` Memory string `json:"memory" pflag:",Used to set memory for co-pilot containers"` diff --git a/go/tasks/pluginmachinery/flytek8s/copilot.go b/go/tasks/pluginmachinery/flytek8s/copilot.go index 50e7b2d74..baef2bb4a 100644 --- a/go/tasks/pluginmachinery/flytek8s/copilot.go +++ b/go/tasks/pluginmachinery/flytek8s/copilot.go @@ -24,8 +24,6 @@ const ( flyteInitContainerName = "downloader" ) -var pTraceCapability = v1.Capability("SYS_PTRACE") - func FlyteCoPilotContainer(name string, cfg config.FlyteCoPilotConfig, args []string, volumeMounts ...v1.VolumeMount) (v1.Container, error) { cpu, err := resource.ParseQuantity(cfg.CPU) if err != nil { @@ -88,7 +86,8 @@ func CopilotCommandArgs(storageConfig *storage.Config) []string { }...) } -func SidecarCommandArgs(fromLocalPath string, outputPrefix, rawOutputPath storage.DataReference, startTimeout time.Duration, iface *core.TypedInterface) ([]string, error) { +func SidecarCommandArgs(fromLocalPath string, outputPrefix, rawOutputPath storage.DataReference, + startTimeout time.Duration, finishTimeout time.Duration, iface *core.TypedInterface) ([]string, error) { if iface == nil { return nil, fmt.Errorf("interface is required for CoPilot Sidecar") } @@ -96,7 +95,7 @@ func SidecarCommandArgs(fromLocalPath string, outputPrefix, rawOutputPath storag if err != nil { return nil, errors.Wrap(err, "failed to marshal given core.TypedInterface") } - return []string{ + command := []string{ "sidecar", "--start-timeout", startTimeout.String(), @@ -108,7 +107,12 @@ func SidecarCommandArgs(fromLocalPath string, outputPrefix, rawOutputPath storag fromLocalPath, "--interface", base64.StdEncoding.EncodeToString(b), - }, nil + } + // Keep backward compatibility here since older version of copilot doesn't have this flag. + if finishTimeout.String() != time.Duration(0).String() { + command = append(command, "--finish-timeout", finishTimeout.String()) + } + return command, nil } func DownloadCommandArgs(fromInputsPath, outputPrefix storage.DataReference, toLocalPath string, format core.DataLoadingConfig_LiteralMapFormat, inputInterface *core.VariableMap) ([]string, error) { @@ -166,13 +170,6 @@ func AddCoPilotToContainer(ctx context.Context, cfg config.FlyteCoPilotConfig, c return nil } logger.Infof(ctx, "Enabling CoPilot on main container [%s]", c.Name) - if c.SecurityContext == nil { - c.SecurityContext = &v1.SecurityContext{} - } - if c.SecurityContext.Capabilities == nil { - c.SecurityContext.Capabilities = &v1.Capabilities{} - } - c.SecurityContext.Capabilities.Add = append(c.SecurityContext.Capabilities.Add, pTraceCapability) if iFace != nil { if iFace.Inputs != nil { @@ -207,8 +204,6 @@ func AddCoPilotToPod(ctx context.Context, cfg config.FlyteCoPilotConfig, coPilot } logger.Infof(ctx, "CoPilot Enabled for task [%s]", taskExecMetadata.GetTaskExecutionID().GetID().TaskId.Name) - shareProcessNamespaceEnabled := true - coPilotPod.ShareProcessNamespace = &shareProcessNamespaceEnabled if iFace != nil { if iFace.Inputs != nil { inPath := cfg.DefaultInputDataPath @@ -258,7 +253,7 @@ func AddCoPilotToPod(ctx context.Context, cfg config.FlyteCoPilotConfig, coPilot coPilotPod.Volumes = append(coPilotPod.Volumes, DataVolume(cfg.OutputVolumeName, size)) // Lets add the Inputs init container - args, err := SidecarCommandArgs(outPath, outputPaths.GetOutputPrefixPath(), outputPaths.GetRawOutputPrefix(), cfg.StartTimeout.Duration, iFace) + args, err := SidecarCommandArgs(outPath, outputPaths.GetOutputPrefixPath(), outputPaths.GetRawOutputPrefix(), cfg.StartTimeout.Duration, cfg.FinishTimeout.Duration, iFace) if err != nil { return err } diff --git a/go/tasks/pluginmachinery/flytek8s/copilot_test.go b/go/tasks/pluginmachinery/flytek8s/copilot_test.go index 587a29ac4..4a2951948 100644 --- a/go/tasks/pluginmachinery/flytek8s/copilot_test.go +++ b/go/tasks/pluginmachinery/flytek8s/copilot_test.go @@ -143,7 +143,7 @@ func TestDownloadCommandArgs(t *testing.T) { } func TestSidecarCommandArgs(t *testing.T) { - _, err := SidecarCommandArgs("", "", "", time.Second*10, nil) + _, err := SidecarCommandArgs("", "", "", time.Second*10, time.Duration(0), nil) assert.Error(t, err) iFace := &core.TypedInterface{ @@ -154,15 +154,18 @@ func TestSidecarCommandArgs(t *testing.T) { }, }, } - d, err := SidecarCommandArgs("/from", "s3://output-meta", "s3://raw-output", time.Second*10, iFace) + d, err := SidecarCommandArgs("/from", "s3://output-meta", "s3://raw-output", time.Second*10, time.Second*10, iFace) assert.NoError(t, err) - expected := []string{"sidecar", "--start-timeout", "10s", "--to-raw-output", "s3://raw-output", "--to-output-prefix", "s3://output-meta", "--from-local-dir", "/from", "--interface", ""} + expected := []string{"sidecar", "--start-timeout", "10s", "--to-raw-output", "s3://raw-output", "--to-output-prefix", "s3://output-meta", "--from-local-dir", "/from", "--interface", "", "--finish-timeout", "10s"} if assert.Len(t, d, len(expected)) { - for i := 0; i < len(expected)-1; i++ { + for i := 0; i < len(expected)-3; i++ { + assert.Equal(t, expected[i], d[i]) + } + for i := len(expected) - 2; i < len(expected); i++ { assert.Equal(t, expected[i], d[i]) } // We cannot compare the last one, as the interface is a map the order is not guaranteed. - ifaceB64 := d[len(expected)-1] + ifaceB64 := d[len(expected)-3] serIFaceBytes, err := base64.StdEncoding.DecodeString(ifaceB64) if assert.NoError(t, err) { if2 := &core.TypedInterface{} @@ -220,34 +223,6 @@ func assertContainerHasVolumeMounts(t *testing.T, cfg config.FlyteCoPilotConfig, } } -func assertContainerHasPTrace(t *testing.T, c *v1.Container) { - assert.NotNil(t, c.SecurityContext) - assert.NotNil(t, c.SecurityContext.Capabilities) - assert.NotNil(t, c.SecurityContext.Capabilities.Add) - capFound := false - for _, cap := range c.SecurityContext.Capabilities.Add { - if cap == pTraceCapability { - capFound = true - } - } - assert.True(t, capFound, "ptrace not found?") -} - -func assertPodHasSNPS(t *testing.T, pod *v1.PodSpec) { - assert.NotNil(t, pod.ShareProcessNamespace) - assert.True(t, *pod.ShareProcessNamespace) - - found := false - for _, c := range pod.Containers { - if c.Name == "test" { - found = true - cntr := c - assertContainerHasPTrace(t, &cntr) - } - } - assert.False(t, found, "user container absent?") -} - func assertPodHasCoPilot(t *testing.T, cfg config.FlyteCoPilotConfig, pilot *core.DataLoadingConfig, iFace *core.TypedInterface, pod *v1.PodSpec) { for _, c := range pod.Containers { if c.Name == "test" { @@ -360,7 +335,6 @@ func TestAddCoPilotToContainer(t *testing.T) { pilot := &core.DataLoadingConfig{Enabled: true} assert.NoError(t, AddCoPilotToContainer(ctx, cfg, &c, nil, pilot)) assertContainerHasVolumeMounts(t, cfg, pilot, nil, &c) - assertContainerHasPTrace(t, &c) }) t.Run("happy-iface-empty-config", func(t *testing.T) { @@ -381,7 +355,6 @@ func TestAddCoPilotToContainer(t *testing.T) { } pilot := &core.DataLoadingConfig{Enabled: true} assert.NoError(t, AddCoPilotToContainer(ctx, cfg, &c, iface, pilot)) - assertContainerHasPTrace(t, &c) assertContainerHasVolumeMounts(t, cfg, pilot, iface, &c) }) @@ -407,7 +380,6 @@ func TestAddCoPilotToContainer(t *testing.T) { OutputPath: "out", } assert.NoError(t, AddCoPilotToContainer(ctx, cfg, &c, iface, pilot)) - assertContainerHasPTrace(t, &c) assertContainerHasVolumeMounts(t, cfg, pilot, iface, &c) }) @@ -428,7 +400,6 @@ func TestAddCoPilotToContainer(t *testing.T) { OutputPath: "out", } assert.NoError(t, AddCoPilotToContainer(ctx, cfg, &c, iface, pilot)) - assertContainerHasPTrace(t, &c) assertContainerHasVolumeMounts(t, cfg, pilot, iface, &c) }) @@ -448,7 +419,6 @@ func TestAddCoPilotToContainer(t *testing.T) { OutputPath: "out", } assert.NoError(t, AddCoPilotToContainer(ctx, cfg, &c, iface, pilot)) - assertContainerHasPTrace(t, &c) assertContainerHasVolumeMounts(t, cfg, pilot, iface, &c) }) } @@ -534,7 +504,6 @@ func TestAddCoPilotToPod(t *testing.T) { OutputPath: "out", } assert.NoError(t, AddCoPilotToPod(ctx, cfg, &pod, iface, taskMetadata, inputPaths, opath, pilot)) - assertPodHasSNPS(t, &pod) assertPodHasCoPilot(t, cfg, pilot, iface, &pod) }) @@ -546,7 +515,6 @@ func TestAddCoPilotToPod(t *testing.T) { OutputPath: "out", } assert.NoError(t, AddCoPilotToPod(ctx, cfg, &pod, nil, taskMetadata, inputPaths, opath, pilot)) - assertPodHasSNPS(t, &pod) assertPodHasCoPilot(t, cfg, pilot, nil, &pod) }) @@ -566,7 +534,6 @@ func TestAddCoPilotToPod(t *testing.T) { OutputPath: "out", } assert.NoError(t, AddCoPilotToPod(ctx, cfg, &pod, iface, taskMetadata, inputPaths, opath, pilot)) - assertPodHasSNPS(t, &pod) assertPodHasCoPilot(t, cfg, pilot, iface, &pod) }) @@ -585,7 +552,6 @@ func TestAddCoPilotToPod(t *testing.T) { OutputPath: "out", } assert.NoError(t, AddCoPilotToPod(ctx, cfg, &pod, iface, taskMetadata, inputPaths, opath, pilot)) - assertPodHasSNPS(t, &pod) assertPodHasCoPilot(t, cfg, pilot, iface, &pod) }) diff --git a/go/tasks/plugins/array/k8s/subtask_exec_context.go b/go/tasks/plugins/array/k8s/subtask_exec_context.go index 0eb0a92e8..81c22ccc6 100644 --- a/go/tasks/plugins/array/k8s/subtask_exec_context.go +++ b/go/tasks/plugins/array/k8s/subtask_exec_context.go @@ -60,8 +60,7 @@ func NewSubTaskExecutionContext(ctx context.Context, tCtx pluginsCore.TaskExecut } // construct TaskTemplate - subtaskTemplate := &core.TaskTemplate{} - *subtaskTemplate = *taskTemplate + subtaskTemplate := taskTemplate subtaskTemplate.TaskTypeVersion = 2 if subtaskTemplate.GetContainer() != nil { diff --git a/go/tasks/plugins/k8s/pod/container.go b/go/tasks/plugins/k8s/pod/container.go index 0ec8240ec..b32290ee9 100644 --- a/go/tasks/plugins/k8s/pod/container.go +++ b/go/tasks/plugins/k8s/pod/container.go @@ -28,5 +28,11 @@ func (containerPodBuilder) buildPodSpec(ctx context.Context, task *core.TaskTemp } func (containerPodBuilder) updatePodMetadata(ctx context.Context, pod *v1.Pod, task *core.TaskTemplate, taskCtx pluginsCore.TaskExecutionContext) error { + pilot := task.GetContainer().GetDataConfig() + if pilot == nil || !pilot.Enabled || task.Interface.Outputs == nil { + return nil + } + pod.Annotations = make(map[string]string) + pod.Annotations[RawContainerName] = pod.Spec.Containers[0].Name return nil } diff --git a/go/tasks/plugins/k8s/pod/container_test.go b/go/tasks/plugins/k8s/pod/container_test.go index d4b907915..5dd913f3f 100644 --- a/go/tasks/plugins/k8s/pod/container_test.go +++ b/go/tasks/plugins/k8s/pod/container_test.go @@ -51,6 +51,7 @@ func dummyContainerTaskMetadata(resources *v1.ResourceRequirements) pluginsCore. tID := &pluginsCoreMock.TaskExecutionID{} tID.On("GetID").Return(core.TaskExecutionIdentifier{ + TaskId: &core.Identifier{Name: "flyte"}, NodeExecutionId: &core.NodeExecutionIdentifier{ ExecutionId: &core.WorkflowExecutionIdentifier{ Name: "my_name", @@ -69,17 +70,7 @@ func dummyContainerTaskMetadata(resources *v1.ResourceRequirements) pluginsCore. return taskMetadata } -func dummyContainerTaskContext(resources *v1.ResourceRequirements, command []string, args []string) pluginsCore.TaskExecutionContext { - task := &core.TaskTemplate{ - Type: "test", - Target: &core.TaskTemplate_Container{ - Container: &core.Container{ - Command: command, - Args: args, - }, - }, - } - +func dummyContainerTaskContext(resources *v1.ResourceRequirements, template *core.TaskTemplate, command []string, args []string) pluginsCore.TaskExecutionContext { dummyTaskMetadata := dummyContainerTaskMetadata(resources) taskCtx := &pluginsCoreMock.TaskExecutionContext{} inputReader := &pluginsIOMock.InputReader{} @@ -98,7 +89,7 @@ func dummyContainerTaskContext(resources *v1.ResourceRequirements, command []str taskCtx.OnOutputWriter().Return(outputReader) taskReader := &pluginsCoreMock.TaskReader{} - taskReader.OnReadMatch(mock.Anything).Return(task, nil) + taskReader.OnReadMatch(mock.Anything).Return(template, nil) taskCtx.OnTaskReader().Return(taskReader) taskCtx.OnTaskExecutionMetadata().Return(dummyTaskMetadata) @@ -118,25 +109,63 @@ func TestContainerTaskExecutor_BuildIdentityResource(t *testing.T) { func TestContainerTaskExecutor_BuildResource(t *testing.T) { command := []string{"command"} args := []string{"{{.Input}}"} - taskCtx := dummyContainerTaskContext(containerResourceRequirements, command, args) + task := &core.TaskTemplate{ + Type: "test", + Target: &core.TaskTemplate_Container{ + Container: &core.Container{ + Command: command, + Args: args, + DataConfig: &core.DataLoadingConfig{Enabled: true}, + }, + }, + Interface: &core.TypedInterface{Outputs: &core.VariableMap{}}, + } - r, err := DefaultPodPlugin.BuildResource(context.TODO(), taskCtx) - assert.NoError(t, err) - assert.NotNil(t, r) - j, ok := r.(*v1.Pod) - assert.True(t, ok) + t.Run("Enable copilot", func(t *testing.T) { + taskCtx := dummyContainerTaskContext(containerResourceRequirements, task, command, args) + r, err := DefaultPodPlugin.BuildResource(context.TODO(), taskCtx) + assert.NoError(t, err) + assert.NotNil(t, r) + j, ok := r.(*v1.Pod) + assert.True(t, ok) + + assert.NotEmpty(t, j.Spec.Containers) + assert.Equal(t, containerResourceRequirements.Limits[v1.ResourceCPU], j.Spec.Containers[0].Resources.Limits[v1.ResourceCPU]) + + // TODO: Once configurable, test when setting storage is supported on the cluster vs not. + storageRes := j.Spec.Containers[0].Resources.Limits[v1.ResourceStorage] + assert.Equal(t, int64(0), (&storageRes).Value()) + + assert.Equal(t, command, j.Spec.Containers[0].Command) + assert.Equal(t, []string{"test-data-reference"}, j.Spec.Containers[0].Args) + + assert.Equal(t, "service-account", j.Spec.ServiceAccountName) + assert.NotNil(t, j.Annotations) + }) + + t.Run("Disable copilot", func(t *testing.T) { + task.GetContainer().DataConfig.Enabled = false + taskCtx := dummyContainerTaskContext(containerResourceRequirements, task, command, args) + r, err := DefaultPodPlugin.BuildResource(context.TODO(), taskCtx) + assert.NoError(t, err) + assert.NotNil(t, r) + j, ok := r.(*v1.Pod) + assert.True(t, ok) - assert.NotEmpty(t, j.Spec.Containers) - assert.Equal(t, containerResourceRequirements.Limits[v1.ResourceCPU], j.Spec.Containers[0].Resources.Limits[v1.ResourceCPU]) + assert.NotEmpty(t, j.Spec.Containers) + assert.Equal(t, containerResourceRequirements.Limits[v1.ResourceCPU], j.Spec.Containers[0].Resources.Limits[v1.ResourceCPU]) - // TODO: Once configurable, test when setting storage is supported on the cluster vs not. - storageRes := j.Spec.Containers[0].Resources.Limits[v1.ResourceStorage] - assert.Equal(t, int64(0), (&storageRes).Value()) + // TODO: Once configurable, test when setting storage is supported on the cluster vs not. + storageRes := j.Spec.Containers[0].Resources.Limits[v1.ResourceStorage] + assert.Equal(t, int64(0), (&storageRes).Value()) - assert.Equal(t, command, j.Spec.Containers[0].Command) - assert.Equal(t, []string{"test-data-reference"}, j.Spec.Containers[0].Args) + assert.Equal(t, command, j.Spec.Containers[0].Command) + assert.Equal(t, []string{"test-data-reference"}, j.Spec.Containers[0].Args) + + assert.Equal(t, "service-account", j.Spec.ServiceAccountName) + assert.Nil(t, j.Annotations) + }) - assert.Equal(t, "service-account", j.Spec.ServiceAccountName) } func TestContainerTaskExecutor_GetTaskStatus(t *testing.T) { @@ -191,6 +220,18 @@ func TestContainerTaskExecutor_GetTaskStatus(t *testing.T) { assert.NotNil(t, phaseInfo) assert.Equal(t, pluginsCore.PhaseSuccess, phaseInfo.Phase()) }) + + t.Run("raw container failed", func(t *testing.T) { + pod := &v1.Pod{ + Status: v1.PodStatus{}, + } + pod.Annotations = map[string]string{RawContainerName: "raw-container"} + pod.Status.ContainerStatuses = []v1.ContainerStatus{{Name: pod.Annotations[RawContainerName], State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ExitCode: 1}}}} + phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, nil, pod) + assert.NoError(t, err) + assert.NotNil(t, phaseInfo) + assert.Equal(t, pluginsCore.PhaseRetryableFailure, phaseInfo.Phase()) + }) } func TestContainerTaskExecutor_GetProperties(t *testing.T) { diff --git a/go/tasks/plugins/k8s/pod/plugin.go b/go/tasks/plugins/k8s/pod/plugin.go index 9e0aea1cb..701aff666 100644 --- a/go/tasks/plugins/k8s/pod/plugin.go +++ b/go/tasks/plugins/k8s/pod/plugin.go @@ -21,6 +21,7 @@ import ( const ( podTaskType = "pod" PrimaryContainerKey = "primary_container_name" + RawContainerName = "raw_container_name" ) var ( @@ -121,6 +122,19 @@ func (plugin) GetTaskPhaseWithLogs(ctx context.Context, pluginContext k8s.Plugin return pluginsCore.PhaseInfoUndefined, nil } + RawContainerName, exists := r.GetAnnotations()[RawContainerName] + if exists { + // we declare the task as "failure" If raw container fail, but the rawContainer + // task requires all containers (raw container and sidecar) to succeed to declare success + for _, s := range pod.Status.ContainerStatuses { + if s.Name == RawContainerName { + if s.State.Terminated != nil && s.State.Terminated.ExitCode != 0 { + return pluginsCore.PhaseInfoRetryableFailure(s.State.Terminated.Reason, s.State.Terminated.Message, &info), nil + } + } + } + } + primaryContainerName, exists := r.GetAnnotations()[PrimaryContainerKey] if !exists { // if the primary container annotation dos not exist, then the task requires all containers