Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Export commitTS of KVTxn #1489

Merged
merged 3 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions integration_tests/1pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,12 @@ func (s *testOnePCSuite) Test1PC() {
s.Equal(txn.GetCommitter().GetOnePCCommitTS(), txn.GetCommitter().GetCommitTS())
s.Greater(txn.GetCommitter().GetOnePCCommitTS(), txn.StartTS())
// Check keys are committed with the same version
s.mustGetFromSnapshot(txn.GetCommitTS(), k3, v3)
s.mustGetFromSnapshot(txn.GetCommitTS(), k4, v4)
s.mustGetFromSnapshot(txn.GetCommitTS(), k5, v5)
s.mustGetNoneFromSnapshot(txn.GetCommitTS()-1, k3)
s.mustGetNoneFromSnapshot(txn.GetCommitTS()-1, k4)
s.mustGetNoneFromSnapshot(txn.GetCommitTS()-1, k5)
s.mustGetFromSnapshot(txn.CommitTS(), k3, v3)
s.mustGetFromSnapshot(txn.CommitTS(), k4, v4)
s.mustGetFromSnapshot(txn.CommitTS(), k5, v5)
s.mustGetNoneFromSnapshot(txn.CommitTS()-1, k3)
s.mustGetNoneFromSnapshot(txn.CommitTS()-1, k4)
s.mustGetNoneFromSnapshot(txn.CommitTS()-1, k5)

// Overwriting in MVCC
v5New := []byte("v5new")
Expand All @@ -129,8 +129,8 @@ func (s *testOnePCSuite) Test1PC() {
s.True(txn.GetCommitter().IsOnePC())
s.Equal(txn.GetCommitter().GetOnePCCommitTS(), txn.GetCommitter().GetCommitTS())
s.Greater(txn.GetCommitter().GetOnePCCommitTS(), txn.StartTS())
s.mustGetFromSnapshot(txn.GetCommitTS(), k5, v5New)
s.mustGetFromSnapshot(txn.GetCommitTS()-1, k5, v5)
s.mustGetFromSnapshot(txn.CommitTS(), k5, v5New)
s.mustGetFromSnapshot(txn.CommitTS()-1, k5, v5)

// Check all keys
keys := [][]byte{k1, k2, k3, k4, k5}
Expand Down Expand Up @@ -175,8 +175,8 @@ func (s *testOnePCSuite) Test1PCIsolation() {
s.mustGetFromTxn(txn2, k, v1)
s.Nil(txn2.Rollback())

s.mustGetFromSnapshot(txn.GetCommitTS(), k, v2)
s.mustGetFromSnapshot(txn.GetCommitTS()-1, k, v1)
s.mustGetFromSnapshot(txn.CommitTS(), k, v2)
s.mustGetFromSnapshot(txn.CommitTS()-1, k, v1)
}

func (s *testOnePCSuite) Test1PCDisallowMultiRegion() {
Expand Down
24 changes: 12 additions & 12 deletions integration_tests/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1076,7 +1076,7 @@ func (s *testCommitterSuite) TestPessimisticLockAllowLockWithConflict() {
txn.StartAggressiveLocking()

s.Nil(txn0.Commit(context.Background()))
s.Greater(txn0.GetCommitTS(), txn.StartTS())
s.Greater(txn0.CommitTS(), txn.StartTS())

lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
if checkExistence {
Expand All @@ -1087,9 +1087,9 @@ func (s *testCommitterSuite) TestPessimisticLockAllowLockWithConflict() {
}
s.Nil(txn.LockKeys(context.Background(), lockCtx, key))

s.Equal(txn0.GetCommitTS(), lockCtx.MaxLockedWithConflictTS)
s.Equal(txn0.CommitTS(), lockCtx.MaxLockedWithConflictTS)
v := lockCtx.Values[string(key)]
s.Equal(txn0.GetCommitTS(), v.LockedWithConflictTS)
s.Equal(txn0.CommitTS(), v.LockedWithConflictTS)
s.True(v.Exists)
s.Equal(value, v.Value)

Expand Down Expand Up @@ -1269,12 +1269,12 @@ func (s *testCommitterSuite) TestAggressiveLockingInsert() {
s.IsType(errors.Cause(err), &tikverr.ErrWriteConflict{})
lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
s.NoError(insertPessimisticLock(lockCtx, "k8"))
s.Equal(txn2.GetCommitTS(), lockCtx.MaxLockedWithConflictTS)
s.Equal(txn2.GetCommitTS(), lockCtx.Values["k8"].LockedWithConflictTS)
s.Equal(txn2.CommitTS(), lockCtx.MaxLockedWithConflictTS)
s.Equal(txn2.CommitTS(), lockCtx.Values["k8"].LockedWithConflictTS)
// Update forUpdateTS to simulate a pessimistic retry.
newForUpdateTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
s.Nil(err)
s.GreaterOrEqual(newForUpdateTS, txn2.GetCommitTS())
s.GreaterOrEqual(newForUpdateTS, txn2.CommitTS())
lockCtx = &kv.LockCtx{ForUpdateTS: newForUpdateTS, WaitStartTime: time.Now()}
mustAlreadyExist(insertPessimisticLock(lockCtx, "k7"))
s.NoError(insertPessimisticLock(lockCtx, "k8"))
Expand All @@ -1291,7 +1291,7 @@ func (s *testCommitterSuite) TestAggressiveLockingLockOnlyIfExists() {
txn0 := s.begin()
s.NoError(txn0.Set([]byte("k1"), []byte("v1")))
s.NoError(txn0.Commit(context.Background()))
txn0CommitTS := txn0.GetCommitTS()
txn0CommitTS := txn0.CommitTS()

txn.StartAggressiveLocking()
lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now(), ReturnValues: true, LockOnlyIfExists: true}
Expand All @@ -1312,7 +1312,7 @@ func (s *testCommitterSuite) TestAggressiveLockingLockOnlyIfExists() {
txn0 = s.begin()
s.NoError(txn0.Delete([]byte("k1")))
s.NoError(txn0.Commit(context.Background()))
txn0CommitTS = txn0.GetCommitTS()
txn0CommitTS = txn0.CommitTS()

txn.StartAggressiveLocking()
lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now(), ReturnValues: true, LockOnlyIfExists: true}
Expand Down Expand Up @@ -1461,13 +1461,13 @@ func (s *testCommitterSuite) TestAggressiveLockingLoadValueOptionChanges() {
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k2")))

if firstAttemptLockedWithConflict {
s.Equal(txn2.GetCommitTS(), lockCtx.MaxLockedWithConflictTS)
s.Equal(txn2.GetCommitTS(), lockCtx.Values["k1"].LockedWithConflictTS)
s.Equal(txn2.GetCommitTS(), lockCtx.Values["k2"].LockedWithConflictTS)
s.Equal(txn2.CommitTS(), lockCtx.MaxLockedWithConflictTS)
s.Equal(txn2.CommitTS(), lockCtx.Values["k1"].LockedWithConflictTS)
s.Equal(txn2.CommitTS(), lockCtx.Values["k2"].LockedWithConflictTS)
}

if firstAttemptLockedWithConflict {
forUpdateTS = txn2.GetCommitTS() + 1
forUpdateTS = txn2.CommitTS() + 1
} else {
forUpdateTS++
}
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/assertion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (s *testAssertionSuite) testAssertionImpl(keyPrefix string, pessimistic boo
err = prepareTxn.Commit(context.Background())
s.Nil(err)
prepareStartTS := prepareTxn.GetCommitter().GetStartTS()
prepareCommitTS := prepareTxn.GetCommitTS()
prepareCommitTS := prepareTxn.CommitTS()

// A helper to perform a complete transaction. When multiple keys are passed in, assertion will be set on only
// the last key.
Expand Down
6 changes: 3 additions & 3 deletions integration_tests/async_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (s *testAsyncCommitCommon) putKV(key, value []byte, enableAsyncCommit bool)
s.Nil(err)
err = txn.Commit(context.Background())
s.Nil(err)
return txn.StartTS(), txn.GetCommitTS()
return txn.StartTS(), txn.CommitTS()
}

func (s *testAsyncCommitCommon) mustGetFromTxn(txn transaction.TxnProbe, key, expectedValue []byte) {
Expand Down Expand Up @@ -440,8 +440,8 @@ func (s *testAsyncCommitSuite) TestAsyncCommitLinearizability() {
s.Nil(err)
err = t1.Commit(ctx)
s.Nil(err)
commitTS1 := t1.GetCommitTS()
commitTS2 := t2.GetCommitTS()
commitTS1 := t1.CommitTS()
commitTS2 := t2.CommitTS()
s.Less(commitTS2, commitTS1)
}

Expand Down
2 changes: 1 addition & 1 deletion integration_tests/isolation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (s *testIsolationSuite) SetWithRetry(k, v []byte) writeRecord {
if err == nil {
return writeRecord{
startTS: txn.StartTS(),
commitTS: txn.GetCommitTS(),
commitTS: txn.CommitTS(),
}
}
}
Expand Down
12 changes: 6 additions & 6 deletions integration_tests/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (s *testLockSuite) putKV(key, value []byte) (uint64, uint64) {
s.Nil(err)
err = txn.Commit(context.Background())
s.Nil(err)
return txn.StartTS(), txn.GetCommitTS()
return txn.StartTS(), txn.CommitTS()
}

func (s *testLockSuite) prepareAlphabetLocks() {
Expand Down Expand Up @@ -1311,7 +1311,7 @@ func (s *testLockWithTiKVSuite) TestPrewriteCheckForUpdateTS() {
info := simulatedTxn.GetAggressiveLockingKeysInfo()
s.Equal(1, len(info))
s.Equal(k1, info[0].Key())
s.Equal(txn2.GetCommitTS(), info[0].ActualLockForUpdateTS())
s.Equal(txn2.CommitTS(), info[0].ActualLockForUpdateTS())

simulatedTxn.DoneAggressiveLocking(context.Background())
defer func() {
Expand All @@ -1324,12 +1324,12 @@ func (s *testLockWithTiKVSuite) TestPrewriteCheckForUpdateTS() {
s.Error(err)
s.Regexp("[pP]essimistic ?[lL]ock ?[nN]ot ?[fF]ound", err.Error())

snapshot := s.store.GetSnapshot(txn2.GetCommitTS())
snapshot := s.store.GetSnapshot(txn2.CommitTS())
v, err := snapshot.Get(context.Background(), k1)
s.NoError(err)
s.Equal(v2, v)

snapshot = s.store.GetSnapshot(txn2.GetCommitTS() - 1)
snapshot = s.store.GetSnapshot(txn2.CommitTS() - 1)
_, err = snapshot.Get(context.Background(), k1)
s.Equal(tikverr.ErrNotExist, err)

Expand Down Expand Up @@ -1369,15 +1369,15 @@ func (s *testLockWithTiKVSuite) TestPrewriteCheckForUpdateTS() {

s.NoError(txn.Commit(ctx))

snapshot = s.store.GetSnapshot(txn.GetCommitTS())
snapshot = s.store.GetSnapshot(txn.CommitTS())
v, err = snapshot.Get(context.Background(), k2)
s.NoError(err)
s.Equal(v1, v)
v, err = snapshot.Get(context.Background(), k3)
s.NoError(err)
s.Equal(v1, v)

snapshot = s.store.GetSnapshot(txn.GetCommitTS() - 1)
snapshot = s.store.GetSnapshot(txn.CommitTS() - 1)
v, err = snapshot.Get(context.Background(), k2)
s.NoError(err)
s.Equal(v2, v)
Expand Down
5 changes: 0 additions & 5 deletions txnkv/transaction/test_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,6 @@ func (txn TxnProbe) SetStartTS(ts uint64) {
txn.startTS = ts
}

// GetCommitTS returns the commit ts.
func (txn TxnProbe) GetCommitTS() uint64 {
return txn.commitTS
}

// GetUnionStore returns transaction's embedded unionstore.
func (txn TxnProbe) GetUnionStore() *unionstore.KVUnionStore {
return txn.us
Expand Down
4 changes: 4 additions & 0 deletions txnkv/transaction/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1704,6 +1704,10 @@ func (txn *KVTxn) StartTS() uint64 {
return txn.startTS
}

func (txn *KVTxn) CommitTS() uint64 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are still some tests using txn.GetCommitTS in 1pc_test.go.

cd integration_tests && go test ./...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@you06 Seems to be an enhancement to add a check before and after the ts allocation, how about filing an issue for it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The integration_tests are fixed.

Sounds a good idea to check store's last_commit_ts is smaller than start_ts, because store's last_commit_ts is not directly allocated from pd.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about filing an issue for it?

@cfzjywxk It's mentioned in pingcap/tidb#57165 IMO.

In theory, all read requests occurring after the write transaction should use a read_ts greater than the previous known transaction's commit_ts.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func (txn *KVTxn) CommitTS() uint64 {
// CommitTS returns the commit timestamp of the already committed transaction, zero if it's not committed yet.
func (txn *KVTxn) CommitTS() uint64 {

Exported functions need explicity comments from the ci & lint.

return txn.commitTS
}

// Valid returns if the transaction is valid.
// A transaction become invalid after commit or rollback.
func (txn *KVTxn) Valid() bool {
Expand Down
Loading