forked from bshuster-repo/logrus-logstash-hook
-
Notifications
You must be signed in to change notification settings - Fork 0
/
hook.go
319 lines (272 loc) · 8.07 KB
/
hook.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
package logrustash
import (
"fmt"
"io"
"net"
"os"
"runtime"
"runtime/debug"
"strings"
"sync"
"time"
"github.com/samber/lo"
"github.com/sirupsen/logrus"
)
// defaultLogrusEntryFireChannelBufferSize is the capacity given to a hook's
// entry queue when the caller does not supply a positive
// HookOptions.FireChannelBufferSize.
const defaultLogrusEntryFireChannelBufferSize = 8192
// ContextKey is the type of the keys this package looks up in an entry's
// context.
type ContextKey string

// ContextKeyRuntimeCaller is the context key under which copyEntry expects to
// find a *runtime.Frame describing the caller of the log statement.
const ContextKeyRuntimeCaller ContextKey = "context.key.runtime.caller"
// Hook represents a Logstash hook.
//
// Entries passed to Fire are queued on logrusEntryFireChannel; a goroutine
// started by New takes them off the queue, serializes each one with formatter
// and writes the result to conn.
//
// To initialize it use the `New` function.
type Hook struct {
	// The embedded lock is taken by send around each write to conn and by
	// reconnect while conn is replaced.
	sync.RWMutex

	// conn is the writer the formatted entries are written to. New and
	// reconnect set it to the result of net.Dial.
	conn io.Writer

	// protocol is the network passed to net.Dial (also used when redialing).
	protocol string

	// addr is the address passed to net.Dial (also used when redialing).
	addr string

	// logrusEntryFireChannel is the buffered queue between Fire (sender) and
	// the goroutine started in New (receiver).
	logrusEntryFireChannel chan *logrus.Entry

	// formatter turns an entry into the bytes that are written to conn.
	formatter logrus.Formatter
}
// HookOptions carries the optional settings accepted by New.
type HookOptions struct {
	// KeepAlive enables TCP keepalive.
	KeepAlive bool
	// KeepAlivePeriod sets the TCP keepalive period. Values that are not
	// positive select the default returned by GetKeepAlivePeriod.
	KeepAlivePeriod time.Duration
	// FireChannelBufferSize sets the size of the logrus entry fire channel.
	FireChannelBufferSize int
}

