From c44a2b2037a5edcf2bd9ca39bd3bfb7e9fc877f4 Mon Sep 17 00:00:00 2001 From: Sebastian Florek Date: Wed, 12 Jun 2024 17:09:49 -0500 Subject: [PATCH 1/8] fix: add custom step output scanner with error detection (#216) * feat: add custom step output scanner with error detection * fix lint * improve keywords * add comment * fix exec error handling bug and update error detection * fix err handling * fix lint * simplify logic * require approval for destroy stages too --------- Co-authored-by: michaeljguarino --- go.mod | 3 +- go.sum | 7 +-- pkg/harness/controller/controller.go | 42 +++++++++------ pkg/harness/controller/controller_hooks.go | 2 +- pkg/harness/exec/analyzer.go | 49 +++++++++++++++++ pkg/harness/exec/analyzer_heuristic.go | 63 ++++++++++++++++++++++ pkg/harness/exec/analyzer_types.go | 46 ++++++++++++++++ pkg/harness/exec/exec.go | 46 ++++++++++++---- pkg/harness/exec/exec_options.go | 6 +++ pkg/harness/exec/exec_types.go | 3 ++ 10 files changed, 233 insertions(+), 34 deletions(-) create mode 100644 pkg/harness/exec/analyzer.go create mode 100644 pkg/harness/exec/analyzer_heuristic.go create mode 100644 pkg/harness/exec/analyzer_types.go diff --git a/go.mod b/go.mod index 96567cc1..af76da4d 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( github.com/open-policy-agent/gatekeeper/v3 v3.15.1 github.com/orcaman/concurrent-map/v2 v2.0.1 github.com/pkg/errors v0.9.1 - github.com/pluralsh/console-client-go v0.5.13 + github.com/pluralsh/console-client-go v0.5.18 github.com/pluralsh/controller-reconcile-helper v0.0.4 github.com/pluralsh/gophoenix v0.1.3-0.20231201014135-dff1b4309e34 github.com/pluralsh/polly v0.1.10 @@ -39,7 +39,6 @@ require ( k8s.io/apimachinery v0.29.3 k8s.io/cli-runtime v0.29.2 k8s.io/client-go v0.29.2 - k8s.io/klog v1.0.0 k8s.io/klog/v2 v2.110.1 k8s.io/kubectl v0.29.2 layeh.com/gopher-luar v1.0.11 diff --git a/go.sum b/go.sum index 627a5186..20483e8f 100644 --- a/go.sum +++ b/go.sum @@ -207,7 +207,6 @@ github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3I github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= -github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= @@ -527,8 +526,8 @@ github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg= -github.com/pluralsh/console-client-go v0.5.13 h1:HOmkho5aaU42f6PkSb+BUFjhCJKnL5jceLZiT16HMBE= -github.com/pluralsh/console-client-go v0.5.13/go.mod h1:eyCiLA44YbXiYyJh8303jk5JdPkt9McgCo5kBjk4lKo= +github.com/pluralsh/console-client-go v0.5.18 h1:uwYsoGaggvi3uPZYL/+qdhvgl73sGBiuVUfQGAC/J4c= +github.com/pluralsh/console-client-go v0.5.18/go.mod h1:eyCiLA44YbXiYyJh8303jk5JdPkt9McgCo5kBjk4lKo= github.com/pluralsh/controller-reconcile-helper v0.0.4 h1:1o+7qYSyoeqKFjx+WgQTxDz4Q2VMpzprJIIKShxqG0E= github.com/pluralsh/controller-reconcile-helper v0.0.4/go.mod h1:AfY0gtteD6veBjmB6jiRx/aR4yevEf6K0M13/pGan/s= github.com/pluralsh/gophoenix v0.1.3-0.20231201014135-dff1b4309e34 h1:ab2PN+6if/Aq3/sJM0AVdy1SYuMAnq4g20VaKhTm/Bw= @@ -1095,8 +1094,6 @@ k8s.io/client-go v0.29.2 h1:FEg85el1TeZp+/vYJM7hkDlSTFZ+c5nnK44DJ4FyoRg= k8s.io/client-go v0.29.2/go.mod h1:knlvFZE58VpqbQpJNbCbctTVXcd35mMyAAwBdpt4jrA= k8s.io/component-base v0.29.2 h1:lpiLyuvPA9yV1aQwGLENYyK7n/8t6l3nn3zAtFTJYe8= k8s.io/component-base v0.29.2/go.mod h1:BfB3SLrefbZXiBfbM+2H1dlat21Uewg/5qtKOl8degM= -k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8= -k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I= k8s.io/klog/v2 v2.110.1 h1:U/Af64HJf7FcwMcXyKm2RPM22WZzyR7OSpYj5tg3cL0= k8s.io/klog/v2 v2.110.1/go.mod h1:YGtd1984u+GgbuZ7e08/yBuAfKLSO0+uR1Fhi6ExXjo= k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 h1:aVUu9fTY98ivBPKR9Y5w/AuzbMm96cd3YHRTU83I780= diff --git a/pkg/harness/controller/controller.go b/pkg/harness/controller/controller.go index f7f4c0f9..9cd8880e 100644 --- a/pkg/harness/controller/controller.go +++ b/pkg/harness/controller/controller.go @@ -100,6 +100,10 @@ func (in *stackRunController) toExecutable(ctx context.Context, step *gqlclient. // ensuring they have completed all work. in.wg.Add(1) + toolWriters := make([]io.WriteCloser, 0) + modifier := in.tool.Modifier(step.Stage) + args := step.Args + env := in.stackRun.Env() consoleWriter := sink.NewConsoleWriter( ctx, in.consoleClient, @@ -115,29 +119,35 @@ func (in *stackRunController) toExecutable(ctx context.Context, step *gqlclient. )..., ) - var toolWriters []io.WriteCloser - modifier := in.tool.Modifier(step.Stage) - args := step.Args - env := in.stackRun.Env() if modifier != nil { args = modifier.Args(args) env = modifier.Env(env) toolWriters = modifier.WriteCloser() } - return exec.NewExecutable( - step.Cmd, - append( - in.execOptions, - exec.WithDir(in.execWorkDir()), - exec.WithEnv(env), - exec.WithArgs(args), - exec.WithID(step.ID), - exec.WithOutputSinks(append(toolWriters, consoleWriter)...), - exec.WithHook(v1.LifecyclePreStart, in.preExecHook(step.Stage, step.ID)), - exec.WithHook(v1.LifecyclePostStart, in.postExecHook(step.Stage)), - )..., + // base executable options + options := in.execOptions + options = append( + options, + exec.WithDir(in.execWorkDir()), + exec.WithEnv(env), + exec.WithArgs(args), + exec.WithID(step.ID), + exec.WithOutputSinks(append(toolWriters, consoleWriter)...), + exec.WithHook(v1.LifecyclePreStart, in.preExecHook(step.Stage, step.ID)), + exec.WithHook(v1.LifecyclePostStart, in.postExecHook(step.Stage)), + exec.WithOutputAnalyzer(exec.NewKeywordDetector()), ) + + // Add a custom run step output analyzer for the destroy stage to increase + // a chance of detecting errors during execution. On occasion executable can + // return exit code 0 even though there was a fatal error during execution. + // TODO: use destroy stage + // if step.Stage == gqlclient.StepStageApply { + // options = append(options, exec.WithOutputAnalyzer(exec.NewKeywordDetector())) + //} + + return exec.NewExecutable(step.Cmd, options...) } func (in *stackRunController) completeStackRun(status gqlclient.StackStatus, stackRunErr error) error { diff --git a/pkg/harness/controller/controller_hooks.go b/pkg/harness/controller/controller_hooks.go index 7be8b776..2c8ff5b8 100644 --- a/pkg/harness/controller/controller_hooks.go +++ b/pkg/harness/controller/controller_hooks.go @@ -91,7 +91,7 @@ func (in *stackRunController) postExecHook(stage gqlclient.StepStage) v1.HookFun // postExecHook is a callback function started by the exec.Executable before it runs the executable. func (in *stackRunController) preExecHook(stage gqlclient.StepStage, id string) v1.HookFunction { return func() error { - if stage == gqlclient.StepStageApply && in.requiresApproval() { + if (stage == gqlclient.StepStageApply || stage == gqlclient.StepStageDestroy) && in.requiresApproval() { in.waitForApproval() } diff --git a/pkg/harness/exec/analyzer.go b/pkg/harness/exec/analyzer.go new file mode 100644 index 00000000..728df984 --- /dev/null +++ b/pkg/harness/exec/analyzer.go @@ -0,0 +1,49 @@ +package exec + +import ( + "bufio" + "bytes" + "fmt" + "io" + "strings" +) + +type outputAnalyzer struct { + stdout *bytes.Buffer + stderr *bytes.Buffer + + heuristics []OutputAnalyzerHeuristic +} + +func (in *outputAnalyzer) Stdout() io.Writer { + return in.stdout +} + +func (in *outputAnalyzer) Stderr() io.Writer { + return in.stderr +} + +func (in *outputAnalyzer) Detect() []error { + errors := make([]error, 0) + output := in.stdout.String() + + for _, heuristic := range in.heuristics { + if potentialErrors := heuristic.Detect(bufio.NewScanner(strings.NewReader(output))); len(potentialErrors) > 0 { + errors = append(errors, potentialErrors.ToErrors()...) + } + } + + if in.stderr.Len() > 0 { + errors = append(errors, fmt.Errorf("%s", in.stderr.String())) + } + + return errors +} + +func NewOutputAnalyzer(heuristics ...OutputAnalyzerHeuristic) OutputAnalyzer { + return &outputAnalyzer{ + stdout: bytes.NewBuffer([]byte{}), + stderr: bytes.NewBuffer([]byte{}), + heuristics: heuristics, + } +} diff --git a/pkg/harness/exec/analyzer_heuristic.go b/pkg/harness/exec/analyzer_heuristic.go new file mode 100644 index 00000000..2500ce26 --- /dev/null +++ b/pkg/harness/exec/analyzer_heuristic.go @@ -0,0 +1,63 @@ +package exec + +import ( + "bufio" + "strings" + + "github.com/pluralsh/polly/algorithms" +) + +type keywordDetector struct { + keywords []keyword +} + +type keyword struct { + content string + ignoreCase bool +} + +func (in keyword) PartOf(s string) bool { + if !in.ignoreCase { + return strings.Contains(s, in.content) + } + + return strings.Contains( + strings.ToLower(s), + strings.ToLower(in.content), + ) +} + +// Detect implements [OutputAnalyzerHeuristic] interface. +// TODO: we can spread actual message analysis into multiple routines to speed up the process. +func (in *keywordDetector) Detect(input *bufio.Scanner) Errors { + line := 0 + errors := make([]Error, 0) + for input.Scan() { + if !in.hasError(input.Text()) { + continue + } + + errors = append(errors, Error{ + line: line, + message: input.Text(), + }) + } + + return errors +} + +func (in *keywordDetector) hasError(message string) bool { + return algorithms.Index(in.keywords, func(k keyword) bool { + return k.PartOf(message) + }) >= 0 +} + +func NewKeywordDetector() OutputAnalyzerHeuristic { + return &keywordDetector{ + keywords: []keyword{ + {"error message: http remote state already locked", true}, + {"error acquiring the state lock", true}, + {"Error:", false}, + }, + } +} diff --git a/pkg/harness/exec/analyzer_types.go b/pkg/harness/exec/analyzer_types.go new file mode 100644 index 00000000..77d60b7e --- /dev/null +++ b/pkg/harness/exec/analyzer_types.go @@ -0,0 +1,46 @@ +package exec + +import ( + "bufio" + "fmt" + "io" +) + +// OutputAnalyzer captures the command output +// and attempts to detect potential errors. +type OutputAnalyzer interface { + Stdout() io.Writer + Stderr() io.Writer + + // Detect scans the output for potential errors. + // It uses a custom heuristics to detect issues. + // It can result in a false positives. + // + // Note: Make sure that it is executed after Write + // has finished to ensure proper detection. + Detect() []error +} + +type OutputAnalyzerHeuristic interface { + Detect(input *bufio.Scanner) Errors +} + +type Error struct { + line int + message string +} + +func (in Error) ToError() error { + return fmt.Errorf("[%d] %s", in.line, in.message) +} + +type Errors []Error + +func (in Errors) ToErrors() []error { + errors := make([]error, len(in)) + for _, err := range in { + errors = append(errors, err.ToError()) + } + + return errors +} diff --git a/pkg/harness/exec/exec.go b/pkg/harness/exec/exec.go index 51fb4880..39419893 100644 --- a/pkg/harness/exec/exec.go +++ b/pkg/harness/exec/exec.go @@ -2,6 +2,7 @@ package exec import ( "context" + "errors" "fmt" "io" "os" @@ -35,6 +36,10 @@ func (in *executable) Run(ctx context.Context) error { cmd.Stdout = w cmd.Stderr = w + if in.outputAnalyzer != nil { + cmd.Stderr = io.MultiWriter(w, in.outputAnalyzer.Stderr()) + } + // Configure environment of the executable. // Root process environment is used as a base and passed in env vars // are added on top of that. In case of duplicate keys, custom env @@ -47,13 +52,17 @@ func (in *executable) Run(ctx context.Context) error { klog.V(log.LogLevelExtended).InfoS("executing", "command", in.Command()) if err := cmd.Run(); err != nil { - if err = context.Cause(ctx); err != nil { + if err := context.Cause(ctx); err != nil { return err } return err } + if err := in.analyze(); err != nil { + return err + } + return in.runLifecycleFunction(v1.LifecyclePostStart) } @@ -87,17 +96,22 @@ func (in *executable) ID() string { } func (in *executable) writer() io.Writer { - if len(in.outputSinks) == 0 { - return os.Stdout - } + writers := []io.Writer{os.Stdout} - return io.MultiWriter( - append( - algorithms.Map(in.outputSinks, func(writer io.WriteCloser) io.Writer { + if len(in.outputSinks) > 0 { + writers = append(writers, algorithms.Map( + in.outputSinks, + func(writer io.WriteCloser) io.Writer { return writer - }), - os.Stdout)..., - ) + })..., + ) + } + + if in.outputAnalyzer != nil { + writers = append(writers, in.outputAnalyzer.Stdout()) + } + + return io.MultiWriter(writers...) } func (in *executable) close(writers []io.WriteCloser) { @@ -120,6 +134,18 @@ func (in *executable) runLifecycleFunction(lifecycle v1.Lifecycle) error { return nil } +func (in *executable) analyze() error { + if in.outputAnalyzer == nil { + return nil + } + + if err := in.outputAnalyzer.Detect(); len(err) > 0 { + return errors.Join(err...) + } + + return nil +} + func NewExecutable(command string, options ...Option) Executable { result := &executable{ command: command, diff --git a/pkg/harness/exec/exec_options.go b/pkg/harness/exec/exec_options.go index 6c741a55..67792eff 100644 --- a/pkg/harness/exec/exec_options.go +++ b/pkg/harness/exec/exec_options.go @@ -50,3 +50,9 @@ func WithTimeout(timeout time.Duration) Option { e.timeout = timeout } } + +func WithOutputAnalyzer(heuristics ...OutputAnalyzerHeuristic) Option { + return func(e *executable) { + e.outputAnalyzer = NewOutputAnalyzer(heuristics...) + } +} diff --git a/pkg/harness/exec/exec_types.go b/pkg/harness/exec/exec_types.go index 9ea3a067..f35febaf 100644 --- a/pkg/harness/exec/exec_types.go +++ b/pkg/harness/exec/exec_types.go @@ -44,6 +44,9 @@ type executable struct { // to the [os.Stdout]. outputSinks []io.WriteCloser + // outputAnalyzer + outputAnalyzer OutputAnalyzer + // hookFunctions ... hookFunctions map[v1.Lifecycle]v1.HookFunction } From 72d976098c67b2b6afdaeab54df5c8e82b8c86ff Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 12 Jun 2024 18:43:45 -0400 Subject: [PATCH 2/8] Updated chart to release v0.4.35 (#218) Co-authored-by: michaeljguarino --- charts/deployment-operator/Chart.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/charts/deployment-operator/Chart.yaml b/charts/deployment-operator/Chart.yaml index 0b634083..b4f5407d 100644 --- a/charts/deployment-operator/Chart.yaml +++ b/charts/deployment-operator/Chart.yaml @@ -1,8 +1,8 @@ apiVersion: v2 name: deployment-operator description: creates a new instance of the plural deployment operator -appVersion: 0.4.34 -version: 0.4.34 +appVersion: 0.4.35 +version: 0.4.35 maintainers: - name: Plural url: https://www.plural.sh From 7243acc17df38ccc0110d3c0cd8cddf79217982f Mon Sep 17 00:00:00 2001 From: Sebastian Florek Date: Mon, 17 Jun 2024 10:00:27 -0500 Subject: [PATCH 3/8] fix: do not allow discovery api call for helm templating to stop the agent (#222) * do not allow discovery api call for helm templating to stop the agent * bump default number of reconciles to 20 * update logic * fix lint --- cmd/agent/agent.go | 23 ++++++++------- cmd/agent/options.go | 2 +- internal/helpers/poll.go | 24 +++++++++++++++ pkg/controller/service/reconciler.go | 44 +++++++++++++--------------- 4 files changed, 57 insertions(+), 36 deletions(-) create mode 100644 internal/helpers/poll.go diff --git a/cmd/agent/agent.go b/cmd/agent/agent.go index deb3f5db..adc68632 100644 --- a/cmd/agent/agent.go +++ b/cmd/agent/agent.go @@ -7,39 +7,40 @@ import ( "github.com/pluralsh/deployment-operator/internal/utils" "github.com/pluralsh/deployment-operator/pkg/controller/stacks" + "github.com/samber/lo" + "golang.org/x/net/context" + "k8s.io/client-go/rest" + ctrclient "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/pluralsh/deployment-operator/pkg/controller" "github.com/pluralsh/deployment-operator/pkg/controller/namespaces" "github.com/pluralsh/deployment-operator/pkg/controller/pipelinegates" "github.com/pluralsh/deployment-operator/pkg/controller/restore" "github.com/pluralsh/deployment-operator/pkg/controller/service" - "github.com/samber/lo" - "golang.org/x/net/context" - "k8s.io/client-go/rest" - ctrclient "sigs.k8s.io/controller-runtime/pkg/client" ) func runAgent(opt *options, config *rest.Config, ctx context.Context, k8sClient ctrclient.Client) (*controller.ControllerManager, *service.ServiceReconciler, *pipelinegates.GateReconciler) { r, err := time.ParseDuration(opt.refreshInterval) if err != nil { - setupLog.Error(err, "unable to get refresh interval") + setupLog.Error("unable to get refresh interval", "error", err) os.Exit(1) } t, err := time.ParseDuration(opt.processingTimeout) if err != nil { - setupLog.Error(err, "unable to get processing timeout") + setupLog.Errorw("unable to get processing timeout", "error", err) os.Exit(1) } mgr, err := controller.NewControllerManager(ctx, opt.maxConcurrentReconciles, t, r, lo.ToPtr(true), opt.consoleUrl, opt.deployToken, opt.clusterId) if err != nil { - setupLog.Error(err, "unable to create manager") + setupLog.Errorw("unable to create manager", "error", err) os.Exit(1) } sr, err := service.NewServiceReconciler(ctx, mgr.GetClient(), config, r, opt.restoreNamespace) if err != nil { - setupLog.Error(err, "unable to create service reconciler") + setupLog.Errorw("unable to create service reconciler", "error", err) os.Exit(1) } mgr.AddController(&controller.Controller{ @@ -49,7 +50,7 @@ func runAgent(opt *options, config *rest.Config, ctx context.Context, k8sClient }) gr, err := pipelinegates.NewGateReconciler(mgr.GetClient(), k8sClient, config, r, opt.clusterId) if err != nil { - setupLog.Error(err, "unable to create gate reconciler") + setupLog.Errorw("unable to create gate reconciler", "error", err) os.Exit(1) } mgr.AddController(&controller.Controller{ @@ -74,7 +75,7 @@ func runAgent(opt *options, config *rest.Config, ctx context.Context, k8sClient namespace, err := utils.GetOperatorNamespace() if err != nil { - setupLog.Error(err, "unable to get operator namespace") + setupLog.Errorw("unable to get operator namespace", "error", err) os.Exit(1) } @@ -85,7 +86,7 @@ func runAgent(opt *options, config *rest.Config, ctx context.Context, k8sClient Queue: s.StackQueue, }) if err := mgr.Start(); err != nil { - setupLog.Error(err, "unable to start controller manager") + setupLog.Errorw("unable to start controller manager", "error", err) os.Exit(1) } diff --git a/cmd/agent/options.go b/cmd/agent/options.go index 838d5fd8..382b2d3b 100644 --- a/cmd/agent/options.go +++ b/cmd/agent/options.go @@ -38,7 +38,7 @@ func newOptions() *options { flag.StringVar(&o.metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") flag.StringVar(&o.probeAddr, "health-probe-bind-address", ":9001", "The address the probe endpoint binds to.") flag.BoolVar(&o.enableLeaderElection, "leader-elect", false, "Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.") - flag.IntVar(&o.maxConcurrentReconciles, "max-concurrent-reconciles", 10, "Maximum number of concurrent reconciles which can be run.") + flag.IntVar(&o.maxConcurrentReconciles, "max-concurrent-reconciles", 20, "Maximum number of concurrent reconciles which can be run.") flag.IntVar(&o.resyncSeconds, "resync-seconds", 300, "Resync duration in seconds.") flag.StringVar(&o.refreshInterval, "refresh-interval", "2m", "Refresh interval duration.") flag.StringVar(&o.processingTimeout, "processing-timeout", "1m", "Maximum amount of time to spend trying to process queue item.") diff --git a/internal/helpers/poll.go b/internal/helpers/poll.go new file mode 100644 index 00000000..dd54cc3a --- /dev/null +++ b/internal/helpers/poll.go @@ -0,0 +1,24 @@ +package helpers + +import ( + "context" + "time" + + "k8s.io/apimachinery/pkg/util/wait" +) + +// BackgroundPollUntilContextCancel spawns a new goroutine that runs the condition function on interval. +// If syncFirstRun is set to true, it will execute the condition function synchronously first and then start +// polling. Since error is returned synchronously, the only way to check for it is to use syncFirstRun. +// Background poller does not sync errors. It can be stopped externally by cancelling the provided context. +func BackgroundPollUntilContextCancel(ctx context.Context, interval time.Duration, immediate, syncFirstRun bool, condition wait.ConditionWithContextFunc) (err error) { + if syncFirstRun { + _, err = condition(ctx) + } + + go func() { + _ = wait.PollUntilContextCancel(ctx, interval, immediate, condition) + }() + + return err +} diff --git a/pkg/controller/service/reconciler.go b/pkg/controller/service/reconciler.go index feba3bf6..7935ac64 100644 --- a/pkg/controller/service/reconciler.go +++ b/pkg/controller/service/reconciler.go @@ -7,23 +7,11 @@ import ( "time" console "github.com/pluralsh/console-client-go" - clienterrors "github.com/pluralsh/deployment-operator/internal/errors" - "github.com/pluralsh/deployment-operator/internal/utils" - "github.com/pluralsh/deployment-operator/pkg/applier" - "github.com/pluralsh/deployment-operator/pkg/client" - "github.com/pluralsh/deployment-operator/pkg/controller" - plrlerrors "github.com/pluralsh/deployment-operator/pkg/errors" - "github.com/pluralsh/deployment-operator/pkg/manifests" - manis "github.com/pluralsh/deployment-operator/pkg/manifests" - "github.com/pluralsh/deployment-operator/pkg/manifests/template" - "github.com/pluralsh/deployment-operator/pkg/ping" - "github.com/pluralsh/deployment-operator/pkg/websocket" "github.com/pluralsh/polly/algorithms" "github.com/samber/lo" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/discovery" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -34,6 +22,19 @@ import ( "sigs.k8s.io/cli-utils/pkg/inventory" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" + + clienterrors "github.com/pluralsh/deployment-operator/internal/errors" + "github.com/pluralsh/deployment-operator/internal/helpers" + "github.com/pluralsh/deployment-operator/internal/utils" + "github.com/pluralsh/deployment-operator/pkg/applier" + "github.com/pluralsh/deployment-operator/pkg/client" + "github.com/pluralsh/deployment-operator/pkg/controller" + plrlerrors "github.com/pluralsh/deployment-operator/pkg/errors" + "github.com/pluralsh/deployment-operator/pkg/manifests" + manis "github.com/pluralsh/deployment-operator/pkg/manifests" + "github.com/pluralsh/deployment-operator/pkg/manifests/template" + "github.com/pluralsh/deployment-operator/pkg/ping" + "github.com/pluralsh/deployment-operator/pkg/websocket" ) func init() { @@ -106,19 +107,14 @@ func NewServiceReconciler(ctx context.Context, consoleClient client.Client, conf if err != nil { return nil, err } - if err := CapabilitiesAPIVersions(discoveryClient); err != nil { - return nil, err - } - go func() { - //nolint:all - _ = wait.PollImmediateInfinite(time.Minute*5, func() (done bool, err error) { - if err := CapabilitiesAPIVersions(discoveryClient); err != nil { - logger.Error(err, "can't fetch API versions") - } - return false, nil - }) - }() + _ = helpers.BackgroundPollUntilContextCancel(ctx, 5*time.Minute, true, true, func(_ context.Context) (done bool, err error) { + if err := CapabilitiesAPIVersions(discoveryClient); err != nil { + logger.Error(err, "can't fetch API versions") + } + + return false, nil + }) return &ServiceReconciler{ ConsoleClient: consoleClient, From a3cbfb6605603db70b2fe96efcf5192afea26fbb Mon Sep 17 00:00:00 2001 From: Sebastian Florek Date: Mon, 17 Jun 2024 12:27:37 -0500 Subject: [PATCH 4/8] feat: scrape minimum kubelet version from k8s nodes (#223) * feat: scrape minimum kubelet version from k8s nodes * log pinger errors --- pkg/ping/build.go | 3 ++- pkg/ping/pinger.go | 51 ++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 49 insertions(+), 5 deletions(-) diff --git a/pkg/ping/build.go b/pkg/ping/build.go index 8991c7b7..7b0d88e7 100644 --- a/pkg/ping/build.go +++ b/pkg/ping/build.go @@ -8,10 +8,11 @@ import ( "k8s.io/apimachinery/pkg/version" ) -func pingAttributes(info *version.Info, pods []string) console.ClusterPing { +func pingAttributes(info *version.Info, pods []string, minKubeletVersion *string) console.ClusterPing { vs := strings.Split(info.GitVersion, "-") return console.ClusterPing{ CurrentVersion: strings.TrimPrefix(vs[0], "v"), Distro: lo.ToPtr(findDistro(append(pods, info.GitVersion))), + KubeletVersion: minKubeletVersion, } } diff --git a/pkg/ping/pinger.go b/pkg/ping/pinger.go index 3149e605..985c962f 100644 --- a/pkg/ping/pinger.go +++ b/pkg/ping/pinger.go @@ -3,12 +3,16 @@ package ping import ( "context" - "github.com/pluralsh/deployment-operator/pkg/client" + "github.com/Masterminds/semver/v3" "github.com/samber/lo" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/discovery" + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" "k8s.io/kubectl/pkg/cmd/util" + + "github.com/pluralsh/deployment-operator/pkg/client" ) type Pinger struct { @@ -28,23 +32,27 @@ func New(console client.Client, discovery *discovery.DiscoveryClient, factory ut func (p *Pinger) Ping() error { info, err := p.discoveryClient.ServerVersion() if err != nil { + klog.ErrorS(err, "failed to get server version") return err } cs, err := p.factory.KubernetesClientSet() if err != nil { + klog.ErrorS(err, "failed to create kubernetes clientset") return nil } - podNames := []string{} + var podNames []string // can find some distro information by checking what's running in kube-system - if pods, err := cs.CoreV1().Pods("kube-system").List(context.TODO(), metav1.ListOptions{}); err == nil { + if pods, err := cs.CoreV1().Pods("kube-system").List(context.Background(), metav1.ListOptions{}); err == nil { podNames = lo.Map(pods.Items, func(pod corev1.Pod, ind int) string { return pod.Name }) } - attrs := pingAttributes(info, podNames) + minKubeletVersion := p.minimumKubeletVersion(cs) + + attrs := pingAttributes(info, podNames, minKubeletVersion) if err := p.consoleClient.PingCluster(attrs); err != nil { attrs.Distro = nil return p.consoleClient.PingCluster(attrs) // fallback to no distro to support old console servers @@ -52,3 +60,38 @@ func (p *Pinger) Ping() error { return nil } + +// minimumKubeletVersion tries to scrape a minimum kubelet version across all nodes in the cluster. +// It is expected that the kubelet will report to the API a valid SemVer-ish version. +// If no parsable version is found across all nodes, nil will be returned. +func (p *Pinger) minimumKubeletVersion(client *kubernetes.Clientset) *string { + nodes, err := client.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{}) + if err != nil { + klog.ErrorS(err, "failed to list nodes") + return nil + } + + minKubeletVersion := new(semver.Version) + for _, node := range nodes.Items { + kubeletVersion, _ := semver.NewVersion(node.Status.NodeInfo.KubeletVersion) + if kubeletVersion == nil { + continue + } + + // Initialize with first correctly parsed version + if len(minKubeletVersion.Original()) == 0 { + minKubeletVersion = kubeletVersion + continue + } + + if kubeletVersion.LessThan(minKubeletVersion) { + minKubeletVersion = kubeletVersion + } + } + + if len(minKubeletVersion.Original()) == 0 { + return nil + } + + return lo.ToPtr(minKubeletVersion.Original()) +} From fcd0e30450e18a551345aed611898b7e26f20729 Mon Sep 17 00:00:00 2001 From: Lukasz Zajaczkowski Date: Mon, 17 Jun 2024 20:45:31 +0200 Subject: [PATCH 5/8] feat: add argo rollouts resume support (#220) * add argo rollouts resume support * fix gofmt * fix linter * prevent second promotion * add rollback * apply review comments * fix health status * use status code * kick service reconciler on rollout promote --------- Co-authored-by: michaeljguarino --- cmd/agent/main.go | 45 ++++- go.mod | 11 +- go.sum | 22 +-- internal/controller/argorollout_controller.go | 160 ++++++++++++++++++ internal/utils/argorollout.go | 105 ++++++++++++ pkg/controller/service/health.go | 33 ++++ pkg/controller/service/reconciler_status.go | 11 +- pkg/controller/service/status_collector.go | 9 +- 8 files changed, 375 insertions(+), 21 deletions(-) create mode 100644 internal/controller/argorollout_controller.go create mode 100644 internal/utils/argorollout.go diff --git a/cmd/agent/main.go b/cmd/agent/main.go index 84537d54..0cc37549 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -1,8 +1,13 @@ package main import ( + "net/http" "os" + "time" + "github.com/argoproj/argo-rollouts/pkg/apis/rollouts" + rolloutv1alpha1 "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1" + roclientset "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned" templatesv1 "github.com/open-policy-agent/frameworks/constraint/pkg/apis/templates/v1" constraintstatusv1beta1 "github.com/open-policy-agent/gatekeeper/v3/apis/status/v1beta1" velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" @@ -10,6 +15,8 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" clientgoscheme "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/healthz" @@ -32,9 +39,15 @@ func init() { utilruntime.Must(apiextensionsv1.AddToScheme(scheme)) utilruntime.Must(constraintstatusv1beta1.AddToScheme(scheme)) utilruntime.Must(templatesv1.AddToScheme(scheme)) + utilruntime.Must(rolloutv1alpha1.AddToScheme(scheme)) //+kubebuilder:scaffold:scheme } +const ( + httpClientTimout = time.Second * 5 + httpCacheExpiryTime = time.Second * 2 +) + func main() { opt := newOptions() config := ctrl.GetConfigOrDie() @@ -50,7 +63,21 @@ func main() { setupLog.Error(err, "unable to create manager") os.Exit(1) } - + rolloutsClient, err := roclientset.NewForConfig(config) + if err != nil { + setupLog.Error(err, "unable to create rollouts client") + os.Exit(1) + } + dynamicClient, err := dynamic.NewForConfig(config) + if err != nil { + setupLog.Error(err, "unable to create dynamic client") + os.Exit(1) + } + kubeClient, err := kubernetes.NewForConfig(config) + if err != nil { + setupLog.Error(err, "unable to create kubernetes client") + os.Exit(1) + } setupLog.Info("starting agent") ctrlMgr, serviceReconciler, gateReconciler := runAgent(opt, config, ctx, mgr.GetClient()) @@ -70,6 +97,17 @@ func main() { ConsoleClient: ctrlMgr.GetClient(), Reader: mgr.GetCache(), } + argoRolloutController := &controller.ArgoRolloutReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + ConsoleClient: ctrlMgr.GetClient(), + ConsoleURL: opt.consoleUrl, + HttpClient: &http.Client{Timeout: httpClientTimout}, + ArgoClientSet: rolloutsClient, + DynamicClient: dynamicClient, + SvcReconciler: serviceReconciler, + KubeClient: kubeClient, + } reconcileGroups := map[schema.GroupVersionKind]controller.SetupWithManager{ { @@ -87,6 +125,11 @@ func main() { Version: "v1beta1", Kind: "ConstraintPodStatus", }: constraintController.SetupWithManager, + { + Group: rolloutv1alpha1.SchemeGroupVersion.Group, + Version: rolloutv1alpha1.SchemeGroupVersion.Version, + Kind: rollouts.RolloutKind, + }: argoRolloutController.SetupWithManager, } if err = (&controller.CrdRegisterControllerReconciler{ diff --git a/go.mod b/go.mod index af76da4d..2d461195 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/Masterminds/semver/v3 v3.2.1 github.com/Masterminds/sprig/v3 v3.2.3 github.com/Yamashou/gqlgenc v0.18.1 + github.com/argoproj/argo-rollouts v1.6.6 github.com/elastic/crd-ref-docs v0.0.12 github.com/evanphx/json-patch v5.7.0+incompatible github.com/fluxcd/flagger v1.35.0 @@ -19,7 +20,7 @@ require ( github.com/open-policy-agent/gatekeeper/v3 v3.15.1 github.com/orcaman/concurrent-map/v2 v2.0.1 github.com/pkg/errors v0.9.1 - github.com/pluralsh/console-client-go v0.5.18 + github.com/pluralsh/console-client-go v0.7.0 github.com/pluralsh/controller-reconcile-helper v0.0.4 github.com/pluralsh/gophoenix v0.1.3-0.20231201014135-dff1b4309e34 github.com/pluralsh/polly v0.1.10 @@ -87,7 +88,7 @@ require ( github.com/docker/go-units v0.5.0 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/evanphx/json-patch/v5 v5.8.0 // indirect - github.com/exponent-io/jsonpath v0.0.0-20151013193312-d6023ce2651d // indirect + github.com/exponent-io/jsonpath v0.0.0-20210407135951-1de76d718b3f // indirect github.com/fatih/camelcase v1.0.0 // indirect github.com/fatih/color v1.16.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect @@ -111,7 +112,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect - github.com/google/btree v1.0.1 // indirect + github.com/google/btree v1.1.2 // indirect github.com/google/cel-go v0.17.7 // indirect github.com/google/gnostic-models v0.6.8 // indirect github.com/google/go-cmp v0.6.0 // indirect @@ -122,7 +123,7 @@ require ( github.com/gorilla/mux v1.8.1 // indirect github.com/gorilla/websocket v1.5.0 // indirect github.com/gosuri/uitable v0.0.4 // indirect - github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 // indirect + github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/hcl v1.0.0 // indirect @@ -184,7 +185,7 @@ require ( github.com/sirupsen/logrus v1.9.3 // indirect github.com/sosodev/duration v1.2.0 // indirect github.com/spf13/afero v1.9.3 // indirect - github.com/spf13/cast v1.5.0 // indirect + github.com/spf13/cast v1.5.1 // indirect github.com/spf13/cobra v1.8.0 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/viper v1.15.0 // indirect diff --git a/go.sum b/go.sum index 20483e8f..ee757816 100644 --- a/go.sum +++ b/go.sum @@ -81,6 +81,8 @@ github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNg github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df h1:7RFfzj4SSt6nnvCPbCqijJi1nWCd+TqAT3bYCStRC18= github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df/go.mod h1:pSwJ0fSY5KhvocuWSx4fz3BA8OrA1bQn+K1Eli3BRwM= +github.com/argoproj/argo-rollouts v1.6.6 h1:JCJ0cGAwWkh2xCAHZ1OQmrobysRjCatmG9IZaLJpS1g= +github.com/argoproj/argo-rollouts v1.6.6/go.mod h1:X2kTiBaYCSounmw1kmONdIZTwJNzNQYC0SrXUgSw9UI= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 h1:DklsrG3dyBCFEj5IhUbnKptjxatkF07cF2ak3yi77so= @@ -173,8 +175,8 @@ github.com/evanphx/json-patch v5.7.0+incompatible h1:vgGkfT/9f8zE6tvSCe74nfpAVDQ github.com/evanphx/json-patch v5.7.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch/v5 v5.8.0 h1:lRj6N9Nci7MvzrXuX6HFzU8XjmhPiXPlsKEy1u0KQro= github.com/evanphx/json-patch/v5 v5.8.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ= -github.com/exponent-io/jsonpath v0.0.0-20151013193312-d6023ce2651d h1:105gxyaGwCFad8crR9dcMQWvV9Hvulu6hwUh4tWPJnM= -github.com/exponent-io/jsonpath v0.0.0-20151013193312-d6023ce2651d/go.mod h1:ZZMPRZwes7CROmyNKgQzC3XPs6L/G2EJLHddWejkmf4= +github.com/exponent-io/jsonpath v0.0.0-20210407135951-1de76d718b3f h1:Wl78ApPPB2Wvf/TIe2xdyJxTlb6obmF18d8QdkxNDu4= +github.com/exponent-io/jsonpath v0.0.0-20210407135951-1de76d718b3f/go.mod h1:OSYXu++VVOHnXeitef/D8n/6y4QV8uLHSFXX4NeXMGc= github.com/fatih/camelcase v1.0.0 h1:hxNvNX/xYBp0ovncs8WyWZrOrpBNub/JfaMvbURyft8= github.com/fatih/camelcase v1.0.0/go.mod h1:yN2Sb0lFhZJUdVvtELVWefmrXpuZESvPmqwoZc+/fpc= github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= @@ -289,8 +291,8 @@ github.com/gomodule/redigo v1.8.2 h1:H5XSIre1MB5NbPYFp+i1NBbb5qN1W8Y8YAQoAYbkm8k github.com/gomodule/redigo v1.8.2/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= -github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4= -github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA= +github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU= +github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= github.com/google/cel-go v0.17.7 h1:6ebJFzu1xO2n7TLtN+UBqShGBhlD85bhvglh5DpcfqQ= github.com/google/cel-go v0.17.7/go.mod h1:HXZKzB0LXqer5lHHgfWAnlYwJaQBDKMjxjulNQzhwhY= github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= @@ -345,8 +347,8 @@ github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWm github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gosuri/uitable v0.0.4 h1:IG2xLKRvErL3uhY6e1BylFzG+aJiwQviDDTfOKeKTpY= github.com/gosuri/uitable v0.0.4/go.mod h1:tKR86bXuXPZazfOTG1FIzvjIdXzd0mo4Vtn16vt0PJo= -github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 h1:pdN6V1QBWetyv/0+wjACpqVH+eVULgEjkurDLq3goeM= -github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= +github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 h1:+ngKgrYPPJrOjhax5N+uePQ0Fh1Z7PheYoUI/0nzkPA= +github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo= @@ -526,8 +528,8 @@ github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg= -github.com/pluralsh/console-client-go v0.5.18 h1:uwYsoGaggvi3uPZYL/+qdhvgl73sGBiuVUfQGAC/J4c= -github.com/pluralsh/console-client-go v0.5.18/go.mod h1:eyCiLA44YbXiYyJh8303jk5JdPkt9McgCo5kBjk4lKo= +github.com/pluralsh/console-client-go v0.7.0 h1:7BcvftmKhssYd8F06NGsWXKxs7O3K8gQDYrQebvbmHE= +github.com/pluralsh/console-client-go v0.7.0/go.mod h1:eyCiLA44YbXiYyJh8303jk5JdPkt9McgCo5kBjk4lKo= github.com/pluralsh/controller-reconcile-helper v0.0.4 h1:1o+7qYSyoeqKFjx+WgQTxDz4Q2VMpzprJIIKShxqG0E= github.com/pluralsh/controller-reconcile-helper v0.0.4/go.mod h1:AfY0gtteD6veBjmB6jiRx/aR4yevEf6K0M13/pGan/s= github.com/pluralsh/gophoenix v0.1.3-0.20231201014135-dff1b4309e34 h1:ab2PN+6if/Aq3/sJM0AVdy1SYuMAnq4g20VaKhTm/Bw= @@ -590,8 +592,8 @@ github.com/sosodev/duration v1.2.0/go.mod h1:RQIBBX0+fMLc/D9+Jb/fwvVmo0eZvDDEERA github.com/spf13/afero v1.9.3 h1:41FoI0fD7OR7mGcKE/aOiLkGreyf8ifIOQmJANWogMk= github.com/spf13/afero v1.9.3/go.mod h1:iUV7ddyEEZPO5gA3zD4fJt6iStLlL+Lg4m2cihcDf8Y= github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= -github.com/spf13/cast v1.5.0 h1:rj3WzYc11XZaIZMPKmwP96zkFEnnAmV8s6XbB2aY32w= -github.com/spf13/cast v1.5.0/go.mod h1:SpXXQ5YoyJw6s3/6cMTQuxvgRl3PCJiyaX9p6b155UU= +github.com/spf13/cast v1.5.1 h1:R+kOtfhWQE6TVQzY+4D7wJLBgkdVasCEFxSUBYBYIlA= +github.com/spf13/cast v1.5.1/go.mod h1:b9PdjNptOpzXr7Rq1q9gJML/2cdGQAo69NKzQ10KN48= github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0= github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho= github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmqnqMYADFk= diff --git a/internal/controller/argorollout_controller.go b/internal/controller/argorollout_controller.go new file mode 100644 index 00000000..f286b051 --- /dev/null +++ b/internal/controller/argorollout_controller.go @@ -0,0 +1,160 @@ +package controller + +import ( + "context" + "fmt" + "net/http" + "net/url" + "time" + + clientset "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned/typed/rollouts/v1alpha1" + "github.com/argoproj/argo-rollouts/pkg/kubectl-argo-rollouts/cmd/abort" + + "github.com/argoproj/argo-rollouts/pkg/apis/rollouts" + rolloutv1alpha1 "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1" + roclientset "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned" + console "github.com/pluralsh/console-client-go" + "github.com/pluralsh/deployment-operator/internal/utils" + "github.com/pluralsh/deployment-operator/pkg/client" + "github.com/pluralsh/deployment-operator/pkg/controller/service" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" + ctrl "sigs.k8s.io/controller-runtime" + k8sClient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" +) + +const ( + inventoryAnnotationName = "config.k8s.io/owning-inventory" + closed = "closed" +) + +var requeueRollout = ctrl.Result{RequeueAfter: time.Second * 5} + +// ArgoRolloutReconciler reconciles a Argo Rollout custom resource. +type ArgoRolloutReconciler struct { + k8sClient.Client + Scheme *runtime.Scheme + ConsoleClient client.Client + ConsoleURL string + HttpClient *http.Client + ArgoClientSet roclientset.Interface + DynamicClient dynamic.Interface + KubeClient kubernetes.Interface + SvcReconciler *service.ServiceReconciler +} + +// Reconcile Argo Rollout custom resources to ensure that Console stays in sync with Kubernetes cluster. +func (r *ArgoRolloutReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := log.FromContext(ctx) + + // Read resource from Kubernetes cluster. + rollout := &rolloutv1alpha1.Rollout{} + if err := r.Get(ctx, req.NamespacedName, rollout); err != nil { + logger.Error(err, "unable to fetch rollout") + return ctrl.Result{}, k8sClient.IgnoreNotFound(err) + } + + if !rollout.DeletionTimestamp.IsZero() { + return ctrl.Result{}, nil + } + + serviceID, ok := rollout.Annotations[inventoryAnnotationName] + if !ok { + return ctrl.Result{}, nil + } + if serviceID == "" { + return ctrl.Result{}, fmt.Errorf("the service ID from the inventory annotation is empty") + } + service, err := r.ConsoleClient.GetService(serviceID) + if err != nil { + return ctrl.Result{}, err + } + consoleURL, err := sanitizeURL(r.ConsoleURL) + if err != nil { + return ctrl.Result{}, err + } + if rollout.Status.Phase == rolloutv1alpha1.RolloutPhasePaused { + // wait until the agent will change component status + if !hasPausedRolloutComponent(service) { + return requeueRollout, nil + } + + rolloutIf := r.ArgoClientSet.ArgoprojV1alpha1().Rollouts(rollout.Namespace) + promoteURL := fmt.Sprintf("%s/ext/v1/gate/%s", consoleURL, serviceID) + rollbackURL := fmt.Sprintf("%s/ext/v1/rollback/%s", consoleURL, serviceID) + + promoteResponse, err := r.get(promoteURL) + if err != nil { + return ctrl.Result{}, err + } + if promoteResponse == http.StatusOK { + return ctrl.Result{}, r.promote(ctx, rolloutIf, rollout, serviceID) + } + rollbackResponse, err := r.get(rollbackURL) + if err != nil { + return ctrl.Result{}, err + } + if rollbackResponse == http.StatusOK { + return ctrl.Result{}, r.rollback(rolloutIf, rollout) + } + return requeueRollout, nil + } + return ctrl.Result{}, nil +} + +func (r *ArgoRolloutReconciler) promote(ctx context.Context, rolloutIf clientset.RolloutInterface, rollout *rolloutv1alpha1.Rollout, svcId string) error { + if _, err := utils.PromoteRollout(ctx, rolloutIf, rollout.Name); err != nil { + return err + } + + if r.SvcReconciler != nil { + r.SvcReconciler.SvcQueue.AddRateLimited(svcId) + } + return nil +} + +func (r *ArgoRolloutReconciler) rollback(rolloutIf clientset.RolloutInterface, rollout *rolloutv1alpha1.Rollout) error { + if _, err := abort.AbortRollout(rolloutIf, rollout.Name); err != nil { + return err + } + return nil +} + +func hasPausedRolloutComponent(service *console.GetServiceDeploymentForAgent_ServiceDeployment) bool { + for _, component := range service.Components { + if component.Kind == rollouts.RolloutKind { + if component.State != nil && *component.State == console.ComponentStatePaused { + return true + } + } + } + return false +} + +func sanitizeURL(consoleURL string) (string, error) { + u, err := url.Parse(consoleURL) + if err != nil { + return "", err + } + return fmt.Sprintf("%s://%s", u.Scheme, u.Host), nil +} + +// SetupWithManager sets up the controller with the Manager. +func (r *ArgoRolloutReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&rolloutv1alpha1.Rollout{}). + Complete(r) +} + +func (r *ArgoRolloutReconciler) get(url string) (int, error) { + // Make the HTTP request + resp, err := r.HttpClient.Get(url) + if err != nil { + return http.StatusInternalServerError, err + } + defer resp.Body.Close() + + return resp.StatusCode, nil +} diff --git a/internal/utils/argorollout.go b/internal/utils/argorollout.go new file mode 100644 index 00000000..8f7b3480 --- /dev/null +++ b/internal/utils/argorollout.go @@ -0,0 +1,105 @@ +package utils + +import ( + "context" + "fmt" + + "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1" + clientset "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned/typed/rollouts/v1alpha1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" +) + +const ( + unpausePatch = `{"spec":{"paused":false}}` + clearPauseConditionsPatch = `{"status":{"pauseConditions":null}}` + clearPauseConditionsAndControllerPausePatch = `{"status":{"pauseConditions":null, "controllerPause":false, "currentStepIndex":%d}}` + unpauseAndClearPauseConditionsPatch = `{"spec":{"paused":false},"status":{"pauseConditions":null}}` + clearPauseConditionsPatchWithStep = `{"status":{"pauseConditions":null, "currentStepIndex":%d}}` + unpauseAndClearPauseConditionsPatchWithStep = `{"spec":{"paused":false},"status":{"pauseConditions":null, "currentStepIndex":%d}}` +) + +// PromoteRollout promotes a rollout to the next step, or to end of all steps +func PromoteRollout(ctx context.Context, rolloutIf clientset.RolloutInterface, name string) (*v1alpha1.Rollout, error) { + ro, err := rolloutIf.Get(ctx, name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + + specPatch, statusPatch, unifiedPatch := getPatches(ro) + if statusPatch != nil { + ro, err = rolloutIf.Patch(ctx, name, types.MergePatchType, statusPatch, metav1.PatchOptions{}, "status") + if err != nil { + // NOTE: in the future, we can simply return error here, if we wish to drop support for v0.9 + if !k8serrors.IsNotFound(err) { + return nil, err + } + // we got a NotFound error. status subresource is not being used, so perform unifiedPatch + specPatch = unifiedPatch + } + } + if specPatch != nil { + ro, err = rolloutIf.Patch(ctx, name, types.MergePatchType, specPatch, metav1.PatchOptions{}) + if err != nil { + return nil, err + } + } + return ro, nil +} + +func isInconclusive(rollout *v1alpha1.Rollout) bool { + return rollout.Spec.Strategy.Canary != nil && rollout.Status.Canary.CurrentStepAnalysisRunStatus != nil && rollout.Status.Canary.CurrentStepAnalysisRunStatus.Status == v1alpha1.AnalysisPhaseInconclusive +} + +func getPatches(rollout *v1alpha1.Rollout) ([]byte, []byte, []byte) { + var specPatch, statusPatch, unifiedPatch []byte + + unifiedPatch = []byte(unpauseAndClearPauseConditionsPatch) + if rollout.Spec.Paused { + specPatch = []byte(unpausePatch) + } + // in case if canary rollout in inconclusive state, we want to unset controller pause , clean pause conditions and increment step index + // so that rollout can proceed to next step + // without such patch, rollout will be stuck in inconclusive state in case if next step is pause step + switch { + case isInconclusive(rollout) && len(rollout.Status.PauseConditions) > 0 && rollout.Status.ControllerPause: + _, index := GetCurrentCanaryStep(rollout) + if index != nil { + if *index < int32(len(rollout.Spec.Strategy.Canary.Steps)) { + *index++ + } + statusPatch = []byte(fmt.Sprintf(clearPauseConditionsAndControllerPausePatch, *index)) + } + case len(rollout.Status.PauseConditions) > 0: + statusPatch = []byte(clearPauseConditionsPatch) + case rollout.Spec.Strategy.Canary != nil: + _, index := GetCurrentCanaryStep(rollout) + // At this point, the controller knows that the rollout is a canary with steps and GetCurrentCanaryStep returns 0 if + // the index is not set in the rollout + if index != nil { + if *index < int32(len(rollout.Spec.Strategy.Canary.Steps)) { + *index++ + } + statusPatch = []byte(fmt.Sprintf(clearPauseConditionsPatchWithStep, *index)) + unifiedPatch = []byte(fmt.Sprintf(unpauseAndClearPauseConditionsPatchWithStep, *index)) + } + } + return specPatch, statusPatch, unifiedPatch +} + +// GetCurrentCanaryStep returns the current canary step. If there are no steps or the rollout +// has already executed the last step, the func returns nil +func GetCurrentCanaryStep(rollout *v1alpha1.Rollout) (*v1alpha1.CanaryStep, *int32) { + if rollout.Spec.Strategy.Canary == nil || len(rollout.Spec.Strategy.Canary.Steps) == 0 { + return nil, nil + } + currentStepIndex := int32(0) + if rollout.Status.CurrentStepIndex != nil { + currentStepIndex = *rollout.Status.CurrentStepIndex + } + if len(rollout.Spec.Strategy.Canary.Steps) <= int(currentStepIndex) { + return nil, ¤tStepIndex + } + return &rollout.Spec.Strategy.Canary.Steps[currentStepIndex], ¤tStepIndex +} diff --git a/pkg/controller/service/health.go b/pkg/controller/service/health.go index 61a9048c..247072ea 100644 --- a/pkg/controller/service/health.go +++ b/pkg/controller/service/health.go @@ -5,6 +5,7 @@ import ( "fmt" "strings" + rolloutv1alpha1 "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1" flaggerv1beta1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1" "github.com/pluralsh/deployment-operator/pkg/lua" appsv1 "k8s.io/api/apps/v1" @@ -98,6 +99,38 @@ func getHPAHealth(obj *unstructured.Unstructured) (*HealthStatus, error) { } } +func getArgoRolloutHealth(obj *unstructured.Unstructured) (*HealthStatus, error) { + var argo rolloutv1alpha1.Rollout + var msg string + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, &argo); err != nil { + return nil, err + } + switch argo.Status.Phase { + case rolloutv1alpha1.RolloutPhasePaused: + return &HealthStatus{ + Status: HealthStatusPaused, + Message: argo.Status.Message, + }, nil + + case rolloutv1alpha1.RolloutPhaseDegraded: + return &HealthStatus{ + Status: HealthStatusDegraded, + Message: argo.Status.Message, + }, nil + + case rolloutv1alpha1.RolloutPhaseHealthy: + return &HealthStatus{ + Status: HealthStatusHealthy, + Message: argo.Status.Message, + }, nil + default: + return &HealthStatus{ + Status: HealthStatusProgressing, + Message: msg, + }, nil + } +} + func getCanaryHealth(obj *unstructured.Unstructured) (*HealthStatus, error) { var canary flaggerv1beta1.Canary if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, &canary); err != nil { diff --git a/pkg/controller/service/reconciler_status.go b/pkg/controller/service/reconciler_status.go index dff51bae..c04bc6bb 100644 --- a/pkg/controller/service/reconciler_status.go +++ b/pkg/controller/service/reconciler_status.go @@ -5,15 +5,14 @@ import ( "fmt" "strings" + "github.com/argoproj/argo-rollouts/pkg/apis/rollouts" console "github.com/pluralsh/console-client-go" + "github.com/pluralsh/deployment-operator/pkg/manifests" "github.com/samber/lo" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" - "sigs.k8s.io/cli-utils/pkg/print/stats" - - "github.com/pluralsh/deployment-operator/pkg/manifests" - "sigs.k8s.io/cli-utils/pkg/apply/event" + "sigs.k8s.io/cli-utils/pkg/print/stats" "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -98,6 +97,10 @@ func (s *ServiceReconciler) GetHealthCheckFunc(gvk schema.GroupVersionKind) func if gvk.Kind == CanaryKind { return getCanaryHealth } + case rollouts.Group: + if gvk.Kind == rollouts.RolloutKind { + return getArgoRolloutHealth + } case "autoscaling": if gvk.Kind == HorizontalPodAutoscalerKind { return getHPAHealth diff --git a/pkg/controller/service/status_collector.go b/pkg/controller/service/status_collector.go index 19ba2a4a..1d217e4b 100644 --- a/pkg/controller/service/status_collector.go +++ b/pkg/controller/service/status_collector.go @@ -108,13 +108,20 @@ func (sc *serviceComponentsStatusCollector) fromSyncResult(e event.StatusEvent, version = v } + synced := e.PollResourceInfo.Status == status.CurrentStatus + + if e.PollResourceInfo.Status == status.UnknownStatus { + if sc.reconciler.toStatus(e.Resource) != nil { + synced = *sc.reconciler.toStatus(e.Resource) == console.ComponentStateRunning + } + } return &console.ComponentAttributes{ Group: gvk.Group, Kind: gvk.Kind, Namespace: e.Resource.GetNamespace(), Name: e.Resource.GetName(), Version: version, - Synced: e.PollResourceInfo.Status == status.CurrentStatus, + Synced: synced, State: sc.reconciler.toStatus(e.Resource), } } From 4625c2f9f7633f4f82a35349e70f802c19dd8bd1 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Mon, 17 Jun 2024 15:37:43 -0400 Subject: [PATCH 6/8] Updated chart to release v0.4.36 (#224) Co-authored-by: michaeljguarino --- charts/deployment-operator/Chart.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/charts/deployment-operator/Chart.yaml b/charts/deployment-operator/Chart.yaml index b4f5407d..3a6418d2 100644 --- a/charts/deployment-operator/Chart.yaml +++ b/charts/deployment-operator/Chart.yaml @@ -1,8 +1,8 @@ apiVersion: v2 name: deployment-operator description: creates a new instance of the plural deployment operator -appVersion: 0.4.35 -version: 0.4.35 +appVersion: 0.4.36 +version: 0.4.36 maintainers: - name: Plural url: https://www.plural.sh From 8db49ca097b9197a3951ddd07b628f624ae0a1ae Mon Sep 17 00:00:00 2001 From: Lukasz Zajaczkowski Date: Tue, 18 Jun 2024 10:37:11 +0200 Subject: [PATCH 7/8] feat: make a configurable poll interval (#225) * make a configurable poll interval * add comment --- cmd/agent/agent.go | 6 ++++-- pkg/controller/controller.go | 3 +++ pkg/controller/controller_manager.go | 7 ++++++- pkg/controller/namespaces/reconciler.go | 4 ++++ pkg/controller/pipelinegates/reconciler.go | 8 +++++++- pkg/controller/restore/reconciler.go | 4 ++++ pkg/controller/service/reconciler.go | 4 ++++ pkg/controller/stacks/job_test.go | 2 +- pkg/controller/stacks/reconciler.go | 14 ++++++++++---- pkg/controller/stacks/reconciler_test.go | 12 ++++++------ pkg/test/mocks/Client_mock.go | 20 ++++++++++---------- 11 files changed, 59 insertions(+), 25 deletions(-) diff --git a/cmd/agent/agent.go b/cmd/agent/agent.go index adc68632..b54272fd 100644 --- a/cmd/agent/agent.go +++ b/cmd/agent/agent.go @@ -19,6 +19,8 @@ import ( "github.com/pluralsh/deployment-operator/pkg/controller/service" ) +const pollInterval = time.Second * 30 + func runAgent(opt *options, config *rest.Config, ctx context.Context, k8sClient ctrclient.Client) (*controller.ControllerManager, *service.ServiceReconciler, *pipelinegates.GateReconciler) { r, err := time.ParseDuration(opt.refreshInterval) if err != nil { @@ -48,7 +50,7 @@ func runAgent(opt *options, config *rest.Config, ctx context.Context, k8sClient Do: sr, Queue: sr.SvcQueue, }) - gr, err := pipelinegates.NewGateReconciler(mgr.GetClient(), k8sClient, config, r, opt.clusterId) + gr, err := pipelinegates.NewGateReconciler(mgr.GetClient(), k8sClient, config, r, pollInterval, opt.clusterId) if err != nil { setupLog.Errorw("unable to create gate reconciler", "error", err) os.Exit(1) @@ -79,7 +81,7 @@ func runAgent(opt *options, config *rest.Config, ctx context.Context, k8sClient os.Exit(1) } - s := stacks.NewStackReconciler(mgr.GetClient(), k8sClient, r, namespace, opt.consoleUrl, opt.deployToken) + s := stacks.NewStackReconciler(mgr.GetClient(), k8sClient, r, pollInterval, namespace, opt.consoleUrl, opt.deployToken) mgr.AddController(&controller.Controller{ Name: "Stack Controller", Do: s, diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 2cd78522..a9fda51f 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -33,6 +33,9 @@ type Reconciler interface { // ShutdownQueue containing Console resources. ShutdownQueue() + + // GetPollInterval returns custom poll interval. If 0 then controller manager use default from the options. + GetPollInterval() time.Duration } type Controller struct { diff --git a/pkg/controller/controller_manager.go b/pkg/controller/controller_manager.go index 6278537c..9a819d6b 100644 --- a/pkg/controller/controller_manager.go +++ b/pkg/controller/controller_manager.go @@ -83,8 +83,13 @@ func (cm *ControllerManager) Start() error { go func() { defer controller.Do.ShutdownQueue() defer controller.Do.WipeCache() + + pollInterval := cm.Refresh + if controllerPollInterval := controller.Do.GetPollInterval(); controllerPollInterval > 0 { + pollInterval = controllerPollInterval + } //nolint:all - _ = wait.PollImmediateInfinite(cm.Refresh, func() (done bool, err error) { + _ = wait.PollImmediateInfinite(pollInterval, func() (done bool, err error) { return controller.Do.Poll(cm.ctx) }) }() diff --git a/pkg/controller/namespaces/reconciler.go b/pkg/controller/namespaces/reconciler.go index 67a7381d..3d47f059 100644 --- a/pkg/controller/namespaces/reconciler.go +++ b/pkg/controller/namespaces/reconciler.go @@ -41,6 +41,10 @@ func NewNamespaceReconciler(consoleClient client.Client, k8sClient ctrlclient.Cl } } +func (n *NamespaceReconciler) GetPollInterval() time.Duration { + return 0 // use default poll interval +} + func (n *NamespaceReconciler) GetPublisher() (string, websocket.Publisher) { return "namespace.event", &socketPublisher{ restoreQueue: n.NamespaceQueue, diff --git a/pkg/controller/pipelinegates/reconciler.go b/pkg/controller/pipelinegates/reconciler.go index c94663ac..2e8fa9b2 100644 --- a/pkg/controller/pipelinegates/reconciler.go +++ b/pkg/controller/pipelinegates/reconciler.go @@ -36,9 +36,10 @@ type GateReconciler struct { discoveryClient *discovery.DiscoveryClient pinger *ping.Pinger operatorNamespace string + PollInterval time.Duration } -func NewGateReconciler(consoleClient client.Client, k8sClient ctrlclient.Client, config *rest.Config, refresh time.Duration, clusterId string) (*GateReconciler, error) { +func NewGateReconciler(consoleClient client.Client, k8sClient ctrlclient.Client, config *rest.Config, refresh, pollInterval time.Duration, clusterId string) (*GateReconciler, error) { utils.DisableClientLimits(config) discoveryClient, err := discovery.NewDiscoveryClientForConfig(config) @@ -74,9 +75,14 @@ func NewGateReconciler(consoleClient client.Client, k8sClient ctrlclient.Client, discoveryClient: discoveryClient, pinger: ping.New(consoleClient, discoveryClient, f), operatorNamespace: namespace, + PollInterval: pollInterval, }, nil } +func (s *GateReconciler) GetPollInterval() time.Duration { + return s.PollInterval +} + func (s *GateReconciler) WipeCache() { s.GateCache.Wipe() } diff --git a/pkg/controller/restore/reconciler.go b/pkg/controller/restore/reconciler.go index 4b8bc5a1..e3e7b9df 100644 --- a/pkg/controller/restore/reconciler.go +++ b/pkg/controller/restore/reconciler.go @@ -63,6 +63,10 @@ func NewRestoreReconciler(consoleClient client.Client, k8sClient ctrlclient.Clie } } +func (s *RestoreReconciler) GetPollInterval() time.Duration { + return 0 // use default poll interval +} + func (s *RestoreReconciler) GetPublisher() (string, websocket.Publisher) { return "restore.event", &socketPublisher{ restoreQueue: s.RestoreQueue, diff --git a/pkg/controller/service/reconciler.go b/pkg/controller/service/reconciler.go index 7935ac64..43573d9b 100644 --- a/pkg/controller/service/reconciler.go +++ b/pkg/controller/service/reconciler.go @@ -155,6 +155,10 @@ func CapabilitiesAPIVersions(discoveryClient *discovery.DiscoveryClient) error { return nil } +func (s *ServiceReconciler) GetPollInterval() time.Duration { + return 0 // use default poll interval +} + func (s *ServiceReconciler) GetPublisher() (string, websocket.Publisher) { return "service.event", &socketPublisher{ svcQueue: s.SvcQueue, diff --git a/pkg/controller/stacks/job_test.go b/pkg/controller/stacks/job_test.go index 600edee6..b6ba519f 100644 --- a/pkg/controller/stacks/job_test.go +++ b/pkg/controller/stacks/job_test.go @@ -15,7 +15,7 @@ func TestGetDefaultContainerImage(t *testing.T) { var kClient client.Client fakeConsoleClient := mocks.NewClientMock(t) namespace := "default" - reconciler := NewStackReconciler(fakeConsoleClient, kClient, time.Minute, namespace, "", "") + reconciler := NewStackReconciler(fakeConsoleClient, kClient, time.Minute, 0, namespace, "", "") run := &console.StackRunFragment{ Type: console.StackTypeTerraform, Configuration: &console.StackConfigurationFragment{ diff --git a/pkg/controller/stacks/reconciler.go b/pkg/controller/stacks/reconciler.go index 355eff78..ef0cf125 100644 --- a/pkg/controller/stacks/reconciler.go +++ b/pkg/controller/stacks/reconciler.go @@ -25,9 +25,10 @@ type StackReconciler struct { Namespace string ConsoleURL string DeployToken string + PollInterval time.Duration } -func NewStackReconciler(consoleClient client.Client, k8sClient ctrlclient.Client, refresh time.Duration, namespace, consoleURL, deployToken string) *StackReconciler { +func NewStackReconciler(consoleClient client.Client, k8sClient ctrlclient.Client, refresh, pollInterval time.Duration, namespace, consoleURL, deployToken string) *StackReconciler { return &StackReconciler{ ConsoleClient: consoleClient, K8sClient: k8sClient, @@ -35,12 +36,17 @@ func NewStackReconciler(consoleClient client.Client, k8sClient ctrlclient.Client StackCache: client.NewCache[console.StackRunFragment](refresh, func(id string) (*console.StackRunFragment, error) { return consoleClient.GetStackRun(id) }), - Namespace: namespace, - ConsoleURL: consoleURL, - DeployToken: deployToken, + Namespace: namespace, + ConsoleURL: consoleURL, + DeployToken: deployToken, + PollInterval: pollInterval, } } +func (r *StackReconciler) GetPollInterval() time.Duration { + return r.PollInterval +} + func (r *StackReconciler) GetPublisher() (string, websocket.Publisher) { return "stack.run.event", &socketPublisher{ stackRunQueue: r.StackQueue, diff --git a/pkg/controller/stacks/reconciler_test.go b/pkg/controller/stacks/reconciler_test.go index 06e04853..6be22b03 100644 --- a/pkg/controller/stacks/reconciler_test.go +++ b/pkg/controller/stacks/reconciler_test.go @@ -73,7 +73,7 @@ var _ = Describe("Reconciler", Ordered, func() { GqlErrors: &gqlerror.List{gqlerror.Errorf(errors2.ErrorNotFound.String())}, }) - reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, time.Minute, namespace, "", "") + reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, time.Minute, 0, namespace, "", "") _, err := reconciler.Reconcile(ctx, stackRunId) Expect(err).NotTo(HaveOccurred()) @@ -85,7 +85,7 @@ var _ = Describe("Reconciler", Ordered, func() { GqlErrors: &gqlerror.List{gqlerror.Errorf("unknown error")}, }) - reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, time.Minute, namespace, "", "") + reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, time.Minute, 0, namespace, "", "") _, err := reconciler.Reconcile(ctx, stackRunId) Expect(err).To(HaveOccurred()) @@ -100,7 +100,7 @@ var _ = Describe("Reconciler", Ordered, func() { Status: console.StackStatusPending, }, nil) - reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, time.Minute, namespace, "", "") + reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, time.Minute, 0, namespace, "", "") _, err := reconciler.Reconcile(ctx, stackRunId) Expect(err).NotTo(HaveOccurred()) @@ -118,7 +118,7 @@ var _ = Describe("Reconciler", Ordered, func() { fakeConsoleClient.On("GetStackRun", mock.Anything).Return(stackRun, nil) fakeConsoleClient.On("UpdateStackRun", mock.Anything, mock.Anything).Return(nil) - reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, time.Minute, namespace, "", "") + reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, time.Minute, 0, namespace, "", "") _, err := reconciler.Reconcile(ctx, stackRunId) Expect(err).NotTo(HaveOccurred()) @@ -157,7 +157,7 @@ var _ = Describe("Reconciler", Ordered, func() { fakeConsoleClient.On("GetStackRun", mock.Anything).Return(stackRun, nil) fakeConsoleClient.On("UpdateStackRun", mock.Anything, mock.Anything).Return(nil) - reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, time.Minute, namespace, "", "") + reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, time.Minute, 0, namespace, "", "") _, err := reconciler.Reconcile(ctx, stackRunId) Expect(err).NotTo(HaveOccurred()) @@ -213,7 +213,7 @@ var _ = Describe("Reconciler", Ordered, func() { fakeConsoleClient.On("GetStackRun", mock.Anything).Return(stackRun, nil) fakeConsoleClient.On("UpdateStackRun", mock.Anything, mock.Anything).Return(nil) - reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, time.Minute, namespace, "", "") + reconciler := stacks.NewStackReconciler(fakeConsoleClient, kClient, time.Minute, 0, namespace, "", "") _, err = reconciler.Reconcile(ctx, stackRunId) Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/test/mocks/Client_mock.go b/pkg/test/mocks/Client_mock.go index eb2be1d1..dbb749fe 100644 --- a/pkg/test/mocks/Client_mock.go +++ b/pkg/test/mocks/Client_mock.go @@ -4,11 +4,11 @@ package mocks import ( gqlclient "github.com/pluralsh/console-client-go" - "github.com/stretchr/testify/mock" + mock "github.com/stretchr/testify/mock" - stackrun "github.com/pluralsh/deployment-operator/pkg/harness/stackrun/v1" + v1 "github.com/pluralsh/deployment-operator/pkg/harness/stackrun/v1" - "github.com/pluralsh/deployment-operator/api/v1alpha1" + v1alpha1 "github.com/pluralsh/deployment-operator/api/v1alpha1" ) // ClientMock is an autogenerated mock type for the Client type @@ -735,23 +735,23 @@ func (_c *ClientMock_GetStackRun_Call) RunAndReturn(run func(string) (*gqlclient } // GetStackRunBase provides a mock function with given fields: id -func (_m *ClientMock) GetStackRunBase(id string) (*stackrun.StackRun, error) { +func (_m *ClientMock) GetStackRunBase(id string) (*v1.StackRun, error) { ret := _m.Called(id) if len(ret) == 0 { panic("no return value specified for GetStackRunBase") } - var r0 *stackrun.StackRun + var r0 *v1.StackRun var r1 error - if rf, ok := ret.Get(0).(func(string) (*stackrun.StackRun, error)); ok { + if rf, ok := ret.Get(0).(func(string) (*v1.StackRun, error)); ok { return rf(id) } - if rf, ok := ret.Get(0).(func(string) *stackrun.StackRun); ok { + if rf, ok := ret.Get(0).(func(string) *v1.StackRun); ok { r0 = rf(id) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*stackrun.StackRun) + r0 = ret.Get(0).(*v1.StackRun) } } @@ -782,12 +782,12 @@ func (_c *ClientMock_GetStackRunBase_Call) Run(run func(id string)) *ClientMock_ return _c } -func (_c *ClientMock_GetStackRunBase_Call) Return(_a0 *stackrun.StackRun, _a1 error) *ClientMock_GetStackRunBase_Call { +func (_c *ClientMock_GetStackRunBase_Call) Return(_a0 *v1.StackRun, _a1 error) *ClientMock_GetStackRunBase_Call { _c.Call.Return(_a0, _a1) return _c } -func (_c *ClientMock_GetStackRunBase_Call) RunAndReturn(run func(string) (*stackrun.StackRun, error)) *ClientMock_GetStackRunBase_Call { +func (_c *ClientMock_GetStackRunBase_Call) RunAndReturn(run func(string) (*v1.StackRun, error)) *ClientMock_GetStackRunBase_Call { _c.Call.Return(run) return _c } From afea7487db945ae836ace82e40939d4630c1d30c Mon Sep 17 00:00:00 2001 From: Sebastian Florek Date: Tue, 18 Jun 2024 12:54:48 -0500 Subject: [PATCH 8/8] feat: integrate prometheus metrics handler and expose basic metrics (#226) * feat: integrate prometheus metrics handler and expose basic metrics * fix obsolete metric record * fix lint * improve prometheus metrics recording * fix lint * small optimization --- cmd/agent/metrics.go | 26 +++++++ go.mod | 7 +- go.sum | 14 ++-- internal/metrics/metrics_prometheus.go | 73 +++++++++++++++++++ internal/metrics/metrics_types.go | 27 +++++++ pkg/controller/namespaces/socket_publisher.go | 3 +- pkg/controller/service/reconciler.go | 6 +- pkg/controller/stacks/job.go | 2 + pkg/controller/stacks/socket_publisher.go | 3 +- 9 files changed, 146 insertions(+), 15 deletions(-) create mode 100644 cmd/agent/metrics.go create mode 100644 internal/metrics/metrics_prometheus.go create mode 100644 internal/metrics/metrics_types.go diff --git a/cmd/agent/metrics.go b/cmd/agent/metrics.go new file mode 100644 index 00000000..323cd994 --- /dev/null +++ b/cmd/agent/metrics.go @@ -0,0 +1,26 @@ +package main + +import ( + "fmt" + "net/http" + + "github.com/prometheus/client_golang/prometheus/promhttp" + "k8s.io/klog/v2" +) + +const ( + prometheusMetricsPath = "/metrics" + prometheusMetricsPort = 8000 +) + +func init() { + go initPrometheusMetrics() +} + +func initPrometheusMetrics() { + http.Handle(prometheusMetricsPath, promhttp.Handler()) + + if err := http.ListenAndServe(fmt.Sprintf(":%d", prometheusMetricsPort), nil); err != nil { + klog.Fatal(err) + } +} diff --git a/go.mod b/go.mod index 2d461195..8d707031 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( github.com/pluralsh/controller-reconcile-helper v0.0.4 github.com/pluralsh/gophoenix v0.1.3-0.20231201014135-dff1b4309e34 github.com/pluralsh/polly v0.1.10 + github.com/prometheus/client_golang v1.19.1 github.com/samber/lo v1.39.0 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.9.0 @@ -150,7 +151,6 @@ require ( github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.13 // indirect - github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect @@ -172,9 +172,8 @@ require ( github.com/pelletier/go-toml/v2 v2.0.6 // indirect github.com/peterbourgon/diskv v2.0.1+incompatible // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/prometheus/client_golang v1.18.0 // indirect github.com/prometheus/client_model v0.5.0 // indirect - github.com/prometheus/common v0.45.0 // indirect + github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/rivo/uniseg v0.4.7 // indirect github.com/rs/zerolog v1.29.0 // indirect @@ -207,7 +206,7 @@ require ( golang.org/x/crypto v0.21.0 // indirect golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect golang.org/x/mod v0.16.0 // indirect - golang.org/x/oauth2 v0.14.0 // indirect + golang.org/x/oauth2 v0.16.0 // indirect golang.org/x/sync v0.6.0 // indirect golang.org/x/sys v0.18.0 // indirect golang.org/x/term v0.18.0 // indirect diff --git a/go.sum b/go.sum index ee757816..6195291d 100644 --- a/go.sum +++ b/go.sum @@ -452,8 +452,6 @@ github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A github.com/mattn/go-sqlite3 v1.14.15 h1:vfoHhTN1af61xCRSWzFIWzx2YskyMTwHLrExkBOjvxI= github.com/mattn/go-sqlite3 v1.14.15/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= -github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg= -github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k= github.com/miekg/dns v1.1.48 h1:Ucfr7IIVyMBz4lRE8qmGUuZ4Wt3/ZGu9hmcMT3Uu4tQ= github.com/miekg/dns v1.1.48/go.mod h1:e3IlAVfNqAllflbibAZEWOXOQ+Ynzk/dDozDxY7XnME= github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db h1:62I3jR2EmQ4l5rM/4FEfDWcRD+abF5XlKShorW5LRoQ= @@ -543,8 +541,8 @@ github.com/poy/onpar v1.1.2/go.mod h1:6X8FLNoxyr9kkmnlqpK6LSoiOtrO6MICtWwEuWkLjz github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.1.0/go.mod h1:I1FGZT9+L76gKKOs5djB6ezCbFQP1xR9D75/vuwEF3g= -github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk= -github.com/prometheus/client_golang v1.18.0/go.mod h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA= +github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE= +github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= @@ -552,8 +550,8 @@ github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cY github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.6.0/go.mod h1:eBmuwkDJBwy6iBfxCBob6t6dR6ENT/y+J+Zk0j9GMYc= -github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lneoxM= -github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY= +github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE= +github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.3/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ= @@ -797,8 +795,8 @@ golang.org/x/oauth2 v0.0.0-20200902213428-5d25da1a8d43/go.mod h1:KelEdhl1UZF7XfJ golang.org/x/oauth2 v0.0.0-20201109201403-9fd604954f58/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210218202405-ba52d332ba99/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= -golang.org/x/oauth2 v0.14.0 h1:P0Vrf/2538nmC0H+pEQ3MNFRRnVR7RlqyVw+bvm26z0= -golang.org/x/oauth2 v0.14.0/go.mod h1:lAtNWgaWfL4cm7j2OV8TxGi9Qb7ECORx8DktCY74OwM= +golang.org/x/oauth2 v0.16.0 h1:aDkGMBSYxElaoP81NpoUoz2oo2R2wHdZpGToUxfyQrQ= +golang.org/x/oauth2 v0.16.0/go.mod h1:hqZ+0LWXsiVoZpeld6jVt06P3adbS2Uu911W1SsJv2o= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/internal/metrics/metrics_prometheus.go b/internal/metrics/metrics_prometheus.go new file mode 100644 index 00000000..dfe3c4d1 --- /dev/null +++ b/internal/metrics/metrics_prometheus.go @@ -0,0 +1,73 @@ +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + recorder = (&prometheusRecorder{}).init() +) + +type prometheusRecorder struct { + discoveryAPICacheRefreshCounter prometheus.Counter + discoveryAPICacheRefreshErrorCounter prometheus.Counter + serviceReconciliationCounter *prometheus.CounterVec + serviceReconciliationErrorCounter *prometheus.CounterVec + stackRunJobsCreatedCounter prometheus.Counter +} + +func (in *prometheusRecorder) DiscoveryAPICacheRefresh(err error) { + if err != nil { + in.discoveryAPICacheRefreshErrorCounter.Inc() + return + } + + in.discoveryAPICacheRefreshCounter.Inc() +} + +func (in *prometheusRecorder) ServiceReconciliation(serviceID, serviceName string, err error) { + if err != nil { + in.serviceReconciliationErrorCounter.WithLabelValues(serviceID, serviceName).Inc() + return + } + + in.serviceReconciliationCounter.WithLabelValues(serviceID, serviceName).Inc() +} + +func (in *prometheusRecorder) StackRunJobCreation() { + in.stackRunJobsCreatedCounter.Inc() +} + +func (in *prometheusRecorder) init() Recorder { + in.discoveryAPICacheRefreshCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: DiscoveryAPICacheRefreshMetricName, + Help: DiscoveryAPICacheRefreshMetricDescription, + }) + + in.discoveryAPICacheRefreshErrorCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: DiscoveryAPICacheRefreshErrorMetricName, + Help: DiscoveryAPICacheRefreshErrorMetricDescription, + }) + + in.serviceReconciliationCounter = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: ServiceReconciliationMetricName, + Help: ServiceReconciliationMetricDescription, + }, []string{ServiceReconciliationMetricLabelServiceID, ServiceReconciliationMetricLabelServiceName}) + + in.serviceReconciliationErrorCounter = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: ServiceReconciliationErrorMetricName, + Help: ServiceReconciliationErrorMetricDescription, + }, []string{ServiceReconciliationMetricLabelServiceID, ServiceReconciliationMetricLabelServiceName}) + + in.stackRunJobsCreatedCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: StackRunJobsCreatedMetricName, + Help: StackRunJobsCreatedMetricDescription, + }) + + return in +} + +func Record() Recorder { + return recorder +} diff --git a/internal/metrics/metrics_types.go b/internal/metrics/metrics_types.go new file mode 100644 index 00000000..1cf352f6 --- /dev/null +++ b/internal/metrics/metrics_types.go @@ -0,0 +1,27 @@ +package metrics + +const ( + DiscoveryAPICacheRefreshMetricName = "agent_discoveryapi_cache_refresh_total" + DiscoveryAPICacheRefreshMetricDescription = "The total number of Discovery API cache refresh attempts" + + DiscoveryAPICacheRefreshErrorMetricName = "agent_discoveryapi_cache_refresh_error_total" + DiscoveryAPICacheRefreshErrorMetricDescription = "The total number of Discovery API cache refresh errors" + + ServiceReconciliationMetricName = "agent_service_reconciliations_total" + ServiceReconciliationMetricDescription = "The total number of service reconciliations" + + ServiceReconciliationErrorMetricName = "agent_service_reconciliation_errors_total" + ServiceReconciliationErrorMetricDescription = "The total number of service reconciliation errors" + + ServiceReconciliationMetricLabelServiceID = "service_id" + ServiceReconciliationMetricLabelServiceName = "service_name" + + StackRunJobsCreatedMetricName = "agent_stack_runs_created_total" + StackRunJobsCreatedMetricDescription = "The total number of created stack runs" +) + +type Recorder interface { + DiscoveryAPICacheRefresh(err error) + ServiceReconciliation(serviceID, serviceName string, err error) + StackRunJobCreation() +} diff --git a/pkg/controller/namespaces/socket_publisher.go b/pkg/controller/namespaces/socket_publisher.go index fae5df94..ebeec831 100644 --- a/pkg/controller/namespaces/socket_publisher.go +++ b/pkg/controller/namespaces/socket_publisher.go @@ -2,8 +2,9 @@ package namespaces import ( console "github.com/pluralsh/console-client-go" - "github.com/pluralsh/deployment-operator/pkg/client" "k8s.io/client-go/util/workqueue" + + "github.com/pluralsh/deployment-operator/pkg/client" ) type socketPublisher struct { diff --git a/pkg/controller/service/reconciler.go b/pkg/controller/service/reconciler.go index 43573d9b..d02363d1 100644 --- a/pkg/controller/service/reconciler.go +++ b/pkg/controller/service/reconciler.go @@ -25,6 +25,7 @@ import ( clienterrors "github.com/pluralsh/deployment-operator/internal/errors" "github.com/pluralsh/deployment-operator/internal/helpers" + "github.com/pluralsh/deployment-operator/internal/metrics" "github.com/pluralsh/deployment-operator/internal/utils" "github.com/pluralsh/deployment-operator/pkg/applier" "github.com/pluralsh/deployment-operator/pkg/client" @@ -109,10 +110,11 @@ func NewServiceReconciler(ctx context.Context, consoleClient client.Client, conf } _ = helpers.BackgroundPollUntilContextCancel(ctx, 5*time.Minute, true, true, func(_ context.Context) (done bool, err error) { - if err := CapabilitiesAPIVersions(discoveryClient); err != nil { + if err = CapabilitiesAPIVersions(discoveryClient); err != nil { logger.Error(err, "can't fetch API versions") } + metrics.Record().DiscoveryAPICacheRefresh(err) return false, nil }) @@ -293,6 +295,8 @@ func (s *ServiceReconciler) Reconcile(ctx context.Context, id string) (result re s.UpdateErrorStatus(ctx, id, err) } } + + metrics.Record().ServiceReconciliation(id, svc.Name, err) }() logger.V(2).Info("local", "flag", Local) diff --git a/pkg/controller/stacks/job.go b/pkg/controller/stacks/job.go index 124fbad8..d1205a0c 100644 --- a/pkg/controller/stacks/job.go +++ b/pkg/controller/stacks/job.go @@ -16,6 +16,7 @@ import ( "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/log" + "github.com/pluralsh/deployment-operator/internal/metrics" consoleclient "github.com/pluralsh/deployment-operator/pkg/client" ) @@ -93,6 +94,7 @@ func (r *StackReconciler) reconcileRunJob(ctx context.Context, run *console.Stac return nil, err } + metrics.Record().StackRunJobCreation() if err := r.ConsoleClient.UpdateStackRun(run.ID, console.StackRunAttributes{ Status: run.Status, JobRef: &console.NamespacedName{ diff --git a/pkg/controller/stacks/socket_publisher.go b/pkg/controller/stacks/socket_publisher.go index 14cbfad1..c0edad64 100644 --- a/pkg/controller/stacks/socket_publisher.go +++ b/pkg/controller/stacks/socket_publisher.go @@ -2,8 +2,9 @@ package stacks import ( console "github.com/pluralsh/console-client-go" - "github.com/pluralsh/deployment-operator/pkg/client" "k8s.io/client-go/util/workqueue" + + "github.com/pluralsh/deployment-operator/pkg/client" ) type socketPublisher struct {