
PIVOT ON support #3390

Merged
merged 29 commits into from
Jan 2, 2024
Conversation

egor-ryashin (Contributor):

No description provided.

Comment on lines 700 to 729
func rowsToSchema(r *sqlx.Rows) (*runtimev1.StructType, error) {
	if r == nil {
		return nil, nil
	}

	cts, err := r.ColumnTypes()
	if err != nil {
		return nil, err
	}

	fields := make([]*runtimev1.StructType_Field, len(cts))
	for i, ct := range cts {
		nullable, ok := ct.Nullable()
		if !ok {
			nullable = true
		}

		t, err := DatabaseTypeToPB(ct.DatabaseTypeName(), nullable)
		if err != nil {
			return nil, err
		}

		fields[i] = &runtimev1.StructType_Field{
			Name: ct.Name(),
			Type: t,
		}
	}

	return &runtimev1.StructType{Fields: fields}, nil
}
Contributor:

Seems like a duplicate of RowsToSchema?

Contributor Author:

Almost, though the parameter type is different:

(screenshot)

Contributor Author:

Removed.
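As an aside, one stdlib-only way to avoid this kind of duplication is to accept a small interface, since sqlx.Rows embeds *sql.Rows and both expose ColumnTypes. This is an illustrative sketch, not the code that was merged; columnTyper and columnNames are hypothetical names, and the real helper builds a runtimev1.StructType rather than a name list:

```go
package main

import (
	"database/sql"
	"fmt"
)

// columnTyper is satisfied by both *sql.Rows and *sqlx.Rows (sqlx.Rows
// embeds *sql.Rows), so one schema helper could serve both call sites.
type columnTyper interface {
	ColumnTypes() ([]*sql.ColumnType, error)
}

// columnNames is a stdlib-only stand-in for the schema conversion: it only
// extracts column names, where the real helper converts column types too.
func columnNames(r columnTyper) ([]string, error) {
	cts, err := r.ColumnTypes()
	if err != nil {
		return nil, err
	}
	names := make([]string, len(cts))
	for i, ct := range cts {
		names[i] = ct.Name()
	}
	return names, nil
}

// fakeRows lets the sketch run without a live database.
type fakeRows struct{}

func (fakeRows) ColumnTypes() ([]*sql.ColumnType, error) { return nil, nil }

func main() {
	names, err := columnNames(fakeRows{})
	fmt.Println(names, err)
}
```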

Comment on lines 14 to 22
duckdbolap "github.com/rilldata/rill/runtime/drivers/duckdb"

databasesql "database/sql"
"database/sql/driver"
"sync"

"github.com/marcboeker/go-duckdb"
"github.com/rilldata/rill/runtime/pkg/pbutil"
"google.golang.org/protobuf/types/known/structpb"
Contributor:

Broken formatting – std lib should be in first group, other libs in second group

@@ -129,16 +268,47 @@ func (q *MetricsViewAggregation) Export(ctx context.Context, rt *runtime.Runtime

return nil
}
func RowsToSchema(r *databasesql.Rows) (*runtimev1.StructType, error) {
Contributor:

Duplicate of duckdb.RowsToSchema?

	return r.Err()
}

func rowsToSchema(r *databasesql.Rows) (*runtimev1.StructType, error) {
Contributor:

Another duplicate?

	return query, nil
}

func pbTypeToDuckDB(t *runtimev1.Type) (string, error) {
Contributor:

Duplicate from runtime/drivers/duckdb/transporter

Contributor Author:

Removed.

Comment on lines 461 to 462
func createTableQuery(schema *runtimev1.StructType, name string) (string, error) {
	query := fmt.Sprintf("CREATE OR REPLACE TABLE %s(", safeName(name))
Contributor:

Should use TEMPORARY TABLE

	return err
}
defer func() {
	_, _ = db.ExecContext(ctx, `DROP TABLE "`+temporaryTableName+`"`)
Contributor:

Also change here – DROP TEMPORARY TABLE

Contributor Author:

DROP doesn't have TEMPORARY clause
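For reference, the statement pair under discussion can be sketched as plain SQL strings: the TEMPORARY keyword appears only on CREATE, while cleanup uses an unqualified DROP TABLE, which also drops temporary tables. The helper names below are hypothetical stand-ins for the PR's safeName helper:

```go
package main

import "fmt"

// quoteIdent is a hypothetical stand-in for the safeName helper in the PR:
// it double-quotes an identifier for DuckDB.
func quoteIdent(name string) string {
	return `"` + name + `"`
}

// createTempTableSQL builds the creation statement; TEMPORARY scopes the
// table to the current connection.
func createTempTableSQL(name, selectSQL string) string {
	return fmt.Sprintf("CREATE TEMPORARY TABLE %s AS (%s)", quoteIdent(name), selectSQL)
}

// dropTableSQL builds the cleanup statement; there is no DROP TEMPORARY
// TABLE clause in DuckDB, so a plain DROP TABLE is used.
func dropTableSQL(name string) string {
	return "DROP TABLE " + quoteIdent(name)
}

func main() {
	fmt.Println(createTempTableSQL("_for_pivot_123", "SELECT 1"))
	fmt.Println(dropTableSQL("_for_pivot_123"))
}
```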

Comment on lines 100 to 103
schema, data, err := olapQuery(ctx, olap, priority, sqlString, args)
if err != nil {
return err
}
Contributor:

It's worth optimizing to avoid serializing and deserializing when the DB is already DuckDB. Additionally, it's also worth pushing exports into DuckDB. So the control flow needs to be something like:

  • On DuckDB, run a create temporary table ... as select ...
  • On Druid, we need to run the olapQuery and create and insert it into a new temporary table
  • Then we should execute the pivot against the temporary table OR execute the export against the temporary table
  • Lastly, drop the temporary table
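The control flow proposed above can be sketched as a small dispatcher. All names and step strings are illustrative only, and error handling is elided:

```go
package main

import "fmt"

// pivotPlan returns the sequence of steps for a given dialect, mirroring the
// control flow proposed in the review comment. Steps are illustrative.
func pivotPlan(dialect string, export bool) []string {
	var steps []string
	if dialect == "duckdb" {
		// DuckDB can materialize the result directly, with no
		// serialize/deserialize round trip.
		steps = append(steps, "CREATE TEMPORARY TABLE ... AS SELECT ...")
	} else {
		// Druid (or any non-DuckDB OLAP): run the query remotely, then
		// load the rows into a temporary DuckDB table.
		steps = append(steps, "olapQuery(...)", "create temp table + insert rows")
	}
	if export {
		steps = append(steps, "run export against temp table")
	} else {
		steps = append(steps, "run PIVOT against temp table")
	}
	steps = append(steps, "DROP temp table")
	return steps
}

func main() {
	for _, s := range pivotPlan("druid", false) {
		fmt.Println(s)
	}
}
```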

Comment on lines 113 to 132
var connErr error
var db *databasesql.DB
dbOnce.Do(func() {
	connector, connErr = duckdb.NewConnector("", func(conn driver.ExecerContext) error {
		_, err := conn.ExecContext(context.Background(), "INSTALL 'json'; LOAD 'json';", nil)
		return err
	})

	db = databasesql.OpenDB(connector)
	db.SetMaxOpenConns(1)
})
if connErr != nil {
	return connErr
}

conn, err := connector.Connect(ctx)
if err != nil {
	return err
}
defer conn.Close()
Contributor:

The db variable will be nil on every invocation after the first (which triggers dbOnce). Generally, this code mixes up db and conn in multiple places.

I would propose moving the entire olapQuery -> temporary table in a temporary DuckDB handle logic into a util function. Something like:

schema, data, err := olapQuery(...)
if err != nil {...}

conn, tableName, release, err := ephemeralDuckDBForResult(schema, data)
// ...

If you use that for Druid, combined with a call to WithConnection for the DuckDB case (which provides access to the underlying conn *sql.Conn in the callback), you might be able to have common logic for doing the pivot (and export) for both. But it's also worth considering treating both cases completely separately, even if it's more verbose.

Regardless, I think the various bugs around connection management here show that it's worth putting some time into thinking about how to make the logic flows here as simple to understand/review as possible.


var (
	dbOnce    sync.Once
	connector driver.Connector
Contributor:

It's not safe to reuse a connector for multiple DBs. So it should be created at the same time as the OpenDB call, every time.

@@ -25,6 +43,7 @@ type MetricsViewAggregation struct {
Offset int64 `json:"offset,omitempty"`
MetricsView *runtimev1.MetricsViewSpec `json:"-"`
ResolvedMVSecurity *runtime.ResolvedMetricsViewSecurity `json:"security"`
PivotOn []string `json:"pviot_on,omitempty"`
Contributor:

pviot_on > pivot_on

egor-ryashin (Contributor Author) commented Nov 29, 2023:

First benchmarks:

Running tool: /opt/homebrew/opt/go/libexec/bin/go test -benchmem -run=^$ -bench ^(BenchmarkMetricsViewsAggregation|BenchmarkMetricsViewsAggregation_pivot|BenchmarkMetricsViewsAggregation_spending|BenchmarkMetricsViewsAggregation_spending_pivot)$ github.com/rilldata/rill/runtime/queries -count=1

goos: darwin
goarch: arm64
pkg: github.com/rilldata/rill/runtime/queries
BenchmarkMetricsViewsAggregation-10                   	     268	   4406848 ns/op	   15070 B/op	     383 allocs/op
BenchmarkMetricsViewsAggregation_pivot-10             	     218	   5671571 ns/op	   19537 B/op	     432 allocs/op
BenchmarkMetricsViewsAggregation_spending-10          	      64	  16482316 ns/op	   15826 B/op	     373 allocs/op
BenchmarkMetricsViewsAggregation_spending_pivot-10    	      38	  32526113 ns/op	   77312 B/op	    1238 allocs/op

The growth is non-linear (may be due to the temporary table, as it isn't limited).

egor-ryashin (Contributor Author):

With the limit on subquery the benchmarks are:

Running tool: /opt/homebrew/opt/go/libexec/bin/go test -benchmem -run=^$ -bench ^(BenchmarkMetricsViewsAggregation|BenchmarkMetricsViewsAggregation_pivot|BenchmarkMetricsViewsAggregation_spending|BenchmarkMetricsViewsAggregation_spending_100|BenchmarkMetricsViewsAggregation_spending_pivot|BenchmarkMetricsViewsAggregation_spending_pivot_100)$ github.com/rilldata/rill/runtime/queries -count=1

goos: darwin
goarch: arm64
pkg: github.com/rilldata/rill/runtime/queries
BenchmarkMetricsViewsAggregation-10                       	     280	   4274759 ns/op	   15055 B/op	     383 allocs/op
BenchmarkMetricsViewsAggregation_pivot-10                 	     216	   5512721 ns/op	   18784 B/op	     407 allocs/op
BenchmarkMetricsViewsAggregation_spending-10              	      67	  16572626 ns/op	   15875 B/op	     373 allocs/op
BenchmarkMetricsViewsAggregation_spending_100-10          	      61	  16938342 ns/op	   87936 B/op	    2356 allocs/op
BenchmarkMetricsViewsAggregation_spending_pivot-10        	      66	  18002439 ns/op	   21155 B/op	     440 allocs/op
BenchmarkMetricsViewsAggregation_spending_pivot_100-10    	      51	  20305096 ns/op	  195780 B/op	    3670 allocs/op

@nishantmonu51 nishantmonu51 added blocker A release blocker issue that should be resolved before a new release and removed blocker A release blocker issue that should be resolved before a new release labels Dec 5, 2023
begelundmuller (Contributor) left a comment:

Lastly, it's not currently adding any limits on the source Druid data. I would suggest we put a cap at e.g. 100k rows, and return an error if it's exceeded (an error is better than returning incorrect/partial pivoted data)

Comment on lines 15 to 18
// rawConn is similar to *sql.Conn.Raw, but additionally unwraps otelsql (which we use for instrumentation).
func rawConn(conn *sql.Conn, f func(driver.Conn) error) error {
func RawConn(conn *sql.Conn, f func(driver.Conn) error) error {
return conn.Raw(func(raw any) error {
// For details, see: https://github.com/XSAM/otelsql/issues/98
Contributor:

This is an implementation detail we should try to avoid leaking from the driver

Comment on lines 93 to 94
// execute druid query
sqlString, args, err := q.buildMetricsAggregationSQL(q.MetricsView, olap.Dialect(), q.ResolvedMVSecurity)
Contributor:

Comment doesn't match

Comment on lines 133 to 148
dbOnce.Do(func() {
	handle, errHandle = duckdbolap.Driver{}.Open(map[string]any{"pool_size": 10}, false, activity.NewNoopClient(), zap.NewNop())
})
if errHandle != nil {
	return errHandle
}

schema, data, err := olapQuery(ctx, olap, priority, sqlString, args)
if err != nil {
	return err
}

duckDBOLAP, _ := handle.AsOLAP("")
err = duckDBOLAP.WithConnection(ctx, priority, false, false, func(ctx context.Context, ensuredCtx context.Context, conn *databasesql.Conn) error {
	temporaryTableName := tempName("_for_pivot_")
	createTableSQL, err := duckdbolap.CreateTableQuery(schema, temporaryTableName)
Contributor:

We should avoid using our DuckDB OLAP driver for this kind of ephemeral in-memory transformation. A few reasons:

  • Our driver has a lot of extra overhead, like loading a bunch of extensions, migrations for catalog tables, emitting DB stats telemetry, etc.
  • It leaks some pretty internal details of the driver into the runtime/queries package (like the RawConn func should not be public)
  • Lifecycle management of the DuckDB handle becomes tricky with a global here, since DuckDB doesn't release memory it has taken. Since this is just for exports, it's probably better to open/close a handle on each call initially.

As discussed previously, I would suggest creating a generic runtime/pkg/duckdbpivot package with a Pivot function that takes a sqlx.Rows iterator as an argument, and returns a pivoted sqlx.Rows. Internally, it would use database/sql to directly open an in-memory DuckDB handle to just create the temporary table and do the pivot.

That would also encapsulate the appender logic in a scoped package, and allow easier testing of the code.

===

An implication for the above is that here, the DuckDB and Druid cases become pretty different (since we want to avoid the overhead of converting from our DuckDB driver to duckdbpivot), but since the pivot SQL is pretty simple, I think some duplication is the best way to keep it simple.
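The pivot SQL itself is indeed small. A sketch of building a DuckDB PIVOT statement over the temporary table, following DuckDB's PIVOT ... ON ... USING syntax; the aggregate choice and helper names here are assumptions for illustration, not the PR's code:

```go
package main

import (
	"fmt"
	"strings"
)

// quote double-quotes an identifier for DuckDB (a stand-in for safeName).
func quote(name string) string {
	return `"` + name + `"`
}

// pivotSQL builds: PIVOT <table> ON <cols> USING <aggregates>.
// any_value is used on the assumption that measures were already
// aggregated by the upstream query.
func pivotSQL(table string, pivotOn, measures []string) string {
	on := make([]string, len(pivotOn))
	for i, c := range pivotOn {
		on[i] = quote(c)
	}
	using := make([]string, len(measures))
	for i, m := range measures {
		using[i] = fmt.Sprintf("any_value(%s) AS %s", quote(m), quote(m))
	}
	return fmt.Sprintf("PIVOT %s ON %s USING %s",
		quote(table), strings.Join(on, ", "), strings.Join(using, ", "))
}

func main() {
	fmt.Println(pivotSQL("_for_pivot_t1", []string{"country"}, []string{"total_sales"}))
}
```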

egor-ryashin (Contributor Author), Dec 8, 2023:

I've run some benchmarks comparing initialising a new DB per call vs keeping one open, for a table of 10k rows: per-call initialisation is about 3x slower. My thoughts: yes, time requirements for exports are usually loose, but it's still 3x the CPU load. That said, it's acceptable as long as exports are a small part of Runtime CPU consumption (this speculation mostly concerns the cloud deployment):

Running tool: /opt/homebrew/opt/go/libexec/bin/go test -benchmem -run=^$ -bench ^(BenchmarkTestCreate|BenchmarkTestNoCreate)$ github.com/rilldata/rill/runtime/drivers/duckdb -count=1

goos: darwin
goarch: arm64
pkg: github.com/rilldata/rill/runtime/drivers/duckdb
BenchmarkTestCreate-10      	      36	  34185877 ns/op	  353456 B/op	   26358 allocs/op
BenchmarkTestNoCreate-10    	      97	  11942874 ns/op	  351659 B/op	   26322 allocs/op
PASS
ok  	github.com/rilldata/rill/runtime/drivers/duckdb	4.416s
package duckdb

import (
	"context"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// func initDb(t *testing.B) (*sql.DB, error) {
// 	connector, err := duckdb.NewConnector("", func(execer driver.ExecerContext) error {
// 		bootQueries := []string{
// 			"INSTALL 'icu'",
// 			"LOAD 'icu'",
// 		}

// 		for _, qry := range bootQueries {
// 			_, err := execer.ExecContext(context.Background(), qry, nil)
// 			if err != nil {
// 				return err
// 			}
// 		}
// 		return nil
// 	})
// 	if err != nil {
// 		return nil, err
// 	}

// 	db := sql.OpenDB(connector)
// 	return db, nil
// }

func BenchmarkTestCreate(b *testing.B) {
	for i := 0; i < b.N; i++ {
		func() {
			db, err := initDb(b)
			require.NoError(b, err)
			defer db.Close()

			_, err = db.ExecContext(context.Background(), `
	CREATE TEMPORARY TABLE tm as select range from range(timestamptz '2022-01-01', timestamptz '2023-01-01', interval '1 hour')`)
			require.NoError(b, err)

			r, err := db.Query("select time_bucket(interval '1 minute', range, 'Asia/Kathmandu') as bucket, count(*) as cnt from tm group by bucket")
			require.NoError(b, err)
			var cnt int
			var tm time.Time
			for r.Next() {
				err = r.Scan(&tm, &cnt)
				require.NoError(b, err)
			}
			r.Close()

			_, err = db.ExecContext(context.Background(), `DROP TABLE tm`)
			require.NoError(b, err)

		}()
	}
}

func BenchmarkTestNoCreate(b *testing.B) {
	db, err := initDb(b)
	require.NoError(b, err)
	defer db.Close()

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		_, err = db.ExecContext(context.Background(), `
	CREATE TEMPORARY TABLE tm as select range from range(timestamptz '2022-01-01', timestamptz '2023-01-01', interval '1 hour')`)
		require.NoError(b, err)

		r, err := db.Query("select time_bucket(interval '1 minute', range, 'Asia/Kathmandu') as bucket, count(*) as cnt from tm group by bucket")
		require.NoError(b, err)
		var cnt int
		var tm time.Time
		for r.Next() {
			err = r.Scan(&tm, &cnt)
			require.NoError(b, err)
		}
		r.Close()

		_, err = db.ExecContext(context.Background(), `DROP TABLE tm`)
		require.NoError(b, err)
	}
}

Contributor Author:

Pivot function that takes a sqlx.Rows iterator as an argument, and returns a pivoted sqlx.Rows. Internally, it would use database/sql to directly open an in-memory DuckDB handle to just create the temporary table and do the pivot.

won't work as described: the function would create a handle but not return it, so the handle could never be closed.

Contributor Author:

By the way, I cannot pass sqlx.Rows because I get a drivers.Result from the OLAPStore.

Contributor Author:

Our driver has a lot of extra overhead, like loading a bunch of extensions, migrations for catalog tables, emitting DB stats telemetry, etc.
It leaks some pretty internal details of the driver into the runtime/queries package (like the RawConn func should not be public)
Lifecycle management of the DuckDB handle becomes tricky with a global here, since DuckDB doesn't release memory it has taken. Since this is just for exports, it's probably better to open/close a handle on each call initially.

Addressed those.
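One common pattern for the handle-lifecycle concern raised in this thread is to return a release callback alongside the result, so the caller owns the cleanup even though the resource was opened inside the function. A minimal sketch with all names hypothetical; a boolean flag stands in for the in-memory DuckDB handle:

```go
package main

import "fmt"

// pivotResult pairs the pivoted rows with a release callback, so the
// resource opened inside pivot can still be closed by the caller.
type pivotResult struct {
	rows    [][]any // stand-in for the pivoted row iterator
	release func() error
}

// pivot is a sketch: in the real code it would open an in-memory DuckDB
// handle, create a temp table, and run the pivot; here a closed flag
// stands in for the handle.
func pivot(input [][]any) (*pivotResult, error) {
	closed := false
	return &pivotResult{
		rows: input,
		release: func() error {
			if closed {
				return fmt.Errorf("already released")
			}
			closed = true
			return nil
		},
	}, nil
}

func main() {
	res, err := pivot([][]any{{1, "a"}})
	if err != nil {
		panic(err)
	}
	defer res.release()
	fmt.Println(len(res.rows))
}
```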

Comment on lines +336 to +343
sn := safeName(m.Name)
switch m.BuiltinMeasure {
case runtimev1.BuiltinMeasure_BUILTIN_MEASURE_UNSPECIFIED:
expr, err := metricsViewMeasureExpression(mv, m.Name)
if err != nil {
return "", nil, err
}
selectCols = append(selectCols, fmt.Sprintf("%s as %s", expr, safeName(m.Name)))
selectCols = append(selectCols, fmt.Sprintf("%s as %s", expr, sn))
Contributor:

sn only referenced once in the scope – probably simpler to keep as before

Comment on lines 410 to 433
var sql string
if q.PivotOn != nil {
	// select m1, m2, d1, d2 from t, lateral unnest(t.d1) tbl(unnested_d1_) where d1 = 'a' group by d1, d2
	sql = fmt.Sprintf("SELECT %[1]s FROM %[2]s %[3]s %[4]s %[5]s %[6]s %[7]s",
		strings.Join(selectCols, ", "),  // 1
		safeName(mv.Table),              // 2
		strings.Join(unnestClauses, ""), // 3
		whereClause,                     // 4
		groupClause,                     // 5
		orderClause,                     // 6
		limitClause,                     // 7
	)
} else {
	sql = fmt.Sprintf("SELECT %s FROM %s %s %s %s %s %s OFFSET %d",
		strings.Join(selectCols, ", "),
		safeName(mv.Table),
		strings.Join(unnestClauses, ""),
		whereClause,
		groupClause,
		orderClause,
		limitClause,
		q.Offset,
	)
}
Contributor:

They seem equivalent apart from the offset clause? I would suggest we return an error if passing q.Offset together with q.PivotOn, since that's not easy to support right now
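The suggested fail-fast check could look like the sketch below. The function name is hypothetical; the field semantics follow the MetricsViewAggregation struct in the PR:

```go
package main

import (
	"errors"
	"fmt"
)

// validatePivotArgs mirrors the suggested check: offsets are not supported
// together with PIVOT ON, so return an error instead of silently ignoring
// one of the two options.
func validatePivotArgs(pivotOn []string, offset int64) error {
	if len(pivotOn) > 0 && offset != 0 {
		return errors.New("offset not supported for pivot queries")
	}
	return nil
}

func main() {
	fmt.Println(validatePivotArgs([]string{"country"}, 100))
	fmt.Println(validatePivotArgs(nil, 100))
}
```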

egor-ryashin (Contributor Author) commented Dec 15, 2023:

Benchmarks (updated):

goos: darwin
goarch: arm64
pkg: github.com/rilldata/rill/runtime/queries
BenchmarkMetricsViewsAggregation-10                           	     273	   4442161 ns/op	   15247 B/op	     385 allocs/op
BenchmarkMetricsViewsAggregation_pivot_2_measures-10          	     128	   8201447 ns/op	   82682 B/op	     601 allocs/op
BenchmarkMetricsViewsAggregation_pivot-10                     	     211	   5994564 ns/op	   19126 B/op	     413 allocs/op
BenchmarkMetricsViewsAggregation_spending-10                  	      58	  21029836 ns/op	   15980 B/op	     375 allocs/op
BenchmarkMetricsViewsAggregation_spending_100-10              	      60	  17369018 ns/op	   88083 B/op	    2358 allocs/op
BenchmarkMetricsViewsAggregation_spending_pivot-10            	      60	  18621535 ns/op	   21492 B/op	     446 allocs/op
BenchmarkMetricsViewsAggregation_spending_pivot_100-10        	      54	  19336074 ns/op	  196108 B/op	    3676 allocs/op
BenchmarkMetricsViewsAggregation_Druid-10                     	      27	  84698711 ns/op	   17516 B/op	     328 allocs/op
BenchmarkMetricsViewsAggregation_Druid_2_measures-10          	      16	  77315091 ns/op	   21221 B/op	     389 allocs/op
BenchmarkMetricsViewsAggregation_Druid_pivot-10               	      24	  42822396 ns/op	   14174 B/op	     229 allocs/op
BenchmarkMetricsViewsAggregation_Druid_pivot_2_measures-10    	      14	  94477979 ns/op	   19540 B/op	     307 allocs/op
PASS
ok  	github.com/rilldata/rill/runtime/queries	80.870s

Comment on lines +215 to +219
	if count >= batchSize {
		appender.Flush()
		count = 0
	}
}
Contributor:

Should additionally enforce a limit on the number of cells to be pivoted (i.e. len(columns) * count < maxPivotCells). Maybe something like 1m cells?

Contributor:

Would be good to implement the same against DuckDB (can use a LIMIT clause of maxPivotCells+1 to enforce, and run a count query against the temporary table to determine if the maxPivotCells was exceeded).
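Combining the two suggestions, the cap, the LIMIT clause, and the post-query check can be sketched as follows. The constant value and names are illustrative, not the merged code:

```go
package main

import (
	"errors"
	"fmt"
)

const maxPivotCells = 1_000_000 // illustrative cap, as suggested in review

// pivotLimitClause returns a LIMIT of maxPivotCells/cols + 1: the extra row
// lets the caller detect that the cap was exceeded, rather than silently
// returning partial pivoted data.
func pivotLimitClause(cols int) string {
	return fmt.Sprintf("LIMIT %d", maxPivotCells/cols+1)
}

// checkPivotCells runs after materializing the temporary table: if the cell
// count exceeds the cap, return an error instead of a truncated pivot.
func checkPivotCells(rows, cols int) error {
	if rows*cols > maxPivotCells {
		return errors.New("pivot query exceeds maximum number of cells")
	}
	return nil
}

func main() {
	fmt.Println(pivotLimitClause(10))
	fmt.Println(checkPivotCells(100_001, 10))
}
```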

begelundmuller (Contributor) left a comment:

There are also some merge conflicts to address

Comment on lines 474 to 477
} else if q.PivotOn != nil {
	l := 1_000_000 / cols
	limitClause = fmt.Sprintf("LIMIT %d", l)
}
Contributor:

  1. Pull the number into a constant at the top of the file, as suggested in the previous comment
  2. We need to return an error if the limit is exceeded (for both DuckDB and Druid), since otherwise people would get partial/wrong results. So the limit probably needs to be maxPivotCells + 1, and then after querying, count the number of rows and check that it's <= maxPivotCells.

@begelundmuller begelundmuller merged commit 10e5cfe into main Jan 2, 2024
7 checks passed
@begelundmuller begelundmuller deleted the pivot-on branch January 2, 2024 14:08