From cd7046ca4c759c542a23aaf5eac554803c245dd6 Mon Sep 17 00:00:00 2001 From: snower Date: Sun, 21 Apr 2024 00:46:50 +0800 Subject: [PATCH] Optimize repeated updates of Lock status without retaining aof rewrite only retains the last record --- server/aof.go | 10 ++++++---- server/db.go | 28 ++++++++++++++++------------ server/lock.go | 4 ++-- server/replication.go | 2 +- 4 files changed, 25 insertions(+), 19 deletions(-) diff --git a/server/aof.go b/server/aof.go index d69f2f4..a5aa405 100644 --- a/server/aof.go +++ b/server/aof.go @@ -21,9 +21,10 @@ const AOF_LOCK_TYPE_REPLAY = 2 const AOF_LOCK_TYPE_ACK_FILE = 3 const AOF_LOCK_TYPE_ACK_ACKED = 4 -const AOF_FLAG_REWRITEd = 0x0001 +const AOF_FLAG_REWRITED = 0x0001 const AOF_FLAG_TIMEOUTED = 0x0002 const AOF_FLAG_EXPRIED = 0x0004 +const AOF_FLAG_UPDATED = 0x0008 const AOF_FLAG_REQUIRE_ACKED = 0x1000 const AOF_FLAG_CONTAINS_DATA = 0x2000 @@ -1828,15 +1829,16 @@ func (self *Aof) loadRewriteAofFiles(aofFilenames []string) (*AofFile, []*AofFil } lockCommand.CommandType = lock.CommandType + lockCommand.RequestId = lock.GetRequestId() lockCommand.DbId = lock.DbId lockCommand.LockId = lock.LockId lockCommand.LockKey = lock.LockKey - if !db.HasLock(lockCommand) { + if !db.HasLock(lockCommand, lock.CommandType == protocol.COMMAND_LOCK && lock.AofFlag&AOF_FLAG_UPDATED != 0 && lock.Rcount == 0) { return true, nil } - lock.AofFlag |= AOF_FLAG_REWRITEd - lock.buf[55] |= AOF_FLAG_REWRITEd + lock.AofFlag |= AOF_FLAG_REWRITED + lock.buf[55] |= AOF_FLAG_REWRITED err = rewriteAofFile.AppendLock(lock) if err != nil { return true, err diff --git a/server/db.go b/server/db.go index 7405214..d3cd965 100755 --- a/server/db.go +++ b/server/db.go @@ -868,7 +868,7 @@ func (self *LockDB) flushExpried(glockIndex uint16, doExpried bool) { } else { for _, lock := range doExpriedLocks { if !lock.isAof && lock.aofTime != 0xff { - _ = lock.manager.PushLockAof(lock) + _ = lock.manager.PushLockAof(lock, 0) } } } @@ -1313,7 +1313,7 @@ func (self *LockDB) AddExpried(lock *Lock, lockExpriedTime int64) { if !lock.isAof && lock.aofTime != 0xff { if self.currentTime-lock.startTime >= int64(lock.aofTime) { for i := uint8(0); i < lock.locked; i++ { - _ = lock.manager.PushLockAof(lock) + _ = lock.manager.PushLockAof(lock, 0) } } } @@ -1443,7 +1443,7 @@ func (self *LockDB) AddMillisecondExpried(lock *Lock) { _ = lockQueue.Push(lock) if !lock.isAof && lock.aofTime == 0 { - _ = lock.manager.PushLockAof(lock) + _ = lock.manager.PushLockAof(lock, 0) } } @@ -1534,7 +1534,7 @@ func (self *LockDB) Lock(serverProtocol ServerProtocol, command *protocol.LockCo } currentLock.protocol = serverProtocol.GetProxy() if currentLock.isAof { - _ = lockManager.PushLockAof(currentLock) + _ = lockManager.PushLockAof(currentLock, AOF_FLAG_UPDATED) } command.Expried = uint16(currentLock.expriedTime - currentLock.startTime) @@ -1574,7 +1574,7 @@ func (self *LockDB) Lock(serverProtocol ServerProtocol, command *protocol.LockCo } currentLock.protocol = serverProtocol.GetProxy() if currentLock.isAof { - _ = lockManager.PushLockAof(currentLock) + _ = lockManager.PushLockAof(currentLock, AOF_FLAG_UPDATED) } lockManager.state.LockCount++ lockManager.state.LockedCount++ @@ -1621,7 +1621,7 @@ func (self *LockDB) Lock(serverProtocol ServerProtocol, command *protocol.LockCo self.AddMillisecondTimeOut(lock) } lock.refCount += 2 - err := lockManager.PushLockAof(lock) + err := lockManager.PushLockAof(lock, 0) if err == nil { lockManager.glock.Unlock() } else { @@ -1827,7 +1827,7 @@ func (self *LockDB) UnLock(serverProtocol ServerProtocol, command *protocol.Lock lockManager.ProcessLockData(command) } if currentLock.isAof { - _ = lockManager.PushUnLockAof(lockManager.dbId, currentLock, currentLock.command, command, true, 0) + _ = lockManager.PushUnLockAof(lockManager.dbId, currentLock, currentLock.command, command, true, AOF_FLAG_UPDATED) } lockManager.state.UnLockCount++ lockManager.state.LockedCount-- @@ -1987,7 +1987,7 @@ func (self *LockDB) wakeUpWaitLock(lockManager *LockManager, waitLock *Lock, ser lockManager.AddLock(waitLock) lockManager.locked++ waitLock.refCount++ - err := lockManager.PushLockAof(waitLock) + err := lockManager.PushLockAof(waitLock, 0) if err == nil { lockManager.glock.Unlock() } else { @@ -2253,7 +2253,7 @@ func (self *LockDB) DoAckLock(lock *Lock, succed bool) { self.wakeUpWaitLocks(lockManager, nil) } -func (self *LockDB) HasLock(command *protocol.LockCommand) bool { +func (self *LockDB) HasLock(command *protocol.LockCommand, checkUpdated bool) bool { lockManager := self.GetLockManager(command) if lockManager == nil { return false @@ -2274,12 +2274,16 @@ func (self *LockDB) HasLock(command *protocol.LockCommand) bool { return false } currentLock := lockManager.GetLockedLock(command) - if currentLock != nil { + if currentLock == nil { lockManager.glock.Unlock() - return true + return false + } + if checkUpdated && currentLock.command.RequestId != command.RequestId { + lockManager.glock.Unlock() + return false } lockManager.glock.Unlock() - return false + return true } func (self *LockDB) GetState() *protocol.LockDBState { diff --git a/server/lock.go b/server/lock.go index 6533f8c..6c3120f 100755 --- a/server/lock.go +++ b/server/lock.go @@ -230,14 +230,14 @@ func (self *LockManager) GetWaitLock() *Lock { return nil } -func (self *LockManager) PushLockAof(lock *Lock) error { +func (self *LockManager) PushLockAof(lock *Lock, aofFlag uint16) error { if lock.command.Flag&protocol.LOCK_FLAG_FROM_AOF != 0 { lock.isAof = true return nil } fashHash := (uint32(self.lockKey[0])<<24 | uint32(self.lockKey[1])<<16 | uint32(self.lockKey[2])<<8 | uint32(self.lockKey[3])) ^ (uint32(self.lockKey[4])<<24 | uint32(self.lockKey[5])<<16 | uint32(self.lockKey[6])<<8 | uint32(self.lockKey[7])) ^ (uint32(self.lockKey[8])<<24 | uint32(self.lockKey[9])<<16 | uint32(self.lockKey[10])<<8 | uint32(self.lockKey[11])) ^ (uint32(self.lockKey[12])<<24 | uint32(self.lockKey[13])<<16 | uint32(self.lockKey[14])<<8 | uint32(self.lockKey[15])) - err := self.lockDb.aofChannels[fashHash%uint32(self.lockDb.managerMaxGlocks)].Push(lock.manager.dbId, lock, protocol.COMMAND_LOCK, lock.command, nil, 0, lock.manager.AofLockData()) + err := self.lockDb.aofChannels[fashHash%uint32(self.lockDb.managerMaxGlocks)].Push(lock.manager.dbId, lock, protocol.COMMAND_LOCK, lock.command, nil, aofFlag, lock.manager.AofLockData()) if err != nil { self.lockDb.slock.Log().Errorf("Database lock push aof error DbId:%d LockKey:%x LockId:%x", lock.command.DbId, lock.command.LockKey, lock.command.LockId) diff --git a/server/replication.go b/server/replication.go index e635b1d..bf1a557 100644 --- a/server/replication.go +++ b/server/replication.go @@ -569,7 +569,7 @@ func (self *ReplicationClient) recvFiles() error { } currentAofIndex := self.aofLock.AofIndex - if self.aofLock.AofFlag&AOF_FLAG_REWRITEd != 0 { + if self.aofLock.AofFlag&AOF_FLAG_REWRITED != 0 { currentAofIndex = 0 } if currentAofIndex != aofIndex || aofFile == nil {