diff --git a/server/lock.go b/server/lock.go index 81ada25..9277a9c 100755 --- a/server/lock.go +++ b/server/lock.go @@ -6,6 +6,13 @@ import ( "sync/atomic" ) +type ILockManagerRingQueue interface { + Push(lock *Lock) + Pop() *Lock + Head() *Lock + IterNodes() [][]*Lock +} + type LockManagerRingQueue struct { queue []*Lock index int @@ -47,6 +54,73 @@ func (self *LockManagerRingQueue) Head() *Lock { return self.queue[self.index] } +func (self *LockManagerRingQueue) IterNodes() [][]*Lock { + if self.index < len(self.queue) { + return [][]*Lock{self.queue[self.index:]} + } + return make([][]*Lock, 0) +} + +type LockManagerPriorityRingQueueNode struct { + ringQueue *LockManagerRingQueue + priority uint8 +} + +type LockManagerPriorityRingQueue struct { + priorityNodes []*LockManagerPriorityRingQueueNode + size int +} + +func NewLockManagerPriorityRingQueue(size int) *LockManagerPriorityRingQueue { + return &LockManagerPriorityRingQueue{make([]*LockManagerPriorityRingQueueNode, 0), size} +} + +func (self *LockManagerPriorityRingQueue) Push(lock *Lock) { + for _, node := range self.priorityNodes { + if node.priority == lock.command.Rcount { + node.ringQueue.Push(lock) + return + } + } + node := &LockManagerPriorityRingQueueNode{NewLockManagerRingQueue(self.size), lock.command.Rcount} + self.priorityNodes = append(self.priorityNodes, node) + for i := len(self.priorityNodes) - 2; i >= 0; i-- { + if self.priorityNodes[i].priority <= self.priorityNodes[i+1].priority { + break + } + self.priorityNodes[i], self.priorityNodes[i+1] = self.priorityNodes[i+1], self.priorityNodes[i] + } + node.ringQueue.Push(lock) +} + +func (self *LockManagerPriorityRingQueue) Pop() *Lock { + for _, node := range self.priorityNodes { + lock := node.ringQueue.Pop() + if lock != nil { + return lock + } + } + return nil +} + +func (self *LockManagerPriorityRingQueue) Head() *Lock { + for _, node := range self.priorityNodes { + lock := node.ringQueue.Head() + if lock != nil { + return lock + } + } + return nil +} + +func (self *LockManagerPriorityRingQueue) IterNodes() [][]*Lock { + iterNodes := make([][]*Lock, 0) + for _, node := range self.priorityNodes { + iterNodes = append(iterNodes, node.ringQueue.IterNodes()...) + } + return make([][]*Lock, 0) +} + type LockManagerLockQueue struct { queue *LockQueue maps map[[16]byte]*Lock @@ -74,10 +148,13 @@ func (self *LockManagerLockQueue) Head() *Lock { type LockManagerWaitQueue struct { fastQueue []*Lock fastIndex int - ringQueue *LockManagerRingQueue + ringQueue ILockManagerRingQueue } -func NewLockManagerWaitQueue() *LockManagerWaitQueue { +func NewLockManagerWaitQueue(priorityQueue bool) *LockManagerWaitQueue { + if priorityQueue { + return &LockManagerWaitQueue{make([]*Lock, 0, 8), 0, NewLockManagerPriorityRingQueue(16)} + } return &LockManagerWaitQueue{make([]*Lock, 0, 8), 0, nil} } @@ -134,12 +211,12 @@ func (self *LockManagerWaitQueue) Rellac() { } func (self *LockManagerWaitQueue) IterNodes() [][]*Lock { - lockNodes := make([][]*Lock, 0, 2) + lockNodes := make([][]*Lock, 0) if self.fastIndex < len(self.fastQueue) { lockNodes = append(lockNodes, self.fastQueue[self.fastIndex:]) } - if self.ringQueue != nil && self.ringQueue.index < len(self.ringQueue.queue) { - lockNodes = append(lockNodes, self.ringQueue.queue[self.ringQueue.index:]) + if self.ringQueue != nil { + lockNodes = append(lockNodes, self.ringQueue.IterNodes()...) } return lockNodes } @@ -381,7 +458,15 @@ func (self *LockManager) UpdateLockedLock(lock *Lock, command *protocol.LockComm func (self *LockManager) AddWaitLock(lock *Lock) *Lock { if self.waitLocks == nil { - self.waitLocks = NewLockManagerWaitQueue() + if lock.command.TimeoutFlag&protocol.TIMEOUT_FLAG_RCOUNT_IS_PRIORITY != 0 { + self.waitLocks = NewLockManagerWaitQueue(true) + } else { + self.waitLocks = NewLockManagerWaitQueue(false) + } + } else { + if lock.command.TimeoutFlag&protocol.TIMEOUT_FLAG_RCOUNT_IS_PRIORITY != 0 && self.waitLocks.ringQueue == nil { + self.waitLocks.ringQueue = NewLockManagerPriorityRingQueue(16) + } } self.waitLocks.Push(lock) lock.refCount++ diff --git a/server/lock_test.go b/server/lock_test.go index cd8d73a..90e7ca8 100644 --- a/server/lock_test.go +++ b/server/lock_test.go @@ -80,8 +80,62 @@ func TestLockManagerRingQueue(t *testing.T) { } } +func TestLockManagerPriorityRingQueue(t *testing.T) { + queue := NewLockManagerPriorityRingQueue(4) + + lock := &Lock{command: &protocol.LockCommand{TimeoutFlag: protocol.TIMEOUT_FLAG_RCOUNT_IS_PRIORITY, Rcount: 1}} + queue.Push(lock) + if queue.Head() != lock || queue.Pop() != lock || len(queue.priorityNodes) != 1 || queue.priorityNodes[0].priority != 1 || queue.priorityNodes[0].ringQueue.index != 0 { + t.Errorf("LockManagerPriorityRingQueue Push Pop fail") + return + } + + lock1 := &Lock{command: &protocol.LockCommand{TimeoutFlag: protocol.TIMEOUT_FLAG_RCOUNT_IS_PRIORITY, Rcount: 2}} + queue.Push(lock1) + lock2 := &Lock{command: &protocol.LockCommand{TimeoutFlag: protocol.TIMEOUT_FLAG_RCOUNT_IS_PRIORITY, Rcount: 1}} + queue.Push(lock2) + if len(queue.priorityNodes) != 2 || queue.priorityNodes[0].priority != 1 || queue.priorityNodes[1].priority != 2 { + t.Errorf("LockManagerPriorityRingQueue Push Priority fail") + return + } + if queue.Head() != lock2 || queue.Pop() != lock2 || len(queue.priorityNodes) != 2 || queue.priorityNodes[0].priority != 1 || queue.priorityNodes[0].ringQueue.index != 0 { + t.Errorf("LockManagerPriorityRingQueue Push Pop fail") + return + } + if queue.Head() != lock1 || queue.Pop() != lock1 || len(queue.priorityNodes) != 2 || queue.priorityNodes[1].priority != 2 || queue.priorityNodes[1].ringQueue.index != 0 { + t.Errorf("LockManagerPriorityRingQueue Push Pop fail") + return + } + + for i := 0; i < 10000; i++ { + lock = &Lock{command: &protocol.LockCommand{TimeoutFlag: protocol.TIMEOUT_FLAG_RCOUNT_IS_PRIORITY, Rcount: uint8(rand.Intn(50) + 1)}} + queue.Push(lock) + } + currentPriority := uint8(0) + for queue.Head() != nil { + lock = queue.Pop() + if lock == nil || lock.command.Rcount < currentPriority { + t.Errorf("LockManagerPriorityRingQueue Pop fail") + return + } + currentPriority = lock.command.Rcount + } + currentPriority = uint8(0) + for _, node := range queue.priorityNodes { + if node.priority <= currentPriority { + t.Errorf("LockManagerPriorityRingQueue priorityNodes fail") + return + } + if node.ringQueue.index != 0 { + t.Errorf("LockManagerPriorityRingQueue priorityNodes ringQueue fail") + return + } + currentPriority = node.priority + } +} + func TestLockManagerWaitQueue(t *testing.T) { - queue := NewLockManagerWaitQueue() + queue := NewLockManagerWaitQueue(false) lock := &Lock{} queue.Push(lock) @@ -117,7 +171,8 @@ func TestLockManagerWaitQueue(t *testing.T) { } for i := 0; i < 1024; i++ { queue.Push(lock) - if len(queue.fastQueue) != 8 || cap(queue.fastQueue) != 8 || queue.ringQueue == nil || len(queue.ringQueue.queue) != i+1 { + ringQueue := queue.ringQueue.(*LockManagerRingQueue) + if len(queue.fastQueue) != 8 || cap(queue.fastQueue) != 8 || queue.ringQueue == nil || len(ringQueue.queue) != i+1 { t.Errorf("LockManagerWaitQueue Push Size fail") return } @@ -151,7 +206,7 @@ func TestLockManagerWaitQueue(t *testing.T) { func BenchmarkLockManagerWaitQueue(b *testing.B) { lock := &Lock{} - queue := NewLockManagerWaitQueue() + queue := NewLockManagerWaitQueue(false) for i := 0; i < b.N; i++ { for j := 0; j < 10000; j++ { n := rand.Intn(1024)