From 41339431a8fc7b12cf784cd1fc098f4b10f39be5 Mon Sep 17 00:00:00 2001 From: Per Goncalves da Silva Date: Thu, 19 Dec 2024 14:05:05 +0100 Subject: [PATCH 1/2] queue item fix Signed-off-by: Per Goncalves da Silva --- pkg/controller/operators/catalog/operator.go | 57 ++++++++++------- .../operators/catalog/operator_test.go | 12 ++-- .../operators/catalog/subscription/config.go | 4 +- .../operators/catalogtemplate/operator.go | 7 ++- pkg/controller/operators/olm/operator.go | 61 +++++++++++-------- pkg/controller/operators/olm/operatorgroup.go | 6 +- pkg/lib/kubestate/kubestate.go | 7 +++ pkg/lib/queueinformer/config.go | 10 +-- pkg/lib/queueinformer/queueinformer.go | 2 +- .../queueinformer/queueinformer_operator.go | 15 ++--- pkg/lib/queueinformer/resourcequeue.go | 8 +-- pkg/package-server/server/server.go | 10 +-- 12 files changed, 116 insertions(+), 83 deletions(-) diff --git a/pkg/controller/operators/catalog/operator.go b/pkg/controller/operators/catalog/operator.go index ebdd6313a4..42094728e8 100644 --- a/pkg/controller/operators/catalog/operator.go +++ b/pkg/controller/operators/catalog/operator.go @@ -11,6 +11,8 @@ import ( "sync" "time" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate" + "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/labeller" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/validatingroundtripper" errorwrap "github.com/pkg/errors" @@ -114,7 +116,7 @@ type Operator struct { subQueueSet *queueinformer.ResourceQueueSet ipQueueSet *queueinformer.ResourceQueueSet ogQueueSet *queueinformer.ResourceQueueSet - nsResolveQueue workqueue.TypedRateLimitingInterface[any] + nsResolveQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] namespace string recorder record.EventRecorder sources *grpc.SourceStore @@ -268,8 +270,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo // Wire InstallPlans ipInformer := crInformerFactory.Operators().V1alpha1().InstallPlans() op.lister.OperatorsV1alpha1().RegisterInstallPlanLister(metav1.NamespaceAll, ipInformer.Lister()) - ipQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + ipQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "ips", }) op.ipQueueSet.Set(metav1.NamespaceAll, ipQueue) @@ -290,8 +293,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo operatorGroupInformer := crInformerFactory.Operators().V1().OperatorGroups() op.lister.OperatorsV1().RegisterOperatorGroupLister(metav1.NamespaceAll, operatorGroupInformer.Lister()) - ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "ogs", }) op.ogQueueSet.Set(metav1.NamespaceAll, ogQueue) @@ -312,8 +316,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo // Wire CatalogSources catsrcInformer := crInformerFactory.Operators().V1alpha1().CatalogSources() op.lister.OperatorsV1alpha1().RegisterCatalogSourceLister(metav1.NamespaceAll, catsrcInformer.Lister()) - catsrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + catsrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "catsrcs", }) op.catsrcQueueSet.Set(metav1.NamespaceAll, catsrcQueue) @@ -341,8 +346,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo subIndexer := subInformer.Informer().GetIndexer() op.catalogSubscriberIndexer[metav1.NamespaceAll] = subIndexer - subQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + subQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "subs", }) op.subQueueSet.Set(metav1.NamespaceAll, subQueue) @@ -415,9 +421,12 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo logger := op.logger.WithFields(logrus.Fields{"gvr": gvr.String(), "index": idx}) logger.Info("registering labeller") - queue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{ - Name: gvr.String(), - }) + queue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ + Name: gvr.String(), + }, + ) queueInformer, err := queueinformer.NewQueueInformer( ctx, queueinformer.WithQueue(queue), @@ -560,9 +569,12 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo logger := op.logger.WithFields(logrus.Fields{"gvr": gvr.String()}) logger.Info("registering owner reference fixer") - queue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{ - Name: gvr.String(), - }) + queue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ + Name: gvr.String(), + }, + ) queueInformer, err := queueinformer.NewQueueInformer( ctx, queueinformer.WithQueue(queue), @@ -745,8 +757,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo // Namespace sync for resolving subscriptions namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), resyncPeriod()).Core().V1().Namespaces() op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister()) - op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "resolve", }) namespaceQueueInformer, err := queueinformer.NewQueueInformer( @@ -787,12 +800,12 @@ func (o *Operator) syncSourceState(state grpc.SourceState) { if err == nil { for ns := range namespaces { - o.nsResolveQueue.Add(ns) + o.nsResolveQueue.Add(kubestate.NewUpdateEvent(ns)) } } } - o.nsResolveQueue.Add(state.Key.Namespace) + o.nsResolveQueue.Add(kubestate.NewUpdateEvent(state.Key.Namespace)) } if err := o.catsrcQueueSet.Requeue(state.Key.Namespace, state.Key.Name); err != nil { o.logger.WithError(err).Info("couldn't requeue catalogsource from catalog status change") @@ -1411,7 +1424,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error { } logger.Info("unpacking is not complete yet, requeueing") - o.nsResolveQueue.AddAfter(namespace, 5*time.Second) + o.nsResolveQueue.AddAfter(kubestate.NewUpdateEvent(namespace), 5*time.Second) return nil } } @@ -1506,7 +1519,7 @@ func (o *Operator) syncSubscriptions(obj interface{}) error { return fmt.Errorf("casting Subscription failed") } - o.nsResolveQueue.Add(sub.GetNamespace()) + o.nsResolveQueue.Add(kubestate.NewUpdateEvent(sub.GetNamespace())) return nil } @@ -1520,7 +1533,7 @@ func (o *Operator) syncOperatorGroups(obj interface{}) error { return fmt.Errorf("casting OperatorGroup failed") } - o.nsResolveQueue.Add(og.GetNamespace()) + o.nsResolveQueue.Add(kubestate.NewUpdateEvent(og.GetNamespace())) return nil } diff --git a/pkg/controller/operators/catalog/operator_test.go b/pkg/controller/operators/catalog/operator_test.go index c251cfc77b..79575e02ae 100644 --- a/pkg/controller/operators/catalog/operator_test.go +++ b/pkg/controller/operators/catalog/operator_test.go @@ -13,6 +13,8 @@ import ( "testing/quick" "time" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate" + "k8s.io/utils/ptr" controllerclient "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/controller-runtime/client" @@ -2156,13 +2158,13 @@ func NewFakeOperator(ctx context.Context, namespace string, namespaces []string, client: clientFake, lister: lister, namespace: namespace, - nsResolveQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.NewTypedMaxOfRateLimiter[any]( - workqueue.NewTypedItemExponentialFailureRateLimiter[any](1*time.Second, 1000*time.Second), + nsResolveQueue: workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.NewTypedMaxOfRateLimiter[kubestate.ResourceEvent]( + workqueue.NewTypedItemExponentialFailureRateLimiter[kubestate.ResourceEvent](1*time.Second, 1000*time.Second), // 1 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item) - &workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(1), 100)}, + &workqueue.TypedBucketRateLimiter[kubestate.ResourceEvent]{Limiter: rate.NewLimiter(rate.Limit(1), 100)}, ), - workqueue.TypedRateLimitingQueueConfig[any]{ + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "resolver", }), resolver: config.resolver, diff --git a/pkg/controller/operators/catalog/subscription/config.go b/pkg/controller/operators/catalog/subscription/config.go index c4c1877b64..1e16395154 100644 --- a/pkg/controller/operators/catalog/subscription/config.go +++ b/pkg/controller/operators/catalog/subscription/config.go @@ -23,7 +23,7 @@ type syncerConfig struct { subscriptionInformer cache.SharedIndexInformer catalogInformer cache.SharedIndexInformer installPlanInformer cache.SharedIndexInformer - subscriptionQueue workqueue.TypedRateLimitingInterface[any] + subscriptionQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] reconcilers kubestate.ReconcilerChain registryReconcilerFactory reconciler.RegistryReconcilerFactory globalCatalogNamespace string @@ -97,7 +97,7 @@ func WithOperatorLister(lister operatorlister.OperatorLister) SyncerOption { } // WithSubscriptionQueue sets a syncer's subscription queue. -func WithSubscriptionQueue(subscriptionQueue workqueue.TypedRateLimitingInterface[any]) SyncerOption { +func WithSubscriptionQueue(subscriptionQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]) SyncerOption { return func(config *syncerConfig) { config.subscriptionQueue = subscriptionQueue } diff --git a/pkg/controller/operators/catalogtemplate/operator.go b/pkg/controller/operators/catalogtemplate/operator.go index ea10454506..9c4ba85b74 100644 --- a/pkg/controller/operators/catalogtemplate/operator.go +++ b/pkg/controller/operators/catalogtemplate/operator.go @@ -6,6 +6,8 @@ import ( "strings" "time" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate" + "github.com/distribution/reference" "github.com/operator-framework/api/pkg/operators/v1alpha1" "github.com/sirupsen/logrus" @@ -101,8 +103,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, logger *logrus.Logg // Wire CatalogSources catsrcInformer := crInformerFactory.Operators().V1alpha1().CatalogSources() op.lister.OperatorsV1alpha1().RegisterCatalogSourceLister(metav1.NamespaceAll, catsrcInformer.Lister()) - catalogTemplateSrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + catalogTemplateSrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "catalogSourceTemplate", }) op.catalogSourceTemplateQueueSet.Set(metav1.NamespaceAll, catalogTemplateSrcQueue) diff --git a/pkg/controller/operators/olm/operator.go b/pkg/controller/operators/olm/operator.go index e6dfaaf1d0..721a23e055 100644 --- a/pkg/controller/operators/olm/operator.go +++ b/pkg/controller/operators/olm/operator.go @@ -8,6 +8,8 @@ import ( "sync" "time" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate" + "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/labeller" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/olm/plugins" "github.com/sirupsen/logrus" @@ -83,11 +85,11 @@ type Operator struct { copiedCSVLister metadatalister.Lister ogQueueSet *queueinformer.ResourceQueueSet csvQueueSet *queueinformer.ResourceQueueSet - olmConfigQueue workqueue.TypedRateLimitingInterface[any] + olmConfigQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] csvCopyQueueSet *queueinformer.ResourceQueueSet copiedCSVGCQueueSet *queueinformer.ResourceQueueSet - nsQueueSet workqueue.TypedRateLimitingInterface[any] - apiServiceQueue workqueue.TypedRateLimitingInterface[any] + nsQueueSet workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] + apiServiceQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] csvIndexers map[string]cache.Indexer recorder record.EventRecorder resolver install.StrategyResolverInterface @@ -198,17 +200,17 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat client: config.externalClient, ogQueueSet: queueinformer.NewEmptyResourceQueueSet(), csvQueueSet: queueinformer.NewEmptyResourceQueueSet(), - olmConfigQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + olmConfigQueue: workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "olmConfig", }), csvCopyQueueSet: queueinformer.NewEmptyResourceQueueSet(), copiedCSVGCQueueSet: queueinformer.NewEmptyResourceQueueSet(), - apiServiceQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + apiServiceQueue: workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "apiservice", }), resolver: config.strategyResolver, @@ -246,9 +248,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat ).Operators().V1alpha1().ClusterServiceVersions() informersByNamespace[namespace].CSVInformer = csvInformer op.lister.OperatorsV1alpha1().RegisterClusterServiceVersionLister(namespace, csvInformer.Lister()) - csvQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + csvQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: fmt.Sprintf("%s/csv", namespace), }) op.csvQueueSet.Set(namespace, csvQueue) @@ -273,7 +275,11 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat op.csvIndexers[namespace] = csvIndexer // Register separate queue for copying csvs - csvCopyQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any](), fmt.Sprintf("%s/csv-copy", namespace)) + csvCopyQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ + Name: fmt.Sprintf("%s/csv-copy", namespace), + }) op.csvCopyQueueSet.Set(namespace, csvCopyQueue) csvCopyQueueInformer, err := queueinformer.NewQueueInformer( ctx, @@ -307,9 +313,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat informersByNamespace[namespace].CopiedCSVLister = op.copiedCSVLister // Register separate queue for gcing copied csvs - copiedCSVGCQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + copiedCSVGCQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: fmt.Sprintf("%s/csv-gc", namespace), }) op.copiedCSVGCQueueSet.Set(namespace, copiedCSVGCQueue) @@ -333,9 +339,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat operatorGroupInformer := extInformerFactory.Operators().V1().OperatorGroups() informersByNamespace[namespace].OperatorGroupInformer = operatorGroupInformer op.lister.OperatorsV1().RegisterOperatorGroupLister(namespace, operatorGroupInformer.Lister()) - ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: fmt.Sprintf("%s/og", namespace), }) op.ogQueueSet.Set(namespace, ogQueue) @@ -522,9 +528,12 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat logger := op.logger.WithFields(logrus.Fields{"gvr": gvr.String(), "index": idx}) logger.Info("registering labeller") - queue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{ - Name: gvr.String(), - }) + queue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ + Name: gvr.String(), + }, + ) queueInformer, err := queueinformer.NewQueueInformer( ctx, queueinformer.WithQueue(queue), @@ -696,9 +705,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), config.resyncPeriod()).Core().V1().Namespaces() informersByNamespace[metav1.NamespaceAll].NamespaceInformer = namespaceInformer op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister()) - op.nsQueueSet = workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + op.nsQueueSet = workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "resolver", }) namespaceInformer.Informer().AddEventHandler( @@ -1665,7 +1674,7 @@ func (a *Operator) syncCopyCSV(obj interface{}) (syncError error) { } if err == nil { - go a.olmConfigQueue.AddAfter(olmConfig, time.Second*5) + go a.olmConfigQueue.AddAfter(kubestate.NewUpdateEvent(olmConfig), time.Second*5) } logger := a.logger.WithFields(logrus.Fields{ diff --git a/pkg/controller/operators/olm/operatorgroup.go b/pkg/controller/operators/olm/operatorgroup.go index 18c8b19008..cb382cd9b3 100644 --- a/pkg/controller/operators/olm/operatorgroup.go +++ b/pkg/controller/operators/olm/operatorgroup.go @@ -8,6 +8,8 @@ import ( "reflect" "strings" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate" + "k8s.io/apimachinery/pkg/api/equality" "github.com/sirupsen/logrus" @@ -182,7 +184,7 @@ func (a *Operator) syncOperatorGroups(obj interface{}) error { logger.Debug("Requeueing out of sync namespaces") for _, ns := range outOfSyncNamespaces { logger.WithField("namespace", ns).Debug("requeueing") - a.nsQueueSet.Add(ns) + a.nsQueueSet.Add(kubestate.NewUpdateEvent(ns)) } // CSV requeue is handled by the succeeding sync in `annotateCSVs` @@ -263,7 +265,7 @@ func (a *Operator) operatorGroupDeleted(obj interface{}) { logger.Debug("OperatorGroup deleted, requeueing out of sync namespaces") for _, ns := range op.Status.Namespaces { logger.WithField("namespace", ns).Debug("requeueing") - a.nsQueueSet.Add(ns) + a.nsQueueSet.Add(kubestate.NewUpdateEvent(ns)) } } diff --git a/pkg/lib/kubestate/kubestate.go b/pkg/lib/kubestate/kubestate.go index 3f656069de..b249cf26ff 100644 --- a/pkg/lib/kubestate/kubestate.go +++ b/pkg/lib/kubestate/kubestate.go @@ -163,6 +163,13 @@ func (r resourceEvent) Resource() interface{} { return r.resource } +func NewUpdateEvent(resource interface{}) ResourceEvent { + return resourceEvent{ + eventType: ResourceUpdated, + resource: resource, + } +} + func NewResourceEvent(eventType ResourceEventType, resource interface{}) ResourceEvent { return resourceEvent{ eventType: eventType, diff --git a/pkg/lib/queueinformer/config.go b/pkg/lib/queueinformer/config.go index bd69d2403b..3842aa9db4 100644 --- a/pkg/lib/queueinformer/config.go +++ b/pkg/lib/queueinformer/config.go @@ -14,7 +14,7 @@ import ( type queueInformerConfig struct { provider metrics.MetricsProvider logger *logrus.Logger - queue workqueue.RateLimitingInterface + queue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] informer cache.SharedIndexInformer indexer cache.Indexer keyFunc KeyFunc @@ -105,9 +105,9 @@ func defaultKeyFunc(obj interface{}) (string, bool) { func defaultConfig() *queueInformerConfig { return &queueInformerConfig{ provider: metrics.NewMetricsNil(), - queue: workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + queue: workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "default", }), logger: logrus.New(), @@ -130,7 +130,7 @@ func WithLogger(logger *logrus.Logger) Option { } // WithQueue sets the queue used by a QueueInformer. -func WithQueue(queue workqueue.RateLimitingInterface) Option { +func WithQueue(queue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]) Option { return func(config *queueInformerConfig) { config.queue = queue } diff --git a/pkg/lib/queueinformer/queueinformer.go b/pkg/lib/queueinformer/queueinformer.go index 02a66cb527..6187a8da5b 100644 --- a/pkg/lib/queueinformer/queueinformer.go +++ b/pkg/lib/queueinformer/queueinformer.go @@ -23,7 +23,7 @@ type QueueInformer struct { metrics.MetricsProvider logger *logrus.Logger - queue workqueue.RateLimitingInterface + queue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] informer cache.SharedIndexInformer indexer cache.Indexer keyFunc KeyFunc diff --git a/pkg/lib/queueinformer/queueinformer_operator.go b/pkg/lib/queueinformer/queueinformer_operator.go index ecdb4eb896..3cd4a143b5 100644 --- a/pkg/lib/queueinformer/queueinformer_operator.go +++ b/pkg/lib/queueinformer/queueinformer_operator.go @@ -273,8 +273,8 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer) logger := o.logger.WithField("item", item) logger.WithField("queue-length", queue.Len()).Trace("popped queue") - event, ok := item.(kubestate.ResourceEvent) - if !ok || event.Type() != kubestate.ResourceDeleted { + var event = item + if item.Type() != kubestate.ResourceDeleted { // Get the key key, keyable := loop.key(item) if !keyable { @@ -287,7 +287,7 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer) var resource interface{} if loop.indexer == nil { - resource = event.Resource() + resource = item.Resource() } else { // Get the current cached version of the resource var exists bool @@ -304,17 +304,12 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer) return true } } - - if !ok { - event = kubestate.NewResourceEvent(kubestate.ResourceUpdated, resource) - } else { - event = kubestate.NewResourceEvent(event.Type(), resource) - } + event = kubestate.NewResourceEvent(item.Type(), resource) } // Sync and requeue on error (throw out failed deletion syncs) err := loop.Sync(ctx, event) - if requeues := queue.NumRequeues(item); err != nil && requeues < 8 && event.Type() != kubestate.ResourceDeleted { + if requeues := queue.NumRequeues(item); err != nil && requeues < 8 && item.Type() != kubestate.ResourceDeleted { logger.WithField("requeues", requeues).Trace("requeuing with rate limiting") utilruntime.HandleError(errors.Wrap(err, fmt.Sprintf("sync %q failed", item))) queue.AddRateLimited(item) diff --git a/pkg/lib/queueinformer/resourcequeue.go b/pkg/lib/queueinformer/resourcequeue.go index 0e4da56cde..6881b9784b 100644 --- a/pkg/lib/queueinformer/resourcequeue.go +++ b/pkg/lib/queueinformer/resourcequeue.go @@ -14,22 +14,22 @@ import ( // ResourceQueueSet is a set of workqueues that is assumed to be keyed by namespace type ResourceQueueSet struct { - queueSet map[string]workqueue.RateLimitingInterface + queueSet map[string]workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] mutex sync.RWMutex } // NewResourceQueueSet returns a new queue set with the given queue map -func NewResourceQueueSet(queueSet map[string]workqueue.RateLimitingInterface) *ResourceQueueSet { +func NewResourceQueueSet(queueSet map[string]workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]) *ResourceQueueSet { return &ResourceQueueSet{queueSet: queueSet} } // NewEmptyResourceQueueSet returns a new queue set with an empty but initialized queue map func NewEmptyResourceQueueSet() *ResourceQueueSet { - return &ResourceQueueSet{queueSet: make(map[string]workqueue.RateLimitingInterface)} + return &ResourceQueueSet{queueSet: make(map[string]workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent])} } // Set sets the queue at the given key -func (r *ResourceQueueSet) Set(key string, queue workqueue.RateLimitingInterface) { +func (r *ResourceQueueSet) Set(key string, queue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]) { r.mutex.Lock() defer r.mutex.Unlock() r.queueSet[key] = queue diff --git a/pkg/package-server/server/server.go b/pkg/package-server/server/server.go index 85ad4931dd..030bf001f6 100644 --- a/pkg/package-server/server/server.go +++ b/pkg/package-server/server/server.go @@ -8,6 +8,8 @@ import ( "os" "time" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -35,7 +37,7 @@ const DefaultWakeupInterval = 12 * time.Hour type Operator struct { queueinformer.Operator - olmConfigQueue workqueue.TypedRateLimitingInterface[any] + olmConfigQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] options *PackageServerOptions } @@ -239,9 +241,9 @@ func (o *PackageServerOptions) Run(ctx context.Context) error { op := &Operator{ Operator: queueOperator, - olmConfigQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + olmConfigQueue: workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "olmConfig", }), options: o, From 3ffc106fd00385d0e89386a25d3e44d932581908 Mon Sep 17 00:00:00 2001 From: Per Goncalves da Silva Date: Fri, 20 Dec 2024 18:11:50 +0100 Subject: [PATCH 2/2] refactor key function and ensure items always carry strings Signed-off-by: Per Goncalves da Silva --- pkg/lib/queueinformer/config.go | 23 ++------ pkg/lib/queueinformer/queueinformer.go | 33 +++++------ .../queueinformer/queueinformer_operator.go | 56 +++++++++---------- 3 files changed, 49 insertions(+), 63 deletions(-) diff --git a/pkg/lib/queueinformer/config.go b/pkg/lib/queueinformer/config.go index 3842aa9db4..99457834b5 100644 --- a/pkg/lib/queueinformer/config.go +++ b/pkg/lib/queueinformer/config.go @@ -82,24 +82,13 @@ func (c *queueInformerConfig) validateQueue() (err error) { func defaultKeyFunc(obj interface{}) (string, bool) { // Get keys nested in resource events up to depth 2 - keyable := false - for d := 0; d < 2 && !keyable; d++ { - switch v := obj.(type) { - case string: - return v, true - case kubestate.ResourceEvent: - obj = v.Resource() - default: - keyable = true - } + switch v := obj.(type) { + case string: + return v, true + default: + k, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + return k, err == nil } - - k, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) - if err != nil { - return k, false - } - - return k, true } func defaultConfig() *queueInformerConfig { diff --git a/pkg/lib/queueinformer/queueinformer.go b/pkg/lib/queueinformer/queueinformer.go index 6187a8da5b..ae7fbb9677 100644 --- a/pkg/lib/queueinformer/queueinformer.go +++ b/pkg/lib/queueinformer/queueinformer.go @@ -35,29 +35,23 @@ func (q *QueueInformer) Sync(ctx context.Context, event kubestate.ResourceEvent) return q.syncer.Sync(ctx, event) } -// Enqueue adds a key to the queue. If obj is a key already it gets added directly. +// enqueue adds a key to the queue. If obj is a key already it gets added directly. // Otherwise, the key is extracted via keyFunc. -func (q *QueueInformer) Enqueue(event kubestate.ResourceEvent) { - if event == nil { +func (q *QueueInformer) enqueue(eventType kubestate.ResourceEventType, obj interface{}) { + if obj == nil { // Don't enqueue nil events return } - resource := event.Resource() - if event.Type() == kubestate.ResourceDeleted { - // Get object from tombstone if possible - if tombstone, ok := resource.(cache.DeletedFinalStateUnknown); ok { - resource = tombstone - } - } else { - // Extract key for add and update events - if key, ok := q.key(resource); ok { - resource = key - } + // Extract key for add and update events + key, ok := q.key(obj) + if !ok { + q.logger.Warnf("object %v not keyable: %v", obj, key) + return } // Create new resource event and add to queue - e := kubestate.NewResourceEvent(event.Type(), resource) + e := kubestate.NewResourceEvent(eventType, key) q.logger.WithField("event", e).Trace("enqueuing resource event") q.queue.Add(e) } @@ -72,13 +66,16 @@ func (q *QueueInformer) key(obj interface{}) (string, bool) { func (q *QueueInformer) resourceHandlers(ctx context.Context) *cache.ResourceEventHandlerFuncs { return &cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - q.Enqueue(kubestate.NewResourceEvent(kubestate.ResourceUpdated, obj)) + q.enqueue(kubestate.ResourceUpdated, obj) }, UpdateFunc: func(oldObj, newObj interface{}) { - q.Enqueue(kubestate.NewResourceEvent(kubestate.ResourceUpdated, newObj)) + q.enqueue(kubestate.ResourceUpdated, newObj) }, DeleteFunc: func(obj interface{}) { - q.Enqueue(kubestate.NewResourceEvent(kubestate.ResourceDeleted, obj)) + if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { + obj = tombstone + } + q.enqueue(kubestate.ResourceDeleted, obj) }, } } diff --git a/pkg/lib/queueinformer/queueinformer_operator.go b/pkg/lib/queueinformer/queueinformer_operator.go index 3cd4a143b5..684f797d50 100644 --- a/pkg/lib/queueinformer/queueinformer_operator.go +++ b/pkg/lib/queueinformer/queueinformer_operator.go @@ -273,39 +273,39 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer) logger := o.logger.WithField("item", item) logger.WithField("queue-length", queue.Len()).Trace("popped queue") - var event = item - if item.Type() != kubestate.ResourceDeleted { - // Get the key - key, keyable := loop.key(item) - if !keyable { - logger.WithField("item", item).Warn("could not form key") + if _, ok := item.Resource().(string); !ok { + panic(fmt.Sprintf("unexpected item resource type: %T", item.Resource())) + } + + key, keyable := loop.key(item.Resource()) + if !keyable { + logger.WithField("item", item).Warn("could not form key") + queue.Forget(item) + return true + } + + logger = logger.WithField("cache-key", key) + + var resource interface{} + if loop.indexer == nil { + resource = item.Resource() + } else { + // Get the current cached version of the resource + var exists bool + var err error + resource, exists, err = loop.indexer.GetByKey(key) + if err != nil { + logger.WithError(err).Error("cache get failed") queue.Forget(item) return true } - - logger = logger.WithField("cache-key", key) - - var resource interface{} - if loop.indexer == nil { - resource = item.Resource() - } else { - // Get the current cached version of the resource - var exists bool - var err error - resource, exists, err = loop.indexer.GetByKey(key) - if err != nil { - logger.WithError(err).Error("cache get failed") - queue.Forget(item) - return true - } - if !exists { - logger.WithField("existing-cache-keys", loop.indexer.ListKeys()).Debug("cache get failed, key not in cache") - queue.Forget(item) - return true - } + if !exists { + logger.WithField("existing-cache-keys", loop.indexer.ListKeys()).Debug("cache get failed, key not in cache") + queue.Forget(item) + return true } - event = kubestate.NewResourceEvent(item.Type(), resource) } + event := kubestate.NewResourceEvent(item.Type(), resource) // Sync and requeue on error (throw out failed deletion syncs) err := loop.Sync(ctx, event)