Skip to content

Commit

Permalink
fix: metric lag calculation for update events
Browse files Browse the repository at this point in the history
The lastUpdated time was calculated against the informer's registration
time instead of the current time.
  • Loading branch information
adityathebe committed Dec 9, 2024
1 parent 1413865 commit 198f687
Showing 1 changed file with 13 additions and 8 deletions.
21 changes: 13 additions & 8 deletions scrapers/kubernetes/informers.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type SharedInformerManager struct {
// DeleteObjHandler is the signature of a callback that receives a context and
// a string id and reports failure through its error return.
// NOTE(review): presumably invoked when a watched object is deleted — confirm against callers.
type DeleteObjHandler func(ctx context.Context, id string) error

func (t *SharedInformerManager) Register(ctx api.ScrapeContext, watchResource v1.KubernetesResourceToWatch, queue *pq.Queue) error {
start := time.Now()
registrationTime := time.Now()

apiVersion, kind := watchResource.ApiVersion, watchResource.Kind

Expand Down Expand Up @@ -127,11 +127,11 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, watchResource v1
"type", "add",
"kind", u.GetKind(),
"scraper_id", ctx.ScraperID(),
"valid_timestamp", lo.Ternary(u.GetCreationTimestamp().Time.After(start), "true", "false"),
"valid_timestamp", lo.Ternary(u.GetCreationTimestamp().Time.After(registrationTime), "true", "false"),
).Add(1)

// This is a way to avoid instrumenting old objects so they don't skew the lag time.
if u.GetCreationTimestamp().Time.After(start) {
if u.GetCreationTimestamp().Time.After(registrationTime) {
ctx.Histogram("informer_receive_lag", informerLagBuckets,
"scraper", ctx.ScraperID(),
"kind", watchResource.Kind,
Expand All @@ -140,6 +140,11 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, watchResource v1
}
},
UpdateFunc: func(oldObj any, newObj any) {
// Kubernetes object timestamps are precise only up to a second,
// so we need to round the current time down to the second.
// Otherwise the current time will be just a few milliseconds ahead, and the last updated time will appear to be in the past.
now := time.Now().UTC().Truncate(time.Second)

u, err := getUnstructuredFromInformedObj(watchResource, newObj)
if err != nil {
ctx.Counter("kubernetes_informer_errors",
Expand All @@ -156,23 +161,23 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, watchResource v1
}

lastUpdatedTime := health.GetLastUpdatedTime(u)
lastUpdatedInPast := lastUpdatedTime != nil && lastUpdatedTime.After(u.GetCreationTimestamp().Time) && lastUpdatedTime.Before(start)
if lastUpdatedInPast {
lastUpdatedInFuture := lastUpdatedTime != nil && lastUpdatedTime.After(now)
if !lastUpdatedInFuture {
ctx.Histogram("informer_receive_lag", informerLagBuckets,
"scraper", ctx.ScraperID(),
"kind", watchResource.Kind,
"operation", "update",
).Record(time.Duration(time.Since(*lastUpdatedTime).Milliseconds()))
).Record(time.Duration(now.Sub(*lastUpdatedTime).Milliseconds()))
} else {
ctx.Warnf("%s/%s/%s has last updated time %s into the future. now=%s, lastupdatedTime=%s",
u.GetKind(), u.GetNamespace(), u.GetName(), time.Until(*lastUpdatedTime), start, *lastUpdatedTime)
u.GetKind(), u.GetNamespace(), u.GetName(), lastUpdatedTime.Sub(now), now, *lastUpdatedTime)
}

ctx.Counter("kubernetes_informer_events",
"type", "update",
"kind", u.GetKind(),
"scraper_id", ctx.ScraperID(),
"valid_timestamp", lo.Ternary(lastUpdatedInPast, "true", "false"),
"valid_timestamp", lo.Ternary(!lastUpdatedInFuture, "true", "false"),
).Add(1)

queue.Enqueue(NewQueueItem(u, QueueItemOperationUpdate))
Expand Down

0 comments on commit 198f687

Please sign in to comment.