Skip to content

Commit

Permalink
Optimize Replication push aof lock
Browse files Browse the repository at this point in the history
  • Loading branch information
snower committed Apr 26, 2024
1 parent 4843b42 commit 13e4cba
Showing 1 changed file with 4 additions and 4 deletions.
8 changes: 4 additions & 4 deletions server/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -1845,7 +1845,7 @@ func (self *ReplicationManager) GetOrNewAckDB(dbId uint8) *ReplicationAckDB {
}

func (self *ReplicationManager) PushLock(glockIndex uint16, aofLock *AofLock) error {
if self.slock.state == STATE_LEADER && aofLock.AofFlag&AOF_FLAG_REQUIRE_ACKED != 0 && aofLock.lock != nil {
if aofLock.AofFlag&AOF_FLAG_REQUIRE_ACKED != 0 && self.slock.state == STATE_LEADER && aofLock.lock != nil {
db := self.GetOrNewAckDB(aofLock.DbId)
switch aofLock.CommandType {
case protocol.COMMAND_LOCK:
Expand All @@ -1865,7 +1865,7 @@ func (self *ReplicationManager) PushLock(glockIndex uint16, aofLock *AofLock) er
if aofLock.AofFlag&AOF_FLAG_CONTAINS_DATA != 0 {
err := self.bufferQueue.Push(buf, aofLock.data)
if err != nil {
if self.slock.state == STATE_LEADER && aofLock.CommandType == protocol.COMMAND_LOCK && aofLock.AofFlag&AOF_FLAG_REQUIRE_ACKED != 0 && aofLock.lock != nil {
if aofLock.CommandType == protocol.COMMAND_LOCK && aofLock.AofFlag&AOF_FLAG_REQUIRE_ACKED != 0 && self.slock.state == STATE_LEADER && aofLock.lock != nil {
db := self.slock.replicationManager.GetAckDB(aofLock.DbId)
if db != nil {
_ = db.ProcessLeaderPushUnLock(glockIndex, aofLock)
Expand All @@ -1876,7 +1876,7 @@ func (self *ReplicationManager) PushLock(glockIndex uint16, aofLock *AofLock) er
} else {
err := self.bufferQueue.Push(buf, nil)
if err != nil {
if self.slock.state == STATE_LEADER && aofLock.CommandType == protocol.COMMAND_LOCK && aofLock.AofFlag&AOF_FLAG_REQUIRE_ACKED != 0 && aofLock.lock != nil {
if aofLock.CommandType == protocol.COMMAND_LOCK && aofLock.AofFlag&AOF_FLAG_REQUIRE_ACKED != 0 && self.slock.state == STATE_LEADER && aofLock.lock != nil {
db := self.slock.replicationManager.GetAckDB(aofLock.DbId)
if db != nil {
_ = db.ProcessLeaderPushUnLock(glockIndex, aofLock)
Expand All @@ -1895,7 +1895,7 @@ func (self *ReplicationManager) PushLock(glockIndex uint16, aofLock *AofLock) er
}

func (self *ReplicationManager) WakeupServerChannel() error {
if atomic.LoadUint32(&self.serverActiveCount) == self.serverCount {
if self.serverCount == 0 || atomic.LoadUint32(&self.serverActiveCount) == self.serverCount {
return nil
}
for _, channel := range self.serverChannels {
Expand Down

0 comments on commit 13e4cba

Please sign in to comment.