Skip to content

Commit

Permalink
Merge branch 'main' into t8
Browse files Browse the repository at this point in the history
  • Loading branch information
huby2358 authored Jan 3, 2025
2 parents 238b6fe + a9be293 commit bad04d7
Show file tree
Hide file tree
Showing 86 changed files with 1,442 additions and 918 deletions.
48 changes: 48 additions & 0 deletions pkg/bootstrap/versions/v2_0_2/cluster_upgrade_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import (

var clusterUpgEntries = []versions.UpgradeEntry{
upg_mo_cdc_watermark,
upg_mo_pubs_add_account_name,
upg_mo_subs_add_sub_account_name,
upg_mo_subs_add_pub_account_id,
}

var upg_mo_cdc_watermark = versions.UpgradeEntry{
Expand All @@ -37,3 +40,48 @@ var upg_mo_cdc_watermark = versions.UpgradeEntry{
return !colInfo.IsExits, nil
},
}

var upg_mo_pubs_add_account_name = versions.UpgradeEntry{
Schema: catalog.MO_CATALOG,
TableName: catalog.MO_PUBS,
UpgType: versions.ADD_COLUMN,
UpgSql: "alter table mo_catalog.mo_pubs add column account_name varchar(300) after account_id",
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
colInfo, err := versions.CheckTableColumn(txn, accountId, catalog.MO_CATALOG, catalog.MO_PUBS, "account_name")
if err != nil {
return false, err
}
return colInfo.IsExits, nil
},
PostSql: "UPDATE mo_catalog.mo_pubs t1 INNER JOIN mo_catalog.mo_account t2 ON t1.account_id = t2.account_id SET t1.account_name = t2.account_name",
}

var upg_mo_subs_add_sub_account_name = versions.UpgradeEntry{
Schema: catalog.MO_CATALOG,
TableName: catalog.MO_SUBS,
UpgType: versions.ADD_COLUMN,
UpgSql: "alter table mo_catalog.mo_subs add column sub_account_name VARCHAR(300) NOT NULL after sub_account_id",
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
colInfo, err := versions.CheckTableColumn(txn, accountId, catalog.MO_CATALOG, catalog.MO_SUBS, "sub_account_name")
if err != nil {
return false, err
}
return colInfo.IsExits, nil
},
PostSql: "UPDATE mo_catalog.mo_subs t1 INNER JOIN mo_catalog.mo_account t2 ON t1.sub_account_id = t2.account_id SET t1.sub_account_name = t2.account_name",
}

