Commit

added resync period and some refactoring (#53)
* added resync period and some refactoring
* removed a broken import
* added some more logs and removed unnecessary panic handling
* lint fixes
* formatted
shubhamrai1993 authored Dec 30, 2024
1 parent 7508b54 commit 4bbfd4a
Showing 9 changed files with 115 additions and 73 deletions.
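
The headline change, the informer resync period, lives in the operator's informer package, which is not expanded in this view. As a rough sketch of what a resync period does in client-go (the concrete period and wiring used by this repo's informer.Manager are assumptions here):

```go
package informerdemo

import (
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

// newFactory shows where a resync period plugs in: every resyncPeriod, the
// informer replays its full cache to registered handlers as synthetic update
// events, so a missed or dropped watch event is eventually reconciled.
func newFactory(cfg *rest.Config, resyncPeriod time.Duration) (informers.SharedInformerFactory, error) {
	clientset, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		return nil, err
	}
	return informers.NewSharedInformerFactory(clientset, resyncPeriod), nil
}
```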
3 changes: 3 additions & 0 deletions README.md
@@ -376,6 +376,9 @@ docker push localhost:5000/elasti-resolver:v1alpha1
We will build and publish our Operator changes.

1. Go into the operator directory.
+```bash
+cd operator
+```
2. Run the build and publish command.

```bash
2 changes: 1 addition & 1 deletion operator/Makefile
@@ -154,7 +154,7 @@ $(LOCALBIN):
## Tool Binaries
KUBECTL ?= kubectl
KUSTOMIZE ?= $(LOCALBIN)/kustomize-$(KUSTOMIZE_VERSION)
-CONTROLLER_GEN ?= $(LOCALBIN)/controller-gen-$(CONTROLLER_TOOLS_VERSION)
+CONTROLLER_GEN ?= $(LOCALBIN)/controller-gen-$(CONTROLLER_TOOLS_VERSION)
ENVTEST ?= $(LOCALBIN)/setup-envtest-$(ENVTEST_VERSION)
GOLANGCI_LINT = $(LOCALBIN)/golangci-lint-$(GOLANGCI_LINT_VERSION)

29 changes: 22 additions & 7 deletions operator/cmd/main.go
@@ -169,15 +169,16 @@ func mainWithError() error {

// Start the shared CRD Directory
crddirectory.INITDirectory(zapLogger)
-// Initiate and start the shared Informer manager
-Informer := informer.NewInformerManager(zapLogger, mgr.GetConfig())
-Informer.Start()
+// Initiate and start the shared informer manager
+informerManager := informer.NewInformerManager(zapLogger, mgr.GetConfig())
+informerManager.Start()

// Set up the ElastiService controller
if err = (&controller.ElastiServiceReconciler{
-Client: mgr.GetClient(),
-Scheme: mgr.GetScheme(), Logger: zapLogger,
-Informer: Informer,
+Client: mgr.GetClient(),
+Scheme: mgr.GetScheme(),
+Logger: zapLogger,
+InformerManager: informerManager,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ElastiService")
sentry.CaptureException(err)
@@ -186,7 +187,21 @@ func mainWithError() error {

// Start the elasti server
eServer := elastiserver.NewServer(zapLogger, mgr.GetConfig(), 30*time.Second)
-go eServer.Start(elastiServerPort)
+errChan := make(chan error, 1)
+go func() {
+if err := eServer.Start(elastiServerPort); err != nil {
+setupLog.Error(err, "elasti server failed to start")
+sentry.CaptureException(err)
+errChan <- fmt.Errorf("elasti server: %w", err)
+}
+}()
+
+// Add error channel check before manager start
+select {
+case err := <-errChan:
+return fmt.Errorf("main: %w", err)
+default:
+}

//+kubebuilder:scaffold:builder
if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
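
The errChan plus select/default added above is a non-blocking startup check: it surfaces an error only if the server goroutine has already failed by the time the select runs (for example, the port is already in use), and otherwise falls through so the manager can start. A minimal standalone sketch of the same pattern (the port and log messages are illustrative):

```go
package main

import (
	"errors"
	"fmt"
	"log"
	"net/http"
)

func main() {
	srv := &http.Server{Addr: ":8013"} // illustrative port

	errChan := make(chan error, 1) // buffered so the goroutine never blocks on send
	go func() {
		if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			errChan <- fmt.Errorf("server: %w", err)
		}
	}()

	// Non-blocking check: it only catches failures that surfaced before this
	// line; later failures must be read from errChan elsewhere.
	select {
	case err := <-errChan:
		log.Fatalf("startup failed: %v", err)
	default:
	}

	log.Println("server started; continuing with the rest of main")
	select {} // block forever in this sketch
}
```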
6 changes: 3 additions & 3 deletions operator/internal/controller/elastiservice_controller.go
@@ -29,7 +29,7 @@ type (
client.Client
Scheme *kRuntime.Scheme
Logger *zap.Logger
-Informer *informer.Manager
+InformerManager *informer.Manager
SwitchModeLocks sync.Map
InformerStartLocks sync.Map
ReconcileLocks sync.Map
@@ -64,15 +64,15 @@ func (r *ElastiServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request
mutex.Lock()
defer r.Logger.Debug("- Out of Reconcile", zap.String("es", req.NamespacedName.String()))
defer mutex.Unlock()
-start := time.Now()
+startTime := time.Now()

defer func() {
e := values.Success
if err != nil {
e = err.Error()
sentry.CaptureException(err)
}
-duration := time.Since(start).Seconds()
+duration := time.Since(startTime).Seconds()
prom.CRDReconcileHistogram.WithLabelValues(req.String(), e).Observe(duration)
}()

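
The start-to-startTime rename above sits inside a common pattern: capture a timestamp on entry and let a deferred closure observe time.Since when Reconcile returns on any path, recording the duration in a Prometheus histogram. A self-contained sketch of that pattern (the metric name and labels are illustrative, not the repo's):

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var reconcileHistogram = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Name: "crd_reconcile_duration_seconds", // illustrative name
		Help: "Duration of reconcile passes.",
	},
	[]string{"request", "result"},
)

// reconcile uses a named return value so the deferred closure can inspect
// the error and label the observation accordingly.
func reconcile(req string) (err error) {
	startTime := time.Now()
	defer func() {
		result := "success"
		if err != nil {
			result = err.Error()
		}
		// Runs on every return path, success or error.
		reconcileHistogram.WithLabelValues(req, result).Observe(time.Since(startTime).Seconds())
	}()

	return errors.New("simulated failure") // stand-in for the real reconcile work
}

func main() {
	prometheus.MustRegister(reconcileHistogram)
	fmt.Println(reconcile("default/demo"))
}
```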
10 changes: 5 additions & 5 deletions operator/internal/controller/opsCRD.go
@@ -79,7 +79,7 @@ func (r *ElastiServiceReconciler) finalizeCRD(ctx context.Context, es *v1alpha1.ElastiService
// Stop all active informers related to this CRD in background
go func() {
defer wg.Done()
-r.Informer.StopForCRD(req.Name)
+r.InformerManager.StopForCRD(req.Name)
r.Logger.Info("[Done] Informer stopped for CRD", zap.String("es", req.String()))
// Reset the informer start mutex, so if the ElastiService is recreated, we will need to reset the informer
r.resetMutexForInformer(r.getMutexKeyForTargetRef(req))
@@ -134,13 +134,13 @@ func (r *ElastiServiceReconciler) watchScaleTargetRef(ctx context.Context, es *v1alpha1.ElastiService
es.Spec.ScaleTargetRef.Kind != crd.Spec.ScaleTargetRef.Kind ||
es.Spec.ScaleTargetRef.APIVersion != crd.Spec.ScaleTargetRef.APIVersion {
r.Logger.Debug("ScaleTargetRef has changed, stopping previous informer.", zap.String("es", req.String()), zap.Any("scaleTargetRef", es.Spec.ScaleTargetRef))
-key := r.Informer.GetKey(informer.KeyParams{
+key := r.InformerManager.GetKey(informer.KeyParams{
Namespace: req.Namespace,
CRDName: req.Name,
ResourceName: crd.Spec.ScaleTargetRef.Name,
Resource: strings.ToLower(crd.Spec.ScaleTargetRef.Kind),
})
-err := r.Informer.StopInformer(key)
+err := r.InformerManager.StopInformer(key)
if err != nil {
r.Logger.Error("Failed to stop informer for old scaleTargetRef", zap.String("es", req.String()), zap.Any("scaleTargetRef", es.Spec.ScaleTargetRef), zap.Error(err))
}
@@ -156,7 +156,7 @@ func (r *ElastiServiceReconciler) watchScaleTargetRef(ctx context.Context, es *v1alpha1.ElastiService
informerErr = fmt.Errorf("failed to parse API version: %w", err)
return
}
-if err := r.Informer.Add(&informer.RequestWatch{
+if err := r.InformerManager.Add(&informer.RequestWatch{
Req: req,
ResourceName: es.Spec.ScaleTargetRef.Name,
ResourceNamespace: req.Namespace,
@@ -184,7 +184,7 @@ func (r *ElastiServiceReconciler) watchPublicService(ctx context.Context, es *v1alpha1.ElastiService
}
var informerErr error
r.getMutexForInformerStart(r.getMutexKeyForPublicSVC(req)).Do(func() {
-if err := r.Informer.Add(&informer.RequestWatch{
+if err := r.InformerManager.Add(&informer.RequestWatch{
Req: req,
ResourceName: es.Spec.Service,
ResourceNamespace: es.Namespace,
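
Every call site above derives a unique informer key from informer.KeyParams before starting or stopping a watch. GetKey's implementation is not part of this diff; the sketch below is a purely hypothetical reconstruction of such a composite key, with field names taken from the call sites:

```go
package main

import (
	"fmt"
	"strings"
)

// KeyParams mirrors the fields passed at the call sites above.
type KeyParams struct {
	Namespace    string
	CRDName      string
	ResourceName string
	Resource     string
}

// getKey is a hypothetical composite key: one informer per
// (namespace, CRD, resource name, resource kind) tuple.
func getKey(p KeyParams) string {
	return strings.Join([]string{p.Namespace, p.CRDName, p.ResourceName, p.Resource}, "/")
}

func main() {
	fmt.Println(getKey(KeyParams{
		Namespace:    "default",
		CRDName:      "demo-elastiservice",
		ResourceName: "demo-deployment",
		Resource:     "deployments",
	}))
}
```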
6 changes: 3 additions & 3 deletions operator/internal/controller/opsInformer.go
@@ -40,7 +40,7 @@ func (r *ElastiServiceReconciler) getMutexKeyForTargetRef(req ctrl.Request) string
}

func (r *ElastiServiceReconciler) getResolverChangeHandler(ctx context.Context, es *v1alpha1.ElastiService, req ctrl.Request) cache.ResourceEventHandlerFuncs {
-key := r.Informer.GetKey(informer.KeyParams{
+key := r.InformerManager.GetKey(informer.KeyParams{
Namespace: resolverNamespace,
CRDName: req.Name,
ResourceName: resolverDeploymentName,
@@ -84,7 +84,7 @@ func (r *ElastiServiceReconciler) getResolverChangeHandler(ctx context.Context,
}

func (r *ElastiServiceReconciler) getPublicServiceChangeHandler(ctx context.Context, es *v1alpha1.ElastiService, req ctrl.Request) cache.ResourceEventHandlerFuncs {
-key := r.Informer.GetKey(informer.KeyParams{
+key := r.InformerManager.GetKey(informer.KeyParams{
Namespace: resolverNamespace,
CRDName: req.Name,
ResourceName: es.Spec.Service,
@@ -123,7 +123,7 @@ func (r *ElastiServiceReconciler) getPublicServiceChangeHandler(ctx context.Context,
}

func (r *ElastiServiceReconciler) getScaleTargetRefChangeHandler(ctx context.Context, es *v1alpha1.ElastiService, req ctrl.Request) cache.ResourceEventHandlerFuncs {
-key := r.Informer.GetKey(informer.KeyParams{
+key := r.InformerManager.GetKey(informer.KeyParams{
Namespace: req.Namespace,
CRDName: req.Name,
ResourceName: es.Spec.ScaleTargetRef.Kind,
8 changes: 5 additions & 3 deletions operator/internal/controller/opsModes.go
@@ -83,7 +83,7 @@ func (r *ElastiServiceReconciler) enableProxyMode(ctx context.Context, req ctrl.Request,
r.Logger.Info("3. Created or updated endpointslice to resolver", zap.String("service", targetSVC.Name))

// Watch for changes in resolver deployment, and update the endpointslice since we are in proxy mode
-if err := r.Informer.WatchDeployment(req, resolverDeploymentName, resolverNamespace, r.getResolverChangeHandler(ctx, es, req)); err != nil {
+if err := r.InformerManager.WatchDeployment(req, resolverDeploymentName, resolverNamespace, r.getResolverChangeHandler(ctx, es, req)); err != nil {
return fmt.Errorf("failed to add watch on resolver deployment: %w", err)
}
r.Logger.Info("4. Added watch on resolver deployment", zap.String("deployment", resolverDeploymentName))
@@ -92,14 +92,16 @@ func (r *ElastiServiceReconciler) enableProxyMode(ctx context.Context, req ctrl.Request,
}

func (r *ElastiServiceReconciler) enableServeMode(ctx context.Context, req ctrl.Request, es *v1alpha1.ElastiService) error {
+// TODO: Why are we stopping the watch on resolver deployment if a service moves to serve mode?
+// Seems we are creating multiple informers for the resolver deployment when only one would suffice
// Stop the watch on resolver deployment, since we are in serve mode
-key := r.Informer.GetKey(informer.KeyParams{
+key := r.InformerManager.GetKey(informer.KeyParams{
Namespace: resolverNamespace,
CRDName: req.Name,
ResourceName: resolverDeploymentName,
Resource: values.KindDeployments,
})
-err := r.Informer.StopInformer(key)
+err := r.InformerManager.StopInformer(key)
if err != nil {
r.Logger.Error("Failed to stop watch on resolver deployment", zap.String("deployment", resolverDeploymentName), zap.Error(err))
}
105 changes: 68 additions & 37 deletions operator/internal/elastiserver/elastiServer.go
@@ -4,9 +4,12 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"

sentryhttp "github.com/getsentry/sentry-go/http"
@@ -43,90 +46,117 @@ func NewServer(logger *zap.Logger, config *rest.Config, rescaleDuration time.Duration
// Get Ops client
k8sUtil := k8shelper.NewOps(logger, config)
return &Server{
-logger: logger.Named("elastiServer"),
-k8shelper: k8sUtil,
+logger: logger.Named("elastiServer"),
+k8shelper: k8sUtil,
+// rescaleDuration is how long to hold the scale lock before the target may be rescaled again
+rescaleDuration: rescaleDuration,
}
}

// Start starts the ElastiServer and declares the endpoint and handlers for it
-func (s *Server) Start(port string) {
-defer func() {
-if rec := recover(); s != nil {
-s.logger.Error("ElastiServer is recovering from panic", zap.Any("error", rec))
-go s.Start(port)
-}
-}()
-
+func (s *Server) Start(port string) error {
mux := http.NewServeMux()
sentryHandler := sentryhttp.New(sentryhttp.Options{})
http.Handle("/metrics", sentryHandler.Handle(promhttp.Handler()))
http.Handle("/informer/incoming-request", sentryHandler.HandleFunc(s.resolverReqHandler))
mux.Handle("/metrics", sentryHandler.Handle(promhttp.Handler()))
mux.Handle("/informer/incoming-request", sentryHandler.HandleFunc(s.resolverReqHandler))

server := &http.Server{
-Addr: port,
+Addr: fmt.Sprintf(":%s", strings.TrimPrefix(port, ":")),
+Handler: mux,
ReadHeaderTimeout: 2 * time.Second,
+ReadTimeout: 30 * time.Second,
+WriteTimeout: 30 * time.Second,
}

+// Graceful shutdown handling
+done := make(chan bool)
+quit := make(chan os.Signal, 1)
+signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
+
+go func() {
+<-quit
+s.logger.Info("Server is shutting down...")
+
+ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+defer cancel()
+
+if err := server.Shutdown(ctx); err != nil {
+s.logger.Error("Could not gracefully shutdown the server", zap.Error(err))
+}
+close(done)
+}()

s.logger.Info("Starting ElastiServer", zap.String("port", port))
-if err := server.ListenAndServe(); err != nil {
-s.logger.Fatal("Failed to start StartElastiServer", zap.Error(err))
+if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
+s.logger.Error("Failed to start ElastiServer", zap.Error(err))
+return err
}
+
+<-done
+s.logger.Info("Server stopped")
+return nil
}

func (s *Server) resolverReqHandler(w http.ResponseWriter, req *http.Request) {
defer func() {
-if rec := recover(); rec != nil {
-s.logger.Error("Recovered from panic", zap.Any("error", rec))
+if err := req.Body.Close(); err != nil {
+s.logger.Error("Failed to close request body", zap.Error(err))
}
}()
-ctx := context.Background()

if req.Method != http.MethodPost {
http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
return
}
var body messages.RequestCount
if err := json.NewDecoder(req.Body).Decode(&body); err != nil {
s.logger.Error("Failed to decode request body", zap.Error(err))
http.Error(w, "Invalid request body", http.StatusBadRequest)
return
}
-defer func(Body io.ReadCloser) {
-err := Body.Close()
-if err != nil {
-s.logger.Error("Failed to close Body", zap.Error(err))
-}
-}(req.Body)
-s.logger.Info("-- Received request from Resolver", zap.Any("body", body))
-w.Header().Set("Content-Type", "application/json")
-w.WriteHeader(http.StatusOK)

s.logger.Info("Received request from Resolver", zap.Any("body", body))

response := Response{
Message: "Request received successfully!",
}
jsonResponse, err := json.Marshal(response)
if err != nil {
-http.Error(w, err.Error(), http.StatusInternalServerError)
+s.logger.Error("Failed to marshal response", zap.Error(err))
+http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}
-_, err = w.Write(jsonResponse)
-if err != nil {
+
+w.Header().Set("Content-Type", "application/json")
+w.WriteHeader(http.StatusOK)
+
+if _, err = w.Write(jsonResponse); err != nil {
s.logger.Error("Failed to write response", zap.Error(err))
return
}
-err = s.scaleTargetForService(ctx, body.Svc, body.Namespace)
-if err != nil {
-s.logger.Error("Failed to compare and scale target", zap.Error(err))
+if err = s.scaleTargetForService(req.Context(), body.Svc, body.Namespace); err != nil {
+s.logger.Error("Failed to scale target",
+zap.Error(err),
+zap.String("service", body.Svc),
+zap.String("namespace", body.Namespace))
return
}
-s.logger.Info("-- Received fulfilled from Resolver", zap.Any("body", body))
+
+s.logger.Info("Request fulfilled successfully",
+zap.String("service", body.Svc),
+zap.String("namespace", body.Namespace))
}

func (s *Server) scaleTargetForService(_ context.Context, serviceName, namespace string) error {
scaleMutex, loaded := s.getMutexForServiceScale(serviceName)
if loaded {
s.logger.Debug("Scale target lock already exists", zap.String("service", serviceName))
return nil
}
scaleMutex.Lock()
defer s.logger.Debug("Scale target lock released")
s.logger.Debug("Scale target lock taken")
defer s.logger.Debug("Scale target lock released", zap.String("service", serviceName))
s.logger.Debug("Scale target lock taken", zap.String("service", serviceName))
crd, found := crddirectory.CRDDirectory.GetCRD(serviceName)
if !found {
s.releaseMutexForServiceScale(serviceName)
@@ -141,6 +171,7 @@ func (s *Server) scaleTargetForService(_ context.Context, serviceName, namespace string) error {
prom.TargetScaleCounter.WithLabelValues(serviceName, crd.Spec.ScaleTargetRef.Kind+"-"+crd.Spec.ScaleTargetRef.Name, "success").Inc()

// If the target is scaled up, we will hold the lock for longer, to not scale up again
+// TODO: Is there a better way to do this and why is it even needed?
time.AfterFunc(s.rescaleDuration, func() {
s.releaseMutexForServiceScale(serviceName)
})
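
The rewritten Start above combines three standard pieces: a dedicated ServeMux instead of the global default, signal-driven graceful shutdown, and returning the listen error rather than calling Fatal. The same skeleton in isolation (port, route, and timeouts are illustrative):

```go
package main

import (
	"context"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	})

	server := &http.Server{
		Addr:              ":8013", // illustrative port
		Handler:           mux,
		ReadHeaderTimeout: 2 * time.Second,
	}

	done := make(chan struct{})
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)

	go func() {
		<-quit
		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()
		// Shutdown stops accepting new connections and waits for in-flight
		// requests to finish, up to the context deadline.
		if err := server.Shutdown(ctx); err != nil {
			log.Printf("graceful shutdown failed: %v", err)
		}
		close(done)
	}()

	if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
		log.Fatalf("listen: %v", err)
	}
	<-done // ListenAndServe returns as soon as Shutdown begins; wait for it to finish
}
```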
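
The lock handling in scaleTargetForService partly answers its own TODO: the per-service mutex is taken once, and time.AfterFunc releases it only after rescaleDuration, so repeated resolver requests inside that window become no-ops instead of repeated scale-ups. A reduced sketch of that debounce (all names are illustrative):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

var scaleLocks sync.Map

// scaleOnce scales a service at most once per holdFor window: the first
// caller wins, and later callers see the held lock and return immediately.
func scaleOnce(service string, holdFor time.Duration, scale func()) {
	if _, loaded := scaleLocks.LoadOrStore(service, struct{}{}); loaded {
		return // a scale-up is already in flight or inside its hold window
	}
	scale()
	// Release the lock only after the hold window, debouncing further requests.
	time.AfterFunc(holdFor, func() {
		scaleLocks.Delete(service)
	})
}

func main() {
	for i := 0; i < 3; i++ {
		scaleOnce("demo-svc", 30*time.Second, func() { fmt.Println("scaling demo-svc") })
	}
	// Prints "scaling demo-svc" exactly once.
}
```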