Skip to content

Commit

Permalink
fix bugs
Browse files Browse the repository at this point in the history
Signed-off-by: shuming.li <[email protected]>
  • Loading branch information
LiShuMing committed Dec 26, 2024
1 parent 1eb6d72 commit e9f58a0
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,8 @@ public Void visitModifyTablePropertiesClause(ModifyTablePropertiesClause modifyT
!ttlRetentionCondition.equalsIgnoreCase(materializedView.getTableProperty().getPartitionRetentionCondition())) {
curProp.put(PropertyAnalyzer.PROPERTIES_PARTITION_RETENTION_CONDITION, ttlRetentionCondition);
materializedView.getTableProperty().setPartitionRetentionCondition(ttlRetentionCondition);
// re-analyze mv retention condition
materializedView.analyzeMVRetentionCondition(context);
isChanged = true;
} else if (propClone.containsKey(PropertyAnalyzer.PROPERTIES_PARTITION_REFRESH_NUMBER) &&
materializedView.getTableProperty().getPartitionRefreshNumber() != partitionRefreshNumber) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import com.starrocks.sql.analyzer.AnalyzeState;
import com.starrocks.sql.analyzer.ExpressionAnalyzer;
import com.starrocks.sql.analyzer.Field;
import com.starrocks.sql.analyzer.MaterializedViewAnalyzer;
import com.starrocks.sql.analyzer.RelationFields;
import com.starrocks.sql.analyzer.RelationId;
import com.starrocks.sql.analyzer.Scope;
Expand Down Expand Up @@ -504,6 +505,8 @@ public String toString() {
// cache table to base table info's mapping to refresh table, Iceberg/Delta table needs to refresh table's snapshots
// to fetch the newest table info.
private transient volatile Map<Table, BaseTableInfo> tableToBaseTableInfoCache = Maps.newConcurrentMap();
// mv's partition retention expr
private transient volatile Optional<Expr> retentionConditionExprOpt = Optional.empty();

// Materialized view's output columns may be different from defined query's output columns.
// Record the indexes based on materialized view's column output.
Expand Down Expand Up @@ -1144,7 +1147,6 @@ private void analyzePartitionInfo() {
}
}
}

