diff --git a/cmd/sync/manager/manager.go b/cmd/sync/manager/manager.go index 94cdfe26..abaf61e7 100644 --- a/cmd/sync/manager/manager.go +++ b/cmd/sync/manager/manager.go @@ -19,7 +19,7 @@ import ( registryv1 "github.com/adobe/cluster-registry/pkg/api/registry/v1" registryv1alpha1 "github.com/adobe/cluster-registry/pkg/api/registry/v1alpha1" "github.com/adobe/cluster-registry/pkg/config" - monitoring "github.com/adobe/cluster-registry/pkg/monitoring/client" + monitoring "github.com/adobe/cluster-registry/pkg/monitoring/manager" "github.com/adobe/cluster-registry/pkg/sqs" "github.com/adobe/cluster-registry/pkg/sync/manager" "github.com/adobe/cluster-registry/pkg/sync/parser" @@ -161,6 +161,7 @@ func main() { WatchedGVKs: loadWatchedGVKs(syncConfig), Queue: q, ResourceParser: rp, + Metrics: m, }).SetupWithManager(ctx, mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "SyncController") os.Exit(1)
diff --git a/pkg/monitoring/manager/metrics.go b/pkg/monitoring/manager/metrics.go
new file mode 100644
index 00000000..c87bd80e
--- /dev/null
+++ b/pkg/monitoring/manager/metrics.go
@@ -0,0 +1,133 @@
+/*
+Copyright 2024 Adobe. All rights reserved.
+This file is licensed to you under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License. You may obtain a copy
+of the License at http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software distributed under
+the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS
+OF ANY KIND, either express or implied. See the License for the specific language
+governing permissions and limitations under the License.
+*/
+
+package monitoring
+
+import (
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+// MetricsI is the set of recording operations used by the sync manager. Every
+// method takes the target the observation belongs to, which becomes the value
+// of the "target" label on the underlying Prometheus metric.
+type MetricsI interface {
+	// RecordRequeueCnt counts one requeue for target.
+	RecordRequeueCnt(target string)
+	// RecordReconciliationCnt counts one reconciliation for target.
+	RecordReconciliationCnt(target string)
+	// RecordReconciliationDur observes a reconciliation that took elapsed seconds.
+	RecordReconciliationDur(target string, elapsed float64)
+	// RecordEnqueueCnt counts one enqueue for target.
+	RecordEnqueueCnt(target string)
+	// RecordEnqueueDur observes an enqueue that took elapsed seconds.
+	RecordEnqueueDur(target string, elapsed float64)
+	// RecordErrorCnt counts one error for target.
+	RecordErrorCnt(target string)
+}
+
+// Metrics is the Prometheus-backed implementation of MetricsI. Its vectors are
+// nil until Init is called, so Init must run before any Record* method.
+type Metrics struct {
+	RequeueCnt        *prometheus.CounterVec
+	ReconciliationCnt *prometheus.CounterVec
+	ReconciliationDur *prometheus.HistogramVec
+	EnqueueCnt        *prometheus.CounterVec
+	EnqueueDur        *prometheus.HistogramVec
+	ErrCnt            *prometheus.CounterVec
+	metrics           []prometheus.Collector // every collector created by Init
+}
+
+// Compile-time check that *Metrics implements MetricsI.
+var _ MetricsI = (*Metrics)(nil)
+
+// NewMetrics returns an empty Metrics; call Init to create and register its collectors.
+func NewMetrics() *Metrics {
+	return &Metrics{}
+}
+
+// Init creates every collector and registers it. When isUnitTest is true a
+// fresh private registry is used, so each test can call Init without
+// colliding with another; otherwise the collectors go to
+// prometheus.DefaultRegisterer, where promauto panics if a metric of the same
+// name is already registered.
+func (m *Metrics) Init(isUnitTest bool) {
+	reg := prometheus.DefaultRegisterer
+	if isUnitTest {
+		reg = prometheus.NewRegistry()
+	}
+	// promauto returns the concrete vector types, so the results are assigned
+	// directly instead of going through prometheus.Collector and an assertion.
+	factory := promauto.With(reg)
+
+	m.RequeueCnt = factory.NewCounterVec(prometheus.CounterOpts{
+		Name: "cluster_registry_sync_manager_requeues_total",
+		Help: "The total number of controller-manager requeues partitioned by target.",
+	}, []string{"target"})
+	m.ReconciliationCnt = factory.NewCounterVec(prometheus.CounterOpts{
+		Name: "cluster_registry_sync_manager_reconciliation_total",
+		Help: "How many reconciliations occurred, partitioned by target.",
+	}, []string{"target"})
+	m.ReconciliationDur = factory.NewHistogramVec(prometheus.HistogramOpts{
+		Name: "cluster_registry_sync_manager_reconciliation_duration_seconds",
+		Help: "The time taken to reconcile resources in seconds partitioned by target.",
+	}, []string{"target"})
+	m.EnqueueCnt = factory.NewCounterVec(prometheus.CounterOpts{
+		Name: "cluster_registry_sync_manager_enqueue_total",
+		Help: "How many reconciliations were enqueued, partitioned by target.",
+	}, []string{"target"})
+	m.EnqueueDur = factory.NewHistogramVec(prometheus.HistogramOpts{
+		Name: "cluster_registry_sync_manager_enqueue_duration_seconds",
+		Help: "The time taken to enqueue a reconciliation in seconds partitioned by target.",
+	}, []string{"target"})
+	m.ErrCnt = factory.NewCounterVec(prometheus.CounterOpts{
+		Name: "cluster_registry_sync_manager_error_total",
+		Help: "The total number of controller-manager errors partitioned by target.",
+	}, []string{"target"})
+
+	// Rebuilt rather than appended to, so a repeated Init does not keep the
+	// collectors of the previous call.
+	m.metrics = []prometheus.Collector{
+		m.RequeueCnt, m.ReconciliationCnt, m.ReconciliationDur,
+		m.EnqueueCnt, m.EnqueueDur, m.ErrCnt,
+	}
+}
+
+// RecordRequeueCnt increments the requeue counter for target.
+func (m *Metrics) RecordRequeueCnt(target string) {
+	m.RequeueCnt.WithLabelValues(target).Inc()
+}
+
+// RecordReconciliationCnt increments the reconciliation counter for target.
+func (m *Metrics) RecordReconciliationCnt(target string) {
+	m.ReconciliationCnt.WithLabelValues(target).Inc()
+}
+
+// RecordReconciliationDur adds elapsed (in seconds) to the reconciliation duration histogram for target.
+func (m *Metrics) RecordReconciliationDur(target string, elapsed float64) {
+	m.ReconciliationDur.WithLabelValues(target).Observe(elapsed)
+}
+
+// RecordEnqueueCnt increments the enqueue counter for target.
+func (m *Metrics) RecordEnqueueCnt(target string) {
+	m.EnqueueCnt.WithLabelValues(target).Inc()
+}
+
+// RecordEnqueueDur adds elapsed (in seconds) to the enqueue duration histogram for target.
+func (m *Metrics) RecordEnqueueDur(target string, elapsed float64) {
+	m.EnqueueDur.WithLabelValues(target).Observe(elapsed)
+}
+
+// RecordErrorCnt increments the error counter for target.
+func (m *Metrics) RecordErrorCnt(target string) {
+	m.ErrCnt.WithLabelValues(target).Inc()
+}
diff --git a/pkg/monitoring/manager/metrics_test.go b/pkg/monitoring/manager/metrics_test.go
new file mode 100644
index 00000000..05268bc6
--- /dev/null
+++ b/pkg/monitoring/manager/metrics_test.go
@@ -0,0 +1,142 @@
+/*
+Copyright 2024 Adobe. All rights reserved.
+This file is licensed to you under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License. You may obtain a copy
+of the License at http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software distributed under
+the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS
+OF ANY KIND, either express or implied. See the License for the specific language
+governing permissions and limitations under the License.
+*/
+
+package monitoring
+
+import (
+	"fmt"
+	"math/rand"
+	"strings"
+	"testing"
+
+	"github.com/prometheus/client_golang/prometheus/testutil"
+	"github.com/stretchr/testify/assert"
+)
+
+const (
+	clusterSyncTarget = "orgnumber-env-region-cluster-sync"
+	subsystem         = "cluster_registry_sync_manager"
+	minRand           = 1
+	maxRand           = 2.5
+)
+
+// generateFloatRand returns a pseudo-random float64 in the half-open range [min, max).
+func generateFloatRand(min, max float64) float64 {
+	return min + rand.Float64()*(max-min)
+}
+
+// generateExpectedHistogram renders the exposition text expected after a single
+// observation of randomFloat. The value must be greater than 1 and at most 2.5,
+// so that le="2.5" is the first bucket that counts it. metricTopic is what the
+// metric is about, e.g. reconciliation or enqueue. helpString is the literal help
+// string from metrics.go; it is passed in because HistogramVec does not appear
+// to expose it.
+func generateExpectedHistogram(randomFloat float64, metricTopic string, helpString string) string {
+	return fmt.Sprintf(`
+	# HELP %[1]s_%[5]s_duration_seconds %[4]s
+	# TYPE %[1]s_%[5]s_duration_seconds histogram
+	%[1]s_%[5]s_duration_seconds_bucket{target="%[2]s",le="0.005"} 0
+	%[1]s_%[5]s_duration_seconds_bucket{target="%[2]s",le="0.01"} 0
+	%[1]s_%[5]s_duration_seconds_bucket{target="%[2]s",le="0.025"} 0
+	%[1]s_%[5]s_duration_seconds_bucket{target="%[2]s",le="0.05"} 0
+	%[1]s_%[5]s_duration_seconds_bucket{target="%[2]s",le="0.1"} 0
+	%[1]s_%[5]s_duration_seconds_bucket{target="%[2]s",le="0.25"} 0
+	%[1]s_%[5]s_duration_seconds_bucket{target="%[2]s",le="0.5"} 0
+	%[1]s_%[5]s_duration_seconds_bucket{target="%[2]s",le="1"} 0
+	%[1]s_%[5]s_duration_seconds_bucket{target="%[2]s",le="2.5"} 1
+	%[1]s_%[5]s_duration_seconds_bucket{target="%[2]s",le="5"} 1
+	%[1]s_%[5]s_duration_seconds_bucket{target="%[2]s",le="10"} 1
+	%[1]s_%[5]s_duration_seconds_bucket{target="%[2]s",le="+Inf"} 1
+	%[1]s_%[5]s_duration_seconds_sum{target="%[2]s"} %[3]s
+	%[1]s_%[5]s_duration_seconds_count{target="%[2]s"} 1
+	`, subsystem, clusterSyncTarget, fmt.Sprintf("%.16f", randomFloat), helpString, metricTopic)
+}
+
+func TestNewMetrics(t *testing.T) {
+	assert.NotNil(t, NewMetrics())
+}
+
+func TestInit(t *testing.T) {
+	test := assert.New(t)
+	m := NewMetrics()
+	m.Init(true)
+	test.NotNil(m.RequeueCnt)
+	test.NotNil(m.ReconciliationCnt)
+	test.NotNil(m.ReconciliationDur)
+	test.NotNil(m.EnqueueCnt)
+	test.NotNil(m.EnqueueDur)
+	test.NotNil(m.ErrCnt)
+}
+
+func TestRecordRequeueCnt(t *testing.T) {
+	test := assert.New(t)
+	m := NewMetrics()
+	m.Init(true)
+	m.RecordRequeueCnt(clusterSyncTarget)
+	test.Equal(1, testutil.CollectAndCount(m.RequeueCnt))
+	test.Equal(float64(1), testutil.ToFloat64(m.RequeueCnt.WithLabelValues(clusterSyncTarget)))
+}
+
+func TestRecordReconciliationCnt(t *testing.T) {
+	test := assert.New(t)
+	m := NewMetrics()
+	m.Init(true)
+	m.RecordReconciliationCnt(clusterSyncTarget)
+	test.Equal(1, testutil.CollectAndCount(m.ReconciliationCnt))
+	test.Equal(float64(1), testutil.ToFloat64(m.ReconciliationCnt.WithLabelValues(clusterSyncTarget)))
+}
+
+func TestRecordReconciliationDur(t *testing.T) {
+	m := NewMetrics()
+	m.Init(true)
+	randomFloat := generateFloatRand(minRand, maxRand)
+	m.RecordReconciliationDur(clusterSyncTarget, randomFloat)
+	expected := generateExpectedHistogram(randomFloat, "reconciliation", "The time taken to reconcile resources in seconds partitioned by target.")
+	if err := testutil.CollectAndCompare(
+		m.ReconciliationDur,
+		strings.NewReader(expected),
+		fmt.Sprintf("%s_%s_duration_seconds", subsystem, "reconciliation")); err != nil {
+		t.Errorf("unexpected collecting result:\n%s", err)
+	}
+}
+
+func TestRecordEnqueueCnt(t *testing.T) {
+	test := assert.New(t)
+	m := NewMetrics()
+	m.Init(true)
+	m.RecordEnqueueCnt(clusterSyncTarget)
+	test.Equal(1, testutil.CollectAndCount(m.EnqueueCnt))
+	test.Equal(float64(1), testutil.ToFloat64(m.EnqueueCnt.WithLabelValues(clusterSyncTarget)))
+}
+
+func TestRecordEnqueueDur(t *testing.T) {
+	m := NewMetrics()
+	m.Init(true)
+	randomFloat := generateFloatRand(minRand, maxRand)
+	m.RecordEnqueueDur(clusterSyncTarget, randomFloat)
+	expected := generateExpectedHistogram(randomFloat, "enqueue", "The time taken to enqueue a reconciliation in seconds partitioned by target.")
+	if err := testutil.CollectAndCompare(
+		m.EnqueueDur,
+		strings.NewReader(expected),
+		fmt.Sprintf("%s_%s_duration_seconds", subsystem, "enqueue")); err != nil {
+		t.Errorf("unexpected collecting result:\n%s", err)
+	}
+}
+
+func TestRecordErrCnt(t *testing.T) {
+	test := assert.New(t)
+	m := NewMetrics()
+	m.Init(true)
+	m.RecordErrorCnt(clusterSyncTarget)
+	test.Equal(1, testutil.CollectAndCount(m.ErrCnt))
+	test.Equal(float64(1), testutil.ToFloat64(m.ErrCnt.WithLabelValues(clusterSyncTarget)))
+}
diff --git a/pkg/sync/manager/controller.go b/pkg/sync/manager/controller.go index 41cfb5e9..bcdec860 100644 --- a/pkg/sync/manager/controller.go +++ b/pkg/sync/manager/controller.go @@ -16,6 +16,7 @@ import ( "context" v1 "github.com/adobe/cluster-registry/pkg/api/registry/v1" registryv1alpha1 "github.com/adobe/cluster-registry/pkg/api/registry/v1alpha1" + monitoring "github.com/adobe/cluster-registry/pkg/monitoring/manager" "github.com/adobe/cluster-registry/pkg/sqs" "github.com/adobe/cluster-registry/pkg/sync/parser" "github.com/aws/aws-sdk-go/aws" @@ -54,6 +55,7 @@ type SyncController struct { WatchedGVKs []schema.GroupVersionKind Queue *sqs.Config ResourceParser *parser.ResourceParser + Metrics monitoring.MetricsI } func (c *SyncController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { @@ -61,12 +63,18 @@ func (c *SyncController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl. log := c.Log.WithValues("name", req.Name, "namespace", req.Namespace) log.Info("start") - defer log.Info("end", "duration", time.Since(start)) + defer func() { + elapsed := time.Since(start) + log.Info("end", "duration", elapsed) + c.Metrics.RecordReconciliationDur(req.Name, float64(elapsed)/float64(time.Second)) + c.Metrics.RecordReconciliationCnt(req.Name) + }() instance := new(registryv1alpha1.ClusterSync) if err := c.Get(ctx, req.NamespacedName, instance); err != nil { + c.Metrics.RecordErrorCnt(req.Name) log.Error(err, "unable to fetch object") - return requeueIfError(client.IgnoreNotFound(err)) + return requeueIfError(c, req, client.IgnoreNotFound(err)) } if instance.ObjectMeta.DeletionTimestamp != nil { @@ -100,15 +108,17 @@ func (c *SyncController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
instance.Status.LastSyncError = ptr.To(errList[0].Error()) instance.Status.LastSyncTime = &metav1.Time{Time: time.Now()} log.Error(errList[0], "failed to sync resources") + c.Metrics.RecordErrorCnt(req.Name) if err := c.updateStatus(ctx, instance); err != nil { - return requeueAfter(10*time.Second, err) + return requeueAfter(c, req, 10*time.Second, err) } return noRequeue() } syncedData, err := c.ResourceParser.Diff() if err != nil { + c.Metrics.RecordErrorCnt(req.Name) return noRequeue() } @@ -119,13 +129,15 @@ func (c *SyncController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl. instance.Status.LastSyncTime = &metav1.Time{Time: time.Now()} if err := c.enqueueData(instance); err != nil { log.Error(err, "failed to enqueue message") + c.Metrics.RecordErrorCnt(req.Name) if err := c.updateStatus(ctx, instance); err != nil { - return requeueAfter(10*time.Second, err) + return requeueAfter(c, req, 10*time.Second, err) } return noRequeue() } if err := c.updateStatus(ctx, instance); err != nil { - return requeueAfter(10*time.Second, err) + c.Metrics.RecordErrorCnt(req.Name) + return requeueAfter(c, req, 10*time.Second, err) } return noRequeue() } @@ -259,6 +271,7 @@ func (c *SyncController) enqueueRequestsFromMapFunc(gvk schema.GroupVersionKind) break } } + // TODO consider adding error handling/metrics if we don't find our object return requests } @@ -294,6 +307,8 @@ func (c *SyncController) enqueueData(instance *registryv1alpha1.ClusterSync) err }) elapsed := float64(time.Since(start)) / float64(time.Second) c.Log.Info("Enqueue time", "time", elapsed) + c.Metrics.RecordEnqueueDur(instance.Name, elapsed) + c.Metrics.RecordEnqueueCnt(instance.Name) if err != nil { return err
diff --git a/pkg/sync/manager/result.go b/pkg/sync/manager/result.go
index 0fad9c47..35c7c733 100644
--- a/pkg/sync/manager/result.go
+++ b/pkg/sync/manager/result.go
@@ -17,7 +17,13 @@ import (
 	"time"
 )
 
-func requeueIfError(err error) (ctrl.Result, error) {
+// requeueIfError returns an empty ctrl.Result together with err. A non-nil err
+// is first counted as a requeue for req.Name; a nil err is not counted.
+func requeueIfError(c *SyncController, req ctrl.Request, err error) (ctrl.Result, error) {
+	if err == nil {
+		return ctrl.Result{}, nil
+	}
+	c.Metrics.RecordRequeueCnt(req.Name)
 	return ctrl.Result{}, err
 }
 
@@ -25,6 +31,10 @@ func noRequeue() (ctrl.Result, error) {
 	return ctrl.Result{}, nil
 }
 
-func requeueAfter(interval time.Duration, err error) (ctrl.Result, error) {
-	return ctrl.Result{RequeueAfter: interval}, err
+// requeueAfter counts a requeue for req.Name and returns a ctrl.Result whose
+// RequeueAfter is interval, together with err.
+func requeueAfter(c *SyncController, req ctrl.Request, interval time.Duration, err error) (ctrl.Result, error) {
+	c.Metrics.RecordRequeueCnt(req.Name)
+	delayed := ctrl.Result{RequeueAfter: interval}
+	return delayed, err
 }