Skip to content

Commit

Permalink
optimization implementation of saving Lock data
Browse files Browse the repository at this point in the history
  • Loading branch information
snower committed Apr 18, 2024
1 parent 1dcc8e8 commit 6057d0b
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 34 deletions.
4 changes: 3 additions & 1 deletion server/aof.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -796,6 +797,7 @@ func (self *AofChannel) AofAcked(buf []byte, succed bool) error {

copy(aofLock.buf, buf)
aofLock.data = nil
aofLock.AofFlag = 0
if succed {
aofLock.Result = protocol.RESULT_SUCCED
} else {
Expand Down Expand Up @@ -1243,7 +1245,7 @@ func (self *Aof) FindAofFiles() ([]string, string, error) {
}

fileName := info.Name()
if len(fileName) >= 11 && fileName[:10] == "append.aof" {
if len(fileName) >= 11 && strings.HasPrefix(fileName, "append.aof.") && !strings.HasSuffix(fileName, ".dat") {
aofIndex, err := strconv.ParseInt(fileName[11:], 10, 64)
if err == nil {
aofIndexs[uint32(aofIndex)] = fileName
Expand Down
33 changes: 21 additions & 12 deletions server/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1513,7 +1513,11 @@ func (self *LockDB) Lock(serverProtocol ServerProtocol, command *protocol.LockCo
return nil
}

lockData := lockManager.GetLockData()
if command.Flag&protocol.LOCK_FLAG_UPDATE_WHEN_LOCKED != 0 {
if command.Flag&protocol.LOCK_FLAG_CONTAINS_DATA != 0 {
lockManager.ProcessLockData(command)
}
if currentLock.longWaitIndex > 0 {
self.RemoveLongExpried(currentLock)
lockManager.UpdateLockedLock(currentLock, command.Timeout, command.TimeoutFlag, command.Expried, command.ExpriedFlag, command.Count, command.Rcount)
Expand Down Expand Up @@ -1545,14 +1549,13 @@ func (self *LockDB) Lock(serverProtocol ServerProtocol, command *protocol.LockCo
command.Rcount = currentLock.command.Rcount
lockManager.glock.Unlock()

_ = serverProtocol.ProcessLockResultCommand(command, protocol.RESULT_EXPRIED, uint16(lockManager.locked), currentLock.locked, lockManager.GetLockData())
_ = serverProtocol.ProcessLockResultCommand(command, protocol.RESULT_EXPRIED, uint16(lockManager.locked), currentLock.locked, lockData)
_ = serverProtocol.FreeLockCommand(command)
return nil
}

lockManager.locked++
currentLock.locked++
lockData := lockManager.GetLockData()
if command.Flag&protocol.LOCK_FLAG_CONTAINS_DATA != 0 {
lockManager.ProcessLockData(command)
}
Expand Down Expand Up @@ -1583,7 +1586,7 @@ func (self *LockDB) Lock(serverProtocol ServerProtocol, command *protocol.LockCo
lockManager.glock.Unlock()
}

_ = serverProtocol.ProcessLockResultCommand(command, protocol.RESULT_LOCKED_ERROR, uint16(lockManager.locked), currentLock.locked, lockManager.GetLockData())
_ = serverProtocol.ProcessLockResultCommand(command, protocol.RESULT_LOCKED_ERROR, uint16(lockManager.locked), currentLock.locked, lockData)
_ = serverProtocol.FreeLockCommand(command)
return nil
}
Expand Down Expand Up @@ -1649,9 +1652,6 @@ func (self *LockDB) Lock(serverProtocol ServerProtocol, command *protocol.LockCo
}

lockData := lockManager.GetLockData()
if command.Flag&protocol.LOCK_FLAG_CONTAINS_DATA != 0 {
lockManager.ProcessLockData(command)
}
if command.ExpriedFlag&protocol.EXPRIED_FLAG_PUSH_SUBSCRIBE != 0 {
_ = self.subscribeChannels[lockManager.glockIndex].Push(command, protocol.RESULT_EXPRIED, uint16(lockManager.locked), lock.locked, lockManager.GetLockData())
}
Expand All @@ -1662,7 +1662,11 @@ func (self *LockDB) Lock(serverProtocol ServerProtocol, command *protocol.LockCo
lockManager.state.LockCount++
lockManager.glock.Unlock()

_ = serverProtocol.ProcessLockResultCommand(command, protocol.RESULT_SUCCED, uint16(lockManager.locked), lock.locked, lockData)
if command.Flag&protocol.LOCK_FLAG_CONTAINS_DATA != 0 {
_ = serverProtocol.ProcessLockResultCommand(command, protocol.RESULT_ERROR, uint16(lockManager.locked), lock.locked, lockData)
} else {
_ = serverProtocol.ProcessLockResultCommand(command, protocol.RESULT_SUCCED, uint16(lockManager.locked), lock.locked, lockData)
}
_ = serverProtocol.FreeLockCommand(command)

if requireWakeup {
Expand Down Expand Up @@ -2026,9 +2030,6 @@ func (self *LockDB) wakeUpWaitLock(lockManager *LockManager, waitLock *Lock, ser
}

lockData := lockManager.GetLockData()
if waitLock.command.Flag&protocol.LOCK_FLAG_CONTAINS_DATA != 0 {
lockManager.ProcessLockData(waitLock.command)
}
waitLockProtocol, waitLockCommand := waitLock.protocol, waitLock.command
lockManager.state.LockCount++
lockManager.state.WaitCount--
Expand All @@ -2038,10 +2039,18 @@ func (self *LockDB) wakeUpWaitLock(lockManager *LockManager, waitLock *Lock, ser
lockManager.glock.Unlock()

if waitLockProtocol.serverProtocol == serverProtocol {
_ = serverProtocol.ProcessLockResultCommand(waitLockCommand, protocol.RESULT_SUCCED, uint16(lockManager.locked), waitLock.locked, lockData)
if waitLock.command.Flag&protocol.LOCK_FLAG_CONTAINS_DATA != 0 {
_ = serverProtocol.ProcessLockResultCommand(waitLockCommand, protocol.RESULT_ERROR, uint16(lockManager.locked), waitLock.locked, lockData)
} else {
_ = serverProtocol.ProcessLockResultCommand(waitLockCommand, protocol.RESULT_SUCCED, uint16(lockManager.locked), waitLock.locked, lockData)
}
_ = serverProtocol.FreeLockCommand(waitLockCommand)
} else {
_ = waitLockProtocol.ProcessLockResultCommandLocked(waitLockCommand, protocol.RESULT_SUCCED, uint16(lockManager.locked), waitLock.locked, lockData)
if waitLock.command.Flag&protocol.LOCK_FLAG_CONTAINS_DATA != 0 {
_ = waitLockProtocol.ProcessLockResultCommandLocked(waitLockCommand, protocol.RESULT_ERROR, uint16(lockManager.locked), waitLock.locked, lockData)
} else {
_ = waitLockProtocol.ProcessLockResultCommandLocked(waitLockCommand, protocol.RESULT_SUCCED, uint16(lockManager.locked), waitLock.locked, lockData)
}
_ = waitLockProtocol.FreeLockCommandLocked(waitLockCommand)
}
}
Expand Down
23 changes: 18 additions & 5 deletions server/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (self *LockManager) PushLockAof(lock *Lock) error {
}

fashHash := (uint32(self.lockKey[0])<<24 | uint32(self.lockKey[1])<<16 | uint32(self.lockKey[2])<<8 | uint32(self.lockKey[3])) ^ (uint32(self.lockKey[4])<<24 | uint32(self.lockKey[5])<<16 | uint32(self.lockKey[6])<<8 | uint32(self.lockKey[7])) ^ (uint32(self.lockKey[8])<<24 | uint32(self.lockKey[9])<<16 | uint32(self.lockKey[10])<<8 | uint32(self.lockKey[11])) ^ (uint32(self.lockKey[12])<<24 | uint32(self.lockKey[13])<<16 | uint32(self.lockKey[14])<<8 | uint32(self.lockKey[15]))
err := self.lockDb.aofChannels[fashHash%uint32(self.lockDb.managerMaxGlocks)].Push(lock.manager.dbId, lock, protocol.COMMAND_LOCK, lock.command, nil, 0, lock.manager.GetLockData())
err := self.lockDb.aofChannels[fashHash%uint32(self.lockDb.managerMaxGlocks)].Push(lock.manager.dbId, lock, protocol.COMMAND_LOCK, lock.command, nil, 0, lock.manager.AofLockData())
if err != nil {
self.lockDb.slock.Log().Errorf("Database lock push aof error DbId:%d LockKey:%x LockId:%x",
lock.command.DbId, lock.command.LockKey, lock.command.LockId)
Expand All @@ -261,7 +261,7 @@ func (self *LockManager) PushUnLockAof(dbId uint8, lock *Lock, lockCommand *prot
}

fashHash := (uint32(self.lockKey[0])<<24 | uint32(self.lockKey[1])<<16 | uint32(self.lockKey[2])<<8 | uint32(self.lockKey[3])) ^ (uint32(self.lockKey[4])<<24 | uint32(self.lockKey[5])<<16 | uint32(self.lockKey[6])<<8 | uint32(self.lockKey[7])) ^ (uint32(self.lockKey[8])<<24 | uint32(self.lockKey[9])<<16 | uint32(self.lockKey[10])<<8 | uint32(self.lockKey[11])) ^ (uint32(self.lockKey[12])<<24 | uint32(self.lockKey[13])<<16 | uint32(self.lockKey[14])<<8 | uint32(self.lockKey[15]))
err := self.lockDb.aofChannels[fashHash%uint32(self.lockDb.managerMaxGlocks)].Push(dbId, lock, protocol.COMMAND_UNLOCK, lockCommand, unLockCommand, aofFlag, lock.manager.GetLockData())
err := self.lockDb.aofChannels[fashHash%uint32(self.lockDb.managerMaxGlocks)].Push(dbId, lock, protocol.COMMAND_UNLOCK, lockCommand, unLockCommand, aofFlag, lock.manager.AofLockData())
if err != nil {
self.lockDb.slock.Log().Errorf("Database lock push aof error DbId:%d LockKey:%x LockId:%x",
lock.command.DbId, lock.command.LockKey, lock.command.LockId)
Expand Down Expand Up @@ -333,17 +333,25 @@ func (self *LockManager) GetLockData() []byte {
return nil
}

func (self *LockManager) AofLockData() []byte {
if self.currentData != nil && !self.currentData.isAof {
self.currentData.isAof = true
return self.currentData.Data
}
return nil
}

func (self *LockManager) ProcessLockData(command *protocol.LockCommand) {
if command.Data == nil {
return
}
lockCommandData := command.Data
switch lockCommandData.CommandType {
case protocol.LOCK_DATA_COMMAND_TYPE_SET:
self.currentData = &LockData{Data: lockCommandData.Data}
self.currentData = NewLockData(lockCommandData.Data)
break
}
command.Data.Data = nil
command.Data = nil
}

type Lock struct {
Expand Down Expand Up @@ -378,7 +386,12 @@ func (self *Lock) GetDB() *LockDB {
}

type LockData struct {
Data []byte
Data []byte
isAof bool
}

func NewLockData(data []byte) *LockData {
return &LockData{data, false}
}

type PriorityMutex struct {
Expand Down
57 changes: 41 additions & 16 deletions server/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,18 @@ type ReplicationBufferQueueItem struct {
seq uint64
}

func NewReplicationBufferQueueItem() *ReplicationBufferQueueItem {
return &ReplicationBufferQueueItem{nil, make([]byte, 64), nil, 0, 0, 0}
}

func (self *ReplicationBufferQueueItem) Init(buf []byte) {
self.nextItem = nil
self.buf = buf
self.pollCount = 0
self.pollIndex = 0
self.seq = 0
}

type ReplicationBufferQueueCursor struct {
currentItem *ReplicationBufferQueueItem
currentRequestId [16]byte
Expand All @@ -34,6 +46,10 @@ type ReplicationBufferQueueCursor struct {
writed bool
}

func NewReplicationBufferQueueCursor(buf []byte) *ReplicationBufferQueueCursor {
return &ReplicationBufferQueueCursor{nil, [16]byte{}, buf, nil, 0xffffffffffffffff, true}
}

type ReplicationBufferQueue struct {
manager *ReplicationManager
glock *sync.RWMutex
Expand All @@ -53,9 +69,23 @@ func NewReplicationBufferQueue(manager *ReplicationManager, bufSize uint64, maxS
queue := &ReplicationBufferQueue{manager, &sync.RWMutex{}, nil,
nil, nil, 0, 0, bufSize, maxSize,
0, 0, false}
queue.InitFreeQueueItems(bufSize / 64)
return queue
}

func (self *ReplicationBufferQueue) InitFreeQueueItems(count uint64) {
queueItems := make([]ReplicationBufferQueueItem, count)
queueItemBuf := make([]byte, count*64)
for i := uint64(0); i < count; i++ {
queueItem := &queueItems[i]
queueItem.Init(queueItemBuf[i*64 : (i+1)*64])
if self.freeHeadItem != nil {
queueItem.nextItem = self.freeHeadItem
}
self.freeHeadItem = queueItem
}
}

func (self *ReplicationBufferQueue) AddPoll(cursor *ReplicationBufferQueueCursor) error {
self.glock.Lock()
self.pollCount++
Expand Down Expand Up @@ -90,6 +120,7 @@ func (self *ReplicationBufferQueue) Push(buf []byte, data []byte) error {
var queueItem *ReplicationBufferQueueItem = nil
if self.usedBufferSize >= self.bufferSize && self.tailItem != nil {
if self.tailItem.pollIndex < self.tailItem.pollCount && self.bufferSize < self.maxBufferSize {
self.InitFreeQueueItems(self.bufferSize / 64)
self.bufferSize *= 2
self.dupCount++
} else {
Expand Down Expand Up @@ -125,7 +156,7 @@ func (self *ReplicationBufferQueue) Push(buf []byte, data []byte) error {
queueItem = self.freeHeadItem
self.freeHeadItem = self.freeHeadItem.nextItem
} else {
queueItem = &ReplicationBufferQueueItem{nil, make([]byte, 64), nil, 0, 0, 0}
queueItem = NewReplicationBufferQueueItem()
}
}

Expand Down Expand Up @@ -161,28 +192,22 @@ func (self *ReplicationBufferQueue) Pop(cursor *ReplicationBufferQueueCursor) er
self.glock.RUnlock()
return io.EOF
}
cursor.currentItem = currentItem
} else {
buf := currentItem.buf
if buf == nil && len(buf) != 64 {
if currentItem.seq-cursor.seq != 1 && currentItem.seq != 0 && cursor.seq != 0xffffffffffffffff {
self.glock.RUnlock()
return errors.New("out of buf")
}
requestId := cursor.currentRequestId
if requestId[0] != buf[3] || requestId[1] != buf[4] || requestId[2] != buf[5] || requestId[3] != buf[6] || requestId[4] != buf[7] || requestId[5] != buf[8] || requestId[6] != buf[9] || requestId[7] != buf[10] ||
requestId[8] != buf[11] || requestId[9] != buf[12] || requestId[10] != buf[13] || requestId[11] != buf[14] || requestId[12] != buf[15] || requestId[13] != buf[16] || requestId[14] != buf[17] || requestId[15] != buf[18] {
cursor.currentItem = currentItem
} else {
if currentItem.seq != cursor.seq {
self.glock.RUnlock()
return errors.New("out of buf")
}

nextCurrentItem := currentItem.nextItem
if nextCurrentItem == nil {
currentItem = currentItem.nextItem
if currentItem == nil {
self.glock.RUnlock()
return io.EOF
}
atomic.AddUint32(&currentItem.pollIndex, 1)
currentItem = nextCurrentItem
cursor.currentItem = nextCurrentItem
cursor.currentItem = currentItem
}

buf := currentItem.buf
Expand Down Expand Up @@ -866,8 +891,7 @@ type ReplicationServer struct {
func NewReplicationServer(manager *ReplicationManager, serverProtocol *BinaryServerProtocol) *ReplicationServer {
waofLock := NewAofLock()
return &ReplicationServer{manager, serverProtocol.stream, serverProtocol,
manager.slock.GetAof(), NewAofLock(), waofLock,
&ReplicationBufferQueueCursor{nil, [16]byte{}, waofLock.buf, nil, 0, true},
manager.slock.GetAof(), NewAofLock(), waofLock, NewReplicationBufferQueueCursor(waofLock.buf),
0, make(chan bool, 1), false, false, make(chan bool, 1), false}
}

Expand Down Expand Up @@ -1082,6 +1106,7 @@ func (self *ReplicationServer) SendProcess() error {
}
}
self.bufferCursor.writed = true
atomic.AddUint32(&self.bufferCursor.currentItem.pollIndex, 1)
}

bufferQueue := self.manager.bufferQueue
Expand Down

0 comments on commit 6057d0b

Please sign in to comment.