forked from honeycombio/honeytail
-
Notifications
You must be signed in to change notification settings - Fork 0
/
syslog.go
171 lines (150 loc) · 4.54 KB
/
syslog.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package syslog
import (
"fmt"
"strings"
"sync"
"time"
"github.com/sirupsen/logrus"
"github.com/honeycombio/honeytail/event"
"github.com/honeycombio/honeytail/parsers"
"github.com/jeromer/syslogparser"
"github.com/jeromer/syslogparser/rfc3164"
"github.com/jeromer/syslogparser/rfc5424"
)
// Options defines the options relevant to the syslog parser.
// NOTE(review): the struct tags look like command-line flag metadata (long
// name, description, hidden) - confirm which flag library consumes them.
type Options struct {
// Mode selects the syslog format. NewSyslogLineParser accepts only "rfc3164"
// and "rfc5424" and returns an error for anything else.
Mode string `long:"mode" description:"Syslog mode. Supported values are rfc3164 and rfc5424"`
// ProcessList is a comma separated list of process names to keep. An empty
// string disables filtering, so lines from every process are consumed.
ProcessList string `long:"processes" description:"comma separated list of processes to filter for. example: 'sshd,sudo' - by default all are consumed"`
// NumParsers is the number of goroutines ProcessLines starts. Values that
// are zero or negative fall back to a single goroutine.
NumParsers int `hidden:"true" description:"number of parsers to spin up"`
}
// Parser implements the Parser interface.
// NOTE(review): the interface itself is not declared in this file -
// presumably it lives in the parsers package; confirm.
type Parser struct {
// conf is the copy of the Options that was handed to Init.
conf Options
// lineParser turns one line into a field map. Init sets it to the
// SyslogLineParser built from conf; ProcessLines calls it for every line.
lineParser parsers.LineParser
}
// Init constructs our parser from the provided options.
//
// options must be a non-nil *Options. Any other value is reported as an
// error instead of panicking on the type assertion or the dereference, so
// a misconfigured caller gets a message rather than a crash.
//
// The options are copied into the parser before the line parser is built,
// and an error from NewSyslogLineParser (for example an unsupported mode) is
// returned unchanged.
func (p *Parser) Init(options interface{}) error {
	conf, ok := options.(*Options)
	if !ok || conf == nil {
		return fmt.Errorf("syslog parser expects non-nil *Options, got %T", options)
	}
	p.conf = *conf

	lineParser, err := NewSyslogLineParser(p.conf.Mode, p.conf.ProcessList)
	if err != nil {
		return err
	}
	p.lineParser = lineParser
	return nil
}
// SyslogLineParser parses single syslog lines in one fixed format and can
// optionally drop lines whose process is not in an allowed set.
type SyslogLineParser struct {
// mode is the syslog format to parse; NewSyslogLineParser only ever stores
// "rfc3164" or "rfc5424" here.
mode string
// supportedProcesses is the set of process names to keep. It is nil when no
// process list was given, which ParseLine treats as "no filtering".
supportedProcesses map[string]struct{}
}
// normalizeLogFields rewrites a parsed syslog record in place so that both
// supported formats expose the same keys.
//
// The process name is moved to "process" from whichever format-specific key
// holds it as a string, and a string "message" has its surrounding
// whitespace trimmed. Values that are not strings are left untouched.
func normalizeLogFields(fields map[string]interface{}) {
	// "tag" is the RFC3164 name for the process, "app_name" the RFC5424 one.
	// The order is significant: if both are present, app_name is applied
	// last and therefore ends up as the final "process" value.
	for _, key := range [...]string{"tag", "app_name"} {
		name, isString := fields[key].(string)
		if !isString {
			continue
		}
		fields["process"] = name
		delete(fields, key)
	}

	// Tidy up leading and trailing whitespace in the message.
	raw, isString := fields["message"].(string)
	if isString {
		fields["message"] = strings.TrimSpace(raw)
	}
}
// NewSyslogLineParser builds a SyslogLineParser for the given mode.
//
// mode must be "rfc3164" or "rfc5424"; any other value yields a nil parser
// and an error. processList is a comma separated list of process names to
// keep; each entry is trimmed of surrounding whitespace. An empty
// processList leaves the filter unset, so no lines are filtered by process.
func NewSyslogLineParser(mode string, processList string) (*SyslogLineParser, error) {
	switch mode {
	case "rfc3164", "rfc5424":
		// supported, carry on
	default:
		return nil, fmt.Errorf("unsupported mode %s, see --help", mode)
	}

	lineParser := &SyslogLineParser{mode: mode}
	if processList == "" {
		// No filter requested: supportedProcesses stays nil.
		return lineParser, nil
	}

	allowed := make(map[string]struct{})
	for _, name := range strings.Split(processList, ",") {
		allowed[strings.TrimSpace(name)] = struct{}{}
	}
	lineParser.supportedProcesses = allowed
	return lineParser, nil
}
// ParseLine parses a single syslog line according to the parser's mode and
// returns its fields after normalizeLogFields has been applied.
//
// Return values:
//   - (fields, nil) for a line that parsed and passed the process filter;
//   - (nil, nil) for a line that parsed but whose process is not in the
//     configured process set;
//   - (nil, err) when the underlying syslog parser rejects the line, or when
//     the mode is not one of the supported values.
//
// The unsupported-mode error covers a SyslogLineParser that was not built by
// NewSyslogLineParser (for example a zero value). Without it the parser
// variable would stay a nil interface and calling Parse on it would panic.
func (p *SyslogLineParser) ParseLine(line string) (map[string]interface{}, error) {
	var parser syslogparser.LogParser
	switch p.mode {
	case "rfc3164":
		parser = rfc3164.NewParser([]byte(line))
	case "rfc5424":
		parser = rfc5424.NewParser([]byte(line))
	default:
		return nil, fmt.Errorf("unsupported mode %s, see --help", p.mode)
	}

	if err := parser.Parse(); err != nil {
		return nil, err
	}
	logFields := parser.Dump()
	normalizeLogFields(logFields)

	// supportedProcesses is only non-nil when a process list was configured.
	if p.supportedProcesses != nil {
		// Only a string "process" value is checked against the set; a line
		// with no such value is passed through unfiltered.
		if process, ok := logFields["process"].(string); ok {
			if _, match := p.supportedProcesses[process]; !match {
				return nil, nil
			}
		}
	}
	return logFields, nil
}
// ProcessLines reads raw lines from lines, parses each one and sends the
// resulting event on send. It blocks until lines is closed and every worker
// goroutine has finished.
//
// The number of worker goroutines is p.conf.NumParsers, or 1 when that value
// is not positive. When prefixRegex is non-nil, it is matched against each
// line first: the matched prefix is stripped before parsing and the captured
// fields are merged into the parsed fields, overwriting parsed fields that
// share a key.
//
// Lines that fail to parse are logged at Debug level and skipped. Lines the
// line parser filtered out (nil result, nil error) are skipped silently, and
// lines that yield an empty field map are logged at Info level and skipped.
func (p *Parser) ProcessLines(lines <-chan string, send chan<- event.Event, prefixRegex *parsers.ExtRegexp) {
	numParsers := 1
	if p.conf.NumParsers > 0 {
		numParsers = p.conf.NumParsers
	}

	var wg sync.WaitGroup
	for i := 0; i < numParsers; i++ {
		wg.Add(1)
		go func() {
			// Deferred so the WaitGroup is released on every exit path of
			// the worker, not only when the loop ends normally.
			defer wg.Done()
			for line := range lines {
				logrus.WithFields(logrus.Fields{
					"line": line,
				}).Debug("attempting to process line")

				// take care of any headers on the line
				var prefixFields map[string]string
				if prefixRegex != nil {
					var prefix string
					prefix, prefixFields = prefixRegex.FindStringSubmatchMap(line)
					line = strings.TrimPrefix(line, prefix)
				}

				parsedLine, err := p.lineParser.ParseLine(line)
				if err != nil {
					// Leave a trace of why the line was dropped instead of
					// discarding the error.
					logrus.WithFields(logrus.Fields{
						"line":  line,
						"error": err,
					}).Debug("skipping line, failed to parse")
					continue
				}
				if parsedLine == nil {
					// nil fields without an error: the line parser chose to
					// skip this line.
					continue
				}
				if len(parsedLine) == 0 {
					logrus.WithFields(logrus.Fields{
						"line": line,
					}).Info("skipping line, no values found")
					continue
				}

				// merge the prefix fields and the parsed line contents
				for k, v := range prefixFields {
					parsedLine[k] = v
				}

				// The timestamp should be in the log line for both RFC 3164
				// and RFC 5424; if it is missing or not a time.Time the
				// event carries the zero time.
				var timestamp time.Time
				if t, ok := parsedLine["timestamp"].(time.Time); ok {
					timestamp = t
				}

				send <- event.Event{
					Timestamp: timestamp,
					Data:      parsedLine,
				}
			}
		}()
	}
	wg.Wait()
	logrus.Debug("lines channel is closed, ending syslog processor")
}