Skip to content

Commit

Permalink
add DebounceReconciler
Browse files Browse the repository at this point in the history
  • Loading branch information
zreigz committed Sep 19, 2024
1 parent 4f8b1ef commit 72c4cf1
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 11 deletions.
12 changes: 12 additions & 0 deletions cmd/agent/kubernetes.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"net/http"
"os"
"strings"
Expand All @@ -11,6 +12,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -85,10 +87,12 @@ func initKubeClientsOrDie(config *rest.Config) (rolloutsClient *roclientset.Clie
}

func registerKubeReconcilersOrDie(
ctx context.Context,
manager ctrl.Manager,
consoleManager *consolectrl.ControllerManager,
config *rest.Config,
extConsoleClient consoleclient.Client,
discoveryClient discovery.DiscoveryInterface,
) {
rolloutsClient, dynamicClient, kubeClient := initKubeClientsOrDie(config)

Expand Down Expand Up @@ -209,4 +213,12 @@ func registerKubeReconcilersOrDie(
setupLog.Error(err, "unable to create controller", "controller", "Group")
os.Exit(1)
}

if err := (&controller.MetricsAggregateReconciler{
Client: manager.GetClient(),
Scheme: manager.GetScheme(),
DiscoveryClient: discoveryClient,
}).SetupWithManager(ctx, manager); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "MetricsAggregate")
}
}
4 changes: 3 additions & 1 deletion cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
constraintstatusv1beta1 "github.com/open-policy-agent/gatekeeper/v3/apis/status/v1beta1"
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"
metricsv1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1"

deploymentsv1alpha1 "github.com/pluralsh/deployment-operator/api/v1alpha1"
"github.com/pluralsh/deployment-operator/cmd/agent/args"
Expand Down Expand Up @@ -39,6 +40,7 @@ func init() {
utilruntime.Must(constraintstatusv1beta1.AddToScheme(scheme))
utilruntime.Must(templatesv1.AddToScheme(scheme))
utilruntime.Must(rolloutv1alpha1.AddToScheme(scheme))
utilruntime.Must(metricsv1beta1.AddToScheme(scheme))
//+kubebuilder:scaffold:scheme
}

