diff --git a/README.md b/README.md
index ebd9bcf..bddd741 100644
--- a/README.md
+++ b/README.md
@@ -376,6 +376,9 @@ docker push localhost:5000/elasti-resolver:v1alpha1
 We will build and publish our Operator changes.
 
 1. Go into the operator directory.
+```bash
+cd operator
+```
 2. Run the build and publish command.
 
 ```bash
diff --git a/operator/Makefile b/operator/Makefile
index 016f82e..ac9b7ed 100644
--- a/operator/Makefile
+++ b/operator/Makefile
@@ -154,7 +154,7 @@ $(LOCALBIN):
 
 ## Tool Binaries
 KUBECTL ?= kubectl
 KUSTOMIZE ?= $(LOCALBIN)/kustomize-$(KUSTOMIZE_VERSION)
-CONTROLLER_GEN ?= $(LOCALBIN)/controller-gen-$(CONTROLLER_TOOLS_VERSION)
+CONTROLLER_GEN ?= $(LOCALBIN)/controller-gen-$(CONTROLLER_TOOLS_VERSION)
 ENVTEST ?= $(LOCALBIN)/setup-envtest-$(ENVTEST_VERSION)
 GOLANGCI_LINT = $(LOCALBIN)/golangci-lint-$(GOLANGCI_LINT_VERSION)
diff --git a/operator/cmd/main.go b/operator/cmd/main.go
index c9f2f54..cd52548 100644
--- a/operator/cmd/main.go
+++ b/operator/cmd/main.go
@@ -169,15 +169,16 @@ func mainWithError() error {
     // Start the shared CRD Directory
     crddirectory.INITDirectory(zapLogger)
 
-    // Initiate and start the shared Informer manager
-    Informer := informer.NewInformerManager(zapLogger, mgr.GetConfig())
-    Informer.Start()
+    // Initiate and start the shared informer manager
+    informerManager := informer.NewInformerManager(zapLogger, mgr.GetConfig())
+    informerManager.Start()
 
     // Set up the ElastiService controller
     if err = (&controller.ElastiServiceReconciler{
-        Client: mgr.GetClient(),
-        Scheme: mgr.GetScheme(), Logger: zapLogger,
-        Informer: Informer,
+        Client:          mgr.GetClient(),
+        Scheme:          mgr.GetScheme(),
+        Logger:          zapLogger,
+        InformerManager: informerManager,
     }).SetupWithManager(mgr); err != nil {
         setupLog.Error(err, "unable to create controller", "controller", "ElastiService")
         sentry.CaptureException(err)
@@ -186,7 +187,21 @@ func mainWithError() error {
     // Start the elasti server
     eServer := elastiserver.NewServer(zapLogger, mgr.GetConfig(), 30*time.Second)
-    go eServer.Start(elastiServerPort)
+    errChan := make(chan error, 1)
+    go func() {
+        if err := eServer.Start(elastiServerPort); err != nil {
+            setupLog.Error(err, "elasti server failed to start")
+            sentry.CaptureException(err)
+            errChan <- fmt.Errorf("elasti server: %w", err)
+        }
+    }()
+
+    // Check the error channel before starting the manager
+    select {
+    case err := <-errChan:
+        return fmt.Errorf("main: %w", err)
+    default:
+    }
 
     //+kubebuilder:scaffold:builder
 
     if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
diff --git a/operator/internal/controller/elastiservice_controller.go b/operator/internal/controller/elastiservice_controller.go
index 5093f44..50a0a3b 100644
--- a/operator/internal/controller/elastiservice_controller.go
+++ b/operator/internal/controller/elastiservice_controller.go
@@ -29,7 +29,7 @@ type (
         client.Client
         Scheme             *kRuntime.Scheme
         Logger             *zap.Logger
-        Informer           *informer.Manager
+        InformerManager    *informer.Manager
         SwitchModeLocks    sync.Map
         InformerStartLocks sync.Map
         ReconcileLocks     sync.Map
@@ -64,7 +64,7 @@ func (r *ElastiServiceReconciler) Reconcile(ctx context.Context, req ctrl.Reques
     mutex.Lock()
     defer r.Logger.Debug("- Out of Reconcile", zap.String("es", req.NamespacedName.String()))
     defer mutex.Unlock()
-    start := time.Now()
+    startTime := time.Now()
 
     defer func() {
         e := values.Success
@@ -72,7 +72,7 @@ func (r *ElastiServiceReconciler) Reconcile(ctx context.Context, req ctrl.Reques
             e = err.Error()
             sentry.CaptureException(err)
         }
-        duration := time.Since(start).Seconds()
+        duration := time.Since(startTime).Seconds()
         prom.CRDReconcileHistogram.WithLabelValues(req.String(), e).Observe(duration)
     }()
 
diff --git a/operator/internal/controller/opsCRD.go b/operator/internal/controller/opsCRD.go
index c4982be..df5bd55 100644
--- a/operator/internal/controller/opsCRD.go
+++ b/operator/internal/controller/opsCRD.go
@@ -79,7 +79,7 @@ func (r *ElastiServiceReconciler) finalizeCRD(ctx context.Context, es *v1alpha1.
     // Stop all active informers related to this CRD in background
     go func() {
         defer wg.Done()
-        r.Informer.StopForCRD(req.Name)
+        r.InformerManager.StopForCRD(req.Name)
         r.Logger.Info("[Done] Informer stopped for CRD", zap.String("es", req.String()))
         // Reset the informer start mutex, so if the ElastiService is recreated, we will need to reset the informer
         r.resetMutexForInformer(r.getMutexKeyForTargetRef(req))
@@ -134,13 +134,13 @@ func (r *ElastiServiceReconciler) watchScaleTargetRef(ctx context.Context, es *v
         es.Spec.ScaleTargetRef.Kind != crd.Spec.ScaleTargetRef.Kind ||
         es.Spec.ScaleTargetRef.APIVersion != crd.Spec.ScaleTargetRef.APIVersion {
         r.Logger.Debug("ScaleTargetRef has changed, stopping previous informer.", zap.String("es", req.String()), zap.Any("scaleTargetRef", es.Spec.ScaleTargetRef))
-        key := r.Informer.GetKey(informer.KeyParams{
+        key := r.InformerManager.GetKey(informer.KeyParams{
             Namespace:    req.Namespace,
             CRDName:      req.Name,
             ResourceName: crd.Spec.ScaleTargetRef.Name,
             Resource:     strings.ToLower(crd.Spec.ScaleTargetRef.Kind),
         })
-        err := r.Informer.StopInformer(key)
+        err := r.InformerManager.StopInformer(key)
         if err != nil {
             r.Logger.Error("Failed to stop informer for old scaleTargetRef", zap.String("es", req.String()), zap.Any("scaleTargetRef", es.Spec.ScaleTargetRef), zap.Error(err))
         }
@@ -156,7 +156,7 @@ func (r *ElastiServiceReconciler) watchScaleTargetRef(ctx context.Context, es *v
             informerErr = fmt.Errorf("failed to parse API version: %w", err)
             return
         }
-        if err := r.Informer.Add(&informer.RequestWatch{
+        if err := r.InformerManager.Add(&informer.RequestWatch{
             Req:               req,
             ResourceName:      es.Spec.ScaleTargetRef.Name,
             ResourceNamespace: req.Namespace,
@@ -184,7 +184,7 @@ func (r *ElastiServiceReconciler) watchPublicService(ctx context.Context, es *v1
     }
     var informerErr error
     r.getMutexForInformerStart(r.getMutexKeyForPublicSVC(req)).Do(func() {
-        if err := r.Informer.Add(&informer.RequestWatch{
+        if err := r.InformerManager.Add(&informer.RequestWatch{
             Req:               req,
             ResourceName:      es.Spec.Service,
             ResourceNamespace: es.Namespace,
diff --git a/operator/internal/controller/opsInformer.go b/operator/internal/controller/opsInformer.go
index 606a85a..26fcf0a 100644
--- a/operator/internal/controller/opsInformer.go
+++ b/operator/internal/controller/opsInformer.go
@@ -40,7 +40,7 @@ func (r *ElastiServiceReconciler) getMutexKeyForTargetRef(req ctrl.Request) stri
 }
 
 func (r *ElastiServiceReconciler) getResolverChangeHandler(ctx context.Context, es *v1alpha1.ElastiService, req ctrl.Request) cache.ResourceEventHandlerFuncs {
-    key := r.Informer.GetKey(informer.KeyParams{
+    key := r.InformerManager.GetKey(informer.KeyParams{
         Namespace:    resolverNamespace,
         CRDName:      req.Name,
         ResourceName: resolverDeploymentName,
@@ -84,7 +84,7 @@ func (r *ElastiServiceReconciler) getResolverChangeHandler(ctx context.Context,
 }
 
 func (r *ElastiServiceReconciler) getPublicServiceChangeHandler(ctx context.Context, es *v1alpha1.ElastiService, req ctrl.Request) cache.ResourceEventHandlerFuncs {
-    key := r.Informer.GetKey(informer.KeyParams{
+    key := r.InformerManager.GetKey(informer.KeyParams{
         Namespace:    resolverNamespace,
         CRDName:      req.Name,
         ResourceName: es.Spec.Service,
@@ -123,7 +123,7 @@ func (r *ElastiServiceReconciler) getPublicServiceChangeHandler(ctx context.Cont
 }
 
 func (r *ElastiServiceReconciler) getScaleTargetRefChangeHandler(ctx context.Context, es *v1alpha1.ElastiService, req ctrl.Request) cache.ResourceEventHandlerFuncs {
-    key := r.Informer.GetKey(informer.KeyParams{
+    key := r.InformerManager.GetKey(informer.KeyParams{
         Namespace:    req.Namespace,
         CRDName:      req.Name,
         ResourceName: es.Spec.ScaleTargetRef.Kind,
diff --git a/operator/internal/controller/opsModes.go b/operator/internal/controller/opsModes.go
index a188655..20dd997 100644
--- a/operator/internal/controller/opsModes.go
+++ b/operator/internal/controller/opsModes.go
@@ -83,7 +83,7 @@ func (r *ElastiServiceReconciler) enableProxyMode(ctx context.Context, req ctrl.
     r.Logger.Info("3. Created or updated endpointslice to resolver", zap.String("service", targetSVC.Name))
 
     // Watch for changes in resolver deployment, and update the endpointslice since we are in proxy mode
-    if err := r.Informer.WatchDeployment(req, resolverDeploymentName, resolverNamespace, r.getResolverChangeHandler(ctx, es, req)); err != nil {
+    if err := r.InformerManager.WatchDeployment(req, resolverDeploymentName, resolverNamespace, r.getResolverChangeHandler(ctx, es, req)); err != nil {
         return fmt.Errorf("failed to add watch on resolver deployment: %w", err)
     }
     r.Logger.Info("4. Added watch on resolver deployment", zap.String("deployment", resolverDeploymentName))
@@ -92,14 +92,16 @@ func (r *ElastiServiceReconciler) enableServeMode(ctx context.Context, req ctrl.Request, es *v1alpha1.ElastiService) error {
+    // TODO: Why are we stopping the watch on resolver deployment if a service moves to serve mode?
+    // Seems we are creating multiple informers for the resolver deployment when only one would suffice
     // Stop the watch on resolver deployment, since we are in serve mode
-    key := r.Informer.GetKey(informer.KeyParams{
+    key := r.InformerManager.GetKey(informer.KeyParams{
         Namespace:    resolverNamespace,
         CRDName:      req.Name,
         ResourceName: resolverDeploymentName,
         Resource:     values.KindDeployments,
     })
-    err := r.Informer.StopInformer(key)
+    err := r.InformerManager.StopInformer(key)
     if err != nil {
         r.Logger.Error("Failed to stop watch on resolver deployment", zap.String("deployment", resolverDeploymentName), zap.Error(err))
     }
diff --git a/operator/internal/elastiserver/elastiServer.go b/operator/internal/elastiserver/elastiServer.go
index 5215851..55330b5 100644
--- a/operator/internal/elastiserver/elastiServer.go
+++ b/operator/internal/elastiserver/elastiServer.go
@@ -4,9 +4,12 @@ import (
     "context"
     "encoding/json"
     "fmt"
-    "io"
     "net/http"
+    "os"
+    "os/signal"
+    "strings"
     "sync"
+    "syscall"
     "time"
 
     sentryhttp "github.com/getsentry/sentry-go/http"
@@ -43,90 +46,117 @@ func NewServer(logger *zap.Logger, config *rest.Config, rescaleDuration time.Dur
     // Get Ops client
     k8sUtil := k8shelper.NewOps(logger, config)
     return &Server{
-        logger:          logger.Named("elastiServer"),
-        k8shelper:       k8sUtil,
+        logger:    logger.Named("elastiServer"),
+        k8shelper: k8sUtil,
+        // rescaleDuration is the duration to wait before checking whether to rescale the target
         rescaleDuration: rescaleDuration,
     }
 }
 
 // Start starts the ElastiServer and declares the endpoint and handlers for it
-func (s *Server) Start(port string) {
-    defer func() {
-        if rec := recover(); s != nil {
-            s.logger.Error("ElastiServer is recovering from panic", zap.Any("error", rec))
-            go s.Start(port)
-        }
-    }()
-
+func (s *Server) Start(port string) error {
+    mux := http.NewServeMux()
     sentryHandler := sentryhttp.New(sentryhttp.Options{})
-    http.Handle("/metrics", sentryHandler.Handle(promhttp.Handler()))
-    http.Handle("/informer/incoming-request", sentryHandler.HandleFunc(s.resolverReqHandler))
+    mux.Handle("/metrics", sentryHandler.Handle(promhttp.Handler()))
+    mux.Handle("/informer/incoming-request", sentryHandler.HandleFunc(s.resolverReqHandler))
     server := &http.Server{
-        Addr:              port,
+        Addr:              fmt.Sprintf(":%s", strings.TrimPrefix(port, ":")),
+        Handler:           mux,
         ReadHeaderTimeout: 2 * time.Second,
+        ReadTimeout:       30 * time.Second,
+        WriteTimeout:      30 * time.Second,
     }
 
+    // Graceful shutdown handling
+    done := make(chan bool)
+    quit := make(chan os.Signal, 1)
+    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
+
+    go func() {
+        <-quit
+        s.logger.Info("Server is shutting down...")
+
+        ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+        defer cancel()
+
+        if err := server.Shutdown(ctx); err != nil {
+            s.logger.Error("Could not gracefully shutdown the server", zap.Error(err))
+        }
+        close(done)
+    }()
+
     s.logger.Info("Starting ElastiServer", zap.String("port", port))
-    if err := server.ListenAndServe(); err != nil {
-        s.logger.Fatal("Failed to start StartElastiServer", zap.Error(err))
+    if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
+        s.logger.Error("Failed to start ElastiServer", zap.Error(err))
+        return err
     }
+
+    <-done
+    s.logger.Info("Server stopped")
+    return nil
 }
 
 func (s *Server) resolverReqHandler(w http.ResponseWriter, req *http.Request) {
     defer func() {
-        if rec := recover(); rec != nil {
-            s.logger.Error("Recovered from panic", zap.Any("error", rec))
+        if err := req.Body.Close(); err != nil {
+            s.logger.Error("Failed to close request body", zap.Error(err))
         }
     }()
-    ctx := context.Background()
+
     if req.Method != http.MethodPost {
         http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
         return
     }
     var body messages.RequestCount
     if err := json.NewDecoder(req.Body).Decode(&body); err != nil {
+        s.logger.Error("Failed to decode request body", zap.Error(err))
         http.Error(w, "Invalid request body", http.StatusBadRequest)
         return
     }
-    defer func(Body io.ReadCloser) {
-        err := Body.Close()
-        if err != nil {
-            s.logger.Error("Failed to close Body", zap.Error(err))
-        }
-    }(req.Body)
-    s.logger.Info("-- Received request from Resolver", zap.Any("body", body))
-    w.Header().Set("Content-Type", "application/json")
-    w.WriteHeader(http.StatusOK)
+
+    s.logger.Info("Received request from Resolver", zap.Any("body", body))
+
     response := Response{
         Message: "Request received successfully!",
     }
     jsonResponse, err := json.Marshal(response)
     if err != nil {
-        http.Error(w, err.Error(), http.StatusInternalServerError)
+        s.logger.Error("Failed to marshal response", zap.Error(err))
+        http.Error(w, "Internal server error", http.StatusInternalServerError)
         return
     }
-    _, err = w.Write(jsonResponse)
-    if err != nil {
+
+    w.Header().Set("Content-Type", "application/json")
+    w.WriteHeader(http.StatusOK)
+
+    if _, err = w.Write(jsonResponse); err != nil {
         s.logger.Error("Failed to write response", zap.Error(err))
         return
     }
-    err = s.scaleTargetForService(ctx, body.Svc, body.Namespace)
-    if err != nil {
-        s.logger.Error("Failed to compare and scale target", zap.Error(err))
+
+    if err = s.scaleTargetForService(req.Context(), body.Svc, body.Namespace); err != nil {
+        s.logger.Error("Failed to scale target",
+            zap.Error(err),
+            zap.String("service", body.Svc),
+            zap.String("namespace", body.Namespace))
         return
     }
-    s.logger.Info("-- Received fulfilled from Resolver", zap.Any("body", body))
+
+    s.logger.Info("Request fulfilled successfully",
+        zap.String("service", body.Svc),
+        zap.String("namespace", body.Namespace))
 }
 
 func (s *Server) scaleTargetForService(_ context.Context, serviceName, namespace string) error {
     scaleMutex, loaded := s.getMutexForServiceScale(serviceName)
     if loaded {
+        s.logger.Debug("Scale target lock already exists", zap.String("service", serviceName))
         return nil
     }
     scaleMutex.Lock()
-    defer s.logger.Debug("Scale target lock released")
-    s.logger.Debug("Scale target lock taken")
+    defer s.logger.Debug("Scale target lock released", zap.String("service", serviceName))
+    s.logger.Debug("Scale target lock taken", zap.String("service", serviceName))
     crd, found := crddirectory.CRDDirectory.GetCRD(serviceName)
     if !found {
         s.releaseMutexForServiceScale(serviceName)
@@ -141,6 +171,7 @@ func (s *Server) scaleTargetForService(_ context.Context, serviceName, namespac
     prom.TargetScaleCounter.WithLabelValues(serviceName, crd.Spec.ScaleTargetRef.Kind+"-"+crd.Spec.ScaleTargetRef.Name, "success").Inc()
 
     // If the target is scaled up, we will hold the lock for longer, to not scale up again
+    // TODO: Is there a better way to do this and why is it even needed?
     time.AfterFunc(s.rescaleDuration, func() {
         s.releaseMutexForServiceScale(serviceName)
     })
diff --git a/operator/internal/informer/informer.go b/operator/internal/informer/informer.go
index 95594d2..48609ad 100644
--- a/operator/internal/informer/informer.go
+++ b/operator/internal/informer/informer.go
@@ -6,7 +6,6 @@ import (
     "context"
     "errors"
     "fmt"
-    "runtime"
     "strings"
     "sync"
     "time"
@@ -46,6 +45,7 @@
         StopCh chan struct{}
         Req    *RequestWatch
     }
+
     // RequestWatch is the request body sent to the informer
     RequestWatch struct {
         Req ctrl.Request
@@ -72,7 +72,7 @@ func NewInformerManager(logger *zap.Logger, kConfig *rest.Config) *Manager {
         dynamicClient: dynamicClient,
         logger:        logger.Named("InformerManager"),
         // ResyncPeriod is the proactive resync we do, even when no events are received by the informer.
-        resyncPeriod:        0,
+        resyncPeriod:        5 * time.Minute,
         healthCheckDuration: 5 * time.Second,
         healthCheckStopChan: make(chan struct{}),
     }
@@ -230,16 +230,7 @@ func (m *Manager) Add(req *RequestWatch) (err error) {
 }
 
 // enableInformer is to enable the informer for a resource
-func (m *Manager) enableInformer(req *RequestWatch) (err error) {
-    defer func() {
-        if rErr := recover(); rErr != nil {
-            m.logger.Error("Recovered from panic", zap.Any("recovered", rErr))
-            buf := make([]byte, 4096)
-            n := runtime.Stack(buf, false)
-            m.logger.Error("Panic stack trace", zap.ByteString("stacktrace", buf[:n]))
-        }
-    }()
-
+func (m *Manager) enableInformer(req *RequestWatch) error {
     ctx := context.Background()
     // Create an informer for the resource
     informer := cache.NewSharedInformer(
@@ -256,10 +247,10 @@
             },
         },
         &unstructured.Unstructured{},
-        0,
+        m.resyncPeriod,
     )
     // We pass the handlers we received as a parameter
-    _, err = informer.AddEventHandler(req.Handlers)
+    _, err := informer.AddEventHandler(req.Handlers)
     if err != nil {
         m.logger.Error("Error creating informer handler", zap.Error(err))
         return fmt.Errorf("enableInformer: %w", err)