Commit 352e0fd

fix: Singleton receiver creator losing all leader (#148)
skhalash authored Oct 31, 2024
1 parent 7fdfb7f commit 352e0fd
Showing 5 changed files with 182 additions and 144 deletions.
19 changes: 15 additions & 4 deletions receiver/singletonreceivercreator/factory.go
@@ -13,6 +13,10 @@ import (
 	"github.com/kyma-project/opentelemetry-collector-components/receiver/singletonreceivercreator/internal/metadata"
 )
 
+const (
+	leaseHolderIDEnvVar = "LEASE_HOLDER_ID"
+)
+
 func NewFactory() receiver.Factory {
 	return receiver.NewFactory(
 		metadata.Type,
@@ -45,9 +49,16 @@ func createMetricsReceiver(
 	if !ok {
 		return nil, errors.New("invalid configuration")
 	}
-	hostname, err := os.Hostname()
-	if err != nil {
-		return nil, err
-	}
+
+	// Set leaseHolderID for local development
+	leaseHolderID := os.Getenv(leaseHolderIDEnvVar)
+	if leaseHolderID == "" {
+		// If running in a Pod, the hostname will be the pod name
+		var err error
+		leaseHolderID, err = os.Hostname()
+		if err != nil {
+			return nil, err
+		}
+	}
 
 	telemetryBuilder, err := metadata.NewTelemetryBuilder(params.TelemetrySettings)
@@ -60,6 +71,6 @@ func createMetricsReceiver(
 		cfg,
 		consumer,
 		telemetryBuilder,
-		hostname,
+		leaseHolderID,
 	), nil
 }
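
Note: with this change, the leader-election identity resolves from the LEASE_HOLDER_ID environment variable first and only falls back to os.Hostname() (the pod name when running in Kubernetes). A minimal standalone sketch of that resolution order (the helper name and the override value are hypothetical, for illustration only):

package main

import (
	"fmt"
	"os"
)

// resolveLeaseHolderID mirrors the factory logic: prefer the
// LEASE_HOLDER_ID environment variable (useful for local development),
// then fall back to the hostname, which is the pod name in Kubernetes.
func resolveLeaseHolderID() (string, error) {
	if id := os.Getenv("LEASE_HOLDER_ID"); id != "" {
		return id, nil
	}
	return os.Hostname()
}

func main() {
	os.Setenv("LEASE_HOLDER_ID", "local-dev-collector") // hypothetical local override
	id, err := resolveLeaseHolderID()
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println("lease holder:", id)
}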
16 changes: 10 additions & 6 deletions receiver/singletonreceivercreator/leader_elector.go
@@ -20,7 +20,7 @@ const (
 	leaseAttrKey = "lease"
 )
 
-// newLeaderElector return a leader elector object using client-go
+// newLeaderElector returns a leader elector object using client-go
 func newLeaderElector(
 	cfg leaderElectionConfig,
 	client kubernetes.Interface,
@@ -29,7 +29,6 @@ func newLeaderElector(
 	onStoppedLeading func(),
 	identity string,
 ) (*leaderelection.LeaderElector, error) {
-
 	resourceLock, err := resourcelock.New(
 		resourcelock.LeasesResourceLock,
 		cfg.leaseNamespace,
@@ -44,16 +43,21 @@
 	}
 
 	leConfig := leaderelection.LeaderElectionConfig{
-		Lock:          resourceLock,
-		LeaseDuration: cfg.leaseDuration,
-		RenewDeadline: cfg.renewDuration,
-		RetryPeriod:   cfg.retryPeriod,
+		// The lock resource name is used as a lease label in leader election metrics.
+		Name:            cfg.leaseName,
+		Lock:            resourceLock,
+		LeaseDuration:   cfg.leaseDuration,
+		RenewDeadline:   cfg.renewDuration,
+		RetryPeriod:     cfg.retryPeriod,
+		ReleaseOnCancel: true,
 		Callbacks: leaderelection.LeaderCallbacks{
 			OnStartedLeading: onStartedLeading,
 			OnStoppedLeading: onStoppedLeading,
 		},
 	}
 
+	// TODO: Contribute to the leaderelection package to support configuring a metric provider directly,
+	// eliminating the need for global variables.
 	leaderelection.SetProvider(leaderMetricProvider{
 		telemetryBuilder: telemetryBuilder,
 	})
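
For context, the ReleaseOnCancel option added above makes the elector release the lease on shutdown instead of holding it until LeaseDuration expires, and Name labels the lease in the election metrics. A minimal client-go sketch using the same options (lease name, namespace, and timing values are placeholders, not the receiver's actual configuration):

package election

import (
	"context"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

// runElection campaigns for a Lease-based lock and blocks until the
// context is canceled or leadership is lost.
func runElection(ctx context.Context, client kubernetes.Interface, identity string) {
	lock := &resourcelock.LeaseLock{
		LeaseMeta:  metav1.ObjectMeta{Name: "singleton-receiver", Namespace: "default"},
		Client:     client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{Identity: identity},
	}

	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Name:            "singleton-receiver", // labels the lease in election metrics
		Lock:            lock,
		LeaseDuration:   15 * time.Second,
		RenewDeadline:   10 * time.Second,
		RetryPeriod:     2 * time.Second,
		ReleaseOnCancel: true, // free the lease immediately on shutdown
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) { /* start the singleton work */ },
			OnStoppedLeading: func() { /* stop the singleton work */ },
		},
	})
}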
53 changes: 36 additions & 17 deletions receiver/singletonreceivercreator/receiver.go
@@ -8,6 +8,7 @@ import (
 	"go.opentelemetry.io/collector/consumer"
 	"go.opentelemetry.io/collector/receiver"
 	"go.uber.org/zap"
+	"k8s.io/client-go/tools/leaderelection"
 
 	"github.com/kyma-project/opentelemetry-collector-components/receiver/singletonreceivercreator/internal/metadata"
 )
@@ -19,7 +20,7 @@ type singletonReceiverCreator struct {
 	nextMetricsConsumer consumer.Metrics
 	telemetryBuilder    *metadata.TelemetryBuilder
 
-	identity          string
+	leaseHolderID     string
 	subReceiverRunner *receiverRunner
 	cancel            context.CancelFunc
 }
@@ -29,14 +30,14 @@ func newSingletonReceiverCreator(
 	cfg *Config,
 	consumer consumer.Metrics,
 	telemetryBuilder *metadata.TelemetryBuilder,
-	identity string,
+	leaseHolderID string,
 ) *singletonReceiverCreator {
 	return &singletonReceiverCreator{
 		params:              params,
 		cfg:                 cfg,
 		nextMetricsConsumer: consumer,
 		telemetryBuilder:    telemetryBuilder,
-		identity:            identity,
+		leaseHolderID:       leaseHolderID,
 	}
 }
 
@@ -57,41 +58,59 @@ func (c *singletonReceiverCreator) Start(_ context.Context, h component.Host) error {
 
 	c.params.TelemetrySettings.Logger.Info("Starting singleton election receiver...")
 
-	client, err := c.cfg.getK8sClient()
+	c.params.TelemetrySettings.Logger.Debug("Creating leader elector...")
+	c.subReceiverRunner = newReceiverRunner(c.params, rcHost)
+
+	leaderElector, err := c.initLeaderElector()
 	if err != nil {
-		return fmt.Errorf("failed to create Kubernetes client: %w", err)
+		return fmt.Errorf("failed to create leader elector: %w", err)
 	}
 
-	c.params.TelemetrySettings.Logger.Debug("Creating leader elector...")
-	c.subReceiverRunner = newReceiverRunner(c.params, rcHost)
+	go c.runLeaderElector(ctx, leaderElector)
+
+	return nil
+}
+
+func (c *singletonReceiverCreator) initLeaderElector() (*leaderelection.LeaderElector, error) {
+	client, err := c.cfg.getK8sClient()
+	if err != nil {
+		return nil, fmt.Errorf("failed to create Kubernetes client: %w", err)
+	}
 
-	leaderElector, err := newLeaderElector(
+	return newLeaderElector(
 		c.cfg.leaderElectionConfig,
 		client,
 		c.telemetryBuilder,
 		func(ctx context.Context) {
-			c.params.TelemetrySettings.Logger.Info("Elected as leader")
+			c.params.TelemetrySettings.Logger.Info("Leader lease acquired")
 			//nolint:contextcheck // no context passed, as this follows the same pattern as the upstream implementation
 			if err := c.startSubReceiver(); err != nil {
 				c.params.TelemetrySettings.Logger.Error("Failed to start subreceiver", zap.Error(err))
 			}
 		},
 		//nolint:contextcheck // no context passed, as this follows the same pattern as the upstream implementation
 		func() {
-			c.params.TelemetrySettings.Logger.Info("Lost leadership")
+			c.params.TelemetrySettings.Logger.Info("Leader lease lost")
 			if err := c.stopSubReceiver(); err != nil {
 				c.params.TelemetrySettings.Logger.Error("Failed to stop subreceiver", zap.Error(err))
 			}
 		},
-		c.identity,
+		c.leaseHolderID,
 	)
-	if err != nil {
-		return fmt.Errorf("failed to create leader elector: %w", err)
-	}
+}
 
-	//nolint:contextcheck // Create a new context as specified in the interface documentation
-	go leaderElector.Run(ctx)
-	return nil
+func (c *singletonReceiverCreator) runLeaderElector(ctx context.Context, leaderElector *leaderelection.LeaderElector) {
+	// Leader election loop stops if context is canceled or the leader elector loses the lease.
+	// The loop allows continued participation in leader election, even if the lease is lost.
+	for {
+		leaderElector.Run(ctx)
+
+		if ctx.Err() != nil {
+			break
+		}
+
+		c.params.TelemetrySettings.Logger.Info("Leader lease lost. Returning to standby mode...")
+	}
 }
 
 func (c *singletonReceiverCreator) startSubReceiver() error {
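
This is the core of the fix: client-go's LeaderElector.Run returns once the lease is lost, so the previous single go leaderElector.Run(ctx) left a replica permanently out of the election after one loss. Wrapping Run in a loop keeps the replica campaigning as a standby. A self-contained sketch of the loop's behavior, with a stub standing in for the real elector (the stub and its timings are invented for illustration):

package main

import (
	"context"
	"fmt"
	"time"
)

// runOnce stands in for leaderelection.LeaderElector.Run, which blocks
// until the context is canceled or the lease is lost.
func runOnce(ctx context.Context, attempt int) {
	select {
	case <-ctx.Done():
	case <-time.After(10 * time.Millisecond): // simulate losing the lease
		fmt.Printf("attempt %d: lease lost\n", attempt)
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 35*time.Millisecond)
	defer cancel()

	// The fix: re-enter the election after every loss instead of returning,
	// so the replica stays available as a standby until shutdown.
	for attempt := 1; ; attempt++ {
		runOnce(ctx, attempt)
		if ctx.Err() != nil {
			fmt.Println("context canceled: shutting down")
			return
		}
	}
}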
124 changes: 121 additions & 3 deletions receiver/singletonreceivercreator/samples/collectors-with-leader.yaml
@@ -1,3 +1,118 @@
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: Role
+metadata:
+  name: lease-manager
+  namespace: default
+rules:
+  - apiGroups:
+      - coordination.k8s.io
+    resources:
+      - leases
+    verbs:
+      - get
+      - list
+      - watch
+      - create
+      - update
+      - patch
+      - delete
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+  name: cluster-monitor
+rules:
+  - apiGroups:
+      - ""
+    resources:
+      - events
+      - namespaces
+      - namespaces/status
+      - nodes
+      - nodes/spec
+      - pods
+      - pods/status
+      - replicationcontrollers
+      - replicationcontrollers/status
+      - resourcequotas
+      - services
+    verbs:
+      - get
+      - list
+      - watch
+  - apiGroups:
+      - apps
+    resources:
+      - daemonsets
+      - deployments
+      - replicasets
+      - statefulsets
+    verbs:
+      - get
+      - list
+      - watch
+  - apiGroups:
+      - extensions
+    resources:
+      - daemonsets
+      - deployments
+      - replicasets
+    verbs:
+      - get
+      - list
+      - watch
+  - apiGroups:
+      - batch
+    resources:
+      - jobs
+      - cronjobs
+    verbs:
+      - get
+      - list
+      - watch
+  - apiGroups:
+      - autoscaling
+    resources:
+      - horizontalpodautoscalers
+    verbs:
+      - get
+      - list
+      - watch
+---
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: collector
+  namespace: default
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: RoleBinding
+metadata:
+  name: lease-manager
+  namespace: default
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: Role
+  name: lease-manager
+subjects:
+  - kind: ServiceAccount
+    name: collector
+    namespace: default
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRoleBinding
+metadata:
+  name: cluster-monitor
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: ClusterRole
+  name: cluster-monitor
+subjects:
+  - kind: ServiceAccount
+    name: collector
+    namespace: default
+---
 apiVersion: v1
 kind: ConfigMap
 metadata:
@@ -22,7 +137,6 @@ data:
       batch:
     exporters:
       debug:
-        verbosity: detailed
     service:
       pipelines:
@@ -39,14 +153,16 @@ kind: Pod
 metadata:
   name: collector-1
   namespace: default
+  labels:
+    app: collector-1
 spec:
   serviceAccountName: collector
   volumes:
     - name: config
       configMap:
         name: collector-config
   containers:
-    - image: europe-docker.pkg.dev/kyma-project/prod/kyma-otel-collector:0.107.0-main
+    - image: europe-docker.pkg.dev/kyma-project/prod/kyma-otel-collector:0.111.0-main
       imagePullPolicy: Always
      name: collector
      volumeMounts:
@@ -60,14 +176,16 @@ kind: Pod
 metadata:
   name: collector-2
   namespace: default
+  labels:
+    app: collector-2
 spec:
   serviceAccountName: collector
   volumes:
     - name: config
       configMap:
         name: collector-config
   containers:
-    - image: europe-docker.pkg.dev/kyma-project/prod/kyma-otel-collector:0.107.0-main
+    - image: europe-docker.pkg.dev/kyma-project/prod/kyma-otel-collector:0.111.0-main
       imagePullPolicy: Always
      name: collector
      volumeMounts:
