Skip to content

Commit

Permalink
[Feature] Support restoring from a cluster snapshot for shared-data mode (part 3, introduce gtid for tablet metadata) (#54326)
Browse files Browse the repository at this point in the history

Signed-off-by: xiangguangyxg <[email protected]>
(cherry picked from commit f977337)

# Conflicts:
#	fe/fe-core/src/main/java/com/starrocks/task/CreateReplicaTask.java
#	fe/fe-core/src/main/java/com/starrocks/task/TabletTaskExecutor.java
#	gensrc/proto/lake_types.proto
#	gensrc/thrift/AgentService.thrift
  • Loading branch information
xiangguangyxg authored and mergify[bot] committed Dec 27, 2024
1 parent bf74930 commit 366efb3
Show file tree
Hide file tree
Showing 12 changed files with 122 additions and 21 deletions.
1 change: 1 addition & 0 deletions be/src/storage/lake/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ Status TabletManager::create_tablet(const TCreateTabletReq& req) {
tablet_metadata_pb->set_version(kInitialVersion);
tablet_metadata_pb->set_next_rowset_id(1);
tablet_metadata_pb->set_cumulative_point(0);
tablet_metadata_pb->set_gtid(req.gtid);

if (req.__isset.enable_persistent_index) {
tablet_metadata_pb->set_enable_persistent_index(req.enable_persistent_index);
Expand Down
5 changes: 4 additions & 1 deletion be/src/storage/lake/transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ StatusOr<TabletMetadataPtr> publish_version(TabletManager* tablet_mgr, int64_t t

auto new_metadata = std::make_shared<TabletMetadataPB>(*metadata);
new_metadata->set_version(new_version);
new_metadata->set_gtid(txns[0].gtid());

RETURN_IF_ERROR(tablet_mgr->put_tablet_metadata(new_metadata));
return new_metadata;
Expand All @@ -171,7 +172,6 @@ StatusOr<TabletMetadataPtr> publish_version(TabletManager* tablet_mgr, int64_t t
VLOG(2) << "publish version tablet_id: " << tablet_id << ", txns: " << txns << ", base_version: " << base_version
<< ", new_version: " << new_version;

auto commit_time = txns.back().commit_time();
auto new_metadata_path = tablet_mgr->tablet_metadata_location(tablet_id, new_version);
auto cached_new_metadata = tablet_mgr->metacache()->lookup_tablet_metadata(new_metadata_path);
if (cached_new_metadata != nullptr) {
Expand Down Expand Up @@ -222,6 +222,8 @@ StatusOr<TabletMetadataPtr> publish_version(TabletManager* tablet_mgr, int64_t t
std::unique_ptr<TxnLogApplier> log_applier;
std::shared_ptr<TabletMetadataPB> new_metadata;
std::vector<std::string> files_to_delete;
auto commit_time = txns.back().commit_time();
auto gtid = txns.back().gtid();

// Apply txn logs
int64_t alter_version = -1;
Expand Down Expand Up @@ -321,6 +323,7 @@ StatusOr<TabletMetadataPtr> publish_version(TabletManager* tablet_mgr, int64_t t
}

new_metadata->set_commit_time(commit_time);
new_metadata->set_gtid(gtid);

auto init_st = log_applier->init();
if (!init_st.ok()) {
Expand Down
24 changes: 21 additions & 3 deletions fe/fe-core/src/main/java/com/starrocks/alter/LakeRollupJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -147,19 +147,23 @@ public LakeRollupJob() {

@Override
protected void runPendingJob() throws AlterCancelException {
boolean enableTabletCreationOptimization = Config.lake_enable_tablet_creation_optimization;
long numTablets = 0;
AgentBatchTask batchTask = new AgentBatchTask();
MarkedCountDownLatch<Long, Long> countDownLatch;
try (ReadLockedDatabase db = getReadLockedDatabase(dbId)) {
LakeTable table = getTableOrThrow(db, tableId);
Preconditions.checkState(table.getState() == OlapTable.OlapTableState.ROLLUP);
MaterializedIndexMeta index = table.getIndexMetaByIndexId(table.getBaseIndexId());

for (MaterializedIndex rollupIdx : physicalPartitionIdToRollupIndex.values()) {
numTablets += rollupIdx.getTablets().size();
if (enableTabletCreationOptimization) {
numTablets = physicalPartitionIdToRollupIndex.size();
} else {
numTablets = physicalPartitionIdToRollupIndex.values().stream().map(MaterializedIndex::getTablets)
.mapToLong(List::size).sum();
}
countDownLatch = new MarkedCountDownLatch<>((int) numTablets);

long gtid = getNextGtid();
for (Map.Entry<Long, MaterializedIndex> entry : this.physicalPartitionIdToRollupIndex.entrySet()) {
long partitionId = entry.getKey();
PhysicalPartition partition = table.getPhysicalPartition(partitionId);
Expand Down Expand Up @@ -213,11 +217,17 @@ protected void runPendingJob() throws AlterCancelException {
.setCompressionType(table.getCompressionType())
.setCreateSchemaFile(createSchemaFile)
.setTabletSchema(tabletSchema)
.setEnableTabletCreationOptimization(enableTabletCreationOptimization)
.setGtid(gtid)
.build();

// For each partition, the schema file is created only when the first Tablet is created
createSchemaFile = false;
batchTask.addTask(task);

if (enableTabletCreationOptimization) {
break;
}
} // end for rollupTablets
}
}
Expand All @@ -231,6 +241,7 @@ protected void runPendingJob() throws AlterCancelException {
throw new IllegalStateException("Table State doesn't equal to ROLLUP, it is " + table.getState() + ".");
}
watershedTxnId = getNextTransactionId();
watershedGtid = getNextGtid();
addRollIndexToCatalog(table);
}

Expand Down Expand Up @@ -493,6 +504,7 @@ public void replay(AlterJobV2 lakeRollupJob) {
this.timeoutMs = other.timeoutMs;

this.watershedTxnId = other.watershedTxnId;
this.watershedGtid = other.watershedGtid;
this.commitVersionMap = other.commitVersionMap;

this.physicalPartitionIdToBaseRollupTabletIdMap = other.physicalPartitionIdToBaseRollupTabletIdMap;
Expand Down Expand Up @@ -582,6 +594,10 @@ public static long peekNextTransactionId() {
return GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().getTransactionIDGenerator().peekNextTransactionId();
}

/**
 * Requests the next gtid from the gtid generator of the current {@code GlobalStateMgr}.
 *
 * @return the value produced by the generator's {@code nextGtid()} call
 */
public static long getNextGtid() {
    final long allocatedGtid = GlobalStateMgr.getCurrentState().getGtidGenerator().nextGtid();
    return allocatedGtid;
}

void addRollIndexToCatalog(@NotNull LakeTable tbl) {
for (Partition partition : tbl.getPartitions()) {
for (PhysicalPartition physicalPartition : partition.getSubPartitions()) {
Expand Down Expand Up @@ -657,6 +673,7 @@ private boolean publishVersion() {
rollUpTxnInfo.combinedTxnLog = false;
rollUpTxnInfo.commitTime = finishedTimeMs / 1000;
rollUpTxnInfo.txnType = TxnTypePB.TXN_NORMAL;
rollUpTxnInfo.gtid = watershedGtid;
// publish rollup tablets
Utils.publishVersion(physicalPartitionIdToRollupIndex.get(partitionId).getTablets(), rollUpTxnInfo,
1, commitVersion, warehouseId);
Expand All @@ -666,6 +683,7 @@ private boolean publishVersion() {
originTxnInfo.combinedTxnLog = false;
originTxnInfo.commitTime = finishedTimeMs / 1000;
originTxnInfo.txnType = TxnTypePB.TXN_EMPTY;
originTxnInfo.gtid = watershedGtid;
// publish origin tablets
Utils.publishVersion(allOtherPartitionTablets, originTxnInfo, commitVersion - 1,
commitVersion, warehouseId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public abstract class LakeTableAlterMetaJobBase extends AlterJobV2 {
private static final Logger LOG = LogManager.getLogger(LakeTableAlterMetaJobBase.class);
@SerializedName(value = "watershedTxnId")
private long watershedTxnId = -1;
@SerializedName(value = "watershedGtid")
private long watershedGtid = -1;
// PhysicalPartitionId -> indexId -> MaterializedIndex
@SerializedName(value = "partitionIndexMap")
private Table<Long, Long, MaterializedIndex> physicalPartitionIndexMap = HashBasedTable.create();
Expand Down Expand Up @@ -107,6 +109,7 @@ protected void runPendingJob() throws AlterCancelException {
if (this.watershedTxnId == -1) {
this.watershedTxnId = globalStateMgr.getGlobalTransactionMgr().getTransactionIDGenerator()
.getNextTransactionId();
this.watershedGtid = globalStateMgr.getGtidGenerator().nextGtid();
GlobalStateMgr.getCurrentState().getEditLog().logAlterJob(this);
}

Expand Down Expand Up @@ -259,6 +262,7 @@ boolean publishVersion() {
txnInfo.combinedTxnLog = false;
txnInfo.commitTime = finishedTimeMs / 1000;
txnInfo.txnType = TxnTypePB.TXN_NORMAL;
txnInfo.gtid = watershedGtid;
for (long partitionId : physicalPartitionIndexMap.rowKeySet()) {
long commitVersion = commitVersionMap.get(partitionId);
Map<Long, MaterializedIndex> dirtyIndexMap = physicalPartitionIndexMap.row(partitionId);
Expand Down Expand Up @@ -475,6 +479,7 @@ public void replay(AlterJobV2 replayedJob) {

this.physicalPartitionIndexMap = other.physicalPartitionIndexMap;
this.watershedTxnId = other.watershedTxnId;
this.watershedGtid = other.watershedGtid;
this.commitVersionMap = other.commitVersionMap;

restoreState(other);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,10 @@ public static long peekNextTransactionId() {
return GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().getTransactionIDGenerator().peekNextTransactionId();
}

/**
 * Convenience wrapper around the gtid generator held by the current {@code GlobalStateMgr}.
 * Each call invokes {@code nextGtid()} on that generator once.
 *
 * @return the gtid handed back by the generator
 */
public static long getNextGtid() {
    final long freshGtid = GlobalStateMgr.getCurrentState().getGtidGenerator().nextGtid();
    return freshGtid;
}

@VisibleForTesting
public void setIsCancelling(boolean isCancelling) {
this.isCancelling.set(isCancelling);
Expand Down Expand Up @@ -320,7 +324,7 @@ protected void runPendingJob() throws AlterCancelException {
try (ReadLockedDatabase db = getReadLockedDatabase(dbId)) {
OlapTable table = getTableOrThrow(db, tableId);
Preconditions.checkState(table.getState() == OlapTable.OlapTableState.SCHEMA_CHANGE);
MaterializedIndexMeta indexMeta = table.getIndexMetaByIndexId(table.getBaseIndexId());

if (enableTabletCreationOptimization) {
numTablets = physicalPartitionIndexMap.size();
} else {
Expand All @@ -330,6 +334,7 @@ protected void runPendingJob() throws AlterCancelException {
countDownLatch = new MarkedCountDownLatch<>((int) numTablets);
createReplicaLatch = countDownLatch;
long baseIndexId = table.getBaseIndexId();
long gtid = getNextGtid();
for (long physicalPartitionId : physicalPartitionIndexMap.rowKeySet()) {
PhysicalPartition physicalPartition = table.getPhysicalPartition(physicalPartitionId);
Preconditions.checkState(physicalPartition != null);
Expand Down Expand Up @@ -389,6 +394,7 @@ protected void runPendingJob() throws AlterCancelException {
.setCreateSchemaFile(createSchemaFile)
.setTabletSchema(tabletSchema)
.setEnableTabletCreationOptimization(enableTabletCreationOptimization)
.setGtid(gtid)
.build();
// For each partition, the schema file is created only when the first Tablet is created
createSchemaFile = false;
Expand All @@ -412,6 +418,7 @@ protected void runPendingJob() throws AlterCancelException {
OlapTable table = getTableOrThrow(db, tableId);
Preconditions.checkState(table.getState() == OlapTable.OlapTableState.SCHEMA_CHANGE);
watershedTxnId = getNextTransactionId();
watershedGtid = getNextGtid();
addShadowIndexToCatalog(table, watershedTxnId);
}

Expand Down Expand Up @@ -651,6 +658,7 @@ boolean publishVersion() {
txnInfo.combinedTxnLog = false;
txnInfo.txnType = TxnTypePB.TXN_NORMAL;
txnInfo.commitTime = finishedTimeMs / 1000;
txnInfo.gtid = watershedGtid;
for (long partitionId : physicalPartitionIndexMap.rowKeySet()) {
long commitVersion = commitVersionMap.get(partitionId);
Map<Long, MaterializedIndex> shadowIndexMap = physicalPartitionIndexMap.row(partitionId);
Expand Down Expand Up @@ -753,6 +761,7 @@ public void replay(AlterJobV2 replayedJob) {
this.indexChange = other.indexChange;
this.indexes = other.indexes;
this.watershedTxnId = other.watershedTxnId;
this.watershedGtid = other.watershedGtid;
this.startTime = other.startTime;
this.commitVersionMap = other.commitVersionMap;
// this.schemaChangeBatchTask = other.schemaChangeBatchTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public abstract class LakeTableSchemaChangeJobBase extends AlterJobV2 {
// The job will wait all transactions before this txn id finished, then send the rollup tasks.
@SerializedName(value = "watershedTxnId")
protected long watershedTxnId = -1;
@SerializedName(value = "watershedGtid")
protected long watershedGtid = -1;

public LakeTableSchemaChangeJobBase(long jobId, JobType jobType, long dbId, long tableId,
String tableName, long timeoutMs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public static TxnInfoPB fromTransactionState(TransactionState state) {
} else {
infoPB.forcePublish = false;
}
infoPB.setGtid(state.getGlobalTransactionId());
return infoPB;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1776,18 +1776,23 @@ void buildPartitions(Database db, OlapTable table, List<PhysicalPartition> parti
numReplicas += partition.storageReplicaCount();
}

TabletTaskExecutor.CreateTabletOption option = new TabletTaskExecutor.CreateTabletOption();
option.setEnableTabletCreationOptimization(table.isCloudNativeTableOrMaterializedView()
&& Config.lake_enable_tablet_creation_optimization);
option.setGtid(GlobalStateMgr.getCurrentState().getGtidGenerator().nextGtid());

try {
GlobalStateMgr.getCurrentState().getConsistencyChecker().addCreatingTableId(table.getId());
if (numReplicas > Config.create_table_max_serial_replicas) {
LOG.info("start to build {} partitions concurrently for table {}.{} with {} replicas",
partitions.size(), db.getFullName(), table.getName(), numReplicas);
TabletTaskExecutor.buildPartitionsConcurrently(
db.getId(), table, partitions, numReplicas, numAliveNodes, warehouseId);
db.getId(), table, partitions, numReplicas, numAliveNodes, warehouseId, option);
} else {
LOG.info("start to build {} partitions sequentially for table {}.{} with {} replicas",
partitions.size(), db.getFullName(), table.getName(), numReplicas);
TabletTaskExecutor.buildPartitionsSequentially(
db.getId(), table, partitions, numReplicas, numAliveNodes, warehouseId);
db.getId(), table, partitions, numReplicas, numAliveNodes, warehouseId, option);
}
} finally {
GlobalStateMgr.getCurrentState().getConsistencyChecker().deleteCreatingTableId(table.getId());
Expand Down
17 changes: 17 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/task/CreateReplicaTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ public enum RecoverySource {
private boolean createSchemaFile = true;
private boolean enableTabletCreationOptimization = false;
private final TTabletSchema tabletSchema;
<<<<<<< HEAD
=======
private long gtid = 0;
private long timeoutMs = -1;
>>>>>>> f977337f8 ([Feature] Support restoring from a cluster snapshot for shared-data mode (part 3, introduce gtid for tablet metadata) (#54326))

private CreateReplicaTask(Builder builder) {
super(null, builder.getNodeId(), TTaskType.CREATE, builder.getDbId(), builder.getTableId(),
Expand All @@ -104,6 +109,7 @@ private CreateReplicaTask(Builder builder) {
this.baseTabletId = builder.getBaseTabletId();
this.recoverySource = builder.getRecoverySource();
this.inRestoreMode = builder.isInRestoreMode();
this.gtid = builder.getGtid();
}

public static Builder newBuilder() {
Expand Down Expand Up @@ -177,6 +183,7 @@ public TCreateTabletReq toThrift() {
createTabletReq.setTablet_type(tabletType);
createTabletReq.setCreate_schema_file(createSchemaFile);
createTabletReq.setEnable_tablet_creation_optimization(enableTabletCreationOptimization);
createTabletReq.setGtid(gtid);
return createTabletReq;
}

Expand Down Expand Up @@ -205,6 +212,7 @@ public static class Builder {
private boolean createSchemaFile = true;
private boolean enableTabletCreationOptimization = false;
private TTabletSchema tabletSchema;
private long gtid = 0;

private Builder() {
}
Expand Down Expand Up @@ -407,6 +415,15 @@ public Builder setTabletSchema(TTabletSchema tabletSchema) {
return this;
}

/**
 * @return the gtid value currently stored in this builder
 */
public long getGtid() {
    return this.gtid;
}

/**
 * Stores the gtid that the built task will carry.
 *
 * @param gtid the gtid value to record in this builder
 * @return this builder, so that calls can be chained
 */
public Builder setGtid(long gtid) {
this.gtid = gtid;
return this;
}

public CreateReplicaTask build() {
Preconditions.checkState(nodeId != INVALID_ID);
Preconditions.checkState(dbId != INVALID_ID);
Expand Down
Loading

0 comments on commit 366efb3

Please sign in to comment.