Skip to content

Commit

Permalink
[FLINK-36911][table] Algin Calcite Rules.
Browse files Browse the repository at this point in the history
  • Loading branch information
liuyongvs committed Dec 17, 2024
1 parent 9d34beb commit 6f037b2
Show file tree
Hide file tree
Showing 13 changed files with 70 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,15 @@ object FlinkBatchRuleSets {

/** RuleSet to reduce expressions */
private val REDUCE_EXPRESSION_RULES: RuleSet = RuleSets.ofList(
CoreRules.FILTER_REDUCE_EXPRESSIONS,
CoreRules.PROJECT_REDUCE_EXPRESSIONS,
CoreRules.FILTER_REDUCE_EXPRESSIONS,
CoreRules.CALC_REDUCE_EXPRESSIONS,
CoreRules.JOIN_REDUCE_EXPRESSIONS
CoreRules.WINDOW_REDUCE_EXPRESSIONS,
CoreRules.JOIN_REDUCE_EXPRESSIONS,
CoreRules.FILTER_VALUES_MERGE,
CoreRules.PROJECT_FILTER_VALUES_MERGE,
CoreRules.PROJECT_VALUES_MERGE,
CoreRules.AGGREGATE_VALUES
)

/** RuleSet to simplify coalesce invocations */
Expand Down Expand Up @@ -176,13 +181,17 @@ object FlinkBatchRuleSets {

/** RuleSet to prune empty results rules */
val PRUNE_EMPTY_RULES: RuleSet = RuleSets.ofList(
PruneEmptyRules.AGGREGATE_INSTANCE,
PruneEmptyRules.UNION_INSTANCE,
PruneEmptyRules.INTERSECT_INSTANCE,
PruneEmptyRules.MINUS_INSTANCE,
PruneEmptyRules.PROJECT_INSTANCE,
PruneEmptyRules.FILTER_INSTANCE,
PruneEmptyRules.SORT_INSTANCE,
PruneEmptyRules.AGGREGATE_INSTANCE,
PruneEmptyRules.JOIN_LEFT_INSTANCE,
PruneEmptyRules.JOIN_RIGHT_INSTANCE,
PruneEmptyRules.PROJECT_INSTANCE,
PruneEmptyRules.SORT_INSTANCE,
PruneEmptyRules.UNION_INSTANCE
PruneEmptyRules.SORT_FETCH_ZERO_INSTANCE,
PruneEmptyRules.EMPTY_TABLE_INSTANCE
)

/** RuleSet about project */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,15 @@ object FlinkStreamRuleSets {

/** RuleSet to reduce expressions */
private val REDUCE_EXPRESSION_RULES: RuleSet = RuleSets.ofList(
CoreRules.FILTER_REDUCE_EXPRESSIONS,
CoreRules.PROJECT_REDUCE_EXPRESSIONS,
CoreRules.FILTER_REDUCE_EXPRESSIONS,
CoreRules.CALC_REDUCE_EXPRESSIONS,
CoreRules.JOIN_REDUCE_EXPRESSIONS
CoreRules.WINDOW_REDUCE_EXPRESSIONS,
CoreRules.JOIN_REDUCE_EXPRESSIONS,
CoreRules.FILTER_VALUES_MERGE,
CoreRules.PROJECT_FILTER_VALUES_MERGE,
CoreRules.PROJECT_VALUES_MERGE,
CoreRules.AGGREGATE_VALUES
)

/** RuleSet to simplify coalesce invocations */
Expand Down Expand Up @@ -180,13 +185,17 @@ object FlinkStreamRuleSets {

/** RuleSet to prune empty results rules */
val PRUNE_EMPTY_RULES: RuleSet = RuleSets.ofList(
PruneEmptyRules.AGGREGATE_INSTANCE,
PruneEmptyRules.UNION_INSTANCE,
PruneEmptyRules.INTERSECT_INSTANCE,
PruneEmptyRules.MINUS_INSTANCE,
PruneEmptyRules.PROJECT_INSTANCE,
PruneEmptyRules.FILTER_INSTANCE,
PruneEmptyRules.SORT_INSTANCE,
PruneEmptyRules.AGGREGATE_INSTANCE,
PruneEmptyRules.JOIN_LEFT_INSTANCE,
PruneEmptyRules.JOIN_RIGHT_INSTANCE,
PruneEmptyRules.PROJECT_INSTANCE,
PruneEmptyRules.SORT_INSTANCE,
PruneEmptyRules.UNION_INSTANCE
PruneEmptyRules.SORT_FETCH_ZERO_INSTANCE,
PruneEmptyRules.EMPTY_TABLE_INSTANCE
)

/** RuleSet about project */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ LogicalProject(EXPR$0=[$0], EXPR$1=[CAST($1):VARCHAR(2147483647) CHARACTER SET "
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[a AS EXPR$0, CAST(b AS VARCHAR(2147483647)) AS EXPR$1])
+- Values(tuples=[[{ 3, _UTF-16LE'c' }]], values=[a, b])
Values(tuples=[[{ 3, _UTF-16LE'c' }]], values=[EXPR$0, EXPR$1])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,7 @@ LogicalSink(table=[default_catalog.default_database.sink], fields=[a, b, c])
<Resource name="optimized exec plan">
<![CDATA[
Sink(table=[default_catalog.default_database.sink], fields=[a, b, c])
+- Calc(select=[5 AS a, 1 AS b, 1 AS c])
+- Values(tuples=[[{ 0 }]], values=[ZERO])
+- Values(tuples=[[{ 5, 1, 1 }]], values=[a, b, c])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,8 @@ LogicalMinus(all=[false])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
HashAggregate(isMerge=[true], groupBy=[c], select=[c])
+- Exchange(distribution=[hash[c]])
+- LocalHashAggregate(groupBy=[c], select=[c])
+- Calc(select=[c])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
Calc(select=[c])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ Sink(table=[default_catalog.default_database.MyCtasSource], fields=[b, c, d])
+- Calc(select=[b, c, d])
+- Sort(orderBy=[a ASC])
+- Exchange(distribution=[single])
+- Values(tuples=[[{ 1, 1, 2, _UTF-16LE'd1' }]], values=[a, b, c, d])
+- Values(tuples=[[{ 1, 2, _UTF-16LE'd1', 1 }]], values=[b, c, d, a])
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.MyCtasSource], fields=[b, c, d])
+- Calc(select=[b, c, d])
+- Sort(orderBy=[a ASC])
+- Exchange(distribution=[single])
+- Values(tuples=[[{ 1, 1, 2, _UTF-16LE'd1' }]], values=[a, b, c, d])
+- Values(tuples=[[{ 1, 2, _UTF-16LE'd1', 1 }]], values=[b, c, d, a])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,8 @@ LogicalProject(a=[$0], b=[$1]), rowType=[RecordType(INTEGER a, DECIMAL(20, 1) b)
<Resource name="optimized rel plan">
<![CDATA[
Union(all=[true], union=[EXPR$0, EXPR$1]), rowType=[RecordType(INTEGER EXPR$0, DECIMAL(20, 1) EXPR$1)]
:- Calc(select=[1 AS EXPR$0, 2.0 AS EXPR$1]), rowType=[RecordType(INTEGER EXPR$0, DECIMAL(20, 1) EXPR$1)]
: +- Values(tuples=[[{ 0 }]], values=[ZERO]), rowType=[RecordType(INTEGER ZERO)]
+- Calc(select=[3 AS EXPR$0, 4.0 AS EXPR$1]), rowType=[RecordType(INTEGER EXPR$0, DECIMAL(20, 1) EXPR$1)]
+- Values(tuples=[[{ 0 }]], values=[ZERO]), rowType=[RecordType(INTEGER ZERO)]
:- Values(tuples=[[{ 1, 2.0 }]], values=[EXPR$0, EXPR$1]), rowType=[RecordType(INTEGER EXPR$0, DECIMAL(20, 1) EXPR$1)]
+- Values(tuples=[[{ 3, 4 }]], values=[EXPR$0, EXPR$1]), rowType=[RecordType(INTEGER EXPR$0, DECIMAL(20, 1) EXPR$1)]
]]>
</Resource>
</TestCase>
Expand Down Expand Up @@ -95,12 +93,11 @@ LogicalProject(a=[$0], b=[$1])
<Resource name="optimized exec plan">
<![CDATA[
Union(all=[true], union=[EXPR$0, EXPR$1])
:- Calc(select=[1 AS EXPR$0, CAST(2 AS INTEGER) AS EXPR$1])
: +- Values(tuples=[[{ 0 }]], values=[ZERO])(reuse_id=[1])
:- Calc(select=[3 AS EXPR$0, null:INTEGER AS EXPR$1])
: +- Reused(reference_id=[1])
+- Calc(select=[4 AS EXPR$0, CAST(5 AS INTEGER) AS EXPR$1])
+- Reused(reference_id=[1])
:- Calc(select=[EXPR$0, CAST(EXPR$1 AS INTEGER) AS EXPR$1])
: +- Values(tuples=[[{ 1, 2 }]], values=[EXPR$0, EXPR$1])
:- Values(tuples=[[{ 3, null }]], values=[EXPR$0, EXPR$1])
+- Calc(select=[EXPR$0, CAST(EXPR$1 AS INTEGER) AS EXPR$1])
+- Values(tuples=[[{ 4, 5 }]], values=[EXPR$0, EXPR$1])
]]>
</Resource>
</TestCase>
Expand All @@ -117,8 +114,7 @@ LogicalProject(EXPR$0=[$0])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[null:INTEGER AS EXPR$0])
+- Values(tuples=[[{ 0 }]], values=[ZERO])
Values(tuples=[[{ null }]], values=[EXPR$0])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ LogicalProject(f0=[$0], f1=[$1])
<![CDATA[
Calc(select=[f0, name AS f1])
+- Correlate(invocation=[myFunc($cor1.f0)], correlate=[table(myFunc($cor1.f0))], select=[f0,name,age], rowType=[RecordType(VARCHAR(9) f0, VARCHAR(2147483647) name, INTEGER age)], joinType=[INNER])
+- Calc(select=[f0])
+- Values(tuples=[[{ _UTF-16LE'danny#21' }, { _UTF-16LE'julian#55' }, { _UTF-16LE'fabian#30' }]], values=[f0])
+- Values(tuples=[[{ _UTF-16LE'danny#21' }, { _UTF-16LE'julian#55' }, { _UTF-16LE'fabian#30' }]], values=[f0])
]]>
</Resource>
</TestCase>
Expand Down Expand Up @@ -177,8 +176,7 @@ LogicalProject(f0=[$0], f1=[$1])
<![CDATA[
Calc(select=[f0, name AS f1])
+- Correlate(invocation=[myFunc($cor1.f0)], correlate=[table(myFunc($cor1.f0))], select=[f0,name,age], rowType=[RecordType(VARCHAR(9) f0, VARCHAR(2147483647) name, INTEGER age)], joinType=[INNER])
+- Calc(select=[f0])
+- Values(tuples=[[{ _UTF-16LE'danny#21' }, { _UTF-16LE'julian#55' }, { _UTF-16LE'fabian#30' }]])
+- Values(tuples=[[{ _UTF-16LE'danny#21' }, { _UTF-16LE'julian#55' }, { _UTF-16LE'fabian#30' }]])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,7 @@ LogicalProject(EXPR$0=[COALESCE(1)], EXPR$1=[COALESCE(1, 2)], EXPR$2=[COALESCE(n
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[1 AS EXPR$0, 1 AS EXPR$1, 2 AS EXPR$2, 1 AS EXPR$3, 3 AS EXPR$4, 4 AS EXPR$5, '1' AS EXPR$6, '1' AS EXPR$7, '2' AS EXPR$8, '1' AS EXPR$9, '3' AS EXPR$10, '4' AS EXPR$11, 1.0 AS EXPR$12, 1.0 AS EXPR$13, 2E0 AS EXPR$14, 2E0 AS EXPR$15, 2.0 AS EXPR$16, null:DOUBLE AS EXPR$17])
+- Values(tuples=[[{ 0 }]])
Values(tuples=[[{ 1, 1, 2, 1, 3, 4, _UTF-16LE'1', _UTF-16LE'1', _UTF-16LE'2', _UTF-16LE'1', _UTF-16LE'3', _UTF-16LE'4', 1.0, 1.0, 2E0, 2E0, 2.0, null }]])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ Sink(table=[default_catalog.default_database.t1], fields=[EXPR$0, EXPR$1])
+- Union(all=[true], union=[EXPR$0, EXPR$1])
:- Calc(select=[CAST(MAP('a', '123', 'b', '123456') AS (VARCHAR(2147483647), VARCHAR(2147483647)) MAP) AS EXPR$0, CAST(MAP('k1', X'c0ffee', 'k2', X'babe') AS (VARCHAR(2147483647), VARBINARY(2147483647)) MAP) AS EXPR$1])
: +- Values(tuples=[[{ 0 }]])(reuse_id=[1])
:- Calc(select=[null:(VARCHAR(2147483647), VARCHAR(2147483647)) MAP AS EXPR$0, null:(VARCHAR(2147483647), VARBINARY(2147483647)) MAP AS EXPR$1])
: +- Reused(reference_id=[1])
:- Values(tuples=[[{ null, null }]])
+- Calc(select=[CAST(MAP('a', '1', 'b', '1') AS (VARCHAR(2147483647), VARCHAR(2147483647)) MAP) AS EXPR$0, CAST(MAP('k1', X'10', 'k2', X'20') AS (VARCHAR(2147483647), VARBINARY(2147483647)) MAP) AS EXPR$1])
+- Reused(reference_id=[1])
]]>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,10 +231,8 @@ LogicalMinus(all=[false])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
GroupAggregate(groupBy=[c], select=[c])
+- Exchange(distribution=[hash[c]])
+- Calc(select=[c])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
Calc(select=[c])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,8 @@ LogicalProject(a=[$0], b=[$1]), rowType=[RecordType(INTEGER a, DECIMAL(20, 1) b)
<Resource name="optimized rel plan">
<![CDATA[
Union(all=[true], union=[EXPR$0, EXPR$1]), rowType=[RecordType(INTEGER EXPR$0, DECIMAL(20, 1) EXPR$1)]
:- Calc(select=[1 AS EXPR$0, 2.0 AS EXPR$1]), rowType=[RecordType(INTEGER EXPR$0, DECIMAL(20, 1) EXPR$1)]
: +- Values(tuples=[[{ 0 }]]), rowType=[RecordType(INTEGER ZERO)]
+- Calc(select=[3 AS EXPR$0, 4.0 AS EXPR$1]), rowType=[RecordType(INTEGER EXPR$0, DECIMAL(20, 1) EXPR$1)]
+- Values(tuples=[[{ 0 }]]), rowType=[RecordType(INTEGER ZERO)]
:- Values(tuples=[[{ 1, 2.0 }]]), rowType=[RecordType(INTEGER EXPR$0, DECIMAL(20, 1) EXPR$1)]
+- Values(tuples=[[{ 3, 4 }]]), rowType=[RecordType(INTEGER EXPR$0, DECIMAL(20, 1) EXPR$1)]
]]>
</Resource>
</TestCase>
Expand Down Expand Up @@ -77,12 +75,11 @@ LogicalProject(a=[$0], b=[$1])
<Resource name="optimized exec plan">
<![CDATA[
Union(all=[true], union=[EXPR$0, EXPR$1])
:- Calc(select=[1 AS EXPR$0, CAST(2 AS INTEGER) AS EXPR$1])
: +- Values(tuples=[[{ 0 }]])(reuse_id=[1])
:- Calc(select=[3 AS EXPR$0, null:INTEGER AS EXPR$1])
: +- Reused(reference_id=[1])
+- Calc(select=[4 AS EXPR$0, CAST(5 AS INTEGER) AS EXPR$1])
+- Reused(reference_id=[1])
:- Calc(select=[EXPR$0, CAST(EXPR$1 AS INTEGER) AS EXPR$1])
: +- Values(tuples=[[{ 1, 2 }]])
:- Values(tuples=[[{ 3, null }]])
+- Calc(select=[EXPR$0, CAST(EXPR$1 AS INTEGER) AS EXPR$1])
+- Values(tuples=[[{ 4, 5 }]])
]]>
</Resource>
</TestCase>
Expand All @@ -99,8 +96,7 @@ LogicalProject(EXPR$0=[$0])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[null:INTEGER AS EXPR$0])
+- Values(tuples=[[{ 0 }]])
Values(tuples=[[{ null }]])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,12 @@ LogicalUnion(all=[true])
<Resource name="optimized exec plan">
<![CDATA[
Union(all=[true], union=[f0, f1, f2])
:- Calc(select=[1 AS f0, 'ABC' AS f1, null:INTEGER AS f2])
: +- Values(tuples=[[{ 0 }]])(reuse_id=[1])
:- Values(tuples=[[{ 1, _UTF-16LE'ABC', null }]])
:- Calc(select=[3.141592653589793E0 AS f0, 'ABC' AS f1, CAST(1 AS INTEGER) AS f2])
: +- Reused(reference_id=[1])
: +- Values(tuples=[[{ 0 }]])(reuse_id=[1])
:- Calc(select=[3.1E0 AS f0, 'DEF' AS f1, CAST(2 AS INTEGER) AS f2])
: +- Reused(reference_id=[1])
:- Calc(select=[99 AS f0, 'DEFG' AS f1, null:INTEGER AS f2])
: +- Reused(reference_id=[1])
:- Values(tuples=[[{ 99, _UTF-16LE'DEFG', null }]])
+- Calc(select=[0E-1 AS f0, 'D' AS f1, CAST(4 AS INTEGER) AS f2])
+- Reused(reference_id=[1])
]]>
Expand Down Expand Up @@ -137,8 +135,7 @@ Union(all=[true], union=[f0, f1, f2])
: +- Reused(reference_id=[1])
:- Calc(select=[99 AS f0, 'DEFG' AS f1, MAP('a', 1) AS f2])
: +- Reused(reference_id=[1])
+- Calc(select=[0E-1 AS f0, 'D' AS f1, null:(VARCHAR(4), DOUBLE) MAP AS f2])
+- Reused(reference_id=[1])
+- Values(tuples=[[{ 0E-1, _UTF-16LE'D', null }]])
]]>
</Resource>
</TestCase>
Expand All @@ -161,10 +158,9 @@ LogicalUnion(all=[true])
<Resource name="optimized exec plan">
<![CDATA[
Union(all=[true], union=[f0, f1, f2])
:- Calc(select=[1 AS f0, 'ABC' AS f1, null:INTEGER AS f2])
: +- Values(tuples=[[{ 0 }]])(reuse_id=[1])
:- Values(tuples=[[{ 1, _UTF-16LE'ABC', null }]])
:- Calc(select=[3.141592653589793E0 AS f0, 'ABC' AS f1, CAST(1 AS INTEGER) AS f2])
: +- Reused(reference_id=[1])
: +- Values(tuples=[[{ 0 }]])(reuse_id=[1])
:- Calc(select=[3.1E0 AS f0, 'DEF' AS f1, CAST(2 AS INTEGER) AS f2])
: +- Reused(reference_id=[1])
:- Calc(select=[99 AS f0, 'DEFG' AS f1, CAST(3 AS INTEGER) AS f2])
Expand Down Expand Up @@ -213,16 +209,11 @@ LogicalUnion(all=[true])
<Resource name="optimized exec plan">
<![CDATA[
Union(all=[true], union=[a, b])
:- Calc(select=[1 AS a, 'ABC ' AS b])
: +- Values(tuples=[[{ 0 }]])(reuse_id=[1])
:- Calc(select=[3.141592653589793E0 AS a, 'ABC ' AS b])
: +- Reused(reference_id=[1])
:- Calc(select=[3.1E0 AS a, 'DEF ' AS b])
: +- Reused(reference_id=[1])
:- Calc(select=[99 AS a, 'DEFG' AS b])
: +- Reused(reference_id=[1])
+- Calc(select=[0E-1 AS a, 'D ' AS b])
+- Reused(reference_id=[1])
:- Values(tuples=[[{ 1, _UTF-16LE'ABC ' }]])
:- Values(tuples=[[{ 3.141592653589793E0, _UTF-16LE'ABC ' }]])
:- Values(tuples=[[{ 3.1E0, _UTF-16LE'DEF ' }]])
:- Values(tuples=[[{ 99, _UTF-16LE'DEFG' }]])
+- Values(tuples=[[{ 0E-1, _UTF-16LE'D ' }]])
]]>
</Resource>
</TestCase>
Expand Down Expand Up @@ -255,8 +246,7 @@ LogicalProject(a=[CAST(+(1, 2)):BIGINT NOT NULL], b=[_UTF-16LE'ABC':VARCHAR(4) C
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[3 AS a, 'ABC' AS b, X'01020300' AS c])
+- Values(tuples=[[{ 0 }]])
Values(tuples=[[{ 3, _UTF-16LE'ABC', X'01020300' }]])
]]>
</Resource>
</TestCase>
Expand Down

0 comments on commit 6f037b2

Please sign in to comment.