Skip to content

Commit

Permalink
Optimize master-slave synchronization logic
Browse files Browse the repository at this point in the history
  • Loading branch information
snower committed Apr 25, 2024
1 parent 149f784 commit e7e2a08
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 32 deletions.
4 changes: 2 additions & 2 deletions run-tests
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ Benchmark Single Key Multi Lock Info:
$BENCH_OUTPUTS
"

BENCH_CMD="go run tools/benchmark/main.go --port=5659 --client=4 --conc=8 --count=10000 --timeout_flag=4096"
BENCH_CMD="go run tools/benchmark/main.go --port=5659 --client=4 --conc=64 --count=10000 --timeout_flag=4096"
BENCH_OUTPUTS=$(which timeout > /dev/null 2>&1 && timeout 120 $BENCH_CMD 2>&1 || $BENCH_CMD 2>&1)
BENCH_SUCCEDS=$(echo "$BENCH_OUTPUTS" | grep "Succed")
if [ -z "$BENCH_SUCCEDS" ]
Expand Down Expand Up @@ -175,7 +175,7 @@ Benchmark Multi Key Single Lock With Data Info:
$BENCH_OUTPUTS
"

BENCH_CMD="go run tools/benchmark/main.go --port=5659 --client=4 --conc=8 --count=10000 --timeout_flag=4096 --data_length=4096"
BENCH_CMD="go run tools/benchmark/main.go --port=5659 --client=4 --conc=64 --count=10000 --timeout_flag=4096 --data_length=4096"
BENCH_OUTPUTS=$(which timeout > /dev/null 2>&1 && timeout 120 $BENCH_CMD 2>&1 || $BENCH_CMD 2>&1)
BENCH_SUCCEDS=$(echo "$BENCH_OUTPUTS" | grep "Succed")
if [ -z "$BENCH_SUCCEDS" ]
Expand Down
6 changes: 3 additions & 3 deletions server/aof.go
Original file line number Diff line number Diff line change
Expand Up @@ -1054,7 +1054,7 @@ func (self *AofChannel) HandleAofAcked(aofLock *AofLock) {
func (self *AofChannel) HandleAcked(aofLock *AofLock) {
db := self.aof.slock.replicationManager.GetAckDB(aofLock.DbId)
if db != nil {
_ = db.Process(self.lockDbGlockIndex, aofLock)
_ = db.ProcessAcked(self.lockDbGlockIndex, aofLock)
}
}

Expand Down Expand Up @@ -1401,7 +1401,7 @@ func (self *Aof) NewAofChannel(lockDb *LockDB, lockDbGlockIndex uint16, lockDbGl
self.glock.Lock()
serverProtocol := NewMemWaiterServerProtocol(self.slock)
aofChannel := NewAofChannel(self, lockDb, lockDbGlockIndex, lockDbGlock)
_ = serverProtocol.SetResultCallback(func(serverProtocol *MemWaiterServerProtocol, command *protocol.LockCommand, result uint8, lcount uint16, lrcount uint8) error {
_ = serverProtocol.SetResultCallback(func(serverProtocol *MemWaiterServerProtocol, command *protocol.LockCommand, result uint8, lcount uint16, lrcount uint8, data []byte) error {
return self.lockLoaded(aofChannel, serverProtocol, command, result, lcount, lrcount)
})
aofChannel.serverProtocol = serverProtocol
Expand Down Expand Up @@ -1628,7 +1628,7 @@ func (self *Aof) lockLoaded(aofChannel *AofChannel, _ *MemWaiterServerProtocol,

db := self.slock.replicationManager.GetOrNewAckDB(command.DbId)
if db != nil {
return db.ProcessAcked(aofChannel.lockDbGlockIndex, command, result, lcount, lrcount)
return db.ProcessAckLocked(aofChannel.lockDbGlockIndex, command, result, lcount, lrcount)
}
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions server/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (self *DefaultServerProtocol) FreeLockCommandLocked(command *protocol.LockC

var defaultServerProtocol *DefaultServerProtocol = nil

type MemWaiterServerProtocolResultCallback func(*MemWaiterServerProtocol, *protocol.LockCommand, uint8, uint16, uint8) error
type MemWaiterServerProtocolResultCallback func(*MemWaiterServerProtocol, *protocol.LockCommand, uint8, uint16, uint8, []byte) error

type MemWaiterServerProtocol struct {
slock *SLock
Expand Down Expand Up @@ -354,7 +354,7 @@ func (self *MemWaiterServerProtocol) ProcessLockCommand(lockCommand *protocol.Lo

func (self *MemWaiterServerProtocol) ProcessLockResultCommand(command *protocol.LockCommand, result uint8, lcount uint16, lrcount uint8, data []byte) error {
if self.resultCallback != nil {
return self.resultCallback(self, command, result, lcount, lrcount)
return self.resultCallback(self, command, result, lcount, lrcount, data)
}

self.glock.Lock()
Expand Down
60 changes: 35 additions & 25 deletions server/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -1229,7 +1229,7 @@ func (self *ReplicationAckDB) PushUnLock(glockIndex uint16, lock *AofLock) error
return nil
}

func (self *ReplicationAckDB) Process(glockIndex uint16, aofLock *AofLock) error {
func (self *ReplicationAckDB) ProcessAcked(glockIndex uint16, aofLock *AofLock) error {
requestId := aofLock.GetRequestId()
self.ackGlocks[glockIndex].Lock()
if lock, ok := self.locks[glockIndex][requestId]; ok {
Expand Down Expand Up @@ -1311,15 +1311,19 @@ func (self *ReplicationAckDB) PushAckLock(glockIndex uint16, aofLock *AofLock) e
requestId := aofLock.GetRequestId()
self.ackGlocks[glockIndex].Lock()
if ackLock, ok := self.ackLocks[glockIndex][requestId]; !ok {
self.glock.Lock()
if self.freeAckLocksIndex > 0 {
self.freeAckLocksIndex--
ackLock = self.freeAckLocks[self.freeAckLocksIndex]
self.glock.Unlock()
ackLock.locked = false
ackLock.aofed = false
self.glock.Lock()
if self.freeAckLocksIndex > 0 {
self.freeAckLocksIndex--
ackLock = self.freeAckLocks[self.freeAckLocksIndex]
self.glock.Unlock()
ackLock.locked = false
ackLock.aofed = false
} else {
self.glock.Unlock()
ackLock = NewReplicationAckLock()
}
} else {
self.glock.Unlock()
ackLock = NewReplicationAckLock()
}
self.ackLocks[glockIndex][requestId] = ackLock
Expand All @@ -1328,7 +1332,7 @@ func (self *ReplicationAckDB) PushAckLock(glockIndex uint16, aofLock *AofLock) e
return nil
}

func (self *ReplicationAckDB) ProcessAcked(glockIndex uint16, command *protocol.LockCommand, result uint8, lcount uint16, lrcount uint8) error {
func (self *ReplicationAckDB) ProcessAckLocked(glockIndex uint16, command *protocol.LockCommand, result uint8, lcount uint16, lrcount uint8) error {
self.ackGlocks[glockIndex].Lock()
ackLock, ok := self.ackLocks[glockIndex][command.RequestId]
if !ok {
Expand All @@ -1348,7 +1352,6 @@ func (self *ReplicationAckDB) ProcessAcked(glockIndex uint16, command *protocol.
ackLock.lockResult.Lrcount = lrcount
ackLock.lockResult.Rcount = command.Rcount
ackLock.locked = true

if !ackLock.aofed {
self.ackGlocks[glockIndex].Unlock()
return nil
Expand All @@ -1359,12 +1362,14 @@ func (self *ReplicationAckDB) ProcessAcked(glockIndex uint16, command *protocol.
if self.manager.clientChannel != nil {
_ = self.manager.clientChannel.HandleAcked(ackLock)
}
self.glock.Lock()
if self.freeAckLocksIndex < self.freeAckLocksMax {
self.freeAckLocks[self.freeAckLocksIndex] = ackLock
self.freeAckLocksIndex++
self.glock.Lock()
if self.freeAckLocksIndex < self.freeAckLocksMax {
self.freeAckLocks[self.freeAckLocksIndex] = ackLock
self.freeAckLocksIndex++
}
self.glock.Unlock()
}
self.glock.Unlock()
return nil
}

Expand All @@ -1373,22 +1378,25 @@ func (self *ReplicationAckDB) ProcessAckAofed(glockIndex uint16, aofLock *AofLoc
self.ackGlocks[glockIndex].Lock()
ackLock, ok := self.ackLocks[glockIndex][requestId]
if !ok {
self.glock.Lock()
if self.freeAckLocksIndex > 0 {
self.freeAckLocksIndex--
ackLock = self.freeAckLocks[self.freeAckLocksIndex]
self.glock.Unlock()
ackLock.locked = false
self.glock.Lock()
if self.freeAckLocksIndex > 0 {
self.freeAckLocksIndex--
ackLock = self.freeAckLocks[self.freeAckLocksIndex]
self.glock.Unlock()
ackLock.locked = false
} else {
self.glock.Unlock()
ackLock = NewReplicationAckLock()
}
} else {
self.glock.Unlock()
ackLock = NewReplicationAckLock()
}
self.ackLocks[glockIndex][requestId] = ackLock
}

ackLock.aofResult = aofLock.Result
ackLock.aofed = true

if !ackLock.locked {
self.ackGlocks[glockIndex].Unlock()
return nil
Expand All @@ -1399,12 +1407,14 @@ func (self *ReplicationAckDB) ProcessAckAofed(glockIndex uint16, aofLock *AofLoc
if self.manager.clientChannel != nil {
_ = self.manager.clientChannel.HandleAcked(ackLock)
}
self.glock.Lock()
if self.freeAckLocksIndex < self.freeAckLocksMax {
self.freeAckLocks[self.freeAckLocksIndex] = ackLock
self.freeAckLocksIndex++
self.glock.Lock()
if self.freeAckLocksIndex < self.freeAckLocksMax {
self.freeAckLocks[self.freeAckLocksIndex] = ackLock
self.freeAckLocksIndex++
}
self.glock.Unlock()
}
self.glock.Unlock()
return nil
}

Expand Down

0 comments on commit e7e2a08

Please sign in to comment.