Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix namespaced filters #1143

Merged
merged 3 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 8 additions & 30 deletions apis/fluentbit/v1alpha2/filter_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@ package v1alpha2

import (
"bytes"
"crypto/md5"
"fmt"
"reflect"
"sort"

"github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins"
"github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins/custom"
"github.com/fluent/fluent-operator/v2/pkg/utils"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"reflect"
"sort"
"strings"
)

// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
Expand Down Expand Up @@ -80,32 +78,12 @@ func (list FilterList) Load(sl plugins.SecretLoader) (string, error) {
if item.Spec.MatchRegex != "" {
buf.WriteString(fmt.Sprintf(" Match_Regex %s\n", utils.GenerateNamespacedMatchRegExpr(item.Namespace, item.Spec.MatchRegex)))
}
for _, filter := range item.Spec.FilterItems {
if filter.Kubernetes != nil {
kubeTagPrefix := filter.Kubernetes.KubeTagPrefix
if kubeTagPrefix == "" {
kubeTagPrefix = "kube.var.log.containers."
}
filter.Kubernetes.KubeTagPrefix = fmt.Sprintf("%x.%s", md5.Sum([]byte(item.Namespace)), kubeTagPrefix)
if filter.Kubernetes.RegexParser != "" {
filter.Kubernetes.RegexParser = fmt.Sprintf("%s-%x", filter.Kubernetes.RegexParser, md5.Sum([]byte(item.Namespace)))
}
}
if filter.Parser != nil {
parsers := strings.Split(filter.Parser.Parser, ",")
parserString := ""
for i := range parsers {
parsers[i] = strings.Trim(parsers[i], " ")
parsers[i] = fmt.Sprintf("%s-%x", parsers[i], md5.Sum([]byte(item.Namespace)))
parserString = parserString + parsers[i] + ","
}
parserString = strings.TrimSuffix(parserString, ",")
filter.Parser.Parser = parserString
}
if filter.CustomPlugin != nil && filter.CustomPlugin.Config != "" {
filter.CustomPlugin.Config = custom.MakeCustomConfigNamespaced(filter.CustomPlugin.Config, item.Namespace)
}

var iface interface{} = p
if f, ok := iface.(plugins.Namespaceable); ok {
f.MakeNamespaced(item.Namespace)
}

kvs, err := p.Params(sl)
if err != nil {
return err
Expand Down
133 changes: 133 additions & 0 deletions apis/fluentbit/v1alpha2/filter_types_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package v1alpha2

import (
"testing"

"github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins"
"github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins/filter"
. "github.com/onsi/gomega"
)

func ptr[T any](v T) *T { return &v }

func TestFilterList_Load(t *testing.T) {
testcases := []struct {
name string
input Filter
expected string
}{
{
name: "a single filteritem",
input: Filter{
Spec: FilterSpec{
FilterItems: []FilterItem{
FilterItem{
Parser: &filter.Parser{
KeyName: "log",
Parser: "first-parser",
},
},
},
},
},
expected: `[Filter]
Name parser
Key_Name log
Parser first-parser-d41d8cd98f00b204e9800998ecf8427e
`,
},
{
name: "a single filteritem, with multiple plugins",
input: Filter{
Spec: FilterSpec{
FilterItems: []FilterItem{
FilterItem{
Kubernetes: &filter.Kubernetes{
KubeTagPrefix: "custom-tag",
},
Parser: &filter.Parser{
KeyName: "log",
Parser: "first-parser",
},
},
},
},
},
expected: `[Filter]
Name kubernetes
Kube_Tag_Prefix d41d8cd98f00b204e9800998ecf8427e.custom-tag
[Filter]
Name parser
Key_Name log
Parser first-parser-d41d8cd98f00b204e9800998ecf8427e
`,
},
{
name: "multiple filteritems",
input: Filter{
Spec: FilterSpec{
FilterItems: []FilterItem{
FilterItem{
Kubernetes: &filter.Kubernetes{
KubeTagPrefix: "custom-tag",
},
Parser: &filter.Parser{
KeyName: "log",
Parser: "first-parser",
},
},
FilterItem{
Parser: &filter.Parser{
KeyName: "msg",
Parser: "second-parser",
ReserveData: ptr(true),
},
},
FilterItem{
Parser: &filter.Parser{
KeyName: "msg",
Parser: "third-parser",
ReserveData: ptr(true),
},
},
},
},
},
expected: `[Filter]
Name kubernetes
Kube_Tag_Prefix d41d8cd98f00b204e9800998ecf8427e.custom-tag
[Filter]
Name parser
Key_Name log
Parser first-parser-d41d8cd98f00b204e9800998ecf8427e
[Filter]
Name parser
Key_Name msg
Parser second-parser-d41d8cd98f00b204e9800998ecf8427e
Reserve_Data true
[Filter]
Name parser
Key_Name msg
Parser third-parser-d41d8cd98f00b204e9800998ecf8427e
Reserve_Data true
`,
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
g := NewGomegaWithT(t)

sl := plugins.NewSecretLoader(nil, "testnamespace")

fl := FilterList{
Items: make([]Filter, 1),
}
fl.Items[0] = tc.input

renderedFilterList, err := fl.Load(sl)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(renderedFilterList).To(Equal(tc.expected))
})
}
}
9 changes: 8 additions & 1 deletion apis/fluentbit/v1alpha2/plugins/custom/custom_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package custom
import (
"bytes"
"fmt"
"github.com/fluent/fluent-operator/v2/pkg/utils"
"strings"

"github.com/fluent/fluent-operator/v2/pkg/utils"

"github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins"
"github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins/params"
)
Expand All @@ -28,6 +29,12 @@ func (a *CustomPlugin) Params(_ plugins.SecretLoader) (*params.KVs, error) {
return kvs, nil
}

func (c *CustomPlugin) MakeNamespaced(ns string) {
if c.Config != "" {
c.Config = MakeCustomConfigNamespaced(c.Config, ns)
}
}

func indentation(str string) string {
splits := strings.Split(str, "\n")
var buf bytes.Buffer
Expand Down
11 changes: 11 additions & 0 deletions apis/fluentbit/v1alpha2/plugins/filter/kubernetes_types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package filter

import (
"crypto/md5"
"fmt"

"github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins"
Expand Down Expand Up @@ -189,3 +190,13 @@ func (k *Kubernetes) Params(_ plugins.SecretLoader) (*params.KVs, error) {
}
return kvs, nil
}

func (k *Kubernetes) MakeNamespaced(ns string) {
if k.KubeTagPrefix == "" {
k.KubeTagPrefix = "kube.var.log.containers."
}
k.KubeTagPrefix = fmt.Sprintf("%x.%s", md5.Sum([]byte(ns)), k.KubeTagPrefix)
if k.RegexParser != "" {
k.RegexParser = fmt.Sprintf("%s-%x", k.RegexParser, md5.Sum([]byte(ns)))
}
}
10 changes: 10 additions & 0 deletions apis/fluentbit/v1alpha2/plugins/filter/parser_types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package filter

import (
"crypto/md5"
"fmt"
"strings"

Expand Down Expand Up @@ -59,3 +60,12 @@ func (p *Parser) Params(_ plugins.SecretLoader) (*params.KVs, error) {
}
return kvs, nil
}

func (p *Parser) MakeNamespaced(ns string) {
parsers := strings.Split(p.Parser, ",")
for i := range parsers {
parsers[i] = strings.Trim(parsers[i], " ")
parsers[i] = fmt.Sprintf("%s-%x", parsers[i], md5.Sum([]byte(ns)))
}
p.Parser = strings.Join(parsers, ",")
}
14 changes: 13 additions & 1 deletion apis/fluentbit/v1alpha2/plugins/filter/rewritetag_types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package filter

import (
"crypto/md5"
"fmt"
"strings"

"github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins"
"github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins/params"
)
Expand All @@ -18,7 +22,7 @@ type RewriteTag struct {
// When the filter emits a record under the new Tag, there is an internal emitter
// plugin that takes care of the job. Since this emitter expose metrics as any other
// component of the pipeline, you can use this property to configure an optional name for it.
EmitterName string `json:"emitterName,omitempty"`
EmitterName string `json:"emitterName,omitempty"`
EmitterMemBufLimit string `json:"emitterMemBufLimit,omitempty"`
EmitterStorageType string `json:"emitterStorageType,omitempty"`
}
Expand Down Expand Up @@ -47,3 +51,11 @@ func (r *RewriteTag) Params(_ plugins.SecretLoader) (*params.KVs, error) {
}
return kvs, nil
}

func (r *RewriteTag) MakeNamespaced(ns string) {
for idx, rule := range r.Rules {
parts := strings.Fields(rule)
parts[2] = fmt.Sprintf("%x.%s", md5.Sum([]byte(ns)), parts[2])
r.Rules[idx] = strings.Join(parts, " ")
}
}
6 changes: 6 additions & 0 deletions apis/fluentbit/v1alpha2/plugins/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,9 @@ type Plugin interface {
Name() string
Params(SecretLoader) (*params.KVs, error)
}

// The Namespaceable interface defines a method for adding a namespace
// to a plugins identifier.
type Namespaceable interface {
MakeNamespaced(string)
}
Loading