Skip to content

Commit

Permalink
pivot-on
Browse files Browse the repository at this point in the history
  • Loading branch information
Egor Ryashin committed Nov 6, 2023
1 parent eae31ef commit 163090c
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 86 deletions.
51 changes: 0 additions & 51 deletions runtime/queries/metricsview_aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,26 +93,6 @@ func (q *MetricsViewAggregation) Resolve(ctx context.Context, rt *runtime.Runtim
return fmt.Errorf("metrics view '%s' does not have a time dimension", q.MetricsView)
}

// if olap.Dialect() == drivers.DialectDuckDB {
// // Build query
// sql, args, err := q.buildMetricsAggregationSQL(q.MetricsView, olap.Dialect(), q.ResolvedMVSecurity)
// if err != nil {
// return fmt.Errorf("error building query: %w", err)
// }

// // Execute
// schema, data, err := olapQuery(ctx, olap, priority, sql, args)
// if err != nil {
// return err
// }
// q.Result = &runtimev1.MetricsViewAggregationResponse{
// Schema: schema,
// Data: data,
// }

// return nil
// }

// execute druid query
sqlString, args, err := q.buildMetricsAggregationSQL(q.MetricsView, olap.Dialect(), q.ResolvedMVSecurity)
if err != nil {
Expand Down Expand Up @@ -235,7 +215,6 @@ func (q *MetricsViewAggregation) Resolve(ctx context.Context, rt *runtime.Runtim
limitClause, // 5
q.Offset, // 6
)
fmt.Println(pivotSQL)
r, err := db.QueryContext(ctx, pivotSQL)
if err != nil {
return err
Expand Down Expand Up @@ -331,12 +310,10 @@ func (q *MetricsViewAggregation) buildMetricsAggregationSQL(mv *runtimev1.Metric
cols := len(q.Dimensions) + len(q.Measures)
selectCols := make([]string, 0, cols)

// measureCols := make([]string, 0, len(q.Measures))
groupCols := make([]string, 0, len(q.Dimensions))
args := []any{}
for _, d := range q.Dimensions {
// Handle regular dimensions
// sn := safeName(d.Name)
if d.TimeGrain == runtimev1.TimeGrain_TIME_GRAIN_UNSPECIFIED {
col, err := metricsViewDimensionToSafeColumn(mv, d.Name)
if err != nil {
Expand Down Expand Up @@ -367,10 +344,8 @@ func (q *MetricsViewAggregation) buildMetricsAggregationSQL(mv *runtimev1.Metric
return "", nil, err
}
selectCols = append(selectCols, fmt.Sprintf("%s as %s", expr, sn))
// measureCols = append(measureCols, fmt.Sprintf("LAST(%s) as %s", sn, sn))
case runtimev1.BuiltinMeasure_BUILTIN_MEASURE_COUNT:
selectCols = append(selectCols, fmt.Sprintf("COUNT(*) as %s", safeName(m.Name)))
// measureCols = append(measureCols, fmt.Sprintf("LAST(%s) as %s", sn, sn))
case runtimev1.BuiltinMeasure_BUILTIN_MEASURE_COUNT_DISTINCT:
if len(m.BuiltinMeasureArgs) != 1 {
return "", nil, fmt.Errorf("builtin measure '%s' expects 1 argument", m.BuiltinMeasure.String())
Expand All @@ -380,7 +355,6 @@ func (q *MetricsViewAggregation) buildMetricsAggregationSQL(mv *runtimev1.Metric
return "", nil, fmt.Errorf("builtin measure '%s' expects non-empty string argument, got '%v'", m.BuiltinMeasure.String(), m.BuiltinMeasureArgs[0])
}
selectCols = append(selectCols, fmt.Sprintf("COUNT(DISTINCT %s) as %s", safeName(arg), safeName(m.Name)))
// measureCols = append(measureCols, fmt.Sprintf("LAST(%s) as %s", sn, sn))
default:
return "", nil, fmt.Errorf("unknown builtin measure '%d'", m.BuiltinMeasure)
}
Expand Down Expand Up @@ -437,38 +411,13 @@ func (q *MetricsViewAggregation) buildMetricsAggregationSQL(mv *runtimev1.Metric
}
var sql string
if q.PivotOn != nil {
// if dialect == drivers.DialectDuckDB {
// /*
// Query example:
// PIVOT (SELECT AVG(population) ap, country, year from Cities group by country, year) t ON year USING LAST(ap) ap;
// ┌─────────┬────────────┬────────────┬────────────┐
// │ Country │ 2000_ap │ 2010_ap │ 2020_ap │
// │ varchar │ double │ double │ double │
// ├─────────┼────────────┼────────────┼────────────┤
// │ NL │ 1005.0 │ 1065.0 │ 1158.0 │
// │ US │ 4289.5 │ 4391.5 │ 4755.0 │
// └─────────┴────────────┴────────────┴────────────┘
// */
// sql = fmt.Sprintf("PIVOT (SELECT %[1]s FROM %[2]s %[3]s %[4]s) ON %[8]s USING %[9]s %[5]s %[6]s OFFSET %[7]d",
// strings.Join(selectCols, ", "), // 1
// safeName(mv.Table), // 2
// whereClause, // 3
// groupClause, // 4
// orderClause, // 5
// limitClause, // 6
// q.Offset, // 7
// strings.Join(q.PivotOn, ", "), // 8 // todo safeName
// strings.Join(measureCols, ", "), // 9
// )
// } else {
// select m1, m2, d1, d2 from t where d1 = 'a' group by d1, d2
sql = fmt.Sprintf("SELECT %[1]s FROM %[2]s %[3]s %[4]s",
strings.Join(selectCols, ", "), // 1
safeName(mv.Table), // 2
whereClause, // 3
groupClause, // 4
)
// }
} else {
sql = fmt.Sprintf("SELECT %s FROM %s %s %s %s %s OFFSET %d",
strings.Join(selectCols, ", "),
Expand Down
115 changes: 80 additions & 35 deletions runtime/queries/metricsview_aggregation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/structpb"

// Register drivers
_ "github.com/rilldata/rill/runtime/drivers/duckdb"
)

Expand Down Expand Up @@ -53,14 +52,6 @@ func TestMetricsViewsAggregation(t *testing.T) {
Name: "timestamp",
},
},
// Filter: &runtimev1.MetricsViewFilter{
// Include: []*runtimev1.MetricsViewFilter_Cond{
// {
// Name: "pub",
// In: []*structpb.Value{structpb.NewStringValue("Google")},
// },
// },
// },

Limit: &limit,
}
Expand Down Expand Up @@ -241,6 +232,82 @@ func TestMetricsViewsAggregation_pivot_2_measures(t *testing.T) {
require.Equal(t, "", fieldsToString(rows[i], "pub"))
}

func TestMetricsViewsAggregation_pivot_2_measures_and_filter(t *testing.T) {
rt, instanceID := testruntime.NewInstanceForProject(t, "ad_bids")

ctrl, err := rt.Controller(context.Background(), instanceID)
require.NoError(t, err)
r, err := ctrl.Get(context.Background(), &runtimev1.ResourceName{Kind: runtime.ResourceKindMetricsView, Name: "ad_bids_metrics"}, false)
require.NoError(t, err)
mv := r.GetMetricsView().Spec

limit := int64(10)
q := &queries.MetricsViewAggregation{
MetricsViewName: "ad_bids_metrics",
Dimensions: []*runtimev1.MetricsViewAggregationDimension{
{
Name: "pub",
},

{
Name: "timestamp",
TimeGrain: runtimev1.TimeGrain_TIME_GRAIN_MONTH,
},
},
Measures: []*runtimev1.MetricsViewAggregationMeasure{
{
Name: "measure_1",
},
{
Name: "measure_0",
},
},
MetricsView: mv,
Sort: []*runtimev1.MetricsViewAggregationSort{
{
Name: "pub",
},
},
PivotOn: []string{
"timestamp",
},
Filter: &runtimev1.MetricsViewFilter{
Include: []*runtimev1.MetricsViewFilter_Cond{
{
Name: "pub",
In: []*structpb.Value{structpb.NewStringValue("Google")},
},
},
},
Limit: &limit,
}
err = q.Resolve(context.Background(), rt, instanceID, 0)
require.NoError(t, err)
require.NotEmpty(t, q.Result)
for i, row := range q.Result.Data {
for _, f := range row.Fields {
fmt.Printf("%v ", f.AsInterface())
}
fmt.Printf(" %d \n", i)

}
rows := q.Result.Data

require.Equal(t, q.Result.Schema.Fields[0].Name, "pub")
require.Equal(t, q.Result.Schema.Fields[1].Name, "2022-01-01_measure_1")
require.Equal(t, q.Result.Schema.Fields[2].Name, "2022-01-01_measure_0")

require.Equal(t, q.Result.Schema.Fields[3].Name, "2022-02-01_measure_1")
require.Equal(t, q.Result.Schema.Fields[4].Name, "2022-02-01_measure_0")

require.Equal(t, q.Result.Schema.Fields[5].Name, "2022-03-01_measure_1")
require.Equal(t, q.Result.Schema.Fields[6].Name, "2022-03-01_measure_0")

require.Equal(t, 1, len(rows))
i := 0
require.Equal(t, "Google", fieldsToString(rows[i], "pub"))
}

func TestMetricsViewsAggregation_pivot_dim_and_measure(t *testing.T) {
rt, instanceID := testruntime.NewInstanceForProject(t, "ad_bids")

Expand Down Expand Up @@ -284,15 +351,6 @@ func TestMetricsViewsAggregation_pivot_dim_and_measure(t *testing.T) {
Name: "dom",
},
},
// Filter: &runtimev1.MetricsViewFilter{
// Include: []*runtimev1.MetricsViewFilter_Cond{
// {
// Name: "pub",
// In: []*structpb.Value{structpb.NewStringValue("Google")},
// },
// },
// },

PivotOn: []string{
"timestamp",
"pub",
Expand All @@ -315,25 +373,12 @@ func TestMetricsViewsAggregation_pivot_dim_and_measure(t *testing.T) {
rows := q.Result.Data

require.Equal(t, q.Result.Schema.Fields[0].Name, "dom")
require.Equal(t, q.Result.Schema.Fields[1].Name, "2022-01-01_measure_1")
require.Equal(t, q.Result.Schema.Fields[2].Name, "2022-01-01_measure_0")

require.Equal(t, q.Result.Schema.Fields[3].Name, "2022-02-01_measure_1")
require.Equal(t, q.Result.Schema.Fields[4].Name, "2022-02-01_measure_0")

require.Equal(t, q.Result.Schema.Fields[5].Name, "2022-03-01_measure_1")
require.Equal(t, q.Result.Schema.Fields[6].Name, "2022-03-01_measure_0")
require.Equal(t, q.Result.Schema.Fields[1].Name, "2022-01-01_Google_measure_1")
require.Equal(t, q.Result.Schema.Fields[2].Name, "2022-02-01_Google_measure_1")
require.Equal(t, q.Result.Schema.Fields[3].Name, "2022-03-01_Google_measure_1")

i := 0
require.Equal(t, "Facebook", fieldsToString(rows[i], "pub"))
i++
require.Equal(t, "Google", fieldsToString(rows[i], "pub"))
i++
require.Equal(t, "Microsoft", fieldsToString(rows[i], "pub"))
i++
require.Equal(t, "Yahoo", fieldsToString(rows[i], "pub"))
i++
require.Equal(t, "", fieldsToString(rows[i], "pub"))
require.Equal(t, "google.com", fieldsToString(rows[i], "dom"))
}

func fieldsToString(row *structpb.Struct, args ...string) string {
Expand Down

0 comments on commit 163090c

Please sign in to comment.