Skip to content

Commit

Permalink
Support partition retention condition compensation in force_mv rewrit…
Browse files Browse the repository at this point in the history
…e mode

Signed-off-by: shuming.li <[email protected]>
  • Loading branch information
LiShuMing committed Jan 2, 2025
1 parent e494f6e commit 48381e0
Show file tree
Hide file tree
Showing 12 changed files with 422 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,8 @@ public Void visitModifyTablePropertiesClause(ModifyTablePropertiesClause modifyT
!ttlRetentionCondition.equalsIgnoreCase(materializedView.getTableProperty().getPartitionRetentionCondition())) {
curProp.put(PropertyAnalyzer.PROPERTIES_PARTITION_RETENTION_CONDITION, ttlRetentionCondition);
materializedView.getTableProperty().setPartitionRetentionCondition(ttlRetentionCondition);
// re-analyze mv retention condition
materializedView.analyzeMVRetentionCondition(context);
isChanged = true;
} else if (propClone.containsKey(PropertyAnalyzer.PROPERTIES_PARTITION_REFRESH_NUMBER) &&
materializedView.getTableProperty().getPartitionRefreshNumber() != partitionRefreshNumber) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import com.starrocks.sql.analyzer.AnalyzeState;
import com.starrocks.sql.analyzer.ExpressionAnalyzer;
import com.starrocks.sql.analyzer.Field;
import com.starrocks.sql.analyzer.MaterializedViewAnalyzer;
import com.starrocks.sql.analyzer.RelationFields;
import com.starrocks.sql.analyzer.RelationId;
import com.starrocks.sql.analyzer.Scope;
Expand All @@ -77,6 +78,7 @@
import com.starrocks.sql.optimizer.CachingMvPlanContextBuilder;
import com.starrocks.sql.optimizer.MvRewritePreprocessor;
import com.starrocks.sql.optimizer.Utils;
import com.starrocks.sql.optimizer.operator.scalar.ScalarOperator;
import com.starrocks.sql.optimizer.rule.mv.MVUtils;
import com.starrocks.sql.optimizer.rule.transformation.materialization.MvUtils;
import com.starrocks.sql.parser.SqlParser;
Expand Down Expand Up @@ -504,6 +506,9 @@ public String toString() {
// cache table to base table info's mapping to refresh table, Iceberg/Delta table needs to refresh table's snapshots
// to fetch the newest table info.
private transient volatile Map<Table, BaseTableInfo> tableToBaseTableInfoCache = Maps.newConcurrentMap();
// mv's partition retention expr
private transient volatile Optional<Expr> retentionConditionExprOpt = Optional.empty();
private transient volatile Optional<ScalarOperator> retentionConditionScalarOpOpt = Optional.empty();

// Materialized view's output columns may be different from defined query's output columns.
// Record the indexes based on materialized view's column output.
Expand Down Expand Up @@ -1144,7 +1149,6 @@ private void analyzePartitionInfo() {
}
}
}

