Skip to content

Commit

Permalink
Optimize the rollback logic if the lock data operation fails during r…
Browse files Browse the repository at this point in the history
…equire ack and add the shift lock data operation
  • Loading branch information
snower committed Apr 25, 2024
1 parent a833a23 commit a2fd861
Show file tree
Hide file tree
Showing 7 changed files with 308 additions and 75 deletions.
13 changes: 11 additions & 2 deletions client/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ func TestLock_WithData(t *testing.T) {
t.Errorf("Lock LockWithData Append Result LockData Fail %v", result.GetLockData())
return
}
ulock1 = client.Lock(testString2Key("TestData4"), 50, 0)
ulock1 = client.Lock(testString2Key("TestData4"), 50, 10)
ulock1.SetCount(10)
result, err = ulock1.LockWithData(protocol.NewLockCommandDataAppendString("bbb"))
if err != nil {
Expand All @@ -580,7 +580,7 @@ func TestLock_WithData(t *testing.T) {
t.Errorf("Lock LockWithData1 Append Expried Result LockData Fail %v", result.GetLockData())
return
}
result, err = lock.Unlock()
result, err = lock.UnlockWithData(protocol.NewLockCommandDataShiftData(2))
if err != nil {
t.Errorf("Lock Unlock Append Fail %v", err)
return
Expand All @@ -589,6 +589,15 @@ func TestLock_WithData(t *testing.T) {
t.Errorf("Lock Unlock Append Result LockData Fail %v", result.GetLockData())
return
}
result, err = ulock1.Unlock()
if err != nil {
t.Errorf("Lock Unlock1 Append Fail %v", err)
return
}
if result.GetLockData() == nil || result.GetLockData().GetStringData() != "abbb" {
t.Errorf("Lock Unlock1 Append Result LockData Fail %v", result.GetLockData())
return
}
})
}

Expand Down
24 changes: 24 additions & 0 deletions protocol/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ const (
LOCK_DATA_COMMAND_TYPE_UNSET = 1
LOCK_DATA_COMMAND_TYPE_INCR = 2
LOCK_DATA_COMMAND_TYPE_APPEND = 3
LOCK_DATA_COMMAND_TYPE_SHIFT = 4
)

