From 7243acc17df38ccc0110d3c0cd8cddf79217982f Mon Sep 17 00:00:00 2001 From: Sebastian Florek Date: Mon, 17 Jun 2024 10:00:27 -0500 Subject: [PATCH] fix: do not allow discovery api call for helm templating to stop the agent (#222) * do not allow discovery api call for helm templating to stop the agent * bump default number of reconciles to 20 * update logic * fix lint --- cmd/agent/agent.go | 23 ++++++++------- cmd/agent/options.go | 2 +- internal/helpers/poll.go | 24 +++++++++++++++ pkg/controller/service/reconciler.go | 44 +++++++++++++--------------- 4 files changed, 57 insertions(+), 36 deletions(-) create mode 100644 internal/helpers/poll.go diff --git a/cmd/agent/agent.go b/cmd/agent/agent.go index deb3f5db..adc68632 100644 --- a/cmd/agent/agent.go +++ b/cmd/agent/agent.go @@ -7,39 +7,40 @@ import ( "github.com/pluralsh/deployment-operator/internal/utils" "github.com/pluralsh/deployment-operator/pkg/controller/stacks" + "github.com/samber/lo" + "golang.org/x/net/context" + "k8s.io/client-go/rest" + ctrclient "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/pluralsh/deployment-operator/pkg/controller" "github.com/pluralsh/deployment-operator/pkg/controller/namespaces" "github.com/pluralsh/deployment-operator/pkg/controller/pipelinegates" "github.com/pluralsh/deployment-operator/pkg/controller/restore" "github.com/pluralsh/deployment-operator/pkg/controller/service" - "github.com/samber/lo" - "golang.org/x/net/context" - "k8s.io/client-go/rest" - ctrclient "sigs.k8s.io/controller-runtime/pkg/client" ) func runAgent(opt *options, config *rest.Config, ctx context.Context, k8sClient ctrclient.Client) (*controller.ControllerManager, *service.ServiceReconciler, *pipelinegates.GateReconciler) { r, err := time.ParseDuration(opt.refreshInterval) if err != nil { - setupLog.Error(err, "unable to get refresh interval") + setupLog.Errorw("unable to get refresh interval", "error", err) os.Exit(1) } t, err := time.ParseDuration(opt.processingTimeout) if err != nil { 
- setupLog.Error(err, "unable to get processing timeout") + setupLog.Errorw("unable to get processing timeout", "error", err) os.Exit(1) } mgr, err := controller.NewControllerManager(ctx, opt.maxConcurrentReconciles, t, r, lo.ToPtr(true), opt.consoleUrl, opt.deployToken, opt.clusterId) if err != nil { - setupLog.Error(err, "unable to create manager") + setupLog.Errorw("unable to create manager", "error", err) os.Exit(1) } sr, err := service.NewServiceReconciler(ctx, mgr.GetClient(), config, r, opt.restoreNamespace) if err != nil { - setupLog.Error(err, "unable to create service reconciler") + setupLog.Errorw("unable to create service reconciler", "error", err) os.Exit(1) } mgr.AddController(&controller.Controller{ @@ -49,7 +50,7 @@ func runAgent(opt *options, config *rest.Config, ctx context.Context, k8sClient }) gr, err := pipelinegates.NewGateReconciler(mgr.GetClient(), k8sClient, config, r, opt.clusterId) if err != nil { - setupLog.Error(err, "unable to create gate reconciler") + setupLog.Errorw("unable to create gate reconciler", "error", err) os.Exit(1) } mgr.AddController(&controller.Controller{ @@ -74,7 +75,7 @@ func runAgent(opt *options, config *rest.Config, ctx context.Context, k8sClient namespace, err := utils.GetOperatorNamespace() if err != nil { - setupLog.Error(err, "unable to get operator namespace") + setupLog.Errorw("unable to get operator namespace", "error", err) os.Exit(1) } @@ -85,7 +86,7 @@ func runAgent(opt *options, config *rest.Config, ctx context.Context, k8sClient Queue: s.StackQueue, }) if err := mgr.Start(); err != nil { - setupLog.Error(err, "unable to start controller manager") + setupLog.Errorw("unable to start controller manager", "error", err) os.Exit(1) } diff --git a/cmd/agent/options.go b/cmd/agent/options.go index 838d5fd8..382b2d3b 100644 --- a/cmd/agent/options.go +++ b/cmd/agent/options.go @@ -38,7 +38,7 @@ func newOptions() *options { flag.StringVar(&o.metricsAddr, "metrics-bind-address", ":8080", "The address the metric 
endpoint binds to.") flag.StringVar(&o.probeAddr, "health-probe-bind-address", ":9001", "The address the probe endpoint binds to.") flag.BoolVar(&o.enableLeaderElection, "leader-elect", false, "Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.") - flag.IntVar(&o.maxConcurrentReconciles, "max-concurrent-reconciles", 10, "Maximum number of concurrent reconciles which can be run.") + flag.IntVar(&o.maxConcurrentReconciles, "max-concurrent-reconciles", 20, "Maximum number of concurrent reconciles which can be run.") flag.IntVar(&o.resyncSeconds, "resync-seconds", 300, "Resync duration in seconds.") flag.StringVar(&o.refreshInterval, "refresh-interval", "2m", "Refresh interval duration.") flag.StringVar(&o.processingTimeout, "processing-timeout", "1m", "Maximum amount of time to spend trying to process queue item.") diff --git a/internal/helpers/poll.go b/internal/helpers/poll.go new file mode 100644 index 00000000..dd54cc3a --- /dev/null +++ b/internal/helpers/poll.go @@ -0,0 +1,24 @@ +package helpers + +import ( + "context" + "time" + + "k8s.io/apimachinery/pkg/util/wait" +) + +// BackgroundPollUntilContextCancel spawns a new goroutine that runs the condition function on interval. +// If syncFirstRun is set to true, it will execute the condition function synchronously first and then start +// polling. Since error is returned synchronously, the only way to check for it is to use syncFirstRun. +// Background poller does not sync errors. It can be stopped externally by cancelling the provided context. 
+func BackgroundPollUntilContextCancel(ctx context.Context, interval time.Duration, immediate, syncFirstRun bool, condition wait.ConditionWithContextFunc) (err error) { + if syncFirstRun { + _, err = condition(ctx) + } + + go func() { + _ = wait.PollUntilContextCancel(ctx, interval, immediate, condition) + }() + + return err +} diff --git a/pkg/controller/service/reconciler.go b/pkg/controller/service/reconciler.go index feba3bf6..7935ac64 100644 --- a/pkg/controller/service/reconciler.go +++ b/pkg/controller/service/reconciler.go @@ -7,23 +7,11 @@ import ( "time" console "github.com/pluralsh/console-client-go" - clienterrors "github.com/pluralsh/deployment-operator/internal/errors" - "github.com/pluralsh/deployment-operator/internal/utils" - "github.com/pluralsh/deployment-operator/pkg/applier" - "github.com/pluralsh/deployment-operator/pkg/client" - "github.com/pluralsh/deployment-operator/pkg/controller" - plrlerrors "github.com/pluralsh/deployment-operator/pkg/errors" - "github.com/pluralsh/deployment-operator/pkg/manifests" - manis "github.com/pluralsh/deployment-operator/pkg/manifests" - "github.com/pluralsh/deployment-operator/pkg/manifests/template" - "github.com/pluralsh/deployment-operator/pkg/ping" - "github.com/pluralsh/deployment-operator/pkg/websocket" "github.com/pluralsh/polly/algorithms" "github.com/samber/lo" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/discovery" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -34,6 +22,19 @@ import ( "sigs.k8s.io/cli-utils/pkg/inventory" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" + + clienterrors "github.com/pluralsh/deployment-operator/internal/errors" + "github.com/pluralsh/deployment-operator/internal/helpers" + "github.com/pluralsh/deployment-operator/internal/utils" + 
"github.com/pluralsh/deployment-operator/pkg/applier" + "github.com/pluralsh/deployment-operator/pkg/client" + "github.com/pluralsh/deployment-operator/pkg/controller" + plrlerrors "github.com/pluralsh/deployment-operator/pkg/errors" + "github.com/pluralsh/deployment-operator/pkg/manifests" + manis "github.com/pluralsh/deployment-operator/pkg/manifests" + "github.com/pluralsh/deployment-operator/pkg/manifests/template" + "github.com/pluralsh/deployment-operator/pkg/ping" + "github.com/pluralsh/deployment-operator/pkg/websocket" ) func init() { @@ -106,19 +107,14 @@ func NewServiceReconciler(ctx context.Context, consoleClient client.Client, conf if err != nil { return nil, err } - if err := CapabilitiesAPIVersions(discoveryClient); err != nil { - return nil, err - } - go func() { - //nolint:all - _ = wait.PollImmediateInfinite(time.Minute*5, func() (done bool, err error) { - if err := CapabilitiesAPIVersions(discoveryClient); err != nil { - logger.Error(err, "can't fetch API versions") - } - return false, nil - }) - }() + _ = helpers.BackgroundPollUntilContextCancel(ctx, 5*time.Minute, true, true, func(_ context.Context) (done bool, err error) { + if err := CapabilitiesAPIVersions(discoveryClient); err != nil { + logger.Error(err, "can't fetch API versions") + } + + return false, nil + }) return &ServiceReconciler{ ConsoleClient: consoleClient,