Skip to content

Commit

Permalink
improve catalog (#21166)
Browse files Browse the repository at this point in the history
improve catalog replay

Approved by: @XuPeng-SH, @sukki37
  • Loading branch information
jiangxinmeng1 authored Jan 9, 2025
1 parent daf5ab7 commit 0738115
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 56 deletions.
113 changes: 75 additions & 38 deletions pkg/vm/engine/tae/catalog/catalogreplay.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,9 @@ func (catalog *Catalog) RelayFromSysTableObjects(
dataFactory DataFactory,
readFunc func(context.Context, *TableEntry, txnif.AsyncTxn) *containers.Batch,
sortFunc func([]containers.Vector, int) error,
) {
replayer ObjectListReplayer,
) (closeCB []func()) {
closeCB = make([]func(), 0)
db, err := catalog.GetDatabaseByID(pkgcatalog.MO_CATALOG_ID)
if err != nil {
panic(err)
Expand Down Expand Up @@ -265,7 +267,7 @@ func (catalog *Catalog) RelayFromSysTableObjects(

// replay database catalog
if dbBatch := readFunc(ctx, dbTbl, readTxn); dbBatch != nil {
defer dbBatch.Close()
closeCB = append(closeCB, dbBatch.Close)
catalog.ReplayMODatabase(ctx, txnNode, dbBatch)
}

Expand All @@ -274,27 +276,33 @@ func (catalog *Catalog) RelayFromSysTableObjects(
if err := sortFunc(tableBatch.Vecs, pkgcatalog.MO_TABLES_REL_ID_IDX); err != nil {
panic(err)
}
defer tableBatch.Close()
closeCB = append(closeCB, tableBatch.Close)
columnBatch := readFunc(ctx, columnTbl, readTxn)
if err := sortFunc(columnBatch.Vecs, pkgcatalog.MO_COLUMNS_ATT_RELNAME_ID_IDX); err != nil {
panic(err)
}
defer columnBatch.Close()
catalog.ReplayMOTables(ctx, txnNode, dataFactory, tableBatch, columnBatch)
closeCB = append(closeCB, columnBatch.Close)
catalog.ReplayMOTables(ctx, txnNode, dataFactory, tableBatch, columnBatch, replayer)
}
// logutil.Info(catalog.SimplePPString(common.PPL3))
return
}

func (catalog *Catalog) ReplayMODatabase(ctx context.Context, txnNode *txnbase.TxnMVCCNode, bat *containers.Batch) {
dbids := vector.MustFixedColNoTypeCheck[uint64](bat.GetVectorByName(pkgcatalog.SystemDBAttr_ID).GetDownstreamVector())
tenantIDs := vector.MustFixedColNoTypeCheck[uint32](bat.GetVectorByName(pkgcatalog.SystemDBAttr_AccID).GetDownstreamVector())
userIDs := vector.MustFixedColNoTypeCheck[uint32](bat.GetVectorByName(pkgcatalog.SystemDBAttr_Creator).GetDownstreamVector())
roleIDs := vector.MustFixedColNoTypeCheck[uint32](bat.GetVectorByName(pkgcatalog.SystemDBAttr_Owner).GetDownstreamVector())
createAts := vector.MustFixedColNoTypeCheck[types.Timestamp](bat.GetVectorByName(pkgcatalog.SystemDBAttr_CreateAt).GetDownstreamVector())
for i := 0; i < bat.Length(); i++ {
dbid := bat.GetVectorByName(pkgcatalog.SystemDBAttr_ID).Get(i).(uint64)
name := string(bat.GetVectorByName(pkgcatalog.SystemDBAttr_Name).Get(i).([]byte))
tenantID := bat.GetVectorByName(pkgcatalog.SystemDBAttr_AccID).Get(i).(uint32)
userID := bat.GetVectorByName(pkgcatalog.SystemDBAttr_Creator).Get(i).(uint32)
roleID := bat.GetVectorByName(pkgcatalog.SystemDBAttr_Owner).Get(i).(uint32)
createAt := bat.GetVectorByName(pkgcatalog.SystemDBAttr_CreateAt).Get(i).(types.Timestamp)
createSql := string(bat.GetVectorByName(pkgcatalog.SystemDBAttr_CreateSQL).Get(i).([]byte))
datType := string(bat.GetVectorByName(pkgcatalog.SystemDBAttr_Type).Get(i).([]byte))
dbid := dbids[i]
name := bat.GetVectorByName(pkgcatalog.SystemDBAttr_Name).GetDownstreamVector().GetStringAt(i)
tenantID := tenantIDs[i]
userID := userIDs[i]
roleID := roleIDs[i]
createAt := createAts[i]
createSql := bat.GetVectorByName(pkgcatalog.SystemDBAttr_CreateSQL).GetDownstreamVector().GetStringAt(i)
datType := bat.GetVectorByName(pkgcatalog.SystemDBAttr_Type).GetDownstreamVector().GetStringAt(i)
catalog.onReplayCreateDB(dbid, name, txnNode, tenantID, userID, roleID, createAt, createSql, datType)
}
}
Expand Down Expand Up @@ -336,34 +344,63 @@ func (catalog *Catalog) onReplayCreateDB(
db.InsertLocked(un)
}

func (catalog *Catalog) ReplayMOTables(ctx context.Context, txnNode *txnbase.TxnMVCCNode, dataF DataFactory, tblBat, colBat *containers.Batch) {
func (catalog *Catalog) ReplayMOTables(ctx context.Context, txnNode *txnbase.TxnMVCCNode, dataF DataFactory, tblBat, colBat *containers.Batch, replayer ObjectListReplayer) {
tids := vector.MustFixedColNoTypeCheck[uint64](tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ID).GetDownstreamVector())
dbids := vector.MustFixedColNoTypeCheck[uint64](tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_DBID).GetDownstreamVector())
versions := vector.MustFixedColNoTypeCheck[uint32](tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Version).GetDownstreamVector())
catalogVersions := vector.MustFixedColNoTypeCheck[uint32](tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_CatalogVersion).GetDownstreamVector())
partitioneds := vector.MustFixedColNoTypeCheck[int8](tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Partitioned).GetDownstreamVector())
roleIDs := vector.MustFixedColNoTypeCheck[uint32](tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Owner).GetDownstreamVector())
userIDs := vector.MustFixedColNoTypeCheck[uint32](tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Creator).GetDownstreamVector())
createAts := vector.MustFixedColNoTypeCheck[types.Timestamp](tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_CreateAt).GetDownstreamVector())
tenantIDs := vector.MustFixedColNoTypeCheck[uint32](tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_AccID).GetDownstreamVector())

