Skip to content

Commit

Permalink
Migrate multipart methods to raw SQL (#1262)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisSchinnerl authored Jun 5, 2024
1 parent d410e6c commit dace166
Show file tree
Hide file tree
Showing 6 changed files with 303 additions and 214 deletions.
13 changes: 0 additions & 13 deletions stores/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -1766,19 +1766,6 @@ func (s *SQLStore) UnhealthySlabs(ctx context.Context, healthCutoff float64, set
return slabs, nil
}

func (s *SQLStore) createMultipartMetadata(tx *gorm.DB, multipartUploadID uint, metadata api.ObjectUserMetadata) error {
entities := make([]*dbObjectUserMetadata, 0, len(metadata))
for k, v := range metadata {
metadata := &dbObjectUserMetadata{
DBMultipartUploadID: &multipartUploadID,
Key: k,
Value: v,
}
entities = append(entities, metadata)
}
return tx.CreateInBatches(&entities, 1000).Error
}

// object retrieves an object from the store.
func (s *SQLStore) object(tx *gorm.DB, bucket, path string) (api.Object, error) {
// fetch raw object data
Expand Down
209 changes: 21 additions & 188 deletions stores/multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,12 @@ package stores

import (
"context"
"encoding/hex"
"errors"
"fmt"
"math"
"sort"
"unicode/utf8"

"go.sia.tech/renterd/api"
"go.sia.tech/renterd/object"
sql "go.sia.tech/renterd/stores/sql"
"gorm.io/gorm"
"lukechampine.com/frand"
)

type (
Expand Down Expand Up @@ -49,44 +43,14 @@ func (dbMultipartPart) TableName() string {
}

func (s *SQLStore) CreateMultipartUpload(ctx context.Context, bucket, path string, ec object.EncryptionKey, mimeType string, metadata api.ObjectUserMetadata) (api.MultipartCreateResponse, error) {
// Marshal key
key, err := ec.MarshalBinary()
var uploadID string
err := s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) (err error) {
uploadID, err = tx.InsertMultipartUpload(ctx, bucket, path, ec, mimeType, metadata)
return
})
if err != nil {
return api.MultipartCreateResponse{}, err
}
var uploadID string
err = s.retryTransaction(ctx, func(tx *gorm.DB) error {
// Get bucket id.
var bucketID uint
err := tx.Table("(SELECT id from buckets WHERE buckets.name = ?) bucket_id", bucket).
Take(&bucketID).Error
if errors.Is(err, gorm.ErrRecordNotFound) {
return fmt.Errorf("bucket %v not found: %w", bucket, api.ErrBucketNotFound)
} else if err != nil {
return fmt.Errorf("failed to fetch bucket id: %w", err)
}

// Create multipart upload
uploadIDEntropy := frand.Entropy256()
uploadID = hex.EncodeToString(uploadIDEntropy[:])
multipartUpload := dbMultipartUpload{
DBBucketID: bucketID,
Key: key,
UploadID: uploadID,
ObjectID: path,
MimeType: mimeType,
}
if err := tx.Create(&multipartUpload).Error; err != nil {
return fmt.Errorf("failed to create multipart upload: %w", err)
}

// Create multipart metadata
if err := s.createMultipartMetadata(tx, multipartUpload.ID, metadata); err != nil {
return fmt.Errorf("failed to create multipart metadata: %w", err)
}

return nil
})
return api.MultipartCreateResponse{
UploadID: uploadID,
}, err
Expand All @@ -99,155 +63,38 @@ func (s *SQLStore) AddMultipartPart(ctx context.Context, bucket, path, contractS
}

func (s *SQLStore) MultipartUpload(ctx context.Context, uploadID string) (resp api.MultipartUpload, err error) {
err = s.retryTransaction(ctx, func(tx *gorm.DB) error {
var dbUpload dbMultipartUpload
err := tx.
Model(&dbMultipartUpload{}).
Joins("DBBucket").
Where("upload_id", uploadID).
Take(&dbUpload).
Error
if errors.Is(err, gorm.ErrRecordNotFound) {
return api.ErrMultipartUploadNotFound
} else if err != nil {
return err
}
resp, err = dbUpload.convert()
return err
err = s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) (err error) {
resp, err = tx.MultipartUpload(ctx, uploadID)
return
})
return
}

func (s *SQLStore) MultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker string, limit int) (resp api.MultipartListUploadsResponse, err error) {
limitUsed := limit > 0
if !limitUsed {
limit = math.MaxInt64
} else {
limit++
}

// both markers must be used together
if (keyMarker == "" && uploadIDMarker != "") || (keyMarker != "" && uploadIDMarker == "") {
return api.MultipartListUploadsResponse{}, errors.New("both keyMarker and uploadIDMarker must be set or neither")
}
markerExpr := exprTRUE
if keyMarker != "" {
markerExpr = gorm.Expr("object_id > ? OR (object_id = ? AND upload_id > ?)", keyMarker, keyMarker, uploadIDMarker)
}

prefixExpr := exprTRUE
if prefix != "" {
prefixExpr = gorm.Expr("SUBSTR(object_id, 1, ?) = ?", utf8.RuneCountInString(prefix), prefix)
}

err = s.retryTransaction(ctx, func(tx *gorm.DB) error {
var dbUploads []dbMultipartUpload
err := tx.
Model(&dbMultipartUpload{}).
Joins("DBBucket").
Where("DBBucket.name", bucket).
Where("?", markerExpr).
Where("?", prefixExpr).
Order("object_id ASC, upload_id ASC").
Limit(limit).
Find(&dbUploads).
Error
if err != nil {
return err
}
// Check if there are more uploads beyond 'limit'.
if limitUsed && len(dbUploads) == int(limit) {
resp.HasMore = true
dbUploads = dbUploads[:len(dbUploads)-1]
resp.NextPathMarker = dbUploads[len(dbUploads)-1].ObjectID
resp.NextUploadIDMarker = dbUploads[len(dbUploads)-1].UploadID
}
for _, upload := range dbUploads {
u, err := upload.convert()
if err != nil {
return err
}
resp.Uploads = append(resp.Uploads, u)
}
return nil
err = s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) (err error) {
resp, err = tx.MultipartUploads(ctx, bucket, prefix, keyMarker, uploadIDMarker, limit)
return
})
return
}

func (s *SQLStore) MultipartUploadParts(ctx context.Context, bucket, object string, uploadID string, marker int, limit int64) (resp api.MultipartListPartsResponse, _ error) {
limitUsed := limit > 0
if !limitUsed {
limit = math.MaxInt64
} else {
limit++
}

err := s.retryTransaction(ctx, func(tx *gorm.DB) error {
var dbParts []dbMultipartPart
err := tx.
Model(&dbMultipartPart{}).
Joins("INNER JOIN multipart_uploads mus ON mus.id = multipart_parts.db_multipart_upload_id").
Joins("INNER JOIN buckets b ON b.name = ? AND b.id = mus.db_bucket_id", bucket).
Where("mus.object_id = ? AND mus.upload_id = ? AND part_number > ?", object, uploadID, marker).
Order("part_number ASC").
Limit(int(limit)).
Find(&dbParts).
Error
if err != nil {
return err
}
// Check if there are more parts beyond 'limit'.
if limitUsed && len(dbParts) == int(limit) {
resp.HasMore = true
dbParts = dbParts[:len(dbParts)-1]
resp.NextMarker = dbParts[len(dbParts)-1].PartNumber
}
for _, part := range dbParts {
resp.Parts = append(resp.Parts, api.MultipartListPartItem{
PartNumber: part.PartNumber,
LastModified: api.TimeRFC3339(part.CreatedAt.UTC()),
ETag: part.Etag,
Size: int64(part.Size),
})
}
return nil
err := s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) (err error) {
resp, err = tx.MultipartUploadParts(ctx, bucket, object, uploadID, marker, limit)
return
})
return resp, err
}

func (s *SQLStore) AbortMultipartUpload(ctx context.Context, bucket, path string, uploadID string) error {
return s.retryTransaction(ctx, func(tx *gorm.DB) error {
// delete multipart upload optimistically
res := tx.
Where("upload_id", uploadID).
Where("object_id", path).
Where("db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?)", bucket).
Delete(&dbMultipartUpload{})
if res.Error != nil {
return fmt.Errorf("failed to fetch multipart upload: %w", res.Error)
}
// if the upload wasn't found, find out why
if res.RowsAffected == 0 {
var mu dbMultipartUpload
err := tx.Where("upload_id = ?", uploadID).
Joins("DBBucket").
Take(&mu).
Error
if errors.Is(err, gorm.ErrRecordNotFound) {
return api.ErrMultipartUploadNotFound
} else if err != nil {
return fmt.Errorf("failed to fetch multipart upload: %w", err)
} else if mu.ObjectID != path {
return fmt.Errorf("object id mismatch: %v != %v: %w", mu.ObjectID, path, api.ErrObjectNotFound)
} else if mu.DBBucket.Name != bucket {
return fmt.Errorf("bucket name mismatch: %v != %v: %w", mu.DBBucket.Name, bucket, api.ErrBucketNotFound)
}
return errors.New("failed to delete multipart upload for unknown reason")
}
// Prune the dangling slabs.
s.triggerSlabPruning()
return nil
err := s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error {
return tx.AbortMultipartUpload(ctx, bucket, path, uploadID)
})
if err != nil {
return err
}
s.triggerSlabPruning()
return nil
}

func (s *SQLStore) CompleteMultipartUpload(ctx context.Context, bucket, path string, uploadID string, parts []api.MultipartCompletedPart, opts api.CompleteMultipartOptions) (_ api.MultipartCompleteResponse, err error) {
Expand Down Expand Up @@ -288,17 +135,3 @@ func (s *SQLStore) CompleteMultipartUpload(ctx context.Context, bucket, path str
ETag: eTag,
}, nil
}

func (u dbMultipartUpload) convert() (api.MultipartUpload, error) {
var key object.EncryptionKey
if err := key.UnmarshalBinary(u.Key); err != nil {
return api.MultipartUpload{}, fmt.Errorf("failed to unmarshal key: %w", err)
}
return api.MultipartUpload{
Bucket: u.DBBucket.Name,
Key: key,
Path: u.ObjectID,
UploadID: u.UploadID,
CreatedAt: api.TimeRFC3339(u.CreatedAt.UTC()),
}, nil
}
19 changes: 19 additions & 0 deletions stores/sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ type (
}

DatabaseTx interface {
// AbortMultipartUpload aborts a multipart upload and deletes it from
// the database.
AbortMultipartUpload(ctx context.Context, bucket, path string, uploadID string) error

// AddMultipartPart adds a part to an unfinished multipart upload.
AddMultipartPart(ctx context.Context, bucket, path, contractSet, eTag, uploadID string, partNumber int, slices object.SlabSlices) error

Expand All @@ -52,6 +56,10 @@ type (
// the bucket already exists, api.ErrBucketExists is returned.
CreateBucket(ctx context.Context, bucket string, policy api.BucketPolicy) error

// InsertMultipartUpload creates a new multipart upload and returns a
// unique upload ID.
InsertMultipartUpload(ctx context.Context, bucket, path string, ec object.EncryptionKey, mimeType string, metadata api.ObjectUserMetadata) (string, error)

// DeleteBucket deletes a bucket. If the bucket isn't empty, it returns
// api.ErrBucketNotEmpty. If the bucket doesn't exist, it returns
// api.ErrBucketNotFound.
Expand All @@ -74,6 +82,17 @@ type (
// MakeDirsForPath creates all directories for a given object's path.
MakeDirsForPath(ctx context.Context, path string) (int64, error)

// MultipartUpload returns the multipart upload with the given ID or
// api.ErrMultipartUploadNotFound if the upload doesn't exist.
MultipartUpload(ctx context.Context, uploadID string) (api.MultipartUpload, error)

// MultipartUploadParts returns a list of all parts for a given
// multipart upload
MultipartUploadParts(ctx context.Context, bucket, key, uploadID string, marker int, limit int64) (api.MultipartListPartsResponse, error)

// MultipartUploads returns a list of all multipart uploads.
MultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker string, limit int) (api.MultipartListUploadsResponse, error)

// PruneEmptydirs prunes any directories that are empty.
PruneEmptydirs(ctx context.Context) error

Expand Down
Loading

0 comments on commit dace166

Please sign in to comment.