From 4998907331a4813b3206c6fa9a94e396a1174456 Mon Sep 17 00:00:00 2001 From: Jacky Lau Date: Sat, 30 Nov 2024 23:35:27 +0800 Subject: [PATCH] [FLINK-34503][table] Migrate JoinDeriveNullFilterRule to java --- .../logical/JoinDeriveNullFilterRule.java | 134 ++++++++++++++++++ .../logical/JoinDeriveNullFilterRule.scala | 94 ------------ 2 files changed, 134 insertions(+), 94 deletions(-) create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/JoinDeriveNullFilterRule.java delete mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinDeriveNullFilterRule.scala diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/JoinDeriveNullFilterRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/JoinDeriveNullFilterRule.java new file mode 100644 index 0000000000000..6d76c85540cbf --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/JoinDeriveNullFilterRule.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery; + +import org.apache.flink.shaded.curator5.org.apache.curator.shaded.com.google.common.collect.Lists; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.JoinInfo; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.logical.LogicalJoin; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.util.ImmutableIntList; +import org.immutables.value.Value; + +import java.util.ArrayList; +import java.util.List; + +/** + * Planner rule that filters null values before join if the count null value from join input is + * greater than null filter threshold. + * + *

Since the key of the Null value is impossible to match in the inner join, and there is a + * single point skew due to too many Null values. We should push down a not-null filter into the + * child node of join. + */ +@Value.Enclosing +public class JoinDeriveNullFilterRule + extends RelRule { + + // To avoid the impact of null values on the single join node, + // We will add a null filter (possibly be pushed down) before the join to filter + // null values when the source of InnerJoin has nullCount more than this value. + public static final Long JOIN_NULL_FILTER_THRESHOLD = 2000000L; + + public static final JoinDeriveNullFilterRule INSTANCE = + JoinDeriveNullFilterRule.JoinDeriveNullFilterRuleConfig.DEFAULT.toRule(); + + private JoinDeriveNullFilterRule(JoinDeriveNullFilterRuleConfig config) { + super(config); + } + + @Override + public boolean matches(RelOptRuleCall call) { + Join join = call.rel(0); + return join.getJoinType() == JoinRelType.INNER + && !join.analyzeCondition().pairs().isEmpty(); + } + + @Override + public void onMatch(RelOptRuleCall call) { + LogicalJoin join = call.rel(0); + + RelBuilder relBuilder = call.builder(); + RexBuilder rexBuilder = join.getCluster().getRexBuilder(); + FlinkRelMetadataQuery mq = + FlinkRelMetadataQuery.reuseOrCreate(join.getCluster().getMetadataQuery()); + + JoinInfo joinInfo = join.analyzeCondition(); + RelNode newLeft = + createIsNotNullFilter( + relBuilder, rexBuilder, mq, join.getLeft(), joinInfo.leftKeys); + RelNode newRight = + createIsNotNullFilter( + relBuilder, rexBuilder, mq, join.getRight(), joinInfo.rightKeys); + + if ((newLeft != join.getLeft()) || (newRight != join.getRight())) { + Join newJoin = join.copy(join.getTraitSet(), Lists.newArrayList(newLeft, newRight)); + call.transformTo(newJoin); + } + } + + private RelNode createIsNotNullFilter( + RelBuilder relBuilder, + RexBuilder rexBuilder, + FlinkRelMetadataQuery mq, + RelNode input, + ImmutableIntList keys) { + List filters = new ArrayList<>(); + for (int key : keys) { + Double nullCount = mq.getColumnNullCount(input, key); + if (nullCount != null && nullCount > JOIN_NULL_FILTER_THRESHOLD) { + filters.add( + relBuilder.call( + SqlStdOperatorTable.IS_NOT_NULL, + rexBuilder.makeInputRef(input, key))); + } + } + + if (!filters.isEmpty()) { + return relBuilder.push(input).filter(filters).build(); + } else { + return input; + } + } + + /** Rule configuration. */ + @Value.Immutable(singleton = false) + public interface JoinDeriveNullFilterRuleConfig extends RelRule.Config { + JoinDeriveNullFilterRule.JoinDeriveNullFilterRuleConfig DEFAULT = + ImmutableJoinDeriveNullFilterRule.JoinDeriveNullFilterRuleConfig.builder() + .build() + .withOperandSupplier(b0 -> b0.operand(LogicalJoin.class).anyInputs()) + .withDescription("JoinDeriveNullFilterRule"); + + @Override + default JoinDeriveNullFilterRule toRule() { + return new JoinDeriveNullFilterRule(this); + } + } +} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinDeriveNullFilterRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinDeriveNullFilterRule.scala deleted file mode 100644 index f107af3aa407e..0000000000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinDeriveNullFilterRule.scala +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.planner.plan.rules.logical - -import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery -import org.apache.flink.table.planner.plan.rules.logical.JoinDeriveNullFilterRule.JOIN_NULL_FILTER_THRESHOLD - -import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} -import org.apache.calcite.plan.RelOptRule.{any, operand} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.core.{Join, JoinRelType} -import org.apache.calcite.rel.logical.LogicalJoin -import org.apache.calcite.rex.RexNode -import org.apache.calcite.sql.fun.SqlStdOperatorTable -import org.apache.calcite.util.ImmutableIntList - -import scala.collection.JavaConversions._ -import scala.collection.mutable - -/** - * Planner rule that filters null values before join if the count null value from join input is - * greater than null filter threshold. - * - * Since the key of the Null value is impossible to match in the inner join, and there is a single - * point skew due to too many Null values. We should push down a not-null filter into the child node - * of join. - */ -class JoinDeriveNullFilterRule - extends RelOptRule(operand(classOf[LogicalJoin], any()), "JoinDeriveNullFilterRule") { - - override def matches(call: RelOptRuleCall): Boolean = { - val join: Join = call.rel(0) - join.getJoinType == JoinRelType.INNER && join.analyzeCondition.pairs().nonEmpty - } - - override def onMatch(call: RelOptRuleCall): Unit = { - val join: LogicalJoin = call.rel(0) - - val rexBuilder = join.getCluster.getRexBuilder - val mq = FlinkRelMetadataQuery.reuseOrCreate(join.getCluster.getMetadataQuery) - - def createIsNotNullFilter(input: RelNode, keys: ImmutableIntList): RelNode = { - val relBuilder = call.builder() - val filters = new mutable.ArrayBuffer[RexNode] - keys.foreach { - key => - val nullCount = mq.getColumnNullCount(input, key) - if (nullCount != null && nullCount > JOIN_NULL_FILTER_THRESHOLD) { - filters += relBuilder.call( - SqlStdOperatorTable.IS_NOT_NULL, - rexBuilder.makeInputRef(input, key)) - } - } - if (filters.nonEmpty) { - relBuilder.push(input).filter(filters).build() - } else { - input - } - } - - val joinInfo = join.analyzeCondition - val newLeft = createIsNotNullFilter(join.getLeft, joinInfo.leftKeys) - val newRight = createIsNotNullFilter(join.getRight, joinInfo.rightKeys) - - if ((newLeft ne join.getLeft) || (newRight ne join.getRight)) { - val newJoin = join.copy(join.getTraitSet, Seq(newLeft, newRight)) - call.transformTo(newJoin) - } - } -} - -object JoinDeriveNullFilterRule { - val INSTANCE = new JoinDeriveNullFilterRule - - // To avoid the impact of null values on the single join node, - // We will add a null filter (possibly be pushed down) before the join to filter - // null values when the source of InnerJoin has nullCount more than this value. - val JOIN_NULL_FILTER_THRESHOLD = 2000000L -}