Skip to content

Commit

Permalink
queue item fix
Browse files Browse the repository at this point in the history
Signed-off-by: Per Goncalves da Silva <[email protected]>
  • Loading branch information
Per Goncalves da Silva committed Dec 19, 2024
1 parent b7aa493 commit f9ed54e
Show file tree
Hide file tree
Showing 11 changed files with 108 additions and 81 deletions.
56 changes: 34 additions & 22 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate"
"reflect"
"sort"
"strings"
Expand Down Expand Up @@ -114,7 +115,7 @@ type Operator struct {
subQueueSet *queueinformer.ResourceQueueSet
ipQueueSet *queueinformer.ResourceQueueSet
ogQueueSet *queueinformer.ResourceQueueSet
nsResolveQueue workqueue.TypedRateLimitingInterface[any]
nsResolveQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]
namespace string
recorder record.EventRecorder
sources *grpc.SourceStore
Expand Down Expand Up @@ -268,8 +269,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
// Wire InstallPlans
ipInformer := crInformerFactory.Operators().V1alpha1().InstallPlans()
op.lister.OperatorsV1alpha1().RegisterInstallPlanLister(metav1.NamespaceAll, ipInformer.Lister())
ipQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
ipQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: "ips",
})
op.ipQueueSet.Set(metav1.NamespaceAll, ipQueue)
Expand All @@ -290,8 +292,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo

operatorGroupInformer := crInformerFactory.Operators().V1().OperatorGroups()
op.lister.OperatorsV1().RegisterOperatorGroupLister(metav1.NamespaceAll, operatorGroupInformer.Lister())
ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: "ogs",
})
op.ogQueueSet.Set(metav1.NamespaceAll, ogQueue)
Expand All @@ -312,8 +315,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
// Wire CatalogSources
catsrcInformer := crInformerFactory.Operators().V1alpha1().CatalogSources()
op.lister.OperatorsV1alpha1().RegisterCatalogSourceLister(metav1.NamespaceAll, catsrcInformer.Lister())
catsrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
catsrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: "catsrcs",
})
op.catsrcQueueSet.Set(metav1.NamespaceAll, catsrcQueue)
Expand Down Expand Up @@ -341,8 +345,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
subIndexer := subInformer.Informer().GetIndexer()
op.catalogSubscriberIndexer[metav1.NamespaceAll] = subIndexer

subQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
subQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: "subs",
})
op.subQueueSet.Set(metav1.NamespaceAll, subQueue)
Expand Down Expand Up @@ -415,9 +420,12 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
logger := op.logger.WithFields(logrus.Fields{"gvr": gvr.String(), "index": idx})
logger.Info("registering labeller")

queue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{
Name: gvr.String(),
})
queue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: gvr.String(),
},
)
queueInformer, err := queueinformer.NewQueueInformer(
ctx,
queueinformer.WithQueue(queue),
Expand Down Expand Up @@ -560,9 +568,12 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
logger := op.logger.WithFields(logrus.Fields{"gvr": gvr.String()})
logger.Info("registering owner reference fixer")

queue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{
Name: gvr.String(),
})
queue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: gvr.String(),
},
)
queueInformer, err := queueinformer.NewQueueInformer(
ctx,
queueinformer.WithQueue(queue),
Expand Down Expand Up @@ -745,8 +756,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
// Namespace sync for resolving subscriptions
namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), resyncPeriod()).Core().V1().Namespaces()
op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister())
op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: "resolve",
})
namespaceQueueInformer, err := queueinformer.NewQueueInformer(
Expand Down Expand Up @@ -787,12 +799,12 @@ func (o *Operator) syncSourceState(state grpc.SourceState) {

if err == nil {
for ns := range namespaces {
o.nsResolveQueue.Add(ns)
o.nsResolveQueue.Add(kubestate.NewUpdateEvent(ns))
}
}
}

o.nsResolveQueue.Add(state.Key.Namespace)
o.nsResolveQueue.Add(kubestate.NewUpdateEvent(state.Key.Namespace))
}
if err := o.catsrcQueueSet.Requeue(state.Key.Namespace, state.Key.Name); err != nil {
o.logger.WithError(err).Info("couldn't requeue catalogsource from catalog status change")
Expand Down Expand Up @@ -1411,7 +1423,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
}

logger.Info("unpacking is not complete yet, requeueing")
o.nsResolveQueue.AddAfter(namespace, 5*time.Second)
o.nsResolveQueue.AddAfter(kubestate.NewUpdateEvent(namespace), 5*time.Second)
return nil
}
}
Expand Down Expand Up @@ -1506,7 +1518,7 @@ func (o *Operator) syncSubscriptions(obj interface{}) error {
return fmt.Errorf("casting Subscription failed")
}

