From 795446b2be1f8d7d4c35180d2150505d2b6dfbb5 Mon Sep 17 00:00:00 2001 From: xiangguangyxg Date: Mon, 23 Dec 2024 20:07:41 +0800 Subject: [PATCH] [Feature] Support restoring from a cluster snapshot for shared-data mode (part 3, introduce gtid for tablet metadata) Signed-off-by: xiangguangyxg --- be/src/storage/lake/tablet_manager.cpp | 1 + be/src/storage/lake/transactions.cpp | 5 +- .../com/starrocks/alter/LakeRollupJob.java | 24 +++++++-- .../alter/LakeTableAlterMetaJobBase.java | 5 ++ .../alter/LakeTableSchemaChangeJob.java | 11 +++- .../alter/LakeTableSchemaChangeJobBase.java | 2 + .../com/starrocks/lake/TxnInfoHelper.java | 1 + .../com/starrocks/server/LocalMetastore.java | 9 +++- .../com/starrocks/task/CreateReplicaTask.java | 13 +++++ .../starrocks/task/TabletTaskExecutor.java | 53 +++++++++++++------ gensrc/proto/lake_types.proto | 3 ++ gensrc/thrift/AgentService.thrift | 2 + 12 files changed, 105 insertions(+), 24 deletions(-) diff --git a/be/src/storage/lake/tablet_manager.cpp b/be/src/storage/lake/tablet_manager.cpp index 8e0a57140e12a..789e4da22c75d 100644 --- a/be/src/storage/lake/tablet_manager.cpp +++ b/be/src/storage/lake/tablet_manager.cpp @@ -167,6 +167,7 @@ Status TabletManager::create_tablet(const TCreateTabletReq& req) { tablet_metadata_pb->set_version(kInitialVersion); tablet_metadata_pb->set_next_rowset_id(1); tablet_metadata_pb->set_cumulative_point(0); + tablet_metadata_pb->set_gtid(req.gtid); if (req.__isset.enable_persistent_index) { tablet_metadata_pb->set_enable_persistent_index(req.enable_persistent_index); diff --git a/be/src/storage/lake/transactions.cpp b/be/src/storage/lake/transactions.cpp index c003bbe2ab55d..6018539dd1a08 100644 --- a/be/src/storage/lake/transactions.cpp +++ b/be/src/storage/lake/transactions.cpp @@ -151,6 +151,7 @@ StatusOr publish_version(TabletManager* tablet_mgr, int64_t t auto new_metadata = std::make_shared(*metadata); new_metadata->set_version(new_version); + new_metadata->set_gtid(txns[0].gtid()); RETURN_IF_ERROR(tablet_mgr->put_tablet_metadata(new_metadata)); return new_metadata; @@ -171,7 +172,6 @@ StatusOr publish_version(TabletManager* tablet_mgr, int64_t t VLOG(2) << "publish version tablet_id: " << tablet_id << ", txns: " << txns << ", base_version: " << base_version << ", new_version: " << new_version; - auto commit_time = txns.back().commit_time(); auto new_metadata_path = tablet_mgr->tablet_metadata_location(tablet_id, new_version); auto cached_new_metadata = tablet_mgr->metacache()->lookup_tablet_metadata(new_metadata_path); if (cached_new_metadata != nullptr) { @@ -222,6 +222,8 @@ StatusOr publish_version(TabletManager* tablet_mgr, int64_t t std::unique_ptr log_applier; std::shared_ptr new_metadata; std::vector files_to_delete; + auto commit_time = txns.back().commit_time(); + auto gtid = txns.back().gtid(); // Apply txn logs int64_t alter_version = -1; @@ -322,6 +324,7 @@ StatusOr publish_version(TabletManager* tablet_mgr, int64_t t } new_metadata->set_commit_time(commit_time); + new_metadata->set_gtid(gtid); auto init_st = log_applier->init(); if (!init_st.ok()) { diff --git a/fe/fe-core/src/main/java/com/starrocks/alter/LakeRollupJob.java b/fe/fe-core/src/main/java/com/starrocks/alter/LakeRollupJob.java index eaa1edb53524d..fc045772384af 100644 --- a/fe/fe-core/src/main/java/com/starrocks/alter/LakeRollupJob.java +++ b/fe/fe-core/src/main/java/com/starrocks/alter/LakeRollupJob.java @@ -147,19 +147,23 @@ public LakeRollupJob() { @Override protected void runPendingJob() throws AlterCancelException { + boolean enableTabletCreationOptimization = Config.lake_enable_tablet_creation_optimization; long numTablets = 0; AgentBatchTask batchTask = new AgentBatchTask(); MarkedCountDownLatch countDownLatch; try (ReadLockedDatabase db = getReadLockedDatabase(dbId)) { LakeTable table = getTableOrThrow(db, tableId); Preconditions.checkState(table.getState() == OlapTable.OlapTableState.ROLLUP); - MaterializedIndexMeta index = table.getIndexMetaByIndexId(table.getBaseIndexId()); - for (MaterializedIndex rollupIdx : physicalPartitionIdToRollupIndex.values()) { - numTablets += rollupIdx.getTablets().size(); + if (enableTabletCreationOptimization) { + numTablets = physicalPartitionIdToRollupIndex.size(); + } else { + numTablets = physicalPartitionIdToRollupIndex.values().stream().map(MaterializedIndex::getTablets) + .mapToLong(List::size).sum(); } countDownLatch = new MarkedCountDownLatch<>((int) numTablets); + long gtid = getNextGtid(); for (Map.Entry entry : this.physicalPartitionIdToRollupIndex.entrySet()) { long partitionId = entry.getKey(); PhysicalPartition partition = table.getPhysicalPartition(partitionId); @@ -213,11 +217,17 @@ protected void runPendingJob() throws AlterCancelException { .setCompressionType(table.getCompressionType()) .setCreateSchemaFile(createSchemaFile) .setTabletSchema(tabletSchema) + .setEnableTabletCreationOptimization(enableTabletCreationOptimization) + .setGtid(gtid) .build(); // For each partition, the schema file is created only when the first Tablet is created createSchemaFile = false; batchTask.addTask(task); + + if (enableTabletCreationOptimization) { + break; + } } // end for rollupTablets } } @@ -231,6 +241,7 @@ protected void runPendingJob() throws AlterCancelException { throw new IllegalStateException("Table State doesn't equal to ROLLUP, it is " + table.getState() + "."); } watershedTxnId = getNextTransactionId(); + watershedGtid = getNextGtid(); addRollIndexToCatalog(table); } @@ -493,6 +504,7 @@ public void replay(AlterJobV2 lakeRollupJob) { this.timeoutMs = other.timeoutMs; this.watershedTxnId = other.watershedTxnId; + this.watershedGtid = other.watershedGtid; this.commitVersionMap = other.commitVersionMap; this.physicalPartitionIdToBaseRollupTabletIdMap = other.physicalPartitionIdToBaseRollupTabletIdMap; @@ -582,6 +594,10 @@ public static long peekNextTransactionId() { return GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().getTransactionIDGenerator().peekNextTransactionId(); } + public static long getNextGtid() { + return GlobalStateMgr.getCurrentState().getGtidGenerator().nextGtid(); + } + void addRollIndexToCatalog(@NotNull LakeTable tbl) { for (Partition partition : tbl.getPartitions()) { for (PhysicalPartition physicalPartition : partition.getSubPartitions()) { @@ -657,6 +673,7 @@ private boolean publishVersion() { rollUpTxnInfo.combinedTxnLog = false; rollUpTxnInfo.commitTime = finishedTimeMs / 1000; rollUpTxnInfo.txnType = TxnTypePB.TXN_NORMAL; + rollUpTxnInfo.gtid = watershedGtid; // publish rollup tablets Utils.publishVersion(physicalPartitionIdToRollupIndex.get(partitionId).getTablets(), rollUpTxnInfo, 1, commitVersion, warehouseId); @@ -666,6 +683,7 @@ private boolean publishVersion() { originTxnInfo.combinedTxnLog = false; originTxnInfo.commitTime = finishedTimeMs / 1000; originTxnInfo.txnType = TxnTypePB.TXN_EMPTY; + originTxnInfo.gtid = watershedGtid; // publish origin tablets Utils.publishVersion(allOtherPartitionTablets, originTxnInfo, commitVersion - 1, commitVersion, warehouseId); diff --git a/fe/fe-core/src/main/java/com/starrocks/alter/LakeTableAlterMetaJobBase.java b/fe/fe-core/src/main/java/com/starrocks/alter/LakeTableAlterMetaJobBase.java index 553f639deca36..e52de526ccbdd 100644 --- a/fe/fe-core/src/main/java/com/starrocks/alter/LakeTableAlterMetaJobBase.java +++ b/fe/fe-core/src/main/java/com/starrocks/alter/LakeTableAlterMetaJobBase.java @@ -64,6 +64,8 @@ public abstract class LakeTableAlterMetaJobBase extends AlterJobV2 { private static final Logger LOG = LogManager.getLogger(LakeTableAlterMetaJobBase.class); @SerializedName(value = "watershedTxnId") private long watershedTxnId = -1; + @SerializedName(value = "watershedGtid") + private long watershedGtid = -1; // PhysicalPartitionId -> indexId -> MaterializedIndex @SerializedName(value = "partitionIndexMap") private Table physicalPartitionIndexMap = HashBasedTable.create(); @@ -107,6 +109,7 @@ protected void runPendingJob() throws AlterCancelException { if (this.watershedTxnId == -1) { this.watershedTxnId = globalStateMgr.getGlobalTransactionMgr().getTransactionIDGenerator() .getNextTransactionId(); + this.watershedGtid = globalStateMgr.getGtidGenerator().nextGtid(); GlobalStateMgr.getCurrentState().getEditLog().logAlterJob(this); } @@ -259,6 +262,7 @@ boolean publishVersion() { txnInfo.combinedTxnLog = false; txnInfo.commitTime = finishedTimeMs / 1000; txnInfo.txnType = TxnTypePB.TXN_NORMAL; + txnInfo.gtid = watershedGtid; for (long partitionId : physicalPartitionIndexMap.rowKeySet()) { long commitVersion = commitVersionMap.get(partitionId); Map dirtyIndexMap = physicalPartitionIndexMap.row(partitionId); @@ -475,6 +479,7 @@ public void replay(AlterJobV2 replayedJob) { this.physicalPartitionIndexMap = other.physicalPartitionIndexMap; this.watershedTxnId = other.watershedTxnId; + this.watershedGtid = other.watershedGtid; this.commitVersionMap = other.commitVersionMap; restoreState(other); diff --git a/fe/fe-core/src/main/java/com/starrocks/alter/LakeTableSchemaChangeJob.java b/fe/fe-core/src/main/java/com/starrocks/alter/LakeTableSchemaChangeJob.java index 7d593ca1c6bb6..d890635d6ee39 100644 --- a/fe/fe-core/src/main/java/com/starrocks/alter/LakeTableSchemaChangeJob.java +++ b/fe/fe-core/src/main/java/com/starrocks/alter/LakeTableSchemaChangeJob.java @@ -291,6 +291,10 @@ public static long peekNextTransactionId() { return GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().getTransactionIDGenerator().peekNextTransactionId(); } + public static long getNextGtid() { + return GlobalStateMgr.getCurrentState().getGtidGenerator().nextGtid(); + } + @VisibleForTesting public void setIsCancelling(boolean isCancelling) { this.isCancelling.set(isCancelling); @@ -320,7 +324,7 @@ protected void runPendingJob() throws AlterCancelException { try (ReadLockedDatabase db = getReadLockedDatabase(dbId)) { OlapTable table = getTableOrThrow(db, tableId); Preconditions.checkState(table.getState() == OlapTable.OlapTableState.SCHEMA_CHANGE); - MaterializedIndexMeta indexMeta = table.getIndexMetaByIndexId(table.getBaseIndexId()); + if (enableTabletCreationOptimization) { numTablets = physicalPartitionIndexMap.size(); } else { @@ -330,6 +334,7 @@ protected void runPendingJob() throws AlterCancelException { countDownLatch = new MarkedCountDownLatch<>((int) numTablets); createReplicaLatch = countDownLatch; long baseIndexId = table.getBaseIndexId(); + long gtid = getNextGtid(); for (long physicalPartitionId : physicalPartitionIndexMap.rowKeySet()) { PhysicalPartition physicalPartition = table.getPhysicalPartition(physicalPartitionId); Preconditions.checkState(physicalPartition != null); @@ -389,6 +394,7 @@ protected void runPendingJob() throws AlterCancelException { .setCreateSchemaFile(createSchemaFile) .setTabletSchema(tabletSchema) .setEnableTabletCreationOptimization(enableTabletCreationOptimization) + .setGtid(gtid) .build(); // For each partition, the schema file is created only when the first Tablet is created createSchemaFile = false; @@ -412,6 +418,7 @@ protected void runPendingJob() throws AlterCancelException { OlapTable table = getTableOrThrow(db, tableId); Preconditions.checkState(table.getState() == OlapTable.OlapTableState.SCHEMA_CHANGE); watershedTxnId = getNextTransactionId(); + watershedGtid = getNextGtid(); addShadowIndexToCatalog(table, watershedTxnId); } @@ -651,6 +658,7 @@ boolean publishVersion() { txnInfo.combinedTxnLog = false; txnInfo.txnType = TxnTypePB.TXN_NORMAL; txnInfo.commitTime = finishedTimeMs / 1000; + txnInfo.gtid = watershedGtid; for (long partitionId : physicalPartitionIndexMap.rowKeySet()) { long commitVersion = commitVersionMap.get(partitionId); Map shadowIndexMap = physicalPartitionIndexMap.row(partitionId); @@ -753,6 +761,7 @@ public void replay(AlterJobV2 replayedJob) { this.indexChange = other.indexChange; this.indexes = other.indexes; this.watershedTxnId = other.watershedTxnId; + this.watershedGtid = other.watershedGtid; this.startTime = other.startTime; this.commitVersionMap = other.commitVersionMap; // this.schemaChangeBatchTask = other.schemaChangeBatchTask; diff --git a/fe/fe-core/src/main/java/com/starrocks/alter/LakeTableSchemaChangeJobBase.java b/fe/fe-core/src/main/java/com/starrocks/alter/LakeTableSchemaChangeJobBase.java index 1163c9f2a5f6d..a770517579b4a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/alter/LakeTableSchemaChangeJobBase.java +++ b/fe/fe-core/src/main/java/com/starrocks/alter/LakeTableSchemaChangeJobBase.java @@ -45,6 +45,8 @@ public abstract class LakeTableSchemaChangeJobBase extends AlterJobV2 { // The job will wait all transactions before this txn id finished, then send the rollup tasks. @SerializedName(value = "watershedTxnId") protected long watershedTxnId = -1; + @SerializedName(value = "watershedGtid") + protected long watershedGtid = -1; public LakeTableSchemaChangeJobBase(long jobId, JobType jobType, long dbId, long tableId, String tableName, long timeoutMs) { diff --git a/fe/fe-core/src/main/java/com/starrocks/lake/TxnInfoHelper.java b/fe/fe-core/src/main/java/com/starrocks/lake/TxnInfoHelper.java index 4341dabfe3c4b..b45b3efa848b3 100644 --- a/fe/fe-core/src/main/java/com/starrocks/lake/TxnInfoHelper.java +++ b/fe/fe-core/src/main/java/com/starrocks/lake/TxnInfoHelper.java @@ -33,6 +33,7 @@ public static TxnInfoPB fromTransactionState(TransactionState state) { } else { infoPB.forcePublish = false; } + infoPB.setGtid(state.getGlobalTransactionId()); return infoPB; } } diff --git a/fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java b/fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java index f2565d7f8376f..4eedb8163b722 100644 --- a/fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java +++ b/fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java @@ -1791,18 +1791,23 @@ void buildPartitions(Database db, OlapTable table, List parti numReplicas += partition.storageReplicaCount(); } + TabletTaskExecutor.CreateTabletOption option = new TabletTaskExecutor.CreateTabletOption(); + option.setEnableTabletCreationOptimization(table.isCloudNativeTableOrMaterializedView() + && Config.lake_enable_tablet_creation_optimization); + option.setGtid(GlobalStateMgr.getCurrentState().getGtidGenerator().nextGtid()); + try { GlobalStateMgr.getCurrentState().getConsistencyChecker().addCreatingTableId(table.getId()); if (numReplicas > Config.create_table_max_serial_replicas) { LOG.info("start to build {} partitions concurrently for table {}.{} with {} replicas", partitions.size(), db.getFullName(), table.getName(), numReplicas); TabletTaskExecutor.buildPartitionsConcurrently( - db.getId(), table, partitions, numReplicas, numAliveNodes, warehouseId); + db.getId(), table, partitions, numReplicas, numAliveNodes, warehouseId, option); } else { LOG.info("start to build {} partitions sequentially for table {}.{} with {} replicas", partitions.size(), db.getFullName(), table.getName(), numReplicas); TabletTaskExecutor.buildPartitionsSequentially( - db.getId(), table, partitions, numReplicas, numAliveNodes, warehouseId); + db.getId(), table, partitions, numReplicas, numAliveNodes, warehouseId, option); } } finally { GlobalStateMgr.getCurrentState().getConsistencyChecker().deleteCreatingTableId(table.getId()); diff --git a/fe/fe-core/src/main/java/com/starrocks/task/CreateReplicaTask.java b/fe/fe-core/src/main/java/com/starrocks/task/CreateReplicaTask.java index 6d4f921ca8866..fb8c7c813f74e 100644 --- a/fe/fe-core/src/main/java/com/starrocks/task/CreateReplicaTask.java +++ b/fe/fe-core/src/main/java/com/starrocks/task/CreateReplicaTask.java @@ -84,6 +84,7 @@ public enum RecoverySource { private boolean createSchemaFile = true; private boolean enableTabletCreationOptimization = false; private final TTabletSchema tabletSchema; + private long gtid = 0; private long timeoutMs = -1; private CreateReplicaTask(Builder builder) { @@ -105,6 +106,7 @@ private CreateReplicaTask(Builder builder) { this.baseTabletId = builder.getBaseTabletId(); this.recoverySource = builder.getRecoverySource(); this.inRestoreMode = builder.isInRestoreMode(); + this.gtid = builder.getGtid(); } public static Builder newBuilder() { @@ -182,6 +184,7 @@ public TCreateTabletReq toThrift() { createTabletReq.setTablet_type(tabletType); createTabletReq.setCreate_schema_file(createSchemaFile); createTabletReq.setEnable_tablet_creation_optimization(enableTabletCreationOptimization); + createTabletReq.setGtid(gtid); return createTabletReq; } @@ -210,6 +213,7 @@ public static class Builder { private boolean createSchemaFile = true; private boolean enableTabletCreationOptimization = false; private TTabletSchema tabletSchema; + private long gtid = 0; private Builder() { } @@ -412,6 +416,15 @@ public Builder setTabletSchema(TTabletSchema tabletSchema) { return this; } + public long getGtid() { + return gtid; + } + + public Builder setGtid(long gtid) { + this.gtid = gtid; + return this; + } + public CreateReplicaTask build() { Preconditions.checkState(nodeId != INVALID_ID); Preconditions.checkState(dbId != INVALID_ID); diff --git a/fe/fe-core/src/main/java/com/starrocks/task/TabletTaskExecutor.java b/fe/fe-core/src/main/java/com/starrocks/task/TabletTaskExecutor.java index e37d8d4e6a668..cb211c8443002 100644 --- a/fe/fe-core/src/main/java/com/starrocks/task/TabletTaskExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/task/TabletTaskExecutor.java @@ -59,21 +59,41 @@ public class TabletTaskExecutor { private static final Logger LOG = LogManager.getLogger(TabletTaskExecutor.class); + public static class CreateTabletOption { + private boolean enableTabletCreationOptimization; + private long gtid; + + public boolean isEnableTabletCreationOptimization() { + return enableTabletCreationOptimization; + } + + public void setEnableTabletCreationOptimization(boolean enableTabletCreationOptimization) { + this.enableTabletCreationOptimization = enableTabletCreationOptimization; + } + + public long getGtid() { + return gtid; + } + + public void setGtid(long gtid) { + this.gtid = gtid; + } + } + public static void buildPartitionsSequentially(long dbId, OlapTable table, List partitions, int numReplicas, - int numBackends, long warehouseId) throws DdlException { + int numBackends, long warehouseId, + CreateTabletOption option) throws DdlException { // Try to bundle at least 200 CreateReplicaTask's in a single AgentBatchTask. // The number 200 is just an experiment value that seems to work without obvious problems, feel free to // change it if you have a better choice. long start = System.currentTimeMillis(); int avgReplicasPerPartition = numReplicas / partitions.size(); int partitionGroupSize = Math.max(1, numBackends * 200 / Math.max(1, avgReplicasPerPartition)); - boolean enableTabletCreationOptimization = table.isCloudNativeTableOrMaterializedView() - && Config.lake_enable_tablet_creation_optimization; for (int i = 0; i < partitions.size(); i += partitionGroupSize) { int endIndex = Math.min(partitions.size(), i + partitionGroupSize); List tasks = buildCreateReplicaTasks(dbId, table, partitions.subList(i, endIndex), - warehouseId, enableTabletCreationOptimization); + warehouseId, option); int partitionCount = endIndex - i; int indexCountPerPartition = partitions.get(i).getMaterializedIndices(MaterializedIndex.IndexExtState.VISIBLE).size(); int timeout = Config.tablet_create_timeout_second * countMaxTasksPerBackend(tasks); @@ -99,16 +119,15 @@ public static void buildPartitionsSequentially(long dbId, OlapTable table, List< public static void buildPartitionsConcurrently(long dbId, OlapTable table, List partitions, int numReplicas, - int numBackends, long warehouseId) throws DdlException { + int numBackends, long warehouseId, + CreateTabletOption option) throws DdlException { long start = System.currentTimeMillis(); int timeout = Math.max(1, numReplicas / numBackends) * Config.tablet_create_timeout_second; int numIndexes = partitions.stream().mapToInt( partition -> partition.getMaterializedIndices(MaterializedIndex.IndexExtState.VISIBLE).size()).sum(); int maxTimeout = numIndexes * Config.max_create_table_timeout_second; long maxWaitTimeSeconds = Math.min(timeout, maxTimeout); - boolean enableTabletCreationOptimization = table.isCloudNativeTableOrMaterializedView() - && Config.lake_enable_tablet_creation_optimization; - if (enableTabletCreationOptimization) { + if (option.isEnableTabletCreationOptimization()) { numReplicas = numIndexes; } MarkedCountDownLatch countDownLatch = new MarkedCountDownLatch<>(numReplicas); @@ -122,7 +141,7 @@ public static void buildPartitionsConcurrently(long dbId, OlapTable table, List< break; } List tasks = buildCreateReplicaTasks(dbId, table, partition, warehouseId, - enableTabletCreationOptimization); + option); for (CreateReplicaTask task : tasks) { List signatures = taskSignatures.computeIfAbsent(task.getBackendId(), k -> new ArrayList<>()); @@ -173,24 +192,23 @@ public static void buildPartitionsConcurrently(long dbId, OlapTable table, List< } private static List buildCreateReplicaTasks(long dbId, OlapTable table, List partitions, - long warehouseId, boolean enableTabletCreationOptimization) + long warehouseId, CreateTabletOption option) throws DdlException { List tasks = new ArrayList<>(); for (PhysicalPartition partition : partitions) { tasks.addAll( - buildCreateReplicaTasks(dbId, table, partition, warehouseId, enableTabletCreationOptimization)); + buildCreateReplicaTasks(dbId, table, partition, warehouseId, option)); } return tasks; } private static List buildCreateReplicaTasks(long dbId, OlapTable table, PhysicalPartition physicalPartition, - long warehouseId, boolean enableTabletCreationOptimization) + long warehouseId, CreateTabletOption option) throws DdlException { ArrayList tasks = new ArrayList<>((int) physicalPartition.storageReplicaCount()); for (MaterializedIndex index : physicalPartition.getMaterializedIndices(MaterializedIndex.IndexExtState.VISIBLE)) { - tasks.addAll(buildCreateReplicaTasks(dbId, table, physicalPartition, index, warehouseId, - enableTabletCreationOptimization)); + tasks.addAll(buildCreateReplicaTasks(dbId, table, physicalPartition, index, warehouseId, option)); } return tasks; } @@ -200,7 +218,7 @@ private static List buildCreateReplicaTasks(long dbId, PhysicalPartition physicalPartition, MaterializedIndex index, long warehouseId, - boolean enableTabletCreationOptimization) { + CreateTabletOption option) { LOG.info("build create replica tasks for index {} db {} table {} partition {}", index, dbId, table.getId(), physicalPartition); boolean isCloudNativeTable = table.isCloudNativeTableOrMaterializedView(); @@ -258,13 +276,14 @@ private static List buildCreateReplicaTasks(long dbId, .setCompressionLevel(table.getCompressionLevel()) .setTabletSchema(tabletSchema) .setCreateSchemaFile(createSchemaFile) - .setEnableTabletCreationOptimization(enableTabletCreationOptimization) + .setEnableTabletCreationOptimization(option.isEnableTabletCreationOptimization()) + .setGtid(option.getGtid()) .build(); tasks.add(task); createSchemaFile = false; } - if (enableTabletCreationOptimization) { + if (option.isEnableTabletCreationOptimization()) { break; } } diff --git a/gensrc/proto/lake_types.proto b/gensrc/proto/lake_types.proto index e122f6dd44695..fec88c217d947 100644 --- a/gensrc/proto/lake_types.proto +++ b/gensrc/proto/lake_types.proto @@ -153,6 +153,8 @@ message TabletMetadataPB { map historical_schemas = 17; // rowset_id -> schema_id map rowset_to_schema = 18; + // global transaction id + optional int64 gtid = 19 [default=0]; } message MetadataUpdateInfoPB { @@ -233,5 +235,6 @@ message TxnInfoPB { optional TxnTypePB txn_type = 4; optional bool force_publish = 5; // only used for compaction optional bool rebuild_pindex = 6; + optional int64 gtid = 7 [default=0]; }; diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift index 0bb5f17045dcf..24d878a0289dd 100644 --- a/gensrc/thrift/AgentService.thrift +++ b/gensrc/thrift/AgentService.thrift @@ -120,6 +120,8 @@ struct TCreateTabletReq { 22: optional bool enable_tablet_creation_optimization = false; // The timeout FE will wait for the tablet to be created. 23: optional i64 timeout_ms = -1; + // Global transaction id + 24: optional i64 gtid = 0; } struct TDropTabletReq {