Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add console manager supervisor logic w/ restart option #270

Merged
Merged
31 changes: 16 additions & 15 deletions cmd/agent/args/args.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ const (
defaultPollInterval = "30s"
defaultPollIntervalDuration = 30 * time.Second

defaultRefreshJitter = "15s"
defaultRefreshJitterDuration = 15 * time.Second
defaultPollJitter = "15s"
defaultPollJitterDuration = 15 * time.Second

defaultResourceCacheTTL = "1h"
defaultResourceCacheTTLDuration = time.Hour
Expand Down Expand Up @@ -61,15 +61,16 @@ var (
argMaxConcurrentReconciles = flag.Int("max-concurrent-reconciles", 20, "Maximum number of concurrent reconciles which can be run.")
argResyncSeconds = flag.Int("resync-seconds", 300, "Resync duration in seconds.")

argClusterId = flag.String("cluster-id", "", "The ID of the cluster being connected to.")
argConsoleUrl = flag.String("console-url", "", "The URL of the console api to fetch services from.")
argDeployToken = flag.String("deploy-token", helpers.GetEnv(EnvDeployToken, ""), "The deploy token to auth to Console API with.")
argProbeAddr = flag.String("health-probe-bind-address", defaultProbeAddress, "The address the probe endpoint binds to.")
argMetricsAddr = flag.String("metrics-bind-address", defaultMetricsAddress, "The address the metric endpoint binds to.")
argProcessingTimeout = flag.String("processing-timeout", defaultProcessingTimeout, "Maximum amount of time to spend trying to process queue item.")
argRefreshInterval = flag.String("refresh-interval", defaultRefreshInterval, "Time interval to recheck the websocket connection.")
argPollInterval = flag.String("poll-interval", defaultPollInterval, "Time interval to poll resources from the Console API.")
argRefreshJitter = flag.String("refresh-jitter", defaultRefreshJitter, "Refresh jitter.")
argClusterId = flag.String("cluster-id", "", "The ID of the cluster being connected to.")
argConsoleUrl = flag.String("console-url", "", "The URL of the console api to fetch services from.")
argDeployToken = flag.String("deploy-token", helpers.GetEnv(EnvDeployToken, ""), "The deploy token to auth to Console API with.")
argProbeAddr = flag.String("health-probe-bind-address", defaultProbeAddress, "The address the probe endpoint binds to.")
argMetricsAddr = flag.String("metrics-bind-address", defaultMetricsAddress, "The address the metric endpoint binds to.")
argProcessingTimeout = flag.String("processing-timeout", defaultProcessingTimeout, "Maximum amount of time to spend trying to process queue item.")
argRefreshInterval = flag.String("refresh-interval", defaultRefreshInterval, "Time interval to recheck the websocket connection.")
argPollInterval = flag.String("poll-interval", defaultPollInterval, "Time interval to poll resources from the Console API.")
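// TODO: the flag is still registered as "refresh-jitter" although it now controls the poll jitter; confirm it can be renamed (e.g. to "poll-jitter") without breaking existing deployments.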
argPollJitter = flag.String("refresh-jitter", defaultPollJitter, "Randomly selected jitter time up to the provided duration will be added to the poll interval.")
argResourceCacheTTL = flag.String("resource-cache-ttl", defaultResourceCacheTTL, "The time to live of each resource cache entry.")
argManifestCacheTTL = flag.String("manifest-cache-ttl", defaultManifestCacheTTL, "The time to live of service manifests in cache entry.")
argControllerCacheTTL = flag.String("controller-cache-ttl", defaultControllerCacheTTL, "The time to live of console controller cache entries.")
Expand Down Expand Up @@ -187,11 +188,11 @@ func PollInterval() time.Duration {
return duration
}

func RefreshJitter() time.Duration {
jitter, err := time.ParseDuration(*argRefreshJitter)
func PollJitter() time.Duration {
jitter, err := time.ParseDuration(*argPollJitter)
if err != nil {
klog.ErrorS(err, "Could not parse refresh-jitter", "value", *argRefreshJitter, "default", defaultRefreshJitterDuration)
return defaultRefreshJitterDuration
klog.ErrorS(err, "Could not parse refresh-jitter", "value", *argPollJitter, "default", defaultPollJitterDuration)
return defaultPollJitterDuration
}

return jitter
Expand Down
6 changes: 3 additions & 3 deletions cmd/agent/args/pprof.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@ import (
"net/http"
"net/http/pprof"

"github.com/pluralsh/deployment-operator/pkg/log"
"k8s.io/klog/v2"
)

func initProfiler() {
log.Logger.Info("initializing profiler")
klog.Info("initializing profiler")

mux := http.NewServeMux()
mux.HandleFunc(defaultProfilerPath, pprof.Index)
go func() {
if err := http.ListenAndServe(defaultProfilerAddress, mux); err != nil {
log.Logger.Fatal(err)
klog.Fatal(err)
}
}()
}
36 changes: 17 additions & 19 deletions cmd/agent/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package main
import (
"os"

"k8s.io/client-go/util/workqueue"

"github.com/pluralsh/deployment-operator/cmd/agent/args"
"github.com/pluralsh/deployment-operator/internal/utils"
"github.com/pluralsh/deployment-operator/pkg/client"
Expand All @@ -21,58 +19,58 @@ import (
"github.com/pluralsh/deployment-operator/pkg/controller/service"
)

func initConsoleManagerOrDie() *consolectrl.ControllerManager {
func initConsoleManagerOrDie() *consolectrl.Manager {
mgr, err := consolectrl.NewControllerManager(
consolectrl.WithMaxConcurrentReconciles(args.MaxConcurrentReconciles()),
consolectrl.WithCacheSyncTimeout(args.ProcessingTimeout()),
consolectrl.WithRefresh(args.RefreshInterval()),
consolectrl.WithJitter(args.RefreshJitter()),
consolectrl.WithPollInterval(args.PollInterval()),
consolectrl.WithJitter(args.PollJitter()),
consolectrl.WithRecoverPanic(true),
consolectrl.WithConsoleClientArgs(args.ConsoleUrl(), args.DeployToken()),
consolectrl.WithSocketArgs(args.ClusterId(), args.ConsoleUrl(), args.DeployToken()),
)
if err != nil {
setupLog.Errorw("unable to create manager", "error", err)
setupLog.Error(err, "unable to create manager")
os.Exit(1)
}

return mgr
}

func registerConsoleReconcilersOrDie(
mgr *controller.ControllerManager,
mgr *controller.Manager,
config *rest.Config,
k8sClient ctrclient.Client,
consoleClient client.Client,
) {
mgr.AddReconcilerOrDie(service.Identifier, func() (controller.Reconciler, workqueue.TypedRateLimitingInterface[string], error) {
mgr.AddReconcilerOrDie(service.Identifier, func() (controller.Reconciler, error) {
r, err := service.NewServiceReconciler(consoleClient, config, args.ControllerCacheTTL(), args.ManifestCacheTTL(), args.RestoreNamespace(), args.ConsoleUrl())
return r, r.SvcQueue, err
return r, err
})

mgr.AddReconcilerOrDie(pipelinegates.Identifier, func() (controller.Reconciler, workqueue.TypedRateLimitingInterface[string], error) {
r, err := pipelinegates.NewGateReconciler(consoleClient, k8sClient, config, args.ControllerCacheTTL(), args.PollInterval(), args.ClusterId())
return r, r.GateQueue, err
mgr.AddReconcilerOrDie(pipelinegates.Identifier, func() (controller.Reconciler, error) {
r, err := pipelinegates.NewGateReconciler(consoleClient, k8sClient, config, args.PollInterval())
return r, err
})

mgr.AddReconcilerOrDie(restore.Identifier, func() (controller.Reconciler, workqueue.TypedRateLimitingInterface[string], error) {
mgr.AddReconcilerOrDie(restore.Identifier, func() (controller.Reconciler, error) {
r := restore.NewRestoreReconciler(consoleClient, k8sClient, args.ControllerCacheTTL(), args.RestoreNamespace())
return r, r.RestoreQueue, nil
return r, nil
})

mgr.AddReconcilerOrDie(namespaces.Identifier, func() (controller.Reconciler, workqueue.TypedRateLimitingInterface[string], error) {
mgr.AddReconcilerOrDie(namespaces.Identifier, func() (controller.Reconciler, error) {
r := namespaces.NewNamespaceReconciler(consoleClient, k8sClient, args.ControllerCacheTTL())
return r, r.NamespaceQueue, nil
return r, nil
})

mgr.AddReconcilerOrDie(stacks.Identifier, func() (controller.Reconciler, workqueue.TypedRateLimitingInterface[string], error) {
mgr.AddReconcilerOrDie(stacks.Identifier, func() (controller.Reconciler, error) {
namespace, err := utils.GetOperatorNamespace()
if err != nil {
setupLog.Errorw("unable to get operator namespace", "error", err)
setupLog.Error(err, "unable to get operator namespace")
os.Exit(1)
}

r := stacks.NewStackReconciler(consoleClient, k8sClient, args.ControllerCacheTTL(), args.PollInterval(), namespace, args.ConsoleUrl(), args.DeployToken())
return r, r.StackQueue, nil
return r, nil
})
}
15 changes: 5 additions & 10 deletions cmd/agent/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@ import (

"github.com/pluralsh/deployment-operator/cmd/agent/args"
"github.com/pluralsh/deployment-operator/internal/controller"
"github.com/pluralsh/deployment-operator/pkg/cache"
consoleclient "github.com/pluralsh/deployment-operator/pkg/client"
"github.com/pluralsh/deployment-operator/pkg/common"
consolectrl "github.com/pluralsh/deployment-operator/pkg/controller"
"github.com/pluralsh/deployment-operator/pkg/controller/pipelinegates"
"github.com/pluralsh/deployment-operator/pkg/controller/service"
)

func initKubeManagerOrDie(config *rest.Config) manager.Manager {
mgr, err := ctrl.NewManager(config, ctrl.Options{
Logger: setupLog,
Scheme: scheme,
LeaderElection: args.EnableLeaderElection(),
LeaderElectionID: "dep12loy45.plural.sh",
Expand Down Expand Up @@ -86,7 +87,7 @@ func initKubeClientsOrDie(config *rest.Config) (rolloutsClient *roclientset.Clie

func registerKubeReconcilersOrDie(
manager ctrl.Manager,
consoleManager *consolectrl.ControllerManager,
consoleManager *consolectrl.Manager,
config *rest.Config,
extConsoleClient consoleclient.Client,
) {
Expand Down Expand Up @@ -157,9 +158,6 @@ func registerKubeReconcilersOrDie(
if err := (&controller.CustomHealthReconciler{
Client: manager.GetClient(),
Scheme: manager.GetScheme(),
ServiceReconciler: common.ToReconcilerOrDie[*service.ServiceReconciler](
consoleManager.GetReconciler(service.Identifier),
),
}).SetupWithManager(manager); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "HealthConvert")
}
Expand Down Expand Up @@ -198,13 +196,10 @@ func registerKubeReconcilersOrDie(
}

if err = (&controller.PipelineGateReconciler{
Client: manager.GetClient(),
GateCache: common.ToReconcilerOrDie[*pipelinegates.GateReconciler](
consoleManager.GetReconciler(pipelinegates.Identifier),
).GateCache,
Client: manager.GetClient(),
ConsoleClient: consoleclient.New(args.ConsoleUrl(), args.DeployToken()),
Log: ctrl.Log.WithName("controllers").WithName("PipelineGate"),
Scheme: manager.GetScheme(),
GateCache: cache.GateCache(),
}).SetupWithManager(manager); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Group")
os.Exit(1)
Expand Down
27 changes: 15 additions & 12 deletions cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,25 @@ import (
constraintstatusv1beta1 "github.com/open-policy-agent/gatekeeper/v3/apis/status/v1beta1"
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"

deploymentsv1alpha1 "github.com/pluralsh/deployment-operator/api/v1alpha1"
"github.com/pluralsh/deployment-operator/cmd/agent/args"
"github.com/pluralsh/deployment-operator/pkg/cache"
"github.com/pluralsh/deployment-operator/pkg/client"
consolectrl "github.com/pluralsh/deployment-operator/pkg/controller"
"github.com/pluralsh/deployment-operator/pkg/log"
"k8s.io/klog/v2"

velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"

deploymentsv1alpha1 "github.com/pluralsh/deployment-operator/api/v1alpha1"
"github.com/pluralsh/deployment-operator/cmd/agent/args"
"github.com/pluralsh/deployment-operator/pkg/cache"
"github.com/pluralsh/deployment-operator/pkg/client"
consolectrl "github.com/pluralsh/deployment-operator/pkg/controller"
)

var (
scheme = runtime.NewScheme()
setupLog = log.Logger
setupLog = klog.NewKlogr()
)

func init() {
Expand All @@ -49,13 +49,16 @@ const (
func main() {
args.Init()
config := ctrl.GetConfigOrDie()
ctx := ctrl.SetupSignalHandler()
ctx := ctrl.LoggerInto(ctrl.SetupSignalHandler(), setupLog)

extConsoleClient := client.New(args.ConsoleUrl(), args.DeployToken())
discoveryClient := initDiscoveryClientOrDie(config)
kubeManager := initKubeManagerOrDie(config)
consoleManager := initConsoleManagerOrDie()

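// Initialize the pipeline gate cache before the reconcilers are registered: registerKubeReconcilersOrDie reads it through cache.GateCache().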
cache.InitGateCache(args.ControllerCacheTTL(), extConsoleClient)

registerConsoleReconcilersOrDie(consoleManager, config, kubeManager.GetClient(), extConsoleClient)
registerKubeReconcilersOrDie(kubeManager, consoleManager, config, extConsoleClient)

Expand Down Expand Up @@ -86,18 +89,18 @@ func initDiscoveryClientOrDie(config *rest.Config) *discovery.DiscoveryClient {
return discoveryClient
}

func runConsoleManagerInBackgroundOrDie(ctx context.Context, mgr *consolectrl.ControllerManager) {
func runConsoleManagerInBackgroundOrDie(ctx context.Context, mgr *consolectrl.Manager) {
setupLog.Info("starting console controller manager")
if err := mgr.Start(ctx); err != nil {
setupLog.Errorw("unable to start console controller manager", "error", err)
setupLog.Error(err, "unable to start console controller manager")
os.Exit(1)
}
}

func runKubeManagerOrDie(ctx context.Context, mgr ctrl.Manager) {
setupLog.Info("starting kubernetes controller manager")
if err := mgr.Start(ctx); err != nil {
setupLog.Errorw("unable to start kubernetes controller manager", "error", err)
setupLog.Error(err, "unable to start kubernetes controller manager")
os.Exit(1)
}
}
6 changes: 3 additions & 3 deletions internal/controller/argorollout_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

"github.com/pluralsh/deployment-operator/internal/utils"
"github.com/pluralsh/deployment-operator/pkg/client"
"github.com/pluralsh/deployment-operator/pkg/controller/service"
"github.com/pluralsh/deployment-operator/pkg/controller"
)

const requeueArgoRolloutAfter = time.Second * 5
Expand All @@ -39,7 +39,7 @@ type ArgoRolloutReconciler struct {
ArgoClientSet roclientset.Interface
DynamicClient dynamic.Interface
KubeClient kubernetes.Interface
SvcReconciler *service.ServiceReconciler
SvcReconciler controller.Reconciler
}

// Reconcile Argo Rollout custom resources to ensure that Console stays in sync with the Kubernetes cluster.
Expand Down Expand Up @@ -107,7 +107,7 @@ func (r *ArgoRolloutReconciler) promote(ctx context.Context, rolloutIf clientset
}

if r.SvcReconciler != nil {
r.SvcReconciler.SvcQueue.AddRateLimited(svcId)
r.SvcReconciler.Queue().AddRateLimited(svcId)
}
return nil
}
Expand Down
4 changes: 1 addition & 3 deletions internal/controller/customhealth_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,12 @@ import (
"github.com/pluralsh/deployment-operator/api/v1alpha1"
"github.com/pluralsh/deployment-operator/internal/utils"
"github.com/pluralsh/deployment-operator/pkg/common"
"github.com/pluralsh/deployment-operator/pkg/controller/service"
)

// CustomHealthReconciler reconciles a CustomHealth object.
type CustomHealthReconciler struct {
client.Client
Scheme *runtime.Scheme
ServiceReconciler *service.ServiceReconciler
Scheme *runtime.Scheme
}

//+kubebuilder:rbac:groups=deployments.plural.sh,resources=customhealths,verbs=get;list;watch;create;update;patch;delete
Expand Down
12 changes: 5 additions & 7 deletions internal/controller/customhealth_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import (

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/pluralsh/deployment-operator/api/v1alpha1"
"github.com/pluralsh/deployment-operator/pkg/common"
"github.com/pluralsh/deployment-operator/pkg/controller/service"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/pluralsh/deployment-operator/api/v1alpha1"
"github.com/pluralsh/deployment-operator/pkg/common"
)

var _ = Describe("Customhealt Controller", Ordered, func() {
Expand Down Expand Up @@ -66,11 +66,9 @@ var _ = Describe("Customhealt Controller", Ordered, func() {
Conditions: []metav1.Condition{},
},
}
sr := &service.ServiceReconciler{}
reconciler := &CustomHealthReconciler{
Client: kClient,
Scheme: kClient.Scheme(),
ServiceReconciler: sr,
Client: kClient,
Scheme: kClient.Scheme(),
}
_, err := reconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: typeNamespacedName,
Expand Down
Loading
Loading