o.nsResolveQueue.Add(sub.GetNamespace())
o.nsResolveQueue.Add(kubestate.NewUpdateEvent(sub.GetNamespace()))

return nil
}
Expand All @@ -1520,7 +1532,7 @@ func (o *Operator) syncOperatorGroups(obj interface{}) error {
return fmt.Errorf("casting OperatorGroup failed")
}

o.nsResolveQueue.Add(og.GetNamespace())
o.nsResolveQueue.Add(kubestate.NewUpdateEvent(og.GetNamespace()))

return nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/operators/catalog/subscription/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type syncerConfig struct {
subscriptionInformer cache.SharedIndexInformer
catalogInformer cache.SharedIndexInformer
installPlanInformer cache.SharedIndexInformer
subscriptionQueue workqueue.TypedRateLimitingInterface[any]
subscriptionQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]
reconcilers kubestate.ReconcilerChain
registryReconcilerFactory reconciler.RegistryReconcilerFactory
globalCatalogNamespace string
Expand Down Expand Up @@ -97,7 +97,7 @@ func WithOperatorLister(lister operatorlister.OperatorLister) SyncerOption {
}

// WithSubscriptionQueue sets a syncer's subscription queue.
func WithSubscriptionQueue(subscriptionQueue workqueue.TypedRateLimitingInterface[any]) SyncerOption {
func WithSubscriptionQueue(subscriptionQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]) SyncerOption {
return func(config *syncerConfig) {
config.subscriptionQueue = subscriptionQueue
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/controller/operators/catalogtemplate/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package catalogtemplate
import (
"context"
"fmt"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate"
"strings"
"time"

Expand Down Expand Up @@ -101,8 +102,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, logger *logrus.Logg
// Wire CatalogSources
catsrcInformer := crInformerFactory.Operators().V1alpha1().CatalogSources()
op.lister.OperatorsV1alpha1().RegisterCatalogSourceLister(metav1.NamespaceAll, catsrcInformer.Lister())
catalogTemplateSrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
catalogTemplateSrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: "catalogSourceTemplate",
})
op.catalogSourceTemplateQueueSet.Set(metav1.NamespaceAll, catalogTemplateSrcQueue)
Expand Down
60 changes: 34 additions & 26 deletions pkg/controller/operators/olm/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -83,11 +84,11 @@ type Operator struct {
copiedCSVLister metadatalister.Lister
ogQueueSet *queueinformer.ResourceQueueSet
csvQueueSet *queueinformer.ResourceQueueSet
olmConfigQueue workqueue.TypedRateLimitingInterface[any]
olmConfigQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]
csvCopyQueueSet *queueinformer.ResourceQueueSet
copiedCSVGCQueueSet *queueinformer.ResourceQueueSet
nsQueueSet workqueue.TypedRateLimitingInterface[any]
apiServiceQueue workqueue.TypedRateLimitingInterface[any]
nsQueueSet workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]
apiServiceQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]
csvIndexers map[string]cache.Indexer
recorder record.EventRecorder
resolver install.StrategyResolverInterface
Expand Down Expand Up @@ -198,17 +199,17 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
client: config.externalClient,
ogQueueSet: queueinformer.NewEmptyResourceQueueSet(),
csvQueueSet: queueinformer.NewEmptyResourceQueueSet(),
olmConfigQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any](
workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
olmConfigQueue: workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: "olmConfig",
}),

csvCopyQueueSet: queueinformer.NewEmptyResourceQueueSet(),
copiedCSVGCQueueSet: queueinformer.NewEmptyResourceQueueSet(),
apiServiceQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any](
workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
apiServiceQueue: workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: "apiservice",
}),
resolver: config.strategyResolver,
Expand Down Expand Up @@ -246,9 +247,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
).Operators().V1alpha1().ClusterServiceVersions()
informersByNamespace[namespace].CSVInformer = csvInformer
op.lister.OperatorsV1alpha1().RegisterClusterServiceVersionLister(namespace, csvInformer.Lister())
csvQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](
workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
csvQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: fmt.Sprintf("%s/csv", namespace),
})
op.csvQueueSet.Set(namespace, csvQueue)
Expand All @@ -273,7 +274,11 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
op.csvIndexers[namespace] = csvIndexer

// Register separate queue for copying csvs
csvCopyQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any](), fmt.Sprintf("%s/csv-copy", namespace))
csvCopyQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: fmt.Sprintf("%s/csv-copy", namespace),
})
op.csvCopyQueueSet.Set(namespace, csvCopyQueue)
csvCopyQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
Expand Down Expand Up @@ -307,9 +312,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
informersByNamespace[namespace].CopiedCSVLister = op.copiedCSVLister

