From 78c7ec77e321de7a71f06affea3000e2d376a100 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Tue, 26 Sep 2023 18:30:23 +0545 Subject: [PATCH 01/12] feat: upstream sync [skip ci] --- api/global.go | 6 ++ cmd/root.go | 13 ++- cmd/server.go | 12 +-- jobs/jobs.go | 28 +++++- jobs/sync_upstream.go | 209 ++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 254 insertions(+), 14 deletions(-) create mode 100644 jobs/sync_upstream.go diff --git a/api/global.go b/api/global.go index 592bb6c3..40207a28 100644 --- a/api/global.go +++ b/api/global.go @@ -2,6 +2,8 @@ package api import ( v1 "github.com/flanksource/config-db/api/v1" + "github.com/flanksource/duty/upstream" + "github.com/google/uuid" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" ) @@ -11,6 +13,10 @@ var ( KubernetesRestConfig *rest.Config Namespace string DefaultContext ScrapeContext + + // the derived agent id from the agentName + AgentID uuid.UUID + UpstreamConfig upstream.UpstreamConfig ) type Scraper interface { diff --git a/cmd/root.go b/cmd/root.go index 1dbd7fe8..9f0d8c0b 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -10,16 +10,10 @@ import ( "github.com/flanksource/config-db/jobs" "github.com/flanksource/config-db/scrapers" "github.com/flanksource/config-db/utils/kube" - "github.com/google/uuid" "github.com/spf13/cobra" "github.com/spf13/pflag" ) -var ( - agentID = uuid.Nil // the derived agent id from the agentName - agentName string // name of the agent passed as a CLI arg -) - var dev bool var httpPort, metricsPort, devGuiPort int var disableKubernetes bool @@ -78,7 +72,12 @@ func ServerFlags(flags *pflag.FlagSet) { flags.StringVar(&scrapers.DefaultSchedule, "default-schedule", "@every 60m", "Default schedule for configs that don't specfiy one") flags.StringVar(&scrapers.StaleTimeout, "stale-timeout", "30m", "Delete config items not scraped within the timeout") flags.StringVar(&publicEndpoint, "public-endpoint", "http://localhost:8080", "Public endpoint that this instance is exposed under") - flags.StringVar(&agentName, "agent-name", "", "Name of the agent") + + // Flags for push/pull + flags.StringVar(&api.UpstreamConfig.Host, "upstream-host", "", "central mission control instance to sync scrape configs & their results") + flags.StringVar(&api.UpstreamConfig.Username, "upstream-user", "", "upstream username") + flags.StringVar(&api.UpstreamConfig.Password, "upstream-password", "", "upstream password") + flags.StringVar(&api.UpstreamConfig.AgentName, "agent-name", "", "name of this agent") } func init() { diff --git a/cmd/server.go b/cmd/server.go index 84b802ee..4d352fcf 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -55,14 +55,14 @@ func serve(configFiles []string) { e.GET("/query", query.Handler) e.POST("/run/:id", scrapers.RunNowHandler) - if agentName != "" { - agent, err := db.FindAgentByName(context.Background(), agentName) + if api.UpstreamConfig.AgentName != "" { + agent, err := db.FindAgentByName(context.Background(), api.UpstreamConfig.AgentName) if err != nil { - logger.Fatalf("error searching for agent (name=%s): %v", agentName, err) + logger.Fatalf("error searching for agent (name=%s): %v", api.UpstreamConfig.AgentName, err) } else if agent == nil { - logger.Fatalf("agent not found (name=%s)", agentName) + logger.Fatalf("agent not found (name=%s)", api.UpstreamConfig.AgentName) } else { - agentID = agent.ID + api.AgentID = agent.ID } } @@ -89,7 +89,7 @@ func startScraperCron(configFiles []string) { } } - scraperConfigsDB, err := db.GetScrapeConfigsOfAgent(agentID) + scraperConfigsDB, err := db.GetScrapeConfigsOfAgent(api.AgentID) if err != nil { logger.Fatalf("error getting configs from database: %v", err) } diff --git a/jobs/jobs.go b/jobs/jobs.go index 33a6cb42..c08a4359 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -3,17 +3,25 @@ package jobs import ( "reflect" "runtime" + "time" "github.com/flanksource/commons/logger" + "github.com/flanksource/config-db/api" "github.com/robfig/cron/v3" ) var FuncScheduler = cron.New() +const ( + PullConfigScrapersFromUpstreamSchedule = "@every 30s" + PushConfigResultsToUpstreamSchedule = "@every 10s" + ReconcileConfigsToUpstreamSchedule = "@every 3h" +) + func ScheduleJobs() { scheduleFunc := func(schedule string, fn func()) { if _, err := FuncScheduler.AddFunc(schedule, fn); err != nil { - logger.Errorf("Error scheduling %s job", runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()) + logger.Fatalf("Error scheduling %s job", runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()) } } @@ -21,5 +29,23 @@ func ScheduleJobs() { scheduleFunc("@every 24h", DeleteOldConfigAnalysis) scheduleFunc("@every 24h", CleanupConfigItems) + if api.UpstreamConfig.Valid() { + pullJob := &UpstreamPullJob{} + pullJob.Run() + + pushJob := &UpstreamPushJob{MaxAge: time.Minute * 5} + pushJob.Run() + + if _, err := FuncScheduler.AddJob(PullConfigScrapersFromUpstreamSchedule, pullJob); err != nil { + logger.Fatalf("Failed to schedule job [PullUpstreamScrapeConfigs]: %v", err) + } + + if _, err := FuncScheduler.AddJob(PushConfigResultsToUpstreamSchedule, pushJob); err != nil { + logger.Fatalf("Failed to schedule job [UpstreamPushJob]: %v", err) + } + + scheduleFunc(ReconcileConfigsToUpstreamSchedule, ReconcileConfigScraperResults) + } + FuncScheduler.Start() } diff --git a/jobs/sync_upstream.go b/jobs/sync_upstream.go new file mode 100644 index 00000000..ae6b0aac --- /dev/null +++ b/jobs/sync_upstream.go @@ -0,0 +1,209 @@ +package jobs + +import ( + "encoding/json" + "fmt" + "net/http" + "net/url" + "time" + + "github.com/flanksource/commons/logger" + "github.com/flanksource/config-db/api" + "github.com/flanksource/config-db/db" + "github.com/flanksource/duty/models" + "github.com/flanksource/duty/upstream" + "github.com/google/uuid" + "gorm.io/gorm/clause" +) + +var tablesToReconcile = []string{ + "config_items", + "config_changes", + "config_analysis", +} + +// ReconcileConfigScraperResults pushes missing scrape config results to the upstream server +func ReconcileConfigScraperResults() { + ctx := api.DefaultContext + + jobHistory := models.NewJobHistory("PushScraperConfigResultsToUpstream", "Config", "") + _ = db.PersistJobHistory(jobHistory.Start()) + defer func() { _ = db.PersistJobHistory(jobHistory.End()) }() + + reconciler := upstream.NewUpstreamReconciler(api.UpstreamConfig, 500) + for _, table := range tablesToReconcile { + if err := reconciler.Sync(ctx, table); err != nil { + jobHistory.AddError(err.Error()) + logger.Errorf("failed to sync table %s: %v", table, err) + } else { + jobHistory.IncrSuccess() + } + } +} + +// UpstreamPullJob pulls scrape configs from the upstream server +type UpstreamPullJob struct { + lastFetchedID uuid.UUID +} + +func (t *UpstreamPullJob) Run() { + ctx := api.DefaultContext + + jobHistory := models.NewJobHistory("PullUpstreamScrapeConfigs", "Config", "") + _ = db.PersistJobHistory(jobHistory.Start()) + defer func() { _ = db.PersistJobHistory(jobHistory.End()) }() + + if err := t.pull(ctx, api.UpstreamConfig); err != nil { + jobHistory.AddError(err.Error()) + logger.Errorf("error pulling scrape configs from upstream: %v", err) + } else { + jobHistory.IncrSuccess() + } +} + +func (t *UpstreamPullJob) pull(ctx api.ScrapeContext, config upstream.UpstreamConfig) error { + logger.Tracef("pulling scrape configs from upstream since: %v", t.lastFetchedID) + + endpoint, err := url.JoinPath(config.Host, "upstream", "scrapeconfig", "pull", config.AgentName) + if err != nil { + return fmt.Errorf("error creating url endpoint for host %s: %w", config.Host, err) + } + + req, err := http.NewRequest(http.MethodGet, endpoint, nil) + if err != nil { + return fmt.Errorf("error creating new http request: %w", err) + } + + req.SetBasicAuth(config.Username, config.Password) + + params := url.Values{} + params.Add("since", t.lastFetchedID.String()) + req.URL.RawQuery = params.Encode() + + httpClient := &http.Client{} + resp, err := httpClient.Do(req) + if err != nil { + return fmt.Errorf("error making request: %w", err) + } + defer resp.Body.Close() + + var scrapeConfigs []models.ConfigScraper + if err := json.NewDecoder(resp.Body).Decode(&scrapeConfigs); err != nil { + return fmt.Errorf("error decoding JSON response: %w", err) + } + + if len(scrapeConfigs) == 0 { + return nil + } + + t.lastFetchedID = scrapeConfigs[len(scrapeConfigs)-1].ID + + logger.Tracef("fetched %d scrape configs from upstream", len(scrapeConfigs)) + + return ctx.DB().Omit("agent_id").Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "id"}}, + UpdateAll: true, + }).Create(&scrapeConfigs).Error +} + +// UpstreamPushJob pushes scrape config results to the upstream server +type UpstreamPushJob struct { + lastConfigItemID uuid.UUID + lastAnalysisID uuid.UUID + lastChangeID uuid.UUID + + initiated bool + + // MaxAge defines how far back we look into the past on startup when + // lastRuntime is zero. + MaxAge time.Duration +} + +// init initializes the last pushed ids ... +func (t *UpstreamPushJob) init(ctx api.ScrapeContext) error { + if err := ctx.DB().Debug().Model(&models.ConfigItem{}).Select("id").Where("NOW() - updated_at <= ?", t.MaxAge).Scan(&t.lastConfigItemID).Error; err != nil { + return fmt.Errorf("error getting last config item id: %w", err) + } + + if err := ctx.DB().Debug().Model(&models.ConfigAnalysis{}).Select("id").Where("NOW() - first_observed <= ?", t.MaxAge).Scan(&t.lastAnalysisID).Error; err != nil { + return fmt.Errorf("error getting last analysis id: %w", err) + } + + if err := ctx.DB().Debug().Model(&models.ConfigChange{}).Select("id").Where("NOW() - created_at <= ?", t.MaxAge).Scan(&t.lastChangeID).Error; err != nil { + return fmt.Errorf("error getting last change id: %w", err) + } + + return nil +} + +func (t *UpstreamPushJob) Run() { + ctx := api.DefaultContext + + jobHistory := models.NewJobHistory("UpstreamPushJob", "Config", "") + _ = db.PersistJobHistory(jobHistory.Start()) + defer func() { _ = db.PersistJobHistory(jobHistory.End()) }() + + if !t.initiated { + logger.Debugf("initializing upstream push job") + if err := t.init(ctx); err != nil { + jobHistory.AddError(err.Error()) + logger.Errorf("error initializing upstream push job: %v", err) + return + } + + t.initiated = true + } + + if err := t.run(ctx); err != nil { + jobHistory.AddError(err.Error()) + logger.Errorf("error pushing to upstream: %v", err) + } else { + jobHistory.IncrSuccess() + } +} + +func (t *UpstreamPushJob) run(ctx api.ScrapeContext) error { + logger.Tracef("running configs upstream push job") + + pushData := &upstream.PushData{AgentName: api.UpstreamConfig.AgentName} + if err := ctx.DB().Where("id > ?", t.lastConfigItemID).Find(&pushData.ConfigItems).Error; err != nil { + return err + } + + if err := ctx.DB().Where("id > ?", t.lastAnalysisID).Find(&pushData.ConfigAnalysis).Error; err != nil { + return err + } + + if err := ctx.DB().Where("id > ?", t.lastChangeID).Find(&pushData.ConfigChanges).Error; err != nil { + return err + } + + logger.Tracef("pushing %d config scrape results to upstream", pushData.Count()) + if pushData.Count() == 0 { + return nil + } + + if err := upstream.Push(ctx, api.UpstreamConfig, pushData); err != nil { + return fmt.Errorf("error pushing to upstream: %w", err) + } + + if len(pushData.ConfigItems) > 0 { + t.lastConfigItemID = pushData.ConfigItems[len(pushData.ConfigItems)-1].ID + } + + if len(pushData.ConfigAnalysis) > 0 { + t.lastAnalysisID = pushData.ConfigAnalysis[len(pushData.ConfigAnalysis)-1].ID + } + + if len(pushData.ConfigChanges) > 0 { + id := pushData.ConfigChanges[len(pushData.ConfigChanges)-1].ID + parsed, err := uuid.Parse(id) + if err != nil { + return err + } + + t.lastChangeID = parsed + } + + return nil +} From f7034481999e307875bb3c1d1ae28d8ae5b9a48f Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Tue, 26 Sep 2023 21:15:08 +0545 Subject: [PATCH 02/12] feat: get the last pushed ids from the upstream server --- api/global.go | 3 --- cmd/server.go | 14 ++--------- jobs/jobs.go | 3 +-- jobs/sync_upstream.go | 58 ++++++++++++++++++++++++------------------- 4 files changed, 36 insertions(+), 42 deletions(-) diff --git a/api/global.go b/api/global.go index 40207a28..5493aad3 100644 --- a/api/global.go +++ b/api/global.go @@ -3,7 +3,6 @@ package api import ( v1 "github.com/flanksource/config-db/api/v1" "github.com/flanksource/duty/upstream" - "github.com/google/uuid" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" ) @@ -14,8 +13,6 @@ var ( Namespace string DefaultContext ScrapeContext - // the derived agent id from the agentName - AgentID uuid.UUID UpstreamConfig upstream.UpstreamConfig ) diff --git a/cmd/server.go b/cmd/server.go index 4d352fcf..1bf6af90 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -11,6 +11,7 @@ import ( "github.com/flanksource/config-db/db" "github.com/flanksource/config-db/jobs" "github.com/flanksource/config-db/query" + "github.com/google/uuid" "github.com/flanksource/config-db/scrapers" "github.com/labstack/echo/v4" @@ -55,17 +56,6 @@ func serve(configFiles []string) { e.GET("/query", query.Handler) e.POST("/run/:id", scrapers.RunNowHandler) - if api.UpstreamConfig.AgentName != "" { - agent, err := db.FindAgentByName(context.Background(), api.UpstreamConfig.AgentName) - if err != nil { - logger.Fatalf("error searching for agent (name=%s): %v", api.UpstreamConfig.AgentName, err) - } else if agent == nil { - logger.Fatalf("agent not found (name=%s)", api.UpstreamConfig.AgentName) - } else { - api.AgentID = agent.ID - } - } - go startScraperCron(configFiles) go jobs.ScheduleJobs() @@ -89,7 +79,7 @@ func startScraperCron(configFiles []string) { } } - scraperConfigsDB, err := db.GetScrapeConfigsOfAgent(api.AgentID) + scraperConfigsDB, err := db.GetScrapeConfigsOfAgent(uuid.Nil) if err != nil { logger.Fatalf("error getting configs from database: %v", err) } diff --git a/jobs/jobs.go b/jobs/jobs.go index c08a4359..d8ae2a7c 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -3,7 +3,6 @@ package jobs import ( "reflect" "runtime" - "time" "github.com/flanksource/commons/logger" "github.com/flanksource/config-db/api" @@ -33,7 +32,7 @@ func ScheduleJobs() { pullJob := &UpstreamPullJob{} pullJob.Run() - pushJob := &UpstreamPushJob{MaxAge: time.Minute * 5} + pushJob := &UpstreamPushJob{} pushJob.Run() if _, err := FuncScheduler.AddJob(PullConfigScrapersFromUpstreamSchedule, pullJob); err != nil { diff --git a/jobs/sync_upstream.go b/jobs/sync_upstream.go index ae6b0aac..dc6f00cf 100644 --- a/jobs/sync_upstream.go +++ b/jobs/sync_upstream.go @@ -5,7 +5,6 @@ import ( "fmt" "net/http" "net/url" - "time" "github.com/flanksource/commons/logger" "github.com/flanksource/config-db/api" @@ -106,31 +105,42 @@ func (t *UpstreamPullJob) pull(ctx api.ScrapeContext, config upstream.UpstreamCo }).Create(&scrapeConfigs).Error } +type LastPushedConfigResult struct { + ConfigID uuid.UUID + AnalysisID uuid.UUID + ChangeID uuid.UUID +} + // UpstreamPushJob pushes scrape config results to the upstream server type UpstreamPushJob struct { - lastConfigItemID uuid.UUID - lastAnalysisID uuid.UUID - lastChangeID uuid.UUID + status LastPushedConfigResult initiated bool - - // MaxAge defines how far back we look into the past on startup when - // lastRuntime is zero. - MaxAge time.Duration } // init initializes the last pushed ids ... -func (t *UpstreamPushJob) init(ctx api.ScrapeContext) error { - if err := ctx.DB().Debug().Model(&models.ConfigItem{}).Select("id").Where("NOW() - updated_at <= ?", t.MaxAge).Scan(&t.lastConfigItemID).Error; err != nil { - return fmt.Errorf("error getting last config item id: %w", err) +func (t *UpstreamPushJob) init(ctx api.ScrapeContext, config upstream.UpstreamConfig) error { + endpoint, err := url.JoinPath(config.Host, "upstream", "scrapeconfig", "status", config.AgentName) + if err != nil { + return fmt.Errorf("error creating url endpoint for host %s: %w", config.Host, err) } - if err := ctx.DB().Debug().Model(&models.ConfigAnalysis{}).Select("id").Where("NOW() - first_observed <= ?", t.MaxAge).Scan(&t.lastAnalysisID).Error; err != nil { - return fmt.Errorf("error getting last analysis id: %w", err) + req, err := http.NewRequest(http.MethodGet, endpoint, nil) + if err != nil { + return fmt.Errorf("error creating new http request: %w", err) } - if err := ctx.DB().Debug().Model(&models.ConfigChange{}).Select("id").Where("NOW() - created_at <= ?", t.MaxAge).Scan(&t.lastChangeID).Error; err != nil { - return fmt.Errorf("error getting last change id: %w", err) + req.SetBasicAuth(config.Username, config.Password) + + httpClient := &http.Client{} + resp, err := httpClient.Do(req) + if err != nil { + return fmt.Errorf("error making request: %w", err) + } + defer resp.Body.Close() + + if err := json.NewDecoder(resp.Body).Decode(&t.status); err != nil { + return fmt.Errorf("error decoding JSON response: %w", err) } return nil @@ -145,7 +155,7 @@ func (t *UpstreamPushJob) Run() { if !t.initiated { logger.Debugf("initializing upstream push job") - if err := t.init(ctx); err != nil { + if err := t.init(ctx, api.UpstreamConfig); err != nil { jobHistory.AddError(err.Error()) logger.Errorf("error initializing upstream push job: %v", err) return @@ -163,36 +173,34 @@ func (t *UpstreamPushJob) Run() { } func (t *UpstreamPushJob) run(ctx api.ScrapeContext) error { - logger.Tracef("running configs upstream push job") - pushData := &upstream.PushData{AgentName: api.UpstreamConfig.AgentName} - if err := ctx.DB().Where("id > ?", t.lastConfigItemID).Find(&pushData.ConfigItems).Error; err != nil { + if err := ctx.DB().Where("id > ?", t.status.ConfigID).Find(&pushData.ConfigItems).Error; err != nil { return err } - if err := ctx.DB().Where("id > ?", t.lastAnalysisID).Find(&pushData.ConfigAnalysis).Error; err != nil { + if err := ctx.DB().Where("id > ?", t.status.AnalysisID).Find(&pushData.ConfigAnalysis).Error; err != nil { return err } - if err := ctx.DB().Where("id > ?", t.lastChangeID).Find(&pushData.ConfigChanges).Error; err != nil { + if err := ctx.DB().Where("id > ?", t.status.ChangeID).Find(&pushData.ConfigChanges).Error; err != nil { return err } - logger.Tracef("pushing %d config scrape results to upstream", pushData.Count()) if pushData.Count() == 0 { return nil } + logger.Tracef("pushing %d config scrape results to upstream", pushData.Count()) if err := upstream.Push(ctx, api.UpstreamConfig, pushData); err != nil { return fmt.Errorf("error pushing to upstream: %w", err) } if len(pushData.ConfigItems) > 0 { - t.lastConfigItemID = pushData.ConfigItems[len(pushData.ConfigItems)-1].ID + t.status.ConfigID = pushData.ConfigItems[len(pushData.ConfigItems)-1].ID } if len(pushData.ConfigAnalysis) > 0 { - t.lastAnalysisID = pushData.ConfigAnalysis[len(pushData.ConfigAnalysis)-1].ID + t.status.AnalysisID = pushData.ConfigAnalysis[len(pushData.ConfigAnalysis)-1].ID } if len(pushData.ConfigChanges) > 0 { @@ -202,7 +210,7 @@ func (t *UpstreamPushJob) run(ctx api.ScrapeContext) error { return err } - t.lastChangeID = parsed + t.status.ChangeID = parsed } return nil From 2e8c24ea6b659d938a5a2fe329cb522f1a3521eb Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Tue, 26 Sep 2023 21:24:51 +0545 Subject: [PATCH 03/12] chore: upgrade to Go version 1.20 in github workflows & Docker image --- .github/workflows/lint.yml | 2 +- .github/workflows/release.yml | 2 +- .github/workflows/test.yml | 2 +- Dockerfile | 12 +++--------- 4 files changed, 6 insertions(+), 12 deletions(-) diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 79e57ad5..fde02644 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -11,7 +11,7 @@ jobs: - name: Install Go uses: actions/setup-go@bfdd3570ce990073878bf10f6b2d79082de49492 # v2.2.0 with: - go-version: 1.19.x + go-version: 1.20.x - name: golangci-lint uses: golangci/golangci-lint-action@08e2f20817b15149a52b5b3ebe7de50aff2ba8c5 # v3.4.0 with: diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index fed411eb..f30ca7d6 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -26,7 +26,7 @@ jobs: - name: Install Go uses: actions/setup-go@bfdd3570ce990073878bf10f6b2d79082de49492 # v2.2.0 with: - go-version: v1.19.x + go-version: v1.20.x - uses: actions/cache@8492260343ad570701412c2f464a5877dc76bace # v2 with: path: | diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 14793cbe..264cefd0 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -9,7 +9,7 @@ jobs: - name: Install Go uses: actions/setup-go@bfdd3570ce990073878bf10f6b2d79082de49492 # v2.2.0 with: - go-version: 1.19.x + go-version: 1.20.x - name: Checkout code uses: actions/checkout@ee0669bd1cc54295c223e0bb666b733df41de1c5 # v2.7.0 - uses: actions/cache@8492260343ad570701412c2f464a5877dc76bace # v2 diff --git a/Dockerfile b/Dockerfile index f66da1df..94472951 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.19@sha256:8cefba2710250b21a8b8e32281788c5b53dc561ba0c51ea7de92b9a350663b7d as builder +FROM golang:1.20@sha256:bc5f0b5e43282627279fe5262ae275fecb3d2eae3b33977a7fd200c7a760d6f1 as builder WORKDIR /app COPY ./ ./ @@ -10,17 +10,11 @@ WORKDIR /app RUN go version RUN make build -FROM ubuntu:bionic@sha256:14f1045816502e16fcbfc0b2a76747e9f5e40bc3899f8cfe20745abaafeaeab3 +FROM ubuntu:jammy@sha256:0bced47fffa3361afa981854fcabcd4577cd43cebbb808cea2b1f33a3dd7f508 WORKDIR /app -# install CA certificates -RUN apt-get update && \ - apt-get install -y ca-certificates && \ - rm -Rf /var/lib/apt/lists/* && \ - rm -Rf /usr/share/doc && rm -Rf /usr/share/man && \ - apt-get clean - COPY --from=builder /app/.bin/config-db /app +COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ RUN /app/config-db go-offline From 3472057503d4314125e56aaa194ff1d0fe11483f Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Thu, 28 Sep 2023 11:10:27 +0545 Subject: [PATCH 04/12] chore: remove push job (will need to add queue consumer) [skip ci] --- cmd/root.go | 1 + jobs/jobs.go | 7 -- jobs/sync_upstream.go | 155 ++++++------------------------------------ 3 files changed, 22 insertions(+), 141 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index 9f0d8c0b..657f0c0a 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -78,6 +78,7 @@ func ServerFlags(flags *pflag.FlagSet) { flags.StringVar(&api.UpstreamConfig.Username, "upstream-user", "", "upstream username") flags.StringVar(&api.UpstreamConfig.Password, "upstream-password", "", "upstream password") flags.StringVar(&api.UpstreamConfig.AgentName, "agent-name", "", "name of this agent") + flags.IntVar(&jobs.ReconcilePageSize, "upstream-page-size", 500, "upstream reconciliation page size") } func init() { diff --git a/jobs/jobs.go b/jobs/jobs.go index d8ae2a7c..bb1e48db 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -32,17 +32,10 @@ func ScheduleJobs() { pullJob := &UpstreamPullJob{} pullJob.Run() - pushJob := &UpstreamPushJob{} - pushJob.Run() - if _, err := FuncScheduler.AddJob(PullConfigScrapersFromUpstreamSchedule, pullJob); err != nil { logger.Fatalf("Failed to schedule job [PullUpstreamScrapeConfigs]: %v", err) } - if _, err := FuncScheduler.AddJob(PushConfigResultsToUpstreamSchedule, pushJob); err != nil { - logger.Fatalf("Failed to schedule job [UpstreamPushJob]: %v", err) - } - scheduleFunc(ReconcileConfigsToUpstreamSchedule, ReconcileConfigScraperResults) } diff --git a/jobs/sync_upstream.go b/jobs/sync_upstream.go index dc6f00cf..e3968b52 100644 --- a/jobs/sync_upstream.go +++ b/jobs/sync_upstream.go @@ -3,56 +3,49 @@ package jobs import ( "encoding/json" "fmt" + "io" "net/http" "net/url" + "time" "github.com/flanksource/commons/logger" "github.com/flanksource/config-db/api" "github.com/flanksource/config-db/db" "github.com/flanksource/duty/models" "github.com/flanksource/duty/upstream" - "github.com/google/uuid" "gorm.io/gorm/clause" ) -var tablesToReconcile = []string{ - "config_items", - "config_changes", - "config_analysis", -} +var ReconcilePageSize int // ReconcileConfigScraperResults pushes missing scrape config results to the upstream server func ReconcileConfigScraperResults() { ctx := api.DefaultContext - jobHistory := models.NewJobHistory("PushScraperConfigResultsToUpstream", "Config", "") + jobHistory := models.NewJobHistory("PushUpstream", "Config", "") _ = db.PersistJobHistory(jobHistory.Start()) defer func() { _ = db.PersistJobHistory(jobHistory.End()) }() - reconciler := upstream.NewUpstreamReconciler(api.UpstreamConfig, 500) - for _, table := range tablesToReconcile { - if err := reconciler.Sync(ctx, table); err != nil { - jobHistory.AddError(err.Error()) - logger.Errorf("failed to sync table %s: %v", table, err) - } else { - jobHistory.IncrSuccess() - } + reconciler := upstream.NewUpstreamReconciler(api.UpstreamConfig, ReconcilePageSize) + if err := reconciler.Sync(ctx, "config_items"); err != nil { + jobHistory.AddError(err.Error()) + logger.Errorf("failed to sync table config_items: %v", err) + } else { + jobHistory.IncrSuccess() } } // UpstreamPullJob pulls scrape configs from the upstream server type UpstreamPullJob struct { - lastFetchedID uuid.UUID + lastRuntime time.Time } func (t *UpstreamPullJob) Run() { - ctx := api.DefaultContext - - jobHistory := models.NewJobHistory("PullUpstreamScrapeConfigs", "Config", "") + jobHistory := models.NewJobHistory("PullUpstream", "Config", "") _ = db.PersistJobHistory(jobHistory.Start()) defer func() { _ = db.PersistJobHistory(jobHistory.End()) }() - if err := t.pull(ctx, api.UpstreamConfig); err != nil { + if err := t.pull(api.DefaultContext, api.UpstreamConfig); err != nil { jobHistory.AddError(err.Error()) logger.Errorf("error pulling scrape configs from upstream: %v", err) } else { @@ -61,7 +54,7 @@ func (t *UpstreamPullJob) Run() { } func (t *UpstreamPullJob) pull(ctx api.ScrapeContext, config upstream.UpstreamConfig) error { - logger.Tracef("pulling scrape configs from upstream since: %v", t.lastFetchedID) + logger.Tracef("pulling scrape configs from upstream since: %v", t.lastRuntime) endpoint, err := url.JoinPath(config.Host, "upstream", "scrapeconfig", "pull", config.AgentName) if err != nil { @@ -76,7 +69,7 @@ func (t *UpstreamPullJob) pull(ctx api.ScrapeContext, config upstream.UpstreamCo req.SetBasicAuth(config.Username, config.Password) params := url.Values{} - params.Add("since", t.lastFetchedID.String()) + params.Add("since", t.lastRuntime.Format(time.RFC3339)) req.URL.RawQuery = params.Encode() httpClient := &http.Client{} @@ -86,6 +79,11 @@ func (t *UpstreamPullJob) pull(ctx api.ScrapeContext, config upstream.UpstreamCo } defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("server returned unexpected status:%s (%s)", resp.Status, body) + } + var scrapeConfigs []models.ConfigScraper if err := json.NewDecoder(resp.Body).Decode(&scrapeConfigs); err != nil { return fmt.Errorf("error decoding JSON response: %w", err) @@ -95,7 +93,7 @@ func (t *UpstreamPullJob) pull(ctx api.ScrapeContext, config upstream.UpstreamCo return nil } - t.lastFetchedID = scrapeConfigs[len(scrapeConfigs)-1].ID + t.lastRuntime = scrapeConfigs[len(scrapeConfigs)-1].UpdatedAt logger.Tracef("fetched %d scrape configs from upstream", len(scrapeConfigs)) @@ -104,114 +102,3 @@ func (t *UpstreamPullJob) pull(ctx api.ScrapeContext, config upstream.UpstreamCo UpdateAll: true, }).Create(&scrapeConfigs).Error } - -type LastPushedConfigResult struct { - ConfigID uuid.UUID - AnalysisID uuid.UUID - ChangeID uuid.UUID -} - -// UpstreamPushJob pushes scrape config results to the upstream server -type UpstreamPushJob struct { - status LastPushedConfigResult - - initiated bool -} - -// init initializes the last pushed ids ... -func (t *UpstreamPushJob) init(ctx api.ScrapeContext, config upstream.UpstreamConfig) error { - endpoint, err := url.JoinPath(config.Host, "upstream", "scrapeconfig", "status", config.AgentName) - if err != nil { - return fmt.Errorf("error creating url endpoint for host %s: %w", config.Host, err) - } - - req, err := http.NewRequest(http.MethodGet, endpoint, nil) - if err != nil { - return fmt.Errorf("error creating new http request: %w", err) - } - - req.SetBasicAuth(config.Username, config.Password) - - httpClient := &http.Client{} - resp, err := httpClient.Do(req) - if err != nil { - return fmt.Errorf("error making request: %w", err) - } - defer resp.Body.Close() - - if err := json.NewDecoder(resp.Body).Decode(&t.status); err != nil { - return fmt.Errorf("error decoding JSON response: %w", err) - } - - return nil -} - -func (t *UpstreamPushJob) Run() { - ctx := api.DefaultContext - - jobHistory := models.NewJobHistory("UpstreamPushJob", "Config", "") - _ = db.PersistJobHistory(jobHistory.Start()) - defer func() { _ = db.PersistJobHistory(jobHistory.End()) }() - - if !t.initiated { - logger.Debugf("initializing upstream push job") - if err := t.init(ctx, api.UpstreamConfig); err != nil { - jobHistory.AddError(err.Error()) - logger.Errorf("error initializing upstream push job: %v", err) - return - } - - t.initiated = true - } - - if err := t.run(ctx); err != nil { - jobHistory.AddError(err.Error()) - logger.Errorf("error pushing to upstream: %v", err) - } else { - jobHistory.IncrSuccess() - } -} - -func (t *UpstreamPushJob) run(ctx api.ScrapeContext) error { - pushData := &upstream.PushData{AgentName: api.UpstreamConfig.AgentName} - if err := ctx.DB().Where("id > ?", t.status.ConfigID).Find(&pushData.ConfigItems).Error; err != nil { - return err - } - - if err := ctx.DB().Where("id > ?", t.status.AnalysisID).Find(&pushData.ConfigAnalysis).Error; err != nil { - return err - } - - if err := ctx.DB().Where("id > ?", t.status.ChangeID).Find(&pushData.ConfigChanges).Error; err != nil { - return err - } - - if pushData.Count() == 0 { - return nil - } - - logger.Tracef("pushing %d config scrape results to upstream", pushData.Count()) - if err := upstream.Push(ctx, api.UpstreamConfig, pushData); err != nil { - return fmt.Errorf("error pushing to upstream: %w", err) - } - - if len(pushData.ConfigItems) > 0 { - t.status.ConfigID = pushData.ConfigItems[len(pushData.ConfigItems)-1].ID - } - - if len(pushData.ConfigAnalysis) > 0 { - t.status.AnalysisID = pushData.ConfigAnalysis[len(pushData.ConfigAnalysis)-1].ID - } - - if len(pushData.ConfigChanges) > 0 { - id := pushData.ConfigChanges[len(pushData.ConfigChanges)-1].ID - parsed, err := uuid.Parse(id) - if err != nil { - return err - } - - t.status.ChangeID = parsed - } - - return nil -} From e00153f97412d73cdbea580c69bf25a03d4960b0 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Fri, 29 Sep 2023 10:35:08 +0545 Subject: [PATCH 05/12] fix: use time.RFC3339Nano instead of RFC3339 --- jobs/sync_upstream.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jobs/sync_upstream.go b/jobs/sync_upstream.go index e3968b52..d0f9ec88 100644 --- a/jobs/sync_upstream.go +++ b/jobs/sync_upstream.go @@ -69,7 +69,7 @@ func (t *UpstreamPullJob) pull(ctx api.ScrapeContext, config upstream.UpstreamCo req.SetBasicAuth(config.Username, config.Password) params := url.Values{} - params.Add("since", t.lastRuntime.Format(time.RFC3339)) + params.Add("since", t.lastRuntime.Format(time.RFC3339Nano)) req.URL.RawQuery = params.Encode() httpClient := &http.Client{} From 3a412c41bb2178dc86e3569d5cb25dd5c00ebbdd Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Fri, 29 Sep 2023 10:39:04 +0545 Subject: [PATCH 06/12] feat: support upstream flags as env vars --- cmd/root.go | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index 657f0c0a..41b7b8df 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -3,6 +3,7 @@ package cmd import ( "fmt" "os" + "strconv" "github.com/flanksource/commons/logger" "github.com/flanksource/config-db/api" @@ -74,11 +75,20 @@ func ServerFlags(flags *pflag.FlagSet) { flags.StringVar(&publicEndpoint, "public-endpoint", "http://localhost:8080", "Public endpoint that this instance is exposed under") // Flags for push/pull - flags.StringVar(&api.UpstreamConfig.Host, "upstream-host", "", "central mission control instance to sync scrape configs & their results") - flags.StringVar(&api.UpstreamConfig.Username, "upstream-user", "", "upstream username") - flags.StringVar(&api.UpstreamConfig.Password, "upstream-password", "", "upstream password") - flags.StringVar(&api.UpstreamConfig.AgentName, "agent-name", "", "name of this agent") - flags.IntVar(&jobs.ReconcilePageSize, "upstream-page-size", 500, "upstream reconciliation page size") + var upstreamPageSizeDefault = 500 + if val, exists := os.LookupEnv("UPSTREAM_PAGE_SIZE"); exists { + if parsed, err := strconv.Atoi(val); err != nil || parsed < 0 { + logger.Fatalf("invalid value=%s for UPSTREAM_PAGE_SIZE. Must be a postive number", val) + } else { + upstreamPageSizeDefault = parsed + } + } + + flags.StringVar(&api.UpstreamConfig.Host, "upstream-host", os.Getenv("UPSTREAM_HOST"), "central mission control instance to sync scrape configs & their results") + flags.StringVar(&api.UpstreamConfig.Username, "upstream-user", os.Getenv("UPSTREAM_USER"), "upstream username") + flags.StringVar(&api.UpstreamConfig.Password, "upstream-password", os.Getenv("UPSTREAM_PASSWORD"), "upstream password") + flags.StringVar(&api.UpstreamConfig.AgentName, "agent-name", os.Getenv("AGENT_NAME"), "name of this agent") + flags.IntVar(&jobs.ReconcilePageSize, "upstream-page-size", upstreamPageSizeDefault, "upstream reconciliation page size") } func init() { From 5e994f66940b346aa80c976a58d501f702814220 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Fri, 29 Sep 2023 18:22:42 +0545 Subject: [PATCH 07/12] feat: real-time syncing of config results to upstream [skip ci] --- go.mod | 16 +++++++++++----- go.sum | 19 +++++++++++++++++-- jobs/jobs.go | 7 ++++++- jobs/sync_upstream.go | 21 ++++++++++++++++++++- 4 files changed, 54 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index d8bce280..0f795eda 100644 --- a/go.mod +++ b/go.mod @@ -40,13 +40,14 @@ require ( github.com/evanphx/json-patch v5.6.0+incompatible github.com/fergusstrange/embedded-postgres v1.21.0 github.com/flanksource/commons v1.12.0 - github.com/flanksource/duty v1.0.183 + github.com/flanksource/duty v1.0.187 github.com/flanksource/is-healthy v0.0.0-20230713150444-ad2a5ef4bb37 github.com/flanksource/ketall v1.1.1 + github.com/flanksource/postq v1.0.0 github.com/go-logr/zapr v1.2.4 github.com/gobwas/glob v0.2.3 github.com/gomarkdown/markdown v0.0.0-20230322041520-c84983bdbf2a - github.com/google/cel-go v0.18.0 + github.com/google/cel-go v0.18.1 github.com/google/uuid v1.3.1 github.com/hashicorp/go-getter v1.7.2 github.com/henvic/httpretty v0.1.0 @@ -112,6 +113,7 @@ require ( github.com/prometheus/common v0.44.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/robertkrimen/otto v0.2.1 // indirect + github.com/sethvargo/go-retry v0.2.4 // indirect github.com/sirupsen/logrus v1.9.3 // indirect github.com/stoewer/go-strcase v1.3.0 // indirect github.com/tidwall/gjson v1.17.0 // indirect @@ -138,7 +140,7 @@ require ( cloud.google.com/go/storage v1.33.0 // indirect github.com/DATA-DOG/go-sqlmock v1.5.0 // indirect github.com/Masterminds/goutils v1.1.1 // indirect - github.com/aws/aws-sdk-go v1.45.15 // indirect + github.com/aws/aws-sdk-go v1.45.19 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.3 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33 // indirect @@ -237,7 +239,7 @@ require ( golang.org/x/text v0.13.0 // indirect golang.org/x/time v0.3.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect - google.golang.org/api v0.142.0 // indirect + google.golang.org/api v0.143.0 // indirect google.golang.org/appengine v1.6.8 // indirect google.golang.org/genproto v0.0.0-20230920204549-e6e6cdab5c13 // indirect google.golang.org/grpc v1.58.2 // indirect @@ -248,10 +250,14 @@ require ( k8s.io/apiextensions-apiserver v0.28.0 // indirect k8s.io/cli-runtime v0.28.0 // indirect k8s.io/klog/v2 v2.100.1 // indirect - k8s.io/kube-openapi v0.0.0-20230918164632-68afd615200d // indirect + k8s.io/kube-openapi v0.0.0-20230928205116-a78145627833 // indirect k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/kustomize/api v0.13.5-0.20230601165947-6ce0bf390ce3 // indirect sigs.k8s.io/kustomize/kyaml v0.14.3-0.20230601165947-6ce0bf390ce3 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.3.0 // indirect ) + +replace github.com/flanksource/duty => ../duty + +replace github.com/flanksource/postq => ../postq diff --git a/go.sum b/go.sum index d5ec8b72..b353746c 100644 --- a/go.sum +++ b/go.sum @@ -702,6 +702,8 @@ github.com/aws/aws-sdk-go v1.37.32/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2z github.com/aws/aws-sdk-go v1.44.122/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo= github.com/aws/aws-sdk-go v1.45.15 h1:gYBTVSYuhXdatrLbsPaRgVcc637zzdgThWmsDRwXLOo= github.com/aws/aws-sdk-go v1.45.15/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= +github.com/aws/aws-sdk-go v1.45.19 h1:+4yXWhldhCVXWFOQRF99ZTJ92t4DtoHROZIbN7Ujk/U= +github.com/aws/aws-sdk-go v1.45.19/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= github.com/aws/aws-sdk-go-v2 v1.17.7/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= github.com/aws/aws-sdk-go-v2 v1.18.0 h1:882kkTpSFhdgYRKVZ/VCgf7sd0ru57p2JCxz4/oN5RY= github.com/aws/aws-sdk-go-v2 v1.18.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= @@ -862,8 +864,6 @@ github.com/fergusstrange/embedded-postgres v1.21.0 h1:Sups0nR31+OB4iOZ0ZU4IwUDsB github.com/fergusstrange/embedded-postgres v1.21.0/go.mod h1:wL562t1V+iuFwq0UcgMi2e9rp8CROY9wxWZEfP8Y874= github.com/flanksource/commons v1.12.0 h1:8B7+AbRbWH3KVFwbmXYkG3gS42pF+uVaF4lAgDY+ZJA= github.com/flanksource/commons v1.12.0/go.mod h1:zYEhi6E2+diQ+loVcROUHo/Bgv+Tn61W2NYmrb5MgVI= -github.com/flanksource/duty v1.0.183 h1:EPJGvrVhc8mvXufwp3TCrNzpMFle/znkoNTiWgxkRrc= -github.com/flanksource/duty v1.0.183/go.mod h1:/TW8OfsHrDt2s7QIpDbTOlOhadJGQ652C5vbapc6a7E= github.com/flanksource/gomplate/v3 v3.20.4/go.mod h1:27BNWhzzSjDed1z8YShO6W+z6G9oZXuxfNFGd/iGSdc= github.com/flanksource/gomplate/v3 v3.20.12 h1:SLo8eLaYkUTizHIuntZ4LxxLzbRfV0NvC6DTpu9fj94= github.com/flanksource/gomplate/v3 v3.20.12/go.mod h1:1N1aptaAo0XUaGsyU5CWiwn9GMRpbIKX1AdsypfmZYo= @@ -1041,6 +1041,8 @@ github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl76 github.com/google/cel-go v0.16.0/go.mod h1:HXZKzB0LXqer5lHHgfWAnlYwJaQBDKMjxjulNQzhwhY= github.com/google/cel-go v0.18.0 h1:u74MPiEC8mejBrkXqrTWT102g5IFEUjxOngzQIijMzU= github.com/google/cel-go v0.18.0/go.mod h1:PVAybmSnWkNMUZR/tEWFUiJ1Np4Hz0MHsZJcgC4zln4= +github.com/google/cel-go v0.18.1 h1:V/lAXKq4C3BYLDy/ARzMtpkEEYfHQpZzVyzy69nEUjs= +github.com/google/cel-go v0.18.1/go.mod h1:PVAybmSnWkNMUZR/tEWFUiJ1Np4Hz0MHsZJcgC4zln4= github.com/google/flatbuffers v2.0.8+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/gnostic v0.5.7-v3refs/go.mod h1:73MKFl6jIHelAJNaBGFzt3SPtZULs9dYrGFt8OiIsHQ= github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= @@ -1171,6 +1173,7 @@ github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09 github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/hcl/v2 v2.18.0 h1:wYnG7Lt31t2zYkcquwgKo6MWXzRUDIeIVU5naZwHLl8= github.com/hashicorp/hcl/v2 v2.18.0/go.mod h1:ThLC89FV4p9MPW804KVbe/cEXoQ8NZEh+JtMeeGErHE= @@ -1449,6 +1452,8 @@ github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8= +github.com/sethvargo/go-retry v0.2.4 h1:T+jHEQy/zKJf5s95UkguisicE0zuF9y7+/vgz08Ocec= +github.com/sethvargo/go-retry v0.2.4/go.mod h1:1afjQuvh7s4gflMObvjLPaWgluLLyhA1wmVZ6KLpICw= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= @@ -2119,6 +2124,8 @@ google.golang.org/api v0.111.0/go.mod h1:qtFHvU9mhgTJegR31csQ+rwxyUTHOKFqCKWp1J0 google.golang.org/api v0.114.0/go.mod h1:ifYI2ZsFK6/uGddGfAD5BMxlnkBqCmqHSDUVi45N5Yg= google.golang.org/api v0.142.0 h1:mf+7EJ94fi5ZcnpPy+m0Yv2dkz8bKm+UL0snTCuwXlY= google.golang.org/api v0.142.0/go.mod h1:zJAN5o6HRqR7O+9qJUFOWrZkYE66RH+efPBdTLA4xBA= +google.golang.org/api v0.143.0 h1:o8cekTkqhywkbZT6p1UHJPZ9+9uuCAJs/KYomxZB8fA= +google.golang.org/api v0.143.0/go.mod h1:FoX9DO9hT7DLNn97OuoZAGSDuNAXdJRuGK98rSUgurk= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= @@ -2392,6 +2399,8 @@ k8s.io/api v0.24.2/go.mod h1:AHqbSkTm6YrQ0ObxjO3Pmp/ubFF/KuM7jU+3khoBsOg= k8s.io/api v0.26.4/go.mod h1:WwKEXU3R1rgCZ77AYa7DFksd9/BAIKyOmRlbVxgvjCk= k8s.io/api v0.28.0 h1:3j3VPWmN9tTDI68NETBWlDiA9qOiGJ7sdKeufehBYsM= k8s.io/api v0.28.0/go.mod h1:0l8NZJzB0i/etuWnIXcwfIv+xnDOhL3lLW919AWYDuY= +k8s.io/api v0.28.2 h1:9mpl5mOb6vXZvqbQmankOfPIGiudghwCoLl1EYfUZbw= +k8s.io/api v0.28.2/go.mod h1:RVnJBsjU8tcMq7C3iaRSGMeaKt2TWEUXcpIt/90fjEg= k8s.io/apiextensions-apiserver v0.28.0 h1:CszgmBL8CizEnj4sj7/PtLGey6Na3YgWyGCPONv7E9E= k8s.io/apiextensions-apiserver v0.28.0/go.mod h1:uRdYiwIuu0SyqJKriKmqEN2jThIJPhVmOWETm8ud1VE= k8s.io/apimachinery v0.21.2/go.mod h1:CdTY8fU/BlvAbJ2z/8kBwimGki5Zp8/fbVuLY8gJumM= @@ -2399,12 +2408,16 @@ k8s.io/apimachinery v0.24.2/go.mod h1:82Bi4sCzVBdpYjyI4jY6aHX+YCUchUIrZrXKedjd2U k8s.io/apimachinery v0.26.4/go.mod h1:ats7nN1LExKHvJ9TmwootT00Yz05MuYqPXEXaVeOy5I= k8s.io/apimachinery v0.28.0 h1:ScHS2AG16UlYWk63r46oU3D5y54T53cVI5mMJwwqFNA= k8s.io/apimachinery v0.28.0/go.mod h1:X0xh/chESs2hP9koe+SdIAcXWcQ+RM5hy0ZynB+yEvw= +k8s.io/apimachinery v0.28.2 h1:KCOJLrc6gu+wV1BYgwik4AF4vXOlVJPdiqn0yAWWwXQ= +k8s.io/apimachinery v0.28.2/go.mod h1:RdzF87y/ngqk9H4z3EL2Rppv5jj95vGS/HaFXrLDApU= k8s.io/cli-runtime v0.21.2/go.mod h1:8u/jFcM0QpoI28f6sfrAAIslLCXUYKD5SsPPMWiHYrI= k8s.io/cli-runtime v0.28.0 h1:Tcz1nnccXZDNIzoH6EwjCs+7ezkUGhorzCweEvlVOFg= k8s.io/cli-runtime v0.28.0/go.mod h1:U+ySmOKBm/JUCmebhmecXeTwNN1RzI7DW4+OM8Oryas= k8s.io/client-go v0.21.2/go.mod h1:HdJ9iknWpbl3vMGtib6T2PyI/VYxiZfq936WNVHBRrA= k8s.io/client-go v0.28.0 h1:ebcPRDZsCjpj62+cMk1eGNX1QkMdRmQ6lmz5BLoFWeM= k8s.io/client-go v0.28.0/go.mod h1:0Asy9Xt3U98RypWJmU1ZrRAGKhP6NqDPmptlAzK2kMc= +k8s.io/client-go v0.28.2 h1:DNoYI1vGq0slMBN/SWKMZMw0Rq+0EQW6/AK4v9+3VeY= +k8s.io/client-go v0.28.2/go.mod h1:sMkApowspLuc7omj1FOSUxSoqjr+d5Q0Yc0LOFnYFJY= k8s.io/component-base v0.28.0 h1:HQKy1enJrOeJlTlN4a6dU09wtmXaUvThC0irImfqyxI= k8s.io/component-base v0.28.0/go.mod h1:Yyf3+ZypLfMydVzuLBqJ5V7Kx6WwDr/5cN+dFjw1FNk= k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0= @@ -2422,6 +2435,8 @@ k8s.io/kube-openapi v0.0.0-20220328201542-3ee0da9b0b42/go.mod h1:Z/45zLw8lUo4wdi k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280/go.mod h1:+Axhij7bCpeqhklhUTe3xmOn6bWxolyZEeyaFpjGtl4= k8s.io/kube-openapi v0.0.0-20230918164632-68afd615200d h1:/CFeJBjBrZvHX09rObS2+2iEEDevMWYc1v3aIYAjIYI= k8s.io/kube-openapi v0.0.0-20230918164632-68afd615200d/go.mod h1:AsvuZPBlUDVuCdzJ87iajxtXuR9oktsTctW/R9wwouA= +k8s.io/kube-openapi v0.0.0-20230928205116-a78145627833 h1:iFFEmmB7szQhJP42AvRD2+gzdVP7EuIKY1rJgxf0JZY= +k8s.io/kube-openapi v0.0.0-20230928205116-a78145627833/go.mod h1:AsvuZPBlUDVuCdzJ87iajxtXuR9oktsTctW/R9wwouA= k8s.io/utils v0.0.0-20201110183641-67b214c5f920/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= k8s.io/utils v0.0.0-20210802155522-efc7438f0176/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= diff --git a/jobs/jobs.go b/jobs/jobs.go index bb1e48db..36ad071c 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -12,7 +12,7 @@ import ( var FuncScheduler = cron.New() const ( - PullConfigScrapersFromUpstreamSchedule = "@every 30s" + PullConfigScrapersFromUpstreamSchedule = "@every 5m" PushConfigResultsToUpstreamSchedule = "@every 10s" ReconcileConfigsToUpstreamSchedule = "@every 3h" ) @@ -36,6 +36,11 @@ func ScheduleJobs() { logger.Fatalf("Failed to schedule job [PullUpstreamScrapeConfigs]: %v", err) } + // Syncs scrape config results to upstream in real-time + if err := StartConsumser(api.DefaultContext); err != nil { + logger.Fatalf("Failed to start event consumer: %v", err) + } + scheduleFunc(ReconcileConfigsToUpstreamSchedule, ReconcileConfigScraperResults) } diff --git a/jobs/sync_upstream.go b/jobs/sync_upstream.go index d0f9ec88..5934a3f2 100644 --- a/jobs/sync_upstream.go +++ b/jobs/sync_upstream.go @@ -13,11 +13,17 @@ import ( "github.com/flanksource/config-db/db" "github.com/flanksource/duty/models" "github.com/flanksource/duty/upstream" + "github.com/flanksource/postq/pg" "gorm.io/gorm/clause" ) var ReconcilePageSize int +const ( + EventPushQueueCreate = "push_queue.create" + eventQueueUpdateChannel = "event_queue_updates" +) + // ReconcileConfigScraperResults pushes missing scrape config results to the upstream server func ReconcileConfigScraperResults() { ctx := api.DefaultContext @@ -27,7 +33,7 @@ func ReconcileConfigScraperResults() { defer func() { _ = db.PersistJobHistory(jobHistory.End()) }() reconciler := upstream.NewUpstreamReconciler(api.UpstreamConfig, ReconcilePageSize) - if err := reconciler.Sync(ctx, "config_items"); err != nil { + if err := reconciler.SyncAfter(ctx, "config_items", time.Hour*48); err != nil { jobHistory.AddError(err.Error()) logger.Errorf("failed to sync table config_items: %v", err) } else { @@ -102,3 +108,16 @@ func (t *UpstreamPullJob) pull(ctx api.ScrapeContext, config upstream.UpstreamCo UpdateAll: true, }).Create(&scrapeConfigs).Error } + +func StartConsumser(ctx api.ScrapeContext) error { + consumer, err := upstream.NewPushQueueConsumer(api.UpstreamConfig).EventConsumer() + if err != nil { + return err + } + + pgNotifyChannel := make(chan string) + go pg.Listen(ctx, eventQueueUpdateChannel, pgNotifyChannel) + + go consumer.Listen(ctx, pgNotifyChannel) + return nil +} From 9f2ff2d6cb7def9aba8dccdbd45fdb7d54ec660a Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Mon, 2 Oct 2023 11:05:15 +0545 Subject: [PATCH 08/12] chore: add upstream env vars to helm chart [skip ci] --- chart/Chart.yaml | 2 +- chart/templates/deployment.yaml | 7 +++++++ chart/values.yaml | 6 ++++++ 3 files changed, 14 insertions(+), 1 deletion(-) diff --git a/chart/Chart.yaml b/chart/Chart.yaml index c180c0bb..c9a29d46 100644 --- a/chart/Chart.yaml +++ b/chart/Chart.yaml @@ -4,6 +4,6 @@ description: A Helm chart for config-db type: application -version: 0.1.0 +version: 0.2.0 appVersion: "0.0.5" diff --git a/chart/templates/deployment.yaml b/chart/templates/deployment.yaml index 281d47b5..d41d8c2c 100644 --- a/chart/templates/deployment.yaml +++ b/chart/templates/deployment.yaml @@ -42,6 +42,9 @@ spec: - --disable-postgrest={{ .Values.disablePostgrest }} - --change-retention-days={{ .Values.configChangeRetentionDays }} - --analysis-retention-days={{ .Values.configAnalysisRetentionDays }} + envFrom: + - secretRef: + name: {{ .Values.upstream.secretKeyRef.name }} env: - name: DB_URL valueFrom: @@ -50,6 +53,10 @@ spec: key: {{ .Values.db.secretKeyRef.key }} - name: NAMESPACE value: {{ .Values.namespace | default .Release.Namespace }} + - name: AGENT_NAME + value: {{ .Values.upstream.agentName }} + - name: UPSTREAM_PAGE_SIZE + value: {{ .Values.upstream.pageSize }} resources: {{- toYaml .Values.resources | nindent 12 }} {{- with .Values.extra }} diff --git a/chart/values.yaml b/chart/values.yaml index 78b82b36..6548c913 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -61,6 +61,12 @@ serviceAccount: name: "" annotations: {} +upstream: + secretKeyRef: + name: upstream + agentName: + pageSize: 500 + extra: {} # nodeSelector: From ac957c35913504f4b68acc887f9b8a8035ee5ed2 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Mon, 2 Oct 2023 14:29:04 +0545 Subject: [PATCH 09/12] chore: bump duty & postq --- go.mod | 12 ++++-------- go.sum | 27 ++++++++------------------- hack/generate-schemas/go.mod | 6 +++--- hack/generate-schemas/go.sum | 12 ++++++------ 4 files changed, 21 insertions(+), 36 deletions(-) diff --git a/go.mod b/go.mod index 0f795eda..c7d1e517 100644 --- a/go.mod +++ b/go.mod @@ -40,10 +40,10 @@ require ( github.com/evanphx/json-patch v5.6.0+incompatible github.com/fergusstrange/embedded-postgres v1.21.0 github.com/flanksource/commons v1.12.0 - github.com/flanksource/duty v1.0.187 + github.com/flanksource/duty v1.0.188 github.com/flanksource/is-healthy v0.0.0-20230713150444-ad2a5ef4bb37 github.com/flanksource/ketall v1.1.1 - github.com/flanksource/postq v1.0.0 + github.com/flanksource/postq v0.1.0 github.com/go-logr/zapr v1.2.4 github.com/gobwas/glob v0.2.3 github.com/gomarkdown/markdown v0.0.0-20230322041520-c84983bdbf2a @@ -76,7 +76,7 @@ require ( ) require ( - ariga.io/atlas v0.14.1 // indirect + ariga.io/atlas v0.14.2 // indirect cloud.google.com/go/compute/metadata v0.2.3 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v1.2.0 // indirect github.com/AzureAD/microsoft-authentication-library-for-go v0.9.0 // indirect @@ -157,7 +157,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect - github.com/flanksource/gomplate/v3 v3.20.12 + github.com/flanksource/gomplate/v3 v3.20.13 github.com/ghodss/yaml v1.0.0 // indirect github.com/go-errors/errors v1.4.2 // indirect github.com/go-logr/logr v1.2.4 @@ -257,7 +257,3 @@ require ( sigs.k8s.io/kustomize/kyaml v0.14.3-0.20230601165947-6ce0bf390ce3 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.3.0 // indirect ) - -replace github.com/flanksource/duty => ../duty - -replace github.com/flanksource/postq => ../postq diff --git a/go.sum b/go.sum index b353746c..07a960a0 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -ariga.io/atlas v0.14.1 h1:mun+I5QiFaKVJBfHNnlTqa0PCj6qCZsp/M3dxFC9WPg= -ariga.io/atlas v0.14.1/go.mod h1:isZrlzJ5cpoCoKFoY9knZug7Lq4pP1cm8g3XciLZ0Pw= +ariga.io/atlas v0.14.2 h1:efxCuSGnDuhx7xm4JaqImR6xd+PqyizgGy5u/XUEI/g= +ariga.io/atlas v0.14.2/go.mod h1:isZrlzJ5cpoCoKFoY9knZug7Lq4pP1cm8g3XciLZ0Pw= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= @@ -700,8 +700,6 @@ github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf/go.mod h1:l github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= github.com/aws/aws-sdk-go v1.37.32/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro= github.com/aws/aws-sdk-go v1.44.122/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo= -github.com/aws/aws-sdk-go v1.45.15 h1:gYBTVSYuhXdatrLbsPaRgVcc637zzdgThWmsDRwXLOo= -github.com/aws/aws-sdk-go v1.45.15/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= github.com/aws/aws-sdk-go v1.45.19 h1:+4yXWhldhCVXWFOQRF99ZTJ92t4DtoHROZIbN7Ujk/U= github.com/aws/aws-sdk-go v1.45.19/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= github.com/aws/aws-sdk-go-v2 v1.17.7/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= @@ -864,9 +862,11 @@ github.com/fergusstrange/embedded-postgres v1.21.0 h1:Sups0nR31+OB4iOZ0ZU4IwUDsB github.com/fergusstrange/embedded-postgres v1.21.0/go.mod h1:wL562t1V+iuFwq0UcgMi2e9rp8CROY9wxWZEfP8Y874= github.com/flanksource/commons v1.12.0 h1:8B7+AbRbWH3KVFwbmXYkG3gS42pF+uVaF4lAgDY+ZJA= github.com/flanksource/commons v1.12.0/go.mod h1:zYEhi6E2+diQ+loVcROUHo/Bgv+Tn61W2NYmrb5MgVI= +github.com/flanksource/duty v1.0.188 h1:hr83m0pJO+bV/hRu+LnmkAc6Wj+NA040bgXaYLl95AI= +github.com/flanksource/duty v1.0.188/go.mod h1:6VTHWqN36OIXVm288kaT41TYUW0RdNg2/I5uxfuTJ70= github.com/flanksource/gomplate/v3 v3.20.4/go.mod h1:27BNWhzzSjDed1z8YShO6W+z6G9oZXuxfNFGd/iGSdc= -github.com/flanksource/gomplate/v3 v3.20.12 h1:SLo8eLaYkUTizHIuntZ4LxxLzbRfV0NvC6DTpu9fj94= -github.com/flanksource/gomplate/v3 v3.20.12/go.mod h1:1N1aptaAo0XUaGsyU5CWiwn9GMRpbIKX1AdsypfmZYo= +github.com/flanksource/gomplate/v3 v3.20.13 h1:ibWpQL7Hsf6f0bik9xwRS++nzey/DXVjtzln9MsD/qo= +github.com/flanksource/gomplate/v3 v3.20.13/go.mod h1:1N1aptaAo0XUaGsyU5CWiwn9GMRpbIKX1AdsypfmZYo= github.com/flanksource/is-healthy v0.0.0-20230705092916-3b4cf510c5fc/go.mod h1:4pQhmF+TnVqJroQKY8wSnSp+T18oLson6YQ2M0qPHfQ= github.com/flanksource/is-healthy v0.0.0-20230713150444-ad2a5ef4bb37 h1:MHXg2Vo/oHB0rGLgsI0tkU9MGV7aDwqvO1lrbX7/shY= github.com/flanksource/is-healthy v0.0.0-20230713150444-ad2a5ef4bb37/go.mod h1:BH5gh9JyEAuuWVP6Q5y9h43VozS0RfKyjNpM9L4v4hw= @@ -874,6 +874,8 @@ github.com/flanksource/ketall v1.1.1 h1:eIHI7FNAG0qC9W7adYzafPUqjQD3Ev/z23x4+tsX github.com/flanksource/ketall v1.1.1/go.mod h1:uuH0BLv3hDVNVu+SQzQ/1fXejfTyGb8wLk7saKsc7mU= github.com/flanksource/mapstructure v1.6.0 h1:+1kJ+QsO1SxjAgktfLlpZXetsVSJ0uCLhGKrA4BtwTE= github.com/flanksource/mapstructure v1.6.0/go.mod h1:dttg5+FFE2sp4D/CrcPCVqufNDrBggDaM+08nk5S8Ps= +github.com/flanksource/postq v0.1.0 h1:z67bskZFaiKs6Nlzaw2U7FL4S1mu8BoCfonedt/fW70= +github.com/flanksource/postq v0.1.0/go.mod h1:nE3mLh0vpF43TT+0HszC0QmORB37xSWPVHmhOY6PqBk= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= @@ -1039,8 +1041,6 @@ github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU= github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= github.com/google/cel-go v0.16.0/go.mod h1:HXZKzB0LXqer5lHHgfWAnlYwJaQBDKMjxjulNQzhwhY= -github.com/google/cel-go v0.18.0 h1:u74MPiEC8mejBrkXqrTWT102g5IFEUjxOngzQIijMzU= -github.com/google/cel-go v0.18.0/go.mod h1:PVAybmSnWkNMUZR/tEWFUiJ1Np4Hz0MHsZJcgC4zln4= github.com/google/cel-go v0.18.1 h1:V/lAXKq4C3BYLDy/ARzMtpkEEYfHQpZzVyzy69nEUjs= github.com/google/cel-go v0.18.1/go.mod h1:PVAybmSnWkNMUZR/tEWFUiJ1Np4Hz0MHsZJcgC4zln4= github.com/google/flatbuffers v2.0.8+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= @@ -1173,7 +1173,6 @@ github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09 github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= -github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/hcl/v2 v2.18.0 h1:wYnG7Lt31t2zYkcquwgKo6MWXzRUDIeIVU5naZwHLl8= github.com/hashicorp/hcl/v2 v2.18.0/go.mod h1:ThLC89FV4p9MPW804KVbe/cEXoQ8NZEh+JtMeeGErHE= @@ -2122,8 +2121,6 @@ google.golang.org/api v0.108.0/go.mod h1:2Ts0XTHNVWxypznxWOYUeI4g3WdP9Pk2Qk58+a/ google.golang.org/api v0.110.0/go.mod h1:7FC4Vvx1Mooxh8C5HWjzZHcavuS2f6pmJpZx60ca7iI= google.golang.org/api v0.111.0/go.mod h1:qtFHvU9mhgTJegR31csQ+rwxyUTHOKFqCKWp1J0fdw0= google.golang.org/api v0.114.0/go.mod h1:ifYI2ZsFK6/uGddGfAD5BMxlnkBqCmqHSDUVi45N5Yg= -google.golang.org/api v0.142.0 h1:mf+7EJ94fi5ZcnpPy+m0Yv2dkz8bKm+UL0snTCuwXlY= -google.golang.org/api v0.142.0/go.mod h1:zJAN5o6HRqR7O+9qJUFOWrZkYE66RH+efPBdTLA4xBA= google.golang.org/api v0.143.0 h1:o8cekTkqhywkbZT6p1UHJPZ9+9uuCAJs/KYomxZB8fA= google.golang.org/api v0.143.0/go.mod h1:FoX9DO9hT7DLNn97OuoZAGSDuNAXdJRuGK98rSUgurk= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= @@ -2399,8 +2396,6 @@ k8s.io/api v0.24.2/go.mod h1:AHqbSkTm6YrQ0ObxjO3Pmp/ubFF/KuM7jU+3khoBsOg= k8s.io/api v0.26.4/go.mod h1:WwKEXU3R1rgCZ77AYa7DFksd9/BAIKyOmRlbVxgvjCk= k8s.io/api v0.28.0 h1:3j3VPWmN9tTDI68NETBWlDiA9qOiGJ7sdKeufehBYsM= k8s.io/api v0.28.0/go.mod h1:0l8NZJzB0i/etuWnIXcwfIv+xnDOhL3lLW919AWYDuY= -k8s.io/api v0.28.2 h1:9mpl5mOb6vXZvqbQmankOfPIGiudghwCoLl1EYfUZbw= -k8s.io/api v0.28.2/go.mod h1:RVnJBsjU8tcMq7C3iaRSGMeaKt2TWEUXcpIt/90fjEg= k8s.io/apiextensions-apiserver v0.28.0 h1:CszgmBL8CizEnj4sj7/PtLGey6Na3YgWyGCPONv7E9E= k8s.io/apiextensions-apiserver v0.28.0/go.mod h1:uRdYiwIuu0SyqJKriKmqEN2jThIJPhVmOWETm8ud1VE= k8s.io/apimachinery v0.21.2/go.mod h1:CdTY8fU/BlvAbJ2z/8kBwimGki5Zp8/fbVuLY8gJumM= @@ -2408,16 +2403,12 @@ k8s.io/apimachinery v0.24.2/go.mod h1:82Bi4sCzVBdpYjyI4jY6aHX+YCUchUIrZrXKedjd2U k8s.io/apimachinery v0.26.4/go.mod h1:ats7nN1LExKHvJ9TmwootT00Yz05MuYqPXEXaVeOy5I= k8s.io/apimachinery v0.28.0 h1:ScHS2AG16UlYWk63r46oU3D5y54T53cVI5mMJwwqFNA= k8s.io/apimachinery v0.28.0/go.mod h1:X0xh/chESs2hP9koe+SdIAcXWcQ+RM5hy0ZynB+yEvw= -k8s.io/apimachinery v0.28.2 h1:KCOJLrc6gu+wV1BYgwik4AF4vXOlVJPdiqn0yAWWwXQ= -k8s.io/apimachinery v0.28.2/go.mod h1:RdzF87y/ngqk9H4z3EL2Rppv5jj95vGS/HaFXrLDApU= k8s.io/cli-runtime v0.21.2/go.mod h1:8u/jFcM0QpoI28f6sfrAAIslLCXUYKD5SsPPMWiHYrI= k8s.io/cli-runtime v0.28.0 h1:Tcz1nnccXZDNIzoH6EwjCs+7ezkUGhorzCweEvlVOFg= k8s.io/cli-runtime v0.28.0/go.mod h1:U+ySmOKBm/JUCmebhmecXeTwNN1RzI7DW4+OM8Oryas= k8s.io/client-go v0.21.2/go.mod h1:HdJ9iknWpbl3vMGtib6T2PyI/VYxiZfq936WNVHBRrA= k8s.io/client-go v0.28.0 h1:ebcPRDZsCjpj62+cMk1eGNX1QkMdRmQ6lmz5BLoFWeM= k8s.io/client-go v0.28.0/go.mod h1:0Asy9Xt3U98RypWJmU1ZrRAGKhP6NqDPmptlAzK2kMc= -k8s.io/client-go v0.28.2 h1:DNoYI1vGq0slMBN/SWKMZMw0Rq+0EQW6/AK4v9+3VeY= -k8s.io/client-go v0.28.2/go.mod h1:sMkApowspLuc7omj1FOSUxSoqjr+d5Q0Yc0LOFnYFJY= k8s.io/component-base v0.28.0 h1:HQKy1enJrOeJlTlN4a6dU09wtmXaUvThC0irImfqyxI= k8s.io/component-base v0.28.0/go.mod h1:Yyf3+ZypLfMydVzuLBqJ5V7Kx6WwDr/5cN+dFjw1FNk= k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0= @@ -2433,8 +2424,6 @@ k8s.io/klog/v2 v2.100.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7/go.mod h1:wXW5VT87nVfh/iLV8FpR2uDvrFyomxbtb1KivDbvPTE= k8s.io/kube-openapi v0.0.0-20220328201542-3ee0da9b0b42/go.mod h1:Z/45zLw8lUo4wdiUkI+v/ImEGAvu3WatcZl3lPMR4Rk= k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280/go.mod h1:+Axhij7bCpeqhklhUTe3xmOn6bWxolyZEeyaFpjGtl4= -k8s.io/kube-openapi v0.0.0-20230918164632-68afd615200d h1:/CFeJBjBrZvHX09rObS2+2iEEDevMWYc1v3aIYAjIYI= -k8s.io/kube-openapi v0.0.0-20230918164632-68afd615200d/go.mod h1:AsvuZPBlUDVuCdzJ87iajxtXuR9oktsTctW/R9wwouA= k8s.io/kube-openapi v0.0.0-20230928205116-a78145627833 h1:iFFEmmB7szQhJP42AvRD2+gzdVP7EuIKY1rJgxf0JZY= k8s.io/kube-openapi v0.0.0-20230928205116-a78145627833/go.mod h1:AsvuZPBlUDVuCdzJ87iajxtXuR9oktsTctW/R9wwouA= k8s.io/utils v0.0.0-20201110183641-67b214c5f920/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= diff --git a/hack/generate-schemas/go.mod b/hack/generate-schemas/go.mod index 1fca9da4..3fdd83f5 100644 --- a/hack/generate-schemas/go.mod +++ b/hack/generate-schemas/go.mod @@ -13,14 +13,14 @@ require ( github.com/Masterminds/goutils v1.1.1 // indirect github.com/Masterminds/semver/v3 v3.2.1 // indirect github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df // indirect - github.com/flanksource/duty v1.0.183 // indirect - github.com/flanksource/gomplate/v3 v3.20.12 // indirect + github.com/flanksource/duty v1.0.188 // indirect + github.com/flanksource/gomplate/v3 v3.20.13 // indirect github.com/flanksource/is-healthy v0.0.0-20230713150444-ad2a5ef4bb37 // indirect github.com/flanksource/mapstructure v1.6.0 // indirect github.com/go-logr/logr v1.2.4 // indirect github.com/gobwas/glob v0.2.3 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/google/cel-go v0.18.0 // indirect + github.com/google/cel-go v0.18.1 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/uuid v1.3.1 // indirect github.com/gosimple/slug v1.13.1 // indirect diff --git a/hack/generate-schemas/go.sum b/hack/generate-schemas/go.sum index e4975981..6d757b2a 100644 --- a/hack/generate-schemas/go.sum +++ b/hack/generate-schemas/go.sum @@ -13,10 +13,10 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/flanksource/commons v1.12.0 h1:8B7+AbRbWH3KVFwbmXYkG3gS42pF+uVaF4lAgDY+ZJA= github.com/flanksource/commons v1.12.0/go.mod h1:zYEhi6E2+diQ+loVcROUHo/Bgv+Tn61W2NYmrb5MgVI= -github.com/flanksource/duty v1.0.183 h1:EPJGvrVhc8mvXufwp3TCrNzpMFle/znkoNTiWgxkRrc= -github.com/flanksource/duty v1.0.183/go.mod h1:/TW8OfsHrDt2s7QIpDbTOlOhadJGQ652C5vbapc6a7E= -github.com/flanksource/gomplate/v3 v3.20.12 h1:SLo8eLaYkUTizHIuntZ4LxxLzbRfV0NvC6DTpu9fj94= -github.com/flanksource/gomplate/v3 v3.20.12/go.mod h1:1N1aptaAo0XUaGsyU5CWiwn9GMRpbIKX1AdsypfmZYo= +github.com/flanksource/duty v1.0.188 h1:hr83m0pJO+bV/hRu+LnmkAc6Wj+NA040bgXaYLl95AI= +github.com/flanksource/duty v1.0.188/go.mod h1:6VTHWqN36OIXVm288kaT41TYUW0RdNg2/I5uxfuTJ70= +github.com/flanksource/gomplate/v3 v3.20.13 h1:ibWpQL7Hsf6f0bik9xwRS++nzey/DXVjtzln9MsD/qo= +github.com/flanksource/gomplate/v3 v3.20.13/go.mod h1:1N1aptaAo0XUaGsyU5CWiwn9GMRpbIKX1AdsypfmZYo= github.com/flanksource/is-healthy v0.0.0-20230713150444-ad2a5ef4bb37 h1:MHXg2Vo/oHB0rGLgsI0tkU9MGV7aDwqvO1lrbX7/shY= github.com/flanksource/is-healthy v0.0.0-20230713150444-ad2a5ef4bb37/go.mod h1:BH5gh9JyEAuuWVP6Q5y9h43VozS0RfKyjNpM9L4v4hw= github.com/flanksource/mapstructure v1.6.0 h1:+1kJ+QsO1SxjAgktfLlpZXetsVSJ0uCLhGKrA4BtwTE= @@ -30,8 +30,8 @@ github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJA github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/google/cel-go v0.18.0 h1:u74MPiEC8mejBrkXqrTWT102g5IFEUjxOngzQIijMzU= -github.com/google/cel-go v0.18.0/go.mod h1:PVAybmSnWkNMUZR/tEWFUiJ1Np4Hz0MHsZJcgC4zln4= +github.com/google/cel-go v0.18.1 h1:V/lAXKq4C3BYLDy/ARzMtpkEEYfHQpZzVyzy69nEUjs= +github.com/google/cel-go v0.18.1/go.mod h1:PVAybmSnWkNMUZR/tEWFUiJ1Np4Hz0MHsZJcgC4zln4= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= From e3b045fba8dda191b15f90c5368b84f8de47be24 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Mon, 2 Oct 2023 14:57:07 +0545 Subject: [PATCH 10/12] fix: helm chart --- chart/templates/deployment.yaml | 10 +++++++--- chart/values.yaml | 5 +++-- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/chart/templates/deployment.yaml b/chart/templates/deployment.yaml index d41d8c2c..c98af5af 100644 --- a/chart/templates/deployment.yaml +++ b/chart/templates/deployment.yaml @@ -42,9 +42,11 @@ spec: - --disable-postgrest={{ .Values.disablePostgrest }} - --change-retention-days={{ .Values.configChangeRetentionDays }} - --analysis-retention-days={{ .Values.configAnalysisRetentionDays }} + {{- if .Values.upstream.enabled}} envFrom: - secretRef: - name: {{ .Values.upstream.secretKeyRef.name }} + name: {{ .Values.upstream.secretKeyRef.name }} + {{- end}} env: - name: DB_URL valueFrom: @@ -53,10 +55,12 @@ spec: key: {{ .Values.db.secretKeyRef.key }} - name: NAMESPACE value: {{ .Values.namespace | default .Release.Namespace }} + {{- if .Values.upstream.enabled}} - name: AGENT_NAME - value: {{ .Values.upstream.agentName }} + value: '{{ .Values.upstream.agentName }}' - name: UPSTREAM_PAGE_SIZE - value: {{ .Values.upstream.pageSize }} + value: '{{ .Values.upstream.pageSize }}' + {{- end}} resources: {{- toYaml .Values.resources | nindent 12 }} {{- with .Values.extra }} diff --git a/chart/values.yaml b/chart/values.yaml index 6548c913..d9853670 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -62,9 +62,10 @@ serviceAccount: annotations: {} upstream: + enabled: true secretKeyRef: - name: upstream - agentName: + name: upstream # Must contain: UPSTREAM_USER, UPSTREAM_PASS & UPSTREAM_HOST + agentName: default-agent pageSize: 500 extra: From 7c1e913c4fa788d4faf87c40435168bf171ba0a4 Mon Sep 17 00:00:00 2001 From: Moshe Immerman Date: Tue, 3 Oct 2023 10:17:05 +0300 Subject: [PATCH 11/12] chore: disable upstream push by default --- chart/values.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chart/values.yaml b/chart/values.yaml index d9853670..f1130012 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -62,7 +62,7 @@ serviceAccount: annotations: {} upstream: - enabled: true + enabled: false secretKeyRef: name: upstream # Must contain: UPSTREAM_USER, UPSTREAM_PASS & UPSTREAM_HOST agentName: default-agent From 1e81eab62f063cf368593a2205217ee03382283b Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Tue, 3 Oct 2023 14:33:22 +0545 Subject: [PATCH 12/12] fix: some helm issues --- chart/templates/postgres.yaml | 4 ++-- chart/values.yaml | 6 ++---- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/chart/templates/postgres.yaml b/chart/templates/postgres.yaml index 2567cd5a..93f25204 100644 --- a/chart/templates/postgres.yaml +++ b/chart/templates/postgres.yaml @@ -26,7 +26,7 @@ spec: envFrom: - secretRef: name: {{ .Values.db.secretKeyRef.name }} - volumeClaimTemplates: + volumeClaimTemplates: - metadata: name: postgresql spec: @@ -64,7 +64,7 @@ stringData: {{- $secretObj := ( lookup "v1" "Secret" .Release.Namespace "postgres-connection" ) }} {{- $secretData := ( get $secretObj "data" ) }} {{- $user := (( get $secretData "POSTGRES_USER" ) | b64dec ) | default "postgres" }} - {{- $password := (( get $secretData "POSTGRES_PASSWORD" ) | b64dec ) | default randAlphaNum 32 }} + {{- $password := (( get $secretData "POSTGRES_PASSWORD" ) | b64dec ) | default (randAlphaNum 32) }} {{- $host := print "postgres." .Release.Namespace ".svc.cluster.local:5432" }} {{- $url := print "postgresql://" $user ":" $password "@" $host }} {{- $configDbUrl := ( get $secretData .Values.db.secretKeyRef.key ) | default ( print $url "/config-db" ) }} diff --git a/chart/values.yaml b/chart/values.yaml index f1130012..4cb830e2 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -27,11 +27,9 @@ db: enabled: true secretKeyRef: create: true - # Setting the name of the secret will disable secret creation in this chart and look for an existing secret (whose name is specified in this field) to mount. - # When setting this up in a fresh environment as a standalone app, it's best to leave the value empty. + # (Required) The name of the secret to look for. name: - # This is the key that either the secret will create(if secretRefKey is empty) or this is the key it'll look for in the secret(if secretRefKey is mentioned). - # The name of the key is mandatory to set. + # (Required) This is the key that we look for in the secret. key: DB_URL storageClass: storage: