Skip to content

Commit

Permalink
Optimize aof flush execution logic
Browse files Browse the repository at this point in the history
  • Loading branch information
snower committed Apr 24, 2024
1 parent dab797e commit 21dd050
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 57 deletions.
79 changes: 37 additions & 42 deletions server/aof.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,28 +448,26 @@ func (self *AofFile) WriteLockData(lock *AofLock) error {
}

func (self *AofFile) Flush() error {
if self.windex == 0 {
if self.windex == 0 && self.ackIndex == 0 {
return nil
}
if self.file == nil {
return errors.New("File Unopen")
}

tn := 0
for tn < self.windex {
n, err := self.file.Write(self.wbuf[tn:self.windex])
if err != nil {
self.windex = 0
for i := 0; i < self.ackIndex; i++ {
_ = self.aof.lockAcked(self.ackRequests[i], false)
if self.file != nil {
tn := 0
for tn < self.windex {
n, err := self.file.Write(self.wbuf[tn:self.windex])
if err != nil {
self.windex = 0
for i := 0; i < self.ackIndex; i++ {
_ = self.aof.lockAcked(self.ackRequests[i], false)
}
self.ackIndex = 0
return err
}
self.ackIndex = 0
return err
tn += n
}
tn += n
self.windex = 0
self.dirtied = true
}
self.windex = 0
self.dirtied = true

for i := 0; i < self.ackIndex; i++ {
_ = self.aof.lockAcked(self.ackRequests[i], true)
Expand All @@ -482,16 +480,14 @@ func (self *AofFile) Sync() error {
if !self.dirtied {
return nil
}
if self.file == nil {
return errors.New("File Unopen")
}

err := self.file.Sync()
if err != nil {
return err
if self.file != nil {
err := self.file.Sync()
if err != nil {
return err
}
}
if self.dataFile != nil {
err = self.dataFile.Sync()
err := self.dataFile.Sync()
if err != nil {
return err
}
Expand Down Expand Up @@ -1433,11 +1429,9 @@ func (self *Aof) waitLockAofChannel(_ *AofChannel) {

self.aofGlock.Lock()
if self.aofFile != nil {
if self.aofFile.windex > 0 && self.aofFile.ackIndex > 0 {
err := self.aofFile.Flush()
if err != nil {
self.slock.Log().Errorf("Aof flush file error %v", err)
}
err := self.aofFile.Flush()
if err != nil {
self.slock.Log().Errorf("Aof flush file error %v", err)
}
}
if self.channelFlushWaiter != nil {
Expand All @@ -1453,9 +1447,7 @@ func (self *Aof) syncFileAofChannel(_ *AofChannel) {
}

self.aofGlock.Lock()
if self.aofFile.windex > 0 || self.aofFile.dirtied {
self.Flush()
}
self.Flush()
self.aofGlock.Unlock()
}

Expand Down Expand Up @@ -1618,15 +1610,19 @@ func (self *Aof) lockLoaded(aofChannel *AofChannel, _ *MemWaiterServerProtocol,
}

func (self *Aof) Flush() {
err := self.aofFile.Flush()
if err != nil {
self.slock.Log().Errorf("Aof flush file error %v", err)
if self.aofFile == nil {
return
}
err = self.aofFile.Sync()
if err != nil {
self.slock.Log().Errorf("Aof Sync file error %v", err)
return
if self.aofFile.windex > 0 || self.aofFile.dirtied || self.aofFile.ackIndex > 0 {
err := self.aofFile.Flush()
if err != nil {
self.slock.Log().Errorf("Aof flush file error %v", err)
return
}
err = self.aofFile.Sync()
if err != nil {
self.slock.Log().Errorf("Aof Sync file error %v", err)
}
}
}

Expand Down Expand Up @@ -1657,7 +1653,6 @@ func (self *Aof) Reset(aofFileIndex uint32) error {

if self.aofFile != nil {
self.Flush()

err := self.aofFile.Close()
if err != nil {
self.slock.Log().Errorf("Aof close file %s.%d error %v", "append.aof", self.aofFileIndex, err)
Expand Down Expand Up @@ -1705,11 +1700,11 @@ func (self *Aof) Reset(aofFileIndex uint32) error {
func (self *Aof) RewriteAofFile() error {
if self.aofFile != nil {
self.Flush()

err := self.aofFile.Close()
if err != nil {
self.slock.Log().Errorf("Aof close file %s.%d error %v", "append.aof", self.aofFileIndex, err)
}
self.aofFile = nil
}

aofFilename := fmt.Sprintf("%s.%d", "append.aof", self.aofFileIndex+1)
Expand Down
20 changes: 5 additions & 15 deletions server/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,27 +696,21 @@ func (self *ReplicationClient) getLock() error {
case aofLock := <-self.rbufChannel:
if aofLock == nil {
self.aof.aofGlock.Lock()
if self.aof.aofFile.windex > 0 || self.aof.aofFile.dirtied {
self.aof.Flush()
}
self.aof.Flush()
self.aof.aofGlock.Unlock()
return io.EOF
}

self.aofLock = aofLock
return nil
default:
if self.closed {
self.aof.aofGlock.Lock()
if self.aof.aofFile.windex > 0 || self.aof.aofFile.dirtied {
self.aof.Flush()
}
self.aof.Flush()
self.aof.aofGlock.Unlock()
return io.EOF
}

self.aof.aofGlock.Lock()
if self.aof.aofFile.windex > 0 && self.aof.aofFile.ackIndex > 0 {
if self.aof.aofFile != nil {
err := self.aof.aofFile.Flush()
if err != nil {
self.manager.slock.Log().Errorf("Replication flush file error %v", err)
Expand All @@ -728,9 +722,7 @@ func (self *ReplicationClient) getLock() error {
case aofLock := <-self.rbufChannel:
if aofLock == nil {
self.aof.aofGlock.Lock()
if self.aof.aofFile.windex > 0 || self.aof.aofFile.dirtied {
self.aof.Flush()
}
self.aof.Flush()
self.aof.aofGlock.Unlock()
return io.EOF
}
Expand All @@ -739,9 +731,7 @@ func (self *ReplicationClient) getLock() error {
return nil
case <-time.After(200 * time.Millisecond):
self.aof.aofGlock.Lock()
if self.aof.aofFile.windex > 0 || self.aof.aofFile.dirtied {
self.aof.Flush()
}
self.aof.Flush()
self.aof.aofGlock.Unlock()
aofLock := <-self.rbufChannel
if aofLock == nil {
Expand Down

0 comments on commit 21dd050

Please sign in to comment.