diff --git a/models/filter.go b/models/filter.go index 4e2f614e3599b..8229dec4918b0 100644 --- a/models/filter.go +++ b/models/filter.go @@ -161,14 +161,7 @@ func (f *Filter) shouldNamePass(key string) bool { // shouldFieldPass returns true if the metric should pass, false if it should drop // based on the drop/pass filter parameters func (f *Filter) shouldFieldPass(key string) bool { - if f.fieldPassFilter != nil && f.fieldDropFilter != nil { - return f.fieldPassFilter.Match(key) && !f.fieldDropFilter.Match(key) - } else if f.fieldPassFilter != nil { - return f.fieldPassFilter.Match(key) - } else if f.fieldDropFilter != nil { - return !f.fieldDropFilter.Match(key) - } - return true + return ShouldPassFilters(f.fieldPassFilter, f.fieldDropFilter, key) } // shouldTagsPass returns true if the metric should pass, false if it should drop @@ -217,6 +210,17 @@ func (f *Filter) filterTags(metric telegraf.Metric) { } } +func ShouldPassFilters(include filter.Filter, exclude filter.Filter, key string) bool { + if include != nil && exclude != nil { + return include.Match(key) && !exclude.Match(key) + } else if include != nil { + return include.Match(key) + } else if exclude != nil { + return !exclude.Match(key) + } + return true +} + func ShouldTagsPass(passFilters []TagFilter, dropFilters []TagFilter, tags []*telegraf.Tag) bool { pass := func(tpf []TagFilter) bool { for _, pat := range tpf { diff --git a/plugins/inputs/prometheus/README.md b/plugins/inputs/prometheus/README.md index e049652a1c36b..82c1af78b6e0c 100644 --- a/plugins/inputs/prometheus/README.md +++ b/plugins/inputs/prometheus/README.md @@ -95,6 +95,13 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. # eg. To scrape pods on a specific node # kubernetes_field_selector = "spec.nodeName=$HOSTNAME" + ## Filter which pod annotations and labels will be added to metric tags + # + # pod_annotation_include = ["annotation-key-1"] + # pod_annotation_exclude = ["exclude-me"] + # pod_label_include = ["label-key-1"] + # pod_label_exclude = ["exclude-me"] + # cache refresh interval to set the interval for re-sync of pods list. # Default is 60 minutes. # cache_refresh_interval = 60 diff --git a/plugins/inputs/prometheus/kubernetes.go b/plugins/inputs/prometheus/kubernetes.go index 3a2ef96359b12..ff7368d8e6b89 100644 --- a/plugins/inputs/prometheus/kubernetes.go +++ b/plugins/inputs/prometheus/kubernetes.go @@ -5,8 +5,6 @@ import ( "crypto/tls" "encoding/json" "fmt" - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/models" "net" "net/http" "net/url" @@ -15,6 +13,9 @@ import ( "strconv" "time" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/models" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" @@ -369,10 +370,13 @@ func registerPod(pod *corev1.Pod, p *Prometheus) { } p.Log.Debugf("will scrape metrics from %q", targetURL.String()) - // add annotation as metrics tags - tags := pod.Annotations - if tags == nil { - tags = map[string]string{} + tags := map[string]string{} + + // add annotation as metrics tags, subject to include/exclude filters + for k, v := range pod.Annotations { + if models.ShouldPassFilters(p.podAnnotationIncludeFilter, p.podAnnotationExcludeFilter, k) { + tags[k] = v + } } tags["pod_name"] = pod.Name @@ -382,9 +386,11 @@ func registerPod(pod *corev1.Pod, p *Prometheus) { } tags[podNamespace] = pod.Namespace - // add labels as metrics tags + // add labels as metrics tags, subject to include/exclude filters for k, v := range pod.Labels { - tags[k] = v + if models.ShouldPassFilters(p.podLabelIncludeFilter, p.podLabelExcludeFilter, k) { + tags[k] = v + } } podURL := p.AddressToURL(targetURL, targetURL.Hostname()) diff --git a/plugins/inputs/prometheus/kubernetes_test.go b/plugins/inputs/prometheus/kubernetes_test.go index b7624e6e3f89c..8981b7c6d0984 100644 --- a/plugins/inputs/prometheus/kubernetes_test.go +++ b/plugins/inputs/prometheus/kubernetes_test.go @@ -1,9 +1,10 @@ package prometheus import ( - "k8s.io/client-go/tools/cache" "testing" + "k8s.io/client-go/tools/cache" + "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -264,6 +265,96 @@ func TestInvalidFieldSelector(t *testing.T) { require.NotEqual(t, err, nil) } +func TestAnnotationFilters(t *testing.T) { + p := pod() + p.Annotations = map[string]string{ + "prometheus.io/scrape": "true", + "includeme": "true", + "excludeme": "true", + "neutral": "true", + } + + cases := []struct { + desc string + include []string + exclude []string + expectedTags []string + }{ + {"Just include", + []string{"includeme"}, + nil, + []string{"includeme"}}, + {"Just exclude", + nil, + []string{"excludeme"}, + []string{"includeme", "neutral"}}, + {"Include & exclude", + []string{"includeme"}, + []string{"exludeme"}, + []string{"includeme"}}, + } + + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + prom := &Prometheus{Log: testutil.Logger{}} + prom.PodAnnotationInclude = tc.include + prom.PodAnnotationExclude = tc.exclude + require.NoError(t, prom.initFilters()) + registerPod(p, prom) + for _, pd := range prom.kubernetesPods { + for _, tagKey := range tc.expectedTags { + require.Contains(t, pd.Tags, tagKey) + } + } + }) + } +} + +func TestLabelFilters(t *testing.T) { + p := pod() + p.Annotations = map[string]string{"prometheus.io/scrape": "true"} + p.Labels = map[string]string{ + "includeme": "true", + "excludeme": "true", + "neutral": "true", + } + + cases := []struct { + desc string + include []string + exclude []string + expectedTags []string + }{ + {"Just include", + []string{"includeme"}, + nil, + []string{"includeme"}}, + {"Just exclude", + nil, + []string{"excludeme"}, + []string{"includeme", "neutral"}}, + {"Include & exclude", + []string{"includeme"}, + []string{"exludeme"}, + []string{"includeme"}}, + } + + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + prom := &Prometheus{Log: testutil.Logger{}} + prom.PodLabelInclude = tc.include + prom.PodLabelExclude = tc.exclude + require.NoError(t, prom.initFilters()) + registerPod(p, prom) + for _, pd := range prom.kubernetesPods { + for _, tagKey := range tc.expectedTags { + require.Contains(t, pd.Tags, tagKey) + } + } + }) + } +} + func pod() *corev1.Pod { p := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{}, Status: corev1.PodStatus{}, Spec: corev1.PodSpec{}} p.Status.PodIP = "127.0.0.1" diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go index ebb01b0c2ad04..2e631650638c4 100644 --- a/plugins/inputs/prometheus/prometheus.go +++ b/plugins/inputs/prometheus/prometheus.go @@ -15,6 +15,7 @@ import ( "sync" "time" + "github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/models" "k8s.io/client-go/tools/cache" @@ -119,6 +120,17 @@ type Prometheus struct { NamespaceAnnotationPass map[string][]string `toml:"namespace_annotation_pass"` NamespaceAnnotationDrop map[string][]string `toml:"namespace_annotation_drop"` + PodAnnotationInclude []string `toml:"pod_annotation_include"` + PodAnnotationExclude []string `toml:"pod_annotation_exclude"` + + PodLabelInclude []string `toml:"pod_label_include"` + PodLabelExclude []string `toml:"pod_label_exclude"` + + podAnnotationIncludeFilter filter.Filter + podAnnotationExcludeFilter filter.Filter + podLabelIncludeFilter filter.Filter + podLabelExcludeFilter filter.Filter + // Only for monitor_kubernetes_pods=true CacheRefreshInterval int `toml:"cache_refresh_interval"` @@ -193,6 +205,10 @@ func (p *Prometheus) Init() error { p.nsAnnotationDrop = append(p.nsAnnotationDrop, tagFilter) } + if err := p.initFilters(); err != nil { + return err + } + ctx := context.Background() if p.ResponseTimeout != 0 { p.HTTPClientConfig.Timeout = p.ResponseTimeout @@ -211,6 +227,38 @@ func (p *Prometheus) Init() error { return nil } +func (p *Prometheus) initFilters() error { + if p.PodAnnotationExclude != nil { + podAnnotationExclude, err := filter.Compile(p.PodAnnotationExclude) + if err != nil { + return fmt.Errorf("error compiling 'pod_annotation_exclude': %w", err) + } + p.podAnnotationExcludeFilter = podAnnotationExclude + } + if p.PodAnnotationInclude != nil { + podAnnotationInclude, err := filter.Compile(p.PodAnnotationInclude) + if err != nil { + return fmt.Errorf("error compiling 'pod_annotation_include': %w", err) + } + p.podAnnotationIncludeFilter = podAnnotationInclude + } + if p.PodLabelExclude != nil { + podLabelExclude, err := filter.Compile(p.PodLabelExclude) + if err != nil { + return fmt.Errorf("error compiling 'pod_label_exclude': %w", err) + } + p.podLabelExcludeFilter = podLabelExclude + } + if p.PodLabelInclude != nil { + podLabelInclude, err := filter.Compile(p.PodLabelInclude) + if err != nil { + return fmt.Errorf("error compiling 'pod_label_include': %w", err) + } + p.podLabelIncludeFilter = podLabelInclude + } + return nil +} + func (p *Prometheus) AddressToURL(u *url.URL, address string) *url.URL { host := address if u.Port() != "" { diff --git a/plugins/inputs/prometheus/sample.conf b/plugins/inputs/prometheus/sample.conf index 33c9a90a5f8ba..b57a1558990e5 100644 --- a/plugins/inputs/prometheus/sample.conf +++ b/plugins/inputs/prometheus/sample.conf @@ -78,6 +78,13 @@ # eg. To scrape pods on a specific node # kubernetes_field_selector = "spec.nodeName=$HOSTNAME" + ## Filter which pod annotations and labels will be added to metric tags + # + # pod_annotation_include = ["annotation-key-1"] + # pod_annotation_exclude = ["exclude-me"] + # pod_label_include = ["label-key-1"] + # pod_label_exclude = ["exclude-me"] + # cache refresh interval to set the interval for re-sync of pods list. # Default is 60 minutes. # cache_refresh_interval = 60