colTids := vector.MustFixedColNoTypeCheck[uint64](colBat.GetVectorByName(pkgcatalog.SystemColAttr_RelID).GetDownstreamVector())
nullables := vector.MustFixedColNoTypeCheck[int8](colBat.GetVectorByName(pkgcatalog.SystemColAttr_NullAbility).GetDownstreamVector())
isHiddens := vector.MustFixedColNoTypeCheck[int8](colBat.GetVectorByName(pkgcatalog.SystemColAttr_IsHidden).GetDownstreamVector())
clusterbys := vector.MustFixedColNoTypeCheck[int8](colBat.GetVectorByName(pkgcatalog.SystemColAttr_IsClusterBy).GetDownstreamVector())
autoIncrements := vector.MustFixedColNoTypeCheck[int8](colBat.GetVectorByName(pkgcatalog.SystemColAttr_IsAutoIncrement).GetDownstreamVector())
idxes := vector.MustFixedColNoTypeCheck[int32](colBat.GetVectorByName(pkgcatalog.SystemColAttr_Num).GetDownstreamVector())
seqNums := vector.MustFixedColNoTypeCheck[uint16](colBat.GetVectorByName(pkgcatalog.SystemColAttr_Seqnum).GetDownstreamVector())

schemaOffset := 0
for i := 0; i < tblBat.Length(); i++ {
tid := tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ID).Get(i).(uint64)
dbid := tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_DBID).Get(i).(uint64)
name := string(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Name).Get(i).([]byte))
schema := NewEmptySchema(name)
schemaOffset = schema.ReadFromBatch(colBat, schemaOffset, tid)
schema.Comment = string(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Comment).Get(i).([]byte))
schema.Version = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Version).Get(i).(uint32)
schema.CatalogVersion = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_CatalogVersion).Get(i).(uint32)
schema.Partitioned = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Partitioned).Get(i).(int8)
schema.Partition = string(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Partition).Get(i).([]byte))
schema.Relkind = string(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Kind).Get(i).([]byte))
schema.Createsql = string(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_CreateSQL).Get(i).([]byte))
schema.View = string(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ViewDef).Get(i).([]byte))
schema.Constraint = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Constraint).Get(i).([]byte)
schema.AcInfo = accessInfo{}
schema.AcInfo.RoleID = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Owner).Get(i).(uint32)
schema.AcInfo.UserID = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Creator).Get(i).(uint32)
schema.AcInfo.CreateAt = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_CreateAt).Get(i).(types.Timestamp)
schema.AcInfo.TenantID = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_AccID).Get(i).(uint32)
extra := tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ExtraInfo).Get(i).([]byte)
schema.MustRestoreExtra(extra)
if err := schema.Finalize(true); err != nil {
panic(err)
startOffset := schemaOffset
tid := tids[i]
for i := startOffset; i < len(colTids); i++ {
if tid != colTids[i] {
schemaOffset = i
break
}
}
replayFn := func() {
dbid := dbids[i]
name := tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Name).GetDownstreamVector().GetStringAt(i)
schema := NewEmptySchema(name)
schema.ReadFromBatch(
colBat, colTids, nullables, isHiddens, clusterbys, autoIncrements, idxes, seqNums, startOffset, tid)
schema.Comment = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Comment).GetDownstreamVector().GetStringAt(i)
schema.Version = versions[i]
schema.CatalogVersion = catalogVersions[i]
schema.Partitioned = partitioneds[i]
schema.Partition = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Partition).GetDownstreamVector().GetStringAt(i)
schema.Relkind = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Kind).GetDownstreamVector().GetStringAt(i)
schema.Createsql = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_CreateSQL).GetDownstreamVector().GetStringAt(i)
schema.View = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ViewDef).GetDownstreamVector().GetStringAt(i)
schema.Constraint = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Constraint).GetDownstreamVector().GetBytesAt(i)
schema.AcInfo = accessInfo{}
schema.AcInfo.RoleID = roleIDs[i]
schema.AcInfo.UserID = userIDs[i]
schema.AcInfo.CreateAt = createAts[i]
schema.AcInfo.TenantID = tenantIDs[i]
extra := tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ExtraInfo).GetDownstreamVector().GetBytesAt(i)
schema.MustRestoreExtra(extra)
if err := schema.Finalize(true); err != nil {
panic(err)
}
catalog.onReplayCreateTable(dbid, tid, schema, txnNode, dataF)
}
catalog.onReplayCreateTable(dbid, tid, schema, txnNode, dataF)
replayer.Submit(dbids[i], replayFn)
}
}

