diff --git a/scrapers/incremental.go b/scrapers/incremental.go index 6903f19b..78fa2cb7 100644 --- a/scrapers/incremental.go +++ b/scrapers/incremental.go @@ -1,6 +1,7 @@ package scrapers import ( + gocontext "context" "fmt" "time" @@ -16,10 +17,14 @@ import ( "github.com/flanksource/config-db/utils/kube" "github.com/flanksource/duty/job" "github.com/samber/lo" + "github.com/sethvargo/go-retry" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" ) -var consumeLagBuckets = []float64{500, 1_000, 3_000, 5_000, 10_000, 15_000, 30_000, 60_000, 100_000, 150_000, 300_000, 600_000} +var ( + consumeLagBuckets = []float64{500, 1_000, 3_000, 5_000, 10_000, 15_000, 30_000, 60_000, 100_000, 150_000, 300_000, 600_000} + involvedObjectsFetchBuckets = []float64{500, 1_000, 3_000, 5_000, 10_000, 15_000, 30_000, 60_000, 100_000, 150_000, 300_000, 600_000} +) func consumeKubernetesWatchJobKey(id string) string { return id + "-consume-kubernetes-watch" @@ -55,7 +60,7 @@ func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, q queuedTime = map[string]time.Time{} seenObjects = map[string]struct{}{} - objectsFromEvents = map[string]v1.InvolvedObject{} + objectsFromEvents []v1.InvolvedObject ) for { @@ -87,7 +92,6 @@ func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, q if obj.GetKind() == "Event" { // For events, we want to re-scrape their involved objects as well. 
- involvedObjectRaw, ok, _ := unstructured.NestedMap(obj.Object, "involvedObject") if !ok { continue @@ -102,13 +106,42 @@ func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, q return fmt.Errorf("failed to unmarshal endpoint (%s/%s): %w", obj.GetUID(), obj.GetName(), err) } - // involved objects assume its event's queuedTime - queuedTime[string(involvedObject.UID)] = queueItem.Timestamp - - objectsFromEvents[string(involvedObject.UID)] = involvedObject + if _, ok := seenObjects[string(involvedObject.UID)]; !ok { + objectsFromEvents = append(objectsFromEvents, involvedObject) + seenObjects[string(involvedObject.UID)] = struct{}{} + } } } + if len(objectsFromEvents) > 0 { + go func() { + start := time.Now() + + cc := api.NewScrapeContext(ctx.Context).WithScrapeConfig(sc.ScrapeConfig()).WithJobHistory(ctx.History).AsIncrementalScrape() + cc.Context = cc.Context.WithoutName().WithName(fmt.Sprintf("watch[%s/%s]", cc.GetNamespace(), cc.GetName())) + + backoff := retry.WithMaxRetries(3, retry.NewExponential(time.Second)) + err := retry.Do(ctx, backoff, func(_ctx gocontext.Context) error { + objs, err := kube.FetchInvolvedObjects(cc, objectsFromEvents) + if err != nil { + return retry.RetryableError(err) + } + + // we put these involved objects back into the queue + for _, obj := range objs { + queue.Enqueue(kubernetes.NewQueueItem(obj, kubernetes.QueueItemOperationReEnqueue)) + ctx.Histogram("involved_objects_enqueue", involvedObjectsFetchBuckets, "scraper_id", cc.ScraperID()). + Record(time.Duration(time.Since(start).Milliseconds())) + } + + return nil + }) + if err != nil { + ctx.History.AddErrorf("failed to get involved objects: %v", err) + } + }() + } + // NOTE: The resource watcher can return multiple objects for the same NEW resource. // Example: if a new pod is created, we'll get that pod object multiple times for different events. // All those resource objects are seen as distinct new config items. 
@@ -136,23 +169,7 @@ func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, q ctx.Histogram("informer_consume_lag", consumeLagBuckets, "scraper", sc.ScraperID(), "kind", obj.GetKind(), - "is_involved_object", "false", - ). - Record(time.Duration(lag.Milliseconds())) - } - - // NOTE: Events whose involved objects aren't watched by informers, should be rescraped. - // If we trigger delayed re-scrape on addition of a config_change then this shouldn't be necessary. - var involvedObjectsToScrape []v1.InvolvedObject - for id, involvedObject := range objectsFromEvents { - if _, ok := seenObjects[id]; !ok { - involvedObjectsToScrape = append(involvedObjectsToScrape, involvedObject) - } - } - - if err := consumeInvolvedObjects(ctx, sc, *config, queuedTime, involvedObjectsToScrape); err != nil { - ctx.History.AddErrorf("failed to consume involved objects: %v", err) - return err + ).Record(time.Duration(lag.Milliseconds())) } return nil @@ -211,37 +228,6 @@ func consumeResources(ctx job.JobRuntime, scrapeConfig v1.ScrapeConfig, config v return nil } -func consumeInvolvedObjects(ctx job.JobRuntime, sc api.ScrapeContext, config v1.Kubernetes, queuedTime map[string]time.Time, involvedObjectsToScrape []v1.InvolvedObject) error { - objs, err := kube.FetchInvolvedObjects(sc, involvedObjectsToScrape) - if err != nil { - ctx.History.AddErrorf("failed to fetch involved objects from events: %v", err) - return err - } - - if err := consumeResources(ctx, *sc.ScrapeConfig(), config, objs, nil); err != nil { - ctx.History.AddErrorf("failed to consume resources: %v", err) - return err - } - - for _, obj := range objs { - queuedtime, ok := queuedTime[string(obj.GetUID())] - if !ok { - ctx.Warnf("found involved object (%s/%s/%s) with zero queuedTime", obj.GetNamespace(), obj.GetName(), obj.GetUID()) - continue - } - - lag := time.Since(queuedtime) - ctx.Histogram("informer_consume_lag", consumeLagBuckets, - "scraper", sc.ScraperID(), - "kind", obj.GetKind(), - 
"is_involved_object", "true", - ). - Record(time.Duration(lag.Milliseconds())) - } - - return nil -} - // processObjects runs the given fully populated objects through the kubernetes scraper. func processObjects(ctx api.ScrapeContext, config v1.Kubernetes, objs []*unstructured.Unstructured) ([]v1.ScrapeResult, error) { var results v1.ScrapeResults diff --git a/scrapers/kubernetes/informers.go b/scrapers/kubernetes/informers.go index 11eb81cd..bf1e761f 100644 --- a/scrapers/kubernetes/informers.go +++ b/scrapers/kubernetes/informers.go @@ -349,14 +349,16 @@ const ( QueueItemOperationAdd QueueItemOperation = iota + 1 QueueItemOperationUpdate QueueItemOperationDelete + QueueItemOperationReEnqueue // Involved objects from events are re-enqueued ) func (t *QueueItemOperation) Priority() int { // smaller value represents higher priority priority := map[QueueItemOperation]int{ - QueueItemOperationAdd: 1, - QueueItemOperationUpdate: 2, - QueueItemOperationDelete: 3, + QueueItemOperationAdd: 1, + QueueItemOperationReEnqueue: 1, + QueueItemOperationUpdate: 2, + QueueItemOperationDelete: 3, } return priority[*t]