diff --git a/internal/kstatus/watcher/object_status_reporter.go b/internal/kstatus/watcher/object_status_reporter.go index c4ef2b27..3f66d1d7 100644 --- a/internal/kstatus/watcher/object_status_reporter.go +++ b/internal/kstatus/watcher/object_status_reporter.go @@ -10,6 +10,7 @@ import ( "sync" "time" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -133,6 +134,7 @@ type ObjectStatusReporter struct { started bool stopped bool + name string } func (in *ObjectStatusReporter) Start(ctx context.Context) <-chan event.Event { @@ -284,6 +286,11 @@ func (in *ObjectStatusReporter) stopInformer(gkn GroupKindNamespace) { in.watcherRefs[gkn].Stop() } +// restartInformer the informer watching the specified GroupKindNamespace. +func (in *ObjectStatusReporter) restartInformer(gkn GroupKindNamespace) { + in.watcherRefs[gkn].Restart() +} + func (in *ObjectStatusReporter) startInformerWithRetry(ctx context.Context, gkn GroupKindNamespace) { realClock := &clock.RealClock{} // TODO nolint can be removed once https://github.com/kubernetes/kubernetes/issues/118638 is resolved @@ -297,13 +304,18 @@ func (in *ObjectStatusReporter) startInformerWithRetry(ctx context.Context, gkn ) if err != nil { if meta.IsNoMatchError(err) { - // CRD (or api extension) not installed - // TODO: retry if CRDs are not being watched - klog.V(3).Infof("Watch start error (blocking until CRD is added): %v: %v", gkn, err) + klog.V(3).Infof("Watch start error: %v: %v", gkn, err) // Cancel the parent context, which will stop the retries too. in.stopInformer(gkn) return } + if apierrors.IsNotFound(err) { + // CRD (or api extension) not installed + klog.V(3).Infof("Watch start error (blocking until CRD is added): %v: %v", gkn, err) + // doesn't cancel the parent context, which will not stop the retries too. + in.restartInformer(gkn) + return + } // Create a temporary input channel to send the error event. eventCh := make(chan event.Event) diff --git a/internal/kstatus/watcher/watcher.go b/internal/kstatus/watcher/watcher.go index 8164a24e..483fd3f6 100644 --- a/internal/kstatus/watcher/watcher.go +++ b/internal/kstatus/watcher/watcher.go @@ -23,6 +23,7 @@ type DynamicStatusWatcher struct { // informerRefs tracks which informers have been started and stopped by the ObjectStatusReporter informerRefs map[GroupKindNamespace]*watcherReference + name string } func (in *DynamicStatusWatcher) Watch(ctx context.Context, ids object.ObjMetadataSet, opts kwatcher.Options) <-chan event.Event { @@ -76,12 +77,13 @@ func (in *DynamicStatusWatcher) Watch(ctx context.Context, ids object.ObjMetadat LabelSelector: labelSelector, DynamicClient: in.DynamicClient, watcherRefs: in.informerRefs, + name: in.name, } return informer.Start(ctx) } -func NewDynamicStatusWatcher(dynamicClient dynamic.Interface, mapper meta.RESTMapper, options Options) kwatcher.StatusWatcher { +func NewDynamicStatusWatcher(dynamicClient dynamic.Interface, mapper meta.RESTMapper, options Options, name string) kwatcher.StatusWatcher { var informerRefs map[GroupKindNamespace]*watcherReference if options.UseInformerRefCache { informerRefs = make(map[GroupKindNamespace]*watcherReference) @@ -95,5 +97,6 @@ func NewDynamicStatusWatcher(dynamicClient dynamic.Interface, mapper meta.RESTMa // Custom options Options: options, informerRefs: informerRefs, + name: name, } } diff --git a/internal/kstatus/watcher/watcher_reference.go b/internal/kstatus/watcher/watcher_reference.go index 34ec6848..2291da47 100644 --- a/internal/kstatus/watcher/watcher_reference.go +++ b/internal/kstatus/watcher/watcher_reference.go @@ -82,3 +82,20 @@ func (ir *watcherReference) Stop() { ir.started = false ir.context = nil } + +// Restart ... +func (ir *watcherReference) Restart() { + ir.lock.Lock() + defer ir.lock.Unlock() + + if !ir.started { + return + } + + if ir.watcher != nil { + ir.watcher.Stop() + } + + ir.started = false + ir.context = nil +} diff --git a/pkg/applier/builder.go b/pkg/applier/builder.go index bd7b280f..02beb30c 100644 --- a/pkg/applier/builder.go +++ b/pkg/applier/builder.go @@ -90,7 +90,7 @@ func (cb *commonBuilder) finalize() (*commonBuilder, error) { Labels: common.ManagedByAgentLabelSelector(), Fields: nil, }, - }) + }, "applier") } return &cx, nil } diff --git a/pkg/cache/resource_cache.go b/pkg/cache/resource_cache.go index ee0b9520..ea44ea3e 100644 --- a/pkg/cache/resource_cache.go +++ b/pkg/cache/resource_cache.go @@ -101,7 +101,7 @@ func Init(ctx context.Context, config *rest.Config, ttl time.Duration) { Labels: common.ManagedByAgentLabelSelector(), Fields: nil, }, - }) + }, "resourceCache") resourceCache = &ResourceCache{ ctx: ctx,