Skip to content

Commit

Permalink
Optimize aof file and add lock data read and write buffer synchroniza…
Browse files Browse the repository at this point in the history
…tion to ensure that lock and data are flushed to disk at the same time
  • Loading branch information
snower committed Apr 27, 2024
1 parent a549d59 commit 7ce0212
Showing 1 changed file with 66 additions and 27 deletions.
93 changes: 66 additions & 27 deletions server/aof.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,23 +158,25 @@ type AofFile struct {
dataFile *os.File
mode int
bufSize int
buf []byte
rbuf *bufio.Reader
wbuf []byte
dlbuf []byte
drbuf *bufio.Reader
dwbuf []byte
windex int
dwindex int
size int
dataSize int
ackRequests [][]byte
dirtied bool
ackIndex int
}

func NewAofFile(aof *Aof, filename string, mode int, buf_size int) *AofFile {
buf_size = buf_size - buf_size%64
ackRequests := make([][]byte, buf_size/64)
return &AofFile{aof.slock, aof, filename, nil, nil, mode, buf_size,
make([]byte, 64), nil, nil, make([]byte, 4), 0, 0,
func NewAofFile(aof *Aof, filename string, mode int, bufSize int) *AofFile {
bufSize = bufSize - bufSize%64
ackRequests := make([][]byte, bufSize/64)
return &AofFile{aof.slock, aof, filename, nil, nil, mode, bufSize,
nil, nil, make([]byte, 4), nil, nil, 0, 0, 0,
0, ackRequests, false, 0}
}

Expand Down Expand Up @@ -227,25 +229,23 @@ func (self *AofFile) Open() error {
}

func (self *AofFile) ReadHeader() error {
n, err := self.rbuf.Read(self.buf[:12])
buf := make([]byte, 12)
n, err := self.rbuf.Read(buf)
if err != nil {
return err
}

if n != 12 {
return errors.New("File is not AOF FIle")
}

if string(self.buf[:8]) != "SLOCKAOF" {
if string(buf[:8]) != "SLOCKAOF" {
return errors.New("File is not AOF File")
}

version := uint16(self.buf[8]) | uint16(self.buf[9])<<8
version := uint16(buf[8]) | uint16(buf[9])<<8
if version != 0x0001 {
return errors.New("AOF File Unknown Version")
}

headerLen := uint16(self.buf[10]) | uint16(self.buf[11])<<8
headerLen := uint16(buf[10]) | uint16(buf[11])<<8
if headerLen > 0 {
n, err = self.rbuf.Read(make([]byte, headerLen))
if err != nil {
Expand All @@ -260,13 +260,13 @@ func (self *AofFile) ReadHeader() error {
}

func (self *AofFile) WriteHeader() error {
self.buf[0], self.buf[1], self.buf[2], self.buf[3], self.buf[4], self.buf[5], self.buf[6], self.buf[7] = 'S', 'L', 'O', 'C', 'K', 'A', 'O', 'F'
self.buf[8], self.buf[9], self.buf[10], self.buf[11] = 0x01, 0x00, 0x00, 0x00
n, err := self.file.Write(self.buf[:12])
buf := make([]byte, 12)
buf[0], buf[1], buf[2], buf[3], buf[4], buf[5], buf[6], buf[7] = 'S', 'L', 'O', 'C', 'K', 'A', 'O', 'F'
buf[8], buf[9], buf[10], buf[11] = 0x01, 0x00, 0x00, 0x00
n, err := self.file.Write(buf)
if n != 12 || err != nil {
return errors.New("write header error")
}

self.size += 12
return nil
}
Expand Down Expand Up @@ -325,32 +325,38 @@ func (self *AofFile) ReadTail(lock *AofLock) error {
n, err := self.rbuf.Read(buf)
if err != nil {
_, _ = self.file.Seek(12, io.SeekStart)
self.rbuf.Reset(self.file)
return err
}

lockLen := uint16(buf[0]) | uint16(buf[1])<<8
if n != int(lockLen)+2 {
_, _ = self.file.Seek(12, io.SeekStart)
self.rbuf.Reset(self.file)
return errors.New("Lock Len error")
}
_, _ = self.file.Seek(12, io.SeekStart)
self.rbuf.Reset(self.file)
return nil
}

func (self *AofFile) ReadLockData(lock *AofLock) error {
if self.dataFile == nil {
return errors.New("data file error")
}
if self.drbuf == nil {
self.drbuf = bufio.NewReaderSize(self.dataFile, self.bufSize*64)
}
buf := self.dlbuf
if len(buf) != 4 {
return errors.New("buf error")
}
n, err := self.dataFile.Read(buf)
n, err := self.drbuf.Read(buf)
if err != nil {
return err
}
for n < 4 {
nn, nerr := self.dataFile.Read(buf[n:])
nn, nerr := self.drbuf.Read(buf[n:])
if nerr != nil {
return nerr
}
Expand All @@ -364,12 +370,12 @@ func (self *AofFile) ReadLockData(lock *AofLock) error {
return nil
}

n, err = self.dataFile.Read(aofLockData[4:])
n, err = self.drbuf.Read(aofLockData[4:])
if err != nil {
return err
}
for n < dataLen {
nn, nerr := self.dataFile.Read(aofLockData[n+4:])
nn, nerr := self.drbuf.Read(aofLockData[n+4:])
if nerr != nil {
return nerr
}
Expand Down Expand Up @@ -433,7 +439,24 @@ func (self *AofFile) WriteLockData(lock *AofLock) error {
if self.dataFile == nil {
return errors.New("data file error")
}
dataLen := len(lock.data)
if self.dwbuf == nil {
self.dwbuf = make([]byte, self.bufSize*64)
}
dataLen, bufLen := len(lock.data), len(self.dwbuf)
if self.windex > 0 && dataLen <= bufLen-self.dwindex {
copy(self.dwbuf[self.dwindex:], lock.data)
self.dwindex += dataLen
self.dataSize += dataLen
return nil
}

err := self.Flush()
if err != nil {
return err
}
if self.windex > 0 || self.dwindex > 0 {
return errors.New("write lock data error")
}
n, err := self.dataFile.Write(lock.data)
if err != nil {
return err
Expand All @@ -445,19 +468,17 @@ func (self *AofFile) WriteLockData(lock *AofLock) error {
}
n += nn
}
self.dataSize += n
self.dataSize += dataLen
return nil
}

func (self *AofFile) Flush() error {
if self.windex == 0 && self.ackIndex == 0 {
return nil
}
if self.file != nil {
if self.file != nil && self.windex > 0 {
for tn := 0; tn < self.windex; {
n, err := self.file.Write(self.wbuf[tn:self.windex])
if err != nil {
self.windex = 0
self.dwindex = 0
for i := 0; i < self.ackIndex; i++ {
_ = self.aof.lockAcked(self.ackRequests[i], false)
}
Expand All @@ -470,6 +491,22 @@ func (self *AofFile) Flush() error {
self.dirtied = true
}

if self.dataFile != nil && self.dwindex > 0 {
for tn := 0; tn < self.dwindex; {
n, err := self.dataFile.Write(self.dwbuf[tn:self.dwindex])
if err != nil {
self.dwindex = 0
for i := 0; i < self.ackIndex; i++ {
_ = self.aof.lockAcked(self.ackRequests[i], false)
}
self.ackIndex = 0
return err
}
tn += n
}
self.dwindex = 0
}

for i := 0; i < self.ackIndex; i++ {
_ = self.aof.lockAcked(self.ackRequests[i], true)
}
Expand Down Expand Up @@ -524,6 +561,8 @@ func (self *AofFile) Close() error {
self.dataFile = nil
self.wbuf = nil
self.rbuf = nil
self.dwbuf = nil
self.drbuf = nil
return err
}

Expand Down

0 comments on commit 7ce0212

Please sign in to comment.