// Register separate queue for gcing copied csvs
copiedCSVGCQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](
workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
copiedCSVGCQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: fmt.Sprintf("%s/csv-gc", namespace),
})
op.copiedCSVGCQueueSet.Set(namespace, copiedCSVGCQueue)
Expand All @@ -333,9 +338,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
operatorGroupInformer := extInformerFactory.Operators().V1().OperatorGroups()
informersByNamespace[namespace].OperatorGroupInformer = operatorGroupInformer
op.lister.OperatorsV1().RegisterOperatorGroupLister(namespace, operatorGroupInformer.Lister())
ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](
workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: fmt.Sprintf("%s/og", namespace),
})
op.ogQueueSet.Set(namespace, ogQueue)
Expand Down Expand Up @@ -522,9 +527,12 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
logger := op.logger.WithFields(logrus.Fields{"gvr": gvr.String(), "index": idx})
logger.Info("registering labeller")

queue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{
Name: gvr.String(),
})
queue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: gvr.String(),
},
)
queueInformer, err := queueinformer.NewQueueInformer(
ctx,
queueinformer.WithQueue(queue),
Expand Down Expand Up @@ -696,9 +704,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), config.resyncPeriod()).Core().V1().Namespaces()
informersByNamespace[metav1.NamespaceAll].NamespaceInformer = namespaceInformer
op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister())
op.nsQueueSet = workqueue.NewTypedRateLimitingQueueWithConfig[any](
workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
op.nsQueueSet = workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: "resolver",
})
namespaceInformer.Informer().AddEventHandler(
Expand Down Expand Up @@ -1665,7 +1673,7 @@ func (a *Operator) syncCopyCSV(obj interface{}) (syncError error) {
}

if err == nil {
go a.olmConfigQueue.AddAfter(olmConfig, time.Second*5)
go a.olmConfigQueue.AddAfter(kubestate.NewUpdateEvent(olmConfig), time.Second*5)
}

logger := a.logger.WithFields(logrus.Fields{
Expand Down
5 changes: 3 additions & 2 deletions pkg/controller/operators/olm/operatorgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/sha256"
"fmt"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate"
"math/big"
"reflect"
"strings"
Expand Down Expand Up @@ -182,7 +183,7 @@ func (a *Operator) syncOperatorGroups(obj interface{}) error {
logger.Debug("Requeueing out of sync namespaces")
for _, ns := range outOfSyncNamespaces {
logger.WithField("namespace", ns).Debug("requeueing")
a.nsQueueSet.Add(ns)
a.nsQueueSet.Add(kubestate.NewUpdateEvent(ns))
}

// CSV requeue is handled by the succeeding sync in `annotateCSVs`
Expand Down Expand Up @@ -263,7 +264,7 @@ func (a *Operator) operatorGroupDeleted(obj interface{}) {
logger.Debug("OperatorGroup deleted, requeueing out of sync namespaces")
for _, ns := range op.Status.Namespaces {
logger.WithField("namespace", ns).Debug("requeueing")
a.nsQueueSet.Add(ns)
a.nsQueueSet.Add(kubestate.NewUpdateEvent(ns))
}
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/lib/kubestate/kubestate.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,13 @@ func (r resourceEvent) Resource() interface{} {
return r.resource
}

func NewUpdateEvent(resource interface{}) ResourceEvent {
return resourceEvent{
eventType: ResourceUpdated,
resource: resource,
}
}

func NewResourceEvent(eventType ResourceEventType, resource interface{}) ResourceEvent {
return resourceEvent{
eventType: eventType,
Expand Down
10 changes: 5 additions & 5 deletions pkg/lib/queueinformer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
type queueInformerConfig struct {
provider metrics.MetricsProvider
logger *logrus.Logger
queue workqueue.RateLimitingInterface
queue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]
informer cache.SharedIndexInformer
indexer cache.Indexer
keyFunc KeyFunc
Expand Down Expand Up @@ -105,9 +105,9 @@ func defaultKeyFunc(obj interface{}) (string, bool) {
func defaultConfig() *queueInformerConfig {
return &queueInformerConfig{
provider: metrics.NewMetricsNil(),
queue: workqueue.NewTypedRateLimitingQueueWithConfig[any](
workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
queue: workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: "default",
}),
logger: logrus.New(),
Expand All @@ -130,7 +130,7 @@ func WithLogger(logger *logrus.Logger) Option {
}

// WithQueue sets the queue used by a QueueInformer.
func WithQueue(queue workqueue.RateLimitingInterface) Option {
func WithQueue(queue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]) Option {
return func(config *queueInformerConfig) {
config.queue = queue
}
Expand Down
Loading

0 comments on commit f9ed54e

Please sign in to comment.