TableName tableName =
new TableName(InternalCatalog.DEFAULT_INTERNAL_CATALOG_NAME, db.getFullName(), this.name);
ExpressionAnalyzer.analyzeExpression(partitionExpr, new AnalyzeState(),
Expand Down Expand Up @@ -1490,6 +1492,10 @@ public boolean containsBaseTable(TableName tableName) {
return false;
}

public Expr getRetentionConditionExpr() {
return retentionConditionExprOpt.orElse(null);
}

/**
* NOTE: The ref-base-table partition expressions' order is guaranteed as the order of mv's defined partition columns' order.
* @return table to the partition expr map, multi values if mv contains multi ref base tables, empty if it's un-partitioned
Expand Down Expand Up @@ -1815,11 +1821,18 @@ private void analyzePartitionExprs() {
analyzeRefBaseTablePartitionSlots();
// analyze partition columns for ref base tables
analyzeRefBaseTablePartitionColumns();
// analyze partition retention condition
analyzeMVRetentionCondition(connectContext);
} catch (Exception e) {
LOG.warn("Analyze partition exprs failed", e);
}
}

public synchronized void analyzeMVRetentionCondition(ConnectContext connectContext) {
this.retentionConditionExprOpt =
MaterializedViewAnalyzer.analyzeMVRetentionCondition(connectContext, this);
}

/**
* Since the table is cached in the Optional, needs to refresh it again for each query.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -69,6 +70,7 @@
import com.starrocks.connector.iceberg.IcebergPartitionTransform;
import com.starrocks.mv.analyzer.MVPartitionSlotRefResolver;
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.SqlModeHelper;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.RunMode;
import com.starrocks.sql.ast.AlterMaterializedViewStmt;
Expand Down Expand Up @@ -103,6 +105,7 @@
import com.starrocks.sql.optimizer.transformer.OptExprBuilder;
import com.starrocks.sql.optimizer.transformer.RelationTransformer;
import com.starrocks.sql.parser.ParsingException;
import com.starrocks.sql.parser.SqlParser;
import com.starrocks.sql.plan.ExecPlan;
import com.starrocks.sql.plan.PlanFragmentBuilder;
import org.apache.commons.collections.map.CaseInsensitiveMap;
Expand All @@ -112,7 +115,6 @@
import org.apache.iceberg.types.Types;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Strings;
import org.jetbrains.annotations.NotNull;

import java.time.LocalDateTime;
Expand Down Expand Up @@ -209,7 +211,7 @@ private static boolean isExternalTableFromResource(Table table) {
return false;
}
String catalog = table.getCatalogName();
return Strings.isBlank(catalog) || isResourceMappingCatalog(catalog);
return Strings.isNullOrEmpty(catalog) || isResourceMappingCatalog(catalog);
}

private static void processViews(QueryStatement queryStatement, Set<BaseTableInfo> baseTableInfos,
Expand Down Expand Up @@ -263,18 +265,18 @@ public Void visitCreateMaterializedViewStatement(CreateMaterializedViewStatement
* Materialized view name is a little bit different from a normal table
* 1. Use default catalog if not specified, actually it only support default catalog until now
*/
if (com.google.common.base.Strings.isNullOrEmpty(tableNameObject.getCatalog())) {
if (Strings.isNullOrEmpty(tableNameObject.getCatalog())) {
tableNameObject.setCatalog(InternalCatalog.DEFAULT_INTERNAL_CATALOG_NAME);
}
if (com.google.common.base.Strings.isNullOrEmpty(tableNameObject.getDb())) {
if (com.google.common.base.Strings.isNullOrEmpty(context.getDatabase())) {
if (Strings.isNullOrEmpty(tableNameObject.getDb())) {
if (Strings.isNullOrEmpty(context.getDatabase())) {
throw new SemanticException("No database selected. " +
"You could set the database name through `<database>.<table>` or `use <database>` statement");
}
tableNameObject.setDb(context.getDatabase());
}

if (com.google.common.base.Strings.isNullOrEmpty(tableNameObject.getTbl())) {
if (Strings.isNullOrEmpty(tableNameObject.getTbl())) {
throw new SemanticException("Table name cannot be empty");
}

Expand Down Expand Up @@ -581,7 +583,7 @@ private List<Pair<Column, Integer>> genMaterializedViewColumns(CreateMaterialize
for (int i = 0; i < mvColumns.size(); i++) {
Column col = mvColumns.get(i);
if (columnMap.putIfAbsent(col.getName(), Pair.create(col, i)) != null) {
throw new SemanticException("Duplicate column name " + Strings.quote(col.getName()));
throw new SemanticException("Duplicate column name `" + col.getName() + "`");
}
}

Expand Down Expand Up @@ -1844,4 +1846,38 @@ public static Expr getAdjustedOlapTablePartitionExpr(Map<Integer, Expr> changedP
}
return partitionByExpr;
}

public static Optional<Expr> analyzeMVRetentionCondition(ConnectContext connectContext,
MaterializedView mv) {
PartitionInfo partitionInfo = mv.getPartitionInfo();
if (partitionInfo.isUnPartitioned()) {
return Optional.empty();
}
String retentionCondition = mv.getTableProperty().getPartitionRetentionCondition();
if (Strings.isNullOrEmpty(retentionCondition)) {
return Optional.empty();
}

Expr retentionCondtiionExpr = null;
try {
retentionCondtiionExpr = SqlParser.parseSqlToExpr(retentionCondition, SqlModeHelper.MODE_DEFAULT);
} catch (Exception e) {
throw new SemanticException("Failed to parse retention condition: " + retentionCondition);
}
if (retentionCondtiionExpr == null) {
return Optional.empty();
}

// analyze partition retention expr
Scope scope = new Scope(RelationId.anonymous(), new RelationFields(
mv.getBaseSchema().stream()
.map(col -> new Field(col.getName(), col.getType(), null, null))
.collect(Collectors.toList())));
ExpressionAnalyzer.analyzeExpression(retentionCondtiionExpr, new AnalyzeState(), scope, connectContext);

Map<Expr, Expr> partitionByExprMap = getMVPartitionByExprToAdjustMap(null, mv);
retentionCondtiionExpr = MaterializedViewAnalyzer.adjustWhereExprIfNeeded(partitionByExprMap, retentionCondtiionExpr,
scope, connectContext);
return Optional.of(retentionCondtiionExpr);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ListPartitionPruner implements PartitionPruner {
Expand Down Expand Up @@ -261,7 +262,7 @@ public static List<String> deduceGenerateColumns(LogicalScanOperator scanOperato
}
List<String> result = Lists.newArrayList(partitionColumnNames);

java.util.function.Function<SlotRef, ColumnRefOperator> slotRefResolver = (slot) -> {
Function<SlotRef, ColumnRefOperator> slotRefResolver = (slot) -> {
return scanOperator.getColumnNameToColRefMap().get(slot.getColumnName());
};
Consumer<SlotRef> slotRefConsumer = (slot) -> {
Expand Down Expand Up @@ -304,7 +305,7 @@ private void deduceExtraConjuncts() {
if (!deduceExtraConjuncts) {
return;
}
java.util.function.Function<SlotRef, ColumnRefOperator> slotRefResolver = (slot) -> {
Function<SlotRef, ColumnRefOperator> slotRefResolver = (slot) -> {
return scanOperator.getColumnNameToColRefMap().get(slot.getColumnName());
};
// The GeneratedColumn doesn't have the correct type info, let's help it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
import com.starrocks.common.Pair;
import com.starrocks.common.util.DebugUtil;
import com.starrocks.connector.PartitionUtil;
import com.starrocks.connector.TableVersionRange;
import com.starrocks.qe.ConnectContext;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.analyzer.AnalyzeState;
import com.starrocks.sql.analyzer.ExpressionAnalyzer;
import com.starrocks.sql.analyzer.Field;
Expand All @@ -59,6 +61,7 @@
import com.starrocks.sql.optimizer.transformer.ExpressionMapping;
import com.starrocks.sql.optimizer.transformer.SqlToScalarOperatorTranslator;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.iceberg.Snapshot;

import java.util.Collection;
import java.util.List;
Expand All @@ -79,21 +82,6 @@ public ExternalTableCompensation(Table refBaseTable, List<PartitionKey> compensa
this.compensations = compensations;
}

public ScalarOperator getCompensateExpr(OptimizerContext optimizerContext,
MaterializedView mv,
Table refBaseTable,
List<ColumnRefOperator> refPartitionColRefs) {
if (refBaseTable instanceof IcebergTable) {
IcebergTable cachedIcebergTable = (IcebergTable) refBaseTable;
String catalogName = cachedIcebergTable.getCatalogName();
String dbName = cachedIcebergTable.getCatalogDBName();
TableName refTableName = new TableName(catalogName, dbName, cachedIcebergTable.getName());
return getIcebergTableCompensationExpr(optimizerContext, mv, refBaseTable, refTableName, refPartitionColRefs);
} else {
return convertPartitionKeysToListPredicate(refPartitionColRefs, compensations);
}
}

@Override
public boolean isNoCompensate() {
return super.isNoCompensate() || (state.isCompensate() && CollectionUtils.isEmpty(compensations));
Expand All @@ -119,20 +107,38 @@ public LogicalScanOperator compensate(OptimizerContext optimizerContext,
.stream()
.map(col -> scanOperator.getColumnReference(col))
.collect(Collectors.toList());
ScalarOperator externalExtraPredicate = getCompensateExpr(optimizerContext, mv,
refBaseTable, refPartitionColRefs);
ScalarOperator externalExtraPredicate;
if (refBaseTable instanceof IcebergTable) {
IcebergTable cachedIcebergTable = (IcebergTable) refBaseTable;
String catalogName = cachedIcebergTable.getCatalogName();
String dbName = cachedIcebergTable.getCatalogDBName();
TableName refTableName = new TableName(catalogName, dbName, cachedIcebergTable.getName());
Table currentTable = GlobalStateMgr.getCurrentState().getMetadataMgr().getTable(refTableName).orElse(null);
if (currentTable == null) {
return null;
}
builder.setTable(currentTable);
TableVersionRange versionRange = TableVersionRange.withEnd(
Optional.ofNullable(((IcebergTable) currentTable).getNativeTable().currentSnapshot())
.map(Snapshot::snapshotId));
builder.setTableVersionRange(versionRange);
externalExtraPredicate = getIcebergTableCompensation(optimizerContext, mv, refBaseTable, refTableName,
refPartitionColRefs);
} else {
externalExtraPredicate = convertPartitionKeysToListPredicate(refPartitionColRefs, compensations);
}
Preconditions.checkState(externalExtraPredicate != null);
externalExtraPredicate.setRedundant(true);
ScalarOperator finalPredicate = Utils.compoundAnd(scanOperator.getPredicate(), externalExtraPredicate);
builder.setPredicate(finalPredicate);
return builder.build();
}

private ScalarOperator getIcebergTableCompensationExpr(OptimizerContext optimizerContext,
MaterializedView mv,
Table refBaseTable,
TableName refTableName,
List<ColumnRefOperator> refPartitionColRefs) {
private ScalarOperator getIcebergTableCompensation(OptimizerContext optimizerContext,
MaterializedView mv,
Table refBaseTable,
TableName refTableName,
List<ColumnRefOperator> refPartitionColRefs) {
PartitionInfo mvPartitionInfo = mv.getPartitionInfo();
if (!mvPartitionInfo.isListPartition()) {
return convertPartitionKeysToListPredicate(refPartitionColRefs, compensations);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,12 @@
import com.starrocks.catalog.Table;
import com.starrocks.catalog.TableProperty;
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.SqlModeHelper;
import com.starrocks.sql.analyzer.AnalyzeState;
import com.starrocks.sql.analyzer.ExpressionAnalyzer;
import com.starrocks.sql.analyzer.Field;
import com.starrocks.sql.analyzer.RelationFields;
import com.starrocks.sql.analyzer.RelationId;
import com.starrocks.sql.analyzer.Scope;
import com.starrocks.sql.analyzer.SemanticException;
import com.starrocks.sql.optimizer.OptimizerContext;
import com.starrocks.sql.optimizer.Utils;
import com.starrocks.sql.optimizer.operator.OperatorBuilderFactory;
Expand All @@ -42,7 +40,6 @@
import com.starrocks.sql.optimizer.rule.transformation.materialization.MVTransparentState;
import com.starrocks.sql.optimizer.transformer.ExpressionMapping;
import com.starrocks.sql.optimizer.transformer.SqlToScalarOperatorTranslator;
import com.starrocks.sql.parser.SqlParser;

import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -111,34 +108,14 @@ public static TableCompensation build(Table refBaseTable,
Preconditions.checkArgument(mvUpdateInfo.getQueryRewriteConsistencyMode() ==
TableProperty.QueryRewriteConsistencyMode.FORCE_MV);
MaterializedView mv = mvUpdateInfo.getMv();
String partitionRetentionCondition =
mv.getTableProperty().getPartitionRetentionCondition();
Preconditions.checkArgument(partitionRetentionCondition != null);
try {
Expr whereExpr = parseToExpr(refBaseTable, partitionRetentionCondition);
if (whereExpr == null) {
Expr retentionConditionExpr = mv.getRetentionConditionExpr();
if (retentionConditionExpr == null) {
return TableCompensation.unknown();
}
return new PartitionRetentionTableCompensation(refBaseTable, whereExpr);
return new PartitionRetentionTableCompensation(refBaseTable, retentionConditionExpr);
} catch (Exception e) {
return TableCompensation.unknown();
}
}

public static Expr parseToExpr(Table table,
String ttlCondition) {
// needs to parse the expr each schedule because it can be changed dynamically
// TODO: cache the parsed expr to avoid parsing it every time later.
try {
Expr whereExpr = SqlParser.parseSqlToExpr(ttlCondition, SqlModeHelper.MODE_DEFAULT);
if (whereExpr == null) {
return null;
}
return whereExpr;
} catch (Exception e) {
throw new SemanticException("Failed to parse retention condition: " + ttlCondition);
}
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -1445,7 +1445,7 @@ private void testMVRefreshWithTTLCondition(String tableName) {
String alterMVSql = String.format("alter materialized view %s set (" +
"'query_rewrite_consistency' = 'force_mv')", mvName);
starRocksAssert.alterMvProperties(alterMVSql);
String plan = getFragmentPlan(connectContext, query);
String plan = getFragmentPlan(connectContext, query, "MV");
PlanTestBase.assertContains(plan, ":UNION");
PlanTestBase.assertContains(plan, String.format("TABLE: %s\n" +
" PREAGGREGATION: ON\n" +
Expand Down

0 comments on commit e9f58a0

Please sign in to comment.