diff --git a/server/aof.go b/server/aof.go index 4631963..9b34f73 100644 --- a/server/aof.go +++ b/server/aof.go @@ -158,11 +158,13 @@ type AofFile struct { dataFile *os.File mode int bufSize int - buf []byte rbuf *bufio.Reader wbuf []byte dlbuf []byte + drbuf *bufio.Reader + dwbuf []byte windex int + dwindex int size int dataSize int ackRequests [][]byte @@ -170,11 +172,11 @@ type AofFile struct { ackIndex int } -func NewAofFile(aof *Aof, filename string, mode int, buf_size int) *AofFile { - buf_size = buf_size - buf_size%64 - ackRequests := make([][]byte, buf_size/64) - return &AofFile{aof.slock, aof, filename, nil, nil, mode, buf_size, - make([]byte, 64), nil, nil, make([]byte, 4), 0, 0, +func NewAofFile(aof *Aof, filename string, mode int, bufSize int) *AofFile { + bufSize = bufSize - bufSize%64 + ackRequests := make([][]byte, bufSize/64) + return &AofFile{aof.slock, aof, filename, nil, nil, mode, bufSize, + nil, nil, make([]byte, 4), nil, nil, 0, 0, 0, 0, ackRequests, false, 0} } @@ -227,25 +229,23 @@ func (self *AofFile) Open() error { } func (self *AofFile) ReadHeader() error { - n, err := self.rbuf.Read(self.buf[:12]) + buf := make([]byte, 12) + n, err := self.rbuf.Read(buf) if err != nil { return err } - if n != 12 { return errors.New("File is not AOF FIle") } - - if string(self.buf[:8]) != "SLOCKAOF" { + if string(buf[:8]) != "SLOCKAOF" { return errors.New("File is not AOF File") } - version := uint16(self.buf[8]) | uint16(self.buf[9])<<8 + version := uint16(buf[8]) | uint16(buf[9])<<8 if version != 0x0001 { return errors.New("AOF File Unknown Version") } - - headerLen := uint16(self.buf[10]) | uint16(self.buf[11])<<8 + headerLen := uint16(buf[10]) | uint16(buf[11])<<8 if headerLen > 0 { n, err = self.rbuf.Read(make([]byte, headerLen)) if err != nil { @@ -260,13 +260,13 @@ func (self *AofFile) ReadHeader() error { } func (self *AofFile) WriteHeader() error { - self.buf[0], self.buf[1], self.buf[2], self.buf[3], self.buf[4], self.buf[5], self.buf[6], self.buf[7] = 'S', 'L', 'O', 'C', 'K', 'A', 'O', 'F' - self.buf[8], self.buf[9], self.buf[10], self.buf[11] = 0x01, 0x00, 0x00, 0x00 - n, err := self.file.Write(self.buf[:12]) + buf := make([]byte, 12) + buf[0], buf[1], buf[2], buf[3], buf[4], buf[5], buf[6], buf[7] = 'S', 'L', 'O', 'C', 'K', 'A', 'O', 'F' + buf[8], buf[9], buf[10], buf[11] = 0x01, 0x00, 0x00, 0x00 + n, err := self.file.Write(buf) if n != 12 || err != nil { return errors.New("write header error") } - self.size += 12 return nil } @@ -325,15 +325,18 @@ func (self *AofFile) ReadTail(lock *AofLock) error { n, err := self.rbuf.Read(buf) if err != nil { _, _ = self.file.Seek(12, io.SeekStart) + self.rbuf.Reset(self.file) return err } lockLen := uint16(buf[0]) | uint16(buf[1])<<8 if n != int(lockLen)+2 { _, _ = self.file.Seek(12, io.SeekStart) + self.rbuf.Reset(self.file) return errors.New("Lock Len error") } _, _ = self.file.Seek(12, io.SeekStart) + self.rbuf.Reset(self.file) return nil } @@ -341,16 +344,19 @@ func (self *AofFile) ReadLockData(lock *AofLock) error { if self.dataFile == nil { return errors.New("data file error") } + if self.drbuf == nil { + self.drbuf = bufio.NewReaderSize(self.dataFile, self.bufSize*64) + } buf := self.dlbuf if len(buf) != 4 { return errors.New("buf error") } - n, err := self.dataFile.Read(buf) + n, err := self.drbuf.Read(buf) if err != nil { return err } for n < 4 { - nn, nerr := self.dataFile.Read(buf[n:]) + nn, nerr := self.drbuf.Read(buf[n:]) if nerr != nil { return nerr } @@ -364,12 +370,12 @@ func (self *AofFile) ReadLockData(lock *AofLock) error { return nil } - n, err = self.dataFile.Read(aofLockData[4:]) + n, err = self.drbuf.Read(aofLockData[4:]) if err != nil { return err } for n < dataLen { - nn, nerr := self.dataFile.Read(aofLockData[n+4:]) + nn, nerr := self.drbuf.Read(aofLockData[n+4:]) if nerr != nil { return nerr } @@ -433,7 +439,24 @@ func (self *AofFile) WriteLockData(lock *AofLock) error { if self.dataFile == nil { return errors.New("data file error") } - dataLen := len(lock.data) + if self.dwbuf == nil { + self.dwbuf = make([]byte, self.bufSize*64) + } + dataLen, bufLen := len(lock.data), len(self.dwbuf) + if self.windex > 0 && dataLen <= bufLen-self.dwindex { + copy(self.dwbuf[self.dwindex:], lock.data) + self.dwindex += dataLen + self.dataSize += dataLen + return nil + } + + err := self.Flush() + if err != nil { + return err + } + if self.windex > 0 || self.dwindex > 0 { + return errors.New("write lock data error") + } n, err := self.dataFile.Write(lock.data) if err != nil { return err @@ -445,19 +468,17 @@ func (self *AofFile) WriteLockData(lock *AofLock) error { } n += nn } - self.dataSize += n + self.dataSize += dataLen return nil } func (self *AofFile) Flush() error { - if self.windex == 0 && self.ackIndex == 0 { - return nil - } - if self.file != nil { + if self.file != nil && self.windex > 0 { for tn := 0; tn < self.windex; { n, err := self.file.Write(self.wbuf[tn:self.windex]) if err != nil { self.windex = 0 + self.dwindex = 0 for i := 0; i < self.ackIndex; i++ { _ = self.aof.lockAcked(self.ackRequests[i], false) } @@ -470,6 +491,22 @@ func (self *AofFile) Flush() error { self.dirtied = true } + if self.dataFile != nil && self.dwindex > 0 { + for tn := 0; tn < self.dwindex; { + n, err := self.dataFile.Write(self.dwbuf[tn:self.dwindex]) + if err != nil { + self.dwindex = 0 + for i := 0; i < self.ackIndex; i++ { + _ = self.aof.lockAcked(self.ackRequests[i], false) + } + self.ackIndex = 0 + return err + } + tn += n + } + self.dwindex = 0 + } + for i := 0; i < self.ackIndex; i++ { _ = self.aof.lockAcked(self.ackRequests[i], true) } @@ -524,6 +561,8 @@ func (self *AofFile) Close() error { self.dataFile = nil self.wbuf = nil self.rbuf = nil + self.dwbuf = nil + self.drbuf = nil return err }