Skip to content

Commit

Permalink
feat: reafctor agent in preparation for console manager supervisor (#265
Browse files Browse the repository at this point in the history
)

* refactor operator initialization

* fix circular dependency issue

* fix lint

* fix merge conflicts
  • Loading branch information
floreks authored Sep 16, 2024
1 parent 72ca352 commit e3cdc8d
Show file tree
Hide file tree
Showing 23 changed files with 636 additions and 377 deletions.
98 changes: 0 additions & 98 deletions cmd/agent/agent.go

This file was deleted.

52 changes: 40 additions & 12 deletions cmd/agent/args/args.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ const (
defaultRefreshInterval = "2m"
defaultRefreshIntervalDuration = 2 * time.Minute

defaultPollInterval = "30s"
defaultPollIntervalDuration = 30 * time.Second

defaultRefreshJitter = "15s"
defaultRefreshJitterDuration = 15 * time.Second

Expand All @@ -38,6 +41,9 @@ const (
defaultManifestCacheTTL = "1h"
defaultManifestCacheTTLDuration = time.Hour

defaultControllerCacheTTL = "30s"
defaultControllerCacheTTLDuration = 30 * time.Second

defaultRestoreNamespace = "velero"

defaultProfilerPath = "/debug/pprof/"
Expand All @@ -55,18 +61,20 @@ var (
argMaxConcurrentReconciles = flag.Int("max-concurrent-reconciles", 20, "Maximum number of concurrent reconciles which can be run.")
argResyncSeconds = flag.Int("resync-seconds", 300, "Resync duration in seconds.")

argClusterId = flag.String("cluster-id", "", "The ID of the cluster being connected to.")
argConsoleUrl = flag.String("console-url", "", "The URL of the console api to fetch services from.")
argDeployToken = flag.String("deploy-token", helpers.GetEnv(EnvDeployToken, ""), "The deploy token to auth to Console API with.")
argProbeAddr = flag.String("health-probe-bind-address", defaultProbeAddress, "The address the probe endpoint binds to.")
argMetricsAddr = flag.String("metrics-bind-address", defaultMetricsAddress, "The address the metric endpoint binds to.")
argProcessingTimeout = flag.String("processing-timeout", defaultProcessingTimeout, "Maximum amount of time to spend trying to process queue item.")
argRefreshInterval = flag.String("refresh-interval", defaultRefreshInterval, "Refresh interval duration.")
argRefreshJitter = flag.String("refresh-jitter", defaultRefreshJitter, "Refresh jitter.")
argResourceCacheTTL = flag.String("resource-cache-ttl", defaultResourceCacheTTL, "The time to live of each resource cache entry.")
argManifestCacheTTL = flag.String("manifest-cache-ttl", defaultManifestCacheTTL, "The time to live of service manifests in cache entry.")
argRestoreNamespace = flag.String("restore-namespace", defaultRestoreNamespace, "The namespace where Velero restores are located.")
argServices = flag.String("services", "", "A comma separated list of service ids to reconcile. Leave empty to reconcile all.")
argClusterId = flag.String("cluster-id", "", "The ID of the cluster being connected to.")
argConsoleUrl = flag.String("console-url", "", "The URL of the console api to fetch services from.")
argDeployToken = flag.String("deploy-token", helpers.GetEnv(EnvDeployToken, ""), "The deploy token to auth to Console API with.")
argProbeAddr = flag.String("health-probe-bind-address", defaultProbeAddress, "The address the probe endpoint binds to.")
argMetricsAddr = flag.String("metrics-bind-address", defaultMetricsAddress, "The address the metric endpoint binds to.")
argProcessingTimeout = flag.String("processing-timeout", defaultProcessingTimeout, "Maximum amount of time to spend trying to process queue item.")
argRefreshInterval = flag.String("refresh-interval", defaultRefreshInterval, "Time interval to recheck the websocket connection.")
argPollInterval = flag.String("poll-interval", defaultPollInterval, "Time interval to poll resources from the Console API.")
argRefreshJitter = flag.String("refresh-jitter", defaultRefreshJitter, "Refresh jitter.")
argResourceCacheTTL = flag.String("resource-cache-ttl", defaultResourceCacheTTL, "The time to live of each resource cache entry.")
argManifestCacheTTL = flag.String("manifest-cache-ttl", defaultManifestCacheTTL, "The time to live of service manifests in cache entry.")
argControllerCacheTTL = flag.String("controller-cache-ttl", defaultControllerCacheTTL, "The time to live of console controller cache entries.")
argRestoreNamespace = flag.String("restore-namespace", defaultRestoreNamespace, "The namespace where Velero restores are located.")
argServices = flag.String("services", "", "A comma separated list of service ids to reconcile. Leave empty to reconcile all.")

serviceSet containers.Set[string]
)
Expand Down Expand Up @@ -169,6 +177,16 @@ func RefreshInterval() time.Duration {
return duration
}

func PollInterval() time.Duration {
duration, err := time.ParseDuration(*argPollInterval)
if err != nil {
klog.ErrorS(err, "Could not parse poll-interval", "value", *argPollInterval, "default", defaultPollIntervalDuration)
return defaultPollIntervalDuration
}

return duration
}

func RefreshJitter() time.Duration {
jitter, err := time.ParseDuration(*argRefreshJitter)
if err != nil {
Expand Down Expand Up @@ -199,6 +217,16 @@ func ManifestCacheTTL() time.Duration {
return duration
}

func ControllerCacheTTL() time.Duration {
duration, err := time.ParseDuration(*argControllerCacheTTL)
if err != nil {
klog.ErrorS(err, "Could not parse controller-cache-ttl", "value", *argControllerCacheTTL, "default", defaultControllerCacheTTLDuration)
return defaultControllerCacheTTLDuration
}

return duration
}

func RestoreNamespace() string {
return *argRestoreNamespace
}
Expand Down
78 changes: 78 additions & 0 deletions cmd/agent/console.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package main

import (
"os"

"k8s.io/client-go/util/workqueue"

"github.com/pluralsh/deployment-operator/cmd/agent/args"
"github.com/pluralsh/deployment-operator/internal/utils"
"github.com/pluralsh/deployment-operator/pkg/client"
consolectrl "github.com/pluralsh/deployment-operator/pkg/controller"
"github.com/pluralsh/deployment-operator/pkg/controller/stacks"

"k8s.io/client-go/rest"
ctrclient "sigs.k8s.io/controller-runtime/pkg/client"

"github.com/pluralsh/deployment-operator/pkg/controller"
"github.com/pluralsh/deployment-operator/pkg/controller/namespaces"
"github.com/pluralsh/deployment-operator/pkg/controller/pipelinegates"
"github.com/pluralsh/deployment-operator/pkg/controller/restore"
"github.com/pluralsh/deployment-operator/pkg/controller/service"
)

func initConsoleManagerOrDie() *consolectrl.ControllerManager {
mgr, err := consolectrl.NewControllerManager(
consolectrl.WithMaxConcurrentReconciles(args.MaxConcurrentReconciles()),
consolectrl.WithCacheSyncTimeout(args.ProcessingTimeout()),
consolectrl.WithRefresh(args.RefreshInterval()),
consolectrl.WithJitter(args.RefreshJitter()),
consolectrl.WithRecoverPanic(true),
consolectrl.WithConsoleClientArgs(args.ConsoleUrl(), args.DeployToken()),
consolectrl.WithSocketArgs(args.ClusterId(), args.ConsoleUrl(), args.DeployToken()),
)
if err != nil {
setupLog.Errorw("unable to create manager", "error", err)
os.Exit(1)
}

return mgr
}

func registerConsoleReconcilersOrDie(
mgr *controller.ControllerManager,
config *rest.Config,
k8sClient ctrclient.Client,
consoleClient client.Client,
) {
mgr.AddReconcilerOrDie(service.Identifier, func() (controller.Reconciler, workqueue.TypedRateLimitingInterface[string], error) {
r, err := service.NewServiceReconciler(consoleClient, config, args.ControllerCacheTTL(), args.ManifestCacheTTL(), args.RestoreNamespace(), args.ConsoleUrl())
return r, r.SvcQueue, err
})

mgr.AddReconcilerOrDie(pipelinegates.Identifier, func() (controller.Reconciler, workqueue.TypedRateLimitingInterface[string], error) {
r, err := pipelinegates.NewGateReconciler(consoleClient, k8sClient, config, args.ControllerCacheTTL(), args.PollInterval(), args.ClusterId())
return r, r.GateQueue, err
})

mgr.AddReconcilerOrDie(restore.Identifier, func() (controller.Reconciler, workqueue.TypedRateLimitingInterface[string], error) {
r := restore.NewRestoreReconciler(consoleClient, k8sClient, args.ControllerCacheTTL(), args.RestoreNamespace())
return r, r.RestoreQueue, nil
})

mgr.AddReconcilerOrDie(namespaces.Identifier, func() (controller.Reconciler, workqueue.TypedRateLimitingInterface[string], error) {
r := namespaces.NewNamespaceReconciler(consoleClient, k8sClient, args.ControllerCacheTTL())
return r, r.NamespaceQueue, nil
})

mgr.AddReconcilerOrDie(stacks.Identifier, func() (controller.Reconciler, workqueue.TypedRateLimitingInterface[string], error) {
namespace, err := utils.GetOperatorNamespace()
if err != nil {
setupLog.Errorw("unable to get operator namespace", "error", err)
os.Exit(1)
}

r := stacks.NewStackReconciler(consoleClient, k8sClient, args.ControllerCacheTTL(), args.PollInterval(), namespace, args.ConsoleUrl(), args.DeployToken())
return r, r.StackQueue, nil
})
}
Loading

0 comments on commit e3cdc8d

Please sign in to comment.