[CH] Support WindowGroupLimit for row_number, rank and dense_rank #7087

Closed
lgbo-ustc opened this issue Sep 3, 2024 · 4 comments · Fixed by #7176
Labels
enhancement New feature or request

Comments

lgbo-ustc (Contributor) commented Sep 3, 2024

Description

Part of #6067.

For TPC-DS q44, the physical plan is:

CHNativeColumnarToRow
+- TakeOrderedAndProjectExecTransformer (limit=100, orderBy=[rnk#74 ASC NULLS FIRST], output=[rnk#74,best_performing#80,worst_performing#81])
   +- ^(16) ProjectExecTransformer [rnk#74, i_product_name#135 AS best_performing#80, i_product_name#180 AS worst_performing#81]
      +- ^(16) CHSortMergeJoinExecTransformer [item_sk#75L], [i_item_sk#159L], Inner, false
         :- ^(16) SortExecTransformer [item_sk#75L ASC NULLS FIRST], false, 0
         :  +- ^(16) InputIteratorTransformer[rnk#74, item_sk#75L, i_product_name#135]
         :     +- ColumnarExchange hashpartitioning(item_sk#75L, 5), ENSURE_REQUIREMENTS, [plan_id=1465], [shuffle_writer_type=hash], [OUTPUT] List(rnk:IntegerType, item_sk:LongType, i_product_name:StringType)
         :        +- ^(14) ProjectExecTransformer [rnk#74, item_sk#75L, i_product_name#135]
         :           +- ^(14) CHSortMergeJoinExecTransformer [item_sk#70L], [i_item_sk#114L], Inner, false
         :              :- ^(14) SortExecTransformer [item_sk#70L ASC NULLS FIRST], false, 0
         :              :  +- ^(14) InputIteratorTransformer[item_sk#70L, rnk#74, item_sk#75L]
         :              :     +- ColumnarExchange hashpartitioning(item_sk#70L, 5), ENSURE_REQUIREMENTS, [plan_id=1407], [shuffle_writer_type=hash], [OUTPUT] List(item_sk:LongType, rnk:IntegerType, item_sk:LongType)
         :              :        +- ^(12) ProjectExecTransformer [item_sk#70L, rnk#74, item_sk#75L]
         :              :           +- ^(12) CHSortMergeJoinExecTransformer [rnk#74], [rnk#79], Inner, false
         :              :              :- ^(12) SortExecTransformer [rnk#74 ASC NULLS FIRST], false, 0
         :              :              :  +- ^(12) ProjectExecTransformer [item_sk#70L, rnk#74]
         :              :              :     +- ^(12) FilterExecTransformer ((rnk#74 < 11) AND isnotnull(item_sk#70L))
         :              :              :        +- ^(12) WindowExecTransformer [rank(rank_col#71) windowspecdefinition(rank_col#71 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rnk#74], [rank_col#71 ASC NULLS FIRST]
         :              :              :           +- ^(12) InputIteratorTransformer[item_sk#70L, rank_col#71]
         :              :              :              +- RowToCHNativeColumnar
         :              :              :                 +- WindowGroupLimit [rank_col#71 ASC NULLS FIRST], rank(rank_col#71), 10, Final
         :              :              :                    +- CHNativeColumnarToRow
         :              :              :                       +- ^(8) SortExecTransformer [rank_col#71 ASC NULLS FIRST], false, 0
         :              :              :                          +- ^(8) InputIteratorTransformer[item_sk#70L, rank_col#71]
         :              :              :                             +- ColumnarExchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=1210], [shuffle_writer_type=hash], [OUTPUT] List(item_sk:LongType, rank_col:DecimalType(11,6))
         :              :              :                                +- RowToCHNativeColumnar
         :              :              :                                   +- WindowGroupLimit [rank_col#71 ASC NULLS FIRST], rank(rank_col#71), 10, Partial
         :              :              :                                      +- CHNativeColumnarToRow
         :              :              :                                         +- ^(7) SortExecTransformer [rank_col#71 ASC NULLS FIRST], false, 0
         :              :              :                                            +- ^(7) FilterExecTransformer (isnotnull(rank_col#71) AND (cast(rank_col#71 as decimal(13,7)) > (0.9 * Subquery scalar-subquery#73, [id=#294])))
         :              :              :                                               :  +- Subquery scalar-subquery#73, [id=#294]
         :              :              :                                               :     +- CHNativeColumnarToRow
         :              :              :                                               :        +- ^(3) ProjectExecTransformer [cast((avg(UnscaledValue(ss_net_profit#209))#185 / 100.0) as decimal(11,6)) AS rank_col#72]
         :              :              :                                               :           +- ^(3) HashAggregateTransformer(keys=[ss_store_sk#195L], functions=[avg(UnscaledValue(ss_net_profit#209))], isStreamingAgg=false)
         :              :              :                                               :              +- ^(3) InputIteratorTransformer[ss_store_sk#195L, sum#265, count#266L]
         :              :              :                                               :                 +- ColumnarExchange hashpartitioning(ss_store_sk#195L, 5), ENSURE_REQUIREMENTS, [plan_id=287], [shuffle_writer_type=hash], [OUTPUT] ArrayBuffer(ss_store_sk:LongType, sum:DoubleType, count:LongType)
         :              :              :                                               :                    +- ^(2) HashAggregateTransformer(keys=[ss_store_sk#195L], functions=[partial_avg(_pre_0#267L)], isStreamingAgg=false)
         :              :              :                                               :                       +- ^(2) ProjectExecTransformer [ss_store_sk#195L, ss_net_profit#209, UnscaledValue(ss_net_profit#209) AS _pre_0#267L]
         :              :              :                                               :                          +- ^(2) FilterExecTransformer ((isnotnull(ss_store_sk#195L) AND (ss_store_sk#195L = cast(2 as bigint))) AND isnull(ss_hdemo_sk#193L))
         :              :              :                                               :                             +- ^(2) NativeFileScan parquet spark_catalog.tpcds_pq100.store_sales[ss_hdemo_sk#193L,ss_store_sk#195L,ss_net_profit#209] Batched: true, DataFilters: [isnotnull(ss_store_sk#195L), isnull(ss_hdemo_sk#193L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/data3/liangjiabiao/docker/local_gluten/spark-3.5.1-bin-hadoop3/s..., PartitionFilters: [], PushedFilters: [IsNotNull(ss_store_sk), IsNull(ss_hdemo_sk)], ReadSchema: struct<ss_hdemo_sk:bigint,ss_store_sk:bigint,ss_net_profit:decimal(7,2)>
         :              :              :                                               +- ^(7) ProjectExecTransformer [ss_item_sk#91L AS item_sk#70L, cast((avg(UnscaledValue(ss_net_profit#113))#183 / 100.0) as decimal(11,6)) AS rank_col#71]
         :              :              :                                                  +- ^(7) HashAggregateTransformer(keys=[ss_item_sk#91L], functions=[avg(UnscaledValue(ss_net_profit#113))], isStreamingAgg=false)
         :              :              :                                                     +- ^(7) InputIteratorTransformer[ss_item_sk#91L, sum#257, count#258L]
         :              :              :                                                        +- ColumnarExchange hashpartitioning(ss_item_sk#91L, 5), ENSURE_REQUIREMENTS, [plan_id=1199], [shuffle_writer_type=hash], [OUTPUT] ArrayBuffer(ss_item_sk:LongType, sum:DoubleType, count:LongType)
         :              :              :                                                           +- ^(6) HashAggregateTransformer(keys=[ss_item_sk#91L], functions=[partial_avg(_pre_2#277L)], isStreamingAgg=false)
         :              :              :                                                              +- ^(6) ProjectExecTransformer [ss_item_sk#91L, ss_net_profit#113, UnscaledValue(ss_net_profit#113) AS _pre_2#277L]
         :              :              :                                                                 +- ^(6) FilterExecTransformer (isnotnull(ss_store_sk#99L) AND (ss_store_sk#99L = cast(2 as bigint)))
         :              :              :                                                                    +- ^(6) NativeFileScan parquet spark_catalog.tpcds_pq100.store_sales[ss_item_sk#91L,ss_store_sk#99L,ss_net_profit#113] Batched: true, DataFilters: [isnotnull(ss_store_sk#99L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/data3/liangjiabiao/docker/local_gluten/spark-3.5.1-bin-hadoop3/s..., PartitionFilters: [], PushedFilters: [IsNotNull(ss_store_sk)], ReadSchema: struct<ss_item_sk:bigint,ss_store_sk:bigint,ss_net_profit:decimal(7,2)>
         :              :              +- ^(12) SortExecTransformer [rnk#79 ASC NULLS FIRST], false, 0
         :              :                 +- ^(12) ProjectExecTransformer [item_sk#75L, rnk#79]
         :              :                    +- ^(12) FilterExecTransformer ((rnk#79 < 11) AND isnotnull(item_sk#75L))
         :              :                       +- ^(12) WindowExecTransformer [rank(rank_col#76) windowspecdefinition(rank_col#76 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rnk#79], [rank_col#76 DESC NULLS LAST]
         :              :                          +- ^(12) InputIteratorTransformer[item_sk#75L, rank_col#76]
         :              :                             +- RowToCHNativeColumnar
         :              :                                +- WindowGroupLimit [rank_col#76 DESC NULLS LAST], rank(rank_col#76), 10, Final
         :              :                                   +- CHNativeColumnarToRow
         :              :                                      +- ^(11) SortExecTransformer [rank_col#76 DESC NULLS LAST], false, 0
         :              :                                         +- ^(11) InputIteratorTransformer[item_sk#75L, rank_col#76]
         :              :                                            +- ColumnarExchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=1374], [shuffle_writer_type=hash], [OUTPUT] List(item_sk:LongType, rank_col:DecimalType(11,6))
         :              :                                               +- RowToCHNativeColumnar
         :              :                                                  +- WindowGroupLimit [rank_col#76 DESC NULLS LAST], rank(rank_col#76), 10, Partial
         :              :                                                     +- CHNativeColumnarToRow
         :              :                                                        +- ^(10) SortExecTransformer [rank_col#76 DESC NULLS LAST], false, 0
         :              :                                                           +- ^(10) FilterExecTransformer (isnotnull(rank_col#76) AND (cast(rank_col#76 as decimal(13,7)) > (0.9 * ReusedSubquery Subquery scalar-subquery#73, [id=#294])))
         :              :                                                              :  +- ReusedSubquery Subquery scalar-subquery#73, [id=#294]
         :              :                                                              +- ^(10) ProjectExecTransformer [ss_item_sk#136L AS item_sk#75L, cast((avg(UnscaledValue(ss_net_profit#158))#184 / 100.0) as decimal(11,6)) AS rank_col#76]
         :              :                                                                 +- ^(10) HashAggregateTransformer(keys=[ss_item_sk#136L], functions=[avg(UnscaledValue(ss_net_profit#158))], isStreamingAgg=false)
         :              :                                                                    +- ^(10) InputIteratorTransformer[ss_item_sk#136L, sum#261, count#262L]
         :              :                                                                       +- ReusedExchange [ss_item_sk#136L, sum#261, count#262L], ColumnarExchange hashpartitioning(ss_item_sk#91L, 5), ENSURE_REQUIREMENTS, [plan_id=1199], [shuffle_writer_type=hash], [OUTPUT] ArrayBuffer(ss_item_sk:LongType, sum:DoubleType, count:LongType)
         :              +- ^(14) SortExecTransformer [i_item_sk#114L ASC NULLS FIRST], false, 0
         :                 +- ^(14) InputIteratorTransformer[i_item_sk#114L, i_product_name#135]
         :                    +- ColumnarExchange hashpartitioning(i_item_sk#114L, 5), ENSURE_REQUIREMENTS, [plan_id=1258], [shuffle_writer_type=hash], [OUTPUT] List(i_item_sk:LongType, i_product_name:StringType)
         :                       +- ^(13) FilterExecTransformer isnotnull(i_item_sk#114L)
         :                          +- ^(13) NativeFileScan parquet spark_catalog.tpcds_pq100.item[i_item_sk#114L,i_product_name#135] Batched: true, DataFilters: [isnotnull(i_item_sk#114L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/data3/liangjiabiao/docker/local_gluten/spark-3.5.1-bin-hadoop3/s..., PartitionFilters: [], PushedFilters: [IsNotNull(i_item_sk)], ReadSchema: struct<i_item_sk:bigint,i_product_name:string>
         +- ^(16) SortExecTransformer [i_item_sk#159L ASC NULLS FIRST], false, 0
            +- ^(16) InputIteratorTransformer[i_item_sk#159L, i_product_name#180]
               +- ReusedExchange [i_item_sk#159L, i_product_name#180], ColumnarExchange hashpartitioning(i_item_sk#114L, 5), ENSURE_REQUIREMENTS, [plan_id=1258], [shuffle_writer_type=hash], [OUTPUT] List(i_item_sk:LongType, i_product_name:StringType)

q67 and q70 also fall back on WindowGroupLimit.
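
For reference, a minimal sketch of the rank-then-filter pattern that lets Spark insert the Partial/Final WindowGroupLimit pair seen in the plan above (assumes Spark 3.5+, where the optimizer rule exists; the table and column names are made up for illustration):

```scala
import org.apache.spark.sql.SparkSession

object WindowGroupLimitDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("wgl-demo").getOrCreate()

    // A toy table standing in for the aggregated store_sales input of q44.
    spark.range(0, 1000)
      .selectExpr("id % 10 AS item_sk", "rand() AS rank_col")
      .createOrReplaceTempView("t")

    // rank() over a single ordering plus a literal `rnk < 11` filter is the
    // shape the optimizer rewrites into WindowGroupLimit(limit = 10) below
    // the Window operator.
    spark.sql(
      """SELECT item_sk, rnk
        |FROM (SELECT item_sk, rank() OVER (ORDER BY rank_col) AS rnk FROM t)
        |WHERE rnk < 11""".stripMargin
    ).explain()
  }
}
```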

lgbo-ustc added the enhancement label on Sep 3, 2024
lgbo-ustc (Contributor, Author) commented:
For comparison, the physical plan on Spark 3.3, which predates the WindowGroupLimit operator (introduced in Spark 3.5), is:

CHNativeColumnarToRow
+- TakeOrderedAndProjectExecTransformer (limit=100, orderBy=[rnk#5 ASC NULLS FIRST], output=[rnk#5,best_performing#11,worst_performing#12])
   +- ^(11) ProjectExecTransformer [rnk#5, i_product_name#66 AS best_performing#11, i_product_name#111 AS worst_performing#12]
      +- ^(11) CHBroadcastHashJoinExecTransformer [item_sk#6L], [i_item_sk#90L], Inner, BuildRight, false
         :- ^(11) ProjectExecTransformer [rnk#5, item_sk#6L, i_product_name#66]
         :  +- ^(11) CHBroadcastHashJoinExecTransformer [item_sk#1L], [i_item_sk#45L], Inner, BuildRight, false
         :     :- ^(11) ProjectExecTransformer [item_sk#1L, rnk#5, item_sk#6L]
         :     :  +- ^(11) CHSortMergeJoinExecTransformer [rnk#5], [rnk#10], Inner, false
         :     :     :- ^(11) SortExecTransformer [rnk#5 ASC NULLS FIRST], false, 0
         :     :     :  +- ^(11) ProjectExecTransformer [item_sk#1L, rnk#5]
         :     :     :     +- ^(11) FilterExecTransformer ((rnk#5 < 11) AND isnotnull(item_sk#1L))
         :     :     :        +- ^(11) WindowExecTransformer [rank(rank_col#2) windowspecdefinition(rank_col#2 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rnk#5], [rank_col#2 ASC NULLS FIRST]
         :     :     :           +- ^(11) SortExecTransformer [rank_col#2 ASC NULLS FIRST], false, 0
         :     :     :              +- ^(11) InputIteratorTransformer[item_sk#1L, rank_col#2]
         :     :     :                 +- ColumnarExchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=896], [shuffle_writer_type=hash], [OUTPUT] List(item_sk:LongType, rank_col:DecimalType(11,6))
         :     :     :                    +- ^(6) FilterExecTransformer (isnotnull(rank_col#2) AND (cast(rank_col#2 as decimal(13,7)) > CheckOverflow((promote_precision(cast(0.9 as decimal(11,6))) * promote_precision(Subquery scalar-subquery#4, [id=#222])), DecimalType(13,7))))
         :     :     :                       :  +- Subquery scalar-subquery#4, [id=#222]
         :     :     :                       :     +- CHNativeColumnarToRow
         :     :     :                       :        +- ^(2) ProjectExecTransformer [cast((avg(UnscaledValue(ss_net_profit#142))#116 / 100.0) as decimal(11,6)) AS rank_col#3]
         :     :     :                       :           +- ^(2) HashAggregateTransformer(keys=[ss_store_sk#128L], functions=[avg(UnscaledValue(ss_net_profit#142))], isStreamingAgg=false)
         :     :     :                       :              +- ^(2) InputIteratorTransformer[ss_store_sk#128L, sum#204, count#205L]
         :     :     :                       :                 +- ColumnarExchange hashpartitioning(ss_store_sk#128L, 5), ENSURE_REQUIREMENTS, [plan_id=215], [shuffle_writer_type=hash], [OUTPUT] ArrayBuffer(ss_store_sk:LongType, sum:DoubleType, count:LongType)
         :     :     :                       :                    +- ^(1) HashAggregateTransformer(keys=[ss_store_sk#128L], functions=[partial_avg(_pre_0#206L)], isStreamingAgg=false)
         :     :     :                       :                       +- ^(1) ProjectExecTransformer [ss_store_sk#128L, ss_net_profit#142, UnscaledValue(ss_net_profit#142) AS _pre_0#206L]
         :     :     :                       :                          +- ^(1) FilterExecTransformer ((isnotnull(ss_store_sk#128L) AND (ss_store_sk#128L = cast(2 as bigint))) AND isnull(ss_hdemo_sk#126L))
         :     :     :                       :                             +- ^(1) NativeFileScan parquet tpcds_pq100.store_sales[ss_hdemo_sk#126L,ss_store_sk#128L,ss_net_profit#142] Batched: true, DataFilters: [isnotnull(ss_store_sk#128L), isnull(ss_hdemo_sk#126L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/data3/liangjiabiao/docker/local_gluten/spark-3.3.2-bin-hadoop3/s..., PartitionFilters: [], PushedFilters: [IsNotNull(ss_store_sk), IsNull(ss_hdemo_sk)], ReadSchema: struct<ss_hdemo_sk:bigint,ss_store_sk:bigint,ss_net_profit:decimal(7,2)>
         :     :     :                       +- ^(6) ProjectExecTransformer [ss_item_sk#22L AS item_sk#1L, cast((avg(UnscaledValue(ss_net_profit#44))#114 / 100.0) as decimal(11,6)) AS rank_col#2]
         :     :     :                          +- ^(6) HashAggregateTransformer(keys=[ss_item_sk#22L], functions=[avg(UnscaledValue(ss_net_profit#44))], isStreamingAgg=false)
         :     :     :                             +- ^(6) InputIteratorTransformer[ss_item_sk#22L, sum#196, count#197L]
         :     :     :                                +- ColumnarExchange hashpartitioning(ss_item_sk#22L, 5), ENSURE_REQUIREMENTS, [plan_id=889], [shuffle_writer_type=hash], [OUTPUT] ArrayBuffer(ss_item_sk:LongType, sum:DoubleType, count:LongType)
         :     :     :                                   +- ^(5) HashAggregateTransformer(keys=[ss_item_sk#22L], functions=[partial_avg(_pre_2#216L)], isStreamingAgg=false)
         :     :     :                                      +- ^(5) ProjectExecTransformer [ss_item_sk#22L, ss_net_profit#44, UnscaledValue(ss_net_profit#44) AS _pre_2#216L]
         :     :     :                                         +- ^(5) FilterExecTransformer (isnotnull(ss_store_sk#30L) AND (ss_store_sk#30L = cast(2 as bigint)))
         :     :     :                                            +- ^(5) NativeFileScan parquet tpcds_pq100.store_sales[ss_item_sk#22L,ss_store_sk#30L,ss_net_profit#44] Batched: true, DataFilters: [isnotnull(ss_store_sk#30L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/data3/liangjiabiao/docker/local_gluten/spark-3.3.2-bin-hadoop3/s..., PartitionFilters: [], PushedFilters: [IsNotNull(ss_store_sk)], ReadSchema: struct<ss_item_sk:bigint,ss_store_sk:bigint,ss_net_profit:decimal(7,2)>
         :     :     +- ^(11) SortExecTransformer [rnk#10 ASC NULLS FIRST], false, 0
         :     :        +- ^(11) ProjectExecTransformer [item_sk#6L, rnk#10]
         :     :           +- ^(11) FilterExecTransformer ((rnk#10 < 11) AND isnotnull(item_sk#6L))
         :     :              +- ^(11) WindowExecTransformer [rank(rank_col#7) windowspecdefinition(rank_col#7 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rnk#10], [rank_col#7 DESC NULLS LAST]
         :     :                 +- ^(11) SortExecTransformer [rank_col#7 DESC NULLS LAST], false, 0
         :     :                    +- ^(11) InputIteratorTransformer[item_sk#6L, rank_col#7]
         :     :                       +- ReusedExchange [item_sk#6L, rank_col#7], ColumnarExchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=896], [shuffle_writer_type=hash], [OUTPUT] List(item_sk:LongType, rank_col:DecimalType(11,6))
         :     +- ^(11) InputIteratorTransformer[i_item_sk#45L, i_product_name#66]
         :        +- ColumnarBroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=923]
         :           +- ^(9) FilterExecTransformer isnotnull(i_item_sk#45L)
         :              +- ^(9) NativeFileScan parquet tpcds_pq100.item[i_item_sk#45L,i_product_name#66] Batched: true, DataFilters: [isnotnull(i_item_sk#45L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/data3/liangjiabiao/docker/local_gluten/spark-3.3.2-bin-hadoop3/s..., PartitionFilters: [], PushedFilters: [IsNotNull(i_item_sk)], ReadSchema: struct<i_item_sk:bigint,i_product_name:string>
         +- ^(11) InputIteratorTransformer[i_item_sk#90L, i_product_name#111]
            +- ReusedExchange [i_item_sk#90L, i_product_name#111], ColumnarBroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=923]

lgbo-ustc (Contributor, Author) commented Sep 3, 2024

Two advantages of WindowGroupLimit:

  1. It avoids OOM, since each group is truncated to its top rows instead of being fully buffered.
  2. Much less data has to be shuffled and processed in the later stages.

lgbo-ustc (Contributor, Author) commented Sep 3, 2024

Based on the WindowTransform in CH, we need to:

  1. Adjust the window frame's begin bound to a nearby offset instead of unbounded preceding, similar to [GLUTEN-6213][CH] Reduce memory usage for some window functions #6214. This greatly reduces the required memory.
  2. Drop the following blocks directly once the rank value is larger than the limit (see the sketch after this list). This avoids the overhead of inserting those values and filtering them out later.
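
A minimal sketch of the early-drop idea in point 2, in plain Scala over rows already sorted by the ordering key (illustrative only, not the CH implementation):

```scala
// Keeps rows while rank() <= limit and stops at the first row whose rank
// exceeds it; since the input is sorted, every following row ranks even
// further past the limit, so whole trailing blocks can be dropped without
// ever inserting or filtering their values.
def rankLimit[T](sorted: Iterator[T], limit: Int)(orderKey: T => Any): Iterator[T] = {
  var rank = 0                   // rank assigned to the current row
  var rowNumber = 0              // rows consumed so far
  var prev: Option[Any] = None   // ordering key of the previous row
  sorted.takeWhile { row =>
    rowNumber += 1
    val key = orderKey(row)
    if (!prev.contains(key)) { rank = rowNumber; prev = Some(key) }
    rank <= limit                // first false ends the whole scan
  }
}

// rankLimit(Iterator(1, 1, 2, 3), limit = 2)(identity).toList == List(1, 1),
// since the value 2 ranks 3rd due to the tie.
```

The same early stop works for row_number and dense_rank, because all three rank functions are non-decreasing over a sorted input.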

lgbo-ustc (Contributor, Author) commented:
We cannot use WindowTransform to implement this; we have to build a new processor. It should be simpler than WindowTransform.
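
As a rough illustration of why such a processor can be simpler, here is the per-group state it has to track, sketched in plain Scala (names are hypothetical, not the actual CH processor; input is assumed sorted by partition key, then ordering key). Unlike the single-ordering case above, it cannot stop the whole stream early because a later block may start a new partition group, but it still only skips rows instead of evaluating frames and function columns like a full WindowTransform:

```scala
// One counter pair per partition group; rows past the limit are skipped.
def partitionedRankLimit[T](sorted: Iterator[T], limit: Int)
                           (partKey: T => Any, orderKey: T => Any): Iterator[T] = {
  var part: Option[Any] = None   // current partition group
  var prev: Option[Any] = None   // previous ordering key within the group
  var rank = 0
  var rowNumber = 0
  sorted.filter { row =>
    val p = partKey(row)
    if (!part.contains(p)) {     // new partition group: reset the counters
      part = Some(p); prev = None; rank = 0; rowNumber = 0
    }
    rowNumber += 1
    val key = orderKey(row)
    if (!prev.contains(key)) { rank = rowNumber; prev = Some(key) }
    rank <= limit
  }
}
```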

lgbo-ustc changed the title from "[CH] Support WindowGroupLimit" to "[CH] Support WindowGroupLimit for row_number, rank and dense_rank" on Sep 12, 2024