From 218ff3362053ee671a71f8c4a032bb49289d1f0f Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Thu, 5 Dec 2024 19:34:00 +0545 Subject: [PATCH] fix: consume lag metrics --- scrapers/incremental.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/scrapers/incremental.go b/scrapers/incremental.go index 5167a838..bf42dc3c 100644 --- a/scrapers/incremental.go +++ b/scrapers/incremental.go @@ -74,6 +74,7 @@ func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, q return fmt.Errorf("unexpected item in the priority queue: %T", val) } obj := queueItem.Obj + queuedTime[string(obj.GetUID())] = queueItem.Timestamp if queueItem.Operation == kubernetes.QueueItemOperationDelete { deletedObjects = append(deletedObjects, string(obj.GetUID())) @@ -100,7 +101,6 @@ func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, q seenObjects[string(obj.GetUID())] = struct{}{} } - queuedTime[string(obj.GetUID())] = queueItem.Timestamp objs = append(objs, obj) } @@ -137,7 +137,12 @@ func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, q } for _, obj := range objs { - lag := time.Since(queuedTime[string(obj.GetUID())]) + queuedtime, ok := queuedTime[string(obj.GetUID())] + if !ok { + continue // involved objects have 0 queuedtime as they never enter the queue + } + + lag := time.Since(queuedtime) ctx.Histogram("informer_consume_lag", consumeLagBuckets, "scraper", sc.ScraperID(), "kind", obj.GetKind()). Record(time.Duration(lag.Milliseconds())) }