From 77fa40713d9e601f10df875328dbca15b85d704a Mon Sep 17 00:00:00 2001 From: Maanas M Singh <59373661+Maanas-23@users.noreply.github.com> Date: Thu, 2 Jan 2025 19:49:54 +0530 Subject: [PATCH] Call reconcile from server (#58) * calling reconcile from server * reconcile if crd is not found in CRDDirectory * remove ForceReconcile function * fix goimports lint and update error messages * undo accidental change * fix goimports lint --------- Co-authored-by: Shubham Rai --- operator/cmd/main.go | 7 +++-- .../internal/elastiserver/elastiServer.go | 29 ++++++++++++++++--- 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/operator/cmd/main.go b/operator/cmd/main.go index 3207f45..f655f1a 100644 --- a/operator/cmd/main.go +++ b/operator/cmd/main.go @@ -175,19 +175,20 @@ func mainWithError() error { informerManager.Start() // Set up the ElastiService controller - if err = (&controller.ElastiServiceReconciler{ + reconciler := &controller.ElastiServiceReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), Logger: zapLogger, InformerManager: informerManager, - }).SetupWithManager(mgr); err != nil { + } + if err = reconciler.SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "ElastiService") sentry.CaptureException(err) return fmt.Errorf("main: %w", err) } // Start the elasti server - eServer := elastiserver.NewServer(zapLogger, mgr.GetConfig(), 30*time.Second) + eServer := elastiserver.NewServer(zapLogger, mgr.GetConfig(), 30*time.Second, reconciler) errChan := make(chan error, 1) go func() { if err := eServer.Start(elastiServerPort); err != nil { diff --git a/operator/internal/elastiserver/elastiServer.go b/operator/internal/elastiserver/elastiServer.go index 55330b5..ceab36a 100644 --- a/operator/internal/elastiserver/elastiServer.go +++ b/operator/internal/elastiserver/elastiServer.go @@ -14,11 +14,14 @@ import ( sentryhttp "github.com/getsentry/sentry-go/http" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/rest" "truefoundry/elasti/operator/internal/crddirectory" "truefoundry/elasti/operator/internal/prom" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/truefoundry/elasti/pkg/k8shelper" "github.com/truefoundry/elasti/pkg/messages" @@ -39,10 +42,11 @@ type ( scaleLocks sync.Map // rescaleDuration is the duration to wait before checking to rescaling the target rescaleDuration time.Duration + reconciler reconcile.Reconciler } ) -func NewServer(logger *zap.Logger, config *rest.Config, rescaleDuration time.Duration) *Server { +func NewServer(logger *zap.Logger, config *rest.Config, rescaleDuration time.Duration, reconciler reconcile.Reconciler) *Server { // Get Ops client k8sUtil := k8shelper.NewOps(logger, config) return &Server{ @@ -50,6 +54,7 @@ func NewServer(logger *zap.Logger, config *rest.Config, rescaleDuration time.Dur k8shelper: k8sUtil, // rescaleDuration is the duration to wait before checking to rescaling the target rescaleDuration: rescaleDuration, + reconciler: reconciler, } } @@ -148,7 +153,7 @@ func (s *Server) resolverReqHandler(w http.ResponseWriter, req *http.Request) { zap.String("namespace", body.Namespace)) } -func (s *Server) scaleTargetForService(_ context.Context, serviceName, namespace string) error { +func (s *Server) scaleTargetForService(ctx context.Context, serviceName, namespace string) error { scaleMutex, loaded := s.getMutexForServiceScale(serviceName) if loaded { s.logger.Debug("Scale target lock already exists", zap.String("service", serviceName)) @@ -157,10 +162,26 @@ func (s *Server) scaleTargetForService(_ context.Context, serviceName, namespace scaleMutex.Lock() defer s.logger.Debug("Scale target lock released", zap.String("service", serviceName)) s.logger.Debug("Scale target lock taken", zap.String("service", serviceName)) + crd, found := crddirectory.CRDDirectory.GetCRD(serviceName) if !found { - s.releaseMutexForServiceScale(serviceName) - return fmt.Errorf("scaleTargetForService - error: failed to get CRD details from directory, serviceName: %s", serviceName) + s.logger.Debug("Failed to get CRD details from directory, running reconcile") + _, err := s.reconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: serviceName, + Namespace: namespace, + }, + }) + if err != nil { + s.releaseMutexForServiceScale(serviceName) + return fmt.Errorf("scaleTargetForService - error: reconcile failed, serviceName: %s, %w", serviceName, err) + } + + crd, found = crddirectory.CRDDirectory.GetCRD(serviceName) + if !found { + s.releaseMutexForServiceScale(serviceName) + return fmt.Errorf("scaleTargetForService - error: failed to get CRD details from directory even after reconcile, serviceName: %s", serviceName) + } } if err := s.k8shelper.ScaleTargetWhenAtZero(namespace, crd.Spec.ScaleTargetRef.Name, crd.Spec.ScaleTargetRef.Kind, crd.Spec.MinTargetReplicas); err != nil {