PIVOT ON support #3390
Conversation
runtime/drivers/duckdb/olap.go (Outdated)
```go
func rowsToSchema(r *sqlx.Rows) (*runtimev1.StructType, error) {
	if r == nil {
		return nil, nil
	}

	cts, err := r.ColumnTypes()
	if err != nil {
		return nil, err
	}

	fields := make([]*runtimev1.StructType_Field, len(cts))
	for i, ct := range cts {
		nullable, ok := ct.Nullable()
		if !ok {
			nullable = true
		}

		t, err := DatabaseTypeToPB(ct.DatabaseTypeName(), nullable)
		if err != nil {
			return nil, err
		}

		fields[i] = &runtimev1.StructType_Field{
			Name: ct.Name(),
			Type: t,
		}
	}

	return &runtimev1.StructType{Fields: fields}, nil
}
```
Seems like a duplicate of `RowsToSchema`?
Removed.
duckdbolap "github.com/rilldata/rill/runtime/drivers/duckdb" | ||
|
||
databasesql "database/sql" | ||
"database/sql/driver" | ||
"sync" | ||
|
||
"github.com/marcboeker/go-duckdb" | ||
"github.com/rilldata/rill/runtime/pkg/pbutil" | ||
"google.golang.org/protobuf/types/known/structpb" |
Broken formatting – the std lib imports should be in the first group, other libs in the second group.
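For reference, a sketch of the grouping being suggested here, following the standard Go convention (exact ordering within the groups is up to `gofmt`/`goimports`):

```go
import (
	databasesql "database/sql"
	"database/sql/driver"
	"sync"

	"github.com/marcboeker/go-duckdb"
	duckdbolap "github.com/rilldata/rill/runtime/drivers/duckdb"
	"github.com/rilldata/rill/runtime/pkg/pbutil"
	"google.golang.org/protobuf/types/known/structpb"
)
```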
```diff
@@ -129,16 +268,47 @@ func (q *MetricsViewAggregation) Export(ctx context.Context, rt *runtime.Runtime
 	return nil
 }

 func RowsToSchema(r *databasesql.Rows) (*runtimev1.StructType, error) {
```
Duplicate of `duckdb.RowsToSchema`?
```go
	return r.Err()
}

func rowsToSchema(r *databasesql.Rows) (*runtimev1.StructType, error) {
```
Another duplicate?
```go
	return query, nil
}

func pbTypeToDuckDB(t *runtimev1.Type) (string, error) {
```
Duplicate from runtime/drivers/duckdb/transporter
Removed.
```go
func createTableQuery(schema *runtimev1.StructType, name string) (string, error) {
	query := fmt.Sprintf("CREATE OR REPLACE TABLE %s(", safeName(name))
```
Should use a TEMPORARY TABLE.
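i.e. roughly this (a sketch of the suggested change — DuckDB accepts `CREATE OR REPLACE TEMPORARY TABLE`):

```go
query := fmt.Sprintf("CREATE OR REPLACE TEMPORARY TABLE %s(", safeName(name))
```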
```go
	return err
}
defer func() {
	_, _ = db.ExecContext(ctx, `DROP TABLE "`+temporaryTableName+`"`)
```
Also change here – DROP TEMPORARY TABLE
DROP doesn't have a TEMPORARY clause.
```go
schema, data, err := olapQuery(ctx, olap, priority, sqlString, args)
if err != nil {
	return err
}
```
It's worth optimizing to avoid serializing and deserializing when the DB is already DuckDB. Additionally, it's also worth pushing exports into DuckDB. So the control flow needs to be something like (see the sketch after this list):

- On DuckDB, run a `CREATE TEMPORARY TABLE ... AS SELECT ...`
- On Druid, run the `olapQuery` and create and insert the result into a new temporary table
- Then execute the pivot against the temporary table OR execute the export against the temporary table
- Lastly, drop the temporary table
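A rough sketch of that flow, assuming a plain `*sql.DB` DuckDB handle `db`, and treating `dialect`, `tempName`, `safeName`, `olapQuery`, and `insertResultIntoTable` as placeholders from this discussion rather than final APIs:

```go
tmp := tempName("_for_pivot_")
defer func() {
	_, _ = db.ExecContext(ctx, "DROP TABLE IF EXISTS "+safeName(tmp))
}()

if dialect == "duckdb" {
	// DuckDB: materialize the query result directly, no serialize/deserialize round-trip.
	createQ := fmt.Sprintf("CREATE TEMPORARY TABLE %s AS (%s)", safeName(tmp), sqlString)
	if _, err := db.ExecContext(ctx, createQ, args...); err != nil {
		return err
	}
} else {
	// Druid: run the query remotely, then load the rows into a local temporary table.
	schema, data, err := olapQuery(ctx, olap, priority, sqlString, args)
	if err != nil {
		return err
	}
	if err := insertResultIntoTable(ctx, db, tmp, schema, data); err != nil {
		return err
	}
}

// Run the pivot (or the export) against the temporary table; the deferred DROP cleans up.
rows, err := db.QueryContext(ctx, fmt.Sprintf("PIVOT %s ON %s", safeName(tmp), strings.Join(q.PivotOn, ", ")))
```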
```go
var connErr error
var db *databasesql.DB
dbOnce.Do(func() {
	connector, connErr = duckdb.NewConnector("", func(conn driver.ExecerContext) error {
		_, err := conn.ExecContext(context.Background(), "INSTALL 'json'; LOAD 'json';", nil)
		return err
	})

	db = databasesql.OpenDB(connector)
	db.SetMaxOpenConns(1)
})
if connErr != nil {
	return connErr
}

conn, err := connector.Connect(ctx)
if err != nil {
	return err
}
defer conn.Close()
```
The `db` variable will be `nil` on every invocation after the first (which triggers `dbOnce`). Generally, this code mixes up `db` and `conn` in multiple places.

I would propose moving the entire olapQuery -> temporary table in a temporary DuckDB handle logic into a util function. Something like:

```go
schema, data, err := olapQuery(...)
if err != nil {...}
conn, tableName, release, err := ephemeralDuckDBForResult(schema, data)
// ...
```

If you use that for Druid, combined with a call to `WithConnection` for the DuckDB case (which provides access to the underlying `conn *sql.Conn` in the callback), you might be able to have common logic for doing the pivot (and export) for both. But it's also worth considering treating both cases completely separately, even if it's more verbose.

Regardless, I think the various bugs around connection management here show that it's worth putting some time into thinking about how to make the logic flows here as simple to understand/review as possible.
```go
var (
	dbOnce    sync.Once
	connector driver.Connector
)
```
It's not safe to reuse a connector for multiple DBs. So it should be created at the same time as the `OpenDB` call, every time.
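Something along these lines, as a sketch (names match the snippet above; the no-op init callback is just a stand-in for the real boot queries):

```go
// Create a fresh connector alongside every OpenDB call instead of sharing a global one.
connector, err := duckdb.NewConnector("", func(execer driver.ExecerContext) error { return nil })
if err != nil {
	return err
}
db := databasesql.OpenDB(connector)
defer db.Close()
db.SetMaxOpenConns(1)
```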
```diff
@@ -25,6 +43,7 @@ type MetricsViewAggregation struct {
 	Offset             int64                                 `json:"offset,omitempty"`
 	MetricsView        *runtimev1.MetricsViewSpec            `json:"-"`
 	ResolvedMVSecurity *runtime.ResolvedMetricsViewSecurity  `json:"security"`
+	PivotOn            []string                              `json:"pviot_on,omitempty"`
```
`pviot_on` → `pivot_on`
First benchmarks: (results attached as screenshot)

The growth is non-linear (may be due to the temporary table, as it isn't limited).

With the limit on the subquery, the benchmarks are: (results attached as screenshot)
Lastly, it's not currently adding any limits on the source Druid data. I would suggest we put a cap at e.g. 100k rows, and return an error if it's exceeded (an error is better than returning incorrect/partial pivoted data).
runtime/drivers/duckdb/utils.go (Outdated)
```diff
 // rawConn is similar to *sql.Conn.Raw, but additionally unwraps otelsql (which we use for instrumentation).
-func rawConn(conn *sql.Conn, f func(driver.Conn) error) error {
+func RawConn(conn *sql.Conn, f func(driver.Conn) error) error {
 	return conn.Raw(func(raw any) error {
 		// For details, see: https://github.com/XSAM/otelsql/issues/98
```
This is an implementation detail we should try to avoid leaking from the driver.
```go
// execute druid query
sqlString, args, err := q.buildMetricsAggregationSQL(q.MetricsView, olap.Dialect(), q.ResolvedMVSecurity)
```
The comment doesn't match the code.
```go
dbOnce.Do(func() {
	handle, errHandle = duckdbolap.Driver{}.Open(map[string]any{"pool_size": 10}, false, activity.NewNoopClient(), zap.NewNop())
})
if errHandle != nil {
	return errHandle
}

schema, data, err := olapQuery(ctx, olap, priority, sqlString, args)
if err != nil {
	return err
}

duckDBOLAP, _ := handle.AsOLAP("")
err = duckDBOLAP.WithConnection(ctx, priority, false, false, func(ctx context.Context, ensuredCtx context.Context, conn *databasesql.Conn) error {
	temporaryTableName := tempName("_for_pivot_")
	createTableSQL, err := duckdbolap.CreateTableQuery(schema, temporaryTableName)
```
We should avoid using our DuckDB OLAP driver for this kind of ephemeral in-memory transformation. A few reasons:

- Our driver has a lot of extra overhead, like loading a bunch of extensions, migrations for catalog tables, emitting DB stats telemetry, etc.
- It leaks some pretty internal details of the driver into the `runtime/queries` package (like the `RawConn` func, which should not be public)
- Lifecycle management of the DuckDB handle becomes tricky with a global here, since DuckDB doesn't release memory it has taken. Since this is just for exports, it's probably better to open/close a handle on each call initially.

As discussed previously, I would suggest creating a generic `runtime/pkg/duckdbpivot` package with a `Pivot` function that takes a `sqlx.Rows` iterator as an argument and returns a pivoted `sqlx.Rows`. Internally, it would use `database/sql` to directly open an in-memory DuckDB handle to just create the temporary table and do the pivot.

That would also encapsulate the appender logic in a scoped package, and allow easier testing of the code.

===

An implication of the above is that here, the DuckDB and Druid cases become pretty different (since we want to avoid the overhead of converting from our DuckDB driver to `duckdbpivot`), but since the pivot SQL is pretty simple, I think some duplication is the best way to keep it simple.
I've run some benchmarks to compare initialising the DB per call vs keeping it, for a table of 10k rows — per-call initialisation is 3x slower. My thoughts: yes, there are usually low time requirements for exports, but eventually it's 3x CPU load — though it's OK as long as exports are a small part of Runtime CPU consumption (this speculation mostly considers the cloud-based case):

```
Running tool: /opt/homebrew/opt/go/libexec/bin/go test -benchmem -run=^$ -bench ^(BenchmarkTestCreate|BenchmarkTestNoCreate)$ github.com/rilldata/rill/runtime/drivers/duckdb -count=1

goos: darwin
goarch: arm64
pkg: github.com/rilldata/rill/runtime/drivers/duckdb
BenchmarkTestCreate-10      36  34185877 ns/op  353456 B/op  26358 allocs/op
BenchmarkTestNoCreate-10    97  11942874 ns/op  351659 B/op  26322 allocs/op
PASS
ok  github.com/rilldata/rill/runtime/drivers/duckdb  4.416s
```
```go
package duckdb

import (
	"context"
	"database/sql"
	"database/sql/driver"
	"testing"
	"time"

	"github.com/marcboeker/go-duckdb"
	"github.com/stretchr/testify/require"
)

func initDb(b *testing.B) (*sql.DB, error) {
	connector, err := duckdb.NewConnector("", func(execer driver.ExecerContext) error {
		bootQueries := []string{
			"INSTALL 'icu'",
			"LOAD 'icu'",
		}
		for _, qry := range bootQueries {
			_, err := execer.ExecContext(context.Background(), qry, nil)
			if err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	db := sql.OpenDB(connector)
	return db, nil
}

func BenchmarkTestCreate(b *testing.B) {
	for i := 0; i < b.N; i++ {
		func() {
			// Open and close a fresh handle on every iteration.
			db, err := initDb(b)
			require.NoError(b, err)
			defer db.Close()

			_, err = db.ExecContext(context.Background(), `
				CREATE TEMPORARY TABLE tm as select range from range(timestamptz '2022-01-01', timestamptz '2023-01-01', interval '1 hour')`)
			require.NoError(b, err)

			r, err := db.Query("select time_bucket(interval '1 minute', range, 'Asia/Kathmandu') as bucket, count(*) as cnt from tm group by bucket")
			require.NoError(b, err)
			var cnt int
			var tm time.Time
			for r.Next() {
				err = r.Scan(&tm, &cnt)
				require.NoError(b, err)
			}
			r.Close()

			_, err = db.ExecContext(context.Background(), `DROP TABLE tm`)
			require.NoError(b, err)
		}()
	}
}

func BenchmarkTestNoCreate(b *testing.B) {
	// Reuse a single handle across all iterations.
	db, err := initDb(b)
	require.NoError(b, err)
	defer db.Close()
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		_, err = db.ExecContext(context.Background(), `
			CREATE TEMPORARY TABLE tm as select range from range(timestamptz '2022-01-01', timestamptz '2023-01-01', interval '1 hour')`)
		require.NoError(b, err)

		r, err := db.Query("select time_bucket(interval '1 minute', range, 'Asia/Kathmandu') as bucket, count(*) as cnt from tm group by bucket")
		require.NoError(b, err)
		var cnt int
		var tm time.Time
		for r.Next() {
			err = r.Scan(&tm, &cnt)
			require.NoError(b, err)
		}
		r.Close()

		_, err = db.ExecContext(context.Background(), `DROP TABLE tm`)
		require.NoError(b, err)
	}
}
```
> Pivot function that takes a sqlx.Rows iterator as an argument, and returns a pivoted sqlx.Rows. Internally, it would use database/sql to directly open an in-memory DuckDB handle to just create the temporary table and do the pivot.

That won't work: the function creates a handle but doesn't return it, and it's never closed.
btw, I cannot pass `sqlx.Rows` because I have `drivers.Result` from the `OLAPStore`.
> Our driver has a lot of extra overhead, like loading a bunch of extensions, migrations for catalog tables, emitting DB stats telemetry, etc.
>
> It leaks some pretty internal details of the driver into the runtime/queries package (like the RawConn func should not be public)
>
> Lifecycle management of the DuckDB handle becomes tricky with a global here, since DuckDB doesn't release memory it has taken. Since this is just for exports, it's probably better to open/close a handle on each call initially.

Addressed those.
```diff
 			sn := safeName(m.Name)
 			switch m.BuiltinMeasure {
 			case runtimev1.BuiltinMeasure_BUILTIN_MEASURE_UNSPECIFIED:
 				expr, err := metricsViewMeasureExpression(mv, m.Name)
 				if err != nil {
 					return "", nil, err
 				}
-				selectCols = append(selectCols, fmt.Sprintf("%s as %s", expr, safeName(m.Name)))
+				selectCols = append(selectCols, fmt.Sprintf("%s as %s", expr, sn))
```
`sn` is only referenced once in this scope – probably simpler to keep it as before.
```go
var sql string
if q.PivotOn != nil {
	// select m1, m2, d1, d2 from t, lateral unnest(t.d1) tbl(unnested_d1_) where d1 = 'a' group by d1, d2
	sql = fmt.Sprintf("SELECT %[1]s FROM %[2]s %[3]s %[4]s %[5]s %[6]s %[7]s",
		strings.Join(selectCols, ", "),  // 1
		safeName(mv.Table),              // 2
		strings.Join(unnestClauses, ""), // 3
		whereClause,                     // 4
		groupClause,                     // 5
		orderClause,                     // 6
		limitClause,                     // 7
	)
} else {
	sql = fmt.Sprintf("SELECT %s FROM %s %s %s %s %s %s OFFSET %d",
		strings.Join(selectCols, ", "),
		safeName(mv.Table),
		strings.Join(unnestClauses, ""),
		whereClause,
		groupClause,
		orderClause,
		limitClause,
		q.Offset,
	)
}
```
They seem equivalent apart from the offset clause? I would suggest we return an error if `q.Offset` is passed together with `q.PivotOn`, since that's not easy to support right now.
Benchmarks (updated): (results attached as screenshot)
```go
	if count >= batchSize {
		appender.Flush()
		count = 0
	}
}
```
Should additionally enforce a limit on the number of cells to be pivoted (i.e. `len(columns) * count < maxPivotCells`). Maybe something like 1m cells?
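Roughly like this inside the append loop (a sketch; `maxPivotCells`, `columns`, and `totalRows` are assumed names):

```go
const maxPivotCells = 1_000_000 // ~1m cells

totalRows++
if len(columns)*totalRows > maxPivotCells {
	return fmt.Errorf("pivot: source exceeds the limit of %d cells", maxPivotCells)
}
```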
Would be good to implement the same against DuckDB (can use a `LIMIT` clause of `maxPivotCells+1` to enforce it, and run a count query against the temporary table to determine if `maxPivotCells` was exceeded).
There are also some merge conflicts to address
```go
} else if q.PivotOn != nil {
	l := 1_000_000 / cols
	limitClause = fmt.Sprintf("LIMIT %d", l)
}
```
- Pull the number into a constant at the top of the file, as suggested in the previous comment
- We need to return an error if the limit is exceeded (for both DuckDB and Druid), since otherwise people would get partial/wrong results. So the limit probably needs to be `maxPivotCells + 1`, and then after querying, count the number of rows and check it's `<= maxPivotCells`. A sketch of that check follows below.