Skip to content

Commit

Permalink
queue item fix
Browse files Browse the repository at this point in the history
Signed-off-by: Per Goncalves da Silva <[email protected]>
  • Loading branch information
Per Goncalves da Silva committed Dec 20, 2024
1 parent b7aa493 commit 4133943
Show file tree
Hide file tree
Showing 12 changed files with 116 additions and 83 deletions.
57 changes: 35 additions & 22 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"sync"
"time"

"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate"

"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/labeller"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/validatingroundtripper"
errorwrap "github.com/pkg/errors"
Expand Down Expand Up @@ -114,7 +116,7 @@ type Operator struct {
subQueueSet *queueinformer.ResourceQueueSet
ipQueueSet *queueinformer.ResourceQueueSet
ogQueueSet *queueinformer.ResourceQueueSet
nsResolveQueue workqueue.TypedRateLimitingInterface[any]
nsResolveQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]
namespace string
recorder record.EventRecorder
sources *grpc.SourceStore
Expand Down Expand Up @@ -268,8 +270,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
// Wire InstallPlans
ipInformer := crInformerFactory.Operators().V1alpha1().InstallPlans()
op.lister.OperatorsV1alpha1().RegisterInstallPlanLister(metav1.NamespaceAll, ipInformer.Lister())
ipQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
ipQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: "ips",
})
op.ipQueueSet.Set(metav1.NamespaceAll, ipQueue)
Expand All @@ -290,8 +293,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo

operatorGroupInformer := crInformerFactory.Operators().V1().OperatorGroups()
op.lister.OperatorsV1().RegisterOperatorGroupLister(metav1.NamespaceAll, operatorGroupInformer.Lister())
ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: "ogs",
})
op.ogQueueSet.Set(metav1.NamespaceAll, ogQueue)
Expand All @@ -312,8 +316,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
// Wire CatalogSources
catsrcInformer := crInformerFactory.Operators().V1alpha1().CatalogSources()
op.lister.OperatorsV1alpha1().RegisterCatalogSourceLister(metav1.NamespaceAll, catsrcInformer.Lister())
catsrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
catsrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: "catsrcs",
})
op.catsrcQueueSet.Set(metav1.NamespaceAll, catsrcQueue)
Expand Down Expand Up @@ -341,8 +346,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
subIndexer := subInformer.Informer().GetIndexer()
op.catalogSubscriberIndexer[metav1.NamespaceAll] = subIndexer

subQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
subQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: "subs",
})
op.subQueueSet.Set(metav1.NamespaceAll, subQueue)
Expand Down Expand Up @@ -415,9 +421,12 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
logger := op.logger.WithFields(logrus.Fields{"gvr": gvr.String(), "index": idx})
logger.Info("registering labeller")

queue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{
Name: gvr.String(),
})
queue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: gvr.String(),
},
)
queueInformer, err := queueinformer.NewQueueInformer(
ctx,
queueinformer.WithQueue(queue),
Expand Down Expand Up @@ -560,9 +569,12 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
logger := op.logger.WithFields(logrus.Fields{"gvr": gvr.String()})
logger.Info("registering owner reference fixer")

queue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{
Name: gvr.String(),
})
queue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: gvr.String(),
},
)
queueInformer, err := queueinformer.NewQueueInformer(
ctx,
queueinformer.WithQueue(queue),
Expand Down Expand Up @@ -745,8 +757,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
// Namespace sync for resolving subscriptions
namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), resyncPeriod()).Core().V1().Namespaces()
op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister())
op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: "resolve",
})
namespaceQueueInformer, err := queueinformer.NewQueueInformer(
Expand Down Expand Up @@ -787,12 +800,12 @@ func (o *Operator) syncSourceState(state grpc.SourceState) {

if err == nil {
for ns := range namespaces {
o.nsResolveQueue.Add(ns)
o.nsResolveQueue.Add(kubestate.NewUpdateEvent(ns))
}
}
}

o.nsResolveQueue.Add(state.Key.Namespace)
o.nsResolveQueue.Add(kubestate.NewUpdateEvent(state.Key.Namespace))
}
if err := o.catsrcQueueSet.Requeue(state.Key.Namespace, state.Key.Name); err != nil {
o.logger.WithError(err).Info("couldn't requeue catalogsource from catalog status change")
Expand Down Expand Up @@ -1411,7 +1424,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
}

logger.Info("unpacking is not complete yet, requeueing")
o.nsResolveQueue.AddAfter(namespace, 5*time.Second)
o.nsResolveQueue.AddAfter(kubestate.NewUpdateEvent(namespace), 5*time.Second)
return nil
}
}
Expand Down Expand Up @@ -1506,7 +1519,7 @@ func (o *Operator) syncSubscriptions(obj interface{}) error {
return fmt.Errorf("casting Subscription failed")
}