Expand All @@ -57,7 +59,7 @@ func main() {
consoleManager := initConsoleManagerOrDie()

registerConsoleReconcilersOrDie(consoleManager, config, kubeManager.GetClient(), extConsoleClient)
registerKubeReconcilersOrDie(kubeManager, consoleManager, config, extConsoleClient)
registerKubeReconcilersOrDie(ctx, kubeManager, consoleManager, config, extConsoleClient, discoveryClient)

//+kubebuilder:scaffold:builder

Expand Down
2 changes: 1 addition & 1 deletion config/samples/metricsAggregate.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
apiVersion: pipelines.plural.sh/v1alpha1
apiVersion: deployments.plural.sh/v1alpha1
kind: MetricsAggregate
metadata:
labels:
Expand Down
77 changes: 77 additions & 0 deletions internal/controller/debounce_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package controller

import (
"context"
"fmt"
"time"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// DebounceReconciler is a Reconciler that debounces reconcile requests.
type DebounceReconciler struct {
client.Client
// Minimum time to wait before processing requests.
debounceDuration time.Duration
// Last request time.
lastRequest time.Time
// Channel to trigger reconciliations.
reconcileChan chan reconcile.Request
// The actual reconciler that processes the requests.
actualReconciler reconcile.Reconciler
}

// NewDebounceReconciler creates a new DebounceReconciler.
func NewDebounceReconciler(client client.Client, duration time.Duration, actual reconcile.Reconciler) *DebounceReconciler {
return &DebounceReconciler{
Client: client,
debounceDuration: duration,
reconcileChan: make(chan reconcile.Request, 1),
actualReconciler: actual,
}
}

// Reconcile implements the reconcile.Reconciler interface.
func (r *DebounceReconciler) Reconcile(_ context.Context, req reconcile.Request) (reconcile.Result, error) {
select {
case r.reconcileChan <- req:
default:
// Channel is full, drop the request to avoid spamming.
}
return reconcile.Result{}, nil
}

// Start begins the debouncing mechanism.
func (r *DebounceReconciler) Start(ctx context.Context) {
go func() {
ticker := time.NewTicker(r.debounceDuration)
defer ticker.Stop()

var latestRequest reconcile.Request

for {
select {
case <-ctx.Done():
return
case req := <-r.reconcileChan:
latestRequest = req
r.lastRequest = time.Now()
case <-ticker.C:
// Check if enough time has passed since the last request.
if time.Since(r.lastRequest) >= r.debounceDuration {
// Process the debounced request.
if err := r.processRequest(ctx, latestRequest); err != nil {
fmt.Printf("Error processing request: %v\n", err)
}
}
}
}
}()
}

// processRequest performs the actual reconciliation.
func (r *DebounceReconciler) processRequest(ctx context.Context, req reconcile.Request) error {
_, err := r.actualReconciler.Reconcile(ctx, req)
return err
}
58 changes: 50 additions & 8 deletions internal/controller/metricsaggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,49 @@ package controller
import (
"context"
"fmt"

"github.com/pluralsh/deployment-operator/internal/utils"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"time"

"github.com/pluralsh/deployment-operator/api/v1alpha1"
"github.com/pluralsh/deployment-operator/internal/utils"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/discovery"
metricsapi "k8s.io/metrics/pkg/apis/metrics"
"k8s.io/metrics/pkg/apis/metrics/v1beta1"
ctrl "sigs.k8s.io/controller-runtime"
k8sClient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

const debounceDuration = time.Second * 30

var supportedMetricsAPIVersions = []string{
"v1beta1",
}

// MetricsAggregateReconciler reconciles a MetricsAggregate resource.
type MetricsAggregateReconciler struct {
k8sClient.Client
Scheme *runtime.Scheme
Scheme *runtime.Scheme
DiscoveryClient discovery.DiscoveryInterface
}

// Reconcile IngressReplica ensure that stays in sync with Kubernetes cluster.
func (r *MetricsAggregateReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ reconcile.Result, reterr error) {
logger := log.FromContext(ctx)

apiGroups, err := r.DiscoveryClient.ServerGroups()
if err != nil {
return reconcile.Result{}, err
}
metricsAPIAvailable := SupportedMetricsAPIVersionAvailable(apiGroups)
if !metricsAPIAvailable {
logger.V(5).Info("metrics api not available")
return requeue(time.Minute*5, jitter), nil
}

// Read resource from Kubernetes cluster.
metrics := &v1alpha1.MetricsAggregate{}
if err := r.Get(ctx, req.NamespacedName, metrics); err != nil {
Expand Down Expand Up @@ -56,8 +75,11 @@ func (r *MetricsAggregateReconciler) Reconcile(ctx context.Context, req ctrl.Req
}

nodeList := &corev1.NodeList{}
availableResources := make(map[string]corev1.ResourceList)
if err := r.List(ctx, nodeList); err != nil {
return reconcile.Result{}, err
}

availableResources := make(map[string]corev1.ResourceList)
for _, n := range nodeList.Items {
availableResources[n.Name] = n.Status.Allocatable
}
Expand All @@ -78,6 +100,8 @@ func (r *MetricsAggregateReconciler) Reconcile(ctx context.Context, req ctrl.Req
if err != nil {
return reconcile.Result{}, err
}

// save metrics
metrics.Spec.Nodes = len(nodeList.Items)
for _, nm := range nodeMetrics {
metrics.Spec.CPUAvailableMillicores += nm.CPUAvailableMillicores
Expand All @@ -91,14 +115,16 @@ func (r *MetricsAggregateReconciler) Reconcile(ctx context.Context, req ctrl.Req
fraction = float64(metrics.Spec.MemoryTotalBytes) / float64(metrics.Spec.MemoryAvailableBytes) * 100
metrics.Spec.MemoryUsedPercentage = int64(fraction)

return requeue(requeueAfter, jitter), reterr
return ctrl.Result{}, reterr
}

// SetupWithManager sets up the controller with the Manager.
func (r *MetricsAggregateReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *MetricsAggregateReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
debounceReconciler := NewDebounceReconciler(mgr.GetClient(), debounceDuration, r)
debounceReconciler.Start(ctx)
return ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.MetricsAggregate{}).
Complete(r)
Complete(debounceReconciler)
}

type ResourceMetricsInfo struct {
Expand Down Expand Up @@ -141,3 +167,19 @@ func ConvertNodeMetrics(metrics []v1beta1.NodeMetrics, availableResources map[st

return nodeMetrics, nil
}

func SupportedMetricsAPIVersionAvailable(discoveredAPIGroups *metav1.APIGroupList) bool {
for _, discoveredAPIGroup := range discoveredAPIGroups.Groups {
if discoveredAPIGroup.Name != metricsapi.GroupName {
continue
}
for _, version := range discoveredAPIGroup.Versions {
for _, supportedVersion := range supportedMetricsAPIVersions {
if version.Version == supportedVersion {
return true
}
}
}
}
return false
}
2 changes: 1 addition & 1 deletion pkg/manifests/template/helm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ var _ = Describe("Helm template", func() {
It("should successfully render the helm template", func() {
resp, err := NewHelm(dir).Render(svc, utilFactory)
Expect(err).NotTo(HaveOccurred())
Expect(len(resp)).To(Equal(12))
Expect(len(resp)).To(Equal(13))
})
})

Expand Down

0 comments on commit 72c4cf1

Please sign in to comment.