diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinPushExpressionsRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinPushExpressionsRule.java deleted file mode 100644 index a2681666398a1..0000000000000 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinPushExpressionsRule.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.plan.rules.logical; - -import org.apache.calcite.plan.RelOptRule; -import org.apache.calcite.plan.RelOptRuleCall; -import org.apache.calcite.plan.RelOptUtil; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.Join; -import org.apache.calcite.rel.core.RelFactories; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.tools.RelBuilder; -import org.apache.calcite.tools.RelBuilderFactory; - -/** - * This rules is copied from Calcite's {@link org.apache.calcite.rel.rules.JoinPushExpressionsRule}. - * Modification: - Supports SEMI/ANTI join using {@link - * org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil#pushDownJoinConditions} - Only push - * down calls on non-time-indicator field. - */ - -/** - * Planner rule that pushes down expressions in "equal" join condition. - * - *
For example, given "emp JOIN dept ON emp.deptno + 1 = dept.deptno", adds a project above "emp" - * that computes the expression "emp.deptno + 1". The resulting join condition is a simple - * combination of AND, equals, and input fields, plus the remaining non-equal conditions. - */ -public class FlinkJoinPushExpressionsRule extends RelOptRule { - - public static final FlinkJoinPushExpressionsRule INSTANCE = - new FlinkJoinPushExpressionsRule(Join.class, RelFactories.LOGICAL_BUILDER); - - /** Creates a JoinPushExpressionsRule. */ - public FlinkJoinPushExpressionsRule( - Class extends Join> clazz, RelBuilderFactory relBuilderFactory) { - super(operand(clazz, any()), relBuilderFactory, null); - } - - @Deprecated // to be removed before 2.0 - public FlinkJoinPushExpressionsRule( - Class extends Join> clazz, RelFactories.ProjectFactory projectFactory) { - this(clazz, RelBuilder.proto(projectFactory)); - } - - @Override - public void onMatch(RelOptRuleCall call) { - Join join = call.rel(0); - - // Push expression in join condition into Project below Join. - RelNode newJoin = RelOptUtil.pushDownJoinConditions(join, call.builder()); - - // If the join is the same, we bail out - if (newJoin instanceof Join) { - final RexNode newCondition = ((Join) newJoin).getCondition(); - if (join.getCondition().equals(newCondition)) { - return; - } - } - - call.transformTo(newJoin); - } -} - -// End FlinkJoinPushExpressionsRule.java diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala index 4611cacb26a37..8e11cf08fc96e 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala @@ -36,7 +36,7 @@ object FlinkBatchRuleSets { FlinkRewriteSubQueryRule.FILTER, FlinkSubQueryRemoveRule.FILTER, JoinConditionTypeCoerceRule.INSTANCE, - FlinkJoinPushExpressionsRule.INSTANCE + CoreRules.JOIN_PUSH_EXPRESSIONS ) /** Convert sub-queries before query decorrelation. */ @@ -246,7 +246,7 @@ object FlinkBatchRuleSets { CoreRules.SORT_REMOVE, // join rules - FlinkJoinPushExpressionsRule.INSTANCE, + CoreRules.JOIN_PUSH_EXPRESSIONS, SimplifyJoinConditionRule.INSTANCE, // remove union with only a single child diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala index 109a982d2166e..e3abb9b1f0434 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala @@ -36,7 +36,7 @@ object FlinkStreamRuleSets { FlinkRewriteSubQueryRule.FILTER, FlinkSubQueryRemoveRule.FILTER, JoinConditionTypeCoerceRule.INSTANCE, - FlinkJoinPushExpressionsRule.INSTANCE + CoreRules.JOIN_PUSH_EXPRESSIONS ) /** Convert sub-queries before query decorrelation. */ @@ -249,7 +249,7 @@ object FlinkStreamRuleSets { CoreRules.SORT_REMOVE, // join rules - FlinkJoinPushExpressionsRule.INSTANCE, + CoreRules.JOIN_PUSH_EXPRESSIONS, SimplifyJoinConditionRule.INSTANCE, // remove union with only a single child diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinPushExpressionsRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/JoinPushExpressionsRuleTest.xml similarity index 100% rename from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinPushExpressionsRuleTest.xml rename to flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/JoinPushExpressionsRuleTest.xml diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinPushExpressionsRuleTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/JoinPushExpressionsRuleTest.scala similarity index 92% rename from flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinPushExpressionsRuleTest.scala rename to flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/JoinPushExpressionsRuleTest.scala index 3b04f9d567180..d336690932fe2 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinPushExpressionsRuleTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/JoinPushExpressionsRuleTest.scala @@ -22,11 +22,12 @@ import org.apache.flink.table.planner.plan.optimize.program.{BatchOptimizeContex import org.apache.flink.table.planner.utils.TableTestBase import org.apache.calcite.plan.hep.HepMatchOrder +import org.apache.calcite.rel.rules.{CoreRules, JoinPushExpressionsRule} import org.apache.calcite.tools.RuleSets import org.junit.jupiter.api.{BeforeEach, Test} -/** Tests for [[FlinkJoinPushExpressionsRule]]. */ -class FlinkJoinPushExpressionsRuleTest extends TableTestBase { +/** Tests for [[JoinPushExpressionsRule]]. */ +class JoinPushExpressionsRuleTest extends TableTestBase { private val util = batchTestUtil() @@ -43,7 +44,7 @@ class FlinkJoinPushExpressionsRuleTest extends TableTestBase { FlinkRewriteSubQueryRule.FILTER, FlinkSubQueryRemoveRule.FILTER, JoinConditionTypeCoerceRule.INSTANCE, - FlinkJoinPushExpressionsRule.INSTANCE + CoreRules.JOIN_PUSH_EXPRESSIONS )) .build() )