Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: filter ordinals #1386

Merged
merged 2 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 18 additions & 45 deletions apis/fluentbit/v1alpha2/clusterfilter_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type FilterSpec struct {
LogLevel string `json:"logLevel,omitempty"`
// A set of filter plugins in order.
FilterItems []FilterItem `json:"filters,omitempty"`
// An ordinal to influence filter ordering
Ordinal int32 `json:"ordinal,omitempty"`
}

type FilterItem struct {
Expand Down Expand Up @@ -101,17 +103,25 @@ type ClusterFilterList struct {

// +kubebuilder:object:generate:=false

// FilterByName implements sort.Interface for []ClusterFilter based on the Name field.
type FilterByName []ClusterFilter

func (a FilterByName) Len() int { return len(a) }
func (a FilterByName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a FilterByName) Less(i, j int) bool { return a[i].Name < a[j].Name }
// FilterByOrdinalAndName implements sort.Interface for []ClusterFilter based on the Ordinal and Name field.
type FilterByOrdinalAndName []ClusterFilter

func (a FilterByOrdinalAndName) Len() int { return len(a) }
func (a FilterByOrdinalAndName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a FilterByOrdinalAndName) Less(i, j int) bool {
if a[i].Spec.Ordinal < a[j].Spec.Ordinal {
return true
} else if a[i].Spec.Ordinal == a[j].Spec.Ordinal {
return a[i].Name < a[j].Name
} else {
return false
}
}

func (list ClusterFilterList) Load(sl plugins.SecretLoader) (string, error) {
var buf bytes.Buffer

sort.Sort(FilterByName(list.Items))
sort.Sort(FilterByOrdinalAndName(list.Items))

for _, item := range list.Items {
merge := func(p plugins.Plugin) error {
Expand Down Expand Up @@ -156,7 +166,7 @@ func (list ClusterFilterList) Load(sl plugins.SecretLoader) (string, error) {
func (list ClusterFilterList) LoadAsYaml(sl plugins.SecretLoader, depth int) (string, error) {
var buf bytes.Buffer

sort.Sort(FilterByName(list.Items))
sort.Sort(FilterByOrdinalAndName(list.Items))
if len(list.Items) == 0 {
return "", nil
}
Expand Down Expand Up @@ -202,43 +212,6 @@ func (list ClusterFilterList) LoadAsYaml(sl plugins.SecretLoader, depth int) (st
return buf.String(), nil
}

func (clusterFilter ClusterFilter) LoadAsYaml(sl plugins.SecretLoader, depth int) (string, error) {
var buf bytes.Buffer
padding := utils.YamlIndent(depth + 2)
merge := func(p plugins.Plugin) error {
if p == nil || reflect.ValueOf(p).IsNil() {
return nil
}

if p.Name() != "" {
buf.WriteString(fmt.Sprintf("%s- name: %s\n", utils.YamlIndent(depth+1), p.Name()))
}
if clusterFilter.Spec.LogLevel != "" {
buf.WriteString(fmt.Sprintf("%slog_level: %s\n", padding, clusterFilter.Spec.LogLevel))
}
if clusterFilter.Spec.Match != "" {
buf.WriteString(fmt.Sprintf("%smatch: \"%s\"\n", padding, clusterFilter.Spec.Match))
}
if clusterFilter.Spec.MatchRegex != "" {
buf.WriteString(fmt.Sprintf("%smatch_regex: %s\n", padding, clusterFilter.Spec.MatchRegex))
}
kvs, err := p.Params(sl)
if err != nil {
return err
}
buf.WriteString(kvs.YamlString(depth + 2))
return nil
}
for _, elem := range clusterFilter.Spec.FilterItems {
for i := 0; i < reflect.ValueOf(elem).NumField(); i++ {
p, _ := reflect.ValueOf(elem).Field(i).Interface().(plugins.Plugin)
if err := merge(p); err != nil {
return "", err
}
}
}
return buf.String(), nil
}
func init() {
SchemeBuilder.Register(&ClusterFilter{}, &ClusterFilterList{})
}
115 changes: 111 additions & 4 deletions apis/fluentbit/v1alpha2/clusterfilter_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func TestClusterFilterList_Load(t *testing.T) {
var filtersExpected = `[Filter]
filtersExpected := `[Filter]
Name modify
Match logs.foo.bar
Condition Key_value_equals kve0 kvev0
Expand Down Expand Up @@ -185,8 +185,115 @@ func TestClusterFilterList_Load(t *testing.T) {
}
}

func TestClusterFilterList_Load_With_Ordinals(t *testing.T) {
filtersExpected := `[Filter]
Name grep
Match *
Alias first
Regex ^.*$
[Filter]
Name grep
Match *
Alias second
Regex ^.*$
[Filter]
Name grep
Match *
Alias third
Regex ^.*$
`

g := NewGomegaWithT(t)
sl := plugins.NewSecretLoader(nil, "testnamespace")

filterObj1 := &ClusterFilter{
TypeMeta: metav1.TypeMeta{
APIVersion: "fluentbit.fluent.io/v1alpha2",
Kind: "ClusterFilter",
},
ObjectMeta: metav1.ObjectMeta{
Name: "first",
},
Spec: FilterSpec{
Match: "*",
FilterItems: []FilterItem{
{
Grep: &filter.Grep{
CommonParams: plugins.CommonParams{
Alias: "second",
},
Regex: "^.*$",
},
},
},
},
}

filterObj2 := &ClusterFilter{
TypeMeta: metav1.TypeMeta{
APIVersion: "fluentbit.fluent.io/v1alpha2",
Kind: "ClusterFilter",
},
ObjectMeta: metav1.ObjectMeta{
Name: "second",
},
Spec: FilterSpec{
Ordinal: 10,
Match: "*",
FilterItems: []FilterItem{
{
Grep: &filter.Grep{
CommonParams: plugins.CommonParams{
Alias: "third",
},
Regex: "^.*$",
},
},
},
},
}

filterObj3 := &ClusterFilter{
TypeMeta: metav1.TypeMeta{
APIVersion: "fluentbit.fluent.io/v1alpha2",
Kind: "ClusterFilter",
},
ObjectMeta: metav1.ObjectMeta{
Name: "third",
},
Spec: FilterSpec{
Ordinal: -10,
Match: "*",
FilterItems: []FilterItem{
{
Grep: &filter.Grep{
CommonParams: plugins.CommonParams{
Alias: "first",
},
Regex: "^.*$",
},
},
},
},
}

filters := ClusterFilterList{
Items: []ClusterFilter{*filterObj1, *filterObj2, *filterObj3},
}

i := 0
for i < 5 {
clusterFilters, err := filters.Load(sl)
g.Expect(err).NotTo(HaveOccurred())

g.Expect(clusterFilters).To(Equal(filtersExpected))

i++
}
}

func TestClusterFilter_RecordModifier_Generated(t *testing.T) {
var filtersExpected = `[Filter]
filtersExpected := `[Filter]
Name record_modifier
Match logs.foo.bar
Record hostname ${HOSTNAME}
Expand Down Expand Up @@ -260,7 +367,7 @@ func TestClusterFilter_RecordModifier_Generated(t *testing.T) {
}

func TestClusterFilterList_Load_As_Yaml(t *testing.T) {
var filtersExpected = `filters:
filtersExpected := `filters:
- name: modify
match: "logs.foo.bar"
condition:
Expand Down Expand Up @@ -438,7 +545,7 @@ func TestClusterFilterList_Load_As_Yaml(t *testing.T) {
}

func TestClusterFilter_RecordModifier_Generated_Load_As_Yaml(t *testing.T) {
var filtersExpected = `filters:
filtersExpected := `filters:
- name: record_modifier
match: "logs.foo.bar"
record:
Expand Down
1 change: 0 additions & 1 deletion apis/fluentbit/v1alpha2/clusterfluentbitconfig_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"bytes"
"crypto/md5"
"fmt"

"sort"

"github.com/fluent/fluent-operator/v3/apis/fluentbit/v1alpha2/plugins"
Expand Down
22 changes: 15 additions & 7 deletions apis/fluentbit/v1alpha2/filter_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,24 @@ type FilterList struct {
Items []Filter `json:"items"`
}

type NSFilterByName []Filter

func (a NSFilterByName) Len() int { return len(a) }
func (a NSFilterByName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a NSFilterByName) Less(i, j int) bool { return a[i].Name < a[j].Name }
type NSFilterByOrdinalAndName []Filter

func (a NSFilterByOrdinalAndName) Len() int { return len(a) }
func (a NSFilterByOrdinalAndName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a NSFilterByOrdinalAndName) Less(i, j int) bool {
if a[i].Spec.Ordinal < a[j].Spec.Ordinal {
return true
} else if a[i].Spec.Ordinal == a[j].Spec.Ordinal {
return a[i].Name < a[j].Name
} else {
return false
}
}

func (list FilterList) Load(sl plugins.SecretLoader) (string, error) {
var buf bytes.Buffer

sort.Sort(NSFilterByName(list.Items))
sort.Sort(NSFilterByOrdinalAndName(list.Items))

for _, item := range list.Items {
merge := func(p plugins.Plugin) error {
Expand Down Expand Up @@ -108,7 +116,7 @@ func (list FilterList) Load(sl plugins.SecretLoader) (string, error) {
func (list FilterList) LoadAsYaml(sl plugins.SecretLoader, depth int) (string, error) {
var buf bytes.Buffer

sort.Sort(NSFilterByName(list.Items))
sort.Sort(NSFilterByOrdinalAndName(list.Items))
padding := utils.YamlIndent(depth + 2)
for _, item := range list.Items {
merge := func(p plugins.Plugin) error {
Expand Down
Loading
Loading