Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Align Beyla setting Service Name/Namespace/Instance according to OTEL collector #1415

Merged
merged 8 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/beyla/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/grafana/beyla/pkg/internal/filter"
"github.com/grafana/beyla/pkg/internal/imetrics"
"github.com/grafana/beyla/pkg/internal/infraolly/process"
"github.com/grafana/beyla/pkg/internal/kube"
"github.com/grafana/beyla/pkg/internal/traces"
"github.com/grafana/beyla/pkg/kubeflags"
"github.com/grafana/beyla/pkg/services"
Expand Down Expand Up @@ -110,6 +111,7 @@ var DefaultConfig = Config{
Enable: kubeflags.EnabledDefault,
InformersSyncTimeout: 30 * time.Second,
InformersResyncPeriod: 30 * time.Minute,
MetadataSources: kube.DefaultMetadataSources,
},
HostID: HostIDConfig{
FetchTimeout: 500 * time.Millisecond,
Expand Down
7 changes: 7 additions & 0 deletions pkg/beyla/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/grafana/beyla/pkg/export/prom"
"github.com/grafana/beyla/pkg/internal/imetrics"
"github.com/grafana/beyla/pkg/internal/infraolly/process"
"github.com/grafana/beyla/pkg/internal/kube"
"github.com/grafana/beyla/pkg/internal/netolly/transform/cidr"
"github.com/grafana/beyla/pkg/internal/traces"
"github.com/grafana/beyla/pkg/kubeflags"
Expand Down Expand Up @@ -53,6 +54,8 @@ attributes:
kubeconfig_path: /foo/bar
enable: true
informers_sync_timeout: 30s
meta_naming_sources:
service_name_labels: ["titi.com/lala"]
instance_id:
dns: true
host_id:
Expand Down Expand Up @@ -101,6 +104,9 @@ network:
nc.AgentIP = "1.2.3.4"
nc.CIDRs = cidr.Definitions{"10.244.0.0/16"}

metaSources := kube.DefaultMetadataSources
metaSources.ServiceNameLabels = []string{"titi.com/lala"}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're the best! :)


assert.Equal(t, &Config{
Exec: cfg.Exec,
Port: cfg.Port,
Expand Down Expand Up @@ -176,6 +182,7 @@ network:
Enable: kubeflags.EnabledTrue,
InformersSyncTimeout: 30 * time.Second,
InformersResyncPeriod: 30 * time.Minute,
MetadataSources: metaSources,
},
HostID: HostIDConfig{
Override: "the-host-id",
Expand Down
2 changes: 1 addition & 1 deletion pkg/components/beyla.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func buildCommonContextInfo(
ResyncPeriod: config.Attributes.Kubernetes.InformersResyncPeriod,
DisabledInformers: config.Attributes.Kubernetes.DisableInformers,
MetaCacheAddr: config.Attributes.Kubernetes.MetaCacheAddress,
MetaSourceLabels: config.Attributes.Kubernetes.MetaSourceLabels,
MetaSourceLabels: config.Attributes.Kubernetes.MetadataSources,
}),
}
switch {
Expand Down
4 changes: 2 additions & 2 deletions pkg/internal/discover/watcher_kube_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestWatcherKubeEnricher(t *testing.T) {

// Setup a fake K8s API connected to the watcherKubeEnricher
fInformer := &fakeInformer{}
store := kube.NewStore(fInformer, kube.MetaSourceLabels{})
store := kube.NewStore(fInformer, kube.MetadataSources{})
wkeNodeFunc, err := WatcherKubeEnricherProvider(context.TODO(), &fakeMetadataProvider{store: store})()
require.NoError(t, err)
inputCh, outputCh := make(chan []Event[processAttrs], 10), make(chan []Event[processAttrs], 10)
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestWatcherKubeEnricherWithMatcher(t *testing.T) {
processInfo = fakeProcessInfo
// Setup a fake K8s API connected to the watcherKubeEnricher
fInformer := &fakeInformer{}
store := kube.NewStore(fInformer, kube.MetaSourceLabels{})
store := kube.NewStore(fInformer, kube.MetadataSources{})
wkeNodeFunc, err := WatcherKubeEnricherProvider(context.TODO(), &fakeMetadataProvider{store: store})()
require.NoError(t, err)
pipeConfig := beyla.Config{}
Expand Down
2 changes: 1 addition & 1 deletion pkg/internal/kube/informer_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type MetadataConfig struct {
SyncTimeout time.Duration
ResyncPeriod time.Duration
MetaCacheAddr string
MetaSourceLabels MetaSourceLabels
MetaSourceLabels MetadataSources
}

type MetadataProvider struct {
Expand Down
66 changes: 52 additions & 14 deletions pkg/internal/kube/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,29 @@ func qName(om *informer.ObjectMeta) qualifiedName {
return qualifiedName{name: om.Name, namespace: om.Namespace, kind: om.Kind}
}

// MetaSourceLabels allow overriding some metadata from kubernetes labels
type MetaSourceLabels struct {
ServiceName string `yaml:"service_name" env:"BEYLA_KUBE_META_SOURCE_LABEL_SERVICE_NAME"`
ServiceNamespace string `yaml:"service_namespace" env:"BEYLA_KUBE_META_SOURCE_LABEL_SERVICE_NAMESPACE"`
// MetadataSources allows overriding the service name and namespace with values
// taken from Kubernetes annotations and labels.
//
// Each field is an ordered list of keys. When resolving a value, the annotation
// keys are checked first, in order, and then the label keys; the value of the
// first key present in the object metadata is used. If none of the keys is
// present, the previously computed name/namespace is kept.
//
// Each list can be set from YAML or from an environment variable holding a
// comma-separated list of keys.
type MetadataSources struct {
// ServiceNameAnnotations are the annotation keys that may provide the service name.
ServiceNameAnnotations []string `yaml:"service_name_annotations" env:"BEYLA_KUBE_ANNOTATION_SERVICE_NAME" envSeparator:","`
// ServiceNamespaceAnnotations are the annotation keys that may provide the service namespace.
ServiceNamespaceAnnotations []string `yaml:"service_namespace_annotations" env:"BEYLA_KUBE_ANNOTATION_SERVICE_NAMESPACE" envSeparator:","`
// ServiceNameLabels are the label keys that may provide the service name.
// They are only consulted when no service-name annotation matches.
ServiceNameLabels []string `yaml:"service_name_labels" env:"BEYLA_KUBE_LABEL_SERVICE_NAME" envSeparator:","`
// ServiceNamespaceLabels are the label keys that may provide the service namespace.
// They are only consulted when no service-namespace annotation matches.
ServiceNamespaceLabels []string `yaml:"service_namespace_labels" env:"BEYLA_KUBE_LABEL_SERVICE_NAMESPACE" envSeparator:","`
}

var DefaultMetadataSources = MetadataSources{
ServiceNameAnnotations: []string{
"resource.opentelemetry.io/service.name",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all annotations starting with "resource.opentelemetry.io/" should be supported

if you want a flag (not sure that it's needed) then just "useAnnotationsForResourceAttributes"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this PR currently changes what we already support: service name, namespace and instance ID. Service version and deployment environment will go into another PR, as they are attributes that aren't currently provided by Beyla.

I'd prefer not to use the useAnnotationsForResourceAttributes flag, as it seems a very OTEL-opinionated option, and I'd prefer something more flexible that can be used in other environments and use cases.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, then maybe make it more generic like #1415 (comment)

},
ServiceNamespaceAnnotations: []string{
"resource.opentelemetry.io/service.namespace",
},
// empty by default. If the OTEL operator Instrumentation CRD sets useLabelsForResourceAttributes: true,
// the values below should be populated so:
// - `app.kubernetes.io/name` becomes `service.name`
// - `app.kubernetes.io/version` becomes `service.version`
// - `app.kubernetes.io/part-of` becomes `service.namespace`
// - `app.kubernetes.io/instance` becomes `service.instance.id`
ServiceNameLabels: nil,
ServiceNamespaceLabels: nil,
}

// Store aggregates Kubernetes information from multiple sources:
Expand Down Expand Up @@ -79,10 +98,10 @@ type Store struct {
// they receive is already present in the store
meta.BaseNotifier

sourceLabels MetaSourceLabels
metadataSources MetadataSources
}

func NewStore(kubeMetadata meta.Notifier, sourceLabels MetaSourceLabels) *Store {
func NewStore(kubeMetadata meta.Notifier, metadataSources MetadataSources) *Store {
log := dblog()
db := &Store{
log: log,
Expand All @@ -96,7 +115,7 @@ func NewStore(kubeMetadata meta.Notifier, sourceLabels MetaSourceLabels) *Store
otelServiceInfoByIP: map[string]OTelServiceNamePair{},
metadataNotifier: kubeMetadata,
BaseNotifier: meta.NewBaseNotifier(log),
sourceLabels: sourceLabels,
metadataSources: metadataSources,
}
kubeMetadata.Subscribe(db)
return db
Expand Down Expand Up @@ -289,17 +308,36 @@ func (s *Store) serviceNameNamespaceForMetadata(om *informer.ObjectMeta) (string
} else {
name, namespace = s.serviceNameNamespaceForOwner(om)
}
if s.sourceLabels.ServiceName != "" {
if on, ok := om.Labels[s.sourceLabels.ServiceName]; ok {
name = on
if nameFromMeta := s.valueFromMetadata(om,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doing this here will override any service names set by the OTEL environment variables. I think this will introduce a difference in how it will be handled by the SDK, because environment variables passed to the process directly will, "I think", override whatever else is set. I think it might be better if we added this override in here:

func (s *Store) serviceNameNamespaceOwnerID(ownerKey, name, namespace string) (string, string) {
	serviceName := name
	serviceNamespace := namespace
...
}

Right after we set them from whatever k8s found, we can override them based on the labels and annotations and then let the envvar code do its thing.

But we should confirm with the SDKs team if my understanding is correct that the env variables take precedence.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! I reordered the preferences and modified some unit and integration tests to make sure that the environment variables take precedence over annotations or labels.

s.metadataSources.ServiceNameAnnotations,
s.metadataSources.ServiceNameLabels,
); nameFromMeta != "" {
name = nameFromMeta
}
if nsFromMeta := s.valueFromMetadata(om,
s.metadataSources.ServiceNamespaceAnnotations,
s.metadataSources.ServiceNamespaceLabels,
); nsFromMeta != "" {
namespace = nsFromMeta
}
return name, namespace
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

@mariomac mariomac Nov 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Beyla, the fallback values are already set in the name and namespace variables beforehand, and are then overridden here if valueFromMetadata returns a non-empty value. They were already set as required, so that's why they are not in this PR.

That's the reason I was mentioning that moving this code into a shared library could be complex due to the way multiple components could populate/store the same information.

Copy link
Member

@zeitlinger zeitlinger Nov 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand - the important part is to have the same logic - can you point me to the logic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the one hand, Beyla does not store the metadata from the Pod owners (DaemonSet, Deployment, Job, etc.), so we cannot query the fallbacks one by one.

Instead, Beyla infers the pod owner name from the actual pod name (assuming the Kubernetes pod naming conventions). So at the point we execute the above, service.name already contains the value of one of the fallbacks (which in the end are nothing other than the Pod owner name).

As a result, if the service name/namespace can't be inferred from env/annotations/labels (in that priority), the original value (the fallback) is kept.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so it should result in the same outcome - correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. My initial concern was not about the outcome but about code reuse.

}

// function implemented to provide consistent service metadata naming across multiple
// OTEL implementations: OTEL operator, Loki and Beyla
// https://github.com/grafana/k8s-monitoring-helm/issues/942
func (s *Store) valueFromMetadata(om *informer.ObjectMeta, annotationNames, labelNames []string) string {
for _, key := range annotationNames {
if val, ok := om.Annotations[key]; ok {
return val
}
}
if s.sourceLabels.ServiceNamespace != "" {
if ons, ok := om.Labels[s.sourceLabels.ServiceNamespace]; ok {
namespace = ons
for _, key := range labelNames {
if val, ok := om.Labels[key]; ok {
return val
}
}
return name, namespace
return ""
}

// ServiceNameNamespaceForIP returns the service name and namespace for a given IP address
Expand Down
8 changes: 4 additions & 4 deletions pkg/internal/kube/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestContainerInfo(t *testing.T) {

fInformer := &fakeInformer{}

store := NewStore(fInformer, MetaSourceLabels{})
store := NewStore(fInformer, MetadataSources{})

_ = store.On(&informer.Event{Type: informer.EventType_CREATED, Resource: &service})
_ = store.On(&informer.Event{Type: informer.EventType_CREATED, Resource: &podMetaA})
Expand Down Expand Up @@ -257,7 +257,7 @@ func TestMemoryCleanedUp(t *testing.T) {

fInformer := &fakeInformer{}

store := NewStore(fInformer, MetaSourceLabels{})
store := NewStore(fInformer, MetadataSources{})

_ = store.On(&informer.Event{Type: informer.EventType_CREATED, Resource: &service})
_ = store.On(&informer.Event{Type: informer.EventType_CREATED, Resource: &podMetaA})
Expand All @@ -281,7 +281,7 @@ func TestMemoryCleanedUp(t *testing.T) {
// Fixes a memory leak in the store where the objectMetaByIP map was not cleaned up
func TestMetaByIPEntryRemovedIfIPGroupChanges(t *testing.T) {
// GIVEN a store with
store := NewStore(&fakeInformer{}, MetaSourceLabels{})
store := NewStore(&fakeInformer{}, MetadataSources{})
// WHEN an object is created with several IPs
_ = store.On(&informer.Event{
Type: informer.EventType_CREATED,
Expand Down Expand Up @@ -326,7 +326,7 @@ func TestMetaByIPEntryRemovedIfIPGroupChanges(t *testing.T) {
}

func TestNoLeakOnUpdateOrDeletion(t *testing.T) {
store := NewStore(&fakeInformer{}, MetaSourceLabels{})
store := NewStore(&fakeInformer{}, MetadataSources{})
topOwner := &informer.Owner{Name: "foo", Kind: "Deployment"}
require.NoError(t, store.On(&informer.Event{
Type: informer.EventType_CREATED,
Expand Down
Loading
Loading