-
Notifications
You must be signed in to change notification settings - Fork 125
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PIVOT ON support #3390
PIVOT ON support #3390
Changes from 14 commits
ca628b5
137367e
0b48abe
a5b60db
9b89a0e
a1ff2df
78cc055
fe28fd6
55aa9bc
17df06e
32cc6a7
0303594
f20e289
5c4988f
79f78fb
2bc33c3
7a2233f
fb995ec
fa470dc
a24dd5e
18ccdd5
a879eb0
b3f4302
84a8fe0
9a38291
1a70bb4
e279681
98dd97f
815ba63
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,15 +2,28 @@ package queries | |
|
||
import ( | ||
"context" | ||
databasesql "database/sql" | ||
"database/sql/driver" | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"io" | ||
"strings" | ||
"sync" | ||
|
||
"github.com/marcboeker/go-duckdb" | ||
runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" | ||
"github.com/rilldata/rill/runtime" | ||
"github.com/rilldata/rill/runtime/drivers" | ||
duckdbolap "github.com/rilldata/rill/runtime/drivers/duckdb" | ||
"github.com/rilldata/rill/runtime/pkg/activity" | ||
"go.uber.org/zap" | ||
) | ||
|
||
var ( | ||
dbOnce sync.Once | ||
handle drivers.Handle | ||
errHandle error | ||
) | ||
|
||
type MetricsViewAggregation struct { | ||
|
@@ -25,6 +38,7 @@ type MetricsViewAggregation struct { | |
Offset int64 `json:"offset,omitempty"` | ||
MetricsView *runtimev1.MetricsViewSpec `json:"-"` | ||
ResolvedMVSecurity *runtime.ResolvedMetricsViewSecurity `json:"security"` | ||
PivotOn []string `json:"pivot_on,omitempty"` | ||
|
||
Result *runtimev1.MetricsViewAggregationResponse `json:"-"` | ||
} | ||
|
@@ -76,14 +90,159 @@ func (q *MetricsViewAggregation) Resolve(ctx context.Context, rt *runtime.Runtim | |
return fmt.Errorf("metrics view '%s' does not have a time dimension", q.MetricsView) | ||
} | ||
|
||
// Build query | ||
sql, args, err := q.buildMetricsAggregationSQL(q.MetricsView, olap.Dialect(), q.ResolvedMVSecurity) | ||
// execute druid query | ||
sqlString, args, err := q.buildMetricsAggregationSQL(q.MetricsView, olap.Dialect(), q.ResolvedMVSecurity) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Comment doesn't match |
||
if err != nil { | ||
return fmt.Errorf("error building query: %w", err) | ||
} | ||
|
||
// Execute | ||
schema, data, err := olapQuery(ctx, olap, priority, sql, args) | ||
if len(q.PivotOn) == 0 { | ||
schema, data, err := olapQuery(ctx, olap, priority, sqlString, args) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
q.Result = &runtimev1.MetricsViewAggregationResponse{ | ||
Schema: schema, | ||
Data: data, | ||
} | ||
return nil | ||
} | ||
|
||
if olap.Dialect() == drivers.DialectDuckDB { | ||
return olap.WithConnection(ctx, priority, false, false, func(ctx context.Context, ensuredCtx context.Context, conn *databasesql.Conn) error { | ||
temporaryTableName := tempName("_for_pivot_") | ||
|
||
err = olap.Exec(ctx, &drivers.Statement{ | ||
Query: fmt.Sprintf("CREATE TEMPORARY TABLE %s AS %s", temporaryTableName, sqlString), | ||
Args: args, | ||
Priority: priority, | ||
}) | ||
if err != nil { | ||
return err | ||
} | ||
defer func() { | ||
_ = olap.Exec(ensuredCtx, &drivers.Statement{ | ||
Query: `DROP TABLE "` + temporaryTableName + `"`, | ||
}) | ||
}() | ||
|
||
return q.pivotOn(ctx, olap, temporaryTableName) | ||
}) | ||
} | ||
dbOnce.Do(func() { | ||
handle, errHandle = duckdbolap.Driver{}.Open(map[string]any{"pool_size": 10}, false, activity.NewNoopClient(), zap.NewNop()) | ||
}) | ||
if errHandle != nil { | ||
return errHandle | ||
} | ||
|
||
schema, data, err := olapQuery(ctx, olap, priority, sqlString, args) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
duckDBOLAP, _ := handle.AsOLAP("") | ||
err = duckDBOLAP.WithConnection(ctx, priority, false, false, func(ctx context.Context, ensuredCtx context.Context, conn *databasesql.Conn) error { | ||
temporaryTableName := tempName("_for_pivot_") | ||
createTableSQL, err := duckdbolap.CreateTableQuery(schema, temporaryTableName) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should avoid using our DuckDB OLAP driver for this kind of ephemeral in-memory transformation. A few reasons:
As discussed previously, I would suggest creating a generic That would also encapsulate the appender logic in a scoped package, and allow easier testing of the code. === An implication for the above is that here, the DuckDB and Druid cases become pretty different (since we want to avoid the overhead of converting from our DuckDB driver to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've run some benchmarks to compare DB initialising vs keeping for a table of 10k rows - 3x slower. My thoughts, yes, there are usually low time requirements for exports, but eventually it's 3x CPU load - though, it's OK until exports is a small part of Runtime CPU consumption (this speculation considers the cloud-based mostly):
package duckdb
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
)
// func initDb(t *testing.B) (*sql.DB, error) {
// connector, err := duckdb.NewConnector("", func(execer driver.ExecerContext) error {
// bootQueries := []string{
// "INSTALL 'icu'",
// "LOAD 'icu'",
// }
// for _, qry := range bootQueries {
// _, err := execer.ExecContext(context.Background(), qry, nil)
// if err != nil {
// return err
// }
// }
// return nil
// })
// if err != nil {
// return nil, err
// }
// db := sql.OpenDB(connector)
// return db, nil
// }
func BenchmarkTestCreate(b *testing.B) {
for i := 0; i < b.N; i++ {
func() {
db, err := initDb(b)
require.NoError(b, err)
defer db.Close()
_, err = db.ExecContext(context.Background(), `
CREATE TEMPORARY TABLE tm as select range from range(timestamptz '2022-01-01', timestamptz '2023-01-01', interval '1 hour')`)
require.NoError(b, err)
r, err := db.Query("select time_bucket(interval '1 minute', range, 'Asia/Kathmandu') as bucket, count(*) as cnt from tm group by bucket")
require.NoError(b, err)
var cnt int
var tm time.Time
for r.Next() {
err = r.Scan(&tm, &cnt)
require.NoError(b, err)
}
r.Close()
_, err = db.ExecContext(context.Background(), `DROP TABLE tm`)
require.NoError(b, err)
}()
}
}
func BenchmarkTestNoCreate(b *testing.B) {
db, err := initDb(b)
require.NoError(b, err)
defer db.Close()
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err = db.ExecContext(context.Background(), `
CREATE TEMPORARY TABLE tm as select range from range(timestamptz '2022-01-01', timestamptz '2023-01-01', interval '1 hour')`)
require.NoError(b, err)
r, err := db.Query("select time_bucket(interval '1 minute', range, 'Asia/Kathmandu') as bucket, count(*) as cnt from tm group by bucket")
require.NoError(b, err)
var cnt int
var tm time.Time
for r.Next() {
err = r.Scan(&tm, &cnt)
require.NoError(b, err)
}
r.Close()
_, err = db.ExecContext(context.Background(), `DROP TABLE tm`)
require.NoError(b, err)
}
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
won't work. The function creates a handle but doesn't return it and it's not closed There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. btw, I cannot pass There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Addressed those. |
||
if err != nil { | ||
return err | ||
} | ||
|
||
err = duckDBOLAP.Exec(ctx, &drivers.Statement{ | ||
Query: createTableSQL, | ||
}) | ||
if err != nil { | ||
return err | ||
} | ||
defer func() { | ||
_ = duckDBOLAP.Exec(ensuredCtx, &drivers.Statement{ | ||
Query: `DROP TABLE "` + temporaryTableName + `"`, | ||
}) | ||
}() | ||
|
||
err = duckdbolap.RawConn(conn, func(conn driver.Conn) error { | ||
appender, err := duckdb.NewAppenderFromConn(conn, "", temporaryTableName) | ||
if err != nil { | ||
return err | ||
} | ||
defer appender.Close() | ||
|
||
batchSize := 10000 | ||
arr := make([]driver.Value, 0, len(schema.Fields)) | ||
count := 0 | ||
for _, row := range data { | ||
for _, key := range schema.Fields { | ||
arr = append(arr, row.Fields[key.Name].AsInterface()) | ||
} | ||
err = appender.AppendRowArray(arr) | ||
if err != nil { | ||
return err | ||
} | ||
arr = arr[:0] | ||
count++ | ||
if count >= batchSize { | ||
appender.Flush() | ||
count = 0 | ||
} | ||
} | ||
Comment on lines
+260
to
+264
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should additionally enforce a limit on the number of cells to be pivoted (i.e. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would be good to implement the same against DuckDB (can use a |
||
appender.Flush() | ||
|
||
return nil | ||
}) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return q.pivotOn(ctx, duckDBOLAP, temporaryTableName) | ||
}) | ||
return err | ||
} | ||
|
||
func (q *MetricsViewAggregation) pivotOn(ctx context.Context, olap drivers.OLAPStore, temporaryTableName string) error { | ||
measureCols := make([]string, 0, len(q.Measures)) | ||
for _, m := range q.Measures { | ||
sn := safeName(m.Name) | ||
measureCols = append(measureCols, fmt.Sprintf("LAST(%s) as %s", sn, sn)) | ||
} | ||
|
||
sortingCriteria := make([]string, 0, len(q.Sort)) | ||
for _, s := range q.Sort { | ||
sortCriterion := safeName(s.Name) | ||
if s.Desc { | ||
sortCriterion += " DESC" | ||
} | ||
if olap.Dialect() == drivers.DialectDuckDB { | ||
sortCriterion += " NULLS LAST" | ||
} | ||
sortingCriteria = append(sortingCriteria, sortCriterion) | ||
} | ||
|
||
orderClause := "" | ||
if len(sortingCriteria) > 0 { | ||
orderClause = "ORDER BY " + strings.Join(sortingCriteria, ", ") | ||
} | ||
|
||
var limitClause string | ||
if q.Limit != nil { | ||
if *q.Limit == 0 { | ||
*q.Limit = 100 | ||
} | ||
limitClause = fmt.Sprintf("LIMIT %d", *q.Limit) | ||
} | ||
|
||
// execute duckdb pivot | ||
// PIVOT t ON year USING LAST(ap) ap; | ||
pivotSQL := fmt.Sprintf("PIVOT %[1]s ON %[2]s USING %[3]s %[4]s %[5]s OFFSET %[6]d", | ||
temporaryTableName, // 1 | ||
strings.Join(q.PivotOn, ", "), // 2 | ||
strings.Join(measureCols, ", "), // 3 | ||
orderClause, // 4 | ||
limitClause, // 5 | ||
q.Offset, // 6 | ||
) | ||
schema, data, err := olapQuery(ctx, olap, int(q.Priority), pivotSQL, nil) | ||
if err != nil { | ||
return err | ||
} | ||
|
@@ -135,11 +294,12 @@ func (q *MetricsViewAggregation) buildMetricsAggregationSQL(mv *runtimev1.Metric | |
return "", nil, errors.New("no dimensions or measures specified") | ||
} | ||
|
||
selectCols := make([]string, 0, len(q.Dimensions)+len(q.Measures)) | ||
cols := len(q.Dimensions) + len(q.Measures) | ||
selectCols := make([]string, 0, cols) | ||
|
||
groupCols := make([]string, 0, len(q.Dimensions)) | ||
unnestClauses := make([]string, 0) | ||
args := []any{} | ||
|
||
for _, d := range q.Dimensions { | ||
// Handle regular dimensions | ||
if d.TimeGrain == runtimev1.TimeGrain_TIME_GRAIN_UNSPECIFIED { | ||
|
@@ -173,13 +333,14 @@ func (q *MetricsViewAggregation) buildMetricsAggregationSQL(mv *runtimev1.Metric | |
} | ||
|
||
for _, m := range q.Measures { | ||
sn := safeName(m.Name) | ||
switch m.BuiltinMeasure { | ||
case runtimev1.BuiltinMeasure_BUILTIN_MEASURE_UNSPECIFIED: | ||
expr, err := metricsViewMeasureExpression(mv, m.Name) | ||
if err != nil { | ||
return "", nil, err | ||
} | ||
selectCols = append(selectCols, fmt.Sprintf("%s as %s", expr, safeName(m.Name))) | ||
selectCols = append(selectCols, fmt.Sprintf("%s as %s", expr, sn)) | ||
Comment on lines
+451
to
+458
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
case runtimev1.BuiltinMeasure_BUILTIN_MEASURE_COUNT: | ||
selectCols = append(selectCols, fmt.Sprintf("COUNT(*) as %s", safeName(m.Name))) | ||
case runtimev1.BuiltinMeasure_BUILTIN_MEASURE_COUNT_DISTINCT: | ||
|
@@ -246,16 +407,30 @@ func (q *MetricsViewAggregation) buildMetricsAggregationSQL(mv *runtimev1.Metric | |
limitClause = fmt.Sprintf("LIMIT %d", *q.Limit) | ||
} | ||
|
||
sql := fmt.Sprintf("SELECT %s FROM %s %s %s %s %s %s OFFSET %d", | ||
strings.Join(selectCols, ", "), | ||
safeName(mv.Table), | ||
strings.Join(unnestClauses, ""), | ||
whereClause, | ||
groupClause, | ||
orderClause, | ||
limitClause, | ||
q.Offset, | ||
) | ||
var sql string | ||
if q.PivotOn != nil { | ||
// select m1, m2, d1, d2 from t, lateral unnest(t.d1) tbl(unnested_d1_) where d1 = 'a' group by d1, d2 | ||
sql = fmt.Sprintf("SELECT %[1]s FROM %[2]s %[3]s %[4]s %[5]s %[6]s %[7]s", | ||
strings.Join(selectCols, ", "), // 1 | ||
safeName(mv.Table), // 2 | ||
strings.Join(unnestClauses, ""), // 3 | ||
whereClause, // 4 | ||
groupClause, // 5 | ||
orderClause, // 6 | ||
limitClause, // 7 | ||
) | ||
} else { | ||
sql = fmt.Sprintf("SELECT %s FROM %s %s %s %s %s %s OFFSET %d", | ||
strings.Join(selectCols, ", "), | ||
safeName(mv.Table), | ||
strings.Join(unnestClauses, ""), | ||
whereClause, | ||
groupClause, | ||
orderClause, | ||
limitClause, | ||
q.Offset, | ||
) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. They seem equivalent apart from the offset clause? I would suggest we return an error if passing |
||
|
||
return sql, args, nil | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an implementation detail we should try to avoid leaking from the driver