Skip to content

Commit

Permalink
change pool to store slice pointer
Browse files Browse the repository at this point in the history
  • Loading branch information
Tianyi Wang committed Nov 1, 2024
1 parent 734b574 commit ff114b7
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 11 deletions.
4 changes: 2 additions & 2 deletions feature/s3/transfermanager/api_op_PutObject.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,12 +753,12 @@ func (u *uploader) nextReader(ctx context.Context) (io.Reader, int, func(), erro
return nil, 0, func() {}, err
}

n, err := readFillBuf(u.in.Body, part)
n, err := readFillBuf(u.in.Body, *part)

cleanup := func() {
u.partPool.Put(part)
}
return bytes.NewReader(part[0:n]), n, cleanup, err
return bytes.NewReader((*part)[0:n]), n, cleanup, err
}

func readFillBuf(r io.Reader, b []byte) (offset int, err error) {
Expand Down
15 changes: 8 additions & 7 deletions feature/s3/transfermanager/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,22 @@ import (
)

type bytesBufferPool interface {
Get(context.Context) ([]byte, error)
Put([]byte)
Get(context.Context) (*[]byte, error)
Put(*[]byte)
Close()
}

type defaultSlicePool struct {
slices chan []byte
slices chan *[]byte
}

func newDefaultSlicePool(sliceSize int64, capacity int) *defaultSlicePool {
p := &defaultSlicePool{}

slices := make(chan []byte, capacity)
slices := make(chan *[]byte, capacity)
for i := 0; i < capacity; i++ {
slices <- make([]byte, sliceSize)
s := make([]byte, sliceSize)
slices <- &s
}

p.slices = slices
Expand All @@ -29,7 +30,7 @@ func newDefaultSlicePool(sliceSize int64, capacity int) *defaultSlicePool {

var errZeroCapacity = fmt.Errorf("get called on zero capacity pool")

func (p *defaultSlicePool) Get(ctx context.Context) ([]byte, error) {
func (p *defaultSlicePool) Get(ctx context.Context) (*[]byte, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
Expand All @@ -50,7 +51,7 @@ func (p *defaultSlicePool) Get(ctx context.Context) ([]byte, error) {
}
}

func (p *defaultSlicePool) Put(bs []byte) {
func (p *defaultSlicePool) Put(bs *[]byte) {
p.slices <- bs
}

Expand Down
5 changes: 3 additions & 2 deletions feature/s3/transfermanager/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@ import (
func TestDefaultSlicePool(t *testing.T) {
pool := newDefaultSlicePool(1, 2)

var bs []byte
var bs *[]byte
var err error
var wg sync.WaitGroup

for i := 0; i < 200; i++ {
wg.Add(1)
go func() {
defer wg.Done()
pool.Put(bs)
bs := make([]byte, 1)
pool.Put(&bs)
}()
}
// wait for a slice to be put back
Expand Down

0 comments on commit ff114b7

Please sign in to comment.