[Feature] Support restoring from a cluster snapshot for shared-data mode (part 3, introduce gtid for tablet metadata) #54326

Merged
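This PR stamps a global transaction id (gtid) onto lake tablet metadata: the FE draws a gtid from GlobalStateMgr.getCurrentState().getGtidGenerator().nextGtid() and threads it through tablet creation (TCreateTabletReq.gtid) and publish (TxnInfoPB.gtid), so every persisted TabletMetadataPB records the gtid of the change that produced it. The generator itself is not part of this diff; a minimal sketch of the assumed behavior (a process-wide, monotonically increasing counter issued by the FE leader) is:

    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical stand-in for the FE's GtidGenerator, which is not shown in this diff.
    // Assumption: gtids only need to be unique and monotonically increasing.
    class GtidGeneratorSketch {
        private final AtomicLong lastGtid;

        GtidGeneratorSketch(long startGtid) {
            this.lastGtid = new AtomicLong(startGtid);
        }

        // Mirrors the nextGtid() calls added in LakeRollupJob and the other alter jobs below.
        long nextGtid() {
            return lastGtid.incrementAndGet();
        }
    }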
1 change: 1 addition & 0 deletions be/src/storage/lake/tablet_manager.cpp
@@ -167,6 +167,7 @@ Status TabletManager::create_tablet(const TCreateTabletReq& req) {
tablet_metadata_pb->set_version(kInitialVersion);
tablet_metadata_pb->set_next_rowset_id(1);
tablet_metadata_pb->set_cumulative_point(0);
tablet_metadata_pb->set_gtid(req.gtid);

if (req.__isset.enable_persistent_index) {
tablet_metadata_pb->set_enable_persistent_index(req.enable_persistent_index);
5 changes: 4 additions & 1 deletion be/src/storage/lake/transactions.cpp
@@ -151,6 +151,7 @@ StatusOr<TabletMetadataPtr> publish_version(TabletManager* tablet_mgr, int64_t t

auto new_metadata = std::make_shared<TabletMetadataPB>(*metadata);
new_metadata->set_version(new_version);
new_metadata->set_gtid(txns[0].gtid());

RETURN_IF_ERROR(tablet_mgr->put_tablet_metadata(new_metadata));
return new_metadata;
@@ -171,7 +172,6 @@ StatusOr<TabletMetadataPtr> publish_version(TabletManager* tablet_mgr, int64_t t
VLOG(2) << "publish version tablet_id: " << tablet_id << ", txns: " << txns << ", base_version: " << base_version
<< ", new_version: " << new_version;

auto commit_time = txns.back().commit_time();
auto new_metadata_path = tablet_mgr->tablet_metadata_location(tablet_id, new_version);
auto cached_new_metadata = tablet_mgr->metacache()->lookup_tablet_metadata(new_metadata_path);
if (cached_new_metadata != nullptr) {
@@ -222,6 +222,8 @@ StatusOr<TabletMetadataPtr> publish_version(TabletManager* tablet_mgr, int64_t t
std::unique_ptr<TxnLogApplier> log_applier;
std::shared_ptr<TabletMetadataPB> new_metadata;
std::vector<std::string> files_to_delete;
auto commit_time = txns.back().commit_time();
auto gtid = txns.back().gtid();

// Apply txn logs
int64_t alter_version = -1;
@@ -322,6 +324,7 @@ StatusOr<TabletMetadataPtr> publish_version(TabletManager* tablet_mgr, int64_t t
}

new_metadata->set_commit_time(commit_time);
new_metadata->set_gtid(gtid);

auto init_st = log_applier->init();
if (!init_st.ok()) {
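With every published TabletMetadataPB now carrying a gtid (the set_gtid calls above), a cluster-snapshot restore has a cluster-wide ordering key for deciding which metadata version of each tablet belongs to the snapshot. The restore logic itself is not in this PR; the sketch below only illustrates the selection rule this metadata makes possible, under the assumption that restore keeps, per tablet, the newest metadata whose gtid does not exceed the gtid captured in the snapshot:

    import java.util.Comparator;
    import java.util.List;
    import java.util.Optional;

    // Illustrative only: a minimal stand-in for the (version, gtid) pair stored in tablet metadata.
    record TabletMetaRef(long version, long gtid) {}

    class SnapshotRestoreSketch {
        // Assumption: pick the newest metadata version whose gtid <= the snapshot's gtid.
        static Optional<TabletMetaRef> pickForSnapshot(List<TabletMetaRef> versions, long snapshotGtid) {
            return versions.stream()
                    .filter(m -> m.gtid() <= snapshotGtid)
                    .max(Comparator.comparingLong(TabletMetaRef::version));
        }
    }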
24 changes: 21 additions & 3 deletions fe/fe-core/src/main/java/com/starrocks/alter/LakeRollupJob.java
@@ -147,19 +147,23 @@ public LakeRollupJob() {

@Override
protected void runPendingJob() throws AlterCancelException {
boolean enableTabletCreationOptimization = Config.lake_enable_tablet_creation_optimization;
long numTablets = 0;
AgentBatchTask batchTask = new AgentBatchTask();
MarkedCountDownLatch<Long, Long> countDownLatch;
try (ReadLockedDatabase db = getReadLockedDatabase(dbId)) {
LakeTable table = getTableOrThrow(db, tableId);
Preconditions.checkState(table.getState() == OlapTable.OlapTableState.ROLLUP);
MaterializedIndexMeta index = table.getIndexMetaByIndexId(table.getBaseIndexId());

for (MaterializedIndex rollupIdx : physicalPartitionIdToRollupIndex.values()) {
numTablets += rollupIdx.getTablets().size();
if (enableTabletCreationOptimization) {
numTablets = physicalPartitionIdToRollupIndex.size();
} else {
numTablets = physicalPartitionIdToRollupIndex.values().stream().map(MaterializedIndex::getTablets)
.mapToLong(List::size).sum();
}
countDownLatch = new MarkedCountDownLatch<>((int) numTablets);

long gtid = getNextGtid();
for (Map.Entry<Long, MaterializedIndex> entry : this.physicalPartitionIdToRollupIndex.entrySet()) {
long partitionId = entry.getKey();
PhysicalPartition partition = table.getPhysicalPartition(partitionId);
@@ -213,11 +217,17 @@ protected void runPendingJob() throws AlterCancelException {
.setCompressionType(table.getCompressionType())
.setCreateSchemaFile(createSchemaFile)
.setTabletSchema(tabletSchema)
.setEnableTabletCreationOptimization(enableTabletCreationOptimization)
.setGtid(gtid)
.build();

// For each partition, the schema file is created only when the first Tablet is created
createSchemaFile = false;
batchTask.addTask(task);

if (enableTabletCreationOptimization) {
break;
}
} // end for rollupTablets
}
}
@@ -231,6 +241,7 @@ protected void runPendingJob() throws AlterCancelException {
throw new IllegalStateException("Table State doesn't equal to ROLLUP, it is " + table.getState() + ".");
}
watershedTxnId = getNextTransactionId();
watershedGtid = getNextGtid();
addRollIndexToCatalog(table);
}

@@ -493,6 +504,7 @@ public void replay(AlterJobV2 lakeRollupJob) {
this.timeoutMs = other.timeoutMs;

this.watershedTxnId = other.watershedTxnId;
this.watershedGtid = other.watershedGtid;
this.commitVersionMap = other.commitVersionMap;

this.physicalPartitionIdToBaseRollupTabletIdMap = other.physicalPartitionIdToBaseRollupTabletIdMap;
@@ -582,6 +594,10 @@ public static long peekNextTransactionId() {
return GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().getTransactionIDGenerator().peekNextTransactionId();
}

public static long getNextGtid() {
return GlobalStateMgr.getCurrentState().getGtidGenerator().nextGtid();
}

void addRollIndexToCatalog(@NotNull LakeTable tbl) {
for (Partition partition : tbl.getPartitions()) {
for (PhysicalPartition physicalPartition : partition.getSubPartitions()) {
@@ -657,6 +673,7 @@ private boolean publishVersion() {
rollUpTxnInfo.combinedTxnLog = false;
rollUpTxnInfo.commitTime = finishedTimeMs / 1000;
rollUpTxnInfo.txnType = TxnTypePB.TXN_NORMAL;
rollUpTxnInfo.gtid = watershedGtid;
// publish rollup tablets
Utils.publishVersion(physicalPartitionIdToRollupIndex.get(partitionId).getTablets(), rollUpTxnInfo,
1, commitVersion, warehouseId);
@@ -666,6 +683,7 @@ private boolean publishVersion() {
originTxnInfo.combinedTxnLog = false;
originTxnInfo.commitTime = finishedTimeMs / 1000;
originTxnInfo.txnType = TxnTypePB.TXN_EMPTY;
originTxnInfo.gtid = watershedGtid;
// publish origin tablets
Utils.publishVersion(allOtherPartitionTablets, originTxnInfo, commitVersion - 1,
commitVersion, warehouseId);
@@ -64,6 +64,8 @@ public abstract class LakeTableAlterMetaJobBase extends AlterJobV2 {
private static final Logger LOG = LogManager.getLogger(LakeTableAlterMetaJobBase.class);
@SerializedName(value = "watershedTxnId")
private long watershedTxnId = -1;
@SerializedName(value = "watershedGtid")
private long watershedGtid = -1;
// PhysicalPartitionId -> indexId -> MaterializedIndex
@SerializedName(value = "partitionIndexMap")
private Table<Long, Long, MaterializedIndex> physicalPartitionIndexMap = HashBasedTable.create();
@@ -107,6 +109,7 @@ protected void runPendingJob() throws AlterCancelException {
if (this.watershedTxnId == -1) {
this.watershedTxnId = globalStateMgr.getGlobalTransactionMgr().getTransactionIDGenerator()
.getNextTransactionId();
this.watershedGtid = globalStateMgr.getGtidGenerator().nextGtid();
GlobalStateMgr.getCurrentState().getEditLog().logAlterJob(this);
}

@@ -259,6 +262,7 @@ boolean publishVersion() {
txnInfo.combinedTxnLog = false;
txnInfo.commitTime = finishedTimeMs / 1000;
txnInfo.txnType = TxnTypePB.TXN_NORMAL;
txnInfo.gtid = watershedGtid;
for (long partitionId : physicalPartitionIndexMap.rowKeySet()) {
long commitVersion = commitVersionMap.get(partitionId);
Map<Long, MaterializedIndex> dirtyIndexMap = physicalPartitionIndexMap.row(partitionId);
@@ -475,6 +479,7 @@ public void replay(AlterJobV2 replayedJob) {

this.physicalPartitionIndexMap = other.physicalPartitionIndexMap;
this.watershedTxnId = other.watershedTxnId;
this.watershedGtid = other.watershedGtid;
this.commitVersionMap = other.commitVersionMap;

restoreState(other);
@@ -291,6 +291,10 @@ public static long peekNextTransactionId() {
return GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().getTransactionIDGenerator().peekNextTransactionId();
}

public static long getNextGtid() {
return GlobalStateMgr.getCurrentState().getGtidGenerator().nextGtid();
}

@VisibleForTesting
public void setIsCancelling(boolean isCancelling) {
this.isCancelling.set(isCancelling);
@@ -320,7 +324,7 @@ protected void runPendingJob() throws AlterCancelException {
try (ReadLockedDatabase db = getReadLockedDatabase(dbId)) {
OlapTable table = getTableOrThrow(db, tableId);
Preconditions.checkState(table.getState() == OlapTable.OlapTableState.SCHEMA_CHANGE);
MaterializedIndexMeta indexMeta = table.getIndexMetaByIndexId(table.getBaseIndexId());

if (enableTabletCreationOptimization) {
numTablets = physicalPartitionIndexMap.size();
} else {
@@ -330,6 +334,7 @@ protected void runPendingJob() throws AlterCancelException {
countDownLatch = new MarkedCountDownLatch<>((int) numTablets);
createReplicaLatch = countDownLatch;
long baseIndexId = table.getBaseIndexId();
long gtid = getNextGtid();
for (long physicalPartitionId : physicalPartitionIndexMap.rowKeySet()) {
PhysicalPartition physicalPartition = table.getPhysicalPartition(physicalPartitionId);
Preconditions.checkState(physicalPartition != null);
@@ -389,6 +394,7 @@ protected void runPendingJob() throws AlterCancelException {
.setCreateSchemaFile(createSchemaFile)
.setTabletSchema(tabletSchema)
.setEnableTabletCreationOptimization(enableTabletCreationOptimization)
.setGtid(gtid)
.build();
// For each partition, the schema file is created only when the first Tablet is created
createSchemaFile = false;
@@ -412,6 +418,7 @@ protected void runPendingJob() throws AlterCancelException {
OlapTable table = getTableOrThrow(db, tableId);
Preconditions.checkState(table.getState() == OlapTable.OlapTableState.SCHEMA_CHANGE);
watershedTxnId = getNextTransactionId();
watershedGtid = getNextGtid();
addShadowIndexToCatalog(table, watershedTxnId);
}

@@ -651,6 +658,7 @@ boolean publishVersion() {
txnInfo.combinedTxnLog = false;
txnInfo.txnType = TxnTypePB.TXN_NORMAL;
txnInfo.commitTime = finishedTimeMs / 1000;
txnInfo.gtid = watershedGtid;
for (long partitionId : physicalPartitionIndexMap.rowKeySet()) {
long commitVersion = commitVersionMap.get(partitionId);
Map<Long, MaterializedIndex> shadowIndexMap = physicalPartitionIndexMap.row(partitionId);
Expand Down Expand Up @@ -753,6 +761,7 @@ public void replay(AlterJobV2 replayedJob) {
this.indexChange = other.indexChange;
this.indexes = other.indexes;
this.watershedTxnId = other.watershedTxnId;
this.watershedGtid = other.watershedGtid;
this.startTime = other.startTime;
this.commitVersionMap = other.commitVersionMap;
// this.schemaChangeBatchTask = other.schemaChangeBatchTask;
@@ -45,6 +45,8 @@ public abstract class LakeTableSchemaChangeJobBase extends AlterJobV2 {
// The job will wait all transactions before this txn id finished, then send the rollup tasks.
@SerializedName(value = "watershedTxnId")
protected long watershedTxnId = -1;
@SerializedName(value = "watershedGtid")
protected long watershedGtid = -1;

public LakeTableSchemaChangeJobBase(long jobId, JobType jobType, long dbId, long tableId,
String tableName, long timeoutMs) {
@@ -33,6 +33,7 @@ public static TxnInfoPB fromTransactionState(TransactionState state) {
} else {
infoPB.forcePublish = false;
}
infoPB.setGtid(state.getGlobalTransactionId());
return infoPB;
}
}
@@ -1791,18 +1791,23 @@ void buildPartitions(Database db, OlapTable table, List<PhysicalPartition> parti
numReplicas += partition.storageReplicaCount();
}

TabletTaskExecutor.CreateTabletOption option = new TabletTaskExecutor.CreateTabletOption();
option.setEnableTabletCreationOptimization(table.isCloudNativeTableOrMaterializedView()
&& Config.lake_enable_tablet_creation_optimization);
option.setGtid(GlobalStateMgr.getCurrentState().getGtidGenerator().nextGtid());

try {
GlobalStateMgr.getCurrentState().getConsistencyChecker().addCreatingTableId(table.getId());
if (numReplicas > Config.create_table_max_serial_replicas) {
LOG.info("start to build {} partitions concurrently for table {}.{} with {} replicas",
partitions.size(), db.getFullName(), table.getName(), numReplicas);
TabletTaskExecutor.buildPartitionsConcurrently(
db.getId(), table, partitions, numReplicas, numAliveNodes, warehouseId);
db.getId(), table, partitions, numReplicas, numAliveNodes, warehouseId, option);
} else {
LOG.info("start to build {} partitions sequentially for table {}.{} with {} replicas",
partitions.size(), db.getFullName(), table.getName(), numReplicas);
TabletTaskExecutor.buildPartitionsSequentially(
db.getId(), table, partitions, numReplicas, numAliveNodes, warehouseId);
db.getId(), table, partitions, numReplicas, numAliveNodes, warehouseId, option);
}
} finally {
GlobalStateMgr.getCurrentState().getConsistencyChecker().deleteCreatingTableId(table.getId());
13 changes: 13 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/task/CreateReplicaTask.java
@@ -84,6 +84,7 @@ public enum RecoverySource {
private boolean createSchemaFile = true;
private boolean enableTabletCreationOptimization = false;
private final TTabletSchema tabletSchema;
private long gtid = 0;
private long timeoutMs = -1;

private CreateReplicaTask(Builder builder) {
@@ -105,6 +106,7 @@ private CreateReplicaTask(Builder builder) {
this.baseTabletId = builder.getBaseTabletId();
this.recoverySource = builder.getRecoverySource();
this.inRestoreMode = builder.isInRestoreMode();
this.gtid = builder.getGtid();
}

public static Builder newBuilder() {
@@ -182,6 +184,7 @@ public TCreateTabletReq toThrift() {
createTabletReq.setTablet_type(tabletType);
createTabletReq.setCreate_schema_file(createSchemaFile);
createTabletReq.setEnable_tablet_creation_optimization(enableTabletCreationOptimization);
createTabletReq.setGtid(gtid);
return createTabletReq;
}

@@ -210,6 +213,7 @@ public static class Builder {
private boolean createSchemaFile = true;
private boolean enableTabletCreationOptimization = false;
private TTabletSchema tabletSchema;
private long gtid = 0;

private Builder() {
}
@@ -412,6 +416,15 @@ public Builder setTabletSchema(TTabletSchema tabletSchema) {
return this;
}

public long getGtid() {
return gtid;
}

public Builder setGtid(long gtid) {
this.gtid = gtid;
return this;
}

public CreateReplicaTask build() {
Preconditions.checkState(nodeId != INVALID_ID);
Preconditions.checkState(dbId != INVALID_ID);
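Taken together, an FE alter or create-table job now threads the gtid into the tablet-creation RPC through this builder. The fragment below is a compressed usage sketch using only builder calls that appear in this diff; the variables (batchTask, tabletSchema, createSchemaFile, enableTabletCreationOptimization) stand in for the job's local state, and the other mandatory builder fields (node, db, table, partition and tablet ids, version, storage medium, and so on) are elided:

    // Sketch only: a single gtid is drawn up front and stamped on every CreateReplicaTask
    // of the batch, so each new tablet's initial metadata records the same gtid.
    long gtid = GlobalStateMgr.getCurrentState().getGtidGenerator().nextGtid();
    CreateReplicaTask task = CreateReplicaTask.newBuilder()
            .setTabletSchema(tabletSchema)
            .setCreateSchemaFile(createSchemaFile)          // true only for the first tablet of a partition
            .setEnableTabletCreationOptimization(enableTabletCreationOptimization)
            .setGtid(gtid)                                  // new in this PR
            .build();
    batchTask.addTask(task);
    // toThrift() forwards the gtid as TCreateTabletReq.gtid; the BE then writes it into
    // the new tablet's TabletMetadataPB (see tablet_manager.cpp above).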