Skip to content

Commit

Permalink
feat: ValueAsPoint
Browse files Browse the repository at this point in the history
  • Loading branch information
Sciator authored and bednar committed Oct 11, 2023
1 parent 00ee884 commit 1d654fd
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 0 deletions.
10 changes: 10 additions & 0 deletions example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/InfluxCommunity/influxdb3-go/influxdb3"
"github.com/influxdata/line-protocol/v2/lineprotocol"
)

func main() {
Expand Down Expand Up @@ -95,6 +96,15 @@ func main() {

fmt.Printf("avg is %f\n", value["avg"])
fmt.Printf("max is %f\n", value["max"])

point := iterator.ValueAsPoint();
lp, err := point.MarshalBinary(lineprotocol.Millisecond);

if (err != nil) {
panic((err))
}

fmt.Print(string(lp))
}

}
54 changes: 54 additions & 0 deletions influxdb3/query_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package influxdb3

import (
"fmt"
"strings"

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
Expand Down Expand Up @@ -94,6 +95,56 @@ func (i *QueryIterator) Next() bool {
return true
}

func (i *QueryIterator) ValueAsPoint() *Point {
readerSchema := i.reader.Schema()
p := NewPointWithMeasurement("__empty__")

for ci, col := range i.record.Columns() {
schema := readerSchema.Field(ci)
name := schema.Name
value, err := getArrowValue(col, i.indexInRecord)
if err != nil {
panic(err)
}
if value == nil {
continue
}

metadataType, hasmetadataType := schema.Metadata.GetValue("iox::column::type")

if stringValue, isString := value.(string); ((name == "measurement") || (name == "iox::measurement")) && isString {
p.Measurement = stringValue
continue
}

if !hasmetadataType {
if timestampValue, isTimestamp := value.(arrow.Timestamp); isTimestamp && name == "time" {
p.SetTimestamp(timestampValue.ToTime(arrow.Nanosecond))
} else {
p.AddField(name, value)
}
continue
}

parts := strings.Split(metadataType, "::")
_, _, valueType := parts[0], parts[1], parts[2]
// fieldType := ""
// if len(parts) >= 4 {
// fieldType = parts[3];
// }

if valueType == "field" {
p.AddField(name, value)
} else if stringValue, isString := value.(string); isString && valueType == "tag" {
p.AddTag(name, stringValue)
} else if timestampValue, isTimestamp := value.(arrow.Timestamp); isTimestamp && valueType == "timestamp" {
p.SetTimestamp(timestampValue.ToTime(arrow.Nanosecond))
}
}

return p
}

// Value returns the current value from the flight reader as a map object.
// The map contains the fields and tags as key-value pairs.
//
Expand Down Expand Up @@ -130,6 +181,9 @@ func (i *QueryIterator) Raw() *flight.Reader {
}

func getArrowValue(arrayNoType arrow.Array, i int) (interface{}, error) {
if arrayNoType.IsNull(i) {
return nil, nil
}
switch arrayNoType.DataType().ID() {
case arrow.NULL:
return nil, nil
Expand Down

0 comments on commit 1d654fd

Please sign in to comment.