fix: improve consume lag
* separate the consume lag of involved objects into its own metric series
* process objects from shared informers immediately, without first fetching
  their involved objects; this way the metric reflects the actual consume lag,
  free of the delay added by the Kubernetes GET
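For intuition, "consume lag" here is the time between a shared informer enqueuing an object and the consumer picking it up. Below is a minimal, self-contained sketch of just that quantity; the queueItem type and the wiring are hypothetical simplifications, not the scraper's real code.

package main

import (
	"fmt"
	"time"
)

// queueItem is a hypothetical stand-in for the scraper's queue entries.
type queueItem struct {
	UID       string
	Timestamp time.Time // set when the shared informer enqueues the object
}

// consumeLag is the quantity this commit measures: the elapsed time
// between enqueue and consumption.
func consumeLag(item queueItem) time.Duration {
	return time.Since(item.Timestamp)
}

func main() {
	item := queueItem{UID: "abc-123", Timestamp: time.Now().Add(-750 * time.Millisecond)}
	fmt.Printf("consume lag: %dms\n", consumeLag(item).Milliseconds())
}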
adityathebe authored and moshloop committed Dec 6, 2024
1 parent 142a975 commit 64b17a6
Showing 1 changed file with 61 additions and 23 deletions.
84 changes: 61 additions & 23 deletions scrapers/incremental.go
@@ -19,7 +19,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

-var consumeLagBuckets = []float64{1_000, 3_000, 5_000, 10_000, 15_000, 30_000, 60_000, 100_000, 150_000, 300_000, 600_000}
+var consumeLagBuckets = []float64{500, 1_000, 3_000, 5_000, 10_000, 15_000, 30_000, 60_000, 100_000, 150_000, 300_000, 600_000}

func consumeKubernetesWatchJobKey(id string) string {
return id + "-consume-kubernetes-watch"
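The buckets above are histogram upper bounds in milliseconds; the commit adds a 500 ms bucket so the faster path, which no longer waits on a Kubernetes GET, stays visible at the low end. As a rough illustration of how an observation maps onto these bounds, here is a hypothetical helper (not part of the scraper; assumes the standard math and time imports):

// bucketFor returns the smallest upper bound (in milliseconds) that contains
// the observed lag, or +Inf if the lag exceeds every configured bucket.
func bucketFor(lag time.Duration, buckets []float64) float64 {
	ms := float64(lag.Milliseconds())
	for _, upper := range buckets {
		if ms <= upper {
			return upper
		}
	}
	return math.Inf(1)
}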
@@ -82,7 +82,12 @@ func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, q
continue
}

+seenObjects[string(obj.GetUID())] = struct{}{}
+objs = append(objs, obj)
+
if obj.GetKind() == "Event" {
+// For events, we want to re-scrape their involved objects as well.
+
involvedObjectRaw, ok, _ := unstructured.NestedMap(obj.Object, "involvedObject")
if !ok {
continue
@@ -97,30 +102,13 @@
return fmt.Errorf("failed to unmarshal endpoint (%s/%s): %w", obj.GetUID(), obj.GetName(), err)
}

-objectsFromEvents[string(involvedObject.UID)] = involvedObject
-} else {
-seenObjects[string(obj.GetUID())] = struct{}{}
-}
-
-objs = append(objs, obj)
-}
+// involved objects assume its event's queuedTime
+queuedTime[string(involvedObject.UID)] = queueItem.Timestamp
+
-// NOTE: Events whose involved objects aren't watched by informers, should be rescraped.
-// If we trigger delayed re-scrape on addition of a config_change then this shouldn't be necessary.
-var involvedObjectsToScrape []v1.InvolvedObject
-for id, involvedObject := range objectsFromEvents {
-if _, ok := seenObjects[id]; !ok {
-involvedObjectsToScrape = append(involvedObjectsToScrape, involvedObject)
+objectsFromEvents[string(involvedObject.UID)] = involvedObject
}
}

-if res, err := kube.FetchInvolvedObjects(sc, involvedObjectsToScrape); err != nil {
-ctx.History.AddErrorf("failed to fetch involved objects from events: %v", err)
-return err
-} else {
-objs = append(objs, res...)
-}

// NOTE: The resource watcher can return multiple objects for the same NEW resource.
// Example: if a new pod is created, we'll get that pod object multiple times for different events.
// All those resource objects are seen as distinct new config items.
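The hunk above pulls the event's involvedObject out of the unstructured object and round-trips it through JSON into a typed struct; the involved object then inherits the event's enqueue timestamp, since it never sat in the queue itself. A condensed sketch of that pattern, with error handling elided:

raw, ok, _ := unstructured.NestedMap(obj.Object, "involvedObject")
if ok {
	b, _ := json.Marshal(raw)
	var involved v1.InvolvedObject
	_ = json.Unmarshal(b, &involved)
	// The involved object never entered the queue, so it borrows its
	// event's enqueue timestamp for lag accounting.
	queuedTime[string(involved.UID)] = queueItem.Timestamp
}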
@@ -140,14 +128,33 @@
for _, obj := range objs {
queuedtime, ok := queuedTime[string(obj.GetUID())]
if !ok {
-continue // involved objects have 0 queuedtime as they never enter the queue
+ctx.Warnf("found object (%s/%s/%s) with zero queuedTime", obj.GetNamespace(), obj.GetName(), obj.GetUID())
+continue
}

lag := time.Since(queuedtime)
-ctx.Histogram("informer_consume_lag", consumeLagBuckets, "scraper", sc.ScraperID(), "kind", obj.GetKind()).
+ctx.Histogram("informer_consume_lag", consumeLagBuckets,
+"scraper", sc.ScraperID(),
+"kind", obj.GetKind(),
+"is_involved_object", "false",
+).
Record(time.Duration(lag.Milliseconds()))
}

+// NOTE: Events whose involved objects aren't watched by informers, should be rescraped.
+// If we trigger delayed re-scrape on addition of a config_change then this shouldn't be necessary.
+var involvedObjectsToScrape []v1.InvolvedObject
+for id, involvedObject := range objectsFromEvents {
+if _, ok := seenObjects[id]; !ok {
+involvedObjectsToScrape = append(involvedObjectsToScrape, involvedObject)
+}
+}
+
+if err := consumeInvolvedObjects(ctx, sc, *config, queuedTime, involvedObjectsToScrape); err != nil {
+ctx.History.AddErrorf("failed to consume involved objects: %v", err)
+return err
+}

return nil
},
}
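One detail worth noting in the recording call: lag.Milliseconds() is cast back to a time.Duration, so what reaches the histogram is effectively a bare millisecond count, which is what lines the observations up with the millisecond-valued buckets (500, 1_000, ...). A small illustration, assuming Record treats the duration's integer value as the observation, which the buckets strongly suggest:

lag := 1500 * time.Millisecond
observed := time.Duration(lag.Milliseconds()) // integer value 1500: a unitless ms count
// The smallest bucket upper bound >= 1500 in consumeLagBuckets is 3_000,
// so this observation lands in the 3_000 bucket (and all larger ones).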
@@ -204,6 +211,37 @@ func consumeResources(ctx job.JobRuntime, scrapeConfig v1.ScrapeConfig, config v
return nil
}

+func consumeInvolvedObjects(ctx job.JobRuntime, sc api.ScrapeContext, config v1.Kubernetes, queuedTime map[string]time.Time, involvedObjectsToScrape []v1.InvolvedObject) error {
+objs, err := kube.FetchInvolvedObjects(sc, involvedObjectsToScrape)
+if err != nil {
+ctx.History.AddErrorf("failed to fetch involved objects from events: %v", err)
+return err
+}
+
+if err := consumeResources(ctx, *sc.ScrapeConfig(), config, objs, nil); err != nil {
+ctx.History.AddErrorf("failed to consume resources: %v", err)
+return err
+}
+
+for _, obj := range objs {
+queuedtime, ok := queuedTime[string(obj.GetUID())]
+if !ok {
+ctx.Warnf("found involved object (%s/%s/%s) with zero queuedTime", obj.GetNamespace(), obj.GetName(), obj.GetUID())
+continue
+}
+
+lag := time.Since(queuedtime)
+ctx.Histogram("informer_consume_lag", consumeLagBuckets,
+"scraper", sc.ScraperID(),
+"kind", obj.GetKind(),
+"is_involved_object", "true",
+).
+Record(time.Duration(lag.Milliseconds()))
+}
+
+return nil
+}

// processObjects runs the given fully populated objects through the kubernetes scraper.
func processObjects(ctx api.ScrapeContext, config v1.Kubernetes, objs []*unstructured.Unstructured) ([]v1.ScrapeResult, error) {
var results v1.ScrapeResults
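Taken together, the refactor leaves only one Kubernetes GET on this path: objects delivered by informers are consumed and timed immediately, and only involved objects that no informer delivered are fetched, with their lag recorded under is_involved_object="true". A hypothetical condensed view of that split, reusing the diff's names:

// Informers already delivered everything in seenObjects; fetch only the rest.
var toScrape []v1.InvolvedObject
for id, involved := range objectsFromEvents {
	if _, ok := seenObjects[id]; !ok {
		toScrape = append(toScrape, involved)
	}
}
fetched, err := kube.FetchInvolvedObjects(sc, toScrape) // the one remaining Kubernetes GET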