Skip to content

Commit

Permalink
Merge branch 'chris/multipart-uploads' into chris/nextcloud-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisSchinnerl committed Sep 19, 2023
2 parents 617dba6 + b159c28 commit 9c8e2a7
Show file tree
Hide file tree
Showing 13 changed files with 197 additions and 102 deletions.
15 changes: 7 additions & 8 deletions api/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,8 +492,9 @@ type GougingSettings struct {
// Types related to multipart uploads.
type (
MultipartCreateRequest struct {
Bucket string `json:"bucket"`
Path string `json:"path"`
Bucket string `json:"bucket"`
Key object.EncryptionKey `json:"key"`
Path string `json:"path"`
}
MultipartCreateResponse struct {
UploadID string `json:"uploadID"`
Expand All @@ -503,8 +504,6 @@ type (
Path string `json:"path"`
UploadID string `json:"uploadID"`
}
MultipartAbortResponse struct {
}
MultipartCompleteRequest struct {
Bucket string `json:"bucket"`
Path string `json:"path"`
Expand All @@ -513,14 +512,14 @@ type (
}
MultipartCompletedPart struct {
PartNumber int `json:"partNumber"`
ETag string `json:"etag"`
ETag string `json:"eTag"`
}
MultipartCompleteResponse struct {
ETag string `json:"etag"`
ETag string `json:"eTag"`
}
MultipartAddPartRequest struct {
Bucket string `json:"bucket"`
Etag string `json:"etag"`
Etag string `json:"eTag"`
Path string `json:"path"`
ContractSet string `json:"contractSet"`
UploadID string `json:"uploadID"`
Expand Down Expand Up @@ -559,7 +558,7 @@ type (
MultipartListPartItem struct {
PartNumber int `json:"partNumber"`
LastModified time.Time `json:"lastModified"`
ETag string `json:"etag"`
ETag string `json:"eTag"`
Size int64 `json:"size"`
}
)
Expand Down
17 changes: 8 additions & 9 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,12 @@ type (
RenameObject(ctx context.Context, bucket, from, to string) error
RenameObjects(ctx context.Context, bucket, from, to string) error

AbortMultipartUpload(ctx context.Context, bucket, path string, uploadID string) (resp api.MultipartAbortResponse, err error)
AbortMultipartUpload(ctx context.Context, bucket, path string, uploadID string) (err error)
AddMultipartPart(ctx context.Context, bucket, path, contractSet, uploadID string, partNumber int, slices []object.SlabSlice, partialSlab []object.PartialSlab, etag string, usedContracts map[types.PublicKey]types.FileContractID) (err error)
CompleteMultipartUpload(ctx context.Context, bucket, path string, uploadID string, parts []api.MultipartCompletedPart) (_ api.MultipartCompleteResponse, err error)
CreateMultipartUpload(ctx context.Context, bucket, path string) (api.MultipartCreateResponse, error)
ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker string, maxUploads int) (resp api.MultipartListUploadsResponse, _ error)
ListMultipartUploadParts(ctx context.Context, bucket, object string, uploadID string, marker int, limit int64) (resp api.MultipartListPartsResponse, _ error)
CreateMultipartUpload(ctx context.Context, bucket, path string, ec object.EncryptionKey) (api.MultipartCreateResponse, error)
MultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker string, maxUploads int) (resp api.MultipartListUploadsResponse, _ error)
MultipartUploadParts(ctx context.Context, bucket, object string, uploadID string, marker int, limit int64) (resp api.MultipartListPartsResponse, _ error)

MarkPackedSlabsUploaded(ctx context.Context, slabs []api.UploadedPackedSlab, usedContracts map[types.PublicKey]types.FileContractID) error
PackedSlabsForUpload(ctx context.Context, lockingDuration time.Duration, minShards, totalShards uint8, set string, limit int) ([]api.PackedSlab, error)
Expand Down Expand Up @@ -1813,7 +1813,7 @@ func (b *bus) multipartHandlerCreatePOST(jc jape.Context) {
if jc.Decode(&req) != nil {
return
}
resp, err := b.ms.CreateMultipartUpload(jc.Request.Context(), req.Bucket, req.Path)
resp, err := b.ms.CreateMultipartUpload(jc.Request.Context(), req.Bucket, req.Path, req.Key)
if jc.Check("failed to create multipart upload", err) != nil {
return
}
Expand All @@ -1825,11 +1825,10 @@ func (b *bus) multipartHandlerAbortPOST(jc jape.Context) {
if jc.Decode(&req) != nil {
return
}
resp, err := b.ms.AbortMultipartUpload(jc.Request.Context(), req.Bucket, req.Path, req.UploadID)
err := b.ms.AbortMultipartUpload(jc.Request.Context(), req.Bucket, req.Path, req.UploadID)
if jc.Check("failed to abort multipart upload", err) != nil {
return
}
jc.Encode(resp)
}

func (b *bus) multipartHandlerCompletePOST(jc jape.Context) {
Expand Down Expand Up @@ -1875,7 +1874,7 @@ func (b *bus) multipartHandlerListUploadsPOST(jc jape.Context) {
if jc.Decode(&req) != nil {
return
}
resp, err := b.ms.ListMultipartUploads(jc.Request.Context(), req.Bucket, req.Prefix, req.KeyMarker, req.UploadIDMarker, req.Limit)
resp, err := b.ms.MultipartUploads(jc.Request.Context(), req.Bucket, req.Prefix, req.KeyMarker, req.UploadIDMarker, req.Limit)
if jc.Check("failed to list multipart uploads", err) != nil {
return
}
Expand All @@ -1887,7 +1886,7 @@ func (b *bus) multipartHandlerListPartsPOST(jc jape.Context) {
if jc.Decode(&req) != nil {
return
}
resp, err := b.ms.ListMultipartUploadParts(jc.Request.Context(), req.Bucket, req.Path, req.UploadID, req.PartNumberMarker, int64(req.Limit))
resp, err := b.ms.MultipartUploadParts(jc.Request.Context(), req.Bucket, req.Path, req.UploadID, req.PartNumberMarker, int64(req.Limit))
if jc.Check("failed to list multipart upload parts", err) != nil {
return
}
Expand Down
11 changes: 6 additions & 5 deletions bus/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -941,9 +941,10 @@ func (c *Client) renameObjects(ctx context.Context, bucket, from, to, mode strin
return
}

func (c *Client) CreateMultipartUpload(ctx context.Context, bucket, path string) (resp api.MultipartCreateResponse, err error) {
func (c *Client) CreateMultipartUpload(ctx context.Context, bucket, path string, ec object.EncryptionKey) (resp api.MultipartCreateResponse, err error) {
err = c.c.WithContext(ctx).POST("/multipart/create", api.MultipartCreateRequest{
Bucket: bucket,
Key: ec,
Path: path,
}, &resp)
return
Expand All @@ -964,12 +965,12 @@ func (c *Client) AddMultipartPart(ctx context.Context, bucket, path, contractSet
return
}

func (c *Client) AbortMultipartUpload(ctx context.Context, bucket, path string, uploadID string) (resp api.MultipartAbortResponse, err error) {
func (c *Client) AbortMultipartUpload(ctx context.Context, bucket, path string, uploadID string) (err error) {
err = c.c.WithContext(ctx).POST("/multipart/abort", api.MultipartAbortRequest{
Bucket: bucket,
Path: path,
UploadID: uploadID,
}, &resp)
}, nil)
return
}

Expand All @@ -983,7 +984,7 @@ func (c *Client) CompleteMultipartUpload(ctx context.Context, bucket, path strin
return
}

func (c *Client) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker string, maxUploads int) (resp api.MultipartListUploadsResponse, err error) {
func (c *Client) MultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker string, maxUploads int) (resp api.MultipartListUploadsResponse, err error) {
err = c.c.WithContext(ctx).POST("/multipart/listuploads", api.MultipartListUploadsRequest{
Bucket: bucket,
Prefix: prefix,
Expand All @@ -994,7 +995,7 @@ func (c *Client) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMa
return
}

func (c *Client) ListMultipartUploadParts(ctx context.Context, bucket, object string, uploadID string, marker int, limit int64) (resp api.MultipartListPartsResponse, err error) {
func (c *Client) MultipartUploadParts(ctx context.Context, bucket, object string, uploadID string, marker int, limit int64) (resp api.MultipartListPartsResponse, err error) {
err = c.c.WithContext(ctx).POST("/multipart/listparts", api.MultipartListPartsRequest{
Bucket: bucket,
Path: object,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module go.sia.tech/renterd
go 1.20

require (
github.com/SiaFoundation/gofakes3 v0.0.0-20230919075754-db4f3d79eb50
github.com/SiaFoundation/gofakes3 v0.0.0-20230919115219-af470842bcfa
github.com/go-gormigrate/gormigrate/v2 v2.1.0
github.com/google/go-cmp v0.5.9
github.com/gotd/contrib v0.19.0
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/SiaFoundation/gofakes3 v0.0.0-20230918145812-698366c1fd56 h1:Wx1Xp4d5jb2lYgHfPRPeA4PhY9/hwXO3H7557AZB6j8=
github.com/SiaFoundation/gofakes3 v0.0.0-20230918145812-698366c1fd56/go.mod h1:pm3DyXGoeF7/gka6OYDqAW4E5hkcnXm/GfUagzztlxk=
github.com/SiaFoundation/gofakes3 v0.0.0-20230919075754-db4f3d79eb50 h1:Z+AQLQgrIlOWihTUP/HIvOSQ0slMsDfAhzaVGydFe5Y=
github.com/SiaFoundation/gofakes3 v0.0.0-20230919075754-db4f3d79eb50/go.mod h1:pm3DyXGoeF7/gka6OYDqAW4E5hkcnXm/GfUagzztlxk=
github.com/SiaFoundation/gofakes3 v0.0.0-20230919115219-af470842bcfa h1:CMKw/7lpPrLLZaT+sFQysXEE7Zgj8Iqz+dUuaYrh0wg=
github.com/SiaFoundation/gofakes3 v0.0.0-20230919115219-af470842bcfa/go.mod h1:pm3DyXGoeF7/gka6OYDqAW4E5hkcnXm/GfUagzztlxk=
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo=
github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da h1:KjTM2ks9d14ZYCvmHS9iAKVt9AyzRSqNU1qabPih5BY=
Expand Down
23 changes: 23 additions & 0 deletions internal/testing/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,4 +470,27 @@ func TestS3MultipartUploads(t *testing.T) {
} else if !bytes.Equal(data, []byte("helloworld!")) {
t.Fatal("unexpected data:", string(data))
}

// Start a second multipart upload.
uploadID, err = core.NewMultipartUpload(context.Background(), "multipart", "bar", minio.PutObjectOptions{})
if err != nil {
t.Fatal(err)
}

// Add a part.
putPart(1, []byte("bar"))

// Abort upload
err = core.AbortMultipartUpload(context.Background(), "multipart", "bar", uploadID)
if err != nil {
t.Fatal(err)
}

// List it.
res, err := core.ListMultipartUploads(context.Background(), "multipart", "", "", "", "", 0)
if err != nil {
t.Fatal(err)
} else if len(res.Uploads) != 0 {
t.Fatal("expected 0 uploads")
}
}
12 changes: 8 additions & 4 deletions object/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/cipher"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"io"
"math"
Expand Down Expand Up @@ -49,9 +50,12 @@ func (k *EncryptionKey) UnmarshalText(b []byte) error {

// Encrypt returns a cipher.StreamReader that encrypts r with k starting at the
// given offset.
func (k EncryptionKey) Encrypt(r io.Reader, offset uint64) cipher.StreamReader {
func (k EncryptionKey) Encrypt(r io.Reader, offset uint64) (cipher.StreamReader, error) {
if offset%64 != 0 {
return cipher.StreamReader{}, errors.New("offset must be a multiple of 64")
}
if k.IsNoopKey() {
return cipher.StreamReader{S: &noOpStream{}, R: r}
return cipher.StreamReader{S: &noOpStream{}, R: r}, nil
}
nonce64 := offset / (64 * math.MaxUint32)
offset %= 64 * math.MaxUint32
Expand All @@ -61,7 +65,7 @@ func (k EncryptionKey) Encrypt(r io.Reader, offset uint64) cipher.StreamReader {
c, _ := chacha20.NewUnauthenticatedCipher(k.entropy[:], nonce)
c.SetCounter(uint32(offset / 64))
rs := &rekeyStream{key: k.entropy[:], c: c}
return cipher.StreamReader{S: rs, R: r}
return cipher.StreamReader{S: rs, R: r}, nil
}

// Decrypt returns a cipher.StreamWriter that decrypts w with k, starting at the
Expand Down Expand Up @@ -119,7 +123,7 @@ func (o Object) TotalSize() int64 {

// Encrypt wraps the given reader with a reader that encrypts the stream using
// the object's key.
func (o Object) Encrypt(r io.Reader, offset uint64) cipher.StreamReader {
func (o Object) Encrypt(r io.Reader, offset uint64) (cipher.StreamReader, error) {
return o.Key.Encrypt(r, offset)
}

Expand Down
39 changes: 38 additions & 1 deletion object/object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,54 @@ package object

import (
"bytes"
"io"
"math"
"testing"

"lukechampine.com/frand"
)

func TestEncryptionOffset(t *testing.T) {
key := GenerateEncryptionKey()

encrypt := func(offset uint64, plainText []byte) []byte {
t.Helper()
sr, err := key.Encrypt(bytes.NewReader(plainText), offset)
if err != nil {
t.Fatal(err)
}
ct, err := io.ReadAll(sr)
if err != nil {
t.Fatal(err)
}
return ct
}
decrypt := func(offset uint64, cipherText []byte) []byte {
pt := bytes.NewBuffer(nil)
_, err := key.Decrypt(pt, offset).Write(cipherText)
if err != nil {
t.Fatal(err)
}
return pt.Bytes()
}

data := frand.Bytes(640)
offset := uint64(64)
if !bytes.Equal(data, decrypt(offset, encrypt(offset, data))) {
t.Fatal("mismatch")
} else if bytes.Equal(data, decrypt(offset, encrypt(128, data))) {
t.Fatal("expected mismatch")
}
}

func TestEncryptionOverflow(t *testing.T) {
// Create a random key.
key := GenerateEncryptionKey()
data := frand.Bytes(3 * 64)
sr := key.Encrypt(bytes.NewReader(data), 0)
sr, err := key.Encrypt(bytes.NewReader(data), 0)
if err != nil {
t.Fatal(err)
}

// Check that the streamreader is initialized correctly.
rs := sr.S.(*rekeyStream)
Expand Down
15 changes: 10 additions & 5 deletions s3/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/SiaFoundation/gofakes3"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/object"
"go.uber.org/zap"
"lukechampine.com/frand"
)
Expand Down Expand Up @@ -360,8 +361,8 @@ func (s *s3) CopyObject(srcBucket, srcKey, dstBucket, dstKey string, meta map[st
}, nil
}

func (s *s3) CreateMultipartUpload(bucket, object string, meta map[string]string) (gofakes3.UploadID, error) {
resp, err := s.b.CreateMultipartUpload(context.Background(), bucket, "/"+object)
func (s *s3) CreateMultipartUpload(bucket, key string, meta map[string]string) (gofakes3.UploadID, error) {
resp, err := s.b.CreateMultipartUpload(context.Background(), bucket, "/"+key, object.NoOpKey)
if err != nil {
return "", gofakes3.ErrorMessage(gofakes3.ErrInternal, err.Error())
}
Expand All @@ -386,7 +387,7 @@ func (s *s3) ListMultipartUploads(bucket string, marker *gofakes3.UploadListMark
} else if marker != nil {
return nil, gofakes3.ErrorMessage(gofakes3.ErrNotImplemented, "marker not supported")
}
resp, err := s.b.ListMultipartUploads(context.Background(), bucket, "", "", "", int(limit))
resp, err := s.b.MultipartUploads(context.Background(), bucket, "", "", "", int(limit))
if err != nil {
return nil, gofakes3.ErrorMessage(gofakes3.ErrInternal, err.Error())
}
Expand Down Expand Up @@ -414,7 +415,7 @@ func (s *s3) ListMultipartUploads(bucket string, marker *gofakes3.UploadListMark
}

func (s *s3) ListParts(bucket, object string, uploadID gofakes3.UploadID, marker int, limit int64) (*gofakes3.ListMultipartUploadPartsResult, error) {
resp, err := s.b.ListMultipartUploadParts(context.Background(), bucket, "/"+object, string(uploadID), marker, limit)
resp, err := s.b.MultipartUploadParts(context.Background(), bucket, "/"+object, string(uploadID), marker, limit)
if err != nil {
return nil, gofakes3.ErrorMessage(gofakes3.ErrInternal, err.Error())
}
Expand All @@ -441,7 +442,11 @@ func (s *s3) ListParts(bucket, object string, uploadID gofakes3.UploadID, marker
}

func (s *s3) AbortMultipartUpload(bucket, object string, id gofakes3.UploadID) error {
return gofakes3.ErrorMessage(gofakes3.ErrNotImplemented, "abort multipart upload not supported")
err := s.b.AbortMultipartUpload(context.Background(), bucket, "/"+object, string(id))
if err != nil {
return gofakes3.ErrorMessage(gofakes3.ErrInternal, err.Error())
}
return nil
}

func (s *s3) CompleteMultipartUpload(bucket, object string, id gofakes3.UploadID, input *gofakes3.CompleteMultipartUploadRequest) (versionID gofakes3.VersionID, etag string, err error) {
Expand Down
7 changes: 4 additions & 3 deletions s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ type bus interface {
Object(ctx context.Context, path string, opts ...api.ObjectsOption) (res api.ObjectsResponse, err error)
SearchObjects(ctx context.Context, bucket, key string, offset, limit int) (entries []api.ObjectMetadata, err error)

AbortMultipartUpload(ctx context.Context, bucket, path string, uploadID string) (err error)
CompleteMultipartUpload(ctx context.Context, bucket, path string, uploadID string, parts []api.MultipartCompletedPart) (_ api.MultipartCompleteResponse, err error)
CreateMultipartUpload(ctx context.Context, bucket, path string) (api.MultipartCreateResponse, error)
ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker string, maxUploads int) (resp api.MultipartListUploadsResponse, _ error)
ListMultipartUploadParts(ctx context.Context, bucket, object string, uploadID string, marker int, limit int64) (resp api.MultipartListPartsResponse, _ error)
CreateMultipartUpload(ctx context.Context, bucket, path string, ec object.EncryptionKey) (api.MultipartCreateResponse, error)
MultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker string, maxUploads int) (resp api.MultipartListUploadsResponse, _ error)
MultipartUploadParts(ctx context.Context, bucket, object string, uploadID string, marker int, limit int64) (resp api.MultipartListPartsResponse, _ error)

UploadParams(ctx context.Context) (api.UploadParams, error)
}
Expand Down
Loading

0 comments on commit 9c8e2a7

Please sign in to comment.