Expand Down
40 changes: 23 additions & 17 deletions pkg/vm/engine/tae/catalog/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,9 +603,15 @@ func (s *Schema) Marshal() (buf []byte, err error) {
return
}

func (s *Schema) ReadFromBatch(bat *containers.Batch, offset int, targetTid uint64) (next int) {
func (s *Schema) ReadFromBatch(
bat *containers.Batch,
tids []uint64,
nullables, isHiddens, clusterbys, autoIncrements []int8,
idxes []int32,
seqNums []uint16,
offset int,
targetTid uint64) (next int) {
nameVec := bat.GetVectorByName(pkgcatalog.SystemColAttr_RelName)
tidVec := bat.GetVectorByName(pkgcatalog.SystemColAttr_RelID)
defer func() {
slices.SortStableFunc(s.ColDefs, func(i, j *ColDef) int {
return i.Idx - j.Idx
Expand All @@ -615,39 +621,39 @@ func (s *Schema) ReadFromBatch(bat *containers.Batch, offset int, targetTid uint
if offset >= nameVec.Length() {
break
}
name := string(nameVec.Get(offset).([]byte))
id := tidVec.Get(offset).(uint64)
name := nameVec.GetDownstreamVector().GetStringAt(offset)
id := tids[offset]
// every schema has 1 rowid column as last column, if have one, break
if name != s.Name || targetTid != id {
break
}
def := new(ColDef)
def.Name = string(bat.GetVectorByName((pkgcatalog.SystemColAttr_Name)).Get(offset).([]byte))
data := bat.GetVectorByName((pkgcatalog.SystemColAttr_Type)).Get(offset).([]byte)
def.Name = bat.GetVectorByName((pkgcatalog.SystemColAttr_Name)).GetDownstreamVector().GetStringAt(offset)
data := bat.GetVectorByName((pkgcatalog.SystemColAttr_Type)).GetDownstreamVector().GetBytesAt(offset)
types.Decode(data, &def.Type)
nullable := bat.GetVectorByName((pkgcatalog.SystemColAttr_NullAbility)).Get(offset).(int8)
nullable := nullables[offset]
def.NullAbility = !i82bool(nullable)
isHidden := bat.GetVectorByName((pkgcatalog.SystemColAttr_IsHidden)).Get(offset).(int8)
isHidden := isHiddens[offset]
def.Hidden = i82bool(isHidden)
isClusterBy := bat.GetVectorByName((pkgcatalog.SystemColAttr_IsClusterBy)).Get(offset).(int8)
isClusterBy := clusterbys[offset]
def.ClusterBy = i82bool(isClusterBy)
if def.ClusterBy {
def.SortKey = true
}
isAutoIncrement := bat.GetVectorByName((pkgcatalog.SystemColAttr_IsAutoIncrement)).Get(offset).(int8)
isAutoIncrement := autoIncrements[offset]
def.AutoIncrement = i82bool(isAutoIncrement)
def.Comment = string(bat.GetVectorByName((pkgcatalog.SystemColAttr_Comment)).Get(offset).([]byte))
def.OnUpdate = bat.GetVectorByName((pkgcatalog.SystemColAttr_Update)).Get(offset).([]byte)
def.Default = bat.GetVectorByName((pkgcatalog.SystemColAttr_DefaultExpr)).Get(offset).([]byte)
def.Idx = int(bat.GetVectorByName((pkgcatalog.SystemColAttr_Num)).Get(offset).(int32)) - 1
def.SeqNum = bat.GetVectorByName(pkgcatalog.SystemColAttr_Seqnum).Get(offset).(uint16)
def.EnumValues = string(bat.GetVectorByName((pkgcatalog.SystemColAttr_EnumValues)).Get(offset).([]byte))
def.Comment = bat.GetVectorByName((pkgcatalog.SystemColAttr_Comment)).GetDownstreamVector().GetStringAt(offset)
def.OnUpdate = bat.GetVectorByName((pkgcatalog.SystemColAttr_Update)).GetDownstreamVector().GetBytesAt(offset)
def.Default = bat.GetVectorByName((pkgcatalog.SystemColAttr_DefaultExpr)).GetDownstreamVector().GetBytesAt(offset)
def.Idx = int(idxes[offset]) - 1
def.SeqNum = seqNums[offset]
def.EnumValues = bat.GetVectorByName((pkgcatalog.SystemColAttr_EnumValues)).GetDownstreamVector().GetStringAt(offset)
s.NameMap[def.Name] = def.Idx
s.ColDefs = append(s.ColDefs, def)
if def.Name == PhyAddrColumnName {
def.PhyAddr = true
}
constraint := string(bat.GetVectorByName(pkgcatalog.SystemColAttr_ConstraintType).Get(offset).([]byte))
constraint := bat.GetVectorByName(pkgcatalog.SystemColAttr_ConstraintType).GetDownstreamVector().GetStringAt(offset)
if constraint == "p" {
def.SortKey = true
def.Primary = true
Expand Down
12 changes: 11 additions & 1 deletion pkg/vm/engine/tae/db/checkpoint/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,13 +394,19 @@ func (c *CkpReplayer) ReplayCatalog(readTxn txnif.AsyncTxn) (err error) {
_, err2 = mergesort.SortBlockColumns(cols, pkidx, c.r.rt.VectorPool.Transient)
return
}
c.r.catalog.RelayFromSysTableObjects(
closeFn := c.r.catalog.RelayFromSysTableObjects(
c.r.ctx,
readTxn,
c.dataF,
tables.ReadSysTableBatch,
sortFunc,
c,
)
c.wg.Wait()
for _, fn := range closeFn {
fn()
}
c.resetObjectCountMap()
// logutil.Info(c.r.catalog.SimplePPString(common.PPL0))
return
}
Expand Down Expand Up @@ -481,6 +487,10 @@ func (c *CkpReplayer) Submit(tid uint64, replayFn func()) {
c.objectReplayWorker[workerOffset].Enqueue(replayFn)
}

// resetObjectCountMap discards the current objectCountMap and installs a
// fresh, empty one in its place.
func (c *CkpReplayer) resetObjectCountMap() {
	c.objectCountMap = make(map[uint64]int)
}

func (r *runner) Replay(dataFactory catalog.DataFactory) *CkpReplayer {
replayer := &CkpReplayer{
r: r,
Expand Down
11 changes: 11 additions & 0 deletions pkg/vm/engine/tae/tables/table_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,17 @@ func ReadSysTableBatch(ctx context.Context, entry *catalog.TableEntry, readTxn t
panic(fmt.Sprintf("unsupported sys table id %v", entry))
}
schema := entry.GetLastestSchema(false)
prefetchIt := entry.MakeObjectIt(false)
defer prefetchIt.Release()
for prefetchIt.Next() {
obj := prefetchIt.Item()
if !obj.IsVisible(readTxn) {
continue
}
for blkOffset := range obj.BlockCnt() {
obj.GetObjectData().Prefetch(uint16(blkOffset))
}
}
it := entry.MakeObjectIt(false)
defer it.Release()
colIdxes := make([]int, 0, len(schema.ColDefs))
Expand Down

0 comments on commit 0738115

Please sign in to comment.