From 6f037b251db5c6817d492af804af0257034cb202 Mon Sep 17 00:00:00 2001 From: liuyongvs Date: Tue, 17 Dec 2024 10:12:02 +0800 Subject: [PATCH] [FLINK-36911][table] Algin Calcite Rules. --- .../plan/rules/FlinkBatchRuleSets.scala | 21 ++++++++---- .../plan/rules/FlinkStreamRuleSets.scala | 21 ++++++++---- .../table/planner/plan/batch/sql/CalcTest.xml | 3 +- .../plan/batch/sql/PartitionableSinkTest.xml | 3 +- .../plan/batch/sql/SetOperatorsTest.xml | 7 ++-- .../planner/plan/batch/sql/TableSinkTest.xml | 4 +-- .../planner/plan/batch/sql/ValuesTest.xml | 20 +++++------ .../plan/common/ViewsExpandingTest.xml | 6 ++-- ...veUnreachableCoalesceArgumentsRuleTest.xml | 3 +- .../plan/stream/sql/InsertIntoValuesTest.xml | 3 +- .../plan/stream/sql/SetOperatorsTest.xml | 6 ++-- .../planner/plan/stream/sql/ValuesTest.xml | 20 +++++------ .../planner/plan/stream/table/ValuesTest.xml | 34 +++++++------------ 13 files changed, 70 insertions(+), 81 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala index 90b1ec86471c03..8a8adfac1c4ea2 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala @@ -63,10 +63,15 @@ object FlinkBatchRuleSets { /** RuleSet to reduce expressions */ private val REDUCE_EXPRESSION_RULES: RuleSet = RuleSets.ofList( - CoreRules.FILTER_REDUCE_EXPRESSIONS, CoreRules.PROJECT_REDUCE_EXPRESSIONS, + CoreRules.FILTER_REDUCE_EXPRESSIONS, CoreRules.CALC_REDUCE_EXPRESSIONS, - CoreRules.JOIN_REDUCE_EXPRESSIONS + CoreRules.WINDOW_REDUCE_EXPRESSIONS, + CoreRules.JOIN_REDUCE_EXPRESSIONS, + CoreRules.FILTER_VALUES_MERGE, + CoreRules.PROJECT_FILTER_VALUES_MERGE, + CoreRules.PROJECT_VALUES_MERGE, + CoreRules.AGGREGATE_VALUES ) /** RuleSet to simplify coalesce invocations */ @@ -176,13 +181,17 @@ object FlinkBatchRuleSets { /** RuleSet to prune empty results rules */ val PRUNE_EMPTY_RULES: RuleSet = RuleSets.ofList( - PruneEmptyRules.AGGREGATE_INSTANCE, + PruneEmptyRules.UNION_INSTANCE, + PruneEmptyRules.INTERSECT_INSTANCE, + PruneEmptyRules.MINUS_INSTANCE, + PruneEmptyRules.PROJECT_INSTANCE, PruneEmptyRules.FILTER_INSTANCE, + PruneEmptyRules.SORT_INSTANCE, + PruneEmptyRules.AGGREGATE_INSTANCE, PruneEmptyRules.JOIN_LEFT_INSTANCE, PruneEmptyRules.JOIN_RIGHT_INSTANCE, - PruneEmptyRules.PROJECT_INSTANCE, - PruneEmptyRules.SORT_INSTANCE, - PruneEmptyRules.UNION_INSTANCE + PruneEmptyRules.SORT_FETCH_ZERO_INSTANCE, + PruneEmptyRules.EMPTY_TABLE_INSTANCE ) /** RuleSet about project */ diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala index fb1fec0a1b6d9a..8e895bfdf68f1b 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala @@ -72,10 +72,15 @@ object FlinkStreamRuleSets { /** RuleSet to reduce expressions */ private val REDUCE_EXPRESSION_RULES: RuleSet = RuleSets.ofList( - CoreRules.FILTER_REDUCE_EXPRESSIONS, CoreRules.PROJECT_REDUCE_EXPRESSIONS, + CoreRules.FILTER_REDUCE_EXPRESSIONS, CoreRules.CALC_REDUCE_EXPRESSIONS, - CoreRules.JOIN_REDUCE_EXPRESSIONS + CoreRules.WINDOW_REDUCE_EXPRESSIONS, + CoreRules.JOIN_REDUCE_EXPRESSIONS, + CoreRules.FILTER_VALUES_MERGE, + CoreRules.PROJECT_FILTER_VALUES_MERGE, + CoreRules.PROJECT_VALUES_MERGE, + CoreRules.AGGREGATE_VALUES ) /** RuleSet to simplify coalesce invocations */ @@ -180,13 +185,17 @@ object FlinkStreamRuleSets { /** RuleSet to prune empty results rules */ val PRUNE_EMPTY_RULES: RuleSet = RuleSets.ofList( - PruneEmptyRules.AGGREGATE_INSTANCE, + PruneEmptyRules.UNION_INSTANCE, + PruneEmptyRules.INTERSECT_INSTANCE, + PruneEmptyRules.MINUS_INSTANCE, + PruneEmptyRules.PROJECT_INSTANCE, PruneEmptyRules.FILTER_INSTANCE, + PruneEmptyRules.SORT_INSTANCE, + PruneEmptyRules.AGGREGATE_INSTANCE, PruneEmptyRules.JOIN_LEFT_INSTANCE, PruneEmptyRules.JOIN_RIGHT_INSTANCE, - PruneEmptyRules.PROJECT_INSTANCE, - PruneEmptyRules.SORT_INSTANCE, - PruneEmptyRules.UNION_INSTANCE + PruneEmptyRules.SORT_FETCH_ZERO_INSTANCE, + PruneEmptyRules.EMPTY_TABLE_INSTANCE ) /** RuleSet about project */ diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml index 69692846ffd63e..d22a22513d9701 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml @@ -115,8 +115,7 @@ LogicalProject(EXPR$0=[$0], EXPR$1=[CAST($1):VARCHAR(2147483647) CHARACTER SET " diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/PartitionableSinkTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/PartitionableSinkTest.xml index 0989e80d5c6a20..f583710341bf35 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/PartitionableSinkTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/PartitionableSinkTest.xml @@ -108,8 +108,7 @@ LogicalSink(table=[default_catalog.default_database.sink], fields=[a, b, c]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.xml index 7c13155b7675c1..6fb4663cfb54b5 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.xml @@ -233,11 +233,8 @@ LogicalMinus(all=[false]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSinkTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSinkTest.xml index b7152ec7d27baa..aa06fa69a482dd 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSinkTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSinkTest.xml @@ -30,14 +30,14 @@ Sink(table=[default_catalog.default_database.MyCtasSource], fields=[b, c, d]) +- Calc(select=[b, c, d]) +- Sort(orderBy=[a ASC]) +- Exchange(distribution=[single]) - +- Values(tuples=[[{ 1, 1, 2, _UTF-16LE'd1' }]], values=[a, b, c, d]) + +- Values(tuples=[[{ 1, 2, _UTF-16LE'd1', 1 }]], values=[b, c, d, a]) == Optimized Execution Plan == Sink(table=[default_catalog.default_database.MyCtasSource], fields=[b, c, d]) +- Calc(select=[b, c, d]) +- Sort(orderBy=[a ASC]) +- Exchange(distribution=[single]) - +- Values(tuples=[[{ 1, 1, 2, _UTF-16LE'd1' }]], values=[a, b, c, d]) + +- Values(tuples=[[{ 1, 2, _UTF-16LE'd1', 1 }]], values=[b, c, d, a]) ]]> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/ValuesTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/ValuesTest.xml index 943b60bc49620b..c535836f73c5bb 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/ValuesTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/ValuesTest.xml @@ -34,10 +34,8 @@ LogicalProject(a=[$0], b=[$1]), rowType=[RecordType(INTEGER a, DECIMAL(20, 1) b) @@ -95,12 +93,11 @@ LogicalProject(a=[$0], b=[$1]) @@ -117,8 +114,7 @@ LogicalProject(EXPR$0=[$0]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.xml index ad37a1cdc1a827..52661803c36205 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.xml @@ -70,8 +70,7 @@ LogicalProject(f0=[$0], f1=[$1]) @@ -177,8 +176,7 @@ LogicalProject(f0=[$0], f1=[$1]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRuleTest.xml index e1ed6ffec6dea9..0c6de10bff25f6 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRuleTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRuleTest.xml @@ -157,8 +157,7 @@ LogicalProject(EXPR$0=[COALESCE(1)], EXPR$1=[COALESCE(1, 2)], EXPR$2=[COALESCE(n diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/InsertIntoValuesTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/InsertIntoValuesTest.xml index a10cc2f3f5f35d..47510efde6d06b 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/InsertIntoValuesTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/InsertIntoValuesTest.xml @@ -47,8 +47,7 @@ Sink(table=[default_catalog.default_database.t1], fields=[EXPR$0, EXPR$1]) +- Union(all=[true], union=[EXPR$0, EXPR$1]) :- Calc(select=[CAST(MAP('a', '123', 'b', '123456') AS (VARCHAR(2147483647), VARCHAR(2147483647)) MAP) AS EXPR$0, CAST(MAP('k1', X'c0ffee', 'k2', X'babe') AS (VARCHAR(2147483647), VARBINARY(2147483647)) MAP) AS EXPR$1]) : +- Values(tuples=[[{ 0 }]])(reuse_id=[1]) - :- Calc(select=[null:(VARCHAR(2147483647), VARCHAR(2147483647)) MAP AS EXPR$0, null:(VARCHAR(2147483647), VARBINARY(2147483647)) MAP AS EXPR$1]) - : +- Reused(reference_id=[1]) + :- Values(tuples=[[{ null, null }]]) +- Calc(select=[CAST(MAP('a', '1', 'b', '1') AS (VARCHAR(2147483647), VARCHAR(2147483647)) MAP) AS EXPR$0, CAST(MAP('k1', X'10', 'k2', X'20') AS (VARCHAR(2147483647), VARBINARY(2147483647)) MAP) AS EXPR$1]) +- Reused(reference_id=[1]) ]]> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml index 61552b112b2727..acdf49d4dd134f 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml @@ -231,10 +231,8 @@ LogicalMinus(all=[false]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ValuesTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ValuesTest.xml index 48aac784bb26e1..264e5cc3804d92 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ValuesTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ValuesTest.xml @@ -34,10 +34,8 @@ LogicalProject(a=[$0], b=[$1]), rowType=[RecordType(INTEGER a, DECIMAL(20, 1) b) @@ -77,12 +75,11 @@ LogicalProject(a=[$0], b=[$1]) @@ -99,8 +96,7 @@ LogicalProject(EXPR$0=[$0]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/ValuesTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/ValuesTest.xml index f56313f6db68dd..f655bbad7d6b15 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/ValuesTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/ValuesTest.xml @@ -73,14 +73,12 @@ LogicalUnion(all=[true]) @@ -137,8 +135,7 @@ Union(all=[true], union=[f0, f1, f2]) : +- Reused(reference_id=[1]) :- Calc(select=[99 AS f0, 'DEFG' AS f1, MAP('a', 1) AS f2]) : +- Reused(reference_id=[1]) -+- Calc(select=[0E-1 AS f0, 'D' AS f1, null:(VARCHAR(4), DOUBLE) MAP AS f2]) - +- Reused(reference_id=[1]) ++- Values(tuples=[[{ 0E-1, _UTF-16LE'D', null }]]) ]]> @@ -161,10 +158,9 @@ LogicalUnion(all=[true]) @@ -255,8 +246,7 @@ LogicalProject(a=[CAST(+(1, 2)):BIGINT NOT NULL], b=[_UTF-16LE'ABC':VARCHAR(4) C