filestore, s3store: Handle errors when closing files
Fixes #698
Acconut committed Aug 23, 2023
1 parent a3fa09c commit f3e28ed
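
The underlying problem: a plain defer file.Close() discards the error that Close returns, so a failure to flush buffered data to disk can pass silently. A minimal sketch of this failure mode, using a hypothetical writeAll helper (not tusd code):

	package example

	import (
		"io"
		"os"
	)

	// writeAll illustrates the bug class this commit fixes: the error
	// returned by Close is silently dropped by the bare defer.
	func writeAll(path string, src io.Reader) error {
		file, err := os.Create(path)
		if err != nil {
			return err
		}
		defer file.Close() // BUG: a Close error (e.g. a failed flush) is lost here

		_, err = io.Copy(file, src)
		return err
	}

The diffs below replace this pattern with explicit Close calls whose errors are returned to the caller.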
Showing 3 changed files with 31 additions and 13 deletions.
20 changes: 16 additions & 4 deletions pkg/filestore/filestore.go
@@ -160,12 +160,17 @@ func (upload *fileUpload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) {
 	if err != nil {
 		return 0, err
 	}
-	defer file.Close()
+	// Avoid the use of defer file.Close() here to ensure no errors are lost
+	// See https://github.com/tus/tusd/issues/698.
 
 	n, err := io.Copy(file, src)
 
 	upload.info.Offset += n
-
-	return n, err
+	if err != nil {
+		file.Close()
+		return n, err
+	}
+
+	return n, file.Close()
 }
 
 func (upload *fileUpload) GetReader(ctx context.Context) (io.ReadCloser, error) {
@@ -187,7 +192,14 @@ func (upload *fileUpload) ConcatUploads(ctx context.Context, uploads []handler.Upload) (err error) {
 	if err != nil {
 		return err
 	}
-	defer file.Close()
+	defer func() {
+		// Ensure that close error is propagated, if it occurs.
+		// See https://github.com/tus/tusd/issues/698.
+		cerr := file.Close()
+		if err == nil {
+			err = cerr
+		}
+	}()
 
 	for _, partialUpload := range uploads {
 		fileUpload := partialUpload.(*fileUpload)
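
The deferred variant in ConcatUploads only works because err is a named return value, which the deferred function can overwrite. A standalone sketch of the same idiom, using a hypothetical copyToFile helper (not tusd code):

	package example

	import (
		"io"
		"os"
	)

	// copyToFile shows the close-error-propagation idiom from
	// ConcatUploads: err is a named return, so the deferred function
	// can replace a nil result with the error from Close.
	func copyToFile(path string, src io.Reader) (err error) {
		file, err := os.Create(path)
		if err != nil {
			return err
		}
		defer func() {
			cerr := file.Close()
			if err == nil {
				err = cerr
			}
		}()

		_, err = io.Copy(file, src)
		return err
	}

If the copy fails, that error takes precedence; otherwise the Close error, if any, becomes the result.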
12 changes: 8 additions & 4 deletions pkg/s3store/s3store.go
@@ -500,10 +500,9 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Reader) (int64, error) {
 			upload.parts = append(upload.parts, part)
 
 			wg.Add(1)
-			go func(file io.ReadSeeker, part *s3Part, closePart func()) {
+			go func(file io.ReadSeeker, part *s3Part, closePart func() error) {
 				defer upload.store.releaseUploadSemaphore()
 				defer wg.Done()
-				defer closePart()
 
 				t := time.Now()
 				uploadPartInput := &s3.UploadPartInput{
@@ -519,17 +518,22 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Reader) (int64, error) {
 				} else {
 					part.etag = etag
 				}
+				if cerr := closePart(); cerr != nil && uploadErr == nil {
+					uploadErr = cerr
+				}
 			}(partfile, part, closePart)
 		} else {
 			wg.Add(1)
-			go func(file io.ReadSeeker, closePart func()) {
+			go func(file io.ReadSeeker, closePart func() error) {
 				defer upload.store.releaseUploadSemaphore()
 				defer wg.Done()
-				defer closePart()
 
 				if err := store.putIncompletePartForUpload(ctx, uploadId, file); err != nil {
 					uploadErr = err
 				}
+				if cerr := closePart(); cerr != nil && uploadErr == nil {
+					uploadErr = cerr
+				}
 				upload.incompletePartSize = partsize
 			}(partfile, closePart)
 		}
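
The s3store goroutines follow a first-error-wins convention: the error from closePart is stored in uploadErr only if no upload error was recorded first. A simplified, synchronized sketch of that convention (not tusd code; the actual store coordinates its goroutines differently):

	package example

	import "sync"

	// runWorkers runs each task and then closes its resource, keeping
	// the first error of either kind. A mutex guards the shared error
	// variable in this sketch.
	func runWorkers(tasks, closers []func() error) error {
		var (
			wg       sync.WaitGroup
			mu       sync.Mutex
			firstErr error
		)
		record := func(err error) {
			mu.Lock()
			defer mu.Unlock()
			if firstErr == nil && err != nil {
				firstErr = err
			}
		}
		for i := range tasks {
			wg.Add(1)
			go func(task, closeFn func() error) {
				defer wg.Done()
				record(task())
				// The close error is reported instead of being
				// swallowed by a bare defer closeFn().
				record(closeFn())
			}(tasks[i], closers[i])
		}
		wg.Wait()
		return firstErr
	}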
12 changes: 7 additions & 5 deletions pkg/s3store/s3store_part_producer.go
@@ -24,7 +24,7 @@ type s3PartProducer struct {
 
 type fileChunk struct {
 	reader      io.ReadSeeker
-	closeReader func()
+	closeReader func() error
 	size        int64
 }

@@ -116,9 +116,11 @@ func (spp *s3PartProducer) nextPart(size int64) (fileChunk, bool, error) {
 
 	return fileChunk{
 		reader: file,
-		closeReader: func() {
-			file.Close()
-			os.Remove(file.Name())
+		closeReader: func() error {
+			if err := file.Close(); err != nil {
+				return err
+			}
+			return os.Remove(file.Name())
 		},
 		size: n,
 	}, true, nil
@@ -148,7 +150,7 @@ func (spp *s3PartProducer) nextPart(size int64) (fileChunk, bool, error) {
 	return fileChunk{
 		// buf does not get written to anymore, so we can turn it into a reader
 		reader: bytes.NewReader(buf.Bytes()),
-		closeReader: func() {},
+		closeReader: func() error { return nil },
 		size: n,
 	}, true, nil
 }
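
With closeReader returning an error, cleanup of a temporary part file now reports failures from both Close and Remove, checking Close first. A sketch of building such a chunk, using a hypothetical newTempChunk helper (not tusd code):

	package example

	import (
		"io"
		"os"
	)

	// newTempChunk mirrors the fileChunk cleanup above: Close is
	// checked first (it can surface a failed flush), and a failing
	// Remove is reported rather than ignored.
	func newTempChunk(data []byte) (io.ReadSeeker, func() error, error) {
		file, err := os.CreateTemp("", "tusd-chunk-*")
		if err != nil {
			return nil, nil, err
		}
		cleanup := func() error {
			if err := file.Close(); err != nil {
				return err
			}
			return os.Remove(file.Name())
		}
		if _, err := file.Write(data); err != nil {
			cleanup() // best-effort cleanup; the write error takes precedence
			return nil, nil, err
		}
		if _, err := file.Seek(0, io.SeekStart); err != nil {
			cleanup() // best-effort cleanup; the seek error takes precedence
			return nil, nil, err
		}
		return file, cleanup, nil
	}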
