forked from bitdabbler/fluent
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathencoder.go
162 lines (134 loc) · 3.81 KB
/
encoder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package fluent
import (
"bytes"
"fmt"
"sync"
"time"
"github.com/vmihailenco/msgpack/v5"
)
// Pool defines a shared *Encoder pool, used to minimize heap allocations.
type EncoderPool struct {
p sync.Pool
*EncoderOptions
prelude []byte
}
// NewEncoderPool creates a shared *Encoder pool that returns Encoders with the
// log prelude, including the outer msgpack array and the tag, pre-encoded.
func NewEncoderPool(tag string, opts *EncoderOptions) (*EncoderPool, error) {
if opts == nil {
opts = DefaultEncoderOptions()
} else {
opts.resolve()
}
p := &EncoderPool{EncoderOptions: opts}
// pre-encode the msgpack array header and the tag
e := NewEncoder(opts.NewBufferCap)
var arrayLen int
switch opts.Mode {
case MessageMode:
arrayLen = 3
if opts.RequestACKs {
arrayLen++
}
case ForwardMode:
arrayLen = 2
if opts.RequestACKs {
arrayLen++
}
case PackedForwardMode, CompressedPackedForwardMode:
arrayLen = 3
}
if err := e.EncodeArrayLen(arrayLen); err != nil {
return nil, fmt.Errorf("failed to encode NewEncoder array len: %w", err)
}
if err := e.EncodeString(tag); err != nil {
InternalLogger().Printf("failed to encode NewEncoder tag: %v", err)
}
p.prelude = e.Bytes()
p.p = sync.Pool{
New: func() any {
enc := NewEncoder(opts.NewBufferCap)
// copy in the prelude and point to pool (w/ encoding options)
enc.Write(p.prelude)
enc.p = p
return enc
},
}
return p, nil
}
// Get returns an Encoder with the prelude pre-rendered.
func (p *EncoderPool) Get() *Encoder {
e := p.p.Get().(*Encoder)
return e
}
// Put resets an Encoder and returns it to the shared pool.
func (p *EncoderPool) Put(e *Encoder) {
// drop if the buffer got too large
if e.Buffer.Cap() > p.MaxBufferCap {
return
}
// reset for the next usage
e.Buffer.Truncate(len(p.prelude))
e.Encoder.Reset(e.Buffer)
// add back to the sync.Pool
p.p.Put(e)
}
// Encoder provides a mspgack encoder and its underlying bytes.Buffer.
type Encoder struct {
*bytes.Buffer
*msgpack.Encoder
p *EncoderPool
}
// NewEncoder returns a newly allocated Encoder.
func NewEncoder(bufferCap int) *Encoder {
buf := bytes.NewBuffer(make([]byte, 0, bufferCap))
return &Encoder{
Buffer: buf,
Encoder: msgpack.NewEncoder(buf),
}
}
// Free returns the encoder to the shared pool after eagerly resetting it.
func (e *Encoder) Free() {
e.p.Put(e)
}
// DeepCopy returns a deep copy of the Encoder.
func (e *Encoder) DeepCopy() *Encoder {
// pooled Encoder with pre-rendered prelude
if e.p != nil {
e2 := e.p.Get()
if e.Cap() > e2.Cap() {
e2.Grow(e.Cap())
}
e2.Write(e.Bytes()[len(e.p.prelude):])
return e2
}
// raw Encoder with no prelude
e2 := NewEncoder(e.Cap())
e2.Write(e.Bytes())
return e2
}
// EncodeTimestamp is a helper that by default encodes a time value as a custom
// msgpack type defined by Fluent (EventTime). If the Encoder is set to use
// coarse timestamps, then it encodes the time value as 64-bit integer
// representing Unix epoch.
func (e *Encoder) EncodeEventTime(utc time.Time) error {
// no timezone support in Fluent spec; ensure time is in UTC
utc = utc.In(time.UTC)
if e.p.UseCoarseTimestamps {
if err := e.EncodeInt64(utc.Unix()); err != nil {
return fmt.Errorf("failed to encode timestamp as int64: %w", err)
}
return nil
}
t := EventTime(utc)
if err := e.Encode(&t); err != nil {
return fmt.Errorf("failed to encode timestamp as EventTime: %w", err)
}
return nil
}
// Mode returns the Fluent event mode of the Encoder.
func (e *Encoder) Mode() eventMode { return e.p.Mode }
// UseCoarseTimestamps controls whether legacy (unix epoch) timestamps are used.
func (e *Encoder) UseCoarseTimestamps() bool { return e.p.UseCoarseTimestamps }
// RequestACK controls whether explicit ACKS are requested from the server.
func (e *Encoder) RequestACK() bool { return e.p.RequestACKs }