Skip to content

Commit

Permalink
fix: metrics updates
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed Dec 6, 2024
1 parent 218ff33 commit 588f5cc
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 31 deletions.
2 changes: 2 additions & 0 deletions api/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,4 +212,6 @@ var (
// DeletedReasonFromDeleteField is used when a deletion field (& reason)
// is picked up from the JSONPath expression provided in the scraper config.
DeletedReasonFromDeleteField ConfigDeleteReason = "FROM_DELETE_FIELD"

DeleteReasonEvent ConfigDeleteReason = "FROM_EVENT"
)
10 changes: 7 additions & 3 deletions db/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/lib/pq"
"github.com/ohler55/ojg/oj"
"github.com/samber/lo"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

Expand Down Expand Up @@ -224,10 +223,15 @@ func FindConfigChangesByItemID(ctx api.ScrapeContext, configItemID string) ([]du
return ci, nil
}

func SoftDeleteConfigItems(ctx context.Context, ids ...string) (int, error) {
func SoftDeleteConfigItems(ctx context.Context, reason v1.ConfigDeleteReason, ids ...string) (int, error) {
tx := ctx.DB().
Model(&models.ConfigItem{}).
Where("id IN ?", ids).
Update("deleted_at", gorm.Expr("NOW()"))
UpdateColumns(
map[string]any{
"deleted_at": duty.Now(),
"delete_reason": reason,
},
)
return int(tx.RowsAffected), tx.Error
}
10 changes: 10 additions & 0 deletions db/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ func updateCI(ctx api.ScrapeContext, summary *v1.ScrapeSummary, result v1.Scrape
updates := make(map[string]interface{})
changes := make([]*models.ConfigChange, 0)

isDeleted := existing.DeletedAt == nil && ci.DeletedAt != nil

if lo.FromPtr(ci.DeletedAt) != lo.FromPtr(existing.DeletedAt) {
updates["deleted_at"] = ci.DeletedAt
updates["delete_reason"] = ci.DeleteReason
Expand Down Expand Up @@ -233,6 +235,14 @@ func updateCI(ctx api.ScrapeContext, summary *v1.ScrapeSummary, result v1.Scrape
return false, nil, errors.Wrapf(dutydb.ErrorDetails(err), "unable to update config item: %s", ci)
}

if isDeleted {
ctx.Counter("scraper_deleted",
"scraper_id", ctx.ScraperID(),
"kind", ci.Type,
"reason", string(ci.DeleteReason),
).Add(1)
}

return true, changes, nil
}

Expand Down
31 changes: 22 additions & 9 deletions scrapers/incremental.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/flanksource/config-db/scrapers/kubernetes"
"github.com/flanksource/config-db/utils/kube"
"github.com/flanksource/duty/job"
"github.com/samber/lo"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

Expand Down Expand Up @@ -50,7 +51,7 @@ func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, q

var (
objs []*unstructured.Unstructured
deletedObjects []string
deletedObjects []*unstructured.Unstructured
queuedTime = map[string]time.Time{}

seenObjects = map[string]struct{}{}
Expand All @@ -77,7 +78,7 @@ func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, q
queuedTime[string(obj.GetUID())] = queueItem.Timestamp

if queueItem.Operation == kubernetes.QueueItemOperationDelete {
deletedObjects = append(deletedObjects, string(obj.GetUID()))
deletedObjects = append(deletedObjects, obj)
continue
}

Expand Down Expand Up @@ -152,7 +153,7 @@ func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, q
}
}

func consumeResources(ctx job.JobRuntime, scrapeConfig v1.ScrapeConfig, config v1.Kubernetes, objs []*unstructured.Unstructured, deletedResourcesIDs []string) error {
func consumeResources(ctx job.JobRuntime, scrapeConfig v1.ScrapeConfig, config v1.Kubernetes, objs, deletedResources []*unstructured.Unstructured) error {
cc := api.NewScrapeContext(ctx.Context).WithScrapeConfig(&scrapeConfig).WithJobHistory(ctx.History).AsIncrementalScrape()
cc.Context = cc.Context.WithoutName().WithName(fmt.Sprintf("watch[%s/%s]", cc.GetNamespace(), cc.GetName()))
results, err := processObjects(cc, config, objs)
Expand All @@ -174,17 +175,29 @@ func consumeResources(ctx job.JobRuntime, scrapeConfig v1.ScrapeConfig, config v
}
}

if len(deletedResourcesIDs) > 0 {
total, err := db.SoftDeleteConfigItems(ctx.Context, deletedResourcesIDs...)
if len(deletedResources) > 0 {
deletedResourceIDs := lo.Map(deletedResources, func(item *unstructured.Unstructured, _ int) string {
return string(item.GetUID())
})

total, err := db.SoftDeleteConfigItems(ctx.Context, v1.DeleteReasonEvent, deletedResourceIDs...)
if err != nil {
return fmt.Errorf("failed to delete %d resources: %w", len(deletedResourcesIDs), err)
} else if total != len(deletedResourcesIDs) {
ctx.GetSpan().SetAttributes(attribute.StringSlice("deletedResourcesIDs", deletedResourcesIDs))
return fmt.Errorf("failed to delete %d resources: %w", len(deletedResources), err)
} else if total != len(deletedResources) {
ctx.GetSpan().SetAttributes(attribute.StringSlice("deletedResourcesIDs", deletedResourceIDs))
if cc.PropertyOn(false, "log.missing") {
ctx.Logger.Warnf("attempted to delete %d resources but only deleted %d", len(deletedResourcesIDs), total)
ctx.Logger.Warnf("attempted to delete %d resources but only deleted %d", len(deletedResources), total)
}
}

for _, c := range deletedResources {
ctx.Counter("scraper_deleted",
"scraper_id", cc.ScraperID(),
"kind", kubernetes.GetConfigType(c),
"reason", string(v1.DeleteReasonEvent),
).Add(1)
}

ctx.History.SuccessCount += total
}

Expand Down
13 changes: 7 additions & 6 deletions scrapers/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,17 @@ const ConfigTypePrefix = "Kubernetes::"

var resourceIDMapPerCluster PerClusterResourceIDMap

func getConfigTypePrefix(apiVersion string) string {
func GetConfigType(obj *unstructured.Unstructured) string {
apiVersion := obj.GetAPIVersion()
if strings.Contains(apiVersion, ".upbound.io") || strings.Contains(apiVersion, ".crossplane.io") {
return "Crossplane::"
return "Crossplane::" + obj.GetKind()
}

if strings.HasSuffix(apiVersion, ".flanksource.com/v1") {
return api.MissionControlConfigTypePrefix
return api.MissionControlConfigTypePrefix + obj.GetKind()
}

return ConfigTypePrefix
return ConfigTypePrefix + obj.GetKind()
}

type KubernetesScraper struct{}
Expand Down Expand Up @@ -350,7 +351,7 @@ func ExtractResults(ctx *KubernetesContext, objs []*unstructured.Unstructured) v
ConfigID: id.String(),
}.WithRelated(
ctx.GetID(obj.GetNamespace(), obj.GetKind(), obj.GetName()),
v1.ExternalID{ExternalID: string(obj.GetUID()), ConfigType: getConfigTypePrefix(obj.GetAPIVersion()) + obj.GetKind()},
v1.ExternalID{ExternalID: string(obj.GetUID()), ConfigType: GetConfigType(obj)},
)

relationships = append(relationships, rel)
Expand Down Expand Up @@ -435,7 +436,7 @@ func ExtractResults(ctx *KubernetesContext, objs []*unstructured.Unstructured) v
BaseScraper: ctx.config.BaseScraper,
Name: obj.GetName(),
ConfigClass: obj.GetKind(),
Type: getConfigTypePrefix(obj.GetAPIVersion()) + obj.GetKind(),
Type: GetConfigType(obj),
Status: string(resourceHealth.Status),
Health: models.Health(resourceHealth.Health),
Ready: resourceHealth.Ready,
Expand Down
31 changes: 19 additions & 12 deletions scrapers/stale.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/flanksource/commons/duration"
v1 "github.com/flanksource/config-db/api/v1"
"github.com/flanksource/config-db/db/models"
"github.com/flanksource/duty/context"
"github.com/google/uuid"
)
Expand Down Expand Up @@ -40,22 +41,28 @@ func DeleteStaleConfigItems(ctx context.Context, staleTimeout string, scraperID
}

deleteQuery := `
UPDATE config_items
SET
deleted_at = NOW(),
delete_reason = ?
WHERE
((NOW() - last_scraped_time) > INTERVAL '1 SECOND' * ?) AND
deleted_at IS NULL AND
scraper_id = ?`

result := ctx.DB().Exec(deleteQuery, v1.DeletedReasonStale, staleDuration.Seconds(), scraperID)
UPDATE config_items
SET
deleted_at = NOW(),
delete_reason = ?
WHERE
((NOW() - last_scraped_time) > INTERVAL '1 SECOND' * ?) AND
deleted_at IS NULL AND
scraper_id = ?
RETURNING type`

var deletedConfigs []models.ConfigItem
result := ctx.DB().Raw(deleteQuery, v1.DeletedReasonStale, staleDuration.Seconds(), scraperID).Scan(&deletedConfigs)
if err := result.Error; err != nil {
return 0, fmt.Errorf("failed to delete stale config items: %w", err)
}

if result.RowsAffected > 0 {
ctx.Logger.V(3).Infof("Deleted %d stale config items", result.RowsAffected)
if len(deletedConfigs) > 0 {
ctx.Logger.V(3).Infof("deleted %d stale config items for scraper: %s", len(deletedConfigs), scraperID)
}

for _, c := range deletedConfigs {
ctx.Counter("scraper_deleted", "scraper_id", scraperID.String(), "kind", c.Type, "reason", string(v1.DeletedReasonStale)).Add(1)
}

return result.RowsAffected, nil
Expand Down
2 changes: 1 addition & 1 deletion utils/kube/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
v1 "github.com/flanksource/config-db/api/v1"
)

var fetchDelayBuckets = []float64{500, 1_000, 3_000, 5_000, 10_000, 20_000, 30_000, 60_000}
var fetchDelayBuckets = []float64{10, 50, 100, 500, 1_000, 5_000, 10_000, 30_000, 60_000}

func FetchInvolvedObjects(ctx api.ScrapeContext, iObjs []v1.InvolvedObject) ([]*unstructured.Unstructured, error) {
clientMap := map[schema.GroupVersionKind]dynamic.NamespaceableResourceInterface{}
Expand Down

0 comments on commit 588f5cc

Please sign in to comment.