o.nsResolveQueue.Add(sub.GetNamespace())
o.nsResolveQueue.Add(kubestate.NewUpdateEvent(sub.GetNamespace()))

return nil
}
Expand All @@ -1520,7 +1533,7 @@ func (o *Operator) syncOperatorGroups(obj interface{}) error {
return fmt.Errorf("casting OperatorGroup failed")
}

o.nsResolveQueue.Add(og.GetNamespace())
o.nsResolveQueue.Add(kubestate.NewUpdateEvent(og.GetNamespace()))

return nil
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/controller/operators/catalog/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"testing/quick"
"time"

"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate"

"k8s.io/utils/ptr"

controllerclient "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/controller-runtime/client"
Expand Down Expand Up @@ -2156,13 +2158,13 @@ func NewFakeOperator(ctx context.Context, namespace string, namespaces []string,
client: clientFake,
lister: lister,
namespace: namespace,
nsResolveQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any](
workqueue.NewTypedMaxOfRateLimiter[any](
workqueue.NewTypedItemExponentialFailureRateLimiter[any](1*time.Second, 1000*time.Second),
nsResolveQueue: workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.NewTypedMaxOfRateLimiter[kubestate.ResourceEvent](
workqueue.NewTypedItemExponentialFailureRateLimiter[kubestate.ResourceEvent](1*time.Second, 1000*time.Second),
// 1 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(1), 100)},
&workqueue.TypedBucketRateLimiter[kubestate.ResourceEvent]{Limiter: rate.NewLimiter(rate.Limit(1), 100)},
),
workqueue.TypedRateLimitingQueueConfig[any]{
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: "resolver",
}),
resolver: config.resolver,
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/operators/catalog/subscription/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type syncerConfig struct {
subscriptionInformer cache.SharedIndexInformer
catalogInformer cache.SharedIndexInformer
installPlanInformer cache.SharedIndexInformer
subscriptionQueue workqueue.TypedRateLimitingInterface[any]
subscriptionQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]
reconcilers kubestate.ReconcilerChain
registryReconcilerFactory reconciler.RegistryReconcilerFactory
globalCatalogNamespace string
Expand Down Expand Up @@ -97,7 +97,7 @@ func WithOperatorLister(lister operatorlister.OperatorLister) SyncerOption {
}

// WithSubscriptionQueue sets a syncer's subscription queue.
func WithSubscriptionQueue(subscriptionQueue workqueue.TypedRateLimitingInterface[any]) SyncerOption {
func WithSubscriptionQueue(subscriptionQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]) SyncerOption {
return func(config *syncerConfig) {
config.subscriptionQueue = subscriptionQueue
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/controller/operators/catalogtemplate/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"strings"
"time"

"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate"

"github.com/distribution/reference"
"github.com/operator-framework/api/pkg/operators/v1alpha1"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -101,8 +103,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, logger *logrus.Logg
// Wire CatalogSources
catsrcInformer := crInformerFactory.Operators().V1alpha1().CatalogSources()
op.lister.OperatorsV1alpha1().RegisterCatalogSourceLister(metav1.NamespaceAll, catsrcInformer.Lister())
catalogTemplateSrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
catalogTemplateSrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: "catalogSourceTemplate",
})
op.catalogSourceTemplateQueueSet.Set(metav1.NamespaceAll, catalogTemplateSrcQueue)
Expand Down
61 changes: 35 additions & 26 deletions pkg/controller/operators/olm/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"sync"
"time"

"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate"

"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/labeller"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/olm/plugins"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -83,11 +85,11 @@ type Operator struct {
copiedCSVLister metadatalister.Lister
ogQueueSet *queueinformer.ResourceQueueSet
csvQueueSet *queueinformer.ResourceQueueSet
olmConfigQueue workqueue.TypedRateLimitingInterface[any]
olmConfigQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]
csvCopyQueueSet *queueinformer.ResourceQueueSet
copiedCSVGCQueueSet *queueinformer.ResourceQueueSet
nsQueueSet workqueue.TypedRateLimitingInterface[any]
apiServiceQueue workqueue.TypedRateLimitingInterface[any]
nsQueueSet workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]
apiServiceQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]
csvIndexers map[string]cache.Indexer
recorder record.EventRecorder
resolver install.StrategyResolverInterface
Expand Down Expand Up @@ -198,17 +200,17 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
client: config.externalClient,
ogQueueSet: queueinformer.NewEmptyResourceQueueSet(),
csvQueueSet: queueinformer.NewEmptyResourceQueueSet(),
olmConfigQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any](
workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
olmConfigQueue: workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: "olmConfig",
}),