var upg_mo_subs_add_pub_account_id = versions.UpgradeEntry{
Schema: catalog.MO_CATALOG,
TableName: catalog.MO_SUBS,
UpgType: versions.ADD_COLUMN,
UpgSql: "alter table mo_catalog.mo_subs add column pub_account_id INT NOT NULL after sub_time",
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
colInfo, err := versions.CheckTableColumn(txn, accountId, catalog.MO_CATALOG, catalog.MO_SUBS, "pub_account_id")
if err != nil {
return false, err
}
return colInfo.IsExits, nil
},
PostSql: "UPDATE mo_catalog.mo_subs t1 INNER JOIN mo_catalog.mo_account t2 ON t1.pub_account_name = t2.account_name SET t1.pub_account_id = t2.account_id",
}
78 changes: 46 additions & 32 deletions pkg/cdc/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ type tableReader struct {
sinker Sinker
wMarkUpdater IWatermarkUpdater
tick *time.Ticker
resetWatermarkFunc func(*DbTableInfo) error
initSnapshotSplitTxn bool
runningReaders *sync.Map
startTs, endTs types.TS
noFull bool

tableDef *plan.TableDef
insTsColIdx, insCompositedPkColIdx int
Expand All @@ -60,9 +61,10 @@ var NewTableReader = func(
sinker Sinker,
wMarkUpdater IWatermarkUpdater,
tableDef *plan.TableDef,
resetWatermarkFunc func(*DbTableInfo) error,
initSnapshotSplitTxn bool,
runningReaders *sync.Map,
startTs, endTs types.TS,
noFull bool,
) Reader {
reader := &tableReader{
cnTxnClient: cnTxnClient,
Expand All @@ -73,9 +75,11 @@ var NewTableReader = func(
sinker: sinker,
wMarkUpdater: wMarkUpdater,
tick: time.NewTicker(200 * time.Millisecond),
resetWatermarkFunc: resetWatermarkFunc,
initSnapshotSplitTxn: initSnapshotSplitTxn,
runningReaders: runningReaders,
startTs: startTs,
endTs: endTs,
noFull: noFull,
tableDef: tableDef,
}

Expand All @@ -95,9 +99,7 @@ func (reader *tableReader) Close() {
reader.sinker.Close()
}

func (reader *tableReader) Run(
ctx context.Context,
ar *ActiveRoutine) {
func (reader *tableReader) Run(ctx context.Context, ar *ActiveRoutine) {
var err error
logutil.Infof("cdc tableReader(%v).Run: start", reader.info)
defer func() {
Expand Down Expand Up @@ -125,33 +127,24 @@ func (reader *tableReader) Run(

if err = reader.readTable(ctx, ar); err != nil {
logutil.Errorf("cdc tableReader(%v) failed, err: %v", reader.info, err)

// if stale read, try to restart reader
if moerr.IsMoErrCode(err, moerr.ErrStaleRead) {
// reset sinker
reader.sinker.Reset()
// reset watermark
if err = reader.resetWatermarkFunc(reader.info); err != nil {
logutil.Errorf("cdc tableReader(%v) restart failed, err: %v", reader.info, err)
return
}
logutil.Errorf("cdc tableReader(%v) restart successfully", reader.info)
continue
}

logutil.Errorf("cdc tableReader(%v) err is not stale read, quit", reader.info)
return
}
}
}

func (reader *tableReader) readTable(
var readTableWithTxn = func(
reader *tableReader,
ctx context.Context,
ar *ActiveRoutine) (err error) {
txnOp client.TxnOperator,
packer *types.Packer,
ar *ActiveRoutine,
) (err error) {
return reader.readTableWithTxn(ctx, txnOp, packer, ar)
}

var txnOp client.TxnOperator
func (reader *tableReader) readTable(ctx context.Context, ar *ActiveRoutine) (err error) {
//step1 : create an txnOp
txnOp, err = GetTxnOp(ctx, reader.cnEngine, reader.cnTxnClient, "readMultipleTables")
txnOp, err := GetTxnOp(ctx, reader.cnEngine, reader.cnTxnClient, "readMultipleTables")
if err != nil {
return err
}
Expand All @@ -164,8 +157,7 @@ func (reader *tableReader) readTable(
ExitRunSql(txnOp)
}()

err = GetTxn(ctx, reader.cnEngine, txnOp)
if err != nil {
if err = GetTxn(ctx, reader.cnEngine, txnOp); err != nil {
return err
}

Expand All @@ -174,11 +166,25 @@ func (reader *tableReader) readTable(
defer put.Put()

//step2 : read table
err = reader.readTableWithTxn(
ctx,
txnOp,
packer,
ar)
err = readTableWithTxn(reader, ctx, txnOp, packer, ar)
// if stale read, try to reset watermark
if moerr.IsMoErrCode(err, moerr.ErrStaleRead) {
if !reader.noFull && !reader.startTs.IsEmpty() {
err = moerr.NewInternalErrorf(ctx, "cdc tableReader(%v) stale read, and startTs(%v) is set, end", reader.info, reader.startTs)
return
}

// reset sinker
reader.sinker.Reset()
// reset watermark to startTs, will read from the beginning at next round
watermark := reader.startTs
if reader.noFull {
watermark = types.TimestampToTS(txnOp.SnapshotTS())
}
reader.wMarkUpdater.UpdateMem(reader.info.SourceDbName, reader.info.SourceTblName, watermark)
logutil.Infof("cdc tableReader(%v) reset watermark success", reader.info)
err = nil
}
return
}

Expand All @@ -199,7 +205,15 @@ func (reader *tableReader) readTableWithTxn(
// from = last wmark
// to = txn operator snapshot ts
fromTs := reader.wMarkUpdater.GetFromMem(reader.info.SourceDbName, reader.info.SourceTblName)
if !reader.endTs.IsEmpty() && fromTs.GE(&reader.endTs) {
logutil.Debugf("current watermark(%v) >= endTs(%v), end", fromTs, reader.endTs)
return
}
toTs := types.TimestampToTS(GetSnapshotTS(txnOp))
if !reader.endTs.IsEmpty() && toTs.GT(&reader.endTs) {
toTs = reader.endTs
}

start := time.Now()
changes, err = CollectChanges(ctx, rel, fromTs, toTs, reader.mp)
v2.CdcReadDurationHistogram.Observe(time.Since(start).Seconds())
Expand Down
Loading

0 comments on commit bad04d7

Please sign in to comment.