Skip to content

Commit

Permalink
retry when CRD not installed
Browse files Browse the repository at this point in the history
  • Loading branch information
zreigz committed Jul 17, 2024
1 parent ab209c4 commit 750d3c1
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 6 deletions.
18 changes: 15 additions & 3 deletions internal/kstatus/watcher/object_status_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -133,6 +134,7 @@ type ObjectStatusReporter struct {

started bool
stopped bool
name string
}

func (in *ObjectStatusReporter) Start(ctx context.Context) <-chan event.Event {
Expand Down Expand Up @@ -284,6 +286,11 @@ func (in *ObjectStatusReporter) stopInformer(gkn GroupKindNamespace) {
in.watcherRefs[gkn].Stop()
}

// restartInformer the informer watching the specified GroupKindNamespace.
func (in *ObjectStatusReporter) restartInformer(gkn GroupKindNamespace) {
in.watcherRefs[gkn].Restart()
}

func (in *ObjectStatusReporter) startInformerWithRetry(ctx context.Context, gkn GroupKindNamespace) {
realClock := &clock.RealClock{}
// TODO nolint can be removed once https://github.com/kubernetes/kubernetes/issues/118638 is resolved
Expand All @@ -297,13 +304,18 @@ func (in *ObjectStatusReporter) startInformerWithRetry(ctx context.Context, gkn
)
if err != nil {
if meta.IsNoMatchError(err) {
// CRD (or api extension) not installed
// TODO: retry if CRDs are not being watched
klog.V(3).Infof("Watch start error (blocking until CRD is added): %v: %v", gkn, err)
klog.V(3).Infof("Watch start error: %v: %v", gkn, err)
// Cancel the parent context, which will stop the retries too.
in.stopInformer(gkn)
return
}
if apierrors.IsNotFound(err) {
// CRD (or api extension) not installed
klog.V(3).Infof("Watch start error (blocking until CRD is added): %v: %v", gkn, err)
// doesn't cancel the parent context, which will not stop the retries too.
in.restartInformer(gkn)
return
}

// Create a temporary input channel to send the error event.
eventCh := make(chan event.Event)
Expand Down
5 changes: 4 additions & 1 deletion internal/kstatus/watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type DynamicStatusWatcher struct {

// informerRefs tracks which informers have been started and stopped by the ObjectStatusReporter
informerRefs map[GroupKindNamespace]*watcherReference
name string
}

func (in *DynamicStatusWatcher) Watch(ctx context.Context, ids object.ObjMetadataSet, opts kwatcher.Options) <-chan event.Event {
Expand Down Expand Up @@ -76,12 +77,13 @@ func (in *DynamicStatusWatcher) Watch(ctx context.Context, ids object.ObjMetadat
LabelSelector: labelSelector,
DynamicClient: in.DynamicClient,
watcherRefs: in.informerRefs,
name: in.name,
}

return informer.Start(ctx)
}

func NewDynamicStatusWatcher(dynamicClient dynamic.Interface, mapper meta.RESTMapper, options Options) kwatcher.StatusWatcher {
func NewDynamicStatusWatcher(dynamicClient dynamic.Interface, mapper meta.RESTMapper, options Options, name string) kwatcher.StatusWatcher {
var informerRefs map[GroupKindNamespace]*watcherReference
if options.UseInformerRefCache {
informerRefs = make(map[GroupKindNamespace]*watcherReference)
Expand All @@ -95,5 +97,6 @@ func NewDynamicStatusWatcher(dynamicClient dynamic.Interface, mapper meta.RESTMa
// Custom options
Options: options,
informerRefs: informerRefs,
name: name,
}
}
17 changes: 17 additions & 0 deletions internal/kstatus/watcher/watcher_reference.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,20 @@ func (ir *watcherReference) Stop() {
ir.started = false
ir.context = nil
}

// Restart ...
func (ir *watcherReference) Restart() {
ir.lock.Lock()
defer ir.lock.Unlock()

if !ir.started {
return
}

if ir.watcher != nil {
ir.watcher.Stop()
}

ir.started = false
ir.context = nil
}
2 changes: 1 addition & 1 deletion pkg/applier/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (cb *commonBuilder) finalize() (*commonBuilder, error) {
Labels: common.ManagedByAgentLabelSelector(),
Fields: nil,
},
})
}, "applier")
}
return &cx, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/resource_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func Init(ctx context.Context, config *rest.Config, ttl time.Duration) {
Labels: common.ManagedByAgentLabelSelector(),
Fields: nil,
},
})
}, "resourceCache")

resourceCache = &ResourceCache{
ctx: ctx,
Expand Down

0 comments on commit 750d3c1

Please sign in to comment.