// GetKeepAlivePeriod returns the configured keepalive period, or 30 seconds
// when KeepAlivePeriod is zero or negative.
func (h HookOptions) GetKeepAlivePeriod() time.Duration {
	const fallback = 30 * time.Second
	if h.KeepAlivePeriod <= 0 {
		return fallback
	}
	return h.KeepAlivePeriod
}
// GetFireChannelBufferSize returns the configured capacity of the entry queue,
// or defaultLogrusEntryFireChannelBufferSize (8192) when FireChannelBufferSize
// is zero or negative.
func (h HookOptions) GetFireChannelBufferSize() int {
	size := h.FireChannelBufferSize
	if size <= 0 {
		size = defaultLogrusEntryFireChannelBufferSize
	}
	return size
}
// New dials addr over protocol and returns a logrus.Hook that ships entries to
// Logstash through a background goroutine.
//
// f is the formatter used to serialize each entry before it is written.
// Only the first element of opts is consulted; without it the entry queue gets
// the default capacity and keepalive is left untouched. Keepalive settings are
// applied only when the dialled connection is a *net.TCPConn.
//
// An error is returned when protocol or addr is empty, when dialling fails, or
// when a keepalive option cannot be applied. In the last case the dialled
// connection is closed before returning.
//
// The goroutine started here ranges over the hook's entry channel; nothing in
// New closes that channel.
func New(protocol, addr string, f logrus.Formatter, opts ...HookOptions) (logrus.Hook, error) {
	if protocol == "" || addr == "" {
		return nil, fmt.Errorf("protocol and addr must be set")
	}

	conn, err := net.Dial(protocol, addr)
	if err != nil {
		return nil, err
	}

	bufferSize := defaultLogrusEntryFireChannelBufferSize
	if len(opts) > 0 {
		opt := opts[0]
		if opt.KeepAlive {
			if c, ok := conn.(*net.TCPConn); ok && c != nil {
				if err := c.SetKeepAlive(true); err != nil {
					// The hook is never returned on this path, so nobody
					// else could close the socket.
					_ = conn.Close()
					return nil, err
				}
				if err := c.SetKeepAlivePeriod(opt.GetKeepAlivePeriod()); err != nil {
					_ = conn.Close()
					return nil, err
				}
			}
		}
		bufferSize = opt.GetFireChannelBufferSize()
	}

	h := &Hook{
		protocol:               protocol,
		addr:                   addr,
		conn:                   conn,
		formatter:              f,
		logrusEntryFireChannel: make(chan *logrus.Entry, bufferSize),
	}

	// deliver sends a single entry. The recover is scoped to one entry so that
	// a panic while formatting or sending it does not end the consumer loop;
	// if the loop ended, the queue would fill up and Fire would block forever.
	deliver := func(e *logrus.Entry) {
		defer func() {
			if r := recover(); r != nil {
				fmt.Fprintf(os.Stderr, "panic in logrus entry fire channel: %v\n", r)
				debug.PrintStack()
			}
		}()
		if err := h.fire(e); err != nil {
			fmt.Fprintf(os.Stderr, "failed to send log to logstash, error: %v\n", err)
		}
	}

	go func() {
		for e := range h.logrusEntryFireChannel {
			deliver(e)
		}
	}()

	return h, nil
}
// reconnect redials the Logstash endpoint and installs the new connection on
// the hook.
//
// Dialling is retried through lo.AttemptWithDelay with an attempt limit of 0
// and a 5 second delay; each failed attempt is reported on stderr.
// NOTE(review): a limit of 0 is understood to mean "retry until success" in
// samber/lo, which makes this call block until the endpoint is reachable;
// confirm against the lo version in use.
//
// Once a new connection is installed, the one it replaces is closed if it
// implements io.Closer, so repeated reconnects do not accumulate open sockets.
// Keepalive options applied by New are not re-applied to the new connection.
func (h *Hook) reconnect() {
	fmt.Fprintln(os.Stderr, "failed to send log entry to logstash, reconnecting...")

	// The returned attempt count, duration and error are intentionally
	// discarded: failures are already reported inside the callback.
	_, _, _ = lo.AttemptWithDelay(0, time.Second*5, func(index int, duration time.Duration) error {
		conn, err := net.Dial(h.protocol, h.addr)
		if err != nil {
			fmt.Fprintf(os.Stderr, "failed to reconnect to logstash, error: %s (current attempt %d)\n", err, index+1)
			return err
		}

		h.Lock()
		previous := h.conn
		h.conn = conn
		h.Unlock()

		// Close outside the lock; the old connection is no longer reachable
		// through the hook at this point. A close error on a connection that
		// already failed carries no useful information, so it is ignored.
		if closer, ok := previous.(io.Closer); ok {
			_ = closer.Close()
		}
		return nil
	})
}
// processSendError decides how to recover from a failed write of data.
//
//   - err is not a net.Error: it is returned unchanged.
//   - err is a timeout: the failure is reported on stderr and data is sent again.
//   - any other net.Error: the hook reconnects first, then data is sent again.
//
// The retry goes through send, which calls back into processSendError on
// failure; there is no bound on the number of retries.
func (h *Hook) processSendError(err error, data []byte) error {
	netErr, isNetErr := err.(net.Error)
	switch {
	case !isNetErr:
		return err
	case netErr.Timeout():
		fmt.Fprintf(os.Stderr, "failed to send log entry to logstash, error: %s, resending...\n", err)
	default:
		h.reconnect()
	}
	return h.send(data)
}
// send writes data to the current connection while holding the hook's lock.
// A write error is passed to processSendError, whose result is returned.
//
// The lock is released before processSendError runs because the recovery path
// (reconnect, and the retry through send) acquires the same lock.
func (h *Hook) send(data []byte) error {
	h.Lock()
	_, writeErr := h.conn.Write(data)
	h.Unlock()

	if writeErr == nil {
		return nil
	}
	return h.processSendError(writeErr, data)
}
// fire serializes e with the hook's formatter and sends the resulting bytes.
// It returns the formatter's error if formatting fails, otherwise the result
// of send.
func (h *Hook) fire(e *logrus.Entry) error {
	payload, err := h.formatter.Format(e)
	if err == nil {
		err = h.send(payload)
	}
	return err
}
// Fire hands the entry to the hook for delivery to Logstash.
//
// When the hook's entry channel exists, e is queued on it and nil is returned;
// formatting and sending happen later on the goroutine started by New, so
// delivery errors are not reported to the caller. The send on the channel
// blocks while the channel's buffer is full.
//
// When the channel is nil, a notice is printed on stderr and the entry is
// formatted and sent synchronously; the error from that attempt is returned.
//
// NOTE(review): e is passed to another goroutine without being copied; confirm
// that logrus does not reuse or modify the entry after its hooks return.
func (h *Hook) Fire(e *logrus.Entry) error {
	if h.logrusEntryFireChannel == nil {
		fmt.Fprintln(os.Stderr, "logrus entry fire channel is not initialized or closed")
		return h.fire(e)
	}

	h.logrusEntryFireChannel <- e
	return nil
}
// Levels returns the log levels this hook applies to. It returns
// logrus.AllLevels, i.e. the hook does not filter by level.
func (h *Hook) Levels() []logrus.Level {
	return logrus.AllLevels
}
// entryPool recycles the scratch *logrus.Entry values used while formatting
// Logstash messages. copyEntry takes entries from the pool and releaseEntry
// puts them back.
var entryPool = sync.Pool{
	New: func() interface{} {
		return new(logrus.Entry)
	},
}
// copyEntry builds the entry that is handed to the wrapped formatter, leaving
// e and e.Data unmodified.
//
// It takes an entry from entryPool, copies Message, Level and Time from e, and
// fills a fresh Data map in this order:
//
//  1. When e.Logger has ReportCaller set and e.Context carries a
//     *runtime.Frame under ContextKeyRuntimeCaller, "function" and "file"
//     ("<file>:<line>") are taken from that frame.
//  2. When ReportCaller is set, non-nil "file" and "function" values in e.Data
//     are copied as-is and replace the values from step 1.
//  3. Every other key of e.Data is rendered as "key=value" and the pairs are
//     joined with a single space under the "fields" key. The pairs follow map
//     iteration order, so their order is not stable between calls.
//  4. Every key in fields is written last and replaces a same-named key set
//     by the earlier steps.
//
// A nil e.Logger is treated as ReportCaller being off. The returned entry must
// be handed to releaseEntry once it is no longer needed.
func copyEntry(e *logrus.Entry, fields logrus.Fields) *logrus.Entry {
	ne := entryPool.Get().(*logrus.Entry)
	ne.Message = e.Message
	ne.Level = e.Level
	ne.Time = e.Time
	// Room for fields plus "file", "function" and "fields".
	ne.Data = make(logrus.Fields, len(fields)+3)

	reportCaller := e.Logger != nil && e.Logger.ReportCaller

	if reportCaller && e.Context != nil {
		if caller, _ := e.Context.Value(ContextKeyRuntimeCaller).(*runtime.Frame); caller != nil {
			ne.Data["function"] = caller.Function
			ne.Data["file"] = fmt.Sprintf("%s:%d", caller.File, caller.Line)
		}
	}

	// e.Data is only read here. This function may run on the hook's background
	// goroutine while the logger is still using the entry, and Format promises
	// not to change the entry it is given, so nothing is deleted from e.Data.
	pairs := make([]string, 0, len(e.Data))
	for k, v := range e.Data {
		if reportCaller && v != nil && (k == "file" || k == "function") {
			ne.Data[k] = v
			continue
		}
		pairs = append(pairs, fmt.Sprintf("%s=%v", k, v))
	}
	if len(pairs) > 0 {
		ne.Data["fields"] = strings.Join(pairs, " ")
	}

	for k, v := range fields {
		ne.Data[k] = v
	}
	return ne
}
// releaseEntry puts the given entry back to `entryPool`. It must be called if
// copyEntry is called, and e must not be used afterwards.
//
// The Data map and the message are cleared first so that an entry sitting idle
// in the pool does not keep the logged values reachable.
func releaseEntry(e *logrus.Entry) {
	e.Data = nil
	e.Message = ""
	entryPool.Put(e)
}
// LogstashFormatter represents a Logstash format.
//
// It embeds a logrus.Formatter, which performs the serialization, and a
// logrus.Fields map whose entries are written into the data of every entry
// before it is formatted. A key in Fields replaces a same-named key derived
// from the entry (see copyEntry).
//
// Note: use the `DefaultFormatter` function to set a default Logstash formatter.
type LogstashFormatter struct {
	logrus.Formatter
	logrus.Fields
}
var (
	// logstashFields holds the fields DefaultFormatter adds to the caller's
	// map when the map does not already contain them.
	logstashFields = logrus.Fields{"@version": "1", "type": "log"}

	// logstashFieldMap is the FieldMap DefaultFormatter passes to
	// logrus.JSONFormatter; it maps the time key to "@timestamp" and the
	// message key to "message".
	logstashFieldMap = logrus.FieldMap{
		logrus.FieldKeyTime: "@timestamp",
		logrus.FieldKeyMsg:  "message",
	}
)
// DefaultFormatter returns a default Logstash formatter: a JSON format with
// "@version" set to "1" and "type" set to "log" (each unless already present
// in `fields`), "@timestamp" set to the log time in RFC 3339 format with
// nanoseconds, and "message" set to the log message.
//
// fields may be nil, in which case only the defaults are used. A non-nil
// fields map is modified in place (missing defaults are added to it) and is
// retained by the returned formatter.
//
// Note: to set a different configuration use the `LogstashFormatter` structure.
func DefaultFormatter(fields logrus.Fields) logrus.Formatter {
	// Assigning into a nil map panics, so give nil callers a map of their own.
	if fields == nil {
		fields = make(logrus.Fields, len(logstashFields))
	}
	for k, v := range logstashFields {
		if _, ok := fields[k]; !ok {
			fields[k] = v
		}
	}

	return LogstashFormatter{
		Formatter: &logrus.JSONFormatter{
			TimestampFormat: time.RFC3339Nano,
			FieldMap:        logstashFieldMap,
		},
		Fields: fields,
	}
}
// Format renders e in the Logstash format.
//
// A scratch entry is built from e and f.Fields by copyEntry, serialized with
// the embedded Formatter, and then returned to the pool with releaseEntry.
// The bytes and error produced by the embedded Formatter are returned as-is.
func (f LogstashFormatter) Format(e *logrus.Entry) ([]byte, error) {
	scratch := copyEntry(e, f.Fields)
	out, formatErr := f.Formatter.Format(scratch)
	releaseEntry(scratch)
	return out, formatErr
}