Skip to content

Commit

Permalink
feat: (WIP) better handling of []byte in LPBatcher.Emit()
Browse files Browse the repository at this point in the history
  • Loading branch information
karel-rehor committed Oct 23, 2024
1 parent c696b11 commit 45a6b20
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 25 deletions.
15 changes: 12 additions & 3 deletions influxdb3/batching/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ THE SOFTWARE.
package batching

import (
"bytes"
"sync"

"github.com/InfluxCommunity/influxdb3-go/influxdb3"
Expand Down Expand Up @@ -251,14 +252,22 @@ func (b *LPBatcher) Emit() []byte {
b.Lock()
defer b.Unlock()

return b.emitBytes()
packet := b.emitBytes()

if b.callbackEmit != nil {
b.callbackEmit(packet)
}

return packet
}

func (l *LPBatcher) emitBytes() []byte {
c := min(l.size, len(l.buffer))

packet := l.buffer[:c]
l.buffer = l.buffer[c:]
prepacket := l.buffer[:c]
lastLF := bytes.LastIndexByte(prepacket, '\n')
packet := l.buffer[:lastLF]
l.buffer = l.buffer[len(packet):]

return packet
}
Expand Down
44 changes: 22 additions & 22 deletions influxdb3/batching/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ package batching

import (
"fmt"
"math"
"reflect"
"strings"
"sync"
"testing"

Expand Down Expand Up @@ -81,44 +83,42 @@ func TestAddAndDefaultEmitPointDefault(t *testing.T) {
func TestAddAndEmitLineProtocolDefault(t *testing.T) {
batchSize := 1000 // Bytes
capacity := 10000 // Bytes
var emitted bool
var emittedBytes []byte
emitCount := 0
emittedBytes := make([]byte, 0)

lps2emit := make([]string, batchSize)
lps2emit := make([]string, 100)

b := NewLPBatcher(
WithBufferSize(batchSize),
WithBufferCapacity(capacity),
WithEmitBytesCallback(func(b []byte) {
emitted = true
emittedBytes = b
emitCount++
emittedBytes = append(emittedBytes, b...)
}))
fmt.Printf("\nDEBUG b: %+v", b)

for n := range 10 {
for n := range lps2emit {
lps2emit[n] = fmt.Sprintf("lptest,foo=bar count=%di", n+1)
}

b.Add(lps2emit...)

//fmt.Printf("\nDEBUG Inspect b.buffer %s", string(b.buffer))
fmt.Printf("\nDEBUG b.Ready %t", b.Ready())
for i, _ := range lps2emit {
if i > 0 && i%10 == 0 {
set := lps2emit[i-10 : i]
b.Add(set...)
}
}
// add lingering set
b.Add(lps2emit[len(lps2emit)-10:]...)

packet := b.Emit()
fmt.Printf("\nDEBUG Inspect packet %s", string(packet))
verify := strings.Join(lps2emit, "\n")

fmt.Printf("\nDEBUG Inspect emitted %t", emitted)
fmt.Printf("\nDEBUG Inspect emittedBytes %s\n", string(emittedBytes))
assert.False(t, b.Ready())

/*err := b.AddLP(lps2emit...)
if err != nil {
fmt.Println(err)
}
fmt.Printf("\nDEBUG b: %+v", b)
result := b.Emit()
fmt.Printf("\nDEBUG result: %+v", result)
fmt.Printf("\nDEBUG result[0]: %v\n", *result[0]) */
_ = b.Emit() // flush any leftovers - to be collected in callback above

expectCall := math.Ceil(float64(len(emittedBytes)) / float64(batchSize))
assert.Equal(t, int(expectCall), emitCount)
assert.Equal(t, verify, string(emittedBytes))
}

func TestAddAndCallBackEmitPoint(t *testing.T) {
Expand Down

0 comments on commit 45a6b20

Please sign in to comment.