Skip to content

Commit

Permalink
cluster-autoscaler: Upgrade to k8s 1.29 (#1151)
Browse files Browse the repository at this point in the history
This patch was essentially a rewrite of the original. 

For context: upstream, the cluster autoscaler team unified the watch
and lister interfaces using informers. So, instead of patching both of
those methods, only the `List()` method on the `AllPodLister` is
modified. Assuming my understanding is correct, it'll have the same
behaviour but with much less code (as evidenced by the removed lines in
the original patch).

I'm not sure how to validate it works as expected other than allowing
the e2e tests to run, so if there is anything I can do, let me know :)
  • Loading branch information
edude03 authored Nov 28, 2024
1 parent ff8c271 commit 89bd60d
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 120 deletions.
2 changes: 1 addition & 1 deletion cluster-autoscaler/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# NOTE: This must match CA's builder/Dockerfile:
# https://github.com/kubernetes/autoscaler/blob/<GIT_TAG>/builder/Dockerfile
FROM golang:1.20.12 AS builder
FROM golang:1.21.6 AS builder

WORKDIR /workspace

Expand Down
2 changes: 1 addition & 1 deletion cluster-autoscaler/ca.branch
Original file line number Diff line number Diff line change
@@ -1 +1 @@
cluster-autoscaler-release-1.28
cluster-autoscaler-release-1.29
2 changes: 1 addition & 1 deletion cluster-autoscaler/ca.commit
Original file line number Diff line number Diff line change
@@ -1 +1 @@
10a229ac17ea8049248d1c3ce2923b94a4f9085c
d4bbc686ac02a77a6ad1362fe7bbda387e8f074a
155 changes: 38 additions & 117 deletions cluster-autoscaler/ca.patch
Original file line number Diff line number Diff line change
@@ -1,45 +1,22 @@
diff --git a/cluster-autoscaler/utils/kubernetes/listers.go b/cluster-autoscaler/utils/kubernetes/listers.go
index 198fdfb37..d534fc1ef 100644
index b9be94b6e..5efb40df2 100644
--- a/cluster-autoscaler/utils/kubernetes/listers.go
+++ b/cluster-autoscaler/utils/kubernetes/listers.go
@@ -17,14 +17,19 @@ limitations under the License.
@@ -17,10 +17,12 @@ limitations under the License.
package kubernetes

