Skip to content

Commit

Permalink
Optimize locking the same version multiple times and the lock can be …
Browse files Browse the repository at this point in the history
…successful
  • Loading branch information
snower committed May 22, 2024
1 parent dc73c29 commit db3c053
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 20 deletions.
39 changes: 23 additions & 16 deletions client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,50 +42,57 @@ func (self *Database) sendCommand(command protocol.ICommand) error {
return self.client.SendCommand(command)
}

func (self *Database) Lock(lockKey [16]byte, timeout uint32, expried uint32) *Lock {
lock := NewLock(self, lockKey, timeout, expried)
if self.defaultTimeoutFlag > 0 {
lock.SetTimeoutFlag(self.defaultTimeoutFlag)
func (self *Database) mergeTimeoutFlag(timeout uint32) uint32 {
if self.defaultTimeoutFlag != 0 {
return timeout | (uint32(self.defaultTimeoutFlag) << 16)
}
if self.defaultExpriedFlag > 0 {
lock.SetExpriedFlag(self.defaultExpriedFlag)
return timeout
}

func (self *Database) mergeExpriedFlag(expried uint32) uint32 {
if self.defaultExpriedFlag != 0 {
return expried | (uint32(self.defaultExpriedFlag) << 16)
}
return lock
return expried
}

func (self *Database) Lock(lockKey [16]byte, timeout uint32, expried uint32) *Lock {
return NewLock(self, lockKey, self.mergeTimeoutFlag(timeout), self.mergeExpriedFlag(expried))
}

func (self *Database) Event(eventKey [16]byte, timeout uint32, expried uint32, defaultSeted bool) *Event {
if defaultSeted {
return NewDefaultSetEvent(self, eventKey, timeout, expried)
return NewDefaultSetEvent(self, eventKey, self.mergeTimeoutFlag(timeout), self.mergeExpriedFlag(expried))
}
return NewDefaultClearEvent(self, eventKey, timeout, expried)
return NewDefaultClearEvent(self, eventKey, self.mergeTimeoutFlag(timeout), self.mergeExpriedFlag(expried))
}

func (self *Database) GroupEvent(groupKey [16]byte, clientId uint64, versionId uint64, timeout uint32, expried uint32) *GroupEvent {
return NewGroupEvent(self, groupKey, clientId, versionId, timeout, expried)
return NewGroupEvent(self, groupKey, clientId, versionId, self.mergeTimeoutFlag(timeout), self.mergeExpriedFlag(expried))
}

func (self *Database) Semaphore(semaphoreKey [16]byte, timeout uint32, expried uint32, count uint16) *Semaphore {
return NewSemaphore(self, semaphoreKey, timeout, expried, count)
return NewSemaphore(self, semaphoreKey, self.mergeTimeoutFlag(timeout), self.mergeExpriedFlag(expried), count)
}

func (self *Database) RWLock(lockKey [16]byte, timeout uint32, expried uint32) *RWLock {
return NewRWLock(self, lockKey, timeout, expried)
return NewRWLock(self, lockKey, self.mergeTimeoutFlag(timeout), self.mergeExpriedFlag(expried))
}

func (self *Database) RLock(lockKey [16]byte, timeout uint32, expried uint32) *RLock {
return NewRLock(self, lockKey, timeout, expried)
return NewRLock(self, lockKey, self.mergeTimeoutFlag(timeout), self.mergeExpriedFlag(expried))
}

func (self *Database) MaxConcurrentFlow(flowKey [16]byte, count uint16, timeout uint32, expried uint32) *MaxConcurrentFlow {
return NewMaxConcurrentFlow(self, flowKey, count, timeout, expried)
return NewMaxConcurrentFlow(self, flowKey, count, self.mergeTimeoutFlag(timeout), self.mergeExpriedFlag(expried))
}

func (self *Database) TokenBucketFlow(flowKey [16]byte, count uint16, timeout uint32, period float64) *TokenBucketFlow {
return NewTokenBucketFlow(self, flowKey, count, timeout, period)
return NewTokenBucketFlow(self, flowKey, count, self.mergeTimeoutFlag(timeout), period)
}

func (self *Database) TreeLock(lockKey [16]byte, parentKey [16]byte, timeout uint32, expried uint32) *TreeLock {
return NewTreeLock(self, lockKey, parentKey, timeout, expried)
return NewTreeLock(self, lockKey, parentKey, self.mergeTimeoutFlag(timeout), self.mergeExpriedFlag(expried))
}

func (self *Database) State() *protocol.StateResultCommand {
Expand Down
57 changes: 57 additions & 0 deletions client/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1020,3 +1020,60 @@ func TestLock_RequireAck(t *testing.T) {
}
})
}

func TestLock_MultiLockCheckVersion(t *testing.T) {
testWithClient(t, func(client *Client) {
lock1 := client.Lock(testString2Key("TestMultiLockCheckVersion"), 0, 10)
lock1.SetTimeoutFlag(protocol.TIMEOUT_FLAG_LESS_LOCK_VERSION_IS_LOCK_SUCCED)
lock1.SetCount(0xffff)
lock1.lockId = [16]byte{1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0}
lock2 := client.Lock(testString2Key("TestMultiLockCheckVersion"), 0, 10)
lock2.SetTimeoutFlag(protocol.TIMEOUT_FLAG_LESS_LOCK_VERSION_IS_LOCK_SUCCED)
lock2.SetCount(0xffff)
lock2.lockId = [16]byte{1, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0}
_, err := lock1.Lock()
if err != nil {
t.Errorf("MultiLockCheckVersion Lock Fail %v", err)
return
}
_, err = lock2.Lock()
if err != nil {
t.Errorf("MultiLockCheckVersion Lock Fail %v", err)
return
}
_, err = lock1.Unlock()
if err != nil {
t.Errorf("MultiLockCheckVersion Unlock Fail %v", err)
return
}
_, err = lock2.Unlock()
if err != nil {
t.Errorf("MultiLockCheckVersion Unlock Fail %v", err)
return
}

lock1 = client.Lock(testString2Key("TestMultiLockCheckVersion"), 0, 10)
lock1.SetTimeoutFlag(protocol.TIMEOUT_FLAG_LESS_LOCK_VERSION_IS_LOCK_SUCCED)
lock1.SetCount(0xffff)
lock1.lockId = [16]byte{1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0}
lock2 = client.Lock(testString2Key("TestMultiLockCheckVersion"), 0, 10)
lock2.SetTimeoutFlag(protocol.TIMEOUT_FLAG_LESS_LOCK_VERSION_IS_LOCK_SUCCED)
lock2.SetCount(0xffff)
lock2.lockId = [16]byte{2, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0}
_, err = lock1.Lock()
if err != nil {
t.Errorf("MultiLockCheckVersion Lock Fail %v", err)
return
}
result, err := lock2.Lock()
if err == nil || result == nil || result.Result != protocol.RESULT_TIMEOUT {
t.Errorf("MultiLockCheckVersion Lock Fail %v", err)
return
}
_, err = lock1.Unlock()
if err != nil {
t.Errorf("MultiLockCheckVersion Unlock Fail %v", err)
return
}
})
}
17 changes: 13 additions & 4 deletions server/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2055,24 +2055,33 @@ func (self *LockDB) doLock(lockManager *LockManager, lock *Lock) bool {
if lockManager.locked == 0 {
return true
}

if lock.command.Count == 0 {
return false
}
if lockManager.locked >= 0xffff {
if lockManager.locked >= 0x7fffffff {
return false
}

if lockManager.currentLock.command.Count == 0xffff && lock.command.Count == 0xffff {
if lock.command.TimeoutFlag&protocol.TIMEOUT_FLAG_LESS_LOCK_VERSION_IS_LOCK_SUCCED != 0 {
if lockManager.currentLock != nil && self.compareLockVersion(lock.command.LockId, lockManager.currentLock.command.LockId) == 1 {
return false
}
}
return true
}
return false
}

if lockManager.locked <= uint32(lockManager.currentLock.command.Count) {
if lockManager.locked <= uint32(lock.command.Count) {
if lock.command.TimeoutFlag&protocol.TIMEOUT_FLAG_LESS_LOCK_VERSION_IS_LOCK_SUCCED != 0 {
if lockManager.currentLock != nil && self.compareLockVersion(lock.command.LockId, lockManager.currentLock.command.LockId) == 1 {
return false
}
}
return true
}
}

return false
}

Expand Down

0 comments on commit db3c053

Please sign in to comment.