Skip to content

Commit

Permalink
Refactor mv partition compensate codes
Browse files Browse the repository at this point in the history
Signed-off-by: shuming.li <[email protected]>
  • Loading branch information
LiShuMing committed Dec 19, 2024
1 parent 03803b2 commit 1751598
Show file tree
Hide file tree
Showing 22 changed files with 673 additions and 599 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class MvBaseTableUpdateInfo {
// The partition names of base table that have been updated
private final Set<String> toRefreshPartitionNames = Sets.newHashSet();
// The mapping of partition name to partition range
private final Map<String, PCell> partitonToCells = Maps.newHashMap();
private final Map<String, PCell> partitionToCells = Maps.newHashMap();

// If the base table is a mv, needs to record the mapping of mv partition name to partition range
private final Map<String, PCell> mvPartitionNameToCellMap = Maps.newHashMap();
Expand All @@ -52,8 +52,8 @@ public Set<String> getToRefreshPartitionNames() {
return toRefreshPartitionNames;
}

public Map<String, PCell> getPartitonToCells() {
return partitonToCells;
public Map<String, PCell> getPartitionToCells() {
return partitionToCells;
}

/**
Expand All @@ -71,22 +71,22 @@ public void addToRefreshPartitionNames(Set<String> toRefreshPartitionNames) {
*/
public void addRangePartitionKeys(String partitionName,
Range<PartitionKey> rangePartitionKey) {
partitonToCells.put(partitionName, new PRangeCell(rangePartitionKey));
partitionToCells.put(partitionName, new PRangeCell(rangePartitionKey));
}

/**
* Add partition name that needs to be refreshed and its associated list partition key
*/
public void addPartitionCells(Map<String, PCell> cells) {
partitonToCells.putAll(cells);
partitionToCells.putAll(cells);
}

/**
* Get the partition name with its associated range partition key when the mv is range partitioned.
*/
public Map<String, Range<PartitionKey>> getPartitionNameWithRanges() {
Map<String, Range<PartitionKey>> result = Maps.newHashMap();
for (Map.Entry<String, PCell> e : partitonToCells.entrySet()) {
for (Map.Entry<String, PCell> e : partitionToCells.entrySet()) {
Preconditions.checkState(e.getValue() instanceof PRangeCell);
PRangeCell rangeCell = (PRangeCell) e.getValue();
result.put(e.getKey(), rangeCell.getRange());
Expand All @@ -99,7 +99,7 @@ public Map<String, Range<PartitionKey>> getPartitionNameWithRanges() {
*/
public Map<String, PListCell> getPartitionNameWithLists() {
Map<String, PListCell> result = Maps.newHashMap();
for (Map.Entry<String, PCell> e : partitonToCells.entrySet()) {
for (Map.Entry<String, PCell> e : partitionToCells.entrySet()) {
Preconditions.checkState(e.getValue() instanceof PListCell);
PListCell listCell = (PListCell) e.getValue();
result.put(e.getKey(), listCell);
Expand All @@ -111,7 +111,7 @@ public Map<String, PListCell> getPartitionNameWithLists() {
public String toString() {
return "BaseTableRefreshInfo{" +
", toRefreshPartitionNames=" + toRefreshPartitionNames +
", nameToPartKeys=" + partitonToCells +
", nameToPartKeys=" + partitionToCells +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public static boolean needsToRefreshTable(MaterializedView mv, Table table, bool
public static MvUpdateInfo getMVTimelinessUpdateInfo(MaterializedView mv, boolean isQueryRewrite) {
// Skip check for sync materialized view.
if (mv.getRefreshScheme().isSync()) {
return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.NO_REFRESH);
return MvUpdateInfo.noRefresh(mv);
}

// check mv's query rewrite consistency mode property only in query rewrite.
Expand All @@ -72,9 +72,9 @@ public static MvUpdateInfo getMVTimelinessUpdateInfo(MaterializedView mv, boolea
if (isQueryRewrite) {
switch (mvConsistencyRewriteMode) {
case DISABLE:
return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.FULL);
return MvUpdateInfo.fullRefresh(mv);
case NOCHECK:
return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.NO_REFRESH);
return MvUpdateInfo.noRefresh(mv);
case LOOSE:
case CHECKED:
default:
Expand All @@ -89,7 +89,7 @@ public static MvUpdateInfo getMVTimelinessUpdateInfo(MaterializedView mv, boolea
return timelinessArbiter.getMVTimelinessUpdateInfo(mvConsistencyRewriteMode);
} catch (AnalysisException e) {
logMVPrepare(mv, "Failed to get mv timeliness info: {}", DebugUtil.getStackTrace(e));
return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.UNKNOWN);
return MvUpdateInfo.unknown(mv);
}
}

Expand Down
211 changes: 195 additions & 16 deletions fe/fe-core/src/main/java/com/starrocks/catalog/MvUpdateInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,41 @@

package com.starrocks.catalog;

import com.google.api.client.util.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.starrocks.analysis.Expr;
import com.starrocks.common.Config;
import com.starrocks.common.util.DebugUtil;
import com.starrocks.connector.PartitionUtil;
import com.starrocks.sql.common.PCell;
import com.starrocks.sql.common.PListCell;
import com.starrocks.sql.common.PRangeCell;
import com.starrocks.sql.optimizer.operator.ScanOperatorPredicates;
import com.starrocks.sql.optimizer.operator.logical.LogicalOlapScanOperator;
import com.starrocks.sql.optimizer.operator.logical.LogicalScanOperator;
import com.starrocks.sql.optimizer.rule.transformation.materialization.MvPartitionCompensator;
import com.starrocks.sql.optimizer.rule.transformation.materialization.compensation.BaseCompensation;
import com.starrocks.sql.optimizer.rule.transformation.materialization.compensation.ExternalTableCompensationV1;
import com.starrocks.sql.optimizer.rule.transformation.materialization.compensation.OlapTableCompensation;
import org.apache.commons.collections4.CollectionUtils;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static com.starrocks.sql.optimizer.OptimizerTraceUtil.logMVRewrite;
import static com.starrocks.sql.optimizer.rule.transformation.materialization.MvUtils.shrinkToSize;

/**
* Store the update information of MV used for mv rewrite(mv refresh can use it later).
*/
public class MvUpdateInfo {
private final MaterializedView mv;
// The type of mv refresh later
private final MvToRefreshType mvToRefreshType;
// The partition names of mv to refresh
Expand All @@ -45,6 +65,9 @@ public class MvUpdateInfo {
// If the base table is a mv, needs to record the mapping of mv partition name to partition range
private final Map<String, PCell> mvPartitionNameToCellMap = Maps.newHashMap();

// For force_mv rewrite, compensate partition expr is used to rewrite the query.
private Expr compensatePartitionExpr;

/**
* Marks the type of mv refresh later.
*/
Expand All @@ -55,16 +78,36 @@ public enum MvToRefreshType {
UNKNOWN // unknown type
}

public MvUpdateInfo(MvToRefreshType mvToRefreshType) {
public MvUpdateInfo(MaterializedView mv, MvToRefreshType mvToRefreshType) {
this.mv = mv;
this.mvToRefreshType = mvToRefreshType;
this.queryRewriteConsistencyMode = TableProperty.QueryRewriteConsistencyMode.CHECKED;
}

public MvUpdateInfo(MvToRefreshType mvToRefreshType, TableProperty.QueryRewriteConsistencyMode queryRewriteConsistencyMode) {
public MvUpdateInfo(MaterializedView mv, MvToRefreshType mvToRefreshType,
TableProperty.QueryRewriteConsistencyMode queryRewriteConsistencyMode) {
this.mv = mv;
this.mvToRefreshType = mvToRefreshType;
this.queryRewriteConsistencyMode = queryRewriteConsistencyMode;
}

public static MvUpdateInfo unknown(MaterializedView mv) {
return new MvUpdateInfo(mv, MvToRefreshType.UNKNOWN);
}

public static MvUpdateInfo noRefresh(MaterializedView mv) {
return new MvUpdateInfo(mv, MvToRefreshType.NO_REFRESH);
}

public static MvUpdateInfo fullRefresh(MaterializedView mv) {
return new MvUpdateInfo(mv, MvToRefreshType.FULL);
}

public static MvUpdateInfo partialRefresh(MaterializedView mv,
TableProperty.QueryRewriteConsistencyMode queryRewriteConsistencyMode) {
return new MvUpdateInfo(mv, MvToRefreshType.PARTIAL, queryRewriteConsistencyMode);
}

public MvToRefreshType getMvToRefreshType() {
return mvToRefreshType;
}
Expand Down Expand Up @@ -120,25 +163,12 @@ public String toString() {
'}';
}

/**
* @return the detail string of the mv update info
*/
public String toDetailString() {
return "MvUpdateInfo{" +
"refreshType=" + mvToRefreshType +
", mvToRefreshPartitionNames=" + mvToRefreshPartitionNames +
", baseTableUpdateInfos=" + baseTableUpdateInfos +
", basePartToMvPartNames=" + basePartToMvPartNames +
", mvPartToBasePartNames=" + mvPartToBasePartNames +
'}';
}

/**
* Get the ref base table partition names to refresh for the given mv.
* @param refBaseTable: the input ref base table
* @return: the partition names to refresh of the ref base table.
*/
public Set<String> getBaseTableToRefreshPartitionNames(Table refBaseTable) {
private Set<String> getBaseTableToRefreshPartitionNames(Table refBaseTable) {
if (mvToRefreshPartitionNames.isEmpty() || mvToRefreshType == MvToRefreshType.NO_REFRESH) {
return Sets.newHashSet();
}
Expand Down Expand Up @@ -167,4 +197,153 @@ public Set<String> getBaseTableToRefreshPartitionNames(Table refBaseTable) {
}
return refBaseTableToRefreshPartitionNames;
}

/**
* Get the compensation for the ref base table which has no input query plan but to compensate a consistent mv.
*/
public BaseCompensation getRefBaseTableCompensation(Table refBaseTable,
Optional<LogicalScanOperator> scanOperatorOpt) {
Set<String> toRefreshPartitionNames = getBaseTableToRefreshPartitionNames(refBaseTable);
if (toRefreshPartitionNames == null) {
logMVRewrite(mv.getName(), "MV's ref base table {} to refresh partition is null, unknown state",
refBaseTable.getName());
return null;
}
if (toRefreshPartitionNames.isEmpty()) {
logMVRewrite(mv.getName(), "MV's ref base table {} to refresh partition is empty, no need compensate",
refBaseTable.getName());
return BaseCompensation.noCompensation();
}

if (refBaseTable.isNativeTableOrMaterializedView()) {
return getCompensationForOlap(refBaseTable, scanOperatorOpt, toRefreshPartitionNames);
} else if (MvPartitionCompensator.isTableSupportedPartitionCompensate(refBaseTable)) {
return getCompensationForExternal(refBaseTable, scanOperatorOpt, toRefreshPartitionNames);
} else {
return BaseCompensation.unknown();
}
}

private BaseCompensation getCompensationForOlap(Table refBaseTable,
Optional<LogicalScanOperator> scanOperatorOpt,
Set<String> toRefreshPartitionNames) {
// only retain the partition ids which are selected by the query plan.
if (scanOperatorOpt.isPresent()) {
LogicalOlapScanOperator logicalOlapScanOperator = (LogicalOlapScanOperator) scanOperatorOpt.get();
Set<String> selectedPartitionNames = logicalOlapScanOperator.getSelectedPartitionId()
.stream()
.map(p -> refBaseTable.getPartition(p))
.map(p -> p.getName())
.collect(Collectors.toSet());
// if all selected partitions need to refresh, no need to rewrite.
if (toRefreshPartitionNames.containsAll(selectedPartitionNames)) {
return BaseCompensation.noRewrite();
}
// only retain the selected partitions to refresh.
toRefreshPartitionNames.retainAll(selectedPartitionNames);
}
// if no partition need to refresh, no need to compensate.
if (toRefreshPartitionNames.isEmpty()) {
return BaseCompensation.noCompensation();
}
List<Long> refTablePartitionIdsToRefresh = toRefreshPartitionNames.stream()
.map(name -> refBaseTable.getPartition(name))
.filter(Objects::nonNull)
.map(p -> p.getId())
.collect(Collectors.toList());
return new OlapTableCompensation(refTablePartitionIdsToRefresh);
}

private BaseCompensation getCompensationForExternal(Table refBaseTable,
Optional<LogicalScanOperator> scanOperatorOpt,
Set<String> toRefreshPartitionNames) {
List<PartitionKey> selectPartitionKeys = null;
if (scanOperatorOpt.isPresent()) {
Collection<Long> selectPartitionIds = null;
try {
ScanOperatorPredicates scanOperatorPredicates = scanOperatorOpt.get().getScanOperatorPredicates();
selectPartitionIds = scanOperatorPredicates.getSelectedPartitionIds();
selectPartitionKeys = scanOperatorPredicates.getSelectedPartitionKeys();
} catch (Exception e) {
return null;
}
// For scan operator that supports prune partitions with OptExternalPartitionPruner,
// we could only compensate partitions which selected partitions need to refresh.
if (MvPartitionCompensator.isTableSupportedPartitionCompensate(refBaseTable)) {
if (CollectionUtils.isEmpty(selectPartitionIds)) {
// see OptExternalPartitionPruner#computePartitionInfo:
// it's different meaning when selectPartitionIds is null and empty for hive and other tables
if (refBaseTable instanceof HiveTable) {
return BaseCompensation.noCompensation();
} else {
return BaseCompensation.unknown();
}
}
}
}

if (selectPartitionKeys != null) {
List<Integer> colIndexes = PartitionUtil.getRefBaseTablePartitionColumIndexes(mv, refBaseTable);
if (colIndexes == null) {
logMVRewrite(mv.getName(), "Failed to get partition column indexes for ref base table: {}",
refBaseTable.getName());
return null;
}
Set<PartitionKey> newSelectedPartitionKeys = selectPartitionKeys
.stream()
.map(p -> PartitionUtil.getSelectedPartitionKey(p, colIndexes))
.collect(Collectors.toSet());
Set<String> selectPartitionNames = newSelectedPartitionKeys.stream()
.map(PartitionUtil::generateMVPartitionName)
.collect(Collectors.toSet());
// NOTE: use partition names rather than partition keys since the partition key may be different for null value.
// if all selected partitions need to refresh, no need to rewrite.
if (toRefreshPartitionNames.containsAll(selectPartitionNames)) {
logMVRewrite(mv.getName(), "All external table {}'s selected partitions {} need to refresh, no rewrite",
refBaseTable.getName(), selectPartitionKeys);
return BaseCompensation.noRewrite();
}
// filter the selected partitions to refresh.
toRefreshPartitionNames.retainAll(selectPartitionNames);

// if no partition needs to refresh, no need to compensate.
if (toRefreshPartitionNames.isEmpty()) {
return BaseCompensation.noCompensation();
}
}
List<PartitionKey> toRefreshPartitionKeys =
getPartitionKeysToRefresh(refBaseTable, toRefreshPartitionNames);
return new ExternalTableCompensationV1(toRefreshPartitionKeys);
}

private List<PartitionKey> getPartitionKeysToRefresh(Table refBaseTable, Set<String> refTablePartitionNamesToRefresh) {
// get the partition keys to refresh
MvBaseTableUpdateInfo baseTableUpdateInfo = baseTableUpdateInfos.get(refBaseTable);
if (baseTableUpdateInfo == null) {
return null;
}
// use update info's partition to cells since it's accurate.
Map<String, PCell> nameToPartitionKeys = baseTableUpdateInfo.getPartitionToCells();
List<PartitionKey> toRefreshPartitionKeys = Lists.newArrayList();
try {
for (String partitionName : refTablePartitionNamesToRefresh) {
if (!nameToPartitionKeys.containsKey(partitionName)) {
return null;
}
PCell pCell = nameToPartitionKeys.get(partitionName);
if (pCell instanceof PRangeCell) {
toRefreshPartitionKeys.add(((PRangeCell) pCell).getRange().lowerEndpoint());
} else if (pCell instanceof PListCell) {
List<Column> partitionColumns = refBaseTable.getPartitionColumns();
List<PartitionKey> keys = ((PListCell) pCell).toPartitionKeys(partitionColumns);
toRefreshPartitionKeys.addAll(keys);
}
}
} catch (Exception e) {
logMVRewrite("Failed to get partition keys for ref base table: {}", refBaseTable.getName(),
DebugUtil.getStackTrace(e));
return null;
}
return toRefreshPartitionKeys;
}
}
Loading

0 comments on commit 1751598

Please sign in to comment.