Skip to content

Commit

Permalink
session: track LastCommitTS in SessionVars and check StartTS of a txn…
Browse files Browse the repository at this point in the history
… is larger
  • Loading branch information
b6g committed Nov 19, 2024
1 parent 058d947 commit 31193ac
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 1 deletion.
4 changes: 4 additions & 0 deletions pkg/kv/interface_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ func (t *mockTxn) StartTS() uint64 {
return uint64(0)
}

func (t *mockTxn) CommitTS() uint64 {
return 0
}

func (t *mockTxn) Get(ctx context.Context, k Key) ([]byte, error) {
return nil, nil
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ type Transaction interface {
IsReadOnly() bool
// StartTS returns the transaction start timestamp.
StartTS() uint64
// CommitTS returns the transaction commit timestamp.
CommitTS() uint64
// Valid returns if the transaction is valid.
// A transaction become invalid after commit or rollback.
Valid() bool
Expand Down
13 changes: 13 additions & 0 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,19 @@ func (s *session) CommitTxn(ctx context.Context) error {
s.sessionVars.StmtCtx.MergeExecDetails(nil, commitDetail)
}

if err == nil && s.txn.lastCommitTS > 0 {
// lastCommitTS could be the same, e.g. when the txn is considered readonly
if s.txn.lastCommitTS < s.sessionVars.LastCommitTS {
logutil.BgLogger().Fatal("check lastCommitTS failed",
zap.Uint64("sessionLastCommitTS", s.sessionVars.LastCommitTS),
zap.Uint64("txnLastCommitTS", s.txn.lastCommitTS),
zap.String("sql", s.sessionVars.StmtCtx.OriginalSQL),
)
} else {
s.sessionVars.LastCommitTS = s.txn.lastCommitTS
}
}

// record the TTLInsertRows in the metric
metrics.TTLInsertRowsCount.Add(float64(s.sessionVars.TxnCtx.InsertTTLRowsCount))

Expand Down
9 changes: 8 additions & 1 deletion pkg/session/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ type LazyTxn struct {

// mark the txn enables lazy uniqueness check in pessimistic transactions.
lazyUniquenessCheckEnabled bool

// commit ts of the last successful transaction, to ensure ordering of TS
lastCommitTS uint64
}

// GetTableInfo returns the cached index name.
Expand Down Expand Up @@ -431,7 +434,11 @@ func (txn *LazyTxn) Commit(ctx context.Context) error {
}
})

return txn.Transaction.Commit(ctx)
err := txn.Transaction.Commit(ctx)
if err == nil {
txn.lastCommitTS = txn.Transaction.CommitTS()
}
return err
}

// Rollback overrides the Transaction interface.
Expand Down
3 changes: 3 additions & 0 deletions pkg/sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,9 @@ type SessionVars struct {
// SnapshotTS is used for reading history data. For simplicity, SnapshotTS only supports distsql request.
SnapshotTS uint64

// LastCommitTS is the commit_ts of the last successful transaction in this session.
LastCommitTS uint64

// TxnReadTS is used for staleness transaction, it provides next staleness transaction startTS.
TxnReadTS *TxnReadTS

Expand Down
11 changes: 11 additions & 0 deletions pkg/sessiontxn/isolation/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ import (
"github.com/pingcap/tidb/pkg/store/driver/txn"
"github.com/pingcap/tidb/pkg/table/temptable"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/tableutil"
"github.com/pingcap/tidb/pkg/util/tracing"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/txnkv/transaction"
"go.uber.org/zap"
)

// baseTxnContextProvider is a base class for the transaction context providers that implement `TxnContextProvider` in different isolation.
Expand Down Expand Up @@ -304,6 +306,15 @@ func (p *baseTxnContextProvider) ActivateTxn() (kv.Transaction, error) {
sessVars.SetInTxn(true)
}

// verify start_ts is later than any previous commit_ts in the session
if sessVars.LastCommitTS > 0 && sessVars.LastCommitTS > sessVars.TxnCtx.StartTS {
logutil.BgLogger().Fatal("check session lastCommitTS failed",
zap.Uint64("lastCommitTS", sessVars.LastCommitTS),
zap.Uint64("startTS", sessVars.TxnCtx.StartTS),
zap.String("sql", sessVars.StmtCtx.OriginalSQL),
)
}

txn.SetVars(sessVars.KVVars)

p.SetOptionsOnTxnActive(txn)
Expand Down

0 comments on commit 31193ac

Please sign in to comment.