From 92470f3f62a0c3a54d9f7b5c5f42c0c5594425b4 Mon Sep 17 00:00:00 2001 From: snower Date: Tue, 21 May 2024 11:35:02 +0800 Subject: [PATCH] Optimize the memory usage of LockManager Locked queue and wait queue --- server/admin.go | 9 +-- server/db.go | 14 +--- server/lock.go | 177 +++++++++++++++++++++++++++++++++++++------ server/lock_test.go | 159 ++++++++++++++++++++++++++++++++++++++ server/protocol.go | 9 +-- server/queue.go | 12 +-- server/queue_test.go | 34 +++++++++ 7 files changed, 363 insertions(+), 51 deletions(-) diff --git a/server/admin.go b/server/admin.go index 23294c2..c306e9f 100644 --- a/server/admin.go +++ b/server/admin.go @@ -619,8 +619,8 @@ func (self *Admin) commandHandleShowLockCommand(serverProtocol *TextServerProtoc } if lockManager.locks != nil { - for i := range lockManager.locks.IterNodes() { - nodeQueues := lockManager.locks.IterNodeQueues(int32(i)) + for i := range lockManager.locks.queue.IterNodes() { + nodeQueues := lockManager.locks.queue.IterNodeQueues(int32(i)) for _, lock := range nodeQueues { if lock.locked == 0 { continue @@ -670,9 +670,8 @@ func (self *Admin) commandHandleShowLockWaitCommand(serverProtocol *TextServerPr lockInfos := make([]string, 0) lockManager.glock.LowPriorityLock() if lockManager.waitLocks != nil { - for i := range lockManager.waitLocks.IterNodes() { - nodeQueues := lockManager.waitLocks.IterNodeQueues(int32(i)) - for _, lock := range nodeQueues { + for _, waitLocks := range lockManager.waitLocks.IterNodes() { + for _, lock := range waitLocks { if lock.timeouted { continue } diff --git a/server/db.go b/server/db.go index cfae135..5a7c9ed 100755 --- a/server/db.go +++ b/server/db.go @@ -1015,7 +1015,6 @@ func (self *LockDB) initNewLockManager(dbId uint8) { lockManagers[i].lockDb = self lockManagers[i].dbId = dbId lockManagers[i].locks = nil - lockManagers[i].lockMaps = nil lockManagers[i].waitLocks = nil lockManagers[i].glock = self.managerGlocks[self.managerGlockIndex] lockManagers[i].glockIndex = self.managerGlockIndex @@ -1134,9 +1133,8 @@ func (self *LockDB) RemoveLockManager(lockManager *LockManager) { self.freeLockManagers[freeLockManagerHead] = lockManager lockManager.locks = nil - lockManager.lockMaps = nil if lockManager.waitLocks != nil { - _ = lockManager.waitLocks.Rellac() + lockManager.waitLocks.Rellac() } lockManager.currentData = nil atomic.AddUint32(&lockManager.state.KeyCount, 0xffffffff) @@ -1148,7 +1146,6 @@ func (self *LockDB) RemoveLockManager(lockManager *LockManager) { lockManager.currentLock = nil lockManager.currentData = nil lockManager.locks = nil - lockManager.lockMaps = nil lockManager.waitLocks = nil lockManager.freeLocks = nil atomic.AddUint32(&lockManager.state.KeyCount, 0xffffffff) @@ -1176,9 +1173,8 @@ func (self *LockDB) RemoveLockManager(lockManager *LockManager) { self.freeLockManagers[freeLockManagerHead] = lockManager lockManager.locks = nil - lockManager.lockMaps = nil if lockManager.waitLocks != nil { - _ = lockManager.waitLocks.Rellac() + lockManager.waitLocks.Rellac() } lockManager.currentData = nil atomic.AddUint32(&lockManager.state.KeyCount, 0xffffffff) @@ -1190,7 +1186,6 @@ func (self *LockDB) RemoveLockManager(lockManager *LockManager) { lockManager.currentLock = nil lockManager.currentData = nil lockManager.locks = nil - lockManager.lockMaps = nil lockManager.waitLocks = nil lockManager.freeLocks = nil atomic.AddUint32(&lockManager.state.KeyCount, 0xffffffff) @@ -2189,9 +2184,8 @@ func (self *LockDB) wakeUpWaitLock(lockManager *LockManager, waitLock *Lock, ser func (self *LockDB) cancelWaitLock(lockManager *LockManager, command *protocol.LockCommand, serverProtocol ServerProtocol) { var waitLock *Lock = nil if lockManager.waitLocks != nil { - for i := range lockManager.waitLocks.IterNodes() { - nodeQueues := lockManager.waitLocks.IterNodeQueues(int32(i)) - for _, lock := range nodeQueues { + for _, waitLocks := range lockManager.waitLocks.IterNodes() { + for _, lock := range waitLocks { if lock.timeouted { continue } diff --git a/server/lock.go b/server/lock.go index 1334eaa..7c34262 100755 --- a/server/lock.go +++ b/server/lock.go @@ -6,14 +6,151 @@ import ( "sync/atomic" ) +type LockManagerRingQueue struct { + queue []*Lock + index int +} + +func NewLockManagerRingQueue(size int) *LockManagerRingQueue { + return &LockManagerRingQueue{make([]*Lock, 0, size), 0} +} + +func (self *LockManagerRingQueue) Push(lock *Lock) { + if len(self.queue) == cap(self.queue) { + if self.index > len(self.queue)/2 { + copy(self.queue, self.queue[self.index:]) + self.queue = self.queue[:len(self.queue)-self.index] + self.index = 0 + } + } + self.queue = append(self.queue, lock) +} + +func (self *LockManagerRingQueue) Pop() *Lock { + if self.index >= len(self.queue) { + return nil + } + lock := self.queue[self.index] + self.queue[self.index] = nil + self.index++ + if self.index >= len(self.queue) { + self.queue = self.queue[:0] + self.index = 0 + } + return lock +} + +func (self *LockManagerRingQueue) Head() *Lock { + if self.index >= len(self.queue) { + return nil + } + return self.queue[self.index] +} + +type LockManagerLockQueue struct { + queue *LockQueue + maps map[[16]byte]*Lock +} + +func NewLockManagerLockQueue() *LockManagerLockQueue { + return &LockManagerLockQueue{NewLockQueue(4, 16, 4), make(map[[16]byte]*Lock)} +} + +func (self *LockManagerLockQueue) Push(lock *Lock) { + err := self.queue.Push(lock) + if err == nil { + self.maps[lock.command.LockId] = lock + } +} + +func (self *LockManagerLockQueue) Pop() *Lock { + return self.queue.Pop() +} + +func (self *LockManagerLockQueue) Head() *Lock { + return self.queue.Head() +} + +type LockManagerWaitQueue struct { + fastQueue []*Lock + fastIndex int + ringQueue *LockManagerRingQueue +} + +func NewLockManagerWaitQueue() *LockManagerWaitQueue { + return &LockManagerWaitQueue{make([]*Lock, 0, 16), 0, nil} +} + +func (self *LockManagerWaitQueue) Push(lock *Lock) { + if self.ringQueue != nil { + self.ringQueue.Push(lock) + return + } + if len(self.fastQueue) == cap(self.fastQueue) { + if self.fastIndex > len(self.fastQueue)/2 { + copy(self.fastQueue, self.fastQueue[self.fastIndex:]) + self.fastQueue = self.fastQueue[:len(self.fastQueue)-self.fastIndex] + self.fastIndex = 0 + } else { + self.ringQueue = NewLockManagerRingQueue(64) + self.ringQueue.Push(lock) + return + } + } + self.fastQueue = append(self.fastQueue, lock) +} + +func (self *LockManagerWaitQueue) Pop() *Lock { + if self.fastIndex < len(self.fastQueue) { + lock := self.fastQueue[self.fastIndex] + self.fastQueue[self.fastIndex] = nil + self.fastIndex++ + if self.fastIndex >= len(self.fastQueue) { + self.fastQueue = self.fastQueue[:0] + self.fastIndex = 0 + } + return lock + } + if self.ringQueue != nil { + return self.ringQueue.Pop() + } + return nil +} + +func (self *LockManagerWaitQueue) Head() *Lock { + if self.fastIndex < len(self.fastQueue) { + return self.fastQueue[self.fastIndex] + } + if self.ringQueue != nil { + return self.ringQueue.Head() + } + return nil +} + +func (self *LockManagerWaitQueue) Rellac() { + self.fastQueue = self.fastQueue[:0] + self.fastIndex = 0 + self.ringQueue = nil +} + +func (self *LockManagerWaitQueue) IterNodes() [][]*Lock { + lockNodes := make([][]*Lock, 0, 2) + if self.fastIndex < len(self.fastQueue) { + lockNodes = append(lockNodes, self.fastQueue[self.fastIndex:]) + } + if self.ringQueue != nil && self.ringQueue.index < len(self.ringQueue.queue) { + lockNodes = append(lockNodes, self.ringQueue.queue[self.ringQueue.index:]) + } + return lockNodes +} + type LockManager struct { lockDb *LockDB lockKey [16]byte currentLock *Lock currentData *LockManagerData - locks *LockQueue - lockMaps map[[16]byte]*Lock - waitLocks *LockQueue + locks *LockManagerLockQueue + waitLocks *LockManagerWaitQueue glock *PriorityMutex freeLocks *LockQueue fastKeyValue *FastKeyValue @@ -27,7 +164,7 @@ type LockManager struct { func NewLockManager(lockDb *LockDB, command *protocol.LockCommand, glock *PriorityMutex, glockIndex uint16, freeLocks *LockQueue, state *protocol.LockDBState) *LockManager { return &LockManager{lockDb, command.LockKey, nil, nil, nil, nil, - nil, glock, freeLocks, nil, state, 0, 0, + glock, freeLocks, nil, state, 0, 0, glockIndex, command.DbId, false} } @@ -86,11 +223,9 @@ func (self *LockManager) AddLock(lock *Lock) *Lock { self.currentLock = lock } else { if self.locks == nil { - self.locks = NewLockQueue(4, 4, 4) - self.lockMaps = make(map[[16]byte]*Lock, 8) + self.locks = NewLockManagerLockQueue() } - _ = self.locks.Push(lock) - self.lockMaps[lock.command.LockId] = lock + self.locks.Push(lock) } return lock } @@ -109,7 +244,7 @@ func (self *LockManager) RemoveLock(lock *Lock) *Lock { lockedLock := self.locks.Pop() for lockedLock != nil { if lockedLock.locked > 0 { - delete(self.lockMaps, lockedLock.command.LockId) + delete(self.locks.maps, lockedLock.command.LockId) self.currentLock = lockedLock break } @@ -121,8 +256,8 @@ func (self *LockManager) RemoveLock(lock *Lock) *Lock { lockedLock = self.locks.Pop() } - if self.locks.headNodeIndex >= 8 { - _ = self.locks.Resize() + if self.locks.queue.headNodeIndex >= 8 { + _ = self.locks.queue.Resize() } return lock } @@ -130,7 +265,7 @@ func (self *LockManager) RemoveLock(lock *Lock) *Lock { if self.locks == nil { return lock } - delete(self.lockMaps, lock.command.LockId) + delete(self.locks.maps, lock.command.LockId) lockedLock := self.locks.Head() for lockedLock != nil { if lockedLock.locked > 0 { @@ -145,8 +280,8 @@ func (self *LockManager) RemoveLock(lock *Lock) *Lock { lockedLock = self.locks.Head() } - if self.locks.headNodeIndex >= 8 { - _ = self.locks.Resize() + if self.locks.queue.headNodeIndex >= 8 { + _ = self.locks.queue.Resize() } return lock } @@ -158,7 +293,7 @@ func (self *LockManager) GetLockedLock(command *protocol.LockCommand) *Lock { if self.locks == nil { return nil } - lockedLock, ok := self.lockMaps[command.LockId] + lockedLock, ok := self.locks.maps[command.LockId] if ok { return lockedLock } @@ -246,9 +381,9 @@ func (self *LockManager) UpdateLockedLock(lock *Lock, command *protocol.LockComm func (self *LockManager) AddWaitLock(lock *Lock) *Lock { if self.waitLocks == nil { - self.waitLocks = NewLockQueue(4, 4, 4) + self.waitLocks = NewLockManagerWaitQueue() } - _ = self.waitLocks.Push(lock) + self.waitLocks.Push(lock) lock.refCount++ self.waited = true return lock @@ -269,16 +404,8 @@ func (self *LockManager) GetWaitLock() *Lock { lock = self.waitLocks.Head() continue } - - if self.waitLocks.headNodeIndex >= 8 { - _ = self.waitLocks.Resize() - } return lock } - - if self.waitLocks.headNodeIndex >= 8 { - _ = self.waitLocks.Resize() - } return nil } diff --git a/server/lock_test.go b/server/lock_test.go index e81ef75..5da7612 100644 --- a/server/lock_test.go +++ b/server/lock_test.go @@ -2,10 +2,169 @@ package server import ( "github.com/snower/slock/protocol" + "math/rand" "testing" "time" ) +func TestLockManagerRingQueue(t *testing.T) { + queue := NewLockManagerRingQueue(4) + + lock := &Lock{} + queue.Push(lock) + if queue.Head() != lock || queue.Pop() != lock || queue.index != 0 { + t.Errorf("LockManagerRingQueue Push Pop fail") + return + } + + for i := 0; i < 4; i++ { + queue.Push(lock) + if len(queue.queue) != i+1 || cap(queue.queue) != 4 { + t.Errorf("LockManagerRingQueue Push Size fail") + return + } + } + for i := 0; i < 3; i++ { + if queue.Pop() != lock || queue.index != i+1 { + t.Errorf("LockManagerRingQueue Pop fail") + return + } + } + queue.Push(lock) + if len(queue.queue) != 2 || cap(queue.queue) != 4 || queue.index != 0 { + t.Errorf("LockManagerRingQueue Push Size fail") + return + } + for i := 0; i < 2; i++ { + queue.Push(lock) + if len(queue.queue) != i+3 || cap(queue.queue) != 4 { + t.Errorf("LockManagerRingQueue Push Size fail") + return + } + } + for i := 0; i < 2; i++ { + queue.Push(lock) + if len(queue.queue) != i+5 || cap(queue.queue) != 8 { + t.Errorf("LockManagerRingQueue Push Size fail") + return + } + } + for i := 0; i < 5; i++ { + if queue.Pop() != lock || queue.index != i+1 || cap(queue.queue) != 8 { + t.Errorf("LockManagerRingQueue Pop fail") + return + } + } + if queue.Head() != lock || queue.Pop() != lock || queue.index != 0 || cap(queue.queue) != 8 || queue.Head() != nil { + t.Errorf("LockManagerRingQueue Pop fail") + return + } + + for i := 0; i < 1000000; i++ { + queue.Push(lock) + } + if len(queue.queue) != 1000000 || cap(queue.queue) != 1135616 { + t.Errorf("LockManagerRingQueue Push Size fail") + return + } + for i := 0; i < 1000000; i++ { + if queue.Pop() != lock { + t.Errorf("LockManagerRingQueue Pop fail") + return + } + } + if len(queue.queue) != 0 || cap(queue.queue) != 1135616 || queue.index != 0 { + t.Errorf("LockManagerRingQueue Pop Size fail") + return + } +} + +func TestLockManagerWaitQueue(t *testing.T) { + queue := NewLockManagerWaitQueue() + + lock := &Lock{} + queue.Push(lock) + if queue.Head() != lock || queue.Pop() != lock || queue.fastIndex != 0 { + t.Errorf("LockManagerWaitQueue Push Pop fail") + return + } + + for i := 0; i < 16; i++ { + queue.Push(lock) + if len(queue.fastQueue) != i+1 || cap(queue.fastQueue) != 16 { + t.Errorf("LockManagerWaitQueue Push Size fail") + return + } + } + for i := 0; i < 15; i++ { + if queue.Pop() != lock || queue.fastIndex != i+1 { + t.Errorf("LockManagerWaitQueue Pop fail") + return + } + } + queue.Push(lock) + if len(queue.fastQueue) != 2 || cap(queue.fastQueue) != 16 || queue.fastIndex != 0 { + t.Errorf("LockManagerWaitQueue Push Size fail") + return + } + for i := 0; i < 14; i++ { + queue.Push(lock) + if len(queue.fastQueue) != i+3 || cap(queue.fastQueue) != 16 { + t.Errorf("LockManagerWaitQueue Push Size fail") + return + } + } + for i := 0; i < 1024; i++ { + queue.Push(lock) + if len(queue.fastQueue) != 16 || cap(queue.fastQueue) != 16 || queue.ringQueue == nil || len(queue.ringQueue.queue) != i+1 { + t.Errorf("LockManagerWaitQueue Push Size fail") + return + } + } + for i := 0; i < 15; i++ { + if queue.Pop() != lock || queue.fastIndex != i+1 || cap(queue.fastQueue) != 16 { + t.Errorf("LockManagerWaitQueue Pop fail") + return + } + } + if queue.Head() != lock || queue.Pop() != lock || queue.fastIndex != 0 || cap(queue.fastQueue) != 16 || queue.Head() != lock { + t.Errorf("LockManagerWaitQueue Pop fail") + return + } + for i := 0; i < 1023; i++ { + if queue.Pop() != lock { + t.Errorf("LockManagerWaitQueue Pop fail") + return + } + } + if queue.Head() != lock || queue.Pop() != lock || queue.Head() != nil { + t.Errorf("LockManagerWaitQueue Pop fail") + return + } + queue.Rellac() + if queue.fastIndex != 0 || len(queue.fastQueue) != 0 || cap(queue.fastQueue) != 16 || queue.ringQueue != nil { + t.Errorf("LockManagerWaitQueue Rellac fail") + return + } +} + +func BenchmarkLockManagerWaitQueue(b *testing.B) { + lock := &Lock{} + queue := NewLockManagerWaitQueue() + for i := 0; i < b.N; i++ { + for j := 0; j < 10000; j++ { + n := rand.Intn(1024) + for k := 0; k < n; k++ { + queue.Push(lock) + } + n = rand.Intn(1024) + for k := 0; k < n; k++ { + queue.Pop() + } + } + } +} + func TestLockManager_ProcessLockDataSet(t *testing.T) { testWithLockDB(t, func(db *LockDB) { lockKey := protocol.GenLockId() diff --git a/server/protocol.go b/server/protocol.go index 8e96c40..3ecf898 100755 --- a/server/protocol.go +++ b/server/protocol.go @@ -1700,8 +1700,8 @@ func (self *BinaryServerProtocol) commandHandleListLockedCommand(_ *BinaryServer } if lockManager.locks != nil { - for i := range lockManager.locks.IterNodes() { - nodeQueues := lockManager.locks.IterNodeQueues(int32(i)) + for i := range lockManager.locks.queue.IterNodes() { + nodeQueues := lockManager.locks.queue.IterNodeQueues(int32(i)) for _, lock := range nodeQueues { if lock.locked == 0 { continue @@ -1760,9 +1760,8 @@ func (self *BinaryServerProtocol) commandHandleListWaitCommand(_ *BinaryServerPr locks := make([]*protobuf.LockDBLockWait, 0) lockManager.glock.LowPriorityLock() if lockManager.waitLocks != nil { - for i := range lockManager.waitLocks.IterNodes() { - nodeQueues := lockManager.waitLocks.IterNodeQueues(int32(i)) - for _, lock := range nodeQueues { + for _, waitLocks := range lockManager.waitLocks.IterNodes() { + for _, lock := range waitLocks { if lock.timeouted { continue } diff --git a/server/queue.go b/server/queue.go index 151e1a9..b878844 100644 --- a/server/queue.go +++ b/server/queue.go @@ -29,7 +29,7 @@ func NewLockQueue(baseNodeSize int32, nodeSize int32, queueSize int32) *LockQueu queues := make([][]*Lock, nodeSize) nodeQueueSizes := make([]int32, nodeSize) - queues[0] = make([]*Lock, queueSize) + queues[0] = make([]*Lock, queueSize, queueSize) nodeQueueSizes[0] = queueSize return &LockQueue{0, queueSize, queues[0], 0, @@ -48,7 +48,7 @@ func (self *LockQueue) mallocQueue() { self.queueSize = QUEUE_MAX_MALLOC_SIZE } - self.queues = append(self.queues, make([]*Lock, self.queueSize)) + self.queues = append(self.queues, make([]*Lock, self.queueSize, self.queueSize)) self.nodeQueueSizes = append(self.nodeQueueSizes, self.queueSize) self.nodeIndex++ self.nodeSize++ @@ -58,7 +58,7 @@ func (self *LockQueue) mallocQueue() { self.queueSize = QUEUE_MAX_MALLOC_SIZE } - self.queues[self.tailNodeIndex] = make([]*Lock, self.queueSize) + self.queues[self.tailNodeIndex] = make([]*Lock, self.queueSize, self.queueSize) self.nodeQueueSizes[self.tailNodeIndex] = self.queueSize self.nodeIndex++ } @@ -334,7 +334,7 @@ func NewLockCommandQueue(baseNodeSize int32, nodeSize int32, queueSize int32) *L queues := make([][]*protocol.LockCommand, nodeSize) nodeQueueSizes := make([]int32, nodeSize) - queues[0] = make([]*protocol.LockCommand, queueSize) + queues[0] = make([]*protocol.LockCommand, queueSize, queueSize) nodeQueueSizes[0] = queueSize return &LockCommandQueue{0, queueSize, queues[0], 0, @@ -352,7 +352,7 @@ func (self *LockCommandQueue) mallocQueue() { if self.queueSize > QUEUE_MAX_MALLOC_SIZE { self.queueSize = QUEUE_MAX_MALLOC_SIZE } - self.queues = append(self.queues, make([]*protocol.LockCommand, self.queueSize)) + self.queues = append(self.queues, make([]*protocol.LockCommand, self.queueSize, self.queueSize)) self.nodeQueueSizes = append(self.nodeQueueSizes, self.queueSize) self.nodeIndex++ self.nodeSize++ @@ -361,7 +361,7 @@ func (self *LockCommandQueue) mallocQueue() { if self.queueSize > QUEUE_MAX_MALLOC_SIZE { self.queueSize = QUEUE_MAX_MALLOC_SIZE } - self.queues[self.tailNodeIndex] = make([]*protocol.LockCommand, self.queueSize) + self.queues[self.tailNodeIndex] = make([]*protocol.LockCommand, self.queueSize, self.queueSize) self.nodeQueueSizes[self.tailNodeIndex] = self.queueSize self.nodeIndex++ } diff --git a/server/queue_test.go b/server/queue_test.go index 8738b35..426e040 100644 --- a/server/queue_test.go +++ b/server/queue_test.go @@ -6,6 +6,23 @@ import ( "testing" ) +func BenchmarkLockQueue(b *testing.B) { + lock := &Lock{} + queue := NewLockQueue(4, 16, 4) + for i := 0; i < b.N; i++ { + for j := 0; j < 10000; j++ { + n := rand.Intn(1024) + for k := 0; k < n; k++ { + queue.Push(lock) + } + n = rand.Intn(1024) + for k := 0; k < n; k++ { + queue.Pop() + } + } + } +} + func TestLockQueuePushPop(t *testing.T) { head := &Lock{} tail := &Lock{} @@ -480,6 +497,23 @@ func TestLockQueueIter(t *testing.T) { } } +func BenchmarkLockCommandQueue(b *testing.B) { + lockCommand := &protocol.LockCommand{} + queue := NewLockCommandQueue(4, 16, 4) + for i := 0; i < b.N; i++ { + for j := 0; j < 10000; j++ { + n := rand.Intn(1024) + for k := 0; k < n; k++ { + queue.Push(lockCommand) + } + n = rand.Intn(1024) + for k := 0; k < n; k++ { + queue.Pop() + } + } + } +} + func TestLockCommandQueuePushPop(t *testing.T) { head := &protocol.LockCommand{} tail := &protocol.LockCommand{}