From eb49de31f2f28673462b4deca97c2b6517765276 Mon Sep 17 00:00:00 2001 From: "shuming.li" Date: Thu, 19 Dec 2024 16:55:35 +0800 Subject: [PATCH] Refactor mv partition compensate codes Signed-off-by: shuming.li --- .../catalog/MvBaseTableUpdateInfo.java | 16 +- .../starrocks/catalog/MvRefreshArbiter.java | 8 +- .../com/starrocks/catalog/MvUpdateInfo.java | 211 ++++++++- .../catalog/mv/MVTimelinessArbiter.java | 29 +- .../mv/MVTimelinessListPartitionArbiter.java | 9 +- .../mv/MVTimelinessNonPartitionArbiter.java | 12 +- .../mv/MVTimelinessRangePartitionArbiter.java | 8 +- .../persist/MVTaskRunExtraMessage.java | 7 +- .../sql/optimizer/MvRewritePreprocessor.java | 3 +- ...aterializedViewTransparentRewriteRule.java | 2 +- .../materialization/MVTransparentState.java | 4 + .../MvPartitionCompensator.java | 8 +- .../materialization/MvUtils.java | 4 +- .../compensation/BaseCompensation.java | 49 ++- .../ExternalTableCompensation.java | 23 +- .../ExternalTableCompensationV1.java | 168 ++++++++ .../ExternalTableCompensationV2.java | 48 +++ .../compensation/MVCompensation.java | 55 ++- .../compensation/MVCompensationBuilder.java | 400 +++--------------- .../compensation/OlapTableCompensation.java | 27 +- .../compensation/OptCompensator.java | 179 ++------ .../com/starrocks/utframe/UtFrameUtils.java | 2 +- 22 files changed, 673 insertions(+), 599 deletions(-) create mode 100644 fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/compensation/ExternalTableCompensationV1.java create mode 100644 fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/compensation/ExternalTableCompensationV2.java diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/MvBaseTableUpdateInfo.java b/fe/fe-core/src/main/java/com/starrocks/catalog/MvBaseTableUpdateInfo.java index 28489f703dbd10..ca1c0d39129ff8 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/MvBaseTableUpdateInfo.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/MvBaseTableUpdateInfo.java @@ -32,7 +32,7 @@ public class MvBaseTableUpdateInfo { // The partition names of base table that have been updated private final Set toRefreshPartitionNames = Sets.newHashSet(); // The mapping of partition name to partition range - private final Map partitonToCells = Maps.newHashMap(); + private final Map partitionToCells = Maps.newHashMap(); // If the base table is a mv, needs to record the mapping of mv partition name to partition range private final Map mvPartitionNameToCellMap = Maps.newHashMap(); @@ -52,8 +52,8 @@ public Set getToRefreshPartitionNames() { return toRefreshPartitionNames; } - public Map getPartitonToCells() { - return partitonToCells; + public Map getPartitionToCells() { + return partitionToCells; } /** @@ -71,14 +71,14 @@ public void addToRefreshPartitionNames(Set toRefreshPartitionNames) { */ public void addRangePartitionKeys(String partitionName, Range rangePartitionKey) { - partitonToCells.put(partitionName, new PRangeCell(rangePartitionKey)); + partitionToCells.put(partitionName, new PRangeCell(rangePartitionKey)); } /** * Add partition name that needs to be refreshed and its associated list partition key */ public void addPartitionCells(Map cells) { - partitonToCells.putAll(cells); + partitionToCells.putAll(cells); } /** @@ -86,7 +86,7 @@ public void addPartitionCells(Map cells) { */ public Map> getPartitionNameWithRanges() { Map> result = Maps.newHashMap(); - for (Map.Entry e : partitonToCells.entrySet()) { + for (Map.Entry e : partitionToCells.entrySet()) { Preconditions.checkState(e.getValue() instanceof PRangeCell); PRangeCell rangeCell = (PRangeCell) e.getValue(); result.put(e.getKey(), rangeCell.getRange()); @@ -99,7 +99,7 @@ public Map> getPartitionNameWithRanges() { */ public Map getPartitionNameWithLists() { Map result = Maps.newHashMap(); - for (Map.Entry e : partitonToCells.entrySet()) { + for (Map.Entry e : partitionToCells.entrySet()) { Preconditions.checkState(e.getValue() instanceof PListCell); PListCell listCell = (PListCell) e.getValue(); result.put(e.getKey(), listCell); @@ -111,7 +111,7 @@ public Map getPartitionNameWithLists() { public String toString() { return "BaseTableRefreshInfo{" + ", toRefreshPartitionNames=" + toRefreshPartitionNames + - ", nameToPartKeys=" + partitonToCells + + ", nameToPartKeys=" + partitionToCells + '}'; } } diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/MvRefreshArbiter.java b/fe/fe-core/src/main/java/com/starrocks/catalog/MvRefreshArbiter.java index ae039620aaf76e..a3d12373ae67fb 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/MvRefreshArbiter.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/MvRefreshArbiter.java @@ -63,7 +63,7 @@ public static boolean needsToRefreshTable(MaterializedView mv, Table table, bool public static MvUpdateInfo getMVTimelinessUpdateInfo(MaterializedView mv, boolean isQueryRewrite) { // Skip check for sync materialized view. if (mv.getRefreshScheme().isSync()) { - return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.NO_REFRESH); + return MvUpdateInfo.noRefresh(mv); } // check mv's query rewrite consistency mode property only in query rewrite. @@ -72,9 +72,9 @@ public static MvUpdateInfo getMVTimelinessUpdateInfo(MaterializedView mv, boolea if (isQueryRewrite) { switch (mvConsistencyRewriteMode) { case DISABLE: - return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.FULL); + return MvUpdateInfo.fullRefresh(mv); case NOCHECK: - return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.NO_REFRESH); + return MvUpdateInfo.noRefresh(mv); case LOOSE: case CHECKED: default: @@ -89,7 +89,7 @@ public static MvUpdateInfo getMVTimelinessUpdateInfo(MaterializedView mv, boolea return timelinessArbiter.getMVTimelinessUpdateInfo(mvConsistencyRewriteMode); } catch (AnalysisException e) { logMVPrepare(mv, "Failed to get mv timeliness info: {}", DebugUtil.getStackTrace(e)); - return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.UNKNOWN); + return MvUpdateInfo.unknown(mv); } } diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/MvUpdateInfo.java b/fe/fe-core/src/main/java/com/starrocks/catalog/MvUpdateInfo.java index 2157c33df9f592..ccc1f51bd126ff 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/MvUpdateInfo.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/MvUpdateInfo.java @@ -14,21 +14,41 @@ package com.starrocks.catalog; +import com.google.api.client.util.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.starrocks.analysis.Expr; import com.starrocks.common.Config; +import com.starrocks.common.util.DebugUtil; +import com.starrocks.connector.PartitionUtil; import com.starrocks.sql.common.PCell; +import com.starrocks.sql.common.PListCell; +import com.starrocks.sql.common.PRangeCell; +import com.starrocks.sql.optimizer.operator.ScanOperatorPredicates; +import com.starrocks.sql.optimizer.operator.logical.LogicalOlapScanOperator; +import com.starrocks.sql.optimizer.operator.logical.LogicalScanOperator; +import com.starrocks.sql.optimizer.rule.transformation.materialization.MvPartitionCompensator; +import com.starrocks.sql.optimizer.rule.transformation.materialization.compensation.BaseCompensation; +import com.starrocks.sql.optimizer.rule.transformation.materialization.compensation.ExternalTableCompensationV1; +import com.starrocks.sql.optimizer.rule.transformation.materialization.compensation.OlapTableCompensation; import org.apache.commons.collections4.CollectionUtils; +import java.util.Collection; +import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; +import static com.starrocks.sql.optimizer.OptimizerTraceUtil.logMVRewrite; import static com.starrocks.sql.optimizer.rule.transformation.materialization.MvUtils.shrinkToSize; /** * Store the update information of MV used for mv rewrite(mv refresh can use it later). */ public class MvUpdateInfo { + private final MaterializedView mv; // The type of mv refresh later private final MvToRefreshType mvToRefreshType; // The partition names of mv to refresh @@ -45,6 +65,9 @@ public class MvUpdateInfo { // If the base table is a mv, needs to record the mapping of mv partition name to partition range private final Map mvPartitionNameToCellMap = Maps.newHashMap(); + // For force_mv rewrite, compensate partition expr is used to rewrite the query. + private Expr compensatePartitionExpr; + /** * Marks the type of mv refresh later. */ @@ -55,16 +78,36 @@ public enum MvToRefreshType { UNKNOWN // unknown type } - public MvUpdateInfo(MvToRefreshType mvToRefreshType) { + public MvUpdateInfo(MaterializedView mv, MvToRefreshType mvToRefreshType) { + this.mv = mv; this.mvToRefreshType = mvToRefreshType; this.queryRewriteConsistencyMode = TableProperty.QueryRewriteConsistencyMode.CHECKED; } - public MvUpdateInfo(MvToRefreshType mvToRefreshType, TableProperty.QueryRewriteConsistencyMode queryRewriteConsistencyMode) { + public MvUpdateInfo(MaterializedView mv, MvToRefreshType mvToRefreshType, + TableProperty.QueryRewriteConsistencyMode queryRewriteConsistencyMode) { + this.mv = mv; this.mvToRefreshType = mvToRefreshType; this.queryRewriteConsistencyMode = queryRewriteConsistencyMode; } + public static MvUpdateInfo unknown(MaterializedView mv) { + return new MvUpdateInfo(mv, MvToRefreshType.UNKNOWN); + } + + public static MvUpdateInfo noRefresh(MaterializedView mv) { + return new MvUpdateInfo(mv, MvToRefreshType.NO_REFRESH); + } + + public static MvUpdateInfo fullRefresh(MaterializedView mv) { + return new MvUpdateInfo(mv, MvToRefreshType.FULL); + } + + public static MvUpdateInfo partialRefresh(MaterializedView mv, + TableProperty.QueryRewriteConsistencyMode queryRewriteConsistencyMode) { + return new MvUpdateInfo(mv, MvToRefreshType.PARTIAL, queryRewriteConsistencyMode); + } + public MvToRefreshType getMvToRefreshType() { return mvToRefreshType; } @@ -120,25 +163,12 @@ public String toString() { '}'; } - /** - * @return the detail string of the mv update info - */ - public String toDetailString() { - return "MvUpdateInfo{" + - "refreshType=" + mvToRefreshType + - ", mvToRefreshPartitionNames=" + mvToRefreshPartitionNames + - ", baseTableUpdateInfos=" + baseTableUpdateInfos + - ", basePartToMvPartNames=" + basePartToMvPartNames + - ", mvPartToBasePartNames=" + mvPartToBasePartNames + - '}'; - } - /** * Get the ref base table partition names to refresh for the given mv. * @param refBaseTable: the input ref base table * @return: the partition names to refresh of the ref base table. */ - public Set getBaseTableToRefreshPartitionNames(Table refBaseTable) { + private Set getBaseTableToRefreshPartitionNames(Table refBaseTable) { if (mvToRefreshPartitionNames.isEmpty() || mvToRefreshType == MvToRefreshType.NO_REFRESH) { return Sets.newHashSet(); } @@ -167,4 +197,153 @@ public Set getBaseTableToRefreshPartitionNames(Table refBaseTable) { } return refBaseTableToRefreshPartitionNames; } + + /** + * Get the compensation for the ref base table which has no input query plan but to compensate a consistent mv. + */ + public BaseCompensation getRefBaseTableCompensation(Table refBaseTable, + Optional scanOperatorOpt) { + Set toRefreshPartitionNames = getBaseTableToRefreshPartitionNames(refBaseTable); + if (toRefreshPartitionNames == null) { + logMVRewrite(mv.getName(), "MV's ref base table {} to refresh partition is null, unknown state", + refBaseTable.getName()); + return null; + } + if (toRefreshPartitionNames.isEmpty()) { + logMVRewrite(mv.getName(), "MV's ref base table {} to refresh partition is empty, no need compensate", + refBaseTable.getName()); + return BaseCompensation.noCompensation(); + } + + if (refBaseTable.isNativeTableOrMaterializedView()) { + return getCompensationForOlap(refBaseTable, scanOperatorOpt, toRefreshPartitionNames); + } else if (MvPartitionCompensator.isTableSupportedPartitionCompensate(refBaseTable)) { + return getCompensationForExternal(refBaseTable, scanOperatorOpt, toRefreshPartitionNames); + } else { + return BaseCompensation.unknown(); + } + } + + private BaseCompensation getCompensationForOlap(Table refBaseTable, + Optional scanOperatorOpt, + Set toRefreshPartitionNames) { + // only retain the partition ids which are selected by the query plan. + if (scanOperatorOpt.isPresent()) { + LogicalOlapScanOperator logicalOlapScanOperator = (LogicalOlapScanOperator) scanOperatorOpt.get(); + Set selectedPartitionNames = logicalOlapScanOperator.getSelectedPartitionId() + .stream() + .map(p -> refBaseTable.getPartition(p)) + .map(p -> p.getName()) + .collect(Collectors.toSet()); + // if all selected partitions need to refresh, no need to rewrite. + if (toRefreshPartitionNames.containsAll(selectedPartitionNames)) { + return BaseCompensation.noRewrite(); + } + // only retain the selected partitions to refresh. + toRefreshPartitionNames.retainAll(selectedPartitionNames); + } + // if no partition need to refresh, no need to compensate. + if (toRefreshPartitionNames.isEmpty()) { + return BaseCompensation.noCompensation(); + } + List refTablePartitionIdsToRefresh = toRefreshPartitionNames.stream() + .map(name -> refBaseTable.getPartition(name)) + .filter(Objects::nonNull) + .map(p -> p.getId()) + .collect(Collectors.toList()); + return new OlapTableCompensation(refTablePartitionIdsToRefresh); + } + + private BaseCompensation getCompensationForExternal(Table refBaseTable, + Optional scanOperatorOpt, + Set toRefreshPartitionNames) { + List selectPartitionKeys = null; + if (scanOperatorOpt.isPresent()) { + Collection selectPartitionIds = null; + try { + ScanOperatorPredicates scanOperatorPredicates = scanOperatorOpt.get().getScanOperatorPredicates(); + selectPartitionIds = scanOperatorPredicates.getSelectedPartitionIds(); + selectPartitionKeys = scanOperatorPredicates.getSelectedPartitionKeys(); + } catch (Exception e) { + return null; + } + // For scan operator that supports prune partitions with OptExternalPartitionPruner, + // we could only compensate partitions which selected partitions need to refresh. + if (MvPartitionCompensator.isTableSupportedPartitionCompensate(refBaseTable)) { + if (CollectionUtils.isEmpty(selectPartitionIds)) { + // see OptExternalPartitionPruner#computePartitionInfo: + // it's different meaning when selectPartitionIds is null and empty for hive and other tables + if (refBaseTable instanceof HiveTable) { + return BaseCompensation.noCompensation(); + } else { + return BaseCompensation.unknown(); + } + } + } + } + + if (selectPartitionKeys != null) { + List colIndexes = PartitionUtil.getRefBaseTablePartitionColumIndexes(mv, refBaseTable); + if (colIndexes == null) { + logMVRewrite(mv.getName(), "Failed to get partition column indexes for ref base table: {}", + refBaseTable.getName()); + return null; + } + Set newSelectedPartitionKeys = selectPartitionKeys + .stream() + .map(p -> PartitionUtil.getSelectedPartitionKey(p, colIndexes)) + .collect(Collectors.toSet());; + Set selectPartitionNames = newSelectedPartitionKeys.stream() + .map(PartitionUtil::generateMVPartitionName) + .collect(Collectors.toSet()); + // NOTE: use partition names rather than partition keys since the partition key may be different for null value. + // if all selected partitions need to refresh, no need to rewrite. + if (toRefreshPartitionNames.containsAll(selectPartitionNames)) { + logMVRewrite(mv.getName(), "All external table {}'s selected partitions {} need to refresh, no rewrite", + refBaseTable.getName(), selectPartitionKeys); + return BaseCompensation.noRewrite(); + } + // filter the selected partitions to refresh. + toRefreshPartitionNames.retainAll(selectPartitionNames); + + // if no partition needs to refresh, no need to compensate. + if (toRefreshPartitionNames.isEmpty()) { + return BaseCompensation.noCompensation(); + } + } + List toRefreshPartitionKeys = + getPartitionKeysToRefresh(refBaseTable, toRefreshPartitionNames); + return new ExternalTableCompensationV1(toRefreshPartitionKeys); + } + + private List getPartitionKeysToRefresh(Table refBaseTable, Set refTablePartitionNamesToRefresh) { + // get the partition keys to refresh + MvBaseTableUpdateInfo baseTableUpdateInfo = baseTableUpdateInfos.get(refBaseTable); + if (baseTableUpdateInfo == null) { + return null; + } + // use update info's partition to cells since it's accurate. + Map nameToPartitionKeys = baseTableUpdateInfo.getPartitionToCells(); + List toRefreshPartitionKeys = Lists.newArrayList(); + try { + for (String partitionName : refTablePartitionNamesToRefresh) { + if (!nameToPartitionKeys.containsKey(partitionName)) { + return null; + } + PCell pCell = nameToPartitionKeys.get(partitionName); + if (pCell instanceof PRangeCell) { + toRefreshPartitionKeys.add(((PRangeCell) pCell).getRange().lowerEndpoint()); + } else if (pCell instanceof PListCell) { + List partitionColumns = refBaseTable.getPartitionColumns(); + List keys = ((PListCell) pCell).toPartitionKeys(partitionColumns); + toRefreshPartitionKeys.addAll(keys); + } + } + } catch (Exception e) { + logMVRewrite("Failed to get partition keys for ref base table: {}", refBaseTable.getName(), + DebugUtil.getStackTrace(e)); + return null; + } + return toRefreshPartitionKeys; + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/mv/MVTimelinessArbiter.java b/fe/fe-core/src/main/java/com/starrocks/catalog/mv/MVTimelinessArbiter.java index cd608a2e3542c7..06154dee899dd4 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/mv/MVTimelinessArbiter.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/mv/MVTimelinessArbiter.java @@ -259,12 +259,10 @@ public PartitionDiff getChangedPartitionDiff(MaterializedView mv, * Only need to check the mv partition existence. */ public MvUpdateInfo getMVTimelinessUpdateInfoInLoose() { - MvUpdateInfo mvUpdateInfo = new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.PARTIAL, - TableProperty.QueryRewriteConsistencyMode.LOOSE); Map> refBaseTablePartitionMap = syncBaseTablePartitions(mv); if (refBaseTablePartitionMap == null) { logMVPrepare(mv, "Sync base table partition infos failed"); - return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.FULL); + return MvUpdateInfo.fullRefresh(mv); } PartitionDiff diff = getChangedPartitionDiff(mv, refBaseTablePartitionMap); @@ -272,6 +270,7 @@ public MvUpdateInfo getMVTimelinessUpdateInfoInLoose() { return null; } Map adds = diff.getAdds(); + MvUpdateInfo mvUpdateInfo = MvUpdateInfo.partialRefresh(mv, TableProperty.QueryRewriteConsistencyMode.LOOSE); if (!CollectionUtils.sizeIsEmpty(adds)) { adds.keySet().stream().forEach(mvPartitionName -> mvUpdateInfo.getMvToRefreshPartitionNames().add(mvPartitionName)); @@ -318,24 +317,12 @@ public Set getMVRetentionPartitionNames(MaterializedView mv, /** * TODO: Optimize performance in loos/force_mv mode + * TODO: in loose mode, ignore partition that both exists in baseTable and mv */ protected void collectBaseTableUpdatePartitionNamesInLoose(MvUpdateInfo mvUpdateInfo) { Map> refBaseTableAndColumns = mv.getRefBaseTablePartitionColumns(); // collect & update mv's to refresh partitions based on base table's partition changes collectBaseTableUpdatePartitionNames(refBaseTableAndColumns, mvUpdateInfo); - Set refBaseTables = mv.getRefBaseTablePartitionColumns().keySet(); - MaterializedView.AsyncRefreshContext context = mv.getRefreshScheme().getAsyncRefreshContext(); - for (Table table : refBaseTables) { - Map mvBaseTableVisibleVersionMap = - context.getBaseTableVisibleVersionMap() - .computeIfAbsent(table.getId(), k -> Maps.newHashMap()); - for (String partitionName : mvBaseTableVisibleVersionMap.keySet()) { - if (mvUpdateInfo.getBaseTableToRefreshPartitionNames(table) != null) { - // in loose mode, ignore partition that both exists in baseTable and mv - mvUpdateInfo.getBaseTableToRefreshPartitionNames(table).remove(partitionName); - } - } - } } /** @@ -347,15 +334,12 @@ protected void collectBaseTableUpdatePartitionNamesInLoose(MvUpdateInfo mvUpdate public MvUpdateInfo getMVTimelinessUpdateInfoInForceMVMode() { String retentionCondition = mv.getTableProperty().getPartitionRetentionCondition(); if (Strings.isNullOrEmpty(retentionCondition)) { - return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.NO_REFRESH); + return MvUpdateInfo.noRefresh(mv); } - MvUpdateInfo mvUpdateInfo = new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.PARTIAL, - TableProperty.QueryRewriteConsistencyMode.FORCE_MV); - Map> refBaseTablePartitionMap = syncBaseTablePartitions(mv); if (refBaseTablePartitionMap == null) { logMVPrepare(mv, "Sync base table partition infos failed"); - return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.FULL); + return MvUpdateInfo.fullRefresh(mv); } PartitionDiff diff = getChangedPartitionDiff(mv, refBaseTablePartitionMap); @@ -366,8 +350,9 @@ public MvUpdateInfo getMVTimelinessUpdateInfoInForceMVMode() { Map adds = diff.getAdds(); // no partition added if (CollectionUtils.sizeIsEmpty(adds)) { - return mvUpdateInfo; + return MvUpdateInfo.noRefresh(mv); } + MvUpdateInfo mvUpdateInfo = MvUpdateInfo.partialRefresh(mv, TableProperty.QueryRewriteConsistencyMode.FORCE_MV); Set retentionPartitionNames = getMVRetentionPartitionNames(mv, retentionCondition, adds); if (retentionPartitionNames == null) { logMVPrepare(mv, "Get expired partitions by retention condition failed"); diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/mv/MVTimelinessListPartitionArbiter.java b/fe/fe-core/src/main/java/com/starrocks/catalog/mv/MVTimelinessListPartitionArbiter.java index 4936aaa0e8b474..06148f79088548 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/mv/MVTimelinessListPartitionArbiter.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/mv/MVTimelinessListPartitionArbiter.java @@ -21,6 +21,7 @@ import com.starrocks.catalog.MvUpdateInfo; import com.starrocks.catalog.PartitionInfo; import com.starrocks.catalog.Table; +import com.starrocks.catalog.TableProperty; import com.starrocks.common.AnalysisException; import com.starrocks.sql.common.ListPartitionDiffer; import com.starrocks.sql.common.PCell; @@ -59,11 +60,11 @@ public MvUpdateInfo getMVTimelinessUpdateInfoInChecked() throws AnalysisExceptio boolean isRefreshBasedOnNonRefTables = needsRefreshOnNonRefBaseTables(refBaseTablePartitionColumns); logMVPrepare(mv, "MV refresh based on non-ref base table:{}", isRefreshBasedOnNonRefTables); if (isRefreshBasedOnNonRefTables) { - return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.FULL); + return MvUpdateInfo.fullRefresh(mv); } // update mv's to refresh partitions based on base table's partition changes - MvUpdateInfo mvTimelinessInfo = new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.PARTIAL); + MvUpdateInfo mvTimelinessInfo = MvUpdateInfo.partialRefresh(mv, TableProperty.QueryRewriteConsistencyMode.CHECKED); Map> baseChangedPartitionNames = collectBaseTableUpdatePartitionNames(refBaseTablePartitionColumns, mvTimelinessInfo); @@ -71,7 +72,7 @@ public MvUpdateInfo getMVTimelinessUpdateInfoInChecked() throws AnalysisExceptio Map> refBaseTablePartitionMap = syncBaseTablePartitions(mv); if (refBaseTablePartitionMap == null) { logMVPrepare(mv, "Sync base table partition infos failed"); - return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.FULL); + return MvUpdateInfo.fullRefresh(mv); } // If base table is materialized view, add partition name to cell mapping into base table partition mapping, // otherwise base table(mv) may lose partition names of the real base table changed partitions. @@ -80,7 +81,7 @@ public MvUpdateInfo getMVTimelinessUpdateInfoInChecked() throws AnalysisExceptio PartitionDiff diff = getChangedPartitionDiff(mv, refBaseTablePartitionMap); if (diff == null) { logMVPrepare(mv, "Partitioned mv compute list diff failed"); - return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.FULL); + return MvUpdateInfo.fullRefresh(mv); } // update into mv's to refresh partitions diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/mv/MVTimelinessNonPartitionArbiter.java b/fe/fe-core/src/main/java/com/starrocks/catalog/mv/MVTimelinessNonPartitionArbiter.java index 1fe419d3095fe7..cc93bd321e036b 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/mv/MVTimelinessNonPartitionArbiter.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/mv/MVTimelinessNonPartitionArbiter.java @@ -64,7 +64,7 @@ protected MvUpdateInfo getMVTimelinessUpdateInfoInChecked() { // skip check external table if the external does not support rewrite. if (!table.isNativeTableOrMaterializedView() && isDisableExternalForceQueryRewrite) { logMVPrepare(mv, "Non-partitioned contains external table, and it's disabled query rewrite"); - return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.FULL); + return MvUpdateInfo.fullRefresh(mv); } // once mv's base table has updated, refresh the materialized view totally. @@ -73,10 +73,10 @@ protected MvUpdateInfo getMVTimelinessUpdateInfoInChecked() { if (mvBaseTableUpdateInfo != null && CollectionUtils.isNotEmpty(mvBaseTableUpdateInfo.getToRefreshPartitionNames())) { logMVPrepare(mv, "Non-partitioned base table has updated, need refresh totally."); - return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.FULL); + return MvUpdateInfo.fullRefresh(mv); } } - return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.NO_REFRESH); + return MvUpdateInfo.noRefresh(mv); } @Override @@ -84,14 +84,14 @@ public MvUpdateInfo getMVTimelinessUpdateInfoInLoose() { List partitions = Lists.newArrayList(mv.getPartitions()); if (partitions.size() > 0 && partitions.get(0).getDefaultPhysicalPartition().getVisibleVersion() <= 1) { // the mv is newly created, can not use it to rewrite query. - return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.FULL); + return MvUpdateInfo.fullRefresh(mv); } - return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.NO_REFRESH); + return MvUpdateInfo.noRefresh(mv); } @Override public MvUpdateInfo getMVTimelinessUpdateInfoInForceMVMode() { // for force mv mode, always no need to refresh for non-partitioned mv. - return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.NO_REFRESH); + return MvUpdateInfo.noRefresh(mv); } } diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/mv/MVTimelinessRangePartitionArbiter.java b/fe/fe-core/src/main/java/com/starrocks/catalog/mv/MVTimelinessRangePartitionArbiter.java index e180d48ad420ed..3016449e742b23 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/mv/MVTimelinessRangePartitionArbiter.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/mv/MVTimelinessRangePartitionArbiter.java @@ -23,6 +23,7 @@ import com.starrocks.catalog.MvUpdateInfo; import com.starrocks.catalog.PartitionInfo; import com.starrocks.catalog.Table; +import com.starrocks.catalog.TableProperty; import com.starrocks.common.AnalysisException; import com.starrocks.scheduler.TableWithPartitions; import com.starrocks.sql.common.PCell; @@ -68,11 +69,12 @@ protected MvUpdateInfo getMVTimelinessUpdateInfoInChecked() throws AnalysisExcep boolean isRefreshBasedOnNonRefTables = needsRefreshOnNonRefBaseTables(refBaseTablePartitionColumns); logMVPrepare(mv, "MV refresh based on non-ref base table:{}", isRefreshBasedOnNonRefTables); if (isRefreshBasedOnNonRefTables) { - return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.FULL); + return MvUpdateInfo.fullRefresh(mv); } // record the relation of partitions between materialized view and base partition table - MvUpdateInfo mvTimelinessInfo = new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.PARTIAL); + MvUpdateInfo mvTimelinessInfo = MvUpdateInfo.partialRefresh(mv, + TableProperty.QueryRewriteConsistencyMode.CHECKED); // collect & update mv's to refresh partitions based on base table's partition changes Map> baseChangedPartitionNames = collectBaseTableUpdatePartitionNames(refBaseTablePartitionColumns, mvTimelinessInfo); @@ -85,7 +87,7 @@ protected MvUpdateInfo getMVTimelinessUpdateInfoInChecked() throws AnalysisExcep Map> basePartitionNameToRangeMap = syncBaseTablePartitions(mv); if (basePartitionNameToRangeMap == null) { logMVPrepare(mv, "Sync base table partition infos failed"); - return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.FULL); + return MvUpdateInfo.fullRefresh(mv); } // If base table is materialized view, add partition name to cell mapping into base table partition mapping, diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/persist/MVTaskRunExtraMessage.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/persist/MVTaskRunExtraMessage.java index ebdb1468bb4839..c1f99192b45859 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/persist/MVTaskRunExtraMessage.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/persist/MVTaskRunExtraMessage.java @@ -102,8 +102,11 @@ public Set getMvPartitionsToRefresh() { } public void setMvPartitionsToRefresh(Set mvPartitionsToRefresh) { - this.mvPartitionsToRefresh = MvUtils.shrinkToSize(mvPartitionsToRefresh, - Config.max_mv_task_run_meta_message_values_length); + if (mvPartitionsToRefresh == null) { + return; + } + this.mvPartitionsToRefresh = Sets.newHashSet(MvUtils.shrinkToSize(mvPartitionsToRefresh, + Config.max_mv_task_run_meta_message_values_length)); } public Map> getBasePartitionsToRefreshMap() { diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/MvRewritePreprocessor.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/MvRewritePreprocessor.java index 8d29cc28a8498e..8f53f4da1c3753 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/MvRewritePreprocessor.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/MvRewritePreprocessor.java @@ -745,7 +745,8 @@ public void prepareRelatedMVs(Set
queryTables, if (!checkMvPartitionNamesToRefresh(mv, partitionNamesToRefresh, mvPlanContext)) { continue; } - logMVPrepare(mv, "MV' partitions to refresh: {}", partitionNamesToRefresh); + logMVPrepare(mv, "MV' partitions to refresh: {}/{}", partitionNamesToRefresh.size(), + MvUtils.shrinkToSize(partitionNamesToRefresh, Config.max_mv_task_run_meta_message_values_length)); // mv's partial partition predicates ScalarOperator mvPartialPartitionPredicates = diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/MaterializedViewTransparentRewriteRule.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/MaterializedViewTransparentRewriteRule.java index b1f3a3751f4b09..23f62bc40281b0 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/MaterializedViewTransparentRewriteRule.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/MaterializedViewTransparentRewriteRule.java @@ -200,7 +200,7 @@ private OptExpression redirectToMVDefinedQuery(OptimizerContext context, MvPlanContext mvPlanContext, LogicalOlapScanOperator olapScanOperator, Set
queryTables) { - MvUpdateInfo mvUpdateInfo = new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.FULL); + MvUpdateInfo mvUpdateInfo = MvUpdateInfo.fullRefresh(mv); mvUpdateInfo.addMvToRefreshPartitionNames(mv.getPartitionNames()); MaterializationContext mvContext = MvRewritePreprocessor.buildMaterializationContext(context, diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/MVTransparentState.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/MVTransparentState.java index d2660677483d75..4ed0dbafced8ea 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/MVTransparentState.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/MVTransparentState.java @@ -45,4 +45,8 @@ public boolean isCompensate() { public boolean isUnknown() { return this == UNKNOWN; } + + public boolean isUncompensable() { + return this == NO_REWRITE || this == UNKNOWN; + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/MvPartitionCompensator.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/MvPartitionCompensator.java index 7598b0916c5b23..2dae383cda1c0a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/MvPartitionCompensator.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/MvPartitionCompensator.java @@ -68,7 +68,6 @@ import com.starrocks.sql.optimizer.operator.scalar.ConstantOperator; import com.starrocks.sql.optimizer.operator.scalar.ScalarOperator; import com.starrocks.sql.optimizer.rule.Rule; -import com.starrocks.sql.optimizer.rule.transformation.materialization.compensation.BaseCompensation; import com.starrocks.sql.optimizer.rule.transformation.materialization.compensation.MVCompensation; import com.starrocks.sql.optimizer.rule.transformation.materialization.compensation.MVCompensationBuilder; import com.starrocks.sql.optimizer.rule.transformation.materialization.compensation.OptCompensator; @@ -155,7 +154,7 @@ public static MVCompensation getMvCompensation(OptExpression queryPlan, // If mv contains no partitions to refresh, no need compensate if (Objects.isNull(mvPartitionNameToRefresh) || mvPartitionNameToRefresh.isEmpty()) { logMVRewrite(mvContext, "MV has no partitions to refresh, no need compensate"); - return MVCompensation.createNoCompensateState(sessionVariable); + return MVCompensation.noCompensate(sessionVariable); } MVCompensationBuilder mvCompensationBuilder = new MVCompensationBuilder(mvContext, mvUpdateInfo); @@ -215,7 +214,7 @@ private static Pair> getMvQueryPlan(Mater deriveLogicalProperty(newMvQueryPlan); List orgMvQueryOutputColumnRefs = mvContext.getMvOutputColumnRefs(); List mvQueryOutputColumnRefs = duplicator.getMappedColumns(orgMvQueryOutputColumnRefs); - newMvQueryPlan.getOp().setOpRuleBit(OP_MV_UNION_REWRITE); + // newMvQueryPlan.getOp().setOpRuleBit(OP_MV_UNION_REWRITE); return Pair.create(newMvQueryPlan, mvQueryOutputColumnRefs); } @@ -226,9 +225,8 @@ public static OptExpression getMvCompensateQueryPlan(MaterializationContext mvCo if (mv.getRefBaseTablePartitionColumns() == null) { return null; } - Map> refTableCompensations = mvCompensation.getCompensations(); return OptCompensator.getMVCompensatePlan(mvContext.getOptimizerContext(), - mv, refTableCompensations, mvQueryPlan); + mv, mvCompensation, mvQueryPlan); } /** diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/MvUtils.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/MvUtils.java index 50bb56d8d09ef2..33219a9986de37 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/MvUtils.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/MvUtils.java @@ -1492,7 +1492,7 @@ public static DistributionDesc getDistributionDesc(MaterializedView materialized * * @return the trimmed set. */ - public static Set shrinkToSize(Set set, int maxLength) { + public static Collection shrinkToSize(Collection set, int maxLength) { if (set != null && set.size() > maxLength) { return set.stream().limit(maxLength).collect(Collectors.toSet()); } @@ -1550,7 +1550,7 @@ public static String formatBaseTableInfos(List baseTableInfos) { return baseTableInfos.stream().map(BaseTableInfo::getReadableString).collect(Collectors.joining(",")); } - public static ScalarOperator convertPartitionKeysToListPredicate(List partitionColRefs, + public static ScalarOperator convertPartitionKeysToListPredicate(List partitionColRefs, Collection partitionRanges) { List values = Lists.newArrayList(); if (partitionColRefs.size() == 1) { diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/compensation/BaseCompensation.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/compensation/BaseCompensation.java index 39a7ca5cd671a6..5642b56bf5bbdd 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/compensation/BaseCompensation.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/compensation/BaseCompensation.java @@ -14,7 +14,7 @@ package com.starrocks.sql.optimizer.rule.transformation.materialization.compensation; -import java.util.List; +import com.starrocks.sql.optimizer.rule.transformation.materialization.MVTransparentState; /** * To support mv compensation(transparent) rewrite, we need to compensate some partitions from the defined query of mv @@ -23,19 +23,50 @@ * There are some differences for different types of tables to compensate: * - OlapTable, partition ids that are already updated. * - ExternalTable, partition keys that are already changed. - * @param */ -public abstract class BaseCompensation { - private List compensations; +public abstract class BaseCompensation { + protected MVTransparentState state; - public BaseCompensation(List compensations) { - this.compensations = compensations; + public BaseCompensation(MVTransparentState state) { + this.state = state; } - public List getCompensations() { - return compensations; + public MVTransparentState getState() { + return state; } @Override - public abstract String toString(); + public String toString() { + return state.toString(); + } + + public static class NoCompensation extends BaseCompensation { + public NoCompensation() { + super(MVTransparentState.NO_COMPENSATE); + } + } + + public static class UnknownCompensation extends BaseCompensation { + public UnknownCompensation() { + super(MVTransparentState.UNKNOWN); + } + } + + public static class NoRewriteCompensation extends BaseCompensation { + public NoRewriteCompensation() { + super(MVTransparentState.NO_REWRITE); + } + } + + public static BaseCompensation noCompensation() { + return new NoCompensation(); + } + + public static BaseCompensation noRewrite() { + return new NoRewriteCompensation(); + } + + public static BaseCompensation unknown() { + return new UnknownCompensation(); + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/compensation/ExternalTableCompensation.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/compensation/ExternalTableCompensation.java index 0fb30792b5a3ff..ec944b71f9edfe 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/compensation/ExternalTableCompensation.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/compensation/ExternalTableCompensation.java @@ -14,21 +14,24 @@ package com.starrocks.sql.optimizer.rule.transformation.materialization.compensation; -import com.starrocks.catalog.PartitionKey; +import com.starrocks.catalog.MaterializedView; +import com.starrocks.catalog.Table; +import com.starrocks.sql.optimizer.OptimizerContext; +import com.starrocks.sql.optimizer.operator.scalar.ColumnRefOperator; +import com.starrocks.sql.optimizer.operator.scalar.ScalarOperator; +import com.starrocks.sql.optimizer.rule.transformation.materialization.MVTransparentState; import java.util.List; -public class ExternalTableCompensation extends BaseCompensation { - public ExternalTableCompensation(List partitionKeys) { - super(partitionKeys); +public abstract class ExternalTableCompensation extends BaseCompensation { + public ExternalTableCompensation(MVTransparentState state) { + super(state); } - @Override - public String toString() { - return "ExternalTableCompensation{" + - "partitionKeys=" + getCompensations() + - '}'; - } + public abstract ScalarOperator getCompensateExpr(OptimizerContext optimizerContext, + MaterializedView mv, + Table refBaseTable, + List refPartitionColRefs); } diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/compensation/ExternalTableCompensationV1.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/compensation/ExternalTableCompensationV1.java new file mode 100644 index 00000000000000..957f0a0eef5340 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/compensation/ExternalTableCompensationV1.java @@ -0,0 +1,168 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.sql.optimizer.rule.transformation.materialization.compensation; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.starrocks.analysis.BinaryType; +import com.starrocks.analysis.Expr; +import com.starrocks.analysis.LiteralExpr; +import com.starrocks.analysis.SlotRef; +import com.starrocks.analysis.TableName; +import com.starrocks.catalog.Column; +import com.starrocks.catalog.IcebergTable; +import com.starrocks.catalog.MaterializedView; +import com.starrocks.catalog.PartitionInfo; +import com.starrocks.catalog.PartitionKey; +import com.starrocks.catalog.Table; +import com.starrocks.common.Config; +import com.starrocks.qe.ConnectContext; +import com.starrocks.sql.analyzer.AnalyzeState; +import com.starrocks.sql.analyzer.ExpressionAnalyzer; +import com.starrocks.sql.analyzer.Field; +import com.starrocks.sql.analyzer.RelationFields; +import com.starrocks.sql.analyzer.RelationId; +import com.starrocks.sql.analyzer.Scope; +import com.starrocks.sql.optimizer.OptimizerContext; +import com.starrocks.sql.optimizer.Utils; +import com.starrocks.sql.optimizer.operator.scalar.BinaryPredicateOperator; +import com.starrocks.sql.optimizer.operator.scalar.ColumnRefOperator; +import com.starrocks.sql.optimizer.operator.scalar.ConstantOperator; +import com.starrocks.sql.optimizer.operator.scalar.ScalarOperator; +import com.starrocks.sql.optimizer.rule.transformation.materialization.MVTransparentState; +import com.starrocks.sql.optimizer.transformer.ExpressionMapping; +import com.starrocks.sql.optimizer.transformer.SqlToScalarOperatorTranslator; +import org.apache.commons.collections4.CollectionUtils; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static com.starrocks.connector.iceberg.IcebergPartitionUtils.getIcebergTablePartitionPredicateExpr; +import static com.starrocks.sql.optimizer.rule.transformation.materialization.MvUtils.convertPartitionKeysToListPredicate; + +public final class ExternalTableCompensationV1 extends ExternalTableCompensation { + private List compensations; + + public ExternalTableCompensationV1(List compensations) { + super(MVTransparentState.COMPENSATE); + this.compensations = compensations; + } + + @Override + public ScalarOperator getCompensateExpr(OptimizerContext optimizerContext, + MaterializedView mv, + Table refBaseTable, + List refPartitionColRefs) { + if (refBaseTable instanceof IcebergTable) { + IcebergTable cachedIcebergTable = (IcebergTable) refBaseTable; + String catalogName = cachedIcebergTable.getCatalogName(); + String dbName = cachedIcebergTable.getCatalogDBName(); + TableName refTableName = new TableName(catalogName, dbName, cachedIcebergTable.getName()); + return getIcebergTableCompensationExpr(optimizerContext, mv, refBaseTable, refTableName, refPartitionColRefs); + } else { + return convertPartitionKeysToListPredicate(refPartitionColRefs, compensations); + } + } + + private ScalarOperator getIcebergTableCompensationExpr(OptimizerContext optimizerContext, + MaterializedView mv, + Table refBaseTable, + TableName refTableName, + List refPartitionColRefs) { + PartitionInfo mvPartitionInfo = mv.getPartitionInfo(); + if (!mvPartitionInfo.isListPartition()) { + return convertPartitionKeysToListPredicate(refPartitionColRefs, compensations); + } + List mvPartitionCols = mv.getPartitionColumns(); + // to iceberg, `partitionKeys` are using LocalTime as partition values which cannot be used to prune iceberg + // partitions directly because iceberg uses UTC time in its partition metadata. + // convert `partitionKeys` to iceberg utc time here. + // Please see MVPCTRefreshListPartitioner#genPartitionPredicate for more details. + Map> refBaseTablePartitionSlotRefs = mv.getRefBaseTablePartitionSlots(); + Preconditions.checkArgument(refBaseTablePartitionSlotRefs.containsKey(refBaseTable)); + List refBaseTableSlotRefs = refBaseTablePartitionSlotRefs.get(refBaseTable); + ExpressionMapping expressionMapping = + new ExpressionMapping(new Scope(RelationId.anonymous(), new RelationFields()), + Lists.newArrayList()); + for (int i = 0; i < refPartitionColRefs.size(); i++) { + ColumnRefOperator refPartitionColRef = refPartitionColRefs.get(i); + SlotRef refBaseTablePartitionExpr = refBaseTableSlotRefs.get(i); + expressionMapping.put(refBaseTablePartitionExpr, refPartitionColRef); + } + AnalyzeState analyzeState = new AnalyzeState(); + Scope scope = new Scope(RelationId.anonymous(), new RelationFields( + refBaseTable.getBaseSchema().stream() + .map(col -> new Field(col.getName(), + col.getType(), refTableName, null)) + .collect(Collectors.toList()))); + List externalPredicates = Lists.newArrayList(); + List refBaseTablePartitionCols = refPartitionColRefs.stream() + .map(ref -> refBaseTable.getColumn(ref.getName())) + .collect(Collectors.toList()); + for (PartitionKey partitionKey : compensations) { + List literalExprs = partitionKey.getKeys(); + Preconditions.checkState(literalExprs.size() == refPartitionColRefs.size()); + List predicates = Lists.newArrayList(); + for (int i = 0; i < literalExprs.size(); i++) { + Column mvColumn = mvPartitionCols.get(i); + LiteralExpr literalExpr = literalExprs.get(i); + ColumnRefOperator refPartitionColRef = refPartitionColRefs.get(i); + ConstantOperator expectPartitionVal = + (ConstantOperator) SqlToScalarOperatorTranslator.translate(literalExpr); + if (!mvColumn.isGeneratedColumn()) { + ScalarOperator eq = new BinaryPredicateOperator(BinaryType.EQ, refPartitionColRef, + expectPartitionVal); + predicates.add(eq); + } else { + SlotRef refBaseTablePartitionExpr = refBaseTableSlotRefs.get(i); + Column refColumn = refBaseTablePartitionCols.get(i); + Expr predicateExpr = getIcebergTablePartitionPredicateExpr((IcebergTable) refBaseTable, + refColumn.getName(), refBaseTablePartitionExpr, literalExpr); + ExpressionAnalyzer.analyzeExpression(predicateExpr, analyzeState, scope, ConnectContext.get()); + ScalarOperator predicate = SqlToScalarOperatorTranslator.translate(predicateExpr, expressionMapping, + optimizerContext.getColumnRefFactory()); + predicates.add(predicate); + } + } + externalPredicates.add(Utils.compoundAnd(predicates)); + } + return Utils.compoundOr(externalPredicates); + } + + @Override + public String toString() { + if (CollectionUtils.isEmpty(compensations)) { + return ""; + } + StringBuilder sb = new StringBuilder(); + sb.append("size=").append(compensations.size()); + int size = Math.min(Config.max_mv_task_run_meta_message_values_length, compensations.size()); + sb.append(" ["); + for (int i = 0; i < size; i++) { + PartitionKey key = compensations.get(i); + List keys = key.getKeys() + .stream() + .map(LiteralExpr::getStringValue) + .collect(Collectors.toList()); + sb.append(", (").append(Joiner.on(",").join(keys)).append(")"); + } + sb.append("]"); + return sb.toString(); + } +} + + diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/compensation/ExternalTableCompensationV2.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/compensation/ExternalTableCompensationV2.java new file mode 100644 index 00000000000000..dd22fd9326f680 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/compensation/ExternalTableCompensationV2.java @@ -0,0 +1,48 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.sql.optimizer.rule.transformation.materialization.compensation; + +import com.starrocks.catalog.MaterializedView; +import com.starrocks.catalog.Table; +import com.starrocks.sql.optimizer.OptimizerContext; +import com.starrocks.sql.optimizer.operator.scalar.ColumnRefOperator; +import com.starrocks.sql.optimizer.operator.scalar.ScalarOperator; +import com.starrocks.sql.optimizer.rule.transformation.materialization.MVTransparentState; + +import java.util.List; + +public final class ExternalTableCompensationV2 extends ExternalTableCompensation { + private ScalarOperator compensation; + + public ExternalTableCompensationV2(ScalarOperator compensation) { + super(MVTransparentState.COMPENSATE); + this.compensation = compensation; + } + + @Override + public ScalarOperator getCompensateExpr(OptimizerContext optimizerContext, + MaterializedView mv, + Table refBaseTable, + List refPartitionColRefs) { + return null; + } + + @Override + public String toString() { + return ""; + } +} + + diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/compensation/MVCompensation.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/compensation/MVCompensation.java index cf3b3bbaafac54..4f64b3fcece84b 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/compensation/MVCompensation.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/compensation/MVCompensation.java @@ -14,11 +14,11 @@ package com.starrocks.sql.optimizer.rule.transformation.materialization.compensation; +import com.google.common.base.Preconditions; import com.starrocks.catalog.Table; -import com.starrocks.common.Config; import com.starrocks.qe.SessionVariable; import com.starrocks.sql.optimizer.rule.transformation.materialization.MVTransparentState; -import com.starrocks.sql.optimizer.rule.transformation.materialization.MvUtils; +import org.apache.commons.collections4.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -34,33 +34,47 @@ public class MVCompensation { private final SessionVariable sessionVariable; // The state of compensation. private final MVTransparentState state; - private final Map> compensations; + private final Map compensations; public MVCompensation(SessionVariable sessionVariable, MVTransparentState state, - Map> compensations) { + Map compensations) { this.sessionVariable = sessionVariable; this.state = state; + if (state.isCompensate()) { + Preconditions.checkArgument(compensations != null && !compensations.isEmpty()); + } this.compensations = compensations; } - public static MVCompensation createNoCompensateState(SessionVariable sessionVariable) { - return new MVCompensation(sessionVariable, MVTransparentState.NO_COMPENSATE, null); + public static MVCompensation compensate(SessionVariable sessionVariable, + Map compensations) { + if (CollectionUtils.sizeIsEmpty(compensations)) { + return noCompensate(sessionVariable); + } else { + return new MVCompensation(sessionVariable, MVTransparentState.COMPENSATE, compensations); + } } - public static MVCompensation createNoRewriteState(SessionVariable sessionVariable) { - return new MVCompensation(sessionVariable, MVTransparentState.NO_REWRITE, null); + public static MVCompensation noCompensate(SessionVariable sessionVariable) { + return new MVCompensation(sessionVariable, MVTransparentState.NO_COMPENSATE, null); } - public static MVCompensation createUnkownState(SessionVariable sessionVariable) { + public static MVCompensation unknown(SessionVariable sessionVariable) { return new MVCompensation(sessionVariable, MVTransparentState.UNKNOWN, null); } + public static MVCompensation withState(SessionVariable sessionVariable, + MVTransparentState state) { + Preconditions.checkArgument(state != MVTransparentState.COMPENSATE); + return new MVCompensation(sessionVariable, state, null); + } + public MVTransparentState getState() { return state; } - public Map> getCompensations() { + public Map getCompensations() { return compensations; } @@ -93,11 +107,24 @@ public boolean isCompensatePartitionPredicate() { } } + public boolean isTableNeedCompensate(Table table) { + return !CollectionUtils.sizeIsEmpty(compensations) && compensations.containsKey(table); + } + + public BaseCompensation getTableCompensation(Table table) { + return compensations.get(table); + } + @Override public String toString() { - return "MvCompensation{" + - "state=" + state + - ", compensations=" + MvUtils.shrinkToSize(compensations, Config.max_mv_task_run_meta_message_values_length) + - '}'; + StringBuilder sb = new StringBuilder(); + sb.append("state=").append(state); + if (!CollectionUtils.sizeIsEmpty(compensations)) { + for (Map.Entry entry : compensations.entrySet()) { + sb.append(" [table=").append(entry.getKey().getName()); + sb.append(", ").append(entry.getValue().toString()).append("]"); + } + } + return sb.toString(); } } diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/compensation/MVCompensationBuilder.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/compensation/MVCompensationBuilder.java index fc349927818316..ae566e2127df57 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/compensation/MVCompensationBuilder.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/compensation/MVCompensationBuilder.java @@ -14,31 +14,15 @@ package com.starrocks.sql.optimizer.rule.transformation.materialization.compensation; -import com.google.api.client.util.Lists; -import com.google.common.base.Preconditions; import com.google.common.collect.Maps; -import com.google.common.collect.Range; -import com.google.common.collect.Sets; import com.starrocks.catalog.Column; import com.starrocks.catalog.MaterializedView; import com.starrocks.catalog.MvBaseTableUpdateInfo; import com.starrocks.catalog.MvUpdateInfo; -import com.starrocks.catalog.OlapTable; -import com.starrocks.catalog.Partition; -import com.starrocks.catalog.PartitionInfo; -import com.starrocks.catalog.PartitionKey; import com.starrocks.catalog.Table; -import com.starrocks.common.AnalysisException; -import com.starrocks.common.util.DebugUtil; -import com.starrocks.connector.PartitionUtil; import com.starrocks.qe.SessionVariable; -import com.starrocks.sql.common.PCell; -import com.starrocks.sql.common.PListCell; -import com.starrocks.sql.common.PRangeCell; import com.starrocks.sql.optimizer.MaterializationContext; import com.starrocks.sql.optimizer.OptExpression; -import com.starrocks.sql.optimizer.operator.OperatorType; -import com.starrocks.sql.optimizer.operator.ScanOperatorPredicates; import com.starrocks.sql.optimizer.operator.logical.LogicalOlapScanOperator; import com.starrocks.sql.optimizer.operator.logical.LogicalScanOperator; import com.starrocks.sql.optimizer.operator.logical.LogicalViewScanOperator; @@ -49,17 +33,12 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Objects; +import java.util.Optional; import java.util.Set; -import java.util.stream.Collectors; -import static com.starrocks.connector.PartitionUtil.generateMVPartitionName; import static com.starrocks.sql.optimizer.OptimizerTraceUtil.logMVRewrite; -import static com.starrocks.sql.optimizer.rule.transformation.materialization.MvPartitionCompensator.SUPPORTED_PARTITION_PRUNE_EXTERNAL_SCAN_TYPES; /** * MVCompensationBuilder is used to build a mv compensation for materialized view in mv rewrite. @@ -84,36 +63,25 @@ public MVCompensationBuilder(MaterializationContext mvContext, public MVCompensation buildMvCompensation(OptExpression queryPlan) { SessionVariable sessionVariable = mvContext.getOptimizerContext().getSessionVariable(); List scanOperators = MvUtils.getScanOperator(queryPlan); - // If no scan operator, no need compensate - if (scanOperators.isEmpty()) { - return MVCompensation.createUnkownState(sessionVariable); - } - if (scanOperators.stream().anyMatch(scan -> scan instanceof LogicalViewScanOperator)) { - return MVCompensation.createUnkownState(sessionVariable); - } - // If no partition to refresh, return directly. Set mvToRefreshPartitionNames = mvUpdateInfo.getMvToRefreshPartitionNames(); if (CollectionUtils.isEmpty(mvToRefreshPartitionNames)) { - return MVCompensation.createNoCompensateState(sessionVariable); + return MVCompensation.noCompensate(sessionVariable); } // If no scan operator, no need compensate - if (scanOperators.isEmpty()) { - return MVCompensation.createUnkownState(sessionVariable); - } - if (scanOperators.stream().anyMatch(scan -> scan instanceof LogicalViewScanOperator)) { - return MVCompensation.createUnkownState(sessionVariable); + if (scanOperators.isEmpty() || scanOperators.stream().anyMatch(scan -> scan instanceof LogicalViewScanOperator)) { + return MVCompensation.unknown(sessionVariable); } MaterializedView mv = mvContext.getMv(); Map> refBaseTablePartitionColumns = mv.getRefBaseTablePartitionColumns(); if (CollectionUtils.sizeIsEmpty(refBaseTablePartitionColumns)) { logMVRewrite("MV's not partitioned, failed to get partition keys: {}", mv.getName()); - return MVCompensation.createUnkownState(sessionVariable); + return MVCompensation.unknown(sessionVariable); } - Map> compensations = Maps.newHashMap(); + Map compensations = Maps.newHashMap(); for (LogicalScanOperator scanOperator : scanOperators) { Table refBaseTable = scanOperator.getTable(); if (!refBaseTablePartitionColumns.containsKey(refBaseTable)) { @@ -122,27 +90,25 @@ public MVCompensation buildMvCompensation(OptExpression queryPlan) { // If the ref table contains no partitions to refresh, no need compensate. // If the mv is partitioned and non-ref table needs refresh, then all partitions need to be refreshed; // it cannot be a candidate. - Set partitionNamesToRefresh = mvUpdateInfo.getBaseTableToRefreshPartitionNames(refBaseTable); - if (partitionNamesToRefresh == null) { + BaseCompensation baseCompensation = getMVCompensationOfTable(refBaseTable, scanOperator); + // NPE check + if (baseCompensation == null) { logMVRewrite(mvContext, "MV's ref base table {} to refresh partition is null, unknown state", refBaseTable.getName()); - return MVCompensation.createUnkownState(sessionVariable); + return MVCompensation.unknown(sessionVariable); } - if (partitionNamesToRefresh.isEmpty()) { - logMVRewrite(mvContext, "MV's ref base table {} to refresh partition is empty, no need compensate", - refBaseTable.getName()); - continue; - } - MVCompensation subCompensation = getMVCompensationOfTable(refBaseTable, partitionNamesToRefresh, scanOperator); - if (subCompensation.getState().isNoCompensate()) { + MVTransparentState state = baseCompensation.getState(); + // if the table is unnecessary to compensate, skip + if (state.isNoCompensate()) { continue; } - if (!subCompensation.getState().isCompensate()) { - return subCompensation; + // if the table is not rewritable, return unknown state + if (state.isUncompensable()) { + return MVCompensation.withState(sessionVariable, state); } - compensations.putAll(subCompensation.getCompensations()); + compensations.put(refBaseTable, baseCompensation); } - return ofBaseTableCompensations(compensations); + return MVCompensation.compensate(sessionVariable, compensations); } /** @@ -152,16 +118,15 @@ public MVCompensation buildMvCompensation() { // If no partition to refresh, return directly. Set mvToRefreshPartitionNames = mvUpdateInfo.getMvToRefreshPartitionNames(); if (CollectionUtils.isEmpty(mvToRefreshPartitionNames)) { - return MVCompensation.createNoCompensateState(sessionVariable); + return MVCompensation.noCompensate(sessionVariable); } - MaterializedView mv = mvContext.getMv(); Map> refBaseTablePartitionColumns = mv.getRefBaseTablePartitionColumns(); if (CollectionUtils.sizeIsEmpty(refBaseTablePartitionColumns)) { logMVRewrite("MV's not partitioned, failed to get partition keys: {}", mv.getName()); - return MVCompensation.createUnkownState(sessionVariable); + return MVCompensation.unknown(sessionVariable); } - Map> compensations = Maps.newHashMap(); + Map compensations = Maps.newHashMap(); Map baseTableUpdateInfoMap = mvUpdateInfo.getBaseTableUpdateInfos(); for (Map.Entry e : baseTableUpdateInfoMap.entrySet()) { Table refBaseTable = e.getKey(); @@ -171,316 +136,65 @@ public MVCompensation buildMvCompensation() { // If the ref table contains no partitions to refresh, no need compensate. // If the mv is partitioned and non-ref table needs refresh, then all partitions need to be refreshed; // it cannot be a candidate. - Set partitionNamesToRefresh = mvUpdateInfo.getBaseTableToRefreshPartitionNames(refBaseTable); - if (partitionNamesToRefresh == null) { + BaseCompensation baseCompensation = mvUpdateInfo.getRefBaseTableCompensation(refBaseTable, + Optional.empty()); + // NPE check + if (baseCompensation == null) { logMVRewrite(mvContext, "MV's ref base table {} to refresh partition is null, unknown state", refBaseTable.getName()); - return MVCompensation.createUnkownState(sessionVariable); + return MVCompensation.unknown(sessionVariable); } - if (partitionNamesToRefresh.isEmpty()) { - logMVRewrite(mvContext, "MV's ref base table {} to refresh partition is empty, no need compensate", - refBaseTable.getName()); + MVTransparentState state = baseCompensation.getState(); + // if the table is not needed to compensate, skip + if (state.isNoCompensate()) { continue; } - - MVCompensation subCompensation = getMVCompensationOfTable(refBaseTable, partitionNamesToRefresh); - if (subCompensation.getState().isNoCompensate()) { - continue; - } - - if (!subCompensation.getState().isCompensate()) { - return subCompensation; - } - compensations.putAll(subCompensation.getCompensations()); - } - return ofBaseTableCompensations(compensations); - } - - private MVCompensation getMVCompensationOfTable(Table refBaseTable, - Set refTablePartitionNamesToRefresh) { - if (refBaseTable.isNativeTableOrMaterializedView()) { - // What if nested mv? - List refTablePartitionIdsToRefresh = refTablePartitionNamesToRefresh.stream() - .map(name -> refBaseTable.getPartition(name)) - .filter(Objects::nonNull) - .map(p -> p.getId()) - .collect(Collectors.toList()); - return ofOlapTableCompensation(refBaseTable, refTablePartitionIdsToRefresh); - } else if (MvPartitionCompensator.isTableSupportedPartitionCompensate(refBaseTable)) { - MvBaseTableUpdateInfo mvBaseTableUpdateInfo = - mvUpdateInfo.getBaseTableUpdateInfos().get(refBaseTable); - if (mvBaseTableUpdateInfo == null) { - return null; - } - PartitionInfo partitionInfo = mvContext.getMv().getPartitionInfo(); - if (partitionInfo.isRangePartition()) { - Map> refTablePartitionNameWithRanges = - mvBaseTableUpdateInfo.getPartitionNameWithRanges(); - List partitionKeys = Lists.newArrayList(); - try { - for (String partitionName : refTablePartitionNamesToRefresh) { - Preconditions.checkState(refTablePartitionNameWithRanges.containsKey(partitionName)); - Range partitionKeyRange = refTablePartitionNameWithRanges.get(partitionName); - partitionKeys.add(partitionKeyRange.lowerEndpoint()); - } - } catch (Exception e) { - logMVRewrite("Failed to get partition keys for ref base table: {}", refBaseTable.getName(), - DebugUtil.getStackTrace(e)); - return MVCompensation.createUnkownState(sessionVariable); - } - return ofExternalTableCompensation(refBaseTable, partitionKeys); - } else { - Preconditions.checkArgument(partitionInfo.isListPartition()); - Map partitionNameWithLists = mvBaseTableUpdateInfo.getPartitionNameWithLists(); - List partitionKeys = Lists.newArrayList(); - try { - List partitionCols = refBaseTable.getPartitionColumns(); - for (String partitionName : refTablePartitionNamesToRefresh) { - Preconditions.checkState(partitionNameWithLists.containsKey(partitionName)); - PListCell pCell = partitionNameWithLists.get(partitionName); - // TODO: we are assuming PListCell's cells' order is by partition's columns order, we may introduce - // partition columns in PListCell. - List keys = pCell.toPartitionKeys(partitionCols); - partitionKeys.addAll(keys); - } - } catch (Exception e) { - logMVRewrite("Failed to get partition keys for ref base table: {}", refBaseTable.getName(), - DebugUtil.getStackTrace(e)); - return MVCompensation.createUnkownState(sessionVariable); - } - return ofExternalTableCompensation(refBaseTable, partitionKeys); + // if the table is not rewritable, return unknown state + if (state.isUncompensable()) { + return MVCompensation.withState(sessionVariable, state); } - } else { - return MVCompensation.createUnkownState(sessionVariable); + compensations.put(refBaseTable, baseCompensation); } + return MVCompensation.compensate(sessionVariable, compensations); } - private MVCompensation getMVCompensationOfTable(Table refBaseTable, - Set partitionNamesToRefresh, - LogicalScanOperator scanOperator) { + private BaseCompensation getMVCompensationOfTable(Table refBaseTable, + LogicalScanOperator scanOperator) { if (scanOperator instanceof LogicalOlapScanOperator) { - return getMVCompensationOfOlapTable(refBaseTable, partitionNamesToRefresh, - (LogicalOlapScanOperator) scanOperator); + return getMVCompensationOfOlapTable(refBaseTable, (LogicalOlapScanOperator) scanOperator); } else if (MvPartitionCompensator.isTableSupportedPartitionCompensate(refBaseTable)) { - return getMVCompensationForExternal(refBaseTable, partitionNamesToRefresh, scanOperator); + return getMVCompensationForExternal(refBaseTable, scanOperator); } else { - SessionVariable sessionVariable = mvContext.getOptimizerContext().getSessionVariable(); - return MVCompensation.createUnkownState(sessionVariable); + return null; } } - private MVCompensation getMVCompensationOfOlapTable(Table refBaseTable, - Set partitionNamesToRefresh, + private BaseCompensation getMVCompensationOfOlapTable(Table refBaseTable, LogicalOlapScanOperator olapScanOperator) { - SessionVariable sessionVariable = mvContext.getOptimizerContext().getSessionVariable(); - OlapTable olapTable = (OlapTable) refBaseTable; List selectPartitionIds = olapScanOperator.getSelectedPartitionId(); - if (Objects.isNull(selectPartitionIds) || selectPartitionIds.size() == 0) { - return MVCompensation.createNoCompensateState(sessionVariable); + if (CollectionUtils.isEmpty(selectPartitionIds)) { + return BaseCompensation.noCompensation(); } - // if any of query's select partition ids has not been refreshed, then no rewrite with this mv. - if (selectPartitionIds.stream() - .map(id -> olapTable.getPartition(id)) - .noneMatch(part -> partitionNamesToRefresh.contains(part.getName()))) { - return MVCompensation.createNoCompensateState(sessionVariable); - } - // if mv's to refresh partitions contains any of query's select partition ids, then rewrite with compensate. - List toRefreshRefTablePartitions = getMVCompensatePartitionsOfOlap(partitionNamesToRefresh, - refBaseTable, olapScanOperator); - if (toRefreshRefTablePartitions == null) { - return MVCompensation.createUnkownState(sessionVariable); - } - - Set toRefreshPartitionIds = Sets.newHashSet(toRefreshRefTablePartitions); - if (toRefreshPartitionIds.containsAll(selectPartitionIds)) { - logMVRewrite(mvContext, "All olap table {}'s selected partitions {} need to refresh, no rewrite", - refBaseTable.getName(), selectPartitionIds); - return MVCompensation.createNoRewriteState(sessionVariable); - } - return ofOlapTableCompensation(refBaseTable, toRefreshRefTablePartitions); - } - - private MVCompensation ofOlapTableCompensation(Table refBaseTable, - List toRefreshRefTablePartitions) { - BaseCompensation compensation = new OlapTableCompensation(toRefreshRefTablePartitions); - Map> compensationMap = Collections.singletonMap(refBaseTable, compensation); - return new MVCompensation(sessionVariable, MVTransparentState.COMPENSATE, compensationMap); - } - - private MVCompensation ofExternalTableCompensation(Table refBaseTable, - List toRefreshRefTablePartitions) { - Map> compensationMap = Collections.singletonMap(refBaseTable, - new ExternalTableCompensation(toRefreshRefTablePartitions)); - return new MVCompensation(sessionVariable, MVTransparentState.COMPENSATE, compensationMap); - } - - private MVCompensation ofBaseTableCompensations(Map> compensations) { - if (compensations.isEmpty()) { - return MVCompensation.createNoCompensateState(sessionVariable); - } else { - return new MVCompensation(sessionVariable, MVTransparentState.COMPENSATE, compensations); - } - } - - private MVCompensation getMVCompensationForExternal(Table refBaseTable, - Set refTablePartitionNamesToRefresh, - LogicalScanOperator refScanOperator) { - SessionVariable sessionVariable = mvContext.getOptimizerContext().getSessionVariable(); - MaterializedView mv = mvContext.getMv(); - try { - ScanOperatorPredicates scanOperatorPredicates = refScanOperator.getScanOperatorPredicates(); - Collection selectPartitionIds = scanOperatorPredicates.getSelectedPartitionIds(); - List selectPartitionKeys = scanOperatorPredicates.getSelectedPartitionKeys(); - // For scan operator which support prune partitions with OptExternalPartitionPruner, - // we could only compensate partitions which selected partitions need to refresh. - if (SUPPORTED_PARTITION_PRUNE_EXTERNAL_SCAN_TYPES.contains(refScanOperator.getOpType())) { - if (CollectionUtils.isEmpty(selectPartitionIds)) { - // see OptExternalPartitionPruner#computePartitionInfo: - // it's not the same meaning when selectPartitionIds is null and empty for hive and other tables - if (refScanOperator.getOpType() == OperatorType.LOGICAL_HIVE_SCAN) { - return MVCompensation.createNoCompensateState(sessionVariable); - } else { - return MVCompensation.createUnkownState(sessionVariable); - } - } - - // NOTE: ref base table's partition keys may contain multi columns, but mv may only contain one column. - List colIndexes = PartitionUtil.getRefBaseTablePartitionColumIndexes(mv, refBaseTable); - if (colIndexes == null) { - return MVCompensation.createUnkownState(sessionVariable); - } - List newPartitionKeys = selectPartitionKeys.stream() - .map(partitionKey -> PartitionUtil.getSelectedPartitionKey(partitionKey, colIndexes)) - .collect(Collectors.toList()); - Set selectPartitionNames = newPartitionKeys.stream() - .map(PartitionUtil::generateMVPartitionName) - .collect(Collectors.toSet()); - if (selectPartitionNames.stream().noneMatch(refTablePartitionNamesToRefresh::contains)) { - return MVCompensation.createNoCompensateState(sessionVariable); - } - } - // if mv's to refresh partitions contains any of query's select partition ids, then rewrite with compensation. - List toRefreshRefTablePartitions = getMVCompensatePartitionsOfExternal(refBaseTable, - refTablePartitionNamesToRefresh, refScanOperator); - if (toRefreshRefTablePartitions == null) { - return MVCompensation.createUnkownState(sessionVariable); - } - - Table table = refScanOperator.getTable(); - if (SUPPORTED_PARTITION_PRUNE_EXTERNAL_SCAN_TYPES.contains(refScanOperator.getOpType())) { - if (Sets.newHashSet(toRefreshRefTablePartitions).containsAll(selectPartitionKeys)) { - logMVRewrite(mvContext, "All external table {}'s selected partitions {} need to refresh, no rewrite", - table.getName(), selectPartitionIds); - return MVCompensation.createNoRewriteState(sessionVariable); - } - } - return ofExternalTableCompensation(table, toRefreshRefTablePartitions); - } catch (AnalysisException e) { - return MVCompensation.createUnkownState(sessionVariable); - } - } - - /** - * Get mv's compensate partitions for ref table(olap table). - * @param refBaseTable: materialized view's ref base table - * @param refScanOperator: ref base table's scan operator. - * @return: need to compensate partition ids of the materialized view. - */ - private List getMVCompensatePartitionsOfOlap(Set partitionNamesToRefresh, - Table refBaseTable, - LogicalScanOperator refScanOperator) { - LogicalOlapScanOperator olapScanOperator = ((LogicalOlapScanOperator) refScanOperator); - if (olapScanOperator.getSelectedPartitionId() == null) { + BaseCompensation baseCompensation = mvUpdateInfo.getRefBaseTableCompensation( + refBaseTable, Optional.of(olapScanOperator)); + if (baseCompensation == null) { + logMVRewrite(mvContext, "MV's ref base table {} to refresh partition is null, unknown state", + refBaseTable.getName()); return null; } - List refTableCompensatePartitionIds = Lists.newArrayList(); - List selectPartitionIds = olapScanOperator.getSelectedPartitionId(); - for (Long selectPartitionId : selectPartitionIds) { - Partition partition = refBaseTable.getPartition(selectPartitionId); - // If this partition has updated, add it into compensate partition ids. - if (partitionNamesToRefresh.contains(partition.getName())) { - refTableCompensatePartitionIds.add(selectPartitionId); - } - } - return refTableCompensatePartitionIds; + return baseCompensation; } - private List getMVCompensatePartitionsOfExternal(Table refBaseTable, - Set refTablePartitionNamesToRefresh, - LogicalScanOperator refScanOperator) - throws AnalysisException { - if (SUPPORTED_PARTITION_PRUNE_EXTERNAL_SCAN_TYPES.contains(refScanOperator.getOpType())) { - // For external table which support partition prune with OptExternalPartitionPruner, - // could use selectPartitionKeys to get the compensate partitions. - return getMVCompensatePartitionsOfExternalWithPartitionPruner(refTablePartitionNamesToRefresh, - refScanOperator); - } else { - return getMVCompensatePartitionsOfExternalWithoutPartitionPruner(refBaseTable, refTablePartitionNamesToRefresh); - } - } - private List getMVCompensatePartitionsOfExternalWithPartitionPruner( - Set refTablePartitionNamesToRefresh, - LogicalScanOperator refScanOperator) { - List refTableCompensatePartitionKeys = Lists.newArrayList(); - ScanOperatorPredicates scanOperatorPredicates = null; - try { - scanOperatorPredicates = refScanOperator.getScanOperatorPredicates(); - } catch (Exception e) { - return null; - } - if (scanOperatorPredicates == null) { - return null; - } - List selectPartitionKeys = scanOperatorPredicates.getSelectedPartitionKeys(); - // different behavior for different external table types - if (selectPartitionKeys.isEmpty() && refScanOperator.getOpType() != OperatorType.LOGICAL_HIVE_SCAN) { - return null; - } - Table refBaseTable = refScanOperator.getTable(); - List colIndexes = PartitionUtil.getRefBaseTablePartitionColumIndexes(mvContext.getMv(), refBaseTable); - if (colIndexes == null) { - return null; - } - for (PartitionKey partitionKey : selectPartitionKeys) { - PartitionKey newPartitionKey = PartitionUtil.getSelectedPartitionKey(partitionKey, colIndexes); - String partitionName = generateMVPartitionName(newPartitionKey); - if (refTablePartitionNamesToRefresh.contains(partitionName)) { - refTableCompensatePartitionKeys.add(newPartitionKey); - } - } - return refTableCompensatePartitionKeys; - } - - private List getMVCompensatePartitionsOfExternalWithoutPartitionPruner( - Table refBaseTable, - Set refTablePartitionNamesToRefresh) { - MvBaseTableUpdateInfo baseTableUpdateInfo = mvUpdateInfo.getBaseTableUpdateInfos().get(refBaseTable); - if (baseTableUpdateInfo == null) { - return null; - } - // use update info's partition to cells since it's accurate. - Map nameToPartitionKeys = baseTableUpdateInfo.getPartitonToCells(); - List partitionKeys = Lists.newArrayList(); - try { - for (String partitionName : refTablePartitionNamesToRefresh) { - if (!nameToPartitionKeys.containsKey(partitionName)) { - return null; - } - PCell pCell = nameToPartitionKeys.get(partitionName); - if (pCell instanceof PRangeCell) { - partitionKeys.add(((PRangeCell) pCell).getRange().lowerEndpoint()); - } else if (pCell instanceof PListCell) { - List partitionColumns = refBaseTable.getPartitionColumns(); - List keys = ((PListCell) pCell).toPartitionKeys(partitionColumns); - partitionKeys.addAll(keys); - } - } - } catch (Exception e) { - logMVRewrite("Failed to get partition keys for ref base table: {}", refBaseTable.getName(), - DebugUtil.getStackTrace(e)); + private BaseCompensation getMVCompensationForExternal(Table refBaseTable, + LogicalScanOperator refScanOperator) { + BaseCompensation baseCompensation = mvUpdateInfo.getRefBaseTableCompensation(refBaseTable, + Optional.of(refScanOperator)); + if (baseCompensation == null) { + logMVRewrite(mvContext, "MV's ref base table {} to refresh partition is null, unknown state", + refBaseTable.getName()); return null; } - return partitionKeys; + return baseCompensation; } -} +} \ No newline at end of file diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/compensation/OlapTableCompensation.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/compensation/OlapTableCompensation.java index ae5337fdaf29a3..809e9c1d29e38a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/compensation/OlapTableCompensation.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/compensation/OlapTableCompensation.java @@ -14,17 +14,32 @@ package com.starrocks.sql.optimizer.rule.transformation.materialization.compensation; +import com.starrocks.common.Config; +import com.starrocks.sql.optimizer.rule.transformation.materialization.MVTransparentState; +import com.starrocks.sql.optimizer.rule.transformation.materialization.MvUtils; + import java.util.List; -public class OlapTableCompensation extends BaseCompensation { +public final class OlapTableCompensation extends BaseCompensation { + private final List compensations; + public OlapTableCompensation(List partitionIds) { - super(partitionIds); + super(MVTransparentState.COMPENSATE); + this.compensations = partitionIds; + } + + public List getCompensations() { + return compensations; } @Override public String toString() { - return "OlapTableCompensation{" + - "partitionIds=" + getCompensations() + - '}'; + if (compensations == null) { + return ""; + } + StringBuilder sb = new StringBuilder(); + sb.append("size=").append(compensations.size()).append(", "); + sb.append(MvUtils.shrinkToSize(compensations, Config.max_mv_task_run_meta_message_values_length)); + return sb.toString(); } -} +} \ No newline at end of file diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/compensation/OptCompensator.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/compensation/OptCompensator.java index 2d3dd32845cc7a..3c5df9f21ec08a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/compensation/OptCompensator.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/compensation/OptCompensator.java @@ -16,51 +16,25 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import com.starrocks.analysis.BinaryType; -import com.starrocks.analysis.Expr; -import com.starrocks.analysis.LiteralExpr; -import com.starrocks.analysis.SlotRef; -import com.starrocks.analysis.TableName; import com.starrocks.catalog.Column; -import com.starrocks.catalog.IcebergTable; import com.starrocks.catalog.MaterializedView; -import com.starrocks.catalog.PartitionInfo; -import com.starrocks.catalog.PartitionKey; import com.starrocks.catalog.Table; -import com.starrocks.connector.TableVersionRange; -import com.starrocks.qe.ConnectContext; -import com.starrocks.server.GlobalStateMgr; -import com.starrocks.sql.analyzer.AnalyzeState; -import com.starrocks.sql.analyzer.ExpressionAnalyzer; -import com.starrocks.sql.analyzer.Field; -import com.starrocks.sql.analyzer.RelationFields; -import com.starrocks.sql.analyzer.RelationId; -import com.starrocks.sql.analyzer.Scope; import com.starrocks.sql.optimizer.OptExpression; import com.starrocks.sql.optimizer.OptExpressionVisitor; import com.starrocks.sql.optimizer.OptimizerContext; import com.starrocks.sql.optimizer.Utils; import com.starrocks.sql.optimizer.operator.OperatorBuilderFactory; -import com.starrocks.sql.optimizer.operator.OperatorType; import com.starrocks.sql.optimizer.operator.logical.LogicalOlapScanOperator; import com.starrocks.sql.optimizer.operator.logical.LogicalScanOperator; -import com.starrocks.sql.optimizer.operator.scalar.BinaryPredicateOperator; import com.starrocks.sql.optimizer.operator.scalar.ColumnRefOperator; -import com.starrocks.sql.optimizer.operator.scalar.ConstantOperator; import com.starrocks.sql.optimizer.operator.scalar.ScalarOperator; -import com.starrocks.sql.optimizer.transformer.ExpressionMapping; -import com.starrocks.sql.optimizer.transformer.SqlToScalarOperatorTranslator; -import org.apache.iceberg.Snapshot; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.stream.Collectors; -import static com.starrocks.connector.iceberg.IcebergPartitionUtils.getIcebergTablePartitionPredicateExpr; import static com.starrocks.sql.optimizer.operator.OpRuleBit.OP_PARTITION_PRUNED; import static com.starrocks.sql.optimizer.rule.transformation.materialization.MvPartitionCompensator.SUPPORTED_PARTITION_COMPENSATE_EXTERNAL_SCAN_TYPES; -import static com.starrocks.sql.optimizer.rule.transformation.materialization.MvUtils.convertPartitionKeysToListPredicate; /** * Compensate the scan operator with the partition compensation. @@ -68,63 +42,62 @@ public class OptCompensator extends OptExpressionVisitor { private final OptimizerContext optimizerContext; private final MaterializedView mv; - private final Map> compensations; + private final MVCompensation mvCompensation; // for olap table public OptCompensator(OptimizerContext optimizerContext, MaterializedView mv, - Map> compensations) { + MVCompensation mvCompensation) { this.optimizerContext = optimizerContext; this.mv = mv; - this.compensations = compensations; + this.mvCompensation = mvCompensation; } @Override public OptExpression visitLogicalTableScan(OptExpression optExpression, Void context) { LogicalScanOperator scanOperator = optExpression.getOp().cast(); Table refBaseTable = scanOperator.getTable(); - - if (refBaseTable.isNativeTableOrMaterializedView()) { - List olapTableCompensatePartitionIds = Lists.newArrayList(); - if (compensations.containsKey(refBaseTable)) { - BaseCompensation compensation = compensations.get(refBaseTable); - BaseCompensation olapTableCompensation = (BaseCompensation) compensation; - olapTableCompensatePartitionIds = olapTableCompensation.getCompensations(); + if (mvCompensation.isTableNeedCompensate(refBaseTable)) { + BaseCompensation compensation = mvCompensation.getTableCompensation(refBaseTable); + if (compensation.state.isNoRewrite()) { + return optExpression; } - LogicalOlapScanOperator olapScanOperator = (LogicalOlapScanOperator) scanOperator; - LogicalScanOperator newScanOperator = getOlapTableCompensatePlan(olapScanOperator, olapTableCompensatePartitionIds); - // reset the partition prune flag to be pruned again. - newScanOperator.resetOpRuleBit(OP_PARTITION_PRUNED); - return OptExpression.create(newScanOperator); - } else if (SUPPORTED_PARTITION_COMPENSATE_EXTERNAL_SCAN_TYPES.contains(scanOperator.getOpType())) { - List partitionKeys = Lists.newArrayList(); - if (compensations.containsKey(refBaseTable)) { - BaseCompensation compensation = compensations.get(refBaseTable); - BaseCompensation externalTableCompensation = (BaseCompensation) compensation; - partitionKeys = externalTableCompensation.getCompensations(); + if (refBaseTable.isNativeTableOrMaterializedView()) { + Preconditions.checkArgument(compensation instanceof OlapTableCompensation); + OlapTableCompensation olapTableCompensation = (OlapTableCompensation) compensation; + LogicalOlapScanOperator olapScanOperator = (LogicalOlapScanOperator) scanOperator; + LogicalScanOperator newScanOperator = getOlapTableCompensatePlan(olapScanOperator, olapTableCompensation); + // reset the partition prune flag to be pruned again. + newScanOperator.resetOpRuleBit(OP_PARTITION_PRUNED); + return OptExpression.create(newScanOperator); + } else if (SUPPORTED_PARTITION_COMPENSATE_EXTERNAL_SCAN_TYPES.contains(scanOperator.getOpType())) { + Preconditions.checkArgument(compensation instanceof ExternalTableCompensationV1); + ExternalTableCompensationV1 externalTableCompensation = (ExternalTableCompensationV1) compensation; + LogicalScanOperator newScanOperator = getExternalTableCompensatePlan(scanOperator, externalTableCompensation); + // reset the partition prune flag to be pruned again. + newScanOperator.resetOpRuleBit(OP_PARTITION_PRUNED); + return OptExpression.create(newScanOperator); + } else { + return optExpression; } - LogicalScanOperator newScanOperator = getExternalTableCompensatePlan(scanOperator, partitionKeys); - // reset the partition prune flag to be pruned again. - newScanOperator.resetOpRuleBit(OP_PARTITION_PRUNED); - return OptExpression.create(newScanOperator); } else { return optExpression; } } private LogicalScanOperator getOlapTableCompensatePlan(LogicalOlapScanOperator scanOperator, - List olapTableCompensatePartitionIds) { + OlapTableCompensation olapTableCompensation) { final LogicalOlapScanOperator.Builder builder = new LogicalOlapScanOperator.Builder(); - Preconditions.checkState(olapTableCompensatePartitionIds != null); + Preconditions.checkState(olapTableCompensation.getCompensations() != null); // reset original partition predicates to prune partitions/tablets again builder.withOperator(scanOperator) - .setSelectedPartitionId(olapTableCompensatePartitionIds) + .setSelectedPartitionId(olapTableCompensation.getCompensations()) .setSelectedTabletId(Lists.newArrayList()); return builder.build(); } private LogicalScanOperator getExternalTableCompensatePlan(LogicalScanOperator scanOperator, - List partitionKeys) { + ExternalTableCompensationV1 externalTableCompensation) { Table refBaseTable = scanOperator.getTable(); final LogicalScanOperator.Builder builder = OperatorBuilderFactory.build(scanOperator); @@ -139,90 +112,12 @@ private LogicalScanOperator getExternalTableCompensatePlan(LogicalScanOperator s } List refBaseTablePartitionCols = refBaseTablePartitionColumns.get(refBaseTable); Preconditions.checkState(refBaseTablePartitionCols != null); - ScalarOperator externalExtraPredicate = null; - if (scanOperator.getOpType() == OperatorType.LOGICAL_ICEBERG_SCAN) { - // refresh iceberg table's metadata - IcebergTable cachedIcebergTable = (IcebergTable) refBaseTable; - String catalogName = cachedIcebergTable.getCatalogName(); - String dbName = cachedIcebergTable.getCatalogDBName(); - TableName refTableName = new TableName(catalogName, dbName, cachedIcebergTable.getName()); - Table currentTable = GlobalStateMgr.getCurrentState().getMetadataMgr().getTable(refTableName).orElse(null); - if (currentTable == null) { - return null; - } - - builder.setTable(currentTable); - TableVersionRange versionRange = TableVersionRange.withEnd( - Optional.ofNullable(((IcebergTable) currentTable).getNativeTable().currentSnapshot()) - .map(Snapshot::snapshotId)); - builder.setTableVersionRange(versionRange); - PartitionInfo mvPartitionInfo = mv.getPartitionInfo(); - if (mvPartitionInfo.isListPartition()) { - List mvPartitionCols = mv.getPartitionColumns(); - // to iceberg, `partitionKeys` are using LocalTime as partition values which cannot be used to prune iceberg - // partitions directly because iceberg uses UTC time in its partition metadata. - // convert `partitionKeys` to iceberg utc time here. - // Please see MVPCTRefreshListPartitioner#genPartitionPredicate for more details. - List refPartitionColRefs = refBaseTablePartitionCols - .stream() - .map(col -> scanOperator.getColumnReference(col)) - .collect(Collectors.toList()); - Map> refBaseTablePartitionSlotRefs = mv.getRefBaseTablePartitionSlots(); - Preconditions.checkArgument(refBaseTablePartitionSlotRefs.containsKey(currentTable)); - List refBaseTableSlotRefs = refBaseTablePartitionSlotRefs.get(currentTable); - - ExpressionMapping expressionMapping = - new ExpressionMapping(new Scope(RelationId.anonymous(), new RelationFields()), - Lists.newArrayList()); - for (int i = 0; i < refPartitionColRefs.size(); i++) { - ColumnRefOperator refPartitionColRef = refPartitionColRefs.get(i); - SlotRef refBaseTablePartitionExpr = refBaseTableSlotRefs.get(i); - expressionMapping.put(refBaseTablePartitionExpr, refPartitionColRef); - } - AnalyzeState analyzeState = new AnalyzeState(); - Scope scope = new Scope(RelationId.anonymous(), new RelationFields( - refBaseTable.getBaseSchema().stream() - .map(col -> new Field(col.getName(), - col.getType(), refTableName, null)) - .collect(Collectors.toList()))); - List externalPredicates = Lists.newArrayList(); - for (PartitionKey partitionKey : partitionKeys) { - List literalExprs = partitionKey.getKeys(); - Preconditions.checkState(literalExprs.size() == refBaseTablePartitionCols.size()); - List predicates = Lists.newArrayList(); - for (int i = 0; i < literalExprs.size(); i++) { - Column mvColumn = mvPartitionCols.get(i); - LiteralExpr literalExpr = literalExprs.get(i); - Column refColumn = refBaseTablePartitionCols.get(i); - ColumnRefOperator refPartitionColRef = refPartitionColRefs.get(i); - ConstantOperator expectPartitionVal = - (ConstantOperator) SqlToScalarOperatorTranslator.translate(literalExpr); - if (!mvColumn.isGeneratedColumn()) { - ScalarOperator eq = new BinaryPredicateOperator(BinaryType.EQ, refPartitionColRef, - expectPartitionVal); - predicates.add(eq); - } else { - SlotRef refBaseTablePartitionExpr = refBaseTableSlotRefs.get(i); - Expr predicateExpr = getIcebergTablePartitionPredicateExpr((IcebergTable) currentTable, - refColumn.getName(), refBaseTablePartitionExpr, literalExpr); - ExpressionAnalyzer.analyzeExpression(predicateExpr, analyzeState, scope, ConnectContext.get()); - ScalarOperator predicate = SqlToScalarOperatorTranslator.translate(predicateExpr, expressionMapping, - optimizerContext.getColumnRefFactory()); - predicates.add(predicate); - } - } - externalPredicates.add(Utils.compoundAnd(predicates)); - } - externalExtraPredicate = Utils.compoundOr(externalPredicates); - } - } - if (externalExtraPredicate == null) { - List refPartitionColRefs = refBaseTablePartitionCols - .stream() - .map(col -> scanOperator.getColumnReference(col)) - .collect(Collectors.toList()); - externalExtraPredicate = convertPartitionKeysToListPredicate(refPartitionColRefs, partitionKeys); - } + List refPartitionColRefs = refBaseTablePartitionCols + .stream() + .map(col -> scanOperator.getColumnReference(col)) + .collect(Collectors.toList()); + ScalarOperator externalExtraPredicate = externalTableCompensation.getCompensateExpr(optimizerContext, mv, + refBaseTable, refPartitionColRefs); Preconditions.checkState(externalExtraPredicate != null); externalExtraPredicate.setRedundant(true); ScalarOperator finalPredicate = Utils.compoundAnd(scanOperator.getPredicate(), externalExtraPredicate); @@ -243,15 +138,15 @@ public OptExpression visit(OptExpression optExpression, Void context) { /** * Get the compensation plan for the mv. * @param mv the mv to compensate - * @param compensations the compensations for the mv, including ref base table's compensations + * @param mvCompensation the compensations for the mv, including ref base table's compensations * @param optExpression query plan with the ref base table * @return the compensation plan for the mv */ public static OptExpression getMVCompensatePlan(OptimizerContext optimizerContext, MaterializedView mv, - Map> compensations, + MVCompensation mvCompensation, OptExpression optExpression) { - OptCompensator scanOperatorCompensator = new OptCompensator(optimizerContext, mv, compensations); + OptCompensator scanOperatorCompensator = new OptCompensator(optimizerContext, mv, mvCompensation); return optExpression.getOp().accept(scanOperatorCompensator, optExpression, null); } } \ No newline at end of file diff --git a/fe/fe-core/src/test/java/com/starrocks/utframe/UtFrameUtils.java b/fe/fe-core/src/test/java/com/starrocks/utframe/UtFrameUtils.java index a9065be89cd2ba..7293748319dd22 100644 --- a/fe/fe-core/src/test/java/com/starrocks/utframe/UtFrameUtils.java +++ b/fe/fe-core/src/test/java/com/starrocks/utframe/UtFrameUtils.java @@ -1251,7 +1251,7 @@ public static void mockTimelinessForAsyncMVTest(ConnectContext connectContext) { @Mock public MvUpdateInfo getMVTimelinessUpdateInfo(MaterializedView mv, boolean isQueryRewrite) { - return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.NO_REFRESH); + return MvUpdateInfo.noRefresh(mv); } };