Skip to content

Commit

Permalink
Implementing job cancellation and preemption on queue (#271) (#4039)
Browse files Browse the repository at this point in the history
* Implementing job cancellation/preemption by queue

* Adding aliases for queue preempt and cancel

* Improving error messages, fixing type

* Adding cmd tests

Co-authored-by: Mustafa Ilyas <[email protected]>
  • Loading branch information
MustafaI and mustafai-gr authored Nov 11, 2024
1 parent fb4d939 commit 7275344
Show file tree
Hide file tree
Showing 27 changed files with 2,619 additions and 286 deletions.
89 changes: 89 additions & 0 deletions cmd/armadactl/cmd/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (

"github.com/spf13/cobra"

"github.com/armadaproject/armada/cmd/armadactl/cmd/utils"
"github.com/armadaproject/armada/internal/armadactl"
"github.com/armadaproject/armada/internal/common/slices"
)

func cancelCmd() *cobra.Command {
Expand All @@ -19,6 +21,7 @@ func cancelCmd() *cobra.Command {
cancelJobCmd(),
cancelJobSetCmd(),
cancelExecutorCmd(),
cancelQueueCmd(),
)
return cmd
}
Expand Down Expand Up @@ -101,3 +104,89 @@ func cancelExecutorCmd() *cobra.Command {
cmd.Flags().StringSliceP("priority-classes", "p", []string{}, "Cancel jobs on executor matching the specified priority classes. Provided priority classes should be comma separated, as in the following example: armada-default,armada-preemptible.")
return cmd
}

func cancelQueueCmd() *cobra.Command {
a := armadactl.New()
cmd := &cobra.Command{
Use: "queues <queue_1> <queue_2> <queue_3> ...",
Short: "Cancels jobs on queues.",
Long: `Cancels jobs on queues with provided name, priority classes and job states. Allows selecting of queues by label or name, one of which must be provided. All flags with multiple values must be comma separated.`,
Aliases: []string{"queue"},
PreRunE: func(cmd *cobra.Command, args []string) error {
if err := cmd.MarkFlagRequired("job-states"); err != nil {
return err
}
if err := cmd.MarkFlagRequired("priority-classes"); err != nil {
return err
}
return initParams(cmd, a.Params)
},
RunE: func(cmd *cobra.Command, queues []string) error {
errs := slices.Filter(slices.Map(queues, utils.QueueNameValidation), func(err error) bool { return err != nil })
if len(errs) > 0 {
return fmt.Errorf("provided queue name invalid: %s", errs[0])
}

onlyCordoned, err := cmd.Flags().GetBool("only-cordoned")
if err != nil {
return fmt.Errorf("error reading only-cordoned flag: %s", err)
}

inverse, err := cmd.Flags().GetBool("inverse")
if err != nil {
return fmt.Errorf("error reading inverse flag: %s", err)
}

labels, err := cmd.Flags().GetStringSlice("selector")
if err != nil {
return fmt.Errorf("error reading queue label selector: %s", err)
}

jobStates, err := cmd.Flags().GetStringSlice("job-states")
if err != nil {
return fmt.Errorf("error reading job-states flag: %s", err)
}

var activeJobStates []utils.ActiveJobState
for _, state := range jobStates {
activeState, err := utils.ActiveJobStateFromString(state)
if err != nil {
return fmt.Errorf("error determining active job state corresponding to %s: %s", state, err)
}
activeJobStates = append(activeJobStates, activeState)
}

priorityClasses, err := cmd.Flags().GetStringSlice("priority-classes")
if err != nil {
return fmt.Errorf("error reading priority-classes flag: %s", err)
}

dryRun, err := cmd.Flags().GetBool("dry-run")
if err != nil {
return fmt.Errorf("error reading dry-run flag: %s", err)
}

if len(queues) > 0 && len(labels) > 0 {
return fmt.Errorf("queues can be selected either with a set of names or a set of labels, but not both")
} else if len(queues) == 0 && len(labels) == 0 {
// This check makes accidentally cancelling all jobs far less likely
return fmt.Errorf("queue selection must be narrowed down either by names or by labels")
}

return a.CancelOnQueues(&armadactl.QueueQueryArgs{
InQueueNames: queues,
ContainsAllLabels: labels,
InvertResult: inverse,
OnlyCordoned: onlyCordoned,
}, priorityClasses, activeJobStates, dryRun)
},
}
cmd.Flags().StringSliceP("job-states", "s", []string{}, "Jobs in the provided job states will be cancelled. Allowed values: queued,leased,pending,running.")
cmd.Flags().StringSliceP("priority-classes", "p", []string{}, "Jobs matching the provided priority classes will be cancelled.")
cmd.Flags().StringSliceP("selector", "l", []string{}, "Select queues by label.")
cmd.Flags().Bool("inverse", false, "Inverts result to cancel all queues that don't match the specified criteria. Defaults to false.")
cmd.Flags().Bool("only-cordoned", false, "Only cancels queues that are cordoned. Defaults to false.")
cmd.Flags().Bool("dry-run", false, "Prints out queues on which jobs will be cancelled. Defaults to false.")

return cmd
}
66 changes: 66 additions & 0 deletions cmd/armadactl/cmd/cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,69 @@ func TestCancel(t *testing.T) {
})
}
}

