Skip to content

Commit

Permalink
feat: (WIP) lp batching - additional tests
Browse files Browse the repository at this point in the history
  • Loading branch information
karel-rehor committed Oct 29, 2024
1 parent 5824190 commit b7183a3
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 33 deletions.
2 changes: 0 additions & 2 deletions influxdb3/batching/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
package batching

import (
"fmt"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -178,7 +177,6 @@ func TestAddLargerThanSize(t *testing.T) {
expectedCt := len(pointSet) / batchSize
assert.Equal(t, expectedCt, emitCt)
assert.Equal(t, loadFactor*batchSize, len(resultSet))
fmt.Printf("DEBUG resultSet %d\n", len(resultSet))
assert.Equal(t, remainder, len(b.points))
assert.Equal(t, pointSet[:len(pointSet)-remainder], resultSet)
}
Expand Down
16 changes: 9 additions & 7 deletions influxdb3/batching/lp_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,16 @@ func (l *LPBatcher) Add(lines ...string) {
}
}

//fmt.Printf("DEBUG after load l.buffer #%s#\n", string(l.buffer))

for l.isReady() {
if l.callbackReady != nil {
l.callbackReady()
}
if l.callbackEmit != nil {
// fmt.Printf("DEBUG calling emit function\n")
l.callbackEmit(l.emitBytes())
//fmt.Printf("DEBUG l.buffer #%s#\n", string(l.buffer))
} else {
// no emitter callback
if l.CurrentLoadSize() > (l.capacity - l.size) {
Expand Down Expand Up @@ -106,17 +110,15 @@ func (l *LPBatcher) Emit() []byte {
func (l *LPBatcher) emitBytes() []byte {
c := min(l.size, len(l.buffer))

if c == 0 { // i.e. buffer is empty
return l.buffer
}

prepacket := l.buffer[:c]
lastLF := bytes.LastIndexByte(prepacket, '\n')
lastLF := bytes.LastIndexByte(prepacket, '\n') + 1

if len(prepacket) < 1 || lastLF < 0 {
return prepacket
}
packet := l.buffer[:lastLF]
l.buffer = l.buffer[len(packet):]
if len(l.buffer) == 1 && l.buffer[0] == '\n' { // removing lingering delimiter
l.buffer = l.buffer[1:]
}

return packet
}
Expand Down
83 changes: 59 additions & 24 deletions influxdb3/batching/lp_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ func TestLPBatcherCreate(t *testing.T) {
}),
)

fmt.Printf("DEBUG l: %v\n", l)

assert.Equal(t, size, l.size)
assert.Equal(t, capacity, l.capacity)
assert.False(t, emitted)
Expand Down Expand Up @@ -84,6 +82,46 @@ func TestLPReadyCallback(t *testing.T) {
assert.True(t, readyCalled)
}

func TestEmitEmptyBatcher(t *testing.T) {
size := 256
capacity := size * 2

lpb := NewLPBatcher(WithSize(size), WithCapacity(capacity))

results := lpb.Emit()

assert.Equal(t, 0, len(results))
}

func TestAddLineAppendsLF(t *testing.T) {
size := 256
capacity := size * 2

lpb := NewLPBatcher(WithSize(size), WithCapacity(capacity))
lines := []string{
"cpu,location=roswell,id=R2D2 fVal=3.14,iVal=42i",
"cpu,location=dyatlov,id=C3PO fVal=2.71,iVal=21i",
"cpu,location=titan,id=HAL69 fVal=1.41,iVal=7i",
}
lpb.Add(lines...)
results := lpb.Emit()
assert.Equal(t, []byte(strings.Join(lines, "\n")+"\n"), results)
}

func TestAddLineAppendsNoLFWhenPresent(t *testing.T) {
size := 256
capacity := size * 2
lpb := NewLPBatcher(WithSize(size), WithCapacity(capacity))
lines := []string{
"cpu,location=roswell,id=R2D2 fVal=3.14,iVal=42i\n",
"cpu,location=dyatlov,id=C3PO fVal=2.71,iVal=21i\n",
"cpu,location=titan,id=HAL69 fVal=1.41,iVal=7i\n",
}
lpb.Add(lines...)
results := lpb.Emit()
assert.Equal(t, []byte(strings.Join(lines, "")), results)
}

func TestLPAddAndPartialEmit(t *testing.T) {
size := 500
capacity := size * 2
Expand All @@ -98,7 +136,8 @@ func TestLPAddAndPartialEmit(t *testing.T) {
lineByteCt += len([]byte(lines[n])) + 1
}

verif := strings.Join(lines, "\n")
verify := strings.Join(lines, "\n")
verify += "\n"

lpb := NewLPBatcher(
WithSize(size),
Expand All @@ -113,7 +152,7 @@ func TestLPAddAndPartialEmit(t *testing.T) {

packet := lpb.Emit()

assert.Equal(t, verif, string(packet))
assert.Equal(t, verify, string(packet))
assert.Equal(t, 0, lpb.CurrentLoadSize())
assert.Equal(t, 0, emitCount) // callback should not have been called
assert.Equal(t, 0, len(emittedBytes)) // callback should not have been called
Expand Down Expand Up @@ -153,6 +192,7 @@ func TestLPAddAndEmitCallBack(t *testing.T) {
lpb.Add(lps2emit[len(lps2emit)-10:]...)

verify := strings.Join(lps2emit, "\n")
verify += "\n"

assert.False(t, lpb.Ready())

Expand Down Expand Up @@ -209,28 +249,22 @@ func TestLPThreadSafety(t *testing.T) {
}

func TestLPAddLargerThanSize(t *testing.T) {
// TODO review test -- appears Emit called too frequently
// Look for leading '\n' in lp.buffer
size := 64
batchSize := 64
loadFactor := 10
capacity := size * loadFactor
capacity := batchSize * loadFactor
remainder := 3
testString := "123456789ABCDEF\n"
stringSet := make([]string, ((size/len(testString))*loadFactor)+remainder)
stringSetByteCt := 0
stringSet := make([]string, ((batchSize/len(testString))*loadFactor)+remainder)
verify := make([]byte, 0)
for ct := range stringSet {
stringSet[ct] = testString
stringSetByteCt += len([]byte(testString))
verify = append(verify, []byte(stringSet[ct])...)
}

fmt.Printf("DEBUG len(stringSet)=%d\n", len(stringSet))
fmt.Printf("DEBUG stringSetByteCount %d\n", stringSetByteCt)
fmt.Printf("DEBUG stringSet: %v\n", stringSet)

emitCt := 0
resultBuffer := make([]byte, 0)
lpb := NewLPBatcher(
WithSize(size),
WithSize(batchSize),
WithCapacity(capacity),
WithEmitBytesCallback(func(ba []byte) {
emitCt++
Expand All @@ -239,13 +273,14 @@ func TestLPAddLargerThanSize(t *testing.T) {

lpb.Add(stringSet...)

results := strings.Split(string(resultBuffer), "\n")
resultsBytes := len(resultBuffer)
fmt.Printf("DEBUG emitCt: %d\n", emitCt)
fmt.Printf("DEBUG resultsBytes: %d\n", resultsBytes)
fmt.Printf("DEBUG len(results): %d\n", len(results))
fmt.Printf("DEBUG results: %s\n", results)
fmt.Printf("DEBUG lpb.CurrentLoadSize: %d\n", lpb.CurrentLoadSize())
fmt.Printf("DEBUG lpb.buffer #%s#\n", string(lpb.buffer))
resultBytes := len(resultBuffer)
assert.Equal(t, len(verify)/batchSize, emitCt, "Emit should be called correct number of times")
assert.Equal(t, batchSize*emitCt, resultBytes,
"ResultBuffer should have size of batchSize * number of emit calls ")
checkBuffer := verify[:batchSize*emitCt]
remainBuffer := verify[batchSize*emitCt:]
assert.Equal(t, checkBuffer, resultBuffer)
assert.Equal(t, len(remainBuffer), lpb.CurrentLoadSize())
assert.Equal(t, remainBuffer, lpb.buffer)

}

0 comments on commit b7183a3

Please sign in to comment.