Various rclone related fixes/improvements (#1056)
- Refactor the database code to reduce deadlocks and add a test for triggering deadlocks
- Turn the ETag into an MD5 hash and set it on objects in `backend.go` (see the sketch below)
- Fix ListObjects not returning the ETag or modtime
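
With this change a regular PutObject returns the hex-encoded MD5 of the uploaded bytes, which S3 clients such as rclone can use for integrity checks. A minimal sketch of verifying that property from the client side, mirroring the check added in `s3_test.go` below; the bucket and object names are placeholders:

```go
package example

import (
	"bytes"
	"context"
	"crypto/md5"
	"encoding/hex"
	"fmt"

	"github.com/minio/minio-go/v7"
)

// verifyETag uploads data and checks that the returned ETag equals the
// hex-encoded MD5 of the payload, which is the property this commit
// establishes for regular (non-multipart) uploads.
func verifyETag(ctx context.Context, client *minio.Client, bucket, path string, data []byte) error {
	sum := md5.Sum(data)
	info, err := client.PutObject(ctx, bucket, path, bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{})
	if err != nil {
		return err
	}
	if info.ETag != hex.EncodeToString(sum[:]) {
		return fmt.Errorf("expected ETag %v, got %v", hex.EncodeToString(sum[:]), info.ETag)
	}
	return nil
}
```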
ChrisSchinnerl authored Mar 14, 2024
2 parents 0567309 + 1ee2f04 commit 7477f87
Showing 13 changed files with 249 additions and 58 deletions.
10 changes: 9 additions & 1 deletion api/object.go
@@ -92,6 +92,7 @@ type (
// HeadObjectResponse is the response type for the HEAD /worker/object endpoint.
HeadObjectResponse struct {
ContentType string `json:"contentType"`
Etag string `json:"eTag"`
LastModified string `json:"lastModified"`
Range *DownloadRange `json:"range,omitempty"`
Size int64 `json:"size"`
@@ -212,7 +213,8 @@ type (
}

HeadObjectOptions struct {
Range DownloadRange
IgnoreDelim bool
Range DownloadRange
}

DownloadObjectOptions struct {
@@ -310,6 +312,12 @@ func (opts DeleteObjectOptions) Apply(values url.Values) {
}
}

func (opts HeadObjectOptions) Apply(values url.Values) {
if opts.IgnoreDelim {
values.Set("ignoreDelim", "true")
}
}

func (opts HeadObjectOptions) ApplyHeaders(h http.Header) {
if opts.Range != (DownloadRange{}) {
if opts.Range.Length == -1 {
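With both helpers in place, the new `IgnoreDelim` flag travels as a query parameter while the range still goes into the request headers. A rough sketch of how a caller might wire them into a HEAD request; the endpoint path, bucket parameter, and request plumbing are assumptions, only `Apply` and `ApplyHeaders` come from this file:

```go
package example

import (
	"net/http"
	"net/url"

	"go.sia.tech/renterd/api"
)

// newHeadObjectRequest builds a HEAD request for an object. The URL layout is
// hypothetical; the point is that query options and header options are applied
// through the two helpers above.
func newHeadObjectRequest(baseURL, bucket, path string, opts api.HeadObjectOptions) (*http.Request, error) {
	values := url.Values{}
	values.Set("bucket", bucket) // placeholder parameter name
	opts.Apply(values)           // adds ignoreDelim=true when the flag is set

	req, err := http.NewRequest(http.MethodHead, baseURL+"/objects/"+path+"?"+values.Encode(), nil)
	if err != nil {
		return nil, err
	}
	opts.ApplyHeaders(req.Header) // adds the Range header when a range is requested
	return req, nil
}
```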
3 changes: 3 additions & 0 deletions internal/test/e2e/metadata_test.go
@@ -55,6 +55,8 @@ func TestObjectMetadata(t *testing.T) {
}
if !reflect.DeepEqual(gor.Metadata, opts.Metadata) {
t.Fatal("metadata mismatch", gor.Metadata)
} else if gor.Etag == "" {
t.Fatal("missing etag")
}

// perform a HEAD request and assert the headers are all present
@@ -63,6 +65,7 @@ func TestObjectMetadata(t *testing.T) {
t.Fatal(err)
} else if !reflect.DeepEqual(hor, &api.HeadObjectResponse{
ContentType: or.Object.ContentType(),
Etag: gor.Etag,
LastModified: or.Object.LastModified(),
Range: &api.DownloadRange{Offset: 1, Length: 1, Size: int64(len(data))},
Size: int64(len(data)),
37 changes: 36 additions & 1 deletion internal/test/e2e/s3_test.go
@@ -3,6 +3,8 @@ package e2e
import (
"bytes"
"context"
"crypto/md5"
"encoding/hex"
"errors"
"fmt"
"io"
@@ -72,8 +74,12 @@ func TestS3Basic(t *testing.T) {

// add object to the bucket
data := frand.Bytes(10)
etag := md5.Sum(data)
uploadInfo, err := s3.PutObject(context.Background(), bucket, objPath, bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{})
tt.OK(err)
if uploadInfo.ETag != hex.EncodeToString(etag[:]) {
t.Fatalf("expected ETag %v, got %v", hex.EncodeToString(etag[:]), uploadInfo.ETag)
}
busObject, err := cluster.Bus.Object(context.Background(), bucket, objPath, api.GetObjectOptions{})
tt.OK(err)
if busObject.Object == nil {
@@ -92,13 +98,19 @@
t.Fatal(err)
} else if !bytes.Equal(b, data) {
t.Fatal("data mismatch")
} else if info, err := obj.Stat(); err != nil {
t.Fatal(err)
} else if info.ETag != uploadInfo.ETag {
t.Fatal("unexpected ETag:", info.ETag, uploadInfo.ETag)
}

// stat object
info, err := s3.StatObject(context.Background(), bucket, objPath, minio.StatObjectOptions{})
tt.OK(err)
if info.Size != int64(len(data)) {
t.Fatal("size mismatch")
} else if info.ETag != uploadInfo.ETag {
t.Fatal("unexpected ETag:", info.ETag)
}

// add another bucket
@@ -487,6 +499,13 @@ func TestS3List(t *testing.T) {
if !cmp.Equal(test.want, got) {
t.Errorf("test %d: unexpected response, want %v got %v", i, test.want, got)
}
for _, obj := range result.Contents {
if obj.ETag == "" {
t.Fatal("expected non-empty ETag")
} else if obj.LastModified.IsZero() {
t.Fatal("expected non-zero LastModified")
}
}
}
}

@@ -580,12 +599,28 @@ func TestS3MultipartUploads(t *testing.T) {
}

// Download object
expectedData := []byte("helloworld!")
downloadedObj, err := s3.GetObject(context.Background(), "multipart", "foo", minio.GetObjectOptions{})
tt.OK(err)
if data, err := io.ReadAll(downloadedObj); err != nil {
t.Fatal(err)
} else if !bytes.Equal(data, []byte("helloworld!")) {
} else if !bytes.Equal(data, expectedData) {
t.Fatal("unexpected data:", string(data))
} else if info, err := downloadedObj.Stat(); err != nil {
t.Fatal(err)
} else if info.ETag != ui.ETag {
t.Fatal("unexpected ETag:", info.ETag)
} else if info.Size != int64(len(expectedData)) {
t.Fatal("unexpected size:", info.Size)
}

// Stat object
if info, err := s3.StatObject(context.Background(), "multipart", "foo", minio.StatObjectOptions{}); err != nil {
t.Fatal(err)
} else if info.ETag != ui.ETag {
t.Fatal("unexpected ETag:", info.ETag)
} else if info.Size != int64(len(expectedData)) {
t.Fatal("unexpected size:", info.Size)
}

// Download again with range request.
17 changes: 0 additions & 17 deletions object/object.go
@@ -3,7 +3,6 @@ package object
import (
"bytes"
"crypto/cipher"
"crypto/md5"
"encoding/binary"
"encoding/hex"
"fmt"
@@ -146,22 +145,6 @@ func (o Object) Contracts() map[types.PublicKey]map[types.FileContractID]struct{
return usedContracts
}

func (o *Object) ComputeETag() string {
// calculate the eTag using the precomputed sector roots to avoid having to
// hash the entire object again.
h := md5.New()
b := make([]byte, 8)
for _, slab := range o.Slabs {
binary.LittleEndian.PutUint32(b[:4], slab.Offset)
binary.LittleEndian.PutUint32(b[4:], slab.Length)
h.Write(b)
for _, shard := range slab.Shards {
h.Write(shard.Root[:])
}
}
return string(hex.EncodeToString(h.Sum(nil)))
}

// TotalSize returns the total size of the object.
func (o Object) TotalSize() int64 {
var n int64
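Dropping `ComputeETag` removes the old scheme of hashing slab offsets and sector roots; per the commit message, the ETag is now an MD5 of the object's data. The upload path itself isn't part of this excerpt, so the following is only an illustrative sketch of the general idea, hashing the bytes as they stream through rather than buffering the whole object:

```go
package example

import (
	"crypto/md5"
	"encoding/hex"
	"io"
)

// uploadWithETag copies src to dst while hashing it and returns the hex-encoded
// MD5 digest that could be stored as the object's ETag. This illustrates the
// content-based ETag; it is not the literal renterd upload code.
func uploadWithETag(dst io.Writer, src io.Reader) (string, error) {
	h := md5.New()
	if _, err := io.Copy(dst, io.TeeReader(src, h)); err != nil {
		return "", err
	}
	return hex.EncodeToString(h.Sum(nil)), nil
}
```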
28 changes: 21 additions & 7 deletions s3/backend.go
@@ -3,6 +3,7 @@ package s3
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"io"
"strings"
@@ -268,7 +269,14 @@ func (s *s3) GetObject(ctx context.Context, bucketName, objectName string, range
res.Metadata["Content-Type"] = res.ContentType
res.Metadata["Last-Modified"] = res.LastModified

// etag to bytes
etag, err := hex.DecodeString(res.Etag)
if err != nil {
return nil, gofakes3.ErrorMessage(gofakes3.ErrInternal, err.Error())
}

return &gofakes3.Object{
Hash: etag,
Name: gofakes3.URLEncode(objectName),
Metadata: res.Metadata,
Size: res.Size,
@@ -287,9 +295,8 @@ func (s *s3) GetObject(ctx context.Context, bucketName, objectName string, range
// HeadObject should return a NotFound() error if the object does not
// exist.
func (s *s3) HeadObject(ctx context.Context, bucketName, objectName string) (*gofakes3.Object, error) {
res, err := s.b.Object(ctx, bucketName, objectName, api.GetObjectOptions{
IgnoreDelim: true,
OnlyMetadata: true,
res, err := s.w.HeadObject(ctx, bucketName, objectName, api.HeadObjectOptions{
IgnoreDelim: true,
})
if err != nil && strings.Contains(err.Error(), api.ErrObjectNotFound.Error()) {
return nil, gofakes3.KeyNotFound(objectName)
@@ -299,18 +306,25 @@ func (s *s3) HeadObject(ctx context.Context, bucketName, objectName string) (*go

// set user metadata
metadata := make(map[string]string)
for k, v := range res.Object.Metadata {
for k, v := range res.Metadata {
metadata[amazonMetadataPrefix+k] = v
}

// decorate metadata
metadata["Content-Type"] = res.Object.MimeType
metadata["Last-Modified"] = res.Object.LastModified()
metadata["Content-Type"] = res.ContentType
metadata["Last-Modified"] = res.LastModified

// etag to bytes
hash, err := hex.DecodeString(res.Etag)
if err != nil {
return nil, gofakes3.ErrorMessage(gofakes3.ErrInternal, err.Error())
}

return &gofakes3.Object{
Hash: hash,
Name: gofakes3.URLEncode(objectName),
Metadata: metadata,
Size: res.Object.Size,
Size: res.Size,
Contents: io.NopCloser(bytes.NewReader(nil)),
}, nil
}
1 change: 1 addition & 0 deletions s3/s3.go
@@ -48,6 +48,7 @@ type bus interface {

type worker interface {
GetObject(ctx context.Context, bucket, path string, opts api.DownloadObjectOptions) (*api.GetObjectResponse, error)
HeadObject(ctx context.Context, bucket, path string, opts api.HeadObjectOptions) (*api.HeadObjectResponse, error)
UploadObject(ctx context.Context, r io.Reader, bucket, path string, opts api.UploadObjectOptions) (*api.UploadObjectResponse, error)
UploadMultipartUploadPart(ctx context.Context, r io.Reader, bucket, path, uploadID string, partNumber int, opts api.UploadMultipartUploadPartOptions) (*api.UploadMultipartUploadPartResponse, error)
}
54 changes: 38 additions & 16 deletions stores/metadata.go
@@ -1474,7 +1474,7 @@ func (s *SQLStore) RenameObject(ctx context.Context, bucket, keyOld, keyNew stri
if force {
// delete potentially existing object at destination
if _, err := s.deleteObject(tx, bucket, keyNew); err != nil {
return err
return fmt.Errorf("RenameObject: failed to delete object: %w", err)
}
}
tx = tx.Exec(`UPDATE objects SET object_id = ? WHERE object_id = ? AND ?`, keyNew, keyOld, sqlWhereBucket("objects", bucket))
@@ -1539,6 +1539,13 @@ func (s *SQLStore) AddPartialSlab(ctx context.Context, data []byte, minShards, t

func (s *SQLStore) CopyObject(ctx context.Context, srcBucket, dstBucket, srcPath, dstPath, mimeType string, metadata api.ObjectUserMetadata) (om api.ObjectMetadata, err error) {
err = s.retryTransaction(func(tx *gorm.DB) error {
if srcBucket != dstBucket || srcPath != dstPath {
_, err = s.deleteObject(tx, dstBucket, dstPath)
if err != nil {
return fmt.Errorf("CopyObject: failed to delete object: %w", err)
}
}

var srcObj dbObject
err = tx.Where("objects.object_id = ? AND DBBucket.name = ?", srcPath, srcBucket).
Joins("DBBucket").
@@ -1565,10 +1572,6 @@
}
return tx.Save(&srcObj).Error
}
_, err = s.deleteObject(tx, dstBucket, dstPath)
if err != nil {
return fmt.Errorf("failed to delete object: %w", err)
}

var srcSlices []dbSlice
err = tx.Where("db_object_id = ?", srcObj.ID).
@@ -1708,12 +1711,6 @@ func (s *SQLStore) UpdateObject(ctx context.Context, bucket, path, contractSet,

// UpdateObject is ACID.
return s.retryTransaction(func(tx *gorm.DB) error {
// Fetch contract set.
var cs dbContractSet
if err := tx.Take(&cs, "name = ?", contractSet).Error; err != nil {
return fmt.Errorf("contract set %v not found: %w", contractSet, err)
}

// Try to delete. We want to get rid of the object and its slices if it
// exists.
//
@@ -1726,22 +1723,24 @@
// object's metadata before trying to recreate it
_, err := s.deleteObject(tx, bucket, path)
if err != nil {
return fmt.Errorf("failed to delete object: %w", err)
return fmt.Errorf("UpdateObject: failed to delete object: %w", err)
}

// Insert a new object.
objKey, err := o.Key.MarshalBinary()
if err != nil {
return fmt.Errorf("failed to marshal object key: %w", err)
}
// fetch bucket id
var bucketID uint
err = tx.Table("(SELECT id from buckets WHERE buckets.name = ?) bucket_id", bucket).
err = s.db.Table("(SELECT id from buckets WHERE buckets.name = ?) bucket_id", bucket).
Take(&bucketID).Error
if errors.Is(err, gorm.ErrRecordNotFound) {
return fmt.Errorf("bucket %v not found: %w", bucket, api.ErrBucketNotFound)
} else if err != nil {
return fmt.Errorf("failed to fetch bucket id: %w", err)
}

obj := dbObject{
DBBucketID: bucketID,
ObjectID: path,
@@ -1755,6 +1754,12 @@
return fmt.Errorf("failed to create object: %w", err)
}

// Fetch contract set.
var cs dbContractSet
if err := tx.Take(&cs, "name = ?", contractSet).Error; err != nil {
return fmt.Errorf("contract set %v not found: %w", contractSet, err)
}

// Fetch the used contracts.
contracts, err := fetchUsedContracts(tx, usedContracts)
if err != nil {
@@ -1780,7 +1785,10 @@
var err error
err = s.retryTransaction(func(tx *gorm.DB) error {
rowsAffected, err = s.deleteObject(tx, bucket, key)
return err
if err != nil {
return fmt.Errorf("RemoveObject: failed to delete object: %w", err)
}
return nil
})
if err != nil {
return err
@@ -2722,7 +2730,21 @@
// without an object after the deletion. That means in case of packed uploads,
// the slab is only deleted when no more objects point to it.
func (s *SQLStore) deleteObject(tx *gorm.DB, bucket string, path string) (int64, error) {
tx = tx.Where("object_id = ? AND ?", path, sqlWhereBucket("objects", bucket)).
// check if the object exists first to avoid unnecessary locking for the
// common case
var objID uint
resp := tx.Model(&dbObject{}).
Where("object_id = ? AND ?", path, sqlWhereBucket("objects", bucket)).
Select("id").
Limit(1).
Scan(&objID)
if err := resp.Error; err != nil {
return 0, err
} else if resp.RowsAffected == 0 {
return 0, nil
}

tx = tx.Where("id", objID).
Delete(&dbObject{})
if tx.Error != nil {
return 0, tx.Error
@@ -2872,7 +2894,7 @@ func (s *SQLStore) ListObjects(ctx context.Context, bucket, prefix, sortBy, sort
}
var rows []rawObjectMetadata
if err := s.db.
Select("o.object_id as Name, o.size as Size, o.health as Health, o.mime_type as mimeType, o.created_at as ModTime").
Select("o.object_id as Name, o.size as Size, o.health as Health, o.mime_type as MimeType, o.created_at as ModTime, o.etag as ETag").
Model(&dbObject{}).
Table("objects o").
Joins("INNER JOIN buckets b ON o.db_bucket_id = b.id").
(The remaining changed files are not shown in this view.)
