Skip to content

Commit

Permalink
Optimize not to resend all files when the aof file data has not chang…
Browse files Browse the repository at this point in the history
…ed each time it is started.
  • Loading branch information
snower committed Apr 27, 2024
1 parent 7ce0212 commit 988ac10
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 61 deletions.
180 changes: 139 additions & 41 deletions server/aof.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,15 +182,26 @@ func NewAofFile(aof *Aof, filename string, mode int, bufSize int) *AofFile {

func (self *AofFile) Open() error {
mode := self.mode
dataFilename := fmt.Sprintf("%s.%s", self.filename, "dat")
if mode == os.O_WRONLY {
mode |= os.O_CREATE
mode |= os.O_TRUNC
mode |= os.O_APPEND

fileinfo, err := os.Stat(self.filename)
if err == nil {
self.size = int(fileinfo.Size())
}
fileinfo, err = os.Stat(dataFilename)
if err == nil {
self.dataSize = int(fileinfo.Size())
}
}

file, err := os.OpenFile(self.filename, mode, 0644)
if err != nil {
return err
}
dataFile, err := os.OpenFile(fmt.Sprintf("%s.%s", self.filename, "dat"), mode, 0644)
dataFile, err := os.OpenFile(dataFilename, mode, 0644)
if err != nil && self.mode == os.O_WRONLY {
_ = file.Close()
return err
Expand All @@ -199,18 +210,26 @@ func (self *AofFile) Open() error {
self.file = file
self.dataFile = dataFile
if self.mode == os.O_WRONLY {
self.wbuf = make([]byte, self.bufSize)
err = self.WriteHeader()
if self.size == 0 {
err = self.WriteHeader()
} else if self.size < 12 {
err = self.file.Truncate(0)
if err == nil {
err = self.WriteHeader()
}
} else {
err = nil
}
if err != nil {
_ = self.file.Close()
if self.dataFile != nil {
_ = self.dataFile.Close()
}
self.file = nil
self.dataFile = nil
self.wbuf = nil
return err
}
self.wbuf = make([]byte, self.bufSize)
} else {
self.rbuf = bufio.NewReaderSize(self.file, self.bufSize)
err = self.ReadHeader()
Expand Down Expand Up @@ -306,37 +325,26 @@ func (self *AofFile) ReadTail(lock *AofLock) error {
if self.file == nil {
return errors.New("File Unopen")
}

buf := lock.GetBuf()
if len(buf) < 64 {
return errors.New("Buffer Len error")
}

stat, err := self.file.Stat()
fileinfo, err := self.file.Stat()
if err != nil {
return err
}

if stat.Size() < 76 {
fileSize := fileinfo.Size()
if fileSize < 76 {
return io.EOF
}

_, _ = self.file.Seek(64, io.SeekEnd)
n, err := self.rbuf.Read(buf)
n, err := self.file.ReadAt(buf, fileSize-64)
if err != nil {
_, _ = self.file.Seek(12, io.SeekStart)
self.rbuf.Reset(self.file)
return err
}

lockLen := uint16(buf[0]) | uint16(buf[1])<<8
if n != int(lockLen)+2 {
_, _ = self.file.Seek(12, io.SeekStart)
self.rbuf.Reset(self.file)
return errors.New("Lock Len error")
}
_, _ = self.file.Seek(12, io.SeekStart)
self.rbuf.Reset(self.file)
return nil
}

Expand Down Expand Up @@ -443,17 +451,24 @@ func (self *AofFile) WriteLockData(lock *AofLock) error {
self.dwbuf = make([]byte, self.bufSize*64)
}
dataLen, bufLen := len(lock.data), len(self.dwbuf)
if self.windex > 0 && dataLen <= bufLen-self.dwindex {
copy(self.dwbuf[self.dwindex:], lock.data)
self.dwindex += dataLen
self.dataSize += dataLen
return nil
if self.windex > 0 {
if dataLen <= bufLen-self.dwindex {
copy(self.dwbuf[self.dwindex:], lock.data)
self.dwindex += dataLen
self.dataSize += dataLen
return nil
}
err := self.Flush()
if err != nil {
return err
}
} else if self.dwindex > 0 {
err := self.Flush()
if err != nil {
return err
}
}

err := self.Flush()
if err != nil {
return err
}
if self.windex > 0 || self.dwindex > 0 {
return errors.New("write lock data error")
}
Expand Down Expand Up @@ -1133,23 +1148,49 @@ func NewAof() *Aof {
&sync.Mutex{}, 0, nil, 0, 0, 0, false, false, false}
}

func (self *Aof) Init() error {
func (self *Aof) Init() ([16]byte, error) {
self.rewriteSize = uint32(Config.AofFileRewriteSize)
dataDir, err := filepath.Abs(Config.DataDir)
if err != nil {
return err
return [16]byte{}, err
}

self.dataDir = dataDir
if _, serr := os.Stat(self.dataDir); os.IsNotExist(serr) {
return serr
return [16]byte{}, serr
}
self.slock.Log().Infof("Aof config data dir %s", self.dataDir)

appendFiles, _, ferr := self.FindAofFiles()
if ferr != nil {
return [16]byte{}, ferr
}
var aofLock *AofLock = nil
if len(appendFiles) > 0 {
aofFileIndex, perr := strconv.ParseInt(appendFiles[len(appendFiles)-1][11:], 10, 64)
if perr != nil {
return [16]byte{}, perr
}
self.aofFileIndex = uint32(aofFileIndex)
aofLock, err = self.LoadFileMaxAofLock(fmt.Sprintf("%s.%d", "append.aof", self.aofFileIndex))
if err != nil {
if err != io.EOF {
return [16]byte{}, err
}
self.aofId = 0
} else {
self.aofId = aofLock.AofId
}
self.slock.Log().Infof("Aof init current file %s.%d by id %d", "append.aof", self.aofFileIndex, self.aofId)
}

_ = self.WaitFlushAofChannel()
self.inited = true
self.slock.Log().Infof("Aof init finish")
return nil
if aofLock != nil {
return aofLock.GetRequestId(), nil
}
return [16]byte{}, nil
}

func (self *Aof) LoadAndInit() error {
Expand All @@ -1169,7 +1210,6 @@ func (self *Aof) LoadAndInit() error {
if err != nil {
return err
}

if len(appendFiles) > 0 {
aofFileIndex, perr := strconv.ParseInt(appendFiles[len(appendFiles)-1][11:], 10, 64)
if perr != nil {
Expand All @@ -1178,15 +1218,48 @@ func (self *Aof) LoadAndInit() error {
self.aofFileIndex = uint32(aofFileIndex)
}

self.aofFile = NewAofFile(self, filepath.Join(self.dataDir, fmt.Sprintf("%s.%d", "append.aof", self.aofFileIndex+1)), os.O_WRONLY, int(Config.AofFileBufferSize))
aofFilenames := make([]string, 0)
if rewriteFile != "" {
aofFilenames = append(aofFilenames, rewriteFile)
}
aofFilenames = append(aofFilenames, appendFiles...)
err = self.LoadAofFiles(aofFilenames, time.Now().Unix(), func(filename string, aofFile *AofFile, lock *AofLock, firstLock bool) (bool, error) {
lerr := self.LoadLock(lock)
if lerr != nil {
return true, lerr
}
return true, nil
})
if err != nil {
return err
}
self.slock.Log().Infof("Aof loaded files %v", aofFilenames)

self.aofFile = NewAofFile(self, filepath.Join(self.dataDir, fmt.Sprintf("%s.%d", "append.aof", self.aofFileIndex)), os.O_WRONLY, int(Config.AofFileBufferSize))
err = self.aofFile.Open()
if err != nil {
self.aofFile = nil
return err
}
self.aofFileIndex++
self.slock.Log().Infof("Aof create current file %s.%d", "append.aof", self.aofFileIndex)

_ = self.WaitFlushAofChannel()
if len(appendFiles) > 0 {
go self.rewriteAofFiles()
}
self.inited = true
self.slock.Log().Infof("Aof init finish")
return nil
}

func (self *Aof) LoadFiles() error {
if self.aofFile != nil {
return nil
}
appendFiles, rewriteFile, err := self.FindAofFiles()
if err != nil {
return err
}
aofFilenames := make([]string, 0)
if rewriteFile != "" {
aofFilenames = append(aofFilenames, rewriteFile)
Expand All @@ -1204,12 +1277,16 @@ func (self *Aof) LoadAndInit() error {
}
self.slock.Log().Infof("Aof loaded files %v", aofFilenames)

_ = self.WaitFlushAofChannel()
if len(appendFiles) > 0 {
go self.rewriteAofFiles()
self.aofFile = NewAofFile(self, filepath.Join(self.dataDir, fmt.Sprintf("%s.%d", "append.aof", self.aofFileIndex)), os.O_WRONLY, int(Config.AofFileBufferSize))
err = self.aofFile.Open()
if err != nil {
self.aofFile = nil
return err
}
self.inited = true
self.slock.Log().Infof("Aof init finish")
self.slock.Log().Infof("Aof open current file %s.%d", "append.aof", self.aofFileIndex)

_ = self.WaitFlushAofChannel()
self.slock.Log().Infof("Aof load finish")
return nil
}

Expand Down Expand Up @@ -1272,6 +1349,27 @@ func (self *Aof) LoadMaxId() ([16]byte, error) {
return fileAofId, nil
}

func (self *Aof) LoadFileMaxAofLock(filename string) (*AofLock, error) {
aofFile := NewAofFile(self, filepath.Join(self.dataDir, filename), os.O_RDONLY, int(Config.AofFileBufferSize))
err := aofFile.Open()
if err != nil {
return nil, err
}
aofLock := NewAofLock()
err = aofFile.ReadTail(aofLock)
if err != nil {
_ = aofFile.Close()
return nil, err
}
err = aofLock.Decode()
if err != nil {
_ = aofFile.Close()
return nil, err
}
_ = aofFile.Close()
return aofLock, nil
}

func (self *Aof) GetCurrentAofID() [16]byte {
if !self.inited {
return [16]byte{}
Expand Down
41 changes: 27 additions & 14 deletions server/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func (self *ReplicationBufferQueue) Search(requestId [16]byte, cursor *Replicati
currentItem := self.tailItem
if currentItem == nil {
self.glock.RUnlock()
return errors.New("search error")
return io.EOF
}

for currentItem != nil {
Expand All @@ -284,7 +284,7 @@ func (self *ReplicationBufferQueue) Search(requestId [16]byte, cursor *Replicati
cursor.currentRequestId[8], cursor.currentRequestId[9], cursor.currentRequestId[10], cursor.currentRequestId[11], cursor.currentRequestId[12], cursor.currentRequestId[13], cursor.currentRequestId[14], cursor.currentRequestId[15] = buf[3], buf[4], buf[5], buf[6], buf[7], buf[8], buf[9], buf[10],
buf[11], buf[12], buf[13], buf[14], buf[15], buf[16], buf[17], buf[18]
cursor.seq = currentItem.seq
cursor.writed = false
cursor.writed = true
self.glock.RUnlock()
return nil
}
Expand Down Expand Up @@ -489,6 +489,10 @@ func (self *ReplicationClient) InitSync() error {
}

if self.aofLock != nil {
err = self.aof.LoadFiles()
if err != nil {
return err
}
err = self.sendStarted()
if err != nil {
return err
Expand Down Expand Up @@ -1015,18 +1019,27 @@ func (self *ReplicationServer) handleInitSync(command *protocol.CallCommand) (*p
if buf[4] == 0 && buf[5] == 0 && buf[6] == 0 && buf[7] == 0 {
return protocol.NewCallResultCommand(command, 0, "ERR_NOT_FOUND", nil), nil
}
initedAofId := [16]byte{buf[0], buf[1], buf[2], buf[3], buf[4], buf[5], buf[6], buf[7], buf[8], buf[9], buf[10], buf[11], buf[12], buf[13], buf[14], buf[15]}
serr := self.manager.bufferQueue.Search(initedAofId, self.bufferCursor)
initedRequestId := [16]byte{buf[0], buf[1], buf[2], buf[3], buf[4], buf[5], buf[6], buf[7], buf[8], buf[9], buf[10], buf[11], buf[12], buf[13], buf[14], buf[15]}
var requestId string
serr := self.manager.bufferQueue.Search(initedRequestId, self.bufferCursor)
if serr != nil {
return protocol.NewCallResultCommand(command, 0, "ERR_NOT_FOUND", nil), nil
if initedRequestId != self.manager.currentRequestId {
return protocol.NewCallResultCommand(command, 0, "ERR_NOT_FOUND", nil), nil
}
self.bufferCursor.currentRequestId = self.manager.currentRequestId
self.bufferCursor.currentItem = nil
self.bufferCursor.seq = self.manager.bufferQueue.seq
self.bufferCursor.writed = true
requestId = fmt.Sprintf("%x", initedRequestId)
} else {
self.waofLock.buf = self.bufferCursor.buf
err = self.waofLock.Decode()
if err != nil {
return protocol.NewCallResultCommand(command, 0, "ERR_ENCODE", nil), nil
}
requestId = fmt.Sprintf("%x", self.waofLock.GetRequestId())
}

self.waofLock.buf = self.bufferCursor.buf
err = self.waofLock.Decode()
if err != nil {
return protocol.NewCallResultCommand(command, 0, "ERR_ENCODE", nil), nil
}
requestId := fmt.Sprintf("%x", self.waofLock.GetRequestId())
response := protobuf.SyncResponse{AofId: requestId}
data, err := proto.Marshal(&response)
if err != nil {
Expand Down Expand Up @@ -1660,14 +1673,13 @@ func (self *ReplicationManager) GetHandlers() map[string]TextServerProtocolComma
return handlers
}

func (self *ReplicationManager) Init(leaderAddress string) error {
func (self *ReplicationManager) Init(leaderAddress string, requestId [16]byte) error {
self.leaderAddress = leaderAddress
self.currentRequestId = requestId
self.slock.Log().Infof("Replication aof ring buffer init size %d", int(Config.AofRingBufferSize))
if self.slock.state == STATE_LEADER {
self.currentRequestId = self.slock.aof.GetCurrentAofID()
self.slock.Log().Infof("Replication init leader %x", self.currentRequestId)
} else {
self.currentRequestId = [16]byte{}
_ = self.transparencyManager.ChangeLeader(leaderAddress)
self.slock.Log().Infof("Replication init follower %s %x", leaderAddress, self.currentRequestId)
}
Expand Down Expand Up @@ -2061,6 +2073,7 @@ func (self *ReplicationManager) SwitchToFollower(address string) error {
return nil
}

self.currentRequestId = self.slock.aof.GetCurrentAofID()
err := self.StartSync()
if err != nil {
return err
Expand Down
Loading

0 comments on commit 988ac10

Please sign in to comment.