From dc1eedd0c08e648fa025a26faf5fc49cb008076c Mon Sep 17 00:00:00 2001 From: Markus Freitag Date: Mon, 22 Apr 2024 11:44:48 +0200 Subject: [PATCH 1/3] fix: generation of namespaced filters The recursion bug was caused by the loop within the merge closure, which itself is called for each item. This behaviour let to having the hashed namespace been added multiple times. Signed-off-by: Markus Freitag --- apis/fluentbit/v1alpha2/filter_types.go | 50 +++---- apis/fluentbit/v1alpha2/filter_types_test.go | 133 +++++++++++++++++++ 2 files changed, 158 insertions(+), 25 deletions(-) create mode 100644 apis/fluentbit/v1alpha2/filter_types_test.go diff --git a/apis/fluentbit/v1alpha2/filter_types.go b/apis/fluentbit/v1alpha2/filter_types.go index 1a62c36cc..3e5c87cd9 100644 --- a/apis/fluentbit/v1alpha2/filter_types.go +++ b/apis/fluentbit/v1alpha2/filter_types.go @@ -20,13 +20,15 @@ import ( "bytes" "crypto/md5" "fmt" + "reflect" + "sort" + "strings" + "github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins" "github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins/custom" + "github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins/filter" "github.com/fluent/fluent-operator/v2/pkg/utils" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "reflect" - "sort" - "strings" ) // EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! @@ -80,32 +82,30 @@ func (list FilterList) Load(sl plugins.SecretLoader) (string, error) { if item.Spec.MatchRegex != "" { buf.WriteString(fmt.Sprintf(" Match_Regex %s\n", utils.GenerateNamespacedMatchRegExpr(item.Namespace, item.Spec.MatchRegex))) } - for _, filter := range item.Spec.FilterItems { - if filter.Kubernetes != nil { - kubeTagPrefix := filter.Kubernetes.KubeTagPrefix - if kubeTagPrefix == "" { - kubeTagPrefix = "kube.var.log.containers." - } - filter.Kubernetes.KubeTagPrefix = fmt.Sprintf("%x.%s", md5.Sum([]byte(item.Namespace)), kubeTagPrefix) - if filter.Kubernetes.RegexParser != "" { - filter.Kubernetes.RegexParser = fmt.Sprintf("%s-%x", filter.Kubernetes.RegexParser, md5.Sum([]byte(item.Namespace))) - } + + switch f := p.(type) { + case *filter.Kubernetes: + kubeTagPrefix := f.KubeTagPrefix + if kubeTagPrefix == "" { + kubeTagPrefix = "kube.var.log.containers." } - if filter.Parser != nil { - parsers := strings.Split(filter.Parser.Parser, ",") - parserString := "" - for i := range parsers { - parsers[i] = strings.Trim(parsers[i], " ") - parsers[i] = fmt.Sprintf("%s-%x", parsers[i], md5.Sum([]byte(item.Namespace))) - parserString = parserString + parsers[i] + "," - } - parserString = strings.TrimSuffix(parserString, ",") - filter.Parser.Parser = parserString + f.KubeTagPrefix = fmt.Sprintf("%x.%s", md5.Sum([]byte(item.Namespace)), kubeTagPrefix) + if f.RegexParser != "" { + f.RegexParser = fmt.Sprintf("%s-%x", f.RegexParser, md5.Sum([]byte(item.Namespace))) } - if filter.CustomPlugin != nil && filter.CustomPlugin.Config != "" { - filter.CustomPlugin.Config = custom.MakeCustomConfigNamespaced(filter.CustomPlugin.Config, item.Namespace) + case *filter.Parser: + parsers := strings.Split(f.Parser, ",") + for i := range parsers { + parsers[i] = strings.Trim(parsers[i], " ") + parsers[i] = fmt.Sprintf("%s-%x", parsers[i], md5.Sum([]byte(item.Namespace))) + } + f.Parser = strings.Join(parsers, ",") + case *custom.CustomPlugin: + if f.Config != "" { + f.Config = custom.MakeCustomConfigNamespaced(f.Config, item.Namespace) } } + kvs, err := p.Params(sl) if err != nil { return err diff --git a/apis/fluentbit/v1alpha2/filter_types_test.go b/apis/fluentbit/v1alpha2/filter_types_test.go new file mode 100644 index 000000000..6e3902ac5 --- /dev/null +++ b/apis/fluentbit/v1alpha2/filter_types_test.go @@ -0,0 +1,133 @@ +package v1alpha2 + +import ( + "testing" + + "github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins" + "github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins/filter" + . "github.com/onsi/gomega" +) + +func ptr[T any](v T) *T { return &v } + +func TestFilterList_Load(t *testing.T) { + testcases := []struct { + name string + input Filter + expected string + }{ + { + name: "a single filteritem", + input: Filter{ + Spec: FilterSpec{ + FilterItems: []FilterItem{ + FilterItem{ + Parser: &filter.Parser{ + KeyName: "log", + Parser: "first-parser", + }, + }, + }, + }, + }, + expected: `[Filter] + Name parser + Key_Name log + Parser first-parser-d41d8cd98f00b204e9800998ecf8427e +`, + }, + { + name: "a single filteritem, with multiple plugins", + input: Filter{ + Spec: FilterSpec{ + FilterItems: []FilterItem{ + FilterItem{ + Kubernetes: &filter.Kubernetes{ + KubeTagPrefix: "custom-tag", + }, + Parser: &filter.Parser{ + KeyName: "log", + Parser: "first-parser", + }, + }, + }, + }, + }, + expected: `[Filter] + Name kubernetes + Kube_Tag_Prefix d41d8cd98f00b204e9800998ecf8427e.custom-tag +[Filter] + Name parser + Key_Name log + Parser first-parser-d41d8cd98f00b204e9800998ecf8427e +`, + }, + { + name: "multiple filteritems", + input: Filter{ + Spec: FilterSpec{ + FilterItems: []FilterItem{ + FilterItem{ + Kubernetes: &filter.Kubernetes{ + KubeTagPrefix: "custom-tag", + }, + Parser: &filter.Parser{ + KeyName: "log", + Parser: "first-parser", + }, + }, + FilterItem{ + Parser: &filter.Parser{ + KeyName: "msg", + Parser: "second-parser", + ReserveData: ptr(true), + }, + }, + FilterItem{ + Parser: &filter.Parser{ + KeyName: "msg", + Parser: "third-parser", + ReserveData: ptr(true), + }, + }, + }, + }, + }, + expected: `[Filter] + Name kubernetes + Kube_Tag_Prefix d41d8cd98f00b204e9800998ecf8427e.custom-tag +[Filter] + Name parser + Key_Name log + Parser first-parser-d41d8cd98f00b204e9800998ecf8427e +[Filter] + Name parser + Key_Name msg + Parser second-parser-d41d8cd98f00b204e9800998ecf8427e + Reserve_Data true +[Filter] + Name parser + Key_Name msg + Parser third-parser-d41d8cd98f00b204e9800998ecf8427e + Reserve_Data true +`, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + g := NewGomegaWithT(t) + + sl := plugins.NewSecretLoader(nil, "testnamespace") + + fl := FilterList{ + Items: make([]Filter, 1), + } + fl.Items[0] = tc.input + + renderedFilterList, err := fl.Load(sl) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(renderedFilterList).To(Equal(tc.expected)) + }) + } +} From ae9a6767b056814aa505b0e8fa6f579b5d38f418 Mon Sep 17 00:00:00 2001 From: Markus Freitag Date: Mon, 22 Apr 2024 11:57:20 +0200 Subject: [PATCH 2/3] introduce interface for filters that are namespaceable Signed-off-by: Markus Freitag --- apis/fluentbit/v1alpha2/filter_types.go | 28 ++----------------- .../v1alpha2/plugins/custom/custom_types.go | 9 +++++- .../plugins/filter/kubernetes_types.go | 11 ++++++++ .../v1alpha2/plugins/filter/parser_types.go | 10 +++++++ apis/fluentbit/v1alpha2/plugins/interface.go | 6 ++++ 5 files changed, 38 insertions(+), 26 deletions(-) diff --git a/apis/fluentbit/v1alpha2/filter_types.go b/apis/fluentbit/v1alpha2/filter_types.go index 3e5c87cd9..e3c9b671d 100644 --- a/apis/fluentbit/v1alpha2/filter_types.go +++ b/apis/fluentbit/v1alpha2/filter_types.go @@ -18,15 +18,11 @@ package v1alpha2 import ( "bytes" - "crypto/md5" "fmt" "reflect" "sort" - "strings" "github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins" - "github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins/custom" - "github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins/filter" "github.com/fluent/fluent-operator/v2/pkg/utils" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -83,27 +79,9 @@ func (list FilterList) Load(sl plugins.SecretLoader) (string, error) { buf.WriteString(fmt.Sprintf(" Match_Regex %s\n", utils.GenerateNamespacedMatchRegExpr(item.Namespace, item.Spec.MatchRegex))) } - switch f := p.(type) { - case *filter.Kubernetes: - kubeTagPrefix := f.KubeTagPrefix - if kubeTagPrefix == "" { - kubeTagPrefix = "kube.var.log.containers." - } - f.KubeTagPrefix = fmt.Sprintf("%x.%s", md5.Sum([]byte(item.Namespace)), kubeTagPrefix) - if f.RegexParser != "" { - f.RegexParser = fmt.Sprintf("%s-%x", f.RegexParser, md5.Sum([]byte(item.Namespace))) - } - case *filter.Parser: - parsers := strings.Split(f.Parser, ",") - for i := range parsers { - parsers[i] = strings.Trim(parsers[i], " ") - parsers[i] = fmt.Sprintf("%s-%x", parsers[i], md5.Sum([]byte(item.Namespace))) - } - f.Parser = strings.Join(parsers, ",") - case *custom.CustomPlugin: - if f.Config != "" { - f.Config = custom.MakeCustomConfigNamespaced(f.Config, item.Namespace) - } + var iface interface{} = p + if f, ok := iface.(plugins.Namespaceable); ok { + f.MakeNamespaced(item.Namespace) } kvs, err := p.Params(sl) diff --git a/apis/fluentbit/v1alpha2/plugins/custom/custom_types.go b/apis/fluentbit/v1alpha2/plugins/custom/custom_types.go index ab3dbb8b0..1c9659822 100644 --- a/apis/fluentbit/v1alpha2/plugins/custom/custom_types.go +++ b/apis/fluentbit/v1alpha2/plugins/custom/custom_types.go @@ -3,9 +3,10 @@ package custom import ( "bytes" "fmt" - "github.com/fluent/fluent-operator/v2/pkg/utils" "strings" + "github.com/fluent/fluent-operator/v2/pkg/utils" + "github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins" "github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins/params" ) @@ -28,6 +29,12 @@ func (a *CustomPlugin) Params(_ plugins.SecretLoader) (*params.KVs, error) { return kvs, nil } +func (c *CustomPlugin) MakeNamespaced(ns string) { + if c.Config != "" { + c.Config = MakeCustomConfigNamespaced(c.Config, ns) + } +} + func indentation(str string) string { splits := strings.Split(str, "\n") var buf bytes.Buffer diff --git a/apis/fluentbit/v1alpha2/plugins/filter/kubernetes_types.go b/apis/fluentbit/v1alpha2/plugins/filter/kubernetes_types.go index bd5d89a4f..99493a34f 100644 --- a/apis/fluentbit/v1alpha2/plugins/filter/kubernetes_types.go +++ b/apis/fluentbit/v1alpha2/plugins/filter/kubernetes_types.go @@ -1,6 +1,7 @@ package filter import ( + "crypto/md5" "fmt" "github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins" @@ -189,3 +190,13 @@ func (k *Kubernetes) Params(_ plugins.SecretLoader) (*params.KVs, error) { } return kvs, nil } + +func (k *Kubernetes) MakeNamespaced(ns string) { + if k.KubeTagPrefix == "" { + k.KubeTagPrefix = "kube.var.log.containers." + } + k.KubeTagPrefix = fmt.Sprintf("%x.%s", md5.Sum([]byte(ns)), k.KubeTagPrefix) + if k.RegexParser != "" { + k.RegexParser = fmt.Sprintf("%s-%x", k.RegexParser, md5.Sum([]byte(ns))) + } +} diff --git a/apis/fluentbit/v1alpha2/plugins/filter/parser_types.go b/apis/fluentbit/v1alpha2/plugins/filter/parser_types.go index 9aa4e0a3a..dfdfbcfb3 100644 --- a/apis/fluentbit/v1alpha2/plugins/filter/parser_types.go +++ b/apis/fluentbit/v1alpha2/plugins/filter/parser_types.go @@ -1,6 +1,7 @@ package filter import ( + "crypto/md5" "fmt" "strings" @@ -59,3 +60,12 @@ func (p *Parser) Params(_ plugins.SecretLoader) (*params.KVs, error) { } return kvs, nil } + +func (p *Parser) MakeNamespaced(ns string) { + parsers := strings.Split(p.Parser, ",") + for i := range parsers { + parsers[i] = strings.Trim(parsers[i], " ") + parsers[i] = fmt.Sprintf("%s-%x", parsers[i], md5.Sum([]byte(ns))) + } + p.Parser = strings.Join(parsers, ",") +} diff --git a/apis/fluentbit/v1alpha2/plugins/interface.go b/apis/fluentbit/v1alpha2/plugins/interface.go index 6ac42caa2..f21ebad28 100644 --- a/apis/fluentbit/v1alpha2/plugins/interface.go +++ b/apis/fluentbit/v1alpha2/plugins/interface.go @@ -10,3 +10,9 @@ type Plugin interface { Name() string Params(SecretLoader) (*params.KVs, error) } + +// The Namespaceable interface defines a method for adding a namespace +// to a plugins identifier. +type Namespaceable interface { + MakeNamespaced(string) +} From 4d00ba3fa8b376dfed7620a5ba8dfaa118ebeecf Mon Sep 17 00:00:00 2001 From: Markus Freitag Date: Mon, 22 Apr 2024 12:00:28 +0200 Subject: [PATCH 3/3] let RewriteTag filter fullfill namespaceable interface Signed-off-by: Markus Freitag --- .../v1alpha2/plugins/filter/rewritetag_types.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/apis/fluentbit/v1alpha2/plugins/filter/rewritetag_types.go b/apis/fluentbit/v1alpha2/plugins/filter/rewritetag_types.go index fd724316d..2bd8154bd 100644 --- a/apis/fluentbit/v1alpha2/plugins/filter/rewritetag_types.go +++ b/apis/fluentbit/v1alpha2/plugins/filter/rewritetag_types.go @@ -1,6 +1,10 @@ package filter import ( + "crypto/md5" + "fmt" + "strings" + "github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins" "github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins/params" ) @@ -18,7 +22,7 @@ type RewriteTag struct { // When the filter emits a record under the new Tag, there is an internal emitter // plugin that takes care of the job. Since this emitter expose metrics as any other // component of the pipeline, you can use this property to configure an optional name for it. - EmitterName string `json:"emitterName,omitempty"` + EmitterName string `json:"emitterName,omitempty"` EmitterMemBufLimit string `json:"emitterMemBufLimit,omitempty"` EmitterStorageType string `json:"emitterStorageType,omitempty"` } @@ -47,3 +51,11 @@ func (r *RewriteTag) Params(_ plugins.SecretLoader) (*params.KVs, error) { } return kvs, nil } + +func (r *RewriteTag) MakeNamespaced(ns string) { + for idx, rule := range r.Rules { + parts := strings.Fields(rule) + parts[2] = fmt.Sprintf("%x.%s", md5.Sum([]byte(ns)), parts[2]) + r.Rules[idx] = strings.Join(parts, " ") + } +}