From e3cdc8df15ada556211d3d70b86cc109c56da0da Mon Sep 17 00:00:00 2001 From: Sebastian Florek Date: Mon, 16 Sep 2024 12:16:52 +0200 Subject: [PATCH] feat: reafctor agent in preparation for console manager supervisor (#265) * refactor operator initialization * fix circular dependency issue * fix lint * fix merge conflicts --- cmd/agent/agent.go | 98 --------- cmd/agent/args/args.go | 52 +++-- cmd/agent/console.go | 78 +++++++ cmd/agent/kubernetes.go | 212 +++++++++++++++++++ cmd/agent/main.go | 198 ++++------------- internal/kubernetes/schema/group_name.go | 7 + pkg/cache/discovery_cache.go | 68 ++++++ pkg/cache/resource_cache.go | 8 +- pkg/cache/resource_cache_entry.go | 4 +- pkg/common/common.go | 14 ++ pkg/common/lua.go | 3 +- pkg/common/status.go | 9 +- pkg/controller/controller_manager.go | 62 +++--- pkg/controller/controller_manager_options.go | 82 +++++++ pkg/controller/namespaces/reconciler.go | 4 + pkg/controller/pipelinegates/reconciler.go | 17 +- pkg/controller/restore/reconciler.go | 4 + pkg/controller/service/reconciler.go | 48 +---- pkg/controller/service/reconciler_status.go | 11 +- pkg/controller/service/status_collector.go | 10 +- pkg/controller/stacks/reconciler.go | 4 + pkg/manifests/template/helm.go | 6 +- pkg/manifests/versions.go | 14 +- 23 files changed, 636 insertions(+), 377 deletions(-) delete mode 100644 cmd/agent/agent.go create mode 100644 cmd/agent/console.go create mode 100644 cmd/agent/kubernetes.go create mode 100644 internal/kubernetes/schema/group_name.go create mode 100644 pkg/cache/discovery_cache.go create mode 100644 pkg/controller/controller_manager_options.go diff --git a/cmd/agent/agent.go b/cmd/agent/agent.go deleted file mode 100644 index cbf45ee2..00000000 --- a/cmd/agent/agent.go +++ /dev/null @@ -1,98 +0,0 @@ -package main - -import ( - "os" - "time" - - "github.com/pluralsh/deployment-operator/cmd/agent/args" - "github.com/pluralsh/deployment-operator/internal/utils" - "github.com/pluralsh/deployment-operator/pkg/controller/stacks" - - "github.com/samber/lo" - "golang.org/x/net/context" - "k8s.io/client-go/rest" - ctrclient "sigs.k8s.io/controller-runtime/pkg/client" - - "github.com/pluralsh/deployment-operator/pkg/controller" - "github.com/pluralsh/deployment-operator/pkg/controller/namespaces" - "github.com/pluralsh/deployment-operator/pkg/controller/pipelinegates" - "github.com/pluralsh/deployment-operator/pkg/controller/restore" - "github.com/pluralsh/deployment-operator/pkg/controller/service" -) - -const ( - pollInterval = time.Second * 30 - jitter = time.Second * 15 -) - -func runAgent(config *rest.Config, ctx context.Context, k8sClient ctrclient.Client) (*controller.ControllerManager, *service.ServiceReconciler, *pipelinegates.GateReconciler) { - mgr, err := controller.NewControllerManager( - ctx, - args.MaxConcurrentReconciles(), - args.ProcessingTimeout(), - args.RefreshInterval(), - args.RefreshJitter(), - lo.ToPtr(true), - args.ConsoleUrl(), - args.DeployToken(), - args.ClusterId(), - ) - if err != nil { - setupLog.Errorw("unable to create manager", "error", err) - os.Exit(1) - } - - sr, err := service.NewServiceReconciler(ctx, mgr.GetClient(), config, args.RefreshInterval(), args.ManifestCacheTTL(), args.RestoreNamespace(), args.ConsoleUrl()) - if err != nil { - setupLog.Errorw("unable to create service reconciler", "error", err) - os.Exit(1) - } - mgr.AddController(&controller.Controller{ - Name: "Service Controller", - Do: sr, - Queue: sr.SvcQueue, - }) - gr, err := pipelinegates.NewGateReconciler(mgr.GetClient(), k8sClient, config, args.RefreshInterval(), pollInterval, args.ClusterId()) - if err != nil { - setupLog.Errorw("unable to create gate reconciler", "error", err) - os.Exit(1) - } - mgr.AddController(&controller.Controller{ - Name: "Gate Controller", - Do: gr, - Queue: gr.GateQueue, - }) - - rr := restore.NewRestoreReconciler(mgr.GetClient(), k8sClient, args.RefreshInterval(), args.RestoreNamespace()) - mgr.AddController(&controller.Controller{ - Name: "Restore Controller", - Do: rr, - Queue: rr.RestoreQueue, - }) - - ns := namespaces.NewNamespaceReconciler(mgr.GetClient(), k8sClient, args.RefreshInterval()) - mgr.AddController(&controller.Controller{ - Name: "Managed Namespace Controller", - Do: ns, - Queue: ns.NamespaceQueue, - }) - - namespace, err := utils.GetOperatorNamespace() - if err != nil { - setupLog.Errorw("unable to get operator namespace", "error", err) - os.Exit(1) - } - - s := stacks.NewStackReconciler(mgr.GetClient(), k8sClient, args.RefreshInterval(), pollInterval, namespace, args.ConsoleUrl(), args.DeployToken()) - mgr.AddController(&controller.Controller{ - Name: "Stack Controller", - Do: s, - Queue: s.StackQueue, - }) - if err := mgr.Start(); err != nil { - setupLog.Errorw("unable to start controller manager", "error", err) - os.Exit(1) - } - - return mgr, sr, gr -} diff --git a/cmd/agent/args/args.go b/cmd/agent/args/args.go index f98fcd88..0e5b9f0e 100644 --- a/cmd/agent/args/args.go +++ b/cmd/agent/args/args.go @@ -29,6 +29,9 @@ const ( defaultRefreshInterval = "2m" defaultRefreshIntervalDuration = 2 * time.Minute + defaultPollInterval = "30s" + defaultPollIntervalDuration = 30 * time.Second + defaultRefreshJitter = "15s" defaultRefreshJitterDuration = 15 * time.Second @@ -38,6 +41,9 @@ const ( defaultManifestCacheTTL = "1h" defaultManifestCacheTTLDuration = time.Hour + defaultControllerCacheTTL = "30s" + defaultControllerCacheTTLDuration = 30 * time.Second + defaultRestoreNamespace = "velero" defaultProfilerPath = "/debug/pprof/" @@ -55,18 +61,20 @@ var ( argMaxConcurrentReconciles = flag.Int("max-concurrent-reconciles", 20, "Maximum number of concurrent reconciles which can be run.") argResyncSeconds = flag.Int("resync-seconds", 300, "Resync duration in seconds.") - argClusterId = flag.String("cluster-id", "", "The ID of the cluster being connected to.") - argConsoleUrl = flag.String("console-url", "", "The URL of the console api to fetch services from.") - argDeployToken = flag.String("deploy-token", helpers.GetEnv(EnvDeployToken, ""), "The deploy token to auth to Console API with.") - argProbeAddr = flag.String("health-probe-bind-address", defaultProbeAddress, "The address the probe endpoint binds to.") - argMetricsAddr = flag.String("metrics-bind-address", defaultMetricsAddress, "The address the metric endpoint binds to.") - argProcessingTimeout = flag.String("processing-timeout", defaultProcessingTimeout, "Maximum amount of time to spend trying to process queue item.") - argRefreshInterval = flag.String("refresh-interval", defaultRefreshInterval, "Refresh interval duration.") - argRefreshJitter = flag.String("refresh-jitter", defaultRefreshJitter, "Refresh jitter.") - argResourceCacheTTL = flag.String("resource-cache-ttl", defaultResourceCacheTTL, "The time to live of each resource cache entry.") - argManifestCacheTTL = flag.String("manifest-cache-ttl", defaultManifestCacheTTL, "The time to live of service manifests in cache entry.") - argRestoreNamespace = flag.String("restore-namespace", defaultRestoreNamespace, "The namespace where Velero restores are located.") - argServices = flag.String("services", "", "A comma separated list of service ids to reconcile. Leave empty to reconcile all.") + argClusterId = flag.String("cluster-id", "", "The ID of the cluster being connected to.") + argConsoleUrl = flag.String("console-url", "", "The URL of the console api to fetch services from.") + argDeployToken = flag.String("deploy-token", helpers.GetEnv(EnvDeployToken, ""), "The deploy token to auth to Console API with.") + argProbeAddr = flag.String("health-probe-bind-address", defaultProbeAddress, "The address the probe endpoint binds to.") + argMetricsAddr = flag.String("metrics-bind-address", defaultMetricsAddress, "The address the metric endpoint binds to.") + argProcessingTimeout = flag.String("processing-timeout", defaultProcessingTimeout, "Maximum amount of time to spend trying to process queue item.") + argRefreshInterval = flag.String("refresh-interval", defaultRefreshInterval, "Time interval to recheck the websocket connection.") + argPollInterval = flag.String("poll-interval", defaultPollInterval, "Time interval to poll resources from the Console API.") + argRefreshJitter = flag.String("refresh-jitter", defaultRefreshJitter, "Refresh jitter.") + argResourceCacheTTL = flag.String("resource-cache-ttl", defaultResourceCacheTTL, "The time to live of each resource cache entry.") + argManifestCacheTTL = flag.String("manifest-cache-ttl", defaultManifestCacheTTL, "The time to live of service manifests in cache entry.") + argControllerCacheTTL = flag.String("controller-cache-ttl", defaultControllerCacheTTL, "The time to live of console controller cache entries.") + argRestoreNamespace = flag.String("restore-namespace", defaultRestoreNamespace, "The namespace where Velero restores are located.") + argServices = flag.String("services", "", "A comma separated list of service ids to reconcile. Leave empty to reconcile all.") serviceSet containers.Set[string] ) @@ -169,6 +177,16 @@ func RefreshInterval() time.Duration { return duration } +func PollInterval() time.Duration { + duration, err := time.ParseDuration(*argPollInterval) + if err != nil { + klog.ErrorS(err, "Could not parse poll-interval", "value", *argPollInterval, "default", defaultPollIntervalDuration) + return defaultPollIntervalDuration + } + + return duration +} + func RefreshJitter() time.Duration { jitter, err := time.ParseDuration(*argRefreshJitter) if err != nil { @@ -199,6 +217,16 @@ func ManifestCacheTTL() time.Duration { return duration } +func ControllerCacheTTL() time.Duration { + duration, err := time.ParseDuration(*argControllerCacheTTL) + if err != nil { + klog.ErrorS(err, "Could not parse controller-cache-ttl", "value", *argControllerCacheTTL, "default", defaultControllerCacheTTLDuration) + return defaultControllerCacheTTLDuration + } + + return duration +} + func RestoreNamespace() string { return *argRestoreNamespace } diff --git a/cmd/agent/console.go b/cmd/agent/console.go new file mode 100644 index 00000000..f60a54af --- /dev/null +++ b/cmd/agent/console.go @@ -0,0 +1,78 @@ +package main + +import ( + "os" + + "k8s.io/client-go/util/workqueue" + + "github.com/pluralsh/deployment-operator/cmd/agent/args" + "github.com/pluralsh/deployment-operator/internal/utils" + "github.com/pluralsh/deployment-operator/pkg/client" + consolectrl "github.com/pluralsh/deployment-operator/pkg/controller" + "github.com/pluralsh/deployment-operator/pkg/controller/stacks" + + "k8s.io/client-go/rest" + ctrclient "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/pluralsh/deployment-operator/pkg/controller" + "github.com/pluralsh/deployment-operator/pkg/controller/namespaces" + "github.com/pluralsh/deployment-operator/pkg/controller/pipelinegates" + "github.com/pluralsh/deployment-operator/pkg/controller/restore" + "github.com/pluralsh/deployment-operator/pkg/controller/service" +) + +func initConsoleManagerOrDie() *consolectrl.ControllerManager { + mgr, err := consolectrl.NewControllerManager( + consolectrl.WithMaxConcurrentReconciles(args.MaxConcurrentReconciles()), + consolectrl.WithCacheSyncTimeout(args.ProcessingTimeout()), + consolectrl.WithRefresh(args.RefreshInterval()), + consolectrl.WithJitter(args.RefreshJitter()), + consolectrl.WithRecoverPanic(true), + consolectrl.WithConsoleClientArgs(args.ConsoleUrl(), args.DeployToken()), + consolectrl.WithSocketArgs(args.ClusterId(), args.ConsoleUrl(), args.DeployToken()), + ) + if err != nil { + setupLog.Errorw("unable to create manager", "error", err) + os.Exit(1) + } + + return mgr +} + +func registerConsoleReconcilersOrDie( + mgr *controller.ControllerManager, + config *rest.Config, + k8sClient ctrclient.Client, + consoleClient client.Client, +) { + mgr.AddReconcilerOrDie(service.Identifier, func() (controller.Reconciler, workqueue.TypedRateLimitingInterface[string], error) { + r, err := service.NewServiceReconciler(consoleClient, config, args.ControllerCacheTTL(), args.ManifestCacheTTL(), args.RestoreNamespace(), args.ConsoleUrl()) + return r, r.SvcQueue, err + }) + + mgr.AddReconcilerOrDie(pipelinegates.Identifier, func() (controller.Reconciler, workqueue.TypedRateLimitingInterface[string], error) { + r, err := pipelinegates.NewGateReconciler(consoleClient, k8sClient, config, args.ControllerCacheTTL(), args.PollInterval(), args.ClusterId()) + return r, r.GateQueue, err + }) + + mgr.AddReconcilerOrDie(restore.Identifier, func() (controller.Reconciler, workqueue.TypedRateLimitingInterface[string], error) { + r := restore.NewRestoreReconciler(consoleClient, k8sClient, args.ControllerCacheTTL(), args.RestoreNamespace()) + return r, r.RestoreQueue, nil + }) + + mgr.AddReconcilerOrDie(namespaces.Identifier, func() (controller.Reconciler, workqueue.TypedRateLimitingInterface[string], error) { + r := namespaces.NewNamespaceReconciler(consoleClient, k8sClient, args.ControllerCacheTTL()) + return r, r.NamespaceQueue, nil + }) + + mgr.AddReconcilerOrDie(stacks.Identifier, func() (controller.Reconciler, workqueue.TypedRateLimitingInterface[string], error) { + namespace, err := utils.GetOperatorNamespace() + if err != nil { + setupLog.Errorw("unable to get operator namespace", "error", err) + os.Exit(1) + } + + r := stacks.NewStackReconciler(consoleClient, k8sClient, args.ControllerCacheTTL(), args.PollInterval(), namespace, args.ConsoleUrl(), args.DeployToken()) + return r, r.StackQueue, nil + }) +} diff --git a/cmd/agent/kubernetes.go b/cmd/agent/kubernetes.go new file mode 100644 index 00000000..34958e48 --- /dev/null +++ b/cmd/agent/kubernetes.go @@ -0,0 +1,212 @@ +package main + +import ( + "net/http" + "os" + "strings" + + "github.com/argoproj/argo-rollouts/pkg/apis/rollouts" + rolloutv1alpha1 "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1" + roclientset "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned" + "github.com/prometheus/client_golang/prometheus/promhttp" + velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/healthz" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/metrics/server" + + "github.com/pluralsh/deployment-operator/cmd/agent/args" + "github.com/pluralsh/deployment-operator/internal/controller" + consoleclient "github.com/pluralsh/deployment-operator/pkg/client" + "github.com/pluralsh/deployment-operator/pkg/common" + consolectrl "github.com/pluralsh/deployment-operator/pkg/controller" + "github.com/pluralsh/deployment-operator/pkg/controller/pipelinegates" + "github.com/pluralsh/deployment-operator/pkg/controller/service" +) + +func initKubeManagerOrDie(config *rest.Config) manager.Manager { + mgr, err := ctrl.NewManager(config, ctrl.Options{ + Scheme: scheme, + LeaderElection: args.EnableLeaderElection(), + LeaderElectionID: "dep12loy45.plural.sh", + HealthProbeBindAddress: args.ProbeAddr(), + Metrics: server.Options{ + BindAddress: args.MetricsAddr(), + ExtraHandlers: map[string]http.Handler{ + // Default prometheus metrics path. + // We can't use /metrics as it is already taken by the + // controller manager. + "/metrics/agent": promhttp.Handler(), + }, + }, + }) + if err != nil { + setupLog.Error(err, "unable to create manager") + os.Exit(1) + } + + if err = mgr.AddHealthzCheck("ping", healthz.Ping); err != nil { + setupLog.Error(err, "unable to create health check") + os.Exit(1) + } + + if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil { + setupLog.Error(err, "unable to set up ready check") + os.Exit(1) + } + + return mgr +} + +func initKubeClientsOrDie(config *rest.Config) (rolloutsClient *roclientset.Clientset, dynamicClient *dynamic.DynamicClient, kubeClient *kubernetes.Clientset) { + rolloutsClient, err := roclientset.NewForConfig(config) + if err != nil { + setupLog.Error(err, "unable to create rollouts client") + os.Exit(1) + } + + dynamicClient, err = dynamic.NewForConfig(config) + if err != nil { + setupLog.Error(err, "unable to create dynamic client") + os.Exit(1) + } + + kubeClient, err = kubernetes.NewForConfig(config) + if err != nil { + setupLog.Error(err, "unable to create kubernetes client") + os.Exit(1) + } + + return rolloutsClient, dynamicClient, kubeClient +} + +func registerKubeReconcilersOrDie( + manager ctrl.Manager, + consoleManager *consolectrl.ControllerManager, + config *rest.Config, + extConsoleClient consoleclient.Client, +) { + rolloutsClient, dynamicClient, kubeClient := initKubeClientsOrDie(config) + + backupController := &controller.BackupReconciler{ + Client: manager.GetClient(), + Scheme: manager.GetScheme(), + ConsoleClient: extConsoleClient, + } + restoreController := &controller.RestoreReconciler{ + Client: manager.GetClient(), + Scheme: manager.GetScheme(), + ConsoleClient: extConsoleClient, + } + constraintController := &controller.ConstraintReconciler{ + Client: manager.GetClient(), + Scheme: manager.GetScheme(), + ConsoleClient: extConsoleClient, + Reader: manager.GetCache(), + } + argoRolloutController := &controller.ArgoRolloutReconciler{ + Client: manager.GetClient(), + Scheme: manager.GetScheme(), + ConsoleClient: extConsoleClient, + ConsoleURL: args.ConsoleUrl(), + HttpClient: &http.Client{Timeout: httpClientTimout}, + ArgoClientSet: rolloutsClient, + DynamicClient: dynamicClient, + SvcReconciler: common.ToReconcilerOrDie[*service.ServiceReconciler]( + consoleManager.GetReconciler(service.Identifier), + ), + KubeClient: kubeClient, + } + + reconcileGroups := map[schema.GroupVersionKind]controller.SetupWithManager{ + { + Group: velerov1.SchemeGroupVersion.Group, + Version: velerov1.SchemeGroupVersion.Version, + Kind: "Backup", + }: backupController.SetupWithManager, + { + Group: velerov1.SchemeGroupVersion.Group, + Version: velerov1.SchemeGroupVersion.Version, + Kind: "Restore", + }: restoreController.SetupWithManager, + { + Group: "status.gatekeeper.sh", + Version: "v1beta1", + Kind: "ConstraintPodStatus", + }: constraintController.SetupWithManager, + { + Group: rolloutv1alpha1.SchemeGroupVersion.Group, + Version: rolloutv1alpha1.SchemeGroupVersion.Version, + Kind: rollouts.RolloutKind, + }: argoRolloutController.SetupWithManager, + } + + if err := (&controller.CrdRegisterControllerReconciler{ + Client: manager.GetClient(), + Scheme: manager.GetScheme(), + ReconcilerGroups: reconcileGroups, + Mgr: manager, + }).SetupWithManager(manager); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "CRDRegisterController") + } + + if err := (&controller.CustomHealthReconciler{ + Client: manager.GetClient(), + Scheme: manager.GetScheme(), + ServiceReconciler: common.ToReconcilerOrDie[*service.ServiceReconciler]( + consoleManager.GetReconciler(service.Identifier), + ), + }).SetupWithManager(manager); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "HealthConvert") + } + if err := (&controller.StackRunJobReconciler{ + Client: manager.GetClient(), + Scheme: manager.GetScheme(), + ConsoleClient: extConsoleClient, + }).SetupWithManager(manager); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "StackRun") + } + + rawConsoleUrl, _ := strings.CutSuffix(args.ConsoleUrl(), "/ext/gql") + if err := (&controller.VirtualClusterController{ + Client: manager.GetClient(), + Scheme: manager.GetScheme(), + ExtConsoleClient: extConsoleClient, + ConsoleUrl: rawConsoleUrl, + }).SetupWithManager(manager); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "VirtualCluster") + } + + if err := (&controller.UpgradeInsightsController{ + Client: manager.GetClient(), + Scheme: manager.GetScheme(), + ConsoleClient: extConsoleClient, + }).SetupWithManager(manager); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "UpgradeInsights") + } + + statusController, err := controller.NewStatusReconciler(manager.GetClient()) + if err != nil { + setupLog.Error(err, "unable to create controller", "controller", "StatusController") + } + if err := statusController.SetupWithManager(manager); err != nil { + setupLog.Error(err, "unable to setup controller", "controller", "StatusController") + } + + if err = (&controller.PipelineGateReconciler{ + Client: manager.GetClient(), + GateCache: common.ToReconcilerOrDie[*pipelinegates.GateReconciler]( + consoleManager.GetReconciler(pipelinegates.Identifier), + ).GateCache, + ConsoleClient: consoleclient.New(args.ConsoleUrl(), args.DeployToken()), + Log: ctrl.Log.WithName("controllers").WithName("PipelineGate"), + Scheme: manager.GetScheme(), + }).SetupWithManager(manager); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "Group") + os.Exit(1) + } +} diff --git a/cmd/agent/main.go b/cmd/agent/main.go index 8b9168c3..09db47d5 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -1,37 +1,29 @@ package main import ( - "net/http" + "context" "os" - "strings" "time" - "github.com/argoproj/argo-rollouts/pkg/apis/rollouts" rolloutv1alpha1 "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1" - roclientset "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned" templatesv1 "github.com/open-policy-agent/frameworks/constraint/pkg/apis/templates/v1" constraintstatusv1beta1 "github.com/open-policy-agent/gatekeeper/v3/apis/status/v1beta1" - "github.com/prometheus/client_golang/prometheus/promhttp" - "sigs.k8s.io/controller-runtime/pkg/metrics/server" + "k8s.io/client-go/discovery" + "k8s.io/client-go/rest" deploymentsv1alpha1 "github.com/pluralsh/deployment-operator/api/v1alpha1" "github.com/pluralsh/deployment-operator/cmd/agent/args" - "github.com/pluralsh/deployment-operator/internal/controller" "github.com/pluralsh/deployment-operator/pkg/cache" - _ "github.com/pluralsh/deployment-operator/pkg/cache" // Init cache. "github.com/pluralsh/deployment-operator/pkg/client" + consolectrl "github.com/pluralsh/deployment-operator/pkg/controller" "github.com/pluralsh/deployment-operator/pkg/log" velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/client-go/dynamic" - "k8s.io/client-go/kubernetes" clientgoscheme "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/healthz" ) var ( @@ -51,8 +43,7 @@ func init() { } const ( - httpClientTimout = time.Second * 5 - httpCacheExpiryTime = time.Second * 2 + httpClientTimout = time.Second * 5 ) func main() { @@ -60,170 +51,53 @@ func main() { config := ctrl.GetConfigOrDie() ctx := ctrl.SetupSignalHandler() - if args.ResourceCacheEnabled() { - cache.Init(ctx, config, args.ResourceCacheTTL()) - } - - mgr, err := ctrl.NewManager(config, ctrl.Options{ - Scheme: scheme, - LeaderElection: args.EnableLeaderElection(), - LeaderElectionID: "dep12loy45.plural.sh", - HealthProbeBindAddress: args.ProbeAddr(), - Metrics: server.Options{ - BindAddress: args.MetricsAddr(), - ExtraHandlers: map[string]http.Handler{ - // Default prometheus metrics path. - // We can't use /metrics as it is already taken by the - // controller manager. - "/metrics/agent": promhttp.Handler(), - }, - }, - }) - if err != nil { - setupLog.Error(err, "unable to create manager") - os.Exit(1) - } - rolloutsClient, err := roclientset.NewForConfig(config) - if err != nil { - setupLog.Error(err, "unable to create rollouts client") - os.Exit(1) - } - dynamicClient, err := dynamic.NewForConfig(config) - if err != nil { - setupLog.Error(err, "unable to create dynamic client") - os.Exit(1) - } - kubeClient, err := kubernetes.NewForConfig(config) - if err != nil { - setupLog.Error(err, "unable to create kubernetes client") - os.Exit(1) - } + extConsoleClient := client.New(args.ConsoleUrl(), args.DeployToken()) + discoveryClient := initDiscoveryClientOrDie(config) + kubeManager := initKubeManagerOrDie(config) + consoleManager := initConsoleManagerOrDie() - setupLog.Info("starting agent") - ctrlMgr, serviceReconciler, gateReconciler := runAgent(config, ctx, mgr.GetClient()) + registerConsoleReconcilersOrDie(consoleManager, config, kubeManager.GetClient(), extConsoleClient) + registerKubeReconcilersOrDie(kubeManager, consoleManager, config, extConsoleClient) - backupController := &controller.BackupReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - ConsoleClient: ctrlMgr.GetClient(), - } - restoreController := &controller.RestoreReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - ConsoleClient: ctrlMgr.GetClient(), - } - constraintController := &controller.ConstraintReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - ConsoleClient: ctrlMgr.GetClient(), - Reader: mgr.GetCache(), - } - argoRolloutController := &controller.ArgoRolloutReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - ConsoleClient: ctrlMgr.GetClient(), - ConsoleURL: args.ConsoleUrl(), - HttpClient: &http.Client{Timeout: httpClientTimout}, - ArgoClientSet: rolloutsClient, - DynamicClient: dynamicClient, - SvcReconciler: serviceReconciler, - KubeClient: kubeClient, - } - - reconcileGroups := map[schema.GroupVersionKind]controller.SetupWithManager{ - { - Group: velerov1.SchemeGroupVersion.Group, - Version: velerov1.SchemeGroupVersion.Version, - Kind: "Backup", - }: backupController.SetupWithManager, - { - Group: velerov1.SchemeGroupVersion.Group, - Version: velerov1.SchemeGroupVersion.Version, - Kind: "Restore", - }: restoreController.SetupWithManager, - { - Group: "status.gatekeeper.sh", - Version: "v1beta1", - Kind: "ConstraintPodStatus", - }: constraintController.SetupWithManager, - { - Group: rolloutv1alpha1.SchemeGroupVersion.Group, - Version: rolloutv1alpha1.SchemeGroupVersion.Version, - Kind: rollouts.RolloutKind, - }: argoRolloutController.SetupWithManager, - } + //+kubebuilder:scaffold:builder - if err = (&controller.CrdRegisterControllerReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - ReconcilerGroups: reconcileGroups, - Mgr: mgr, - }).SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "CRDRegisterController") + // Start resource cache in background if enabled. + if args.ResourceCacheEnabled() { + cache.Init(ctx, config, args.ResourceCacheTTL()) } - if err = (&controller.CustomHealthReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - ServiceReconciler: serviceReconciler, - }).SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "HealthConvert") - } - if err = (&controller.StackRunJobReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - ConsoleClient: ctrlMgr.GetClient(), - }).SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "StackRun") - } + // Start the discovery cache in background. + cache.RunDiscoveryCacheInBackgroundOrDie(ctx, discoveryClient) - rawConsoleUrl, _ := strings.CutSuffix(args.ConsoleUrl(), "/ext/gql") - if err = (&controller.VirtualClusterController{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - ExtConsoleClient: ctrlMgr.GetClient(), - ConsoleUrl: rawConsoleUrl, - }).SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "VirtualCluster") - } + // Start the console manager in background. + runConsoleManagerInBackgroundOrDie(ctx, consoleManager) - if err = (&controller.UpgradeInsightsController{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - ConsoleClient: ctrlMgr.GetClient(), - }).SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "UpgradeInsights") - } + // Start the standard kubernetes manager and block the main thread until context cancel. + runKubeManagerOrDie(ctx, kubeManager) +} - statusController, err := controller.NewStatusReconciler(mgr.GetClient()) +func initDiscoveryClientOrDie(config *rest.Config) *discovery.DiscoveryClient { + discoveryClient, err := discovery.NewDiscoveryClientForConfig(config) if err != nil { - setupLog.Error(err, "unable to create controller", "controller", "StatusController") - } - if err := statusController.SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to start controller", "controller", "StatusController") - } - - //+kubebuilder:scaffold:builder - - if err = (&controller.PipelineGateReconciler{ - Client: mgr.GetClient(), - GateCache: gateReconciler.GateCache, - ConsoleClient: client.New(args.ConsoleUrl(), args.DeployToken()), - Log: ctrl.Log.WithName("controllers").WithName("PipelineGate"), - Scheme: mgr.GetScheme(), - }).SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "Group") + setupLog.Error(err, "unable to create discovery client") os.Exit(1) } - if err = mgr.AddHealthzCheck("ping", healthz.Ping); err != nil { - setupLog.Error(err, "unable to create health check") + return discoveryClient +} + +func runConsoleManagerInBackgroundOrDie(ctx context.Context, mgr *consolectrl.ControllerManager) { + setupLog.Info("starting console controller manager") + if err := mgr.Start(ctx); err != nil { + setupLog.Errorw("unable to start console controller manager", "error", err) os.Exit(1) } +} - setupLog.Info("starting manager") +func runKubeManagerOrDie(ctx context.Context, mgr ctrl.Manager) { + setupLog.Info("starting kubernetes controller manager") if err := mgr.Start(ctx); err != nil { - setupLog.Error(err, "problem running manager") + setupLog.Errorw("unable to start kubernetes controller manager", "error", err) os.Exit(1) } } diff --git a/internal/kubernetes/schema/group_name.go b/internal/kubernetes/schema/group_name.go new file mode 100644 index 00000000..a6949e45 --- /dev/null +++ b/internal/kubernetes/schema/group_name.go @@ -0,0 +1,7 @@ +package schema + +type GroupName struct { + Group string + Kind string + Name string +} diff --git a/pkg/cache/discovery_cache.go b/pkg/cache/discovery_cache.go new file mode 100644 index 00000000..f4870406 --- /dev/null +++ b/pkg/cache/discovery_cache.go @@ -0,0 +1,68 @@ +package cache + +import ( + "context" + "fmt" + "time" + + cmap "github.com/orcaman/concurrent-map/v2" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/discovery" + + "github.com/pluralsh/deployment-operator/internal/helpers" + "github.com/pluralsh/deployment-operator/internal/metrics" + "github.com/pluralsh/deployment-operator/pkg/log" +) + +var ( + // Maps a GroupVersion to resource Kind. + discoveryCache cmap.ConcurrentMap[string, bool] +) + +func init() { + discoveryCache = cmap.New[bool]() +} + +func DiscoveryCache() cmap.ConcurrentMap[string, bool] { + return discoveryCache +} + +func RunDiscoveryCacheInBackgroundOrDie(ctx context.Context, discoveryClient *discovery.DiscoveryClient) { + log.Logger.Info("starting discovery cache") + err := helpers.BackgroundPollUntilContextCancel(ctx, 5*time.Minute, true, true, func(_ context.Context) (done bool, err error) { + if err = updateDiscoveryCache(discoveryClient); err != nil { + log.Logger.Error(err, "can't fetch API versions") + } + + metrics.Record().DiscoveryAPICacheRefresh(err) + return false, nil + }) + if err != nil { + panic(fmt.Errorf("failed to start discovery cache in background: %w", err)) + } +} + +func updateDiscoveryCache(discoveryClient *discovery.DiscoveryClient) error { + lists, err := discoveryClient.ServerPreferredResources() + + for _, list := range lists { + if len(list.APIResources) == 0 { + continue + } + + gv, err := schema.ParseGroupVersion(list.GroupVersion) + if err != nil { + continue + } + + for _, resource := range list.APIResources { + if len(resource.Verbs) == 0 { + continue + } + + discoveryCache.Set(fmt.Sprintf("%s/%s", gv.String(), resource.Kind), true) + } + } + + return err +} diff --git a/pkg/cache/resource_cache.go b/pkg/cache/resource_cache.go index 998a3511..e0af327c 100644 --- a/pkg/cache/resource_cache.go +++ b/pkg/cache/resource_cache.go @@ -12,8 +12,6 @@ import ( "sigs.k8s.io/cli-utils/pkg/kstatus/polling/clusterreader" "sigs.k8s.io/cli-utils/pkg/kstatus/polling/statusreaders" - "github.com/pluralsh/deployment-operator/pkg/manifests" - "github.com/pluralsh/polly/containers" "github.com/samber/lo" "k8s.io/apimachinery/pkg/api/meta" @@ -27,6 +25,7 @@ import ( "sigs.k8s.io/cli-utils/pkg/object" "github.com/pluralsh/deployment-operator/internal/kstatus/watcher" + "github.com/pluralsh/deployment-operator/internal/kubernetes/schema" "github.com/pluralsh/deployment-operator/internal/utils" "github.com/pluralsh/deployment-operator/pkg/common" "github.com/pluralsh/deployment-operator/pkg/log" @@ -101,8 +100,7 @@ func Init(ctx context.Context, config *rest.Config, ttl time.Duration) { w := watcher.NewDynamicStatusWatcher(dynamicClient, discoveryClient, mapper, watcher.Options{ UseCustomObjectFilter: true, ObjectFilter: nil, - //UseInformerRefCache: true, - RESTScopeStrategy: lo.ToPtr(kwatcher.RESTScopeRoot), + RESTScopeStrategy: lo.ToPtr(kwatcher.RESTScopeRoot), Filters: &kwatcher.Filters{ Labels: common.ManagedByAgentLabelSelector(), Fields: nil, @@ -226,7 +224,7 @@ func (in *ResourceCache) GetCacheStatus(key object.ObjMetadata) (*console.Compon return nil, err } in.saveResourceStatus(obj) - return common.StatusEventToComponentAttributes(*s, make(map[manifests.GroupName]string)), nil + return common.StatusEventToComponentAttributes(*s, make(map[schema.GroupName]string)), nil } func (in *ResourceCache) saveResourceStatus(resource *unstructured.Unstructured) { diff --git a/pkg/cache/resource_cache_entry.go b/pkg/cache/resource_cache_entry.go index a8e391f0..704f9bf6 100644 --- a/pkg/cache/resource_cache_entry.go +++ b/pkg/cache/resource_cache_entry.go @@ -5,8 +5,8 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "sigs.k8s.io/cli-utils/pkg/apply/event" + "github.com/pluralsh/deployment-operator/internal/kubernetes/schema" "github.com/pluralsh/deployment-operator/pkg/common" - "github.com/pluralsh/deployment-operator/pkg/manifests" ) type SHAType string @@ -80,5 +80,5 @@ func (in *ResourceCacheEntry) RequiresApply(manifestSHA string) bool { // SetStatus saves the last seen resource [event.StatusEvent] and converts it to a simpler // [console.ComponentAttributes] structure. func (in *ResourceCacheEntry) SetStatus(se event.StatusEvent) { - in.status = common.StatusEventToComponentAttributes(se, make(map[manifests.GroupName]string)) + in.status = common.StatusEventToComponentAttributes(se, make(map[schema.GroupName]string)) } diff --git a/pkg/common/common.go b/pkg/common/common.go index ce2623f5..16f64b55 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -1,10 +1,14 @@ package common import ( + "fmt" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/yaml" + + "github.com/pluralsh/deployment-operator/pkg/controller" ) const ( @@ -33,3 +37,13 @@ func Unmarshal(s string) (map[string]interface{}, error) { return result, nil } + +func ToReconcilerOrDie[R controller.Reconciler](in controller.Reconciler) R { + out, ok := in.(R) + // If cast fails panic. It means that the calling code is bad and has to be changed. + if !ok { + panic(fmt.Sprintf("%T is not a R", in)) + } + + return out +} diff --git a/pkg/common/lua.go b/pkg/common/lua.go index cba3809b..688ddb04 100644 --- a/pkg/common/lua.go +++ b/pkg/common/lua.go @@ -3,9 +3,10 @@ package common import ( "sync" - "github.com/pluralsh/deployment-operator/pkg/lua" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" + + "github.com/pluralsh/deployment-operator/pkg/lua" ) func init() { diff --git a/pkg/common/status.go b/pkg/common/status.go index 43a07550..ad9841bc 100644 --- a/pkg/common/status.go +++ b/pkg/common/status.go @@ -2,21 +2,22 @@ package common import ( console "github.com/pluralsh/console/go/client" - dlog "github.com/pluralsh/deployment-operator/pkg/log" - "github.com/pluralsh/deployment-operator/pkg/manifests" "github.com/samber/lo" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "sigs.k8s.io/cli-utils/pkg/apply/event" "sigs.k8s.io/cli-utils/pkg/kstatus/status" + + internalschema "github.com/pluralsh/deployment-operator/internal/kubernetes/schema" + dlog "github.com/pluralsh/deployment-operator/pkg/log" ) -func StatusEventToComponentAttributes(e event.StatusEvent, vcache map[manifests.GroupName]string) *console.ComponentAttributes { +func StatusEventToComponentAttributes(e event.StatusEvent, vcache map[internalschema.GroupName]string) *console.ComponentAttributes { if e.Resource == nil { return nil } gvk := e.Resource.GroupVersionKind() - gname := manifests.GroupName{ + gname := internalschema.GroupName{ Group: gvk.Group, Kind: gvk.Kind, Name: e.Resource.GetName(), diff --git a/pkg/controller/controller_manager.go b/pkg/controller/controller_manager.go index b7e01b11..8b187880 100644 --- a/pkg/controller/controller_manager.go +++ b/pkg/controller/controller_manager.go @@ -4,12 +4,14 @@ import ( "context" "errors" "math/rand" + "os" "time" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/klog/v2" + "k8s.io/client-go/util/workqueue" "github.com/pluralsh/deployment-operator/pkg/client" + "github.com/pluralsh/deployment-operator/pkg/log" "github.com/pluralsh/deployment-operator/pkg/websocket" ) @@ -33,36 +35,24 @@ type ControllerManager struct { // started is true if the ControllerManager has been Started started bool - ctx context.Context - client client.Client Socket *websocket.Socket } -func NewControllerManager(ctx context.Context, maxConcurrentReconciles int, cacheSyncTimeout time.Duration, - refresh, jitter time.Duration, recoverPanic *bool, consoleUrl, deployToken, clusterId string) (*ControllerManager, error) { +func NewControllerManager(options ...ControllerManagerOption) (*ControllerManager, error) { + ctrl := &ControllerManager{ + Controllers: make([]*Controller, 0), + started: false, + } - socket, err := websocket.New(clusterId, consoleUrl, deployToken) - if err != nil { - if socket == nil { + for _, option := range options { + if err := option(ctrl); err != nil { return nil, err } - klog.Error(err, "could not initiate websocket connection, ignoring and falling back to polling") } - return &ControllerManager{ - Controllers: make([]*Controller, 0), - MaxConcurrentReconciles: maxConcurrentReconciles, - CacheSyncTimeout: cacheSyncTimeout, - RecoverPanic: recoverPanic, - Refresh: refresh, - Jitter: jitter, - started: false, - ctx: ctx, - client: client.New(consoleUrl, deployToken), - Socket: socket, - }, nil + return ctrl, nil } func (cm *ControllerManager) GetClient() client.Client { @@ -75,7 +65,31 @@ func (cm *ControllerManager) AddController(ctrl *Controller) { cm.Controllers = append(cm.Controllers, ctrl) } -func (cm *ControllerManager) Start() error { +func (cm *ControllerManager) GetReconciler(name string) Reconciler { + for _, ctrl := range cm.Controllers { + if ctrl.Name == name { + return ctrl.Do + } + } + + return nil +} + +func (cm *ControllerManager) AddReconcilerOrDie(name string, reconcilerGetter func() (Reconciler, workqueue.TypedRateLimitingInterface[string], error)) { + reconciler, queue, err := reconcilerGetter() + if err != nil { + log.Logger.Errorw("unable to create reconciler", "name", name, "error", err) + os.Exit(1) + } + + cm.AddController(&Controller{ + Name: name, + Do: reconciler, + Queue: queue, + }) +} + +func (cm *ControllerManager) Start(ctx context.Context) error { if cm.started { return errors.New("controller manager was started more than once") } @@ -95,12 +109,12 @@ func (cm *ControllerManager) Start() error { } pollInterval += jitterValue _ = wait.PollUntilContextCancel(context.Background(), pollInterval, true, func(_ context.Context) (done bool, err error) { - return controller.Do.Poll(cm.ctx) + return controller.Do.Poll(ctx) }) }() go func() { - controller.Start(cm.ctx) + controller.Start(ctx) }() } diff --git a/pkg/controller/controller_manager_options.go b/pkg/controller/controller_manager_options.go new file mode 100644 index 00000000..2935f64f --- /dev/null +++ b/pkg/controller/controller_manager_options.go @@ -0,0 +1,82 @@ +package controller + +import ( + "time" + + "k8s.io/klog/v2" + + "github.com/pluralsh/deployment-operator/pkg/client" + "github.com/pluralsh/deployment-operator/pkg/websocket" +) + +type ControllerManagerOption func(*ControllerManager) error + +func WithConsoleClient(client client.Client) ControllerManagerOption { + return func(o *ControllerManager) error { + o.client = client + return nil + } +} + +func WithConsoleClientArgs(url string, deployToken string) ControllerManagerOption { + return func(o *ControllerManager) error { + o.client = client.New(url, deployToken) + return nil + } +} + +func WithSocket(socket *websocket.Socket) ControllerManagerOption { + return func(o *ControllerManager) error { + o.Socket = socket + return nil + } +} + +func WithSocketArgs(clusterID, url, deployToken string) ControllerManagerOption { + return func(o *ControllerManager) (err error) { + socket, err := websocket.New(clusterID, url, deployToken) + o.Socket = socket + + if err != nil && socket != nil { + klog.Error(err, "could not initiate websocket connection, ignoring and falling back to polling") + return nil + } + + return err + } +} + +func WithMaxConcurrentReconciles(maxConcurrentReconciles int) ControllerManagerOption { + return func(o *ControllerManager) error { + o.MaxConcurrentReconciles = maxConcurrentReconciles + return nil + } +} + +func WithCacheSyncTimeout(timeout time.Duration) ControllerManagerOption { + return func(o *ControllerManager) error { + o.CacheSyncTimeout = timeout + return nil + } +} + +func WithRefresh(refresh time.Duration) ControllerManagerOption { + return func(o *ControllerManager) error { + o.Refresh = refresh + return nil + } +} + +func WithJitter(jitter time.Duration) ControllerManagerOption { + return func(o *ControllerManager) error { + o.Jitter = jitter + return nil + } +} + +func WithRecoverPanic(recoverPanic bool) ControllerManagerOption { + return func(o *ControllerManager) error { + o.RecoverPanic = &recoverPanic + return nil + } +} diff --git a/pkg/controller/namespaces/reconciler.go b/pkg/controller/namespaces/reconciler.go index 067a90e4..f897b556 100644 --- a/pkg/controller/namespaces/reconciler.go +++ b/pkg/controller/namespaces/reconciler.go @@ -24,6 +24,10 @@ import ( "github.com/pluralsh/deployment-operator/pkg/websocket" ) +const ( + Identifier = "Namespace Controller" +) + type NamespaceReconciler struct { ConsoleClient client.Client K8sClient ctrlclient.Client diff --git a/pkg/controller/pipelinegates/reconciler.go b/pkg/controller/pipelinegates/reconciler.go index 42690ece..e37f2365 100644 --- a/pkg/controller/pipelinegates/reconciler.go +++ b/pkg/controller/pipelinegates/reconciler.go @@ -6,12 +6,6 @@ import ( "time" console "github.com/pluralsh/console/go/client" - "github.com/pluralsh/deployment-operator/api/v1alpha1" - "github.com/pluralsh/deployment-operator/internal/utils" - "github.com/pluralsh/deployment-operator/pkg/client" - "github.com/pluralsh/deployment-operator/pkg/controller" - "github.com/pluralsh/deployment-operator/pkg/ping" - "github.com/pluralsh/deployment-operator/pkg/websocket" "github.com/pluralsh/polly/algorithms" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" @@ -23,6 +17,17 @@ import ( ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/pluralsh/deployment-operator/api/v1alpha1" + "github.com/pluralsh/deployment-operator/internal/utils" + "github.com/pluralsh/deployment-operator/pkg/client" + "github.com/pluralsh/deployment-operator/pkg/controller" + "github.com/pluralsh/deployment-operator/pkg/ping" + "github.com/pluralsh/deployment-operator/pkg/websocket" +) + +const ( + Identifier = "Gate Controller" ) type GateReconciler struct { diff --git a/pkg/controller/restore/reconciler.go b/pkg/controller/restore/reconciler.go index 67a5e6aa..1d6992f2 100644 --- a/pkg/controller/restore/reconciler.go +++ b/pkg/controller/restore/reconciler.go @@ -19,6 +19,10 @@ import ( "github.com/pluralsh/deployment-operator/pkg/websocket" ) +const ( + Identifier = "Restore Controller" +) + var ( restoreStatusMap = map[velerov1.RestorePhase]console.RestoreStatus{ velerov1.RestorePhaseNew: console.RestoreStatusCreated, diff --git a/pkg/controller/service/reconciler.go b/pkg/controller/service/reconciler.go index c657184d..f13c57d0 100644 --- a/pkg/controller/service/reconciler.go +++ b/pkg/controller/service/reconciler.go @@ -12,7 +12,6 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/discovery" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -26,10 +25,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/pluralsh/deployment-operator/cmd/agent/args" + "github.com/pluralsh/deployment-operator/internal/kubernetes/schema" agentcommon "github.com/pluralsh/deployment-operator/pkg/common" clienterrors "github.com/pluralsh/deployment-operator/internal/errors" - "github.com/pluralsh/deployment-operator/internal/helpers" "github.com/pluralsh/deployment-operator/internal/metrics" "github.com/pluralsh/deployment-operator/internal/utils" "github.com/pluralsh/deployment-operator/pkg/applier" @@ -38,12 +37,12 @@ import ( plrlerrors "github.com/pluralsh/deployment-operator/pkg/errors" "github.com/pluralsh/deployment-operator/pkg/manifests" manis "github.com/pluralsh/deployment-operator/pkg/manifests" - "github.com/pluralsh/deployment-operator/pkg/manifests/template" "github.com/pluralsh/deployment-operator/pkg/ping" "github.com/pluralsh/deployment-operator/pkg/websocket" ) const ( + Identifier = "Service Controller" OperatorService = "deploy-operator" RestoreConfigMapName = "restore-config-map" // The field manager name for the ones agentk owns, see @@ -63,14 +62,11 @@ type ServiceReconciler struct { UtilFactory util.Factory RestoreNamespace string - mapper meta.RESTMapper - discoveryClient *discovery.DiscoveryClient - pinger *ping.Pinger - ctx context.Context + mapper meta.RESTMapper + pinger *ping.Pinger } -func NewServiceReconciler(ctx context.Context, consoleClient client.Client, config *rest.Config, refresh, manifestTTL time.Duration, restoreNamespace, consoleURL string) (*ServiceReconciler, error) { - logger := log.FromContext(ctx) +func NewServiceReconciler(consoleClient client.Client, config *rest.Config, refresh, manifestTTL time.Duration, restoreNamespace, consoleURL string) (*ServiceReconciler, error) { utils.DisableClientLimits(config) _, deployToken := consoleClient.GetCredentials() @@ -110,15 +106,6 @@ func NewServiceReconciler(ctx context.Context, consoleClient client.Client, conf return nil, err } - _ = helpers.BackgroundPollUntilContextCancel(ctx, 5*time.Minute, true, true, func(_ context.Context) (done bool, err error) { - if err = CapabilitiesAPIVersions(discoveryClient); err != nil { - logger.Error(err, "can't fetch API versions") - } - - metrics.Record().DiscoveryAPICacheRefresh(err) - return false, nil - }) - return &ServiceReconciler{ ConsoleClient: consoleClient, Config: config, @@ -129,35 +116,12 @@ func NewServiceReconciler(ctx context.Context, consoleClient client.Client, conf UtilFactory: f, Applier: a, Destroyer: d, - discoveryClient: discoveryClient, pinger: ping.New(consoleClient, discoveryClient, f), RestoreNamespace: restoreNamespace, - ctx: ctx, mapper: mapper, }, nil } -func CapabilitiesAPIVersions(discoveryClient *discovery.DiscoveryClient) error { - lists, err := discoveryClient.ServerPreferredResources() - - for _, list := range lists { - if len(list.APIResources) == 0 { - continue - } - gv, err := schema.ParseGroupVersion(list.GroupVersion) - if err != nil { - continue - } - for _, resource := range list.APIResources { - if len(resource.Verbs) == 0 { - continue - } - template.APIVersions.Set(fmt.Sprintf("%s/%s", gv.String(), resource.Kind), true) - } - } - return err -} - func (s *ServiceReconciler) GetPollInterval() time.Duration { return 0 // use default poll interval } @@ -390,7 +354,7 @@ func (s *ServiceReconciler) Reconcile(ctx context.Context, id string) (result re }) metrics.Record().ServiceDeletion(id) - err = s.UpdatePruneStatus(ctx, svc, ch, map[manis.GroupName]string{}) + err = s.UpdatePruneStatus(ctx, svc, ch, map[schema.GroupName]string{}) return } diff --git a/pkg/controller/service/reconciler_status.go b/pkg/controller/service/reconciler_status.go index b01516e0..426010ac 100644 --- a/pkg/controller/service/reconciler_status.go +++ b/pkg/controller/service/reconciler_status.go @@ -13,12 +13,17 @@ import ( "sigs.k8s.io/cli-utils/pkg/print/stats" "sigs.k8s.io/controller-runtime/pkg/log" + internalschema "github.com/pluralsh/deployment-operator/internal/kubernetes/schema" "github.com/pluralsh/deployment-operator/internal/metrics" "github.com/pluralsh/deployment-operator/pkg/cache" - "github.com/pluralsh/deployment-operator/pkg/manifests" ) -func (s *ServiceReconciler) UpdatePruneStatus(ctx context.Context, svc *console.GetServiceDeploymentForAgent_ServiceDeployment, ch <-chan event.Event, vcache map[manifests.GroupName]string) error { +func (s *ServiceReconciler) UpdatePruneStatus( + ctx context.Context, + svc *console.GetServiceDeploymentForAgent_ServiceDeployment, + ch <-chan event.Event, + vcache map[internalschema.GroupName]string, +) error { logger := log.FromContext(ctx) var statsCollector stats.Stats @@ -61,7 +66,7 @@ func (s *ServiceReconciler) UpdateApplyStatus( svc *console.GetServiceDeploymentForAgent_ServiceDeployment, ch <-chan event.Event, printStatus bool, - vcache map[manifests.GroupName]string, + vcache map[internalschema.GroupName]string, ) error { logger := log.FromContext(ctx) start, err := metrics.FromContext[time.Time](ctx, metrics.ContextKeyTimeStart) diff --git a/pkg/controller/service/status_collector.go b/pkg/controller/service/status_collector.go index 1b1ea1f1..4967d1bc 100644 --- a/pkg/controller/service/status_collector.go +++ b/pkg/controller/service/status_collector.go @@ -11,10 +11,10 @@ import ( "sigs.k8s.io/cli-utils/pkg/apply/event" "sigs.k8s.io/cli-utils/pkg/object" + "github.com/pluralsh/deployment-operator/internal/kubernetes/schema" "github.com/pluralsh/deployment-operator/pkg/cache" "github.com/pluralsh/deployment-operator/pkg/common" "github.com/pluralsh/deployment-operator/pkg/log" - "github.com/pluralsh/deployment-operator/pkg/manifests" ) type serviceComponentsStatusCollector struct { @@ -58,12 +58,12 @@ func (sc *serviceComponentsStatusCollector) refetch(resource *unstructured.Unstr return response } -func (sc *serviceComponentsStatusCollector) fromApplyResult(e event.ApplyEvent, vcache map[manifests.GroupName]string) *console.ComponentAttributes { +func (sc *serviceComponentsStatusCollector) fromApplyResult(e event.ApplyEvent, vcache map[schema.GroupName]string) *console.ComponentAttributes { if e.Resource == nil { return nil } gvk := e.Resource.GroupVersionKind() - gname := manifests.GroupName{ + gname := schema.GroupName{ Group: gvk.Group, Kind: gvk.Kind, Name: e.Resource.GetName(), @@ -96,7 +96,7 @@ func (sc *serviceComponentsStatusCollector) fromApplyResult(e event.ApplyEvent, } } -func (sc *serviceComponentsStatusCollector) componentsAttributes(vcache map[manifests.GroupName]string) []*console.ComponentAttributes { +func (sc *serviceComponentsStatusCollector) componentsAttributes(vcache map[schema.GroupName]string) []*console.ComponentAttributes { components := make([]*console.ComponentAttributes, 0, len(sc.latestStatus)) if sc.DryRun { @@ -123,7 +123,7 @@ func (sc *serviceComponentsStatusCollector) componentsAttributes(vcache map[mani log.Logger.Error(err, "failed to get cache status") continue } - gname := manifests.GroupName{ + gname := schema.GroupName{ Group: e.Group, Kind: e.Kind, Name: e.Name, diff --git a/pkg/controller/stacks/reconciler.go b/pkg/controller/stacks/reconciler.go index 58acf597..0f746bdb 100644 --- a/pkg/controller/stacks/reconciler.go +++ b/pkg/controller/stacks/reconciler.go @@ -18,6 +18,10 @@ import ( "github.com/pluralsh/deployment-operator/pkg/websocket" ) +const ( + Identifier = "Stack Controller" +) + type StackReconciler struct { ConsoleClient client.Client K8sClient ctrlclient.Client diff --git a/pkg/manifests/template/helm.go b/pkg/manifests/template/helm.go index b007eb6f..3efcbcff 100644 --- a/pkg/manifests/template/helm.go +++ b/pkg/manifests/template/helm.go @@ -13,7 +13,6 @@ import ( "time" "github.com/gofrs/flock" - cmap "github.com/orcaman/concurrent-map/v2" "github.com/pkg/errors" console "github.com/pluralsh/console/go/client" "github.com/pluralsh/polly/algorithms" @@ -35,6 +34,7 @@ import ( "sigs.k8s.io/yaml" "github.com/pluralsh/deployment-operator/cmd/agent/args" + "github.com/pluralsh/deployment-operator/pkg/cache" loglevel "github.com/pluralsh/deployment-operator/pkg/log" ) @@ -57,11 +57,9 @@ func init() { settings.RepositoryCache = dir settings.RepositoryConfig = path.Join(dir, "repositories.yaml") settings.KubeInsecureSkipTLSVerify = true - APIVersions = cmap.New[bool]() } var settings = cli.New() -var APIVersions cmap.ConcurrentMap[string, bool] func debug(format string, v ...interface{}) { format = fmt.Sprintf("INFO: %s\n", format) @@ -230,7 +228,7 @@ func (h *helm) templateHelm(conf *action.Configuration, release, namespace strin return nil, err } client.KubeVersion = vsn - client.APIVersions = algorithms.MapKeys[string, bool](APIVersions.Items()) + client.APIVersions = algorithms.MapKeys[string, bool](cache.DiscoveryCache().Items()) return client.Run(chart, values) } diff --git a/pkg/manifests/versions.go b/pkg/manifests/versions.go index 9f3f1adb..b1cf89f6 100644 --- a/pkg/manifests/versions.go +++ b/pkg/manifests/versions.go @@ -2,19 +2,15 @@ package manifests import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" -) -type GroupName struct { - Group string - Kind string - Name string -} + "github.com/pluralsh/deployment-operator/internal/kubernetes/schema" +) -func VersionCache(manifests []*unstructured.Unstructured) map[GroupName]string { - res := map[GroupName]string{} +func VersionCache(manifests []*unstructured.Unstructured) map[schema.GroupName]string { + res := map[schema.GroupName]string{} for _, man := range manifests { gvk := man.GroupVersionKind() - name := GroupName{ + name := schema.GroupName{ Group: gvk.Group, Kind: gvk.Kind, Name: man.GetName(),