feat: change retention #324

Merged
merged 4 commits on Dec 5, 2023
11 changes: 11 additions & 0 deletions api/v1/types.go
@@ -22,6 +22,16 @@ var AllScraperConfigs = map[string]any{
	"trivy": Trivy{},
}

type ChangeRetentionSpec struct {
	Name  string `json:"name,omitempty"`
	Age   string `json:"age,omitempty"`
	Count int    `json:"count,omitempty"`
}

type RetentionSpec struct {
	Changes []ChangeRetentionSpec `json:"changes,omitempty"`
}

// ScraperSpec defines the desired state of Config scraper
type ScraperSpec struct {
	LogLevel string `json:"logLevel,omitempty"`
@@ -35,6 +45,7 @@ type ScraperSpec struct {
	Azure     []Azure       `json:"azure,omitempty" yaml:"azure,omitempty"`
	SQL       []SQL         `json:"sql,omitempty" yaml:"sql,omitempty"`
	Trivy     []Trivy       `json:"trivy,omitempty" yaml:"trivy,omitempty"`
	Retention RetentionSpec `json:"retention,omitempty"`

	// Full flag when set will try to extract out changes from the scraped config.
	Full bool `json:"full,omitempty"`
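For illustration, the new fields could be populated like so in a scraper spec. This is a minimal, hypothetical sketch: the change-type name "diff" and the values are made up, not taken from this PR.

package main

import (
	"fmt"

	v1 "github.com/flanksource/config-db/api/v1"
)

func main() {
	// Hypothetical retention rule: keep at most 50 "diff" changes per scraper
	// and drop any "diff" change older than 7 days.
	spec := v1.ScraperSpec{
		Retention: v1.RetentionSpec{
			Changes: []v1.ChangeRetentionSpec{
				{Name: "diff", Age: "7d", Count: 50},
			},
		},
	}
	fmt.Printf("%+v\n", spec.Retention)
}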
36 changes: 36 additions & 0 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

14 changes: 14 additions & 0 deletions chart/crds/configs.flanksource.com_scrapeconfigs.yaml
@@ -1167,6 +1167,20 @@ spec:
                type: array
              logLevel:
                type: string
              retention:
                properties:
                  changes:
                    items:
                      properties:
                        age:
                          type: string
                        count:
                          type: integer
                        name:
                          type: string
                      type: object
                    type: array
                type: object
              schedule:
                type: string
              sql:
2 changes: 1 addition & 1 deletion config/schemas/scrape_config.schema.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions jobs/jobs.go
@@ -27,6 +27,7 @@ func ScheduleJobs() {
	scheduleFunc("@every 24h", DeleteOldConfigChanges)
	scheduleFunc("@every 24h", DeleteOldConfigAnalysis)
	scheduleFunc("@every 24h", CleanupConfigItems)
	scheduleFunc("@every 1h", ProcessChangeRetentionRules)

	if api.UpstreamConfig.Valid() {
		pullJob := &UpstreamPullJob{}
48 changes: 48 additions & 0 deletions jobs/retention.go
@@ -0,0 +1,48 @@
package jobs

import (
	gocontext "context"
	"encoding/json"

	"github.com/flanksource/commons/logger"
	"github.com/flanksource/config-db/api/v1"
	"github.com/flanksource/config-db/db"
	"github.com/flanksource/config-db/scrapers"
	"github.com/flanksource/duty/context"
	"github.com/flanksource/duty/models"
)

func ProcessChangeRetentionRules() {
	ctx := context.NewContext(gocontext.Background()).WithDB(db.DefaultDB(), db.Pool)
	jobHistory := models.NewJobHistory("ProcessChangeRetentionRules", "", "").Start()
	_ = db.PersistJobHistory(jobHistory)
	defer func() {
		_ = db.PersistJobHistory(jobHistory.End())
	}()

	var activeScrapers []models.ConfigScraper
	if err := ctx.DB().Where("deleted_at IS NULL").Find(&activeScrapers).Error; err != nil {
		logger.Errorf("Error querying config scrapers from db: %v", err)
		jobHistory.AddError(err.Error())
		return
	}

	for _, s := range activeScrapers {
		var spec v1.ScraperSpec
		if err := json.Unmarshal([]byte(s.Spec), &spec); err != nil {
			logger.Errorf("Error unmarshaling spec of config scraper[%s]: %v", s.ID, err)
			jobHistory.AddError(err.Error())
			continue
		}

		for _, changeSpec := range spec.Retention.Changes {
			err := scrapers.ProcessChangeRetention(ctx, s.ID, changeSpec)
			if err != nil {
				logger.Errorf("Error processing change retention for scraper[%s]: %v", s.ID, err)
				jobHistory.AddError(err.Error())
			} else {
				jobHistory.IncrSuccess()
			}
		}
	}
}
66 changes: 66 additions & 0 deletions scrapers/retention.go
@@ -0,0 +1,66 @@
package scrapers

import (
	"database/sql"
	"fmt"
	"strings"

	"github.com/flanksource/commons/duration"
	"github.com/flanksource/commons/logger"
	v1 "github.com/flanksource/config-db/api/v1"
	"github.com/flanksource/duty/context"
	"github.com/google/uuid"
)

func ProcessChangeRetention(ctx context.Context, scraperID uuid.UUID, spec v1.ChangeRetentionSpec) error {
	var whereClauses []string

	var ageMinutes int
	if spec.Age != "" {
		age, err := duration.ParseDuration(spec.Age)
		if err != nil {
			return fmt.Errorf("error parsing age %s as duration: %w", spec.Age, err)
		}
		ageMinutes = int(age.Minutes())

		whereClauses = append(whereClauses, `((now() - created_at) > interval '1 minute' * @ageMinutes)`)
	}

	if spec.Count > 0 {
		whereClauses = append(whereClauses, `seq > @count`)
	}

	if len(whereClauses) == 0 {
		return fmt.Errorf("either age or count must be specified")
	}

	query := fmt.Sprintf(`
	WITH latest_config_changes AS (
		SELECT id, change_type, created_at, ROW_NUMBER() OVER(ORDER BY created_at DESC) AS seq
		FROM config_changes
		WHERE
			change_type = @changeType AND
			config_id IN (SELECT id FROM config_items WHERE scraper_id = @scraperID)
	)
	DELETE FROM config_changes
	WHERE id IN (
		SELECT id FROM latest_config_changes
		WHERE %s
	)
	`, strings.Join(whereClauses, " OR "))

	result := ctx.DB().Exec(query,
		sql.Named("changeType", spec.Name),
		sql.Named("scraperID", scraperID),
		sql.Named("ageMinutes", ageMinutes),
		sql.Named("count", spec.Count),
	)
	if err := result.Error; err != nil {
		return fmt.Errorf("error retaining config changes: %w", err)
	}

	if result.RowsAffected > 0 {
		logger.Infof("Deleted %d config_changes as per ChangeRetentionSpec[%s]", result.RowsAffected, spec.Name)
	}
	return nil
}
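Note that the age and count clauses are joined with OR, so when both are set a change is deleted if it is older than the configured age or falls outside the most recent count. A minimal sketch of calling this helper directly (hypothetical; it assumes a duty context and the scraper's UUID are already in hand and reuses the imports above, and the change-type name and values are made up):

// Hypothetical usage in the scrapers package: delete "diff" changes for one
// scraper that are either older than 30 days or beyond the 100 most recent.
func pruneDiffChanges(ctx context.Context, scraperID uuid.UUID) error {
	spec := v1.ChangeRetentionSpec{Name: "diff", Age: "30d", Count: 100}
	return ProcessChangeRetention(ctx, scraperID, spec)
}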
8 changes: 4 additions & 4 deletions scrapers/run.go
@@ -20,10 +20,10 @@ func RunScraper(ctx api.ScrapeContext) (v1.ScrapeResults, error) {
 		return nil, fmt.Errorf("failed to update db: %w", dbErr)
 	}

-	// If error in any of the scrape results, don't delete old items
-	if len(results) > 0 && !v1.ScrapeResults(results).HasErr() {
-		persistedID := ctx.ScrapeConfig().GetPersistedID()
-		if persistedID != nil {
+	persistedID := ctx.ScrapeConfig().GetPersistedID()
+	if persistedID != nil {
+		// If error in any of the scrape results, don't delete old items
+		if len(results) > 0 && !v1.ScrapeResults(results).HasErr() {
 			if err := DeleteStaleConfigItems(ctx.DutyContext(), *persistedID); err != nil {
 				return nil, fmt.Errorf("error deleting stale config items: %w", err)
 			}
96 changes: 96 additions & 0 deletions scrapers/runscrapers_test.go
@@ -5,14 +5,18 @@ import (
	"encoding/json"
	"fmt"
	"os"
	"time"

	"github.com/flanksource/commons/logger"
	"github.com/flanksource/config-db/api"
	v1 "github.com/flanksource/config-db/api/v1"
	"github.com/flanksource/config-db/db"
	"github.com/flanksource/config-db/db/models"
	"github.com/flanksource/duty"
	"github.com/flanksource/duty/context"
	dutymodels "github.com/flanksource/duty/models"
	"github.com/flanksource/duty/types"
	"github.com/google/uuid"
	. "github.com/onsi/ginkgo/v2"
	. "github.com/onsi/gomega"

@@ -218,6 +222,98 @@ var _ = Describe("Scrapers test", Ordered, func() {

			Expect(configItem, storedConfigItem)
		})

		It("should retain config changes as per the spec", func() {
			dummyScraper := dutymodels.ConfigScraper{
				Name:   "Test",
				Spec:   `{"foo":"bar"}`,
				Source: dutymodels.SourceConfigFile,
			}
			err := db.DefaultDB().Create(&dummyScraper).Error
			Expect(err).To(BeNil())

			configItemID := uuid.New().String()
			dummyCI := models.ConfigItem{
				ID:          configItemID,
				ConfigClass: "Test",
				ScraperID:   &dummyScraper.ID,
			}
			err = db.DefaultDB().Create(&dummyCI).Error
			Expect(err).To(BeNil())

			twoDaysAgo := time.Now().Add(-2 * 24 * time.Hour)
			fiveDaysAgo := time.Now().Add(-5 * 24 * time.Hour)
			tenDaysAgo := time.Now().Add(-10 * 24 * time.Hour)
			configChanges := []models.ConfigChange{
				{ConfigID: configItemID, ChangeType: "TestDiff", ExternalChangeId: uuid.New().String()},
				{ConfigID: configItemID, ChangeType: "TestDiff", ExternalChangeId: uuid.New().String()},
				{ConfigID: configItemID, ChangeType: "TestDiff", ExternalChangeId: uuid.New().String()},
				{ConfigID: configItemID, ChangeType: "TestDiff", ExternalChangeId: uuid.New().String()},
				{ConfigID: configItemID, ChangeType: "TestDiff", ExternalChangeId: uuid.New().String()},
				{ConfigID: configItemID, ChangeType: "TestDiff", CreatedAt: &twoDaysAgo, ExternalChangeId: uuid.New().String()},
				{ConfigID: configItemID, ChangeType: "TestDiff", CreatedAt: &twoDaysAgo, ExternalChangeId: uuid.New().String()},
				{ConfigID: configItemID, ChangeType: "TestDiff", CreatedAt: &twoDaysAgo, ExternalChangeId: uuid.New().String()},
				{ConfigID: configItemID, ChangeType: "TestDiff", CreatedAt: &twoDaysAgo, ExternalChangeId: uuid.New().String()},
				{ConfigID: configItemID, ChangeType: "TestDiff", CreatedAt: &fiveDaysAgo, ExternalChangeId: uuid.New().String()},
				{ConfigID: configItemID, ChangeType: "TestDiff", CreatedAt: &fiveDaysAgo, ExternalChangeId: uuid.New().String()},
				{ConfigID: configItemID, ChangeType: "TestDiff", CreatedAt: &fiveDaysAgo, ExternalChangeId: uuid.New().String()},
				{ConfigID: configItemID, ChangeType: "TestDiff", CreatedAt: &fiveDaysAgo, ExternalChangeId: uuid.New().String()},
				{ConfigID: configItemID, ChangeType: "TestDiff", CreatedAt: &fiveDaysAgo, ExternalChangeId: uuid.New().String()},
				{ConfigID: configItemID, ChangeType: "TestDiff", CreatedAt: &fiveDaysAgo, ExternalChangeId: uuid.New().String()},
				{ConfigID: configItemID, ChangeType: "TestDiff", CreatedAt: &tenDaysAgo, ExternalChangeId: uuid.New().String()},
				{ConfigID: configItemID, ChangeType: "TestDiff", CreatedAt: &tenDaysAgo, ExternalChangeId: uuid.New().String()},
			}

			err = db.DefaultDB().Table("config_changes").Create(&configChanges).Error
			Expect(err).To(BeNil())

			var currentCount int
			err = db.DefaultDB().
				Raw(`SELECT COUNT(*) FROM config_changes WHERE change_type = ? AND config_id = ?`, "TestDiff", configItemID).
				Scan(&currentCount).
				Error
			Expect(err).To(BeNil())
			Expect(currentCount).To(Equal(len(configChanges)))

			ctx := context.NewContext(gocontext.Background()).WithDB(db.DefaultDB(), db.Pool)

			// Everything older than 8 days should be removed
			err = ProcessChangeRetention(ctx, dummyScraper.ID, v1.ChangeRetentionSpec{Name: "TestDiff", Age: "8d"})
			Expect(err).To(BeNil())
			var count1 int
			err = db.DefaultDB().
				Raw(`SELECT COUNT(*) FROM config_changes WHERE change_type = ? AND config_id = ?`, "TestDiff", configItemID).
				Scan(&count1).
				Error
			Expect(err).To(BeNil())
			Expect(count1).To(Equal(15))

			// Only keep the latest 12 config changes
			err = ProcessChangeRetention(ctx, dummyScraper.ID, v1.ChangeRetentionSpec{Name: "TestDiff", Count: 12})
			Expect(err).To(BeNil())
			var count2 int
			err = db.DefaultDB().
				Raw(`SELECT COUNT(*) FROM config_changes WHERE change_type = ? AND config_id = ?`, "TestDiff", configItemID).
				Scan(&count2).
				Error
			Expect(err).To(BeNil())
			Expect(count2).To(Equal(12))

			// Keep only config changes newer than 3 days, capped at the 10 most recent
			err = ProcessChangeRetention(ctx, dummyScraper.ID, v1.ChangeRetentionSpec{Name: "TestDiff", Age: "3d", Count: 10})
			Expect(err).To(BeNil())
			var count3 int
			err = db.DefaultDB().
				Raw(`SELECT COUNT(*) FROM config_changes WHERE change_type = ? AND config_id = ?`, "TestDiff", configItemID).
				Scan(&count3).
				Error
			Expect(err).To(BeNil())
			Expect(count3).To(Equal(9))

			// A ChangeRetentionSpec with neither age nor count should fail
			err = ProcessChangeRetention(ctx, dummyScraper.ID, v1.ChangeRetentionSpec{Name: "TestDiff"})
			Expect(err).ToNot(BeNil())
		})
	})
})
