diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 79e57ad5..fde02644 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -11,7 +11,7 @@ jobs: - name: Install Go uses: actions/setup-go@bfdd3570ce990073878bf10f6b2d79082de49492 # v2.2.0 with: - go-version: 1.19.x + go-version: 1.20.x - name: golangci-lint uses: golangci/golangci-lint-action@08e2f20817b15149a52b5b3ebe7de50aff2ba8c5 # v3.4.0 with: diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index fed411eb..f30ca7d6 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -26,7 +26,7 @@ jobs: - name: Install Go uses: actions/setup-go@bfdd3570ce990073878bf10f6b2d79082de49492 # v2.2.0 with: - go-version: v1.19.x + go-version: v1.20.x - uses: actions/cache@8492260343ad570701412c2f464a5877dc76bace # v2 with: path: | diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 14793cbe..264cefd0 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -9,7 +9,7 @@ jobs: - name: Install Go uses: actions/setup-go@bfdd3570ce990073878bf10f6b2d79082de49492 # v2.2.0 with: - go-version: 1.19.x + go-version: 1.20.x - name: Checkout code uses: actions/checkout@ee0669bd1cc54295c223e0bb666b733df41de1c5 # v2.7.0 - uses: actions/cache@8492260343ad570701412c2f464a5877dc76bace # v2 diff --git a/Dockerfile b/Dockerfile index 9c9e58d7..65444a5a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.19@sha256:8cefba2710250b21a8b8e32281788c5b53dc561ba0c51ea7de92b9a350663b7d as builder +FROM golang:1.20@sha256:bc5f0b5e43282627279fe5262ae275fecb3d2eae3b33977a7fd200c7a760d6f1 as builder WORKDIR /app COPY ./ ./ @@ -10,7 +10,7 @@ WORKDIR /app RUN go version RUN make build -FROM ubuntu:bionic@sha256:14f1045816502e16fcbfc0b2a76747e9f5e40bc3899f8cfe20745abaafeaeab3 +FROM ubuntu:jammy@sha256:0bced47fffa3361afa981854fcabcd4577cd43cebbb808cea2b1f33a3dd7f508 WORKDIR /app COPY --from=builder /app/.bin/config-db /app diff --git a/api/global.go b/api/global.go index 592bb6c3..5493aad3 100644 --- a/api/global.go +++ b/api/global.go @@ -2,6 +2,7 @@ package api import ( v1 "github.com/flanksource/config-db/api/v1" + "github.com/flanksource/duty/upstream" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" ) @@ -11,6 +12,8 @@ var ( KubernetesRestConfig *rest.Config Namespace string DefaultContext ScrapeContext + + UpstreamConfig upstream.UpstreamConfig ) type Scraper interface { diff --git a/chart/Chart.yaml b/chart/Chart.yaml index c180c0bb..c9a29d46 100644 --- a/chart/Chart.yaml +++ b/chart/Chart.yaml @@ -4,6 +4,6 @@ description: A Helm chart for config-db type: application -version: 0.1.0 +version: 0.2.0 appVersion: "0.0.5" diff --git a/chart/templates/deployment.yaml b/chart/templates/deployment.yaml index 281d47b5..c98af5af 100644 --- a/chart/templates/deployment.yaml +++ b/chart/templates/deployment.yaml @@ -42,6 +42,11 @@ spec: - --disable-postgrest={{ .Values.disablePostgrest }} - --change-retention-days={{ .Values.configChangeRetentionDays }} - --analysis-retention-days={{ .Values.configAnalysisRetentionDays }} + {{- if .Values.upstream.enabled}} + envFrom: + - secretRef: + name: {{ .Values.upstream.secretKeyRef.name }} + {{- end}} env: - name: DB_URL valueFrom: @@ -50,6 +55,12 @@ spec: key: {{ .Values.db.secretKeyRef.key }} - name: NAMESPACE value: {{ .Values.namespace | default .Release.Namespace }} + {{- if .Values.upstream.enabled}} + - name: AGENT_NAME + value: '{{ .Values.upstream.agentName }}' + - name: UPSTREAM_PAGE_SIZE + value: '{{ .Values.upstream.pageSize }}' + {{- end}} resources: {{- toYaml .Values.resources | nindent 12 }} {{- with .Values.extra }} diff --git a/chart/templates/postgres.yaml b/chart/templates/postgres.yaml index 2567cd5a..93f25204 100644 --- a/chart/templates/postgres.yaml +++ b/chart/templates/postgres.yaml @@ -26,7 +26,7 @@ spec: envFrom: - secretRef: name: {{ .Values.db.secretKeyRef.name }} - volumeClaimTemplates: + volumeClaimTemplates: - metadata: name: postgresql spec: @@ -64,7 +64,7 @@ stringData: {{- $secretObj := ( lookup "v1" "Secret" .Release.Namespace "postgres-connection" ) }} {{- $secretData := ( get $secretObj "data" ) }} {{- $user := (( get $secretData "POSTGRES_USER" ) | b64dec ) | default "postgres" }} - {{- $password := (( get $secretData "POSTGRES_PASSWORD" ) | b64dec ) | default randAlphaNum 32 }} + {{- $password := (( get $secretData "POSTGRES_PASSWORD" ) | b64dec ) | default (randAlphaNum 32) }} {{- $host := print "postgres." .Release.Namespace ".svc.cluster.local:5432" }} {{- $url := print "postgresql://" $user ":" $password "@" $host }} {{- $configDbUrl := ( get $secretData .Values.db.secretKeyRef.key ) | default ( print $url "/config-db" ) }} diff --git a/chart/values.yaml b/chart/values.yaml index 78b82b36..4cb830e2 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -27,11 +27,9 @@ db: enabled: true secretKeyRef: create: true - # Setting the name of the secret will disable secret creation in this chart and look for an existing secret (whose name is specified in this field) to mount. - # When setting this up in a fresh environment as a standalone app, it's best to leave the value empty. + # (Required) The name of the secret to look for. name: - # This is the key that either the secret will create(if secretRefKey is empty) or this is the key it'll look for in the secret(if secretRefKey is mentioned). - # The name of the key is mandatory to set. + # (Required) This is the key that we look for in the secret. key: DB_URL storageClass: storage: @@ -61,6 +59,13 @@ serviceAccount: name: "" annotations: {} +upstream: + enabled: false + secretKeyRef: + name: upstream # Must contain: UPSTREAM_USER, UPSTREAM_PASS & UPSTREAM_HOST + agentName: default-agent + pageSize: 500 + extra: {} # nodeSelector: diff --git a/cmd/root.go b/cmd/root.go index 1dbd7fe8..41b7b8df 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -3,6 +3,7 @@ package cmd import ( "fmt" "os" + "strconv" "github.com/flanksource/commons/logger" "github.com/flanksource/config-db/api" @@ -10,16 +11,10 @@ import ( "github.com/flanksource/config-db/jobs" "github.com/flanksource/config-db/scrapers" "github.com/flanksource/config-db/utils/kube" - "github.com/google/uuid" "github.com/spf13/cobra" "github.com/spf13/pflag" ) -var ( - agentID = uuid.Nil // the derived agent id from the agentName - agentName string // name of the agent passed as a CLI arg -) - var dev bool var httpPort, metricsPort, devGuiPort int var disableKubernetes bool @@ -78,7 +73,22 @@ func ServerFlags(flags *pflag.FlagSet) { flags.StringVar(&scrapers.DefaultSchedule, "default-schedule", "@every 60m", "Default schedule for configs that don't specfiy one") flags.StringVar(&scrapers.StaleTimeout, "stale-timeout", "30m", "Delete config items not scraped within the timeout") flags.StringVar(&publicEndpoint, "public-endpoint", "http://localhost:8080", "Public endpoint that this instance is exposed under") - flags.StringVar(&agentName, "agent-name", "", "Name of the agent") + + // Flags for push/pull + var upstreamPageSizeDefault = 500 + if val, exists := os.LookupEnv("UPSTREAM_PAGE_SIZE"); exists { + if parsed, err := strconv.Atoi(val); err != nil || parsed < 0 { + logger.Fatalf("invalid value=%s for UPSTREAM_PAGE_SIZE. Must be a postive number", val) + } else { + upstreamPageSizeDefault = parsed + } + } + + flags.StringVar(&api.UpstreamConfig.Host, "upstream-host", os.Getenv("UPSTREAM_HOST"), "central mission control instance to sync scrape configs & their results") + flags.StringVar(&api.UpstreamConfig.Username, "upstream-user", os.Getenv("UPSTREAM_USER"), "upstream username") + flags.StringVar(&api.UpstreamConfig.Password, "upstream-password", os.Getenv("UPSTREAM_PASSWORD"), "upstream password") + flags.StringVar(&api.UpstreamConfig.AgentName, "agent-name", os.Getenv("AGENT_NAME"), "name of this agent") + flags.IntVar(&jobs.ReconcilePageSize, "upstream-page-size", upstreamPageSizeDefault, "upstream reconciliation page size") } func init() { diff --git a/cmd/server.go b/cmd/server.go index 84b802ee..1bf6af90 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -11,6 +11,7 @@ import ( "github.com/flanksource/config-db/db" "github.com/flanksource/config-db/jobs" "github.com/flanksource/config-db/query" + "github.com/google/uuid" "github.com/flanksource/config-db/scrapers" "github.com/labstack/echo/v4" @@ -55,17 +56,6 @@ func serve(configFiles []string) { e.GET("/query", query.Handler) e.POST("/run/:id", scrapers.RunNowHandler) - if agentName != "" { - agent, err := db.FindAgentByName(context.Background(), agentName) - if err != nil { - logger.Fatalf("error searching for agent (name=%s): %v", agentName, err) - } else if agent == nil { - logger.Fatalf("agent not found (name=%s)", agentName) - } else { - agentID = agent.ID - } - } - go startScraperCron(configFiles) go jobs.ScheduleJobs() @@ -89,7 +79,7 @@ func startScraperCron(configFiles []string) { } } - scraperConfigsDB, err := db.GetScrapeConfigsOfAgent(agentID) + scraperConfigsDB, err := db.GetScrapeConfigsOfAgent(uuid.Nil) if err != nil { logger.Fatalf("error getting configs from database: %v", err) } diff --git a/go.mod b/go.mod index b1ba6d17..2385eaae 100644 --- a/go.mod +++ b/go.mod @@ -40,13 +40,14 @@ require ( github.com/evanphx/json-patch v5.6.0+incompatible github.com/fergusstrange/embedded-postgres v1.21.0 github.com/flanksource/commons v1.12.0 - github.com/flanksource/duty v1.0.183 + github.com/flanksource/duty v1.0.188 github.com/flanksource/is-healthy v0.0.0-20231003215854-76c51e3a3ff7 github.com/flanksource/ketall v1.1.1 + github.com/flanksource/postq v0.1.0 github.com/go-logr/zapr v1.2.4 github.com/gobwas/glob v0.2.3 github.com/gomarkdown/markdown v0.0.0-20230322041520-c84983bdbf2a - github.com/google/cel-go v0.18.0 + github.com/google/cel-go v0.18.1 github.com/google/uuid v1.3.1 github.com/hashicorp/go-getter v1.7.2 github.com/henvic/httpretty v0.1.0 @@ -75,7 +76,7 @@ require ( ) require ( - ariga.io/atlas v0.14.1 // indirect + ariga.io/atlas v0.14.2 // indirect cloud.google.com/go/compute/metadata v0.2.3 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v1.2.0 // indirect github.com/AzureAD/microsoft-authentication-library-for-go v0.9.0 // indirect @@ -112,6 +113,7 @@ require ( github.com/prometheus/common v0.44.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/robertkrimen/otto v0.2.1 // indirect + github.com/sethvargo/go-retry v0.2.4 // indirect github.com/sirupsen/logrus v1.9.3 // indirect github.com/stoewer/go-strcase v1.3.0 // indirect github.com/tidwall/gjson v1.17.0 // indirect @@ -138,7 +140,7 @@ require ( cloud.google.com/go/storage v1.33.0 // indirect github.com/DATA-DOG/go-sqlmock v1.5.0 // indirect github.com/Masterminds/goutils v1.1.1 // indirect - github.com/aws/aws-sdk-go v1.45.15 // indirect + github.com/aws/aws-sdk-go v1.45.19 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.3 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33 // indirect @@ -155,7 +157,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect - github.com/flanksource/gomplate/v3 v3.20.12 + github.com/flanksource/gomplate/v3 v3.20.13 github.com/ghodss/yaml v1.0.0 // indirect github.com/go-errors/errors v1.4.2 // indirect github.com/go-logr/logr v1.2.4 @@ -237,7 +239,7 @@ require ( golang.org/x/text v0.13.0 // indirect golang.org/x/time v0.3.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect - google.golang.org/api v0.142.0 // indirect + google.golang.org/api v0.143.0 // indirect google.golang.org/appengine v1.6.8 // indirect google.golang.org/genproto v0.0.0-20230920204549-e6e6cdab5c13 // indirect google.golang.org/grpc v1.58.2 // indirect @@ -248,7 +250,7 @@ require ( k8s.io/apiextensions-apiserver v0.28.0 // indirect k8s.io/cli-runtime v0.28.0 // indirect k8s.io/klog/v2 v2.100.1 // indirect - k8s.io/kube-openapi v0.0.0-20230918164632-68afd615200d // indirect + k8s.io/kube-openapi v0.0.0-20230928205116-a78145627833 // indirect k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/kustomize/api v0.13.5-0.20230601165947-6ce0bf390ce3 // indirect diff --git a/go.sum b/go.sum index f8e150b9..8ba5736d 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -ariga.io/atlas v0.14.1 h1:mun+I5QiFaKVJBfHNnlTqa0PCj6qCZsp/M3dxFC9WPg= -ariga.io/atlas v0.14.1/go.mod h1:isZrlzJ5cpoCoKFoY9knZug7Lq4pP1cm8g3XciLZ0Pw= +ariga.io/atlas v0.14.2 h1:efxCuSGnDuhx7xm4JaqImR6xd+PqyizgGy5u/XUEI/g= +ariga.io/atlas v0.14.2/go.mod h1:isZrlzJ5cpoCoKFoY9knZug7Lq4pP1cm8g3XciLZ0Pw= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= @@ -700,8 +700,8 @@ github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf/go.mod h1:l github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= github.com/aws/aws-sdk-go v1.37.32/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro= github.com/aws/aws-sdk-go v1.44.122/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo= -github.com/aws/aws-sdk-go v1.45.15 h1:gYBTVSYuhXdatrLbsPaRgVcc637zzdgThWmsDRwXLOo= -github.com/aws/aws-sdk-go v1.45.15/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= +github.com/aws/aws-sdk-go v1.45.19 h1:+4yXWhldhCVXWFOQRF99ZTJ92t4DtoHROZIbN7Ujk/U= +github.com/aws/aws-sdk-go v1.45.19/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= github.com/aws/aws-sdk-go-v2 v1.17.7/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= github.com/aws/aws-sdk-go-v2 v1.18.0 h1:882kkTpSFhdgYRKVZ/VCgf7sd0ru57p2JCxz4/oN5RY= github.com/aws/aws-sdk-go-v2 v1.18.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= @@ -862,11 +862,11 @@ github.com/fergusstrange/embedded-postgres v1.21.0 h1:Sups0nR31+OB4iOZ0ZU4IwUDsB github.com/fergusstrange/embedded-postgres v1.21.0/go.mod h1:wL562t1V+iuFwq0UcgMi2e9rp8CROY9wxWZEfP8Y874= github.com/flanksource/commons v1.12.0 h1:8B7+AbRbWH3KVFwbmXYkG3gS42pF+uVaF4lAgDY+ZJA= github.com/flanksource/commons v1.12.0/go.mod h1:zYEhi6E2+diQ+loVcROUHo/Bgv+Tn61W2NYmrb5MgVI= -github.com/flanksource/duty v1.0.183 h1:EPJGvrVhc8mvXufwp3TCrNzpMFle/znkoNTiWgxkRrc= -github.com/flanksource/duty v1.0.183/go.mod h1:/TW8OfsHrDt2s7QIpDbTOlOhadJGQ652C5vbapc6a7E= +github.com/flanksource/duty v1.0.188 h1:hr83m0pJO+bV/hRu+LnmkAc6Wj+NA040bgXaYLl95AI= +github.com/flanksource/duty v1.0.188/go.mod h1:6VTHWqN36OIXVm288kaT41TYUW0RdNg2/I5uxfuTJ70= github.com/flanksource/gomplate/v3 v3.20.4/go.mod h1:27BNWhzzSjDed1z8YShO6W+z6G9oZXuxfNFGd/iGSdc= -github.com/flanksource/gomplate/v3 v3.20.12 h1:SLo8eLaYkUTizHIuntZ4LxxLzbRfV0NvC6DTpu9fj94= -github.com/flanksource/gomplate/v3 v3.20.12/go.mod h1:1N1aptaAo0XUaGsyU5CWiwn9GMRpbIKX1AdsypfmZYo= +github.com/flanksource/gomplate/v3 v3.20.13 h1:ibWpQL7Hsf6f0bik9xwRS++nzey/DXVjtzln9MsD/qo= +github.com/flanksource/gomplate/v3 v3.20.13/go.mod h1:1N1aptaAo0XUaGsyU5CWiwn9GMRpbIKX1AdsypfmZYo= github.com/flanksource/is-healthy v0.0.0-20230705092916-3b4cf510c5fc/go.mod h1:4pQhmF+TnVqJroQKY8wSnSp+T18oLson6YQ2M0qPHfQ= github.com/flanksource/is-healthy v0.0.0-20231003215854-76c51e3a3ff7 h1:s6jf6P1pRfdvksVFjIXFRfnimvEYUR0/Mmla1EIjiRM= github.com/flanksource/is-healthy v0.0.0-20231003215854-76c51e3a3ff7/go.mod h1:BH5gh9JyEAuuWVP6Q5y9h43VozS0RfKyjNpM9L4v4hw= @@ -874,6 +874,8 @@ github.com/flanksource/ketall v1.1.1 h1:eIHI7FNAG0qC9W7adYzafPUqjQD3Ev/z23x4+tsX github.com/flanksource/ketall v1.1.1/go.mod h1:uuH0BLv3hDVNVu+SQzQ/1fXejfTyGb8wLk7saKsc7mU= github.com/flanksource/mapstructure v1.6.0 h1:+1kJ+QsO1SxjAgktfLlpZXetsVSJ0uCLhGKrA4BtwTE= github.com/flanksource/mapstructure v1.6.0/go.mod h1:dttg5+FFE2sp4D/CrcPCVqufNDrBggDaM+08nk5S8Ps= +github.com/flanksource/postq v0.1.0 h1:z67bskZFaiKs6Nlzaw2U7FL4S1mu8BoCfonedt/fW70= +github.com/flanksource/postq v0.1.0/go.mod h1:nE3mLh0vpF43TT+0HszC0QmORB37xSWPVHmhOY6PqBk= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= @@ -1039,8 +1041,8 @@ github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU= github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= github.com/google/cel-go v0.16.0/go.mod h1:HXZKzB0LXqer5lHHgfWAnlYwJaQBDKMjxjulNQzhwhY= -github.com/google/cel-go v0.18.0 h1:u74MPiEC8mejBrkXqrTWT102g5IFEUjxOngzQIijMzU= -github.com/google/cel-go v0.18.0/go.mod h1:PVAybmSnWkNMUZR/tEWFUiJ1Np4Hz0MHsZJcgC4zln4= +github.com/google/cel-go v0.18.1 h1:V/lAXKq4C3BYLDy/ARzMtpkEEYfHQpZzVyzy69nEUjs= +github.com/google/cel-go v0.18.1/go.mod h1:PVAybmSnWkNMUZR/tEWFUiJ1Np4Hz0MHsZJcgC4zln4= github.com/google/flatbuffers v2.0.8+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/gnostic v0.5.7-v3refs/go.mod h1:73MKFl6jIHelAJNaBGFzt3SPtZULs9dYrGFt8OiIsHQ= github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= @@ -1449,6 +1451,8 @@ github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8= +github.com/sethvargo/go-retry v0.2.4 h1:T+jHEQy/zKJf5s95UkguisicE0zuF9y7+/vgz08Ocec= +github.com/sethvargo/go-retry v0.2.4/go.mod h1:1afjQuvh7s4gflMObvjLPaWgluLLyhA1wmVZ6KLpICw= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= @@ -2117,8 +2121,8 @@ google.golang.org/api v0.108.0/go.mod h1:2Ts0XTHNVWxypznxWOYUeI4g3WdP9Pk2Qk58+a/ google.golang.org/api v0.110.0/go.mod h1:7FC4Vvx1Mooxh8C5HWjzZHcavuS2f6pmJpZx60ca7iI= google.golang.org/api v0.111.0/go.mod h1:qtFHvU9mhgTJegR31csQ+rwxyUTHOKFqCKWp1J0fdw0= google.golang.org/api v0.114.0/go.mod h1:ifYI2ZsFK6/uGddGfAD5BMxlnkBqCmqHSDUVi45N5Yg= -google.golang.org/api v0.142.0 h1:mf+7EJ94fi5ZcnpPy+m0Yv2dkz8bKm+UL0snTCuwXlY= -google.golang.org/api v0.142.0/go.mod h1:zJAN5o6HRqR7O+9qJUFOWrZkYE66RH+efPBdTLA4xBA= +google.golang.org/api v0.143.0 h1:o8cekTkqhywkbZT6p1UHJPZ9+9uuCAJs/KYomxZB8fA= +google.golang.org/api v0.143.0/go.mod h1:FoX9DO9hT7DLNn97OuoZAGSDuNAXdJRuGK98rSUgurk= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= @@ -2420,8 +2424,8 @@ k8s.io/klog/v2 v2.100.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7/go.mod h1:wXW5VT87nVfh/iLV8FpR2uDvrFyomxbtb1KivDbvPTE= k8s.io/kube-openapi v0.0.0-20220328201542-3ee0da9b0b42/go.mod h1:Z/45zLw8lUo4wdiUkI+v/ImEGAvu3WatcZl3lPMR4Rk= k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280/go.mod h1:+Axhij7bCpeqhklhUTe3xmOn6bWxolyZEeyaFpjGtl4= -k8s.io/kube-openapi v0.0.0-20230918164632-68afd615200d h1:/CFeJBjBrZvHX09rObS2+2iEEDevMWYc1v3aIYAjIYI= -k8s.io/kube-openapi v0.0.0-20230918164632-68afd615200d/go.mod h1:AsvuZPBlUDVuCdzJ87iajxtXuR9oktsTctW/R9wwouA= +k8s.io/kube-openapi v0.0.0-20230928205116-a78145627833 h1:iFFEmmB7szQhJP42AvRD2+gzdVP7EuIKY1rJgxf0JZY= +k8s.io/kube-openapi v0.0.0-20230928205116-a78145627833/go.mod h1:AsvuZPBlUDVuCdzJ87iajxtXuR9oktsTctW/R9wwouA= k8s.io/utils v0.0.0-20201110183641-67b214c5f920/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= k8s.io/utils v0.0.0-20210802155522-efc7438f0176/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= diff --git a/hack/generate-schemas/go.mod b/hack/generate-schemas/go.mod index e139f1b2..4a7e81b3 100644 --- a/hack/generate-schemas/go.mod +++ b/hack/generate-schemas/go.mod @@ -13,14 +13,14 @@ require ( github.com/Masterminds/goutils v1.1.1 // indirect github.com/Masterminds/semver/v3 v3.2.1 // indirect github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df // indirect - github.com/flanksource/duty v1.0.183 // indirect - github.com/flanksource/gomplate/v3 v3.20.12 // indirect + github.com/flanksource/duty v1.0.188 // indirect + github.com/flanksource/gomplate/v3 v3.20.13 // indirect github.com/flanksource/is-healthy v0.0.0-20231003215854-76c51e3a3ff7 // indirect github.com/flanksource/mapstructure v1.6.0 // indirect github.com/go-logr/logr v1.2.4 // indirect github.com/gobwas/glob v0.2.3 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/google/cel-go v0.18.0 // indirect + github.com/google/cel-go v0.18.1 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/uuid v1.3.1 // indirect github.com/gosimple/slug v1.13.1 // indirect diff --git a/hack/generate-schemas/go.sum b/hack/generate-schemas/go.sum index 8fbef7a6..53e5cace 100644 --- a/hack/generate-schemas/go.sum +++ b/hack/generate-schemas/go.sum @@ -13,10 +13,10 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/flanksource/commons v1.12.0 h1:8B7+AbRbWH3KVFwbmXYkG3gS42pF+uVaF4lAgDY+ZJA= github.com/flanksource/commons v1.12.0/go.mod h1:zYEhi6E2+diQ+loVcROUHo/Bgv+Tn61W2NYmrb5MgVI= -github.com/flanksource/duty v1.0.183 h1:EPJGvrVhc8mvXufwp3TCrNzpMFle/znkoNTiWgxkRrc= -github.com/flanksource/duty v1.0.183/go.mod h1:/TW8OfsHrDt2s7QIpDbTOlOhadJGQ652C5vbapc6a7E= -github.com/flanksource/gomplate/v3 v3.20.12 h1:SLo8eLaYkUTizHIuntZ4LxxLzbRfV0NvC6DTpu9fj94= -github.com/flanksource/gomplate/v3 v3.20.12/go.mod h1:1N1aptaAo0XUaGsyU5CWiwn9GMRpbIKX1AdsypfmZYo= +github.com/flanksource/duty v1.0.188 h1:hr83m0pJO+bV/hRu+LnmkAc6Wj+NA040bgXaYLl95AI= +github.com/flanksource/duty v1.0.188/go.mod h1:6VTHWqN36OIXVm288kaT41TYUW0RdNg2/I5uxfuTJ70= +github.com/flanksource/gomplate/v3 v3.20.13 h1:ibWpQL7Hsf6f0bik9xwRS++nzey/DXVjtzln9MsD/qo= +github.com/flanksource/gomplate/v3 v3.20.13/go.mod h1:1N1aptaAo0XUaGsyU5CWiwn9GMRpbIKX1AdsypfmZYo= github.com/flanksource/is-healthy v0.0.0-20231003215854-76c51e3a3ff7 h1:s6jf6P1pRfdvksVFjIXFRfnimvEYUR0/Mmla1EIjiRM= github.com/flanksource/is-healthy v0.0.0-20231003215854-76c51e3a3ff7/go.mod h1:BH5gh9JyEAuuWVP6Q5y9h43VozS0RfKyjNpM9L4v4hw= github.com/flanksource/mapstructure v1.6.0 h1:+1kJ+QsO1SxjAgktfLlpZXetsVSJ0uCLhGKrA4BtwTE= @@ -30,8 +30,8 @@ github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJA github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/google/cel-go v0.18.0 h1:u74MPiEC8mejBrkXqrTWT102g5IFEUjxOngzQIijMzU= -github.com/google/cel-go v0.18.0/go.mod h1:PVAybmSnWkNMUZR/tEWFUiJ1Np4Hz0MHsZJcgC4zln4= +github.com/google/cel-go v0.18.1 h1:V/lAXKq4C3BYLDy/ARzMtpkEEYfHQpZzVyzy69nEUjs= +github.com/google/cel-go v0.18.1/go.mod h1:PVAybmSnWkNMUZR/tEWFUiJ1Np4Hz0MHsZJcgC4zln4= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= diff --git a/jobs/jobs.go b/jobs/jobs.go index 33a6cb42..36ad071c 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -5,15 +5,22 @@ import ( "runtime" "github.com/flanksource/commons/logger" + "github.com/flanksource/config-db/api" "github.com/robfig/cron/v3" ) var FuncScheduler = cron.New() +const ( + PullConfigScrapersFromUpstreamSchedule = "@every 5m" + PushConfigResultsToUpstreamSchedule = "@every 10s" + ReconcileConfigsToUpstreamSchedule = "@every 3h" +) + func ScheduleJobs() { scheduleFunc := func(schedule string, fn func()) { if _, err := FuncScheduler.AddFunc(schedule, fn); err != nil { - logger.Errorf("Error scheduling %s job", runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()) + logger.Fatalf("Error scheduling %s job", runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()) } } @@ -21,5 +28,21 @@ func ScheduleJobs() { scheduleFunc("@every 24h", DeleteOldConfigAnalysis) scheduleFunc("@every 24h", CleanupConfigItems) + if api.UpstreamConfig.Valid() { + pullJob := &UpstreamPullJob{} + pullJob.Run() + + if _, err := FuncScheduler.AddJob(PullConfigScrapersFromUpstreamSchedule, pullJob); err != nil { + logger.Fatalf("Failed to schedule job [PullUpstreamScrapeConfigs]: %v", err) + } + + // Syncs scrape config results to upstream in real-time + if err := StartConsumser(api.DefaultContext); err != nil { + logger.Fatalf("Failed to start event consumer: %v", err) + } + + scheduleFunc(ReconcileConfigsToUpstreamSchedule, ReconcileConfigScraperResults) + } + FuncScheduler.Start() } diff --git a/jobs/sync_upstream.go b/jobs/sync_upstream.go new file mode 100644 index 00000000..5934a3f2 --- /dev/null +++ b/jobs/sync_upstream.go @@ -0,0 +1,123 @@ +package jobs + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "time" + + "github.com/flanksource/commons/logger" + "github.com/flanksource/config-db/api" + "github.com/flanksource/config-db/db" + "github.com/flanksource/duty/models" + "github.com/flanksource/duty/upstream" + "github.com/flanksource/postq/pg" + "gorm.io/gorm/clause" +) + +var ReconcilePageSize int + +const ( + EventPushQueueCreate = "push_queue.create" + eventQueueUpdateChannel = "event_queue_updates" +) + +// ReconcileConfigScraperResults pushes missing scrape config results to the upstream server +func ReconcileConfigScraperResults() { + ctx := api.DefaultContext + + jobHistory := models.NewJobHistory("PushUpstream", "Config", "") + _ = db.PersistJobHistory(jobHistory.Start()) + defer func() { _ = db.PersistJobHistory(jobHistory.End()) }() + + reconciler := upstream.NewUpstreamReconciler(api.UpstreamConfig, ReconcilePageSize) + if err := reconciler.SyncAfter(ctx, "config_items", time.Hour*48); err != nil { + jobHistory.AddError(err.Error()) + logger.Errorf("failed to sync table config_items: %v", err) + } else { + jobHistory.IncrSuccess() + } +} + +// UpstreamPullJob pulls scrape configs from the upstream server +type UpstreamPullJob struct { + lastRuntime time.Time +} + +func (t *UpstreamPullJob) Run() { + jobHistory := models.NewJobHistory("PullUpstream", "Config", "") + _ = db.PersistJobHistory(jobHistory.Start()) + defer func() { _ = db.PersistJobHistory(jobHistory.End()) }() + + if err := t.pull(api.DefaultContext, api.UpstreamConfig); err != nil { + jobHistory.AddError(err.Error()) + logger.Errorf("error pulling scrape configs from upstream: %v", err) + } else { + jobHistory.IncrSuccess() + } +} + +func (t *UpstreamPullJob) pull(ctx api.ScrapeContext, config upstream.UpstreamConfig) error { + logger.Tracef("pulling scrape configs from upstream since: %v", t.lastRuntime) + + endpoint, err := url.JoinPath(config.Host, "upstream", "scrapeconfig", "pull", config.AgentName) + if err != nil { + return fmt.Errorf("error creating url endpoint for host %s: %w", config.Host, err) + } + + req, err := http.NewRequest(http.MethodGet, endpoint, nil) + if err != nil { + return fmt.Errorf("error creating new http request: %w", err) + } + + req.SetBasicAuth(config.Username, config.Password) + + params := url.Values{} + params.Add("since", t.lastRuntime.Format(time.RFC3339Nano)) + req.URL.RawQuery = params.Encode() + + httpClient := &http.Client{} + resp, err := httpClient.Do(req) + if err != nil { + return fmt.Errorf("error making request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("server returned unexpected status:%s (%s)", resp.Status, body) + } + + var scrapeConfigs []models.ConfigScraper + if err := json.NewDecoder(resp.Body).Decode(&scrapeConfigs); err != nil { + return fmt.Errorf("error decoding JSON response: %w", err) + } + + if len(scrapeConfigs) == 0 { + return nil + } + + t.lastRuntime = scrapeConfigs[len(scrapeConfigs)-1].UpdatedAt + + logger.Tracef("fetched %d scrape configs from upstream", len(scrapeConfigs)) + + return ctx.DB().Omit("agent_id").Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "id"}}, + UpdateAll: true, + }).Create(&scrapeConfigs).Error +} + +func StartConsumser(ctx api.ScrapeContext) error { + consumer, err := upstream.NewPushQueueConsumer(api.UpstreamConfig).EventConsumer() + if err != nil { + return err + } + + pgNotifyChannel := make(chan string) + go pg.Listen(ctx, eventQueueUpdateChannel, pgNotifyChannel) + + go consumer.Listen(ctx, pgNotifyChannel) + return nil +}