func TestCancelQueue(t *testing.T) {
tests := map[string]struct {
Flags []flag
jobStates []string
selectors []string
priorityClasses []string
inverse bool
onlyCordoned bool
dryRun bool
}{
"default flags": {nil, []string{}, []string{}, []string{}, false, false, false},
"valid selectors": {[]flag{{"selectors", "armadaproject.io/priority=high,armadaproject.io/category=critical"}}, []string{}, []string{"armadaproject.io/priority=high", "armadaproject.io/category=critical"}, []string{}, false, false, false},
"valid job-states 1": {[]flag{{"job-states", "queued"}}, []string{"queued"}, []string{}, []string{}, false, false, false},
"valid job-states 2": {[]flag{{"job-states", "queued,leased,pending,running"}}, []string{"queued", "leased", "pending", "running"}, []string{}, []string{}, false, false, false},
"valid priority-classes 1": {[]flag{{"priority-classes", "armada-default"}}, []string{}, []string{}, []string{"armada-default"}, false, false, false},
"valid priority-classes 2": {[]flag{{"priority-classes", "armada-default,armada-preemptible"}}, []string{}, []string{}, []string{"armada-default", "armada-preemptible"}, false, false, false},
"valid multiple flags": {
[]flag{{"selectors", "armadaproject.io/priority=high,armadaproject.io/category=critical"}, {"job-states", "queued,leased,pending,running"}, {"priority-classes", "armada-default,armada-preemptible"}},
[]string{"queued", "leased", "pending", "running"},
[]string{"armadaproject.io/priority=high", "armadaproject.io/category=critical"},
[]string{"armada-default", "armada-preemptible"},
true, true, true,
},
}
for name, test := range tests {
t.Run(name, func(t *testing.T) {
a := armadactl.New()
cmd := cancelQueueCmd()

cmd.PreRunE = func(cmd *cobra.Command, args []string) error {
a.Out = io.Discard

if len(test.jobStates) > 0 {
jobStatesFlag, err := cmd.Flags().GetString("job-states")
require.NoError(t, err)
require.Equal(t, test.jobStates, jobStatesFlag)
}
if len(test.selectors) > 0 {
selectorsFlag, err := cmd.Flags().GetString("selectors")
require.Error(t, err)
require.Equal(t, test.selectors, selectorsFlag)
}
if len(test.priorityClasses) > 0 {
priorityClassesFlag, err := cmd.Flags().GetString("priority-classes")
require.Error(t, err)
require.Equal(t, test.priorityClasses, priorityClassesFlag)
}

inverseValue, err := cmd.Flags().GetBool("inverse")
require.NoError(t, err)
require.Equal(t, test, inverseValue)

onlyCordonedValue, err := cmd.Flags().GetBool("only-cordoned")
require.NoError(t, err)
require.Equal(t, test, onlyCordonedValue)

dryRunValue, err := cmd.Flags().GetBool("dry-run")
require.NoError(t, err)
require.Equal(t, test, dryRunValue)

return nil
}
})
}
}
9 changes: 5 additions & 4 deletions cmd/armadactl/cmd/cordon.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package cmd
import (
"fmt"

"github.com/spf13/cobra"

"github.com/armadaproject/armada/cmd/armadactl/cmd/utils"
"github.com/armadaproject/armada/internal/armadactl"
"github.com/armadaproject/armada/internal/common/slices"

"github.com/spf13/cobra"
)

