diff --git a/cluster-autoscaler/Dockerfile b/cluster-autoscaler/Dockerfile index 29fd874bc..0530a1ce8 100644 --- a/cluster-autoscaler/Dockerfile +++ b/cluster-autoscaler/Dockerfile @@ -1,6 +1,6 @@ # NOTE: This must match CA's builder/Dockerfile: # https://github.com/kubernetes/autoscaler/blob//builder/Dockerfile -FROM golang:1.20.12 AS builder +FROM golang:1.21.6 AS builder WORKDIR /workspace diff --git a/cluster-autoscaler/ca.branch b/cluster-autoscaler/ca.branch index 52404bdd7..0a98806c6 100644 --- a/cluster-autoscaler/ca.branch +++ b/cluster-autoscaler/ca.branch @@ -1 +1 @@ -cluster-autoscaler-release-1.28 +cluster-autoscaler-release-1.29 diff --git a/cluster-autoscaler/ca.commit b/cluster-autoscaler/ca.commit index 76f5aef45..f12c734f0 100644 --- a/cluster-autoscaler/ca.commit +++ b/cluster-autoscaler/ca.commit @@ -1 +1 @@ -10a229ac17ea8049248d1c3ce2923b94a4f9085c +d4bbc686ac02a77a6ad1362fe7bbda387e8f074a diff --git a/cluster-autoscaler/ca.patch b/cluster-autoscaler/ca.patch index 77408ad44..f18670005 100644 --- a/cluster-autoscaler/ca.patch +++ b/cluster-autoscaler/ca.patch @@ -1,37 +1,22 @@ diff --git a/cluster-autoscaler/utils/kubernetes/listers.go b/cluster-autoscaler/utils/kubernetes/listers.go -index d0033550f..fa3c2ec30 100644 +index b9be94b6e..df9dc08a9 100644 --- a/cluster-autoscaler/utils/kubernetes/listers.go +++ b/cluster-autoscaler/utils/kubernetes/listers.go -@@ -17,14 +17,19 @@ limitations under the License. +@@ -17,10 +17,12 @@ limitations under the License. package kubernetes import ( + "encoding/json" + "time" + + apiv1 "k8s.io/api/core/v1" + policyv1 "k8s.io/api/policy/v1" + "k8s.io/apimachinery/pkg/api/resource" -+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -+ "k8s.io/apimachinery/pkg/runtime" -+ "k8s.io/apimachinery/pkg/watch" - client "k8s.io/client-go/kubernetes" - v1appslister "k8s.io/client-go/listers/apps/v1" - v1batchlister "k8s.io/client-go/listers/batch/v1" -@@ -185,6 +190,7 @@ func NewUnschedulablePodInNamespaceLister(kubeClient client.Interface, namespace - selector := fields.ParseSelectorOrDie("spec.nodeName==" + "" + ",status.phase!=" + - string(apiv1.PodSucceeded) + ",status.phase!=" + string(apiv1.PodFailed)) - podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", namespace, selector) -+ podListWatch = wrapListWatchWithNeonVMUsage(podListWatch) - store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(podListWatch, &apiv1.Pod{}, time.Hour) - podLister := v1lister.NewPodLister(store) - go reflector.Run(stopchannel) -@@ -209,6 +215,7 @@ func NewScheduledPodLister(kubeClient client.Interface, stopchannel <-chan struc - selector := fields.ParseSelectorOrDie("spec.nodeName!=" + "" + ",status.phase!=" + - string(apiv1.PodSucceeded) + ",status.phase!=" + string(apiv1.PodFailed)) - podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", apiv1.NamespaceAll, selector) -+ podListWatch = wrapListWatchWithNeonVMUsage(podListWatch) - store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(podListWatch, &apiv1.Pod{}, time.Hour) - podLister := v1lister.NewPodLister(store) - go reflector.Run(stopchannel) -@@ -218,6 +225,105 @@ func NewScheduledPodLister(kubeClient client.Interface, stopchannel <-chan struc - } + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/informers" +@@ -46,6 +48,14 @@ type ListerRegistry interface { + StatefulSetLister() v1appslister.StatefulSetLister } +// copied from github.com/neondatabase/autoscaling, neonvm/apis/neonvm/v1/virtualmachine_types.go. @@ -42,97 +27,43 @@ index d0033550f..fa3c2ec30 100644 + Memory resource.Quantity `json:"memory"` +} + -+func wrapListWatchWithNeonVMUsage(lw *cache.ListWatch) *cache.ListWatch { -+ updatePodRequestsFromNeonVMAnnotation := func(pod *apiv1.Pod) { -+ annotation, ok := pod.Annotations["vm.neon.tech/usage"] -+ if !ok { -+ return -+ } -+ -+ var usage virtualMachineUsage -+ if err := json.Unmarshal([]byte(annotation), &usage); err != nil { -+ return -+ } -+ -+ pod.Spec.Containers[0].Resources.Requests = apiv1.ResourceList(map[apiv1.ResourceName]resource.Quantity{ -+ apiv1.ResourceCPU: usage.CPU, -+ apiv1.ResourceMemory: usage.Memory, -+ }) + type listerRegistryImpl struct { + allNodeLister NodeLister + readyNodeLister NodeLister +@@ -221,6 +231,22 @@ type AllPodLister struct { + podLister v1lister.PodLister + } + ++func updatePodRequestsFromNeonVMAnnotation(pod *apiv1.Pod) { ++ annotation, ok := pod.Annotations["vm.neon.tech/usage"] ++ if !ok { ++ return + } + -+ return &cache.ListWatch{ -+ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { -+ obj, err := lw.List(options) -+ if err != nil { -+ return obj, err -+ } -+ -+ list := obj.(*apiv1.PodList) -+ for i := range list.Items { -+ updatePodRequestsFromNeonVMAnnotation(&list.Items[i]) -+ } -+ return obj, nil -+ }, -+ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { -+ iface, err := lw.Watch(options) -+ if err != nil { -+ return iface, err -+ } -+ -+ // Wrap the channel to update the pods as they come through -+ wrappedEvents := make(chan watch.Event) -+ proxyIface := watch.NewProxyWatcher(wrappedEvents) -+ -+ go func() { -+ events := iface.ResultChan() -+ -+ for { -+ var ok bool -+ var ev watch.Event -+ -+ select { -+ case <-proxyIface.StopChan(): -+ return -+ case ev, ok = <-events: -+ if !ok { -+ close(wrappedEvents) -+ return -+ } -+ } -+ -+ // Quoting the docs on watch.Event.Object: -+ // -+ // > Object is: -+ // > * If Type is Added or Modified: the new state of the object -+ // > * If type is Deleted: the state of the object immediately before deletion. -+ // > * If Type is Bookmark: the object [ ... ] where only ResourceVersion field -+ // > is set. -+ // > * If Type is Error: *api.Status is recommended; other types may make sense -+ // > depending on context. -+ // -+ // So basically, we want to process the object only if ev.Type is Added, -+ // Modified, or Deleted. -+ if ev.Type == watch.Added || ev.Type == watch.Modified || ev.Type == watch.Deleted { -+ pod := ev.Object.(*apiv1.Pod) -+ updatePodRequestsFromNeonVMAnnotation(pod) -+ } -+ -+ // Pass along the maybe-updated event -+ select { -+ case <-proxyIface.StopChan(): -+ return -+ case wrappedEvents <- ev: -+ // continue on to next event -+ } -+ } -+ }() -+ -+ return proxyIface, nil -+ }, -+ DisableChunking: lw.DisableChunking, ++ var usage virtualMachineUsage ++ if err := json.Unmarshal([]byte(annotation), &usage); err != nil { ++ return + } ++ pod.Spec.Containers[0].Resources.Requests = apiv1.ResourceList(map[apiv1.ResourceName]resource.Quantity{ ++ apiv1.ResourceCPU: usage.CPU, ++ apiv1.ResourceMemory: usage.Memory, ++ }) +} + - // NodeLister lists nodes. - type NodeLister interface { - List() ([]*apiv1.Node, error) + // List returns all scheduled pods. + func (lister *AllPodLister) List() ([]*apiv1.Pod, error) { + var pods []*apiv1.Pod +@@ -229,9 +255,12 @@ func (lister *AllPodLister) List() ([]*apiv1.Pod, error) { + if err != nil { + return pods, err + } ++ + for _, p := range allPods { + if p.Status.Phase != apiv1.PodSucceeded && p.Status.Phase != apiv1.PodFailed { +- pods = append(pods, p) ++ podCopy := p.DeepCopy() ++ updatePodRequestsFromNeonVMAnnotation(podCopy) ++ pods = append(pods, podCopy) + } + } + return pods, nil