From 163090ca33226f610185f805ff272e4593e27f92 Mon Sep 17 00:00:00 2001 From: Egor Ryashin Date: Mon, 6 Nov 2023 17:26:28 +0300 Subject: [PATCH] pivot-on --- runtime/queries/metricsview_aggregation.go | 51 -------- .../queries/metricsview_aggregation_test.go | 115 ++++++++++++------ 2 files changed, 80 insertions(+), 86 deletions(-) diff --git a/runtime/queries/metricsview_aggregation.go b/runtime/queries/metricsview_aggregation.go index 7ad71c3cb9a8..c824554ac638 100644 --- a/runtime/queries/metricsview_aggregation.go +++ b/runtime/queries/metricsview_aggregation.go @@ -93,26 +93,6 @@ func (q *MetricsViewAggregation) Resolve(ctx context.Context, rt *runtime.Runtim return fmt.Errorf("metrics view '%s' does not have a time dimension", q.MetricsView) } - // if olap.Dialect() == drivers.DialectDuckDB { - // // Build query - // sql, args, err := q.buildMetricsAggregationSQL(q.MetricsView, olap.Dialect(), q.ResolvedMVSecurity) - // if err != nil { - // return fmt.Errorf("error building query: %w", err) - // } - - // // Execute - // schema, data, err := olapQuery(ctx, olap, priority, sql, args) - // if err != nil { - // return err - // } - // q.Result = &runtimev1.MetricsViewAggregationResponse{ - // Schema: schema, - // Data: data, - // } - - // return nil - // } - // execute druid query sqlString, args, err := q.buildMetricsAggregationSQL(q.MetricsView, olap.Dialect(), q.ResolvedMVSecurity) if err != nil { @@ -235,7 +215,6 @@ func (q *MetricsViewAggregation) Resolve(ctx context.Context, rt *runtime.Runtim limitClause, // 5 q.Offset, // 6 ) - fmt.Println(pivotSQL) r, err := db.QueryContext(ctx, pivotSQL) if err != nil { return err @@ -331,12 +310,10 @@ func (q *MetricsViewAggregation) buildMetricsAggregationSQL(mv *runtimev1.Metric cols := len(q.Dimensions) + len(q.Measures) selectCols := make([]string, 0, cols) - // measureCols := make([]string, 0, len(q.Measures)) groupCols := make([]string, 0, len(q.Dimensions)) args := []any{} for _, d := range q.Dimensions { // Handle regular dimensions - // sn := safeName(d.Name) if d.TimeGrain == runtimev1.TimeGrain_TIME_GRAIN_UNSPECIFIED { col, err := metricsViewDimensionToSafeColumn(mv, d.Name) if err != nil { @@ -367,10 +344,8 @@ func (q *MetricsViewAggregation) buildMetricsAggregationSQL(mv *runtimev1.Metric return "", nil, err } selectCols = append(selectCols, fmt.Sprintf("%s as %s", expr, sn)) - // measureCols = append(measureCols, fmt.Sprintf("LAST(%s) as %s", sn, sn)) case runtimev1.BuiltinMeasure_BUILTIN_MEASURE_COUNT: selectCols = append(selectCols, fmt.Sprintf("COUNT(*) as %s", safeName(m.Name))) - // measureCols = append(measureCols, fmt.Sprintf("LAST(%s) as %s", sn, sn)) case runtimev1.BuiltinMeasure_BUILTIN_MEASURE_COUNT_DISTINCT: if len(m.BuiltinMeasureArgs) != 1 { return "", nil, fmt.Errorf("builtin measure '%s' expects 1 argument", m.BuiltinMeasure.String()) @@ -380,7 +355,6 @@ func (q *MetricsViewAggregation) buildMetricsAggregationSQL(mv *runtimev1.Metric return "", nil, fmt.Errorf("builtin measure '%s' expects non-empty string argument, got '%v'", m.BuiltinMeasure.String(), m.BuiltinMeasureArgs[0]) } selectCols = append(selectCols, fmt.Sprintf("COUNT(DISTINCT %s) as %s", safeName(arg), safeName(m.Name))) - // measureCols = append(measureCols, fmt.Sprintf("LAST(%s) as %s", sn, sn)) default: return "", nil, fmt.Errorf("unknown builtin measure '%d'", m.BuiltinMeasure) } @@ -437,30 +411,6 @@ func (q *MetricsViewAggregation) buildMetricsAggregationSQL(mv *runtimev1.Metric } var sql string if q.PivotOn != nil { - // if dialect == drivers.DialectDuckDB { - // /* - // Query example: - // PIVOT (SELECT AVG(population) ap, country, year from Cities group by country, year) t ON year USING LAST(ap) ap; - // ┌─────────┬────────────┬────────────┬────────────┐ - // │ Country │ 2000_ap │ 2010_ap │ 2020_ap │ - // │ varchar │ double │ double │ double │ - // ├─────────┼────────────┼────────────┼────────────┤ - // │ NL │ 1005.0 │ 1065.0 │ 1158.0 │ - // │ US │ 4289.5 │ 4391.5 │ 4755.0 │ - // └─────────┴────────────┴────────────┴────────────┘ - // */ - // sql = fmt.Sprintf("PIVOT (SELECT %[1]s FROM %[2]s %[3]s %[4]s) ON %[8]s USING %[9]s %[5]s %[6]s OFFSET %[7]d", - // strings.Join(selectCols, ", "), // 1 - // safeName(mv.Table), // 2 - // whereClause, // 3 - // groupClause, // 4 - // orderClause, // 5 - // limitClause, // 6 - // q.Offset, // 7 - // strings.Join(q.PivotOn, ", "), // 8 // todo safeName - // strings.Join(measureCols, ", "), // 9 - // ) - // } else { // select m1, m2, d1, d2 from t where d1 = 'a' group by d1, d2 sql = fmt.Sprintf("SELECT %[1]s FROM %[2]s %[3]s %[4]s", strings.Join(selectCols, ", "), // 1 @@ -468,7 +418,6 @@ func (q *MetricsViewAggregation) buildMetricsAggregationSQL(mv *runtimev1.Metric whereClause, // 3 groupClause, // 4 ) - // } } else { sql = fmt.Sprintf("SELECT %s FROM %s %s %s %s %s OFFSET %d", strings.Join(selectCols, ", "), diff --git a/runtime/queries/metricsview_aggregation_test.go b/runtime/queries/metricsview_aggregation_test.go index de5833f48a68..b2e231b04d97 100644 --- a/runtime/queries/metricsview_aggregation_test.go +++ b/runtime/queries/metricsview_aggregation_test.go @@ -13,7 +13,6 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/protobuf/types/known/structpb" - // Register drivers _ "github.com/rilldata/rill/runtime/drivers/duckdb" ) @@ -53,14 +52,6 @@ func TestMetricsViewsAggregation(t *testing.T) { Name: "timestamp", }, }, - // Filter: &runtimev1.MetricsViewFilter{ - // Include: []*runtimev1.MetricsViewFilter_Cond{ - // { - // Name: "pub", - // In: []*structpb.Value{structpb.NewStringValue("Google")}, - // }, - // }, - // }, Limit: &limit, } @@ -241,6 +232,82 @@ func TestMetricsViewsAggregation_pivot_2_measures(t *testing.T) { require.Equal(t, "", fieldsToString(rows[i], "pub")) } +func TestMetricsViewsAggregation_pivot_2_measures_and_filter(t *testing.T) { + rt, instanceID := testruntime.NewInstanceForProject(t, "ad_bids") + + ctrl, err := rt.Controller(context.Background(), instanceID) + require.NoError(t, err) + r, err := ctrl.Get(context.Background(), &runtimev1.ResourceName{Kind: runtime.ResourceKindMetricsView, Name: "ad_bids_metrics"}, false) + require.NoError(t, err) + mv := r.GetMetricsView().Spec + + limit := int64(10) + q := &queries.MetricsViewAggregation{ + MetricsViewName: "ad_bids_metrics", + Dimensions: []*runtimev1.MetricsViewAggregationDimension{ + { + Name: "pub", + }, + + { + Name: "timestamp", + TimeGrain: runtimev1.TimeGrain_TIME_GRAIN_MONTH, + }, + }, + Measures: []*runtimev1.MetricsViewAggregationMeasure{ + { + Name: "measure_1", + }, + { + Name: "measure_0", + }, + }, + MetricsView: mv, + Sort: []*runtimev1.MetricsViewAggregationSort{ + { + Name: "pub", + }, + }, + PivotOn: []string{ + "timestamp", + }, + Filter: &runtimev1.MetricsViewFilter{ + Include: []*runtimev1.MetricsViewFilter_Cond{ + { + Name: "pub", + In: []*structpb.Value{structpb.NewStringValue("Google")}, + }, + }, + }, + Limit: &limit, + } + err = q.Resolve(context.Background(), rt, instanceID, 0) + require.NoError(t, err) + require.NotEmpty(t, q.Result) + for i, row := range q.Result.Data { + for _, f := range row.Fields { + fmt.Printf("%v ", f.AsInterface()) + } + fmt.Printf(" %d \n", i) + + } + rows := q.Result.Data + + require.Equal(t, q.Result.Schema.Fields[0].Name, "pub") + require.Equal(t, q.Result.Schema.Fields[1].Name, "2022-01-01_measure_1") + require.Equal(t, q.Result.Schema.Fields[2].Name, "2022-01-01_measure_0") + + require.Equal(t, q.Result.Schema.Fields[3].Name, "2022-02-01_measure_1") + require.Equal(t, q.Result.Schema.Fields[4].Name, "2022-02-01_measure_0") + + require.Equal(t, q.Result.Schema.Fields[5].Name, "2022-03-01_measure_1") + require.Equal(t, q.Result.Schema.Fields[6].Name, "2022-03-01_measure_0") + + require.Equal(t, 1, len(rows)) + i := 0 + require.Equal(t, "Google", fieldsToString(rows[i], "pub")) +} + func TestMetricsViewsAggregation_pivot_dim_and_measure(t *testing.T) { rt, instanceID := testruntime.NewInstanceForProject(t, "ad_bids") @@ -284,15 +351,6 @@ func TestMetricsViewsAggregation_pivot_dim_and_measure(t *testing.T) { Name: "dom", }, }, - // Filter: &runtimev1.MetricsViewFilter{ - // Include: []*runtimev1.MetricsViewFilter_Cond{ - // { - // Name: "pub", - // In: []*structpb.Value{structpb.NewStringValue("Google")}, - // }, - // }, - // }, - PivotOn: []string{ "timestamp", "pub", @@ -315,25 +373,12 @@ func TestMetricsViewsAggregation_pivot_dim_and_measure(t *testing.T) { rows := q.Result.Data require.Equal(t, q.Result.Schema.Fields[0].Name, "dom") - require.Equal(t, q.Result.Schema.Fields[1].Name, "2022-01-01_measure_1") - require.Equal(t, q.Result.Schema.Fields[2].Name, "2022-01-01_measure_0") - - require.Equal(t, q.Result.Schema.Fields[3].Name, "2022-02-01_measure_1") - require.Equal(t, q.Result.Schema.Fields[4].Name, "2022-02-01_measure_0") - - require.Equal(t, q.Result.Schema.Fields[5].Name, "2022-03-01_measure_1") - require.Equal(t, q.Result.Schema.Fields[6].Name, "2022-03-01_measure_0") + require.Equal(t, q.Result.Schema.Fields[1].Name, "2022-01-01_Google_measure_1") + require.Equal(t, q.Result.Schema.Fields[2].Name, "2022-02-01_Google_measure_1") + require.Equal(t, q.Result.Schema.Fields[3].Name, "2022-03-01_Google_measure_1") i := 0 - require.Equal(t, "Facebook", fieldsToString(rows[i], "pub")) - i++ - require.Equal(t, "Google", fieldsToString(rows[i], "pub")) - i++ - require.Equal(t, "Microsoft", fieldsToString(rows[i], "pub")) - i++ - require.Equal(t, "Yahoo", fieldsToString(rows[i], "pub")) - i++ - require.Equal(t, "", fieldsToString(rows[i], "pub")) + require.Equal(t, "google.com", fieldsToString(rows[i], "dom")) } func fieldsToString(row *structpb.Struct, args ...string) string {