Skip to content

Commit

Permalink
test: cleanup code in new integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
karel-rehor committed Nov 5, 2024
1 parent 269c794 commit 01a7f99
Showing 1 changed file with 32 additions and 18 deletions.
50 changes: 32 additions & 18 deletions influxdb3/client_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ package influxdb3_test

import (
"context"
"errors"
"fmt"
"log/slog"
"math"
Expand All @@ -47,7 +46,6 @@ import (
)

func SkipCheck(t *testing.T) {

if _, present := os.LookupEnv("TESTING_INFLUXDB_URL"); !present {
t.Skip("TESTING_INFLUXDB_URL not set")
}
Expand Down Expand Up @@ -379,13 +377,13 @@ func PointFromLineProtocol(lp string) (*influxdb3.Point, error) {
fieldLines := strings.Split(groups[1], ",")

if len(head) < 1 {
return nil, errors.New(fmt.Sprintf("invalid line format: %s", lp))
return nil, fmt.Errorf("invalid line format: %s", lp)
}

result := influxdb3.NewPointWithMeasurement(head[0])

if len(fieldLines) < 1 {
return nil, errors.New(fmt.Sprintf("LineProtocol has no fields: %s", lp))
return nil, fmt.Errorf("LineProtocol has no fields: %s", lp)
}

if len(head) > 1 {
Expand All @@ -397,16 +395,17 @@ func PointFromLineProtocol(lp string) (*influxdb3.Point, error) {

for _, fl := range fieldLines {
fkv := strings.Split(fl, "=")
if strings.Contains(fkv[1], "\"") {
switch {
case strings.Contains(fkv[1], "\""):
result.SetStringField(fkv[0], fkv[1])
} else if strings.Contains(fkv[1], "i") {
fkv[1] = strings.Replace(fkv[1], "i", "", -1)
case strings.Contains(fkv[1], "i"):
fkv[1] = strings.ReplaceAll(fkv[1], "i", "")
ival, err := strconv.ParseInt(fkv[1], 10, 64)
if err != nil {
return nil, err
}
result = result.SetField(fkv[0], ival)
} else {
default:
fval, err := strconv.ParseFloat(fkv[1], 64)
if err != nil {
return nil, err
Expand All @@ -418,9 +417,9 @@ func PointFromLineProtocol(lp string) (*influxdb3.Point, error) {
if len(groups[2]) > 0 {
timestamp, err := strconv.ParseInt(groups[2], 10, 64)
nanoFactor := int64(19 - len(groups[2]))
timestamp = timestamp * int64(math.Pow(10.0, float64(nanoFactor)))
timestamp *= int64(math.Pow(10.0, float64(nanoFactor)))
if err != nil {
return nil, errors.New(fmt.Sprintf("invalid time format: %s -> %s", lp, err))
return nil, fmt.Errorf("invalid time format: %s -> %w", lp, err)
}
result = result.SetTimestampWithEpoch(timestamp)
result = result.SetTimestamp(result.Values.Timestamp.UTC())
Expand All @@ -446,15 +445,30 @@ func LooseEqualPointValues(pvA *influxdb3.PointValues, pvB *influxdb3.PointValue
for fieldName := range pvA.Fields {
switch pvA.Fields[fieldName].(type) {
case int, int16, int32, int64:
if pvA.Fields[fieldName].(int64) != pvB.Fields[fieldName].(int64) {
ai, aiok := pvA.Fields[fieldName].(int64)
bi, biok := pvB.Fields[fieldName].(int64)
if !aiok || !biok {
return false
}
if ai != bi {
return false
}
case float32, float64:
if pvA.Fields[fieldName].(float64) != pvB.Fields[fieldName].(float64) {
af, afok := pvA.Fields[fieldName].(float64)
bf, bfok := pvB.Fields[fieldName].(float64)
if !afok || !bfok {
return false
}
if af != bf {
return false
}
default: // compare as strings
as, saok := pvA.Fields[fieldName].(string)
bs, sbok := pvB.Fields[fieldName].(string)
if !saok || !sbok {
return false
}
default: //compare as strings
if pvA.Fields[fieldName].(string) != pvB.Fields[fieldName].(string) {
if as != bs {
return false
}
}
Expand Down Expand Up @@ -498,7 +512,7 @@ func TestLPBatcher(t *testing.T) {
ids[n%len(ids)],
(rnd.Float64()*100)-50.0, n+1, now-int64(n*1000)))
if n%2 == 0 {
lines[n] = lines[n] + "\n" // verify appending LF with every second rec
lines[n] += "\n" // verify appending LF with every second rec
} else {
estBytesCt++ // LPBatcher appends missing "\n" on odd cases so increase estimate
}
Expand Down Expand Up @@ -530,13 +544,13 @@ func TestLPBatcher(t *testing.T) {
}))

sent := 0
for n, _ := range lines {
for n := range lines {
if n%100 == 0 {
lpb.Add(lines[sent : sent+100]...)
sent += 100
}
}
lpb.Add(lines[sent:len(lines)]...) // add remainder
lpb.Add(lines[sent:]...) // add remainder

// Check that collected emits make sense
assert.Equal(t, readyCt, emitCt)
Expand All @@ -558,7 +572,7 @@ func TestLPBatcher(t *testing.T) {
qiterator, qerr := client.Query(context.Background(), query)

if qerr != nil {
fmt.Printf("ERROR %v\n", qerr)
assert.Failf(t, "Failed to query.", "query: %s", query)
}

var pvResults []*influxdb3.PointValues
Expand Down

0 comments on commit 01a7f99

Please sign in to comment.