Skip to content

Commit

Permalink
feat: filter ordinals
Browse files Browse the repository at this point in the history
Implements #1384

Signed-off-by: Zoltán Reegn <[email protected]>
  • Loading branch information
reegnz committed Oct 25, 2024
1 parent d33b950 commit fc4ea21
Show file tree
Hide file tree
Showing 15 changed files with 389 additions and 110 deletions.
63 changes: 18 additions & 45 deletions apis/fluentbit/v1alpha2/clusterfilter_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type FilterSpec struct {
LogLevel string `json:"logLevel,omitempty"`
// A set of filter plugins in order.
FilterItems []FilterItem `json:"filters,omitempty"`
// An ordinal to influence filter ordering
Ordinal int32 `json:"ordinal,omitempty"`
}

type FilterItem struct {
Expand Down Expand Up @@ -101,17 +103,25 @@ type ClusterFilterList struct {

// +kubebuilder:object:generate:=false

// FilterByName implements sort.Interface for []ClusterFilter based on the Name field.
type FilterByName []ClusterFilter

func (a FilterByName) Len() int { return len(a) }
func (a FilterByName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a FilterByName) Less(i, j int) bool { return a[i].Name < a[j].Name }
// FilterByOrdinalAndName implements sort.Interface for []ClusterFilter based on the Ordinal and Name field.
type FilterByOrdinalAndName []ClusterFilter

func (a FilterByOrdinalAndName) Len() int { return len(a) }
func (a FilterByOrdinalAndName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a FilterByOrdinalAndName) Less(i, j int) bool {
if a[i].Spec.Ordinal < a[j].Spec.Ordinal {
return true
} else if a[i].Spec.Ordinal == a[j].Spec.Ordinal {
return a[i].Name < a[j].Name
} else {
return false
}
}

func (list ClusterFilterList) Load(sl plugins.SecretLoader) (string, error) {
var buf bytes.Buffer

sort.Sort(FilterByName(list.Items))
sort.Sort(FilterByOrdinalAndName(list.Items))

for _, item := range list.Items {
merge := func(p plugins.Plugin) error {
Expand Down Expand Up @@ -156,7 +166,7 @@ func (list ClusterFilterList) Load(sl plugins.SecretLoader) (string, error) {
func (list ClusterFilterList) LoadAsYaml(sl plugins.SecretLoader, depth int) (string, error) {
var buf bytes.Buffer

sort.Sort(FilterByName(list.Items))
sort.Sort(FilterByOrdinalAndName(list.Items))
if len(list.Items) == 0 {
return "", nil
}
Expand Down Expand Up @@ -202,43 +212,6 @@ func (list ClusterFilterList) LoadAsYaml(sl plugins.SecretLoader, depth int) (st
return buf.String(), nil
}

func (clusterFilter ClusterFilter) LoadAsYaml(sl plugins.SecretLoader, depth int) (string, error) {
var buf bytes.Buffer
padding := utils.YamlIndent(depth + 2)
merge := func(p plugins.Plugin) error {
if p == nil || reflect.ValueOf(p).IsNil() {
return nil
}

if p.Name() != "" {
buf.WriteString(fmt.Sprintf("%s- name: %s\n", utils.YamlIndent(depth+1), p.Name()))
}
if clusterFilter.Spec.LogLevel != "" {
buf.WriteString(fmt.Sprintf("%slog_level: %s\n", padding, clusterFilter.Spec.LogLevel))
}
if clusterFilter.Spec.Match != "" {
buf.WriteString(fmt.Sprintf("%smatch: \"%s\"\n", padding, clusterFilter.Spec.Match))
}
if clusterFilter.Spec.MatchRegex != "" {
buf.WriteString(fmt.Sprintf("%smatch_regex: %s\n", padding, clusterFilter.Spec.MatchRegex))
}
kvs, err := p.Params(sl)
if err != nil {
return err
}
buf.WriteString(kvs.YamlString(depth + 2))
return nil
}
for _, elem := range clusterFilter.Spec.FilterItems {
for i := 0; i < reflect.ValueOf(elem).NumField(); i++ {
p, _ := reflect.ValueOf(elem).Field(i).Interface().(plugins.Plugin)
if err := merge(p); err != nil {
return "", err
}
}
}
return buf.String(), nil
}
func init() {
SchemeBuilder.Register(&ClusterFilter{}, &ClusterFilterList{})
}
206 changes: 202 additions & 4 deletions apis/fluentbit/v1alpha2/clusterfilter_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func TestClusterFilterList_Load(t *testing.T) {
var filtersExpected = `[Filter]
filtersExpected := `[Filter]
Name modify
Match logs.foo.bar
Condition Key_value_equals kve0 kvev0
Expand Down Expand Up @@ -185,8 +185,206 @@ func TestClusterFilterList_Load(t *testing.T) {
}
}

func TestClusterFilterList_Load_Before(t *testing.T) {
filtersExpected := `[Filter]
Name grep
Match *
Alias third
Regex ^.*$
[Filter]
Name grep
Match *
Alias first
Regex ^.*$
`

g := NewGomegaWithT(t)
sl := plugins.NewSecretLoader(nil, "testnamespace")

filterObj1 := &ClusterFilter{
TypeMeta: metav1.TypeMeta{
APIVersion: "fluentbit.fluent.io/v1alpha2",
Kind: "ClusterFilter",
},
ObjectMeta: metav1.ObjectMeta{
Name: "first",
},
Spec: FilterSpec{
Match: "*",
FilterItems: []FilterItem{
{
Grep: &filter.Grep{
CommonParams: plugins.CommonParams{
Alias: "first",
},
Regex: "^.*$",
},
},
},
},
}

filterObj2 := &ClusterFilter{
TypeMeta: metav1.TypeMeta{
APIVersion: "fluentbit.fluent.io/v1alpha2",
Kind: "ClusterFilter",
},
ObjectMeta: metav1.ObjectMeta{
Name: "second",
},
Spec: FilterSpec{
Ordinal: 10,
Match: "*",
FilterItems: []FilterItem{
{
Grep: &filter.Grep{
CommonParams: plugins.CommonParams{
Alias: "second",
},
Regex: "^.*$",
},
},
},
},
}

filterObj3 := &ClusterFilter{
TypeMeta: metav1.TypeMeta{
APIVersion: "fluentbit.fluent.io/v1alpha2",
Kind: "ClusterFilter",
},
ObjectMeta: metav1.ObjectMeta{
Name: "third",
},
Spec: FilterSpec{
Ordinal: -10,
Match: "*",
FilterItems: []FilterItem{
{
Grep: &filter.Grep{
CommonParams: plugins.CommonParams{
Alias: "third",
},
Regex: "^.*$",
},
},
},
},
}

filters := ClusterFilterList{
Items: []ClusterFilter{*filterObj1, *filterObj2, *filterObj3},
}

i := 0
for i < 5 {
clusterFilters, err := filters.LoadBefore(sl)
g.Expect(err).NotTo(HaveOccurred())

g.Expect(clusterFilters).To(Equal(filtersExpected))

i++
}
}

func TestClusterFilterList_Load_After(t *testing.T) {
filtersExpected := `[Filter]
Name grep
Match *
Alias second
Regex ^.*$
`

g := NewGomegaWithT(t)
sl := plugins.NewSecretLoader(nil, "testnamespace")

filterObj1 := &ClusterFilter{
TypeMeta: metav1.TypeMeta{
APIVersion: "fluentbit.fluent.io/v1alpha2",
Kind: "ClusterFilter",
},
ObjectMeta: metav1.ObjectMeta{
Name: "first",
},
Spec: FilterSpec{
Match: "*",
FilterItems: []FilterItem{
{
Grep: &filter.Grep{
CommonParams: plugins.CommonParams{
Alias: "first",
},
Regex: "^.*$",
},
},
},
},
}

filterObj2 := &ClusterFilter{
TypeMeta: metav1.TypeMeta{
APIVersion: "fluentbit.fluent.io/v1alpha2",
Kind: "ClusterFilter",
},
ObjectMeta: metav1.ObjectMeta{
Name: "second",
},
Spec: FilterSpec{
Ordinal: 10,
Match: "*",
FilterItems: []FilterItem{
{
Grep: &filter.Grep{
CommonParams: plugins.CommonParams{
Alias: "second",
},
Regex: "^.*$",
},
},
},
},
}

filterObj3 := &ClusterFilter{
TypeMeta: metav1.TypeMeta{
APIVersion: "fluentbit.fluent.io/v1alpha2",
Kind: "ClusterFilter",
},
ObjectMeta: metav1.ObjectMeta{
Name: "third",
},
Spec: FilterSpec{
Ordinal: -10,
Match: "*",
FilterItems: []FilterItem{
{
Grep: &filter.Grep{
CommonParams: plugins.CommonParams{
Alias: "third",
},
Regex: "^.*$",
},
},
},
},
}

filters := ClusterFilterList{
Items: []ClusterFilter{*filterObj1, *filterObj2, *filterObj3},
}

i := 0
for i < 5 {
clusterFilters, err := filters.LoadAfter(sl)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(clusterFilters).To(Equal(filtersExpected))

i++
}
}

func TestClusterFilter_RecordModifier_Generated(t *testing.T) {
var filtersExpected = `[Filter]
filtersExpected := `[Filter]
Name record_modifier
Match logs.foo.bar
Record hostname ${HOSTNAME}
Expand Down Expand Up @@ -260,7 +458,7 @@ func TestClusterFilter_RecordModifier_Generated(t *testing.T) {
}

func TestClusterFilterList_Load_As_Yaml(t *testing.T) {
var filtersExpected = `filters:
filtersExpected := `filters:
- name: modify
match: "logs.foo.bar"
condition:
Expand Down Expand Up @@ -438,7 +636,7 @@ func TestClusterFilterList_Load_As_Yaml(t *testing.T) {
}

func TestClusterFilter_RecordModifier_Generated_Load_As_Yaml(t *testing.T) {
var filtersExpected = `filters:
filtersExpected := `filters:
- name: record_modifier
match: "logs.foo.bar"
record:
Expand Down
1 change: 0 additions & 1 deletion apis/fluentbit/v1alpha2/clusterfluentbitconfig_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"bytes"
"crypto/md5"
"fmt"

"sort"

"github.com/fluent/fluent-operator/v3/apis/fluentbit/v1alpha2/plugins"
Expand Down
22 changes: 15 additions & 7 deletions apis/fluentbit/v1alpha2/filter_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,24 @@ type FilterList struct {
Items []Filter `json:"items"`
}

type NSFilterByName []Filter

func (a NSFilterByName) Len() int { return len(a) }
func (a NSFilterByName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a NSFilterByName) Less(i, j int) bool { return a[i].Name < a[j].Name }
type NSFilterByOrdinalAndName []Filter

func (a NSFilterByOrdinalAndName) Len() int { return len(a) }
func (a NSFilterByOrdinalAndName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a NSFilterByOrdinalAndName) Less(i, j int) bool {
if a[i].Spec.Ordinal < a[j].Spec.Ordinal {
return true
} else if a[i].Spec.Ordinal == a[j].Spec.Ordinal {
return a[i].Name < a[j].Name
} else {
return false
}
}

func (list FilterList) Load(sl plugins.SecretLoader) (string, error) {
var buf bytes.Buffer

sort.Sort(NSFilterByName(list.Items))
sort.Sort(NSFilterByOrdinalAndName(list.Items))

for _, item := range list.Items {
merge := func(p plugins.Plugin) error {
Expand Down Expand Up @@ -108,7 +116,7 @@ func (list FilterList) Load(sl plugins.SecretLoader) (string, error) {
func (list FilterList) LoadAsYaml(sl plugins.SecretLoader, depth int) (string, error) {
var buf bytes.Buffer

sort.Sort(NSFilterByName(list.Items))
sort.Sort(NSFilterByOrdinalAndName(list.Items))
padding := utils.YamlIndent(depth + 2)
for _, item := range list.Items {
merge := func(p plugins.Plugin) error {
Expand Down
Loading

0 comments on commit fc4ea21

Please sign in to comment.