Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: reafctor agent in preparation for console manager supervisor #265

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 0 additions & 98 deletions cmd/agent/agent.go

This file was deleted.

52 changes: 40 additions & 12 deletions cmd/agent/args/args.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ const (
defaultRefreshInterval = "2m"
defaultRefreshIntervalDuration = 2 * time.Minute

defaultPollInterval = "30s"
defaultPollIntervalDuration = 30 * time.Second

defaultRefreshJitter = "15s"
defaultRefreshJitterDuration = 15 * time.Second

Expand All @@ -38,6 +41,9 @@ const (
defaultManifestCacheTTL = "1h"
defaultManifestCacheTTLDuration = time.Hour

defaultControllerCacheTTL = "30s"
defaultControllerCacheTTLDuration = 30 * time.Second

defaultRestoreNamespace = "velero"

defaultProfilerPath = "/debug/pprof/"
Expand All @@ -55,18 +61,20 @@ var (
argMaxConcurrentReconciles = flag.Int("max-concurrent-reconciles", 20, "Maximum number of concurrent reconciles which can be run.")
argResyncSeconds = flag.Int("resync-seconds", 300, "Resync duration in seconds.")

argClusterId = flag.String("cluster-id", "", "The ID of the cluster being connected to.")
argConsoleUrl = flag.String("console-url", "", "The URL of the console api to fetch services from.")
argDeployToken = flag.String("deploy-token", helpers.GetEnv(EnvDeployToken, ""), "The deploy token to auth to Console API with.")
argProbeAddr = flag.String("health-probe-bind-address", defaultProbeAddress, "The address the probe endpoint binds to.")
argMetricsAddr = flag.String("metrics-bind-address", defaultMetricsAddress, "The address the metric endpoint binds to.")
argProcessingTimeout = flag.String("processing-timeout", defaultProcessingTimeout, "Maximum amount of time to spend trying to process queue item.")
argRefreshInterval = flag.String("refresh-interval", defaultRefreshInterval, "Refresh interval duration.")
argRefreshJitter = flag.String("refresh-jitter", defaultRefreshJitter, "Refresh jitter.")
argResourceCacheTTL = flag.String("resource-cache-ttl", defaultResourceCacheTTL, "The time to live of each resource cache entry.")
argManifestCacheTTL = flag.String("manifest-cache-ttl", defaultManifestCacheTTL, "The time to live of service manifests in cache entry.")
argRestoreNamespace = flag.String("restore-namespace", defaultRestoreNamespace, "The namespace where Velero restores are located.")
argServices = flag.String("services", "", "A comma separated list of service ids to reconcile. Leave empty to reconcile all.")
argClusterId = flag.String("cluster-id", "", "The ID of the cluster being connected to.")
argConsoleUrl = flag.String("console-url", "", "The URL of the console api to fetch services from.")
argDeployToken = flag.String("deploy-token", helpers.GetEnv(EnvDeployToken, ""), "The deploy token to auth to Console API with.")
argProbeAddr = flag.String("health-probe-bind-address", defaultProbeAddress, "The address the probe endpoint binds to.")
argMetricsAddr = flag.String("metrics-bind-address", defaultMetricsAddress, "The address the metric endpoint binds to.")
argProcessingTimeout = flag.String("processing-timeout", defaultProcessingTimeout, "Maximum amount of time to spend trying to process queue item.")
argRefreshInterval = flag.String("refresh-interval", defaultRefreshInterval, "Time interval to recheck the websocket connection.")
argPollInterval = flag.String("poll-interval", defaultPollInterval, "Time interval to poll resources from the Console API.")
argRefreshJitter = flag.String("refresh-jitter", defaultRefreshJitter, "Refresh jitter.")
argResourceCacheTTL = flag.String("resource-cache-ttl", defaultResourceCacheTTL, "The time to live of each resource cache entry.")
argManifestCacheTTL = flag.String("manifest-cache-ttl", defaultManifestCacheTTL, "The time to live of service manifests in cache entry.")
argControllerCacheTTL = flag.String("controller-cache-ttl", defaultControllerCacheTTL, "The time to live of console controller cache entries.")
argRestoreNamespace = flag.String("restore-namespace", defaultRestoreNamespace, "The namespace where Velero restores are located.")
argServices = flag.String("services", "", "A comma separated list of service ids to reconcile. Leave empty to reconcile all.")

serviceSet containers.Set[string]
)
Expand Down Expand Up @@ -169,6 +177,16 @@ func RefreshInterval() time.Duration {
return duration
}

func PollInterval() time.Duration {
duration, err := time.ParseDuration(*argPollInterval)
if err != nil {
klog.ErrorS(err, "Could not parse poll-interval", "value", *argPollInterval, "default", defaultPollIntervalDuration)
return defaultPollIntervalDuration
}

return duration
}

func RefreshJitter() time.Duration {
jitter, err := time.ParseDuration(*argRefreshJitter)
if err != nil {
Expand Down Expand Up @@ -199,6 +217,16 @@ func ManifestCacheTTL() time.Duration {
return duration
}

func ControllerCacheTTL() time.Duration {
duration, err := time.ParseDuration(*argControllerCacheTTL)
if err != nil {
klog.ErrorS(err, "Could not parse controller-cache-ttl", "value", *argControllerCacheTTL, "default", defaultControllerCacheTTLDuration)
return defaultControllerCacheTTLDuration
}

return duration
}

func RestoreNamespace() string {
return *argRestoreNamespace
}
Expand Down
78 changes: 78 additions & 0 deletions cmd/agent/console.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package main

import (
"os"

"k8s.io/client-go/util/workqueue"

"github.com/pluralsh/deployment-operator/cmd/agent/args"
"github.com/pluralsh/deployment-operator/internal/utils"
"github.com/pluralsh/deployment-operator/pkg/client"
consolectrl "github.com/pluralsh/deployment-operator/pkg/controller"
"github.com/pluralsh/deployment-operator/pkg/controller/stacks"

"k8s.io/client-go/rest"
ctrclient "sigs.k8s.io/controller-runtime/pkg/client"

"github.com/pluralsh/deployment-operator/pkg/controller"
"github.com/pluralsh/deployment-operator/pkg/controller/namespaces"
"github.com/pluralsh/deployment-operator/pkg/controller/pipelinegates"
"github.com/pluralsh/deployment-operator/pkg/controller/restore"
"github.com/pluralsh/deployment-operator/pkg/controller/service"
)

func initConsoleManagerOrDie() *consolectrl.ControllerManager {
mgr, err := consolectrl.NewControllerManager(
consolectrl.WithMaxConcurrentReconciles(args.MaxConcurrentReconciles()),
consolectrl.WithCacheSyncTimeout(args.ProcessingTimeout()),
consolectrl.WithRefresh(args.RefreshInterval()),
consolectrl.WithJitter(args.RefreshJitter()),
consolectrl.WithRecoverPanic(true),
consolectrl.WithConsoleClientArgs(args.ConsoleUrl(), args.DeployToken()),
consolectrl.WithSocketArgs(args.ClusterId(), args.ConsoleUrl(), args.DeployToken()),
)
if err != nil {
setupLog.Errorw("unable to create manager", "error", err)
os.Exit(1)
}

return mgr
}

func registerConsoleReconcilersOrDie(
mgr *controller.ControllerManager,
config *rest.Config,
k8sClient ctrclient.Client,
consoleClient client.Client,
) {
mgr.AddReconcilerOrDie(service.Identifier, func() (controller.Reconciler, workqueue.TypedRateLimitingInterface[string], error) {
r, err := service.NewServiceReconciler(consoleClient, config, args.ControllerCacheTTL(), args.ManifestCacheTTL(), args.RestoreNamespace(), args.ConsoleUrl())
return r, r.SvcQueue, err
})

mgr.AddReconcilerOrDie(pipelinegates.Identifier, func() (controller.Reconciler, workqueue.TypedRateLimitingInterface[string], error) {
r, err := pipelinegates.NewGateReconciler(consoleClient, k8sClient, config, args.ControllerCacheTTL(), args.PollInterval(), args.ClusterId())
return r, r.GateQueue, err
})

mgr.AddReconcilerOrDie(restore.Identifier, func() (controller.Reconciler, workqueue.TypedRateLimitingInterface[string], error) {
r := restore.NewRestoreReconciler(consoleClient, k8sClient, args.ControllerCacheTTL(), args.RestoreNamespace())
return r, r.RestoreQueue, nil
})

mgr.AddReconcilerOrDie(namespaces.Identifier, func() (controller.Reconciler, workqueue.TypedRateLimitingInterface[string], error) {
r := namespaces.NewNamespaceReconciler(consoleClient, k8sClient, args.ControllerCacheTTL())
return r, r.NamespaceQueue, nil
})

mgr.AddReconcilerOrDie(stacks.Identifier, func() (controller.Reconciler, workqueue.TypedRateLimitingInterface[string], error) {
namespace, err := utils.GetOperatorNamespace()
if err != nil {
setupLog.Errorw("unable to get operator namespace", "error", err)
os.Exit(1)
}

r := stacks.NewStackReconciler(consoleClient, k8sClient, args.ControllerCacheTTL(), args.PollInterval(), namespace, args.ConsoleUrl(), args.DeployToken())
return r, r.StackQueue, nil
})
}
Loading
Loading