From 588f5cc85a34112c8b51fe25c708a9661b2d2a07 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Fri, 6 Dec 2024 09:32:26 +0545 Subject: [PATCH] fix: metrics updates --- api/v1/types.go | 2 ++ db/config.go | 10 +++++++--- db/update.go | 10 ++++++++++ scrapers/incremental.go | 31 ++++++++++++++++++++++--------- scrapers/kubernetes/kubernetes.go | 13 +++++++------ scrapers/stale.go | 31 +++++++++++++++++++------------ utils/kube/fetcher.go | 2 +- 7 files changed, 68 insertions(+), 31 deletions(-) diff --git a/api/v1/types.go b/api/v1/types.go index 82ee08c1..281c93eb 100644 --- a/api/v1/types.go +++ b/api/v1/types.go @@ -212,4 +212,6 @@ var ( // DeletedReasonFromDeleteField is used when a deletion field (& reason) // is picked up from the JSONPath expression provided in the scraper config. DeletedReasonFromDeleteField ConfigDeleteReason = "FROM_DELETE_FIELD" + + DeleteReasonEvent ConfigDeleteReason = "FROM_EVENT" ) diff --git a/db/config.go b/db/config.go index 25d52ed6..a003bd7e 100644 --- a/db/config.go +++ b/db/config.go @@ -21,7 +21,6 @@ import ( "github.com/lib/pq" "github.com/ohler55/ojg/oj" "github.com/samber/lo" - "gorm.io/gorm" "gorm.io/gorm/clause" ) @@ -224,10 +223,15 @@ func FindConfigChangesByItemID(ctx api.ScrapeContext, configItemID string) ([]du return ci, nil } -func SoftDeleteConfigItems(ctx context.Context, ids ...string) (int, error) { +func SoftDeleteConfigItems(ctx context.Context, reason v1.ConfigDeleteReason, ids ...string) (int, error) { tx := ctx.DB(). Model(&models.ConfigItem{}). Where("id IN ?", ids). - Update("deleted_at", gorm.Expr("NOW()")) + UpdateColumns( + map[string]any{ + "deleted_at": duty.Now(), + "delete_reason": reason, + }, + ) return int(tx.RowsAffected), tx.Error } diff --git a/db/update.go b/db/update.go index f6124797..47da5afb 100644 --- a/db/update.go +++ b/db/update.go @@ -125,6 +125,8 @@ func updateCI(ctx api.ScrapeContext, summary *v1.ScrapeSummary, result v1.Scrape updates := make(map[string]interface{}) changes := make([]*models.ConfigChange, 0) + isDeleted := existing.DeletedAt == nil && ci.DeletedAt != nil + if lo.FromPtr(ci.DeletedAt) != lo.FromPtr(existing.DeletedAt) { updates["deleted_at"] = ci.DeletedAt updates["delete_reason"] = ci.DeleteReason @@ -233,6 +235,14 @@ func updateCI(ctx api.ScrapeContext, summary *v1.ScrapeSummary, result v1.Scrape return false, nil, errors.Wrapf(dutydb.ErrorDetails(err), "unable to update config item: %s", ci) } + if isDeleted { + ctx.Counter("scraper_deleted", + "scraper_id", ctx.ScraperID(), + "kind", ci.Type, + "reason", string(ci.DeleteReason), + ).Add(1) + } + return true, changes, nil } diff --git a/scrapers/incremental.go b/scrapers/incremental.go index bf42dc3c..cf99d673 100644 --- a/scrapers/incremental.go +++ b/scrapers/incremental.go @@ -15,6 +15,7 @@ import ( "github.com/flanksource/config-db/scrapers/kubernetes" "github.com/flanksource/config-db/utils/kube" "github.com/flanksource/duty/job" + "github.com/samber/lo" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" ) @@ -50,7 +51,7 @@ func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, q var ( objs []*unstructured.Unstructured - deletedObjects []string + deletedObjects []*unstructured.Unstructured queuedTime = map[string]time.Time{} seenObjects = map[string]struct{}{} @@ -77,7 +78,7 @@ func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, q queuedTime[string(obj.GetUID())] = queueItem.Timestamp if queueItem.Operation == kubernetes.QueueItemOperationDelete { - deletedObjects = append(deletedObjects, string(obj.GetUID())) + deletedObjects = append(deletedObjects, obj) continue } @@ -152,7 +153,7 @@ func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, q } } -func consumeResources(ctx job.JobRuntime, scrapeConfig v1.ScrapeConfig, config v1.Kubernetes, objs []*unstructured.Unstructured, deletedResourcesIDs []string) error { +func consumeResources(ctx job.JobRuntime, scrapeConfig v1.ScrapeConfig, config v1.Kubernetes, objs, deletedResources []*unstructured.Unstructured) error { cc := api.NewScrapeContext(ctx.Context).WithScrapeConfig(&scrapeConfig).WithJobHistory(ctx.History).AsIncrementalScrape() cc.Context = cc.Context.WithoutName().WithName(fmt.Sprintf("watch[%s/%s]", cc.GetNamespace(), cc.GetName())) results, err := processObjects(cc, config, objs) @@ -174,17 +175,29 @@ func consumeResources(ctx job.JobRuntime, scrapeConfig v1.ScrapeConfig, config v } } - if len(deletedResourcesIDs) > 0 { - total, err := db.SoftDeleteConfigItems(ctx.Context, deletedResourcesIDs...) + if len(deletedResources) > 0 { + deletedResourceIDs := lo.Map(deletedResources, func(item *unstructured.Unstructured, _ int) string { + return string(item.GetUID()) + }) + + total, err := db.SoftDeleteConfigItems(ctx.Context, v1.DeleteReasonEvent, deletedResourceIDs...) if err != nil { - return fmt.Errorf("failed to delete %d resources: %w", len(deletedResourcesIDs), err) - } else if total != len(deletedResourcesIDs) { - ctx.GetSpan().SetAttributes(attribute.StringSlice("deletedResourcesIDs", deletedResourcesIDs)) + return fmt.Errorf("failed to delete %d resources: %w", len(deletedResources), err) + } else if total != len(deletedResources) { + ctx.GetSpan().SetAttributes(attribute.StringSlice("deletedResourcesIDs", deletedResourceIDs)) if cc.PropertyOn(false, "log.missing") { - ctx.Logger.Warnf("attempted to delete %d resources but only deleted %d", len(deletedResourcesIDs), total) + ctx.Logger.Warnf("attempted to delete %d resources but only deleted %d", len(deletedResources), total) } } + for _, c := range deletedResources { + ctx.Counter("scraper_deleted", + "scraper_id", cc.ScraperID(), + "kind", kubernetes.GetConfigType(c), + "reason", string(v1.DeleteReasonEvent), + ).Add(1) + } + ctx.History.SuccessCount += total } diff --git a/scrapers/kubernetes/kubernetes.go b/scrapers/kubernetes/kubernetes.go index e001254a..19664850 100644 --- a/scrapers/kubernetes/kubernetes.go +++ b/scrapers/kubernetes/kubernetes.go @@ -30,16 +30,17 @@ const ConfigTypePrefix = "Kubernetes::" var resourceIDMapPerCluster PerClusterResourceIDMap -func getConfigTypePrefix(apiVersion string) string { +func GetConfigType(obj *unstructured.Unstructured) string { + apiVersion := obj.GetAPIVersion() if strings.Contains(apiVersion, ".upbound.io") || strings.Contains(apiVersion, ".crossplane.io") { - return "Crossplane::" + return "Crossplane::" + obj.GetKind() } if strings.HasSuffix(apiVersion, ".flanksource.com/v1") { - return api.MissionControlConfigTypePrefix + return api.MissionControlConfigTypePrefix + obj.GetKind() } - return ConfigTypePrefix + return ConfigTypePrefix + obj.GetKind() } type KubernetesScraper struct{} @@ -350,7 +351,7 @@ func ExtractResults(ctx *KubernetesContext, objs []*unstructured.Unstructured) v ConfigID: id.String(), }.WithRelated( ctx.GetID(obj.GetNamespace(), obj.GetKind(), obj.GetName()), - v1.ExternalID{ExternalID: string(obj.GetUID()), ConfigType: getConfigTypePrefix(obj.GetAPIVersion()) + obj.GetKind()}, + v1.ExternalID{ExternalID: string(obj.GetUID()), ConfigType: GetConfigType(obj)}, ) relationships = append(relationships, rel) @@ -435,7 +436,7 @@ func ExtractResults(ctx *KubernetesContext, objs []*unstructured.Unstructured) v BaseScraper: ctx.config.BaseScraper, Name: obj.GetName(), ConfigClass: obj.GetKind(), - Type: getConfigTypePrefix(obj.GetAPIVersion()) + obj.GetKind(), + Type: GetConfigType(obj), Status: string(resourceHealth.Status), Health: models.Health(resourceHealth.Health), Ready: resourceHealth.Ready, diff --git a/scrapers/stale.go b/scrapers/stale.go index 36cc6696..db4744d3 100644 --- a/scrapers/stale.go +++ b/scrapers/stale.go @@ -6,6 +6,7 @@ import ( "github.com/flanksource/commons/duration" v1 "github.com/flanksource/config-db/api/v1" + "github.com/flanksource/config-db/db/models" "github.com/flanksource/duty/context" "github.com/google/uuid" ) @@ -40,22 +41,28 @@ func DeleteStaleConfigItems(ctx context.Context, staleTimeout string, scraperID } deleteQuery := ` - UPDATE config_items - SET - deleted_at = NOW(), - delete_reason = ? - WHERE - ((NOW() - last_scraped_time) > INTERVAL '1 SECOND' * ?) AND - deleted_at IS NULL AND - scraper_id = ?` - - result := ctx.DB().Exec(deleteQuery, v1.DeletedReasonStale, staleDuration.Seconds(), scraperID) + UPDATE config_items + SET + deleted_at = NOW(), + delete_reason = ? + WHERE + ((NOW() - last_scraped_time) > INTERVAL '1 SECOND' * ?) AND + deleted_at IS NULL AND + scraper_id = ? + RETURNING type` + + var deletedConfigs []models.ConfigItem + result := ctx.DB().Raw(deleteQuery, v1.DeletedReasonStale, staleDuration.Seconds(), scraperID).Scan(&deletedConfigs) if err := result.Error; err != nil { return 0, fmt.Errorf("failed to delete stale config items: %w", err) } - if result.RowsAffected > 0 { - ctx.Logger.V(3).Infof("Deleted %d stale config items", result.RowsAffected) + if len(deletedConfigs) > 0 { + ctx.Logger.V(3).Infof("deleted %d stale config items for scraper: %s", len(deletedConfigs), scraperID) + } + + for _, c := range deletedConfigs { + ctx.Counter("scraper_deleted", "scraper_id", scraperID.String(), "kind", c.Type, "reason", string(v1.DeletedReasonStale)).Add(1) } return result.RowsAffected, nil diff --git a/utils/kube/fetcher.go b/utils/kube/fetcher.go index 624d5038..49d53392 100644 --- a/utils/kube/fetcher.go +++ b/utils/kube/fetcher.go @@ -16,7 +16,7 @@ import ( v1 "github.com/flanksource/config-db/api/v1" ) -var fetchDelayBuckets = []float64{500, 1_000, 3_000, 5_000, 10_000, 20_000, 30_000, 60_000} +var fetchDelayBuckets = []float64{10, 50, 100, 500, 1_000, 5_000, 10_000, 30_000, 60_000} func FetchInvolvedObjects(ctx api.ScrapeContext, iObjs []v1.InvolvedObject) ([]*unstructured.Unstructured, error) { clientMap := map[schema.GroupVersionKind]dynamic.NamespaceableResourceInterface{}