Skip to content

Commit

Permalink
Optimize net conn and add buffered read performance
Browse files Browse the repository at this point in the history
  • Loading branch information
snower committed Apr 19, 2024
1 parent 98df70c commit 7146549
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 26 deletions.
37 changes: 24 additions & 13 deletions client/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,12 @@ func (self *StreamReaderBuffer) Read(buf []byte) int {
if self.index >= self.len {
return 0
}

bufSize, size := self.len-self.index, len(buf)
if bufSize > size {
copy(buf, self.buf[self.index:self.index+size])
self.index += size
return size
}

copy(buf[:bufSize], self.buf[self.index:self.len])
self.index, self.len = 0, 0
return bufSize
Expand All @@ -53,7 +51,6 @@ func (self *StreamReaderBuffer) ReadBytesSize(size int) []byte {
self.index += size
return buf
}

buf := self.buf[self.index:self.len]
self.index, self.len = 0, 0
return buf
Expand All @@ -62,7 +59,9 @@ func (self *StreamReaderBuffer) ReadBytesSize(size int) []byte {
func (self *StreamReaderBuffer) ReadFromConn(conn net.Conn, size int) (int, error) {
bufSize := self.len - self.index
freeSize := self.cap - bufSize
if freeSize <= 0 {
if bufSize <= 0 {
self.index, self.len = 0, 0
} else if freeSize <= 0 {
return bufSize, nil
}
readConnSize := size - bufSize
Expand Down Expand Up @@ -111,15 +110,19 @@ func (self *Stream) ReadBytes(buf []byte) (int, error) {
}
bufLen := len(buf)
if bufLen <= self.readerBuffer.GetSize() {
n := self.readerBuffer.Read(buf)
return n, nil
index := self.readerBuffer.index + bufLen
copy(buf, self.readerBuffer.buf[self.readerBuffer.index:index])
self.readerBuffer.index = index
return bufLen, nil
} else if bufLen <= self.readerBuffer.GetCapSize() {
_, err := self.readerBuffer.ReadFromConn(self.conn, bufLen)
if err != nil {
return 0, err
}
n := self.readerBuffer.Read(buf)
return n, nil
index := self.readerBuffer.index + bufLen
copy(buf, self.readerBuffer.buf[self.readerBuffer.index:index])
self.readerBuffer.index = index
return bufLen, nil
}

n := self.readerBuffer.Read(buf)
Expand Down Expand Up @@ -148,14 +151,18 @@ func (self *Stream) ReadBytesSize(size int) ([]byte, error) {
return nil, io.EOF
}
if size <= self.readerBuffer.GetSize() {
buf := self.readerBuffer.ReadBytesSize(size)
index := self.readerBuffer.index + size
buf := self.readerBuffer.buf[self.readerBuffer.index:index]
self.readerBuffer.index = index
return buf, nil
} else if size <= self.readerBuffer.GetCapSize() {
_, err := self.readerBuffer.ReadFromConn(self.conn, size)
if err != nil {
return nil, err
}
buf := self.readerBuffer.ReadBytesSize(size)
index := self.readerBuffer.index + size
buf := self.readerBuffer.buf[self.readerBuffer.index:index]
self.readerBuffer.index = index
return buf, nil
}

Expand Down Expand Up @@ -217,7 +224,9 @@ func (self *Stream) ReadSize(size int) ([]byte, error) {
return nil, io.EOF
}
if size <= self.readerBuffer.GetSize() {
buf := self.readerBuffer.ReadBytesSize(size)
index := self.readerBuffer.index + size
buf := self.readerBuffer.buf[self.readerBuffer.index:index]
self.readerBuffer.index = index
return buf, nil
} else if size <= self.readerBuffer.GetCapSize() {
_, err := self.readerBuffer.ReadFromConn(self.conn, -1)
Expand Down Expand Up @@ -251,8 +260,10 @@ func (self *Stream) Read(buf []byte) (int, error) {
}
bufLen := len(buf)
if bufLen <= self.readerBuffer.GetSize() {
n := self.readerBuffer.Read(buf)
return n, nil
index := self.readerBuffer.index + bufLen
copy(buf, self.readerBuffer.buf[self.readerBuffer.index:index])
self.readerBuffer.index = index
return bufLen, nil
} else if bufLen <= self.readerBuffer.GetCapSize() {
_, err := self.readerBuffer.ReadFromConn(self.conn, -1)
if err != nil {
Expand Down
37 changes: 24 additions & 13 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,12 @@ func (self *StreamReaderBuffer) Read(buf []byte) int {
if self.index >= self.len {
return 0
}

bufSize, size := self.len-self.index, len(buf)
if bufSize > size {
copy(buf, self.buf[self.index:self.index+size])
self.index += size
return size
}

copy(buf[:bufSize], self.buf[self.index:self.len])
self.index, self.len = 0, 0
return bufSize
Expand All @@ -62,7 +60,6 @@ func (self *StreamReaderBuffer) ReadBytesSize(size int) []byte {
self.index += size
return buf
}

buf := self.buf[self.index:self.len]
self.index, self.len = 0, 0
return buf
Expand All @@ -71,7 +68,9 @@ func (self *StreamReaderBuffer) ReadBytesSize(size int) []byte {
func (self *StreamReaderBuffer) ReadFromConn(conn net.Conn, size int) (int, error) {
bufSize := self.len - self.index
freeSize := self.cap - bufSize
if freeSize <= 0 {
if bufSize <= 0 {
self.index, self.len = 0, 0
} else if freeSize <= 0 {
return bufSize, nil
}
readConnSize := size - bufSize
Expand Down Expand Up @@ -129,15 +128,19 @@ func (self *Stream) ReadBytes(buf []byte) (int, error) {
}
bufLen := len(buf)
if bufLen <= self.readerBuffer.GetSize() {
n := self.readerBuffer.Read(buf)
return n, nil
index := self.readerBuffer.index + bufLen
copy(buf, self.readerBuffer.buf[self.readerBuffer.index:index])
self.readerBuffer.index = index
return bufLen, nil
} else if bufLen <= self.readerBuffer.GetCapSize() {
_, err := self.readerBuffer.ReadFromConn(self.conn, bufLen)
if err != nil {
return 0, err
}
n := self.readerBuffer.Read(buf)
return n, nil
index := self.readerBuffer.index + bufLen
copy(buf, self.readerBuffer.buf[self.readerBuffer.index:index])
self.readerBuffer.index = index
return bufLen, nil
}

n := self.readerBuffer.Read(buf)
Expand Down Expand Up @@ -166,14 +169,18 @@ func (self *Stream) ReadBytesSize(size int) ([]byte, error) {
return nil, io.EOF
}
if size <= self.readerBuffer.GetSize() {
buf := self.readerBuffer.ReadBytesSize(size)
index := self.readerBuffer.index + size
buf := self.readerBuffer.buf[self.readerBuffer.index:index]
self.readerBuffer.index = index
return buf, nil
} else if size <= self.readerBuffer.GetCapSize() {
_, err := self.readerBuffer.ReadFromConn(self.conn, size)
if err != nil {
return nil, err
}
buf := self.readerBuffer.ReadBytesSize(size)
index := self.readerBuffer.index + size
buf := self.readerBuffer.buf[self.readerBuffer.index:index]
self.readerBuffer.index = index
return buf, nil
}

Expand Down Expand Up @@ -235,7 +242,9 @@ func (self *Stream) ReadSize(size int) ([]byte, error) {
return nil, io.EOF
}
if size <= self.readerBuffer.GetSize() {
buf := self.readerBuffer.ReadBytesSize(size)
index := self.readerBuffer.index + size
buf := self.readerBuffer.buf[self.readerBuffer.index:index]
self.readerBuffer.index = index
return buf, nil
} else if size <= self.readerBuffer.GetCapSize() {
_, err := self.readerBuffer.ReadFromConn(self.conn, -1)
Expand Down Expand Up @@ -269,8 +278,10 @@ func (self *Stream) Read(buf []byte) (int, error) {
}
bufLen := len(buf)
if bufLen <= self.readerBuffer.GetSize() {
n := self.readerBuffer.Read(buf)
return n, nil
index := self.readerBuffer.index + bufLen
copy(buf, self.readerBuffer.buf[self.readerBuffer.index:index])
self.readerBuffer.index = index
return bufLen, nil
} else if bufLen <= self.readerBuffer.GetCapSize() {
_, err := self.readerBuffer.ReadFromConn(self.conn, -1)
if err != nil {
Expand Down

0 comments on commit 7146549

Please sign in to comment.