diff --git a/control-plane/config/eventing-kafka-broker/200-controller/100-config-kafka-descheduler.yaml b/control-plane/config/eventing-kafka-broker/200-controller/100-config-kafka-descheduler.yaml deleted file mode 100644 index 5aca0a4bfc..0000000000 --- a/control-plane/config/eventing-kafka-broker/200-controller/100-config-kafka-descheduler.yaml +++ /dev/null @@ -1,35 +0,0 @@ -# Copyright 2021 The Knative Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -apiVersion: v1 -kind: ConfigMap -metadata: - name: config-kafka-descheduler - namespace: knative-eventing - labels: - app.kubernetes.io/version: devel -data: - predicates: |+ - [] - priorities: |+ - [ - {"Name": "RemoveWithEvenPodSpreadPriority", - "Weight": 10, - "Args": "{\"MaxSkew\": 2}"}, - {"Name": "RemoveWithAvailabilityZonePriority", - "Weight": 10, - "Args": "{\"MaxSkew\": 2}"}, - {"Name": "RemoveWithHighestOrdinalPriority", - "Weight": 2} - ] diff --git a/control-plane/config/eventing-kafka-broker/200-controller/100-config-kafka-scheduler.yaml b/control-plane/config/eventing-kafka-broker/200-controller/100-config-kafka-scheduler.yaml deleted file mode 100644 index 13f0cc6bef..0000000000 --- a/control-plane/config/eventing-kafka-broker/200-controller/100-config-kafka-scheduler.yaml +++ /dev/null @@ -1,39 +0,0 @@ -# Copyright 2021 The Knative Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -apiVersion: v1 -kind: ConfigMap -metadata: - name: config-kafka-scheduler - namespace: knative-eventing - labels: - app.kubernetes.io/version: devel -data: - predicates: |+ - [ - {"Name": "PodFitsResources"}, - {"Name": "NoMaxResourceCount", - "Args": "{\"NumPartitions\": 100}"} - ] - priorities: |+ - [ - {"Name": "AvailabilityZonePriority", - "Weight": 10, - "Args": "{\"MaxSkew\": 2}"}, - {"Name": "LowestOrdinalPriority", - "Weight": 2}, - {"Name": "EvenPodSpread", - "Weight": 2, - "Args": "{\"MaxSkew\": 2}"} - ] diff --git a/control-plane/pkg/reconciler/consumergroup/controller.go b/control-plane/pkg/reconciler/consumergroup/controller.go index 8b653cae7e..48572bb825 100644 --- a/control-plane/pkg/reconciler/consumergroup/controller.go +++ b/control-plane/pkg/reconciler/consumergroup/controller.go @@ -18,7 +18,6 @@ package consumergroup import ( "context" - "encoding/json" "fmt" "strings" "time" @@ -26,7 +25,6 @@ import ( v1 "k8s.io/client-go/informers/core/v1" "github.com/kelseyhightower/envconfig" - "go.uber.org/multierr" "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -45,7 +43,6 @@ import ( kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset" configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap" - nodeinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/node" podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod/filtered" secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret" "knative.dev/pkg/configmap" @@ -93,11 +90,9 @@ type envConfig struct { } type SchedulerConfig struct { - StatefulSetName string - RefreshPeriod time.Duration - Capacity int32 - SchedulerPolicy *scheduler.SchedulerPolicy - DeSchedulerPolicy *scheduler.SchedulerPolicy + StatefulSetName string + RefreshPeriod time.Duration + Capacity int32 } func NewController(ctx context.Context, watcher configmap.Watcher) *controller.Impl { @@ -109,10 +104,8 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I } c := SchedulerConfig{ - RefreshPeriod: time.Duration(env.SchedulerRefreshPeriod) * time.Second, - Capacity: env.PodCapacity, - SchedulerPolicy: schedulerPolicyFromConfigMapOrFail(ctx, env.SchedulerPolicyConfigMap), - DeSchedulerPolicy: schedulerPolicyFromConfigMapOrFail(ctx, env.DeSchedulerPolicyConfigMap), + RefreshPeriod: time.Duration(env.SchedulerRefreshPeriod) * time.Second, + Capacity: env.PodCapacity, } dispatcherPodInformer := podinformer.Get(ctx, internalsapi.DispatcherLabelSelectorStr) @@ -332,11 +325,9 @@ func createKafkaScheduler(ctx context.Context, c SchedulerConfig, ssName string, return createStatefulSetScheduler( ctx, SchedulerConfig{ - StatefulSetName: ssName, - RefreshPeriod: c.RefreshPeriod, - Capacity: c.Capacity, - SchedulerPolicy: c.SchedulerPolicy, - DeSchedulerPolicy: c.DeSchedulerPolicy, + StatefulSetName: ssName, + RefreshPeriod: c.RefreshPeriod, + Capacity: c.Capacity, }, func() ([]scheduler.VPod, error) { consumerGroups, err := lister.List(labels.SelectorFromSet(getSelectorLabel(ssName))) @@ -380,12 +371,8 @@ func createStatefulSetScheduler(ctx context.Context, c SchedulerConfig, lister s ScaleCacheConfig: scheduler.ScaleCacheConfig{RefreshPeriod: statefulSetScaleCacheRefreshPeriod}, PodCapacity: c.Capacity, RefreshPeriod: c.RefreshPeriod, - SchedulerPolicy: scheduler.MAXFILLUP, - SchedPolicy: c.SchedulerPolicy, - DeschedPolicy: c.DeSchedulerPolicy, Evictor: 
newEvictor(ctx, zap.String("kafka.eventing.knative.dev/component", "evictor")).evict, VPodLister: lister, - NodeLister: nodeinformer.Get(ctx).Lister(), PodLister: dispatcherPodInformer.Lister().Pods(system.Namespace()), }) @@ -394,60 +381,3 @@ func createStatefulSetScheduler(ctx context.Context, c SchedulerConfig, lister s SchedulerConfig: c, } } - -// schedulerPolicyFromConfigMapOrFail reads predicates and priorities data from configMap -func schedulerPolicyFromConfigMapOrFail(ctx context.Context, configMapName string) *scheduler.SchedulerPolicy { - p, err := schedulerPolicyFromConfigMap(ctx, configMapName) - if err != nil { - logging.FromContext(ctx).Fatal(zap.Error(err)) - } - return p -} - -// schedulerPolicyFromConfigMap reads predicates and priorities data from configMap -func schedulerPolicyFromConfigMap(ctx context.Context, configMapName string) (*scheduler.SchedulerPolicy, error) { - policyConfigMap, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(ctx, configMapName, metav1.GetOptions{}) - if err != nil { - return nil, fmt.Errorf("couldn't get scheduler policy config map %s/%s: %v", system.Namespace(), configMapName, err) - } - - logger := logging.FromContext(ctx). - Desugar(). - With(zap.String("configmap", configMapName)) - policy := &scheduler.SchedulerPolicy{} - - preds, found := policyConfigMap.Data["predicates"] - if !found { - return nil, fmt.Errorf("missing policy config map %s/%s value at key predicates", system.Namespace(), configMapName) - } - if err := json.NewDecoder(strings.NewReader(preds)).Decode(&policy.Predicates); err != nil { - return nil, fmt.Errorf("invalid policy %v: %v", preds, err) - } - - priors, found := policyConfigMap.Data["priorities"] - if !found { - return nil, fmt.Errorf("missing policy config map value at key priorities") - } - if err := json.NewDecoder(strings.NewReader(priors)).Decode(&policy.Priorities); err != nil { - return nil, fmt.Errorf("invalid policy %v: %v", preds, err) - } - - if errs := validatePolicy(policy); errs != nil { - return nil, multierr.Combine(err) - } - - logger.Info("Schedulers policy registration", zap.Any("policy", policy)) - - return policy, nil -} - -func validatePolicy(policy *scheduler.SchedulerPolicy) []error { - var validationErrors []error - - for _, priority := range policy.Priorities { - if priority.Weight < scheduler.MinWeight || priority.Weight > scheduler.MaxWeight { - validationErrors = append(validationErrors, fmt.Errorf("priority %s should have a positive weight applied to it or it has overflown", priority.Name)) - } - } - return validationErrors -} diff --git a/go.mod b/go.mod index c311ca43fb..a0eb88a4a9 100644 --- a/go.mod +++ b/go.mod @@ -35,7 +35,7 @@ require ( k8s.io/apiserver v0.30.3 k8s.io/client-go v0.30.3 k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 - knative.dev/eventing v0.43.1-0.20241028083747-ef6b31a697e7 + knative.dev/eventing v0.43.1-0.20241029203049-7c97e6ff8358 knative.dev/hack v0.0.0-20241025103803-ef6e7e983a60 knative.dev/pkg v0.0.0-20241026180704-25f6002b00f3 knative.dev/reconciler-test v0.0.0-20241024141702-aae114c1c0e3 diff --git a/go.sum b/go.sum index 108de875c1..fa1609da46 100644 --- a/go.sum +++ b/go.sum @@ -1214,8 +1214,8 @@ k8s.io/utils v0.0.0-20200912215256-4140de9c8800/go.mod h1:jPW/WVKK9YHAvNhRxK0md/ k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 h1:pUdcCO1Lk/tbT5ztQWOBi5HBgbBP1J8+AsQnQCKsi8A= k8s.io/utils 
v0.0.0-20240711033017-18e509b52bc8/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= -knative.dev/eventing v0.43.1-0.20241028083747-ef6b31a697e7 h1:pYKhXbvHVOmQumyKS7vjQBaB11rXzeAjz84z2L9qrtM= -knative.dev/eventing v0.43.1-0.20241028083747-ef6b31a697e7/go.mod h1:2mdt9J66vQYzxizDz8I/F6IGzV1QgwCkacBR8X12Ssk= +knative.dev/eventing v0.43.1-0.20241029203049-7c97e6ff8358 h1:RwoyL/OsdC2xzWARfLRKWBAkqG64F1CMNHAIbWHTBAI= +knative.dev/eventing v0.43.1-0.20241029203049-7c97e6ff8358/go.mod h1:2mdt9J66vQYzxizDz8I/F6IGzV1QgwCkacBR8X12Ssk= knative.dev/hack v0.0.0-20241025103803-ef6e7e983a60 h1:LjBbosBvW/9/qjzIJtGpehPsbNWVvy1Fz8yZvMbFWe4= knative.dev/hack v0.0.0-20241025103803-ef6e7e983a60/go.mod h1:R0ritgYtjLDO9527h5vb5X6gfvt5LCrJ55BNbVDsWiY= knative.dev/pkg v0.0.0-20241026180704-25f6002b00f3 h1:uUSDGlOIkdPT4svjlhi+JEnP2Ufw7AM/F5QDYiEL02U= diff --git a/test/e2e-tests.sh b/test/e2e-tests.sh index 108e40d857..44f6da35bf 100755 --- a/test/e2e-tests.sh +++ b/test/e2e-tests.sh @@ -3,7 +3,7 @@ source $(dirname "$0")/e2e-common.sh if ! ${SKIP_INITIALIZE}; then - initialize "$@" --num-nodes=4 + initialize "$@" --num-nodes=5 save_release_artifacts || fail_test "Failed to save release artifacts" fi diff --git a/test/e2e/sacura_test.go b/test/e2e/sacura_test.go index 691fa42d40..0c93a62fc7 100644 --- a/test/e2e/sacura_test.go +++ b/test/e2e/sacura_test.go @@ -21,6 +21,7 @@ package e2e import ( "context" + "encoding/json" "fmt" "strings" "testing" @@ -31,17 +32,22 @@ import ( batchv1 "k8s.io/api/batch/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/storage/names" "k8s.io/client-go/dynamic" "k8s.io/utils/pointer" - sources "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/sources/v1beta1" - "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" + eventing "knative.dev/eventing/pkg/apis/eventing/v1" messaging "knative.dev/eventing/pkg/apis/messaging/v1" eventingclientset "knative.dev/eventing/pkg/client/clientset/versioned" testlib "knative.dev/eventing/test/lib" + kafkainternals "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internalskafkaeventing/v1alpha1" + sources "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/sources/v1" + "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" + pkgtest "knative.dev/eventing-kafka-broker/test/pkg" kafkatest "knative.dev/eventing-kafka-broker/test/pkg/kafka" pkgtesting "knative.dev/eventing-kafka-broker/test/pkg/testing" @@ -64,6 +70,8 @@ type SacuraTestConfig struct { // Namespace is the test namespace. Namespace string + ConsumerResourceGVR schema.GroupVersionResource + // BrokerTopic is the expected Broker topic. // It's used to verify the committed offset. 
BrokerTopic *string @@ -79,15 +87,17 @@ type SacuraTestConfig struct { func TestSacuraSinkSourceJob(t *testing.T) { runSacuraTest(t, SacuraTestConfig{ - Namespace: "sacura-sink-source", - SourceTopic: pointer.String("sacura-sink-source-topic"), + Namespace: "sacura-sink-source", + ConsumerResourceGVR: sources.SchemeGroupVersion.WithResource("kafkasources"), + SourceTopic: pointer.String("sacura-sink-source-topic"), }) } func TestSacuraBrokerJob(t *testing.T) { runSacuraTest(t, SacuraTestConfig{ - Namespace: "sacura", - BrokerTopic: pointer.String("knative-broker-sacura-sink-source-broker"), + Namespace: "sacura", + ConsumerResourceGVR: eventing.SchemeGroupVersion.WithResource("triggers"), + BrokerTopic: pointer.String("knative-broker-sacura-sink-source-broker"), }) } @@ -98,6 +108,15 @@ func runSacuraTest(t *testing.T, config SacuraTestConfig) { ctx := context.Background() + watchUserFacingResource := watchResource(t, ctx, c.Dynamic, config.Namespace, config.ConsumerResourceGVR) + t.Cleanup(watchUserFacingResource.Stop) + + watchConsumerGroups := watchResource(t, ctx, c.Dynamic, config.Namespace, kafkainternals.SchemeGroupVersion.WithResource("consumergroups")) + t.Cleanup(watchConsumerGroups.Stop) + + watchConsumer := watchResource(t, ctx, c.Dynamic, config.Namespace, kafkainternals.SchemeGroupVersion.WithResource("consumers")) + t.Cleanup(watchConsumer.Stop) + jobPollError := wait.Poll(pollInterval, pollTimeout, func() (done bool, err error) { job, err := c.Kube.BatchV1().Jobs(config.Namespace).Get(ctx, app, metav1.GetOptions{}) assert.Nil(t, err) @@ -195,3 +214,22 @@ func getKafkaSubscriptionConsumerGroup(ctx context.Context, c dynamic.Interface, return fmt.Sprintf("kafka.%s.%s.%s", c, sacuraChannelName, string(sub.UID)) } } + +func watchResource(t *testing.T, ctx context.Context, dynamic dynamic.Interface, ns string, gvr schema.GroupVersionResource) watch.Interface { + + w, err := dynamic.Resource(gvr). + Namespace(ns). + Watch(ctx, metav1.ListOptions{}) + if err != nil { + t.Fatal("Failed to watch resource", gvr, err) + } + + go func() { + for e := range w.ResultChan() { + bytes, _ := json.MarshalIndent(e, "", " ") + t.Logf("Resource %q changed:\n%s\n\n", gvr.String(), string(bytes)) + } + }() + + return w +} diff --git a/test/keda-reconciler-tests.sh b/test/keda-reconciler-tests.sh index 835ef42f84..5ca0b89766 100755 --- a/test/keda-reconciler-tests.sh +++ b/test/keda-reconciler-tests.sh @@ -18,7 +18,7 @@ source $(dirname "$0")/e2e-common.sh export BROKER_TEMPLATES=./templates/kafka-broker if ! ${SKIP_INITIALIZE}; then - initialize "$@" --num-nodes=4 + initialize "$@" --num-nodes=5 save_release_artifacts || fail_test "Failed to save release artifacts" fi diff --git a/test/reconciler-tests.sh b/test/reconciler-tests.sh index e1fc6050e4..6c345c997c 100755 --- a/test/reconciler-tests.sh +++ b/test/reconciler-tests.sh @@ -4,7 +4,7 @@ source $(dirname "$0")/e2e-common.sh export BROKER_TEMPLATES=./templates/kafka-broker if ! ${SKIP_INITIALIZE}; then - initialize "$@" --num-nodes=4 + initialize "$@" --num-nodes=5 save_release_artifacts || fail_test "Failed to save release artifacts" fi diff --git a/test/upgrade-tests.sh b/test/upgrade-tests.sh index ead5337b4f..70f29de33b 100755 --- a/test/upgrade-tests.sh +++ b/test/upgrade-tests.sh @@ -31,7 +31,7 @@ function test_setup() { } if ! 
${SKIP_INITIALIZE}; then - initialize "$@" + initialize "$@" --num-nodes=5 save_release_artifacts || fail_test "Failed to save release artifacts" fi diff --git a/vendor/knative.dev/eventing/pkg/scheduler/README.md b/vendor/knative.dev/eventing/pkg/scheduler/README.md index 08543f4753..a40828a3ee 100644 --- a/vendor/knative.dev/eventing/pkg/scheduler/README.md +++ b/vendor/knative.dev/eventing/pkg/scheduler/README.md @@ -1,147 +1,72 @@ # Knative Eventing Multi-Tenant Scheduler with High-Availability -An eventing source instance (for example, [KafkaSource](https://github.com/knative-extensions/eventing-kafka/tree/main/pkg/source), [RedisStreamSource](https://github.com/knative-extensions/eventing-redis/tree/main/source), etc) gets materialized as a virtual pod (**vpod**) and can be scaled up and down by increasing or decreasing the number of virtual pod replicas (**vreplicas**). A vreplica corresponds to a resource in the source that can replicated for maximum distributed processing (for example, number of consumers running in a consumer group). +An eventing source instance (for example, KafkaSource, etc) gets materialized as a virtual pod (* +*vpod**) and can be scaled up and down by increasing or decreasing the number of virtual pod +replicas (**vreplicas**). A vreplica corresponds to a resource in the source that can replicated for +maximum distributed processing (for example, number of consumers running in a consumer group). -The vpod multi-tenant [scheduler](#1scheduler) is responsible for placing vreplicas onto real Kubernetes pods. Each pod is limited in capacity and can hold a maximum number of vreplicas. The scheduler takes a list of (source, # of vreplicas) tuples and computes a set of Placements. Placement info are added to the source status. +The vpod multi-tenant [scheduler](#scheduler) is responsible for placing vreplicas onto real +Kubernetes pods. Each pod is limited in capacity and can hold a maximum number of vreplicas. The +scheduler takes a list of (source, # of vreplicas) tuples and computes a set of Placements. +Placement info are added to the source status. -Scheduling strategies rely on pods having a sticky identity (StatefulSet replicas) and the current [State](#4state-collector) of the cluster. - -When a vreplica cannot be scheduled it is added to the list of pending vreplicas. The [Autoscaler](#3autoscaler) monitors this list and allocates more pods for placing it. - -To support high-availability the scheduler distributes vreplicas uniformly across failure domains such as zones/nodes/pods containing replicas from a StatefulSet. - -## General Scheduler Requirements - -1. High Availability: Vreplicas for a source must be evenly spread across domains to reduce impact of failure when a zone/node/pod goes unavailable for scheduling.* - -2. Equal event consumption: Vreplicas for a source must be evenly spread across adapter pods to provide an equal rate of processing events. For example, Kafka broker spreads partitions equally across pods so if vreplicas aren’t equally spread, pods with fewer vreplicas will consume events slower than others. - -3. Pod spread not more than available resources: Vreplicas for a source must be evenly spread across pods such that the total number of pods with placements does not exceed the number of resources available from the source (for example, number of Kafka partitions for the topic it's consuming from). Else, the additional pods have no resources (Kafka partitions) to consume events from and could waste Kubernetes resources. 
- -* Note: StatefulSet anti-affinity rules guarantee new pods to be scheduled on a new zone and node. +Scheduling strategies rely on pods having a sticky identity (StatefulSet replicas) and the +current [State](#state-collector) of the cluster. ## Components: -### 1.Scheduler -The scheduling framework has a pluggable architecture where plugins are registered and compiled into the scheduler. It allows many scheduling features to be implemented as plugins, while keeping the scheduling "core" simple and maintainable. - -Scheduling happens in a series of stages: - - 1. **Filter**: These plugins (predicates) are used to filter out pods where a vreplica cannot be placed. If any filter plugin marks the pod as infeasible, the remaining plugins will not be called for that pod. A vreplica is marked as unschedulable if no pods pass all the filters. - - 2. **Score**: These plugins (priorities) provide a score to each pod that has passed the filtering phase. Scheduler will then select the pod with the highest weighted scores sum. - -Scheduler must be Knative generic with its core functionality implemented as core plugins. Anything specific to an eventing source will be implemented as separate plugins (for example, number of Kafka partitions) - -It allocates one vreplica at a time by filtering and scoring schedulable pods. - -A vreplica can be unschedulable for several reasons such as pods not having enough capacity, constraints cannot be fulfilled, etc. - -### 2.Descheduler - -Similar to scheduler but has its own set of priorities (no predicates today). - -### 3.Autoscaler - -The autoscaler scales up pod replicas of the statefulset adapter when there are vreplicas pending to be scheduled, and scales down if there are unused pods. It takes into consideration a scaling factor that is based on number of domains for HA. - -### 4.State Collector - -Current state information about the cluster is collected after placing each vreplica and during intervals. Cluster information include computing the free capacity for each pod, list of schedulable pods (unschedulable pods are pods that are marked for eviction for compacting, and pods that are on unschedulable nodes (cordoned or unreachable nodes), number of pods (stateful set replicas), number of available nodes, number of zones, a node to zone map, total number of vreplicas in each pod for each vpod (spread), total number of vreplicas in each node for each vpod (spread), total number of vreplicas in each zone for each vpod (spread), etc. - -### 5.Reservation - -Scheduler also tracks vreplicas that have been placed (ie. scheduled) but haven't been committed yet to its vpod status. These reserved veplicas are taken into consideration when computing cluster's state for scheduling the next vreplica. - -### 6.Evictor - -Autoscaler periodically attempts to compact veplicas into a smaller number of free replicas with lower ordinals. Vreplicas placed on higher ordinal pods are evicted and rescheduled to pods with a lower ordinal using the same scheduling strategies. - -## Scheduler Profile - -### Predicates: - -1. **PodFitsResources**: check if a pod has enough capacity [CORE] - -2. **NoMaxResourceCount**: check if total number of placement pods exceed available resources [KAFKA]. It has an argument `NumPartitions` to configure the plugin with the total number of Kafka partitions. - -3. **EvenPodSpread**: check if resources are evenly spread across pods [CORE]. It has an argument `MaxSkew` to configure the plugin with an allowed skew factor. 
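For readers following this removal: the deleted README text above describes the policy-driven pipeline this change drops, in which predicates filter out infeasible pods and weighted priorities score the survivors, with the highest weighted sum winning. The sketch below is only an illustration of that filter-then-score selection, with made-up pod capacities and helper names; it is not the knative.dev/eventing scheduler code being removed.

```
package main

import "fmt"

// Hypothetical illustration of a filter-then-score pipeline: predicates rule
// pods out, priorities score the survivors, and the pod with the highest
// weighted score wins. All names and numbers here are invented.

type filterFn func(podOrdinal int32) bool
type scoreFn func(podOrdinal int32) uint64

type weightedScorer struct {
	score  scoreFn
	weight uint64
}

// pickPod returns the feasible pod with the highest weighted score,
// or -1 if every pod is filtered out (the vreplica stays pending).
func pickPod(pods []int32, filters []filterFn, scorers []weightedScorer) int32 {
	best, bestScore := int32(-1), uint64(0)
	for _, p := range pods {
		feasible := true
		for _, f := range filters {
			if !f(p) { // any failing predicate marks the pod infeasible
				feasible = false
				break
			}
		}
		if !feasible {
			continue
		}
		var total uint64
		for _, s := range scorers {
			total += s.weight * s.score(p)
		}
		if best == -1 || total > bestScore {
			best, bestScore = p, total
		}
	}
	return best
}

func main() {
	free := map[int32]int32{0: 0, 1: 3, 2: 5} // free vreplica capacity per pod (invented)
	hasCapacity := func(p int32) bool { return free[p] > 0 }
	lowestOrdinal := func(p int32) uint64 { return uint64(10 - p) } // lower ordinal scores higher

	chosen := pickPod([]int32{0, 1, 2},
		[]filterFn{hasCapacity},
		[]weightedScorer{{score: lowestOrdinal, weight: 2}})
	fmt.Println("placed on pod ordinal:", chosen)
}
```

Pod 0 is filtered out for lack of capacity, so the lowest remaining ordinal (pod 1) wins, which is the behavior the removed priorities were tuned to produce.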
+### Scheduler -### Priorities: +The scheduler allocates as many as vreplicas as possible into the lowest possible StatefulSet +ordinal +number before triggering the autoscaler when no more capacity is left to schedule vpods. -1. **AvailabilityNodePriority**: make sure resources are evenly spread across nodes [CORE]. It has an argument `MaxSkew` to configure the plugin with an allowed skew factor. +### Autoscaler -2. **AvailabilityZonePriority**: make sure resources are evenly spread across zones [CORE]. It has an argument `MaxSkew` to configure the plugin with an allowed skew factor. +The autoscaler scales up pod replicas of the statefulset adapter when there are vreplicas pending to +be scheduled, and scales down if there are unused pods. -3. **LowestOrdinalPriority**: make sure vreplicas are placed on free smaller ordinal pods to minimize resource usage [CORE] +### State Collector -**Example ConfigMap for config-scheduler:** +Current state information about the cluster is collected after placing each vreplica and during +intervals. Cluster information include computing the free capacity for each pod, list of schedulable +pods (unschedulable pods are pods that are marked for eviction for compacting, number of pods ( +stateful set replicas), total number of vreplicas in each pod for each vpod (spread). -``` -data: - predicates: |+ - [ - {"Name": "PodFitsResources"}, - {"Name": "NoMaxResourceCount", - "Args": "{\"NumPartitions\": 100}"}, - {"Name": "EvenPodSpread", - "Args": "{\"MaxSkew\": 2}"} - ] - priorities: |+ - [ - {"Name": "AvailabilityZonePriority", - "Weight": 10, - "Args": "{\"MaxSkew\": 2}"}, - {"Name": "LowestOrdinalPriority", - "Weight": 2} - ] -``` +### Evictor -## Descheduler Profile: - -### Priorities: - -1. **RemoveWithAvailabilityNodePriority**: make sure resources are evenly spread across nodes [CORE] - -2. **RemoveWithAvailabilityZonePriority**: make sure resources are evenly spread across zones [CORE] - -3. **HighestOrdinalPriority**: make sure vreps are removed from higher ordinal pods to minimize resource usage [CORE] - -**Example ConfigMap for config-descheduler:** - -``` -data: - priorities: |+ - [ - {"Name": "RemoveWithEvenPodSpreadPriority", - "Weight": 10, - "Args": "{\"MaxSkew\": 2}"}, - {"Name": "RemoveWithAvailabilityZonePriority", - "Weight": 10, - "Args": "{\"MaxSkew\": 2}"}, - {"Name": "RemoveWithHighestOrdinalPriority", - "Weight": 2} - ] -``` +Autoscaler periodically attempts to compact veplicas into a smaller number of free replicas with +lower ordinals. Vreplicas placed on higher ordinal pods are evicted and rescheduled to pods with a +lower ordinal using the same scheduling strategies. ## Normal Operation 1. **Busy scheduler**: -Scheduler can be very busy allocating the best placements for multiple eventing sources at a time using the scheduler predicates and priorities configured. During this time, the cluster could see statefulset replicas increasing, as the autoscaler computes how many more pods are needed to complete scheduling successfully. Also, the replicas could be decreasing during idle time, either caused by less events flowing through the system, or the evictor compacting vreplicas placements into a smaller number of pods or the deletion of event sources. The current placements are stored in the eventing source's status field for observability. +Scheduler can be very busy allocating the best placements for multiple eventing sources at a time +using the scheduler predicates and priorities configured. 
During this time, the cluster could see +statefulset replicas increasing, as the autoscaler computes how many more pods are needed to +complete scheduling successfully. Also, the replicas could be decreasing during idle time, either +caused by less events flowing through the system, or the evictor compacting vreplicas placements +into a smaller number of pods or the deletion of event sources. The current placements are stored in +the eventing source's status field for observability. 2. **Software upgrades**: -We can expect periodic software version upgrades or fixes to be performed on the Kubernetes cluster running the scheduler or on the Knative framework installed. Either of these scenarios could involve graceful rebooting of nodes and/or reapplying of controllers, adapters and other resources. - -All existing vreplica placements will still be valid and no rebalancing will be done by the vreplica scheduler. -(For Kafka, its broker may trigger a rebalancing of partitions due to consumer group member changes.) +We can expect periodic software version upgrades or fixes to be performed on the Kubernetes cluster +running the scheduler or on the Knative framework installed. Either of these scenarios could involve +graceful rebooting of nodes and/or reapplying of controllers, adapters and other resources. -TODO: Measure latencies in events processing using a performance tool (KPerf eventing). +All existing vreplica placements will still be valid and no rebalancing will be done by the vreplica +scheduler. +(For Kafka, its broker may trigger a rebalancing of partitions due to consumer group member +changes.) 3. **No more cluster resources**: -When there are no resources available on existing nodes in the cluster to schedule more pods and the autoscaler continues to scale up replicas, the new pods are left in a Pending state till cluster size is increased. Nothing to do for the scheduler until then. +When there are no resources available on existing nodes in the cluster to schedule more pods and the +autoscaler continues to scale up replicas, the new pods are left in a Pending state till cluster +size is increased. Nothing to do for the scheduler until then. ## Disaster Recovery @@ -149,91 +74,14 @@ Some failure scenarios are described below: 1. **Pod failure**: -When a pod/replica in a StatefulSet goes down due to some reason (but its node and zone are healthy), a new replica is spun up by the StatefulSet with the same pod identity (pod can come up on a different node) almost immediately. - -All existing vreplica placements will still be valid and no rebalancing will be done by the vreplica scheduler. -(For Kafka, its broker may trigger a rebalancing of partitions due to consumer group member changes.) - -TODO: Measure latencies in events processing using a performance tool (KPerf eventing). - -2. **Node failure (graceful)**: - -When a node is rebooted for upgrades etc, running pods on the node will be evicted (drained), gracefully terminated and rescheduled on a different node. The drained node will be marked as unschedulable by K8 (`node.Spec.Unschedulable` = True) after its cordoning. - -``` -k describe node knative-worker4 -Name: knative-worker4 -CreationTimestamp: Mon, 30 Aug 2021 11:13:11 -0400 -Taints: none -Unschedulable: true -``` - -All existing vreplica placements will still be valid and no rebalancing will be done by the vreplica scheduler. -(For Kafka, its broker may trigger a rebalancing of partitions due to consumer group member changes.) 
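The rewritten README above reduces the scheduler to a single behavior: pack vreplicas onto the lowest StatefulSet ordinals first and let the autoscaler react to whatever is left pending. A minimal sketch of that greedy fill follows, with invented capacities and type names; it is not the actual statefulset scheduler implementation.

```
package main

import "fmt"

// Illustrative-only sketch of "fill the lowest ordinal first": place as many
// vreplicas as fit on pod 0, spill the remainder to pod 1, and so on; whatever
// does not fit stays pending and is what the autoscaler reacts to.

type placement struct {
	PodOrdinal int32
	VReplicas  int32
}

func schedule(vreplicas int32, freeCapacity []int32) (placements []placement, pending int32) {
	remaining := vreplicas
	for ordinal, free := range freeCapacity {
		if remaining == 0 {
			break
		}
		n := min32(remaining, free)
		if n > 0 {
			placements = append(placements, placement{PodOrdinal: int32(ordinal), VReplicas: n})
			remaining -= n
		}
	}
	return placements, remaining // remaining > 0 would trigger a scale-up
}

func min32(a, b int32) int32 {
	if a < b {
		return a
	}
	return b
}

func main() {
	// 12 vreplicas across three pods with capacity 5 each: pods 0 and 1 fill
	// up, pod 2 takes the remaining 2, and nothing is left pending.
	p, pending := schedule(12, []int32{5, 5, 5})
	fmt.Println(p, "pending:", pending)
}
```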
- -TODO: Measure latencies in events processing using a performance tool (KPerf eventing). +When a pod/replica in a StatefulSet goes down due to some reason (but its node and zone are +healthy), a new replica is spun up by the StatefulSet with the same pod identity (pod can come up on +a different node) almost immediately. -New vreplicas will not be scheduled on pods running on this cordoned node. - -3. **Node failure (abrupt)**: - -When a node goes down unexpectedly due to some physical machine failure (network isolation/ loss, CPU issue, power loss, etc), the node controller does the following few steps - -Pods running on the failed node receives a NodeNotReady Warning event - -``` -k describe pod kafkasource-mt-adapter-5 -n knative-eventing -Name: kafkasource-mt-adapter-5 -Namespace: knative-eventing -Priority: 0 -Node: knative-worker4/172.18.0.3 -Tolerations: node.kubernetes.io/not-ready:NoExecute op=Exists for 300s node.kubernetes.io/unreachable:NoExecute op=Exists for 300s - -Events: - Type Reason Age From Message - ---- ------ ---- ---- ------- - Normal Scheduled 11m default-scheduler Successfully assigned knative-eventing/kafkasource-mt-adapter-5 to knative-worker4 - Normal Pulled 11m kubelet Container image - Normal Created 11m kubelet Created container receive-adapter - Normal Started 11m kubelet Started container receive-adapter - Warning NodeNotReady 3m48s node-controller Node is not ready -``` - -Failing node is tainted with the following Key:Condition: by the node controller if the node controller has not heard from the node in the last node-monitor-grace-period (default is 40 seconds) - -``` -k describe node knative-worker4 -Name: knative-worker4 -Taints: node.kubernetes.io/unreachable:NoExecute - node.kubernetes.io/unreachable:NoSchedule -Unschedulable: false - Events: - Type Reason Age From Message - ---- ------ ---- ---- ------- - Normal NodeNotSchedulable 5m42s kubelet Node knative-worker4 status is now: NodeNotSchedulable - Normal NodeSchedulable 2m31s kubelet Node knative-worker4 status is now: NodeSchedulable -``` - -``` -k get nodes -NAME STATUS ROLES AGE VERSION -knative-control-plane Ready control-plane,master 7h23m v1.21.1 -knative-worker Ready 7h23m v1.21.1 -knative-worker2 Ready 7h23m v1.21.1 -knative-worker3 Ready 7h23m v1.21.1 -knative-worker4 NotReady 7h23m v1.21.1 -``` - -After a timeout period (`pod-eviction-timeout` == 5 mins (default)), the pods move to the Terminating state. - -Since statefulset now has a `terminationGracePeriodSeconds: 0` setting, the terminating pods are immediately restarted on another functioning Node. A new replica is spun up with the same ordinal. - -During the time period of the failing node being unreachable (~5mins), vreplicas placed on that pod aren’t available to process work from the eventing source. (Theory) Consumption rate goes down and Kafka eventually triggers rebalancing of partitions. Also, KEDA will scale up the number of consumers to resolve the processing lag. A scale up will cause the Eventing scheduler to rebalance the total vreplicas for that source on available running pods. - -4. **Zone failure**: - -All nodes running in the failing zone will be unavailable for scheduling. Nodes will either be tainted with `unreachable` or Spec’ed as `Unschedulable` -See node failure scenarios above for what happens to vreplica placements. +All existing vreplica placements will still be valid and no rebalancing will be done by the vreplica +scheduler. 
+(For Kafka, its broker may trigger a rebalancing of partitions due to consumer group member +changes.) ## References: @@ -246,7 +94,6 @@ See node failure scenarios above for what happens to vreplica placements. * https://medium.com/tailwinds-navigator/kubernetes-tip-how-statefulsets-behave-differently-than-deployments-when-node-fails-d29e36bca7d5 * https://kubernetes.io/docs/concepts/architecture/nodes/#node-controller - --- To learn more about Knative, please visit the diff --git a/vendor/knative.dev/eventing/pkg/scheduler/doc.go b/vendor/knative.dev/eventing/pkg/scheduler/doc.go index b66262a4be..13cf683a17 100644 --- a/vendor/knative.dev/eventing/pkg/scheduler/doc.go +++ b/vendor/knative.dev/eventing/pkg/scheduler/doc.go @@ -14,5 +14,5 @@ See the License for the specific language governing permissions and limitations under the License. */ -// The scheduler is responsible for placing virtual pod (VPod) replicas within real pods. +// Package scheduler is responsible for placing virtual pod (VPod) replicas within real pods. package scheduler diff --git a/vendor/knative.dev/eventing/pkg/scheduler/factory/registry.go b/vendor/knative.dev/eventing/pkg/scheduler/factory/registry.go deleted file mode 100644 index dbc814055c..0000000000 --- a/vendor/knative.dev/eventing/pkg/scheduler/factory/registry.go +++ /dev/null @@ -1,88 +0,0 @@ -/* -Copyright 2021 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package factory - -import ( - "fmt" - - state "knative.dev/eventing/pkg/scheduler/state" -) - -// RegistryFP is a collection of all available filter plugins. -type RegistryFP map[string]state.FilterPlugin - -// RegistrySP is a collection of all available scoring plugins. -type RegistrySP map[string]state.ScorePlugin - -var ( - FilterRegistry = make(RegistryFP) - ScoreRegistry = make(RegistrySP) -) - -// Register adds a new plugin to the registry. If a plugin with the same name -// exists, it returns an error. -func RegisterFP(name string, factory state.FilterPlugin) error { - if _, ok := FilterRegistry[name]; ok { - return fmt.Errorf("a filter plugin named %v already exists", name) - } - FilterRegistry[name] = factory - return nil -} - -// Unregister removes an existing plugin from the registry. If no plugin with -// the provided name exists, it returns an error. -func UnregisterFP(name string) error { - if _, ok := FilterRegistry[name]; !ok { - return fmt.Errorf("no filter plugin named %v exists", name) - } - delete(FilterRegistry, name) - return nil -} - -func GetFilterPlugin(name string) (state.FilterPlugin, error) { - if f, exist := FilterRegistry[name]; exist { - return f, nil - } - return nil, fmt.Errorf("no fitler plugin named %v exists", name) -} - -// Register adds a new plugin to the registry. If a plugin with the same name -// exists, it returns an error. 
-func RegisterSP(name string, factory state.ScorePlugin) error { - if _, ok := ScoreRegistry[name]; ok { - return fmt.Errorf("a score plugin named %v already exists", name) - } - ScoreRegistry[name] = factory - return nil -} - -// Unregister removes an existing plugin from the registry. If no plugin with -// the provided name exists, it returns an error. -func UnregisterSP(name string) error { - if _, ok := ScoreRegistry[name]; !ok { - return fmt.Errorf("no score plugin named %v exists", name) - } - delete(ScoreRegistry, name) - return nil -} - -func GetScorePlugin(name string) (state.ScorePlugin, error) { - if f, exist := ScoreRegistry[name]; exist { - return f, nil - } - return nil, fmt.Errorf("no score plugin named %v exists", name) -} diff --git a/vendor/knative.dev/eventing/pkg/scheduler/placement.go b/vendor/knative.dev/eventing/pkg/scheduler/placement.go index 3625032354..65ab7897f0 100644 --- a/vendor/knative.dev/eventing/pkg/scheduler/placement.go +++ b/vendor/knative.dev/eventing/pkg/scheduler/placement.go @@ -17,7 +17,6 @@ limitations under the License. package scheduler import ( - "k8s.io/apimachinery/pkg/util/sets" duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" ) @@ -29,24 +28,3 @@ func GetTotalVReplicas(placements []duckv1alpha1.Placement) int32 { } return r } - -// GetPlacementForPod returns the placement corresponding to podName -func GetPlacementForPod(placements []duckv1alpha1.Placement, podName string) *duckv1alpha1.Placement { - for i := 0; i < len(placements); i++ { - if placements[i].PodName == podName { - return &placements[i] - } - } - return nil -} - -// GetPodCount returns the number of pods with the given placements -func GetPodCount(placements []duckv1alpha1.Placement) int { - set := sets.NewString() - for _, p := range placements { - if p.VReplicas > 0 { - set.Insert(p.PodName) - } - } - return set.Len() -} diff --git a/vendor/knative.dev/eventing/pkg/scheduler/plugins/core/availabilitynodepriority/availability_node_priority.go b/vendor/knative.dev/eventing/pkg/scheduler/plugins/core/availabilitynodepriority/availability_node_priority.go deleted file mode 100644 index e0e60c8832..0000000000 --- a/vendor/knative.dev/eventing/pkg/scheduler/plugins/core/availabilitynodepriority/availability_node_priority.go +++ /dev/null @@ -1,111 +0,0 @@ -/* -Copyright 2021 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package availabilitynodepriority - -import ( - "context" - "encoding/json" - "math" - "strings" - - "k8s.io/apimachinery/pkg/types" - "knative.dev/eventing/pkg/scheduler/factory" - state "knative.dev/eventing/pkg/scheduler/state" - "knative.dev/pkg/logging" -) - -// AvailabilityNodePriority is a score plugin that favors pods that create an even spread of resources across nodes for HA -type AvailabilityNodePriority struct { -} - -// Verify AvailabilityNodePriority Implements ScorePlugin Interface -var _ state.ScorePlugin = &AvailabilityNodePriority{} - -// Name of the plugin -const Name = state.AvailabilityNodePriority - -const ( - ErrReasonInvalidArg = "invalid arguments" - ErrReasonNoResource = "node does not exist" -) - -func init() { - factory.RegisterSP(Name, &AvailabilityNodePriority{}) -} - -// Name returns name of the plugin -func (pl *AvailabilityNodePriority) Name() string { - return Name -} - -// Score invoked at the score extension point. The "score" returned in this function is higher for nodes that create an even spread across nodes. -func (pl *AvailabilityNodePriority) Score(ctx context.Context, args interface{}, states *state.State, feasiblePods []int32, key types.NamespacedName, podID int32) (uint64, *state.Status) { - logger := logging.FromContext(ctx).With("Score", pl.Name()) - var score uint64 = 0 - - spreadArgs, ok := args.(string) - if !ok { - logger.Errorf("Scoring args %v for priority %q are not valid", args, pl.Name()) - return 0, state.NewStatus(state.Unschedulable, ErrReasonInvalidArg) - } - - skewVal := state.AvailabilityNodePriorityArgs{} - decoder := json.NewDecoder(strings.NewReader(spreadArgs)) - decoder.DisallowUnknownFields() - if err := decoder.Decode(&skewVal); err != nil { - return 0, state.NewStatus(state.Unschedulable, ErrReasonInvalidArg) - } - - if states.Replicas > 0 { //need at least a pod to compute spread - var skew int32 - - _, nodeName, err := states.GetPodInfo(state.PodNameFromOrdinal(states.StatefulSetName, podID)) - if err != nil { - return score, state.NewStatus(state.Error, ErrReasonNoResource) - } - - currentReps := states.NodeSpread[key][nodeName] //get #vreps on this node - for otherNodeName := range states.NodeToZoneMap { //compare with #vreps on other nodes - if otherNodeName != nodeName { - otherReps := states.NodeSpread[key][otherNodeName] - if skew = (currentReps + 1) - otherReps; skew < 0 { - skew = skew * int32(-1) - } - - //logger.Infof("Current Node %v with %d and Other Node %v with %d causing skew %d", nodeName, currentReps, otherNodeName, otherReps, skew) - if skew > skewVal.MaxSkew { - logger.Infof("Pod %d in node %v will cause an uneven node spread %v with other node %v", podID, nodeName, states.NodeSpread[key], otherNodeName) - } - score = score + uint64(skew) - } - } - - score = math.MaxUint64 - score //lesser skews get higher score - } - - return score, state.NewStatus(state.Success) -} - -// ScoreExtensions of the Score plugin. -func (pl *AvailabilityNodePriority) ScoreExtensions() state.ScoreExtensions { - return pl -} - -// NormalizeScore invoked after scoring all pods. 
-func (pl *AvailabilityNodePriority) NormalizeScore(ctx context.Context, states *state.State, scores state.PodScoreList) *state.Status { - return nil -} diff --git a/vendor/knative.dev/eventing/pkg/scheduler/plugins/core/availabilityzonepriority/availability_zone_priority.go b/vendor/knative.dev/eventing/pkg/scheduler/plugins/core/availabilityzonepriority/availability_zone_priority.go deleted file mode 100644 index 397ff075fb..0000000000 --- a/vendor/knative.dev/eventing/pkg/scheduler/plugins/core/availabilityzonepriority/availability_zone_priority.go +++ /dev/null @@ -1,115 +0,0 @@ -/* -Copyright 2021 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package availabilityzonepriority - -import ( - "context" - "encoding/json" - "math" - "strings" - - "k8s.io/apimachinery/pkg/types" - "knative.dev/eventing/pkg/scheduler/factory" - state "knative.dev/eventing/pkg/scheduler/state" - "knative.dev/pkg/logging" -) - -// AvailabilityZonePriority is a score plugin that favors pods that create an even spread of resources across zones for HA -type AvailabilityZonePriority struct { -} - -// Verify AvailabilityZonePriority Implements ScorePlugin Interface -var _ state.ScorePlugin = &AvailabilityZonePriority{} - -// Name of the plugin -const Name = state.AvailabilityZonePriority - -const ( - ErrReasonInvalidArg = "invalid arguments" - ErrReasonNoResource = "zone does not exist" -) - -func init() { - factory.RegisterSP(Name, &AvailabilityZonePriority{}) -} - -// Name returns name of the plugin -func (pl *AvailabilityZonePriority) Name() string { - return Name -} - -// Score invoked at the score extension point. The "score" returned in this function is higher for zones that create an even spread across zones. 
-func (pl *AvailabilityZonePriority) Score(ctx context.Context, args interface{}, states *state.State, feasiblePods []int32, key types.NamespacedName, podID int32) (uint64, *state.Status) { - logger := logging.FromContext(ctx).With("Score", pl.Name()) - var score uint64 = 0 - - spreadArgs, ok := args.(string) - if !ok { - logger.Errorf("Scoring args %v for priority %q are not valid", args, pl.Name()) - return 0, state.NewStatus(state.Unschedulable, ErrReasonInvalidArg) - } - - skewVal := state.AvailabilityZonePriorityArgs{} - decoder := json.NewDecoder(strings.NewReader(spreadArgs)) - decoder.DisallowUnknownFields() - if err := decoder.Decode(&skewVal); err != nil { - return 0, state.NewStatus(state.Unschedulable, ErrReasonInvalidArg) - } - - if states.Replicas > 0 { //need at least a pod to compute spread - var skew int32 - zoneMap := make(map[string]struct{}) - for _, zoneName := range states.NodeToZoneMap { - zoneMap[zoneName] = struct{}{} - } - - zoneName, _, err := states.GetPodInfo(state.PodNameFromOrdinal(states.StatefulSetName, podID)) - if err != nil { - return score, state.NewStatus(state.Error, ErrReasonNoResource) - } - - currentReps := states.ZoneSpread[key][zoneName] //get #vreps on this zone - for otherZoneName := range zoneMap { //compare with #vreps on other zones - if otherZoneName != zoneName { - otherReps := states.ZoneSpread[key][otherZoneName] - if skew = (currentReps + 1) - otherReps; skew < 0 { - skew = skew * int32(-1) - } - - //logger.Infof("Current Zone %v with %d and Other Zone %v with %d causing skew %d", zoneName, currentReps, otherZoneName, otherReps, skew) - if skew > skewVal.MaxSkew { //score low - logger.Infof("Pod %d in zone %v will cause an uneven zone spread %v with other zone %v", podID, zoneName, states.ZoneSpread[key], otherZoneName) - } - score = score + uint64(skew) - } - } - - score = math.MaxUint64 - score //lesser skews get higher score - } - - return score, state.NewStatus(state.Success) -} - -// ScoreExtensions of the Score plugin. -func (pl *AvailabilityZonePriority) ScoreExtensions() state.ScoreExtensions { - return pl -} - -// NormalizeScore invoked after scoring all pods. -func (pl *AvailabilityZonePriority) NormalizeScore(ctx context.Context, states *state.State, scores state.PodScoreList) *state.Status { - return nil -} diff --git a/vendor/knative.dev/eventing/pkg/scheduler/plugins/core/evenpodspread/even_pod_spread.go b/vendor/knative.dev/eventing/pkg/scheduler/plugins/core/evenpodspread/even_pod_spread.go deleted file mode 100644 index 070e47a995..0000000000 --- a/vendor/knative.dev/eventing/pkg/scheduler/plugins/core/evenpodspread/even_pod_spread.go +++ /dev/null @@ -1,151 +0,0 @@ -/* -Copyright 2021 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package evenpodspread - -import ( - "context" - "encoding/json" - "math" - "strings" - - "k8s.io/apimachinery/pkg/types" - "knative.dev/eventing/pkg/scheduler/factory" - state "knative.dev/eventing/pkg/scheduler/state" - "knative.dev/pkg/logging" -) - -// EvenPodSpread is a filter or score plugin that picks/favors pods that create an equal spread of resources across pods -type EvenPodSpread struct { -} - -// Verify EvenPodSpread Implements FilterPlugin and ScorePlugin Interface -var _ state.FilterPlugin = &EvenPodSpread{} -var _ state.ScorePlugin = &EvenPodSpread{} - -// Name of the plugin -const ( - Name = state.EvenPodSpread - ErrReasonInvalidArg = "invalid arguments" - ErrReasonUnschedulable = "pod will cause an uneven spread" -) - -func init() { - factory.RegisterFP(Name, &EvenPodSpread{}) - factory.RegisterSP(Name, &EvenPodSpread{}) -} - -// Name returns name of the plugin -func (pl *EvenPodSpread) Name() string { - return Name -} - -// Filter invoked at the filter extension point. -func (pl *EvenPodSpread) Filter(ctx context.Context, args interface{}, states *state.State, key types.NamespacedName, podID int32) *state.Status { - logger := logging.FromContext(ctx).With("Filter", pl.Name()) - - spreadArgs, ok := args.(string) - if !ok { - logger.Errorf("Filter args %v for predicate %q are not valid", args, pl.Name()) - return state.NewStatus(state.Unschedulable, ErrReasonInvalidArg) - } - - skewVal := state.EvenPodSpreadArgs{} - decoder := json.NewDecoder(strings.NewReader(spreadArgs)) - decoder.DisallowUnknownFields() - if err := decoder.Decode(&skewVal); err != nil { - return state.NewStatus(state.Unschedulable, ErrReasonInvalidArg) - } - - if states.Replicas > 0 { //need at least a pod to compute spread - currentReps := states.PodSpread[key][state.PodNameFromOrdinal(states.StatefulSetName, podID)] //get #vreps on this podID - var skew int32 - for _, otherPodID := range states.SchedulablePods { //compare with #vreps on other pods - if otherPodID != podID { - otherReps := states.PodSpread[key][state.PodNameFromOrdinal(states.StatefulSetName, otherPodID)] - - if otherReps == 0 && states.Free(otherPodID) <= 0 { //other pod fully occupied by other vpods - so ignore - continue - } - if skew = (currentReps + 1) - otherReps; skew < 0 { - skew = skew * int32(-1) - } - - //logger.Infof("Current Pod %d with %d and Other Pod %d with %d causing skew %d", podID, currentReps, otherPodID, otherReps, skew) - if skew > skewVal.MaxSkew { - logger.Infof("Unschedulable! Pod %d will cause an uneven spread %v with other pod %v", podID, states.PodSpread[key], otherPodID) - return state.NewStatus(state.Unschedulable, ErrReasonUnschedulable) - } - } - } - } - - return state.NewStatus(state.Success) -} - -// Score invoked at the score extension point. The "score" returned in this function is higher for pods that create an even spread across pods. 
-func (pl *EvenPodSpread) Score(ctx context.Context, args interface{}, states *state.State, feasiblePods []int32, key types.NamespacedName, podID int32) (uint64, *state.Status) { - logger := logging.FromContext(ctx).With("Score", pl.Name()) - var score uint64 = 0 - - spreadArgs, ok := args.(string) - if !ok { - logger.Errorf("Scoring args %v for priority %q are not valid", args, pl.Name()) - return 0, state.NewStatus(state.Unschedulable, ErrReasonInvalidArg) - } - - skewVal := state.EvenPodSpreadArgs{} - decoder := json.NewDecoder(strings.NewReader(spreadArgs)) - decoder.DisallowUnknownFields() - if err := decoder.Decode(&skewVal); err != nil { - return 0, state.NewStatus(state.Unschedulable, ErrReasonInvalidArg) - } - - if states.Replicas > 0 { //need at least a pod to compute spread - currentReps := states.PodSpread[key][state.PodNameFromOrdinal(states.StatefulSetName, podID)] //get #vreps on this podID - var skew int32 - for _, otherPodID := range states.SchedulablePods { //compare with #vreps on other pods - if otherPodID != podID { - otherReps := states.PodSpread[key][state.PodNameFromOrdinal(states.StatefulSetName, otherPodID)] - if otherReps == 0 && states.Free(otherPodID) == 0 { //other pod fully occupied by other vpods - so ignore - continue - } - if skew = (currentReps + 1) - otherReps; skew < 0 { - skew = skew * int32(-1) - } - - //logger.Infof("Current Pod %d with %d and Other Pod %d with %d causing skew %d", podID, currentReps, otherPodID, otherReps, skew) - if skew > skewVal.MaxSkew { - logger.Infof("Pod %d will cause an uneven spread %v with other pod %v", podID, states.PodSpread[key], otherPodID) - } - score = score + uint64(skew) - } - } - score = math.MaxUint64 - score //lesser skews get higher score - } - - return score, state.NewStatus(state.Success) -} - -// ScoreExtensions of the Score plugin. -func (pl *EvenPodSpread) ScoreExtensions() state.ScoreExtensions { - return pl -} - -// NormalizeScore invoked after scoring all pods. -func (pl *EvenPodSpread) NormalizeScore(ctx context.Context, states *state.State, scores state.PodScoreList) *state.Status { - return nil -} diff --git a/vendor/knative.dev/eventing/pkg/scheduler/plugins/core/lowestordinalpriority/lowest_ordinal_priority.go b/vendor/knative.dev/eventing/pkg/scheduler/plugins/core/lowestordinalpriority/lowest_ordinal_priority.go deleted file mode 100644 index a7d84ca390..0000000000 --- a/vendor/knative.dev/eventing/pkg/scheduler/plugins/core/lowestordinalpriority/lowest_ordinal_priority.go +++ /dev/null @@ -1,61 +0,0 @@ -/* -Copyright 2021 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package lowestordinalpriority - -import ( - "context" - "math" - - "k8s.io/apimachinery/pkg/types" - "knative.dev/eventing/pkg/scheduler/factory" - state "knative.dev/eventing/pkg/scheduler/state" -) - -// LowestOrdinalPriority is a score plugin that favors pods that have a lower ordinal -type LowestOrdinalPriority struct { -} - -// Verify LowestOrdinalPriority Implements ScorePlugin Interface -var _ state.ScorePlugin = &LowestOrdinalPriority{} - -// Name of the plugin -const Name = state.LowestOrdinalPriority - -func init() { - factory.RegisterSP(Name, &LowestOrdinalPriority{}) -} - -// Name returns name of the plugin -func (pl *LowestOrdinalPriority) Name() string { - return Name -} - -// Score invoked at the score extension point. The "score" returned in this function is higher for pods with lower ordinal values. -func (pl *LowestOrdinalPriority) Score(ctx context.Context, args interface{}, states *state.State, feasiblePods []int32, key types.NamespacedName, podID int32) (uint64, *state.Status) { - score := math.MaxUint64 - uint64(podID) //lower ordinals get higher score - return score, state.NewStatus(state.Success) -} - -// ScoreExtensions of the Score plugin. -func (pl *LowestOrdinalPriority) ScoreExtensions() state.ScoreExtensions { - return pl -} - -// NormalizeScore invoked after scoring all pods. -func (pl *LowestOrdinalPriority) NormalizeScore(ctx context.Context, states *state.State, scores state.PodScoreList) *state.Status { - return nil -} diff --git a/vendor/knative.dev/eventing/pkg/scheduler/plugins/core/podfitsresources/pod_fits_resources.go b/vendor/knative.dev/eventing/pkg/scheduler/plugins/core/podfitsresources/pod_fits_resources.go deleted file mode 100644 index a4a751e847..0000000000 --- a/vendor/knative.dev/eventing/pkg/scheduler/plugins/core/podfitsresources/pod_fits_resources.go +++ /dev/null @@ -1,61 +0,0 @@ -/* -Copyright 2021 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package podfitsresources - -import ( - "context" - - "k8s.io/apimachinery/pkg/types" - "knative.dev/eventing/pkg/scheduler/factory" - state "knative.dev/eventing/pkg/scheduler/state" - "knative.dev/pkg/logging" -) - -// PodFitsResources is a plugin that filters pods that do not have sufficient free capacity for a vreplica to be placed on it -type PodFitsResources struct { -} - -// Verify PodFitsResources Implements FilterPlugin Interface -var _ state.FilterPlugin = &PodFitsResources{} - -// Name of the plugin -const Name = state.PodFitsResources - -const ( - ErrReasonUnschedulable = "pod at full capacity" -) - -func init() { - factory.RegisterFP(Name, &PodFitsResources{}) -} - -// Name returns name of the plugin -func (pl *PodFitsResources) Name() string { - return Name -} - -// Filter invoked at the filter extension point. 
-func (pl *PodFitsResources) Filter(ctx context.Context, args interface{}, states *state.State, key types.NamespacedName, podID int32) *state.Status { - logger := logging.FromContext(ctx).With("Filter", pl.Name()) - - if len(states.FreeCap) == 0 || states.Free(podID) > 0 { //vpods with no placements or pods with positive free cap - return state.NewStatus(state.Success) - } - - logger.Infof("Unschedulable! Pod %d has no free capacity %v", podID, states.FreeCap) - return state.NewStatus(state.Unschedulable, ErrReasonUnschedulable) -} diff --git a/vendor/knative.dev/eventing/pkg/scheduler/plugins/core/removewithavailabilitynodepriority/remove_with_availability_node_priority.go b/vendor/knative.dev/eventing/pkg/scheduler/plugins/core/removewithavailabilitynodepriority/remove_with_availability_node_priority.go deleted file mode 100644 index 62959ee79b..0000000000 --- a/vendor/knative.dev/eventing/pkg/scheduler/plugins/core/removewithavailabilitynodepriority/remove_with_availability_node_priority.go +++ /dev/null @@ -1,113 +0,0 @@ -/* -Copyright 2021 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package removewithavailabilitynodepriority - -import ( - "context" - "encoding/json" - "math" - "strings" - - "k8s.io/apimachinery/pkg/types" - "knative.dev/eventing/pkg/scheduler/factory" - state "knative.dev/eventing/pkg/scheduler/state" - "knative.dev/pkg/logging" -) - -// RemoveWithAvailabilityNodePriority is a score plugin that favors pods that create an even spread of resources across nodes for HA -type RemoveWithAvailabilityNodePriority struct { -} - -// Verify RemoveWithAvailabilityNodePriority Implements ScorePlugin Interface -var _ state.ScorePlugin = &RemoveWithAvailabilityNodePriority{} - -// Name of the plugin -const Name = state.RemoveWithAvailabilityNodePriority - -const ( - ErrReasonInvalidArg = "invalid arguments" - ErrReasonNoResource = "node does not exist" -) - -func init() { - factory.RegisterSP(Name, &RemoveWithAvailabilityNodePriority{}) -} - -// Name returns name of the plugin -func (pl *RemoveWithAvailabilityNodePriority) Name() string { - return Name -} - -// Score invoked at the score extension point. The "score" returned in this function is higher for nodes that create an even spread across nodes. 
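// Editorial sketch (not part of the diff): the PodFitsResources filter deleted above only rejects
// a pod when free capacity is tracked and exhausted. A minimal stand-alone version, with freeCap
// standing in for State.FreeCap indexed by ordinal, could look like this; it is illustrative only.
package sketch

import "errors"

var errPodAtFullCapacity = errors.New("pod at full capacity")

// podFitsResources returns nil (schedulable) when nothing has been placed yet, when the ordinal is
// not tracked yet (treated here as having room), or when the pod still has free vreplica slots.
func podFitsResources(freeCap []int32, podID int32) error {
	if len(freeCap) == 0 || int(podID) >= len(freeCap) || freeCap[podID] > 0 {
		return nil
	}
	return errPodAtFullCapacity
}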
-func (pl *RemoveWithAvailabilityNodePriority) Score(ctx context.Context, args interface{}, states *state.State, feasiblePods []int32, key types.NamespacedName, podID int32) (uint64, *state.Status) { - logger := logging.FromContext(ctx).With("Score", pl.Name()) - var score uint64 = 0 - - spreadArgs, ok := args.(string) - if !ok { - logger.Errorf("Scoring args %v for priority %q are not valid", args, pl.Name()) - return 0, state.NewStatus(state.Unschedulable, ErrReasonInvalidArg) - } - - skewVal := state.AvailabilityNodePriorityArgs{} - decoder := json.NewDecoder(strings.NewReader(spreadArgs)) - decoder.DisallowUnknownFields() - if err := decoder.Decode(&skewVal); err != nil { - return 0, state.NewStatus(state.Unschedulable, ErrReasonInvalidArg) - } - - if states.Replicas > 0 { //need at least a pod to compute spread - var skew int32 - _, nodeName, err := states.GetPodInfo(state.PodNameFromOrdinal(states.StatefulSetName, podID)) - if err != nil { - return score, state.NewStatus(state.Error, ErrReasonNoResource) - } - - currentReps := states.NodeSpread[key][nodeName] //get #vreps on this node - for otherNodeName := range states.NodeToZoneMap { //compare with #vreps on other pods - if otherNodeName != nodeName { - otherReps, ok := states.NodeSpread[key][otherNodeName] - if !ok { - continue //node does not exist in current placement, so move on - } - if skew = (currentReps - 1) - otherReps; skew < 0 { - skew = skew * int32(-1) - } - - //logger.Infof("Current Node %v with %d and Other Node %v with %d causing skew %d", nodeName, currentReps, otherNodeName, otherReps, skew) - if skew > skewVal.MaxSkew { //score low - logger.Infof("Pod %d in node %v will cause an uneven node spread %v with other node %v", podID, nodeName, states.NodeSpread[key], otherNodeName) - } - score = score + uint64(skew) - } - } - - score = math.MaxUint64 - score //lesser skews get higher score - } - - return score, state.NewStatus(state.Success) -} - -// ScoreExtensions of the Score plugin. -func (pl *RemoveWithAvailabilityNodePriority) ScoreExtensions() state.ScoreExtensions { - return pl -} - -// NormalizeScore invoked after scoring all pods. -func (pl *RemoveWithAvailabilityNodePriority) NormalizeScore(ctx context.Context, states *state.State, scores state.PodScoreList) *state.Status { - return nil -} diff --git a/vendor/knative.dev/eventing/pkg/scheduler/plugins/core/removewithavailabilityzonepriority/remove_with_availability_zone_priority.go b/vendor/knative.dev/eventing/pkg/scheduler/plugins/core/removewithavailabilityzonepriority/remove_with_availability_zone_priority.go deleted file mode 100644 index f2e3eb23f0..0000000000 --- a/vendor/knative.dev/eventing/pkg/scheduler/plugins/core/removewithavailabilityzonepriority/remove_with_availability_zone_priority.go +++ /dev/null @@ -1,118 +0,0 @@ -/* -Copyright 2021 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
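// Editorial sketch (not part of the diff): each policy plugin removed here received its Args as a
// raw JSON string (for example "{\"MaxSkew\": 2}") and decoded it strictly. The helper below shows
// that decode step in isolation; maxSkewArgs is an illustrative stand-in for
// state.EvenPodSpreadArgs and the other MaxSkew argument structs.
package sketch

import (
	"encoding/json"
	"fmt"
	"strings"
)

type maxSkewArgs struct {
	MaxSkew int32
}

// decodeMaxSkew rejects non-string args and unknown fields, mirroring how the deleted plugins
// treated malformed configuration as "invalid arguments".
func decodeMaxSkew(args interface{}) (maxSkewArgs, error) {
	raw, ok := args.(string)
	if !ok {
		return maxSkewArgs{}, fmt.Errorf("invalid arguments: %v", args)
	}
	var out maxSkewArgs
	dec := json.NewDecoder(strings.NewReader(raw))
	dec.DisallowUnknownFields()
	if err := dec.Decode(&out); err != nil {
		return maxSkewArgs{}, fmt.Errorf("invalid arguments: %w", err)
	}
	return out, nil
}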
-*/ - -package removewithavailabilityzonepriority - -import ( - "context" - "encoding/json" - "math" - "strings" - - "k8s.io/apimachinery/pkg/types" - "knative.dev/eventing/pkg/scheduler/factory" - state "knative.dev/eventing/pkg/scheduler/state" - "knative.dev/pkg/logging" -) - -// RemoveWithAvailabilityZonePriority is a score plugin that favors pods that create an even spread of resources across zones for HA -type RemoveWithAvailabilityZonePriority struct { -} - -// Verify RemoveWithAvailabilityZonePriority Implements ScorePlugin Interface -var _ state.ScorePlugin = &RemoveWithAvailabilityZonePriority{} - -// Name of the plugin -const Name = state.RemoveWithAvailabilityZonePriority - -const ( - ErrReasonInvalidArg = "invalid arguments" - ErrReasonNoResource = "zone does not exist" -) - -func init() { - factory.RegisterSP(Name, &RemoveWithAvailabilityZonePriority{}) -} - -// Name returns name of the plugin -func (pl *RemoveWithAvailabilityZonePriority) Name() string { - return Name -} - -// Score invoked at the score extension point. The "score" returned in this function is higher for zones that create an even spread across zones. -func (pl *RemoveWithAvailabilityZonePriority) Score(ctx context.Context, args interface{}, states *state.State, feasiblePods []int32, key types.NamespacedName, podID int32) (uint64, *state.Status) { - logger := logging.FromContext(ctx).With("Score", pl.Name()) - var score uint64 = 0 - - spreadArgs, ok := args.(string) - if !ok { - logger.Errorf("Scoring args %v for priority %q are not valid", args, pl.Name()) - return 0, state.NewStatus(state.Unschedulable, ErrReasonInvalidArg) - } - - skewVal := state.AvailabilityZonePriorityArgs{} - decoder := json.NewDecoder(strings.NewReader(spreadArgs)) - decoder.DisallowUnknownFields() - if err := decoder.Decode(&skewVal); err != nil { - return 0, state.NewStatus(state.Unschedulable, ErrReasonInvalidArg) - } - - if states.Replicas > 0 { //need at least a pod to compute spread - var skew int32 - zoneMap := make(map[string]struct{}) - for _, zoneName := range states.NodeToZoneMap { - zoneMap[zoneName] = struct{}{} - } - - zoneName, _, err := states.GetPodInfo(state.PodNameFromOrdinal(states.StatefulSetName, podID)) - if err != nil { - return score, state.NewStatus(state.Error, ErrReasonNoResource) - } - - currentReps := states.ZoneSpread[key][zoneName] //get #vreps on this zone - for otherZoneName := range zoneMap { //compare with #vreps on other pods - if otherZoneName != zoneName { - otherReps, ok := states.ZoneSpread[key][otherZoneName] - if !ok { - continue //zone does not exist in current placement, so move on - } - if skew = (currentReps - 1) - otherReps; skew < 0 { - skew = skew * int32(-1) - } - - //logger.Infof("Current Zone %v with %d and Other Zone %v with %d causing skew %d", zoneName, currentReps, otherZoneName, otherReps, skew) - if skew > skewVal.MaxSkew { //score low - logger.Infof("Pod %d in zone %v will cause an uneven zone spread %v with other zone %v", podID, zoneName, states.ZoneSpread[key], otherZoneName) - } - score = score + uint64(skew) - } - } - - score = math.MaxUint64 - score //lesser skews get higher score - } - - return score, state.NewStatus(state.Success) -} - -// ScoreExtensions of the Score plugin. -func (pl *RemoveWithAvailabilityZonePriority) ScoreExtensions() state.ScoreExtensions { - return pl -} - -// NormalizeScore invoked after scoring all pods. 
-func (pl *RemoveWithAvailabilityZonePriority) NormalizeScore(ctx context.Context, states *state.State, scores state.PodScoreList) *state.Status { - return nil -} diff --git a/vendor/knative.dev/eventing/pkg/scheduler/plugins/core/removewithevenpodspreadpriority/remove_with_even_pod_spread_priority.go b/vendor/knative.dev/eventing/pkg/scheduler/plugins/core/removewithevenpodspreadpriority/remove_with_even_pod_spread_priority.go deleted file mode 100644 index e7b008e0b0..0000000000 --- a/vendor/knative.dev/eventing/pkg/scheduler/plugins/core/removewithevenpodspreadpriority/remove_with_even_pod_spread_priority.go +++ /dev/null @@ -1,106 +0,0 @@ -/* -Copyright 2021 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package removewithevenpodspreadpriority - -import ( - "context" - "encoding/json" - "math" - "strings" - - "k8s.io/apimachinery/pkg/types" - "knative.dev/eventing/pkg/scheduler/factory" - state "knative.dev/eventing/pkg/scheduler/state" - "knative.dev/pkg/logging" -) - -// RemoveWithEvenPodSpreadPriority is a filter plugin that eliminates pods that do not create an equal spread of resources across pods -type RemoveWithEvenPodSpreadPriority struct { -} - -// Verify RemoveWithEvenPodSpreadPriority Implements FilterPlugin Interface -var _ state.ScorePlugin = &RemoveWithEvenPodSpreadPriority{} - -// Name of the plugin -const ( - Name = state.RemoveWithEvenPodSpreadPriority - ErrReasonInvalidArg = "invalid arguments" - ErrReasonUnschedulable = "pod will cause an uneven spread" -) - -func init() { - factory.RegisterSP(Name, &RemoveWithEvenPodSpreadPriority{}) -} - -// Name returns name of the plugin -func (pl *RemoveWithEvenPodSpreadPriority) Name() string { - return Name -} - -// Score invoked at the score extension point. The "score" returned in this function is higher for pods that create an even spread across pods. 
-func (pl *RemoveWithEvenPodSpreadPriority) Score(ctx context.Context, args interface{}, states *state.State, feasiblePods []int32, key types.NamespacedName, podID int32) (uint64, *state.Status) { - logger := logging.FromContext(ctx).With("Score", pl.Name()) - var score uint64 = 0 - - spreadArgs, ok := args.(string) - if !ok { - logger.Errorf("Scoring args %v for priority %q are not valid", args, pl.Name()) - return 0, state.NewStatus(state.Unschedulable, ErrReasonInvalidArg) - } - - skewVal := state.EvenPodSpreadArgs{} - decoder := json.NewDecoder(strings.NewReader(spreadArgs)) - decoder.DisallowUnknownFields() - if err := decoder.Decode(&skewVal); err != nil { - return 0, state.NewStatus(state.Unschedulable, ErrReasonInvalidArg) - } - - if states.Replicas > 0 { //need at least a pod to compute spread - currentReps := states.PodSpread[key][state.PodNameFromOrdinal(states.StatefulSetName, podID)] //get #vreps on this podID - var skew int32 - for _, otherPodID := range states.SchedulablePods { //compare with #vreps on other pods - if otherPodID != podID { - otherReps, ok := states.PodSpread[key][state.PodNameFromOrdinal(states.StatefulSetName, otherPodID)] - if !ok { - continue //pod does not exist in current placement, so move on - } - if skew = (currentReps - 1) - otherReps; skew < 0 { - skew = skew * int32(-1) - } - - //logger.Infof("Current Pod %v with %d and Other Pod %v with %d causing skew %d", podID, currentReps, otherPodID, otherReps, skew) - if skew > skewVal.MaxSkew { //score low - logger.Infof("Pod %d will cause an uneven spread %v with other pod %v", podID, states.PodSpread[key], otherPodID) - } - score = score + uint64(skew) - } - } - score = math.MaxUint64 - score //lesser skews get higher score - } - - return score, state.NewStatus(state.Success) -} - -// ScoreExtensions of the Score plugin. -func (pl *RemoveWithEvenPodSpreadPriority) ScoreExtensions() state.ScoreExtensions { - return pl -} - -// NormalizeScore invoked after scoring all pods. -func (pl *RemoveWithEvenPodSpreadPriority) NormalizeScore(ctx context.Context, states *state.State, scores state.PodScoreList) *state.Status { - return nil -} diff --git a/vendor/knative.dev/eventing/pkg/scheduler/plugins/core/removewithhighestordinalpriority/remove_with_highest_ordinal_priority.go b/vendor/knative.dev/eventing/pkg/scheduler/plugins/core/removewithhighestordinalpriority/remove_with_highest_ordinal_priority.go deleted file mode 100644 index 324454f5e8..0000000000 --- a/vendor/knative.dev/eventing/pkg/scheduler/plugins/core/removewithhighestordinalpriority/remove_with_highest_ordinal_priority.go +++ /dev/null @@ -1,60 +0,0 @@ -/* -Copyright 2021 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package removewithhighestordinalpriority - -import ( - "context" - - "k8s.io/apimachinery/pkg/types" - "knative.dev/eventing/pkg/scheduler/factory" - state "knative.dev/eventing/pkg/scheduler/state" -) - -// RemoveWithHighestOrdinalPriority is a score plugin that favors pods that have a higher ordinal -type RemoveWithHighestOrdinalPriority struct { -} - -// Verify RemoveWithHighestOrdinalPriority Implements ScorePlugin Interface -var _ state.ScorePlugin = &RemoveWithHighestOrdinalPriority{} - -// Name of the plugin -const Name = state.RemoveWithHighestOrdinalPriority - -func init() { - factory.RegisterSP(Name, &RemoveWithHighestOrdinalPriority{}) -} - -// Name returns name of the plugin -func (pl *RemoveWithHighestOrdinalPriority) Name() string { - return Name -} - -// Score invoked at the score extension point. The "score" returned in this function is higher for pods with higher ordinal values. -func (pl *RemoveWithHighestOrdinalPriority) Score(ctx context.Context, args interface{}, states *state.State, feasiblePods []int32, key types.NamespacedName, podID int32) (uint64, *state.Status) { - score := uint64(podID) //higher ordinals get higher score - return score, state.NewStatus(state.Success) -} - -// ScoreExtensions of the Score plugin. -func (pl *RemoveWithHighestOrdinalPriority) ScoreExtensions() state.ScoreExtensions { - return pl -} - -// NormalizeScore invoked after scoring all pods. -func (pl *RemoveWithHighestOrdinalPriority) NormalizeScore(ctx context.Context, states *state.State, scores state.PodScoreList) *state.Status { - return nil -} diff --git a/vendor/knative.dev/eventing/pkg/scheduler/plugins/kafka/nomaxresourcecount/no_max_resource_count.go b/vendor/knative.dev/eventing/pkg/scheduler/plugins/kafka/nomaxresourcecount/no_max_resource_count.go deleted file mode 100644 index 49975eefb8..0000000000 --- a/vendor/knative.dev/eventing/pkg/scheduler/plugins/kafka/nomaxresourcecount/no_max_resource_count.go +++ /dev/null @@ -1,78 +0,0 @@ -/* -Copyright 2021 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package nomaxresourcecount - -import ( - "context" - "encoding/json" - "strings" - - "k8s.io/apimachinery/pkg/types" - "knative.dev/eventing/pkg/scheduler/factory" - state "knative.dev/eventing/pkg/scheduler/state" - "knative.dev/pkg/logging" -) - -// NoMaxResourceCount plugin filters pods that cause total pods with placements to exceed total partitioncount. -type NoMaxResourceCount struct { -} - -// Verify NoMaxResourceCount Implements FilterPlugin Interface -var _ state.FilterPlugin = &NoMaxResourceCount{} - -// Name of the plugin -const Name = state.NoMaxResourceCount - -const ( - ErrReasonInvalidArg = "invalid arguments" - ErrReasonUnschedulable = "pod increases total # of pods beyond partition count" -) - -func init() { - factory.RegisterFP(Name, &NoMaxResourceCount{}) -} - -// Name returns name of the plugin -func (pl *NoMaxResourceCount) Name() string { - return Name -} - -// Filter invoked at the filter extension point. 
-func (pl *NoMaxResourceCount) Filter(ctx context.Context, args interface{}, states *state.State, key types.NamespacedName, podID int32) *state.Status { - logger := logging.FromContext(ctx).With("Filter", pl.Name()) - - resourceCountArgs, ok := args.(string) - if !ok { - logger.Errorf("Filter args %v for predicate %q are not valid", args, pl.Name()) - return state.NewStatus(state.Unschedulable, ErrReasonInvalidArg) - } - - resVal := state.NoMaxResourceCountArgs{} - decoder := json.NewDecoder(strings.NewReader(resourceCountArgs)) - decoder.DisallowUnknownFields() - if err := decoder.Decode(&resVal); err != nil { - return state.NewStatus(state.Unschedulable, ErrReasonInvalidArg) - } - - podName := state.PodNameFromOrdinal(states.StatefulSetName, podID) - if _, ok := states.PodSpread[key][podName]; !ok && ((len(states.PodSpread[key]) + 1) > resVal.NumPartitions) { //pod not in vrep's partition map and counting this new pod towards total pod count - logger.Infof("Unschedulable! Pod %d filtered due to total pod count %v exceeding partition count", podID, len(states.PodSpread[key])+1) - return state.NewStatus(state.Unschedulable, ErrReasonUnschedulable) - } - - return state.NewStatus(state.Success) -} diff --git a/vendor/knative.dev/eventing/pkg/scheduler/scheduler.go b/vendor/knative.dev/eventing/pkg/scheduler/scheduler.go index a9ca7b1d5a..62dcf163d2 100644 --- a/vendor/knative.dev/eventing/pkg/scheduler/scheduler.go +++ b/vendor/knative.dev/eventing/pkg/scheduler/scheduler.go @@ -30,57 +30,12 @@ import ( duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" ) -type SchedulerPolicyType string - const ( - // MAXFILLUP policy type adds vreplicas to existing pods to fill them up before adding to new pods - MAXFILLUP SchedulerPolicyType = "MAXFILLUP" - // PodAnnotationKey is an annotation used by the scheduler to be informed of pods // being evicted and not use it for placing vreplicas PodAnnotationKey = "eventing.knative.dev/unschedulable" ) -const ( - ZoneLabel = "topology.kubernetes.io/zone" - - UnknownZone = "unknown" -) - -const ( - // MaxWeight is the maximum weight that can be assigned for a priority. - MaxWeight uint64 = 10 - // MinWeight is the minimum weight that can be assigned for a priority. - MinWeight uint64 = 0 -) - -// Policy describes a struct of a policy resource. -type SchedulerPolicy struct { - // Holds the information to configure the fit predicate functions. - Predicates []PredicatePolicy `json:"predicates"` - // Holds the information to configure the priority functions. - Priorities []PriorityPolicy `json:"priorities"` -} - -// PredicatePolicy describes a struct of a predicate policy. -type PredicatePolicy struct { - // Identifier of the predicate policy - Name string `json:"name"` - // Holds the parameters to configure the given predicate - Args interface{} `json:"args"` -} - -// PriorityPolicy describes a struct of a priority policy. 
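// Editorial sketch (not part of the diff): the Kafka-specific NoMaxResourceCount filter deleted
// above capped the number of distinct pods a vpod may be spread over at its partition count. A
// simplified stand-alone check, with podSpread standing in for State.PodSpread[key]:
package sketch

import "fmt"

// exceedsPartitionCount reports an error when adding candidate as a brand-new pod for this vpod
// would push the number of pods with placements beyond numPartitions.
func exceedsPartitionCount(podSpread map[string]int32, candidate string, numPartitions int) error {
	if _, placed := podSpread[candidate]; !placed && len(podSpread)+1 > numPartitions {
		return fmt.Errorf("pod increases total # of pods beyond partition count %d", numPartitions)
	}
	return nil
}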
-type PriorityPolicy struct { - // Identifier of the priority policy - Name string `json:"name"` - // The numeric multiplier for the pod scores that the priority function generates - // The weight should be a positive integer - Weight uint64 `json:"weight"` - // Holds the parameters to configure the given priority function - Args interface{} `json:"args"` -} - // VPodLister is the function signature for returning a list of VPods type VPodLister func() ([]VPod, error) diff --git a/vendor/knative.dev/eventing/pkg/scheduler/state/helpers.go b/vendor/knative.dev/eventing/pkg/scheduler/state/helpers.go index ad3a5aaf76..db5d9216b2 100644 --- a/vendor/knative.dev/eventing/pkg/scheduler/state/helpers.go +++ b/vendor/knative.dev/eventing/pkg/scheduler/state/helpers.go @@ -17,14 +17,10 @@ limitations under the License. package state import ( - "context" - "math" "strconv" "strings" - "time" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" "knative.dev/eventing/pkg/scheduler" ) @@ -36,7 +32,7 @@ func PodNameFromOrdinal(name string, ordinal int32) string { func OrdinalFromPodName(podName string) int32 { ordinal, err := strconv.ParseInt(podName[strings.LastIndex(podName, "-")+1:], 10, 32) if err != nil { - return math.MaxInt32 + panic(podName + " is not a valid pod name") } return int32(ordinal) } @@ -50,31 +46,3 @@ func GetVPod(key types.NamespacedName, vpods []scheduler.VPod) scheduler.VPod { } return nil } - -func SatisfyZoneAvailability(feasiblePods []int32, states *State) bool { - zoneMap := make(map[string]struct{}) - var zoneName string - var err error - for _, podID := range feasiblePods { - zoneName, _, err = states.GetPodInfo(PodNameFromOrdinal(states.StatefulSetName, podID)) - if err != nil { - continue - } - zoneMap[zoneName] = struct{}{} - } - return len(zoneMap) == int(states.NumZones) -} - -func SatisfyNodeAvailability(feasiblePods []int32, states *State) bool { - nodeMap := make(map[string]struct{}) - var nodeName string - var err error - for _, podID := range feasiblePods { - wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { - _, nodeName, err = states.GetPodInfo(PodNameFromOrdinal(states.StatefulSetName, podID)) - return err == nil, nil - }) - nodeMap[nodeName] = struct{}{} - } - return len(nodeMap) == int(states.NumNodes) -} diff --git a/vendor/knative.dev/eventing/pkg/scheduler/state/interface.go b/vendor/knative.dev/eventing/pkg/scheduler/state/interface.go deleted file mode 100644 index 44c7a2d4d4..0000000000 --- a/vendor/knative.dev/eventing/pkg/scheduler/state/interface.go +++ /dev/null @@ -1,209 +0,0 @@ -/* -Copyright 2021 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package state - -import ( - "context" - "errors" - "strings" - - "k8s.io/apimachinery/pkg/types" -) - -const ( - PodFitsResources = "PodFitsResources" - NoMaxResourceCount = "NoMaxResourceCount" - EvenPodSpread = "EvenPodSpread" - AvailabilityNodePriority = "AvailabilityNodePriority" - AvailabilityZonePriority = "AvailabilityZonePriority" - LowestOrdinalPriority = "LowestOrdinalPriority" - RemoveWithEvenPodSpreadPriority = "RemoveWithEvenPodSpreadPriority" - RemoveWithAvailabilityNodePriority = "RemoveWithAvailabilityNodePriority" - RemoveWithAvailabilityZonePriority = "RemoveWithAvailabilityZonePriority" - RemoveWithHighestOrdinalPriority = "RemoveWithHighestOrdinalPriority" -) - -// Plugin is the parent type for all the scheduling framework plugins. -type Plugin interface { - Name() string -} - -type FilterPlugin interface { - Plugin - // Filter is called by the scheduler. - // All FilterPlugins should return "Success" to declare that - // the given pod fits the vreplica. - Filter(ctx context.Context, args interface{}, state *State, key types.NamespacedName, podID int32) *Status -} - -// ScoreExtensions is an interface for Score extended functionality. -type ScoreExtensions interface { - // NormalizeScore is called for all pod scores produced by the same plugin's "Score" - // method. A successful run of NormalizeScore will update the scores list and return - // a success status. - NormalizeScore(ctx context.Context, state *State, scores PodScoreList) *Status -} - -type ScorePlugin interface { - Plugin - // Score is called by the scheduler. - // All ScorePlugins should return "Success" unless the args are invalid. - Score(ctx context.Context, args interface{}, state *State, feasiblePods []int32, key types.NamespacedName, podID int32) (uint64, *Status) - - // ScoreExtensions returns a ScoreExtensions interface if it implements one, or nil if does not - ScoreExtensions() ScoreExtensions -} - -// NoMaxResourceCountArgs holds arguments used to configure the NoMaxResourceCount plugin. -type NoMaxResourceCountArgs struct { - NumPartitions int -} - -// EvenPodSpreadArgs holds arguments used to configure the EvenPodSpread plugin. -type EvenPodSpreadArgs struct { - MaxSkew int32 -} - -// AvailabilityZonePriorityArgs holds arguments used to configure the AvailabilityZonePriority plugin. -type AvailabilityZonePriorityArgs struct { - MaxSkew int32 -} - -// AvailabilityNodePriorityArgs holds arguments used to configure the AvailabilityNodePriority plugin. -type AvailabilityNodePriorityArgs struct { - MaxSkew int32 -} - -// Code is the Status code/type which is returned from plugins. -type Code int - -// These are predefined codes used in a Status. -const ( - // Success means that plugin ran correctly and found pod schedulable. - Success Code = iota - // Unschedulable is used when a plugin finds a pod unschedulable due to not satisying the predicate. - Unschedulable - // Error is used for internal plugin errors, unexpected input, etc. - Error -) - -// Status indicates the result of running a plugin. -type Status struct { - code Code - reasons []string - err error -} - -// Code returns code of the Status. -func (s *Status) Code() Code { - if s == nil { - return Success - } - return s.code -} - -// Message returns a concatenated message on reasons of the Status. -func (s *Status) Message() string { - if s == nil { - return "" - } - return strings.Join(s.reasons, ", ") -} - -// NewStatus makes a Status out of the given arguments and returns its pointer. 
-func NewStatus(code Code, reasons ...string) *Status { - s := &Status{ - code: code, - reasons: reasons, - } - if code == Error { - s.err = errors.New(s.Message()) - } - return s -} - -// AsStatus wraps an error in a Status. -func AsStatus(err error) *Status { - return &Status{ - code: Error, - reasons: []string{err.Error()}, - err: err, - } -} - -// AsError returns nil if the status is a success; otherwise returns an "error" object -// with a concatenated message on reasons of the Status. -func (s *Status) AsError() error { - if s.IsSuccess() { - return nil - } - if s.err != nil { - return s.err - } - return errors.New(s.Message()) -} - -// IsSuccess returns true if and only if "Status" is nil or Code is "Success". -func (s *Status) IsSuccess() bool { - return s.Code() == Success -} - -// IsError returns true if and only if "Status" is "Error". -func (s *Status) IsError() bool { - return s.Code() == Error -} - -// IsUnschedulable returns true if "Status" is Unschedulable -func (s *Status) IsUnschedulable() bool { - return s.Code() == Unschedulable -} - -type PodScore struct { - ID int32 - Score uint64 -} - -type PodScoreList []PodScore - -// PluginToPodScores declares a map from plugin name to its PodScoreList. -type PluginToPodScores map[string]PodScoreList - -// PluginToStatus maps plugin name to status. Currently used to identify which Filter plugin -// returned which status. -type PluginToStatus map[string]*Status - -// Merge merges the statuses in the map into one. The resulting status code have the following -// precedence: Error, Unschedulable, Success -func (p PluginToStatus) Merge() *Status { - if len(p) == 0 { - return nil - } - - finalStatus := NewStatus(Success) - for _, s := range p { - if s.Code() == Error { - finalStatus.err = s.AsError() - } - if s.Code() > finalStatus.code { - finalStatus.code = s.Code() - } - - finalStatus.reasons = append(finalStatus.reasons, s.reasons...) - } - - return finalStatus -} diff --git a/vendor/knative.dev/eventing/pkg/scheduler/state/state.go b/vendor/knative.dev/eventing/pkg/scheduler/state/state.go index 44069babe9..9d4503b915 100644 --- a/vendor/knative.dev/eventing/pkg/scheduler/state/state.go +++ b/vendor/knative.dev/eventing/pkg/scheduler/state/state.go @@ -19,14 +19,12 @@ package state import ( "context" "encoding/json" - "errors" "math" "strconv" "go.uber.org/zap" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" corev1 "k8s.io/client-go/listers/core/v1" @@ -39,7 +37,7 @@ type StateAccessor interface { // State returns the current state (snapshot) about placed vpods // Take into account reserved vreplicas and update `reserved` to reflect // the current state. - State(ctx context.Context, reserved map[types.NamespacedName]map[string]int32) (*State, error) + State(ctx context.Context) (*State, error) } // state provides information about the current scheduling of all vpods @@ -61,24 +59,6 @@ type State struct { // Replicas is the (cached) number of statefulset replicas. 
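// Editorial sketch (not part of the diff): the deleted PluginToStatus.Merge relied on the Code
// ordering (Success < Unschedulable < Error) so the merged result takes the most severe code and
// keeps every reason. The same precedence rule in miniature, with local stand-in types:
package sketch

type code int

const (
	success code = iota
	unschedulable
	failure
)

type status struct {
	code    code
	reasons []string
}

// merge keeps the highest (most severe) code seen and concatenates all reasons, which is what the
// removed Merge did across filter plugin results.
func merge(statuses []status) status {
	final := status{code: success}
	for _, s := range statuses {
		if s.code > final.code {
			final.code = s.code
		}
		final.reasons = append(final.reasons, s.reasons...)
	}
	return final
}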
Replicas int32 - // Number of available zones in cluster - NumZones int32 - - // Number of available nodes in cluster - NumNodes int32 - - // Scheduling policy type for placing vreplicas on pods - SchedulerPolicy scheduler.SchedulerPolicyType - - // Scheduling policy plugin for placing vreplicas on pods - SchedPolicy *scheduler.SchedulerPolicy - - // De-scheduling policy plugin for removing vreplicas from pods - DeschedPolicy *scheduler.SchedulerPolicy - - // Mapping node names of nodes currently in cluster to their zone info - NodeToZoneMap map[string]string - StatefulSetName string PodLister corev1.PodNamespaceLister @@ -86,12 +66,6 @@ type State struct { // Stores for each vpod, a map of podname to number of vreplicas placed on that pod currently PodSpread map[types.NamespacedName]map[string]int32 - // Stores for each vpod, a map of nodename to total number of vreplicas placed on all pods running on that node currently - NodeSpread map[types.NamespacedName]map[string]int32 - - // Stores for each vpod, a map of zonename to total number of vreplicas placed on all pods located in that zone currently - ZoneSpread map[types.NamespacedName]map[string]int32 - // Pending tracks the number of virtual replicas that haven't been scheduled yet // because there wasn't enough free capacity. Pending map[types.NamespacedName]int32 @@ -114,7 +88,7 @@ func (s *State) SetFree(ordinal int32, value int32) { s.FreeCap[int(ordinal)] = value } -// freeCapacity returns the number of vreplicas that can be used, +// FreeCapacity returns the number of vreplicas that can be used, // up to the last ordinal func (s *State) FreeCapacity() int32 { t := int32(0) @@ -124,20 +98,6 @@ func (s *State) FreeCapacity() int32 { return t } -func (s *State) GetPodInfo(podName string) (zoneName string, nodeName string, err error) { - pod, err := s.PodLister.Get(podName) - if err != nil { - return zoneName, nodeName, err - } - - nodeName = pod.Spec.NodeName - zoneName, ok := s.NodeToZoneMap[nodeName] - if !ok { - return zoneName, nodeName, errors.New("could not find zone") - } - return zoneName, nodeName, nil -} - func (s *State) IsSchedulablePod(ordinal int32) bool { for _, x := range s.SchedulablePods { if x == ordinal { @@ -151,32 +111,24 @@ func (s *State) IsSchedulablePod(ordinal int32) bool { type stateBuilder struct { vpodLister scheduler.VPodLister capacity int32 - schedulerPolicy scheduler.SchedulerPolicyType - nodeLister corev1.NodeLister statefulSetCache *scheduler.ScaleCache statefulSetName string podLister corev1.PodNamespaceLister - schedPolicy *scheduler.SchedulerPolicy - deschedPolicy *scheduler.SchedulerPolicy } // NewStateBuilder returns a StateAccessor recreating the state from scratch each time it is requested -func NewStateBuilder(sfsname string, lister scheduler.VPodLister, podCapacity int32, schedulerPolicy scheduler.SchedulerPolicyType, schedPolicy, deschedPolicy *scheduler.SchedulerPolicy, podlister corev1.PodNamespaceLister, nodeLister corev1.NodeLister, statefulSetCache *scheduler.ScaleCache) StateAccessor { +func NewStateBuilder(sfsname string, lister scheduler.VPodLister, podCapacity int32, podlister corev1.PodNamespaceLister, statefulSetCache *scheduler.ScaleCache) StateAccessor { return &stateBuilder{ vpodLister: lister, capacity: podCapacity, - schedulerPolicy: schedulerPolicy, - nodeLister: nodeLister, statefulSetCache: statefulSetCache, statefulSetName: sfsname, podLister: podlister, - schedPolicy: schedPolicy, - deschedPolicy: deschedPolicy, } } -func (s *stateBuilder) State(ctx context.Context, 
reserved map[types.NamespacedName]map[string]int32) (*State, error) { +func (s *stateBuilder) State(ctx context.Context) (*State, error) { vpods, err := s.vpodLister() if err != nil { return nil, err @@ -201,34 +153,6 @@ func (s *stateBuilder) State(ctx context.Context, reserved map[types.NamespacedN withPlacement := make(map[types.NamespacedName]map[string]bool) podSpread := make(map[types.NamespacedName]map[string]int32) - nodeSpread := make(map[types.NamespacedName]map[string]int32) - zoneSpread := make(map[types.NamespacedName]map[string]int32) - - //Build the node to zone map - nodes, err := s.nodeLister.List(labels.Everything()) - if err != nil { - return nil, err - } - - nodeToZoneMap := make(map[string]string) - zoneMap := make(map[string]struct{}) - for i := 0; i < len(nodes); i++ { - node := nodes[i] - - if isNodeUnschedulable(node) { - // Ignore node that is currently unschedulable. - continue - } - - zoneName, ok := node.GetLabels()[scheduler.ZoneLabel] - if ok && zoneName != "" { - nodeToZoneMap[node.Name] = zoneName - zoneMap[zoneName] = struct{}{} - } else { - nodeToZoneMap[node.Name] = scheduler.UnknownZone - zoneMap[scheduler.UnknownZone] = struct{}{} - } - } for podId := int32(0); podId < scale.Spec.Replicas && s.podLister != nil; podId++ { pod, err := s.podLister.Get(PodNameFromOrdinal(s.statefulSetName, podId)) @@ -242,17 +166,6 @@ func (s *stateBuilder) State(ctx context.Context, reserved map[types.NamespacedN continue } - node, err := s.nodeLister.Get(pod.Spec.NodeName) - if err != nil { - return nil, err - } - - if isNodeUnschedulable(node) { - // Node is marked as Unschedulable - CANNOT SCHEDULE VREPS on a pod running on this node. - logger.Debugw("Pod is on an unschedulable node", zap.Any("pod", node)) - continue - } - // Pod has no annotation or not annotated as unschedulable and // not on an unschedulable node, so add to feasible schedulablePods.Insert(podId) @@ -271,16 +184,11 @@ func (s *stateBuilder) State(ctx context.Context, reserved map[types.NamespacedN withPlacement[vpod.GetKey()] = make(map[string]bool) podSpread[vpod.GetKey()] = make(map[string]int32) - nodeSpread[vpod.GetKey()] = make(map[string]int32) - zoneSpread[vpod.GetKey()] = make(map[string]int32) for i := 0; i < len(ps); i++ { podName := ps[i].PodName vreplicas := ps[i].VReplicas - // Account for reserved vreplicas - vreplicas = withReserved(vpod.GetKey(), podName, vreplicas, reserved) - free, last = s.updateFreeCapacity(logger, free, last, podName, vreplicas) withPlacement[vpod.GetKey()][podName] = true @@ -291,47 +199,15 @@ func (s *stateBuilder) State(ctx context.Context, reserved map[types.NamespacedN } if pod != nil && schedulablePods.Has(OrdinalFromPodName(pod.GetName())) { - nodeName := pod.Spec.NodeName //node name for this pod - zoneName := nodeToZoneMap[nodeName] //zone name for this pod podSpread[vpod.GetKey()][podName] = podSpread[vpod.GetKey()][podName] + vreplicas - nodeSpread[vpod.GetKey()][nodeName] = nodeSpread[vpod.GetKey()][nodeName] + vreplicas - zoneSpread[vpod.GetKey()][zoneName] = zoneSpread[vpod.GetKey()][zoneName] + vreplicas - } - } - } - - // Account for reserved vreplicas with no prior placements - for key, ps := range reserved { - for podName, rvreplicas := range ps { - if wp, ok := withPlacement[key]; ok { - if _, ok := wp[podName]; ok { - // already accounted for - continue - } - - pod, err := s.podLister.Get(podName) - if err != nil { - logger.Warnw("Failed to get pod", zap.String("podName", podName), zap.Error(err)) - } - - if pod != nil && 
schedulablePods.Has(OrdinalFromPodName(pod.GetName())) { - nodeName := pod.Spec.NodeName //node name for this pod - zoneName := nodeToZoneMap[nodeName] //zone name for this pod - podSpread[key][podName] = podSpread[key][podName] + rvreplicas - nodeSpread[key][nodeName] = nodeSpread[key][nodeName] + rvreplicas - zoneSpread[key][zoneName] = zoneSpread[key][zoneName] + rvreplicas - } } - - free, last = s.updateFreeCapacity(logger, free, last, podName, rvreplicas) } } - state := &State{FreeCap: free, SchedulablePods: schedulablePods.List(), LastOrdinal: last, Capacity: s.capacity, Replicas: scale.Spec.Replicas, NumZones: int32(len(zoneMap)), NumNodes: int32(len(nodeToZoneMap)), - SchedulerPolicy: s.schedulerPolicy, SchedPolicy: s.schedPolicy, DeschedPolicy: s.deschedPolicy, NodeToZoneMap: nodeToZoneMap, StatefulSetName: s.statefulSetName, PodLister: s.podLister, - PodSpread: podSpread, NodeSpread: nodeSpread, ZoneSpread: zoneSpread, Pending: pending, ExpectedVReplicaByVPod: expectedVReplicasByVPod} + state := &State{FreeCap: free, SchedulablePods: schedulablePods.List(), LastOrdinal: last, Capacity: s.capacity, Replicas: scale.Spec.Replicas, StatefulSetName: s.statefulSetName, PodLister: s.podLister, + PodSpread: podSpread, Pending: pending, ExpectedVReplicaByVPod: expectedVReplicasByVPod} - logger.Infow("cluster state info", zap.Any("state", state), zap.Any("reserved", toJSONable(reserved))) + logger.Infow("cluster state info", zap.Any("state", state)) return state, nil } @@ -392,27 +268,6 @@ func grow(slice []int32, ordinal int32, def int32) []int32 { return slice } -func withReserved(key types.NamespacedName, podName string, committed int32, reserved map[types.NamespacedName]map[string]int32) int32 { - if reserved != nil { - if rps, ok := reserved[key]; ok { - if rvreplicas, ok := rps[podName]; ok { - if committed == rvreplicas { - // new placement has been committed. - delete(rps, podName) - if len(rps) == 0 { - delete(reserved, key) - } - } else { - // new placement hasn't been committed yet. 
Adjust locally - // needed for descheduling vreps using policies - return rvreplicas - } - } - } - } - return committed -} - func isPodUnschedulable(pod *v1.Pod) bool { annotVal, ok := pod.ObjectMeta.Annotations[scheduler.PodAnnotationKey] unschedulable, err := strconv.ParseBool(annotVal) @@ -423,50 +278,22 @@ func isPodUnschedulable(pod *v1.Pod) bool { return isMarkedUnschedulable || isPending } -func isNodeUnschedulable(node *v1.Node) bool { - noExec := &v1.Taint{ - Key: "node.kubernetes.io/unreachable", - Effect: v1.TaintEffectNoExecute, - } - - noSched := &v1.Taint{ - Key: "node.kubernetes.io/unreachable", - Effect: v1.TaintEffectNoSchedule, - } - - return node.Spec.Unschedulable || - contains(node.Spec.Taints, noExec) || - contains(node.Spec.Taints, noSched) -} - -func contains(taints []v1.Taint, taint *v1.Taint) bool { - for _, v := range taints { - if v.MatchTaint(taint) { - return true - } - } - return false -} - func (s *State) MarshalJSON() ([]byte, error) { type S struct { - FreeCap []int32 `json:"freeCap"` - SchedulablePods []int32 `json:"schedulablePods"` - LastOrdinal int32 `json:"lastOrdinal"` - Capacity int32 `json:"capacity"` - Replicas int32 `json:"replicas"` - NumZones int32 `json:"numZones"` - NumNodes int32 `json:"numNodes"` - NodeToZoneMap map[string]string `json:"nodeToZoneMap"` - StatefulSetName string `json:"statefulSetName"` - PodSpread map[string]map[string]int32 `json:"podSpread"` - NodeSpread map[string]map[string]int32 `json:"nodeSpread"` - ZoneSpread map[string]map[string]int32 `json:"zoneSpread"` - SchedulerPolicy scheduler.SchedulerPolicyType `json:"schedulerPolicy"` - SchedPolicy *scheduler.SchedulerPolicy `json:"schedPolicy"` - DeschedPolicy *scheduler.SchedulerPolicy `json:"deschedPolicy"` - Pending map[string]int32 `json:"pending"` + FreeCap []int32 `json:"freeCap"` + SchedulablePods []int32 `json:"schedulablePods"` + LastOrdinal int32 `json:"lastOrdinal"` + Capacity int32 `json:"capacity"` + Replicas int32 `json:"replicas"` + NumZones int32 `json:"numZones"` + NumNodes int32 `json:"numNodes"` + NodeToZoneMap map[string]string `json:"nodeToZoneMap"` + StatefulSetName string `json:"statefulSetName"` + PodSpread map[string]map[string]int32 `json:"podSpread"` + NodeSpread map[string]map[string]int32 `json:"nodeSpread"` + ZoneSpread map[string]map[string]int32 `json:"zoneSpread"` + Pending map[string]int32 `json:"pending"` } sj := S{ @@ -475,23 +302,15 @@ func (s *State) MarshalJSON() ([]byte, error) { LastOrdinal: s.LastOrdinal, Capacity: s.Capacity, Replicas: s.Replicas, - NumZones: s.NumZones, - NumNodes: s.NumNodes, - NodeToZoneMap: s.NodeToZoneMap, StatefulSetName: s.StatefulSetName, - PodSpread: toJSONable(s.PodSpread), - NodeSpread: toJSONable(s.NodeSpread), - ZoneSpread: toJSONable(s.ZoneSpread), - SchedulerPolicy: s.SchedulerPolicy, - SchedPolicy: s.SchedPolicy, - DeschedPolicy: s.DeschedPolicy, + PodSpread: ToJSONable(s.PodSpread), Pending: toJSONablePending(s.Pending), } return json.Marshal(sj) } -func toJSONable(ps map[types.NamespacedName]map[string]int32) map[string]map[string]int32 { +func ToJSONable(ps map[types.NamespacedName]map[string]int32) map[string]map[string]int32 { r := make(map[string]map[string]int32, len(ps)) for k, v := range ps { r[k.String()] = v diff --git a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/autoscaler.go b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/autoscaler.go index 3245dabc16..8b61ca4a83 100644 --- a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/autoscaler.go +++ 
b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/autoscaler.go @@ -28,6 +28,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/integer" "knative.dev/pkg/logging" "knative.dev/pkg/reconciler" @@ -62,7 +63,8 @@ type autoscaler struct { evictor scheduler.Evictor // capacity is the total number of virtual replicas available per pod. - capacity int32 + capacity int32 + minReplicas int32 // refreshPeriod is how often the autoscaler tries to scale down the statefulset refreshPeriod time.Duration @@ -113,6 +115,7 @@ func newAutoscaler(cfg *Config, stateAccessor st.StateAccessor, statefulSetCache evictor: cfg.Evictor, trigger: make(chan context.Context, 1), capacity: cfg.PodCapacity, + minReplicas: cfg.MinReplicas, refreshPeriod: cfg.RefreshPeriod, retryPeriod: cfg.RetryPeriod, lock: new(sync.Mutex), @@ -188,7 +191,7 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool) err logger := logging.FromContext(ctx).With("component", "autoscaler") ctx = logging.WithLogger(ctx, logger) - state, err := a.stateAccessor.State(ctx, a.getReserved()) + state, err := a.stateAccessor.State(ctx) if err != nil { logger.Info("error while refreshing scheduler state (will retry)", zap.Error(err)) return err @@ -205,46 +208,15 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool) err zap.Int32("replicas", scale.Spec.Replicas), zap.Any("state", state)) - var scaleUpFactor, newreplicas, minNumPods int32 - scaleUpFactor = 1 // Non-HA scaling - if state.SchedPolicy != nil && contains(nil, state.SchedPolicy.Priorities, st.AvailabilityZonePriority) { //HA scaling across zones - scaleUpFactor = state.NumZones - } - if state.SchedPolicy != nil && contains(nil, state.SchedPolicy.Priorities, st.AvailabilityNodePriority) { //HA scaling across nodes - scaleUpFactor = state.NumNodes - } - - newreplicas = state.LastOrdinal + 1 // Ideal number - - if state.SchedulerPolicy == scheduler.MAXFILLUP { - newreplicas = int32(math.Ceil(float64(state.TotalExpectedVReplicas()) / float64(state.Capacity))) - } else { - // Take into account pending replicas and pods that are already filled (for even pod spread) - pending := state.TotalPending() - if pending > 0 { - // Make sure to allocate enough pods for holding all pending replicas. 
- if state.SchedPolicy != nil && contains(state.SchedPolicy.Predicates, nil, st.EvenPodSpread) && len(state.FreeCap) > 0 { //HA scaling across pods - leastNonZeroCapacity := a.minNonZeroInt(state.FreeCap) - minNumPods = int32(math.Ceil(float64(pending) / float64(leastNonZeroCapacity))) - } else { - minNumPods = int32(math.Ceil(float64(pending) / float64(a.capacity))) - } - newreplicas += int32(math.Ceil(float64(minNumPods)/float64(scaleUpFactor)) * float64(scaleUpFactor)) - } - - if newreplicas <= state.LastOrdinal { - // Make sure to never scale down past the last ordinal - newreplicas = state.LastOrdinal + scaleUpFactor - } - } + newReplicas := integer.Int32Max(int32(math.Ceil(float64(state.TotalExpectedVReplicas())/float64(state.Capacity))), a.minReplicas) // Only scale down if permitted - if !attemptScaleDown && newreplicas < scale.Spec.Replicas { - newreplicas = scale.Spec.Replicas + if !attemptScaleDown && newReplicas < scale.Spec.Replicas { + newReplicas = scale.Spec.Replicas } - if newreplicas != scale.Spec.Replicas { - scale.Spec.Replicas = newreplicas + if newReplicas != scale.Spec.Replicas { + scale.Spec.Replicas = newReplicas logger.Infow("updating adapter replicas", zap.Int32("replicas", scale.Spec.Replicas)) _, err = a.statefulSetCache.UpdateScale(ctx, a.statefulSetName, scale, metav1.UpdateOptions{}) @@ -255,12 +227,12 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool) err } else if attemptScaleDown { // since the number of replicas hasn't changed and time has approached to scale down, // take the opportunity to compact the vreplicas - return a.mayCompact(logger, state, scaleUpFactor) + return a.mayCompact(logger, state) } return nil } -func (a *autoscaler) mayCompact(logger *zap.SugaredLogger, s *st.State, scaleUpFactor int32) error { +func (a *autoscaler) mayCompact(logger *zap.SugaredLogger, s *st.State) error { // This avoids a too aggressive scale down by adding a "grace period" based on the refresh // period @@ -275,53 +247,33 @@ func (a *autoscaler) mayCompact(logger *zap.SugaredLogger, s *st.State, scaleUpF } logger.Debugw("Trying to compact and scale down", - zap.Int32("scaleUpFactor", scaleUpFactor), zap.Any("state", s), ) // when there is only one pod there is nothing to move or number of pods is just enough! - if s.LastOrdinal < 1 || len(s.SchedulablePods) <= int(scaleUpFactor) { + if s.LastOrdinal < 1 || len(s.SchedulablePods) <= 1 { return nil } - if s.SchedulerPolicy == scheduler.MAXFILLUP { - // Determine if there is enough free capacity to - // move all vreplicas placed in the last pod to pods with a lower ordinal - freeCapacity := s.FreeCapacity() - s.Free(s.LastOrdinal) - usedInLastPod := s.Capacity - s.Free(s.LastOrdinal) - - if freeCapacity >= usedInLastPod { - a.lastCompactAttempt = time.Now() - err := a.compact(s, scaleUpFactor) - if err != nil { - return fmt.Errorf("vreplicas compaction failed (scaleUpFactor %d): %w", scaleUpFactor, err) - } - } - - // only do 1 replica at a time to avoid overloading the scheduler with too many - // rescheduling requests. 
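// Editorial sketch (not part of the diff): with the policy machinery gone, the autoscaler target
// above collapses to "enough pods for the expected vreplicas, never fewer than minReplicas, and
// never shrink unless a scale-down pass is allowed". The same arithmetic in isolation (assumes
// capacity > 0; names are illustrative, not the autoscaler's fields):
package sketch

import "math"

// desiredReplicas computes ceil(totalExpected/capacity), clamps it to at least minReplicas, and
// keeps the current replica count when scale-down is not permitted on this pass.
func desiredReplicas(totalExpected, capacity, minReplicas, current int32, allowScaleDown bool) int32 {
	n := int32(math.Ceil(float64(totalExpected) / float64(capacity)))
	if n < minReplicas {
		n = minReplicas
	}
	if !allowScaleDown && n < current {
		return current
	}
	return n
}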
- } else if s.SchedPolicy != nil { - //Below calculation can be optimized to work for recovery scenarios when nodes/zones are lost due to failure - freeCapacity := s.FreeCapacity() - usedInLastXPods := s.Capacity * scaleUpFactor - for i := int32(0); i < scaleUpFactor && s.LastOrdinal-i >= 0; i++ { - freeCapacity = freeCapacity - s.Free(s.LastOrdinal-i) - usedInLastXPods = usedInLastXPods - s.Free(s.LastOrdinal-i) - } + // Determine if there is enough free capacity to + // move all vreplicas placed in the last pod to pods with a lower ordinal + freeCapacity := s.FreeCapacity() - s.Free(s.LastOrdinal) + usedInLastPod := s.Capacity - s.Free(s.LastOrdinal) - if (freeCapacity >= usedInLastXPods) && //remaining pods can hold all vreps from evicted pods - (s.Replicas-scaleUpFactor >= scaleUpFactor) { //remaining # of pods is enough for HA scaling - a.lastCompactAttempt = time.Now() - err := a.compact(s, scaleUpFactor) - if err != nil { - return fmt.Errorf("vreplicas compaction failed (scaleUpFactor %d): %w", scaleUpFactor, err) - } + if freeCapacity >= usedInLastPod { + a.lastCompactAttempt = time.Now() + err := a.compact(s) + if err != nil { + return fmt.Errorf("vreplicas compaction failed: %w", err) } } + + // only do 1 replica at a time to avoid overloading the scheduler with too many + // rescheduling requests. return nil } -func (a *autoscaler) compact(s *st.State, scaleUpFactor int32) error { +func (a *autoscaler) compact(s *st.State) error { var pod *v1.Pod vpods, err := a.vpodLister() if err != nil { @@ -331,47 +283,20 @@ func (a *autoscaler) compact(s *st.State, scaleUpFactor int32) error { for _, vpod := range vpods { placements := vpod.GetPlacements() for i := len(placements) - 1; i >= 0; i-- { //start from the last placement - for j := int32(0); j < scaleUpFactor; j++ { - ordinal := st.OrdinalFromPodName(placements[i].PodName) - - if ordinal == s.LastOrdinal-j { - pod, err = s.PodLister.Get(placements[i].PodName) - if err != nil { - return fmt.Errorf("failed to get pod %s: %w", placements[i].PodName, err) - } - - err = a.evictor(pod, vpod, &placements[i]) - if err != nil { - return fmt.Errorf("failed to evict pod %s: %w", pod.Name, err) - } + ordinal := st.OrdinalFromPodName(placements[i].PodName) + + if ordinal == s.LastOrdinal { + pod, err = s.PodLister.Get(placements[i].PodName) + if err != nil { + return fmt.Errorf("failed to get pod %s: %w", placements[i].PodName, err) + } + + err = a.evictor(pod, vpod, &placements[i]) + if err != nil { + return fmt.Errorf("failed to evict pod %s: %w", pod.Name, err) } } } } return nil } - -func contains(preds []scheduler.PredicatePolicy, priors []scheduler.PriorityPolicy, name string) bool { - for _, v := range preds { - if v.Name == name { - return true - } - } - for _, v := range priors { - if v.Name == name { - return true - } - } - - return false -} - -func (a *autoscaler) minNonZeroInt(slice []int32) int32 { - min := a.capacity - for _, v := range slice { - if v < min && v > 0 { - min = v - } - } - return min -} diff --git a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go index 6995d6ff45..cf2834e7c3 100644 --- a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go +++ b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go @@ -18,9 +18,7 @@ package statefulset import ( "context" - "crypto/rand" "fmt" - "math/big" "sort" "sync" "time" @@ -28,11 +26,11 @@ import ( "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" 
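// Editorial sketch (not part of the diff): the simplified mayCompact above scales down one pod at
// a time, evicting the last ordinal only when the remaining pods could absorb everything placed on
// it. The eligibility test on its own, with freeCap indexed by ordinal and capacity the per-pod
// vreplica limit (illustrative stand-ins for the scheduler State):
package sketch

// canCompactLastPod reports whether every vreplica on the highest ordinal would fit into the free
// capacity of the lower-ordinal pods, which is the precondition for evicting it.
func canCompactLastPod(freeCap []int32, capacity int32) bool {
	last := len(freeCap) - 1
	if last < 1 {
		return false // nothing to move with zero or one pod
	}
	var freeElsewhere int32
	for i := 0; i < last; i++ {
		freeElsewhere += freeCap[i]
	}
	usedInLastPod := capacity - freeCap[last]
	return freeElsewhere >= usedInLastPod
}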
"k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" clientappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1" corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" - "k8s.io/utils/integer" "knative.dev/pkg/logging" "knative.dev/pkg/reconciler" @@ -41,19 +39,7 @@ import ( duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" "knative.dev/eventing/pkg/scheduler" - "knative.dev/eventing/pkg/scheduler/factory" st "knative.dev/eventing/pkg/scheduler/state" - - _ "knative.dev/eventing/pkg/scheduler/plugins/core/availabilitynodepriority" - _ "knative.dev/eventing/pkg/scheduler/plugins/core/availabilityzonepriority" - _ "knative.dev/eventing/pkg/scheduler/plugins/core/evenpodspread" - _ "knative.dev/eventing/pkg/scheduler/plugins/core/lowestordinalpriority" - _ "knative.dev/eventing/pkg/scheduler/plugins/core/podfitsresources" - _ "knative.dev/eventing/pkg/scheduler/plugins/core/removewithavailabilitynodepriority" - _ "knative.dev/eventing/pkg/scheduler/plugins/core/removewithavailabilityzonepriority" - _ "knative.dev/eventing/pkg/scheduler/plugins/core/removewithevenpodspreadpriority" - _ "knative.dev/eventing/pkg/scheduler/plugins/core/removewithhighestordinalpriority" - _ "knative.dev/eventing/pkg/scheduler/plugins/kafka/nomaxresourcecount" ) type GetReserved func() map[types.NamespacedName]map[string]int32 @@ -65,19 +51,16 @@ type Config struct { ScaleCacheConfig scheduler.ScaleCacheConfig `json:"scaleCacheConfig"` // PodCapacity max capacity for each StatefulSet's pod. PodCapacity int32 `json:"podCapacity"` + // MinReplicas is the minimum replicas of the statefulset. + MinReplicas int32 `json:"minReplicas"` // Autoscaler refresh period RefreshPeriod time.Duration `json:"refreshPeriod"` // Autoscaler retry period RetryPeriod time.Duration `json:"retryPeriod"` - SchedulerPolicy scheduler.SchedulerPolicyType `json:"schedulerPolicy"` - SchedPolicy *scheduler.SchedulerPolicy `json:"schedPolicy"` - DeschedPolicy *scheduler.SchedulerPolicy `json:"deschedPolicy"` - Evictor scheduler.Evictor `json:"-"` - VPodLister scheduler.VPodLister `json:"-"` - NodeLister corev1listers.NodeLister `json:"-"` + VPodLister scheduler.VPodLister `json:"-"` // Pod lister for statefulset: StatefulSetNamespace / StatefulSetName PodLister corev1listers.PodNamespaceLister `json:"-"` @@ -93,7 +76,7 @@ func New(ctx context.Context, cfg *Config) (scheduler.Scheduler, error) { scaleCache := scheduler.NewScaleCache(ctx, cfg.StatefulSetNamespace, kubeclient.Get(ctx).AppsV1().StatefulSets(cfg.StatefulSetNamespace), cfg.ScaleCacheConfig) - stateAccessor := st.NewStateBuilder(cfg.StatefulSetName, cfg.VPodLister, cfg.PodCapacity, cfg.SchedulerPolicy, cfg.SchedPolicy, cfg.DeschedPolicy, cfg.PodLister, cfg.NodeLister, scaleCache) + stateAccessor := st.NewStateBuilder(cfg.StatefulSetName, cfg.VPodLister, cfg.PodCapacity, cfg.PodLister, scaleCache) var getReserved GetReserved cfg.getReserved = func() map[types.NamespacedName]map[string]int32 { @@ -118,14 +101,6 @@ func New(ctx context.Context, cfg *Config) (scheduler.Scheduler, error) { type Pending map[types.NamespacedName]int32 -func (p Pending) Total() int32 { - t := int32(0) - for _, vr := range p { - t += vr - } - return t -} - // StatefulSetScheduler is a scheduler placing VPod into statefulset-managed set of pods type StatefulSetScheduler struct { statefulSetName string @@ -152,9 +127,35 @@ var ( // Promote implements reconciler.LeaderAware. 
func (s *StatefulSetScheduler) Promote(b reconciler.Bucket, enq func(reconciler.Bucket, types.NamespacedName)) error { + if !b.Has(ephemeralLeaderElectionObject) { + return nil + } + if v, ok := s.autoscaler.(reconciler.LeaderAware); ok { return v.Promote(b, enq) } + if err := s.initReserved(); err != nil { + return err + } + return nil +} + +func (s *StatefulSetScheduler) initReserved() error { + s.reservedMu.Lock() + defer s.reservedMu.Unlock() + + vPods, err := s.vpodLister() + if err != nil { + return fmt.Errorf("failed to list vPods during init: %w", err) + } + + s.reserved = make(map[types.NamespacedName]map[string]int32, len(vPods)) + for _, vPod := range vPods { + s.reserved[vPod.GetKey()] = make(map[string]int32, len(vPod.GetPlacements())) + for _, placement := range vPod.GetPlacements() { + s.reserved[vPod.GetKey()][placement.PodName] += placement.VReplicas + } + } return nil } @@ -170,7 +171,7 @@ func newStatefulSetScheduler(ctx context.Context, stateAccessor st.StateAccessor, autoscaler Autoscaler) *StatefulSetScheduler { - scheduler := &StatefulSetScheduler{ + s := &StatefulSetScheduler{ statefulSetNamespace: cfg.StatefulSetNamespace, statefulSetName: cfg.StatefulSetName, statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(cfg.StatefulSetNamespace), @@ -188,13 +189,16 @@ func newStatefulSetScheduler(ctx context.Context, informers.WithNamespace(cfg.StatefulSetNamespace), ) - sif.Apps().V1().StatefulSets().Informer(). + _, err := sif.Apps().V1().StatefulSets().Informer(). AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: controller.FilterWithNameAndNamespace(cfg.StatefulSetNamespace, cfg.StatefulSetName), Handler: controller.HandleAll(func(i interface{}) { - scheduler.updateStatefulset(ctx, i) + s.updateStatefulset(ctx, i) }), }) + if err != nil { + logging.FromContext(ctx).Fatalw("Failed to register informer", zap.Error(err)) + } sif.Start(ctx.Done()) _ = sif.WaitForCacheSync(ctx.Done()) @@ -204,7 +208,7 @@ func newStatefulSetScheduler(ctx context.Context, sif.Shutdown() }() - return scheduler + return s } func (s *StatefulSetScheduler) Schedule(ctx context.Context, vpod scheduler.VPod) ([]duckv1alpha1.Placement, error) { @@ -214,9 +218,6 @@ func (s *StatefulSetScheduler) Schedule(ctx context.Context, vpod scheduler.VPod defer s.reservedMu.Unlock() placements, err := s.scheduleVPod(ctx, vpod) - if placements == nil { - return placements, err - } sort.SliceStable(placements, func(i int, j int) bool { return st.OrdinalFromPodName(placements[i].PodName) < st.OrdinalFromPodName(placements[j].PodName) @@ -234,30 +235,42 @@ func (s *StatefulSetScheduler) scheduleVPod(ctx context.Context, vpod scheduler. // Get the current placements state // Quite an expensive operation but safe and simple. - state, err := s.stateAccessor.State(ctx, s.reserved) + state, err := s.stateAccessor.State(ctx) if err != nil { logger.Debug("error while refreshing scheduler state (will retry)", zap.Error(err)) return nil, err } - // Clean up reserved from removed resources that don't appear in the vpod list anymore and have - // no pending resources. 
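// Editorial sketch (not part of the diff): the new initReserved above seeds the in-memory
// reservations from the placements already recorded on each vpod when this replica becomes leader.
// The same shape with plain types; placement and the string keys are illustrative stand-ins for
// the knative duck types and NamespacedName keys.
package sketch

type placement struct {
	PodName   string
	VReplicas int32
}

// buildReserved aggregates committed placements per vpod key and per pod name, which is the
// starting state the scheduler later adjusts as it reserves new vreplicas.
func buildReserved(placementsByVPod map[string][]placement) map[string]map[string]int32 {
	reserved := make(map[string]map[string]int32, len(placementsByVPod))
	for key, placements := range placementsByVPod {
		reserved[key] = make(map[string]int32, len(placements))
		for _, p := range placements {
			reserved[key][p.PodName] += p.VReplicas
		}
	}
	return reserved
}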
- reserved := make(map[types.NamespacedName]map[string]int32) - for k, v := range s.reserved { - if pendings, ok := state.Pending[k]; ok { - if pendings == 0 { - reserved[k] = map[string]int32{} - } else { - reserved[k] = v - } + reservedByPodName := make(map[string]int32, 2) + for _, v := range s.reserved { + for podName, vReplicas := range v { + v, _ := reservedByPodName[podName] + reservedByPodName[podName] = vReplicas + v + } + } + + // Use reserved placements as starting point, if we have them. + existingPlacements := make([]duckv1alpha1.Placement, 0) + if placements, ok := s.reserved[vpod.GetKey()]; ok { + existingPlacements = make([]duckv1alpha1.Placement, 0, len(placements)) + for podName, n := range placements { + existingPlacements = append(existingPlacements, duckv1alpha1.Placement{ + PodName: podName, + VReplicas: n, + }) } } - s.reserved = reserved - logger.Debugw("scheduling", zap.Any("state", state)) + sort.SliceStable(existingPlacements, func(i int, j int) bool { + return st.OrdinalFromPodName(existingPlacements[i].PodName) < st.OrdinalFromPodName(existingPlacements[j].PodName) + }) - existingPlacements := vpod.GetPlacements() - var left int32 + logger.Debugw("scheduling state", + zap.Any("state", state), + zap.Any("reservedByPodName", reservedByPodName), + zap.Any("reserved", st.ToJSONable(s.reserved)), + zap.Any("vpod", vpod), + ) // Remove unschedulable or adjust overcommitted pods from placements var placements []duckv1alpha1.Placement @@ -272,23 +285,26 @@ func (s *StatefulSetScheduler) scheduleVPod(ctx context.Context, vpod scheduler. } // Handle overcommitted pods. - if state.Free(ordinal) < 0 { + reserved, _ := reservedByPodName[p.PodName] + if state.Capacity-reserved < 0 { // vr > free => vr: 9, overcommit 4 -> free: 0, vr: 5, pending: +4 // vr = free => vr: 4, overcommit 4 -> free: 0, vr: 0, pending: +4 // vr < free => vr: 3, overcommit 4 -> free: -1, vr: 0, pending: +3 - overcommit := -state.FreeCap[ordinal] + overcommit := -(state.Capacity - reserved) logger.Debugw("overcommit", zap.Any("overcommit", overcommit), zap.Any("placement", p)) if p.VReplicas >= overcommit { state.SetFree(ordinal, 0) state.Pending[vpod.GetKey()] += overcommit + reservedByPodName[p.PodName] -= overcommit p.VReplicas = p.VReplicas - overcommit } else { state.SetFree(ordinal, p.VReplicas-overcommit) state.Pending[vpod.GetKey()] += p.VReplicas + reservedByPodName[p.PodName] -= p.VReplicas p.VReplicas = 0 } @@ -314,51 +330,25 @@ func (s *StatefulSetScheduler) scheduleVPod(ctx context.Context, vpod scheduler. 
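// Illustrative sketch (not part of the diff) of the overcommit arithmetic used
// above when a pod's reservations exceed its capacity: the placement on that
// pod gives back as many vreplicas as needed (at most all of them) and the
// remainder is tracked as pending. capacity and reserved are per-pod totals;
// the function name is a simplified stand-in, not the real helper.
package main

import "fmt"

// trimOvercommit returns the placement's new vreplica count plus how many
// vreplicas were moved to pending for this vPod.
func trimOvercommit(capacity, reserved, placed int32) (newPlaced, pendingDelta int32) {
	if capacity-reserved >= 0 {
		return placed, 0 // pod is not overcommitted, nothing to give back
	}
	overcommit := -(capacity - reserved)
	if placed >= overcommit {
		// This placement alone can absorb the whole overcommit.
		return placed - overcommit, overcommit
	}
	// This placement gives back everything it has; the pod stays short.
	return 0, placed
}

func main() {
	fmt.Println(trimOvercommit(20, 24, 9)) // 5 4  (vr > overcommit)
	fmt.Println(trimOvercommit(20, 24, 4)) // 0 4  (vr == overcommit)
	fmt.Println(trimOvercommit(20, 24, 3)) // 0 3  (vr < overcommit)
}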
return placements, nil } - if state.SchedulerPolicy != "" { - // Need less => scale down - if tr > vpod.GetVReplicas() { - logger.Debugw("scaling down", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas()), - zap.Any("placements", placements), - zap.Any("existingPlacements", existingPlacements)) - - placements = s.removeReplicas(tr-vpod.GetVReplicas(), placements) - - // Do not trigger the autoscaler to avoid unnecessary churn - - return placements, nil - } - - // Need more => scale up - logger.Debugw("scaling up", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas()), + // Need less => scale down + if tr > vpod.GetVReplicas() { + logger.Debugw("scaling down", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas()), zap.Any("placements", placements), zap.Any("existingPlacements", existingPlacements)) - placements, left = s.addReplicas(state, vpod.GetVReplicas()-tr, placements) + placements = s.removeReplicas(tr-vpod.GetVReplicas(), placements) - } else { //Predicates and priorities must be used for scheduling - // Need less => scale down - if tr > vpod.GetVReplicas() && state.DeschedPolicy != nil { - logger.Infow("scaling down", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas()), - zap.Any("placements", placements), - zap.Any("existingPlacements", existingPlacements)) - placements = s.removeReplicasWithPolicy(ctx, vpod, tr-vpod.GetVReplicas(), placements) + // Do not trigger the autoscaler to avoid unnecessary churn - // Do not trigger the autoscaler to avoid unnecessary churn - - return placements, nil - } + return placements, nil + } - if state.SchedPolicy != nil { + // Need more => scale up + logger.Debugw("scaling up", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas()), + zap.Any("placements", placements), + zap.Any("existingPlacements", existingPlacements)) - // Need more => scale up - // rebalancing needed for all vreps most likely since there are pending vreps from previous reconciliation - // can fall here when vreps scaled up or after eviction - logger.Infow("scaling up with a rebalance (if needed)", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas()), - zap.Any("placements", placements), - zap.Any("existingPlacements", existingPlacements)) - placements, left = s.rebalanceReplicasWithPolicy(ctx, vpod, vpod.GetVReplicas(), placements) - } - } + placements, left := s.addReplicas(state, reservedByPodName, vpod, vpod.GetVReplicas()-tr, placements) if left > 0 { // Give time for the autoscaler to do its job @@ -370,12 +360,6 @@ func (s *StatefulSetScheduler) scheduleVPod(ctx context.Context, vpod scheduler. s.autoscaler.Autoscale(ctx) } - if state.SchedulerPolicy == "" && state.SchedPolicy != nil { - logger.Info("reverting to previous placements") - s.reservePlacements(vpod, existingPlacements) // rebalancing doesn't care about new placements since all vreps will be re-placed - return existingPlacements, s.notEnoughPodReplicas(left) // requeue to wait for the autoscaler to do its job - } - return placements, s.notEnoughPodReplicas(left) } @@ -384,408 +368,125 @@ func (s *StatefulSetScheduler) scheduleVPod(ctx context.Context, vpod scheduler. 
return placements, nil } -func toJSONable(pending map[types.NamespacedName]int32) map[string]int32 { - r := make(map[string]int32, len(pending)) - for k, v := range pending { - r[k.String()] = v - } - return r -} - -func (s *StatefulSetScheduler) rebalanceReplicasWithPolicy(ctx context.Context, vpod scheduler.VPod, diff int32, placements []duckv1alpha1.Placement) ([]duckv1alpha1.Placement, int32) { - s.makeZeroPlacements(vpod, placements) - placements, diff = s.addReplicasWithPolicy(ctx, vpod, diff, make([]duckv1alpha1.Placement, 0)) //start fresh with a new placements list - - return placements, diff -} - -func (s *StatefulSetScheduler) removeReplicasWithPolicy(ctx context.Context, vpod scheduler.VPod, diff int32, placements []duckv1alpha1.Placement) []duckv1alpha1.Placement { - logger := logging.FromContext(ctx).Named("remove replicas with policy") - numVreps := diff - - for i := int32(0); i < numVreps; i++ { //deschedule one vreplica at a time - state, err := s.stateAccessor.State(ctx, s.reserved) - if err != nil { - logger.Info("error while refreshing scheduler state (will retry)", zap.Error(err)) - return placements - } - - feasiblePods := s.findFeasiblePods(ctx, state, vpod, state.DeschedPolicy) - feasiblePods = s.removePodsNotInPlacement(vpod, feasiblePods) - if len(feasiblePods) == 1 { //nothing to score, remove vrep from that pod - placementPodID := feasiblePods[0] - logger.Infof("Selected pod #%v to remove vreplica #%v from", placementPodID, i) - placements = s.removeSelectionFromPlacements(placementPodID, placements) - state.SetFree(placementPodID, state.Free(placementPodID)+1) - s.reservePlacements(vpod, placements) - continue - } - - priorityList, err := s.prioritizePods(ctx, state, vpod, feasiblePods, state.DeschedPolicy) - if err != nil { - logger.Info("error while scoring pods using priorities", zap.Error(err)) - s.reservePlacements(vpod, placements) - break - } - - placementPodID, err := s.selectPod(priorityList) - if err != nil { - logger.Info("error while selecting the placement pod", zap.Error(err)) - s.reservePlacements(vpod, placements) - break - } - - logger.Infof("Selected pod #%v to remove vreplica #%v from", placementPodID, i) - placements = s.removeSelectionFromPlacements(placementPodID, placements) - state.SetFree(placementPodID, state.Free(placementPodID)+1) - s.reservePlacements(vpod, placements) - } - return placements -} - -func (s *StatefulSetScheduler) removeSelectionFromPlacements(placementPodID int32, placements []duckv1alpha1.Placement) []duckv1alpha1.Placement { +func (s *StatefulSetScheduler) removeReplicas(diff int32, placements []duckv1alpha1.Placement) []duckv1alpha1.Placement { newPlacements := make([]duckv1alpha1.Placement, 0, len(placements)) - - for i := 0; i < len(placements); i++ { - ordinal := st.OrdinalFromPodName(placements[i].PodName) - if placementPodID == ordinal { - if placements[i].VReplicas == 1 { - // remove the entire placement - } else { - newPlacements = append(newPlacements, duckv1alpha1.Placement{ - PodName: placements[i].PodName, - VReplicas: placements[i].VReplicas - 1, - }) - } + for i := len(placements) - 1; i > -1; i-- { + if diff >= placements[i].VReplicas { + // remove the entire placement + diff -= placements[i].VReplicas } else { newPlacements = append(newPlacements, duckv1alpha1.Placement{ PodName: placements[i].PodName, - VReplicas: placements[i].VReplicas, + VReplicas: placements[i].VReplicas - diff, }) + diff = 0 } } return newPlacements } -func (s *StatefulSetScheduler) addReplicasWithPolicy(ctx context.Context, 
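// Illustrative sketch (not part of the diff): with the policy machinery gone,
// scaling down simply walks the placement list from the highest ordinal toward
// the lowest, dropping whole placements while the surplus covers them and
// trimming the last one it touches. The placement type is a simplified
// stand-in for duckv1alpha1.Placement; placements are assumed to be sorted by
// pod ordinal, as the scheduler keeps them.
package main

import "fmt"

type placement struct {
	PodName   string
	VReplicas int32
}

func removeReplicasSketch(diff int32, placements []placement) []placement {
	kept := make([]placement, 0, len(placements))
	for i := len(placements) - 1; i >= 0; i-- {
		if diff >= placements[i].VReplicas {
			diff -= placements[i].VReplicas // drop the whole placement
			continue
		}
		kept = append(kept, placement{
			PodName:   placements[i].PodName,
			VReplicas: placements[i].VReplicas - diff,
		})
		diff = 0
	}
	return kept
}

func main() {
	p := []placement{{"dispatcher-0", 5}, {"dispatcher-1", 3}, {"dispatcher-2", 2}}
	// Remove 4 vreplicas: dispatcher-2 (2) goes away entirely, dispatcher-1 is trimmed to 1.
	fmt.Println(removeReplicasSketch(4, p)) // [{dispatcher-1 1} {dispatcher-0 5}]
}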
vpod scheduler.VPod, diff int32, placements []duckv1alpha1.Placement) ([]duckv1alpha1.Placement, int32) { - logger := logging.FromContext(ctx).Named("add replicas with policy") - - numVreps := diff - for i := int32(0); i < numVreps; i++ { //schedule one vreplica at a time (find most suitable pod placement satisying predicates with high score) - // Get the current placements state - state, err := s.stateAccessor.State(ctx, s.reserved) - if err != nil { - logger.Info("error while refreshing scheduler state (will retry)", zap.Error(err)) - return placements, diff - } - - if s.replicas == 0 { //no pods to filter - logger.Infow("no pods available in statefulset") - s.reservePlacements(vpod, placements) - diff = numVreps - i //for autoscaling up - break //end the iteration for all vreps since there are not pods - } - - feasiblePods := s.findFeasiblePods(ctx, state, vpod, state.SchedPolicy) - if len(feasiblePods) == 0 { //no pods available to schedule this vreplica - logger.Info("no feasible pods available to schedule this vreplica") - s.reservePlacements(vpod, placements) - diff = numVreps - i //for autoscaling up and possible rebalancing - break - } - - /* if len(feasiblePods) == 1 { //nothing to score, place vrep on that pod (Update: for HA, must run HA scorers) - placementPodID := feasiblePods[0] - logger.Infof("Selected pod #%v for vreplica #%v ", placementPodID, i) - placements = s.addSelectionToPlacements(placementPodID, placements) - //state.SetFree(placementPodID, state.Free(placementPodID)-1) - s.reservePlacements(vpod, placements) - diff-- - continue - } */ - - priorityList, err := s.prioritizePods(ctx, state, vpod, feasiblePods, state.SchedPolicy) - if err != nil { - logger.Info("error while scoring pods using priorities", zap.Error(err)) - s.reservePlacements(vpod, placements) - diff = numVreps - i //for autoscaling up and possible rebalancing - break - } - - placementPodID, err := s.selectPod(priorityList) - if err != nil { - logger.Info("error while selecting the placement pod", zap.Error(err)) - s.reservePlacements(vpod, placements) - diff = numVreps - i //for autoscaling up and possible rebalancing - break - } - - logger.Infof("Selected pod #%v for vreplica #%v", placementPodID, i) - placements = s.addSelectionToPlacements(placementPodID, placements) - state.SetFree(placementPodID, state.Free(placementPodID)-1) - s.reservePlacements(vpod, placements) - diff-- +func (s *StatefulSetScheduler) addReplicas(states *st.State, reservedByPodName map[string]int32, vpod scheduler.VPod, diff int32, placements []duckv1alpha1.Placement) ([]duckv1alpha1.Placement, int32) { + if states.Replicas <= 0 { + return placements, diff } - return placements, diff -} -func (s *StatefulSetScheduler) addSelectionToPlacements(placementPodID int32, placements []duckv1alpha1.Placement) []duckv1alpha1.Placement { - seen := false - - for i := 0; i < len(placements); i++ { - ordinal := st.OrdinalFromPodName(placements[i].PodName) - if placementPodID == ordinal { - seen = true - placements[i].VReplicas = placements[i].VReplicas + 1 - } - } - if !seen { - placements = append(placements, duckv1alpha1.Placement{ - PodName: st.PodNameFromOrdinal(s.statefulSetName, placementPodID), - VReplicas: 1, - }) - } - return placements -} + newPlacements := make([]duckv1alpha1.Placement, 0, len(placements)) -// findFeasiblePods finds the pods that fit the filter plugins -func (s *StatefulSetScheduler) findFeasiblePods(ctx context.Context, state *st.State, vpod scheduler.VPod, policy *scheduler.SchedulerPolicy) []int32 { - 
feasiblePods := make([]int32, 0) - for _, podId := range state.SchedulablePods { - statusMap := s.RunFilterPlugins(ctx, state, vpod, podId, policy) - status := statusMap.Merge() - if status.IsSuccess() { - feasiblePods = append(feasiblePods, podId) - } + // Preserve existing placements + for _, p := range placements { + newPlacements = append(newPlacements, *p.DeepCopy()) } - return feasiblePods -} + candidates := s.candidatesOrdered(states, vpod, placements) -// removePodsNotInPlacement removes pods that do not have vreplicas placed -func (s *StatefulSetScheduler) removePodsNotInPlacement(vpod scheduler.VPod, feasiblePods []int32) []int32 { - newFeasiblePods := make([]int32, 0) - for _, e := range vpod.GetPlacements() { - for _, podID := range feasiblePods { - if podID == st.OrdinalFromPodName(e.PodName) { //if pod is in current placement list - newFeasiblePods = append(newFeasiblePods, podID) + // Spread replicas in as many candidates as possible. + foundFreeCandidate := true + for diff > 0 && foundFreeCandidate { + foundFreeCandidate = false + for _, ordinal := range candidates { + if diff <= 0 { + break } - } - } - - return newFeasiblePods -} -// prioritizePods prioritizes the pods by running the score plugins, which return a score for each pod. -// The scores from each plugin are added together to make the score for that pod. -func (s *StatefulSetScheduler) prioritizePods(ctx context.Context, states *st.State, vpod scheduler.VPod, feasiblePods []int32, policy *scheduler.SchedulerPolicy) (st.PodScoreList, error) { - logger := logging.FromContext(ctx).Named("prioritize all feasible pods") - - // If no priority configs are provided, then all pods will have a score of one - result := make(st.PodScoreList, 0, len(feasiblePods)) - if !s.HasScorePlugins(states, policy) { - for _, podID := range feasiblePods { - result = append(result, st.PodScore{ - ID: podID, - Score: 1, - }) - } - return result, nil - } + podName := st.PodNameFromOrdinal(states.StatefulSetName, ordinal) + reserved, _ := reservedByPodName[podName] + // Is there space? + if states.Capacity-reserved > 0 { + foundFreeCandidate = true + allocation := int32(1) - scoresMap, scoreStatus := s.RunScorePlugins(ctx, states, vpod, feasiblePods, policy) - if !scoreStatus.IsSuccess() { - logger.Infof("FAILURE! Cannot score feasible pods due to plugin errors %v", scoreStatus.AsError()) - return nil, scoreStatus.AsError() - } - - // Summarize all scores. 
- for i := range feasiblePods { - result = append(result, st.PodScore{ID: feasiblePods[i], Score: 0}) - for j := range scoresMap { - result[i].Score += scoresMap[j][i].Score - } - } - - return result, nil -} + newPlacements = upsertPlacements(newPlacements, duckv1alpha1.Placement{ + PodName: st.PodNameFromOrdinal(states.StatefulSetName, ordinal), + VReplicas: allocation, + }) -// selectPod takes a prioritized list of pods and then picks one -func (s *StatefulSetScheduler) selectPod(podScoreList st.PodScoreList) (int32, error) { - if len(podScoreList) == 0 { - return -1, fmt.Errorf("empty priority list") //no selected pod - } - - maxScore := podScoreList[0].Score - selected := podScoreList[0].ID - cntOfMaxScore := int64(1) - for _, ps := range podScoreList[1:] { - if ps.Score > maxScore { - maxScore = ps.Score - selected = ps.ID - cntOfMaxScore = 1 - } else if ps.Score == maxScore { //if equal scores, randomly picks one - cntOfMaxScore++ - randNum, err := rand.Int(rand.Reader, big.NewInt(cntOfMaxScore)) - if err != nil { - return -1, fmt.Errorf("failed to generate random number") - } - if randNum.Int64() == int64(0) { - selected = ps.ID + diff -= allocation + reservedByPodName[podName] += allocation } } } - return selected, nil -} -// RunFilterPlugins runs the set of configured Filter plugins for a vrep on the given pod. -// If any of these plugins doesn't return "Success", the pod is not suitable for placing the vrep. -// Meanwhile, the failure message and status are set for the given pod. -func (s *StatefulSetScheduler) RunFilterPlugins(ctx context.Context, states *st.State, vpod scheduler.VPod, podID int32, policy *scheduler.SchedulerPolicy) st.PluginToStatus { - logger := logging.FromContext(ctx).Named("run all filter plugins") - - statuses := make(st.PluginToStatus) - for _, plugin := range policy.Predicates { - pl, err := factory.GetFilterPlugin(plugin.Name) - if err != nil { - logger.Error("Could not find filter plugin in Registry: ", plugin.Name) - continue - } - - //logger.Infof("Going to run filter plugin: %s using state: %v ", pl.Name(), states) - pluginStatus := s.runFilterPlugin(ctx, pl, plugin.Args, states, vpod, podID) - if !pluginStatus.IsSuccess() { - if !pluginStatus.IsUnschedulable() { - errStatus := st.NewStatus(st.Error, fmt.Sprintf("running %q filter plugin for pod %q failed with: %v", pl.Name(), podID, pluginStatus.Message())) - return map[string]*st.Status{pl.Name(): errStatus} //TODO: if one plugin fails, then no more plugins are run - } - statuses[pl.Name()] = pluginStatus - return statuses - } + if len(newPlacements) == 0 { + return nil, diff } - - return statuses + return newPlacements, diff } -func (s *StatefulSetScheduler) runFilterPlugin(ctx context.Context, pl st.FilterPlugin, args interface{}, states *st.State, vpod scheduler.VPod, podID int32) *st.Status { - status := pl.Filter(ctx, args, states, vpod.GetKey(), podID) - return status -} +func (s *StatefulSetScheduler) candidatesOrdered(states *st.State, vpod scheduler.VPod, placements []duckv1alpha1.Placement) []int32 { + existingPlacements := sets.New[string]() + candidates := make([]int32, len(states.SchedulablePods)) -// RunScorePlugins runs the set of configured scoring plugins. It returns a list that stores for each scoring plugin name the corresponding PodScoreList(s). -// It also returns *Status, which is set to non-success if any of the plugins returns a non-success status. 
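// Illustrative sketch (not part of the diff) of the spreading strategy in the
// new addReplicas: vreplicas are handed out one at a time, round-robin over an
// ordered candidate list, as long as some candidate still has capacity. Names
// and types are simplified stand-ins; the real code also maintains the
// placement list and per-vPod reservations.
package main

import "fmt"

// spread distributes diff vreplicas over candidates, never exceeding capacity
// on any single pod, and returns the per-pod allocation plus the leftover that
// could not be placed (which is what triggers the autoscaler).
func spread(diff, capacity int32, candidates []string, reserved map[string]int32) (map[string]int32, int32) {
	allocated := make(map[string]int32, len(candidates))
	foundFreeCandidate := true
	for diff > 0 && foundFreeCandidate {
		foundFreeCandidate = false
		for _, pod := range candidates {
			if diff <= 0 {
				break
			}
			if capacity-reserved[pod] > 0 { // is there still room on this pod?
				foundFreeCandidate = true
				allocated[pod]++
				reserved[pod]++
				diff--
			}
		}
	}
	return allocated, diff
}

func main() {
	reserved := map[string]int32{"dispatcher-0": 19, "dispatcher-1": 0}
	alloc, left := spread(5, 20, []string{"dispatcher-1", "dispatcher-0"}, reserved)
	fmt.Println(alloc, left) // map[dispatcher-0:1 dispatcher-1:4] 0
}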
-func (s *StatefulSetScheduler) RunScorePlugins(ctx context.Context, states *st.State, vpod scheduler.VPod, feasiblePods []int32, policy *scheduler.SchedulerPolicy) (st.PluginToPodScores, *st.Status) { - logger := logging.FromContext(ctx).Named("run all score plugins") + firstIdx := 0 + lastIdx := len(candidates) - 1 - pluginToPodScores := make(st.PluginToPodScores, len(policy.Priorities)) - for _, plugin := range policy.Priorities { - pl, err := factory.GetScorePlugin(plugin.Name) - if err != nil { - logger.Error("Could not find score plugin in registry: ", plugin.Name) + // De-prioritize existing placements pods, add existing placements to the tail of the candidates. + // Start from the last one so that within the "existing replicas" group, we prioritize lower ordinals + // to reduce compaction. + for i := len(placements) - 1; i >= 0; i-- { + placement := placements[i] + ordinal := st.OrdinalFromPodName(placement.PodName) + if !states.IsSchedulablePod(ordinal) { continue } - - //logger.Infof("Going to run score plugin: %s using state: %v ", pl.Name(), states) - pluginToPodScores[pl.Name()] = make(st.PodScoreList, len(feasiblePods)) - for index, podID := range feasiblePods { - score, pluginStatus := s.runScorePlugin(ctx, pl, plugin.Args, states, feasiblePods, vpod, podID) - if !pluginStatus.IsSuccess() { - errStatus := st.NewStatus(st.Error, fmt.Sprintf("running %q scoring plugin for pod %q failed with: %v", pl.Name(), podID, pluginStatus.AsError())) - return pluginToPodScores, errStatus //TODO: if one plugin fails, then no more plugins are run - } - - score = score * plugin.Weight //WEIGHED SCORE VALUE - //logger.Infof("scoring plugin %q produced score %v for pod %q: %v", pl.Name(), score, podID, pluginStatus) - pluginToPodScores[pl.Name()][index] = st.PodScore{ - ID: podID, - Score: score, - } - } - - status := pl.ScoreExtensions().NormalizeScore(ctx, states, pluginToPodScores[pl.Name()]) //NORMALIZE SCORES FOR ALL FEASIBLE PODS - if !status.IsSuccess() { - errStatus := st.NewStatus(st.Error, fmt.Sprintf("running %q scoring plugin failed with: %v", pl.Name(), status.AsError())) - return pluginToPodScores, errStatus + // This should really never happen as placements are de-duped, however, better to handle + // edge cases in case the prerequisite doesn't hold in the future. + if existingPlacements.Has(placement.PodName) { + continue } + candidates[lastIdx] = ordinal + lastIdx-- + existingPlacements.Insert(placement.PodName) } - return pluginToPodScores, st.NewStatus(st.Success) -} - -func (s *StatefulSetScheduler) runScorePlugin(ctx context.Context, pl st.ScorePlugin, args interface{}, states *st.State, feasiblePods []int32, vpod scheduler.VPod, podID int32) (uint64, *st.Status) { - score, status := pl.Score(ctx, args, states, feasiblePods, vpod.GetKey(), podID) - return score, status -} - -// HasScorePlugins returns true if at least one score plugin is defined. 
-func (s *StatefulSetScheduler) HasScorePlugins(state *st.State, policy *scheduler.SchedulerPolicy) bool { - return len(policy.Priorities) > 0 -} - -func (s *StatefulSetScheduler) removeReplicas(diff int32, placements []duckv1alpha1.Placement) []duckv1alpha1.Placement { - newPlacements := make([]duckv1alpha1.Placement, 0, len(placements)) - for i := len(placements) - 1; i > -1; i-- { - if diff >= placements[i].VReplicas { - // remove the entire placement - diff -= placements[i].VReplicas - } else { - newPlacements = append(newPlacements, duckv1alpha1.Placement{ - PodName: placements[i].PodName, - VReplicas: placements[i].VReplicas - diff, - }) - diff = 0 + // Prioritize reserved placements that don't appear in the committed placements. + if reserved, ok := s.reserved[vpod.GetKey()]; ok { + for podName := range reserved { + if !states.IsSchedulablePod(st.OrdinalFromPodName(podName)) { + continue + } + if existingPlacements.Has(podName) { + continue + } + candidates[firstIdx] = st.OrdinalFromPodName(podName) + firstIdx++ + existingPlacements.Insert(podName) } } - return newPlacements -} - -func (s *StatefulSetScheduler) addReplicas(states *st.State, diff int32, placements []duckv1alpha1.Placement) ([]duckv1alpha1.Placement, int32) { - // Pod affinity algorithm: prefer adding replicas to existing pods before considering other replicas - newPlacements := make([]duckv1alpha1.Placement, 0, len(placements)) - - // Add to existing - for i := 0; i < len(placements); i++ { - podName := placements[i].PodName - ordinal := st.OrdinalFromPodName(podName) - - // Is there space in PodName? - f := states.Free(ordinal) - if diff >= 0 && f > 0 { - allocation := integer.Int32Min(f, diff) - newPlacements = append(newPlacements, duckv1alpha1.Placement{ - PodName: podName, - VReplicas: placements[i].VReplicas + allocation, - }) - diff -= allocation - states.SetFree(ordinal, f-allocation) - } else { - newPlacements = append(newPlacements, placements[i]) + // Add all the ordinals to the candidates list. + // De-prioritize the last ordinals over lower ordinals so that we reduce the chances for compaction. 
+ for ordinal := s.replicas - 1; ordinal >= 0; ordinal-- { + if !states.IsSchedulablePod(ordinal) { + continue } - } - - if diff > 0 { - // Needs to allocate replicas to additional pods - for ordinal := int32(0); ordinal < s.replicas; ordinal++ { - f := states.Free(ordinal) - if f > 0 { - allocation := integer.Int32Min(f, diff) - newPlacements = append(newPlacements, duckv1alpha1.Placement{ - PodName: st.PodNameFromOrdinal(s.statefulSetName, ordinal), - VReplicas: allocation, - }) - - diff -= allocation - states.SetFree(ordinal, f-allocation) - } - - if diff == 0 { - break - } + podName := st.PodNameFromOrdinal(states.StatefulSetName, ordinal) + if existingPlacements.Has(podName) { + continue } + candidates[lastIdx] = ordinal + lastIdx-- } - - return newPlacements, diff + return candidates } func (s *StatefulSetScheduler) updateStatefulset(ctx context.Context, obj interface{}) { @@ -808,31 +509,17 @@ func (s *StatefulSetScheduler) updateStatefulset(ctx context.Context, obj interf func (s *StatefulSetScheduler) reservePlacements(vpod scheduler.VPod, placements []duckv1alpha1.Placement) { if len(placements) == 0 { // clear our old placements in reserved - s.reserved[vpod.GetKey()] = make(map[string]int32) + delete(s.reserved, vpod.GetKey()) + return } + s.reserved[vpod.GetKey()] = make(map[string]int32, len(placements)) + for _, p := range placements { - // note: track all vreplicas, not only the new ones since - // the next time `state()` is called some vreplicas might - // have been committed. - if _, ok := s.reserved[vpod.GetKey()]; !ok { - s.reserved[vpod.GetKey()] = make(map[string]int32) - } s.reserved[vpod.GetKey()][p.PodName] = p.VReplicas } } -func (s *StatefulSetScheduler) makeZeroPlacements(vpod scheduler.VPod, placements []duckv1alpha1.Placement) { - newPlacements := make([]duckv1alpha1.Placement, len(placements)) - for i := 0; i < len(placements); i++ { - newPlacements[i].PodName = placements[i].PodName - newPlacements[i].VReplicas = 0 - } - // This is necessary to make sure State() zeroes out initial pod/node/zone spread and - // free capacity when there are existing placements for a vpod - s.reservePlacements(vpod, newPlacements) -} - // newNotEnoughPodReplicas returns an error explaining what is the problem, what are the actions we're taking // to try to fix it (retry), wrapping a controller.requeueKeyError which signals to ReconcileKind to requeue the // object after a given delay. 
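// Illustrative sketch (not part of the diff) of the priority order that
// candidatesOrdered produces, assuming every pod is schedulable: pods reserved
// for this vPod but not yet committed come first, then free pods (lower
// ordinals preferred, to help later compaction), and pods that already hold
// committed placements come last, again lower ordinals first. This restates
// the resulting order with plain slices instead of the two-index fill used in
// the real code; ordinals stand in for pod names.
package main

import (
	"fmt"
	"sort"
)

func orderCandidates(replicas int, committed []int, reservedOnly []int) []int {
	placed := make(map[int]bool, len(committed))
	for _, o := range committed {
		placed[o] = true
	}
	reserved := make(map[int]bool, len(reservedOnly))
	for _, o := range reservedOnly {
		reserved[o] = true
	}

	candidates := make([]int, 0, replicas)
	candidates = append(candidates, reservedOnly...) // highest priority: reserved but not committed
	for o := 0; o < replicas; o++ {                  // then free pods, low ordinals first
		if !placed[o] && !reserved[o] {
			candidates = append(candidates, o)
		}
	}
	sorted := append([]int(nil), committed...) // finally pods that already hold placements
	sort.Ints(sorted)
	return append(candidates, sorted...)
}

func main() {
	// 5 pods, committed placements on ordinals 1 and 4, a pending reservation on 3.
	fmt.Println(orderCandidates(5, []int{4, 1}, []int{3})) // [3 0 2 1 4]
}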
@@ -859,3 +546,18 @@ func (s *StatefulSetScheduler) Reserved() map[types.NamespacedName]map[string]in return r } + +func upsertPlacements(placements []duckv1alpha1.Placement, placement duckv1alpha1.Placement) []duckv1alpha1.Placement { + found := false + for i := range placements { + if placements[i].PodName == placement.PodName { + placements[i].VReplicas = placements[i].VReplicas + placement.VReplicas + found = true + break + } + } + if !found { + placements = append(placements, placement) + } + return placements +} diff --git a/vendor/modules.txt b/vendor/modules.txt index e396ec203a..fa2f36ae1d 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1167,7 +1167,7 @@ k8s.io/utils/pointer k8s.io/utils/ptr k8s.io/utils/strings/slices k8s.io/utils/trace -# knative.dev/eventing v0.43.1-0.20241028083747-ef6b31a697e7 +# knative.dev/eventing v0.43.1-0.20241029203049-7c97e6ff8358 ## explicit; go 1.22.0 knative.dev/eventing/cmd/event_display knative.dev/eventing/cmd/heartbeats @@ -1277,17 +1277,6 @@ knative.dev/eventing/pkg/reconciler/testing knative.dev/eventing/pkg/reconciler/testing/scheme knative.dev/eventing/pkg/reconciler/testing/v1 knative.dev/eventing/pkg/scheduler -knative.dev/eventing/pkg/scheduler/factory -knative.dev/eventing/pkg/scheduler/plugins/core/availabilitynodepriority -knative.dev/eventing/pkg/scheduler/plugins/core/availabilityzonepriority -knative.dev/eventing/pkg/scheduler/plugins/core/evenpodspread -knative.dev/eventing/pkg/scheduler/plugins/core/lowestordinalpriority -knative.dev/eventing/pkg/scheduler/plugins/core/podfitsresources -knative.dev/eventing/pkg/scheduler/plugins/core/removewithavailabilitynodepriority -knative.dev/eventing/pkg/scheduler/plugins/core/removewithavailabilityzonepriority -knative.dev/eventing/pkg/scheduler/plugins/core/removewithevenpodspreadpriority -knative.dev/eventing/pkg/scheduler/plugins/core/removewithhighestordinalpriority -knative.dev/eventing/pkg/scheduler/plugins/kafka/nomaxresourcecount knative.dev/eventing/pkg/scheduler/state knative.dev/eventing/pkg/scheduler/statefulset knative.dev/eventing/pkg/tracing
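// Illustrative sketch (not part of the diff) of how the new upsertPlacements
// helper behaves: allocating to a pod that already appears in the list bumps
// its vreplica count in place, while a new pod gets appended. The placement
// type is a simplified stand-in for duckv1alpha1.Placement.
package main

import "fmt"

type placement struct {
	PodName   string
	VReplicas int32
}

func upsert(placements []placement, p placement) []placement {
	for i := range placements {
		if placements[i].PodName == p.PodName {
			placements[i].VReplicas += p.VReplicas
			return placements
		}
	}
	return append(placements, p)
}

func main() {
	ps := []placement{{PodName: "dispatcher-0", VReplicas: 2}}
	ps = upsert(ps, placement{PodName: "dispatcher-0", VReplicas: 1}) // existing pod: 2 -> 3
	ps = upsert(ps, placement{PodName: "dispatcher-1", VReplicas: 1}) // new pod appended
	fmt.Println(ps)                                                   // [{dispatcher-0 3} {dispatcher-1 1}]
}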