Skip to content

Commit

Permalink
fix: metric lag calculation for update events
Browse files Browse the repository at this point in the history
the lastUpdated time was calculated against the informer's registration
time instead of the current time.
  • Loading branch information
adityathebe committed Dec 9, 2024
1 parent 1413865 commit c3905aa
Showing 1 changed file with 14 additions and 9 deletions.
23 changes: 14 additions & 9 deletions scrapers/kubernetes/informers.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type SharedInformerManager struct {
type DeleteObjHandler func(ctx context.Context, id string) error

func (t *SharedInformerManager) Register(ctx api.ScrapeContext, watchResource v1.KubernetesResourceToWatch, queue *pq.Queue) error {
start := time.Now()
registrationTime := time.Now()

apiVersion, kind := watchResource.ApiVersion, watchResource.Kind

Expand Down Expand Up @@ -127,11 +127,11 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, watchResource v1
"type", "add",
"kind", u.GetKind(),
"scraper_id", ctx.ScraperID(),
"valid_timestamp", lo.Ternary(u.GetCreationTimestamp().Time.After(start), "true", "false"),
"valid_timestamp", lo.Ternary(u.GetCreationTimestamp().Time.After(registrationTime), "true", "false"),
).Add(1)

// This is a way to avoid instrumenting old objects so they don't skew the lag time.
if u.GetCreationTimestamp().Time.After(start) {
if u.GetCreationTimestamp().Time.After(registrationTime) {
ctx.Histogram("informer_receive_lag", informerLagBuckets,
"scraper", ctx.ScraperID(),
"kind", watchResource.Kind,
Expand All @@ -140,6 +140,11 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, watchResource v1
}
},
UpdateFunc: func(oldObj any, newObj any) {
// Kubernetes object timestamps are only precise to the second, so we round
// the current time to the nearest second to avoid incorrectly marking
// timestamps as being in the past due to millisecond differences.
now := time.Now().UTC().Round(time.Second)

u, err := getUnstructuredFromInformedObj(watchResource, newObj)
if err != nil {
ctx.Counter("kubernetes_informer_errors",
Expand All @@ -155,24 +160,24 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, watchResource v1
ctx.Logger.V(3).Infof("updated: %s %s %s", u.GetUID(), u.GetKind(), u.GetName())
}

lastUpdatedTime := health.GetLastUpdatedTime(u)
lastUpdatedInPast := lastUpdatedTime != nil && lastUpdatedTime.After(u.GetCreationTimestamp().Time) && lastUpdatedTime.Before(start)
if lastUpdatedInPast {
lastUpdatedTime := lo.FromPtr(health.GetLastUpdatedTime(u))
lastUpdatedInFuture := lastUpdatedTime.After(now)
if !lastUpdatedInFuture {
ctx.Histogram("informer_receive_lag", informerLagBuckets,
"scraper", ctx.ScraperID(),
"kind", watchResource.Kind,
"operation", "update",
).Record(time.Duration(time.Since(*lastUpdatedTime).Milliseconds()))
).Record(time.Duration(now.Sub(lastUpdatedTime).Milliseconds()))
} else {
ctx.Warnf("%s/%s/%s has last updated time %s into the future. now=%s, lastupdatedTime=%s",
u.GetKind(), u.GetNamespace(), u.GetName(), time.Until(*lastUpdatedTime), start, *lastUpdatedTime)
u.GetKind(), u.GetNamespace(), u.GetName(), lastUpdatedTime.Sub(now), now, lastUpdatedTime)
}

ctx.Counter("kubernetes_informer_events",
"type", "update",
"kind", u.GetKind(),
"scraper_id", ctx.ScraperID(),
"valid_timestamp", lo.Ternary(lastUpdatedInPast, "true", "false"),
"valid_timestamp", lo.Ternary(!lastUpdatedInFuture, "true", "false"),
).Add(1)

queue.Enqueue(NewQueueItem(u, QueueItemOperationUpdate))
Expand Down

0 comments on commit c3905aa

Please sign in to comment.