func cordon() *cobra.Command {
Expand Down Expand Up @@ -43,7 +44,7 @@ func cordonQueues(a *armadactl.App) *cobra.Command {
return initParams(cmd, a.Params)
},
RunE: func(cmd *cobra.Command, queues []string) error {
errs := slices.Filter(slices.Map(queues, queueNameValidation), func(err error) bool { return err != nil })
errs := slices.Filter(slices.Map(queues, utils.QueueNameValidation), func(err error) bool { return err != nil })
if len(errs) > 0 {
return fmt.Errorf("provided queue name invalid: %s", errs[0])
}
Expand Down Expand Up @@ -94,7 +95,7 @@ func uncordonQueues(a *armadactl.App) *cobra.Command {
return initParams(cmd, a.Params)
},
RunE: func(cmd *cobra.Command, queues []string) error {
errs := slices.Filter(slices.Map(queues, queueNameValidation), func(err error) bool { return err != nil })
errs := slices.Filter(slices.Map(queues, utils.QueueNameValidation), func(err error) bool { return err != nil })
if len(errs) > 0 {
return fmt.Errorf("provided queue name invalid: %s", errs[0])
}
Expand Down
2 changes: 2 additions & 0 deletions cmd/armadactl/cmd/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ func initParams(cmd *cobra.Command, params *armadactl.Params) error {
params.QueueAPI.Update = cq.Update(client.ExtractCommandlineArmadaApiConnectionDetails)
params.QueueAPI.Cordon = cq.Cordon(client.ExtractCommandlineArmadaApiConnectionDetails)
params.QueueAPI.Uncordon = cq.Uncordon(client.ExtractCommandlineArmadaApiConnectionDetails)
params.QueueAPI.Preempt = cq.Preempt(client.ExtractCommandlineArmadaApiConnectionDetails)
params.QueueAPI.Cancel = cq.Cancel(client.ExtractCommandlineArmadaApiConnectionDetails)

params.ExecutorAPI.Cordon = ce.CordonExecutor(client.ExtractCommandlineArmadaApiConnectionDetails)
params.ExecutorAPI.Uncordon = ce.UncordonExecutor(client.ExtractCommandlineArmadaApiConnectionDetails)
Expand Down
71 changes: 71 additions & 0 deletions cmd/armadactl/cmd/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (

"github.com/spf13/cobra"

"github.com/armadaproject/armada/cmd/armadactl/cmd/utils"
"github.com/armadaproject/armada/internal/armadactl"
"github.com/armadaproject/armada/internal/common/slices"
)

func preemptCmd() *cobra.Command {
Expand All @@ -17,6 +19,7 @@ func preemptCmd() *cobra.Command {
cmd.AddCommand(
preemptJobCmd(),
preemptExecutorCmd(),
preemptQueuesCmd(),
)
return cmd
}
Expand Down Expand Up @@ -80,3 +83,71 @@ func preemptExecutorCmd() *cobra.Command {
cmd.Flags().StringSliceP("priority-classes", "p", []string{}, "Preempt jobs on executor matching the specified priority classes. Provided priority classes should be comma separated, as in the following example: armada-default,armada-preemptible.")
return cmd
}

func preemptQueuesCmd() *cobra.Command {
a := armadactl.New()
cmd := &cobra.Command{
Use: "queues <queue_1> <queue_2> <queue_3> ...",
Short: "Preempts jobs on queues.",
Long: `Preempts jobs on selected queues in specified priority classes. Allows selecting of queues by label or name, one of which must be provided. All flags with multiple values must be comma separated.`,
Aliases: []string{"queue"},
PreRunE: func(cmd *cobra.Command, args []string) error {
if err := cmd.MarkFlagRequired("priority-classes"); err != nil {
return err
}
return initParams(cmd, a.Params)
},
RunE: func(cmd *cobra.Command, queues []string) error {
errs := slices.Filter(slices.Map(queues, utils.QueueNameValidation), func(err error) bool { return err != nil })
if len(errs) > 0 {
return fmt.Errorf("provided queue name invalid: %s", errs[0])
}

onlyCordoned, err := cmd.Flags().GetBool("only-cordoned")
if err != nil {
return fmt.Errorf("error reading only-cordoned flag: %s", err)
}

inverse, err := cmd.Flags().GetBool("inverse")
if err != nil {
return fmt.Errorf("error reading inverse flag: %s", err)
}

labels, err := cmd.Flags().GetStringSlice("selector")
if err != nil {
return fmt.Errorf("error reading queue label selector: %s", err)
}

priorityClasses, err := cmd.Flags().GetStringSlice("priority-classes")
if err != nil {
return fmt.Errorf("error reading priority-classes flag: %s", err)
}

dryRun, err := cmd.Flags().GetBool("dry-run")
if err != nil {
return fmt.Errorf("error reading dry-run flag: %s", err)
}

if len(queues) > 0 && len(labels) > 0 {
return fmt.Errorf("queues can be selected either with a set of names or a set of labels, but not both")
} else if len(queues) == 0 && len(labels) == 0 {
// This check makes accidentally preempting all jobs far less likely
return fmt.Errorf("queue selection must be narrowed down either by names or by labels")
}

return a.PreemptOnQueues(&armadactl.QueueQueryArgs{
InQueueNames: queues,
ContainsAllLabels: labels,
InvertResult: inverse,
OnlyCordoned: onlyCordoned,
}, priorityClasses, dryRun)
},
}
cmd.Flags().StringSliceP("priority-classes", "p", []string{}, "Jobs matching the provided priority classes will be preempted.")
cmd.Flags().StringSliceP("selector", "l", []string{}, "Select queues to preempt by label.")
cmd.Flags().Bool("inverse", false, "Inverts result to preempt all queues that don't match the specified criteria. Defaults to false.")
cmd.Flags().Bool("only-cordoned", false, "Only preempts queues that are cordoned. Defaults to false.")
cmd.Flags().Bool("dry-run", false, "Prints out queues on which jobs will be preempted. Defaults to false.")

return cmd
}
68 changes: 68 additions & 0 deletions cmd/armadactl/cmd/preempt_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package cmd

import (
"io"
"testing"

"github.com/spf13/cobra"
"github.com/stretchr/testify/require"

"github.com/armadaproject/armada/internal/armadactl"
)

func TestPreemptQueue(t *testing.T) {
tests := map[string]struct {
Flags []flag
selectors []string
priorityClasses []string
inverse bool
onlyCordoned bool
dryRun bool
}{
"default flags": {nil, []string{}, []string{}, false, false, false},
"valid selectors": {[]flag{{"selectors", "armadaproject.io/priority=high,armadaproject.io/category=critical"}}, []string{"armadaproject.io/priority=high", "armadaproject.io/category=critical"}, []string{}, false, false, false},
"valid priority-classes 1": {[]flag{{"priority-classes", "armada-default"}}, []string{}, []string{"armada-default"}, false, false, false},
"valid priority-classes 2": {[]flag{{"priority-classes", "armada-default,armada-preemptible"}}, []string{}, []string{"armada-default", "armada-preemptible"}, false, false, false},
"valid multiple flags": {
[]flag{{"selectors", "armadaproject.io/priority=high,armadaproject.io/category=critical"}, {"priority-classes", "armada-default,armada-preemptible"}},
[]string{"armadaproject.io/priority=high", "armadaproject.io/category=critical"},
[]string{"armada-default", "armada-preemptible"},
true, true, true,
},
}
for name, test := range tests {
t.Run(name, func(t *testing.T) {
a := armadactl.New()
cmd := preemptQueuesCmd()

cmd.PreRunE = func(cmd *cobra.Command, args []string) error {
a.Out = io.Discard

if len(test.selectors) > 0 {
selectorsFlag, err := cmd.Flags().GetString("selectors")
require.Error(t, err)
require.Equal(t, test.selectors, selectorsFlag)
}
if len(test.priorityClasses) > 0 {
priorityClassesFlag, err := cmd.Flags().GetString("priority-classes")
require.Error(t, err)
require.Equal(t, test.priorityClasses, priorityClassesFlag)
}

inverseValue, err := cmd.Flags().GetBool("inverse")
require.NoError(t, err)
require.Equal(t, test, inverseValue)

onlyCordonedValue, err := cmd.Flags().GetBool("only-cordoned")
require.NoError(t, err)
require.Equal(t, test, onlyCordonedValue)

dryRunValue, err := cmd.Flags().GetBool("dry-run")
require.NoError(t, err)
require.Equal(t, test, dryRunValue)

return nil
}
})
}
}
Loading

0 comments on commit 7275344

Please sign in to comment.