From 0ecf19188d50f6326d8046b627993e27269e714f Mon Sep 17 00:00:00 2001 From: Paul Stuart Date: Mon, 11 Apr 2016 10:30:41 -0700 Subject: [PATCH 1/3] cache compiled regexps, add ParseStream as batch handler --- grok.go | 73 ++++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 54 insertions(+), 19 deletions(-) diff --git a/grok.go b/grok.go index 2fa659b..01a1cdb 100644 --- a/grok.go +++ b/grok.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "fmt" + "io" "os" "path/filepath" "regexp" @@ -12,6 +13,11 @@ import ( "sync" ) +var ( + defn = regexp.MustCompile(`%{(\w+):?(\w+)?:?(\w+)?}`) + normal = regexp.MustCompile(`%{(\w+:?\w+:?\w+)}`) +) + // A Config structure is used to configure a Grok parser. type Config struct { NamedCapturesOnly bool @@ -72,8 +78,7 @@ func NewWithConfig(config *Config) (*Grok, error) { } - err := g.AddPatternsFromMap(config.Patterns) - if err != nil { + if err := g.AddPatternsFromMap(config.Patterns); err != nil { return nil, err } @@ -94,9 +99,9 @@ func (g *Grok) addPattern(name, pattern string) error { // AddPattern adds a named pattern to grok func (g *Grok) AddPattern(name, pattern string) error { g.serviceMu.Lock() - defer g.serviceMu.Unlock() g.rawPattern[name] = pattern g.buildPatterns() + g.serviceMu.Unlock() return nil } @@ -114,14 +119,11 @@ func (g *Grok) AddPatternsFromMap(m map[string]string) error { // AddPatternsFromMap adds new patterns from the specified map to the list of // loaded patterns. func (g *Grok) addPatternsFromMap(m map[string]string) error { - re, _ := regexp.Compile(`%{(\w+):?(\w+)?:?(\w+)?}`) - patternDeps := graph{} for k, v := range m { keys := []string{} - for _, key := range re.FindAllStringSubmatch(v, -1) { + for _, key := range defn.FindAllStringSubmatch(v, -1) { if g.patterns[key[1]] == nil { - if _, ok := m[key[1]]; !ok { return fmt.Errorf("no pattern found for %%{%s}", key[1]) } @@ -130,7 +132,6 @@ func (g *Grok) addPatternsFromMap(m map[string]string) error { } patternDeps[k] = keys } - // pp.Print(patternDeps) order, _ := sortGraph(patternDeps) for _, key := range reverseList(order) { g.addPattern(key, m[key]) @@ -191,19 +192,13 @@ func (g *Grok) Match(pattern, text string) (bool, error) { return true, nil } -// Parse the specified text and return a map with the results. -func (g *Grok) Parse(pattern, text string) (map[string]string, error) { - gr, err := g.compile(pattern) - if err != nil { - return nil, err - } - +// CompiledParse parses the specified text and returns a map with the results. +func (g *Grok) CompiledParse(gr *gRegexp, text string) (map[string]string, error) { captures := make(map[string]string) if match := gr.regexp.FindStringSubmatch(text); len(match) > 0 { for i, name := range gr.regexp.SubexpNames() { - if name != "" { - if g.config.RemoveEmptyValues == true && match[i] == "" { + if g.config.RemoveEmptyValues && match[i] == "" { continue } captures[name] = match[i] @@ -215,6 +210,16 @@ func (g *Grok) Parse(pattern, text string) (map[string]string, error) { return captures, nil } +// Parse the specified text and return a map with the results. +func (g *Grok) Parse(pattern, text string) (map[string]string, error) { + gr, err := g.compile(pattern) + if err != nil { + return nil, err + } + + return g.CompiledParse(gr, text) +} + // ParseTyped returns a inteface{} map with typed captured fields based on provided pattern over the text func (g *Grok) ParseTyped(pattern string, text string) (map[string]interface{}, error) { gr, err := g.compile(pattern) @@ -279,6 +284,9 @@ func (g *Grok) buildPatterns() error { } func (g *Grok) compile(pattern string) (*gRegexp, error) { + if gr, ok := g.compiledPatterns[pattern]; ok { + return gr, nil + } newPattern, ti, err := g.denormalizePattern(pattern, g.patterns) if err != nil { return nil, err @@ -289,13 +297,13 @@ func (g *Grok) compile(pattern string) (*gRegexp, error) { return nil, err } gr := &gRegexp{regexp: compiledRegex, typeInfo: ti} + g.compiledPatterns[pattern] = gr return gr, nil } func (g *Grok) denormalizePattern(pattern string, storedPatterns map[string]*gPattern) (string, semanticTypes, error) { - r, _ := regexp.Compile(`%{(\w+:?\w+:?\w+)}`) ti := semanticTypes{} - for _, values := range r.FindAllStringSubmatch(pattern, -1) { + for _, values := range normal.FindAllStringSubmatch(pattern, -1) { names := strings.Split(values[1], ":") syntax, semantic := names[0], names[0] @@ -340,4 +348,31 @@ func (g *Grok) denormalizePattern(pattern string, storedPatterns map[string]*gPa } return pattern, ti, nil + +} + +// ParseStream will match the given pattern on a line by line basis from the reader +// and apply the results to the process function +func (g *Grok) ParseStream(reader *bufio.Reader, pattern string, process func(map[string]string) error) error { + gr, err := g.compile(pattern) + if err != nil { + return err + } + for { + line, err := reader.ReadString('\n') + fmt.Println("LINE:", line) + if err == io.EOF { + return nil + } + if err != nil { + return err + } + values, err := g.CompiledParse(gr, line) + if err != nil { + return err + } + if err = process(values); err != nil { + return err + } + } } From 2badf976a3ea5f37d8de03b37cc1a2882f29d782 Mon Sep 17 00:00:00 2001 From: Paul Stuart Date: Thu, 21 Apr 2016 20:34:23 -0700 Subject: [PATCH 2/3] make [C|c]ompiledParse private, fix data race, update TZ to include 'GMT' --- grok.go | 16 +++++++++++----- grok_test.go | 6 ++++-- patterns/grok-patterns | 2 +- 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/grok.go b/grok.go index 01a1cdb..ce3359b 100644 --- a/grok.go +++ b/grok.go @@ -192,9 +192,10 @@ func (g *Grok) Match(pattern, text string) (bool, error) { return true, nil } -// CompiledParse parses the specified text and returns a map with the results. -func (g *Grok) CompiledParse(gr *gRegexp, text string) (map[string]string, error) { +// compiledParse parses the specified text and returns a map with the results. +func (g *Grok) compiledParse(gr *gRegexp, text string) (map[string]string, error) { captures := make(map[string]string) + g.serviceMu.Lock() if match := gr.regexp.FindStringSubmatch(text); len(match) > 0 { for i, name := range gr.regexp.SubexpNames() { if name != "" { @@ -206,6 +207,7 @@ func (g *Grok) CompiledParse(gr *gRegexp, text string) (map[string]string, error } } } + g.serviceMu.Unlock() return captures, nil } @@ -217,7 +219,7 @@ func (g *Grok) Parse(pattern, text string) (map[string]string, error) { return nil, err } - return g.CompiledParse(gr, text) + return g.compiledParse(gr, text) } // ParseTyped returns a inteface{} map with typed captured fields based on provided pattern over the text @@ -284,6 +286,11 @@ func (g *Grok) buildPatterns() error { } func (g *Grok) compile(pattern string) (*gRegexp, error) { + g.serviceMu.Lock() + defer g.serviceMu.Unlock() + if g.compiledPatterns == nil { + g.compiledPatterns = map[string]*gRegexp{} + } if gr, ok := g.compiledPatterns[pattern]; ok { return gr, nil } @@ -360,14 +367,13 @@ func (g *Grok) ParseStream(reader *bufio.Reader, pattern string, process func(ma } for { line, err := reader.ReadString('\n') - fmt.Println("LINE:", line) if err == io.EOF { return nil } if err != nil { return err } - values, err := g.CompiledParse(gr, line) + values, err := g.compiledParse(gr, line) if err != nil { return err } diff --git a/grok_test.go b/grok_test.go index d527c3f..3144465 100644 --- a/grok_test.go +++ b/grok_test.go @@ -84,8 +84,10 @@ func TestAddPatternsFromPathErr(t *testing.T) { } func TestConfigPatternsDir(t *testing.T) { - g, _ := NewWithConfig(&Config{PatternsDir: []string{"./patterns"}}) - // g,_ := New() + g, err := NewWithConfig(&Config{PatternsDir: []string{"./patterns"}}) + if err != nil { + t.Error(err) + } if captures, err := g.Parse("%{SYSLOGLINE}", `Sep 12 23:19:02 docker syslog-ng[25389]: syslog-ng starting up; version='3.5.3'`); err != nil { t.Fatalf("error : %s", err.Error()) diff --git a/patterns/grok-patterns b/patterns/grok-patterns index 12e6fea..032315e 100644 --- a/patterns/grok-patterns +++ b/patterns/grok-patterns @@ -75,7 +75,7 @@ ISO8601_SECOND (?:%{SECOND}|60) TIMESTAMP_ISO8601 %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}? DATE %{DATE_US}|%{DATE_EU} DATESTAMP %{DATE}[- ]%{TIME} -TZ (?:[PMCE][SD]T|UTC) +TZ (?:[PMCE][SD]T|UTC|GMT) DATESTAMP_RFC822 %{DAY} %{MONTH} %{MONTHDAY} %{YEAR} %{TIME} %{TZ} DATESTAMP_OTHER %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{TZ} %{YEAR} From e8d8c436e4ae35bdb7f40c389a4943a387ebd840 Mon Sep 17 00:00:00 2001 From: Paul Stuart Date: Fri, 22 Apr 2016 08:54:07 -0700 Subject: [PATCH 3/3] added test for ParseStream --- grok_test.go | 49 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/grok_test.go b/grok_test.go index 3144465..4f8ee17 100644 --- a/grok_test.go +++ b/grok_test.go @@ -1,6 +1,11 @@ package grok -import "testing" +import ( + "bufio" + "fmt" + "strings" + "testing" +) func TestNew(t *testing.T) { g, _ := New() @@ -659,3 +664,45 @@ func TestGrok_AddPatternsFromMap_complex(t *testing.T) { t.Errorf("bad match: expected 333666, got %s", mss["match"]) } } + +func TestParseStream(t *testing.T) { + g, _ := New() + pTest := func(m map[string]string) error { + ts, ok := m["timestamp"] + if !ok { + t.Error("timestamp not found") + } + if len(ts) == 0 { + t.Error("empty timestamp") + } + return nil + } + const testLog = `127.0.0.1 - - [23/Apr/2014:22:58:32 +0200] "GET /index.php HTTP/1.1" 404 207 +127.0.0.1 - - [23/Apr/2014:22:59:32 +0200] "GET /index.php HTTP/1.1" 404 207 +127.0.0.1 - - [23/Apr/2014:23:00:32 +0200] "GET /index.php HTTP/1.1" 404 207 +` + + r := bufio.NewReader(strings.NewReader(testLog)) + if err := g.ParseStream(r, "%{COMMONAPACHELOG}", pTest); err != nil { + t.Fatal(err) + } +} + +func TestParseStreamError(t *testing.T) { + g, _ := New() + pTest := func(m map[string]string) error { + if _, ok := m["timestamp"]; !ok { + return fmt.Errorf("timestamp not found") + } + return nil + } + const testLog = `127.0.0.1 - - [23/Apr/2014:22:58:32 +0200] "GET /index.php HTTP/1.1" 404 207 +127.0.0.1 - - [xxxxxxxxxxxxxxxxxxxx +0200] "GET /index.php HTTP/1.1" 404 207 +127.0.0.1 - - [23/Apr/2014:23:00:32 +0200] "GET /index.php HTTP/1.1" 404 207 +` + + r := bufio.NewReader(strings.NewReader(testLog)) + if err := g.ParseStream(r, "%{COMMONAPACHELOG}", pTest); err == nil { + t.Fatal("Error expected") + } +}