diff --git a/scrapers/kubernetes/informers.go b/scrapers/kubernetes/informers.go
index bf1e761f..4a23998c 100644
--- a/scrapers/kubernetes/informers.go
+++ b/scrapers/kubernetes/informers.go
@@ -87,7 +87,7 @@ type SharedInformerManager struct {
 type DeleteObjHandler func(ctx context.Context, id string) error
 
 func (t *SharedInformerManager) Register(ctx api.ScrapeContext, watchResource v1.KubernetesResourceToWatch, queue *pq.Queue) error {
-	start := time.Now()
+	registrationTime := time.Now()
 
 	apiVersion, kind := watchResource.ApiVersion, watchResource.Kind
 
@@ -107,6 +107,8 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, watchResource v1
 	ctx.Logger.V(1).Infof("registering shared informer for: %v", watchResource)
 	_, err := informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj any) {
+			receivedAt := time.Now().Round(time.Second)
+
 			u, err := getUnstructuredFromInformedObj(watchResource, obj)
 			if err != nil {
 				ctx.Counter("kubernetes_informer_errors",
@@ -127,19 +129,24 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, watchResource v1
 				"type", "add",
 				"kind", u.GetKind(),
 				"scraper_id", ctx.ScraperID(),
-				"valid_timestamp", lo.Ternary(u.GetCreationTimestamp().Time.After(start), "true", "false"),
+				"valid_timestamp", lo.Ternary(u.GetCreationTimestamp().Time.After(registrationTime), "true", "false"),
 			).Add(1)
 
 			// This is a way to avoid instrumenting old objects so they don't skew the lag time.
-			if u.GetCreationTimestamp().Time.After(start) {
+			if u.GetCreationTimestamp().Time.After(registrationTime) {
 				ctx.Histogram("informer_receive_lag", informerLagBuckets,
 					"scraper", ctx.ScraperID(),
 					"kind", watchResource.Kind,
 					"operation", "add",
-				).Record(time.Duration(time.Since(u.GetCreationTimestamp().Time).Milliseconds()))
+				).Record(time.Duration(receivedAt.Sub(u.GetCreationTimestamp().Time).Milliseconds()))
 			}
 		},
 		UpdateFunc: func(oldObj any, newObj any) {
+			// Kubernetes object timestamps are only precise to the second, so we round
+			// the current time to the nearest second to avoid incorrectly flagging
+			// timestamps as being in the future due to millisecond differences.
+			receivedAt := time.Now().UTC().Round(time.Second)
+
 			u, err := getUnstructuredFromInformedObj(watchResource, newObj)
 			if err != nil {
 				ctx.Counter("kubernetes_informer_errors",
@@ -155,24 +162,24 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, watchResource v1
 				ctx.Logger.V(3).Infof("updated: %s %s %s", u.GetUID(), u.GetKind(), u.GetName())
 			}
 
-			lastUpdatedTime := health.GetLastUpdatedTime(u)
-			lastUpdatedInPast := lastUpdatedTime != nil && lastUpdatedTime.After(u.GetCreationTimestamp().Time) && lastUpdatedTime.Before(start)
-			if lastUpdatedInPast {
+			lastUpdatedTime := lo.FromPtr(health.GetLastUpdatedTime(u))
+			lastUpdatedInFuture := lastUpdatedTime.After(receivedAt)
+			if !lastUpdatedInFuture {
 				ctx.Histogram("informer_receive_lag", informerLagBuckets,
 					"scraper", ctx.ScraperID(),
 					"kind", watchResource.Kind,
 					"operation", "update",
-				).Record(time.Duration(time.Since(*lastUpdatedTime).Milliseconds()))
+				).Record(time.Duration(receivedAt.Sub(lastUpdatedTime).Milliseconds()))
 			} else {
-				ctx.Warnf("%s/%s/%s has last updated time %s into the future. now=%s, lastupdatedTime=%s",
-					u.GetKind(), u.GetNamespace(), u.GetName(), time.Until(*lastUpdatedTime), start, *lastUpdatedTime)
+				ctx.Warnf("%s/%s/%s has last updated time %s into the future. receivedAt=%s, lastupdatedTime=%s",
+					u.GetKind(), u.GetNamespace(), u.GetName(), lastUpdatedTime.Sub(receivedAt), receivedAt, lastUpdatedTime)
 			}
 
 			ctx.Counter("kubernetes_informer_events",
 				"type", "update",
 				"kind", u.GetKind(),
 				"scraper_id", ctx.ScraperID(),
-				"valid_timestamp", lo.Ternary(lastUpdatedInPast, "true", "false"),
+				"valid_timestamp", lo.Ternary(!lastUpdatedInFuture, "true", "false"),
 			).Add(1)
 
 			queue.Enqueue(NewQueueItem(u, QueueItemOperationUpdate))