csvCopyQueueSet: queueinformer.NewEmptyResourceQueueSet(),
copiedCSVGCQueueSet: queueinformer.NewEmptyResourceQueueSet(),
apiServiceQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any](
workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
apiServiceQueue: workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: "apiservice",
}),
resolver: config.strategyResolver,
Expand Down Expand Up @@ -246,9 +248,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
).Operators().V1alpha1().ClusterServiceVersions()
informersByNamespace[namespace].CSVInformer = csvInformer
op.lister.OperatorsV1alpha1().RegisterClusterServiceVersionLister(namespace, csvInformer.Lister())
csvQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](
workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
csvQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: fmt.Sprintf("%s/csv", namespace),
})
op.csvQueueSet.Set(namespace, csvQueue)
Expand All @@ -273,7 +275,11 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
op.csvIndexers[namespace] = csvIndexer

// Register separate queue for copying csvs
csvCopyQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any](), fmt.Sprintf("%s/csv-copy", namespace))
csvCopyQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: fmt.Sprintf("%s/csv-copy", namespace),
})
op.csvCopyQueueSet.Set(namespace, csvCopyQueue)
csvCopyQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
Expand Down Expand Up @@ -307,9 +313,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
informersByNamespace[namespace].CopiedCSVLister = op.copiedCSVLister

// Register separate queue for gcing copied csvs
copiedCSVGCQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](
workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
copiedCSVGCQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: fmt.Sprintf("%s/csv-gc", namespace),
})
op.copiedCSVGCQueueSet.Set(namespace, copiedCSVGCQueue)
Expand All @@ -333,9 +339,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
operatorGroupInformer := extInformerFactory.Operators().V1().OperatorGroups()
informersByNamespace[namespace].OperatorGroupInformer = operatorGroupInformer
op.lister.OperatorsV1().RegisterOperatorGroupLister(namespace, operatorGroupInformer.Lister())
ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](
workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: fmt.Sprintf("%s/og", namespace),
})
op.ogQueueSet.Set(namespace, ogQueue)
Expand Down Expand Up @@ -522,9 +528,12 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
logger := op.logger.WithFields(logrus.Fields{"gvr": gvr.String(), "index": idx})
logger.Info("registering labeller")

queue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{
Name: gvr.String(),
})
queue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: gvr.String(),
},
)
queueInformer, err := queueinformer.NewQueueInformer(
ctx,
queueinformer.WithQueue(queue),
Expand Down Expand Up @@ -696,9 +705,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), config.resyncPeriod()).Core().V1().Namespaces()
informersByNamespace[metav1.NamespaceAll].NamespaceInformer = namespaceInformer
op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister())
op.nsQueueSet = workqueue.NewTypedRateLimitingQueueWithConfig[any](
workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
op.nsQueueSet = workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: "resolver",
})
namespaceInformer.Informer().AddEventHandler(
Expand Down Expand Up @@ -1665,7 +1674,7 @@ func (a *Operator) syncCopyCSV(obj interface{}) (syncError error) {
}

if err == nil {
go a.olmConfigQueue.AddAfter(olmConfig, time.Second*5)
go a.olmConfigQueue.AddAfter(kubestate.NewUpdateEvent(olmConfig), time.Second*5)
}

logger := a.logger.WithFields(logrus.Fields{
Expand Down
6 changes: 4 additions & 2 deletions pkg/controller/operators/olm/operatorgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"reflect"
"strings"

"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate"

"k8s.io/apimachinery/pkg/api/equality"

"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -182,7 +184,7 @@ func (a *Operator) syncOperatorGroups(obj interface{}) error {
logger.Debug("Requeueing out of sync namespaces")
for _, ns := range outOfSyncNamespaces {
logger.WithField("namespace", ns).Debug("requeueing")
a.nsQueueSet.Add(ns)
a.nsQueueSet.Add(kubestate.NewUpdateEvent(ns))
}

// CSV requeue is handled by the succeeding sync in `annotateCSVs`
Expand Down Expand Up @@ -263,7 +265,7 @@ func (a *Operator) operatorGroupDeleted(obj interface{}) {
logger.Debug("OperatorGroup deleted, requeueing out of sync namespaces")
for _, ns := range op.Status.Namespaces {
logger.WithField("namespace", ns).Debug("requeueing")
a.nsQueueSet.Add(ns)
a.nsQueueSet.Add(kubestate.NewUpdateEvent(ns))
}
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/lib/kubestate/kubestate.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,13 @@ func (r resourceEvent) Resource() interface{} {
return r.resource
}

func NewUpdateEvent(resource interface{}) ResourceEvent {
return resourceEvent{
eventType: ResourceUpdated,
resource: resource,
}
}

func NewResourceEvent(eventType ResourceEventType, resource interface{}) ResourceEvent {
return resourceEvent{
eventType: eventType,
Expand Down
Loading

0 comments on commit 4133943

Please sign in to comment.