var ERROR_MSG []string = []string{
Expand Down Expand Up @@ -390,6 +391,11 @@ func NewLockCommandDataAppendString(data string) *LockCommandData {
return NewLockCommandDataFromString(data, LOCK_DATA_COMMAND_TYPE_APPEND, 0)
}

func NewLockCommandDataShiftData(lengthValue uint32) *LockCommandData {
return &LockCommandData{[]byte{6, 0, 0, 0, LOCK_DATA_COMMAND_TYPE_SHIFT, 0,
byte(lengthValue), byte(lengthValue >> 8), byte(lengthValue >> 16), byte(lengthValue >> 24)}, LOCK_DATA_COMMAND_TYPE_SHIFT, 0}
}

func (self *LockCommandData) GetBytesData() []byte {
if self.Data == nil || self.CommandType == LOCK_DATA_COMMAND_TYPE_UNSET {
return nil
Expand Down Expand Up @@ -422,6 +428,24 @@ func (self *LockCommandData) GetIncrValue() int64 {
return value
}

func (self *LockCommandData) GetShiftLengthValue() uint32 {
if self.Data == nil || self.CommandType == LOCK_DATA_COMMAND_TYPE_UNSET {
return 0
}
value := uint32(0)
for i := 0; i < 4; i++ {
if i+6 >= len(self.Data) {
break
}
if i > 0 {
value |= uint32(self.Data[i+6]) << (i * 8)
} else {
value |= uint32(self.Data[i+6])
}
}
return value
}

type LockCommand struct {
Command
Flag uint8
Expand Down
65 changes: 17 additions & 48 deletions server/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1203,6 +1203,9 @@ func (self *LockDB) doTimeOut(lock *Lock, forcedExpried bool) {
if lockLocked > 0 {
lockManager.locked -= uint32(lockLocked)
lockManager.RemoveLock(lock)
if lock.ackCount != 0xff {
lockManager.ProcessRecoverLockData(lock)
}
if lock.isAof {
_ = lockManager.PushUnLockAof(lockManager.dbId, lock, lockCommand, nil, false, AOF_FLAG_TIMEOUTED)
}
Expand Down Expand Up @@ -1519,7 +1522,7 @@ func (self *LockDB) Lock(serverProtocol ServerProtocol, command *protocol.LockCo
lockData := lockManager.GetLockData()
if command.Flag&protocol.LOCK_FLAG_UPDATE_WHEN_LOCKED != 0 {
if command.Flag&protocol.LOCK_FLAG_CONTAINS_DATA != 0 {
lockManager.ProcessLockData(command)
lockManager.ProcessLockData(command, currentLock, false)
}
if currentLock.longWaitIndex > 0 {
self.RemoveLongExpried(currentLock)
Expand Down Expand Up @@ -1560,7 +1563,7 @@ func (self *LockDB) Lock(serverProtocol ServerProtocol, command *protocol.LockCo
lockManager.locked++
currentLock.locked++
if command.Flag&protocol.LOCK_FLAG_CONTAINS_DATA != 0 {
lockManager.ProcessLockData(command)
lockManager.ProcessLockData(command, currentLock, false)
}
if currentLock.longWaitIndex > 0 {
self.RemoveLongExpried(currentLock)
Expand Down Expand Up @@ -1618,12 +1621,7 @@ func (self *LockDB) Lock(serverProtocol ServerProtocol, command *protocol.LockCo

if command.TimeoutFlag&protocol.TIMEOUT_FLAG_REQUIRE_ACKED != 0 && !lock.isAof && lock.aofTime != 0xff {
if command.Flag&protocol.LOCK_FLAG_CONTAINS_DATA != 0 {
currentLockData := lockManager.currentData
lockManager.ProcessLockData(command)
if lockManager.currentData != nil {
lockManager.currentData.recoverLock = lock
lockManager.currentData.recoverData = currentLockData
}
lockManager.ProcessLockData(command, lock, true)
}
if command.TimeoutFlag&protocol.TIMEOUT_FLAG_MILLISECOND_TIME == 0 {
self.AddTimeOut(lock, lock.timeoutTime)
Expand All @@ -1643,7 +1641,7 @@ func (self *LockDB) Lock(serverProtocol ServerProtocol, command *protocol.LockCo

lockData := lockManager.GetLockData()
if command.Flag&protocol.LOCK_FLAG_CONTAINS_DATA != 0 {
lockManager.ProcessLockData(command)
lockManager.ProcessLockData(command, lock, false)
}
if command.ExpriedFlag&protocol.EXPRIED_FLAG_MILLISECOND_TIME == 0 {
self.AddExpried(lock, lock.expriedTime)
Expand All @@ -1665,7 +1663,7 @@ func (self *LockDB) Lock(serverProtocol ServerProtocol, command *protocol.LockCo
lockData := lockManager.GetLockData()
if command.Flag&protocol.LOCK_FLAG_CONTAINS_DATA != 0 {
isRequireAof := (lockManager.currentLock != nil && lockManager.currentLock.isAof) || (lockManager.currentData != nil && lockManager.currentData.isAof)
lockManager.ProcessLockData(command)
lockManager.ProcessLockData(command, lock, false)
if isRequireAof && lockManager.currentData != nil && !lockManager.currentData.isAof {
_ = lockManager.PushLockAof(lock, 0)
}
Expand All @@ -1692,7 +1690,7 @@ func (self *LockDB) Lock(serverProtocol ServerProtocol, command *protocol.LockCo
if self.checkLessLockVersion(lockManager, command) {
lockData := lockManager.GetLockData()
if command.Flag&protocol.LOCK_FLAG_CONTAINS_DATA != 0 {
lockManager.ProcessLockData(command)
lockManager.ProcessLockData(command, lock, false)
}
lockManager.FreeLock(lock)
if lockManager.refCount == 0 {
Expand Down Expand Up @@ -1836,7 +1834,7 @@ func (self *LockDB) UnLock(serverProtocol ServerProtocol, command *protocol.Lock
} else {
lockData := lockManager.GetLockData()
if command.Flag&protocol.UNLOCK_FLAG_CONTAINS_DATA != 0 {
lockManager.ProcessLockData(command)
lockManager.ProcessLockData(command, currentLock, false)
}
if currentLock.isAof {
_ = lockManager.PushUnLockAof(lockManager.dbId, currentLock, currentLock.command, command, true, AOF_FLAG_UPDATED)
Expand All @@ -1857,7 +1855,7 @@ func (self *LockDB) UnLock(serverProtocol ServerProtocol, command *protocol.Lock
//self.RemoveExpried(current_lock)
lockData := lockManager.GetLockData()
if command.Flag&protocol.UNLOCK_FLAG_CONTAINS_DATA != 0 {
lockManager.ProcessLockData(command)
lockManager.ProcessLockData(command, currentLock, false)
}
currentLockCommand := currentLock.command
currentLock.expried = true
Expand Down Expand Up @@ -1897,7 +1895,7 @@ func (self *LockDB) UnLock(serverProtocol ServerProtocol, command *protocol.Lock
currentLock.expried = true
lockData := lockManager.GetLockData()
if command.Flag&protocol.UNLOCK_FLAG_CONTAINS_DATA != 0 {
lockManager.ProcessLockData(command)
lockManager.ProcessLockData(command, currentLock, false)
}
if currentLock.longWaitIndex > 0 {
self.RemoveLongExpried(currentLock)
Expand Down Expand Up @@ -2000,12 +1998,7 @@ func (self *LockDB) wakeUpWaitLock(lockManager *LockManager, waitLock *Lock, ser
lockManager.locked++
waitLock.refCount++
if waitLock.command.Flag&protocol.LOCK_FLAG_CONTAINS_DATA != 0 {
currentLockData := lockManager.currentData
lockManager.ProcessLockData(waitLock.command)
if lockManager.currentData != nil {
lockManager.currentData.recoverLock = waitLock
lockManager.currentData.recoverData = currentLockData
}
lockManager.ProcessLockData(waitLock.command, waitLock, true)
}
err := lockManager.PushLockAof(waitLock, 0)
if err == nil {
Expand All @@ -2028,7 +2021,7 @@ func (self *LockDB) wakeUpWaitLock(lockManager *LockManager, waitLock *Lock, ser

lockData := lockManager.GetLockData()
if waitLock.command.Flag&protocol.LOCK_FLAG_CONTAINS_DATA != 0 {
lockManager.ProcessLockData(waitLock.command)
lockManager.ProcessLockData(waitLock.command, waitLock, false)
}
if waitLock.command.ExpriedFlag&protocol.EXPRIED_FLAG_MILLISECOND_TIME == 0 {
self.AddExpried(waitLock, waitLock.expriedTime)
Expand All @@ -2053,7 +2046,7 @@ func (self *LockDB) wakeUpWaitLock(lockManager *LockManager, waitLock *Lock, ser
lockData := lockManager.GetLockData()
if waitLock.command.Flag&protocol.LOCK_FLAG_CONTAINS_DATA != 0 {
isRequireAof := (lockManager.currentLock != nil && lockManager.currentLock.isAof) || (lockManager.currentData != nil && lockManager.currentData.isAof)
lockManager.ProcessLockData(waitLock.command)
lockManager.ProcessLockData(waitLock.command, waitLock, false)
if isRequireAof && lockManager.currentData != nil && !lockManager.currentData.isAof {
_ = lockManager.PushLockAof(waitLock, 0)
}
Expand Down Expand Up @@ -2231,19 +2224,7 @@ func (self *LockDB) DoAckLock(lock *Lock, succed bool) {
lock.expriedTime = lock.startTime + int64(lock.command.Expried)/1000 + 1
}

var lockData []byte = nil
if lockManager.currentData != nil {
currentData := lockManager.currentData
if currentData.recoverLock == lock {
if currentData.recoverData != nil {
lockData = currentData.recoverData.GetData()
}
currentData.recoverLock = nil
currentData.recoverData = nil
} else {
lockData = lockManager.GetLockData()
}
}
lockData := lockManager.ProcessAckLockData(lock)
if lock.command.ExpriedFlag&protocol.EXPRIED_FLAG_MILLISECOND_TIME == 0 {
self.AddExpried(lock, lock.expriedTime)
} else {
Expand All @@ -2262,19 +2243,7 @@ func (self *LockDB) DoAckLock(lock *Lock, succed bool) {
lockManager.locked -= uint32(lockLocked)
lockProtocol, lockCommand := lock.protocol, lock.command
lockManager.RemoveLock(lock)
if lockManager.currentData != nil {
currentData := lockManager.currentData
if currentData.recoverLock == lock {
if currentData.recoverData == nil {
lockManager.currentData = NewLockDataUnsetData()
} else {
lockManager.currentData = currentData.recoverData
lockManager.currentData.isAof = false
}
currentData.recoverLock = nil
currentData.recoverData = nil
}
}
lockManager.ProcessRecoverLockData(lock)
if lock.isAof {
_ = lockManager.PushUnLockAof(lockManager.dbId, lock, lockCommand, nil, false, 0)
}
Expand Down
Loading

0 comments on commit a2fd861

Please sign in to comment.