import (
+ "encoding/json"
"time"

appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
apiv1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
+ "k8s.io/apimachinery/pkg/api/resource"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/watch"
client "k8s.io/client-go/kubernetes"
v1appslister "k8s.io/client-go/listers/apps/v1"
v1batchlister "k8s.io/client-go/listers/batch/v1"
@@ -169,6 +174,7 @@ func NewScheduledPodLister(kubeClient client.Interface, stopchannel <-chan struc
selector := fields.ParseSelectorOrDie("spec.nodeName!=" + "" + ",status.phase!=" +
string(apiv1.PodSucceeded) + ",status.phase!=" + string(apiv1.PodFailed))
podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", apiv1.NamespaceAll, selector)
+ podListWatch = wrapListWatchWithNeonVMUsage(podListWatch)
store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(podListWatch, &apiv1.Pod{}, time.Hour)
podLister := v1lister.NewPodLister(store)
go reflector.Run(stopchannel)
@@ -212,6 +218,7 @@ func NewScheduledAndUnschedulablePodLister(kubeClient client.Interface, stopchan
selector := fields.ParseSelectorOrDie("status.phase!=" +
string(apiv1.PodSucceeded) + ",status.phase!=" + string(apiv1.PodFailed))
podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", apiv1.NamespaceAll, selector)
+ podListWatch = wrapListWatchWithNeonVMUsage(podListWatch)
store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(podListWatch, &apiv1.Pod{}, time.Hour)
podLister := v1lister.NewPodLister(store)
go reflector.Run(stopchannel)
@@ -221,6 +228,105 @@ func NewScheduledAndUnschedulablePodLister(kubeClient client.Interface, stopchan
}
"k8s.io/client-go/informers"
@@ -46,6 +48,14 @@ type ListerRegistry interface {
StatefulSetLister() v1appslister.StatefulSetLister
}

+// copied from github.com/neondatabase/autoscaling, neonvm/apis/neonvm/v1/virtualmachine_types.go.
Expand All @@ -50,97 +27,41 @@ index 198fdfb37..d534fc1ef 100644
+ Memory resource.Quantity `json:"memory"`
+}
+
+func wrapListWatchWithNeonVMUsage(lw *cache.ListWatch) *cache.ListWatch {
+ updatePodRequestsFromNeonVMAnnotation := func(pod *apiv1.Pod) {
+ annotation, ok := pod.Annotations["vm.neon.tech/usage"]
+ if !ok {
+ return
+ }
+
+ var usage virtualMachineUsage
+ if err := json.Unmarshal([]byte(annotation), &usage); err != nil {
+ return
+ }
+
+ pod.Spec.Containers[0].Resources.Requests = apiv1.ResourceList(map[apiv1.ResourceName]resource.Quantity{
+ apiv1.ResourceCPU: usage.CPU,
+ apiv1.ResourceMemory: usage.Memory,
+ })
type listerRegistryImpl struct {
allNodeLister NodeLister
readyNodeLister NodeLister
@@ -221,6 +231,22 @@ type AllPodLister struct {
podLister v1lister.PodLister
}

+func updatePodRequestsFromNeonVMAnnotation(pod *apiv1.Pod) {
+ annotation, ok := pod.Annotations["vm.neon.tech/usage"]
+ if !ok {
+ return
+ }
+
+ return &cache.ListWatch{
+ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
+ obj, err := lw.List(options)
+ if err != nil {
+ return obj, err
+ }
+
+ list := obj.(*apiv1.PodList)
+ for i := range list.Items {
+ updatePodRequestsFromNeonVMAnnotation(&list.Items[i])
+ }
+ return obj, nil
+ },
+ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+ iface, err := lw.Watch(options)
+ if err != nil {
+ return iface, err
+ }
+
+ // Wrap the channel to update the pods as they come through
+ wrappedEvents := make(chan watch.Event)
+ proxyIface := watch.NewProxyWatcher(wrappedEvents)
+
+ go func() {
+ events := iface.ResultChan()
+
+ for {
+ var ok bool
+ var ev watch.Event
+
+ select {
+ case <-proxyIface.StopChan():
+ return
+ case ev, ok = <-events:
+ if !ok {
+ close(wrappedEvents)
+ return
+ }
+ }
+
+ // Quoting the docs on watch.Event.Object:
+ //
+ // > Object is:
+ // > * If Type is Added or Modified: the new state of the object
+ // > * If type is Deleted: the state of the object immediately before deletion.
+ // > * If Type is Bookmark: the object [ ... ] where only ResourceVersion field
+ // > is set.
+ // > * If Type is Error: *api.Status is recommended; other types may make sense
+ // > depending on context.
+ //
+ // So basically, we want to process the object only if ev.Type is Added,
+ // Modified, or Deleted.
+ if ev.Type == watch.Added || ev.Type == watch.Modified || ev.Type == watch.Deleted {
+ pod := ev.Object.(*apiv1.Pod)
+ updatePodRequestsFromNeonVMAnnotation(pod)
+ }
+
+ // Pass along the maybe-updated event
+ select {
+ case <-proxyIface.StopChan():
+ return
+ case wrappedEvents <- ev:
+ // continue on to next event
+ }
+ }
+ }()
+
+ return proxyIface, nil
+ },
+ DisableChunking: lw.DisableChunking,
+ var usage virtualMachineUsage
+ if err := json.Unmarshal([]byte(annotation), &usage); err != nil {
+ return
+ }
+ pod.Spec.Containers[0].Resources.Requests = apiv1.ResourceList(map[apiv1.ResourceName]resource.Quantity{
+ apiv1.ResourceCPU: usage.CPU,
+ apiv1.ResourceMemory: usage.Memory,
+ })
+}
+
// NodeLister lists nodes.
type NodeLister interface {
List() ([]*apiv1.Node, error)
// List returns all scheduled pods.
func (lister *AllPodLister) List() ([]*apiv1.Pod, error) {
var pods []*apiv1.Pod
@@ -231,7 +257,10 @@ func (lister *AllPodLister) List() ([]*apiv1.Pod, error) {
}
for _, p := range allPods {
if p.Status.Phase != apiv1.PodSucceeded && p.Status.Phase != apiv1.PodFailed {
- pods = append(pods, p)
+ // We need to make a copy of the pod to avoid modifying the original pod, since *p is a pointer to the object in the informer cache.
+ podCopy := p.DeepCopy()
+ updatePodRequestsFromNeonVMAnnotation(podCopy)
+ pods = append(pods, podCopy)
}
}
return pods, nil

0 comments on commit 89bd60d

Please sign in to comment.