Merge pull request #145 from practo/resync-fix
Cooldown after last scale activity: scale-down-delay-after-last-scale-activity
alok87 authored Feb 21, 2022
2 parents e86f75e + 19d43f0 commit 0ee8f23
Showing 6 changed files with 239 additions and 37 deletions.
32 changes: 16 additions & 16 deletions README.md
@@ -117,7 +117,6 @@ min=2, max=1000, current=500, maxDisruption=125: then the scale down cannot brin
## WPA Controller
```
$ bin/darwin_amd64/workerpodautoscaler run --help
Run the workerpodautoscaler

Usage:
@@ -127,21 +126,22 @@ Examples:
workerpodautoscaler run

Flags:
--aws-regions string comma separated aws regions of SQS (default "ap-south-1,ap-southeast-1")
--beanstalk-long-poll-interval int the duration (in seconds) for which the beanstalk receive message call waits for a message to arrive (default 20)
--beanstalk-short-poll-interval int the duration (in seconds) after which the next beanstalk api call is made to fetch the queue length (default 20)
-h, --help help for run
--k8s-api-burst int maximum burst for throttle between requests from clients(wpa) to k8s api (default 10)
--k8s-api-qps float qps indicates the maximum QPS to the k8s api from the clients(wpa). (default 5)
--kube-config string path of the kube config file, if not specified in cluster config is used
--metrics-port string specify where to serve the /metrics and /status endpoint. /metrics serve the prometheus metrics for WPA (default ":8787")
--namespace specify the namespace to listen to (default "" all namespaces)
--queue-services string comma separated queue services, the WPA will start with (default "sqs,beanstalkd")
--resync-period int sync period for the worker pod autoscaler (default 20)
--sqs-long-poll-interval int the duration (in seconds) for which the sqs receive message call waits for a message to arrive (default 20)
--sqs-short-poll-interval int the duration (in seconds) after which the next sqs api call is made to fetch the queue length (default 20)
--wpa-default-max-disruption string it is the default value for the maxDisruption in the WPA spec. This specifies what percentage of pods can be disrupted in a single scale down activity. Can be expressed as an integer or as a percentage. (default "100%")
--wpa-threads int wpa threadiness, number of threads to process wpa resources (default 10)
--aws-regions string comma separated aws regions of SQS (default "ap-south-1,ap-southeast-1")
--beanstalk-long-poll-interval int the duration (in seconds) for which the beanstalk receive message call waits for a message to arrive (default 20)
--beanstalk-short-poll-interval int the duration (in seconds) after which the next beanstalk api call is made to fetch the queue length (default 20)
-h, --help help for run
--k8s-api-burst int maximum burst for throttle between requests from clients(wpa) to k8s api (default 10)
--k8s-api-qps float qps indicates the maximum QPS to the k8s api from the clients(wpa). (default 5)
--kube-config string path of the kube config file, if not specified in cluster config is used
--metrics-port string specify where to serve the /metrics and /status endpoint. /metrics serve the prometheus metrics for WPA (default ":8787")
--namespace string specify the namespace to listen to
--queue-services string comma separated queue services, the WPA will start with (default "sqs,beanstalkd")
--resync-period int maximum sync period (in seconds) for the control loop; the control loop can execute sooner if the wpa status object gets updated. (default 20)
--scale-down-delay-after-last-scale-activity int scale down delay after last scale up or down in seconds (default 600)
--sqs-long-poll-interval int the duration (in seconds) for which the sqs receive message call waits for a message to arrive (default 20)
--sqs-short-poll-interval int the duration (in seconds) after which the next sqs api call is made to fetch the queue length (default 20)
--wpa-default-max-disruption string it is the default value for the maxDisruption in the WPA spec. This specifies what percentage of pods can be disrupted in a single scale down activity. Can be expressed as an integer or as a percentage. (default "100%")
--wpa-threads int wpa threadiness, number of threads to process wpa resources (default 10)

Global Flags:
-v, --v Level number for the log level verbosity
14 changes: 10 additions & 4 deletions cmd/workerpodautoscaler/run.go
@@ -47,6 +47,7 @@ func (v *runCmd) new() *cobra.Command {
flags := v.Cmd.Flags()

flagNames := []string{
"scale-down-delay-after-last-scale-activity",
"resync-period",
"wpa-threads",
"wpa-default-max-disruption",
@@ -63,7 +64,8 @@ func (v *runCmd) new() *cobra.Command {
"namespace",
}

flags.Int("resync-period", 20, "sync period for the worker pod autoscaler")
flags.Int("scale-down-delay-after-last-scale-activity", 600, "scale down delay after last scale up or down in seconds")
flags.Int("resync-period", 20, "maximum sync period for the control loop but the control loop can execute sooner if the wpa status object gets updated.")
flags.Int("wpa-threads", 10, "wpa threadiness, number of threads to process wpa resources")
flags.String("wpa-default-max-disruption", "100%", "it is the default value for the maxDisruption in the WPA spec. This specifies how much percentage of pods can be disrupted in a single scale down acitivity. Can be expressed as integers or as a percentage.")
flags.String("aws-regions", "ap-south-1,ap-southeast-1", "comma separated aws regions of SQS")
@@ -73,7 +75,6 @@ func (v *runCmd) new() *cobra.Command {
flags.Int("beanstalk-short-poll-interval", 20, "the duration (in seconds) after which the next beanstalk api call is made to fetch the queue length")
flags.Int("beanstalk-long-poll-interval", 20, "the duration (in seconds) for which the beanstalk receive message call waits for a message to arrive")
flags.String("queue-services", "sqs,beanstalkd", "comma separated queue services, the WPA will start with")

flags.String("metrics-port", ":8787", "specify where to serve the /metrics and /status endpoint. /metrics serve the prometheus metrics for WPA")
flags.Float64("k8s-api-qps", 5.0, "qps indicates the maximum QPS to the k8s api from the clients(wpa).")
flags.Int("k8s-api-burst", 10, "maximum burst for throttle between requests from clients(wpa) to k8s api")
@@ -99,8 +100,12 @@ func parseRegions(regionNames string) []string {
}

func (v *runCmd) run(cmd *cobra.Command, args []string) {
scaleDownDelay := time.Second * time.Duration(
v.Viper.GetInt("scale-down-delay-after-last-scale-activity"),
)
resyncPeriod := time.Second * time.Duration(
v.Viper.GetInt("resync-period"))
v.Viper.GetInt("resync-period"),
)
wpaThraeds := v.Viper.GetInt("wpa-threads")
wpaDefaultMaxDisruption := v.Viper.GetString("wpa-default-max-disruption")
awsRegions := parseRegions(v.Viper.GetString("aws-regions"))
@@ -192,6 +197,8 @@ func (v *runCmd) run(cmd *cobra.Command, args []string) {
kubeInformerFactory.Apps().V1().ReplicaSets(),
customInformerFactory.K8s().V1().WorkerPodAutoScalers(),
wpaDefaultMaxDisruption,
resyncPeriod,
scaleDownDelay,
queues,
)

@@ -209,7 +216,6 @@ func (v *runCmd) run(cmd *cobra.Command, args []string) {
if err = controller.Run(wpaThraeds, stopCh); err != nil {
klog.Fatalf("Error running controller: %s", err.Error())
}
return
}

func serveMetrics(metricsPort string) {
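The run command reads the new flag as whole seconds and converts it with `time.Second * time.Duration(...)`, as shown above. A minimal standalone sketch (standard library only) of what the 600-second default amounts to:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Default of --scale-down-delay-after-last-scale-activity, in seconds.
	scaleDownDelaySeconds := 600

	// Same conversion as in run.go: seconds -> time.Duration.
	scaleDownDelay := time.Second * time.Duration(scaleDownDelaySeconds)

	// Prints "10m0s": scale downs are held back for 10 minutes
	// after the last scale activity.
	fmt.Println(scaleDownDelay)
}
```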
6 changes: 6 additions & 0 deletions pkg/apis/workerpodautoscaler/v1/types.go
@@ -34,6 +34,12 @@ type WorkerPodAutoScalerStatus struct {
CurrentReplicas int32 `json:"CurrentReplicas"`
AvailableReplicas int32 `json:"AvailableReplicas"`
DesiredReplicas int32 `json:"DesiredReplicas"`

// LastScaleTime is the last time the WorkerPodAutoScaler scaled the workers.
// It is used by the autoscaler to control
// how often the number of pods is changed.
// +optional
LastScaleTime *metav1.Time `json:"LastScaleTime,omitempty"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
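Since `LastScaleTime` is a pointer, it is nil for WPA objects that have never scaled. The status comparison in controller.go relies on `metav1.Time.Equal` being nil-safe; a small sketch of that behaviour (assuming only `k8s.io/apimachinery`):

```go
package main

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	var never *metav1.Time // LastScaleTime of a WPA that has not scaled yet

	now := metav1.Now()
	later := metav1.NewTime(now.Add(10 * time.Minute))

	fmt.Println(never.Equal(nil))  // true: both unset, status unchanged
	fmt.Println(never.Equal(&now)) // false: the first scale sets LastScaleTime
	fmt.Println(now.Equal(&later)) // false: a new scale moves the timestamp
	fmt.Println(now.Equal(&now))   // true: no scale activity, no status update
}
```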
61 changes: 44 additions & 17 deletions pkg/controller/controller.go
@@ -187,6 +187,11 @@ type Controller struct {
defaultMaxDisruption string
// QueueList keeps the list of all the queues in memory
// which is used by the core controller and the sqs exporter

// scaleDownDelay is the duration to wait after the last scale activity
// (scale up or scale down) before the workers are allowed to scale down
scaleDownDelay time.Duration

Queues *queue.Queues
}

@@ -199,6 +204,8 @@ func NewController(
replicaSetInformer appsinformers.ReplicaSetInformer,
workerPodAutoScalerInformer informers.WorkerPodAutoScalerInformer,
defaultMaxDisruption string,
resyncPeriod time.Duration,
scaleDownDelay time.Duration,
queues *queue.Queues) *Controller {

// Create event broadcaster
@@ -224,19 +231,20 @@ func NewController(
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "WorkerPodAutoScalers"),
recorder: recorder,
defaultMaxDisruption: defaultMaxDisruption,
scaleDownDelay: scaleDownDelay,
Queues: queues,
}

klog.V(4).Info("Setting up event handlers")

// Set up an event handler for when WorkerPodAutoScaler resources change
workerPodAutoScalerInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
workerPodAutoScalerInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueAddWorkerPodAutoScaler,
UpdateFunc: func(old, new interface{}) {
controller.enqueueUpdateWorkerPodAutoScaler(new)
},
DeleteFunc: controller.enqueueDeleteWorkerPodAutoScaler,
})
}, resyncPeriod)
return controller
}
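The handler registration above is why `--resync-period` is only an upper bound: the informer re-delivers every cached object to `UpdateFunc` at most every `resyncPeriod`, while real add/update/delete events are handled immediately. A hedged, self-contained sketch of the same pattern using client-go's fake clientset and a Deployment informer (not the WPA's own generated informer):

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
)

func main() {
	client := fake.NewSimpleClientset()
	factory := informers.NewSharedInformerFactory(client, 0)

	resyncPeriod := 20 * time.Second // mirrors the --resync-period default

	factory.Apps().V1().Deployments().Informer().AddEventHandlerWithResyncPeriod(
		cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) { fmt.Println("add") },
			UpdateFunc: func(old, new interface{}) {
				// Fires on real updates and on every periodic resync,
				// where old and new are the same cached object.
				fmt.Println("update or resync")
			},
			DeleteFunc: func(obj interface{}) { fmt.Println("delete") },
		}, resyncPeriod)

	stopCh := make(chan struct{})
	defer close(stopCh)
	factory.Start(stopCh)
	factory.WaitForCacheSync(stopCh)
}
```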

@@ -364,7 +372,7 @@ func (c *Controller) syncHandler(ctx context.Context, event WokerPodAutoScalerEv
// Get the Deployment with the name specified in WorkerPodAutoScaler.spec
deployment, err := c.deploymentLister.Deployments(workerPodAutoScaler.Namespace).Get(deploymentName)
if errors.IsNotFound(err) {
return fmt.Errorf("Deployment %s not found in namespace %s",
return fmt.Errorf("deployment %s not found in namespace %s",
deploymentName, workerPodAutoScaler.Namespace)
} else if err != nil {
return err
@@ -447,6 +455,7 @@ func (c *Controller) syncHandler(ctx context.Context, event WokerPodAutoScalerEv
*workerPodAutoScaler.Spec.MaxReplicas,
workerPodAutoScaler.GetMaxDisruption(c.defaultMaxDisruption),
)
klog.V(2).Infof("%s current: %d", queueName, currentWorkers)
klog.V(2).Infof("%s qMsgs: %d, desired: %d",
queueName, queueMessages, desiredWorkers)

@@ -482,14 +491,33 @@ func (c *Controller) syncHandler(ctx context.Context, event WokerPodAutoScalerEv
queueName,
).Set(float64(availableWorkers))

if desiredWorkers != currentWorkers {
lastScaleTime := workerPodAutoScaler.Status.LastScaleTime.DeepCopy()

op := GetScaleOperation(
queueName,
desiredWorkers,
currentWorkers,
lastScaleTime,
c.scaleDownDelay,
)

if op == ScaleUp || op == ScaleDown {
if deploymentName != "" {
c.updateDeployment(ctx, workerPodAutoScaler.Namespace, deploymentName, &desiredWorkers)
c.updateDeployment(
ctx,
workerPodAutoScaler.Namespace, deploymentName, &desiredWorkers)
} else {
c.updateReplicaSet(ctx, workerPodAutoScaler.Namespace, replicaSetName, &desiredWorkers)
c.updateReplicaSet(
ctx,
workerPodAutoScaler.Namespace, replicaSetName, &desiredWorkers)
}

now := metav1.Now()
lastScaleTime = &now
}

klog.V(2).Infof("%s scaleOp: %v", queueName, scaleOpString(op))

// Finally, we update the status block of the WorkerPodAutoScaler resource to reflect the
// current state of the world
updateWorkerPodAutoScalerStatus(
@@ -502,6 +530,7 @@ func (c *Controller) syncHandler(ctx context.Context, event WokerPodAutoScalerEv
currentWorkers,
availableWorkers,
queueMessages,
lastScaleTime,
)

loopDurationSeconds.WithLabelValues(
@@ -524,15 +553,15 @@ func (c *Controller) updateDeployment(ctx context.Context, namespace string, dep
// Retrieve the latest version of the Deployment before attempting update
deployment, getErr := c.deploymentLister.Deployments(namespace).Get(deploymentName)
if errors.IsNotFound(getErr) {
return fmt.Errorf("Deployment %s was not found in namespace %s",
return fmt.Errorf("deployment %s was not found in namespace %s",
deploymentName, namespace)
}
if getErr != nil {
klog.Fatalf("Failed to get deployment: %v", getErr)
}

deployment.Spec.Replicas = replicas
deployment, updateErr := c.kubeclientset.AppsV1().Deployments(namespace).Update(ctx, deployment, metav1.UpdateOptions{})
_, updateErr := c.kubeclientset.AppsV1().Deployments(namespace).Update(ctx, deployment, metav1.UpdateOptions{})
if updateErr != nil {
klog.Errorf("Failed to update deployment: %v", updateErr)
}
@@ -557,7 +586,7 @@ func (c *Controller) updateReplicaSet(ctx context.Context, namespace string, rep
}

replicaSet.Spec.Replicas = replicas
replicaSet, updateErr := c.kubeclientset.AppsV1().ReplicaSets(namespace).Update(ctx, replicaSet, metav1.UpdateOptions{})
_, updateErr := c.kubeclientset.AppsV1().ReplicaSets(namespace).Update(ctx, replicaSet, metav1.UpdateOptions{})
if updateErr != nil {
klog.Errorf("Failed to update ReplicaSet: %v", updateErr)
}
@@ -612,11 +641,7 @@ func getMinWorkers(
}

func isChangeTooSmall(desired int32, current int32, tolerance float64) bool {
if math.Abs(float64(desired-current))/float64(current) <= tolerance {
return true
}

return false
return math.Abs(float64(desired-current))/float64(current) <= tolerance
}
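A worked example of the tolerance check above, with hypothetical numbers (the 10% tolerance here is purely for illustration):

```go
package main

import (
	"fmt"
	"math"
)

// isChangeTooSmall mirrors the helper above: the relative change between
// desired and current workers is compared against a tolerance.
func isChangeTooSmall(desired, current int32, tolerance float64) bool {
	return math.Abs(float64(desired-current))/float64(current) <= tolerance
}

func main() {
	// 100 -> 105 is a 5% relative change, within a 10% tolerance.
	fmt.Println(isChangeTooSmall(105, 100, 0.1)) // true

	// 100 -> 150 is a 50% relative change, well beyond the tolerance.
	fmt.Println(isChangeTooSmall(150, 100, 0.1)) // false
}
```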

// GetDesiredWorkers finds the desired number of workers which are required
@@ -767,12 +792,14 @@ func updateWorkerPodAutoScalerStatus(
workerPodAutoScaler *v1.WorkerPodAutoScaler,
currentWorkers int32,
availableWorkers int32,
queueMessages int32) {
queueMessages int32,
lastScaleTime *metav1.Time) {

if workerPodAutoScaler.Status.CurrentReplicas == currentWorkers &&
workerPodAutoScaler.Status.AvailableReplicas == availableWorkers &&
workerPodAutoScaler.Status.DesiredReplicas == desiredWorkers &&
workerPodAutoScaler.Status.CurrentMessages == queueMessages {
workerPodAutoScaler.Status.CurrentMessages == queueMessages &&
workerPodAutoScaler.Status.LastScaleTime.Equal(lastScaleTime) {
klog.V(4).Infof("%s/%s: WPA status is already up to date\n", namespace, name)
return
} else {
Expand All @@ -787,6 +814,7 @@ func updateWorkerPodAutoScalerStatus(
workerPodAutoScalerCopy.Status.AvailableReplicas = availableWorkers
workerPodAutoScalerCopy.Status.DesiredReplicas = desiredWorkers
workerPodAutoScalerCopy.Status.CurrentMessages = queueMessages
workerPodAutoScalerCopy.Status.LastScaleTime = lastScaleTime
// If the CustomResourceSubresources feature gate is not enabled,
// we must use Update instead of UpdateStatus to update the Status block of the WorkerPodAutoScaler resource.
// UpdateStatus will not allow changes to the Spec of the resource,
Expand All @@ -797,7 +825,6 @@ func updateWorkerPodAutoScalerStatus(
return
}
klog.V(4).Infof("%s/%s: Updated wpa status\n", namespace, name)
return
}

// getKeyForWorkerPodAutoScaler takes a WorkerPodAutoScaler resource and converts it into a namespace/name
84 changes: 84 additions & 0 deletions pkg/controller/scale_operation.go
@@ -0,0 +1,84 @@
package controller

import (
"time"

"github.com/practo/klog/v2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type ScaleOperation int

const (
ScaleUp ScaleOperation = iota
ScaleDown
ScaleNoop
)

func GetScaleOperation(
q string,
desiredWorkers int32,
currentWorkers int32,
lastScaleTime *metav1.Time,
scaleDownDelay time.Duration) ScaleOperation {

if desiredWorkers > currentWorkers {
return ScaleUp
}

if desiredWorkers == currentWorkers {
return ScaleNoop
}

if canScaleDown(
q, desiredWorkers, currentWorkers, lastScaleTime, scaleDownDelay) {
return ScaleDown
}

return ScaleNoop
}

// canScaleDown checks scaleDownDelay against lastScaleTime to decide
// whether a scale down is allowed, i.e. whether the cool-off period has passed.
func canScaleDown(
q string,
desiredWorkers int32,
currentWorkers int32,
lastScaleTime *metav1.Time, scaleDownDelay time.Duration) bool {

if lastScaleTime == nil {
klog.V(2).Infof("%s scaleDownDelay ignored, lastScaleTime is nil", q)
return true
}

nextScaleDownTime := metav1.NewTime(
lastScaleTime.Time.Add(scaleDownDelay),
)
now := metav1.Now()

if nextScaleDownTime.Before(&now) {
klog.V(2).Infof("%s scaleDown is allowed, cooloff passed", q)
return true
}

klog.V(2).Infof(
"%s scaleDown forbidden, nextScaleDownTime: %v",
q,
nextScaleDownTime,
)

return false
}

func scaleOpString(op ScaleOperation) string {
switch op {
case ScaleUp:
return "scale-up"
case ScaleDown:
return "scale-down"
case ScaleNoop:
return "no scaling operation"
default:
return ""
}
}
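A short, test-style usage sketch of the new cool-off logic (same package, names as defined above; the 10-minute delay simply matches the flag's 600-second default):

```go
package controller

import (
	"testing"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestScaleDownCoolOff(t *testing.T) {
	delay := 10 * time.Minute

	// Scale up is never delayed.
	justNow := metav1.Now()
	if op := GetScaleOperation("q1", 10, 5, &justNow, delay); op != ScaleUp {
		t.Fatalf("expected scale-up, got %v", scaleOpString(op))
	}

	// Scale down right after the last scale activity is forbidden.
	if op := GetScaleOperation("q1", 2, 5, &justNow, delay); op != ScaleNoop {
		t.Fatalf("expected noop during cool-off, got %v", scaleOpString(op))
	}

	// Once the cool-off has passed, scale down is allowed again.
	old := metav1.NewTime(time.Now().Add(-20 * time.Minute))
	if op := GetScaleOperation("q1", 2, 5, &old, delay); op != ScaleDown {
		t.Fatalf("expected scale-down, got %v", scaleOpString(op))
	}

	// A nil lastScaleTime (never scaled before) also allows scale down.
	if op := GetScaleOperation("q1", 2, 5, nil, delay); op != ScaleDown {
		t.Fatalf("expected scale-down with nil lastScaleTime, got %v", scaleOpString(op))
	}
}
```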