Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(options): Add consolidation timeout options #1754

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions pkg/controllers/disruption/multinodeconsolidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"fmt"
"math"
"time"

"github.com/samber/lo"
"k8s.io/apimachinery/pkg/util/sets"
Expand All @@ -29,11 +28,10 @@ import (
v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling"
"sigs.k8s.io/karpenter/pkg/operator/options"
scheduler "sigs.k8s.io/karpenter/pkg/scheduling"
)

const MultiNodeConsolidationTimeoutDuration = 1 * time.Minute

type MultiNodeConsolidation struct {
consolidation
}
Expand Down Expand Up @@ -119,7 +117,7 @@ func (m *MultiNodeConsolidation) firstNConsolidationOption(ctx context.Context,
lastSavedCommand := Command{}
lastSavedResults := scheduling.Results{}
// Set a timeout
timeout := m.clock.Now().Add(MultiNodeConsolidationTimeoutDuration)
timeout := m.clock.Now().Add(options.FromContext(ctx).MultiNodeConsolidationTimeout)
// binary search to find the maximum number of NodeClaims we can terminate
for min <= max {
if m.clock.Now().After(timeout) {
Expand Down
6 changes: 2 additions & 4 deletions pkg/controllers/disruption/singlenodeconsolidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,14 @@ package disruption
import (
"context"
"fmt"
"time"

"sigs.k8s.io/controller-runtime/pkg/log"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling"
"sigs.k8s.io/karpenter/pkg/operator/options"
)

const SingleNodeConsolidationTimeoutDuration = 3 * time.Minute

// SingleNodeConsolidation is the consolidation controller that performs single-node consolidation.
type SingleNodeConsolidation struct {
consolidation
Expand All @@ -49,7 +47,7 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption
v := NewValidation(s.clock, s.cluster, s.kubeClient, s.provisioner, s.cloudProvider, s.recorder, s.queue, s.Reason())

// Set a timeout
timeout := s.clock.Now().Add(SingleNodeConsolidationTimeoutDuration)
timeout := s.clock.Now().Add(options.FromContext(ctx).SingleNodeConsolidationTimeout)
constrainedByBudgets := false

// binary search to find the maximum number of NodeClaims we can terminate
Expand Down
34 changes: 19 additions & 15 deletions pkg/operator/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,21 +46,23 @@ type FeatureGates struct {

// Options contains all CLI flags / env vars for karpenter-core. It adheres to the options.Injectable interface.
type Options struct {
ServiceName string
MetricsPort int
HealthProbePort int
KubeClientQPS int
KubeClientBurst int
EnableProfiling bool
DisableLeaderElection bool
LeaderElectionNamespace string
MemoryLimit int64
LogLevel string
LogOutputPaths string
LogErrorOutputPaths string
BatchMaxDuration time.Duration
BatchIdleDuration time.Duration
FeatureGates FeatureGates
ServiceName string
MetricsPort int
HealthProbePort int
KubeClientQPS int
KubeClientBurst int
EnableProfiling bool
DisableLeaderElection bool
LeaderElectionNamespace string
MemoryLimit int64
LogLevel string
LogOutputPaths string
LogErrorOutputPaths string
BatchMaxDuration time.Duration
BatchIdleDuration time.Duration
FeatureGates FeatureGates
MultiNodeConsolidationTimeout time.Duration
SingleNodeConsolidationTimeout time.Duration
}

type FlagSet struct {
Expand Down Expand Up @@ -96,6 +98,8 @@ func (o *Options) AddFlags(fs *FlagSet) {
fs.DurationVar(&o.BatchMaxDuration, "batch-max-duration", env.WithDefaultDuration("BATCH_MAX_DURATION", 10*time.Second), "The maximum length of a batch window. The longer this is, the more pods we can consider for provisioning at one time which usually results in fewer but larger nodes.")
fs.DurationVar(&o.BatchIdleDuration, "batch-idle-duration", env.WithDefaultDuration("BATCH_IDLE_DURATION", time.Second), "The maximum amount of time with no new pending pods that if exceeded ends the current batching window. If pods arrive faster than this time, the batching window will be extended up to the maxDuration. If they arrive slower, the pods will be batched separately.")
fs.StringVar(&o.FeatureGates.inputStr, "feature-gates", env.WithDefaultString("FEATURE_GATES", "SpotToSpotConsolidation=false"), "Optional features can be enabled / disabled using feature gates. Current options are: SpotToSpotConsolidation")
fs.DurationVar(&o.MultiNodeConsolidationTimeout, "multi-node-consolidation-timeout", env.WithDefaultDuration("MULTI_NODE_CONSOLIDATION_TIMEOUT", 1*time.Minute), "The maximum amount of time that can be spent doing multinode consolidation before timing out.")
fs.DurationVar(&o.SingleNodeConsolidationTimeout, "single-node-consolidation-timeout", env.WithDefaultDuration("SINGLE_NODE_CONSOLIDATION_TIMEOUT", 3*time.Minute), "The maximum amount of time that can be spent doing single node consolidation before timing out.")
}

func (o *Options) Parse(fs *FlagSet, args ...string) error {
Expand Down
Loading