TableName tableName =
new TableName(InternalCatalog.DEFAULT_INTERNAL_CATALOG_NAME, db.getFullName(), this.name);
ExpressionAnalyzer.analyzeExpression(partitionExpr, new AnalyzeState(),
Expand Down Expand Up @@ -1490,6 +1494,14 @@ public boolean containsBaseTable(TableName tableName) {
return false;
}

public Expr getRetentionConditionExpr() {
return retentionConditionExprOpt.orElse(null);
}

public Optional<ScalarOperator> getRetentionConditionScalarOp() {
return retentionConditionScalarOpOpt;
}

/**
* NOTE: The ref-base-table partition expressions' order is guaranteed as the order of mv's defined partition columns' order.
* @return table to the partition expr map, multi values if mv contains multi ref base tables, empty if it's un-partitioned
Expand Down Expand Up @@ -1815,11 +1827,20 @@ private void analyzePartitionExprs() {
analyzeRefBaseTablePartitionSlots();
// analyze partition columns for ref base tables
analyzeRefBaseTablePartitionColumns();
// analyze partition retention condition
analyzeMVRetentionCondition(connectContext);
} catch (Exception e) {
LOG.warn("Analyze partition exprs failed", e);
}
}

public synchronized void analyzeMVRetentionCondition(ConnectContext connectContext) {
this.retentionConditionExprOpt =
MaterializedViewAnalyzer.analyzeMVRetentionCondition(connectContext, this);
this.retentionConditionScalarOpOpt = MaterializedViewAnalyzer.analyzeMVRetentionConditionOperator(
connectContext, this, this.retentionConditionExprOpt);
}

/**
* Since the table is cached in the Optional, needs to refresh it again for each query.
*/
Expand All @@ -1829,6 +1850,7 @@ private <K> Map<Table, K> refreshBaseTable(Map<Table, K> cached) {
Table table = e.getKey();
if (table instanceof IcebergTable || table instanceof DeltaLakeTable) {
Preconditions.checkState(tableToBaseTableInfoCache.containsKey(table));
// TODO: get table from current context rather than metadata catalog
// it's fine to re-get table from metadata catalog again since metadata catalog should cache
// the newest table info.
Table refreshedTable = MvUtils.getTableChecked(tableToBaseTableInfoCache.get(table));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -69,6 +70,7 @@
import com.starrocks.connector.iceberg.IcebergPartitionTransform;
import com.starrocks.mv.analyzer.MVPartitionSlotRefResolver;
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.SqlModeHelper;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.RunMode;
import com.starrocks.sql.ast.AlterMaterializedViewStmt;
Expand Down Expand Up @@ -98,11 +100,15 @@
import com.starrocks.sql.optimizer.base.PhysicalPropertySet;
import com.starrocks.sql.optimizer.operator.scalar.ColumnRefOperator;
import com.starrocks.sql.optimizer.operator.scalar.ScalarOperator;
import com.starrocks.sql.optimizer.rewrite.ScalarOperatorRewriter;
import com.starrocks.sql.optimizer.rule.transformation.materialization.MvUtils;
import com.starrocks.sql.optimizer.transformer.ExpressionMapping;
import com.starrocks.sql.optimizer.transformer.LogicalPlan;
import com.starrocks.sql.optimizer.transformer.OptExprBuilder;
import com.starrocks.sql.optimizer.transformer.RelationTransformer;
import com.starrocks.sql.optimizer.transformer.SqlToScalarOperatorTranslator;
import com.starrocks.sql.parser.ParsingException;
import com.starrocks.sql.parser.SqlParser;
import com.starrocks.sql.plan.ExecPlan;
import com.starrocks.sql.plan.PlanFragmentBuilder;
import org.apache.commons.collections.map.CaseInsensitiveMap;
Expand All @@ -112,7 +118,6 @@
import org.apache.iceberg.types.Types;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Strings;
import org.jetbrains.annotations.NotNull;

import java.time.LocalDateTime;
Expand All @@ -133,6 +138,7 @@

import static com.starrocks.server.CatalogMgr.ResourceMappingCatalog.isResourceMappingCatalog;
import static com.starrocks.server.CatalogMgr.isInternalCatalog;
import static com.starrocks.sql.optimizer.rewrite.ScalarOperatorRewriter.DEFAULT_TYPE_CAST_RULE;

public class MaterializedViewAnalyzer {
private static final Logger LOG = LogManager.getLogger(MaterializedViewAnalyzer.class);
Expand Down Expand Up @@ -209,7 +215,7 @@ private static boolean isExternalTableFromResource(Table table) {
return false;
}
String catalog = table.getCatalogName();
return Strings.isBlank(catalog) || isResourceMappingCatalog(catalog);
return Strings.isNullOrEmpty(catalog) || isResourceMappingCatalog(catalog);
}

private static void processViews(QueryStatement queryStatement, Set<BaseTableInfo> baseTableInfos,
Expand Down Expand Up @@ -263,18 +269,18 @@ public Void visitCreateMaterializedViewStatement(CreateMaterializedViewStatement
* Materialized view name is a little bit different from a normal table
* 1. Use default catalog if not specified, actually it only support default catalog until now
*/
if (com.google.common.base.Strings.isNullOrEmpty(tableNameObject.getCatalog())) {
if (Strings.isNullOrEmpty(tableNameObject.getCatalog())) {
tableNameObject.setCatalog(InternalCatalog.DEFAULT_INTERNAL_CATALOG_NAME);
}
if (com.google.common.base.Strings.isNullOrEmpty(tableNameObject.getDb())) {
if (com.google.common.base.Strings.isNullOrEmpty(context.getDatabase())) {
if (Strings.isNullOrEmpty(tableNameObject.getDb())) {
if (Strings.isNullOrEmpty(context.getDatabase())) {
throw new SemanticException("No database selected. " +
"You could set the database name through `<database>.<table>` or `use <database>` statement");
}
tableNameObject.setDb(context.getDatabase());
}

if (com.google.common.base.Strings.isNullOrEmpty(tableNameObject.getTbl())) {
if (Strings.isNullOrEmpty(tableNameObject.getTbl())) {
throw new SemanticException("Table name cannot be empty");
}

Expand Down Expand Up @@ -581,7 +587,7 @@ private List<Pair<Column, Integer>> genMaterializedViewColumns(CreateMaterialize
for (int i = 0; i < mvColumns.size(); i++) {
Column col = mvColumns.get(i);
if (columnMap.putIfAbsent(col.getName(), Pair.create(col, i)) != null) {
throw new SemanticException("Duplicate column name " + Strings.quote(col.getName()));
throw new SemanticException("Duplicate column name `" + col.getName() + "`");
}
}

Expand Down Expand Up @@ -1844,4 +1850,78 @@ public static Expr getAdjustedOlapTablePartitionExpr(Map<Integer, Expr> changedP
}
return partitionByExpr;
}

public static Optional<Expr> analyzeMVRetentionCondition(ConnectContext connectContext,
MaterializedView mv) {
PartitionInfo partitionInfo = mv.getPartitionInfo();
if (partitionInfo.isUnPartitioned()) {
return Optional.empty();
}
String retentionCondition = mv.getTableProperty().getPartitionRetentionCondition();
if (Strings.isNullOrEmpty(retentionCondition)) {
return Optional.empty();
}

Expr retentionCondtiionExpr = null;
try {
retentionCondtiionExpr = SqlParser.parseSqlToExpr(retentionCondition, SqlModeHelper.MODE_DEFAULT);
} catch (Exception e) {
throw new SemanticException("Failed to parse retention condition: " + retentionCondition);
}
if (retentionCondtiionExpr == null) {
return Optional.empty();
}
final Map<Table, List<Column>> refBaseTablePartitionColumns = mv.getRefBaseTablePartitionColumns();
if (refBaseTablePartitionColumns == null || refBaseTablePartitionColumns.size() != 1) {
return Optional.empty();
}
Table refBaseTable = refBaseTablePartitionColumns.keySet().iterator().next();
Scope scope = new Scope(RelationId.anonymous(), new RelationFields(
refBaseTable.getBaseSchema().stream()
.map(col -> new Field(col.getName(), col.getType(), null, null))
.collect(Collectors.toList())));
ExpressionAnalyzer.analyzeExpression(retentionCondtiionExpr, new AnalyzeState(), scope, connectContext);
Map<Expr, Expr> partitionByExprMap = getMVPartitionByExprToAdjustMap(null, mv);
retentionCondtiionExpr = MaterializedViewAnalyzer.adjustWhereExprIfNeeded(partitionByExprMap, retentionCondtiionExpr,
scope, connectContext);
return Optional.of(retentionCondtiionExpr);
}

public static Optional<ScalarOperator> analyzeMVRetentionConditionOperator(ConnectContext connectContext,
MaterializedView mv,
Optional<Expr> exprOpt) {
if (exprOpt.isEmpty()) {
return Optional.empty();
}

Expr retentionCondtiionExpr = exprOpt.get();
final Map<Table, List<Column>> refBaseTablePartitionColumns = mv.getRefBaseTablePartitionColumns();
if (refBaseTablePartitionColumns == null || refBaseTablePartitionColumns.size() != 1) {
return Optional.empty();
}
Table refBaseTable = refBaseTablePartitionColumns.keySet().iterator().next();
ColumnRefFactory columnRefFactory = new ColumnRefFactory();
List<ColumnRefOperator> columnRefOperators = refBaseTable.getColumns()
.stream()
.map(col -> columnRefFactory.create(col.getName(), col.getType(), col.isAllowNull()))
.collect(Collectors.toList());
TableName tableName = new TableName(null, refBaseTable.getName());
Scope scope = new Scope(RelationId.anonymous(), new RelationFields(
columnRefOperators.stream()
.map(col -> new Field(col.getName(), col.getType(), tableName, null))
.collect(Collectors.toList())));
ExpressionMapping expressionMapping = new ExpressionMapping(scope, columnRefOperators);
// substitute generated column expr if whereExpr is a mv which contains iceberg transform expr.
// translate whereExpr to scalarOperator and replace whereExpr's generatedColumnExpr to partition slotRef.
ScalarOperator scalarOperator =
SqlToScalarOperatorTranslator.translate(retentionCondtiionExpr, expressionMapping, Lists.newArrayList(),
columnRefFactory, connectContext, null,
null, null, false);
if (scalarOperator == null) {
return Optional.empty();
}
ScalarOperatorRewriter scalarOpRewriter = new ScalarOperatorRewriter();
scalarOperator = scalarOpRewriter.rewrite(scalarOperator, DEFAULT_TYPE_CAST_RULE);
return Optional.of(scalarOperator);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,10 @@ private List<MvPlanContext> getMvPlanCacheFromFuture(SessionVariable sessionVari
LOG.warn("get mv plan cache failed: {}", mv.getName(), e);
return null;
}
LOG.info("Get mv plan cache success: {}, cost: {}ms", mv.getName(), System.currentTimeMillis() - startTime);
if (LOG.isDebugEnabled()) {
LOG.debug("Get mv plan cache success: {}, cost: {}ms", mv.getName(),
System.currentTimeMillis() - startTime);
}
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import com.starrocks.sql.optimizer.rule.transformation.SkewJoinOptimizeRule;
import com.starrocks.sql.optimizer.rule.transformation.SplitScanORToUnionRule;
import com.starrocks.sql.optimizer.rule.transformation.UnionToValuesRule;
import com.starrocks.sql.optimizer.rule.transformation.materialization.MVCompensationPruneUnionRule;
import com.starrocks.sql.optimizer.rule.transformation.materialization.MvRewriteStrategy;
import com.starrocks.sql.optimizer.rule.transformation.materialization.MvUtils;
import com.starrocks.sql.optimizer.rule.transformation.materialization.rule.TextMatchBasedRewriteRule;
Expand Down Expand Up @@ -460,6 +461,7 @@ private void ruleBasedMaterializedViewRewrite(OptExpression tree,
// It's necessary for external table since its predicate is not used directly after push down.
ruleRewriteIterative(tree, rootTaskContext, RuleSetType.PARTITION_PRUNE);
ruleRewriteIterative(tree, rootTaskContext, RuleSetType.PRUNE_EMPTY_OPERATOR);
ruleRewriteIterative(tree, rootTaskContext, new MVCompensationPruneUnionRule());
ruleRewriteIterative(tree, rootTaskContext, new MergeTwoProjectRule());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,9 @@ public ScalarOperator evaluation(CallOperator root, boolean needMonotonic) {
}
return operator;
} catch (Exception e) {
LOG.debug("failed to invoke", e);
if (LOG.isDebugEnabled()) {
LOG.debug("failed to invoke", e);
}
if (invoker.isMetaFunction) {
throw new StarRocksPlannerException(ErrorType.USER_ERROR, ExceptionUtils.getRootCauseMessage(e));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ public enum RuleType {
TF_MV_TRANSPARENT_REWRITE_RULE,
TF_MV_AGGREGATE_JOIN_PUSH_DOWN_RULE,
TF_MV_AGGREGATE_TIME_SERIES_SCAN_RULE,
TF_MV_COMPENSATION_PRUNE_UNION,

TF_GROUP_BY_COUNT_DISTINCT_DATA_SKEW_ELIMINATE_RULE,

Expand Down
Loading

0 comments on commit 48381e0

Please sign in to comment.