From f9ed54ea394572439205fabea7015b5c19b22ed8 Mon Sep 17 00:00:00 2001 From: Per Goncalves da Silva Date: Thu, 19 Dec 2024 14:05:05 +0100 Subject: [PATCH] queue item fix Signed-off-by: Per Goncalves da Silva --- pkg/controller/operators/catalog/operator.go | 56 ++++++++++------- .../operators/catalog/subscription/config.go | 4 +- .../operators/catalogtemplate/operator.go | 6 +- pkg/controller/operators/olm/operator.go | 60 +++++++++++-------- pkg/controller/operators/olm/operatorgroup.go | 5 +- pkg/lib/kubestate/kubestate.go | 7 +++ pkg/lib/queueinformer/config.go | 10 ++-- pkg/lib/queueinformer/queueinformer.go | 2 +- .../queueinformer/queueinformer_operator.go | 22 +++---- pkg/lib/queueinformer/resourcequeue.go | 8 +-- pkg/package-server/server/server.go | 9 +-- 11 files changed, 108 insertions(+), 81 deletions(-) diff --git a/pkg/controller/operators/catalog/operator.go b/pkg/controller/operators/catalog/operator.go index ebdd6313a4..de47d98389 100644 --- a/pkg/controller/operators/catalog/operator.go +++ b/pkg/controller/operators/catalog/operator.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate" "reflect" "sort" "strings" @@ -114,7 +115,7 @@ type Operator struct { subQueueSet *queueinformer.ResourceQueueSet ipQueueSet *queueinformer.ResourceQueueSet ogQueueSet *queueinformer.ResourceQueueSet - nsResolveQueue workqueue.TypedRateLimitingInterface[any] + nsResolveQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] namespace string recorder record.EventRecorder sources *grpc.SourceStore @@ -268,8 +269,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo // Wire InstallPlans ipInformer := crInformerFactory.Operators().V1alpha1().InstallPlans() op.lister.OperatorsV1alpha1().RegisterInstallPlanLister(metav1.NamespaceAll, ipInformer.Lister()) - ipQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + ipQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "ips", }) op.ipQueueSet.Set(metav1.NamespaceAll, ipQueue) @@ -290,8 +292,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo operatorGroupInformer := crInformerFactory.Operators().V1().OperatorGroups() op.lister.OperatorsV1().RegisterOperatorGroupLister(metav1.NamespaceAll, operatorGroupInformer.Lister()) - ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "ogs", }) op.ogQueueSet.Set(metav1.NamespaceAll, ogQueue) @@ -312,8 +315,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo // Wire CatalogSources catsrcInformer := crInformerFactory.Operators().V1alpha1().CatalogSources() op.lister.OperatorsV1alpha1().RegisterCatalogSourceLister(metav1.NamespaceAll, catsrcInformer.Lister()) - catsrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + catsrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "catsrcs", }) op.catsrcQueueSet.Set(metav1.NamespaceAll, catsrcQueue) @@ -341,8 +345,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo subIndexer := subInformer.Informer().GetIndexer() op.catalogSubscriberIndexer[metav1.NamespaceAll] = subIndexer - subQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + subQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "subs", }) op.subQueueSet.Set(metav1.NamespaceAll, subQueue) @@ -415,9 +420,12 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo logger := op.logger.WithFields(logrus.Fields{"gvr": gvr.String(), "index": idx}) logger.Info("registering labeller") - queue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{ - Name: gvr.String(), - }) + queue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ + Name: gvr.String(), + }, + ) queueInformer, err := queueinformer.NewQueueInformer( ctx, queueinformer.WithQueue(queue), @@ -560,9 +568,12 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo logger := op.logger.WithFields(logrus.Fields{"gvr": gvr.String()}) logger.Info("registering owner reference fixer") - queue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{ - Name: gvr.String(), - }) + queue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ + Name: gvr.String(), + }, + ) queueInformer, err := queueinformer.NewQueueInformer( ctx, queueinformer.WithQueue(queue), @@ -745,8 +756,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo // Namespace sync for resolving subscriptions namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), resyncPeriod()).Core().V1().Namespaces() op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister()) - op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "resolve", }) namespaceQueueInformer, err := queueinformer.NewQueueInformer( @@ -787,12 +799,12 @@ func (o *Operator) syncSourceState(state grpc.SourceState) { if err == nil { for ns := range namespaces { - o.nsResolveQueue.Add(ns) + o.nsResolveQueue.Add(kubestate.NewUpdateEvent(ns)) } } } - o.nsResolveQueue.Add(state.Key.Namespace) + o.nsResolveQueue.Add(kubestate.NewUpdateEvent(state.Key.Namespace)) } if err := o.catsrcQueueSet.Requeue(state.Key.Namespace, state.Key.Name); err != nil { o.logger.WithError(err).Info("couldn't requeue catalogsource from catalog status change") @@ -1411,7 +1423,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error { } logger.Info("unpacking is not complete yet, requeueing") - o.nsResolveQueue.AddAfter(namespace, 5*time.Second) + o.nsResolveQueue.AddAfter(kubestate.NewUpdateEvent(namespace), 5*time.Second) return nil } } @@ -1506,7 +1518,7 @@ func (o *Operator) syncSubscriptions(obj interface{}) error { return fmt.Errorf("casting Subscription failed") } - o.nsResolveQueue.Add(sub.GetNamespace()) + o.nsResolveQueue.Add(kubestate.NewUpdateEvent(sub.GetNamespace())) return nil } @@ -1520,7 +1532,7 @@ func (o *Operator) syncOperatorGroups(obj interface{}) error { return fmt.Errorf("casting OperatorGroup failed") } - o.nsResolveQueue.Add(og.GetNamespace()) + o.nsResolveQueue.Add(kubestate.NewUpdateEvent(og.GetNamespace())) return nil } diff --git a/pkg/controller/operators/catalog/subscription/config.go b/pkg/controller/operators/catalog/subscription/config.go index c4c1877b64..1e16395154 100644 --- a/pkg/controller/operators/catalog/subscription/config.go +++ b/pkg/controller/operators/catalog/subscription/config.go @@ -23,7 +23,7 @@ type syncerConfig struct { subscriptionInformer cache.SharedIndexInformer catalogInformer cache.SharedIndexInformer installPlanInformer cache.SharedIndexInformer - subscriptionQueue workqueue.TypedRateLimitingInterface[any] + subscriptionQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] reconcilers kubestate.ReconcilerChain registryReconcilerFactory reconciler.RegistryReconcilerFactory globalCatalogNamespace string @@ -97,7 +97,7 @@ func WithOperatorLister(lister operatorlister.OperatorLister) SyncerOption { } // WithSubscriptionQueue sets a syncer's subscription queue. -func WithSubscriptionQueue(subscriptionQueue workqueue.TypedRateLimitingInterface[any]) SyncerOption { +func WithSubscriptionQueue(subscriptionQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]) SyncerOption { return func(config *syncerConfig) { config.subscriptionQueue = subscriptionQueue } diff --git a/pkg/controller/operators/catalogtemplate/operator.go b/pkg/controller/operators/catalogtemplate/operator.go index ea10454506..b0fadb2193 100644 --- a/pkg/controller/operators/catalogtemplate/operator.go +++ b/pkg/controller/operators/catalogtemplate/operator.go @@ -3,6 +3,7 @@ package catalogtemplate import ( "context" "fmt" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate" "strings" "time" @@ -101,8 +102,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, logger *logrus.Logg // Wire CatalogSources catsrcInformer := crInformerFactory.Operators().V1alpha1().CatalogSources() op.lister.OperatorsV1alpha1().RegisterCatalogSourceLister(metav1.NamespaceAll, catsrcInformer.Lister()) - catalogTemplateSrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + catalogTemplateSrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "catalogSourceTemplate", }) op.catalogSourceTemplateQueueSet.Set(metav1.NamespaceAll, catalogTemplateSrcQueue) diff --git a/pkg/controller/operators/olm/operator.go b/pkg/controller/operators/olm/operator.go index e6dfaaf1d0..4280049724 100644 --- a/pkg/controller/operators/olm/operator.go +++ b/pkg/controller/operators/olm/operator.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate" "strings" "sync" "time" @@ -83,11 +84,11 @@ type Operator struct { copiedCSVLister metadatalister.Lister ogQueueSet *queueinformer.ResourceQueueSet csvQueueSet *queueinformer.ResourceQueueSet - olmConfigQueue workqueue.TypedRateLimitingInterface[any] + olmConfigQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] csvCopyQueueSet *queueinformer.ResourceQueueSet copiedCSVGCQueueSet *queueinformer.ResourceQueueSet - nsQueueSet workqueue.TypedRateLimitingInterface[any] - apiServiceQueue workqueue.TypedRateLimitingInterface[any] + nsQueueSet workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] + apiServiceQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] csvIndexers map[string]cache.Indexer recorder record.EventRecorder resolver install.StrategyResolverInterface @@ -198,17 +199,17 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat client: config.externalClient, ogQueueSet: queueinformer.NewEmptyResourceQueueSet(), csvQueueSet: queueinformer.NewEmptyResourceQueueSet(), - olmConfigQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + olmConfigQueue: workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "olmConfig", }), csvCopyQueueSet: queueinformer.NewEmptyResourceQueueSet(), copiedCSVGCQueueSet: queueinformer.NewEmptyResourceQueueSet(), - apiServiceQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + apiServiceQueue: workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "apiservice", }), resolver: config.strategyResolver, @@ -246,9 +247,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat ).Operators().V1alpha1().ClusterServiceVersions() informersByNamespace[namespace].CSVInformer = csvInformer op.lister.OperatorsV1alpha1().RegisterClusterServiceVersionLister(namespace, csvInformer.Lister()) - csvQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + csvQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: fmt.Sprintf("%s/csv", namespace), }) op.csvQueueSet.Set(namespace, csvQueue) @@ -273,7 +274,11 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat op.csvIndexers[namespace] = csvIndexer // Register separate queue for copying csvs - csvCopyQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any](), fmt.Sprintf("%s/csv-copy", namespace)) + csvCopyQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ + Name: fmt.Sprintf("%s/csv-copy", namespace), + }) op.csvCopyQueueSet.Set(namespace, csvCopyQueue) csvCopyQueueInformer, err := queueinformer.NewQueueInformer( ctx, @@ -307,9 +312,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat informersByNamespace[namespace].CopiedCSVLister = op.copiedCSVLister // Register separate queue for gcing copied csvs - copiedCSVGCQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + copiedCSVGCQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: fmt.Sprintf("%s/csv-gc", namespace), }) op.copiedCSVGCQueueSet.Set(namespace, copiedCSVGCQueue) @@ -333,9 +338,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat operatorGroupInformer := extInformerFactory.Operators().V1().OperatorGroups() informersByNamespace[namespace].OperatorGroupInformer = operatorGroupInformer op.lister.OperatorsV1().RegisterOperatorGroupLister(namespace, operatorGroupInformer.Lister()) - ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: fmt.Sprintf("%s/og", namespace), }) op.ogQueueSet.Set(namespace, ogQueue) @@ -522,9 +527,12 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat logger := op.logger.WithFields(logrus.Fields{"gvr": gvr.String(), "index": idx}) logger.Info("registering labeller") - queue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{ - Name: gvr.String(), - }) + queue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ + Name: gvr.String(), + }, + ) queueInformer, err := queueinformer.NewQueueInformer( ctx, queueinformer.WithQueue(queue), @@ -696,9 +704,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), config.resyncPeriod()).Core().V1().Namespaces() informersByNamespace[metav1.NamespaceAll].NamespaceInformer = namespaceInformer op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister()) - op.nsQueueSet = workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + op.nsQueueSet = workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "resolver", }) namespaceInformer.Informer().AddEventHandler( @@ -1665,7 +1673,7 @@ func (a *Operator) syncCopyCSV(obj interface{}) (syncError error) { } if err == nil { - go a.olmConfigQueue.AddAfter(olmConfig, time.Second*5) + go a.olmConfigQueue.AddAfter(kubestate.NewUpdateEvent(olmConfig), time.Second*5) } logger := a.logger.WithFields(logrus.Fields{ diff --git a/pkg/controller/operators/olm/operatorgroup.go b/pkg/controller/operators/olm/operatorgroup.go index 18c8b19008..d253303943 100644 --- a/pkg/controller/operators/olm/operatorgroup.go +++ b/pkg/controller/operators/olm/operatorgroup.go @@ -4,6 +4,7 @@ import ( "context" "crypto/sha256" "fmt" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate" "math/big" "reflect" "strings" @@ -182,7 +183,7 @@ func (a *Operator) syncOperatorGroups(obj interface{}) error { logger.Debug("Requeueing out of sync namespaces") for _, ns := range outOfSyncNamespaces { logger.WithField("namespace", ns).Debug("requeueing") - a.nsQueueSet.Add(ns) + a.nsQueueSet.Add(kubestate.NewUpdateEvent(ns)) } // CSV requeue is handled by the succeeding sync in `annotateCSVs` @@ -263,7 +264,7 @@ func (a *Operator) operatorGroupDeleted(obj interface{}) { logger.Debug("OperatorGroup deleted, requeueing out of sync namespaces") for _, ns := range op.Status.Namespaces { logger.WithField("namespace", ns).Debug("requeueing") - a.nsQueueSet.Add(ns) + a.nsQueueSet.Add(kubestate.NewUpdateEvent(ns)) } } diff --git a/pkg/lib/kubestate/kubestate.go b/pkg/lib/kubestate/kubestate.go index 3f656069de..b249cf26ff 100644 --- a/pkg/lib/kubestate/kubestate.go +++ b/pkg/lib/kubestate/kubestate.go @@ -163,6 +163,13 @@ func (r resourceEvent) Resource() interface{} { return r.resource } +func NewUpdateEvent(resource interface{}) ResourceEvent { + return resourceEvent{ + eventType: ResourceUpdated, + resource: resource, + } +} + func NewResourceEvent(eventType ResourceEventType, resource interface{}) ResourceEvent { return resourceEvent{ eventType: eventType, diff --git a/pkg/lib/queueinformer/config.go b/pkg/lib/queueinformer/config.go index bd69d2403b..3842aa9db4 100644 --- a/pkg/lib/queueinformer/config.go +++ b/pkg/lib/queueinformer/config.go @@ -14,7 +14,7 @@ import ( type queueInformerConfig struct { provider metrics.MetricsProvider logger *logrus.Logger - queue workqueue.RateLimitingInterface + queue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] informer cache.SharedIndexInformer indexer cache.Indexer keyFunc KeyFunc @@ -105,9 +105,9 @@ func defaultKeyFunc(obj interface{}) (string, bool) { func defaultConfig() *queueInformerConfig { return &queueInformerConfig{ provider: metrics.NewMetricsNil(), - queue: workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + queue: workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "default", }), logger: logrus.New(), @@ -130,7 +130,7 @@ func WithLogger(logger *logrus.Logger) Option { } // WithQueue sets the queue used by a QueueInformer. -func WithQueue(queue workqueue.RateLimitingInterface) Option { +func WithQueue(queue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]) Option { return func(config *queueInformerConfig) { config.queue = queue } diff --git a/pkg/lib/queueinformer/queueinformer.go b/pkg/lib/queueinformer/queueinformer.go index 02a66cb527..6187a8da5b 100644 --- a/pkg/lib/queueinformer/queueinformer.go +++ b/pkg/lib/queueinformer/queueinformer.go @@ -23,7 +23,7 @@ type QueueInformer struct { metrics.MetricsProvider logger *logrus.Logger - queue workqueue.RateLimitingInterface + queue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] informer cache.SharedIndexInformer indexer cache.Indexer keyFunc KeyFunc diff --git a/pkg/lib/queueinformer/queueinformer_operator.go b/pkg/lib/queueinformer/queueinformer_operator.go index ecdb4eb896..ae9baa2a30 100644 --- a/pkg/lib/queueinformer/queueinformer_operator.go +++ b/pkg/lib/queueinformer/queueinformer_operator.go @@ -273,8 +273,8 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer) logger := o.logger.WithField("item", item) logger.WithField("queue-length", queue.Len()).Trace("popped queue") - event, ok := item.(kubestate.ResourceEvent) - if !ok || event.Type() != kubestate.ResourceDeleted { + var syncErr error + if item.Type() != kubestate.ResourceDeleted { // Get the key key, keyable := loop.key(item) if !keyable { @@ -287,7 +287,7 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer) var resource interface{} if loop.indexer == nil { - resource = event.Resource() + resource = item.Resource() } else { // Get the current cached version of the resource var exists bool @@ -304,26 +304,22 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer) return true } } - - if !ok { - event = kubestate.NewResourceEvent(kubestate.ResourceUpdated, resource) - } else { - event = kubestate.NewResourceEvent(event.Type(), resource) - } + syncErr = loop.Sync(ctx, kubestate.NewResourceEvent(item.Type(), resource)) + } else { + syncErr = loop.Sync(ctx, item) } // Sync and requeue on error (throw out failed deletion syncs) - err := loop.Sync(ctx, event) - if requeues := queue.NumRequeues(item); err != nil && requeues < 8 && event.Type() != kubestate.ResourceDeleted { + if requeues := queue.NumRequeues(item); syncErr != nil && requeues < 8 && item.Type() != kubestate.ResourceDeleted { logger.WithField("requeues", requeues).Trace("requeuing with rate limiting") - utilruntime.HandleError(errors.Wrap(err, fmt.Sprintf("sync %q failed", item))) + utilruntime.HandleError(errors.Wrap(syncErr, fmt.Sprintf("sync %q failed", item))) queue.AddRateLimited(item) return true } queue.Forget(item) select { - case o.syncCh <- err: + case o.syncCh <- syncErr: default: } diff --git a/pkg/lib/queueinformer/resourcequeue.go b/pkg/lib/queueinformer/resourcequeue.go index 0e4da56cde..6881b9784b 100644 --- a/pkg/lib/queueinformer/resourcequeue.go +++ b/pkg/lib/queueinformer/resourcequeue.go @@ -14,22 +14,22 @@ import ( // ResourceQueueSet is a set of workqueues that is assumed to be keyed by namespace type ResourceQueueSet struct { - queueSet map[string]workqueue.RateLimitingInterface + queueSet map[string]workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] mutex sync.RWMutex } // NewResourceQueueSet returns a new queue set with the given queue map -func NewResourceQueueSet(queueSet map[string]workqueue.RateLimitingInterface) *ResourceQueueSet { +func NewResourceQueueSet(queueSet map[string]workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]) *ResourceQueueSet { return &ResourceQueueSet{queueSet: queueSet} } // NewEmptyResourceQueueSet returns a new queue set with an empty but initialized queue map func NewEmptyResourceQueueSet() *ResourceQueueSet { - return &ResourceQueueSet{queueSet: make(map[string]workqueue.RateLimitingInterface)} + return &ResourceQueueSet{queueSet: make(map[string]workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent])} } // Set sets the queue at the given key -func (r *ResourceQueueSet) Set(key string, queue workqueue.RateLimitingInterface) { +func (r *ResourceQueueSet) Set(key string, queue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]) { r.mutex.Lock() defer r.mutex.Unlock() r.queueSet[key] = queue diff --git a/pkg/package-server/server/server.go b/pkg/package-server/server/server.go index 85ad4931dd..1415dd53c4 100644 --- a/pkg/package-server/server/server.go +++ b/pkg/package-server/server/server.go @@ -3,6 +3,7 @@ package server import ( "context" "fmt" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate" "io" "net" "os" @@ -35,7 +36,7 @@ const DefaultWakeupInterval = 12 * time.Hour type Operator struct { queueinformer.Operator - olmConfigQueue workqueue.TypedRateLimitingInterface[any] + olmConfigQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] options *PackageServerOptions } @@ -239,9 +240,9 @@ func (o *PackageServerOptions) Run(ctx context.Context) error { op := &Operator{ Operator: queueOperator, - olmConfigQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + olmConfigQueue: workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "olmConfig", }), options: o,