Skip to content

Commit

Permalink
feat: add structured query (#45)
Browse files Browse the repository at this point in the history
* feat: use same API as in C# and JS

* feat: ValueAsPoint

* feat: use same API as in C# and JS

* fix: linter

* fix: tags

* feat: add downsampling example

* docs: update CHANGELOG.md

* chore: add field values

* chore: add field values

* chore: add field values

* chore: add field values

* chore: add field values

* chore: add field values

---------

Co-authored-by: Sciator <[email protected]>
  • Loading branch information
bednar and Sciator authored Nov 3, 2023
1 parent 3cede62 commit c262f3e
Show file tree
Hide file tree
Showing 13 changed files with 754 additions and 153 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
## 0.4.0 [unreleased]

### Features

1. [#45](https://github.com/InfluxCommunity/influxdb3-go/pull/45): Add structured query support

### Docs

1. [#45](https://github.com/InfluxCommunity/influxdb3-go/pull/45): Add downsampling example

## 0.3.0 [2023-10-02]

### Features
Expand Down
113 changes: 113 additions & 0 deletions example/Downsampling/downsampling.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package main

import (
"context"
"fmt"
"github.com/apache/arrow/go/v13/arrow"
"time"

"github.com/InfluxCommunity/influxdb3-go/influxdb3"
)

func main() {
url := "https://us-east-1-1.aws.cloud2.influxdata.com"
token := "my-token"
database := "my-database"

client, err := influxdb3.New(influxdb3.ClientConfig{
Host: url,
Token: token,
Database: database,
})

if err != nil {
panic(err)
}
// Close client at the end and escalate error if present
defer func(client *influxdb3.Client) {
err := client.Close()
if err != nil {
panic(err)
}
}(client)

//
// Write data
//
err = client.WritePoints(context.Background(), influxdb3.NewPointWithMeasurement("stat").
SetTag("unit", "temperature").
SetDoubleField("avg", 23.2).
SetDoubleField("max", 45.0))
if err != nil {
panic(err)
}
time.Sleep(1 * time.Second)

err = client.WritePoints(context.Background(), influxdb3.NewPointWithMeasurement("stat").
SetTag("unit", "temperature").
SetDoubleField("avg", 28.0).
SetDoubleField("max", 40.3))
if err != nil {
panic(err)
}
time.Sleep(1 * time.Second)

err = client.WritePoints(context.Background(), influxdb3.NewPointWithMeasurement("stat").
SetTag("unit", "temperature").
SetDoubleField("avg", 23.2).
SetDoubleField("max", 45.0))
if err != nil {
panic(err)
}
time.Sleep(1 * time.Second)

//
// Query Downsampled data
//
query := `
SELECT
date_bin('5 minutes', "time") as window_start,
AVG("avg") as avg,
MAX("max") as max
FROM "stat"
WHERE
"time" >= now() - interval '1 hour'
GROUP BY window_start
ORDER BY window_start ASC;
`

//
// Execute downsampling query into PointValues
//
iterator, err := client.Query(context.Background(), query)
if err != nil {
panic(err)
}

for iterator.Next() {
row := iterator.AsPoints()
timestamp := int64(row.GetField("window_start").(arrow.Timestamp))

avgValue := row.GetDoubleField("avg")
maxValue := row.GetDoubleField("max")
fmt.Printf("%s: avg is %.2f, max is %.2f\n", time.Unix(0, timestamp), *avgValue, *maxValue)

//
// write back downsampled date to 'stat_downsampled' measurement
//
downsampledPoint, err := row.AsPointWithMeasurement("stat_downsampled")
if err != nil {
panic(err)
}

downsampledPoint = downsampledPoint.
RemoveField("window_start").
SetTimestampWithEpoch(timestamp)

err = client.WritePoints(context.Background(), downsampledPoint)
if err != nil {
panic(err)
}
}

}
6 changes: 3 additions & 3 deletions example/main.go → example/IOx/iox.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ func main() {
}
// Create point using fluent style
p = influxdb3.NewPointWithMeasurement("stat").
AddTag("unit", "temperature").
AddField("avg", 23.2).
AddField("max", 45.0).
SetTag("unit", "temperature").
SetField("avg", 23.2).
SetField("max", 45.0).
SetTimestamp(time.Now())
// write point synchronously
err = client.WritePoints(context.Background(), p)
Expand Down
4 changes: 4 additions & 0 deletions example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Examples

- [IOxExample](IOx/iox.go) - How to use write and query data from InfluxDB IOx
- [Downsampling](Downsampling/downsampling.go) - How to use queries to structure data for downsampling
26 changes: 19 additions & 7 deletions influxdb3/client_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,13 @@ func TestWriteAndQueryExample(t *testing.T) {
// Write test

p := influxdb3.NewPointWithMeasurement(tableName).
AddTag(tagKey, tagValue).
AddField("temp", 15.5).
AddField("index", 80).
AddField("uindex", uint64(800)).
AddField("valid", true).
AddField("testId", testId).
AddField("text", "a1").
SetTag(tagKey, tagValue).
SetField("temp", 15.5).
SetField("index", 80).
SetField("uindex", uint64(800)).
SetField("valid", true).
SetField("testId", testId).
SetField("text", "a1").
SetTimestamp(now)
err = client.WritePoints(context.Background(), p)
require.NoError(t, err)
Expand Down Expand Up @@ -134,6 +134,18 @@ func TestWriteAndQueryExample(t *testing.T) {

assert.False(t, iterator.Next())
assert.True(t, iterator.Done())

iterator, err = client.Query(context.Background(), query)
hasValue = iterator.Next()
assert.True(t, hasValue)
points := iterator.AsPoints()
assert.Equal(t, uint64(800), points.Fields["uindex"])

hasValue = iterator.Next()
assert.True(t, hasValue)

newPoint, _ := points.AsPointWithMeasurement("to_write")
assert.True(t, newPoint != nil)
}

func TestQueryDatabaseDoesNotExist(t *testing.T) {
Expand Down
Loading

0 comments on commit c262f3e

Please sign in to comment.