Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Remove sys_ptrace dependency #264

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
3 changes: 3 additions & 0 deletions go/tasks/pluginmachinery/flytek8s/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ type FlyteCoPilotConfig struct {
// Time for which the sidecar container should wait after starting up, for the primary process to appear. If it does not show up in this time
// the process will be assumed to be dead or in a terminal condition and will trigger an abort.
StartTimeout config2.Duration `json:"start-timeout" pflag:"-,Time for which the sidecar should wait on startup before assuming the primary container to have failed startup."`
// Time for which the sidecar container should wait for the primary process to complete. If it does not end up in this time
// the process will be assumed to be dead or in a terminal condition and will trigger an abort.
FinishTimeout config2.Duration `json:"finish-timeout" pflag:"-,Time for which the sidecar should wait for primary container to complete."`
// Resources for CoPilot Containers
CPU string `json:"cpu" pflag:",Used to set cpu for co-pilot containers"`
Memory string `json:"memory" pflag:",Used to set memory for co-pilot containers"`
Expand Down
25 changes: 10 additions & 15 deletions go/tasks/pluginmachinery/flytek8s/copilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ const (
flyteInitContainerName = "downloader"
)

var pTraceCapability = v1.Capability("SYS_PTRACE")

func FlyteCoPilotContainer(name string, cfg config.FlyteCoPilotConfig, args []string, volumeMounts ...v1.VolumeMount) (v1.Container, error) {
cpu, err := resource.ParseQuantity(cfg.CPU)
if err != nil {
Expand Down Expand Up @@ -88,15 +86,16 @@ func CopilotCommandArgs(storageConfig *storage.Config) []string {
}...)
}

func SidecarCommandArgs(fromLocalPath string, outputPrefix, rawOutputPath storage.DataReference, startTimeout time.Duration, iface *core.TypedInterface) ([]string, error) {
func SidecarCommandArgs(fromLocalPath string, outputPrefix, rawOutputPath storage.DataReference,
startTimeout time.Duration, finishTimeout time.Duration, iface *core.TypedInterface) ([]string, error) {
if iface == nil {
return nil, fmt.Errorf("interface is required for CoPilot Sidecar")
}
b, err := proto.Marshal(iface)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal given core.TypedInterface")
}
return []string{
command := []string{
"sidecar",
"--start-timeout",
startTimeout.String(),
Expand All @@ -108,7 +107,12 @@ func SidecarCommandArgs(fromLocalPath string, outputPrefix, rawOutputPath storag
fromLocalPath,
"--interface",
base64.StdEncoding.EncodeToString(b),
}, nil
}
// Keep backward compatibility here since older version of copilot doesn't have this flag.
if finishTimeout.String() != time.Duration(0).String() {
command = append(command, "--finish-timeout", finishTimeout.String())
}
return command, nil
}

func DownloadCommandArgs(fromInputsPath, outputPrefix storage.DataReference, toLocalPath string, format core.DataLoadingConfig_LiteralMapFormat, inputInterface *core.VariableMap) ([]string, error) {
Expand Down Expand Up @@ -166,13 +170,6 @@ func AddCoPilotToContainer(ctx context.Context, cfg config.FlyteCoPilotConfig, c
return nil
}
logger.Infof(ctx, "Enabling CoPilot on main container [%s]", c.Name)
if c.SecurityContext == nil {
c.SecurityContext = &v1.SecurityContext{}
}
if c.SecurityContext.Capabilities == nil {
c.SecurityContext.Capabilities = &v1.Capabilities{}
}
c.SecurityContext.Capabilities.Add = append(c.SecurityContext.Capabilities.Add, pTraceCapability)

if iFace != nil {
if iFace.Inputs != nil {
Expand Down Expand Up @@ -207,8 +204,6 @@ func AddCoPilotToPod(ctx context.Context, cfg config.FlyteCoPilotConfig, coPilot
}

logger.Infof(ctx, "CoPilot Enabled for task [%s]", taskExecMetadata.GetTaskExecutionID().GetID().TaskId.Name)
shareProcessNamespaceEnabled := true
coPilotPod.ShareProcessNamespace = &shareProcessNamespaceEnabled
if iFace != nil {
if iFace.Inputs != nil {
inPath := cfg.DefaultInputDataPath
Expand Down Expand Up @@ -258,7 +253,7 @@ func AddCoPilotToPod(ctx context.Context, cfg config.FlyteCoPilotConfig, coPilot
coPilotPod.Volumes = append(coPilotPod.Volumes, DataVolume(cfg.OutputVolumeName, size))

// Lets add the Inputs init container
args, err := SidecarCommandArgs(outPath, outputPaths.GetOutputPrefixPath(), outputPaths.GetRawOutputPrefix(), cfg.StartTimeout.Duration, iFace)
args, err := SidecarCommandArgs(outPath, outputPaths.GetOutputPrefixPath(), outputPaths.GetRawOutputPrefix(), cfg.StartTimeout.Duration, cfg.FinishTimeout.Duration, iFace)
if err != nil {
return err
}
Expand Down
50 changes: 8 additions & 42 deletions go/tasks/pluginmachinery/flytek8s/copilot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func TestDownloadCommandArgs(t *testing.T) {
}

func TestSidecarCommandArgs(t *testing.T) {
_, err := SidecarCommandArgs("", "", "", time.Second*10, nil)
_, err := SidecarCommandArgs("", "", "", time.Second*10, time.Duration(0), nil)
assert.Error(t, err)

iFace := &core.TypedInterface{
Expand All @@ -154,15 +154,18 @@ func TestSidecarCommandArgs(t *testing.T) {
},
},
}
d, err := SidecarCommandArgs("/from", "s3://output-meta", "s3://raw-output", time.Second*10, iFace)
d, err := SidecarCommandArgs("/from", "s3://output-meta", "s3://raw-output", time.Second*10, time.Second*10, iFace)
assert.NoError(t, err)
expected := []string{"sidecar", "--start-timeout", "10s", "--to-raw-output", "s3://raw-output", "--to-output-prefix", "s3://output-meta", "--from-local-dir", "/from", "--interface", "<interface>"}
expected := []string{"sidecar", "--start-timeout", "10s", "--to-raw-output", "s3://raw-output", "--to-output-prefix", "s3://output-meta", "--from-local-dir", "/from", "--interface", "<interface>", "--finish-timeout", "10s"}
if assert.Len(t, d, len(expected)) {
for i := 0; i < len(expected)-1; i++ {
for i := 0; i < len(expected)-3; i++ {
assert.Equal(t, expected[i], d[i])
}
for i := len(expected) - 2; i < len(expected); i++ {
assert.Equal(t, expected[i], d[i])
}
// We cannot compare the last one, as the interface is a map the order is not guaranteed.
ifaceB64 := d[len(expected)-1]
ifaceB64 := d[len(expected)-3]
serIFaceBytes, err := base64.StdEncoding.DecodeString(ifaceB64)
if assert.NoError(t, err) {
if2 := &core.TypedInterface{}
Expand Down Expand Up @@ -220,34 +223,6 @@ func assertContainerHasVolumeMounts(t *testing.T, cfg config.FlyteCoPilotConfig,
}
}

func assertContainerHasPTrace(t *testing.T, c *v1.Container) {
assert.NotNil(t, c.SecurityContext)
assert.NotNil(t, c.SecurityContext.Capabilities)
assert.NotNil(t, c.SecurityContext.Capabilities.Add)
capFound := false
for _, cap := range c.SecurityContext.Capabilities.Add {
if cap == pTraceCapability {
capFound = true
}
}
assert.True(t, capFound, "ptrace not found?")
}

func assertPodHasSNPS(t *testing.T, pod *v1.PodSpec) {
assert.NotNil(t, pod.ShareProcessNamespace)
assert.True(t, *pod.ShareProcessNamespace)

found := false
for _, c := range pod.Containers {
if c.Name == "test" {
found = true
cntr := c
assertContainerHasPTrace(t, &cntr)
}
}
assert.False(t, found, "user container absent?")
}

func assertPodHasCoPilot(t *testing.T, cfg config.FlyteCoPilotConfig, pilot *core.DataLoadingConfig, iFace *core.TypedInterface, pod *v1.PodSpec) {
for _, c := range pod.Containers {
if c.Name == "test" {
Expand Down Expand Up @@ -360,7 +335,6 @@ func TestAddCoPilotToContainer(t *testing.T) {
pilot := &core.DataLoadingConfig{Enabled: true}
assert.NoError(t, AddCoPilotToContainer(ctx, cfg, &c, nil, pilot))
assertContainerHasVolumeMounts(t, cfg, pilot, nil, &c)
assertContainerHasPTrace(t, &c)
})

t.Run("happy-iface-empty-config", func(t *testing.T) {
Expand All @@ -381,7 +355,6 @@ func TestAddCoPilotToContainer(t *testing.T) {
}
pilot := &core.DataLoadingConfig{Enabled: true}
assert.NoError(t, AddCoPilotToContainer(ctx, cfg, &c, iface, pilot))
assertContainerHasPTrace(t, &c)
assertContainerHasVolumeMounts(t, cfg, pilot, iface, &c)
})

Expand All @@ -407,7 +380,6 @@ func TestAddCoPilotToContainer(t *testing.T) {
OutputPath: "out",
}
assert.NoError(t, AddCoPilotToContainer(ctx, cfg, &c, iface, pilot))
assertContainerHasPTrace(t, &c)
assertContainerHasVolumeMounts(t, cfg, pilot, iface, &c)
})

Expand All @@ -428,7 +400,6 @@ func TestAddCoPilotToContainer(t *testing.T) {
OutputPath: "out",
}
assert.NoError(t, AddCoPilotToContainer(ctx, cfg, &c, iface, pilot))
assertContainerHasPTrace(t, &c)
assertContainerHasVolumeMounts(t, cfg, pilot, iface, &c)
})

Expand All @@ -448,7 +419,6 @@ func TestAddCoPilotToContainer(t *testing.T) {
OutputPath: "out",
}
assert.NoError(t, AddCoPilotToContainer(ctx, cfg, &c, iface, pilot))
assertContainerHasPTrace(t, &c)
assertContainerHasVolumeMounts(t, cfg, pilot, iface, &c)
})
}
Expand Down Expand Up @@ -534,7 +504,6 @@ func TestAddCoPilotToPod(t *testing.T) {
OutputPath: "out",
}
assert.NoError(t, AddCoPilotToPod(ctx, cfg, &pod, iface, taskMetadata, inputPaths, opath, pilot))
assertPodHasSNPS(t, &pod)
assertPodHasCoPilot(t, cfg, pilot, iface, &pod)
})

Expand All @@ -546,7 +515,6 @@ func TestAddCoPilotToPod(t *testing.T) {
OutputPath: "out",
}
assert.NoError(t, AddCoPilotToPod(ctx, cfg, &pod, nil, taskMetadata, inputPaths, opath, pilot))
assertPodHasSNPS(t, &pod)
assertPodHasCoPilot(t, cfg, pilot, nil, &pod)
})

Expand All @@ -566,7 +534,6 @@ func TestAddCoPilotToPod(t *testing.T) {
OutputPath: "out",
}
assert.NoError(t, AddCoPilotToPod(ctx, cfg, &pod, iface, taskMetadata, inputPaths, opath, pilot))
assertPodHasSNPS(t, &pod)
assertPodHasCoPilot(t, cfg, pilot, iface, &pod)
})

