Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve locking for deleting multipart uploads in parallel #1022

Merged
merged 2 commits into from
Mar 5, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 26 additions & 20 deletions stores/multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,26 +274,32 @@ func (s *SQLStore) MultipartUploadParts(ctx context.Context, bucket, object stri

func (s *SQLStore) AbortMultipartUpload(ctx context.Context, bucket, path string, uploadID string) error {
return s.retryTransaction(func(tx *gorm.DB) error {
// Find multipart upload.
var mu dbMultipartUpload
err := tx.Where("upload_id = ?", uploadID).
Preload("Parts").
Joins("DBBucket").
Take(&mu).
Error
if err != nil {
return fmt.Errorf("failed to fetch multipart upload: %w", err)
}
if mu.ObjectID != path {
// Check object id.
return fmt.Errorf("object id mismatch: %v != %v: %w", mu.ObjectID, path, api.ErrObjectNotFound)
} else if mu.DBBucket.Name != bucket {
// Check bucket name.
return fmt.Errorf("bucket name mismatch: %v != %v: %w", mu.DBBucket.Name, bucket, api.ErrBucketNotFound)
}
err = tx.Delete(&mu).Error
if err != nil {
return fmt.Errorf("failed to delete multipart upload: %w", err)
// delete multipart upload optimistically
res := tx.
Where("upload_id", uploadID).
Where("object_id", path).
Where("db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?)", bucket).
Delete(&dbMultipartUpload{})
if res.Error != nil {
return fmt.Errorf("failed to fetch multipart upload: %w", res.Error)
}
// if the upload wasn't found, find out why
if res.RowsAffected == 0 {
var mu dbMultipartUpload
err := tx.Where("upload_id = ?", uploadID).
Joins("DBBucket").
Take(&mu).
Error
if errors.Is(err, gorm.ErrRecordNotFound) {
return api.ErrMultipartUploadNotFound
} else if err != nil {
return fmt.Errorf("failed to fetch multipart upload: %w", err)
} else if mu.ObjectID != path {
return fmt.Errorf("object id mismatch: %v != %v: %w", mu.ObjectID, path, api.ErrObjectNotFound)
} else if mu.DBBucket.Name != bucket {
return fmt.Errorf("bucket name mismatch: %v != %v: %w", mu.DBBucket.Name, bucket, api.ErrBucketNotFound)
}
return errors.New("failed to delete multipart upload for unknown reason")
}
// Prune the slabs.
if err := pruneSlabs(tx); err != nil {
Expand Down
Loading