Skip to content

Commit

Permalink
Optimize strong consistent ack processing lock data exception recover…
Browse files Browse the repository at this point in the history
…y original data logic
  • Loading branch information
snower committed Apr 21, 2024
1 parent aedfdd9 commit 221d7d9
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 51 deletions.
40 changes: 35 additions & 5 deletions server/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1617,6 +1617,14 @@ func (self *LockDB) Lock(serverProtocol ServerProtocol, command *protocol.LockCo
lockManager.locked++

if command.TimeoutFlag&protocol.TIMEOUT_FLAG_REQUIRE_ACKED != 0 && !lock.isAof && lock.aofTime != 0xff {
if command.Flag&protocol.LOCK_FLAG_CONTAINS_DATA != 0 {
currentLockData := lockManager.currentData
lockManager.ProcessLockData(command)
if lockManager.currentData != nil {
lockManager.currentData.recoverLock = lock
lockManager.currentData.recoverData = currentLockData
}
}
if command.TimeoutFlag&protocol.TIMEOUT_FLAG_MILLISECOND_TIME == 0 {
self.AddTimeOut(lock, lock.timeoutTime)
} else {
Expand Down Expand Up @@ -1989,6 +1997,14 @@ func (self *LockDB) wakeUpWaitLock(lockManager *LockManager, waitLock *Lock, ser
lockManager.AddLock(waitLock)
lockManager.locked++
waitLock.refCount++
if waitLock.command.Flag&protocol.LOCK_FLAG_CONTAINS_DATA != 0 {
currentLockData := lockManager.currentData
lockManager.ProcessLockData(waitLock.command)
if lockManager.currentData != nil {
lockManager.currentData.recoverLock = waitLock
lockManager.currentData.recoverData = currentLockData
}
}
err := lockManager.PushLockAof(waitLock, 0)
if err == nil {
lockManager.glock.Unlock()
Expand Down Expand Up @@ -2214,11 +2230,17 @@ func (self *LockDB) DoAckLock(lock *Lock, succed bool) {
lock.expriedTime = lock.startTime + int64(lock.command.Expried)/1000 + 1
}

lockData := lockManager.GetLockData()
if lock.command.Flag&protocol.LOCK_FLAG_CONTAINS_DATA != 0 {
lockManager.ProcessLockData(lock.command)
if lockManager.currentData != nil {
lockManager.currentData.isAof = true
var lockData []byte
if lockManager.currentData != nil {
currentData := lockManager.currentData
if currentData.recoverLock == lock {
if currentData.recoverData != nil {
lockData = currentData.recoverData.Data
}
currentData.recoverLock = nil
currentData.recoverData = nil
} else {
lockData = lockManager.GetLockData()
}
}
if lock.command.ExpriedFlag&protocol.EXPRIED_FLAG_MILLISECOND_TIME == 0 {
Expand All @@ -2239,6 +2261,14 @@ func (self *LockDB) DoAckLock(lock *Lock, succed bool) {
lockManager.locked -= uint32(lockLocked)
lockProtocol, lockCommand := lock.protocol, lock.command
lockManager.RemoveLock(lock)
if lockManager.currentData != nil {
currentData := lockManager.currentData
if currentData.recoverLock == lock {
lockManager.currentData = currentData.recoverData
currentData.recoverLock = nil
currentData.recoverData = nil
}
}
if lock.isAof {
if lockManager.currentData != nil {
lockManager.currentData.isAof = false
Expand Down
57 changes: 11 additions & 46 deletions server/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (self *LockManager) PushLockAof(lock *Lock, aofFlag uint16) error {
}

fashHash := (uint32(self.lockKey[0])<<24 | uint32(self.lockKey[1])<<16 | uint32(self.lockKey[2])<<8 | uint32(self.lockKey[3])) ^ (uint32(self.lockKey[4])<<24 | uint32(self.lockKey[5])<<16 | uint32(self.lockKey[6])<<8 | uint32(self.lockKey[7])) ^ (uint32(self.lockKey[8])<<24 | uint32(self.lockKey[9])<<16 | uint32(self.lockKey[10])<<8 | uint32(self.lockKey[11])) ^ (uint32(self.lockKey[12])<<24 | uint32(self.lockKey[13])<<16 | uint32(self.lockKey[14])<<8 | uint32(self.lockKey[15]))
err := self.lockDb.aofChannels[fashHash%uint32(self.lockDb.managerMaxGlocks)].Push(lock.manager.dbId, lock, protocol.COMMAND_LOCK, lock.command, nil, aofFlag, lock.manager.AofLockData(lock, protocol.COMMAND_LOCK))
err := self.lockDb.aofChannels[fashHash%uint32(self.lockDb.managerMaxGlocks)].Push(lock.manager.dbId, lock, protocol.COMMAND_LOCK, lock.command, nil, aofFlag, lock.manager.AofLockData(protocol.COMMAND_LOCK))
if err != nil {
self.lockDb.slock.Log().Errorf("Database lock push aof error DbId:%d LockKey:%x LockId:%x",
lock.command.DbId, lock.command.LockKey, lock.command.LockId)
Expand All @@ -261,7 +261,7 @@ func (self *LockManager) PushUnLockAof(dbId uint8, lock *Lock, lockCommand *prot
}

fashHash := (uint32(self.lockKey[0])<<24 | uint32(self.lockKey[1])<<16 | uint32(self.lockKey[2])<<8 | uint32(self.lockKey[3])) ^ (uint32(self.lockKey[4])<<24 | uint32(self.lockKey[5])<<16 | uint32(self.lockKey[6])<<8 | uint32(self.lockKey[7])) ^ (uint32(self.lockKey[8])<<24 | uint32(self.lockKey[9])<<16 | uint32(self.lockKey[10])<<8 | uint32(self.lockKey[11])) ^ (uint32(self.lockKey[12])<<24 | uint32(self.lockKey[13])<<16 | uint32(self.lockKey[14])<<8 | uint32(self.lockKey[15]))
err := self.lockDb.aofChannels[fashHash%uint32(self.lockDb.managerMaxGlocks)].Push(dbId, lock, protocol.COMMAND_UNLOCK, lockCommand, unLockCommand, aofFlag, lock.manager.AofLockData(lock, protocol.COMMAND_UNLOCK))
err := self.lockDb.aofChannels[fashHash%uint32(self.lockDb.managerMaxGlocks)].Push(dbId, lock, protocol.COMMAND_UNLOCK, lockCommand, unLockCommand, aofFlag, lock.manager.AofLockData(protocol.COMMAND_UNLOCK))
if err != nil {
self.lockDb.slock.Log().Errorf("Database lock push aof error DbId:%d LockKey:%x LockId:%x",
lock.command.DbId, lock.command.LockKey, lock.command.LockId)
Expand Down Expand Up @@ -333,18 +333,8 @@ func (self *LockManager) GetLockData() []byte {
return nil
}

func (self *LockManager) AofLockData(lock *Lock, commandType uint8) []byte {
if commandType == protocol.COMMAND_LOCK {
if lock.command.TimeoutFlag&protocol.TIMEOUT_FLAG_REQUIRE_ACKED != 0 && !lock.isAof {
return lock.manager.ProcessLockDataAckTry(lock.command)
}
if self.currentData != nil {
self.currentData.isAof = true
return self.currentData.Data
}
}

if self.currentData != nil && !self.currentData.isAof {
func (self *LockManager) AofLockData(commandType uint8) []byte {
if self.currentData != nil && (commandType == protocol.COMMAND_LOCK || !self.currentData.isAof) {
self.currentData.isAof = true
return self.currentData.Data
}
Expand All @@ -355,35 +345,12 @@ func (self *LockManager) ProcessLockData(command *protocol.LockCommand) {
if command.Data == nil {
return
}
lockData := self.ProcessLockDataToSetData(command)
if lockData != nil {
self.currentData = NewLockData(lockData)
}
command.Data = nil
}

func (self *LockManager) ProcessLockDataAckTry(command *protocol.LockCommand) []byte {
if command.Data == nil {
return nil
}
lockData := self.ProcessLockDataToSetData(command)
if lockData != nil {
self.currentData = NewLockDataEmptySetData()
command.Data = protocol.NewLockCommandDataFromOriginBytes(lockData)
}
return lockData
}

func (self *LockManager) ProcessLockDataToSetData(command *protocol.LockCommand) []byte {
if command.Data == nil {
return nil
}
lockCommandData := command.Data
switch lockCommandData.CommandType {
case protocol.LOCK_DATA_COMMAND_TYPE_SET:
return lockCommandData.Data
self.currentData = NewLockData(lockCommandData.Data)
}
return nil
command.Data = nil
}

type Lock struct {
Expand Down Expand Up @@ -418,16 +385,14 @@ func (self *Lock) GetDB() *LockDB {
}

type LockData struct {
Data []byte
isAof bool
Data []byte
recoverData *LockData
recoverLock *Lock
isAof bool
}

func NewLockData(data []byte) *LockData {
return &LockData{data, false}
}

func NewLockDataEmptySetData() *LockData {
return &LockData{[]byte{2, 0, 0, 0, protocol.LOCK_DATA_COMMAND_TYPE_SET, 0}, false}
return &LockData{data, nil, nil, false}
}

type PriorityMutex struct {
Expand Down

0 comments on commit 221d7d9

Please sign in to comment.