diff --git a/apis/fluentbit/v1alpha2/filter_types.go b/apis/fluentbit/v1alpha2/filter_types.go index 1a62c36cc..e3c9b671d 100644 --- a/apis/fluentbit/v1alpha2/filter_types.go +++ b/apis/fluentbit/v1alpha2/filter_types.go @@ -18,15 +18,13 @@ package v1alpha2 import ( "bytes" - "crypto/md5" "fmt" + "reflect" + "sort" + "github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins" - "github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins/custom" "github.com/fluent/fluent-operator/v2/pkg/utils" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "reflect" - "sort" - "strings" ) // EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! @@ -80,32 +78,12 @@ func (list FilterList) Load(sl plugins.SecretLoader) (string, error) { if item.Spec.MatchRegex != "" { buf.WriteString(fmt.Sprintf(" Match_Regex %s\n", utils.GenerateNamespacedMatchRegExpr(item.Namespace, item.Spec.MatchRegex))) } - for _, filter := range item.Spec.FilterItems { - if filter.Kubernetes != nil { - kubeTagPrefix := filter.Kubernetes.KubeTagPrefix - if kubeTagPrefix == "" { - kubeTagPrefix = "kube.var.log.containers." - } - filter.Kubernetes.KubeTagPrefix = fmt.Sprintf("%x.%s", md5.Sum([]byte(item.Namespace)), kubeTagPrefix) - if filter.Kubernetes.RegexParser != "" { - filter.Kubernetes.RegexParser = fmt.Sprintf("%s-%x", filter.Kubernetes.RegexParser, md5.Sum([]byte(item.Namespace))) - } - } - if filter.Parser != nil { - parsers := strings.Split(filter.Parser.Parser, ",") - parserString := "" - for i := range parsers { - parsers[i] = strings.Trim(parsers[i], " ") - parsers[i] = fmt.Sprintf("%s-%x", parsers[i], md5.Sum([]byte(item.Namespace))) - parserString = parserString + parsers[i] + "," - } - parserString = strings.TrimSuffix(parserString, ",") - filter.Parser.Parser = parserString - } - if filter.CustomPlugin != nil && filter.CustomPlugin.Config != "" { - filter.CustomPlugin.Config = custom.MakeCustomConfigNamespaced(filter.CustomPlugin.Config, item.Namespace) - } + + var iface interface{} = p + if f, ok := iface.(plugins.Namespaceable); ok { + f.MakeNamespaced(item.Namespace) } + kvs, err := p.Params(sl) if err != nil { return err diff --git a/apis/fluentbit/v1alpha2/filter_types_test.go b/apis/fluentbit/v1alpha2/filter_types_test.go new file mode 100644 index 000000000..6e3902ac5 --- /dev/null +++ b/apis/fluentbit/v1alpha2/filter_types_test.go @@ -0,0 +1,133 @@ +package v1alpha2 + +import ( + "testing" + + "github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins" + "github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins/filter" + . "github.com/onsi/gomega" +) + +func ptr[T any](v T) *T { return &v } + +func TestFilterList_Load(t *testing.T) { + testcases := []struct { + name string + input Filter + expected string + }{ + { + name: "a single filteritem", + input: Filter{ + Spec: FilterSpec{ + FilterItems: []FilterItem{ + FilterItem{ + Parser: &filter.Parser{ + KeyName: "log", + Parser: "first-parser", + }, + }, + }, + }, + }, + expected: `[Filter] + Name parser + Key_Name log + Parser first-parser-d41d8cd98f00b204e9800998ecf8427e +`, + }, + { + name: "a single filteritem, with multiple plugins", + input: Filter{ + Spec: FilterSpec{ + FilterItems: []FilterItem{ + FilterItem{ + Kubernetes: &filter.Kubernetes{ + KubeTagPrefix: "custom-tag", + }, + Parser: &filter.Parser{ + KeyName: "log", + Parser: "first-parser", + }, + }, + }, + }, + }, + expected: `[Filter] + Name kubernetes + Kube_Tag_Prefix d41d8cd98f00b204e9800998ecf8427e.custom-tag +[Filter] + Name parser + Key_Name log + Parser first-parser-d41d8cd98f00b204e9800998ecf8427e +`, + }, + { + name: "multiple filteritems", + input: Filter{ + Spec: FilterSpec{ + FilterItems: []FilterItem{ + FilterItem{ + Kubernetes: &filter.Kubernetes{ + KubeTagPrefix: "custom-tag", + }, + Parser: &filter.Parser{ + KeyName: "log", + Parser: "first-parser", + }, + }, + FilterItem{ + Parser: &filter.Parser{ + KeyName: "msg", + Parser: "second-parser", + ReserveData: ptr(true), + }, + }, + FilterItem{ + Parser: &filter.Parser{ + KeyName: "msg", + Parser: "third-parser", + ReserveData: ptr(true), + }, + }, + }, + }, + }, + expected: `[Filter] + Name kubernetes + Kube_Tag_Prefix d41d8cd98f00b204e9800998ecf8427e.custom-tag +[Filter] + Name parser + Key_Name log + Parser first-parser-d41d8cd98f00b204e9800998ecf8427e +[Filter] + Name parser + Key_Name msg + Parser second-parser-d41d8cd98f00b204e9800998ecf8427e + Reserve_Data true +[Filter] + Name parser + Key_Name msg + Parser third-parser-d41d8cd98f00b204e9800998ecf8427e + Reserve_Data true +`, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + g := NewGomegaWithT(t) + + sl := plugins.NewSecretLoader(nil, "testnamespace") + + fl := FilterList{ + Items: make([]Filter, 1), + } + fl.Items[0] = tc.input + + renderedFilterList, err := fl.Load(sl) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(renderedFilterList).To(Equal(tc.expected)) + }) + } +} diff --git a/apis/fluentbit/v1alpha2/plugins/custom/custom_types.go b/apis/fluentbit/v1alpha2/plugins/custom/custom_types.go index ab3dbb8b0..1c9659822 100644 --- a/apis/fluentbit/v1alpha2/plugins/custom/custom_types.go +++ b/apis/fluentbit/v1alpha2/plugins/custom/custom_types.go @@ -3,9 +3,10 @@ package custom import ( "bytes" "fmt" - "github.com/fluent/fluent-operator/v2/pkg/utils" "strings" + "github.com/fluent/fluent-operator/v2/pkg/utils" + "github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins" "github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins/params" ) @@ -28,6 +29,12 @@ func (a *CustomPlugin) Params(_ plugins.SecretLoader) (*params.KVs, error) { return kvs, nil } +func (c *CustomPlugin) MakeNamespaced(ns string) { + if c.Config != "" { + c.Config = MakeCustomConfigNamespaced(c.Config, ns) + } +} + func indentation(str string) string { splits := strings.Split(str, "\n") var buf bytes.Buffer diff --git a/apis/fluentbit/v1alpha2/plugins/filter/kubernetes_types.go b/apis/fluentbit/v1alpha2/plugins/filter/kubernetes_types.go index bd5d89a4f..99493a34f 100644 --- a/apis/fluentbit/v1alpha2/plugins/filter/kubernetes_types.go +++ b/apis/fluentbit/v1alpha2/plugins/filter/kubernetes_types.go @@ -1,6 +1,7 @@ package filter import ( + "crypto/md5" "fmt" "github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins" @@ -189,3 +190,13 @@ func (k *Kubernetes) Params(_ plugins.SecretLoader) (*params.KVs, error) { } return kvs, nil } + +func (k *Kubernetes) MakeNamespaced(ns string) { + if k.KubeTagPrefix == "" { + k.KubeTagPrefix = "kube.var.log.containers." + } + k.KubeTagPrefix = fmt.Sprintf("%x.%s", md5.Sum([]byte(ns)), k.KubeTagPrefix) + if k.RegexParser != "" { + k.RegexParser = fmt.Sprintf("%s-%x", k.RegexParser, md5.Sum([]byte(ns))) + } +} diff --git a/apis/fluentbit/v1alpha2/plugins/filter/parser_types.go b/apis/fluentbit/v1alpha2/plugins/filter/parser_types.go index 9aa4e0a3a..dfdfbcfb3 100644 --- a/apis/fluentbit/v1alpha2/plugins/filter/parser_types.go +++ b/apis/fluentbit/v1alpha2/plugins/filter/parser_types.go @@ -1,6 +1,7 @@ package filter import ( + "crypto/md5" "fmt" "strings" @@ -59,3 +60,12 @@ func (p *Parser) Params(_ plugins.SecretLoader) (*params.KVs, error) { } return kvs, nil } + +func (p *Parser) MakeNamespaced(ns string) { + parsers := strings.Split(p.Parser, ",") + for i := range parsers { + parsers[i] = strings.Trim(parsers[i], " ") + parsers[i] = fmt.Sprintf("%s-%x", parsers[i], md5.Sum([]byte(ns))) + } + p.Parser = strings.Join(parsers, ",") +} diff --git a/apis/fluentbit/v1alpha2/plugins/filter/rewritetag_types.go b/apis/fluentbit/v1alpha2/plugins/filter/rewritetag_types.go index fd724316d..2bd8154bd 100644 --- a/apis/fluentbit/v1alpha2/plugins/filter/rewritetag_types.go +++ b/apis/fluentbit/v1alpha2/plugins/filter/rewritetag_types.go @@ -1,6 +1,10 @@ package filter import ( + "crypto/md5" + "fmt" + "strings" + "github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins" "github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins/params" ) @@ -18,7 +22,7 @@ type RewriteTag struct { // When the filter emits a record under the new Tag, there is an internal emitter // plugin that takes care of the job. Since this emitter expose metrics as any other // component of the pipeline, you can use this property to configure an optional name for it. - EmitterName string `json:"emitterName,omitempty"` + EmitterName string `json:"emitterName,omitempty"` EmitterMemBufLimit string `json:"emitterMemBufLimit,omitempty"` EmitterStorageType string `json:"emitterStorageType,omitempty"` } @@ -47,3 +51,11 @@ func (r *RewriteTag) Params(_ plugins.SecretLoader) (*params.KVs, error) { } return kvs, nil } + +func (r *RewriteTag) MakeNamespaced(ns string) { + for idx, rule := range r.Rules { + parts := strings.Fields(rule) + parts[2] = fmt.Sprintf("%x.%s", md5.Sum([]byte(ns)), parts[2]) + r.Rules[idx] = strings.Join(parts, " ") + } +} diff --git a/apis/fluentbit/v1alpha2/plugins/interface.go b/apis/fluentbit/v1alpha2/plugins/interface.go index 6ac42caa2..f21ebad28 100644 --- a/apis/fluentbit/v1alpha2/plugins/interface.go +++ b/apis/fluentbit/v1alpha2/plugins/interface.go @@ -10,3 +10,9 @@ type Plugin interface { Name() string Params(SecretLoader) (*params.KVs, error) } + +// The Namespaceable interface defines a method for adding a namespace +// to a plugins identifier. +type Namespaceable interface { + MakeNamespaced(string) +}