diff --git a/api/bus.go b/api/bus.go index 8b338ff0d..d73fb3094 100644 --- a/api/bus.go +++ b/api/bus.go @@ -492,8 +492,9 @@ type GougingSettings struct { // Types related to multipart uploads. type ( MultipartCreateRequest struct { - Bucket string `json:"bucket"` - Path string `json:"path"` + Bucket string `json:"bucket"` + Key object.EncryptionKey `json:"key"` + Path string `json:"path"` } MultipartCreateResponse struct { UploadID string `json:"uploadID"` @@ -503,8 +504,6 @@ type ( Path string `json:"path"` UploadID string `json:"uploadID"` } - MultipartAbortResponse struct { - } MultipartCompleteRequest struct { Bucket string `json:"bucket"` Path string `json:"path"` @@ -513,14 +512,14 @@ type ( } MultipartCompletedPart struct { PartNumber int `json:"partNumber"` - ETag string `json:"etag"` + ETag string `json:"eTag"` } MultipartCompleteResponse struct { - ETag string `json:"etag"` + ETag string `json:"eTag"` } MultipartAddPartRequest struct { Bucket string `json:"bucket"` - Etag string `json:"etag"` + Etag string `json:"eTag"` Path string `json:"path"` ContractSet string `json:"contractSet"` UploadID string `json:"uploadID"` @@ -559,7 +558,7 @@ type ( MultipartListPartItem struct { PartNumber int `json:"partNumber"` LastModified time.Time `json:"lastModified"` - ETag string `json:"etag"` + ETag string `json:"eTag"` Size int64 `json:"size"` } ) diff --git a/bus/bus.go b/bus/bus.go index 15deadbec..c2441fd8b 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -121,12 +121,12 @@ type ( RenameObject(ctx context.Context, bucket, from, to string) error RenameObjects(ctx context.Context, bucket, from, to string) error - AbortMultipartUpload(ctx context.Context, bucket, path string, uploadID string) (resp api.MultipartAbortResponse, err error) + AbortMultipartUpload(ctx context.Context, bucket, path string, uploadID string) (err error) AddMultipartPart(ctx context.Context, bucket, path, contractSet, uploadID string, partNumber int, slices []object.SlabSlice, partialSlab []object.PartialSlab, etag string, usedContracts map[types.PublicKey]types.FileContractID) (err error) CompleteMultipartUpload(ctx context.Context, bucket, path string, uploadID string, parts []api.MultipartCompletedPart) (_ api.MultipartCompleteResponse, err error) - CreateMultipartUpload(ctx context.Context, bucket, path string) (api.MultipartCreateResponse, error) - ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker string, maxUploads int) (resp api.MultipartListUploadsResponse, _ error) - ListMultipartUploadParts(ctx context.Context, bucket, object string, uploadID string, marker int, limit int64) (resp api.MultipartListPartsResponse, _ error) + CreateMultipartUpload(ctx context.Context, bucket, path string, ec object.EncryptionKey) (api.MultipartCreateResponse, error) + MultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker string, maxUploads int) (resp api.MultipartListUploadsResponse, _ error) + MultipartUploadParts(ctx context.Context, bucket, object string, uploadID string, marker int, limit int64) (resp api.MultipartListPartsResponse, _ error) MarkPackedSlabsUploaded(ctx context.Context, slabs []api.UploadedPackedSlab, usedContracts map[types.PublicKey]types.FileContractID) error PackedSlabsForUpload(ctx context.Context, lockingDuration time.Duration, minShards, totalShards uint8, set string, limit int) ([]api.PackedSlab, error) @@ -1813,7 +1813,7 @@ func (b *bus) multipartHandlerCreatePOST(jc jape.Context) { if jc.Decode(&req) != nil { return } - resp, err := b.ms.CreateMultipartUpload(jc.Request.Context(), req.Bucket, req.Path) + resp, err := b.ms.CreateMultipartUpload(jc.Request.Context(), req.Bucket, req.Path, req.Key) if jc.Check("failed to create multipart upload", err) != nil { return } @@ -1825,11 +1825,10 @@ func (b *bus) multipartHandlerAbortPOST(jc jape.Context) { if jc.Decode(&req) != nil { return } - resp, err := b.ms.AbortMultipartUpload(jc.Request.Context(), req.Bucket, req.Path, req.UploadID) + err := b.ms.AbortMultipartUpload(jc.Request.Context(), req.Bucket, req.Path, req.UploadID) if jc.Check("failed to abort multipart upload", err) != nil { return } - jc.Encode(resp) } func (b *bus) multipartHandlerCompletePOST(jc jape.Context) { @@ -1875,7 +1874,7 @@ func (b *bus) multipartHandlerListUploadsPOST(jc jape.Context) { if jc.Decode(&req) != nil { return } - resp, err := b.ms.ListMultipartUploads(jc.Request.Context(), req.Bucket, req.Prefix, req.KeyMarker, req.UploadIDMarker, req.Limit) + resp, err := b.ms.MultipartUploads(jc.Request.Context(), req.Bucket, req.Prefix, req.KeyMarker, req.UploadIDMarker, req.Limit) if jc.Check("failed to list multipart uploads", err) != nil { return } @@ -1887,7 +1886,7 @@ func (b *bus) multipartHandlerListPartsPOST(jc jape.Context) { if jc.Decode(&req) != nil { return } - resp, err := b.ms.ListMultipartUploadParts(jc.Request.Context(), req.Bucket, req.Path, req.UploadID, req.PartNumberMarker, int64(req.Limit)) + resp, err := b.ms.MultipartUploadParts(jc.Request.Context(), req.Bucket, req.Path, req.UploadID, req.PartNumberMarker, int64(req.Limit)) if jc.Check("failed to list multipart upload parts", err) != nil { return } diff --git a/bus/client.go b/bus/client.go index 00e2b3eb7..ad63c6429 100644 --- a/bus/client.go +++ b/bus/client.go @@ -941,9 +941,10 @@ func (c *Client) renameObjects(ctx context.Context, bucket, from, to, mode strin return } -func (c *Client) CreateMultipartUpload(ctx context.Context, bucket, path string) (resp api.MultipartCreateResponse, err error) { +func (c *Client) CreateMultipartUpload(ctx context.Context, bucket, path string, ec object.EncryptionKey) (resp api.MultipartCreateResponse, err error) { err = c.c.WithContext(ctx).POST("/multipart/create", api.MultipartCreateRequest{ Bucket: bucket, + Key: ec, Path: path, }, &resp) return @@ -964,12 +965,12 @@ func (c *Client) AddMultipartPart(ctx context.Context, bucket, path, contractSet return } -func (c *Client) AbortMultipartUpload(ctx context.Context, bucket, path string, uploadID string) (resp api.MultipartAbortResponse, err error) { +func (c *Client) AbortMultipartUpload(ctx context.Context, bucket, path string, uploadID string) (err error) { err = c.c.WithContext(ctx).POST("/multipart/abort", api.MultipartAbortRequest{ Bucket: bucket, Path: path, UploadID: uploadID, - }, &resp) + }, nil) return } @@ -983,7 +984,7 @@ func (c *Client) CompleteMultipartUpload(ctx context.Context, bucket, path strin return } -func (c *Client) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker string, maxUploads int) (resp api.MultipartListUploadsResponse, err error) { +func (c *Client) MultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker string, maxUploads int) (resp api.MultipartListUploadsResponse, err error) { err = c.c.WithContext(ctx).POST("/multipart/listuploads", api.MultipartListUploadsRequest{ Bucket: bucket, Prefix: prefix, @@ -994,7 +995,7 @@ func (c *Client) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMa return } -func (c *Client) ListMultipartUploadParts(ctx context.Context, bucket, object string, uploadID string, marker int, limit int64) (resp api.MultipartListPartsResponse, err error) { +func (c *Client) MultipartUploadParts(ctx context.Context, bucket, object string, uploadID string, marker int, limit int64) (resp api.MultipartListPartsResponse, err error) { err = c.c.WithContext(ctx).POST("/multipart/listparts", api.MultipartListPartsRequest{ Bucket: bucket, Path: object, diff --git a/go.mod b/go.mod index d3d6dcdff..a7da981a0 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module go.sia.tech/renterd go 1.20 require ( - github.com/SiaFoundation/gofakes3 v0.0.0-20230919075754-db4f3d79eb50 + github.com/SiaFoundation/gofakes3 v0.0.0-20230919115219-af470842bcfa github.com/go-gormigrate/gormigrate/v2 v2.1.0 github.com/google/go-cmp v0.5.9 github.com/gotd/contrib v0.19.0 diff --git a/go.sum b/go.sum index ec30cba36..dfcfceab0 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= -github.com/SiaFoundation/gofakes3 v0.0.0-20230918145812-698366c1fd56 h1:Wx1Xp4d5jb2lYgHfPRPeA4PhY9/hwXO3H7557AZB6j8= -github.com/SiaFoundation/gofakes3 v0.0.0-20230918145812-698366c1fd56/go.mod h1:pm3DyXGoeF7/gka6OYDqAW4E5hkcnXm/GfUagzztlxk= -github.com/SiaFoundation/gofakes3 v0.0.0-20230919075754-db4f3d79eb50 h1:Z+AQLQgrIlOWihTUP/HIvOSQ0slMsDfAhzaVGydFe5Y= -github.com/SiaFoundation/gofakes3 v0.0.0-20230919075754-db4f3d79eb50/go.mod h1:pm3DyXGoeF7/gka6OYDqAW4E5hkcnXm/GfUagzztlxk= +github.com/SiaFoundation/gofakes3 v0.0.0-20230919115219-af470842bcfa h1:CMKw/7lpPrLLZaT+sFQysXEE7Zgj8Iqz+dUuaYrh0wg= +github.com/SiaFoundation/gofakes3 v0.0.0-20230919115219-af470842bcfa/go.mod h1:pm3DyXGoeF7/gka6OYDqAW4E5hkcnXm/GfUagzztlxk= github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA= github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo= github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da h1:KjTM2ks9d14ZYCvmHS9iAKVt9AyzRSqNU1qabPih5BY= diff --git a/internal/testing/s3_test.go b/internal/testing/s3_test.go index 9876c7049..069b444b6 100644 --- a/internal/testing/s3_test.go +++ b/internal/testing/s3_test.go @@ -470,4 +470,27 @@ func TestS3MultipartUploads(t *testing.T) { } else if !bytes.Equal(data, []byte("helloworld!")) { t.Fatal("unexpected data:", string(data)) } + + // Start a second multipart upload. + uploadID, err = core.NewMultipartUpload(context.Background(), "multipart", "bar", minio.PutObjectOptions{}) + if err != nil { + t.Fatal(err) + } + + // Add a part. + putPart(1, []byte("bar")) + + // Abort upload + err = core.AbortMultipartUpload(context.Background(), "multipart", "bar", uploadID) + if err != nil { + t.Fatal(err) + } + + // List it. + res, err := core.ListMultipartUploads(context.Background(), "multipart", "", "", "", "", 0) + if err != nil { + t.Fatal(err) + } else if len(res.Uploads) != 0 { + t.Fatal("expected 0 uploads") + } } diff --git a/object/object.go b/object/object.go index e701d1bfc..64f2c1ae8 100644 --- a/object/object.go +++ b/object/object.go @@ -5,6 +5,7 @@ import ( "crypto/cipher" "encoding/binary" "encoding/hex" + "errors" "fmt" "io" "math" @@ -49,9 +50,12 @@ func (k *EncryptionKey) UnmarshalText(b []byte) error { // Encrypt returns a cipher.StreamReader that encrypts r with k starting at the // given offset. -func (k EncryptionKey) Encrypt(r io.Reader, offset uint64) cipher.StreamReader { +func (k EncryptionKey) Encrypt(r io.Reader, offset uint64) (cipher.StreamReader, error) { + if offset%64 != 0 { + return cipher.StreamReader{}, errors.New("offset must be a multiple of 64") + } if k.IsNoopKey() { - return cipher.StreamReader{S: &noOpStream{}, R: r} + return cipher.StreamReader{S: &noOpStream{}, R: r}, nil } nonce64 := offset / (64 * math.MaxUint32) offset %= 64 * math.MaxUint32 @@ -61,7 +65,7 @@ func (k EncryptionKey) Encrypt(r io.Reader, offset uint64) cipher.StreamReader { c, _ := chacha20.NewUnauthenticatedCipher(k.entropy[:], nonce) c.SetCounter(uint32(offset / 64)) rs := &rekeyStream{key: k.entropy[:], c: c} - return cipher.StreamReader{S: rs, R: r} + return cipher.StreamReader{S: rs, R: r}, nil } // Decrypt returns a cipher.StreamWriter that decrypts w with k, starting at the @@ -119,7 +123,7 @@ func (o Object) TotalSize() int64 { // Encrypt wraps the given reader with a reader that encrypts the stream using // the object's key. -func (o Object) Encrypt(r io.Reader, offset uint64) cipher.StreamReader { +func (o Object) Encrypt(r io.Reader, offset uint64) (cipher.StreamReader, error) { return o.Key.Encrypt(r, offset) } diff --git a/object/object_test.go b/object/object_test.go index c2e99b8b1..01a5aecbe 100644 --- a/object/object_test.go +++ b/object/object_test.go @@ -2,17 +2,54 @@ package object import ( "bytes" + "io" "math" "testing" "lukechampine.com/frand" ) +func TestEncryptionOffset(t *testing.T) { + key := GenerateEncryptionKey() + + encrypt := func(offset uint64, plainText []byte) []byte { + t.Helper() + sr, err := key.Encrypt(bytes.NewReader(plainText), offset) + if err != nil { + t.Fatal(err) + } + ct, err := io.ReadAll(sr) + if err != nil { + t.Fatal(err) + } + return ct + } + decrypt := func(offset uint64, cipherText []byte) []byte { + pt := bytes.NewBuffer(nil) + _, err := key.Decrypt(pt, offset).Write(cipherText) + if err != nil { + t.Fatal(err) + } + return pt.Bytes() + } + + data := frand.Bytes(640) + offset := uint64(64) + if !bytes.Equal(data, decrypt(offset, encrypt(offset, data))) { + t.Fatal("mismatch") + } else if bytes.Equal(data, decrypt(offset, encrypt(128, data))) { + t.Fatal("expected mismatch") + } +} + func TestEncryptionOverflow(t *testing.T) { // Create a random key. key := GenerateEncryptionKey() data := frand.Bytes(3 * 64) - sr := key.Encrypt(bytes.NewReader(data), 0) + sr, err := key.Encrypt(bytes.NewReader(data), 0) + if err != nil { + t.Fatal(err) + } // Check that the streamreader is initialized correctly. rs := sr.S.(*rekeyStream) diff --git a/s3/backend.go b/s3/backend.go index bbd8886a2..a70afe7dc 100644 --- a/s3/backend.go +++ b/s3/backend.go @@ -13,6 +13,7 @@ import ( "github.com/SiaFoundation/gofakes3" "go.sia.tech/renterd/api" + "go.sia.tech/renterd/object" "go.uber.org/zap" "lukechampine.com/frand" ) @@ -360,8 +361,8 @@ func (s *s3) CopyObject(srcBucket, srcKey, dstBucket, dstKey string, meta map[st }, nil } -func (s *s3) CreateMultipartUpload(bucket, object string, meta map[string]string) (gofakes3.UploadID, error) { - resp, err := s.b.CreateMultipartUpload(context.Background(), bucket, "/"+object) +func (s *s3) CreateMultipartUpload(bucket, key string, meta map[string]string) (gofakes3.UploadID, error) { + resp, err := s.b.CreateMultipartUpload(context.Background(), bucket, "/"+key, object.NoOpKey) if err != nil { return "", gofakes3.ErrorMessage(gofakes3.ErrInternal, err.Error()) } @@ -386,7 +387,7 @@ func (s *s3) ListMultipartUploads(bucket string, marker *gofakes3.UploadListMark } else if marker != nil { return nil, gofakes3.ErrorMessage(gofakes3.ErrNotImplemented, "marker not supported") } - resp, err := s.b.ListMultipartUploads(context.Background(), bucket, "", "", "", int(limit)) + resp, err := s.b.MultipartUploads(context.Background(), bucket, "", "", "", int(limit)) if err != nil { return nil, gofakes3.ErrorMessage(gofakes3.ErrInternal, err.Error()) } @@ -414,7 +415,7 @@ func (s *s3) ListMultipartUploads(bucket string, marker *gofakes3.UploadListMark } func (s *s3) ListParts(bucket, object string, uploadID gofakes3.UploadID, marker int, limit int64) (*gofakes3.ListMultipartUploadPartsResult, error) { - resp, err := s.b.ListMultipartUploadParts(context.Background(), bucket, "/"+object, string(uploadID), marker, limit) + resp, err := s.b.MultipartUploadParts(context.Background(), bucket, "/"+object, string(uploadID), marker, limit) if err != nil { return nil, gofakes3.ErrorMessage(gofakes3.ErrInternal, err.Error()) } @@ -441,7 +442,11 @@ func (s *s3) ListParts(bucket, object string, uploadID gofakes3.UploadID, marker } func (s *s3) AbortMultipartUpload(bucket, object string, id gofakes3.UploadID) error { - return gofakes3.ErrorMessage(gofakes3.ErrNotImplemented, "abort multipart upload not supported") + err := s.b.AbortMultipartUpload(context.Background(), bucket, "/"+object, string(id)) + if err != nil { + return gofakes3.ErrorMessage(gofakes3.ErrInternal, err.Error()) + } + return nil } func (s *s3) CompleteMultipartUpload(bucket, object string, id gofakes3.UploadID, input *gofakes3.CompleteMultipartUploadRequest) (versionID gofakes3.VersionID, etag string, err error) { diff --git a/s3/s3.go b/s3/s3.go index cfab7b9de..f35e0a8bd 100644 --- a/s3/s3.go +++ b/s3/s3.go @@ -35,10 +35,11 @@ type bus interface { Object(ctx context.Context, path string, opts ...api.ObjectsOption) (res api.ObjectsResponse, err error) SearchObjects(ctx context.Context, bucket, key string, offset, limit int) (entries []api.ObjectMetadata, err error) + AbortMultipartUpload(ctx context.Context, bucket, path string, uploadID string) (err error) CompleteMultipartUpload(ctx context.Context, bucket, path string, uploadID string, parts []api.MultipartCompletedPart) (_ api.MultipartCompleteResponse, err error) - CreateMultipartUpload(ctx context.Context, bucket, path string) (api.MultipartCreateResponse, error) - ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker string, maxUploads int) (resp api.MultipartListUploadsResponse, _ error) - ListMultipartUploadParts(ctx context.Context, bucket, object string, uploadID string, marker int, limit int64) (resp api.MultipartListPartsResponse, _ error) + CreateMultipartUpload(ctx context.Context, bucket, path string, ec object.EncryptionKey) (api.MultipartCreateResponse, error) + MultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker string, maxUploads int) (resp api.MultipartListUploadsResponse, _ error) + MultipartUploadParts(ctx context.Context, bucket, object string, uploadID string, marker int, limit int64) (resp api.MultipartListPartsResponse, _ error) UploadParams(ctx context.Context) (api.UploadParams, error) } diff --git a/stores/multipart.go b/stores/multipart.go index 01d4ef062..23e604a2e 100644 --- a/stores/multipart.go +++ b/stores/multipart.go @@ -45,9 +45,14 @@ func (dbMultipartPart) TableName() string { return "multipart_parts" } -func (s *SQLStore) CreateMultipartUpload(ctx context.Context, bucket, path string) (api.MultipartCreateResponse, error) { +func (s *SQLStore) CreateMultipartUpload(ctx context.Context, bucket, path string, ec object.EncryptionKey) (api.MultipartCreateResponse, error) { + // Marshal key + key, err := ec.MarshalText() + if err != nil { + return api.MultipartCreateResponse{}, err + } var uploadID string - err := s.retryTransaction(func(tx *gorm.DB) error { + err = s.retryTransaction(func(tx *gorm.DB) error { // Get bucket id. var bucketID uint err := tx.Table("(SELECT id from buckets WHERE buckets.name = ?) bucket_id", bucket). @@ -62,7 +67,7 @@ func (s *SQLStore) CreateMultipartUpload(ctx context.Context, bucket, path strin uploadID = hex.EncodeToString(uploadIDEntropy[:]) if err := s.db.Create(&dbMultipartUpload{ DBBucketID: bucketID, - Key: []byte(object.NoOpKey.String()), // TODO: set actual key + Key: key, UploadID: uploadID, ObjectID: path, }).Error; err != nil { @@ -76,7 +81,7 @@ func (s *SQLStore) CreateMultipartUpload(ctx context.Context, bucket, path strin } func (s *SQLStore) AddMultipartPart(ctx context.Context, bucket, path, contractSet, uploadID string, partNumber int, slices []object.SlabSlice, partialSlabs []object.PartialSlab, etag string, usedContracts map[types.PublicKey]types.FileContractID) (err error) { - err = s.retryTransaction(func(tx *gorm.DB) error { + return s.retryTransaction(func(tx *gorm.DB) error { // Fetch contract set. var cs dbContractSet if err := tx.Take(&cs, "name = ?", contractSet).Error; err != nil { @@ -128,12 +133,11 @@ func (s *SQLStore) AddMultipartPart(ctx context.Context, bucket, path, contractS } return nil }) - return err } // TODO: f/u with support for 'prefix', 'keyMarker' and 'uploadIDMarker' -func (s *SQLStore) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker string, maxUploads int) (resp api.MultipartListUploadsResponse, _ error) { - err := s.retryTransaction(func(tx *gorm.DB) error { +func (s *SQLStore) MultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker string, maxUploads int) (resp api.MultipartListUploadsResponse, err error) { + err = s.retryTransaction(func(tx *gorm.DB) error { var dbUploads []dbMultipartUpload err := tx.Limit(int(maxUploads)). Find(&dbUploads). @@ -150,10 +154,10 @@ func (s *SQLStore) ListMultipartUploads(ctx context.Context, bucket, prefix, key } return nil }) - return resp, err + return } -func (s *SQLStore) ListMultipartUploadParts(ctx context.Context, bucket, object string, uploadID string, marker int, limit int64) (resp api.MultipartListPartsResponse, _ error) { +func (s *SQLStore) MultipartUploadParts(ctx context.Context, bucket, object string, uploadID string, marker int, limit int64) (resp api.MultipartListPartsResponse, _ error) { limitUsed := limit > 0 if !limitUsed { limit = math.MaxInt64 @@ -164,6 +168,7 @@ func (s *SQLStore) ListMultipartUploadParts(ctx context.Context, bucket, object err := s.retryTransaction(func(tx *gorm.DB) error { var dbParts []dbMultipartPart err := tx. + Model(&dbMultipartPart{}). Joins("INNER JOIN multipart_uploads mus ON mus.id = multipart_parts.db_multipart_upload_id"). Joins("INNER JOIN buckets b ON b.name = ? AND b.id = mus.db_bucket_id", bucket). Where("mus.object_id = ? AND mus.upload_id = ? AND part_number > ?", object, uploadID, marker). @@ -193,8 +198,31 @@ func (s *SQLStore) ListMultipartUploadParts(ctx context.Context, bucket, object return resp, err } -func (s *SQLStore) AbortMultipartUpload(ctx context.Context, bucket, object string, uploadID string) (api.MultipartAbortResponse, error) { - panic("not implemented") +func (s *SQLStore) AbortMultipartUpload(ctx context.Context, bucket, path string, uploadID string) error { + return s.retryTransaction(func(tx *gorm.DB) error { + // Find multipart upload. + var mu dbMultipartUpload + err := tx.Where("upload_id = ?", uploadID). + Preload("Parts"). + Joins("DBBucket"). + Take(&mu). + Error + if err != nil { + return fmt.Errorf("failed to fetch multipart upload: %w", err) + } + if mu.ObjectID != path { + // Check object id. + return fmt.Errorf("object id mismatch: %v != %v: %w", mu.ObjectID, path, api.ErrObjectNotFound) + } else if mu.DBBucket.Name != bucket { + // Check bucket name. + return fmt.Errorf("bucket name mismatch: %v != %v: %w", mu.DBBucket.Name, bucket, api.ErrBucketNotFound) + } + err = tx.Delete(&mu).Error + if err != nil { + return fmt.Errorf("failed to delete multipart upload: %w", err) + } + return pruneSlabs(tx) + }) } func (s *SQLStore) CompleteMultipartUpload(ctx context.Context, bucket, path string, uploadID string, parts []api.MultipartCompletedPart) (_ api.MultipartCompleteResponse, err error) { @@ -286,18 +314,11 @@ func (s *SQLStore) CompleteMultipartUpload(ctx context.Context, bucket, path str // respect the part order. sort.Sort(sortedSlices(slices)) - // Marshal key. - // TODO: set actual key - key, err := object.NoOpKey.MarshalText() - if err != nil { - return fmt.Errorf("failed to marshal key: %w", err) - } - // Create the object. obj := dbObject{ DBBucketID: mu.DBBucketID, ObjectID: path, - Key: key, + Key: mu.Key, Size: int64(size), } if err := tx.Create(&obj).Error; err != nil { @@ -321,9 +342,12 @@ func (s *SQLStore) CompleteMultipartUpload(ctx context.Context, bucket, path str } return nil }) + if err != nil { + return api.MultipartCompleteResponse{}, err + } return api.MultipartCompleteResponse{ ETag: etag, - }, err + }, nil } type sortedSlices []dbSlice diff --git a/worker/upload.go b/worker/upload.go index 9b1c8751e..cec1115dc 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -310,31 +310,6 @@ func (mgr *uploadManager) Stop() { } } -type etagger struct { - r io.Reader - h *types.Hasher -} - -func newEtagger(r io.Reader) *etagger { - return &etagger{ - r: r, - h: types.NewHasher(), - } -} - -func (e *etagger) Read(p []byte) (int, error) { - n, err := e.r.Read(p) - if _, wErr := e.h.E.Write(p[:n]); wErr != nil { - return 0, wErr - } - return n, err -} - -func (e *etagger) Etag() string { - sum := e.h.Sum() - return hex.EncodeToString(sum[:]) -} - func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, rs api.RedundancySettings, contracts []api.ContractMetadata, bh uint64, uploadPacking bool, opts ...UploadOption) (_ object.Object, used map[types.PublicKey]types.FileContractID, partialSlab []byte, etag string, err error) { // cancel all in-flight requests when the upload is done ctx, cancel := context.WithCancel(ctx) @@ -357,14 +332,17 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, rs api.Redund } // wrap the reader to create an etag - tagger := newEtagger(r) + tagger := newHashReader(r) r = tagger // create the object o := object.NewObject(uc.ec) // create the cipher reader - cr := o.Encrypt(r, uc.encryptionOffset) + cr, err := o.Encrypt(r, uc.encryptionOffset) + if err != nil { + return object.Object{}, nil, nil, "", err + } // create the upload u, finishFn, err := mgr.newUpload(ctx, rs.TotalShards, contracts, bh) @@ -1436,3 +1414,28 @@ func (a *dataPoints) tryDecay() { func (sID slabID) String() string { return fmt.Sprintf("%x", sID[:]) } + +type hashReader struct { + r io.Reader + h *types.Hasher +} + +func newHashReader(r io.Reader) *hashReader { + return &hashReader{ + r: r, + h: types.NewHasher(), + } +} + +func (e *hashReader) Read(p []byte) (int, error) { + n, err := e.r.Read(p) + if _, wErr := e.h.E.Write(p[:n]); wErr != nil { + return 0, wErr + } + return n, err +} + +func (e *hashReader) Etag() string { + sum := e.h.Sum() + return hex.EncodeToString(sum[:]) +} diff --git a/worker/worker.go b/worker/worker.go index 0048baa69..c80587d72 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1132,7 +1132,7 @@ func (w *worker) objectsHandlerPUT(jc jape.Context) { // attach gouging checker to the context ctx = WithGougingChecker(ctx, w.bus, up.GougingParams) - // update uploader contracts + // fetch contract set contracts contracts, err := w.bus.ContractSetContracts(ctx, up.ContractSet) if jc.Check("couldn't fetch contracts from bus", err) != nil { return @@ -1199,6 +1199,19 @@ func (w *worker) multipartUploadHandlerPUT(jc jape.Context) { return } + // cancel the upload if no contract set is specified + if up.ContractSet == "" { + jc.Error(api.ErrContractSetNotSpecified, http.StatusBadRequest) + return + } + + // cancel the upload if consensus is not synced + if !up.ConsensusState.Synced { + w.logger.Errorf("upload cancelled, err: %v", api.ErrConsensusNotSynced) + jc.Error(api.ErrConsensusNotSynced, http.StatusServiceUnavailable) + return + } + // decode the contract set from the query string var contractset string if jc.DecodeForm("contractset", &contractset) != nil { @@ -1222,24 +1235,12 @@ func (w *worker) multipartUploadHandlerPUT(jc jape.Context) { return } + // decode the part number var partNumber int if jc.DecodeForm("partnumber", &partNumber) != nil { return } - // cancel the upload if no contract set is specified - if up.ContractSet == "" { - jc.Error(api.ErrContractSetNotSpecified, http.StatusBadRequest) - return - } - - // cancel the upload if consensus is not synced - if !up.ConsensusState.Synced { - w.logger.Errorf("upload cancelled, err: %v", api.ErrConsensusNotSynced) - jc.Error(api.ErrConsensusNotSynced, http.StatusServiceUnavailable) - return - } - // allow overriding the redundancy settings rs := up.RedundancySettings if jc.DecodeForm("minshards", &rs.MinShards) != nil { @@ -1280,7 +1281,7 @@ func (w *worker) multipartUploadHandlerPUT(jc jape.Context) { // attach gouging checker to the context ctx = WithGougingChecker(ctx, w.bus, up.GougingParams) - // update uploader contracts + // fetch contract set contracts contracts, err := w.bus.ContractSetContracts(ctx, up.ContractSet) if jc.Check("couldn't fetch contracts from bus", err) != nil { return