Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Various rclone related fixes/improvements #1056

Merged
merged 22 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
0fd50d4
stores: delete objects first
ChrisSchinnerl Mar 12, 2024
0f60a16
stores: add context wo when deleteObject fails
ChrisSchinnerl Mar 12, 2024
401be07
stores: TestUpdateObjectParallel
ChrisSchinnerl Mar 12, 2024
47fafa0
stores: use dir instead of cfg.dir in newTestSQLStore
ChrisSchinnerl Mar 12, 2024
83f1e64
stores: fix TestBucketObjects
ChrisSchinnerl Mar 12, 2024
d4e4ed8
stores: fix config for mysql
ChrisSchinnerl Mar 12, 2024
2610af2
stores: move pruning of slabs out of transaction in UpdateObject
ChrisSchinnerl Mar 12, 2024
239f01a
stores: fix TestSQLMetadataStore
ChrisSchinnerl Mar 12, 2024
508624d
stores: reorder UpdateObject
ChrisSchinnerl Mar 12, 2024
c3b3a4f
stores: check if object exists before deleting
ChrisSchinnerl Mar 12, 2024
dc65a4a
stores: fetch bucket id within txn again
ChrisSchinnerl Mar 13, 2024
9afe29b
stores: overwrite object instead of deleting it
ChrisSchinnerl Mar 13, 2024
6490dd8
stores: fix TestSQLMetadataStore
ChrisSchinnerl Mar 13, 2024
3351f91
s3: extend tests to check etag on GetObject and HeadObject and fix ne…
ChrisSchinnerl Mar 13, 2024
08cecc9
stores: fix TestSQLMetadattaStore
ChrisSchinnerl Mar 13, 2024
b7f373b
Merge branch 'dev' into chris/rclone-fixes
ChrisSchinnerl Mar 13, 2024
c5334a5
stores: only run TestUpdateObjectParallel against MySQL
ChrisSchinnerl Mar 13, 2024
7e7c14e
e2e: check etag for exact md5 value
ChrisSchinnerl Mar 13, 2024
e27b6f8
e2e: fix TestObjectMetadata
ChrisSchinnerl Mar 13, 2024
5a4cc47
e2e: fix TestS3List
ChrisSchinnerl Mar 13, 2024
148bf94
Merge branch 'dev' into chris/rclone-fixes
ChrisSchinnerl Mar 13, 2024
1ee2f04
stores: fix ListObjects not returning etag or mimetype
ChrisSchinnerl Mar 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion api/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type (
// HeadObjectResponse is the response type for the HEAD /worker/object endpoint.
HeadObjectResponse struct {
ContentType string `json:"contentType"`
Etag string `json:"eTag"`
LastModified string `json:"lastModified"`
Range *DownloadRange `json:"range,omitempty"`
Size int64 `json:"size"`
Expand Down Expand Up @@ -212,7 +213,8 @@ type (
}

HeadObjectOptions struct {
Range DownloadRange
IgnoreDelim bool
Range DownloadRange
}

DownloadObjectOptions struct {
Expand Down Expand Up @@ -310,6 +312,12 @@ func (opts DeleteObjectOptions) Apply(values url.Values) {
}
}

func (opts HeadObjectOptions) Apply(values url.Values) {
if opts.IgnoreDelim {
values.Set("ignoreDelim", "true")
}
}

func (opts HeadObjectOptions) ApplyHeaders(h http.Header) {
if opts.Range != (DownloadRange{}) {
if opts.Range.Length == -1 {
Expand Down
3 changes: 3 additions & 0 deletions internal/test/e2e/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ func TestObjectMetadata(t *testing.T) {
}
if !reflect.DeepEqual(gor.Metadata, opts.Metadata) {
t.Fatal("metadata mismatch", gor.Metadata)
} else if gor.Etag == "" {
t.Fatal("missing etag")
}

// perform a HEAD request and assert the headers are all present
Expand All @@ -63,6 +65,7 @@ func TestObjectMetadata(t *testing.T) {
t.Fatal(err)
} else if !reflect.DeepEqual(hor, &api.HeadObjectResponse{
ContentType: or.Object.ContentType(),
Etag: gor.Etag,
LastModified: or.Object.LastModified(),
Range: &api.DownloadRange{Offset: 1, Length: 1, Size: int64(len(data))},
Size: int64(len(data)),
Expand Down
37 changes: 36 additions & 1 deletion internal/test/e2e/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package e2e
import (
"bytes"
"context"
"crypto/md5"
"encoding/hex"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -72,8 +74,12 @@ func TestS3Basic(t *testing.T) {

// add object to the bucket
data := frand.Bytes(10)
etag := md5.Sum(data)
uploadInfo, err := s3.PutObject(context.Background(), bucket, objPath, bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{})
tt.OK(err)
if uploadInfo.ETag != hex.EncodeToString(etag[:]) {
t.Fatalf("expected ETag %v, got %v", hex.EncodeToString(etag[:]), uploadInfo.ETag)
}
busObject, err := cluster.Bus.Object(context.Background(), bucket, objPath, api.GetObjectOptions{})
tt.OK(err)
if busObject.Object == nil {
Expand All @@ -92,13 +98,19 @@ func TestS3Basic(t *testing.T) {
t.Fatal(err)
} else if !bytes.Equal(b, data) {
t.Fatal("data mismatch")
} else if info, err := obj.Stat(); err != nil {
t.Fatal(err)
} else if info.ETag != uploadInfo.ETag {
t.Fatal("unexpected ETag:", info.ETag, uploadInfo.ETag)
}

// stat object
info, err := s3.StatObject(context.Background(), bucket, objPath, minio.StatObjectOptions{})
tt.OK(err)
if info.Size != int64(len(data)) {
t.Fatal("size mismatch")
} else if info.ETag != uploadInfo.ETag {
t.Fatal("unexpected ETag:", info.ETag)
}

// add another bucket
Expand Down Expand Up @@ -487,6 +499,13 @@ func TestS3List(t *testing.T) {
if !cmp.Equal(test.want, got) {
t.Errorf("test %d: unexpected response, want %v got %v", i, test.want, got)
}
for _, obj := range result.Contents {
if obj.ETag == "" {
t.Fatal("expected non-empty ETag")
} else if obj.LastModified.IsZero() {
t.Fatal("expected non-zero LastModified")
}
}
}
}

Expand Down Expand Up @@ -580,12 +599,28 @@ func TestS3MultipartUploads(t *testing.T) {
}

// Download object
expectedData := []byte("helloworld!")
downloadedObj, err := s3.GetObject(context.Background(), "multipart", "foo", minio.GetObjectOptions{})
tt.OK(err)
if data, err := io.ReadAll(downloadedObj); err != nil {
t.Fatal(err)
} else if !bytes.Equal(data, []byte("helloworld!")) {
} else if !bytes.Equal(data, expectedData) {
t.Fatal("unexpected data:", string(data))
} else if info, err := downloadedObj.Stat(); err != nil {
t.Fatal(err)
} else if info.ETag != ui.ETag {
t.Fatal("unexpected ETag:", info.ETag)
} else if info.Size != int64(len(expectedData)) {
t.Fatal("unexpected size:", info.Size)
}

// Stat object
if info, err := s3.StatObject(context.Background(), "multipart", "foo", minio.StatObjectOptions{}); err != nil {
t.Fatal(err)
} else if info.ETag != ui.ETag {
t.Fatal("unexpected ETag:", info.ETag)
} else if info.Size != int64(len(expectedData)) {
t.Fatal("unexpected size:", info.Size)
}

// Download again with range request.
Expand Down
17 changes: 0 additions & 17 deletions object/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package object
import (
"bytes"
"crypto/cipher"
"crypto/md5"
"encoding/binary"
"encoding/hex"
"fmt"
Expand Down Expand Up @@ -146,22 +145,6 @@ func (o Object) Contracts() map[types.PublicKey]map[types.FileContractID]struct{
return usedContracts
}

func (o *Object) ComputeETag() string {
// calculate the eTag using the precomputed sector roots to avoid having to
// hash the entire object again.
h := md5.New()
b := make([]byte, 8)
for _, slab := range o.Slabs {
binary.LittleEndian.PutUint32(b[:4], slab.Offset)
binary.LittleEndian.PutUint32(b[4:], slab.Length)
h.Write(b)
for _, shard := range slab.Shards {
h.Write(shard.Root[:])
}
}
return string(hex.EncodeToString(h.Sum(nil)))
}

// TotalSize returns the total size of the object.
func (o Object) TotalSize() int64 {
var n int64
Expand Down
28 changes: 21 additions & 7 deletions s3/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package s3
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"io"
"strings"
Expand Down Expand Up @@ -268,7 +269,14 @@ func (s *s3) GetObject(ctx context.Context, bucketName, objectName string, range
res.Metadata["Content-Type"] = res.ContentType
res.Metadata["Last-Modified"] = res.LastModified

// etag to bytes
etag, err := hex.DecodeString(res.Etag)
if err != nil {
return nil, gofakes3.ErrorMessage(gofakes3.ErrInternal, err.Error())
}

return &gofakes3.Object{
Hash: etag,
Name: gofakes3.URLEncode(objectName),
Metadata: res.Metadata,
Size: res.Size,
Expand All @@ -287,9 +295,8 @@ func (s *s3) GetObject(ctx context.Context, bucketName, objectName string, range
// HeadObject should return a NotFound() error if the object does not
// exist.
func (s *s3) HeadObject(ctx context.Context, bucketName, objectName string) (*gofakes3.Object, error) {
res, err := s.b.Object(ctx, bucketName, objectName, api.GetObjectOptions{
IgnoreDelim: true,
OnlyMetadata: true,
res, err := s.w.HeadObject(ctx, bucketName, objectName, api.HeadObjectOptions{
ChrisSchinnerl marked this conversation as resolved.
Show resolved Hide resolved
IgnoreDelim: true,
})
if err != nil && strings.Contains(err.Error(), api.ErrObjectNotFound.Error()) {
return nil, gofakes3.KeyNotFound(objectName)
Expand All @@ -299,18 +306,25 @@ func (s *s3) HeadObject(ctx context.Context, bucketName, objectName string) (*go

// set user metadata
metadata := make(map[string]string)
for k, v := range res.Object.Metadata {
for k, v := range res.Metadata {
metadata[amazonMetadataPrefix+k] = v
}

// decorate metadata
metadata["Content-Type"] = res.Object.MimeType
metadata["Last-Modified"] = res.Object.LastModified()
metadata["Content-Type"] = res.ContentType
metadata["Last-Modified"] = res.LastModified

// etag to bytes
hash, err := hex.DecodeString(res.Etag)
if err != nil {
return nil, gofakes3.ErrorMessage(gofakes3.ErrInternal, err.Error())
}

return &gofakes3.Object{
Hash: hash,
Name: gofakes3.URLEncode(objectName),
Metadata: metadata,
Size: res.Object.Size,
Size: res.Size,
Contents: io.NopCloser(bytes.NewReader(nil)),
}, nil
}
Expand Down
1 change: 1 addition & 0 deletions s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type bus interface {

type worker interface {
GetObject(ctx context.Context, bucket, path string, opts api.DownloadObjectOptions) (*api.GetObjectResponse, error)
HeadObject(ctx context.Context, bucket, path string, opts api.HeadObjectOptions) (*api.HeadObjectResponse, error)
UploadObject(ctx context.Context, r io.Reader, bucket, path string, opts api.UploadObjectOptions) (*api.UploadObjectResponse, error)
UploadMultipartUploadPart(ctx context.Context, r io.Reader, bucket, path, uploadID string, partNumber int, opts api.UploadMultipartUploadPartOptions) (*api.UploadMultipartUploadPartResponse, error)
}
Expand Down
54 changes: 38 additions & 16 deletions stores/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -1474,7 +1474,7 @@ func (s *SQLStore) RenameObject(ctx context.Context, bucket, keyOld, keyNew stri
if force {
// delete potentially existing object at destination
if _, err := s.deleteObject(tx, bucket, keyNew); err != nil {
return err
return fmt.Errorf("RenameObject: failed to delete object: %w", err)
}
}
tx = tx.Exec(`UPDATE objects SET object_id = ? WHERE object_id = ? AND ?`, keyNew, keyOld, sqlWhereBucket("objects", bucket))
Expand Down Expand Up @@ -1539,6 +1539,13 @@ func (s *SQLStore) AddPartialSlab(ctx context.Context, data []byte, minShards, t

func (s *SQLStore) CopyObject(ctx context.Context, srcBucket, dstBucket, srcPath, dstPath, mimeType string, metadata api.ObjectUserMetadata) (om api.ObjectMetadata, err error) {
err = s.retryTransaction(func(tx *gorm.DB) error {
if srcBucket != dstBucket || srcPath != dstPath {
_, err = s.deleteObject(tx, dstBucket, dstPath)
if err != nil {
return fmt.Errorf("CopyObject: failed to delete object: %w", err)
}
}

var srcObj dbObject
err = tx.Where("objects.object_id = ? AND DBBucket.name = ?", srcPath, srcBucket).
Joins("DBBucket").
Expand All @@ -1565,10 +1572,6 @@ func (s *SQLStore) CopyObject(ctx context.Context, srcBucket, dstBucket, srcPath
}
return tx.Save(&srcObj).Error
}
_, err = s.deleteObject(tx, dstBucket, dstPath)
if err != nil {
return fmt.Errorf("failed to delete object: %w", err)
}

var srcSlices []dbSlice
err = tx.Where("db_object_id = ?", srcObj.ID).
Expand Down Expand Up @@ -1708,12 +1711,6 @@ func (s *SQLStore) UpdateObject(ctx context.Context, bucket, path, contractSet,

// UpdateObject is ACID.
return s.retryTransaction(func(tx *gorm.DB) error {
// Fetch contract set.
var cs dbContractSet
if err := tx.Take(&cs, "name = ?", contractSet).Error; err != nil {
return fmt.Errorf("contract set %v not found: %w", contractSet, err)
}

// Try to delete. We want to get rid of the object and its slices if it
// exists.
//
Expand All @@ -1726,22 +1723,24 @@ func (s *SQLStore) UpdateObject(ctx context.Context, bucket, path, contractSet,
// object's metadata before trying to recreate it
_, err := s.deleteObject(tx, bucket, path)
if err != nil {
return fmt.Errorf("failed to delete object: %w", err)
return fmt.Errorf("UpdateObject: failed to delete object: %w", err)
}

// Insert a new object.
objKey, err := o.Key.MarshalBinary()
if err != nil {
return fmt.Errorf("failed to marshal object key: %w", err)
}
// fetch bucket id
var bucketID uint
err = tx.Table("(SELECT id from buckets WHERE buckets.name = ?) bucket_id", bucket).
err = s.db.Table("(SELECT id from buckets WHERE buckets.name = ?) bucket_id", bucket).
Take(&bucketID).Error
if errors.Is(err, gorm.ErrRecordNotFound) {
return fmt.Errorf("bucket %v not found: %w", bucket, api.ErrBucketNotFound)
} else if err != nil {
return fmt.Errorf("failed to fetch bucket id: %w", err)
}

obj := dbObject{
DBBucketID: bucketID,
ObjectID: path,
Expand All @@ -1755,6 +1754,12 @@ func (s *SQLStore) UpdateObject(ctx context.Context, bucket, path, contractSet,
return fmt.Errorf("failed to create object: %w", err)
}

// Fetch contract set.
var cs dbContractSet
if err := tx.Take(&cs, "name = ?", contractSet).Error; err != nil {
return fmt.Errorf("contract set %v not found: %w", contractSet, err)
}

// Fetch the used contracts.
contracts, err := fetchUsedContracts(tx, usedContracts)
if err != nil {
Expand All @@ -1780,7 +1785,10 @@ func (s *SQLStore) RemoveObject(ctx context.Context, bucket, key string) error {
var err error
err = s.retryTransaction(func(tx *gorm.DB) error {
rowsAffected, err = s.deleteObject(tx, bucket, key)
return err
if err != nil {
return fmt.Errorf("RemoveObject: failed to delete object: %w", err)
}
return nil
})
if err != nil {
return err
Expand Down Expand Up @@ -2722,7 +2730,21 @@ AND slabs.db_buffered_slab_id IS NULL
// without an obect after the deletion. That means in case of packed uploads,
// the slab is only deleted when no more objects point to it.
func (s *SQLStore) deleteObject(tx *gorm.DB, bucket string, path string) (int64, error) {
tx = tx.Where("object_id = ? AND ?", path, sqlWhereBucket("objects", bucket)).
// check if the object exists first to avoid unnecessary locking for the
// common case
var objID uint
resp := tx.Model(&dbObject{}).
Where("object_id = ? AND ?", path, sqlWhereBucket("objects", bucket)).
Select("id").
Limit(1).
Scan(&objID)
if err := resp.Error; err != nil {
return 0, err
} else if resp.RowsAffected == 0 {
return 0, nil
}

tx = tx.Where("id", objID).
Delete(&dbObject{})
if tx.Error != nil {
return 0, tx.Error
Expand Down Expand Up @@ -2872,7 +2894,7 @@ func (s *SQLStore) ListObjects(ctx context.Context, bucket, prefix, sortBy, sort
}
var rows []rawObjectMetadata
if err := s.db.
Select("o.object_id as Name, o.size as Size, o.health as Health, o.mime_type as mimeType, o.created_at as ModTime").
Select("o.object_id as Name, o.size as Size, o.health as Health, o.mime_type as MimeType, o.created_at as ModTime, o.etag as ETag").
Model(&dbObject{}).
Table("objects o").
Joins("INNER JOIN buckets b ON o.db_bucket_id = b.id").
Expand Down
Loading
Loading