feat: add involved objects back into the queue
adityathebe authored and moshloop committed Dec 9, 2024
1 parent 18b03b6 commit fa784fc
Showing 2 changed files with 46 additions and 58 deletions.
96 changes: 41 additions & 55 deletions scrapers/incremental.go
@@ -1,6 +1,7 @@
package scrapers

import (
gocontext "context"
"fmt"
"time"

@@ -16,10 +17,14 @@ import (
"github.com/flanksource/config-db/utils/kube"
"github.com/flanksource/duty/job"
"github.com/samber/lo"
"github.com/sethvargo/go-retry"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

var consumeLagBuckets = []float64{500, 1_000, 3_000, 5_000, 10_000, 15_000, 30_000, 60_000, 100_000, 150_000, 300_000, 600_000}
var (
consumeLagBuckets = []float64{500, 1_000, 3_000, 5_000, 10_000, 15_000, 30_000, 60_000, 100_000, 150_000, 300_000, 600_000}
involvedObjectsFetchBuckets = []float64{500, 1_000, 3_000, 5_000, 10_000, 15_000, 30_000, 60_000, 100_000, 150_000, 300_000, 600_000}
)

func consumeKubernetesWatchJobKey(id string) string {
return id + "-consume-kubernetes-watch"
@@ -55,7 +60,7 @@ func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, q
queuedTime = map[string]time.Time{}

seenObjects = map[string]struct{}{}
objectsFromEvents = map[string]v1.InvolvedObject{}
objectsFromEvents []v1.InvolvedObject
)

for {
@@ -87,7 +92,6 @@ func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, q

if obj.GetKind() == "Event" {
// For events, we want to re-scrape their involved objects as well.

involvedObjectRaw, ok, _ := unstructured.NestedMap(obj.Object, "involvedObject")
if !ok {
continue
@@ -102,13 +106,42 @@ func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, q
return fmt.Errorf("failed to unmarshal involved object (%s/%s): %w", obj.GetUID(), obj.GetName(), err)
}

// involved objects assume their event's queuedTime
queuedTime[string(involvedObject.UID)] = queueItem.Timestamp

objectsFromEvents[string(involvedObject.UID)] = involvedObject
if _, ok := seenObjects[string(involvedObject.UID)]; !ok {
objectsFromEvents = append(objectsFromEvents, involvedObject)
seenObjects[string(involvedObject.UID)] = struct{}{}
}
}
}

if len(objectsFromEvents) > 0 {
go func() {
start := time.Now()

cc := api.NewScrapeContext(ctx.Context).WithScrapeConfig(sc.ScrapeConfig()).WithJobHistory(ctx.History).AsIncrementalScrape()
cc.Context = cc.Context.WithoutName().WithName(fmt.Sprintf("watch[%s/%s]", cc.GetNamespace(), cc.GetName()))

backoff := retry.WithMaxRetries(3, retry.NewExponential(time.Second))
err := retry.Do(ctx, backoff, func(_ctx gocontext.Context) error {
objs, err := kube.FetchInvolvedObjects(cc, objectsFromEvents)
if err != nil {
return retry.RetryableError(err)
}

// we put these involved objects back into the queue
for _, obj := range objs {
queue.Enqueue(kubernetes.NewQueueItem(obj, kubernetes.QueueItemOperationReEnqueue))
ctx.Histogram("involved_objects_enqueue", involvedObjectsFetchBuckets, "scraper_id", cc.ScraperID()).
Record(time.Duration(time.Since(start).Milliseconds()))
}

return nil
})
if err != nil {
ctx.History.AddErrorf("failed to get involved objects: %v", err)
}
}()
}

// NOTE: The resource watcher can return multiple objects for the same NEW resource.
// Example: if a new pod is created, we'll get that pod object multiple times for different events.
// All those resource objects are seen as distinct new config items.
@@ -136,23 +169,7 @@ func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, q
ctx.Histogram("informer_consume_lag", consumeLagBuckets,
"scraper", sc.ScraperID(),
"kind", obj.GetKind(),
"is_involved_object", "false",
).
Record(time.Duration(lag.Milliseconds()))
}

// NOTE: Events whose involved objects aren't watched by informers should be rescraped.
// If we trigger delayed re-scrape on addition of a config_change then this shouldn't be necessary.
var involvedObjectsToScrape []v1.InvolvedObject
for id, involvedObject := range objectsFromEvents {
if _, ok := seenObjects[id]; !ok {
involvedObjectsToScrape = append(involvedObjectsToScrape, involvedObject)
}
}

if err := consumeInvolvedObjects(ctx, sc, *config, queuedTime, involvedObjectsToScrape); err != nil {
ctx.History.AddErrorf("failed to consume involved objects: %v", err)
return err
).Record(time.Duration(lag.Milliseconds()))
}

return nil
@@ -211,37 +228,6 @@ func consumeResources(ctx job.JobRuntime, scrapeConfig v1.ScrapeConfig, config v
return nil
}

func consumeInvolvedObjects(ctx job.JobRuntime, sc api.ScrapeContext, config v1.Kubernetes, queuedTime map[string]time.Time, involvedObjectsToScrape []v1.InvolvedObject) error {
objs, err := kube.FetchInvolvedObjects(sc, involvedObjectsToScrape)
if err != nil {
ctx.History.AddErrorf("failed to fetch involved objects from events: %v", err)
return err
}

if err := consumeResources(ctx, *sc.ScrapeConfig(), config, objs, nil); err != nil {
ctx.History.AddErrorf("failed to consume resources: %v", err)
return err
}

for _, obj := range objs {
queuedtime, ok := queuedTime[string(obj.GetUID())]
if !ok {
ctx.Warnf("found involved object (%s/%s/%s) with zero queuedTime", obj.GetNamespace(), obj.GetName(), obj.GetUID())
continue
}

lag := time.Since(queuedtime)
ctx.Histogram("informer_consume_lag", consumeLagBuckets,
"scraper", sc.ScraperID(),
"kind", obj.GetKind(),
"is_involved_object", "true",
).
Record(time.Duration(lag.Milliseconds()))
}

return nil
}

// processObjects runs the given fully populated objects through the kubernetes scraper.
func processObjects(ctx api.ScrapeContext, config v1.Kubernetes, objs []*unstructured.Unstructured) ([]v1.ScrapeResult, error) {
var results v1.ScrapeResults
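
The main addition above is the fetch-with-retry-and-re-enqueue flow, so here is a minimal, self-contained sketch of that pattern using the same sethvargo/go-retry calls that appear in the diff. The fetchInvolvedObjects helper and the channel standing in for the scraper's queue are hypothetical placeholders; the real code calls kube.FetchInvolvedObjects and enqueues kubernetes.QueueItems with the QueueItemOperationReEnqueue operation.

package main

import (
    "context"
    "errors"
    "fmt"
    "time"

    "github.com/sethvargo/go-retry"
)

// fetchInvolvedObjects is a stand-in for kube.FetchInvolvedObjects: it may fail
// transiently, in which case the caller retries with exponential backoff.
func fetchInvolvedObjects(ctx context.Context, ids []string) ([]string, error) {
    if len(ids) == 0 {
        return nil, errors.New("nothing to fetch")
    }
    return ids, nil // pretend every lookup succeeds
}

func main() {
    ctx := context.Background()
    pending := []string{"pod/nginx", "deployment/api"}
    queue := make(chan string, len(pending)) // stand-in for the scraper's priority queue

    // At most 3 retries, starting at a 1s delay and doubling each attempt,
    // mirroring retry.WithMaxRetries(3, retry.NewExponential(time.Second)) above.
    backoff := retry.WithMaxRetries(3, retry.NewExponential(time.Second))
    err := retry.Do(ctx, backoff, func(ctx context.Context) error {
        objs, err := fetchInvolvedObjects(ctx, pending)
        if err != nil {
            // Mark the error retryable so retry.Do tries again after the backoff.
            return retry.RetryableError(err)
        }
        for _, obj := range objs {
            queue <- obj // re-enqueue the freshly fetched objects
        }
        return nil
    })
    if err != nil {
        fmt.Println("giving up after retries:", err)
        return
    }
    close(queue)
    for obj := range queue {
        fmt.Println("re-enqueued:", obj)
    }
}

Note that only errors wrapped with retry.RetryableError are retried; returning a plain error (or nil) from the callback ends the loop immediately, which is why the fetch failure above is wrapped before returning.
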
8 changes: 5 additions & 3 deletions scrapers/kubernetes/informers.go
@@ -349,14 +349,16 @@ const (
QueueItemOperationAdd QueueItemOperation = iota + 1
QueueItemOperationUpdate
QueueItemOperationDelete
QueueItemOperationReEnqueue // Involved objects from events are re-enqueued
)

func (t *QueueItemOperation) Priority() int {
// smaller value represents higher priority
priority := map[QueueItemOperation]int{
QueueItemOperationAdd: 1,
QueueItemOperationUpdate: 2,
QueueItemOperationDelete: 3,
QueueItemOperationAdd: 1,
QueueItemOperationReEnqueue: 1,
QueueItemOperationUpdate: 2,
QueueItemOperationDelete: 3,
}

return priority[*t]
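
To see how the new QueueItemOperationReEnqueue slots in, here is a hedged sketch of how the Priority() mapping above might drive consumption order. The QueueItem struct, the sorted-slice queue, and the tie-break on Timestamp are illustrative assumptions, not the actual config-db queue implementation.

package main

import (
    "fmt"
    "sort"
    "time"
)

type QueueItemOperation int

const (
    QueueItemOperationAdd QueueItemOperation = iota + 1
    QueueItemOperationUpdate
    QueueItemOperationDelete
    QueueItemOperationReEnqueue // involved objects from events are re-enqueued
)

// Priority mirrors the mapping in informers.go: smaller value = higher priority.
func (t QueueItemOperation) Priority() int {
    return map[QueueItemOperation]int{
        QueueItemOperationAdd:       1,
        QueueItemOperationReEnqueue: 1,
        QueueItemOperationUpdate:    2,
        QueueItemOperationDelete:    3,
    }[t]
}

type QueueItem struct {
    Name      string
    Op        QueueItemOperation
    Timestamp time.Time
}

func main() {
    now := time.Now()
    items := []QueueItem{
        {"pod/old-delete", QueueItemOperationDelete, now.Add(-3 * time.Second)},
        {"pod/from-event", QueueItemOperationReEnqueue, now.Add(-2 * time.Second)},
        {"deploy/update", QueueItemOperationUpdate, now.Add(-1 * time.Second)},
        {"pod/new-add", QueueItemOperationAdd, now},
    }

    // Sort by operation priority first, then by enqueue time (oldest first).
    sort.Slice(items, func(i, j int) bool {
        if items[i].Op.Priority() != items[j].Op.Priority() {
            return items[i].Op.Priority() < items[j].Op.Priority()
        }
        return items[i].Timestamp.Before(items[j].Timestamp)
    })

    for _, it := range items {
        fmt.Printf("consume %-16s (priority %d)\n", it.Name, it.Op.Priority())
    }
}

Presumably, giving re-enqueued involved objects the same priority as fresh adds keeps event-driven re-scrapes from queuing behind routine updates and deletes.
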
