Skip to content

Commit

Permalink
Optimize unified naming rules and optimize log and status report outp…
Browse files Browse the repository at this point in the history
…ut information
  • Loading branch information
snower committed Apr 29, 2024
1 parent 9bbed7f commit 261a546
Show file tree
Hide file tree
Showing 6 changed files with 210 additions and 202 deletions.
12 changes: 6 additions & 6 deletions server/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ func (self *Admin) commandHandleInfoCommand(serverProtocol *TextServerProtocol,
if member.status == ARBITER_MEMBER_STATUS_ONLINE {
status = "online"
}
infos = append(infos, fmt.Sprintf("member%d:host=%s,weight=%d,arbiter=%s,role=%s,status=%s,self=%s,aof_id=%x,update=%d,delay=%.2f", i+1, member.host, member.weight,
arbiter, ROLE_NAMES[member.role], status, isself, aofId, member.lastUpdated/1e6, float64(member.lastDelay)/1e6))
infos = append(infos, fmt.Sprintf("member%d:host=%s,weight=%d,arbiter=%s,role=%s,status=%s,self=%s,aof_id=%s,update=%d,delay=%.2f", i+1, member.host, member.weight,
arbiter, ROLE_NAMES[member.role], status, isself, FormatAofId(aofId), member.lastUpdated/1e6, float64(member.lastDelay)/1e6))
}
infos = append(infos, "")
}
Expand All @@ -227,7 +227,7 @@ func (self *Admin) commandHandleInfoCommand(serverProtocol *TextServerProtocol,
if self.slock.state == STATE_LEADER {
infos = append(infos, "role:leader")
infos = append(infos, fmt.Sprintf("connected_followers:%d", len(self.slock.replicationManager.serverChannels)))
infos = append(infos, fmt.Sprintf("current_aof_id:%x", self.slock.replicationManager.currentRequestId))
infos = append(infos, fmt.Sprintf("current_aof_id:%s", FormatAofId(self.slock.replicationManager.currentAofId)))
infos = append(infos, fmt.Sprintf("current_offset:%d", self.slock.replicationManager.bufferQueue.seq))
for i, serverChannel := range self.slock.replicationManager.serverChannels {
if serverChannel.protocol == nil {
Expand All @@ -249,8 +249,8 @@ func (self *Admin) commandHandleInfoCommand(serverProtocol *TextServerProtocol,
aofFileSendFinish = "yes"
}
state := serverChannel.state
infos = append(infos, fmt.Sprintf("follower%d:host=%s,aof_id=%x,behind_offset=%d,status=%s,push_count=%d,send_count=%d,ack_count=%d,send_data_size=%d,aof_file_send_finish=%s", i+1,
serverChannel.protocol.RemoteAddr().String(), serverChannel.bufferCursor.currentRequestId, behindOffset-1, status,
infos = append(infos, fmt.Sprintf("follower%d:host=%s,aof_id=%s,behind_offset=%d,status=%s,push_count=%d,send_count=%d,ack_count=%d,send_data_size=%d,aof_file_send_finish=%s", i+1,
serverChannel.protocol.RemoteAddr().String(), FormatAofId(serverChannel.bufferCursor.currentAofId), behindOffset-1, status,
state.pushCount, state.sendCount, state.ackCount, state.sendDataSize, aofFileSendFinish))
}
} else {
Expand All @@ -263,7 +263,7 @@ func (self *Admin) commandHandleInfoCommand(serverProtocol *TextServerProtocol,
}

if self.slock.replicationManager.clientChannel != nil {
infos = append(infos, fmt.Sprintf("current_aof_id:%x", self.slock.replicationManager.clientChannel.currentRequestId))
infos = append(infos, fmt.Sprintf("current_aof_id:%s", FormatAofId(self.slock.replicationManager.clientChannel.currentAofId)))
if self.slock.replicationManager.clientChannel.recvedFiles {
infos = append(infos, "aof_file_recv_finish:yes")
} else {
Expand Down
92 changes: 53 additions & 39 deletions server/aof.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package server

import (
"bufio"
"encoding/hex"
"errors"
"fmt"
"github.com/snower/slock/protocol"
Expand Down Expand Up @@ -29,11 +30,26 @@ const AOF_FLAG_UPDATED = 0x0008
const AOF_FLAG_REQUIRE_ACKED = 0x1000
const AOF_FLAG_CONTAINS_DATA = 0x2000

func FormatAofId(aofId [16]byte) string {
return fmt.Sprintf("%x", [16]byte{aofId[7], aofId[6], aofId[5], aofId[4], aofId[3], aofId[2], aofId[1], aofId[0], aofId[15], aofId[14], aofId[13], aofId[12], aofId[11], aofId[10], aofId[9], aofId[8]})
}

func ParseAofId(aofIdString string) ([16]byte, error) {
if len(aofIdString) != 32 {
return [16]byte{}, errors.New("len error")
}
buf, err := hex.DecodeString(aofIdString)
if err != nil {
return [16]byte{}, err
}
return [16]byte{buf[7], buf[6], buf[5], buf[4], buf[3], buf[2], buf[1], buf[0], buf[15], buf[14], buf[13], buf[12], buf[11], buf[10], buf[9], buf[8]}, nil
}

type AofLock struct {
HandleType uint8
CommandType uint8
AofIndex uint32
AofId uint32
AofOffset uint32
CommandTime uint64
Flag uint8
DbId uint8
Expand Down Expand Up @@ -72,7 +88,7 @@ func (self *AofLock) Decode() error {

self.CommandType = buf[2]

self.AofId, self.AofIndex = uint32(buf[3])|uint32(buf[4])<<8|uint32(buf[5])<<16|uint32(buf[6])<<24, uint32(buf[7])|uint32(buf[8])<<8|uint32(buf[9])<<16|uint32(buf[10])<<24
self.AofOffset, self.AofIndex = uint32(buf[3])|uint32(buf[4])<<8|uint32(buf[5])<<16|uint32(buf[6])<<24, uint32(buf[7])|uint32(buf[8])<<8|uint32(buf[9])<<16|uint32(buf[10])<<24
self.CommandTime = uint64(buf[11]) | uint64(buf[12])<<8 | uint64(buf[13])<<16 | uint64(buf[14])<<24 | uint64(buf[15])<<32 | uint64(buf[16])<<40 | uint64(buf[17])<<48 | uint64(buf[18])<<56

self.Flag, self.DbId = buf[19], buf[20]
Expand Down Expand Up @@ -103,7 +119,7 @@ func (self *AofLock) Encode() error {

buf[2] = self.CommandType

buf[3], buf[4], buf[5], buf[6], buf[7], buf[8], buf[9], buf[10] = byte(self.AofId), byte(self.AofId>>8), byte(self.AofId>>16), byte(self.AofId>>24), byte(self.AofIndex), byte(self.AofIndex>>8), byte(self.AofIndex>>16), byte(self.AofIndex>>24)
buf[3], buf[4], buf[5], buf[6], buf[7], buf[8], buf[9], buf[10] = byte(self.AofOffset), byte(self.AofOffset>>8), byte(self.AofOffset>>16), byte(self.AofOffset>>24), byte(self.AofIndex), byte(self.AofIndex>>8), byte(self.AofIndex>>16), byte(self.AofIndex>>24)
buf[11], buf[12], buf[13], buf[14], buf[15], buf[16], buf[17], buf[18] = byte(self.CommandTime), byte(self.CommandTime>>8), byte(self.CommandTime>>16), byte(self.CommandTime>>24), byte(self.CommandTime>>32), byte(self.CommandTime>>40), byte(self.CommandTime>>48), byte(self.CommandTime>>56)

buf[19], buf[20] = self.Flag, self.DbId
Expand All @@ -126,28 +142,26 @@ func (self *AofLock) Encode() error {
return nil
}

func (self *AofLock) UpdateAofIndexId(aof_index uint32, aof_id uint32) error {
self.AofIndex = aof_index
self.AofId = aof_id
func (self *AofLock) UpdateAofId(aofIndex uint32, aofOffset uint32) error {
self.AofIndex = aofIndex
self.AofOffset = aofOffset

buf := self.buf
if len(buf) < 64 {
return errors.New("Buffer Len error")
}

buf[3], buf[4], buf[5], buf[6], buf[7], buf[8], buf[9], buf[10] = byte(aof_id), byte(aof_id>>8), byte(aof_id>>16), byte(aof_id>>24), byte(aof_index), byte(aof_index>>8), byte(aof_index>>16), byte(aof_index>>24)
buf[3], buf[4], buf[5], buf[6], buf[7], buf[8], buf[9], buf[10] = byte(aofOffset), byte(aofOffset>>8), byte(aofOffset>>16), byte(aofOffset>>24), byte(aofIndex), byte(aofIndex>>8), byte(aofIndex>>16), byte(aofIndex>>24)
return nil
}

func (self *AofLock) GetRequestId() [16]byte {
requestId := [16]byte{}
requestId[0], requestId[1], requestId[2], requestId[3], requestId[4], requestId[5], requestId[6], requestId[7] = byte(self.AofId), byte(self.AofId>>8), byte(self.AofId>>16), byte(self.AofId>>24), byte(self.AofIndex), byte(self.AofIndex>>8), byte(self.AofIndex>>16), byte(self.AofIndex>>24)
requestId[8], requestId[9], requestId[10], requestId[11], requestId[12], requestId[13], requestId[14], requestId[15] = byte(self.CommandTime), byte(self.CommandTime>>8), byte(self.CommandTime>>16), byte(self.CommandTime>>24), byte(self.CommandTime>>32), byte(self.CommandTime>>40), byte(self.CommandTime>>48), byte(self.CommandTime>>56)
return requestId
func (self *AofLock) GetAofId() [16]byte {
return [16]byte{byte(self.AofOffset), byte(self.AofOffset >> 8), byte(self.AofOffset >> 16), byte(self.AofOffset >> 24), byte(self.AofIndex), byte(self.AofIndex >> 8), byte(self.AofIndex >> 16), byte(self.AofIndex >> 24),
byte(self.CommandTime), byte(self.CommandTime >> 8), byte(self.CommandTime >> 16), byte(self.CommandTime >> 24), byte(self.CommandTime >> 32), byte(self.CommandTime >> 40), byte(self.CommandTime >> 48), byte(self.CommandTime >> 56)}
}

func (self *AofLock) SetRequestId(buf [16]byte) {
self.AofId, self.AofIndex = uint32(buf[0])|uint32(buf[1])<<8|uint32(buf[2])<<16|uint32(buf[3])<<24, uint32(buf[4])|uint32(buf[5])<<8|uint32(buf[6])<<16|uint32(buf[7])<<24
func (self *AofLock) SetAofId(buf [16]byte) {
self.AofOffset, self.AofIndex = uint32(buf[0])|uint32(buf[1])<<8|uint32(buf[2])<<16|uint32(buf[3])<<24, uint32(buf[4])|uint32(buf[5])<<8|uint32(buf[6])<<16|uint32(buf[7])<<24
self.CommandTime = uint64(buf[8]) | uint64(buf[9])<<8 | uint64(buf[10])<<16 | uint64(buf[11])<<24 | uint64(buf[12])<<32 | uint64(buf[13])<<40 | uint64(buf[14])<<48 | uint64(buf[15])<<56
}

Expand Down Expand Up @@ -713,7 +727,7 @@ func (self *AofChannel) Push(dbId uint8, lock *Lock, commandType uint8, lockComm
aofLock := self.getAofLock()
aofLock.CommandType = commandType
aofLock.AofIndex = 0
aofLock.AofId = 0
aofLock.AofOffset = 0
if lock.expriedTime > self.lockDb.currentTime {
aofLock.CommandTime = uint64(self.lockDb.currentTime)
} else {
Expand Down Expand Up @@ -839,7 +853,7 @@ func (self *AofChannel) AofAcked(buf []byte, succed bool) error {
func (self *AofChannel) Acked(commandResult *protocol.LockResultCommand) error {
aofLock := self.getAofLock()
aofLock.CommandType = commandResult.CommandType
aofLock.SetRequestId(commandResult.RequestId)
aofLock.SetAofId(commandResult.RequestId)
aofLock.Flag = commandResult.Flag
aofLock.DbId = commandResult.DbId
aofLock.LockId = commandResult.LockId
Expand Down Expand Up @@ -962,7 +976,7 @@ func (self *AofChannel) HandleLoad(aofLock *AofLock) {

lockCommand := self.serverProtocol.GetLockCommand()
lockCommand.CommandType = aofLock.CommandType
lockCommand.RequestId = aofLock.GetRequestId()
lockCommand.RequestId = aofLock.GetAofId()
lockCommand.Flag = aofLock.Flag | 0x04
lockCommand.DbId = aofLock.DbId
lockCommand.LockId = aofLock.LockId
Expand Down Expand Up @@ -1010,7 +1024,7 @@ func (self *AofChannel) HandleReplay(aofLock *AofLock) {

lockCommand := self.serverProtocol.GetLockCommand()
lockCommand.CommandType = aofLock.CommandType
lockCommand.RequestId = aofLock.GetRequestId()
lockCommand.RequestId = aofLock.GetAofId()
lockCommand.Flag = aofLock.Flag | 0x04
lockCommand.DbId = aofLock.DbId
lockCommand.LockId = aofLock.LockId
Expand Down Expand Up @@ -1092,6 +1106,7 @@ type Aof struct {
glock *sync.Mutex
dataDir string
aofFileIndex uint32
aofFileOffset uint32
aofFile *AofFile
aofGlock *sync.Mutex
replGlock *sync.Mutex
Expand All @@ -1105,17 +1120,16 @@ type Aof struct {
rewritedWaiter chan bool
rewriteSize uint32
aofLockCount uint64
aofFileId uint32
isWaitRewite bool
isRewriting bool
inited bool
closed bool
}

func NewAof() *Aof {
return &Aof{nil, &sync.Mutex{}, "", 1, nil, &sync.Mutex{}, &sync.Mutex{},
return &Aof{nil, &sync.Mutex{}, "", 1, 0, nil, &sync.Mutex{}, &sync.Mutex{},
make([]*AofChannel, 0), 0, 0, nil, make([]*AofLockQueue, 256),
&sync.Mutex{}, 0, nil, 0, 0, 0, false, false, false, false}
&sync.Mutex{}, 0, nil, 0, 0, false, false, false, false}
}

func (self *Aof) Init() ([16]byte, error) {
Expand Down Expand Up @@ -1147,18 +1161,18 @@ func (self *Aof) Init() ([16]byte, error) {
if err != io.EOF {
return [16]byte{}, err
}
self.aofFileId = 0
self.aofFileOffset = 0
} else {
self.aofFileId = aofLock.AofId
self.aofFileOffset = aofLock.AofOffset
}
self.slock.Log().Infof("Aof init current file %s.%d by id %d", "append.aof", self.aofFileIndex, self.aofFileId)
self.slock.Log().Infof("Aof init current file %s.%d by id %d", "append.aof", self.aofFileIndex, self.aofFileOffset)
}

_ = self.WaitFlushAofChannel()
self.inited = true
self.slock.Log().Infof("Aof init finish")
if aofLock != nil {
return aofLock.GetRequestId(), nil
return aofLock.GetAofId(), nil
}
return [16]byte{}, nil
}
Expand Down Expand Up @@ -1201,7 +1215,7 @@ func (self *Aof) LoadAndInit() error {
return true, lerr
}
if lock.AofIndex == self.aofFileIndex {
self.aofFileId = lock.AofId
self.aofFileOffset = lock.AofOffset
}
return true, nil
})
Expand Down Expand Up @@ -1268,7 +1282,7 @@ func (self *Aof) Load() error {
return nil
}

func (self *Aof) LoadMaxId() ([16]byte, error) {
func (self *Aof) LoadMaxAofId() ([16]byte, error) {
dataDir, err := filepath.Abs(Config.DataDir)
if err != nil {
return [16]byte{}, err
Expand All @@ -1286,13 +1300,13 @@ func (self *Aof) LoadMaxId() ([16]byte, error) {

aofLock := NewAofLock()
aofLock.AofIndex = 1
fileAofId := aofLock.GetRequestId()
fileAofId := aofLock.GetAofId()
if len(appendFiles) > 0 {
aofFileIndex, perr := strconv.ParseUint(appendFiles[len(appendFiles)-1][11:], 10, 64)
if perr == nil {
aofLock.AofIndex = uint32(aofFileIndex)
aofLock.AofId = 0
fileAofId = aofLock.GetRequestId()
aofLock.AofOffset = 0
fileAofId = aofLock.GetAofId()
}
}

Expand Down Expand Up @@ -1322,7 +1336,7 @@ func (self *Aof) LoadMaxId() ([16]byte, error) {
return fileAofId, err
}
_ = aofFile.Close()
return aofLock.GetRequestId(), nil
return aofLock.GetAofId(), nil
}
return fileAofId, nil
}
Expand Down Expand Up @@ -1355,8 +1369,8 @@ func (self *Aof) GetCurrentAofID() [16]byte {

aofLock := NewAofLock()
aofLock.AofIndex = self.aofFileIndex
aofLock.AofId = self.aofFileId
return aofLock.GetRequestId()
aofLock.AofOffset = self.aofFileOffset
return aofLock.GetAofId()
}

func (self *Aof) FindAofFiles() ([]string, string, error) {
Expand Down Expand Up @@ -1682,8 +1696,8 @@ func (self *Aof) PushLock(glockIndex uint16, aofLock *AofLock) {
return
}
}
self.aofFileId++
_ = aofLock.UpdateAofIndexId(self.aofFileIndex, self.aofFileId)
self.aofFileOffset++
_ = aofLock.UpdateAofId(self.aofFileIndex, self.aofFileOffset)

werr := self.aofFile.WriteLock(aofLock)
if werr == nil && aofLock.AofFlag&AOF_FLAG_CONTAINS_DATA != 0 {
Expand Down Expand Up @@ -1716,7 +1730,7 @@ func (self *Aof) AppendLock(aofLock *AofLock) bool {
self.aofGlock.Unlock()
return false
}
self.aofFileId = aofLock.AofId
self.aofFileOffset = aofLock.AofOffset
}

err := self.aofFile.WriteLock(aofLock)
Expand All @@ -1729,7 +1743,7 @@ func (self *Aof) AppendLock(aofLock *AofLock) bool {
self.slock.Log().Errorf("Aof append file write data error %v", err)
}
}
self.aofFileId = aofLock.AofId
self.aofFileOffset = aofLock.AofOffset
self.aofLockCount++
self.aofGlock.Unlock()
return self.isWaitRewite
Expand Down Expand Up @@ -1840,7 +1854,7 @@ func (self *Aof) Reset(aofFileIndex uint32, aofFileId uint32) error {
return err
}
self.aofFileIndex = aofFileIndex
self.aofFileId = aofFileId
self.aofFileOffset = aofFileId
self.slock.Log().Infof("Aof create current file %s.%d", "append.aof", aofFileIndex)
return nil
}
Expand All @@ -1867,7 +1881,7 @@ func (self *Aof) RewriteAofFile(startRewite bool) error {
}
self.aofFile = aofFile
self.aofFileIndex = aofFileIndex
self.aofFileId = 0
self.aofFileOffset = 0
self.slock.Log().Infof("Aof create current file %s.%d", "append.aof", aofFileIndex)

if startRewite {
Expand Down
26 changes: 11 additions & 15 deletions server/arbiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package server

import (
"crypto/rand"
"encoding/hex"
"errors"
"fmt"
"github.com/snower/slock/client"
Expand Down Expand Up @@ -1059,7 +1058,7 @@ func (self *ArbiterVoter) DoVote() error {

self.voteHost = selectVoteResponse.Host
self.voteAofId = self.manager.DecodeAofId(selectVoteResponse.AofId)
self.manager.slock.Log().Infof("Arbier voter do vote succed, host %s aof_id %x proposal_id %d", self.voteHost, self.voteAofId, self.proposalId)
self.manager.slock.Log().Infof("Arbier voter do vote succed, host %s aofId %s proposalId %d", self.voteHost, FormatAofId(self.voteAofId), self.proposalId)
return nil
}

Expand All @@ -1081,7 +1080,7 @@ func (self *ArbiterVoter) DoProposal() error {
self.manager.slock.Log().Errorf("Arbier voter do proposal fail")
return errors.New("member accept proposal count too small")
}
self.manager.slock.Log().Infof("Arbier voter do proposal succed, host %s aof_id %x proposal_id %d", self.voteHost, self.voteAofId, self.proposalId)
self.manager.slock.Log().Infof("Arbier voter do proposal succed, host %s aofId %s proposalId %d", self.voteHost, FormatAofId(self.voteAofId), self.proposalId)
return nil
}

Expand All @@ -1101,7 +1100,7 @@ func (self *ArbiterVoter) DoCommit() error {
return errors.New("member accept proposal count too small")
}

self.manager.slock.Log().Infof("Arbier voter do commit succed, host %s aof_id %x commit_id %d", self.voteHost, self.voteAofId, self.commitId)
self.manager.slock.Log().Infof("Arbier voter do commit succed, host %s aofId %s commitId %d", self.voteHost, FormatAofId(self.voteAofId), self.commitId)
return nil
}

Expand Down Expand Up @@ -1271,13 +1270,13 @@ func (self *ArbiterManager) Load() error {
return err
}

aofId, err := self.slock.aof.LoadMaxId()
aofId, err := self.slock.aof.LoadMaxAofId()
if err != nil {
self.slock.Log().Errorf("Arbiter load aof file maxid error %v", err)
self.slock.Log().Errorf("Arbiter load aof file maxAofId error %v", err)
return err
}
self.slock.Log().Infof("Arbiter load aof file maxid %x", aofId)
self.slock.replicationManager.currentRequestId = aofId
self.slock.Log().Infof("Arbiter load aof file maxAofId %s", FormatAofId(aofId))
self.slock.replicationManager.currentAofId = aofId
self.voter.proposalId = self.voter.commitId
return nil
}
Expand Down Expand Up @@ -1881,15 +1880,12 @@ func (self *ArbiterManager) GetCurrentAofID() [16]byte {
}

func (self *ArbiterManager) EncodeAofId(aofId [16]byte) string {
return fmt.Sprintf("%x", aofId)
return FormatAofId(aofId)
}

func (self *ArbiterManager) DecodeAofId(aofId string) [16]byte {
buf, err := hex.DecodeString(aofId)
if err != nil || len(buf) != 16 {
return [16]byte{}
}
return [16]byte{buf[0], buf[1], buf[2], buf[3], buf[4], buf[5], buf[6], buf[7], buf[8], buf[9], buf[10], buf[11], buf[12], buf[13], buf[14], buf[15]}
func (self *ArbiterManager) DecodeAofId(aofIdString string) [16]byte {
aofId, _ := ParseAofId(aofIdString)
return aofId
}

func (self *ArbiterManager) CompareAofId(a [16]byte, b [16]byte) int {
Expand Down
Loading

0 comments on commit 261a546

Please sign in to comment.