Skip to content

Commit

Permalink
[Enhancement] Support replication from another cluster with compactio…
Browse files Browse the repository at this point in the history
…n enabled in shared-data mode

Signed-off-by: xiangguangyxg <[email protected]>
  • Loading branch information
xiangguangyxg committed Jan 7, 2025
1 parent 8f6bd26 commit 37337e2
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -653,13 +653,13 @@ private static TableInfo initTableInfo(TTableReplicationRequest request) throws
throw new MetaNotFoundException("Partition " + tPartitionInfo.partition_id + " in table "
+ table.getName() + " in database " + db.getFullName() + " not found");
}
Preconditions.checkState(partition.getCommittedVersion() == partition.getVisibleVersion(),
Preconditions.checkState(partition.getCommittedDataVersion() == partition.getDataVersion(),
"Partition " + tPartitionInfo.partition_id + " in table " + table.getName()
+ " in database " + db.getFullName() + " publish version not finished");
Preconditions.checkState(partition.getVisibleVersion() <= tPartitionInfo.src_version,
"Target visible version: " + partition.getVisibleVersion()
+ " is larger than source visible version: " + tPartitionInfo.src_version);
if (partition.getVisibleVersion() == tPartitionInfo.src_version) {
Preconditions.checkState(partition.getDataVersion() <= tPartitionInfo.src_version,
"Target data version: " + partition.getDataVersion()
+ " is larger than source data version: " + tPartitionInfo.src_version);
if (partition.getDataVersion() == tPartitionInfo.src_version) {
continue;
}
PartitionInfo partitionInfo = initPartitionInfo(olapTable, tPartitionInfo, partition);
Expand Down Expand Up @@ -708,7 +708,7 @@ private static PartitionInfo initPartitionInfo(OlapTable olapTable, TPartitionRe
IndexInfo indexInfo = initIndexInfo(olapTable, tIndexInfo, index);
indexInfos.put(indexInfo.getIndexId(), indexInfo);
}
return new PartitionInfo(tPartitionInfo.partition_id, partition.getVisibleVersion(),
return new PartitionInfo(tPartitionInfo.partition_id, partition.getDataVersion(),
tPartitionInfo.src_version, tPartitionInfo.src_version_epoch, indexInfos);
}

Expand Down Expand Up @@ -765,13 +765,13 @@ private static Map<Long, PartitionInfo> initPartitionInfos(OlapTable table, Olap
Map<Long, PartitionInfo> partitionInfos = Maps.newHashMap();
for (PhysicalPartition physicalPartition : table.getPhysicalPartitions()) {
PhysicalPartition srcPartition = srcTable.getPhysicalPartition(physicalPartition.getName());
Preconditions.checkState(physicalPartition.getCommittedVersion() == physicalPartition.getVisibleVersion(),
Preconditions.checkState(physicalPartition.getCommittedDataVersion() == physicalPartition.getDataVersion(),
"Partition " + physicalPartition.getName() + " in table " + table.getName()
+ " publish version not finished");
Preconditions.checkState(physicalPartition.getVisibleVersion() <= srcPartition.getVisibleVersion(),
"Target visible version: " + physicalPartition.getVisibleVersion()
+ " is larger than source visible version: " + srcPartition.getVisibleVersion());
if (physicalPartition.getVisibleVersion() == srcPartition.getVisibleVersion()) {
Preconditions.checkState(physicalPartition.getDataVersion() <= srcPartition.getDataVersion(),
"Target data version: " + physicalPartition.getDataVersion()
+ " is larger than source data version: " + srcPartition.getDataVersion());
if (physicalPartition.getDataVersion() == srcPartition.getDataVersion()) {
continue;
}
PartitionInfo partitionInfo = initPartitionInfo(table, srcTable, physicalPartition, srcPartition,
Expand All @@ -792,7 +792,7 @@ private static PartitionInfo initPartitionInfo(OlapTable table, OlapTable srcTab
IndexInfo indexInfo = initIndexInfo(table, srcTable, index, srcIndex, srcSystemInfoService);
indexInfos.put(indexInfo.getIndexId(), indexInfo);
}
return new PartitionInfo(partition.getId(), partition.getVisibleVersion(), srcPartition.getVisibleVersion(),
return new PartitionInfo(partition.getId(), partition.getDataVersion(), srcPartition.getDataVersion(),
srcPartition.getVersionEpoch(), indexInfos);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
*/
public class ReplicationTxnCommitAttachment extends TxnCommitAttachment {
@SerializedName("partitionVersions")
private Map<Long, Long> partitionVersions; // The version of partitions
private Map<Long, Long> partitionVersions; // The data version of partitions, not the visible version

@SerializedName("partitionVersionEpochs")
private Map<Long, Long> partitionVersionEpochs; // The version epoch of partitions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1271,10 +1271,10 @@ protected void unprotectedCommitPreparedTransaction(TransactionState transaction
(ReplicationTxnCommitAttachment) transactionState
.getTxnCommitAttachment();
Map<Long, Long> partitionVersions = replicationTxnAttachment.getPartitionVersions();
long newVersion = partitionVersions.get(partitionCommitInfo.getPhysicalPartitionId());
long versionDiff = newVersion - partition.getVisibleVersion();
partitionCommitInfo.setVersion(newVersion);
partitionCommitInfo.setDataVersion(partition.getDataVersion() + versionDiff);
long newDataVersion = partitionVersions.get(partitionCommitInfo.getPhysicalPartitionId());
long dataVersionDiff = newDataVersion - partition.getDataVersion();
partitionCommitInfo.setVersion(partition.getCommittedVersion() + dataVersionDiff);
partitionCommitInfo.setDataVersion(newDataVersion);
Map<Long, Long> partitionVersionEpochs = replicationTxnAttachment.getPartitionVersionEpochs();
if (partitionVersionEpochs != null) {
long newVersionEpoch = partitionVersionEpochs.get(partitionCommitInfo.getPhysicalPartitionId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,8 @@ public void applyCommitLog(TransactionState txnState, TableCommitInfo commitInfo

// The version of a replication transaction may not continuously
if (txnState.getSourceType() == TransactionState.LoadJobSourceType.REPLICATION) {
long versionDiff = partitionCommitInfo.getVersion() - partition.getNextVersion();
partition.setNextVersion(partitionCommitInfo.getVersion() + 1);
partition.setNextDataVersion(partition.getNextDataVersion() + versionDiff + 1);
partition.setNextDataVersion(partitionCommitInfo.getDataVersion() + 1);
} else {
partition.setNextVersion(partition.getNextVersion() + 1);
if (txnState.getSourceType() != TransactionState.LoadJobSourceType.LAKE_COMPACTION) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,23 +65,21 @@ public void applyCommitLog(TransactionState txnState, TableCommitInfo commitInfo
}
// The version of a replication transaction may not continuously
if (txnState.getSourceType() == TransactionState.LoadJobSourceType.REPLICATION) {
long versionDiff = partitionCommitInfo.getVersion() - partition.getNextVersion();
partition.setNextVersion(partitionCommitInfo.getVersion() + 1);
partition.setNextDataVersion(partition.getNextDataVersion() + versionDiff + 1);
} else if (txnState.isVersionOverwrite()) {
// overwrite empty partition, it's next version will less than overwrite version
// otherwise, it's next version will not change
if (partitionCommitInfo.getVersion() + 1 > partition.getNextVersion()) {
partition.setNextVersion(partitionCommitInfo.getVersion() + 1);
partition.setNextDataVersion(partition.getNextVersion());
}
} else if (partitionCommitInfo.isDoubleWrite()) {
partition.setNextVersion(partitionCommitInfo.getVersion() + 1);
partition.setNextDataVersion(partition.getNextVersion());
} else {
partition.setNextVersion(partition.getNextVersion() + 1);
partition.setNextDataVersion(partition.getNextDataVersion() + 1);
}
// data version == visible version in shared-nothing mode
partition.setNextDataVersion(partition.getNextVersion());

LOG.debug("partition[{}] next version[{}]", partitionId, partition.getNextVersion());
}
}
Expand Down Expand Up @@ -180,15 +178,15 @@ public void applyVisibleLog(TransactionState txnState, TableCommitInfo commitInf
if (txnState.isVersionOverwrite()) {
if (partition.getVisibleVersion() < version) {
partition.updateVisibleVersion(version, versionTime, txnState.getTransactionId());
partition.setDataVersion(partitionCommitInfo.getDataVersion());
partition.setDataVersion(version); // data version == visible version in shared-nothing mode
if (partitionCommitInfo.getVersionEpoch() > 0) {
partition.setVersionEpoch(partitionCommitInfo.getVersionEpoch());
}
partition.setVersionTxnType(txnState.getTransactionType());
}
} else {
partition.updateVisibleVersion(version, versionTime, txnState.getTransactionId());
partition.setDataVersion(partitionCommitInfo.getDataVersion());
partition.setDataVersion(version); // data version == visible version in shared-nothing mode
if (partitionCommitInfo.getVersionEpoch() > 0) {
partition.setVersionEpoch(partitionCommitInfo.getVersionEpoch());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.starrocks.common.jmockit.Deencapsulation;
import com.starrocks.leader.LeaderImpl;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.RunMode;
import com.starrocks.sql.analyzer.AnalyzeTestUtil;
import com.starrocks.sql.ast.CreateTableStmt;
import com.starrocks.system.Backend;
Expand Down Expand Up @@ -68,21 +69,21 @@ public class ReplicationJobTest {

@BeforeClass
public static void beforeClass() throws Exception {
UtFrameUtils.createMinStarRocksCluster();
UtFrameUtils.createMinStarRocksCluster(RunMode.SHARED_NOTHING);
AnalyzeTestUtil.init();
starRocksAssert = new StarRocksAssert(AnalyzeTestUtil.getConnectContext());
starRocksAssert.withDatabase("test").useDatabase("test");

db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb("test");

String sql = "create table single_partition_duplicate_key (key1 int, key2 varchar(10))\n" +
"distributed by hash(key1) buckets 1\n" +
"properties('replication_num' = '1'); ";
"distributed by hash(key1) buckets 1\n" +
"properties('replication_num' = '1'); ";
CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseStmtWithNewParser(sql,
AnalyzeTestUtil.getConnectContext());
AnalyzeTestUtil.getConnectContext());
StarRocksAssert.utCreateTableWithRetry(createTableStmt);
table = (OlapTable) GlobalStateMgr.getCurrentState().getLocalMetastore()
.getTable(db.getFullName(), "single_partition_duplicate_key");
.getTable(db.getFullName(), "single_partition_duplicate_key");
srcTable = DeepCopy.copyWithGson(table, OlapTable.class);

partition = table.getPartitions().iterator().next();
Expand All @@ -106,16 +107,16 @@ public void setUp() throws Exception {
srcPartition.getDefaultPhysicalPartition().setNextDataVersion(99);

job = new ReplicationJob(null, "test_token", db.getId(), table, srcTable,
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo());
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo());
}

@Test
public void testJobId() {
ReplicationJob jobWithoutId = new ReplicationJob(null, "test_token", db.getId(), table, srcTable,
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo());
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo());
Assert.assertFalse(jobWithoutId.getJobId().isEmpty());
ReplicationJob jobWithId = new ReplicationJob("fake_id", "test_token", db.getId(), table, srcTable,
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo());
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo());
Assert.assertEquals("fake_id", jobWithId.getJobId());
}

Expand All @@ -141,7 +142,7 @@ public void testNormal() throws Exception {
job.finishRemoteSnapshotTask((RemoteSnapshotTask) task, request);

Deencapsulation.invoke(new LeaderImpl(), "finishRemoteSnapshotTask",
(RemoteSnapshotTask) task, request);
(RemoteSnapshotTask) task, request);
((RemoteSnapshotTask) task).toThrift();
task.toString();
}
Expand All @@ -155,7 +156,7 @@ public void testNormal() throws Exception {
job.finishReplicateSnapshotTask((ReplicateSnapshotTask) task, request);

Deencapsulation.invoke(new LeaderImpl(), "finishReplicateSnapshotTask",
(ReplicateSnapshotTask) task, request);
(ReplicateSnapshotTask) task, request);
((ReplicateSnapshotTask) task).toThrift();
task.toString();
}
Expand All @@ -165,8 +166,9 @@ public void testNormal() throws Exception {

Assert.assertEquals(partition.getDefaultPhysicalPartition().getCommittedVersion(),
srcPartition.getDefaultPhysicalPartition().getVisibleVersion());
// data version == visible version in shared-nothing mode
Assert.assertEquals(partition.getDefaultPhysicalPartition().getCommittedDataVersion(),
srcPartition.getDefaultPhysicalPartition().getDataVersion());
srcPartition.getDefaultPhysicalPartition().getVisibleVersion());
}

@Test
Expand Down Expand Up @@ -349,7 +351,7 @@ public void testInitializedByThrift() {
tabletInfo.replica_replication_infos = new ArrayList<TReplicaReplicationInfo>();
TReplicaReplicationInfo replicaInfo = new TReplicaReplicationInfo();
Backend backend = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getBackends().iterator()
.next();
.next();
replicaInfo.src_backend = new TBackend(backend.getHost(), backend.getBePort(), backend.getHttpPort());
tabletInfo.replica_replication_infos.add(replicaInfo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.starrocks.common.proc.ReplicationsProcNode;
import com.starrocks.leader.LeaderImpl;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.RunMode;
import com.starrocks.sql.analyzer.AnalyzeTestUtil;
import com.starrocks.sql.ast.CreateTableStmt;
import com.starrocks.system.Backend;
Expand Down Expand Up @@ -71,7 +72,7 @@ public class ReplicationMgrTest {

@BeforeClass
public static void beforeClass() throws Exception {
UtFrameUtils.createMinStarRocksCluster();
UtFrameUtils.createMinStarRocksCluster(RunMode.SHARED_DATA);
AnalyzeTestUtil.init();
starRocksAssert = new StarRocksAssert(AnalyzeTestUtil.getConnectContext());
starRocksAssert.withDatabase("test").useDatabase("test");
Expand Down

0 comments on commit 37337e2

Please sign in to comment.