Skip to content

Commit

Permalink
fix: no panic if i.reader.Next() returns an empty result (#37)
Browse files Browse the repository at this point in the history
* fix: no panic if i.reader.Next() returns an empty res

* fix: cleanup code; unit test

* docs: Update CHANGELOG.md

---------

Co-authored-by: Jakub Bednář <[email protected]>
  • Loading branch information
akvlad and bednar authored Sep 8, 2023
1 parent d7292cc commit 3fa711c
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 1 deletion.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## 0.3.0 [unreleased]

### Bug Fixes

1. [#37](https://github.com/InfluxCommunity/influxdb3-go/pull/37): `runtime error` for iterating Arrow Record without rows

## 0.2.0 [2023-08-11]

### Features
Expand Down
2 changes: 1 addition & 1 deletion influxdb3/query_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (i *QueryIterator) Next() bool {
}
i.indexInRecord++
i.i++
if i.record == nil || i.indexInRecord >= int(i.record.NumRows()) {
for i.record == nil || i.indexInRecord >= int(i.record.NumRows()) {
if !i.reader.Next() {
i.done = true
return false
Expand Down
63 changes: 63 additions & 0 deletions influxdb3/query_iterator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package influxdb3

import (
"bytes"
"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/apache/arrow/go/v13/arrow/flight"
"github.com/apache/arrow/go/v13/arrow/ipc"
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/stretchr/testify/assert"
"testing"
)

type testMessagesReader struct {
r ipc.MessageReader
}

func (r *testMessagesReader) Message() (*ipc.Message, error) {
return r.r.Message()
}
func (r *testMessagesReader) Release() {}
func (r *testMessagesReader) Retain() {}

func TestQueryIteratorEmptyRecord(t *testing.T) {
schema := arrow.NewSchema([]arrow.Field{
{Name: "f1", Type: arrow.PrimitiveTypes.Int32},
}, nil)
var buf bytes.Buffer
writer := ipc.NewWriter(&buf, ipc.WithSchema(schema))

rb := array.NewRecordBuilder(memory.DefaultAllocator, schema)
rec := rb.NewRecord() // first record is empty
err := writer.Write(rec)
assert.Nil(t, err)

rb.Field(0).(*array.Int32Builder).AppendValues([]int32{1}, nil)
rec = rb.NewRecord() // second record is not empty
err = writer.Write(rec)
assert.Nil(t, err)

err = writer.Close()
assert.Nil(t, err)

reader := ipc.NewMessageReader(&buf)

ipcReader, err := ipc.NewReaderFromMessageReader(
&testMessagesReader{
r: reader,
})
assert.Nil(t, err)

fReader := &flight.Reader{Reader: ipcReader}
it := newQueryIterator(fReader)

count := 0
for it.Next() {
assert.Equal(t, 1, it.record.Column(0).(*array.Int32).Len())
assert.Equal(t, int32(1), it.record.Column(0).(*array.Int32).Value(0))
assert.Equal(t, 0, count)
count++
}
assert.Equal(t, 1, count)
}

0 comments on commit 3fa711c

Please sign in to comment.