From e7e2a081f9b27721f8c46399f9a7085ee40d6ae0 Mon Sep 17 00:00:00 2001 From: snower Date: Thu, 25 Apr 2024 20:31:03 +0800 Subject: [PATCH] Optimize master-slave synchronization logic --- run-tests | 4 +-- server/aof.go | 6 ++--- server/protocol.go | 4 +-- server/replication.go | 60 +++++++++++++++++++++++++------------------ 4 files changed, 42 insertions(+), 32 deletions(-) diff --git a/run-tests b/run-tests index 8abeadb..d071351 100755 --- a/run-tests +++ b/run-tests @@ -137,7 +137,7 @@ Benchmark Single Key Multi Lock Info: $BENCH_OUTPUTS " -BENCH_CMD="go run tools/benchmark/main.go --port=5659 --client=4 --conc=8 --count=10000 --timeout_flag=4096" +BENCH_CMD="go run tools/benchmark/main.go --port=5659 --client=4 --conc=64 --count=10000 --timeout_flag=4096" BENCH_OUTPUTS=$(which timeout > /dev/null 2>&1 && timeout 120 $BENCH_CMD 2>&1 || $BENCH_CMD 2>&1) BENCH_SUCCEDS=$(echo "$BENCH_OUTPUTS" | grep "Succed") if [ -z "$BENCH_SUCCEDS" ] @@ -175,7 +175,7 @@ Benchmark Multi Key Single Lock With Data Info: $BENCH_OUTPUTS " -BENCH_CMD="go run tools/benchmark/main.go --port=5659 --client=4 --conc=8 --count=10000 --timeout_flag=4096 --data_length=4096" +BENCH_CMD="go run tools/benchmark/main.go --port=5659 --client=4 --conc=64 --count=10000 --timeout_flag=4096 --data_length=4096" BENCH_OUTPUTS=$(which timeout > /dev/null 2>&1 && timeout 120 $BENCH_CMD 2>&1 || $BENCH_CMD 2>&1) BENCH_SUCCEDS=$(echo "$BENCH_OUTPUTS" | grep "Succed") if [ -z "$BENCH_SUCCEDS" ] diff --git a/server/aof.go b/server/aof.go index f71f7c6..e7b786f 100644 --- a/server/aof.go +++ b/server/aof.go @@ -1054,7 +1054,7 @@ func (self *AofChannel) HandleAofAcked(aofLock *AofLock) { func (self *AofChannel) HandleAcked(aofLock *AofLock) { db := self.aof.slock.replicationManager.GetAckDB(aofLock.DbId) if db != nil { - _ = db.Process(self.lockDbGlockIndex, aofLock) + _ = db.ProcessAcked(self.lockDbGlockIndex, aofLock) } } @@ -1401,7 +1401,7 @@ func (self *Aof) NewAofChannel(lockDb *LockDB, lockDbGlockIndex uint16, lockDbGl self.glock.Lock() serverProtocol := NewMemWaiterServerProtocol(self.slock) aofChannel := NewAofChannel(self, lockDb, lockDbGlockIndex, lockDbGlock) - _ = serverProtocol.SetResultCallback(func(serverProtocol *MemWaiterServerProtocol, command *protocol.LockCommand, result uint8, lcount uint16, lrcount uint8) error { + _ = serverProtocol.SetResultCallback(func(serverProtocol *MemWaiterServerProtocol, command *protocol.LockCommand, result uint8, lcount uint16, lrcount uint8, data []byte) error { return self.lockLoaded(aofChannel, serverProtocol, command, result, lcount, lrcount) }) aofChannel.serverProtocol = serverProtocol @@ -1628,7 +1628,7 @@ func (self *Aof) lockLoaded(aofChannel *AofChannel, _ *MemWaiterServerProtocol, db := self.slock.replicationManager.GetOrNewAckDB(command.DbId) if db != nil { - return db.ProcessAcked(aofChannel.lockDbGlockIndex, command, result, lcount, lrcount) + return db.ProcessAckLocked(aofChannel.lockDbGlockIndex, command, result, lcount, lrcount) } return nil } diff --git a/server/protocol.go b/server/protocol.go index ff27d6a..b512d17 100755 --- a/server/protocol.go +++ b/server/protocol.go @@ -260,7 +260,7 @@ func (self *DefaultServerProtocol) FreeLockCommandLocked(command *protocol.LockC var defaultServerProtocol *DefaultServerProtocol = nil -type MemWaiterServerProtocolResultCallback func(*MemWaiterServerProtocol, *protocol.LockCommand, uint8, uint16, uint8) error +type MemWaiterServerProtocolResultCallback func(*MemWaiterServerProtocol, *protocol.LockCommand, uint8, uint16, uint8, []byte) error type MemWaiterServerProtocol struct { slock *SLock @@ -354,7 +354,7 @@ func (self *MemWaiterServerProtocol) ProcessLockCommand(lockCommand *protocol.Lo func (self *MemWaiterServerProtocol) ProcessLockResultCommand(command *protocol.LockCommand, result uint8, lcount uint16, lrcount uint8, data []byte) error { if self.resultCallback != nil { - return self.resultCallback(self, command, result, lcount, lrcount) + return self.resultCallback(self, command, result, lcount, lrcount, data) } self.glock.Lock() diff --git a/server/replication.go b/server/replication.go index ee2ec6f..6171126 100644 --- a/server/replication.go +++ b/server/replication.go @@ -1229,7 +1229,7 @@ func (self *ReplicationAckDB) PushUnLock(glockIndex uint16, lock *AofLock) error return nil } -func (self *ReplicationAckDB) Process(glockIndex uint16, aofLock *AofLock) error { +func (self *ReplicationAckDB) ProcessAcked(glockIndex uint16, aofLock *AofLock) error { requestId := aofLock.GetRequestId() self.ackGlocks[glockIndex].Lock() if lock, ok := self.locks[glockIndex][requestId]; ok { @@ -1311,15 +1311,19 @@ func (self *ReplicationAckDB) PushAckLock(glockIndex uint16, aofLock *AofLock) e requestId := aofLock.GetRequestId() self.ackGlocks[glockIndex].Lock() if ackLock, ok := self.ackLocks[glockIndex][requestId]; !ok { - self.glock.Lock() if self.freeAckLocksIndex > 0 { - self.freeAckLocksIndex-- - ackLock = self.freeAckLocks[self.freeAckLocksIndex] - self.glock.Unlock() - ackLock.locked = false - ackLock.aofed = false + self.glock.Lock() + if self.freeAckLocksIndex > 0 { + self.freeAckLocksIndex-- + ackLock = self.freeAckLocks[self.freeAckLocksIndex] + self.glock.Unlock() + ackLock.locked = false + ackLock.aofed = false + } else { + self.glock.Unlock() + ackLock = NewReplicationAckLock() + } } else { - self.glock.Unlock() ackLock = NewReplicationAckLock() } self.ackLocks[glockIndex][requestId] = ackLock @@ -1328,7 +1332,7 @@ func (self *ReplicationAckDB) PushAckLock(glockIndex uint16, aofLock *AofLock) e return nil } -func (self *ReplicationAckDB) ProcessAcked(glockIndex uint16, command *protocol.LockCommand, result uint8, lcount uint16, lrcount uint8) error { +func (self *ReplicationAckDB) ProcessAckLocked(glockIndex uint16, command *protocol.LockCommand, result uint8, lcount uint16, lrcount uint8) error { self.ackGlocks[glockIndex].Lock() ackLock, ok := self.ackLocks[glockIndex][command.RequestId] if !ok { @@ -1348,7 +1352,6 @@ func (self *ReplicationAckDB) ProcessAcked(glockIndex uint16, command *protocol. ackLock.lockResult.Lrcount = lrcount ackLock.lockResult.Rcount = command.Rcount ackLock.locked = true - if !ackLock.aofed { self.ackGlocks[glockIndex].Unlock() return nil @@ -1359,12 +1362,14 @@ func (self *ReplicationAckDB) ProcessAcked(glockIndex uint16, command *protocol. if self.manager.clientChannel != nil { _ = self.manager.clientChannel.HandleAcked(ackLock) } - self.glock.Lock() if self.freeAckLocksIndex < self.freeAckLocksMax { - self.freeAckLocks[self.freeAckLocksIndex] = ackLock - self.freeAckLocksIndex++ + self.glock.Lock() + if self.freeAckLocksIndex < self.freeAckLocksMax { + self.freeAckLocks[self.freeAckLocksIndex] = ackLock + self.freeAckLocksIndex++ + } + self.glock.Unlock() } - self.glock.Unlock() return nil } @@ -1373,14 +1378,18 @@ func (self *ReplicationAckDB) ProcessAckAofed(glockIndex uint16, aofLock *AofLoc self.ackGlocks[glockIndex].Lock() ackLock, ok := self.ackLocks[glockIndex][requestId] if !ok { - self.glock.Lock() if self.freeAckLocksIndex > 0 { - self.freeAckLocksIndex-- - ackLock = self.freeAckLocks[self.freeAckLocksIndex] - self.glock.Unlock() - ackLock.locked = false + self.glock.Lock() + if self.freeAckLocksIndex > 0 { + self.freeAckLocksIndex-- + ackLock = self.freeAckLocks[self.freeAckLocksIndex] + self.glock.Unlock() + ackLock.locked = false + } else { + self.glock.Unlock() + ackLock = NewReplicationAckLock() + } } else { - self.glock.Unlock() ackLock = NewReplicationAckLock() } self.ackLocks[glockIndex][requestId] = ackLock @@ -1388,7 +1397,6 @@ func (self *ReplicationAckDB) ProcessAckAofed(glockIndex uint16, aofLock *AofLoc ackLock.aofResult = aofLock.Result ackLock.aofed = true - if !ackLock.locked { self.ackGlocks[glockIndex].Unlock() return nil @@ -1399,12 +1407,14 @@ func (self *ReplicationAckDB) ProcessAckAofed(glockIndex uint16, aofLock *AofLoc if self.manager.clientChannel != nil { _ = self.manager.clientChannel.HandleAcked(ackLock) } - self.glock.Lock() if self.freeAckLocksIndex < self.freeAckLocksMax { - self.freeAckLocks[self.freeAckLocksIndex] = ackLock - self.freeAckLocksIndex++ + self.glock.Lock() + if self.freeAckLocksIndex < self.freeAckLocksMax { + self.freeAckLocks[self.freeAckLocksIndex] = ackLock + self.freeAckLocksIndex++ + } + self.glock.Unlock() } - self.glock.Unlock() return nil }