diff --git a/pkg/kv/interface_mock_test.go b/pkg/kv/interface_mock_test.go index d0241b9fd732d..7930806b0b544 100644 --- a/pkg/kv/interface_mock_test.go +++ b/pkg/kv/interface_mock_test.go @@ -75,6 +75,10 @@ func (t *mockTxn) StartTS() uint64 { return uint64(0) } +func (t *mockTxn) CommitTS() uint64 { + return 0 +} + func (t *mockTxn) Get(ctx context.Context, k Key) ([]byte, error) { return nil, nil } diff --git a/pkg/kv/kv.go b/pkg/kv/kv.go index b2ad11cb33dec..bfe3bbd86f5f1 100644 --- a/pkg/kv/kv.go +++ b/pkg/kv/kv.go @@ -251,6 +251,8 @@ type Transaction interface { IsReadOnly() bool // StartTS returns the transaction start timestamp. StartTS() uint64 + // CommitTS returns the transaction commit timestamp. + CommitTS() uint64 // Valid returns if the transaction is valid. // A transaction become invalid after commit or rollback. Valid() bool diff --git a/pkg/session/session.go b/pkg/session/session.go index f381afdab8632..54ac5442b5dfa 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -929,6 +929,17 @@ func (s *session) CommitTxn(ctx context.Context) error { s.sessionVars.StmtCtx.MergeExecDetails(nil, commitDetail) } + if err == nil && s.txn.lastCommitTS > 0 { + if s.txn.lastCommitTS <= s.sessionVars.LastCommitTS { + logutil.BgLogger().Fatal("check lastCommitTS failed", + zap.Uint64("sessionLastCommitTS", s.sessionVars.LastCommitTS), + zap.Uint64("txnLastCommitTS", s.txn.lastCommitTS), + ) + } else { + s.sessionVars.LastCommitTS = s.txn.lastCommitTS + } + } + // record the TTLInsertRows in the metric metrics.TTLInsertRowsCount.Add(float64(s.sessionVars.TxnCtx.InsertTTLRowsCount)) diff --git a/pkg/session/txn.go b/pkg/session/txn.go index b65a969b28ea0..74f1ca79b5f0f 100644 --- a/pkg/session/txn.go +++ b/pkg/session/txn.go @@ -70,6 +70,9 @@ type LazyTxn struct { // mark the txn enables lazy uniqueness check in pessimistic transactions. lazyUniquenessCheckEnabled bool + + // commit ts of the last successful transaction, to ensure ordering of TS + lastCommitTS uint64 } // GetTableInfo returns the cached index name. @@ -431,7 +434,11 @@ func (txn *LazyTxn) Commit(ctx context.Context) error { } }) - return txn.Transaction.Commit(ctx) + err := txn.Transaction.Commit(ctx) + if err == nil { + txn.lastCommitTS = txn.Transaction.CommitTS() + } + return err } // Rollback overrides the Transaction interface. diff --git a/pkg/sessionctx/variable/session.go b/pkg/sessionctx/variable/session.go index 29d304d133403..e478df941b265 100644 --- a/pkg/sessionctx/variable/session.go +++ b/pkg/sessionctx/variable/session.go @@ -888,6 +888,9 @@ type SessionVars struct { // SnapshotTS is used for reading history data. For simplicity, SnapshotTS only supports distsql request. SnapshotTS uint64 + // LastCommitTS is the commit_ts of the last successful transaction in this session. + LastCommitTS uint64 + // TxnReadTS is used for staleness transaction, it provides next staleness transaction startTS. TxnReadTS *TxnReadTS diff --git a/pkg/sessiontxn/isolation/base.go b/pkg/sessiontxn/isolation/base.go index 29207b1ef42f5..6443007a0a634 100644 --- a/pkg/sessiontxn/isolation/base.go +++ b/pkg/sessiontxn/isolation/base.go @@ -34,11 +34,13 @@ import ( "github.com/pingcap/tidb/pkg/store/driver/txn" "github.com/pingcap/tidb/pkg/table/temptable" "github.com/pingcap/tidb/pkg/tablecodec" + "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/tableutil" "github.com/pingcap/tidb/pkg/util/tracing" tikvstore "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/txnkv/transaction" + "go.uber.org/zap" ) // baseTxnContextProvider is a base class for the transaction context providers that implement `TxnContextProvider` in different isolation. @@ -304,6 +306,14 @@ func (p *baseTxnContextProvider) ActivateTxn() (kv.Transaction, error) { sessVars.SetInTxn(true) } + // verify start_ts is later than any previous commit_ts in the session + if sessVars.LastCommitTS > 0 && sessVars.LastCommitTS > sessVars.TxnCtx.StartTS { + logutil.BgLogger().Fatal("check session lastCommitTS failed", + zap.Uint64("lastCommitTS", sessVars.LastCommitTS), + zap.Uint64("startTS", sessVars.TxnCtx.StartTS), + ) + } + txn.SetVars(sessVars.KVVars) p.SetOptionsOnTxnActive(txn)