Skip to content

Commit

Permalink
Optimize info status information
Browse files Browse the repository at this point in the history
  • Loading branch information
snower committed Apr 27, 2024
1 parent b9f78e0 commit 22b3ac1
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 11 deletions.
20 changes: 13 additions & 7 deletions server/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,18 +234,24 @@ func (self *Admin) commandHandleInfoCommand(serverProtocol *TextServerProtocol,
continue
}

status := "sending"
if serverChannel.pulledState != 0 {
status = "pending"
}
var behindOffset uint64
if self.slock.replicationManager.bufferQueue.seq < self.slock.replicationManager.bufferQueue.seq {
behindOffset = 0xffffffffffffffff - serverChannel.bufferCursor.seq + self.slock.replicationManager.bufferQueue.seq
} else {
behindOffset = self.slock.replicationManager.bufferQueue.seq - serverChannel.bufferCursor.seq
}
infos = append(infos, fmt.Sprintf("follower%d:host=%s,aof_id=%x,behind_offset=%d,status=%s", i+1,
serverChannel.protocol.RemoteAddr().String(), serverChannel.bufferCursor.currentRequestId, behindOffset-1, status))
status := "sending"
if serverChannel.pulledState != 0 {
status = "pending"
}
aofFileSendFinish := "no"
if serverChannel.sendedFiles {
aofFileSendFinish = "yes"
}
state := serverChannel.state
infos = append(infos, fmt.Sprintf("follower%d:host=%s,aof_id=%x,behind_offset=%d,status=%s,push_count=%d,send_count=%d,ack_count=%d,send_data_size=%d,aof_file_send_finish=%s", i+1,
serverChannel.protocol.RemoteAddr().String(), serverChannel.bufferCursor.currentRequestId, behindOffset-1, status,
state.pushCount, state.sendCount, state.ackCount, state.sendDataSize, aofFileSendFinish))
}
} else {
infos = append(infos, "role:follower")
Expand All @@ -264,7 +270,7 @@ func (self *Admin) commandHandleInfoCommand(serverProtocol *TextServerProtocol,
infos = append(infos, "aof_file_recv_finish:no")
}
state := self.slock.replicationManager.clientChannel.state
infos = append(infos, fmt.Sprintf("load_count:%d", state.loadedCount))
infos = append(infos, fmt.Sprintf("load_count:%d", state.loadCount))
infos = append(infos, fmt.Sprintf("connect_count:%d", state.connectCount))
infos = append(infos, fmt.Sprintf("recv_count:%d", state.recvCount))
infos = append(infos, fmt.Sprintf("replay_count:%d", state.replayCount))
Expand Down
22 changes: 18 additions & 4 deletions server/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func (self *ReplicationBufferQueue) Search(requestId [16]byte, cursor *Replicati

type ReplicationClientState struct {
connectCount uint64
loadedCount uint64
loadCount uint64
recvCount uint64
recvDataSize uint64
replayCount uint64
Expand Down Expand Up @@ -648,7 +648,7 @@ func (self *ReplicationClient) recvFiles() error {
return err
}
}
self.state.loadedCount++
self.state.loadCount++

buf := self.aofLock.buf
self.currentRequestId[0], self.currentRequestId[1], self.currentRequestId[2], self.currentRequestId[3], self.currentRequestId[4], self.currentRequestId[5], self.currentRequestId[6], self.currentRequestId[7],
Expand Down Expand Up @@ -726,7 +726,7 @@ func (self *ReplicationClient) Process() error {
self.replayQueue <- aofLock
self.aofQueue <- aofLock
self.pushQueue <- aofLock
self.state.loadedCount++
self.state.loadCount++
self.rbufIndex++
if self.rbufIndex >= len(self.rbufs) {
self.rbufIndex = 0
Expand Down Expand Up @@ -928,6 +928,13 @@ func (self *ReplicationClient) WakeupRetryConnect() error {
return nil
}

type ReplicationServerState struct {
pushCount uint64
sendCount uint64
sendDataSize uint64
ackCount uint64
}

type ReplicationServer struct {
manager *ReplicationManager
stream *Stream
Expand All @@ -936,6 +943,7 @@ type ReplicationServer struct {
raofLock *AofLock
waofLock *AofLock
bufferCursor *ReplicationBufferQueueCursor
state *ReplicationServerState
pulledState uint32
pulledWaiter chan bool
wakeupedBuffer bool
Expand All @@ -946,9 +954,10 @@ type ReplicationServer struct {

func NewReplicationServer(manager *ReplicationManager, serverProtocol *BinaryServerProtocol) *ReplicationServer {
waofLock := NewAofLock()
state := &ReplicationServerState{0, 0, 0, 0}
return &ReplicationServer{manager, serverProtocol.stream, serverProtocol,
manager.slock.GetAof(), NewAofLock(), waofLock, NewReplicationBufferQueueCursor(waofLock.buf),
0, make(chan bool, 1), false, false, make(chan bool, 1), false}
state, 0, make(chan bool, 1), false, false, make(chan bool, 1), false}
}

func (self *ReplicationServer) Close() error {
Expand Down Expand Up @@ -1064,6 +1073,7 @@ func (self *ReplicationServer) sendFiles() error {
return true, err
}
}
self.state.pushCount++
return true, nil
})
if err != nil {
Expand Down Expand Up @@ -1147,9 +1157,12 @@ func (self *ReplicationServer) SendProcess() error {
atomic.AddUint32(&self.manager.serverActiveCount, 0xffffffff)
return err
}
self.state.sendDataSize += uint64(len(self.bufferCursor.data))
}
self.bufferCursor.writed = true
atomic.AddUint32(&self.bufferCursor.currentItem.pollIndex, 1)
self.state.pushCount++
self.state.sendCount++
}

err := bufferQueue.Pop(self.bufferCursor)
Expand Down Expand Up @@ -1217,6 +1230,7 @@ func (self *ReplicationServer) RecvProcess() error {
if err != nil {
return err
}
self.state.ackCount++
}
return io.EOF
}
Expand Down

0 comments on commit 22b3ac1

Please sign in to comment.