Skip to content

Commit

Permalink
Merge pull request #2135 from sgayangi/prom-operator
Browse files Browse the repository at this point in the history
Update Prometheus functionality in adapter and common controller
  • Loading branch information
Krishanx92 authored Mar 25, 2024
2 parents dedaf2b + 64893bb commit 5097646
Show file tree
Hide file tree
Showing 12 changed files with 69 additions and 90 deletions.
4 changes: 2 additions & 2 deletions adapter/config/default_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var defaultConfig = &Config{
Namespaces: nil,
},
Environment: "Default",
Metrics: metrics{
Metrics: Metrics{
Enabled: false,
Type: "prometheus",
Port: 18006,
Expand Down Expand Up @@ -192,7 +192,7 @@ var defaultConfig = &Config{
MaximumSize: 10000,
ExpiryTime: 15,
},
Metrics: metrics{
Metrics: Metrics{
Enabled: false,
Type: "azure",
},
Expand Down
14 changes: 7 additions & 7 deletions adapter/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ type adapter struct {
// Environment of the Adapter
Environment string
// Metric represents configurations to expose/export go metrics
Metrics metrics
Metrics Metrics
}

// Envoy Listener Component related configurations.
Expand Down Expand Up @@ -159,7 +159,7 @@ type enforcer struct {
Management management
RestServer restServer
Filters []filter
Metrics metrics
Metrics Metrics
MandateSubscriptionValidation bool
Client httpClient
}
Expand Down Expand Up @@ -300,11 +300,11 @@ type tracing struct {
ConfigProperties map[string]string
}

type metrics struct {
Enabled bool
Type string
Port int32
CollectionInterval int32
// Metrics defines the configuration for metrics collection.
type Metrics struct {
Enabled bool
Type string
Port int32
}

type analyticsAdapter struct {
Expand Down
10 changes: 1 addition & 9 deletions adapter/internal/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package adapter

import (
"crypto/tls"
"strings"
"time"

discoveryv3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
Expand All @@ -34,7 +33,6 @@ import (
wso2_server "github.com/wso2/apk/adapter/pkg/discovery/protocol/server/v3"
"github.com/wso2/apk/adapter/pkg/health"
healthservice "github.com/wso2/apk/adapter/pkg/health/api/wso2/health/service"
metrics "github.com/wso2/apk/adapter/pkg/metrics"
"github.com/wso2/apk/adapter/pkg/utils/tlsutils"

"context"
Expand Down Expand Up @@ -157,12 +155,6 @@ func Run(conf *config.Config) {

logger.LoggerAPK.Info("Starting adapter ....")

// Start the metrics server
if conf.Adapter.Metrics.Enabled && strings.EqualFold(conf.Adapter.Metrics.Type, metrics.PrometheusMetricType) {
logger.LoggerAPK.Info("Starting Prometheus Metrics Server ....")
go metrics.StartPrometheusMetricsServer(conf.Adapter.Metrics.Port)
}

cache := xds.GetXdsCache()
enforcerCache := xds.GetEnforcerCache()
enforcerAPICache := xds.GetEnforcerAPICache()
Expand All @@ -188,7 +180,7 @@ func Run(conf *config.Config) {
// Set enforcer startup configs
xds.UpdateEnforcerConfig(conf)

go operator.InitOperator()
go operator.InitOperator(conf.Adapter.Metrics)

OUTER:
for {
Expand Down
25 changes: 20 additions & 5 deletions adapter/internal/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ package operator

import (
"flag"
"fmt"
"strings"

"github.com/wso2/apk/adapter/config"
"github.com/wso2/apk/adapter/internal/loggers"

"github.com/wso2/apk/adapter/pkg/logging"
"github.com/wso2/apk/adapter/pkg/metrics"
gwapiv1a2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
gwapiv1b1 "sigs.k8s.io/gateway-api/apis/v1beta1"

Expand Down Expand Up @@ -67,11 +70,9 @@ func init() {
}

// InitOperator starts the Kubernetes gateway operator
func InitOperator() {
var metricsAddr string
func InitOperator(metricsConfig config.Metrics) {
var enableLeaderElection bool
var probeAddr string
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
"Enable leader election for controller manager. "+
Expand All @@ -85,7 +86,7 @@ func InitOperator() {

operatorDataStore := synchronizer.GetOperatorDataStore()

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
options := ctrl.Options{
Scheme: scheme,
HealthProbeBindAddress: probeAddr,
LeaderElection: true,
Expand All @@ -102,7 +103,21 @@ func InitOperator() {
// if you are doing or is intended to do any operation such as perform cleanups
// after the manager stops then its usage might be unsafe.
// LeaderElectionReleaseOnCancel: true,
})
}

if metricsConfig.Enabled {
options.Metrics.BindAddress = fmt.Sprintf(":%d", metricsConfig.Port)
// Register the metrics collector
if strings.EqualFold(metricsConfig.Type, metrics.PrometheusMetricType) {
loggers.LoggerAPKOperator.Info("Registering Prometheus metrics collector.")
metrics.RegisterPrometheusCollector()
}
} else {
options.Metrics.BindAddress = "0"
}

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), options)

if err != nil {
loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2600, logging.BLOCKER, "Unable to start manager: %v", err))
}
Expand Down
22 changes: 4 additions & 18 deletions adapter/pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,10 @@
package metrics

import (
"fmt"
"net/http"
"strconv"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
xds "github.com/wso2/apk/adapter/internal/discovery/xds"
logger "github.com/wso2/apk/adapter/internal/loggers"
"github.com/wso2/apk/adapter/pkg/logging"
commonmetrics "github.com/wso2/apk/common-go-libs/pkg/metrics"
k8smetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
)

var (
Expand Down Expand Up @@ -93,18 +87,10 @@ func (collector *AdapterCollector) Collect(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(collector.internalClusterCount, prometheus.GaugeValue, internalClusterCount)
}

// StartPrometheusMetricsServer initializes and starts the metrics server to expose metrics to prometheus.
func StartPrometheusMetricsServer(port int32) {
// RegisterPrometheusCollector registers the Prometheus collector for metrics.
func RegisterPrometheusCollector() {

collector := adapterMetricsCollector()
k8smetrics.Registry.MustRegister(collector)
prometheus.MustRegister(collector)
http.Handle("/metrics", promhttp.Handler())
err := http.ListenAndServe(":"+strconv.Itoa(int(port)), nil)
if err != nil {
logger.LoggerAPK.ErrorC(logging.ErrorDetails{
Message: fmt.Sprintln("Prometheus metrics server error:", err),
Severity: logging.MAJOR,
ErrorCode: 1110,
})
}
}
10 changes: 1 addition & 9 deletions common-controller/commoncontroller/common_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"net"
"os"
"os/signal"
"strings"
"time"

corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
Expand All @@ -41,7 +40,6 @@ import (
"github.com/wso2/apk/common-controller/internal/server"
utils "github.com/wso2/apk/common-controller/internal/utils"
xds "github.com/wso2/apk/common-controller/internal/xds"
"github.com/wso2/apk/common-controller/pkg/metrics"
apkmgt "github.com/wso2/apk/common-go-libs/pkg/discovery/api/wso2/discovery/service/apkmgt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -206,12 +204,6 @@ func InitCommonControllerServer(conf *config.Config) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Start the metrics server
if conf.CommonController.Metrics.Enabled && strings.EqualFold(conf.CommonController.Metrics.Type, metrics.PrometheusMetricType) {
loggers.LoggerAPKOperator.Info("Starting Prometheus Metrics Server ....")
go metrics.StartPrometheusMetricsServer(conf.CommonController.Metrics.Port)
}

loggers.LoggerAPKOperator.Info("Starting common controller ....")

rateLimiterCache := xds.GetRateLimiterCache()
Expand All @@ -225,7 +217,7 @@ func InitCommonControllerServer(conf *config.Config) {
// Start Enforcer xDS gRPC server
runCommonEnforcerServer(port)

go operator.InitOperator()
go operator.InitOperator(conf.CommonController.Metrics)

OUTER:
for {
Expand Down
2 changes: 1 addition & 1 deletion common-controller/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/jackc/pgx/v5 v5.5.2
github.com/onsi/ginkgo/v2 v2.14.0
github.com/onsi/gomega v1.30.0
github.com/prometheus/client_golang v1.18.0
github.com/prometheus/client_golang v1.18.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/sirupsen/logrus v1.9.0
k8s.io/apimachinery v0.29.2
Expand Down
2 changes: 1 addition & 1 deletion common-controller/internal/config/default_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ var defaultConfig = &Config{
RetryInterval: 5,
Persistence: persistence{Type: "K8s"},
},
Metrics: metrics{
Metrics: Metrics{
Enabled: false,
Type: "prometheus",
Port: 18006,
Expand Down
12 changes: 6 additions & 6 deletions common-controller/internal/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type commoncontroller struct {
WebServer webServer
InternalAPIServer internalAPIServer
ControlPlane controlplane
Metrics metrics
Metrics Metrics
Database database
}
type controlplane struct {
Expand Down Expand Up @@ -105,11 +105,11 @@ type webServer struct {
Port int64
}

type metrics struct {
Enabled bool
Type string
Port int32
CollectionInterval int32
// Metrics defines the configuration for metrics collection.
type Metrics struct {
Enabled bool
Type string
Port int32
}

type database struct {
Expand Down
27 changes: 22 additions & 5 deletions common-controller/internal/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package operator

import (
"flag"
"fmt"
"os"
"strings"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
Expand All @@ -33,6 +35,7 @@ import (
"github.com/wso2/apk/common-controller/internal/loggers"
cpcontrollers "github.com/wso2/apk/common-controller/internal/operator/controllers/cp"
dpcontrollers "github.com/wso2/apk/common-controller/internal/operator/controllers/dp"
"github.com/wso2/apk/common-controller/pkg/metrics"
cpv1alpha2 "github.com/wso2/apk/common-go-libs/apis/cp/v1alpha2"
dpv1alpha1 "github.com/wso2/apk/common-go-libs/apis/dp/v1alpha1"
dpv1alpha2 "github.com/wso2/apk/common-go-libs/apis/dp/v1alpha2"
Expand Down Expand Up @@ -64,12 +67,11 @@ func init() {
}

// InitOperator initializes the operator
func InitOperator() {
var metricsAddr string
// func InitOperator(prometheusPort int32, metricsEnabled bool) {
func InitOperator(metricsConfig config.Metrics) {
var enableLeaderElection bool
var probeAddr string
controlPlaneID := uuid.New().String()
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
"Enable leader election for controller manager. "+
Expand All @@ -83,7 +85,8 @@ func InitOperator() {
ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
ratelimitStore := cache.CreateNewOperatorDataStore()
subscriptionStore := cache.CreateNewSubscriptionDataStore()
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{

options := ctrl.Options{
Scheme: scheme,
HealthProbeBindAddress: probeAddr,
// LeaderElection: true,
Expand All @@ -99,7 +102,21 @@ func InitOperator() {
// if you are doing or is intended to do any operation such as perform cleanups
// after the manager stops then its usage might be unsafe.
// LeaderElectionReleaseOnCancel: true,
})
}

if metricsConfig.Enabled {
options.Metrics.BindAddress = fmt.Sprintf(":%d", metricsConfig.Port)

// Register the metrics collector
if strings.EqualFold(metricsConfig.Type, metrics.PrometheusMetricType) {
loggers.LoggerAPKOperator.Info("Registering Prometheus metrics collector.")
metrics.RegisterPrometheusCollector()
}
} else {
options.Metrics.BindAddress = "0"
}

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), options)
if err != nil {
loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2600, logging.BLOCKER, "Unable to start manager: %v", err))
os.Exit(1)
Expand Down
24 changes: 4 additions & 20 deletions common-controller/pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,13 @@
package metrics

import (
"fmt"
"net/http"
"strconv"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/wso2/apk/adapter/pkg/logging"
logger "github.com/wso2/apk/common-controller/internal/loggers"
metrics "github.com/wso2/apk/common-go-libs/pkg/metrics"
k8smetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
)

// StartPrometheusMetricsServer initializes and starts the metrics server to expose metrics to prometheus.
func StartPrometheusMetricsServer(port int32) {
// RegisterPrometheusCollector registers the Prometheus collector for metrics.
func RegisterPrometheusCollector() {

collector := metrics.CustomMetricsCollector()
prometheus.MustRegister(collector)
http.Handle("/metrics", promhttp.Handler())
err := http.ListenAndServe(":"+strconv.Itoa(int(port)), nil)
if err != nil {
logger.LoggerAPK.ErrorC(logging.ErrorDetails{
Message: fmt.Sprintln("Prometheus metrics server error:", err),
Severity: logging.MAJOR,
ErrorCode: 1110,
})
}
k8smetrics.Registry.MustRegister(collector)
}
Loading

0 comments on commit 5097646

Please sign in to comment.