Skip to content

Commit

Permalink
fix: generation of namespaced filters
Browse files Browse the repository at this point in the history
The recursion bug was caused by the loop within the merge closure,
which itself is called for each item. This behaviour let to having the
hashed namespace been added multiple times.

Signed-off-by: Markus Freitag <[email protected]>
  • Loading branch information
MarkusFreitag committed Apr 22, 2024
1 parent cc35e2a commit dc1eedd
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 25 deletions.
50 changes: 25 additions & 25 deletions apis/fluentbit/v1alpha2/filter_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ import (
"bytes"
"crypto/md5"
"fmt"
"reflect"
"sort"
"strings"

"github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins"
"github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins/custom"
"github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins/filter"
"github.com/fluent/fluent-operator/v2/pkg/utils"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"reflect"
"sort"
"strings"
)

// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
Expand Down Expand Up @@ -80,32 +82,30 @@ func (list FilterList) Load(sl plugins.SecretLoader) (string, error) {
if item.Spec.MatchRegex != "" {
buf.WriteString(fmt.Sprintf(" Match_Regex %s\n", utils.GenerateNamespacedMatchRegExpr(item.Namespace, item.Spec.MatchRegex)))
}
for _, filter := range item.Spec.FilterItems {
if filter.Kubernetes != nil {
kubeTagPrefix := filter.Kubernetes.KubeTagPrefix
if kubeTagPrefix == "" {
kubeTagPrefix = "kube.var.log.containers."
}
filter.Kubernetes.KubeTagPrefix = fmt.Sprintf("%x.%s", md5.Sum([]byte(item.Namespace)), kubeTagPrefix)
if filter.Kubernetes.RegexParser != "" {
filter.Kubernetes.RegexParser = fmt.Sprintf("%s-%x", filter.Kubernetes.RegexParser, md5.Sum([]byte(item.Namespace)))
}

switch f := p.(type) {
case *filter.Kubernetes:
kubeTagPrefix := f.KubeTagPrefix
if kubeTagPrefix == "" {
kubeTagPrefix = "kube.var.log.containers."
}
if filter.Parser != nil {
parsers := strings.Split(filter.Parser.Parser, ",")
parserString := ""
for i := range parsers {
parsers[i] = strings.Trim(parsers[i], " ")
parsers[i] = fmt.Sprintf("%s-%x", parsers[i], md5.Sum([]byte(item.Namespace)))
parserString = parserString + parsers[i] + ","
}
parserString = strings.TrimSuffix(parserString, ",")
filter.Parser.Parser = parserString
f.KubeTagPrefix = fmt.Sprintf("%x.%s", md5.Sum([]byte(item.Namespace)), kubeTagPrefix)
if f.RegexParser != "" {
f.RegexParser = fmt.Sprintf("%s-%x", f.RegexParser, md5.Sum([]byte(item.Namespace)))
}
if filter.CustomPlugin != nil && filter.CustomPlugin.Config != "" {
filter.CustomPlugin.Config = custom.MakeCustomConfigNamespaced(filter.CustomPlugin.Config, item.Namespace)
case *filter.Parser:
parsers := strings.Split(f.Parser, ",")
for i := range parsers {
parsers[i] = strings.Trim(parsers[i], " ")
parsers[i] = fmt.Sprintf("%s-%x", parsers[i], md5.Sum([]byte(item.Namespace)))
}
f.Parser = strings.Join(parsers, ",")
case *custom.CustomPlugin:
if f.Config != "" {
f.Config = custom.MakeCustomConfigNamespaced(f.Config, item.Namespace)
}
}

kvs, err := p.Params(sl)
if err != nil {
return err
Expand Down
133 changes: 133 additions & 0 deletions apis/fluentbit/v1alpha2/filter_types_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package v1alpha2

import (
"testing"

"github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins"
"github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins/filter"
. "github.com/onsi/gomega"
)

func ptr[T any](v T) *T { return &v }

func TestFilterList_Load(t *testing.T) {
testcases := []struct {
name string
input Filter
expected string
}{
{
name: "a single filteritem",
input: Filter{
Spec: FilterSpec{
FilterItems: []FilterItem{
FilterItem{
Parser: &filter.Parser{
KeyName: "log",
Parser: "first-parser",
},
},
},
},
},
expected: `[Filter]
Name parser
Key_Name log
Parser first-parser-d41d8cd98f00b204e9800998ecf8427e
`,
},
{
name: "a single filteritem, with multiple plugins",
input: Filter{
Spec: FilterSpec{
FilterItems: []FilterItem{
FilterItem{
Kubernetes: &filter.Kubernetes{
KubeTagPrefix: "custom-tag",
},
Parser: &filter.Parser{
KeyName: "log",
Parser: "first-parser",
},
},
},
},
},
expected: `[Filter]
Name kubernetes
Kube_Tag_Prefix d41d8cd98f00b204e9800998ecf8427e.custom-tag
[Filter]
Name parser
Key_Name log
Parser first-parser-d41d8cd98f00b204e9800998ecf8427e
`,
},
{
name: "multiple filteritems",
input: Filter{
Spec: FilterSpec{
FilterItems: []FilterItem{
FilterItem{
Kubernetes: &filter.Kubernetes{
KubeTagPrefix: "custom-tag",
},
Parser: &filter.Parser{
KeyName: "log",
Parser: "first-parser",
},
},
FilterItem{
Parser: &filter.Parser{
KeyName: "msg",
Parser: "second-parser",
ReserveData: ptr(true),
},
},
FilterItem{
Parser: &filter.Parser{
KeyName: "msg",
Parser: "third-parser",
ReserveData: ptr(true),
},
},
},
},
},
expected: `[Filter]
Name kubernetes
Kube_Tag_Prefix d41d8cd98f00b204e9800998ecf8427e.custom-tag
[Filter]
Name parser
Key_Name log
Parser first-parser-d41d8cd98f00b204e9800998ecf8427e
[Filter]
Name parser
Key_Name msg
Parser second-parser-d41d8cd98f00b204e9800998ecf8427e
Reserve_Data true
[Filter]
Name parser
Key_Name msg
Parser third-parser-d41d8cd98f00b204e9800998ecf8427e
Reserve_Data true
`,
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
g := NewGomegaWithT(t)

sl := plugins.NewSecretLoader(nil, "testnamespace")

fl := FilterList{
Items: make([]Filter, 1),
}
fl.Items[0] = tc.input

renderedFilterList, err := fl.Load(sl)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(renderedFilterList).To(Equal(tc.expected))
})
}
}

0 comments on commit dc1eedd

Please sign in to comment.