PIVOT ON support #3390

Merged · 29 commits · Jan 2, 2024
Changes from 14 commits
1,984 changes: 997 additions & 987 deletions proto/gen/rill/runtime/v1/queries.pb.go

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions proto/gen/rill/runtime/v1/runtime.swagger.yaml
@@ -1053,6 +1053,10 @@ paths:
title: Deprecated in favor of time_range
filter:
$ref: '#/definitions/v1MetricsViewFilter'
pivotOn:
type: array
items:
type: string
limit:
type: string
format: int64
@@ -3383,6 +3387,10 @@ definitions:
title: Deprecated in favor of time_range
filter:
$ref: '#/definitions/v1MetricsViewFilter'
pivotOn:
type: array
items:
type: string
limit:
type: string
format: int64
1 change: 1 addition & 0 deletions proto/rill/runtime/v1/queries.proto
@@ -280,6 +280,7 @@ message MetricsViewAggregationRequest {
google.protobuf.Timestamp time_start = 6; // Deprecated in favor of time_range
google.protobuf.Timestamp time_end = 7; // Deprecated in favor of time_range
MetricsViewFilter filter = 8;
repeated string pivot_on = 13;
int64 limit = 9 [(validate.rules).int64.gte = 0];
int64 offset = 10 [(validate.rules).int64.gte = 0];
int32 priority = 11;
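For orientation, here is a hedged sketch of how a client might populate the new field on the generated Go type; only fields visible in the diff above are set, and the values are illustrative:

package main

import (
	"fmt"

	runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
)

func main() {
	// pivot_on in the proto maps to PivotOn in the generated Go struct.
	req := &runtimev1.MetricsViewAggregationRequest{
		PivotOn:  []string{"year"},
		Limit:    100,
		Priority: 1,
	}
	fmt.Println(req.PivotOn)
}
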
6 changes: 3 additions & 3 deletions runtime/drivers/duckdb/transporter_sqlstore_to_duckDB.go
@@ -122,7 +122,7 @@ func (s *sqlStoreToDuckDB) transferFromRowIterator(ctx context.Context, iter dri
p.Target(int64(total), drivers.ProgressUnitRecord)
}
// create table
qry, err := createTableQuery(schema, table)
qry, err := CreateTableQuery(schema, table)
if err != nil {
return err
}
@@ -132,7 +132,7 @@ func (s *sqlStoreToDuckDB) transferFromRowIterator(ctx context.Context, iter dri
}

return s.to.WithConnection(ctx, 1, true, false, func(ctx, ensuredCtx context.Context, conn *sql.Conn) error {
return rawConn(conn, func(conn driver.Conn) error {
return RawConn(conn, func(conn driver.Conn) error {
a, err := duckdb.NewAppenderFromConn(conn, "", table)
if err != nil {
return err
@@ -175,7 +175,7 @@ func (s *sqlStoreToDuckDB) transferFromRowIterator(ctx context.Context, iter dri
})
}

func createTableQuery(schema *runtimev1.StructType, name string) (string, error) {
func CreateTableQuery(schema *runtimev1.StructType, name string) (string, error) {
query := fmt.Sprintf("CREATE OR REPLACE TABLE %s(", safeName(name))
for i, s := range schema.Fields {
i++
2 changes: 1 addition & 1 deletion runtime/drivers/duckdb/utils.go
@@ -13,7 +13,7 @@ import (
)

// rawConn is similar to *sql.Conn.Raw, but additionally unwraps otelsql (which we use for instrumentation).
func rawConn(conn *sql.Conn, f func(driver.Conn) error) error {
func RawConn(conn *sql.Conn, f func(driver.Conn) error) error {
return conn.Raw(func(raw any) error {
// For details, see: https://github.com/XSAM/otelsql/issues/98
Contributor:

This is an implementation detail we should try to avoid leaking from the driver

if c, ok := raw.(interface{ Raw() driver.Conn }); ok {
209 changes: 192 additions & 17 deletions runtime/queries/metricsview_aggregation.go
@@ -2,15 +2,28 @@ package queries

import (
"context"
databasesql "database/sql"
"database/sql/driver"
"encoding/json"
"errors"
"fmt"
"io"
"strings"
"sync"

"github.com/marcboeker/go-duckdb"
runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime"
"github.com/rilldata/rill/runtime/drivers"
duckdbolap "github.com/rilldata/rill/runtime/drivers/duckdb"
"github.com/rilldata/rill/runtime/pkg/activity"
"go.uber.org/zap"
)

var (
dbOnce sync.Once
handle drivers.Handle
errHandle error
)

type MetricsViewAggregation struct {
@@ -25,6 +38,7 @@
Offset int64 `json:"offset,omitempty"`
MetricsView *runtimev1.MetricsViewSpec `json:"-"`
ResolvedMVSecurity *runtime.ResolvedMetricsViewSecurity `json:"security"`
PivotOn []string `json:"pivot_on,omitempty"`

Result *runtimev1.MetricsViewAggregationResponse `json:"-"`
}
@@ -76,14 +90,159 @@ func (q *MetricsViewAggregation) Resolve(ctx context.Context, rt *runtime.Runtim
return fmt.Errorf("metrics view '%s' does not have a time dimension", q.MetricsView)
}

// Build query
sql, args, err := q.buildMetricsAggregationSQL(q.MetricsView, olap.Dialect(), q.ResolvedMVSecurity)
// execute druid query
sqlString, args, err := q.buildMetricsAggregationSQL(q.MetricsView, olap.Dialect(), q.ResolvedMVSecurity)
Contributor:

Comment doesn't match

if err != nil {
return fmt.Errorf("error building query: %w", err)
}

// Execute
schema, data, err := olapQuery(ctx, olap, priority, sql, args)
if len(q.PivotOn) == 0 {
schema, data, err := olapQuery(ctx, olap, priority, sqlString, args)
if err != nil {
return err
}

q.Result = &runtimev1.MetricsViewAggregationResponse{
Schema: schema,
Data: data,
}
return nil
}

if olap.Dialect() == drivers.DialectDuckDB {
return olap.WithConnection(ctx, priority, false, false, func(ctx context.Context, ensuredCtx context.Context, conn *databasesql.Conn) error {
temporaryTableName := tempName("_for_pivot_")

err = olap.Exec(ctx, &drivers.Statement{
Query: fmt.Sprintf("CREATE TEMPORARY TABLE %s AS %s", temporaryTableName, sqlString),
Args: args,
Priority: priority,
})
if err != nil {
return err
}
defer func() {
_ = olap.Exec(ensuredCtx, &drivers.Statement{
Query: `DROP TABLE "` + temporaryTableName + `"`,
})
}()

return q.pivotOn(ctx, olap, temporaryTableName)
})
}
dbOnce.Do(func() {
handle, errHandle = duckdbolap.Driver{}.Open(map[string]any{"pool_size": 10}, false, activity.NewNoopClient(), zap.NewNop())
})
if errHandle != nil {
return errHandle
}

schema, data, err := olapQuery(ctx, olap, priority, sqlString, args)
if err != nil {
return err
}

duckDBOLAP, _ := handle.AsOLAP("")
err = duckDBOLAP.WithConnection(ctx, priority, false, false, func(ctx context.Context, ensuredCtx context.Context, conn *databasesql.Conn) error {
temporaryTableName := tempName("_for_pivot_")
createTableSQL, err := duckdbolap.CreateTableQuery(schema, temporaryTableName)
Contributor:

We should avoid using our DuckDB OLAP driver for this kind of ephemeral in-memory transformation. A few reasons:

  • Our driver has a lot of extra overhead, like loading a bunch of extensions, migrations for catalog tables, emitting DB stats telemetry, etc.
  • It leaks some pretty internal details of the driver into the runtime/queries package (e.g. the RawConn func, which should not be public)
  • Lifecycle management of the DuckDB handle becomes tricky with a global here, since DuckDB doesn't release memory it has taken. Since this is just for exports, it's probably better to open/close a handle on each call initially.

As discussed previously, I would suggest creating a generic runtime/pkg/duckdbpivot package with a Pivot function that takes a sqlx.Rows iterator as an argument, and returns a pivoted sqlx.Rows. Internally, it would use database/sql to directly open an in-memory DuckDB handle to just create the temporary table and do the pivot.

That would also encapsulate the appender logic in a scoped package, and allow easier testing of the code.

===

An implication for the above is that here, the DuckDB and Druid cases become pretty different (since we want to avoid the overhead of converting from our DuckDB driver to duckdbpivot), but since the pivot SQL is pretty simple, I think some duplication is the best way to keep it simple.
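
A minimal sketch of what such a package could look like, under stated assumptions: the package name, signatures, and the cleanup-func return are illustrative, not the final implementation; rows are inserted one by one for brevity where a real version would use the DuckDB appender. The cleanup func anticipates the lifecycle question raised below, since the in-memory handle must outlive the returned rows:

package duckdbpivot

import (
	"context"
	"fmt"
	"strings"

	"github.com/jmoiron/sqlx"
	_ "github.com/marcboeker/go-duckdb" // registers the "duckdb" database/sql driver
)

// Pivot copies rows into a table created by createSQL inside a private
// in-memory DuckDB handle, executes pivotSQL against it, and returns the
// pivoted rows plus a cleanup func. The caller must call cleanup after
// draining the rows, since the handle has to outlive them.
func Pivot(ctx context.Context, rows *sqlx.Rows, tableName, createSQL, pivotSQL string) (*sqlx.Rows, func(), error) {
	db, err := sqlx.Open("duckdb", "") // empty DSN = in-memory database
	if err != nil {
		return nil, nil, err
	}
	cleanup := func() { _ = db.Close() }

	if _, err := db.ExecContext(ctx, createSQL); err != nil {
		cleanup()
		return nil, nil, err
	}

	cols, err := rows.Columns()
	if err != nil {
		cleanup()
		return nil, nil, err
	}
	if len(cols) == 0 {
		cleanup()
		return nil, nil, fmt.Errorf("pivot source has no columns")
	}
	insert := fmt.Sprintf("INSERT INTO %s VALUES (?%s)", tableName, strings.Repeat(",?", len(cols)-1))

	// Row-by-row insert for brevity; the real version would use the appender.
	vals := make([]any, len(cols))
	ptrs := make([]any, len(cols))
	for i := range vals {
		ptrs[i] = &vals[i]
	}
	for rows.Next() {
		if err := rows.Scan(ptrs...); err != nil {
			cleanup()
			return nil, nil, err
		}
		if _, err := db.ExecContext(ctx, insert, vals...); err != nil {
			cleanup()
			return nil, nil, err
		}
	}
	if err := rows.Err(); err != nil {
		cleanup()
		return nil, nil, err
	}

	out, err := db.QueryxContext(ctx, pivotSQL)
	if err != nil {
		cleanup()
		return nil, nil, err
	}
	return out, cleanup, nil
}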

Contributor Author (@egor-ryashin, Dec 8, 2023):

I've run some benchmarks comparing initialising a new DB on every call vs. keeping one open, for a table of 10k rows: creating is about 3x slower. My thoughts: yes, time requirements for exports are usually low, but eventually it's 3x the CPU load. Though that's OK as long as exports are a small part of Runtime CPU consumption (this speculation mostly concerns cloud-based deployments):

Running tool: /opt/homebrew/opt/go/libexec/bin/go test -benchmem -run=^$ -bench ^(BenchmarkTestCreate|BenchmarkTestNoCreate)$ github.com/rilldata/rill/runtime/drivers/duckdb -count=1

goos: darwin
goarch: arm64
pkg: github.com/rilldata/rill/runtime/drivers/duckdb
BenchmarkTestCreate-10      	      36	  34185877 ns/op	  353456 B/op	   26358 allocs/op
BenchmarkTestNoCreate-10    	      97	  11942874 ns/op	  351659 B/op	   26322 allocs/op
PASS
ok  	github.com/rilldata/rill/runtime/drivers/duckdb	4.416s
package duckdb

import (
	"context"
	"database/sql"
	"database/sql/driver"
	"testing"
	"time"

	"github.com/marcboeker/go-duckdb"
	"github.com/stretchr/testify/require"
)

// initDb opens an in-memory DuckDB handle with the ICU extension loaded,
// mirroring the driver's boot sequence. (Uncommented here so the benchmarks
// below compile; both of them call it.)
func initDb(t *testing.B) (*sql.DB, error) {
	connector, err := duckdb.NewConnector("", func(execer driver.ExecerContext) error {
		bootQueries := []string{
			"INSTALL 'icu'",
			"LOAD 'icu'",
		}

		for _, qry := range bootQueries {
			_, err := execer.ExecContext(context.Background(), qry, nil)
			if err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		return nil, err
	}

	db := sql.OpenDB(connector)
	return db, nil
}

func BenchmarkTestCreate(b *testing.B) {
	for i := 0; i < b.N; i++ {
		func() {
			db, err := initDb(b)
			require.NoError(b, err)
			defer db.Close()

			_, err = db.ExecContext(context.Background(), `
	CREATE TEMPORARY TABLE tm as select range from range(timestamptz '2022-01-01', timestamptz '2023-01-01', interval '1 hour')`)
			require.NoError(b, err)

			r, err := db.Query("select time_bucket(interval '1 minute', range, 'Asia/Kathmandu') as bucket, count(*) as cnt from tm group by bucket")
			require.NoError(b, err)
			var cnt int
			var tm time.Time
			for r.Next() {
				err = r.Scan(&tm, &cnt)
				require.NoError(b, err)
			}
			r.Close()

			_, err = db.ExecContext(context.Background(), `DROP TABLE tm`)
			require.NoError(b, err)

		}()
	}
}

func BenchmarkTestNoCreate(b *testing.B) {
	db, err := initDb(b)
	require.NoError(b, err)
	defer db.Close()

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		_, err = db.ExecContext(context.Background(), `
	CREATE TEMPORARY TABLE tm as select range from range(timestamptz '2022-01-01', timestamptz '2023-01-01', interval '1 hour')`)
		require.NoError(b, err)

		r, err := db.Query("select time_bucket(interval '1 minute', range, 'Asia/Kathmandu') as bucket, count(*) as cnt from tm group by bucket")
		require.NoError(b, err)
		var cnt int
		var tm time.Time
		for r.Next() {
			err = r.Scan(&tm, &cnt)
			require.NoError(b, err)
		}
		r.Close()

		_, err = db.ExecContext(context.Background(), `DROP TABLE tm`)
		require.NoError(b, err)
	}
}

Contributor Author:

> Pivot function that takes a sqlx.Rows iterator as an argument, and returns a pivoted sqlx.Rows. Internally, it would use database/sql to directly open an in-memory DuckDB handle to just create the temporary table and do the pivot.

This won't work: the function creates a handle but doesn't return it, so the handle can never be closed.

Contributor Author:

btw, I cannot pass sqlx.Rows because I get a drivers.Result from the OLAPStore

Contributor Author:

> Our driver has a lot of extra overhead, like loading a bunch of extensions, migrations for catalog tables, emitting DB stats telemetry, etc.
> It leaks some pretty internal details of the driver into the runtime/queries package (like the RawConn func should not be public)
> Lifecycle management of the DuckDB handle becomes tricky with a global here, since DuckDB doesn't release memory it has taken. Since this is just for exports, it's probably better to open/close a handle on each call initially.

Addressed those.

if err != nil {
return err
}

err = duckDBOLAP.Exec(ctx, &drivers.Statement{
Query: createTableSQL,
})
if err != nil {
return err
}
defer func() {
_ = duckDBOLAP.Exec(ensuredCtx, &drivers.Statement{
Query: `DROP TABLE "` + temporaryTableName + `"`,
})
}()

err = duckdbolap.RawConn(conn, func(conn driver.Conn) error {
appender, err := duckdb.NewAppenderFromConn(conn, "", temporaryTableName)
if err != nil {
return err
}
defer appender.Close()

batchSize := 10000
arr := make([]driver.Value, 0, len(schema.Fields))
count := 0
for _, row := range data {
for _, key := range schema.Fields {
arr = append(arr, row.Fields[key.Name].AsInterface())
}
err = appender.AppendRowArray(arr)
if err != nil {
return err
}
arr = arr[:0]
count++
if count >= batchSize {
appender.Flush()
count = 0
}
}
Comment on lines +260 to +264
Contributor:

Should additionally enforce a limit on the number of cells to be pivoted (i.e. len(columns) * count < maxPivotCells). Maybe something like 1m cells?

Contributor:

Would be good to implement the same against DuckDB (can use a LIMIT clause of maxPivotCells+1 to enforce it, and run a count query against the temporary table to determine whether maxPivotCells was exceeded).
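
A possible shape for that guard, as a hedged sketch (maxPivotCells and checkPivotCells are hypothetical names and values, not part of this PR):

package queries

import "fmt"

// maxPivotCells is a hypothetical cap on the number of source cells fed into a pivot.
const maxPivotCells = 1_000_000

// checkPivotCells would be called with len(schema.Fields) and the running row
// count before each append, failing once the cap is exceeded.
func checkPivotCells(cols, rows int) error {
	if cols*rows > maxPivotCells {
		return fmt.Errorf("pivot exceeds limit of %d cells", maxPivotCells)
	}
	return nil
}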

appender.Flush()

return nil
})
if err != nil {
return err
}

return q.pivotOn(ctx, duckDBOLAP, temporaryTableName)
})
return err
}

func (q *MetricsViewAggregation) pivotOn(ctx context.Context, olap drivers.OLAPStore, temporaryTableName string) error {
measureCols := make([]string, 0, len(q.Measures))
for _, m := range q.Measures {
sn := safeName(m.Name)
measureCols = append(measureCols, fmt.Sprintf("LAST(%s) as %s", sn, sn))
}

sortingCriteria := make([]string, 0, len(q.Sort))
for _, s := range q.Sort {
sortCriterion := safeName(s.Name)
if s.Desc {
sortCriterion += " DESC"
}
if olap.Dialect() == drivers.DialectDuckDB {
sortCriterion += " NULLS LAST"
}
sortingCriteria = append(sortingCriteria, sortCriterion)
}

orderClause := ""
if len(sortingCriteria) > 0 {
orderClause = "ORDER BY " + strings.Join(sortingCriteria, ", ")
}

var limitClause string
if q.Limit != nil {
if *q.Limit == 0 {
*q.Limit = 100
}
limitClause = fmt.Sprintf("LIMIT %d", *q.Limit)
}

// execute duckdb pivot
// PIVOT t ON year USING LAST(ap) ap;
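// For illustration (hypothetical values): with PivotOn = ["year"], a single
// measure "ap", a sort on "ap" DESC, and limit 10, the statement below renders as:
//   PIVOT _for_pivot_t ON year USING LAST("ap") as "ap" ORDER BY "ap" DESC NULLS LAST LIMIT 10 OFFSET 0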
pivotSQL := fmt.Sprintf("PIVOT %[1]s ON %[2]s USING %[3]s %[4]s %[5]s OFFSET %[6]d",
temporaryTableName, // 1
strings.Join(q.PivotOn, ", "), // 2
strings.Join(measureCols, ", "), // 3
orderClause, // 4
limitClause, // 5
q.Offset, // 6
)
schema, data, err := olapQuery(ctx, olap, int(q.Priority), pivotSQL, nil)
if err != nil {
return err
}
@@ -135,11 +294,12 @@ func (q *MetricsViewAggregation) buildMetricsAggregationSQL(mv *runtimev1.Metric
return "", nil, errors.New("no dimensions or measures specified")
}

selectCols := make([]string, 0, len(q.Dimensions)+len(q.Measures))
cols := len(q.Dimensions) + len(q.Measures)
selectCols := make([]string, 0, cols)

groupCols := make([]string, 0, len(q.Dimensions))
unnestClauses := make([]string, 0)
args := []any{}

for _, d := range q.Dimensions {
// Handle regular dimensions
if d.TimeGrain == runtimev1.TimeGrain_TIME_GRAIN_UNSPECIFIED {
@@ -173,13 +333,14 @@
}

for _, m := range q.Measures {
sn := safeName(m.Name)
switch m.BuiltinMeasure {
case runtimev1.BuiltinMeasure_BUILTIN_MEASURE_UNSPECIFIED:
expr, err := metricsViewMeasureExpression(mv, m.Name)
if err != nil {
return "", nil, err
}
selectCols = append(selectCols, fmt.Sprintf("%s as %s", expr, safeName(m.Name)))
selectCols = append(selectCols, fmt.Sprintf("%s as %s", expr, sn))
Comment on lines +451 to +458
Contributor:

sn is only referenced once in this scope – probably simpler to keep it as before

case runtimev1.BuiltinMeasure_BUILTIN_MEASURE_COUNT:
selectCols = append(selectCols, fmt.Sprintf("COUNT(*) as %s", safeName(m.Name)))
case runtimev1.BuiltinMeasure_BUILTIN_MEASURE_COUNT_DISTINCT:
@@ -246,16 +407,30 @@ func (q *MetricsViewAggregation) buildMetricsAggregationSQL(mv *runtimev1.Metric
limitClause = fmt.Sprintf("LIMIT %d", *q.Limit)
}

sql := fmt.Sprintf("SELECT %s FROM %s %s %s %s %s %s OFFSET %d",
strings.Join(selectCols, ", "),
safeName(mv.Table),
strings.Join(unnestClauses, ""),
whereClause,
groupClause,
orderClause,
limitClause,
q.Offset,
)
var sql string
if q.PivotOn != nil {
// select m1, m2, d1, d2 from t, lateral unnest(t.d1) tbl(unnested_d1_) where d1 = 'a' group by d1, d2
sql = fmt.Sprintf("SELECT %[1]s FROM %[2]s %[3]s %[4]s %[5]s %[6]s %[7]s",
strings.Join(selectCols, ", "), // 1
safeName(mv.Table), // 2
strings.Join(unnestClauses, ""), // 3
whereClause, // 4
groupClause, // 5
orderClause, // 6
limitClause, // 7
)
} else {
sql = fmt.Sprintf("SELECT %s FROM %s %s %s %s %s %s OFFSET %d",
strings.Join(selectCols, ", "),
safeName(mv.Table),
strings.Join(unnestClauses, ""),
whereClause,
groupClause,
orderClause,
limitClause,
q.Offset,
)
}
Contributor:

They seem equivalent apart from the offset clause? I would suggest we return an error if q.Offset is passed together with q.PivotOn, since that's not easy to support right now
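
A hedged sketch of that validation, placed near the top of buildMetricsAggregationSQL (the error wording is hypothetical; the errors package is already imported in this file):

	if len(q.PivotOn) > 0 && q.Offset != 0 {
		return "", nil, errors.New("offset is not supported together with pivot_on")
	}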


return sql, args, nil
}