diff --git a/receiver/singletonreceivercreator/factory.go b/receiver/singletonreceivercreator/factory.go
index 686bd8ba..3a5ebe2a 100644
--- a/receiver/singletonreceivercreator/factory.go
+++ b/receiver/singletonreceivercreator/factory.go
@@ -13,6 +13,10 @@ import (
 	"github.com/kyma-project/opentelemetry-collector-components/receiver/singletonreceivercreator/internal/metadata"
 )
 
+const (
+	leaseHolderIDEnvVar = "LEASE_HOLDER_ID"
+)
+
 func NewFactory() receiver.Factory {
 	return receiver.NewFactory(
 		metadata.Type,
@@ -45,9 +49,16 @@ func createMetricsReceiver(
 	if !ok {
 		return nil, errors.New("invalid configuration")
 	}
-	hostname, err := os.Hostname()
-	if err != nil {
-		return nil, err
+
+	// Set leaseHolderID for local development
+	leaseHolderID := os.Getenv(leaseHolderIDEnvVar)
+	if leaseHolderID == "" {
+		// If running in a Pod, the hostname will be the pod name
+		var err error
+		leaseHolderID, err = os.Hostname()
+		if err != nil {
+			return nil, err
+		}
 	}
 
 	telemetryBuilder, err := metadata.NewTelemetryBuilder(params.TelemetrySettings)
@@ -60,6 +71,6 @@ func createMetricsReceiver(
 		cfg,
 		consumer,
 		telemetryBuilder,
-		hostname,
+		leaseHolderID,
 	), nil
 }
diff --git a/receiver/singletonreceivercreator/leader_elector.go b/receiver/singletonreceivercreator/leader_elector.go
index 1e946bf1..a09d9ec9 100644
--- a/receiver/singletonreceivercreator/leader_elector.go
+++ b/receiver/singletonreceivercreator/leader_elector.go
@@ -20,7 +20,7 @@ const (
 	leaseAttrKey = "lease"
 )
 
-// newLeaderElector return a leader elector object using client-go
+// newLeaderElector returns a leader elector object using client-go
 func newLeaderElector(
 	cfg leaderElectionConfig,
 	client kubernetes.Interface,
@@ -29,7 +29,6 @@ func newLeaderElector(
 	onStoppedLeading func(),
 	identity string,
 ) (*leaderelection.LeaderElector, error) {
-
 	resourceLock, err := resourcelock.New(
 		resourcelock.LeasesResourceLock,
 		cfg.leaseNamespace,
@@ -44,16 +43,21 @@ func newLeaderElector(
 	}
 
 	leConfig := leaderelection.LeaderElectionConfig{
-		Lock:          resourceLock,
-		LeaseDuration: cfg.leaseDuration,
-		RenewDeadline: cfg.renewDuration,
-		RetryPeriod:   cfg.retryPeriod,
+		// The lock resource name is used as a lease label in leader election metrics.
+		Name:            cfg.leaseName,
+		Lock:            resourceLock,
+		LeaseDuration:   cfg.leaseDuration,
+		RenewDeadline:   cfg.renewDuration,
+		RetryPeriod:     cfg.retryPeriod,
+		ReleaseOnCancel: true,
 		Callbacks: leaderelection.LeaderCallbacks{
 			OnStartedLeading: onStartedLeading,
 			OnStoppedLeading: onStoppedLeading,
 		},
 	}
 
+	// TODO: Contribute to the leaderelection package to support configuring a metric provider directly,
+	// eliminating the need for global variables.
 	leaderelection.SetProvider(leaderMetricProvider{
 		telemetryBuilder: telemetryBuilder,
 	})
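For reference, the lease-holder identity fallback added in factory.go can be exercised on its own. The following is a minimal, self-contained sketch of the same lookup order; the helper name resolveLeaseHolderID and the main wrapper are illustrative and not part of this change:

package main

import (
	"fmt"
	"os"
)

// resolveLeaseHolderID mirrors the order used in createMetricsReceiver:
// an explicit LEASE_HOLDER_ID wins (useful for local development); otherwise
// the hostname is used, which inside a Pod is the Pod name.
func resolveLeaseHolderID() (string, error) {
	if id := os.Getenv("LEASE_HOLDER_ID"); id != "" {
		return id, nil
	}
	return os.Hostname()
}

func main() {
	id, err := resolveLeaseHolderID()
	if err != nil {
		panic(err)
	}
	fmt.Println("lease holder ID:", id)
}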
diff --git a/receiver/singletonreceivercreator/receiver.go b/receiver/singletonreceivercreator/receiver.go
index 107cf063..714c962c 100644
--- a/receiver/singletonreceivercreator/receiver.go
+++ b/receiver/singletonreceivercreator/receiver.go
@@ -8,6 +8,7 @@ import (
 	"go.opentelemetry.io/collector/consumer"
 	"go.opentelemetry.io/collector/receiver"
 	"go.uber.org/zap"
+	"k8s.io/client-go/tools/leaderelection"
 
 	"github.com/kyma-project/opentelemetry-collector-components/receiver/singletonreceivercreator/internal/metadata"
 )
@@ -19,7 +20,7 @@ type singletonReceiverCreator struct {
 	nextMetricsConsumer consumer.Metrics
 	telemetryBuilder    *metadata.TelemetryBuilder
 
-	identity          string
+	leaseHolderID     string
 	subReceiverRunner *receiverRunner
 	cancel            context.CancelFunc
 }
@@ -29,14 +30,14 @@ func newSingletonReceiverCreator(
 	cfg *Config,
 	consumer consumer.Metrics,
 	telemetryBuilder *metadata.TelemetryBuilder,
-	identity string,
+	leaseHolderID string,
 ) *singletonReceiverCreator {
 	return &singletonReceiverCreator{
 		params:              params,
 		cfg:                 cfg,
 		nextMetricsConsumer: consumer,
 		telemetryBuilder:    telemetryBuilder,
-		identity:            identity,
+		leaseHolderID:       leaseHolderID,
 	}
 }
 
@@ -57,20 +58,31 @@ func (c *singletonReceiverCreator) Start(_ context.Context, h component.Host) er
 
 	c.params.TelemetrySettings.Logger.Info("Starting singleton election receiver...")
 
-	client, err := c.cfg.getK8sClient()
+	c.params.TelemetrySettings.Logger.Debug("Creating leader elector...")
+	c.subReceiverRunner = newReceiverRunner(c.params, rcHost)
+
+	leaderElector, err := c.initLeaderElector()
 	if err != nil {
-		return fmt.Errorf("failed to create Kubernetes client: %w", err)
+		return fmt.Errorf("failed to create leader elector: %w", err)
 	}
 
-	c.params.TelemetrySettings.Logger.Debug("Creating leader elector...")
-	c.subReceiverRunner = newReceiverRunner(c.params, rcHost)
+	go c.runLeaderElector(ctx, leaderElector)
+
+	return nil
+}
+
+func (c *singletonReceiverCreator) initLeaderElector() (*leaderelection.LeaderElector, error) {
+	client, err := c.cfg.getK8sClient()
+	if err != nil {
+		return nil, fmt.Errorf("failed to create Kubernetes client: %w", err)
+	}
 
-	leaderElector, err := newLeaderElector(
+	return newLeaderElector(
 		c.cfg.leaderElectionConfig,
 		client,
 		c.telemetryBuilder,
 		func(ctx context.Context) {
-			c.params.TelemetrySettings.Logger.Info("Elected as leader")
+			c.params.TelemetrySettings.Logger.Info("Leader lease acquired")
 			//nolint:contextcheck // no context passed, as this follows the same pattern as the upstream implementation
 			if err := c.startSubReceiver(); err != nil {
 				c.params.TelemetrySettings.Logger.Error("Failed to start subreceiver", zap.Error(err))
@@ -78,20 +90,27 @@ func (c *singletonReceiverCreator) Start(_ context.Context, h component.Host) er
 		},
 		//nolint:contextcheck // no context passed, as this follows the same pattern as the upstream implementation
 		func() {
-			c.params.TelemetrySettings.Logger.Info("Lost leadership")
+			c.params.TelemetrySettings.Logger.Info("Leader lease lost")
 			if err := c.stopSubReceiver(); err != nil {
 				c.params.TelemetrySettings.Logger.Error("Failed to stop subreceiver", zap.Error(err))
 			}
 		},
-		c.identity,
+		c.leaseHolderID,
 	)
-	if err != nil {
-		return fmt.Errorf("failed to create leader elector: %w", err)
-	}
+}
 
-	//nolint:contextcheck // Create a new context as specified in the interface documentation
-	go leaderElector.Run(ctx)
-	return nil
+func (c *singletonReceiverCreator) runLeaderElector(ctx context.Context, leaderElector *leaderelection.LeaderElector) {
+	// Leader election loop stops if context is canceled or the leader elector loses the lease.
+	// The loop allows continued participation in leader election, even if the lease is lost.
+	for {
+		leaderElector.Run(ctx)
+
+		if ctx.Err() != nil {
+			break
+		}
+
+		c.params.TelemetrySettings.Logger.Info("Leader lease lost. Returning to standby mode...")
+	}
 }
 
 func (c *singletonReceiverCreator) startSubReceiver() error {
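The loop added in receiver.go relies on leaderelection.LeaderElector.Run returning once the lease is lost, so re-entering Run keeps the instance participating in the election as a standby until its context is canceled. Below is a minimal, self-contained sketch of that control flow; runOnce stands in for leaderElector.Run, and the names and durations are illustrative:

package main

import (
	"context"
	"fmt"
	"time"
)

// runOnce stands in for leaderElector.Run: it blocks until the context is
// canceled or (simulated here) the lease is lost.
func runOnce(ctx context.Context) {
	select {
	case <-ctx.Done():
	case <-time.After(50 * time.Millisecond): // simulated lease loss
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	// Same pattern as runLeaderElector: keep re-entering the election
	// until the surrounding context is canceled.
	for {
		runOnce(ctx)

		if ctx.Err() != nil {
			break
		}

		fmt.Println("lease lost, returning to standby and re-joining the election")
	}
}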
diff --git a/receiver/singletonreceivercreator/samples/collectors-with-leader.yaml b/receiver/singletonreceivercreator/samples/collectors-with-leader.yaml
index d192f202..e96467bf 100644
--- a/receiver/singletonreceivercreator/samples/collectors-with-leader.yaml
+++ b/receiver/singletonreceivercreator/samples/collectors-with-leader.yaml
@@ -1,3 +1,118 @@
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: Role
+metadata:
+  name: lease-manager
+  namespace: default
+rules:
+- apiGroups:
+  - coordination.k8s.io
+  resources:
+  - leases
+  verbs:
+  - get
+  - list
+  - watch
+  - create
+  - update
+  - patch
+  - delete
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+  name: cluster-monitor
+rules:
+- apiGroups:
+  - ""
+  resources:
+  - events
+  - namespaces
+  - namespaces/status
+  - nodes
+  - nodes/spec
+  - pods
+  - pods/status
+  - replicationcontrollers
+  - replicationcontrollers/status
+  - resourcequotas
+  - services
+  verbs:
+  - get
+  - list
+  - watch
+- apiGroups:
+  - apps
+  resources:
+  - daemonsets
+  - deployments
+  - replicasets
+  - statefulsets
+  verbs:
+  - get
+  - list
+  - watch
+- apiGroups:
+  - extensions
+  resources:
+  - daemonsets
+  - deployments
+  - replicasets
+  verbs:
+  - get
+  - list
+  - watch
+- apiGroups:
+  - batch
+  resources:
+  - jobs
+  - cronjobs
+  verbs:
+  - get
+  - list
+  - watch
+- apiGroups:
+  - autoscaling
+  resources:
+  - horizontalpodautoscalers
+  verbs:
+  - get
+  - list
+  - watch
+---
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: collector
+  namespace: default
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: RoleBinding
+metadata:
+  name: lease-manager
+  namespace: default
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: Role
+  name: lease-manager
+subjects:
+  - kind: ServiceAccount
+    name: collector
+    namespace: default
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRoleBinding
+metadata:
+  name: cluster-monitor
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: ClusterRole
+  name: cluster-monitor
+subjects:
+- kind: ServiceAccount
+  name: collector
+  namespace: default
+---
 apiVersion: v1
 kind: ConfigMap
 metadata:
@@ -22,7 +137,6 @@ data:
       batch:
     exporters:
       debug:
-        verbosity: detailed
 
     service:
       pipelines:
@@ -39,6 +153,8 @@ kind: Pod
 metadata:
   name: collector-1
   namespace: default
+  labels:
+    app: collector-1
 spec:
   serviceAccountName: collector
   volumes:
@@ -46,7 +162,7 @@ spec:
     configMap:
       name: collector-config
   containers:
-  - image: europe-docker.pkg.dev/kyma-project/prod/kyma-otel-collector:0.107.0-main
+  - image: europe-docker.pkg.dev/kyma-project/prod/kyma-otel-collector:0.111.0-main
     imagePullPolicy: Always
     name: collector
     volumeMounts:
@@ -60,6 +176,8 @@ kind: Pod
 metadata:
   name: collector-2
   namespace: default
+  labels:
+    app: collector-2
 spec:
   serviceAccountName: collector
   volumes:
@@ -67,7 +185,7 @@ spec:
     configMap:
      name: collector-config
   containers:
-  - image: europe-docker.pkg.dev/kyma-project/prod/kyma-otel-collector:0.107.0-main
+  - image: europe-docker.pkg.dev/kyma-project/prod/kyma-otel-collector:0.111.0-main
     imagePullPolicy: Always
     name: collector
     volumeMounts:
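With the lease-manager Role above in place, the coordination.k8s.io Lease can also be read directly with client-go to see which collector pod currently holds it. A minimal sketch, assuming a kubeconfig at the default location and a lease named "singleton-receiver" in the default namespace; both are placeholders for whatever the receiver is configured with:

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a client from the local kubeconfig; inside a Pod, rest.InClusterConfig() would be used instead.
	restCfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client, err := kubernetes.NewForConfig(restCfg)
	if err != nil {
		panic(err)
	}

	// "singleton-receiver" is a placeholder; use the lease name configured for the receiver.
	lease, err := client.CoordinationV1().Leases("default").Get(context.Background(), "singleton-receiver", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}

	if lease.Spec.HolderIdentity != nil {
		fmt.Println("current lease holder:", *lease.Spec.HolderIdentity)
	}
}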
diff --git a/receiver/singletonreceivercreator/samples/rbac.yaml b/receiver/singletonreceivercreator/samples/rbac.yaml
deleted file mode 100644
index 85befca8..00000000
--- a/receiver/singletonreceivercreator/samples/rbac.yaml
+++ /dev/null
@@ -1,114 +0,0 @@
----
-apiVersion: rbac.authorization.k8s.io/v1
-kind: Role
-metadata:
-  name: lease-manager
-  namespace: default
-rules:
-- apiGroups:
-  - coordination.k8s.io
-  resources:
-  - leases
-  verbs:
-  - get
-  - list
-  - watch
-  - create
-  - update
-  - patch
-  - delete
----
-apiVersion: rbac.authorization.k8s.io/v1
-kind: ClusterRole
-metadata:
-  name: cluster-monitor
-rules:
-- apiGroups:
-  - ""
-  resources:
-  - events
-  - namespaces
-  - namespaces/status
-  - nodes
-  - nodes/spec
-  - pods
-  - pods/status
-  - replicationcontrollers
-  - replicationcontrollers/status
-  - resourcequotas
-  - services
-  verbs:
-  - get
-  - list
-  - watch
-- apiGroups:
-  - apps
-  resources:
-  - daemonsets
-  - deployments
-  - replicasets
-  - statefulsets
-  verbs:
-  - get
-  - list
-  - watch
-- apiGroups:
-  - extensions
-  resources:
-  - daemonsets
-  - deployments
-  - replicasets
-  verbs:
-  - get
-  - list
-  - watch
-- apiGroups:
-  - batch
-  resources:
-  - jobs
-  - cronjobs
-  verbs:
-  - get
-  - list
-  - watch
-- apiGroups:
-  - autoscaling
-  resources:
-  - horizontalpodautoscalers
-  verbs:
-  - get
-  - list
-  - watch
----
-apiVersion: v1
-kind: ServiceAccount
-metadata:
-  name: collector
-  namespace: default
----
-apiVersion: rbac.authorization.k8s.io/v1
-kind: RoleBinding
-metadata:
-  name: lease-manager
-  namespace: default
-roleRef:
-  apiGroup: rbac.authorization.k8s.io
-  kind: Role
-  name: lease-manager
-subjects:
-  - kind: ServiceAccount
-    name: collector
-    namespace: default
----
-apiVersion: rbac.authorization.k8s.io/v1
-kind: ClusterRoleBinding
-metadata:
-  name: cluster-monitor
-roleRef:
-  apiGroup: rbac.authorization.k8s.io
-  kind: ClusterRole
-  name: cluster-monitor
-subjects:
-- kind: ServiceAccount
-  name: collector
-  namespace: default