Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: HangyuanLiu <[email protected]>
  • Loading branch information
HangyuanLiu committed Nov 8, 2024
1 parent 22ee74f commit 84fbd74
Show file tree
Hide file tree
Showing 13 changed files with 113 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.starrocks.catalog.MaterializedIndex.IndexState;
import com.starrocks.catalog.Replica.ReplicaState;
import com.starrocks.common.DdlException;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.ast.DistributionDesc;
import com.starrocks.sql.ast.HashDistributionDesc;
import com.starrocks.sql.ast.IndexDef;
Expand Down Expand Up @@ -467,7 +468,12 @@ private void updateMetaInternal(String dbName, TTableMeta meta, List<TBackendMet
null, // TODO(wulei): fix it
defaultDistributionInfo);

PhysicalPartition physicalPartition = logicalPartition.getDefaultPhysicalPartition();
PhysicalPartition physicalPartition = new PhysicalPartition(GlobalStateMgr.getCurrentState().getNextId(),
partitionMeta.getPartition_name(),
partitionMeta.getPartition_id(), // TODO(wulei): fix it
0, null);

logicalPartition.addSubPartition(physicalPartition);

physicalPartition.setNextVersion(partitionMeta.getNext_version());
physicalPartition.updateVisibleVersion(partitionMeta.getVisible_version(),
Expand Down
4 changes: 2 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,7 @@ public Status resetIdsForRestore(GlobalStateMgr globalStateMgr, Database db, int
partitionInfo.setReplicationNum(id, (short) restoreReplicationNum);
}

// reset partiton info
// reset partition info
partitionInfo.setPartitionIdsForRestore(partitionOldIdToNewId);

// reset partitions
Expand All @@ -855,7 +855,7 @@ public Status resetIdsForRestore(GlobalStateMgr globalStateMgr, Database db, int
physicalPartitionNameToPartitionId.clear();
for (Partition partition : partitions) {
long newPartitionId = partitionOldIdToNewId.get(partition.getId());
partition.getDefaultPhysicalPartition().setIdForRestore(newPartitionId);
partition.setIdForRestore(newPartitionId);
idToPartition.put(newPartitionId, partition);
List<PhysicalPartition> origPhysicalPartitions = Lists.newArrayList(partition.getSubPartitions());
origPhysicalPartitions.forEach(physicalPartition -> {
Expand Down
78 changes: 16 additions & 62 deletions fe/fe-core/src/main/java/com/starrocks/catalog/Partition.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import com.starrocks.persist.gson.GsonPostProcessable;
import com.starrocks.persist.gson.GsonPreProcessable;
import com.starrocks.persist.gson.GsonUtils;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.transaction.TransactionType;
Expand All @@ -53,7 +52,7 @@
/**
* Internal representation of partition-related metadata.
*/
public class Partition extends MetaObject implements GsonPreProcessable, GsonPostProcessable {
public class Partition extends MetaObject implements GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(Partition.class);

public static final long PARTITION_INIT_VERSION = 1L;
Expand Down Expand Up @@ -121,9 +120,11 @@ public Partition(long id, String name,
this.distributionInfo = distributionInfo;
this.shardGroupId = shardGroupId;

this.defaultPhysicalPartitionId = id;
PhysicalPartition physicalPartition = new PhysicalPartition(id, name, id, this.shardGroupId, baseIndex);
this.idToSubPartition.put(id, physicalPartition);
long physicalPartitionId = id;
this.defaultPhysicalPartitionId = physicalPartitionId;
PhysicalPartition physicalPartition = new PhysicalPartition(physicalPartitionId, name, id, this.shardGroupId, baseIndex);
this.idToSubPartition.put(physicalPartitionId, physicalPartition);
this.nameToSubPartition.put(name, physicalPartition);
}

public Partition shallowCopy() {
Expand All @@ -143,11 +144,16 @@ public Partition shallowCopy() {
partition.versionTxnType = this.versionTxnType;
partition.distributionInfo = this.distributionInfo;
partition.shardGroupId = this.shardGroupId;
partition.defaultPhysicalPartitionId = this.defaultPhysicalPartitionId;
partition.idToSubPartition = Maps.newHashMap(this.idToSubPartition);
partition.nameToSubPartition = Maps.newHashMap(this.nameToSubPartition);
return partition;
}

public void setIdForRestore(long id) {
this.id = id;
}

public long getId() {
return this.id;
}
Expand Down Expand Up @@ -177,7 +183,7 @@ public void setDistributionInfo(DistributionInfo distributionInfo) {
}

public void addSubPartition(PhysicalPartition subPartition) {
if (defaultPhysicalPartitionId == 0) {
if (idToSubPartition.size() == 0) {
defaultPhysicalPartitionId = subPartition.getId();
}
if (subPartition.getName() == null) {
Expand Down Expand Up @@ -271,28 +277,6 @@ public String toString() {
buffer.append("partition_id: ").append(id).append("; ");
buffer.append("name: ").append(name).append("; ");
buffer.append("partition_state.name: ").append(state.name()).append("; ");

buffer.append("base_index: ").append(baseIndex.toString()).append("; ");

int rollupCount = (idToVisibleRollupIndex != null) ? idToVisibleRollupIndex.size() : 0;
buffer.append("rollup count: ").append(rollupCount).append("; ");

if (idToVisibleRollupIndex != null) {
for (Map.Entry<Long, MaterializedIndex> entry : idToVisibleRollupIndex.entrySet()) {
buffer.append("rollup_index: ").append(entry.getValue().toString()).append("; ");
}
}

buffer.append("visibleVersion: ").append(visibleVersion).append("; ");
buffer.append("committedVersion: ").append(this.nextVersion - 1).append("; ");
buffer.append("nextVersion: ").append(nextVersion).append("; ");

buffer.append("dataVersion: ").append(dataVersion).append("; ");
buffer.append("committedDataVersion: ").append(this.nextDataVersion - 1).append("; ");

buffer.append("versionEpoch: ").append(versionEpoch).append("; ");
buffer.append("versionTxnType: ").append(versionTxnType).append("; ");

buffer.append("distribution_info.type: ").append(distributionInfo.getType().name()).append("; ");
buffer.append("distribution_info: ").append(distributionInfo.toString());

Expand All @@ -303,38 +287,6 @@ public String generatePhysicalPartitionName(long physicalPartitionId) {
return this.name + '_' + physicalPartitionId;
}

@Override
public void gsonPreProcess() throws IOException {
if (defaultPhysicalPartitionId != 0) {
PhysicalPartition physicalPartition = idToSubPartition.get(defaultPhysicalPartitionId);
this.shardGroupId = physicalPartition.getShardGroupId();
this.baseIndex = physicalPartition.getBaseIndex();
this.isImmutable.set(physicalPartition.isImmutable());
this.idToVisibleRollupIndex.clear();
for (MaterializedIndex materializedIndex
: physicalPartition.getMaterializedIndices(MaterializedIndex.IndexExtState.VISIBLE)) {
this.idToVisibleRollupIndex.put(materializedIndex.getId(), materializedIndex);
}

this.idToShadowIndex.clear();
for (MaterializedIndex materializedIndex
: physicalPartition.getMaterializedIndices(MaterializedIndex.IndexExtState.SHADOW)) {
this.idToShadowIndex.put(materializedIndex.getId(), materializedIndex);
}

this.visibleVersion = physicalPartition.getVisibleVersion();
this.visibleVersionTime = physicalPartition.getVisibleVersionTime();
this.nextVersion = physicalPartition.getNextVersion();
this.dataVersion = physicalPartition.getDataVersion();
this.nextDataVersion = physicalPartition.getNextDataVersion();
this.versionEpoch = physicalPartition.getVersionEpoch();
this.versionTxnType = physicalPartition.getVersionTxnType();

idToSubPartition.remove(defaultPhysicalPartitionId);
defaultPhysicalPartitionId = 0;
}
}

@Override
public void gsonPostProcess() throws IOException {
if (dataVersion == 0) {
Expand All @@ -358,11 +310,13 @@ public void gsonPostProcess() throws IOException {
}

if (defaultPhysicalPartitionId == 0) {
defaultPhysicalPartitionId = id;
String partitionJson = GsonUtils.GSON.toJson(this);
PhysicalPartition physicalPartition = GsonUtils.GSON.fromJson(partitionJson, PhysicalPartition.class);
physicalPartition.setParentId(id);
idToSubPartition.put(id, physicalPartition);

long nextId = GlobalStateMgr.getCurrentState().getNextId();
defaultPhysicalPartitionId = nextId;
idToSubPartition.put(nextId, physicalPartition);
nameToSubPartition.put(name, physicalPartition);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ public class PhysicalPartition extends MetaObject implements GsonPostProcessable

private volatile long minRetainVersion = 0;

private PhysicalPartition() {

}

public PhysicalPartition(long id, String name, long parentId, long sharedGroupId, MaterializedIndex baseIndex) {
this.id = id;
this.name = name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,7 @@ public void checkIfScanRangeNumSafe(long scanRangeSize) {
long totalTabletsNum = 0;
for (long partitionId : selectedPartitionIds) {
final Partition partition = olapTable.getPartition(partitionId);
List<PhysicalPartition> physicalPartitions = (List<PhysicalPartition>) partition.getSubPartitions();
Collection<PhysicalPartition> physicalPartitions = partition.getSubPartitions();
totalPartitionNum += physicalPartitions.size();
for (PhysicalPartition physicalPartition : physicalPartitions) {
final MaterializedIndex selectedTable = physicalPartition.getIndex(selectedIndexId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1358,7 +1358,7 @@ private Map<String, MaterializedView.BasePartitionInfo> getRefreshedPartitionInf
OlapTable olapTable = (OlapTable) baseTable;
for (String partitionName : refreshedPartitionNames) {
Partition partition = olapTable.getPartition(partitionName);
// it's ok to skip because only existed partitinos are updated in the version map.
// it's ok to skip because only existed partitions are updated in the version map.
if (partition == null) {
LOG.warn("Partition {} not found in base table {} in refreshing {}, refreshedPartitionNames:{}",
partitionName, baseTable.getName(), materializedView.getName(), refreshedPartitionNames);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1526,7 +1526,7 @@ private PhysicalPartition createPhysicalPartition(String name, Database db, Olap
indexMap.put(indexId, rollup);
}

Long id = GlobalStateMgr.getCurrentState().getNextId();
long id = GlobalStateMgr.getCurrentState().getNextId();
// physical partitions in the same logical partition use the same shard_group_id,
// so that the shards of this logical partition are more evenly distributed.
long shardGroupId = partition.getDefaultPhysicalPartition().getShardGroupId();
Expand Down Expand Up @@ -1703,15 +1703,14 @@ Partition createPartition(Database db, OlapTable table, long partitionId, String
distributionInfo);

PhysicalPartition physicalPartition = new PhysicalPartition(
partitionId,
GlobalStateMgr.getCurrentState().getNextId(),
partitionName,
partitionId,
shardGroupId,
indexMap.get(table.getBaseIndexId()));

logicalPartition.addSubPartition(physicalPartition);

//LogicalPartition partition = new LogicalPartition(partitionId, partitionName, indexMap.get(table.getBaseIndexId()), distributionInfo, shardGroupId);
// version
if (version != null) {
physicalPartition.updateVisibleVersion(version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import com.starrocks.catalog.KeysType;
import com.starrocks.catalog.MaterializedIndex;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.PhysicalPartition;
import com.starrocks.catalog.Type;
import com.starrocks.common.jmockit.Deencapsulation;
import com.starrocks.sql.ast.CreateMaterializedViewStmt;
Expand Down Expand Up @@ -133,7 +133,7 @@ public void testErrorBaseIndexName(@Injectable CreateMaterializedViewStmt create
public void testRollupReplica(@Injectable CreateMaterializedViewStmt createMaterializedViewStmt,
@Injectable Database db,
@Injectable OlapTable olapTable,
@Injectable Partition partition,
@Injectable PhysicalPartition partition,
@Injectable MaterializedIndex materializedIndex) {
final String baseIndexName = "t1";
final Long baseIndexId = new Long(1);
Expand All @@ -149,8 +149,10 @@ public void testRollupReplica(@Injectable CreateMaterializedViewStmt createMater
result = baseIndexId;
olapTable.getPhysicalPartitions();
result = Lists.newArrayList(partition);
partition.getDefaultPhysicalPartition().getIndex(baseIndexId);

partition.getIndex(baseIndexId);
result = materializedIndex;

materializedIndex.getState();
result = MaterializedIndex.IndexState.SHADOW;
}
Expand Down Expand Up @@ -320,7 +322,7 @@ public void checkInvalidPartitionKeyMV(@Injectable CreateMaterializedViewStmt cr
}

@Test
public void testCheckDropMaterializedView(@Injectable OlapTable olapTable, @Injectable Partition partition,
public void testCheckDropMaterializedView(@Injectable OlapTable olapTable, @Injectable PhysicalPartition partition,
@Injectable MaterializedIndex materializedIndex,
@Injectable Database db) {
String mvName = "mv_1";
Expand All @@ -336,9 +338,11 @@ public void testCheckDropMaterializedView(@Injectable OlapTable olapTable, @Inje
result = 1L;
olapTable.getSchemaHashByIndexId(1L);
result = 1;

olapTable.getPhysicalPartitions();
result = Lists.newArrayList(partition);
partition.getDefaultPhysicalPartition().getIndex(1L);

partition.getIndex(1L);
result = materializedIndex;
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.starrocks.catalog.MaterializedIndex.IndexState;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.PhysicalPartition;
import com.starrocks.catalog.RandomDistributionInfo;
import com.starrocks.catalog.SinglePartitionInfo;
import com.starrocks.catalog.Type;
Expand Down Expand Up @@ -103,19 +104,28 @@ public static OlapTable mockTable(String name) {
}
};

Partition partition = Deencapsulation.newInstance(Partition.class);
new Expectations(partition) {
PhysicalPartition physicalPartition = Deencapsulation.newInstance(PhysicalPartition.class);
new Expectations(physicalPartition) {
{
partition.getDefaultPhysicalPartition().getBaseIndex();
physicalPartition.getBaseIndex();
minTimes = 0;
result = index;

partition.getDefaultPhysicalPartition().getIndex(30000L);
physicalPartition.getIndex(30000L);
minTimes = 0;
result = index;
}
};

Partition partition = Deencapsulation.newInstance(Partition.class);
new Expectations(partition) {
{
partition.getDefaultPhysicalPartition();
minTimes = 0;
result = physicalPartition;
}
};

OlapTable table = new OlapTable();
new Expectations(table) {
{
Expand Down Expand Up @@ -234,19 +244,28 @@ public static Analyzer fetchTableAnalyzer() {
}
};

Partition partition = Deencapsulation.newInstance(Partition.class);
new Expectations(partition) {
PhysicalPartition physicalPartition = Deencapsulation.newInstance(PhysicalPartition.class);
new Expectations(physicalPartition) {
{
partition.getDefaultPhysicalPartition().getBaseIndex();
physicalPartition.getBaseIndex();
minTimes = 0;
result = index;

partition.getDefaultPhysicalPartition().getIndex(30000L);
physicalPartition.getIndex(30000L);
minTimes = 0;
result = index;
}
};

Partition partition = Deencapsulation.newInstance(Partition.class);
new Expectations(partition) {
{
partition.getDefaultPhysicalPartition();
minTimes = 0;
result = physicalPartition;
}
};

OlapTable table = new OlapTable();
new Expectations(table) {
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public class CatalogUtilsTest {
@Mock
private Partition partition;

@Mock
private PhysicalPartition physicalPartition;

@Before
public void setUp() {
MockitoAnnotations.openMocks(this);
Expand All @@ -61,7 +64,8 @@ public void testCalAvgBucketNumOfRecentPartitions_CalculateByDataSize() {
partitions.add(partition);
when(olapTable.getPartitions()).thenReturn(partitions);
when(olapTable.getRecentPartitions(anyInt())).thenReturn(partitions);
when(partition.getDefaultPhysicalPartition().getVisibleVersion()).thenReturn(2L);
when(partition.getDefaultPhysicalPartition()).thenReturn(physicalPartition);
when(physicalPartition.getVisibleVersion()).thenReturn(2L);
when(partition.getDataSize()).thenReturn(2L * FeConstants.AUTO_DISTRIBUTION_UNIT);

int bucketNum = CatalogUtils.calAvgBucketNumOfRecentPartitions(olapTable, 1, true);
Expand Down
Loading

0 comments on commit 84fbd74

Please sign in to comment.