Skip to content

Commit

Permalink
Optimize repeated updates of Lock status without retaining aof rewrit…
Browse files Browse the repository at this point in the history
…e only retains the last record
  • Loading branch information
snower committed Apr 20, 2024
1 parent 2a2339d commit cd7046c
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 19 deletions.
10 changes: 6 additions & 4 deletions server/aof.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ const AOF_LOCK_TYPE_REPLAY = 2
const AOF_LOCK_TYPE_ACK_FILE = 3
const AOF_LOCK_TYPE_ACK_ACKED = 4

const AOF_FLAG_REWRITEd = 0x0001
const AOF_FLAG_REWRITED = 0x0001
const AOF_FLAG_TIMEOUTED = 0x0002
const AOF_FLAG_EXPRIED = 0x0004
const AOF_FLAG_UPDATED = 0x0008
const AOF_FLAG_REQUIRE_ACKED = 0x1000
const AOF_FLAG_CONTAINS_DATA = 0x2000

Expand Down Expand Up @@ -1828,15 +1829,16 @@ func (self *Aof) loadRewriteAofFiles(aofFilenames []string) (*AofFile, []*AofFil
}

lockCommand.CommandType = lock.CommandType
lockCommand.RequestId = lock.GetRequestId()
lockCommand.DbId = lock.DbId
lockCommand.LockId = lock.LockId
lockCommand.LockKey = lock.LockKey
if !db.HasLock(lockCommand) {
if !db.HasLock(lockCommand, lock.CommandType == protocol.COMMAND_LOCK && lock.AofFlag&AOF_FLAG_UPDATED != 0 && lock.Rcount == 0) {
return true, nil
}

lock.AofFlag |= AOF_FLAG_REWRITEd
lock.buf[55] |= AOF_FLAG_REWRITEd
lock.AofFlag |= AOF_FLAG_REWRITED
lock.buf[55] |= AOF_FLAG_REWRITED
err = rewriteAofFile.AppendLock(lock)
if err != nil {
return true, err
Expand Down
28 changes: 16 additions & 12 deletions server/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -868,7 +868,7 @@ func (self *LockDB) flushExpried(glockIndex uint16, doExpried bool) {
} else {
for _, lock := range doExpriedLocks {
if !lock.isAof && lock.aofTime != 0xff {
_ = lock.manager.PushLockAof(lock)
_ = lock.manager.PushLockAof(lock, 0)
}
}
}
Expand Down Expand Up @@ -1313,7 +1313,7 @@ func (self *LockDB) AddExpried(lock *Lock, lockExpriedTime int64) {
if !lock.isAof && lock.aofTime != 0xff {
if self.currentTime-lock.startTime >= int64(lock.aofTime) {
for i := uint8(0); i < lock.locked; i++ {
_ = lock.manager.PushLockAof(lock)
_ = lock.manager.PushLockAof(lock, 0)
}
}
}
Expand Down Expand Up @@ -1443,7 +1443,7 @@ func (self *LockDB) AddMillisecondExpried(lock *Lock) {
_ = lockQueue.Push(lock)

if !lock.isAof && lock.aofTime == 0 {
_ = lock.manager.PushLockAof(lock)
_ = lock.manager.PushLockAof(lock, 0)
}
}

Expand Down Expand Up @@ -1534,7 +1534,7 @@ func (self *LockDB) Lock(serverProtocol ServerProtocol, command *protocol.LockCo
}
currentLock.protocol = serverProtocol.GetProxy()
if currentLock.isAof {
_ = lockManager.PushLockAof(currentLock)
_ = lockManager.PushLockAof(currentLock, AOF_FLAG_UPDATED)
}

command.Expried = uint16(currentLock.expriedTime - currentLock.startTime)
Expand Down Expand Up @@ -1574,7 +1574,7 @@ func (self *LockDB) Lock(serverProtocol ServerProtocol, command *protocol.LockCo
}
currentLock.protocol = serverProtocol.GetProxy()
if currentLock.isAof {
_ = lockManager.PushLockAof(currentLock)
_ = lockManager.PushLockAof(currentLock, AOF_FLAG_UPDATED)
}
lockManager.state.LockCount++
lockManager.state.LockedCount++
Expand Down Expand Up @@ -1621,7 +1621,7 @@ func (self *LockDB) Lock(serverProtocol ServerProtocol, command *protocol.LockCo
self.AddMillisecondTimeOut(lock)
}
lock.refCount += 2
err := lockManager.PushLockAof(lock)
err := lockManager.PushLockAof(lock, 0)
if err == nil {
lockManager.glock.Unlock()
} else {
Expand Down Expand Up @@ -1827,7 +1827,7 @@ func (self *LockDB) UnLock(serverProtocol ServerProtocol, command *protocol.Lock
lockManager.ProcessLockData(command)
}
if currentLock.isAof {
_ = lockManager.PushUnLockAof(lockManager.dbId, currentLock, currentLock.command, command, true, 0)
_ = lockManager.PushUnLockAof(lockManager.dbId, currentLock, currentLock.command, command, true, AOF_FLAG_UPDATED)
}
lockManager.state.UnLockCount++
lockManager.state.LockedCount--
Expand Down Expand Up @@ -1987,7 +1987,7 @@ func (self *LockDB) wakeUpWaitLock(lockManager *LockManager, waitLock *Lock, ser
lockManager.AddLock(waitLock)
lockManager.locked++
waitLock.refCount++
err := lockManager.PushLockAof(waitLock)
err := lockManager.PushLockAof(waitLock, 0)
if err == nil {
lockManager.glock.Unlock()
} else {
Expand Down Expand Up @@ -2253,7 +2253,7 @@ func (self *LockDB) DoAckLock(lock *Lock, succed bool) {
self.wakeUpWaitLocks(lockManager, nil)
}

func (self *LockDB) HasLock(command *protocol.LockCommand) bool {
func (self *LockDB) HasLock(command *protocol.LockCommand, checkUpdated bool) bool {
lockManager := self.GetLockManager(command)
if lockManager == nil {
return false
Expand All @@ -2274,12 +2274,16 @@ func (self *LockDB) HasLock(command *protocol.LockCommand) bool {
return false
}
currentLock := lockManager.GetLockedLock(command)
if currentLock != nil {
if currentLock == nil {
lockManager.glock.Unlock()
return true
return false
}
if checkUpdated && currentLock.command.RequestId != command.RequestId {
lockManager.glock.Unlock()
return false
}
lockManager.glock.Unlock()
return false
return true
}

func (self *LockDB) GetState() *protocol.LockDBState {
Expand Down
4 changes: 2 additions & 2 deletions server/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,14 +230,14 @@ func (self *LockManager) GetWaitLock() *Lock {
return nil
}

func (self *LockManager) PushLockAof(lock *Lock) error {
func (self *LockManager) PushLockAof(lock *Lock, aofFlag uint16) error {
if lock.command.Flag&protocol.LOCK_FLAG_FROM_AOF != 0 {
lock.isAof = true
return nil
}

fashHash := (uint32(self.lockKey[0])<<24 | uint32(self.lockKey[1])<<16 | uint32(self.lockKey[2])<<8 | uint32(self.lockKey[3])) ^ (uint32(self.lockKey[4])<<24 | uint32(self.lockKey[5])<<16 | uint32(self.lockKey[6])<<8 | uint32(self.lockKey[7])) ^ (uint32(self.lockKey[8])<<24 | uint32(self.lockKey[9])<<16 | uint32(self.lockKey[10])<<8 | uint32(self.lockKey[11])) ^ (uint32(self.lockKey[12])<<24 | uint32(self.lockKey[13])<<16 | uint32(self.lockKey[14])<<8 | uint32(self.lockKey[15]))
err := self.lockDb.aofChannels[fashHash%uint32(self.lockDb.managerMaxGlocks)].Push(lock.manager.dbId, lock, protocol.COMMAND_LOCK, lock.command, nil, 0, lock.manager.AofLockData())
err := self.lockDb.aofChannels[fashHash%uint32(self.lockDb.managerMaxGlocks)].Push(lock.manager.dbId, lock, protocol.COMMAND_LOCK, lock.command, nil, aofFlag, lock.manager.AofLockData())
if err != nil {
self.lockDb.slock.Log().Errorf("Database lock push aof error DbId:%d LockKey:%x LockId:%x",
lock.command.DbId, lock.command.LockKey, lock.command.LockId)
Expand Down
2 changes: 1 addition & 1 deletion server/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ func (self *ReplicationClient) recvFiles() error {
}

currentAofIndex := self.aofLock.AofIndex
if self.aofLock.AofFlag&AOF_FLAG_REWRITEd != 0 {
if self.aofLock.AofFlag&AOF_FLAG_REWRITED != 0 {
currentAofIndex = 0
}
if currentAofIndex != aofIndex || aofFile == nil {
Expand Down

0 comments on commit cd7046c

Please sign in to comment.