-
Notifications
You must be signed in to change notification settings - Fork 30
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement filters on deployment, job & origin
This allows filtering of events based on the above mentioned three fields. Multiple filters can be configured and they will run against a message in the order specified. A filter has the form type₀:key₀:value₀;type₁:key₁:value₁;…;typeₙ,keyₙ,valueₙ `type` is either "must" or "mustnot" and specifies, if a messages must contain or must not match it's `key` against `value`. `key` specifies what we want to match against: the deployment name, the job name or the event's origin. It is checked if the message `key`'s value contains the `value`, case-sensitive. Benchmarks have been added to get an understanding of the performance impact of those filters.
- Loading branch information
1 parent
1ba369d
commit 0d9676c
Showing
10 changed files
with
514 additions
and
25 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
package eventfilter_test | ||
|
||
import ( | ||
"testing" | ||
|
||
. "github.com/onsi/ginkgo" | ||
. "github.com/onsi/gomega" | ||
) | ||
|
||
func TestEventfilter(t *testing.T) { | ||
RegisterFailHandler(Fail) | ||
RunSpecs(t, "Eventfilter Suite") | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,148 @@ | ||
package eventfilter | ||
|
||
import ( | ||
"fmt" | ||
"strings" | ||
|
||
"github.com/cloudfoundry/sonde-go/events" | ||
) | ||
|
||
const ( | ||
filterSep = ";" | ||
filterKeyValueSep = ":" | ||
Must = "must" | ||
MustNot = "mustnot" | ||
) | ||
|
||
// supportedGetters are all supported keys we can use for filters and the | ||
// functions that pull the respective data out of an envelope. | ||
var supportedGetters = map[string]func(*events.Envelope) string{ | ||
"deployment": func(msg *events.Envelope) string { | ||
return msg.GetDeployment() | ||
}, | ||
"origin": func(msg *events.Envelope) string { | ||
return msg.GetOrigin() | ||
}, | ||
"job": func(msg *events.Envelope) string { | ||
return msg.GetJob() | ||
}, | ||
} | ||
|
||
// SupportedFilterKeys lists all supported filter keys. This is only used to | ||
// signal the list of supported keys to users, e.g. for the usage text. | ||
var SupportedFilterKeys = func() []string { | ||
keys := make([]string, 0, len(supportedGetters)) | ||
for k := range supportedGetters { | ||
keys = append(keys, k) | ||
} | ||
|
||
return keys | ||
}() | ||
|
||
// Filters is something that can tell it's Length (the number of its configured | ||
// filters) and can be used to check if an envelope is accepted or should be | ||
// dropped/discarded. | ||
type Filters interface { | ||
Accepts(*events.Envelope) bool | ||
Length() int | ||
} | ||
|
||
type filterRule struct { | ||
key string | ||
value string | ||
must bool | ||
} | ||
|
||
var ( | ||
errInvalidFormat = fmt.Errorf("format must be '[%s|%s]:field:value'", Must, MustNot) | ||
errEmptyValue = fmt.Errorf("filter value must not be empty string") | ||
errInvaldFilter = fmt.Errorf("filter needs to be either %q or %q", Must, MustNot) | ||
errInvalidFilterKey = fmt.Errorf("filter key not supported") | ||
) | ||
|
||
func parseFilterConfig(filters string) ([]filterRule, error) { | ||
rules := []filterRule{} | ||
|
||
for _, filterRaw := range strings.Split(filters, filterSep) { | ||
filter := strings.TrimSpace(filterRaw) | ||
|
||
if filter == "" { | ||
continue | ||
} | ||
|
||
tokens := strings.Split(filter, filterKeyValueSep) | ||
if len(tokens) != 3 { | ||
return []filterRule{}, fmt.Errorf("filter %q invalid: %s", filter, errInvalidFormat) | ||
} | ||
|
||
rule := filterRule{ | ||
key: tokens[1], | ||
value: tokens[2], | ||
} | ||
|
||
if rule.value == "" { | ||
return []filterRule{}, fmt.Errorf("filter %q invalid: %s", filter, errEmptyValue) | ||
} | ||
|
||
switch t := strings.TrimSpace(strings.ToLower(tokens[0])); t { | ||
case MustNot: | ||
rule.must = false | ||
case Must: | ||
rule.must = true | ||
default: | ||
return []filterRule{}, fmt.Errorf("filter %q invalid: %s", filter, errInvaldFilter) | ||
} | ||
|
||
rules = append(rules, rule) | ||
} | ||
|
||
return rules, nil | ||
} | ||
|
||
type filter func(*events.Envelope) bool | ||
|
||
type filters []filter | ||
|
||
func (ef *filters) Accepts(msg *events.Envelope) bool { | ||
for _, f := range *ef { | ||
if allow := f(msg); !allow { | ||
return false | ||
} | ||
} | ||
|
||
return true | ||
} | ||
|
||
func (ef *filters) Length() int { | ||
return len(*ef) | ||
} | ||
|
||
func (ef *filters) addFilter(key, value string, must bool) error { | ||
valueGetter, ok := supportedGetters[strings.ToLower(key)] | ||
if !ok { | ||
return fmt.Errorf("invalid filter key %q: %s", key, errInvalidFilterKey) | ||
} | ||
|
||
*ef = append(*ef, func(msg *events.Envelope) bool { | ||
return must == strings.Contains(valueGetter(msg), value) | ||
}) | ||
|
||
return nil | ||
} | ||
|
||
func New(filterList string) (Filters, error) { | ||
f := &filters{} | ||
|
||
filters, err := parseFilterConfig(filterList) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
for _, filter := range filters { | ||
if err := f.addFilter(filter.key, filter.value, filter.must); err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
return f, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
package eventfilter_test | ||
|
||
import ( | ||
"fmt" | ||
|
||
"github.com/cloudfoundry-community/splunk-firehose-nozzle/eventfilter" | ||
"github.com/cloudfoundry/sonde-go/events" | ||
|
||
. "github.com/onsi/ginkgo" | ||
. "github.com/onsi/ginkgo/extensions/table" | ||
. "github.com/onsi/gomega" | ||
) | ||
|
||
var _ = Describe("Rule parsing", func() { | ||
testError := func(filterConf string, errorMsg string) { | ||
filters, err := eventfilter.New(filterConf) | ||
Expect(filters).To(BeNil()) | ||
Expect(err).To(MatchError(ContainSubstring(errorMsg))) | ||
} | ||
DescribeTable("throws error", testError, | ||
Entry("not enough fields", ":", "format must be"), | ||
Entry("too many fields", "xxx:yyy:zzz:rrrr", "format must be"), | ||
Entry("invalid value", "xxx::", "filter value must not be empty"), | ||
Entry("invalid filter", "xxx:yyy:zzz", "needs to be either"), | ||
Entry("invalid field", "must:notValid:zzz", "invalid filter key \"notValid\""), | ||
) | ||
|
||
testOk := func(filterConf string, length int) { | ||
filters, err := eventfilter.New(filterConf) | ||
Expect(err).NotTo(HaveOccurred()) | ||
Expect(filters).NotTo(BeNil(), "filters have not been initialized") | ||
Expect(filters.Length()).To(Equal(length), "Expected %d filter rules", length) | ||
} | ||
DescribeTable("parses ok", testOk, | ||
Entry("no filters at all", "", 0), | ||
Entry("multiple empty rules", ";;;;", 0), | ||
Entry("filtering on deployment", "must:deployment:some deployment", 1), | ||
Entry("accepts whitespace between rules", " must:deployment:something ; must:origin:someOrigin ", 2), | ||
Entry("accepts whitespace in filter", " must :deployment:something", 1), | ||
|
||
Entry("inclusion filter on deployment", "must:Deployment:something", 1), | ||
Entry("inclusion filter on origin", "must:origin:something", 1), | ||
Entry("inclusion filter on job", "must:job:something", 1), | ||
|
||
Entry("exclusion filter on deployment", "mustNot:Deployment:something", 1), | ||
Entry("exclusion filter on origin", "mustNot:origin:something", 1), | ||
Entry("exclusion filter on job", "mustNot:job:something", 1), | ||
) | ||
}) | ||
|
||
var _ = Describe("Filtering", func() { | ||
msg := &events.Envelope{ | ||
Deployment: p("p-healthwatch2-123123123"), | ||
Origin: p("some origin"), | ||
Job: p("some job"), | ||
} | ||
|
||
test := func(filterConf string, expected bool) { | ||
filters, err := eventfilter.New(filterConf) | ||
Expect(err).NotTo(HaveOccurred()) | ||
Expect(filters.Accepts(msg)). | ||
To(Equal(expected), "Expected event {%v} to be %s", msg, tern(expected, "accepted", "discarded")) | ||
Expect(filters).NotTo(BeNil(), "filters have not been initialized") | ||
} | ||
|
||
DescribeTable("on", test, | ||
Entry("empty filter conf should accept", "", true), | ||
Entry("matching inclusion filter should accept", "must:deployment:healthwatch2", true), | ||
Entry("non-matching inclusion filter should discard", "must:deployment:something", false), | ||
Entry("matching exclusion filter should discard", "mustNot:deployment:healthwatch2", false), | ||
Entry("2nd exclusion filter should discard", "must:deployment:health ; mustNot:deployment:watch", false), | ||
Entry("3rd exclusion filter should discard", | ||
"must:deployment:health ; mustNot:job:other job ; mustNot:deployment:watch", | ||
false, | ||
), | ||
Entry("many matching inclusion filters should accept", | ||
"must:deployment:h ; must:deployment:e ; must:deployment:a ; must:deployment:l ; must:deployment:t ; must:deployment:h", | ||
true, | ||
), | ||
Entry("many non-matching exclusion filters should accept", | ||
"mustNot:deployment:x ; mustNot:deployment:y ; mustNot:deployment:z ; mustNot:deployment:u ; mustNot:deployment:b ; mustNot:deployment:r", | ||
true, | ||
), | ||
) | ||
}) | ||
|
||
var _ = Describe("filter keys", func() { | ||
entries := []TableEntry{} | ||
for _, key := range eventfilter.SupportedFilterKeys { | ||
entries = append(entries, Entry(fmt.Sprintf("key %q", key), key)) | ||
} | ||
|
||
DescribeTable("support", func(key string) { | ||
_, err := eventfilter.New("must:" + key + ":someValue") | ||
Expect(err).NotTo(HaveOccurred()) | ||
}, entries...) | ||
}) | ||
|
||
func p(s string) *string { return &s } | ||
|
||
func tern(b bool, t string, f string) string { | ||
if b { | ||
return t | ||
} | ||
|
||
return f | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.