Expand All @@ -585,7 +552,6 @@ func TestAddCoPilotToPod(t *testing.T) {
OutputPath: "out",
}
assert.NoError(t, AddCoPilotToPod(ctx, cfg, &pod, iface, taskMetadata, inputPaths, opath, pilot))
assertPodHasSNPS(t, &pod)
assertPodHasCoPilot(t, cfg, pilot, iface, &pod)
})

Expand Down
3 changes: 1 addition & 2 deletions go/tasks/plugins/array/k8s/subtask_exec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ func NewSubTaskExecutionContext(ctx context.Context, tCtx pluginsCore.TaskExecut
}

// construct TaskTemplate
subtaskTemplate := &core.TaskTemplate{}
*subtaskTemplate = *taskTemplate
subtaskTemplate := taskTemplate

subtaskTemplate.TaskTypeVersion = 2
if subtaskTemplate.GetContainer() != nil {
Expand Down
6 changes: 6 additions & 0 deletions go/tasks/plugins/k8s/pod/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,11 @@ func (containerPodBuilder) buildPodSpec(ctx context.Context, task *core.TaskTemp
}

func (containerPodBuilder) updatePodMetadata(ctx context.Context, pod *v1.Pod, task *core.TaskTemplate, taskCtx pluginsCore.TaskExecutionContext) error {
pilot := task.GetContainer().GetDataConfig()
if pilot == nil || !pilot.Enabled || task.Interface.Outputs == nil {
return nil
}
pod.Annotations = make(map[string]string)
pod.Annotations[RawContainerName] = pod.Spec.Containers[0].Name
return nil
}
93 changes: 67 additions & 26 deletions go/tasks/plugins/k8s/pod/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func dummyContainerTaskMetadata(resources *v1.ResourceRequirements) pluginsCore.

tID := &pluginsCoreMock.TaskExecutionID{}
tID.On("GetID").Return(core.TaskExecutionIdentifier{
TaskId: &core.Identifier{Name: "flyte"},
NodeExecutionId: &core.NodeExecutionIdentifier{
ExecutionId: &core.WorkflowExecutionIdentifier{
Name: "my_name",
Expand All @@ -69,17 +70,7 @@ func dummyContainerTaskMetadata(resources *v1.ResourceRequirements) pluginsCore.
return taskMetadata
}

func dummyContainerTaskContext(resources *v1.ResourceRequirements, command []string, args []string) pluginsCore.TaskExecutionContext {
task := &core.TaskTemplate{
Type: "test",
Target: &core.TaskTemplate_Container{
Container: &core.Container{
Command: command,
Args: args,
},
},
}

func dummyContainerTaskContext(resources *v1.ResourceRequirements, template *core.TaskTemplate, command []string, args []string) pluginsCore.TaskExecutionContext {
dummyTaskMetadata := dummyContainerTaskMetadata(resources)
taskCtx := &pluginsCoreMock.TaskExecutionContext{}
inputReader := &pluginsIOMock.InputReader{}
Expand All @@ -98,7 +89,7 @@ func dummyContainerTaskContext(resources *v1.ResourceRequirements, command []str
taskCtx.OnOutputWriter().Return(outputReader)

taskReader := &pluginsCoreMock.TaskReader{}
taskReader.OnReadMatch(mock.Anything).Return(task, nil)
taskReader.OnReadMatch(mock.Anything).Return(template, nil)
taskCtx.OnTaskReader().Return(taskReader)

taskCtx.OnTaskExecutionMetadata().Return(dummyTaskMetadata)
Expand All @@ -118,25 +109,63 @@ func TestContainerTaskExecutor_BuildIdentityResource(t *testing.T) {
func TestContainerTaskExecutor_BuildResource(t *testing.T) {
command := []string{"command"}
args := []string{"{{.Input}}"}
taskCtx := dummyContainerTaskContext(containerResourceRequirements, command, args)
task := &core.TaskTemplate{
Type: "test",
Target: &core.TaskTemplate_Container{
Container: &core.Container{
Command: command,
Args: args,
DataConfig: &core.DataLoadingConfig{Enabled: true},
},
},
Interface: &core.TypedInterface{Outputs: &core.VariableMap{}},
}

r, err := DefaultPodPlugin.BuildResource(context.TODO(), taskCtx)
assert.NoError(t, err)
assert.NotNil(t, r)
j, ok := r.(*v1.Pod)
assert.True(t, ok)
t.Run("Enable copilot", func(t *testing.T) {
taskCtx := dummyContainerTaskContext(containerResourceRequirements, task, command, args)
r, err := DefaultPodPlugin.BuildResource(context.TODO(), taskCtx)
assert.NoError(t, err)
assert.NotNil(t, r)
j, ok := r.(*v1.Pod)
assert.True(t, ok)

assert.NotEmpty(t, j.Spec.Containers)
assert.Equal(t, containerResourceRequirements.Limits[v1.ResourceCPU], j.Spec.Containers[0].Resources.Limits[v1.ResourceCPU])

// TODO: Once configurable, test when setting storage is supported on the cluster vs not.
storageRes := j.Spec.Containers[0].Resources.Limits[v1.ResourceStorage]
assert.Equal(t, int64(0), (&storageRes).Value())

assert.Equal(t, command, j.Spec.Containers[0].Command)
assert.Equal(t, []string{"test-data-reference"}, j.Spec.Containers[0].Args)

assert.Equal(t, "service-account", j.Spec.ServiceAccountName)
assert.NotNil(t, j.Annotations)
})

t.Run("Disable copilot", func(t *testing.T) {
task.GetContainer().DataConfig.Enabled = false
taskCtx := dummyContainerTaskContext(containerResourceRequirements, task, command, args)
r, err := DefaultPodPlugin.BuildResource(context.TODO(), taskCtx)
assert.NoError(t, err)
assert.NotNil(t, r)
j, ok := r.(*v1.Pod)
assert.True(t, ok)

assert.NotEmpty(t, j.Spec.Containers)
assert.Equal(t, containerResourceRequirements.Limits[v1.ResourceCPU], j.Spec.Containers[0].Resources.Limits[v1.ResourceCPU])
assert.NotEmpty(t, j.Spec.Containers)
assert.Equal(t, containerResourceRequirements.Limits[v1.ResourceCPU], j.Spec.Containers[0].Resources.Limits[v1.ResourceCPU])

// TODO: Once configurable, test when setting storage is supported on the cluster vs not.
storageRes := j.Spec.Containers[0].Resources.Limits[v1.ResourceStorage]
assert.Equal(t, int64(0), (&storageRes).Value())
// TODO: Once configurable, test when setting storage is supported on the cluster vs not.
storageRes := j.Spec.Containers[0].Resources.Limits[v1.ResourceStorage]
assert.Equal(t, int64(0), (&storageRes).Value())

assert.Equal(t, command, j.Spec.Containers[0].Command)
assert.Equal(t, []string{"test-data-reference"}, j.Spec.Containers[0].Args)
assert.Equal(t, command, j.Spec.Containers[0].Command)
assert.Equal(t, []string{"test-data-reference"}, j.Spec.Containers[0].Args)

assert.Equal(t, "service-account", j.Spec.ServiceAccountName)
assert.Nil(t, j.Annotations)
})

assert.Equal(t, "service-account", j.Spec.ServiceAccountName)
}

func TestContainerTaskExecutor_GetTaskStatus(t *testing.T) {
Expand Down Expand Up @@ -191,6 +220,18 @@ func TestContainerTaskExecutor_GetTaskStatus(t *testing.T) {
assert.NotNil(t, phaseInfo)
assert.Equal(t, pluginsCore.PhaseSuccess, phaseInfo.Phase())
})

t.Run("raw container failed", func(t *testing.T) {
pod := &v1.Pod{
Status: v1.PodStatus{},
}
pod.Annotations = map[string]string{RawContainerName: "raw-container"}
pod.Status.ContainerStatuses = []v1.ContainerStatus{{Name: pod.Annotations[RawContainerName], State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ExitCode: 1}}}}
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, nil, pod)
assert.NoError(t, err)
assert.NotNil(t, phaseInfo)
assert.Equal(t, pluginsCore.PhaseRetryableFailure, phaseInfo.Phase())
})
}

func TestContainerTaskExecutor_GetProperties(t *testing.T) {
Expand Down
Loading